aboutsummaryrefslogtreecommitdiff
path: root/src/net.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/net.h')
-rw-r--r--src/net.h77
1 files changed, 38 insertions, 39 deletions
diff --git a/src/net.h b/src/net.h
index 7a32e21f4a..6ca402f71d 100644
--- a/src/net.h
+++ b/src/net.h
@@ -1,5 +1,5 @@
// Copyright (c) 2009-2010 Satoshi Nakamoto
-// Copyright (c) 2009-2015 The Bitcoin Core developers
+// Copyright (c) 2009-2016 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@@ -19,11 +19,14 @@
#include "streams.h"
#include "sync.h"
#include "uint256.h"
+#include "threadinterrupt.h"
#include <atomic>
#include <deque>
#include <stdint.h>
+#include <thread>
#include <memory>
+#include <condition_variable>
#ifndef WIN32
#include <arpa/inet.h>
@@ -101,6 +104,20 @@ class CTransaction;
class CNodeStats;
class CClientUIInterface;
+struct CSerializedNetMsg
+{
+ CSerializedNetMsg() = default;
+ CSerializedNetMsg(CSerializedNetMsg&&) = default;
+ CSerializedNetMsg& operator=(CSerializedNetMsg&&) = default;
+ // No copying, only moves.
+ CSerializedNetMsg(const CSerializedNetMsg& msg) = delete;
+ CSerializedNetMsg& operator=(const CSerializedNetMsg&) = delete;
+
+ std::vector<unsigned char> data;
+ std::string command;
+};
+
+
class CConnman
{
public:
@@ -128,8 +145,9 @@ public:
};
CConnman(uint64_t seed0, uint64_t seed1);
~CConnman();
- bool Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options options);
+ bool Start(CScheduler& scheduler, std::string& strNodeError, Options options);
void Stop();
+ void Interrupt();
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
bool GetNetworkActive() const { return fNetworkActive; };
void SetNetworkActive(bool active);
@@ -138,32 +156,7 @@ public:
bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
- template <typename... Args>
- void PushMessageWithVersionAndFlag(CNode* pnode, int nVersion, int flag, const std::string& sCommand, Args&&... args)
- {
- auto msg(BeginMessage(pnode, nVersion, flag, sCommand));
- ::SerializeMany(msg, std::forward<Args>(args)...);
- EndMessage(msg);
- PushMessage(pnode, msg, sCommand);
- }
-
- template <typename... Args>
- void PushMessageWithFlag(CNode* pnode, int flag, const std::string& sCommand, Args&&... args)
- {
- PushMessageWithVersionAndFlag(pnode, 0, flag, sCommand, std::forward<Args>(args)...);
- }
-
- template <typename... Args>
- void PushMessageWithVersion(CNode* pnode, int nVersion, const std::string& sCommand, Args&&... args)
- {
- PushMessageWithVersionAndFlag(pnode, nVersion, 0, sCommand, std::forward<Args>(args)...);
- }
-
- template <typename... Args>
- void PushMessage(CNode* pnode, const std::string& sCommand, Args&&... args)
- {
- PushMessageWithVersionAndFlag(pnode, 0, 0, sCommand, std::forward<Args>(args)...);
- }
+ void PushMessage(CNode* pnode, CSerializedNetMsg&& msg);
template<typename Callable>
bool ForEachNodeContinueIf(Callable&& func)
@@ -374,10 +367,6 @@ private:
unsigned int GetReceiveFloodSize() const;
- CDataStream BeginMessage(CNode* node, int nVersion, int flags, const std::string& sCommand);
- void PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand);
- void EndMessage(CDataStream& strm);
-
// Network stats
void RecordBytesRecv(uint64_t bytes);
void RecordBytesSent(uint64_t bytes);
@@ -417,7 +406,6 @@ private:
std::list<CNode*> vNodesDisconnected;
mutable CCriticalSection cs_vNodes;
std::atomic<NodeId> nLastNodeId;
- boost::condition_variable messageHandlerCondition;
/** Services this instance offers */
ServiceFlags nLocalServices;
@@ -434,6 +422,18 @@ private:
/** SipHasher seeds for deterministic randomness */
const uint64_t nSeed0, nSeed1;
+
+ std::condition_variable condMsgProc;
+ std::mutex mutexMsgProc;
+ std::atomic<bool> flagInterruptMsgProc;
+
+ CThreadInterrupt interruptNet;
+
+ std::thread threadDNSAddressSeed;
+ std::thread threadSocketHandler;
+ std::thread threadOpenAddedConnections;
+ std::thread threadOpenConnections;
+ std::thread threadMessageHandler;
};
extern std::unique_ptr<CConnman> g_connman;
void Discover(boost::thread_group& threadGroup);
@@ -460,8 +460,8 @@ struct CombinerAll
// Signals for message handling
struct CNodeSignals
{
- boost::signals2::signal<bool (CNode*, CConnman&), CombinerAll> ProcessMessages;
- boost::signals2::signal<bool (CNode*, CConnman&), CombinerAll> SendMessages;
+ boost::signals2::signal<bool (CNode*, CConnman&, std::atomic<bool>&), CombinerAll> ProcessMessages;
+ boost::signals2::signal<bool (CNode*, CConnman&, std::atomic<bool>&), CombinerAll> SendMessages;
boost::signals2::signal<void (CNode*, CConnman&)> InitializeNode;
boost::signals2::signal<void (NodeId, bool&)> FinalizeNode;
};
@@ -601,7 +601,7 @@ public:
size_t nSendSize; // total size of all vSendMsg entries
size_t nSendOffset; // offset inside the first vSendMsg already sent
uint64_t nSendBytes;
- std::deque<CSerializeData> vSendMsg;
+ std::deque<std::vector<unsigned char>> vSendMsg;
CCriticalSection cs_vSend;
std::deque<CInv> vRecvGetData;
@@ -628,9 +628,8 @@ public:
bool fOneShot;
bool fClient;
const bool fInbound;
- bool fNetworkNode;
bool fSuccessfullyConnected;
- bool fDisconnect;
+ std::atomic_bool fDisconnect;
// We use fRelayTxes for two purposes -
// a) it allows us to not relay tx invs before receiving the peer's version message
// b) the peer may tell us in its version message that we should not relay tx invs
@@ -771,7 +770,7 @@ public:
{
// The send version should always be explicitly set to
// INIT_PROTO_VERSION rather than using this value until the handshake
- // is complete. See PushMessageWithVersion().
+ // is complete.
assert(nSendVersion != 0);
return nSendVersion;
}