aboutsummaryrefslogtreecommitdiff
path: root/syncapi/sync
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-06-01 17:50:19 +0100
committerGitHub <noreply@github.com>2020-06-01 17:50:19 +0100
commita5d822004dd93d6f6a7ed73371aeb4bfb163b5ba (patch)
tree76b10a79094415e45cfe5f2db9dfdf58cf0d2837 /syncapi/sync
parent1f43c24f8602dfbc95620e9d34fac78a7b449c11 (diff)
Send-to-device support (#1072)
* Groundwork for send-to-device messaging * Update sample config * Add unstable routing for now * Send to device consumer in sync API * Start the send-to-device consumer * fix indentation in dendrite-config.yaml * Create send-to-device database tables, other tweaks * Add some logic for send-to-device messages, add them into sync stream * Handle incoming send-to-device messages, count them with EDU stream pos * Undo changes to test * pq.Array * Fix sync * Logging * Fix a couple of transaction things, fix client API * Add send-to-device test, hopefully fix bugs * Comments * Refactor a bit * Fix schema * Fix queries * Debug logging * Fix storing and retrieving of send-to-device messages * Try to avoid database locks * Update sync position * Use latest sync position * Jiggle about sync a bit * Fix tests * Break out the retrieval from the update/delete behaviour * Comments * nolint on getResponseWithPDUsForCompleteSync * Try to line up sync tokens again * Implement wildcard * Add all send-to-device tests to whitelist, what could possibly go wrong? * Only care about wildcard when targeted locally * Deduplicate transactions * Handle tokens properly, return immediately if waiting send-to-device messages * Fix sync * Update sytest-whitelist * Fix copyright notice (need to do more of this) * Comments, copyrights * Return errors from Do, fix dendritejs * Review comments * Comments * Constructor for TransactionWriter * defletions * Update gomatrixserverlib, sytest-blacklist
Diffstat (limited to 'syncapi/sync')
-rw-r--r--syncapi/sync/notifier.go16
-rw-r--r--syncapi/sync/notifier_test.go2
-rw-r--r--syncapi/sync/requestpool.go71
3 files changed, 74 insertions, 15 deletions
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go
index 9b410a0c..325e7535 100644
--- a/syncapi/sync/notifier.go
+++ b/syncapi/sync/notifier.go
@@ -120,6 +120,18 @@ func (n *Notifier) OnNewEvent(
}
}
+func (n *Notifier) OnNewSendToDevice(
+ userID string, deviceIDs []string,
+ posUpdate types.StreamingToken,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+ latestPos := n.currPos.WithUpdates(posUpdate)
+ n.currPos = latestPos
+
+ n.wakeupUserDevice(userID, deviceIDs, latestPos)
+}
+
// GetListener returns a UserStreamListener that can be used to wait for
// updates for a user. Must be closed.
// notify for anything before sincePos
@@ -189,8 +201,8 @@ func (n *Notifier) wakeupUsers(userIDs []string, newPos types.StreamingToken) {
// wakeupUserDevice will wake up the sync stream for a specific user device. Other
// device streams will be left alone.
// nolint:unused
-func (n *Notifier) wakeupUserDevice(userDevices map[string]string, newPos types.StreamingToken) {
- for userID, deviceID := range userDevices {
+func (n *Notifier) wakeupUserDevice(userID string, deviceIDs []string, newPos types.StreamingToken) {
+ for _, deviceID := range deviceIDs {
if stream := n.fetchUserDeviceStream(userID, deviceID, false); stream != nil {
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
}
diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go
index 14ddef20..13231557 100644
--- a/syncapi/sync/notifier_test.go
+++ b/syncapi/sync/notifier_test.go
@@ -172,7 +172,7 @@ func TestCorrectStreamWakeup(t *testing.T) {
time.Sleep(1 * time.Second)
wake := "two"
- n.wakeupUserDevice(map[string]string{alice: wake}, syncPositionAfter)
+ n.wakeupUserDevice(alice, []string{wake}, syncPositionAfter)
if result := <-awoken; result != wake {
t.Fatalf("expected to wake %q, got %q", wake, result)
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index bd29b333..8b93cad4 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -1,4 +1,6 @@
// Copyright 2017 Vector Creations Ltd
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,6 +17,7 @@
package sync
import (
+ "context"
"net/http"
"time"
@@ -54,17 +57,18 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
JSON: jsonerror.Unknown(err.Error()),
}
}
+
logger := util.GetLogger(req.Context()).WithFields(log.Fields{
- "userID": device.UserID,
- "deviceID": device.ID,
- "since": syncReq.since,
- "timeout": syncReq.timeout,
- "limit": syncReq.limit,
+ "user_id": device.UserID,
+ "device_id": device.ID,
+ "since": syncReq.since,
+ "timeout": syncReq.timeout,
+ "limit": syncReq.limit,
})
currPos := rp.notifier.CurrentPosition()
- if shouldReturnImmediately(syncReq) {
+ if rp.shouldReturnImmediately(syncReq) {
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil {
logger.WithError(err).Error("rp.currentSyncForUser failed")
@@ -116,7 +120,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
// response. This ensures that we don't waste the hard work
// of calculating the sync only to get timed out before we
// can respond
-
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil {
logger.WithError(err).Error("rp.currentSyncForUser failed")
@@ -134,19 +137,59 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
}
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (res *types.Response, err error) {
+ res = types.NewResponse()
+
+ since := types.NewStreamToken(0, 0)
+ if req.since != nil {
+ since = *req.since
+ }
+
+ // See if we have any new tasks to do for the send-to-device messaging.
+ events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, since)
+ if err != nil {
+ return nil, err
+ }
+
// TODO: handle ignored users
if req.since == nil {
- res, err = rp.db.CompleteSync(req.ctx, req.device, req.limit)
+ res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit)
} else {
- res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit, req.wantFullState)
+ res, err = rp.db.IncrementalSync(req.ctx, res, req.device, *req.since, latestPos, req.limit, req.wantFullState)
}
-
if err != nil {
return
}
accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition(), &accountDataFilter)
+ if err != nil {
+ return
+ }
+
+ // Before we return the sync response, make sure that we take action on
+ // any send-to-device database updates or deletions that we need to do.
+ // Then add the updates into the sync response.
+ if len(updates) > 0 || len(deletions) > 0 {
+ // Handle the updates and deletions in the database.
+ err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, since)
+ if err != nil {
+ return
+ }
+ }
+ if len(events) > 0 {
+ // Add the updates into the sync response.
+ for _, event := range events {
+ res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent)
+ }
+
+ // Get the next_batch from the sync response and increase the
+ // EDU counter.
+ if pos, perr := types.NewStreamTokenFromString(res.NextBatch); perr == nil {
+ pos.Positions[1]++
+ res.NextBatch = pos.String()
+ }
+ }
+
return
}
@@ -238,6 +281,10 @@ func (rp *RequestPool) appendAccountData(
// shouldReturnImmediately returns whether the /sync request is an initial sync,
// or timeout=0, or full_state=true, in any of the cases the request should
// return immediately.
-func shouldReturnImmediately(syncReq *syncRequest) bool {
- return syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState
+func (rp *RequestPool) shouldReturnImmediately(syncReq *syncRequest) bool {
+ if syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState {
+ return true
+ }
+ waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.device.UserID, syncReq.device.ID)
+ return werr == nil && waiting
}