aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage
diff options
context:
space:
mode:
authorkegsay <kegan@matrix.org>2023-04-27 12:54:20 +0100
committerGitHub <noreply@github.com>2023-04-27 12:54:20 +0100
commitb189edf4f43ff34b69d6c60aeb0efb60dd549c86 (patch)
tree4c08aeda694f3e1cf17c66cf0e4b2b306af6a8df /syncapi/storage
parent2475cf4b61747e76a524af6f71a4eb7e112812af (diff)
Remove gmsl.HeaderedEvent (#3068)
Replaced with types.HeaderedEvent _for now_. In reality we want to move them all to gmsl.Event and only use HeaderedEvent when we _need_ to bundle the version/event ID with the event (seriailsation boundaries, and even then only when we don't have the room version). Requires https://github.com/matrix-org/gomatrixserverlib/pull/373
Diffstat (limited to 'syncapi/storage')
-rw-r--r--syncapi/storage/interface.go33
-rw-r--r--syncapi/storage/postgres/current_room_state_table.go21
-rw-r--r--syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go3
-rw-r--r--syncapi/storage/postgres/invites_table.go12
-rw-r--r--syncapi/storage/postgres/memberships_table.go5
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go35
-rw-r--r--syncapi/storage/postgres/output_room_events_topology_table.go5
-rw-r--r--syncapi/storage/shared/storage_consumer.go35
-rw-r--r--syncapi/storage/shared/storage_sync.go14
-rw-r--r--syncapi/storage/sqlite3/current_room_state_table.go21
-rw-r--r--syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go3
-rw-r--r--syncapi/storage/sqlite3/invites_table.go12
-rw-r--r--syncapi/storage/sqlite3/memberships_table.go5
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go31
-rw-r--r--syncapi/storage/sqlite3/output_room_events_topology_table.go5
-rw-r--r--syncapi/storage/storage_test.go27
-rw-r--r--syncapi/storage/tables/interface.go27
-rw-r--r--syncapi/storage/tables/memberships_test.go6
18 files changed, 153 insertions, 147 deletions
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index f5c1223a..302b9bad 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -42,16 +43,16 @@ type DatabaseTransaction interface {
MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForRelations(ctx context.Context) (types.StreamPosition, error)
- CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
+ CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*rstypes.HeaderedEvent, error)
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *synctypes.StateFilter) ([]types.StateDelta, []string, error)
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *synctypes.StateFilter) ([]types.StateDelta, []string, error)
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error)
GetRoomSummary(ctx context.Context, roomID, userID string) (summary *types.Summary, err error)
RecentEvents(ctx context.Context, roomIDs []string, r types.Range, eventFilter *synctypes.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error)
- GetBackwardTopologyPos(ctx context.Context, events []*gomatrixserverlib.HeaderedEvent) (types.TopologyToken, error)
+ GetBackwardTopologyPos(ctx context.Context, events []*rstypes.HeaderedEvent) (types.TopologyToken, error)
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
- InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error)
+ InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error)
PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error)
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
@@ -65,15 +66,15 @@ type DatabaseTransaction interface {
// If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events.
- Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
+ Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error)
// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
// If no event could be found, returns nil
// If there was an issue during the retrieval, returns an error
- GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
+ GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*rstypes.HeaderedEvent, error)
// GetStateEventsForRoom fetches the state events for a given room.
// Returns an empty slice if no state events could be found for this room.
// Returns an error if there was an issue with the retrieval.
- GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error)
+ GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter) (stateEvents []*rstypes.HeaderedEvent, err error)
// GetAccountDataInRange returns all account data for a given user inserted or
// updated between two given positions
// Returns a map following the format data[roomID] = []dataTypes
@@ -89,15 +90,15 @@ type DatabaseTransaction interface {
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
// matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event.
- StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*gomatrixserverlib.HeaderedEvent
+ StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*rstypes.HeaderedEvent
// SendToDeviceUpdatesForSync returns a list of send-to-device updates. It returns the
// relevant events within the given ranges for the supplied user ID and device ID.
SendToDeviceUpdatesForSync(ctx context.Context, userID, deviceID string, from, to types.StreamPosition) (pos types.StreamPosition, events []types.SendToDeviceEvent, err error)
// GetRoomReceipts gets all receipts for a given roomID
GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]types.OutputReceiptEvent, error)
- SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
- SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
- SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
+ SelectContextEvent(ctx context.Context, roomID, eventID string) (int, rstypes.HeaderedEvent, error)
+ SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*rstypes.HeaderedEvent, error)
+ SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*rstypes.HeaderedEvent, error)
StreamToTopologicalPosition(ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool) (types.TopologyToken, error)
IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error)
// SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found
@@ -123,11 +124,11 @@ type Database interface {
// If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events.
- Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
+ Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error)
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
// when generating the sync stream position for this event. Returns the sync stream position for the inserted event.
// Returns an error if there was a problem inserting this event.
- WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []*gomatrixserverlib.HeaderedEvent,
+ WriteEvent(ctx context.Context, ev *rstypes.HeaderedEvent, addStateEvents []*rstypes.HeaderedEvent,
addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool,
historyVisibility gomatrixserverlib.HistoryVisibility,
) (types.StreamPosition, error)
@@ -146,7 +147,7 @@ type Database interface {
// AddInviteEvent stores a new invite event for a user.
// If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database.
- AddInviteEvent(ctx context.Context, inviteEvent *gomatrixserverlib.HeaderedEvent) (types.StreamPosition, error)
+ AddInviteEvent(ctx context.Context, inviteEvent *rstypes.HeaderedEvent) (types.StreamPosition, error)
// RetireInviteEvent removes an old invite event from the database. Returns the new position of the retired invite.
// Returns an error if there was a problem communicating with the database.
RetireInviteEvent(ctx context.Context, inviteEventID string) (types.StreamPosition, error)
@@ -173,12 +174,12 @@ type Database interface {
// goes wrong.
PutFilter(ctx context.Context, localpart string, filter *synctypes.Filter) (string, error)
// RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event
- RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error
+ RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *rstypes.HeaderedEvent) error
// StoreReceipt stores new receipt events
StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp spec.Timestamp) (pos types.StreamPosition, err error)
UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
- ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error)
- UpdateRelations(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
+ ReIndex(ctx context.Context, limit, afterID int64) (map[int64]rstypes.HeaderedEvent, error)
+ UpdateRelations(ctx context.Context, event *rstypes.HeaderedEvent) error
RedactRelations(ctx context.Context, roomID, redactedEventID string) error
SelectMemberships(
ctx context.Context,
diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go
index c2a870c0..0cc96373 100644
--- a/syncapi/storage/postgres/current_room_state_table.go
+++ b/syncapi/storage/postgres/current_room_state_table.go
@@ -24,6 +24,7 @@ import (
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -274,7 +275,7 @@ func (s *currentRoomStateStatements) SelectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string,
stateFilter *synctypes.StateFilter,
excludeEventIDs []string,
-) ([]*gomatrixserverlib.HeaderedEvent, error) {
+) ([]*rstypes.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt)
senders, notSenders := getSendersStateFilterFilter(stateFilter)
// We're going to query members later, so remove them from this request
@@ -320,7 +321,7 @@ func (s *currentRoomStateStatements) DeleteRoomStateForRoom(
func (s *currentRoomStateStatements) UpsertRoomState(
ctx context.Context, txn *sql.Tx,
- event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
+ event *rstypes.HeaderedEvent, membership *string, addedAt types.StreamPosition,
) error {
// Parse content as JSON and search for an "url" key
containsURL := false
@@ -378,8 +379,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@@ -394,8 +395,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
return events, nil
}
-func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
- result := []*gomatrixserverlib.HeaderedEvent{}
+func rowsToEvents(rows *sql.Rows) ([]*rstypes.HeaderedEvent, error) {
+ result := []*rstypes.HeaderedEvent{}
for rows.Next() {
var eventID string
var eventBytes []byte
@@ -403,8 +404,8 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result = append(result, &ev)
@@ -414,7 +415,7 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
func (s *currentRoomStateStatements) SelectStateEvent(
ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string,
-) (*gomatrixserverlib.HeaderedEvent, error) {
+) (*rstypes.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectStateEventStmt)
var res []byte
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
@@ -424,7 +425,7 @@ func (s *currentRoomStateStatements) SelectStateEvent(
if err != nil {
return nil, err
}
- var ev gomatrixserverlib.HeaderedEvent
+ var ev rstypes.HeaderedEvent
if err = json.Unmarshal(res, &ev); err != nil {
return nil, err
}
diff --git a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
index d68ed8d5..37660ee9 100644
--- a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
+++ b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
@@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
+ "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -79,7 +80,7 @@ func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gom
defer rows.Close() // nolint: errcheck
var eventBytes []byte
var roomID string
- var event gomatrixserverlib.HeaderedEvent
+ var event types.HeaderedEvent
var hisVis gomatrixserverlib.HistoryVisibility
historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility)
for rows.Next() {
diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go
index 151bffa5..267209bb 100644
--- a/syncapi/storage/postgres/invites_table.go
+++ b/syncapi/storage/postgres/invites_table.go
@@ -22,9 +22,9 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
)
const inviteEventsSchema = `
@@ -89,7 +89,7 @@ func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) {
}
func (s *inviteEventsStatements) InsertInviteEvent(
- ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent,
+ ctx context.Context, txn *sql.Tx, inviteEvent *rstypes.HeaderedEvent,
) (streamPos types.StreamPosition, err error) {
var headeredJSON []byte
headeredJSON, err = json.Marshal(inviteEvent)
@@ -119,7 +119,7 @@ func (s *inviteEventsStatements) DeleteInviteEvent(
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) SelectInviteEventsInRange(
ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
-) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
+) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error) {
var lastPos types.StreamPosition
stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt)
rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
@@ -127,8 +127,8 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
return nil, nil, lastPos, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
- result := map[string]*gomatrixserverlib.HeaderedEvent{}
- retired := map[string]*gomatrixserverlib.HeaderedEvent{}
+ result := map[string]*rstypes.HeaderedEvent{}
+ retired := map[string]*rstypes.HeaderedEvent{}
for rows.Next() {
var (
id types.StreamPosition
@@ -151,7 +151,7 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
continue
}
- var event *gomatrixserverlib.HeaderedEvent
+ var event *rstypes.HeaderedEvent
if err := json.Unmarshal(eventJSON, &event); err != nil {
return nil, nil, lastPos, err
}
diff --git a/syncapi/storage/postgres/memberships_table.go b/syncapi/storage/postgres/memberships_table.go
index 47833893..3905f9ab 100644
--- a/syncapi/storage/postgres/memberships_table.go
+++ b/syncapi/storage/postgres/memberships_table.go
@@ -19,9 +19,8 @@ import (
"database/sql"
"fmt"
- "github.com/matrix-org/gomatrixserverlib"
-
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -100,7 +99,7 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
}
func (s *membershipsStatements) UpsertMembership(
- ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
+ ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent,
streamPos, topologicalPos types.StreamPosition,
) error {
membership, err := event.Membership()
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 8ee5098c..3aadbccf 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -264,7 +265,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
}.Prepare(db)
}
-func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error {
+func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent) error {
headeredJSON, err := json.Marshal(event)
if err != nil {
return err
@@ -329,8 +330,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, nil, err
}
needSet := stateNeeded[ev.RoomID()]
@@ -375,7 +376,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
// of the inserted event.
func (s *outputRoomEventsStatements) InsertEvent(
ctx context.Context, txn *sql.Tx,
- event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
+ event *rstypes.HeaderedEvent, addState, removeState []string,
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility,
) (streamPos types.StreamPosition, err error) {
var txnID *string
@@ -465,8 +466,8 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@@ -577,7 +578,7 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom(
return err
}
-func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
+func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (id int, evt rstypes.HeaderedEvent, err error) {
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
var eventAsString string
@@ -595,7 +596,7 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn
func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter,
-) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
+) (evts []*rstypes.HeaderedEvent, err error) {
senders, notSenders := getSendersRoomEventFilter(filter)
rows, err := sqlutil.TxStmt(txn, s.selectContextBeforeEventStmt).QueryContext(
ctx, roomID, id, filter.Limit,
@@ -612,7 +613,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
for rows.Next() {
var (
eventBytes []byte
- evt *gomatrixserverlib.HeaderedEvent
+ evt *rstypes.HeaderedEvent
historyVisibility gomatrixserverlib.HistoryVisibility
)
if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
@@ -630,7 +631,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
func (s *outputRoomEventsStatements) SelectContextAfterEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter,
-) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
+) (lastID int, evts []*rstypes.HeaderedEvent, err error) {
senders, notSenders := getSendersRoomEventFilter(filter)
rows, err := sqlutil.TxStmt(txn, s.selectContextAfterEventStmt).QueryContext(
ctx, roomID, id, filter.Limit,
@@ -647,7 +648,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
for rows.Next() {
var (
eventBytes []byte
- evt *gomatrixserverlib.HeaderedEvent
+ evt *rstypes.HeaderedEvent
historyVisibility gomatrixserverlib.HistoryVisibility
)
if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
@@ -680,8 +681,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@@ -709,7 +710,7 @@ func (s *outputRoomEventsStatements) PurgeEvents(
return err
}
-func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
+func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]rstypes.HeaderedEvent, error) {
rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, afterID, pq.StringArray(types), limit)
if err != nil {
return nil, err
@@ -718,14 +719,14 @@ func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, l
var eventID string
var id int64
- result := make(map[int64]gomatrixserverlib.HeaderedEvent)
+ result := make(map[int64]rstypes.HeaderedEvent)
for rows.Next() {
- var ev gomatrixserverlib.HeaderedEvent
+ var ev rstypes.HeaderedEvent
var eventBytes []byte
if err = rows.Scan(&id, &eventID, &eventBytes); err != nil {
return nil, err
}
- if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ if err = json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result[id] = ev
diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go
index 2382fca5..7140a92f 100644
--- a/syncapi/storage/postgres/output_room_events_topology_table.go
+++ b/syncapi/storage/postgres/output_room_events_topology_table.go
@@ -18,10 +18,9 @@ import (
"context"
"database/sql"
- "github.com/matrix-org/gomatrixserverlib"
-
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -105,7 +104,7 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
// InsertEventInTopology inserts the given event in the room's topology, based
// on the event's depth.
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
- ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
+ ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, pos types.StreamPosition,
) (topoPos types.StreamPosition, err error) {
err = sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).QueryRowContext(
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
diff --git a/syncapi/storage/shared/storage_consumer.go b/syncapi/storage/shared/storage_consumer.go
index 47490fb0..05c7eec6 100644
--- a/syncapi/storage/shared/storage_consumer.go
+++ b/syncapi/storage/shared/storage_consumer.go
@@ -22,6 +22,7 @@ import (
"github.com/tidwall/gjson"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib/spec"
@@ -90,7 +91,7 @@ func (d *Database) NewDatabaseTransaction(ctx context.Context) (*DatabaseTransac
}, nil
}
-func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
+func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error) {
streamEvents, err := d.OutputEvents.SelectEvents(ctx, nil, eventIDs, nil, false)
if err != nil {
return nil, err
@@ -105,7 +106,7 @@ func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixse
// If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database.
func (d *Database) AddInviteEvent(
- ctx context.Context, inviteEvent *gomatrixserverlib.HeaderedEvent,
+ ctx context.Context, inviteEvent *rstypes.HeaderedEvent,
) (sp types.StreamPosition, err error) {
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
sp, err = d.Invites.InsertInviteEvent(ctx, txn, inviteEvent)
@@ -189,8 +190,8 @@ func (d *Database) UpsertAccountData(
return
}
-func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*gomatrixserverlib.HeaderedEvent {
- out := make([]*gomatrixserverlib.HeaderedEvent, len(in))
+func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*rstypes.HeaderedEvent {
+ out := make([]*rstypes.HeaderedEvent, len(in))
for i := 0; i < len(in); i++ {
out[i] = in[i].HeaderedEvent
if device != nil && in[i].TransactionID != nil {
@@ -213,7 +214,7 @@ func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.Strea
// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
// This function should always be called within a sqlutil.Writer for safety in SQLite.
-func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
+func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *rstypes.HeaderedEvent) error {
if err := d.BackwardExtremities.DeleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
return err
}
@@ -246,8 +247,8 @@ func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, e
func (d *Database) WriteEvent(
ctx context.Context,
- ev *gomatrixserverlib.HeaderedEvent,
- addStateEvents []*gomatrixserverlib.HeaderedEvent,
+ ev *rstypes.HeaderedEvent,
+ addStateEvents []*rstypes.HeaderedEvent,
addStateEventIDs, removeStateEventIDs []string,
transactionID *api.TransactionID, excludeFromSync bool,
historyVisibility gomatrixserverlib.HistoryVisibility,
@@ -288,7 +289,7 @@ func (d *Database) WriteEvent(
func (d *Database) updateRoomState(
ctx context.Context, txn *sql.Tx,
removedEventIDs []string,
- addedEvents []*gomatrixserverlib.HeaderedEvent,
+ addedEvents []*rstypes.HeaderedEvent,
pduPosition types.StreamPosition,
topoPosition types.StreamPosition,
) error {
@@ -342,7 +343,7 @@ func (d *Database) PutFilter(
return filterID, err
}
-func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error {
+func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *rstypes.HeaderedEvent) error {
redactedEvents, err := d.Events(ctx, []string{redactedEventID})
if err != nil {
return err
@@ -351,13 +352,13 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
logrus.WithField("event_id", redactedEventID).WithField("redaction_event", redactedBecause.EventID()).Warnf("missing redacted event for redaction")
return nil
}
- eventToRedact := redactedEvents[0].Unwrap()
- redactionEvent := redactedBecause.Unwrap()
+ eventToRedact := redactedEvents[0].Event
+ redactionEvent := redactedBecause.Event
if err = eventutil.RedactEvent(redactionEvent, eventToRedact); err != nil {
return err
}
- newEvent := eventToRedact.Headered(redactedBecause.RoomVersion)
+ newEvent := &rstypes.HeaderedEvent{Event: eventToRedact}
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.OutputEvents.UpdateEventJSON(ctx, txn, newEvent)
})
@@ -521,14 +522,14 @@ func (d *Database) UpsertRoomUnreadNotificationCounts(ctx context.Context, userI
return
}
-func (d *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) {
+func (d *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, rstypes.HeaderedEvent, error) {
return d.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID)
}
-func (d *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
+func (d *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*rstypes.HeaderedEvent, error) {
return d.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter)
}
-func (d *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) {
+func (d *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*rstypes.HeaderedEvent, error) {
return d.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
}
@@ -560,7 +561,7 @@ func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID s
return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos)
}
-func (d *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
+func (d *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]rstypes.HeaderedEvent, error) {
return d.OutputEvents.ReIndex(ctx, nil, limit, afterID, []string{
spec.MRoomName,
spec.MRoomTopic,
@@ -568,7 +569,7 @@ func (d *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64
})
}
-func (d *Database) UpdateRelations(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
+func (d *Database) UpdateRelations(ctx context.Context, event *rstypes.HeaderedEvent) error {
// No need to unmarshal if the event is a redaction
if event.Type() == spec.MRoomRedaction {
return nil
diff --git a/syncapi/storage/shared/storage_sync.go b/syncapi/storage/shared/storage_sync.go
index f11cbb57..e8ba3e25 100644
--- a/syncapi/storage/shared/storage_sync.go
+++ b/syncapi/storage/shared/storage_sync.go
@@ -6,11 +6,11 @@ import (
"fmt"
"math"
- "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/internal/eventutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
@@ -84,7 +84,7 @@ func (d *DatabaseTransaction) MaxStreamPositionForNotificationData(ctx context.C
return types.StreamPosition(id), nil
}
-func (d *DatabaseTransaction) CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
+func (d *DatabaseTransaction) CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*rstypes.HeaderedEvent, error) {
return d.CurrentRoomState.SelectCurrentState(ctx, d.txn, roomID, stateFilterPart, excludeEventIDs)
}
@@ -161,7 +161,7 @@ func (d *DatabaseTransaction) PositionInTopology(ctx context.Context, eventID st
return d.Topology.SelectPositionInTopology(ctx, d.txn, eventID)
}
-func (d *DatabaseTransaction) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
+func (d *DatabaseTransaction) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error) {
return d.Invites.SelectInviteEventsInRange(ctx, d.txn, targetUserID, r)
}
@@ -178,7 +178,7 @@ func (d *DatabaseTransaction) RoomReceiptsAfter(ctx context.Context, roomIDs []s
// If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events.
-func (d *DatabaseTransaction) Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
+func (d *DatabaseTransaction) Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error) {
streamEvents, err := d.OutputEvents.SelectEvents(ctx, d.txn, eventIDs, nil, false)
if err != nil {
return nil, err
@@ -207,13 +207,13 @@ func (d *DatabaseTransaction) SharedUsers(ctx context.Context, userID string, ot
func (d *DatabaseTransaction) GetStateEvent(
ctx context.Context, roomID, evType, stateKey string,
-) (*gomatrixserverlib.HeaderedEvent, error) {
+) (*rstypes.HeaderedEvent, error) {
return d.CurrentRoomState.SelectStateEvent(ctx, d.txn, roomID, evType, stateKey)
}
func (d *DatabaseTransaction) GetStateEventsForRoom(
ctx context.Context, roomID string, stateFilter *synctypes.StateFilter,
-) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error) {
+) (stateEvents []*rstypes.HeaderedEvent, err error) {
stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, d.txn, roomID, stateFilter, nil)
return
}
@@ -302,7 +302,7 @@ func (d *DatabaseTransaction) StreamToTopologicalPosition(
// oldest event in the room's topology.
func (d *DatabaseTransaction) GetBackwardTopologyPos(
ctx context.Context,
- events []*gomatrixserverlib.HeaderedEvent,
+ events []*rstypes.HeaderedEvent,
) (types.TopologyToken, error) {
zeroToken := types.TopologyToken{}
if len(events) == 0 {
diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go
index 01dcff69..1b8632eb 100644
--- a/syncapi/storage/sqlite3/current_room_state_table.go
+++ b/syncapi/storage/sqlite3/current_room_state_table.go
@@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -268,7 +269,7 @@ func (s *currentRoomStateStatements) SelectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string,
stateFilter *synctypes.StateFilter,
excludeEventIDs []string,
-) ([]*gomatrixserverlib.HeaderedEvent, error) {
+) ([]*rstypes.HeaderedEvent, error) {
// We're going to query members later, so remove them from this request
if stateFilter.LazyLoadMembers && !stateFilter.IncludeRedundantMembers {
notTypes := &[]string{spec.MRoomMember}
@@ -319,7 +320,7 @@ func (s *currentRoomStateStatements) DeleteRoomStateForRoom(
func (s *currentRoomStateStatements) UpsertRoomState(
ctx context.Context, txn *sql.Tx,
- event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
+ event *rstypes.HeaderedEvent, membership *string, addedAt types.StreamPosition,
) error {
// Parse content as JSON and search for an "url" key
containsURL := false
@@ -405,8 +406,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@@ -421,8 +422,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
return events, nil
}
-func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
- result := []*gomatrixserverlib.HeaderedEvent{}
+func rowsToEvents(rows *sql.Rows) ([]*rstypes.HeaderedEvent, error) {
+ result := []*rstypes.HeaderedEvent{}
for rows.Next() {
var eventID string
var eventBytes []byte
@@ -430,8 +431,8 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result = append(result, &ev)
@@ -441,7 +442,7 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
func (s *currentRoomStateStatements) SelectStateEvent(
ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string,
-) (*gomatrixserverlib.HeaderedEvent, error) {
+) (*rstypes.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectStateEventStmt)
var res []byte
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
@@ -451,7 +452,7 @@ func (s *currentRoomStateStatements) SelectStateEvent(
if err != nil {
return nil, err
}
- var ev gomatrixserverlib.HeaderedEvent
+ var ev rstypes.HeaderedEvent
if err = json.Unmarshal(res, &ev); err != nil {
return nil, err
}
diff --git a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go
index d23f0756..f7ce6531 100644
--- a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go
+++ b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go
@@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
+ "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -91,7 +92,7 @@ func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gom
defer rows.Close() // nolint: errcheck
var eventBytes []byte
var roomID string
- var event gomatrixserverlib.HeaderedEvent
+ var event types.HeaderedEvent
var hisVis gomatrixserverlib.HistoryVisibility
historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility)
for rows.Next() {
diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go
index 19450099..347523cf 100644
--- a/syncapi/storage/sqlite3/invites_table.go
+++ b/syncapi/storage/sqlite3/invites_table.go
@@ -22,9 +22,9 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
)
const inviteEventsSchema = `
@@ -89,7 +89,7 @@ func NewSqliteInvitesTable(db *sql.DB, streamID *StreamIDStatements) (tables.Inv
}
func (s *inviteEventsStatements) InsertInviteEvent(
- ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent,
+ ctx context.Context, txn *sql.Tx, inviteEvent *rstypes.HeaderedEvent,
) (streamPos types.StreamPosition, err error) {
streamPos, err = s.streamIDStatements.nextInviteID(ctx, txn)
if err != nil {
@@ -130,7 +130,7 @@ func (s *inviteEventsStatements) DeleteInviteEvent(
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) SelectInviteEventsInRange(
ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
-) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
+) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error) {
var lastPos types.StreamPosition
stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt)
rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
@@ -138,8 +138,8 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
return nil, nil, lastPos, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
- result := map[string]*gomatrixserverlib.HeaderedEvent{}
- retired := map[string]*gomatrixserverlib.HeaderedEvent{}
+ result := map[string]*rstypes.HeaderedEvent{}
+ retired := map[string]*rstypes.HeaderedEvent{}
for rows.Next() {
var (
id types.StreamPosition
@@ -162,7 +162,7 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
continue
}
- var event *gomatrixserverlib.HeaderedEvent
+ var event *rstypes.HeaderedEvent
if err := json.Unmarshal(eventJSON, &event); err != nil {
return nil, nil, lastPos, err
}
diff --git a/syncapi/storage/sqlite3/memberships_table.go b/syncapi/storage/sqlite3/memberships_table.go
index 2cc46a10..c09fa151 100644
--- a/syncapi/storage/sqlite3/memberships_table.go
+++ b/syncapi/storage/sqlite3/memberships_table.go
@@ -19,9 +19,8 @@ import (
"database/sql"
"fmt"
- "github.com/matrix-org/gomatrixserverlib"
-
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -103,7 +102,7 @@ func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
}
func (s *membershipsStatements) UpsertMembership(
- ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
+ ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent,
streamPos, topologicalPos types.StreamPosition,
) error {
membership, err := event.Membership()
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index b5b6ea88..d63e7606 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -166,7 +167,7 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even
}.Prepare(db)
}
-func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error {
+func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent) error {
headeredJSON, err := json.Marshal(event)
if err != nil {
return err
@@ -249,8 +250,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, nil, err
}
needSet := stateNeeded[ev.RoomID()]
@@ -296,7 +297,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
// of the inserted event.
func (s *outputRoomEventsStatements) InsertEvent(
ctx context.Context, txn *sql.Tx,
- event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
+ event *rstypes.HeaderedEvent, addState, removeState []string,
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility,
) (types.StreamPosition, error) {
var txnID *string
@@ -498,8 +499,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@@ -523,7 +524,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
}
func (s *outputRoomEventsStatements) SelectContextEvent(
ctx context.Context, txn *sql.Tx, roomID, eventID string,
-) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
+) (id int, evt rstypes.HeaderedEvent, err error) {
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
var eventAsString string
var historyVisibility gomatrixserverlib.HistoryVisibility
@@ -540,7 +541,7 @@ func (s *outputRoomEventsStatements) SelectContextEvent(
func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter,
-) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
+) (evts []*rstypes.HeaderedEvent, err error) {
stmt, params, err := prepareWithFilters(
s.db, txn, selectContextBeforeEventSQL,
[]interface{}{
@@ -564,7 +565,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
for rows.Next() {
var (
eventBytes []byte
- evt *gomatrixserverlib.HeaderedEvent
+ evt *rstypes.HeaderedEvent
historyVisibility gomatrixserverlib.HistoryVisibility
)
if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
@@ -582,7 +583,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
func (s *outputRoomEventsStatements) SelectContextAfterEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter,
-) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
+) (lastID int, evts []*rstypes.HeaderedEvent, err error) {
stmt, params, err := prepareWithFilters(
s.db, txn, selectContextAfterEventSQL,
[]interface{}{
@@ -606,7 +607,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
for rows.Next() {
var (
eventBytes []byte
- evt *gomatrixserverlib.HeaderedEvent
+ evt *rstypes.HeaderedEvent
historyVisibility gomatrixserverlib.HistoryVisibility
)
if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
@@ -642,7 +643,7 @@ func (s *outputRoomEventsStatements) PurgeEvents(
return err
}
-func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
+func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]rstypes.HeaderedEvent, error) {
params := make([]interface{}, len(types)+1)
params[0] = afterID
for i := range types {
@@ -664,14 +665,14 @@ func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, l
var eventID string
var id int64
- result := make(map[int64]gomatrixserverlib.HeaderedEvent)
+ result := make(map[int64]rstypes.HeaderedEvent)
for rows.Next() {
- var ev gomatrixserverlib.HeaderedEvent
+ var ev rstypes.HeaderedEvent
var eventBytes []byte
if err = rows.Scan(&id, &eventID, &eventBytes); err != nil {
return nil, err
}
- if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ if err = json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result[id] = ev
diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go
index dc698de2..68b75f5b 100644
--- a/syncapi/storage/sqlite3/output_room_events_topology_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go
@@ -18,9 +18,8 @@ import (
"context"
"database/sql"
- "github.com/matrix-org/gomatrixserverlib"
-
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -104,7 +103,7 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
// insertEventInTopology inserts the given event in the room's topology, based
// on the event's depth.
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
- ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
+ ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, pos types.StreamPosition,
) (types.StreamPosition, error) {
_, err := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).ExecContext(
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index 2cc1378b..08ca99a7 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -10,6 +10,7 @@ import (
"testing"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -34,9 +35,9 @@ func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, fun
return db, close
}
-func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (positions []types.StreamPosition) {
+func MustWriteEvents(t *testing.T, db storage.Database, events []*rstypes.HeaderedEvent) (positions []types.StreamPosition) {
for _, ev := range events {
- var addStateEvents []*gomatrixserverlib.HeaderedEvent
+ var addStateEvents []*rstypes.HeaderedEvent
var addStateEventIDs []string
var removeStateEventIDs []string
if ev.StateKey() != nil {
@@ -106,7 +107,7 @@ func TestRecentEventsPDU(t *testing.T) {
To types.StreamPosition
Limit int
ReverseOrder bool
- WantEvents []*gomatrixserverlib.HeaderedEvent
+ WantEvents []*rstypes.HeaderedEvent
WantLimited bool
}{
// The purpose of this test is to make sure that incremental syncs are including up to the latest events.
@@ -316,7 +317,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
t.Parallel()
db := MustCreateDatabase(t)
- var events []*gomatrixserverlib.HeaderedEvent
+ var events []*types.HeaderedEvent
events = append(events, MustCreateEvent(t, testRoomID, nil, &gomatrixserverlib.EventBuilder{
Content: []byte(fmt.Sprintf(`{"room_version":"4","creator":"%s"}`, testUserIDA)),
Type: "m.room.create",
@@ -324,7 +325,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
Sender: testUserIDA,
Depth: int64(len(events) + 1),
}))
- events = append(events, MustCreateEvent(t, testRoomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
+ events = append(events, MustCreateEvent(t, testRoomID, []*types.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
Content: []byte(`{"membership":"join"}`),
Type: "m.room.member",
StateKey: &testUserIDA,
@@ -332,7 +333,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
Depth: int64(len(events) + 1),
}))
// fork the dag into three, same prev_events and depth
- parent := []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}
+ parent := []*types.HeaderedEvent{events[len(events)-1]}
depth := int64(len(events) + 1)
for i := 0; i < 3; i++ {
events = append(events, MustCreateEvent(t, testRoomID, parent, &gomatrixserverlib.EventBuilder{
@@ -365,7 +366,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
Name string
From types.TopologyToken
Limit int
- Wants []*gomatrixserverlib.HeaderedEvent
+ Wants []*types.HeaderedEvent
}{
{
Name: "Pagination over the whole fork",
@@ -406,7 +407,7 @@ func TestGetEventsInTopologicalRangeMultiRoom(t *testing.T) {
t.Parallel()
db := MustCreateDatabase(t)
- makeEvents := func(roomID string) (events []*gomatrixserverlib.HeaderedEvent) {
+ makeEvents := func(roomID string) (events []*types.HeaderedEvent) {
events = append(events, MustCreateEvent(t, roomID, nil, &gomatrixserverlib.EventBuilder{
Content: []byte(fmt.Sprintf(`{"room_version":"4","creator":"%s"}`, testUserIDA)),
Type: "m.room.create",
@@ -414,7 +415,7 @@ func TestGetEventsInTopologicalRangeMultiRoom(t *testing.T) {
Sender: testUserIDA,
Depth: int64(len(events) + 1),
}))
- events = append(events, MustCreateEvent(t, roomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
+ events = append(events, MustCreateEvent(t, roomID, []*types.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
Content: []byte(`{"membership":"join"}`),
Type: "m.room.member",
StateKey: &testUserIDA,
@@ -460,14 +461,14 @@ func TestGetEventsInRangeWithEventsInsertedLikeBackfill(t *testing.T) {
// "federation" join
userC := fmt.Sprintf("@radiance:%s", testOrigin)
- joinEvent := MustCreateEvent(t, testRoomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
+ joinEvent := MustCreateEvent(t, testRoomID, []*types.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
Content: []byte(`{"membership":"join"}`),
Type: "m.room.member",
StateKey: &userC,
Sender: userC,
Depth: int64(len(events) + 1),
})
- MustWriteEvents(t, db, []*gomatrixserverlib.HeaderedEvent{joinEvent})
+ MustWriteEvents(t, db, []*types.HeaderedEvent{joinEvent})
// Sync will return this for the prev_batch
from := topologyTokenBefore(t, db, joinEvent.EventID())
@@ -638,7 +639,7 @@ func TestInviteBehaviour(t *testing.T) {
StateKey: &testUserIDA,
Sender: "@inviteUser2:somewhere",
})
- for _, ev := range []*gomatrixserverlib.HeaderedEvent{inviteEvent1, inviteEvent2} {
+ for _, ev := range []*types.HeaderedEvent{inviteEvent1, inviteEvent2} {
_, err := db.AddInviteEvent(ctx, ev)
if err != nil {
t.Fatalf("Failed to AddInviteEvent: %s", err)
@@ -695,7 +696,7 @@ func assertInvitedToRooms(t *testing.T, res *types.Response, roomIDs []string) {
}
}
-func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []*gomatrixserverlib.HeaderedEvent) {
+func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []*types.HeaderedEvent) {
t.Helper()
if len(gots) != len(wants) {
t.Fatalf("%s response returned %d events, want %d", msg, len(gots), len(wants))
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 6384fb91..854292bd 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -35,11 +36,11 @@ type AccountData interface {
}
type Invites interface {
- InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error)
+ InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent *rstypes.HeaderedEvent) (streamPos types.StreamPosition, err error)
DeleteInviteEvent(ctx context.Context, txn *sql.Tx, inviteEventID string) (types.StreamPosition, error)
// SelectInviteEventsInRange returns a map of room ID to invite events. If multiple invite/retired invites exist in the given range, return the latest value
// for the room.
- SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*gomatrixserverlib.HeaderedEvent, retired map[string]*gomatrixserverlib.HeaderedEvent, maxID types.StreamPosition, err error)
+ SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*rstypes.HeaderedEvent, retired map[string]*rstypes.HeaderedEvent, maxID types.StreamPosition, err error)
SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error)
PurgeInvites(ctx context.Context, txn *sql.Tx, roomID string) error
}
@@ -59,7 +60,7 @@ type Events interface {
SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error)
InsertEvent(
ctx context.Context, txn *sql.Tx,
- event *gomatrixserverlib.HeaderedEvent,
+ event *rstypes.HeaderedEvent,
addState, removeState []string,
transactionID *api.TransactionID,
excludeFromSync bool,
@@ -70,16 +71,16 @@ type Events interface {
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, r types.Range, eventFilter *synctypes.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error)
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string, filter *synctypes.RoomEventFilter, preserveOrder bool) ([]types.StreamEvent, error)
- UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error
+ UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent) error
// DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.
DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error)
- SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
- SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
- SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
+ SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, rstypes.HeaderedEvent, error)
+ SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*rstypes.HeaderedEvent, error)
+ SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*rstypes.HeaderedEvent, error)
PurgeEvents(ctx context.Context, txn *sql.Tx, roomID string) error
- ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error)
+ ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) (map[int64]rstypes.HeaderedEvent, error)
}
// Topology keeps track of the depths and stream positions for all events.
@@ -87,7 +88,7 @@ type Events interface {
type Topology interface {
// InsertEventInTopology inserts the given event in the room's topology, based on the event's depth.
// `pos` is the stream position of this event in the events table, and is used to order events which have the same depth.
- InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error)
+ InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error)
// SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order.
// Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`.
// `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned.
@@ -101,13 +102,13 @@ type Topology interface {
}
type CurrentRoomState interface {
- SelectStateEvent(ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
+ SelectStateEvent(ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string) (*rstypes.HeaderedEvent, error)
SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
- UpsertRoomState(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition) error
+ UpsertRoomState(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, membership *string, addedAt types.StreamPosition) error
DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error
DeleteRoomStateForRoom(ctx context.Context, txn *sql.Tx, roomID string) error
// SelectCurrentState returns all the current state events for the given room.
- SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *synctypes.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
+ SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *synctypes.StateFilter, excludeEventIDs []string) ([]*rstypes.HeaderedEvent, error)
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error)
// SelectRoomIDsWithAnyMembership returns a map of all memberships for the given user.
@@ -191,7 +192,7 @@ type Receipts interface {
}
type Memberships interface {
- UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
+ UpsertMembership(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error)
SelectMembershipForUser(ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
PurgeMemberships(ctx context.Context, txn *sql.Tx, roomID string) error
diff --git a/syncapi/storage/tables/memberships_test.go b/syncapi/storage/tables/memberships_test.go
index 2e8375c3..4afa2ac5 100644
--- a/syncapi/storage/tables/memberships_test.go
+++ b/syncapi/storage/tables/memberships_test.go
@@ -6,10 +6,10 @@ import (
"testing"
"time"
- "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
@@ -47,7 +47,7 @@ func TestMembershipsTable(t *testing.T) {
room := test.NewRoom(t, alice)
// Create users
- var userEvents []*gomatrixserverlib.HeaderedEvent
+ var userEvents []*rstypes.HeaderedEvent
users := []string{alice.ID}
for _, x := range room.CurrentState() {
if x.StateKeyEquals(alice.ID) {
@@ -114,7 +114,7 @@ func testMembershipCount(t *testing.T, ctx context.Context, table tables.Members
})
}
-func testUpsert(t *testing.T, ctx context.Context, table tables.Memberships, membershipEvent *gomatrixserverlib.HeaderedEvent, user *test.User, room *test.Room) {
+func testUpsert(t *testing.T, ctx context.Context, table tables.Memberships, membershipEvent *rstypes.HeaderedEvent, user *test.User, room *test.Room) {
t.Run("upserting works as expected", func(t *testing.T) {
if err := table.UpsertMembership(ctx, nil, membershipEvent, 1, 1); err != nil {
t.Fatalf("failed to upsert membership: %s", err)