diff options
author | dergoegge <n.goeggi@gmail.com> | 2023-03-14 17:58:59 +0100 |
---|---|---|
committer | dergoegge <n.goeggi@gmail.com> | 2023-03-22 13:18:30 +0100 |
commit | 897e342d6ec320c286753daef01d6eb9839e2c4d (patch) | |
tree | 664de554350d1de5dae3354fc2ca512c7c6ef281 | |
parent | cc5cdf877666c232a94f03faaf430cbeb6968372 (diff) |
[net] Encapsulate CNode message polling
-rw-r--r-- | src/net.cpp | 14 | ||||
-rw-r--r-- | src/net.h | 8 | ||||
-rw-r--r-- | src/net_processing.cpp | 21 |
3 files changed, 30 insertions, 13 deletions
diff --git a/src/net.cpp b/src/net.cpp index 0a9600f9ac..80cccb1c74 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2812,6 +2812,20 @@ void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size) fPauseRecv = nProcessQueueSize > recv_flood_size; } +std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage(size_t recv_flood_size) +{ + LOCK(cs_vProcessMsg); + if (vProcessMsg.empty()) return std::nullopt; + + std::list<CNetMessage> msgs; + // Just take one message + msgs.splice(msgs.begin(), vProcessMsg, vProcessMsg.begin()); + nProcessQueueSize -= msgs.front().m_raw_message_size; + fPauseRecv = nProcessQueueSize > recv_flood_size; + + return std::make_pair(std::move(msgs.front()), !vProcessMsg.empty()); +} + bool CConnman::NodeFullyConnected(const CNode* pnode) { return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; @@ -426,6 +426,14 @@ public: void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size) EXCLUSIVE_LOCKS_REQUIRED(!cs_vProcessMsg); + /** Poll the next message from the processing queue of this connection. + * + * Returns std::nullopt if the processing queue is empty, or a pair + * consisting of the message and a bool that indicates if the processing + * queue has more entries. */ + std::optional<std::pair<CNetMessage, bool>> PollMessage(size_t recv_flood_size) + EXCLUSIVE_LOCKS_REQUIRED(!cs_vProcessMsg); + bool IsOutboundOrBlockRelayConn() const { switch (m_conn_type) { case ConnectionType::OUTBOUND_FULL_RELAY: diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 25c65c7090..1bff41514e 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -4854,8 +4854,6 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt { AssertLockHeld(g_msgproc_mutex); - bool fMoreWork = false; - PeerRef peer = GetPeerRef(pfrom->GetId()); if (peer == nullptr) return false; @@ -4883,17 +4881,14 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt // Don't bother if send buffer is too full to respond anyway if (pfrom->fPauseSend) return false; - std::list<CNetMessage> msgs; - { - LOCK(pfrom->cs_vProcessMsg); - if (pfrom->vProcessMsg.empty()) return false; - // Just take one message - msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); - pfrom->nProcessQueueSize -= msgs.front().m_raw_message_size; - pfrom->fPauseRecv = pfrom->nProcessQueueSize > m_connman.GetReceiveFloodSize(); - fMoreWork = !pfrom->vProcessMsg.empty(); - } - CNetMessage& msg(msgs.front()); + auto poll_result{pfrom->PollMessage(m_connman.GetReceiveFloodSize())}; + if (!poll_result) { + // No message to process + return false; + } + + CNetMessage& msg{poll_result->first}; + bool fMoreWork = poll_result->second; TRACE6(net, inbound_message, pfrom->GetId(), |