diff options
author | S7evinK <tfaelligen@gmail.com> | 2020-11-09 19:46:11 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-09 18:46:11 +0000 |
commit | bcb89ada5ebbe54fa057ec403af4074a8c147764 (patch) | |
tree | 283cdbaf04db7fe5bcd25185b40f77c60aa928ca /federationapi/routing/send.go | |
parent | eccd0d2c1b8bd4b921bafca4585aa09d32ae561f (diff) |
Implement read receipts (#1528)
* fix conversion from int to string yields a string of one rune, not a string of digits
* Add receipts table to syncapi
* Use StreamingToken as the since value
* Add required method to testEDUProducer
* Make receipt json creation "easier" to read
* Add receipts api to the eduserver
* Add receipts endpoint
* Add eduserver kafka consumer
* Add missing kafka config
* Add passing tests to whitelist
Signed-off-by: Till Faelligen <tfaelligen@gmail.com>
* Fix copy & paste error
* Fix column count error
* Make outbound federation receipts pass
* Make "Inbound federation rejects receipts from wrong remote" pass
* Don't use errors package
* - Add TODO for batching requests
- Rename variable
* Return a better error message
* - Use OutputReceiptEvent instead of InputReceiptEvent as result
- Don't use the errors package for errors
- Defer CloseAndLogIfError to close rows
- Fix Copyright
* Better creation/usage of JoinResponse
* Query all joined rooms instead of just one
* Update gomatrixserverlib
* Add sqlite3 migration
* Add postgres migration
* Ensure required sequence exists before running migrations
* Clarification on comment
* - Fix a bug when creating client receipts
- Use concrete types instead of interface{}
* Remove dead code
Use key for timestamp
* Fix postgres query...
* Remove single purpose struct
* Use key/value directly
* Only apply receipts on initial sync or if edu positions differ,
otherwise we'll be sending the same receipts over and over again.
* Actually update the id, so it is correctly send in syncs
* Set receipt on request to /read_markers
* Fix issue with receipts getting overwritten
* Use fmt.Errorf instead of pkg/errors
* Revert "Add postgres migration"
This reverts commit 722fe5a04628882b787d096942459961db159b06.
* Revert "Add sqlite3 migration"
This reverts commit d113b03f6495a4b8f8bcf158a3d00b510b4240cc.
* Fix selectRoomReceipts query
* Make golangci-lint happy
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'federationapi/routing/send.go')
-rw-r--r-- | federationapi/routing/send.go | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 76dc3a2e..79fbcb3d 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -322,12 +322,69 @@ func (t *txnReq) processEDUs(ctx context.Context) { } case gomatrixserverlib.MDeviceListUpdate: t.processDeviceListUpdate(ctx, e) + case gomatrixserverlib.MReceipt: + // https://matrix.org/docs/spec/server_server/r0.1.4#receipts + payload := map[string]eduserverAPI.FederationReceiptMRead{} + + if err := json.Unmarshal(e.Content, &payload); err != nil { + util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal receipt event") + continue + } + + for roomID, receipt := range payload { + for userID, mread := range receipt.User { + _, domain, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("Failed to split domain from receipt event sender") + continue + } + if t.Origin != domain { + util.GetLogger(ctx).Warnf("Dropping receipt event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin) + continue + } + if err := t.processReceiptEvent(ctx, userID, roomID, "m.read", mread.Data.TS, mread.EventIDs); err != nil { + util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ + "sender": t.Origin, + "user_id": userID, + "room_id": roomID, + "events": mread.EventIDs, + }).Error("Failed to send receipt event to edu server") + continue + } + } + } default: util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU") } } } +// processReceiptEvent sends receipt events to the edu server +func (t *txnReq) processReceiptEvent(ctx context.Context, + userID, roomID, receiptType string, + timestamp gomatrixserverlib.Timestamp, + eventIDs []string, +) error { + // store every event + for _, eventID := range eventIDs { + req := eduserverAPI.InputReceiptEventRequest{ + InputReceiptEvent: eduserverAPI.InputReceiptEvent{ + UserID: userID, + RoomID: roomID, + EventID: eventID, + Type: receiptType, + Timestamp: timestamp, + }, + } + resp := eduserverAPI.InputReceiptEventResponse{} + if err := t.eduAPI.InputReceiptEvent(ctx, &req, &resp); err != nil { + return fmt.Errorf("unable to set receipt event: %w", err) + } + } + + return nil +} + func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverlib.EDU) { var payload gomatrixserverlib.DeviceListUpdateEvent if err := json.Unmarshal(e.Content, &payload); err != nil { |