aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/sqlite3/syncserver.go
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-03-19 12:07:01 +0000
committerGitHub <noreply@github.com>2020-03-19 12:07:01 +0000
commitad5849d2224245e1277b8bc7d3e2104ed6061a99 (patch)
tree8551c3d1096a650ecca0cba0e5edf6ad91d078e0 /syncapi/storage/sqlite3/syncserver.go
parentbfbf96eec9152f61cb3e54154f1ed82148d82a8a (diff)
HeaderedEvents in sync API (#922)
* Use HeaderedEvent in syncapi * Update notifier test * Fix persisting headered event * Clean up unused API function * Fix overshadowed err from linter * Write headered JSON to invites table too * Rename event_json to headered_event_json in syncapi database schemae * Fix invites_table queries * Update QueryRoomVersionCapabilitiesResponse comment * Fix syncapi SQLite
Diffstat (limited to 'syncapi/storage/sqlite3/syncserver.go')
-rw-r--r--syncapi/storage/sqlite3/syncserver.go56
1 files changed, 29 insertions, 27 deletions
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go
index 0e84c8c8..221d9672 100644
--- a/syncapi/storage/sqlite3/syncserver.go
+++ b/syncapi/storage/sqlite3/syncserver.go
@@ -40,7 +40,7 @@ import (
type stateDelta struct {
roomID string
- stateEvents []gomatrixserverlib.Event
+ stateEvents []gomatrixserverlib.HeaderedEvent
membership string
// The PDU stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta.
@@ -126,7 +126,7 @@ func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[s
// If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events.
-func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) {
+func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) {
streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs)
if err != nil {
return nil, err
@@ -137,7 +137,7 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([
return d.StreamEventsToEvents(nil, streamEvents), nil
}
-func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.Event) error {
+func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
// If the event is already known as a backward extremity, don't consider
// it as such anymore now that we have it.
isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID())
@@ -181,8 +181,8 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx
// Returns an error if there was a problem inserting this event.
func (d *SyncServerDatasource) WriteEvent(
ctx context.Context,
- ev *gomatrixserverlib.Event,
- addStateEvents []gomatrixserverlib.Event,
+ ev *gomatrixserverlib.HeaderedEvent,
+ addStateEvents []gomatrixserverlib.HeaderedEvent,
addStateEventIDs, removeStateEventIDs []string,
transactionID *api.TransactionID, excludeFromSync bool,
) (pduPosition types.StreamPosition, returnErr error) {
@@ -218,7 +218,7 @@ func (d *SyncServerDatasource) WriteEvent(
func (d *SyncServerDatasource) updateRoomState(
ctx context.Context, txn *sql.Tx,
removedEventIDs []string,
- addedEvents []gomatrixserverlib.Event,
+ addedEvents []gomatrixserverlib.HeaderedEvent,
pduPosition types.StreamPosition,
) error {
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
@@ -254,7 +254,7 @@ func (d *SyncServerDatasource) updateRoomState(
// If there was an issue during the retrieval, returns an error
func (d *SyncServerDatasource) GetStateEvent(
ctx context.Context, roomID, evType, stateKey string,
-) (*gomatrixserverlib.Event, error) {
+) (*gomatrixserverlib.HeaderedEvent, error) {
return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey)
}
@@ -263,7 +263,7 @@ func (d *SyncServerDatasource) GetStateEvent(
// Returns an error if there was an issue with the retrieval.
func (d *SyncServerDatasource) GetStateEventsForRoom(
ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter,
-) (stateEvents []gomatrixserverlib.Event, err error) {
+) (stateEvents []gomatrixserverlib.HeaderedEvent, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart)
return err
@@ -633,8 +633,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
// Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs {
-
- var stateEvents []gomatrixserverlib.Event
+ var stateEvents []gomatrixserverlib.HeaderedEvent
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilterPart)
if err != nil {
return
@@ -665,14 +664,13 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
// We don't include a device here as we don't need to send down
// transaction IDs for complete syncs
recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents)
- stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeTopology, backwardTopologyPos, 0,
).String()
- jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true
- jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
+ jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr
}
@@ -748,7 +746,7 @@ func (d *SyncServerDatasource) UpsertAccountData(
// If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database.
func (d *SyncServerDatasource) AddInviteEvent(
- ctx context.Context, inviteEvent gomatrixserverlib.Event,
+ ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
) (streamPos types.StreamPosition, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
streamPos, err = d.streamID.nextStreamID(ctx, txn)
@@ -806,7 +804,7 @@ func (d *SyncServerDatasource) addInvitesToResponse(
for roomID, inviteEvent := range invites {
ir := types.NewInviteResponse()
ir.InviteState.Events = gomatrixserverlib.ToClientEvents(
- []gomatrixserverlib.Event{inviteEvent}, gomatrixserverlib.FormatSync,
+ []gomatrixserverlib.Event{inviteEvent.Event}, gomatrixserverlib.FormatSync,
)
// TODO: add the invite state from the invite event.
res.Rooms.Invite[roomID] = *ir
@@ -858,8 +856,12 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
if err != nil {
return err
}
- recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
- delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
+ headeredRecentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
+ var recentEvents []gomatrixserverlib.Event
+ for _, event := range d.StreamEventsToEvents(nil, recentStreamEvents) {
+ recentEvents = append(recentEvents, event.Event)
+ }
+ delta.stateEvents = removeDuplicates(delta.stateEvents, headeredRecentEvents)
backwardTopologyPos := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents)
switch delta.membership {
@@ -871,7 +873,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
).String()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
- jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
+ jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr
case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban
@@ -884,7 +886,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
).String()
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
- lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
+ lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.roomID] = *lr
}
@@ -1013,7 +1015,7 @@ func (d *SyncServerDatasource) getStateDeltas(
// dupe join events will result in the entire room state coming down to the client again. This is added in
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
// the timeline.
- if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
+ if membership := getMembershipFromEvent(&ev.HeaderedEvent, userID); membership != "" {
if membership == gomatrixserverlib.Join {
// send full room state down instead of a delta
var s []types.StreamEvent
@@ -1094,7 +1096,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
for roomID, stateStreamEvents := range state {
for _, ev := range stateStreamEvents {
- if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
+ if membership := getMembershipFromEvent(&ev.HeaderedEvent, userID); membership != "" {
if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above.
deltas = append(deltas, stateDelta{
membership: membership,
@@ -1122,7 +1124,7 @@ func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
}
s := make([]types.StreamEvent, len(allState))
for i := 0; i < len(s); i++ {
- s[i] = types.StreamEvent{Event: allState[i], StreamPosition: 0}
+ s[i] = types.StreamEvent{HeaderedEvent: allState[i], StreamPosition: 0}
}
return s, nil
}
@@ -1130,10 +1132,10 @@ func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
// matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event.
-func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.Event {
- out := make([]gomatrixserverlib.Event, len(in))
+func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent {
+ out := make([]gomatrixserverlib.HeaderedEvent, len(in))
for i := 0; i < len(in); i++ {
- out[i] = in[i].Event
+ out[i] = in[i].HeaderedEvent
if device != nil && in[i].TransactionID != nil {
if device.UserID == in[i].Sender() && device.SessionID == in[i].TransactionID.SessionID {
err := out[i].SetUnsignedField(
@@ -1153,7 +1155,7 @@ func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
// them out so we don't include them twice in the /sync response. They should be in recentEvents
// only, so clients get to the correct state once they have rolled forward.
-func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gomatrixserverlib.Event {
+func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
for _, recentEv := range recentEvents {
if recentEv.StateKey() == nil {
continue // not a state event
@@ -1177,7 +1179,7 @@ func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gom
// getMembershipFromEvent returns the value of content.membership iff the event is a state event
// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
-func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
+func getMembershipFromEvent(ev *gomatrixserverlib.HeaderedEvent, userID string) string {
if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
membership, err := ev.Membership()
if err != nil {