diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2022-11-02 11:18:11 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-02 10:18:11 +0000 |
commit | 86b25a6337b1ef99125b662feb9adcd3703b8f73 (patch) | |
tree | 0c6429a6943f92e59a5b14a4c317eb2c2bb4e97d /userapi/consumers | |
parent | b367cfeddf89456e7d067df8262ff5579fcbb9a1 (diff) |
Add message stats to reporting (#2748)
Since we're now listening on the `OutputRoomEvent` stream, we are able
to store messages stats.
Diffstat (limited to 'userapi/consumers')
-rw-r--r-- | userapi/consumers/roomserver.go | 78 | ||||
-rw-r--r-- | userapi/consumers/roomserver_test.go | 123 |
2 files changed, 201 insertions, 0 deletions
diff --git a/userapi/consumers/roomserver.go b/userapi/consumers/roomserver.go index a1287694..97c17e18 100644 --- a/userapi/consumers/roomserver.go +++ b/userapi/consumers/roomserver.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "time" "github.com/matrix-org/gomatrixserverlib" @@ -23,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/userapi/producers" "github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/dendrite/userapi/storage/tables" + userAPITypes "github.com/matrix-org/dendrite/userapi/types" "github.com/matrix-org/dendrite/userapi/util" ) @@ -36,6 +38,11 @@ type OutputRoomEventConsumer struct { topic string pgClient pushgateway.Client syncProducer *producers.SyncAPI + msgCounts map[gomatrixserverlib.ServerName]userAPITypes.MessageStats + roomCounts map[gomatrixserverlib.ServerName]map[string]bool // map from serverName to map from rommID to "isEncrypted" + lastUpdate time.Time + countsLock sync.Mutex + serverName gomatrixserverlib.ServerName } func NewOutputRoomEventConsumer( @@ -57,6 +64,11 @@ func NewOutputRoomEventConsumer( pgClient: pgClient, rsAPI: rsAPI, syncProducer: syncProducer, + msgCounts: map[gomatrixserverlib.ServerName]userAPITypes.MessageStats{}, + roomCounts: map[gomatrixserverlib.ServerName]map[string]bool{}, + lastUpdate: time.Now(), + countsLock: sync.Mutex{}, + serverName: cfg.Matrix.ServerName, } } @@ -88,6 +100,10 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms return true } + if s.cfg.Matrix.ReportStats.Enabled { + go s.storeMessageStats(ctx, event.Type(), event.Sender(), event.RoomID()) + } + log.WithFields(log.Fields{ "event_id": event.EventID(), "event_type": event.Type(), @@ -107,6 +123,68 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms return true } +func (s *OutputRoomEventConsumer) storeMessageStats(ctx context.Context, eventType, eventSender, roomID string) { + s.countsLock.Lock() + defer s.countsLock.Unlock() + + // reset the roomCounts on a day change + if s.lastUpdate.Day() != time.Now().Day() { + s.roomCounts[s.serverName] = make(map[string]bool) + s.lastUpdate = time.Now() + } + + _, sender, err := gomatrixserverlib.SplitID('@', eventSender) + if err != nil { + return + } + msgCount := s.msgCounts[s.serverName] + roomCount := s.roomCounts[s.serverName] + if roomCount == nil { + roomCount = make(map[string]bool) + } + switch eventType { + case "m.room.message": + roomCount[roomID] = false + msgCount.Messages++ + if sender == s.serverName { + msgCount.SentMessages++ + } + case "m.room.encrypted": + roomCount[roomID] = true + msgCount.MessagesE2EE++ + if sender == s.serverName { + msgCount.SentMessagesE2EE++ + } + default: + return + } + s.msgCounts[s.serverName] = msgCount + s.roomCounts[s.serverName] = roomCount + + for serverName, stats := range s.msgCounts { + var normalRooms, encryptedRooms int64 = 0, 0 + for _, isEncrypted := range s.roomCounts[s.serverName] { + if isEncrypted { + encryptedRooms++ + } else { + normalRooms++ + } + } + err := s.db.UpsertDailyRoomsMessages(ctx, serverName, stats, normalRooms, encryptedRooms) + if err != nil { + log.WithError(err).Errorf("failed to upsert daily messages") + } + // Clear stats if we successfully stored it + if err == nil { + stats.Messages = 0 + stats.SentMessages = 0 + stats.MessagesE2EE = 0 + stats.SentMessagesE2EE = 0 + s.msgCounts[serverName] = stats + } + } +} + func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, streamPos uint64) error { members, roomSize, err := s.localRoomMembers(ctx, event.RoomID()) if err != nil { diff --git a/userapi/consumers/roomserver_test.go b/userapi/consumers/roomserver_test.go index e4587670..265e3a3a 100644 --- a/userapi/consumers/roomserver_test.go +++ b/userapi/consumers/roomserver_test.go @@ -2,7 +2,10 @@ package consumers import ( "context" + "reflect" + "sync" "testing" + "time" "github.com/matrix-org/gomatrixserverlib" "github.com/stretchr/testify/assert" @@ -12,6 +15,7 @@ import ( "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/dendrite/userapi/storage" + userAPITypes "github.com/matrix-org/dendrite/userapi/types" ) func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { @@ -132,3 +136,122 @@ func Test_evaluatePushRules(t *testing.T) { } }) } + +func TestMessageStats(t *testing.T) { + type args struct { + eventType string + eventSender string + roomID string + } + tests := []struct { + name string + args args + ourServer gomatrixserverlib.ServerName + lastUpdate time.Time + initRoomCounts map[gomatrixserverlib.ServerName]map[string]bool + wantStats userAPITypes.MessageStats + }{ + { + name: "m.room.create does not count as a message", + ourServer: "localhost", + args: args{ + eventType: "m.room.create", + eventSender: "@alice:localhost", + }, + }, + { + name: "our server - message", + ourServer: "localhost", + args: args{ + eventType: "m.room.message", + eventSender: "@alice:localhost", + roomID: "normalRoom", + }, + wantStats: userAPITypes.MessageStats{Messages: 1, SentMessages: 1}, + }, + { + name: "our server - E2EE message", + ourServer: "localhost", + args: args{ + eventType: "m.room.encrypted", + eventSender: "@alice:localhost", + roomID: "encryptedRoom", + }, + wantStats: userAPITypes.MessageStats{Messages: 1, SentMessages: 1, MessagesE2EE: 1, SentMessagesE2EE: 1}, + }, + + { + name: "remote server - message", + ourServer: "localhost", + args: args{ + eventType: "m.room.message", + eventSender: "@alice:remote", + roomID: "normalRoom", + }, + wantStats: userAPITypes.MessageStats{Messages: 2, SentMessages: 1, MessagesE2EE: 1, SentMessagesE2EE: 1}, + }, + { + name: "remote server - E2EE message", + ourServer: "localhost", + args: args{ + eventType: "m.room.encrypted", + eventSender: "@alice:remote", + roomID: "encryptedRoom", + }, + wantStats: userAPITypes.MessageStats{Messages: 2, SentMessages: 1, MessagesE2EE: 2, SentMessagesE2EE: 1}, + }, + { + name: "day change creates a new room map", + ourServer: "localhost", + lastUpdate: time.Now().Add(-time.Hour * 24), + initRoomCounts: map[gomatrixserverlib.ServerName]map[string]bool{ + "localhost": {"encryptedRoom": true}, + }, + args: args{ + eventType: "m.room.encrypted", + eventSender: "@alice:remote", + roomID: "someOtherRoom", + }, + wantStats: userAPITypes.MessageStats{Messages: 2, SentMessages: 1, MessagesE2EE: 3, SentMessagesE2EE: 1}, + }, + } + + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + db, close := mustCreateDatabase(t, dbType) + defer close() + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.lastUpdate.IsZero() { + tt.lastUpdate = time.Now() + } + if tt.initRoomCounts == nil { + tt.initRoomCounts = map[gomatrixserverlib.ServerName]map[string]bool{} + } + s := &OutputRoomEventConsumer{ + db: db, + msgCounts: map[gomatrixserverlib.ServerName]userAPITypes.MessageStats{}, + roomCounts: tt.initRoomCounts, + countsLock: sync.Mutex{}, + lastUpdate: tt.lastUpdate, + serverName: tt.ourServer, + } + s.storeMessageStats(context.Background(), tt.args.eventType, tt.args.eventSender, tt.args.roomID) + t.Logf("%+v", s.roomCounts) + gotStats, activeRooms, activeE2EERooms, err := db.DailyRoomsMessages(context.Background(), tt.ourServer) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if !reflect.DeepEqual(gotStats, tt.wantStats) { + t.Fatalf("expected %+v, got %+v", tt.wantStats, gotStats) + } + if tt.args.eventType == "m.room.encrypted" && activeE2EERooms != 1 { + t.Fatalf("expected room to be activeE2EE") + } + if tt.args.eventType == "m.room.message" && activeRooms != 1 { + t.Fatalf("expected room to be active") + } + }) + } + }) +} |