diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/net.cpp | 96 | ||||
-rw-r--r-- | src/net.h | 9 | ||||
-rw-r--r-- | src/test/denialofservice_tests.cpp | 8 | ||||
-rw-r--r-- | src/test/fuzz/process_messages.cpp | 1 | ||||
-rw-r--r-- | src/test/util/net.cpp | 14 | ||||
-rw-r--r-- | src/test/util/net.h | 1 |
6 files changed, 71 insertions, 58 deletions
diff --git a/src/net.cpp b/src/net.cpp index 47b872a446..9fce585b81 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -915,35 +915,49 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const { auto it = node.vSendMsg.begin(); size_t nSentSize = 0; - - while (it != node.vSendMsg.end()) { - const auto& data = *it; - assert(data.size() > node.nSendOffset); + bool data_left{false}; //!< second return value (whether unsent data remains) + + while (true) { + if (it != node.vSendMsg.end()) { + // If possible, move one message from the send queue to the transport. This fails when + // there is an existing message still being sent. + size_t memusage = it->GetMemoryUsage(); + if (node.m_transport->SetMessageToSend(*it)) { + // Update memory usage of send buffer (as *it will be deleted). + node.m_send_memusage -= memusage; + ++it; + } + } + const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend(); + data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent int nBytes = 0; - { + if (!data.empty()) { LOCK(node.m_sock_mutex); + // There is no socket in case we've already disconnected, or in test cases without + // real connections. In these cases, we bail out immediately and just leave things + // in the send queue and transport. if (!node.m_sock) { break; } int flags = MSG_NOSIGNAL | MSG_DONTWAIT; #ifdef MSG_MORE - if (it + 1 != node.vSendMsg.end()) { + // We have more to send if either the transport itself has more, or if we have more + // messages to send. + if (more || it != node.vSendMsg.end()) { flags |= MSG_MORE; } #endif - nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, flags); + nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags); } if (nBytes > 0) { node.m_last_send = GetTime<std::chrono::seconds>(); node.nSendBytes += nBytes; - node.nSendOffset += nBytes; + // Notify transport that bytes have been processed. + node.m_transport->MarkBytesSent(nBytes); + // Update statistics per message type. + node.AccountForSentBytes(msg_type, nBytes); nSentSize += nBytes; - if (node.nSendOffset == data.size()) { - node.nSendOffset = 0; - // Update memory usage of send buffer (as *it will be deleted). - node.m_send_memusage -= sizeof(data) + memusage::DynamicUsage(data); - it++; - } else { + if ((size_t)nBytes != data.size()) { // could not send full message; stop sending more break; } @@ -956,7 +970,6 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const node.CloseSocketDisconnect(); } } - // couldn't send anything at all break; } } @@ -964,11 +977,10 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize; if (it == node.vSendMsg.end()) { - assert(node.nSendOffset == 0); assert(node.m_send_memusage == 0); } node.vSendMsg.erase(node.vSendMsg.begin(), it); - return {nSentSize, !node.vSendMsg.empty()}; + return {nSentSize, data_left}; } /** Try to find a connection to evict when the node is full. @@ -1307,7 +1319,14 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes) for (CNode* pnode : nodes) { bool select_recv = !pnode->fPauseRecv; - bool select_send = WITH_LOCK(pnode->cs_vSend, return !pnode->vSendMsg.empty()); + bool select_send; + { + LOCK(pnode->cs_vSend); + // Sending is possible if either there are bytes to send right now, or if there will be + // once a potential message from vSendMsg is handed to the transport. + const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend(); + select_send = !to_send.empty() || !pnode->vSendMsg.empty(); + } if (!select_recv && !select_send) continue; LOCK(pnode->m_sock_mutex); @@ -2988,42 +3007,19 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) size_t nBytesSent = 0; { LOCK(pnode->cs_vSend); - const bool queue_was_empty{pnode->vSendMsg.empty()}; - - // Give the message to the transport, and add all bytes it wants us to send out as byte - // vectors to vSendMsg. This is temporary code that exists to support the new transport - // sending interface using the old way of queueing data. In a future commit vSendMsg will - // be replaced with a queue of CSerializedNetMsg objects to be sent instead, and this code - // will disappear. - bool queued = pnode->m_transport->SetMessageToSend(msg); - assert(queued); - // In the current transport (V1Transport), GetBytesToSend first returns a header to send, - // and then the payload data (if any), necessitating a loop. - while (true) { - const auto& [bytes, _more, msg_type] = pnode->m_transport->GetBytesToSend(); - if (bytes.empty()) break; - // Update statistics per message type. - pnode->AccountForSentBytes(msg_type, bytes.size()); - pnode->vSendMsg.push_back({bytes.begin(), bytes.end()}); - // Update memory usage of send buffer. For now, use static + dynamic memory usage of - // byte vectors in vSendMsg as send memory. In a future commit, vSendMsg will be - // replaced with a queue of CSerializedNetMsg objects, and we'll use their memory usage - // instead. - pnode->m_send_memusage += sizeof(pnode->vSendMsg.back()) + memusage::DynamicUsage(pnode->vSendMsg.back()); - // Notify transport that bytes have been processed (they're not actually sent yet, - // but pushed onto the vSendMsg queue of bytes to send). - pnode->m_transport->MarkBytesSent(bytes.size()); - } - // At this point, m_transport->GetSendMemoryUsage() isn't very interesting as the - // transport's message is fully flushed (and converted to byte arrays). It's still included - // here for correctness, and will become relevant in a future commit when a queued message - // inside the transport may survive PushMessage calls. + const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend(); + const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()}; + + // Update memory usage of send buffer. + pnode->m_send_memusage += msg.GetMemoryUsage(); if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true; + // Move message to vSendMsg queue. + pnode->vSendMsg.push_back(std::move(msg)); - // If the write queue was empty before and isn't now, attempt "optimistic write": + // If there was nothing to send before, attempt "optimistic write": // because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually // doing a send, try sending from the calling thread if the queue was empty before. - if (queue_was_empty && !pnode->vSendMsg.empty()) { + if (queue_was_empty) { std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode); } } @@ -436,13 +436,12 @@ public: */ std::shared_ptr<Sock> m_sock GUARDED_BY(m_sock_mutex); - /** Total memory usage of vSendMsg (counting the vectors and their dynamic usage, but not the - * deque overhead). */ + /** Sum of GetMemoryUsage of all vSendMsg entries. */ size_t m_send_memusage GUARDED_BY(cs_vSend){0}; - /** Offset inside the first vSendMsg already sent */ - size_t nSendOffset GUARDED_BY(cs_vSend){0}; + /** Total number of bytes sent on the wire to this peer. */ uint64_t nSendBytes GUARDED_BY(cs_vSend){0}; - std::deque<std::vector<unsigned char>> vSendMsg GUARDED_BY(cs_vSend); + /** Messages still to be fed to m_transport->SetMessageToSend. */ + std::deque<CSerializedNetMsg> vSendMsg GUARDED_BY(cs_vSend); Mutex cs_vSend; Mutex m_sock_mutex; Mutex cs_vRecv; diff --git a/src/test/denialofservice_tests.cpp b/src/test/denialofservice_tests.cpp index 90e5bb34ed..7f5d587cf6 100644 --- a/src/test/denialofservice_tests.cpp +++ b/src/test/denialofservice_tests.cpp @@ -86,9 +86,10 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) { LOCK(dummyNode1.cs_vSend); - BOOST_CHECK(dummyNode1.vSendMsg.size() > 0); - dummyNode1.vSendMsg.clear(); + const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend(); + BOOST_CHECK(!to_send.empty()); } + connman.FlushSendBuffer(dummyNode1); int64_t nStartTime = GetTime(); // Wait 21 minutes @@ -96,7 +97,8 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders { LOCK(dummyNode1.cs_vSend); - BOOST_CHECK(dummyNode1.vSendMsg.size() > 0); + const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend(); + BOOST_CHECK(!to_send.empty()); } // Wait 3 more minutes SetMockTime(nStartTime+24*60); diff --git a/src/test/fuzz/process_messages.cpp b/src/test/fuzz/process_messages.cpp index 98962fceb5..4cb388c20b 100644 --- a/src/test/fuzz/process_messages.cpp +++ b/src/test/fuzz/process_messages.cpp @@ -67,6 +67,7 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages) CNode& random_node = *PickValue(fuzzed_data_provider, peers); + connman.FlushSendBuffer(random_node); (void)connman.ReceiveMsgFrom(random_node, std::move(net_msg)); random_node.fPauseSend = false; diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp index c071355bc0..8015db3e80 100644 --- a/src/test/util/net.cpp +++ b/src/test/util/net.cpp @@ -25,6 +25,7 @@ void ConnmanTestMsg::Handshake(CNode& node, const CNetMsgMaker mm{0}; peerman.InitializeNode(node, local_services); + FlushSendBuffer(node); // Drop the version message added by InitializeNode. CSerializedNetMsg msg_version{ mm.Make(NetMsgType::VERSION, @@ -45,6 +46,7 @@ void ConnmanTestMsg::Handshake(CNode& node, node.fPauseSend = false; connman.ProcessMessagesOnce(node); peerman.SendMessages(&node); + FlushSendBuffer(node); // Drop the verack message added by SendMessages. if (node.fDisconnect) return; assert(node.nVersion == version); assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION)); @@ -70,6 +72,18 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by } } +void ConnmanTestMsg::FlushSendBuffer(CNode& node) const +{ + LOCK(node.cs_vSend); + node.vSendMsg.clear(); + node.m_send_memusage = 0; + while (true) { + const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(); + if (to_send.empty()) break; + node.m_transport->MarkBytesSent(to_send.size()); + } +} + bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const { bool queued = node.m_transport->SetMessageToSend(ser_msg); diff --git a/src/test/util/net.h b/src/test/util/net.h index 687ce1e813..1684da777a 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -55,6 +55,7 @@ struct ConnmanTestMsg : public CConnman { void NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete) const; bool ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const; + void FlushSendBuffer(CNode& node) const; }; constexpr ServiceFlags ALL_SERVICE_FLAGS[]{ |