aboutsummaryrefslogtreecommitdiff
path: root/eduserver/input/input.go
diff options
context:
space:
mode:
Diffstat (limited to 'eduserver/input/input.go')
-rw-r--r--eduserver/input/input.go30
1 files changed, 30 insertions, 0 deletions
diff --git a/eduserver/input/input.go b/eduserver/input/input.go
index e3d2c55e..c54fb9de 100644
--- a/eduserver/input/input.go
+++ b/eduserver/input/input.go
@@ -37,6 +37,8 @@ type EDUServerInputAPI struct {
OutputTypingEventTopic string
// The kafka topic to output new send to device events to.
OutputSendToDeviceEventTopic string
+ // The kafka topic to output new receipt events to
+ OutputReceiptEventTopic string
// kafka producer
Producer sarama.SyncProducer
// Internal user query API
@@ -173,3 +175,31 @@ func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) e
return nil
}
+
+// InputReceiptEvent implements api.EDUServerInputAPI
+// TODO: Intelligently batch requests sent by the same user (e.g wait a few milliseconds before emitting output events)
+func (t *EDUServerInputAPI) InputReceiptEvent(
+ ctx context.Context,
+ request *api.InputReceiptEventRequest,
+ response *api.InputReceiptEventResponse,
+) error {
+ logrus.WithFields(logrus.Fields{}).Infof("Producing to topic '%s'", t.OutputReceiptEventTopic)
+ output := &api.OutputReceiptEvent{
+ UserID: request.InputReceiptEvent.UserID,
+ RoomID: request.InputReceiptEvent.RoomID,
+ EventID: request.InputReceiptEvent.EventID,
+ Type: request.InputReceiptEvent.Type,
+ Timestamp: request.InputReceiptEvent.Timestamp,
+ }
+ js, err := json.Marshal(output)
+ if err != nil {
+ return err
+ }
+ m := &sarama.ProducerMessage{
+ Topic: t.OutputReceiptEventTopic,
+ Key: sarama.StringEncoder(request.InputReceiptEvent.RoomID + ":" + request.InputReceiptEvent.UserID),
+ Value: sarama.ByteEncoder(js),
+ }
+ _, _, err = t.Producer.SendMessage(m)
+ return err
+}