aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBrendan Abolivier <contact@brendanabolivier.com>2017-08-02 16:21:35 +0100
committerMark Haines <mjark@negativecurvature.net>2017-08-02 16:21:35 +0100
commit0fbb8b782462238e29f95972b377e6e6625f0ad1 (patch)
tree86e0b95dc9364122a7e0f70b0e9bff81c887e9b1 /src
parent7d17df6f51b73283cb5073e6bd1ca4441594e14c (diff)
Make account data sync incremental (#170)
* Clean roomserver consumer * Make account data sync incremental * Use a different name for the sync AD table * Improved error logging * Created missing topic in tests * Add client API topic to tests * Add client API topic to common * Move data batch retrieval * Add database index for data retrieval * Fix typo in table name * Fix indentation
Diffstat (limited to 'src')
-rw-r--r--src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/account_data_table.go39
-rw-r--r--src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go8
-rw-r--r--src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go65
-rw-r--r--src/github.com/matrix-org/dendrite/clientapi/readers/account_data.go7
-rw-r--r--src/github.com/matrix-org/dendrite/clientapi/routing/routing.go5
-rw-r--r--src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go8
-rw-r--r--src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go13
-rw-r--r--src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go6
-rw-r--r--src/github.com/matrix-org/dendrite/common/config/config.go3
-rw-r--r--src/github.com/matrix-org/dendrite/common/config/config_test.go1
-rw-r--r--src/github.com/matrix-org/dendrite/common/test/config.go1
-rw-r--r--src/github.com/matrix-org/dendrite/common/types.go22
-rw-r--r--src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go91
-rw-r--r--src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go21
-rw-r--r--src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go113
-rw-r--r--src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go42
-rw-r--r--src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go53
-rw-r--r--src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go10
-rw-r--r--src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go58
19 files changed, 500 insertions, 66 deletions
diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/account_data_table.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/account_data_table.go
index 63e84a66..0c1fc0ff 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/account_data_table.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/account_data_table.go
@@ -44,12 +44,16 @@ const insertAccountDataSQL = `
const selectAccountDataSQL = "" +
"SELECT room_id, type, content FROM account_data WHERE localpart = $1"
+const selectAccountDataByTypeSQL = "" +
+ "SELECT content FROM account_data WHERE localpart = $1 AND room_id = $2 AND type = $3"
+
const deleteAccountDataSQL = "" +
"DELETE FROM account_data WHERE localpart = $1 AND room_id = $2 AND type = $3"
type accountDataStatements struct {
- insertAccountDataStmt *sql.Stmt
- selectAccountDataStmt *sql.Stmt
+ insertAccountDataStmt *sql.Stmt
+ selectAccountDataStmt *sql.Stmt
+ selectAccountDataByTypeStmt *sql.Stmt
}
func (s *accountDataStatements) prepare(db *sql.DB) (err error) {
@@ -63,6 +67,9 @@ func (s *accountDataStatements) prepare(db *sql.DB) (err error) {
if s.selectAccountDataStmt, err = db.Prepare(selectAccountDataSQL); err != nil {
return
}
+ if s.selectAccountDataByTypeStmt, err = db.Prepare(selectAccountDataByTypeSQL); err != nil {
+ return
+ }
return
}
@@ -107,3 +114,31 @@ func (s *accountDataStatements) selectAccountData(localpart string) (
return
}
+
+func (s *accountDataStatements) selectAccountDataByType(
+ localpart string, roomID string, dataType string,
+) (data []gomatrixserverlib.ClientEvent, err error) {
+ data = []gomatrixserverlib.ClientEvent{}
+
+ rows, err := s.selectAccountDataByTypeStmt.Query(localpart, roomID, dataType)
+ if err != nil {
+ return
+ }
+
+ for rows.Next() {
+ var content []byte
+
+ if err = rows.Scan(&content); err != nil {
+ return
+ }
+
+ ac := gomatrixserverlib.ClientEvent{
+ Type: dataType,
+ Content: content,
+ }
+
+ data = append(data, ac)
+ }
+
+ return
+}
diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go
index a7b2c786..ca9deac0 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go
@@ -224,6 +224,14 @@ func (d *Database) GetAccountData(localpart string) (
return d.accountDatas.selectAccountData(localpart)
}
+// GetAccountDataByType returns account data matching a given
+// localpart, room ID and type.
+// If no account data could be found, returns an empty array
+// Returns an error if there was an issue with the retrieval
+func (d *Database) GetAccountDataByType(localpart string, roomID string, dataType string) (data []gomatrixserverlib.ClientEvent, err error) {
+ return d.accountDatas.selectAccountDataByType(localpart, roomID, dataType)
+}
+
func hashPassword(plaintext string) (hash string, err error) {
hashBytes, err := bcrypt.GenerateFromPassword([]byte(plaintext), bcrypt.DefaultCost)
return string(hashBytes), err
diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go b/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go
new file mode 100644
index 00000000..2597089e
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go
@@ -0,0 +1,65 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/common"
+
+ sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// SyncAPIProducer produces events for the sync API server to consume
+type SyncAPIProducer struct {
+ Topic string
+ Producer sarama.SyncProducer
+}
+
+// NewSyncAPIProducer creates a new SyncAPIProducer
+func NewSyncAPIProducer(kafkaURIs []string, topic string) (*SyncAPIProducer, error) {
+ producer, err := sarama.NewSyncProducer(kafkaURIs, nil)
+ if err != nil {
+ return nil, err
+ }
+ return &SyncAPIProducer{
+ Topic: topic,
+ Producer: producer,
+ }, nil
+}
+
+// SendData sends account data to the sync API server
+func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error {
+ var m sarama.ProducerMessage
+
+ data := common.AccountData{
+ RoomID: roomID,
+ Type: dataType,
+ }
+ value, err := json.Marshal(data)
+ if err != nil {
+ return err
+ }
+
+ m.Topic = string(p.Topic)
+ m.Key = sarama.StringEncoder(userID)
+ m.Value = sarama.ByteEncoder(value)
+
+ if _, _, err := p.Producer.SendMessage(&m); err != nil {
+ return err
+ }
+
+ return nil
+}
diff --git a/src/github.com/matrix-org/dendrite/clientapi/readers/account_data.go b/src/github.com/matrix-org/dendrite/clientapi/readers/account_data.go
index ca2c2232..d3bb932d 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/readers/account_data.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/readers/account_data.go
@@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
+ "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@@ -30,7 +31,7 @@ import (
// SaveAccountData implements PUT /user/{userId}/[rooms/{roomId}/]account_data/{type}
func SaveAccountData(
req *http.Request, accountDB *accounts.Database, device *authtypes.Device,
- userID string, roomID string, dataType string,
+ userID string, roomID string, dataType string, syncProducer *producers.SyncAPIProducer,
) util.JSONResponse {
if req.Method != "PUT" {
return util.JSONResponse{
@@ -62,6 +63,10 @@ func SaveAccountData(
return httputil.LogThenError(req, err)
}
+ if err := syncProducer.SendData(userID, roomID, dataType); err != nil {
+ return httputil.LogThenError(req, err)
+ }
+
return util.JSONResponse{
Code: 200,
JSON: struct{}{},
diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go
index 5a2935e8..aeb23164 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go
@@ -48,6 +48,7 @@ func Setup(
federation *gomatrixserverlib.FederationClient,
keyRing gomatrixserverlib.KeyRing,
userUpdateProducer *producers.UserUpdateProducer,
+ syncProducer *producers.SyncAPIProducer,
) {
apiMux := mux.NewRouter()
@@ -291,14 +292,14 @@ func Setup(
r0mux.Handle("/user/{userID}/account_data/{type}",
common.MakeAuthAPI("user_account_data", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
- return readers.SaveAccountData(req, accountDB, device, vars["userID"], "", vars["type"])
+ return readers.SaveAccountData(req, accountDB, device, vars["userID"], "", vars["type"], syncProducer)
}),
)
r0mux.Handle("/user/{userID}/rooms/{roomID}/account_data/{type}",
common.MakeAuthAPI("user_account_data", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
- return readers.SaveAccountData(req, accountDB, device, vars["userID"], vars["roomID"], vars["type"])
+ return readers.SaveAccountData(req, accountDB, device, vars["userID"], vars["roomID"], vars["type"], syncProducer)
}),
)
diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go
index 0f8c035f..6f568b1a 100644
--- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go
+++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go
@@ -61,6 +61,12 @@ func main() {
if err != nil {
log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
}
+ syncProducer, err := producers.NewSyncAPIProducer(
+ cfg.Kafka.Addresses, string(cfg.Kafka.Topics.OutputClientData),
+ )
+ if err != nil {
+ log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
+ }
federation := gomatrixserverlib.NewFederationClient(
cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey,
@@ -99,7 +105,7 @@ func main() {
routing.Setup(
http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer,
queryAPI, aliasAPI, accountDB, deviceDB, federation, keyRing,
- userUpdateProducer,
+ userUpdateProducer, syncProducer,
)
log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil))
}
diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go
index c7870684..77ada412 100644
--- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go
+++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go
@@ -73,12 +73,19 @@ func main() {
if err = n.Load(db); err != nil {
log.Panicf("startup: failed to set up notifier: %s", err)
}
- consumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
+ roomConsumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
if err != nil {
log.Panicf("startup: failed to create room server consumer: %s", err)
}
- if err = consumer.Start(); err != nil {
- log.Panicf("startup: failed to start room server consumer")
+ if err = roomConsumer.Start(); err != nil {
+ log.Panicf("startup: failed to start room server consumer: %s", err)
+ }
+ clientConsumer, err := consumers.NewOutputClientData(cfg, n, db)
+ if err != nil {
+ log.Panicf("startup: failed to create client API server consumer: %s", err)
+ }
+ if err = clientConsumer.Start(); err != nil {
+ log.Panicf("startup: failed to start client API server consumer: %s", err)
}
log.Info("Starting sync server on ", cfg.Listen.SyncAPI)
diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go
index b0e36c42..e39a8980 100644
--- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go
+++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go
@@ -54,6 +54,7 @@ var (
)
const inputTopic = "syncserverInput"
+const clientTopic = "clientapiserverOutput"
var exe = test.KafkaExecutor{
ZookeeperURI: zookeeperURI,
@@ -134,6 +135,7 @@ func startSyncServer() (*exec.Cmd, chan error) {
cfg.Matrix.ServerName = "localhost"
cfg.Listen.SyncAPI = config.Address(syncserverAddr)
cfg.Kafka.Topics.OutputRoomEvent = config.Topic(inputTopic)
+ cfg.Kafka.Topics.OutputClientData = config.Topic(clientTopic)
if err := test.WriteConfig(cfg, dir); err != nil {
panic(err)
@@ -177,6 +179,10 @@ func prepareKafka() {
if err := exe.CreateTopic(inputTopic); err != nil {
panic(err)
}
+ exe.DeleteTopic(clientTopic)
+ if err := exe.CreateTopic(clientTopic); err != nil {
+ panic(err)
+ }
}
func testSyncServer(syncServerCmdChan chan error, userID, since, want string) {
diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go
index 4b362b5f..324561f6 100644
--- a/src/github.com/matrix-org/dendrite/common/config/config.go
+++ b/src/github.com/matrix-org/dendrite/common/config/config.go
@@ -98,6 +98,8 @@ type Dendrite struct {
Topics struct {
// Topic for roomserver/api.OutputRoomEvent events.
OutputRoomEvent Topic `yaml:"output_room_event"`
+ // Topic for sending account data from client API to sync API
+ OutputClientData Topic `yaml:"output_client_data"`
// Topic for user updates (profile, presence)
UserUpdates Topic `yaml:"user_updates"`
}
@@ -298,6 +300,7 @@ func (config *Dendrite) check() error {
checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses)))
checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
+ checkNotEmpty("kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData))
checkNotEmpty("database.account", string(config.Database.Account))
checkNotEmpty("database.device", string(config.Database.Device))
checkNotEmpty("database.server_key", string(config.Database.ServerKey))
diff --git a/src/github.com/matrix-org/dendrite/common/config/config_test.go b/src/github.com/matrix-org/dendrite/common/config/config_test.go
index 7af61968..4275e3d4 100644
--- a/src/github.com/matrix-org/dendrite/common/config/config_test.go
+++ b/src/github.com/matrix-org/dendrite/common/config/config_test.go
@@ -44,6 +44,7 @@ kafka:
topics:
input_room_event: input.room
output_room_event: output.room
+ output_client_data: output.client
database:
media_api: "postgresql:///media_api"
account: "postgresql:///account"
diff --git a/src/github.com/matrix-org/dendrite/common/test/config.go b/src/github.com/matrix-org/dendrite/common/test/config.go
index e429d06b..a28a08d5 100644
--- a/src/github.com/matrix-org/dendrite/common/test/config.go
+++ b/src/github.com/matrix-org/dendrite/common/test/config.go
@@ -82,6 +82,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
// TODO: Different servers should be using different topics.
// Make this configurable somehow?
cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
+ cfg.Kafka.Topics.OutputClientData = "test.clientapi.output"
// TODO: Use different databases for the different schemas.
// Using the same database for every schema currently works because
diff --git a/src/github.com/matrix-org/dendrite/common/types.go b/src/github.com/matrix-org/dendrite/common/types.go
new file mode 100644
index 00000000..471a2f30
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/common/types.go
@@ -0,0 +1,22 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package common
+
+// AccountData represents account data sent from the client API server to the
+// sync API server
+type AccountData struct {
+ RoomID string `json:"room_id"`
+ Type string `json:"type"`
+}
diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go
new file mode 100644
index 00000000..a2a240ff
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go
@@ -0,0 +1,91 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "encoding/json"
+
+ log "github.com/Sirupsen/logrus"
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/common/config"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/sync"
+ sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// OutputClientData consumes events that originated in the client API server.
+type OutputClientData struct {
+ clientAPIConsumer *common.ContinualConsumer
+ db *storage.SyncServerDatabase
+ notifier *sync.Notifier
+}
+
+// NewOutputClientData creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
+func NewOutputClientData(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputClientData, error) {
+ kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ consumer := common.ContinualConsumer{
+ Topic: string(cfg.Kafka.Topics.OutputClientData),
+ Consumer: kafkaConsumer,
+ PartitionStore: store,
+ }
+ s := &OutputClientData{
+ clientAPIConsumer: &consumer,
+ db: store,
+ notifier: n,
+ }
+ consumer.ProcessMessage = s.onMessage
+
+ return s, nil
+}
+
+// Start consuming from room servers
+func (s *OutputClientData) Start() error {
+ return s.clientAPIConsumer.Start()
+}
+
+// onMessage is called when the sync server receives a new event from the client API server output log.
+// It is not safe for this function to be called from multiple goroutines, or else the
+// sync stream position may race and be incorrectly calculated.
+func (s *OutputClientData) onMessage(msg *sarama.ConsumerMessage) error {
+ // Parse out the event JSON
+ var output common.AccountData
+ if err := json.Unmarshal(msg.Value, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("client API server output log: message parse failure")
+ return nil
+ }
+
+ log.WithFields(log.Fields{
+ "type": output.Type,
+ "room_id": output.RoomID,
+ }).Info("received data from client API server")
+
+ syncStreamPos, err := s.db.UpsertAccountData(string(msg.Key), output.RoomID, output.Type)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "type": output.Type,
+ "room_id": output.RoomID,
+ log.ErrorKey: err,
+ }).Panicf("could not save account data")
+ }
+
+ s.notifier.OnNewEvent(nil, string(msg.Key), syncStreamPos)
+
+ return nil
+}
diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go
index 13159c87..c846705f 100644
--- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go
+++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go
@@ -35,12 +35,9 @@ type OutputRoomEvent struct {
db *storage.SyncServerDatabase
notifier *sync.Notifier
query api.RoomserverQueryAPI
- serverName gomatrixserverlib.ServerName
- keyID gomatrixserverlib.KeyID
- privateKey []byte
}
-type prevMembership struct {
+type prevEventRef struct {
PrevContent json.RawMessage `json:"prev_content"`
PrevID string `json:"replaces_state"`
UserID string `json:"prev_sender"`
@@ -64,9 +61,6 @@ func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.S
db: store,
notifier: n,
query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
- serverName: cfg.Matrix.ServerName,
- keyID: cfg.Matrix.KeyID,
- privateKey: cfg.Matrix.PrivateKey,
}
consumer.ProcessMessage = s.onMessage
@@ -113,13 +107,13 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
}).Panicf("roomserver output log: state event lookup failure")
}
- ev, err = s.updateStateEvent(ev, s.keyID, s.privateKey)
+ ev, err = s.updateStateEvent(ev)
if err != nil {
return err
}
for i := range addsStateEvents {
- addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i], s.keyID, s.privateKey)
+ addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i])
if err != nil {
return err
}
@@ -139,7 +133,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
}).Panicf("roomserver output log: write event failure")
return nil
}
- s.notifier.OnNewEvent(&ev, types.StreamPosition(syncStreamPos))
+ s.notifier.OnNewEvent(&ev, "", types.StreamPosition(syncStreamPos))
return nil
}
@@ -201,10 +195,7 @@ func (s *OutputRoomEvent) lookupStateEvents(
return result, nil
}
-func (s *OutputRoomEvent) updateStateEvent(
- event gomatrixserverlib.Event, keyID gomatrixserverlib.KeyID,
- privateKey []byte,
-) (gomatrixserverlib.Event, error) {
+func (s *OutputRoomEvent) updateStateEvent(event gomatrixserverlib.Event) (gomatrixserverlib.Event, error) {
var stateKey string
if event.StateKey() == nil {
stateKey = ""
@@ -221,7 +212,7 @@ func (s *OutputRoomEvent) updateStateEvent(
return event, nil
}
- prev := prevMembership{
+ prev := prevEventRef{
PrevContent: prevEvent.Content(),
PrevID: prevEvent.EventID(),
UserID: prevEvent.Sender(),
diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go
new file mode 100644
index 00000000..f95e4858
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go
@@ -0,0 +1,113 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/syncapi/types"
+)
+
+const accountDataSchema = `
+-- Stores the users account data
+CREATE TABLE IF NOT EXISTS account_data_type (
+ -- The highest numeric ID from the output_room_events at the time of saving the data
+ id BIGINT,
+ -- ID of the user the data belongs to
+ user_id TEXT NOT NULL,
+ -- ID of the room the data is related to (empty string if not related to a specific room)
+ room_id TEXT NOT NULL,
+ -- Type of the data
+ type TEXT NOT NULL,
+
+ PRIMARY KEY(user_id, room_id, type),
+
+ -- We don't want two entries of the same type for the same user
+ CONSTRAINT account_data_unique UNIQUE (user_id, room_id, type)
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS account_data_id_idx ON account_data_type(id);
+`
+
+const insertAccountDataSQL = "" +
+ "INSERT INTO account_data_type (id, user_id, room_id, type) VALUES ($1, $2, $3, $4)" +
+ " ON CONFLICT ON CONSTRAINT account_data_unique" +
+ " DO UPDATE SET id = EXCLUDED.id"
+
+const selectAccountDataInRangeSQL = "" +
+ "SELECT room_id, type FROM account_data_type" +
+ " WHERE user_id = $1 AND id > $2 AND id <= $3" +
+ " ORDER BY id ASC"
+
+type accountDataStatements struct {
+ insertAccountDataStmt *sql.Stmt
+ selectAccountDataInRangeStmt *sql.Stmt
+}
+
+func (s *accountDataStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(accountDataSchema)
+ if err != nil {
+ return
+ }
+ if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil {
+ return
+ }
+ if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *accountDataStatements) insertAccountData(
+ pos types.StreamPosition, userID string, roomID string, dataType string,
+) (err error) {
+ _, err = s.insertAccountDataStmt.Exec(pos, userID, roomID, dataType)
+ return
+}
+
+func (s *accountDataStatements) selectAccountDataInRange(
+ userID string, oldPos types.StreamPosition, newPos types.StreamPosition,
+) (data map[string][]string, err error) {
+ data = make(map[string][]string)
+
+ // If both positions are the same, it means that the data was saved after the
+ // latest room event. In that case, we need to decrement the old position as
+ // it would prevent the SQL request from returning anything.
+ if oldPos == newPos {
+ oldPos--
+ }
+
+ rows, err := s.selectAccountDataInRangeStmt.Query(userID, oldPos, newPos)
+ if err != nil {
+ return
+ }
+
+ for rows.Next() {
+ var dataType string
+ var roomID string
+
+ if err = rows.Scan(&roomID, &dataType); err != nil {
+ return
+ }
+
+ if len(data[roomID]) > 0 {
+ data[roomID] = append(data[roomID], dataType)
+ } else {
+ data[roomID] = []string{dataType}
+ }
+ }
+
+ return
+}
diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go
index 46231c77..2433b68c 100644
--- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go
+++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go
@@ -41,10 +41,11 @@ type streamEvent struct {
// SyncServerDatabase represents a sync server database
type SyncServerDatabase struct {
- db *sql.DB
- partitions common.PartitionOffsetStatements
- events outputRoomEventsStatements
- roomstate currentRoomStateStatements
+ db *sql.DB
+ partitions common.PartitionOffsetStatements
+ accountData accountDataStatements
+ events outputRoomEventsStatements
+ roomstate currentRoomStateStatements
}
// NewSyncServerDatabase creates a new sync server database
@@ -58,6 +59,10 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
if err = partitions.Prepare(db); err != nil {
return nil, err
}
+ accountData := accountDataStatements{}
+ if err = accountData.prepare(db); err != nil {
+ return nil, err
+ }
events := outputRoomEventsStatements{}
if err = events.prepare(db); err != nil {
return nil, err
@@ -66,7 +71,7 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
if err := state.prepare(db); err != nil {
return nil, err
}
- return &SyncServerDatabase{db, partitions, events, state}, nil
+ return &SyncServerDatabase{db, partitions, accountData, events, state}, nil
}
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
@@ -274,6 +279,33 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom
return
}
+// GetAccountDataInRange returns all account data for a given user inserted or
+// updated between two given positions
+// Returns a map following the format data[roomID] = []dataTypes
+// If no data is retrieved, returns an empty map
+// If there was an issue with the retrieval, returns an error
+func (d *SyncServerDatabase) GetAccountDataInRange(
+ userID string, oldPos types.StreamPosition, newPos types.StreamPosition,
+) (map[string][]string, error) {
+ return d.accountData.selectAccountDataInRange(userID, oldPos, newPos)
+}
+
+// UpsertAccountData keeps track of new or updated account data, by saving the type
+// of the new/updated data, and the user ID and room ID the data is related to (empty)
+// room ID means the data isn't specific to any room)
+// If no data with the given type, user ID and room ID exists in the database,
+// creates a new row, else update the existing one
+// Returns an error if there was an issue with the upsert
+func (d *SyncServerDatabase) UpsertAccountData(userID string, roomID string, dataType string) (types.StreamPosition, error) {
+ pos, err := d.SyncStreamPosition()
+ if err != nil {
+ return pos, err
+ }
+
+ err = d.accountData.insertAccountData(pos, userID, roomID, dataType)
+ return pos, err
+}
+
func (d *SyncServerDatabase) addInvitesToResponse(txn *sql.Tx, userID string, res *types.Response) error {
// Add invites - TODO: This will break over federation as they won't be in the current state table according to Mark.
roomIDs, err := d.roomstate.selectRoomIDsWithMembership(txn, userID, "invite")
diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go
index 814660a7..c2fdd8f0 100644
--- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go
+++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go
@@ -54,39 +54,44 @@ func NewNotifier(pos types.StreamPosition) *Notifier {
// OnNewEvent is called when a new event is received from the room server. Must only be
// called from a single goroutine, to avoid races between updates which could set the
// current position in the stream incorrectly.
-func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) {
+// Can be called either with a *gomatrixserverlib.Event, or with an user ID
+func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos types.StreamPosition) {
// update the current position then notify relevant /sync streams.
// This needs to be done PRIOR to waking up users as they will read this value.
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos = pos
- // Map this event's room_id to a list of joined users, and wake them up.
- userIDs := n.joinedUsers(ev.RoomID())
- // If this is an invite, also add in the invitee to this list.
- if ev.Type() == "m.room.member" && ev.StateKey() != nil {
- userID := *ev.StateKey()
- membership, err := ev.Membership()
- if err != nil {
- log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
- "Notifier.OnNewEvent: Failed to unmarshal member event",
- )
- } else {
- // Keep the joined user map up-to-date
- switch membership {
- case "invite":
- userIDs = append(userIDs, userID)
- case "join":
- n.addJoinedUser(ev.RoomID(), userID)
- case "leave":
- fallthrough
- case "ban":
- n.removeJoinedUser(ev.RoomID(), userID)
+ if ev != nil {
+ // Map this event's room_id to a list of joined users, and wake them up.
+ userIDs := n.joinedUsers(ev.RoomID())
+ // If this is an invite, also add in the invitee to this list.
+ if ev.Type() == "m.room.member" && ev.StateKey() != nil {
+ userID := *ev.StateKey()
+ membership, err := ev.Membership()
+ if err != nil {
+ log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
+ "Notifier.OnNewEvent: Failed to unmarshal member event",
+ )
+ } else {
+ // Keep the joined user map up-to-date
+ switch membership {
+ case "invite":
+ userIDs = append(userIDs, userID)
+ case "join":
+ n.addJoinedUser(ev.RoomID(), userID)
+ case "leave":
+ fallthrough
+ case "ban":
+ n.removeJoinedUser(ev.RoomID(), userID)
+ }
}
}
- }
- for _, userID := range userIDs {
+ for _, userID := range userIDs {
+ n.wakeupUser(userID, pos)
+ }
+ } else if len(userID) > 0 {
n.wakeupUser(userID, pos)
}
}
diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go
index 03e39da0..358243bc 100644
--- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go
+++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go
@@ -123,7 +123,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 1)
- n.OnNewEvent(&randomMessageEvent, streamPositionAfter)
+ n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
wg.Wait()
}
@@ -151,7 +151,7 @@ func TestNewInviteEventForUser(t *testing.T) {
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 1)
- n.OnNewEvent(&aliceInviteBobEvent, streamPositionAfter)
+ n.OnNewEvent(&aliceInviteBobEvent, "", streamPositionAfter)
wg.Wait()
}
@@ -182,7 +182,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 3)
- n.OnNewEvent(&randomMessageEvent, streamPositionAfter)
+ n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
wg.Wait()
@@ -217,7 +217,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
}()
bobStream := n.fetchUserStream(bob, true)
waitForBlocking(bobStream, 1)
- n.OnNewEvent(&bobLeaveEvent, streamPositionAfter)
+ n.OnNewEvent(&bobLeaveEvent, "", streamPositionAfter)
leaveWG.Wait()
// send an event into the room. Make sure alice gets it. Bob should not.
@@ -246,7 +246,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
waitForBlocking(aliceStream, 1)
waitForBlocking(bobStream, 1)
- n.OnNewEvent(&randomMessageEvent, streamPositionAfter2)
+ n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter2)
aliceWG.Wait()
// it's possible that at this point alice has been informed and bob is about to be informed, so wait
diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go
index 953e5f4f..a207b815 100644
--- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go
+++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go
@@ -80,7 +80,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
if err != nil {
res = httputil.LogThenError(req, err)
} else {
- syncData, err = rp.appendAccountData(syncData, device.UserID)
+ syncData, err = rp.appendAccountData(syncData, device.UserID, *syncReq, currentPos)
if err != nil {
res = httputil.LogThenError(req, err)
} else {
@@ -113,7 +113,9 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.Stre
return rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit)
}
-func (rp *RequestPool) appendAccountData(data *types.Response, userID string) (*types.Response, error) {
+func (rp *RequestPool) appendAccountData(
+ data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
+) (*types.Response, error) {
// TODO: We currently send all account data on every sync response, we should instead send data
// that has changed on incremental sync responses
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
@@ -121,16 +123,56 @@ func (rp *RequestPool) appendAccountData(data *types.Response, userID string) (*
return nil, err
}
- global, rooms, err := rp.accountDB.GetAccountData(localpart)
+ if req.since == types.StreamPosition(0) {
+ // If this is the initial sync, we don't need to check if a data has
+ // already been sent. Instead, we send the whole batch.
+ var global []gomatrixserverlib.ClientEvent
+ var rooms map[string][]gomatrixserverlib.ClientEvent
+ global, rooms, err = rp.accountDB.GetAccountData(localpart)
+ if err != nil {
+ return nil, err
+ }
+ data.AccountData.Events = global
+
+ for r, j := range data.Rooms.Join {
+ if len(rooms[r]) > 0 {
+ j.AccountData.Events = rooms[r]
+ data.Rooms.Join[r] = j
+ }
+ }
+
+ return data, nil
+ }
+
+ // Sync is not initial, get all account data since the latest sync
+ dataTypes, err := rp.db.GetAccountDataInRange(userID, req.since, currentPos)
if err != nil {
return nil, err
}
- data.AccountData.Events = global
- for r, j := range data.Rooms.Join {
- if len(rooms[r]) > 0 {
- j.AccountData.Events = rooms[r]
- data.Rooms.Join[r] = j
+ if len(dataTypes) == 0 {
+ return data, nil
+ }
+
+ // Iterate over the rooms
+ for roomID, dataTypes := range dataTypes {
+ events := []gomatrixserverlib.ClientEvent{}
+ // Request the missing data from the database
+ for _, dataType := range dataTypes {
+ evs, err := rp.accountDB.GetAccountDataByType(localpart, roomID, dataType)
+ if err != nil {
+ return nil, err
+ }
+ events = append(events, evs...)
+ }
+
+ // Append the data to the response
+ if len(roomID) > 0 {
+ jr := data.Rooms.Join[roomID]
+ jr.AccountData.Events = events
+ data.Rooms.Join[roomID] = jr
+ } else {
+ data.AccountData.Events = events
}
}