aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-05-13 12:14:50 +0100
committerGitHub <noreply@github.com>2020-05-13 12:14:50 +0100
commit5e9dce1c0c66736937eeddd5c33c92700d9a65a7 (patch)
treee0c40667cb17714d160fba2fbc64470eedf9ca7b /syncapi/consumers
parent31e6a7f1932c11d9b5b682ad06a5b8db9d74a44f (diff)
syncapi: Rename and split out tokens (#1025)
* syncapi: Rename and split out tokens Previously we used the badly named `PaginationToken` which was used for both `/sync` and `/messages` requests. This quickly became confusing because named fields like `PDUPosition` meant different things depending on the token type. Instead, we now have two token types: `TopologyToken` and `StreamingToken`, both of which have fields which make more sense for their specific situations. Updated the codebase to use one or the other. `PaginationToken` still lives on as `syncToken`, an unexported type which both tokens rely on. This allows us to guarantee that the specific mappings of positions to a string remain solely under the control of the `types` package. This enables us to move high-level conceptual things like "decrement this topological token" to function calls e.g `TopologicalToken.Decrement()`. Currently broken because `/messages` seemingly used both stream and topological tokens, though I need to confirm this. * final tweaks/hacks * spurious logging * Review comments and linting
Diffstat (limited to 'syncapi/consumers')
-rw-r--r--syncapi/consumers/clientapi.go2
-rw-r--r--syncapi/consumers/eduserver.go6
-rw-r--r--syncapi/consumers/roomserver.go4
3 files changed, 5 insertions, 7 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index f5b8c43e..b65d01a0 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -90,7 +90,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data")
}
- s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.PaginationToken{PDUPosition: pduPos})
+ s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.NewStreamToken(pduPos, 0))
return nil
}
diff --git a/syncapi/consumers/eduserver.go b/syncapi/consumers/eduserver.go
index 249452af..ece999d5 100644
--- a/syncapi/consumers/eduserver.go
+++ b/syncapi/consumers/eduserver.go
@@ -65,9 +65,7 @@ func (s *OutputTypingEventConsumer) Start() error {
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
s.notifier.OnNewEvent(
nil, roomID, nil,
- types.PaginationToken{
- EDUTypingPosition: types.StreamPosition(latestSyncPosition),
- },
+ types.NewStreamToken(0, types.StreamPosition(latestSyncPosition)),
)
})
@@ -96,6 +94,6 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
}
- s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.PaginationToken{EDUTypingPosition: typingPos})
+ s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.NewStreamToken(0, typingPos))
return nil
}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 987cc5df..368420a6 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -146,7 +146,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}).Panicf("roomserver output log: write event failure")
return nil
}
- s.notifier.OnNewEvent(&ev, "", nil, types.PaginationToken{PDUPosition: pduPos})
+ s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0))
return nil
}
@@ -164,7 +164,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure")
return nil
}
- s.notifier.OnNewEvent(&msg.Event, "", nil, types.PaginationToken{PDUPosition: pduPos})
+ s.notifier.OnNewEvent(&msg.Event, "", nil, types.NewStreamToken(pduPos, 0))
return nil
}