diff options
author | Wladimir J. van der Laan <laanwj@gmail.com> | 2018-10-16 17:20:29 +0200 |
---|---|---|
committer | Wladimir J. van der Laan <laanwj@gmail.com> | 2018-10-16 17:20:34 +0200 |
commit | 23419e4c4939fe557e37ec9d17c15c6eb73febc6 (patch) | |
tree | bb8f665ca3619e62becf3562d15f9bf015d5a9c6 | |
parent | 2468471e13987b1be377e1b33fe9c5cdb7a7a3e3 (diff) | |
parent | 032488e6e7b4c2d37f3f434d575d25806ddcb0b0 (diff) |
Merge #14335: net: refactor: cleanup ThreadSocketHandler
032488e6e7b4c2d37f3f434d575d25806ddcb0b0 Move SocketHandler logic to private method. (Patrick Strateman)
2af9cff11ad3ef27893b4b2009ddbfcb208554a9 Move InactivityCheck logic to private method. (Patrick Strateman)
7479b63d9109ccaf3708a92dc65dc24556576551 Move DisconnectNodes logic to private method. (Patrick Strateman)
edb5350c32cdf7f8487777b5cc1a2ebfcdfc9e75 Move NotifyNumConnectionsChanged logic to private method. (Patrick Strateman)
Pull request description:
Working towards using poll() on unix like systems.
A number of small changes designed to separate the actual socket handling from the rest of the logic in ThreadSocketHandler.
This is a simpler version of #14147
Tree-SHA512: 72f35c8ef7649019dcbfe19537d8c9f7e3d0fc5854dc691a70c5573352230fc31c3f55565820c632e9b8cb3c55b878bed19e0ad9423100762197ac35967d8067
-rw-r--r-- | src/net.cpp | 491 | ||||
-rw-r--r-- | src/net.h | 5 |
2 files changed, 257 insertions, 239 deletions
diff --git a/src/net.cpp b/src/net.cpp index f83f39a67d..c8d3efceed 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1153,310 +1153,322 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { } } -void CConnman::ThreadSocketHandler() +void CConnman::DisconnectNodes() { - unsigned int nPrevNodeCount = 0; - while (!interruptNet) { - // - // Disconnect nodes - // - { - LOCK(cs_vNodes); + LOCK(cs_vNodes); - if (!fNetworkActive) { - // Disconnect any connected nodes - for (CNode* pnode : vNodes) { - if (!pnode->fDisconnect) { - LogPrint(BCLog::NET, "Network not active, dropping peer=%d\n", pnode->GetId()); - pnode->fDisconnect = true; - } + if (!fNetworkActive) { + // Disconnect any connected nodes + for (CNode* pnode : vNodes) { + if (!pnode->fDisconnect) { + LogPrint(BCLog::NET, "Network not active, dropping peer=%d\n", pnode->GetId()); + pnode->fDisconnect = true; } } + } - // Disconnect unused nodes - std::vector<CNode*> vNodesCopy = vNodes; - for (CNode* pnode : vNodesCopy) + // Disconnect unused nodes + std::vector<CNode*> vNodesCopy = vNodes; + for (CNode* pnode : vNodesCopy) + { + if (pnode->fDisconnect) { - if (pnode->fDisconnect) - { - // remove from vNodes - vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); + // remove from vNodes + vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); - // release outbound grant (if any) - pnode->grantOutbound.Release(); + // release outbound grant (if any) + pnode->grantOutbound.Release(); - // close socket and cleanup - pnode->CloseSocketDisconnect(); + // close socket and cleanup + pnode->CloseSocketDisconnect(); - // hold in disconnected pool until all refs are released - pnode->Release(); - vNodesDisconnected.push_back(pnode); - } + // hold in disconnected pool until all refs are released + pnode->Release(); + vNodesDisconnected.push_back(pnode); } } + } + { + // Delete disconnected nodes + std::list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected; + for (CNode* pnode : vNodesDisconnectedCopy) { - // Delete disconnected nodes - std::list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected; - for (CNode* pnode : vNodesDisconnectedCopy) - { - // wait until threads are done using it - if (pnode->GetRefCount() <= 0) { - bool fDelete = false; - { - TRY_LOCK(pnode->cs_inventory, lockInv); - if (lockInv) { - TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend) { - fDelete = true; - } + // wait until threads are done using it + if (pnode->GetRefCount() <= 0) { + bool fDelete = false; + { + TRY_LOCK(pnode->cs_inventory, lockInv); + if (lockInv) { + TRY_LOCK(pnode->cs_vSend, lockSend); + if (lockSend) { + fDelete = true; } } - if (fDelete) { - vNodesDisconnected.remove(pnode); - DeleteNode(pnode); - } + } + if (fDelete) { + vNodesDisconnected.remove(pnode); + DeleteNode(pnode); } } } - size_t vNodesSize; + } +} + +void CConnman::NotifyNumConnectionsChanged() +{ + size_t vNodesSize; + { + LOCK(cs_vNodes); + vNodesSize = vNodes.size(); + } + if(vNodesSize != nPrevNodeCount) { + nPrevNodeCount = vNodesSize; + if(clientInterface) + clientInterface->NotifyNumConnectionsChanged(vNodesSize); + } +} + +void CConnman::InactivityCheck(CNode *pnode) +{ + int64_t nTime = GetSystemTimeInSeconds(); + if (nTime - pnode->nTimeConnected > 60) + { + if (pnode->nLastRecv == 0 || pnode->nLastSend == 0) + { + LogPrint(BCLog::NET, "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->GetId()); + pnode->fDisconnect = true; + } + else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL) { - LOCK(cs_vNodes); - vNodesSize = vNodes.size(); + LogPrintf("socket sending timeout: %is\n", nTime - pnode->nLastSend); + pnode->fDisconnect = true; + } + else if (nTime - pnode->nLastRecv > (pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL : 90*60)) + { + LogPrintf("socket receive timeout: %is\n", nTime - pnode->nLastRecv); + pnode->fDisconnect = true; + } + else if (pnode->nPingNonceSent && pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 < GetTimeMicros()) + { + LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart)); + pnode->fDisconnect = true; } - if(vNodesSize != nPrevNodeCount) { - nPrevNodeCount = vNodesSize; - if(clientInterface) - clientInterface->NotifyNumConnectionsChanged(vNodesSize); + else if (!pnode->fSuccessfullyConnected) + { + LogPrint(BCLog::NET, "version handshake timeout from %d\n", pnode->GetId()); + pnode->fDisconnect = true; } + } +} - // - // Find which sockets have data to receive - // - struct timeval timeout; - timeout.tv_sec = 0; - timeout.tv_usec = 50000; // frequency to poll pnode->vSend - - fd_set fdsetRecv; - fd_set fdsetSend; - fd_set fdsetError; - FD_ZERO(&fdsetRecv); - FD_ZERO(&fdsetSend); - FD_ZERO(&fdsetError); - SOCKET hSocketMax = 0; - bool have_fds = false; +void CConnman::SocketHandler() +{ + // + // Find which sockets have data to receive + // + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 50000; // frequency to poll pnode->vSend - for (const ListenSocket& hListenSocket : vhListenSocket) { - FD_SET(hListenSocket.socket, &fdsetRecv); - hSocketMax = std::max(hSocketMax, hListenSocket.socket); - have_fds = true; - } + fd_set fdsetRecv; + fd_set fdsetSend; + fd_set fdsetError; + FD_ZERO(&fdsetRecv); + FD_ZERO(&fdsetSend); + FD_ZERO(&fdsetError); + SOCKET hSocketMax = 0; + bool have_fds = false; + for (const ListenSocket& hListenSocket : vhListenSocket) { + FD_SET(hListenSocket.socket, &fdsetRecv); + hSocketMax = std::max(hSocketMax, hListenSocket.socket); + have_fds = true; + } + + { + LOCK(cs_vNodes); + for (CNode* pnode : vNodes) { - LOCK(cs_vNodes); - for (CNode* pnode : vNodes) + // Implement the following logic: + // * If there is data to send, select() for sending data. As this only + // happens when optimistic write failed, we choose to first drain the + // write buffer in this case before receiving more. This avoids + // needlessly queueing received data, if the remote peer is not themselves + // receiving data. This means properly utilizing TCP flow control signalling. + // * Otherwise, if there is space left in the receive buffer, select() for + // receiving data. + // * Hand off all complete messages to the processor, to be handled without + // blocking here. + + bool select_recv = !pnode->fPauseRecv; + bool select_send; { - // Implement the following logic: - // * If there is data to send, select() for sending data. As this only - // happens when optimistic write failed, we choose to first drain the - // write buffer in this case before receiving more. This avoids - // needlessly queueing received data, if the remote peer is not themselves - // receiving data. This means properly utilizing TCP flow control signalling. - // * Otherwise, if there is space left in the receive buffer, select() for - // receiving data. - // * Hand off all complete messages to the processor, to be handled without - // blocking here. - - bool select_recv = !pnode->fPauseRecv; - bool select_send; - { - LOCK(pnode->cs_vSend); - select_send = !pnode->vSendMsg.empty(); - } + LOCK(pnode->cs_vSend); + select_send = !pnode->vSendMsg.empty(); + } - LOCK(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) - continue; + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) + continue; - FD_SET(pnode->hSocket, &fdsetError); - hSocketMax = std::max(hSocketMax, pnode->hSocket); - have_fds = true; + FD_SET(pnode->hSocket, &fdsetError); + hSocketMax = std::max(hSocketMax, pnode->hSocket); + have_fds = true; - if (select_send) { - FD_SET(pnode->hSocket, &fdsetSend); - continue; - } - if (select_recv) { - FD_SET(pnode->hSocket, &fdsetRecv); - } + if (select_send) { + FD_SET(pnode->hSocket, &fdsetSend); + continue; + } + if (select_recv) { + FD_SET(pnode->hSocket, &fdsetRecv); } } + } - int nSelect = select(have_fds ? hSocketMax + 1 : 0, - &fdsetRecv, &fdsetSend, &fdsetError, &timeout); - if (interruptNet) - return; + int nSelect = select(have_fds ? hSocketMax + 1 : 0, + &fdsetRecv, &fdsetSend, &fdsetError, &timeout); + if (interruptNet) + return; - if (nSelect == SOCKET_ERROR) + if (nSelect == SOCKET_ERROR) + { + if (have_fds) { - if (have_fds) - { - int nErr = WSAGetLastError(); - LogPrintf("socket select error %s\n", NetworkErrorString(nErr)); - for (unsigned int i = 0; i <= hSocketMax; i++) - FD_SET(i, &fdsetRecv); - } - FD_ZERO(&fdsetSend); - FD_ZERO(&fdsetError); - if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000))) - return; + int nErr = WSAGetLastError(); + LogPrintf("socket select error %s\n", NetworkErrorString(nErr)); + for (unsigned int i = 0; i <= hSocketMax; i++) + FD_SET(i, &fdsetRecv); } + FD_ZERO(&fdsetSend); + FD_ZERO(&fdsetError); + if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000))) + return; + } - // - // Accept new connections - // - for (const ListenSocket& hListenSocket : vhListenSocket) + // + // Accept new connections + // + for (const ListenSocket& hListenSocket : vhListenSocket) + { + if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv)) { - if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv)) - { - AcceptConnection(hListenSocket); - } + AcceptConnection(hListenSocket); } + } + + // + // Service each socket + // + std::vector<CNode*> vNodesCopy; + { + LOCK(cs_vNodes); + vNodesCopy = vNodes; + for (CNode* pnode : vNodesCopy) + pnode->AddRef(); + } + for (CNode* pnode : vNodesCopy) + { + if (interruptNet) + return; // - // Service each socket + // Receive // - std::vector<CNode*> vNodesCopy; + bool recvSet = false; + bool sendSet = false; + bool errorSet = false; { - LOCK(cs_vNodes); - vNodesCopy = vNodes; - for (CNode* pnode : vNodesCopy) - pnode->AddRef(); + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) + continue; + recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv); + sendSet = FD_ISSET(pnode->hSocket, &fdsetSend); + errorSet = FD_ISSET(pnode->hSocket, &fdsetError); } - for (CNode* pnode : vNodesCopy) + if (recvSet || errorSet) { - if (interruptNet) - return; - - // - // Receive - // - bool recvSet = false; - bool sendSet = false; - bool errorSet = false; + // typical socket buffer is 8K-64K + char pchBuf[0x10000]; + int nBytes = 0; { LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) continue; - recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv); - sendSet = FD_ISSET(pnode->hSocket, &fdsetSend); - errorSet = FD_ISSET(pnode->hSocket, &fdsetError); + nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); } - if (recvSet || errorSet) + if (nBytes > 0) { - // typical socket buffer is 8K-64K - char pchBuf[0x10000]; - int nBytes = 0; - { - LOCK(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) - continue; - nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); - } - if (nBytes > 0) - { - bool notify = false; - if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) - pnode->CloseSocketDisconnect(); - RecordBytesRecv(nBytes); - if (notify) { - size_t nSizeAdded = 0; - auto it(pnode->vRecvMsg.begin()); - for (; it != pnode->vRecvMsg.end(); ++it) { - if (!it->complete()) - break; - nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; - } - { - LOCK(pnode->cs_vProcessMsg); - pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); - pnode->nProcessQueueSize += nSizeAdded; - pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; - } - WakeMessageHandler(); - } - } - else if (nBytes == 0) - { - // socket closed gracefully - if (!pnode->fDisconnect) { - LogPrint(BCLog::NET, "socket closed\n"); - } + bool notify = false; + if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); - } - else if (nBytes < 0) - { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) + RecordBytesRecv(nBytes); + if (notify) { + size_t nSizeAdded = 0; + auto it(pnode->vRecvMsg.begin()); + for (; it != pnode->vRecvMsg.end(); ++it) { + if (!it->complete()) + break; + nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; + } { - if (!pnode->fDisconnect) - LogPrintf("socket recv error %s\n", NetworkErrorString(nErr)); - pnode->CloseSocketDisconnect(); + LOCK(pnode->cs_vProcessMsg); + pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + pnode->nProcessQueueSize += nSizeAdded; + pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; } + WakeMessageHandler(); } } - - // - // Send - // - if (sendSet) + else if (nBytes == 0) { - LOCK(pnode->cs_vSend); - size_t nBytes = SocketSendData(pnode); - if (nBytes) { - RecordBytesSent(nBytes); + // socket closed gracefully + if (!pnode->fDisconnect) { + LogPrint(BCLog::NET, "socket closed\n"); } + pnode->CloseSocketDisconnect(); } - - // - // Inactivity checking - // - int64_t nTime = GetSystemTimeInSeconds(); - if (nTime - pnode->nTimeConnected > 60) + else if (nBytes < 0) { - if (pnode->nLastRecv == 0 || pnode->nLastSend == 0) - { - LogPrint(BCLog::NET, "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->GetId()); - pnode->fDisconnect = true; - } - else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL) - { - LogPrintf("socket sending timeout: %is\n", nTime - pnode->nLastSend); - pnode->fDisconnect = true; - } - else if (nTime - pnode->nLastRecv > (pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL : 90*60)) - { - LogPrintf("socket receive timeout: %is\n", nTime - pnode->nLastRecv); - pnode->fDisconnect = true; - } - else if (pnode->nPingNonceSent && pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 < GetTimeMicros()) - { - LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart)); - pnode->fDisconnect = true; - } - else if (!pnode->fSuccessfullyConnected) + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) { - LogPrint(BCLog::NET, "version handshake timeout from %d\n", pnode->GetId()); - pnode->fDisconnect = true; + if (!pnode->fDisconnect) + LogPrintf("socket recv error %s\n", NetworkErrorString(nErr)); + pnode->CloseSocketDisconnect(); } } } + + // + // Send + // + if (sendSet) { - LOCK(cs_vNodes); - for (CNode* pnode : vNodesCopy) - pnode->Release(); + LOCK(pnode->cs_vSend); + size_t nBytes = SocketSendData(pnode); + if (nBytes) { + RecordBytesSent(nBytes); + } } + + InactivityCheck(pnode); + } + { + LOCK(cs_vNodes); + for (CNode* pnode : vNodesCopy) + pnode->Release(); + } +} + +void CConnman::ThreadSocketHandler() +{ + while (!interruptNet) + { + DisconnectNodes(); + NotifyNumConnectionsChanged(); + SocketHandler(); } } @@ -2217,6 +2229,7 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSe setBannedIsDirty = false; fAddressesInitialized = false; nLastNodeId = 0; + nPrevNodeCount = 0; nSendBufferMaxSize = 0; nReceiveFloodSize = 0; flagInterruptMsgProc = false; @@ -338,6 +338,10 @@ private: void ThreadOpenConnections(std::vector<std::string> connect); void ThreadMessageHandler(); void AcceptConnection(const ListenSocket& hListenSocket); + void DisconnectNodes(); + void NotifyNumConnectionsChanged(); + void InactivityCheck(CNode *pnode); + void SocketHandler(); void ThreadSocketHandler(); void ThreadDNSAddressSeed(); @@ -408,6 +412,7 @@ private: std::list<CNode*> vNodesDisconnected; mutable CCriticalSection cs_vNodes; std::atomic<NodeId> nLastNodeId; + unsigned int nPrevNodeCount; /** Services this instance offers */ ServiceFlags nLocalServices; |