aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWladimir J. van der Laan <laanwj@gmail.com>2016-04-19 15:36:58 +0200
committerWladimir J. van der Laan <laanwj@gmail.com>2016-04-19 15:44:38 +0200
commita1eb344ba8f4ef7dade755c823a9d001f837ae7d (patch)
treed063a57050a83688d042ceb308066c64dbbd9da8
parent0e6fd5e4af31ada6da8df006801b59e9178ecd85 (diff)
parent0b25a9fb42d5df54ea35ddb2bb4837e1e29355fd (diff)
Merge #7762: [ZMQ] append a message sequence number to every ZMQ notification
0b25a9f [ZMQ] append a message sequence number to every ZMQ notification (Jonas Schnelli) de821d5 [ZMQ] refactor message string (Jonas Schnelli)
-rwxr-xr-xcontrib/zmq/zmq_sub.py14
-rw-r--r--doc/release-notes.md9
-rw-r--r--doc/zmq.md5
-rwxr-xr-xqa/rpc-tests/zmq_test.py13
-rw-r--r--src/zmq/zmqpublishnotifier.cpp34
-rw-r--r--src/zmq/zmqpublishnotifier.h12
6 files changed, 74 insertions, 13 deletions
diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py
index decf29d42a..6268123dd8 100755
--- a/contrib/zmq/zmq_sub.py
+++ b/contrib/zmq/zmq_sub.py
@@ -3,6 +3,7 @@
import array
import binascii
import zmq
+import struct
port = 28332
@@ -19,18 +20,21 @@ try:
msg = zmqSubSocket.recv_multipart()
topic = str(msg[0])
body = msg[1]
-
+ sequence = "Unknown";
+ if len(msg[-1]) == 4:
+ msgSequence = struct.unpack('<I', msg[-1])[-1]
+ sequence = str(msgSequence)
if topic == "hashblock":
- print "- HASH BLOCK -"
+ print '- HASH BLOCK ('+sequence+') -'
print binascii.hexlify(body)
elif topic == "hashtx":
- print '- HASH TX -'
+ print '- HASH TX ('+sequence+') -'
print binascii.hexlify(body)
elif topic == "rawblock":
- print "- RAW BLOCK HEADER -"
+ print '- RAW BLOCK HEADER ('+sequence+') -'
print binascii.hexlify(body[:80])
elif topic == "rawtx":
- print '- RAW TX -'
+ print '- RAW TX ('+sequence+') -'
print binascii.hexlify(body)
except KeyboardInterrupt:
diff --git a/doc/release-notes.md b/doc/release-notes.md
index 8360cc4816..73ad13e722 100644
--- a/doc/release-notes.md
+++ b/doc/release-notes.md
@@ -52,6 +52,15 @@ The following outputs are affected by this change:
- REST `/rest/block/` (JSON format when including extended tx details)
- `bitcoin-tx -json`
+### ZMQ
+
+Each ZMQ notification now contains an up-counting sequence number that allows
+listeners to detect lost notifications.
+The sequence number is always the last element in a multi-part ZMQ notification and
+therefore backward compatible.
+Each message type has its own counter.
+(https://github.com/bitcoin/bitcoin/pull/7762)
+
### Configuration and command-line options
### Block and transaction handling
diff --git a/doc/zmq.md b/doc/zmq.md
index 902d1124c7..8d795a388a 100644
--- a/doc/zmq.md
+++ b/doc/zmq.md
@@ -99,3 +99,8 @@ using other means such as firewalling.
Note that when the block chain tip changes, a reorganisation may occur
and just the tip will be notified. It is up to the subscriber to
retrieve the chain from the last known block to the new tip.
+
+There are several possibilities that ZMQ notification can get lost
+during transmission depending on the communication type your are
+using. Bitcoind appends an up-counting sequence number to each
+notification which allows listeners to detect lost notifications.
diff --git a/qa/rpc-tests/zmq_test.py b/qa/rpc-tests/zmq_test.py
index 3a8d62ef2e..97850bea3c 100755
--- a/qa/rpc-tests/zmq_test.py
+++ b/qa/rpc-tests/zmq_test.py
@@ -11,6 +11,7 @@ from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import *
import zmq
import binascii
+import struct
try:
import http.client as httplib
@@ -47,11 +48,17 @@ class ZMQTest (BitcoinTestFramework):
print "listen..."
msg = self.zmqSubSocket.recv_multipart()
topic = msg[0]
+ assert_equal(topic, b"hashtx")
body = msg[1]
+ nseq = msg[2]
+ msgSequence = struct.unpack('<I', msg[-1])[-1]
+ assert_equal(msgSequence, 0) #must be sequence 0 on hashtx
msg = self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
+ msgSequence = struct.unpack('<I', msg[-1])[-1]
+ assert_equal(msgSequence, 0) #must be sequence 0 on hashblock
blkhash = bytes_to_hex_str(body)
assert_equal(genhashes[0], blkhash) #blockhash from generate must be equal to the hash received over zmq
@@ -61,12 +68,16 @@ class ZMQTest (BitcoinTestFramework):
self.sync_all()
zmqHashes = []
+ blockcount = 0
for x in range(0,n*2):
msg = self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
if topic == b"hashblock":
zmqHashes.append(bytes_to_hex_str(body))
+ msgSequence = struct.unpack('<I', msg[-1])[-1]
+ assert_equal(msgSequence, blockcount+1)
+ blockcount += 1
for x in range(0,n):
assert_equal(genhashes[x], zmqHashes[x]) #blockhash from generate must be equal to the hash received over zmq
@@ -82,6 +93,8 @@ class ZMQTest (BitcoinTestFramework):
hashZMQ = ""
if topic == b"hashtx":
hashZMQ = bytes_to_hex_str(body)
+ msgSequence = struct.unpack('<I', msg[-1])[-1]
+ assert_equal(msgSequence, blockcount+1)
assert_equal(hashRPC, hashZMQ) #blockhash from generate must be equal to the hash received over zmq
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp
index f5839620ff..b6c907980f 100644
--- a/src/zmq/zmqpublishnotifier.cpp
+++ b/src/zmq/zmqpublishnotifier.cpp
@@ -9,6 +9,11 @@
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
+static const char *MSG_HASHBLOCK = "hashblock";
+static const char *MSG_HASHTX = "hashtx";
+static const char *MSG_RAWBLOCK = "rawblock";
+static const char *MSG_RAWTX = "rawtx";
+
// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
{
@@ -118,6 +123,23 @@ void CZMQAbstractPublishNotifier::Shutdown()
psocket = 0;
}
+bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
+{
+ assert(psocket);
+
+ /* send three parts, command & data & a LE 4byte sequence number */
+ unsigned char msgseq[sizeof(uint32_t)];
+ WriteLE32(&msgseq[0], nSequence);
+ int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0);
+ if (rc == -1)
+ return false;
+
+ /* increment memory only sequence number after sending */
+ nSequence++;
+
+ return true;
+}
+
bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
@@ -125,8 +147,7 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
- int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0);
- return rc == 0;
+ return SendMessage(MSG_HASHBLOCK, data, 32);
}
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
@@ -136,8 +157,7 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
- int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0);
- return rc == 0;
+ return SendMessage(MSG_HASHTX, data, 32);
}
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
@@ -158,8 +178,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
ss << block;
}
- int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0);
- return rc == 0;
+ return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
}
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
@@ -168,6 +187,5 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << transaction;
- int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0);
- return rc == 0;
+ return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
}
diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h
index 44d5cbea67..22f02a3d0d 100644
--- a/src/zmq/zmqpublishnotifier.h
+++ b/src/zmq/zmqpublishnotifier.h
@@ -11,7 +11,19 @@ class CBlockIndex;
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
{
+private:
+ uint32_t nSequence; //! upcounting per message sequence number
+
public:
+
+ /* send zmq multipart message
+ parts:
+ * command
+ * data
+ * message sequence number
+ */
+ bool SendMessage(const char *command, const void* data, size_t size);
+
bool Initialize(void *pcontext);
void Shutdown();
};