aboutsummaryrefslogtreecommitdiff
path: root/test/functional/test_framework/mininode.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/test_framework/mininode.py')
-rwxr-xr-xtest/functional/test_framework/mininode.py200
1 files changed, 72 insertions, 128 deletions
diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py
index 7c2125a177..29bf33fa5b 100755
--- a/test/functional/test_framework/mininode.py
+++ b/test/functional/test_framework/mininode.py
@@ -13,11 +13,10 @@ P2PConnection: A low-level connection object to a node's P2P interface
P2PInterface: A high-level interface object for communicating to a node over P2P
P2PDataStore: A p2p interface class that keeps a store of transactions and blocks
and can respond correctly to getdata and getheaders messages"""
-import asyncore
+import asyncio
from collections import defaultdict
from io import BytesIO
import logging
-import socket
import struct
import sys
import threading
@@ -57,7 +56,8 @@ MAGIC_BYTES = {
"regtest": b"\xfa\xbf\xb5\xda", # regtest
}
-class P2PConnection(asyncore.dispatcher):
+
+class P2PConnection(asyncio.Protocol):
"""A low-level connection object to a node's P2P interface.
This class is responsible for:
@@ -71,68 +71,59 @@ class P2PConnection(asyncore.dispatcher):
sub-classed and the on_message() callback overridden."""
def __init__(self):
- # All P2PConnections must be created before starting the NetworkThread.
- # assert that the network thread is not running.
- assert not network_thread_running()
+ # The underlying transport of the connection.
+ # Should only call methods on this from the NetworkThread, c.f. call_soon_threadsafe
+ self._transport = None
- super().__init__(map=mininode_socket_map)
+ @property
+ def is_connected(self):
+ return self._transport is not None
def peer_connect(self, dstaddr, dstport, net="regtest"):
+ assert not self.is_connected
self.dstaddr = dstaddr
self.dstport = dstport
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
- self.sendbuf = b""
+ # The initial message to send after the connection was made:
+ self.on_connection_send_msg = None
self.recvbuf = b""
- self.state = "connecting"
self.network = net
- self.disconnect = False
-
logger.debug('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport))
- try:
- self.connect((dstaddr, dstport))
- except:
- self.handle_close()
+ loop = NetworkThread.network_event_loop
+ conn_gen_unsafe = loop.create_connection(lambda: self, host=self.dstaddr, port=self.dstport)
+ conn_gen = lambda: loop.call_soon_threadsafe(loop.create_task, conn_gen_unsafe)
+ return conn_gen
def peer_disconnect(self):
# Connection could have already been closed by other end.
- if self.state == "connected":
- self.disconnect_node()
+ NetworkThread.network_event_loop.call_soon_threadsafe(lambda: self._transport and self._transport.abort())
# Connection and disconnection methods
- def handle_connect(self):
- """asyncore callback when a connection is opened."""
- if self.state != "connected":
- logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
- self.state = "connected"
- self.on_open()
-
- def handle_close(self):
- """asyncore callback when a connection is closed."""
- logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport))
- self.state = "closed"
+ def connection_made(self, transport):
+ """asyncio callback when a connection is opened."""
+ assert not self._transport
+ logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
+ self._transport = transport
+ if self.on_connection_send_msg:
+ self.send_message(self.on_connection_send_msg)
+ self.on_connection_send_msg = None # Never used again
+ self.on_open()
+
+ def connection_lost(self, exc):
+ """asyncio callback when a connection is closed."""
+ if exc:
+ logger.warning("Connection lost to {}:{} due to {}".format(self.dstaddr, self.dstport, exc))
+ else:
+ logger.debug("Closed connection to: %s:%d" % (self.dstaddr, self.dstport))
+ self._transport = None
self.recvbuf = b""
- self.sendbuf = b""
- try:
- self.close()
- except:
- pass
self.on_close()
- def disconnect_node(self):
- """Disconnect the p2p connection.
-
- Called by the test logic thread. Causes the p2p connection
- to be disconnected on the next iteration of the asyncore loop."""
- self.disconnect = True
-
# Socket read methods
- def handle_read(self):
- """asyncore callback when data is read from the socket."""
- t = self.recv(8192)
+ def data_received(self, t):
+ """asyncio callback when data is read from the socket."""
if len(t) > 0:
self.recvbuf += t
self._on_data()
@@ -179,39 +170,21 @@ class P2PConnection(asyncore.dispatcher):
# Socket write methods
- def writable(self):
- """asyncore method to determine whether the handle_write() callback should be called on the next loop."""
- with mininode_lock:
- pre_connection = self.state == "connecting"
- length = len(self.sendbuf)
- return (length > 0 or pre_connection)
-
- def handle_write(self):
- """asyncore callback when data should be written to the socket."""
- with mininode_lock:
- # asyncore does not expose socket connection, only the first read/write
- # event, thus we must check connection manually here to know when we
- # actually connect
- if self.state == "connecting":
- self.handle_connect()
- if not self.writable():
- return
-
- try:
- sent = self.send(self.sendbuf)
- except:
- self.handle_close()
- return
- self.sendbuf = self.sendbuf[sent:]
-
- def send_message(self, message, pushbuf=False):
+ def send_message(self, message):
"""Send a P2P message over the socket.
This method takes a P2P payload, builds the P2P header and adds
the message to the send buffer to be sent over the socket."""
- if self.state != "connected" and not pushbuf:
- raise IOError('Not connected, no pushbuf')
+ if not self.is_connected:
+ raise IOError('Not connected')
self._log_message("send", message)
+ tmsg = self._build_message(message)
+ NetworkThread.network_event_loop.call_soon_threadsafe(lambda: self._transport and self._transport.write(tmsg))
+
+ # Class utility methods
+
+ def _build_message(self, message):
+ """Build a serialized P2P message"""
command = message.command
data = message.serialize()
tmsg = MAGIC_BYTES[self.network]
@@ -222,17 +195,7 @@ class P2PConnection(asyncore.dispatcher):
h = sha256(th)
tmsg += h[:4]
tmsg += data
- with mininode_lock:
- if (len(self.sendbuf) == 0 and not pushbuf):
- try:
- sent = self.send(tmsg)
- self.sendbuf = tmsg[sent:]
- except BlockingIOError:
- self.sendbuf = tmsg
- else:
- self.sendbuf += tmsg
-
- # Class utility methods
+ return tmsg
def _log_message(self, direction, msg):
"""Logs a message being sent or received over the connection."""
@@ -270,7 +233,7 @@ class P2PInterface(P2PConnection):
self.nServices = 0
def peer_connect(self, *args, services=NODE_NETWORK|NODE_WITNESS, send_version=True, **kwargs):
- super().peer_connect(*args, **kwargs)
+ create_conn = super().peer_connect(*args, **kwargs)
if send_version:
# Send a version msg
@@ -280,7 +243,9 @@ class P2PInterface(P2PConnection):
vt.addrTo.port = self.dstport
vt.addrFrom.ip = "0.0.0.0"
vt.addrFrom.port = 0
- self.send_message(vt, True)
+ self.on_connection_send_msg = vt # Will be sent soon after connection_made
+
+ return create_conn
# Message receiving methods
@@ -348,7 +313,7 @@ class P2PInterface(P2PConnection):
# Connection helper methods
def wait_for_disconnect(self, timeout=60):
- test_function = lambda: self.state != "connected"
+ test_function = lambda: not self.is_connected
wait_until(test_function, timeout=timeout, lock=mininode_lock)
# Message receiving helper methods
@@ -404,56 +369,35 @@ class P2PInterface(P2PConnection):
self.ping_counter += 1
-# Keep our own socket map for asyncore, so that we can track disconnects
-# ourselves (to work around an issue with closing an asyncore socket when
-# using select)
-mininode_socket_map = dict()
-
-# One lock for synchronizing all data access between the networking thread (see
+# One lock for synchronizing all data access between the network event loop (see
# NetworkThread below) and the thread running the test logic. For simplicity,
-# P2PConnection acquires this lock whenever delivering a message to a P2PInterface,
-# and whenever adding anything to the send buffer (in send_message()). This
-# lock should be acquired in the thread running the test logic to synchronize
+# P2PConnection acquires this lock whenever delivering a message to a P2PInterface.
+# This lock should be acquired in the thread running the test logic to synchronize
# access to any data shared with the P2PInterface or P2PConnection.
mininode_lock = threading.RLock()
+
class NetworkThread(threading.Thread):
+ network_event_loop = None
+
def __init__(self):
super().__init__(name="NetworkThread")
+ # There is only one event loop and no more than one thread must be created
+ assert not self.network_event_loop
+
+ NetworkThread.network_event_loop = asyncio.new_event_loop()
def run(self):
- while mininode_socket_map:
- # We check for whether to disconnect outside of the asyncore
- # loop to work around the behavior of asyncore when using
- # select
- disconnected = []
- for fd, obj in mininode_socket_map.items():
- if obj.disconnect:
- disconnected.append(obj)
- [obj.handle_close() for obj in disconnected]
- asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1)
- logger.debug("Network thread closing")
-
-def network_thread_start():
- """Start the network thread."""
- # Only one network thread may run at a time
- assert not network_thread_running()
-
- NetworkThread().start()
-
-def network_thread_running():
- """Return whether the network thread is running."""
- return any([thread.name == "NetworkThread" for thread in threading.enumerate()])
-
-def network_thread_join(timeout=10):
- """Wait timeout seconds for the network thread to terminate.
-
- Throw if the network thread doesn't terminate in timeout seconds."""
- network_threads = [thread for thread in threading.enumerate() if thread.name == "NetworkThread"]
- assert len(network_threads) <= 1
- for thread in network_threads:
- thread.join(timeout)
- assert not thread.is_alive()
+ """Start the network thread."""
+ self.network_event_loop.run_forever()
+
+ def close(self, timeout=10):
+ """Close the connections and network event loop."""
+ self.network_event_loop.call_soon_threadsafe(self.network_event_loop.stop)
+ wait_until(lambda: not self.network_event_loop.is_running(), timeout=timeout)
+ self.network_event_loop.close()
+ self.join(timeout)
+
class P2PDataStore(P2PInterface):
"""A P2P data store class.