aboutsummaryrefslogtreecommitdiff
path: root/syncapi/streams
diff options
context:
space:
mode:
authorkegsay <kegan@matrix.org>2023-04-19 15:50:33 +0100
committerGitHub <noreply@github.com>2023-04-19 15:50:33 +0100
commit72285b2659a31ebd52c91799c17105d81d996f40 (patch)
tree1855395f5efdc3ea6051dd502882bf62aaa57e7c /syncapi/streams
parent9fa39263c0a4a8d349c8715f6ba30cae30b1b73a (diff)
refactor: update GMSL (#3058)
Sister PR to https://github.com/matrix-org/gomatrixserverlib/pull/364 Read this commit by commit to avoid going insane.
Diffstat (limited to 'syncapi/streams')
-rw-r--r--syncapi/streams/stream_accountdata.go7
-rw-r--r--syncapi/streams/stream_invite.go8
-rw-r--r--syncapi/streams/stream_pdu.go25
-rw-r--r--syncapi/streams/stream_presence.go6
-rw-r--r--syncapi/streams/stream_receipt.go8
-rw-r--r--syncapi/streams/stream_typing.go7
6 files changed, 30 insertions, 31 deletions
diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go
index 22953b8c..51f2a3d3 100644
--- a/syncapi/streams/stream_accountdata.go
+++ b/syncapi/streams/stream_accountdata.go
@@ -3,12 +3,11 @@ package streams
import (
"context"
- "github.com/matrix-org/gomatrixserverlib"
-
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib/spec"
)
type AccountDataStreamProvider struct {
@@ -85,7 +84,7 @@ func (p *AccountDataStreamProvider) IncrementalSync(
req.Response.AccountData.Events,
synctypes.ClientEvent{
Type: dataType,
- Content: gomatrixserverlib.RawJSON(globalData),
+ Content: spec.RawJSON(globalData),
},
)
}
@@ -99,7 +98,7 @@ func (p *AccountDataStreamProvider) IncrementalSync(
joinData.AccountData.Events,
synctypes.ClientEvent{
Type: dataType,
- Content: gomatrixserverlib.RawJSON(roomData),
+ Content: spec.RawJSON(roomData),
},
)
req.Response.Rooms.Join[roomID] = joinData
diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go
index a4414f31..becd863a 100644
--- a/syncapi/streams/stream_invite.go
+++ b/syncapi/streams/stream_invite.go
@@ -8,7 +8,7 @@ import (
"strconv"
"time"
- "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -79,7 +79,7 @@ func (p *InviteStreamProvider) IncrementalSync(
membership, _, err := snapshot.SelectMembershipForUser(ctx, roomID, req.Device.UserID, math.MaxInt64)
// Skip if the user is an existing member of the room.
// Otherwise, the NewLeaveResponse will eject the user from the room unintentionally
- if membership == gomatrixserverlib.Join ||
+ if membership == spec.Join ||
err != nil {
continue
}
@@ -89,12 +89,12 @@ func (p *InviteStreamProvider) IncrementalSync(
lr.Timeline.Events = append(lr.Timeline.Events, synctypes.ClientEvent{
// fake event ID which muxes in the to position
EventID: "$" + base64.RawURLEncoding.EncodeToString(h[:]),
- OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
+ OriginServerTS: spec.AsTimestamp(time.Now()),
RoomID: roomID,
Sender: req.Device.UserID,
StateKey: &req.Device.UserID,
Type: "m.room.member",
- Content: gomatrixserverlib.RawJSON(`{"membership":"leave"}`),
+ Content: spec.RawJSON(`{"membership":"leave"}`),
})
req.Response.Rooms.Leave[roomID] = lr
}
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index e29e29f7..41c58481 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -13,6 +13,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/gomatrixserverlib"
@@ -70,7 +71,7 @@ func (p *PDUStreamProvider) CompleteSync(
}
// Extract room state and recent events for all rooms the user is joined to.
- joinedRoomIDs, err := snapshot.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join)
+ joinedRoomIDs, err := snapshot.RoomIDsWithMembership(ctx, req.Device.UserID, spec.Join)
if err != nil {
req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed")
return from
@@ -110,7 +111,7 @@ func (p *PDUStreamProvider) CompleteSync(
continue
}
req.Response.Rooms.Join[roomID] = jr
- req.Rooms[roomID] = gomatrixserverlib.Join
+ req.Rooms[roomID] = spec.Join
}
// Add peeked rooms.
@@ -185,7 +186,7 @@ func (p *PDUStreamProvider) IncrementalSync(
}
for _, roomID := range syncJoinedRooms {
- req.Rooms[roomID] = gomatrixserverlib.Join
+ req.Rooms[roomID] = spec.Join
}
if len(stateDeltas) == 0 {
@@ -312,8 +313,8 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
hasMembershipChange := false
for _, recentEvent := range recentStreamEvents {
- if recentEvent.Type() == gomatrixserverlib.MRoomMember && recentEvent.StateKey() != nil {
- if membership, _ := recentEvent.Membership(); membership == gomatrixserverlib.Join {
+ if recentEvent.Type() == spec.MRoomMember && recentEvent.StateKey() != nil {
+ if membership, _ := recentEvent.Membership(); membership == spec.Join {
req.MembershipChanges[*recentEvent.StateKey()] = struct{}{}
}
hasMembershipChange = true
@@ -357,7 +358,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
switch delta.Membership {
- case gomatrixserverlib.Join:
+ case spec.Join:
jr := types.NewJoinResponse()
if hasMembershipChange {
jr.Summary, err = snapshot.GetRoomSummary(ctx, delta.RoomID, device.UserID)
@@ -373,7 +374,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
jr.State.Events = synctypes.HeaderedToClientEvents(delta.StateEvents, synctypes.FormatSync)
req.Response.Rooms.Join[delta.RoomID] = jr
- case gomatrixserverlib.Peek:
+ case spec.Peek:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
// TODO: Apply history visibility on peeked rooms
@@ -382,10 +383,10 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
jr.State.Events = synctypes.HeaderedToClientEvents(delta.StateEvents, synctypes.FormatSync)
req.Response.Rooms.Peek[delta.RoomID] = jr
- case gomatrixserverlib.Leave:
+ case spec.Leave:
fallthrough // transitions to leave are the same as ban
- case gomatrixserverlib.Ban:
+ case spec.Ban:
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = &prevBatch
lr.Timeline.Events = synctypes.HeaderedToClientEvents(events, synctypes.FormatSync)
@@ -527,7 +528,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
event := events[0]
// If this is the beginning of the room, we can't go back further. We're going to return
// the TopologyToken from the last event instead. (Synapse returns the /sync next_Batch)
- if event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("") {
+ if event.Type() == spec.MRoomCreate && event.StateKeyEquals("") {
event = events[len(events)-1]
}
backwardTopologyPos, backwardStreamPos, err = snapshot.PositionInTopology(ctx, event.EventID())
@@ -575,7 +576,7 @@ func (p *PDUStreamProvider) lazyLoadMembers(
newStateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEvents))
// Remove existing membership events we don't care about, e.g. users not in the timeline.events
for _, event := range stateEvents {
- if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil {
+ if event.Type() == spec.MRoomMember && event.StateKey() != nil {
// If this is a gapped incremental sync, we still want this membership
isGappedIncremental := limited && incremental
// We want this users membership event, keep it in the list
@@ -598,7 +599,7 @@ func (p *PDUStreamProvider) lazyLoadMembers(
// Query missing membership events
filter := synctypes.DefaultStateFilter()
filter.Senders = &wantUsers
- filter.Types = &[]string{gomatrixserverlib.MRoomMember}
+ filter.Types = &[]string{spec.MRoomMember}
memberships, err := snapshot.GetStateEventsForRoom(ctx, roomID, &filter)
if err != nil {
return stateEvents, err
diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go
index 81f33207..32424066 100644
--- a/syncapi/streams/stream_presence.go
+++ b/syncapi/streams/stream_presence.go
@@ -20,13 +20,13 @@ import (
"fmt"
"sync"
- "github.com/matrix-org/gomatrixserverlib"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib/spec"
)
type PresenceStreamProvider struct {
@@ -134,7 +134,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
req.Response.Presence.Events = append(req.Response.Presence.Events, synctypes.ClientEvent{
Content: content,
Sender: presence.UserID,
- Type: gomatrixserverlib.MPresence,
+ Type: spec.MPresence,
})
if presence.StreamPos > lastPos {
lastPos = presence.StreamPos
@@ -207,7 +207,7 @@ func membershipEventPresent(events []synctypes.ClientEvent, userID string) bool
for _, ev := range events {
// it's enough to know that we have our member event here, don't need to check membership content
// as it's implied by being in the respective section of the sync response.
- if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
+ if ev.Type == spec.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
// ignore e.g. join -> join changes
if gjson.GetBytes(ev.Unsigned, "prev_content.membership").Str == gjson.GetBytes(ev.Content, "membership").Str {
continue
diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go
index 88db0054..ed52dc5c 100644
--- a/syncapi/streams/stream_receipt.go
+++ b/syncapi/streams/stream_receipt.go
@@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
- "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -46,7 +46,7 @@ func (p *ReceiptStreamProvider) IncrementalSync(
) types.StreamPosition {
var joinedRooms []string
for roomID, membership := range req.Rooms {
- if membership == gomatrixserverlib.Join {
+ if membership == spec.Join {
joinedRooms = append(joinedRooms, roomID)
}
}
@@ -88,7 +88,7 @@ func (p *ReceiptStreamProvider) IncrementalSync(
}
ev := synctypes.ClientEvent{
- Type: gomatrixserverlib.MReceipt,
+ Type: spec.MReceipt,
}
content := make(map[string]ReceiptMRead)
for _, receipt := range receipts {
@@ -119,5 +119,5 @@ type ReceiptMRead struct {
}
type ReceiptTS struct {
- TS gomatrixserverlib.Timestamp `json:"ts"`
+ TS spec.Timestamp `json:"ts"`
}
diff --git a/syncapi/streams/stream_typing.go b/syncapi/streams/stream_typing.go
index b0e7d9e7..15500a47 100644
--- a/syncapi/streams/stream_typing.go
+++ b/syncapi/streams/stream_typing.go
@@ -4,12 +4,11 @@ import (
"context"
"encoding/json"
- "github.com/matrix-org/gomatrixserverlib"
-
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib/spec"
)
type TypingStreamProvider struct {
@@ -33,7 +32,7 @@ func (p *TypingStreamProvider) IncrementalSync(
) types.StreamPosition {
var err error
for roomID, membership := range req.Rooms {
- if membership != gomatrixserverlib.Join {
+ if membership != spec.Join {
continue
}
@@ -53,7 +52,7 @@ func (p *TypingStreamProvider) IncrementalSync(
}
}
ev := synctypes.ClientEvent{
- Type: gomatrixserverlib.MTyping,
+ Type: spec.MTyping,
}
ev.Content, err = json.Marshal(map[string]interface{}{
"user_ids": typingUsers,