aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres/send_to_device_table.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/postgres/send_to_device_table.go')
-rw-r--r--syncapi/storage/postgres/send_to_device_table.go12
1 files changed, 8 insertions, 4 deletions
diff --git a/syncapi/storage/postgres/send_to_device_table.go b/syncapi/storage/postgres/send_to_device_table.go
index be9c347b..ac60989c 100644
--- a/syncapi/storage/postgres/send_to_device_table.go
+++ b/syncapi/storage/postgres/send_to_device_table.go
@@ -49,6 +49,7 @@ CREATE TABLE IF NOT EXISTS syncapi_send_to_device (
const insertSendToDeviceMessageSQL = `
INSERT INTO syncapi_send_to_device (user_id, device_id, content)
VALUES ($1, $2, $3)
+ RETURNING id
`
const countSendToDeviceMessagesSQL = `
@@ -107,8 +108,8 @@ func NewPostgresSendToDeviceTable(db *sql.DB) (tables.SendToDevice, error) {
func (s *sendToDeviceStatements) InsertSendToDeviceMessage(
ctx context.Context, txn *sql.Tx, userID, deviceID, content string,
-) (err error) {
- _, err = sqlutil.TxStmt(txn, s.insertSendToDeviceMessageStmt).ExecContext(ctx, userID, deviceID, content)
+) (pos types.StreamPosition, err error) {
+ err = sqlutil.TxStmt(txn, s.insertSendToDeviceMessageStmt).QueryRowContext(ctx, userID, deviceID, content).Scan(&pos)
return
}
@@ -124,7 +125,7 @@ func (s *sendToDeviceStatements) CountSendToDeviceMessages(
func (s *sendToDeviceStatements) SelectSendToDeviceMessages(
ctx context.Context, txn *sql.Tx, userID, deviceID string,
-) (events []types.SendToDeviceEvent, err error) {
+) (lastPos types.StreamPosition, events []types.SendToDeviceEvent, err error) {
rows, err := sqlutil.TxStmt(txn, s.selectSendToDeviceMessagesStmt).QueryContext(ctx, userID, deviceID)
if err != nil {
return
@@ -152,9 +153,12 @@ func (s *sendToDeviceStatements) SelectSendToDeviceMessages(
}
}
events = append(events, event)
+ if types.StreamPosition(id) > lastPos {
+ lastPos = types.StreamPosition(id)
+ }
}
- return events, rows.Err()
+ return lastPos, events, rows.Err()
}
func (s *sendToDeviceStatements) UpdateSentSendToDeviceMessages(