aboutsummaryrefslogtreecommitdiff
path: root/src/wallet-impl/pending.ts
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2019-12-05 19:38:19 +0100
committerFlorian Dold <florian.dold@gmail.com>2019-12-05 19:38:19 +0100
commitf67d7f54f9d0fed97446898942e3dfee67ee2985 (patch)
tree2b81738025e8f61250ede10908cbf81071e16975 /src/wallet-impl/pending.ts
parent829acdd3d98f1014747f15ecb619b6fbaa06b640 (diff)
downloadwallet-core-f67d7f54f9d0fed97446898942e3dfee67ee2985.tar.xz
threads, retries and notifications WIP
Diffstat (limited to 'src/wallet-impl/pending.ts')
-rw-r--r--src/wallet-impl/pending.ts561
1 files changed, 359 insertions, 202 deletions
diff --git a/src/wallet-impl/pending.ts b/src/wallet-impl/pending.ts
index 72102e3a1..bd10538af 100644
--- a/src/wallet-impl/pending.ts
+++ b/src/wallet-impl/pending.ts
@@ -21,8 +21,10 @@ import {
PendingOperationInfo,
PendingOperationsResponse,
getTimestampNow,
+ Timestamp,
+ Duration,
} from "../walletTypes";
-import { runWithReadTransaction } from "../util/query";
+import { runWithReadTransaction, TransactionHandle } from "../util/query";
import { InternalWalletState } from "./state";
import {
Stores,
@@ -32,11 +34,355 @@ import {
ProposalStatus,
} from "../dbTypes";
+function updateRetryDelay(
+ oldDelay: Duration,
+ now: Timestamp,
+ retryTimestamp: Timestamp,
+): Duration {
+ if (retryTimestamp.t_ms <= now.t_ms) {
+ return { d_ms: 0 };
+ }
+ return { d_ms: Math.min(oldDelay.d_ms, retryTimestamp.t_ms - now.t_ms) };
+}
+
+async function gatherExchangePending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ if (onlyDue) {
+ // FIXME: exchanges should also be updated regularly
+ return;
+ }
+ await tx.iter(Stores.exchanges).forEach(e => {
+ switch (e.updateStatus) {
+ case ExchangeUpdateStatus.FINISHED:
+ if (e.lastError) {
+ resp.pendingOperations.push({
+ type: "bug",
+ givesLifeness: false,
+ message:
+ "Exchange record is in FINISHED state but has lastError set",
+ details: {
+ exchangeBaseUrl: e.baseUrl,
+ },
+ });
+ }
+ if (!e.details) {
+ resp.pendingOperations.push({
+ type: "bug",
+ givesLifeness: false,
+ message:
+ "Exchange record does not have details, but no update in progress.",
+ details: {
+ exchangeBaseUrl: e.baseUrl,
+ },
+ });
+ }
+ if (!e.wireInfo) {
+ resp.pendingOperations.push({
+ type: "bug",
+ givesLifeness: false,
+ message:
+ "Exchange record does not have wire info, but no update in progress.",
+ details: {
+ exchangeBaseUrl: e.baseUrl,
+ },
+ });
+ }
+ break;
+ case ExchangeUpdateStatus.FETCH_KEYS:
+ resp.pendingOperations.push({
+ type: "exchange-update",
+ givesLifeness: false,
+ stage: "fetch-keys",
+ exchangeBaseUrl: e.baseUrl,
+ lastError: e.lastError,
+ reason: e.updateReason || "unknown",
+ });
+ break;
+ case ExchangeUpdateStatus.FETCH_WIRE:
+ resp.pendingOperations.push({
+ type: "exchange-update",
+ givesLifeness: false,
+ stage: "fetch-wire",
+ exchangeBaseUrl: e.baseUrl,
+ lastError: e.lastError,
+ reason: e.updateReason || "unknown",
+ });
+ break;
+ default:
+ resp.pendingOperations.push({
+ type: "bug",
+ givesLifeness: false,
+ message: "Unknown exchangeUpdateStatus",
+ details: {
+ exchangeBaseUrl: e.baseUrl,
+ exchangeUpdateStatus: e.updateStatus,
+ },
+ });
+ break;
+ }
+ });
+}
+
+async function gatherReservePending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ // FIXME: this should be optimized by using an index for "onlyDue==true".
+ await tx.iter(Stores.reserves).forEach(reserve => {
+ const reserveType = reserve.bankWithdrawStatusUrl ? "taler-bank" : "manual";
+ if (!reserve.retryInfo.active) {
+ return;
+ }
+ resp.nextRetryDelay = updateRetryDelay(
+ resp.nextRetryDelay,
+ now,
+ reserve.retryInfo.nextRetry,
+ );
+ if (onlyDue && reserve.retryInfo.nextRetry.t_ms > now.t_ms) {
+ return;
+ }
+ switch (reserve.reserveStatus) {
+ case ReserveRecordStatus.DORMANT:
+ // nothing to report as pending
+ break;
+ case ReserveRecordStatus.WITHDRAWING:
+ case ReserveRecordStatus.UNCONFIRMED:
+ case ReserveRecordStatus.QUERYING_STATUS:
+ case ReserveRecordStatus.REGISTERING_BANK:
+ resp.pendingOperations.push({
+ type: "reserve",
+ givesLifeness: true,
+ stage: reserve.reserveStatus,
+ timestampCreated: reserve.created,
+ reserveType,
+ reservePub: reserve.reservePub,
+ retryInfo: reserve.retryInfo,
+ });
+ break;
+ case ReserveRecordStatus.WAIT_CONFIRM_BANK:
+ resp.pendingOperations.push({
+ type: "reserve",
+ givesLifeness: true,
+ stage: reserve.reserveStatus,
+ timestampCreated: reserve.created,
+ reserveType,
+ reservePub: reserve.reservePub,
+ bankWithdrawConfirmUrl: reserve.bankWithdrawConfirmUrl,
+ retryInfo: reserve.retryInfo,
+ });
+ break;
+ default:
+ resp.pendingOperations.push({
+ type: "bug",
+ givesLifeness: false,
+ message: "Unknown reserve record status",
+ details: {
+ reservePub: reserve.reservePub,
+ reserveStatus: reserve.reserveStatus,
+ },
+ });
+ break;
+ }
+ });
+}
+
+async function gatherRefreshPending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ await tx.iter(Stores.refresh).forEach(r => {
+ if (r.finishedTimestamp) {
+ return;
+ }
+ resp.nextRetryDelay = updateRetryDelay(
+ resp.nextRetryDelay,
+ now,
+ r.retryInfo.nextRetry,
+ );
+ if (onlyDue && r.retryInfo.nextRetry.t_ms > now.t_ms) {
+ return;
+ }
+ let refreshStatus: string;
+ if (r.norevealIndex === undefined) {
+ refreshStatus = "melt";
+ } else {
+ refreshStatus = "reveal";
+ }
+
+ resp.pendingOperations.push({
+ type: "refresh",
+ givesLifeness: true,
+ oldCoinPub: r.meltCoinPub,
+ refreshStatus,
+ refreshOutputSize: r.newDenoms.length,
+ refreshSessionId: r.refreshSessionId,
+ });
+ });
+}
+
+async function gatherCoinsPending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ // Refreshing dirty coins is always due.
+ await tx.iter(Stores.coins).forEach(coin => {
+ if (coin.status == CoinStatus.Dirty) {
+ resp.nextRetryDelay.d_ms = 0;
+ resp.pendingOperations.push({
+ givesLifeness: true,
+ type: "dirty-coin",
+ coinPub: coin.coinPub,
+ });
+ }
+ });
+}
+
+async function gatherWithdrawalPending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ await tx.iter(Stores.withdrawalSession).forEach(wsr => {
+ if (wsr.finishTimestamp) {
+ return;
+ }
+ resp.nextRetryDelay = updateRetryDelay(
+ resp.nextRetryDelay,
+ now,
+ wsr.retryInfo.nextRetry,
+ );
+ if (onlyDue && wsr.retryInfo.nextRetry.t_ms > now.t_ms) {
+ return;
+ }
+ const numCoinsWithdrawn = wsr.withdrawn.reduce((a, x) => a + (x ? 1 : 0), 0);
+ const numCoinsTotal = wsr.withdrawn.length;
+ resp.pendingOperations.push({
+ type: "withdraw",
+ givesLifeness: true,
+ numCoinsTotal,
+ numCoinsWithdrawn,
+ source: wsr.source,
+ withdrawSessionId: wsr.withdrawSessionId,
+ });
+ });
+}
+
+async function gatherProposalPending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ await tx.iter(Stores.proposals).forEach(proposal => {
+ if (proposal.proposalStatus == ProposalStatus.PROPOSED) {
+ if (onlyDue) {
+ return;
+ }
+ resp.pendingOperations.push({
+ type: "proposal-choice",
+ givesLifeness: false,
+ merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url,
+ proposalId: proposal.proposalId,
+ proposalTimestamp: proposal.timestamp,
+ });
+ } else if (proposal.proposalStatus == ProposalStatus.DOWNLOADING) {
+ resp.nextRetryDelay = updateRetryDelay(
+ resp.nextRetryDelay,
+ now,
+ proposal.retryInfo.nextRetry,
+ );
+ if (onlyDue && proposal.retryInfo.nextRetry.t_ms > now.t_ms) {
+ return;
+ }
+ resp.pendingOperations.push({
+ type: "proposal-download",
+ givesLifeness: true,
+ merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url,
+ proposalId: proposal.proposalId,
+ proposalTimestamp: proposal.timestamp,
+ });
+ }
+ });
+}
+
+async function gatherTipPending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ await tx.iter(Stores.tips).forEach(tip => {
+ if (tip.pickedUp) {
+ return;
+ }
+ resp.nextRetryDelay = updateRetryDelay(
+ resp.nextRetryDelay,
+ now,
+ tip.retryInfo.nextRetry,
+ );
+ if (onlyDue && tip.retryInfo.nextRetry.t_ms > now.t_ms) {
+ return;
+ }
+ if (tip.accepted) {
+ resp.pendingOperations.push({
+ type: "tip",
+ givesLifeness: true,
+ merchantBaseUrl: tip.merchantBaseUrl,
+ tipId: tip.tipId,
+ merchantTipId: tip.merchantTipId,
+ });
+ }
+ });
+}
+
+async function gatherPurchasePending(
+ tx: TransactionHandle,
+ now: Timestamp,
+ resp: PendingOperationsResponse,
+ onlyDue: boolean = false,
+): Promise<void> {
+ await tx.iter(Stores.purchases).forEach((pr) => {
+ if (pr.finished) {
+ return;
+ }
+ resp.nextRetryDelay = updateRetryDelay(
+ resp.nextRetryDelay,
+ now,
+ pr.retryInfo.nextRetry,
+ );
+ if (onlyDue && pr.retryInfo.nextRetry.t_ms > now.t_ms) {
+ return;
+ }
+ resp.pendingOperations.push({
+ type: "pay",
+ givesLifeness: true,
+ isReplay: false,
+ proposalId: pr.proposalId,
+ });
+ });
+
+}
+
export async function getPendingOperations(
ws: InternalWalletState,
+ onlyDue: boolean = false,
): Promise<PendingOperationsResponse> {
- const pendingOperations: PendingOperationInfo[] = [];
- let minRetryDurationMs = 5000;
+ const resp: PendingOperationsResponse = {
+ nextRetryDelay: { d_ms: Number.MAX_SAFE_INTEGER },
+ pendingOperations: [],
+ };
+ const now = getTimestampNow();
await runWithReadTransaction(
ws.db,
[
@@ -47,207 +393,18 @@ export async function getPendingOperations(
Stores.withdrawalSession,
Stores.proposals,
Stores.tips,
+ Stores.purchases,
],
async tx => {
- await tx.iter(Stores.exchanges).forEach(e => {
- switch (e.updateStatus) {
- case ExchangeUpdateStatus.FINISHED:
- if (e.lastError) {
- pendingOperations.push({
- type: "bug",
- message:
- "Exchange record is in FINISHED state but has lastError set",
- details: {
- exchangeBaseUrl: e.baseUrl,
- },
- });
- }
- if (!e.details) {
- pendingOperations.push({
- type: "bug",
- message:
- "Exchange record does not have details, but no update in progress.",
- details: {
- exchangeBaseUrl: e.baseUrl,
- },
- });
- }
- if (!e.wireInfo) {
- pendingOperations.push({
- type: "bug",
- message:
- "Exchange record does not have wire info, but no update in progress.",
- details: {
- exchangeBaseUrl: e.baseUrl,
- },
- });
- }
- break;
- case ExchangeUpdateStatus.FETCH_KEYS:
- pendingOperations.push({
- type: "exchange-update",
- stage: "fetch-keys",
- exchangeBaseUrl: e.baseUrl,
- lastError: e.lastError,
- reason: e.updateReason || "unknown",
- });
- break;
- case ExchangeUpdateStatus.FETCH_WIRE:
- pendingOperations.push({
- type: "exchange-update",
- stage: "fetch-wire",
- exchangeBaseUrl: e.baseUrl,
- lastError: e.lastError,
- reason: e.updateReason || "unknown",
- });
- break;
- default:
- pendingOperations.push({
- type: "bug",
- message: "Unknown exchangeUpdateStatus",
- details: {
- exchangeBaseUrl: e.baseUrl,
- exchangeUpdateStatus: e.updateStatus,
- },
- });
- break;
- }
- });
- await tx.iter(Stores.reserves).forEach(reserve => {
- const reserveType = reserve.bankWithdrawStatusUrl
- ? "taler-bank"
- : "manual";
- const now = getTimestampNow();
- switch (reserve.reserveStatus) {
- case ReserveRecordStatus.DORMANT:
- // nothing to report as pending
- break;
- case ReserveRecordStatus.WITHDRAWING:
- case ReserveRecordStatus.UNCONFIRMED:
- case ReserveRecordStatus.QUERYING_STATUS:
- case ReserveRecordStatus.REGISTERING_BANK:
- pendingOperations.push({
- type: "reserve",
- stage: reserve.reserveStatus,
- timestampCreated: reserve.created,
- reserveType,
- reservePub: reserve.reservePub,
- });
- if (reserve.created.t_ms < now.t_ms - 5000) {
- minRetryDurationMs = 500;
- } else if (reserve.created.t_ms < now.t_ms - 30000) {
- minRetryDurationMs = 2000;
- }
- break;
- case ReserveRecordStatus.WAIT_CONFIRM_BANK:
- pendingOperations.push({
- type: "reserve",
- stage: reserve.reserveStatus,
- timestampCreated: reserve.created,
- reserveType,
- reservePub: reserve.reservePub,
- bankWithdrawConfirmUrl: reserve.bankWithdrawConfirmUrl,
- });
- if (reserve.created.t_ms < now.t_ms - 5000) {
- minRetryDurationMs = 500;
- } else if (reserve.created.t_ms < now.t_ms - 30000) {
- minRetryDurationMs = 2000;
- }
- break;
- default:
- pendingOperations.push({
- type: "bug",
- message: "Unknown reserve record status",
- details: {
- reservePub: reserve.reservePub,
- reserveStatus: reserve.reserveStatus,
- },
- });
- break;
- }
- });
-
- await tx.iter(Stores.refresh).forEach(r => {
- if (r.finished) {
- return;
- }
- let refreshStatus: string;
- if (r.norevealIndex === undefined) {
- refreshStatus = "melt";
- } else {
- refreshStatus = "reveal";
- }
-
- pendingOperations.push({
- type: "refresh",
- oldCoinPub: r.meltCoinPub,
- refreshStatus,
- refreshOutputSize: r.newDenoms.length,
- refreshSessionId: r.refreshSessionId,
- });
- });
-
- await tx.iter(Stores.coins).forEach(coin => {
- if (coin.status == CoinStatus.Dirty) {
- pendingOperations.push({
- type: "dirty-coin",
- coinPub: coin.coinPub,
- });
- }
- });
-
- await tx.iter(Stores.withdrawalSession).forEach(ws => {
- const numCoinsWithdrawn = ws.withdrawn.reduce(
- (a, x) => a + (x ? 1 : 0),
- 0,
- );
- const numCoinsTotal = ws.withdrawn.length;
- if (numCoinsWithdrawn < numCoinsTotal) {
- pendingOperations.push({
- type: "withdraw",
- numCoinsTotal,
- numCoinsWithdrawn,
- source: ws.source,
- withdrawSessionId: ws.withdrawSessionId,
- });
- }
- });
-
- await tx.iter(Stores.proposals).forEach((proposal) => {
- if (proposal.proposalStatus == ProposalStatus.PROPOSED) {
- pendingOperations.push({
- type: "proposal-choice",
- merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url,
- proposalId: proposal.proposalId,
- proposalTimestamp: proposal.timestamp,
- });
- } else if (proposal.proposalStatus == ProposalStatus.DOWNLOADING) {
- pendingOperations.push({
- type: "proposal-download",
- merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url,
- proposalId: proposal.proposalId,
- proposalTimestamp: proposal.timestamp,
- });
- }
- });
-
- await tx.iter(Stores.tips).forEach((tip) => {
- if (tip.accepted && !tip.pickedUp) {
- pendingOperations.push({
- type: "tip",
- merchantBaseUrl: tip.merchantBaseUrl,
- tipId: tip.tipId,
- merchantTipId: tip.merchantTipId,
- });
- }
- });
+ await gatherExchangePending(tx, now, resp, onlyDue);
+ await gatherReservePending(tx, now, resp, onlyDue);
+ await gatherRefreshPending(tx, now, resp, onlyDue);
+ await gatherCoinsPending(tx, now, resp, onlyDue);
+ await gatherWithdrawalPending(tx, now, resp, onlyDue);
+ await gatherProposalPending(tx, now, resp, onlyDue);
+ await gatherTipPending(tx, now, resp, onlyDue);
+ await gatherPurchasePending(tx, now, resp, onlyDue);
},
);
-
- return {
- pendingOperations,
- nextRetryDelay: {
- d_ms: minRetryDurationMs,
- },
- };
+ return resp;
}