aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2016-05-24 00:28:24 +0200
committerFlorian Dold <florian.dold@gmail.com>2016-05-24 00:28:24 +0200
commitafc87f3c5904274195d80868beae90a9dfa4e613 (patch)
tree76d29b385436837ccc703a0a5d332d636d1279bd
parenta96d9aa386f22dc770cc1f7d8be362e49544e9aa (diff)
implement flatMap operation for query streams
-rw-r--r--lib/wallet/query.ts30
1 files changed, 30 insertions, 0 deletions
diff --git a/lib/wallet/query.ts b/lib/wallet/query.ts
index 1e39fda0f..d5ec1fecc 100644
--- a/lib/wallet/query.ts
+++ b/lib/wallet/query.ts
@@ -38,6 +38,7 @@ export interface QueryStream<T> {
keyFn: (obj: any) => any): QueryStream<[T,S]>;
filter(f: (any) => boolean): QueryStream<T>;
reduce<S>(f: (v: T, acc: S) => S, start?: S): Promise<S>;
+ flatMap(f: (T) => T[]): QueryStream<T>;
}
@@ -68,6 +69,10 @@ abstract class QueryStreamBase<T> implements QueryStream<T> {
this.root = root;
}
+ flatMap(f: (T) => T[]): QueryStream<T> {
+ return new QueryStreamFlatMap(this, f);
+ }
+
indexJoin<S>(storeName: string,
indexName: string,
key: any): QueryStream<[T,S]> {
@@ -117,6 +122,31 @@ class QueryStreamFilter<T> extends QueryStreamBase<T> {
return;
}
if (this.filterFn(value)) {
+ f(false, value, tx);
+ }
+ });
+ }
+}
+
+
+class QueryStreamFlatMap<T> extends QueryStreamBase<T> {
+ s: QueryStreamBase<T>;
+ flatMapFn;
+
+ constructor(s: QueryStreamBase<T>, flatMapFn) {
+ super(s.root);
+ this.s = s;
+ this.flatMap = flatMapFn;
+ }
+
+ subscribe(f) {
+ this.s.subscribe((isDone, value, tx) => {
+ if (isDone) {
+ f(true, undefined, tx);
+ return;
+ }
+ let values = this.flatMapFn(value);
+ for (let v in values) {
f(false, value, tx)
}
});