aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--federationapi/routing/send.go86
1 files changed, 50 insertions, 36 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index 99022074..e62855ab 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -74,28 +74,17 @@ func Send(
util.GetLogger(httpReq.Context()).Infof("Received transaction %q containing %d PDUs, %d EDUs", txnID, len(t.PDUs), len(t.EDUs))
resp, err := t.processTransaction()
- switch err.(type) {
- // No error? Great! Send back a 200.
- case nil:
- return util.JSONResponse{
- Code: http.StatusOK,
- JSON: resp,
- }
- // Handle known error cases as we will return a 400 error for these.
- case roomNotFoundError:
- case unmarshalError:
- case verifySigError:
- // Handle unknown error cases. Sending 500 errors back should be a last
- // resort as this can make other homeservers back off sending federation
- // events.
- default:
+ if err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("t.processTransaction failed")
- return jsonerror.InternalServerError()
}
- // Return a 400 error for bad requests as fallen through from above.
+
+ // https://matrix.org/docs/spec/server_server/r0.1.3#put-matrix-federation-v1-send-txnid
+ // Status code 200:
+ // The result of processing the transaction. The server is to use this response
+ // even in the event of one or more PDUs failing to be processed.
return util.JSONResponse{
- Code: http.StatusBadRequest,
- JSON: jsonerror.BadJSON(err.Error()),
+ Code: http.StatusOK,
+ JSON: resp,
}
}
@@ -135,30 +124,39 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
}
if err := json.Unmarshal(pdu, &header); err != nil {
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to extract room ID from event")
- return nil, unmarshalError{err}
+ // We don't know the event ID at this point so we can't return the
+ // failure in the PDU results
+ continue
}
verReq := api.QueryRoomVersionForRoomRequest{RoomID: header.RoomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := t.rsAPI.QueryRoomVersionForRoom(t.context, &verReq, &verRes); err != nil {
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to query room version for room", verReq.RoomID)
- return nil, roomNotFoundError{verReq.RoomID}
+ // We don't know the event ID at this point so we can't return the
+ // failure in the PDU results
+ continue
}
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, verRes.RoomVersion)
if err != nil {
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID())
- return nil, unmarshalError{err}
+ results[event.EventID()] = gomatrixserverlib.PDUResult{
+ Error: err.Error(),
+ }
+ continue
}
- if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil {
+ if err = gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil {
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
- return nil, verifySigError{event.EventID(), err}
+ results[event.EventID()] = gomatrixserverlib.PDUResult{
+ Error: err.Error(),
+ }
+ continue
}
pdus = append(pdus, event.Headered(verRes.RoomVersion))
}
// Process the events.
for _, e := range pdus {
- err := t.processEvent(e.Unwrap(), true)
- if err != nil {
+ if err := t.processEvent(e.Unwrap(), true); err != nil {
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
// sender knows that we have skipped processing it.
@@ -174,20 +172,17 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
// receive another event referencing it.
// If we bail and stop processing then we risk wedging incoming
// transactions from that server forever.
- switch err.(type) {
- case roomNotFoundError:
- case *gomatrixserverlib.NotAllowed:
- case missingPrevEventsError:
- default:
- util.GetLogger(t.context).Warnf("Processing %s failed: %s", e.EventID(), err)
+ if isProcessingErrorFatal(err) {
// Any other error should be the result of a temporary error in
// our server so we should bail processing the transaction entirely.
+ util.GetLogger(t.context).Warnf("Processing %s failed fatally: %s", e.EventID(), err)
return nil, err
+ } else {
+ util.GetLogger(t.context).WithError(err).WithField("event_id", e.EventID()).Warn("Failed to process incoming federation event, skipping")
+ results[e.EventID()] = gomatrixserverlib.PDUResult{
+ Error: err.Error(),
+ }
}
- results[e.EventID()] = gomatrixserverlib.PDUResult{
- Error: err.Error(),
- }
- util.GetLogger(t.context).WithError(err).WithField("event_id", e.EventID()).Warn("Failed to process incoming federation event, skipping it.")
} else {
results[e.EventID()] = gomatrixserverlib.PDUResult{}
}
@@ -198,6 +193,25 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
return &gomatrixserverlib.RespSend{PDUs: results}, nil
}
+// isProcessingErrorFatal returns true if the error is really bad and
+// we should stop processing the transaction, and returns false if it
+// is just some less serious error about a specific event.
+func isProcessingErrorFatal(err error) bool {
+ switch err.(type) {
+ case roomNotFoundError:
+ case *gomatrixserverlib.NotAllowed:
+ case missingPrevEventsError:
+ default:
+ switch err {
+ case context.Canceled:
+ case context.DeadlineExceeded:
+ default:
+ return true
+ }
+ }
+ return false
+}
+
type roomNotFoundError struct {
roomID string
}