diff options
author | Cory Fields <cory-nospam-@coryfields.com> | 2017-07-06 13:40:09 -0400 |
---|---|---|
committer | MarcoFalke <falke.marco@gmail.com> | 2017-11-02 13:16:28 -0400 |
commit | dc897e53d8a6c29016a19c862ddd0b656ae8de3f (patch) | |
tree | 6ee77baaf72f7d5fa09cc05d717816736cd9a598 /src | |
parent | 8aee55af3d21fb79edf148e42c3bd10bf80f7ec8 (diff) |
net: use an interface class rather than signals for message processing
Drop boost signals in favor of a stateful class. This will allow the message
processing loop to actually move to net_processing in a future step.
Github-Pull: #10756
Rebased-From: 8ad663c1fa88d68843e45580deced56112343183
Diffstat (limited to 'src')
-rw-r--r-- | src/init.cpp | 5 | ||||
-rw-r--r-- | src/net.cpp | 20 | ||||
-rw-r--r-- | src/net.h | 25 | ||||
-rw-r--r-- | src/net_processing.cpp | 112 | ||||
-rw-r--r-- | src/net_processing.h | 36 | ||||
-rw-r--r-- | src/test/DoS_tests.cpp | 22 | ||||
-rw-r--r-- | src/test/test_bitcoin.cpp | 6 | ||||
-rw-r--r-- | src/test/test_bitcoin.h | 2 |
8 files changed, 103 insertions, 125 deletions
diff --git a/src/init.cpp b/src/init.cpp index c9bb827f82..62250664e3 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -194,11 +194,10 @@ void Shutdown() #endif MapPort(false); UnregisterValidationInterface(peerLogic.get()); - peerLogic.reset(); g_connman.reset(); + peerLogic.reset(); StopTorControl(); - UnregisterNodeSignals(GetNodeSignals()); if (fDumpMempoolLater && gArgs.GetArg("-persistmempool", DEFAULT_PERSIST_MEMPOOL)) { DumpMempool(); } @@ -1277,7 +1276,6 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler) peerLogic.reset(new PeerLogicValidation(&connman)); RegisterValidationInterface(peerLogic.get()); - RegisterNodeSignals(GetNodeSignals()); // sanitize comments per BIP-0014, format user agent and check total size std::vector<std::string> uacomments; @@ -1668,6 +1666,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler) connOptions.nMaxFeeler = 1; connOptions.nBestHeight = chainActive.Height(); connOptions.uiInterface = &uiInterface; + connOptions.m_msgproc = peerLogic.get(); connOptions.nSendBufferMaxSize = 1000*gArgs.GetArg("-maxsendbuffer", DEFAULT_MAXSENDBUFFER); connOptions.nReceiveFloodSize = 1000*gArgs.GetArg("-maxreceivebuffer", DEFAULT_MAXRECEIVEBUFFER); diff --git a/src/net.cpp b/src/net.cpp index 5bb1a82598..63ac833405 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -89,10 +89,6 @@ std::string strSubVersion; limitedmap<uint256, int64_t> mapAlreadyAskedFor(MAX_INV_SZ); -// Signals for message handling -static CNodeSignals g_signals; -CNodeSignals& GetNodeSignals() { return g_signals; } - void CConnman::AddOneShot(const std::string& strDest) { LOCK(cs_vOneShots); @@ -1114,7 +1110,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, addr_bind, "", true); pnode->AddRef(); pnode->fWhitelisted = whitelisted; - GetNodeSignals().InitializeNode(pnode, this); + m_msgproc->InitializeNode(pnode, this); LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToString()); @@ -1966,7 +1962,7 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai if (manual_connection) pnode->m_manual_connection = true; - GetNodeSignals().InitializeNode(pnode, this); + m_msgproc->InitializeNode(pnode, this); { LOCK(cs_vNodes); vNodes.push_back(pnode); @@ -1996,16 +1992,16 @@ void CConnman::ThreadMessageHandler() continue; // Receive messages - bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, this, flagInterruptMsgProc); + bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, this, flagInterruptMsgProc); fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); if (flagInterruptMsgProc) return; - // Send messages { LOCK(pnode->cs_sendProcessing); - GetNodeSignals().SendMessages(pnode, this, flagInterruptMsgProc); + m_msgproc->SendMessages(pnode, this, flagInterruptMsgProc); } + if (flagInterruptMsgProc) return; } @@ -2324,6 +2320,7 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) // // Start threads // + assert(m_msgproc); InterruptSocks5(false); interruptNet.reset(); flagInterruptMsgProc = false; @@ -2443,9 +2440,10 @@ void CConnman::DeleteNode(CNode* pnode) { assert(pnode); bool fUpdateConnectionTime = false; - GetNodeSignals().FinalizeNode(pnode->GetId(), fUpdateConnectionTime); - if(fUpdateConnectionTime) + m_msgproc->FinalizeNode(pnode->GetId(), fUpdateConnectionTime); + if(fUpdateConnectionTime) { addrman.Connected(pnode->addr); + } delete pnode; } @@ -33,7 +33,6 @@ #include <arpa/inet.h> #endif -#include <boost/signals2/signal.hpp> class CScheduler; class CNode; @@ -116,7 +115,7 @@ struct CSerializedNetMsg std::string command; }; - +class NetEventsInterface; class CConnman { public: @@ -138,6 +137,7 @@ public: int nMaxFeeler = 0; int nBestHeight = 0; CClientUIInterface* uiInterface = nullptr; + NetEventsInterface* m_msgproc = nullptr; unsigned int nSendBufferMaxSize = 0; unsigned int nReceiveFloodSize = 0; uint64_t nMaxOutboundTimeframe = 0; @@ -156,6 +156,7 @@ public: nMaxFeeler = connOptions.nMaxFeeler; nBestHeight = connOptions.nBestHeight; clientInterface = connOptions.uiInterface; + m_msgproc = connOptions.m_msgproc; nSendBufferMaxSize = connOptions.nSendBufferMaxSize; nReceiveFloodSize = connOptions.nReceiveFloodSize; nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe; @@ -396,6 +397,7 @@ private: int nMaxFeeler; std::atomic<int> nBestHeight; CClientUIInterface* clientInterface; + NetEventsInterface* m_msgproc; /** SipHasher seeds for deterministic randomness */ const uint64_t nSeed0, nSeed1; @@ -436,19 +438,18 @@ struct CombinerAll } }; -// Signals for message handling -struct CNodeSignals +/** + * Interface for message handling + */ +class NetEventsInterface { - boost::signals2::signal<bool (CNode*, CConnman*, std::atomic<bool>&), CombinerAll> ProcessMessages; - boost::signals2::signal<bool (CNode*, CConnman*, std::atomic<bool>&), CombinerAll> SendMessages; - boost::signals2::signal<void (CNode*, CConnman*)> InitializeNode; - boost::signals2::signal<void (NodeId, bool&)> FinalizeNode; +public: + virtual bool ProcessMessages(CNode* pnode, CConnman* connman, std::atomic<bool>& interrupt) = 0; + virtual bool SendMessages(CNode* pnode, CConnman* connman, std::atomic<bool>& interrupt) = 0; + virtual void InitializeNode(CNode* pnode, CConnman* connman) = 0; + virtual void FinalizeNode(NodeId id, bool& update_connection_time) = 0; }; - -CNodeSignals& GetNodeSignals(); - - enum { LOCAL_NONE, // unknown diff --git a/src/net_processing.cpp b/src/net_processing.cpp index e3d2a51e7e..22d7ee194b 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -123,11 +123,6 @@ namespace { std::deque<std::pair<int64_t, MapRelay::iterator>> vRelayExpiration; } // namespace -////////////////////////////////////////////////////////////////////////////// -// -// Registration of network node signals. -// - namespace { struct CBlockReject { @@ -265,49 +260,6 @@ void PushNodeVersion(CNode *pnode, CConnman* connman, int64_t nTime) } } -void InitializeNode(CNode *pnode, CConnman* connman) { - CAddress addr = pnode->addr; - std::string addrName = pnode->GetAddrName(); - NodeId nodeid = pnode->GetId(); - { - LOCK(cs_main); - mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName))); - } - if(!pnode->fInbound) - PushNodeVersion(pnode, connman, GetTime()); -} - -void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) { - fUpdateConnectionTime = false; - LOCK(cs_main); - CNodeState *state = State(nodeid); - - if (state->fSyncStarted) - nSyncStarted--; - - if (state->nMisbehavior == 0 && state->fCurrentlyConnected) { - fUpdateConnectionTime = true; - } - - for (const QueuedBlock& entry : state->vBlocksInFlight) { - mapBlocksInFlight.erase(entry.hash); - } - EraseOrphansFor(nodeid); - nPreferredDownload -= state->fPreferredDownload; - nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0); - assert(nPeersWithValidatedDownloads >= 0); - - mapNodeState.erase(nodeid); - - if (mapNodeState.empty()) { - // Do a consistency check after the last peer is removed. - assert(mapBlocksInFlight.empty()); - assert(nPreferredDownload == 0); - assert(nPeersWithValidatedDownloads == 0); - } - LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid); -} - // Requires cs_main. // Returns a bool indicating whether we requested this block. // Also used if a block was /not/ received and timed out or started with another peer @@ -543,6 +495,50 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<con } // namespace +void PeerLogicValidation::InitializeNode(CNode *pnode, CConnman* connman) { + CAddress addr = pnode->addr; + std::string addrName = pnode->GetAddrName(); + NodeId nodeid = pnode->GetId(); + { + LOCK(cs_main); + mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName))); + } + if(!pnode->fInbound) + PushNodeVersion(pnode, connman, GetTime()); +} + +void PeerLogicValidation::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) { + fUpdateConnectionTime = false; + LOCK(cs_main); + CNodeState *state = State(nodeid); + assert(state != nullptr); + + if (state->fSyncStarted) + nSyncStarted--; + + if (state->nMisbehavior == 0 && state->fCurrentlyConnected) { + fUpdateConnectionTime = true; + } + + for (const QueuedBlock& entry : state->vBlocksInFlight) { + mapBlocksInFlight.erase(entry.hash); + } + EraseOrphansFor(nodeid); + nPreferredDownload -= state->fPreferredDownload; + nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0); + assert(nPeersWithValidatedDownloads >= 0); + + mapNodeState.erase(nodeid); + + if (mapNodeState.empty()) { + // Do a consistency check after the last peer is removed. + assert(mapBlocksInFlight.empty()); + assert(nPreferredDownload == 0); + assert(nPeersWithValidatedDownloads == 0); + } + LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid); +} + bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { LOCK(cs_main); CNodeState *state = State(nodeid); @@ -558,22 +554,6 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { return true; } -void RegisterNodeSignals(CNodeSignals& nodeSignals) -{ - nodeSignals.ProcessMessages.connect(&ProcessMessages); - nodeSignals.SendMessages.connect(&SendMessages); - nodeSignals.InitializeNode.connect(&InitializeNode); - nodeSignals.FinalizeNode.connect(&FinalizeNode); -} - -void UnregisterNodeSignals(CNodeSignals& nodeSignals) -{ - nodeSignals.ProcessMessages.disconnect(&ProcessMessages); - nodeSignals.SendMessages.disconnect(&SendMessages); - nodeSignals.InitializeNode.disconnect(&InitializeNode); - nodeSignals.FinalizeNode.disconnect(&FinalizeNode); -} - ////////////////////////////////////////////////////////////////////////////// // // mapOrphanTransactions @@ -2672,7 +2652,7 @@ static bool SendRejectsAndCheckIfBanned(CNode* pnode, CConnman* connman) return false; } -bool ProcessMessages(CNode* pfrom, CConnman* connman, const std::atomic<bool>& interruptMsgProc) +bool PeerLogicValidation::ProcessMessages(CNode* pfrom, CConnman* connman, std::atomic<bool>& interruptMsgProc) { const CChainParams& chainparams = Params(); // @@ -2809,7 +2789,7 @@ public: } }; -bool SendMessages(CNode* pto, CConnman* connman, const std::atomic<bool>& interruptMsgProc) +bool PeerLogicValidation::SendMessages(CNode* pto, CConnman* connman, std::atomic<bool>& interruptMsgProc) { const Consensus::Params& consensusParams = Params().GetConsensus(); { diff --git a/src/net_processing.h b/src/net_processing.h index 84ec7a7662..d79b74fcb7 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -22,22 +22,32 @@ static const unsigned int DEFAULT_BLOCK_RECONSTRUCTION_EXTRA_TXN = 100; static constexpr int64_t HEADERS_DOWNLOAD_TIMEOUT_BASE = 15 * 60 * 1000000; // 15 minutes static constexpr int64_t HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER = 1000; // 1ms/header -/** Register with a network node to receive its signals */ -void RegisterNodeSignals(CNodeSignals& nodeSignals); -/** Unregister a network node */ -void UnregisterNodeSignals(CNodeSignals& nodeSignals); - -class PeerLogicValidation : public CValidationInterface { +class PeerLogicValidation : public CValidationInterface, public NetEventsInterface { private: CConnman* connman; public: - PeerLogicValidation(CConnman* connmanIn); + explicit PeerLogicValidation(CConnman* connman); void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected, const std::vector<CTransactionRef>& vtxConflicted) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; void BlockChecked(const CBlock& block, const CValidationState& state) override; void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& pblock) override; + + + void InitializeNode(CNode* pnode, CConnman* connman) override; + void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) override; + /** Process protocol messages received from a given node */ + bool ProcessMessages(CNode* pfrom, CConnman* connman, std::atomic<bool>& interrupt) override; + /** + * Send queued protocol messages to be sent to a give node. + * + * @param[in] pto The node which we are sending messages to. + * @param[in] connman The connection manager for that node. + * @param[in] interrupt Interrupt condition for processing threads + * @return True if there is more work to be done + */ + bool SendMessages(CNode* pto, CConnman* connman, std::atomic<bool>& interrupt) override; }; struct CNodeStateStats { @@ -52,16 +62,4 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats); /** Increase a node's misbehavior score. */ void Misbehaving(NodeId nodeid, int howmuch); -/** Process protocol messages received from a given node */ -bool ProcessMessages(CNode* pfrom, CConnman* connman, const std::atomic<bool>& interrupt); -/** - * Send queued protocol messages to be sent to a give node. - * - * @param[in] pto The node which we are sending messages to. - * @param[in] connman The connection manager for that node. - * @param[in] interrupt Interrupt condition for processing threads - * @return True if there is more work to be done - */ -bool SendMessages(CNode* pto, CConnman* connman, const std::atomic<bool>& interrupt); - #endif // BITCOIN_NET_PROCESSING_H diff --git a/src/test/DoS_tests.cpp b/src/test/DoS_tests.cpp index 39a75f7c82..fc5ef84ad2 100644 --- a/src/test/DoS_tests.cpp +++ b/src/test/DoS_tests.cpp @@ -50,26 +50,26 @@ BOOST_AUTO_TEST_CASE(DoS_banning) CAddress addr1(ip(0xa0b0c001), NODE_NONE); CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 0, 0, CAddress(), "", true); dummyNode1.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(&dummyNode1, connman); + peerLogic->InitializeNode(&dummyNode1, connman); dummyNode1.nVersion = 1; dummyNode1.fSuccessfullyConnected = true; Misbehaving(dummyNode1.GetId(), 100); // Should get banned - SendMessages(&dummyNode1, connman, interruptDummy); + peerLogic->SendMessages(&dummyNode1, connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr1)); BOOST_CHECK(!connman->IsBanned(ip(0xa0b0c001|0x0000ff00))); // Different IP, not banned CAddress addr2(ip(0xa0b0c002), NODE_NONE); CNode dummyNode2(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr2, 1, 1, CAddress(), "", true); dummyNode2.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(&dummyNode2, connman); + peerLogic->InitializeNode(&dummyNode2, connman); dummyNode2.nVersion = 1; dummyNode2.fSuccessfullyConnected = true; Misbehaving(dummyNode2.GetId(), 50); - SendMessages(&dummyNode2, connman, interruptDummy); + peerLogic->SendMessages(&dummyNode2, connman, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr2)); // 2 not banned yet... BOOST_CHECK(connman->IsBanned(addr1)); // ... but 1 still should be Misbehaving(dummyNode2.GetId(), 50); - SendMessages(&dummyNode2, connman, interruptDummy); + peerLogic->SendMessages(&dummyNode2, connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr2)); } @@ -82,17 +82,17 @@ BOOST_AUTO_TEST_CASE(DoS_banscore) CAddress addr1(ip(0xa0b0c001), NODE_NONE); CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 3, 1, CAddress(), "", true); dummyNode1.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(&dummyNode1, connman); + peerLogic->InitializeNode(&dummyNode1, connman); dummyNode1.nVersion = 1; dummyNode1.fSuccessfullyConnected = true; Misbehaving(dummyNode1.GetId(), 100); - SendMessages(&dummyNode1, connman, interruptDummy); + peerLogic->SendMessages(&dummyNode1, connman, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr1)); Misbehaving(dummyNode1.GetId(), 10); - SendMessages(&dummyNode1, connman, interruptDummy); + peerLogic->SendMessages(&dummyNode1, connman, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr1)); Misbehaving(dummyNode1.GetId(), 1); - SendMessages(&dummyNode1, connman, interruptDummy); + peerLogic->SendMessages(&dummyNode1, connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr1)); gArgs.ForceSetArg("-banscore", std::to_string(DEFAULT_BANSCORE_THRESHOLD)); } @@ -108,12 +108,12 @@ BOOST_AUTO_TEST_CASE(DoS_bantime) CAddress addr(ip(0xa0b0c001), NODE_NONE); CNode dummyNode(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr, 4, 4, CAddress(), "", true); dummyNode.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(&dummyNode, connman); + peerLogic->InitializeNode(&dummyNode, connman); dummyNode.nVersion = 1; dummyNode.fSuccessfullyConnected = true; Misbehaving(dummyNode.GetId(), 100); - SendMessages(&dummyNode, connman, interruptDummy); + peerLogic->SendMessages(&dummyNode, connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr)); SetMockTime(nStartTime+60*60); diff --git a/src/test/test_bitcoin.cpp b/src/test/test_bitcoin.cpp index 194f62ca11..045655983c 100644 --- a/src/test/test_bitcoin.cpp +++ b/src/test/test_bitcoin.cpp @@ -48,7 +48,6 @@ BasicTestingSetup::BasicTestingSetup(const std::string& chainName) BasicTestingSetup::~BasicTestingSetup() { ECC_Stop(); - g_connman.reset(); } TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(chainName) @@ -86,16 +85,17 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha threadGroup.create_thread(&ThreadScriptCheck); g_connman = std::unique_ptr<CConnman>(new CConnman(0x1337, 0x1337)); // Deterministic randomness for tests. connman = g_connman.get(); - RegisterNodeSignals(GetNodeSignals()); + peerLogic.reset(new PeerLogicValidation(connman)); } TestingSetup::~TestingSetup() { - UnregisterNodeSignals(GetNodeSignals()); threadGroup.interrupt_all(); threadGroup.join_all(); GetMainSignals().FlushBackgroundCallbacks(); GetMainSignals().UnregisterBackgroundSignalScheduler(); + g_connman.reset(); + peerLogic.reset(); UnloadBlockIndex(); delete pcoinsTip; delete pcoinsdbview; diff --git a/src/test/test_bitcoin.h b/src/test/test_bitcoin.h index dd3b13c8c8..5a39f8ab82 100644 --- a/src/test/test_bitcoin.h +++ b/src/test/test_bitcoin.h @@ -49,12 +49,14 @@ struct BasicTestingSetup { * Included are data directory, coins database, script check threads setup. */ class CConnman; +class PeerLogicValidation; struct TestingSetup: public BasicTestingSetup { CCoinsViewDB *pcoinsdbview; fs::path pathTemp; boost::thread_group threadGroup; CConnman* connman; CScheduler scheduler; + std::unique_ptr<PeerLogicValidation> peerLogic; TestingSetup(const std::string& chainName = CBaseChainParams::MAIN); ~TestingSetup(); |