aboutsummaryrefslogtreecommitdiff
path: root/test/functional/test_framework
diff options
context:
space:
mode:
authorMarcoFalke <falke.marco@gmail.com>2017-11-29 16:29:12 -0500
committerMarcoFalke <falke.marco@gmail.com>2017-11-29 16:29:19 -0500
commit9f2c2dba21855b8cb9b193b1819be73fa4a23a99 (patch)
treed200bb364c984d331dfc9178497c629568a6669e /test/functional/test_framework
parent32c9b570fceaad76536a2c881b4dc1d961d9b306 (diff)
parente9dfa9bccc5cbb6096c60498651b451297f0a931 (diff)
Merge #11712: [tests] Split NodeConn from NodeConnCB
e9dfa9bcc [tests] Move version message sending from NodeConn to NodeConnCB (John Newbery) dad596fc3 [tests] Make NodeConnCB a subclass of NodeConn (John Newbery) e30d40438 [tests] Move only: move NodeConnCB below NodeConn (John Newbery) 4d5059856 [tests] Tidy up mininode (John Newbery) f2ae6f32a [tests] Remove mininode periodic (half-hour) ping messages (John Newbery) ec59523c5 [tests] Remove rpc property from TestNode in p2p-segwit.py. (John Newbery) Pull request description: This is the final step in #11518, except for possibly renaming - for motivation, please see that PR. If this is merged, then migrating the test framework from asyncore to asyncio should be easier (I say should because I haven't dug too deeply into what would be required). Requesting review from @ryanofsky , since he always has good feedback on these refactor PRs, and I'd appreciate his take on this refactor. Note particularly that I've reverted the change suggested here: https://github.com/bitcoin/bitcoin/pull/11182#discussion_r148859555 . The idea, as always, is to present a simple interface to the test writer. Tree-SHA512: 94dd467a13ec799b101108cf47d4dccb6f6240b601e375e3d785313333bbb389c26072a50759aca663bbf3d6c8b867b99e36ae8800ab8ea115e0496c151926ce
Diffstat (limited to 'test/functional/test_framework')
-rwxr-xr-xtest/functional/test_framework/comptool.py125
-rwxr-xr-xtest/functional/test_framework/mininode.py360
-rwxr-xr-xtest/functional/test_framework/test_node.py13
3 files changed, 255 insertions, 243 deletions
diff --git a/test/functional/test_framework/comptool.py b/test/functional/test_framework/comptool.py
index 723826bae4..2f64fba753 100755
--- a/test/functional/test_framework/comptool.py
+++ b/test/functional/test_framework/comptool.py
@@ -43,7 +43,6 @@ class TestNode(NodeConnCB):
def __init__(self, block_store, tx_store):
super().__init__()
- self.conn = None
self.bestblockhash = None
self.block_store = block_store
self.block_request_map = {}
@@ -58,26 +57,23 @@ class TestNode(NodeConnCB):
self.lastInv = []
self.closed = False
- def on_close(self, conn):
+ def on_close(self):
self.closed = True
- def add_connection(self, conn):
- self.conn = conn
-
- def on_headers(self, conn, message):
+ def on_headers(self, message):
if len(message.headers) > 0:
best_header = message.headers[-1]
best_header.calc_sha256()
self.bestblockhash = best_header.sha256
- def on_getheaders(self, conn, message):
+ def on_getheaders(self, message):
response = self.block_store.headers_for(message.locator, message.hashstop)
if response is not None:
- conn.send_message(response)
+ self.send_message(response)
- def on_getdata(self, conn, message):
- [conn.send_message(r) for r in self.block_store.get_blocks(message.inv)]
- [conn.send_message(r) for r in self.tx_store.get_transactions(message.inv)]
+ def on_getdata(self, message):
+ [self.send_message(r) for r in self.block_store.get_blocks(message.inv)]
+ [self.send_message(r) for r in self.tx_store.get_transactions(message.inv)]
for i in message.inv:
if i.type == 1 or i.type == 1 | (1 << 30): # MSG_TX or MSG_WITNESS_TX
@@ -85,16 +81,16 @@ class TestNode(NodeConnCB):
elif i.type == 2 or i.type == 2 | (1 << 30): # MSG_BLOCK or MSG_WITNESS_BLOCK
self.block_request_map[i.hash] = True
- def on_inv(self, conn, message):
+ def on_inv(self, message):
self.lastInv = [x.hash for x in message.inv]
- def on_pong(self, conn, message):
+ def on_pong(self, message):
try:
del self.pingMap[message.nonce]
except KeyError:
raise AssertionError("Got pong for unknown ping [%s]" % repr(message))
- def on_reject(self, conn, message):
+ def on_reject(self, message):
if message.message == b'tx':
self.tx_reject_map[message.data] = RejectResult(message.code, message.reason)
if message.message == b'block':
@@ -102,30 +98,30 @@ class TestNode(NodeConnCB):
def send_inv(self, obj):
mtype = 2 if isinstance(obj, CBlock) else 1
- self.conn.send_message(msg_inv([CInv(mtype, obj.sha256)]))
+ self.send_message(msg_inv([CInv(mtype, obj.sha256)]))
def send_getheaders(self):
# We ask for headers from their last tip.
m = msg_getheaders()
m.locator = self.block_store.get_locator(self.bestblockhash)
- self.conn.send_message(m)
+ self.send_message(m)
def send_header(self, header):
m = msg_headers()
m.headers.append(header)
- self.conn.send_message(m)
+ self.send_message(m)
# This assumes BIP31
def send_ping(self, nonce):
self.pingMap[nonce] = True
- self.conn.send_message(msg_ping(nonce))
+ self.send_message(msg_ping(nonce))
def received_ping_response(self, nonce):
return nonce not in self.pingMap
def send_mempool(self):
self.lastInv = []
- self.conn.send_message(msg_mempool())
+ self.send_message(msg_mempool())
# TestInstance:
#
@@ -166,8 +162,7 @@ class TestManager():
def __init__(self, testgen, datadir):
self.test_generator = testgen
- self.connections = []
- self.test_nodes = []
+ self.p2p_connections= []
self.block_store = BlockStore(datadir)
self.tx_store = TxStore(datadir)
self.ping_counter = 1
@@ -175,28 +170,24 @@ class TestManager():
def add_all_connections(self, nodes):
for i in range(len(nodes)):
# Create a p2p connection to each node
- test_node = TestNode(self.block_store, self.tx_store)
- self.test_nodes.append(test_node)
- self.connections.append(NodeConn('127.0.0.1', p2p_port(i), test_node))
- # Make sure the TestNode (callback class) has a reference to its
- # associated NodeConn
- test_node.add_connection(self.connections[-1])
+ node = TestNode(self.block_store, self.tx_store)
+ node.peer_connect('127.0.0.1', p2p_port(i))
+ self.p2p_connections.append(node)
def clear_all_connections(self):
- self.connections = []
- self.test_nodes = []
+ self.p2p_connections = []
def wait_for_disconnections(self):
def disconnected():
- return all(node.closed for node in self.test_nodes)
+ return all(node.closed for node in self.p2p_connections)
wait_until(disconnected, timeout=10, lock=mininode_lock)
def wait_for_verack(self):
- return all(node.wait_for_verack() for node in self.test_nodes)
+ return all(node.wait_for_verack() for node in self.p2p_connections)
def wait_for_pings(self, counter):
def received_pongs():
- return all(node.received_ping_response(counter) for node in self.test_nodes)
+ return all(node.received_ping_response(counter) for node in self.p2p_connections)
wait_until(received_pongs, lock=mininode_lock)
# sync_blocks: Wait for all connections to request the blockhash given
@@ -206,17 +197,17 @@ class TestManager():
def blocks_requested():
return all(
blockhash in node.block_request_map and node.block_request_map[blockhash]
- for node in self.test_nodes
+ for node in self.p2p_connections
)
# --> error if not requested
wait_until(blocks_requested, attempts=20*num_blocks, lock=mininode_lock)
# Send getheaders message
- [ c.cb.send_getheaders() for c in self.connections ]
+ [ c.send_getheaders() for c in self.p2p_connections ]
# Send ping and wait for response -- synchronization hack
- [ c.cb.send_ping(self.ping_counter) for c in self.connections ]
+ [ c.send_ping(self.ping_counter) for c in self.p2p_connections ]
self.wait_for_pings(self.ping_counter)
self.ping_counter += 1
@@ -226,42 +217,42 @@ class TestManager():
def transaction_requested():
return all(
txhash in node.tx_request_map and node.tx_request_map[txhash]
- for node in self.test_nodes
+ for node in self.p2p_connections
)
# --> error if not requested
wait_until(transaction_requested, attempts=20*num_events, lock=mininode_lock)
# Get the mempool
- [ c.cb.send_mempool() for c in self.connections ]
+ [ c.send_mempool() for c in self.p2p_connections ]
# Send ping and wait for response -- synchronization hack
- [ c.cb.send_ping(self.ping_counter) for c in self.connections ]
+ [ c.send_ping(self.ping_counter) for c in self.p2p_connections ]
self.wait_for_pings(self.ping_counter)
self.ping_counter += 1
# Sort inv responses from each node
with mininode_lock:
- [ c.cb.lastInv.sort() for c in self.connections ]
+ [ c.lastInv.sort() for c in self.p2p_connections ]
# Verify that the tip of each connection all agree with each other, and
# with the expected outcome (if given)
def check_results(self, blockhash, outcome):
with mininode_lock:
- for c in self.connections:
+ for c in self.p2p_connections:
if outcome is None:
- if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
+ if c.bestblockhash != self.p2p_connections[0].bestblockhash:
return False
elif isinstance(outcome, RejectResult): # Check that block was rejected w/ code
- if c.cb.bestblockhash == blockhash:
+ if c.bestblockhash == blockhash:
return False
- if blockhash not in c.cb.block_reject_map:
+ if blockhash not in c.block_reject_map:
logger.error('Block not in reject map: %064x' % (blockhash))
return False
- if not outcome.match(c.cb.block_reject_map[blockhash]):
- logger.error('Block rejected with %s instead of expected %s: %064x' % (c.cb.block_reject_map[blockhash], outcome, blockhash))
+ if not outcome.match(c.block_reject_map[blockhash]):
+ logger.error('Block rejected with %s instead of expected %s: %064x' % (c.block_reject_map[blockhash], outcome, blockhash))
return False
- elif ((c.cb.bestblockhash == blockhash) != outcome):
+ elif ((c.bestblockhash == blockhash) != outcome):
return False
return True
@@ -273,21 +264,21 @@ class TestManager():
# a particular tx's existence in the mempool is the same across all nodes.
def check_mempool(self, txhash, outcome):
with mininode_lock:
- for c in self.connections:
+ for c in self.p2p_connections:
if outcome is None:
# Make sure the mempools agree with each other
- if c.cb.lastInv != self.connections[0].cb.lastInv:
+ if c.lastInv != self.p2p_connections[0].lastInv:
return False
elif isinstance(outcome, RejectResult): # Check that tx was rejected w/ code
- if txhash in c.cb.lastInv:
+ if txhash in c.lastInv:
return False
- if txhash not in c.cb.tx_reject_map:
+ if txhash not in c.tx_reject_map:
logger.error('Tx not in reject map: %064x' % (txhash))
return False
- if not outcome.match(c.cb.tx_reject_map[txhash]):
- logger.error('Tx rejected with %s instead of expected %s: %064x' % (c.cb.tx_reject_map[txhash], outcome, txhash))
+ if not outcome.match(c.tx_reject_map[txhash]):
+ logger.error('Tx rejected with %s instead of expected %s: %064x' % (c.tx_reject_map[txhash], outcome, txhash))
return False
- elif ((txhash in c.cb.lastInv) != outcome):
+ elif ((txhash in c.lastInv) != outcome):
return False
return True
@@ -332,25 +323,25 @@ class TestManager():
first_block_with_hash = False
with mininode_lock:
self.block_store.add_block(block)
- for c in self.connections:
- if first_block_with_hash and block.sha256 in c.cb.block_request_map and c.cb.block_request_map[block.sha256] == True:
+ for c in self.p2p_connections:
+ if first_block_with_hash and block.sha256 in c.block_request_map and c.block_request_map[block.sha256] == True:
# There was a previous request for this block hash
# Most likely, we delivered a header for this block
# but never had the block to respond to the getdata
c.send_message(msg_block(block))
else:
- c.cb.block_request_map[block.sha256] = False
+ c.block_request_map[block.sha256] = False
# Either send inv's to each node and sync, or add
# to invqueue for later inv'ing.
if (test_instance.sync_every_block):
# if we expect success, send inv and sync every block
# if we expect failure, just push the block and see what happens.
if outcome == True:
- [ c.cb.send_inv(block) for c in self.connections ]
+ [ c.send_inv(block) for c in self.p2p_connections ]
self.sync_blocks(block.sha256, 1)
else:
- [ c.send_message(msg_block(block)) for c in self.connections ]
- [ c.cb.send_ping(self.ping_counter) for c in self.connections ]
+ [ c.send_message(msg_block(block)) for c in self.p2p_connections ]
+ [ c.send_ping(self.ping_counter) for c in self.p2p_connections ]
self.wait_for_pings(self.ping_counter)
self.ping_counter += 1
if (not self.check_results(tip, outcome)):
@@ -360,7 +351,7 @@ class TestManager():
elif isinstance(b_or_t, CBlockHeader):
block_header = b_or_t
self.block_store.add_header(block_header)
- [ c.cb.send_header(block_header) for c in self.connections ]
+ [ c.send_header(block_header) for c in self.p2p_connections ]
else: # Tx test runner
assert(isinstance(b_or_t, CTransaction))
@@ -369,11 +360,11 @@ class TestManager():
# Add to shared tx store and clear map entry
with mininode_lock:
self.tx_store.add_transaction(tx)
- for c in self.connections:
- c.cb.tx_request_map[tx.sha256] = False
+ for c in self.p2p_connections:
+ c.tx_request_map[tx.sha256] = False
# Again, either inv to all nodes or save for later
if (test_instance.sync_every_tx):
- [ c.cb.send_inv(tx) for c in self.connections ]
+ [ c.send_inv(tx) for c in self.p2p_connections ]
self.sync_transaction(tx.sha256, 1)
if (not self.check_mempool(tx.sha256, outcome)):
raise AssertionError("Test failed at test %d" % test_number)
@@ -381,26 +372,26 @@ class TestManager():
invqueue.append(CInv(1, tx.sha256))
# Ensure we're not overflowing the inv queue
if len(invqueue) == MAX_INV_SZ:
- [ c.send_message(msg_inv(invqueue)) for c in self.connections ]
+ [ c.send_message(msg_inv(invqueue)) for c in self.p2p_connections ]
invqueue = []
# Do final sync if we weren't syncing on every block or every tx.
if (not test_instance.sync_every_block and block is not None):
if len(invqueue) > 0:
- [ c.send_message(msg_inv(invqueue)) for c in self.connections ]
+ [ c.send_message(msg_inv(invqueue)) for c in self.p2p_connections ]
invqueue = []
self.sync_blocks(block.sha256, len(test_instance.blocks_and_transactions))
if (not self.check_results(tip, block_outcome)):
raise AssertionError("Block test failed at test %d" % test_number)
if (not test_instance.sync_every_tx and tx is not None):
if len(invqueue) > 0:
- [ c.send_message(msg_inv(invqueue)) for c in self.connections ]
+ [ c.send_message(msg_inv(invqueue)) for c in self.p2p_connections ]
invqueue = []
self.sync_transaction(tx.sha256, len(test_instance.blocks_and_transactions))
if (not self.check_mempool(tx.sha256, tx_outcome)):
raise AssertionError("Mempool test failed at test %d" % test_number)
- [ c.disconnect_node() for c in self.connections ]
+ [ c.disconnect_node() for c in self.p2p_connections ]
self.wait_for_disconnections()
self.block_store.close()
self.tx_store.close()
diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py
index 24c96b5681..c580d99c79 100755
--- a/test/functional/test_framework/mininode.py
+++ b/test/functional/test_framework/mininode.py
@@ -20,10 +20,10 @@ import logging
import socket
import struct
import sys
-import time
from threading import RLock, Thread
from test_framework.messages import *
+from test_framework.util import wait_until
logger = logging.getLogger("TestFramework.mininode")
@@ -57,173 +57,34 @@ MAGIC_BYTES = {
"regtest": b"\xfa\xbf\xb5\xda", # regtest
}
-class NodeConnCB():
- """Callback and helper functions for P2P connection to a bitcoind node.
-
- Individual testcases should subclass this and override the on_* methods
- if they want to alter message handling behaviour."""
- def __init__(self):
- # Track whether we have a P2P connection open to the node
- self.connected = False
- self.connection = None
-
- # Track number of messages of each type received and the most recent
- # message of each type
- self.message_count = defaultdict(int)
- self.last_message = {}
-
- # A count of the number of ping messages we've sent to the node
- self.ping_counter = 1
-
- # Message receiving methods
-
- def deliver(self, conn, message):
- """Receive message and dispatch message to appropriate callback.
-
- We keep a count of how many of each message type has been received
- and the most recent message of each type."""
- with mininode_lock:
- try:
- command = message.command.decode('ascii')
- self.message_count[command] += 1
- self.last_message[command] = message
- getattr(self, 'on_' + command)(conn, message)
- except:
- print("ERROR delivering %s (%s)" % (repr(message),
- sys.exc_info()[0]))
- raise
-
- # Callback methods. Can be overridden by subclasses in individual test
- # cases to provide custom message handling behaviour.
-
- def on_open(self, conn):
- self.connected = True
-
- def on_close(self, conn):
- self.connected = False
- self.connection = None
-
- def on_addr(self, conn, message): pass
- def on_block(self, conn, message): pass
- def on_blocktxn(self, conn, message): pass
- def on_cmpctblock(self, conn, message): pass
- def on_feefilter(self, conn, message): pass
- def on_getaddr(self, conn, message): pass
- def on_getblocks(self, conn, message): pass
- def on_getblocktxn(self, conn, message): pass
- def on_getdata(self, conn, message): pass
- def on_getheaders(self, conn, message): pass
- def on_headers(self, conn, message): pass
- def on_mempool(self, conn): pass
- def on_pong(self, conn, message): pass
- def on_reject(self, conn, message): pass
- def on_sendcmpct(self, conn, message): pass
- def on_sendheaders(self, conn, message): pass
- def on_tx(self, conn, message): pass
-
- def on_inv(self, conn, message):
- want = msg_getdata()
- for i in message.inv:
- if i.type != 0:
- want.inv.append(i)
- if len(want.inv):
- conn.send_message(want)
-
- def on_ping(self, conn, message):
- conn.send_message(msg_pong(message.nonce))
-
- def on_verack(self, conn, message):
- self.verack_received = True
-
- def on_version(self, conn, message):
- assert message.nVersion >= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format(message.nVersion, MIN_VERSION_SUPPORTED)
- conn.send_message(msg_verack())
- conn.nServices = message.nServices
-
- # Connection helper methods
-
- def add_connection(self, conn):
- self.connection = conn
-
- def wait_for_disconnect(self, timeout=60):
- test_function = lambda: not self.connected
- wait_until(test_function, timeout=timeout, lock=mininode_lock)
-
- # Message receiving helper methods
-
- def wait_for_block(self, blockhash, timeout=60):
- test_function = lambda: self.last_message.get("block") and self.last_message["block"].block.rehash() == blockhash
- wait_until(test_function, timeout=timeout, lock=mininode_lock)
-
- def wait_for_getdata(self, timeout=60):
- test_function = lambda: self.last_message.get("getdata")
- wait_until(test_function, timeout=timeout, lock=mininode_lock)
-
- def wait_for_getheaders(self, timeout=60):
- test_function = lambda: self.last_message.get("getheaders")
- wait_until(test_function, timeout=timeout, lock=mininode_lock)
-
- def wait_for_inv(self, expected_inv, timeout=60):
- """Waits for an INV message and checks that the first inv object in the message was as expected."""
- if len(expected_inv) > 1:
- raise NotImplementedError("wait_for_inv() will only verify the first inv object")
- test_function = lambda: self.last_message.get("inv") and \
- self.last_message["inv"].inv[0].type == expected_inv[0].type and \
- self.last_message["inv"].inv[0].hash == expected_inv[0].hash
- wait_until(test_function, timeout=timeout, lock=mininode_lock)
-
- def wait_for_verack(self, timeout=60):
- test_function = lambda: self.message_count["verack"]
- wait_until(test_function, timeout=timeout, lock=mininode_lock)
-
- # Message sending helper functions
+class NodeConn(asyncore.dispatcher):
+ """A low-level connection object to a node's P2P interface.
- def send_message(self, message):
- if self.connection:
- self.connection.send_message(message)
- else:
- logger.error("Cannot send message. No connection to node!")
+ This class is responsible for:
- def send_and_ping(self, message):
- self.send_message(message)
- self.sync_with_ping()
+ - opening and closing the TCP connection to the node
+ - reading bytes from and writing bytes to the socket
+ - deserializing and serializing the P2P message header
+ - logging messages as they are sent and received
- # Sync up with the node
- def sync_with_ping(self, timeout=60):
- self.send_message(msg_ping(nonce=self.ping_counter))
- test_function = lambda: self.last_message.get("pong") and self.last_message["pong"].nonce == self.ping_counter
- wait_until(test_function, timeout=timeout, lock=mininode_lock)
- self.ping_counter += 1
+ This class contains no logic for handing the P2P message payloads. It must be
+ sub-classed and the on_message() callback overridden.
-class NodeConn(asyncore.dispatcher):
- """The actual NodeConn class
+ TODO: rename this class P2PConnection."""
- This class provides an interface for a p2p connection to a specified node."""
+ def __init__(self):
+ super().__init__(map=mininode_socket_map)
- def __init__(self, dstaddr, dstport, callback, net="regtest", services=NODE_NETWORK|NODE_WITNESS, send_version=True):
- asyncore.dispatcher.__init__(self, map=mininode_socket_map)
+ def peer_connect(self, dstaddr, dstport, net="regtest"):
self.dstaddr = dstaddr
self.dstport = dstport
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.sendbuf = b""
self.recvbuf = b""
- self.last_sent = 0
self.state = "connecting"
self.network = net
- self.cb = callback
self.disconnect = False
- self.nServices = 0
-
- if send_version:
- # stuff version msg into sendbuf
- vt = msg_version()
- vt.nServices = services
- vt.addrTo.ip = self.dstaddr
- vt.addrTo.port = self.dstport
- vt.addrFrom.ip = "0.0.0.0"
- vt.addrFrom.port = 0
- self.send_message(vt, True)
logger.info('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport))
@@ -232,15 +93,22 @@ class NodeConn(asyncore.dispatcher):
except:
self.handle_close()
+ def peer_disconnect(self):
+ # Connection could have already been closed by other end.
+ if self.state == "connected":
+ self.disconnect_node()
+
# Connection and disconnection methods
def handle_connect(self):
+ """asyncore callback when a connection is opened."""
if self.state != "connected":
logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
self.state = "connected"
- self.cb.on_open(self)
+ self.on_open()
def handle_close(self):
+ """asyncore callback when a connection is closed."""
logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport))
self.state = "closed"
self.recvbuf = b""
@@ -249,10 +117,10 @@ class NodeConn(asyncore.dispatcher):
self.close()
except:
pass
- self.cb.on_close(self)
+ self.on_close()
def disconnect_node(self):
- """ Disconnect the p2p connection.
+ """Disconnect the p2p connection.
Called by the test logic thread. Causes the p2p connection
to be disconnected on the next iteration of the asyncore loop."""
@@ -260,16 +128,19 @@ class NodeConn(asyncore.dispatcher):
# Socket read methods
- def readable(self):
- return True
-
def handle_read(self):
+ """asyncore callback when data is read from the socket."""
t = self.recv(8192)
if len(t) > 0:
self.recvbuf += t
- self.got_data()
+ self._on_data()
+
+ def _on_data(self):
+ """Try to read P2P messages from the recv buffer.
- def got_data(self):
+ This method reads data from the buffer in a loop. It deserializes,
+ parses and verifies the P2P header, then passes the P2P payload to
+ the on_message callback for processing."""
try:
while True:
if len(self.recvbuf) < 4:
@@ -294,26 +165,27 @@ class NodeConn(asyncore.dispatcher):
f = BytesIO(msg)
t = MESSAGEMAP[command]()
t.deserialize(f)
- self.got_message(t)
+ self._log_message("receive", t)
+ self.on_message(t)
except Exception as e:
logger.exception('Error reading message:', repr(e))
raise
- def got_message(self, message):
- if self.last_sent + 30 * 60 < time.time():
- self.send_message(MESSAGEMAP[b'ping']())
- self._log_message("receive", message)
- self.cb.deliver(self, message)
+ def on_message(self, message):
+ """Callback for processing a P2P payload. Must be overridden by derived class."""
+ raise NotImplementedError
# Socket write methods
def writable(self):
+ """asyncore method to determine whether the handle_write() callback should be called on the next loop."""
with mininode_lock:
pre_connection = self.state == "connecting"
length = len(self.sendbuf)
return (length > 0 or pre_connection)
def handle_write(self):
+ """asyncore callback when data should be written to the socket."""
with mininode_lock:
# asyncore does not expose socket connection, only the first read/write
# event, thus we must check connection manually here to know when we
@@ -331,6 +203,10 @@ class NodeConn(asyncore.dispatcher):
self.sendbuf = self.sendbuf[sent:]
def send_message(self, message, pushbuf=False):
+ """Send a P2P message over the socket.
+
+ This method takes a P2P payload, builds the P2P header and adds
+ the message to the send buffer to be sent over the socket."""
if self.state != "connected" and not pushbuf:
raise IOError('Not connected, no pushbuf')
self._log_message("send", message)
@@ -353,11 +229,11 @@ class NodeConn(asyncore.dispatcher):
self.sendbuf = tmsg
else:
self.sendbuf += tmsg
- self.last_sent = time.time()
# Class utility methods
def _log_message(self, direction, msg):
+ """Logs a message being sent or received over the connection."""
if direction == "send":
log_message = "Send message to "
elif direction == "receive":
@@ -368,6 +244,154 @@ class NodeConn(asyncore.dispatcher):
logger.debug(log_message)
+class NodeConnCB(NodeConn):
+ """A high-level P2P interface class for communicating with a Bitcoin node.
+
+ This class provides high-level callbacks for processing P2P message
+ payloads, as well as convenience methods for interacting with the
+ node over P2P.
+
+ Individual testcases should subclass this and override the on_* methods
+ if they want to alter message handling behaviour.
+
+ TODO: rename this class P2PInterface"""
+ def __init__(self):
+ super().__init__()
+
+ # Track number of messages of each type received and the most recent
+ # message of each type
+ self.message_count = defaultdict(int)
+ self.last_message = {}
+
+ # A count of the number of ping messages we've sent to the node
+ self.ping_counter = 1
+
+ # The network services received from the peer
+ self.nServices = 0
+
+ def peer_connect(self, *args, services=NODE_NETWORK|NODE_WITNESS, send_version=True, **kwargs):
+ super().peer_connect(*args, **kwargs)
+
+ if send_version:
+ # Send a version msg
+ vt = msg_version()
+ vt.nServices = services
+ vt.addrTo.ip = self.dstaddr
+ vt.addrTo.port = self.dstport
+ vt.addrFrom.ip = "0.0.0.0"
+ vt.addrFrom.port = 0
+ self.send_message(vt, True)
+
+ # Message receiving methods
+
+ def on_message(self, message):
+ """Receive message and dispatch message to appropriate callback.
+
+ We keep a count of how many of each message type has been received
+ and the most recent message of each type."""
+ with mininode_lock:
+ try:
+ command = message.command.decode('ascii')
+ self.message_count[command] += 1
+ self.last_message[command] = message
+ getattr(self, 'on_' + command)(message)
+ except:
+ print("ERROR delivering %s (%s)" % (repr(message), sys.exc_info()[0]))
+ raise
+
+ # Callback methods. Can be overridden by subclasses in individual test
+ # cases to provide custom message handling behaviour.
+
+ def on_open(self):
+ pass
+
+ def on_close(self):
+ pass
+
+ def on_addr(self, message): pass
+ def on_block(self, message): pass
+ def on_blocktxn(self, message): pass
+ def on_cmpctblock(self, message): pass
+ def on_feefilter(self, message): pass
+ def on_getaddr(self, message): pass
+ def on_getblocks(self, message): pass
+ def on_getblocktxn(self, message): pass
+ def on_getdata(self, message): pass
+ def on_getheaders(self, message): pass
+ def on_headers(self, message): pass
+ def on_mempool(self, message): pass
+ def on_pong(self, message): pass
+ def on_reject(self, message): pass
+ def on_sendcmpct(self, message): pass
+ def on_sendheaders(self, message): pass
+ def on_tx(self, message): pass
+
+ def on_inv(self, message):
+ want = msg_getdata()
+ for i in message.inv:
+ if i.type != 0:
+ want.inv.append(i)
+ if len(want.inv):
+ self.send_message(want)
+
+ def on_ping(self, message):
+ self.send_message(msg_pong(message.nonce))
+
+ def on_verack(self, message):
+ self.verack_received = True
+
+ def on_version(self, message):
+ assert message.nVersion >= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format(message.nVersion, MIN_VERSION_SUPPORTED)
+ self.send_message(msg_verack())
+ self.nServices = message.nServices
+
+ # Connection helper methods
+
+ def wait_for_disconnect(self, timeout=60):
+ test_function = lambda: self.state != "connected"
+ wait_until(test_function, timeout=timeout, lock=mininode_lock)
+
+ # Message receiving helper methods
+
+ def wait_for_block(self, blockhash, timeout=60):
+ test_function = lambda: self.last_message.get("block") and self.last_message["block"].block.rehash() == blockhash
+ wait_until(test_function, timeout=timeout, lock=mininode_lock)
+
+ def wait_for_getdata(self, timeout=60):
+ test_function = lambda: self.last_message.get("getdata")
+ wait_until(test_function, timeout=timeout, lock=mininode_lock)
+
+ def wait_for_getheaders(self, timeout=60):
+ test_function = lambda: self.last_message.get("getheaders")
+ wait_until(test_function, timeout=timeout, lock=mininode_lock)
+
+ def wait_for_inv(self, expected_inv, timeout=60):
+ """Waits for an INV message and checks that the first inv object in the message was as expected."""
+ if len(expected_inv) > 1:
+ raise NotImplementedError("wait_for_inv() will only verify the first inv object")
+ test_function = lambda: self.last_message.get("inv") and \
+ self.last_message["inv"].inv[0].type == expected_inv[0].type and \
+ self.last_message["inv"].inv[0].hash == expected_inv[0].hash
+ wait_until(test_function, timeout=timeout, lock=mininode_lock)
+
+ def wait_for_verack(self, timeout=60):
+ test_function = lambda: self.message_count["verack"]
+ wait_until(test_function, timeout=timeout, lock=mininode_lock)
+
+ # Message sending helper functions
+
+ def send_and_ping(self, message):
+ self.send_message(message)
+ self.sync_with_ping()
+
+ # Sync up with the node
+ def sync_with_ping(self, timeout=60):
+ self.send_message(msg_ping(nonce=self.ping_counter))
+ test_function = lambda: self.last_message.get("pong") and self.last_message["pong"].nonce == self.ping_counter
+ wait_until(test_function, timeout=timeout, lock=mininode_lock)
+ self.ping_counter += 1
+
+
# Keep our own socket map for asyncore, so that we can track disconnects
# ourselves (to workaround an issue with closing an asyncore socket when
# using select)
diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py
index 34b458482a..a9248c764e 100755
--- a/test/functional/test_framework/test_node.py
+++ b/test/functional/test_framework/test_node.py
@@ -14,7 +14,6 @@ import subprocess
import time
from .authproxy import JSONRPCException
-from .mininode import NodeConn
from .util import (
assert_equal,
get_rpc_proxy,
@@ -158,7 +157,7 @@ class TestNode():
self.encryptwallet(passphrase)
self.wait_until_stopped()
- def add_p2p_connection(self, p2p_conn, **kwargs):
+ def add_p2p_connection(self, p2p_conn, *args, **kwargs):
"""Add a p2p connection to the node.
This method adds the p2p connection to the self.p2ps list and also
@@ -167,9 +166,9 @@ class TestNode():
kwargs['dstport'] = p2p_port(self.index)
if 'dstaddr' not in kwargs:
kwargs['dstaddr'] = '127.0.0.1'
+
+ p2p_conn.peer_connect(*args, **kwargs)
self.p2ps.append(p2p_conn)
- kwargs.update({'callback': p2p_conn})
- p2p_conn.add_connection(NodeConn(**kwargs))
return p2p_conn
@@ -185,10 +184,8 @@ class TestNode():
def disconnect_p2ps(self):
"""Close all p2p connections to the node."""
for p in self.p2ps:
- # Connection could have already been closed by other end.
- if p.connection is not None:
- p.connection.disconnect_node()
- self.p2ps = []
+ p.peer_disconnect()
+ del self.p2ps[:]
class TestNodeCLI():