aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authordevonh <devon.dmytro@gmail.com>2023-06-14 14:23:46 +0000
committerGitHub <noreply@github.com>2023-06-14 14:23:46 +0000
commite4665979bfbe006368d55189f074e456fe19b198 (patch)
treee909d694a022478d0dbe3cc58ee8a2dc289bc969 /syncapi
parent7a2e325d1014d76188b47a011730a42443f3c174 (diff)
Merge SenderID & Per Room User Key work (#3109)
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/roomserver.go23
-rw-r--r--syncapi/internal/history_visibility.go6
-rw-r--r--syncapi/internal/keychange.go12
-rw-r--r--syncapi/internal/keychange_test.go2
-rw-r--r--syncapi/notifier/notifier.go9
-rw-r--r--syncapi/notifier/notifier_test.go2
-rw-r--r--syncapi/routing/context.go10
-rw-r--r--syncapi/routing/getevent.go18
-rw-r--r--syncapi/routing/memberships.go12
-rw-r--r--syncapi/routing/messages.go4
-rw-r--r--syncapi/routing/relations.go9
-rw-r--r--syncapi/routing/search.go24
-rw-r--r--syncapi/routing/search_test.go2
-rw-r--r--syncapi/storage/shared/storage_consumer.go15
-rw-r--r--syncapi/streams/stream_invite.go8
-rw-r--r--syncapi/streams/stream_pdu.go24
-rw-r--r--syncapi/syncapi_test.go2
-rw-r--r--syncapi/synctypes/clientevent.go16
18 files changed, 149 insertions, 49 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index c5f2db9c..d468dfc9 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -377,7 +377,11 @@ func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *rst
return sp, fmt.Errorf("unexpected nil state_key")
}
- userID, err := s.rsAPI.QueryUserIDForSender(ctx, ev.RoomID(), spec.SenderID(*ev.StateKey()))
+ validRoomID, err := spec.NewRoomID(ev.RoomID())
+ if err != nil {
+ return sp, err
+ }
+ userID, err := s.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(*ev.StateKey()))
if err != nil || userID == nil {
return sp, fmt.Errorf("failed getting userID for sender: %w", err)
}
@@ -404,7 +408,11 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
return
}
- userID, err := s.rsAPI.QueryUserIDForSender(ctx, msg.Event.RoomID(), spec.SenderID(*msg.Event.StateKey()))
+ validRoomID, err := spec.NewRoomID(msg.Event.RoomID())
+ if err != nil {
+ return
+ }
+ userID, err := s.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(*msg.Event.StateKey()))
if err != nil || userID == nil {
return
}
@@ -454,7 +462,16 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
// Notify any active sync requests that the invite has been retired.
s.inviteStream.Advance(pduPos)
- userID, err := s.rsAPI.QueryUserIDForSender(ctx, msg.RoomID, msg.TargetSenderID)
+ validRoomID, err := spec.NewRoomID(msg.RoomID)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "event_id": msg.EventID,
+ "room_id": msg.RoomID,
+ log.ErrorKey: err,
+ }).Errorf("roomID is invalid")
+ return
+ }
+ userID, err := s.rsAPI.QueryUserIDForSender(ctx, *validRoomID, msg.TargetSenderID)
if err != nil || userID == nil {
log.WithFields(log.Fields{
"event_id": msg.EventID,
diff --git a/syncapi/internal/history_visibility.go b/syncapi/internal/history_visibility.go
index ab1a7f83..ce6846ca 100644
--- a/syncapi/internal/history_visibility.go
+++ b/syncapi/internal/history_visibility.go
@@ -139,7 +139,11 @@ func ApplyHistoryVisibilityFilter(
if err != nil {
return nil, err
}
- senderID, err := rsAPI.QuerySenderIDForUser(ctx, ev.RoomID(), *user)
+ roomID, err := spec.NewRoomID(ev.RoomID())
+ if err != nil {
+ return nil, err
+ }
+ senderID, err := rsAPI.QuerySenderIDForUser(ctx, *roomID, *user)
if err == nil {
if ev.Type() == spec.MRoomMember && ev.StateKeyEquals(string(senderID)) {
eventsFiltered = append(eventsFiltered, ev)
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go
index f4b6ace5..24ffcc04 100644
--- a/syncapi/internal/keychange.go
+++ b/syncapi/internal/keychange.go
@@ -170,11 +170,15 @@ func TrackChangedUsers(
return nil, nil, err
}
for roomID, state := range stateRes.Rooms {
+ validRoomID, roomErr := spec.NewRoomID(roomID)
+ if roomErr != nil {
+ continue
+ }
for tuple, membership := range state {
if membership != spec.Join {
continue
}
- user, queryErr := rsAPI.QueryUserIDForSender(ctx, roomID, spec.SenderID(tuple.StateKey))
+ user, queryErr := rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(tuple.StateKey))
if queryErr != nil || user == nil {
continue
}
@@ -216,13 +220,17 @@ func TrackChangedUsers(
return nil, left, err
}
for roomID, state := range stateRes.Rooms {
+ validRoomID, err := spec.NewRoomID(roomID)
+ if err != nil {
+ continue
+ }
for tuple, membership := range state {
if membership != spec.Join {
continue
}
// new user who we weren't previously sharing rooms with
if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok {
- user, err := rsAPI.QueryUserIDForSender(ctx, roomID, spec.SenderID(tuple.StateKey))
+ user, err := rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(tuple.StateKey))
if err != nil || user == nil {
continue
}
diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go
index efa64147..3f5e990c 100644
--- a/syncapi/internal/keychange_test.go
+++ b/syncapi/internal/keychange_test.go
@@ -64,7 +64,7 @@ type mockRoomserverAPI struct {
roomIDToJoinedMembers map[string][]string
}
-func (s *mockRoomserverAPI) QueryUserIDForSender(ctx context.Context, roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+func (s *mockRoomserverAPI) QueryUserIDForSender(ctx context.Context, roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return spec.NewUserID(string(senderID), true)
}
diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go
index 4ee7c860..af8ab010 100644
--- a/syncapi/notifier/notifier.go
+++ b/syncapi/notifier/notifier.go
@@ -101,13 +101,20 @@ func (n *Notifier) OnNewEvent(
n._removeEmptyUserStreams()
if ev != nil {
+ validRoomID, err := spec.NewRoomID(ev.RoomID())
+ if err != nil {
+ log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
+ "Notifier.OnNewEvent: RoomID is invalid",
+ )
+ return
+ }
// Map this event's room_id to a list of joined users, and wake them up.
usersToNotify := n._joinedUsers(ev.RoomID())
// Map this event's room_id to a list of peeking devices, and wake them up.
peekingDevicesToNotify := n._peekingDevices(ev.RoomID())
// If this is an invite, also add in the invitee to this list.
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
- targetUserID, err := n.rsAPI.QueryUserIDForSender(context.Background(), ev.RoomID(), spec.SenderID(*ev.StateKey()))
+ targetUserID, err := n.rsAPI.QueryUserIDForSender(context.Background(), *validRoomID, spec.SenderID(*ev.StateKey()))
if err != nil {
log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
"Notifier.OnNewEvent: Failed to find the userID for this event",
diff --git a/syncapi/notifier/notifier_test.go b/syncapi/notifier/notifier_test.go
index 7076f713..f86301a0 100644
--- a/syncapi/notifier/notifier_test.go
+++ b/syncapi/notifier/notifier_test.go
@@ -109,7 +109,7 @@ func mustEqualPositions(t *testing.T, got, want types.StreamingToken) {
type TestRoomServer struct{ api.SyncRoomserverAPI }
-func (t *TestRoomServer) QueryUserIDForSender(ctx context.Context, roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+func (t *TestRoomServer) QueryUserIDForSender(ctx context.Context, roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return spec.NewUserID(string(senderID), true)
}
diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go
index 55fd3c5a..649d77b4 100644
--- a/syncapi/routing/context.go
+++ b/syncapi/routing/context.go
@@ -200,10 +200,10 @@ func Context(
}
}
- eventsBeforeClient := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsBeforeFiltered), synctypes.FormatAll, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ eventsBeforeClient := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsBeforeFiltered), synctypes.FormatAll, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
- eventsAfterClient := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsAfterFiltered), synctypes.FormatAll, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ eventsAfterClient := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsAfterFiltered), synctypes.FormatAll, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
@@ -211,7 +211,7 @@ func Context(
if filter.LazyLoadMembers {
allEvents := append(eventsBeforeFiltered, eventsAfterFiltered...)
allEvents = append(allEvents, &requestedEvent)
- evs := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(allEvents), synctypes.FormatAll, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ evs := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(allEvents), synctypes.FormatAll, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
newState, err = applyLazyLoadMembers(ctx, device, snapshot, roomID, evs, lazyLoadCache)
@@ -224,14 +224,14 @@ func Context(
}
}
- ev := synctypes.ToClientEventDefault(func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ ev := synctypes.ToClientEventDefault(func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
}, requestedEvent)
response := ContextRespsonse{
Event: &ev,
EventsAfter: eventsAfterClient,
EventsBefore: eventsBeforeClient,
- State: synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(newState), synctypes.FormatAll, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ State: synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(newState), synctypes.FormatAll, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
}),
}
diff --git a/syncapi/routing/getevent.go b/syncapi/routing/getevent.go
index de790e5c..09c2aef0 100644
--- a/syncapi/routing/getevent.go
+++ b/syncapi/routing/getevent.go
@@ -102,14 +102,28 @@ func GetEvent(
}
sender := spec.UserID{}
- senderUserID, err := rsAPI.QueryUserIDForSender(req.Context(), roomID, events[0].SenderID())
+ validRoomID, err := spec.NewRoomID(roomID)
+ if err != nil {
+ return util.JSONResponse{
+ Code: http.StatusBadRequest,
+ JSON: spec.BadJSON("roomID is invalid"),
+ }
+ }
+ senderUserID, err := rsAPI.QueryUserIDForSender(req.Context(), *validRoomID, events[0].SenderID())
if err == nil && senderUserID != nil {
sender = *senderUserID
}
sk := events[0].StateKey()
if sk != nil && *sk != "" {
- skUserID, err := rsAPI.QueryUserIDForSender(ctx, events[0].RoomID(), spec.SenderID(*events[0].StateKey()))
+ evRoomID, err := spec.NewRoomID(events[0].RoomID())
+ if err != nil {
+ return util.JSONResponse{
+ Code: http.StatusBadRequest,
+ JSON: spec.BadJSON("roomID is invalid"),
+ }
+ }
+ skUserID, err := rsAPI.QueryUserIDForSender(ctx, *evRoomID, spec.SenderID(*events[0].StateKey()))
if err == nil && skUserID != nil {
skString := skUserID.String()
sk = &skString
diff --git a/syncapi/routing/memberships.go b/syncapi/routing/memberships.go
index cf6769ba..5e5d0125 100644
--- a/syncapi/routing/memberships.go
+++ b/syncapi/routing/memberships.go
@@ -152,7 +152,15 @@ func GetMemberships(
}
}
- userID, err := rsAPI.QueryUserIDForSender(req.Context(), ev.RoomID(), ev.SenderID())
+ validRoomID, err := spec.NewRoomID(ev.RoomID())
+ if err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("roomID is invalid")
+ return util.JSONResponse{
+ Code: http.StatusInternalServerError,
+ JSON: spec.InternalServerError{},
+ }
+ }
+ userID, err := rsAPI.QueryUserIDForSender(req.Context(), *validRoomID, ev.SenderID())
if err != nil || userID == nil {
util.GetLogger(req.Context()).WithError(err).Error("rsAPI.QueryUserIDForSender failed")
return util.JSONResponse{
@@ -175,7 +183,7 @@ func GetMemberships(
}
return util.JSONResponse{
Code: http.StatusOK,
- JSON: getMembershipResponse{synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(result), synctypes.FormatAll, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ JSON: getMembershipResponse{synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(result), synctypes.FormatAll, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return rsAPI.QueryUserIDForSender(req.Context(), roomID, senderID)
})},
}
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index 6784a27b..937e20ad 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -273,7 +273,7 @@ func OnIncomingMessagesRequest(
JSON: spec.InternalServerError{},
}
}
- res.State = append(res.State, synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(membershipEvents), synctypes.FormatAll, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ res.State = append(res.State, synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(membershipEvents), synctypes.FormatAll, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return rsAPI.QueryUserIDForSender(req.Context(), roomID, senderID)
})...)
}
@@ -389,7 +389,7 @@ func (r *messagesReq) retrieveEvents(ctx context.Context, rsAPI api.SyncRoomserv
"events_before": len(events),
"events_after": len(filteredEvents),
}).Debug("applied history visibility (messages)")
- return synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(filteredEvents), synctypes.FormatAll, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ return synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(filteredEvents), synctypes.FormatAll, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
}), start, end, err
}
diff --git a/syncapi/routing/relations.go b/syncapi/routing/relations.go
index 6efa065a..17933b2f 100644
--- a/syncapi/routing/relations.go
+++ b/syncapi/routing/relations.go
@@ -110,19 +110,24 @@ func Relations(
return util.ErrorResponse(err)
}
+ validRoomID, err := spec.NewRoomID(roomID)
+ if err != nil {
+ return util.ErrorResponse(err)
+ }
+
// Convert the events into client events, and optionally filter based on the event
// type if it was specified.
res.Chunk = make([]synctypes.ClientEvent, 0, len(filteredEvents))
for _, event := range filteredEvents {
sender := spec.UserID{}
- userID, err := rsAPI.QueryUserIDForSender(req.Context(), event.RoomID(), event.SenderID())
+ userID, err := rsAPI.QueryUserIDForSender(req.Context(), *validRoomID, event.SenderID())
if err == nil && userID != nil {
sender = *userID
}
sk := event.StateKey()
if sk != nil && *sk != "" {
- skUserID, err := rsAPI.QueryUserIDForSender(req.Context(), event.RoomID(), spec.SenderID(*event.StateKey()))
+ skUserID, err := rsAPI.QueryUserIDForSender(req.Context(), *validRoomID, spec.SenderID(*event.StateKey()))
if err == nil && skUserID != nil {
skString := skUserID.String()
sk = &skString
diff --git a/syncapi/routing/search.go b/syncapi/routing/search.go
index 7d9182f4..d892b604 100644
--- a/syncapi/routing/search.go
+++ b/syncapi/routing/search.go
@@ -205,9 +205,14 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
profileInfos := make(map[string]ProfileInfoResponse)
for _, ev := range append(eventsBefore, eventsAfter...) {
- userID, queryErr := rsAPI.QueryUserIDForSender(req.Context(), ev.RoomID(), ev.SenderID())
+ validRoomID, roomErr := spec.NewRoomID(ev.RoomID())
+ if err != nil {
+ logrus.WithError(roomErr).WithField("room_id", ev.RoomID()).Warn("failed to query userprofile")
+ continue
+ }
+ userID, queryErr := rsAPI.QueryUserIDForSender(req.Context(), *validRoomID, ev.SenderID())
if queryErr != nil {
- logrus.WithError(queryErr).WithField("sender_id", event.SenderID()).Warn("failed to query userprofile")
+ logrus.WithError(queryErr).WithField("sender_id", ev.SenderID()).Warn("failed to query userprofile")
continue
}
@@ -231,14 +236,19 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
}
sender := spec.UserID{}
- userID, err := rsAPI.QueryUserIDForSender(req.Context(), event.RoomID(), event.SenderID())
+ validRoomID, roomErr := spec.NewRoomID(event.RoomID())
+ if err != nil {
+ logrus.WithError(roomErr).WithField("room_id", event.RoomID()).Warn("failed to query userprofile")
+ continue
+ }
+ userID, err := rsAPI.QueryUserIDForSender(req.Context(), *validRoomID, event.SenderID())
if err == nil && userID != nil {
sender = *userID
}
sk := event.StateKey()
if sk != nil && *sk != "" {
- skUserID, err := rsAPI.QueryUserIDForSender(req.Context(), event.RoomID(), spec.SenderID(*event.StateKey()))
+ skUserID, err := rsAPI.QueryUserIDForSender(req.Context(), *validRoomID, spec.SenderID(*event.StateKey()))
if err == nil && skUserID != nil {
skString := skUserID.String()
sk = &skString
@@ -248,10 +258,10 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
Context: SearchContextResponse{
Start: startToken.String(),
End: endToken.String(),
- EventsAfter: synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsAfter), synctypes.FormatSync, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ EventsAfter: synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsAfter), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return rsAPI.QueryUserIDForSender(req.Context(), roomID, senderID)
}),
- EventsBefore: synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsBefore), synctypes.FormatSync, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ EventsBefore: synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsBefore), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return rsAPI.QueryUserIDForSender(req.Context(), roomID, senderID)
}),
ProfileInfo: profileInfos,
@@ -272,7 +282,7 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
JSON: spec.InternalServerError{},
}
}
- stateForRooms[event.RoomID()] = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(state), synctypes.FormatSync, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ stateForRooms[event.RoomID()] = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(state), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return rsAPI.QueryUserIDForSender(req.Context(), roomID, senderID)
})
}
diff --git a/syncapi/routing/search_test.go b/syncapi/routing/search_test.go
index 5eb094ca..f6d7fb4e 100644
--- a/syncapi/routing/search_test.go
+++ b/syncapi/routing/search_test.go
@@ -25,7 +25,7 @@ import (
type FakeSyncRoomserverAPI struct{ rsapi.SyncRoomserverAPI }
-func (f *FakeSyncRoomserverAPI) QueryUserIDForSender(ctx context.Context, roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+func (f *FakeSyncRoomserverAPI) QueryUserIDForSender(ctx context.Context, roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return spec.NewUserID(string(senderID), true)
}
diff --git a/syncapi/storage/shared/storage_consumer.go b/syncapi/storage/shared/storage_consumer.go
index 799e3d16..1827218b 100644
--- a/syncapi/storage/shared/storage_consumer.go
+++ b/syncapi/storage/shared/storage_consumer.go
@@ -114,7 +114,14 @@ func (d *Database) StreamEventsToEvents(ctx context.Context, device *userapi.Dev
}).WithError(err).Warnf("Failed to add transaction ID to event")
continue
}
- deviceSenderID, err := rsAPI.QuerySenderIDForUser(ctx, in[i].RoomID(), *userID)
+ roomID, err := spec.NewRoomID(in[i].RoomID())
+ if err != nil {
+ logrus.WithFields(logrus.Fields{
+ "event_id": out[i].EventID(),
+ }).WithError(err).Warnf("Room ID is invalid")
+ continue
+ }
+ deviceSenderID, err := rsAPI.QuerySenderIDForUser(ctx, *roomID, *userID)
if err != nil {
logrus.WithFields(logrus.Fields{
"event_id": out[i].EventID(),
@@ -515,7 +522,11 @@ func getMembershipFromEvent(ctx context.Context, ev gomatrixserverlib.PDU, userI
if err != nil {
return "", ""
}
- senderID, err := rsAPI.QuerySenderIDForUser(ctx, ev.RoomID(), *fullUser)
+ roomID, err := spec.NewRoomID(ev.RoomID())
+ if err != nil {
+ return "", ""
+ }
+ senderID, err := rsAPI.QuerySenderIDForUser(ctx, *roomID, *fullUser)
if err != nil {
return "", ""
}
diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go
index 3a5badd9..7c29d84a 100644
--- a/syncapi/streams/stream_invite.go
+++ b/syncapi/streams/stream_invite.go
@@ -65,14 +65,18 @@ func (p *InviteStreamProvider) IncrementalSync(
for roomID, inviteEvent := range invites {
user := spec.UserID{}
- sender, err := p.rsAPI.QueryUserIDForSender(ctx, inviteEvent.RoomID(), inviteEvent.SenderID())
+ validRoomID, err := spec.NewRoomID(inviteEvent.RoomID())
+ if err != nil {
+ continue
+ }
+ sender, err := p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, inviteEvent.SenderID())
if err == nil && sender != nil {
user = *sender
}
sk := inviteEvent.StateKey()
if sk != nil && *sk != "" {
- skUserID, err := p.rsAPI.QueryUserIDForSender(ctx, inviteEvent.RoomID(), spec.SenderID(*inviteEvent.StateKey()))
+ skUserID, err := p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(*inviteEvent.StateKey()))
if err == nil && skUserID != nil {
skString := skUserID.String()
sk = &skString
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index f728d4ae..7939dd8f 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -376,13 +376,13 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
}
jr.Timeline.PrevBatch = &prevBatch
- jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
jr.Timeline.Limited = (limited && len(events) == len(recentEvents)) || delta.NewlyJoined
- jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
req.Response.Rooms.Join[delta.RoomID] = jr
@@ -391,11 +391,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
// TODO: Apply history visibility on peeked rooms
- jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(recentEvents), synctypes.FormatSync, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(recentEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
jr.Timeline.Limited = limited
- jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
req.Response.Rooms.Peek[delta.RoomID] = jr
@@ -406,13 +406,13 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
case spec.Ban:
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = &prevBatch
- lr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ lr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
lr.Timeline.Limited = limited && len(events) == len(recentEvents)
- lr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ lr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
req.Response.Rooms.Leave[delta.RoomID] = lr
@@ -564,13 +564,13 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
}
jr.Timeline.PrevBatch = prevBatch
- jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
jr.Timeline.Limited = limited && len(events) == len(recentEvents)
- jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(stateEvents), synctypes.FormatSync, func(roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+ jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(stateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
return jr, nil
@@ -585,6 +585,10 @@ func (p *PDUStreamProvider) lazyLoadMembers(
if len(timelineEvents) == 0 {
return stateEvents, nil
}
+ validRoomID, err := spec.NewRoomID(roomID)
+ if err != nil {
+ return nil, err
+ }
// Work out which memberships to include
timelineUsers := make(map[string]struct{})
if !incremental {
@@ -606,8 +610,8 @@ func (p *PDUStreamProvider) lazyLoadMembers(
isGappedIncremental := limited && incremental
// We want this users membership event, keep it in the list
userID := ""
- stateKeyUserID, err := p.rsAPI.QueryUserIDForSender(ctx, roomID, spec.SenderID(*event.StateKey()))
- if err == nil && stateKeyUserID != nil {
+ stateKeyUserID, queryErr := p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(*event.StateKey()))
+ if queryErr == nil && stateKeyUserID != nil {
userID = stateKeyUserID.String()
}
if _, ok := timelineUsers[userID]; ok || isGappedIncremental || userID == device.UserID {
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go
index b9f13c51..19815b79 100644
--- a/syncapi/syncapi_test.go
+++ b/syncapi/syncapi_test.go
@@ -40,7 +40,7 @@ type syncRoomserverAPI struct {
rooms []*test.Room
}
-func (s *syncRoomserverAPI) QueryUserIDForSender(ctx context.Context, roomID string, senderID spec.SenderID) (*spec.UserID, error) {
+func (s *syncRoomserverAPI) QueryUserIDForSender(ctx context.Context, roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return spec.NewUserID(string(senderID), true)
}
diff --git a/syncapi/synctypes/clientevent.go b/syncapi/synctypes/clientevent.go
index 433be39f..6f03d9ff 100644
--- a/syncapi/synctypes/clientevent.go
+++ b/syncapi/synctypes/clientevent.go
@@ -52,14 +52,18 @@ func ToClientEvents(serverEvs []gomatrixserverlib.PDU, format ClientEventFormat,
continue // TODO: shouldn't happen?
}
sender := spec.UserID{}
- userID, err := userIDForSender(se.RoomID(), se.SenderID())
+ validRoomID, err := spec.NewRoomID(se.RoomID())
+ if err != nil {
+ continue
+ }
+ userID, err := userIDForSender(*validRoomID, se.SenderID())
if err == nil && userID != nil {
sender = *userID
}
sk := se.StateKey()
if sk != nil && *sk != "" {
- skUserID, err := userIDForSender(se.RoomID(), spec.SenderID(*sk))
+ skUserID, err := userIDForSender(*validRoomID, spec.SenderID(*sk))
if err == nil && skUserID != nil {
skString := skUserID.String()
sk = &skString
@@ -95,14 +99,18 @@ func ToClientEvent(se gomatrixserverlib.PDU, format ClientEventFormat, sender sp
// It provides default logic for event.SenderID & event.StateKey -> userID conversions.
func ToClientEventDefault(userIDQuery spec.UserIDForSender, event gomatrixserverlib.PDU) ClientEvent {
sender := spec.UserID{}
- userID, err := userIDQuery(event.RoomID(), event.SenderID())
+ validRoomID, err := spec.NewRoomID(event.RoomID())
+ if err != nil {
+ return ClientEvent{}
+ }
+ userID, err := userIDQuery(*validRoomID, event.SenderID())
if err == nil && userID != nil {
sender = *userID
}
sk := event.StateKey()
if sk != nil && *sk != "" {
- skUserID, err := userIDQuery(event.RoomID(), spec.SenderID(*event.StateKey()))
+ skUserID, err := userIDQuery(*validRoomID, spec.SenderID(*event.StateKey()))
if err == nil && skUserID != nil {
skString := skUserID.String()
sk = &skString