-rw-r--r--  .gitignore                                                                28
-rwxr-xr-x  hooks/install.sh                                                           5
-rwxr-xr-x  hooks/pre-commit                                                           9
-rw-r--r--  src/github.com/matrix-org/dendrite/roomserver/api/input.go                35
-rw-r--r--  src/github.com/matrix-org/dendrite/roomserver/input/consumer.go          103
-rw-r--r--  src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go    44
-rw-r--r--  src/github.com/matrix-org/dendrite/roomserver/storage/sql.go              70
-rw-r--r--  src/github.com/matrix-org/dendrite/roomserver/storage/storage.go          37
-rw-r--r--  src/github.com/matrix-org/dendrite/roomserver/types/types.go              10
9 files changed, 341 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..a11dc931
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,28 @@
+.*.swp
+
+# Compiled Object files, Static and Dynamic libs (Shared Objects)
+*.o
+*.a
+*.so
+
+# Folders
+bin
+pkg
+_obj
+_test
+
+# Architecture specific extensions/prefixes
+*.[568vq]
+[568vq].out
+
+*.cgo1.go
+*.cgo2.c
+_cgo_defun.c
+_cgo_gotypes.go
+_cgo_export.*
+
+_testmain.go
+
+*.exe
+*.test
+*.prof
diff --git a/hooks/install.sh b/hooks/install.sh
new file mode 100755
index 00000000..f8aa331f
--- /dev/null
+++ b/hooks/install.sh
@@ -0,0 +1,5 @@
+#! /bin/bash
+
+DOT_GIT="$(dirname "$0")/../.git"
+
+ln -s "../../hooks/pre-commit" "$DOT_GIT/hooks/pre-commit"
\ No newline at end of file
diff --git a/hooks/pre-commit b/hooks/pre-commit
new file mode 100755
index 00000000..d9ffbfba
--- /dev/null
+++ b/hooks/pre-commit
@@ -0,0 +1,9 @@
+#! /bin/bash
+
+set -eu
+
+golint src/...
+go fmt ./src/...
+go tool vet --shadow ./src
+gocyclo -over 12 src/
+gb test
diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/input.go b/src/github.com/matrix-org/dendrite/roomserver/api/input.go
new file mode 100644
index 00000000..366541af
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/roomserver/api/input.go
@@ -0,0 +1,35 @@
+// Package api provides the types that are used to communicate with the roomserver.
+package api
+
+const (
+ // KindOutlier events fall outside the contiguous event graph.
+ // We do not have the state for these events.
+ // These events are state events used to authenticate other events.
+ // They can become part of the contiguous event graph via backfill.
+ KindOutlier = 1
+ // KindJoin events start a new contiguous event graph. The event must be a
+ // m.room.member event joining this server to the room. This must come with
+ // the state at the event. If the event is contiguous with the existing
+ // graph for the room then it is treated as a normal new event.
+ KindJoin = 2
+ // KindNew events extend the contiguous graph going forwards.
+ // They usually don't need state, but may include state if there
+ // was a new event that references an event that we don't have
+ // a copy of.
+ KindNew = 3
+ // KindBackfill events extend the contiguous graph going backwards.
+ // They always have state.
+ KindBackfill = 4
+)
+
+// InputRoomEvent is a matrix room event to add to the room server database.
+type InputRoomEvent struct {
+ // Whether this event is new, backfilled or an outlier.
+ // This controls how the event is processed.
+ Kind int
+ // The event JSON for the event to add.
+ Event []byte
+ // Optional list of state event IDs forming the state before this event.
+ // These state events must have already been persisted.
+ State []string
+}
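
A producer feeds the roomserver by serialising one of these InputRoomEvent structs as JSON and writing it to the room event topic that the consumer below reads from. As a minimal sketch of what that payload construction might look like, assuming a hypothetical producer with placeholder event JSON and state event IDs:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/matrix-org/dendrite/roomserver/api"
)

func main() {
	// A new event whose state is formed from two previously persisted
	// state events. The event JSON and event IDs are placeholders.
	input := api.InputRoomEvent{
		Kind:  api.KindNew,
		Event: []byte(`{"type":"m.room.message","room_id":"!abc:example.com"}`),
		State: []string{"$state-event-1:example.com", "$state-event-2:example.com"},
	}

	// Serialise the struct as JSON; this is the message body that would be
	// written to the room event topic for the consumer to pick up.
	payload, err := json.Marshal(input)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))
}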
diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go
new file mode 100644
index 00000000..6d2783f7
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go
@@ -0,0 +1,103 @@
+// Package input contains the code that consumes room events from the input stream and writes them to the database.
+package input
+
+import (
+ "github.com/matrix-org/dendrite/roomserver/types"
+ sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// A ConsumerDatabase has the storage APIs needed by the consumer.
+type ConsumerDatabase interface {
+ // PartitionOffsets returns the offsets the consumer has reached for each partition.
+ PartitionOffsets(topic string) ([]types.PartitionOffset, error)
+ // SetPartitionOffset records where the consumer has reached for a partition.
+ SetPartitionOffset(topic string, partition int32, offset int64) error
+}
+
+// An ErrorLogger handles the errors encountered by the consumer.
+type ErrorLogger interface {
+ OnError(message *sarama.ConsumerMessage, err error)
+}
+
+// A Consumer consumes a kafkaesque stream of room events.
+// The room events are supplied as api.InputRoomEvent structs serialised as JSON.
+// The events should be valid matrix events.
+// The events needed to authenticate the event should already be stored on the roomserver.
+// The events needed to construct the state at the event should already be stored on the roomserver.
+// If the event is not valid then it will be discarded and an error will be logged.
+type Consumer struct {
+ // A kafkaesque stream consumer providing the APIs for talking to the event source.
+ // The interface is taken from a client library for Apache Kafka.
+ // But any equivalent event streaming protocol could be made to implement the same interface.
+ Consumer sarama.Consumer
+ // The database used to store the room events.
+ DB ConsumerDatabase
+ // The kafkaesque topic to consume room events from.
+ // This is the name used in kafka to identify the stream to consume events from.
+ RoomEventTopic string
+ // The ErrorLogger for this consumer.
+ // If left as nil then the consumer will panic when it encounters an error.
+ ErrorLogger ErrorLogger
+}
+
+// Start starts the consumer consuming.
+// Starts up a goroutine for each partition in the kafka stream.
+// Returns nil once all the goroutines are started.
+// Returns an error if it can't start consuming for any of the partitions.
+func (c *Consumer) Start() error {
+ offsets := map[int32]int64{}
+
+ partitions, err := c.Consumer.Partitions(c.RoomEventTopic)
+ if err != nil {
+ return err
+ }
+ for _, partition := range partitions {
+ // Default all the offsets to the beginning of the stream.
+ offsets[partition] = sarama.OffsetOldest
+ }
+
+ storedOffsets, err := c.DB.PartitionOffsets(c.RoomEventTopic)
+ if err != nil {
+ return err
+ }
+ for _, offset := range storedOffsets {
+ // We've already processed events from this partition so advance the offset to where we got to.
+ offsets[offset.Partition] = offset.Offset
+ }
+
+ var partitionConsumers []sarama.PartitionConsumer
+ for partition, offset := range offsets {
+ pc, err := c.Consumer.ConsumePartition(c.RoomEventTopic, partition, offset)
+ if err != nil {
+ for _, p := range partitionConsumers {
+ p.Close()
+ }
+ return err
+ }
+ partitionConsumers = append(partitionConsumers, pc)
+ }
+ for _, pc := range partitionConsumers {
+ go c.consumePartition(pc)
+ }
+
+ return nil
+}
+
+// consumePartition consumes the room events for a single partition of the kafkaesque stream.
+func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
+ defer pc.Close()
+ for message := range pc.Messages() {
+ // TODO: Do stuff with message.
+ if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil {
+ c.logError(message, err)
+ }
+ }
+}
+
+// logError is a convenience method for logging errors.
+func (c *Consumer) logError(message *sarama.ConsumerMessage, err error) {
+ if c.ErrorLogger == nil {
+ panic(err)
+ }
+ c.ErrorLogger.OnError(message, err)
+}
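
The ErrorLogger interface lets the caller decide what happens to messages that fail processing; with no logger the consumer panics. A minimal sketch of an implementation that records the failing message's coordinates on stderr and carries on consuming (the type name is hypothetical, not part of the commit):

package main

import (
	"log"

	sarama "gopkg.in/Shopify/sarama.v1"
)

// stderrLogger is a hypothetical ErrorLogger that logs the topic, partition,
// offset and error for the message that could not be processed.
type stderrLogger struct{}

func (stderrLogger) OnError(message *sarama.ConsumerMessage, err error) {
	log.Printf("roomserver: error processing %s/%d offset %d: %v",
		message.Topic, message.Partition, message.Offset, err)
}

It would be wired in when constructing the Consumer, e.g. ErrorLogger: stderrLogger{}, alongside the Consumer, DB and RoomEventTopic fields.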
diff --git a/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go
new file mode 100644
index 00000000..0205ff00
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go
@@ -0,0 +1,44 @@
+package main
+
+import (
+ "fmt"
+ "github.com/matrix-org/dendrite/roomserver/input"
+ "github.com/matrix-org/dendrite/roomserver/storage"
+ sarama "gopkg.in/Shopify/sarama.v1"
+ "os"
+ "strings"
+)
+
+var (
+ database = os.Getenv("DATABASE")
+ kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",")
+ roomEventTopic = os.Getenv("TOPIC_ROOM_EVENT")
+)
+
+func main() {
+ db, err := storage.Open(database)
+ if err != nil {
+ panic(err)
+ }
+
+ kafkaConsumer, err := sarama.NewConsumer(kafkaURIs, nil)
+ if err != nil {
+ panic(err)
+ }
+
+ consumer := input.Consumer{
+ Consumer: kafkaConsumer,
+ DB: db,
+ RoomEventTopic: roomEventTopic,
+ }
+
+ if err = consumer.Start(); err != nil {
+ panic(err)
+ }
+
+ fmt.Println("Started roomserver")
+
+ // Wait forever.
+ // TODO: Implement clean shutdown.
+ select {}
+}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go
new file mode 100644
index 00000000..3e988f21
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go
@@ -0,0 +1,70 @@
+package storage
+
+import (
+ "database/sql"
+ "github.com/matrix-org/dendrite/roomserver/types"
+)
+
+type statements struct {
+ selectPartitionOffsetsStmt *sql.Stmt
+ upsertPartitionOffsetStmt *sql.Stmt
+}
+
+func (s *statements) prepare(db *sql.DB) error {
+ var err error
+
+ _, err = db.Exec(partitionOffsetsSchema)
+ if err != nil {
+ return err
+ }
+
+ if s.selectPartitionOffsetsStmt, err = db.Prepare(selectPartitionOffsetsSQL); err != nil {
+ return err
+ }
+ if s.upsertPartitionOffsetStmt, err = db.Prepare(upsertPartitionOffsetsSQL); err != nil {
+ return err
+ }
+ return nil
+}
+
+const partitionOffsetsSchema = `
+-- The offsets that the server has processed up to.
+CREATE TABLE IF NOT EXISTS partition_offsets (
+ -- The name of the topic.
+ topic TEXT NOT NULL,
+ -- The 32-bit partition ID.
+ partition INTEGER NOT NULL,
+ -- The 64-bit offset.
+ partition_offset BIGINT NOT NULL,
+ CONSTRAINT topic_partition_unique UNIQUE (topic, partition)
+);
+`
+
+const selectPartitionOffsetsSQL = "" +
+ "SELECT partition, partition_offset FROM partition_offsets WHERE topic = $1"
+
+const upsertPartitionOffsetsSQL = "" +
+ "INSERT INTO partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" +
+ " ON CONFLICT ON CONSTRAINT topic_partition_unique" +
+ " DO UPDATE SET partition_offset = $3"
+
+func (s *statements) selectPartitionOffsets(topic string) ([]types.PartitionOffset, error) {
+ rows, err := s.selectPartitionOffsetsStmt.Query(topic)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ var results []types.PartitionOffset
+ for rows.Next() {
+ var offset types.PartitionOffset
+ if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil {
+ return nil, err
+ }
+ results = append(results, offset)
+ }
+ return results, nil
+}
+
+func (s *statements) upsertPartitionOffset(topic string, partition int32, offset int64) error {
+ _, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset)
+ return err
+}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go
new file mode 100644
index 00000000..2b162a81
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go
@@ -0,0 +1,37 @@
+package storage
+
+import (
+ "database/sql"
+ // Import the postgres database driver.
+ _ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/roomserver/types"
+)
+
+// A Database is used to store room events and stream offsets.
+type Database struct {
+ statements statements
+ db *sql.DB
+}
+
+// Open a postgres database.
+func Open(dataSourceName string) (*Database, error) {
+ var d Database
+ var err error
+ if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
+ return nil, err
+ }
+ if err = d.statements.prepare(d.db); err != nil {
+ return nil, err
+ }
+ return &d, nil
+}
+
+// PartitionOffsets implements input.ConsumerDatabase
+func (d *Database) PartitionOffsets(topic string) ([]types.PartitionOffset, error) {
+ return d.statements.selectPartitionOffsets(topic)
+}
+
+// SetPartitionOffset implements input.ConsumerDatabase
+func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
+ return d.statements.upsertPartitionOffset(topic, partition, offset)
+}
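
The storage layer can be exercised on its own. A rough usage sketch, assuming a reachable postgres database; the connection string and topic name are placeholders, not values from the commit:

package main

import (
	"fmt"

	"github.com/matrix-org/dendrite/roomserver/storage"
)

func main() {
	db, err := storage.Open("postgres://dendrite:secret@localhost/roomserver?sslmode=disable")
	if err != nil {
		panic(err)
	}

	// Record that partition 0 of the topic has been processed up to offset 41.
	if err = db.SetPartitionOffset("roomserverInput", 0, 41); err != nil {
		panic(err)
	}

	// Read the offsets back; on restart the consumer resumes from these.
	offsets, err := db.PartitionOffsets("roomserverInput")
	if err != nil {
		panic(err)
	}
	for _, o := range offsets {
		fmt.Printf("partition %d is at offset %d\n", o.Partition, o.Offset)
	}
}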
diff --git a/src/github.com/matrix-org/dendrite/roomserver/types/types.go b/src/github.com/matrix-org/dendrite/roomserver/types/types.go
new file mode 100644
index 00000000..3c6dd5bb
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/roomserver/types/types.go
@@ -0,0 +1,10 @@
+// Package types provides the types that are used internally within the roomserver.
+package types
+
+// A PartitionOffset is the offset into a partition of the input log.
+type PartitionOffset struct {
+ // The ID of the partition.
+ Partition int32
+ // The offset into the partition.
+ Offset int64
+}