diff options
Diffstat (limited to 'src/net.cpp')
-rw-r--r-- | src/net.cpp | 198 |
1 files changed, 146 insertions, 52 deletions
diff --git a/src/net.cpp b/src/net.cpp index 53a2dcf125..e66c0ec7f8 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -19,6 +19,7 @@ #include <crypto/sha256.h> #include <i2p.h> #include <logging.h> +#include <memusage.h> #include <net_permissions.h> #include <netaddress.h> #include <netbase.h> @@ -116,6 +117,14 @@ std::map<CNetAddr, LocalServiceInfo> mapLocalHost GUARDED_BY(g_maplocalhost_mute static bool vfLimited[NET_MAX] GUARDED_BY(g_maplocalhost_mutex) = {}; std::string strSubVersion; +size_t CSerializedNetMsg::GetMemoryUsage() const noexcept +{ + // Don't count the dynamic memory used for the m_type string, by assuming it fits in the + // "small string" optimization area (which stores data inside the object itself, up to some + // size; 15 bytes in modern libstdc++). + return sizeof(*this) + memusage::DynamicUsage(data); +} + void CConnman::AddAddrFetch(const std::string& strDest) { LOCK(m_addr_fetches_mutex); @@ -681,16 +690,15 @@ bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete) nRecvBytes += msg_bytes.size(); while (msg_bytes.size() > 0) { // absorb network data - int handled = m_deserializer->Read(msg_bytes); - if (handled < 0) { - // Serious header problem, disconnect from the peer. + if (!m_transport->ReceivedBytes(msg_bytes)) { + // Serious transport problem, disconnect from the peer. return false; } - if (m_deserializer->Complete()) { + if (m_transport->ReceivedMessageComplete()) { // decompose a transport agnostic CNetMessage from the deserializer bool reject_message{false}; - CNetMessage msg = m_deserializer->GetMessage(time, reject_message); + CNetMessage msg = m_transport->GetReceivedMessage(time, reject_message); if (reject_message) { // Message deserialization failed. Drop the message but don't disconnect the peer. // store the size of the corrupt message @@ -717,8 +725,18 @@ bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete) return true; } -int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes) +V1Transport::V1Transport(const NodeId node_id, int nTypeIn, int nVersionIn) noexcept : + m_node_id(node_id), hdrbuf(nTypeIn, nVersionIn), vRecv(nTypeIn, nVersionIn) { + assert(std::size(Params().MessageStart()) == std::size(m_magic_bytes)); + std::copy(std::begin(Params().MessageStart()), std::end(Params().MessageStart()), m_magic_bytes); + LOCK(m_recv_mutex); + Reset(); +} + +int V1Transport::readHeader(Span<const uint8_t> msg_bytes) +{ + AssertLockHeld(m_recv_mutex); // copy data to temporary parsing buffer unsigned int nRemaining = CMessageHeader::HEADER_SIZE - nHdrPos; unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size()); @@ -740,7 +758,7 @@ int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes) } // Check start string, network magic - if (memcmp(hdr.pchMessageStart, m_chain_params.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) { + if (memcmp(hdr.pchMessageStart, m_magic_bytes, CMessageHeader::MESSAGE_START_SIZE) != 0) { LogPrint(BCLog::NET, "Header error: Wrong MessageStart %s received, peer=%d\n", HexStr(hdr.pchMessageStart), m_node_id); return -1; } @@ -757,8 +775,9 @@ int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes) return nCopy; } -int V1TransportDeserializer::readData(Span<const uint8_t> msg_bytes) +int V1Transport::readData(Span<const uint8_t> msg_bytes) { + AssertLockHeld(m_recv_mutex); unsigned int nRemaining = hdr.nMessageSize - nDataPos; unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size()); @@ -774,19 +793,22 @@ int V1TransportDeserializer::readData(Span<const uint8_t> msg_bytes) return nCopy; } -const uint256& V1TransportDeserializer::GetMessageHash() const +const uint256& V1Transport::GetMessageHash() const { - assert(Complete()); + AssertLockHeld(m_recv_mutex); + assert(CompleteInternal()); if (data_hash.IsNull()) hasher.Finalize(data_hash); return data_hash; } -CNetMessage V1TransportDeserializer::GetMessage(const std::chrono::microseconds time, bool& reject_message) +CNetMessage V1Transport::GetReceivedMessage(const std::chrono::microseconds time, bool& reject_message) { + AssertLockNotHeld(m_recv_mutex); // Initialize out parameter reject_message = false; // decompose a single CNetMessage from the TransportDeserializer + LOCK(m_recv_mutex); CNetMessage msg(std::move(vRecv)); // store message type string, time, and sizes @@ -819,53 +841,122 @@ CNetMessage V1TransportDeserializer::GetMessage(const std::chrono::microseconds return msg; } -void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vector<unsigned char>& header) const +bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept { + AssertLockNotHeld(m_send_mutex); + // Determine whether a new message can be set. + LOCK(m_send_mutex); + if (m_sending_header || m_bytes_sent < m_message_to_send.data.size()) return false; + // create dbl-sha256 checksum uint256 hash = Hash(msg.data); // create header - CMessageHeader hdr(Params().MessageStart(), msg.m_type.c_str(), msg.data.size()); + CMessageHeader hdr(m_magic_bytes, msg.m_type.c_str(), msg.data.size()); memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE); // serialize header - header.reserve(CMessageHeader::HEADER_SIZE); - CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr}; + m_header_to_send.clear(); + CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, m_header_to_send, 0, hdr}; + + // update state + m_message_to_send = std::move(msg); + m_sending_header = true; + m_bytes_sent = 0; + return true; +} + +Transport::BytesToSend V1Transport::GetBytesToSend() const noexcept +{ + AssertLockNotHeld(m_send_mutex); + LOCK(m_send_mutex); + if (m_sending_header) { + return {Span{m_header_to_send}.subspan(m_bytes_sent), + // We have more to send after the header if the message has payload. + !m_message_to_send.data.empty(), + m_message_to_send.m_type + }; + } else { + return {Span{m_message_to_send.data}.subspan(m_bytes_sent), + // We never have more to send after this message's payload. + false, + m_message_to_send.m_type + }; + } +} + +void V1Transport::MarkBytesSent(size_t bytes_sent) noexcept +{ + AssertLockNotHeld(m_send_mutex); + LOCK(m_send_mutex); + m_bytes_sent += bytes_sent; + if (m_sending_header && m_bytes_sent == m_header_to_send.size()) { + // We're done sending a message's header. Switch to sending its data bytes. + m_sending_header = false; + m_bytes_sent = 0; + } else if (!m_sending_header && m_bytes_sent == m_message_to_send.data.size()) { + // We're done sending a message's data. Wipe the data vector to reduce memory consumption. + m_message_to_send.data.clear(); + m_message_to_send.data.shrink_to_fit(); + m_bytes_sent = 0; + } +} + +size_t V1Transport::GetSendMemoryUsage() const noexcept +{ + AssertLockNotHeld(m_send_mutex); + LOCK(m_send_mutex); + // Don't count sending-side fields besides m_message_to_send, as they're all small and bounded. + return m_message_to_send.GetMemoryUsage(); } std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const { auto it = node.vSendMsg.begin(); size_t nSentSize = 0; - - while (it != node.vSendMsg.end()) { - const auto& data = *it; - assert(data.size() > node.nSendOffset); + bool data_left{false}; //!< second return value (whether unsent data remains) + + while (true) { + if (it != node.vSendMsg.end()) { + // If possible, move one message from the send queue to the transport. This fails when + // there is an existing message still being sent. + size_t memusage = it->GetMemoryUsage(); + if (node.m_transport->SetMessageToSend(*it)) { + // Update memory usage of send buffer (as *it will be deleted). + node.m_send_memusage -= memusage; + ++it; + } + } + const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend(); + data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent int nBytes = 0; - { + if (!data.empty()) { LOCK(node.m_sock_mutex); + // There is no socket in case we've already disconnected, or in test cases without + // real connections. In these cases, we bail out immediately and just leave things + // in the send queue and transport. if (!node.m_sock) { break; } int flags = MSG_NOSIGNAL | MSG_DONTWAIT; #ifdef MSG_MORE - if (it + 1 != node.vSendMsg.end()) { + // We have more to send if either the transport itself has more, or if we have more + // messages to send. + if (more || it != node.vSendMsg.end()) { flags |= MSG_MORE; } #endif - nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, flags); + nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags); } if (nBytes > 0) { node.m_last_send = GetTime<std::chrono::seconds>(); node.nSendBytes += nBytes; - node.nSendOffset += nBytes; + // Notify transport that bytes have been processed. + node.m_transport->MarkBytesSent(nBytes); + // Update statistics per message type. + node.AccountForSentBytes(msg_type, nBytes); nSentSize += nBytes; - if (node.nSendOffset == data.size()) { - node.nSendOffset = 0; - node.nSendSize -= data.size(); - node.fPauseSend = node.nSendSize > nSendBufferMaxSize; - it++; - } else { + if ((size_t)nBytes != data.size()) { // could not send full message; stop sending more break; } @@ -878,17 +969,17 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const node.CloseSocketDisconnect(); } } - // couldn't send anything at all break; } } + node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize; + if (it == node.vSendMsg.end()) { - assert(node.nSendOffset == 0); - assert(node.nSendSize == 0); + assert(node.m_send_memusage == 0); } node.vSendMsg.erase(node.vSendMsg.begin(), it); - return {nSentSize, !node.vSendMsg.empty()}; + return {nSentSize, data_left}; } /** Try to find a connection to evict when the node is full. @@ -1227,7 +1318,14 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes) for (CNode* pnode : nodes) { bool select_recv = !pnode->fPauseRecv; - bool select_send = WITH_LOCK(pnode->cs_vSend, return !pnode->vSendMsg.empty()); + bool select_send; + { + LOCK(pnode->cs_vSend); + // Sending is possible if either there are bytes to send right now, or if there will be + // once a potential message from vSendMsg is handed to the transport. + const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend(); + select_send = !to_send.empty() || !pnode->vSendMsg.empty(); + } if (!select_recv && !select_send) continue; LOCK(pnode->m_sock_mutex); @@ -2822,8 +2920,7 @@ CNode::CNode(NodeId idIn, ConnectionType conn_type_in, bool inbound_onion, CNodeOptions&& node_opts) - : m_deserializer{std::make_unique<V1TransportDeserializer>(V1TransportDeserializer(Params(), idIn, SER_NETWORK, INIT_PROTO_VERSION))}, - m_serializer{std::make_unique<V1TransportSerializer>(V1TransportSerializer())}, + : m_transport{std::make_unique<V1Transport>(idIn, SER_NETWORK, INIT_PROTO_VERSION)}, m_permission_flags{node_opts.permission_flags}, m_sock{sock}, m_connected{GetTime<std::chrono::seconds>()}, @@ -2906,27 +3003,24 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) msg.data.data() ); - // make sure we use the appropriate network transport format - std::vector<unsigned char> serializedHeader; - pnode->m_serializer->prepareForTransport(msg, serializedHeader); - size_t nTotalSize = nMessageSize + serializedHeader.size(); - size_t nBytesSent = 0; { LOCK(pnode->cs_vSend); - bool optimisticSend(pnode->vSendMsg.empty()); + const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend(); + const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()}; - //log total amount of bytes per message type - pnode->AccountForSentBytes(msg.m_type, nTotalSize); - pnode->nSendSize += nTotalSize; + // Update memory usage of send buffer. + pnode->m_send_memusage += msg.GetMemoryUsage(); + if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true; + // Move message to vSendMsg queue. + pnode->vSendMsg.push_back(std::move(msg)); - if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true; - pnode->vSendMsg.push_back(std::move(serializedHeader)); - if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data)); - - // If write queue empty, attempt "optimistic write" - bool data_left; - if (optimisticSend) std::tie(nBytesSent, data_left) = SocketSendData(*pnode); + // If there was nothing to send before, attempt "optimistic write": + // because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually + // doing a send, try sending from the calling thread if the queue was empty before. + if (queue_was_empty) { + std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode); + } } if (nBytesSent) RecordBytesSent(nBytesSent); } |