aboutsummaryrefslogtreecommitdiff
path: root/src/net.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net.cpp')
-rw-r--r--src/net.cpp328
1 files changed, 166 insertions, 162 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 644fed5ef8..1b47f288fd 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -1262,209 +1262,213 @@ void CConnman::InactivityCheck(CNode *pnode)
}
}
-void CConnman::ThreadSocketHandler()
+void CConnman::SocketHandler()
{
- while (!interruptNet)
- {
- DisconnectNodes();
- NotifyNumConnectionsChanged();
+ //
+ // Find which sockets have data to receive
+ //
+ struct timeval timeout;
+ timeout.tv_sec = 0;
+ timeout.tv_usec = 50000; // frequency to poll pnode->vSend
- //
- // Find which sockets have data to receive
- //
- struct timeval timeout;
- timeout.tv_sec = 0;
- timeout.tv_usec = 50000; // frequency to poll pnode->vSend
-
- fd_set fdsetRecv;
- fd_set fdsetSend;
- fd_set fdsetError;
- FD_ZERO(&fdsetRecv);
- FD_ZERO(&fdsetSend);
- FD_ZERO(&fdsetError);
- SOCKET hSocketMax = 0;
- bool have_fds = false;
+ fd_set fdsetRecv;
+ fd_set fdsetSend;
+ fd_set fdsetError;
+ FD_ZERO(&fdsetRecv);
+ FD_ZERO(&fdsetSend);
+ FD_ZERO(&fdsetError);
+ SOCKET hSocketMax = 0;
+ bool have_fds = false;
- for (const ListenSocket& hListenSocket : vhListenSocket) {
- FD_SET(hListenSocket.socket, &fdsetRecv);
- hSocketMax = std::max(hSocketMax, hListenSocket.socket);
- have_fds = true;
- }
+ for (const ListenSocket& hListenSocket : vhListenSocket) {
+ FD_SET(hListenSocket.socket, &fdsetRecv);
+ hSocketMax = std::max(hSocketMax, hListenSocket.socket);
+ have_fds = true;
+ }
+ {
+ LOCK(cs_vNodes);
+ for (CNode* pnode : vNodes)
{
- LOCK(cs_vNodes);
- for (CNode* pnode : vNodes)
+ // Implement the following logic:
+ // * If there is data to send, select() for sending data. As this only
+ // happens when optimistic write failed, we choose to first drain the
+ // write buffer in this case before receiving more. This avoids
+ // needlessly queueing received data, if the remote peer is not themselves
+ // receiving data. This means properly utilizing TCP flow control signalling.
+ // * Otherwise, if there is space left in the receive buffer, select() for
+ // receiving data.
+ // * Hand off all complete messages to the processor, to be handled without
+ // blocking here.
+
+ bool select_recv = !pnode->fPauseRecv;
+ bool select_send;
{
- // Implement the following logic:
- // * If there is data to send, select() for sending data. As this only
- // happens when optimistic write failed, we choose to first drain the
- // write buffer in this case before receiving more. This avoids
- // needlessly queueing received data, if the remote peer is not themselves
- // receiving data. This means properly utilizing TCP flow control signalling.
- // * Otherwise, if there is space left in the receive buffer, select() for
- // receiving data.
- // * Hand off all complete messages to the processor, to be handled without
- // blocking here.
-
- bool select_recv = !pnode->fPauseRecv;
- bool select_send;
- {
- LOCK(pnode->cs_vSend);
- select_send = !pnode->vSendMsg.empty();
- }
+ LOCK(pnode->cs_vSend);
+ select_send = !pnode->vSendMsg.empty();
+ }
- LOCK(pnode->cs_hSocket);
- if (pnode->hSocket == INVALID_SOCKET)
- continue;
+ LOCK(pnode->cs_hSocket);
+ if (pnode->hSocket == INVALID_SOCKET)
+ continue;
- FD_SET(pnode->hSocket, &fdsetError);
- hSocketMax = std::max(hSocketMax, pnode->hSocket);
- have_fds = true;
+ FD_SET(pnode->hSocket, &fdsetError);
+ hSocketMax = std::max(hSocketMax, pnode->hSocket);
+ have_fds = true;
- if (select_send) {
- FD_SET(pnode->hSocket, &fdsetSend);
- continue;
- }
- if (select_recv) {
- FD_SET(pnode->hSocket, &fdsetRecv);
- }
+ if (select_send) {
+ FD_SET(pnode->hSocket, &fdsetSend);
+ continue;
+ }
+ if (select_recv) {
+ FD_SET(pnode->hSocket, &fdsetRecv);
}
}
+ }
- int nSelect = select(have_fds ? hSocketMax + 1 : 0,
- &fdsetRecv, &fdsetSend, &fdsetError, &timeout);
- if (interruptNet)
- return;
+ int nSelect = select(have_fds ? hSocketMax + 1 : 0,
+ &fdsetRecv, &fdsetSend, &fdsetError, &timeout);
+ if (interruptNet)
+ return;
- if (nSelect == SOCKET_ERROR)
+ if (nSelect == SOCKET_ERROR)
+ {
+ if (have_fds)
{
- if (have_fds)
- {
- int nErr = WSAGetLastError();
- LogPrintf("socket select error %s\n", NetworkErrorString(nErr));
- for (unsigned int i = 0; i <= hSocketMax; i++)
- FD_SET(i, &fdsetRecv);
- }
- FD_ZERO(&fdsetSend);
- FD_ZERO(&fdsetError);
- if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
- return;
+ int nErr = WSAGetLastError();
+ LogPrintf("socket select error %s\n", NetworkErrorString(nErr));
+ for (unsigned int i = 0; i <= hSocketMax; i++)
+ FD_SET(i, &fdsetRecv);
}
+ FD_ZERO(&fdsetSend);
+ FD_ZERO(&fdsetError);
+ if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
+ return;
+ }
- //
- // Accept new connections
- //
- for (const ListenSocket& hListenSocket : vhListenSocket)
+ //
+ // Accept new connections
+ //
+ for (const ListenSocket& hListenSocket : vhListenSocket)
+ {
+ if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv))
{
- if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv))
- {
- AcceptConnection(hListenSocket);
- }
+ AcceptConnection(hListenSocket);
}
+ }
+
+ //
+ // Service each socket
+ //
+ std::vector<CNode*> vNodesCopy;
+ {
+ LOCK(cs_vNodes);
+ vNodesCopy = vNodes;
+ for (CNode* pnode : vNodesCopy)
+ pnode->AddRef();
+ }
+ for (CNode* pnode : vNodesCopy)
+ {
+ if (interruptNet)
+ return;
//
- // Service each socket
+ // Receive
//
- std::vector<CNode*> vNodesCopy;
+ bool recvSet = false;
+ bool sendSet = false;
+ bool errorSet = false;
{
- LOCK(cs_vNodes);
- vNodesCopy = vNodes;
- for (CNode* pnode : vNodesCopy)
- pnode->AddRef();
+ LOCK(pnode->cs_hSocket);
+ if (pnode->hSocket == INVALID_SOCKET)
+ continue;
+ recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv);
+ sendSet = FD_ISSET(pnode->hSocket, &fdsetSend);
+ errorSet = FD_ISSET(pnode->hSocket, &fdsetError);
}
- for (CNode* pnode : vNodesCopy)
+ if (recvSet || errorSet)
{
- if (interruptNet)
- return;
-
- //
- // Receive
- //
- bool recvSet = false;
- bool sendSet = false;
- bool errorSet = false;
+ // typical socket buffer is 8K-64K
+ char pchBuf[0x10000];
+ int nBytes = 0;
{
LOCK(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET)
continue;
- recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv);
- sendSet = FD_ISSET(pnode->hSocket, &fdsetSend);
- errorSet = FD_ISSET(pnode->hSocket, &fdsetError);
+ nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
}
- if (recvSet || errorSet)
+ if (nBytes > 0)
{
- // typical socket buffer is 8K-64K
- char pchBuf[0x10000];
- int nBytes = 0;
- {
- LOCK(pnode->cs_hSocket);
- if (pnode->hSocket == INVALID_SOCKET)
- continue;
- nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
- }
- if (nBytes > 0)
- {
- bool notify = false;
- if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
- pnode->CloseSocketDisconnect();
- RecordBytesRecv(nBytes);
- if (notify) {
- size_t nSizeAdded = 0;
- auto it(pnode->vRecvMsg.begin());
- for (; it != pnode->vRecvMsg.end(); ++it) {
- if (!it->complete())
- break;
- nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
- }
- {
- LOCK(pnode->cs_vProcessMsg);
- pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
- pnode->nProcessQueueSize += nSizeAdded;
- pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
- }
- WakeMessageHandler();
- }
- }
- else if (nBytes == 0)
- {
- // socket closed gracefully
- if (!pnode->fDisconnect) {
- LogPrint(BCLog::NET, "socket closed\n");
- }
+ bool notify = false;
+ if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
- }
- else if (nBytes < 0)
- {
- // error
- int nErr = WSAGetLastError();
- if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
+ RecordBytesRecv(nBytes);
+ if (notify) {
+ size_t nSizeAdded = 0;
+ auto it(pnode->vRecvMsg.begin());
+ for (; it != pnode->vRecvMsg.end(); ++it) {
+ if (!it->complete())
+ break;
+ nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
+ }
{
- if (!pnode->fDisconnect)
- LogPrintf("socket recv error %s\n", NetworkErrorString(nErr));
- pnode->CloseSocketDisconnect();
+ LOCK(pnode->cs_vProcessMsg);
+ pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
+ pnode->nProcessQueueSize += nSizeAdded;
+ pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
}
+ WakeMessageHandler();
}
}
-
- //
- // Send
- //
- if (sendSet)
+ else if (nBytes == 0)
{
- LOCK(pnode->cs_vSend);
- size_t nBytes = SocketSendData(pnode);
- if (nBytes) {
- RecordBytesSent(nBytes);
+ // socket closed gracefully
+ if (!pnode->fDisconnect) {
+ LogPrint(BCLog::NET, "socket closed\n");
+ }
+ pnode->CloseSocketDisconnect();
+ }
+ else if (nBytes < 0)
+ {
+ // error
+ int nErr = WSAGetLastError();
+ if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
+ {
+ if (!pnode->fDisconnect)
+ LogPrintf("socket recv error %s\n", NetworkErrorString(nErr));
+ pnode->CloseSocketDisconnect();
}
}
-
- InactivityCheck(pnode);
}
+
+ //
+ // Send
+ //
+ if (sendSet)
{
- LOCK(cs_vNodes);
- for (CNode* pnode : vNodesCopy)
- pnode->Release();
+ LOCK(pnode->cs_vSend);
+ size_t nBytes = SocketSendData(pnode);
+ if (nBytes) {
+ RecordBytesSent(nBytes);
+ }
}
+
+ InactivityCheck(pnode);
+ }
+ {
+ LOCK(cs_vNodes);
+ for (CNode* pnode : vNodesCopy)
+ pnode->Release();
+ }
+}
+
+void CConnman::ThreadSocketHandler()
+{
+ while (!interruptNet)
+ {
+ DisconnectNodes();
+ NotifyNumConnectionsChanged();
+ SocketHandler();
}
}