diff options
Diffstat (limited to 'src/net.cpp')
-rw-r--r-- | src/net.cpp | 422 |
1 files changed, 237 insertions, 185 deletions
diff --git a/src/net.cpp b/src/net.cpp index ce87150ae3..4d0d781d6d 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1,5 +1,5 @@ // Copyright (c) 2009-2010 Satoshi Nakamoto -// Copyright (c) 2009-2015 The Bitcoin Core developers +// Copyright (c) 2009-2016 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. @@ -35,8 +35,6 @@ #include <miniupnpc/upnperrors.h> #endif -#include <boost/filesystem.hpp> -#include <boost/thread.hpp> #include <math.h> @@ -74,7 +72,6 @@ bool fRelayTxes = true; CCriticalSection cs_mapLocalHost; std::map<CNetAddr, LocalServiceInfo> mapLocalHost; static bool vfLimited[NET_MAX] = {}; -static CNode* pnodeLocalHost = NULL; std::string strSubVersion; limitedmap<uint256, int64_t> mapAlreadyAskedFor(MAX_INV_SZ); @@ -345,8 +342,8 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo CNode* pnode = FindNode((CService)addrConnect); if (pnode) { - pnode->AddRef(); - return pnode; + LogPrintf("Failed to open new connection, already connected\n"); + return NULL; } } @@ -372,18 +369,16 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo // In that case, drop the connection that was just created, and return the existing CNode instead. // Also store the name we used to connect in that CNode, so that future FindNode() calls to that // name catch this early. + LOCK(cs_vNodes); CNode* pnode = FindNode((CService)addrConnect); if (pnode) { - pnode->AddRef(); - { - LOCK(cs_vNodes); - if (pnode->addrName.empty()) { - pnode->addrName = std::string(pszDest); - } + if (pnode->addrName.empty()) { + pnode->addrName = std::string(pszDest); } CloseSocket(hSocket); - return pnode; + LogPrintf("Failed to open new connection, already connected\n"); + return NULL; } } @@ -394,13 +389,8 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false); pnode->nServicesExpected = ServiceFlags(addrConnect.nServices & nRelevantServices); - pnode->nTimeConnected = GetTime(); + pnode->nTimeConnected = GetSystemTimeInSeconds(); pnode->AddRef(); - GetNodeSignals().InitializeNode(pnode, *this); - { - LOCK(cs_vNodes); - vNodes.push_back(pnode); - } return pnode; } else if (!proxyConnectionFailed) { @@ -440,11 +430,6 @@ void CNode::CloseSocketDisconnect() LogPrint("net", "disconnecting peer=%d\n", id); CloseSocket(hSocket); } - - // in case this fails, we'll empty the recv buffer when the CNode is deleted - TRY_LOCK(cs_vRecvMsg, lockRecv); - if (lockRecv) - vRecvMsg.clear(); } void CConnman::ClearBanned() @@ -624,6 +609,7 @@ void CNode::copyStats(CNodeStats &stats) X(nVersion); X(cleanSubVer); X(fInbound); + X(fAddnode); X(nStartingHeight); X(nSendBytes); X(mapSendBytesPerMsgCmd); @@ -652,16 +638,18 @@ void CNode::copyStats(CNodeStats &stats) } #undef X -// requires LOCK(cs_vRecvMsg) bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete) { complete = false; + int64_t nTimeMicros = GetTimeMicros(); + nLastRecv = nTimeMicros / 1000000; + nRecvBytes += nBytes; while (nBytes > 0) { // get current incomplete message, or create a new one if (vRecvMsg.empty() || vRecvMsg.back().complete()) - vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, nRecvVersion)); + vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION)); CNetMessage& msg = vRecvMsg.back(); @@ -693,7 +681,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete assert(i != mapRecvBytesPerMsgCmd.end()); i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE; - msg.nTime = GetTimeMicros(); + msg.nTime = nTimeMicros; complete = true; } } @@ -701,6 +689,33 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete return true; } +void CNode::SetSendVersion(int nVersionIn) +{ + // Send version may only be changed in the version message, and + // only one version message is allowed per session. We can therefore + // treat this value as const and even atomic as long as it's only used + // once a version message has been successfully processed. Any attempt to + // set this twice is an error. + if (nSendVersion != 0) { + error("Send version already set for node: %i. Refusing to change from %i to %i", id, nSendVersion, nVersionIn); + } else { + nSendVersion = nVersionIn; + } +} + +int CNode::GetSendVersion() const +{ + // The send version should always be explicitly set to + // INIT_PROTO_VERSION rather than using this value until SetSendVersion + // has been called. + if (nSendVersion == 0) { + error("Requesting unset send version for node: %i. Using %i", id, INIT_PROTO_VERSION); + return INIT_PROTO_VERSION; + } + return nSendVersion; +} + + int CNetMessage::readHeader(const char *pch, unsigned int nBytes) { // copy data to temporary parsing buffer @@ -766,7 +781,7 @@ const uint256& CNetMessage::GetMessageHash() const // requires LOCK(cs_vSend) -size_t SocketSendData(CNode *pnode) +size_t CConnman::SocketSendData(CNode *pnode) const { auto it = pnode->vSendMsg.begin(); size_t nSentSize = 0; @@ -776,13 +791,14 @@ size_t SocketSendData(CNode *pnode) assert(data.size() > pnode->nSendOffset); int nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); if (nBytes > 0) { - pnode->nLastSend = GetTime(); + pnode->nLastSend = GetSystemTimeInSeconds(); pnode->nSendBytes += nBytes; pnode->nSendOffset += nBytes; nSentSize += nBytes; if (pnode->nSendOffset == data.size()) { pnode->nSendOffset = 0; pnode->nSendSize -= data.size(); + pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize; it++; } else { // could not send full message; stop sending more @@ -1043,7 +1059,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { void CConnman::ThreadSocketHandler() { unsigned int nPrevNodeCount = 0; - while (true) + while (!interruptNet) { // // Disconnect nodes @@ -1054,8 +1070,7 @@ void CConnman::ThreadSocketHandler() std::vector<CNode*> vNodesCopy = vNodes; BOOST_FOREACH(CNode* pnode, vNodesCopy) { - if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0)) + if (pnode->fDisconnect) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -1067,8 +1082,7 @@ void CConnman::ThreadSocketHandler() pnode->CloseSocketDisconnect(); // hold in disconnected pool until all refs are released - if (pnode->fNetworkNode || pnode->fInbound) - pnode->Release(); + pnode->Release(); vNodesDisconnected.push_back(pnode); } } @@ -1086,13 +1100,9 @@ void CConnman::ThreadSocketHandler() TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) - { TRY_LOCK(pnode->cs_inventory, lockInv); if (lockInv) fDelete = true; - } } } if (fDelete) @@ -1103,8 +1113,13 @@ void CConnman::ThreadSocketHandler() } } } - if(vNodes.size() != nPrevNodeCount) { - nPrevNodeCount = vNodes.size(); + size_t vNodesSize; + { + LOCK(cs_vNodes); + vNodesSize = vNodes.size(); + } + if(vNodesSize != nPrevNodeCount) { + nPrevNodeCount = vNodesSize; if(clientInterface) clientInterface->NotifyNumConnectionsChanged(nPrevNodeCount); } @@ -1147,29 +1162,19 @@ void CConnman::ThreadSocketHandler() // write buffer in this case before receiving more. This avoids // needlessly queueing received data, if the remote peer is not themselves // receiving data. This means properly utilizing TCP flow control signalling. - // * Otherwise, if there is no (complete) message in the receive buffer, - // or there is space left in the buffer, select() for receiving data. - // * (if neither of the above applies, there is certainly one message - // in the receiver buffer ready to be processed). - // Together, that means that at least one of the following is always possible, - // so we don't deadlock: - // * We send some data. - // * We wait for data to be received (and disconnect after timeout). - // * We process a message in the buffer (message handler thread). + // * Otherwise, if there is space left in the receive buffer, select() for + // receiving data. + // * Hand off all complete messages to the processor, to be handled without + // blocking here. { - TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend) { - if (!pnode->vSendMsg.empty()) { - FD_SET(pnode->hSocket, &fdsetSend); - continue; - } + LOCK(pnode->cs_vSend); + if (!pnode->vSendMsg.empty()) { + FD_SET(pnode->hSocket, &fdsetSend); + continue; } } { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv && ( - pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() || - pnode->GetTotalRecvSize() <= GetReceiveFloodSize())) + if (!pnode->fPauseRecv) FD_SET(pnode->hSocket, &fdsetRecv); } } @@ -1177,7 +1182,8 @@ void CConnman::ThreadSocketHandler() int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); - boost::this_thread::interruption_point(); + if (interruptNet) + return; if (nSelect == SOCKET_ERROR) { @@ -1190,7 +1196,8 @@ void CConnman::ThreadSocketHandler() } FD_ZERO(&fdsetSend); FD_ZERO(&fdsetError); - MilliSleep(timeout.tv_usec/1000); + if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000))) + return; } // @@ -1216,7 +1223,8 @@ void CConnman::ThreadSocketHandler() } BOOST_FOREACH(CNode* pnode, vNodesCopy) { - boost::this_thread::interruption_point(); + if (interruptNet) + return; // // Receive @@ -1225,8 +1233,6 @@ void CConnman::ThreadSocketHandler() continue; if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) { { // typical socket buffer is 8K-64K @@ -1237,11 +1243,23 @@ void CConnman::ThreadSocketHandler() bool notify = false; if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); - if(notify) - messageHandlerCondition.notify_one(); - pnode->nLastRecv = GetTime(); - pnode->nRecvBytes += nBytes; RecordBytesRecv(nBytes); + if (notify) { + size_t nSizeAdded = 0; + auto it(pnode->vRecvMsg.begin()); + for (; it != pnode->vRecvMsg.end(); ++it) { + if (!it->complete()) + break; + nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; + } + { + LOCK(pnode->cs_vProcessMsg); + pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + pnode->nProcessQueueSize += nSizeAdded; + pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; + } + WakeMessageHandler(); + } } else if (nBytes == 0) { @@ -1272,18 +1290,17 @@ void CConnman::ThreadSocketHandler() continue; if (FD_ISSET(pnode->hSocket, &fdsetSend)) { - TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend) { - size_t nBytes = SocketSendData(pnode); - if (nBytes) - RecordBytesSent(nBytes); + LOCK(pnode->cs_vSend); + size_t nBytes = SocketSendData(pnode); + if (nBytes) { + RecordBytesSent(nBytes); } } // // Inactivity checking // - int64_t nTime = GetTime(); + int64_t nTime = GetSystemTimeInSeconds(); if (nTime - pnode->nTimeConnected > 60) { if (pnode->nLastRecv == 0 || pnode->nLastSend == 0) @@ -1316,8 +1333,14 @@ void CConnman::ThreadSocketHandler() } } - - +void CConnman::WakeMessageHandler() +{ + { + std::lock_guard<std::mutex> lock(mutexMsgProc); + fMsgProcWake = true; + } + condMsgProc.notify_one(); +} @@ -1466,7 +1489,8 @@ void CConnman::ThreadDNSAddressSeed() // less influence on the network topology, and reduces traffic to the seeds. if ((addrman.size() > 0) && (!GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) { - MilliSleep(11 * 1000); + if (!interruptNet.sleep_for(std::chrono::seconds(11))) + return; LOCK(cs_vNodes); int nRelevant = 0; @@ -1566,21 +1590,23 @@ void CConnman::ProcessOneShot() void CConnman::ThreadOpenConnections() { // Connect to specific addresses - if (mapArgs.count("-connect") && mapMultiArgs["-connect"].size() > 0) + if (mapMultiArgs.count("-connect") && mapMultiArgs.at("-connect").size() > 0) { for (int64_t nLoop = 0;; nLoop++) { ProcessOneShot(); - BOOST_FOREACH(const std::string& strAddr, mapMultiArgs["-connect"]) + BOOST_FOREACH(const std::string& strAddr, mapMultiArgs.at("-connect")) { CAddress addr(CService(), NODE_NONE); OpenNetworkConnection(addr, false, NULL, strAddr.c_str()); for (int i = 0; i < 10 && i < nLoop; i++) { - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } @@ -1589,14 +1615,16 @@ void CConnman::ThreadOpenConnections() // Minimum time before next feeler connection (in microseconds). int64_t nNextFeeler = PoissonNextSend(nStart*1000*1000, FEELER_INTERVAL); - while (true) + while (!interruptNet) { ProcessOneShot(); - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; CSemaphoreGrant grant(*semOutbound); - boost::this_thread::interruption_point(); + if (interruptNet) + return; // Add seed nodes if DNS seeds are all down (an infrastructure attack?). if (addrman.size() == 0 && (GetTime() - nStart > 60)) { @@ -1622,7 +1650,12 @@ void CConnman::ThreadOpenConnections() { LOCK(cs_vNodes); BOOST_FOREACH(CNode* pnode, vNodes) { - if (!pnode->fInbound) { + if (!pnode->fInbound && !pnode->fAddnode) { + // Netgroups for inbound and addnode peers are not excluded because our goal here + // is to not use multiple of our limited outbound slots on a single netgroup + // but inbound and addnode peers do not use our outbound slots. Inbound peers + // also have the added issue that they're attacker controlled and could be used + // to prevent us from connecting to particular hosts if we used them here. setConnected.insert(pnode->addr.GetGroup()); nOutbound++; } @@ -1654,7 +1687,7 @@ void CConnman::ThreadOpenConnections() int64_t nANow = GetAdjustedTime(); int nTries = 0; - while (true) + while (!interruptNet) { CAddrInfo addr = addrman.Select(fFeeler); @@ -1697,7 +1730,8 @@ void CConnman::ThreadOpenConnections() if (fFeeler) { // Add small amount of random noise before connection to avoid synchronization. int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000); - MilliSleep(randsleep); + if (!interruptNet.sleep_for(std::chrono::milliseconds(randsleep))) + return; LogPrint("net", "Making feeler connection to %s\n", addrConnect.ToString()); } @@ -1762,34 +1796,46 @@ void CConnman::ThreadOpenAddedConnections() { { LOCK(cs_vAddedNodes); - vAddedNodes = mapMultiArgs["-addnode"]; + if (mapMultiArgs.count("-addnode")) + vAddedNodes = mapMultiArgs.at("-addnode"); } - for (unsigned int i = 0; true; i++) + while (true) { + CSemaphoreGrant grant(*semAddnode); std::vector<AddedNodeInfo> vInfo = GetAddedNodeInfo(); + bool tried = false; for (const AddedNodeInfo& info : vInfo) { if (!info.fConnected) { - CSemaphoreGrant grant(*semOutbound); + if (!grant.TryAcquire()) { + // If we've used up our semaphore and need a new one, lets not wait here since while we are waiting + // the addednodeinfo state might change. + break; + } // If strAddedNode is an IP/port, decode it immediately, so // OpenNetworkConnection can detect existing connections to that IP/port. + tried = true; CService service(LookupNumeric(info.strAddedNode.c_str(), Params().GetDefaultPort())); - OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false); - MilliSleep(500); + OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false, false, true); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } - - MilliSleep(120000); // Retry every 2 minutes + // Retry every 60 seconds if a connection was attempted, otherwise two seconds + if (!interruptNet.sleep_for(std::chrono::seconds(tried ? 60 : 2))) + return; } } // if successful, this moves the passed grant to the constructed node -bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound, const char *pszDest, bool fOneShot, bool fFeeler) +bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound, const char *pszDest, bool fOneShot, bool fFeeler, bool fAddnode) { // // Initiate outbound network connection // - boost::this_thread::interruption_point(); + if (interruptNet) { + return false; + } if (!fNetworkActive) { return false; } @@ -1802,28 +1848,30 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai return false; CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure); - boost::this_thread::interruption_point(); if (!pnode) return false; if (grantOutbound) grantOutbound->MoveTo(pnode->grantOutbound); - pnode->fNetworkNode = true; if (fOneShot) pnode->fOneShot = true; if (fFeeler) pnode->fFeeler = true; + if (fAddnode) + pnode->fAddnode = true; + + GetNodeSignals().InitializeNode(pnode, *this); + { + LOCK(cs_vNodes); + vNodes.push_back(pnode); + } return true; } - void CConnman::ThreadMessageHandler() { - boost::mutex condition_mutex; - boost::unique_lock<boost::mutex> lock(condition_mutex); - - while (true) + while (!flagInterruptMsgProc) { std::vector<CNode*> vNodesCopy; { @@ -1834,7 +1882,7 @@ void CConnman::ThreadMessageHandler() } } - bool fSleep = true; + bool fMoreWork = false; BOOST_FOREACH(CNode* pnode, vNodesCopy) { @@ -1842,31 +1890,18 @@ void CConnman::ThreadMessageHandler() continue; // Receive messages - { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) - { - if (!GetNodeSignals().ProcessMessages(pnode, *this)) - pnode->CloseSocketDisconnect(); - - if (pnode->nSendSize < GetSendBufferSize()) - { - if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete())) - { - fSleep = false; - } - } - } - } - boost::this_thread::interruption_point(); + bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); + fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); + if (flagInterruptMsgProc) + return; // Send messages { - TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend) - GetNodeSignals().SendMessages(pnode, *this); + LOCK(pnode->cs_sendProcessing); + GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc); } - boost::this_thread::interruption_point(); + if (flagInterruptMsgProc) + return; } { @@ -1875,8 +1910,11 @@ void CConnman::ThreadMessageHandler() pnode->Release(); } - if (fSleep) - messageHandlerCondition.timed_wait(lock, boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100)); + std::unique_lock<std::mutex> lock(mutexMsgProc); + if (!fMoreWork) { + condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; }); + } + fMsgProcWake = false; } } @@ -2064,10 +2102,13 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSe nSendBufferMaxSize = 0; nReceiveFloodSize = 0; semOutbound = NULL; + semAddnode = NULL; nMaxConnections = 0; nMaxOutbound = 0; + nMaxAddnode = 0; nBestHeight = 0; clientInterface = NULL; + flagInterruptMsgProc = false; } NodeId CConnman::GetNewNodeId() @@ -2075,7 +2116,7 @@ NodeId CConnman::GetNewNodeId() return nLastNodeId.fetch_add(1, std::memory_order_relaxed); } -bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options connOptions) +bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options connOptions) { nTotalBytesRecv = 0; nTotalBytesSent = 0; @@ -2086,10 +2127,11 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st nLocalServices = connOptions.nLocalServices; nMaxConnections = connOptions.nMaxConnections; nMaxOutbound = std::min((connOptions.nMaxOutbound), nMaxConnections); + nMaxAddnode = connOptions.nMaxAddnode; nMaxFeeler = connOptions.nMaxFeeler; nSendBufferMaxSize = connOptions.nSendBufferMaxSize; - nReceiveFloodSize = connOptions.nSendBufferMaxSize; + nReceiveFloodSize = connOptions.nReceiveFloodSize; nMaxOutboundLimit = connOptions.nMaxOutboundLimit; nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe; @@ -2138,39 +2180,40 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st // initialize semaphore semOutbound = new CSemaphore(std::min((nMaxOutbound + nMaxFeeler), nMaxConnections)); } - - if (pnodeLocalHost == NULL) { - CNetAddr local; - LookupHost("127.0.0.1", local, false); - - NodeId id = GetNewNodeId(); - uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); - - pnodeLocalHost = new CNode(id, nLocalServices, GetBestHeight(), INVALID_SOCKET, CAddress(CService(local, 0), nLocalServices), 0, nonce); - GetNodeSignals().InitializeNode(pnodeLocalHost, *this); + if (semAddnode == NULL) { + // initialize semaphore + semAddnode = new CSemaphore(nMaxAddnode); } // // Start threads // + InterruptSocks5(false); + interruptNet.reset(); + flagInterruptMsgProc = false; + + { + std::unique_lock<std::mutex> lock(mutexMsgProc); + fMsgProcWake = false; + } + + // Send and receive from sockets, accept connections + threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this))); if (!GetBoolArg("-dnsseed", true)) LogPrintf("DNS seeding disabled\n"); else - threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", boost::function<void()>(boost::bind(&CConnman::ThreadDNSAddressSeed, this)))); - - // Send and receive from sockets, accept connections - threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "net", boost::function<void()>(boost::bind(&CConnman::ThreadSocketHandler, this)))); + threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this))); // Initiate outbound connections from -addnode - threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "addcon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenAddedConnections, this)))); + threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this))); // Initiate outbound connections unless connect=0 - if (!mapArgs.count("-connect") || mapMultiArgs["-connect"].size() != 1 || mapMultiArgs["-connect"][0] != "0") - threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "opencon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenConnections, this)))); + if (!mapMultiArgs.count("-connect") || mapMultiArgs.at("-connect").size() != 1 || mapMultiArgs.at("-connect")[0] != "0") + threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this))); // Process messages - threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "msghand", boost::function<void()>(boost::bind(&CConnman::ThreadMessageHandler, this)))); + threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this))); // Dump network addresses scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL); @@ -2193,12 +2236,38 @@ public: } instance_of_cnetcleanup; -void CConnman::Stop() +void CConnman::Interrupt() { - LogPrintf("%s\n",__func__); + { + std::lock_guard<std::mutex> lock(mutexMsgProc); + flagInterruptMsgProc = true; + } + condMsgProc.notify_all(); + + interruptNet(); + InterruptSocks5(true); + if (semOutbound) for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++) semOutbound->post(); +} + +void CConnman::Stop() +{ + if (threadMessageHandler.joinable()) + threadMessageHandler.join(); + if (threadOpenConnections.joinable()) + threadOpenConnections.join(); + if (threadOpenAddedConnections.joinable()) + threadOpenAddedConnections.join(); + if (threadDNSAddressSeed.joinable()) + threadDNSAddressSeed.join(); + if (threadSocketHandler.joinable()) + threadSocketHandler.join(); + + if (semAddnode) + for (int i=0; i<nMaxAddnode; i++) + semOutbound->post(); if (fAddressesInitialized) { @@ -2227,9 +2296,8 @@ void CConnman::Stop() vhListenSocket.clear(); delete semOutbound; semOutbound = NULL; - if(pnodeLocalHost) - DeleteNode(pnodeLocalHost); - pnodeLocalHost = NULL; + delete semAddnode; + semAddnode = NULL; } void CConnman::DeleteNode(CNode* pnode) @@ -2244,6 +2312,7 @@ void CConnman::DeleteNode(CNode* pnode) CConnman::~CConnman() { + Interrupt(); Stop(); } @@ -2328,26 +2397,9 @@ void CConnman::GetNodeStats(std::vector<CNodeStats>& vstats) } } -bool CConnman::DisconnectAddress(const CNetAddr& netAddr) -{ - if (CNode* pnode = FindNode(netAddr)) { - pnode->fDisconnect = true; - return true; - } - return false; -} - -bool CConnman::DisconnectSubnet(const CSubNet& subNet) -{ - if (CNode* pnode = FindNode(subNet)) { - pnode->fDisconnect = true; - return true; - } - return false; -} - bool CConnman::DisconnectNode(const std::string& strNode) { + LOCK(cs_vNodes); if (CNode* pnode = FindNode(strNode)) { pnode->fDisconnect = true; return true; @@ -2366,16 +2418,6 @@ bool CConnman::DisconnectNode(NodeId id) return false; } -void CConnman::RelayTransaction(const CTransaction& tx) -{ - CInv inv(MSG_TX, tx.GetHash()); - LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) - { - pnode->PushInventory(inv); - } -} - void CConnman::RecordBytesRecv(uint64_t bytes) { LOCK(cs_totalBytesRecv); @@ -2522,16 +2564,16 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn nLastRecv = 0; nSendBytes = 0; nRecvBytes = 0; - nTimeConnected = GetTime(); + nTimeConnected = GetSystemTimeInSeconds(); nTimeOffset = 0; addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn; nVersion = 0; strSubVer = ""; fWhitelisted = false; fOneShot = false; + fAddnode = false; fClient = false; // set by version message fFeeler = false; - fNetworkNode = false; fSuccessfullyConnected = false; fDisconnect = false; nRefCount = 0; @@ -2559,6 +2601,9 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn minFeeFilter = 0; lastSentFeeFilter = 0; nextSendTimeFeeFilter = 0; + fPauseRecv = false; + fPauseSend = false; + nProcessQueueSize = 0; BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes()) mapRecvBytesPerMsgCmd[msg] = 0; @@ -2612,6 +2657,11 @@ void CNode::AskFor(const CInv& inv) mapAskFor.insert(std::make_pair(nRequestTime, inv)); } +bool CConnman::NodeFullyConnected(const CNode* pnode) +{ + return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; +} + void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { size_t nMessageSize = msg.data.size(); @@ -2638,6 +2688,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; pnode->nSendSize += nTotalSize; + if (pnode->nSendSize > nSendBufferMaxSize) + pnode->fPauseSend = true; pnode->vSendMsg.push_back(std::move(serializedHeader)); if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data)); @@ -2660,19 +2712,19 @@ bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func) break; } } - return found != nullptr && func(found); + return found != nullptr && NodeFullyConnected(found) && func(found); } int64_t PoissonNextSend(int64_t nNow, int average_interval_seconds) { return nNow + (int64_t)(log1p(GetRand(1ULL << 48) * -0.0000000000000035527136788 /* -1/2^48 */) * average_interval_seconds * -1000000.0 + 0.5); } -CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id) +CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id) const { return CSipHasher(nSeed0, nSeed1).Write(id); } -uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) +uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const { std::vector<unsigned char> vchNetGroup(ad.GetGroup()); |