aboutsummaryrefslogtreecommitdiff
path: root/federationapi/routing/send.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationapi/routing/send.go')
-rw-r--r--federationapi/routing/send.go55
1 files changed, 48 insertions, 7 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index d43ed832..1dd874d4 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -34,27 +34,55 @@ import (
"github.com/sirupsen/logrus"
)
+const (
+ // Event was passed to the roomserver
+ MetricsOutcomeOK = "ok"
+ // Event failed to be processed
+ MetricsOutcomeFail = "fail"
+ // Event failed auth checks
+ MetricsOutcomeRejected = "rejected"
+ // Terminated the transaction
+ MetricsOutcomeFatal = "fatal"
+ // The event has missing auth_events we need to fetch
+ MetricsWorkMissingAuthEvents = "missing_auth_events"
+ // No work had to be done as we had all prev/auth events
+ MetricsWorkDirect = "direct"
+ // The event has missing prev_events we need to call /g_m_e for
+ MetricsWorkMissingPrevEvents = "missing_prev_events"
+)
+
var (
pduCountTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dendrite",
Subsystem: "federationapi",
Name: "recv_pdus",
+ Help: "Number of incoming PDUs from remote servers with labels for success",
},
- []string{"status"},
+ []string{"status"}, // 'success' or 'total'
)
eduCountTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "dendrite",
Subsystem: "federationapi",
Name: "recv_edus",
+ Help: "Number of incoming EDUs from remote servers",
+ },
+ )
+ processEventSummary = prometheus.NewSummaryVec(
+ prometheus.SummaryOpts{
+ Namespace: "dendrite",
+ Subsystem: "federationapi",
+ Name: "process_event",
+ Help: "How long it takes to process an incoming event and what work had to be done for it",
},
+ []string{"work", "outcome"},
)
)
func init() {
prometheus.MustRegister(
- pduCountTotal, eduCountTotal,
+ pduCountTotal, eduCountTotal, processEventSummary,
)
}
@@ -140,6 +168,7 @@ type txnReq struct {
// new events which the roomserver does not know about
newEvents map[string]bool
newEventsMutex sync.RWMutex
+ work string // metrics
}
// A subset of FederationClient functionality that txn requires. Useful for testing.
@@ -212,6 +241,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
// Process the events.
for _, e := range pdus {
+ evStart := time.Now()
if err := t.processEvent(ctx, e.Unwrap()); err != nil {
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
@@ -233,17 +263,25 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
// our server so we should bail processing the transaction entirely.
util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err)
jsonErr := util.ErrorResponse(err)
+ processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe(
+ float64(time.Since(evStart).Nanoseconds()) / 1000.,
+ )
return nil, &jsonErr
} else {
// Auth errors mean the event is 'rejected' which have to be silent to appease sytest
errMsg := ""
+ outcome := MetricsOutcomeRejected
_, rejected := err.(*gomatrixserverlib.NotAllowed)
if !rejected {
errMsg = err.Error()
+ outcome = MetricsOutcomeFail
}
util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn(
"Failed to process incoming federation event, skipping",
)
+ processEventSummary.WithLabelValues(t.work, outcome).Observe(
+ float64(time.Since(evStart).Nanoseconds()) / 1000.,
+ )
results[e.EventID()] = gomatrixserverlib.PDUResult{
Error: errMsg,
}
@@ -251,6 +289,9 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
} else {
results[e.EventID()] = gomatrixserverlib.PDUResult{}
pduCountTotal.WithLabelValues("success").Inc()
+ processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe(
+ float64(time.Since(evStart).Nanoseconds()) / 1000.,
+ )
}
}
@@ -452,6 +493,7 @@ func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserver
func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error {
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
+ t.work = "" // reset from previous event
// Work out if the roomserver knows everything it needs to know to auth
// the event. This includes the prev_events and auth_events.
@@ -480,6 +522,7 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
}
if len(stateResp.MissingAuthEventIDs) > 0 {
+ t.work = MetricsWorkMissingAuthEvents
logger.Infof("Event refers to %d unknown auth_events", len(stateResp.MissingAuthEventIDs))
if err := t.retrieveMissingAuthEvents(ctx, e, &stateResp); err != nil {
return fmt.Errorf("t.retrieveMissingAuthEvents: %w", err)
@@ -487,9 +530,11 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
}
if len(stateResp.MissingPrevEventIDs) > 0 {
+ t.work = MetricsWorkMissingPrevEvents
logger.Infof("Event refers to %d unknown prev_events", len(stateResp.MissingPrevEventIDs))
return t.processEventWithMissingState(ctx, e, stateResp.RoomVersion)
}
+ t.work = MetricsWorkDirect
// pass the event to the roomserver which will do auth checks
// If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently
@@ -784,7 +829,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
queryReq := api.QueryEventsByIDRequest{
EventIDs: missingEventList,
}
- util.GetLogger(ctx).Infof("Fetching missing auth events: %v", missingEventList)
+ util.GetLogger(ctx).WithField("count", len(missingEventList)).Infof("Fetching missing auth events")
var queryRes api.QueryEventsByIDResponse
if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
return nil
@@ -854,10 +899,6 @@ retryAllowedState:
}, nil
}
-// getMissingEvents returns a nil backwardsExtremity if missing events were fetched and handled, else returns the new backwards extremity which we should
-// begin from. Returns an error only if we should terminate the transaction which initiated /get_missing_events
-// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns.
-// This means that we may recursively call this function, as we spider back up prev_events.
func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, err error) {
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{e})