aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfanquake <fanquake@gmail.com>2023-03-23 17:16:21 +0000
committerfanquake <fanquake@gmail.com>2023-03-23 17:31:52 +0000
commit23056436461a8b3af1a504b9638c48e8c8170652 (patch)
tree20cad9d86dcf829cf19d7a54d1a1175c66300ec4
parent381593c906fb54340dbc2377e2ebc6fb37582d08 (diff)
parent3566aa7d495bb783bbd135726238d9f2a9e9f80e (diff)
Merge bitcoin/bitcoin#27257: refactor, net: End friendship of CNode, CConnman and ConnmanTestMsg
3566aa7d495bb783bbd135726238d9f2a9e9f80e [net] Remove CNode friends (dergoegge) 3eac5e7cd1eda6ababb9af9cd72dae08d395993d [net] Add CNode helper for send byte accounting (dergoegge) 60441a3432df10f5d7c15c09c9569f27a793625b scripted-diff: [net] Rename CNode process queue members (dergoegge) 6693c499f7d48aaa1f527a1a860b943fc8442241 [net] Make cs_vProcessMsg a non-recursive mutex (dergoegge) 23d93526545c271b0eed2bd468681864c4213cce [net] Make CNode msg process queue members private (dergoegge) 897e342d6ec320c286753daef01d6eb9839e2c4d [net] Encapsulate CNode message polling (dergoegge) cc5cdf877666c232a94f03faaf430cbeb6968372 [net] Deduplicate marking received message for processing (dergoegge) ad44aa5c64d4ee5f31c867fda26350ab560575b7 [net] Add connection type getter to CNode (dergoegge) Pull request description: We should define clear interfaces between CNode, CConnman and PeerManager. This PR makes a small step in that direction by ending the friendship of CNode, CConnman and ConnmanTestMsg. CNode's message processing queue is made private in the process and its mutex is turned into a non-recursive mutex. ACKs for top commit: jnewbery: utACK 3566aa7d495bb783bbd135726238d9f2a9e9f80e vasild: ACK 3566aa7d495bb783bbd135726238d9f2a9e9f80e theStack: re-ACK 3566aa7d495bb783bbd135726238d9f2a9e9f80e brunoerg: re-ACK 3566aa7d495bb783bbd135726238d9f2a9e9f80e Tree-SHA512: 26b87da5054e32401b693b2904e9c5f40e35a53937c0b6cf44b8597034ad07bacf27d87cdffc54d3e7ccfebde4231ef30a38d326f88cc18133bbb34688ead567
-rw-r--r--src/net.cpp52
-rw-r--r--src/net.h35
-rw-r--r--src/net_processing.cpp21
-rw-r--r--src/test/util/net.cpp13
4 files changed, 73 insertions, 48 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 9b803180f9..59a84f2fdf 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -917,7 +917,7 @@ bool CConnman::AttemptToEvictConnection()
.m_is_local = node->addr.IsLocal(),
.m_network = node->ConnectedThroughNetwork(),
.m_noban = node->HasPermission(NetPermissionFlags::NoBan),
- .m_conn_type = node->m_conn_type,
+ .m_conn_type = node->GetConnectionType(),
};
vEvictionCandidates.push_back(candidate);
}
@@ -1092,7 +1092,7 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
// Count existing connections
int existing_connections = WITH_LOCK(m_nodes_mutex,
- return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; }););
+ return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->GetConnectionType() == conn_type; }););
// Max connections of specified type already exist
if (max_connections != std::nullopt && existing_connections >= max_connections) return false;
@@ -1328,18 +1328,7 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
}
RecordBytesRecv(nBytes);
if (notify) {
- size_t nSizeAdded = 0;
- for (const auto& msg : pnode->vRecvMsg) {
- // vRecvMsg contains only completed CNetMessage
- // the single possible partially deserialized message are held by TransportDeserializer
- nSizeAdded += msg.m_raw_message_size;
- }
- {
- LOCK(pnode->cs_vProcessMsg);
- pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg);
- pnode->nProcessQueueSize += nSizeAdded;
- pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
- }
+ pnode->MarkReceivedMsgsForProcessing(nReceiveFloodSize);
WakeMessageHandler();
}
}
@@ -1722,7 +1711,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
if (pnode->IsBlockOnlyConn()) nOutboundBlockRelay++;
// Make sure our persistent outbound slots belong to different netgroups.
- switch (pnode->m_conn_type) {
+ switch (pnode->GetConnectionType()) {
// We currently don't take inbound connections into account. Since they are
// free to make, an attacker could make them to prevent us from connecting to
// certain peers.
@@ -2806,6 +2795,37 @@ CNode::CNode(NodeId idIn,
}
}
+void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
+{
+ AssertLockNotHeld(m_msg_process_queue_mutex);
+
+ size_t nSizeAdded = 0;
+ for (const auto& msg : vRecvMsg) {
+ // vRecvMsg contains only completed CNetMessage
+ // the single possible partially deserialized message are held by TransportDeserializer
+ nSizeAdded += msg.m_raw_message_size;
+ }
+
+ LOCK(m_msg_process_queue_mutex);
+ m_msg_process_queue.splice(m_msg_process_queue.end(), vRecvMsg);
+ m_msg_process_queue_size += nSizeAdded;
+ fPauseRecv = m_msg_process_queue_size > recv_flood_size;
+}
+
+std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage(size_t recv_flood_size)
+{
+ LOCK(m_msg_process_queue_mutex);
+ if (m_msg_process_queue.empty()) return std::nullopt;
+
+ std::list<CNetMessage> msgs;
+ // Just take one message
+ msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin());
+ m_msg_process_queue_size -= msgs.front().m_raw_message_size;
+ fPauseRecv = m_msg_process_queue_size > recv_flood_size;
+
+ return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty());
+}
+
bool CConnman::NodeFullyConnected(const CNode* pnode)
{
return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
@@ -2840,7 +2860,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
bool optimisticSend(pnode->vSendMsg.empty());
//log total amount of bytes per message type
- pnode->mapSendBytesPerMsgType[msg.m_type] += nTotalSize;
+ pnode->AccountForSentBytes(msg.m_type, nTotalSize);
pnode->nSendSize += nTotalSize;
if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true;
diff --git a/src/net.h b/src/net.h
index 2025dfdb05..7bb164003e 100644
--- a/src/net.h
+++ b/src/net.h
@@ -347,9 +347,6 @@ struct CNodeOptions
/** Information about a peer */
class CNode
{
- friend class CConnman;
- friend struct ConnmanTestMsg;
-
public:
const std::unique_ptr<TransportDeserializer> m_deserializer; // Used only by SocketHandler thread
const std::unique_ptr<const TransportSerializer> m_serializer;
@@ -376,10 +373,6 @@ public:
Mutex m_sock_mutex;
Mutex cs_vRecv;
- RecursiveMutex cs_vProcessMsg;
- std::list<CNetMessage> vProcessMsg GUARDED_BY(cs_vProcessMsg);
- size_t nProcessQueueSize GUARDED_BY(cs_vProcessMsg){0};
-
uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
std::atomic<std::chrono::seconds> m_last_send{0s};
@@ -417,6 +410,30 @@ public:
std::atomic_bool fPauseRecv{false};
std::atomic_bool fPauseSend{false};
+ const ConnectionType& GetConnectionType() const
+ {
+ return m_conn_type;
+ }
+
+ /** Move all messages from the received queue to the processing queue. */
+ void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
+ EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
+
+ /** Poll the next message from the processing queue of this connection.
+ *
+ * Returns std::nullopt if the processing queue is empty, or a pair
+ * consisting of the message and a bool that indicates if the processing
+ * queue has more entries. */
+ std::optional<std::pair<CNetMessage, bool>> PollMessage(size_t recv_flood_size)
+ EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
+
+ /** Account for the total size of a sent message in the per msg type connection stats. */
+ void AccountForSentBytes(const std::string& msg_type, size_t sent_bytes)
+ EXCLUSIVE_LOCKS_REQUIRED(cs_vSend)
+ {
+ mapSendBytesPerMsgType[msg_type] += sent_bytes;
+ }
+
bool IsOutboundOrBlockRelayConn() const {
switch (m_conn_type) {
case ConnectionType::OUTBOUND_FULL_RELAY:
@@ -602,6 +619,10 @@ private:
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
+ Mutex m_msg_process_queue_mutex;
+ std::list<CNetMessage> m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex);
+ size_t m_msg_process_queue_size GUARDED_BY(m_msg_process_queue_mutex){0};
+
// Our address, as reported by the peer
CService addrLocal GUARDED_BY(m_addr_local_mutex);
mutable Mutex m_addr_local_mutex;
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index 5dd00d58a0..3df5374e21 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -4860,8 +4860,6 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
{
AssertLockHeld(g_msgproc_mutex);
- bool fMoreWork = false;
-
PeerRef peer = GetPeerRef(pfrom->GetId());
if (peer == nullptr) return false;
@@ -4889,17 +4887,14 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
// Don't bother if send buffer is too full to respond anyway
if (pfrom->fPauseSend) return false;
- std::list<CNetMessage> msgs;
- {
- LOCK(pfrom->cs_vProcessMsg);
- if (pfrom->vProcessMsg.empty()) return false;
- // Just take one message
- msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
- pfrom->nProcessQueueSize -= msgs.front().m_raw_message_size;
- pfrom->fPauseRecv = pfrom->nProcessQueueSize > m_connman.GetReceiveFloodSize();
- fMoreWork = !pfrom->vProcessMsg.empty();
- }
- CNetMessage& msg(msgs.front());
+ auto poll_result{pfrom->PollMessage(m_connman.GetReceiveFloodSize())};
+ if (!poll_result) {
+ // No message to process
+ return false;
+ }
+
+ CNetMessage& msg{poll_result->first};
+ bool fMoreWork = poll_result->second;
TRACE6(net, inbound_message,
pfrom->GetId(),
diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp
index ac5dfe9e73..070a6a1c50 100644
--- a/src/test/util/net.cpp
+++ b/src/test/util/net.cpp
@@ -66,18 +66,7 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by
{
assert(node.ReceiveMsgBytes(msg_bytes, complete));
if (complete) {
- size_t nSizeAdded = 0;
- for (const auto& msg : node.vRecvMsg) {
- // vRecvMsg contains only completed CNetMessage
- // the single possible partially deserialized message are held by TransportDeserializer
- nSizeAdded += msg.m_raw_message_size;
- }
- {
- LOCK(node.cs_vProcessMsg);
- node.vProcessMsg.splice(node.vProcessMsg.end(), node.vRecvMsg);
- node.nProcessQueueSize += nSizeAdded;
- node.fPauseRecv = node.nProcessQueueSize > nReceiveFloodSize;
- }
+ node.MarkReceivedMsgsForProcessing(nReceiveFloodSize);
}
}