diff options
author | Kegsay <kegan@matrix.org> | 2020-02-20 09:28:03 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-20 09:28:03 +0000 |
commit | 5caae6f3a0fc56f5ae654dcc2a65822885cda0a0 (patch) | |
tree | c600120687b9448376427895e62966ada452f19a /syncapi/storage | |
parent | 3dabf4d4ed52459a4661db32c983018c2c73a431 (diff) |
sqlite: fixes from sytest (#872)
* bugfix: fix panic on new invite events from sytest
I'm unsure why the previous code didn't work, but it's
clearer, quicker and easier to read the `LastInsertID()` way.
Previously, the code would panic as the SELECT would fail
to find the last inserted row ID.
* sqlite: Fix UNIQUE violations and close more cursors
- Add missing `defer rows.Close()`
- Do not have the state block NID as a PRIMARY KEY else it breaks for blocks
with >1 state event in them. Instead, rejig the queries so we can still
have monotonically increasing integers without using AUTOINCREMENT (which
mandates PRIMARY KEY).
* sqlite: Add missing variadic function
* Use LastInsertId because empirically it works over the SELECT form (though I don't know why that is)
* sqlite: Fix invite table by using the global stream pos rather than one specific to invites
If we don't use the global, clients don't get notified about any invites
because the position is too low.
* linting: shadowing
* sqlite: do not use last rowid, we already know the stream pos!
* sqlite: Fix account data table in syncapi by commiting insert txns!
* sqlite: Fix failing federation invite
Was failing with 'database is locked' due to multiple write txns
being taken out.
* sqlite: Ensure we return exactly the number of events found in the database
Previously we would return exactly the number of *requested* events, which
meant that several zero-initialised events would bubble through the system,
failing at JSON serialisation time.
* sqlite: let's just ignore the problem for now....
* linting
Diffstat (limited to 'syncapi/storage')
-rw-r--r-- | syncapi/storage/sqlite3/account_data_table.go | 51 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/current_room_state_table.go | 13 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/invites_table.go | 34 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/output_room_events_table.go | 12 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/syncserver.go | 55 |
5 files changed, 77 insertions, 88 deletions
diff --git a/syncapi/storage/sqlite3/account_data_table.go b/syncapi/storage/sqlite3/account_data_table.go index 3274e66e..71105d0c 100644 --- a/syncapi/storage/sqlite3/account_data_table.go +++ b/syncapi/storage/sqlite3/account_data_table.go @@ -19,15 +19,13 @@ import ( "context" "database/sql" - "github.com/lib/pq" - "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) const accountDataSchema = ` CREATE TABLE IF NOT EXISTS syncapi_account_data_type ( - id INTEGER PRIMARY KEY AUTOINCREMENT, + id INTEGER PRIMARY KEY, user_id TEXT NOT NULL, room_id TEXT NOT NULL, type TEXT NOT NULL, @@ -43,9 +41,7 @@ const insertAccountDataSQL = "" + const selectAccountDataInRangeSQL = "" + "SELECT room_id, type FROM syncapi_account_data_type" + " WHERE user_id = $1 AND id > $2 AND id <= $3" + - " AND ( $4 IS NULL OR type IN ($4) )" + - " AND ( $5 IS NULL OR NOT(type IN ($5)) )" + - " ORDER BY id ASC LIMIT $6" + " ORDER BY id ASC" const selectMaxAccountDataIDSQL = "" + "SELECT MAX(id) FROM syncapi_account_data_type" @@ -53,8 +49,8 @@ const selectMaxAccountDataIDSQL = "" + type accountDataStatements struct { streamIDStatements *streamIDStatements insertAccountDataStmt *sql.Stmt - selectAccountDataInRangeStmt *sql.Stmt selectMaxAccountDataIDStmt *sql.Stmt + selectAccountDataInRangeStmt *sql.Stmt } func (s *accountDataStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) { @@ -66,10 +62,10 @@ func (s *accountDataStatements) prepare(db *sql.DB, streamID *streamIDStatements if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil { return } - if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil { + if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil { return } - if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil { + if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil { return } return @@ -83,8 +79,7 @@ func (s *accountDataStatements) insertAccountData( if err != nil { return } - insertStmt := common.TxStmt(txn, s.insertAccountDataStmt) - _, err = insertStmt.ExecContext(ctx, pos, userID, roomID, dataType) + _, err = txn.Stmt(s.insertAccountDataStmt).ExecContext(ctx, pos, userID, roomID, dataType) return } @@ -103,14 +98,13 @@ func (s *accountDataStatements) selectAccountDataInRange( oldPos-- } - rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos, - pq.StringArray(filterConvertTypeWildcardToSQL(accountDataFilterPart.Types)), - pq.StringArray(filterConvertTypeWildcardToSQL(accountDataFilterPart.NotTypes)), - accountDataFilterPart.Limit, - ) + rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos) if err != nil { return } + defer rows.Close() // nolint: errcheck + + var entries int for rows.Next() { var dataType string @@ -120,22 +114,41 @@ func (s *accountDataStatements) selectAccountDataInRange( return } + // check if we should add this by looking at the filter. + // It would be nice if we could do this in SQL-land, but the mix of variadic + // and positional parameters makes the query annoyingly hard to do, it's easier + // and clearer to do it in Go-land. If there are no filters for [not]types then + // this gets skipped. + for _, includeType := range accountDataFilterPart.Types { + if includeType != dataType { // TODO: wildcard support + continue + } + } + for _, excludeType := range accountDataFilterPart.NotTypes { + if excludeType == dataType { // TODO: wildcard support + continue + } + } + if len(data[roomID]) > 0 { data[roomID] = append(data[roomID], dataType) } else { data[roomID] = []string{dataType} } + entries++ + if entries >= accountDataFilterPart.Limit { + break + } } - return + return data, nil } func (s *accountDataStatements) selectMaxAccountDataID( ctx context.Context, txn *sql.Tx, ) (id int64, err error) { var nullableID sql.NullInt64 - stmt := common.TxStmt(txn, s.selectMaxAccountDataIDStmt) - err = stmt.QueryRowContext(ctx).Scan(&nullableID) + err = txn.Stmt(s.selectMaxAccountDataIDStmt).QueryRowContext(ctx).Scan(&nullableID) if nullableID.Valid { id = nullableID.Int64 } diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index 4ce94666..eb969c95 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -19,6 +19,7 @@ import ( "context" "database/sql" "encoding/json" + "strings" "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -88,7 +89,6 @@ type currentRoomStateStatements struct { selectRoomIDsWithMembershipStmt *sql.Stmt selectCurrentStateStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt - selectEventsWithEventIDsStmt *sql.Stmt selectStateEventStmt *sql.Stmt } @@ -113,9 +113,6 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB, streamID *streamIDState if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { return } - if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil { - return - } if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { return } @@ -233,8 +230,12 @@ func (s *currentRoomStateStatements) upsertRoomState( func (s *currentRoomStateStatements) selectEventsWithEventIDs( ctx context.Context, txn *sql.Tx, eventIDs []string, ) ([]types.StreamEvent, error) { - stmt := common.TxStmt(txn, s.selectEventsWithEventIDsStmt) - rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs)) + iEventIDs := make([]interface{}, len(eventIDs)) + for k, v := range eventIDs { + iEventIDs[k] = v + } + query := strings.Replace(selectEventsWithEventIDsSQL, "($1)", common.QueryVariadic(len(iEventIDs)), 1) + rows, err := txn.QueryContext(ctx, query, iEventIDs...) if err != nil { return nil, err } diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go index 74dba245..baf8871b 100644 --- a/syncapi/storage/sqlite3/invites_table.go +++ b/syncapi/storage/sqlite3/invites_table.go @@ -26,7 +26,7 @@ import ( const inviteEventsSchema = ` CREATE TABLE IF NOT EXISTS syncapi_invite_events ( - id INTEGER PRIMARY KEY AUTOINCREMENT, + id INTEGER PRIMARY KEY, event_id TEXT NOT NULL, room_id TEXT NOT NULL, target_user_id TEXT NOT NULL, @@ -39,11 +39,8 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx ON syncapi_invite_events const insertInviteEventSQL = "" + "INSERT INTO syncapi_invite_events" + - " (room_id, event_id, target_user_id, event_json)" + - " VALUES ($1, $2, $3, $4)" - -const selectLastInsertedInviteEventSQL = "" + - "SELECT id FROM syncapi_invite_events WHERE rowid = last_insert_rowid()" + " (id, room_id, event_id, target_user_id, event_json)" + + " VALUES ($1, $2, $3, $4, $5)" const deleteInviteEventSQL = "" + "DELETE FROM syncapi_invite_events WHERE event_id = $1" @@ -57,12 +54,11 @@ const selectMaxInviteIDSQL = "" + "SELECT MAX(id) FROM syncapi_invite_events" type inviteEventsStatements struct { - streamIDStatements *streamIDStatements - insertInviteEventStmt *sql.Stmt - selectLastInsertedInviteEventStmt *sql.Stmt - selectInviteEventsInRangeStmt *sql.Stmt - deleteInviteEventStmt *sql.Stmt - selectMaxInviteIDStmt *sql.Stmt + streamIDStatements *streamIDStatements + insertInviteEventStmt *sql.Stmt + selectInviteEventsInRangeStmt *sql.Stmt + deleteInviteEventStmt *sql.Stmt + selectMaxInviteIDStmt *sql.Stmt } func (s *inviteEventsStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) { @@ -74,9 +70,6 @@ func (s *inviteEventsStatements) prepare(db *sql.DB, streamID *streamIDStatement if s.insertInviteEventStmt, err = db.Prepare(insertInviteEventSQL); err != nil { return } - if s.selectLastInsertedInviteEventStmt, err = db.Prepare(selectLastInsertedInviteEventSQL); err != nil { - return - } if s.selectInviteEventsInRangeStmt, err = db.Prepare(selectInviteEventsInRangeSQL); err != nil { return } @@ -90,19 +83,16 @@ func (s *inviteEventsStatements) prepare(db *sql.DB, streamID *streamIDStatement } func (s *inviteEventsStatements) insertInviteEvent( - ctx context.Context, inviteEvent gomatrixserverlib.Event, -) (streamPos types.StreamPosition, err error) { - _, err = s.insertInviteEventStmt.ExecContext( + ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.Event, streamPos types.StreamPosition, +) (err error) { + _, err = txn.Stmt(s.insertInviteEventStmt).ExecContext( ctx, + streamPos, inviteEvent.RoomID(), inviteEvent.EventID(), *inviteEvent.StateKey(), inviteEvent.JSON(), ) - if err != nil { - return - } - err = s.selectLastInsertedInviteEventStmt.QueryRowContext(ctx).Scan(&streamPos) return } diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 8c01f2ce..4535688d 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -54,9 +54,6 @@ const insertEventSQL = "" + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " + "ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = $11" -const selectLastInsertedEventSQL = "" + - "SELECT id FROM syncapi_output_room_events WHERE rowid = last_insert_rowid()" - const selectEventsSQL = "" + "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = $1" @@ -105,7 +102,6 @@ const selectStateInRangeSQL = "" + type outputRoomEventsStatements struct { streamIDStatements *streamIDStatements insertEventStmt *sql.Stmt - selectLastInsertedEventStmt *sql.Stmt selectEventsStmt *sql.Stmt selectMaxEventIDStmt *sql.Stmt selectRecentEventsStmt *sql.Stmt @@ -123,9 +119,6 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB, streamID *streamIDState if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { return } - if s.selectLastInsertedEventStmt, err = db.Prepare(selectLastInsertedEventSQL); err != nil { - return - } if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { return } @@ -270,7 +263,6 @@ func (s *outputRoomEventsStatements) insertEvent( } insertStmt := common.TxStmt(txn, s.insertEventStmt) - selectStmt := common.TxStmt(txn, s.selectLastInsertedEventStmt) _, err = insertStmt.ExecContext( ctx, streamPos, @@ -286,10 +278,6 @@ func (s *outputRoomEventsStatements) insertEvent( txnID, excludeFromSync, ) - if err != nil { - return - } - err = selectStmt.QueryRowContext(ctx).Scan(&streamPos) return } diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 8cfc1884..6ad3419c 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -193,24 +193,20 @@ func (d *SyncServerDatasource) WriteEvent( ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, ) if err != nil { - fmt.Println("d.events.insertEvent:", err) return err } pduPosition = pos if err = d.topology.insertEventInTopology(ctx, txn, ev); err != nil { - fmt.Println("d.topology.insertEventInTopology:", err) return err } if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil { - fmt.Println("d.handleBackwardExtremities:", err) return err } if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { // Nothing to do, the event may have just been a message event. - fmt.Println("nothing to do") return nil } @@ -340,8 +336,12 @@ func (d *SyncServerDatasource) GetEventsInRange( } // SyncPosition returns the latest positions for syncing. -func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.PaginationToken, error) { - return d.syncPositionTx(ctx, nil) +func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.PaginationToken, err error) { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { + tok, err = d.syncPositionTx(ctx, txn) + return err + }) + return } // BackwardExtremitiesForRoom returns the event IDs of all of the backward @@ -380,8 +380,12 @@ func (d *SyncServerDatasource) EventPositionInTopology( } // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. -func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { - return d.syncStreamPositionTx(ctx, nil) +func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (pos types.StreamPosition, err error) { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { + pos, err = d.syncStreamPositionTx(ctx, txn) + return err + }) + return } func (d *SyncServerDatasource) syncStreamPositionTx( @@ -625,18 +629,15 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( if err != nil { return } - fmt.Println("Joined rooms:", joinedRoomIDs) stateFilterPart := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request // Build up a /sync response. Add joined rooms. for _, roomID := range joinedRoomIDs { - fmt.Println("WE'RE ON", roomID) var stateEvents []gomatrixserverlib.Event stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilterPart) if err != nil { - fmt.Println("d.roomstate.selectCurrentState:", err) return } //fmt.Println("State events:", stateEvents) @@ -648,7 +649,6 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( numRecentEventsPerRoom, true, true, ) if err != nil { - fmt.Println("d.events.selectRecentEvents:", err) return } //fmt.Println("Recent stream events:", recentStreamEvents) @@ -658,10 +658,9 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( var backwardTopologyPos types.StreamPosition backwardTopologyPos, err = d.topology.selectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID()) if err != nil { - fmt.Println("d.topology.selectPositionInTopology:", err) return nil, types.PaginationToken{}, []string{}, err } - fmt.Println("Backward topology position:", backwardTopologyPos) + if backwardTopologyPos-1 <= 0 { backwardTopologyPos = types.StreamPosition(1) } else { @@ -683,7 +682,6 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( } if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition, res); err != nil { - fmt.Println("d.addInvitesToResponse:", err) return } @@ -744,18 +742,10 @@ func (d *SyncServerDatasource) GetAccountDataInRange( func (d *SyncServerDatasource) UpsertAccountData( ctx context.Context, userID, roomID, dataType string, ) (sp types.StreamPosition, err error) { - txn, err := d.db.BeginTx(ctx, nil) - if err != nil { - return types.StreamPosition(0), err - } - var succeeded bool - defer func() { - txerr := common.EndTransaction(txn, &succeeded) - if err == nil && txerr != nil { - err = txerr - } - }() - sp, err = d.accountData.insertAccountData(ctx, txn, userID, roomID, dataType) + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { + sp, err = d.accountData.insertAccountData(ctx, txn, userID, roomID, dataType) + return err + }) return } @@ -764,8 +754,15 @@ func (d *SyncServerDatasource) UpsertAccountData( // Returns an error if there was a problem communicating with the database. func (d *SyncServerDatasource) AddInviteEvent( ctx context.Context, inviteEvent gomatrixserverlib.Event, -) (types.StreamPosition, error) { - return d.invites.insertInviteEvent(ctx, inviteEvent) +) (streamPos types.StreamPosition, err error) { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { + streamPos, err = d.streamID.nextStreamID(ctx, txn) + if err != nil { + return err + } + return d.invites.insertInviteEvent(ctx, txn, inviteEvent, streamPos) + }) + return } // RetireInviteEvent removes an old invite event from the database. |