diff options
Diffstat (limited to 'federationapi/routing/send.go')
-rw-r--r-- | federationapi/routing/send.go | 55 |
1 files changed, 48 insertions, 7 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index d43ed832..1dd874d4 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -34,27 +34,55 @@ import ( "github.com/sirupsen/logrus" ) +const ( + // Event was passed to the roomserver + MetricsOutcomeOK = "ok" + // Event failed to be processed + MetricsOutcomeFail = "fail" + // Event failed auth checks + MetricsOutcomeRejected = "rejected" + // Terminated the transaction + MetricsOutcomeFatal = "fatal" + // The event has missing auth_events we need to fetch + MetricsWorkMissingAuthEvents = "missing_auth_events" + // No work had to be done as we had all prev/auth events + MetricsWorkDirect = "direct" + // The event has missing prev_events we need to call /g_m_e for + MetricsWorkMissingPrevEvents = "missing_prev_events" +) + var ( pduCountTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "dendrite", Subsystem: "federationapi", Name: "recv_pdus", + Help: "Number of incoming PDUs from remote servers with labels for success", }, - []string{"status"}, + []string{"status"}, // 'success' or 'total' ) eduCountTotal = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "dendrite", Subsystem: "federationapi", Name: "recv_edus", + Help: "Number of incoming EDUs from remote servers", + }, + ) + processEventSummary = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: "dendrite", + Subsystem: "federationapi", + Name: "process_event", + Help: "How long it takes to process an incoming event and what work had to be done for it", }, + []string{"work", "outcome"}, ) ) func init() { prometheus.MustRegister( - pduCountTotal, eduCountTotal, + pduCountTotal, eduCountTotal, processEventSummary, ) } @@ -140,6 +168,7 @@ type txnReq struct { // new events which the roomserver does not know about newEvents map[string]bool newEventsMutex sync.RWMutex + work string // metrics } // A subset of FederationClient functionality that txn requires. Useful for testing. @@ -212,6 +241,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res // Process the events. for _, e := range pdus { + evStart := time.Now() if err := t.processEvent(ctx, e.Unwrap()); err != nil { // If the error is due to the event itself being bad then we skip // it and move onto the next event. We report an error so that the @@ -233,17 +263,25 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res // our server so we should bail processing the transaction entirely. util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err) jsonErr := util.ErrorResponse(err) + processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) return nil, &jsonErr } else { // Auth errors mean the event is 'rejected' which have to be silent to appease sytest errMsg := "" + outcome := MetricsOutcomeRejected _, rejected := err.(*gomatrixserverlib.NotAllowed) if !rejected { errMsg = err.Error() + outcome = MetricsOutcomeFail } util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn( "Failed to process incoming federation event, skipping", ) + processEventSummary.WithLabelValues(t.work, outcome).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) results[e.EventID()] = gomatrixserverlib.PDUResult{ Error: errMsg, } @@ -251,6 +289,9 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res } else { results[e.EventID()] = gomatrixserverlib.PDUResult{} pduCountTotal.WithLabelValues("success").Inc() + processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) } } @@ -452,6 +493,7 @@ func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserver func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error { logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) + t.work = "" // reset from previous event // Work out if the roomserver knows everything it needs to know to auth // the event. This includes the prev_events and auth_events. @@ -480,6 +522,7 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e } if len(stateResp.MissingAuthEventIDs) > 0 { + t.work = MetricsWorkMissingAuthEvents logger.Infof("Event refers to %d unknown auth_events", len(stateResp.MissingAuthEventIDs)) if err := t.retrieveMissingAuthEvents(ctx, e, &stateResp); err != nil { return fmt.Errorf("t.retrieveMissingAuthEvents: %w", err) @@ -487,9 +530,11 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e } if len(stateResp.MissingPrevEventIDs) > 0 { + t.work = MetricsWorkMissingPrevEvents logger.Infof("Event refers to %d unknown prev_events", len(stateResp.MissingPrevEventIDs)) return t.processEventWithMissingState(ctx, e, stateResp.RoomVersion) } + t.work = MetricsWorkDirect // pass the event to the roomserver which will do auth checks // If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently @@ -784,7 +829,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event queryReq := api.QueryEventsByIDRequest{ EventIDs: missingEventList, } - util.GetLogger(ctx).Infof("Fetching missing auth events: %v", missingEventList) + util.GetLogger(ctx).WithField("count", len(missingEventList)).Infof("Fetching missing auth events") var queryRes api.QueryEventsByIDResponse if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil { return nil @@ -854,10 +899,6 @@ retryAllowedState: }, nil } -// getMissingEvents returns a nil backwardsExtremity if missing events were fetched and handled, else returns the new backwards extremity which we should -// begin from. Returns an error only if we should terminate the transaction which initiated /get_missing_events -// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns. -// This means that we may recursively call this function, as we spider back up prev_events. func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, err error) { logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{e}) |