aboutsummaryrefslogtreecommitdiff
path: root/src/zmq/zmqpublishnotifier.cpp
diff options
context:
space:
mode:
authorJeff Garzik <jgarzik@bitpay.com>2014-11-18 12:06:32 -0500
committerJoão Barbosa <joao@bitreserve.org>2015-09-16 11:01:35 +0100
commite6a14b64d665eb1fafd03a6bbc8d14597ce1c83c (patch)
treebfefc581188fb56413c6d29cde625b0395faebc5 /src/zmq/zmqpublishnotifier.cpp
parent1136879df8af2358efb706c5af886778fbd94989 (diff)
Add ZeroMQ support. Notify blocks and transactions via ZeroMQ
Continues Johnathan Corgan's work. Publishing multipart messages Bugfix: Add missing zmq header includes Bugfix: Adjust build system to link ZeroMQ code for Qt binaries
Diffstat (limited to 'src/zmq/zmqpublishnotifier.cpp')
-rw-r--r--src/zmq/zmqpublishnotifier.cpp172
1 files changed, 172 insertions, 0 deletions
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp
new file mode 100644
index 0000000000..0a6d7d0dbc
--- /dev/null
+++ b/src/zmq/zmqpublishnotifier.cpp
@@ -0,0 +1,172 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "zmqpublishnotifier.h"
+#include "main.h"
+#include "util.h"
+
+static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
+
+// Internal function to send multipart message
+static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
+{
+ va_list args;
+ va_start(args, size);
+
+ while (1)
+ {
+ zmq_msg_t msg;
+
+ int rc = zmq_msg_init_size(&msg, size);
+ if (rc != 0)
+ {
+ zmqError("Unable to initialize ZMQ msg");
+ return -1;
+ }
+
+ void *buf = zmq_msg_data(&msg);
+ memcpy(buf, data, size);
+
+ data = va_arg(args, const void*);
+
+ rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
+ if (rc == -1)
+ {
+ zmqError("Unable to send ZMQ msg");
+ zmq_msg_close(&msg);
+ return -1;
+ }
+
+ zmq_msg_close(&msg);
+
+ if (!data)
+ break;
+
+ size = va_arg(args, size_t);
+ }
+ return 0;
+}
+
+bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
+{
+ assert(!psocket);
+
+ // check if address is being used by other publish notifier
+ std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
+
+ if (i==mapPublishNotifiers.end())
+ {
+ psocket = zmq_socket(pcontext, ZMQ_PUB);
+ if (!psocket)
+ {
+ zmqError("Failed to create socket");
+ return false;
+ }
+
+ int rc = zmq_bind(psocket, address.c_str());
+ if (rc!=0)
+ {
+ zmqError("Failed to bind address");
+ return false;
+ }
+
+ // register this notifier for the address, so it can be reused for other publish notifier
+ mapPublishNotifiers.insert(std::make_pair(address, this));
+ return true;
+ }
+ else
+ {
+ LogPrint("zmq", " Reuse socket for address %s\n", address);
+
+ psocket = i->second->psocket;
+ mapPublishNotifiers.insert(std::make_pair(address, this));
+
+ return true;
+ }
+}
+
+void CZMQAbstractPublishNotifier::Shutdown()
+{
+ assert(psocket);
+
+ int count = mapPublishNotifiers.count(address);
+
+ // remove this notifier from the list of publishers using this address
+ typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
+ std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
+
+ for (iterator it = iterpair.first; it != iterpair.second; ++it)
+ {
+ if (it->second==this)
+ {
+ mapPublishNotifiers.erase(it);
+ break;
+ }
+ }
+
+ if (count == 1)
+ {
+ LogPrint("zmq", "Close socket at address %s\n", address);
+ int linger = 0;
+ zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
+ zmq_close(psocket);
+ }
+
+ psocket = 0;
+}
+
+bool CZMQPublishHashBlockNotifier::NotifyBlock(const uint256 &hash)
+{
+ LogPrint("zmq", "Publish hash block %s\n", hash.GetHex());
+ char data[32];
+ for (unsigned int i = 0; i < 32; i++)
+ data[31 - i] = hash.begin()[i];
+ int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0);
+ return rc == 0;
+}
+
+bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
+{
+ uint256 hash = transaction.GetHash();
+ LogPrint("zmq", "Publish hash transaction %s\n", hash.GetHex());
+ char data[32];
+ for (unsigned int i = 0; i < 32; i++)
+ data[31 - i] = hash.begin()[i];
+ int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0);
+ return rc == 0;
+}
+
+bool CZMQPublishRawBlockNotifier::NotifyBlock(const uint256 &hash)
+{
+ LogPrint("zmq", "Publish raw block %s\n", hash.GetHex());
+
+ CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
+ {
+ LOCK(cs_main);
+
+ CBlock block;
+ CBlockIndex* pblockindex = mapBlockIndex[hash];
+
+ if(!ReadBlockFromDisk(block, pblockindex))
+ {
+ zmqError("Can't read block from disk");
+ return false;
+ }
+
+ ss << block;
+ }
+
+ int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0);
+ return rc == 0;
+}
+
+bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
+{
+ uint256 hash = transaction.GetHash();
+ LogPrint("zmq", "Publish raw transaction %s\n", hash.GetHex());
+ CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
+ ss << transaction;
+ int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0);
+ return rc == 0;
+}