diff options
author | Pieter Wuille <pieter.wuille@gmail.com> | 2017-01-13 09:51:04 -0800 |
---|---|---|
committer | Pieter Wuille <pieter.wuille@gmail.com> | 2017-01-13 10:02:18 -0800 |
commit | 8b66bf74e2a349e71eaa183af81fa63eaee76ad2 (patch) | |
tree | 956237061fdcc7239b95b1fdf63ad9e69952b2f2 /src | |
parent | 02e5308c1b9f3771bbe49bc5036215fa2bd66aa9 (diff) | |
parent | e60360e139852c655930e99d4bb4db554cd8385e (diff) |
Merge #9441: Net: Massive speedup. Net locks overhaul
e60360e net: remove cs_vRecvMsg (Cory Fields)
991955e net: add a flag to indicate when a node's send buffer is full (Cory Fields)
c6e8a9b net: add a flag to indicate when a node's process queue is full (Cory Fields)
4d712e3 net: add a new message queue for the message processor (Cory Fields)
c5a8b1b net: rework the way that the messagehandler sleeps (Cory Fields)
c72cc88 net: remove useless comments (Cory Fields)
ef7b5ec net: Add a simple function for waking the message handler (Cory Fields)
f5c36d1 net: record bytes written before notifying the message processor (Cory Fields)
60befa3 net: handle message accounting in ReceiveMsgBytes (Cory Fields)
56212e2 net: set message deserialization version when it's actually time to deserialize (Cory Fields)
0e973d9 net: remove redundant max sendbuffer size check (Cory Fields)
6042587 net: wait until the node is destroyed to delete its recv buffer (Cory Fields)
f6315e0 net: only disconnect if fDisconnect has been set (Cory Fields)
5b4a8ac net: make GetReceiveFloodSize public (Cory Fields)
e5bcd9c net: make vRecvMsg a list so that we can use splice() (Cory Fields)
53ad9a1 net: fix typo causing the wrong receive buffer size (Cory Fields)
Diffstat (limited to 'src')
-rw-r--r-- | src/net.cpp | 115 | ||||
-rw-r--r-- | src/net.h | 38 | ||||
-rw-r--r-- | src/net_processing.cpp | 78 | ||||
-rw-r--r-- | src/net_processing.h | 1 |
4 files changed, 107 insertions, 125 deletions
diff --git a/src/net.cpp b/src/net.cpp index 37e7dfed4c..b275bdd809 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -437,11 +437,6 @@ void CNode::CloseSocketDisconnect() LogPrint("net", "disconnecting peer=%d\n", id); CloseSocket(hSocket); } - - // in case this fails, we'll empty the recv buffer when the CNode is deleted - TRY_LOCK(cs_vRecvMsg, lockRecv); - if (lockRecv) - vRecvMsg.clear(); } void CConnman::ClearBanned() @@ -650,16 +645,18 @@ void CNode::copyStats(CNodeStats &stats) } #undef X -// requires LOCK(cs_vRecvMsg) bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete) { complete = false; + int64_t nTimeMicros = GetTimeMicros(); + nLastRecv = nTimeMicros / 1000000; + nRecvBytes += nBytes; while (nBytes > 0) { // get current incomplete message, or create a new one if (vRecvMsg.empty() || vRecvMsg.back().complete()) - vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, nRecvVersion)); + vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION)); CNetMessage& msg = vRecvMsg.back(); @@ -691,7 +688,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete assert(i != mapRecvBytesPerMsgCmd.end()); i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE; - msg.nTime = GetTimeMicros(); + msg.nTime = nTimeMicros; complete = true; } } @@ -764,7 +761,7 @@ const uint256& CNetMessage::GetMessageHash() const // requires LOCK(cs_vSend) -size_t SocketSendData(CNode *pnode) +size_t CConnman::SocketSendData(CNode *pnode) { auto it = pnode->vSendMsg.begin(); size_t nSentSize = 0; @@ -781,6 +778,7 @@ size_t SocketSendData(CNode *pnode) if (pnode->nSendOffset == data.size()) { pnode->nSendOffset = 0; pnode->nSendSize -= data.size(); + pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize; it++; } else { // could not send full message; stop sending more @@ -1052,8 +1050,7 @@ void CConnman::ThreadSocketHandler() std::vector<CNode*> vNodesCopy = vNodes; BOOST_FOREACH(CNode* pnode, vNodesCopy) { - if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0)) + if (pnode->fDisconnect) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -1083,13 +1080,9 @@ void CConnman::ThreadSocketHandler() TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) - { TRY_LOCK(pnode->cs_inventory, lockInv); if (lockInv) fDelete = true; - } } } if (fDelete) @@ -1149,15 +1142,10 @@ void CConnman::ThreadSocketHandler() // write buffer in this case before receiving more. This avoids // needlessly queueing received data, if the remote peer is not themselves // receiving data. This means properly utilizing TCP flow control signalling. - // * Otherwise, if there is no (complete) message in the receive buffer, - // or there is space left in the buffer, select() for receiving data. - // * (if neither of the above applies, there is certainly one message - // in the receiver buffer ready to be processed). - // Together, that means that at least one of the following is always possible, - // so we don't deadlock: - // * We send some data. - // * We wait for data to be received (and disconnect after timeout). - // * We process a message in the buffer (message handler thread). + // * Otherwise, if there is space left in the receive buffer, select() for + // receiving data. + // * Hand off all complete messages to the processor, to be handled without + // blocking here. { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { @@ -1168,10 +1156,7 @@ void CConnman::ThreadSocketHandler() } } { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv && ( - pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() || - pnode->GetTotalRecvSize() <= GetReceiveFloodSize())) + if (!pnode->fPauseRecv) FD_SET(pnode->hSocket, &fdsetRecv); } } @@ -1230,8 +1215,6 @@ void CConnman::ThreadSocketHandler() continue; if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) { { // typical socket buffer is 8K-64K @@ -1242,11 +1225,23 @@ void CConnman::ThreadSocketHandler() bool notify = false; if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); - if(notify) - condMsgProc.notify_one(); - pnode->nLastRecv = GetTime(); - pnode->nRecvBytes += nBytes; RecordBytesRecv(nBytes); + if (notify) { + size_t nSizeAdded = 0; + auto it(pnode->vRecvMsg.begin()); + for (; it != pnode->vRecvMsg.end(); ++it) { + if (!it->complete()) + break; + nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; + } + { + LOCK(pnode->cs_vProcessMsg); + pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + pnode->nProcessQueueSize += nSizeAdded; + pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; + } + WakeMessageHandler(); + } } else if (nBytes == 0) { @@ -1280,8 +1275,9 @@ void CConnman::ThreadSocketHandler() TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { size_t nBytes = SocketSendData(pnode); - if (nBytes) + if (nBytes) { RecordBytesSent(nBytes); + } } } @@ -1321,8 +1317,14 @@ void CConnman::ThreadSocketHandler() } } - - +void CConnman::WakeMessageHandler() +{ + { + std::lock_guard<std::mutex> lock(mutexMsgProc); + fMsgProcWake = true; + } + condMsgProc.notify_one(); +} @@ -1858,7 +1860,7 @@ void CConnman::ThreadMessageHandler() } } - bool fSleep = true; + bool fMoreWork = false; BOOST_FOREACH(CNode* pnode, vNodesCopy) { @@ -1866,22 +1868,8 @@ void CConnman::ThreadMessageHandler() continue; // Receive messages - { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) - { - if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc)) - pnode->CloseSocketDisconnect(); - - if (pnode->nSendSize < GetSendBufferSize()) - { - if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete())) - { - fSleep = false; - } - } - } - } + bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); + fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); if (flagInterruptMsgProc) return; @@ -1901,10 +1889,11 @@ void CConnman::ThreadMessageHandler() pnode->Release(); } - if (fSleep) { - std::unique_lock<std::mutex> lock(mutexMsgProc); - condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100)); + std::unique_lock<std::mutex> lock(mutexMsgProc); + if (!fMoreWork) { + condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; }); } + fMsgProcWake = false; } } @@ -2121,7 +2110,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c nMaxFeeler = connOptions.nMaxFeeler; nSendBufferMaxSize = connOptions.nSendBufferMaxSize; - nReceiveFloodSize = connOptions.nSendBufferMaxSize; + nReceiveFloodSize = connOptions.nReceiveFloodSize; nMaxOutboundLimit = connOptions.nMaxOutboundLimit; nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe; @@ -2182,6 +2171,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c interruptNet.reset(); flagInterruptMsgProc = false; + { + std::unique_lock<std::mutex> lock(mutexMsgProc); + fMsgProcWake = false; + } + // Send and receive from sockets, accept connections threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this))); @@ -2613,6 +2607,9 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn minFeeFilter = 0; lastSentFeeFilter = 0; nextSendTimeFeeFilter = 0; + fPauseRecv = false; + fPauseSend = false; + nProcessQueueSize = 0; BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes()) mapRecvBytesPerMsgCmd[msg] = 0; @@ -2692,6 +2689,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; pnode->nSendSize += nTotalSize; + if (pnode->nSendSize > nSendBufferMaxSize) + pnode->fPauseSend = true; pnode->vSendMsg.push_back(std::move(serializedHeader)); if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data)); @@ -327,6 +327,7 @@ public: /** Get a unique deterministic randomizer. */ CSipHasher GetDeterministicRandomizer(uint64_t id); + unsigned int GetReceiveFloodSize() const; private: struct ListenSocket { SOCKET socket; @@ -343,6 +344,8 @@ private: void ThreadSocketHandler(); void ThreadDNSAddressSeed(); + void WakeMessageHandler(); + uint64_t CalculateKeyedNetGroup(const CAddress& ad); CNode* FindNode(const CNetAddr& ip); @@ -358,6 +361,7 @@ private: NodeId GetNewNodeId(); + size_t SocketSendData(CNode *pnode); //!check is the banlist has unwritten changes bool BannedSetIsDirty(); //!set the "dirty" flag for the banlist @@ -368,8 +372,6 @@ private: void DumpData(); void DumpBanlist(); - unsigned int GetReceiveFloodSize() const; - // Network stats void RecordBytesRecv(uint64_t bytes); void RecordBytesSent(uint64_t bytes); @@ -428,6 +430,9 @@ private: /** SipHasher seeds for deterministic randomness */ const uint64_t nSeed0, nSeed1; + /** flag for waking the message processor. */ + bool fMsgProcWake; + std::condition_variable condMsgProc; std::mutex mutexMsgProc; std::atomic<bool> flagInterruptMsgProc; @@ -445,7 +450,6 @@ void Discover(boost::thread_group& threadGroup); void MapPort(bool fUseUPnP); unsigned short GetListenPort(); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); -size_t SocketSendData(CNode *pnode); struct CombinerAll { @@ -610,11 +614,13 @@ public: std::deque<std::vector<unsigned char>> vSendMsg; CCriticalSection cs_vSend; + CCriticalSection cs_vProcessMsg; + std::list<CNetMessage> vProcessMsg; + size_t nProcessQueueSize; + std::deque<CInv> vRecvGetData; - std::deque<CNetMessage> vRecvMsg; - CCriticalSection cs_vRecvMsg; uint64_t nRecvBytes; - int nRecvVersion; + std::atomic<int> nRecvVersion; int64_t nLastSend; int64_t nLastRecv; @@ -650,6 +656,8 @@ public: const NodeId id; const uint64_t nKeyedNetGroup; + std::atomic_bool fPauseRecv; + std::atomic_bool fPauseSend; protected: mapMsgCmdSize mapSendBytesPerMsgCmd; @@ -723,6 +731,7 @@ private: const ServiceFlags nLocalServices; const int nMyStartingHeight; int nSendVersion; + std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread public: NodeId GetId() const { @@ -743,24 +752,15 @@ public: return nRefCount; } - // requires LOCK(cs_vRecvMsg) - unsigned int GetTotalRecvSize() - { - unsigned int total = 0; - BOOST_FOREACH(const CNetMessage &msg, vRecvMsg) - total += msg.vRecv.size() + 24; - return total; - } - - // requires LOCK(cs_vRecvMsg) bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete); - // requires LOCK(cs_vRecvMsg) void SetRecvVersion(int nVersionIn) { nRecvVersion = nVersionIn; - BOOST_FOREACH(CNetMessage &msg, vRecvMsg) - msg.SetVersion(nVersionIn); + } + int GetRecvVersion() + { + return nRecvVersion; } void SetSendVersion(int nVersionIn) { diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 72c403a57e..ee52d1a9af 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -889,14 +889,13 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc) { std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin(); - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); vector<CInv> vNotFound; CNetMsgMaker msgMaker(pfrom->GetSendVersion()); LOCK(cs_main); while (it != pfrom->vRecvGetData.end()) { // Don't bother if send buffer is too full to respond anyway - if (pfrom->nSendSize >= nMaxSendBufferSize) + if (pfrom->fPauseSend) break; const CInv &inv = *it; @@ -1059,8 +1058,6 @@ uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic<bool>& interruptMsgProc) { - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); - LogPrint("net", "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->id); if (IsArgSet("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 0)) == 0) { @@ -1413,11 +1410,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, // Track requests for our stuff GetMainSignals().Inventory(inv.hash); - - if (pfrom->nSendSize > (nMaxSendBufferSize * 2)) { - Misbehaving(pfrom->GetId(), 50); - return error("send buffer size() = %u", pfrom->nSendSize); - } } if (!vToFetch.empty()) @@ -2450,14 +2442,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, return true; } -// requires LOCK(cs_vRecvMsg) bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc) { const CChainParams& chainparams = Params(); - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); - //if (fDebug) - // LogPrintf("%s(%u messages)\n", __func__, pfrom->vRecvMsg.size()); - // // Message format // (4) message start @@ -2466,40 +2453,40 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru // (4) checksum // (x) data // - bool fOk = true; + bool fMoreWork = false; if (!pfrom->vRecvGetData.empty()) ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); + if (pfrom->fDisconnect) + return false; + // this maintains the order of responses - if (!pfrom->vRecvGetData.empty()) return fOk; + if (!pfrom->vRecvGetData.empty()) return true; - std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin(); - while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { // Don't bother if send buffer is too full to respond anyway - if (pfrom->nSendSize >= nMaxSendBufferSize) - break; - - // get next message - CNetMessage& msg = *it; - - //if (fDebug) - // LogPrintf("%s(message %u msgsz, %u bytes, complete:%s)\n", __func__, - // msg.hdr.nMessageSize, msg.vRecv.size(), - // msg.complete() ? "Y" : "N"); - - // end, if an incomplete message is found - if (!msg.complete()) - break; - - // at this point, any failure means we can delete the current message - it++; + if (pfrom->fPauseSend) + return false; + std::list<CNetMessage> msgs; + { + LOCK(pfrom->cs_vProcessMsg); + if (pfrom->vProcessMsg.empty()) + return false; + // Just take one message + msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); + pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE; + pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize(); + fMoreWork = !pfrom->vProcessMsg.empty(); + } + CNetMessage& msg(msgs.front()); + + msg.SetVersion(pfrom->GetRecvVersion()); // Scan for message start if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) { LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id); - fOk = false; - break; + pfrom->fDisconnect = true; + return false; } // Read header @@ -2507,7 +2494,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru if (!hdr.IsValid(chainparams.MessageStart())) { LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id); - continue; + return fMoreWork; } string strCommand = hdr.GetCommand(); @@ -2523,7 +2510,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru SanitizeString(strCommand), nMessageSize, HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE), HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE)); - continue; + return fMoreWork; } // Process message @@ -2532,7 +2519,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru { fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc); if (interruptMsgProc) - return true; + return false; + if (!pfrom->vRecvGetData.empty()) + fMoreWork = true; } catch (const std::ios_base::failure& e) { @@ -2566,14 +2555,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru if (!fRet) LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id); - break; - } - - // In case the connection got shut down, its receive buffer was wiped - if (!pfrom->fDisconnect) - pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it); - - return fOk; + return fMoreWork; } class CompareInvMempoolOrder diff --git a/src/net_processing.h b/src/net_processing.h index 230d805bd4..1f33def1f7 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -46,6 +46,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru * @param[in] pto The node which we are sending messages to. * @param[in] connman The connection manager for that node. * @param[in] interrupt Interrupt condition for processing threads + * @return True if there is more work to be done */ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interrupt); |