diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2023-06-28 20:29:49 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-28 20:29:49 +0200 |
commit | 23cd7877a14bca5315467591cd47a7d51aec22ce (patch) | |
tree | 8176360a4366e7b4670839b6581b7ff728599a02 /syncapi | |
parent | 4722f12fab65f3247cd253825d86206bfbfc6f95 (diff) |
Add `MXIDMapping` for pseudoID rooms (#3112)
Add `MXIDMapping` on membership events when
creating/joining rooms.
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/consumers/roomserver.go | 72 | ||||
-rw-r--r-- | syncapi/routing/search_test.go | 1 | ||||
-rw-r--r-- | syncapi/storage/postgres/current_room_state_table.go | 4 | ||||
-rw-r--r-- | syncapi/storage/postgres/invites_table.go | 2 | ||||
-rw-r--r-- | syncapi/storage/postgres/memberships_table.go | 2 | ||||
-rw-r--r-- | syncapi/storage/postgres/output_room_events_table.go | 2 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/current_room_state_table.go | 4 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/invites_table.go | 2 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/memberships_table.go | 2 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/output_room_events_table.go | 2 | ||||
-rw-r--r-- | syncapi/storage/storage_test.go | 1 | ||||
-rw-r--r-- | syncapi/storage/tables/current_room_state_test.go | 8 | ||||
-rw-r--r-- | syncapi/storage/tables/memberships_test.go | 2 | ||||
-rw-r--r-- | syncapi/streams/stream_pdu.go | 131 |
14 files changed, 203 insertions, 32 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 90f9ff67..e6b5ddbb 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -256,16 +256,19 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( } } - pduPos, err := s.db.WriteEvent( - ctx, - ev, - addsStateEvents, - msg.AddsStateEventIDs, - msg.RemovesStateEventIDs, - msg.TransactionID, - false, - msg.HistoryVisibility, - ) + validRoomID, err := spec.NewRoomID(ev.RoomID()) + if err != nil { + return err + } + + userID, err := s.rsAPI.QueryUserIDForSender(ctx, *validRoomID, ev.SenderID()) + if err != nil { + return err + } + + ev.UserID = *userID + + pduPos, err := s.db.WriteEvent(ctx, ev, addsStateEvents, msg.AddsStateEventIDs, msg.RemovesStateEventIDs, msg.TransactionID, false, msg.HistoryVisibility) if err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ @@ -315,16 +318,19 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( // hack but until we have some better strategy for dealing with // old events in the sync API, this should at least prevent us // from confusing clients into thinking they've joined/left rooms. - pduPos, err := s.db.WriteEvent( - ctx, - ev, - []*rstypes.HeaderedEvent{}, - []string{}, // adds no state - []string{}, // removes no state - nil, // no transaction - ev.StateKey() != nil, // exclude from sync?, - msg.HistoryVisibility, - ) + + validRoomID, err := spec.NewRoomID(ev.RoomID()) + if err != nil { + return err + } + + userID, err := s.rsAPI.QueryUserIDForSender(ctx, *validRoomID, ev.SenderID()) + if err != nil { + return err + } + ev.UserID = *userID + + pduPos, err := s.db.WriteEvent(ctx, ev, []*rstypes.HeaderedEvent{}, []string{}, []string{}, nil, ev.StateKey() != nil, msg.HistoryVisibility) if err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ @@ -420,6 +426,8 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( return } + msg.Event.UserID = *userID + pduPos, err := s.db.AddInviteEvent(ctx, msg.Event) if err != nil { sentry.CaptureException(err) @@ -537,6 +545,7 @@ func (s *OutputRoomEventConsumer) onPurgeRoom( } func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent) (*rstypes.HeaderedEvent, error) { + event.StateKeyResolved = event.StateKey() if event.StateKey() == nil { return event, nil } @@ -556,6 +565,29 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent) return event, err } + validRoomID, err := spec.NewRoomID(event.RoomID()) + if err != nil { + return event, err + } + + if event.StateKey() != nil { + if *event.StateKey() != "" { + var sku *spec.UserID + sku, err = s.rsAPI.QueryUserIDForSender(s.ctx, *validRoomID, spec.SenderID(stateKey)) + if err == nil && sku != nil { + sKey := sku.String() + event.StateKeyResolved = &sKey + } + } + } + + userID, err := s.rsAPI.QueryUserIDForSender(s.ctx, *validRoomID, event.SenderID()) + if err != nil { + return event, err + } + + event.UserID = *userID + if prevEvent == nil || prevEvent.EventID() == event.EventID() { return event, nil } diff --git a/syncapi/routing/search_test.go b/syncapi/routing/search_test.go index f6d7fb4e..905a9a1a 100644 --- a/syncapi/routing/search_test.go +++ b/syncapi/routing/search_test.go @@ -230,6 +230,7 @@ func TestSearch(t *testing.T) { stateEvents = append(stateEvents, x) stateEventIDs = append(stateEventIDs, x.EventID()) } + x.StateKeyResolved = x.StateKey() sp, err = db.WriteEvent(processCtx.Context(), x, stateEvents, stateEventIDs, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared) assert.NoError(t, err) if x.Type() != "m.room.message" { diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index bfe5e9bd..112fa9d4 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -343,9 +343,9 @@ func (s *currentRoomStateStatements) UpsertRoomState( event.RoomID(), event.EventID(), event.Type(), - event.SenderID(), + event.UserID.String(), containsURL, - *event.StateKey(), + *event.StateKeyResolved, headeredJSON, membership, addedAt, diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go index 267209bb..7b8d2d73 100644 --- a/syncapi/storage/postgres/invites_table.go +++ b/syncapi/storage/postgres/invites_table.go @@ -101,7 +101,7 @@ func (s *inviteEventsStatements) InsertInviteEvent( ctx, inviteEvent.RoomID(), inviteEvent.EventID(), - *inviteEvent.StateKey(), + inviteEvent.UserID.String(), headeredJSON, ).Scan(&streamPos) return diff --git a/syncapi/storage/postgres/memberships_table.go b/syncapi/storage/postgres/memberships_table.go index 3905f9ab..09b47432 100644 --- a/syncapi/storage/postgres/memberships_table.go +++ b/syncapi/storage/postgres/memberships_table.go @@ -109,7 +109,7 @@ func (s *membershipsStatements) UpsertMembership( _, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext( ctx, event.RoomID(), - *event.StateKey(), + event.StateKeyResolved, membership, event.EventID(), streamPos, diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index e068afab..b58cf59f 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -407,7 +407,7 @@ func (s *outputRoomEventsStatements) InsertEvent( event.EventID(), headeredJSON, event.Type(), - event.SenderID(), + event.UserID.String(), containsURL, pq.StringArray(addState), pq.StringArray(removeState), diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index e432e483..3bd19b36 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -342,9 +342,9 @@ func (s *currentRoomStateStatements) UpsertRoomState( event.RoomID(), event.EventID(), event.Type(), - event.SenderID(), + event.UserID.String(), containsURL, - *event.StateKey(), + *event.StateKeyResolved, headeredJSON, membership, addedAt, diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go index 347523cf..7e0d895f 100644 --- a/syncapi/storage/sqlite3/invites_table.go +++ b/syncapi/storage/sqlite3/invites_table.go @@ -108,7 +108,7 @@ func (s *inviteEventsStatements) InsertInviteEvent( streamPos, inviteEvent.RoomID(), inviteEvent.EventID(), - *inviteEvent.StateKey(), + inviteEvent.UserID.String(), headeredJSON, ) return diff --git a/syncapi/storage/sqlite3/memberships_table.go b/syncapi/storage/sqlite3/memberships_table.go index c09fa151..a9e880d2 100644 --- a/syncapi/storage/sqlite3/memberships_table.go +++ b/syncapi/storage/sqlite3/memberships_table.go @@ -112,7 +112,7 @@ func (s *membershipsStatements) UpsertMembership( _, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext( ctx, event.RoomID(), - *event.StateKey(), + event.StateKeyResolved, membership, event.EventID(), streamPos, diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 5a47aec4..06c65419 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -348,7 +348,7 @@ func (s *outputRoomEventsStatements) InsertEvent( event.EventID(), headeredJSON, event.Type(), - event.SenderID(), + event.UserID.String(), containsURL, string(addStateJSON), string(removeStateJSON), diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index f56e44a3..f57b0d61 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -43,6 +43,7 @@ func MustWriteEvents(t *testing.T, db storage.Database, events []*rstypes.Header var addStateEventIDs []string var removeStateEventIDs []string if ev.StateKey() != nil { + ev.StateKeyResolved = ev.StateKey() addStateEvents = append(addStateEvents, ev) addStateEventIDs = append(addStateEventIDs, ev.EventID()) } diff --git a/syncapi/storage/tables/current_room_state_test.go b/syncapi/storage/tables/current_room_state_test.go index 7d4ec812..2df111a2 100644 --- a/syncapi/storage/tables/current_room_state_test.go +++ b/syncapi/storage/tables/current_room_state_test.go @@ -54,7 +54,13 @@ func TestCurrentRoomStateTable(t *testing.T) { events := room.CurrentState() err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error { for i, ev := range events { - err := tab.UpsertRoomState(ctx, txn, ev, nil, types.StreamPosition(i)) + ev.StateKeyResolved = ev.StateKey() + userID, err := spec.NewUserID(string(ev.SenderID()), true) + if err != nil { + return err + } + ev.UserID = *userID + err = tab.UpsertRoomState(ctx, txn, ev, nil, types.StreamPosition(i)) if err != nil { return fmt.Errorf("failed to UpsertRoomState: %w", err) } diff --git a/syncapi/storage/tables/memberships_test.go b/syncapi/storage/tables/memberships_test.go index 4afa2ac5..a421a977 100644 --- a/syncapi/storage/tables/memberships_test.go +++ b/syncapi/storage/tables/memberships_test.go @@ -80,6 +80,7 @@ func TestMembershipsTable(t *testing.T) { defer cancel() for _, ev := range userEvents { + ev.StateKeyResolved = ev.StateKey() if err := table.UpsertMembership(ctx, nil, ev, types.StreamPosition(ev.Depth()), 1); err != nil { t.Fatalf("failed to upsert membership: %s", err) } @@ -134,6 +135,7 @@ func testUpsert(t *testing.T, ctx context.Context, table tables.Memberships, mem ev := room.CreateAndInsert(t, user, spec.MRoomMember, map[string]interface{}{ "membership": spec.Join, }, test.WithStateKey(user.ID)) + ev.StateKeyResolved = ev.StateKey() // Insert the same event again, but with different positions, which should get updated if err = table.UpsertMembership(ctx, nil, ev, 2, 2); err != nil { t.Fatalf("failed to upsert membership: %s", err) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 7939dd8f..1a4e5351 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -3,6 +3,7 @@ package streams import ( "context" "database/sql" + "encoding/json" "fmt" "time" @@ -15,6 +16,8 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib/spec" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/gomatrixserverlib" @@ -346,13 +349,40 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( // Now that we've filtered the timeline, work out which state events are still // left. Anything that appears in the filtered timeline will be removed from the // "state" section and kept in "timeline". + + // update the powerlevel event for timeline events + for i, ev := range events { + if ev.Version() != gomatrixserverlib.RoomVersionPseudoIDs { + continue + } + if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") { + continue + } + var newEvent gomatrixserverlib.PDU + newEvent, err = p.updatePowerLevelEvent(ctx, ev) + if err != nil { + return r.From, err + } + events[i] = &rstypes.HeaderedEvent{PDU: newEvent} + } + sEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering( gomatrixserverlib.ToPDUs(removeDuplicates(delta.StateEvents, events)), gomatrixserverlib.TopologicalOrderByAuthEvents, ) delta.StateEvents = make([]*rstypes.HeaderedEvent, len(sEvents)) for i := range sEvents { - delta.StateEvents[i] = sEvents[i].(*rstypes.HeaderedEvent) + ev := sEvents[i] + delta.StateEvents[i] = ev.(*rstypes.HeaderedEvent) + // update the powerlevel event for state events + if ev.Version() == gomatrixserverlib.RoomVersionPseudoIDs && ev.Type() == spec.MRoomPowerLevels && ev.StateKeyEquals("") { + var newEvent gomatrixserverlib.PDU + newEvent, err = p.updatePowerLevelEvent(ctx, ev.(*rstypes.HeaderedEvent)) + if err != nil { + return r.From, err + } + delta.StateEvents[i] = &rstypes.HeaderedEvent{PDU: newEvent} + } } if len(delta.StateEvents) > 0 { @@ -421,6 +451,75 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( return latestPosition, nil } +func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstypes.HeaderedEvent) (gomatrixserverlib.PDU, error) { + pls, err := gomatrixserverlib.NewPowerLevelContentFromEvent(ev) + if err != nil { + return nil, err + } + newPls := make(map[string]int64) + var userID *spec.UserID + for user, level := range pls.Users { + validRoomID, _ := spec.NewRoomID(ev.RoomID()) + userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user)) + if err != nil { + return nil, err + } + newPls[userID.String()] = level + } + var newPlBytes, newEv []byte + newPlBytes, err = json.Marshal(newPls) + if err != nil { + return nil, err + } + newEv, err = sjson.SetRawBytes(ev.JSON(), "content.users", newPlBytes) + if err != nil { + return nil, err + } + + // do the same for prev content + prevContent := gjson.GetBytes(ev.JSON(), "unsigned.prev_content") + if !prevContent.Exists() { + var evNew gomatrixserverlib.PDU + evNew, err = gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON(newEv, false) + if err != nil { + return nil, err + } + + return evNew, err + } + pls = gomatrixserverlib.PowerLevelContent{} + err = json.Unmarshal([]byte(prevContent.Raw), &pls) + if err != nil { + return nil, err + } + + newPls = make(map[string]int64) + for user, level := range pls.Users { + validRoomID, _ := spec.NewRoomID(ev.RoomID()) + userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user)) + if err != nil { + return nil, err + } + newPls[userID.String()] = level + } + newPlBytes, err = json.Marshal(newPls) + if err != nil { + return nil, err + } + newEv, err = sjson.SetRawBytes(newEv, "unsigned.prev_content.users", newPlBytes) + if err != nil { + return nil, err + } + + var evNew gomatrixserverlib.PDU + evNew, err = gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON(newEv, false) + if err != nil { + return nil, err + } + + return evNew, err +} + // applyHistoryVisibilityFilter gets the current room state and supplies it to ApplyHistoryVisibilityFilter, to make // sure we always return the required events in the timeline. func applyHistoryVisibilityFilter( @@ -470,6 +569,7 @@ func applyHistoryVisibilityFilter( return events, nil } +// nolint: gocyclo func (p *PDUStreamProvider) getJoinResponseForCompleteSync( ctx context.Context, snapshot storage.DatabaseTransaction, @@ -563,6 +663,35 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( prevBatch.Decrement() } + // Update powerlevel events for timeline events + for i, ev := range events { + if ev.Version() != gomatrixserverlib.RoomVersionPseudoIDs { + continue + } + if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") { + continue + } + newEvent, err := p.updatePowerLevelEvent(ctx, ev) + if err != nil { + return nil, err + } + events[i] = &rstypes.HeaderedEvent{PDU: newEvent} + } + // Update powerlevel events for state events + for i, ev := range stateEvents { + if ev.Version() != gomatrixserverlib.RoomVersionPseudoIDs { + continue + } + if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") { + continue + } + newEvent, err := p.updatePowerLevelEvent(ctx, ev) + if err != nil { + return nil, err + } + stateEvents[i] = &rstypes.HeaderedEvent{PDU: newEvent} + } + jr.Timeline.PrevBatch = prevBatch jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID) |