diff options
author | Kegsay <kegan@matrix.org> | 2020-03-06 10:23:55 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-06 10:23:55 +0000 |
commit | a97b8eafd459c6a5c742333e2d0d93385da6db95 (patch) | |
tree | fa52ae89e74218d09d355139003bd26cf8113297 /syncapi/storage | |
parent | 0cda3c52d0dc0462b204c8dc455762821bb74c3f (diff) |
Add peer-to-peer support into Dendrite via libp2p and fetch (#880)
* Use a fork of pq which supports userCurrent on wasm
* Use sqlite3_js driver when running in JS
* Add cmd/dendritejs to pull in sqlite3_js driver for wasm only
* Update to latest go-sqlite-js version
* Replace prometheus with a stub. sigh
* Hard-code a config and don't use opentracing
* Latest go-sqlite3-js version
* Generate a key for now
* Listen for fetch traffic rather than HTTP
* Latest hacks for js
* libp2p support
* More libp2p
* Fork gjson to allow us to enforce auth checks as before
Previously, all events would come down redacted because the hash
checks would fail. They would fail because sjson.DeleteBytes didn't
remove keys not used for hashing. This didn't work because of a build
tag which included a file which no-oped the index returned.
See https://github.com/tidwall/gjson/issues/157
When it's resolved, let's go back to mainline.
* Use gjson@1.6.0 as it fixes https://github.com/tidwall/gjson/issues/157
* Use latest gomatrixserverlib for sig checks
* Fix a bug which could cause exclude_from_sync to not be set
Caused when sending events over federation.
* Use query variadic to make lookups actually work!
* Latest gomatrixserverlib
* Add notes on getting p2p up and running
Partly so I don't forget myself!
* refactor: Move p2p specific stuff to cmd/dendritejs
This is important or else the normal build of dendrite will fail
because the p2p libraries depend on syscall/js which doesn't work
on normal builds.
Also, clean up main.go to read a bit better.
* Update ho-http-js-libp2p to return errors from RoundTrip
* Add an LRU cache around the key DB
We actually need this for P2P because otherwise we can *segfault*
with things like: "runtime: unexpected return pc for runtime.handleEvent"
where the event is a `syscall/js` event, caused by spamming sql.js
caused by "Checking event signatures for 14 events of room state" which
hammers the key DB repeatedly in quick succession.
Using a cache fixes this, though the underlying cause is probably a bug
in the version of Go I'm on (1.13.7)
* breaking: Add Tracing.Enabled to toggle whether we do opentracing
Defaults to false, which is why this is a breaking change. We need
this flag because WASM builds cannot do opentracing.
* Start adding conditional builds for wasm to handle lib/pq
The general idea here is to have the wasm build have a `NewXXXDatabase`
that doesn't import any postgres package and hence we never import
`lib/pq`, which doesn't work under WASM (undefined `userCurrent`).
* Remove lib/pq for wasm for syncapi
* Add conditional building to remaining storage APIs
* Update build script to set env vars correctly for dendritejs
* sqlite bug fixes
* Docs
* Add a no-op main for dendritejs when not building under wasm
* Use the real prometheus, even for WASM
Instead, the dendrite-sw.js must mock out `process.pid` and
`fs.stat` - which must invoke the callback with an error (e.g `EINVAL`)
in order for it to work:
```
global.process = {
pid: 1,
};
global.fs.stat = function(path, cb) {
cb({
code: "EINVAL",
});
}
```
* Linting
Diffstat (limited to 'syncapi/storage')
-rw-r--r-- | syncapi/storage/interface.go | 53 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/current_room_state_table.go | 9 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/filtering.go | 36 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/output_room_events_table.go | 53 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/syncserver.go | 5 | ||||
-rw-r--r-- | syncapi/storage/storage.go | 36 | ||||
-rw-r--r-- | syncapi/storage/storage_wasm.go | 38 |
7 files changed, 140 insertions, 90 deletions
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go new file mode 100644 index 00000000..0abeac34 --- /dev/null +++ b/syncapi/storage/interface.go @@ -0,0 +1,53 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "time" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/dendrite/typingserver/cache" + "github.com/matrix-org/gomatrixserverlib" +) + +type Database interface { + common.PartitionStorer + AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) + Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) + WriteEvent(context.Context, *gomatrixserverlib.Event, []gomatrixserverlib.Event, []string, []string, *api.TransactionID, bool) (types.StreamPosition, error) + GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.Event, error) + GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) (stateEvents []gomatrixserverlib.Event, err error) + SyncPosition(ctx context.Context) (types.PaginationToken, error) + IncrementalSync(ctx context.Context, device authtypes.Device, fromPos, toPos types.PaginationToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error) + CompleteSync(ctx context.Context, userID string, numRecentEventsPerRoom int) (*types.Response, error) + GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error) + UpsertAccountData(ctx context.Context, userID, roomID, dataType string) (types.StreamPosition, error) + AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.Event) (types.StreamPosition, error) + RetireInviteEvent(ctx context.Context, inviteEventID string) error + SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) + AddTypingUser(userID, roomID string, expireTime *time.Time) types.StreamPosition + RemoveTypingUser(userID, roomID string) types.StreamPosition + GetEventsInRange(ctx context.Context, from, to *types.PaginationToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) + EventPositionInTopology(ctx context.Context, eventID string) (types.StreamPosition, error) + EventsAtTopologicalPosition(ctx context.Context, roomID string, pos types.StreamPosition) ([]types.StreamEvent, error) + BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error) + MaxTopologicalPosition(ctx context.Context, roomID string) (types.StreamPosition, error) + StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.Event + SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) +} diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index eb969c95..ed76177b 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -21,7 +21,6 @@ import ( "encoding/json" "strings" - "github.com/lib/pq" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -175,10 +174,10 @@ func (s *currentRoomStateStatements) selectCurrentState( ) ([]gomatrixserverlib.Event, error) { stmt := common.TxStmt(txn, s.selectCurrentStateStmt) rows, err := stmt.QueryContext(ctx, roomID, - pq.StringArray(stateFilterPart.Senders), - pq.StringArray(stateFilterPart.NotSenders), - pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)), - pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)), + nil, // FIXME: pq.StringArray(stateFilterPart.Senders), + nil, // FIXME: pq.StringArray(stateFilterPart.NotSenders), + nil, // FIXME: pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)), + nil, // FIXME: pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)), stateFilterPart.ContainsURL, stateFilterPart.Limit, ) diff --git a/syncapi/storage/sqlite3/filtering.go b/syncapi/storage/sqlite3/filtering.go deleted file mode 100644 index c4a2f4bf..00000000 --- a/syncapi/storage/sqlite3/filtering.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2017 Thibaut CHARLES -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sqlite3 - -import ( - "strings" -) - -// filterConvertWildcardToSQL converts wildcards as defined in -// https://matrix.org/docs/spec/client_server/r0.3.0.html#post-matrix-client-r0-user-userid-filter -// to SQL wildcards that can be used with LIKE() -func filterConvertTypeWildcardToSQL(values []string) []string { - if values == nil { - // Return nil instead of []string{} so IS NULL can work correctly when - // the return value is passed into SQL queries - return nil - } - - ret := make([]string, len(values)) - for i := range values { - ret[i] = strings.Replace(values[i], "*", "%", -1) - } - return ret -} diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 4535688d..be893743 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -24,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/lib/pq" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" @@ -40,11 +39,11 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( type TEXT NOT NULL, sender TEXT NOT NULL, contains_url BOOL NOT NULL, - add_state_ids TEXT[], - remove_state_ids TEXT[], + add_state_ids TEXT, -- JSON encoded string array + remove_state_ids TEXT, -- JSON encoded string array session_id BIGINT, transaction_id TEXT, - exclude_from_sync BOOL DEFAULT FALSE + exclude_from_sync BOOL NOT NULL DEFAULT FALSE ); ` @@ -52,7 +51,7 @@ const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + "id, room_id, event_id, event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " + - "ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = $11" + "ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = $13" const selectEventsSQL = "" + "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = $1" @@ -176,20 +175,26 @@ func (s *outputRoomEventsStatements) selectStateInRange( streamPos types.StreamPosition eventBytes []byte excludeFromSync bool - addIDs pq.StringArray - delIDs pq.StringArray + addIDsJSON string + delIDsJSON string ) - if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil { + if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil { return nil, nil, err } + + addIDs, delIDs, err := unmarshalStateIDs(addIDsJSON, delIDsJSON) + if err != nil { + return nil, nil, err + } + // Sanity check for deleted state and whine if we see it. We don't need to do anything // since it'll just mark the event as not being needed. if len(addIDs) < len(delIDs) { log.WithFields(log.Fields{ "since": oldPos, "current": newPos, - "adds": addIDs, - "dels": delIDs, + "adds": addIDsJSON, + "dels": delIDsJSON, }).Warn("StateBetween: ignoring deleted state") } @@ -262,6 +267,15 @@ func (s *outputRoomEventsStatements) insertEvent( return } + addStateJSON, err := json.Marshal(addState) + if err != nil { + return + } + removeStateJSON, err := json.Marshal(removeState) + if err != nil { + return + } + insertStmt := common.TxStmt(txn, s.insertEventStmt) _, err = insertStmt.ExecContext( ctx, @@ -272,11 +286,12 @@ func (s *outputRoomEventsStatements) insertEvent( event.Type(), event.Sender(), containsURL, - pq.StringArray(addState), - pq.StringArray(removeState), + string(addStateJSON), + string(removeStateJSON), sessionID, txnID, excludeFromSync, + excludeFromSync, ) return } @@ -397,3 +412,17 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { } return result, nil } + +func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs []string, err error) { + if len(addIDsJSON) > 0 { + if err = json.Unmarshal([]byte(addIDsJSON), &addIDs); err != nil { + return + } + } + if len(delIDsJSON) > 0 { + if err = json.Unmarshal([]byte(delIDsJSON), &delIDs); err != nil { + return + } + } + return +} diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index da580e3a..0e84c8c8 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -29,8 +29,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/roomserver/api" - // Import the postgres database driver. - _ "github.com/lib/pq" + // Import the sqlite3 package _ "github.com/mattn/go-sqlite3" "github.com/matrix-org/dendrite/common" @@ -79,7 +78,7 @@ func NewSyncServerDatasource(dataSourceName string) (*SyncServerDatasource, erro } else { return nil, errors.New("no filename or path in connect string") } - if d.db, err = sql.Open("sqlite3", cs); err != nil { + if d.db, err = sql.Open(common.SQLiteDriverName(), cs); err != nil { return nil, err } if err = d.prepare(); err != nil { diff --git a/syncapi/storage/storage.go b/syncapi/storage/storage.go index c87024b2..c56db063 100644 --- a/syncapi/storage/storage.go +++ b/syncapi/storage/storage.go @@ -12,49 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +// +build !wasm + package storage import ( - "context" "net/url" - "time" - "github.com/matrix-org/dendrite/clientapi/auth/authtypes" - "github.com/matrix-org/dendrite/common" - "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage/postgres" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" - "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/dendrite/typingserver/cache" - "github.com/matrix-org/gomatrixserverlib" ) -type Database interface { - common.PartitionStorer - AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) - Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) - WriteEvent(context.Context, *gomatrixserverlib.Event, []gomatrixserverlib.Event, []string, []string, *api.TransactionID, bool) (types.StreamPosition, error) - GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.Event, error) - GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) (stateEvents []gomatrixserverlib.Event, err error) - SyncPosition(ctx context.Context) (types.PaginationToken, error) - IncrementalSync(ctx context.Context, device authtypes.Device, fromPos, toPos types.PaginationToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error) - CompleteSync(ctx context.Context, userID string, numRecentEventsPerRoom int) (*types.Response, error) - GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error) - UpsertAccountData(ctx context.Context, userID, roomID, dataType string) (types.StreamPosition, error) - AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.Event) (types.StreamPosition, error) - RetireInviteEvent(ctx context.Context, inviteEventID string) error - SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) - AddTypingUser(userID, roomID string, expireTime *time.Time) types.StreamPosition - RemoveTypingUser(userID, roomID string) types.StreamPosition - GetEventsInRange(ctx context.Context, from, to *types.PaginationToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) - EventPositionInTopology(ctx context.Context, eventID string) (types.StreamPosition, error) - EventsAtTopologicalPosition(ctx context.Context, roomID string, pos types.StreamPosition) ([]types.StreamEvent, error) - BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error) - MaxTopologicalPosition(ctx context.Context, roomID string) (types.StreamPosition, error) - StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.Event - SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) -} - // NewPublicRoomsServerDatabase opens a database connection. func NewSyncServerDatasource(dataSourceName string) (Database, error) { uri, err := url.Parse(dataSourceName) diff --git a/syncapi/storage/storage_wasm.go b/syncapi/storage/storage_wasm.go new file mode 100644 index 00000000..43806a01 --- /dev/null +++ b/syncapi/storage/storage_wasm.go @@ -0,0 +1,38 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "fmt" + "net/url" + + "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" +) + +// NewPublicRoomsServerDatabase opens a database connection. +func NewSyncServerDatasource(dataSourceName string) (Database, error) { + uri, err := url.Parse(dataSourceName) + if err != nil { + return nil, fmt.Errorf("Cannot use postgres implementation") + } + switch uri.Scheme { + case "postgres": + return nil, fmt.Errorf("Cannot use postgres implementation") + case "file": + return sqlite3.NewSyncServerDatasource(dataSourceName) + default: + return nil, fmt.Errorf("Cannot use postgres implementation") + } +} |