aboutsummaryrefslogtreecommitdiff
path: root/clientapi/producers
diff options
context:
space:
mode:
authorruben <code@rbn.im>2019-05-21 22:56:55 +0200
committerBrendan Abolivier <babolivier@matrix.org>2019-05-21 21:56:55 +0100
commit74827428bd3e11faab65f12204449c1b9469b0ae (patch)
tree0decafa542436a0667ed2d3e3cfd4df0f03de1e5 /clientapi/producers
parent4d588f7008afe5600219ac0930c2eee2de5c447b (diff)
use go module for dependencies (#594)
Diffstat (limited to 'clientapi/producers')
-rw-r--r--clientapi/producers/roomserver.go112
-rw-r--r--clientapi/producers/syncapi.go50
-rw-r--r--clientapi/producers/typingserver.go54
-rw-r--r--clientapi/producers/userupdate.go62
4 files changed, 278 insertions, 0 deletions
diff --git a/clientapi/producers/roomserver.go b/clientapi/producers/roomserver.go
new file mode 100644
index 00000000..e50561a7
--- /dev/null
+++ b/clientapi/producers/roomserver.go
@@ -0,0 +1,112 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+ "context"
+
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// RoomserverProducer produces events for the roomserver to consume.
+type RoomserverProducer struct {
+ InputAPI api.RoomserverInputAPI
+}
+
+// NewRoomserverProducer creates a new RoomserverProducer
+func NewRoomserverProducer(inputAPI api.RoomserverInputAPI) *RoomserverProducer {
+ return &RoomserverProducer{
+ InputAPI: inputAPI,
+ }
+}
+
+// SendEvents writes the given events to the roomserver input log. The events are written with KindNew.
+func (c *RoomserverProducer) SendEvents(
+ ctx context.Context, events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName,
+ txnID *api.TransactionID,
+) (string, error) {
+ ires := make([]api.InputRoomEvent, len(events))
+ for i, event := range events {
+ ires[i] = api.InputRoomEvent{
+ Kind: api.KindNew,
+ Event: event,
+ AuthEventIDs: event.AuthEventIDs(),
+ SendAsServer: string(sendAsServer),
+ TransactionID: txnID,
+ }
+ }
+ return c.SendInputRoomEvents(ctx, ires)
+}
+
+// SendEventWithState writes an event with KindNew to the roomserver input log
+// with the state at the event as KindOutlier before it.
+func (c *RoomserverProducer) SendEventWithState(
+ ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.Event,
+) error {
+ outliers, err := state.Events()
+ if err != nil {
+ return err
+ }
+
+ ires := make([]api.InputRoomEvent, len(outliers)+1)
+ for i, outlier := range outliers {
+ ires[i] = api.InputRoomEvent{
+ Kind: api.KindOutlier,
+ Event: outlier,
+ AuthEventIDs: outlier.AuthEventIDs(),
+ }
+ }
+
+ stateEventIDs := make([]string, len(state.StateEvents))
+ for i := range state.StateEvents {
+ stateEventIDs[i] = state.StateEvents[i].EventID()
+ }
+
+ ires[len(outliers)] = api.InputRoomEvent{
+ Kind: api.KindNew,
+ Event: event,
+ AuthEventIDs: event.AuthEventIDs(),
+ HasState: true,
+ StateEventIDs: stateEventIDs,
+ }
+
+ _, err = c.SendInputRoomEvents(ctx, ires)
+ return err
+}
+
+// SendInputRoomEvents writes the given input room events to the roomserver input API.
+func (c *RoomserverProducer) SendInputRoomEvents(
+ ctx context.Context, ires []api.InputRoomEvent,
+) (eventID string, err error) {
+ request := api.InputRoomEventsRequest{InputRoomEvents: ires}
+ var response api.InputRoomEventsResponse
+ err = c.InputAPI.InputRoomEvents(ctx, &request, &response)
+ eventID = response.EventID
+ return
+}
+
+// SendInvite writes the invite event to the roomserver input API.
+// This should only be needed for invite events that occur outside of a known room.
+// If we are in the room then the event should be sent using the SendEvents method.
+func (c *RoomserverProducer) SendInvite(
+ ctx context.Context, inviteEvent gomatrixserverlib.Event,
+) error {
+ request := api.InputRoomEventsRequest{
+ InputInviteEvents: []api.InputInviteEvent{{Event: inviteEvent}},
+ }
+ var response api.InputRoomEventsResponse
+ return c.InputAPI.InputRoomEvents(ctx, &request, &response)
+}
diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go
new file mode 100644
index 00000000..6bfcd51a
--- /dev/null
+++ b/clientapi/producers/syncapi.go
@@ -0,0 +1,50 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/common"
+
+ sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// SyncAPIProducer produces events for the sync API server to consume
+type SyncAPIProducer struct {
+ Topic string
+ Producer sarama.SyncProducer
+}
+
+// SendData sends account data to the sync API server
+func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error {
+ var m sarama.ProducerMessage
+
+ data := common.AccountData{
+ RoomID: roomID,
+ Type: dataType,
+ }
+ value, err := json.Marshal(data)
+ if err != nil {
+ return err
+ }
+
+ m.Topic = string(p.Topic)
+ m.Key = sarama.StringEncoder(userID)
+ m.Value = sarama.ByteEncoder(value)
+
+ _, _, err = p.Producer.SendMessage(&m)
+ return err
+}
diff --git a/clientapi/producers/typingserver.go b/clientapi/producers/typingserver.go
new file mode 100644
index 00000000..f4d0bcba
--- /dev/null
+++ b/clientapi/producers/typingserver.go
@@ -0,0 +1,54 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+ "context"
+ "time"
+
+ "github.com/matrix-org/dendrite/typingserver/api"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// TypingServerProducer produces events for the typing server to consume
+type TypingServerProducer struct {
+ InputAPI api.TypingServerInputAPI
+}
+
+// NewTypingServerProducer creates a new TypingServerProducer
+func NewTypingServerProducer(inputAPI api.TypingServerInputAPI) *TypingServerProducer {
+ return &TypingServerProducer{
+ InputAPI: inputAPI,
+ }
+}
+
+// Send typing event to typing server
+func (p *TypingServerProducer) Send(
+ ctx context.Context, userID, roomID string,
+ typing bool, timeout int64,
+) error {
+ requestData := api.InputTypingEvent{
+ UserID: userID,
+ RoomID: roomID,
+ Typing: typing,
+ Timeout: timeout,
+ OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
+ }
+
+ var response api.InputTypingEventResponse
+ err := p.InputAPI.InputTypingEvent(
+ ctx, &api.InputTypingEventRequest{InputTypingEvent: requestData}, &response,
+ )
+
+ return err
+}
diff --git a/clientapi/producers/userupdate.go b/clientapi/producers/userupdate.go
new file mode 100644
index 00000000..2a5dfc70
--- /dev/null
+++ b/clientapi/producers/userupdate.go
@@ -0,0 +1,62 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+ "encoding/json"
+
+ sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// UserUpdateProducer produces events related to user updates.
+type UserUpdateProducer struct {
+ Topic string
+ Producer sarama.SyncProducer
+}
+
+// TODO: Move this struct to `common` so the components that consume the topic
+// can use it when parsing incoming messages
+type profileUpdate struct {
+ Updated string `json:"updated"` // Which attribute is updated (can be either `avatar_url` or `displayname`)
+ OldValue string `json:"old_value"` // The attribute's value before the update
+ NewValue string `json:"new_value"` // The attribute's value after the update
+}
+
+// SendUpdate sends an update using kafka to notify the roomserver of the
+// profile update. Returns an error if the update failed to send.
+func (p *UserUpdateProducer) SendUpdate(
+ userID string, updatedAttribute string, oldValue string, newValue string,
+) error {
+ var update profileUpdate
+ var m sarama.ProducerMessage
+
+ m.Topic = string(p.Topic)
+ m.Key = sarama.StringEncoder(userID)
+
+ update = profileUpdate{
+ Updated: updatedAttribute,
+ OldValue: oldValue,
+ NewValue: newValue,
+ }
+
+ value, err := json.Marshal(update)
+ if err != nil {
+ return err
+ }
+ m.Value = sarama.ByteEncoder(value)
+
+ _, _, err = p.Producer.SendMessage(&m)
+ return err
+}