aboutsummaryrefslogtreecommitdiff
path: root/src/zmq/zmqnotificationinterface.cpp
diff options
context:
space:
mode:
authorJeff Garzik <jgarzik@bitpay.com>2015-09-16 09:12:07 -0400
committerJeff Garzik <jgarzik@bitpay.com>2015-09-16 09:12:07 -0400
commit13b828270a6f912aeb24dee87108c6bda83d1b96 (patch)
tree1c39d082db96075c308b470852e761086ff8fc65 /src/zmq/zmqnotificationinterface.cpp
parent9733bc99a0b967522ff47e00f4d8e408e6ee5c4f (diff)
parent029e278286cb861901c9cb8e1b84855ec1640aac (diff)
downloadbitcoin-13b828270a6f912aeb24dee87108c6bda83d1b96.tar.xz
Merge pull request #6103
Diffstat (limited to 'src/zmq/zmqnotificationinterface.cpp')
-rw-r--r--src/zmq/zmqnotificationinterface.cpp155
1 files changed, 155 insertions, 0 deletions
diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp
new file mode 100644
index 0000000000..71ccb59a4a
--- /dev/null
+++ b/src/zmq/zmqnotificationinterface.cpp
@@ -0,0 +1,155 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "zmqnotificationinterface.h"
+#include "zmqpublishnotifier.h"
+
+#include "version.h"
+#include "main.h"
+#include "streams.h"
+#include "util.h"
+
+void zmqError(const char *str)
+{
+ LogPrint("zmq", "Error: %s, errno=%s\n", str, zmq_strerror(errno));
+}
+
+CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL)
+{
+}
+
+CZMQNotificationInterface::~CZMQNotificationInterface()
+{
+ // ensure Shutdown if Initialize is called
+ assert(!pcontext);
+
+ for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
+ {
+ delete *i;
+ }
+}
+
+CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args)
+{
+ CZMQNotificationInterface* notificationInterface = NULL;
+ std::map<std::string, CZMQNotifierFactory> factories;
+ std::list<CZMQAbstractNotifier*> notifiers;
+
+ factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
+ factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
+ factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
+ factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
+
+ for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i)
+ {
+ std::map<std::string, std::string>::const_iterator j = args.find("-zmq" + i->first);
+ if (j!=args.end())
+ {
+ CZMQNotifierFactory factory = i->second;
+ std::string address = j->second;
+ CZMQAbstractNotifier *notifier = factory();
+ notifier->SetType(i->first);
+ notifier->SetAddress(address);
+ notifiers.push_back(notifier);
+ }
+ }
+
+ if (!notifiers.empty())
+ {
+ notificationInterface = new CZMQNotificationInterface();
+ notificationInterface->notifiers = notifiers;
+ }
+
+ return notificationInterface;
+}
+
+// Called at startup to conditionally set up ZMQ socket(s)
+bool CZMQNotificationInterface::Initialize()
+{
+ LogPrint("zmq", "Initialize notification interface\n");
+ assert(!pcontext);
+
+ pcontext = zmq_init(1);
+
+ if (!pcontext)
+ {
+ zmqError("Unable to initialize context");
+ return false;
+ }
+
+ std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
+ for (; i!=notifiers.end(); ++i)
+ {
+ CZMQAbstractNotifier *notifier = *i;
+ if (notifier->Initialize(pcontext))
+ {
+ LogPrint("zmq", " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ }
+ else
+ {
+ LogPrint("zmq", " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ break;
+ }
+ }
+
+ if (i!=notifiers.end())
+ {
+ Shutdown();
+ return false;
+ }
+
+ return false;
+}
+
+// Called during shutdown sequence
+void CZMQNotificationInterface::Shutdown()
+{
+ LogPrint("zmq", "Shutdown notification interface\n");
+ if (pcontext)
+ {
+ for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
+ {
+ CZMQAbstractNotifier *notifier = *i;
+ LogPrint("zmq", " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
+ notifier->Shutdown();
+ }
+ zmq_ctx_destroy(pcontext);
+
+ pcontext = 0;
+ }
+}
+
+void CZMQNotificationInterface::UpdatedBlockTip(const uint256 &hash)
+{
+ for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
+ {
+ CZMQAbstractNotifier *notifier = *i;
+ if (notifier->NotifyBlock(hash))
+ {
+ i++;
+ }
+ else
+ {
+ notifier->Shutdown();
+ i = notifiers.erase(i);
+ }
+ }
+}
+
+void CZMQNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock)
+{
+ for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
+ {
+ CZMQAbstractNotifier *notifier = *i;
+ if (notifier->NotifyTransaction(tx))
+ {
+ i++;
+ }
+ else
+ {
+ notifier->Shutdown();
+ i = notifiers.erase(i);
+ }
+ }
+}