aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--clientapi/clientapi.go1
-rw-r--r--clientapi/producers/syncapi.go32
-rw-r--r--clientapi/routing/account_data.go18
-rw-r--r--clientapi/routing/room_tagging.go11
-rw-r--r--userapi/internal/api.go31
-rw-r--r--userapi/producers/syncapi.go11
6 files changed, 30 insertions, 74 deletions
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go
index 55735560..080d4d9f 100644
--- a/clientapi/clientapi.go
+++ b/clientapi/clientapi.go
@@ -48,7 +48,6 @@ func AddPublicRoutes(
syncProducer := &producers.SyncAPIProducer{
JetStream: js,
- TopicClientData: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go
index 0ac63779..5933ce1a 100644
--- a/clientapi/producers/syncapi.go
+++ b/clientapi/producers/syncapi.go
@@ -21,7 +21,6 @@ import (
"strconv"
"time"
- "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
@@ -32,7 +31,6 @@ import (
// SyncAPIProducer produces events for the sync API server to consume
type SyncAPIProducer struct {
- TopicClientData string
TopicReceiptEvent string
TopicSendToDeviceEvent string
TopicTypingEvent string
@@ -42,36 +40,6 @@ type SyncAPIProducer struct {
UserAPI userapi.ClientUserAPI
}
-// SendData sends account data to the sync API server
-func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON, ignoredUsers *types.IgnoredUsers) error {
- m := &nats.Msg{
- Subject: p.TopicClientData,
- Header: nats.Header{},
- }
- m.Header.Set(jetstream.UserID, userID)
-
- data := eventutil.AccountData{
- RoomID: roomID,
- Type: dataType,
- ReadMarker: readMarker,
- IgnoredUsers: ignoredUsers,
- }
- var err error
- m.Data, err = json.Marshal(data)
- if err != nil {
- return err
- }
-
- log.WithFields(log.Fields{
- "user_id": userID,
- "room_id": roomID,
- "data_type": dataType,
- }).Tracef("Producing to topic '%s'", p.TopicClientData)
-
- _, err = p.JetStream.PublishMsg(m)
- return err
-}
-
func (p *SyncAPIProducer) SendReceipt(
ctx context.Context,
userID, roomID, eventID, receiptType string, timestamp gomatrixserverlib.Timestamp,
diff --git a/clientapi/routing/account_data.go b/clientapi/routing/account_data.go
index a5a3014a..0d3a4949 100644
--- a/clientapi/routing/account_data.go
+++ b/clientapi/routing/account_data.go
@@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal/eventutil"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
- "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
@@ -127,18 +126,6 @@ func SaveAccountData(
return util.ErrorResponse(err)
}
- var ignoredUsers *types.IgnoredUsers
- if dataType == "m.ignored_user_list" {
- ignoredUsers = &types.IgnoredUsers{}
- _ = json.Unmarshal(body, ignoredUsers)
- }
-
- // TODO: user API should do this since it's account data
- if err := syncProducer.SendData(userID, roomID, dataType, nil, ignoredUsers); err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed")
- return jsonerror.InternalServerError()
- }
-
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
@@ -191,11 +178,6 @@ func SaveReadMarker(
return util.ErrorResponse(err)
}
- if err := syncProducer.SendData(device.UserID, roomID, "m.fully_read", &r, nil); err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed")
- return jsonerror.InternalServerError()
- }
-
// Handle the read receipt that may be included in the read marker
if r.Read != "" {
return SetReceipt(req, syncProducer, device, roomID, "m.read", r.Read)
diff --git a/clientapi/routing/room_tagging.go b/clientapi/routing/room_tagging.go
index 03928956..92b9e665 100644
--- a/clientapi/routing/room_tagging.go
+++ b/clientapi/routing/room_tagging.go
@@ -18,8 +18,6 @@ import (
"encoding/json"
"net/http"
- "github.com/sirupsen/logrus"
-
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
@@ -98,10 +96,6 @@ func PutTag(
return jsonerror.InternalServerError()
}
- if err = syncProducer.SendData(userID, roomID, "m.tag", nil, nil); err != nil {
- logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi")
- }
-
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
@@ -150,11 +144,6 @@ func DeleteTag(
return jsonerror.InternalServerError()
}
- // TODO: user API should do this since it's account data
- if err := syncProducer.SendData(userID, roomID, "m.tag", nil, nil); err != nil {
- logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi")
- }
-
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
diff --git a/userapi/internal/api.go b/userapi/internal/api.go
index 27ed15a0..422eb076 100644
--- a/userapi/internal/api.go
+++ b/userapi/internal/api.go
@@ -30,11 +30,13 @@ import (
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/clientapi/userutil"
+ "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/dendrite/internal/sqlutil"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ synctypes "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage"
@@ -64,7 +66,24 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc
if req.DataType == "" {
return fmt.Errorf("data type must not be empty")
}
- return a.DB.SaveAccountData(ctx, local, req.RoomID, req.DataType, req.AccountData)
+ if err := a.DB.SaveAccountData(ctx, local, req.RoomID, req.DataType, req.AccountData); err != nil {
+ util.GetLogger(ctx).WithError(err).Error("a.DB.SaveAccountData failed")
+ return fmt.Errorf("failed to save account data: %w", err)
+ }
+ var ignoredUsers *synctypes.IgnoredUsers
+ if req.DataType == "m.ignored_user_list" {
+ ignoredUsers = &synctypes.IgnoredUsers{}
+ _ = json.Unmarshal(req.AccountData, ignoredUsers)
+ }
+ if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{
+ RoomID: req.RoomID,
+ Type: req.DataType,
+ IgnoredUsers: ignoredUsers,
+ }); err != nil {
+ util.GetLogger(ctx).WithError(err).Error("a.SyncProducer.SendAccountData failed")
+ return fmt.Errorf("failed to send account data to output: %w", err)
+ }
+ return nil
}
func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.PerformAccountCreationRequest, res *api.PerformAccountCreationResponse) error {
@@ -93,7 +112,9 @@ func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.P
}
// Inform the SyncAPI about the newly created push_rules
- if err = a.SyncProducer.SendAccountData(acc.UserID, "", "m.push_rules"); err != nil {
+ if err = a.SyncProducer.SendAccountData(acc.UserID, eventutil.AccountData{
+ Type: "m.push_rules",
+ }); err != nil {
util.GetLogger(ctx).WithFields(logrus.Fields{
"user_id": acc.UserID,
}).WithError(err).Warn("failed to send account data to the SyncAPI")
@@ -732,11 +753,11 @@ func (a *UserInternalAPI) PerformPushRulesPut(
if err := a.InputAccountData(ctx, &userReq, &userRes); err != nil {
return err
}
-
- if err := a.SyncProducer.SendAccountData(req.UserID, "" /* roomID */, pushRulesAccountDataType); err != nil {
+ if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{
+ Type: pushRulesAccountDataType,
+ }); err != nil {
util.GetLogger(ctx).WithError(err).Errorf("syncProducer.SendData failed")
}
-
return nil
}
diff --git a/userapi/producers/syncapi.go b/userapi/producers/syncapi.go
index 4a206f33..27cfc284 100644
--- a/userapi/producers/syncapi.go
+++ b/userapi/producers/syncapi.go
@@ -34,7 +34,7 @@ func NewSyncAPI(db storage.Database, js JetStreamPublisher, clientDataTopic stri
}
// SendAccountData sends account data to the Sync API server.
-func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string) error {
+func (p *SyncAPI) SendAccountData(userID string, data eventutil.AccountData) error {
m := &nats.Msg{
Subject: p.clientDataTopic,
Header: nats.Header{},
@@ -42,18 +42,15 @@ func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string)
m.Header.Set(jetstream.UserID, userID)
var err error
- m.Data, err = json.Marshal(eventutil.AccountData{
- RoomID: roomID,
- Type: dataType,
- })
+ m.Data, err = json.Marshal(data)
if err != nil {
return err
}
log.WithFields(log.Fields{
"user_id": userID,
- "room_id": roomID,
- "data_type": dataType,
+ "room_id": data.RoomID,
+ "data_type": data.Type,
}).Tracef("Producing to topic '%s'", p.clientDataTopic)
_, err = p.producer.PublishMsg(m)