aboutsummaryrefslogtreecommitdiff
path: root/syncapi/streams
diff options
context:
space:
mode:
authorkegsay <kegan@matrix.org>2023-04-27 12:54:20 +0100
committerGitHub <noreply@github.com>2023-04-27 12:54:20 +0100
commitb189edf4f43ff34b69d6c60aeb0efb60dd549c86 (patch)
tree4c08aeda694f3e1cf17c66cf0e4b2b306af6a8df /syncapi/streams
parent2475cf4b61747e76a524af6f71a4eb7e112812af (diff)
Remove gmsl.HeaderedEvent (#3068)
Replaced with types.HeaderedEvent _for now_. In reality we want to move them all to gmsl.Event and only use HeaderedEvent when we _need_ to bundle the version/event ID with the event (seriailsation boundaries, and even then only when we don't have the room version). Requires https://github.com/matrix-org/gomatrixserverlib/pull/373
Diffstat (limited to 'syncapi/streams')
-rw-r--r--syncapi/streams/stream_pdu.go51
1 files changed, 41 insertions, 10 deletions
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 41c58481..723dd88f 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -8,6 +8,7 @@ import (
"github.com/matrix-org/dendrite/internal/caching"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -273,10 +274,21 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
recentStreamEvents := dbEvents[delta.RoomID].Events
limited := dbEvents[delta.RoomID].Limited
- recentEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
- snapshot.StreamEventsToEvents(device, recentStreamEvents),
+ hisVisMap := map[string]gomatrixserverlib.HistoryVisibility{}
+ for _, re := range recentStreamEvents {
+ hisVisMap[re.EventID()] = re.Visibility
+ }
+ recEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
+ toEvents(snapshot.StreamEventsToEvents(device, recentStreamEvents)),
gomatrixserverlib.TopologicalOrderByPrevEvents,
)
+ recentEvents := make([]*rstypes.HeaderedEvent, len(recEvents))
+ for i := range recEvents {
+ recentEvents[i] = &rstypes.HeaderedEvent{
+ Event: recEvents[i],
+ Visibility: hisVisMap[recEvents[i].EventID()],
+ }
+ }
// If we didn't return any events at all then don't bother doing anything else.
if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
@@ -341,10 +353,21 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// Now that we've filtered the timeline, work out which state events are still
// left. Anything that appears in the filtered timeline will be removed from the
// "state" section and kept in "timeline".
- delta.StateEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(
- removeDuplicates(delta.StateEvents, events),
+ hisVisMap = map[string]gomatrixserverlib.HistoryVisibility{}
+ for _, re := range delta.StateEvents {
+ hisVisMap[re.EventID()] = re.Visibility
+ }
+ sEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
+ toEvents(removeDuplicates(delta.StateEvents, events)),
gomatrixserverlib.TopologicalOrderByAuthEvents,
)
+ delta.StateEvents = make([]*rstypes.HeaderedEvent, len(sEvents))
+ for i := range sEvents {
+ delta.StateEvents[i] = &rstypes.HeaderedEvent{
+ Event: sEvents[i],
+ Visibility: hisVisMap[sEvents[i].EventID()],
+ }
+ }
if len(delta.StateEvents) > 0 {
if last := delta.StateEvents[len(delta.StateEvents)-1]; last != nil {
@@ -407,8 +430,8 @@ func applyHistoryVisibilityFilter(
snapshot storage.DatabaseTransaction,
rsAPI roomserverAPI.SyncRoomserverAPI,
roomID, userID string,
- recentEvents []*gomatrixserverlib.HeaderedEvent,
-) ([]*gomatrixserverlib.HeaderedEvent, error) {
+ recentEvents []*rstypes.HeaderedEvent,
+) ([]*rstypes.HeaderedEvent, error) {
// We need to make sure we always include the latest state events, if they are in the timeline.
alwaysIncludeIDs := make(map[string]struct{})
var stateTypes []string
@@ -555,8 +578,8 @@ func (p *PDUStreamProvider) lazyLoadMembers(
ctx context.Context, snapshot storage.DatabaseTransaction, roomID string,
incremental, limited bool, stateFilter *synctypes.StateFilter,
device *userapi.Device,
- timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent,
-) ([]*gomatrixserverlib.HeaderedEvent, error) {
+ timelineEvents, stateEvents []*rstypes.HeaderedEvent,
+) ([]*rstypes.HeaderedEvent, error) {
if len(timelineEvents) == 0 {
return stateEvents, nil
}
@@ -573,7 +596,7 @@ func (p *PDUStreamProvider) lazyLoadMembers(
}
}
// Preallocate with the same amount, even if it will end up with fewer values
- newStateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEvents))
+ newStateEvents := make([]*rstypes.HeaderedEvent, 0, len(stateEvents))
// Remove existing membership events we don't care about, e.g. users not in the timeline.events
for _, event := range stateEvents {
if event.Type() == spec.MRoomMember && event.StateKey() != nil {
@@ -633,7 +656,7 @@ func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, snapsho
return nil
}
-func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
+func removeDuplicates(stateEvents, recentEvents []*rstypes.HeaderedEvent) []*rstypes.HeaderedEvent {
for _, recentEv := range recentEvents {
if recentEv.StateKey() == nil {
continue // not a state event
@@ -654,3 +677,11 @@ func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEve
}
return stateEvents
}
+
+func toEvents(events []*rstypes.HeaderedEvent) []*gomatrixserverlib.Event {
+ result := make([]*gomatrixserverlib.Event, len(events))
+ for i := range events {
+ result[i] = events[i].Event
+ }
+ return result
+}