/*
This file is part of TALER
(C) 2016 GNUnet e.V.
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
TALER is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
/**
* Database query abstractions.
* @module Query
* @author Florian Dold
*/
import { openPromise } from "./promiseUtils";
import { join } from "path";
/**
 * Result of an inner join: every emitted element pairs a value from the
 * left-hand stream with a matching record from the joined index or store.
 *
 * @typeParam L - Type of the left-hand (stream) value. Defaults to `any` so
 *   that un-parameterized references remain valid.
 * @typeParam R - Type of the right-hand (joined) record. Defaults to `any`.
 */
export interface JoinResult<L = any, R = any> {
  /** Value taken from the stream that was joined. */
  left: L;
  /** Matching record found on the other side of the join. */
  right: R;
}
/**
 * Result of a left outer join: the left-hand value is always present, the
 * right-hand record only when a match was found.
 *
 * @typeParam L - Type of the left-hand (stream) value. Defaults to `any` so
 *   that un-parameterized references remain valid.
 * @typeParam R - Type of the right-hand (joined) record. Defaults to `any`.
 */
export interface JoinLeftResult<L = any, R = any> {
  /** Value taken from the stream that was joined. */
  left: L;
  /** Matching record, or absent when the left value had no match. */
  right?: R;
}
/**
 * Definition of an object store.
 *
 * The class only carries metadata; it does not touch the database itself.
 *
 * @typeParam T - Type of the records kept in the store. Defaults to `any` so
 *   that un-parameterized references remain valid.
 */
export class Store<T = any> {
  /**
   * @param name - Name of the object store.
   * @param storeParams - Optional IndexedDB object store parameters.
   * @param validator - Optional function mapping a record to a record.
   *   NOTE(review): presumably used to validate/normalize stored records;
   *   it is not invoked anywhere in this module.
   */
  constructor(
    public name: string,
    public storeParams?: IDBObjectStoreParameters,
    public validator?: (v: T) => T,
  ) {}
}
/**
 * Options that can be supplied when defining an index.
 */
export interface IndexOptions {
  /**
   * If true and the key path resolves to an array, one index entry is created
   * for each member of the array (instead of a single index entry that
   * contains the full array).
   *
   * Defaults to false.
   */
  multiEntry?: boolean;
}
/**
 * Definition of an index on an object store.
 *
 * @typeParam S - Type of the index key. Defaults to `IDBValidKey` so that
 *   un-parameterized references remain valid.
 * @typeParam T - Type of the records in the indexed store. Defaults to `any`.
 */
export class Index<S extends IDBValidKey = IDBValidKey, T = any> {
  /**
   * Name of the store that this index is associated with.
   */
  storeName: string;

  /**
   * Options to use for the index. Always fully populated: options that the
   * caller did not supply fall back to their defaults.
   */
  options: IndexOptions;

  /**
   * @param s - Store the index belongs to; only its name is retained.
   * @param indexName - Name of the index.
   * @param keyPath - Key path (or list of key paths) of the index.
   * @param options - Optional overrides for the default index options.
   */
  constructor(
    s: Store<T>,
    public indexName: string,
    public keyPath: string | string[],
    options?: IndexOptions,
  ) {
    // Spreading `undefined` is a no-op, so a missing `options` argument
    // simply leaves the defaults in place.
    this.options = { multiEntry: false, ...options };
    this.storeName = s.name;
  }

  /**
   * We want to have the key type parameter in use somewhere,
   * because otherwise the compiler complains. In iterIndex the
   * key type is pretty useful.
   */
  protected _dummyKey: S | undefined;
}
/**
 * Stream of query results that can be filtered, mapped, reduced or joined
 * with indices and object stores.
 *
 * @typeParam T - Type of the elements produced by the stream. Defaults to
 *   `any` so that un-parameterized references remain valid.
 */
export interface QueryStream<T = any> {
  /**
   * Join the current query with values from an index.
   * The left side of the join is extracted via a function from the stream's
   * result, the right side of the join is the key of the index.
   *
   * @typeParam S - Type of the records reachable through the index.
   * @typeParam I - Type of the index key.
   */
  indexJoin<S, I extends IDBValidKey>(
    index: Index<I, S>,
    keyFn: (obj: T) => I,
  ): QueryStream<JoinResult<T, S>>;

  /**
   * Join the current query with values from an index, and keep values in the
   * current stream that don't have a match. The left side of the join is
   * extracted via a function from the stream's result, the right side of the
   * join is the key of the index.
   *
   * @typeParam S - Type of the records reachable through the index.
   * @typeParam I - Type of the index key.
   */
  indexJoinLeft<S, I extends IDBValidKey>(
    index: Index<I, S>,
    keyFn: (obj: T) => I,
  ): QueryStream<JoinLeftResult<T, S>>;

  /**
   * Join the current query with values from another object store.
   * The left side of the join is extracted via a function over the current
   * query, the right side of the join is the key of the object store.
   *
   * @typeParam S - Type of the records in the joined store.
   * @typeParam I - Type of the store key.
   */
  keyJoin<S, I extends IDBValidKey>(
    store: Store<S>,
    keyFn: (obj: T) => I,
  ): QueryStream<JoinResult<T, S>>;

  /**
   * Only keep elements in the result stream for which the predicate returns
   * true.
   */
  filter(f: (x: T) => boolean): QueryStream<T>;

  /**
   * Fold the stream, resulting in a single value.
   *
   * @param f - Combines the next element with the accumulator.
   * @param start - Initial accumulator value.
   */
  fold<S>(f: (v: T, acc: S) => S, start: S): Promise<S>;

  /**
   * Execute a function for every value of the stream, for the
   * side-effects of the function.
   */
  forEach(f: (v: T) => void): Promise<void>;

  /**
   * Map each element of the stream using a function, resulting in another
   * stream of a different type.
   */
  map<S>(f: (x: T) => S): QueryStream<S>;

  /**
   * Map each element of the stream to a potentially empty array, and collect
   * the result in a stream of the flattened arrays.
   */
  flatMap<S>(f: (x: T) => S[]): QueryStream<S>;

  /**
   * Collect the stream into an array and return a promise for it.
   */
  toArray(): Promise<T[]>;

  /**
   * Get the first value of the stream.
   */
  first(): QueryValue<T>;

  /**
   * Run the query without returning a result.
   * Useful for queries with side effects.
   */
  run(): Promise<void>;
}
/**
 * Query result that consists of at most one value.
 *
 * @typeParam T - Type of the value. Defaults to `any` so that
 *   un-parameterized references remain valid.
 */
export interface QueryValue<T = any> {
  /**
   * Apply a function to a query value.
   */
  map<S>(f: (x: T) => S): QueryValue<S>;

  /**
   * Conditionally execute either of two queries based
   * on a property of this query value.
   *
   * Useful to properly implement complex queries within a transaction (as
   * opposed to just computing the conditional and then executing either
   * branch). This is necessary since IndexedDB does not allow long-lived
   * transactions.
   *
   * @param f - Predicate evaluated on the query value.
   * @param onTrue - Invoked with a query root when the predicate holds.
   * @param onFalse - Invoked with a query root when the predicate fails.
   */
  cond<R>(
    f: (x: T) => boolean,
    onTrue: (r: QueryRoot) => R,
    onFalse: (r: QueryRoot) => R,
  ): Promise<void>;
}
/**
 * Shared implementation of {@link QueryValue}. Concrete subclasses only have
 * to provide `subscribeOne`, which delivers the single value to a callback.
 *
 * @typeParam T - Type of the value. Defaults to `any` so that
 *   un-parameterized references remain valid.
 */
abstract class BaseQueryValue<T = any> implements QueryValue<T> {
  /**
   * @param root - Query root this value belongs to.
   */
  constructor(public root: QueryRoot) {}

  /**
   * Derive a new query value by applying `f` to this value.
   */
  map<S>(f: (x: T) => S): QueryValue<S> {
    return new MapQueryValue(this, f);
  }

  /**
   * Run `onTrue` or `onFalse`, depending on whether `f` holds for the value.
   *
   * The selected branch receives a fresh `QueryRoot` on the same database and
   * its return value is discarded.
   *
   * NOTE: the returned promise resolves as soon as the subscription has been
   * registered, not once the selected branch has actually run. It rejects
   * only if registering the subscription throws synchronously.
   */
  cond<R>(
    f: (x: T) => boolean,
    onTrue: (r: QueryRoot) => R,
    onFalse: (r: QueryRoot) => R,
  ): Promise<void> {
    return new Promise<void>((resolve) => {
      this.subscribeOne((v) => {
        const branch = f(v) ? onTrue : onFalse;
        branch(new QueryRoot(this.root.db));
      });
      resolve();
    });
  }

  /**
   * Register a callback that receives the value (and the transaction it was
   * read in) once it becomes available.
   */
  abstract subscribeOne(f: SubscribeOneFn): void;
}
/**
 * Query value holding the first element of a stream, or `undefined` when the
 * stream ends without producing any element.
 *
 * @typeParam T - Element type of the underlying stream.
 */
class FirstQueryValue<T> extends BaseQueryValue<T> {
  /** Stream whose first element is reported. */
  private s: QueryStreamBase<T>;

  /**
   * @param stream - Stream to take the first element from.
   */
  constructor(stream: QueryStreamBase<T>) {
    super(stream.root);
    this.s = stream;
  }

  /**
   * Invoke `f` exactly once: with the first element of the stream, or with
   * `undefined` if the stream finishes before emitting anything.
   */
  subscribeOne(f: SubscribeOneFn): void {
    // The flag is local to this subscription. Keeping it on the instance
    // would let the first subscriber silence every later one.
    let delivered = false;
    this.s.subscribe((isDone, value, tx) => {
      if (delivered) {
        return;
      }
      // Mark before calling out so a throwing callback is never retried with
      // a later element.
      delivered = true;
      f(isDone ? undefined : value, tx);
    });
  }
}
/**
 * Query value obtained by applying a mapping function to another query value.
 *
 * @typeParam T - Type of the source value.
 * @typeParam S - Type of the mapped value.
 */
class MapQueryValue<T, S> extends BaseQueryValue<S> {
  /**
   * @param v - Source query value.
   * @param mapFn - Function applied to the source value.
   */
  constructor(private v: BaseQueryValue<T>, private mapFn: (x: T) => S) {
    super(v.root);
  }

  /**
   * Subscribe to the source value and hand the mapped result to `f`.
   */
  subscribeOne(f: SubscribeOneFn): void {
    // The mapped value must be forwarded to the subscriber; computing it
    // without calling `f` would silently drop the result.
    this.v.subscribeOne((v, tx) => f(this.mapFn(v), tx));
  }
}
/**
 * Sentinel value that client code throws to abort a transaction.
 *
 * It is a unique symbol rather than an Error instance, so it is recognised by
 * identity comparison (`e === AbortTransaction`), as done in
 * `QueryRoot.mutate`.
 */
export const AbortTransaction = Symbol("abort_transaction");
/**
 * Shared implementation of {@link QueryStream}. Concrete streams only have to
 * implement `subscribe`; all combinators and terminal operations are built on
 * top of it.
 *
 * @typeParam T - Type of the elements produced by the stream. Defaults to
 *   `any` so that un-parameterized references remain valid.
 */
abstract class QueryStreamBase<T = any> implements QueryStream<T> {
  /**
   * Register a callback that is invoked with `(false, value, tx)` for every
   * element and with `(true, undefined, tx)` when the stream is exhausted.
   */
  abstract subscribe(
    f: (isDone: boolean, value: any, tx: IDBTransaction) => void,
  ): void;

  /**
   * @param root - Query root that owns the transaction of this stream.
   */
  constructor(public root: QueryRoot) {}

  /** Get the first value of the stream. */
  first(): QueryValue<T> {
    return new FirstQueryValue(this);
  }

  /** Map every element to an array and flatten the results into one stream. */
  flatMap<S>(f: (x: T) => S[]): QueryStream<S> {
    return new QueryStreamFlatMap(this, f);
  }

  /** Map every element of the stream with `f`. */
  map<S>(f: (x: T) => S): QueryStream<S> {
    return new QueryStreamMap(this, f);
  }

  /**
   * Inner join with an index; registers read access to the index's store.
   */
  indexJoin<S, I extends IDBValidKey>(
    index: Index<I, S>,
    keyFn: (obj: T) => I,
  ): QueryStream<JoinResult<T, S>> {
    this.root.addStoreAccess(index.storeName, false);
    return new QueryStreamIndexJoin<T, S>(
      this,
      index.storeName,
      index.indexName,
      keyFn,
    );
  }

  /**
   * Left outer join with an index; registers read access to the index's
   * store.
   */
  indexJoinLeft<S, I extends IDBValidKey>(
    index: Index<I, S>,
    keyFn: (obj: T) => I,
  ): QueryStream<JoinLeftResult<T, S>> {
    this.root.addStoreAccess(index.storeName, false);
    return new QueryStreamIndexJoinLeft<T, S>(
      this,
      index.storeName,
      index.indexName,
      keyFn,
    );
  }

  /**
   * Inner join with the primary key of another store; registers read access
   * to that store.
   */
  keyJoin<S, I extends IDBValidKey>(
    store: Store<S>,
    keyFn: (obj: T) => I,
  ): QueryStream<JoinResult<T, S>> {
    this.root.addStoreAccess(store.name, false);
    return new QueryStreamKeyJoin<T, S>(this, store.name, keyFn);
  }

  /** Keep only the elements for which `f` returns true. */
  filter(f: (x: T) => boolean): QueryStream<T> {
    return new QueryStreamFilter(this, f);
  }

  /**
   * Collect all elements into an array. The promise settles after the
   * query root has finished.
   */
  toArray(): Promise<T[]> {
    const { resolve, promise } = openPromise<T[]>();
    const values: T[] = [];
    this.subscribe((isDone, value) => {
      if (isDone) {
        resolve(values);
        return;
      }
      values.push(value);
    });
    return this.afterFinish(promise);
  }

  /**
   * Reduce the stream to a single value, starting from `init`.
   */
  fold<A>(f: (x: T, acc: A) => A, init: A): Promise<A> {
    const { resolve, promise } = openPromise<A>();
    let acc = init;
    this.subscribe((isDone, value) => {
      if (isDone) {
        resolve(acc);
        return;
      }
      acc = f(value, acc);
    });
    return this.afterFinish(promise);
  }

  /**
   * Call `f` for every element, for its side effects.
   */
  forEach(f: (x: T) => void): Promise<void> {
    const { resolve, promise } = openPromise<void>();
    this.subscribe((isDone, value) => {
      if (isDone) {
        resolve();
        return;
      }
      f(value);
    });
    return this.afterFinish(promise);
  }

  /**
   * Drain the stream without collecting anything.
   */
  run(): Promise<void> {
    const { resolve, promise } = openPromise<void>();
    this.subscribe((isDone) => {
      if (isDone) {
        resolve();
      }
    });
    return this.afterFinish(promise);
  }

  /**
   * Kick off the query root on the next microtask and, once it has finished,
   * yield the outcome of `result`. A failure of the query root rejects the
   * returned promise.
   */
  private afterFinish<R>(result: Promise<R>): Promise<R> {
    return Promise.resolve()
      .then(() => this.root.finish())
      .then(() => result);
  }
}
/** Predicate deciding whether a stream element is kept. */
type FilterFn = (element: any) => boolean;

/**
 * Stream subscriber: receives each element with `isDone === false` and a
 * final call with `isDone === true` once the stream is exhausted.
 */
type SubscribeFn = (
  isDone: boolean,
  element: any,
  transaction: IDBTransaction,
) => void;

/** Subscriber for a single query value and the transaction it was read in. */
type SubscribeOneFn = (result: any, transaction: IDBTransaction) => void;
/**
 * Stream that forwards only those elements of a source stream for which a
 * predicate returns a truthy value.
 *
 * @typeParam T - Element type of the stream.
 */
class QueryStreamFilter<T> extends QueryStreamBase<T> {
  /**
   * @param s - Source stream.
   * @param filterFn - Predicate selecting the elements to keep.
   */
  constructor(public s: QueryStreamBase<T>, public filterFn: FilterFn) {
    super(s.root);
  }

  /**
   * Subscribe to the filtered stream; the end-of-stream signal is always
   * forwarded.
   */
  subscribe(f: SubscribeFn): void {
    this.s.subscribe((isDone, value, tx) => {
      if (isDone) {
        f(true, undefined, tx);
        return;
      }
      if (this.filterFn(value)) {
        f(false, value, tx);
      }
    });
  }
}
/**
 * Stream that maps every element of a source stream to an array and emits
 * the members of those arrays one by one.
 *
 * @typeParam T - Element type of the source stream.
 * @typeParam S - Element type of the resulting stream.
 */
class QueryStreamFlatMap<T, S> extends QueryStreamBase<S> {
  /**
   * @param s - Source stream.
   * @param flatMapFn - Maps a source element to a (possibly empty) array.
   */
  constructor(public s: QueryStreamBase<T>, public flatMapFn: (v: T) => S[]) {
    super(s.root);
  }

  /**
   * Subscribe to the flattened stream.
   */
  subscribe(f: SubscribeFn): void {
    this.s.subscribe((isDone, value, tx) => {
      if (isDone) {
        f(true, undefined, tx);
        return;
      }
      // `for...of` yields the array members; `for...in` would emit the
      // array indices (as strings) instead of the mapped values.
      for (const member of this.flatMapFn(value)) {
        f(false, member, tx);
      }
    });
  }
}
/**
 * Stream that applies a mapping function to every element of a source
 * stream.
 *
 * @typeParam S - Element type of the source stream.
 * @typeParam T - Element type of the resulting stream.
 */
class QueryStreamMap<S, T> extends QueryStreamBase<T> {
  /**
   * @param s - Source stream.
   * @param mapFn - Function applied to every source element.
   */
  constructor(public s: QueryStreamBase<S>, public mapFn: (v: S) => T) {
    super(s.root);
  }

  /**
   * Subscribe to the mapped stream.
   */
  subscribe(f: SubscribeFn): void {
    this.s.subscribe((isDone, value, tx) => {
      if (isDone) {
        f(true, undefined, tx);
        return;
      }
      f(false, this.mapFn(value), tx);
    });
  }
}
/**
 * Inner join of a source stream with an index: for every source element, all
 * records whose index key equals the element's join key are emitted as
 * `{ left, right }` pairs. Source elements without a match emit nothing.
 *
 * @typeParam T - Element type of the source stream.
 * @typeParam S - Type of the records reachable through the index.
 */
class QueryStreamIndexJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> {
  /**
   * @param s - Source stream (left side of the join).
   * @param storeName - Name of the object store that owns the index.
   * @param indexName - Name of the index to look up.
   * @param key - Extracts the join key from a source element.
   */
  constructor(
    public s: QueryStreamBase<T>,
    public storeName: string,
    public indexName: string,
    public key: (obj: T) => IDBValidKey,
  ) {
    super(s.root);
  }

  /**
   * Subscribe to the joined stream.
   */
  subscribe(f: SubscribeFn): void {
    this.s.subscribe((isDone, value, tx) => {
      if (isDone) {
        f(true, undefined, tx);
        return;
      }
      const joinKey = this.key(value);
      const index = tx.objectStore(this.storeName).index(this.indexName);
      const req = index.openCursor(IDBKeyRange.only(joinKey));
      req.onsuccess = () => {
        const cursor = req.result;
        if (cursor) {
          f(false, { left: value, right: cursor.value }, tx);
          cursor.continue();
        }
      };
    });
  }
}
/**
 * Left outer join of a source stream with an index: like the inner index
 * join, but a source element without any match is still emitted once, as
 * `{ left }` without a `right` property.
 *
 * @typeParam T - Element type of the source stream.
 * @typeParam S - Type of the records reachable through the index.
 */
class QueryStreamIndexJoinLeft<T, S> extends QueryStreamBase<
  JoinLeftResult<T, S>
> {
  /**
   * @param s - Source stream (left side of the join).
   * @param storeName - Name of the object store that owns the index.
   * @param indexName - Name of the index to look up.
   * @param key - Extracts the join key from a source element.
   */
  constructor(
    public s: QueryStreamBase<T>,
    public storeName: string,
    public indexName: string,
    public key: (obj: T) => IDBValidKey,
  ) {
    super(s.root);
  }

  /**
   * Subscribe to the joined stream.
   */
  subscribe(f: SubscribeFn): void {
    this.s.subscribe((isDone, value, tx) => {
      if (isDone) {
        f(true, undefined, tx);
        return;
      }
      const index = tx.objectStore(this.storeName).index(this.indexName);
      const req = index.openCursor(IDBKeyRange.only(this.key(value)));
      let gotMatch = false;
      req.onsuccess = () => {
        const cursor = req.result;
        if (cursor) {
          gotMatch = true;
          f(false, { left: value, right: cursor.value }, tx);
          cursor.continue();
          return;
        }
        // Cursor exhausted: emit the unmatched left value exactly once.
        if (!gotMatch) {
          f(false, { left: value }, tx);
        }
      };
    });
  }
}
/**
 * Inner join of a source stream with the primary key of an object store:
 * for every source element, the records stored under the element's join key
 * are emitted as `{ left, right }` pairs.
 *
 * @typeParam T - Element type of the source stream.
 * @typeParam S - Type of the records in the joined store.
 */
class QueryStreamKeyJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> {
  /**
   * @param s - Source stream (left side of the join).
   * @param storeName - Name of the object store to look up.
   * @param key - Extracts the join key from a source element.
   */
  constructor(
    public s: QueryStreamBase<T>,
    public storeName: string,
    public key: (obj: T) => IDBValidKey,
  ) {
    super(s.root);
  }

  /**
   * Subscribe to the joined stream.
   */
  subscribe(f: SubscribeFn): void {
    this.s.subscribe((isDone, value, tx) => {
      if (isDone) {
        f(true, undefined, tx);
        return;
      }
      const store = tx.objectStore(this.storeName);
      const req = store.openCursor(IDBKeyRange.only(this.key(value)));
      req.onsuccess = () => {
        const cursor = req.result;
        if (cursor) {
          f(false, { left: value, right: cursor.value }, tx);
          cursor.continue();
        }
        // An exhausted lookup cursor only ends the lookup for this source
        // element. End-of-stream is signalled solely when the source stream
        // is done; signalling it here would finish downstream consumers
        // after the first element and repeat the signal for every element.
      };
    });
  }
}
/**
 * Source stream that iterates over an object store or one of its indices
 * with a cursor.
 *
 * The cursor work is registered with the query root at construction time;
 * callbacks added through `subscribe` are invoked once that work runs.
 *
 * @typeParam T - Type of the records produced by the stream.
 */
class IterQueryStream<T> extends QueryStreamBase<T> {
  private storeName: string;
  private options: any;
  private subscribers: SubscribeFn[];

  /**
   * @param qr - Query root the iteration is registered with.
   * @param storeName - Name of the object store to iterate.
   * @param options - Optional `indexName` (iterate that index instead of the
   *   store itself) and `only` (restrict the iteration to that single key).
   */
  constructor(qr: QueryRoot, storeName: string, options: any) {
    super(qr);
    this.options = options;
    this.storeName = storeName;
    this.subscribers = [];

    const iterate = (tx: IDBTransaction) => {
      const { indexName = void 0, only = void 0 } = this.options;
      const store = tx.objectStore(this.storeName);
      const source: IDBObjectStore | IDBIndex =
        indexName !== void 0 ? store.index(indexName) : store;
      // Without an `only` key the cursor covers the whole store/index.
      const range = only !== undefined ? IDBKeyRange.only(only) : undefined;
      const req = source.openCursor(range);
      req.onsuccess = () => {
        const cursor = req.result;
        if (cursor) {
          for (const subscriber of this.subscribers) {
            subscriber(false, cursor.value, tx);
          }
          cursor.continue();
          return;
        }
        // Cursor exhausted: tell every subscriber that the stream is done.
        for (const subscriber of this.subscribers) {
          subscriber(true, undefined, tx);
        }
      };
    };

    this.root.addWork(iterate);
  }

  /**
   * Register a callback for the records of the iteration.
   */
  subscribe(f: SubscribeFn): void {
    this.subscribers.push(f);
  }
}
/**
 * Root wrapper around an IndexedDB for queries with a fluent interface.
 *
 * Operations are collected as work items and executed together in a single
 * transaction once the query is finished. After that point no further work
 * can be added to the same root.
 */
export class QueryRoot {
  /** Work items to run inside the transaction, in insertion order. */
  private work: Array<(t: IDBTransaction) => void> = [];

  /** Names of all object stores the transaction needs access to. */
  private stores: Set<string> = new Set();

  /** Promise for the transaction outcome; set once the query was started. */
  private kickoffPromise: Promise<void> | undefined;

  /**
   * Whether some operation is a write operation, in which case
   * a "readwrite" transaction is needed.
   */
  private hasWrite = false;

  /** Whether an automatic `finish()` has already been scheduled. */
  private finishScheduled = false;

  /** Whether the query was started; no more work may be added afterwards. */
  private finished = false;

  /** Keys recorded under a name by `put`. */
  private keys: { [keyName: string]: IDBValidKey } = {};

  /**
   * @param db - Database that the query runs against.
   */
  constructor(public db: IDBDatabase) {}

  /**
   * Get a named key that was created during the query.
   */
  key(keyName: string): IDBValidKey | undefined {
    return this.keys[keyName];
  }

  /**
   * @throws Error if the query was already started.
   */
  private checkFinished(): void {
    if (this.finished) {
      throw Error("Can't add work to query after it was started");
    }
  }

  /**
   * Kick off the query on the next microtask and, once the transaction has
   * completed, yield the outcome of `result`. A failed transaction rejects
   * the returned promise.
   */
  private finishThen<R>(result: Promise<R>): Promise<R> {
    return Promise.resolve()
      .then(() => this.finish())
      .then(() => result);
  }

  /**
   * Get a stream of all objects in the store.
   */
  iter<T>(store: Store<T>): QueryStream<T> {
    this.checkFinished();
    this.stores.add(store.name);
    this.scheduleFinish();
    return new IterQueryStream<T>(this, store.name, {});
  }

  /**
   * Count the number of objects in a store.
   */
  count<T>(store: Store<T>): Promise<number> {
    this.checkFinished();
    const { resolve, promise } = openPromise<number>();
    const doCount = (tx: IDBTransaction) => {
      const req = tx.objectStore(store.name).count();
      req.onsuccess = () => {
        resolve(req.result);
      };
    };
    this.addWork(doCount, store.name, false);
    return this.finishThen(promise);
  }

  /**
   * Delete all objects in a store that match a predicate.
   *
   * @param predicate - Receives each object and its zero-based position in
   *   the iteration; the object is deleted when it returns true.
   */
  deleteIf<T>(
    store: Store<T>,
    predicate: (x: T, n: number) => boolean,
  ): QueryRoot {
    this.checkFinished();
    const doDeleteIf = (tx: IDBTransaction) => {
      const req = tx.objectStore(store.name).openCursor();
      let n = 0;
      req.onsuccess = () => {
        const cursor = req.result;
        if (cursor) {
          if (predicate(cursor.value, n++)) {
            cursor.delete();
          }
          cursor.continue();
        }
      };
    };
    this.addWork(doDeleteIf, store.name, true);
    return this;
  }

  /**
   * Get a stream of the objects reachable through an index.
   *
   * @param only - If given, restrict the stream to objects with that index
   *   key.
   */
  iterIndex<S extends IDBValidKey, T>(
    index: Index<S, T>,
    only?: S,
  ): QueryStream<T> {
    this.checkFinished();
    this.stores.add(index.storeName);
    this.scheduleFinish();
    return new IterQueryStream<T>(this, index.storeName, {
      indexName: index.indexName,
      only,
    });
  }

  /**
   * Put an object into the given object store.
   * Overrides if an existing object with the same key exists
   * in the store.
   *
   * @param keyName - If given, the key of the stored object is recorded
   *   under this name and can be retrieved with `key()`.
   */
  put<T>(store: Store<T>, val: T, keyName?: string): QueryRoot {
    this.checkFinished();
    const doPut = (tx: IDBTransaction) => {
      const req = tx.objectStore(store.name).put(val);
      if (keyName) {
        req.onsuccess = () => {
          this.keys[keyName] = req.result;
        };
      }
    };
    this.scheduleFinish();
    this.addWork(doPut, store.name, true);
    return this;
  }

  /**
   * Put an object into a store or return an existing record.
   *
   * NOTE: the returned promise resolves as soon as the respective request
   * succeeds; it is not tied to the completion of the transaction and does
   * not reject when the transaction aborts.
   *
   * @param key - Key under which an existing record is looked up.
   */
  putOrGetExisting<T>(store: Store<T>, val: T, key: IDBValidKey): Promise<T> {
    this.checkFinished();
    const { resolve, promise } = openPromise<T>();
    const doPutOrGet = (tx: IDBTransaction) => {
      const objstore = tx.objectStore(store.name);
      const lookup = objstore.get(key);
      lookup.onsuccess = () => {
        if (lookup.result !== undefined) {
          resolve(lookup.result);
          return;
        }
        const insertion = objstore.add(val);
        insertion.onsuccess = () => {
          resolve(val);
        };
      };
    };
    this.scheduleFinish();
    this.addWork(doPutOrGet, store.name, true);
    return promise;
  }

  /**
   * Put an object into the given object store and return the key under which
   * it was stored.
   */
  putWithResult<T>(store: Store<T>, val: T): Promise<IDBValidKey> {
    this.checkFinished();
    const { resolve, promise } = openPromise<IDBValidKey>();
    const doPutWithResult = (tx: IDBTransaction) => {
      const req = tx.objectStore(store.name).put(val);
      req.onsuccess = () => {
        resolve(req.result);
      };
    };
    this.addWork(doPutWithResult, store.name, true);
    // No scheduleFinish() needed: the returned chain starts the query itself.
    return this.finishThen(promise);
  }

  /**
   * Update objects inside a transaction.
   *
   * If the mutation function throws AbortTransaction, the whole transaction
   * will be aborted. If the mutation function returns undefined or null, no
   * modification will be made.
   */
  mutate<T>(store: Store<T>, key: any, f: (v: T) => T | undefined): QueryRoot {
    this.checkFinished();
    const doMutate = (tx: IDBTransaction) => {
      const req = tx.objectStore(store.name).openCursor(IDBKeyRange.only(key));
      req.onsuccess = () => {
        const cursor = req.result;
        if (!cursor) {
          return;
        }
        let modifiedValue: T | undefined;
        try {
          modifiedValue = f(cursor.value);
        } catch (e) {
          if (e === AbortTransaction) {
            tx.abort();
            return;
          }
          throw e;
        }
        if (modifiedValue !== undefined && modifiedValue !== null) {
          cursor.update(modifiedValue);
        }
        cursor.continue();
      };
    };
    this.scheduleFinish();
    this.addWork(doMutate, store.name, true);
    return this;
  }

  /**
   * Put all objects from an array into the given object store.
   */
  putAll<T>(store: Store<T>, iterable: T[]): QueryRoot {
    this.checkFinished();
    const doPutAll = (tx: IDBTransaction) => {
      for (const obj of iterable) {
        tx.objectStore(store.name).put(obj);
      }
    };
    this.scheduleFinish();
    this.addWork(doPutAll, store.name, true);
    return this;
  }

  /**
   * Add an object to the given object store.
   * Fails if the object's key is already present
   * in the object store.
   */
  add<T>(store: Store<T>, val: T): QueryRoot {
    this.checkFinished();
    const doAdd = (tx: IDBTransaction) => {
      tx.objectStore(store.name).add(val);
    };
    this.scheduleFinish();
    this.addWork(doAdd, store.name, true);
    return this;
  }

  /**
   * Get one object from a store by its key.
   *
   * @throws Error if `key` is undefined.
   */
  get<T>(store: Store<T>, key: any): Promise<T | undefined> {
    this.checkFinished();
    if (key === void 0) {
      throw Error("key must not be undefined");
    }
    const { resolve, promise } = openPromise<T | undefined>();
    const doGet = (tx: IDBTransaction) => {
      const req = tx.objectStore(store.name).get(key);
      req.onsuccess = () => {
        resolve(req.result);
      };
    };
    this.addWork(doGet, store.name, false);
    return this.finishThen(promise);
  }

  /**
   * Get objects from a store by their keys.
   * If no object for a key exists, the resulting position in the array
   * contains 'undefined'. An empty list of keys yields an empty array.
   *
   * An undefined key makes the query fail with "key must not be undefined"
   * when it is executed.
   */
  getMany<T>(store: Store<T>, keys: any[]): Promise<T[]> {
    this.checkFinished();
    const { resolve, promise } = openPromise<T[]>();
    const doGetMany = (tx: IDBTransaction) => {
      const total = keys.length;
      if (total === 0) {
        // Without any request no success callback would ever fire, so the
        // result has to be delivered right away.
        resolve([]);
        return;
      }
      const results: T[] = new Array(total);
      let pending = total;
      keys.forEach((key, position) => {
        if (key === void 0) {
          throw Error("key must not be undefined");
        }
        const req = tx.objectStore(store.name).get(key);
        req.onsuccess = () => {
          // Store by position so the result order always mirrors `keys`.
          results[position] = req.result;
          pending -= 1;
          if (pending === 0) {
            resolve(results);
          }
        };
      });
    };
    this.addWork(doGetMany, store.name, false);
    return this.finishThen(promise);
  }

  /**
   * Get one object from a store by the key of an index.
   *
   * @throws Error if `key` is undefined.
   */
  getIndexed<I extends IDBValidKey, T>(
    index: Index<I, T>,
    key: I,
  ): Promise<T | undefined> {
    this.checkFinished();
    if (key === void 0) {
      throw Error("key must not be undefined");
    }
    const { resolve, promise } = openPromise<T | undefined>();
    const doGetIndexed = (tx: IDBTransaction) => {
      const req = tx
        .objectStore(index.storeName)
        .index(index.indexName)
        .get(key);
      req.onsuccess = () => {
        resolve(req.result);
      };
    };
    this.addWork(doGetIndexed, index.storeName, false);
    return this.finishThen(promise);
  }

  /**
   * Make sure the query is started on the next microtask, so that fluent
   * call chains run even without an explicit `finish()`.
   */
  private scheduleFinish(): void {
    if (!this.finishScheduled) {
      Promise.resolve().then(() => this.finish());
      this.finishScheduled = true;
    }
  }

  /**
   * Finish the query, and start the query in the first place if necessary.
   *
   * All queued work runs in one transaction over the registered stores. The
   * returned promise resolves when the transaction completes and rejects
   * when it is aborted. Repeated calls return the same promise.
   */
  finish(): Promise<void> {
    if (this.kickoffPromise) {
      return this.kickoffPromise;
    }
    this.kickoffPromise = new Promise<void>((resolve, reject) => {
      // At this point, we can't add any more work
      this.finished = true;
      if (this.work.length === 0) {
        resolve();
        return;
      }
      const mode = this.hasWrite ? "readwrite" : "readonly";
      const tx = this.db.transaction(Array.from(this.stores), mode);
      tx.oncomplete = () => {
        resolve();
      };
      tx.onabort = () => {
        console.warn(
          `aborted ${mode} transaction on stores [${[...this.stores]}]`,
        );
        reject(Error("transaction aborted"));
      };
      tx.onerror = (e) => {
        console.warn(`error in transaction`, (e.target as any).error);
      };
      for (const w of this.work) {
        w(tx);
      }
    });
    return this.kickoffPromise;
  }

  /**
   * Delete an object by its key from the given object store.
   */
  delete<T>(store: Store<T>, key: any): QueryRoot {
    this.checkFinished();
    const doDelete = (tx: IDBTransaction) => {
      tx.objectStore(store.name).delete(key);
    };
    this.scheduleFinish();
    this.addWork(doDelete, store.name, true);
    return this;
  }

  /**
   * Low-level function to add a task to the internal work queue.
   *
   * @param workFn - Task to run inside the transaction.
   * @param storeName - If given, the store is registered for the transaction.
   * @param isWrite - Whether the task needs write access.
   */
  addWork(
    workFn: (t: IDBTransaction) => void,
    storeName?: string,
    isWrite?: boolean,
  ): void {
    this.work.push(workFn);
    if (storeName) {
      this.addStoreAccess(storeName, isWrite);
    }
  }

  /**
   * Register a store for the transaction and, if requested, switch the
   * transaction to "readwrite" mode.
   */
  addStoreAccess(storeName: string, isWrite?: boolean): void {
    if (storeName) {
      this.stores.add(storeName);
    }
    if (isWrite) {
      this.hasWrite = true;
    }
  }
}