aboutsummaryrefslogtreecommitdiff
path: root/appservice
diff options
context:
space:
mode:
Diffstat (limited to 'appservice')
-rw-r--r--appservice/storage/postgres/appservice_events_table.go19
-rw-r--r--appservice/storage/sqlite3/appservice_events_table.go19
-rw-r--r--appservice/workers/transaction_scheduler.go21
3 files changed, 38 insertions, 21 deletions
diff --git a/appservice/storage/postgres/appservice_events_table.go b/appservice/storage/postgres/appservice_events_table.go
index d33a83b1..a95be6b8 100644
--- a/appservice/storage/postgres/appservice_events_table.go
+++ b/appservice/storage/postgres/appservice_events_table.go
@@ -111,19 +111,19 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
eventsRemaining bool,
err error,
) {
- // Retrieve events from the database. Unsuccessfully sent events first
- eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
- if err != nil {
- return
- }
defer func() {
- err = eventRows.Close()
if err != nil {
log.WithFields(log.Fields{
"appservice": applicationServiceID,
}).WithError(err).Fatalf("appservice unable to select new events to send")
}
}()
+ // Retrieve events from the database. Unsuccessfully sent events first
+ eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
+ if err != nil {
+ return
+ }
+ defer checkNamedErr(eventRows.Close, &err)
events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit)
if err != nil {
return
@@ -132,6 +132,13 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
return
}
+// checkNamedErr calls fn and overwrite err if it was nil and fn returned non-nil
+func checkNamedErr(fn func() error, err *error) {
+ if e := fn(); e != nil && *err == nil {
+ *err = e
+ }
+}
+
func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.HeaderedEvent, maxID, txnID int, eventsRemaining bool, err error) {
// Get current time for use in calculating event age
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
diff --git a/appservice/storage/sqlite3/appservice_events_table.go b/appservice/storage/sqlite3/appservice_events_table.go
index 5dfb72f6..34b4859e 100644
--- a/appservice/storage/sqlite3/appservice_events_table.go
+++ b/appservice/storage/sqlite3/appservice_events_table.go
@@ -116,19 +116,19 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
eventsRemaining bool,
err error,
) {
- // Retrieve events from the database. Unsuccessfully sent events first
- eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
- if err != nil {
- return
- }
defer func() {
- err = eventRows.Close()
if err != nil {
log.WithFields(log.Fields{
"appservice": applicationServiceID,
}).WithError(err).Fatalf("appservice unable to select new events to send")
}
}()
+ // Retrieve events from the database. Unsuccessfully sent events first
+ eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
+ if err != nil {
+ return
+ }
+ defer checkNamedErr(eventRows.Close, &err)
events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit)
if err != nil {
return
@@ -137,6 +137,13 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
return
}
+// checkNamedErr calls fn and overwrite err if it was nil and fn returned non-nil
+func checkNamedErr(fn func() error, err *error) {
+ if e := fn(); e != nil && *err == nil {
+ *err = e
+ }
+}
+
func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.HeaderedEvent, maxID, txnID int, eventsRemaining bool, err error) {
// Get current time for use in calculating event age
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go
index 63ec58aa..b1735841 100644
--- a/appservice/workers/transaction_scheduler.go
+++ b/appservice/workers/transaction_scheduler.go
@@ -101,6 +101,9 @@ func worker(db storage.Database, ws types.ApplicationServiceWorkerState) {
// Backoff if the application service does not respond
err = send(client, ws.AppService, txnID, transactionJSON)
if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).WithError(err).Error("unable to send event")
// Backoff
backoff(&ws, err)
continue
@@ -207,7 +210,7 @@ func send(
appservice config.ApplicationService,
txnID int,
transaction []byte,
-) error {
+) (err error) {
// PUT a transaction to our AS
// https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid
address := fmt.Sprintf("%s/transactions/%d?access_token=%s", appservice.URL, txnID, url.QueryEscape(appservice.HSToken))
@@ -220,14 +223,7 @@ func send(
if err != nil {
return err
}
- defer func() {
- err := resp.Body.Close()
- if err != nil {
- log.WithFields(log.Fields{
- "appservice": appservice.ID,
- }).WithError(err).Error("unable to close response body from application service")
- }
- }()
+ defer checkNamedErr(resp.Body.Close, &err)
// Check the AS received the events correctly
if resp.StatusCode != http.StatusOK {
@@ -237,3 +233,10 @@ func send(
return nil
}
+
+// checkNamedErr calls fn and overwrite err if it was nil and fn returned non-nil
+func checkNamedErr(fn func() error, err *error) {
+ if e := fn(); e != nil && *err == nil {
+ *err = e
+ }
+}