aboutsummaryrefslogtreecommitdiff
path: root/roomserver/storage/postgres
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-05-28 11:15:21 +0100
committerGitHub <noreply@github.com>2020-05-28 11:15:21 +0100
commita6f995eb45f0d012f79aa66f727cb6260455b334 (patch)
treee3929ab30c76e78412593f35cd87c34a2367e7f3 /roomserver/storage/postgres
parent02fe38e1f74a61f900ff5d8a4e2d5870ba737386 (diff)
Merge Updater structs (#1069)
* Move Updater structs to shared and use it for postgres * Add constructors for NewXXXUpdater and a useTxns flag In sqlite, we set useTxns=false and comment why. * Handle nil txn * Handle nil in transaction * Missed one * Close the txn at the right time * Don't close the transaction as we reuse it between calls
Diffstat (limited to 'roomserver/storage/postgres')
-rw-r--r--roomserver/storage/postgres/storage.go390
1 files changed, 25 insertions, 365 deletions
diff --git a/roomserver/storage/postgres/storage.go b/roomserver/storage/postgres/storage.go
index 53a58076..971f2b9e 100644
--- a/roomserver/storage/postgres/storage.go
+++ b/roomserver/storage/postgres/storage.go
@@ -9,14 +9,13 @@
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implie
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
- "context"
"database/sql"
"github.com/matrix-org/dendrite/internal"
@@ -25,423 +24,84 @@ import (
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/roomserver/storage/shared"
- "github.com/matrix-org/dendrite/roomserver/storage/tables"
- "github.com/matrix-org/dendrite/roomserver/types"
- "github.com/matrix-org/gomatrixserverlib"
)
// A Database is used to store room events and stream offsets.
type Database struct {
shared.Database
- events tables.Events
- eventTypes tables.EventTypes
- eventStateKeys tables.EventStateKeys
- eventJSON tables.EventJSON
- rooms tables.Rooms
- transactions tables.Transactions
- prevEvents tables.PreviousEvents
- invites tables.Invites
- membership tables.Membership
- db *sql.DB
}
// Open a postgres database.
// nolint: gocyclo
func Open(dataSourceName string, dbProperties internal.DbProperties) (*Database, error) {
var d Database
+ var db *sql.DB
var err error
- if d.db, err = sqlutil.Open("postgres", dataSourceName, dbProperties); err != nil {
+ if db, err = sqlutil.Open("postgres", dataSourceName, dbProperties); err != nil {
return nil, err
}
- d.eventStateKeys, err = NewPostgresEventStateKeysTable(d.db)
+ eventStateKeys, err := NewPostgresEventStateKeysTable(db)
if err != nil {
return nil, err
}
- d.eventTypes, err = NewPostgresEventTypesTable(d.db)
+ eventTypes, err := NewPostgresEventTypesTable(db)
if err != nil {
return nil, err
}
- d.eventJSON, err = NewPostgresEventJSONTable(d.db)
+ eventJSON, err := NewPostgresEventJSONTable(db)
if err != nil {
return nil, err
}
- d.events, err = NewPostgresEventsTable(d.db)
+ events, err := NewPostgresEventsTable(db)
if err != nil {
return nil, err
}
- d.rooms, err = NewPostgresRoomsTable(d.db)
+ rooms, err := NewPostgresRoomsTable(db)
if err != nil {
return nil, err
}
- d.transactions, err = NewPostgresTransactionsTable(d.db)
+ transactions, err := NewPostgresTransactionsTable(db)
if err != nil {
return nil, err
}
- stateBlock, err := NewPostgresStateBlockTable(d.db)
+ stateBlock, err := NewPostgresStateBlockTable(db)
if err != nil {
return nil, err
}
- stateSnapshot, err := NewPostgresStateSnapshotTable(d.db)
+ stateSnapshot, err := NewPostgresStateSnapshotTable(db)
if err != nil {
return nil, err
}
- roomAliases, err := NewPostgresRoomAliasesTable(d.db)
+ roomAliases, err := NewPostgresRoomAliasesTable(db)
if err != nil {
return nil, err
}
- d.prevEvents, err = NewPostgresPreviousEventsTable(d.db)
+ prevEvents, err := NewPostgresPreviousEventsTable(db)
if err != nil {
return nil, err
}
- d.invites, err = NewPostgresInvitesTable(d.db)
+ invites, err := NewPostgresInvitesTable(db)
if err != nil {
return nil, err
}
- d.membership, err = NewPostgresMembershipTable(d.db)
+ membership, err := NewPostgresMembershipTable(db)
if err != nil {
return nil, err
}
d.Database = shared.Database{
- DB: d.db,
- EventTypesTable: d.eventTypes,
- EventStateKeysTable: d.eventStateKeys,
- EventJSONTable: d.eventJSON,
- EventsTable: d.events,
- RoomsTable: d.rooms,
- TransactionsTable: d.transactions,
+ DB: db,
+ EventTypesTable: eventTypes,
+ EventStateKeysTable: eventStateKeys,
+ EventJSONTable: eventJSON,
+ EventsTable: events,
+ RoomsTable: rooms,
+ TransactionsTable: transactions,
StateBlockTable: stateBlock,
StateSnapshotTable: stateSnapshot,
- PrevEventsTable: d.prevEvents,
+ PrevEventsTable: prevEvents,
RoomAliasesTable: roomAliases,
- InvitesTable: d.invites,
- MembershipTable: d.membership,
+ InvitesTable: invites,
+ MembershipTable: membership,
}
return &d, nil
}
-
-func (d *Database) assignRoomNID(
- ctx context.Context, txn *sql.Tx,
- roomID string, roomVersion gomatrixserverlib.RoomVersion,
-) (types.RoomNID, error) {
- // Check if we already have a numeric ID in the database.
- roomNID, err := d.rooms.SelectRoomNID(ctx, txn, roomID)
- if err == sql.ErrNoRows {
- // We don't have a numeric ID so insert one into the database.
- roomNID, err = d.rooms.InsertRoomNID(ctx, txn, roomID, roomVersion)
- if err == sql.ErrNoRows {
- // We raced with another insert so run the select again.
- roomNID, err = d.rooms.SelectRoomNID(ctx, txn, roomID)
- }
- }
- return roomNID, err
-}
-
-func (d *Database) assignStateKeyNID(
- ctx context.Context, txn *sql.Tx, eventStateKey string,
-) (types.EventStateKeyNID, error) {
- // Check if we already have a numeric ID in the database.
- eventStateKeyNID, err := d.eventStateKeys.SelectEventStateKeyNID(ctx, txn, eventStateKey)
- if err == sql.ErrNoRows {
- // We don't have a numeric ID so insert one into the database.
- eventStateKeyNID, err = d.eventStateKeys.InsertEventStateKeyNID(ctx, txn, eventStateKey)
- if err == sql.ErrNoRows {
- // We raced with another insert so run the select again.
- eventStateKeyNID, err = d.eventStateKeys.SelectEventStateKeyNID(ctx, txn, eventStateKey)
- }
- }
- return eventStateKeyNID, err
-}
-
-// GetLatestEventsForUpdate implements input.EventDatabase
-func (d *Database) GetLatestEventsForUpdate(
- ctx context.Context, roomNID types.RoomNID,
-) (types.RoomRecentEventsUpdater, error) {
- txn, err := d.db.Begin()
- if err != nil {
- return nil, err
- }
- eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err :=
- d.rooms.SelectLatestEventsNIDsForUpdate(ctx, txn, roomNID)
- if err != nil {
- txn.Rollback() // nolint: errcheck
- return nil, err
- }
- stateAndRefs, err := d.events.BulkSelectStateAtEventAndReference(ctx, txn, eventNIDs)
- if err != nil {
- txn.Rollback() // nolint: errcheck
- return nil, err
- }
- var lastEventIDSent string
- if lastEventNIDSent != 0 {
- lastEventIDSent, err = d.events.SelectEventID(ctx, txn, lastEventNIDSent)
- if err != nil {
- txn.Rollback() // nolint: errcheck
- return nil, err
- }
- }
- return &roomRecentEventsUpdater{
- transaction{ctx, txn}, d, roomNID, stateAndRefs, lastEventIDSent, currentStateSnapshotNID,
- }, nil
-}
-
-type roomRecentEventsUpdater struct {
- transaction
- d *Database
- roomNID types.RoomNID
- latestEvents []types.StateAtEventAndReference
- lastEventIDSent string
- currentStateSnapshotNID types.StateSnapshotNID
-}
-
-// RoomVersion implements types.RoomRecentEventsUpdater
-func (u *roomRecentEventsUpdater) RoomVersion() (version gomatrixserverlib.RoomVersion) {
- version, _ = u.d.GetRoomVersionForRoomNID(u.ctx, u.roomNID)
- return
-}
-
-// LatestEvents implements types.RoomRecentEventsUpdater
-func (u *roomRecentEventsUpdater) LatestEvents() []types.StateAtEventAndReference {
- return u.latestEvents
-}
-
-// LastEventIDSent implements types.RoomRecentEventsUpdater
-func (u *roomRecentEventsUpdater) LastEventIDSent() string {
- return u.lastEventIDSent
-}
-
-// CurrentStateSnapshotNID implements types.RoomRecentEventsUpdater
-func (u *roomRecentEventsUpdater) CurrentStateSnapshotNID() types.StateSnapshotNID {
- return u.currentStateSnapshotNID
-}
-
-// StorePreviousEvents implements types.RoomRecentEventsUpdater
-func (u *roomRecentEventsUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error {
- for _, ref := range previousEventReferences {
- if err := u.d.prevEvents.InsertPreviousEvent(u.ctx, u.txn, ref.EventID, ref.EventSHA256, eventNID); err != nil {
- return err
- }
- }
- return nil
-}
-
-// IsReferenced implements types.RoomRecentEventsUpdater
-func (u *roomRecentEventsUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) {
- err := u.d.prevEvents.SelectPreviousEventExists(u.ctx, u.txn, eventReference.EventID, eventReference.EventSHA256)
- if err == nil {
- return true, nil
- }
- if err == sql.ErrNoRows {
- return false, nil
- }
- return false, err
-}
-
-// SetLatestEvents implements types.RoomRecentEventsUpdater
-func (u *roomRecentEventsUpdater) SetLatestEvents(
- roomNID types.RoomNID, latest []types.StateAtEventAndReference, lastEventNIDSent types.EventNID,
- currentStateSnapshotNID types.StateSnapshotNID,
-) error {
- eventNIDs := make([]types.EventNID, len(latest))
- for i := range latest {
- eventNIDs[i] = latest[i].EventNID
- }
- return u.d.rooms.UpdateLatestEventNIDs(u.ctx, u.txn, roomNID, eventNIDs, lastEventNIDSent, currentStateSnapshotNID)
-}
-
-// HasEventBeenSent implements types.RoomRecentEventsUpdater
-func (u *roomRecentEventsUpdater) HasEventBeenSent(eventNID types.EventNID) (bool, error) {
- return u.d.events.SelectEventSentToOutput(u.ctx, u.txn, eventNID)
-}
-
-// MarkEventAsSent implements types.RoomRecentEventsUpdater
-func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error {
- return u.d.events.UpdateEventSentToOutput(u.ctx, u.txn, eventNID)
-}
-
-func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID, targetLocal bool) (types.MembershipUpdater, error) {
- return u.d.membershipUpdaterTxn(u.ctx, u.txn, u.roomNID, targetUserNID, targetLocal)
-}
-
-// MembershipUpdater implements input.RoomEventDatabase
-func (d *Database) MembershipUpdater(
- ctx context.Context, roomID, targetUserID string,
- targetLocal bool, roomVersion gomatrixserverlib.RoomVersion,
-) (types.MembershipUpdater, error) {
- txn, err := d.db.Begin()
- if err != nil {
- return nil, err
- }
- succeeded := false
- defer func() {
- if !succeeded {
- txn.Rollback() // nolint: errcheck
- }
- }()
-
- roomNID, err := d.assignRoomNID(ctx, txn, roomID, roomVersion)
- if err != nil {
- return nil, err
- }
-
- targetUserNID, err := d.assignStateKeyNID(ctx, txn, targetUserID)
- if err != nil {
- return nil, err
- }
-
- updater, err := d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID, targetLocal)
- if err != nil {
- return nil, err
- }
-
- succeeded = true
- return updater, nil
-}
-
-type membershipUpdater struct {
- transaction
- d *Database
- roomNID types.RoomNID
- targetUserNID types.EventStateKeyNID
- membership tables.MembershipState
-}
-
-func (d *Database) membershipUpdaterTxn(
- ctx context.Context,
- txn *sql.Tx,
- roomNID types.RoomNID,
- targetUserNID types.EventStateKeyNID,
- targetLocal bool,
-) (types.MembershipUpdater, error) {
-
- if err := d.membership.InsertMembership(ctx, txn, roomNID, targetUserNID, targetLocal); err != nil {
- return nil, err
- }
-
- membership, err := d.membership.SelectMembershipForUpdate(ctx, txn, roomNID, targetUserNID)
- if err != nil {
- return nil, err
- }
-
- return &membershipUpdater{
- transaction{ctx, txn}, d, roomNID, targetUserNID, membership,
- }, nil
-}
-
-// IsInvite implements types.MembershipUpdater
-func (u *membershipUpdater) IsInvite() bool {
- return u.membership == tables.MembershipStateInvite
-}
-
-// IsJoin implements types.MembershipUpdater
-func (u *membershipUpdater) IsJoin() bool {
- return u.membership == tables.MembershipStateJoin
-}
-
-// IsLeave implements types.MembershipUpdater
-func (u *membershipUpdater) IsLeave() bool {
- return u.membership == tables.MembershipStateLeaveOrBan
-}
-
-// SetToInvite implements types.MembershipUpdater
-func (u *membershipUpdater) SetToInvite(event gomatrixserverlib.Event) (bool, error) {
- senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, event.Sender())
- if err != nil {
- return false, err
- }
- inserted, err := u.d.invites.InsertInviteEvent(
- u.ctx, u.txn, event.EventID(), u.roomNID, u.targetUserNID, senderUserNID, event.JSON(),
- )
- if err != nil {
- return false, err
- }
- if u.membership != tables.MembershipStateInvite {
- if err = u.d.membership.UpdateMembership(
- u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateInvite, 0,
- ); err != nil {
- return false, err
- }
- }
- return inserted, nil
-}
-
-// SetToJoin implements types.MembershipUpdater
-func (u *membershipUpdater) SetToJoin(senderUserID string, eventID string, isUpdate bool) ([]string, error) {
- var inviteEventIDs []string
-
- senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, senderUserID)
- if err != nil {
- return nil, err
- }
-
- // If this is a join event update, there is no invite to update
- if !isUpdate {
- inviteEventIDs, err = u.d.invites.UpdateInviteRetired(
- u.ctx, u.txn, u.roomNID, u.targetUserNID,
- )
- if err != nil {
- return nil, err
- }
- }
-
- // Look up the NID of the new join event
- nIDs, err := u.d.EventNIDs(u.ctx, []string{eventID})
- if err != nil {
- return nil, err
- }
-
- if u.membership != tables.MembershipStateJoin || isUpdate {
- if err = u.d.membership.UpdateMembership(
- u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID,
- tables.MembershipStateJoin, nIDs[eventID],
- ); err != nil {
- return nil, err
- }
- }
-
- return inviteEventIDs, nil
-}
-
-// SetToLeave implements types.MembershipUpdater
-func (u *membershipUpdater) SetToLeave(senderUserID string, eventID string) ([]string, error) {
- senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, senderUserID)
- if err != nil {
- return nil, err
- }
- inviteEventIDs, err := u.d.invites.UpdateInviteRetired(
- u.ctx, u.txn, u.roomNID, u.targetUserNID,
- )
- if err != nil {
- return nil, err
- }
-
- // Look up the NID of the new leave event
- nIDs, err := u.d.EventNIDs(u.ctx, []string{eventID})
- if err != nil {
- return nil, err
- }
-
- if u.membership != tables.MembershipStateLeaveOrBan {
- if err = u.d.membership.UpdateMembership(
- u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID,
- tables.MembershipStateLeaveOrBan, nIDs[eventID],
- ); err != nil {
- return nil, err
- }
- }
- return inviteEventIDs, nil
-}
-
-type transaction struct {
- ctx context.Context
- txn *sql.Tx
-}
-
-// Commit implements types.Transaction
-func (t *transaction) Commit() error {
- return t.txn.Commit()
-}
-
-// Rollback implements types.Transaction
-func (t *transaction) Rollback() error {
- return t.txn.Rollback()
-}