aboutsummaryrefslogtreecommitdiff
path: root/src/query.ts
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2016-11-13 23:30:18 +0100
committerFlorian Dold <florian.dold@gmail.com>2016-11-13 23:31:17 +0100
commitf3fb8be7db6de87dae40d41bd5597a735c800ca1 (patch)
tree1a061db04de8f5bb5a6b697fa56a9948f67fac2f /src/query.ts
parent200d83c3886149ebb3f018530302079e12a81f6b (diff)
restructuring
Diffstat (limited to 'src/query.ts')
-rw-r--r--src/query.ts612
1 files changed, 612 insertions, 0 deletions
diff --git a/src/query.ts b/src/query.ts
new file mode 100644
index 000000000..08e270ea6
--- /dev/null
+++ b/src/query.ts
@@ -0,0 +1,612 @@
+/*
+ This file is part of TALER
+ (C) 2016 GNUnet e.V.
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+ */
+
+
+/**
+ * Database query abstractions.
+ * @module Query
+ * @author Florian Dold
+ */
+
+"use strict";
+
+
+export interface JoinResult<L,R> {
+ left: L;
+ right: R;
+}
+
+
+export class Store<T> {
+ name: string;
+ validator?: (v: T) => T;
+ storeParams: IDBObjectStoreParameters;
+
+ constructor(name: string, storeParams: IDBObjectStoreParameters,
+ validator?: (v: T) => T) {
+ this.name = name;
+ this.validator = validator;
+ this.storeParams = storeParams;
+ }
+}
+
+export class Index<S extends IDBValidKey,T> {
+ indexName: string;
+ storeName: string;
+ keyPath: string | string[];
+
+ constructor(s: Store<T>, indexName: string, keyPath: string | string[]) {
+ this.storeName = s.name;
+ this.indexName = indexName;
+ this.keyPath = keyPath;
+ }
+}
+
+/**
+ * Stream that can be filtered, reduced or joined
+ * with indices.
+ */
+export interface QueryStream<T> {
+ indexJoin<S,I extends IDBValidKey>(index: Index<I,S>,
+ keyFn: (obj: T) => I): QueryStream<JoinResult<T, S>>;
+ keyJoin<S,I extends IDBValidKey>(store: Store<S>,
+ keyFn: (obj: T) => I): QueryStream<JoinResult<T,S>>;
+ filter(f: (T: any) => boolean): QueryStream<T>;
+ reduce<S>(f: (v: T, acc: S) => S, start?: S): Promise<S>;
+ map<S>(f: (x:T) => S): QueryStream<S>;
+ flatMap<S>(f: (x: T) => S[]): QueryStream<S>;
+ toArray(): Promise<T[]>;
+
+ then(onfulfill: any, onreject: any): any;
+}
+
+export let AbortTransaction = Symbol("abort_transaction");
+
+/**
+ * Get an unresolved promise together with its extracted resolve / reject
+ * function.
+ */
+function openPromise<T>() {
+ let resolve: ((value?: T | PromiseLike<T>) => void) | null = null;
+ let reject: ((reason?: any) => void) | null = null;
+ const promise = new Promise<T>((res, rej) => {
+ resolve = res;
+ reject = rej;
+ });
+ if (!(resolve && reject)) {
+ // Never happens, unless JS implementation is broken
+ throw Error();
+ }
+ return {resolve, reject, promise};
+}
+
+
+abstract class QueryStreamBase<T> implements QueryStream<T>, PromiseLike<void> {
+ abstract subscribe(f: (isDone: boolean,
+ value: any,
+ tx: IDBTransaction) => void): void;
+
+ root: QueryRoot;
+
+ constructor(root: QueryRoot) {
+ this.root = root;
+ }
+
+ then<R>(onfulfilled: (value: void) => R | PromiseLike<R>, onrejected: (reason: any) => R | PromiseLike<R>): PromiseLike<R> {
+ return this.root.then(onfulfilled, onrejected);
+ }
+
+ flatMap<S>(f: (x: T) => S[]): QueryStream<S> {
+ return new QueryStreamFlatMap<T,S>(this, f);
+ }
+
+ map<S>(f: (x: T) => S): QueryStream<S> {
+ return new QueryStreamMap(this, f);
+ }
+
+ indexJoin<S,I extends IDBValidKey>(index: Index<I,S>,
+ keyFn: (obj: T) => I): QueryStream<JoinResult<T, S>> {
+ this.root.addStoreAccess(index.storeName, false);
+ return new QueryStreamIndexJoin(this, index.storeName, index.indexName, keyFn);
+ }
+
+ keyJoin<S, I extends IDBValidKey>(store: Store<S>,
+ keyFn: (obj: T) => I): QueryStream<JoinResult<T, S>> {
+ this.root.addStoreAccess(store.name, false);
+ return new QueryStreamKeyJoin(this, store.name, keyFn);
+ }
+
+ filter(f: (x: any) => boolean): QueryStream<T> {
+ return new QueryStreamFilter(this, f);
+ }
+
+ toArray(): Promise<T[]> {
+ let {resolve, promise} = openPromise();
+ let values: T[] = [];
+
+ this.subscribe((isDone, value) => {
+ if (isDone) {
+ resolve(values);
+ return;
+ }
+ values.push(value);
+ });
+
+ return Promise.resolve()
+ .then(() => this.root.finish())
+ .then(() => promise);
+ }
+
+ reduce<A>(f: (x: any, acc?: A) => A, init?: A): Promise<any> {
+ let {resolve, promise} = openPromise();
+ let acc = init;
+
+ this.subscribe((isDone, value) => {
+ if (isDone) {
+ resolve(acc);
+ return;
+ }
+ acc = f(value, acc);
+ });
+
+ return Promise.resolve()
+ .then(() => this.root.finish())
+ .then(() => promise);
+ }
+}
+
+type FilterFn = (e: any) => boolean;
+type SubscribeFn = (done: boolean, value: any, tx: IDBTransaction) => void;
+
+interface FlatMapFn<T> {
+ (v: T): T[];
+}
+
+class QueryStreamFilter<T> extends QueryStreamBase<T> {
+ s: QueryStreamBase<T>;
+ filterFn: FilterFn;
+
+ constructor(s: QueryStreamBase<T>, filterFn: FilterFn) {
+ super(s.root);
+ this.s = s;
+ this.filterFn = filterFn;
+ }
+
+ subscribe(f: SubscribeFn) {
+ this.s.subscribe((isDone, value, tx) => {
+ if (isDone) {
+ f(true, undefined, tx);
+ return;
+ }
+ if (this.filterFn(value)) {
+ f(false, value, tx);
+ }
+ });
+ }
+}
+
+
+class QueryStreamFlatMap<T,S> extends QueryStreamBase<S> {
+ s: QueryStreamBase<T>;
+ flatMapFn: (v: T) => S[];
+
+ constructor(s: QueryStreamBase<T>, flatMapFn: (v: T) => S[]) {
+ super(s.root);
+ this.s = s;
+ this.flatMapFn = flatMapFn;
+ }
+
+ subscribe(f: SubscribeFn) {
+ this.s.subscribe((isDone, value, tx) => {
+ if (isDone) {
+ f(true, undefined, tx);
+ return;
+ }
+ let values = this.flatMapFn(value);
+ for (let v in values) {
+ f(false, value, tx)
+ }
+ });
+ }
+}
+
+
+class QueryStreamMap<S,T> extends QueryStreamBase<T> {
+ s: QueryStreamBase<S>;
+ mapFn: (v: S) => T;
+
+ constructor(s: QueryStreamBase<S>, mapFn: (v: S) => T) {
+ super(s.root);
+ this.s = s;
+ this.mapFn = mapFn;
+ }
+
+ subscribe(f: SubscribeFn) {
+ this.s.subscribe((isDone, value, tx) => {
+ if (isDone) {
+ f(true, undefined, tx);
+ return;
+ }
+ let mappedValue = this.mapFn(value);
+ f(false, mappedValue, tx);
+ });
+ }
+}
+
+
+class QueryStreamIndexJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> {
+ s: QueryStreamBase<T>;
+ storeName: string;
+ key: any;
+ indexName: string;
+
+ constructor(s: QueryStreamBase<T>, storeName: string, indexName: string,
+ key: any) {
+ super(s.root);
+ this.s = s;
+ this.storeName = storeName;
+ this.key = key;
+ this.indexName = indexName;
+ }
+
+ subscribe(f: SubscribeFn) {
+ this.s.subscribe((isDone, value, tx) => {
+ if (isDone) {
+ f(true, undefined, tx);
+ return;
+ }
+ console.log("joining on", this.key(value));
+ let s = tx.objectStore(this.storeName).index(this.indexName);
+ let req = s.openCursor(IDBKeyRange.only(this.key(value)));
+ req.onsuccess = () => {
+ let cursor = req.result;
+ if (cursor) {
+ f(false, {left: value, right: cursor.value}, tx);
+ cursor.continue();
+ } else {
+ f(true, undefined, tx);
+ }
+ }
+ });
+ }
+}
+
+
+class QueryStreamKeyJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> {
+ s: QueryStreamBase<T>;
+ storeName: string;
+ key: any;
+
+ constructor(s: QueryStreamBase<T>, storeName: string,
+ key: any) {
+ super(s.root);
+ this.s = s;
+ this.storeName = storeName;
+ this.key = key;
+ }
+
+ subscribe(f: SubscribeFn) {
+ this.s.subscribe((isDone, value, tx) => {
+ if (isDone) {
+ f(true, undefined, tx);
+ return;
+ }
+ console.log("joining on", this.key(value));
+ let s = tx.objectStore(this.storeName);
+ let req = s.openCursor(IDBKeyRange.only(this.key(value)));
+ req.onsuccess = () => {
+ let cursor = req.result;
+ if (cursor) {
+ f(false, {left:value, right: cursor.value}, tx);
+ cursor.continue();
+ } else {
+ f(true, undefined, tx);
+ }
+ }
+ });
+ }
+}
+
+
+class IterQueryStream<T> extends QueryStreamBase<T> {
+ private storeName: string;
+ private options: any;
+ private subscribers: SubscribeFn[];
+
+ constructor(qr: QueryRoot, storeName: string, options: any) {
+ super(qr);
+ this.options = options;
+ this.storeName = storeName;
+ this.subscribers = [];
+
+ let doIt = (tx: IDBTransaction) => {
+ const {indexName = void 0, only = void 0} = this.options;
+ let s: any;
+ if (indexName !== void 0) {
+ s = tx.objectStore(this.storeName)
+ .index(this.options.indexName);
+ } else {
+ s = tx.objectStore(this.storeName);
+ }
+ let kr: IDBKeyRange | undefined = undefined;
+ if (only !== undefined) {
+ kr = IDBKeyRange.only(this.options.only);
+ }
+ let req = s.openCursor(kr);
+ req.onsuccess = () => {
+ let cursor: IDBCursorWithValue = req.result;
+ if (cursor) {
+ for (let f of this.subscribers) {
+ f(false, cursor.value, tx);
+ }
+ cursor.continue();
+ } else {
+ for (let f of this.subscribers) {
+ f(true, undefined, tx);
+ }
+ }
+ }
+ };
+
+ this.root.addWork(doIt);
+ }
+
+ subscribe(f: SubscribeFn) {
+ this.subscribers.push(f);
+ }
+}
+
+
+export class QueryRoot implements PromiseLike<void> {
+ private work: ((t: IDBTransaction) => void)[] = [];
+ private db: IDBDatabase;
+ private stores = new Set();
+ private kickoffPromise: Promise<void>;
+
+ /**
+ * Some operations is a write operation,
+ * and we need to do a "readwrite" transaction/
+ */
+ private hasWrite: boolean;
+
+ private finishScheduled: boolean;
+
+ constructor(db: IDBDatabase) {
+ this.db = db;
+ }
+
+ then<R>(onfulfilled: (value: void) => R | PromiseLike<R>, onrejected: (reason: any) => R | PromiseLike<R>): PromiseLike<R> {
+ return this.finish().then(onfulfilled, onrejected);
+ }
+
+ iter<T>(store: Store<T>): QueryStream<T> {
+ this.stores.add(store.name);
+ this.scheduleFinish();
+ return new IterQueryStream(this, store.name, {});
+ }
+
+ iterIndex<S extends IDBValidKey,T>(index: Index<S,T>,
+ only?: S): QueryStream<T> {
+ this.stores.add(index.storeName);
+ this.scheduleFinish();
+ return new IterQueryStream(this, index.storeName, {
+ only,
+ indexName: index.indexName
+ });
+ }
+
+ /**
+ * Put an object into the given object store.
+ * Overrides if an existing object with the same key exists
+ * in the store.
+ */
+ put<T>(store: Store<T>, val: T): QueryRoot {
+ let doPut = (tx: IDBTransaction) => {
+ tx.objectStore(store.name).put(val);
+ };
+ this.scheduleFinish();
+ this.addWork(doPut, store.name, true);
+ return this;
+ }
+
+
+ putWithResult<T>(store: Store<T>, val: T): Promise<IDBValidKey> {
+ const {resolve, promise} = openPromise();
+ let doPutWithResult = (tx: IDBTransaction) => {
+ let req = tx.objectStore(store.name).put(val);
+ req.onsuccess = () => {
+ resolve(req.result);
+ }
+ this.scheduleFinish();
+ };
+ this.addWork(doPutWithResult, store.name, true);
+ return Promise.resolve()
+ .then(() => this.finish())
+ .then(() => promise);
+ }
+
+
+ mutate<T>(store: Store<T>, key: any, f: (v: T) => T): QueryRoot {
+ let doPut = (tx: IDBTransaction) => {
+ let reqGet = tx.objectStore(store.name).get(key);
+ reqGet.onsuccess = () => {
+ let r = reqGet.result;
+ let m: T;
+ try {
+ m = f(r);
+ } catch (e) {
+ if (e == AbortTransaction) {
+ tx.abort();
+ return;
+ }
+ throw e;
+ }
+
+ tx.objectStore(store.name).put(m);
+ }
+ };
+ this.scheduleFinish();
+ this.addWork(doPut, store.name, true);
+ return this;
+ }
+
+
+ /**
+ * Add all object from an iterable to the given object store.
+ * Fails if the object's key is already present
+ * in the object store.
+ */
+ putAll<T>(store: Store<T>, iterable: T[]): QueryRoot {
+ const doPutAll = (tx: IDBTransaction) => {
+ for (let obj of iterable) {
+ tx.objectStore(store.name).put(obj);
+ }
+ };
+ this.scheduleFinish();
+ this.addWork(doPutAll, store.name, true);
+ return this;
+ }
+
+ /**
+ * Add an object to the given object store.
+ * Fails if the object's key is already present
+ * in the object store.
+ */
+ add<T>(store: Store<T>, val: T): QueryRoot {
+ const doAdd = (tx: IDBTransaction) => {
+ tx.objectStore(store.name).add(val);
+ };
+ this.scheduleFinish();
+ this.addWork(doAdd, store.name, true);
+ return this;
+ }
+
+ /**
+ * Get one object from a store by its key.
+ */
+ get<T>(store: Store<T>, key: any): Promise<T|undefined> {
+ if (key === void 0) {
+ throw Error("key must not be undefined");
+ }
+
+ const {resolve, promise} = openPromise();
+
+ const doGet = (tx: IDBTransaction) => {
+ const req = tx.objectStore(store.name).get(key);
+ req.onsuccess = () => {
+ resolve(req.result);
+ };
+ };
+
+ this.addWork(doGet, store.name, false);
+ return Promise.resolve()
+ .then(() => this.finish())
+ .then(() => promise);
+ }
+
+ /**
+ * Get one object from a store by its key.
+ */
+ getIndexed<I extends IDBValidKey,T>(index: Index<I,T>,
+ key: I): Promise<T|undefined> {
+ if (key === void 0) {
+ throw Error("key must not be undefined");
+ }
+
+ const {resolve, promise} = openPromise();
+
+ const doGetIndexed = (tx: IDBTransaction) => {
+ const req = tx.objectStore(index.storeName)
+ .index(index.indexName)
+ .get(key);
+ req.onsuccess = () => {
+ resolve(req.result);
+ };
+ };
+
+ this.addWork(doGetIndexed, index.storeName, false);
+ return Promise.resolve()
+ .then(() => this.finish())
+ .then(() => promise);
+ }
+
+ private scheduleFinish() {
+ if (!this.finishScheduled) {
+ Promise.resolve().then(() => this.finish());
+ this.finishScheduled = true;
+ }
+ }
+
+ /**
+ * Finish the query, and start the query in the first place if necessary.
+ */
+ finish(): Promise<void> {
+ if (this.kickoffPromise) {
+ return this.kickoffPromise;
+ }
+ this.kickoffPromise = new Promise<void>((resolve, reject) => {
+ if (this.work.length == 0) {
+ resolve();
+ return;
+ }
+ const mode = this.hasWrite ? "readwrite" : "readonly";
+ const tx = this.db.transaction(Array.from(this.stores), mode);
+ tx.oncomplete = () => {
+ resolve();
+ };
+ tx.onabort = () => {
+ reject(Error("transaction aborted"));
+ };
+ for (let w of this.work) {
+ w(tx);
+ }
+ });
+ return this.kickoffPromise;
+ }
+
+ /**
+ * Delete an object by from the given object store.
+ */
+ delete(storeName: string, key: any): QueryRoot {
+ const doDelete = (tx: IDBTransaction) => {
+ tx.objectStore(storeName).delete(key);
+ };
+ this.scheduleFinish();
+ this.addWork(doDelete, storeName, true);
+ return this;
+ }
+
+ /**
+ * Low-level function to add a task to the internal work queue.
+ */
+ addWork(workFn: (t: IDBTransaction) => void,
+ storeName?: string,
+ isWrite?: boolean) {
+ this.work.push(workFn);
+ if (storeName) {
+ this.addStoreAccess(storeName, isWrite);
+ }
+ }
+
+ addStoreAccess(storeName: string, isWrite?: boolean) {
+ if (storeName) {
+ this.stores.add(storeName);
+ }
+ if (isWrite) {
+ this.hasWrite = true;
+ }
+ }
+}