aboutsummaryrefslogtreecommitdiff
path: root/appservice
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-03-24 15:46:17 +0000
committerGitHub <noreply@github.com>2020-03-24 15:46:17 +0000
commit0b732d6f45dc96041a85c227812ea0b53b19af68 (patch)
tree8bf5d3977b89fbb21d225895c45883767d52c143 /appservice
parent951b5d5e6895209c1940bbad04a793b147b2648a (diff)
Use HeaderedEvents in appservice component (#939)
* App service HeaderedEvents * Fix database queries * Fix lint error
Diffstat (limited to 'appservice')
-rw-r--r--appservice/consumers/roomserver.go22
-rw-r--r--appservice/storage/interface.go4
-rw-r--r--appservice/storage/postgres/appservice_events_table.go14
-rw-r--r--appservice/storage/postgres/storage.go6
-rw-r--r--appservice/storage/sqlite3/appservice_events_table.go14
-rw-r--r--appservice/storage/sqlite3/storage.go6
-rw-r--r--appservice/workers/transaction_scheduler.go7
7 files changed, 38 insertions, 35 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index 9180d9ef..6ae58e85 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -101,11 +101,11 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
"type": ev.Type(),
}).Info("appservice received an event from roomserver")
- missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev.Event)
+ missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
if err != nil {
return err
}
- events := append(missingEvents, ev.Event)
+ events := append(missingEvents, ev)
// Send event to any relevant application services
return s.filterRoomserverEvents(context.TODO(), events)
@@ -114,19 +114,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// lookupMissingStateEvents looks up the state events that are added by a new event,
// and returns any not already present.
func (s *OutputRoomEventConsumer) lookupMissingStateEvents(
- addsStateEventIDs []string, event gomatrixserverlib.Event,
-) ([]gomatrixserverlib.Event, error) {
+ addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent,
+) ([]gomatrixserverlib.HeaderedEvent, error) {
// Fast path if there aren't any new state events.
if len(addsStateEventIDs) == 0 {
- return []gomatrixserverlib.Event{}, nil
+ return []gomatrixserverlib.HeaderedEvent{}, nil
}
// Fast path if the only state event added is the event itself.
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
- return []gomatrixserverlib.Event{}, nil
+ return []gomatrixserverlib.HeaderedEvent{}, nil
}
- result := []gomatrixserverlib.Event{}
+ result := []gomatrixserverlib.HeaderedEvent{}
missing := []string{}
for _, id := range addsStateEventIDs {
if id != event.EventID() {
@@ -143,9 +143,7 @@ func (s *OutputRoomEventConsumer) lookupMissingStateEvents(
return nil, err
}
- for _, headeredEvent := range eventResp.Events {
- result = append(result, headeredEvent.Event)
- }
+ result = append(result, eventResp.Events...)
return result, nil
}
@@ -157,7 +155,7 @@ func (s *OutputRoomEventConsumer) lookupMissingStateEvents(
// application service.
func (s *OutputRoomEventConsumer) filterRoomserverEvents(
ctx context.Context,
- events []gomatrixserverlib.Event,
+ events []gomatrixserverlib.HeaderedEvent,
) error {
for _, ws := range s.workerStates {
for _, event := range events {
@@ -180,7 +178,7 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
// appserviceIsInterestedInEvent returns a boolean depending on whether a given
// event falls within one of a given application service's namespaces.
-func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event gomatrixserverlib.Event, appservice config.ApplicationService) bool {
+func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool {
// No reason to queue events if they'll never be sent to the application
// service
if appservice.URL == "" {
diff --git a/appservice/storage/interface.go b/appservice/storage/interface.go
index 4b75ff68..25d35af6 100644
--- a/appservice/storage/interface.go
+++ b/appservice/storage/interface.go
@@ -21,8 +21,8 @@ import (
)
type Database interface {
- StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.Event) error
- GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.Event, bool, error)
+ StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent) error
+ GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error)
CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error)
UpdateTxnIDForEvents(ctx context.Context, appserviceID string, maxID, txnID int) error
RemoveEventsBeforeAndIncludingID(ctx context.Context, appserviceID string, eventTableID int) error
diff --git a/appservice/storage/postgres/appservice_events_table.go b/appservice/storage/postgres/appservice_events_table.go
index d72faeea..d33a83b1 100644
--- a/appservice/storage/postgres/appservice_events_table.go
+++ b/appservice/storage/postgres/appservice_events_table.go
@@ -33,7 +33,7 @@ CREATE TABLE IF NOT EXISTS appservice_events (
-- The ID of the application service the event will be sent to
as_id TEXT NOT NULL,
-- JSON representation of the event
- event_json TEXT NOT NULL,
+ headered_event_json TEXT NOT NULL,
-- The ID of the transaction that this event is a part of
txn_id BIGINT NOT NULL
);
@@ -42,14 +42,14 @@ CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
`
const selectEventsByApplicationServiceIDSQL = "" +
- "SELECT id, event_json, txn_id " +
+ "SELECT id, headered_event_json, txn_id " +
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
const countEventsByApplicationServiceIDSQL = "" +
"SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
const insertEventSQL = "" +
- "INSERT INTO appservice_events(as_id, event_json, txn_id) " +
+ "INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " +
"VALUES ($1, $2, $3)"
const updateTxnIDForEventsSQL = "" +
@@ -107,7 +107,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
limit int,
) (
txnID, maxID int,
- events []gomatrixserverlib.Event,
+ events []gomatrixserverlib.HeaderedEvent,
eventsRemaining bool,
err error,
) {
@@ -132,7 +132,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
return
}
-func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) {
+func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.HeaderedEvent, maxID, txnID int, eventsRemaining bool, err error) {
// Get current time for use in calculating event age
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
@@ -141,7 +141,7 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.
// new ones. Send back those events first.
lastTxnID := invalidTxnID
for eventsProcessed := 0; eventRows.Next(); {
- var event gomatrixserverlib.Event
+ var event gomatrixserverlib.HeaderedEvent
var eventJSON []byte
var id int
err = eventRows.Scan(
@@ -209,7 +209,7 @@ func (s *eventsStatements) countEventsByApplicationServiceID(
func (s *eventsStatements) insertEvent(
ctx context.Context,
appServiceID string,
- event *gomatrixserverlib.Event,
+ event *gomatrixserverlib.HeaderedEvent,
) (err error) {
// Convert event to JSON before inserting
eventJSON, err := json.Marshal(event)
diff --git a/appservice/storage/postgres/storage.go b/appservice/storage/postgres/storage.go
index c4756468..ef92db87 100644
--- a/appservice/storage/postgres/storage.go
+++ b/appservice/storage/postgres/storage.go
@@ -52,12 +52,12 @@ func (d *Database) prepare() error {
return d.txnID.prepare(d.db)
}
-// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
+// StoreEvent takes in a gomatrixserverlib.HeaderedEvent and stores it in the database
// for a transaction worker to pull and later send to an application service.
func (d *Database) StoreEvent(
ctx context.Context,
appServiceID string,
- event *gomatrixserverlib.Event,
+ event *gomatrixserverlib.HeaderedEvent,
) error {
return d.events.insertEvent(ctx, appServiceID, event)
}
@@ -68,7 +68,7 @@ func (d *Database) GetEventsWithAppServiceID(
ctx context.Context,
appServiceID string,
limit int,
-) (int, int, []gomatrixserverlib.Event, bool, error) {
+) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error) {
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
}
diff --git a/appservice/storage/sqlite3/appservice_events_table.go b/appservice/storage/sqlite3/appservice_events_table.go
index 846f09f7..479f2213 100644
--- a/appservice/storage/sqlite3/appservice_events_table.go
+++ b/appservice/storage/sqlite3/appservice_events_table.go
@@ -33,7 +33,7 @@ CREATE TABLE IF NOT EXISTS appservice_events (
-- The ID of the application service the event will be sent to
as_id TEXT NOT NULL,
-- JSON representation of the event
- event_json TEXT NOT NULL,
+ headered_event_json TEXT NOT NULL,
-- The ID of the transaction that this event is a part of
txn_id INTEGER NOT NULL
);
@@ -42,14 +42,14 @@ CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
`
const selectEventsByApplicationServiceIDSQL = "" +
- "SELECT id, event_json, txn_id " +
+ "SELECT id, headered_event_json, txn_id " +
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
const countEventsByApplicationServiceIDSQL = "" +
"SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
const insertEventSQL = "" +
- "INSERT INTO appservice_events(as_id, event_json, txn_id) " +
+ "INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " +
"VALUES ($1, $2, $3)"
const updateTxnIDForEventsSQL = "" +
@@ -107,7 +107,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
limit int,
) (
txnID, maxID int,
- events []gomatrixserverlib.Event,
+ events []gomatrixserverlib.HeaderedEvent,
eventsRemaining bool,
err error,
) {
@@ -132,7 +132,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
return
}
-func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) {
+func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.HeaderedEvent, maxID, txnID int, eventsRemaining bool, err error) {
// Get current time for use in calculating event age
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
@@ -141,7 +141,7 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.
// new ones. Send back those events first.
lastTxnID := invalidTxnID
for eventsProcessed := 0; eventRows.Next(); {
- var event gomatrixserverlib.Event
+ var event gomatrixserverlib.HeaderedEvent
var eventJSON []byte
var id int
err = eventRows.Scan(
@@ -209,7 +209,7 @@ func (s *eventsStatements) countEventsByApplicationServiceID(
func (s *eventsStatements) insertEvent(
ctx context.Context,
appServiceID string,
- event *gomatrixserverlib.Event,
+ event *gomatrixserverlib.HeaderedEvent,
) (err error) {
// Convert event to JSON before inserting
eventJSON, err := json.Marshal(event)
diff --git a/appservice/storage/sqlite3/storage.go b/appservice/storage/sqlite3/storage.go
index 5040b61b..d0538e26 100644
--- a/appservice/storage/sqlite3/storage.go
+++ b/appservice/storage/sqlite3/storage.go
@@ -53,12 +53,12 @@ func (d *Database) prepare() error {
return d.txnID.prepare(d.db)
}
-// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
+// StoreEvent takes in a gomatrixserverlib.HeaderedEvent and stores it in the database
// for a transaction worker to pull and later send to an application service.
func (d *Database) StoreEvent(
ctx context.Context,
appServiceID string,
- event *gomatrixserverlib.Event,
+ event *gomatrixserverlib.HeaderedEvent,
) error {
return d.events.insertEvent(ctx, appServiceID, event)
}
@@ -69,7 +69,7 @@ func (d *Database) GetEventsWithAppServiceID(
ctx context.Context,
appServiceID string,
limit int,
-) (int, int, []gomatrixserverlib.Event, bool, error) {
+) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error) {
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
}
diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go
index faa1e4a9..10c7ef91 100644
--- a/appservice/workers/transaction_scheduler.go
+++ b/appservice/workers/transaction_scheduler.go
@@ -181,9 +181,14 @@ func createTransaction(
}
}
+ var ev []gomatrixserverlib.Event
+ for _, e := range events {
+ ev = append(ev, e.Event)
+ }
+
// Create a transaction and store the events inside
transaction := gomatrixserverlib.ApplicationServiceTransaction{
- Events: events,
+ Events: ev,
}
transactionJSON, err = json.Marshal(transaction)