aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPieter Wuille <pieter@wuille.net>2023-08-16 13:21:35 -0400
committerPieter Wuille <pieter@wuille.net>2023-09-07 08:53:45 -0400
commitc3fad1f29df093e8fd03d70eb43f25ee9d531bf7 (patch)
tree9167136caf012096d498b9cdc71d80fb0156d3b2
parent8e0d9796da8cfb6c4e918788a03eea125d0633a6 (diff)
net: add have_next_message argument to Transport::GetBytesToSend()
Before this commit, there are only two possibly outcomes for the "more" prediction in Transport::GetBytesToSend(): * true: the transport itself has more to send, so the answer is certainly yes. * false: the transport has nothing further to send, but if vSendMsg has more message(s) left, that still will result in more wire bytes after the next SetMessageToSend(). For the BIP324 v2 transport, there will arguably be a third state: * definitely not: the transport has nothing further to send, but even if vSendMsg has more messages left, they can't be sent (right now). This happens before the handshake is complete. To implement this, we move the entire decision logic to the Transport, by adding a boolean to GetBytesToSend(), called have_next_message, which informs the transport whether more messages are available. The return values are still true and false, but they mean "definitely yes" and "definitely no", rather than "yes" and "maybe".
-rw-r--r--src/net.cpp43
-rw-r--r--src/net.h37
-rw-r--r--src/test/denialofservice_tests.cpp4
-rw-r--r--src/test/fuzz/p2p_transport_serialization.cpp34
-rw-r--r--src/test/util/net.cpp4
5 files changed, 85 insertions, 37 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 4addca0982..eaa99e6601 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -867,20 +867,22 @@ bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept
return true;
}
-Transport::BytesToSend V1Transport::GetBytesToSend() const noexcept
+Transport::BytesToSend V1Transport::GetBytesToSend(bool have_next_message) const noexcept
{
AssertLockNotHeld(m_send_mutex);
LOCK(m_send_mutex);
if (m_sending_header) {
return {Span{m_header_to_send}.subspan(m_bytes_sent),
- // We have more to send after the header if the message has payload.
- !m_message_to_send.data.empty(),
+ // We have more to send after the header if the message has payload, or if there
+ // is a next message after that.
+ have_next_message || !m_message_to_send.data.empty(),
m_message_to_send.m_type
};
} else {
return {Span{m_message_to_send.data}.subspan(m_bytes_sent),
- // We never have more to send after this message's payload.
- false,
+ // We only have more to send after this message's payload if there is another
+ // message.
+ have_next_message,
m_message_to_send.m_type
};
}
@@ -916,6 +918,7 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
auto it = node.vSendMsg.begin();
size_t nSentSize = 0;
bool data_left{false}; //!< second return value (whether unsent data remains)
+ std::optional<bool> expected_more;
while (true) {
if (it != node.vSendMsg.end()) {
@@ -928,7 +931,12 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
++it;
}
}
- const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend();
+ const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend(it != node.vSendMsg.end());
+ // We rely on the 'more' value returned by GetBytesToSend to correctly predict whether more
+ // bytes are still to be sent, to correctly set the MSG_MORE flag. As a sanity check,
+ // verify that the previously returned 'more' was correct.
+ if (expected_more.has_value()) Assume(!data.empty() == *expected_more);
+ expected_more = more;
data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent
int nBytes = 0;
if (!data.empty()) {
@@ -941,9 +949,7 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
}
int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
#ifdef MSG_MORE
- // We have more to send if either the transport itself has more, or if we have more
- // messages to send.
- if (more || it != node.vSendMsg.end()) {
+ if (more) {
flags |= MSG_MORE;
}
#endif
@@ -1323,9 +1329,10 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
{
LOCK(pnode->cs_vSend);
// Sending is possible if either there are bytes to send right now, or if there will be
- // once a potential message from vSendMsg is handed to the transport.
- const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
- select_send = !to_send.empty() || !pnode->vSendMsg.empty();
+ // once a potential message from vSendMsg is handed to the transport. GetBytesToSend
+ // determines both of these in a single call.
+ const auto& [to_send, more, _msg_type] = pnode->m_transport->GetBytesToSend(!pnode->vSendMsg.empty());
+ select_send = !to_send.empty() || more;
}
if (!select_recv && !select_send) continue;
@@ -3007,7 +3014,10 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
- const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
+ // Check if the transport still has unsent bytes, and indicate to it that we're about to
+ // give it a message to send.
+ const auto& [to_send, more, _msg_type] =
+ pnode->m_transport->GetBytesToSend(/*have_next_message=*/true);
const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()};
// Update memory usage of send buffer.
@@ -3016,10 +3026,13 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
// Move message to vSendMsg queue.
pnode->vSendMsg.push_back(std::move(msg));
- // If there was nothing to send before, attempt "optimistic write":
+ // If there was nothing to send before, and there is now (predicted by the "more" value
+ // returned by the GetBytesToSend call above), attempt "optimistic write":
// because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
// doing a send, try sending from the calling thread if the queue was empty before.
- if (queue_was_empty) {
+ // With a V1Transport, more will always be true here, because adding a message always
+ // results in sendable bytes there.
+ if (queue_was_empty && more) {
std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode);
}
}
diff --git a/src/net.h b/src/net.h
index 60a15fea55..1507ff7384 100644
--- a/src/net.h
+++ b/src/net.h
@@ -308,19 +308,40 @@ public:
const std::string& /*m_type*/
>;
- /** Get bytes to send on the wire.
+ /** Get bytes to send on the wire, if any, along with other information about it.
*
* As a const function, it does not modify the transport's observable state, and is thus safe
* to be called multiple times.
*
- * The bytes returned by this function act as a stream which can only be appended to. This
- * means that with the exception of MarkBytesSent, operations on the transport can only append
- * to what is being returned.
+ * @param[in] have_next_message If true, the "more" return value reports whether more will
+ * be sendable after a SetMessageToSend call. It is set by the caller when they know
+ * they have another message ready to send, and only care about what happens
+ * after that. The have_next_message argument only affects this "more" return value
+ * and nothing else.
*
- * Note that m_type and to_send refer to data that is internal to the transport, and calling
- * any non-const function on this object may invalidate them.
+ * Effectively, there are three possible outcomes about whether there are more bytes
+ * to send:
+ * - Yes: the transport itself has more bytes to send later. For example, for
+ * V1Transport this happens during the sending of the header of a
+ * message, when there is a non-empty payload that follows.
+ * - No: the transport itself has no more bytes to send, but will have bytes to
+ * send if handed a message through SetMessageToSend. In V1Transport this
+ * happens when sending the payload of a message.
+ * - Blocked: the transport itself has no more bytes to send, and is also incapable
+ * of sending anything more at all now, if it were handed another
+ * message to send.
+ *
+ * The boolean 'more' is true for Yes, false for Blocked, and have_next_message
+ * controls what is returned for No.
+ *
+ * @return a BytesToSend object. The to_send member returned acts as a stream which is only
+ * ever appended to. This means that with the exception of MarkBytesSent (which pops
+ * bytes off the front of later to_sends), operations on the transport can only append
+ * to what is being returned. Also note that m_type and to_send refer to data that is
+ * internal to the transport, and calling any non-const function on this object may
+ * invalidate them.
*/
- virtual BytesToSend GetBytesToSend() const noexcept = 0;
+ virtual BytesToSend GetBytesToSend(bool have_next_message) const noexcept = 0;
/** Report how many bytes returned by the last GetBytesToSend() have been sent.
*
@@ -416,7 +437,7 @@ public:
CNetMessage GetReceivedMessage(std::chrono::microseconds time, bool& reject_message) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex);
bool SetMessageToSend(CSerializedNetMsg& msg) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
- BytesToSend GetBytesToSend() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
+ BytesToSend GetBytesToSend(bool have_next_message) const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
void MarkBytesSent(size_t bytes_sent) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
size_t GetSendMemoryUsage() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
};
diff --git a/src/test/denialofservice_tests.cpp b/src/test/denialofservice_tests.cpp
index 7f5d587cf6..8c1182b5e1 100644
--- a/src/test/denialofservice_tests.cpp
+++ b/src/test/denialofservice_tests.cpp
@@ -86,7 +86,7 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
{
LOCK(dummyNode1.cs_vSend);
- const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend();
+ const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend(false);
BOOST_CHECK(!to_send.empty());
}
connman.FlushSendBuffer(dummyNode1);
@@ -97,7 +97,7 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders
{
LOCK(dummyNode1.cs_vSend);
- const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend();
+ const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend(false);
BOOST_CHECK(!to_send.empty());
}
// Wait 3 more minutes
diff --git a/src/test/fuzz/p2p_transport_serialization.cpp b/src/test/fuzz/p2p_transport_serialization.cpp
index 2fa5de5008..468bb789ed 100644
--- a/src/test/fuzz/p2p_transport_serialization.cpp
+++ b/src/test/fuzz/p2p_transport_serialization.cpp
@@ -92,7 +92,7 @@ FUZZ_TARGET(p2p_transport_serialization, .init = initialize_p2p_transport_serial
assert(queued);
std::optional<bool> known_more;
while (true) {
- const auto& [to_send, more, _msg_type] = send_transport.GetBytesToSend();
+ const auto& [to_send, more, _msg_type] = send_transport.GetBytesToSend(false);
if (known_more) assert(!to_send.empty() == *known_more);
if (to_send.empty()) break;
send_transport.MarkBytesSent(to_send.size());
@@ -124,11 +124,13 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa
// Vectors with bytes last returned by GetBytesToSend() on transport[i].
std::array<std::vector<uint8_t>, 2> to_send;
- // Last returned 'more' values (if still relevant) by transport[i]->GetBytesToSend().
- std::array<std::optional<bool>, 2> last_more;
+ // Last returned 'more' values (if still relevant) by transport[i]->GetBytesToSend(), for
+ // both have_next_message false and true.
+ std::array<std::optional<bool>, 2> last_more, last_more_next;
- // Whether more bytes to be sent are expected on transport[i].
- std::array<std::optional<bool>, 2> expect_more;
+ // Whether more bytes to be sent are expected on transport[i], before and after
+ // SetMessageToSend().
+ std::array<std::optional<bool>, 2> expect_more, expect_more_next;
// Function to consume a message type.
auto msg_type_fn = [&]() {
@@ -177,18 +179,27 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa
// Wrapper around transport[i]->GetBytesToSend() that performs sanity checks.
auto bytes_to_send_fn = [&](int side) -> Transport::BytesToSend {
- const auto& [bytes, more, msg_type] = transports[side]->GetBytesToSend();
+ // Invoke GetBytesToSend twice (for have_next_message = {false, true}). This function does
+ // not modify state (it's const), and only the "more" return value should differ between
+ // the calls.
+ const auto& [bytes, more_nonext, msg_type] = transports[side]->GetBytesToSend(false);
+ const auto& [bytes_next, more_next, msg_type_next] = transports[side]->GetBytesToSend(true);
// Compare with expected more.
if (expect_more[side].has_value()) assert(!bytes.empty() == *expect_more[side]);
+ // Verify consistency between the two results.
+ assert(bytes == bytes_next);
+ assert(msg_type == msg_type_next);
+ if (more_nonext) assert(more_next);
// Compare with previously reported output.
assert(to_send[side].size() <= bytes.size());
assert(to_send[side] == Span{bytes}.first(to_send[side].size()));
to_send[side].resize(bytes.size());
std::copy(bytes.begin(), bytes.end(), to_send[side].begin());
- // Remember 'more' result.
- last_more[side] = {more};
+ // Remember 'more' results.
+ last_more[side] = {more_nonext};
+ last_more_next[side] = {more_next};
// Return.
- return {bytes, more, msg_type};
+ return {bytes, more_nonext, msg_type};
};
// Function to make side send a new message.
@@ -199,7 +210,8 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa
CSerializedNetMsg msg = next_msg[side].Copy();
bool queued = transports[side]->SetMessageToSend(msg);
// Update expected more data.
- expect_more[side] = std::nullopt;
+ expect_more[side] = expect_more_next[side];
+ expect_more_next[side] = std::nullopt;
// Verify consistency of GetBytesToSend after SetMessageToSend
bytes_to_send_fn(/*side=*/side);
if (queued) {
@@ -223,6 +235,7 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa
// If all to-be-sent bytes were sent, move last_more data to expect_more data.
if (send_now == bytes.size()) {
expect_more[side] = last_more[side];
+ expect_more_next[side] = last_more_next[side];
}
// Remove the bytes from the last reported to-be-sent vector.
assert(to_send[side].size() >= send_now);
@@ -251,6 +264,7 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa
// Clear cached expected 'more' information: if certainly no more data was to be sent
// before, receiving bytes makes this uncertain.
if (expect_more[!side] == false) expect_more[!side] = std::nullopt;
+ if (expect_more_next[!side] == false) expect_more_next[!side] = std::nullopt;
// Verify consistency of GetBytesToSend after ReceivedBytes
bytes_to_send_fn(/*side=*/!side);
bool progress = to_recv.size() < old_len;
diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp
index 5696f8d13c..dc64c0b4c1 100644
--- a/src/test/util/net.cpp
+++ b/src/test/util/net.cpp
@@ -78,7 +78,7 @@ void ConnmanTestMsg::FlushSendBuffer(CNode& node) const
node.vSendMsg.clear();
node.m_send_memusage = 0;
while (true) {
- const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend();
+ const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
if (to_send.empty()) break;
node.m_transport->MarkBytesSent(to_send.size());
}
@@ -90,7 +90,7 @@ bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) co
assert(queued);
bool complete{false};
while (true) {
- const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend();
+ const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
if (to_send.empty()) break;
NodeReceiveMsgBytes(node, to_send, complete);
node.m_transport->MarkBytesSent(to_send.size());