From cc5cdf877666c232a94f03faaf430cbeb6968372 Mon Sep 17 00:00:00 2001 From: dergoegge Date: Tue, 14 Mar 2023 17:38:46 +0100 Subject: [net] Deduplicate marking received message for processing --- src/net.cpp | 30 ++++++++++++++++++------------ src/net.h | 4 ++++ src/test/util/net.cpp | 13 +------------ 3 files changed, 23 insertions(+), 24 deletions(-) (limited to 'src') diff --git a/src/net.cpp b/src/net.cpp index 986c9b119b..0a9600f9ac 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1328,18 +1328,7 @@ void CConnman::SocketHandlerConnected(const std::vector& nodes, } RecordBytesRecv(nBytes); if (notify) { - size_t nSizeAdded = 0; - for (const auto& msg : pnode->vRecvMsg) { - // vRecvMsg contains only completed CNetMessage - // the single possible partially deserialized message are held by TransportDeserializer - nSizeAdded += msg.m_raw_message_size; - } - { - LOCK(pnode->cs_vProcessMsg); - pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg); - pnode->nProcessQueueSize += nSizeAdded; - pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; - } + pnode->MarkReceivedMsgsForProcessing(nReceiveFloodSize); WakeMessageHandler(); } } @@ -2806,6 +2795,23 @@ CNode::CNode(NodeId idIn, } } +void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size) +{ + AssertLockNotHeld(cs_vProcessMsg); + + size_t nSizeAdded = 0; + for (const auto& msg : vRecvMsg) { + // vRecvMsg contains only completed CNetMessage + // the single possible partially deserialized message are held by TransportDeserializer + nSizeAdded += msg.m_raw_message_size; + } + + LOCK(cs_vProcessMsg); + vProcessMsg.splice(vProcessMsg.end(), vRecvMsg); + nProcessQueueSize += nSizeAdded; + fPauseRecv = nProcessQueueSize > recv_flood_size; +} + bool CConnman::NodeFullyConnected(const CNode* pnode) { return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; diff --git a/src/net.h b/src/net.h index 7cc2339247..8af100a26f 100644 --- a/src/net.h +++ b/src/net.h @@ -422,6 +422,10 @@ public: return m_conn_type; } + /** Move all messages from the received queue to the processing queue. */ + void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size) + EXCLUSIVE_LOCKS_REQUIRED(!cs_vProcessMsg); + bool IsOutboundOrBlockRelayConn() const { switch (m_conn_type) { case ConnectionType::OUTBOUND_FULL_RELAY: diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp index ac5dfe9e73..070a6a1c50 100644 --- a/src/test/util/net.cpp +++ b/src/test/util/net.cpp @@ -66,18 +66,7 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span msg_by { assert(node.ReceiveMsgBytes(msg_bytes, complete)); if (complete) { - size_t nSizeAdded = 0; - for (const auto& msg : node.vRecvMsg) { - // vRecvMsg contains only completed CNetMessage - // the single possible partially deserialized message are held by TransportDeserializer - nSizeAdded += msg.m_raw_message_size; - } - { - LOCK(node.cs_vProcessMsg); - node.vProcessMsg.splice(node.vProcessMsg.end(), node.vRecvMsg); - node.nProcessQueueSize += nSizeAdded; - node.fPauseRecv = node.nProcessQueueSize > nReceiveFloodSize; - } + node.MarkReceivedMsgsForProcessing(nReceiveFloodSize); } } -- cgit v1.2.3