From e6a14b64d665eb1fafd03a6bbc8d14597ce1c83c Mon Sep 17 00:00:00 2001
From: Jeff Garzik <jgarzik@bitpay.com>
Date: Tue, 18 Nov 2014 12:06:32 -0500
Subject: Add ZeroMQ support. Notify blocks and transactions via ZeroMQ

Continues Johnathan Corgan's work.
Publishing multipart messages

Bugfix: Add missing zmq header includes

Bugfix: Adjust build system to link ZeroMQ code for Qt binaries
---
 src/Makefile.am                      |  26 +++++-
 src/Makefile.qt.include              |   3 +
 src/Makefile.qttest.include          |   3 +
 src/Makefile.test.include            |   4 +
 src/init.cpp                         |  36 +++++++-
 src/validationinterface.cpp          |   1 -
 src/zmq/zmqabstractnotifier.cpp      |  22 +++++
 src/zmq/zmqabstractnotifier.h        |  42 +++++++++
 src/zmq/zmqconfig.h                  |  24 +++++
 src/zmq/zmqnotificationinterface.cpp | 155 +++++++++++++++++++++++++++++++
 src/zmq/zmqnotificationinterface.h   |  35 +++++++
 src/zmq/zmqpublishnotifier.cpp       | 172 +++++++++++++++++++++++++++++++++++
 src/zmq/zmqpublishnotifier.h         |  41 +++++++++
 13 files changed, 560 insertions(+), 4 deletions(-)
 create mode 100644 src/zmq/zmqabstractnotifier.cpp
 create mode 100644 src/zmq/zmqabstractnotifier.h
 create mode 100644 src/zmq/zmqconfig.h
 create mode 100644 src/zmq/zmqnotificationinterface.cpp
 create mode 100644 src/zmq/zmqnotificationinterface.h
 create mode 100644 src/zmq/zmqpublishnotifier.cpp
 create mode 100644 src/zmq/zmqpublishnotifier.h

(limited to 'src')

diff --git a/src/Makefile.am b/src/Makefile.am
index 390e9f1436..67e848be39 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -48,6 +48,9 @@ if ENABLE_WALLET
 BITCOIN_INCLUDES += $(BDB_CPPFLAGS)
 EXTRA_LIBRARIES += libbitcoin_wallet.a
 endif
+if ENABLE_ZMQ
+EXTRA_LIBRARIES += libbitcoin_zmq.a
+endif
 
 if BUILD_BITCOIN_LIBS
 lib_LTLIBRARIES = libbitcoinconsensus.la
@@ -157,7 +160,12 @@ BITCOIN_CORE_H = \
   wallet/db.h \
   wallet/wallet.h \
   wallet/wallet_ismine.h \
-  wallet/walletdb.h
+  wallet/walletdb.h \
+  zmq/zmqabstractnotifier.h \
+  zmq/zmqconfig.h\
+  zmq/zmqnotificationinterface.h \
+  zmq/zmqpublishnotifier.h
+
 
 obj/build.h: FORCE
 	@$(MKDIR_P) $(builddir)/obj
@@ -199,6 +207,17 @@ libbitcoin_server_a_SOURCES = \
   validationinterface.cpp \
   $(BITCOIN_CORE_H)
 
+if ENABLE_ZMQ
+LIBBITCOIN_ZMQ=libbitcoin_zmq.a
+
+libbitcoin_zmq_a_CPPFLAGS = $(BITCOIN_INCLUDES)
+libbitcoin_zmq_a_SOURCES = \
+  zmq/zmqabstractnotifier.cpp \
+  zmq/zmqnotificationinterface.cpp \
+  zmq/zmqpublishnotifier.cpp
+endif
+
+
 # wallet: shared between bitcoind and bitcoin-qt, but only linked
 # when wallet enabled
 libbitcoin_wallet_a_CPPFLAGS = $(BITCOIN_INCLUDES)
@@ -320,12 +339,15 @@ bitcoind_LDADD = \
   $(LIBMEMENV) \
   $(LIBSECP256K1)
 
+if ENABLE_ZMQ
+bitcoind_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
+endif
+
 if ENABLE_WALLET
 bitcoind_LDADD += libbitcoin_wallet.a
 endif
 
 bitcoind_LDADD += $(BOOST_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS)
-#
 
 # bitcoin-cli binary #
 bitcoin_cli_SOURCES = bitcoin-cli.cpp
diff --git a/src/Makefile.qt.include b/src/Makefile.qt.include
index 8d60aca25c..3e8eda1782 100644
--- a/src/Makefile.qt.include
+++ b/src/Makefile.qt.include
@@ -361,6 +361,9 @@ qt_bitcoin_qt_LDADD = qt/libbitcoinqt.a $(LIBBITCOIN_SERVER)
 if ENABLE_WALLET
 qt_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET)
 endif
+if ENABLE_ZMQ
+qt_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
+endif
 qt_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) $(LIBMEMENV) \
   $(BOOST_LIBS) $(QT_LIBS) $(QT_DBUS_LIBS) $(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) \
   $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS)
diff --git a/src/Makefile.qttest.include b/src/Makefile.qttest.include
index 4250bb8f3b..6554580bea 100644
--- a/src/Makefile.qttest.include
+++ b/src/Makefile.qttest.include
@@ -30,6 +30,9 @@ qt_test_test_bitcoin_qt_LDADD = $(LIBBITCOINQT) $(LIBBITCOIN_SERVER)
 if ENABLE_WALLET
 qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET)
 endif
+if ENABLE_ZMQ
+qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
+endif
 qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) \
   $(LIBMEMENV) $(BOOST_LIBS) $(QT_DBUS_LIBS) $(QT_TEST_LIBS) $(QT_LIBS) \
   $(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) \
diff --git a/src/Makefile.test.include b/src/Makefile.test.include
index cc60cd92bb..cee35926a5 100644
--- a/src/Makefile.test.include
+++ b/src/Makefile.test.include
@@ -100,6 +100,10 @@ endif
 test_test_bitcoin_LDADD += $(LIBBITCOIN_CONSENSUS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS)
 test_test_bitcoin_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(LIBTOOL_APP_LDFLAGS) -static
 
+if ENABLE_ZMQ
+test_test_bitcoin_LDADD += $(ZMQ_LIBS)
+endif
+
 nodist_test_test_bitcoin_SOURCES = $(GENERATED_TEST_FILES)
 
 $(BITCOIN_TESTS): $(GENERATED_TEST_FILES)
diff --git a/src/init.cpp b/src/init.cpp
index a12e38ff53..f03388120c 100644
--- a/src/init.cpp
+++ b/src/init.cpp
@@ -38,7 +38,6 @@
 #include "wallet/wallet.h"
 #include "wallet/walletdb.h"
 #endif
-
 #include <stdint.h>
 #include <stdio.h>
 
@@ -55,6 +54,10 @@
 #include <boost/thread.hpp>
 #include <openssl/crypto.h>
 
+#if ENABLE_ZMQ
+#include "zmq/zmqnotificationinterface.h"
+#endif
+
 using namespace std;
 
 #ifdef ENABLE_WALLET
@@ -62,6 +65,10 @@ CWallet* pwalletMain = NULL;
 #endif
 bool fFeeEstimatesInitialized = false;
 
+#if ENABLE_ZMQ
+static CZMQNotificationInterface* pzmqNotificationInterface = NULL;
+#endif
+
 #ifdef WIN32
 // Win32 LevelDB doesn't use filedescriptors, and the ones used for
 // accessing block files don't count towards the fd_set size limit
@@ -211,6 +218,16 @@ void Shutdown()
     if (pwalletMain)
         pwalletMain->Flush(true);
 #endif
+
+#if ENABLE_ZMQ
+    if (pzmqNotificationInterface) {
+        UnregisterValidationInterface(pzmqNotificationInterface);
+        pzmqNotificationInterface->Shutdown();
+        delete pzmqNotificationInterface;
+        pzmqNotificationInterface = NULL;
+    }
+#endif
+
 #ifndef WIN32
     try {
         boost::filesystem::remove(GetPidFile());
@@ -375,6 +392,14 @@ std::string HelpMessage(HelpMessageMode mode)
         " " + _("(1 = keep tx meta data e.g. account owner and payment request information, 2 = drop tx meta data)"));
 #endif
 
+#if ENABLE_ZMQ
+    strUsage += HelpMessageGroup(_("ZeroMQ notification options:"));
+    strUsage += HelpMessageOpt("-zmqpubhashblock=<address>", _("Enable publish hash block in <address>"));
+    strUsage += HelpMessageOpt("-zmqpubhashtransaction=<address>", _("Enable publish hash transaction in <address>"));
+    strUsage += HelpMessageOpt("-zmqpubrawblock=<address>", _("Enable publish raw block in <address>"));
+    strUsage += HelpMessageOpt("-zmqpubrawtransaction=<address>", _("Enable publish raw transaction in <address>"));
+#endif
+
     strUsage += HelpMessageGroup(_("Debugging/Testing options:"));
     if (showDebug)
     {
@@ -1125,6 +1150,15 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
     BOOST_FOREACH(const std::string& strDest, mapMultiArgs["-seednode"])
         AddOneShot(strDest);
 
+#if ENABLE_ZMQ
+    pzmqNotificationInterface = CZMQNotificationInterface::CreateWithArguments(mapArgs);
+
+    if (pzmqNotificationInterface) {
+        pzmqNotificationInterface->Initialize();
+        RegisterValidationInterface(pzmqNotificationInterface);
+    }
+#endif
+
     // ********************************************************* Step 7: load block chain
 
     fReindex = GetBoolArg("-reindex", false);
diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp
index bdb7069071..81f3b775f4 100644
--- a/src/validationinterface.cpp
+++ b/src/validationinterface.cpp
@@ -45,7 +45,6 @@ void UnregisterAllValidationInterfaces() {
     g_signals.SetBestChain.disconnect_all_slots();
     g_signals.UpdatedTransaction.disconnect_all_slots();
     g_signals.SyncTransaction.disconnect_all_slots();
-    g_signals.UpdatedTransaction.disconnect_all_slots();
     g_signals.UpdatedBlockTip.disconnect_all_slots();
 }
 
diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp
new file mode 100644
index 0000000000..744ec59234
--- /dev/null
+++ b/src/zmq/zmqabstractnotifier.cpp
@@ -0,0 +1,22 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "zmqabstractnotifier.h"
+#include "util.h"
+
+
+CZMQAbstractNotifier::~CZMQAbstractNotifier()
+{
+    assert(!psocket);
+}
+
+bool CZMQAbstractNotifier::NotifyBlock(const uint256 &/*hash*/)
+{
+    return true;
+}
+
+bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/)
+{
+    return true;
+}
diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h
new file mode 100644
index 0000000000..626d1ddf92
--- /dev/null
+++ b/src/zmq/zmqabstractnotifier.h
@@ -0,0 +1,42 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
+#define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
+
+#include "zmqconfig.h"
+
+class CZMQAbstractNotifier;
+typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)();
+
+class CZMQAbstractNotifier
+{
+public:
+    CZMQAbstractNotifier() : psocket(0) { }
+    virtual ~CZMQAbstractNotifier();
+
+    template <typename T>
+    static CZMQAbstractNotifier* Create()
+    {
+        return new T();
+    }
+
+    std::string GetType() const { return type; }
+    void SetType(const std::string &t) { type = t; }
+    std::string GetAddress() const { return address; }
+    void SetAddress(const std::string &a) { address = a; }
+
+    virtual bool Initialize(void *pcontext) = 0;
+    virtual void Shutdown() = 0;
+
+    virtual bool NotifyBlock(const uint256 &hash);
+    virtual bool NotifyTransaction(const CTransaction &transaction);
+
+protected:
+    void *psocket;
+    std::string type;
+    std::string address;
+};
+
+#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
diff --git a/src/zmq/zmqconfig.h b/src/zmq/zmqconfig.h
new file mode 100644
index 0000000000..6057f5d1a0
--- /dev/null
+++ b/src/zmq/zmqconfig.h
@@ -0,0 +1,24 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQCONFIG_H
+#define BITCOIN_ZMQ_ZMQCONFIG_H
+
+#if defined(HAVE_CONFIG_H)
+#include "config/bitcoin-config.h"
+#endif
+
+#include <stdarg.h>
+#include <string>
+
+#if ENABLE_ZMQ
+#include <zmq.h>
+#endif
+
+#include "primitives/block.h"
+#include "primitives/transaction.h"
+
+void zmqError(const char *str);
+
+#endif // BITCOIN_ZMQ_ZMQCONFIG_H
diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp
new file mode 100644
index 0000000000..71ccb59a4a
--- /dev/null
+++ b/src/zmq/zmqnotificationinterface.cpp
@@ -0,0 +1,155 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "zmqnotificationinterface.h"
+#include "zmqpublishnotifier.h"
+
+#include "version.h"
+#include "main.h"
+#include "streams.h"
+#include "util.h"
+
+void zmqError(const char *str)
+{
+    LogPrint("zmq", "Error: %s, errno=%s\n", str, zmq_strerror(errno));
+}
+
+CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL)
+{
+}
+
+CZMQNotificationInterface::~CZMQNotificationInterface()
+{
+    // ensure Shutdown if Initialize is called
+    assert(!pcontext);
+
+    for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
+    {
+        delete *i;
+    }
+}
+
+CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args)
+{
+    CZMQNotificationInterface* notificationInterface = NULL;
+    std::map<std::string, CZMQNotifierFactory> factories;
+    std::list<CZMQAbstractNotifier*> notifiers;
+
+    factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
+    factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
+    factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
+    factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
+
+    for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i)
+    {
+        std::map<std::string, std::string>::const_iterator j = args.find("-zmq" + i->first);
+        if (j!=args.end())
+        {
+            CZMQNotifierFactory factory = i->second;
+            std::string address = j->second;
+            CZMQAbstractNotifier *notifier = factory();
+            notifier->SetType(i->first);
+            notifier->SetAddress(address);
+            notifiers.push_back(notifier);
+        }
+    }
+
+    if (!notifiers.empty())
+    {
+        notificationInterface = new CZMQNotificationInterface();
+        notificationInterface->notifiers = notifiers;
+    }
+
+    return notificationInterface;
+}
+
+// Called at startup to conditionally set up ZMQ socket(s)
+bool CZMQNotificationInterface::Initialize()
+{
+    LogPrint("zmq", "Initialize notification interface\n");
+    assert(!pcontext);
+
+    pcontext = zmq_init(1);
+
+    if (!pcontext)
+    {
+        zmqError("Unable to initialize context");
+        return false;
+    }
+
+    std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
+    for (; i!=notifiers.end(); ++i)
+    {
+        CZMQAbstractNotifier *notifier = *i;
+        if (notifier->Initialize(pcontext))
+        {
+            LogPrint("zmq", "  Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+        }
+        else
+        {
+            LogPrint("zmq", "  Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+            break;
+        }
+    }
+
+    if (i!=notifiers.end())
+    {
+        Shutdown();
+        return false;
+    }
+
+    return false;
+}
+
+// Called during shutdown sequence
+void CZMQNotificationInterface::Shutdown()
+{
+    LogPrint("zmq", "Shutdown notification interface\n");
+    if (pcontext)
+    {
+        for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
+        {
+            CZMQAbstractNotifier *notifier = *i;
+            LogPrint("zmq", "   Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
+            notifier->Shutdown();
+        }
+        zmq_ctx_destroy(pcontext);
+
+        pcontext = 0;
+    }
+}
+
+void CZMQNotificationInterface::UpdatedBlockTip(const uint256 &hash)
+{
+    for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
+    {
+        CZMQAbstractNotifier *notifier = *i;
+        if (notifier->NotifyBlock(hash))
+        {
+            i++;
+        }
+        else
+        {
+            notifier->Shutdown();
+            i = notifiers.erase(i);
+        }
+    }
+}
+
+void CZMQNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock)
+{
+    for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
+    {
+        CZMQAbstractNotifier *notifier = *i;
+        if (notifier->NotifyTransaction(tx))
+        {
+            i++;
+        }
+        else
+        {
+            notifier->Shutdown();
+            i = notifiers.erase(i);
+        }
+    }
+}
diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h
new file mode 100644
index 0000000000..afc0b8d24e
--- /dev/null
+++ b/src/zmq/zmqnotificationinterface.h
@@ -0,0 +1,35 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
+#define BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
+
+#include "validationinterface.h"
+#include <string>
+#include <map>
+
+class CZMQAbstractNotifier;
+
+class CZMQNotificationInterface : public CValidationInterface
+{
+public:
+    virtual ~CZMQNotificationInterface();
+
+    static CZMQNotificationInterface* CreateWithArguments(const std::map<std::string, std::string> &args);
+
+    bool Initialize();
+    void Shutdown();
+
+protected: // CValidationInterface
+    void SyncTransaction(const CTransaction &tx, const CBlock *pblock);
+    void UpdatedBlockTip(const uint256 &newHashTip);
+
+private:
+    CZMQNotificationInterface();
+
+    void *pcontext;
+    std::list<CZMQAbstractNotifier*> notifiers;
+};
+
+#endif // BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp
new file mode 100644
index 0000000000..0a6d7d0dbc
--- /dev/null
+++ b/src/zmq/zmqpublishnotifier.cpp
@@ -0,0 +1,172 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "zmqpublishnotifier.h"
+#include "main.h"
+#include "util.h"
+
+static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
+
+// Internal function to send multipart message
+static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
+{
+    va_list args;
+    va_start(args, size);
+
+    while (1)
+    {
+        zmq_msg_t msg;
+
+        int rc = zmq_msg_init_size(&msg, size);
+        if (rc != 0)
+        {
+            zmqError("Unable to initialize ZMQ msg");
+            return -1;
+        }
+
+        void *buf = zmq_msg_data(&msg);
+        memcpy(buf, data, size);
+
+        data = va_arg(args, const void*);
+
+        rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
+        if (rc == -1)
+        {
+            zmqError("Unable to send ZMQ msg");
+            zmq_msg_close(&msg);
+            return -1;
+        }
+
+        zmq_msg_close(&msg);
+
+        if (!data)
+            break;
+
+        size = va_arg(args, size_t);
+    }
+    return 0;
+}
+
+bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
+{
+    assert(!psocket);
+
+    // check if address is being used by other publish notifier
+    std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
+
+    if (i==mapPublishNotifiers.end())
+    {
+        psocket = zmq_socket(pcontext, ZMQ_PUB);
+        if (!psocket)
+        {
+            zmqError("Failed to create socket");
+            return false;
+        }
+
+        int rc = zmq_bind(psocket, address.c_str());
+        if (rc!=0)
+        {
+            zmqError("Failed to bind address");
+            return false;
+        }
+
+        // register this notifier for the address, so it can be reused for other publish notifier
+        mapPublishNotifiers.insert(std::make_pair(address, this));
+        return true;
+    }
+    else
+    {
+        LogPrint("zmq", "  Reuse socket for address %s\n", address);
+
+        psocket = i->second->psocket;
+        mapPublishNotifiers.insert(std::make_pair(address, this));
+
+        return true;
+    }
+}
+
+void CZMQAbstractPublishNotifier::Shutdown()
+{
+    assert(psocket);
+
+    int count = mapPublishNotifiers.count(address);
+
+    // remove this notifier from the list of publishers using this address
+    typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
+    std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
+
+    for (iterator it = iterpair.first; it != iterpair.second; ++it)
+    {
+        if (it->second==this)
+        {
+            mapPublishNotifiers.erase(it);
+            break;
+        }
+    }
+
+    if (count == 1)
+    {
+        LogPrint("zmq", "Close socket at address %s\n", address);
+        int linger = 0;
+        zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
+        zmq_close(psocket);
+    }
+
+    psocket = 0;
+}
+
+bool CZMQPublishHashBlockNotifier::NotifyBlock(const uint256 &hash)
+{
+    LogPrint("zmq", "Publish hash block %s\n", hash.GetHex());
+    char data[32];
+    for (unsigned int i = 0; i < 32; i++)
+        data[31 - i] = hash.begin()[i];
+    int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0);
+    return rc == 0;
+}
+
+bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
+{
+    uint256 hash = transaction.GetHash();
+    LogPrint("zmq", "Publish hash transaction %s\n", hash.GetHex());
+    char data[32];
+    for (unsigned int i = 0; i < 32; i++)
+        data[31 - i] = hash.begin()[i];
+    int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0);
+    return rc == 0;
+}
+
+bool CZMQPublishRawBlockNotifier::NotifyBlock(const uint256 &hash)
+{
+    LogPrint("zmq", "Publish raw block %s\n", hash.GetHex());
+
+    CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
+    {
+        LOCK(cs_main);
+
+        CBlock block;
+        CBlockIndex* pblockindex = mapBlockIndex[hash];
+
+        if(!ReadBlockFromDisk(block, pblockindex))
+        {
+            zmqError("Can't read block from disk");
+            return false;
+        }
+
+        ss << block;
+    }
+
+    int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0);
+    return rc == 0;
+}
+
+bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
+{
+    uint256 hash = transaction.GetHash();
+    LogPrint("zmq", "Publish raw transaction %s\n", hash.GetHex());
+    CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
+    ss << transaction;
+    int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0);
+    return rc == 0;
+}
diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h
new file mode 100644
index 0000000000..a0eb26f5e2
--- /dev/null
+++ b/src/zmq/zmqpublishnotifier.h
@@ -0,0 +1,41 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
+#define BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
+
+#include "zmqabstractnotifier.h"
+
+class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
+{
+public:
+    bool Initialize(void *pcontext);
+    void Shutdown();
+};
+
+class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+    bool NotifyBlock(const uint256 &hash);
+};
+
+class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+    bool NotifyTransaction(const CTransaction &transaction);
+};
+
+class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+    bool NotifyBlock(const uint256 &hash);
+};
+
+class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+    bool NotifyTransaction(const CTransaction &transaction);
+};
+
+#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
-- 
cgit v1.2.3