aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--currentstateserver/api/api.go64
-rw-r--r--currentstateserver/consumers/roomserver.go140
-rw-r--r--currentstateserver/currentstateserver.go51
-rw-r--r--currentstateserver/currentstateserver_test.go180
-rw-r--r--currentstateserver/internal/api.go41
-rw-r--r--currentstateserver/inthttp/client.go62
-rw-r--r--currentstateserver/inthttp/server.go41
-rw-r--r--currentstateserver/storage/interface.go32
-rw-r--r--currentstateserver/storage/postgres/current_room_state_table.go205
-rw-r--r--currentstateserver/storage/postgres/storage.go35
-rw-r--r--currentstateserver/storage/shared/storage.go65
-rw-r--r--currentstateserver/storage/sqlite3/current_room_state_table.go201
-rw-r--r--currentstateserver/storage/sqlite3/storage.go39
-rw-r--r--currentstateserver/storage/storage.go41
-rw-r--r--currentstateserver/storage/storage_wasm.go42
-rw-r--r--currentstateserver/storage/tables/interface.go31
-rw-r--r--dendrite-config.yaml2
-rw-r--r--internal/config/config.go22
-rw-r--r--internal/config/config_test.go2
19 files changed, 1293 insertions, 3 deletions
diff --git a/currentstateserver/api/api.go b/currentstateserver/api/api.go
new file mode 100644
index 00000000..10433722
--- /dev/null
+++ b/currentstateserver/api/api.go
@@ -0,0 +1,64 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package api
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "strings"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type CurrentStateInternalAPI interface {
+ QueryCurrentState(ctx context.Context, req *QueryCurrentStateRequest, res *QueryCurrentStateResponse) error
+}
+
+type QueryCurrentStateRequest struct {
+ RoomID string
+ StateTuples []gomatrixserverlib.StateKeyTuple
+}
+
+type QueryCurrentStateResponse struct {
+ StateEvents map[gomatrixserverlib.StateKeyTuple]gomatrixserverlib.HeaderedEvent
+}
+
+// MarshalJSON stringifies the StateKeyTuple keys so they can be sent over the wire in HTTP API mode.
+func (r *QueryCurrentStateResponse) MarshalJSON() ([]byte, error) {
+ se := make(map[string]gomatrixserverlib.HeaderedEvent, len(r.StateEvents))
+ for k, v := range r.StateEvents {
+ // use 0x1F (unit separator) as the delimiter between type/state key,
+ se[fmt.Sprintf("%s\x1F%s", k.EventType, k.StateKey)] = v
+ }
+ return json.Marshal(se)
+}
+
+func (r *QueryCurrentStateResponse) UnmarshalJSON(data []byte) error {
+ res := make(map[string]gomatrixserverlib.HeaderedEvent)
+ err := json.Unmarshal(data, &res)
+ if err != nil {
+ return err
+ }
+ r.StateEvents = make(map[gomatrixserverlib.StateKeyTuple]gomatrixserverlib.HeaderedEvent, len(res))
+ for k, v := range res {
+ fields := strings.Split(k, "\x1F")
+ r.StateEvents[gomatrixserverlib.StateKeyTuple{
+ EventType: fields[0],
+ StateKey: fields[1],
+ }] = v
+ }
+ return nil
+}
diff --git a/currentstateserver/consumers/roomserver.go b/currentstateserver/consumers/roomserver.go
new file mode 100644
index 00000000..9e2694b0
--- /dev/null
+++ b/currentstateserver/consumers/roomserver.go
@@ -0,0 +1,140 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/Shopify/sarama"
+ "github.com/matrix-org/dendrite/currentstateserver/storage"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+)
+
+type OutputRoomEventConsumer struct {
+ rsConsumer *internal.ContinualConsumer
+ db storage.Database
+}
+
+func NewOutputRoomEventConsumer(topicName string, kafkaConsumer sarama.Consumer, store storage.Database) *OutputRoomEventConsumer {
+ consumer := &internal.ContinualConsumer{
+ Topic: topicName,
+ Consumer: kafkaConsumer,
+ PartitionStore: store,
+ }
+ s := &OutputRoomEventConsumer{
+ rsConsumer: consumer,
+ db: store,
+ }
+ consumer.ProcessMessage = s.onMessage
+
+ return s
+}
+
+func (c *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
+ // Parse out the event JSON
+ var output api.OutputEvent
+ if err := json.Unmarshal(msg.Value, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("roomserver output log: message parse failure")
+ return nil
+ }
+
+ switch output.Type {
+ case api.OutputTypeNewRoomEvent:
+ return c.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
+ case api.OutputTypeNewInviteEvent:
+ case api.OutputTypeRetireInviteEvent:
+ default:
+ log.WithField("type", output.Type).Debug(
+ "roomserver output log: ignoring unknown output type",
+ )
+ }
+ return nil
+}
+
+func (c *OutputRoomEventConsumer) onNewRoomEvent(
+ ctx context.Context, msg api.OutputNewRoomEvent,
+) error {
+ ev := msg.Event
+
+ addsStateEvents := msg.AddsState()
+
+ ev, err := c.updateStateEvent(ev)
+ if err != nil {
+ return err
+ }
+
+ for i := range addsStateEvents {
+ addsStateEvents[i], err = c.updateStateEvent(addsStateEvents[i])
+ if err != nil {
+ return err
+ }
+ }
+
+ err = c.db.StoreStateEvents(
+ ctx,
+ addsStateEvents,
+ msg.RemovesStateEventIDs,
+ )
+ if err != nil {
+ // panic rather than continue with an inconsistent database
+ log.WithFields(log.Fields{
+ "event": string(ev.JSON()),
+ log.ErrorKey: err,
+ "add": msg.AddsStateEventIDs,
+ "del": msg.RemovesStateEventIDs,
+ }).Panicf("roomserver output log: write event failure")
+ }
+ return nil
+}
+
+// Start consuming from room servers
+func (c *OutputRoomEventConsumer) Start() error {
+ return c.rsConsumer.Start()
+}
+
+func (c *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
+ var stateKey string
+ if event.StateKey() == nil {
+ stateKey = ""
+ } else {
+ stateKey = *event.StateKey()
+ }
+
+ prevEvent, err := c.db.GetStateEvent(
+ context.TODO(), event.RoomID(), event.Type(), stateKey,
+ )
+ if err != nil {
+ return event, err
+ }
+
+ if prevEvent == nil {
+ return event, nil
+ }
+
+ prev := types.PrevEventRef{
+ PrevContent: prevEvent.Content(),
+ ReplacesState: prevEvent.EventID(),
+ PrevSender: prevEvent.Sender(),
+ }
+
+ event.Event, err = event.SetUnsigned(prev)
+ return event, err
+}
diff --git a/currentstateserver/currentstateserver.go b/currentstateserver/currentstateserver.go
new file mode 100644
index 00000000..07d5e54a
--- /dev/null
+++ b/currentstateserver/currentstateserver.go
@@ -0,0 +1,51 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package currentstateserver
+
+import (
+ "github.com/Shopify/sarama"
+ "github.com/gorilla/mux"
+ "github.com/matrix-org/dendrite/currentstateserver/api"
+ "github.com/matrix-org/dendrite/currentstateserver/consumers"
+ "github.com/matrix-org/dendrite/currentstateserver/internal"
+ "github.com/matrix-org/dendrite/currentstateserver/inthttp"
+ "github.com/matrix-org/dendrite/currentstateserver/storage"
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/sirupsen/logrus"
+)
+
+// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
+// on the given input API.
+func AddInternalRoutes(router *mux.Router, intAPI api.CurrentStateInternalAPI) {
+ inthttp.AddRoutes(router, intAPI)
+}
+
+// NewInternalAPI returns a concrete implementation of the internal API. Callers
+// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
+func NewInternalAPI(cfg *config.Dendrite, consumer sarama.Consumer) api.CurrentStateInternalAPI {
+ csDB, err := storage.NewDatabase(string(cfg.Database.CurrentState), cfg.DbProperties())
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to open database")
+ }
+ roomConsumer := consumers.NewOutputRoomEventConsumer(
+ string(cfg.Kafka.Topics.OutputRoomEvent), consumer, csDB,
+ )
+ if err = roomConsumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start room server consumer")
+ }
+ return &internal.CurrentStateInternalAPI{
+ DB: csDB,
+ }
+}
diff --git a/currentstateserver/currentstateserver_test.go b/currentstateserver/currentstateserver_test.go
new file mode 100644
index 00000000..95ca609b
--- /dev/null
+++ b/currentstateserver/currentstateserver_test.go
@@ -0,0 +1,180 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package currentstateserver
+
+import (
+ "context"
+ "encoding/json"
+ "net/http"
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/Shopify/sarama"
+ "github.com/gorilla/mux"
+ "github.com/matrix-org/dendrite/currentstateserver/api"
+ "github.com/matrix-org/dendrite/currentstateserver/inthttp"
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/dendrite/internal/httputil"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/internal/test"
+ roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/naffka"
+)
+
+var (
+ testRoomVersion = gomatrixserverlib.RoomVersionV1
+ testData = []json.RawMessage{
+ []byte(`{"auth_events":[],"content":{"creator":"@userid:kaer.morhen"},"depth":0,"event_id":"$0ok8ynDp7kjc95e3:kaer.morhen","hashes":{"sha256":"17kPoH+h0Dk4Omn7Sus0qMb6+oGcf+CZFEgDhv7UKWs"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"jP4a04f5/F10Pw95FPpdCyKAO44JOwUQ/MZOOeA/RTU1Dn+AHPMzGSaZnuGjRr/xQuADt+I3ctb5ZQfLKNzHDw"}},"state_key":"","type":"m.room.create"}`),
+ []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}]],"content":{"membership":"join"},"depth":1,"event_id":"$LEwEu0kxrtu5fOiS:kaer.morhen","hashes":{"sha256":"B7M88PhXf3vd1LaFtjQutFu4x/w7fHD28XKZ4sAsJTo"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"p2vqmuJn7ZBRImctSaKbXCAxCcBlIjPH9JHte1ouIUGy84gpu4eLipOvSBCLL26hXfC0Zrm4WUto6Hr+ohdrCg"}},"state_key":"@userid:kaer.morhen","type":"m.room.member"}`),
+ []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"join_rule":"public"},"depth":2,"event_id":"$SMHlqUrNhhBBRLeN:kaer.morhen","hashes":{"sha256":"vIuJQvmMjrGxshAkj1SXe0C4RqvMbv4ZADDw9pFCWqQ"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"hBMsb3Qppo3RaqqAl4JyTgaiWEbW5hlckATky6PrHun+F3YM203TzG7w9clwuQU5F5pZoB1a6nw+to0hN90FAw"}},"state_key":"","type":"m.room.join_rules"}`),
+ []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"history_visibility":"shared"},"depth":3,"event_id":"$6F1yGIbO0J7TM93h:kaer.morhen","hashes":{"sha256":"Mr23GKSlZW7UCCYLgOWawI2Sg6KIoMjUWO2TDenuOgw"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$SMHlqUrNhhBBRLeN:kaer.morhen",{"sha256":"SylzE8U02I+6eyEHgL+FlU0L5YdqrVp8OOlxKS9VQW0"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sHLKrFI3hKGrEJfpMVZSDS3LvLasQsy50CTsOwru9XTVxgRsPo6wozNtRVjxo1J3Rk18RC9JppovmQ5VR5EcDw"}},"state_key":"","type":"m.room.history_visibility"}`),
+ []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"ban":50,"events":null,"events_default":0,"invite":0,"kick":50,"redact":50,"state_default":50,"users":null,"users_default":0},"depth":4,"event_id":"$UKNe10XzYzG0TeA9:kaer.morhen","hashes":{"sha256":"ngbP3yja9U5dlckKerUs/fSOhtKxZMCVvsfhPURSS28"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$6F1yGIbO0J7TM93h:kaer.morhen",{"sha256":"A4CucrKSoWX4IaJXhq02mBg1sxIyZEftbC+5p3fZAvk"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"zOmwlP01QL3yFchzuR9WHvogOoBZA3oVtNIF3lM0ZfDnqlSYZB9sns27G/4HVq0k7alaK7ZE3oGoCrVnMkPNCw"}},"state_key":"","type":"m.room.power_levels"}`),
+ // messages
+ []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":5,"event_id":"$gl2T9l3qm0kUbiIJ:kaer.morhen","hashes":{"sha256":"Qx3nRMHLDPSL5hBAzuX84FiSSP0K0Kju2iFoBWH4Za8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$UKNe10XzYzG0TeA9:kaer.morhen",{"sha256":"KtSRyMjt0ZSjsv2koixTRCxIRCGoOp6QrKscsW97XRo"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sqDgv3EG7ml5VREzmT9aZeBpS4gAPNIaIeJOwqjDhY0GPU/BcpX5wY4R7hYLrNe5cChgV+eFy/GWm1Zfg5FfDg"}},"type":"m.room.message"}`),
+ []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":6,"event_id":"$MYSbs8m4rEbsCWXD:kaer.morhen","hashes":{"sha256":"kgbYM7v4Ud2YaBsjBTolM4ySg6rHcJNYI6nWhMSdFUA"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$gl2T9l3qm0kUbiIJ:kaer.morhen",{"sha256":"C/rD04h9wGxRdN2G/IBfrgoE1UovzLZ+uskwaKZ37/Q"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"x0UoKh968jj/F5l1/R7Ew0T6CTKuew3PLNHASNxqck/bkNe8yYQiDHXRr+kZxObeqPZZTpaF1+EI+bLU9W8GDQ"}},"type":"m.room.message"}`),
+ []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":7,"event_id":"$N5x9WJkl9ClPrAEg:kaer.morhen","hashes":{"sha256":"FWM8oz4yquTunRZ67qlW2gzPDzdWfBP6RPHXhK1I/x8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$MYSbs8m4rEbsCWXD:kaer.morhen",{"sha256":"fatqgW+SE8mb2wFn3UN+drmluoD4UJ/EcSrL6Ur9q1M"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"Y+LX/xcyufoXMOIoqQBNOzy6lZfUGB1ffgXIrSugk6obMiyAsiRejHQN/pciZXsHKxMJLYRFAz4zSJoS/LGPAA"}},"type":"m.room.message"}`),
+ }
+ testEvents = []gomatrixserverlib.HeaderedEvent{}
+ testStateEvents = make(map[gomatrixserverlib.StateKeyTuple]gomatrixserverlib.HeaderedEvent)
+
+ kafkaTopic = "room_events"
+)
+
+func init() {
+ for _, j := range testData {
+ e, err := gomatrixserverlib.NewEventFromTrustedJSON(j, false, testRoomVersion)
+ if err != nil {
+ panic("cannot load test data: " + err.Error())
+ }
+ h := e.Headered(testRoomVersion)
+ testEvents = append(testEvents, h)
+ if e.StateKey() != nil {
+ testStateEvents[gomatrixserverlib.StateKeyTuple{
+ EventType: e.Type(),
+ StateKey: *e.StateKey(),
+ }] = h
+ }
+ }
+}
+
+func MustWriteOutputEvent(t *testing.T, producer sarama.SyncProducer, out *roomserverAPI.OutputNewRoomEvent) error {
+ value, err := json.Marshal(roomserverAPI.OutputEvent{
+ Type: roomserverAPI.OutputTypeNewRoomEvent,
+ NewRoomEvent: out,
+ })
+ if err != nil {
+ t.Fatalf("failed to marshal output event: %s", err)
+ }
+ _, _, err = producer.SendMessage(&sarama.ProducerMessage{
+ Topic: kafkaTopic,
+ Key: sarama.StringEncoder(out.Event.RoomID()),
+ Value: sarama.ByteEncoder(value),
+ })
+ if err != nil {
+ t.Fatalf("failed to send message: %s", err)
+ }
+ return nil
+}
+
+func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, sarama.SyncProducer) {
+ cfg := &config.Dendrite{}
+ cfg.Kafka.Topics.OutputRoomEvent = config.Topic(kafkaTopic)
+ cfg.Database.CurrentState = config.DataSource("file::memory:")
+ db, err := sqlutil.Open(sqlutil.SQLiteDriverName(), "file::memory:", nil)
+ if err != nil {
+ t.Fatalf("Failed to open naffka database: %s", err)
+ }
+ naffkaDB, err := naffka.NewSqliteDatabase(db)
+ if err != nil {
+ t.Fatalf("Failed to setup naffka database: %s", err)
+ }
+ naff, err := naffka.New(naffkaDB)
+ if err != nil {
+ t.Fatalf("Failed to create naffka consumer: %s", err)
+ }
+ return NewInternalAPI(cfg, naff), naff
+}
+
+func TestQueryCurrentState(t *testing.T) {
+ currStateAPI, producer := MustMakeInternalAPI(t)
+ plTuple := gomatrixserverlib.StateKeyTuple{
+ EventType: "m.room.power_levels",
+ StateKey: "",
+ }
+ plEvent := testEvents[4]
+ MustWriteOutputEvent(t, producer, &roomserverAPI.OutputNewRoomEvent{
+ Event: plEvent,
+ AddsStateEventIDs: []string{plEvent.EventID()},
+ })
+ // we have no good way to know /when/ the server has consumed the event
+ time.Sleep(100 * time.Millisecond)
+
+ testCases := []struct {
+ req api.QueryCurrentStateRequest
+ wantRes api.QueryCurrentStateResponse
+ wantErr error
+ }{
+ {
+ req: api.QueryCurrentStateRequest{
+ RoomID: plEvent.RoomID(),
+ StateTuples: []gomatrixserverlib.StateKeyTuple{
+ plTuple,
+ },
+ },
+ wantRes: api.QueryCurrentStateResponse{
+ StateEvents: map[gomatrixserverlib.StateKeyTuple]gomatrixserverlib.HeaderedEvent{
+ plTuple: plEvent,
+ },
+ },
+ },
+ }
+
+ runCases := func(testAPI api.CurrentStateInternalAPI) {
+ for _, tc := range testCases {
+ var gotRes api.QueryCurrentStateResponse
+ gotErr := testAPI.QueryCurrentState(context.TODO(), &tc.req, &gotRes)
+ if tc.wantErr == nil && gotErr != nil || tc.wantErr != nil && gotErr == nil {
+ t.Errorf("QueryCurrentState error, got %s want %s", gotErr, tc.wantErr)
+ continue
+ }
+ for tuple, wantEvent := range tc.wantRes.StateEvents {
+ gotEvent, ok := gotRes.StateEvents[tuple]
+ if !ok {
+ t.Errorf("QueryCurrentState want tuple %+v but it is missing from the response", tuple)
+ continue
+ }
+ if !reflect.DeepEqual(gotEvent.JSON(), wantEvent.JSON()) {
+ t.Errorf("QueryCurrentState tuple %+v got event JSON %s want %s", tuple, string(gotEvent.JSON()), string(wantEvent.JSON()))
+ }
+ }
+ }
+ }
+ t.Run("HTTP API", func(t *testing.T) {
+ router := mux.NewRouter().PathPrefix(httputil.InternalPathPrefix).Subrouter()
+ AddInternalRoutes(router, currStateAPI)
+ apiURL, cancel := test.ListenAndServe(t, router, false)
+ defer cancel()
+ httpAPI, err := inthttp.NewCurrentStateAPIClient(apiURL, &http.Client{})
+ if err != nil {
+ t.Fatalf("failed to create HTTP client")
+ }
+ runCases(httpAPI)
+ })
+ t.Run("Monolith", func(t *testing.T) {
+ runCases(currStateAPI)
+ })
+}
diff --git a/currentstateserver/internal/api.go b/currentstateserver/internal/api.go
new file mode 100644
index 00000000..d83a7a0f
--- /dev/null
+++ b/currentstateserver/internal/api.go
@@ -0,0 +1,41 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "context"
+
+ "github.com/matrix-org/dendrite/currentstateserver/api"
+ "github.com/matrix-org/dendrite/currentstateserver/storage"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type CurrentStateInternalAPI struct {
+ DB storage.Database
+}
+
+func (a *CurrentStateInternalAPI) QueryCurrentState(ctx context.Context, req *api.QueryCurrentStateRequest, res *api.QueryCurrentStateResponse) error {
+ res.StateEvents = make(map[gomatrixserverlib.StateKeyTuple]gomatrixserverlib.HeaderedEvent)
+ for _, tuple := range req.StateTuples {
+ ev, err := a.DB.GetStateEvent(ctx, req.RoomID, tuple.EventType, tuple.StateKey)
+ if err != nil {
+ return err
+ }
+ if ev != nil {
+ res.StateEvents[tuple] = *ev
+ }
+ }
+ return nil
+}
diff --git a/currentstateserver/inthttp/client.go b/currentstateserver/inthttp/client.go
new file mode 100644
index 00000000..2267685a
--- /dev/null
+++ b/currentstateserver/inthttp/client.go
@@ -0,0 +1,62 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package inthttp
+
+import (
+ "context"
+ "errors"
+ "net/http"
+
+ "github.com/matrix-org/dendrite/currentstateserver/api"
+ "github.com/matrix-org/dendrite/internal/httputil"
+ "github.com/opentracing/opentracing-go"
+)
+
+// HTTP paths for the internal HTTP APIs
+const (
+ QueryCurrentStatePath = "/currentstateserver/queryCurrentState"
+)
+
+// NewCurrentStateAPIClient creates a CurrentStateInternalAPI implemented by talking to a HTTP POST API.
+// If httpClient is nil an error is returned
+func NewCurrentStateAPIClient(
+ apiURL string,
+ httpClient *http.Client,
+) (api.CurrentStateInternalAPI, error) {
+ if httpClient == nil {
+ return nil, errors.New("NewCurrentStateAPIClient: httpClient is <nil>")
+ }
+ return &httpCurrentStateInternalAPI{
+ apiURL: apiURL,
+ httpClient: httpClient,
+ }, nil
+}
+
+type httpCurrentStateInternalAPI struct {
+ apiURL string
+ httpClient *http.Client
+}
+
+func (h *httpCurrentStateInternalAPI) QueryCurrentState(
+ ctx context.Context,
+ request *api.QueryCurrentStateRequest,
+ response *api.QueryCurrentStateResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "QueryCurrentState")
+ defer span.Finish()
+
+ apiURL := h.apiURL + QueryCurrentStatePath
+ return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+}
diff --git a/currentstateserver/inthttp/server.go b/currentstateserver/inthttp/server.go
new file mode 100644
index 00000000..83bac6eb
--- /dev/null
+++ b/currentstateserver/inthttp/server.go
@@ -0,0 +1,41 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package inthttp
+
+import (
+ "encoding/json"
+ "net/http"
+
+ "github.com/gorilla/mux"
+ "github.com/matrix-org/dendrite/currentstateserver/api"
+ "github.com/matrix-org/dendrite/internal/httputil"
+ "github.com/matrix-org/util"
+)
+
+func AddRoutes(internalAPIMux *mux.Router, intAPI api.CurrentStateInternalAPI) {
+ internalAPIMux.Handle(QueryCurrentStatePath,
+ httputil.MakeInternalAPI("queryCurrentState", func(req *http.Request) util.JSONResponse {
+ request := api.QueryCurrentStateRequest{}
+ response := api.QueryCurrentStateResponse{}
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ if err := intAPI.QueryCurrentState(req.Context(), &request, &response); err != nil {
+ return util.ErrorResponse(err)
+ }
+ return util.JSONResponse{Code: http.StatusOK, JSON: &response}
+ }),
+ )
+}
diff --git a/currentstateserver/storage/interface.go b/currentstateserver/storage/interface.go
new file mode 100644
index 00000000..488df9e2
--- /dev/null
+++ b/currentstateserver/storage/interface.go
@@ -0,0 +1,32 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type Database interface {
+ internal.PartitionStorer
+ // StoreStateEvents updates the database with new events from the roomserver.
+ StoreStateEvents(ctx context.Context, addStateEvents []gomatrixserverlib.HeaderedEvent, removeStateEventIDs []string) error
+ // GetStateEvent returns the state event of a given type for a given room with a given state key
+ // If no event could be found, returns nil
+ // If there was an issue during the retrieval, returns an error
+ GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
+}
diff --git a/currentstateserver/storage/postgres/current_room_state_table.go b/currentstateserver/storage/postgres/current_room_state_table.go
new file mode 100644
index 00000000..255be42a
--- /dev/null
+++ b/currentstateserver/storage/postgres/current_room_state_table.go
@@ -0,0 +1,205 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+
+ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/currentstateserver/storage/tables"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const currentRoomStateSchema = `
+-- Stores the current room state for every room.
+CREATE TABLE IF NOT EXISTS currentstate_current_room_state (
+ -- The 'room_id' key for the state event.
+ room_id TEXT NOT NULL,
+ -- The state event ID
+ event_id TEXT NOT NULL,
+ -- The state event type e.g 'm.room.member'
+ type TEXT NOT NULL,
+ -- The 'sender' property of the event.
+ sender TEXT NOT NULL,
+ -- The state_key value for this state event e.g ''
+ state_key TEXT NOT NULL,
+ -- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
+ headered_event_json TEXT NOT NULL,
+ -- The 'content.membership' value if this event is an m.room.member event. For other
+ -- events, this will be NULL.
+ membership TEXT,
+ -- Clobber based on 3-uple of room_id, type and state_key
+ CONSTRAINT currentstate_current_room_state_unique UNIQUE (room_id, type, state_key)
+);
+-- for event deletion
+CREATE UNIQUE INDEX IF NOT EXISTS currentstate_event_id_idx ON currentstate_current_room_state(event_id, room_id, type, sender);
+-- for querying membership states of users
+CREATE INDEX IF NOT EXISTS currentstate_membership_idx ON currentstate_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave';
+`
+
+const upsertRoomStateSQL = "" +
+ "INSERT INTO currentstate_current_room_state (room_id, event_id, type, sender, state_key, headered_event_json, membership)" +
+ " VALUES ($1, $2, $3, $4, $5, $6, $7)" +
+ " ON CONFLICT ON CONSTRAINT currentstate_room_state_unique" +
+ " DO UPDATE SET event_id = $2, sender=$4, headered_event_json = $6, membership = $7"
+
+const deleteRoomStateByEventIDSQL = "" +
+ "DELETE FROM currentstate_current_room_state WHERE event_id = $1"
+
+const selectRoomIDsWithMembershipSQL = "" +
+ "SELECT room_id FROM currentstate_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
+
+const selectStateEventSQL = "" +
+ "SELECT headered_event_json FROM currentstate_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
+
+const selectEventsWithEventIDsSQL = "" +
+ "SELECT headered_event_json FROM currentstate_current_room_state WHERE event_id = ANY($1)"
+
+type currentRoomStateStatements struct {
+ upsertRoomStateStmt *sql.Stmt
+ deleteRoomStateByEventIDStmt *sql.Stmt
+ selectRoomIDsWithMembershipStmt *sql.Stmt
+ selectEventsWithEventIDsStmt *sql.Stmt
+ selectStateEventStmt *sql.Stmt
+}
+
+func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
+ s := &currentRoomStateStatements{}
+ _, err := db.Exec(currentRoomStateSchema)
+ if err != nil {
+ return nil, err
+ }
+ if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
+ return nil, err
+ }
+ if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
+ return nil, err
+ }
+ if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
+ return nil, err
+ }
+ if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
+ return nil, err
+ }
+ if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
+ return nil, err
+ }
+ return s, nil
+}
+
+// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
+func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
+ ctx context.Context,
+ txn *sql.Tx,
+ userID string,
+ membership string,
+) ([]string, error) {
+ stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
+ rows, err := stmt.QueryContext(ctx, userID, membership)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "selectRoomIDsWithMembership: rows.close() failed")
+
+ var result []string
+ for rows.Next() {
+ var roomID string
+ if err := rows.Scan(&roomID); err != nil {
+ return nil, err
+ }
+ result = append(result, roomID)
+ }
+ return result, rows.Err()
+}
+
+func (s *currentRoomStateStatements) DeleteRoomStateByEventID(
+ ctx context.Context, txn *sql.Tx, eventID string,
+) error {
+ stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByEventIDStmt)
+ _, err := stmt.ExecContext(ctx, eventID)
+ return err
+}
+
+func (s *currentRoomStateStatements) UpsertRoomState(
+ ctx context.Context, txn *sql.Tx,
+ event gomatrixserverlib.HeaderedEvent, membership *string,
+) error {
+ headeredJSON, err := json.Marshal(event)
+ if err != nil {
+ return err
+ }
+
+ // upsert state event
+ stmt := sqlutil.TxStmt(txn, s.upsertRoomStateStmt)
+ _, err = stmt.ExecContext(
+ ctx,
+ event.RoomID(),
+ event.EventID(),
+ event.Type(),
+ event.Sender(),
+ *event.StateKey(),
+ headeredJSON,
+ membership,
+ )
+ return err
+}
+
+func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
+ ctx context.Context, txn *sql.Tx, eventIDs []string,
+) ([]gomatrixserverlib.HeaderedEvent, error) {
+ stmt := sqlutil.TxStmt(txn, s.selectEventsWithEventIDsStmt)
+ rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed")
+ result := []gomatrixserverlib.HeaderedEvent{}
+ for rows.Next() {
+ var eventBytes []byte
+ if err := rows.Scan(&eventBytes); err != nil {
+ return nil, err
+ }
+ // TODO: Handle redacted events
+ var ev gomatrixserverlib.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
+ return nil, err
+ }
+ result = append(result, ev)
+ }
+ return result, rows.Err()
+}
+
+func (s *currentRoomStateStatements) SelectStateEvent(
+ ctx context.Context, roomID, evType, stateKey string,
+) (*gomatrixserverlib.HeaderedEvent, error) {
+ stmt := s.selectStateEventStmt
+ var res []byte
+ err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ var ev gomatrixserverlib.HeaderedEvent
+ if err = json.Unmarshal(res, &ev); err != nil {
+ return nil, err
+ }
+ return &ev, err
+}
diff --git a/currentstateserver/storage/postgres/storage.go b/currentstateserver/storage/postgres/storage.go
new file mode 100644
index 00000000..f8edb94e
--- /dev/null
+++ b/currentstateserver/storage/postgres/storage.go
@@ -0,0 +1,35 @@
+package postgres
+
+import (
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/currentstateserver/storage/shared"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+)
+
+type Database struct {
+ shared.Database
+ db *sql.DB
+ sqlutil.PartitionOffsetStatements
+}
+
+// NewDatabase creates a new sync server database
+func NewDatabase(dbDataSourceName string, dbProperties sqlutil.DbProperties) (*Database, error) {
+ var d Database
+ var err error
+ if d.db, err = sqlutil.Open("postgres", dbDataSourceName, dbProperties); err != nil {
+ return nil, err
+ }
+ if err = d.PartitionOffsetStatements.Prepare(d.db, "currentstate"); err != nil {
+ return nil, err
+ }
+ currRoomState, err := NewPostgresCurrentRoomStateTable(d.db)
+ if err != nil {
+ return nil, err
+ }
+ d.Database = shared.Database{
+ DB: d.db,
+ CurrentRoomState: currRoomState,
+ }
+ return &d, nil
+}
diff --git a/currentstateserver/storage/shared/storage.go b/currentstateserver/storage/shared/storage.go
new file mode 100644
index 00000000..976190cb
--- /dev/null
+++ b/currentstateserver/storage/shared/storage.go
@@ -0,0 +1,65 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package shared
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/currentstateserver/storage/tables"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type Database struct {
+ DB *sql.DB
+ CurrentRoomState tables.CurrentRoomState
+}
+
+func (d *Database) GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error) {
+ return d.CurrentRoomState.SelectStateEvent(ctx, roomID, evType, stateKey)
+}
+
+func (d *Database) StoreStateEvents(ctx context.Context, addStateEvents []gomatrixserverlib.HeaderedEvent,
+ removeStateEventIDs []string) error {
+ return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error {
+ // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
+ for _, eventID := range removeStateEventIDs {
+ if err := d.CurrentRoomState.DeleteRoomStateByEventID(ctx, txn, eventID); err != nil {
+ return err
+ }
+ }
+
+ for _, event := range addStateEvents {
+ if event.StateKey() == nil {
+ // ignore non state events
+ continue
+ }
+ var membership *string
+ if event.Type() == "m.room.member" {
+ value, err := event.Membership()
+ if err != nil {
+ return err
+ }
+ membership = &value
+ }
+
+ if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+}
diff --git a/currentstateserver/storage/sqlite3/current_room_state_table.go b/currentstateserver/storage/sqlite3/current_room_state_table.go
new file mode 100644
index 00000000..c1819327
--- /dev/null
+++ b/currentstateserver/storage/sqlite3/current_room_state_table.go
@@ -0,0 +1,201 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "strings"
+
+ "github.com/matrix-org/dendrite/currentstateserver/storage/tables"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const currentRoomStateSchema = `
+-- Stores the current room state for every room.
+CREATE TABLE IF NOT EXISTS currentstate_current_room_state (
+ room_id TEXT NOT NULL,
+ event_id TEXT NOT NULL,
+ type TEXT NOT NULL,
+ sender TEXT NOT NULL,
+ state_key TEXT NOT NULL,
+ headered_event_json TEXT NOT NULL,
+ membership TEXT,
+ UNIQUE (room_id, type, state_key)
+);
+-- for event deletion
+CREATE UNIQUE INDEX IF NOT EXISTS currentstate_event_id_idx ON currentstate_current_room_state(event_id, room_id, type, sender);
+-- for querying membership states of users
+-- CREATE INDEX IF NOT EXISTS currentstate_membership_idx ON currentstate_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave';
+`
+
+const upsertRoomStateSQL = "" +
+ "INSERT INTO currentstate_current_room_state (room_id, event_id, type, sender, state_key, headered_event_json, membership)" +
+ " VALUES ($1, $2, $3, $4, $5, $6, $7)" +
+ " ON CONFLICT (event_id, room_id, type, sender)" +
+ " DO UPDATE SET event_id = $2, sender=$4, headered_event_json = $6, membership = $7"
+
+const deleteRoomStateByEventIDSQL = "" +
+ "DELETE FROM currentstate_current_room_state WHERE event_id = $1"
+
+const selectRoomIDsWithMembershipSQL = "" +
+ "SELECT room_id FROM currentstate_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
+
+const selectStateEventSQL = "" +
+ "SELECT headered_event_json FROM currentstate_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
+
+const selectEventsWithEventIDsSQL = "" +
+ // TODO: The session_id and transaction_id blanks are here because otherwise
+ // the rowsToStreamEvents expects there to be exactly five columns. We need to
+ // figure out if these really need to be in the DB, and if so, we need a
+ // better permanent fix for this. - neilalexander, 2 Jan 2020
+ "SELECT added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
+ " FROM currentstate_current_room_state WHERE event_id IN ($1)"
+
+type currentRoomStateStatements struct {
+ upsertRoomStateStmt *sql.Stmt
+ deleteRoomStateByEventIDStmt *sql.Stmt
+ selectRoomIDsWithMembershipStmt *sql.Stmt
+ selectStateEventStmt *sql.Stmt
+}
+
+func NewSqliteCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
+ s := &currentRoomStateStatements{}
+ _, err := db.Exec(currentRoomStateSchema)
+ if err != nil {
+ return nil, err
+ }
+ if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
+ return nil, err
+ }
+ if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
+ return nil, err
+ }
+ if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
+ return nil, err
+ }
+ if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
+ return nil, err
+ }
+ return s, nil
+}
+
+// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
+func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
+ ctx context.Context,
+ txn *sql.Tx,
+ userID string,
+ membership string, // nolint: unparam
+) ([]string, error) {
+ stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
+ rows, err := stmt.QueryContext(ctx, userID, membership)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "selectRoomIDsWithMembership: rows.close() failed")
+
+ var result []string
+ for rows.Next() {
+ var roomID string
+ if err := rows.Scan(&roomID); err != nil {
+ return nil, err
+ }
+ result = append(result, roomID)
+ }
+ return result, nil
+}
+
+func (s *currentRoomStateStatements) DeleteRoomStateByEventID(
+ ctx context.Context, txn *sql.Tx, eventID string,
+) error {
+ stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByEventIDStmt)
+ _, err := stmt.ExecContext(ctx, eventID)
+ return err
+}
+
+func (s *currentRoomStateStatements) UpsertRoomState(
+ ctx context.Context, txn *sql.Tx,
+ event gomatrixserverlib.HeaderedEvent, membership *string,
+) error {
+ headeredJSON, err := json.Marshal(event)
+ if err != nil {
+ return err
+ }
+
+ // upsert state event
+ stmt := sqlutil.TxStmt(txn, s.upsertRoomStateStmt)
+ _, err = stmt.ExecContext(
+ ctx,
+ event.RoomID(),
+ event.EventID(),
+ event.Type(),
+ event.Sender(),
+ *event.StateKey(),
+ headeredJSON,
+ membership,
+ )
+ return err
+}
+
+func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
+ ctx context.Context, txn *sql.Tx, eventIDs []string,
+) ([]gomatrixserverlib.HeaderedEvent, error) {
+ iEventIDs := make([]interface{}, len(eventIDs))
+ for k, v := range eventIDs {
+ iEventIDs[k] = v
+ }
+ query := strings.Replace(selectEventsWithEventIDsSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1)
+ rows, err := txn.QueryContext(ctx, query, iEventIDs...)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed")
+ result := []gomatrixserverlib.HeaderedEvent{}
+ for rows.Next() {
+ var eventBytes []byte
+ if err := rows.Scan(&eventBytes); err != nil {
+ return nil, err
+ }
+ // TODO: Handle redacted events
+ var ev gomatrixserverlib.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
+ return nil, err
+ }
+ result = append(result, ev)
+ }
+ return result, nil
+}
+
+func (s *currentRoomStateStatements) SelectStateEvent(
+ ctx context.Context, roomID, evType, stateKey string,
+) (*gomatrixserverlib.HeaderedEvent, error) {
+ stmt := s.selectStateEventStmt
+ var res []byte
+ err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ var ev gomatrixserverlib.HeaderedEvent
+ if err = json.Unmarshal(res, &ev); err != nil {
+ return nil, err
+ }
+ return &ev, err
+}
diff --git a/currentstateserver/storage/sqlite3/storage.go b/currentstateserver/storage/sqlite3/storage.go
new file mode 100644
index 00000000..6975e40b
--- /dev/null
+++ b/currentstateserver/storage/sqlite3/storage.go
@@ -0,0 +1,39 @@
+package sqlite3
+
+import (
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/currentstateserver/storage/shared"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+)
+
+type Database struct {
+ shared.Database
+ db *sql.DB
+ sqlutil.PartitionOffsetStatements
+}
+
+// NewDatabase creates a new sync server database
+// nolint: gocyclo
+func NewDatabase(dataSourceName string) (*Database, error) {
+ var d Database
+ cs, err := sqlutil.ParseFileURI(dataSourceName)
+ if err != nil {
+ return nil, err
+ }
+ if d.db, err = sqlutil.Open(sqlutil.SQLiteDriverName(), cs, nil); err != nil {
+ return nil, err
+ }
+ if err = d.PartitionOffsetStatements.Prepare(d.db, "currentstate"); err != nil {
+ return nil, err
+ }
+ currRoomState, err := NewSqliteCurrentRoomStateTable(d.db)
+ if err != nil {
+ return nil, err
+ }
+ d.Database = shared.Database{
+ DB: d.db,
+ CurrentRoomState: currRoomState,
+ }
+ return &d, nil
+}
diff --git a/currentstateserver/storage/storage.go b/currentstateserver/storage/storage.go
new file mode 100644
index 00000000..ad04cf41
--- /dev/null
+++ b/currentstateserver/storage/storage.go
@@ -0,0 +1,41 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build !wasm
+
+package storage
+
+import (
+ "net/url"
+
+ "github.com/matrix-org/dendrite/currentstateserver/storage/postgres"
+ "github.com/matrix-org/dendrite/currentstateserver/storage/sqlite3"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+)
+
+// NewDatabase opens a database connection.
+func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties) (Database, error) {
+ uri, err := url.Parse(dataSourceName)
+ if err != nil {
+ return postgres.NewDatabase(dataSourceName, dbProperties)
+ }
+ switch uri.Scheme {
+ case "postgres":
+ return postgres.NewDatabase(dataSourceName, dbProperties)
+ case "file":
+ return sqlite3.NewDatabase(dataSourceName)
+ default:
+ return postgres.NewDatabase(dataSourceName, dbProperties)
+ }
+}
diff --git a/currentstateserver/storage/storage_wasm.go b/currentstateserver/storage/storage_wasm.go
new file mode 100644
index 00000000..aa46c44d
--- /dev/null
+++ b/currentstateserver/storage/storage_wasm.go
@@ -0,0 +1,42 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "fmt"
+ "net/url"
+
+ "github.com/matrix-org/dendrite/currentstateserver/storage/sqlite3"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+)
+
+// NewDatabase opens a database connection.
+func NewDatabase(
+ dataSourceName string,
+ dbProperties sqlutil.DbProperties, // nolint:unparam
+) (Database, error) {
+ uri, err := url.Parse(dataSourceName)
+ if err != nil {
+ return nil, fmt.Errorf("Cannot use postgres implementation")
+ }
+ switch uri.Scheme {
+ case "postgres":
+ return nil, fmt.Errorf("Cannot use postgres implementation")
+ case "file":
+ return sqlite3.NewDatabase(dataSourceName)
+ default:
+ return nil, fmt.Errorf("Cannot use postgres implementation")
+ }
+}
diff --git a/currentstateserver/storage/tables/interface.go b/currentstateserver/storage/tables/interface.go
new file mode 100644
index 00000000..d2e560a2
--- /dev/null
+++ b/currentstateserver/storage/tables/interface.go
@@ -0,0 +1,31 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tables
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type CurrentRoomState interface {
+ SelectStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
+ SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error)
+ UpsertRoomState(ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, membership *string) error
+ DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error
+ // SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
+ SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error)
+}
diff --git a/dendrite-config.yaml b/dendrite-config.yaml
index 73bfec24..70c8f795 100644
--- a/dendrite-config.yaml
+++ b/dendrite-config.yaml
@@ -121,6 +121,7 @@ database:
federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable"
appservice: "postgres://dendrite:itsasecret@localhost/dendrite_appservice?sslmode=disable"
public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable"
+ current_state: "postgres://dendrite:itsasecret@localhost/dendrite_currentstate?sslmode=disable"
max_open_conns: 100
max_idle_conns: 2
conn_max_lifetime: -1
@@ -143,6 +144,7 @@ listen:
key_server: "localhost:7779"
server_key_api: "localhost:7780"
user_api: "localhost:7781"
+ current_state_server: "localhost:7782"
# The configuration for tracing the dendrite components.
tracing:
diff --git a/internal/config/config.go b/internal/config/config.go
index baa82be2..8275fc47 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -160,10 +160,13 @@ type Dendrite struct {
// Postgres Config
Database struct {
// The Account database stores the login details and account information
- // for local users. It is accessed by the ClientAPI.
+ // for local users. It is accessed by the UserAPI.
Account DataSource `yaml:"account"`
+ // The CurrentState database stores the current state of all rooms.
+ // It is accessed by the CurrentStateServer.
+ CurrentState DataSource `yaml:"current_state"`
// The Device database stores session information for the devices of logged
- // in local users. It is accessed by the ClientAPI, the MediaAPI and the SyncAPI.
+ // in local users. It is accessed by the UserAPI.
Device DataSource `yaml:"device"`
// The MediaAPI database stores information about files uploaded and downloaded
// by local users. It is only accessed by the MediaAPI.
@@ -222,6 +225,7 @@ type Dendrite struct {
Bind struct {
MediaAPI Address `yaml:"media_api"`
ClientAPI Address `yaml:"client_api"`
+ CurrentState Address `yaml:"current_state_server"`
FederationAPI Address `yaml:"federation_api"`
ServerKeyAPI Address `yaml:"server_key_api"`
AppServiceAPI Address `yaml:"appservice_api"`
@@ -238,6 +242,7 @@ type Dendrite struct {
Listen struct {
MediaAPI Address `yaml:"media_api"`
ClientAPI Address `yaml:"client_api"`
+ CurrentState Address `yaml:"current_state_server"`
FederationAPI Address `yaml:"federation_api"`
ServerKeyAPI Address `yaml:"server_key_api"`
AppServiceAPI Address `yaml:"appservice_api"`
@@ -601,6 +606,7 @@ func (config *Dendrite) checkDatabase(configErrs *configErrors) {
checkNotEmpty(configErrs, "database.media_api", string(config.Database.MediaAPI))
checkNotEmpty(configErrs, "database.sync_api", string(config.Database.SyncAPI))
checkNotEmpty(configErrs, "database.room_server", string(config.Database.RoomServer))
+ checkNotEmpty(configErrs, "database.current_state", string(config.Database.CurrentState))
}
// checkListen verifies the parameters listen.* are valid.
@@ -613,6 +619,7 @@ func (config *Dendrite) checkListen(configErrs *configErrors) {
checkNotEmpty(configErrs, "listen.edu_server", string(config.Listen.EDUServer))
checkNotEmpty(configErrs, "listen.server_key_api", string(config.Listen.EDUServer))
checkNotEmpty(configErrs, "listen.user_api", string(config.Listen.UserAPI))
+ checkNotEmpty(configErrs, "listen.current_state_server", string(config.Listen.CurrentState))
}
// checkLogging verifies the parameters logging.* are valid.
@@ -735,6 +742,15 @@ func (config *Dendrite) UserAPIURL() string {
return "http://" + string(config.Listen.UserAPI)
}
+// CurrentStateAPIURL returns an HTTP URL for where the currentstateserver is listening.
+func (config *Dendrite) CurrentStateAPIURL() string {
+ // Hard code the currentstateserver to talk HTTP for now.
+ // If we support HTTPS we need to think of a practical way to do certificate validation.
+ // People setting up servers shouldn't need to get a certificate valid for the public
+ // internet for an internal API.
+ return "http://" + string(config.Listen.CurrentState)
+}
+
// EDUServerURL returns an HTTP URL for where the EDU server is listening.
func (config *Dendrite) EDUServerURL() string {
// Hard code the EDU server to talk HTTP for now.
@@ -753,7 +769,7 @@ func (config *Dendrite) FederationSenderURL() string {
return "http://" + string(config.Listen.FederationSender)
}
-// FederationSenderURL returns an HTTP URL for where the federation sender is listening.
+// ServerKeyAPIURL returns an HTTP URL for where the federation sender is listening.
func (config *Dendrite) ServerKeyAPIURL() string {
// Hard code the server key API server to talk HTTP for now.
// If we support HTTPS we need to think of a practical way to do certificate validation.
diff --git a/internal/config/config_test.go b/internal/config/config_test.go
index 9a543e76..9b776a50 100644
--- a/internal/config/config_test.go
+++ b/internal/config/config_test.go
@@ -55,6 +55,7 @@ database:
sync_api: "postgresql:///syn_api"
room_server: "postgresql:///room_server"
appservice: "postgresql:///appservice"
+ current_state: "postgresql:///current_state"
listen:
room_server: "localhost:7770"
client_api: "localhost:7771"
@@ -64,6 +65,7 @@ listen:
appservice_api: "localhost:7777"
edu_server: "localhost:7778"
user_api: "localhost:7779"
+ current_state_server: "localhost:7775"
logging:
- type: "file"
level: "info"