aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xqa/rpc-tests/p2p-compactblocks.py26
-rwxr-xr-xqa/rpc-tests/preciousblock.py2
-rw-r--r--src/net.cpp115
-rw-r--r--src/net.h38
-rw-r--r--src/net_processing.cpp280
-rw-r--r--src/net_processing.h2
-rw-r--r--src/validation.cpp38
-rw-r--r--src/validation.h2
-rw-r--r--src/validationinterface.cpp3
-rw-r--r--src/validationinterface.h6
10 files changed, 327 insertions, 185 deletions
diff --git a/qa/rpc-tests/p2p-compactblocks.py b/qa/rpc-tests/p2p-compactblocks.py
index fc1f16c6d2..9151ecf5de 100755
--- a/qa/rpc-tests/p2p-compactblocks.py
+++ b/qa/rpc-tests/p2p-compactblocks.py
@@ -310,6 +310,9 @@ class CompactBlocksTest(BitcoinTestFramework):
tip = int(node.getbestblockhash(), 16)
assert(test_node.wait_for_block_announcement(tip))
+ # Make sure we will receive a fast-announce compact block
+ self.request_cb_announcements(test_node, node, version)
+
# Now mine a block, and look at the resulting compact block.
test_node.clear_block_announcement()
block_hash = int(node.generate(1)[0], 16)
@@ -319,27 +322,36 @@ class CompactBlocksTest(BitcoinTestFramework):
[tx.calc_sha256() for tx in block.vtx]
block.rehash()
- # Don't care which type of announcement came back for this test; just
- # request the compact block if we didn't get one yet.
+ # Wait until the block was announced (via compact blocks)
wait_until(test_node.received_block_announcement, timeout=30)
assert(test_node.received_block_announcement())
+ # Now fetch and check the compact block
+ header_and_shortids = None
+ with mininode_lock:
+ assert(test_node.last_cmpctblock is not None)
+ # Convert the on-the-wire representation to absolute indexes
+ header_and_shortids = HeaderAndShortIDs(test_node.last_cmpctblock.header_and_shortids)
+ self.check_compactblock_construction_from_block(version, header_and_shortids, block_hash, block)
+
+ # Now fetch the compact block using a normal non-announce getdata
with mininode_lock:
- if test_node.last_cmpctblock is None:
- test_node.clear_block_announcement()
- inv = CInv(4, block_hash) # 4 == "CompactBlock"
- test_node.send_message(msg_getdata([inv]))
+ test_node.clear_block_announcement()
+ inv = CInv(4, block_hash) # 4 == "CompactBlock"
+ test_node.send_message(msg_getdata([inv]))
wait_until(test_node.received_block_announcement, timeout=30)
assert(test_node.received_block_announcement())
- # Now we should have the compactblock
+ # Now fetch and check the compact block
header_and_shortids = None
with mininode_lock:
assert(test_node.last_cmpctblock is not None)
# Convert the on-the-wire representation to absolute indexes
header_and_shortids = HeaderAndShortIDs(test_node.last_cmpctblock.header_and_shortids)
+ self.check_compactblock_construction_from_block(version, header_and_shortids, block_hash, block)
+ def check_compactblock_construction_from_block(self, version, header_and_shortids, block_hash, block):
# Check that we got the right block!
header_and_shortids.header.calc_sha256()
assert_equal(header_and_shortids.header.sha256, block_hash)
diff --git a/qa/rpc-tests/preciousblock.py b/qa/rpc-tests/preciousblock.py
index f43160e19a..a806294648 100755
--- a/qa/rpc-tests/preciousblock.py
+++ b/qa/rpc-tests/preciousblock.py
@@ -102,7 +102,7 @@ class PreciousTest(BitcoinTestFramework):
assert_equal(self.nodes[2].getblockcount(), 6)
hashL = self.nodes[2].getbestblockhash()
print("Connect nodes and check no reorg occurs")
- node_sync_via_rpc(self.nodes[0:3])
+ node_sync_via_rpc(self.nodes[1:3])
connect_nodes_bi(self.nodes,1,2)
connect_nodes_bi(self.nodes,0,2)
assert_equal(self.nodes[0].getbestblockhash(), hashH)
diff --git a/src/net.cpp b/src/net.cpp
index 37e7dfed4c..b275bdd809 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -437,11 +437,6 @@ void CNode::CloseSocketDisconnect()
LogPrint("net", "disconnecting peer=%d\n", id);
CloseSocket(hSocket);
}
-
- // in case this fails, we'll empty the recv buffer when the CNode is deleted
- TRY_LOCK(cs_vRecvMsg, lockRecv);
- if (lockRecv)
- vRecvMsg.clear();
}
void CConnman::ClearBanned()
@@ -650,16 +645,18 @@ void CNode::copyStats(CNodeStats &stats)
}
#undef X
-// requires LOCK(cs_vRecvMsg)
bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
{
complete = false;
+ int64_t nTimeMicros = GetTimeMicros();
+ nLastRecv = nTimeMicros / 1000000;
+ nRecvBytes += nBytes;
while (nBytes > 0) {
// get current incomplete message, or create a new one
if (vRecvMsg.empty() ||
vRecvMsg.back().complete())
- vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, nRecvVersion));
+ vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION));
CNetMessage& msg = vRecvMsg.back();
@@ -691,7 +688,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
assert(i != mapRecvBytesPerMsgCmd.end());
i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE;
- msg.nTime = GetTimeMicros();
+ msg.nTime = nTimeMicros;
complete = true;
}
}
@@ -764,7 +761,7 @@ const uint256& CNetMessage::GetMessageHash() const
// requires LOCK(cs_vSend)
-size_t SocketSendData(CNode *pnode)
+size_t CConnman::SocketSendData(CNode *pnode)
{
auto it = pnode->vSendMsg.begin();
size_t nSentSize = 0;
@@ -781,6 +778,7 @@ size_t SocketSendData(CNode *pnode)
if (pnode->nSendOffset == data.size()) {
pnode->nSendOffset = 0;
pnode->nSendSize -= data.size();
+ pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
it++;
} else {
// could not send full message; stop sending more
@@ -1052,8 +1050,7 @@ void CConnman::ThreadSocketHandler()
std::vector<CNode*> vNodesCopy = vNodes;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
- if (pnode->fDisconnect ||
- (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0))
+ if (pnode->fDisconnect)
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
@@ -1083,13 +1080,9 @@ void CConnman::ThreadSocketHandler()
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend)
{
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
- {
TRY_LOCK(pnode->cs_inventory, lockInv);
if (lockInv)
fDelete = true;
- }
}
}
if (fDelete)
@@ -1149,15 +1142,10 @@ void CConnman::ThreadSocketHandler()
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
- // * Otherwise, if there is no (complete) message in the receive buffer,
- // or there is space left in the buffer, select() for receiving data.
- // * (if neither of the above applies, there is certainly one message
- // in the receiver buffer ready to be processed).
- // Together, that means that at least one of the following is always possible,
- // so we don't deadlock:
- // * We send some data.
- // * We wait for data to be received (and disconnect after timeout).
- // * We process a message in the buffer (message handler thread).
+ // * Otherwise, if there is space left in the receive buffer, select() for
+ // receiving data.
+ // * Hand off all complete messages to the processor, to be handled without
+ // blocking here.
{
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
@@ -1168,10 +1156,7 @@ void CConnman::ThreadSocketHandler()
}
}
{
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv && (
- pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
- pnode->GetTotalRecvSize() <= GetReceiveFloodSize()))
+ if (!pnode->fPauseRecv)
FD_SET(pnode->hSocket, &fdsetRecv);
}
}
@@ -1230,8 +1215,6 @@ void CConnman::ThreadSocketHandler()
continue;
if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
{
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
{
{
// typical socket buffer is 8K-64K
@@ -1242,11 +1225,23 @@ void CConnman::ThreadSocketHandler()
bool notify = false;
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
- if(notify)
- condMsgProc.notify_one();
- pnode->nLastRecv = GetTime();
- pnode->nRecvBytes += nBytes;
RecordBytesRecv(nBytes);
+ if (notify) {
+ size_t nSizeAdded = 0;
+ auto it(pnode->vRecvMsg.begin());
+ for (; it != pnode->vRecvMsg.end(); ++it) {
+ if (!it->complete())
+ break;
+ nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
+ }
+ {
+ LOCK(pnode->cs_vProcessMsg);
+ pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
+ pnode->nProcessQueueSize += nSizeAdded;
+ pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
+ }
+ WakeMessageHandler();
+ }
}
else if (nBytes == 0)
{
@@ -1280,8 +1275,9 @@ void CConnman::ThreadSocketHandler()
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
size_t nBytes = SocketSendData(pnode);
- if (nBytes)
+ if (nBytes) {
RecordBytesSent(nBytes);
+ }
}
}
@@ -1321,8 +1317,14 @@ void CConnman::ThreadSocketHandler()
}
}
-
-
+void CConnman::WakeMessageHandler()
+{
+ {
+ std::lock_guard<std::mutex> lock(mutexMsgProc);
+ fMsgProcWake = true;
+ }
+ condMsgProc.notify_one();
+}
@@ -1858,7 +1860,7 @@ void CConnman::ThreadMessageHandler()
}
}
- bool fSleep = true;
+ bool fMoreWork = false;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
@@ -1866,22 +1868,8 @@ void CConnman::ThreadMessageHandler()
continue;
// Receive messages
- {
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
- {
- if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
- pnode->CloseSocketDisconnect();
-
- if (pnode->nSendSize < GetSendBufferSize())
- {
- if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete()))
- {
- fSleep = false;
- }
- }
- }
- }
+ bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
+ fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc)
return;
@@ -1901,10 +1889,11 @@ void CConnman::ThreadMessageHandler()
pnode->Release();
}
- if (fSleep) {
- std::unique_lock<std::mutex> lock(mutexMsgProc);
- condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
+ std::unique_lock<std::mutex> lock(mutexMsgProc);
+ if (!fMoreWork) {
+ condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; });
}
+ fMsgProcWake = false;
}
}
@@ -2121,7 +2110,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
nMaxFeeler = connOptions.nMaxFeeler;
nSendBufferMaxSize = connOptions.nSendBufferMaxSize;
- nReceiveFloodSize = connOptions.nSendBufferMaxSize;
+ nReceiveFloodSize = connOptions.nReceiveFloodSize;
nMaxOutboundLimit = connOptions.nMaxOutboundLimit;
nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe;
@@ -2182,6 +2171,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
interruptNet.reset();
flagInterruptMsgProc = false;
+ {
+ std::unique_lock<std::mutex> lock(mutexMsgProc);
+ fMsgProcWake = false;
+ }
+
// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
@@ -2613,6 +2607,9 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
minFeeFilter = 0;
lastSentFeeFilter = 0;
nextSendTimeFeeFilter = 0;
+ fPauseRecv = false;
+ fPauseSend = false;
+ nProcessQueueSize = 0;
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
mapRecvBytesPerMsgCmd[msg] = 0;
@@ -2692,6 +2689,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
pnode->nSendSize += nTotalSize;
+ if (pnode->nSendSize > nSendBufferMaxSize)
+ pnode->fPauseSend = true;
pnode->vSendMsg.push_back(std::move(serializedHeader));
if (nMessageSize)
pnode->vSendMsg.push_back(std::move(msg.data));
diff --git a/src/net.h b/src/net.h
index 97b27dcdf4..2baf82702c 100644
--- a/src/net.h
+++ b/src/net.h
@@ -327,6 +327,7 @@ public:
/** Get a unique deterministic randomizer. */
CSipHasher GetDeterministicRandomizer(uint64_t id);
+ unsigned int GetReceiveFloodSize() const;
private:
struct ListenSocket {
SOCKET socket;
@@ -343,6 +344,8 @@ private:
void ThreadSocketHandler();
void ThreadDNSAddressSeed();
+ void WakeMessageHandler();
+
uint64_t CalculateKeyedNetGroup(const CAddress& ad);
CNode* FindNode(const CNetAddr& ip);
@@ -358,6 +361,7 @@ private:
NodeId GetNewNodeId();
+ size_t SocketSendData(CNode *pnode);
//!check is the banlist has unwritten changes
bool BannedSetIsDirty();
//!set the "dirty" flag for the banlist
@@ -368,8 +372,6 @@ private:
void DumpData();
void DumpBanlist();
- unsigned int GetReceiveFloodSize() const;
-
// Network stats
void RecordBytesRecv(uint64_t bytes);
void RecordBytesSent(uint64_t bytes);
@@ -428,6 +430,9 @@ private:
/** SipHasher seeds for deterministic randomness */
const uint64_t nSeed0, nSeed1;
+ /** flag for waking the message processor. */
+ bool fMsgProcWake;
+
std::condition_variable condMsgProc;
std::mutex mutexMsgProc;
std::atomic<bool> flagInterruptMsgProc;
@@ -445,7 +450,6 @@ void Discover(boost::thread_group& threadGroup);
void MapPort(bool fUseUPnP);
unsigned short GetListenPort();
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
-size_t SocketSendData(CNode *pnode);
struct CombinerAll
{
@@ -610,11 +614,13 @@ public:
std::deque<std::vector<unsigned char>> vSendMsg;
CCriticalSection cs_vSend;
+ CCriticalSection cs_vProcessMsg;
+ std::list<CNetMessage> vProcessMsg;
+ size_t nProcessQueueSize;
+
std::deque<CInv> vRecvGetData;
- std::deque<CNetMessage> vRecvMsg;
- CCriticalSection cs_vRecvMsg;
uint64_t nRecvBytes;
- int nRecvVersion;
+ std::atomic<int> nRecvVersion;
int64_t nLastSend;
int64_t nLastRecv;
@@ -650,6 +656,8 @@ public:
const NodeId id;
const uint64_t nKeyedNetGroup;
+ std::atomic_bool fPauseRecv;
+ std::atomic_bool fPauseSend;
protected:
mapMsgCmdSize mapSendBytesPerMsgCmd;
@@ -723,6 +731,7 @@ private:
const ServiceFlags nLocalServices;
const int nMyStartingHeight;
int nSendVersion;
+ std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
public:
NodeId GetId() const {
@@ -743,24 +752,15 @@ public:
return nRefCount;
}
- // requires LOCK(cs_vRecvMsg)
- unsigned int GetTotalRecvSize()
- {
- unsigned int total = 0;
- BOOST_FOREACH(const CNetMessage &msg, vRecvMsg)
- total += msg.vRecv.size() + 24;
- return total;
- }
-
- // requires LOCK(cs_vRecvMsg)
bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete);
- // requires LOCK(cs_vRecvMsg)
void SetRecvVersion(int nVersionIn)
{
nRecvVersion = nVersionIn;
- BOOST_FOREACH(CNetMessage &msg, vRecvMsg)
- msg.SetVersion(nVersionIn);
+ }
+ int GetRecvVersion()
+ {
+ return nRecvVersion;
}
void SetSendVersion(int nVersionIn)
{
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index 72c403a57e..59c1d1bf8f 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -101,7 +101,7 @@ namespace {
/** Blocks that are in flight, and that are in the queue to be downloaded. Protected by cs_main. */
struct QueuedBlock {
uint256 hash;
- CBlockIndex* pindex; //!< Optional.
+ const CBlockIndex* pindex; //!< Optional.
bool fValidatedHeaders; //!< Whether this block has validated headers at the time of request.
std::unique_ptr<PartiallyDownloadedBlock> partialBlock; //!< Optional, used for CMPCTBLOCK downloads
};
@@ -156,13 +156,13 @@ struct CNodeState {
//! List of asynchronously-determined block rejections to notify this peer about.
std::vector<CBlockReject> rejects;
//! The best known block we know this peer has announced.
- CBlockIndex *pindexBestKnownBlock;
+ const CBlockIndex *pindexBestKnownBlock;
//! The hash of the last unknown block this peer has announced.
uint256 hashLastUnknownBlock;
//! The last full block we both have.
- CBlockIndex *pindexLastCommonBlock;
+ const CBlockIndex *pindexLastCommonBlock;
//! The best header we have sent our peer.
- CBlockIndex *pindexBestHeaderSent;
+ const CBlockIndex *pindexBestHeaderSent;
//! Length of current-streak of unconnecting headers announcements
int nUnconnectingHeaders;
//! Whether we've started headers synchronization with this peer.
@@ -331,7 +331,7 @@ bool MarkBlockAsReceived(const uint256& hash) {
// Requires cs_main.
// returns false, still setting pit, if the block was already in flight from the same peer
// pit will only be valid as long as the same cs_main lock is being held
-bool MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const Consensus::Params& consensusParams, CBlockIndex *pindex = NULL, list<QueuedBlock>::iterator **pit = NULL) {
+bool MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const Consensus::Params& consensusParams, const CBlockIndex *pindex = NULL, list<QueuedBlock>::iterator **pit = NULL) {
CNodeState *state = State(nodeid);
assert(state != NULL);
@@ -432,7 +432,7 @@ bool CanDirectFetch(const Consensus::Params &consensusParams)
}
// Requires cs_main
-bool PeerHasHeader(CNodeState *state, CBlockIndex *pindex)
+bool PeerHasHeader(CNodeState *state, const CBlockIndex *pindex)
{
if (state->pindexBestKnownBlock && pindex == state->pindexBestKnownBlock->GetAncestor(pindex->nHeight))
return true;
@@ -443,7 +443,7 @@ bool PeerHasHeader(CNodeState *state, CBlockIndex *pindex)
/** Find the last common ancestor two blocks have.
* Both pa and pb must be non-NULL. */
-CBlockIndex* LastCommonAncestor(CBlockIndex* pa, CBlockIndex* pb) {
+const CBlockIndex* LastCommonAncestor(const CBlockIndex* pa, const CBlockIndex* pb) {
if (pa->nHeight > pb->nHeight) {
pa = pa->GetAncestor(pb->nHeight);
} else if (pb->nHeight > pa->nHeight) {
@@ -462,7 +462,7 @@ CBlockIndex* LastCommonAncestor(CBlockIndex* pa, CBlockIndex* pb) {
/** Update pindexLastCommonBlock and add not-in-flight missing successors to vBlocks, until it has
* at most count entries. */
-void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<CBlockIndex*>& vBlocks, NodeId& nodeStaller, const Consensus::Params& consensusParams) {
+void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<const CBlockIndex*>& vBlocks, NodeId& nodeStaller, const Consensus::Params& consensusParams) {
if (count == 0)
return;
@@ -490,8 +490,8 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<CBl
if (state->pindexLastCommonBlock == state->pindexBestKnownBlock)
return;
- std::vector<CBlockIndex*> vToFetch;
- CBlockIndex *pindexWalk = state->pindexLastCommonBlock;
+ std::vector<const CBlockIndex*> vToFetch;
+ const CBlockIndex *pindexWalk = state->pindexLastCommonBlock;
// Never fetch further than the best block we know the peer has, or more than BLOCK_DOWNLOAD_WINDOW + 1 beyond the last
// linked block we have in common with this peer. The +1 is so we can detect stalling, namely if we would be able to
// download that next block if the window were 1 larger.
@@ -514,7 +514,7 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<CBl
// are not yet downloaded and not in flight to vBlocks. In the mean time, update
// pindexLastCommonBlock as long as all ancestors are already downloaded, or if it's
// already part of our chain (and therefore don't need it even if pruned).
- BOOST_FOREACH(CBlockIndex* pindex, vToFetch) {
+ BOOST_FOREACH(const CBlockIndex* pindex, vToFetch) {
if (!pindex->IsValid(BLOCK_VALID_TREE)) {
// We consider the chain that this peer is on invalid.
return;
@@ -752,6 +752,51 @@ void PeerLogicValidation::SyncTransaction(const CTransaction& tx, const CBlockIn
}
}
+static CCriticalSection cs_most_recent_block;
+static std::shared_ptr<const CBlock> most_recent_block;
+static std::shared_ptr<const CBlockHeaderAndShortTxIDs> most_recent_compact_block;
+static uint256 most_recent_block_hash;
+
+void PeerLogicValidation::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& pblock) {
+ std::shared_ptr<const CBlockHeaderAndShortTxIDs> pcmpctblock = std::make_shared<const CBlockHeaderAndShortTxIDs> (*pblock, true);
+ CNetMsgMaker msgMaker(PROTOCOL_VERSION);
+
+ LOCK(cs_main);
+
+ static int nHighestFastAnnounce = 0;
+ if (pindex->nHeight <= nHighestFastAnnounce)
+ return;
+ nHighestFastAnnounce = pindex->nHeight;
+
+ bool fWitnessEnabled = IsWitnessEnabled(pindex->pprev, Params().GetConsensus());
+ uint256 hashBlock(pblock->GetHash());
+
+ {
+ LOCK(cs_most_recent_block);
+ most_recent_block_hash = hashBlock;
+ most_recent_block = pblock;
+ most_recent_compact_block = pcmpctblock;
+ }
+
+ connman->ForEachNode([this, &pcmpctblock, pindex, &msgMaker, fWitnessEnabled, &hashBlock](CNode* pnode) {
+ // TODO: Avoid the repeated-serialization here
+ if (pnode->nVersion < INVALID_CB_NO_BAN_VERSION || pnode->fDisconnect)
+ return;
+ ProcessBlockAvailability(pnode->GetId());
+ CNodeState &state = *State(pnode->GetId());
+ // If the peer has, or we announced to them the previous block already,
+ // but we don't think they have this one, go ahead and announce it
+ if (state.fPreferHeaderAndIDs && (!fWitnessEnabled || state.fWantsCmpctWitness) &&
+ !PeerHasHeader(&state, pindex) && PeerHasHeader(&state, pindex->pprev)) {
+
+ LogPrint("net", "%s sending header-and-ids %s to peer %d\n", "PeerLogicValidation::NewPoWValidBlock",
+ hashBlock.ToString(), pnode->id);
+ connman->PushMessage(pnode, msgMaker.Make(NetMsgType::CMPCTBLOCK, *pcmpctblock));
+ state.pindexBestHeaderSent = pindex;
+ }
+ });
+}
+
void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
const int nNewHeight = pindexNew->nHeight;
connman->SetBestHeight(nNewHeight);
@@ -889,14 +934,13 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma
void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
- unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
vector<CInv> vNotFound;
CNetMsgMaker msgMaker(pfrom->GetSendVersion());
LOCK(cs_main);
while (it != pfrom->vRecvGetData.end()) {
// Don't bother if send buffer is too full to respond anyway
- if (pfrom->nSendSize >= nMaxSendBufferSize)
+ if (pfrom->fPauseSend)
break;
const CInv &inv = *it;
@@ -912,6 +956,21 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
BlockMap::iterator mi = mapBlockIndex.find(inv.hash);
if (mi != mapBlockIndex.end())
{
+ if (mi->second->nChainTx && !mi->second->IsValid(BLOCK_VALID_SCRIPTS) &&
+ mi->second->IsValid(BLOCK_VALID_TREE)) {
+ // If we have the block and all of its parents, but have not yet validated it,
+ // we might be in the middle of connecting it (ie in the unlock of cs_main
+ // before ActivateBestChain but after AcceptBlock).
+ // In this case, we need to run ActivateBestChain prior to checking the relay
+ // conditions below.
+ std::shared_ptr<const CBlock> a_recent_block;
+ {
+ LOCK(cs_most_recent_block);
+ a_recent_block = most_recent_block;
+ }
+ CValidationState dummy;
+ ActivateBestChain(dummy, Params(), a_recent_block);
+ }
if (chainActive.Contains(mi->second)) {
send = true;
} else {
@@ -1049,7 +1108,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
}
}
-uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params& chainparams) {
+uint32_t GetFetchFlags(CNode* pfrom, const CBlockIndex* pprev, const Consensus::Params& chainparams) {
uint32_t nFetchFlags = 0;
if ((pfrom->GetLocalServices() & NODE_WITNESS) && State(pfrom->GetId())->fHaveWitness) {
nFetchFlags |= MSG_WITNESS_FLAG;
@@ -1057,10 +1116,25 @@ uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params
return nFetchFlags;
}
+inline void static SendBlockTransactions(const CBlock& block, const BlockTransactionsRequest& req, CNode* pfrom, CConnman& connman) {
+ BlockTransactions resp(req);
+ for (size_t i = 0; i < req.indexes.size(); i++) {
+ if (req.indexes[i] >= block.vtx.size()) {
+ LOCK(cs_main);
+ Misbehaving(pfrom->GetId(), 100);
+ LogPrintf("Peer %d sent us a getblocktxn with out-of-bounds tx indices", pfrom->id);
+ return;
+ }
+ resp.txn[i] = block.vtx[req.indexes[i]];
+ }
+ LOCK(cs_main);
+ CNetMsgMaker msgMaker(pfrom->GetSendVersion());
+ int nSendFlags = State(pfrom->GetId())->fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
+ connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp));
+}
+
bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{
- unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
-
LogPrint("net", "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->id);
if (IsArgSet("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 0)) == 0)
{
@@ -1413,11 +1487,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Track requests for our stuff
GetMainSignals().Inventory(inv.hash);
-
- if (pfrom->nSendSize > (nMaxSendBufferSize * 2)) {
- Misbehaving(pfrom->GetId(), 50);
- return error("send buffer size() = %u", pfrom->nSendSize);
- }
}
if (!vToFetch.empty())
@@ -1453,10 +1522,27 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
uint256 hashStop;
vRecv >> locator >> hashStop;
+ // We might have announced the currently-being-connected tip using a
+ // compact block, which resulted in the peer sending a getblocks
+ // request, which we would otherwise respond to without the new block.
+ // To avoid this situation we simply verify that we are on our best
+ // known chain now. This is super overkill, but we handle it better
+ // for getheaders requests, and there are no known nodes which support
+ // compact blocks but still use getblocks to request blocks.
+ {
+ std::shared_ptr<const CBlock> a_recent_block;
+ {
+ LOCK(cs_most_recent_block);
+ a_recent_block = most_recent_block;
+ }
+ CValidationState dummy;
+ ActivateBestChain(dummy, Params(), a_recent_block);
+ }
+
LOCK(cs_main);
// Find the last block the caller has in the main chain
- CBlockIndex* pindex = FindForkInGlobalIndex(chainActive, locator);
+ const CBlockIndex* pindex = FindForkInGlobalIndex(chainActive, locator);
// Send the rest of the chain
if (pindex)
@@ -1496,6 +1582,18 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
BlockTransactionsRequest req;
vRecv >> req;
+ std::shared_ptr<const CBlock> recent_block;
+ {
+ LOCK(cs_most_recent_block);
+ if (most_recent_block_hash == req.blockhash)
+ recent_block = most_recent_block;
+ // Unlock cs_most_recent_block to avoid cs_main lock inversion
+ }
+ if (recent_block) {
+ SendBlockTransactions(*recent_block, req, pfrom, connman);
+ return true;
+ }
+
LOCK(cs_main);
BlockMap::iterator it = mapBlockIndex.find(req.blockhash);
@@ -1525,17 +1623,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
bool ret = ReadBlockFromDisk(block, it->second, chainparams.GetConsensus());
assert(ret);
- BlockTransactions resp(req);
- for (size_t i = 0; i < req.indexes.size(); i++) {
- if (req.indexes[i] >= block.vtx.size()) {
- Misbehaving(pfrom->GetId(), 100);
- LogPrintf("Peer %d sent us a getblocktxn with out-of-bounds tx indices", pfrom->id);
- return true;
- }
- resp.txn[i] = block.vtx[req.indexes[i]];
- }
- int nSendFlags = State(pfrom->GetId())->fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
- connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp));
+ SendBlockTransactions(block, req, pfrom, connman);
}
@@ -1552,7 +1640,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
}
CNodeState *nodestate = State(pfrom->GetId());
- CBlockIndex* pindex = NULL;
+ const CBlockIndex* pindex = NULL;
if (locator.IsNull())
{
// If locator is null, return the hashStop block
@@ -1583,6 +1671,14 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// if our peer has chainActive.Tip() (and thus we are sending an empty
// headers message). In both cases it's safe to update
// pindexBestHeaderSent to be our tip.
+ //
+ // It is important that we simply reset the BestHeaderSent value here,
+ // and not max(BestHeaderSent, newHeaderSent). We might have announced
+ // the currently-being-connected tip using a compact block, which
+ // resulted in the peer sending a headers request, which we respond to
+ // without the new block. By resetting the BestHeaderSent, we ensure we
+ // will re-announce the new block via headers (or compact blocks again)
+ // in the SendMessages logic.
nodestate->pindexBestHeaderSent = pindex ? pindex : chainActive.Tip();
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::HEADERS, vHeaders));
}
@@ -1778,7 +1874,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
}
}
- CBlockIndex *pindex = NULL;
+ const CBlockIndex *pindex = NULL;
CValidationState state;
if (!ProcessNewBlockHeaders({cmpctblock.header}, state, chainparams, &pindex)) {
int nDoS;
@@ -2054,7 +2150,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
return true;
}
- CBlockIndex *pindexLast = NULL;
+ const CBlockIndex *pindexLast = NULL;
{
LOCK(cs_main);
CNodeState *nodestate = State(pfrom->GetId());
@@ -2131,8 +2227,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// If this set of headers is valid and ends in a block with at least as
// much work as our tip, download as much as possible.
if (fCanDirectFetch && pindexLast->IsValid(BLOCK_VALID_TREE) && chainActive.Tip()->nChainWork <= pindexLast->nChainWork) {
- vector<CBlockIndex *> vToFetch;
- CBlockIndex *pindexWalk = pindexLast;
+ vector<const CBlockIndex *> vToFetch;
+ const CBlockIndex *pindexWalk = pindexLast;
// Calculate all the blocks we'd need to switch to pindexLast, up to a limit.
while (pindexWalk && !chainActive.Contains(pindexWalk) && vToFetch.size() <= MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
if (!(pindexWalk->nStatus & BLOCK_HAVE_DATA) &&
@@ -2154,7 +2250,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
} else {
vector<CInv> vGetData;
// Download as much as possible, from earliest to latest.
- BOOST_REVERSE_FOREACH(CBlockIndex *pindex, vToFetch) {
+ BOOST_REVERSE_FOREACH(const CBlockIndex *pindex, vToFetch) {
if (nodestate->nBlocksInFlight >= MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
// Can't download any more from this peer
break;
@@ -2450,14 +2546,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
return true;
}
-// requires LOCK(cs_vRecvMsg)
bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{
const CChainParams& chainparams = Params();
- unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
- //if (fDebug)
- // LogPrintf("%s(%u messages)\n", __func__, pfrom->vRecvMsg.size());
-
//
// Message format
// (4) message start
@@ -2466,40 +2557,40 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
// (4) checksum
// (x) data
//
- bool fOk = true;
+ bool fMoreWork = false;
if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
+ if (pfrom->fDisconnect)
+ return false;
+
// this maintains the order of responses
- if (!pfrom->vRecvGetData.empty()) return fOk;
+ if (!pfrom->vRecvGetData.empty()) return true;
- std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
- while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
// Don't bother if send buffer is too full to respond anyway
- if (pfrom->nSendSize >= nMaxSendBufferSize)
- break;
-
- // get next message
- CNetMessage& msg = *it;
-
- //if (fDebug)
- // LogPrintf("%s(message %u msgsz, %u bytes, complete:%s)\n", __func__,
- // msg.hdr.nMessageSize, msg.vRecv.size(),
- // msg.complete() ? "Y" : "N");
-
- // end, if an incomplete message is found
- if (!msg.complete())
- break;
-
- // at this point, any failure means we can delete the current message
- it++;
+ if (pfrom->fPauseSend)
+ return false;
+ std::list<CNetMessage> msgs;
+ {
+ LOCK(pfrom->cs_vProcessMsg);
+ if (pfrom->vProcessMsg.empty())
+ return false;
+ // Just take one message
+ msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
+ pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE;
+ pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize();
+ fMoreWork = !pfrom->vProcessMsg.empty();
+ }
+ CNetMessage& msg(msgs.front());
+
+ msg.SetVersion(pfrom->GetRecvVersion());
// Scan for message start
if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) {
LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id);
- fOk = false;
- break;
+ pfrom->fDisconnect = true;
+ return false;
}
// Read header
@@ -2507,7 +2598,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (!hdr.IsValid(chainparams.MessageStart()))
{
LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id);
- continue;
+ return fMoreWork;
}
string strCommand = hdr.GetCommand();
@@ -2523,7 +2614,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
SanitizeString(strCommand), nMessageSize,
HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE),
HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE));
- continue;
+ return fMoreWork;
}
// Process message
@@ -2532,7 +2623,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
{
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc);
if (interruptMsgProc)
- return true;
+ return false;
+ if (!pfrom->vRecvGetData.empty())
+ fMoreWork = true;
}
catch (const std::ios_base::failure& e)
{
@@ -2566,14 +2659,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (!fRet)
LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id);
- break;
- }
-
- // In case the connection got shut down, its receive buffer was wiped
- if (!pfrom->fDisconnect)
- pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
-
- return fOk;
+ return fMoreWork;
}
class CompareInvMempoolOrder
@@ -2745,7 +2831,7 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
bool fRevertToInv = ((!state.fPreferHeaders &&
(!state.fPreferHeaderAndIDs || pto->vBlockHashesToAnnounce.size() > 1)) ||
pto->vBlockHashesToAnnounce.size() > MAX_BLOCKS_TO_ANNOUNCE);
- CBlockIndex *pBestIndex = NULL; // last header queued for delivery
+ const CBlockIndex *pBestIndex = NULL; // last header queued for delivery
ProcessBlockAvailability(pto->id); // ensure pindexBestKnownBlock is up-to-date
if (!fRevertToInv) {
@@ -2756,7 +2842,7 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
BOOST_FOREACH(const uint256 &hash, pto->vBlockHashesToAnnounce) {
BlockMap::iterator mi = mapBlockIndex.find(hash);
assert(mi != mapBlockIndex.end());
- CBlockIndex *pindex = mi->second;
+ const CBlockIndex *pindex = mi->second;
if (chainActive[pindex->nHeight] != pindex) {
// Bail out if we reorged away from this block
fRevertToInv = true;
@@ -2802,13 +2888,29 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
// probably means we're doing an initial-ish-sync or they're slow
LogPrint("net", "%s sending header-and-ids %s to peer %d\n", __func__,
vHeaders.front().GetHash().ToString(), pto->id);
- //TODO: Shouldn't need to reload block from disk, but requires refactor
- CBlock block;
- bool ret = ReadBlockFromDisk(block, pBestIndex, consensusParams);
- assert(ret);
- CBlockHeaderAndShortTxIDs cmpctblock(block, state.fWantsCmpctWitness);
+
int nSendFlags = state.fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
- connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
+
+ bool fGotBlockFromCache = false;
+ {
+ LOCK(cs_most_recent_block);
+ if (most_recent_block_hash == pBestIndex->GetBlockHash()) {
+ if (state.fWantsCmpctWitness)
+ connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, *most_recent_compact_block));
+ else {
+ CBlockHeaderAndShortTxIDs cmpctblock(*most_recent_block, state.fWantsCmpctWitness);
+ connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
+ }
+ fGotBlockFromCache = true;
+ }
+ }
+ if (!fGotBlockFromCache) {
+ CBlock block;
+ bool ret = ReadBlockFromDisk(block, pBestIndex, consensusParams);
+ assert(ret);
+ CBlockHeaderAndShortTxIDs cmpctblock(block, state.fWantsCmpctWitness);
+ connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
+ }
state.pindexBestHeaderSent = pBestIndex;
} else if (state.fPreferHeaders) {
if (vHeaders.size() > 1) {
@@ -2833,7 +2935,7 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
const uint256 &hashToAnnounce = pto->vBlockHashesToAnnounce.back();
BlockMap::iterator mi = mapBlockIndex.find(hashToAnnounce);
assert(mi != mapBlockIndex.end());
- CBlockIndex *pindex = mi->second;
+ const CBlockIndex *pindex = mi->second;
// Warn if we're announcing a block that is not on the main chain.
// This should be very rare and could be optimized out.
@@ -3018,10 +3120,10 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
//
vector<CInv> vGetData;
if (!pto->fClient && (fFetch || !IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
- vector<CBlockIndex*> vToDownload;
+ vector<const CBlockIndex*> vToDownload;
NodeId staller = -1;
FindNextBlocksToDownload(pto->GetId(), MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller, consensusParams);
- BOOST_FOREACH(CBlockIndex *pindex, vToDownload) {
+ BOOST_FOREACH(const CBlockIndex *pindex, vToDownload) {
uint32_t nFetchFlags = GetFetchFlags(pto, pindex->pprev, consensusParams);
vGetData.push_back(CInv(MSG_BLOCK | nFetchFlags, pindex->GetBlockHash()));
MarkBlockAsInFlight(pto->GetId(), pindex->GetBlockHash(), consensusParams, pindex);
diff --git a/src/net_processing.h b/src/net_processing.h
index 230d805bd4..eaa0305136 100644
--- a/src/net_processing.h
+++ b/src/net_processing.h
@@ -24,6 +24,7 @@ public:
virtual void SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, int nPosInBlock);
virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload);
virtual void BlockChecked(const CBlock& block, const CValidationState& state);
+ virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& pblock);
};
struct CNodeStateStats {
@@ -46,6 +47,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
* @param[in] pto The node which we are sending messages to.
* @param[in] connman The connection manager for that node.
* @param[in] interrupt Interrupt condition for processing threads
+ * @return True if there is more work to be done
*/
bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interrupt);
diff --git a/src/validation.cpp b/src/validation.cpp
index 625307b6a7..9013e82ce9 100644
--- a/src/validation.cpp
+++ b/src/validation.cpp
@@ -2414,6 +2414,11 @@ static void NotifyHeaderTip() {
* that is already loaded (to avoid loading it again from disk).
*/
bool ActivateBestChain(CValidationState &state, const CChainParams& chainparams, std::shared_ptr<const CBlock> pblock) {
+ // Note that while we're often called here from ProcessNewBlock, this is
+ // far from a guarantee. Things in the P2P/RPC will often end up calling
+ // us in the middle of ProcessNewBlock - do not assume pblock is set
+ // sanely for performance or correctness!
+
CBlockIndex *pindexMostWork = NULL;
CBlockIndex *pindexNewTip = NULL;
do {
@@ -3057,14 +3062,18 @@ static bool AcceptBlockHeader(const CBlockHeader& block, CValidationState& state
}
// Exposed wrapper for AcceptBlockHeader
-bool ProcessNewBlockHeaders(const std::vector<CBlockHeader>& headers, CValidationState& state, const CChainParams& chainparams, CBlockIndex** ppindex)
+bool ProcessNewBlockHeaders(const std::vector<CBlockHeader>& headers, CValidationState& state, const CChainParams& chainparams, const CBlockIndex** ppindex)
{
{
LOCK(cs_main);
for (const CBlockHeader& header : headers) {
- if (!AcceptBlockHeader(header, state, chainparams, ppindex)) {
+ CBlockIndex *pindex = NULL; // Use a temp pindex instead of ppindex to avoid a const_cast
+ if (!AcceptBlockHeader(header, state, chainparams, &pindex)) {
return false;
}
+ if (ppindex) {
+ *ppindex = pindex;
+ }
}
}
NotifyHeaderTip();
@@ -3072,8 +3081,10 @@ bool ProcessNewBlockHeaders(const std::vector<CBlockHeader>& headers, CValidatio
}
/** Store block on disk. If dbp is non-NULL, the file is known to already reside on disk */
-static bool AcceptBlock(const CBlock& block, CValidationState& state, const CChainParams& chainparams, CBlockIndex** ppindex, bool fRequested, const CDiskBlockPos* dbp, bool* fNewBlock)
+static bool AcceptBlock(const std::shared_ptr<const CBlock>& pblock, CValidationState& state, const CChainParams& chainparams, CBlockIndex** ppindex, bool fRequested, const CDiskBlockPos* dbp, bool* fNewBlock)
{
+ const CBlock& block = *pblock;
+
if (fNewBlock) *fNewBlock = false;
AssertLockHeld(cs_main);
@@ -3119,6 +3130,11 @@ static bool AcceptBlock(const CBlock& block, CValidationState& state, const CCha
return error("%s: %s", __func__, FormatStateMessage(state));
}
+ // Header is valid/has work, merkle tree and segwit merkle tree are good...RELAY NOW
+ // (but if it does not build on our best tip, let the SendMessages loop relay it)
+ if (!IsInitialBlockDownload() && chainActive.Tip() == pindex->pprev)
+ GetMainSignals().NewPoWValidBlock(pindex, pblock);
+
int nHeight = pindex->nHeight;
// Write block to history file
@@ -3153,7 +3169,7 @@ bool ProcessNewBlock(const CChainParams& chainparams, const std::shared_ptr<cons
CBlockIndex *pindex = NULL;
if (fNewBlock) *fNewBlock = false;
CValidationState state;
- bool ret = AcceptBlock(*pblock, state, chainparams, &pindex, fForceProcessing, NULL, fNewBlock);
+ bool ret = AcceptBlock(pblock, state, chainparams, &pindex, fForceProcessing, NULL, fNewBlock);
CheckBlockIndex(chainparams.GetConsensus());
if (!ret) {
GetMainSignals().BlockChecked(*pblock, state);
@@ -3810,7 +3826,8 @@ bool LoadExternalBlockFile(const CChainParams& chainparams, FILE* fileIn, CDiskB
dbp->nPos = nBlockPos;
blkdat.SetLimit(nBlockPos + nSize);
blkdat.SetPos(nBlockPos);
- CBlock block;
+ std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>();
+ CBlock& block = *pblock;
blkdat >> block;
nRewind = blkdat.GetPos();
@@ -3828,7 +3845,7 @@ bool LoadExternalBlockFile(const CChainParams& chainparams, FILE* fileIn, CDiskB
if (mapBlockIndex.count(hash) == 0 || (mapBlockIndex[hash]->nStatus & BLOCK_HAVE_DATA) == 0) {
LOCK(cs_main);
CValidationState state;
- if (AcceptBlock(block, state, chainparams, NULL, true, dbp, NULL))
+ if (AcceptBlock(pblock, state, chainparams, NULL, true, dbp, NULL))
nLoaded++;
if (state.IsError())
break;
@@ -3855,16 +3872,17 @@ bool LoadExternalBlockFile(const CChainParams& chainparams, FILE* fileIn, CDiskB
std::pair<std::multimap<uint256, CDiskBlockPos>::iterator, std::multimap<uint256, CDiskBlockPos>::iterator> range = mapBlocksUnknownParent.equal_range(head);
while (range.first != range.second) {
std::multimap<uint256, CDiskBlockPos>::iterator it = range.first;
- if (ReadBlockFromDisk(block, it->second, chainparams.GetConsensus()))
+ std::shared_ptr<CBlock> pblockrecursive = std::make_shared<CBlock>();
+ if (ReadBlockFromDisk(*pblockrecursive, it->second, chainparams.GetConsensus()))
{
- LogPrint("reindex", "%s: Processing out of order child %s of %s\n", __func__, block.GetHash().ToString(),
+ LogPrint("reindex", "%s: Processing out of order child %s of %s\n", __func__, pblockrecursive->GetHash().ToString(),
head.ToString());
LOCK(cs_main);
CValidationState dummy;
- if (AcceptBlock(block, dummy, chainparams, NULL, true, &it->second, NULL))
+ if (AcceptBlock(pblockrecursive, dummy, chainparams, NULL, true, &it->second, NULL))
{
nLoaded++;
- queue.push_back(block.GetHash());
+ queue.push_back(pblockrecursive->GetHash());
}
}
range.first++;
diff --git a/src/validation.h b/src/validation.h
index 40e8907724..43a37e46e5 100644
--- a/src/validation.h
+++ b/src/validation.h
@@ -246,7 +246,7 @@ bool ProcessNewBlock(const CChainParams& chainparams, const std::shared_ptr<cons
* @param[in] chainparams The params for the chain we want to connect to
* @param[out] ppindex If set, the pointer will be set to point to the last new block index object for the given headers
*/
-bool ProcessNewBlockHeaders(const std::vector<CBlockHeader>& block, CValidationState& state, const CChainParams& chainparams, CBlockIndex** ppindex=NULL);
+bool ProcessNewBlockHeaders(const std::vector<CBlockHeader>& block, CValidationState& state, const CChainParams& chainparams, const CBlockIndex** ppindex=NULL);
/** Check whether enough disk space is available for an incoming block */
bool CheckDiskSpace(uint64_t nAdditionalBytes = 0);
diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp
index 215c342dea..d4121a28bc 100644
--- a/src/validationinterface.cpp
+++ b/src/validationinterface.cpp
@@ -22,6 +22,7 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.BlockChecked.connect(boost::bind(&CValidationInterface::BlockChecked, pwalletIn, _1, _2));
g_signals.ScriptForMining.connect(boost::bind(&CValidationInterface::GetScriptForMining, pwalletIn, _1));
g_signals.BlockFound.connect(boost::bind(&CValidationInterface::ResetRequestCount, pwalletIn, _1));
+ g_signals.NewPoWValidBlock.connect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2));
}
void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
@@ -34,6 +35,7 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.UpdatedTransaction.disconnect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1));
g_signals.SyncTransaction.disconnect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2, _3));
g_signals.UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3));
+ g_signals.NewPoWValidBlock.disconnect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2));
}
void UnregisterAllValidationInterfaces() {
@@ -46,4 +48,5 @@ void UnregisterAllValidationInterfaces() {
g_signals.UpdatedTransaction.disconnect_all_slots();
g_signals.SyncTransaction.disconnect_all_slots();
g_signals.UpdatedBlockTip.disconnect_all_slots();
+ g_signals.NewPoWValidBlock.disconnect_all_slots();
}
diff --git a/src/validationinterface.h b/src/validationinterface.h
index 717026389c..594072719c 100644
--- a/src/validationinterface.h
+++ b/src/validationinterface.h
@@ -8,6 +8,7 @@
#include <boost/signals2/signal.hpp>
#include <boost/shared_ptr.hpp>
+#include <memory>
class CBlock;
class CBlockIndex;
@@ -40,6 +41,7 @@ protected:
virtual void BlockChecked(const CBlock&, const CValidationState&) {}
virtual void GetScriptForMining(boost::shared_ptr<CReserveScript>&) {};
virtual void ResetRequestCount(const uint256 &hash) {};
+ virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& block) {};
friend void ::RegisterValidationInterface(CValidationInterface*);
friend void ::UnregisterValidationInterface(CValidationInterface*);
friend void ::UnregisterAllValidationInterfaces();
@@ -66,6 +68,10 @@ struct CMainSignals {
boost::signals2::signal<void (boost::shared_ptr<CReserveScript>&)> ScriptForMining;
/** Notifies listeners that a block has been successfully mined */
boost::signals2::signal<void (const uint256 &)> BlockFound;
+ /**
+ * Notifies listeners that a block which builds directly on our current tip
+ * has been received and connected to the headers tree, though not validated yet */
+ boost::signals2::signal<void (const CBlockIndex *, const std::shared_ptr<const CBlock>&)> NewPoWValidBlock;
};
CMainSignals& GetMainSignals();