aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers/roomserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers/roomserver.go')
-rw-r--r--syncapi/consumers/roomserver.go61
1 files changed, 38 insertions, 23 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 399f67ba..1d47b73a 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -23,8 +23,8 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
- "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
@@ -32,19 +32,23 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
- cfg *config.SyncAPI
- rsAPI api.RoomserverInternalAPI
- rsConsumer *internal.ContinualConsumer
- db storage.Database
- notifier *sync.Notifier
+ cfg *config.SyncAPI
+ rsAPI api.RoomserverInternalAPI
+ rsConsumer *internal.ContinualConsumer
+ db storage.Database
+ pduStream types.StreamProvider
+ inviteStream types.StreamProvider
+ notifier *notifier.Notifier
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
- n *sync.Notifier,
store storage.Database,
+ notifier *notifier.Notifier,
+ pduStream types.StreamProvider,
+ inviteStream types.StreamProvider,
rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
@@ -55,11 +59,13 @@ func NewOutputRoomEventConsumer(
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
- cfg: cfg,
- rsConsumer: &consumer,
- db: store,
- notifier: n,
- rsAPI: rsAPI,
+ cfg: cfg,
+ rsConsumer: &consumer,
+ db: store,
+ notifier: notifier,
+ pduStream: pduStream,
+ inviteStream: inviteStream,
+ rsAPI: rsAPI,
}
consumer.ProcessMessage = s.onMessage
@@ -180,7 +186,8 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return err
}
- s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos})
+ s.pduStream.Advance(pduPos)
+ s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
@@ -219,7 +226,8 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return err
}
- s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos})
+ s.pduStream.Advance(pduPos)
+ s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
@@ -274,7 +282,10 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure")
return nil
}
+
+ s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey())
+
return nil
}
@@ -290,9 +301,11 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
}).Panicf("roomserver output log: remove invite failure")
return nil
}
+
// Notify any active sync requests that the invite has been retired.
- // Invites share the same stream counter as PDUs
+ s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID)
+
return nil
}
@@ -307,12 +320,13 @@ func (s *OutputRoomEventConsumer) onNewPeek(
}).Panicf("roomserver output log: write peek failure")
return nil
}
+
// tell the notifier about the new peek so it knows to wake up new devices
- s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID)
+ // TODO: This only works because the peeks table is reusing the same
+ // index as PDUs, but we should fix this
+ s.pduStream.Advance(sp)
+ s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
- // we need to wake up the users who might need to now be peeking into this room,
- // so we send in a dummy event to trigger a wakeup
- s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
return nil
}
@@ -327,12 +341,13 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
}).Panicf("roomserver output log: write peek failure")
return nil
}
+
// tell the notifier about the new peek so it knows to wake up new devices
- s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID)
+ // TODO: This only works because the peeks table is reusing the same
+ // index as PDUs, but we should fix this
+ s.pduStream.Advance(sp)
+ s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
- // we need to wake up the users who might need to now be peeking into this room,
- // so we send in a dummy event to trigger a wakeup
- s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
return nil
}