diff options
author | Tak Wai Wong <64229756+tak-hntlabs@users.noreply.github.com> | 2022-09-19 09:39:06 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-19 18:39:06 +0200 |
commit | 99f6b6a95234f59a16806b9dc6b16cb41f038504 (patch) | |
tree | 7acf1cc735354b93ae62dc50f3227175efff20c3 /appservice | |
parent | 7bfc3074d10b2cd91b37290e39d3882119853107 (diff) |
Bug fix #2718 appservice txnid should be different for each batch of events (#2719)
See issue: [#2718](https://github.com/matrix-org/dendrite/issues/2718)
for more details.
The fix assumes that if the number of transaction items are different,
then the txnid should be different.
txnid := OriginalServerTS()_len(transactions)
The case that it doesn't address is if the txnid generated this way is
the same for 2 different batches of events which have the same
OriginalServerTS and the same array length.
Another option:
txnid := OriginalServerTS()_hash(transactions)
Would love to hear other ideas and ways to fix this.
### Pull Request Checklist
* [x ] I have added added tests for PR _or_ I have justified why this PR
doesn't need tests.
* [x ] Pull request includes a [sign
off](https://github.com/matrix-org/dendrite/blob/main/docs/CONTRIBUTING.md#sign-off)
Signed-off-by: `Tak Wai Wong <tak@hntlabs.com>`
Co-authored-by: Tak Wai Wong <tak@hntlabs.com>
Diffstat (limited to 'appservice')
-rw-r--r-- | appservice/consumers/roomserver.go | 20 |
1 files changed, 15 insertions, 5 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index a4bcfa7d..d44f32b3 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -22,6 +22,7 @@ import ( "math" "net/http" "net/url" + "strconv" "time" "github.com/matrix-org/gomatrixserverlib" @@ -151,10 +152,17 @@ func (s *OutputRoomEventConsumer) onMessage( return true } + txnID := "" + // Try to get the message metadata, if we're able to, use the timestamp as the txnID + metadata, err := msgs[0].Metadata() + if err == nil { + txnID = strconv.Itoa(int(metadata.Timestamp.UnixNano())) + } + // Send event to any relevant application services. If we hit // an error here, return false, so that we negatively ack. log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events)) - return s.sendEvents(ctx, state, events) == nil + return s.sendEvents(ctx, state, events, txnID) == nil } // sendEvents passes events to the appservice by using the transactions @@ -162,6 +170,7 @@ func (s *OutputRoomEventConsumer) onMessage( func (s *OutputRoomEventConsumer) sendEvents( ctx context.Context, state *appserviceState, events []*gomatrixserverlib.HeaderedEvent, + txnID string, ) error { // Create the transaction body. transaction, err := json.Marshal( @@ -173,13 +182,14 @@ func (s *OutputRoomEventConsumer) sendEvents( return err } - // TODO: We should probably be more intelligent and pick something not - // in the control of the event. A NATS timestamp header or something maybe. - txnID := events[0].Event.OriginServerTS() + // If txnID is not defined, generate one from the events. + if txnID == "" { + txnID = fmt.Sprintf("%d_%d", events[0].Event.OriginServerTS(), len(transaction)) + } // Send the transaction to the appservice. // https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid - address := fmt.Sprintf("%s/transactions/%d?access_token=%s", state.URL, txnID, url.QueryEscape(state.HSToken)) + address := fmt.Sprintf("%s/transactions/%s?access_token=%s", state.URL, txnID, url.QueryEscape(state.HSToken)) req, err := http.NewRequestWithContext(ctx, "PUT", address, bytes.NewBuffer(transaction)) if err != nil { return err |