aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMark Haines <mjark@negativecurvature.net>2017-08-08 16:38:03 +0100
committerGitHub <noreply@github.com>2017-08-08 16:38:03 +0100
commit2071387f3cc5f9ee35845e398c417fd7cc3bfdaa (patch)
tree2b368f46d94c9aec681d92e821f58da2ac7adb77 /src
parentc35803c9d8c1df024489166384c87c5a0c7eb4a2 (diff)
Add tables for tracking the state of invites to the room server. (#165)
* Storage functions for invite events * Add table for tracking membership state * More stuff * More stuff * Use utility methods from gomatrixserverlib, rather than reimplementing them * More stuff * Return string rather than pointer to string * Update gomatrixserverlib * Use HTTP API for roomserver input. * Use synchronous HTTP API for writing events to the roomserver * Remove unused config for kafka topic * Add new output types to roomserver for invites * Write membership updates * Separate filtering from pairing up changes in membershipChanges * Fix SQL * Fix SQL * Namespace the tables * Fix SQL * Use clearer names for some of the variables * Rename senderID for consistency * Restructure update membership * Comments * More comment * Fix SQL * More comments * Assign state keys inside the transaction * Comment on the purpose of the latestEventsUpdater * Comment on the purpose of updateMembership * Remove duplicate fields from stateChange * Attempt to rewrite comment in 'english' * More comments * Fix comment * Comment * more comments
Diffstat (limited to 'src')
-rw-r--r--src/github.com/matrix-org/dendrite/roomserver/api/output.go37
-rw-r--r--src/github.com/matrix-org/dendrite/roomserver/input/events.go4
-rw-r--r--src/github.com/matrix-org/dendrite/roomserver/input/input.go29
-rw-r--r--src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go162
-rw-r--r--src/github.com/matrix-org/dendrite/roomserver/input/membership.go297
-rw-r--r--src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go16
-rw-r--r--src/github.com/matrix-org/dendrite/roomserver/storage/invite_table.go149
-rw-r--r--src/github.com/matrix-org/dendrite/roomserver/storage/membership_table.go111
-rw-r--r--src/github.com/matrix-org/dendrite/roomserver/storage/sql.go4
-rw-r--r--src/github.com/matrix-org/dendrite/roomserver/storage/storage.go149
-rw-r--r--src/github.com/matrix-org/dendrite/roomserver/types/types.go44
11 files changed, 898 insertions, 104 deletions
diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/output.go b/src/github.com/matrix-org/dendrite/roomserver/api/output.go
index f1b40231..953fe3c8 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/api/output.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/api/output.go
@@ -21,8 +21,14 @@ import (
// An OutputType is a type of roomserver output.
type OutputType string
-// OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent
-const OutputTypeNewRoomEvent OutputType = "new_room_event"
+const (
+ // OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent
+ OutputTypeNewRoomEvent OutputType = "new_room_event"
+ // OutputTypeNewInviteEvent indicates that the event is an OutputNewInviteEvent
+ OutputTypeNewInviteEvent OutputType = "new_invite_event"
+ // OutputTypeRetireInviteEvent indicates that the event is an OutputRetireInviteEvent
+ OutputTypeRetireInviteEvent OutputType = "retire_invite_event"
+)
// An OutputEvent is an entry in the roomserver output kafka log.
// Consumers should check the type field when consuming this event.
@@ -31,6 +37,10 @@ type OutputEvent struct {
Type OutputType `json:"type"`
// The content of event with type OutputTypeNewRoomEvent
NewRoomEvent *OutputNewRoomEvent `json:"new_room_event,omitempty"`
+ // The content of event with type OutputTypeNewInviteEvent
+ NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"`
+ // The content of event with type OutputTypeRetireInviteEvent
+ RetireInviteEvent *OutputRetireInviteEvent `json:"retire_invite_event,omitempty"`
}
// An OutputNewRoomEvent is written when the roomserver receives a new event.
@@ -98,3 +108,26 @@ type OutputNewRoomEvent struct {
// future proof the API for virtual hosting.
SendAsServer string `json:"send_as_server"`
}
+
+// An OutputNewInviteEvent is written whenever an invite becomes active.
+// Invite events can be received outside of an existing room so have to be
+// tracked separately from the room events themselves.
+type OutputNewInviteEvent struct {
+ // The "m.room.member" invite event.
+ Event gomatrixserverlib.Event `json:"event"`
+}
+
+// An OutputRetireInviteEvent is written whenever an existing invite is no longer
+// active. An invite stops being active if the user joins the room or if the
+// invite is rejected by the user.
+type OutputRetireInviteEvent struct {
+ // The ID of the "m.room.member" invite event.
+ EventID string
+ // Optional event ID of the event that replaced the invite.
+ // This can be empty if the invite was rejected locally and we were unable
+ // to reach the server that originally sent the invite.
+ RetiredByEventID string
+ // The "membership" of the user after retiring the invite. One of "join"
+ // "leave" or "ban".
+ Membership string
+}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go
index f8acff47..c1eee4c9 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go
@@ -43,8 +43,8 @@ type RoomEventDatabase interface {
// OutputRoomEventWriter has the APIs needed to write an event to the output logs.
type OutputRoomEventWriter interface {
- // Write an event.
- WriteOutputRoomEvent(output api.OutputNewRoomEvent) error
+ // Write a list of events for a room
+ WriteOutputEvents(roomID string, updates []api.OutputEvent) error
}
func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error {
diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/input.go b/src/github.com/matrix-org/dendrite/roomserver/input/input.go
index ffbebd0c..c8ac58d3 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/input/input.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/input/input.go
@@ -46,22 +46,21 @@ type RoomserverInputAPI struct {
processed int64
}
-// WriteOutputRoomEvent implements OutputRoomEventWriter
-func (r *RoomserverInputAPI) WriteOutputRoomEvent(output api.OutputNewRoomEvent) error {
- var m sarama.ProducerMessage
- oe := api.OutputEvent{
- Type: api.OutputTypeNewRoomEvent,
- NewRoomEvent: &output,
- }
- value, err := json.Marshal(oe)
- if err != nil {
- return err
+// WriteOutputEvents implements OutputRoomEventWriter
+func (r *RoomserverInputAPI) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
+ messages := make([]*sarama.ProducerMessage, len(updates))
+ for i := range updates {
+ value, err := json.Marshal(updates[i])
+ if err != nil {
+ return err
+ }
+ messages[i] = &sarama.ProducerMessage{
+ Topic: r.OutputRoomEventTopic,
+ Key: sarama.StringEncoder(roomID),
+ Value: sarama.ByteEncoder(value),
+ }
}
- m.Topic = r.OutputRoomEventTopic
- m.Key = sarama.StringEncoder("")
- m.Value = sarama.ByteEncoder(value)
- _, _, err = r.Producer.SendMessage(&m)
- return err
+ return r.Producer.SendMessages(messages)
}
// InputRoomEvents implements api.RoomserverInputAPI
diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go
index 6b5f3967..9328ecf3 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go
@@ -66,69 +66,88 @@ func updateLatestEvents(
}
}()
- err = doUpdateLatestEvents(db, updater, ow, roomNID, stateAtEvent, event, sendAsServer)
- return
+ u := latestEventsUpdater{
+ db: db, updater: updater, ow: ow, roomNID: roomNID,
+ stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer,
+ }
+ return u.doUpdateLatestEvents()
}
-func doUpdateLatestEvents(
- db RoomEventDatabase,
- updater types.RoomRecentEventsUpdater,
- ow OutputRoomEventWriter,
- roomNID types.RoomNID,
- stateAtEvent types.StateAtEvent,
- event gomatrixserverlib.Event,
- sendAsServer string,
-) error {
+// latestEventsUpdater tracks the state used to update the latest events in the
+// room. It mostly just ferries state between the various function calls.
+// The state could be passed using function arguments, but it becomes impractical
+// when there are so many variables to pass around.
+type latestEventsUpdater struct {
+ db RoomEventDatabase
+ updater types.RoomRecentEventsUpdater
+ ow OutputRoomEventWriter
+ roomNID types.RoomNID
+ stateAtEvent types.StateAtEvent
+ event gomatrixserverlib.Event
+ // Which server to send this event as.
+ sendAsServer string
+ // The eventID of the event that was processed before this one.
+ lastEventIDSent string
+ // The latest events in the room after processing this event.
+ latest []types.StateAtEventAndReference
+ // The state entries removed from and added to the current state of the
+ // room as a result of processing this event. They are sorted lists.
+ removed []types.StateEntry
+ added []types.StateEntry
+ // The state entries that are removed and added to recover the state before
+ // the event being processed. They are sorted lists.
+ stateBeforeEventRemoves []types.StateEntry
+ stateBeforeEventAdds []types.StateEntry
+ // The snapshots of current state before and after processing this event
+ oldStateNID types.StateSnapshotNID
+ newStateNID types.StateSnapshotNID
+}
+
+func (u *latestEventsUpdater) doUpdateLatestEvents() error {
var err error
var prevEvents []gomatrixserverlib.EventReference
- prevEvents = event.PrevEvents()
- oldLatest := updater.LatestEvents()
- lastEventIDSent := updater.LastEventIDSent()
- oldStateNID := updater.CurrentStateSnapshotNID()
+ prevEvents = u.event.PrevEvents()
+ oldLatest := u.updater.LatestEvents()
+ u.lastEventIDSent = u.updater.LastEventIDSent()
+ u.oldStateNID = u.updater.CurrentStateSnapshotNID()
- if hasBeenSent, err := updater.HasEventBeenSent(stateAtEvent.EventNID); err != nil {
+ if hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID); err != nil {
return err
} else if hasBeenSent {
// Already sent this event so we can stop processing
return nil
}
- if err = updater.StorePreviousEvents(stateAtEvent.EventNID, prevEvents); err != nil {
+ if err = u.updater.StorePreviousEvents(u.stateAtEvent.EventNID, prevEvents); err != nil {
return err
}
- eventReference := event.EventReference()
+ eventReference := u.event.EventReference()
// Check if this event is already referenced by another event in the room.
var alreadyReferenced bool
- if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil {
+ if alreadyReferenced, err = u.updater.IsReferenced(eventReference); err != nil {
return err
}
- newLatest := calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{
+ u.latest = calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{
EventReference: eventReference,
- StateAtEvent: stateAtEvent,
+ StateAtEvent: u.stateAtEvent,
})
- latestStateAtEvents := make([]types.StateAtEvent, len(newLatest))
- for i := range newLatest {
- latestStateAtEvents[i] = newLatest[i].StateAtEvent
- }
- newStateNID, err := state.CalculateAndStoreStateAfterEvents(db, roomNID, latestStateAtEvents)
- if err != nil {
+ if err = u.latestState(); err != nil {
return err
}
- removed, added, err := state.DifferenceBetweeenStateSnapshots(db, oldStateNID, newStateNID)
+ updates, err := updateMemberships(u.db, u.updater, u.removed, u.added)
if err != nil {
return err
}
- stateBeforeEventRemoves, stateBeforeEventAdds, err := state.DifferenceBetweeenStateSnapshots(
- db, newStateNID, stateAtEvent.BeforeStateSnapshotNID,
- )
+ update, err := u.makeOutputNewRoomEvent()
if err != nil {
return err
}
+ updates = append(updates, *update)
// Send the event to the output logs.
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
@@ -138,24 +157,47 @@ func doUpdateLatestEvents(
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
// necessary bookkeeping we'll keep the event sending synchronous for now.
- if err = writeEvent(
- db, ow, lastEventIDSent, event, newLatest, removed, added,
- stateBeforeEventRemoves, stateBeforeEventAdds, sendAsServer,
- ); err != nil {
+ if err = u.ow.WriteOutputEvents(u.event.RoomID(), updates); err != nil {
return err
}
- if err = updater.SetLatestEvents(roomNID, newLatest, stateAtEvent.EventNID, newStateNID); err != nil {
+ if err = u.updater.SetLatestEvents(u.roomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil {
return err
}
- if err = updater.MarkEventAsSent(stateAtEvent.EventNID); err != nil {
+ if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil {
return err
}
return nil
}
+func (u *latestEventsUpdater) latestState() error {
+ var err error
+
+ latestStateAtEvents := make([]types.StateAtEvent, len(u.latest))
+ for i := range u.latest {
+ latestStateAtEvents[i] = u.latest[i].StateAtEvent
+ }
+ u.newStateNID, err = state.CalculateAndStoreStateAfterEvents(u.db, u.roomNID, latestStateAtEvents)
+ if err != nil {
+ return err
+ }
+
+ u.removed, u.added, err = state.DifferenceBetweeenStateSnapshots(u.db, u.oldStateNID, u.newStateNID)
+ if err != nil {
+ return err
+ }
+
+ u.stateBeforeEventRemoves, u.stateBeforeEventAdds, err = state.DifferenceBetweeenStateSnapshots(
+ u.db, u.newStateNID, u.stateAtEvent.BeforeStateSnapshotNID,
+ )
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenced bool, prevEvents []gomatrixserverlib.EventReference, newEvent types.StateAtEventAndReference) []types.StateAtEventAndReference {
var alreadyInLatest bool
var newLatest []types.StateAtEventAndReference
@@ -189,57 +231,55 @@ func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenc
return newLatest
}
-func writeEvent(
- db RoomEventDatabase, ow OutputRoomEventWriter, lastEventIDSent string,
- event gomatrixserverlib.Event, latest []types.StateAtEventAndReference,
- removed, added []types.StateEntry,
- stateBeforeEventRemoves, stateBeforeEventAdds []types.StateEntry,
- sendAsServer string,
-) error {
+func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {
- latestEventIDs := make([]string, len(latest))
- for i := range latest {
- latestEventIDs[i] = latest[i].EventID
+ latestEventIDs := make([]string, len(u.latest))
+ for i := range u.latest {
+ latestEventIDs[i] = u.latest[i].EventID
}
ore := api.OutputNewRoomEvent{
- Event: event,
- LastSentEventID: lastEventIDSent,
+ Event: u.event,
+ LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs,
}
var stateEventNIDs []types.EventNID
- for _, entry := range added {
+ for _, entry := range u.added {
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
- for _, entry := range removed {
+ for _, entry := range u.removed {
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
- for _, entry := range stateBeforeEventRemoves {
+ for _, entry := range u.stateBeforeEventRemoves {
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
- for _, entry := range stateBeforeEventAdds {
+ for _, entry := range u.stateBeforeEventAdds {
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
- eventIDMap, err := db.EventIDs(stateEventNIDs)
+ eventIDMap, err := u.db.EventIDs(stateEventNIDs)
if err != nil {
- return err
+ return nil, err
}
- for _, entry := range added {
+ for _, entry := range u.added {
ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID])
}
- for _, entry := range removed {
+ for _, entry := range u.removed {
ore.RemovesStateEventIDs = append(ore.RemovesStateEventIDs, eventIDMap[entry.EventNID])
}
- for _, entry := range stateBeforeEventRemoves {
+ for _, entry := range u.stateBeforeEventRemoves {
ore.StateBeforeRemovesEventIDs = append(ore.StateBeforeRemovesEventIDs, eventIDMap[entry.EventNID])
}
- for _, entry := range stateBeforeEventAdds {
+ for _, entry := range u.stateBeforeEventAdds {
ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID])
}
- ore.SendAsServer = sendAsServer
- return ow.WriteOutputRoomEvent(ore)
+ ore.SendAsServer = u.sendAsServer
+
+ return &api.OutputEvent{
+ Type: api.OutputTypeNewRoomEvent,
+ NewRoomEvent: &ore,
+ }, nil
}
type eventNIDSorter []types.EventNID
diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/membership.go b/src/github.com/matrix-org/dendrite/roomserver/input/membership.go
new file mode 100644
index 00000000..f306697f
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/roomserver/input/membership.go
@@ -0,0 +1,297 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package input
+
+import (
+ "fmt"
+
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/roomserver/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// updateMembership updates the current membership and the invites for each
+// user affected by a change in the current state of the room.
+// Returns a list of output events to write to the kafka log to inform the
+// consumers about the invites added or retired by the change in current state.
+func updateMemberships(
+ db RoomEventDatabase, updater types.RoomRecentEventsUpdater, removed, added []types.StateEntry,
+) ([]api.OutputEvent, error) {
+ changes := membershipChanges(removed, added)
+ var eventNIDs []types.EventNID
+ for _, change := range changes {
+ if change.addedEventNID != 0 {
+ eventNIDs = append(eventNIDs, change.addedEventNID)
+ }
+ if change.removedEventNID != 0 {
+ eventNIDs = append(eventNIDs, change.removedEventNID)
+ }
+ }
+
+ // Load the event JSON so we can look up the "membership" key.
+ // TODO: Maybe add a membership key to the events table so we can load that
+ // key without having to load the entire event JSON?
+ events, err := db.Events(eventNIDs)
+ if err != nil {
+ return nil, err
+ }
+
+ var updates []api.OutputEvent
+
+ for _, change := range changes {
+ var ae *gomatrixserverlib.Event
+ var re *gomatrixserverlib.Event
+ targetUserNID := change.EventStateKeyNID
+ if change.removedEventNID != 0 {
+ ev, _ := eventMap(events).lookup(change.removedEventNID)
+ if ev != nil {
+ re = &ev.Event
+ }
+ }
+ if change.addedEventNID != 0 {
+ ev, _ := eventMap(events).lookup(change.addedEventNID)
+ if ev != nil {
+ ae = &ev.Event
+ }
+ }
+ if updates, err = updateMembership(updater, targetUserNID, re, ae, updates); err != nil {
+ return nil, err
+ }
+ }
+ return nil, nil
+}
+
+func updateMembership(
+ updater types.RoomRecentEventsUpdater, targetUserNID types.EventStateKeyNID,
+ remove, add *gomatrixserverlib.Event,
+ updates []api.OutputEvent,
+) ([]api.OutputEvent, error) {
+ var err error
+ // Default the membership to "leave" if no event was added or removed.
+ old := "leave"
+ new := "leave"
+
+ if remove != nil {
+ old, err = remove.Membership()
+ if err != nil {
+ return nil, err
+ }
+ }
+ if add != nil {
+ new, err = add.Membership()
+ if err != nil {
+ return nil, err
+ }
+ }
+ if old == new {
+ // If the membership is the same then nothing changed and we can return
+ // immediately. This should help speed up processing for display name
+ // changes where the membership is "join" both before and after.
+ return updates, nil
+ }
+
+ mu, err := updater.MembershipUpdater(targetUserNID)
+ if err != nil {
+ return nil, err
+ }
+
+ switch new {
+ case "invite":
+ return updateToInviteMembership(mu, add, updates)
+ case "join":
+ return updateToJoinMembership(mu, add, updates)
+ case "leave", "ban":
+ return updateToLeaveMembership(mu, add, new, updates)
+ default:
+ panic(fmt.Errorf(
+ "input: membership %q is not one of the allowed values", new,
+ ))
+ }
+}
+
+func updateToInviteMembership(
+ mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
+) ([]api.OutputEvent, error) {
+ // We may have already sent the invite to the user, either because we are
+ // reprocessing this event, or because the we received this invite from a
+ // remote server via the federation invite API. In those cases we don't need
+ // to send the event.
+ needsSending, err := mu.SetToInvite(*add)
+ if err != nil {
+ return nil, err
+ }
+ if needsSending {
+ // We notify the consumers using a special event even though we will
+ // notify them about the change in current state as part of the normal
+ // room event stream. This ensures that the consumers only have to
+ // consider a single stream of events when determining whether a user
+ // is invited, rather than having to combine multiple streams themselves.
+ onie := api.OutputNewInviteEvent{
+ Event: *add,
+ }
+ updates = append(updates, api.OutputEvent{
+ Type: api.OutputTypeNewInviteEvent,
+ NewInviteEvent: &onie,
+ })
+ }
+ return updates, nil
+}
+
+func updateToJoinMembership(
+ mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
+) ([]api.OutputEvent, error) {
+ // If the user is already marked as being joined then we can return immediately.
+ // TODO: Is this code reachable given the "old != new" guard in updateMembership?
+ if mu.IsJoin() {
+ return updates, nil
+ }
+ // When we mark a user as being joined we will invalidate any invites that
+ // are active for that user. We notify the consumers that the invites have
+ // been retired using a special event, even though they could infer this
+ // by studying the state changes in the room event stream.
+ retired, err := mu.SetToJoin(add.Sender())
+ if err != nil {
+ return nil, err
+ }
+ for _, eventID := range retired {
+ orie := api.OutputRetireInviteEvent{
+ EventID: eventID,
+ Membership: "join",
+ }
+ if add != nil {
+ orie.RetiredByEventID = add.EventID()
+ }
+ updates = append(updates, api.OutputEvent{
+ Type: api.OutputTypeRetireInviteEvent,
+ RetireInviteEvent: &orie,
+ })
+ }
+ return updates, nil
+}
+
+func updateToLeaveMembership(
+ mu types.MembershipUpdater, add *gomatrixserverlib.Event,
+ newMembership string, updates []api.OutputEvent,
+) ([]api.OutputEvent, error) {
+ // If the user is already neither joined, nor invited to the room then we
+ // can return immediately.
+ if mu.IsLeave() {
+ return updates, nil
+ }
+ // When we mark a user as having left we will invalidate any invites that
+ // are active for that user. We notify the consumers that the invites have
+ // been retired using a special event, even though they could infer this
+ // by studying the state changes in the room event stream.
+ retired, err := mu.SetToLeave(add.Sender())
+ if err != nil {
+ return nil, err
+ }
+ for _, eventID := range retired {
+ orie := api.OutputRetireInviteEvent{
+ EventID: eventID,
+ Membership: newMembership,
+ }
+ if add != nil {
+ orie.RetiredByEventID = add.EventID()
+ }
+ updates = append(updates, api.OutputEvent{
+ Type: api.OutputTypeRetireInviteEvent,
+ RetireInviteEvent: &orie,
+ })
+ }
+ return updates, nil
+}
+
+// membershipChanges pairs up the membership state changes from a sorted list
+// of state removed and a sorted list of state added.
+func membershipChanges(removed, added []types.StateEntry) []stateChange {
+ changes := pairUpChanges(removed, added)
+ var result []stateChange
+ for _, c := range changes {
+ if c.EventTypeNID == types.MRoomMemberNID {
+ result = append(result, c)
+ }
+ }
+ return result
+}
+
+type stateChange struct {
+ types.StateKeyTuple
+ removedEventNID types.EventNID
+ addedEventNID types.EventNID
+}
+
+// pairUpChanges pairs up the state events added and removed for each type,
+// state key tuple. Assumes that removed and added are sorted.
+func pairUpChanges(removed, added []types.StateEntry) []stateChange {
+ var ai int
+ var ri int
+ var result []stateChange
+ for {
+ switch {
+ case ai == len(added):
+ // We've reached the end of the added entries.
+ // The rest of the removed list are events that were removed without
+ // an event with the same state key being added.
+ for _, s := range removed[ri:] {
+ result = append(result, stateChange{
+ StateKeyTuple: s.StateKeyTuple,
+ removedEventNID: s.EventNID,
+ })
+ }
+ return result
+ case ri == len(removed):
+ // We've reached the end of the removed entries.
+ // The rest of the added list are events that were added without
+ // an event with the same state key being removed.
+ for _, s := range added[ai:] {
+ result = append(result, stateChange{
+ StateKeyTuple: s.StateKeyTuple,
+ addedEventNID: s.EventNID,
+ })
+ }
+ return result
+ case added[ai].StateKeyTuple == removed[ri].StateKeyTuple:
+ // The tuple is in both lists so an event with that key is being
+ // removed and another event with the same key is being added.
+ result = append(result, stateChange{
+ StateKeyTuple: added[ai].StateKeyTuple,
+ removedEventNID: removed[ri].EventNID,
+ addedEventNID: added[ai].EventNID,
+ })
+ ai++
+ ri++
+ case added[ai].StateKeyTuple.LessThan(removed[ri].StateKeyTuple):
+ // The lists are sorted so the added entry being less than the
+ // removed entry means that the added event was added without an
+ // event with the same key being removed.
+ result = append(result, stateChange{
+ StateKeyTuple: added[ai].StateKeyTuple,
+ addedEventNID: added[ai].EventNID,
+ })
+ ai++
+ default:
+ // Reaching the default case implies that the removed entry is less
+ // than the added entry. Since the lists are sorted this means that
+ // the removed event was removed without an event with the same
+ // key being added.
+ result = append(result, stateChange{
+ StateKeyTuple: removed[ai].StateKeyTuple,
+ removedEventNID: removed[ri].EventNID,
+ })
+ ri++
+ }
+ }
+}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go
index da5b8a4e..d30e4581 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go
@@ -76,15 +76,23 @@ func (s *eventStateKeyStatements) prepare(db *sql.DB) (err error) {
}.prepare(db)
}
-func (s *eventStateKeyStatements) insertEventStateKeyNID(eventStateKey string) (types.EventStateKeyNID, error) {
+func (s *eventStateKeyStatements) insertEventStateKeyNID(txn *sql.Tx, eventStateKey string) (types.EventStateKeyNID, error) {
var eventStateKeyNID int64
- err := s.insertEventStateKeyNIDStmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID)
+ stmt := s.insertEventStateKeyNIDStmt
+ if txn != nil {
+ stmt = txn.Stmt(stmt)
+ }
+ err := stmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID)
return types.EventStateKeyNID(eventStateKeyNID), err
}
-func (s *eventStateKeyStatements) selectEventStateKeyNID(eventStateKey string) (types.EventStateKeyNID, error) {
+func (s *eventStateKeyStatements) selectEventStateKeyNID(txn *sql.Tx, eventStateKey string) (types.EventStateKeyNID, error) {
var eventStateKeyNID int64
- err := s.selectEventStateKeyNIDStmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID)
+ stmt := s.selectEventStateKeyNIDStmt
+ if txn != nil {
+ stmt = txn.Stmt(stmt)
+ }
+ err := stmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID)
return types.EventStateKeyNID(eventStateKeyNID), err
}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/invite_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/invite_table.go
new file mode 100644
index 00000000..9e0860b4
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/roomserver/storage/invite_table.go
@@ -0,0 +1,149 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/roomserver/types"
+)
+
+const inviteSchema = `
+CREATE TABLE IF NOT EXISTS roomserver_invites (
+ -- The string ID of the invite event itself.
+ -- We can't use a numeric event ID here because we don't always have
+ -- enough information to store an invite in the event table.
+ -- In particular we don't always have a chain of auth_events for invites
+ -- received over federation.
+ invite_event_id TEXT PRIMARY KEY,
+ -- The numeric ID of the room the invite m.room.member event is in.
+ room_nid BIGINT NOT NULL,
+ -- The numeric ID for the state key of the invite m.room.member event.
+ -- This tells us who the invite is for.
+ -- This is used to query the active invites for a user.
+ target_nid BIGINT NOT NULL,
+ -- The numeric ID for the sender of the invite m.room.member event.
+ -- This tells us who sent the invite.
+ -- This is used to work out which matrix server we should talk to when
+ -- we try to join the room.
+ sender_nid BIGINT NOT NULL DEFAULT 0,
+ -- This is used to track whether the invite is still active.
+ -- This is set implicitly when processing new join and leave events and
+ -- explicitly when rejecting events over federation.
+ retired BOOLEAN NOT NULL DEFAULT FALSE,
+ -- The invite event JSON.
+ invite_event_json TEXT NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS roomserver_invites_active_idx ON roomserver_invites (target_nid, room_nid)
+ WHERE NOT retired;
+`
+const insertInviteEventSQL = "" +
+ "INSERT INTO roomserver_invites (invite_event_id, room_nid, target_nid," +
+ " sender_nid, invite_event_json) VALUES ($1, $2, $3, $4, $5)" +
+ " ON CONFLICT DO NOTHING"
+
+const selectInviteActiveForUserInRoomSQL = "" +
+ "SELECT sender_nid FROM roomserver_invites" +
+ " WHERE target_nid = $1 AND room_nid = $2" +
+ " AND NOT retired"
+
+// Retire every active invite for a user in a room.
+// Ideally we'd know which invite events were retired by a given update so we
+// wouldn't need to remove every active invite.
+// However the matrix protocol doesn't give us a way to reliably identify the
+// invites that were retired, so we are forced to retire all of them.
+const updateInviteRetiredSQL = "" +
+ "UPDATE roomserver_invites SET retired = TRUE" +
+ " WHERE room_nid = $1 AND target_nid = $2 AND NOT retired" +
+ " RETURNING invite_event_id"
+
+type inviteStatements struct {
+ insertInviteEventStmt *sql.Stmt
+ selectInviteActiveForUserInRoomStmt *sql.Stmt
+ updateInviteRetiredStmt *sql.Stmt
+}
+
+func (s *inviteStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(inviteSchema)
+ if err != nil {
+ return
+ }
+
+ return statementList{
+ {&s.insertInviteEventStmt, insertInviteEventSQL},
+ {&s.selectInviteActiveForUserInRoomStmt, selectInviteActiveForUserInRoomSQL},
+ {&s.updateInviteRetiredStmt, updateInviteRetiredSQL},
+ }.prepare(db)
+}
+
+func (s *inviteStatements) insertInviteEvent(
+ txn *sql.Tx, inviteEventID string, roomNID types.RoomNID,
+ targetUserNID, senderUserNID types.EventStateKeyNID,
+ inviteEventJSON []byte,
+) (bool, error) {
+ result, err := txn.Stmt(s.insertInviteEventStmt).Exec(
+ inviteEventID, roomNID, targetUserNID, senderUserNID, inviteEventJSON,
+ )
+ if err != nil {
+ return false, err
+ }
+ count, err := result.RowsAffected()
+ if err != nil {
+ return false, err
+ }
+ return count != 0, nil
+}
+
+func (s *inviteStatements) updateInviteRetired(
+ txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
+) ([]string, error) {
+ rows, err := txn.Stmt(s.updateInviteRetiredStmt).Query(roomNID, targetUserNID)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ var result []string
+ for rows.Next() {
+ var inviteEventID string
+ if err := rows.Scan(&inviteEventID); err != nil {
+ return nil, err
+ }
+ result = append(result, inviteEventID)
+ }
+ return result, nil
+}
+
+// selectInviteActiveForUserInRoom returns a list of sender state key NIDs
+func (s *inviteStatements) selectInviteActiveForUserInRoom(
+ targetUserNID types.EventStateKeyNID, roomNID types.RoomNID,
+) ([]types.EventStateKeyNID, error) {
+ rows, err := s.selectInviteActiveForUserInRoomStmt.Query(
+ targetUserNID, roomNID,
+ )
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ var result []types.EventStateKeyNID
+ for rows.Next() {
+ var senderUserNID int64
+ if err := rows.Scan(&senderUserNID); err != nil {
+ return nil, err
+ }
+ result = append(result, types.EventStateKeyNID(senderUserNID))
+ }
+ return result, nil
+}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/membership_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/membership_table.go
new file mode 100644
index 00000000..725e5b8d
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/roomserver/storage/membership_table.go
@@ -0,0 +1,111 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/roomserver/types"
+)
+
+type membershipState int64
+
+const (
+ membershipStateLeaveOrBan membershipState = 1
+ membershipStateInvite membershipState = 2
+ membershipStateJoin membershipState = 3
+)
+
+const membershipSchema = `
+-- The membership table is used to coordinate updates between the invite table
+-- and the room state tables.
+-- This table is updated in one of 3 ways:
+-- 1) The membership of a user changes within the current state of the room.
+-- 2) An invite is received outside of a room over federation.
+-- 3) An invite is rejected outside of a room over federation.
+CREATE TABLE IF NOT EXISTS roomserver_membership (
+ room_nid BIGINT NOT NULL,
+ -- Numeric state key ID for the user ID this state is for.
+ target_nid BIGINT NOT NULL,
+ -- Numeric state key ID for the user ID who changed the state.
+ -- This may be 0 since it is not always possible to identify the user that
+ -- changed the state.
+ sender_nid BIGINT NOT NULL DEFAULT 0,
+ -- The state the user is in within this room.
+ -- Default value is "membershipStateLeaveOrBan"
+ membership_nid BIGINT NOT NULL DEFAULT 1,
+ UNIQUE (room_nid, target_nid)
+);
+`
+
+// Insert a row in to membership table so that it can be locked by the
+// SELECT FOR UPDATE
+const insertMembershipSQL = "" +
+ "INSERT INTO roomserver_membership (room_nid, target_nid)" +
+ " VALUES ($1, $2)" +
+ " ON CONFLICT DO NOTHING"
+
+const selectMembershipForUpdateSQL = "" +
+ "SELECT membership_nid FROM roomserver_membership" +
+ " WHERE room_nid = $1 AND target_nid = $2 FOR UPDATE"
+
+const updateMembershipSQL = "" +
+ "UPDATE roomserver_membership SET sender_nid = $3, membership_nid = $4" +
+ " WHERE room_nid = $1 AND target_nid = $2"
+
+type membershipStatements struct {
+ insertMembershipStmt *sql.Stmt
+ selectMembershipForUpdateStmt *sql.Stmt
+ updateMembershipStmt *sql.Stmt
+}
+
+func (s *membershipStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(membershipSchema)
+ if err != nil {
+ return
+ }
+
+ return statementList{
+ {&s.insertMembershipStmt, insertMembershipSQL},
+ {&s.selectMembershipForUpdateStmt, selectMembershipForUpdateSQL},
+ {&s.updateMembershipStmt, updateMembershipSQL},
+ }.prepare(db)
+}
+
+func (s *membershipStatements) insertMembership(
+ txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
+) error {
+ _, err := txn.Stmt(s.insertMembershipStmt).Exec(roomNID, targetUserNID)
+ return err
+}
+
+func (s *membershipStatements) selectMembershipForUpdate(
+ txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
+) (membership membershipState, err error) {
+ err = txn.Stmt(s.selectMembershipForUpdateStmt).QueryRow(
+ roomNID, targetUserNID,
+ ).Scan(&membership)
+ return
+}
+
+func (s *membershipStatements) updateMembership(
+ txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
+ senderUserNID types.EventStateKeyNID, membership membershipState,
+) error {
+ _, err := txn.Stmt(s.updateMembershipStmt).Exec(
+ roomNID, targetUserNID, senderUserNID, membership,
+ )
+ return err
+}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go
index ddab2356..a24dbb1d 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go
@@ -28,6 +28,8 @@ type statements struct {
stateBlockStatements
previousEventStatements
roomAliasesStatements
+ inviteStatements
+ membershipStatements
}
func (s *statements) prepare(db *sql.DB) error {
@@ -43,6 +45,8 @@ func (s *statements) prepare(db *sql.DB) error {
s.stateBlockStatements.prepare,
s.previousEventStatements.prepare,
s.roomAliasesStatements.prepare,
+ s.inviteStatements.prepare,
+ s.membershipStatements.prepare,
} {
if err = prepare(db); err != nil {
return err
diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go
index 3f99e7d8..d323fd13 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go
@@ -16,6 +16,7 @@ package storage
import (
"database/sql"
+
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/roomserver/types"
@@ -64,7 +65,7 @@ func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []typ
// Assigned a numeric ID for the state_key if there is one present.
// Otherwise set the numeric ID for the state_key to 0.
if eventStateKey != nil {
- if eventStateKeyNID, err = d.assignStateKeyNID(*eventStateKey); err != nil {
+ if eventStateKeyNID, err = d.assignStateKeyNID(nil, *eventStateKey); err != nil {
return 0, types.StateAtEvent{}, err
}
}
@@ -131,15 +132,15 @@ func (d *Database) assignEventTypeNID(eventType string) (types.EventTypeNID, err
return eventTypeNID, err
}
-func (d *Database) assignStateKeyNID(eventStateKey string) (types.EventStateKeyNID, error) {
+func (d *Database) assignStateKeyNID(txn *sql.Tx, eventStateKey string) (types.EventStateKeyNID, error) {
// Check if we already have a numeric ID in the database.
- eventStateKeyNID, err := d.statements.selectEventStateKeyNID(eventStateKey)
+ eventStateKeyNID, err := d.statements.selectEventStateKeyNID(txn, eventStateKey)
if err == sql.ErrNoRows {
// We don't have a numeric ID so insert one into the database.
- eventStateKeyNID, err = d.statements.insertEventStateKeyNID(eventStateKey)
+ eventStateKeyNID, err = d.statements.insertEventStateKeyNID(txn, eventStateKey)
if err == sql.ErrNoRows {
// We raced with another insert so run the select again.
- eventStateKeyNID, err = d.statements.selectEventStateKeyNID(eventStateKey)
+ eventStateKeyNID, err = d.statements.selectEventStateKeyNID(txn, eventStateKey)
}
}
return eventStateKeyNID, err
@@ -249,12 +250,15 @@ func (d *Database) GetLatestEventsForUpdate(roomNID types.RoomNID) (types.RoomRe
return nil, err
}
}
- return &roomRecentEventsUpdater{txn, d, stateAndRefs, lastEventIDSent, currentStateSnapshotNID}, nil
+ return &roomRecentEventsUpdater{
+ transaction{txn}, d, roomNID, stateAndRefs, lastEventIDSent, currentStateSnapshotNID,
+ }, nil
}
type roomRecentEventsUpdater struct {
- txn *sql.Tx
+ transaction
d *Database
+ roomNID types.RoomNID
latestEvents []types.StateAtEventAndReference
lastEventIDSent string
currentStateSnapshotNID types.StateSnapshotNID
@@ -319,14 +323,8 @@ func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error
return u.d.statements.updateEventSentToOutput(u.txn, eventNID)
}
-// Commit implements types.RoomRecentEventsUpdater
-func (u *roomRecentEventsUpdater) Commit() error {
- return u.txn.Commit()
-}
-
-// Rollback implements types.RoomRecentEventsUpdater
-func (u *roomRecentEventsUpdater) Rollback() error {
- return u.txn.Rollback()
+func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID) (types.MembershipUpdater, error) {
+ return u.d.membershipUpdaterTxn(u.txn, u.roomNID, targetUserNID)
}
// RoomNID implements query.RoomserverQueryAPIDB
@@ -381,3 +379,124 @@ func (d *Database) StateEntriesForTuples(
) ([]types.StateEntryList, error) {
return d.statements.bulkSelectFilteredStateBlockEntries(stateBlockNIDs, stateKeyTuples)
}
+
+type membershipUpdater struct {
+ transaction
+ d *Database
+ roomNID types.RoomNID
+ targetUserNID types.EventStateKeyNID
+ membership membershipState
+}
+
+func (d *Database) membershipUpdaterTxn(
+ txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
+) (types.MembershipUpdater, error) {
+
+ if err := d.statements.insertMembership(txn, roomNID, targetUserNID); err != nil {
+ return nil, err
+ }
+
+ membership, err := d.statements.selectMembershipForUpdate(txn, roomNID, targetUserNID)
+ if err != nil {
+ return nil, err
+ }
+
+ return &membershipUpdater{
+ transaction{txn}, d, roomNID, targetUserNID, membership,
+ }, nil
+}
+
+// IsInvite implements types.MembershipUpdater
+func (u *membershipUpdater) IsInvite() bool {
+ return u.membership == membershipStateInvite
+}
+
+// IsJoin implements types.MembershipUpdater
+func (u *membershipUpdater) IsJoin() bool {
+ return u.membership == membershipStateJoin
+}
+
+// IsLeave implements types.MembershipUpdater
+func (u *membershipUpdater) IsLeave() bool {
+ return u.membership == membershipStateLeaveOrBan
+}
+
+// SetToInvite implements types.MembershipUpdater
+func (u *membershipUpdater) SetToInvite(event gomatrixserverlib.Event) (bool, error) {
+ senderUserNID, err := u.d.assignStateKeyNID(u.txn, event.Sender())
+ if err != nil {
+ return false, err
+ }
+ inserted, err := u.d.statements.insertInviteEvent(
+ u.txn, event.EventID(), u.roomNID, u.targetUserNID, senderUserNID, event.JSON(),
+ )
+ if err != nil {
+ return false, err
+ }
+ if u.membership != membershipStateInvite {
+ if err = u.d.statements.updateMembership(
+ u.txn, u.roomNID, u.targetUserNID, senderUserNID, membershipStateInvite,
+ ); err != nil {
+ return false, err
+ }
+ }
+ return inserted, nil
+}
+
+// SetToJoin implements types.MembershipUpdater
+func (u *membershipUpdater) SetToJoin(senderUserID string) ([]string, error) {
+ senderUserNID, err := u.d.assignStateKeyNID(u.txn, senderUserID)
+ if err != nil {
+ return nil, err
+ }
+ inviteEventIDs, err := u.d.statements.updateInviteRetired(
+ u.txn, u.roomNID, u.targetUserNID,
+ )
+ if err != nil {
+ return nil, err
+ }
+ if u.membership != membershipStateJoin {
+ if err = u.d.statements.updateMembership(
+ u.txn, u.roomNID, u.targetUserNID, senderUserNID, membershipStateJoin,
+ ); err != nil {
+ return nil, err
+ }
+ }
+ return inviteEventIDs, nil
+}
+
+// SetToLeave implements types.MembershipUpdater
+func (u *membershipUpdater) SetToLeave(senderUserID string) ([]string, error) {
+ senderUserNID, err := u.d.assignStateKeyNID(u.txn, senderUserID)
+ if err != nil {
+ return nil, err
+ }
+ inviteEventIDs, err := u.d.statements.updateInviteRetired(
+ u.txn, u.roomNID, u.targetUserNID,
+ )
+ if err != nil {
+ return nil, err
+ }
+ if u.membership != membershipStateLeaveOrBan {
+ if err = u.d.statements.updateMembership(
+ u.txn, u.roomNID, u.targetUserNID, senderUserNID, membershipStateLeaveOrBan,
+ ); err != nil {
+ return nil, err
+ }
+ }
+ return inviteEventIDs, nil
+}
+
+type transaction struct {
+ txn *sql.Tx
+}
+
+// Commit implements types.Transaction
+func (t *transaction) Commit() error {
+ return t.txn.Commit()
+}
+
+// Rollback implements types.Transaction
+func (t *transaction) Rollback() error {
+ return t.txn.Rollback()
+}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/types/types.go b/src/github.com/matrix-org/dendrite/roomserver/types/types.go
index b255b64b..809b6e57 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/types/types.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/types/types.go
@@ -135,9 +135,17 @@ type StateEntryList struct {
StateEntries []StateEntry
}
+// A Transaction is something that can be committed or rolledback.
+type Transaction interface {
+ // Commit the transaction
+ Commit() error
+ // Rollback the transaction.
+ Rollback() error
+}
+
// A RoomRecentEventsUpdater is used to update the recent events in a room.
// (On postgresql this wraps a database transaction that holds a "FOR UPDATE"
-// lock on the row holding the latest events for the room.)
+// lock on the row in the rooms table holding the latest events for the room.)
type RoomRecentEventsUpdater interface {
// The latest event IDs and state in the room.
LatestEvents() []StateAtEventAndReference
@@ -163,10 +171,36 @@ type RoomRecentEventsUpdater interface {
HasEventBeenSent(eventNID EventNID) (bool, error)
// Mark the event as having been sent to the output logs.
MarkEventAsSent(eventNID EventNID) error
- // Commit the transaction
- Commit() error
- // Rollback the transaction.
- Rollback() error
+ // Build a membership updater for the target user in this room.
+ // It will share the same transaction as this updater.
+ MembershipUpdater(targetUserNID EventStateKeyNID) (MembershipUpdater, error)
+ // Implements Transaction so it can be committed or rolledback
+ Transaction
+}
+
+// A MembershipUpdater is used to update the membership of a user in a room.
+// (On postgresql this wraps a database transaction that holds a "FOR UPDATE"
+// lock on the row in the membership table for this user in the room)
+// The caller should call one of SetToInvite, SetToJoin or SetToLeave once to
+// make the update, or none of them if no update is required.
+type MembershipUpdater interface {
+ // True if the target user is invited to the room before updating.
+ IsInvite() bool
+ // True if the target user is joined to the room before updating.
+ IsJoin() bool
+ // True if the target user is not invited or joined to the room before updating.
+ IsLeave() bool
+ // Set the state to invite.
+ // Returns whether this invite needs to be sent
+ SetToInvite(event gomatrixserverlib.Event) (needsSending bool, err error)
+ // Set the state to join.
+ // Returns a list of invite event IDs that this state change retired.
+ SetToJoin(senderUserID string) (inviteEventIDs []string, err error)
+ // Set the state to leave.
+ // Returns a list of invite event IDs that this state change retired.
+ SetToLeave(senderUserID string) (inviteEventIDs []string, err error)
+ // Implements Transaction so it can be committed or rolledback.
+ Transaction
}
// A MissingEventError is an error that happened because the roomserver was