aboutsummaryrefslogtreecommitdiff
path: root/src/net.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net.cpp')
-rw-r--r--src/net.cpp198
1 files changed, 146 insertions, 52 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 53a2dcf125..e66c0ec7f8 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -19,6 +19,7 @@
#include <crypto/sha256.h>
#include <i2p.h>
#include <logging.h>
+#include <memusage.h>
#include <net_permissions.h>
#include <netaddress.h>
#include <netbase.h>
@@ -116,6 +117,14 @@ std::map<CNetAddr, LocalServiceInfo> mapLocalHost GUARDED_BY(g_maplocalhost_mute
static bool vfLimited[NET_MAX] GUARDED_BY(g_maplocalhost_mutex) = {};
std::string strSubVersion;
+size_t CSerializedNetMsg::GetMemoryUsage() const noexcept
+{
+ // Don't count the dynamic memory used for the m_type string, by assuming it fits in the
+ // "small string" optimization area (which stores data inside the object itself, up to some
+ // size; 15 bytes in modern libstdc++).
+ return sizeof(*this) + memusage::DynamicUsage(data);
+}
+
void CConnman::AddAddrFetch(const std::string& strDest)
{
LOCK(m_addr_fetches_mutex);
@@ -681,16 +690,15 @@ bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete)
nRecvBytes += msg_bytes.size();
while (msg_bytes.size() > 0) {
// absorb network data
- int handled = m_deserializer->Read(msg_bytes);
- if (handled < 0) {
- // Serious header problem, disconnect from the peer.
+ if (!m_transport->ReceivedBytes(msg_bytes)) {
+ // Serious transport problem, disconnect from the peer.
return false;
}
- if (m_deserializer->Complete()) {
+ if (m_transport->ReceivedMessageComplete()) {
// decompose a transport agnostic CNetMessage from the deserializer
bool reject_message{false};
- CNetMessage msg = m_deserializer->GetMessage(time, reject_message);
+ CNetMessage msg = m_transport->GetReceivedMessage(time, reject_message);
if (reject_message) {
// Message deserialization failed. Drop the message but don't disconnect the peer.
// store the size of the corrupt message
@@ -717,8 +725,18 @@ bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete)
return true;
}
-int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes)
+V1Transport::V1Transport(const NodeId node_id, int nTypeIn, int nVersionIn) noexcept :
+ m_node_id(node_id), hdrbuf(nTypeIn, nVersionIn), vRecv(nTypeIn, nVersionIn)
{
+ assert(std::size(Params().MessageStart()) == std::size(m_magic_bytes));
+ std::copy(std::begin(Params().MessageStart()), std::end(Params().MessageStart()), m_magic_bytes);
+ LOCK(m_recv_mutex);
+ Reset();
+}
+
+int V1Transport::readHeader(Span<const uint8_t> msg_bytes)
+{
+ AssertLockHeld(m_recv_mutex);
// copy data to temporary parsing buffer
unsigned int nRemaining = CMessageHeader::HEADER_SIZE - nHdrPos;
unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size());
@@ -740,7 +758,7 @@ int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes)
}
// Check start string, network magic
- if (memcmp(hdr.pchMessageStart, m_chain_params.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) {
+ if (memcmp(hdr.pchMessageStart, m_magic_bytes, CMessageHeader::MESSAGE_START_SIZE) != 0) {
LogPrint(BCLog::NET, "Header error: Wrong MessageStart %s received, peer=%d\n", HexStr(hdr.pchMessageStart), m_node_id);
return -1;
}
@@ -757,8 +775,9 @@ int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes)
return nCopy;
}
-int V1TransportDeserializer::readData(Span<const uint8_t> msg_bytes)
+int V1Transport::readData(Span<const uint8_t> msg_bytes)
{
+ AssertLockHeld(m_recv_mutex);
unsigned int nRemaining = hdr.nMessageSize - nDataPos;
unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size());
@@ -774,19 +793,22 @@ int V1TransportDeserializer::readData(Span<const uint8_t> msg_bytes)
return nCopy;
}
-const uint256& V1TransportDeserializer::GetMessageHash() const
+const uint256& V1Transport::GetMessageHash() const
{
- assert(Complete());
+ AssertLockHeld(m_recv_mutex);
+ assert(CompleteInternal());
if (data_hash.IsNull())
hasher.Finalize(data_hash);
return data_hash;
}
-CNetMessage V1TransportDeserializer::GetMessage(const std::chrono::microseconds time, bool& reject_message)
+CNetMessage V1Transport::GetReceivedMessage(const std::chrono::microseconds time, bool& reject_message)
{
+ AssertLockNotHeld(m_recv_mutex);
// Initialize out parameter
reject_message = false;
// decompose a single CNetMessage from the TransportDeserializer
+ LOCK(m_recv_mutex);
CNetMessage msg(std::move(vRecv));
// store message type string, time, and sizes
@@ -819,53 +841,122 @@ CNetMessage V1TransportDeserializer::GetMessage(const std::chrono::microseconds
return msg;
}
-void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vector<unsigned char>& header) const
+bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept
{
+ AssertLockNotHeld(m_send_mutex);
+ // Determine whether a new message can be set.
+ LOCK(m_send_mutex);
+ if (m_sending_header || m_bytes_sent < m_message_to_send.data.size()) return false;
+
// create dbl-sha256 checksum
uint256 hash = Hash(msg.data);
// create header
- CMessageHeader hdr(Params().MessageStart(), msg.m_type.c_str(), msg.data.size());
+ CMessageHeader hdr(m_magic_bytes, msg.m_type.c_str(), msg.data.size());
memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE);
// serialize header
- header.reserve(CMessageHeader::HEADER_SIZE);
- CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr};
+ m_header_to_send.clear();
+ CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, m_header_to_send, 0, hdr};
+
+ // update state
+ m_message_to_send = std::move(msg);
+ m_sending_header = true;
+ m_bytes_sent = 0;
+ return true;
+}
+
+Transport::BytesToSend V1Transport::GetBytesToSend() const noexcept
+{
+ AssertLockNotHeld(m_send_mutex);
+ LOCK(m_send_mutex);
+ if (m_sending_header) {
+ return {Span{m_header_to_send}.subspan(m_bytes_sent),
+ // We have more to send after the header if the message has payload.
+ !m_message_to_send.data.empty(),
+ m_message_to_send.m_type
+ };
+ } else {
+ return {Span{m_message_to_send.data}.subspan(m_bytes_sent),
+ // We never have more to send after this message's payload.
+ false,
+ m_message_to_send.m_type
+ };
+ }
+}
+
+void V1Transport::MarkBytesSent(size_t bytes_sent) noexcept
+{
+ AssertLockNotHeld(m_send_mutex);
+ LOCK(m_send_mutex);
+ m_bytes_sent += bytes_sent;
+ if (m_sending_header && m_bytes_sent == m_header_to_send.size()) {
+ // We're done sending a message's header. Switch to sending its data bytes.
+ m_sending_header = false;
+ m_bytes_sent = 0;
+ } else if (!m_sending_header && m_bytes_sent == m_message_to_send.data.size()) {
+ // We're done sending a message's data. Wipe the data vector to reduce memory consumption.
+ m_message_to_send.data.clear();
+ m_message_to_send.data.shrink_to_fit();
+ m_bytes_sent = 0;
+ }
+}
+
+size_t V1Transport::GetSendMemoryUsage() const noexcept
+{
+ AssertLockNotHeld(m_send_mutex);
+ LOCK(m_send_mutex);
+ // Don't count sending-side fields besides m_message_to_send, as they're all small and bounded.
+ return m_message_to_send.GetMemoryUsage();
}
std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
{
auto it = node.vSendMsg.begin();
size_t nSentSize = 0;
-
- while (it != node.vSendMsg.end()) {
- const auto& data = *it;
- assert(data.size() > node.nSendOffset);
+ bool data_left{false}; //!< second return value (whether unsent data remains)
+
+ while (true) {
+ if (it != node.vSendMsg.end()) {
+ // If possible, move one message from the send queue to the transport. This fails when
+ // there is an existing message still being sent.
+ size_t memusage = it->GetMemoryUsage();
+ if (node.m_transport->SetMessageToSend(*it)) {
+ // Update memory usage of send buffer (as *it will be deleted).
+ node.m_send_memusage -= memusage;
+ ++it;
+ }
+ }
+ const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend();
+ data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent
int nBytes = 0;
- {
+ if (!data.empty()) {
LOCK(node.m_sock_mutex);
+ // There is no socket in case we've already disconnected, or in test cases without
+ // real connections. In these cases, we bail out immediately and just leave things
+ // in the send queue and transport.
if (!node.m_sock) {
break;
}
int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
#ifdef MSG_MORE
- if (it + 1 != node.vSendMsg.end()) {
+ // We have more to send if either the transport itself has more, or if we have more
+ // messages to send.
+ if (more || it != node.vSendMsg.end()) {
flags |= MSG_MORE;
}
#endif
- nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, flags);
+ nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags);
}
if (nBytes > 0) {
node.m_last_send = GetTime<std::chrono::seconds>();
node.nSendBytes += nBytes;
- node.nSendOffset += nBytes;
+ // Notify transport that bytes have been processed.
+ node.m_transport->MarkBytesSent(nBytes);
+ // Update statistics per message type.
+ node.AccountForSentBytes(msg_type, nBytes);
nSentSize += nBytes;
- if (node.nSendOffset == data.size()) {
- node.nSendOffset = 0;
- node.nSendSize -= data.size();
- node.fPauseSend = node.nSendSize > nSendBufferMaxSize;
- it++;
- } else {
+ if ((size_t)nBytes != data.size()) {
// could not send full message; stop sending more
break;
}
@@ -878,17 +969,17 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
node.CloseSocketDisconnect();
}
}
- // couldn't send anything at all
break;
}
}
+ node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize;
+
if (it == node.vSendMsg.end()) {
- assert(node.nSendOffset == 0);
- assert(node.nSendSize == 0);
+ assert(node.m_send_memusage == 0);
}
node.vSendMsg.erase(node.vSendMsg.begin(), it);
- return {nSentSize, !node.vSendMsg.empty()};
+ return {nSentSize, data_left};
}
/** Try to find a connection to evict when the node is full.
@@ -1227,7 +1318,14 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
for (CNode* pnode : nodes) {
bool select_recv = !pnode->fPauseRecv;
- bool select_send = WITH_LOCK(pnode->cs_vSend, return !pnode->vSendMsg.empty());
+ bool select_send;
+ {
+ LOCK(pnode->cs_vSend);
+ // Sending is possible if either there are bytes to send right now, or if there will be
+ // once a potential message from vSendMsg is handed to the transport.
+ const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
+ select_send = !to_send.empty() || !pnode->vSendMsg.empty();
+ }
if (!select_recv && !select_send) continue;
LOCK(pnode->m_sock_mutex);
@@ -2822,8 +2920,7 @@ CNode::CNode(NodeId idIn,
ConnectionType conn_type_in,
bool inbound_onion,
CNodeOptions&& node_opts)
- : m_deserializer{std::make_unique<V1TransportDeserializer>(V1TransportDeserializer(Params(), idIn, SER_NETWORK, INIT_PROTO_VERSION))},
- m_serializer{std::make_unique<V1TransportSerializer>(V1TransportSerializer())},
+ : m_transport{std::make_unique<V1Transport>(idIn, SER_NETWORK, INIT_PROTO_VERSION)},
m_permission_flags{node_opts.permission_flags},
m_sock{sock},
m_connected{GetTime<std::chrono::seconds>()},
@@ -2906,27 +3003,24 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
msg.data.data()
);
- // make sure we use the appropriate network transport format
- std::vector<unsigned char> serializedHeader;
- pnode->m_serializer->prepareForTransport(msg, serializedHeader);
- size_t nTotalSize = nMessageSize + serializedHeader.size();
-
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
- bool optimisticSend(pnode->vSendMsg.empty());
+ const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
+ const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()};
- //log total amount of bytes per message type
- pnode->AccountForSentBytes(msg.m_type, nTotalSize);
- pnode->nSendSize += nTotalSize;
+ // Update memory usage of send buffer.
+ pnode->m_send_memusage += msg.GetMemoryUsage();
+ if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true;
+ // Move message to vSendMsg queue.
+ pnode->vSendMsg.push_back(std::move(msg));
- if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true;
- pnode->vSendMsg.push_back(std::move(serializedHeader));
- if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data));
-
- // If write queue empty, attempt "optimistic write"
- bool data_left;
- if (optimisticSend) std::tie(nBytesSent, data_left) = SocketSendData(*pnode);
+ // If there was nothing to send before, attempt "optimistic write":
+ // because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
+ // doing a send, try sending from the calling thread if the queue was empty before.
+ if (queue_was_empty) {
+ std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode);
+ }
}
if (nBytesSent) RecordBytesSent(nBytesSent);
}