aboutsummaryrefslogtreecommitdiff
path: root/federationsender
diff options
context:
space:
mode:
Diffstat (limited to 'federationsender')
-rw-r--r--federationsender/api/api.go16
-rw-r--r--federationsender/consumers/roomserver.go40
-rw-r--r--federationsender/internal/perform.go168
-rw-r--r--federationsender/internal/perform/join.go15
-rw-r--r--federationsender/inthttp/client.go14
-rw-r--r--federationsender/storage/interface.go11
-rw-r--r--federationsender/storage/postgres/inbound_peeks_table.go176
-rw-r--r--federationsender/storage/postgres/outbound_peeks_table.go176
-rw-r--r--federationsender/storage/postgres/storage.go28
-rw-r--r--federationsender/storage/shared/storage.go60
-rw-r--r--federationsender/storage/sqlite3/inbound_peeks_table.go176
-rw-r--r--federationsender/storage/sqlite3/outbound_peeks_table.go176
-rw-r--r--federationsender/storage/sqlite3/storage.go28
-rw-r--r--federationsender/storage/tables/interface.go18
-rw-r--r--federationsender/types/types.go20
15 files changed, 1092 insertions, 30 deletions
diff --git a/federationsender/api/api.go b/federationsender/api/api.go
index dfc2dd8a..a9ebedaf 100644
--- a/federationsender/api/api.go
+++ b/federationsender/api/api.go
@@ -62,6 +62,12 @@ type FederationSenderInternalAPI interface {
request *PerformJoinRequest,
response *PerformJoinResponse,
)
+ // Handle an instruction to peek a room on a remote server.
+ PerformOutboundPeek(
+ ctx context.Context,
+ request *PerformOutboundPeekRequest,
+ response *PerformOutboundPeekResponse,
+ ) error
// Handle an instruction to make_leave & send_leave with a remote server.
PerformLeave(
ctx context.Context,
@@ -111,6 +117,16 @@ type PerformJoinResponse struct {
LastError *gomatrix.HTTPError
}
+type PerformOutboundPeekRequest struct {
+ RoomID string `json:"room_id"`
+ // The sorted list of servers to try. Servers will be tried sequentially, after de-duplication.
+ ServerNames types.ServerNames `json:"server_names"`
+}
+
+type PerformOutboundPeekResponse struct {
+ LastError *gomatrix.HTTPError
+}
+
type PerformLeaveRequest struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go
index 513919c6..846468fa 100644
--- a/federationsender/consumers/roomserver.go
+++ b/federationsender/consumers/roomserver.go
@@ -111,6 +111,14 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
return nil
}
+ case api.OutputTypeNewInboundPeek:
+ if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
+ log.WithFields(log.Fields{
+ "event": output.NewInboundPeek,
+ log.ErrorKey: err,
+ }).Panicf("roomserver output log: remote peek event failure")
+ return nil
+ }
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
@@ -121,6 +129,23 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil
}
+// processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any)
+// causing the federationsender to start sending messages to the peeking server
+func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPeek) error {
+
+ // FIXME: there's a race here - we should start /sending new peeked events
+ // atomically after the orp.LatestEventID to ensure there are no gaps between
+ // the peek beginning and the send stream beginning.
+ //
+ // We probably need to track orp.LatestEventID on the inbound peek, but it's
+ // unclear how we then use that to prevent the race when we start the send
+ // stream.
+ //
+ // This is making the tests flakey.
+
+ return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval)
+}
+
// processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
@@ -164,6 +189,10 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
return err
}
+ // TODO: do housekeeping to evict unrenewed peeking hosts
+
+ // TODO: implement query to let the fedapi check whether a given peek is live or not
+
// Send the event.
return s.queues.SendEvent(
ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent,
@@ -171,7 +200,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
}
// joinedHostsAtEvent works out a list of matrix servers that were joined to
-// the room at the event.
+// the room at the event (including peeking ones)
// It is important to use the state at the event for sending messages because:
// 1) We shouldn't send messages to servers that weren't in the room.
// 2) If a server is kicked from the rooms it should still be told about the
@@ -222,6 +251,15 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
joined[joinedHost.ServerName] = true
}
+ // handle peeking hosts
+ inboundPeeks, err := s.db.GetInboundPeeks(context.TODO(), ore.Event.Event.RoomID())
+ if err != nil {
+ return nil, err
+ }
+ for _, inboundPeek := range inboundPeeks {
+ joined[inboundPeek.ServerName] = true
+ }
+
var result []gomatrixserverlib.ServerName
for serverName, include := range joined {
if include {
diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go
index 45f33ff7..3adf8fc9 100644
--- a/federationsender/internal/perform.go
+++ b/federationsender/internal/perform.go
@@ -234,7 +234,7 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
// Check that the send_join response was valid.
joinCtx := perform.JoinContext(r.federation, r.keyRing)
respState, err := joinCtx.CheckSendJoinResponse(
- ctx, event, serverName, respMakeJoin, respSendJoin,
+ ctx, event, serverName, respSendJoin,
)
if err != nil {
logrus.WithFields(logrus.Fields{
@@ -266,6 +266,172 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
return nil
}
+// PerformOutboundPeekRequest implements api.FederationSenderInternalAPI
+func (r *FederationSenderInternalAPI) PerformOutboundPeek(
+ ctx context.Context,
+ request *api.PerformOutboundPeekRequest,
+ response *api.PerformOutboundPeekResponse,
+) error {
+ // Look up the supported room versions.
+ var supportedVersions []gomatrixserverlib.RoomVersion
+ for version := range version.SupportedRoomVersions() {
+ supportedVersions = append(supportedVersions, version)
+ }
+
+ // Deduplicate the server names we were provided but keep the ordering
+ // as this encodes useful information about which servers are most likely
+ // to respond.
+ seenSet := make(map[gomatrixserverlib.ServerName]bool)
+ var uniqueList []gomatrixserverlib.ServerName
+ for _, srv := range request.ServerNames {
+ if seenSet[srv] {
+ continue
+ }
+ seenSet[srv] = true
+ uniqueList = append(uniqueList, srv)
+ }
+ request.ServerNames = uniqueList
+
+ // See if there's an existing outbound peek for this room ID with
+ // one of the specified servers.
+ if peeks, err := r.db.GetOutboundPeeks(ctx, request.RoomID); err == nil {
+ for _, peek := range peeks {
+ if _, ok := seenSet[peek.ServerName]; ok {
+ return nil
+ }
+ }
+ }
+
+ // Try each server that we were provided until we land on one that
+ // successfully completes the peek
+ var lastErr error
+ for _, serverName := range request.ServerNames {
+ if err := r.performOutboundPeekUsingServer(
+ ctx,
+ request.RoomID,
+ serverName,
+ supportedVersions,
+ ); err != nil {
+ logrus.WithError(err).WithFields(logrus.Fields{
+ "server_name": serverName,
+ "room_id": request.RoomID,
+ }).Warnf("Failed to peek room through server")
+ lastErr = err
+ continue
+ }
+
+ // We're all good.
+ return nil
+ }
+
+ // If we reach here then we didn't complete a peek for some reason.
+ var httpErr gomatrix.HTTPError
+ if ok := errors.As(lastErr, &httpErr); ok {
+ httpErr.Message = string(httpErr.Contents)
+ // Clear the wrapped error, else serialising to JSON (in polylith mode) will fail
+ httpErr.WrappedError = nil
+ response.LastError = &httpErr
+ } else {
+ response.LastError = &gomatrix.HTTPError{
+ Code: 0,
+ WrappedError: nil,
+ Message: lastErr.Error(),
+ }
+ }
+
+ logrus.Errorf(
+ "failed to peek room %q through %d server(s): last error %s",
+ request.RoomID, len(request.ServerNames), lastErr,
+ )
+
+ return lastErr
+}
+
+func (r *FederationSenderInternalAPI) performOutboundPeekUsingServer(
+ ctx context.Context,
+ roomID string,
+ serverName gomatrixserverlib.ServerName,
+ supportedVersions []gomatrixserverlib.RoomVersion,
+) error {
+ // create a unique ID for this peek.
+ // for now we just use the room ID again. In future, if we ever
+ // support concurrent peeks to the same room with different filters
+ // then we would need to disambiguate further.
+ peekID := roomID
+
+ // check whether we're peeking already to try to avoid needlessly
+ // re-peeking on the server. we don't need a transaction for this,
+ // given this is a nice-to-have.
+ outboundPeek, err := r.db.GetOutboundPeek(ctx, serverName, roomID, peekID)
+ if err != nil {
+ return err
+ }
+ renewing := false
+ if outboundPeek != nil {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ if nowMilli > outboundPeek.RenewedTimestamp+outboundPeek.RenewalInterval {
+ logrus.Infof("stale outbound peek to %s for %s already exists; renewing", serverName, roomID)
+ renewing = true
+ } else {
+ logrus.Infof("live outbound peek to %s for %s already exists", serverName, roomID)
+ return nil
+ }
+ }
+
+ // Try to perform an outbound /peek using the information supplied in the
+ // request.
+ respPeek, err := r.federation.Peek(
+ ctx,
+ serverName,
+ roomID,
+ peekID,
+ supportedVersions,
+ )
+ if err != nil {
+ r.statistics.ForServer(serverName).Failure()
+ return fmt.Errorf("r.federation.Peek: %w", err)
+ }
+ r.statistics.ForServer(serverName).Success()
+
+ // Work out if we support the room version that has been supplied in
+ // the peek response.
+ if respPeek.RoomVersion == "" {
+ respPeek.RoomVersion = gomatrixserverlib.RoomVersionV1
+ }
+ if _, err = respPeek.RoomVersion.EventFormat(); err != nil {
+ return fmt.Errorf("respPeek.RoomVersion.EventFormat: %w", err)
+ }
+
+ // TODO: authenticate the state returned (check its auth events etc)
+ // the equivalent of CheckSendJoinResponse()
+
+ // If we've got this far, the remote server is peeking.
+ if renewing {
+ if err = r.db.RenewOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
+ return err
+ }
+ } else {
+ if err = r.db.AddOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
+ return err
+ }
+ }
+
+ respState := respPeek.ToRespState()
+ // logrus.Warnf("got respPeek %#v", respPeek)
+ // Send the newly returned state to the roomserver to update our local view.
+ if err = roomserverAPI.SendEventWithState(
+ ctx, r.rsAPI,
+ roomserverAPI.KindNew,
+ &respState,
+ respPeek.LatestEvent.Headered(respPeek.RoomVersion),
+ nil,
+ ); err != nil {
+ return fmt.Errorf("r.producer.SendEventWithState: %w", err)
+ }
+
+ return nil
+}
+
// PerformLeaveRequest implements api.FederationSenderInternalAPI
func (r *FederationSenderInternalAPI) PerformLeave(
ctx context.Context,
diff --git a/federationsender/internal/perform/join.go b/federationsender/internal/perform/join.go
index 2fa3d4bf..c23f6fa3 100644
--- a/federationsender/internal/perform/join.go
+++ b/federationsender/internal/perform/join.go
@@ -1,3 +1,17 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
package perform
import (
@@ -28,7 +42,6 @@ func (r joinContext) CheckSendJoinResponse(
ctx context.Context,
event *gomatrixserverlib.Event,
server gomatrixserverlib.ServerName,
- respMakeJoin gomatrixserverlib.RespMakeJoin,
respSendJoin gomatrixserverlib.RespSendJoin,
) (*gomatrixserverlib.RespState, error) {
// A list of events that we have retried, if they were not included in
diff --git a/federationsender/inthttp/client.go b/federationsender/inthttp/client.go
index 81d3fc51..3f86a2d0 100644
--- a/federationsender/inthttp/client.go
+++ b/federationsender/inthttp/client.go
@@ -20,6 +20,7 @@ const (
FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest"
FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest"
FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest"
+ FederationSenderPerformOutboundPeekRequestPath = "/federationsender/performOutboundPeekRequest"
FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive"
FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU"
@@ -76,6 +77,19 @@ func (h *httpFederationSenderInternalAPI) PerformInvite(
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
+// Handle starting a peek on a remote server.
+func (h *httpFederationSenderInternalAPI) PerformOutboundPeek(
+ ctx context.Context,
+ request *api.PerformOutboundPeekRequest,
+ response *api.PerformOutboundPeekResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "PerformOutboundPeekRequest")
+ defer span.Finish()
+
+ apiURL := h.federationSenderURL + FederationSenderPerformOutboundPeekRequestPath
+ return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+}
+
func (h *httpFederationSenderInternalAPI) PerformServersAlive(
ctx context.Context,
request *api.PerformServersAliveRequest,
diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go
index 03d616f1..b8361304 100644
--- a/federationsender/storage/interface.go
+++ b/federationsender/storage/interface.go
@@ -51,7 +51,18 @@ type Database interface {
GetPendingPDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
GetPendingEDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
+ // these don't have contexts passed in as we want things to happen regardless of the request context
AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error
RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error
IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error)
+
+ AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
+ RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
+ GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error)
+ GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error)
+
+ AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
+ RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
+ GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error)
+ GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error)
}
diff --git a/federationsender/storage/postgres/inbound_peeks_table.go b/federationsender/storage/postgres/inbound_peeks_table.go
new file mode 100644
index 00000000..fe35ce44
--- /dev/null
+++ b/federationsender/storage/postgres/inbound_peeks_table.go
@@ -0,0 +1,176 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const inboundPeeksSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_inbound_peeks (
+ room_id TEXT NOT NULL,
+ server_name TEXT NOT NULL,
+ peek_id TEXT NOT NULL,
+ creation_ts BIGINT NOT NULL,
+ renewed_ts BIGINT NOT NULL,
+ renewal_interval BIGINT NOT NULL,
+ UNIQUE (room_id, server_name, peek_id)
+);
+`
+
+const insertInboundPeekSQL = "" +
+ "INSERT INTO federationsender_inbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
+
+const selectInboundPeekSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
+
+const selectInboundPeeksSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1"
+
+const renewInboundPeekSQL = "" +
+ "UPDATE federationsender_inbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
+
+const deleteInboundPeekSQL = "" +
+ "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2"
+
+const deleteInboundPeeksSQL = "" +
+ "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1"
+
+type inboundPeeksStatements struct {
+ db *sql.DB
+ insertInboundPeekStmt *sql.Stmt
+ selectInboundPeekStmt *sql.Stmt
+ selectInboundPeeksStmt *sql.Stmt
+ renewInboundPeekStmt *sql.Stmt
+ deleteInboundPeekStmt *sql.Stmt
+ deleteInboundPeeksStmt *sql.Stmt
+}
+
+func NewPostgresInboundPeeksTable(db *sql.DB) (s *inboundPeeksStatements, err error) {
+ s = &inboundPeeksStatements{
+ db: db,
+ }
+ _, err = db.Exec(inboundPeeksSchema)
+ if err != nil {
+ return
+ }
+
+ if s.insertInboundPeekStmt, err = db.Prepare(insertInboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectInboundPeekStmt, err = db.Prepare(selectInboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectInboundPeeksStmt, err = db.Prepare(selectInboundPeeksSQL); err != nil {
+ return
+ }
+ if s.renewInboundPeekStmt, err = db.Prepare(renewInboundPeekSQL); err != nil {
+ return
+ }
+ if s.deleteInboundPeeksStmt, err = db.Prepare(deleteInboundPeeksSQL); err != nil {
+ return
+ }
+ if s.deleteInboundPeekStmt, err = db.Prepare(deleteInboundPeekSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *inboundPeeksStatements) InsertInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ stmt := sqlutil.TxStmt(txn, s.insertInboundPeekStmt)
+ _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
+ return
+}
+
+func (s *inboundPeeksStatements) RenewInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ _, err = sqlutil.TxStmt(txn, s.renewInboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
+ return
+}
+
+func (s *inboundPeeksStatements) SelectInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (*types.InboundPeek, error) {
+ row := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryRowContext(ctx, roomID)
+ inboundPeek := types.InboundPeek{}
+ err := row.Scan(
+ &inboundPeek.RoomID,
+ &inboundPeek.ServerName,
+ &inboundPeek.PeekID,
+ &inboundPeek.CreationTimestamp,
+ &inboundPeek.RenewedTimestamp,
+ &inboundPeek.RenewalInterval,
+ )
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ return &inboundPeek, nil
+}
+
+func (s *inboundPeeksStatements) SelectInboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (inboundPeeks []types.InboundPeek, err error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryContext(ctx, roomID)
+ if err != nil {
+ return
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectInboundPeeks: rows.close() failed")
+
+ for rows.Next() {
+ inboundPeek := types.InboundPeek{}
+ if err = rows.Scan(
+ &inboundPeek.RoomID,
+ &inboundPeek.ServerName,
+ &inboundPeek.PeekID,
+ &inboundPeek.CreationTimestamp,
+ &inboundPeek.RenewedTimestamp,
+ &inboundPeek.RenewalInterval,
+ ); err != nil {
+ return
+ }
+ inboundPeeks = append(inboundPeeks, inboundPeek)
+ }
+
+ return inboundPeeks, rows.Err()
+}
+
+func (s *inboundPeeksStatements) DeleteInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteInboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
+ return
+}
+
+func (s *inboundPeeksStatements) DeleteInboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteInboundPeeksStmt).ExecContext(ctx, roomID)
+ return
+}
diff --git a/federationsender/storage/postgres/outbound_peeks_table.go b/federationsender/storage/postgres/outbound_peeks_table.go
new file mode 100644
index 00000000..596b4bcc
--- /dev/null
+++ b/federationsender/storage/postgres/outbound_peeks_table.go
@@ -0,0 +1,176 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const outboundPeeksSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_outbound_peeks (
+ room_id TEXT NOT NULL,
+ server_name TEXT NOT NULL,
+ peek_id TEXT NOT NULL,
+ creation_ts BIGINT NOT NULL,
+ renewed_ts BIGINT NOT NULL,
+ renewal_interval BIGINT NOT NULL,
+ UNIQUE (room_id, server_name, peek_id)
+);
+`
+
+const insertOutboundPeekSQL = "" +
+ "INSERT INTO federationsender_outbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
+
+const selectOutboundPeekSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
+
+const selectOutboundPeeksSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1"
+
+const renewOutboundPeekSQL = "" +
+ "UPDATE federationsender_outbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
+
+const deleteOutboundPeekSQL = "" +
+ "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2"
+
+const deleteOutboundPeeksSQL = "" +
+ "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1"
+
+type outboundPeeksStatements struct {
+ db *sql.DB
+ insertOutboundPeekStmt *sql.Stmt
+ selectOutboundPeekStmt *sql.Stmt
+ selectOutboundPeeksStmt *sql.Stmt
+ renewOutboundPeekStmt *sql.Stmt
+ deleteOutboundPeekStmt *sql.Stmt
+ deleteOutboundPeeksStmt *sql.Stmt
+}
+
+func NewPostgresOutboundPeeksTable(db *sql.DB) (s *outboundPeeksStatements, err error) {
+ s = &outboundPeeksStatements{
+ db: db,
+ }
+ _, err = db.Exec(outboundPeeksSchema)
+ if err != nil {
+ return
+ }
+
+ if s.insertOutboundPeekStmt, err = db.Prepare(insertOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectOutboundPeekStmt, err = db.Prepare(selectOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectOutboundPeeksStmt, err = db.Prepare(selectOutboundPeeksSQL); err != nil {
+ return
+ }
+ if s.renewOutboundPeekStmt, err = db.Prepare(renewOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.deleteOutboundPeeksStmt, err = db.Prepare(deleteOutboundPeeksSQL); err != nil {
+ return
+ }
+ if s.deleteOutboundPeekStmt, err = db.Prepare(deleteOutboundPeekSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *outboundPeeksStatements) InsertOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ stmt := sqlutil.TxStmt(txn, s.insertOutboundPeekStmt)
+ _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
+ return
+}
+
+func (s *outboundPeeksStatements) RenewOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ _, err = sqlutil.TxStmt(txn, s.renewOutboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
+ return
+}
+
+func (s *outboundPeeksStatements) SelectOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (*types.OutboundPeek, error) {
+ row := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryRowContext(ctx, roomID)
+ outboundPeek := types.OutboundPeek{}
+ err := row.Scan(
+ &outboundPeek.RoomID,
+ &outboundPeek.ServerName,
+ &outboundPeek.PeekID,
+ &outboundPeek.CreationTimestamp,
+ &outboundPeek.RenewedTimestamp,
+ &outboundPeek.RenewalInterval,
+ )
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ return &outboundPeek, nil
+}
+
+func (s *outboundPeeksStatements) SelectOutboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (outboundPeeks []types.OutboundPeek, err error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryContext(ctx, roomID)
+ if err != nil {
+ return
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectOutboundPeeks: rows.close() failed")
+
+ for rows.Next() {
+ outboundPeek := types.OutboundPeek{}
+ if err = rows.Scan(
+ &outboundPeek.RoomID,
+ &outboundPeek.ServerName,
+ &outboundPeek.PeekID,
+ &outboundPeek.CreationTimestamp,
+ &outboundPeek.RenewedTimestamp,
+ &outboundPeek.RenewalInterval,
+ ); err != nil {
+ return
+ }
+ outboundPeeks = append(outboundPeeks, outboundPeek)
+ }
+
+ return outboundPeeks, rows.Err()
+}
+
+func (s *outboundPeeksStatements) DeleteOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
+ return
+}
+
+func (s *outboundPeeksStatements) DeleteOutboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeeksStmt).ExecContext(ctx, roomID)
+ return
+}
diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go
index 75b54bbc..b9827ca1 100644
--- a/federationsender/storage/postgres/storage.go
+++ b/federationsender/storage/postgres/storage.go
@@ -64,16 +64,26 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
if err != nil {
return nil, err
}
+ inboundPeeks, err := NewPostgresInboundPeeksTable(d.db)
+ if err != nil {
+ return nil, err
+ }
+ outboundPeeks, err := NewPostgresOutboundPeeksTable(d.db)
+ if err != nil {
+ return nil, err
+ }
d.Database = shared.Database{
- DB: d.db,
- Cache: cache,
- Writer: d.writer,
- FederationSenderJoinedHosts: joinedHosts,
- FederationSenderQueuePDUs: queuePDUs,
- FederationSenderQueueEDUs: queueEDUs,
- FederationSenderQueueJSON: queueJSON,
- FederationSenderRooms: rooms,
- FederationSenderBlacklist: blacklist,
+ DB: d.db,
+ Cache: cache,
+ Writer: d.writer,
+ FederationSenderJoinedHosts: joinedHosts,
+ FederationSenderQueuePDUs: queuePDUs,
+ FederationSenderQueueEDUs: queueEDUs,
+ FederationSenderQueueJSON: queueJSON,
+ FederationSenderRooms: rooms,
+ FederationSenderBlacklist: blacklist,
+ FederationSenderInboundPeeks: inboundPeeks,
+ FederationSenderOutboundPeeks: outboundPeeks,
}
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err
diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go
index fbf84c70..4c949042 100644
--- a/federationsender/storage/shared/storage.go
+++ b/federationsender/storage/shared/storage.go
@@ -27,15 +27,17 @@ import (
)
type Database struct {
- DB *sql.DB
- Cache caching.FederationSenderCache
- Writer sqlutil.Writer
- FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
- FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
- FederationSenderQueueJSON tables.FederationSenderQueueJSON
- FederationSenderJoinedHosts tables.FederationSenderJoinedHosts
- FederationSenderRooms tables.FederationSenderRooms
- FederationSenderBlacklist tables.FederationSenderBlacklist
+ DB *sql.DB
+ Cache caching.FederationSenderCache
+ Writer sqlutil.Writer
+ FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
+ FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
+ FederationSenderQueueJSON tables.FederationSenderQueueJSON
+ FederationSenderJoinedHosts tables.FederationSenderJoinedHosts
+ FederationSenderRooms tables.FederationSenderRooms
+ FederationSenderBlacklist tables.FederationSenderBlacklist
+ FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks
+ FederationSenderInboundPeeks tables.FederationSenderInboundPeeks
}
// An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs.
@@ -173,3 +175,43 @@ func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.Server
func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) {
return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName)
}
+
+func (d *Database) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ return d.FederationSenderOutboundPeeks.InsertOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
+ })
+}
+
+func (d *Database) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ return d.FederationSenderOutboundPeeks.RenewOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
+ })
+}
+
+func (d *Database) GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error) {
+ return d.FederationSenderOutboundPeeks.SelectOutboundPeek(ctx, nil, serverName, roomID, peekID)
+}
+
+func (d *Database) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) {
+ return d.FederationSenderOutboundPeeks.SelectOutboundPeeks(ctx, nil, roomID)
+}
+
+func (d *Database) AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ return d.FederationSenderInboundPeeks.InsertInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
+ })
+}
+
+func (d *Database) RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ return d.FederationSenderInboundPeeks.RenewInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
+ })
+}
+
+func (d *Database) GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error) {
+ return d.FederationSenderInboundPeeks.SelectInboundPeek(ctx, nil, serverName, roomID, peekID)
+}
+
+func (d *Database) GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) {
+ return d.FederationSenderInboundPeeks.SelectInboundPeeks(ctx, nil, roomID)
+}
diff --git a/federationsender/storage/sqlite3/inbound_peeks_table.go b/federationsender/storage/sqlite3/inbound_peeks_table.go
new file mode 100644
index 00000000..d5eacf9e
--- /dev/null
+++ b/federationsender/storage/sqlite3/inbound_peeks_table.go
@@ -0,0 +1,176 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const inboundPeeksSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_inbound_peeks (
+ room_id TEXT NOT NULL,
+ server_name TEXT NOT NULL,
+ peek_id TEXT NOT NULL,
+ creation_ts INTEGER NOT NULL,
+ renewed_ts INTEGER NOT NULL,
+ renewal_interval INTEGER NOT NULL,
+ UNIQUE (room_id, server_name, peek_id)
+);
+`
+
+const insertInboundPeekSQL = "" +
+ "INSERT INTO federationsender_inbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
+
+const selectInboundPeekSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
+
+const selectInboundPeeksSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1"
+
+const renewInboundPeekSQL = "" +
+ "UPDATE federationsender_inbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
+
+const deleteInboundPeekSQL = "" +
+ "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2"
+
+const deleteInboundPeeksSQL = "" +
+ "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1"
+
+type inboundPeeksStatements struct {
+ db *sql.DB
+ insertInboundPeekStmt *sql.Stmt
+ selectInboundPeekStmt *sql.Stmt
+ selectInboundPeeksStmt *sql.Stmt
+ renewInboundPeekStmt *sql.Stmt
+ deleteInboundPeekStmt *sql.Stmt
+ deleteInboundPeeksStmt *sql.Stmt
+}
+
+func NewSQLiteInboundPeeksTable(db *sql.DB) (s *inboundPeeksStatements, err error) {
+ s = &inboundPeeksStatements{
+ db: db,
+ }
+ _, err = db.Exec(inboundPeeksSchema)
+ if err != nil {
+ return
+ }
+
+ if s.insertInboundPeekStmt, err = db.Prepare(insertInboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectInboundPeekStmt, err = db.Prepare(selectInboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectInboundPeeksStmt, err = db.Prepare(selectInboundPeeksSQL); err != nil {
+ return
+ }
+ if s.renewInboundPeekStmt, err = db.Prepare(renewInboundPeekSQL); err != nil {
+ return
+ }
+ if s.deleteInboundPeeksStmt, err = db.Prepare(deleteInboundPeeksSQL); err != nil {
+ return
+ }
+ if s.deleteInboundPeekStmt, err = db.Prepare(deleteInboundPeekSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *inboundPeeksStatements) InsertInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ stmt := sqlutil.TxStmt(txn, s.insertInboundPeekStmt)
+ _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
+ return
+}
+
+func (s *inboundPeeksStatements) RenewInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ _, err = sqlutil.TxStmt(txn, s.renewInboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
+ return
+}
+
+func (s *inboundPeeksStatements) SelectInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (*types.InboundPeek, error) {
+ row := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryRowContext(ctx, roomID)
+ inboundPeek := types.InboundPeek{}
+ err := row.Scan(
+ &inboundPeek.RoomID,
+ &inboundPeek.ServerName,
+ &inboundPeek.PeekID,
+ &inboundPeek.CreationTimestamp,
+ &inboundPeek.RenewedTimestamp,
+ &inboundPeek.RenewalInterval,
+ )
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ return &inboundPeek, nil
+}
+
+func (s *inboundPeeksStatements) SelectInboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (inboundPeeks []types.InboundPeek, err error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryContext(ctx, roomID)
+ if err != nil {
+ return
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectInboundPeeks: rows.close() failed")
+
+ for rows.Next() {
+ inboundPeek := types.InboundPeek{}
+ if err = rows.Scan(
+ &inboundPeek.RoomID,
+ &inboundPeek.ServerName,
+ &inboundPeek.PeekID,
+ &inboundPeek.CreationTimestamp,
+ &inboundPeek.RenewedTimestamp,
+ &inboundPeek.RenewalInterval,
+ ); err != nil {
+ return
+ }
+ inboundPeeks = append(inboundPeeks, inboundPeek)
+ }
+
+ return inboundPeeks, rows.Err()
+}
+
+func (s *inboundPeeksStatements) DeleteInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteInboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
+ return
+}
+
+func (s *inboundPeeksStatements) DeleteInboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteInboundPeeksStmt).ExecContext(ctx, roomID)
+ return
+}
diff --git a/federationsender/storage/sqlite3/outbound_peeks_table.go b/federationsender/storage/sqlite3/outbound_peeks_table.go
new file mode 100644
index 00000000..02aefce7
--- /dev/null
+++ b/federationsender/storage/sqlite3/outbound_peeks_table.go
@@ -0,0 +1,176 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const outboundPeeksSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_outbound_peeks (
+ room_id TEXT NOT NULL,
+ server_name TEXT NOT NULL,
+ peek_id TEXT NOT NULL,
+ creation_ts INTEGER NOT NULL,
+ renewed_ts INTEGER NOT NULL,
+ renewal_interval INTEGER NOT NULL,
+ UNIQUE (room_id, server_name, peek_id)
+);
+`
+
+const insertOutboundPeekSQL = "" +
+ "INSERT INTO federationsender_outbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
+
+const selectOutboundPeekSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
+
+const selectOutboundPeeksSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1"
+
+const renewOutboundPeekSQL = "" +
+ "UPDATE federationsender_outbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
+
+const deleteOutboundPeekSQL = "" +
+ "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2"
+
+const deleteOutboundPeeksSQL = "" +
+ "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1"
+
+type outboundPeeksStatements struct {
+ db *sql.DB
+ insertOutboundPeekStmt *sql.Stmt
+ selectOutboundPeekStmt *sql.Stmt
+ selectOutboundPeeksStmt *sql.Stmt
+ renewOutboundPeekStmt *sql.Stmt
+ deleteOutboundPeekStmt *sql.Stmt
+ deleteOutboundPeeksStmt *sql.Stmt
+}
+
+func NewSQLiteOutboundPeeksTable(db *sql.DB) (s *outboundPeeksStatements, err error) {
+ s = &outboundPeeksStatements{
+ db: db,
+ }
+ _, err = db.Exec(outboundPeeksSchema)
+ if err != nil {
+ return
+ }
+
+ if s.insertOutboundPeekStmt, err = db.Prepare(insertOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectOutboundPeekStmt, err = db.Prepare(selectOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectOutboundPeeksStmt, err = db.Prepare(selectOutboundPeeksSQL); err != nil {
+ return
+ }
+ if s.renewOutboundPeekStmt, err = db.Prepare(renewOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.deleteOutboundPeeksStmt, err = db.Prepare(deleteOutboundPeeksSQL); err != nil {
+ return
+ }
+ if s.deleteOutboundPeekStmt, err = db.Prepare(deleteOutboundPeekSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *outboundPeeksStatements) InsertOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ stmt := sqlutil.TxStmt(txn, s.insertOutboundPeekStmt)
+ _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
+ return
+}
+
+func (s *outboundPeeksStatements) RenewOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ _, err = sqlutil.TxStmt(txn, s.renewOutboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
+ return
+}
+
+func (s *outboundPeeksStatements) SelectOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (*types.OutboundPeek, error) {
+ row := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryRowContext(ctx, roomID)
+ outboundPeek := types.OutboundPeek{}
+ err := row.Scan(
+ &outboundPeek.RoomID,
+ &outboundPeek.ServerName,
+ &outboundPeek.PeekID,
+ &outboundPeek.CreationTimestamp,
+ &outboundPeek.RenewedTimestamp,
+ &outboundPeek.RenewalInterval,
+ )
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ return &outboundPeek, nil
+}
+
+func (s *outboundPeeksStatements) SelectOutboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (outboundPeeks []types.OutboundPeek, err error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryContext(ctx, roomID)
+ if err != nil {
+ return
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectOutboundPeeks: rows.close() failed")
+
+ for rows.Next() {
+ outboundPeek := types.OutboundPeek{}
+ if err = rows.Scan(
+ &outboundPeek.RoomID,
+ &outboundPeek.ServerName,
+ &outboundPeek.PeekID,
+ &outboundPeek.CreationTimestamp,
+ &outboundPeek.RenewedTimestamp,
+ &outboundPeek.RenewalInterval,
+ ); err != nil {
+ return
+ }
+ outboundPeeks = append(outboundPeeks, outboundPeek)
+ }
+
+ return outboundPeeks, rows.Err()
+}
+
+func (s *outboundPeeksStatements) DeleteOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
+ return
+}
+
+func (s *outboundPeeksStatements) DeleteOutboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeeksStmt).ExecContext(ctx, roomID)
+ return
+}
diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go
index e66d7690..2b135858 100644
--- a/federationsender/storage/sqlite3/storage.go
+++ b/federationsender/storage/sqlite3/storage.go
@@ -66,16 +66,26 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
if err != nil {
return nil, err
}
+ outboundPeeks, err := NewSQLiteOutboundPeeksTable(d.db)
+ if err != nil {
+ return nil, err
+ }
+ inboundPeeks, err := NewSQLiteInboundPeeksTable(d.db)
+ if err != nil {
+ return nil, err
+ }
d.Database = shared.Database{
- DB: d.db,
- Cache: cache,
- Writer: d.writer,
- FederationSenderJoinedHosts: joinedHosts,
- FederationSenderQueuePDUs: queuePDUs,
- FederationSenderQueueEDUs: queueEDUs,
- FederationSenderQueueJSON: queueJSON,
- FederationSenderRooms: rooms,
- FederationSenderBlacklist: blacklist,
+ DB: d.db,
+ Cache: cache,
+ Writer: d.writer,
+ FederationSenderJoinedHosts: joinedHosts,
+ FederationSenderQueuePDUs: queuePDUs,
+ FederationSenderQueueEDUs: queueEDUs,
+ FederationSenderQueueJSON: queueJSON,
+ FederationSenderRooms: rooms,
+ FederationSenderBlacklist: blacklist,
+ FederationSenderOutboundPeeks: outboundPeeks,
+ FederationSenderInboundPeeks: inboundPeeks,
}
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err
diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go
index 69e952de..22fd5554 100644
--- a/federationsender/storage/tables/interface.go
+++ b/federationsender/storage/tables/interface.go
@@ -67,3 +67,21 @@ type FederationSenderBlacklist interface {
SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error)
DeleteBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error
}
+
+type FederationSenderOutboundPeeks interface {
+ InsertOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
+ RenewOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
+ SelectOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (outboundPeek *types.OutboundPeek, err error)
+ SelectOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (outboundPeeks []types.OutboundPeek, err error)
+ DeleteOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
+ DeleteOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
+}
+
+type FederationSenderInboundPeeks interface {
+ InsertInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
+ RenewInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
+ SelectInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (inboundPeek *types.InboundPeek, err error)
+ SelectInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (inboundPeeks []types.InboundPeek, err error)
+ DeleteInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
+ DeleteInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
+}
diff --git a/federationsender/types/types.go b/federationsender/types/types.go
index 398d3267..90da310c 100644
--- a/federationsender/types/types.go
+++ b/federationsender/types/types.go
@@ -49,3 +49,23 @@ func (e EventIDMismatchError) Error() string {
e.DatabaseID, e.RoomServerID,
)
}
+
+// tracks peeks we're performing on another server over federation
+type OutboundPeek struct {
+ PeekID string
+ RoomID string
+ ServerName gomatrixserverlib.ServerName
+ CreationTimestamp int64
+ RenewedTimestamp int64
+ RenewalInterval int64
+}
+
+// tracks peeks other servers are performing on us over federation
+type InboundPeek struct {
+ PeekID string
+ RoomID string
+ ServerName gomatrixserverlib.ServerName
+ CreationTimestamp int64
+ RenewedTimestamp int64
+ RenewalInterval int64
+}