diff options
-rw-r--r-- | src/net.cpp | 98 | ||||
-rw-r--r-- | src/net.h | 6 | ||||
-rw-r--r-- | src/net_processing.cpp | 158 | ||||
-rw-r--r-- | src/policy/fees.cpp | 2 | ||||
-rw-r--r-- | src/test/net_tests.cpp | 13 | ||||
-rw-r--r-- | src/util.cpp | 13 | ||||
-rw-r--r-- | src/wallet/wallet.cpp | 8 | ||||
-rwxr-xr-x | test/functional/import-abort-rescan.py | 66 | ||||
-rwxr-xr-x | test/functional/test_runner.py | 1 |
9 files changed, 214 insertions, 151 deletions
diff --git a/src/net.cpp b/src/net.cpp index 27389d6e0c..50b192e2ca 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -708,7 +708,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete handled = msg.readData(pch, nBytes); if (handled < 0) - return false; + return false; if (msg.in_data && msg.hdr.nMessageSize > MAX_PROTOCOL_MESSAGE_LENGTH) { LogPrint(BCLog::NET, "Oversized message from peer=%i, disconnecting\n", GetId()); @@ -786,7 +786,7 @@ int CNetMessage::readHeader(const char *pch, unsigned int nBytes) // reject messages larger than MAX_SIZE if (hdr.nMessageSize > MAX_SIZE) - return -1; + return -1; // switch state to reading message data in_data = true; @@ -1299,59 +1299,55 @@ void CConnman::ThreadSocketHandler() } if (recvSet || errorSet) { + // typical socket buffer is 8K-64K + char pchBuf[0x10000]; + int nBytes = 0; { - { - // typical socket buffer is 8K-64K - char pchBuf[0x10000]; - int nBytes = 0; - { - LOCK(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) - continue; - nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); - } - if (nBytes > 0) - { - bool notify = false; - if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) - pnode->CloseSocketDisconnect(); - RecordBytesRecv(nBytes); - if (notify) { - size_t nSizeAdded = 0; - auto it(pnode->vRecvMsg.begin()); - for (; it != pnode->vRecvMsg.end(); ++it) { - if (!it->complete()) - break; - nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; - } - { - LOCK(pnode->cs_vProcessMsg); - pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); - pnode->nProcessQueueSize += nSizeAdded; - pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; - } - WakeMessageHandler(); - } - } - else if (nBytes == 0) - { - // socket closed gracefully - if (!pnode->fDisconnect) { - LogPrint(BCLog::NET, "socket closed\n"); - } - pnode->CloseSocketDisconnect(); + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) + continue; + nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); + } + if (nBytes > 0) + { + bool notify = false; + if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) + pnode->CloseSocketDisconnect(); + RecordBytesRecv(nBytes); + if (notify) { + size_t nSizeAdded = 0; + auto it(pnode->vRecvMsg.begin()); + for (; it != pnode->vRecvMsg.end(); ++it) { + if (!it->complete()) + break; + nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; } - else if (nBytes < 0) { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) - { - if (!pnode->fDisconnect) - LogPrintf("socket recv error %s\n", NetworkErrorString(nErr)); - pnode->CloseSocketDisconnect(); - } + LOCK(pnode->cs_vProcessMsg); + pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + pnode->nProcessQueueSize += nSizeAdded; + pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; } + WakeMessageHandler(); + } + } + else if (nBytes == 0) + { + // socket closed gracefully + if (!pnode->fDisconnect) { + LogPrint(BCLog::NET, "socket closed\n"); + } + pnode->CloseSocketDisconnect(); + } + else if (nBytes < 0) + { + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) + { + if (!pnode->fDisconnect) + LogPrintf("socket recv error %s\n", NetworkErrorString(nErr)); + pnode->CloseSocketDisconnect(); } } } @@ -699,15 +699,15 @@ private: public: NodeId GetId() const { - return id; + return id; } uint64_t GetLocalNonce() const { - return nLocalHostNonce; + return nLocalHostNonce; } int GetMyStartingHeight() const { - return nMyStartingHeight; + return nMyStartingHeight; } int GetRefCount() diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 2e7b99baa4..718a7de031 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2681,100 +2681,100 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, const std::atomic<bool>& i // this maintains the order of responses if (!pfrom->vRecvGetData.empty()) return true; - // Don't bother if send buffer is too full to respond anyway - if (pfrom->fPauseSend) - return false; + // Don't bother if send buffer is too full to respond anyway + if (pfrom->fPauseSend) + return false; - std::list<CNetMessage> msgs; - { - LOCK(pfrom->cs_vProcessMsg); - if (pfrom->vProcessMsg.empty()) - return false; - // Just take one message - msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); - pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE; - pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize(); - fMoreWork = !pfrom->vProcessMsg.empty(); - } - CNetMessage& msg(msgs.front()); - - msg.SetVersion(pfrom->GetRecvVersion()); - // Scan for message start - if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) { - LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id); - pfrom->fDisconnect = true; + std::list<CNetMessage> msgs; + { + LOCK(pfrom->cs_vProcessMsg); + if (pfrom->vProcessMsg.empty()) return false; - } + // Just take one message + msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); + pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE; + pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize(); + fMoreWork = !pfrom->vProcessMsg.empty(); + } + CNetMessage& msg(msgs.front()); + + msg.SetVersion(pfrom->GetRecvVersion()); + // Scan for message start + if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) { + LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id); + pfrom->fDisconnect = true; + return false; + } - // Read header - CMessageHeader& hdr = msg.hdr; - if (!hdr.IsValid(chainparams.MessageStart())) - { - LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id); - return fMoreWork; - } - std::string strCommand = hdr.GetCommand(); + // Read header + CMessageHeader& hdr = msg.hdr; + if (!hdr.IsValid(chainparams.MessageStart())) + { + LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id); + return fMoreWork; + } + std::string strCommand = hdr.GetCommand(); + + // Message size + unsigned int nMessageSize = hdr.nMessageSize; - // Message size - unsigned int nMessageSize = hdr.nMessageSize; + // Checksum + CDataStream& vRecv = msg.vRecv; + const uint256& hash = msg.GetMessageHash(); + if (memcmp(hash.begin(), hdr.pchChecksum, CMessageHeader::CHECKSUM_SIZE) != 0) + { + LogPrintf("%s(%s, %u bytes): CHECKSUM ERROR expected %s was %s\n", __func__, + SanitizeString(strCommand), nMessageSize, + HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE), + HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE)); + return fMoreWork; + } - // Checksum - CDataStream& vRecv = msg.vRecv; - const uint256& hash = msg.GetMessageHash(); - if (memcmp(hash.begin(), hdr.pchChecksum, CMessageHeader::CHECKSUM_SIZE) != 0) + // Process message + bool fRet = false; + try + { + fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc); + if (interruptMsgProc) + return false; + if (!pfrom->vRecvGetData.empty()) + fMoreWork = true; + } + catch (const std::ios_base::failure& e) + { + connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_MALFORMED, std::string("error parsing message"))); + if (strstr(e.what(), "end of data")) { - LogPrintf("%s(%s, %u bytes): CHECKSUM ERROR expected %s was %s\n", __func__, - SanitizeString(strCommand), nMessageSize, - HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE), - HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE)); - return fMoreWork; + // Allow exceptions from under-length message on vRecv + LogPrintf("%s(%s, %u bytes): Exception '%s' caught, normally caused by a message being shorter than its stated length\n", __func__, SanitizeString(strCommand), nMessageSize, e.what()); } - - // Process message - bool fRet = false; - try + else if (strstr(e.what(), "size too large")) { - fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc); - if (interruptMsgProc) - return false; - if (!pfrom->vRecvGetData.empty()) - fMoreWork = true; + // Allow exceptions from over-long size + LogPrintf("%s(%s, %u bytes): Exception '%s' caught\n", __func__, SanitizeString(strCommand), nMessageSize, e.what()); } - catch (const std::ios_base::failure& e) + else if (strstr(e.what(), "non-canonical ReadCompactSize()")) { - connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_MALFORMED, std::string("error parsing message"))); - if (strstr(e.what(), "end of data")) - { - // Allow exceptions from under-length message on vRecv - LogPrintf("%s(%s, %u bytes): Exception '%s' caught, normally caused by a message being shorter than its stated length\n", __func__, SanitizeString(strCommand), nMessageSize, e.what()); - } - else if (strstr(e.what(), "size too large")) - { - // Allow exceptions from over-long size - LogPrintf("%s(%s, %u bytes): Exception '%s' caught\n", __func__, SanitizeString(strCommand), nMessageSize, e.what()); - } - else if (strstr(e.what(), "non-canonical ReadCompactSize()")) - { - // Allow exceptions from non-canonical encoding - LogPrintf("%s(%s, %u bytes): Exception '%s' caught\n", __func__, SanitizeString(strCommand), nMessageSize, e.what()); - } - else - { - PrintExceptionContinue(&e, "ProcessMessages()"); - } + // Allow exceptions from non-canonical encoding + LogPrintf("%s(%s, %u bytes): Exception '%s' caught\n", __func__, SanitizeString(strCommand), nMessageSize, e.what()); } - catch (const std::exception& e) { + else + { PrintExceptionContinue(&e, "ProcessMessages()"); - } catch (...) { - PrintExceptionContinue(NULL, "ProcessMessages()"); } + } + catch (const std::exception& e) { + PrintExceptionContinue(&e, "ProcessMessages()"); + } catch (...) { + PrintExceptionContinue(NULL, "ProcessMessages()"); + } - if (!fRet) { - LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id); - } + if (!fRet) { + LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id); + } - LOCK(cs_main); - SendRejectsAndCheckIfBanned(pfrom, connman); + LOCK(cs_main); + SendRejectsAndCheckIfBanned(pfrom, connman); return fMoreWork; } diff --git a/src/policy/fees.cpp b/src/policy/fees.cpp index f3f7f8378e..bd169f875a 100644 --- a/src/policy/fees.cpp +++ b/src/policy/fees.cpp @@ -586,7 +586,7 @@ bool CBlockPolicyEstimator::Write(CAutoFile& fileout) const feeStats->Write(fileout); } catch (const std::exception&) { - LogPrintf("CBlockPolicyEstimator::Write(): unable to read policy estimator data (non-fatal)\n"); + LogPrintf("CBlockPolicyEstimator::Write(): unable to write policy estimator data (non-fatal)\n"); return false; } return true; diff --git a/src/test/net_tests.cpp b/src/test/net_tests.cpp index b9ed4952bb..0c7f3e5e23 100644 --- a/src/test/net_tests.cpp +++ b/src/test/net_tests.cpp @@ -11,6 +11,7 @@ #include "net.h" #include "netbase.h" #include "chainparams.h" +#include "util.h" class CAddrManSerializationMock : public CAddrMan { @@ -72,6 +73,18 @@ CDataStream AddrmanToStream(CAddrManSerializationMock& _addrman) BOOST_FIXTURE_TEST_SUITE(net_tests, BasicTestingSetup) +BOOST_AUTO_TEST_CASE(cnode_listen_port) +{ + // test default + unsigned short port = GetListenPort(); + BOOST_CHECK(port == Params().GetDefaultPort()); + // test set port + unsigned short altPort = 12345; + SoftSetArg("-port", std::to_string(altPort)); + port = GetListenPort(); + BOOST_CHECK(port == altPort); +} + BOOST_AUTO_TEST_CASE(caddrdb_read) { CAddrManUncorrupted addrmanUncorrupted; diff --git a/src/util.cpp b/src/util.cpp index 0dc203cba5..cf10ee4aa5 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -88,19 +88,6 @@ #include <openssl/rand.h> #include <openssl/conf.h> -// Work around clang compilation problem in Boost 1.46: -// /usr/include/boost/program_options/detail/config_file.hpp:163:17: error: call to function 'to_internal' that is neither visible in the template definition nor found by argument-dependent lookup -// See also: http://stackoverflow.com/questions/10020179/compilation-fail-in-boost-librairies-program-options -// http://clang.debian.net/status.php?version=3.0&key=CANNOT_FIND_FUNCTION -namespace boost { - - namespace program_options { - std::string to_internal(const std::string&); - } - -} // namespace boost - - const char * const BITCOIN_CONF_FILENAME = "bitcoin.conf"; const char * const BITCOIN_PID_FILENAME = "bitcoind.pid"; diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index 211be76c45..b63171c4b6 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -1526,6 +1526,10 @@ CBlockIndex* CWallet::ScanForWalletTransactions(CBlockIndex* pindexStart, bool f { if (pindex->nHeight % 100 == 0 && dProgressTip - dProgressStart > 0.0) ShowProgress(_("Rescanning..."), std::max(1, std::min(99, (int)((GuessVerificationProgress(chainParams.TxData(), pindex) - dProgressStart) / (dProgressTip - dProgressStart) * 100)))); + if (GetTime() >= nNow + 60) { + nNow = GetTime(); + LogPrintf("Still rescanning. At block %d. Progress=%f\n", pindex->nHeight, GuessVerificationProgress(chainParams.TxData(), pindex)); + } CBlock block; if (ReadBlockFromDisk(block, pindex, Params().GetConsensus())) { @@ -1539,10 +1543,6 @@ CBlockIndex* CWallet::ScanForWalletTransactions(CBlockIndex* pindexStart, bool f ret = nullptr; } pindex = chainActive.Next(pindex); - if (GetTime() >= nNow + 60) { - nNow = GetTime(); - LogPrintf("Still rescanning. At block %d. Progress=%f\n", pindex->nHeight, GuessVerificationProgress(chainParams.TxData(), pindex)); - } } if (pindex && fAbortRescan) { LogPrintf("Rescan aborted at block %d. Progress=%f\n", pindex->nHeight, GuessVerificationProgress(chainParams.TxData(), pindex)); diff --git a/test/functional/import-abort-rescan.py b/test/functional/import-abort-rescan.py new file mode 100755 index 0000000000..ffe45bbb1d --- /dev/null +++ b/test/functional/import-abort-rescan.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +# Copyright (c) 2017 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +"""Test wallet import RPCs. + +Test rescan behavior of importprivkey when aborted. The test ensures that: +1. The abortrescan command indeed stops the rescan process. +2. Subsequent rescan catches the aborted address UTXO +""" + +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import (assert_equal, get_rpc_proxy) +from decimal import Decimal +import threading # for bg importprivkey +import time # for sleep + +class ImportAbortRescanTest(BitcoinTestFramework): + def __init__(self): + super().__init__() + self.setup_clean_chain = True + + def run_test(self): + # Generate for BTC + assert_equal(self.nodes[0].getbalance(), 0) + assert_equal(self.nodes[1].getbalance(), 0) + self.nodes[0].generate(300) + assert_equal(self.nodes[1].getbalance(), 0) + # Make blocks with spam to cause rescan delay + for i in range(5): + for j in range(5): + self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 0.1) + self.nodes[0].generate(10) + addr = self.nodes[0].getnewaddress() + privkey = self.nodes[0].dumpprivkey(addr) + self.nodes[0].sendtoaddress(addr, 0.123) + self.nodes[0].generate(10) # mature tx + self.sync_all() + + # Import this address in the background ... + node1ref = get_rpc_proxy(self.nodes[1].url, 1, timeout=600) + importthread = threading.Thread(target=node1ref.importprivkey, args=[privkey]) + importthread.start() + # ... then abort rescan; try a bunch until abortres becomes true, + # because we will start checking before above thread starts processing + for i in range(2000): + time.sleep(0.001) + abortres = self.nodes[1].abortrescan() + if abortres: break + assert abortres # if false, we failed to abort + # import should die soon + for i in range(10): + time.sleep(0.1) + deadres = not importthread.isAlive() + if deadres: break + + assert deadres # if false, importthread did not die soon enough + assert_equal(self.nodes[1].getbalance(), 0.0) + + # Import a different address and let it run + self.nodes[1].importprivkey(self.nodes[0].dumpprivkey(self.nodes[0].getnewaddress())) + # Expect original privkey to now also be discovered and added to balance + assert_equal(self.nodes[1].getbalance(), Decimal("0.123")) + +if __name__ == "__main__": + ImportAbortRescanTest().main() diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index 2932f82970..0996b1bc20 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -109,6 +109,7 @@ BASE_SCRIPTS= [ 'rpcnamedargs.py', 'listsinceblock.py', 'p2p-leaktests.py', + 'import-abort-rescan.py', ] EXTENDED_SCRIPTS = [ |