diff options
author | fanquake <fanquake@gmail.com> | 2023-03-23 17:16:21 +0000 |
---|---|---|
committer | fanquake <fanquake@gmail.com> | 2023-03-23 17:31:52 +0000 |
commit | 23056436461a8b3af1a504b9638c48e8c8170652 (patch) | |
tree | 20cad9d86dcf829cf19d7a54d1a1175c66300ec4 | |
parent | 381593c906fb54340dbc2377e2ebc6fb37582d08 (diff) | |
parent | 3566aa7d495bb783bbd135726238d9f2a9e9f80e (diff) |
Merge bitcoin/bitcoin#27257: refactor, net: End friendship of CNode, CConnman and ConnmanTestMsg
3566aa7d495bb783bbd135726238d9f2a9e9f80e [net] Remove CNode friends (dergoegge)
3eac5e7cd1eda6ababb9af9cd72dae08d395993d [net] Add CNode helper for send byte accounting (dergoegge)
60441a3432df10f5d7c15c09c9569f27a793625b scripted-diff: [net] Rename CNode process queue members (dergoegge)
6693c499f7d48aaa1f527a1a860b943fc8442241 [net] Make cs_vProcessMsg a non-recursive mutex (dergoegge)
23d93526545c271b0eed2bd468681864c4213cce [net] Make CNode msg process queue members private (dergoegge)
897e342d6ec320c286753daef01d6eb9839e2c4d [net] Encapsulate CNode message polling (dergoegge)
cc5cdf877666c232a94f03faaf430cbeb6968372 [net] Deduplicate marking received message for processing (dergoegge)
ad44aa5c64d4ee5f31c867fda26350ab560575b7 [net] Add connection type getter to CNode (dergoegge)
Pull request description:
We should define clear interfaces between CNode, CConnman and PeerManager. This PR makes a small step in that direction by ending the friendship of CNode, CConnman and ConnmanTestMsg. CNode's message processing queue is made private in the process and its mutex is turned into a non-recursive mutex.
ACKs for top commit:
jnewbery:
utACK 3566aa7d495bb783bbd135726238d9f2a9e9f80e
vasild:
ACK 3566aa7d495bb783bbd135726238d9f2a9e9f80e
theStack:
re-ACK 3566aa7d495bb783bbd135726238d9f2a9e9f80e
brunoerg:
re-ACK 3566aa7d495bb783bbd135726238d9f2a9e9f80e
Tree-SHA512: 26b87da5054e32401b693b2904e9c5f40e35a53937c0b6cf44b8597034ad07bacf27d87cdffc54d3e7ccfebde4231ef30a38d326f88cc18133bbb34688ead567
-rw-r--r-- | src/net.cpp | 52 | ||||
-rw-r--r-- | src/net.h | 35 | ||||
-rw-r--r-- | src/net_processing.cpp | 21 | ||||
-rw-r--r-- | src/test/util/net.cpp | 13 |
4 files changed, 73 insertions, 48 deletions
diff --git a/src/net.cpp b/src/net.cpp index 9b803180f9..59a84f2fdf 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -917,7 +917,7 @@ bool CConnman::AttemptToEvictConnection() .m_is_local = node->addr.IsLocal(), .m_network = node->ConnectedThroughNetwork(), .m_noban = node->HasPermission(NetPermissionFlags::NoBan), - .m_conn_type = node->m_conn_type, + .m_conn_type = node->GetConnectionType(), }; vEvictionCandidates.push_back(candidate); } @@ -1092,7 +1092,7 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ // Count existing connections int existing_connections = WITH_LOCK(m_nodes_mutex, - return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; });); + return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->GetConnectionType() == conn_type; });); // Max connections of specified type already exist if (max_connections != std::nullopt && existing_connections >= max_connections) return false; @@ -1328,18 +1328,7 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes, } RecordBytesRecv(nBytes); if (notify) { - size_t nSizeAdded = 0; - for (const auto& msg : pnode->vRecvMsg) { - // vRecvMsg contains only completed CNetMessage - // the single possible partially deserialized message are held by TransportDeserializer - nSizeAdded += msg.m_raw_message_size; - } - { - LOCK(pnode->cs_vProcessMsg); - pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg); - pnode->nProcessQueueSize += nSizeAdded; - pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; - } + pnode->MarkReceivedMsgsForProcessing(nReceiveFloodSize); WakeMessageHandler(); } } @@ -1722,7 +1711,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) if (pnode->IsBlockOnlyConn()) nOutboundBlockRelay++; // Make sure our persistent outbound slots belong to different netgroups. - switch (pnode->m_conn_type) { + switch (pnode->GetConnectionType()) { // We currently don't take inbound connections into account. Since they are // free to make, an attacker could make them to prevent us from connecting to // certain peers. @@ -2806,6 +2795,37 @@ CNode::CNode(NodeId idIn, } } +void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size) +{ + AssertLockNotHeld(m_msg_process_queue_mutex); + + size_t nSizeAdded = 0; + for (const auto& msg : vRecvMsg) { + // vRecvMsg contains only completed CNetMessage + // the single possible partially deserialized message are held by TransportDeserializer + nSizeAdded += msg.m_raw_message_size; + } + + LOCK(m_msg_process_queue_mutex); + m_msg_process_queue.splice(m_msg_process_queue.end(), vRecvMsg); + m_msg_process_queue_size += nSizeAdded; + fPauseRecv = m_msg_process_queue_size > recv_flood_size; +} + +std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage(size_t recv_flood_size) +{ + LOCK(m_msg_process_queue_mutex); + if (m_msg_process_queue.empty()) return std::nullopt; + + std::list<CNetMessage> msgs; + // Just take one message + msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin()); + m_msg_process_queue_size -= msgs.front().m_raw_message_size; + fPauseRecv = m_msg_process_queue_size > recv_flood_size; + + return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty()); +} + bool CConnman::NodeFullyConnected(const CNode* pnode) { return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; @@ -2840,7 +2860,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) bool optimisticSend(pnode->vSendMsg.empty()); //log total amount of bytes per message type - pnode->mapSendBytesPerMsgType[msg.m_type] += nTotalSize; + pnode->AccountForSentBytes(msg.m_type, nTotalSize); pnode->nSendSize += nTotalSize; if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true; @@ -347,9 +347,6 @@ struct CNodeOptions /** Information about a peer */ class CNode { - friend class CConnman; - friend struct ConnmanTestMsg; - public: const std::unique_ptr<TransportDeserializer> m_deserializer; // Used only by SocketHandler thread const std::unique_ptr<const TransportSerializer> m_serializer; @@ -376,10 +373,6 @@ public: Mutex m_sock_mutex; Mutex cs_vRecv; - RecursiveMutex cs_vProcessMsg; - std::list<CNetMessage> vProcessMsg GUARDED_BY(cs_vProcessMsg); - size_t nProcessQueueSize GUARDED_BY(cs_vProcessMsg){0}; - uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0}; std::atomic<std::chrono::seconds> m_last_send{0s}; @@ -417,6 +410,30 @@ public: std::atomic_bool fPauseRecv{false}; std::atomic_bool fPauseSend{false}; + const ConnectionType& GetConnectionType() const + { + return m_conn_type; + } + + /** Move all messages from the received queue to the processing queue. */ + void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size) + EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex); + + /** Poll the next message from the processing queue of this connection. + * + * Returns std::nullopt if the processing queue is empty, or a pair + * consisting of the message and a bool that indicates if the processing + * queue has more entries. */ + std::optional<std::pair<CNetMessage, bool>> PollMessage(size_t recv_flood_size) + EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex); + + /** Account for the total size of a sent message in the per msg type connection stats. */ + void AccountForSentBytes(const std::string& msg_type, size_t sent_bytes) + EXCLUSIVE_LOCKS_REQUIRED(cs_vSend) + { + mapSendBytesPerMsgType[msg_type] += sent_bytes; + } + bool IsOutboundOrBlockRelayConn() const { switch (m_conn_type) { case ConnectionType::OUTBOUND_FULL_RELAY: @@ -602,6 +619,10 @@ private: std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread + Mutex m_msg_process_queue_mutex; + std::list<CNetMessage> m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex); + size_t m_msg_process_queue_size GUARDED_BY(m_msg_process_queue_mutex){0}; + // Our address, as reported by the peer CService addrLocal GUARDED_BY(m_addr_local_mutex); mutable Mutex m_addr_local_mutex; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 5dd00d58a0..3df5374e21 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -4860,8 +4860,6 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt { AssertLockHeld(g_msgproc_mutex); - bool fMoreWork = false; - PeerRef peer = GetPeerRef(pfrom->GetId()); if (peer == nullptr) return false; @@ -4889,17 +4887,14 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt // Don't bother if send buffer is too full to respond anyway if (pfrom->fPauseSend) return false; - std::list<CNetMessage> msgs; - { - LOCK(pfrom->cs_vProcessMsg); - if (pfrom->vProcessMsg.empty()) return false; - // Just take one message - msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); - pfrom->nProcessQueueSize -= msgs.front().m_raw_message_size; - pfrom->fPauseRecv = pfrom->nProcessQueueSize > m_connman.GetReceiveFloodSize(); - fMoreWork = !pfrom->vProcessMsg.empty(); - } - CNetMessage& msg(msgs.front()); + auto poll_result{pfrom->PollMessage(m_connman.GetReceiveFloodSize())}; + if (!poll_result) { + // No message to process + return false; + } + + CNetMessage& msg{poll_result->first}; + bool fMoreWork = poll_result->second; TRACE6(net, inbound_message, pfrom->GetId(), diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp index ac5dfe9e73..070a6a1c50 100644 --- a/src/test/util/net.cpp +++ b/src/test/util/net.cpp @@ -66,18 +66,7 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by { assert(node.ReceiveMsgBytes(msg_bytes, complete)); if (complete) { - size_t nSizeAdded = 0; - for (const auto& msg : node.vRecvMsg) { - // vRecvMsg contains only completed CNetMessage - // the single possible partially deserialized message are held by TransportDeserializer - nSizeAdded += msg.m_raw_message_size; - } - { - LOCK(node.cs_vProcessMsg); - node.vProcessMsg.splice(node.vProcessMsg.end(), node.vRecvMsg); - node.nProcessQueueSize += nSizeAdded; - node.fPauseRecv = node.nProcessQueueSize > nReceiveFloodSize; - } + node.MarkReceivedMsgsForProcessing(nReceiveFloodSize); } } |