From 75e8bf55f5a014faada7712a9640dc35e8c86f15 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Mon, 26 Apr 2021 16:22:07 +0200 Subject: net: dedup and RAII-fy the creation of a copy of CConnman::vNodes The following pattern was duplicated in CConnman: ```cpp lock create a copy of vNodes, add a reference to each one unlock ... use the copy ... lock release each node from the copy unlock ``` Put that code in a RAII helper that reduces it to: ```cpp create snapshot "snap" ... use the copy ... // release happens when "snap" goes out of scope ``` --- src/net.cpp | 72 ++++++++++++++++++++----------------------------------------- src/net.h | 37 +++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 49 deletions(-) (limited to 'src') diff --git a/src/net.cpp b/src/net.cpp index ad558dd598..165a28c04e 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1513,18 +1513,12 @@ void CConnman::SocketHandler() } } + const NodesSnapshot snap{*this, /*shuffle=*/false}; + // // Service each socket // - std::vector vNodesCopy; - { - LOCK(cs_vNodes); - vNodesCopy = vNodes; - for (CNode* pnode : vNodesCopy) - pnode->AddRef(); - } - for (CNode* pnode : vNodesCopy) - { + for (CNode* pnode : snap.Nodes()) { if (interruptNet) return; @@ -1606,11 +1600,6 @@ void CConnman::SocketHandler() if (InactivityCheck(*pnode)) pnode->fDisconnect = true; } - { - LOCK(cs_vNodes); - for (CNode* pnode : vNodesCopy) - pnode->Release(); - } } void CConnman::ThreadSocketHandler() @@ -2224,49 +2213,34 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai void CConnman::ThreadMessageHandler() { SetSyscallSandboxPolicy(SyscallSandboxPolicy::MESSAGE_HANDLER); - FastRandomContext rng; while (!flagInterruptMsgProc) { - std::vector vNodesCopy; - { - LOCK(cs_vNodes); - vNodesCopy = vNodes; - for (CNode* pnode : vNodesCopy) { - pnode->AddRef(); - } - } - bool fMoreWork = false; - // Randomize the order in which we process messages from/to our peers. - // This prevents attacks in which an attacker exploits having multiple - // consecutive connections in the vNodes list. - Shuffle(vNodesCopy.begin(), vNodesCopy.end(), rng); - - for (CNode* pnode : vNodesCopy) { - if (pnode->fDisconnect) - continue; + // Randomize the order in which we process messages from/to our peers. + // This prevents attacks in which an attacker exploits having multiple + // consecutive connections in the vNodes list. + const NodesSnapshot snap{*this, /*shuffle=*/true}; - // Receive messages - bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc); - fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); - if (flagInterruptMsgProc) - return; - // Send messages - { - LOCK(pnode->cs_sendProcessing); - m_msgproc->SendMessages(pnode); - } + for (CNode* pnode : snap.Nodes()) { + if (pnode->fDisconnect) + continue; - if (flagInterruptMsgProc) - return; - } + // Receive messages + bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc); + fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); + if (flagInterruptMsgProc) + return; + // Send messages + { + LOCK(pnode->cs_sendProcessing); + m_msgproc->SendMessages(pnode); + } - { - LOCK(cs_vNodes); - for (CNode* pnode : vNodesCopy) - pnode->Release(); + if (flagInterruptMsgProc) + return; + } } WAIT_LOCK(mutexMsgProc, lock); diff --git a/src/net.h b/src/net.h index 48dfb3043f..7b97b98ae5 100644 --- a/src/net.h +++ b/src/net.h @@ -1177,6 +1177,43 @@ private: */ std::vector m_onion_binds; + /** + * RAII helper to atomically create a copy of `vNodes` and add a reference + * to each of the nodes. The nodes are released when this object is destroyed. + */ + class NodesSnapshot + { + public: + explicit NodesSnapshot(const CConnman& connman, bool shuffle) + { + { + LOCK(connman.cs_vNodes); + m_nodes_copy = connman.vNodes; + for (auto& node : m_nodes_copy) { + node->AddRef(); + } + } + if (shuffle) { + Shuffle(m_nodes_copy.begin(), m_nodes_copy.end(), FastRandomContext{}); + } + } + + ~NodesSnapshot() + { + for (auto& node : m_nodes_copy) { + node->Release(); + } + } + + const std::vector& Nodes() const + { + return m_nodes_copy; + } + + private: + std::vector m_nodes_copy; + }; + friend struct CConnmanTest; friend struct ConnmanTestMsg; }; -- cgit v1.2.3 From 664ac22c5379db65757fc3aab91fff8765683e7f Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Wed, 28 Apr 2021 18:29:32 +0200 Subject: net: keep reference to each node during socket wait Create the snapshot of `CConnman::vNodes` to operate on earlier in `CConnman::SocketHandler()`, before calling `CConnman::SocketEvents()` and pass the `vNodes` copy from the snapshot to `SocketEvents()`. This will keep the refcount of each node incremented during `SocketEvents()` so that the `CNode` object is not destroyed before `SocketEvents()` has finished. Currently in `SocketEvents()` we only remember file descriptor numbers (when not holding `CConnman::cs_vNodes`) which is safe, but we will change this to remember pointers to `CNode::m_sock`. --- src/net.cpp | 29 ++++++++++++++++++----------- src/net.h | 29 +++++++++++++++++++++++++++-- 2 files changed, 45 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/net.cpp b/src/net.cpp index 165a28c04e..6fab80f976 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1331,16 +1331,17 @@ bool CConnman::InactivityCheck(const CNode& node) const return false; } -bool CConnman::GenerateSelectSet(std::set &recv_set, std::set &send_set, std::set &error_set) +bool CConnman::GenerateSelectSet(const std::vector& nodes, + std::set& recv_set, + std::set& send_set, + std::set& error_set) { for (const ListenSocket& hListenSocket : vhListenSocket) { recv_set.insert(hListenSocket.socket); } { - LOCK(cs_vNodes); - for (CNode* pnode : vNodes) - { + for (CNode* pnode : nodes) { // Implement the following logic: // * If there is data to send, select() for sending data. As this only // happens when optimistic write failed, we choose to first drain the @@ -1378,10 +1379,13 @@ bool CConnman::GenerateSelectSet(std::set &recv_set, std::set &s } #ifdef USE_POLL -void CConnman::SocketEvents(std::set &recv_set, std::set &send_set, std::set &error_set) +void CConnman::SocketEvents(const std::vector& nodes, + std::set& recv_set, + std::set& send_set, + std::set& error_set) { std::set recv_select_set, send_select_set, error_select_set; - if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) { + if (!GenerateSelectSet(nodes, recv_select_set, send_select_set, error_select_set)) { interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); return; } @@ -1420,10 +1424,13 @@ void CConnman::SocketEvents(std::set &recv_set, std::set &send_s } } #else -void CConnman::SocketEvents(std::set &recv_set, std::set &send_set, std::set &error_set) +void CConnman::SocketEvents(const std::vector& nodes, + std::set& recv_set, + std::set& send_set, + std::set& error_set) { std::set recv_select_set, send_select_set, error_select_set; - if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) { + if (!GenerateSelectSet(nodes, recv_select_set, send_select_set, error_select_set)) { interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); return; } @@ -1497,8 +1504,10 @@ void CConnman::SocketEvents(std::set &recv_set, std::set &send_s void CConnman::SocketHandler() { + const NodesSnapshot snap{*this, /*shuffle=*/false}; + std::set recv_set, send_set, error_set; - SocketEvents(recv_set, send_set, error_set); + SocketEvents(snap.Nodes(), recv_set, send_set, error_set); if (interruptNet) return; @@ -1513,8 +1522,6 @@ void CConnman::SocketHandler() } } - const NodesSnapshot snap{*this, /*shuffle=*/false}; - // // Service each socket // diff --git a/src/net.h b/src/net.h index 7b97b98ae5..f03883ca54 100644 --- a/src/net.h +++ b/src/net.h @@ -983,8 +983,33 @@ private: void NotifyNumConnectionsChanged(); /** Return true if the peer is inactive and should be disconnected. */ bool InactivityCheck(const CNode& node) const; - bool GenerateSelectSet(std::set &recv_set, std::set &send_set, std::set &error_set); - void SocketEvents(std::set &recv_set, std::set &send_set, std::set &error_set); + + /** + * Generate a collection of sockets to check for IO readiness. + * @param[in] nodes Select from these nodes' sockets. + * @param[out] recv_set Sockets to check for read readiness. + * @param[out] send_set Sockets to check for write readiness. + * @param[out] error_set Sockets to check for errors. + * @return true if at least one socket is to be checked (the returned set is not empty) + */ + bool GenerateSelectSet(const std::vector& nodes, + std::set& recv_set, + std::set& send_set, + std::set& error_set); + + /** + * Check which sockets are ready for IO. + * @param[in] nodes Select from these nodes' sockets. + * @param[out] recv_set Sockets which are ready for read. + * @param[out] send_set Sockets which are ready for write. + * @param[out] error_set Sockets which have errors. + * This calls `GenerateSelectSet()` to gather a list of sockets to check. + */ + void SocketEvents(const std::vector& nodes, + std::set& recv_set, + std::set& send_set, + std::set& error_set); + void SocketHandler(); void ThreadSocketHandler(); void ThreadDNSAddressSeed(); -- cgit v1.2.3 From c7eb19ec8302e6a5abd89c0566540c2c862e9121 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Mon, 25 Oct 2021 13:49:33 +0200 Subject: style: remove unnecessary braces They were needed to define the scope of `LOCK(cs_vNodes)` which was removed in the previous commit. Re-indent in a separate commit to ease review (use `--ignore-space-change`). --- src/net.cpp | 58 ++++++++++++++++++++++++++++------------------------------ 1 file changed, 28 insertions(+), 30 deletions(-) (limited to 'src') diff --git a/src/net.cpp b/src/net.cpp index 6fab80f976..610a795c64 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1340,38 +1340,36 @@ bool CConnman::GenerateSelectSet(const std::vector& nodes, recv_set.insert(hListenSocket.socket); } - { - for (CNode* pnode : nodes) { - // Implement the following logic: - // * If there is data to send, select() for sending data. As this only - // happens when optimistic write failed, we choose to first drain the - // write buffer in this case before receiving more. This avoids - // needlessly queueing received data, if the remote peer is not themselves - // receiving data. This means properly utilizing TCP flow control signalling. - // * Otherwise, if there is space left in the receive buffer, select() for - // receiving data. - // * Hand off all complete messages to the processor, to be handled without - // blocking here. - - bool select_recv = !pnode->fPauseRecv; - bool select_send; - { - LOCK(pnode->cs_vSend); - select_send = !pnode->vSendMsg.empty(); - } + for (CNode* pnode : nodes) { + // Implement the following logic: + // * If there is data to send, select() for sending data. As this only + // happens when optimistic write failed, we choose to first drain the + // write buffer in this case before receiving more. This avoids + // needlessly queueing received data, if the remote peer is not themselves + // receiving data. This means properly utilizing TCP flow control signalling. + // * Otherwise, if there is space left in the receive buffer, select() for + // receiving data. + // * Hand off all complete messages to the processor, to be handled without + // blocking here. + + bool select_recv = !pnode->fPauseRecv; + bool select_send; + { + LOCK(pnode->cs_vSend); + select_send = !pnode->vSendMsg.empty(); + } - LOCK(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) - continue; + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) + continue; - error_set.insert(pnode->hSocket); - if (select_send) { - send_set.insert(pnode->hSocket); - continue; - } - if (select_recv) { - recv_set.insert(pnode->hSocket); - } + error_set.insert(pnode->hSocket); + if (select_send) { + send_set.insert(pnode->hSocket); + continue; + } + if (select_recv) { + recv_set.insert(pnode->hSocket); } } -- cgit v1.2.3 From f52b6b2d9f482353821da0ef4c485c402a396c8d Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Mon, 25 Oct 2021 11:03:58 +0200 Subject: net: split CConnman::SocketHandler() `CConnman::SocketHandler()` does 3 things: 1. Check sockets for readiness 2. Process ready listening sockets 3. Process ready connected sockets Split the processing (2. and 3.) into separate methods to make the code easier to grasp. Also, move the processing of listening sockets after the processing of connected sockets to make it obvious that there is no dependency and also explicitly release the snapshot before dealing with listening sockets - it is only necessary for the connected sockets part. --- src/net.cpp | 51 ++++++++++++++++++++++++++++++++++----------------- src/net.h | 23 +++++++++++++++++++++++ 2 files changed, 57 insertions(+), 17 deletions(-) (limited to 'src') diff --git a/src/net.cpp b/src/net.cpp index 610a795c64..652bdb36b6 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1502,28 +1502,33 @@ void CConnman::SocketEvents(const std::vector& nodes, void CConnman::SocketHandler() { - const NodesSnapshot snap{*this, /*shuffle=*/false}; + std::set recv_set; + std::set send_set; + std::set error_set; - std::set recv_set, send_set, error_set; - SocketEvents(snap.Nodes(), recv_set, send_set, error_set); + { + const NodesSnapshot snap{*this, /*shuffle=*/false}; - if (interruptNet) return; + // Check for the readiness of the already connected sockets and the + // listening sockets in one call ("readiness" as in poll(2) or + // select(2)). If none are ready, wait for a short while and return + // empty sets. + SocketEvents(snap.Nodes(), recv_set, send_set, error_set); - // - // Accept new connections - // - for (const ListenSocket& hListenSocket : vhListenSocket) - { - if (hListenSocket.socket != INVALID_SOCKET && recv_set.count(hListenSocket.socket) > 0) - { - AcceptConnection(hListenSocket); - } + // Service (send/receive) each of the already connected nodes. + SocketHandlerConnected(snap.Nodes(), recv_set, send_set, error_set); } - // - // Service each socket - // - for (CNode* pnode : snap.Nodes()) { + // Accept new connections from listening sockets. + SocketHandlerListening(recv_set); +} + +void CConnman::SocketHandlerConnected(const std::vector& nodes, + const std::set& recv_set, + const std::set& send_set, + const std::set& error_set) +{ + for (CNode* pnode : nodes) { if (interruptNet) return; @@ -1607,6 +1612,18 @@ void CConnman::SocketHandler() } } +void CConnman::SocketHandlerListening(const std::set& recv_set) +{ + for (const ListenSocket& listen_socket : vhListenSocket) { + if (interruptNet) { + return; + } + if (listen_socket.socket != INVALID_SOCKET && recv_set.count(listen_socket.socket) > 0) { + AcceptConnection(listen_socket); + } + } +} + void CConnman::ThreadSocketHandler() { SetSyscallSandboxPolicy(SyscallSandboxPolicy::NET); diff --git a/src/net.h b/src/net.h index f03883ca54..e8de026573 100644 --- a/src/net.h +++ b/src/net.h @@ -1010,7 +1010,30 @@ private: std::set& send_set, std::set& error_set); + /** + * Check connected and listening sockets for IO readiness and process them accordingly. + */ void SocketHandler(); + + /** + * Do the read/write for connected sockets that are ready for IO. + * @param[in] nodes Nodes to process. The socket of each node is checked against + * `recv_set`, `send_set` and `error_set`. + * @param[in] recv_set Sockets that are ready for read. + * @param[in] send_set Sockets that are ready for send. + * @param[in] error_set Sockets that have an exceptional condition (error). + */ + void SocketHandlerConnected(const std::vector& nodes, + const std::set& recv_set, + const std::set& send_set, + const std::set& error_set); + + /** + * Accept incoming connections, one from each read-ready listening socket. + * @param[in] recv_set Sockets that are ready for read. + */ + void SocketHandlerListening(const std::set& recv_set); + void ThreadSocketHandler(); void ThreadDNSAddressSeed(); -- cgit v1.2.3