From dbf779d5deb04f55c6e8493ce4e12ed4628638f3 Mon Sep 17 00:00:00 2001 From: Troy Giorshev Date: Wed, 22 Jul 2020 08:59:16 -0400 Subject: Clean PushMessage and ProcessMessages This brings PushMessage and ProcessMessages further in line with the style guide by fixing their if statements. LogMessage is later called, inside an if statement, inside both of these methods. --- src/net.cpp | 12 ++++-------- src/net_processing.cpp | 6 ++---- 2 files changed, 6 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/net.cpp b/src/net.cpp index 4f74bbede4..a3f7f3aba1 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2879,18 +2879,14 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) pnode->mapSendBytesPerMsgCmd[msg.m_type] += nTotalSize; pnode->nSendSize += nTotalSize; - if (pnode->nSendSize > nSendBufferMaxSize) - pnode->fPauseSend = true; + if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true; pnode->vSendMsg.push_back(std::move(serializedHeader)); - if (nMessageSize) - pnode->vSendMsg.push_back(std::move(msg.data)); + if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data)); // If write queue empty, attempt "optimistic write" - if (optimisticSend == true) - nBytesSent = SocketSendData(*pnode); + if (optimisticSend) nBytesSent = SocketSendData(*pnode); } - if (nBytesSent) - RecordBytesSent(nBytesSent); + if (nBytesSent) RecordBytesSent(nBytesSent); } bool CConnman::ForNode(NodeId id, std::function func) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index cf73b1dae2..7d89d3ea6e 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -4028,14 +4028,12 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic& interrupt } // Don't bother if send buffer is too full to respond anyway - if (pfrom->fPauseSend) - return false; + if (pfrom->fPauseSend) return false; std::list msgs; { LOCK(pfrom->cs_vProcessMsg); - if (pfrom->vProcessMsg.empty()) - return false; + if (pfrom->vProcessMsg.empty()) return false; // Just take one message msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); pfrom->nProcessQueueSize -= msgs.front().m_raw_message_size; -- cgit v1.2.3 From f2a77ff97bec09dd5fcc043d8659d8ec5dfb87c2 Mon Sep 17 00:00:00 2001 From: Troy Giorshev Date: Mon, 13 Jul 2020 13:20:47 -0400 Subject: Add CaptureMessage This commit adds the CaptureMessage function. This will later be called when any message is sent or received. The capture directory is fixed, in a new folder "message_capture" in the datadir. Peers will then have their own subfolders, named with their IP address and port, replacing colons with underscores to keep compatibility with Windows. Inside, received and sent messages will be captured into two binary files, msgs_recv.dat and msgs_sent.dat. e.g. message_capture/203.0.113.7_56072/msgs_recv.dat message_capture/203.0.113.7_56072/msgs_sent.dat The format has been designed as to result in a minimal performance impact. A parsing script is added in a later commit. --- src/init.cpp | 7 ++++--- src/net.cpp | 28 ++++++++++++++++++++++++++++ src/net.h | 6 ++++++ 3 files changed, 38 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/init.cpp b/src/init.cpp index 09eb76eaee..3ab6263813 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -1042,16 +1042,17 @@ bool AppInitParameterInteraction(const ArgsManager& args) // Trim requested connection counts, to fit into system limitations // in std::min(...) to work around FreeBSD compilation issue described in #2695 - nFD = RaiseFileDescriptorLimit(nMaxConnections + MIN_CORE_FILEDESCRIPTORS + MAX_ADDNODE_CONNECTIONS + nBind); + nFD = RaiseFileDescriptorLimit(nMaxConnections + MIN_CORE_FILEDESCRIPTORS + MAX_ADDNODE_CONNECTIONS + nBind + NUM_FDS_MESSAGE_CAPTURE); + #ifdef USE_POLL int fd_max = nFD; #else int fd_max = FD_SETSIZE; #endif - nMaxConnections = std::max(std::min(nMaxConnections, fd_max - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS), 0); + nMaxConnections = std::max(std::min(nMaxConnections, fd_max - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE), 0); if (nFD < MIN_CORE_FILEDESCRIPTORS) return InitError(_("Not enough file descriptors available.")); - nMaxConnections = std::min(nFD - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS, nMaxConnections); + nMaxConnections = std::min(nFD - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE, nMaxConnections); if (nMaxConnections < nUserMaxConnections) InitWarning(strprintf(_("Reducing -maxconnections from %d to %d, because of system limitations."), nUserMaxConnections, nMaxConnections)); diff --git a/src/net.cpp b/src/net.cpp index a3f7f3aba1..29529d54ca 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2929,3 +2929,31 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup.data(), vchNetGroup.size()).Finalize(); } + +void CaptureMessage(const CAddress& addr, const std::string& msg_type, const Span& data, bool is_incoming) +{ + // Note: This function captures the message at the time of processing, + // not at socket receive/send time. + // This ensures that the messages are always in order from an application + // layer (processing) perspective. + auto now = GetTime(); + + // Windows folder names can not include a colon + std::string clean_addr = addr.ToString(); + std::replace(clean_addr.begin(), clean_addr.end(), ':', '_'); + + fs::path base_path = GetDataDir() / "message_capture" / clean_addr; + fs::create_directories(base_path); + + fs::path path = base_path / (is_incoming ? "msgs_recv.dat" : "msgs_sent.dat"); + CAutoFile f(fsbridge::fopen(path, "ab"), SER_DISK, CLIENT_VERSION); + + ser_writedata64(f, now.count()); + f.write(msg_type.data(), msg_type.length()); + for (auto i = msg_type.length(); i < CMessageHeader::COMMAND_SIZE; ++i) { + f << '\0'; + } + uint32_t size = data.size(); + ser_writedata32(f, size); + f.write((const char*)data.data(), data.size()); +} diff --git a/src/net.h b/src/net.h index 087135a290..96d411bf29 100644 --- a/src/net.h +++ b/src/net.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -75,6 +76,8 @@ static constexpr uint64_t DEFAULT_MAX_UPLOAD_TARGET = 0; static const bool DEFAULT_BLOCKSONLY = false; /** -peertimeout default */ static const int64_t DEFAULT_PEER_CONNECT_TIMEOUT = 60; +/** Number of file descriptors required for message capture **/ +static const int NUM_FDS_MESSAGE_CAPTURE = 1; static const bool DEFAULT_FORCEDNSSEED = false; static const size_t DEFAULT_MAXRECEIVEBUFFER = 5 * 1000; @@ -1239,6 +1242,9 @@ inline std::chrono::microseconds PoissonNextSend(std::chrono::microseconds now, return std::chrono::microseconds{PoissonNextSend(now.count(), average_interval.count())}; } +/** Dump binary message to file, with timestamp */ +void CaptureMessage(const CAddress& addr, const std::string& msg_type, const Span& data, bool is_incoming); + struct NodeEvictionCandidate { NodeId id; -- cgit v1.2.3 From 4d1a582549bc982d55e24585b0ba06f92f21e9da Mon Sep 17 00:00:00 2001 From: Troy Giorshev Date: Mon, 13 Jul 2020 14:00:03 -0400 Subject: Call CaptureMessage at appropriate locations These calls are toggled by a debug-only "capturemessages" flag. Default disabled. --- src/init.cpp | 1 + src/net.cpp | 3 +++ src/net_processing.cpp | 4 ++++ 3 files changed, 8 insertions(+) (limited to 'src') diff --git a/src/init.cpp b/src/init.cpp index 3ab6263813..e16f0e8c83 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -521,6 +521,7 @@ void SetupServerArgs(NodeContext& node) argsman.AddArg("-limitdescendantcount=", strprintf("Do not accept transactions if any ancestor would have or more in-mempool descendants (default: %u)", DEFAULT_DESCENDANT_LIMIT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST); argsman.AddArg("-limitdescendantsize=", strprintf("Do not accept transactions if any ancestor would have more than kilobytes of in-mempool descendants (default: %u).", DEFAULT_DESCENDANT_SIZE_LIMIT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST); argsman.AddArg("-addrmantest", "Allows to test address relay on localhost", ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST); + argsman.AddArg("-capturemessages", "Capture all P2P messages to disk", ArgsManager::ALLOW_BOOL | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST); argsman.AddArg("-debug=", "Output debugging information (default: -nodebug, supplying is optional). " "If is not supplied or if = 1, output all debugging information. can be: " + LogInstance().LogCategoriesString() + ".", ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST); diff --git a/src/net.cpp b/src/net.cpp index 29529d54ca..81f014d932 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2864,6 +2864,9 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { size_t nMessageSize = msg.data.size(); LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.m_type), nMessageSize, pnode->GetId()); + if (gArgs.GetBoolArg("-capturemessages", false)) { + CaptureMessage(pnode->addr, msg.m_type, msg.data, /* incoming */ false); + } // make sure we use the appropriate network transport format std::vector serializedHeader; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 7d89d3ea6e..d0a743e81c 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -4042,6 +4042,10 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic& interrupt } CNetMessage& msg(msgs.front()); + if (gArgs.GetBoolArg("-capturemessages", false)) { + CaptureMessage(pfrom->addr, msg.m_command, MakeUCharSpan(msg.m_recv), /* incoming */ true); + } + msg.SetVersion(pfrom->GetCommonVersion()); const std::string& msg_type = msg.m_command; -- cgit v1.2.3