From ef0acf06bfb7820a21c4719dba0d659f600be3c7 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Thu, 2 Apr 2020 20:33:01 +0530 Subject: model reserve history in the exchange, improve reserve handling logic --- src/operations/reserves.ts | 235 ++++++++++++++++++++++++++------------------- 1 file changed, 135 insertions(+), 100 deletions(-) (limited to 'src/operations/reserves.ts') diff --git a/src/operations/reserves.ts b/src/operations/reserves.ts index 5cf189d3b..2ef902ef2 100644 --- a/src/operations/reserves.ts +++ b/src/operations/reserves.ts @@ -28,14 +28,17 @@ import { ReserveRecord, CurrencyRecord, Stores, - WithdrawalSessionRecord, + WithdrawalGroupRecord, initRetryInfo, updateRetryInfoTimeout, ReserveUpdatedEventRecord, + WalletReserveHistoryItemType, + DenominationRecord, + PlanchetRecord, + WithdrawalSourceType, } from "../types/dbTypes"; -import { TransactionAbort } from "../util/query"; import { Logger } from "../util/logging"; -import * as Amounts from "../util/amounts"; +import { Amounts } from "../util/amounts"; import { updateExchangeFromUrl, getExchangeTrust, @@ -50,7 +53,7 @@ import { encodeCrock, getRandomBytes } from "../crypto/talerCrypto"; import { randomBytes } from "../crypto/primitives/nacl-fast"; import { getVerifiedWithdrawDenomList, - processWithdrawSession, + processWithdrawGroup, getBankWithdrawalInfo, } from "./withdraw"; import { @@ -61,6 +64,10 @@ import { import { NotificationType } from "../types/notifications"; import { codecForReserveStatus } from "../types/ReserveStatus"; import { getTimestampNow } from "../util/time"; +import { + reconcileReserveHistory, + summarizeReserveHistory, +} from "../util/reserveHistoryUtil"; const logger = new Logger("reserves.ts"); @@ -98,11 +105,7 @@ export async function createReserve( const reserveRecord: ReserveRecord = { timestampCreated: now, - amountWithdrawAllocated: Amounts.getZero(currency), - amountWithdrawCompleted: Amounts.getZero(currency), - amountWithdrawRemaining: Amounts.getZero(currency), exchangeBaseUrl: canonExchange, - amountInitiallyRequested: req.amount, reservePriv: keypair.priv, reservePub: keypair.pub, senderWire: req.senderWire, @@ -115,8 +118,14 @@ export async function createReserve( retryInfo: initRetryInfo(), lastError: undefined, reserveTransactions: [], + currency: req.amount.currency, }; + reserveRecord.reserveTransactions.push({ + type: WalletReserveHistoryItemType.Credit, + expectedAmount: req.amount, + }); + const senderWire = req.senderWire; if (senderWire) { const rec = { @@ -460,6 +469,7 @@ async function updateReserve( const respJson = await resp.json(); const reserveInfo = codecForReserveStatus().decode(respJson); const balance = Amounts.parseOrThrow(reserveInfo.balance); + const currency = balance.currency; await ws.db.runWithWriteTransaction( [Stores.reserves, Stores.reserveUpdatedEvents], async (tx) => { @@ -477,60 +487,41 @@ async function updateReserve( const reserveUpdateId = encodeCrock(getRandomBytes(32)); - // FIXME: check / compare history! - if (!r.lastSuccessfulStatusQuery) { - // FIXME: check if this matches initial expectations - r.amountWithdrawRemaining = balance; + const reconciled = reconcileReserveHistory( + r.reserveTransactions, + reserveInfo.history, + ); + + console.log("reconciled history:", JSON.stringify(reconciled, undefined, 2)); + + const summary = summarizeReserveHistory( + reconciled.updatedLocalHistory, + currency, + ); + console.log("summary", summary); + + if ( + reconciled.newAddedItems.length + reconciled.newMatchedItems.length != + 0 + ) { const reserveUpdate: ReserveUpdatedEventRecord = { reservePub: r.reservePub, timestamp: getTimestampNow(), - amountReserveBalance: Amounts.toString(balance), - amountExpected: Amounts.toString(reserve.amountInitiallyRequested), + amountReserveBalance: Amounts.stringify(balance), + amountExpected: Amounts.stringify(summary.awaitedReserveAmount), newHistoryTransactions, reserveUpdateId, }; await tx.put(Stores.reserveUpdatedEvents, reserveUpdate); r.reserveStatus = ReserveRecordStatus.WITHDRAWING; + r.retryInfo = initRetryInfo(); } else { - const expectedBalance = Amounts.add( - r.amountWithdrawRemaining, - Amounts.sub(r.amountWithdrawAllocated, r.amountWithdrawCompleted) - .amount, - ); - const cmp = Amounts.cmp(balance, expectedBalance.amount); - if (cmp == 0) { - // Nothing changed, go back to sleep! - r.reserveStatus = ReserveRecordStatus.DORMANT; - } else if (cmp > 0) { - const extra = Amounts.sub(balance, expectedBalance.amount).amount; - r.amountWithdrawRemaining = Amounts.add( - r.amountWithdrawRemaining, - extra, - ).amount; - r.reserveStatus = ReserveRecordStatus.WITHDRAWING; - } else { - // We're missing some money. - r.reserveStatus = ReserveRecordStatus.DORMANT; - } - if (r.reserveStatus !== ReserveRecordStatus.DORMANT) { - const reserveUpdate: ReserveUpdatedEventRecord = { - reservePub: r.reservePub, - timestamp: getTimestampNow(), - amountReserveBalance: Amounts.toString(balance), - amountExpected: Amounts.toString(expectedBalance.amount), - newHistoryTransactions, - reserveUpdateId, - }; - await tx.put(Stores.reserveUpdatedEvents, reserveUpdate); - } - } - r.lastSuccessfulStatusQuery = getTimestampNow(); - if (r.reserveStatus == ReserveRecordStatus.DORMANT) { + r.reserveStatus = ReserveRecordStatus.DORMANT; r.retryInfo = initRetryInfo(false); - } else { - r.retryInfo = initRetryInfo(); } - r.reserveTransactions = reserveInfo.history; + r.lastSuccessfulStatusQuery = getTimestampNow(); + r.reserveTransactions = reconciled.updatedLocalHistory; + r.lastError = undefined; await tx.put(Stores.reserves, r); }, ); @@ -607,6 +598,33 @@ export async function confirmReserve( }); } +async function makePlanchet( + ws: InternalWalletState, + reserve: ReserveRecord, + denom: DenominationRecord, +): Promise { + const r = await ws.cryptoApi.createPlanchet({ + denomPub: denom.denomPub, + feeWithdraw: denom.feeWithdraw, + reservePriv: reserve.reservePriv, + reservePub: reserve.reservePub, + value: denom.value, + }); + return { + blindingKey: r.blindingKey, + coinEv: r.coinEv, + coinPriv: r.coinPriv, + coinPub: r.coinPub, + coinValue: r.coinValue, + denomPub: r.denomPub, + denomPubHash: r.denomPubHash, + isFromTip: false, + reservePub: r.reservePub, + withdrawSig: r.withdrawSig, + coinEvHash: r.coinEvHash, + }; +} + /** * Withdraw coins from a reserve until it is empty. * @@ -626,7 +644,12 @@ async function depleteReserve( } logger.trace(`depleting reserve ${reservePub}`); - const withdrawAmount = reserve.amountWithdrawRemaining; + const summary = summarizeReserveHistory( + reserve.reserveTransactions, + reserve.currency, + ); + + const withdrawAmount = summary.unclaimedReserveAmount; logger.trace(`getting denom list`); @@ -637,36 +660,47 @@ async function depleteReserve( ); logger.trace(`got denom list`); if (denomsForWithdraw.length === 0) { - const m = `Unable to withdraw from reserve, no denominations are available to withdraw.`; - const opErr = { - type: "internal", - message: m, - details: {}, - }; - await incrementReserveRetry(ws, reserve.reservePub, opErr); - console.log(m); - throw new OperationFailedAndReportedError(opErr); + // Only complain about inability to withdraw if we + // didn't withdraw before. + if (Amounts.isZero(summary.withdrawnAmount)) { + const m = `Unable to withdraw from reserve, no denominations are available to withdraw.`; + const opErr = { + type: "internal", + message: m, + details: {}, + }; + await incrementReserveRetry(ws, reserve.reservePub, opErr); + console.log(m); + throw new OperationFailedAndReportedError(opErr); + } + return; } logger.trace("selected denominations"); - const withdrawalSessionId = encodeCrock(randomBytes(32)); + const withdrawalGroupId = encodeCrock(randomBytes(32)); const totalCoinValue = Amounts.sum(denomsForWithdraw.map((x) => x.value)) .amount; - const withdrawalRecord: WithdrawalSessionRecord = { - withdrawSessionId: withdrawalSessionId, + const planchets: PlanchetRecord[] = []; + for (const d of denomsForWithdraw) { + const p = await makePlanchet(ws, reserve, d); + planchets.push(p); + } + + const withdrawalRecord: WithdrawalGroupRecord = { + withdrawalGroupId: withdrawalGroupId, exchangeBaseUrl: reserve.exchangeBaseUrl, source: { - type: "reserve", + type: WithdrawalSourceType.Reserve, reservePub: reserve.reservePub, }, rawWithdrawalAmount: withdrawAmount, timestampStart: getTimestampNow(), denoms: denomsForWithdraw.map((x) => x.denomPub), withdrawn: denomsForWithdraw.map((x) => false), - planchets: denomsForWithdraw.map((x) => undefined), + planchets, totalCoinValue, retryInfo: initRetryInfo(), lastErrorPerCoin: {}, @@ -679,53 +713,54 @@ async function depleteReserve( const totalWithdrawAmount = Amounts.add(totalCoinValue, totalCoinWithdrawFee) .amount; - function mutateReserve(r: ReserveRecord): ReserveRecord { - const remaining = Amounts.sub( - r.amountWithdrawRemaining, - totalWithdrawAmount, - ); - if (remaining.saturated) { - console.error("can't create planchets, saturated"); - throw TransactionAbort; - } - const allocated = Amounts.add( - r.amountWithdrawAllocated, - totalWithdrawAmount, - ); - if (allocated.saturated) { - console.error("can't create planchets, saturated"); - throw TransactionAbort; - } - r.amountWithdrawRemaining = remaining.amount; - r.amountWithdrawAllocated = allocated.amount; - r.reserveStatus = ReserveRecordStatus.DORMANT; - r.retryInfo = initRetryInfo(false); - return r; - } - const success = await ws.db.runWithWriteTransaction( - [Stores.withdrawalSession, Stores.reserves], + [Stores.withdrawalGroups, Stores.reserves], async (tx) => { - const myReserve = await tx.get(Stores.reserves, reservePub); - if (!myReserve) { + const newReserve = await tx.get(Stores.reserves, reservePub); + if (!newReserve) { return false; } - if (myReserve.reserveStatus !== ReserveRecordStatus.WITHDRAWING) { + if (newReserve.reserveStatus !== ReserveRecordStatus.WITHDRAWING) { return false; } - await tx.mutate(Stores.reserves, reserve.reservePub, mutateReserve); - await tx.put(Stores.withdrawalSession, withdrawalRecord); + const newSummary = summarizeReserveHistory( + newReserve.reserveTransactions, + newReserve.currency, + ); + if ( + Amounts.cmp(newSummary.unclaimedReserveAmount, totalWithdrawAmount) < 0 + ) { + // Something must have happened concurrently! + logger.error( + "aborting withdrawal session, likely concurrent withdrawal happened", + ); + return false; + } + for (let i = 0; i < planchets.length; i++) { + const amt = Amounts.add( + denomsForWithdraw[i].value, + denomsForWithdraw[i].feeWithdraw, + ).amount; + newReserve.reserveTransactions.push({ + type: WalletReserveHistoryItemType.Withdraw, + expectedAmount: amt, + }); + } + newReserve.reserveStatus = ReserveRecordStatus.DORMANT; + newReserve.retryInfo = initRetryInfo(false); + await tx.put(Stores.reserves, newReserve); + await tx.put(Stores.withdrawalGroups, withdrawalRecord); return true; }, ); if (success) { - console.log("processing new withdraw session"); + console.log("processing new withdraw group"); ws.notify({ - type: NotificationType.WithdrawSessionCreated, - withdrawSessionId: withdrawalSessionId, + type: NotificationType.WithdrawGroupCreated, + withdrawalGroupId: withdrawalGroupId, }); - await processWithdrawSession(ws, withdrawalSessionId); + await processWithdrawGroup(ws, withdrawalGroupId); } else { console.trace("withdraw session already existed"); } -- cgit v1.2.3