aboutsummaryrefslogtreecommitdiff
path: root/syncapi/streams
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2023-06-28 20:29:49 +0200
committerGitHub <noreply@github.com>2023-06-28 20:29:49 +0200
commit23cd7877a14bca5315467591cd47a7d51aec22ce (patch)
tree8176360a4366e7b4670839b6581b7ff728599a02 /syncapi/streams
parent4722f12fab65f3247cd253825d86206bfbfc6f95 (diff)
Add `MXIDMapping` for pseudoID rooms (#3112)
Add `MXIDMapping` on membership events when creating/joining rooms.
Diffstat (limited to 'syncapi/streams')
-rw-r--r--syncapi/streams/stream_pdu.go131
1 files changed, 130 insertions, 1 deletions
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 7939dd8f..1a4e5351 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -3,6 +3,7 @@ package streams
import (
"context"
"database/sql"
+ "encoding/json"
"fmt"
"time"
@@ -15,6 +16,8 @@ import (
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib/spec"
+ "github.com/tidwall/gjson"
+ "github.com/tidwall/sjson"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/gomatrixserverlib"
@@ -346,13 +349,40 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// Now that we've filtered the timeline, work out which state events are still
// left. Anything that appears in the filtered timeline will be removed from the
// "state" section and kept in "timeline".
+
+ // update the powerlevel event for timeline events
+ for i, ev := range events {
+ if ev.Version() != gomatrixserverlib.RoomVersionPseudoIDs {
+ continue
+ }
+ if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") {
+ continue
+ }
+ var newEvent gomatrixserverlib.PDU
+ newEvent, err = p.updatePowerLevelEvent(ctx, ev)
+ if err != nil {
+ return r.From, err
+ }
+ events[i] = &rstypes.HeaderedEvent{PDU: newEvent}
+ }
+
sEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
gomatrixserverlib.ToPDUs(removeDuplicates(delta.StateEvents, events)),
gomatrixserverlib.TopologicalOrderByAuthEvents,
)
delta.StateEvents = make([]*rstypes.HeaderedEvent, len(sEvents))
for i := range sEvents {
- delta.StateEvents[i] = sEvents[i].(*rstypes.HeaderedEvent)
+ ev := sEvents[i]
+ delta.StateEvents[i] = ev.(*rstypes.HeaderedEvent)
+ // update the powerlevel event for state events
+ if ev.Version() == gomatrixserverlib.RoomVersionPseudoIDs && ev.Type() == spec.MRoomPowerLevels && ev.StateKeyEquals("") {
+ var newEvent gomatrixserverlib.PDU
+ newEvent, err = p.updatePowerLevelEvent(ctx, ev.(*rstypes.HeaderedEvent))
+ if err != nil {
+ return r.From, err
+ }
+ delta.StateEvents[i] = &rstypes.HeaderedEvent{PDU: newEvent}
+ }
}
if len(delta.StateEvents) > 0 {
@@ -421,6 +451,75 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
return latestPosition, nil
}
+func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstypes.HeaderedEvent) (gomatrixserverlib.PDU, error) {
+ pls, err := gomatrixserverlib.NewPowerLevelContentFromEvent(ev)
+ if err != nil {
+ return nil, err
+ }
+ newPls := make(map[string]int64)
+ var userID *spec.UserID
+ for user, level := range pls.Users {
+ validRoomID, _ := spec.NewRoomID(ev.RoomID())
+ userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user))
+ if err != nil {
+ return nil, err
+ }
+ newPls[userID.String()] = level
+ }
+ var newPlBytes, newEv []byte
+ newPlBytes, err = json.Marshal(newPls)
+ if err != nil {
+ return nil, err
+ }
+ newEv, err = sjson.SetRawBytes(ev.JSON(), "content.users", newPlBytes)
+ if err != nil {
+ return nil, err
+ }
+
+ // do the same for prev content
+ prevContent := gjson.GetBytes(ev.JSON(), "unsigned.prev_content")
+ if !prevContent.Exists() {
+ var evNew gomatrixserverlib.PDU
+ evNew, err = gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON(newEv, false)
+ if err != nil {
+ return nil, err
+ }
+
+ return evNew, err
+ }
+ pls = gomatrixserverlib.PowerLevelContent{}
+ err = json.Unmarshal([]byte(prevContent.Raw), &pls)
+ if err != nil {
+ return nil, err
+ }
+
+ newPls = make(map[string]int64)
+ for user, level := range pls.Users {
+ validRoomID, _ := spec.NewRoomID(ev.RoomID())
+ userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user))
+ if err != nil {
+ return nil, err
+ }
+ newPls[userID.String()] = level
+ }
+ newPlBytes, err = json.Marshal(newPls)
+ if err != nil {
+ return nil, err
+ }
+ newEv, err = sjson.SetRawBytes(newEv, "unsigned.prev_content.users", newPlBytes)
+ if err != nil {
+ return nil, err
+ }
+
+ var evNew gomatrixserverlib.PDU
+ evNew, err = gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON(newEv, false)
+ if err != nil {
+ return nil, err
+ }
+
+ return evNew, err
+}
+
// applyHistoryVisibilityFilter gets the current room state and supplies it to ApplyHistoryVisibilityFilter, to make
// sure we always return the required events in the timeline.
func applyHistoryVisibilityFilter(
@@ -470,6 +569,7 @@ func applyHistoryVisibilityFilter(
return events, nil
}
+// nolint: gocyclo
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
ctx context.Context,
snapshot storage.DatabaseTransaction,
@@ -563,6 +663,35 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
prevBatch.Decrement()
}
+ // Update powerlevel events for timeline events
+ for i, ev := range events {
+ if ev.Version() != gomatrixserverlib.RoomVersionPseudoIDs {
+ continue
+ }
+ if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") {
+ continue
+ }
+ newEvent, err := p.updatePowerLevelEvent(ctx, ev)
+ if err != nil {
+ return nil, err
+ }
+ events[i] = &rstypes.HeaderedEvent{PDU: newEvent}
+ }
+ // Update powerlevel events for state events
+ for i, ev := range stateEvents {
+ if ev.Version() != gomatrixserverlib.RoomVersionPseudoIDs {
+ continue
+ }
+ if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") {
+ continue
+ }
+ newEvent, err := p.updatePowerLevelEvent(ctx, ev)
+ if err != nil {
+ return nil, err
+ }
+ stateEvents[i] = &rstypes.HeaderedEvent{PDU: newEvent}
+ }
+
jr.Timeline.PrevBatch = prevBatch
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)