diff options
Diffstat (limited to 'test/functional/test_framework/mininode.py')
-rwxr-xr-x | test/functional/test_framework/mininode.py | 144 |
1 files changed, 100 insertions, 44 deletions
diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py index d57d46f2ff..3b4d05df6c 100755 --- a/test/functional/test_framework/mininode.py +++ b/test/functional/test_framework/mininode.py @@ -20,21 +20,22 @@ msg_block, msg_tx, msg_headers, etc.: ser_*, deser_*: functions that handle serialization/deserialization """ -import struct -import socket import asyncore -import time -import sys -import random -from .util import hex_str_to_bytes, bytes_to_hex_str -from io import BytesIO from codecs import encode +from collections import defaultdict +import copy import hashlib -from threading import RLock -from threading import Thread +from io import BytesIO import logging -import copy +import random +import socket +import struct +import sys +import time +from threading import RLock, Thread + from test_framework.siphash import siphash256 +from test_framework.util import hex_str_to_bytes, bytes_to_hex_str BIP0031_VERSION = 60000 MY_VERSION = 70014 # past bip-31 for ping/pong @@ -1465,30 +1466,57 @@ class msg_witness_blocktxn(msg_blocktxn): r += self.block_transactions.serialize(with_witness=True) return r -# This is what a callback should look like for NodeConn -# Reimplement the on_* functions to provide handling for events class NodeConnCB(object): + """Callback and helper functions for P2P connection to a bitcoind node. + + Individual testcases should subclass this and override the on_* methods + if they want to alter message handling behaviour. + """ + def __init__(self): - self.verack_received = False + # Track whether we have a P2P connection open to the node + self.connected = False + self.connection = None + + # Track number of messages of each type received and the most recent + # message of each type + self.message_count = defaultdict(int) + self.last_message = {} + + # A count of the number of ping messages we've sent to the node + self.ping_counter = 1 + # deliver_sleep_time is helpful for debugging race conditions in p2p # tests; it causes message delivery to sleep for the specified time # before acquiring the global lock and delivering the next message. self.deliver_sleep_time = None + # Remember the services our peer has advertised self.peer_services = None - self.connection = None - self.ping_counter = 1 - self.last_pong = msg_pong() + + # Message receiving methods def deliver(self, conn, message): + """Receive message and dispatch message to appropriate callback. + + We keep a count of how many of each message type has been received + and the most recent message of each type. + + Optionally waits for deliver_sleep_time before dispatching message. + """ + deliver_sleep = self.get_deliver_sleep_time() if deliver_sleep is not None: time.sleep(deliver_sleep) with mininode_lock: try: - getattr(self, 'on_' + message.command.decode('ascii'))(conn, message) + command = message.command.decode('ascii') + self.message_count[command] += 1 + self.last_message[command] = message + getattr(self, 'on_' + command)(conn, message) except: - logger.exception("ERROR delivering %s" % repr(message)) + print("ERROR delivering %s (%s)" % (repr(message), + sys.exc_info()[0])) def set_deliver_sleep_time(self, value): with mininode_lock: @@ -1498,14 +1526,20 @@ class NodeConnCB(object): with mininode_lock: return self.deliver_sleep_time - # Callbacks which can be overridden by subclasses - ################################################# + # Callback methods. Can be overridden by subclasses in individual test + # cases to provide custom message handling behaviour. + + def on_open(self, conn): + self.connected = True + + def on_close(self, conn): + self.connected = False + self.connection = None def on_addr(self, conn, message): pass def on_alert(self, conn, message): pass def on_block(self, conn, message): pass def on_blocktxn(self, conn, message): pass - def on_close(self, conn): pass def on_cmpctblock(self, conn, message): pass def on_feefilter(self, conn, message): pass def on_getaddr(self, conn, message): pass @@ -1515,7 +1549,7 @@ class NodeConnCB(object): def on_getheaders(self, conn, message): pass def on_headers(self, conn, message): pass def on_mempool(self, conn): pass - def on_open(self, conn): pass + def on_pong(self, conn, message): pass def on_reject(self, conn, message): pass def on_sendcmpct(self, conn, message): pass def on_sendheaders(self, conn, message): pass @@ -1533,9 +1567,6 @@ class NodeConnCB(object): if conn.ver_send > BIP0031_VERSION: conn.send_message(msg_pong(message.nonce)) - def on_pong(self, conn, message): - self.last_pong = message - def on_verack(self, conn, message): conn.ver_recv = conn.ver_send self.verack_received = True @@ -1548,15 +1579,53 @@ class NodeConnCB(object): conn.ver_recv = conn.ver_send conn.nServices = message.nServices - # Helper functions - ################## + # Connection helper methods def add_connection(self, conn): self.connection = conn - # Wrapper for the NodeConn's send_message function + def wait_for_disconnect(self, timeout=60): + test_function = lambda: not self.connected + assert wait_until(test_function, timeout=timeout) + + # Message receiving helper methods + + def sync(self, test_function, timeout=60): + while timeout > 0: + with mininode_lock: + if test_function(): + return + time.sleep(0.05) + timeout -= 0.05 + raise AssertionError("Sync failed to complete") + + def wait_for_block(self, blockhash, timeout=60): + test_function = lambda: self.last_message.get("block") and self.last_message["block"].block.rehash() == blockhash + self.sync(test_function, timeout) + + def wait_for_getdata(self, timeout=60): + test_function = lambda: self.last_message.get("getdata") + self.sync(test_function, timeout) + + def wait_for_getheaders(self, timeout=60): + test_function = lambda: self.last_message.get("getheaders") + self.sync(test_function, timeout) + + def wait_for_inv(self, expected_inv, timeout=60): + test_function = lambda: self.last_message.get("inv") and self.last_message["inv"] != expected_inv + self.sync(test_function, timeout) + + def wait_for_verack(self, timeout=60): + test_function = lambda: self.message_count["verack"] + self.sync(test_function, timeout) + + # Message sending helper functions + def send_message(self, message): - self.connection.send_message(message) + if self.connection: + self.connection.send_message(message) + else: + logger.error("Cannot send message. No connection to node!") def send_and_ping(self, message): self.send_message(message) @@ -1564,28 +1633,15 @@ class NodeConnCB(object): # Sync up with the node def sync_with_ping(self, timeout=60): - def received_pong(): - return (self.last_pong.nonce == self.ping_counter) self.send_message(msg_ping(nonce=self.ping_counter)) - success = wait_until(received_pong, timeout=timeout) + test_function = lambda: self.last_message.get("pong") and self.last_message["pong"].nonce == self.ping_counter + success = wait_until(test_function, timeout = timeout) if not success: logger.error("sync_with_ping failed!") raise AssertionError("sync_with_ping failed!") self.ping_counter += 1 - return success - # Spin until verack message is received from the node. - # Tests may want to use this as a signal that the test can begin. - # This can be called from the testing thread, so it needs to acquire the - # global lock. - def wait_for_verack(self): - while True: - with mininode_lock: - if self.verack_received: - return - time.sleep(0.05) - # The actual NodeConn class # This class provides an interface for a p2p connection to a specified node class NodeConn(asyncore.dispatcher): |