aboutsummaryrefslogtreecommitdiff
path: root/src/zmq/zmqnotificationinterface.cpp
diff options
context:
space:
mode:
authorJeff Garzik <jgarzik@bitpay.com>2014-11-18 12:06:32 -0500
committerJoão Barbosa <joao@bitreserve.org>2015-09-16 11:01:35 +0100
commite6a14b64d665eb1fafd03a6bbc8d14597ce1c83c (patch)
treebfefc581188fb56413c6d29cde625b0395faebc5 /src/zmq/zmqnotificationinterface.cpp
parent1136879df8af2358efb706c5af886778fbd94989 (diff)
Add ZeroMQ support. Notify blocks and transactions via ZeroMQ
Continues Johnathan Corgan's work. Publishing multipart messages Bugfix: Add missing zmq header includes Bugfix: Adjust build system to link ZeroMQ code for Qt binaries
Diffstat (limited to 'src/zmq/zmqnotificationinterface.cpp')
-rw-r--r--src/zmq/zmqnotificationinterface.cpp155
1 files changed, 155 insertions, 0 deletions
diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp
new file mode 100644
index 0000000000..71ccb59a4a
--- /dev/null
+++ b/src/zmq/zmqnotificationinterface.cpp
@@ -0,0 +1,155 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "zmqnotificationinterface.h"
+#include "zmqpublishnotifier.h"
+
+#include "version.h"
+#include "main.h"
+#include "streams.h"
+#include "util.h"
+
+void zmqError(const char *str)
+{
+ LogPrint("zmq", "Error: %s, errno=%s\n", str, zmq_strerror(errno));
+}
+
+CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL)
+{
+}
+
+CZMQNotificationInterface::~CZMQNotificationInterface()
+{
+ // ensure Shutdown if Initialize is called
+ assert(!pcontext);
+
+ for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
+ {
+ delete *i;
+ }
+}
+
+CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args)
+{
+ CZMQNotificationInterface* notificationInterface = NULL;
+ std::map<std::string, CZMQNotifierFactory> factories;
+ std::list<CZMQAbstractNotifier*> notifiers;
+
+ factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
+ factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
+ factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
+ factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
+
+ for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i)
+ {
+ std::map<std::string, std::string>::const_iterator j = args.find("-zmq" + i->first);
+ if (j!=args.end())
+ {
+ CZMQNotifierFactory factory = i->second;
+ std::string address = j->second;
+ CZMQAbstractNotifier *notifier = factory();
+ notifier->SetType(i->first);
+ notifier->SetAddress(address);
+ notifiers.push_back(notifier);
+ }
+ }
+
+ if (!notifiers.empty())
+ {
+ notificationInterface = new CZMQNotificationInterface();
+ notificationInterface->notifiers = notifiers;
+ }
+
+ return notificationInterface;
+}
+
+// Called at startup to conditionally set up ZMQ socket(s)
+bool CZMQNotificationInterface::Initialize()
+{
+ LogPrint("zmq", "Initialize notification interface\n");
+ assert(!pcontext);
+
+ pcontext = zmq_init(1);
+
+ if (!pcontext)
+ {
+ zmqError("Unable to initialize context");
+ return false;
+ }
+
+ std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
+ for (; i!=notifiers.end(); ++i)
+ {
+ CZMQAbstractNotifier *notifier = *i;
+ if (notifier->Initialize(pcontext))
+ {
+ LogPrint("zmq", " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ }
+ else
+ {
+ LogPrint("zmq", " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ break;
+ }
+ }
+
+ if (i!=notifiers.end())
+ {
+ Shutdown();
+ return false;
+ }
+
+ return false;
+}
+
+// Called during shutdown sequence
+void CZMQNotificationInterface::Shutdown()
+{
+ LogPrint("zmq", "Shutdown notification interface\n");
+ if (pcontext)
+ {
+ for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
+ {
+ CZMQAbstractNotifier *notifier = *i;
+ LogPrint("zmq", " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
+ notifier->Shutdown();
+ }
+ zmq_ctx_destroy(pcontext);
+
+ pcontext = 0;
+ }
+}
+
+void CZMQNotificationInterface::UpdatedBlockTip(const uint256 &hash)
+{
+ for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
+ {
+ CZMQAbstractNotifier *notifier = *i;
+ if (notifier->NotifyBlock(hash))
+ {
+ i++;
+ }
+ else
+ {
+ notifier->Shutdown();
+ i = notifiers.erase(i);
+ }
+ }
+}
+
+void CZMQNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock)
+{
+ for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
+ {
+ CZMQAbstractNotifier *notifier = *i;
+ if (notifier->NotifyTransaction(tx))
+ {
+ i++;
+ }
+ else
+ {
+ notifier->Shutdown();
+ i = notifiers.erase(i);
+ }
+ }
+}