aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPieter Wuille <pieter@wuille.net>2023-08-16 13:31:50 -0400
committerPieter Wuille <pieter@wuille.net>2023-08-23 20:13:49 -0400
commitbb4aab90fd046f2fff61e082a0c0d01c5ee31297 (patch)
tree007eae5aefb06f54d3f734000ed9394525007522 /src
parenta1a1060fd608a11dc525f76f2f54ab5b177dbd05 (diff)
downloadbitcoin-bb4aab90fd046f2fff61e082a0c0d01c5ee31297.tar.xz
net: move message conversion to wire bytes from PushMessage to SocketSendData
This furthers transport abstraction by removing the assumption that a message can always immediately be converted to wire bytes. This assumption does not hold for the v2 transport proposed by BIP324, as no messages can be sent before the handshake completes. This is done by only keeping (complete) CSerializedNetMsg objects in vSendMsg, rather than the resulting bytes (for header and payload) that need to be sent. In SocketSendData, these objects are handed to the transport as permitted by it, and sending out the bytes the transport tells us to send. This also removes the nSendOffset member variable in CNode, as keeping track of how much has been sent is now a responsability of the transport. This is not a pure refactor, and has the following effects even for the current v1 transport: * Checksum calculation now happens in SocketSendData rather than PushMessage. For non-optimistic-send messages, that means this computation now happens in the network thread rather than the message handler thread (generally a good thing, as the message handler thread is more of a computational bottleneck). * Checksum calculation now happens while holding the cs_vSend lock. This is technically unnecessary for the v1 transport, as messages are encoded independent from one another, but is untenable for the v2 transport anyway. * Statistics updates about per-message sent bytes now happen when those bytes are actually handed to the OS, rather than at PushMessage time.
Diffstat (limited to 'src')
-rw-r--r--src/net.cpp96
-rw-r--r--src/net.h9
-rw-r--r--src/test/denialofservice_tests.cpp8
-rw-r--r--src/test/fuzz/process_messages.cpp1
-rw-r--r--src/test/util/net.cpp14
-rw-r--r--src/test/util/net.h1
6 files changed, 71 insertions, 58 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 47b872a446..9fce585b81 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -915,35 +915,49 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
{
auto it = node.vSendMsg.begin();
size_t nSentSize = 0;
-
- while (it != node.vSendMsg.end()) {
- const auto& data = *it;
- assert(data.size() > node.nSendOffset);
+ bool data_left{false}; //!< second return value (whether unsent data remains)
+
+ while (true) {
+ if (it != node.vSendMsg.end()) {
+ // If possible, move one message from the send queue to the transport. This fails when
+ // there is an existing message still being sent.
+ size_t memusage = it->GetMemoryUsage();
+ if (node.m_transport->SetMessageToSend(*it)) {
+ // Update memory usage of send buffer (as *it will be deleted).
+ node.m_send_memusage -= memusage;
+ ++it;
+ }
+ }
+ const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend();
+ data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent
int nBytes = 0;
- {
+ if (!data.empty()) {
LOCK(node.m_sock_mutex);
+ // There is no socket in case we've already disconnected, or in test cases without
+ // real connections. In these cases, we bail out immediately and just leave things
+ // in the send queue and transport.
if (!node.m_sock) {
break;
}
int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
#ifdef MSG_MORE
- if (it + 1 != node.vSendMsg.end()) {
+ // We have more to send if either the transport itself has more, or if we have more
+ // messages to send.
+ if (more || it != node.vSendMsg.end()) {
flags |= MSG_MORE;
}
#endif
- nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, flags);
+ nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags);
}
if (nBytes > 0) {
node.m_last_send = GetTime<std::chrono::seconds>();
node.nSendBytes += nBytes;
- node.nSendOffset += nBytes;
+ // Notify transport that bytes have been processed.
+ node.m_transport->MarkBytesSent(nBytes);
+ // Update statistics per message type.
+ node.AccountForSentBytes(msg_type, nBytes);
nSentSize += nBytes;
- if (node.nSendOffset == data.size()) {
- node.nSendOffset = 0;
- // Update memory usage of send buffer (as *it will be deleted).
- node.m_send_memusage -= sizeof(data) + memusage::DynamicUsage(data);
- it++;
- } else {
+ if ((size_t)nBytes != data.size()) {
// could not send full message; stop sending more
break;
}
@@ -956,7 +970,6 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
node.CloseSocketDisconnect();
}
}
- // couldn't send anything at all
break;
}
}
@@ -964,11 +977,10 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize;
if (it == node.vSendMsg.end()) {
- assert(node.nSendOffset == 0);
assert(node.m_send_memusage == 0);
}
node.vSendMsg.erase(node.vSendMsg.begin(), it);
- return {nSentSize, !node.vSendMsg.empty()};
+ return {nSentSize, data_left};
}
/** Try to find a connection to evict when the node is full.
@@ -1307,7 +1319,14 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
for (CNode* pnode : nodes) {
bool select_recv = !pnode->fPauseRecv;
- bool select_send = WITH_LOCK(pnode->cs_vSend, return !pnode->vSendMsg.empty());
+ bool select_send;
+ {
+ LOCK(pnode->cs_vSend);
+ // Sending is possible if either there are bytes to send right now, or if there will be
+ // once a potential message from vSendMsg is handed to the transport.
+ const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
+ select_send = !to_send.empty() || !pnode->vSendMsg.empty();
+ }
if (!select_recv && !select_send) continue;
LOCK(pnode->m_sock_mutex);
@@ -2988,42 +3007,19 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
- const bool queue_was_empty{pnode->vSendMsg.empty()};
-
- // Give the message to the transport, and add all bytes it wants us to send out as byte
- // vectors to vSendMsg. This is temporary code that exists to support the new transport
- // sending interface using the old way of queueing data. In a future commit vSendMsg will
- // be replaced with a queue of CSerializedNetMsg objects to be sent instead, and this code
- // will disappear.
- bool queued = pnode->m_transport->SetMessageToSend(msg);
- assert(queued);
- // In the current transport (V1Transport), GetBytesToSend first returns a header to send,
- // and then the payload data (if any), necessitating a loop.
- while (true) {
- const auto& [bytes, _more, msg_type] = pnode->m_transport->GetBytesToSend();
- if (bytes.empty()) break;
- // Update statistics per message type.
- pnode->AccountForSentBytes(msg_type, bytes.size());
- pnode->vSendMsg.push_back({bytes.begin(), bytes.end()});
- // Update memory usage of send buffer. For now, use static + dynamic memory usage of
- // byte vectors in vSendMsg as send memory. In a future commit, vSendMsg will be
- // replaced with a queue of CSerializedNetMsg objects, and we'll use their memory usage
- // instead.
- pnode->m_send_memusage += sizeof(pnode->vSendMsg.back()) + memusage::DynamicUsage(pnode->vSendMsg.back());
- // Notify transport that bytes have been processed (they're not actually sent yet,
- // but pushed onto the vSendMsg queue of bytes to send).
- pnode->m_transport->MarkBytesSent(bytes.size());
- }
- // At this point, m_transport->GetSendMemoryUsage() isn't very interesting as the
- // transport's message is fully flushed (and converted to byte arrays). It's still included
- // here for correctness, and will become relevant in a future commit when a queued message
- // inside the transport may survive PushMessage calls.
+ const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
+ const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()};
+
+ // Update memory usage of send buffer.
+ pnode->m_send_memusage += msg.GetMemoryUsage();
if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true;
+ // Move message to vSendMsg queue.
+ pnode->vSendMsg.push_back(std::move(msg));
- // If the write queue was empty before and isn't now, attempt "optimistic write":
+ // If there was nothing to send before, attempt "optimistic write":
// because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
// doing a send, try sending from the calling thread if the queue was empty before.
- if (queue_was_empty && !pnode->vSendMsg.empty()) {
+ if (queue_was_empty) {
std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode);
}
}
diff --git a/src/net.h b/src/net.h
index 299d8c8e9b..ac26b538d5 100644
--- a/src/net.h
+++ b/src/net.h
@@ -436,13 +436,12 @@ public:
*/
std::shared_ptr<Sock> m_sock GUARDED_BY(m_sock_mutex);
- /** Total memory usage of vSendMsg (counting the vectors and their dynamic usage, but not the
- * deque overhead). */
+ /** Sum of GetMemoryUsage of all vSendMsg entries. */
size_t m_send_memusage GUARDED_BY(cs_vSend){0};
- /** Offset inside the first vSendMsg already sent */
- size_t nSendOffset GUARDED_BY(cs_vSend){0};
+ /** Total number of bytes sent on the wire to this peer. */
uint64_t nSendBytes GUARDED_BY(cs_vSend){0};
- std::deque<std::vector<unsigned char>> vSendMsg GUARDED_BY(cs_vSend);
+ /** Messages still to be fed to m_transport->SetMessageToSend. */
+ std::deque<CSerializedNetMsg> vSendMsg GUARDED_BY(cs_vSend);
Mutex cs_vSend;
Mutex m_sock_mutex;
Mutex cs_vRecv;
diff --git a/src/test/denialofservice_tests.cpp b/src/test/denialofservice_tests.cpp
index 90e5bb34ed..7f5d587cf6 100644
--- a/src/test/denialofservice_tests.cpp
+++ b/src/test/denialofservice_tests.cpp
@@ -86,9 +86,10 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
{
LOCK(dummyNode1.cs_vSend);
- BOOST_CHECK(dummyNode1.vSendMsg.size() > 0);
- dummyNode1.vSendMsg.clear();
+ const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend();
+ BOOST_CHECK(!to_send.empty());
}
+ connman.FlushSendBuffer(dummyNode1);
int64_t nStartTime = GetTime();
// Wait 21 minutes
@@ -96,7 +97,8 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders
{
LOCK(dummyNode1.cs_vSend);
- BOOST_CHECK(dummyNode1.vSendMsg.size() > 0);
+ const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend();
+ BOOST_CHECK(!to_send.empty());
}
// Wait 3 more minutes
SetMockTime(nStartTime+24*60);
diff --git a/src/test/fuzz/process_messages.cpp b/src/test/fuzz/process_messages.cpp
index 98962fceb5..4cb388c20b 100644
--- a/src/test/fuzz/process_messages.cpp
+++ b/src/test/fuzz/process_messages.cpp
@@ -67,6 +67,7 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages)
CNode& random_node = *PickValue(fuzzed_data_provider, peers);
+ connman.FlushSendBuffer(random_node);
(void)connman.ReceiveMsgFrom(random_node, std::move(net_msg));
random_node.fPauseSend = false;
diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp
index c071355bc0..8015db3e80 100644
--- a/src/test/util/net.cpp
+++ b/src/test/util/net.cpp
@@ -25,6 +25,7 @@ void ConnmanTestMsg::Handshake(CNode& node,
const CNetMsgMaker mm{0};
peerman.InitializeNode(node, local_services);
+ FlushSendBuffer(node); // Drop the version message added by InitializeNode.
CSerializedNetMsg msg_version{
mm.Make(NetMsgType::VERSION,
@@ -45,6 +46,7 @@ void ConnmanTestMsg::Handshake(CNode& node,
node.fPauseSend = false;
connman.ProcessMessagesOnce(node);
peerman.SendMessages(&node);
+ FlushSendBuffer(node); // Drop the verack message added by SendMessages.
if (node.fDisconnect) return;
assert(node.nVersion == version);
assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION));
@@ -70,6 +72,18 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by
}
}
+void ConnmanTestMsg::FlushSendBuffer(CNode& node) const
+{
+ LOCK(node.cs_vSend);
+ node.vSendMsg.clear();
+ node.m_send_memusage = 0;
+ while (true) {
+ const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend();
+ if (to_send.empty()) break;
+ node.m_transport->MarkBytesSent(to_send.size());
+ }
+}
+
bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const
{
bool queued = node.m_transport->SetMessageToSend(ser_msg);
diff --git a/src/test/util/net.h b/src/test/util/net.h
index 687ce1e813..1684da777a 100644
--- a/src/test/util/net.h
+++ b/src/test/util/net.h
@@ -55,6 +55,7 @@ struct ConnmanTestMsg : public CConnman {
void NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete) const;
bool ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const;
+ void FlushSendBuffer(CNode& node) const;
};
constexpr ServiceFlags ALL_SERVICE_FLAGS[]{