From f2ae6f32a6e3e90d77564758383b9afbbac890b7 Mon Sep 17 00:00:00 2001 From: John Newbery Date: Thu, 23 Nov 2017 09:36:35 -0500 Subject: [tests] Remove mininode periodic (half-hour) ping messages --- test/functional/test_framework/mininode.py | 5 ----- 1 file changed, 5 deletions(-) (limited to 'test/functional/test_framework') diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py index 24c96b5681..c8bd21e51f 100755 --- a/test/functional/test_framework/mininode.py +++ b/test/functional/test_framework/mininode.py @@ -20,7 +20,6 @@ import logging import socket import struct import sys -import time from threading import RLock, Thread from test_framework.messages import * @@ -208,7 +207,6 @@ class NodeConn(asyncore.dispatcher): self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self.sendbuf = b"" self.recvbuf = b"" - self.last_sent = 0 self.state = "connecting" self.network = net self.cb = callback @@ -300,8 +298,6 @@ class NodeConn(asyncore.dispatcher): raise def got_message(self, message): - if self.last_sent + 30 * 60 < time.time(): - self.send_message(MESSAGEMAP[b'ping']()) self._log_message("receive", message) self.cb.deliver(self, message) @@ -353,7 +349,6 @@ class NodeConn(asyncore.dispatcher): self.sendbuf = tmsg else: self.sendbuf += tmsg - self.last_sent = time.time() # Class utility methods -- cgit v1.2.3 From 4d50598569fec0a4be4adef978a593aa71e87d02 Mon Sep 17 00:00:00 2001 From: John Newbery Date: Thu, 23 Nov 2017 09:47:11 -0500 Subject: [tests] Tidy up mininode Add docstrings and renames some methods. Also removes the redundant NodeConn.readable() method override. --- test/functional/test_framework/mininode.py | 35 ++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 11 deletions(-) (limited to 'test/functional/test_framework') diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py index c8bd21e51f..838c885735 100755 --- a/test/functional/test_framework/mininode.py +++ b/test/functional/test_framework/mininode.py @@ -76,7 +76,7 @@ class NodeConnCB(): # Message receiving methods - def deliver(self, conn, message): + def on_message(self, conn, message): """Receive message and dispatch message to appropriate callback. We keep a count of how many of each message type has been received @@ -233,12 +233,14 @@ class NodeConn(asyncore.dispatcher): # Connection and disconnection methods def handle_connect(self): + """asyncore callback when a connection is opened.""" if self.state != "connected": logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport)) self.state = "connected" self.cb.on_open(self) def handle_close(self): + """asyncore callback when a connection is closed.""" logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport)) self.state = "closed" self.recvbuf = b"" @@ -250,7 +252,7 @@ class NodeConn(asyncore.dispatcher): self.cb.on_close(self) def disconnect_node(self): - """ Disconnect the p2p connection. + """Disconnect the p2p connection. Called by the test logic thread. Causes the p2p connection to be disconnected on the next iteration of the asyncore loop.""" @@ -258,16 +260,19 @@ class NodeConn(asyncore.dispatcher): # Socket read methods - def readable(self): - return True - def handle_read(self): + """asyncore callback when data is read from the socket.""" t = self.recv(8192) if len(t) > 0: self.recvbuf += t - self.got_data() + self._on_data() + + def _on_data(self): + """Try to read P2P messages from the recv buffer. - def got_data(self): + This method reads data from the buffer in a loop. It deserializes, + parses and verifies the P2P header, then passes the P2P payload to + the on_message callback for processing.""" try: while True: if len(self.recvbuf) < 4: @@ -292,24 +297,27 @@ class NodeConn(asyncore.dispatcher): f = BytesIO(msg) t = MESSAGEMAP[command]() t.deserialize(f) - self.got_message(t) + self._log_message("receive", t) + self.on_message(t) except Exception as e: logger.exception('Error reading message:', repr(e)) raise - def got_message(self, message): - self._log_message("receive", message) - self.cb.deliver(self, message) + def on_message(self, message): + """Callback for processing a P2P payload. Calls into NodeConnCB.""" + self.cb.on_message(self, message) # Socket write methods def writable(self): + """asyncore method to determine whether the handle_write() callback should be called on the next loop.""" with mininode_lock: pre_connection = self.state == "connecting" length = len(self.sendbuf) return (length > 0 or pre_connection) def handle_write(self): + """asyncore callback when data should be written to the socket.""" with mininode_lock: # asyncore does not expose socket connection, only the first read/write # event, thus we must check connection manually here to know when we @@ -327,6 +335,10 @@ class NodeConn(asyncore.dispatcher): self.sendbuf = self.sendbuf[sent:] def send_message(self, message, pushbuf=False): + """Send a P2P message over the socket. + + This method takes a P2P payload, builds the P2P header and adds + the message to the send buffer to be sent over the socket.""" if self.state != "connected" and not pushbuf: raise IOError('Not connected, no pushbuf') self._log_message("send", message) @@ -353,6 +365,7 @@ class NodeConn(asyncore.dispatcher): # Class utility methods def _log_message(self, direction, msg): + """Logs a message being sent or received over the connection.""" if direction == "send": log_message = "Send message to " elif direction == "receive": -- cgit v1.2.3 From e30d404385f46811eeeea05c55ef786bc4adcb77 Mon Sep 17 00:00:00 2001 From: John Newbery Date: Thu, 23 Nov 2017 10:17:50 -0500 Subject: [tests] Move only: move NodeConnCB below NodeConn This is required since NodeConnCB will inherit from NodeConn after the next commit. --- test/functional/test_framework/mininode.py | 277 +++++++++++++++-------------- 1 file changed, 139 insertions(+), 138 deletions(-) (limited to 'test/functional/test_framework') diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py index 838c885735..2b888949f4 100755 --- a/test/functional/test_framework/mininode.py +++ b/test/functional/test_framework/mininode.py @@ -56,144 +56,6 @@ MAGIC_BYTES = { "regtest": b"\xfa\xbf\xb5\xda", # regtest } -class NodeConnCB(): - """Callback and helper functions for P2P connection to a bitcoind node. - - Individual testcases should subclass this and override the on_* methods - if they want to alter message handling behaviour.""" - def __init__(self): - # Track whether we have a P2P connection open to the node - self.connected = False - self.connection = None - - # Track number of messages of each type received and the most recent - # message of each type - self.message_count = defaultdict(int) - self.last_message = {} - - # A count of the number of ping messages we've sent to the node - self.ping_counter = 1 - - # Message receiving methods - - def on_message(self, conn, message): - """Receive message and dispatch message to appropriate callback. - - We keep a count of how many of each message type has been received - and the most recent message of each type.""" - with mininode_lock: - try: - command = message.command.decode('ascii') - self.message_count[command] += 1 - self.last_message[command] = message - getattr(self, 'on_' + command)(conn, message) - except: - print("ERROR delivering %s (%s)" % (repr(message), - sys.exc_info()[0])) - raise - - # Callback methods. Can be overridden by subclasses in individual test - # cases to provide custom message handling behaviour. - - def on_open(self, conn): - self.connected = True - - def on_close(self, conn): - self.connected = False - self.connection = None - - def on_addr(self, conn, message): pass - def on_block(self, conn, message): pass - def on_blocktxn(self, conn, message): pass - def on_cmpctblock(self, conn, message): pass - def on_feefilter(self, conn, message): pass - def on_getaddr(self, conn, message): pass - def on_getblocks(self, conn, message): pass - def on_getblocktxn(self, conn, message): pass - def on_getdata(self, conn, message): pass - def on_getheaders(self, conn, message): pass - def on_headers(self, conn, message): pass - def on_mempool(self, conn): pass - def on_pong(self, conn, message): pass - def on_reject(self, conn, message): pass - def on_sendcmpct(self, conn, message): pass - def on_sendheaders(self, conn, message): pass - def on_tx(self, conn, message): pass - - def on_inv(self, conn, message): - want = msg_getdata() - for i in message.inv: - if i.type != 0: - want.inv.append(i) - if len(want.inv): - conn.send_message(want) - - def on_ping(self, conn, message): - conn.send_message(msg_pong(message.nonce)) - - def on_verack(self, conn, message): - self.verack_received = True - - def on_version(self, conn, message): - assert message.nVersion >= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format(message.nVersion, MIN_VERSION_SUPPORTED) - conn.send_message(msg_verack()) - conn.nServices = message.nServices - - # Connection helper methods - - def add_connection(self, conn): - self.connection = conn - - def wait_for_disconnect(self, timeout=60): - test_function = lambda: not self.connected - wait_until(test_function, timeout=timeout, lock=mininode_lock) - - # Message receiving helper methods - - def wait_for_block(self, blockhash, timeout=60): - test_function = lambda: self.last_message.get("block") and self.last_message["block"].block.rehash() == blockhash - wait_until(test_function, timeout=timeout, lock=mininode_lock) - - def wait_for_getdata(self, timeout=60): - test_function = lambda: self.last_message.get("getdata") - wait_until(test_function, timeout=timeout, lock=mininode_lock) - - def wait_for_getheaders(self, timeout=60): - test_function = lambda: self.last_message.get("getheaders") - wait_until(test_function, timeout=timeout, lock=mininode_lock) - - def wait_for_inv(self, expected_inv, timeout=60): - """Waits for an INV message and checks that the first inv object in the message was as expected.""" - if len(expected_inv) > 1: - raise NotImplementedError("wait_for_inv() will only verify the first inv object") - test_function = lambda: self.last_message.get("inv") and \ - self.last_message["inv"].inv[0].type == expected_inv[0].type and \ - self.last_message["inv"].inv[0].hash == expected_inv[0].hash - wait_until(test_function, timeout=timeout, lock=mininode_lock) - - def wait_for_verack(self, timeout=60): - test_function = lambda: self.message_count["verack"] - wait_until(test_function, timeout=timeout, lock=mininode_lock) - - # Message sending helper functions - - def send_message(self, message): - if self.connection: - self.connection.send_message(message) - else: - logger.error("Cannot send message. No connection to node!") - - def send_and_ping(self, message): - self.send_message(message) - self.sync_with_ping() - - # Sync up with the node - def sync_with_ping(self, timeout=60): - self.send_message(msg_ping(nonce=self.ping_counter)) - test_function = lambda: self.last_message.get("pong") and self.last_message["pong"].nonce == self.ping_counter - wait_until(test_function, timeout=timeout, lock=mininode_lock) - self.ping_counter += 1 - class NodeConn(asyncore.dispatcher): """The actual NodeConn class @@ -376,6 +238,145 @@ class NodeConn(asyncore.dispatcher): logger.debug(log_message) +class NodeConnCB(): + """Callback and helper functions for P2P connection to a bitcoind node. + + Individual testcases should subclass this and override the on_* methods + if they want to alter message handling behaviour.""" + def __init__(self): + # Track whether we have a P2P connection open to the node + self.connected = False + self.connection = None + + # Track number of messages of each type received and the most recent + # message of each type + self.message_count = defaultdict(int) + self.last_message = {} + + # A count of the number of ping messages we've sent to the node + self.ping_counter = 1 + + # Message receiving methods + + def on_message(self, conn, message): + """Receive message and dispatch message to appropriate callback. + + We keep a count of how many of each message type has been received + and the most recent message of each type.""" + with mininode_lock: + try: + command = message.command.decode('ascii') + self.message_count[command] += 1 + self.last_message[command] = message + getattr(self, 'on_' + command)(conn, message) + except: + print("ERROR delivering %s (%s)" % (repr(message), + sys.exc_info()[0])) + raise + + # Callback methods. Can be overridden by subclasses in individual test + # cases to provide custom message handling behaviour. + + def on_open(self, conn): + self.connected = True + + def on_close(self, conn): + self.connected = False + self.connection = None + + def on_addr(self, conn, message): pass + def on_block(self, conn, message): pass + def on_blocktxn(self, conn, message): pass + def on_cmpctblock(self, conn, message): pass + def on_feefilter(self, conn, message): pass + def on_getaddr(self, conn, message): pass + def on_getblocks(self, conn, message): pass + def on_getblocktxn(self, conn, message): pass + def on_getdata(self, conn, message): pass + def on_getheaders(self, conn, message): pass + def on_headers(self, conn, message): pass + def on_mempool(self, conn): pass + def on_pong(self, conn, message): pass + def on_reject(self, conn, message): pass + def on_sendcmpct(self, conn, message): pass + def on_sendheaders(self, conn, message): pass + def on_tx(self, conn, message): pass + + def on_inv(self, conn, message): + want = msg_getdata() + for i in message.inv: + if i.type != 0: + want.inv.append(i) + if len(want.inv): + conn.send_message(want) + + def on_ping(self, conn, message): + conn.send_message(msg_pong(message.nonce)) + + def on_verack(self, conn, message): + self.verack_received = True + + def on_version(self, conn, message): + assert message.nVersion >= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format(message.nVersion, MIN_VERSION_SUPPORTED) + conn.send_message(msg_verack()) + conn.nServices = message.nServices + + # Connection helper methods + + def add_connection(self, conn): + self.connection = conn + + def wait_for_disconnect(self, timeout=60): + test_function = lambda: not self.connected + wait_until(test_function, timeout=timeout, lock=mininode_lock) + + # Message receiving helper methods + + def wait_for_block(self, blockhash, timeout=60): + test_function = lambda: self.last_message.get("block") and self.last_message["block"].block.rehash() == blockhash + wait_until(test_function, timeout=timeout, lock=mininode_lock) + + def wait_for_getdata(self, timeout=60): + test_function = lambda: self.last_message.get("getdata") + wait_until(test_function, timeout=timeout, lock=mininode_lock) + + def wait_for_getheaders(self, timeout=60): + test_function = lambda: self.last_message.get("getheaders") + wait_until(test_function, timeout=timeout, lock=mininode_lock) + + def wait_for_inv(self, expected_inv, timeout=60): + """Waits for an INV message and checks that the first inv object in the message was as expected.""" + if len(expected_inv) > 1: + raise NotImplementedError("wait_for_inv() will only verify the first inv object") + test_function = lambda: self.last_message.get("inv") and \ + self.last_message["inv"].inv[0].type == expected_inv[0].type and \ + self.last_message["inv"].inv[0].hash == expected_inv[0].hash + wait_until(test_function, timeout=timeout, lock=mininode_lock) + + def wait_for_verack(self, timeout=60): + test_function = lambda: self.message_count["verack"] + wait_until(test_function, timeout=timeout, lock=mininode_lock) + + # Message sending helper functions + + def send_message(self, message): + if self.connection: + self.connection.send_message(message) + else: + logger.error("Cannot send message. No connection to node!") + + def send_and_ping(self, message): + self.send_message(message) + self.sync_with_ping() + + # Sync up with the node + def sync_with_ping(self, timeout=60): + self.send_message(msg_ping(nonce=self.ping_counter)) + test_function = lambda: self.last_message.get("pong") and self.last_message["pong"].nonce == self.ping_counter + wait_until(test_function, timeout=timeout, lock=mininode_lock) + self.ping_counter += 1 + + # Keep our own socket map for asyncore, so that we can track disconnects # ourselves (to workaround an issue with closing an asyncore socket when # using select) -- cgit v1.2.3 From dad596fc37c8733ab806a0aa4224ac437d37aee5 Mon Sep 17 00:00:00 2001 From: John Newbery Date: Fri, 17 Nov 2017 15:01:24 -0500 Subject: [tests] Make NodeConnCB a subclass of NodeConn This makes NodeConnCB a subclass of NodeConn, and removes the need for the client code to know anything about the implementation details of NodeConnCB. NodeConn can now be swapped out for any other implementation of a low-level connection without changing client code. --- test/functional/test_framework/comptool.py | 125 ++++++++++++------------- test/functional/test_framework/mininode.py | 138 +++++++++++++++------------- test/functional/test_framework/test_node.py | 13 +-- 3 files changed, 138 insertions(+), 138 deletions(-) (limited to 'test/functional/test_framework') diff --git a/test/functional/test_framework/comptool.py b/test/functional/test_framework/comptool.py index 723826bae4..2f64fba753 100755 --- a/test/functional/test_framework/comptool.py +++ b/test/functional/test_framework/comptool.py @@ -43,7 +43,6 @@ class TestNode(NodeConnCB): def __init__(self, block_store, tx_store): super().__init__() - self.conn = None self.bestblockhash = None self.block_store = block_store self.block_request_map = {} @@ -58,26 +57,23 @@ class TestNode(NodeConnCB): self.lastInv = [] self.closed = False - def on_close(self, conn): + def on_close(self): self.closed = True - def add_connection(self, conn): - self.conn = conn - - def on_headers(self, conn, message): + def on_headers(self, message): if len(message.headers) > 0: best_header = message.headers[-1] best_header.calc_sha256() self.bestblockhash = best_header.sha256 - def on_getheaders(self, conn, message): + def on_getheaders(self, message): response = self.block_store.headers_for(message.locator, message.hashstop) if response is not None: - conn.send_message(response) + self.send_message(response) - def on_getdata(self, conn, message): - [conn.send_message(r) for r in self.block_store.get_blocks(message.inv)] - [conn.send_message(r) for r in self.tx_store.get_transactions(message.inv)] + def on_getdata(self, message): + [self.send_message(r) for r in self.block_store.get_blocks(message.inv)] + [self.send_message(r) for r in self.tx_store.get_transactions(message.inv)] for i in message.inv: if i.type == 1 or i.type == 1 | (1 << 30): # MSG_TX or MSG_WITNESS_TX @@ -85,16 +81,16 @@ class TestNode(NodeConnCB): elif i.type == 2 or i.type == 2 | (1 << 30): # MSG_BLOCK or MSG_WITNESS_BLOCK self.block_request_map[i.hash] = True - def on_inv(self, conn, message): + def on_inv(self, message): self.lastInv = [x.hash for x in message.inv] - def on_pong(self, conn, message): + def on_pong(self, message): try: del self.pingMap[message.nonce] except KeyError: raise AssertionError("Got pong for unknown ping [%s]" % repr(message)) - def on_reject(self, conn, message): + def on_reject(self, message): if message.message == b'tx': self.tx_reject_map[message.data] = RejectResult(message.code, message.reason) if message.message == b'block': @@ -102,30 +98,30 @@ class TestNode(NodeConnCB): def send_inv(self, obj): mtype = 2 if isinstance(obj, CBlock) else 1 - self.conn.send_message(msg_inv([CInv(mtype, obj.sha256)])) + self.send_message(msg_inv([CInv(mtype, obj.sha256)])) def send_getheaders(self): # We ask for headers from their last tip. m = msg_getheaders() m.locator = self.block_store.get_locator(self.bestblockhash) - self.conn.send_message(m) + self.send_message(m) def send_header(self, header): m = msg_headers() m.headers.append(header) - self.conn.send_message(m) + self.send_message(m) # This assumes BIP31 def send_ping(self, nonce): self.pingMap[nonce] = True - self.conn.send_message(msg_ping(nonce)) + self.send_message(msg_ping(nonce)) def received_ping_response(self, nonce): return nonce not in self.pingMap def send_mempool(self): self.lastInv = [] - self.conn.send_message(msg_mempool()) + self.send_message(msg_mempool()) # TestInstance: # @@ -166,8 +162,7 @@ class TestManager(): def __init__(self, testgen, datadir): self.test_generator = testgen - self.connections = [] - self.test_nodes = [] + self.p2p_connections= [] self.block_store = BlockStore(datadir) self.tx_store = TxStore(datadir) self.ping_counter = 1 @@ -175,28 +170,24 @@ class TestManager(): def add_all_connections(self, nodes): for i in range(len(nodes)): # Create a p2p connection to each node - test_node = TestNode(self.block_store, self.tx_store) - self.test_nodes.append(test_node) - self.connections.append(NodeConn('127.0.0.1', p2p_port(i), test_node)) - # Make sure the TestNode (callback class) has a reference to its - # associated NodeConn - test_node.add_connection(self.connections[-1]) + node = TestNode(self.block_store, self.tx_store) + node.peer_connect('127.0.0.1', p2p_port(i)) + self.p2p_connections.append(node) def clear_all_connections(self): - self.connections = [] - self.test_nodes = [] + self.p2p_connections = [] def wait_for_disconnections(self): def disconnected(): - return all(node.closed for node in self.test_nodes) + return all(node.closed for node in self.p2p_connections) wait_until(disconnected, timeout=10, lock=mininode_lock) def wait_for_verack(self): - return all(node.wait_for_verack() for node in self.test_nodes) + return all(node.wait_for_verack() for node in self.p2p_connections) def wait_for_pings(self, counter): def received_pongs(): - return all(node.received_ping_response(counter) for node in self.test_nodes) + return all(node.received_ping_response(counter) for node in self.p2p_connections) wait_until(received_pongs, lock=mininode_lock) # sync_blocks: Wait for all connections to request the blockhash given @@ -206,17 +197,17 @@ class TestManager(): def blocks_requested(): return all( blockhash in node.block_request_map and node.block_request_map[blockhash] - for node in self.test_nodes + for node in self.p2p_connections ) # --> error if not requested wait_until(blocks_requested, attempts=20*num_blocks, lock=mininode_lock) # Send getheaders message - [ c.cb.send_getheaders() for c in self.connections ] + [ c.send_getheaders() for c in self.p2p_connections ] # Send ping and wait for response -- synchronization hack - [ c.cb.send_ping(self.ping_counter) for c in self.connections ] + [ c.send_ping(self.ping_counter) for c in self.p2p_connections ] self.wait_for_pings(self.ping_counter) self.ping_counter += 1 @@ -226,42 +217,42 @@ class TestManager(): def transaction_requested(): return all( txhash in node.tx_request_map and node.tx_request_map[txhash] - for node in self.test_nodes + for node in self.p2p_connections ) # --> error if not requested wait_until(transaction_requested, attempts=20*num_events, lock=mininode_lock) # Get the mempool - [ c.cb.send_mempool() for c in self.connections ] + [ c.send_mempool() for c in self.p2p_connections ] # Send ping and wait for response -- synchronization hack - [ c.cb.send_ping(self.ping_counter) for c in self.connections ] + [ c.send_ping(self.ping_counter) for c in self.p2p_connections ] self.wait_for_pings(self.ping_counter) self.ping_counter += 1 # Sort inv responses from each node with mininode_lock: - [ c.cb.lastInv.sort() for c in self.connections ] + [ c.lastInv.sort() for c in self.p2p_connections ] # Verify that the tip of each connection all agree with each other, and # with the expected outcome (if given) def check_results(self, blockhash, outcome): with mininode_lock: - for c in self.connections: + for c in self.p2p_connections: if outcome is None: - if c.cb.bestblockhash != self.connections[0].cb.bestblockhash: + if c.bestblockhash != self.p2p_connections[0].bestblockhash: return False elif isinstance(outcome, RejectResult): # Check that block was rejected w/ code - if c.cb.bestblockhash == blockhash: + if c.bestblockhash == blockhash: return False - if blockhash not in c.cb.block_reject_map: + if blockhash not in c.block_reject_map: logger.error('Block not in reject map: %064x' % (blockhash)) return False - if not outcome.match(c.cb.block_reject_map[blockhash]): - logger.error('Block rejected with %s instead of expected %s: %064x' % (c.cb.block_reject_map[blockhash], outcome, blockhash)) + if not outcome.match(c.block_reject_map[blockhash]): + logger.error('Block rejected with %s instead of expected %s: %064x' % (c.block_reject_map[blockhash], outcome, blockhash)) return False - elif ((c.cb.bestblockhash == blockhash) != outcome): + elif ((c.bestblockhash == blockhash) != outcome): return False return True @@ -273,21 +264,21 @@ class TestManager(): # a particular tx's existence in the mempool is the same across all nodes. def check_mempool(self, txhash, outcome): with mininode_lock: - for c in self.connections: + for c in self.p2p_connections: if outcome is None: # Make sure the mempools agree with each other - if c.cb.lastInv != self.connections[0].cb.lastInv: + if c.lastInv != self.p2p_connections[0].lastInv: return False elif isinstance(outcome, RejectResult): # Check that tx was rejected w/ code - if txhash in c.cb.lastInv: + if txhash in c.lastInv: return False - if txhash not in c.cb.tx_reject_map: + if txhash not in c.tx_reject_map: logger.error('Tx not in reject map: %064x' % (txhash)) return False - if not outcome.match(c.cb.tx_reject_map[txhash]): - logger.error('Tx rejected with %s instead of expected %s: %064x' % (c.cb.tx_reject_map[txhash], outcome, txhash)) + if not outcome.match(c.tx_reject_map[txhash]): + logger.error('Tx rejected with %s instead of expected %s: %064x' % (c.tx_reject_map[txhash], outcome, txhash)) return False - elif ((txhash in c.cb.lastInv) != outcome): + elif ((txhash in c.lastInv) != outcome): return False return True @@ -332,25 +323,25 @@ class TestManager(): first_block_with_hash = False with mininode_lock: self.block_store.add_block(block) - for c in self.connections: - if first_block_with_hash and block.sha256 in c.cb.block_request_map and c.cb.block_request_map[block.sha256] == True: + for c in self.p2p_connections: + if first_block_with_hash and block.sha256 in c.block_request_map and c.block_request_map[block.sha256] == True: # There was a previous request for this block hash # Most likely, we delivered a header for this block # but never had the block to respond to the getdata c.send_message(msg_block(block)) else: - c.cb.block_request_map[block.sha256] = False + c.block_request_map[block.sha256] = False # Either send inv's to each node and sync, or add # to invqueue for later inv'ing. if (test_instance.sync_every_block): # if we expect success, send inv and sync every block # if we expect failure, just push the block and see what happens. if outcome == True: - [ c.cb.send_inv(block) for c in self.connections ] + [ c.send_inv(block) for c in self.p2p_connections ] self.sync_blocks(block.sha256, 1) else: - [ c.send_message(msg_block(block)) for c in self.connections ] - [ c.cb.send_ping(self.ping_counter) for c in self.connections ] + [ c.send_message(msg_block(block)) for c in self.p2p_connections ] + [ c.send_ping(self.ping_counter) for c in self.p2p_connections ] self.wait_for_pings(self.ping_counter) self.ping_counter += 1 if (not self.check_results(tip, outcome)): @@ -360,7 +351,7 @@ class TestManager(): elif isinstance(b_or_t, CBlockHeader): block_header = b_or_t self.block_store.add_header(block_header) - [ c.cb.send_header(block_header) for c in self.connections ] + [ c.send_header(block_header) for c in self.p2p_connections ] else: # Tx test runner assert(isinstance(b_or_t, CTransaction)) @@ -369,11 +360,11 @@ class TestManager(): # Add to shared tx store and clear map entry with mininode_lock: self.tx_store.add_transaction(tx) - for c in self.connections: - c.cb.tx_request_map[tx.sha256] = False + for c in self.p2p_connections: + c.tx_request_map[tx.sha256] = False # Again, either inv to all nodes or save for later if (test_instance.sync_every_tx): - [ c.cb.send_inv(tx) for c in self.connections ] + [ c.send_inv(tx) for c in self.p2p_connections ] self.sync_transaction(tx.sha256, 1) if (not self.check_mempool(tx.sha256, outcome)): raise AssertionError("Test failed at test %d" % test_number) @@ -381,26 +372,26 @@ class TestManager(): invqueue.append(CInv(1, tx.sha256)) # Ensure we're not overflowing the inv queue if len(invqueue) == MAX_INV_SZ: - [ c.send_message(msg_inv(invqueue)) for c in self.connections ] + [ c.send_message(msg_inv(invqueue)) for c in self.p2p_connections ] invqueue = [] # Do final sync if we weren't syncing on every block or every tx. if (not test_instance.sync_every_block and block is not None): if len(invqueue) > 0: - [ c.send_message(msg_inv(invqueue)) for c in self.connections ] + [ c.send_message(msg_inv(invqueue)) for c in self.p2p_connections ] invqueue = [] self.sync_blocks(block.sha256, len(test_instance.blocks_and_transactions)) if (not self.check_results(tip, block_outcome)): raise AssertionError("Block test failed at test %d" % test_number) if (not test_instance.sync_every_tx and tx is not None): if len(invqueue) > 0: - [ c.send_message(msg_inv(invqueue)) for c in self.connections ] + [ c.send_message(msg_inv(invqueue)) for c in self.p2p_connections ] invqueue = [] self.sync_transaction(tx.sha256, len(test_instance.blocks_and_transactions)) if (not self.check_mempool(tx.sha256, tx_outcome)): raise AssertionError("Mempool test failed at test %d" % test_number) - [ c.disconnect_node() for c in self.connections ] + [ c.disconnect_node() for c in self.p2p_connections ] self.wait_for_disconnections() self.block_store.close() self.tx_store.close() diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py index 2b888949f4..09a382c727 100755 --- a/test/functional/test_framework/mininode.py +++ b/test/functional/test_framework/mininode.py @@ -23,6 +23,7 @@ import sys from threading import RLock, Thread from test_framework.messages import * +from test_framework.util import wait_until logger = logging.getLogger("TestFramework.mininode") @@ -57,12 +58,24 @@ MAGIC_BYTES = { } class NodeConn(asyncore.dispatcher): - """The actual NodeConn class + """A low-level connection object to a node's P2P interface. - This class provides an interface for a p2p connection to a specified node.""" + This class is responsible for: - def __init__(self, dstaddr, dstport, callback, net="regtest", services=NODE_NETWORK|NODE_WITNESS, send_version=True): - asyncore.dispatcher.__init__(self, map=mininode_socket_map) + - opening and closing the TCP connection to the node + - reading bytes from and writing bytes to the socket + - deserializing and serializing the P2P message header + - logging messages as they are sent and received + + This class contains no logic for handing the P2P message payloads. It must be + sub-classed and the on_message() callback overridden. + + TODO: rename this class P2PConnection.""" + + def __init__(self): + super().__init__(map=mininode_socket_map) + + def peer_connect(self, dstaddr, dstport, net="regtest", services=NODE_NETWORK|NODE_WITNESS, send_version=True): self.dstaddr = dstaddr self.dstport = dstport self.create_socket(socket.AF_INET, socket.SOCK_STREAM) @@ -71,9 +84,7 @@ class NodeConn(asyncore.dispatcher): self.recvbuf = b"" self.state = "connecting" self.network = net - self.cb = callback self.disconnect = False - self.nServices = 0 if send_version: # stuff version msg into sendbuf @@ -92,6 +103,11 @@ class NodeConn(asyncore.dispatcher): except: self.handle_close() + def peer_disconnect(self): + # Connection could have already been closed by other end. + if self.state == "connected": + self.disconnect_node() + # Connection and disconnection methods def handle_connect(self): @@ -99,7 +115,7 @@ class NodeConn(asyncore.dispatcher): if self.state != "connected": logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport)) self.state = "connected" - self.cb.on_open(self) + self.on_open() def handle_close(self): """asyncore callback when a connection is closed.""" @@ -111,7 +127,7 @@ class NodeConn(asyncore.dispatcher): self.close() except: pass - self.cb.on_close(self) + self.on_close() def disconnect_node(self): """Disconnect the p2p connection. @@ -166,8 +182,8 @@ class NodeConn(asyncore.dispatcher): raise def on_message(self, message): - """Callback for processing a P2P payload. Calls into NodeConnCB.""" - self.cb.on_message(self, message) + """Callback for processing a P2P payload. Must be overridden by derived class.""" + raise NotImplementedError # Socket write methods @@ -238,15 +254,19 @@ class NodeConn(asyncore.dispatcher): logger.debug(log_message) -class NodeConnCB(): - """Callback and helper functions for P2P connection to a bitcoind node. +class NodeConnCB(NodeConn): + """A high-level P2P interface class for communicating with a Bitcoin node. + + This class provides high-level callbacks for processing P2P message + payloads, as well as convenience methods for interacting with the + node over P2P. Individual testcases should subclass this and override the on_* methods - if they want to alter message handling behaviour.""" + if they want to alter message handling behaviour. + + TODO: rename this class P2PInterface""" def __init__(self): - # Track whether we have a P2P connection open to the node - self.connected = False - self.connection = None + super().__init__() # Track number of messages of each type received and the most recent # message of each type @@ -256,9 +276,12 @@ class NodeConnCB(): # A count of the number of ping messages we've sent to the node self.ping_counter = 1 + # The network services received from the peer + self.nServices = 0 + # Message receiving methods - def on_message(self, conn, message): + def on_message(self, message): """Receive message and dispatch message to appropriate callback. We keep a count of how many of each message type has been received @@ -268,66 +291,61 @@ class NodeConnCB(): command = message.command.decode('ascii') self.message_count[command] += 1 self.last_message[command] = message - getattr(self, 'on_' + command)(conn, message) + getattr(self, 'on_' + command)(message) except: - print("ERROR delivering %s (%s)" % (repr(message), - sys.exc_info()[0])) + print("ERROR delivering %s (%s)" % (repr(message), sys.exc_info()[0])) raise # Callback methods. Can be overridden by subclasses in individual test # cases to provide custom message handling behaviour. - def on_open(self, conn): - self.connected = True - - def on_close(self, conn): - self.connected = False - self.connection = None - - def on_addr(self, conn, message): pass - def on_block(self, conn, message): pass - def on_blocktxn(self, conn, message): pass - def on_cmpctblock(self, conn, message): pass - def on_feefilter(self, conn, message): pass - def on_getaddr(self, conn, message): pass - def on_getblocks(self, conn, message): pass - def on_getblocktxn(self, conn, message): pass - def on_getdata(self, conn, message): pass - def on_getheaders(self, conn, message): pass - def on_headers(self, conn, message): pass - def on_mempool(self, conn): pass - def on_pong(self, conn, message): pass - def on_reject(self, conn, message): pass - def on_sendcmpct(self, conn, message): pass - def on_sendheaders(self, conn, message): pass - def on_tx(self, conn, message): pass - - def on_inv(self, conn, message): + def on_open(self): + pass + + def on_close(self): + pass + + def on_addr(self, message): pass + def on_block(self, message): pass + def on_blocktxn(self, message): pass + def on_cmpctblock(self, message): pass + def on_feefilter(self, message): pass + def on_getaddr(self, message): pass + def on_getblocks(self, message): pass + def on_getblocktxn(self, message): pass + def on_getdata(self, message): pass + def on_getheaders(self, message): pass + def on_headers(self, message): pass + def on_mempool(self, message): pass + def on_pong(self, message): pass + def on_reject(self, message): pass + def on_sendcmpct(self, message): pass + def on_sendheaders(self, message): pass + def on_tx(self, message): pass + + def on_inv(self, message): want = msg_getdata() for i in message.inv: if i.type != 0: want.inv.append(i) if len(want.inv): - conn.send_message(want) + self.send_message(want) - def on_ping(self, conn, message): - conn.send_message(msg_pong(message.nonce)) + def on_ping(self, message): + self.send_message(msg_pong(message.nonce)) - def on_verack(self, conn, message): + def on_verack(self, message): self.verack_received = True - def on_version(self, conn, message): + def on_version(self, message): assert message.nVersion >= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format(message.nVersion, MIN_VERSION_SUPPORTED) - conn.send_message(msg_verack()) - conn.nServices = message.nServices + self.send_message(msg_verack()) + self.nServices = message.nServices # Connection helper methods - def add_connection(self, conn): - self.connection = conn - def wait_for_disconnect(self, timeout=60): - test_function = lambda: not self.connected + test_function = lambda: self.state != "connected" wait_until(test_function, timeout=timeout, lock=mininode_lock) # Message receiving helper methods @@ -359,12 +377,6 @@ class NodeConnCB(): # Message sending helper functions - def send_message(self, message): - if self.connection: - self.connection.send_message(message) - else: - logger.error("Cannot send message. No connection to node!") - def send_and_ping(self, message): self.send_message(message) self.sync_with_ping() diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py index 34b458482a..a9248c764e 100755 --- a/test/functional/test_framework/test_node.py +++ b/test/functional/test_framework/test_node.py @@ -14,7 +14,6 @@ import subprocess import time from .authproxy import JSONRPCException -from .mininode import NodeConn from .util import ( assert_equal, get_rpc_proxy, @@ -158,7 +157,7 @@ class TestNode(): self.encryptwallet(passphrase) self.wait_until_stopped() - def add_p2p_connection(self, p2p_conn, **kwargs): + def add_p2p_connection(self, p2p_conn, *args, **kwargs): """Add a p2p connection to the node. This method adds the p2p connection to the self.p2ps list and also @@ -167,9 +166,9 @@ class TestNode(): kwargs['dstport'] = p2p_port(self.index) if 'dstaddr' not in kwargs: kwargs['dstaddr'] = '127.0.0.1' + + p2p_conn.peer_connect(*args, **kwargs) self.p2ps.append(p2p_conn) - kwargs.update({'callback': p2p_conn}) - p2p_conn.add_connection(NodeConn(**kwargs)) return p2p_conn @@ -185,10 +184,8 @@ class TestNode(): def disconnect_p2ps(self): """Close all p2p connections to the node.""" for p in self.p2ps: - # Connection could have already been closed by other end. - if p.connection is not None: - p.connection.disconnect_node() - self.p2ps = [] + p.peer_disconnect() + del self.p2ps[:] class TestNodeCLI(): -- cgit v1.2.3 From e9dfa9bccc5cbb6096c60498651b451297f0a931 Mon Sep 17 00:00:00 2001 From: John Newbery Date: Tue, 17 Oct 2017 15:56:12 -0400 Subject: [tests] Move version message sending from NodeConn to NodeConnCB This commit moves the logic that sends a version message on connection from NodeConn to NodeConnCB. NodeConn should not be aware of the semantics or meaning of the P2P payloads. --- test/functional/test_framework/mininode.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) (limited to 'test/functional/test_framework') diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py index 09a382c727..c580d99c79 100755 --- a/test/functional/test_framework/mininode.py +++ b/test/functional/test_framework/mininode.py @@ -75,7 +75,7 @@ class NodeConn(asyncore.dispatcher): def __init__(self): super().__init__(map=mininode_socket_map) - def peer_connect(self, dstaddr, dstport, net="regtest", services=NODE_NETWORK|NODE_WITNESS, send_version=True): + def peer_connect(self, dstaddr, dstport, net="regtest"): self.dstaddr = dstaddr self.dstport = dstport self.create_socket(socket.AF_INET, socket.SOCK_STREAM) @@ -86,16 +86,6 @@ class NodeConn(asyncore.dispatcher): self.network = net self.disconnect = False - if send_version: - # stuff version msg into sendbuf - vt = msg_version() - vt.nServices = services - vt.addrTo.ip = self.dstaddr - vt.addrTo.port = self.dstport - vt.addrFrom.ip = "0.0.0.0" - vt.addrFrom.port = 0 - self.send_message(vt, True) - logger.info('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport)) try: @@ -279,6 +269,19 @@ class NodeConnCB(NodeConn): # The network services received from the peer self.nServices = 0 + def peer_connect(self, *args, services=NODE_NETWORK|NODE_WITNESS, send_version=True, **kwargs): + super().peer_connect(*args, **kwargs) + + if send_version: + # Send a version msg + vt = msg_version() + vt.nServices = services + vt.addrTo.ip = self.dstaddr + vt.addrTo.port = self.dstport + vt.addrFrom.ip = "0.0.0.0" + vt.addrFrom.port = 0 + self.send_message(vt, True) + # Message receiving methods def on_message(self, message): -- cgit v1.2.3