From 9ee0823b7e4a97a2b1812847eaabdf6cf846655e Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Wed, 19 Oct 2016 23:55:58 +0200 Subject: introduce map for query streams --- lib/wallet/query.ts | 32 ++++++++++++++++++++++++++++++-- lib/wallet/wallet.ts | 34 +++++++++++++++++++--------------- 2 files changed, 49 insertions(+), 17 deletions(-) (limited to 'lib/wallet') diff --git a/lib/wallet/query.ts b/lib/wallet/query.ts index c369e4b67..acf9aa44d 100644 --- a/lib/wallet/query.ts +++ b/lib/wallet/query.ts @@ -66,7 +66,8 @@ export interface QueryStream { keyFn: (obj: T) => I): QueryStream>; filter(f: (T: any) => boolean): QueryStream; reduce(f: (v: T, acc: S) => S, start?: S): Promise; - flatMap(f: (x: T) => T[]): QueryStream; + map(f: (x:T) => S): QueryStream; + flatMap(f: (x: T) => S[]): QueryStream; toArray(): Promise; } @@ -102,10 +103,14 @@ abstract class QueryStreamBase implements QueryStream { this.root = root; } - flatMap(f: (x: T) => T[]): QueryStream { + flatMap(f: (x: T) => T[]): QueryStream { return new QueryStreamFlatMap(this, f); } + map(f: (x: T) => S): QueryStream { + return new QueryStreamMap(this, f); + } + indexJoin(index: Index, keyFn: (obj: T) => I): QueryStream> { this.root.addStoreAccess(index.storeName, false); @@ -213,6 +218,29 @@ class QueryStreamFlatMap extends QueryStreamBase { } +class QueryStreamMap extends QueryStreamBase { + s: QueryStreamBase; + mapFn: (v: T) => T[]; + + constructor(s: QueryStreamBase, mapFn: (v: T) => T[]) { + super(s.root); + this.s = s; + this.mapFn = mapFn; + } + + subscribe(f: SubscribeFn) { + this.s.subscribe((isDone, value, tx) => { + if (isDone) { + f(true, undefined, tx); + return; + } + let mappedValue = this.mapFn(value); + f(false, mappedValue, tx); + }); + } +} + + class QueryStreamIndexJoin extends QueryStreamBase> { s: QueryStreamBase; storeName: string; diff --git a/lib/wallet/wallet.ts b/lib/wallet/wallet.ts index 601cf3536..53508cf59 100644 --- a/lib/wallet/wallet.ts +++ b/lib/wallet/wallet.ts @@ -406,14 +406,14 @@ export class Wallet { updateExchanges(): void { console.log("updating exchanges"); - this.q() - .iter(Stores.exchanges) - .reduce((exchange: IExchangeInfo) => { - this.updateExchangeFromUrl(exchange.baseUrl) - .catch((e) => { - console.error("updating exchange failed", e); - }); - }); + let exchangesUrls = this.q().iter(Stores.exchanges).map((e) => e.baseUrl); + + for (let url of exchangesUrls) { + this.updateExchangeFromUrl(url) + .catch((e) => { + console.error("updating exchange failed", e); + }); + } } /** @@ -1291,9 +1291,6 @@ export class Wallet { async createRefreshSession(oldCoinPub: string): Promise { - - // FIXME: this is not running in a transaction. - let coin = await this.q().get(Stores.coins, oldCoinPub); if (!coin) { @@ -1335,13 +1332,20 @@ export class Wallet { newCoinDenoms, oldDenom.fee_refresh)); - coin.currentAmount = Amounts.sub(coin.currentAmount, - refreshSession.valueWithFee).amount; + function mutateCoin(c: Coin): Coin { + let r = Amounts.sub(coin.currentAmount, + refreshSession.valueWithFee); + if (r.saturated) { + // Something else must have written the coin value + throw AbortTransaction; + } + c.currentAmount = r.amount; + return c; + } - // FIXME: we should check whether the amount still matches! await this.q() .put(Stores.refresh, refreshSession) - .put(Stores.coins, coin) + .mutate(Stores.coins, coin.coinPub, mutateCoin) .finish(); return refreshSession; -- cgit v1.2.3