aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorS7evinK <tfaelligen@gmail.com>2020-11-09 19:46:11 +0100
committerGitHub <noreply@github.com>2020-11-09 18:46:11 +0000
commitbcb89ada5ebbe54fa057ec403af4074a8c147764 (patch)
tree283cdbaf04db7fe5bcd25185b40f77c60aa928ca
parenteccd0d2c1b8bd4b921bafca4585aa09d32ae561f (diff)
Implement read receipts (#1528)
* fix conversion from int to string yields a string of one rune, not a string of digits * Add receipts table to syncapi * Use StreamingToken as the since value * Add required method to testEDUProducer * Make receipt json creation "easier" to read * Add receipts api to the eduserver * Add receipts endpoint * Add eduserver kafka consumer * Add missing kafka config * Add passing tests to whitelist Signed-off-by: Till Faelligen <tfaelligen@gmail.com> * Fix copy & paste error * Fix column count error * Make outbound federation receipts pass * Make "Inbound federation rejects receipts from wrong remote" pass * Don't use errors package * - Add TODO for batching requests - Rename variable * Return a better error message * - Use OutputReceiptEvent instead of InputReceiptEvent as result - Don't use the errors package for errors - Defer CloseAndLogIfError to close rows - Fix Copyright * Better creation/usage of JoinResponse * Query all joined rooms instead of just one * Update gomatrixserverlib * Add sqlite3 migration * Add postgres migration * Ensure required sequence exists before running migrations * Clarification on comment * - Fix a bug when creating client receipts - Use concrete types instead of interface{} * Remove dead code Use key for timestamp * Fix postgres query... * Remove single purpose struct * Use key/value directly * Only apply receipts on initial sync or if edu positions differ, otherwise we'll be sending the same receipts over and over again. * Actually update the id, so it is correctly send in syncs * Set receipt on request to /read_markers * Fix issue with receipts getting overwritten * Use fmt.Errorf instead of pkg/errors * Revert "Add postgres migration" This reverts commit 722fe5a04628882b787d096942459961db159b06. * Revert "Add sqlite3 migration" This reverts commit d113b03f6495a4b8f8bcf158a3d00b510b4240cc. * Fix selectRoomReceipts query * Make golangci-lint happy Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
-rw-r--r--clientapi/routing/account_data.go10
-rw-r--r--clientapi/routing/receipt.go54
-rw-r--r--clientapi/routing/routing.go15
-rw-r--r--eduserver/api/input.go22
-rw-r--r--eduserver/api/output.go36
-rw-r--r--eduserver/api/wrapper.go19
-rw-r--r--eduserver/eduserver.go5
-rw-r--r--eduserver/input/input.go30
-rw-r--r--eduserver/inthttp/client.go14
-rw-r--r--eduserver/inthttp/server.go13
-rw-r--r--federationapi/routing/send.go57
-rw-r--r--federationapi/routing/send_test.go8
-rw-r--r--federationsender/consumers/eduserver.go74
-rw-r--r--internal/caching/cache_roomservernids.go6
-rw-r--r--internal/config/config_kafka.go1
-rw-r--r--internal/transactions/transactions_test.go5
-rw-r--r--syncapi/consumers/eduserver_receipts.go94
-rw-r--r--syncapi/storage/interface.go6
-rw-r--r--syncapi/storage/postgres/receipt_table.go106
-rw-r--r--syncapi/storage/postgres/syncserver.go5
-rw-r--r--syncapi/storage/shared/syncserver.go92
-rw-r--r--syncapi/storage/sqlite3/receipt_table.go118
-rw-r--r--syncapi/storage/sqlite3/syncserver.go5
-rw-r--r--syncapi/storage/tables/interface.go6
-rw-r--r--syncapi/sync/notifier.go10
-rw-r--r--syncapi/syncapi.go7
-rw-r--r--sytest-whitelist6
27 files changed, 803 insertions, 21 deletions
diff --git a/clientapi/routing/account_data.go b/clientapi/routing/account_data.go
index 48303c97..22e63513 100644
--- a/clientapi/routing/account_data.go
+++ b/clientapi/routing/account_data.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
+ eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/userapi/api"
@@ -148,7 +149,8 @@ type fullyReadEvent struct {
// SaveReadMarker implements POST /rooms/{roomId}/read_markers
func SaveReadMarker(
- req *http.Request, userAPI api.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
+ req *http.Request,
+ userAPI api.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, eduAPI eduserverAPI.EDUServerInputAPI,
syncProducer *producers.SyncAPIProducer, device *api.Device, roomID string,
) util.JSONResponse {
// Verify that the user is a member of this room
@@ -192,8 +194,10 @@ func SaveReadMarker(
return jsonerror.InternalServerError()
}
- // TODO handle the read receipt that may be included in the read marker
- // See https://matrix.org/docs/spec/client_server/r0.6.0#post-matrix-client-r0-rooms-roomid-read-markers
+ // Handle the read receipt that may be included in the read marker
+ if r.Read != "" {
+ return SetReceipt(req, eduAPI, device, roomID, "m.read", r.Read)
+ }
return util.JSONResponse{
Code: http.StatusOK,
diff --git a/clientapi/routing/receipt.go b/clientapi/routing/receipt.go
new file mode 100644
index 00000000..fe8fe765
--- /dev/null
+++ b/clientapi/routing/receipt.go
@@ -0,0 +1,54 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package routing
+
+import (
+ "fmt"
+ "net/http"
+ "time"
+
+ "github.com/matrix-org/gomatrixserverlib"
+
+ "github.com/matrix-org/dendrite/eduserver/api"
+
+ userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/util"
+ "github.com/sirupsen/logrus"
+)
+
+func SetReceipt(req *http.Request, eduAPI api.EDUServerInputAPI, device *userapi.Device, roomId, receiptType, eventId string) util.JSONResponse {
+ timestamp := gomatrixserverlib.AsTimestamp(time.Now())
+ logrus.WithFields(logrus.Fields{
+ "roomId": roomId,
+ "receiptType": receiptType,
+ "eventId": eventId,
+ "userId": device.UserID,
+ "timestamp": timestamp,
+ }).Debug("Setting receipt")
+
+ // currently only m.read is accepted
+ if receiptType != "m.read" {
+ return util.MessageResponse(400, fmt.Sprintf("receipt type must be m.read not '%s'", receiptType))
+ }
+
+ if err := api.SendReceipt(req.Context(), eduAPI, device.UserID, roomId, eventId, receiptType, timestamp); err != nil {
+ return util.ErrorResponse(err)
+ }
+
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: struct{}{},
+ }
+}
diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go
index 369ed571..99d1bd09 100644
--- a/clientapi/routing/routing.go
+++ b/clientapi/routing/routing.go
@@ -705,7 +705,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SaveReadMarker(req, userAPI, rsAPI, syncProducer, device, vars["roomID"])
+ return SaveReadMarker(req, userAPI, rsAPI, eduAPI, syncProducer, device, vars["roomID"])
}),
).Methods(http.MethodPost, http.MethodOptions)
@@ -843,4 +843,17 @@ func Setup(
return ClaimKeys(req, keyAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)
+ r0mux.Handle("/rooms/{roomId}/receipt/{receiptType}/{eventId}",
+ httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
+ if r := rateLimits.rateLimit(req); r != nil {
+ return *r
+ }
+ vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
+ if err != nil {
+ return util.ErrorResponse(err)
+ }
+
+ return SetReceipt(req, eduAPI, device, vars["roomId"], vars["receiptType"], vars["eventId"])
+ }),
+ ).Methods(http.MethodPost, http.MethodOptions)
}
diff --git a/eduserver/api/input.go b/eduserver/api/input.go
index 0d0d21f3..f8599e1c 100644
--- a/eduserver/api/input.go
+++ b/eduserver/api/input.go
@@ -59,6 +59,22 @@ type InputSendToDeviceEventRequest struct {
// InputSendToDeviceEventResponse is a response to InputSendToDeviceEventRequest
type InputSendToDeviceEventResponse struct{}
+type InputReceiptEvent struct {
+ UserID string `json:"user_id"`
+ RoomID string `json:"room_id"`
+ EventID string `json:"event_id"`
+ Type string `json:"type"`
+ Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
+}
+
+// InputReceiptEventRequest is a request to EDUServerInputAPI
+type InputReceiptEventRequest struct {
+ InputReceiptEvent InputReceiptEvent `json:"input_receipt_event"`
+}
+
+// InputReceiptEventResponse is a response to InputReceiptEventRequest
+type InputReceiptEventResponse struct{}
+
// EDUServerInputAPI is used to write events to the typing server.
type EDUServerInputAPI interface {
InputTypingEvent(
@@ -72,4 +88,10 @@ type EDUServerInputAPI interface {
request *InputSendToDeviceEventRequest,
response *InputSendToDeviceEventResponse,
) error
+
+ InputReceiptEvent(
+ ctx context.Context,
+ request *InputReceiptEventRequest,
+ response *InputReceiptEventResponse,
+ ) error
}
diff --git a/eduserver/api/output.go b/eduserver/api/output.go
index e6ded841..650458a2 100644
--- a/eduserver/api/output.go
+++ b/eduserver/api/output.go
@@ -49,3 +49,39 @@ type OutputSendToDeviceEvent struct {
DeviceID string `json:"device_id"`
gomatrixserverlib.SendToDeviceEvent
}
+
+type ReceiptEvent struct {
+ UserID string `json:"user_id"`
+ RoomID string `json:"room_id"`
+ EventID string `json:"event_id"`
+ Type string `json:"type"`
+ Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
+}
+
+// OutputReceiptEvent is an entry in the receipt output kafka log
+type OutputReceiptEvent struct {
+ UserID string `json:"user_id"`
+ RoomID string `json:"room_id"`
+ EventID string `json:"event_id"`
+ Type string `json:"type"`
+ Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
+}
+
+// Helper structs for receipts json creation
+type ReceiptMRead struct {
+ User map[string]ReceiptTS `json:"m.read"`
+}
+
+type ReceiptTS struct {
+ TS gomatrixserverlib.Timestamp `json:"ts"`
+}
+
+// FederationSender output
+type FederationReceiptMRead struct {
+ User map[string]FederationReceiptData `json:"m.read"`
+}
+
+type FederationReceiptData struct {
+ Data ReceiptTS `json:"data"`
+ EventIDs []string `json:"event_ids"`
+}
diff --git a/eduserver/api/wrapper.go b/eduserver/api/wrapper.go
index c2c4596d..7907f4d3 100644
--- a/eduserver/api/wrapper.go
+++ b/eduserver/api/wrapper.go
@@ -67,3 +67,22 @@ func SendToDevice(
response := InputSendToDeviceEventResponse{}
return eduAPI.InputSendToDeviceEvent(ctx, &request, &response)
}
+
+// SendReceipt sends a receipt event to EDU Server
+func SendReceipt(
+ ctx context.Context,
+ eduAPI EDUServerInputAPI, userID, roomID, eventID, receiptType string,
+ timestamp gomatrixserverlib.Timestamp,
+) error {
+ request := InputReceiptEventRequest{
+ InputReceiptEvent: InputReceiptEvent{
+ UserID: userID,
+ RoomID: roomID,
+ EventID: eventID,
+ Type: receiptType,
+ Timestamp: timestamp,
+ },
+ }
+ response := InputReceiptEventResponse{}
+ return eduAPI.InputReceiptEvent(ctx, &request, &response)
+}
diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go
index 098ac024..d5ab3681 100644
--- a/eduserver/eduserver.go
+++ b/eduserver/eduserver.go
@@ -49,8 +49,9 @@ func NewInternalAPI(
Cache: eduCache,
UserAPI: userAPI,
Producer: producer,
- OutputTypingEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
- OutputSendToDeviceEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
+ OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
+ OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
+ OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
ServerName: cfg.Matrix.ServerName,
}
}
diff --git a/eduserver/input/input.go b/eduserver/input/input.go
index e3d2c55e..c54fb9de 100644
--- a/eduserver/input/input.go
+++ b/eduserver/input/input.go
@@ -37,6 +37,8 @@ type EDUServerInputAPI struct {
OutputTypingEventTopic string
// The kafka topic to output new send to device events to.
OutputSendToDeviceEventTopic string
+ // The kafka topic to output new receipt events to
+ OutputReceiptEventTopic string
// kafka producer
Producer sarama.SyncProducer
// Internal user query API
@@ -173,3 +175,31 @@ func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) e
return nil
}
+
+// InputReceiptEvent implements api.EDUServerInputAPI
+// TODO: Intelligently batch requests sent by the same user (e.g wait a few milliseconds before emitting output events)
+func (t *EDUServerInputAPI) InputReceiptEvent(
+ ctx context.Context,
+ request *api.InputReceiptEventRequest,
+ response *api.InputReceiptEventResponse,
+) error {
+ logrus.WithFields(logrus.Fields{}).Infof("Producing to topic '%s'", t.OutputReceiptEventTopic)
+ output := &api.OutputReceiptEvent{
+ UserID: request.InputReceiptEvent.UserID,
+ RoomID: request.InputReceiptEvent.RoomID,
+ EventID: request.InputReceiptEvent.EventID,
+ Type: request.InputReceiptEvent.Type,
+ Timestamp: request.InputReceiptEvent.Timestamp,
+ }
+ js, err := json.Marshal(output)
+ if err != nil {
+ return err
+ }
+ m := &sarama.ProducerMessage{
+ Topic: t.OutputReceiptEventTopic,
+ Key: sarama.StringEncoder(request.InputReceiptEvent.RoomID + ":" + request.InputReceiptEvent.UserID),
+ Value: sarama.ByteEncoder(js),
+ }
+ _, _, err = t.Producer.SendMessage(m)
+ return err
+}
diff --git a/eduserver/inthttp/client.go b/eduserver/inthttp/client.go
index 7d0bc160..0690ed82 100644
--- a/eduserver/inthttp/client.go
+++ b/eduserver/inthttp/client.go
@@ -14,6 +14,7 @@ import (
const (
EDUServerInputTypingEventPath = "/eduserver/input"
EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
+ EDUServerInputReceiptEventPath = "/eduserver/receipt"
)
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
@@ -54,3 +55,16 @@ func (h *httpEDUServerInputAPI) InputSendToDeviceEvent(
apiURL := h.eduServerURL + EDUServerInputSendToDeviceEventPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
+
+// InputSendToDeviceEvent implements EDUServerInputAPI
+func (h *httpEDUServerInputAPI) InputReceiptEvent(
+ ctx context.Context,
+ request *api.InputReceiptEventRequest,
+ response *api.InputReceiptEventResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "InputReceiptEventPath")
+ defer span.Finish()
+
+ apiURL := h.eduServerURL + EDUServerInputReceiptEventPath
+ return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+}
diff --git a/eduserver/inthttp/server.go b/eduserver/inthttp/server.go
index e374513a..a3494375 100644
--- a/eduserver/inthttp/server.go
+++ b/eduserver/inthttp/server.go
@@ -38,4 +38,17 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
+ internalAPIMux.Handle(EDUServerInputReceiptEventPath,
+ httputil.MakeInternalAPI("inputReceiptEvent", func(req *http.Request) util.JSONResponse {
+ var request api.InputReceiptEventRequest
+ var response api.InputReceiptEventResponse
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ if err := t.InputReceiptEvent(req.Context(), &request, &response); err != nil {
+ return util.ErrorResponse(err)
+ }
+ return util.JSONResponse{Code: http.StatusOK, JSON: &response}
+ }),
+ )
}
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index 76dc3a2e..79fbcb3d 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -322,12 +322,69 @@ func (t *txnReq) processEDUs(ctx context.Context) {
}
case gomatrixserverlib.MDeviceListUpdate:
t.processDeviceListUpdate(ctx, e)
+ case gomatrixserverlib.MReceipt:
+ // https://matrix.org/docs/spec/server_server/r0.1.4#receipts
+ payload := map[string]eduserverAPI.FederationReceiptMRead{}
+
+ if err := json.Unmarshal(e.Content, &payload); err != nil {
+ util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal receipt event")
+ continue
+ }
+
+ for roomID, receipt := range payload {
+ for userID, mread := range receipt.User {
+ _, domain, err := gomatrixserverlib.SplitID('@', userID)
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).Error("Failed to split domain from receipt event sender")
+ continue
+ }
+ if t.Origin != domain {
+ util.GetLogger(ctx).Warnf("Dropping receipt event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin)
+ continue
+ }
+ if err := t.processReceiptEvent(ctx, userID, roomID, "m.read", mread.Data.TS, mread.EventIDs); err != nil {
+ util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
+ "sender": t.Origin,
+ "user_id": userID,
+ "room_id": roomID,
+ "events": mread.EventIDs,
+ }).Error("Failed to send receipt event to edu server")
+ continue
+ }
+ }
+ }
default:
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
}
}
}
+// processReceiptEvent sends receipt events to the edu server
+func (t *txnReq) processReceiptEvent(ctx context.Context,
+ userID, roomID, receiptType string,
+ timestamp gomatrixserverlib.Timestamp,
+ eventIDs []string,
+) error {
+ // store every event
+ for _, eventID := range eventIDs {
+ req := eduserverAPI.InputReceiptEventRequest{
+ InputReceiptEvent: eduserverAPI.InputReceiptEvent{
+ UserID: userID,
+ RoomID: roomID,
+ EventID: eventID,
+ Type: receiptType,
+ Timestamp: timestamp,
+ },
+ }
+ resp := eduserverAPI.InputReceiptEventResponse{}
+ if err := t.eduAPI.InputReceiptEvent(ctx, &req, &resp); err != nil {
+ return fmt.Errorf("unable to set receipt event: %w", err)
+ }
+ }
+
+ return nil
+}
+
func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverlib.EDU) {
var payload gomatrixserverlib.DeviceListUpdateEvent
if err := json.Unmarshal(e.Content, &payload); err != nil {
diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go
index f292e741..9398fef7 100644
--- a/federationapi/routing/send_test.go
+++ b/federationapi/routing/send_test.go
@@ -76,6 +76,14 @@ func (p *testEDUProducer) InputSendToDeviceEvent(
return nil
}
+func (o *testEDUProducer) InputReceiptEvent(
+ ctx context.Context,
+ request *eduAPI.InputReceiptEventRequest,
+ response *eduAPI.InputReceiptEventResponse,
+) error {
+ return nil
+}
+
type testRoomserverAPI struct {
inputRoomEvents []api.InputRoomEvent
queryMissingAuthPrevEvents func(*api.QueryMissingAuthPrevEventsRequest) api.QueryMissingAuthPrevEventsResponse
diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go
index d9ac41b3..9d7574e6 100644
--- a/federationsender/consumers/eduserver.go
+++ b/federationsender/consumers/eduserver.go
@@ -34,6 +34,7 @@ import (
type OutputEDUConsumer struct {
typingConsumer *internal.ContinualConsumer
sendToDeviceConsumer *internal.ContinualConsumer
+ receiptConsumer *internal.ContinualConsumer
db storage.Database
queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName
@@ -51,24 +52,31 @@ func NewOutputEDUConsumer(
c := &OutputEDUConsumer{
typingConsumer: &internal.ContinualConsumer{
ComponentName: "eduserver/typing",
- Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
+ Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},
sendToDeviceConsumer: &internal.ContinualConsumer{
ComponentName: "eduserver/sendtodevice",
- Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
+ Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
+ Consumer: kafkaConsumer,
+ PartitionStore: store,
+ },
+ receiptConsumer: &internal.ContinualConsumer{
+ ComponentName: "eduserver/receipt",
+ Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},
queues: queues,
db: store,
ServerName: cfg.Matrix.ServerName,
- TypingTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
- SendToDeviceTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
+ TypingTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
+ SendToDeviceTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
}
c.typingConsumer.ProcessMessage = c.onTypingEvent
c.sendToDeviceConsumer.ProcessMessage = c.onSendToDeviceEvent
+ c.receiptConsumer.ProcessMessage = c.onReceiptEvent
return c
}
@@ -81,6 +89,9 @@ func (t *OutputEDUConsumer) Start() error {
if err := t.sendToDeviceConsumer.Start(); err != nil {
return fmt.Errorf("t.sendToDeviceConsumer.Start: %w", err)
}
+ if err := t.receiptConsumer.Start(); err != nil {
+ return fmt.Errorf("t.receiptConsumer.Start: %w", err)
+ }
return nil
}
@@ -177,3 +188,58 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *sarama.ConsumerMessage) error {
return t.queues.SendEDU(edu, t.ServerName, names)
}
+
+// onReceiptEvent is called in response to a message received on the receipt
+// events topic from the EDU server.
+func (t *OutputEDUConsumer) onReceiptEvent(msg *sarama.ConsumerMessage) error {
+ // Extract the typing event from msg.
+ var receipt api.OutputReceiptEvent
+ if err := json.Unmarshal(msg.Value, &receipt); err != nil {
+ // Skip this msg but continue processing messages.
+ log.WithError(err).Errorf("eduserver output log: message parse failed (expected receipt)")
+ return nil
+ }
+
+ // only send receipt events which originated from us
+ _, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID)
+ if err != nil {
+ log.WithError(err).WithField("user_id", receipt.UserID).Error("Failed to extract domain from receipt sender")
+ return nil
+ }
+ if receiptServerName != t.ServerName {
+ log.WithField("other_server", receiptServerName).Info("Suppressing receipt notif: originated elsewhere")
+ return nil
+ }
+
+ joined, err := t.db.GetJoinedHosts(context.TODO(), receipt.RoomID)
+ if err != nil {
+ return err
+ }
+
+ names := make([]gomatrixserverlib.ServerName, len(joined))
+ for i := range joined {
+ names[i] = joined[i].ServerName
+ }
+
+ content := map[string]api.FederationReceiptMRead{}
+ content[receipt.RoomID] = api.FederationReceiptMRead{
+ User: map[string]api.FederationReceiptData{
+ receipt.UserID: {
+ Data: api.ReceiptTS{
+ TS: receipt.Timestamp,
+ },
+ EventIDs: []string{receipt.EventID},
+ },
+ },
+ }
+
+ edu := &gomatrixserverlib.EDU{
+ Type: gomatrixserverlib.MReceipt,
+ Origin: string(t.ServerName),
+ }
+ if edu.Content, err = json.Marshal(content); err != nil {
+ return err
+ }
+
+ return t.queues.SendEDU(edu, t.ServerName, names)
+}
diff --git a/internal/caching/cache_roomservernids.go b/internal/caching/cache_roomservernids.go
index 7cb312c9..cac59549 100644
--- a/internal/caching/cache_roomservernids.go
+++ b/internal/caching/cache_roomservernids.go
@@ -1,6 +1,8 @@
package caching
import (
+ "strconv"
+
"github.com/matrix-org/dendrite/roomserver/types"
)
@@ -83,11 +85,11 @@ func (c Caches) GetRoomServerRoomNID(roomID string) (types.RoomNID, bool) {
func (c Caches) StoreRoomServerRoomNID(roomID string, roomNID types.RoomNID) {
c.RoomServerRoomNIDs.Set(roomID, roomNID)
- c.RoomServerRoomIDs.Set(string(roomNID), roomID)
+ c.RoomServerRoomIDs.Set(strconv.Itoa(int(roomNID)), roomID)
}
func (c Caches) GetRoomServerRoomID(roomNID types.RoomNID) (string, bool) {
- val, found := c.RoomServerRoomIDs.Get(string(roomNID))
+ val, found := c.RoomServerRoomIDs.Get(strconv.Itoa(int(roomNID)))
if found && val != nil {
if roomID, ok := val.(string); ok {
return roomID, true
diff --git a/internal/config/config_kafka.go b/internal/config/config_kafka.go
index 707c92a7..aa91e558 100644
--- a/internal/config/config_kafka.go
+++ b/internal/config/config_kafka.go
@@ -9,6 +9,7 @@ const (
TopicOutputKeyChangeEvent = "OutputKeyChangeEvent"
TopicOutputRoomEvent = "OutputRoomEvent"
TopicOutputClientData = "OutputClientData"
+ TopicOutputReceiptEvent = "OutputReceiptEvent"
)
type Kafka struct {
diff --git a/internal/transactions/transactions_test.go b/internal/transactions/transactions_test.go
index f565e484..aa837f76 100644
--- a/internal/transactions/transactions_test.go
+++ b/internal/transactions/transactions_test.go
@@ -14,6 +14,7 @@ package transactions
import (
"net/http"
+ "strconv"
"testing"
"github.com/matrix-org/util"
@@ -44,8 +45,8 @@ func TestCache(t *testing.T) {
for i := 1; i <= 100; i++ {
fakeTxnCache.AddTransaction(
fakeAccessToken,
- fakeTxnID+string(i),
- &util.JSONResponse{Code: http.StatusOK, JSON: fakeType{ID: string(i)}},
+ fakeTxnID+strconv.Itoa(i),
+ &util.JSONResponse{Code: http.StatusOK, JSON: fakeType{ID: strconv.Itoa(i)}},
)
}
diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go
new file mode 100644
index 00000000..c5d17414
--- /dev/null
+++ b/syncapi/consumers/eduserver_receipts.go
@@ -0,0 +1,94 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/syncapi/types"
+
+ "github.com/Shopify/sarama"
+ "github.com/matrix-org/dendrite/eduserver/api"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/sync"
+ log "github.com/sirupsen/logrus"
+)
+
+// OutputReceiptEventConsumer consumes events that originated in the EDU server.
+type OutputReceiptEventConsumer struct {
+ receiptConsumer *internal.ContinualConsumer
+ db storage.Database
+ notifier *sync.Notifier
+}
+
+// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
+// Call Start() to begin consuming from the EDU server.
+func NewOutputReceiptEventConsumer(
+ cfg *config.SyncAPI,
+ kafkaConsumer sarama.Consumer,
+ n *sync.Notifier,
+ store storage.Database,
+) *OutputReceiptEventConsumer {
+
+ consumer := internal.ContinualConsumer{
+ ComponentName: "syncapi/eduserver/receipt",
+ Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
+ Consumer: kafkaConsumer,
+ PartitionStore: store,
+ }
+
+ s := &OutputReceiptEventConsumer{
+ receiptConsumer: &consumer,
+ db: store,
+ notifier: n,
+ }
+
+ consumer.ProcessMessage = s.onMessage
+
+ return s
+}
+
+// Start consuming from EDU api
+func (s *OutputReceiptEventConsumer) Start() error {
+ return s.receiptConsumer.Start()
+}
+
+func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
+ var output api.OutputReceiptEvent
+ if err := json.Unmarshal(msg.Value, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("EDU server output log: message parse failure")
+ return nil
+ }
+
+ streamPos, err := s.db.StoreReceipt(
+ context.TODO(),
+ output.RoomID,
+ output.Type,
+ output.UserID,
+ output.EventID,
+ output.Timestamp,
+ )
+ if err != nil {
+ return err
+ }
+ // update stream position
+ s.notifier.OnNewReceipt(types.NewStreamToken(0, streamPos, nil))
+
+ return nil
+}
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index e12a1166..727cc048 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -18,6 +18,8 @@ import (
"context"
"time"
+ eduAPI "github.com/matrix-org/dendrite/eduserver/api"
+
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -147,4 +149,8 @@ type Database interface {
PutFilter(ctx context.Context, localpart string, filter *gomatrixserverlib.Filter) (string, error)
// RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event
RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error
+ // StoreReceipt stores new receipt events
+ StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
+ // GetRoomReceipts gets all receipts for a given roomID
+ GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error)
}
diff --git a/syncapi/storage/postgres/receipt_table.go b/syncapi/storage/postgres/receipt_table.go
new file mode 100644
index 00000000..c5ec6cbc
--- /dev/null
+++ b/syncapi/storage/postgres/receipt_table.go
@@ -0,0 +1,106 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+
+ "github.com/lib/pq"
+
+ "github.com/matrix-org/dendrite/eduserver/api"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const receiptsSchema = `
+CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;
+-- Stores data about receipts
+CREATE TABLE IF NOT EXISTS syncapi_receipts (
+ -- The ID
+ id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
+ room_id TEXT NOT NULL,
+ receipt_type TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ event_id TEXT NOT NULL,
+ receipt_ts BIGINT NOT NULL,
+ CONSTRAINT syncapi_receipts_unique UNIQUE (room_id, receipt_type, user_id)
+);
+CREATE INDEX IF NOT EXISTS syncapi_receipts_room_id ON syncapi_receipts(room_id);
+`
+
+const upsertReceipt = "" +
+ "INSERT INTO syncapi_receipts" +
+ " (room_id, receipt_type, user_id, event_id, receipt_ts)" +
+ " VALUES ($1, $2, $3, $4, $5)" +
+ " ON CONFLICT (room_id, receipt_type, user_id)" +
+ " DO UPDATE SET id = nextval('syncapi_stream_id'), event_id = $4, receipt_ts = $5" +
+ " RETURNING id"
+
+const selectRoomReceipts = "" +
+ "SELECT room_id, receipt_type, user_id, event_id, receipt_ts" +
+ " FROM syncapi_receipts" +
+ " WHERE room_id = ANY($1) AND id > $2"
+
+type receiptStatements struct {
+ db *sql.DB
+ upsertReceipt *sql.Stmt
+ selectRoomReceipts *sql.Stmt
+}
+
+func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
+ _, err := db.Exec(receiptsSchema)
+ if err != nil {
+ return nil, err
+ }
+ r := &receiptStatements{
+ db: db,
+ }
+ if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil {
+ return nil, fmt.Errorf("unable to prepare upsertReceipt statement: %w", err)
+ }
+ if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil {
+ return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
+ }
+ return r, nil
+}
+
+func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
+ stmt := sqlutil.TxStmt(txn, r.upsertReceipt)
+ err = stmt.QueryRowContext(ctx, roomId, receiptType, userId, eventId, timestamp).Scan(&pos)
+ return
+}
+
+func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]api.OutputReceiptEvent, error) {
+ rows, err := r.selectRoomReceipts.QueryContext(ctx, pq.Array(roomIDs), streamPos)
+ if err != nil {
+ return nil, fmt.Errorf("unable to query room receipts: %w", err)
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed")
+ var res []api.OutputReceiptEvent
+ for rows.Next() {
+ r := api.OutputReceiptEvent{}
+ err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp)
+ if err != nil {
+ return res, fmt.Errorf("unable to scan row to api.Receipts: %w", err)
+ }
+ res = append(res, r)
+ }
+ return res, rows.Err()
+}
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index 7f19722a..979e19a0 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -82,6 +82,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
if err != nil {
return nil, err
}
+ receipts, err := NewPostgresReceiptsTable(d.db)
+ if err != nil {
+ return nil, err
+ }
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,
@@ -94,6 +98,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
BackwardExtremities: backwardExtremities,
Filter: filter,
SendToDevice: sendToDevice,
+ Receipts: receipts,
EDUCache: cache.New(),
}
return &d, nil
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index a7c07f94..2b82ee33 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -21,6 +21,7 @@ import (
"fmt"
"time"
+ eduAPI "github.com/matrix-org/dendrite/eduserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/eduserver/cache"
@@ -47,6 +48,7 @@ type Database struct {
BackwardExtremities tables.BackwardsExtremities
SendToDevice tables.SendToDevice
Filter tables.Filter
+ Receipts tables.Receipts
EDUCache *cache.EDUCache
}
@@ -527,10 +529,10 @@ func (d *Database) addTypingDeltaToResponse(
joinedRoomIDs []string,
res *types.Response,
) error {
- var jr types.JoinResponse
var ok bool
var err error
for _, roomID := range joinedRoomIDs {
+ var jr types.JoinResponse
if typingUsers, updated := d.EDUCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(since.EDUPosition()),
); updated {
@@ -554,21 +556,84 @@ func (d *Database) addTypingDeltaToResponse(
return nil
}
+// addReceiptDeltaToResponse adds all receipt information to a sync response
+// since the specified position
+func (d *Database) addReceiptDeltaToResponse(
+ since types.StreamingToken,
+ joinedRoomIDs []string,
+ res *types.Response,
+) error {
+ receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.EDUPosition())
+ if err != nil {
+ return fmt.Errorf("unable to select receipts for rooms: %w", err)
+ }
+
+ // Group receipts by room, so we can create one ClientEvent for every room
+ receiptsByRoom := make(map[string][]eduAPI.OutputReceiptEvent)
+ for _, receipt := range receipts {
+ receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)
+ }
+
+ for roomID, receipts := range receiptsByRoom {
+ var jr types.JoinResponse
+ var ok bool
+
+ // Make sure we use an existing JoinResponse if there is one.
+ // If not, we'll create a new one
+ if jr, ok = res.Rooms.Join[roomID]; !ok {
+ jr = types.JoinResponse{}
+ }
+
+ ev := gomatrixserverlib.ClientEvent{
+ Type: gomatrixserverlib.MReceipt,
+ RoomID: roomID,
+ }
+ content := make(map[string]eduAPI.ReceiptMRead)
+ for _, receipt := range receipts {
+ var read eduAPI.ReceiptMRead
+ if read, ok = content[receipt.EventID]; !ok {
+ read = eduAPI.ReceiptMRead{
+ User: make(map[string]eduAPI.ReceiptTS),
+ }
+ }
+ read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp}
+ content[receipt.EventID] = read
+ }
+ ev.Content, err = json.Marshal(content)
+ if err != nil {
+ return err
+ }
+
+ jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
+ res.Rooms.Join[roomID] = jr
+ }
+
+ return nil
+}
+
// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
// the positions of that type are not equal in fromPos and toPos.
func (d *Database) addEDUDeltaToResponse(
fromPos, toPos types.StreamingToken,
joinedRoomIDs []string,
res *types.Response,
-) (err error) {
-
+) error {
if fromPos.EDUPosition() != toPos.EDUPosition() {
- err = d.addTypingDeltaToResponse(
- fromPos, joinedRoomIDs, res,
- )
+ // add typing deltas
+ if err := d.addTypingDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
+ return fmt.Errorf("unable to apply typing delta to response: %w", err)
+ }
}
- return
+ // Check on initial sync and if EDUPositions differ
+ if (fromPos.EDUPosition() == 0 && toPos.EDUPosition() == 0) ||
+ fromPos.EDUPosition() != toPos.EDUPosition() {
+ if err := d.addReceiptDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
+ return fmt.Errorf("unable to apply receipts to response: %w", err)
+ }
+ }
+
+ return nil
}
func (d *Database) GetFilter(
@@ -1404,3 +1469,16 @@ type stateDelta struct {
// Can be 0 if there is no membership event in this delta.
membershipPos types.StreamPosition
}
+
+// StoreReceipt stores user receipts
+func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
+ err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ pos, err = d.Receipts.UpsertReceipt(ctx, txn, roomId, receiptType, userId, eventId, timestamp)
+ return err
+ })
+ return
+}
+
+func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) {
+ return d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
+}
diff --git a/syncapi/storage/sqlite3/receipt_table.go b/syncapi/storage/sqlite3/receipt_table.go
new file mode 100644
index 00000000..b1770e80
--- /dev/null
+++ b/syncapi/storage/sqlite3/receipt_table.go
@@ -0,0 +1,118 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "strings"
+
+ "github.com/matrix-org/dendrite/eduserver/api"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const receiptsSchema = `
+-- Stores data about receipts
+CREATE TABLE IF NOT EXISTS syncapi_receipts (
+ -- The ID
+ id BIGINT,
+ room_id TEXT NOT NULL,
+ receipt_type TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ event_id TEXT NOT NULL,
+ receipt_ts BIGINT NOT NULL,
+ CONSTRAINT syncapi_receipts_unique UNIQUE (room_id, receipt_type, user_id)
+);
+CREATE INDEX IF NOT EXISTS syncapi_receipts_room_id_idx ON syncapi_receipts(room_id);
+`
+
+const upsertReceipt = "" +
+ "INSERT INTO syncapi_receipts" +
+ " (id, room_id, receipt_type, user_id, event_id, receipt_ts)" +
+ " VALUES ($1, $2, $3, $4, $5, $6)" +
+ " ON CONFLICT (room_id, receipt_type, user_id)" +
+ " DO UPDATE SET id = $7, event_id = $8, receipt_ts = $9"
+
+const selectRoomReceipts = "" +
+ "SELECT room_id, receipt_type, user_id, event_id, receipt_ts" +
+ " FROM syncapi_receipts" +
+ " WHERE id > $1 and room_id in ($2)"
+
+type receiptStatements struct {
+ db *sql.DB
+ streamIDStatements *streamIDStatements
+ upsertReceipt *sql.Stmt
+ selectRoomReceipts *sql.Stmt
+}
+
+func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Receipts, error) {
+ _, err := db.Exec(receiptsSchema)
+ if err != nil {
+ return nil, err
+ }
+ r := &receiptStatements{
+ db: db,
+ streamIDStatements: streamID,
+ }
+ if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil {
+ return nil, fmt.Errorf("unable to prepare upsertReceipt statement: %w", err)
+ }
+ if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil {
+ return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
+ }
+ return r, nil
+}
+
+// UpsertReceipt creates new user receipts
+func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
+ pos, err = r.streamIDStatements.nextStreamID(ctx, txn)
+ if err != nil {
+ return
+ }
+ stmt := sqlutil.TxStmt(txn, r.upsertReceipt)
+ _, err = stmt.ExecContext(ctx, pos, roomId, receiptType, userId, eventId, timestamp, pos, eventId, timestamp)
+ return
+}
+
+// SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp
+func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]api.OutputReceiptEvent, error) {
+ selectSQL := strings.Replace(selectRoomReceipts, "($2)", sqlutil.QueryVariadicOffset(len(roomIDs), 1), 1)
+
+ params := make([]interface{}, len(roomIDs)+1)
+ params[0] = streamPos
+ for k, v := range roomIDs {
+ params[k+1] = v
+ }
+ rows, err := r.db.QueryContext(ctx, selectSQL, params...)
+ if err != nil {
+ return nil, fmt.Errorf("unable to query room receipts: %w", err)
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed")
+ var res []api.OutputReceiptEvent
+ for rows.Next() {
+ r := api.OutputReceiptEvent{}
+ err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp)
+ if err != nil {
+ return res, fmt.Errorf("unable to scan row to api.Receipts: %w", err)
+ }
+ res = append(res, r)
+ }
+ return res, rows.Err()
+}
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go
index 86d83ec9..036e2b2e 100644
--- a/syncapi/storage/sqlite3/syncserver.go
+++ b/syncapi/storage/sqlite3/syncserver.go
@@ -95,6 +95,10 @@ func (d *SyncServerDatasource) prepare() (err error) {
if err != nil {
return err
}
+ receipts, err := NewSqliteReceiptsTable(d.db, &d.streamID)
+ if err != nil {
+ return err
+ }
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,
@@ -107,6 +111,7 @@ func (d *SyncServerDatasource) prepare() (err error) {
Topology: topology,
Filter: filter,
SendToDevice: sendToDevice,
+ Receipts: receipts,
EDUCache: cache.New(),
}
return nil
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index da095be5..f8e7a224 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -18,6 +18,7 @@ import (
"context"
"database/sql"
+ eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
@@ -156,3 +157,8 @@ type Filter interface {
SelectFilter(ctx context.Context, localpart string, filterID string) (*gomatrixserverlib.Filter, error)
InsertFilter(ctx context.Context, filter *gomatrixserverlib.Filter, localpart string) (filterID string, err error)
}
+
+type Receipts interface {
+ UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
+ SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error)
+}
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go
index fcac3f16..daa3a1d8 100644
--- a/syncapi/sync/notifier.go
+++ b/syncapi/sync/notifier.go
@@ -149,6 +149,16 @@ func (n *Notifier) OnNewSendToDevice(
n.wakeupUserDevice(userID, deviceIDs, latestPos)
}
+// OnNewReceipt updates the current position
+func (n *Notifier) OnNewReceipt(
+ posUpdate types.StreamingToken,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+ latestPos := n.currPos.WithUpdates(posUpdate)
+ n.currPos = latestPos
+}
+
func (n *Notifier) OnNewKeyChange(
posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string,
) {
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index de0bb434..393a7aa5 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -99,5 +99,12 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
+ receiptConsumer := consumers.NewOutputReceiptEventConsumer(
+ cfg, consumer, notifier, syncDB,
+ )
+ if err = receiptConsumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start receipts consumer")
+ }
+
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
}
diff --git a/sytest-whitelist b/sytest-whitelist
index ac089eab..49011b5a 100644
--- a/sytest-whitelist
+++ b/sytest-whitelist
@@ -483,6 +483,12 @@ POST rejects invalid utf-8 in JSON
Users cannot kick users who have already left a room
Event with an invalid signature in the send_join response should not cause room join to fail
Inbound federation rejects typing notifications from wrong remote
+POST /rooms/:room_id/receipt can create receipts
+Receipts must be m.read
+Read receipts appear in initial v2 /sync
+New read receipts appear in incremental v2 /sync
+Outbound federation sends receipts
+Inbound federation rejects receipts from wrong remote
Should not be able to take over the room by pretending there is no PL event
Can get rooms/{roomId}/state for a departed room (SPEC-216)
Users cannot set notifications powerlevel higher than their own