diff options
Diffstat (limited to 'qa/rpc-tests/mininode.py')
-rwxr-xr-x | qa/rpc-tests/mininode.py | 67 |
1 files changed, 38 insertions, 29 deletions
diff --git a/qa/rpc-tests/mininode.py b/qa/rpc-tests/mininode.py index c5c1bcfbbe..b7d78e74fa 100755 --- a/qa/rpc-tests/mininode.py +++ b/qa/rpc-tests/mininode.py @@ -26,7 +26,7 @@ import sys import random import cStringIO import hashlib -from threading import Lock +from threading import RLock from threading import Thread import logging import copy @@ -37,6 +37,19 @@ MY_SUBVERSION = "/python-mininode-tester:0.0.1/" MAX_INV_SZ = 50000 +# Keep our own socket map for asyncore, so that we can track disconnects +# ourselves (to workaround an issue with closing an asyncore socket when +# using select) +mininode_socket_map = dict() + +# One lock for synchronizing all data access between the networking thread (see +# NetworkThread below) and the thread running the test logic. For simplicity, +# NodeConn acquires this lock whenever delivering a message to to a NodeConnCB, +# and whenever adding anything to the send buffer (in send_message()). This +# lock should be acquired in the thread running the test logic to synchronize +# access to any data shared with the NodeConnCB or NodeConn. +mininode_lock = RLock() + # Serialization/deserialization tools def sha256(s): return hashlib.new('sha256', s).digest() @@ -975,10 +988,6 @@ class msg_reject(object): # Reimplement the on_* functions to provide handling for events class NodeConnCB(object): def __init__(self): - # Acquire on all callbacks -- overkill for now since asyncore is - # single-threaded, but may be useful for synchronizing access to - # member variables in derived classes. - self.cbLock = Lock() self.verack_received = False # Derived classes should call this function once to set the message map @@ -1004,7 +1013,7 @@ class NodeConnCB(object): } def deliver(self, conn, message): - with self.cbLock: + with mininode_lock: try: self.cbmap[message.command](conn, message) except: @@ -1076,7 +1085,7 @@ class NodeConn(asyncore.dispatcher): } def __init__(self, dstaddr, dstport, rpc, callback, net="regtest"): - asyncore.dispatcher.__init__(self) + asyncore.dispatcher.__init__(self, map=mininode_socket_map) self.log = logging.getLogger("NodeConn(%s:%d)" % (dstaddr, dstport)) self.dstaddr = dstaddr self.dstport = dstport @@ -1089,7 +1098,6 @@ class NodeConn(asyncore.dispatcher): self.state = "connecting" self.network = net self.cb = callback - self.sendbufLock = Lock() # for protecting the sendbuffer self.disconnect = False # stuff version msg into sendbuf @@ -1140,24 +1148,18 @@ class NodeConn(asyncore.dispatcher): return True def writable(self): - if self.disconnect: - self.handle_close() - return False - else: - self.sendbufLock.acquire() + with mininode_lock: length = len(self.sendbuf) - self.sendbufLock.release() - return (length > 0) + return (length > 0) def handle_write(self): - self.sendbufLock.acquire() - try: - sent = self.send(self.sendbuf) - except: - self.handle_close() - return - self.sendbuf = self.sendbuf[sent:] - self.sendbufLock.release() + with mininode_lock: + try: + sent = self.send(self.sendbuf) + except: + self.handle_close() + return + self.sendbuf = self.sendbuf[sent:] def got_data(self): while True: @@ -1201,7 +1203,6 @@ class NodeConn(asyncore.dispatcher): def send_message(self, message, pushbuf=False): if self.state != "connected" and not pushbuf: return - self.sendbufLock.acquire() self.show_debug_msg("Send %s" % repr(message)) command = message.command data = message.serialize() @@ -1214,9 +1215,9 @@ class NodeConn(asyncore.dispatcher): h = sha256(th) tmsg += h[:4] tmsg += data - self.sendbuf += tmsg - self.last_sent = time.time() - self.sendbufLock.release() + with mininode_lock: + self.sendbuf += tmsg + self.last_sent = time.time() def got_message(self, message): if message.command == "version": @@ -1229,12 +1230,20 @@ class NodeConn(asyncore.dispatcher): def disconnect_node(self): self.disconnect = True - self.send_message(self.messagemap['ping']()) class NetworkThread(Thread): def run(self): - asyncore.loop(0.1, True) + while mininode_socket_map: + # We check for whether to disconnect outside of the asyncore + # loop to workaround the behavior of asyncore when using + # select + disconnected = [] + for fd, obj in mininode_socket_map.items(): + if obj.disconnect: + disconnected.append(obj) + [ obj.handle_close() for obj in disconnected ] + asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) # An exception we can raise if we detect a potential disconnect |