diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-02-04 10:39:34 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-04 10:39:34 +0000 |
commit | eb352a5f6bdb48cb2d795e3fe2cd7d354580a761 (patch) | |
tree | deefb3239e44be8938dcd784cc2094274e1d30ef /roomserver/storage/shared/room_updater.go | |
parent | 4d9f5b2e5787d23e1dbcebfda1c6d99d3498ec7e (diff) |
Full roomserver input transactional isolation (#2141)
* Add transaction to all database tables in roomserver, rename latest events updater to room updater, use room updater for all RS input
* Better transaction management
* Tweak order
* Handle cases where the room does not exist
* Other fixes
* More tweaks
* Fill some gaps
* Fill in the gaps
* good lord it gets worse
* Don't roll back transactions when events rejected
* Pass through errors properly
* Fix bugs
* Fix incorrect error check
* Don't panic on nil txns
* Tweaks
* Hopefully fix panics for good in SQLite this time
* Fix rollback
* Minor bug fixes with latest event updater
* Some review comments
* Revert "Some review comments"
This reverts commit 0caf8cf53e62c33f7b83c52e9df1d963871f751e.
* Fix a couple of bugs
* Clearer commit and rollback results
* Remove unnecessary prepares
Diffstat (limited to 'roomserver/storage/shared/room_updater.go')
-rw-r--r-- | roomserver/storage/shared/room_updater.go | 262 |
1 files changed, 262 insertions, 0 deletions
diff --git a/roomserver/storage/shared/room_updater.go b/roomserver/storage/shared/room_updater.go new file mode 100644 index 00000000..bb9f5dc6 --- /dev/null +++ b/roomserver/storage/shared/room_updater.go @@ -0,0 +1,262 @@ +package shared + +import ( + "context" + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" +) + +type RoomUpdater struct { + transaction + d *Database + roomInfo *types.RoomInfo + latestEvents []types.StateAtEventAndReference + lastEventIDSent string + currentStateSnapshotNID types.StateSnapshotNID +} + +func rollback(txn *sql.Tx) { + if txn == nil { + return + } + txn.Rollback() // nolint: errcheck +} + +func NewRoomUpdater(ctx context.Context, d *Database, txn *sql.Tx, roomInfo *types.RoomInfo) (*RoomUpdater, error) { + // If the roomInfo is nil then that means that the room doesn't exist + // yet, so we can't do `SelectLatestEventsNIDsForUpdate` because that + // would involve locking a row on the table that doesn't exist. Instead + // we will just run with a normal database transaction. It'll either + // succeed, processing a create event which creates the room, or it won't. + if roomInfo == nil { + return &RoomUpdater{ + transaction{ctx, txn}, d, nil, nil, "", 0, + }, nil + } + + eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err := + d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, txn, roomInfo.RoomNID) + if err != nil { + rollback(txn) + return nil, err + } + stateAndRefs, err := d.EventsTable.BulkSelectStateAtEventAndReference(ctx, txn, eventNIDs) + if err != nil { + rollback(txn) + return nil, err + } + var lastEventIDSent string + if lastEventNIDSent != 0 { + lastEventIDSent, err = d.EventsTable.SelectEventID(ctx, txn, lastEventNIDSent) + if err != nil { + rollback(txn) + return nil, err + } + } + return &RoomUpdater{ + transaction{ctx, txn}, d, roomInfo, stateAndRefs, lastEventIDSent, currentStateSnapshotNID, + }, nil +} + +// Implements sqlutil.Transaction +func (u *RoomUpdater) Commit() error { + if u.txn == nil { // SQLite mode probably + return nil + } + return u.txn.Commit() +} + +// Implements sqlutil.Transaction +func (u *RoomUpdater) Rollback() error { + if u.txn == nil { // SQLite mode probably + return nil + } + return u.txn.Rollback() +} + +// RoomVersion implements types.RoomRecentEventsUpdater +func (u *RoomUpdater) RoomVersion() (version gomatrixserverlib.RoomVersion) { + return u.roomInfo.RoomVersion +} + +// LatestEvents implements types.RoomRecentEventsUpdater +func (u *RoomUpdater) LatestEvents() []types.StateAtEventAndReference { + return u.latestEvents +} + +// LastEventIDSent implements types.RoomRecentEventsUpdater +func (u *RoomUpdater) LastEventIDSent() string { + return u.lastEventIDSent +} + +// CurrentStateSnapshotNID implements types.RoomRecentEventsUpdater +func (u *RoomUpdater) CurrentStateSnapshotNID() types.StateSnapshotNID { + return u.currentStateSnapshotNID +} + +// StorePreviousEvents implements types.RoomRecentEventsUpdater - This must be called from a Writer +func (u *RoomUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error { + return u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error { + for _, ref := range previousEventReferences { + if err := u.d.PrevEventsTable.InsertPreviousEvent(u.ctx, txn, ref.EventID, ref.EventSHA256, eventNID); err != nil { + return fmt.Errorf("u.d.PrevEventsTable.InsertPreviousEvent: %w", err) + } + } + return nil + }) +} + +func (u *RoomUpdater) Events( + ctx context.Context, eventNIDs []types.EventNID, +) ([]types.Event, error) { + return u.d.events(ctx, u.txn, eventNIDs) +} + +func (u *RoomUpdater) SnapshotNIDFromEventID( + ctx context.Context, eventID string, +) (types.StateSnapshotNID, error) { + return u.d.snapshotNIDFromEventID(ctx, u.txn, eventID) +} + +func (u *RoomUpdater) StoreEvent( + ctx context.Context, event *gomatrixserverlib.Event, + authEventNIDs []types.EventNID, isRejected bool, +) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) { + return u.d.storeEvent(ctx, u, event, authEventNIDs, isRejected) +} + +func (u *RoomUpdater) StateBlockNIDs( + ctx context.Context, stateNIDs []types.StateSnapshotNID, +) ([]types.StateBlockNIDList, error) { + return u.d.stateBlockNIDs(ctx, u.txn, stateNIDs) +} + +func (u *RoomUpdater) StateEntries( + ctx context.Context, stateBlockNIDs []types.StateBlockNID, +) ([]types.StateEntryList, error) { + return u.d.stateEntries(ctx, u.txn, stateBlockNIDs) +} + +func (u *RoomUpdater) StateEntriesForTuples( + ctx context.Context, + stateBlockNIDs []types.StateBlockNID, + stateKeyTuples []types.StateKeyTuple, +) ([]types.StateEntryList, error) { + return u.d.stateEntriesForTuples(ctx, u.txn, stateBlockNIDs, stateKeyTuples) +} + +func (u *RoomUpdater) AddState( + ctx context.Context, + roomNID types.RoomNID, + stateBlockNIDs []types.StateBlockNID, + state []types.StateEntry, +) (stateNID types.StateSnapshotNID, err error) { + return u.d.addState(ctx, u.txn, roomNID, stateBlockNIDs, state) +} + +func (u *RoomUpdater) SetState( + ctx context.Context, eventNID types.EventNID, stateNID types.StateSnapshotNID, +) error { + return u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error { + return u.d.EventsTable.UpdateEventState(ctx, txn, eventNID, stateNID) + }) +} + +func (u *RoomUpdater) EventTypeNIDs( + ctx context.Context, eventTypes []string, +) (map[string]types.EventTypeNID, error) { + return u.d.eventTypeNIDs(ctx, u.txn, eventTypes) +} + +func (u *RoomUpdater) EventStateKeyNIDs( + ctx context.Context, eventStateKeys []string, +) (map[string]types.EventStateKeyNID, error) { + return u.d.eventStateKeyNIDs(ctx, u.txn, eventStateKeys) +} + +func (u *RoomUpdater) RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error) { + return u.d.roomInfo(ctx, u.txn, roomID) +} + +func (u *RoomUpdater) EventIDs( + ctx context.Context, eventNIDs []types.EventNID, +) (map[types.EventNID]string, error) { + return u.d.EventsTable.BulkSelectEventID(ctx, u.txn, eventNIDs) +} + +func (u *RoomUpdater) StateAtEventIDs( + ctx context.Context, eventIDs []string, +) ([]types.StateAtEvent, error) { + return u.d.EventsTable.BulkSelectStateAtEventByID(ctx, u.txn, eventIDs) +} + +func (u *RoomUpdater) StateEntriesForEventIDs( + ctx context.Context, eventIDs []string, +) ([]types.StateEntry, error) { + return u.d.EventsTable.BulkSelectStateEventByID(ctx, u.txn, eventIDs) +} + +func (u *RoomUpdater) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) { + return u.d.eventsFromIDs(ctx, u.txn, eventIDs) +} + +func (u *RoomUpdater) GetMembershipEventNIDsForRoom( + ctx context.Context, roomNID types.RoomNID, joinOnly bool, localOnly bool, +) ([]types.EventNID, error) { + return u.d.getMembershipEventNIDsForRoom(ctx, u.txn, roomNID, joinOnly, localOnly) +} + +// IsReferenced implements types.RoomRecentEventsUpdater +func (u *RoomUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) { + err := u.d.PrevEventsTable.SelectPreviousEventExists(u.ctx, u.txn, eventReference.EventID, eventReference.EventSHA256) + if err == nil { + return true, nil + } + if err == sql.ErrNoRows { + return false, nil + } + return false, fmt.Errorf("u.d.PrevEventsTable.SelectPreviousEventExists: %w", err) +} + +// SetLatestEvents implements types.RoomRecentEventsUpdater +func (u *RoomUpdater) SetLatestEvents( + roomNID types.RoomNID, latest []types.StateAtEventAndReference, lastEventNIDSent types.EventNID, + currentStateSnapshotNID types.StateSnapshotNID, +) error { + eventNIDs := make([]types.EventNID, len(latest)) + for i := range latest { + eventNIDs[i] = latest[i].EventNID + } + return u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error { + if err := u.d.RoomsTable.UpdateLatestEventNIDs(u.ctx, txn, roomNID, eventNIDs, lastEventNIDSent, currentStateSnapshotNID); err != nil { + return fmt.Errorf("u.d.RoomsTable.updateLatestEventNIDs: %w", err) + } + if roomID, ok := u.d.Cache.GetRoomServerRoomID(roomNID); ok { + if roomInfo, ok := u.d.Cache.GetRoomInfo(roomID); ok { + roomInfo.StateSnapshotNID = currentStateSnapshotNID + roomInfo.IsStub = false + u.d.Cache.StoreRoomInfo(roomID, roomInfo) + } + } + return nil + }) +} + +// HasEventBeenSent implements types.RoomRecentEventsUpdater +func (u *RoomUpdater) HasEventBeenSent(eventNID types.EventNID) (bool, error) { + return u.d.EventsTable.SelectEventSentToOutput(u.ctx, u.txn, eventNID) +} + +// MarkEventAsSent implements types.RoomRecentEventsUpdater +func (u *RoomUpdater) MarkEventAsSent(eventNID types.EventNID) error { + return u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error { + return u.d.EventsTable.UpdateEventSentToOutput(u.ctx, txn, eventNID) + }) +} + +func (u *RoomUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID, targetLocal bool) (*MembershipUpdater, error) { + return u.d.membershipUpdaterTxn(u.ctx, u.txn, u.roomInfo.RoomNID, targetUserNID, targetLocal) +} |