aboutsummaryrefslogtreecommitdiff
path: root/src/net.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net.cpp')
-rw-r--r--src/net.cpp194
1 files changed, 95 insertions, 99 deletions
diff --git a/src/net.cpp b/src/net.cpp
index d1f1b54007..1b6d16ad9b 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -1353,46 +1353,45 @@ bool CConnman::InactivityCheck(const CNode& node) const
return false;
}
-bool CConnman::GenerateSelectSet(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set)
+bool CConnman::GenerateSelectSet(const std::vector<CNode*>& nodes,
+ std::set<SOCKET>& recv_set,
+ std::set<SOCKET>& send_set,
+ std::set<SOCKET>& error_set)
{
for (const ListenSocket& hListenSocket : vhListenSocket) {
recv_set.insert(hListenSocket.socket);
}
- {
- LOCK(cs_vNodes);
- for (CNode* pnode : vNodes)
+ for (CNode* pnode : nodes) {
+ // Implement the following logic:
+ // * If there is data to send, select() for sending data. As this only
+ // happens when optimistic write failed, we choose to first drain the
+ // write buffer in this case before receiving more. This avoids
+ // needlessly queueing received data, if the remote peer is not themselves
+ // receiving data. This means properly utilizing TCP flow control signalling.
+ // * Otherwise, if there is space left in the receive buffer, select() for
+ // receiving data.
+ // * Hand off all complete messages to the processor, to be handled without
+ // blocking here.
+
+ bool select_recv = !pnode->fPauseRecv;
+ bool select_send;
{
- // Implement the following logic:
- // * If there is data to send, select() for sending data. As this only
- // happens when optimistic write failed, we choose to first drain the
- // write buffer in this case before receiving more. This avoids
- // needlessly queueing received data, if the remote peer is not themselves
- // receiving data. This means properly utilizing TCP flow control signalling.
- // * Otherwise, if there is space left in the receive buffer, select() for
- // receiving data.
- // * Hand off all complete messages to the processor, to be handled without
- // blocking here.
-
- bool select_recv = !pnode->fPauseRecv;
- bool select_send;
- {
- LOCK(pnode->cs_vSend);
- select_send = !pnode->vSendMsg.empty();
- }
+ LOCK(pnode->cs_vSend);
+ select_send = !pnode->vSendMsg.empty();
+ }
- LOCK(pnode->cs_hSocket);
- if (pnode->hSocket == INVALID_SOCKET)
- continue;
+ LOCK(pnode->cs_hSocket);
+ if (pnode->hSocket == INVALID_SOCKET)
+ continue;
- error_set.insert(pnode->hSocket);
- if (select_send) {
- send_set.insert(pnode->hSocket);
- continue;
- }
- if (select_recv) {
- recv_set.insert(pnode->hSocket);
- }
+ error_set.insert(pnode->hSocket);
+ if (select_send) {
+ send_set.insert(pnode->hSocket);
+ continue;
+ }
+ if (select_recv) {
+ recv_set.insert(pnode->hSocket);
}
}
@@ -1400,10 +1399,13 @@ bool CConnman::GenerateSelectSet(std::set<SOCKET> &recv_set, std::set<SOCKET> &s
}
#ifdef USE_POLL
-void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set)
+void CConnman::SocketEvents(const std::vector<CNode*>& nodes,
+ std::set<SOCKET>& recv_set,
+ std::set<SOCKET>& send_set,
+ std::set<SOCKET>& error_set)
{
std::set<SOCKET> recv_select_set, send_select_set, error_select_set;
- if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) {
+ if (!GenerateSelectSet(nodes, recv_select_set, send_select_set, error_select_set)) {
interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS));
return;
}
@@ -1442,10 +1444,13 @@ void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_s
}
}
#else
-void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set)
+void CConnman::SocketEvents(const std::vector<CNode*>& nodes,
+ std::set<SOCKET>& recv_set,
+ std::set<SOCKET>& send_set,
+ std::set<SOCKET>& error_set)
{
std::set<SOCKET> recv_select_set, send_select_set, error_select_set;
- if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) {
+ if (!GenerateSelectSet(nodes, recv_select_set, send_select_set, error_select_set)) {
interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS));
return;
}
@@ -1519,34 +1524,33 @@ void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_s
void CConnman::SocketHandler()
{
- std::set<SOCKET> recv_set, send_set, error_set;
- SocketEvents(recv_set, send_set, error_set);
-
- if (interruptNet) return;
+ std::set<SOCKET> recv_set;
+ std::set<SOCKET> send_set;
+ std::set<SOCKET> error_set;
- //
- // Accept new connections
- //
- for (const ListenSocket& hListenSocket : vhListenSocket)
{
- if (hListenSocket.socket != INVALID_SOCKET && recv_set.count(hListenSocket.socket) > 0)
- {
- AcceptConnection(hListenSocket);
- }
- }
+ const NodesSnapshot snap{*this, /*shuffle=*/false};
- //
- // Service each socket
- //
- std::vector<CNode*> vNodesCopy;
- {
- LOCK(cs_vNodes);
- vNodesCopy = vNodes;
- for (CNode* pnode : vNodesCopy)
- pnode->AddRef();
+ // Check for the readiness of the already connected sockets and the
+ // listening sockets in one call ("readiness" as in poll(2) or
+ // select(2)). If none are ready, wait for a short while and return
+ // empty sets.
+ SocketEvents(snap.Nodes(), recv_set, send_set, error_set);
+
+ // Service (send/receive) each of the already connected nodes.
+ SocketHandlerConnected(snap.Nodes(), recv_set, send_set, error_set);
}
- for (CNode* pnode : vNodesCopy)
- {
+
+ // Accept new connections from listening sockets.
+ SocketHandlerListening(recv_set);
+}
+
+void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
+ const std::set<SOCKET>& recv_set,
+ const std::set<SOCKET>& send_set,
+ const std::set<SOCKET>& error_set)
+{
+ for (CNode* pnode : nodes) {
if (interruptNet)
return;
@@ -1628,10 +1632,17 @@ void CConnman::SocketHandler()
if (InactivityCheck(*pnode)) pnode->fDisconnect = true;
}
- {
- LOCK(cs_vNodes);
- for (CNode* pnode : vNodesCopy)
- pnode->Release();
+}
+
+void CConnman::SocketHandlerListening(const std::set<SOCKET>& recv_set)
+{
+ for (const ListenSocket& listen_socket : vhListenSocket) {
+ if (interruptNet) {
+ return;
+ }
+ if (listen_socket.socket != INVALID_SOCKET && recv_set.count(listen_socket.socket) > 0) {
+ AcceptConnection(listen_socket);
+ }
}
}
@@ -2246,49 +2257,34 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
void CConnman::ThreadMessageHandler()
{
SetSyscallSandboxPolicy(SyscallSandboxPolicy::MESSAGE_HANDLER);
- FastRandomContext rng;
while (!flagInterruptMsgProc)
{
- std::vector<CNode*> vNodesCopy;
- {
- LOCK(cs_vNodes);
- vNodesCopy = vNodes;
- for (CNode* pnode : vNodesCopy) {
- pnode->AddRef();
- }
- }
-
bool fMoreWork = false;
- // Randomize the order in which we process messages from/to our peers.
- // This prevents attacks in which an attacker exploits having multiple
- // consecutive connections in the vNodes list.
- Shuffle(vNodesCopy.begin(), vNodesCopy.end(), rng);
-
- for (CNode* pnode : vNodesCopy)
{
- if (pnode->fDisconnect)
- continue;
+ // Randomize the order in which we process messages from/to our peers.
+ // This prevents attacks in which an attacker exploits having multiple
+ // consecutive connections in the vNodes list.
+ const NodesSnapshot snap{*this, /*shuffle=*/true};
- // Receive messages
- bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc);
- fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
- if (flagInterruptMsgProc)
- return;
- // Send messages
- {
- LOCK(pnode->cs_sendProcessing);
- m_msgproc->SendMessages(pnode);
- }
+ for (CNode* pnode : snap.Nodes()) {
+ if (pnode->fDisconnect)
+ continue;
- if (flagInterruptMsgProc)
- return;
- }
+ // Receive messages
+ bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc);
+ fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
+ if (flagInterruptMsgProc)
+ return;
+ // Send messages
+ {
+ LOCK(pnode->cs_sendProcessing);
+ m_msgproc->SendMessages(pnode);
+ }
- {
- LOCK(cs_vNodes);
- for (CNode* pnode : vNodesCopy)
- pnode->Release();
+ if (flagInterruptMsgProc)
+ return;
+ }
}
WAIT_LOCK(mutexMsgProc, lock);