aboutsummaryrefslogtreecommitdiff
path: root/appservice
diff options
context:
space:
mode:
authorTak Wai Wong <64229756+tak-hntlabs@users.noreply.github.com>2022-09-19 09:39:06 -0700
committerGitHub <noreply@github.com>2022-09-19 18:39:06 +0200
commit99f6b6a95234f59a16806b9dc6b16cb41f038504 (patch)
tree7acf1cc735354b93ae62dc50f3227175efff20c3 /appservice
parent7bfc3074d10b2cd91b37290e39d3882119853107 (diff)
Bug fix #2718 appservice txnid should be different for each batch of events (#2719)
See issue: [#2718](https://github.com/matrix-org/dendrite/issues/2718) for more details. The fix assumes that if the number of transaction items are different, then the txnid should be different. txnid := OriginalServerTS()_len(transactions) The case that it doesn't address is if the txnid generated this way is the same for 2 different batches of events which have the same OriginalServerTS and the same array length. Another option: txnid := OriginalServerTS()_hash(transactions) Would love to hear other ideas and ways to fix this. ### Pull Request Checklist * [x ] I have added added tests for PR _or_ I have justified why this PR doesn't need tests. * [x ] Pull request includes a [sign off](https://github.com/matrix-org/dendrite/blob/main/docs/CONTRIBUTING.md#sign-off) Signed-off-by: `Tak Wai Wong <tak@hntlabs.com>` Co-authored-by: Tak Wai Wong <tak@hntlabs.com>
Diffstat (limited to 'appservice')
-rw-r--r--appservice/consumers/roomserver.go20
1 files changed, 15 insertions, 5 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index a4bcfa7d..d44f32b3 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -22,6 +22,7 @@ import (
"math"
"net/http"
"net/url"
+ "strconv"
"time"
"github.com/matrix-org/gomatrixserverlib"
@@ -151,10 +152,17 @@ func (s *OutputRoomEventConsumer) onMessage(
return true
}
+ txnID := ""
+ // Try to get the message metadata, if we're able to, use the timestamp as the txnID
+ metadata, err := msgs[0].Metadata()
+ if err == nil {
+ txnID = strconv.Itoa(int(metadata.Timestamp.UnixNano()))
+ }
+
// Send event to any relevant application services. If we hit
// an error here, return false, so that we negatively ack.
log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events))
- return s.sendEvents(ctx, state, events) == nil
+ return s.sendEvents(ctx, state, events, txnID) == nil
}
// sendEvents passes events to the appservice by using the transactions
@@ -162,6 +170,7 @@ func (s *OutputRoomEventConsumer) onMessage(
func (s *OutputRoomEventConsumer) sendEvents(
ctx context.Context, state *appserviceState,
events []*gomatrixserverlib.HeaderedEvent,
+ txnID string,
) error {
// Create the transaction body.
transaction, err := json.Marshal(
@@ -173,13 +182,14 @@ func (s *OutputRoomEventConsumer) sendEvents(
return err
}
- // TODO: We should probably be more intelligent and pick something not
- // in the control of the event. A NATS timestamp header or something maybe.
- txnID := events[0].Event.OriginServerTS()
+ // If txnID is not defined, generate one from the events.
+ if txnID == "" {
+ txnID = fmt.Sprintf("%d_%d", events[0].Event.OriginServerTS(), len(transaction))
+ }
// Send the transaction to the appservice.
// https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid
- address := fmt.Sprintf("%s/transactions/%d?access_token=%s", state.URL, txnID, url.QueryEscape(state.HSToken))
+ address := fmt.Sprintf("%s/transactions/%s?access_token=%s", state.URL, txnID, url.QueryEscape(state.HSToken))
req, err := http.NewRequestWithContext(ctx, "PUT", address, bytes.NewBuffer(transaction))
if err != nil {
return err