diff options
-rwxr-xr-x | test/functional/test_framework/mininode.py | 170 |
1 files changed, 92 insertions, 78 deletions
diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py index 2a1ac2d705..91badc72ce 100755 --- a/test/functional/test_framework/mininode.py +++ b/test/functional/test_framework/mininode.py @@ -27,18 +27,35 @@ from test_framework.messages import * logger = logging.getLogger("TestFramework.mininode") -# Keep our own socket map for asyncore, so that we can track disconnects -# ourselves (to workaround an issue with closing an asyncore socket when -# using select) -mininode_socket_map = dict() - -# One lock for synchronizing all data access between the networking thread (see -# NetworkThread below) and the thread running the test logic. For simplicity, -# NodeConn acquires this lock whenever delivering a message to a NodeConnCB, -# and whenever adding anything to the send buffer (in send_message()). This -# lock should be acquired in the thread running the test logic to synchronize -# access to any data shared with the NodeConnCB or NodeConn. -mininode_lock = RLock() +MESSAGEMAP = { + b"addr": msg_addr, + b"block": msg_block, + b"blocktxn": msg_blocktxn, + b"cmpctblock": msg_cmpctblock, + b"feefilter": msg_feefilter, + b"getaddr": msg_getaddr, + b"getblocks": msg_getblocks, + b"getblocktxn": msg_getblocktxn, + b"getdata": msg_getdata, + b"getheaders": msg_getheaders, + b"headers": msg_headers, + b"inv": msg_inv, + b"mempool": msg_mempool, + b"ping": msg_ping, + b"pong": msg_pong, + b"reject": msg_reject, + b"sendcmpct": msg_sendcmpct, + b"sendheaders": msg_sendheaders, + b"tx": msg_tx, + b"verack": msg_verack, + b"version": msg_version, +} + +MAGIC_BYTES = { + "mainnet": b"\xf9\xbe\xb4\xd9", # mainnet + "testnet3": b"\x0b\x11\x09\x07", # testnet3 + "regtest": b"\xfa\xbf\xb5\xda", # regtest +} class NodeConnCB(): """Callback and helper functions for P2P connection to a bitcoind node. @@ -183,34 +200,6 @@ class NodeConn(asyncore.dispatcher): """The actual NodeConn class This class provides an interface for a p2p connection to a specified node.""" - messagemap = { - b"version": msg_version, - b"verack": msg_verack, - b"addr": msg_addr, - b"inv": msg_inv, - b"getdata": msg_getdata, - b"getblocks": msg_getblocks, - b"tx": msg_tx, - b"block": msg_block, - b"getaddr": msg_getaddr, - b"ping": msg_ping, - b"pong": msg_pong, - b"headers": msg_headers, - b"getheaders": msg_getheaders, - b"reject": msg_reject, - b"mempool": msg_mempool, - b"feefilter": msg_feefilter, - b"sendheaders": msg_sendheaders, - b"sendcmpct": msg_sendcmpct, - b"cmpctblock": msg_cmpctblock, - b"getblocktxn": msg_getblocktxn, - b"blocktxn": msg_blocktxn - } - MAGIC_BYTES = { - "mainnet": b"\xf9\xbe\xb4\xd9", # mainnet - "testnet3": b"\x0b\x11\x09\x07", # testnet3 - "regtest": b"\xfa\xbf\xb5\xda", # regtest - } def __init__(self, dstaddr, dstport, rpc, callback, net="regtest", services=NODE_NETWORK|NODE_WITNESS, send_version=True): asyncore.dispatcher.__init__(self, map=mininode_socket_map) @@ -247,6 +236,8 @@ class NodeConn(asyncore.dispatcher): self.handle_close() self.rpc = rpc + # Connection and disconnection methods + def handle_connect(self): if self.state != "connected": logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport)) @@ -264,44 +255,30 @@ class NodeConn(asyncore.dispatcher): pass self.cb.on_close(self) + def disconnect_node(self): + """ Disconnect the p2p connection. + + Called by the test logic thread. Causes the p2p connection + to be disconnected on the next iteration of the asyncore loop.""" + self.disconnect = True + + # Socket read methods + + def readable(self): + return True + def handle_read(self): t = self.recv(8192) if len(t) > 0: self.recvbuf += t self.got_data() - def readable(self): - return True - - def writable(self): - with mininode_lock: - pre_connection = self.state == "connecting" - length = len(self.sendbuf) - return (length > 0 or pre_connection) - - def handle_write(self): - with mininode_lock: - # asyncore does not expose socket connection, only the first read/write - # event, thus we must check connection manually here to know when we - # actually connect - if self.state == "connecting": - self.handle_connect() - if not self.writable(): - return - - try: - sent = self.send(self.sendbuf) - except: - self.handle_close() - return - self.sendbuf = self.sendbuf[sent:] - def got_data(self): try: while True: if len(self.recvbuf) < 4: return - if self.recvbuf[:4] != self.MAGIC_BYTES[self.network]: + if self.recvbuf[:4] != MAGIC_BYTES[self.network]: raise ValueError("got garbage %s" % repr(self.recvbuf)) if len(self.recvbuf) < 4 + 12 + 4 + 4: return @@ -316,23 +293,54 @@ class NodeConn(asyncore.dispatcher): if checksum != h[:4]: raise ValueError("got bad checksum " + repr(self.recvbuf)) self.recvbuf = self.recvbuf[4+12+4+4+msglen:] - if command not in self.messagemap: + if command not in MESSAGEMAP: raise ValueError("Received unknown command from %s:%d: '%s' %s" % (self.dstaddr, self.dstport, command, repr(msg))) f = BytesIO(msg) - t = self.messagemap[command]() + t = MESSAGEMAP[command]() t.deserialize(f) self.got_message(t) except Exception as e: logger.exception('Error reading message:', repr(e)) raise + def got_message(self, message): + if self.last_sent + 30 * 60 < time.time(): + self.send_message(MESSAGEMAP[b'ping']()) + self._log_message("receive", message) + self.cb.deliver(self, message) + + # Socket write methods + + def writable(self): + with mininode_lock: + pre_connection = self.state == "connecting" + length = len(self.sendbuf) + return (length > 0 or pre_connection) + + def handle_write(self): + with mininode_lock: + # asyncore does not expose socket connection, only the first read/write + # event, thus we must check connection manually here to know when we + # actually connect + if self.state == "connecting": + self.handle_connect() + if not self.writable(): + return + + try: + sent = self.send(self.sendbuf) + except: + self.handle_close() + return + self.sendbuf = self.sendbuf[sent:] + def send_message(self, message, pushbuf=False): if self.state != "connected" and not pushbuf: raise IOError('Not connected, no pushbuf') self._log_message("send", message) command = message.command data = message.serialize() - tmsg = self.MAGIC_BYTES[self.network] + tmsg = MAGIC_BYTES[self.network] tmsg += command tmsg += b"\x00" * (12 - len(command)) tmsg += struct.pack("<I", len(data)) @@ -351,11 +359,7 @@ class NodeConn(asyncore.dispatcher): self.sendbuf += tmsg self.last_sent = time.time() - def got_message(self, message): - if self.last_sent + 30 * 60 < time.time(): - self.send_message(self.messagemap[b'ping']()) - self._log_message("receive", message) - self.cb.deliver(self, message) + # Class utility methods def _log_message(self, direction, msg): if direction == "send": @@ -367,9 +371,19 @@ class NodeConn(asyncore.dispatcher): log_message += "... (msg truncated)" logger.debug(log_message) - def disconnect_node(self): - self.disconnect = True +# Keep our own socket map for asyncore, so that we can track disconnects +# ourselves (to workaround an issue with closing an asyncore socket when +# using select) +mininode_socket_map = dict() + +# One lock for synchronizing all data access between the networking thread (see +# NetworkThread below) and the thread running the test logic. For simplicity, +# NodeConn acquires this lock whenever delivering a message to a NodeConnCB, +# and whenever adding anything to the send buffer (in send_message()). This +# lock should be acquired in the thread running the test logic to synchronize +# access to any data shared with the NodeConnCB or NodeConn. +mininode_lock = RLock() class NetworkThread(Thread): def run(self): @@ -381,6 +395,6 @@ class NetworkThread(Thread): for fd, obj in mininode_socket_map.items(): if obj.disconnect: disconnected.append(obj) - [ obj.handle_close() for obj in disconnected ] + [obj.handle_close() for obj in disconnected] asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) logger.debug("Network thread closing") |