aboutsummaryrefslogtreecommitdiff
path: root/appservice/appservice.go
blob: bd261ff9b6c52c7b6662988bfbfef938ecfe40e1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Copyright 2018 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package appservice

import (
	"context"
	"errors"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/mux"
	appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
	"github.com/matrix-org/dendrite/appservice/consumers"
	"github.com/matrix-org/dendrite/appservice/inthttp"
	"github.com/matrix-org/dendrite/appservice/query"
	"github.com/matrix-org/dendrite/appservice/storage"
	"github.com/matrix-org/dendrite/appservice/types"
	"github.com/matrix-org/dendrite/appservice/workers"
	"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
	"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
	"github.com/matrix-org/dendrite/internal/config"
	"github.com/matrix-org/dendrite/internal/setup"
	"github.com/matrix-org/dendrite/internal/sqlutil"
	roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
	"github.com/sirupsen/logrus"
)

// AddInternalRoutes registers HTTP handlers for internal API calls
func AddInternalRoutes(router *mux.Router, queryAPI appserviceAPI.AppServiceQueryAPI) {
	inthttp.AddRoutes(queryAPI, router)
}

// NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(
	base *setup.BaseDendrite,
	accountsDB accounts.Database,
	deviceDB devices.Database,
	rsAPI roomserverAPI.RoomserverInternalAPI,
) appserviceAPI.AppServiceQueryAPI {
	// Create a connection to the appservice postgres DB
	appserviceDB, err := storage.NewDatabase(string(base.Cfg.Database.AppService), base.Cfg.DbProperties())
	if err != nil {
		logrus.WithError(err).Panicf("failed to connect to appservice db")
	}

	// Wrap application services in a type that relates the application service and
	// a sync.Cond object that can be used to notify workers when there are new
	// events to be sent out.
	workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices))
	for i, appservice := range base.Cfg.Derived.ApplicationServices {
		m := sync.Mutex{}
		ws := types.ApplicationServiceWorkerState{
			AppService: appservice,
			Cond:       sync.NewCond(&m),
		}
		workerStates[i] = ws

		// Create bot account for this AS if it doesn't already exist
		if err = generateAppServiceAccount(accountsDB, deviceDB, appservice); err != nil {
			logrus.WithFields(logrus.Fields{
				"appservice": appservice.ID,
			}).WithError(err).Panicf("failed to generate bot account for appservice")
		}
	}

	// Create appserivce query API with an HTTP client that will be used for all
	// outbound and inbound requests (inbound only for the internal API)
	appserviceQueryAPI := &query.AppServiceQueryAPI{
		HTTPClient: &http.Client{
			Timeout: time.Second * 30,
		},
		Cfg: base.Cfg,
	}

	// Only consume if we actually have ASes to track, else we'll just chew cycles needlessly.
	// We can't add ASes at runtime so this is safe to do.
	if len(workerStates) > 0 {
		consumer := consumers.NewOutputRoomEventConsumer(
			base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB,
			rsAPI, workerStates,
		)
		if err := consumer.Start(); err != nil {
			logrus.WithError(err).Panicf("failed to start appservice roomserver consumer")
		}
	}

	// Create application service transaction workers
	if err := workers.SetupTransactionWorkers(appserviceDB, workerStates); err != nil {
		logrus.WithError(err).Panicf("failed to start app service transaction workers")
	}
	return appserviceQueryAPI
}

// generateAppServiceAccounts creates a dummy account based off the
// `sender_localpart` field of each application service if it doesn't
// exist already
func generateAppServiceAccount(
	accountsDB accounts.Database,
	deviceDB devices.Database,
	as config.ApplicationService,
) error {
	ctx := context.Background()

	// Create an account for the application service
	_, err := accountsDB.CreateAccount(ctx, as.SenderLocalpart, "", as.ID)
	if err != nil {
		if errors.Is(err, sqlutil.ErrUserExists) { // This account already exists
			return nil
		}
		return err
	}

	// Create a dummy device with a dummy token for the application service
	_, err = deviceDB.CreateDevice(ctx, as.SenderLocalpart, nil, as.ASToken, &as.SenderLocalpart)
	return err
}