diff options
-rwxr-xr-x | contrib/zmq/zmq_sub.py | 14 | ||||
-rw-r--r-- | doc/release-notes.md | 9 | ||||
-rw-r--r-- | doc/zmq.md | 5 | ||||
-rwxr-xr-x | qa/rpc-tests/zmq_test.py | 13 | ||||
-rw-r--r-- | src/zmq/zmqpublishnotifier.cpp | 34 | ||||
-rw-r--r-- | src/zmq/zmqpublishnotifier.h | 12 |
6 files changed, 74 insertions, 13 deletions
diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py index decf29d42a..6268123dd8 100755 --- a/contrib/zmq/zmq_sub.py +++ b/contrib/zmq/zmq_sub.py @@ -3,6 +3,7 @@ import array import binascii import zmq +import struct port = 28332 @@ -19,18 +20,21 @@ try: msg = zmqSubSocket.recv_multipart() topic = str(msg[0]) body = msg[1] - + sequence = "Unknown"; + if len(msg[-1]) == 4: + msgSequence = struct.unpack('<I', msg[-1])[-1] + sequence = str(msgSequence) if topic == "hashblock": - print "- HASH BLOCK -" + print '- HASH BLOCK ('+sequence+') -' print binascii.hexlify(body) elif topic == "hashtx": - print '- HASH TX -' + print '- HASH TX ('+sequence+') -' print binascii.hexlify(body) elif topic == "rawblock": - print "- RAW BLOCK HEADER -" + print '- RAW BLOCK HEADER ('+sequence+') -' print binascii.hexlify(body[:80]) elif topic == "rawtx": - print '- RAW TX -' + print '- RAW TX ('+sequence+') -' print binascii.hexlify(body) except KeyboardInterrupt: diff --git a/doc/release-notes.md b/doc/release-notes.md index 8360cc4816..73ad13e722 100644 --- a/doc/release-notes.md +++ b/doc/release-notes.md @@ -52,6 +52,15 @@ The following outputs are affected by this change: - REST `/rest/block/` (JSON format when including extended tx details) - `bitcoin-tx -json` +### ZMQ + +Each ZMQ notification now contains an up-counting sequence number that allows +listeners to detect lost notifications. +The sequence number is always the last element in a multi-part ZMQ notification and +therefore backward compatible. +Each message type has its own counter. +(https://github.com/bitcoin/bitcoin/pull/7762) + ### Configuration and command-line options ### Block and transaction handling diff --git a/doc/zmq.md b/doc/zmq.md index 902d1124c7..8d795a388a 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -99,3 +99,8 @@ using other means such as firewalling. Note that when the block chain tip changes, a reorganisation may occur and just the tip will be notified. It is up to the subscriber to retrieve the chain from the last known block to the new tip. + +There are several possibilities that ZMQ notification can get lost +during transmission depending on the communication type your are +using. Bitcoind appends an up-counting sequence number to each +notification which allows listeners to detect lost notifications. diff --git a/qa/rpc-tests/zmq_test.py b/qa/rpc-tests/zmq_test.py index 3a8d62ef2e..97850bea3c 100755 --- a/qa/rpc-tests/zmq_test.py +++ b/qa/rpc-tests/zmq_test.py @@ -11,6 +11,7 @@ from test_framework.test_framework import BitcoinTestFramework from test_framework.util import * import zmq import binascii +import struct try: import http.client as httplib @@ -47,11 +48,17 @@ class ZMQTest (BitcoinTestFramework): print "listen..." msg = self.zmqSubSocket.recv_multipart() topic = msg[0] + assert_equal(topic, b"hashtx") body = msg[1] + nseq = msg[2] + msgSequence = struct.unpack('<I', msg[-1])[-1] + assert_equal(msgSequence, 0) #must be sequence 0 on hashtx msg = self.zmqSubSocket.recv_multipart() topic = msg[0] body = msg[1] + msgSequence = struct.unpack('<I', msg[-1])[-1] + assert_equal(msgSequence, 0) #must be sequence 0 on hashblock blkhash = bytes_to_hex_str(body) assert_equal(genhashes[0], blkhash) #blockhash from generate must be equal to the hash received over zmq @@ -61,12 +68,16 @@ class ZMQTest (BitcoinTestFramework): self.sync_all() zmqHashes = [] + blockcount = 0 for x in range(0,n*2): msg = self.zmqSubSocket.recv_multipart() topic = msg[0] body = msg[1] if topic == b"hashblock": zmqHashes.append(bytes_to_hex_str(body)) + msgSequence = struct.unpack('<I', msg[-1])[-1] + assert_equal(msgSequence, blockcount+1) + blockcount += 1 for x in range(0,n): assert_equal(genhashes[x], zmqHashes[x]) #blockhash from generate must be equal to the hash received over zmq @@ -82,6 +93,8 @@ class ZMQTest (BitcoinTestFramework): hashZMQ = "" if topic == b"hashtx": hashZMQ = bytes_to_hex_str(body) + msgSequence = struct.unpack('<I', msg[-1])[-1] + assert_equal(msgSequence, blockcount+1) assert_equal(hashRPC, hashZMQ) #blockhash from generate must be equal to the hash received over zmq diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index f5839620ff..b6c907980f 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -9,6 +9,11 @@ static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers; +static const char *MSG_HASHBLOCK = "hashblock"; +static const char *MSG_HASHTX = "hashtx"; +static const char *MSG_RAWBLOCK = "rawblock"; +static const char *MSG_RAWTX = "rawtx"; + // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) { @@ -118,6 +123,23 @@ void CZMQAbstractPublishNotifier::Shutdown() psocket = 0; } +bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size) +{ + assert(psocket); + + /* send three parts, command & data & a LE 4byte sequence number */ + unsigned char msgseq[sizeof(uint32_t)]; + WriteLE32(&msgseq[0], nSequence); + int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0); + if (rc == -1) + return false; + + /* increment memory only sequence number after sending */ + nSequence++; + + return true; +} + bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); @@ -125,8 +147,7 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; - int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0); - return rc == 0; + return SendMessage(MSG_HASHBLOCK, data, 32); } bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) @@ -136,8 +157,7 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; - int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0); - return rc == 0; + return SendMessage(MSG_HASHTX, data, 32); } bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) @@ -158,8 +178,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) ss << block; } - int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0); - return rc == 0; + return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); } bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) @@ -168,6 +187,5 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex()); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << transaction; - int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0); - return rc == 0; + return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); } diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index 44d5cbea67..22f02a3d0d 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -11,7 +11,19 @@ class CBlockIndex; class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier { +private: + uint32_t nSequence; //! upcounting per message sequence number + public: + + /* send zmq multipart message + parts: + * command + * data + * message sequence number + */ + bool SendMessage(const char *command, const void* data, size_t size); + bool Initialize(void *pcontext); void Shutdown(); }; |