aboutsummaryrefslogtreecommitdiff
path: root/userapi/consumers
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-11-02 11:18:11 +0100
committerGitHub <noreply@github.com>2022-11-02 10:18:11 +0000
commit86b25a6337b1ef99125b662feb9adcd3703b8f73 (patch)
tree0c6429a6943f92e59a5b14a4c317eb2c2bb4e97d /userapi/consumers
parentb367cfeddf89456e7d067df8262ff5579fcbb9a1 (diff)
Add message stats to reporting (#2748)
Since we're now listening on the `OutputRoomEvent` stream, we are able to store messages stats.
Diffstat (limited to 'userapi/consumers')
-rw-r--r--userapi/consumers/roomserver.go78
-rw-r--r--userapi/consumers/roomserver_test.go123
2 files changed, 201 insertions, 0 deletions
diff --git a/userapi/consumers/roomserver.go b/userapi/consumers/roomserver.go
index a1287694..97c17e18 100644
--- a/userapi/consumers/roomserver.go
+++ b/userapi/consumers/roomserver.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strings"
+ "sync"
"time"
"github.com/matrix-org/gomatrixserverlib"
@@ -23,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/dendrite/userapi/storage/tables"
+ userAPITypes "github.com/matrix-org/dendrite/userapi/types"
"github.com/matrix-org/dendrite/userapi/util"
)
@@ -36,6 +38,11 @@ type OutputRoomEventConsumer struct {
topic string
pgClient pushgateway.Client
syncProducer *producers.SyncAPI
+ msgCounts map[gomatrixserverlib.ServerName]userAPITypes.MessageStats
+ roomCounts map[gomatrixserverlib.ServerName]map[string]bool // map from serverName to map from rommID to "isEncrypted"
+ lastUpdate time.Time
+ countsLock sync.Mutex
+ serverName gomatrixserverlib.ServerName
}
func NewOutputRoomEventConsumer(
@@ -57,6 +64,11 @@ func NewOutputRoomEventConsumer(
pgClient: pgClient,
rsAPI: rsAPI,
syncProducer: syncProducer,
+ msgCounts: map[gomatrixserverlib.ServerName]userAPITypes.MessageStats{},
+ roomCounts: map[gomatrixserverlib.ServerName]map[string]bool{},
+ lastUpdate: time.Now(),
+ countsLock: sync.Mutex{},
+ serverName: cfg.Matrix.ServerName,
}
}
@@ -88,6 +100,10 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
return true
}
+ if s.cfg.Matrix.ReportStats.Enabled {
+ go s.storeMessageStats(ctx, event.Type(), event.Sender(), event.RoomID())
+ }
+
log.WithFields(log.Fields{
"event_id": event.EventID(),
"event_type": event.Type(),
@@ -107,6 +123,68 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
return true
}
+func (s *OutputRoomEventConsumer) storeMessageStats(ctx context.Context, eventType, eventSender, roomID string) {
+ s.countsLock.Lock()
+ defer s.countsLock.Unlock()
+
+ // reset the roomCounts on a day change
+ if s.lastUpdate.Day() != time.Now().Day() {
+ s.roomCounts[s.serverName] = make(map[string]bool)
+ s.lastUpdate = time.Now()
+ }
+
+ _, sender, err := gomatrixserverlib.SplitID('@', eventSender)
+ if err != nil {
+ return
+ }
+ msgCount := s.msgCounts[s.serverName]
+ roomCount := s.roomCounts[s.serverName]
+ if roomCount == nil {
+ roomCount = make(map[string]bool)
+ }
+ switch eventType {
+ case "m.room.message":
+ roomCount[roomID] = false
+ msgCount.Messages++
+ if sender == s.serverName {
+ msgCount.SentMessages++
+ }
+ case "m.room.encrypted":
+ roomCount[roomID] = true
+ msgCount.MessagesE2EE++
+ if sender == s.serverName {
+ msgCount.SentMessagesE2EE++
+ }
+ default:
+ return
+ }
+ s.msgCounts[s.serverName] = msgCount
+ s.roomCounts[s.serverName] = roomCount
+
+ for serverName, stats := range s.msgCounts {
+ var normalRooms, encryptedRooms int64 = 0, 0
+ for _, isEncrypted := range s.roomCounts[s.serverName] {
+ if isEncrypted {
+ encryptedRooms++
+ } else {
+ normalRooms++
+ }
+ }
+ err := s.db.UpsertDailyRoomsMessages(ctx, serverName, stats, normalRooms, encryptedRooms)
+ if err != nil {
+ log.WithError(err).Errorf("failed to upsert daily messages")
+ }
+ // Clear stats if we successfully stored it
+ if err == nil {
+ stats.Messages = 0
+ stats.SentMessages = 0
+ stats.MessagesE2EE = 0
+ stats.SentMessagesE2EE = 0
+ s.msgCounts[serverName] = stats
+ }
+ }
+}
+
func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, streamPos uint64) error {
members, roomSize, err := s.localRoomMembers(ctx, event.RoomID())
if err != nil {
diff --git a/userapi/consumers/roomserver_test.go b/userapi/consumers/roomserver_test.go
index e4587670..265e3a3a 100644
--- a/userapi/consumers/roomserver_test.go
+++ b/userapi/consumers/roomserver_test.go
@@ -2,7 +2,10 @@ package consumers
import (
"context"
+ "reflect"
+ "sync"
"testing"
+ "time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/stretchr/testify/assert"
@@ -12,6 +15,7 @@ import (
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/dendrite/userapi/storage"
+ userAPITypes "github.com/matrix-org/dendrite/userapi/types"
)
func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
@@ -132,3 +136,122 @@ func Test_evaluatePushRules(t *testing.T) {
}
})
}
+
+func TestMessageStats(t *testing.T) {
+ type args struct {
+ eventType string
+ eventSender string
+ roomID string
+ }
+ tests := []struct {
+ name string
+ args args
+ ourServer gomatrixserverlib.ServerName
+ lastUpdate time.Time
+ initRoomCounts map[gomatrixserverlib.ServerName]map[string]bool
+ wantStats userAPITypes.MessageStats
+ }{
+ {
+ name: "m.room.create does not count as a message",
+ ourServer: "localhost",
+ args: args{
+ eventType: "m.room.create",
+ eventSender: "@alice:localhost",
+ },
+ },
+ {
+ name: "our server - message",
+ ourServer: "localhost",
+ args: args{
+ eventType: "m.room.message",
+ eventSender: "@alice:localhost",
+ roomID: "normalRoom",
+ },
+ wantStats: userAPITypes.MessageStats{Messages: 1, SentMessages: 1},
+ },
+ {
+ name: "our server - E2EE message",
+ ourServer: "localhost",
+ args: args{
+ eventType: "m.room.encrypted",
+ eventSender: "@alice:localhost",
+ roomID: "encryptedRoom",
+ },
+ wantStats: userAPITypes.MessageStats{Messages: 1, SentMessages: 1, MessagesE2EE: 1, SentMessagesE2EE: 1},
+ },
+
+ {
+ name: "remote server - message",
+ ourServer: "localhost",
+ args: args{
+ eventType: "m.room.message",
+ eventSender: "@alice:remote",
+ roomID: "normalRoom",
+ },
+ wantStats: userAPITypes.MessageStats{Messages: 2, SentMessages: 1, MessagesE2EE: 1, SentMessagesE2EE: 1},
+ },
+ {
+ name: "remote server - E2EE message",
+ ourServer: "localhost",
+ args: args{
+ eventType: "m.room.encrypted",
+ eventSender: "@alice:remote",
+ roomID: "encryptedRoom",
+ },
+ wantStats: userAPITypes.MessageStats{Messages: 2, SentMessages: 1, MessagesE2EE: 2, SentMessagesE2EE: 1},
+ },
+ {
+ name: "day change creates a new room map",
+ ourServer: "localhost",
+ lastUpdate: time.Now().Add(-time.Hour * 24),
+ initRoomCounts: map[gomatrixserverlib.ServerName]map[string]bool{
+ "localhost": {"encryptedRoom": true},
+ },
+ args: args{
+ eventType: "m.room.encrypted",
+ eventSender: "@alice:remote",
+ roomID: "someOtherRoom",
+ },
+ wantStats: userAPITypes.MessageStats{Messages: 2, SentMessages: 1, MessagesE2EE: 3, SentMessagesE2EE: 1},
+ },
+ }
+
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ db, close := mustCreateDatabase(t, dbType)
+ defer close()
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.lastUpdate.IsZero() {
+ tt.lastUpdate = time.Now()
+ }
+ if tt.initRoomCounts == nil {
+ tt.initRoomCounts = map[gomatrixserverlib.ServerName]map[string]bool{}
+ }
+ s := &OutputRoomEventConsumer{
+ db: db,
+ msgCounts: map[gomatrixserverlib.ServerName]userAPITypes.MessageStats{},
+ roomCounts: tt.initRoomCounts,
+ countsLock: sync.Mutex{},
+ lastUpdate: tt.lastUpdate,
+ serverName: tt.ourServer,
+ }
+ s.storeMessageStats(context.Background(), tt.args.eventType, tt.args.eventSender, tt.args.roomID)
+ t.Logf("%+v", s.roomCounts)
+ gotStats, activeRooms, activeE2EERooms, err := db.DailyRoomsMessages(context.Background(), tt.ourServer)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if !reflect.DeepEqual(gotStats, tt.wantStats) {
+ t.Fatalf("expected %+v, got %+v", tt.wantStats, gotStats)
+ }
+ if tt.args.eventType == "m.room.encrypted" && activeE2EERooms != 1 {
+ t.Fatalf("expected room to be activeE2EE")
+ }
+ if tt.args.eventType == "m.room.message" && activeRooms != 1 {
+ t.Fatalf("expected room to be active")
+ }
+ })
+ }
+ })
+}