From bb4aab90fd046f2fff61e082a0c0d01c5ee31297 Mon Sep 17 00:00:00 2001 From: Pieter Wuille Date: Wed, 16 Aug 2023 13:31:50 -0400 Subject: net: move message conversion to wire bytes from PushMessage to SocketSendData This furthers transport abstraction by removing the assumption that a message can always immediately be converted to wire bytes. This assumption does not hold for the v2 transport proposed by BIP324, as no messages can be sent before the handshake completes. This is done by only keeping (complete) CSerializedNetMsg objects in vSendMsg, rather than the resulting bytes (for header and payload) that need to be sent. In SocketSendData, these objects are handed to the transport as permitted by it, and sending out the bytes the transport tells us to send. This also removes the nSendOffset member variable in CNode, as keeping track of how much has been sent is now a responsability of the transport. This is not a pure refactor, and has the following effects even for the current v1 transport: * Checksum calculation now happens in SocketSendData rather than PushMessage. For non-optimistic-send messages, that means this computation now happens in the network thread rather than the message handler thread (generally a good thing, as the message handler thread is more of a computational bottleneck). * Checksum calculation now happens while holding the cs_vSend lock. This is technically unnecessary for the v1 transport, as messages are encoded independent from one another, but is untenable for the v2 transport anyway. * Statistics updates about per-message sent bytes now happen when those bytes are actually handed to the OS, rather than at PushMessage time. --- src/net.cpp | 96 ++++++++++++++++++-------------------- src/net.h | 9 ++-- src/test/denialofservice_tests.cpp | 8 ++-- src/test/fuzz/process_messages.cpp | 1 + src/test/util/net.cpp | 14 ++++++ src/test/util/net.h | 1 + 6 files changed, 71 insertions(+), 58 deletions(-) (limited to 'src') diff --git a/src/net.cpp b/src/net.cpp index 47b872a446..9fce585b81 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -915,35 +915,49 @@ std::pair CConnman::SocketSendData(CNode& node) const { auto it = node.vSendMsg.begin(); size_t nSentSize = 0; - - while (it != node.vSendMsg.end()) { - const auto& data = *it; - assert(data.size() > node.nSendOffset); + bool data_left{false}; //!< second return value (whether unsent data remains) + + while (true) { + if (it != node.vSendMsg.end()) { + // If possible, move one message from the send queue to the transport. This fails when + // there is an existing message still being sent. + size_t memusage = it->GetMemoryUsage(); + if (node.m_transport->SetMessageToSend(*it)) { + // Update memory usage of send buffer (as *it will be deleted). + node.m_send_memusage -= memusage; + ++it; + } + } + const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend(); + data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent int nBytes = 0; - { + if (!data.empty()) { LOCK(node.m_sock_mutex); + // There is no socket in case we've already disconnected, or in test cases without + // real connections. In these cases, we bail out immediately and just leave things + // in the send queue and transport. if (!node.m_sock) { break; } int flags = MSG_NOSIGNAL | MSG_DONTWAIT; #ifdef MSG_MORE - if (it + 1 != node.vSendMsg.end()) { + // We have more to send if either the transport itself has more, or if we have more + // messages to send. + if (more || it != node.vSendMsg.end()) { flags |= MSG_MORE; } #endif - nBytes = node.m_sock->Send(reinterpret_cast(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, flags); + nBytes = node.m_sock->Send(reinterpret_cast(data.data()), data.size(), flags); } if (nBytes > 0) { node.m_last_send = GetTime(); node.nSendBytes += nBytes; - node.nSendOffset += nBytes; + // Notify transport that bytes have been processed. + node.m_transport->MarkBytesSent(nBytes); + // Update statistics per message type. + node.AccountForSentBytes(msg_type, nBytes); nSentSize += nBytes; - if (node.nSendOffset == data.size()) { - node.nSendOffset = 0; - // Update memory usage of send buffer (as *it will be deleted). - node.m_send_memusage -= sizeof(data) + memusage::DynamicUsage(data); - it++; - } else { + if ((size_t)nBytes != data.size()) { // could not send full message; stop sending more break; } @@ -956,7 +970,6 @@ std::pair CConnman::SocketSendData(CNode& node) const node.CloseSocketDisconnect(); } } - // couldn't send anything at all break; } } @@ -964,11 +977,10 @@ std::pair CConnman::SocketSendData(CNode& node) const node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize; if (it == node.vSendMsg.end()) { - assert(node.nSendOffset == 0); assert(node.m_send_memusage == 0); } node.vSendMsg.erase(node.vSendMsg.begin(), it); - return {nSentSize, !node.vSendMsg.empty()}; + return {nSentSize, data_left}; } /** Try to find a connection to evict when the node is full. @@ -1307,7 +1319,14 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span nodes) for (CNode* pnode : nodes) { bool select_recv = !pnode->fPauseRecv; - bool select_send = WITH_LOCK(pnode->cs_vSend, return !pnode->vSendMsg.empty()); + bool select_send; + { + LOCK(pnode->cs_vSend); + // Sending is possible if either there are bytes to send right now, or if there will be + // once a potential message from vSendMsg is handed to the transport. + const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend(); + select_send = !to_send.empty() || !pnode->vSendMsg.empty(); + } if (!select_recv && !select_send) continue; LOCK(pnode->m_sock_mutex); @@ -2988,42 +3007,19 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) size_t nBytesSent = 0; { LOCK(pnode->cs_vSend); - const bool queue_was_empty{pnode->vSendMsg.empty()}; - - // Give the message to the transport, and add all bytes it wants us to send out as byte - // vectors to vSendMsg. This is temporary code that exists to support the new transport - // sending interface using the old way of queueing data. In a future commit vSendMsg will - // be replaced with a queue of CSerializedNetMsg objects to be sent instead, and this code - // will disappear. - bool queued = pnode->m_transport->SetMessageToSend(msg); - assert(queued); - // In the current transport (V1Transport), GetBytesToSend first returns a header to send, - // and then the payload data (if any), necessitating a loop. - while (true) { - const auto& [bytes, _more, msg_type] = pnode->m_transport->GetBytesToSend(); - if (bytes.empty()) break; - // Update statistics per message type. - pnode->AccountForSentBytes(msg_type, bytes.size()); - pnode->vSendMsg.push_back({bytes.begin(), bytes.end()}); - // Update memory usage of send buffer. For now, use static + dynamic memory usage of - // byte vectors in vSendMsg as send memory. In a future commit, vSendMsg will be - // replaced with a queue of CSerializedNetMsg objects, and we'll use their memory usage - // instead. - pnode->m_send_memusage += sizeof(pnode->vSendMsg.back()) + memusage::DynamicUsage(pnode->vSendMsg.back()); - // Notify transport that bytes have been processed (they're not actually sent yet, - // but pushed onto the vSendMsg queue of bytes to send). - pnode->m_transport->MarkBytesSent(bytes.size()); - } - // At this point, m_transport->GetSendMemoryUsage() isn't very interesting as the - // transport's message is fully flushed (and converted to byte arrays). It's still included - // here for correctness, and will become relevant in a future commit when a queued message - // inside the transport may survive PushMessage calls. + const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend(); + const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()}; + + // Update memory usage of send buffer. + pnode->m_send_memusage += msg.GetMemoryUsage(); if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true; + // Move message to vSendMsg queue. + pnode->vSendMsg.push_back(std::move(msg)); - // If the write queue was empty before and isn't now, attempt "optimistic write": + // If there was nothing to send before, attempt "optimistic write": // because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually // doing a send, try sending from the calling thread if the queue was empty before. - if (queue_was_empty && !pnode->vSendMsg.empty()) { + if (queue_was_empty) { std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode); } } diff --git a/src/net.h b/src/net.h index 299d8c8e9b..ac26b538d5 100644 --- a/src/net.h +++ b/src/net.h @@ -436,13 +436,12 @@ public: */ std::shared_ptr m_sock GUARDED_BY(m_sock_mutex); - /** Total memory usage of vSendMsg (counting the vectors and their dynamic usage, but not the - * deque overhead). */ + /** Sum of GetMemoryUsage of all vSendMsg entries. */ size_t m_send_memusage GUARDED_BY(cs_vSend){0}; - /** Offset inside the first vSendMsg already sent */ - size_t nSendOffset GUARDED_BY(cs_vSend){0}; + /** Total number of bytes sent on the wire to this peer. */ uint64_t nSendBytes GUARDED_BY(cs_vSend){0}; - std::deque> vSendMsg GUARDED_BY(cs_vSend); + /** Messages still to be fed to m_transport->SetMessageToSend. */ + std::deque vSendMsg GUARDED_BY(cs_vSend); Mutex cs_vSend; Mutex m_sock_mutex; Mutex cs_vRecv; diff --git a/src/test/denialofservice_tests.cpp b/src/test/denialofservice_tests.cpp index 90e5bb34ed..7f5d587cf6 100644 --- a/src/test/denialofservice_tests.cpp +++ b/src/test/denialofservice_tests.cpp @@ -86,9 +86,10 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) { LOCK(dummyNode1.cs_vSend); - BOOST_CHECK(dummyNode1.vSendMsg.size() > 0); - dummyNode1.vSendMsg.clear(); + const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend(); + BOOST_CHECK(!to_send.empty()); } + connman.FlushSendBuffer(dummyNode1); int64_t nStartTime = GetTime(); // Wait 21 minutes @@ -96,7 +97,8 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders { LOCK(dummyNode1.cs_vSend); - BOOST_CHECK(dummyNode1.vSendMsg.size() > 0); + const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend(); + BOOST_CHECK(!to_send.empty()); } // Wait 3 more minutes SetMockTime(nStartTime+24*60); diff --git a/src/test/fuzz/process_messages.cpp b/src/test/fuzz/process_messages.cpp index 98962fceb5..4cb388c20b 100644 --- a/src/test/fuzz/process_messages.cpp +++ b/src/test/fuzz/process_messages.cpp @@ -67,6 +67,7 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages) CNode& random_node = *PickValue(fuzzed_data_provider, peers); + connman.FlushSendBuffer(random_node); (void)connman.ReceiveMsgFrom(random_node, std::move(net_msg)); random_node.fPauseSend = false; diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp index c071355bc0..8015db3e80 100644 --- a/src/test/util/net.cpp +++ b/src/test/util/net.cpp @@ -25,6 +25,7 @@ void ConnmanTestMsg::Handshake(CNode& node, const CNetMsgMaker mm{0}; peerman.InitializeNode(node, local_services); + FlushSendBuffer(node); // Drop the version message added by InitializeNode. CSerializedNetMsg msg_version{ mm.Make(NetMsgType::VERSION, @@ -45,6 +46,7 @@ void ConnmanTestMsg::Handshake(CNode& node, node.fPauseSend = false; connman.ProcessMessagesOnce(node); peerman.SendMessages(&node); + FlushSendBuffer(node); // Drop the verack message added by SendMessages. if (node.fDisconnect) return; assert(node.nVersion == version); assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION)); @@ -70,6 +72,18 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span msg_by } } +void ConnmanTestMsg::FlushSendBuffer(CNode& node) const +{ + LOCK(node.cs_vSend); + node.vSendMsg.clear(); + node.m_send_memusage = 0; + while (true) { + const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(); + if (to_send.empty()) break; + node.m_transport->MarkBytesSent(to_send.size()); + } +} + bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const { bool queued = node.m_transport->SetMessageToSend(ser_msg); diff --git a/src/test/util/net.h b/src/test/util/net.h index 687ce1e813..1684da777a 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -55,6 +55,7 @@ struct ConnmanTestMsg : public CConnman { void NodeReceiveMsgBytes(CNode& node, Span msg_bytes, bool& complete) const; bool ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const; + void FlushSendBuffer(CNode& node) const; }; constexpr ServiceFlags ALL_SERVICE_FLAGS[]{ -- cgit v1.2.3