aboutsummaryrefslogtreecommitdiff
path: root/keyserver
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-08-12 13:50:54 +0100
committerGitHub <noreply@github.com>2020-08-12 13:50:54 +0100
commitd98ec12422c8498cf710bb34d2ed31f024aa1e15 (patch)
tree9964a1783aa361c3cdd117ba246342b55ca016eb /keyserver
parent0835107f5b14ec408810da4f73da94200e19557f (diff)
Add sync mechanism to block when updating device lists (#1264)
* Add sync mechanism to block when updating device lists With a timeout, mainly for sytest to fix the test "Server correctly handles incoming m.device_list_update" which is flakey because it assumes that when `/send` 200 OKs that the server has updated the device lists in prep for `/keys/query` which is not always true when using workers. * Fix UT * Add new working test
Diffstat (limited to 'keyserver')
-rw-r--r--keyserver/internal/device_list_update.go51
-rw-r--r--keyserver/internal/device_list_update_test.go10
2 files changed, 45 insertions, 16 deletions
diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go
index 85785b07..1c4f0b97 100644
--- a/keyserver/internal/device_list_update.go
+++ b/keyserver/internal/device_list_update.go
@@ -67,6 +67,11 @@ type DeviceListUpdater struct {
producer KeyChangeProducer
fedClient *gomatrixserverlib.FederationClient
workerChans []chan gomatrixserverlib.ServerName
+
+ // When device lists are stale for a user, they get inserted into this map with a channel which `Update` will
+ // block on or timeout via a select.
+ userIDToChan map[string]chan bool
+ userIDToChanMu *sync.Mutex
}
// DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater.
@@ -98,12 +103,14 @@ func NewDeviceListUpdater(
numWorkers int,
) *DeviceListUpdater {
return &DeviceListUpdater{
- userIDToMutex: make(map[string]*sync.Mutex),
- mu: &sync.Mutex{},
- db: db,
- producer: producer,
- fedClient: fedClient,
- workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers),
+ userIDToMutex: make(map[string]*sync.Mutex),
+ mu: &sync.Mutex{},
+ db: db,
+ producer: producer,
+ fedClient: fedClient,
+ workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers),
+ userIDToChan: make(map[string]chan bool),
+ userIDToChanMu: &sync.Mutex{},
}
}
@@ -137,6 +144,8 @@ func (u *DeviceListUpdater) mutex(userID string) *sync.Mutex {
return u.userIDToMutex[userID]
}
+// Update blocks until the update has been stored in the database. It blocks primarily for satisfying sytest,
+// which assumes when /send 200 OKs that the device lists have been updated.
func (u *DeviceListUpdater) Update(ctx context.Context, event gomatrixserverlib.DeviceListUpdateEvent) error {
isDeviceListStale, err := u.update(ctx, event)
if err != nil {
@@ -213,7 +222,35 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) {
hash := fnv.New32a()
_, _ = hash.Write([]byte(remoteServer))
index := int(hash.Sum32()) % len(u.workerChans)
+
+ ch := u.assignChannel(userID)
u.workerChans[index] <- remoteServer
+ select {
+ case <-ch:
+ case <-time.After(10 * time.Second):
+ // we don't return an error in this case as it's not a failure condition.
+ // we mainly block for the benefit of sytest anyway
+ }
+}
+
+func (u *DeviceListUpdater) assignChannel(userID string) chan bool {
+ u.userIDToChanMu.Lock()
+ defer u.userIDToChanMu.Unlock()
+ if ch, ok := u.userIDToChan[userID]; ok {
+ return ch
+ }
+ ch := make(chan bool)
+ u.userIDToChan[userID] = ch
+ return ch
+}
+
+func (u *DeviceListUpdater) clearChannel(userID string) {
+ u.userIDToChanMu.Lock()
+ defer u.userIDToChanMu.Unlock()
+ if ch, ok := u.userIDToChan[userID]; ok {
+ close(ch)
+ delete(u.userIDToChan, userID)
+ }
}
func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
@@ -285,6 +322,8 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
if err != nil {
logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it")
hasFailures = true
+ } else {
+ u.clearChannel(userID)
}
}
return hasFailures
diff --git a/keyserver/internal/device_list_update_test.go b/keyserver/internal/device_list_update_test.go
index b07148bb..dcb981c4 100644
--- a/keyserver/internal/device_list_update_test.go
+++ b/keyserver/internal/device_list_update_test.go
@@ -204,16 +204,6 @@ func TestUpdateNoPrevID(t *testing.T) {
if err != nil {
t.Fatalf("Update returned an error: %s", err)
}
- // At this point we show have this device list marked as stale and not store the keys or emitted anything
- if !db.staleUsers[event.UserID] {
- t.Errorf("%s not marked as stale", event.UserID)
- }
- if len(producer.events) > 0 {
- t.Errorf("Update incorrect emitted %d device change events", len(producer.events))
- }
- if len(db.storedKeys) > 0 {
- t.Errorf("Update incorrect stored %d device change events", len(db.storedKeys))
- }
t.Log("waiting for /users/devices to be called...")
wg.Wait()
// wait a bit for db to be updated...