aboutsummaryrefslogtreecommitdiff
path: root/userapi/consumers/roomserver.go
diff options
context:
space:
mode:
authorkegsay <kegan@matrix.org>2023-04-19 15:50:33 +0100
committerGitHub <noreply@github.com>2023-04-19 15:50:33 +0100
commit72285b2659a31ebd52c91799c17105d81d996f40 (patch)
tree1855395f5efdc3ea6051dd502882bf62aaa57e7c /userapi/consumers/roomserver.go
parent9fa39263c0a4a8d349c8715f6ba30cae30b1b73a (diff)
refactor: update GMSL (#3058)
Sister PR to https://github.com/matrix-org/gomatrixserverlib/pull/364 Read this commit by commit to avoid going insane.
Diffstat (limited to 'userapi/consumers/roomserver.go')
-rw-r--r--userapi/consumers/roomserver.go45
1 files changed, 23 insertions, 22 deletions
diff --git a/userapi/consumers/roomserver.go b/userapi/consumers/roomserver.go
index 6704658d..834964ce 100644
--- a/userapi/consumers/roomserver.go
+++ b/userapi/consumers/roomserver.go
@@ -13,6 +13,7 @@ import (
"github.com/tidwall/gjson"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/gomatrixserverlib/spec"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
@@ -43,11 +44,11 @@ type OutputRoomEventConsumer struct {
topic string
pgClient pushgateway.Client
syncProducer *producers.SyncAPI
- msgCounts map[gomatrixserverlib.ServerName]userAPITypes.MessageStats
- roomCounts map[gomatrixserverlib.ServerName]map[string]bool // map from serverName to map from rommID to "isEncrypted"
+ msgCounts map[spec.ServerName]userAPITypes.MessageStats
+ roomCounts map[spec.ServerName]map[string]bool // map from serverName to map from rommID to "isEncrypted"
lastUpdate time.Time
countsLock sync.Mutex
- serverName gomatrixserverlib.ServerName
+ serverName spec.ServerName
}
func NewOutputRoomEventConsumer(
@@ -69,8 +70,8 @@ func NewOutputRoomEventConsumer(
pgClient: pgClient,
rsAPI: rsAPI,
syncProducer: syncProducer,
- msgCounts: map[gomatrixserverlib.ServerName]userAPITypes.MessageStats{},
- roomCounts: map[gomatrixserverlib.ServerName]map[string]bool{},
+ msgCounts: map[spec.ServerName]userAPITypes.MessageStats{},
+ roomCounts: map[spec.ServerName]map[string]bool{},
lastUpdate: time.Now(),
countsLock: sync.Mutex{},
serverName: cfg.Matrix.ServerName,
@@ -119,7 +120,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
return true
}
- if err := s.processMessage(ctx, event, uint64(gomatrixserverlib.AsTimestamp(metadata.Timestamp))); err != nil {
+ if err := s.processMessage(ctx, event, uint64(spec.AsTimestamp(metadata.Timestamp))); err != nil {
log.WithFields(log.Fields{
"event_id": event.EventID(),
}).WithError(err).Errorf("userapi consumer: process room event failure")
@@ -210,7 +211,7 @@ func (s *OutputRoomEventConsumer) handleRoomUpgrade(ctx context.Context, oldRoom
return nil
}
-func (s *OutputRoomEventConsumer) copyPushrules(ctx context.Context, oldRoomID, newRoomID string, localpart string, serverName gomatrixserverlib.ServerName) error {
+func (s *OutputRoomEventConsumer) copyPushrules(ctx context.Context, oldRoomID, newRoomID string, localpart string, serverName spec.ServerName) error {
pushRules, err := s.db.QueryPushRules(ctx, localpart, serverName)
if err != nil {
return fmt.Errorf("failed to query pushrules for user: %w", err)
@@ -238,7 +239,7 @@ func (s *OutputRoomEventConsumer) copyPushrules(ctx context.Context, oldRoomID,
}
// updateMDirect copies the "is_direct" flag from oldRoomID to newROomID
-func (s *OutputRoomEventConsumer) updateMDirect(ctx context.Context, oldRoomID, newRoomID, localpart string, serverName gomatrixserverlib.ServerName, roomSize int) error {
+func (s *OutputRoomEventConsumer) updateMDirect(ctx context.Context, oldRoomID, newRoomID, localpart string, serverName spec.ServerName, roomSize int) error {
// this is most likely not a DM, so skip updating m.direct state
if roomSize > 2 {
return nil
@@ -280,7 +281,7 @@ func (s *OutputRoomEventConsumer) updateMDirect(ctx context.Context, oldRoomID,
return nil
}
-func (s *OutputRoomEventConsumer) copyTags(ctx context.Context, oldRoomID, newRoomID, localpart string, serverName gomatrixserverlib.ServerName) error {
+func (s *OutputRoomEventConsumer) copyTags(ctx context.Context, oldRoomID, newRoomID, localpart string, serverName spec.ServerName) error {
tag, err := s.db.GetAccountDataByType(ctx, localpart, serverName, oldRoomID, "m.tag")
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return err
@@ -298,14 +299,14 @@ func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gom
}
switch {
- case event.Type() == gomatrixserverlib.MRoomMember:
+ case event.Type() == spec.MRoomMember:
cevent := synctypes.HeaderedToClientEvent(event, synctypes.FormatAll)
var member *localMembership
member, err = newLocalMembership(&cevent)
if err != nil {
return fmt.Errorf("newLocalMembership: %w", err)
}
- if member.Membership == gomatrixserverlib.Invite && member.Domain == s.cfg.Matrix.ServerName {
+ if member.Membership == spec.Invite && member.Domain == s.cfg.Matrix.ServerName {
// localRoomMembers only adds joined members. An invite
// should also be pushed to the target user.
members = append(members, member)
@@ -356,7 +357,7 @@ type localMembership struct {
gomatrixserverlib.MemberContent
UserID string
Localpart string
- Domain gomatrixserverlib.ServerName
+ Domain spec.ServerName
}
func newLocalMembership(event *synctypes.ClientEvent) (*localMembership, error) {
@@ -418,7 +419,7 @@ func (s *OutputRoomEventConsumer) localRoomMembers(ctx context.Context, roomID s
log.WithError(err).Errorf("Parsing MemberContent")
continue
}
- if member.Membership != gomatrixserverlib.Join {
+ if member.Membership != spec.Join {
continue
}
if member.Domain != s.cfg.Matrix.ServerName {
@@ -436,7 +437,7 @@ func (s *OutputRoomEventConsumer) localRoomMembers(ctx context.Context, roomID s
// m.room.canonical_alias is consulted. Returns an empty string if the
// room has no name.
func (s *OutputRoomEventConsumer) roomName(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) (string, error) {
- if event.Type() == gomatrixserverlib.MRoomName {
+ if event.Type() == spec.MRoomName {
name, err := unmarshalRoomName(event)
if err != nil {
return "", err
@@ -461,7 +462,7 @@ func (s *OutputRoomEventConsumer) roomName(ctx context.Context, event *gomatrixs
return unmarshalRoomName(eventS)
}
- if event.Type() == gomatrixserverlib.MRoomCanonicalAlias {
+ if event.Type() == spec.MRoomCanonicalAlias {
alias, err := unmarshalCanonicalAlias(event)
if err != nil {
return "", err
@@ -480,8 +481,8 @@ func (s *OutputRoomEventConsumer) roomName(ctx context.Context, event *gomatrixs
}
var (
- canonicalAliasTuple = gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias}
- roomNameTuple = gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomName}
+ canonicalAliasTuple = gomatrixserverlib.StateKeyTuple{EventType: spec.MRoomCanonicalAlias}
+ roomNameTuple = gomatrixserverlib.StateKeyTuple{EventType: spec.MRoomName}
)
func unmarshalRoomName(event *gomatrixserverlib.HeaderedEvent) (string, error) {
@@ -539,7 +540,7 @@ func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *gomatr
// to "work", but they only use a single device.
ProfileTag: profileTag,
RoomID: event.RoomID(),
- TS: gomatrixserverlib.AsTimestamp(time.Now()),
+ TS: spec.AsTimestamp(time.Now()),
}
if err = s.db.InsertNotification(ctx, mem.Localpart, mem.Domain, event.EventID(), streamPos, tweaks, n); err != nil {
return fmt.Errorf("s.db.InsertNotification: %w", err)
@@ -684,7 +685,7 @@ func (rse *ruleSetEvalContext) HasPowerLevel(userID, levelKey string) (bool, err
req := &rsapi.QueryLatestEventsAndStateRequest{
RoomID: rse.roomID,
StateToFetch: []gomatrixserverlib.StateKeyTuple{
- {EventType: gomatrixserverlib.MRoomPowerLevels},
+ {EventType: spec.MRoomPowerLevels},
},
}
var res rsapi.QueryLatestEventsAndStateResponse
@@ -692,7 +693,7 @@ func (rse *ruleSetEvalContext) HasPowerLevel(userID, levelKey string) (bool, err
return false, err
}
for _, ev := range res.StateEvents {
- if ev.Type() != gomatrixserverlib.MRoomPowerLevels {
+ if ev.Type() != spec.MRoomPowerLevels {
continue
}
@@ -707,7 +708,7 @@ func (rse *ruleSetEvalContext) HasPowerLevel(userID, levelKey string) (bool, err
// localPushDevices pushes to the configured devices of a local
// user. The map keys are [url][format].
-func (s *OutputRoomEventConsumer) localPushDevices(ctx context.Context, localpart string, serverName gomatrixserverlib.ServerName, tweaks map[string]interface{}) (map[string]map[string][]*pushgateway.Device, string, error) {
+func (s *OutputRoomEventConsumer) localPushDevices(ctx context.Context, localpart string, serverName spec.ServerName, tweaks map[string]interface{}) (map[string]map[string][]*pushgateway.Device, string, error) {
pusherDevices, err := util.GetPushDevices(ctx, localpart, serverName, tweaks, s.db)
if err != nil {
return nil, "", fmt.Errorf("util.GetPushDevices: %w", err)
@@ -805,7 +806,7 @@ func (s *OutputRoomEventConsumer) notifyHTTP(ctx context.Context, event *gomatri
}
// deleteRejectedPushers deletes the pushers associated with the given devices.
-func (s *OutputRoomEventConsumer) deleteRejectedPushers(ctx context.Context, devices []*pushgateway.Device, localpart string, serverName gomatrixserverlib.ServerName) {
+func (s *OutputRoomEventConsumer) deleteRejectedPushers(ctx context.Context, devices []*pushgateway.Device, localpart string, serverName spec.ServerName) {
log.WithFields(log.Fields{
"localpart": localpart,
"app_id0": devices[0].AppID,