diff options
author | Kegsay <kegan@matrix.org> | 2020-03-24 12:20:10 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-24 12:20:10 +0000 |
commit | 6bac7e5efddea05aa68a56e44423d2bc157ec364 (patch) | |
tree | c12cbf607cfbe9d7cc69bcfcbc58eaca2f8ab3e4 /syncapi/storage | |
parent | 5a1a1ded1b5d8387f89e3d586729d175c1f1a1d0 (diff) |
Implement backfill over federation (#938)
* Implement history visibility checks for /backfill
Required for p2p to show history correctly.
* Add sytest
* Logging
* Fix two backfill bugs which prevented backfill from working correctly
- When receiving backfill requests, do not send the event that was in the original request.
- When storing backfill results, correctly update the backwards extremity for the room.
* hack: make backfill work multiple times
* add sqlite impl and remove logging
* Linting
Diffstat (limited to 'syncapi/storage')
-rw-r--r-- | syncapi/storage/postgres/backward_extremities_table.go | 58 | ||||
-rw-r--r-- | syncapi/storage/postgres/output_room_events_table.go | 1 | ||||
-rw-r--r-- | syncapi/storage/postgres/syncserver.go | 21 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/backward_extremities_table.go | 72 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/syncserver.go | 17 |
5 files changed, 80 insertions, 89 deletions
diff --git a/syncapi/storage/postgres/backward_extremities_table.go b/syncapi/storage/postgres/backward_extremities_table.go index 8286ca43..b3ee28e0 100644 --- a/syncapi/storage/postgres/backward_extremities_table.go +++ b/syncapi/storage/postgres/backward_extremities_table.go @@ -21,39 +21,53 @@ import ( "github.com/matrix-org/dendrite/common" ) +// The purpose of this table is to keep track of backwards extremities for a room. +// Backwards extremities are the earliest (DAG-wise) known events which we have +// the entire event JSON. These event IDs are used in federation requests to fetch +// even earlier events. +// +// We persist the previous event IDs as well, one per row, so when we do fetch even +// earlier events we can simply delete rows which referenced it. Consider the graph: +// A +// | Event C has 1 prev_event ID: A. +// B C +// |___| Event D has 2 prev_event IDs: B and C. +// | +// D +// The earliest known event we have is D, so this table has 2 rows. +// A backfill request gives us C but not B. We delete rows where prev_event=C. This +// still means that D is a backwards extremity as we do not have event B. However, event +// C is *also* a backwards extremity at this point as we do not have event A. Later, +// when we fetch event B, we delete rows where prev_event=B. This then removes D as +// a backwards extremity because there are no more rows with event_id=B. const backwardExtremitiesSchema = ` -- Stores output room events received from the roomserver. CREATE TABLE IF NOT EXISTS syncapi_backward_extremities ( -- The 'room_id' key for the event. room_id TEXT NOT NULL, - -- The event ID for the event. + -- The event ID for the last known event. This is the backwards extremity. event_id TEXT NOT NULL, + -- The prev_events for the last known event. This is used to update extremities. + prev_event_id TEXT NOT NULL, - PRIMARY KEY(room_id, event_id) + PRIMARY KEY(room_id, event_id, prev_event_id) ); ` const insertBackwardExtremitySQL = "" + - "INSERT INTO syncapi_backward_extremities (room_id, event_id)" + - " VALUES ($1, $2)" + + "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" + + " VALUES ($1, $2, $3)" + " ON CONFLICT DO NOTHING" const selectBackwardExtremitiesForRoomSQL = "" + "SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1" -const isBackwardExtremitySQL = "" + - "SELECT EXISTS (" + - " SELECT TRUE FROM syncapi_backward_extremities" + - " WHERE room_id = $1 AND event_id = $2" + - ")" - const deleteBackwardExtremitySQL = "" + - "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND event_id = $2" + "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" type backwardExtremitiesStatements struct { insertBackwardExtremityStmt *sql.Stmt selectBackwardExtremitiesForRoomStmt *sql.Stmt - isBackwardExtremityStmt *sql.Stmt deleteBackwardExtremityStmt *sql.Stmt } @@ -68,9 +82,6 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) { if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil { return } - if s.isBackwardExtremityStmt, err = db.Prepare(isBackwardExtremitySQL); err != nil { - return - } if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil { return } @@ -78,17 +89,15 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) { } func (s *backwardExtremitiesStatements) insertsBackwardExtremity( - ctx context.Context, roomID, eventID string, + ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string, ) (err error) { - _, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID) + _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID) return } func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom( ctx context.Context, roomID string, ) (eventIDs []string, err error) { - eventIDs = make([]string, 0) - rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) if err != nil { return @@ -107,16 +116,9 @@ func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom( return eventIDs, rows.Err() } -func (s *backwardExtremitiesStatements) isBackwardExtremity( - ctx context.Context, roomID, eventID string, -) (isBE bool, err error) { - err = s.isBackwardExtremityStmt.QueryRowContext(ctx, roomID, eventID).Scan(&isBE) - return -} - func (s *backwardExtremitiesStatements) deleteBackwardExtremity( - ctx context.Context, roomID, eventID string, + ctx context.Context, txn *sql.Tx, roomID, knownEventID string, ) (err error) { - _, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID) + _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID) return } diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 5f9a1d0c..0b53dfa9 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -305,7 +305,6 @@ func (s *outputRoomEventsStatements) selectRecentEvents( } else { stmt = common.TxStmt(txn, s.selectRecentEventsStmt) } - rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) if err != nil { return nil, err diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index ccf1c565..f3f1aabc 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -111,22 +111,17 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([ return d.StreamEventsToEvents(nil, streamEvents), nil } -func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) error { - // If the event is already known as a backward extremity, don't consider - // it as such anymore now that we have it. - isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, ev.RoomID(), ev.EventID()) - if err != nil { +// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of +// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table +// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. +func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error { + if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { return err } - if isBackwardExtremity { - if err = d.backwardExtremities.deleteBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil { - return err - } - } // Check if we have all of the event's previous events. If an event is // missing, add it to the room's backward extremities. - prevEvents, err := d.events.selectEvents(ctx, nil, ev.PrevEventIDs()) + prevEvents, err := d.events.selectEvents(ctx, txn, ev.PrevEventIDs()) if err != nil { return err } @@ -141,7 +136,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev // If the event is missing, consider it a backward extremity. if !found { - if err = d.backwardExtremities.insertsBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil { + if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil { return err } } @@ -174,7 +169,7 @@ func (d *SyncServerDatasource) WriteEvent( return err } - if err = d.handleBackwardExtremities(ctx, ev); err != nil { + if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil { return err } diff --git a/syncapi/storage/sqlite3/backward_extremities_table.go b/syncapi/storage/sqlite3/backward_extremities_table.go index fcf15da2..0663e2a1 100644 --- a/syncapi/storage/sqlite3/backward_extremities_table.go +++ b/syncapi/storage/sqlite3/backward_extremities_table.go @@ -21,38 +21,53 @@ import ( "github.com/matrix-org/dendrite/common" ) +// The purpose of this table is to keep track of backwards extremities for a room. +// Backwards extremities are the earliest (DAG-wise) known events which we have +// the entire event JSON. These event IDs are used in federation requests to fetch +// even earlier events. +// +// We persist the previous event IDs as well, one per row, so when we do fetch even +// earlier events we can simply delete rows which referenced it. Consider the graph: +// A +// | Event C has 1 prev_event ID: A. +// B C +// |___| Event D has 2 prev_event IDs: B and C. +// | +// D +// The earliest known event we have is D, so this table has 2 rows. +// A backfill request gives us C but not B. We delete rows where prev_event=C. This +// still means that D is a backwards extremity as we do not have event B. However, event +// C is *also* a backwards extremity at this point as we do not have event A. Later, +// when we fetch event B, we delete rows where prev_event=B. This then removes D as +// a backwards extremity because there are no more rows with event_id=B. const backwardExtremitiesSchema = ` -- Stores output room events received from the roomserver. CREATE TABLE IF NOT EXISTS syncapi_backward_extremities ( + -- The 'room_id' key for the event. room_id TEXT NOT NULL, + -- The event ID for the last known event. This is the backwards extremity. event_id TEXT NOT NULL, + -- The prev_events for the last known event. This is used to update extremities. + prev_event_id TEXT NOT NULL, - PRIMARY KEY(room_id, event_id) + PRIMARY KEY(room_id, event_id, prev_event_id) ); ` const insertBackwardExtremitySQL = "" + - "INSERT INTO syncapi_backward_extremities (room_id, event_id)" + - " VALUES ($1, $2)" + - " ON CONFLICT (room_id, event_id) DO NOTHING" + "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" + + " VALUES ($1, $2, $3)" + + " ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING" const selectBackwardExtremitiesForRoomSQL = "" + "SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1" -const isBackwardExtremitySQL = "" + - "SELECT EXISTS (" + - " SELECT TRUE FROM syncapi_backward_extremities" + - " WHERE room_id = $1 AND event_id = $2" + - ")" - const deleteBackwardExtremitySQL = "" + - "DELETE FROM syncapi_backward_extremities" + - " WHERE room_id = $1 AND event_id = $2" + "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" type backwardExtremitiesStatements struct { insertBackwardExtremityStmt *sql.Stmt selectBackwardExtremitiesForRoomStmt *sql.Stmt - isBackwardExtremityStmt *sql.Stmt deleteBackwardExtremityStmt *sql.Stmt } @@ -67,9 +82,6 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) { if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil { return } - if s.isBackwardExtremityStmt, err = db.Prepare(isBackwardExtremitySQL); err != nil { - return - } if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil { return } @@ -77,23 +89,20 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) { } func (s *backwardExtremitiesStatements) insertsBackwardExtremity( - ctx context.Context, txn *sql.Tx, roomID, eventID string, + ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string, ) (err error) { - stmt := common.TxStmt(txn, s.insertBackwardExtremityStmt) - _, err = stmt.ExecContext(ctx, roomID, eventID) + _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID) return } func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom( - ctx context.Context, txn *sql.Tx, roomID string, + ctx context.Context, roomID string, ) (eventIDs []string, err error) { - eventIDs = make([]string, 0) - - stmt := common.TxStmt(txn, s.selectBackwardExtremitiesForRoomStmt) - rows, err := stmt.QueryContext(ctx, roomID) + rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) if err != nil { return } + defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed") for rows.Next() { var eID string @@ -104,21 +113,12 @@ func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom( eventIDs = append(eventIDs, eID) } - return -} - -func (s *backwardExtremitiesStatements) isBackwardExtremity( - ctx context.Context, txn *sql.Tx, roomID, eventID string, -) (isBE bool, err error) { - stmt := common.TxStmt(txn, s.isBackwardExtremityStmt) - err = stmt.QueryRowContext(ctx, roomID, eventID).Scan(&isBE) - return + return eventIDs, rows.Err() } func (s *backwardExtremitiesStatements) deleteBackwardExtremity( - ctx context.Context, txn *sql.Tx, roomID, eventID string, + ctx context.Context, txn *sql.Tx, roomID, knownEventID string, ) (err error) { - stmt := common.TxStmt(txn, s.deleteBackwardExtremityStmt) - _, err = stmt.ExecContext(ctx, roomID, eventID) + _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID) return } diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index a06fc91f..8ff18900 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -137,18 +137,13 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([ return d.StreamEventsToEvents(nil, streamEvents), nil } +// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of +// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table +// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error { - // If the event is already known as a backward extremity, don't consider - // it as such anymore now that we have it. - isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()) - if err != nil { + if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { return err } - if isBackwardExtremity { - if err = d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { - return err - } - } // Check if we have all of the event's previous events. If an event is // missing, add it to the room's backward extremities. @@ -167,7 +162,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx // If the event is missing, consider it a backward extremity. if !found { - if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { + if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil { return err } } @@ -348,7 +343,7 @@ func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.Pagi func (d *SyncServerDatasource) BackwardExtremitiesForRoom( ctx context.Context, roomID string, ) (backwardExtremities []string, err error) { - return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, nil, roomID) + return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, roomID) } // MaxTopologicalPosition returns the highest topological position for a given |