aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/postgres')
-rw-r--r--syncapi/storage/postgres/current_room_state_table.go21
-rw-r--r--syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go3
-rw-r--r--syncapi/storage/postgres/invites_table.go12
-rw-r--r--syncapi/storage/postgres/memberships_table.go5
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go35
-rw-r--r--syncapi/storage/postgres/output_room_events_topology_table.go5
6 files changed, 41 insertions, 40 deletions
diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go
index c2a870c0..0cc96373 100644
--- a/syncapi/storage/postgres/current_room_state_table.go
+++ b/syncapi/storage/postgres/current_room_state_table.go
@@ -24,6 +24,7 @@ import (
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -274,7 +275,7 @@ func (s *currentRoomStateStatements) SelectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string,
stateFilter *synctypes.StateFilter,
excludeEventIDs []string,
-) ([]*gomatrixserverlib.HeaderedEvent, error) {
+) ([]*rstypes.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt)
senders, notSenders := getSendersStateFilterFilter(stateFilter)
// We're going to query members later, so remove them from this request
@@ -320,7 +321,7 @@ func (s *currentRoomStateStatements) DeleteRoomStateForRoom(
func (s *currentRoomStateStatements) UpsertRoomState(
ctx context.Context, txn *sql.Tx,
- event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
+ event *rstypes.HeaderedEvent, membership *string, addedAt types.StreamPosition,
) error {
// Parse content as JSON and search for an "url" key
containsURL := false
@@ -378,8 +379,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@@ -394,8 +395,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
return events, nil
}
-func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
- result := []*gomatrixserverlib.HeaderedEvent{}
+func rowsToEvents(rows *sql.Rows) ([]*rstypes.HeaderedEvent, error) {
+ result := []*rstypes.HeaderedEvent{}
for rows.Next() {
var eventID string
var eventBytes []byte
@@ -403,8 +404,8 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result = append(result, &ev)
@@ -414,7 +415,7 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
func (s *currentRoomStateStatements) SelectStateEvent(
ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string,
-) (*gomatrixserverlib.HeaderedEvent, error) {
+) (*rstypes.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectStateEventStmt)
var res []byte
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
@@ -424,7 +425,7 @@ func (s *currentRoomStateStatements) SelectStateEvent(
if err != nil {
return nil, err
}
- var ev gomatrixserverlib.HeaderedEvent
+ var ev rstypes.HeaderedEvent
if err = json.Unmarshal(res, &ev); err != nil {
return nil, err
}
diff --git a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
index d68ed8d5..37660ee9 100644
--- a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
+++ b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
@@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
+ "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -79,7 +80,7 @@ func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gom
defer rows.Close() // nolint: errcheck
var eventBytes []byte
var roomID string
- var event gomatrixserverlib.HeaderedEvent
+ var event types.HeaderedEvent
var hisVis gomatrixserverlib.HistoryVisibility
historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility)
for rows.Next() {
diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go
index 151bffa5..267209bb 100644
--- a/syncapi/storage/postgres/invites_table.go
+++ b/syncapi/storage/postgres/invites_table.go
@@ -22,9 +22,9 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
)
const inviteEventsSchema = `
@@ -89,7 +89,7 @@ func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) {
}
func (s *inviteEventsStatements) InsertInviteEvent(
- ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent,
+ ctx context.Context, txn *sql.Tx, inviteEvent *rstypes.HeaderedEvent,
) (streamPos types.StreamPosition, err error) {
var headeredJSON []byte
headeredJSON, err = json.Marshal(inviteEvent)
@@ -119,7 +119,7 @@ func (s *inviteEventsStatements) DeleteInviteEvent(
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) SelectInviteEventsInRange(
ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
-) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
+) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error) {
var lastPos types.StreamPosition
stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt)
rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
@@ -127,8 +127,8 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
return nil, nil, lastPos, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
- result := map[string]*gomatrixserverlib.HeaderedEvent{}
- retired := map[string]*gomatrixserverlib.HeaderedEvent{}
+ result := map[string]*rstypes.HeaderedEvent{}
+ retired := map[string]*rstypes.HeaderedEvent{}
for rows.Next() {
var (
id types.StreamPosition
@@ -151,7 +151,7 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
continue
}
- var event *gomatrixserverlib.HeaderedEvent
+ var event *rstypes.HeaderedEvent
if err := json.Unmarshal(eventJSON, &event); err != nil {
return nil, nil, lastPos, err
}
diff --git a/syncapi/storage/postgres/memberships_table.go b/syncapi/storage/postgres/memberships_table.go
index 47833893..3905f9ab 100644
--- a/syncapi/storage/postgres/memberships_table.go
+++ b/syncapi/storage/postgres/memberships_table.go
@@ -19,9 +19,8 @@ import (
"database/sql"
"fmt"
- "github.com/matrix-org/gomatrixserverlib"
-
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -100,7 +99,7 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
}
func (s *membershipsStatements) UpsertMembership(
- ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
+ ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent,
streamPos, topologicalPos types.StreamPosition,
) error {
membership, err := event.Membership()
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 8ee5098c..3aadbccf 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -264,7 +265,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
}.Prepare(db)
}
-func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error {
+func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent) error {
headeredJSON, err := json.Marshal(event)
if err != nil {
return err
@@ -329,8 +330,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, nil, err
}
needSet := stateNeeded[ev.RoomID()]
@@ -375,7 +376,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
// of the inserted event.
func (s *outputRoomEventsStatements) InsertEvent(
ctx context.Context, txn *sql.Tx,
- event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
+ event *rstypes.HeaderedEvent, addState, removeState []string,
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility,
) (streamPos types.StreamPosition, err error) {
var txnID *string
@@ -465,8 +466,8 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@@ -577,7 +578,7 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom(
return err
}
-func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
+func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (id int, evt rstypes.HeaderedEvent, err error) {
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
var eventAsString string
@@ -595,7 +596,7 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn
func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter,
-) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
+) (evts []*rstypes.HeaderedEvent, err error) {
senders, notSenders := getSendersRoomEventFilter(filter)
rows, err := sqlutil.TxStmt(txn, s.selectContextBeforeEventStmt).QueryContext(
ctx, roomID, id, filter.Limit,
@@ -612,7 +613,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
for rows.Next() {
var (
eventBytes []byte
- evt *gomatrixserverlib.HeaderedEvent
+ evt *rstypes.HeaderedEvent
historyVisibility gomatrixserverlib.HistoryVisibility
)
if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
@@ -630,7 +631,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
func (s *outputRoomEventsStatements) SelectContextAfterEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter,
-) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
+) (lastID int, evts []*rstypes.HeaderedEvent, err error) {
senders, notSenders := getSendersRoomEventFilter(filter)
rows, err := sqlutil.TxStmt(txn, s.selectContextAfterEventStmt).QueryContext(
ctx, roomID, id, filter.Limit,
@@ -647,7 +648,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
for rows.Next() {
var (
eventBytes []byte
- evt *gomatrixserverlib.HeaderedEvent
+ evt *rstypes.HeaderedEvent
historyVisibility gomatrixserverlib.HistoryVisibility
)
if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
@@ -680,8 +681,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
return nil, err
}
// TODO: Handle redacted events
- var ev gomatrixserverlib.HeaderedEvent
- if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ var ev rstypes.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@@ -709,7 +710,7 @@ func (s *outputRoomEventsStatements) PurgeEvents(
return err
}
-func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
+func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]rstypes.HeaderedEvent, error) {
rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, afterID, pq.StringArray(types), limit)
if err != nil {
return nil, err
@@ -718,14 +719,14 @@ func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, l
var eventID string
var id int64
- result := make(map[int64]gomatrixserverlib.HeaderedEvent)
+ result := make(map[int64]rstypes.HeaderedEvent)
for rows.Next() {
- var ev gomatrixserverlib.HeaderedEvent
+ var ev rstypes.HeaderedEvent
var eventBytes []byte
if err = rows.Scan(&id, &eventID, &eventBytes); err != nil {
return nil, err
}
- if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ if err = json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result[id] = ev
diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go
index 2382fca5..7140a92f 100644
--- a/syncapi/storage/postgres/output_room_events_topology_table.go
+++ b/syncapi/storage/postgres/output_room_events_topology_table.go
@@ -18,10 +18,9 @@ import (
"context"
"database/sql"
- "github.com/matrix-org/gomatrixserverlib"
-
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -105,7 +104,7 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
// InsertEventInTopology inserts the given event in the room's topology, based
// on the event's depth.
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
- ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
+ ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, pos types.StreamPosition,
) (topoPos types.StreamPosition, err error) {
err = sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).QueryRowContext(
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,