aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/net.cpp96
-rw-r--r--src/net.h9
-rw-r--r--src/test/denialofservice_tests.cpp8
-rw-r--r--src/test/fuzz/process_messages.cpp1
-rw-r--r--src/test/util/net.cpp14
-rw-r--r--src/test/util/net.h1
6 files changed, 71 insertions, 58 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 47b872a446..9fce585b81 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -915,35 +915,49 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
{
auto it = node.vSendMsg.begin();
size_t nSentSize = 0;
-
- while (it != node.vSendMsg.end()) {
- const auto& data = *it;
- assert(data.size() > node.nSendOffset);
+ bool data_left{false}; //!< second return value (whether unsent data remains)
+
+ while (true) {
+ if (it != node.vSendMsg.end()) {
+ // If possible, move one message from the send queue to the transport. This fails when
+ // there is an existing message still being sent.
+ size_t memusage = it->GetMemoryUsage();
+ if (node.m_transport->SetMessageToSend(*it)) {
+ // Update memory usage of send buffer (as *it will be deleted).
+ node.m_send_memusage -= memusage;
+ ++it;
+ }
+ }
+ const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend();
+ data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent
int nBytes = 0;
- {
+ if (!data.empty()) {
LOCK(node.m_sock_mutex);
+ // There is no socket in case we've already disconnected, or in test cases without
+ // real connections. In these cases, we bail out immediately and just leave things
+ // in the send queue and transport.
if (!node.m_sock) {
break;
}
int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
#ifdef MSG_MORE
- if (it + 1 != node.vSendMsg.end()) {
+ // We have more to send if either the transport itself has more, or if we have more
+ // messages to send.
+ if (more || it != node.vSendMsg.end()) {
flags |= MSG_MORE;
}
#endif
- nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, flags);
+ nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags);
}
if (nBytes > 0) {
node.m_last_send = GetTime<std::chrono::seconds>();
node.nSendBytes += nBytes;
- node.nSendOffset += nBytes;
+ // Notify transport that bytes have been processed.
+ node.m_transport->MarkBytesSent(nBytes);
+ // Update statistics per message type.
+ node.AccountForSentBytes(msg_type, nBytes);
nSentSize += nBytes;
- if (node.nSendOffset == data.size()) {
- node.nSendOffset = 0;
- // Update memory usage of send buffer (as *it will be deleted).
- node.m_send_memusage -= sizeof(data) + memusage::DynamicUsage(data);
- it++;
- } else {
+ if ((size_t)nBytes != data.size()) {
// could not send full message; stop sending more
break;
}
@@ -956,7 +970,6 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
node.CloseSocketDisconnect();
}
}
- // couldn't send anything at all
break;
}
}
@@ -964,11 +977,10 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize;
if (it == node.vSendMsg.end()) {
- assert(node.nSendOffset == 0);
assert(node.m_send_memusage == 0);
}
node.vSendMsg.erase(node.vSendMsg.begin(), it);
- return {nSentSize, !node.vSendMsg.empty()};
+ return {nSentSize, data_left};
}
/** Try to find a connection to evict when the node is full.
@@ -1307,7 +1319,14 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
for (CNode* pnode : nodes) {
bool select_recv = !pnode->fPauseRecv;
- bool select_send = WITH_LOCK(pnode->cs_vSend, return !pnode->vSendMsg.empty());
+ bool select_send;
+ {
+ LOCK(pnode->cs_vSend);
+ // Sending is possible if either there are bytes to send right now, or if there will be
+ // once a potential message from vSendMsg is handed to the transport.
+ const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
+ select_send = !to_send.empty() || !pnode->vSendMsg.empty();
+ }
if (!select_recv && !select_send) continue;
LOCK(pnode->m_sock_mutex);
@@ -2988,42 +3007,19 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
- const bool queue_was_empty{pnode->vSendMsg.empty()};
-
- // Give the message to the transport, and add all bytes it wants us to send out as byte
- // vectors to vSendMsg. This is temporary code that exists to support the new transport
- // sending interface using the old way of queueing data. In a future commit vSendMsg will
- // be replaced with a queue of CSerializedNetMsg objects to be sent instead, and this code
- // will disappear.
- bool queued = pnode->m_transport->SetMessageToSend(msg);
- assert(queued);
- // In the current transport (V1Transport), GetBytesToSend first returns a header to send,
- // and then the payload data (if any), necessitating a loop.
- while (true) {
- const auto& [bytes, _more, msg_type] = pnode->m_transport->GetBytesToSend();
- if (bytes.empty()) break;
- // Update statistics per message type.
- pnode->AccountForSentBytes(msg_type, bytes.size());
- pnode->vSendMsg.push_back({bytes.begin(), bytes.end()});
- // Update memory usage of send buffer. For now, use static + dynamic memory usage of
- // byte vectors in vSendMsg as send memory. In a future commit, vSendMsg will be
- // replaced with a queue of CSerializedNetMsg objects, and we'll use their memory usage
- // instead.
- pnode->m_send_memusage += sizeof(pnode->vSendMsg.back()) + memusage::DynamicUsage(pnode->vSendMsg.back());
- // Notify transport that bytes have been processed (they're not actually sent yet,
- // but pushed onto the vSendMsg queue of bytes to send).
- pnode->m_transport->MarkBytesSent(bytes.size());
- }
- // At this point, m_transport->GetSendMemoryUsage() isn't very interesting as the
- // transport's message is fully flushed (and converted to byte arrays). It's still included
- // here for correctness, and will become relevant in a future commit when a queued message
- // inside the transport may survive PushMessage calls.
+ const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
+ const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()};
+
+ // Update memory usage of send buffer.
+ pnode->m_send_memusage += msg.GetMemoryUsage();
if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true;
+ // Move message to vSendMsg queue.
+ pnode->vSendMsg.push_back(std::move(msg));
- // If the write queue was empty before and isn't now, attempt "optimistic write":
+ // If there was nothing to send before, attempt "optimistic write":
// because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
// doing a send, try sending from the calling thread if the queue was empty before.
- if (queue_was_empty && !pnode->vSendMsg.empty()) {
+ if (queue_was_empty) {
std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode);
}
}
diff --git a/src/net.h b/src/net.h
index 299d8c8e9b..ac26b538d5 100644
--- a/src/net.h
+++ b/src/net.h
@@ -436,13 +436,12 @@ public:
*/
std::shared_ptr<Sock> m_sock GUARDED_BY(m_sock_mutex);
- /** Total memory usage of vSendMsg (counting the vectors and their dynamic usage, but not the
- * deque overhead). */
+ /** Sum of GetMemoryUsage of all vSendMsg entries. */
size_t m_send_memusage GUARDED_BY(cs_vSend){0};
- /** Offset inside the first vSendMsg already sent */
- size_t nSendOffset GUARDED_BY(cs_vSend){0};
+ /** Total number of bytes sent on the wire to this peer. */
uint64_t nSendBytes GUARDED_BY(cs_vSend){0};
- std::deque<std::vector<unsigned char>> vSendMsg GUARDED_BY(cs_vSend);
+ /** Messages still to be fed to m_transport->SetMessageToSend. */
+ std::deque<CSerializedNetMsg> vSendMsg GUARDED_BY(cs_vSend);
Mutex cs_vSend;
Mutex m_sock_mutex;
Mutex cs_vRecv;
diff --git a/src/test/denialofservice_tests.cpp b/src/test/denialofservice_tests.cpp
index 90e5bb34ed..7f5d587cf6 100644
--- a/src/test/denialofservice_tests.cpp
+++ b/src/test/denialofservice_tests.cpp
@@ -86,9 +86,10 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
{
LOCK(dummyNode1.cs_vSend);
- BOOST_CHECK(dummyNode1.vSendMsg.size() > 0);
- dummyNode1.vSendMsg.clear();
+ const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend();
+ BOOST_CHECK(!to_send.empty());
}
+ connman.FlushSendBuffer(dummyNode1);
int64_t nStartTime = GetTime();
// Wait 21 minutes
@@ -96,7 +97,8 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders
{
LOCK(dummyNode1.cs_vSend);
- BOOST_CHECK(dummyNode1.vSendMsg.size() > 0);
+ const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend();
+ BOOST_CHECK(!to_send.empty());
}
// Wait 3 more minutes
SetMockTime(nStartTime+24*60);
diff --git a/src/test/fuzz/process_messages.cpp b/src/test/fuzz/process_messages.cpp
index 98962fceb5..4cb388c20b 100644
--- a/src/test/fuzz/process_messages.cpp
+++ b/src/test/fuzz/process_messages.cpp
@@ -67,6 +67,7 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages)
CNode& random_node = *PickValue(fuzzed_data_provider, peers);
+ connman.FlushSendBuffer(random_node);
(void)connman.ReceiveMsgFrom(random_node, std::move(net_msg));
random_node.fPauseSend = false;
diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp
index c071355bc0..8015db3e80 100644
--- a/src/test/util/net.cpp
+++ b/src/test/util/net.cpp
@@ -25,6 +25,7 @@ void ConnmanTestMsg::Handshake(CNode& node,
const CNetMsgMaker mm{0};
peerman.InitializeNode(node, local_services);
+ FlushSendBuffer(node); // Drop the version message added by InitializeNode.
CSerializedNetMsg msg_version{
mm.Make(NetMsgType::VERSION,
@@ -45,6 +46,7 @@ void ConnmanTestMsg::Handshake(CNode& node,
node.fPauseSend = false;
connman.ProcessMessagesOnce(node);
peerman.SendMessages(&node);
+ FlushSendBuffer(node); // Drop the verack message added by SendMessages.
if (node.fDisconnect) return;
assert(node.nVersion == version);
assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION));
@@ -70,6 +72,18 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by
}
}
+void ConnmanTestMsg::FlushSendBuffer(CNode& node) const
+{
+ LOCK(node.cs_vSend);
+ node.vSendMsg.clear();
+ node.m_send_memusage = 0;
+ while (true) {
+ const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend();
+ if (to_send.empty()) break;
+ node.m_transport->MarkBytesSent(to_send.size());
+ }
+}
+
bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const
{
bool queued = node.m_transport->SetMessageToSend(ser_msg);
diff --git a/src/test/util/net.h b/src/test/util/net.h
index 687ce1e813..1684da777a 100644
--- a/src/test/util/net.h
+++ b/src/test/util/net.h
@@ -55,6 +55,7 @@ struct ConnmanTestMsg : public CConnman {
void NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete) const;
bool ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const;
+ void FlushSendBuffer(CNode& node) const;
};
constexpr ServiceFlags ALL_SERVICE_FLAGS[]{