diff options
author | devonh <devon.dmytro@gmail.com> | 2023-05-17 00:33:27 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-17 00:33:27 +0000 |
commit | 67d68768574a234b733eb3e4061644fc098a69f6 (patch) | |
tree | 819e9a0a150b68181d91112ffc7e0d6030412b77 /syncapi | |
parent | 0489d16f95a3d9f1f5bc532e2060bd2482d7b156 (diff) |
Move MakeJoin logic to GMSL (#3081)
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/routing/context.go | 45 | ||||
-rw-r--r-- | syncapi/routing/filter.go | 15 | ||||
-rw-r--r-- | syncapi/routing/getevent.go | 12 | ||||
-rw-r--r-- | syncapi/routing/memberships.go | 30 | ||||
-rw-r--r-- | syncapi/routing/messages.go | 30 | ||||
-rw-r--r-- | syncapi/routing/relations.go | 5 | ||||
-rw-r--r-- | syncapi/routing/routing.go | 5 | ||||
-rw-r--r-- | syncapi/routing/search.go | 40 | ||||
-rw-r--r-- | syncapi/sync/requestpool.go | 15 |
9 files changed, 157 insertions, 40 deletions
diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index 8ff656e7..ac17d39d 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -56,7 +56,10 @@ func Context( ) util.JSONResponse { snapshot, err := syncDB.NewDatabaseSnapshot(req.Context()) if err != nil { - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } var succeeded bool defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err) @@ -87,7 +90,10 @@ func Context( membershipReq := roomserver.QueryMembershipForUserRequest{UserID: device.UserID, RoomID: roomID} if err = rsAPI.QueryMembershipForUser(ctx, &membershipReq, &membershipRes); err != nil { logrus.WithError(err).Error("unable to query membership") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } if !membershipRes.RoomExists { return util.JSONResponse{ @@ -117,7 +123,10 @@ func Context( } } logrus.WithError(err).WithField("eventID", eventID).Error("unable to find requested event") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } // verify the user is allowed to see the context for this room/event @@ -125,7 +134,10 @@ func Context( filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, snapshot, rsAPI, []*rstypes.HeaderedEvent{&requestedEvent}, nil, device.UserID, "context") if err != nil { logrus.WithError(err).Error("unable to apply history visibility filter") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } logrus.WithFields(logrus.Fields{ "duration": time.Since(startTime), @@ -141,20 +153,29 @@ func Context( eventsBefore, err := snapshot.SelectContextBeforeEvent(ctx, id, roomID, filter) if err != nil && err != sql.ErrNoRows { logrus.WithError(err).Error("unable to fetch before events") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } _, eventsAfter, err := snapshot.SelectContextAfterEvent(ctx, id, roomID, filter) if err != nil && err != sql.ErrNoRows { logrus.WithError(err).Error("unable to fetch after events") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } startTime = time.Now() eventsBeforeFiltered, eventsAfterFiltered, err := applyHistoryVisibilityOnContextEvents(ctx, snapshot, rsAPI, eventsBefore, eventsAfter, device.UserID) if err != nil { logrus.WithError(err).Error("unable to apply history visibility filter") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } logrus.WithFields(logrus.Fields{ @@ -166,7 +187,10 @@ func Context( state, err := snapshot.CurrentState(ctx, roomID, &stateFilter, nil) if err != nil { logrus.WithError(err).Error("unable to fetch current room state") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } eventsBeforeClient := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsBeforeFiltered), synctypes.FormatAll) @@ -180,7 +204,10 @@ func Context( newState, err = applyLazyLoadMembers(ctx, device, snapshot, roomID, evs, lazyLoadCache) if err != nil { logrus.WithError(err).Error("unable to load membership events") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } } diff --git a/syncapi/routing/filter.go b/syncapi/routing/filter.go index 5152e1f8..c4eecbdb 100644 --- a/syncapi/routing/filter.go +++ b/syncapi/routing/filter.go @@ -43,7 +43,10 @@ func GetFilter( localpart, _, err := gomatrixserverlib.SplitID('@', userID) if err != nil { util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } filter := synctypes.DefaultFilter() @@ -83,7 +86,10 @@ func PutFilter( localpart, _, err := gomatrixserverlib.SplitID('@', userID) if err != nil { util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } var filter synctypes.Filter @@ -122,7 +128,10 @@ func PutFilter( filterID, err := syncDB.PutFilter(req.Context(), localpart, &filter) if err != nil { util.GetLogger(req.Context()).WithError(err).Error("syncDB.PutFilter failed") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } return util.JSONResponse{ diff --git a/syncapi/routing/getevent.go b/syncapi/routing/getevent.go index e3d77cc3..0d3d412f 100644 --- a/syncapi/routing/getevent.go +++ b/syncapi/routing/getevent.go @@ -51,13 +51,19 @@ func GetEvent( }) if err != nil { logger.WithError(err).Error("GetEvent: syncDB.NewDatabaseTransaction failed") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } events, err := db.Events(ctx, []string{eventID}) if err != nil { logger.WithError(err).Error("GetEvent: syncDB.Events failed") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } // The requested event does not exist in our database @@ -81,7 +87,7 @@ func GetEvent( logger.WithError(err).Error("GetEvent: internal.ApplyHistoryVisibilityFilter failed") return util.JSONResponse{ Code: http.StatusInternalServerError, - JSON: spec.InternalServerError(), + JSON: spec.InternalServerError{}, } } diff --git a/syncapi/routing/memberships.go b/syncapi/routing/memberships.go index 5a66009c..7d2e137d 100644 --- a/syncapi/routing/memberships.go +++ b/syncapi/routing/memberships.go @@ -67,7 +67,10 @@ func GetMemberships( var queryRes api.QueryMembershipForUserResponse if err := rsAPI.QueryMembershipForUser(req.Context(), &queryReq, &queryRes); err != nil { util.GetLogger(req.Context()).WithError(err).Error("rsAPI.QueryMembershipsForRoom failed") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } if !queryRes.HasBeenInRoom { @@ -86,7 +89,10 @@ func GetMemberships( db, err := syncDB.NewDatabaseSnapshot(req.Context()) if err != nil { - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } defer db.Rollback() // nolint: errcheck @@ -98,7 +104,10 @@ func GetMemberships( atToken, err = db.EventPositionInTopology(req.Context(), queryRes.EventID) if err != nil { util.GetLogger(req.Context()).WithError(err).Error("unable to get 'atToken'") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } } } @@ -106,13 +115,19 @@ func GetMemberships( eventIDs, err := db.SelectMemberships(req.Context(), roomID, atToken, membership, notMembership) if err != nil { util.GetLogger(req.Context()).WithError(err).Error("db.SelectMemberships failed") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } qryRes := &api.QueryEventsByIDResponse{} if err := rsAPI.QueryEventsByID(req.Context(), &api.QueryEventsByIDRequest{EventIDs: eventIDs, RoomID: roomID}, qryRes); err != nil { util.GetLogger(req.Context()).WithError(err).Error("rsAPI.QueryEventsByID failed") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } result := qryRes.Events @@ -124,7 +139,10 @@ func GetMemberships( var content databaseJoinedMember if err := json.Unmarshal(ev.Content(), &content); err != nil { util.GetLogger(req.Context()).WithError(err).Error("failed to unmarshal event content") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } res.Joined[ev.Sender()] = joinedMember(content) } diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 4d3c9e2e..58f663d0 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -81,7 +81,10 @@ func OnIncomingMessagesRequest( // request that requires backfilling from the roomserver or federation. snapshot, err := db.NewDatabaseTransaction(req.Context()) if err != nil { - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } var succeeded bool defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err) @@ -89,7 +92,10 @@ func OnIncomingMessagesRequest( // check if the user has already forgotten about this room membershipResp, err := getMembershipForUser(req.Context(), roomID, device.UserID, rsAPI) if err != nil { - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } if !membershipResp.RoomExists { return util.JSONResponse{ @@ -151,7 +157,10 @@ func OnIncomingMessagesRequest( from, err = snapshot.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, backwardOrdering) if err != nil { logrus.WithError(err).Errorf("Failed to get topological position for streaming token %v", streamToken) - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } } } @@ -173,7 +182,10 @@ func OnIncomingMessagesRequest( to, err = snapshot.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, !backwardOrdering) if err != nil { logrus.WithError(err).Errorf("Failed to get topological position for streaming token %v", streamToken) - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } } } @@ -232,7 +244,10 @@ func OnIncomingMessagesRequest( clientEvents, start, end, err := mReq.retrieveEvents() if err != nil { util.GetLogger(req.Context()).WithError(err).Error("mreq.retrieveEvents failed") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } util.GetLogger(req.Context()).WithFields(logrus.Fields{ @@ -253,7 +268,10 @@ func OnIncomingMessagesRequest( membershipEvents, err := applyLazyLoadMembers(req.Context(), device, snapshot, roomID, clientEvents, lazyLoadCache) if err != nil { util.GetLogger(req.Context()).WithError(err).Error("failed to apply lazy loading") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } res.State = append(res.State, synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(membershipEvents), synctypes.FormatAll)...) } diff --git a/syncapi/routing/relations.go b/syncapi/routing/relations.go index 2bf11a56..8374bf5b 100644 --- a/syncapi/routing/relations.go +++ b/syncapi/routing/relations.go @@ -80,7 +80,10 @@ func Relations( snapshot, err := syncDB.NewDatabaseSnapshot(req.Context()) if err != nil { logrus.WithError(err).Error("Failed to get snapshot for relations") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } var succeeded bool defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err) diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go index 88c5c504..9ad0c047 100644 --- a/syncapi/routing/routing.go +++ b/syncapi/routing/routing.go @@ -162,7 +162,10 @@ func Setup( } var nextBatch *string if err := req.ParseForm(); err != nil { - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } if req.Form.Has("next_batch") { nb := req.FormValue("next_batch") diff --git a/syncapi/routing/search.go b/syncapi/routing/search.go index 986284d0..b7191873 100644 --- a/syncapi/routing/search.go +++ b/syncapi/routing/search.go @@ -55,7 +55,10 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts if from != nil && *from != "" { nextBatch, err = strconv.Atoi(*from) if err != nil { - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } } @@ -65,7 +68,10 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts snapshot, err := syncDB.NewDatabaseSnapshot(req.Context()) if err != nil { - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } var succeeded bool defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err) @@ -73,7 +79,10 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts // only search rooms the user is actually joined to joinedRooms, err := snapshot.RoomIDsWithMembership(ctx, device.UserID, "join") if err != nil { - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } if len(joinedRooms) == 0 { return util.JSONResponse{ @@ -115,7 +124,10 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts ) if err != nil { logrus.WithError(err).Error("failed to search fulltext") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } logrus.Debugf("Search took %s", result.Took) @@ -155,7 +167,10 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts evs, err := syncDB.Events(ctx, wantEvents) if err != nil { logrus.WithError(err).Error("failed to get events from database") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } groups := make(map[string]RoomResult) @@ -173,12 +188,18 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts eventsBefore, eventsAfter, err := contextEvents(ctx, snapshot, event, roomFilter, searchReq) if err != nil { logrus.WithError(err).Error("failed to get context events") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } startToken, endToken, err := getStartEnd(ctx, snapshot, eventsBefore, eventsAfter) if err != nil { logrus.WithError(err).Error("failed to get start/end") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } profileInfos := make(map[string]ProfileInfoResponse) @@ -221,7 +242,10 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts state, err := snapshot.CurrentState(ctx, event.RoomID(), &stateFilter, nil) if err != nil { logrus.WithError(err).Error("unable to get current state") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } stateForRooms[event.RoomID()] = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(state), synctypes.FormatSync) } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 09e5dee1..5a92c70e 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -536,12 +536,18 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use syncReq, err := newSyncRequest(req, *device, rp.db) if err != nil { util.GetLogger(req.Context()).WithError(err).Error("newSyncRequest failed") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } snapshot, err := rp.db.NewDatabaseSnapshot(req.Context()) if err != nil { logrus.WithError(err).Error("Failed to acquire database snapshot for key change") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } var succeeded bool defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err) @@ -552,7 +558,10 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use ) if err != nil { util.GetLogger(req.Context()).WithError(err).Error("Failed to DeviceListCatchup info") - return spec.InternalServerError() + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } } succeeded = true return util.JSONResponse{ |