aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/message-capture/message-capture-docs.md25
-rwxr-xr-xcontrib/message-capture/message-capture-parser.py214
-rw-r--r--src/init.cpp8
-rw-r--r--src/net.cpp43
-rw-r--r--src/net.h6
-rw-r--r--src/net_processing.cpp10
-rwxr-xr-xtest/functional/p2p_message_capture.py76
-rwxr-xr-xtest/functional/test_framework/messages.py2
-rwxr-xr-xtest/functional/test_runner.py1
9 files changed, 369 insertions, 16 deletions
diff --git a/contrib/message-capture/message-capture-docs.md b/contrib/message-capture/message-capture-docs.md
new file mode 100644
index 0000000000..7301968461
--- /dev/null
+++ b/contrib/message-capture/message-capture-docs.md
@@ -0,0 +1,25 @@
+# Per-Peer Message Capture
+
+## Purpose
+
+This feature allows for message capture on a per-peer basis. It answers the simple question: "Can I see what messages my node is sending and receiving?"
+
+## Usage and Functionality
+
+* Run `bitcoind` with the `-capturemessages` option.
+* Look in the `message_capture` folder in your datadir.
+ * Typically this will be `~/.bitcoin/message_capture`.
+ * See that there are many folders inside, one for each peer, named with its IP address and port.
+ * Inside each peer's folder there are two `.dat` files: one is for received messages (`msgs_recv.dat`) and the other is for sent messages (`msgs_sent.dat`).
+* Run `contrib/message-capture/message-capture-parser.py` with the proper arguments.
+ * See the `-h` option for help.
+ * To see all messages, both sent and received, for all peers use:
+ ```
+ ./contrib/message-capture/message-capture-parser.py -o out.json \
+ ~/.bitcoin/message_capture/**/*.dat
+ ```
+ * Note: The messages in the given `.dat` files will be interleaved in chronological order. So, giving both received and sent `.dat` files (as above with `*.dat`) will result in all messages being interleaved in chronological order.
+ * If an output file is not provided (i.e. the `-o` option is not used), then the output prints to `stdout`.
+* View the resulting output.
+ * The output file is `JSON` formatted.
+ * Suggestion: use `jq` to view the output, with `jq . out.json`
diff --git a/contrib/message-capture/message-capture-parser.py b/contrib/message-capture/message-capture-parser.py
new file mode 100755
index 0000000000..9988478f1b
--- /dev/null
+++ b/contrib/message-capture/message-capture-parser.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python3
+# Copyright (c) 2020 The Bitcoin Core developers
+# Distributed under the MIT software license, see the accompanying
+# file COPYING or http://www.opensource.org/licenses/mit-license.php.
+"""Parse message capture binary files. To be used in conjunction with -capturemessages."""
+
+import argparse
+import os
+import shutil
+import sys
+from io import BytesIO
+import json
+from pathlib import Path
+from typing import Any, List, Optional
+
+sys.path.append(os.path.join(os.path.dirname(__file__), '../../test/functional'))
+
+from test_framework.messages import ser_uint256 # noqa: E402
+from test_framework.p2p import MESSAGEMAP # noqa: E402
+
# Widths, in bytes, of the three fixed-size fields that precede every captured
# message body: the timestamp, the NUL-padded message type and the body length.
TIME_SIZE = 8
LENGTH_SIZE = 4
MSGTYPE_SIZE = 12

# The test framework keeps many hashes as plain (large) Python ints, whereas
# in core they are uint256 values. Nothing at runtime tells "a big number"
# apart from "a 32-byte blob stored as an int", so the attribute names that
# really hold a uint256 are enumerated explicitly below.
# (To extend the lists, search messages.py for deser_uint256,
# deser_uint256_vector and uint256_from_str.)

# Attributes holding a single uint256 stored as an int.
HASH_INTS = [
    "blockhash",
    "block_hash",
    "hash",
    "hashMerkleRoot",
    "hashPrevBlock",
    "hashstop",
    "prev_header",
    "sha256",
    "stop_hash",
]

# Attributes holding a list whose elements may be uint256 values stored as ints.
HASH_INT_VECTORS = [
    "hashes",
    "headers",
    "vHave",
    "vHash",
]
+
+
class ProgressBar:
    """Draw a one-line textual progress bar on stdout.

    The bar is redrawn in place (each frame starts with a carriage return and
    has no trailing newline), so the caller is responsible for emitting a
    newline once it is done with the bar.
    """

    def __init__(self, total: float) -> None:
        """Create a bar that is complete once `total` units have been reported."""
        self.total = total
        self.running = 0

    def set_progress(self, progress: float) -> None:
        """Redraw the bar for `progress`, a fraction that is clamped to [0, 1]."""
        cols = shutil.get_terminal_size()[0]
        if cols <= 12:
            # Too narrow to draw anything meaningful.
            return
        # Clamp so rounding or over-reporting can never overflow the line
        # or produce a negative number of blocks.
        progress = min(max(progress, 0.0), 1.0)
        # 9 columns are taken by the "[ ", " ] " and "100%" decorations.
        max_blocks = cols - 9
        num_blocks = int(max_blocks * progress)
        # No newline is written, so flush explicitly or a line-buffered
        # terminal would not show the frame.
        print('\r[ {}{} ] {:3.0f}%'
              .format('#' * num_blocks,
                      ' ' * (max_blocks - num_blocks),
                      progress * 100),
              end='', flush=True)

    def update(self, more: float) -> None:
        """Report `more` additional units of work and redraw the bar."""
        self.running += more
        # A total of zero (e.g. only empty input files) means there is nothing
        # to wait for; treat it as complete instead of dividing by zero.
        fraction = self.running / self.total if self.total > 0 else 1.0
        self.set_progress(fraction)
+
+
def to_jsonable(obj: Any) -> Any:
    """Recursively convert `obj` into something json.dumps() can serialize.

    - Objects with a __dict__ become a dict of their converted attributes.
    - Objects with __slots__ become a dict keyed by slot name. Slots named in
      HASH_INTS / HASH_INT_VECTORS that hold ints are rendered as the hex of
      their ser_uint256() serialization instead of as huge numbers.
    - Lists are converted element by element.
    - bytes become a hex string.
    - Anything else is returned unchanged.
    """
    if hasattr(obj, "__dict__"):
        # Convert the values too: returning the raw __dict__ would leave
        # nested bytes/objects in place and make json.dumps() fail.
        return {name: to_jsonable(value) for name, value in obj.__dict__.items()}
    elif hasattr(obj, "__slots__"):
        ret = {}  # type: Any
        for slot in obj.__slots__:
            # Unset slots are reported as None rather than raising.
            val = getattr(obj, slot, None)
            if slot in HASH_INTS and isinstance(val, int):
                ret[slot] = ser_uint256(val).hex()
            elif (slot in HASH_INT_VECTORS and isinstance(val, list) and val
                    and all(isinstance(a, int) for a in val)):
                # Only non-empty, all-int lists are hash vectors. Empty or
                # unset values fall through to the generic conversion below
                # instead of crashing on val[0].
                ret[slot] = [ser_uint256(a).hex() for a in val]
            else:
                ret[slot] = to_jsonable(val)
        return ret
    elif isinstance(obj, list):
        return [to_jsonable(a) for a in obj]
    elif isinstance(obj, bytes):
        return obj.hex()
    else:
        return obj
+
+
def _decode_msgtype(msgtype: bytes) -> str:
    """Return `msgtype` as text, or "UNREADABLE" if it is not printable UTF-8."""
    try:
        decoded = msgtype.decode()
    except UnicodeDecodeError:
        return "UNREADABLE"
    return decoded if decoded.isprintable() else "UNREADABLE"


def process_file(path: str, messages: List[Any], recv: bool, progress_bar: Optional[ProgressBar]) -> None:
    """Parse one capture file and append a dict per message to `messages`.

    Each record in the file is a fixed-size header (TIME_SIZE bytes of
    little-endian timestamp, MSGTYPE_SIZE bytes of NUL-padded message type,
    LENGTH_SIZE bytes of little-endian body length) followed by the body.

    Args:
        path: capture file to read.
        messages: output list; one dict is appended per record, with the keys
            "direction", "time", "size", "msgtype" and, depending on the
            outcome, "body" and/or "error".
        recv: True if the file holds received messages, False for sent ones.
        progress_bar: if given, it is advanced by the number of bytes consumed.

    Unknown message types and bodies that fail to deserialize are kept (as a
    hex dump with an "error" entry) and reported on stderr, not raised.
    """
    header_size = TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE
    with open(path, 'rb') as f_in:
        # File offset already reported to the progress bar.
        bytes_reported = 0

        while True:
            if progress_bar:
                position = f_in.tell()
                if position != bytes_reported:
                    progress_bar.update(position - bytes_reported)
                    bytes_reported = position

            # Read the header
            tmp_header_raw = f_in.read(header_size)
            if not tmp_header_raw:
                break
            if len(tmp_header_raw) < header_size:
                # A partial header cannot be interpreted; stop instead of
                # decoding garbage.
                print(f"WARNING - Truncated message header at end of {path}", file=sys.stderr)
                break
            tmp_header = BytesIO(tmp_header_raw)
            time = int.from_bytes(tmp_header.read(TIME_SIZE), "little")  # type: int
            msgtype = tmp_header.read(MSGTYPE_SIZE).split(b'\x00', 1)[0]  # type: bytes
            length = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little")  # type: int

            # Start converting the message to a dictionary
            msg_dict = {}
            msg_dict["direction"] = "recv" if recv else "sent"
            msg_dict["time"] = time
            msg_dict["size"] = length  # "size" is less readable here, but more readable in the output

            msg_ser = BytesIO(f_in.read(length))

            # Determine message type
            if msgtype not in MESSAGEMAP:
                # Unrecognized message type: keep the raw body as hex.
                msg_dict["msgtype"] = _decode_msgtype(msgtype)
                msg_dict["body"] = msg_ser.read().hex()
                msg_dict["error"] = "Unrecognized message type."
                messages.append(msg_dict)
                print(f"WARNING - Unrecognized message type {msgtype} in {path}", file=sys.stderr)
                continue

            # Deserialize the message
            msg = MESSAGEMAP[msgtype]()
            msg_dict["msgtype"] = msgtype.decode()

            try:
                msg.deserialize(msg_ser)
            except KeyboardInterrupt:
                raise
            except Exception:
                # Unable to deserialize message body: rewind and dump it as hex.
                msg_ser.seek(0, os.SEEK_SET)
                msg_dict["body"] = msg_ser.read().hex()
                msg_dict["error"] = "Unable to deserialize message."
                messages.append(msg_dict)
                print(f"WARNING - Unable to deserialize message in {path}", file=sys.stderr)
                continue

            # Convert body of message into a jsonable object
            if length:
                msg_dict["body"] = to_jsonable(msg)
            messages.append(msg_dict)

        if progress_bar:
            # Account for whatever is left of the file in case the loop was
            # exited early.
            f_in.seek(0, os.SEEK_END)
            remaining = f_in.tell() - bytes_reported
            if remaining:
                progress_bar.update(remaining)
+
+
def main():
    """Command-line entry point.

    Parses every capture file given on the command line, merges the messages
    in chronological order and writes them as a single JSON array, either to
    the file given with -o/--output or to stdout.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        epilog="EXAMPLE \n\t{0} -o out.json <data-dir>/message_capture/**/*.dat".format(sys.argv[0]),
        formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument(
        "capturepaths",
        nargs='+',
        help="binary message capture files to parse.")
    parser.add_argument(
        "-o", "--output",
        help="output file.  If unset print to stdout")
    parser.add_argument(
        "-n", "--no-progress-bar",
        action='store_true',
        help="disable the progress bar.  Automatically set if the output is not a terminal")
    args = parser.parse_args()
    capturepaths = [Path.cwd() / Path(capturepath) for capturepath in args.capturepaths]
    output = Path.cwd() / Path(args.output) if args.output else None
    use_progress_bar = (not args.no_progress_bar) and sys.stdout.isatty()

    messages = []  # type: List[Any]
    progress_bar = None  # type: Optional[ProgressBar]
    if use_progress_bar:
        # The bar tracks bytes consumed across all input files.
        total_size = sum(capture.stat().st_size for capture in capturepaths)
        progress_bar = ProgressBar(total_size)

    for capture in capturepaths:
        # The direction is inferred from the file name (msgs_recv / msgs_sent).
        process_file(str(capture), messages, "recv" in capture.stem, progress_bar)

    # Interleave the messages of all files chronologically.
    messages.sort(key=lambda msg: msg['time'])

    if progress_bar is not None:
        progress_bar.set_progress(1)
        # The bar leaves the cursor at the end of its line; terminate the line
        # so the JSON output (or the shell prompt) does not get appended to it.
        print()

    jsonrep = json.dumps(messages)
    if output:
        with open(str(output), 'w', encoding="utf8") as f_out:
            f_out.write(jsonrep)
    else:
        print(jsonrep)


if __name__ == "__main__":
    main()
diff --git a/src/init.cpp b/src/init.cpp
index 716c06cd3a..ee3c169c63 100644
--- a/src/init.cpp
+++ b/src/init.cpp
@@ -519,6 +519,7 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-limitdescendantcount=<n>", strprintf("Do not accept transactions if any ancestor would have <n> or more in-mempool descendants (default: %u)", DEFAULT_DESCENDANT_LIMIT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
argsman.AddArg("-limitdescendantsize=<n>", strprintf("Do not accept transactions if any ancestor would have more than <n> kilobytes of in-mempool descendants (default: %u).", DEFAULT_DESCENDANT_SIZE_LIMIT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
argsman.AddArg("-addrmantest", "Allows to test address relay on localhost", ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
+ argsman.AddArg("-capturemessages", "Capture all P2P messages to disk", ArgsManager::ALLOW_BOOL | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
argsman.AddArg("-debug=<category>", "Output debugging information (default: -nodebug, supplying <category> is optional). "
"If <category> is not supplied or if <category> = 1, output all debugging information. <category> can be: " + LogInstance().LogCategoriesString() + ".",
ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);
@@ -1040,16 +1041,17 @@ bool AppInitParameterInteraction(const ArgsManager& args)
// Trim requested connection counts, to fit into system limitations
// <int> in std::min<int>(...) to work around FreeBSD compilation issue described in #2695
- nFD = RaiseFileDescriptorLimit(nMaxConnections + MIN_CORE_FILEDESCRIPTORS + MAX_ADDNODE_CONNECTIONS + nBind);
+ nFD = RaiseFileDescriptorLimit(nMaxConnections + MIN_CORE_FILEDESCRIPTORS + MAX_ADDNODE_CONNECTIONS + nBind + NUM_FDS_MESSAGE_CAPTURE);
+
#ifdef USE_POLL
int fd_max = nFD;
#else
int fd_max = FD_SETSIZE;
#endif
- nMaxConnections = std::max(std::min<int>(nMaxConnections, fd_max - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS), 0);
+ nMaxConnections = std::max(std::min<int>(nMaxConnections, fd_max - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE), 0);
if (nFD < MIN_CORE_FILEDESCRIPTORS)
return InitError(_("Not enough file descriptors available."));
- nMaxConnections = std::min(nFD - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS, nMaxConnections);
+ nMaxConnections = std::min(nFD - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE, nMaxConnections);
if (nMaxConnections < nUserMaxConnections)
InitWarning(strprintf(_("Reducing -maxconnections from %d to %d, because of system limitations."), nUserMaxConnections, nMaxConnections));
diff --git a/src/net.cpp b/src/net.cpp
index 76bf7effa4..d004aace88 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -2879,6 +2879,9 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
{
size_t nMessageSize = msg.data.size();
LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.m_type), nMessageSize, pnode->GetId());
+ if (gArgs.GetBoolArg("-capturemessages", false)) {
+ CaptureMessage(pnode->addr, msg.m_type, msg.data, /* incoming */ false);
+ }
// make sure we use the appropriate network transport format
std::vector<unsigned char> serializedHeader;
@@ -2894,18 +2897,14 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
pnode->mapSendBytesPerMsgCmd[msg.m_type] += nTotalSize;
pnode->nSendSize += nTotalSize;
- if (pnode->nSendSize > nSendBufferMaxSize)
- pnode->fPauseSend = true;
+ if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true;
pnode->vSendMsg.push_back(std::move(serializedHeader));
- if (nMessageSize)
- pnode->vSendMsg.push_back(std::move(msg.data));
+ if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data));
// If write queue empty, attempt "optimistic write"
- if (optimisticSend == true)
- nBytesSent = SocketSendData(*pnode);
+ if (optimisticSend) nBytesSent = SocketSendData(*pnode);
}
- if (nBytesSent)
- RecordBytesSent(nBytesSent);
+ if (nBytesSent) RecordBytesSent(nBytesSent);
}
bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
@@ -2948,3 +2947,31 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const
return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup.data(), vchNetGroup.size()).Finalize();
}
+
+void CaptureMessage(const CAddress& addr, const std::string& msg_type, const Span<const unsigned char>& data, bool is_incoming)
+{
+ // Note: This function captures the message at the time of processing,
+ // not at socket receive/send time.
+ // This ensures that the messages are always in order from an application
+ // layer (processing) perspective.
+ auto now = GetTime<std::chrono::microseconds>();
+
+ // Windows folder names can not include a colon
+ std::string clean_addr = addr.ToString();
+ std::replace(clean_addr.begin(), clean_addr.end(), ':', '_');
+
+ fs::path base_path = GetDataDir() / "message_capture" / clean_addr;
+ fs::create_directories(base_path);
+
+ fs::path path = base_path / (is_incoming ? "msgs_recv.dat" : "msgs_sent.dat");
+ CAutoFile f(fsbridge::fopen(path, "ab"), SER_DISK, CLIENT_VERSION);
+
+ ser_writedata64(f, now.count());
+ f.write(msg_type.data(), msg_type.length());
+ for (auto i = msg_type.length(); i < CMessageHeader::COMMAND_SIZE; ++i) {
+ f << '\0';
+ }
+ uint32_t size = data.size();
+ ser_writedata32(f, size);
+ f.write((const char*)data.data(), data.size());
+}
diff --git a/src/net.h b/src/net.h
index 86fcee512a..2c47d5e557 100644
--- a/src/net.h
+++ b/src/net.h
@@ -20,6 +20,7 @@
#include <policy/feerate.h>
#include <protocol.h>
#include <random.h>
+#include <span.h>
#include <streams.h>
#include <sync.h>
#include <threadinterrupt.h>
@@ -75,6 +76,8 @@ static constexpr uint64_t DEFAULT_MAX_UPLOAD_TARGET = 0;
static const bool DEFAULT_BLOCKSONLY = false;
/** -peertimeout default */
static const int64_t DEFAULT_PEER_CONNECT_TIMEOUT = 60;
+/** Number of file descriptors required for message capture **/
+static const int NUM_FDS_MESSAGE_CAPTURE = 1;
static const bool DEFAULT_FORCEDNSSEED = false;
static const size_t DEFAULT_MAXRECEIVEBUFFER = 5 * 1000;
@@ -1241,6 +1244,9 @@ inline std::chrono::microseconds PoissonNextSend(std::chrono::microseconds now,
return std::chrono::microseconds{PoissonNextSend(now.count(), average_interval.count())};
}
+/** Dump binary message to file, with timestamp */
+void CaptureMessage(const CAddress& addr, const std::string& msg_type, const Span<const unsigned char>& data, bool is_incoming);
+
struct NodeEvictionCandidate
{
NodeId id;
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index b68453759a..2dffbd7620 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -4045,14 +4045,12 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
}
// Don't bother if send buffer is too full to respond anyway
- if (pfrom->fPauseSend)
- return false;
+ if (pfrom->fPauseSend) return false;
std::list<CNetMessage> msgs;
{
LOCK(pfrom->cs_vProcessMsg);
- if (pfrom->vProcessMsg.empty())
- return false;
+ if (pfrom->vProcessMsg.empty()) return false;
// Just take one message
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
pfrom->nProcessQueueSize -= msgs.front().m_raw_message_size;
@@ -4061,6 +4059,10 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
}
CNetMessage& msg(msgs.front());
+ if (gArgs.GetBoolArg("-capturemessages", false)) {
+ CaptureMessage(pfrom->addr, msg.m_command, MakeUCharSpan(msg.m_recv), /* incoming */ true);
+ }
+
msg.SetVersion(pfrom->GetCommonVersion());
const std::string& msg_type = msg.m_command;
diff --git a/test/functional/p2p_message_capture.py b/test/functional/p2p_message_capture.py
new file mode 100755
index 0000000000..113e26c425
--- /dev/null
+++ b/test/functional/p2p_message_capture.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python3
+# Copyright (c) 2020 The Bitcoin Core developers
+# Distributed under the MIT software license, see the accompanying
+# file COPYING or http://www.opensource.org/licenses/mit-license.php.
+"""Test per-peer message capture capability.
+
+Additionally, the output of contrib/message-capture/message-capture-parser.py should be verified manually.
+"""
+
+import glob
+from io import BytesIO
+import os
+
+from test_framework.p2p import P2PDataStore, MESSAGEMAP
+from test_framework.test_framework import BitcoinTestFramework
+from test_framework.util import assert_equal
+
# Widths, in bytes, of the fixed-size header fields written by CaptureMessage.
TIME_SIZE = 8
LENGTH_SIZE = 4
MSGTYPE_SIZE = 12


def mini_parser(dat_file):
    """Parse a data file created by CaptureMessage.

    From the data file we'll only check the structure.

    We won't care about things like:
    - Deserializing the payload of the message
        - This is managed by the deserialize methods in test_framework.messages
    - The order of the messages
        - There's no reason why we can't, say, change the order of the messages in the handshake
    - Message Type
        - We can add new message types

    We're ignoring these because they're simply too brittle to test here.
    """
    header_size = TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE
    with open(dat_file, 'rb') as f_in:
        # This should have at least one message in it
        assert(os.fstat(f_in.fileno()).st_size >= header_size)
        while True:
            tmp_header_raw = f_in.read(header_size)
            if not tmp_header_raw:
                break
            # A partial header means the file is truncated or corrupt.
            assert_equal(len(tmp_header_raw), header_size)
            tmp_header = BytesIO(tmp_header_raw)
            # Skip the timestamp; its value is not checked.
            tmp_header.read(TIME_SIZE)
            raw_msgtype = tmp_header.read(MSGTYPE_SIZE)
            # partition() also copes with a message type that fills the whole
            # field and therefore carries no NUL padding at all.
            msgtype, _, padding = raw_msgtype.partition(b'\x00')
            assert(len(msgtype) > 0)
            assert(msgtype in MESSAGEMAP)
            # Everything after the message type must be NUL padding.
            assert_equal(padding, b'\x00' * len(padding))
            length = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little")  # type: int
            data = f_in.read(length)
            assert_equal(len(data), length)
+
+
+
class MessageCaptureTest(BitcoinTestFramework):
    """Check that -capturemessages writes structurally valid capture files."""

    def set_test_params(self):
        # A single node on a fresh chain, started with message capture enabled.
        self.setup_clean_chain = True
        self.num_nodes = 1
        self.extra_args = [["-capturemessages"]]

    def run_test(self):
        node = self.nodes[0]
        capturedir = os.path.join(node.datadir, "regtest/message_capture")
        # Connect a node so that the handshake occurs
        node.add_p2p_connection(P2PDataStore())
        node.disconnect_p2ps()
        # Check the received-messages file first, then the sent-messages file.
        for dat_name in ("msgs_recv.dat", "msgs_sent.dat"):
            pattern = os.path.join(capturedir, "*/" + dat_name)
            mini_parser(glob.glob(pattern)[0])


if __name__ == '__main__':
    MessageCaptureTest().main()
diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py
index 6ad4e13db2..27a09ef86c 100755
--- a/test/functional/test_framework/messages.py
+++ b/test/functional/test_framework/messages.py
@@ -1273,7 +1273,7 @@ class msg_block:
# for cases where a user needs tighter control over what is sent over the wire
# note that the user must supply the name of the msgtype, and the data
class msg_generic:
- __slots__ = ("msgtype", "data")
+ __slots__ = ("data")
def __init__(self, msgtype, data=None):
self.msgtype = msgtype
diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py
index c652ac0a06..0e6340b69d 100755
--- a/test/functional/test_runner.py
+++ b/test/functional/test_runner.py
@@ -266,6 +266,7 @@ BASE_SCRIPTS = [
'p2p_add_connections.py',
'p2p_unrequested_blocks.py',
'p2p_blockfilters.py',
+ 'p2p_message_capture.py',
'feature_includeconf.py',
'feature_asmap.py',
'mempool_unbroadcast.py',