aboutsummaryrefslogtreecommitdiff
path: root/lib/wallet
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2016-10-19 23:55:58 +0200
committerFlorian Dold <florian.dold@gmail.com>2016-10-19 23:55:58 +0200
commit9ee0823b7e4a97a2b1812847eaabdf6cf846655e (patch)
tree96e379de500febaad97e428b71bd8817c2ec22fe /lib/wallet
parent9ccc6626acf66ab4d938bfd836e29124b2dd3558 (diff)
downloadwallet-core-9ee0823b7e4a97a2b1812847eaabdf6cf846655e.tar.xz
introduce map for query streams
Diffstat (limited to 'lib/wallet')
-rw-r--r--lib/wallet/query.ts32
-rw-r--r--lib/wallet/wallet.ts34
2 files changed, 49 insertions, 17 deletions
diff --git a/lib/wallet/query.ts b/lib/wallet/query.ts
index c369e4b67..acf9aa44d 100644
--- a/lib/wallet/query.ts
+++ b/lib/wallet/query.ts
@@ -66,7 +66,8 @@ export interface QueryStream<T> {
keyFn: (obj: T) => I): QueryStream<JoinResult<T,S>>;
filter(f: (T: any) => boolean): QueryStream<T>;
reduce<S>(f: (v: T, acc: S) => S, start?: S): Promise<S>;
- flatMap(f: (x: T) => T[]): QueryStream<T>;
+ map<S>(f: (x:T) => S): QueryStream<S>;
+ flatMap<S>(f: (x: T) => S[]): QueryStream<S>;
toArray(): Promise<T[]>;
}
@@ -102,10 +103,14 @@ abstract class QueryStreamBase<T> implements QueryStream<T> {
this.root = root;
}
- flatMap(f: (x: T) => T[]): QueryStream<T> {
+ flatMap<S>(f: (x: T) => T[]): QueryStream<S> {
return new QueryStreamFlatMap(this, f);
}
+ map<S>(f: (x: T) => S): QueryStream<T> {
+ return new QueryStreamMap(this, f);
+ }
+
indexJoin<S,I extends IDBValidKey>(index: Index<I,S>,
keyFn: (obj: T) => I): QueryStream<JoinResult<T, S>> {
this.root.addStoreAccess(index.storeName, false);
@@ -213,6 +218,29 @@ class QueryStreamFlatMap<T> extends QueryStreamBase<T> {
}
+class QueryStreamMap<T> extends QueryStreamBase<T> {
+ s: QueryStreamBase<T>;
+ mapFn: (v: T) => T[];
+
+ constructor(s: QueryStreamBase<T>, mapFn: (v: T) => T[]) {
+ super(s.root);
+ this.s = s;
+ this.mapFn = mapFn;
+ }
+
+ subscribe(f: SubscribeFn) {
+ this.s.subscribe((isDone, value, tx) => {
+ if (isDone) {
+ f(true, undefined, tx);
+ return;
+ }
+ let mappedValue = this.mapFn(value);
+ f(false, mappedValue, tx);
+ });
+ }
+}
+
+
class QueryStreamIndexJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> {
s: QueryStreamBase<T>;
storeName: string;
diff --git a/lib/wallet/wallet.ts b/lib/wallet/wallet.ts
index 601cf3536..53508cf59 100644
--- a/lib/wallet/wallet.ts
+++ b/lib/wallet/wallet.ts
@@ -406,14 +406,14 @@ export class Wallet {
updateExchanges(): void {
console.log("updating exchanges");
- this.q()
- .iter(Stores.exchanges)
- .reduce((exchange: IExchangeInfo) => {
- this.updateExchangeFromUrl(exchange.baseUrl)
- .catch((e) => {
- console.error("updating exchange failed", e);
- });
- });
+ let exchangesUrls = this.q().iter(Stores.exchanges).map((e) => e.baseUrl);
+
+ for (let url of exchangesUrls) {
+ this.updateExchangeFromUrl(url)
+ .catch((e) => {
+ console.error("updating exchange failed", e);
+ });
+ }
}
/**
@@ -1291,9 +1291,6 @@ export class Wallet {
async createRefreshSession(oldCoinPub: string): Promise<RefreshSession|undefined> {
-
- // FIXME: this is not running in a transaction.
-
let coin = await this.q().get<Coin>(Stores.coins, oldCoinPub);
if (!coin) {
@@ -1335,13 +1332,20 @@ export class Wallet {
newCoinDenoms,
oldDenom.fee_refresh));
- coin.currentAmount = Amounts.sub(coin.currentAmount,
- refreshSession.valueWithFee).amount;
+ function mutateCoin(c: Coin): Coin {
+ let r = Amounts.sub(coin.currentAmount,
+ refreshSession.valueWithFee);
+ if (r.saturated) {
+ // Something else must have written the coin value
+ throw AbortTransaction;
+ }
+ c.currentAmount = r.amount;
+ return c;
+ }
- // FIXME: we should check whether the amount still matches!
await this.q()
.put(Stores.refresh, refreshSession)
- .put(Stores.coins, coin)
+ .mutate(Stores.coins, coin.coinPub, mutateCoin)
.finish();
return refreshSession;