aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorkegsay <kegan@matrix.org>2023-04-27 12:54:20 +0100
committerGitHub <noreply@github.com>2023-04-27 12:54:20 +0100
commitb189edf4f43ff34b69d6c60aeb0efb60dd549c86 (patch)
tree4c08aeda694f3e1cf17c66cf0e4b2b306af6a8df /syncapi
parent2475cf4b61747e76a524af6f71a4eb7e112812af (diff)
Remove gmsl.HeaderedEvent (#3068)
Replaced with types.HeaderedEvent _for now_. In reality we want to move them all to gmsl.Event and only use HeaderedEvent when we _need_ to bundle the version/event ID with the event (seriailsation boundaries, and even then only when we don't have the room version). Requires https://github.com/matrix-org/gomatrixserverlib/pull/373
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/roomserver.go10
-rw-r--r--syncapi/internal/history_visibility.go11
-rw-r--r--syncapi/notifier/notifier.go4
-rw-r--r--syncapi/notifier/notifier_test.go8
-rw-r--r--syncapi/routing/context.go12
-rw-r--r--syncapi/routing/messages.go23
-rw-r--r--syncapi/routing/relations.go4
-rw-r--r--syncapi/routing/search.go6
-rw-r--r--syncapi/routing/search_test.go3
-rw-r--r--syncapi/storage/interface.go33
-rw-r--r--syncapi/storage/postgres/current_room_state_table.go21
-rw-r--r--syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go3
-rw-r--r--syncapi/storage/postgres/invites_table.go12
-rw-r--r--syncapi/storage/postgres/memberships_table.go5
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go35
-rw-r--r--syncapi/storage/postgres/output_room_events_topology_table.go5
-rw-r--r--syncapi/storage/shared/storage_consumer.go35
-rw-r--r--syncapi/storage/shared/storage_sync.go14
-rw-r--r--syncapi/storage/sqlite3/current_room_state_table.go21
-rw-r--r--syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go3
-rw-r--r--syncapi/storage/sqlite3/invites_table.go12
-rw-r--r--syncapi/storage/sqlite3/memberships_table.go5
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go31
-rw-r--r--syncapi/storage/sqlite3/output_room_events_topology_table.go5
-rw-r--r--syncapi/storage/storage_test.go27
-rw-r--r--syncapi/storage/tables/interface.go27
-rw-r--r--syncapi/storage/tables/memberships_test.go6
-rw-r--r--syncapi/streams/stream_pdu.go51
-rw-r--r--syncapi/syncapi_test.go7
-rw-r--r--syncapi/synctypes/clientevent.go5
-rw-r--r--syncapi/types/types.go9
-rw-r--r--syncapi/types/types_test.go3
32 files changed, 250 insertions, 206 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index c842c0c3..83ac82ff 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -21,7 +21,6 @@ import (
"fmt"
"github.com/getsentry/sentry-go"
- "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
@@ -31,6 +30,7 @@ import (
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
@@ -318,7 +318,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
pduPos, err := s.db.WriteEvent(
ctx,
ev,
- []*gomatrixserverlib.HeaderedEvent{},
+ []*rstypes.HeaderedEvent{},
[]string{}, // adds no state
[]string{}, // removes no state
nil, // no transaction
@@ -362,7 +362,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return nil
}
-func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) {
+func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *rstypes.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) {
if ev.Type() != spec.MRoomMember {
return sp, nil
}
@@ -496,7 +496,7 @@ func (s *OutputRoomEventConsumer) onPurgeRoom(
}
}
-func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.HeaderedEvent) (*gomatrixserverlib.HeaderedEvent, error) {
+func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent) (*rstypes.HeaderedEvent, error) {
if event.StateKey() == nil {
return event, nil
}
@@ -531,7 +531,7 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.Head
return event, err
}
-func (s *OutputRoomEventConsumer) writeFTS(ev *gomatrixserverlib.HeaderedEvent, pduPosition types.StreamPosition) error {
+func (s *OutputRoomEventConsumer) writeFTS(ev *rstypes.HeaderedEvent, pduPosition types.StreamPosition) error {
if !s.cfg.Fulltext.Enabled {
return nil
}
diff --git a/syncapi/internal/history_visibility.go b/syncapi/internal/history_visibility.go
index 10d7383b..7449b464 100644
--- a/syncapi/internal/history_visibility.go
+++ b/syncapi/internal/history_visibility.go
@@ -26,6 +26,7 @@ import (
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage"
)
@@ -98,16 +99,16 @@ func (ev eventVisibility) allowed() (allowed bool) {
}
}
-// ApplyHistoryVisibilityFilter applies the room history visibility filter on gomatrixserverlib.HeaderedEvents.
+// ApplyHistoryVisibilityFilter applies the room history visibility filter on types.HeaderedEvents.
// Returns the filtered events and an error, if any.
func ApplyHistoryVisibilityFilter(
ctx context.Context,
syncDB storage.DatabaseTransaction,
rsAPI api.SyncRoomserverAPI,
- events []*gomatrixserverlib.HeaderedEvent,
+ events []*types.HeaderedEvent,
alwaysIncludeEventIDs map[string]struct{},
userID, endpoint string,
-) ([]*gomatrixserverlib.HeaderedEvent, error) {
+) ([]*types.HeaderedEvent, error) {
if len(events) == 0 {
return events, nil
}
@@ -120,7 +121,7 @@ func ApplyHistoryVisibilityFilter(
}
// Get the mapping from eventID -> eventVisibility
- eventsFiltered := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events))
+ eventsFiltered := make([]*types.HeaderedEvent, 0, len(events))
visibilities := visibilityForEvents(ctx, rsAPI, events, userID, events[0].RoomID())
for _, ev := range events {
evVis := visibilities[ev.EventID()]
@@ -170,7 +171,7 @@ func ApplyHistoryVisibilityFilter(
func visibilityForEvents(
ctx context.Context,
rsAPI api.SyncRoomserverAPI,
- events []*gomatrixserverlib.HeaderedEvent,
+ events []*types.HeaderedEvent,
userID, roomID string,
) map[string]eventVisibility {
eventIDs := make([]string, len(events))
diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go
index 81b0e7f9..b76bdce3 100644
--- a/syncapi/notifier/notifier.go
+++ b/syncapi/notifier/notifier.go
@@ -20,9 +20,9 @@ import (
"time"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
log "github.com/sirupsen/logrus"
)
@@ -87,7 +87,7 @@ func (n *Notifier) SetCurrentPosition(currPos types.StreamingToken) {
// Typically a consumer supplies a posUpdate with the latest sync position for the
// event type it handles, leaving other fields as 0.
func (n *Notifier) OnNewEvent(
- ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string,
+ ev *rstypes.HeaderedEvent, roomID string, userIDs []string,
posUpdate types.StreamingToken,
) {
// update the current position then notify relevant /sync streams.
diff --git a/syncapi/notifier/notifier_test.go b/syncapi/notifier/notifier_test.go
index b0631371..36577a0e 100644
--- a/syncapi/notifier/notifier_test.go
+++ b/syncapi/notifier/notifier_test.go
@@ -22,16 +22,16 @@ import (
"testing"
"time"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
- "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
var (
- randomMessageEvent gomatrixserverlib.HeaderedEvent
- aliceInviteBobEvent gomatrixserverlib.HeaderedEvent
- bobLeaveEvent gomatrixserverlib.HeaderedEvent
+ randomMessageEvent rstypes.HeaderedEvent
+ aliceInviteBobEvent rstypes.HeaderedEvent
+ bobLeaveEvent rstypes.HeaderedEvent
syncPositionVeryOld = types.StreamingToken{PDUPosition: 5}
syncPositionBefore = types.StreamingToken{PDUPosition: 11}
syncPositionAfter = types.StreamingToken{PDUPosition: 12}
diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go
index afd61eae..b4d61ecc 100644
--- a/syncapi/routing/context.go
+++ b/syncapi/routing/context.go
@@ -27,12 +27,12 @@ import (
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
roomserver "github.com/matrix-org/dendrite/roomserver/api"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
- "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
@@ -122,7 +122,7 @@ func Context(
// verify the user is allowed to see the context for this room/event
startTime := time.Now()
- filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, snapshot, rsAPI, []*gomatrixserverlib.HeaderedEvent{&requestedEvent}, nil, device.UserID, "context")
+ filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, snapshot, rsAPI, []*rstypes.HeaderedEvent{&requestedEvent}, nil, device.UserID, "context")
if err != nil {
logrus.WithError(err).Error("unable to apply history visibility filter")
return jsonerror.InternalServerError()
@@ -212,9 +212,9 @@ func Context(
// and an error, if any.
func applyHistoryVisibilityOnContextEvents(
ctx context.Context, snapshot storage.DatabaseTransaction, rsAPI roomserver.SyncRoomserverAPI,
- eventsBefore, eventsAfter []*gomatrixserverlib.HeaderedEvent,
+ eventsBefore, eventsAfter []*rstypes.HeaderedEvent,
userID string,
-) (filteredBefore, filteredAfter []*gomatrixserverlib.HeaderedEvent, err error) {
+) (filteredBefore, filteredAfter []*rstypes.HeaderedEvent, err error) {
eventIDsBefore := make(map[string]struct{}, len(eventsBefore))
eventIDsAfter := make(map[string]struct{}, len(eventsAfter))
@@ -245,7 +245,7 @@ func applyHistoryVisibilityOnContextEvents(
return filteredBefore, filteredAfter, nil
}
-func getStartEnd(ctx context.Context, snapshot storage.DatabaseTransaction, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) {
+func getStartEnd(ctx context.Context, snapshot storage.DatabaseTransaction, startEvents, endEvents []*rstypes.HeaderedEvent) (start, end types.TopologyToken, err error) {
if len(startEvents) > 0 {
start, err = snapshot.EventPositionInTopology(ctx, startEvents[0].EventID())
if err != nil {
@@ -265,7 +265,7 @@ func applyLazyLoadMembers(
roomID string,
events []synctypes.ClientEvent,
lazyLoadCache caching.LazyLoadCache,
-) ([]*gomatrixserverlib.HeaderedEvent, error) {
+) ([]*rstypes.HeaderedEvent, error) {
eventSenders := make(map[string]struct{})
// get members who actually send an event
for _, e := range events {
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index 6e508a92..0d087403 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
@@ -303,7 +304,7 @@ func (r *messagesReq) retrieveEvents() (
return
}
- var events []*gomatrixserverlib.HeaderedEvent
+ var events []*rstypes.HeaderedEvent
util.GetLogger(r.ctx).WithFields(logrus.Fields{
"start": r.from,
"end": r.to,
@@ -342,8 +343,8 @@ func (r *messagesReq) retrieveEvents() (
// Sort the events to ensure we send them in the right order.
if r.backwardOrdering {
// This reverses the array from old->new to new->old
- reversed := func(in []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
- out := make([]*gomatrixserverlib.HeaderedEvent, len(in))
+ reversed := func(in []*rstypes.HeaderedEvent) []*rstypes.HeaderedEvent {
+ out := make([]*rstypes.HeaderedEvent, len(in))
for i := 0; i < len(in); i++ {
out[i] = in[len(in)-i-1]
}
@@ -367,7 +368,7 @@ func (r *messagesReq) retrieveEvents() (
return synctypes.HeaderedToClientEvents(filteredEvents, synctypes.FormatAll), start, end, err
}
-func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) {
+func (r *messagesReq) getStartEnd(events []*rstypes.HeaderedEvent) (start, end types.TopologyToken, err error) {
if r.backwardOrdering {
start = *r.from
if events[len(events)-1].Type() == spec.MRoomCreate {
@@ -406,7 +407,7 @@ func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (st
// Returns an error if there was an issue talking with the database or
// backfilling.
func (r *messagesReq) handleEmptyEventsSlice() (
- events []*gomatrixserverlib.HeaderedEvent, err error,
+ events []*rstypes.HeaderedEvent, err error,
) {
backwardExtremities, err := r.snapshot.BackwardExtremitiesForRoom(r.ctx, r.roomID)
@@ -420,7 +421,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
} else {
// If not, it means the slice was empty because we reached the room's
// creation, so return an empty slice.
- events = []*gomatrixserverlib.HeaderedEvent{}
+ events = []*rstypes.HeaderedEvent{}
}
return
@@ -432,7 +433,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
// through backfilling if needed.
// Returns an error if there was an issue while backfilling.
func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent) (
- events []*gomatrixserverlib.HeaderedEvent, err error,
+ events []*rstypes.HeaderedEvent, err error,
) {
// Check if we have enough events.
isSetLargeEnough := len(streamEvents) >= r.filter.Limit
@@ -460,7 +461,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
// Backfill is needed if we've reached a backward extremity and need more
// events. It's only needed if the direction is backward.
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
- var pdus []*gomatrixserverlib.HeaderedEvent
+ var pdus []*rstypes.HeaderedEvent
// Only ask the remote server for enough events to reach the limit.
pdus, err = r.backfill(r.roomID, backwardExtremities, r.filter.Limit-len(streamEvents))
if err != nil {
@@ -478,7 +479,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
return
}
-type eventsByDepth []*gomatrixserverlib.HeaderedEvent
+type eventsByDepth []*rstypes.HeaderedEvent
func (e eventsByDepth) Len() int {
return len(e)
@@ -499,7 +500,7 @@ func (e eventsByDepth) Less(i, j int) bool {
// event, or if there is no remote homeserver to contact.
// Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request.
-func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]*gomatrixserverlib.HeaderedEvent, error) {
+func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]*rstypes.HeaderedEvent, error) {
var res api.PerformBackfillResponse
err := r.rsAPI.PerformBackfill(context.Background(), &api.PerformBackfillRequest{
RoomID: roomID,
@@ -532,7 +533,7 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
_, err = r.db.WriteEvent(
context.Background(),
res.Events[i],
- []*gomatrixserverlib.HeaderedEvent{},
+ []*rstypes.HeaderedEvent{},
[]string{},
[]string{},
nil, true,
diff --git a/syncapi/routing/relations.go b/syncapi/routing/relations.go
index 79533883..275ab65c 100644
--- a/syncapi/routing/relations.go
+++ b/syncapi/routing/relations.go
@@ -18,13 +18,13 @@ import (
"net/http"
"strconv"
- "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -96,7 +96,7 @@ func Relations(
return util.ErrorResponse(err)
}
- headeredEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events))
+ headeredEvents := make([]*rstypes.HeaderedEvent, 0, len(events))
for _, event := range events {
headeredEvents = append(headeredEvents, event.HeaderedEvent)
}
diff --git a/syncapi/routing/search.go b/syncapi/routing/search.go
index 224c6807..beaa227c 100644
--- a/syncapi/routing/search.go
+++ b/syncapi/routing/search.go
@@ -22,7 +22,6 @@ import (
"time"
"github.com/blevesearch/bleve/v2/search"
- "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
@@ -32,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/userapi/api"
@@ -263,10 +263,10 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
func contextEvents(
ctx context.Context,
snapshot storage.DatabaseTransaction,
- event *gomatrixserverlib.HeaderedEvent,
+ event *types.HeaderedEvent,
roomFilter *synctypes.RoomEventFilter,
searchReq SearchRequest,
-) ([]*gomatrixserverlib.HeaderedEvent, []*gomatrixserverlib.HeaderedEvent, error) {
+) ([]*types.HeaderedEvent, []*types.HeaderedEvent, error) {
id, _, err := snapshot.SelectContextEvent(ctx, event.RoomID(), event.EventID())
if err != nil {
logrus.WithError(err).Error("failed to query context event")
diff --git a/syncapi/routing/search_test.go b/syncapi/routing/search_test.go
index c8b9c604..1cc95a87 100644
--- a/syncapi/routing/search_test.go
+++ b/syncapi/routing/search_test.go
@@ -9,6 +9,7 @@ import (
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -215,7 +216,7 @@ func TestSearch(t *testing.T) {
// store the events in the database
var sp types.StreamPosition
for _, x := range room.Events() {
- var stateEvents []*gomatrixserverlib.HeaderedEvent
+ var stateEvents []*rstypes.HeaderedEvent
var stateEventIDs []string
if x.Type() == spec.MRoomMember {
stateEvents = append(stateEvents, x)
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index f5c1223a..302b9bad 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -42,16 +43,16 @@ type DatabaseTransaction interface {
MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForRelations(ctx context.Context) (types.StreamPosition, error)
- CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
+ CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*rstypes.HeaderedEvent, error)
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *synctypes.StateFilter) ([]types.StateDelta, []string, error)
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *synctypes.StateFilter) ([]types.StateDelta, []string, error)
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error)
GetRoomSummary(ctx context.Context, roomID, userID string) (summary *types.Summary, err error)
RecentEvents(ctx context.Context, roomIDs []string, r types.Range, eventFilter *synctypes.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error)
- GetBackwardTopologyPos(ctx context.Context, events []*gomatrixserverlib.HeaderedEvent) (types.TopologyToken, error)
+ GetBackwardTopologyPos(ctx context.Context, events []*rstypes.HeaderedEvent) (types.TopologyToken, error)
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
- InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error)
+ InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error)
PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error)
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
@@ -65,15 +66,15 @@ type DatabaseTransaction interface {
// If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events.
- Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
+ Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error)
// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
// If no event could be found, returns nil
// If there was an issue during the retrieval, returns an error
- GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
+ GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*rstypes.HeaderedEvent, error)
// GetStateEventsForRoom fetches the state events for a given room.
// Returns an empty slice if no state events could be found for this room.
// Returns an error if there was an issue with the retrieval.
- GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error)
+ GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter) (stateEvents []*rstypes.HeaderedEvent, err error)
// GetAccountDataInRange returns all account data for a given user inserted or
// updated between two given positions
// Returns a map following the format data[roomID] = []dataTypes
@@ -89,15 +90,15 @@ type DatabaseTransaction interface {
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
// matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event.
- StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*gomatrixserverlib.HeaderedEvent
+ StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*rstypes.HeaderedEvent
// SendToDeviceUpdatesForSync returns a list of send-to-device updates. It returns the
// relevant events within the given ranges for the supplied user ID and device ID.
SendToDeviceUpdatesForSync(ctx context.Context, userID, deviceID string, from, to types.StreamPosition) (pos types.StreamPosition, events []types.SendToDeviceEvent, err error)
// GetRoomReceipts gets all receipts for a given roomID
GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]types.OutputReceiptEvent, error)
- SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
- SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
- SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
+ SelectContextEvent(ctx context.Context, roomID, eventID string) (int, rstypes.HeaderedEvent, error)
+ SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*rstypes.HeaderedEvent, error)
+ SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*rstypes.HeaderedEvent, error)
StreamToTopologicalPosition(ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool) (types.TopologyToken, error)
IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error)
// SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found
@@ -123,11 +124,11 @@ type Database interface {
// If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events.
- Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
+ Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error)
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
// when generating the sync stream position for this event. Returns the sync stream position for the inserted event.
// Returns an error if there was a problem inserting this event.
- WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []*gomatrixserverlib.HeaderedEvent,
+ WriteEvent(ctx context.Context, ev *rstypes.HeaderedEvent, addStateEvents []*rstypes.HeaderedEvent,
addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool,
historyVisibility gomatrixserverlib.HistoryVisibility,
) (types.StreamPosition, error)
@@ -146,7 +147,7 @@ type Database interface {
// AddInviteEvent stores a new invite event for a user.
// If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database.
- AddInviteEvent(ctx context.Context, inviteEvent *gomatrixserverlib.HeaderedEvent) (types.StreamPosition, error)
+ AddInviteEvent(ctx context.Context, inviteEvent *rstypes.HeaderedEvent) (types.StreamPosition, error)
// RetireInviteEvent removes an old invite event from the database. Returns the new position of the retired invite.
// Returns an error if there was a problem communicating with the database.
RetireInviteEvent(ctx context.Context, inviteEventID string) (types.StreamPosition, error)
@@ -173,12 +174,12 @@ type Database interface {
// goes wrong.
PutFilter(ctx context.Context, localpart string, filter *synctypes.Filter) (string, error)
// RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event
- RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error
+ RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *rstypes.HeaderedEvent) error
// StoreReceipt stores new receipt events
StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp spec.Timestamp) (pos types.StreamPosition, err error)
UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
- ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error)
- UpdateRelations(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
+ ReIndex(ctx context.Context, limit, afterID int64) (map[int64]rstypes.HeaderedEvent, error)
+ UpdateRelations(ctx context.Context, event *rstypes.HeaderedEvent) error
RedactRelations(ctx context.Context, roomID, redactedEventID string) error
SelectMemberships(
ctx context.Context,
diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go
index c2a870c0..0cc96373 100644
--- a/syncapi/storage/postgres/current_room_state_table.go
+++ b/syncapi/storage/postgres/current_room_state_table.go
@@ -24,6 +24,7 @@ import (
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -274,7 +275,7 @@ func (s *currentRoomStateStatements) SelectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string,
stateFilter *synctypes.StateFilter,
excludeEventIDs []string,
-) ([]*gomatrixserverlib.HeaderedEvent, error) {
+) ([]*rstypes.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt)
senders, notSenders := getSendersStateFilterFilter(stateFilter)
// We're going to query members later, so remove them from this request
@@ -320,7 +321,7 @@ func (s *currentRoomStateStatements) DeleteRoomStateForRoom(
func (s *currentRoomStateStatements) UpsertRoomState(
ctx context.Context, txn *sql.Tx,
- event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
+ event *rstypes.HeaderedEvent, membership *string, addedAt types.StreamPosition,
) error {
// Parse content as JSON and search for an "url" key
containsURL := false
@@ -378,8 +379,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@@ -394,8 +395,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
return events, nil
}
-func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
- result := []*gomatrixserverlib.HeaderedEvent{}
+func rowsToEvents(rows *sql.Rows) ([]*rstypes.HeaderedEvent, error) {
+ result := []*rstypes.HeaderedEvent{}
for rows.Next() {
var eventID string
var eventBytes []byte
@@ -403,8 +404,8 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result = append(result, &ev)
@@ -414,7 +415,7 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
func (s *currentRoomStateStatements) SelectStateEvent(
ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string,
-) (*gomatrixserverlib.HeaderedEvent, error) {
+) (*rstypes.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectStateEventStmt)
var res []byte
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
@@ -424,7 +425,7 @@ func (s *currentRoomStateStatements) SelectStateEvent(
if err != nil {
return nil, err
}
- var ev gomatrixserverlib.HeaderedEvent
+ var ev rstypes.HeaderedEvent
if err = json.Unmarshal(res, &ev); err != nil {
return nil, err
}
diff --git a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
index d68ed8d5..37660ee9 100644
--- a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
+++ b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
@@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
+ "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -79,7 +80,7 @@ func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gom
defer rows.Close() // nolint: errcheck
var eventBytes []byte
var roomID string
- var event gomatrixserverlib.HeaderedEvent
+ var event types.HeaderedEvent
var hisVis gomatrixserverlib.HistoryVisibility
historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility)
for rows.Next() {
diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go
index 151bffa5..267209bb 100644
--- a/syncapi/storage/postgres/invites_table.go
+++ b/syncapi/storage/postgres/invites_table.go
@@ -22,9 +22,9 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
)
const inviteEventsSchema = `
@@ -89,7 +89,7 @@ func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) {
}
func (s *inviteEventsStatements) InsertInviteEvent(
- ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent,
+ ctx context.Context, txn *sql.Tx, inviteEvent *rstypes.HeaderedEvent,
) (streamPos types.StreamPosition, err error) {
var headeredJSON []byte
headeredJSON, err = json.Marshal(inviteEvent)
@@ -119,7 +119,7 @@ func (s *inviteEventsStatements) DeleteInviteEvent(
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) SelectInviteEventsInRange(
ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
-) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
+) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error) {
var lastPos types.StreamPosition
stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt)
rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
@@ -127,8 +127,8 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
return nil, nil, lastPos, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
- result := map[string]*gomatrixserverlib.HeaderedEvent{}
- retired := map[string]*gomatrixserverlib.HeaderedEvent{}
+ result := map[string]*rstypes.HeaderedEvent{}
+ retired := map[string]*rstypes.HeaderedEvent{}
for rows.Next() {
var (
id types.StreamPosition
@@ -151,7 +151,7 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
continue
}
- var event *gomatrixserverlib.HeaderedEvent
+ var event *rstypes.HeaderedEvent
if err := json.Unmarshal(eventJSON, &event); err != nil {
return nil, nil, lastPos, err
}
diff --git a/syncapi/storage/postgres/memberships_table.go b/syncapi/storage/postgres/memberships_table.go
index 47833893..3905f9ab 100644
--- a/syncapi/storage/postgres/memberships_table.go
+++ b/syncapi/storage/postgres/memberships_table.go
@@ -19,9 +19,8 @@ import (
"database/sql"
"fmt"
- "github.com/matrix-org/gomatrixserverlib"
-
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -100,7 +99,7 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
}
func (s *membershipsStatements) UpsertMembership(
- ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
+ ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent,
streamPos, topologicalPos types.StreamPosition,
) error {
membership, err := event.Membership()
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 8ee5098c..3aadbccf 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -264,7 +265,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
}.Prepare(db)
}
-func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error {
+func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent) error {
headeredJSON, err := json.Marshal(event)
if err != nil {
return err
@@ -329,8 +330,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, nil, err
}
needSet := stateNeeded[ev.RoomID()]
@@ -375,7 +376,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
// of the inserted event.
func (s *outputRoomEventsStatements) InsertEvent(
ctx context.Context, txn *sql.Tx,
- event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
+ event *rstypes.HeaderedEvent, addState, removeState []string,
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility,
) (streamPos types.StreamPosition, err error) {
var txnID *string
@@ -465,8 +466,8 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@@ -577,7 +578,7 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom(
return err
}
-func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
+func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (id int, evt rstypes.HeaderedEvent, err error) {
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
var eventAsString string
@@ -595,7 +596,7 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn
func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter,
-) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
+) (evts []*rstypes.HeaderedEvent, err error) {
senders, notSenders := getSendersRoomEventFilter(filter)
rows, err := sqlutil.TxStmt(txn, s.selectContextBeforeEventStmt).QueryContext(
ctx, roomID, id, filter.Limit,
@@ -612,7 +613,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
for rows.Next() {
var (
eventBytes []byte
- evt *gomatrixserverlib.HeaderedEvent
+ evt *rstypes.HeaderedEvent
historyVisibility gomatrixserverlib.HistoryVisibility
)
if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
@@ -630,7 +631,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
func (s *outputRoomEventsStatements) SelectContextAfterEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter,
-) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
+) (lastID int, evts []*rstypes.HeaderedEvent, err error) {
senders, notSenders := getSendersRoomEventFilter(filter)
rows, err := sqlutil.TxStmt(txn, s.selectContextAfterEventStmt).QueryContext(
ctx, roomID, id, filter.Limit,
@@ -647,7 +648,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
for rows.Next() {
var (
eventBytes []byte
- evt *gomatrixserverlib.HeaderedEvent
+ evt *rstypes.HeaderedEvent
historyVisibility gomatrixserverlib.HistoryVisibility
)
if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
@@ -680,8 +681,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@@ -709,7 +710,7 @@ func (s *outputRoomEventsStatements) PurgeEvents(
return err
}
-func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
+func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]rstypes.HeaderedEvent, error) {
rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, afterID, pq.StringArray(types), limit)
if err != nil {
return nil, err
@@ -718,14 +719,14 @@ func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, l
var eventID string
var id int64
- result := make(map[int64]gomatrixserverlib.HeaderedEvent)
+ result := make(map[int64]rstypes.HeaderedEvent)
for rows.Next() {
- var ev gomatrixserverlib.HeaderedEvent
+ var ev rstypes.HeaderedEvent
var eventBytes []byte
if err = rows.Scan(&id, &eventID, &eventBytes); err != nil {
return nil, err
}
- if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ if err = json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result[id] = ev
diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go
index 2382fca5..7140a92f 100644
--- a/syncapi/storage/postgres/output_room_events_topology_table.go
+++ b/syncapi/storage/postgres/output_room_events_topology_table.go
@@ -18,10 +18,9 @@ import (
"context"
"database/sql"
- "github.com/matrix-org/gomatrixserverlib"
-
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -105,7 +104,7 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
// InsertEventInTopology inserts the given event in the room's topology, based
// on the event's depth.
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
- ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
+ ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, pos types.StreamPosition,
) (topoPos types.StreamPosition, err error) {
err = sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).QueryRowContext(
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
diff --git a/syncapi/storage/shared/storage_consumer.go b/syncapi/storage/shared/storage_consumer.go
index 47490fb0..05c7eec6 100644
--- a/syncapi/storage/shared/storage_consumer.go
+++ b/syncapi/storage/shared/storage_consumer.go
@@ -22,6 +22,7 @@ import (
"github.com/tidwall/gjson"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib/spec"
@@ -90,7 +91,7 @@ func (d *Database) NewDatabaseTransaction(ctx context.Context) (*DatabaseTransac
}, nil
}
-func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
+func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error) {
streamEvents, err := d.OutputEvents.SelectEvents(ctx, nil, eventIDs, nil, false)
if err != nil {
return nil, err
@@ -105,7 +106,7 @@ func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixse
// If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database.
func (d *Database) AddInviteEvent(
- ctx context.Context, inviteEvent *gomatrixserverlib.HeaderedEvent,
+ ctx context.Context, inviteEvent *rstypes.HeaderedEvent,
) (sp types.StreamPosition, err error) {
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
sp, err = d.Invites.InsertInviteEvent(ctx, txn, inviteEvent)
@@ -189,8 +190,8 @@ func (d *Database) UpsertAccountData(
return
}
-func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*gomatrixserverlib.HeaderedEvent {
- out := make([]*gomatrixserverlib.HeaderedEvent, len(in))
+func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*rstypes.HeaderedEvent {
+ out := make([]*rstypes.HeaderedEvent, len(in))
for i := 0; i < len(in); i++ {
out[i] = in[i].HeaderedEvent
if device != nil && in[i].TransactionID != nil {
@@ -213,7 +214,7 @@ func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.Strea
// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
// This function should always be called within a sqlutil.Writer for safety in SQLite.
-func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
+func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *rstypes.HeaderedEvent) error {
if err := d.BackwardExtremities.DeleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
return err
}
@@ -246,8 +247,8 @@ func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, e
func (d *Database) WriteEvent(
ctx context.Context,
- ev *gomatrixserverlib.HeaderedEvent,
- addStateEvents []*gomatrixserverlib.HeaderedEvent,
+ ev *rstypes.HeaderedEvent,
+ addStateEvents []*rstypes.HeaderedEvent,
addStateEventIDs, removeStateEventIDs []string,
transactionID *api.TransactionID, excludeFromSync bool,
historyVisibility gomatrixserverlib.HistoryVisibility,
@@ -288,7 +289,7 @@ func (d *Database) WriteEvent(
func (d *Database) updateRoomState(
ctx context.Context, txn *sql.Tx,
removedEventIDs []string,
- addedEvents []*gomatrixserverlib.HeaderedEvent,
+ addedEvents []*rstypes.HeaderedEvent,
pduPosition types.StreamPosition,
topoPosition types.StreamPosition,
) error {
@@ -342,7 +343,7 @@ func (d *Database) PutFilter(
return filterID, err
}
-func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error {
+func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *rstypes.HeaderedEvent) error {
redactedEvents, err := d.Events(ctx, []string{redactedEventID})
if err != nil {
return err
@@ -351,13 +352,13 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
logrus.WithField("event_id", redactedEventID).WithField("redaction_event", redactedBecause.EventID()).Warnf("missing redacted event for redaction")
return nil
}
- eventToRedact := redactedEvents[0].Unwrap()
- redactionEvent := redactedBecause.Unwrap()
+ eventToRedact := redactedEvents[0].Event
+ redactionEvent := redactedBecause.Event
if err = eventutil.RedactEvent(redactionEvent, eventToRedact); err != nil {
return err
}
- newEvent := eventToRedact.Headered(redactedBecause.RoomVersion)
+ newEvent := &rstypes.HeaderedEvent{Event: eventToRedact}
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.OutputEvents.UpdateEventJSON(ctx, txn, newEvent)
})
@@ -521,14 +522,14 @@ func (d *Database) UpsertRoomUnreadNotificationCounts(ctx context.Context, userI
return
}
-func (d *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) {
+func (d *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, rstypes.HeaderedEvent, error) {
return d.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID)
}
-func (d *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
+func (d *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*rstypes.HeaderedEvent, error) {
return d.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter)
}
-func (d *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) {
+func (d *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*rstypes.HeaderedEvent, error) {
return d.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
}
@@ -560,7 +561,7 @@ func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID s
return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos)
}
-func (d *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
+func (d *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]rstypes.HeaderedEvent, error) {
return d.OutputEvents.ReIndex(ctx, nil, limit, afterID, []string{
spec.MRoomName,
spec.MRoomTopic,
@@ -568,7 +569,7 @@ func (d *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64
})
}
-func (d *Database) UpdateRelations(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
+func (d *Database) UpdateRelations(ctx context.Context, event *rstypes.HeaderedEvent) error {
// No need to unmarshal if the event is a redaction
if event.Type() == spec.MRoomRedaction {
return nil
diff --git a/syncapi/storage/shared/storage_sync.go b/syncapi/storage/shared/storage_sync.go
index f11cbb57..e8ba3e25 100644
--- a/syncapi/storage/shared/storage_sync.go
+++ b/syncapi/storage/shared/storage_sync.go
@@ -6,11 +6,11 @@ import (
"fmt"
"math"
- "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/internal/eventutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
@@ -84,7 +84,7 @@ func (d *DatabaseTransaction) MaxStreamPositionForNotificationData(ctx context.C
return types.StreamPosition(id), nil
}
-func (d *DatabaseTransaction) CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
+func (d *DatabaseTransaction) CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*rstypes.HeaderedEvent, error) {
return d.CurrentRoomState.SelectCurrentState(ctx, d.txn, roomID, stateFilterPart, excludeEventIDs)
}
@@ -161,7 +161,7 @@ func (d *DatabaseTransaction) PositionInTopology(ctx context.Context, eventID st
return d.Topology.SelectPositionInTopology(ctx, d.txn, eventID)
}
-func (d *DatabaseTransaction) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
+func (d *DatabaseTransaction) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error) {
return d.Invites.SelectInviteEventsInRange(ctx, d.txn, targetUserID, r)
}
@@ -178,7 +178,7 @@ func (d *DatabaseTransaction) RoomReceiptsAfter(ctx context.Context, roomIDs []s
// If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events.
-func (d *DatabaseTransaction) Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
+func (d *DatabaseTransaction) Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error) {
streamEvents, err := d.OutputEvents.SelectEvents(ctx, d.txn, eventIDs, nil, false)
if err != nil {
return nil, err
@@ -207,13 +207,13 @@ func (d *DatabaseTransaction) SharedUsers(ctx context.Context, userID string, ot
func (d *DatabaseTransaction) GetStateEvent(
ctx context.Context, roomID, evType, stateKey string,
-) (*gomatrixserverlib.HeaderedEvent, error) {
+) (*rstypes.HeaderedEvent, error) {
return d.CurrentRoomState.SelectStateEvent(ctx, d.txn, roomID, evType, stateKey)
}
func (d *DatabaseTransaction) GetStateEventsForRoom(
ctx context.Context, roomID string, stateFilter *synctypes.StateFilter,
-) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error) {
+) (stateEvents []*rstypes.HeaderedEvent, err error) {
stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, d.txn, roomID, stateFilter, nil)
return
}
@@ -302,7 +302,7 @@ func (d *DatabaseTransaction) StreamToTopologicalPosition(
// oldest event in the room's topology.
func (d *DatabaseTransaction) GetBackwardTopologyPos(
ctx context.Context,
- events []*gomatrixserverlib.HeaderedEvent,
+ events []*rstypes.HeaderedEvent,
) (types.TopologyToken, error) {
zeroToken := types.TopologyToken{}
if len(events) == 0 {
diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go
index 01dcff69..1b8632eb 100644
--- a/syncapi/storage/sqlite3/current_room_state_table.go
+++ b/syncapi/storage/sqlite3/current_room_state_table.go
@@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -268,7 +269,7 @@ func (s *currentRoomStateStatements) SelectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string,
stateFilter *synctypes.StateFilter,
excludeEventIDs []string,
-) ([]*gomatrixserverlib.HeaderedEvent, error) {
+) ([]*rstypes.HeaderedEvent, error) {
// We're going to query members later, so remove them from this request
if stateFilter.LazyLoadMembers && !stateFilter.IncludeRedundantMembers {
notTypes := &[]string{spec.MRoomMember}
@@ -319,7 +320,7 @@ func (s *currentRoomStateStatements) DeleteRoomStateForRoom(
func (s *currentRoomStateStatements) UpsertRoomState(
ctx context.Context, txn *sql.Tx,
- event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
+ event *rstypes.HeaderedEvent, membership *string, addedAt types.StreamPosition,
) error {
// Parse content as JSON and search for an "url" key
containsURL := false
@@ -405,8 +406,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@@ -421,8 +422,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
return events, nil
}
-func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
- result := []*gomatrixserverlib.HeaderedEvent{}
+func rowsToEvents(rows *sql.Rows) ([]*rstypes.HeaderedEvent, error) {
+ result := []*rstypes.HeaderedEvent{}
for rows.Next() {
var eventID string
var eventBytes []byte
@@ -430,8 +431,8 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result = append(result, &ev)
@@ -441,7 +442,7 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
func (s *currentRoomStateStatements) SelectStateEvent(
ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string,
-) (*gomatrixserverlib.HeaderedEvent, error) {
+) (*rstypes.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectStateEventStmt)
var res []byte
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
@@ -451,7 +452,7 @@ func (s *currentRoomStateStatements) SelectStateEvent(
if err != nil {
return nil, err
}
- var ev gomatrixserverlib.HeaderedEvent
+ var ev rstypes.HeaderedEvent
if err = json.Unmarshal(res, &ev); err != nil {
return nil, err
}
diff --git a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go
index d23f0756..f7ce6531 100644
--- a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go
+++ b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go
@@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
+ "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -91,7 +92,7 @@ func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gom
defer rows.Close() // nolint: errcheck
var eventBytes []byte
var roomID string
- var event gomatrixserverlib.HeaderedEvent
+ var event types.HeaderedEvent
var hisVis gomatrixserverlib.HistoryVisibility
historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility)
for rows.Next() {
diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go
index 19450099..347523cf 100644
--- a/syncapi/storage/sqlite3/invites_table.go
+++ b/syncapi/storage/sqlite3/invites_table.go
@@ -22,9 +22,9 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
)
const inviteEventsSchema = `
@@ -89,7 +89,7 @@ func NewSqliteInvitesTable(db *sql.DB, streamID *StreamIDStatements) (tables.Inv
}
func (s *inviteEventsStatements) InsertInviteEvent(
- ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent,
+ ctx context.Context, txn *sql.Tx, inviteEvent *rstypes.HeaderedEvent,
) (streamPos types.StreamPosition, err error) {
streamPos, err = s.streamIDStatements.nextInviteID(ctx, txn)
if err != nil {
@@ -130,7 +130,7 @@ func (s *inviteEventsStatements) DeleteInviteEvent(
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) SelectInviteEventsInRange(
ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
-) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
+) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error) {
var lastPos types.StreamPosition
stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt)
rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
@@ -138,8 +138,8 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
return nil, nil, lastPos, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
- result := map[string]*gomatrixserverlib.HeaderedEvent{}
- retired := map[string]*gomatrixserverlib.HeaderedEvent{}
+ result := map[string]*rstypes.HeaderedEvent{}
+ retired := map[string]*rstypes.HeaderedEvent{}
for rows.Next() {
var (
id types.StreamPosition
@@ -162,7 +162,7 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
continue
}
- var event *gomatrixserverlib.HeaderedEvent
+ var event *rstypes.HeaderedEvent
if err := json.Unmarshal(eventJSON, &event); err != nil {
return nil, nil, lastPos, err
}
diff --git a/syncapi/storage/sqlite3/memberships_table.go b/syncapi/storage/sqlite3/memberships_table.go
index 2cc46a10..c09fa151 100644
--- a/syncapi/storage/sqlite3/memberships_table.go
+++ b/syncapi/storage/sqlite3/memberships_table.go
@@ -19,9 +19,8 @@ import (
"database/sql"
"fmt"
- "github.com/matrix-org/gomatrixserverlib"
-
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -103,7 +102,7 @@ func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
}
func (s *membershipsStatements) UpsertMembership(
- ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
+ ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent,
streamPos, topologicalPos types.StreamPosition,
) error {
membership, err := event.Membership()
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index b5b6ea88..d63e7606 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -166,7 +167,7 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even
}.Prepare(db)
}
-func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error {
+func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent) error {
headeredJSON, err := json.Marshal(event)
if err != nil {
return err
@@ -249,8 +250,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, nil, err
}
needSet := stateNeeded[ev.RoomID()]
@@ -296,7 +297,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
// of the inserted event.
func (s *outputRoomEventsStatements) InsertEvent(
ctx context.Context, txn *sql.Tx,
- event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
+ event *rstypes.HeaderedEvent, addState, removeState []string,
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility,
) (types.StreamPosition, error) {
var txnID *string
@@ -498,8 +499,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@@ -523,7 +524,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
}
func (s *outputRoomEventsStatements) SelectContextEvent(
ctx context.Context, txn *sql.Tx, roomID, eventID string,
-) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
+) (id int, evt rstypes.HeaderedEvent, err error) {
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
var eventAsString string
var historyVisibility gomatrixserverlib.HistoryVisibility
@@ -540,7 +541,7 @@ func (s *outputRoomEventsStatements) SelectContextEvent(
func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter,
-) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
+) (evts []*rstypes.HeaderedEvent, err error) {
stmt, params, err := prepareWithFilters(
s.db, txn, selectContextBeforeEventSQL,
[]interface{}{
@@ -564,7 +565,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
for rows.Next() {
var (
eventBytes []byte
- evt *gomatrixserverlib.HeaderedEvent
+ evt *rstypes.HeaderedEvent
historyVisibility gomatrixserverlib.HistoryVisibility
)
if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
@@ -582,7 +583,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
func (s *outputRoomEventsStatements) SelectContextAfterEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter,
-) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
+) (lastID int, evts []*rstypes.HeaderedEvent, err error) {
stmt, params, err := prepareWithFilters(
s.db, txn, selectContextAfterEventSQL,
[]interface{}{
@@ -606,7 +607,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
for rows.Next() {
var (
eventBytes []byte
- evt *gomatrixserverlib.HeaderedEvent
+ evt *rstypes.HeaderedEvent
historyVisibility gomatrixserverlib.HistoryVisibility
)
if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
@@ -642,7 +643,7 @@ func (s *outputRoomEventsStatements) PurgeEvents(
return err
}
-func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
+func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]rstypes.HeaderedEvent, error) {
params := make([]interface{}, len(types)+1)
params[0] = afterID
for i := range types {
@@ -664,14 +665,14 @@ func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, l
var eventID string
var id int64
- result := make(map[int64]gomatrixserverlib.HeaderedEvent)
+ result := make(map[int64]rstypes.HeaderedEvent)
for rows.Next() {
- var ev gomatrixserverlib.HeaderedEvent
+ var ev rstypes.HeaderedEvent
var eventBytes []byte
if err = rows.Scan(&id, &eventID, &eventBytes); err != nil {
return nil, err
}
- if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ if err = json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result[id] = ev
diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go
index dc698de2..68b75f5b 100644
--- a/syncapi/storage/sqlite3/output_room_events_topology_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go
@@ -18,9 +18,8 @@ import (
"context"
"database/sql"
- "github.com/matrix-org/gomatrixserverlib"
-
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -104,7 +103,7 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
// insertEventInTopology inserts the given event in the room's topology, based
// on the event's depth.
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
- ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
+ ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, pos types.StreamPosition,
) (types.StreamPosition, error) {
_, err := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).ExecContext(
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index 2cc1378b..08ca99a7 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -10,6 +10,7 @@ import (
"testing"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -34,9 +35,9 @@ func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, fun
return db, close
}
-func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (positions []types.StreamPosition) {
+func MustWriteEvents(t *testing.T, db storage.Database, events []*rstypes.HeaderedEvent) (positions []types.StreamPosition) {
for _, ev := range events {
- var addStateEvents []*gomatrixserverlib.HeaderedEvent
+ var addStateEvents []*rstypes.HeaderedEvent
var addStateEventIDs []string
var removeStateEventIDs []string
if ev.StateKey() != nil {
@@ -106,7 +107,7 @@ func TestRecentEventsPDU(t *testing.T) {
To types.StreamPosition
Limit int
ReverseOrder bool
- WantEvents []*gomatrixserverlib.HeaderedEvent
+ WantEvents []*rstypes.HeaderedEvent
WantLimited bool
}{
// The purpose of this test is to make sure that incremental syncs are including up to the latest events.
@@ -316,7 +317,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
t.Parallel()
db := MustCreateDatabase(t)
- var events []*gomatrixserverlib.HeaderedEvent
+ var events []*types.HeaderedEvent
events = append(events, MustCreateEvent(t, testRoomID, nil, &gomatrixserverlib.EventBuilder{
Content: []byte(fmt.Sprintf(`{"room_version":"4","creator":"%s"}`, testUserIDA)),
Type: "m.room.create",
@@ -324,7 +325,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
Sender: testUserIDA,
Depth: int64(len(events) + 1),
}))
- events = append(events, MustCreateEvent(t, testRoomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
+ events = append(events, MustCreateEvent(t, testRoomID, []*types.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
Content: []byte(`{"membership":"join"}`),
Type: "m.room.member",
StateKey: &testUserIDA,
@@ -332,7 +333,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
Depth: int64(len(events) + 1),
}))
// fork the dag into three, same prev_events and depth
- parent := []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}
+ parent := []*types.HeaderedEvent{events[len(events)-1]}
depth := int64(len(events) + 1)
for i := 0; i < 3; i++ {
events = append(events, MustCreateEvent(t, testRoomID, parent, &gomatrixserverlib.EventBuilder{
@@ -365,7 +366,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
Name string
From types.TopologyToken
Limit int
- Wants []*gomatrixserverlib.HeaderedEvent
+ Wants []*types.HeaderedEvent
}{
{
Name: "Pagination over the whole fork",
@@ -406,7 +407,7 @@ func TestGetEventsInTopologicalRangeMultiRoom(t *testing.T) {
t.Parallel()
db := MustCreateDatabase(t)
- makeEvents := func(roomID string) (events []*gomatrixserverlib.HeaderedEvent) {
+ makeEvents := func(roomID string) (events []*types.HeaderedEvent) {
events = append(events, MustCreateEvent(t, roomID, nil, &gomatrixserverlib.EventBuilder{
Content: []byte(fmt.Sprintf(`{"room_version":"4","creator":"%s"}`, testUserIDA)),
Type: "m.room.create",
@@ -414,7 +415,7 @@ func TestGetEventsInTopologicalRangeMultiRoom(t *testing.T) {
Sender: testUserIDA,
Depth: int64(len(events) + 1),
}))
- events = append(events, MustCreateEvent(t, roomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
+ events = append(events, MustCreateEvent(t, roomID, []*types.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
Content: []byte(`{"membership":"join"}`),
Type: "m.room.member",
StateKey: &testUserIDA,
@@ -460,14 +461,14 @@ func TestGetEventsInRangeWithEventsInsertedLikeBackfill(t *testing.T) {
// "federation" join
userC := fmt.Sprintf("@radiance:%s", testOrigin)
- joinEvent := MustCreateEvent(t, testRoomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
+ joinEvent := MustCreateEvent(t, testRoomID, []*types.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
Content: []byte(`{"membership":"join"}`),
Type: "m.room.member",
StateKey: &userC,
Sender: userC,
Depth: int64(len(events) + 1),
})
- MustWriteEvents(t, db, []*gomatrixserverlib.HeaderedEvent{joinEvent})
+ MustWriteEvents(t, db, []*types.HeaderedEvent{joinEvent})
// Sync will return this for the prev_batch
from := topologyTokenBefore(t, db, joinEvent.EventID())
@@ -638,7 +639,7 @@ func TestInviteBehaviour(t *testing.T) {
StateKey: &testUserIDA,
Sender: "@inviteUser2:somewhere",
})
- for _, ev := range []*gomatrixserverlib.HeaderedEvent{inviteEvent1, inviteEvent2} {
+ for _, ev := range []*types.HeaderedEvent{inviteEvent1, inviteEvent2} {
_, err := db.AddInviteEvent(ctx, ev)
if err != nil {
t.Fatalf("Failed to AddInviteEvent: %s", err)
@@ -695,7 +696,7 @@ func assertInvitedToRooms(t *testing.T, res *types.Response, roomIDs []string) {
}
}
-func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []*gomatrixserverlib.HeaderedEvent) {
+func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []*types.HeaderedEvent) {
t.Helper()
if len(gots) != len(wants) {
t.Fatalf("%s response returned %d events, want %d", msg, len(gots), len(wants))
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 6384fb91..854292bd 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -35,11 +36,11 @@ type AccountData interface {
}
type Invites interface {
- InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error)
+ InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent *rstypes.HeaderedEvent) (streamPos types.StreamPosition, err error)
DeleteInviteEvent(ctx context.Context, txn *sql.Tx, inviteEventID string) (types.StreamPosition, error)
// SelectInviteEventsInRange returns a map of room ID to invite events. If multiple invite/retired invites exist in the given range, return the latest value
// for the room.
- SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*gomatrixserverlib.HeaderedEvent, retired map[string]*gomatrixserverlib.HeaderedEvent, maxID types.StreamPosition, err error)
+ SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*rstypes.HeaderedEvent, retired map[string]*rstypes.HeaderedEvent, maxID types.StreamPosition, err error)
SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error)
PurgeInvites(ctx context.Context, txn *sql.Tx, roomID string) error
}
@@ -59,7 +60,7 @@ type Events interface {
SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error)
InsertEvent(
ctx context.Context, txn *sql.Tx,
- event *gomatrixserverlib.HeaderedEvent,
+ event *rstypes.HeaderedEvent,
addState, removeState []string,
transactionID *api.TransactionID,
excludeFromSync bool,
@@ -70,16 +71,16 @@ type Events interface {
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, r types.Range, eventFilter *synctypes.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error)
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string, filter *synctypes.RoomEventFilter, preserveOrder bool) ([]types.StreamEvent, error)
- UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error
+ UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent) error
// DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.
DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error)
- SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
- SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
- SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
+ SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, rstypes.HeaderedEvent, error)
+ SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*rstypes.HeaderedEvent, error)
+ SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*rstypes.HeaderedEvent, error)
PurgeEvents(ctx context.Context, txn *sql.Tx, roomID string) error
- ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error)
+ ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) (map[int64]rstypes.HeaderedEvent, error)
}
// Topology keeps track of the depths and stream positions for all events.
@@ -87,7 +88,7 @@ type Events interface {
type Topology interface {
// InsertEventInTopology inserts the given event in the room's topology, based on the event's depth.
// `pos` is the stream position of this event in the events table, and is used to order events which have the same depth.
- InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error)
+ InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error)
// SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order.
// Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`.
// `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned.
@@ -101,13 +102,13 @@ type Topology interface {
}
type CurrentRoomState interface {
- SelectStateEvent(ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
+ SelectStateEvent(ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string) (*rstypes.HeaderedEvent, error)
SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
- UpsertRoomState(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition) error
+ UpsertRoomState(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, membership *string, addedAt types.StreamPosition) error
DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error
DeleteRoomStateForRoom(ctx context.Context, txn *sql.Tx, roomID string) error
// SelectCurrentState returns all the current state events for the given room.
- SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *synctypes.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
+ SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *synctypes.StateFilter, excludeEventIDs []string) ([]*rstypes.HeaderedEvent, error)
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error)
// SelectRoomIDsWithAnyMembership returns a map of all memberships for the given user.
@@ -191,7 +192,7 @@ type Receipts interface {
}
type Memberships interface {
- UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
+ UpsertMembership(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error)
SelectMembershipForUser(ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
PurgeMemberships(ctx context.Context, txn *sql.Tx, roomID string) error
diff --git a/syncapi/storage/tables/memberships_test.go b/syncapi/storage/tables/memberships_test.go
index 2e8375c3..4afa2ac5 100644
--- a/syncapi/storage/tables/memberships_test.go
+++ b/syncapi/storage/tables/memberships_test.go
@@ -6,10 +6,10 @@ import (
"testing"
"time"
- "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
@@ -47,7 +47,7 @@ func TestMembershipsTable(t *testing.T) {
room := test.NewRoom(t, alice)
// Create users
- var userEvents []*gomatrixserverlib.HeaderedEvent
+ var userEvents []*rstypes.HeaderedEvent
users := []string{alice.ID}
for _, x := range room.CurrentState() {
if x.StateKeyEquals(alice.ID) {
@@ -114,7 +114,7 @@ func testMembershipCount(t *testing.T, ctx context.Context, table tables.Members
})
}
-func testUpsert(t *testing.T, ctx context.Context, table tables.Memberships, membershipEvent *gomatrixserverlib.HeaderedEvent, user *test.User, room *test.Room) {
+func testUpsert(t *testing.T, ctx context.Context, table tables.Memberships, membershipEvent *rstypes.HeaderedEvent, user *test.User, room *test.Room) {
t.Run("upserting works as expected", func(t *testing.T) {
if err := table.UpsertMembership(ctx, nil, membershipEvent, 1, 1); err != nil {
t.Fatalf("failed to upsert membership: %s", err)
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 41c58481..723dd88f 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -8,6 +8,7 @@ import (
"github.com/matrix-org/dendrite/internal/caching"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -273,10 +274,21 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
recentStreamEvents := dbEvents[delta.RoomID].Events
limited := dbEvents[delta.RoomID].Limited
- recentEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
- snapshot.StreamEventsToEvents(device, recentStreamEvents),
+ hisVisMap := map[string]gomatrixserverlib.HistoryVisibility{}
+ for _, re := range recentStreamEvents {
+ hisVisMap[re.EventID()] = re.Visibility
+ }
+ recEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
+ toEvents(snapshot.StreamEventsToEvents(device, recentStreamEvents)),
gomatrixserverlib.TopologicalOrderByPrevEvents,
)
+ recentEvents := make([]*rstypes.HeaderedEvent, len(recEvents))
+ for i := range recEvents {
+ recentEvents[i] = &rstypes.HeaderedEvent{
+ Event: recEvents[i],
+ Visibility: hisVisMap[recEvents[i].EventID()],
+ }
+ }
// If we didn't return any events at all then don't bother doing anything else.
if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
@@ -341,10 +353,21 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// Now that we've filtered the timeline, work out which state events are still
// left. Anything that appears in the filtered timeline will be removed from the
// "state" section and kept in "timeline".
- delta.StateEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(
- removeDuplicates(delta.StateEvents, events),
+ hisVisMap = map[string]gomatrixserverlib.HistoryVisibility{}
+ for _, re := range delta.StateEvents {
+ hisVisMap[re.EventID()] = re.Visibility
+ }
+ sEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
+ toEvents(removeDuplicates(delta.StateEvents, events)),
gomatrixserverlib.TopologicalOrderByAuthEvents,
)
+ delta.StateEvents = make([]*rstypes.HeaderedEvent, len(sEvents))
+ for i := range sEvents {
+ delta.StateEvents[i] = &rstypes.HeaderedEvent{
+ Event: sEvents[i],
+ Visibility: hisVisMap[sEvents[i].EventID()],
+ }
+ }
if len(delta.StateEvents) > 0 {
if last := delta.StateEvents[len(delta.StateEvents)-1]; last != nil {
@@ -407,8 +430,8 @@ func applyHistoryVisibilityFilter(
snapshot storage.DatabaseTransaction,
rsAPI roomserverAPI.SyncRoomserverAPI,
roomID, userID string,
- recentEvents []*gomatrixserverlib.HeaderedEvent,
-) ([]*gomatrixserverlib.HeaderedEvent, error) {
+ recentEvents []*rstypes.HeaderedEvent,
+) ([]*rstypes.HeaderedEvent, error) {
// We need to make sure we always include the latest state events, if they are in the timeline.
alwaysIncludeIDs := make(map[string]struct{})
var stateTypes []string
@@ -555,8 +578,8 @@ func (p *PDUStreamProvider) lazyLoadMembers(
ctx context.Context, snapshot storage.DatabaseTransaction, roomID string,
incremental, limited bool, stateFilter *synctypes.StateFilter,
device *userapi.Device,
- timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent,
-) ([]*gomatrixserverlib.HeaderedEvent, error) {
+ timelineEvents, stateEvents []*rstypes.HeaderedEvent,
+) ([]*rstypes.HeaderedEvent, error) {
if len(timelineEvents) == 0 {
return stateEvents, nil
}
@@ -573,7 +596,7 @@ func (p *PDUStreamProvider) lazyLoadMembers(
}
}
// Preallocate with the same amount, even if it will end up with fewer values
- newStateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEvents))
+ newStateEvents := make([]*rstypes.HeaderedEvent, 0, len(stateEvents))
// Remove existing membership events we don't care about, e.g. users not in the timeline.events
for _, event := range stateEvents {
if event.Type() == spec.MRoomMember && event.StateKey() != nil {
@@ -633,7 +656,7 @@ func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, snapsho
return nil
}
-func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
+func removeDuplicates(stateEvents, recentEvents []*rstypes.HeaderedEvent) []*rstypes.HeaderedEvent {
for _, recentEv := range recentEvents {
if recentEv.StateKey() == nil {
continue // not a state event
@@ -654,3 +677,11 @@ func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEve
}
return stateEvents
}
+
+func toEvents(events []*rstypes.HeaderedEvent) []*gomatrixserverlib.Event {
+ result := make([]*gomatrixserverlib.Event, len(events))
+ for i := range events {
+ result[i] = events[i].Event
+ }
+ return result
+}
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go
index c120f882..bc766e66 100644
--- a/syncapi/syncapi_test.go
+++ b/syncapi/syncapi_test.go
@@ -19,6 +19,7 @@ import (
"github.com/nats-io/nats.go"
"github.com/tidwall/gjson"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/routing"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -490,7 +491,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
afterJoinBody := fmt.Sprintf("After join in a %s room", tc.historyVisibility)
msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": afterJoinBody})
- eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv)
+ eventsToSend = append([]*rstypes.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv)
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
@@ -523,7 +524,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
}
}
-func verifyEventVisible(t *testing.T, wantVisible bool, wantVisibleEvent *gomatrixserverlib.HeaderedEvent, chunk []synctypes.ClientEvent) {
+func verifyEventVisible(t *testing.T, wantVisible bool, wantVisibleEvent *rstypes.HeaderedEvent, chunk []synctypes.ClientEvent) {
t.Helper()
if wantVisible {
for _, ev := range chunk {
@@ -1228,7 +1229,7 @@ func syncUntil(t *testing.T,
}
}
-func toNATSMsgs(t *testing.T, cfg *config.Dendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
+func toNATSMsgs(t *testing.T, cfg *config.Dendrite, input ...*rstypes.HeaderedEvent) []*nats.Msg {
result := make([]*nats.Msg, len(input))
for i, ev := range input {
var addsStateIDs []string
diff --git a/syncapi/synctypes/clientevent.go b/syncapi/synctypes/clientevent.go
index 0bf04cdb..4b253f66 100644
--- a/syncapi/synctypes/clientevent.go
+++ b/syncapi/synctypes/clientevent.go
@@ -16,6 +16,7 @@
package synctypes
import (
+ "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
)
@@ -56,7 +57,7 @@ func ToClientEvents(serverEvs []*gomatrixserverlib.Event, format ClientEventForm
}
// HeaderedToClientEvents converts headered server events to client events.
-func HeaderedToClientEvents(serverEvs []*gomatrixserverlib.HeaderedEvent, format ClientEventFormat) []ClientEvent {
+func HeaderedToClientEvents(serverEvs []*types.HeaderedEvent, format ClientEventFormat) []ClientEvent {
evs := make([]ClientEvent, 0, len(serverEvs))
for _, se := range serverEvs {
if se == nil {
@@ -86,6 +87,6 @@ func ToClientEvent(se *gomatrixserverlib.Event, format ClientEventFormat) Client
}
// HeaderedToClientEvent converts a single headered server event to a client event.
-func HeaderedToClientEvent(se *gomatrixserverlib.HeaderedEvent, format ClientEventFormat) ClientEvent {
+func HeaderedToClientEvent(se *types.HeaderedEvent, format ClientEventFormat) ClientEvent {
return ToClientEvent(se.Event, format)
}
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index ed17eb56..983bf48a 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -26,6 +26,7 @@ import (
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/synctypes"
)
@@ -38,7 +39,7 @@ var (
type StateDelta struct {
RoomID string
- StateEvents []*gomatrixserverlib.HeaderedEvent
+ StateEvents []*types.HeaderedEvent
NewlyJoined bool
Membership string
// The PDU stream position of the latest membership event for this user, if applicable.
@@ -59,7 +60,7 @@ func NewStreamPositionFromString(s string) (StreamPosition, error) {
// StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event.
type StreamEvent struct {
- *gomatrixserverlib.HeaderedEvent
+ *types.HeaderedEvent
StreamPosition StreamPosition
TransactionID *api.TransactionID
ExcludeFromSync bool
@@ -538,7 +539,7 @@ type InviteResponse struct {
}
// NewInviteResponse creates an empty response with initialised arrays.
-func NewInviteResponse(event *gomatrixserverlib.HeaderedEvent) *InviteResponse {
+func NewInviteResponse(event *types.HeaderedEvent) *InviteResponse {
res := InviteResponse{}
res.InviteState.Events = []json.RawMessage{}
@@ -551,7 +552,7 @@ func NewInviteResponse(event *gomatrixserverlib.HeaderedEvent) *InviteResponse {
// Then we'll see if we can create a partial of the invite event itself.
// This is needed for clients to work out *who* sent the invite.
- inviteEvent := synctypes.ToClientEvent(event.Unwrap(), synctypes.FormatSync)
+ inviteEvent := synctypes.ToClientEvent(event.Event, synctypes.FormatSync)
inviteEvent.Unsigned = nil
if ev, err := json.Marshal(inviteEvent); err == nil {
res.InviteState.Events = append(res.InviteState.Events, ev)
diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go
index a864c6f4..60ff02bd 100644
--- a/syncapi/types/types_test.go
+++ b/syncapi/types/types_test.go
@@ -5,6 +5,7 @@ import (
"reflect"
"testing"
+ "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -55,7 +56,7 @@ func TestNewInviteResponse(t *testing.T) {
t.Fatal(err)
}
- res := NewInviteResponse(ev.Headered(gomatrixserverlib.RoomVersionV5))
+ res := NewInviteResponse(&types.HeaderedEvent{Event: ev})
j, err := json.Marshal(res)
if err != nil {
t.Fatal(err)