aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/routing/messages.go176
-rw-r--r--syncapi/storage/postgres/backward_extremities_table.go58
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go1
-rw-r--r--syncapi/storage/postgres/syncserver.go21
-rw-r--r--syncapi/storage/sqlite3/backward_extremities_table.go72
-rw-r--r--syncapi/storage/sqlite3/syncserver.go17
-rw-r--r--syncapi/sync/requestpool.go13
7 files changed, 173 insertions, 185 deletions
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index cd21c3bd..5f2e4f17 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -28,7 +28,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
- log "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus"
)
type messagesReq struct {
@@ -151,6 +151,14 @@ func OnIncomingMessagesRequest(
util.GetLogger(req.Context()).WithError(err).Error("mreq.retrieveEvents failed")
return jsonerror.InternalServerError()
}
+ util.GetLogger(req.Context()).WithFields(logrus.Fields{
+ "from": from.String(),
+ "to": to.String(),
+ "limit": limit,
+ "backwards": backwardOrdering,
+ "return_start": start.String(),
+ "return_end": end.String(),
+ }).Info("Responding")
// Respond with the events.
return util.JSONResponse{
@@ -302,8 +310,9 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
events []gomatrixserverlib.HeaderedEvent, err error,
) {
// Check if we have enough events.
- isSetLargeEnough := true
- if len(streamEvents) < r.limit {
+ isSetLargeEnough := len(streamEvents) >= r.limit
+ if !isSetLargeEnough {
+ // it might be fine we don't have up to 'limit' events, let's find out
if r.backwardOrdering {
if r.wasToProvided {
// The condition in the SQL query is a strict "greater than" so
@@ -343,54 +352,6 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
return
}
-// containsBackwardExtremity checks if a slice of StreamEvent contains a
-// backward extremity. It does so by selecting the earliest event in the slice
-// and by checking the presence in the database of all of its parent events, and
-// considers the event itself a backward extremity if at least one of the parent
-// events doesn't exist in the database.
-// Returns an error if there was an issue with talking to the database.
-//
-// This function is unused but currently set to nolint for now until we are
-// absolutely sure that the changes in matrix-org/dendrite#847 are behaving
-// properly.
-// nolint:unused
-func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (bool, error) {
- // Select the earliest retrieved event.
- var ev *types.StreamEvent
- if r.backwardOrdering {
- ev = &(events[len(events)-1])
- } else {
- ev = &(events[0])
- }
- // Get the earliest retrieved event's parents.
- prevIDs := ev.PrevEventIDs()
- prevs, err := r.db.Events(r.ctx, prevIDs)
- if err != nil {
- return false, nil
- }
- // Check if we have all of the events we requested. If not, it means we've
- // reached a backward extremity.
- var eventInDB bool
- var id string
- // Iterate over the IDs we used in the request.
- for _, id = range prevIDs {
- eventInDB = false
- // Iterate over the events we got in response.
- for _, ev := range prevs {
- if ev.EventID() == id {
- eventInDB = true
- }
- }
- // One occurrence of one the event's parents not being present in the
- // database is enough to say that the event is a backward extremity.
- if !eventInDB {
- return true, nil
- }
- }
-
- return false, nil
-}
-
// backfill performs a backfill request over the federation on another
// homeserver in the room.
// See: https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
@@ -401,6 +362,48 @@ func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (boo
// Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request.
func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
+ srvToBackfillFrom, err := r.serverToBackfillFrom(fromEventIDs)
+ if err != nil {
+ return nil, fmt.Errorf("Cannot find server to backfill from: %w", err)
+ }
+
+ pdus := make([]gomatrixserverlib.HeaderedEvent, 0)
+
+ // If the roomserver responded with at least one server that isn't us,
+ // send it a request for backfill.
+ util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("limit", limit).Info("Backfilling from server")
+ txn, err := r.federation.Backfill(
+ r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ for _, p := range txn.PDUs {
+ pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1))
+ }
+ util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(pdus)).Info("Storing new events from backfill")
+
+ // Store the events in the database, while marking them as unfit to show
+ // up in responses to sync requests.
+ for _, pdu := range pdus {
+ headered := pdu.Headered(gomatrixserverlib.RoomVersionV1)
+ if _, err = r.db.WriteEvent(
+ r.ctx,
+ &headered,
+ []gomatrixserverlib.HeaderedEvent{},
+ []string{},
+ []string{},
+ nil, true,
+ ); err != nil {
+ return nil, err
+ }
+ }
+
+ return pdus, nil
+}
+
+func (r *messagesReq) serverToBackfillFrom(fromEventIDs []string) (gomatrixserverlib.ServerName, error) {
// Query the list of servers in the room when one of the backward extremities
// was sent.
var serversResponse api.QueryServersInRoomAtEventResponse
@@ -409,7 +412,33 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
EventID: fromEventIDs[0],
}
if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil {
- return nil, err
+ util.GetLogger(r.ctx).WithError(err).Warn("Failed to query servers in room at event, falling back to event sender")
+ // FIXME: We shouldn't be doing this but in situations where we have already backfilled once
+ // the query API doesn't work as backfilled events do not make it to the room server.
+ // This means QueryServersInRoomAtEvent returns an error as it doesn't have the event ID in question.
+ // We need to inject backfilled events into the room server and store them appropriately.
+ events, err := r.db.Events(r.ctx, fromEventIDs)
+ if err != nil {
+ return "", err
+ }
+ if len(events) == 0 {
+ // should be impossible as these event IDs are backwards extremities
+ return "", fmt.Errorf("backfill: missing backwards extremities, event IDs: %s", fromEventIDs)
+ }
+ // The rationale here is that the last event was unlikely to be sent by us, so poke the server who sent it.
+ // We shouldn't be doing this really, but as a heuristic it should work pretty well for now.
+ for _, e := range events {
+ _, srv, srverr := gomatrixserverlib.SplitID('@', e.Sender())
+ if srverr != nil {
+ util.GetLogger(r.ctx).WithError(srverr).Warn("Failed to extract domain from event sender")
+ continue
+ }
+ if srv != r.cfg.Matrix.ServerName {
+ return srv, nil
+ }
+ }
+ // no valid events which have a remote server, fail.
+ return "", err
}
// Use the first server from the response, except if that server is us.
@@ -423,45 +452,11 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
if len(serversResponse.Servers) > 1 {
srvToBackfillFrom = serversResponse.Servers[1]
} else {
- srvToBackfillFrom = gomatrixserverlib.ServerName("")
- log.Warn("Not enough servers to backfill from")
- }
- }
-
- pdus := make([]gomatrixserverlib.HeaderedEvent, 0)
-
- // If the roomserver responded with at least one server that isn't us,
- // send it a request for backfill.
- if len(srvToBackfillFrom) > 0 {
- txn, err := r.federation.Backfill(
- r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs,
- )
- if err != nil {
- return nil, err
- }
-
- for _, p := range txn.PDUs {
- pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1))
- }
-
- // Store the events in the database, while marking them as unfit to show
- // up in responses to sync requests.
- for _, pdu := range pdus {
- headered := pdu.Headered(gomatrixserverlib.RoomVersionV1)
- if _, err = r.db.WriteEvent(
- r.ctx,
- &headered,
- []gomatrixserverlib.HeaderedEvent{},
- []string{},
- []string{},
- nil, true,
- ); err != nil {
- return nil, err
- }
+ util.GetLogger(r.ctx).Info("Not enough servers to backfill from")
+ return "", nil
}
}
-
- return pdus, nil
+ return srvToBackfillFrom, nil
}
// setToDefault returns the default value for the "to" query parameter of a
@@ -475,7 +470,8 @@ func setToDefault(
roomID string,
) (to *types.PaginationToken, err error) {
if backwardOrdering {
- to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 1, 0)
+ // go 1 earlier than the first event so we correctly fetch the earliest event
+ to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 0, 0)
} else {
var pos types.StreamPosition
pos, err = db.MaxTopologicalPosition(ctx, roomID)
diff --git a/syncapi/storage/postgres/backward_extremities_table.go b/syncapi/storage/postgres/backward_extremities_table.go
index 8286ca43..b3ee28e0 100644
--- a/syncapi/storage/postgres/backward_extremities_table.go
+++ b/syncapi/storage/postgres/backward_extremities_table.go
@@ -21,39 +21,53 @@ import (
"github.com/matrix-org/dendrite/common"
)
+// The purpose of this table is to keep track of backwards extremities for a room.
+// Backwards extremities are the earliest (DAG-wise) known events which we have
+// the entire event JSON. These event IDs are used in federation requests to fetch
+// even earlier events.
+//
+// We persist the previous event IDs as well, one per row, so when we do fetch even
+// earlier events we can simply delete rows which referenced it. Consider the graph:
+// A
+// | Event C has 1 prev_event ID: A.
+// B C
+// |___| Event D has 2 prev_event IDs: B and C.
+// |
+// D
+// The earliest known event we have is D, so this table has 2 rows.
+// A backfill request gives us C but not B. We delete rows where prev_event=C. This
+// still means that D is a backwards extremity as we do not have event B. However, event
+// C is *also* a backwards extremity at this point as we do not have event A. Later,
+// when we fetch event B, we delete rows where prev_event=B. This then removes D as
+// a backwards extremity because there are no more rows with event_id=B.
const backwardExtremitiesSchema = `
-- Stores output room events received from the roomserver.
CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
-- The 'room_id' key for the event.
room_id TEXT NOT NULL,
- -- The event ID for the event.
+ -- The event ID for the last known event. This is the backwards extremity.
event_id TEXT NOT NULL,
+ -- The prev_events for the last known event. This is used to update extremities.
+ prev_event_id TEXT NOT NULL,
- PRIMARY KEY(room_id, event_id)
+ PRIMARY KEY(room_id, event_id, prev_event_id)
);
`
const insertBackwardExtremitySQL = "" +
- "INSERT INTO syncapi_backward_extremities (room_id, event_id)" +
- " VALUES ($1, $2)" +
+ "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
+ " VALUES ($1, $2, $3)" +
" ON CONFLICT DO NOTHING"
const selectBackwardExtremitiesForRoomSQL = "" +
"SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
-const isBackwardExtremitySQL = "" +
- "SELECT EXISTS (" +
- " SELECT TRUE FROM syncapi_backward_extremities" +
- " WHERE room_id = $1 AND event_id = $2" +
- ")"
-
const deleteBackwardExtremitySQL = "" +
- "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND event_id = $2"
+ "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
type backwardExtremitiesStatements struct {
insertBackwardExtremityStmt *sql.Stmt
selectBackwardExtremitiesForRoomStmt *sql.Stmt
- isBackwardExtremityStmt *sql.Stmt
deleteBackwardExtremityStmt *sql.Stmt
}
@@ -68,9 +82,6 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) {
if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil {
return
}
- if s.isBackwardExtremityStmt, err = db.Prepare(isBackwardExtremitySQL); err != nil {
- return
- }
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
return
}
@@ -78,17 +89,15 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) {
}
func (s *backwardExtremitiesStatements) insertsBackwardExtremity(
- ctx context.Context, roomID, eventID string,
+ ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string,
) (err error) {
- _, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID)
+ _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID)
return
}
func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
ctx context.Context, roomID string,
) (eventIDs []string, err error) {
- eventIDs = make([]string, 0)
-
rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
if err != nil {
return
@@ -107,16 +116,9 @@ func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
return eventIDs, rows.Err()
}
-func (s *backwardExtremitiesStatements) isBackwardExtremity(
- ctx context.Context, roomID, eventID string,
-) (isBE bool, err error) {
- err = s.isBackwardExtremityStmt.QueryRowContext(ctx, roomID, eventID).Scan(&isBE)
- return
-}
-
func (s *backwardExtremitiesStatements) deleteBackwardExtremity(
- ctx context.Context, roomID, eventID string,
+ ctx context.Context, txn *sql.Tx, roomID, knownEventID string,
) (err error) {
- _, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID)
+ _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
return
}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 5f9a1d0c..0b53dfa9 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -305,7 +305,6 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
} else {
stmt = common.TxStmt(txn, s.selectRecentEventsStmt)
}
-
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
if err != nil {
return nil, err
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index ccf1c565..f3f1aabc 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -111,22 +111,17 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([
return d.StreamEventsToEvents(nil, streamEvents), nil
}
-func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) error {
- // If the event is already known as a backward extremity, don't consider
- // it as such anymore now that we have it.
- isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, ev.RoomID(), ev.EventID())
- if err != nil {
+// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of
+// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
+// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
+func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
+ if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
return err
}
- if isBackwardExtremity {
- if err = d.backwardExtremities.deleteBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil {
- return err
- }
- }
// Check if we have all of the event's previous events. If an event is
// missing, add it to the room's backward extremities.
- prevEvents, err := d.events.selectEvents(ctx, nil, ev.PrevEventIDs())
+ prevEvents, err := d.events.selectEvents(ctx, txn, ev.PrevEventIDs())
if err != nil {
return err
}
@@ -141,7 +136,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev
// If the event is missing, consider it a backward extremity.
if !found {
- if err = d.backwardExtremities.insertsBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil {
+ if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil {
return err
}
}
@@ -174,7 +169,7 @@ func (d *SyncServerDatasource) WriteEvent(
return err
}
- if err = d.handleBackwardExtremities(ctx, ev); err != nil {
+ if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil {
return err
}
diff --git a/syncapi/storage/sqlite3/backward_extremities_table.go b/syncapi/storage/sqlite3/backward_extremities_table.go
index fcf15da2..0663e2a1 100644
--- a/syncapi/storage/sqlite3/backward_extremities_table.go
+++ b/syncapi/storage/sqlite3/backward_extremities_table.go
@@ -21,38 +21,53 @@ import (
"github.com/matrix-org/dendrite/common"
)
+// The purpose of this table is to keep track of backwards extremities for a room.
+// Backwards extremities are the earliest (DAG-wise) known events which we have
+// the entire event JSON. These event IDs are used in federation requests to fetch
+// even earlier events.
+//
+// We persist the previous event IDs as well, one per row, so when we do fetch even
+// earlier events we can simply delete rows which referenced it. Consider the graph:
+// A
+// | Event C has 1 prev_event ID: A.
+// B C
+// |___| Event D has 2 prev_event IDs: B and C.
+// |
+// D
+// The earliest known event we have is D, so this table has 2 rows.
+// A backfill request gives us C but not B. We delete rows where prev_event=C. This
+// still means that D is a backwards extremity as we do not have event B. However, event
+// C is *also* a backwards extremity at this point as we do not have event A. Later,
+// when we fetch event B, we delete rows where prev_event=B. This then removes D as
+// a backwards extremity because there are no more rows with event_id=B.
const backwardExtremitiesSchema = `
-- Stores output room events received from the roomserver.
CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
+ -- The 'room_id' key for the event.
room_id TEXT NOT NULL,
+ -- The event ID for the last known event. This is the backwards extremity.
event_id TEXT NOT NULL,
+ -- The prev_events for the last known event. This is used to update extremities.
+ prev_event_id TEXT NOT NULL,
- PRIMARY KEY(room_id, event_id)
+ PRIMARY KEY(room_id, event_id, prev_event_id)
);
`
const insertBackwardExtremitySQL = "" +
- "INSERT INTO syncapi_backward_extremities (room_id, event_id)" +
- " VALUES ($1, $2)" +
- " ON CONFLICT (room_id, event_id) DO NOTHING"
+ "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
+ " VALUES ($1, $2, $3)" +
+ " ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING"
const selectBackwardExtremitiesForRoomSQL = "" +
"SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
-const isBackwardExtremitySQL = "" +
- "SELECT EXISTS (" +
- " SELECT TRUE FROM syncapi_backward_extremities" +
- " WHERE room_id = $1 AND event_id = $2" +
- ")"
-
const deleteBackwardExtremitySQL = "" +
- "DELETE FROM syncapi_backward_extremities" +
- " WHERE room_id = $1 AND event_id = $2"
+ "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
type backwardExtremitiesStatements struct {
insertBackwardExtremityStmt *sql.Stmt
selectBackwardExtremitiesForRoomStmt *sql.Stmt
- isBackwardExtremityStmt *sql.Stmt
deleteBackwardExtremityStmt *sql.Stmt
}
@@ -67,9 +82,6 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) {
if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil {
return
}
- if s.isBackwardExtremityStmt, err = db.Prepare(isBackwardExtremitySQL); err != nil {
- return
- }
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
return
}
@@ -77,23 +89,20 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) {
}
func (s *backwardExtremitiesStatements) insertsBackwardExtremity(
- ctx context.Context, txn *sql.Tx, roomID, eventID string,
+ ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string,
) (err error) {
- stmt := common.TxStmt(txn, s.insertBackwardExtremityStmt)
- _, err = stmt.ExecContext(ctx, roomID, eventID)
+ _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID)
return
}
func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
- ctx context.Context, txn *sql.Tx, roomID string,
+ ctx context.Context, roomID string,
) (eventIDs []string, err error) {
- eventIDs = make([]string, 0)
-
- stmt := common.TxStmt(txn, s.selectBackwardExtremitiesForRoomStmt)
- rows, err := stmt.QueryContext(ctx, roomID)
+ rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
if err != nil {
return
}
+ defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed")
for rows.Next() {
var eID string
@@ -104,21 +113,12 @@ func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
eventIDs = append(eventIDs, eID)
}
- return
-}
-
-func (s *backwardExtremitiesStatements) isBackwardExtremity(
- ctx context.Context, txn *sql.Tx, roomID, eventID string,
-) (isBE bool, err error) {
- stmt := common.TxStmt(txn, s.isBackwardExtremityStmt)
- err = stmt.QueryRowContext(ctx, roomID, eventID).Scan(&isBE)
- return
+ return eventIDs, rows.Err()
}
func (s *backwardExtremitiesStatements) deleteBackwardExtremity(
- ctx context.Context, txn *sql.Tx, roomID, eventID string,
+ ctx context.Context, txn *sql.Tx, roomID, knownEventID string,
) (err error) {
- stmt := common.TxStmt(txn, s.deleteBackwardExtremityStmt)
- _, err = stmt.ExecContext(ctx, roomID, eventID)
+ _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
return
}
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go
index a06fc91f..8ff18900 100644
--- a/syncapi/storage/sqlite3/syncserver.go
+++ b/syncapi/storage/sqlite3/syncserver.go
@@ -137,18 +137,13 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([
return d.StreamEventsToEvents(nil, streamEvents), nil
}
+// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of
+// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
+// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
- // If the event is already known as a backward extremity, don't consider
- // it as such anymore now that we have it.
- isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID())
- if err != nil {
+ if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
return err
}
- if isBackwardExtremity {
- if err = d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
- return err
- }
- }
// Check if we have all of the event's previous events. If an event is
// missing, add it to the room's backward extremities.
@@ -167,7 +162,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx
// If the event is missing, consider it a backward extremity.
if !found {
- if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
+ if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil {
return err
}
}
@@ -348,7 +343,7 @@ func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.Pagi
func (d *SyncServerDatasource) BackwardExtremitiesForRoom(
ctx context.Context, roomID string,
) (backwardExtremities []string, err error) {
- return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, nil, roomID)
+ return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, roomID)
}
// MaxTopologicalPosition returns the highest topological position for a given
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index b4ccbd27..69efd8aa 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -47,7 +47,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
var syncData *types.Response
// Extract values from request
- logger := util.GetLogger(req.Context())
userID := device.UserID
syncReq, err := newSyncRequest(req, *device)
if err != nil {
@@ -56,20 +55,21 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
JSON: jsonerror.Unknown(err.Error()),
}
}
- logger.WithFields(log.Fields{
+ logger := util.GetLogger(req.Context()).WithFields(log.Fields{
"userID": userID,
"since": syncReq.since,
"timeout": syncReq.timeout,
- }).Info("Incoming /sync request")
+ })
currPos := rp.notifier.CurrentPosition()
if shouldReturnImmediately(syncReq) {
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("rp.currentSyncForUser failed")
+ logger.WithError(err).Error("rp.currentSyncForUser failed")
return jsonerror.InternalServerError()
}
+ logger.WithField("next", syncData.NextBatch).Info("Responding immediately")
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncData,
@@ -107,7 +107,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
hasTimedOut = true
// Or for the request to be cancelled
case <-req.Context().Done():
- util.GetLogger(req.Context()).WithError(err).Error("request cancelled")
+ logger.WithError(err).Error("request cancelled")
return jsonerror.InternalServerError()
}
@@ -118,11 +118,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("rp.currentSyncForUser failed")
+ logger.WithError(err).Error("rp.currentSyncForUser failed")
return jsonerror.InternalServerError()
}
if !syncData.IsEmpty() || hasTimedOut {
+ logger.WithField("next", syncData.NextBatch).WithField("timed_out", hasTimedOut).Info("Responding")
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncData,