aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--INSTALL.md13
-rw-r--r--dendrite-config.yaml1
-rw-r--r--src/github.com/matrix-org/dendrite/appservice/README.md6
-rw-r--r--src/github.com/matrix-org/dendrite/appservice/appservice.go32
-rw-r--r--src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go79
-rw-r--r--src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go248
-rw-r--r--src/github.com/matrix-org/dendrite/appservice/storage/storage.go110
-rw-r--r--src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go52
-rw-r--r--src/github.com/matrix-org/dendrite/appservice/types/types.go46
-rw-r--r--src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go234
-rw-r--r--src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/accounts_table.go2
-rw-r--r--src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go4
-rw-r--r--src/github.com/matrix-org/dendrite/clientapi/routing/register.go14
-rw-r--r--src/github.com/matrix-org/dendrite/cmd/dendrite-appservice-server/main.go (renamed from src/github.com/matrix-org/dendrite/cmd/dendrite-app-service-server/main.go)2
-rw-r--r--src/github.com/matrix-org/dendrite/common/config/appservice.go8
-rw-r--r--src/github.com/matrix-org/dendrite/common/config/config.go9
-rw-r--r--vendor/manifest2
-rw-r--r--vendor/src/github.com/matrix-org/gomatrixserverlib/appservice.go15
-rw-r--r--vendor/src/github.com/matrix-org/gomatrixserverlib/eventcontent.go3
-rw-r--r--vendor/src/github.com/matrix-org/gomatrixserverlib/federationtypes.go6
20 files changed, 806 insertions, 80 deletions
diff --git a/INSTALL.md b/INSTALL.md
index a7e2d835..ee3c6f1e 100644
--- a/INSTALL.md
+++ b/INSTALL.md
@@ -72,7 +72,7 @@ Dendrite requires a postgres database engine, version 9.5 or later.
```
* Create databases:
```bash
- for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi naffka; do
+ for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi appservice naffka; do
sudo -u postgres createdb -O dendrite dendrite_$i
done
```
@@ -253,3 +253,14 @@ you want to support federation.
```bash
./bin/dendrite-federation-sender-server --config dendrite.yaml
```
+
+### Run an appservice server
+
+This sends events from the network to [application
+services](https://matrix.org/docs/spec/application_service/unstable.html)
+running locally. This is only required if you want to support running
+application services on your homeserver.
+
+```bash
+./bin/dendrite-appservice-server --config dendrite.yaml
+```
diff --git a/dendrite-config.yaml b/dendrite-config.yaml
index ae926bab..44441787 100644
--- a/dendrite-config.yaml
+++ b/dendrite-config.yaml
@@ -97,6 +97,7 @@ database:
room_server: "postgres://dendrite:itsasecret@localhost/dendrite_roomserver?sslmode=disable"
server_key: "postgres://dendrite:itsasecret@localhost/dendrite_serverkey?sslmode=disable"
federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable"
+ appservice: "postgres://dendrite:itsasecret@localhost/dendrite_appservice?sslmode=disable"
public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable"
# If using naffka you need to specify a naffka database
# naffka: "postgres://dendrite:itsasecret@localhost/dendrite_naffka?sslmode=disable"
diff --git a/src/github.com/matrix-org/dendrite/appservice/README.md b/src/github.com/matrix-org/dendrite/appservice/README.md
index 5b00386d..d7555744 100644
--- a/src/github.com/matrix-org/dendrite/appservice/README.md
+++ b/src/github.com/matrix-org/dendrite/appservice/README.md
@@ -2,9 +2,9 @@
This component interfaces with external [Application
Services](https://matrix.org/docs/spec/application_service/unstable.html).
-This includes any HTTP endpoints that Application Services call, as well as talking
-to any HTTP endpoints that Application Services provide themselves.
+This includes any HTTP endpoints that application services call, as well as talking
+to any HTTP endpoints that application services provide themselves.
## Consumers
-This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing Application Services. \ No newline at end of file
+This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing application services. \ No newline at end of file
diff --git a/src/github.com/matrix-org/dendrite/appservice/appservice.go b/src/github.com/matrix-org/dendrite/appservice/appservice.go
index 9caf70fb..57b127f2 100644
--- a/src/github.com/matrix-org/dendrite/appservice/appservice.go
+++ b/src/github.com/matrix-org/dendrite/appservice/appservice.go
@@ -15,8 +15,13 @@
package appservice
import (
+ "sync"
+
"github.com/matrix-org/dendrite/appservice/consumers"
"github.com/matrix-org/dendrite/appservice/routing"
+ "github.com/matrix-org/dendrite/appservice/storage"
+ "github.com/matrix-org/dendrite/appservice/types"
+ "github.com/matrix-org/dendrite/appservice/workers"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/transactions"
@@ -35,13 +40,38 @@ func SetupAppServiceAPIComponent(
queryAPI api.RoomserverQueryAPI,
transactionsCache *transactions.Cache,
) {
+ // Create a connection to the appservice postgres DB
+ appserviceDB, err := storage.NewDatabase(string(base.Cfg.Database.AppService))
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to connect to appservice db")
+ }
+
+ // Wrap application services in a type that relates the application service and
+ // a sync.Cond object that can be used to notify workers when there are new
+ // events to be sent out.
+ workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices))
+ for i, appservice := range base.Cfg.Derived.ApplicationServices {
+ m := sync.Mutex{}
+ ws := types.ApplicationServiceWorkerState{
+ AppService: appservice,
+ Cond: sync.NewCond(&m),
+ }
+ workerStates[i] = ws
+ }
+
consumer := consumers.NewOutputRoomEventConsumer(
- base.Cfg, base.KafkaConsumer, accountsDB, queryAPI, aliasAPI,
+ base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB,
+ queryAPI, aliasAPI, workerStates,
)
if err := consumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start app service roomserver consumer")
}
+ // Create application service transaction workers
+ if err := workers.SetupTransactionWorkers(appserviceDB, workerStates); err != nil {
+ logrus.WithError(err).Panicf("failed to start app service transaction workers")
+ }
+
// Set up HTTP Endpoints
routing.Setup(
base.APIMux, *base.Cfg, queryAPI, aliasAPI, accountsDB,
diff --git a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go
index a934bf44..bc1d3bf2 100644
--- a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go
+++ b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go
@@ -17,8 +17,9 @@ package consumers
import (
"context"
"encoding/json"
- "fmt"
+ "github.com/matrix-org/dendrite/appservice/storage"
+ "github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
@@ -29,29 +30,28 @@ import (
sarama "gopkg.in/Shopify/sarama.v1"
)
-var (
- appServices []config.ApplicationService
-)
-
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
db *accounts.Database
+ asDB *storage.Database
query api.RoomserverQueryAPI
alias api.RoomserverAliasAPI
serverName string
+ workerStates []types.ApplicationServiceWorkerState
}
-// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
+// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
+// Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
store *accounts.Database,
+ appserviceDB *storage.Database,
queryAPI api.RoomserverQueryAPI,
aliasAPI api.RoomserverAliasAPI,
+ workerStates []types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer {
- appServices = cfg.Derived.ApplicationServices
-
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
Consumer: kafkaConsumer,
@@ -60,9 +60,11 @@ func NewOutputRoomEventConsumer(
s := &OutputRoomEventConsumer{
roomServerConsumer: &consumer,
db: store,
+ asDB: appserviceDB,
query: queryAPI,
alias: aliasAPI,
serverName: string(cfg.Matrix.ServerName),
+ workerStates: workerStates,
}
consumer.ProcessMessage = s.onMessage
@@ -74,9 +76,8 @@ func (s *OutputRoomEventConsumer) Start() error {
return s.roomServerConsumer.Start()
}
-// onMessage is called when the sync server receives a new event from the room server output log.
-// It is not safe for this function to be called from multiple goroutines, or else the
-// sync stream position may race and be incorrectly calculated.
+// onMessage is called when the appservice component receives a new event from
+// the room server output log.
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output api.OutputEvent
@@ -98,50 +99,37 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"type": ev.Type(),
- }).Info("appservice received event from roomserver")
+ }).Info("appservice received an event from roomserver")
- events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
+ missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
if err != nil {
return err
}
+ events := append(missingEvents, ev)
- // Create a context to thread through the whole filtering process
- ctx := context.TODO()
-
- if err = s.db.UpdateMemberships(ctx, events, output.NewRoomEvent.RemovesStateEventIDs); err != nil {
- return err
- }
-
- // Check if any events need to passed on to external application services
- return s.filterRoomserverEvents(ctx, append(events, ev))
+ // Send event to any relevant application services
+ return s.filterRoomserverEvents(context.TODO(), events)
}
-// lookupStateEvents looks up the state events that are added by a new event.
-func (s *OutputRoomEventConsumer) lookupStateEvents(
+// lookupMissingStateEvents looks up the state events that are added by a new event,
+// and returns any not already present.
+func (s *OutputRoomEventConsumer) lookupMissingStateEvents(
addsStateEventIDs []string, event gomatrixserverlib.Event,
) ([]gomatrixserverlib.Event, error) {
// Fast path if there aren't any new state events.
if len(addsStateEventIDs) == 0 {
- // If the event is a membership update (e.g. for a profile update), it won't
- // show up in AddsStateEventIDs, so we need to add it manually
- if event.Type() == "m.room.member" {
- return []gomatrixserverlib.Event{event}, nil
- }
- return nil, nil
+ return []gomatrixserverlib.Event{}, nil
}
// Fast path if the only state event added is the event itself.
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
- return []gomatrixserverlib.Event{event}, nil
+ return []gomatrixserverlib.Event{}, nil
}
result := []gomatrixserverlib.Event{}
missing := []string{}
for _, id := range addsStateEventIDs {
- // Append the current event in the results if its ID is in the events list
- if id == event.EventID() {
- result = append(result, event)
- } else {
+ if id != event.EventID() {
// If the event isn't the current one, add it to the list of events
// to retrieve from the roomserver
missing = append(missing, id)
@@ -165,13 +153,22 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
// each namespace of each registered application service, and if there is a
// match, adds the event to the queue for events to be sent to a particular
// application service.
-func (s *OutputRoomEventConsumer) filterRoomserverEvents(ctx context.Context, events []gomatrixserverlib.Event) error {
- for _, event := range events {
- for _, appservice := range appServices {
+func (s *OutputRoomEventConsumer) filterRoomserverEvents(
+ ctx context.Context,
+ events []gomatrixserverlib.Event,
+) error {
+ for _, ws := range s.workerStates {
+ for _, event := range events {
// Check if this event is interesting to this application service
- if s.appserviceIsInterestedInEvent(ctx, event, appservice) {
- // TODO: Queue this event to be sent off to the application service
- fmt.Println(appservice.ID, "was interested in", event.Sender(), event.Type(), event.RoomID())
+ if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) {
+ // Queue this event to be sent off to the application service
+ if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil {
+ log.WithError(err).Warn("failed to insert incoming event into appservices database")
+ } else {
+ // Tell our worker to send out new messages by updating remaining message
+ // count and waking them up with a broadcast
+ ws.NotifyNewEvents()
+ }
}
}
}
diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go
new file mode 100644
index 00000000..285bbf48
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go
@@ -0,0 +1,248 @@
+// Copyright 2018 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "time"
+
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+)
+
+const appserviceEventsSchema = `
+-- Stores events to be sent to application services
+CREATE TABLE IF NOT EXISTS appservice_events (
+ -- An auto-incrementing id unique to each event in the table
+ id BIGSERIAL NOT NULL PRIMARY KEY,
+ -- The ID of the application service the event will be sent to
+ as_id TEXT NOT NULL,
+ -- JSON representation of the event
+ event_json TEXT NOT NULL,
+ -- The ID of the transaction that this event is a part of
+ txn_id BIGINT NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
+`
+
+const selectEventsByApplicationServiceIDSQL = "" +
+ "SELECT id, event_json, txn_id " +
+ "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
+
+const countEventsByApplicationServiceIDSQL = "" +
+ "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
+
+const insertEventSQL = "" +
+ "INSERT INTO appservice_events(as_id, event_json, txn_id) " +
+ "VALUES ($1, $2, $3)"
+
+const updateTxnIDForEventsSQL = "" +
+ "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
+
+const deleteEventsBeforeAndIncludingIDSQL = "" +
+ "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
+
+const (
+ // A transaction ID number that no transaction should ever have. Used for
+ // checking again the default value.
+ invalidTxnID = -2
+)
+
+type eventsStatements struct {
+ selectEventsByApplicationServiceIDStmt *sql.Stmt
+ countEventsByApplicationServiceIDStmt *sql.Stmt
+ insertEventStmt *sql.Stmt
+ updateTxnIDForEventsStmt *sql.Stmt
+ deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
+}
+
+func (s *eventsStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(appserviceEventsSchema)
+ if err != nil {
+ return
+ }
+
+ if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
+ return
+ }
+ if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
+ return
+ }
+ if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
+ return
+ }
+ if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil {
+ return
+ }
+ if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil {
+ return
+ }
+
+ return
+}
+
+// selectEventsByApplicationServiceID takes in an application service ID and
+// returns a slice of events that need to be sent to that application service,
+// as well as an int later used to remove these same events from the database
+// once successfully sent to an application service.
+func (s *eventsStatements) selectEventsByApplicationServiceID(
+ ctx context.Context,
+ applicationServiceID string,
+ limit int,
+) (
+ txnID, maxID int,
+ events []gomatrixserverlib.Event,
+ eventsRemaining bool,
+ err error,
+) {
+ // Retrieve events from the database. Unsuccessfully sent events first
+ eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
+ if err != nil {
+ return
+ }
+ defer func() {
+ err = eventRows.Close()
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": applicationServiceID,
+ }).WithError(err).Fatalf("appservice unable to select new events to send")
+ }
+ }()
+ events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit)
+ if err != nil {
+ return
+ }
+
+ return
+}
+
+func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) {
+ // Get current time for use in calculating event age
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+
+ // Iterate through each row and store event contents
+ // If txn_id changes dramatically, we've switched from collecting old events to
+ // new ones. Send back those events first.
+ lastTxnID := invalidTxnID
+ for eventsProcessed := 0; eventRows.Next(); {
+ var event gomatrixserverlib.Event
+ var eventJSON []byte
+ var id int
+ err = eventRows.Scan(
+ &id,
+ &eventJSON,
+ &txnID,
+ )
+ if err != nil {
+ return nil, 0, 0, false, err
+ }
+
+ // Unmarshal eventJSON
+ if err = json.Unmarshal(eventJSON, &event); err != nil {
+ return nil, 0, 0, false, err
+ }
+
+ // If txnID has changed on this event from the previous event, then we've
+ // reached the end of a transaction's events. Return only those events.
+ if lastTxnID > invalidTxnID && lastTxnID != txnID {
+ return events, maxID, lastTxnID, true, nil
+ }
+ lastTxnID = txnID
+
+ // Limit events that aren't part of an old transaction
+ if txnID == -1 {
+ // Return if we've hit the limit
+ if eventsProcessed++; eventsProcessed > limit {
+ return events, maxID, lastTxnID, true, nil
+ }
+ }
+
+ if id > maxID {
+ maxID = id
+ }
+
+ // Portion of the event that is unsigned due to rapid change
+ // TODO: Consider removing age as not many app services use it
+ if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil {
+ return nil, 0, 0, false, err
+ }
+
+ events = append(events, event)
+ }
+
+ return
+}
+
+// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service
+// IDs into the db.
+func (s *eventsStatements) countEventsByApplicationServiceID(
+ ctx context.Context,
+ appServiceID string,
+) (int, error) {
+ var count int
+ err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
+ if err != nil && err != sql.ErrNoRows {
+ return 0, err
+ }
+
+ return count, nil
+}
+
+// insertEvent inserts an event mapped to its corresponding application service
+// IDs into the db.
+func (s *eventsStatements) insertEvent(
+ ctx context.Context,
+ appServiceID string,
+ event *gomatrixserverlib.Event,
+) (err error) {
+ // Convert event to JSON before inserting
+ eventJSON, err := json.Marshal(event)
+ if err != nil {
+ return err
+ }
+
+ _, err = s.insertEventStmt.ExecContext(
+ ctx,
+ appServiceID,
+ eventJSON,
+ -1, // No transaction ID yet
+ )
+ return
+}
+
+// updateTxnIDForEvents sets the transactionID for a collection of events. Done
+// before sending them to an AppService. Referenced before sending to make sure
+// we aren't constructing multiple transactions with the same events.
+func (s *eventsStatements) updateTxnIDForEvents(
+ ctx context.Context,
+ appserviceID string,
+ maxID, txnID int,
+) (err error) {
+ _, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID)
+ return
+}
+
+// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database.
+func (s *eventsStatements) deleteEventsBeforeAndIncludingID(
+ ctx context.Context,
+ appserviceID string,
+ eventTableID int,
+) (err error) {
+ _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID)
+ return
+}
diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go
new file mode 100644
index 00000000..b68989fb
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go
@@ -0,0 +1,110 @@
+// Copyright 2018 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+
+ // Import postgres database driver
+ _ "github.com/lib/pq"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// Database stores events intended to be later sent to application services
+type Database struct {
+ events eventsStatements
+ txnID txnStatements
+ db *sql.DB
+}
+
+// NewDatabase opens a new database
+func NewDatabase(dataSourceName string) (*Database, error) {
+ var result Database
+ var err error
+ if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
+ return nil, err
+ }
+ if err = result.prepare(); err != nil {
+ return nil, err
+ }
+ return &result, nil
+}
+
+func (d *Database) prepare() error {
+ if err := d.events.prepare(d.db); err != nil {
+ return err
+ }
+
+ return d.txnID.prepare(d.db)
+}
+
+// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
+// for a transaction worker to pull and later send to an application service.
+func (d *Database) StoreEvent(
+ ctx context.Context,
+ appServiceID string,
+ event *gomatrixserverlib.Event,
+) error {
+ return d.events.insertEvent(ctx, appServiceID, event)
+}
+
+// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
+// be sent to an application service given its ID.
+func (d *Database) GetEventsWithAppServiceID(
+ ctx context.Context,
+ appServiceID string,
+ limit int,
+) (int, int, []gomatrixserverlib.Event, bool, error) {
+ return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
+}
+
+// CountEventsWithAppServiceID returns the number of events destined for an
+// application service given its ID.
+func (d *Database) CountEventsWithAppServiceID(
+ ctx context.Context,
+ appServiceID string,
+) (int, error) {
+ return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
+}
+
+// UpdateTxnIDForEvents takes in an application service ID and a
+// and stores them in the DB, unless the pair already exists, in
+// which case it updates them.
+func (d *Database) UpdateTxnIDForEvents(
+ ctx context.Context,
+ appserviceID string,
+ maxID, txnID int,
+) error {
+ return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
+}
+
+// RemoveEventsBeforeAndIncludingID removes all events from the database that
+// are less than or equal to a given maximum ID. IDs here are implemented as a
+// serial, thus this should always delete events in chronological order.
+func (d *Database) RemoveEventsBeforeAndIncludingID(
+ ctx context.Context,
+ appserviceID string,
+ eventTableID int,
+) error {
+ return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
+}
+
+// GetLatestTxnID returns the latest available transaction id
+func (d *Database) GetLatestTxnID(
+ ctx context.Context,
+) (int, error) {
+ return d.txnID.selectTxnID(ctx)
+}
diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go
new file mode 100644
index 00000000..7b0fa378
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go
@@ -0,0 +1,52 @@
+// Copyright 2018 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+)
+
+const txnIDSchema = `
+-- Keeps a count of the current transaction ID
+CREATE SEQUENCE IF NOT EXISTS txn_id_counter START 1;
+`
+
+const selectTxnIDSQL = "SELECT nextval('txn_id_counter')"
+
+type txnStatements struct {
+ selectTxnIDStmt *sql.Stmt
+}
+
+func (s *txnStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(txnIDSchema)
+ if err != nil {
+ return
+ }
+
+ if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil {
+ return
+ }
+
+ return
+}
+
+// selectTxnID selects the latest ascending transaction ID
+func (s *txnStatements) selectTxnID(
+ ctx context.Context,
+) (txnID int, err error) {
+ err = s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID)
+ return
+}
diff --git a/src/github.com/matrix-org/dendrite/appservice/types/types.go b/src/github.com/matrix-org/dendrite/appservice/types/types.go
index 3e702165..aac73155 100644
--- a/src/github.com/matrix-org/dendrite/appservice/types/types.go
+++ b/src/github.com/matrix-org/dendrite/appservice/types/types.go
@@ -12,7 +12,53 @@
package types
+import (
+ "sync"
+
+ "github.com/matrix-org/dendrite/common/config"
+)
+
const (
// AppServiceDeviceID is the AS dummy device ID
AppServiceDeviceID = "AS_Device"
)
+
+// ApplicationServiceWorkerState is a type that couples an application service,
+// a lockable condition as well as some other state variables, allowing the
+// roomserver to notify appservice workers when there are events ready to send
+// externally to application services.
+type ApplicationServiceWorkerState struct {
+ AppService config.ApplicationService
+ Cond *sync.Cond
+ // Events ready to be sent
+ EventsReady bool
+ // Backoff exponent (2^x secs). Max 6, aka 64s.
+ Backoff int
+}
+
+// NotifyNewEvents wakes up all waiting goroutines, notifying that events remain
+// in the event queue for this application service worker.
+func (a *ApplicationServiceWorkerState) NotifyNewEvents() {
+ a.Cond.L.Lock()
+ a.EventsReady = true
+ a.Cond.Broadcast()
+ a.Cond.L.Unlock()
+}
+
+// FinishEventProcessing marks all events of this worker as being sent to the
+// application service.
+func (a *ApplicationServiceWorkerState) FinishEventProcessing() {
+ a.Cond.L.Lock()
+ a.EventsReady = false
+ a.Cond.L.Unlock()
+}
+
+// WaitForNewEvents causes the calling goroutine to wait on the worker state's
+// condition for a broadcast or similar wakeup, if there are no events ready.
+func (a *ApplicationServiceWorkerState) WaitForNewEvents() {
+ a.Cond.L.Lock()
+ if !a.EventsReady {
+ a.Cond.Wait()
+ }
+ a.Cond.L.Unlock()
+}
diff --git a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go
new file mode 100644
index 00000000..8f966c94
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go
@@ -0,0 +1,234 @@
+// Copyright 2018 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package workers
+
+import (
+ "bytes"
+ "context"
+ "crypto/tls"
+ "encoding/json"
+ "fmt"
+ "math"
+ "net/http"
+ "time"
+
+ "github.com/matrix-org/dendrite/appservice/storage"
+ "github.com/matrix-org/dendrite/appservice/types"
+ "github.com/matrix-org/dendrite/common/config"
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+)
+
+var (
+ // Maximum size of events sent in each transaction.
+ transactionBatchSize = 50
+ // Timeout for sending a single transaction to an application service.
+ transactionTimeout = time.Second * 60
+)
+
+// SetupTransactionWorkers spawns a separate goroutine for each application
+// service. Each of these "workers" handle taking all events intended for their
+// app service, batch them up into a single transaction (up to a max transaction
+// size), then send that off to the AS's /transactions/{txnID} endpoint. It also
+// handles exponentially backing off in case the AS isn't currently available.
+func SetupTransactionWorkers(
+ appserviceDB *storage.Database,
+ workerStates []types.ApplicationServiceWorkerState,
+) error {
+ // Create a worker that handles transmitting events to a single homeserver
+ for _, workerState := range workerStates {
+ // Don't create a worker if this AS doesn't want to receive events
+ if workerState.AppService.URL != "" {
+ go worker(appserviceDB, workerState)
+ }
+ }
+ return nil
+}
+
+// worker is a goroutine that sends any queued events to the application service
+// it is given.
+func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).Info("starting application service")
+ ctx := context.Background()
+
+ // Grab the HTTP client for sending requests to app services
+ client := &http.Client{
+ Timeout: transactionTimeout,
+ // TODO: Verify certificates
+ Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true, // nolint: gas
+ },
+ },
+ }
+
+ // Initial check for any leftover events to send from last time
+ eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).WithError(err).Fatal("appservice worker unable to read queued events from DB")
+ return
+ }
+ if eventCount > 0 {
+ ws.NotifyNewEvents()
+ }
+
+ // Loop forever and keep waiting for more events to send
+ for {
+ // Wait for more events if we've sent all the events in the database
+ ws.WaitForNewEvents()
+
+ // Batch events up into a transaction
+ transactionJSON, txnID, maxEventID, eventsRemaining, err := createTransaction(ctx, db, ws.AppService.ID)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).WithError(err).Fatal("appservice worker unable to create transaction")
+
+ return
+ }
+
+ // Send the events off to the application service
+ // Backoff if the application service does not respond
+ err = send(client, ws.AppService, txnID, transactionJSON)
+ if err != nil {
+ // Backoff
+ backoff(&ws, err)
+ continue
+ }
+
+ // We sent successfully, hooray!
+ ws.Backoff = 0
+
+ // Transactions have a maximum event size, so there may still be some events
+ // left over to send. Keep sending until none are left
+ if !eventsRemaining {
+ ws.FinishEventProcessing()
+ }
+
+ // Remove sent events from the DB
+ err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).WithError(err).Fatal("unable to remove appservice events from the database")
+ return
+ }
+ }
+}
+
+// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
+func backoff(ws *types.ApplicationServiceWorkerState, err error) {
+ // Calculate how long to backoff for
+ backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff)))
+ backoffSeconds := time.Second * backoffDuration
+
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).WithError(err).Warnf("unable to send transactions successfully, backing off for %ds",
+ backoffDuration)
+
+ ws.Backoff++
+ if ws.Backoff > 6 {
+ ws.Backoff = 6
+ }
+
+ // Backoff
+ time.Sleep(backoffSeconds)
+}
+
+// createTransaction takes in a slice of AS events, stores them in an AS
+// transaction, and JSON-encodes the results.
+func createTransaction(
+ ctx context.Context,
+ db *storage.Database,
+ appserviceID string,
+) (
+ transactionJSON []byte,
+ txnID, maxID int,
+ eventsRemaining bool,
+ err error,
+) {
+ // Retrieve the latest events from the DB (will return old events if they weren't successfully sent)
+ txnID, maxID, events, eventsRemaining, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": appserviceID,
+ }).WithError(err).Fatalf("appservice worker unable to read queued events from DB")
+
+ return
+ }
+
+ // Check if these events do not already have a transaction ID
+ if txnID == -1 {
+ // If not, grab next available ID from the DB
+ txnID, err = db.GetLatestTxnID(ctx)
+ if err != nil {
+ return nil, 0, 0, false, err
+ }
+
+ // Mark new events with current transactionID
+ if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil {
+ return nil, 0, 0, false, err
+ }
+ }
+
+ // Create a transaction and store the events inside
+ transaction := gomatrixserverlib.ApplicationServiceTransaction{
+ Events: events,
+ }
+
+ transactionJSON, err = json.Marshal(transaction)
+ if err != nil {
+ return
+ }
+
+ return
+}
+
+// send sends events to an application service. Returns an error if an OK was not
+// received back from the application service or the request timed out.
+func send(
+ client *http.Client,
+ appservice config.ApplicationService,
+ txnID int,
+ transaction []byte,
+) error {
+ // POST a transaction to our AS
+ address := fmt.Sprintf("%s/transactions/%d", appservice.URL, txnID)
+ resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction))
+ if err != nil {
+ return err
+ }
+ defer func() {
+ err := resp.Body.Close()
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": appservice.ID,
+ }).WithError(err).Error("unable to close response body from application service")
+ }
+ }()
+
+ // Check the AS received the events correctly
+ if resp.StatusCode != http.StatusOK {
+ // TODO: Handle non-200 error codes from application services
+ return fmt.Errorf("non-OK status code %d returned from AS", resp.StatusCode)
+ }
+
+ return nil
+}
diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/accounts_table.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/accounts_table.go
index 4ed54f95..e86654ec 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/accounts_table.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/accounts_table.go
@@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS account_accounts (
created_ts BIGINT NOT NULL,
-- The password hash for this account. Can be NULL if this is a passwordless account.
password_hash TEXT,
- -- Identifies which Application Service this account belongs to, if any.
+ -- Identifies which application service this account belongs to, if any.
appservice_id TEXT
-- TODO:
-- is_guest, is_admin, upgraded_ts, devices, any email reset stuff?
diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go
index 7683a427..7032fe7b 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go
@@ -138,9 +138,9 @@ func (d *Database) UpdateDevice(
}
// RemoveDevice revokes a device by deleting the entry in the database
-// matching with the given device ID and user ID localpart
+// matching with the given device ID and user ID localpart.
// If the device doesn't exist, it will not return an error
-// If something went wrong during the deletion, it will return the SQL error
+// If something went wrong during the deletion, it will return the SQL error.
func (d *Database) RemoveDevice(
ctx context.Context, deviceID, localpart string,
) error {
diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/register.go b/src/github.com/matrix-org/dendrite/clientapi/routing/register.go
index 63cb013d..4949dc01 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/routing/register.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/routing/register.go
@@ -115,7 +115,7 @@ type registerRequest struct {
InitialDisplayName *string `json:"initial_device_display_name"`
- // Application Services place Type in the root of their registration
+ // Application services place Type in the root of their registration
// request, whereas clients place it in the authDict struct.
Type authtypes.LoginType `json:"type"`
}
@@ -281,16 +281,16 @@ func validateRecaptcha(
}
// UsernameIsWithinApplicationServiceNamespace checks to see if a username falls
-// within any of the namespaces of a given Application Service. If no
-// Application Service is given, it will check to see if it matches any
-// Application Service's namespace.
+// within any of the namespaces of a given application service. If no
+// application service is given, it will check to see if it matches any
+// application service's namespace.
func UsernameIsWithinApplicationServiceNamespace(
cfg *config.Dendrite,
username string,
appservice *config.ApplicationService,
) bool {
if appservice != nil {
- // Loop through given Application Service's namespaces and see if any match
+ // Loop through given application service's namespaces and see if any match
for _, namespace := range appservice.NamespaceMap["users"] {
// AS namespaces are checked for validity in config
if namespace.RegexpObject.MatchString(username) {
@@ -300,7 +300,7 @@ func UsernameIsWithinApplicationServiceNamespace(
return false
}
- // Loop through all known Application Service's namespaces and see if any match
+ // Loop through all known application service's namespaces and see if any match
for _, knownAppservice := range cfg.Derived.ApplicationServices {
for _, namespace := range knownAppservice.NamespaceMap["users"] {
// AS namespaces are checked for validity in config
@@ -509,7 +509,7 @@ func handleRegistrationFlow(
sessions.AddCompletedStage(sessionID, authtypes.LoginTypeSharedSecret)
case authtypes.LoginTypeApplicationService:
- // Check Application Service register user request is valid.
+ // Check application service register user request is valid.
// The application service's ID is returned if so.
appserviceID, err := validateApplicationService(cfg, req, r.Username)
diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-app-service-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-appservice-server/main.go
index 3c537bea..347a0446 100644
--- a/src/github.com/matrix-org/dendrite/cmd/dendrite-app-service-server/main.go
+++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-appservice-server/main.go
@@ -22,7 +22,7 @@ import (
func main() {
cfg := basecomponent.ParseFlags()
- base := basecomponent.NewBaseDendrite(cfg, "AppService")
+ base := basecomponent.NewBaseDendrite(cfg, "AppServiceAPI")
defer base.Close() // nolint: errcheck
accountDB := base.CreateAccountsDB()
diff --git a/src/github.com/matrix-org/dendrite/common/config/appservice.go b/src/github.com/matrix-org/dendrite/common/config/appservice.go
index 86bc92c1..0333b338 100644
--- a/src/github.com/matrix-org/dendrite/common/config/appservice.go
+++ b/src/github.com/matrix-org/dendrite/common/config/appservice.go
@@ -123,8 +123,6 @@ func setupRegexps(cfg *Dendrite) (err error) {
}
}
- fmt.Println(exclusiveUsernameStrings, exclusiveAliasStrings)
-
// Join the regexes together into one big regex.
// i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)"
// Later we can check if a username or alias matches any exclusive regex and
@@ -194,13 +192,13 @@ func checkErrors(config *Dendrite) (err error) {
// can have the same ID or token.
if idMap[appservice.ID] {
return configErrors([]string{fmt.Sprintf(
- "Application Service ID %s must be unique", appservice.ID,
+ "Application service ID %s must be unique", appservice.ID,
)})
}
// Check if we've already seen this token
if tokenMap[appservice.ASToken] {
return configErrors([]string{fmt.Sprintf(
- "Application Service Token %s must be unique", appservice.ASToken,
+ "Application service Token %s must be unique", appservice.ASToken,
)})
}
@@ -216,7 +214,7 @@ func checkErrors(config *Dendrite) (err error) {
// namespace, which often ends up in an application service receiving events
// it doesn't want, as an empty regex will match all events.
return configErrors([]string{fmt.Sprintf(
- "Application Service namespace can only contain a single regex tuple. Check your YAML.",
+ "Application service namespace can only contain a single regex tuple. Check your YAML.",
)})
}
}
diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go
index 8bbac80c..bd6e361d 100644
--- a/src/github.com/matrix-org/dendrite/common/config/config.go
+++ b/src/github.com/matrix-org/dendrite/common/config/config.go
@@ -162,6 +162,9 @@ type Dendrite struct {
// The FederationSender database stores information used by the FederationSender
// It is only accessed by the FederationSender.
FederationSender DataSource `yaml:"federation_sender"`
+ // The AppServices database stores information used by the AppService component.
+ // It is only accessed by the AppService component.
+ AppService DataSource `yaml:"appservice"`
// The PublicRoomsAPI database stores information used to compute the public
// room directory. It is only accessed by the PublicRoomsAPI server.
PublicRoomsAPI DataSource `yaml:"public_rooms_api"`
@@ -231,15 +234,15 @@ type Dendrite struct {
Params map[string]interface{} `json:"params"`
}
- // Application Services parsed from their config files
+ // Application services parsed from their config files
// The paths of which were given above in the main config file
ApplicationServices []ApplicationService
- // Meta-regexes compiled from all exclusive Application Service
+ // Meta-regexes compiled from all exclusive application service
// Regexes.
//
// When a user registers, we check that their username does not match any
- // exclusive Application Service namespaces
+ // exclusive application service namespaces
ExclusiveApplicationServicesUsernameRegexp *regexp.Regexp
// When a user creates a room alias, we check that it isn't already
// reserved by an application service
diff --git a/vendor/manifest b/vendor/manifest
index 1bd43957..71b834ee 100644
--- a/vendor/manifest
+++ b/vendor/manifest
@@ -135,7 +135,7 @@
{
"importpath": "github.com/matrix-org/gomatrixserverlib",
"repository": "https://github.com/matrix-org/gomatrixserverlib",
- "revision": "38a4f0f648bf357adc4bdb601cdc0535cee14e21",
+ "revision": "929828872b51e6733166553d6b1a20155b6ab829",
"branch": "master"
},
{
diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/appservice.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/appservice.go
index 18e51b46..a6752794 100644
--- a/vendor/src/github.com/matrix-org/gomatrixserverlib/appservice.go
+++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/appservice.go
@@ -15,21 +15,8 @@
package gomatrixserverlib
-// ApplicationServiceEvent is an event format that is sent off to an
-// application service as part of a transaction.
-type ApplicationServiceEvent struct {
- Age int64 `json:"age,omitempty"`
- Content RawJSON `json:"content,omitempty"`
- EventID string `json:"event_id,omitempty"`
- OriginServerTimestamp int64 `json:"origin_server_ts,omitempty"`
- RoomID string `json:"room_id,omitempty"`
- Sender string `json:"sender,omitempty"`
- Type string `json:"type,omitempty"`
- UserID string `json:"user_id,omitempty"`
-}
-
// ApplicationServiceTransaction is the transaction that is sent off to an
// application service.
type ApplicationServiceTransaction struct {
- Events []ApplicationServiceEvent `json:"events"`
+ Events []Event `json:"events"`
}
diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/eventcontent.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/eventcontent.go
index ad4e7751..97b966b6 100644
--- a/vendor/src/github.com/matrix-org/gomatrixserverlib/eventcontent.go
+++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/eventcontent.go
@@ -261,6 +261,9 @@ func newPowerLevelContentFromAuthEvents(authEvents AuthEventProvider, creatorUse
// If there is no power level event then the creator gets level 100
// https://github.com/matrix-org/synapse/blob/v0.18.5/synapse/api/auth.py#L569
c.userLevels = map[string]int64{creatorUserID: 100}
+ // If there is no power level event then the state_default is level 0
+ // https://github.com/matrix-org/synapse/blob/v0.18.5/synapse/api/auth.py#L997
+ c.stateDefaultLevel = 0
return
}
diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/federationtypes.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/federationtypes.go
index c2adcc0c..5d61521b 100644
--- a/vendor/src/github.com/matrix-org/gomatrixserverlib/federationtypes.go
+++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/federationtypes.go
@@ -37,6 +37,12 @@ type RespState struct {
AuthEvents []Event `json:"auth_chain"`
}
+// A RespEventAuth is the content of a response to GET /_matrix/federation/v1/event_auth/{roomID}/{eventID}
+type RespEventAuth struct {
+ // A list of events needed to authenticate the state events.
+ AuthEvents []Event `json:"auth_chain"`
+}
+
// Events combines the auth events and the state events and returns
// them in an order where every event comes after its auth events.
// Each event will only appear once in the output list.