diff options
Diffstat (limited to 'syncapi/storage/sqlite3/syncserver.go')
-rw-r--r-- | syncapi/storage/sqlite3/syncserver.go | 191 |
1 files changed, 42 insertions, 149 deletions
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index a2253dcd..0da05eab 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "net/url" - "time" "github.com/sirupsen/logrus" @@ -53,16 +52,13 @@ type stateDelta struct { // SyncServerDatasource represents a sync server datasource which manages // both the database for PDUs and caches for EDUs. type SyncServerDatasource struct { + shared.Database db *sql.DB common.PartitionOffsetStatements streamID streamIDStatements - accountData accountDataStatements - events outputRoomEventsStatements roomstate currentRoomStateStatements - eduCache *cache.EDUCache topology outputRoomEventsTopologyStatements backwardExtremities tables.BackwardsExtremities - shared *shared.Database } // NewSyncServerDatasource creates a new sync server database @@ -87,7 +83,6 @@ func NewSyncServerDatasource(dataSourceName string) (*SyncServerDatasource, erro if err = d.prepare(); err != nil { return nil, err } - d.eduCache = cache.New() return &d, nil } @@ -98,10 +93,12 @@ func (d *SyncServerDatasource) prepare() (err error) { if err = d.streamID.prepare(d.db); err != nil { return err } - if err = d.accountData.prepare(d.db, &d.streamID); err != nil { + accountData, err := NewSqliteAccountDataTable(d.db, &d.streamID) + if err != nil { return err } - if err = d.events.prepare(d.db, &d.streamID); err != nil { + events, err := NewSqliteEventsTable(d.db, &d.streamID) + if err != nil { return err } if err = d.roomstate.prepare(d.db, &d.streamID); err != nil { @@ -118,9 +115,12 @@ func (d *SyncServerDatasource) prepare() (err error) { if err != nil { return err } - d.shared = &shared.Database{ - DB: d.db, - Invites: invites, + d.Database = shared.Database{ + DB: d.db, + Invites: invites, + AccountData: accountData, + OutputEvents: events, + EDUCache: cache.New(), } return nil } @@ -130,22 +130,6 @@ func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[s return d.roomstate.selectJoinedUsers(ctx) } -// Events lookups a list of event by their event ID. -// Returns a list of events matching the requested IDs found in the database. -// If an event is not found in the database then it will be omitted from the list. -// Returns an error if there was a problem talking with the database. -// Does not include any transaction IDs in the returned events. -func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) { - streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs) - if err != nil { - return nil, err - } - - // We don't include a device here as we only include transaction IDs in - // incremental syncs. - return d.StreamEventsToEvents(nil, streamEvents), nil -} - // handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of // the events listed in the event's 'prev_events'. This function also updates the backwards extremities table // to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. @@ -156,7 +140,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx // Check if we have all of the event's previous events. If an event is // missing, add it to the room's backward extremities. - prevEvents, err := d.events.selectEvents(ctx, txn, ev.PrevEventIDs()) + prevEvents, err := d.Database.OutputEvents.SelectEvents(ctx, txn, ev.PrevEventIDs()) if err != nil { return err } @@ -192,7 +176,7 @@ func (d *SyncServerDatasource) WriteEvent( ) (pduPosition types.StreamPosition, returnErr error) { returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { var err error - pos, err := d.events.insertEvent( + pos, err := d.Database.OutputEvents.InsertEvent( ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, ) if err != nil { @@ -253,6 +237,19 @@ func (d *SyncServerDatasource) updateRoomState( return nil } +// SyncPosition returns the latest positions for syncing. +func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { + pos, err := d.syncPositionTx(ctx, txn) + if err != nil { + return err + } + tok = *pos + return nil + }) + return +} + // GetStateEvent returns the Matrix state event of a given type for a given room with a given state key // If no event could be found, returns nil // If there was an issue during the retrieval, returns an error @@ -309,46 +306,7 @@ func (d *SyncServerDatasource) GetEventsInTopologicalRange( } // Retrieve the events' contents using their IDs. - events, err = d.events.selectEvents(ctx, nil, eIDs) - return -} - -// GetEventsInStreamingRange retrieves all of the events on a given ordering using the -// given extremities and limit. -func (d *SyncServerDatasource) GetEventsInStreamingRange( - ctx context.Context, - from, to *types.StreamingToken, - roomID string, limit int, - backwardOrdering bool, -) (events []types.StreamEvent, err error) { - if backwardOrdering { - // When using backward ordering, we want the most recent events first. - if events, err = d.events.selectRecentEvents( - ctx, nil, roomID, to.PDUPosition(), from.PDUPosition(), limit, false, false, - ); err != nil { - return - } - } else { - // When using forward ordering, we want the least recent events first. - if events, err = d.events.selectEarlyEvents( - ctx, nil, roomID, from.PDUPosition(), to.PDUPosition(), limit, - ); err != nil { - return - } - } - return events, err -} - -// SyncPosition returns the latest positions for syncing. -func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) { - err = common.WithTransaction(d.db, func(txn *sql.Tx) error { - pos, err := d.syncPositionTx(ctx, txn) - if err != nil { - return err - } - tok = *pos - return nil - }) + events, err = d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs) return } @@ -378,7 +336,7 @@ func (d *SyncServerDatasource) EventsAtTopologicalPosition( return nil, err } - return d.events.selectEvents(ctx, nil, eIDs) + return d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs) } func (d *SyncServerDatasource) EventPositionInTopology( @@ -399,18 +357,18 @@ func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (pos type func (d *SyncServerDatasource) syncStreamPositionTx( ctx context.Context, txn *sql.Tx, ) (types.StreamPosition, error) { - maxID, err := d.events.selectMaxEventID(ctx, txn) + maxID, err := d.Database.OutputEvents.SelectMaxEventID(ctx, txn) if err != nil { return 0, err } - maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn) + maxAccountDataID, err := d.Database.AccountData.SelectMaxAccountDataID(ctx, txn) if err != nil { return 0, err } if maxAccountDataID > maxID { maxID = maxAccountDataID } - maxInviteID, err := d.shared.Invites.SelectMaxInviteID(ctx, txn) + maxInviteID, err := d.Database.Invites.SelectMaxInviteID(ctx, txn) if err != nil { return 0, err } @@ -424,18 +382,18 @@ func (d *SyncServerDatasource) syncPositionTx( ctx context.Context, txn *sql.Tx, ) (*types.StreamingToken, error) { - maxEventID, err := d.events.selectMaxEventID(ctx, txn) + maxEventID, err := d.Database.OutputEvents.SelectMaxEventID(ctx, txn) if err != nil { return nil, err } - maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn) + maxAccountDataID, err := d.Database.AccountData.SelectMaxAccountDataID(ctx, txn) if err != nil { return nil, err } if maxAccountDataID > maxEventID { maxEventID = maxAccountDataID } - maxInviteID, err := d.shared.Invites.SelectMaxInviteID(ctx, txn) + maxInviteID, err := d.Database.Invites.SelectMaxInviteID(ctx, txn) if err != nil { return nil, err } @@ -444,7 +402,7 @@ func (d *SyncServerDatasource) syncPositionTx( } sp := types.NewStreamToken( types.StreamPosition(maxEventID), - types.StreamPosition(d.eduCache.GetLatestSyncPosition()), + types.StreamPosition(d.Database.EDUCache.GetLatestSyncPosition()), ) return &sp, nil } @@ -518,7 +476,7 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse( var ok bool var err error for _, roomID := range joinedRoomIDs { - if typingUsers, updated := d.eduCache.GetTypingUsersIfUpdatedAfter( + if typingUsers, updated := d.Database.EDUCache.GetTypingUsersIfUpdatedAfter( roomID, int64(since.EDUPosition()), ); updated { ev := gomatrixserverlib.ClientEvent{ @@ -654,7 +612,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 var recentStreamEvents []types.StreamEvent - recentStreamEvents, err = d.events.selectRecentEvents( + recentStreamEvents, err = d.Database.OutputEvents.SelectRecentEvents( ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(), numRecentEventsPerRoom, true, true, ) @@ -729,78 +687,13 @@ var txReadOnlySnapshot = sql.TxOptions{ ReadOnly: true, } -// GetAccountDataInRange returns all account data for a given user inserted or -// updated between two given positions -// Returns a map following the format data[roomID] = []dataTypes -// If no data is retrieved, returns an empty map -// If there was an issue with the retrieval, returns an error -func (d *SyncServerDatasource) GetAccountDataInRange( - ctx context.Context, userID string, oldPos, newPos types.StreamPosition, - accountDataFilterPart *gomatrixserverlib.EventFilter, -) (map[string][]string, error) { - return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart) -} - -// UpsertAccountData keeps track of new or updated account data, by saving the type -// of the new/updated data, and the user ID and room ID the data is related to (empty) -// room ID means the data isn't specific to any room) -// If no data with the given type, user ID and room ID exists in the database, -// creates a new row, else update the existing one -// Returns an error if there was an issue with the upsert -func (d *SyncServerDatasource) UpsertAccountData( - ctx context.Context, userID, roomID, dataType string, -) (sp types.StreamPosition, err error) { - err = common.WithTransaction(d.db, func(txn *sql.Tx) error { - sp, err = d.accountData.insertAccountData(ctx, txn, userID, roomID, dataType) - return err - }) - return -} - -// AddInviteEvent stores a new invite event for a user. -// If the invite was successfully stored this returns the stream ID it was stored at. -// Returns an error if there was a problem communicating with the database. -func (d *SyncServerDatasource) AddInviteEvent( - ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent, -) (sp types.StreamPosition, err error) { - return d.shared.AddInviteEvent(ctx, inviteEvent) -} - -// RetireInviteEvent removes an old invite event from the database. -// Returns an error if there was a problem communicating with the database. -func (d *SyncServerDatasource) RetireInviteEvent( - ctx context.Context, inviteEventID string, -) error { - return d.shared.RetireInviteEvent(ctx, inviteEventID) -} - -func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { - d.eduCache.SetTimeoutCallback(fn) -} - -// AddTypingUser adds a typing user to the typing cache. -// Returns the newly calculated sync position for typing notifications. -func (d *SyncServerDatasource) AddTypingUser( - userID, roomID string, expireTime *time.Time, -) types.StreamPosition { - return types.StreamPosition(d.eduCache.AddTypingUser(userID, roomID, expireTime)) -} - -// RemoveTypingUser removes a typing user from the typing cache. -// Returns the newly calculated sync position for typing notifications. -func (d *SyncServerDatasource) RemoveTypingUser( - userID, roomID string, -) types.StreamPosition { - return types.StreamPosition(d.eduCache.RemoveUser(userID, roomID)) -} - func (d *SyncServerDatasource) addInvitesToResponse( ctx context.Context, txn *sql.Tx, userID string, fromPos, toPos types.StreamPosition, res *types.Response, ) error { - invites, err := d.shared.Invites.SelectInviteEventsInRange( + invites, err := d.Database.Invites.SelectInviteEventsInRange( ctx, txn, userID, fromPos, toPos, ) if err != nil { @@ -853,7 +746,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( // This is all "okay" assuming history_visibility == "shared" which it is by default. endPos = delta.membershipPos } - recentStreamEvents, err := d.events.selectRecentEvents( + recentStreamEvents, err := d.Database.OutputEvents.SelectRecentEvents( ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos), numRecentEventsPerRoom, true, true, ) @@ -943,7 +836,7 @@ func (d *SyncServerDatasource) fetchMissingStateEvents( ) ([]types.StreamEvent, error) { // Fetch from the events table first so we pick up the stream ID for the // event. - events, err := d.events.selectEvents(ctx, txn, eventIDs) + events, err := d.Database.OutputEvents.SelectEvents(ctx, txn, eventIDs) if err != nil { return nil, err } @@ -997,7 +890,7 @@ func (d *SyncServerDatasource) getStateDeltas( var deltas []stateDelta // get all the state events ever between these two positions - stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) + stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) if err != nil { return nil, nil, err } @@ -1083,7 +976,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync( } // Get all the state events ever between these two positions - stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) + stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) if err != nil { return nil, nil, err } |