aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go74
-rw-r--r--src/github.com/matrix-org/dendrite/common/config/config.go14
-rw-r--r--src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go361
-rw-r--r--src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver_test.go53
-rw-r--r--src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go105
-rw-r--r--src/github.com/matrix-org/dendrite/federationsender/queue/queue.go95
-rw-r--r--src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go111
-rw-r--r--src/github.com/matrix-org/dendrite/federationsender/storage/room_table.go89
-rw-r--r--src/github.com/matrix-org/dendrite/federationsender/storage/storage.go126
-rw-r--r--src/github.com/matrix-org/dendrite/federationsender/types/types.go45
10 files changed, 1068 insertions, 5 deletions
diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go
new file mode 100644
index 00000000..3c7e5c47
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go
@@ -0,0 +1,74 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "flag"
+ "net/http"
+ "os"
+
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/common/config"
+ "github.com/matrix-org/dendrite/federationsender/consumers"
+ "github.com/matrix-org/dendrite/federationsender/queue"
+ "github.com/matrix-org/dendrite/federationsender/storage"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/prometheus/client_golang/prometheus"
+
+ log "github.com/Sirupsen/logrus"
+)
+
+var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
+
+func main() {
+ common.SetupLogging(os.Getenv("LOG_DIR"))
+
+ flag.Parse()
+
+ if *configPath == "" {
+ log.Fatal("--config must be supplied")
+ }
+ cfg, err := config.Load(*configPath)
+ if err != nil {
+ log.Fatalf("Invalid config file: %s", err)
+ }
+
+ log.Info("config: ", cfg)
+
+ db, err := storage.NewDatabase(string(cfg.Database.FederationSender))
+ if err != nil {
+ log.Panicf("startup: failed to create federation sender database with data source %s : %s", cfg.Database.FederationSender, err)
+ }
+
+ federation := gomatrixserverlib.NewFederationClient(
+ cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey,
+ )
+
+ queues := queue.NewOutgoingQueues(cfg.Matrix.ServerName, federation)
+
+ consumer, err := consumers.NewOutputRoomEvent(cfg, queues, db)
+ if err != nil {
+ log.WithError(err).Panicf("startup: failed to create room server consumer")
+ }
+ if err = consumer.Start(); err != nil {
+ log.WithError(err).Panicf("startup: failed to start room server consumer")
+ }
+
+ http.DefaultServeMux.Handle("/metrics", prometheus.Handler())
+
+ if err := http.ListenAndServe(string(cfg.Listen.FederationSender), nil); err != nil {
+ panic(err)
+ }
+}
diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go
index a4977731..58980ca3 100644
--- a/src/github.com/matrix-org/dendrite/common/config/config.go
+++ b/src/github.com/matrix-org/dendrite/common/config/config.go
@@ -122,16 +122,20 @@ type Dendrite struct {
// The RoomServer database stores information about matrix rooms.
// It is only accessed by the RoomServer.
RoomServer DataSource `yaml:"room_server"`
+ // The FederationSender database stores information used by the FederationSender
+ // It is only accessed by the FederationSender.
+ FederationSender DataSource `yaml:"federation_sender"`
} `yaml:"database"`
// The internal addresses the components will listen on.
// These should not be exposed externally as they expose metrics and debugging APIs.
Listen struct {
- MediaAPI Address `yaml:"media_api"`
- ClientAPI Address `yaml:"client_api"`
- FederationAPI Address `yaml:"federation_api"`
- SyncAPI Address `yaml:"sync_api"`
- RoomServer Address `yaml:"room_server"`
+ MediaAPI Address `yaml:"media_api"`
+ ClientAPI Address `yaml:"client_api"`
+ FederationAPI Address `yaml:"federation_api"`
+ SyncAPI Address `yaml:"sync_api"`
+ RoomServer Address `yaml:"room_server"`
+ FederationSender Address `yaml:"federation_sender"`
} `yaml:"listen"`
}
diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go
new file mode 100644
index 00000000..9ac0e988
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go
@@ -0,0 +1,361 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "encoding/json"
+ "fmt"
+ "strings"
+
+ log "github.com/Sirupsen/logrus"
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/common/config"
+ "github.com/matrix-org/dendrite/federationsender/queue"
+ "github.com/matrix-org/dendrite/federationsender/storage"
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// OutputRoomEvent consumes events that originated in the room server.
+type OutputRoomEvent struct {
+ roomServerConsumer *common.ContinualConsumer
+ db *storage.Database
+ queues *queue.OutgoingQueues
+ query api.RoomserverQueryAPI
+}
+
+// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
+func NewOutputRoomEvent(cfg *config.Dendrite, queues *queue.OutgoingQueues, store *storage.Database) (*OutputRoomEvent, error) {
+ kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
+ if err != nil {
+ return nil, err
+ }
+ roomServerURL := cfg.RoomServerURL()
+
+ consumer := common.ContinualConsumer{
+ Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
+ Consumer: kafkaConsumer,
+ PartitionStore: store,
+ }
+ s := &OutputRoomEvent{
+ roomServerConsumer: &consumer,
+ db: store,
+ queues: queues,
+ query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
+ }
+ consumer.ProcessMessage = s.onMessage
+
+ return s, nil
+}
+
+// Start consuming from room servers
+func (s *OutputRoomEvent) Start() error {
+ return s.roomServerConsumer.Start()
+}
+
+// onMessage is called when the federation server receives a new event from the room server output log.
+// It is unsafe to call this with messages for the same room in multiple gorountines
+// because updates it will likely fail with a types.EventIDMismatchError when it
+// realises that it cannot update the room state using the deltas.
+func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
+ // Parse out the event JSON
+ var output api.OutputRoomEvent
+ if err := json.Unmarshal(msg.Value, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("roomserver output log: message parse failure")
+ return nil
+ }
+
+ ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.Event, false)
+ if err != nil {
+ log.WithError(err).Errorf("roomserver output log: event parse failure")
+ return nil
+ }
+ log.WithFields(log.Fields{
+ "event_id": ev.EventID(),
+ "room_id": ev.RoomID(),
+ "send_as_server": output.SendAsServer,
+ }).Info("received event from roomserver")
+
+ if err = s.processMessage(output, ev); err != nil {
+ // panic rather than continue with an inconsistent database
+ log.WithFields(log.Fields{
+ "event": string(ev.JSON()),
+ log.ErrorKey: err,
+ "add": output.AddsStateEventIDs,
+ "del": output.RemovesStateEventIDs,
+ }).Panicf("roomserver output log: write event failure")
+ return nil
+ }
+
+ return nil
+}
+
+// processMessage updates the list of currently joined hosts in the room
+// and then sends the event to the hosts that were joined before the event.
+func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixserverlib.Event) error {
+ addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ev)
+ if err != nil {
+ return err
+ }
+ addsJoinedHosts, err := joinedHostsFromEvents(addsStateEvents)
+ if err != nil {
+ return err
+ }
+ // Update our copy of the current state.
+ // We keep a copy of the current state because the state at each event is
+ // expressed as a delta against the current state.
+ // TODO: handle EventIDMismatchError and recover the current state by talking
+ // to the roomserver
+ oldJoinedHosts, err := s.db.UpdateRoom(
+ ev.RoomID(), ore.LastSentEventID, ev.EventID(),
+ addsJoinedHosts, ore.RemovesStateEventIDs,
+ )
+ if err != nil {
+ return err
+ }
+
+ if ore.SendAsServer == api.DoNotSendToOtherServers {
+ // Ignore event that we don't need to send anywhere.
+ return nil
+ }
+
+ // Work out which hosts were joined at the event itself.
+ joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, ev, oldJoinedHosts)
+ if err != nil {
+ return err
+ }
+
+ // Send the event.
+ if err = s.queues.SendEvent(
+ &ev, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent,
+ ); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// joinedHostsAtEvent works out a list of matrix servers that were joined to
+// the room at the event.
+// It is important to use the state at the event for sending messages because:
+// 1) We shouldn't send messages to servers that weren't in the room.
+// 2) If a server is kicked from the rooms it should still be told about the
+// kick event,
+// Usually the list can be calculated locally, but sometimes it will need fetch
+// events from the room server.
+// Returns an error if there was a problem talking to the room server.
+func (s *OutputRoomEvent) joinedHostsAtEvent(
+ ore api.OutputRoomEvent, ev gomatrixserverlib.Event, oldJoinedHosts []types.JoinedHost,
+) ([]gomatrixserverlib.ServerName, error) {
+ // Combine the delta into a single delta so that the adds and removes can
+ // cancel each other out. This should reduce the number of times we need
+ // to fetch a state event from the room server.
+ combinedAdds, combinedRemoves := combineDeltas(
+ ore.AddsStateEventIDs, ore.RemovesStateEventIDs,
+ ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs,
+ )
+ combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ev)
+ if err != nil {
+ return nil, err
+ }
+
+ combinedAddsJoinedHosts, err := joinedHostsFromEvents(combinedAddsEvents)
+ if err != nil {
+ return nil, err
+ }
+
+ removed := map[string]bool{}
+ for _, eventID := range combinedRemoves {
+ removed[eventID] = true
+ }
+
+ joined := map[gomatrixserverlib.ServerName]bool{}
+ for _, joinedHost := range oldJoinedHosts {
+ if removed[joinedHost.MemberEventID] {
+ // This m.room.member event is part of the current state of the
+ // room, but not part of the state at the event we are processing
+ // Therefore we can't use it to tell whether the server was in
+ // the room at the event.
+ continue
+ }
+ joined[joinedHost.ServerName] = true
+ }
+
+ for _, joinedHost := range combinedAddsJoinedHosts {
+ // This m.room.member event was part of the state of the room at the
+ // event, but isn't part of the current state of the room now.
+ joined[joinedHost.ServerName] = true
+ }
+
+ var result []gomatrixserverlib.ServerName
+ for serverName, include := range joined {
+ if include {
+ result = append(result, serverName)
+ }
+ }
+ return result, nil
+}
+
+// joinedHostsFromEvents turns a list of state events into a list of joined hosts.
+// This errors if one of the events was invalid.
+// It should be impossible for an invalid event to get this far in the pipeline.
+func joinedHostsFromEvents(evs []gomatrixserverlib.Event) ([]types.JoinedHost, error) {
+ var joinedHosts []types.JoinedHost
+ for _, ev := range evs {
+ if ev.Type() != "m.room.member" || ev.StateKey() == nil {
+ continue
+ }
+ var content struct {
+ Membership string `json:"membership"`
+ }
+ if err := json.Unmarshal(ev.Content(), &content); err != nil {
+ return nil, err
+ }
+ if content.Membership != "join" {
+ continue
+ }
+ serverName, err := domainFromID(*ev.StateKey())
+ if err != nil {
+ return nil, err
+ }
+ joinedHosts = append(joinedHosts, types.JoinedHost{
+ MemberEventID: ev.EventID(), ServerName: serverName,
+ })
+ }
+ return joinedHosts, nil
+}
+
+// combineDeltas combines two deltas into a single delta.
+// Assumes that the order of operations is add(1), remove(1), add(2), remove(2).
+// Removes duplicate entries and redundant operations from each delta.
+func combineDeltas(adds1, removes1, adds2, removes2 []string) (adds, removes []string) {
+ addSet := map[string]bool{}
+ removeSet := map[string]bool{}
+
+ // combine processes each unique value in a list.
+ // If the value is in the removeFrom set then it is removed from that set.
+ // Otherwise it is added to the addTo set.
+ combine := func(values []string, removeFrom, addTo map[string]bool) {
+ processed := map[string]bool{}
+ for _, value := range values {
+ if processed[value] {
+ continue
+ }
+ processed[value] = true
+ if removeFrom[value] {
+ delete(removeFrom, value)
+ } else {
+ addTo[value] = true
+ }
+ }
+ }
+
+ combine(adds1, nil, addSet)
+ combine(removes1, addSet, removeSet)
+ combine(adds2, removeSet, addSet)
+ combine(removes2, addSet, removeSet)
+
+ for value := range addSet {
+ adds = append(adds, value)
+ }
+ for value := range removeSet {
+ removes = append(removes, value)
+ }
+ return
+}
+
+// lookupStateEvents looks up the state events that are added by a new event.
+func (s *OutputRoomEvent) lookupStateEvents(
+ addsStateEventIDs []string, event gomatrixserverlib.Event,
+) ([]gomatrixserverlib.Event, error) {
+ // Fast path if there aren't any new state events.
+ if len(addsStateEventIDs) == 0 {
+ return nil, nil
+ }
+
+ // Fast path if the only state event added is the event itself.
+ if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
+ return []gomatrixserverlib.Event{event}, nil
+ }
+
+ missing := addsStateEventIDs
+ var result []gomatrixserverlib.Event
+
+ // Check if event itself is being added.
+ for _, eventID := range missing {
+ if eventID == event.EventID() {
+ result = append(result, event)
+ break
+ }
+ }
+ missing = missingEventsFrom(result, addsStateEventIDs)
+
+ if len(missing) == 0 {
+ return result, nil
+ }
+
+ // At this point the missing events are neither the event itself nor are
+ // they present in our local database. Our only option is to fetch them
+ // from the roomserver using the query API.
+ eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
+ var eventResp api.QueryEventsByIDResponse
+ if err := s.query.QueryEventsByID(&eventReq, &eventResp); err != nil {
+ return nil, err
+ }
+
+ result = append(result, eventResp.Events...)
+ missing = missingEventsFrom(result, addsStateEventIDs)
+
+ if len(missing) != 0 {
+ return nil, fmt.Errorf(
+ "missing %d state events IDs at event %q", len(missing), event.EventID(),
+ )
+ }
+
+ return result, nil
+}
+
+func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string {
+ have := map[string]bool{}
+ for _, event := range events {
+ have[event.EventID()] = true
+ }
+ var missing []string
+ for _, eventID := range required {
+ if !have[eventID] {
+ missing = append(missing, eventID)
+ }
+ }
+ return missing
+}
+
+// domainFromID returns everything after the first ":" character to extract
+// the domain part of a matrix ID.
+// TODO: duplicated from gomatrixserverlib.
+func domainFromID(id string) (gomatrixserverlib.ServerName, error) {
+ // IDs have the format: SIGIL LOCALPART ":" DOMAIN
+ // Split on the first ":" character since the domain can contain ":"
+ // characters.
+ parts := strings.SplitN(id, ":", 2)
+ if len(parts) != 2 {
+ // The ID must have a ":" character.
+ return "", fmt.Errorf("invalid ID: %q", id)
+ }
+ // Return everything after the first ":" character.
+ return gomatrixserverlib.ServerName(parts[1]), nil
+}
diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver_test.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver_test.go
new file mode 100644
index 00000000..bb659b9c
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver_test.go
@@ -0,0 +1,53 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "testing"
+)
+
+func TestCombineNoOp(t *testing.T) {
+ inputAdd1 := []string{"a", "b", "c"}
+ inputDel1 := []string{"a", "b", "d"}
+ inputAdd2 := []string{"a", "d", "e"}
+ inputDel2 := []string{"a", "c", "e", "e"}
+
+ gotAdd, gotDel := combineDeltas(inputAdd1, inputDel1, inputAdd2, inputDel2)
+
+ if len(gotAdd) != 0 {
+ t.Errorf("wanted combined adds to be an empty list, got %#v", gotAdd)
+ }
+
+ if len(gotDel) != 0 {
+ t.Errorf("wanted combined removes to be an empty list, got %#v", gotDel)
+ }
+}
+
+func TestCombineDedup(t *testing.T) {
+ inputAdd1 := []string{"a", "a"}
+ inputDel1 := []string{"b", "b"}
+ inputAdd2 := []string{"a", "a"}
+ inputDel2 := []string{"b", "b"}
+
+ gotAdd, gotDel := combineDeltas(inputAdd1, inputDel1, inputAdd2, inputDel2)
+
+ if len(gotAdd) != 1 || gotAdd[0] != "a" {
+ t.Errorf("wanted combined adds to be %#v, got %#v", []string{"a"}, gotAdd)
+ }
+
+ if len(gotDel) != 1 || gotDel[0] != "b" {
+ t.Errorf("wanted combined removes to be %#v, got %#v", []string{"b"}, gotDel)
+ }
+}
diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go
new file mode 100644
index 00000000..bb274b08
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go
@@ -0,0 +1,105 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package queue
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ log "github.com/Sirupsen/logrus"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// destinationQueue is a queue of events for a single destination.
+// It is responsible for sending the events to the destination and
+// ensures that only one request is in flight to a given destination
+// at a time.
+type destinationQueue struct {
+ client *gomatrixserverlib.FederationClient
+ origin gomatrixserverlib.ServerName
+ destination gomatrixserverlib.ServerName
+ // The running mutex protects running, sentCounter, lastTransactionIDs and
+ // pendingEvents.
+ runningMutex sync.Mutex
+ running bool
+ sentCounter int
+ lastTransactionIDs []gomatrixserverlib.TransactionID
+ pendingEvents []*gomatrixserverlib.Event
+}
+
+// Send event adds the event to the pending queue for the destination.
+// If the queue is empty then it starts a background goroutine to
+// start sending events to that destination.
+func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) {
+ oq.runningMutex.Lock()
+ defer oq.runningMutex.Unlock()
+ oq.pendingEvents = append(oq.pendingEvents, ev)
+ if !oq.running {
+ oq.running = true
+ go oq.backgroundSend()
+ }
+}
+
+func (oq *destinationQueue) backgroundSend() {
+ for {
+ t := oq.next()
+ if t == nil {
+ // If the queue is empty then stop processing for this destination.
+ // TODO: Remove this destination from the queue map.
+ return
+ }
+
+ // TODO: handle retries.
+ // TODO: blacklist uncooperative servers.
+
+ _, err := oq.client.SendTransaction(*t)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "destination": oq.destination,
+ log.ErrorKey: err,
+ }).Info("problem sending transaction")
+ }
+ }
+}
+
+// next creates a new transaction from the pending event queue
+// and flushes the queue.
+// Returns nil if the queue was empty.
+func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
+ oq.runningMutex.Lock()
+ defer oq.runningMutex.Unlock()
+ if len(oq.pendingEvents) == 0 {
+ oq.running = false
+ return nil
+ }
+ var t gomatrixserverlib.Transaction
+ now := gomatrixserverlib.AsTimestamp(time.Now())
+ t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.sentCounter))
+ t.Origin = oq.origin
+ t.Destination = oq.destination
+ t.OriginServerTS = now
+ t.PreviousIDs = oq.lastTransactionIDs
+ if t.PreviousIDs == nil {
+ t.PreviousIDs = []gomatrixserverlib.TransactionID{}
+ }
+ oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID}
+ for _, pdu := range oq.pendingEvents {
+ t.PDUs = append(t.PDUs, *pdu)
+ }
+ oq.pendingEvents = nil
+ oq.sentCounter += len(t.PDUs)
+ return &t
+}
diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go
new file mode 100644
index 00000000..79f019fd
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go
@@ -0,0 +1,95 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package queue
+
+import (
+ "fmt"
+ "sync"
+
+ log "github.com/Sirupsen/logrus"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// OutgoingQueues is a collection of queues for sending transactions to other
+// matrix servers
+type OutgoingQueues struct {
+ origin gomatrixserverlib.ServerName
+ client *gomatrixserverlib.FederationClient
+ // The queuesMutex protects queues
+ queuesMutex sync.Mutex
+ queues map[gomatrixserverlib.ServerName]*destinationQueue
+}
+
+// NewOutgoingQueues makes a new OutgoingQueues
+func NewOutgoingQueues(origin gomatrixserverlib.ServerName, client *gomatrixserverlib.FederationClient) *OutgoingQueues {
+ return &OutgoingQueues{
+ origin: origin,
+ client: client,
+ queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
+ }
+}
+
+// SendEvent sends an event to the destinations
+func (oqs *OutgoingQueues) SendEvent(
+ ev *gomatrixserverlib.Event, origin gomatrixserverlib.ServerName,
+ destinations []gomatrixserverlib.ServerName,
+) error {
+ if origin != oqs.origin {
+ // TODO: Support virtual hosting by allowing us to send events using
+ // different origin server names.
+ // For now assume we are always asked to send as the single server configured
+ // in the dendrite config.
+ return fmt.Errorf(
+ "sendevent: unexpected server to send as: got %q expected %q",
+ origin, oqs.origin,
+ )
+ }
+
+ // Remove our own server from the list of destinations.
+ destinations = filterDestinations(oqs.origin, destinations)
+
+ log.WithFields(log.Fields{
+ "destinations": destinations, "event": ev.EventID(),
+ }).Info("Sending event")
+
+ oqs.queuesMutex.Lock()
+ defer oqs.queuesMutex.Unlock()
+ for _, destination := range destinations {
+ oq := oqs.queues[destination]
+ if oq == nil {
+ oq = &destinationQueue{
+ origin: oqs.origin,
+ destination: destination,
+ client: oqs.client,
+ }
+ oqs.queues[destination] = oq
+ }
+ oq.sendEvent(ev)
+ }
+ return nil
+}
+
+// filterDestinations removes our own server from the list of destinations.
+// Otherwise we could end up trying to talk to ourselves.
+func filterDestinations(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) []gomatrixserverlib.ServerName {
+ var result []gomatrixserverlib.ServerName
+ for _, destination := range destinations {
+ if destination == origin {
+ continue
+ }
+ result = append(result, destination)
+ }
+ return result
+}
diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go b/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go
new file mode 100644
index 00000000..1b5c65dd
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go
@@ -0,0 +1,111 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "database/sql"
+
+ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const joinedHostsSchema = `
+-- The joined_hosts table stores a list of m.room.member event ids in the
+-- current state for each room where the membership is "join".
+-- There will be an entry for every user that is joined to the room.
+CREATE TABLE IF NOT EXISTS joined_hosts (
+ -- The string ID of the room.
+ room_id TEXT NOT NULL,
+ -- The event ID of the m.room.member join event.
+ event_id TEXT NOT NULL,
+ -- The domain part of the user ID the m.room.member event is for.
+ server_name TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS joined_hosts_event_id_idx
+ ON joined_hosts (event_id);
+
+CREATE INDEX IF NOT EXISTS joined_hosts_room_id_idx
+ ON joined_hosts (room_id)
+`
+
+const insertJoinedHostsSQL = "" +
+ "INSERT INTO joined_hosts (room_id, event_id, server_name)" +
+ " VALUES ($1, $2, $3)"
+
+const deleteJoinedHostsSQL = "" +
+ "DELETE FROM joined_hosts WHERE event_id = ANY($1)"
+
+const selectJoinedHostsSQL = "" +
+ "SELECT event_id, server_name FROM joined_hosts" +
+ " WHERE room_id = $1"
+
+type joinedHostsStatements struct {
+ insertJoinedHostsStmt *sql.Stmt
+ deleteJoinedHostsStmt *sql.Stmt
+ selectJoinedHostsStmt *sql.Stmt
+}
+
+func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(joinedHostsSchema)
+ if err != nil {
+ return
+ }
+ if s.insertJoinedHostsStmt, err = db.Prepare(insertJoinedHostsSQL); err != nil {
+ return
+ }
+ if s.deleteJoinedHostsStmt, err = db.Prepare(deleteJoinedHostsSQL); err != nil {
+ return
+ }
+ if s.selectJoinedHostsStmt, err = db.Prepare(selectJoinedHostsSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *joinedHostsStatements) insertJoinedHosts(
+ txn *sql.Tx, roomID, eventID string, serverName gomatrixserverlib.ServerName,
+) error {
+ _, err := txn.Stmt(s.insertJoinedHostsStmt).Exec(roomID, eventID, serverName)
+ return err
+}
+
+func (s *joinedHostsStatements) deleteJoinedHosts(txn *sql.Tx, eventIDs []string) error {
+ _, err := txn.Stmt(s.deleteJoinedHostsStmt).Exec(pq.StringArray(eventIDs))
+ return err
+}
+
+func (s *joinedHostsStatements) selectJoinedHosts(txn *sql.Tx, roomID string,
+) ([]types.JoinedHost, error) {
+ rows, err := txn.Stmt(s.selectJoinedHostsStmt).Query(roomID)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+
+ var result []types.JoinedHost
+ for rows.Next() {
+ var eventID, serverName string
+ if err = rows.Scan(&eventID, &serverName); err != nil {
+ return nil, err
+ }
+ result = append(result, types.JoinedHost{
+ MemberEventID: eventID,
+ ServerName: gomatrixserverlib.ServerName(serverName),
+ })
+ }
+ return result, nil
+}
diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/room_table.go b/src/github.com/matrix-org/dendrite/federationsender/storage/room_table.go
new file mode 100644
index 00000000..e11ed421
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/federationsender/storage/room_table.go
@@ -0,0 +1,89 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "database/sql"
+)
+
+const roomSchema = `
+CREATE TABLE IF NOT EXISTS rooms (
+ -- The string ID of the room
+ room_id TEXT PRIMARY KEY,
+ -- The most recent event state by the room server.
+ -- We can use this to tell if our view of the room state has become
+ -- desynchronised.
+ last_event_id TEXT NOT NULL
+);`
+
+const insertRoomSQL = "" +
+ "INSERT INTO rooms (room_id, last_event_id) VALUES ($1, '')" +
+ " ON CONFLICT DO NOTHING"
+
+const selectRoomForUpdateSQL = "" +
+ "SELECT last_event_id FROM rooms WHERE room_id = $1 FOR UPDATE"
+
+const updateRoomSQL = "" +
+ "UPDATE rooms SET last_event_id = $2 WHERE room_id = $1"
+
+type roomStatements struct {
+ insertRoomStmt *sql.Stmt
+ selectRoomForUpdateStmt *sql.Stmt
+ updateRoomStmt *sql.Stmt
+}
+
+func (s *roomStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(roomSchema)
+ if err != nil {
+ return
+ }
+
+ if s.insertRoomStmt, err = db.Prepare(insertRoomSQL); err != nil {
+ return
+ }
+ if s.selectRoomForUpdateStmt, err = db.Prepare(selectRoomForUpdateSQL); err != nil {
+ return
+ }
+ if s.updateRoomStmt, err = db.Prepare(updateRoomSQL); err != nil {
+ return
+ }
+ return
+}
+
+// insertRoom inserts the room if it didn't already exist.
+// If the room didn't exist then last_event_id is set to the empty string.
+func (s *roomStatements) insertRoom(txn *sql.Tx, roomID string) error {
+ _, err := txn.Stmt(s.insertRoomStmt).Exec(roomID)
+ return err
+}
+
+// selectRoomForUpdate locks the row for the room and returns the last_event_id.
+// The row must already exist in the table. Callers can ensure that the row
+// exists by calling insertRoom first.
+func (s *roomStatements) selectRoomForUpdate(txn *sql.Tx, roomID string) (string, error) {
+ var lastEventID string
+ err := txn.Stmt(s.selectRoomForUpdateStmt).QueryRow(roomID).Scan(&lastEventID)
+ if err != nil {
+ return "", err
+ }
+ return lastEventID, nil
+}
+
+// updateRoom updates the last_event_id for the room. selectRoomForUpdate should
+// have already been called earlier within the transaction.
+func (s *roomStatements) updateRoom(txn *sql.Tx, roomID, lastEventID string) error {
+ _, err := txn.Stmt(s.updateRoomStmt).Exec(roomID, lastEventID)
+ return err
+}
diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go
new file mode 100644
index 00000000..2f98093e
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go
@@ -0,0 +1,126 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/federationsender/types"
+)
+
+// Database stores information needed by the federation sender
+type Database struct {
+ joinedHostsStatements
+ roomStatements
+ common.PartitionOffsetStatements
+ db *sql.DB
+}
+
+// NewDatabase opens a new database
+func NewDatabase(dataSourceName string) (*Database, error) {
+ var result Database
+ var err error
+ if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
+ return nil, err
+ }
+ if err = result.prepare(); err != nil {
+ return nil, err
+ }
+ return &result, nil
+}
+
+func (d *Database) prepare() error {
+ var err error
+
+ if err = d.joinedHostsStatements.prepare(d.db); err != nil {
+ return err
+ }
+
+ if err = d.roomStatements.prepare(d.db); err != nil {
+ return err
+ }
+
+ if err = d.PartitionOffsetStatements.Prepare(d.db); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// PartitionOffsets implements common.PartitionStorer
+func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
+ return d.SelectPartitionOffsets(topic)
+}
+
+// SetPartitionOffset implements common.PartitionStorer
+func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
+ return d.UpsertPartitionOffset(topic, partition, offset)
+}
+
+// UpdateRoom updates the joined hosts for a room and returns what the joined
+// hosts were before the update.
+func (d *Database) UpdateRoom(
+ roomID, oldEventID, newEventID string,
+ addHosts []types.JoinedHost,
+ removeHosts []string,
+) (joinedHosts []types.JoinedHost, err error) {
+ err = runTransaction(d.db, func(txn *sql.Tx) error {
+ if err = d.insertRoom(txn, roomID); err != nil {
+ return err
+ }
+ lastSentEventID, err := d.selectRoomForUpdate(txn, roomID)
+ if err != nil {
+ return err
+ }
+ if lastSentEventID != oldEventID {
+ return types.EventIDMismatchError{lastSentEventID, oldEventID}
+ }
+ joinedHosts, err = d.selectJoinedHosts(txn, roomID)
+ if err != nil {
+ return err
+ }
+ for _, add := range addHosts {
+ err = d.insertJoinedHosts(txn, roomID, add.MemberEventID, add.ServerName)
+ if err != nil {
+ return err
+ }
+ }
+ if err = d.deleteJoinedHosts(txn, removeHosts); err != nil {
+ return err
+ }
+ return d.updateRoom(txn, roomID, newEventID)
+ })
+ return
+}
+
+func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
+ txn, err := db.Begin()
+ if err != nil {
+ return
+ }
+ defer func() {
+ if r := recover(); r != nil {
+ txn.Rollback()
+ panic(r)
+ } else if err != nil {
+ txn.Rollback()
+ } else {
+ err = txn.Commit()
+ }
+ }()
+ err = fn(txn)
+ return
+}
diff --git a/src/github.com/matrix-org/dendrite/federationsender/types/types.go b/src/github.com/matrix-org/dendrite/federationsender/types/types.go
new file mode 100644
index 00000000..05ba92f7
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/federationsender/types/types.go
@@ -0,0 +1,45 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package types
+
+import (
+ "fmt"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// A JoinedHost is a server that is joined to a matrix room.
+type JoinedHost struct {
+ // The MemberEventID of a m.room.member join event.
+ MemberEventID string
+ // The domain part of the state key of the m.room.member join event
+ ServerName gomatrixserverlib.ServerName
+}
+
+// A EventIDMismatchError indicates that we have got out of sync with the
+// room server.
+type EventIDMismatchError struct {
+ // The event ID we have stored in our local database.
+ DatabaseID string
+ // The event ID received from the room server.
+ RoomServerID string
+}
+
+func (e EventIDMismatchError) Error() string {
+ return fmt.Sprintf(
+ "mismatched last sent event ID: had %q in database got %q from room server",
+ e.DatabaseID, e.RoomServerID,
+ )
+}