diff options
author | kegsay <kegan@matrix.org> | 2023-04-27 12:54:20 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-27 12:54:20 +0100 |
commit | b189edf4f43ff34b69d6c60aeb0efb60dd549c86 (patch) | |
tree | 4c08aeda694f3e1cf17c66cf0e4b2b306af6a8df /syncapi/storage | |
parent | 2475cf4b61747e76a524af6f71a4eb7e112812af (diff) |
Remove gmsl.HeaderedEvent (#3068)
Replaced with types.HeaderedEvent _for now_. In reality we want to move
them all to gmsl.Event and only use HeaderedEvent when we _need_ to
bundle the version/event ID with the event (seriailsation boundaries,
and even then only when we don't have the room version).
Requires https://github.com/matrix-org/gomatrixserverlib/pull/373
Diffstat (limited to 'syncapi/storage')
18 files changed, 153 insertions, 147 deletions
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index f5c1223a..302b9bad 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" + rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/dendrite/syncapi/synctypes" "github.com/matrix-org/dendrite/syncapi/types" @@ -42,16 +43,16 @@ type DatabaseTransaction interface { MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForRelations(ctx context.Context) (types.StreamPosition, error) - CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) + CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*rstypes.HeaderedEvent, error) GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *synctypes.StateFilter) ([]types.StateDelta, []string, error) GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *synctypes.StateFilter) ([]types.StateDelta, []string, error) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error) GetRoomSummary(ctx context.Context, roomID, userID string) (summary *types.Summary, err error) RecentEvents(ctx context.Context, roomIDs []string, r types.Range, eventFilter *synctypes.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error) - GetBackwardTopologyPos(ctx context.Context, events []*gomatrixserverlib.HeaderedEvent) (types.TopologyToken, error) + GetBackwardTopologyPos(ctx context.Context, events []*rstypes.HeaderedEvent) (types.TopologyToken, error) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) - InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) + InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error) PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error) // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. @@ -65,15 +66,15 @@ type DatabaseTransaction interface { // If an event is not found in the database then it will be omitted from the list. // Returns an error if there was a problem talking with the database. // Does not include any transaction IDs in the returned events. - Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) + Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error) // GetStateEvent returns the Matrix state event of a given type for a given room with a given state key // If no event could be found, returns nil // If there was an issue during the retrieval, returns an error - GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error) + GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*rstypes.HeaderedEvent, error) // GetStateEventsForRoom fetches the state events for a given room. // Returns an empty slice if no state events could be found for this room. // Returns an error if there was an issue with the retrieval. - GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error) + GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter) (stateEvents []*rstypes.HeaderedEvent, err error) // GetAccountDataInRange returns all account data for a given user inserted or // updated between two given positions // Returns a map following the format data[roomID] = []dataTypes @@ -89,15 +90,15 @@ type DatabaseTransaction interface { // StreamEventsToEvents converts streamEvent to Event. If device is non-nil and // matches the streamevent.transactionID device then the transaction ID gets // added to the unsigned section of the output event. - StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*gomatrixserverlib.HeaderedEvent + StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*rstypes.HeaderedEvent // SendToDeviceUpdatesForSync returns a list of send-to-device updates. It returns the // relevant events within the given ranges for the supplied user ID and device ID. SendToDeviceUpdatesForSync(ctx context.Context, userID, deviceID string, from, to types.StreamPosition) (pos types.StreamPosition, events []types.SendToDeviceEvent, err error) // GetRoomReceipts gets all receipts for a given roomID GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]types.OutputReceiptEvent, error) - SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) - SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) - SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) + SelectContextEvent(ctx context.Context, roomID, eventID string) (int, rstypes.HeaderedEvent, error) + SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*rstypes.HeaderedEvent, error) + SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*rstypes.HeaderedEvent, error) StreamToTopologicalPosition(ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool) (types.TopologyToken, error) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) // SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found @@ -123,11 +124,11 @@ type Database interface { // If an event is not found in the database then it will be omitted from the list. // Returns an error if there was a problem talking with the database. // Does not include any transaction IDs in the returned events. - Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) + Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error) // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // when generating the sync stream position for this event. Returns the sync stream position for the inserted event. // Returns an error if there was a problem inserting this event. - WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []*gomatrixserverlib.HeaderedEvent, + WriteEvent(ctx context.Context, ev *rstypes.HeaderedEvent, addStateEvents []*rstypes.HeaderedEvent, addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility, ) (types.StreamPosition, error) @@ -146,7 +147,7 @@ type Database interface { // AddInviteEvent stores a new invite event for a user. // If the invite was successfully stored this returns the stream ID it was stored at. // Returns an error if there was a problem communicating with the database. - AddInviteEvent(ctx context.Context, inviteEvent *gomatrixserverlib.HeaderedEvent) (types.StreamPosition, error) + AddInviteEvent(ctx context.Context, inviteEvent *rstypes.HeaderedEvent) (types.StreamPosition, error) // RetireInviteEvent removes an old invite event from the database. Returns the new position of the retired invite. // Returns an error if there was a problem communicating with the database. RetireInviteEvent(ctx context.Context, inviteEventID string) (types.StreamPosition, error) @@ -173,12 +174,12 @@ type Database interface { // goes wrong. PutFilter(ctx context.Context, localpart string, filter *synctypes.Filter) (string, error) // RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event - RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error + RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *rstypes.HeaderedEvent) error // StoreReceipt stores new receipt events StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp spec.Timestamp) (pos types.StreamPosition, err error) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error - ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) - UpdateRelations(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error + ReIndex(ctx context.Context, limit, afterID int64) (map[int64]rstypes.HeaderedEvent, error) + UpdateRelations(ctx context.Context, event *rstypes.HeaderedEvent) error RedactRelations(ctx context.Context, roomID, redactedEventID string) error SelectMemberships( ctx context.Context, diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index c2a870c0..0cc96373 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -24,6 +24,7 @@ import ( "github.com/lib/pq" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" + rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/synctypes" @@ -274,7 +275,7 @@ func (s *currentRoomStateStatements) SelectCurrentState( ctx context.Context, txn *sql.Tx, roomID string, stateFilter *synctypes.StateFilter, excludeEventIDs []string, -) ([]*gomatrixserverlib.HeaderedEvent, error) { +) ([]*rstypes.HeaderedEvent, error) { stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt) senders, notSenders := getSendersStateFilterFilter(stateFilter) // We're going to query members later, so remove them from this request @@ -320,7 +321,7 @@ func (s *currentRoomStateStatements) DeleteRoomStateForRoom( func (s *currentRoomStateStatements) UpsertRoomState( ctx context.Context, txn *sql.Tx, - event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition, + event *rstypes.HeaderedEvent, membership *string, addedAt types.StreamPosition, ) error { // Parse content as JSON and search for an "url" key containsURL := false @@ -378,8 +379,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er return nil, err } // TODO: Handle redacted events - var ev gomatrixserverlib.HeaderedEvent - if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + var ev rstypes.HeaderedEvent + if err := json.Unmarshal(eventBytes, &ev); err != nil { return nil, err } @@ -394,8 +395,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er return events, nil } -func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) { - result := []*gomatrixserverlib.HeaderedEvent{} +func rowsToEvents(rows *sql.Rows) ([]*rstypes.HeaderedEvent, error) { + result := []*rstypes.HeaderedEvent{} for rows.Next() { var eventID string var eventBytes []byte @@ -403,8 +404,8 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) { return nil, err } // TODO: Handle redacted events - var ev gomatrixserverlib.HeaderedEvent - if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + var ev rstypes.HeaderedEvent + if err := json.Unmarshal(eventBytes, &ev); err != nil { return nil, err } result = append(result, &ev) @@ -414,7 +415,7 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) { func (s *currentRoomStateStatements) SelectStateEvent( ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string, -) (*gomatrixserverlib.HeaderedEvent, error) { +) (*rstypes.HeaderedEvent, error) { stmt := sqlutil.TxStmt(txn, s.selectStateEventStmt) var res []byte err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res) @@ -424,7 +425,7 @@ func (s *currentRoomStateStatements) SelectStateEvent( if err != nil { return nil, err } - var ev gomatrixserverlib.HeaderedEvent + var ev rstypes.HeaderedEvent if err = json.Unmarshal(res, &ev); err != nil { return nil, err } diff --git a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go index d68ed8d5..37660ee9 100644 --- a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go +++ b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" + "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -79,7 +80,7 @@ func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gom defer rows.Close() // nolint: errcheck var eventBytes []byte var roomID string - var event gomatrixserverlib.HeaderedEvent + var event types.HeaderedEvent var hisVis gomatrixserverlib.HistoryVisibility historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility) for rows.Next() { diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go index 151bffa5..267209bb 100644 --- a/syncapi/storage/postgres/invites_table.go +++ b/syncapi/storage/postgres/invites_table.go @@ -22,9 +22,9 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" + rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" ) const inviteEventsSchema = ` @@ -89,7 +89,7 @@ func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) { } func (s *inviteEventsStatements) InsertInviteEvent( - ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent, + ctx context.Context, txn *sql.Tx, inviteEvent *rstypes.HeaderedEvent, ) (streamPos types.StreamPosition, err error) { var headeredJSON []byte headeredJSON, err = json.Marshal(inviteEvent) @@ -119,7 +119,7 @@ func (s *inviteEventsStatements) DeleteInviteEvent( // active invites for the target user ID in the supplied range. func (s *inviteEventsStatements) SelectInviteEventsInRange( ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range, -) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) { +) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error) { var lastPos types.StreamPosition stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt) rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High()) @@ -127,8 +127,8 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange( return nil, nil, lastPos, err } defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed") - result := map[string]*gomatrixserverlib.HeaderedEvent{} - retired := map[string]*gomatrixserverlib.HeaderedEvent{} + result := map[string]*rstypes.HeaderedEvent{} + retired := map[string]*rstypes.HeaderedEvent{} for rows.Next() { var ( id types.StreamPosition @@ -151,7 +151,7 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange( continue } - var event *gomatrixserverlib.HeaderedEvent + var event *rstypes.HeaderedEvent if err := json.Unmarshal(eventJSON, &event); err != nil { return nil, nil, lastPos, err } diff --git a/syncapi/storage/postgres/memberships_table.go b/syncapi/storage/postgres/memberships_table.go index 47833893..3905f9ab 100644 --- a/syncapi/storage/postgres/memberships_table.go +++ b/syncapi/storage/postgres/memberships_table.go @@ -19,9 +19,8 @@ import ( "database/sql" "fmt" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/dendrite/internal/sqlutil" + rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" ) @@ -100,7 +99,7 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) { } func (s *membershipsStatements) UpsertMembership( - ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, + ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, streamPos, topologicalPos types.StreamPosition, ) error { membership, err := event.Membership() diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 8ee5098c..3aadbccf 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" + rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/synctypes" @@ -264,7 +265,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { }.Prepare(db) } -func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error { +func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent) error { headeredJSON, err := json.Marshal(event) if err != nil { return err @@ -329,8 +330,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange( } // TODO: Handle redacted events - var ev gomatrixserverlib.HeaderedEvent - if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + var ev rstypes.HeaderedEvent + if err := json.Unmarshal(eventBytes, &ev); err != nil { return nil, nil, err } needSet := stateNeeded[ev.RoomID()] @@ -375,7 +376,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID( // of the inserted event. func (s *outputRoomEventsStatements) InsertEvent( ctx context.Context, txn *sql.Tx, - event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, + event *rstypes.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility, ) (streamPos types.StreamPosition, err error) { var txnID *string @@ -465,8 +466,8 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( return nil, err } // TODO: Handle redacted events - var ev gomatrixserverlib.HeaderedEvent - if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + var ev rstypes.HeaderedEvent + if err := json.Unmarshal(eventBytes, &ev); err != nil { return nil, err } @@ -577,7 +578,7 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom( return err } -func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (id int, evt gomatrixserverlib.HeaderedEvent, err error) { +func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (id int, evt rstypes.HeaderedEvent, err error) { row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID) var eventAsString string @@ -595,7 +596,7 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn func (s *outputRoomEventsStatements) SelectContextBeforeEvent( ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter, -) (evts []*gomatrixserverlib.HeaderedEvent, err error) { +) (evts []*rstypes.HeaderedEvent, err error) { senders, notSenders := getSendersRoomEventFilter(filter) rows, err := sqlutil.TxStmt(txn, s.selectContextBeforeEventStmt).QueryContext( ctx, roomID, id, filter.Limit, @@ -612,7 +613,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent( for rows.Next() { var ( eventBytes []byte - evt *gomatrixserverlib.HeaderedEvent + evt *rstypes.HeaderedEvent historyVisibility gomatrixserverlib.HistoryVisibility ) if err = rows.Scan(&eventBytes, &historyVisibility); err != nil { @@ -630,7 +631,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent( func (s *outputRoomEventsStatements) SelectContextAfterEvent( ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter, -) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) { +) (lastID int, evts []*rstypes.HeaderedEvent, err error) { senders, notSenders := getSendersRoomEventFilter(filter) rows, err := sqlutil.TxStmt(txn, s.selectContextAfterEventStmt).QueryContext( ctx, roomID, id, filter.Limit, @@ -647,7 +648,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent( for rows.Next() { var ( eventBytes []byte - evt *gomatrixserverlib.HeaderedEvent + evt *rstypes.HeaderedEvent historyVisibility gomatrixserverlib.HistoryVisibility ) if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil { @@ -680,8 +681,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { return nil, err } // TODO: Handle redacted events - var ev gomatrixserverlib.HeaderedEvent - if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + var ev rstypes.HeaderedEvent + if err := json.Unmarshal(eventBytes, &ev); err != nil { return nil, err } @@ -709,7 +710,7 @@ func (s *outputRoomEventsStatements) PurgeEvents( return err } -func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) { +func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]rstypes.HeaderedEvent, error) { rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, afterID, pq.StringArray(types), limit) if err != nil { return nil, err @@ -718,14 +719,14 @@ func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, l var eventID string var id int64 - result := make(map[int64]gomatrixserverlib.HeaderedEvent) + result := make(map[int64]rstypes.HeaderedEvent) for rows.Next() { - var ev gomatrixserverlib.HeaderedEvent + var ev rstypes.HeaderedEvent var eventBytes []byte if err = rows.Scan(&id, &eventID, &eventBytes); err != nil { return nil, err } - if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + if err = json.Unmarshal(eventBytes, &ev); err != nil { return nil, err } result[id] = ev diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go index 2382fca5..7140a92f 100644 --- a/syncapi/storage/postgres/output_room_events_topology_table.go +++ b/syncapi/storage/postgres/output_room_events_topology_table.go @@ -18,10 +18,9 @@ import ( "context" "database/sql" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" + rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" ) @@ -105,7 +104,7 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) { // InsertEventInTopology inserts the given event in the room's topology, based // on the event's depth. func (s *outputRoomEventsTopologyStatements) InsertEventInTopology( - ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition, + ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, pos types.StreamPosition, ) (topoPos types.StreamPosition, err error) { err = sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).QueryRowContext( ctx, event.EventID(), event.Depth(), event.RoomID(), pos, diff --git a/syncapi/storage/shared/storage_consumer.go b/syncapi/storage/shared/storage_consumer.go index 47490fb0..05c7eec6 100644 --- a/syncapi/storage/shared/storage_consumer.go +++ b/syncapi/storage/shared/storage_consumer.go @@ -22,6 +22,7 @@ import ( "github.com/tidwall/gjson" + rstypes "github.com/matrix-org/dendrite/roomserver/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib/spec" @@ -90,7 +91,7 @@ func (d *Database) NewDatabaseTransaction(ctx context.Context) (*DatabaseTransac }, nil } -func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) { +func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error) { streamEvents, err := d.OutputEvents.SelectEvents(ctx, nil, eventIDs, nil, false) if err != nil { return nil, err @@ -105,7 +106,7 @@ func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixse // If the invite was successfully stored this returns the stream ID it was stored at. // Returns an error if there was a problem communicating with the database. func (d *Database) AddInviteEvent( - ctx context.Context, inviteEvent *gomatrixserverlib.HeaderedEvent, + ctx context.Context, inviteEvent *rstypes.HeaderedEvent, ) (sp types.StreamPosition, err error) { _ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { sp, err = d.Invites.InsertInviteEvent(ctx, txn, inviteEvent) @@ -189,8 +190,8 @@ func (d *Database) UpsertAccountData( return } -func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*gomatrixserverlib.HeaderedEvent { - out := make([]*gomatrixserverlib.HeaderedEvent, len(in)) +func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*rstypes.HeaderedEvent { + out := make([]*rstypes.HeaderedEvent, len(in)) for i := 0; i < len(in); i++ { out[i] = in[i].HeaderedEvent if device != nil && in[i].TransactionID != nil { @@ -213,7 +214,7 @@ func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.Strea // the events listed in the event's 'prev_events'. This function also updates the backwards extremities table // to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. // This function should always be called within a sqlutil.Writer for safety in SQLite. -func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error { +func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *rstypes.HeaderedEvent) error { if err := d.BackwardExtremities.DeleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { return err } @@ -246,8 +247,8 @@ func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, e func (d *Database) WriteEvent( ctx context.Context, - ev *gomatrixserverlib.HeaderedEvent, - addStateEvents []*gomatrixserverlib.HeaderedEvent, + ev *rstypes.HeaderedEvent, + addStateEvents []*rstypes.HeaderedEvent, addStateEventIDs, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility, @@ -288,7 +289,7 @@ func (d *Database) WriteEvent( func (d *Database) updateRoomState( ctx context.Context, txn *sql.Tx, removedEventIDs []string, - addedEvents []*gomatrixserverlib.HeaderedEvent, + addedEvents []*rstypes.HeaderedEvent, pduPosition types.StreamPosition, topoPosition types.StreamPosition, ) error { @@ -342,7 +343,7 @@ func (d *Database) PutFilter( return filterID, err } -func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error { +func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *rstypes.HeaderedEvent) error { redactedEvents, err := d.Events(ctx, []string{redactedEventID}) if err != nil { return err @@ -351,13 +352,13 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda logrus.WithField("event_id", redactedEventID).WithField("redaction_event", redactedBecause.EventID()).Warnf("missing redacted event for redaction") return nil } - eventToRedact := redactedEvents[0].Unwrap() - redactionEvent := redactedBecause.Unwrap() + eventToRedact := redactedEvents[0].Event + redactionEvent := redactedBecause.Event if err = eventutil.RedactEvent(redactionEvent, eventToRedact); err != nil { return err } - newEvent := eventToRedact.Headered(redactedBecause.RoomVersion) + newEvent := &rstypes.HeaderedEvent{Event: eventToRedact} err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.OutputEvents.UpdateEventJSON(ctx, txn, newEvent) }) @@ -521,14 +522,14 @@ func (d *Database) UpsertRoomUnreadNotificationCounts(ctx context.Context, userI return } -func (d *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) { +func (d *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, rstypes.HeaderedEvent, error) { return d.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID) } -func (d *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) { +func (d *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*rstypes.HeaderedEvent, error) { return d.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter) } -func (d *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) { +func (d *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*rstypes.HeaderedEvent, error) { return d.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter) } @@ -560,7 +561,7 @@ func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID s return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos) } -func (d *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) { +func (d *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]rstypes.HeaderedEvent, error) { return d.OutputEvents.ReIndex(ctx, nil, limit, afterID, []string{ spec.MRoomName, spec.MRoomTopic, @@ -568,7 +569,7 @@ func (d *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64 }) } -func (d *Database) UpdateRelations(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error { +func (d *Database) UpdateRelations(ctx context.Context, event *rstypes.HeaderedEvent) error { // No need to unmarshal if the event is a redaction if event.Type() == spec.MRoomRedaction { return nil diff --git a/syncapi/storage/shared/storage_sync.go b/syncapi/storage/shared/storage_sync.go index f11cbb57..e8ba3e25 100644 --- a/syncapi/storage/shared/storage_sync.go +++ b/syncapi/storage/shared/storage_sync.go @@ -6,11 +6,11 @@ import ( "fmt" "math" - "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/tidwall/gjson" "github.com/matrix-org/dendrite/internal/eventutil" + rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/syncapi/synctypes" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -84,7 +84,7 @@ func (d *DatabaseTransaction) MaxStreamPositionForNotificationData(ctx context.C return types.StreamPosition(id), nil } -func (d *DatabaseTransaction) CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) { +func (d *DatabaseTransaction) CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*rstypes.HeaderedEvent, error) { return d.CurrentRoomState.SelectCurrentState(ctx, d.txn, roomID, stateFilterPart, excludeEventIDs) } @@ -161,7 +161,7 @@ func (d *DatabaseTransaction) PositionInTopology(ctx context.Context, eventID st return d.Topology.SelectPositionInTopology(ctx, d.txn, eventID) } -func (d *DatabaseTransaction) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) { +func (d *DatabaseTransaction) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error) { return d.Invites.SelectInviteEventsInRange(ctx, d.txn, targetUserID, r) } @@ -178,7 +178,7 @@ func (d *DatabaseTransaction) RoomReceiptsAfter(ctx context.Context, roomIDs []s // If an event is not found in the database then it will be omitted from the list. // Returns an error if there was a problem talking with the database. // Does not include any transaction IDs in the returned events. -func (d *DatabaseTransaction) Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) { +func (d *DatabaseTransaction) Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error) { streamEvents, err := d.OutputEvents.SelectEvents(ctx, d.txn, eventIDs, nil, false) if err != nil { return nil, err @@ -207,13 +207,13 @@ func (d *DatabaseTransaction) SharedUsers(ctx context.Context, userID string, ot func (d *DatabaseTransaction) GetStateEvent( ctx context.Context, roomID, evType, stateKey string, -) (*gomatrixserverlib.HeaderedEvent, error) { +) (*rstypes.HeaderedEvent, error) { return d.CurrentRoomState.SelectStateEvent(ctx, d.txn, roomID, evType, stateKey) } func (d *DatabaseTransaction) GetStateEventsForRoom( ctx context.Context, roomID string, stateFilter *synctypes.StateFilter, -) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error) { +) (stateEvents []*rstypes.HeaderedEvent, err error) { stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, d.txn, roomID, stateFilter, nil) return } @@ -302,7 +302,7 @@ func (d *DatabaseTransaction) StreamToTopologicalPosition( // oldest event in the room's topology. func (d *DatabaseTransaction) GetBackwardTopologyPos( ctx context.Context, - events []*gomatrixserverlib.HeaderedEvent, + events []*rstypes.HeaderedEvent, ) (types.TopologyToken, error) { zeroToken := types.TopologyToken{} if len(events) == 0 { diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index 01dcff69..1b8632eb 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" + rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/synctypes" @@ -268,7 +269,7 @@ func (s *currentRoomStateStatements) SelectCurrentState( ctx context.Context, txn *sql.Tx, roomID string, stateFilter *synctypes.StateFilter, excludeEventIDs []string, -) ([]*gomatrixserverlib.HeaderedEvent, error) { +) ([]*rstypes.HeaderedEvent, error) { // We're going to query members later, so remove them from this request if stateFilter.LazyLoadMembers && !stateFilter.IncludeRedundantMembers { notTypes := &[]string{spec.MRoomMember} @@ -319,7 +320,7 @@ func (s *currentRoomStateStatements) DeleteRoomStateForRoom( func (s *currentRoomStateStatements) UpsertRoomState( ctx context.Context, txn *sql.Tx, - event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition, + event *rstypes.HeaderedEvent, membership *string, addedAt types.StreamPosition, ) error { // Parse content as JSON and search for an "url" key containsURL := false @@ -405,8 +406,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er return nil, err } // TODO: Handle redacted events - var ev gomatrixserverlib.HeaderedEvent - if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + var ev rstypes.HeaderedEvent + if err := json.Unmarshal(eventBytes, &ev); err != nil { return nil, err } @@ -421,8 +422,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er return events, nil } -func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) { - result := []*gomatrixserverlib.HeaderedEvent{} +func rowsToEvents(rows *sql.Rows) ([]*rstypes.HeaderedEvent, error) { + result := []*rstypes.HeaderedEvent{} for rows.Next() { var eventID string var eventBytes []byte @@ -430,8 +431,8 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) { return nil, err } // TODO: Handle redacted events - var ev gomatrixserverlib.HeaderedEvent - if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + var ev rstypes.HeaderedEvent + if err := json.Unmarshal(eventBytes, &ev); err != nil { return nil, err } result = append(result, &ev) @@ -441,7 +442,7 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) { func (s *currentRoomStateStatements) SelectStateEvent( ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string, -) (*gomatrixserverlib.HeaderedEvent, error) { +) (*rstypes.HeaderedEvent, error) { stmt := sqlutil.TxStmt(txn, s.selectStateEventStmt) var res []byte err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res) @@ -451,7 +452,7 @@ func (s *currentRoomStateStatements) SelectStateEvent( if err != nil { return nil, err } - var ev gomatrixserverlib.HeaderedEvent + var ev rstypes.HeaderedEvent if err = json.Unmarshal(res, &ev); err != nil { return nil, err } diff --git a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go index d23f0756..f7ce6531 100644 --- a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go +++ b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" + "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -91,7 +92,7 @@ func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gom defer rows.Close() // nolint: errcheck var eventBytes []byte var roomID string - var event gomatrixserverlib.HeaderedEvent + var event types.HeaderedEvent var hisVis gomatrixserverlib.HistoryVisibility historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility) for rows.Next() { diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go index 19450099..347523cf 100644 --- a/syncapi/storage/sqlite3/invites_table.go +++ b/syncapi/storage/sqlite3/invites_table.go @@ -22,9 +22,9 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" + rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" ) const inviteEventsSchema = ` @@ -89,7 +89,7 @@ func NewSqliteInvitesTable(db *sql.DB, streamID *StreamIDStatements) (tables.Inv } func (s *inviteEventsStatements) InsertInviteEvent( - ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent, + ctx context.Context, txn *sql.Tx, inviteEvent *rstypes.HeaderedEvent, ) (streamPos types.StreamPosition, err error) { streamPos, err = s.streamIDStatements.nextInviteID(ctx, txn) if err != nil { @@ -130,7 +130,7 @@ func (s *inviteEventsStatements) DeleteInviteEvent( // active invites for the target user ID in the supplied range. func (s *inviteEventsStatements) SelectInviteEventsInRange( ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range, -) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) { +) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error) { var lastPos types.StreamPosition stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt) rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High()) @@ -138,8 +138,8 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange( return nil, nil, lastPos, err } defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed") - result := map[string]*gomatrixserverlib.HeaderedEvent{} - retired := map[string]*gomatrixserverlib.HeaderedEvent{} + result := map[string]*rstypes.HeaderedEvent{} + retired := map[string]*rstypes.HeaderedEvent{} for rows.Next() { var ( id types.StreamPosition @@ -162,7 +162,7 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange( continue } - var event *gomatrixserverlib.HeaderedEvent + var event *rstypes.HeaderedEvent if err := json.Unmarshal(eventJSON, &event); err != nil { return nil, nil, lastPos, err } diff --git a/syncapi/storage/sqlite3/memberships_table.go b/syncapi/storage/sqlite3/memberships_table.go index 2cc46a10..c09fa151 100644 --- a/syncapi/storage/sqlite3/memberships_table.go +++ b/syncapi/storage/sqlite3/memberships_table.go @@ -19,9 +19,8 @@ import ( "database/sql" "fmt" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/dendrite/internal/sqlutil" + rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" ) @@ -103,7 +102,7 @@ func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) { } func (s *membershipsStatements) UpsertMembership( - ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, + ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, streamPos, topologicalPos types.StreamPosition, ) error { membership, err := event.Membership() diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index b5b6ea88..d63e7606 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" + rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/synctypes" @@ -166,7 +167,7 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even }.Prepare(db) } -func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error { +func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent) error { headeredJSON, err := json.Marshal(event) if err != nil { return err @@ -249,8 +250,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange( } // TODO: Handle redacted events - var ev gomatrixserverlib.HeaderedEvent - if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + var ev rstypes.HeaderedEvent + if err := json.Unmarshal(eventBytes, &ev); err != nil { return nil, nil, err } needSet := stateNeeded[ev.RoomID()] @@ -296,7 +297,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID( // of the inserted event. func (s *outputRoomEventsStatements) InsertEvent( ctx context.Context, txn *sql.Tx, - event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, + event *rstypes.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility, ) (types.StreamPosition, error) { var txnID *string @@ -498,8 +499,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { return nil, err } // TODO: Handle redacted events - var ev gomatrixserverlib.HeaderedEvent - if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + var ev rstypes.HeaderedEvent + if err := json.Unmarshal(eventBytes, &ev); err != nil { return nil, err } @@ -523,7 +524,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { } func (s *outputRoomEventsStatements) SelectContextEvent( ctx context.Context, txn *sql.Tx, roomID, eventID string, -) (id int, evt gomatrixserverlib.HeaderedEvent, err error) { +) (id int, evt rstypes.HeaderedEvent, err error) { row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID) var eventAsString string var historyVisibility gomatrixserverlib.HistoryVisibility @@ -540,7 +541,7 @@ func (s *outputRoomEventsStatements) SelectContextEvent( func (s *outputRoomEventsStatements) SelectContextBeforeEvent( ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter, -) (evts []*gomatrixserverlib.HeaderedEvent, err error) { +) (evts []*rstypes.HeaderedEvent, err error) { stmt, params, err := prepareWithFilters( s.db, txn, selectContextBeforeEventSQL, []interface{}{ @@ -564,7 +565,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent( for rows.Next() { var ( eventBytes []byte - evt *gomatrixserverlib.HeaderedEvent + evt *rstypes.HeaderedEvent historyVisibility gomatrixserverlib.HistoryVisibility ) if err = rows.Scan(&eventBytes, &historyVisibility); err != nil { @@ -582,7 +583,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent( func (s *outputRoomEventsStatements) SelectContextAfterEvent( ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter, -) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) { +) (lastID int, evts []*rstypes.HeaderedEvent, err error) { stmt, params, err := prepareWithFilters( s.db, txn, selectContextAfterEventSQL, []interface{}{ @@ -606,7 +607,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent( for rows.Next() { var ( eventBytes []byte - evt *gomatrixserverlib.HeaderedEvent + evt *rstypes.HeaderedEvent historyVisibility gomatrixserverlib.HistoryVisibility ) if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil { @@ -642,7 +643,7 @@ func (s *outputRoomEventsStatements) PurgeEvents( return err } -func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) { +func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]rstypes.HeaderedEvent, error) { params := make([]interface{}, len(types)+1) params[0] = afterID for i := range types { @@ -664,14 +665,14 @@ func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, l var eventID string var id int64 - result := make(map[int64]gomatrixserverlib.HeaderedEvent) + result := make(map[int64]rstypes.HeaderedEvent) for rows.Next() { - var ev gomatrixserverlib.HeaderedEvent + var ev rstypes.HeaderedEvent var eventBytes []byte if err = rows.Scan(&id, &eventID, &eventBytes); err != nil { return nil, err } - if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + if err = json.Unmarshal(eventBytes, &ev); err != nil { return nil, err } result[id] = ev diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go index dc698de2..68b75f5b 100644 --- a/syncapi/storage/sqlite3/output_room_events_topology_table.go +++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go @@ -18,9 +18,8 @@ import ( "context" "database/sql" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/dendrite/internal/sqlutil" + rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" ) @@ -104,7 +103,7 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { // insertEventInTopology inserts the given event in the room's topology, based // on the event's depth. func (s *outputRoomEventsTopologyStatements) InsertEventInTopology( - ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition, + ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, pos types.StreamPosition, ) (types.StreamPosition, error) { _, err := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).ExecContext( ctx, event.EventID(), event.Depth(), event.RoomID(), pos, diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 2cc1378b..08ca99a7 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/matrix-org/dendrite/internal/sqlutil" + rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/synctypes" @@ -34,9 +35,9 @@ func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, fun return db, close } -func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (positions []types.StreamPosition) { +func MustWriteEvents(t *testing.T, db storage.Database, events []*rstypes.HeaderedEvent) (positions []types.StreamPosition) { for _, ev := range events { - var addStateEvents []*gomatrixserverlib.HeaderedEvent + var addStateEvents []*rstypes.HeaderedEvent var addStateEventIDs []string var removeStateEventIDs []string if ev.StateKey() != nil { @@ -106,7 +107,7 @@ func TestRecentEventsPDU(t *testing.T) { To types.StreamPosition Limit int ReverseOrder bool - WantEvents []*gomatrixserverlib.HeaderedEvent + WantEvents []*rstypes.HeaderedEvent WantLimited bool }{ // The purpose of this test is to make sure that incremental syncs are including up to the latest events. @@ -316,7 +317,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) { t.Parallel() db := MustCreateDatabase(t) - var events []*gomatrixserverlib.HeaderedEvent + var events []*types.HeaderedEvent events = append(events, MustCreateEvent(t, testRoomID, nil, &gomatrixserverlib.EventBuilder{ Content: []byte(fmt.Sprintf(`{"room_version":"4","creator":"%s"}`, testUserIDA)), Type: "m.room.create", @@ -324,7 +325,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) { Sender: testUserIDA, Depth: int64(len(events) + 1), })) - events = append(events, MustCreateEvent(t, testRoomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{ + events = append(events, MustCreateEvent(t, testRoomID, []*types.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{ Content: []byte(`{"membership":"join"}`), Type: "m.room.member", StateKey: &testUserIDA, @@ -332,7 +333,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) { Depth: int64(len(events) + 1), })) // fork the dag into three, same prev_events and depth - parent := []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]} + parent := []*types.HeaderedEvent{events[len(events)-1]} depth := int64(len(events) + 1) for i := 0; i < 3; i++ { events = append(events, MustCreateEvent(t, testRoomID, parent, &gomatrixserverlib.EventBuilder{ @@ -365,7 +366,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) { Name string From types.TopologyToken Limit int - Wants []*gomatrixserverlib.HeaderedEvent + Wants []*types.HeaderedEvent }{ { Name: "Pagination over the whole fork", @@ -406,7 +407,7 @@ func TestGetEventsInTopologicalRangeMultiRoom(t *testing.T) { t.Parallel() db := MustCreateDatabase(t) - makeEvents := func(roomID string) (events []*gomatrixserverlib.HeaderedEvent) { + makeEvents := func(roomID string) (events []*types.HeaderedEvent) { events = append(events, MustCreateEvent(t, roomID, nil, &gomatrixserverlib.EventBuilder{ Content: []byte(fmt.Sprintf(`{"room_version":"4","creator":"%s"}`, testUserIDA)), Type: "m.room.create", @@ -414,7 +415,7 @@ func TestGetEventsInTopologicalRangeMultiRoom(t *testing.T) { Sender: testUserIDA, Depth: int64(len(events) + 1), })) - events = append(events, MustCreateEvent(t, roomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{ + events = append(events, MustCreateEvent(t, roomID, []*types.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{ Content: []byte(`{"membership":"join"}`), Type: "m.room.member", StateKey: &testUserIDA, @@ -460,14 +461,14 @@ func TestGetEventsInRangeWithEventsInsertedLikeBackfill(t *testing.T) { // "federation" join userC := fmt.Sprintf("@radiance:%s", testOrigin) - joinEvent := MustCreateEvent(t, testRoomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{ + joinEvent := MustCreateEvent(t, testRoomID, []*types.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{ Content: []byte(`{"membership":"join"}`), Type: "m.room.member", StateKey: &userC, Sender: userC, Depth: int64(len(events) + 1), }) - MustWriteEvents(t, db, []*gomatrixserverlib.HeaderedEvent{joinEvent}) + MustWriteEvents(t, db, []*types.HeaderedEvent{joinEvent}) // Sync will return this for the prev_batch from := topologyTokenBefore(t, db, joinEvent.EventID()) @@ -638,7 +639,7 @@ func TestInviteBehaviour(t *testing.T) { StateKey: &testUserIDA, Sender: "@inviteUser2:somewhere", }) - for _, ev := range []*gomatrixserverlib.HeaderedEvent{inviteEvent1, inviteEvent2} { + for _, ev := range []*types.HeaderedEvent{inviteEvent1, inviteEvent2} { _, err := db.AddInviteEvent(ctx, ev) if err != nil { t.Fatalf("Failed to AddInviteEvent: %s", err) @@ -695,7 +696,7 @@ func assertInvitedToRooms(t *testing.T, res *types.Response, roomIDs []string) { } } -func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []*gomatrixserverlib.HeaderedEvent) { +func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []*types.HeaderedEvent) { t.Helper() if len(gots) != len(wants) { t.Fatalf("%s response returned %d events, want %d", msg, len(gots), len(wants)) diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 6384fb91..854292bd 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" + rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/syncapi/synctypes" "github.com/matrix-org/dendrite/syncapi/types" ) @@ -35,11 +36,11 @@ type AccountData interface { } type Invites interface { - InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error) + InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent *rstypes.HeaderedEvent) (streamPos types.StreamPosition, err error) DeleteInviteEvent(ctx context.Context, txn *sql.Tx, inviteEventID string) (types.StreamPosition, error) // SelectInviteEventsInRange returns a map of room ID to invite events. If multiple invite/retired invites exist in the given range, return the latest value // for the room. - SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*gomatrixserverlib.HeaderedEvent, retired map[string]*gomatrixserverlib.HeaderedEvent, maxID types.StreamPosition, err error) + SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*rstypes.HeaderedEvent, retired map[string]*rstypes.HeaderedEvent, maxID types.StreamPosition, err error) SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error) PurgeInvites(ctx context.Context, txn *sql.Tx, roomID string) error } @@ -59,7 +60,7 @@ type Events interface { SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error) InsertEvent( ctx context.Context, txn *sql.Tx, - event *gomatrixserverlib.HeaderedEvent, + event *rstypes.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool, @@ -70,16 +71,16 @@ type Events interface { // Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`. SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, r types.Range, eventFilter *synctypes.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error) SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string, filter *synctypes.RoomEventFilter, preserveOrder bool) ([]types.StreamEvent, error) - UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error + UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent) error // DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely. DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error) - SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) - SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) - SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) + SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, rstypes.HeaderedEvent, error) + SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*rstypes.HeaderedEvent, error) + SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*rstypes.HeaderedEvent, error) PurgeEvents(ctx context.Context, txn *sql.Tx, roomID string) error - ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) + ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) (map[int64]rstypes.HeaderedEvent, error) } // Topology keeps track of the depths and stream positions for all events. @@ -87,7 +88,7 @@ type Events interface { type Topology interface { // InsertEventInTopology inserts the given event in the room's topology, based on the event's depth. // `pos` is the stream position of this event in the events table, and is used to order events which have the same depth. - InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error) + InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error) // SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order. // Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`. // `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned. @@ -101,13 +102,13 @@ type Topology interface { } type CurrentRoomState interface { - SelectStateEvent(ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error) + SelectStateEvent(ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string) (*rstypes.HeaderedEvent, error) SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error) - UpsertRoomState(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition) error + UpsertRoomState(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, membership *string, addedAt types.StreamPosition) error DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error DeleteRoomStateForRoom(ctx context.Context, txn *sql.Tx, roomID string) error // SelectCurrentState returns all the current state events for the given room. - SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *synctypes.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) + SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *synctypes.StateFilter, excludeEventIDs []string) ([]*rstypes.HeaderedEvent, error) // SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error) // SelectRoomIDsWithAnyMembership returns a map of all memberships for the given user. @@ -191,7 +192,7 @@ type Receipts interface { } type Memberships interface { - UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error + UpsertMembership(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error) SelectMembershipForUser(ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) PurgeMemberships(ctx context.Context, txn *sql.Tx, roomID string) error diff --git a/syncapi/storage/tables/memberships_test.go b/syncapi/storage/tables/memberships_test.go index 2e8375c3..4afa2ac5 100644 --- a/syncapi/storage/tables/memberships_test.go +++ b/syncapi/storage/tables/memberships_test.go @@ -6,10 +6,10 @@ import ( "testing" "time" - "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/dendrite/internal/sqlutil" + rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/postgres" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" @@ -47,7 +47,7 @@ func TestMembershipsTable(t *testing.T) { room := test.NewRoom(t, alice) // Create users - var userEvents []*gomatrixserverlib.HeaderedEvent + var userEvents []*rstypes.HeaderedEvent users := []string{alice.ID} for _, x := range room.CurrentState() { if x.StateKeyEquals(alice.ID) { @@ -114,7 +114,7 @@ func testMembershipCount(t *testing.T, ctx context.Context, table tables.Members }) } -func testUpsert(t *testing.T, ctx context.Context, table tables.Memberships, membershipEvent *gomatrixserverlib.HeaderedEvent, user *test.User, room *test.Room) { +func testUpsert(t *testing.T, ctx context.Context, table tables.Memberships, membershipEvent *rstypes.HeaderedEvent, user *test.User, room *test.Room) { t.Run("upserting works as expected", func(t *testing.T) { if err := table.UpsertMembership(ctx, nil, membershipEvent, 1, 1); err != nil { t.Fatalf("failed to upsert membership: %s", err) |