aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordergoegge <n.goeggi@gmail.com>2023-03-14 17:38:46 +0100
committerdergoegge <n.goeggi@gmail.com>2023-03-19 14:34:37 +0100
commitcc5cdf877666c232a94f03faaf430cbeb6968372 (patch)
treefec128374168a131e9d4dcb5f29c5965e04e7e33 /src
parentad44aa5c64d4ee5f31c867fda26350ab560575b7 (diff)
[net] Deduplicate marking received message for processing
Diffstat (limited to 'src')
-rw-r--r--src/net.cpp30
-rw-r--r--src/net.h4
-rw-r--r--src/test/util/net.cpp13
3 files changed, 23 insertions, 24 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 986c9b119b..0a9600f9ac 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -1328,18 +1328,7 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
}
RecordBytesRecv(nBytes);
if (notify) {
- size_t nSizeAdded = 0;
- for (const auto& msg : pnode->vRecvMsg) {
- // vRecvMsg contains only completed CNetMessage
- // the single possible partially deserialized message are held by TransportDeserializer
- nSizeAdded += msg.m_raw_message_size;
- }
- {
- LOCK(pnode->cs_vProcessMsg);
- pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg);
- pnode->nProcessQueueSize += nSizeAdded;
- pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
- }
+ pnode->MarkReceivedMsgsForProcessing(nReceiveFloodSize);
WakeMessageHandler();
}
}
@@ -2806,6 +2795,23 @@ CNode::CNode(NodeId idIn,
}
}
+void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
+{
+ AssertLockNotHeld(cs_vProcessMsg);
+
+ size_t nSizeAdded = 0;
+ for (const auto& msg : vRecvMsg) {
+ // vRecvMsg contains only completed CNetMessage
+ // the single possible partially deserialized message are held by TransportDeserializer
+ nSizeAdded += msg.m_raw_message_size;
+ }
+
+ LOCK(cs_vProcessMsg);
+ vProcessMsg.splice(vProcessMsg.end(), vRecvMsg);
+ nProcessQueueSize += nSizeAdded;
+ fPauseRecv = nProcessQueueSize > recv_flood_size;
+}
+
bool CConnman::NodeFullyConnected(const CNode* pnode)
{
return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
diff --git a/src/net.h b/src/net.h
index 7cc2339247..8af100a26f 100644
--- a/src/net.h
+++ b/src/net.h
@@ -422,6 +422,10 @@ public:
return m_conn_type;
}
+ /** Move all messages from the received queue to the processing queue. */
+ void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
+ EXCLUSIVE_LOCKS_REQUIRED(!cs_vProcessMsg);
+
bool IsOutboundOrBlockRelayConn() const {
switch (m_conn_type) {
case ConnectionType::OUTBOUND_FULL_RELAY:
diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp
index ac5dfe9e73..070a6a1c50 100644
--- a/src/test/util/net.cpp
+++ b/src/test/util/net.cpp
@@ -66,18 +66,7 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by
{
assert(node.ReceiveMsgBytes(msg_bytes, complete));
if (complete) {
- size_t nSizeAdded = 0;
- for (const auto& msg : node.vRecvMsg) {
- // vRecvMsg contains only completed CNetMessage
- // the single possible partially deserialized message are held by TransportDeserializer
- nSizeAdded += msg.m_raw_message_size;
- }
- {
- LOCK(node.cs_vProcessMsg);
- node.vProcessMsg.splice(node.vProcessMsg.end(), node.vRecvMsg);
- node.nProcessQueueSize += nSizeAdded;
- node.fPauseRecv = node.nProcessQueueSize > nReceiveFloodSize;
- }
+ node.MarkReceivedMsgsForProcessing(nReceiveFloodSize);
}
}