aboutsummaryrefslogtreecommitdiff
path: root/federationsender
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-07-16 13:52:08 +0100
committerNeil Alexander <neilalexander@users.noreply.github.com>2020-07-16 13:52:08 +0100
commite5208c2ec9b5e1878c3f9860c98de29a9d95ca18 (patch)
tree1661c2f912202d2aea4130c2ac55e77d47ecbbcb /federationsender
parentf5e7e7513ca0ecddafc967e3b20f35cb1201a151 (diff)
Yggdrasil demo updates ("Bare QUIC")
Squashed commit of the following: commit 86c2388e13ffdbabdd50cea205652dccc40e1860 Merge: b0a3ee6c f5e7e751 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Thu Jul 16 13:47:10 2020 +0100 Merge branch 'master' into neilalexander/yggbarequic commit b0a3ee6c5c063962384bb91c59ec753ddc8cfe5f Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Thu Jul 16 13:42:22 2020 +0100 Add support for broadcasting wake-up EDUs to known hosts commit 8a5c2020b3a4b705b5d5686a9e71990a49e6d471 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Thu Jul 16 13:42:10 2020 +0100 Bare QUIC demo working commit d3939b3d6568cf4262c0391486a5203873b68bfc Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Wed Jul 15 11:42:43 2020 +0100 Support bare Yggdrasil sessions with encrypted QUIC
Diffstat (limited to 'federationsender')
-rw-r--r--federationsender/api/api.go12
-rw-r--r--federationsender/internal/perform.go22
-rw-r--r--federationsender/inthttp/client.go14
-rw-r--r--federationsender/inthttp/server.go13
-rw-r--r--federationsender/storage/interface.go1
-rw-r--r--federationsender/storage/postgres/joined_hosts_table.go34
-rw-r--r--federationsender/storage/postgres/storage.go7
-rw-r--r--federationsender/storage/sqlite3/joined_hosts_table.go34
-rw-r--r--federationsender/storage/sqlite3/storage.go7
9 files changed, 138 insertions, 6 deletions
diff --git a/federationsender/api/api.go b/federationsender/api/api.go
index d90ffd29..b87af0eb 100644
--- a/federationsender/api/api.go
+++ b/federationsender/api/api.go
@@ -42,6 +42,12 @@ type FederationSenderInternalAPI interface {
request *PerformServersAliveRequest,
response *PerformServersAliveResponse,
) error
+ // Broadcasts an EDU to all servers in rooms we are joined to.
+ PerformBroadcastEDU(
+ ctx context.Context,
+ request *PerformBroadcastEDURequest,
+ response *PerformBroadcastEDUResponse,
+ ) error
}
type PerformDirectoryLookupRequest struct {
@@ -91,3 +97,9 @@ type QueryJoinedHostServerNamesInRoomRequest struct {
type QueryJoinedHostServerNamesInRoomResponse struct {
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
}
+
+type PerformBroadcastEDURequest struct {
+}
+
+type PerformBroadcastEDUResponse struct {
+}
diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go
index 96b1149d..d9a4b963 100644
--- a/federationsender/internal/perform.go
+++ b/federationsender/internal/perform.go
@@ -308,3 +308,25 @@ func (r *FederationSenderInternalAPI) PerformServersAlive(
return nil
}
+
+// PerformServersAlive implements api.FederationSenderInternalAPI
+func (r *FederationSenderInternalAPI) PerformBroadcastEDU(
+ ctx context.Context,
+ request *api.PerformBroadcastEDURequest,
+ response *api.PerformBroadcastEDUResponse,
+) (err error) {
+ destinations, err := r.db.GetAllJoinedHosts(ctx)
+ if err != nil {
+ return fmt.Errorf("r.db.GetAllJoinedHosts: %w", err)
+ }
+
+ edu := &gomatrixserverlib.EDU{
+ Type: "org.matrix.dendrite.wakeup",
+ Origin: string(r.cfg.Matrix.ServerName),
+ }
+ if err = r.queues.SendEDU(edu, r.cfg.Matrix.ServerName, destinations); err != nil {
+ return fmt.Errorf("r.queues.SendEDU: %w", err)
+ }
+
+ return nil
+}
diff --git a/federationsender/inthttp/client.go b/federationsender/inthttp/client.go
index 25de99cc..4d968919 100644
--- a/federationsender/inthttp/client.go
+++ b/federationsender/inthttp/client.go
@@ -19,6 +19,7 @@ const (
FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest"
FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest"
FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive"
+ FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU"
)
// NewFederationSenderClient creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API.
@@ -105,3 +106,16 @@ func (h *httpFederationSenderInternalAPI) PerformDirectoryLookup(
apiURL := h.federationSenderURL + FederationSenderPerformDirectoryLookupRequestPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
+
+// Handle an instruction to broadcast an EDU to all servers in rooms we are joined to.
+func (h *httpFederationSenderInternalAPI) PerformBroadcastEDU(
+ ctx context.Context,
+ request *api.PerformBroadcastEDURequest,
+ response *api.PerformBroadcastEDUResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "PerformBroadcastEDU")
+ defer span.Finish()
+
+ apiURL := h.federationSenderURL + FederationSenderPerformBroadcastEDUPath
+ return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+}
diff --git a/federationsender/inthttp/server.go b/federationsender/inthttp/server.go
index a4f3d63d..ee05cf95 100644
--- a/federationsender/inthttp/server.go
+++ b/federationsender/inthttp/server.go
@@ -76,4 +76,17 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
+ internalAPIMux.Handle(FederationSenderPerformBroadcastEDUPath,
+ httputil.MakeInternalAPI("PerformBroadcastEDU", func(req *http.Request) util.JSONResponse {
+ var request api.PerformBroadcastEDURequest
+ var response api.PerformBroadcastEDUResponse
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ if err := intAPI.PerformBroadcastEDU(req.Context(), &request, &response); err != nil {
+ return util.ErrorResponse(err)
+ }
+ return util.JSONResponse{Code: http.StatusOK, JSON: &response}
+ }),
+ )
}
diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go
index 4bf36c24..6fff3518 100644
--- a/federationsender/storage/interface.go
+++ b/federationsender/storage/interface.go
@@ -26,6 +26,7 @@ type Database interface {
internal.PartitionStorer
UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error)
GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
+ GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
StoreJSON(ctx context.Context, js string) (int64, error)
AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nids []int64) error
GetNextTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, error)
diff --git a/federationsender/storage/postgres/joined_hosts_table.go b/federationsender/storage/postgres/joined_hosts_table.go
index c0f9a7d5..2612e7e0 100644
--- a/federationsender/storage/postgres/joined_hosts_table.go
+++ b/federationsender/storage/postgres/joined_hosts_table.go
@@ -57,10 +57,14 @@ const selectJoinedHostsSQL = "" +
"SELECT event_id, server_name FROM federationsender_joined_hosts" +
" WHERE room_id = $1"
+const selectAllJoinedHostsSQL = "" +
+ "SELECT DISTINCT server_name FROM federationsender_joined_hosts"
+
type joinedHostsStatements struct {
- insertJoinedHostsStmt *sql.Stmt
- deleteJoinedHostsStmt *sql.Stmt
- selectJoinedHostsStmt *sql.Stmt
+ insertJoinedHostsStmt *sql.Stmt
+ deleteJoinedHostsStmt *sql.Stmt
+ selectJoinedHostsStmt *sql.Stmt
+ selectAllJoinedHostsStmt *sql.Stmt
}
func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) {
@@ -77,6 +81,9 @@ func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) {
if s.selectJoinedHostsStmt, err = db.Prepare(selectJoinedHostsSQL); err != nil {
return
}
+ if s.selectAllJoinedHostsStmt, err = db.Prepare(selectAllJoinedHostsSQL); err != nil {
+ return
+ }
return
}
@@ -112,6 +119,27 @@ func (s *joinedHostsStatements) selectJoinedHosts(
return joinedHostsFromStmt(ctx, s.selectJoinedHostsStmt, roomID)
}
+func (s *joinedHostsStatements) selectAllJoinedHosts(
+ ctx context.Context,
+) ([]gomatrixserverlib.ServerName, error) {
+ rows, err := s.selectAllJoinedHostsStmt.QueryContext(ctx)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "selectAllJoinedHosts: rows.close() failed")
+
+ var result []gomatrixserverlib.ServerName
+ for rows.Next() {
+ var serverName string
+ if err = rows.Scan(&serverName); err != nil {
+ return nil, err
+ }
+ result = append(result, gomatrixserverlib.ServerName(serverName))
+ }
+
+ return result, rows.Err()
+}
+
func joinedHostsFromStmt(
ctx context.Context, stmt *sql.Stmt, roomID string,
) ([]types.JoinedHost, error) {
diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go
index 80686e09..1535ebdf 100644
--- a/federationsender/storage/postgres/storage.go
+++ b/federationsender/storage/postgres/storage.go
@@ -134,6 +134,13 @@ func (d *Database) GetJoinedHosts(
return d.selectJoinedHosts(ctx, roomID)
}
+// GetAllJoinedHosts returns the currently joined hosts for
+// all rooms known to the federation sender.
+// Returns an error if something goes wrong.
+func (d *Database) GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) {
+ return d.selectAllJoinedHosts(ctx)
+}
+
// StoreJSON adds a JSON blob into the queue JSON table and returns
// a NID. The NID will then be used when inserting the per-destination
// metadata entries.
diff --git a/federationsender/storage/sqlite3/joined_hosts_table.go b/federationsender/storage/sqlite3/joined_hosts_table.go
index d9824658..fd9ffedc 100644
--- a/federationsender/storage/sqlite3/joined_hosts_table.go
+++ b/federationsender/storage/sqlite3/joined_hosts_table.go
@@ -56,10 +56,14 @@ const selectJoinedHostsSQL = "" +
"SELECT event_id, server_name FROM federationsender_joined_hosts" +
" WHERE room_id = $1"
+const selectAllJoinedHostsSQL = "" +
+ "SELECT DISTINCT server_name FROM federationsender_joined_hosts"
+
type joinedHostsStatements struct {
- insertJoinedHostsStmt *sql.Stmt
- deleteJoinedHostsStmt *sql.Stmt
- selectJoinedHostsStmt *sql.Stmt
+ insertJoinedHostsStmt *sql.Stmt
+ deleteJoinedHostsStmt *sql.Stmt
+ selectJoinedHostsStmt *sql.Stmt
+ selectAllJoinedHostsStmt *sql.Stmt
}
func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) {
@@ -76,6 +80,9 @@ func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) {
if s.selectJoinedHostsStmt, err = db.Prepare(selectJoinedHostsSQL); err != nil {
return
}
+ if s.selectAllJoinedHostsStmt, err = db.Prepare(selectAllJoinedHostsSQL); err != nil {
+ return
+ }
return
}
@@ -115,6 +122,27 @@ func (s *joinedHostsStatements) selectJoinedHosts(
return joinedHostsFromStmt(ctx, s.selectJoinedHostsStmt, roomID)
}
+func (s *joinedHostsStatements) selectAllJoinedHosts(
+ ctx context.Context,
+) ([]gomatrixserverlib.ServerName, error) {
+ rows, err := s.selectAllJoinedHostsStmt.QueryContext(ctx)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "selectAllJoinedHosts: rows.close() failed")
+
+ var result []gomatrixserverlib.ServerName
+ for rows.Next() {
+ var serverName string
+ if err = rows.Scan(&serverName); err != nil {
+ return nil, err
+ }
+ result = append(result, gomatrixserverlib.ServerName(serverName))
+ }
+
+ return result, rows.Err()
+}
+
func joinedHostsFromStmt(
ctx context.Context, stmt *sql.Stmt, roomID string,
) ([]types.JoinedHost, error) {
diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go
index 1a4715bf..b23a2dbe 100644
--- a/federationsender/storage/sqlite3/storage.go
+++ b/federationsender/storage/sqlite3/storage.go
@@ -145,6 +145,13 @@ func (d *Database) GetJoinedHosts(
return d.selectJoinedHosts(ctx, roomID)
}
+// GetAllJoinedHosts returns the currently joined hosts for
+// all rooms known to the federation sender.
+// Returns an error if something goes wrong.
+func (d *Database) GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) {
+ return d.selectAllJoinedHosts(ctx)
+}
+
// StoreJSON adds a JSON blob into the queue JSON table and returns
// a NID. The NID will then be used when inserting the per-destination
// metadata entries.