aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2023-06-28 20:29:49 +0200
committerGitHub <noreply@github.com>2023-06-28 20:29:49 +0200
commit23cd7877a14bca5315467591cd47a7d51aec22ce (patch)
tree8176360a4366e7b4670839b6581b7ff728599a02 /syncapi
parent4722f12fab65f3247cd253825d86206bfbfc6f95 (diff)
Add `MXIDMapping` for pseudoID rooms (#3112)
Add `MXIDMapping` on membership events when creating/joining rooms.
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/roomserver.go72
-rw-r--r--syncapi/routing/search_test.go1
-rw-r--r--syncapi/storage/postgres/current_room_state_table.go4
-rw-r--r--syncapi/storage/postgres/invites_table.go2
-rw-r--r--syncapi/storage/postgres/memberships_table.go2
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go2
-rw-r--r--syncapi/storage/sqlite3/current_room_state_table.go4
-rw-r--r--syncapi/storage/sqlite3/invites_table.go2
-rw-r--r--syncapi/storage/sqlite3/memberships_table.go2
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go2
-rw-r--r--syncapi/storage/storage_test.go1
-rw-r--r--syncapi/storage/tables/current_room_state_test.go8
-rw-r--r--syncapi/storage/tables/memberships_test.go2
-rw-r--r--syncapi/streams/stream_pdu.go131
14 files changed, 203 insertions, 32 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 90f9ff67..e6b5ddbb 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -256,16 +256,19 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}
}
- pduPos, err := s.db.WriteEvent(
- ctx,
- ev,
- addsStateEvents,
- msg.AddsStateEventIDs,
- msg.RemovesStateEventIDs,
- msg.TransactionID,
- false,
- msg.HistoryVisibility,
- )
+ validRoomID, err := spec.NewRoomID(ev.RoomID())
+ if err != nil {
+ return err
+ }
+
+ userID, err := s.rsAPI.QueryUserIDForSender(ctx, *validRoomID, ev.SenderID())
+ if err != nil {
+ return err
+ }
+
+ ev.UserID = *userID
+
+ pduPos, err := s.db.WriteEvent(ctx, ev, addsStateEvents, msg.AddsStateEventIDs, msg.RemovesStateEventIDs, msg.TransactionID, false, msg.HistoryVisibility)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
@@ -315,16 +318,19 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
// hack but until we have some better strategy for dealing with
// old events in the sync API, this should at least prevent us
// from confusing clients into thinking they've joined/left rooms.
- pduPos, err := s.db.WriteEvent(
- ctx,
- ev,
- []*rstypes.HeaderedEvent{},
- []string{}, // adds no state
- []string{}, // removes no state
- nil, // no transaction
- ev.StateKey() != nil, // exclude from sync?,
- msg.HistoryVisibility,
- )
+
+ validRoomID, err := spec.NewRoomID(ev.RoomID())
+ if err != nil {
+ return err
+ }
+
+ userID, err := s.rsAPI.QueryUserIDForSender(ctx, *validRoomID, ev.SenderID())
+ if err != nil {
+ return err
+ }
+ ev.UserID = *userID
+
+ pduPos, err := s.db.WriteEvent(ctx, ev, []*rstypes.HeaderedEvent{}, []string{}, []string{}, nil, ev.StateKey() != nil, msg.HistoryVisibility)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
@@ -420,6 +426,8 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
return
}
+ msg.Event.UserID = *userID
+
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
if err != nil {
sentry.CaptureException(err)
@@ -537,6 +545,7 @@ func (s *OutputRoomEventConsumer) onPurgeRoom(
}
func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent) (*rstypes.HeaderedEvent, error) {
+ event.StateKeyResolved = event.StateKey()
if event.StateKey() == nil {
return event, nil
}
@@ -556,6 +565,29 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent)
return event, err
}
+ validRoomID, err := spec.NewRoomID(event.RoomID())
+ if err != nil {
+ return event, err
+ }
+
+ if event.StateKey() != nil {
+ if *event.StateKey() != "" {
+ var sku *spec.UserID
+ sku, err = s.rsAPI.QueryUserIDForSender(s.ctx, *validRoomID, spec.SenderID(stateKey))
+ if err == nil && sku != nil {
+ sKey := sku.String()
+ event.StateKeyResolved = &sKey
+ }
+ }
+ }
+
+ userID, err := s.rsAPI.QueryUserIDForSender(s.ctx, *validRoomID, event.SenderID())
+ if err != nil {
+ return event, err
+ }
+
+ event.UserID = *userID
+
if prevEvent == nil || prevEvent.EventID() == event.EventID() {
return event, nil
}
diff --git a/syncapi/routing/search_test.go b/syncapi/routing/search_test.go
index f6d7fb4e..905a9a1a 100644
--- a/syncapi/routing/search_test.go
+++ b/syncapi/routing/search_test.go
@@ -230,6 +230,7 @@ func TestSearch(t *testing.T) {
stateEvents = append(stateEvents, x)
stateEventIDs = append(stateEventIDs, x.EventID())
}
+ x.StateKeyResolved = x.StateKey()
sp, err = db.WriteEvent(processCtx.Context(), x, stateEvents, stateEventIDs, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared)
assert.NoError(t, err)
if x.Type() != "m.room.message" {
diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go
index bfe5e9bd..112fa9d4 100644
--- a/syncapi/storage/postgres/current_room_state_table.go
+++ b/syncapi/storage/postgres/current_room_state_table.go
@@ -343,9 +343,9 @@ func (s *currentRoomStateStatements) UpsertRoomState(
event.RoomID(),
event.EventID(),
event.Type(),
- event.SenderID(),
+ event.UserID.String(),
containsURL,
- *event.StateKey(),
+ *event.StateKeyResolved,
headeredJSON,
membership,
addedAt,
diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go
index 267209bb..7b8d2d73 100644
--- a/syncapi/storage/postgres/invites_table.go
+++ b/syncapi/storage/postgres/invites_table.go
@@ -101,7 +101,7 @@ func (s *inviteEventsStatements) InsertInviteEvent(
ctx,
inviteEvent.RoomID(),
inviteEvent.EventID(),
- *inviteEvent.StateKey(),
+ inviteEvent.UserID.String(),
headeredJSON,
).Scan(&streamPos)
return
diff --git a/syncapi/storage/postgres/memberships_table.go b/syncapi/storage/postgres/memberships_table.go
index 3905f9ab..09b47432 100644
--- a/syncapi/storage/postgres/memberships_table.go
+++ b/syncapi/storage/postgres/memberships_table.go
@@ -109,7 +109,7 @@ func (s *membershipsStatements) UpsertMembership(
_, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
ctx,
event.RoomID(),
- *event.StateKey(),
+ event.StateKeyResolved,
membership,
event.EventID(),
streamPos,
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index e068afab..b58cf59f 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -407,7 +407,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
event.EventID(),
headeredJSON,
event.Type(),
- event.SenderID(),
+ event.UserID.String(),
containsURL,
pq.StringArray(addState),
pq.StringArray(removeState),
diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go
index e432e483..3bd19b36 100644
--- a/syncapi/storage/sqlite3/current_room_state_table.go
+++ b/syncapi/storage/sqlite3/current_room_state_table.go
@@ -342,9 +342,9 @@ func (s *currentRoomStateStatements) UpsertRoomState(
event.RoomID(),
event.EventID(),
event.Type(),
- event.SenderID(),
+ event.UserID.String(),
containsURL,
- *event.StateKey(),
+ *event.StateKeyResolved,
headeredJSON,
membership,
addedAt,
diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go
index 347523cf..7e0d895f 100644
--- a/syncapi/storage/sqlite3/invites_table.go
+++ b/syncapi/storage/sqlite3/invites_table.go
@@ -108,7 +108,7 @@ func (s *inviteEventsStatements) InsertInviteEvent(
streamPos,
inviteEvent.RoomID(),
inviteEvent.EventID(),
- *inviteEvent.StateKey(),
+ inviteEvent.UserID.String(),
headeredJSON,
)
return
diff --git a/syncapi/storage/sqlite3/memberships_table.go b/syncapi/storage/sqlite3/memberships_table.go
index c09fa151..a9e880d2 100644
--- a/syncapi/storage/sqlite3/memberships_table.go
+++ b/syncapi/storage/sqlite3/memberships_table.go
@@ -112,7 +112,7 @@ func (s *membershipsStatements) UpsertMembership(
_, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
ctx,
event.RoomID(),
- *event.StateKey(),
+ event.StateKeyResolved,
membership,
event.EventID(),
streamPos,
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index 5a47aec4..06c65419 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -348,7 +348,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
event.EventID(),
headeredJSON,
event.Type(),
- event.SenderID(),
+ event.UserID.String(),
containsURL,
string(addStateJSON),
string(removeStateJSON),
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index f56e44a3..f57b0d61 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -43,6 +43,7 @@ func MustWriteEvents(t *testing.T, db storage.Database, events []*rstypes.Header
var addStateEventIDs []string
var removeStateEventIDs []string
if ev.StateKey() != nil {
+ ev.StateKeyResolved = ev.StateKey()
addStateEvents = append(addStateEvents, ev)
addStateEventIDs = append(addStateEventIDs, ev.EventID())
}
diff --git a/syncapi/storage/tables/current_room_state_test.go b/syncapi/storage/tables/current_room_state_test.go
index 7d4ec812..2df111a2 100644
--- a/syncapi/storage/tables/current_room_state_test.go
+++ b/syncapi/storage/tables/current_room_state_test.go
@@ -54,7 +54,13 @@ func TestCurrentRoomStateTable(t *testing.T) {
events := room.CurrentState()
err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error {
for i, ev := range events {
- err := tab.UpsertRoomState(ctx, txn, ev, nil, types.StreamPosition(i))
+ ev.StateKeyResolved = ev.StateKey()
+ userID, err := spec.NewUserID(string(ev.SenderID()), true)
+ if err != nil {
+ return err
+ }
+ ev.UserID = *userID
+ err = tab.UpsertRoomState(ctx, txn, ev, nil, types.StreamPosition(i))
if err != nil {
return fmt.Errorf("failed to UpsertRoomState: %w", err)
}
diff --git a/syncapi/storage/tables/memberships_test.go b/syncapi/storage/tables/memberships_test.go
index 4afa2ac5..a421a977 100644
--- a/syncapi/storage/tables/memberships_test.go
+++ b/syncapi/storage/tables/memberships_test.go
@@ -80,6 +80,7 @@ func TestMembershipsTable(t *testing.T) {
defer cancel()
for _, ev := range userEvents {
+ ev.StateKeyResolved = ev.StateKey()
if err := table.UpsertMembership(ctx, nil, ev, types.StreamPosition(ev.Depth()), 1); err != nil {
t.Fatalf("failed to upsert membership: %s", err)
}
@@ -134,6 +135,7 @@ func testUpsert(t *testing.T, ctx context.Context, table tables.Memberships, mem
ev := room.CreateAndInsert(t, user, spec.MRoomMember, map[string]interface{}{
"membership": spec.Join,
}, test.WithStateKey(user.ID))
+ ev.StateKeyResolved = ev.StateKey()
// Insert the same event again, but with different positions, which should get updated
if err = table.UpsertMembership(ctx, nil, ev, 2, 2); err != nil {
t.Fatalf("failed to upsert membership: %s", err)
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 7939dd8f..1a4e5351 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -3,6 +3,7 @@ package streams
import (
"context"
"database/sql"
+ "encoding/json"
"fmt"
"time"
@@ -15,6 +16,8 @@ import (
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib/spec"
+ "github.com/tidwall/gjson"
+ "github.com/tidwall/sjson"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/gomatrixserverlib"
@@ -346,13 +349,40 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// Now that we've filtered the timeline, work out which state events are still
// left. Anything that appears in the filtered timeline will be removed from the
// "state" section and kept in "timeline".
+
+ // update the powerlevel event for timeline events
+ for i, ev := range events {
+ if ev.Version() != gomatrixserverlib.RoomVersionPseudoIDs {
+ continue
+ }
+ if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") {
+ continue
+ }
+ var newEvent gomatrixserverlib.PDU
+ newEvent, err = p.updatePowerLevelEvent(ctx, ev)
+ if err != nil {
+ return r.From, err
+ }
+ events[i] = &rstypes.HeaderedEvent{PDU: newEvent}
+ }
+
sEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
gomatrixserverlib.ToPDUs(removeDuplicates(delta.StateEvents, events)),
gomatrixserverlib.TopologicalOrderByAuthEvents,
)
delta.StateEvents = make([]*rstypes.HeaderedEvent, len(sEvents))
for i := range sEvents {
- delta.StateEvents[i] = sEvents[i].(*rstypes.HeaderedEvent)
+ ev := sEvents[i]
+ delta.StateEvents[i] = ev.(*rstypes.HeaderedEvent)
+ // update the powerlevel event for state events
+ if ev.Version() == gomatrixserverlib.RoomVersionPseudoIDs && ev.Type() == spec.MRoomPowerLevels && ev.StateKeyEquals("") {
+ var newEvent gomatrixserverlib.PDU
+ newEvent, err = p.updatePowerLevelEvent(ctx, ev.(*rstypes.HeaderedEvent))
+ if err != nil {
+ return r.From, err
+ }
+ delta.StateEvents[i] = &rstypes.HeaderedEvent{PDU: newEvent}
+ }
}
if len(delta.StateEvents) > 0 {
@@ -421,6 +451,75 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
return latestPosition, nil
}
+func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstypes.HeaderedEvent) (gomatrixserverlib.PDU, error) {
+ pls, err := gomatrixserverlib.NewPowerLevelContentFromEvent(ev)
+ if err != nil {
+ return nil, err
+ }
+ newPls := make(map[string]int64)
+ var userID *spec.UserID
+ for user, level := range pls.Users {
+ validRoomID, _ := spec.NewRoomID(ev.RoomID())
+ userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user))
+ if err != nil {
+ return nil, err
+ }
+ newPls[userID.String()] = level
+ }
+ var newPlBytes, newEv []byte
+ newPlBytes, err = json.Marshal(newPls)
+ if err != nil {
+ return nil, err
+ }
+ newEv, err = sjson.SetRawBytes(ev.JSON(), "content.users", newPlBytes)
+ if err != nil {
+ return nil, err
+ }
+
+ // do the same for prev content
+ prevContent := gjson.GetBytes(ev.JSON(), "unsigned.prev_content")
+ if !prevContent.Exists() {
+ var evNew gomatrixserverlib.PDU
+ evNew, err = gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON(newEv, false)
+ if err != nil {
+ return nil, err
+ }
+
+ return evNew, err
+ }
+ pls = gomatrixserverlib.PowerLevelContent{}
+ err = json.Unmarshal([]byte(prevContent.Raw), &pls)
+ if err != nil {
+ return nil, err
+ }
+
+ newPls = make(map[string]int64)
+ for user, level := range pls.Users {
+ validRoomID, _ := spec.NewRoomID(ev.RoomID())
+ userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user))
+ if err != nil {
+ return nil, err
+ }
+ newPls[userID.String()] = level
+ }
+ newPlBytes, err = json.Marshal(newPls)
+ if err != nil {
+ return nil, err
+ }
+ newEv, err = sjson.SetRawBytes(newEv, "unsigned.prev_content.users", newPlBytes)
+ if err != nil {
+ return nil, err
+ }
+
+ var evNew gomatrixserverlib.PDU
+ evNew, err = gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON(newEv, false)
+ if err != nil {
+ return nil, err
+ }
+
+ return evNew, err
+}
+
// applyHistoryVisibilityFilter gets the current room state and supplies it to ApplyHistoryVisibilityFilter, to make
// sure we always return the required events in the timeline.
func applyHistoryVisibilityFilter(
@@ -470,6 +569,7 @@ func applyHistoryVisibilityFilter(
return events, nil
}
+// nolint: gocyclo
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
ctx context.Context,
snapshot storage.DatabaseTransaction,
@@ -563,6 +663,35 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
prevBatch.Decrement()
}
+ // Update powerlevel events for timeline events
+ for i, ev := range events {
+ if ev.Version() != gomatrixserverlib.RoomVersionPseudoIDs {
+ continue
+ }
+ if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") {
+ continue
+ }
+ newEvent, err := p.updatePowerLevelEvent(ctx, ev)
+ if err != nil {
+ return nil, err
+ }
+ events[i] = &rstypes.HeaderedEvent{PDU: newEvent}
+ }
+ // Update powerlevel events for state events
+ for i, ev := range stateEvents {
+ if ev.Version() != gomatrixserverlib.RoomVersionPseudoIDs {
+ continue
+ }
+ if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") {
+ continue
+ }
+ newEvent, err := p.updatePowerLevelEvent(ctx, ev)
+ if err != nil {
+ return nil, err
+ }
+ stateEvents[i] = &rstypes.HeaderedEvent{PDU: newEvent}
+ }
+
jr.Timeline.PrevBatch = prevBatch
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)