diff options
author | ruben <code@rbn.im> | 2019-05-21 22:56:55 +0200 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2019-05-21 21:56:55 +0100 |
commit | 74827428bd3e11faab65f12204449c1b9469b0ae (patch) | |
tree | 0decafa542436a0667ed2d3e3cfd4df0f03de1e5 /clientapi/producers | |
parent | 4d588f7008afe5600219ac0930c2eee2de5c447b (diff) |
use go module for dependencies (#594)
Diffstat (limited to 'clientapi/producers')
-rw-r--r-- | clientapi/producers/roomserver.go | 112 | ||||
-rw-r--r-- | clientapi/producers/syncapi.go | 50 | ||||
-rw-r--r-- | clientapi/producers/typingserver.go | 54 | ||||
-rw-r--r-- | clientapi/producers/userupdate.go | 62 |
4 files changed, 278 insertions, 0 deletions
diff --git a/clientapi/producers/roomserver.go b/clientapi/producers/roomserver.go new file mode 100644 index 00000000..e50561a7 --- /dev/null +++ b/clientapi/producers/roomserver.go @@ -0,0 +1,112 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "context" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" +) + +// RoomserverProducer produces events for the roomserver to consume. +type RoomserverProducer struct { + InputAPI api.RoomserverInputAPI +} + +// NewRoomserverProducer creates a new RoomserverProducer +func NewRoomserverProducer(inputAPI api.RoomserverInputAPI) *RoomserverProducer { + return &RoomserverProducer{ + InputAPI: inputAPI, + } +} + +// SendEvents writes the given events to the roomserver input log. The events are written with KindNew. +func (c *RoomserverProducer) SendEvents( + ctx context.Context, events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName, + txnID *api.TransactionID, +) (string, error) { + ires := make([]api.InputRoomEvent, len(events)) + for i, event := range events { + ires[i] = api.InputRoomEvent{ + Kind: api.KindNew, + Event: event, + AuthEventIDs: event.AuthEventIDs(), + SendAsServer: string(sendAsServer), + TransactionID: txnID, + } + } + return c.SendInputRoomEvents(ctx, ires) +} + +// SendEventWithState writes an event with KindNew to the roomserver input log +// with the state at the event as KindOutlier before it. +func (c *RoomserverProducer) SendEventWithState( + ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.Event, +) error { + outliers, err := state.Events() + if err != nil { + return err + } + + ires := make([]api.InputRoomEvent, len(outliers)+1) + for i, outlier := range outliers { + ires[i] = api.InputRoomEvent{ + Kind: api.KindOutlier, + Event: outlier, + AuthEventIDs: outlier.AuthEventIDs(), + } + } + + stateEventIDs := make([]string, len(state.StateEvents)) + for i := range state.StateEvents { + stateEventIDs[i] = state.StateEvents[i].EventID() + } + + ires[len(outliers)] = api.InputRoomEvent{ + Kind: api.KindNew, + Event: event, + AuthEventIDs: event.AuthEventIDs(), + HasState: true, + StateEventIDs: stateEventIDs, + } + + _, err = c.SendInputRoomEvents(ctx, ires) + return err +} + +// SendInputRoomEvents writes the given input room events to the roomserver input API. +func (c *RoomserverProducer) SendInputRoomEvents( + ctx context.Context, ires []api.InputRoomEvent, +) (eventID string, err error) { + request := api.InputRoomEventsRequest{InputRoomEvents: ires} + var response api.InputRoomEventsResponse + err = c.InputAPI.InputRoomEvents(ctx, &request, &response) + eventID = response.EventID + return +} + +// SendInvite writes the invite event to the roomserver input API. +// This should only be needed for invite events that occur outside of a known room. +// If we are in the room then the event should be sent using the SendEvents method. +func (c *RoomserverProducer) SendInvite( + ctx context.Context, inviteEvent gomatrixserverlib.Event, +) error { + request := api.InputRoomEventsRequest{ + InputInviteEvents: []api.InputInviteEvent{{Event: inviteEvent}}, + } + var response api.InputRoomEventsResponse + return c.InputAPI.InputRoomEvents(ctx, &request, &response) +} diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go new file mode 100644 index 00000000..6bfcd51a --- /dev/null +++ b/clientapi/producers/syncapi.go @@ -0,0 +1,50 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "encoding/json" + + "github.com/matrix-org/dendrite/common" + + sarama "gopkg.in/Shopify/sarama.v1" +) + +// SyncAPIProducer produces events for the sync API server to consume +type SyncAPIProducer struct { + Topic string + Producer sarama.SyncProducer +} + +// SendData sends account data to the sync API server +func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error { + var m sarama.ProducerMessage + + data := common.AccountData{ + RoomID: roomID, + Type: dataType, + } + value, err := json.Marshal(data) + if err != nil { + return err + } + + m.Topic = string(p.Topic) + m.Key = sarama.StringEncoder(userID) + m.Value = sarama.ByteEncoder(value) + + _, _, err = p.Producer.SendMessage(&m) + return err +} diff --git a/clientapi/producers/typingserver.go b/clientapi/producers/typingserver.go new file mode 100644 index 00000000..f4d0bcba --- /dev/null +++ b/clientapi/producers/typingserver.go @@ -0,0 +1,54 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "context" + "time" + + "github.com/matrix-org/dendrite/typingserver/api" + "github.com/matrix-org/gomatrixserverlib" +) + +// TypingServerProducer produces events for the typing server to consume +type TypingServerProducer struct { + InputAPI api.TypingServerInputAPI +} + +// NewTypingServerProducer creates a new TypingServerProducer +func NewTypingServerProducer(inputAPI api.TypingServerInputAPI) *TypingServerProducer { + return &TypingServerProducer{ + InputAPI: inputAPI, + } +} + +// Send typing event to typing server +func (p *TypingServerProducer) Send( + ctx context.Context, userID, roomID string, + typing bool, timeout int64, +) error { + requestData := api.InputTypingEvent{ + UserID: userID, + RoomID: roomID, + Typing: typing, + Timeout: timeout, + OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()), + } + + var response api.InputTypingEventResponse + err := p.InputAPI.InputTypingEvent( + ctx, &api.InputTypingEventRequest{InputTypingEvent: requestData}, &response, + ) + + return err +} diff --git a/clientapi/producers/userupdate.go b/clientapi/producers/userupdate.go new file mode 100644 index 00000000..2a5dfc70 --- /dev/null +++ b/clientapi/producers/userupdate.go @@ -0,0 +1,62 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "encoding/json" + + sarama "gopkg.in/Shopify/sarama.v1" +) + +// UserUpdateProducer produces events related to user updates. +type UserUpdateProducer struct { + Topic string + Producer sarama.SyncProducer +} + +// TODO: Move this struct to `common` so the components that consume the topic +// can use it when parsing incoming messages +type profileUpdate struct { + Updated string `json:"updated"` // Which attribute is updated (can be either `avatar_url` or `displayname`) + OldValue string `json:"old_value"` // The attribute's value before the update + NewValue string `json:"new_value"` // The attribute's value after the update +} + +// SendUpdate sends an update using kafka to notify the roomserver of the +// profile update. Returns an error if the update failed to send. +func (p *UserUpdateProducer) SendUpdate( + userID string, updatedAttribute string, oldValue string, newValue string, +) error { + var update profileUpdate + var m sarama.ProducerMessage + + m.Topic = string(p.Topic) + m.Key = sarama.StringEncoder(userID) + + update = profileUpdate{ + Updated: updatedAttribute, + OldValue: oldValue, + NewValue: newValue, + } + + value, err := json.Marshal(update) + if err != nil { + return err + } + m.Value = sarama.ByteEncoder(value) + + _, _, err = p.Producer.SendMessage(&m) + return err +} |