diff options
author | Kegsay <kegan@matrix.org> | 2020-08-12 13:50:54 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-12 13:50:54 +0100 |
commit | d98ec12422c8498cf710bb34d2ed31f024aa1e15 (patch) | |
tree | 9964a1783aa361c3cdd117ba246342b55ca016eb /keyserver | |
parent | 0835107f5b14ec408810da4f73da94200e19557f (diff) |
Add sync mechanism to block when updating device lists (#1264)
* Add sync mechanism to block when updating device lists
With a timeout, mainly for sytest to fix the test
"Server correctly handles incoming m.device_list_update"
which is flakey because it assumes that when `/send` 200 OKs
that the server has updated the device lists in prep for
`/keys/query` which is not always true when using workers.
* Fix UT
* Add new working test
Diffstat (limited to 'keyserver')
-rw-r--r-- | keyserver/internal/device_list_update.go | 51 | ||||
-rw-r--r-- | keyserver/internal/device_list_update_test.go | 10 |
2 files changed, 45 insertions, 16 deletions
diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 85785b07..1c4f0b97 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -67,6 +67,11 @@ type DeviceListUpdater struct { producer KeyChangeProducer fedClient *gomatrixserverlib.FederationClient workerChans []chan gomatrixserverlib.ServerName + + // When device lists are stale for a user, they get inserted into this map with a channel which `Update` will + // block on or timeout via a select. + userIDToChan map[string]chan bool + userIDToChanMu *sync.Mutex } // DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater. @@ -98,12 +103,14 @@ func NewDeviceListUpdater( numWorkers int, ) *DeviceListUpdater { return &DeviceListUpdater{ - userIDToMutex: make(map[string]*sync.Mutex), - mu: &sync.Mutex{}, - db: db, - producer: producer, - fedClient: fedClient, - workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers), + userIDToMutex: make(map[string]*sync.Mutex), + mu: &sync.Mutex{}, + db: db, + producer: producer, + fedClient: fedClient, + workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers), + userIDToChan: make(map[string]chan bool), + userIDToChanMu: &sync.Mutex{}, } } @@ -137,6 +144,8 @@ func (u *DeviceListUpdater) mutex(userID string) *sync.Mutex { return u.userIDToMutex[userID] } +// Update blocks until the update has been stored in the database. It blocks primarily for satisfying sytest, +// which assumes when /send 200 OKs that the device lists have been updated. func (u *DeviceListUpdater) Update(ctx context.Context, event gomatrixserverlib.DeviceListUpdateEvent) error { isDeviceListStale, err := u.update(ctx, event) if err != nil { @@ -213,7 +222,35 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) { hash := fnv.New32a() _, _ = hash.Write([]byte(remoteServer)) index := int(hash.Sum32()) % len(u.workerChans) + + ch := u.assignChannel(userID) u.workerChans[index] <- remoteServer + select { + case <-ch: + case <-time.After(10 * time.Second): + // we don't return an error in this case as it's not a failure condition. + // we mainly block for the benefit of sytest anyway + } +} + +func (u *DeviceListUpdater) assignChannel(userID string) chan bool { + u.userIDToChanMu.Lock() + defer u.userIDToChanMu.Unlock() + if ch, ok := u.userIDToChan[userID]; ok { + return ch + } + ch := make(chan bool) + u.userIDToChan[userID] = ch + return ch +} + +func (u *DeviceListUpdater) clearChannel(userID string) { + u.userIDToChanMu.Lock() + defer u.userIDToChanMu.Unlock() + if ch, ok := u.userIDToChan[userID]; ok { + close(ch) + delete(u.userIDToChan, userID) + } } func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { @@ -285,6 +322,8 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam if err != nil { logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it") hasFailures = true + } else { + u.clearChannel(userID) } } return hasFailures diff --git a/keyserver/internal/device_list_update_test.go b/keyserver/internal/device_list_update_test.go index b07148bb..dcb981c4 100644 --- a/keyserver/internal/device_list_update_test.go +++ b/keyserver/internal/device_list_update_test.go @@ -204,16 +204,6 @@ func TestUpdateNoPrevID(t *testing.T) { if err != nil { t.Fatalf("Update returned an error: %s", err) } - // At this point we show have this device list marked as stale and not store the keys or emitted anything - if !db.staleUsers[event.UserID] { - t.Errorf("%s not marked as stale", event.UserID) - } - if len(producer.events) > 0 { - t.Errorf("Update incorrect emitted %d device change events", len(producer.events)) - } - if len(db.storedKeys) > 0 { - t.Errorf("Update incorrect stored %d device change events", len(db.storedKeys)) - } t.Log("waiting for /users/devices to be called...") wg.Wait() // wait a bit for db to be updated... |