aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-12-10 18:57:10 +0000
committerGitHub <noreply@github.com>2020-12-10 18:57:10 +0000
commit9c03b0a4fa38971dfe83bd135aefb3c482a18380 (patch)
treeba8e55b75c056adac1fc7c42fb6febb2b8565d36 /syncapi/consumers
parentbad81c028f090af0e1005076829db67d1a749a14 (diff)
Refactor sync tokens (#1628)
* Refactor sync tokens * Comment out broken notifier test * Update types, sytest-whitelist * More robust token checking * Remove New functions for streaming tokens * Export Logs in StreamingToken * Fix tests
Diffstat (limited to 'syncapi/consumers')
-rw-r--r--syncapi/consumers/clientapi.go2
-rw-r--r--syncapi/consumers/eduserver_receipts.go2
-rw-r--r--syncapi/consumers/eduserver_sendtodevice.go2
-rw-r--r--syncapi/consumers/eduserver_typing.go6
-rw-r--r--syncapi/consumers/keychange.go12
-rw-r--r--syncapi/consumers/roomserver.go12
6 files changed, 20 insertions, 16 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index 7070dd32..9883c6b0 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -92,7 +92,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data")
}
- s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.NewStreamToken(pduPos, 0, nil))
+ s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.StreamingToken{PDUPosition: pduPos})
return nil
}
diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go
index 3361e134..5c286cf0 100644
--- a/syncapi/consumers/eduserver_receipts.go
+++ b/syncapi/consumers/eduserver_receipts.go
@@ -88,7 +88,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
return err
}
// update stream position
- s.notifier.OnNewReceipt(types.NewStreamToken(0, streamPos, nil))
+ s.notifier.OnNewReceipt(types.StreamingToken{ReceiptPosition: streamPos})
return nil
}
diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go
index 07324fcd..0c3f52cd 100644
--- a/syncapi/consumers/eduserver_sendtodevice.go
+++ b/syncapi/consumers/eduserver_sendtodevice.go
@@ -107,7 +107,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
s.notifier.OnNewSendToDevice(
output.UserID,
[]string{output.DeviceID},
- types.NewStreamToken(0, streamPos, nil),
+ types.StreamingToken{SendToDevicePosition: streamPos},
)
return nil
diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go
index bdea606c..885e7fd1 100644
--- a/syncapi/consumers/eduserver_typing.go
+++ b/syncapi/consumers/eduserver_typing.go
@@ -66,7 +66,9 @@ func (s *OutputTypingEventConsumer) Start() error {
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
s.notifier.OnNewEvent(
nil, roomID, nil,
- types.NewStreamToken(0, types.StreamPosition(latestSyncPosition), nil),
+ types.StreamingToken{
+ TypingPosition: types.StreamPosition(latestSyncPosition),
+ },
)
})
@@ -95,6 +97,6 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
}
- s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.NewStreamToken(0, typingPos, nil))
+ s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.StreamingToken{TypingPosition: typingPos})
return nil
}
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 3fc6120d..0d82f7a5 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -114,12 +114,14 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
return err
}
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
- posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{
- syncinternal.DeviceListLogName: {
- Offset: msg.Offset,
- Partition: msg.Partition,
+ posUpdate := types.StreamingToken{
+ Logs: map[string]*types.LogPosition{
+ syncinternal.DeviceListLogName: {
+ Offset: msg.Offset,
+ Partition: msg.Partition,
+ },
},
- })
+ }
for userID := range queryRes.UserIDsToCount {
s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID)
}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 11d75a68..be84a281 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -181,7 +181,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return err
}
- s.notifier.OnNewEvent(ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
+ s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
@@ -220,7 +220,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return err
}
- s.notifier.OnNewEvent(ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
+ s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
@@ -269,7 +269,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure")
return nil
}
- s.notifier.OnNewEvent(msg.Event, "", nil, types.NewStreamToken(pduPos, 0, nil))
+ s.notifier.OnNewEvent(msg.Event, "", nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
@@ -287,7 +287,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
}
// Notify any active sync requests that the invite has been retired.
// Invites share the same stream counter as PDUs
- s.notifier.OnNewEvent(nil, "", []string{msg.TargetUserID}, types.NewStreamToken(sp, 0, nil))
+ s.notifier.OnNewEvent(nil, "", []string{msg.TargetUserID}, types.StreamingToken{PDUPosition: sp})
return nil
}
@@ -307,7 +307,7 @@ func (s *OutputRoomEventConsumer) onNewPeek(
// we need to wake up the users who might need to now be peeking into this room,
// so we send in a dummy event to trigger a wakeup
- s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.NewStreamToken(sp, 0, nil))
+ s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
return nil
}
@@ -327,7 +327,7 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
// we need to wake up the users who might need to now be peeking into this room,
// so we send in a dummy event to trigger a wakeup
- s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.NewStreamToken(sp, 0, nil))
+ s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
return nil
}