1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
|
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
// Send implements /_matrix/federation/v1/send/{txnID}
func Send(
httpReq *http.Request,
request *gomatrixserverlib.FederationRequest,
txnID gomatrixserverlib.TransactionID,
cfg *config.Dendrite,
query api.RoomserverQueryAPI,
producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
) util.JSONResponse {
t := txnReq{
context: httpReq.Context(),
query: query,
producer: producer,
keys: keys,
federation: federation,
}
if err := json.Unmarshal(request.Content(), &t); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()),
}
}
t.Origin = request.Origin()
t.TransactionID = txnID
t.Destination = cfg.Matrix.ServerName
resp, err := t.processTransaction()
if err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("t.processTransaction failed")
return jsonerror.InternalServerError()
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: resp,
}
}
type txnReq struct {
gomatrixserverlib.Transaction
context context.Context
query api.RoomserverQueryAPI
producer *producers.RoomserverProducer
keys gomatrixserverlib.KeyRing
federation *gomatrixserverlib.FederationClient
}
func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
// Check the event signatures
if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, t.PDUs, t.keys); err != nil {
return nil, err
}
// Process the events.
results := map[string]gomatrixserverlib.PDUResult{}
for _, e := range t.PDUs {
err := t.processEvent(e)
if err != nil {
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
// sender knows that we have skipped processing it.
//
// However if the event is due to a temporary failure in our server
// such as a database being unavailable then we should bail, and
// hope that the sender will retry when we are feeling better.
//
// It is uncertain what we should do if an event fails because
// we failed to fetch more information from the sending server.
// For example if a request to /state fails.
// If we skip the event then we risk missing the event until we
// receive another event referencing it.
// If we bail and stop processing then we risk wedging incoming
// transactions from that server forever.
switch err.(type) {
case unknownRoomError:
case *gomatrixserverlib.NotAllowed:
default:
// Any other error should be the result of a temporary error in
// our server so we should bail processing the transaction entirely.
return nil, err
}
results[e.EventID()] = gomatrixserverlib.PDUResult{
Error: err.Error(),
}
} else {
results[e.EventID()] = gomatrixserverlib.PDUResult{}
}
}
// TODO: Process the EDUs.
return &gomatrixserverlib.RespSend{PDUs: results}, nil
}
type unknownRoomError struct {
roomID string
}
func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e.roomID) }
func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
prevEventIDs := e.PrevEventIDs()
// Fetch the state needed to authenticate the event.
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
stateReq := api.QueryStateAfterEventsRequest{
RoomID: e.RoomID(),
PrevEventIDs: prevEventIDs,
StateToFetch: needed.Tuples(),
}
var stateResp api.QueryStateAfterEventsResponse
if err := t.query.QueryStateAfterEvents(t.context, &stateReq, &stateResp); err != nil {
return err
}
if !stateResp.RoomExists {
// TODO: When synapse receives a message for a room it is not in it
// asks the remote server for the state of the room so that it can
// check if the remote server knows of a join "m.room.member" event
// that this server is unaware of.
// However generally speaking we should reject events for rooms we
// aren't a member of.
return unknownRoomError{e.RoomID()}
}
if !stateResp.PrevEventsExist {
return t.processEventWithMissingState(e)
}
// Check that the event is allowed by the state at the event.
if err := checkAllowedByState(e, stateResp.StateEvents); err != nil {
return err
}
// TODO: Check that the roomserver has a copy of all of the auth_events.
// TODO: Check that the event is allowed by its auth_events.
// pass the event to the roomserver
_, err := t.producer.SendEvents(t.context, []gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers, nil)
return err
}
func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error {
authUsingState := gomatrixserverlib.NewAuthEvents(nil)
for i := range stateEvents {
err := authUsingState.AddEvent(&stateEvents[i])
if err != nil {
return err
}
}
return gomatrixserverlib.Allowed(e, &authUsingState)
}
func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error {
// We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap:
// 1) We can fill in the gap using /get_missing_events
// 2) We can leave the gap and request the state of the room at
// this event from the remote server using either /state_ids
// or /state.
// Synapse will attempt to do 1 and if that fails or if the gap is
// too large then it will attempt 2.
// Synapse will use /state_ids if possible since usually the state
// is largely unchanged and it is more efficient to fetch a list of
// event ids and then use /event to fetch the individual events.
// However not all version of synapse support /state_ids so you may
// need to fallback to /state.
// TODO: Attempt to fill in the gap using /get_missing_events
// TODO: Attempt to fetch the state using /state_ids and /events
state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID())
if err != nil {
return err
}
// Check that the returned state is valid.
if err := state.Check(t.context, t.keys); err != nil {
return err
}
// Check that the event is allowed by the state.
retryAllowedState:
if err := checkAllowedByState(e, state.StateEvents); err != nil {
switch missing := err.(type) {
case gomatrixserverlib.MissingAuthEventError:
// An auth event was missing so let's look up that event over federation
for _, s := range state.StateEvents {
if s.EventID() != missing.AuthEventID {
continue
}
err = t.processEventWithMissingState(s)
// If there was no error retrieving the event from federation then
// we assume that it succeeded, so retry the original state check
if err == nil {
goto retryAllowedState
}
}
default:
}
return err
}
// pass the event along with the state to the roomserver
return t.producer.SendEventWithState(t.context, state, e)
}
|