aboutsummaryrefslogtreecommitdiff
path: root/syncapi/streams/streams.go
blob: d3195b78f94dbce222da3ec95690d162bbd42e3f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package streams

import (
	"context"

	"github.com/matrix-org/dendrite/internal/caching"
	keyapi "github.com/matrix-org/dendrite/keyserver/api"
	rsapi "github.com/matrix-org/dendrite/roomserver/api"
	"github.com/matrix-org/dendrite/syncapi/notifier"
	"github.com/matrix-org/dendrite/syncapi/storage"
	"github.com/matrix-org/dendrite/syncapi/types"
	userapi "github.com/matrix-org/dendrite/userapi/api"
)

type Streams struct {
	PDUStreamProvider              types.StreamProvider
	TypingStreamProvider           types.StreamProvider
	ReceiptStreamProvider          types.StreamProvider
	InviteStreamProvider           types.StreamProvider
	SendToDeviceStreamProvider     types.StreamProvider
	AccountDataStreamProvider      types.StreamProvider
	DeviceListStreamProvider       types.StreamProvider
	NotificationDataStreamProvider types.StreamProvider
	PresenceStreamProvider         types.StreamProvider
}

func NewSyncStreamProviders(
	d storage.Database, userAPI userapi.UserInternalAPI,
	rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI,
	eduCache *caching.EDUCache, lazyLoadCache *caching.LazyLoadCache, notifier *notifier.Notifier,
) *Streams {
	streams := &Streams{
		PDUStreamProvider: &PDUStreamProvider{
			StreamProvider: StreamProvider{DB: d},
			lazyLoadCache:  lazyLoadCache,
		},
		TypingStreamProvider: &TypingStreamProvider{
			StreamProvider: StreamProvider{DB: d},
			EDUCache:       eduCache,
		},
		ReceiptStreamProvider: &ReceiptStreamProvider{
			StreamProvider: StreamProvider{DB: d},
		},
		InviteStreamProvider: &InviteStreamProvider{
			StreamProvider: StreamProvider{DB: d},
		},
		SendToDeviceStreamProvider: &SendToDeviceStreamProvider{
			StreamProvider: StreamProvider{DB: d},
		},
		AccountDataStreamProvider: &AccountDataStreamProvider{
			StreamProvider: StreamProvider{DB: d},
			userAPI:        userAPI,
		},
		NotificationDataStreamProvider: &NotificationDataStreamProvider{
			StreamProvider: StreamProvider{DB: d},
		},
		DeviceListStreamProvider: &DeviceListStreamProvider{
			StreamProvider: StreamProvider{DB: d},
			rsAPI:          rsAPI,
			keyAPI:         keyAPI,
		},
		PresenceStreamProvider: &PresenceStreamProvider{
			StreamProvider: StreamProvider{DB: d},
			notifier:       notifier,
		},
	}

	streams.PDUStreamProvider.Setup()
	streams.TypingStreamProvider.Setup()
	streams.ReceiptStreamProvider.Setup()
	streams.InviteStreamProvider.Setup()
	streams.SendToDeviceStreamProvider.Setup()
	streams.AccountDataStreamProvider.Setup()
	streams.NotificationDataStreamProvider.Setup()
	streams.DeviceListStreamProvider.Setup()
	streams.PresenceStreamProvider.Setup()

	return streams
}

func (s *Streams) Latest(ctx context.Context) types.StreamingToken {
	return types.StreamingToken{
		PDUPosition:              s.PDUStreamProvider.LatestPosition(ctx),
		TypingPosition:           s.TypingStreamProvider.LatestPosition(ctx),
		ReceiptPosition:          s.ReceiptStreamProvider.LatestPosition(ctx),
		InvitePosition:           s.InviteStreamProvider.LatestPosition(ctx),
		SendToDevicePosition:     s.SendToDeviceStreamProvider.LatestPosition(ctx),
		AccountDataPosition:      s.AccountDataStreamProvider.LatestPosition(ctx),
		NotificationDataPosition: s.NotificationDataStreamProvider.LatestPosition(ctx),
		DeviceListPosition:       s.DeviceListStreamProvider.LatestPosition(ctx),
		PresencePosition:         s.PresenceStreamProvider.LatestPosition(ctx),
	}
}