aboutsummaryrefslogtreecommitdiff
path: root/src/net_processing.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net_processing.cpp')
-rw-r--r--src/net_processing.cpp60
1 files changed, 40 insertions, 20 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index 10aaa77fff..ccfbb77fcd 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -886,7 +886,7 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma
connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc));
}
-void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman)
+void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
@@ -901,7 +901,9 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
const CInv &inv = *it;
{
- boost::this_thread::interruption_point();
+ if (interruptMsgProc)
+ return;
+
it++;
if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK)
@@ -1055,12 +1057,12 @@ uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params
return nFetchFlags;
}
-bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman)
+bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
LogPrint("net", "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->id);
- if (mapArgs.count("-dropmessagestest") && GetRand(atoi(mapArgs["-dropmessagestest"])) == 0)
+ if (IsArgSet("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 0)) == 0)
{
LogPrintf("dropmessagestest DROPPING RECV MESSAGE\n");
return true;
@@ -1295,7 +1297,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
int64_t nSince = nNow - 10 * 60;
BOOST_FOREACH(CAddress& addr, vAddr)
{
- boost::this_thread::interruption_point();
+ if (interruptMsgProc)
+ return true;
if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES)
continue;
@@ -1377,7 +1380,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
{
CInv &inv = vInv[nInv];
- boost::this_thread::interruption_point();
+ if (interruptMsgProc)
+ return true;
bool fAlreadyHave = AlreadyHave(inv);
LogPrint("net", "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom->id);
@@ -1439,7 +1443,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
LogPrint("net", "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom->id);
pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end());
- ProcessGetData(pfrom, chainparams.GetConsensus(), connman);
+ ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
}
@@ -1513,7 +1517,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
inv.type = State(pfrom->GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK;
inv.hash = req.blockhash;
pfrom->vRecvGetData.push_back(inv);
- ProcessGetData(pfrom, chainparams.GetConsensus(), connman);
+ ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
return true;
}
@@ -1785,11 +1789,24 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
}
}
+ // When we succeed in decoding a block's txids from a cmpctblock
+ // message we typically jump to the BLOCKTXN handling code, with a
+ // dummy (empty) BLOCKTXN message, to re-use the logic there in
+ // completing processing of the putative block (without cs_main).
+ bool fProcessBLOCKTXN = false;
+ CDataStream blockTxnMsg(SER_NETWORK, PROTOCOL_VERSION);
+
+ // If we end up treating this as a plain headers message, call that as well
+ // without cs_main.
+ bool fRevertToHeaderProcessing = false;
+ CDataStream vHeadersMsg(SER_NETWORK, PROTOCOL_VERSION);
+
// Keep a CBlock for "optimistic" compactblock reconstructions (see
// below)
std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>();
bool fBlockReconstructed = false;
+ {
LOCK(cs_main);
// If AcceptBlockHeader returned true, it set pindex
assert(pindex);
@@ -1871,9 +1888,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Dirty hack to jump to BLOCKTXN code (TODO: move message handling into their own functions)
BlockTransactions txn;
txn.blockhash = cmpctblock.header.GetHash();
- CDataStream blockTxnMsg(SER_NETWORK, PROTOCOL_VERSION);
blockTxnMsg << txn;
- return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman);
+ fProcessBLOCKTXN = true;
} else {
req.blockhash = pindex->GetBlockHash();
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req));
@@ -1909,11 +1925,17 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Dirty hack to process as if it were just a headers message (TODO: move message handling into their own functions)
std::vector<CBlock> headers;
headers.push_back(cmpctblock.header);
- CDataStream vHeadersMsg(SER_NETWORK, PROTOCOL_VERSION);
vHeadersMsg << headers;
- return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman);
+ fRevertToHeaderProcessing = true;
}
}
+ } // cs_main
+
+ if (fProcessBLOCKTXN)
+ return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman, interruptMsgProc);
+
+ if (fRevertToHeaderProcessing)
+ return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman, interruptMsgProc);
if (fBlockReconstructed) {
// If we got here, we were able to optimistically reconstruct a
@@ -2426,7 +2448,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
}
// requires LOCK(cs_vRecvMsg)
-bool ProcessMessages(CNode* pfrom, CConnman& connman)
+bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{
const CChainParams& chainparams = Params();
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
@@ -2444,7 +2466,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman)
bool fOk = true;
if (!pfrom->vRecvGetData.empty())
- ProcessGetData(pfrom, chainparams.GetConsensus(), connman);
+ ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
// this maintains the order of responses
if (!pfrom->vRecvGetData.empty()) return fOk;
@@ -2505,8 +2527,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman)
bool fRet = false;
try
{
- fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman);
- boost::this_thread::interruption_point();
+ fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc);
+ if (interruptMsgProc)
+ return true;
}
catch (const std::ios_base::failure& e)
{
@@ -2531,9 +2554,6 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman)
PrintExceptionContinue(&e, "ProcessMessages()");
}
}
- catch (const boost::thread_interrupted&) {
- throw;
- }
catch (const std::exception& e) {
PrintExceptionContinue(&e, "ProcessMessages()");
} catch (...) {
@@ -2570,7 +2590,7 @@ public:
}
};
-bool SendMessages(CNode* pto, CConnman& connman)
+bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{
const Consensus::Params& consensusParams = Params().GetConsensus();
{