aboutsummaryrefslogtreecommitdiff
path: root/src/wallet-impl
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2019-12-05 19:38:19 +0100
committerFlorian Dold <florian.dold@gmail.com>2019-12-05 19:38:19 +0100
commitf67d7f54f9d0fed97446898942e3dfee67ee2985 (patch)
tree2b81738025e8f61250ede10908cbf81071e16975 /src/wallet-impl
parent829acdd3d98f1014747f15ecb619b6fbaa06b640 (diff)
downloadwallet-core-f67d7f54f9d0fed97446898942e3dfee67ee2985.tar.xz
threads, retries and notifications WIP
Diffstat (limited to 'src/wallet-impl')
-rw-r--r--src/wallet-impl/balance.ts3
-rw-r--r--src/wallet-impl/errors.ts81
-rw-r--r--src/wallet-impl/exchanges.ts2
-rw-r--r--src/wallet-impl/history.ts8
-rw-r--r--src/wallet-impl/pay.ts207
-rw-r--r--src/wallet-impl/payback.ts9
-rw-r--r--src/wallet-impl/pending.ts561
-rw-r--r--src/wallet-impl/refresh.ts103
-rw-r--r--src/wallet-impl/reserves.ts110
-rw-r--r--src/wallet-impl/return.ts3
-rw-r--r--src/wallet-impl/state.ts63
-rw-r--r--src/wallet-impl/tip.ts44
-rw-r--r--src/wallet-impl/withdraw.ts98
13 files changed, 939 insertions, 353 deletions
diff --git a/src/wallet-impl/balance.ts b/src/wallet-impl/balance.ts
index 94d65fa96..a1351014c 100644
--- a/src/wallet-impl/balance.ts
+++ b/src/wallet-impl/balance.ts
@@ -33,6 +33,7 @@ const logger = new Logger("withdraw.ts");
export async function getBalances(
ws: InternalWalletState,
): Promise<WalletBalance> {
+ logger.trace("starting to compute balance");
/**
* Add amount to a balance field, both for
* the slicing by exchange and currency.
@@ -101,7 +102,7 @@ export async function getBalances(
await tx.iter(Stores.refresh).forEach(r => {
// Don't count finished refreshes, since the refresh already resulted
// in coins being added to the wallet.
- if (r.finished) {
+ if (r.finishedTimestamp) {
return;
}
addTo(
diff --git a/src/wallet-impl/errors.ts b/src/wallet-impl/errors.ts
new file mode 100644
index 000000000..5df99b7d3
--- /dev/null
+++ b/src/wallet-impl/errors.ts
@@ -0,0 +1,81 @@
+import { OperationError } from "../walletTypes";
+
+/*
+ This file is part of GNU Taler
+ (C) 2019 GNUnet e.V.
+
+ GNU Taler is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+ */
+
+/**
+ * This exception is there to let the caller know that an error happened,
+ * but the error has already been reported by writing it to the database.
+ */
+export class OperationFailedAndReportedError extends Error {
+ constructor(message: string) {
+ super(message);
+
+ // Set the prototype explicitly.
+ Object.setPrototypeOf(this, OperationFailedAndReportedError.prototype);
+ }
+}
+
+/**
+ * This exception is thrown when an error occured and the caller is
+ * responsible for recording the failure in the database.
+ */
+export class OperationFailedError extends Error {
+ constructor(message: string, public err: OperationError) {
+ super(message);
+
+ // Set the prototype explicitly.
+ Object.setPrototypeOf(this, OperationFailedError.prototype);
+ }
+}
+
+/**
+ * Run an operation and call the onOpError callback
+ * when there was an exception or operation error that must be reported.
+ * The cause will be re-thrown to the caller.
+ */
+export async function guardOperationException<T>(
+ op: () => Promise<T>,
+ onOpError: (e: OperationError) => Promise<void>,
+): Promise<T> {
+ try {
+ return op();
+ } catch (e) {
+ if (e instanceof OperationFailedAndReportedError) {
+ throw e;
+ }
+ if (e instanceof OperationFailedError) {
+ await onOpError(e.err);
+ throw new OperationFailedAndReportedError(e.message);
+ }
+ if (e instanceof Error) {
+ await onOpError({
+ type: "exception",
+ message: e.message,
+ details: {},
+ });
+ throw new OperationFailedAndReportedError(e.message);
+ }
+ await onOpError({
+ type: "exception",
+ message: "non-error exception thrown",
+ details: {
+ value: e.toString(),
+ },
+ });
+ throw new OperationFailedAndReportedError(e.message);
+ }
+} \ No newline at end of file
diff --git a/src/wallet-impl/exchanges.ts b/src/wallet-impl/exchanges.ts
index b3677c6c6..b89f3f84e 100644
--- a/src/wallet-impl/exchanges.ts
+++ b/src/wallet-impl/exchanges.ts
@@ -17,7 +17,6 @@
import { InternalWalletState } from "./state";
import {
WALLET_CACHE_BREAKER_CLIENT_VERSION,
- OperationFailedAndReportedError,
} from "../wallet";
import { KeysJson, Denomination, ExchangeWireJson } from "../talerTypes";
import { getTimestampNow, OperationError } from "../walletTypes";
@@ -42,6 +41,7 @@ import {
} from "../util/query";
import * as Amounts from "../util/amounts";
import { parsePaytoUri } from "../util/payto";
+import { OperationFailedAndReportedError } from "./errors";
async function denominationRecordFromKeys(
ws: InternalWalletState,
diff --git a/src/wallet-impl/history.ts b/src/wallet-impl/history.ts
index dfc683e6d..5e93ab878 100644
--- a/src/wallet-impl/history.ts
+++ b/src/wallet-impl/history.ts
@@ -78,11 +78,11 @@ export async function getHistory(
fulfillmentUrl: p.contractTerms.fulfillment_url,
merchantName: p.contractTerms.merchant.name,
},
- timestamp: p.timestamp,
+ timestamp: p.acceptTimestamp,
type: "pay",
explicit: false,
});
- if (p.timestamp_refund) {
+ if (p.lastRefundTimestamp) {
const contractAmount = Amounts.parseOrThrow(p.contractTerms.amount);
const amountsPending = Object.keys(p.refundsPending).map(x =>
Amounts.parseOrThrow(p.refundsPending[x].refund_amount),
@@ -103,7 +103,7 @@ export async function getHistory(
merchantName: p.contractTerms.merchant.name,
refundAmount: amount,
},
- timestamp: p.timestamp_refund,
+ timestamp: p.lastRefundTimestamp,
type: "refund",
explicit: false,
});
@@ -151,7 +151,7 @@ export async function getHistory(
merchantBaseUrl: tip.merchantBaseUrl,
tipId: tip.merchantTipId,
},
- timestamp: tip.timestamp,
+ timestamp: tip.createdTimestamp,
explicit: false,
type: "tip",
});
diff --git a/src/wallet-impl/pay.ts b/src/wallet-impl/pay.ts
index 9942139a6..9b2da9c7d 100644
--- a/src/wallet-impl/pay.ts
+++ b/src/wallet-impl/pay.ts
@@ -33,6 +33,8 @@ import {
getTimestampNow,
PreparePayResult,
ConfirmPayResult,
+ OperationError,
+ NotificationType,
} from "../walletTypes";
import {
oneShotIter,
@@ -51,12 +53,14 @@ import {
PurchaseRecord,
CoinRecord,
ProposalStatus,
+ initRetryInfo,
+ updateRetryInfoTimeout,
+ PurchaseStatus,
} from "../dbTypes";
import * as Amounts from "../util/amounts";
import {
amountToPretty,
strcmp,
- extractTalerStamp,
canonicalJson,
extractTalerStampOrThrow,
} from "../util/helpers";
@@ -65,6 +69,8 @@ import { InternalWalletState } from "./state";
import { parsePayUri, parseRefundUri } from "../util/taleruri";
import { getTotalRefreshCost, refresh } from "./refresh";
import { encodeCrock, getRandomBytes } from "../crypto/talerCrypto";
+import { guardOperationException } from "./errors";
+import { assertUnreachable } from "../util/assertUnreachable";
export interface SpeculativePayData {
payCoinInfo: PayCoinInfo;
@@ -344,9 +350,12 @@ async function recordConfirmPay(
payReq,
refundsDone: {},
refundsPending: {},
- timestamp: getTimestampNow(),
- timestamp_refund: undefined,
+ acceptTimestamp: getTimestampNow(),
+ lastRefundTimestamp: undefined,
proposalId: proposal.proposalId,
+ retryInfo: initRetryInfo(),
+ lastError: undefined,
+ status: PurchaseStatus.SubmitPay,
};
await runWithWriteTransaction(
@@ -365,8 +374,10 @@ async function recordConfirmPay(
},
);
- ws.badge.showNotification();
- ws.notifier.notify();
+ ws.notify({
+ type: NotificationType.ProposalAccepted,
+ proposalId: proposal.proposalId,
+ });
return t;
}
@@ -419,7 +430,7 @@ export async function abortFailedPayment(
}
const refundResponse = MerchantRefundResponse.checked(resp.responseJson);
- await acceptRefundResponse(ws, refundResponse);
+ await acceptRefundResponse(ws, purchase.proposalId, refundResponse);
await runWithWriteTransaction(ws.db, [Stores.purchases], async tx => {
const p = await tx.get(Stores.purchases, proposalId);
@@ -431,10 +442,62 @@ export async function abortFailedPayment(
});
}
+async function incrementProposalRetry(
+ ws: InternalWalletState,
+ proposalId: string,
+ err: OperationError | undefined,
+): Promise<void> {
+ await runWithWriteTransaction(ws.db, [Stores.proposals], async tx => {
+ const pr = await tx.get(Stores.proposals, proposalId);
+ if (!pr) {
+ return;
+ }
+ if (!pr.retryInfo) {
+ return;
+ }
+ pr.retryInfo.retryCounter++;
+ updateRetryInfoTimeout(pr.retryInfo);
+ pr.lastError = err;
+ await tx.put(Stores.proposals, pr);
+ });
+}
+
+async function incrementPurchaseRetry(
+ ws: InternalWalletState,
+ proposalId: string,
+ err: OperationError | undefined,
+): Promise<void> {
+ await runWithWriteTransaction(ws.db, [Stores.purchases], async tx => {
+ const pr = await tx.get(Stores.purchases, proposalId);
+ if (!pr) {
+ return;
+ }
+ if (!pr.retryInfo) {
+ return;
+ }
+ pr.retryInfo.retryCounter++;
+ updateRetryInfoTimeout(pr.retryInfo);
+ pr.lastError = err;
+ await tx.put(Stores.purchases, pr);
+ });
+}
+
export async function processDownloadProposal(
ws: InternalWalletState,
proposalId: string,
): Promise<void> {
+ const onOpErr = (err: OperationError) =>
+ incrementProposalRetry(ws, proposalId, err);
+ await guardOperationException(
+ () => processDownloadProposalImpl(ws, proposalId),
+ onOpErr,
+ );
+}
+
+async function processDownloadProposalImpl(
+ ws: InternalWalletState,
+ proposalId: string,
+): Promise<void> {
const proposal = await oneShotGet(ws.db, Stores.proposals, proposalId);
if (!proposal) {
return;
@@ -498,7 +561,10 @@ export async function processDownloadProposal(
},
);
- ws.notifier.notify();
+ ws.notify({
+ type: NotificationType.ProposalDownloaded,
+ proposalId: proposal.proposalId,
+ });
}
/**
@@ -536,6 +602,8 @@ async function startDownloadProposal(
proposalId: proposalId,
proposalStatus: ProposalStatus.DOWNLOADING,
repurchaseProposalId: undefined,
+ retryInfo: initRetryInfo(),
+ lastError: undefined,
};
await oneShotPut(ws.db, Stores.proposals, proposalRecord);
@@ -582,6 +650,7 @@ export async function submitPay(
throw Error("merchant payment signature invalid");
}
purchase.finished = true;
+ purchase.retryInfo = initRetryInfo(false);
const modifiedCoins: CoinRecord[] = [];
for (const pc of purchase.payReq.coins) {
const c = await oneShotGet(ws.db, Stores.coins, pc.coin_pub);
@@ -859,8 +928,6 @@ export async function confirmPay(
return submitPay(ws, proposalId, sessionId);
}
-
-
export async function getFullRefundFees(
ws: InternalWalletState,
refundPermissions: MerchantRefundPermission[],
@@ -914,15 +981,13 @@ export async function getFullRefundFees(
return feeAcc;
}
-async function submitRefunds(
+async function submitRefundsToExchange(
ws: InternalWalletState,
proposalId: string,
): Promise<void> {
const purchase = await oneShotGet(ws.db, Stores.purchases, proposalId);
if (!purchase) {
- console.error(
- "not submitting refunds, payment not found:",
- );
+ console.error("not submitting refunds, payment not found:");
return;
}
const pendingKeys = Object.keys(purchase.refundsPending);
@@ -991,14 +1056,18 @@ async function submitRefunds(
refresh(ws, perm.coin_pub);
}
- ws.badge.showNotification();
- ws.notifier.notify();
+ ws.notify({
+ type: NotificationType.RefundsSubmitted,
+ proposalId,
+ });
}
-export async function acceptRefundResponse(
+
+async function acceptRefundResponse(
ws: InternalWalletState,
+ proposalId: string,
refundResponse: MerchantRefundResponse,
-): Promise<string> {
+): Promise<void> {
const refundPermissions = refundResponse.refund_permissions;
if (!refundPermissions.length) {
@@ -1015,7 +1084,8 @@ export async function acceptRefundResponse(
return;
}
- t.timestamp_refund = getTimestampNow();
+ t.lastRefundTimestamp = getTimestampNow();
+ t.status = PurchaseStatus.ProcessRefund;
for (const perm of refundPermissions) {
if (
@@ -1027,18 +1097,48 @@ export async function acceptRefundResponse(
}
return t;
}
+ // Add the refund permissions to the purchase within a DB transaction
+ await oneShotMutate(ws.db, Stores.purchases, proposalId, f);
+ await submitRefundsToExchange(ws, proposalId);
+}
- const hc = refundResponse.h_contract_terms;
- // Add the refund permissions to the purchase within a DB transaction
- await oneShotMutate(ws.db, Stores.purchases, hc, f);
- ws.notifier.notify();
+async function queryRefund(ws: InternalWalletState, proposalId: string): Promise<void> {
+ const purchase = await oneShotGet(ws.db, Stores.purchases, proposalId);
+ if (purchase?.status !== PurchaseStatus.QueryRefund) {
+ return;
+ }
- await submitRefunds(ws, hc);
+ const refundUrl = new URL("refund", purchase.contractTerms.merchant_base_url).href
+ let resp;
+ try {
+ resp = await ws.http.get(refundUrl);
+ } catch (e) {
+ console.error("error downloading refund permission", e);
+ throw e;
+ }
- return hc;
+ const refundResponse = MerchantRefundResponse.checked(resp.responseJson);
+ await acceptRefundResponse(ws, proposalId, refundResponse);
}
+async function startRefundQuery(ws: InternalWalletState, proposalId: string): Promise<void> {
+ const success = await runWithWriteTransaction(ws.db, [Stores.purchases], async (tx) => {
+ const p = await tx.get(Stores.purchases, proposalId);
+ if (p?.status !== PurchaseStatus.Done) {
+ return false;
+ }
+ p.status = PurchaseStatus.QueryRefund;
+ return true;
+ });
+
+ if (!success) {
+ return;
+ }
+ await queryRefund(ws, proposalId);
+}
+
+
/**
* Accept a refund, return the contract hash for the contract
* that was involved in the refund.
@@ -1053,17 +1153,56 @@ export async function applyRefund(
throw Error("invalid refund URI");
}
- const refundUrl = parseResult.refundUrl;
+ const purchase = await oneShotGetIndexed(
+ ws.db,
+ Stores.purchases.orderIdIndex,
+ [parseResult.merchantBaseUrl, parseResult.orderId],
+ );
- logger.trace("processing refund");
- let resp;
- try {
- resp = await ws.http.get(refundUrl);
- } catch (e) {
- console.error("error downloading refund permission", e);
- throw e;
+ if (!purchase) {
+ throw Error("no purchase for the taler://refund/ URI was found");
}
- const refundResponse = MerchantRefundResponse.checked(resp.responseJson);
- return acceptRefundResponse(ws, refundResponse);
+ await startRefundQuery(ws, purchase.proposalId);
+
+ return purchase.contractTermsHash;
+}
+
+export async function processPurchase(
+ ws: InternalWalletState,
+ proposalId: string,
+): Promise<void> {
+ const onOpErr = (e: OperationError) =>
+ incrementPurchaseRetry(ws, proposalId, e);
+ await guardOperationException(
+ () => processPurchaseImpl(ws, proposalId),
+ onOpErr,
+ );
+}
+
+export async function processPurchaseImpl(
+ ws: InternalWalletState,
+ proposalId: string,
+): Promise<void> {
+ const purchase = await oneShotGet(ws.db, Stores.purchases, proposalId);
+ if (!purchase) {
+ return;
+ }
+ switch (purchase.status) {
+ case PurchaseStatus.Done:
+ return;
+ case PurchaseStatus.Abort:
+ // FIXME
+ break;
+ case PurchaseStatus.SubmitPay:
+ break;
+ case PurchaseStatus.QueryRefund:
+ await queryRefund(ws, proposalId);
+ break;
+ case PurchaseStatus.ProcessRefund:
+ await submitRefundsToExchange(ws, proposalId);
+ break;
+ default:
+ throw assertUnreachable(purchase.status);
+ }
}
diff --git a/src/wallet-impl/payback.ts b/src/wallet-impl/payback.ts
index 5bf5ff06e..56696d771 100644
--- a/src/wallet-impl/payback.ts
+++ b/src/wallet-impl/payback.ts
@@ -29,6 +29,7 @@ import { Stores, TipRecord, CoinStatus } from "../dbTypes";
import { Logger } from "../util/logging";
import { PaybackConfirmation } from "../talerTypes";
import { updateExchangeFromUrl } from "./exchanges";
+import { NotificationType } from "../walletTypes";
const logger = new Logger("payback.ts");
@@ -65,7 +66,9 @@ export async function payback(
await tx.put(Stores.reserves, reserve);
},
);
- ws.notifier.notify();
+ ws.notify({
+ type: NotificationType.PaybackStarted,
+ });
const paybackRequest = await ws.cryptoApi.createPaybackRequest(coin);
const reqUrl = new URL("payback", coin.exchangeBaseUrl);
@@ -83,6 +86,8 @@ export async function payback(
}
coin.status = CoinStatus.Dormant;
await oneShotPut(ws.db, Stores.coins, coin);
- ws.notifier.notify();
+ ws.notify({
+ type: NotificationType.PaybackFinished,
+ });
await updateExchangeFromUrl(ws, coin.exchangeBaseUrl, true);
}
diff --git a/src/wallet-impl/pending.ts b/src/wallet-impl/pending.ts
index 72102e3a1..bd10538af 100644
--- a/src/wallet-impl/pending.ts
+++ b/src/wallet-impl/pending.ts
@@ -21,8 +21,10 @@ import {
PendingOperationInfo,
PendingOperationsResponse,
getTimestampNow,
+ Timestamp,
+ Duration,
} from "../walletTypes";
-import { runWithReadTransaction } from "../util/query";
+import { runWithReadTransaction, TransactionHandle } from "../util/query";
import { InternalWalletState } from "./state";
import {
Stores,
@@ -32,11 +34,355 @@ import {
ProposalStatus,
} from "../dbTypes";
+function updateRetryDelay(
+ oldDelay: Duration,
+ now: Timestamp,
+ retryTimestamp: Timestamp,
+): Duration {
+ if (retryTimestamp.t_ms <= now.t_ms) {
+ return { d_ms: 0 };
+ }
+ return { d_ms: Math.min(oldDelay.d_ms, retryTimestamp.t_ms - now.t_ms) };
+}
+
+async function gatherExchangePending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ if (onlyDue) {
+ // FIXME: exchanges should also be updated regularly
+ return;
+ }
+ await tx.iter(Stores.exchanges).forEach(e => {
+ switch (e.updateStatus) {
+ case ExchangeUpdateStatus.FINISHED:
+ if (e.lastError) {
+ resp.pendingOperations.push({
+ type: "bug",
+ givesLifeness: false,
+ message:
+ "Exchange record is in FINISHED state but has lastError set",
+ details: {
+ exchangeBaseUrl: e.baseUrl,
+ },
+ });
+ }
+ if (!e.details) {
+ resp.pendingOperations.push({
+ type: "bug",
+ givesLifeness: false,
+ message:
+ "Exchange record does not have details, but no update in progress.",
+ details: {
+ exchangeBaseUrl: e.baseUrl,
+ },
+ });
+ }
+ if (!e.wireInfo) {
+ resp.pendingOperations.push({
+ type: "bug",
+ givesLifeness: false,
+ message:
+ "Exchange record does not have wire info, but no update in progress.",
+ details: {
+ exchangeBaseUrl: e.baseUrl,
+ },
+ });
+ }
+ break;
+ case ExchangeUpdateStatus.FETCH_KEYS:
+ resp.pendingOperations.push({
+ type: "exchange-update",
+ givesLifeness: false,
+ stage: "fetch-keys",
+ exchangeBaseUrl: e.baseUrl,
+ lastError: e.lastError,
+ reason: e.updateReason || "unknown",
+ });
+ break;
+ case ExchangeUpdateStatus.FETCH_WIRE:
+ resp.pendingOperations.push({
+ type: "exchange-update",
+ givesLifeness: false,
+ stage: "fetch-wire",
+ exchangeBaseUrl: e.baseUrl,
+ lastError: e.lastError,
+ reason: e.updateReason || "unknown",
+ });
+ break;
+ default:
+ resp.pendingOperations.push({
+ type: "bug",
+ givesLifeness: false,
+ message: "Unknown exchangeUpdateStatus",
+ details: {
+ exchangeBaseUrl: e.baseUrl,
+ exchangeUpdateStatus: e.updateStatus,
+ },
+ });
+ break;
+ }
+ });
+}
+
+async function gatherReservePending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ // FIXME: this should be optimized by using an index for "onlyDue==true".
+ await tx.iter(Stores.reserves).forEach(reserve => {
+ const reserveType = reserve.bankWithdrawStatusUrl ? "taler-bank" : "manual";
+ if (!reserve.retryInfo.active) {
+ return;
+ }
+ resp.nextRetryDelay = updateRetryDelay(
+ resp.nextRetryDelay,
+ now,
+ reserve.retryInfo.nextRetry,
+ );
+ if (onlyDue && reserve.retryInfo.nextRetry.t_ms > now.t_ms) {
+ return;
+ }
+ switch (reserve.reserveStatus) {
+ case ReserveRecordStatus.DORMANT:
+ // nothing to report as pending
+ break;
+ case ReserveRecordStatus.WITHDRAWING:
+ case ReserveRecordStatus.UNCONFIRMED:
+ case ReserveRecordStatus.QUERYING_STATUS:
+ case ReserveRecordStatus.REGISTERING_BANK:
+ resp.pendingOperations.push({
+ type: "reserve",
+ givesLifeness: true,
+ stage: reserve.reserveStatus,
+ timestampCreated: reserve.created,
+ reserveType,
+ reservePub: reserve.reservePub,
+ retryInfo: reserve.retryInfo,
+ });
+ break;
+ case ReserveRecordStatus.WAIT_CONFIRM_BANK:
+ resp.pendingOperations.push({
+ type: "reserve",
+ givesLifeness: true,
+ stage: reserve.reserveStatus,
+ timestampCreated: reserve.created,
+ reserveType,
+ reservePub: reserve.reservePub,
+ bankWithdrawConfirmUrl: reserve.bankWithdrawConfirmUrl,
+ retryInfo: reserve.retryInfo,
+ });
+ break;
+ default:
+ resp.pendingOperations.push({
+ type: "bug",
+ givesLifeness: false,
+ message: "Unknown reserve record status",
+ details: {
+ reservePub: reserve.reservePub,
+ reserveStatus: reserve.reserveStatus,
+ },
+ });
+ break;
+ }
+ });
+}
+
+async function gatherRefreshPending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ await tx.iter(Stores.refresh).forEach(r => {
+ if (r.finishedTimestamp) {
+ return;
+ }
+ resp.nextRetryDelay = updateRetryDelay(
+ resp.nextRetryDelay,
+ now,
+ r.retryInfo.nextRetry,
+ );
+ if (onlyDue && r.retryInfo.nextRetry.t_ms > now.t_ms) {
+ return;
+ }
+ let refreshStatus: string;
+ if (r.norevealIndex === undefined) {
+ refreshStatus = "melt";
+ } else {
+ refreshStatus = "reveal";
+ }
+
+ resp.pendingOperations.push({
+ type: "refresh",
+ givesLifeness: true,
+ oldCoinPub: r.meltCoinPub,
+ refreshStatus,
+ refreshOutputSize: r.newDenoms.length,
+ refreshSessionId: r.refreshSessionId,
+ });
+ });
+}
+
+async function gatherCoinsPending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ // Refreshing dirty coins is always due.
+ await tx.iter(Stores.coins).forEach(coin => {
+ if (coin.status == CoinStatus.Dirty) {
+ resp.nextRetryDelay.d_ms = 0;
+ resp.pendingOperations.push({
+ givesLifeness: true,
+ type: "dirty-coin",
+ coinPub: coin.coinPub,
+ });
+ }
+ });
+}
+
+async function gatherWithdrawalPending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ await tx.iter(Stores.withdrawalSession).forEach(wsr => {
+ if (wsr.finishTimestamp) {
+ return;
+ }
+ resp.nextRetryDelay = updateRetryDelay(
+ resp.nextRetryDelay,
+ now,
+ wsr.retryInfo.nextRetry,
+ );
+ if (onlyDue && wsr.retryInfo.nextRetry.t_ms > now.t_ms) {
+ return;
+ }
+ const numCoinsWithdrawn = wsr.withdrawn.reduce((a, x) => a + (x ? 1 : 0), 0);
+ const numCoinsTotal = wsr.withdrawn.length;
+ resp.pendingOperations.push({
+ type: "withdraw",
+ givesLifeness: true,
+ numCoinsTotal,
+ numCoinsWithdrawn,
+ source: wsr.source,
+ withdrawSessionId: wsr.withdrawSessionId,
+ });
+ });
+}
+
+async function gatherProposalPending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ await tx.iter(Stores.proposals).forEach(proposal => {
+ if (proposal.proposalStatus == ProposalStatus.PROPOSED) {
+ if (onlyDue) {
+ return;
+ }
+ resp.pendingOperations.push({
+ type: "proposal-choice",
+ givesLifeness: false,
+ merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url,
+ proposalId: proposal.proposalId,
+ proposalTimestamp: proposal.timestamp,
+ });
+ } else if (proposal.proposalStatus == ProposalStatus.DOWNLOADING) {
+ resp.nextRetryDelay = updateRetryDelay(
+ resp.nextRetryDelay,
+ now,
+ proposal.retryInfo.nextRetry,
+ );
+ if (onlyDue && proposal.retryInfo.nextRetry.t_ms > now.t_ms) {
+ return;
+ }
+ resp.pendingOperations.push({
+ type: "proposal-download",
+ givesLifeness: true,
+ merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url,
+ proposalId: proposal.proposalId,
+ proposalTimestamp: proposal.timestamp,
+ });
+ }
+ });
+}
+
+async function gatherTipPending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ await tx.iter(Stores.tips).forEach(tip => {
+ if (tip.pickedUp) {
+ return;
+ }
+ resp.nextRetryDelay = updateRetryDelay(
+ resp.nextRetryDelay,
+ now,
+ tip.retryInfo.nextRetry,
+ );
+ if (onlyDue && tip.retryInfo.nextRetry.t_ms > now.t_ms) {
+ return;
+ }
+ if (tip.accepted) {
+ resp.pendingOperations.push({
+ type: "tip",
+ givesLifeness: true,
+ merchantBaseUrl: tip.merchantBaseUrl,
+ tipId: tip.tipId,
+ merchantTipId: tip.merchantTipId,
+ });
+ }
+ });
+}
+
+async function gatherPurchasePending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ await tx.iter(Stores.purchases).forEach((pr) => {
+ if (pr.finished) {
+ return;
+ }
+ resp.nextRetryDelay = updateRetryDelay(
+ resp.nextRetryDelay,
+ now,
+ pr.retryInfo.nextRetry,
+ );
+ if (onlyDue && pr.retryInfo.nextRetry.t_ms > now.t_ms) {
+ return;
+ }
+ resp.pendingOperations.push({
+ type: "pay",
+ givesLifeness: true,
+ isReplay: false,
+ proposalId: pr.proposalId,
+ });
+ });
+
+}
+
export async function getPendingOperations(
ws: InternalWalletState,
+ onlyDue: boolean = false,
): Promise<PendingOperationsResponse> {
- const pendingOperations: PendingOperationInfo[] = [];
- let minRetryDurationMs = 5000;
+ const resp: PendingOperationsResponse = {
+ nextRetryDelay: { d_ms: Number.MAX_SAFE_INTEGER },
+ pendingOperations: [],
+ };
+ const now = getTimestampNow();
await runWithReadTransaction(
ws.db,
[
@@ -47,207 +393,18 @@ export async function getPendingOperations(
Stores.withdrawalSession,
Stores.proposals,
Stores.tips,
+ Stores.purchases,
],
async tx => {
- await tx.iter(Stores.exchanges).forEach(e => {
- switch (e.updateStatus) {
- case ExchangeUpdateStatus.FINISHED:
- if (e.lastError) {
- pendingOperations.push({
- type: "bug",
- message:
- "Exchange record is in FINISHED state but has lastError set",
- details: {
- exchangeBaseUrl: e.baseUrl,
- },
- });
- }
- if (!e.details) {
- pendingOperations.push({
- type: "bug",
- message:
- "Exchange record does not have details, but no update in progress.",
- details: {
- exchangeBaseUrl: e.baseUrl,
- },
- });
- }
- if (!e.wireInfo) {
- pendingOperations.push({
- type: "bug",
- message:
- "Exchange record does not have wire info, but no update in progress.",
- details: {
- exchangeBaseUrl: e.baseUrl,
- },
- });
- }
- break;
- case ExchangeUpdateStatus.FETCH_KEYS:
- pendingOperations.push({
- type: "exchange-update",
- stage: "fetch-keys",
- exchangeBaseUrl: e.baseUrl,
- lastError: e.lastError,
- reason: e.updateReason || "unknown",
- });
- break;
- case ExchangeUpdateStatus.FETCH_WIRE:
- pendingOperations.push({
- type: "exchange-update",
- stage: "fetch-wire",
- exchangeBaseUrl: e.baseUrl,
- lastError: e.lastError,
- reason: e.updateReason || "unknown",
- });
- break;
- default:
- pendingOperations.push({
- type: "bug",
- message: "Unknown exchangeUpdateStatus",
- details: {
- exchangeBaseUrl: e.baseUrl,
- exchangeUpdateStatus: e.updateStatus,
- },
- });
- break;
- }
- });
- await tx.iter(Stores.reserves).forEach(reserve => {
- const reserveType = reserve.bankWithdrawStatusUrl
- ? "taler-bank"
- : "manual";
- const now = getTimestampNow();
- switch (reserve.reserveStatus) {
- case ReserveRecordStatus.DORMANT:
- // nothing to report as pending
- break;
- case ReserveRecordStatus.WITHDRAWING:
- case ReserveRecordStatus.UNCONFIRMED:
- case ReserveRecordStatus.QUERYING_STATUS:
- case ReserveRecordStatus.REGISTERING_BANK:
- pendingOperations.push({
- type: "reserve",
- stage: reserve.reserveStatus,
- timestampCreated: reserve.created,
- reserveType,
- reservePub: reserve.reservePub,
- });
- if (reserve.created.t_ms < now.t_ms - 5000) {
- minRetryDurationMs = 500;
- } else if (reserve.created.t_ms < now.t_ms - 30000) {
- minRetryDurationMs = 2000;
- }
- break;
- case ReserveRecordStatus.WAIT_CONFIRM_BANK:
- pendingOperations.push({
- type: "reserve",
- stage: reserve.reserveStatus,
- timestampCreated: reserve.created,
- reserveType,
- reservePub: reserve.reservePub,
- bankWithdrawConfirmUrl: reserve.bankWithdrawConfirmUrl,
- });
- if (reserve.created.t_ms < now.t_ms - 5000) {
- minRetryDurationMs = 500;
- } else if (reserve.created.t_ms < now.t_ms - 30000) {
- minRetryDurationMs = 2000;
- }
- break;
- default:
- pendingOperations.push({
- type: "bug",
- message: "Unknown reserve record status",
- details: {
- reservePub: reserve.reservePub,
- reserveStatus: reserve.reserveStatus,
- },
- });
- break;
- }
- });
-
- await tx.iter(Stores.refresh).forEach(r => {
- if (r.finished) {
- return;
- }
- let refreshStatus: string;
- if (r.norevealIndex === undefined) {
- refreshStatus = "melt";
- } else {
- refreshStatus = "reveal";
- }
-
- pendingOperations.push({
- type: "refresh",
- oldCoinPub: r.meltCoinPub,
- refreshStatus,
- refreshOutputSize: r.newDenoms.length,
- refreshSessionId: r.refreshSessionId,
- });
- });
-
- await tx.iter(Stores.coins).forEach(coin => {
- if (coin.status == CoinStatus.Dirty) {
- pendingOperations.push({
- type: "dirty-coin",
- coinPub: coin.coinPub,
- });
- }
- });
-
- await tx.iter(Stores.withdrawalSession).forEach(ws => {
- const numCoinsWithdrawn = ws.withdrawn.reduce(
- (a, x) => a + (x ? 1 : 0),
- 0,
- );
- const numCoinsTotal = ws.withdrawn.length;
- if (numCoinsWithdrawn < numCoinsTotal) {
- pendingOperations.push({
- type: "withdraw",
- numCoinsTotal,
- numCoinsWithdrawn,
- source: ws.source,
- withdrawSessionId: ws.withdrawSessionId,
- });
- }
- });
-
- await tx.iter(Stores.proposals).forEach((proposal) => {
- if (proposal.proposalStatus == ProposalStatus.PROPOSED) {
- pendingOperations.push({
- type: "proposal-choice",
- merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url,
- proposalId: proposal.proposalId,
- proposalTimestamp: proposal.timestamp,
- });
- } else if (proposal.proposalStatus == ProposalStatus.DOWNLOADING) {
- pendingOperations.push({
- type: "proposal-download",
- merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url,
- proposalId: proposal.proposalId,
- proposalTimestamp: proposal.timestamp,
- });
- }
- });
-
- await tx.iter(Stores.tips).forEach((tip) => {
- if (tip.accepted && !tip.pickedUp) {
- pendingOperations.push({
- type: "tip",
- merchantBaseUrl: tip.merchantBaseUrl,
- tipId: tip.tipId,
- merchantTipId: tip.merchantTipId,
- });
- }
- });
+ await gatherExchangePending(tx, now, resp, onlyDue);
+ await gatherReservePending(tx, now, resp, onlyDue);
+ await gatherRefreshPending(tx, now, resp, onlyDue);
+ await gatherCoinsPending(tx, now, resp, onlyDue);
+ await gatherWithdrawalPending(tx, now, resp, onlyDue);
+ await gatherProposalPending(tx, now, resp, onlyDue);
+ await gatherTipPending(tx, now, resp, onlyDue);
+ await gatherPurchasePending(tx, now, resp, onlyDue);
},
);
-
- return {
- pendingOperations,
- nextRetryDelay: {
- d_ms: minRetryDurationMs,
- },
- };
+ return resp;
}
diff --git a/src/wallet-impl/refresh.ts b/src/wallet-impl/refresh.ts
index 7e7270ed3..a3b48919d 100644
--- a/src/wallet-impl/refresh.ts
+++ b/src/wallet-impl/refresh.ts
@@ -23,6 +23,8 @@ import {
RefreshPlanchetRecord,
CoinRecord,
RefreshSessionRecord,
+ initRetryInfo,
+ updateRetryInfoTimeout,
} from "../dbTypes";
import { amountToPretty } from "../util/helpers";
import {
@@ -36,6 +38,8 @@ import { InternalWalletState } from "./state";
import { Logger } from "../util/logging";
import { getWithdrawDenomList } from "./withdraw";
import { updateExchangeFromUrl } from "./exchanges";
+import { getTimestampNow, OperationError, NotificationType } from "../walletTypes";
+import { guardOperationException } from "./errors";
const logger = new Logger("refresh.ts");
@@ -132,14 +136,16 @@ async function refreshMelt(
if (rs.norevealIndex !== undefined) {
return;
}
- if (rs.finished) {
+ if (rs.finishedTimestamp) {
return;
}
rs.norevealIndex = norevealIndex;
return rs;
});
- ws.notifier.notify();
+ ws.notify({
+ type: NotificationType.RefreshMelted,
+ });
}
async function refreshReveal(
@@ -225,16 +231,6 @@ async function refreshReveal(
return;
}
- const exchange = oneShotGet(
- ws.db,
- Stores.exchanges,
- refreshSession.exchangeBaseUrl,
- );
- if (!exchange) {
- console.error(`exchange ${refreshSession.exchangeBaseUrl} not found`);
- return;
- }
-
const coins: CoinRecord[] = [];
for (let i = 0; i < respJson.ev_sigs.length; i++) {
@@ -271,32 +267,72 @@ async function refreshReveal(
coins.push(coin);
}
- refreshSession.finished = true;
-
await runWithWriteTransaction(
ws.db,
[Stores.coins, Stores.refresh],
async tx => {
const rs = await tx.get(Stores.refresh, refreshSessionId);
if (!rs) {
+ console.log("no refresh session found");
return;
}
- if (rs.finished) {
+ if (rs.finishedTimestamp) {
+ console.log("refresh session already finished");
return;
}
+ rs.finishedTimestamp = getTimestampNow();
+ rs.retryInfo = initRetryInfo(false);
for (let coin of coins) {
await tx.put(Stores.coins, coin);
}
- await tx.put(Stores.refresh, refreshSession);
+ await tx.put(Stores.refresh, rs);
},
);
- ws.notifier.notify();
+ console.log("refresh finished (end of reveal)");
+ ws.notify({
+ type: NotificationType.RefreshRevealed,
+ });
}
+async function incrementRefreshRetry(
+ ws: InternalWalletState,
+ refreshSessionId: string,
+ err: OperationError | undefined,
+): Promise<void> {
+ await runWithWriteTransaction(ws.db, [Stores.refresh], async tx => {
+ const r = await tx.get(Stores.refresh, refreshSessionId);
+ if (!r) {
+ return;
+ }
+ if (!r.retryInfo) {
+ return;
+ }
+ r.retryInfo.retryCounter++;
+ updateRetryInfoTimeout(r.retryInfo);
+ r.lastError = err;
+ await tx.put(Stores.refresh, r);
+ });
+}
+
+
export async function processRefreshSession(
ws: InternalWalletState,
refreshSessionId: string,
) {
+ return ws.memoProcessRefresh.memo(refreshSessionId, async () => {
+ const onOpErr = (e: OperationError) =>
+ incrementRefreshRetry(ws, refreshSessionId, e);
+ return guardOperationException(
+ () => processRefreshSessionImpl(ws, refreshSessionId),
+ onOpErr,
+ );
+ });
+}
+
+async function processRefreshSessionImpl(
+ ws: InternalWalletState,
+ refreshSessionId: string,
+) {
const refreshSession = await oneShotGet(
ws.db,
Stores.refresh,
@@ -305,7 +341,7 @@ export async function processRefreshSession(
if (!refreshSession) {
return;
}
- if (refreshSession.finished) {
+ if (refreshSession.finishedTimestamp) {
return;
}
if (typeof refreshSession.norevealIndex !== "number") {
@@ -376,7 +412,7 @@ export async function refresh(
x.status = CoinStatus.Dormant;
return x;
});
- ws.notifier.notify();
+ ws.notify( { type: NotificationType.RefreshRefused });
return;
}
@@ -388,29 +424,32 @@ export async function refresh(
oldDenom.feeRefresh,
);
- function mutateCoin(c: CoinRecord): CoinRecord {
- const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee);
- if (r.saturated) {
- // Something else must have written the coin value
- throw TransactionAbort;
- }
- c.currentAmount = r.amount;
- c.status = CoinStatus.Dormant;
- return c;
- }
-
// Store refresh session and subtract refreshed amount from
// coin in the same transaction.
await runWithWriteTransaction(
ws.db,
[Stores.refresh, Stores.coins],
async tx => {
+ const c = await tx.get(Stores.coins, coin.coinPub);
+ if (!c) {
+ return;
+ }
+ if (c.status !== CoinStatus.Dirty) {
+ return;
+ }
+ const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee);
+ if (r.saturated) {
+ console.log("can't refresh coin, no amount left");
+ return;
+ }
+ c.currentAmount = r.amount;
+ c.status = CoinStatus.Dormant;
await tx.put(Stores.refresh, refreshSession);
- await tx.mutate(Stores.coins, coin.coinPub, mutateCoin);
+ await tx.put(Stores.coins, c);
},
);
logger.info(`created refresh session ${refreshSession.refreshSessionId}`);
- ws.notifier.notify();
+ ws.notify( { type: NotificationType.RefreshStarted });
await processRefreshSession(ws, refreshSession.refreshSessionId);
}
diff --git a/src/wallet-impl/reserves.ts b/src/wallet-impl/reserves.ts
index d70f02576..f00956b46 100644
--- a/src/wallet-impl/reserves.ts
+++ b/src/wallet-impl/reserves.ts
@@ -20,6 +20,7 @@ import {
getTimestampNow,
ConfirmReserveRequest,
OperationError,
+ NotificationType,
} from "../walletTypes";
import { canonicalizeBaseUrl } from "../util/helpers";
import { InternalWalletState } from "./state";
@@ -29,6 +30,8 @@ import {
CurrencyRecord,
Stores,
WithdrawalSessionRecord,
+ initRetryInfo,
+ updateRetryInfoTimeout,
} from "../dbTypes";
import {
oneShotMutate,
@@ -42,13 +45,13 @@ import * as Amounts from "../util/amounts";
import { updateExchangeFromUrl, getExchangeTrust } from "./exchanges";
import { WithdrawOperationStatusResponse, ReserveStatus } from "../talerTypes";
import { assertUnreachable } from "../util/assertUnreachable";
-import { OperationFailedAndReportedError } from "../wallet";
import { encodeCrock } from "../crypto/talerCrypto";
import { randomBytes } from "../crypto/primitives/nacl-fast";
import {
getVerifiedWithdrawDenomList,
processWithdrawSession,
} from "./withdraw";
+import { guardOperationException, OperationFailedAndReportedError } from "./errors";
const logger = new Logger("reserves.ts");
@@ -91,7 +94,9 @@ export async function createReserve(
bankWithdrawStatusUrl: req.bankWithdrawStatusUrl,
exchangeWire: req.exchangeWire,
reserveStatus,
- lastStatusQuery: undefined,
+ lastSuccessfulStatusQuery: undefined,
+ retryInfo: initRetryInfo(),
+ lastError: undefined,
};
const senderWire = req.senderWire;
@@ -171,7 +176,7 @@ export async function createReserve(
// Asynchronously process the reserve, but return
// to the caller already.
- processReserve(ws, resp.reservePub).catch(e => {
+ processReserve(ws, resp.reservePub, true).catch(e => {
console.error("Processing reserve failed:", e);
});
@@ -188,18 +193,19 @@ export async function createReserve(
export async function processReserve(
ws: InternalWalletState,
reservePub: string,
+ forceNow: boolean = false,
): Promise<void> {
- const p = ws.memoProcessReserve.find(reservePub);
- if (p) {
- return p;
- } else {
- return ws.memoProcessReserve.put(
- reservePub,
- processReserveImpl(ws, reservePub),
+ return ws.memoProcessReserve.memo(reservePub, async () => {
+ const onOpError = (err: OperationError) =>
+ incrementReserveRetry(ws, reservePub, err);
+ await guardOperationException(
+ () => processReserveImpl(ws, reservePub, forceNow),
+ onOpError,
);
- }
+ });
}
+
async function registerReserveWithBank(
ws: InternalWalletState,
reservePub: string,
@@ -235,6 +241,7 @@ async function registerReserveWithBank(
}
r.timestampReserveInfoPosted = getTimestampNow();
r.reserveStatus = ReserveRecordStatus.WAIT_CONFIRM_BANK;
+ r.retryInfo = initRetryInfo();
return r;
});
return processReserveBankStatus(ws, reservePub);
@@ -244,6 +251,18 @@ export async function processReserveBankStatus(
ws: InternalWalletState,
reservePub: string,
): Promise<void> {
+ const onOpError = (err: OperationError) =>
+ incrementReserveRetry(ws, reservePub, err);
+ await guardOperationException(
+ () => processReserveBankStatusImpl(ws, reservePub),
+ onOpError,
+ );
+}
+
+async function processReserveBankStatusImpl(
+ ws: InternalWalletState,
+ reservePub: string,
+): Promise<void> {
let reserve = await oneShotGet(ws.db, Stores.reserves, reservePub);
switch (reserve?.reserveStatus) {
case ReserveRecordStatus.WAIT_CONFIRM_BANK:
@@ -287,9 +306,10 @@ export async function processReserveBankStatus(
const now = getTimestampNow();
r.timestampConfirmed = now;
r.reserveStatus = ReserveRecordStatus.QUERYING_STATUS;
+ r.retryInfo = initRetryInfo();
return r;
});
- await processReserveImpl(ws, reservePub);
+ await processReserveImpl(ws, reservePub, true);
} else {
await oneShotMutate(ws.db, Stores.reserves, reservePub, r => {
switch (r.reserveStatus) {
@@ -304,16 +324,24 @@ export async function processReserveBankStatus(
}
}
-async function setReserveError(
+async function incrementReserveRetry(
ws: InternalWalletState,
reservePub: string,
- err: OperationError,
+ err: OperationError | undefined,
): Promise<void> {
- const mut = (reserve: ReserveRecord) => {
- reserve.lastError = err;
- return reserve;
- };
- await oneShotMutate(ws.db, Stores.reserves, reservePub, mut);
+ await runWithWriteTransaction(ws.db, [Stores.reserves], async tx => {
+ const r = await tx.get(Stores.reserves, reservePub);
+ if (!r) {
+ return;
+ }
+ if (!r.retryInfo) {
+ return;
+ }
+ r.retryInfo.retryCounter++;
+ updateRetryInfoTimeout(r.retryInfo);
+ r.lastError = err;
+ await tx.put(Stores.reserves, r);
+ });
}
/**
@@ -345,15 +373,11 @@ async function updateReserve(
} catch (e) {
if (e.response?.status === 404) {
const m = "The exchange does not know about this reserve (yet).";
- await setReserveError(ws, reservePub, {
- type: "waiting",
- details: {},
- message: "The exchange does not know about this reserve (yet).",
- });
- throw new OperationFailedAndReportedError(m);
+ await incrementReserveRetry(ws, reservePub, undefined);
+ return;
} else {
const m = e.message;
- await setReserveError(ws, reservePub, {
+ await incrementReserveRetry(ws, reservePub, {
type: "network",
details: {},
message: m,
@@ -369,7 +393,7 @@ async function updateReserve(
}
// FIXME: check / compare history!
- if (!r.lastStatusQuery) {
+ if (!r.lastSuccessfulStatusQuery) {
// FIXME: check if this matches initial expectations
r.withdrawRemainingAmount = balance;
} else {
@@ -392,22 +416,31 @@ async function updateReserve(
// We're missing some money.
}
}
- r.lastStatusQuery = getTimestampNow();
+ r.lastSuccessfulStatusQuery = getTimestampNow();
r.reserveStatus = ReserveRecordStatus.WITHDRAWING;
+ r.retryInfo = initRetryInfo();
return r;
});
- ws.notifier.notify();
+ ws.notify( { type: NotificationType.ReserveUpdated });
}
async function processReserveImpl(
ws: InternalWalletState,
reservePub: string,
+ forceNow: boolean = false,
): Promise<void> {
const reserve = await oneShotGet(ws.db, Stores.reserves, reservePub);
if (!reserve) {
console.log("not processing reserve: reserve does not exist");
return;
}
+ if (!forceNow) {
+ const now = getTimestampNow();
+ if (reserve.retryInfo.nextRetry.t_ms > now.t_ms) {
+ logger.trace("processReserve retry not due yet");
+ return;
+ }
+ }
logger.trace(
`Processing reserve ${reservePub} with status ${reserve.reserveStatus}`,
);
@@ -417,10 +450,10 @@ async function processReserveImpl(
break;
case ReserveRecordStatus.REGISTERING_BANK:
await processReserveBankStatus(ws, reservePub);
- return processReserveImpl(ws, reservePub);
+ return processReserveImpl(ws, reservePub, true);
case ReserveRecordStatus.QUERYING_STATUS:
await updateReserve(ws, reservePub);
- return processReserveImpl(ws, reservePub);
+ return processReserveImpl(ws, reservePub, true);
case ReserveRecordStatus.WITHDRAWING:
await depleteReserve(ws, reservePub);
break;
@@ -448,12 +481,13 @@ export async function confirmReserve(
}
reserve.timestampConfirmed = now;
reserve.reserveStatus = ReserveRecordStatus.QUERYING_STATUS;
+ reserve.retryInfo = initRetryInfo();
return reserve;
});
- ws.notifier.notify();
+ ws.notify({ type: NotificationType.ReserveUpdated });
- processReserve(ws, req.reservePub).catch(e => {
+ processReserve(ws, req.reservePub, true).catch(e => {
console.log("processing reserve failed:", e);
});
}
@@ -489,7 +523,7 @@ async function depleteReserve(
logger.trace(`got denom list`);
if (denomsForWithdraw.length === 0) {
const m = `Unable to withdraw from reserve, no denominations are available to withdraw.`;
- await setReserveError(ws, reserve.reservePub, {
+ await incrementReserveRetry(ws, reserve.reservePub, {
type: "internal",
message: m,
details: {},
@@ -502,7 +536,8 @@ async function depleteReserve(
const withdrawalSessionId = encodeCrock(randomBytes(32));
- const totalCoinValue = Amounts.sum(denomsForWithdraw.map(x => x.value)).amount;
+ const totalCoinValue = Amounts.sum(denomsForWithdraw.map(x => x.value))
+ .amount;
const withdrawalRecord: WithdrawalSessionRecord = {
withdrawSessionId: withdrawalSessionId,
@@ -517,6 +552,9 @@ async function depleteReserve(
withdrawn: denomsForWithdraw.map(x => false),
planchets: denomsForWithdraw.map(x => undefined),
totalCoinValue,
+ retryInfo: initRetryInfo(),
+ lastCoinErrors: denomsForWithdraw.map(x => undefined),
+ lastError: undefined,
};
const totalCoinWithdrawFee = Amounts.sum(
@@ -545,7 +583,7 @@ async function depleteReserve(
r.withdrawRemainingAmount = remaining.amount;
r.withdrawAllocatedAmount = allocated.amount;
r.reserveStatus = ReserveRecordStatus.DORMANT;
-
+ r.retryInfo = initRetryInfo(false);
return r;
}
diff --git a/src/wallet-impl/return.ts b/src/wallet-impl/return.ts
index 9cf12052d..ec19c00ae 100644
--- a/src/wallet-impl/return.ts
+++ b/src/wallet-impl/return.ts
@@ -204,8 +204,6 @@ export async function returnCoins(
}
},
);
- ws.badge.showNotification();
- ws.notifier.notify();
depositReturnedCoins(ws, coinsReturnRecord);
}
@@ -269,6 +267,5 @@ async function depositReturnedCoins(
}
}
await oneShotPut(ws.db, Stores.coinsReturns, currentCrr);
- ws.notifier.notify();
}
}
diff --git a/src/wallet-impl/state.ts b/src/wallet-impl/state.ts
index a04a7dd1c..18df861f1 100644
--- a/src/wallet-impl/state.ts
+++ b/src/wallet-impl/state.ts
@@ -15,19 +15,54 @@
*/
import { HttpRequestLibrary } from "../util/http";
-import { Badge, Notifier, NextUrlResult } from "../walletTypes";
+import {
+ NextUrlResult,
+ WalletBalance,
+ PendingOperationsResponse,
+ WalletNotification,
+} from "../walletTypes";
import { SpeculativePayData } from "./pay";
-import { CryptoApi } from "../crypto/cryptoApi";
-import { AsyncOpMemo } from "../util/asyncMemo";
-
-export interface InternalWalletState {
- db: IDBDatabase;
- http: HttpRequestLibrary;
- badge: Badge;
- notifier: Notifier;
+import { CryptoApi, CryptoWorkerFactory } from "../crypto/workers/cryptoApi";
+import { AsyncOpMemoMap, AsyncOpMemoSingle } from "../util/asyncMemo";
+import { Logger } from "../util/logging";
+
+type NotificationListener = (n: WalletNotification) => void;
+
+const logger = new Logger("state.ts");
+
+export class InternalWalletState {
+ speculativePayData: SpeculativePayData | undefined = undefined;
+ cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {};
+ memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
+ memoMakePlanchet: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
+ memoGetPending: AsyncOpMemoSingle<
+ PendingOperationsResponse
+ > = new AsyncOpMemoSingle();
+ memoGetBalance: AsyncOpMemoSingle<WalletBalance> = new AsyncOpMemoSingle();
+ memoProcessRefresh: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
cryptoApi: CryptoApi;
- speculativePayData: SpeculativePayData | undefined;
- cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult };
- memoProcessReserve: AsyncOpMemo<void>;
- memoMakePlanchet: AsyncOpMemo<void>;
-} \ No newline at end of file
+
+ listeners: NotificationListener[] = [];
+
+ constructor(
+ public db: IDBDatabase,
+ public http: HttpRequestLibrary,
+ cryptoWorkerFactory: CryptoWorkerFactory,
+ ) {
+ this.cryptoApi = new CryptoApi(cryptoWorkerFactory);
+ }
+
+ public notify(n: WalletNotification) {
+ logger.trace("Notification", n);
+ for (const l of this.listeners) {
+ const nc = JSON.parse(JSON.stringify(n));
+ setImmediate(() => {
+ l(nc);
+ });
+ }
+ }
+
+ addNotificationListener(f: (n: WalletNotification) => void): void {
+ this.listeners.push(f);
+ }
+}
diff --git a/src/wallet-impl/tip.ts b/src/wallet-impl/tip.ts
index 593f0d612..3ae931d45 100644
--- a/src/wallet-impl/tip.ts
+++ b/src/wallet-impl/tip.ts
@@ -18,14 +18,15 @@
import { oneShotGet, oneShotPut, oneShotMutate, runWithWriteTransaction } from "../util/query";
import { InternalWalletState } from "./state";
import { parseTipUri } from "../util/taleruri";
-import { TipStatus, getTimestampNow } from "../walletTypes";
+import { TipStatus, getTimestampNow, OperationError } from "../walletTypes";
import { TipPickupGetResponse, TipPlanchetDetail, TipResponse } from "../talerTypes";
import * as Amounts from "../util/amounts";
-import { Stores, PlanchetRecord, WithdrawalSessionRecord } from "../dbTypes";
+import { Stores, PlanchetRecord, WithdrawalSessionRecord, initRetryInfo, updateRetryInfoTimeout } from "../dbTypes";
import { getWithdrawDetailsForAmount, getVerifiedWithdrawDenomList, processWithdrawSession } from "./withdraw";
import { getTalerStampSec } from "../util/helpers";
import { updateExchangeFromUrl } from "./exchanges";
import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto";
+import { guardOperationException } from "./errors";
export async function getTipStatus(
@@ -74,12 +75,14 @@ export async function getTipStatus(
pickedUp: false,
planchets: undefined,
response: undefined,
- timestamp: getTimestampNow(),
+ createdTimestamp: getTimestampNow(),
merchantTipId: res.merchantTipId,
totalFees: Amounts.add(
withdrawDetails.overhead,
withdrawDetails.withdrawFee,
).amount,
+ retryInfo: initRetryInfo(),
+ lastError: undefined,
};
await oneShotPut(ws.db, Stores.tips, tipRecord);
}
@@ -101,9 +104,37 @@ export async function getTipStatus(
return tipStatus;
}
+async function incrementTipRetry(
+ ws: InternalWalletState,
+ refreshSessionId: string,
+ err: OperationError | undefined,
+): Promise<void> {
+ await runWithWriteTransaction(ws.db, [Stores.tips], async tx => {
+ const t = await tx.get(Stores.tips, refreshSessionId);
+ if (!t) {
+ return;
+ }
+ if (!t.retryInfo) {
+ return;
+ }
+ t.retryInfo.retryCounter++;
+ updateRetryInfoTimeout(t.retryInfo);
+ t.lastError = err;
+ await tx.put(Stores.tips, t);
+ });
+}
+
export async function processTip(
ws: InternalWalletState,
tipId: string,
+): Promise<void> {
+ const onOpErr = (e: OperationError) => incrementTipRetry(ws, tipId, e);
+ await guardOperationException(() => processTipImpl(ws, tipId), onOpErr);
+}
+
+async function processTipImpl(
+ ws: InternalWalletState,
+ tipId: string,
) {
let tipRecord = await oneShotGet(ws.db, Stores.tips, tipId);
if (!tipRecord) {
@@ -205,6 +236,10 @@ export async function processTip(
rawWithdrawalAmount: tipRecord.amount,
withdrawn: planchets.map((x) => false),
totalCoinValue: Amounts.sum(planchets.map((p) => p.coinValue)).amount,
+ lastCoinErrors: planchets.map((x) => undefined),
+ retryInfo: initRetryInfo(),
+ finishTimestamp: undefined,
+ lastError: undefined,
};
@@ -217,6 +252,7 @@ export async function processTip(
return;
}
tr.pickedUp = true;
+ tr.retryInfo = initRetryInfo(false);
await tx.put(Stores.tips, tr);
await tx.put(Stores.withdrawalSession, withdrawalSession);
@@ -224,8 +260,6 @@ export async function processTip(
await processWithdrawSession(ws, withdrawalSessionId);
- ws.notifier.notify();
- ws.badge.showNotification();
return;
}
diff --git a/src/wallet-impl/withdraw.ts b/src/wallet-impl/withdraw.ts
index d02ae14aa..7b7d0f640 100644
--- a/src/wallet-impl/withdraw.ts
+++ b/src/wallet-impl/withdraw.ts
@@ -22,6 +22,8 @@ import {
CoinStatus,
CoinRecord,
PlanchetRecord,
+ initRetryInfo,
+ updateRetryInfoTimeout,
} from "../dbTypes";
import * as Amounts from "../util/amounts";
import {
@@ -30,6 +32,8 @@ import {
DownloadedWithdrawInfo,
ReserveCreationInfo,
WithdrawDetails,
+ OperationError,
+ NotificationType,
} from "../walletTypes";
import { WithdrawOperationStatusResponse } from "../talerTypes";
import { InternalWalletState } from "./state";
@@ -51,6 +55,7 @@ import { createReserve, processReserveBankStatus } from "./reserves";
import { WALLET_PROTOCOL_VERSION } from "../wallet";
import * as LibtoolVersion from "../util/libtoolVersion";
+import { guardOperationException } from "./errors";
const logger = new Logger("withdraw.ts");
@@ -143,12 +148,9 @@ export async function acceptWithdrawal(
senderWire: withdrawInfo.senderWire,
exchangeWire: exchangeWire,
});
- ws.badge.showNotification();
- ws.notifier.notify();
// We do this here, as the reserve should be registered before we return,
// so that we can redirect the user to the bank's status page.
await processReserveBankStatus(ws, reserve.reservePub);
- ws.notifier.notify();
console.log("acceptWithdrawal: returning");
return {
reservePub: reserve.reservePub,
@@ -234,6 +236,12 @@ async function processPlanchet(
planchet.denomPub,
);
+
+ const isValid = await ws.cryptoApi.rsaVerify(planchet.coinPub, denomSig, planchet.denomPub);
+ if (!isValid) {
+ throw Error("invalid RSA signature by the exchange");
+ }
+
const coin: CoinRecord = {
blindingKey: planchet.blindingKey,
coinPriv: planchet.coinPriv,
@@ -249,6 +257,9 @@ async function processPlanchet(
withdrawSessionId: withdrawalSessionId,
};
+ let withdrawSessionFinished = false;
+ let reserveDepleted = false;
+
await runWithWriteTransaction(
ws.db,
[Stores.coins, Stores.withdrawalSession, Stores.reserves],
@@ -262,6 +273,18 @@ async function processPlanchet(
return;
}
ws.withdrawn[coinIdx] = true;
+ ws.lastCoinErrors[coinIdx] = undefined;
+ let numDone = 0;
+ for (let i = 0; i < ws.withdrawn.length; i++) {
+ if (ws.withdrawn[i]) {
+ numDone++;
+ }
+ }
+ if (numDone === ws.denoms.length) {
+ ws.finishTimestamp = getTimestampNow();
+ ws.retryInfo = initRetryInfo(false);
+ withdrawSessionFinished = true;
+ }
await tx.put(Stores.withdrawalSession, ws);
if (!planchet.isFromTip) {
const r = await tx.get(Stores.reserves, planchet.reservePub);
@@ -270,14 +293,29 @@ async function processPlanchet(
r.withdrawCompletedAmount,
Amounts.add(denom.value, denom.feeWithdraw).amount,
).amount;
+ if (Amounts.cmp(r.withdrawCompletedAmount, r.withdrawAllocatedAmount) == 0) {
+ reserveDepleted = true;
+ }
await tx.put(Stores.reserves, r);
}
}
await tx.add(Stores.coins, coin);
},
);
- ws.notifier.notify();
- logger.trace(`withdraw of one coin ${coin.coinPub} finished`);
+
+ if (withdrawSessionFinished) {
+ ws.notify({
+ type: NotificationType.WithdrawSessionFinished,
+ withdrawSessionId: withdrawalSessionId,
+ });
+ }
+
+ if (reserveDepleted && withdrawalSession.source.type === "reserve") {
+ ws.notify({
+ type: NotificationType.ReserveDepleted,
+ reservePub: withdrawalSession.source.reservePub,
+ });
+ }
}
/**
@@ -437,28 +475,51 @@ async function processWithdrawCoin(
}
if (!withdrawalSession.planchets[coinIndex]) {
- logger.trace("creating planchet for coin", coinIndex);
const key = `${withdrawalSessionId}-${coinIndex}`;
- const p = ws.memoMakePlanchet.find(key);
- if (p) {
- await p;
- } else {
- ws.memoMakePlanchet.put(
- key,
- makePlanchet(ws, withdrawalSessionId, coinIndex),
- );
- }
- await makePlanchet(ws, withdrawalSessionId, coinIndex);
- logger.trace("done creating planchet for coin", coinIndex);
+ await ws.memoMakePlanchet.memo(key, async () => {
+ logger.trace("creating planchet for coin", coinIndex);
+ return makePlanchet(ws, withdrawalSessionId, coinIndex);
+ });
}
await processPlanchet(ws, withdrawalSessionId, coinIndex);
- logger.trace("starting withdraw for coin", coinIndex);
+}
+
+async function incrementWithdrawalRetry(
+ ws: InternalWalletState,
+ withdrawalSessionId: string,
+ err: OperationError | undefined,
+): Promise<void> {
+ await runWithWriteTransaction(ws.db, [Stores.withdrawalSession], async tx => {
+ const wsr = await tx.get(Stores.withdrawalSession, withdrawalSessionId);
+ if (!wsr) {
+ return;
+ }
+ if (!wsr.retryInfo) {
+ return;
+ }
+ wsr.retryInfo.retryCounter++;
+ updateRetryInfoTimeout(wsr.retryInfo);
+ wsr.lastError = err;
+ await tx.put(Stores.withdrawalSession, wsr);
+ });
}
export async function processWithdrawSession(
ws: InternalWalletState,
withdrawalSessionId: string,
): Promise<void> {
+ const onOpErr = (e: OperationError) =>
+ incrementWithdrawalRetry(ws, withdrawalSessionId, e);
+ await guardOperationException(
+ () => processWithdrawSessionImpl(ws, withdrawalSessionId),
+ onOpErr,
+ );
+}
+
+export async function processWithdrawSessionImpl(
+ ws: InternalWalletState,
+ withdrawalSessionId: string,
+): Promise<void> {
logger.trace("processing withdraw session", withdrawalSessionId);
const withdrawalSession = await oneShotGet(
ws.db,
@@ -474,7 +535,6 @@ export async function processWithdrawSession(
processWithdrawCoin(ws, withdrawalSessionId, i),
);
await Promise.all(ps);
- ws.badge.showNotification();
return;
}