aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-06-12 12:10:08 +0100
committerGitHub <noreply@github.com>2020-06-12 12:10:08 +0100
commit4675e1ddb6a48fe1425032dc4f3cef56cbde7243 (patch)
treec1f900ab4bf0a4fadc7ca81ba614977f4a973f29
parent079d8fe8fb521f76fee3bff5b47482d5fb911257 (diff)
Add trace logging to RoomserverInternalAPI (#1120)
This is a wrapper around whatever impl we have which then logs the function name/request/response/error. Also tweak when we log on kafka streams: only log on the producer side not the consumer side: we've never had issues with comms and having 1 message rather than N would be nice.
-rw-r--r--clientapi/producers/syncapi.go9
-rw-r--r--cmd/dendrite-monolith-server/main.go13
-rw-r--r--eduserver/input/input.go16
-rw-r--r--federationsender/consumers/roomserver.go5
-rw-r--r--roomserver/api/api_trace.go218
-rw-r--r--roomserver/internal/input.go16
-rw-r--r--syncapi/consumers/roomserver.go5
7 files changed, 261 insertions, 21 deletions
diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go
index 244a61dc..375b1eee 100644
--- a/clientapi/producers/syncapi.go
+++ b/clientapi/producers/syncapi.go
@@ -17,9 +17,9 @@ package producers
import (
"encoding/json"
- "github.com/matrix-org/dendrite/internal"
-
"github.com/Shopify/sarama"
+ "github.com/matrix-org/dendrite/internal"
+ log "github.com/sirupsen/logrus"
)
// SyncAPIProducer produces events for the sync API server to consume
@@ -44,6 +44,11 @@ func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string
m.Topic = string(p.Topic)
m.Key = sarama.StringEncoder(userID)
m.Value = sarama.ByteEncoder(value)
+ log.WithFields(log.Fields{
+ "user_id": userID,
+ "room_id": roomID,
+ "data_type": dataType,
+ }).Infof("Producing to topic '%s'", p.Topic)
_, _, err = p.Producer.SendMessage(&m)
return err
diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go
index 195a1ac5..2d538027 100644
--- a/cmd/dendrite-monolith-server/main.go
+++ b/cmd/dendrite-monolith-server/main.go
@@ -17,6 +17,7 @@ package main
import (
"flag"
"net/http"
+ "os"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/eduserver"
@@ -28,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/matrix-org/dendrite/roomserver"
+ "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/serverkeyapi"
"github.com/sirupsen/logrus"
@@ -39,6 +41,7 @@ var (
certFile = flag.String("tls-cert", "", "The PEM formatted X509 certificate to use for TLS")
keyFile = flag.String("tls-key", "", "The PEM private key to use for TLS")
enableHTTPAPIs = flag.Bool("api", false, "Use HTTP APIs instead of short-circuiting (warning: exposes API endpoints!)")
+ traceInternal = os.Getenv("DENDRITE_TRACE_INTERNAL") == "1"
)
func main() {
@@ -72,14 +75,18 @@ func main() {
}
keyRing := serverKeyAPI.KeyRing()
- rsComponent := roomserver.NewInternalAPI(
+ rsAPI := roomserver.NewInternalAPI(
base, keyRing, federation,
)
- rsAPI := rsComponent
if base.UseHTTPAPIs {
roomserver.AddInternalRoutes(base.InternalAPIMux, rsAPI)
rsAPI = base.RoomserverHTTPClient()
}
+ if traceInternal {
+ rsAPI = &api.RoomserverInternalAPITrace{
+ Impl: rsAPI,
+ }
+ }
eduInputAPI := eduserver.NewInternalAPI(
base, cache.New(), deviceDB,
@@ -102,7 +109,7 @@ func main() {
federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI)
fsAPI = base.FederationSenderHTTPClient()
}
- rsComponent.SetFederationSenderAPI(fsAPI)
+ rsAPI.SetFederationSenderAPI(fsAPI)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName)
if err != nil {
diff --git a/eduserver/input/input.go b/eduserver/input/input.go
index 0bbf5b84..6eafce42 100644
--- a/eduserver/input/input.go
+++ b/eduserver/input/input.go
@@ -97,6 +97,11 @@ func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
if err != nil {
return err
}
+ logrus.WithFields(logrus.Fields{
+ "room_id": ite.RoomID,
+ "user_id": ite.UserID,
+ "typing": ite.Typing,
+ }).Infof("Producing to topic '%s'", t.OutputTypingEventTopic)
m := &sarama.ProducerMessage{
Topic: string(t.OutputTypingEventTopic),
@@ -132,6 +137,11 @@ func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) e
devices = append(devices, ise.DeviceID)
}
+ logrus.WithFields(logrus.Fields{
+ "user_id": ise.UserID,
+ "num_devices": len(devices),
+ "type": ise.Type,
+ }).Infof("Producing to topic '%s'", t.OutputSendToDeviceEventTopic)
for _, device := range devices {
ote := &api.OutputSendToDeviceEvent{
UserID: ise.UserID,
@@ -139,12 +149,6 @@ func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) e
SendToDeviceEvent: ise.SendToDeviceEvent,
}
- logrus.WithFields(logrus.Fields{
- "user_id": ise.UserID,
- "device_id": ise.DeviceID,
- "event_type": ise.Type,
- }).Info("handling send-to-device message")
-
eventJSON, err := json.Marshal(ote)
if err != nil {
logrus.WithError(err).Error("sendToDevice failed json.Marshal")
diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go
index a15937f9..299c7b37 100644
--- a/federationsender/consumers/roomserver.go
+++ b/federationsender/consumers/roomserver.go
@@ -86,11 +86,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
switch output.Type {
case api.OutputTypeNewRoomEvent:
ev := &output.NewRoomEvent.Event
- log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "room_id": ev.RoomID(),
- "send_as_server": output.NewRoomEvent.SendAsServer,
- }).Info("received room event from roomserver")
if err := s.processMessage(*output.NewRoomEvent); err != nil {
// panic rather than continue with an inconsistent database
diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go
new file mode 100644
index 00000000..a478eeb9
--- /dev/null
+++ b/roomserver/api/api_trace.go
@@ -0,0 +1,218 @@
+package api
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+
+ fsAPI "github.com/matrix-org/dendrite/federationsender/api"
+ "github.com/matrix-org/util"
+)
+
+// RoomserverInternalAPITrace wraps a RoomserverInternalAPI and logs the
+// complete request/response/error
+type RoomserverInternalAPITrace struct {
+ Impl RoomserverInternalAPI
+}
+
+func (t *RoomserverInternalAPITrace) SetFederationSenderAPI(fsAPI fsAPI.FederationSenderInternalAPI) {
+ t.Impl.SetFederationSenderAPI(fsAPI)
+}
+
+func (t *RoomserverInternalAPITrace) InputRoomEvents(
+ ctx context.Context,
+ req *InputRoomEventsRequest,
+ res *InputRoomEventsResponse,
+) error {
+ err := t.Impl.InputRoomEvents(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("InputRoomEvents req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) PerformJoin(
+ ctx context.Context,
+ req *PerformJoinRequest,
+ res *PerformJoinResponse,
+) error {
+ err := t.Impl.PerformJoin(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("PerformJoin req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) PerformLeave(
+ ctx context.Context,
+ req *PerformLeaveRequest,
+ res *PerformLeaveResponse,
+) error {
+ err := t.Impl.PerformLeave(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("PerformLeave req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) QueryLatestEventsAndState(
+ ctx context.Context,
+ req *QueryLatestEventsAndStateRequest,
+ res *QueryLatestEventsAndStateResponse,
+) error {
+ err := t.Impl.QueryLatestEventsAndState(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("QueryLatestEventsAndState req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) QueryStateAfterEvents(
+ ctx context.Context,
+ req *QueryStateAfterEventsRequest,
+ res *QueryStateAfterEventsResponse,
+) error {
+ err := t.Impl.QueryStateAfterEvents(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("QueryStateAfterEvents req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) QueryEventsByID(
+ ctx context.Context,
+ req *QueryEventsByIDRequest,
+ res *QueryEventsByIDResponse,
+) error {
+ err := t.Impl.QueryEventsByID(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("QueryEventsByID req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) QueryMembershipForUser(
+ ctx context.Context,
+ req *QueryMembershipForUserRequest,
+ res *QueryMembershipForUserResponse,
+) error {
+ err := t.Impl.QueryMembershipForUser(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("QueryMembershipForUser req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) QueryMembershipsForRoom(
+ ctx context.Context,
+ req *QueryMembershipsForRoomRequest,
+ res *QueryMembershipsForRoomResponse,
+) error {
+ err := t.Impl.QueryMembershipsForRoom(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("QueryMembershipsForRoom req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) QueryServerAllowedToSeeEvent(
+ ctx context.Context,
+ req *QueryServerAllowedToSeeEventRequest,
+ res *QueryServerAllowedToSeeEventResponse,
+) error {
+ err := t.Impl.QueryServerAllowedToSeeEvent(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("QueryServerAllowedToSeeEvent req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) QueryMissingEvents(
+ ctx context.Context,
+ req *QueryMissingEventsRequest,
+ res *QueryMissingEventsResponse,
+) error {
+ err := t.Impl.QueryMissingEvents(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("QueryMissingEvents req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) QueryStateAndAuthChain(
+ ctx context.Context,
+ req *QueryStateAndAuthChainRequest,
+ res *QueryStateAndAuthChainResponse,
+) error {
+ err := t.Impl.QueryStateAndAuthChain(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("QueryStateAndAuthChain req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) PerformBackfill(
+ ctx context.Context,
+ req *PerformBackfillRequest,
+ res *PerformBackfillResponse,
+) error {
+ err := t.Impl.PerformBackfill(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("PerformBackfill req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) QueryRoomVersionCapabilities(
+ ctx context.Context,
+ req *QueryRoomVersionCapabilitiesRequest,
+ res *QueryRoomVersionCapabilitiesResponse,
+) error {
+ err := t.Impl.QueryRoomVersionCapabilities(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("QueryRoomVersionCapabilities req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) QueryRoomVersionForRoom(
+ ctx context.Context,
+ req *QueryRoomVersionForRoomRequest,
+ res *QueryRoomVersionForRoomResponse,
+) error {
+ err := t.Impl.QueryRoomVersionForRoom(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("QueryRoomVersionForRoom req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) SetRoomAlias(
+ ctx context.Context,
+ req *SetRoomAliasRequest,
+ res *SetRoomAliasResponse,
+) error {
+ err := t.Impl.SetRoomAlias(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("SetRoomAlias req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) GetRoomIDForAlias(
+ ctx context.Context,
+ req *GetRoomIDForAliasRequest,
+ res *GetRoomIDForAliasResponse,
+) error {
+ err := t.Impl.GetRoomIDForAlias(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("GetRoomIDForAlias req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) GetAliasesForRoomID(
+ ctx context.Context,
+ req *GetAliasesForRoomIDRequest,
+ res *GetAliasesForRoomIDResponse,
+) error {
+ err := t.Impl.GetAliasesForRoomID(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("GetAliasesForRoomID req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) GetCreatorIDForAlias(
+ ctx context.Context,
+ req *GetCreatorIDForAliasRequest,
+ res *GetCreatorIDForAliasResponse,
+) error {
+ err := t.Impl.GetCreatorIDForAlias(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("GetCreatorIDForAlias req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func (t *RoomserverInternalAPITrace) RemoveRoomAlias(
+ ctx context.Context,
+ req *RemoveRoomAliasRequest,
+ res *RemoveRoomAliasResponse,
+) error {
+ err := t.Impl.RemoveRoomAlias(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("RemoveRoomAlias req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
+func js(thing interface{}) string {
+ b, err := json.Marshal(thing)
+ if err != nil {
+ return fmt.Sprintf("Marshal error:%s", err)
+ }
+ return string(b)
+}
diff --git a/roomserver/internal/input.go b/roomserver/internal/input.go
index 932b4df4..e863af95 100644
--- a/roomserver/internal/input.go
+++ b/roomserver/internal/input.go
@@ -21,6 +21,7 @@ import (
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/roomserver/api"
+ log "github.com/sirupsen/logrus"
fsAPI "github.com/matrix-org/dendrite/federationsender/api"
)
@@ -40,6 +41,21 @@ func (r *RoomserverInternalAPI) WriteOutputEvents(roomID string, updates []api.O
if err != nil {
return err
}
+ logger := log.WithFields(log.Fields{
+ "room_id": roomID,
+ "type": updates[i].Type,
+ })
+ if updates[i].NewRoomEvent != nil {
+ logger = logger.WithFields(log.Fields{
+ "event_type": updates[i].NewRoomEvent.Event.Type(),
+ "event_id": updates[i].NewRoomEvent.Event.EventID(),
+ "adds_state": len(updates[i].NewRoomEvent.AddsStateEventIDs),
+ "removes_state": len(updates[i].NewRoomEvent.RemovesStateEventIDs),
+ "send_as_server": updates[i].NewRoomEvent.SendAsServer,
+ "sender": updates[i].NewRoomEvent.Event.Sender(),
+ })
+ }
+ logger.Infof("Producing to topic '%s'", r.OutputRoomEventTopic)
messages[i] = &sarama.ProducerMessage{
Topic: r.OutputRoomEventTopic,
Key: sarama.StringEncoder(roomID),
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 13597682..98be5bb7 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -98,11 +98,6 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent,
) error {
ev := msg.Event
- log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "room_id": ev.RoomID(),
- "room_version": ev.RoomVersion,
- }).Info("received event from roomserver")
addsStateEvents := msg.AddsState()