aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--keyserver/internal/device_list_update.go51
-rw-r--r--keyserver/internal/device_list_update_test.go10
-rw-r--r--sytest-whitelist1
3 files changed, 46 insertions, 16 deletions
diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go
index 85785b07..1c4f0b97 100644
--- a/keyserver/internal/device_list_update.go
+++ b/keyserver/internal/device_list_update.go
@@ -67,6 +67,11 @@ type DeviceListUpdater struct {
producer KeyChangeProducer
fedClient *gomatrixserverlib.FederationClient
workerChans []chan gomatrixserverlib.ServerName
+
+ // When device lists are stale for a user, they get inserted into this map with a channel which `Update` will
+ // block on or timeout via a select.
+ userIDToChan map[string]chan bool
+ userIDToChanMu *sync.Mutex
}
// DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater.
@@ -98,12 +103,14 @@ func NewDeviceListUpdater(
numWorkers int,
) *DeviceListUpdater {
return &DeviceListUpdater{
- userIDToMutex: make(map[string]*sync.Mutex),
- mu: &sync.Mutex{},
- db: db,
- producer: producer,
- fedClient: fedClient,
- workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers),
+ userIDToMutex: make(map[string]*sync.Mutex),
+ mu: &sync.Mutex{},
+ db: db,
+ producer: producer,
+ fedClient: fedClient,
+ workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers),
+ userIDToChan: make(map[string]chan bool),
+ userIDToChanMu: &sync.Mutex{},
}
}
@@ -137,6 +144,8 @@ func (u *DeviceListUpdater) mutex(userID string) *sync.Mutex {
return u.userIDToMutex[userID]
}
+// Update blocks until the update has been stored in the database. It blocks primarily for satisfying sytest,
+// which assumes when /send 200 OKs that the device lists have been updated.
func (u *DeviceListUpdater) Update(ctx context.Context, event gomatrixserverlib.DeviceListUpdateEvent) error {
isDeviceListStale, err := u.update(ctx, event)
if err != nil {
@@ -213,7 +222,35 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) {
hash := fnv.New32a()
_, _ = hash.Write([]byte(remoteServer))
index := int(hash.Sum32()) % len(u.workerChans)
+
+ ch := u.assignChannel(userID)
u.workerChans[index] <- remoteServer
+ select {
+ case <-ch:
+ case <-time.After(10 * time.Second):
+ // we don't return an error in this case as it's not a failure condition.
+ // we mainly block for the benefit of sytest anyway
+ }
+}
+
+func (u *DeviceListUpdater) assignChannel(userID string) chan bool {
+ u.userIDToChanMu.Lock()
+ defer u.userIDToChanMu.Unlock()
+ if ch, ok := u.userIDToChan[userID]; ok {
+ return ch
+ }
+ ch := make(chan bool)
+ u.userIDToChan[userID] = ch
+ return ch
+}
+
+func (u *DeviceListUpdater) clearChannel(userID string) {
+ u.userIDToChanMu.Lock()
+ defer u.userIDToChanMu.Unlock()
+ if ch, ok := u.userIDToChan[userID]; ok {
+ close(ch)
+ delete(u.userIDToChan, userID)
+ }
}
func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
@@ -285,6 +322,8 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
if err != nil {
logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it")
hasFailures = true
+ } else {
+ u.clearChannel(userID)
}
}
return hasFailures
diff --git a/keyserver/internal/device_list_update_test.go b/keyserver/internal/device_list_update_test.go
index b07148bb..dcb981c4 100644
--- a/keyserver/internal/device_list_update_test.go
+++ b/keyserver/internal/device_list_update_test.go
@@ -204,16 +204,6 @@ func TestUpdateNoPrevID(t *testing.T) {
if err != nil {
t.Fatalf("Update returned an error: %s", err)
}
- // At this point we show have this device list marked as stale and not store the keys or emitted anything
- if !db.staleUsers[event.UserID] {
- t.Errorf("%s not marked as stale", event.UserID)
- }
- if len(producer.events) > 0 {
- t.Errorf("Update incorrect emitted %d device change events", len(producer.events))
- }
- if len(db.storedKeys) > 0 {
- t.Errorf("Update incorrect stored %d device change events", len(db.storedKeys))
- }
t.Log("waiting for /users/devices to be called...")
wg.Wait()
// wait a bit for db to be updated...
diff --git a/sytest-whitelist b/sytest-whitelist
index 4d37b3ee..bbac6972 100644
--- a/sytest-whitelist
+++ b/sytest-whitelist
@@ -143,6 +143,7 @@ Device deletion propagates over federation
If remote user leaves room, changes device and rejoins we see update in sync
If remote user leaves room, changes device and rejoins we see update in /keys/changes
If remote user leaves room we no longer receive device updates
+If a device list update goes missing, the server resyncs on the next one
Get left notifs in sync and /keys/changes when other user leaves
Can query remote device keys using POST after notification
Can add account data