aboutsummaryrefslogtreecommitdiff
path: root/federationsender/storage
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-01-03 14:07:05 +0000
committerGitHub <noreply@github.com>2020-01-03 14:07:05 +0000
commitc28577ea25d4f9c82110450f7f371905c2750f71 (patch)
tree7eda596a39f4c6ef2fbb2fc3f6a7caeb4ccc77d1 /federationsender/storage
parent6cab62246816baf1fdd026744727cba9a7b21c28 (diff)
Implement storage interfaces (#841)
* Implement interfaces for federationsender storage * Implement interfaces for mediaapi storage * Implement interfaces for publicroomsapi storage * Implement interfaces for roomserver storage * Implement interfaces for syncapi storage * Implement interfaces for keydb storage * common.PartitionStorer in publicroomsapi interface * Update copyright notices
Diffstat (limited to 'federationsender/storage')
-rw-r--r--federationsender/storage/postgres/joined_hosts_table.go (renamed from federationsender/storage/joined_hosts_table.go)5
-rw-r--r--federationsender/storage/postgres/room_table.go (renamed from federationsender/storage/room_table.go)5
-rw-r--r--federationsender/storage/postgres/storage.go122
-rw-r--r--federationsender/storage/storage.go108
4 files changed, 144 insertions, 96 deletions
diff --git a/federationsender/storage/joined_hosts_table.go b/federationsender/storage/postgres/joined_hosts_table.go
index 5d652a1a..bd580e3b 100644
--- a/federationsender/storage/joined_hosts_table.go
+++ b/federationsender/storage/postgres/joined_hosts_table.go
@@ -1,4 +1,5 @@
-// Copyright 2017 Vector Creations Ltd
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package storage
+package postgres
import (
"context"
diff --git a/federationsender/storage/room_table.go b/federationsender/storage/postgres/room_table.go
index bb52b707..a64424b4 100644
--- a/federationsender/storage/room_table.go
+++ b/federationsender/storage/postgres/room_table.go
@@ -1,4 +1,5 @@
-// Copyright 2017 Vector Creations Ltd
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package storage
+package postgres
import (
"context"
diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go
new file mode 100644
index 00000000..c60f6dc5
--- /dev/null
+++ b/federationsender/storage/postgres/storage.go
@@ -0,0 +1,122 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/federationsender/types"
+)
+
+// Database stores information needed by the federation sender
+type Database struct {
+ joinedHostsStatements
+ roomStatements
+ common.PartitionOffsetStatements
+ db *sql.DB
+}
+
+// NewDatabase opens a new database
+func NewDatabase(dataSourceName string) (*Database, error) {
+ var result Database
+ var err error
+ if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
+ return nil, err
+ }
+ if err = result.prepare(); err != nil {
+ return nil, err
+ }
+ return &result, nil
+}
+
+func (d *Database) prepare() error {
+ var err error
+
+ if err = d.joinedHostsStatements.prepare(d.db); err != nil {
+ return err
+ }
+
+ if err = d.roomStatements.prepare(d.db); err != nil {
+ return err
+ }
+
+ return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
+}
+
+// UpdateRoom updates the joined hosts for a room and returns what the joined
+// hosts were before the update, or nil if this was a duplicate message.
+// This is called when we receive a message from kafka, so we pass in
+// oldEventID and newEventID to check that we haven't missed any messages or
+// this isn't a duplicate message.
+func (d *Database) UpdateRoom(
+ ctx context.Context,
+ roomID, oldEventID, newEventID string,
+ addHosts []types.JoinedHost,
+ removeHosts []string,
+) (joinedHosts []types.JoinedHost, err error) {
+ err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
+ err = d.insertRoom(ctx, txn, roomID)
+ if err != nil {
+ return err
+ }
+
+ lastSentEventID, err := d.selectRoomForUpdate(ctx, txn, roomID)
+ if err != nil {
+ return err
+ }
+
+ if lastSentEventID == newEventID {
+ // We've handled this message before, so let's just ignore it.
+ // We can only get a duplicate for the last message we processed,
+ // so its enough just to compare the newEventID with lastSentEventID
+ return nil
+ }
+
+ if lastSentEventID != oldEventID {
+ return types.EventIDMismatchError{
+ DatabaseID: lastSentEventID, RoomServerID: oldEventID,
+ }
+ }
+
+ joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID)
+ if err != nil {
+ return err
+ }
+
+ for _, add := range addHosts {
+ err = d.insertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName)
+ if err != nil {
+ return err
+ }
+ }
+ if err = d.deleteJoinedHosts(ctx, txn, removeHosts); err != nil {
+ return err
+ }
+ return d.updateRoom(ctx, txn, roomID, newEventID)
+ })
+ return
+}
+
+// GetJoinedHosts returns the currently joined hosts for room,
+// as known to federationserver.
+// Returns an error if something goes wrong.
+func (d *Database) GetJoinedHosts(
+ ctx context.Context, roomID string,
+) ([]types.JoinedHost, error) {
+ return d.selectJoinedHosts(ctx, roomID)
+}
diff --git a/federationsender/storage/storage.go b/federationsender/storage/storage.go
index 3a0f8775..8cffdbf1 100644
--- a/federationsender/storage/storage.go
+++ b/federationsender/storage/storage.go
@@ -1,4 +1,4 @@
-// Copyright 2017 Vector Creations Ltd
+// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -16,106 +16,30 @@ package storage
import (
"context"
- "database/sql"
+ "errors"
+ "net/url"
"github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/federationsender/storage/postgres"
"github.com/matrix-org/dendrite/federationsender/types"
)
-// Database stores information needed by the federation sender
-type Database struct {
- joinedHostsStatements
- roomStatements
- common.PartitionOffsetStatements
- db *sql.DB
+type Database interface {
+ common.PartitionStorer
+ UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error)
+ GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
}
// NewDatabase opens a new database
-func NewDatabase(dataSourceName string) (*Database, error) {
- var result Database
- var err error
- if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
+func NewDatabase(dataSourceName string) (Database, error) {
+ uri, err := url.Parse(dataSourceName)
+ if err != nil {
return nil, err
}
- if err = result.prepare(); err != nil {
- return nil, err
- }
- return &result, nil
-}
-
-func (d *Database) prepare() error {
- var err error
-
- if err = d.joinedHostsStatements.prepare(d.db); err != nil {
- return err
+ switch uri.Scheme {
+ case "postgres":
+ return postgres.NewDatabase(dataSourceName)
+ default:
+ return nil, errors.New("unknown schema")
}
-
- if err = d.roomStatements.prepare(d.db); err != nil {
- return err
- }
-
- return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
-}
-
-// UpdateRoom updates the joined hosts for a room and returns what the joined
-// hosts were before the update, or nil if this was a duplicate message.
-// This is called when we receive a message from kafka, so we pass in
-// oldEventID and newEventID to check that we haven't missed any messages or
-// this isn't a duplicate message.
-func (d *Database) UpdateRoom(
- ctx context.Context,
- roomID, oldEventID, newEventID string,
- addHosts []types.JoinedHost,
- removeHosts []string,
-) (joinedHosts []types.JoinedHost, err error) {
- err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
- err = d.insertRoom(ctx, txn, roomID)
- if err != nil {
- return err
- }
-
- lastSentEventID, err := d.selectRoomForUpdate(ctx, txn, roomID)
- if err != nil {
- return err
- }
-
- if lastSentEventID == newEventID {
- // We've handled this message before, so let's just ignore it.
- // We can only get a duplicate for the last message we processed,
- // so its enough just to compare the newEventID with lastSentEventID
- return nil
- }
-
- if lastSentEventID != oldEventID {
- return types.EventIDMismatchError{
- DatabaseID: lastSentEventID, RoomServerID: oldEventID,
- }
- }
-
- joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID)
- if err != nil {
- return err
- }
-
- for _, add := range addHosts {
- err = d.insertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName)
- if err != nil {
- return err
- }
- }
- if err = d.deleteJoinedHosts(ctx, txn, removeHosts); err != nil {
- return err
- }
- return d.updateRoom(ctx, txn, roomID, newEventID)
- })
- return
-}
-
-// GetJoinedHosts returns the currently joined hosts for room,
-// as known to federationserver.
-// Returns an error if something goes wrong.
-func (d *Database) GetJoinedHosts(
- ctx context.Context, roomID string,
-) ([]types.JoinedHost, error) {
- return d.selectJoinedHosts(ctx, roomID)
}