diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-07-16 13:52:08 +0100 |
---|---|---|
committer | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-07-16 13:52:08 +0100 |
commit | e5208c2ec9b5e1878c3f9860c98de29a9d95ca18 (patch) | |
tree | 1661c2f912202d2aea4130c2ac55e77d47ecbbcb /federationsender | |
parent | f5e7e7513ca0ecddafc967e3b20f35cb1201a151 (diff) |
Yggdrasil demo updates ("Bare QUIC")
Squashed commit of the following:
commit 86c2388e13ffdbabdd50cea205652dccc40e1860
Merge: b0a3ee6c f5e7e751
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Thu Jul 16 13:47:10 2020 +0100
Merge branch 'master' into neilalexander/yggbarequic
commit b0a3ee6c5c063962384bb91c59ec753ddc8cfe5f
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Thu Jul 16 13:42:22 2020 +0100
Add support for broadcasting wake-up EDUs to known hosts
commit 8a5c2020b3a4b705b5d5686a9e71990a49e6d471
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Thu Jul 16 13:42:10 2020 +0100
Bare QUIC demo working
commit d3939b3d6568cf4262c0391486a5203873b68bfc
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Wed Jul 15 11:42:43 2020 +0100
Support bare Yggdrasil sessions with encrypted QUIC
Diffstat (limited to 'federationsender')
-rw-r--r-- | federationsender/api/api.go | 12 | ||||
-rw-r--r-- | federationsender/internal/perform.go | 22 | ||||
-rw-r--r-- | federationsender/inthttp/client.go | 14 | ||||
-rw-r--r-- | federationsender/inthttp/server.go | 13 | ||||
-rw-r--r-- | federationsender/storage/interface.go | 1 | ||||
-rw-r--r-- | federationsender/storage/postgres/joined_hosts_table.go | 34 | ||||
-rw-r--r-- | federationsender/storage/postgres/storage.go | 7 | ||||
-rw-r--r-- | federationsender/storage/sqlite3/joined_hosts_table.go | 34 | ||||
-rw-r--r-- | federationsender/storage/sqlite3/storage.go | 7 |
9 files changed, 138 insertions, 6 deletions
diff --git a/federationsender/api/api.go b/federationsender/api/api.go index d90ffd29..b87af0eb 100644 --- a/federationsender/api/api.go +++ b/federationsender/api/api.go @@ -42,6 +42,12 @@ type FederationSenderInternalAPI interface { request *PerformServersAliveRequest, response *PerformServersAliveResponse, ) error + // Broadcasts an EDU to all servers in rooms we are joined to. + PerformBroadcastEDU( + ctx context.Context, + request *PerformBroadcastEDURequest, + response *PerformBroadcastEDUResponse, + ) error } type PerformDirectoryLookupRequest struct { @@ -91,3 +97,9 @@ type QueryJoinedHostServerNamesInRoomRequest struct { type QueryJoinedHostServerNamesInRoomResponse struct { ServerNames []gomatrixserverlib.ServerName `json:"server_names"` } + +type PerformBroadcastEDURequest struct { +} + +type PerformBroadcastEDUResponse struct { +} diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index 96b1149d..d9a4b963 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -308,3 +308,25 @@ func (r *FederationSenderInternalAPI) PerformServersAlive( return nil } + +// PerformServersAlive implements api.FederationSenderInternalAPI +func (r *FederationSenderInternalAPI) PerformBroadcastEDU( + ctx context.Context, + request *api.PerformBroadcastEDURequest, + response *api.PerformBroadcastEDUResponse, +) (err error) { + destinations, err := r.db.GetAllJoinedHosts(ctx) + if err != nil { + return fmt.Errorf("r.db.GetAllJoinedHosts: %w", err) + } + + edu := &gomatrixserverlib.EDU{ + Type: "org.matrix.dendrite.wakeup", + Origin: string(r.cfg.Matrix.ServerName), + } + if err = r.queues.SendEDU(edu, r.cfg.Matrix.ServerName, destinations); err != nil { + return fmt.Errorf("r.queues.SendEDU: %w", err) + } + + return nil +} diff --git a/federationsender/inthttp/client.go b/federationsender/inthttp/client.go index 25de99cc..4d968919 100644 --- a/federationsender/inthttp/client.go +++ b/federationsender/inthttp/client.go @@ -19,6 +19,7 @@ const ( FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest" FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest" FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive" + FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU" ) // NewFederationSenderClient creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API. @@ -105,3 +106,16 @@ func (h *httpFederationSenderInternalAPI) PerformDirectoryLookup( apiURL := h.federationSenderURL + FederationSenderPerformDirectoryLookupRequestPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } + +// Handle an instruction to broadcast an EDU to all servers in rooms we are joined to. +func (h *httpFederationSenderInternalAPI) PerformBroadcastEDU( + ctx context.Context, + request *api.PerformBroadcastEDURequest, + response *api.PerformBroadcastEDUResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformBroadcastEDU") + defer span.Finish() + + apiURL := h.federationSenderURL + FederationSenderPerformBroadcastEDUPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} diff --git a/federationsender/inthttp/server.go b/federationsender/inthttp/server.go index a4f3d63d..ee05cf95 100644 --- a/federationsender/inthttp/server.go +++ b/federationsender/inthttp/server.go @@ -76,4 +76,17 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(FederationSenderPerformBroadcastEDUPath, + httputil.MakeInternalAPI("PerformBroadcastEDU", func(req *http.Request) util.JSONResponse { + var request api.PerformBroadcastEDURequest + var response api.PerformBroadcastEDUResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := intAPI.PerformBroadcastEDU(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) } diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go index 4bf36c24..6fff3518 100644 --- a/federationsender/storage/interface.go +++ b/federationsender/storage/interface.go @@ -26,6 +26,7 @@ type Database interface { internal.PartitionStorer UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error) GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error) + GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) StoreJSON(ctx context.Context, js string) (int64, error) AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nids []int64) error GetNextTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, error) diff --git a/federationsender/storage/postgres/joined_hosts_table.go b/federationsender/storage/postgres/joined_hosts_table.go index c0f9a7d5..2612e7e0 100644 --- a/federationsender/storage/postgres/joined_hosts_table.go +++ b/federationsender/storage/postgres/joined_hosts_table.go @@ -57,10 +57,14 @@ const selectJoinedHostsSQL = "" + "SELECT event_id, server_name FROM federationsender_joined_hosts" + " WHERE room_id = $1" +const selectAllJoinedHostsSQL = "" + + "SELECT DISTINCT server_name FROM federationsender_joined_hosts" + type joinedHostsStatements struct { - insertJoinedHostsStmt *sql.Stmt - deleteJoinedHostsStmt *sql.Stmt - selectJoinedHostsStmt *sql.Stmt + insertJoinedHostsStmt *sql.Stmt + deleteJoinedHostsStmt *sql.Stmt + selectJoinedHostsStmt *sql.Stmt + selectAllJoinedHostsStmt *sql.Stmt } func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) { @@ -77,6 +81,9 @@ func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) { if s.selectJoinedHostsStmt, err = db.Prepare(selectJoinedHostsSQL); err != nil { return } + if s.selectAllJoinedHostsStmt, err = db.Prepare(selectAllJoinedHostsSQL); err != nil { + return + } return } @@ -112,6 +119,27 @@ func (s *joinedHostsStatements) selectJoinedHosts( return joinedHostsFromStmt(ctx, s.selectJoinedHostsStmt, roomID) } +func (s *joinedHostsStatements) selectAllJoinedHosts( + ctx context.Context, +) ([]gomatrixserverlib.ServerName, error) { + rows, err := s.selectAllJoinedHostsStmt.QueryContext(ctx) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectAllJoinedHosts: rows.close() failed") + + var result []gomatrixserverlib.ServerName + for rows.Next() { + var serverName string + if err = rows.Scan(&serverName); err != nil { + return nil, err + } + result = append(result, gomatrixserverlib.ServerName(serverName)) + } + + return result, rows.Err() +} + func joinedHostsFromStmt( ctx context.Context, stmt *sql.Stmt, roomID string, ) ([]types.JoinedHost, error) { diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index 80686e09..1535ebdf 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -134,6 +134,13 @@ func (d *Database) GetJoinedHosts( return d.selectJoinedHosts(ctx, roomID) } +// GetAllJoinedHosts returns the currently joined hosts for +// all rooms known to the federation sender. +// Returns an error if something goes wrong. +func (d *Database) GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) { + return d.selectAllJoinedHosts(ctx) +} + // StoreJSON adds a JSON blob into the queue JSON table and returns // a NID. The NID will then be used when inserting the per-destination // metadata entries. diff --git a/federationsender/storage/sqlite3/joined_hosts_table.go b/federationsender/storage/sqlite3/joined_hosts_table.go index d9824658..fd9ffedc 100644 --- a/federationsender/storage/sqlite3/joined_hosts_table.go +++ b/federationsender/storage/sqlite3/joined_hosts_table.go @@ -56,10 +56,14 @@ const selectJoinedHostsSQL = "" + "SELECT event_id, server_name FROM federationsender_joined_hosts" + " WHERE room_id = $1" +const selectAllJoinedHostsSQL = "" + + "SELECT DISTINCT server_name FROM federationsender_joined_hosts" + type joinedHostsStatements struct { - insertJoinedHostsStmt *sql.Stmt - deleteJoinedHostsStmt *sql.Stmt - selectJoinedHostsStmt *sql.Stmt + insertJoinedHostsStmt *sql.Stmt + deleteJoinedHostsStmt *sql.Stmt + selectJoinedHostsStmt *sql.Stmt + selectAllJoinedHostsStmt *sql.Stmt } func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) { @@ -76,6 +80,9 @@ func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) { if s.selectJoinedHostsStmt, err = db.Prepare(selectJoinedHostsSQL); err != nil { return } + if s.selectAllJoinedHostsStmt, err = db.Prepare(selectAllJoinedHostsSQL); err != nil { + return + } return } @@ -115,6 +122,27 @@ func (s *joinedHostsStatements) selectJoinedHosts( return joinedHostsFromStmt(ctx, s.selectJoinedHostsStmt, roomID) } +func (s *joinedHostsStatements) selectAllJoinedHosts( + ctx context.Context, +) ([]gomatrixserverlib.ServerName, error) { + rows, err := s.selectAllJoinedHostsStmt.QueryContext(ctx) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectAllJoinedHosts: rows.close() failed") + + var result []gomatrixserverlib.ServerName + for rows.Next() { + var serverName string + if err = rows.Scan(&serverName); err != nil { + return nil, err + } + result = append(result, gomatrixserverlib.ServerName(serverName)) + } + + return result, rows.Err() +} + func joinedHostsFromStmt( ctx context.Context, stmt *sql.Stmt, roomID string, ) ([]types.JoinedHost, error) { diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index 1a4715bf..b23a2dbe 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -145,6 +145,13 @@ func (d *Database) GetJoinedHosts( return d.selectJoinedHosts(ctx, roomID) } +// GetAllJoinedHosts returns the currently joined hosts for +// all rooms known to the federation sender. +// Returns an error if something goes wrong. +func (d *Database) GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) { + return d.selectAllJoinedHosts(ctx) +} + // StoreJSON adds a JSON blob into the queue JSON table and returns // a NID. The NID will then be used when inserting the per-destination // metadata entries. |