aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--federationapi/consumers/roomserver.go120
-rw-r--r--federationapi/federationapi.go4
-rw-r--r--roomserver/api/api.go1
3 files changed, 107 insertions, 18 deletions
diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go
index a4273362..d16af662 100644
--- a/federationapi/consumers/roomserver.go
+++ b/federationapi/consumers/roomserver.go
@@ -18,6 +18,10 @@ import (
"context"
"encoding/json"
"fmt"
+ "strconv"
+ "time"
+
+ syncAPITypes "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
@@ -34,14 +38,16 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
- ctx context.Context
- cfg *config.FederationAPI
- rsAPI api.FederationRoomserverAPI
- jetstream nats.JetStreamContext
- durable string
- db storage.Database
- queues *queue.OutgoingQueues
- topic string
+ ctx context.Context
+ cfg *config.FederationAPI
+ rsAPI api.FederationRoomserverAPI
+ jetstream nats.JetStreamContext
+ natsClient *nats.Conn
+ durable string
+ db storage.Database
+ queues *queue.OutgoingQueues
+ topic string
+ topicPresence string
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@@ -49,19 +55,22 @@ func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.FederationAPI,
js nats.JetStreamContext,
+ natsClient *nats.Conn,
queues *queue.OutgoingQueues,
store storage.Database,
rsAPI api.FederationRoomserverAPI,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
- ctx: process.Context(),
- cfg: cfg,
- jetstream: js,
- db: store,
- queues: queues,
- rsAPI: rsAPI,
- durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"),
- topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
+ ctx: process.Context(),
+ cfg: cfg,
+ jetstream: js,
+ natsClient: natsClient,
+ db: store,
+ queues: queues,
+ rsAPI: rsAPI,
+ durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
+ topicPresence: cfg.Matrix.JetStream.Prefixed(jetstream.RequestPresence),
}
}
@@ -146,6 +155,7 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee
// processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rewritesState bool) error {
+
addsStateEvents, missingEventIDs := ore.NeededStateEventIDs()
// Ask the roomserver and add in the rest of the results into the set.
@@ -184,6 +194,14 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rew
return err
}
+ // If we added new hosts, inform them about our known presence events for this room
+ if len(addsJoinedHosts) > 0 && ore.Event.Type() == gomatrixserverlib.MRoomMember && ore.Event.StateKey() != nil {
+ membership, _ := ore.Event.Membership()
+ if membership == gomatrixserverlib.Join {
+ s.sendPresence(ore.Event.RoomID(), addsJoinedHosts)
+ }
+ }
+
if oldJoinedHosts == nil {
// This means that there is nothing to update as this is a duplicate
// message.
@@ -213,6 +231,76 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rew
)
}
+func (s *OutputRoomEventConsumer) sendPresence(roomID string, addedJoined []types.JoinedHost) {
+ joined := make([]gomatrixserverlib.ServerName, len(addedJoined))
+ for _, added := range addedJoined {
+ joined = append(joined, added.ServerName)
+ }
+
+ // get our locally joined users
+ var queryRes api.QueryMembershipsForRoomResponse
+ err := s.rsAPI.QueryMembershipsForRoom(s.ctx, &api.QueryMembershipsForRoomRequest{
+ JoinedOnly: true,
+ LocalOnly: true,
+ RoomID: roomID,
+ }, &queryRes)
+ if err != nil {
+ log.WithError(err).Error("failed to calculate joined rooms for user")
+ return
+ }
+
+ // send every presence we know about to the remote server
+ content := types.Presence{}
+ for _, ev := range queryRes.JoinEvents {
+ msg := nats.NewMsg(s.topicPresence)
+ msg.Header.Set(jetstream.UserID, ev.Sender)
+
+ var presence *nats.Msg
+ presence, err = s.natsClient.RequestMsg(msg, time.Second*10)
+ if err != nil {
+ log.WithError(err).Errorf("unable to get presence")
+ continue
+ }
+
+ statusMsg := presence.Header.Get("status_msg")
+ e := presence.Header.Get("error")
+ if e != "" {
+ continue
+ }
+ var lastActive int
+ lastActive, err = strconv.Atoi(presence.Header.Get("last_active_ts"))
+ if err != nil {
+ continue
+ }
+
+ p := syncAPITypes.PresenceInternal{LastActiveTS: gomatrixserverlib.Timestamp(lastActive)}
+
+ content.Push = append(content.Push, types.PresenceContent{
+ CurrentlyActive: p.CurrentlyActive(),
+ LastActiveAgo: p.LastActiveAgo(),
+ Presence: presence.Header.Get("presence"),
+ StatusMsg: &statusMsg,
+ UserID: ev.Sender,
+ })
+ }
+
+ if len(content.Push) == 0 {
+ return
+ }
+
+ edu := &gomatrixserverlib.EDU{
+ Type: gomatrixserverlib.MPresence,
+ Origin: string(s.cfg.Matrix.ServerName),
+ }
+ if edu.Content, err = json.Marshal(content); err != nil {
+ log.WithError(err).Error("failed to marshal EDU JSON")
+ return
+ }
+ if err := s.queues.SendEDU(edu, s.cfg.Matrix.ServerName, joined); err != nil {
+ log.WithError(err).Error("failed to send EDU")
+ }
+}
+
// joinedHostsAtEvent works out a list of matrix servers that were joined to
// the room at the event (including peeking ones)
// It is important to use the state at the event for sending messages because:
diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go
index 202da6c5..e35b9c7f 100644
--- a/federationapi/federationapi.go
+++ b/federationapi/federationapi.go
@@ -118,7 +118,7 @@ func NewInternalAPI(
stats := statistics.NewStatistics(federationDB, cfg.FederationMaxRetries+1)
- js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
+ js, nats := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
queues := queue.NewOutgoingQueues(
federationDB, base.ProcessContext,
@@ -132,7 +132,7 @@ func NewInternalAPI(
)
rsConsumer := consumers.NewOutputRoomEventConsumer(
- base.ProcessContext, cfg, js, queues,
+ base.ProcessContext, cfg, js, nats, queues,
federationDB, rsAPI,
)
if err = rsConsumer.Start(); err != nil {
diff --git a/roomserver/api/api.go b/roomserver/api/api.go
index a1373a62..01e87ec8 100644
--- a/roomserver/api/api.go
+++ b/roomserver/api/api.go
@@ -177,6 +177,7 @@ type FederationRoomserverAPI interface {
QueryBulkStateContentAPI
// QueryServerBannedFromRoom returns whether a server is banned from a room by server ACLs.
QueryServerBannedFromRoom(ctx context.Context, req *QueryServerBannedFromRoomRequest, res *QueryServerBannedFromRoomResponse) error
+ QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error
QueryRoomVersionForRoom(ctx context.Context, req *QueryRoomVersionForRoomRequest, res *QueryRoomVersionForRoomResponse) error
GetRoomIDForAlias(ctx context.Context, req *GetRoomIDForAliasRequest, res *GetRoomIDForAliasResponse) error
QueryEventsByID(ctx context.Context, req *QueryEventsByIDRequest, res *QueryEventsByIDResponse) error