aboutsummaryrefslogtreecommitdiff
path: root/eduserver
diff options
context:
space:
mode:
authorS7evinK <tfaelligen@gmail.com>2020-11-09 19:46:11 +0100
committerGitHub <noreply@github.com>2020-11-09 18:46:11 +0000
commitbcb89ada5ebbe54fa057ec403af4074a8c147764 (patch)
tree283cdbaf04db7fe5bcd25185b40f77c60aa928ca /eduserver
parenteccd0d2c1b8bd4b921bafca4585aa09d32ae561f (diff)
Implement read receipts (#1528)
* fix conversion from int to string yields a string of one rune, not a string of digits * Add receipts table to syncapi * Use StreamingToken as the since value * Add required method to testEDUProducer * Make receipt json creation "easier" to read * Add receipts api to the eduserver * Add receipts endpoint * Add eduserver kafka consumer * Add missing kafka config * Add passing tests to whitelist Signed-off-by: Till Faelligen <tfaelligen@gmail.com> * Fix copy & paste error * Fix column count error * Make outbound federation receipts pass * Make "Inbound federation rejects receipts from wrong remote" pass * Don't use errors package * - Add TODO for batching requests - Rename variable * Return a better error message * - Use OutputReceiptEvent instead of InputReceiptEvent as result - Don't use the errors package for errors - Defer CloseAndLogIfError to close rows - Fix Copyright * Better creation/usage of JoinResponse * Query all joined rooms instead of just one * Update gomatrixserverlib * Add sqlite3 migration * Add postgres migration * Ensure required sequence exists before running migrations * Clarification on comment * - Fix a bug when creating client receipts - Use concrete types instead of interface{} * Remove dead code Use key for timestamp * Fix postgres query... * Remove single purpose struct * Use key/value directly * Only apply receipts on initial sync or if edu positions differ, otherwise we'll be sending the same receipts over and over again. * Actually update the id, so it is correctly send in syncs * Set receipt on request to /read_markers * Fix issue with receipts getting overwritten * Use fmt.Errorf instead of pkg/errors * Revert "Add postgres migration" This reverts commit 722fe5a04628882b787d096942459961db159b06. * Revert "Add sqlite3 migration" This reverts commit d113b03f6495a4b8f8bcf158a3d00b510b4240cc. * Fix selectRoomReceipts query * Make golangci-lint happy Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'eduserver')
-rw-r--r--eduserver/api/input.go22
-rw-r--r--eduserver/api/output.go36
-rw-r--r--eduserver/api/wrapper.go19
-rw-r--r--eduserver/eduserver.go5
-rw-r--r--eduserver/input/input.go30
-rw-r--r--eduserver/inthttp/client.go14
-rw-r--r--eduserver/inthttp/server.go13
7 files changed, 137 insertions, 2 deletions
diff --git a/eduserver/api/input.go b/eduserver/api/input.go
index 0d0d21f3..f8599e1c 100644
--- a/eduserver/api/input.go
+++ b/eduserver/api/input.go
@@ -59,6 +59,22 @@ type InputSendToDeviceEventRequest struct {
// InputSendToDeviceEventResponse is a response to InputSendToDeviceEventRequest
type InputSendToDeviceEventResponse struct{}
+type InputReceiptEvent struct {
+ UserID string `json:"user_id"`
+ RoomID string `json:"room_id"`
+ EventID string `json:"event_id"`
+ Type string `json:"type"`
+ Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
+}
+
+// InputReceiptEventRequest is a request to EDUServerInputAPI
+type InputReceiptEventRequest struct {
+ InputReceiptEvent InputReceiptEvent `json:"input_receipt_event"`
+}
+
+// InputReceiptEventResponse is a response to InputReceiptEventRequest
+type InputReceiptEventResponse struct{}
+
// EDUServerInputAPI is used to write events to the typing server.
type EDUServerInputAPI interface {
InputTypingEvent(
@@ -72,4 +88,10 @@ type EDUServerInputAPI interface {
request *InputSendToDeviceEventRequest,
response *InputSendToDeviceEventResponse,
) error
+
+ InputReceiptEvent(
+ ctx context.Context,
+ request *InputReceiptEventRequest,
+ response *InputReceiptEventResponse,
+ ) error
}
diff --git a/eduserver/api/output.go b/eduserver/api/output.go
index e6ded841..650458a2 100644
--- a/eduserver/api/output.go
+++ b/eduserver/api/output.go
@@ -49,3 +49,39 @@ type OutputSendToDeviceEvent struct {
DeviceID string `json:"device_id"`
gomatrixserverlib.SendToDeviceEvent
}
+
+type ReceiptEvent struct {
+ UserID string `json:"user_id"`
+ RoomID string `json:"room_id"`
+ EventID string `json:"event_id"`
+ Type string `json:"type"`
+ Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
+}
+
+// OutputReceiptEvent is an entry in the receipt output kafka log
+type OutputReceiptEvent struct {
+ UserID string `json:"user_id"`
+ RoomID string `json:"room_id"`
+ EventID string `json:"event_id"`
+ Type string `json:"type"`
+ Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
+}
+
+// Helper structs for receipts json creation
+type ReceiptMRead struct {
+ User map[string]ReceiptTS `json:"m.read"`
+}
+
+type ReceiptTS struct {
+ TS gomatrixserverlib.Timestamp `json:"ts"`
+}
+
+// FederationSender output
+type FederationReceiptMRead struct {
+ User map[string]FederationReceiptData `json:"m.read"`
+}
+
+type FederationReceiptData struct {
+ Data ReceiptTS `json:"data"`
+ EventIDs []string `json:"event_ids"`
+}
diff --git a/eduserver/api/wrapper.go b/eduserver/api/wrapper.go
index c2c4596d..7907f4d3 100644
--- a/eduserver/api/wrapper.go
+++ b/eduserver/api/wrapper.go
@@ -67,3 +67,22 @@ func SendToDevice(
response := InputSendToDeviceEventResponse{}
return eduAPI.InputSendToDeviceEvent(ctx, &request, &response)
}
+
+// SendReceipt sends a receipt event to EDU Server
+func SendReceipt(
+ ctx context.Context,
+ eduAPI EDUServerInputAPI, userID, roomID, eventID, receiptType string,
+ timestamp gomatrixserverlib.Timestamp,
+) error {
+ request := InputReceiptEventRequest{
+ InputReceiptEvent: InputReceiptEvent{
+ UserID: userID,
+ RoomID: roomID,
+ EventID: eventID,
+ Type: receiptType,
+ Timestamp: timestamp,
+ },
+ }
+ response := InputReceiptEventResponse{}
+ return eduAPI.InputReceiptEvent(ctx, &request, &response)
+}
diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go
index 098ac024..d5ab3681 100644
--- a/eduserver/eduserver.go
+++ b/eduserver/eduserver.go
@@ -49,8 +49,9 @@ func NewInternalAPI(
Cache: eduCache,
UserAPI: userAPI,
Producer: producer,
- OutputTypingEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
- OutputSendToDeviceEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
+ OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
+ OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
+ OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
ServerName: cfg.Matrix.ServerName,
}
}
diff --git a/eduserver/input/input.go b/eduserver/input/input.go
index e3d2c55e..c54fb9de 100644
--- a/eduserver/input/input.go
+++ b/eduserver/input/input.go
@@ -37,6 +37,8 @@ type EDUServerInputAPI struct {
OutputTypingEventTopic string
// The kafka topic to output new send to device events to.
OutputSendToDeviceEventTopic string
+ // The kafka topic to output new receipt events to
+ OutputReceiptEventTopic string
// kafka producer
Producer sarama.SyncProducer
// Internal user query API
@@ -173,3 +175,31 @@ func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) e
return nil
}
+
+// InputReceiptEvent implements api.EDUServerInputAPI
+// TODO: Intelligently batch requests sent by the same user (e.g wait a few milliseconds before emitting output events)
+func (t *EDUServerInputAPI) InputReceiptEvent(
+ ctx context.Context,
+ request *api.InputReceiptEventRequest,
+ response *api.InputReceiptEventResponse,
+) error {
+ logrus.WithFields(logrus.Fields{}).Infof("Producing to topic '%s'", t.OutputReceiptEventTopic)
+ output := &api.OutputReceiptEvent{
+ UserID: request.InputReceiptEvent.UserID,
+ RoomID: request.InputReceiptEvent.RoomID,
+ EventID: request.InputReceiptEvent.EventID,
+ Type: request.InputReceiptEvent.Type,
+ Timestamp: request.InputReceiptEvent.Timestamp,
+ }
+ js, err := json.Marshal(output)
+ if err != nil {
+ return err
+ }
+ m := &sarama.ProducerMessage{
+ Topic: t.OutputReceiptEventTopic,
+ Key: sarama.StringEncoder(request.InputReceiptEvent.RoomID + ":" + request.InputReceiptEvent.UserID),
+ Value: sarama.ByteEncoder(js),
+ }
+ _, _, err = t.Producer.SendMessage(m)
+ return err
+}
diff --git a/eduserver/inthttp/client.go b/eduserver/inthttp/client.go
index 7d0bc160..0690ed82 100644
--- a/eduserver/inthttp/client.go
+++ b/eduserver/inthttp/client.go
@@ -14,6 +14,7 @@ import (
const (
EDUServerInputTypingEventPath = "/eduserver/input"
EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
+ EDUServerInputReceiptEventPath = "/eduserver/receipt"
)
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
@@ -54,3 +55,16 @@ func (h *httpEDUServerInputAPI) InputSendToDeviceEvent(
apiURL := h.eduServerURL + EDUServerInputSendToDeviceEventPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
+
+// InputSendToDeviceEvent implements EDUServerInputAPI
+func (h *httpEDUServerInputAPI) InputReceiptEvent(
+ ctx context.Context,
+ request *api.InputReceiptEventRequest,
+ response *api.InputReceiptEventResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "InputReceiptEventPath")
+ defer span.Finish()
+
+ apiURL := h.eduServerURL + EDUServerInputReceiptEventPath
+ return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+}
diff --git a/eduserver/inthttp/server.go b/eduserver/inthttp/server.go
index e374513a..a3494375 100644
--- a/eduserver/inthttp/server.go
+++ b/eduserver/inthttp/server.go
@@ -38,4 +38,17 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
+ internalAPIMux.Handle(EDUServerInputReceiptEventPath,
+ httputil.MakeInternalAPI("inputReceiptEvent", func(req *http.Request) util.JSONResponse {
+ var request api.InputReceiptEventRequest
+ var response api.InputReceiptEventResponse
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ if err := t.InputReceiptEvent(req.Context(), &request, &response); err != nil {
+ return util.ErrorResponse(err)
+ }
+ return util.JSONResponse{Code: http.StatusOK, JSON: &response}
+ }),
+ )
}