diff options
Diffstat (limited to 'eduserver/input/input.go')
-rw-r--r-- | eduserver/input/input.go | 30 |
1 files changed, 30 insertions, 0 deletions
diff --git a/eduserver/input/input.go b/eduserver/input/input.go index e3d2c55e..c54fb9de 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -37,6 +37,8 @@ type EDUServerInputAPI struct { OutputTypingEventTopic string // The kafka topic to output new send to device events to. OutputSendToDeviceEventTopic string + // The kafka topic to output new receipt events to + OutputReceiptEventTopic string // kafka producer Producer sarama.SyncProducer // Internal user query API @@ -173,3 +175,31 @@ func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) e return nil } + +// InputReceiptEvent implements api.EDUServerInputAPI +// TODO: Intelligently batch requests sent by the same user (e.g wait a few milliseconds before emitting output events) +func (t *EDUServerInputAPI) InputReceiptEvent( + ctx context.Context, + request *api.InputReceiptEventRequest, + response *api.InputReceiptEventResponse, +) error { + logrus.WithFields(logrus.Fields{}).Infof("Producing to topic '%s'", t.OutputReceiptEventTopic) + output := &api.OutputReceiptEvent{ + UserID: request.InputReceiptEvent.UserID, + RoomID: request.InputReceiptEvent.RoomID, + EventID: request.InputReceiptEvent.EventID, + Type: request.InputReceiptEvent.Type, + Timestamp: request.InputReceiptEvent.Timestamp, + } + js, err := json.Marshal(output) + if err != nil { + return err + } + m := &sarama.ProducerMessage{ + Topic: t.OutputReceiptEventTopic, + Key: sarama.StringEncoder(request.InputReceiptEvent.RoomID + ":" + request.InputReceiptEvent.UserID), + Value: sarama.ByteEncoder(js), + } + _, _, err = t.Producer.SendMessage(m) + return err +} |