aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers/roomserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers/roomserver.go')
-rw-r--r--syncapi/consumers/roomserver.go29
1 files changed, 15 insertions, 14 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 8c83e688..1e87aee9 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -17,16 +17,12 @@ package consumers
import (
"context"
"database/sql"
+ "encoding/base64"
"encoding/json"
+ "errors"
"fmt"
"github.com/getsentry/sentry-go"
- "github.com/matrix-org/gomatrixserverlib/spec"
- "github.com/nats-io/nats.go"
- "github.com/sirupsen/logrus"
- log "github.com/sirupsen/logrus"
- "github.com/tidwall/gjson"
-
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -38,6 +34,11 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib/spec"
+ "github.com/nats-io/nats.go"
+ "github.com/sirupsen/logrus"
+ log "github.com/sirupsen/logrus"
+ "github.com/tidwall/gjson"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
@@ -141,7 +142,14 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
)
}
if err != nil {
- log.WithError(err).Error("roomserver output log: failed to process event")
+ if errors.As(err, new(base64.CorruptInputError)) {
+ // no matter how often we retry this event, we will always get this error, discard the event
+ return true
+ }
+ log.WithFields(log.Fields{
+ "type": output.Type,
+ }).WithError(err).Error("roomserver output log: failed to process event")
+ sentry.CaptureException(err)
return false
}
@@ -237,21 +245,18 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
ev, err := s.updateStateEvent(ev)
if err != nil {
- sentry.CaptureException(err)
return err
}
for i := range addsStateEvents {
addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i])
if err != nil {
- sentry.CaptureException(err)
return err
}
}
if msg.RewritesState {
if err = s.db.PurgeRoomState(ctx, ev.RoomID()); err != nil {
- sentry.CaptureException(err)
return fmt.Errorf("s.db.PurgeRoom: %w", err)
}
}
@@ -289,7 +294,6 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
- sentry.CaptureException(err)
return err
}
@@ -430,7 +434,6 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
if err != nil {
- sentry.CaptureException(err)
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event_id": msg.Event.EventID(),
@@ -452,7 +455,6 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
// It's possible we just haven't heard of this invite yet, so
// we should not panic if we try to retire it.
if err != nil && err != sql.ErrNoRows {
- sentry.CaptureException(err)
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event_id": msg.EventID,
@@ -496,7 +498,6 @@ func (s *OutputRoomEventConsumer) onNewPeek(
) {
sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
if err != nil {
- sentry.CaptureException(err)
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
log.ErrorKey: err,