aboutsummaryrefslogtreecommitdiff
path: root/src/net.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net.cpp')
-rw-r--r--src/net.cpp2599
1 files changed, 1698 insertions, 901 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 42b3c30fb7..201914685c 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -1,19 +1,24 @@
// Copyright (c) 2009-2010 Satoshi Nakamoto
-// Copyright (c) 2009-2014 The Bitcoin developers
-// Distributed under the MIT/X11 software license, see the accompanying
+// Copyright (c) 2009-2017 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#if defined(HAVE_CONFIG_H)
-#include "config/bitcoin-config.h"
+#include <config/bitcoin-config.h>
#endif
-#include "net.h"
+#include <net.h>
-#include "addrman.h"
-#include "chainparams.h"
-#include "clientversion.h"
-#include "primitives/transaction.h"
-#include "ui_interface.h"
+#include <chainparams.h>
+#include <clientversion.h>
+#include <consensus/consensus.h>
+#include <crypto/common.h>
+#include <crypto/sha256.h>
+#include <primitives/transaction.h>
+#include <netbase.h>
+#include <scheduler.h>
+#include <ui_interface.h>
+#include <utilstrencodings.h>
#ifdef WIN32
#include <string.h>
@@ -28,16 +33,24 @@
#include <miniupnpc/upnperrors.h>
#endif
-#include <boost/filesystem.hpp>
-#include <boost/thread.hpp>
-// Dump addresses to peers.dat every 15 minutes (900s)
+#include <math.h>
+
+// Dump addresses to peers.dat and banlist.dat every 15 minutes (900s)
#define DUMP_ADDRESSES_INTERVAL 900
-#if !defined(HAVE_MSG_NOSIGNAL) && !defined(MSG_NOSIGNAL)
+// We add a random period time (0 to 1 seconds) to feeler connections to prevent synchronization.
+#define FEELER_SLEEP_WINDOW 1
+
+#if !defined(HAVE_MSG_NOSIGNAL)
#define MSG_NOSIGNAL 0
#endif
+// MSG_DONTWAIT is not available on some platforms, if it doesn't exist define it as 0
+#if !defined(HAVE_MSG_DONTWAIT)
+#define MSG_DONTWAIT 0
+#endif
+
// Fix for ancient MinGW versions, that don't have defined these in ws2tcpip.h.
// Todo: Can be removed when our pull-tester is upgraded to a modern MinGW version.
#ifdef WIN32
@@ -49,63 +62,32 @@
#endif
#endif
-using namespace boost;
-using namespace std;
-
-namespace {
- const int MAX_OUTBOUND_CONNECTIONS = 8;
-
- struct ListenSocket {
- SOCKET socket;
- bool whitelisted;
+/** Used to pass flags to the Bind() function */
+enum BindFlags {
+ BF_NONE = 0,
+ BF_EXPLICIT = (1U << 0),
+ BF_REPORT_ERROR = (1U << 1),
+ BF_WHITELIST = (1U << 2),
+};
- ListenSocket(SOCKET socket, bool whitelisted) : socket(socket), whitelisted(whitelisted) {}
- };
-}
+const static std::string NET_MESSAGE_COMMAND_OTHER = "*other*";
+static const uint64_t RANDOMIZER_ID_NETGROUP = 0x6c0edd8036ef4036ULL; // SHA256("netgroup")[0:8]
+static const uint64_t RANDOMIZER_ID_LOCALHOSTNONCE = 0xd93e69e2bbfa5735ULL; // SHA256("localhostnonce")[0:8]
//
// Global state variables
//
bool fDiscover = true;
bool fListen = true;
-uint64_t nLocalServices = NODE_NETWORK;
+bool fRelayTxes = true;
CCriticalSection cs_mapLocalHost;
-map<CNetAddr, LocalServiceInfo> mapLocalHost;
-static bool vfReachable[NET_MAX] = {};
+std::map<CNetAddr, LocalServiceInfo> mapLocalHost;
static bool vfLimited[NET_MAX] = {};
-static CNode* pnodeLocalHost = NULL;
-uint64_t nLocalHostNonce = 0;
-static std::vector<ListenSocket> vhListenSocket;
-CAddrMan addrman;
-int nMaxConnections = 125;
-bool fAddressesInitialized = false;
-
-vector<CNode*> vNodes;
-CCriticalSection cs_vNodes;
-map<CInv, CDataStream> mapRelay;
-deque<pair<int64_t, CInv> > vRelayExpiration;
-CCriticalSection cs_mapRelay;
-limitedmap<CInv, int64_t> mapAlreadyAskedFor(MAX_INV_SZ);
-
-static deque<string> vOneShots;
-CCriticalSection cs_vOneShots;
-
-set<CNetAddr> setservAddNodeAddresses;
-CCriticalSection cs_setservAddNodeAddresses;
-
-vector<std::string> vAddedNodes;
-CCriticalSection cs_vAddedNodes;
-
-NodeId nLastNodeId = 0;
-CCriticalSection cs_nLastNodeId;
+std::string strSubVersion;
-static CSemaphore *semOutbound = NULL;
+limitedmap<uint256, int64_t> mapAlreadyAskedFor(MAX_INV_SZ);
-// Signals for message handling
-static CNodeSignals g_signals;
-CNodeSignals& GetNodeSignals() { return g_signals; }
-
-void AddOneShot(string strDest)
+void CConnman::AddOneShot(const std::string& strDest)
{
LOCK(cs_vOneShots);
vOneShots.push_back(strDest);
@@ -113,7 +95,7 @@ void AddOneShot(string strDest)
unsigned short GetListenPort()
{
- return (unsigned short)(GetArg("-port", Params().GetDefaultPort()));
+ return (unsigned short)(gArgs.GetArg("-port", Params().GetDefaultPort()));
}
// find 'best' local address for a particular peer
@@ -126,13 +108,13 @@ bool GetLocal(CService& addr, const CNetAddr *paddrPeer)
int nBestReachability = -1;
{
LOCK(cs_mapLocalHost);
- for (map<CNetAddr, LocalServiceInfo>::iterator it = mapLocalHost.begin(); it != mapLocalHost.end(); it++)
+ for (const auto& entry : mapLocalHost)
{
- int nScore = (*it).second.nScore;
- int nReachability = (*it).first.GetReachabilityFrom(paddrPeer);
+ int nScore = entry.second.nScore;
+ int nReachability = entry.first.GetReachabilityFrom(paddrPeer);
if (nReachability > nBestReachability || (nReachability == nBestReachability && nScore > nBestScore))
{
- addr = CService((*it).first, (*it).second.nPort);
+ addr = CService(entry.first, entry.second.nPort);
nBestReachability = nReachability;
nBestScore = nScore;
}
@@ -141,73 +123,42 @@ bool GetLocal(CService& addr, const CNetAddr *paddrPeer)
return nBestScore >= 0;
}
+//! Convert the pnSeeds6 array into usable address objects.
+static std::vector<CAddress> convertSeed6(const std::vector<SeedSpec6> &vSeedsIn)
+{
+ // It'll only connect to one or two seed nodes because once it connects,
+ // it'll get a pile of addresses with newer timestamps.
+ // Seed nodes are given a random 'last seen time' of between one and two
+ // weeks ago.
+ const int64_t nOneWeek = 7*24*60*60;
+ std::vector<CAddress> vSeedsOut;
+ vSeedsOut.reserve(vSeedsIn.size());
+ for (const auto& seed_in : vSeedsIn) {
+ struct in6_addr ip;
+ memcpy(&ip, seed_in.addr, sizeof(ip));
+ CAddress addr(CService(ip, seed_in.port), GetDesirableServiceFlags(NODE_NONE));
+ addr.nTime = GetTime() - GetRand(nOneWeek) - nOneWeek;
+ vSeedsOut.push_back(addr);
+ }
+ return vSeedsOut;
+}
+
// get best local address for a particular peer as a CAddress
// Otherwise, return the unroutable 0.0.0.0 but filled in with
// the normal parameters, since the IP may be changed to a useful
// one by discovery.
-CAddress GetLocalAddress(const CNetAddr *paddrPeer)
+CAddress GetLocalAddress(const CNetAddr *paddrPeer, ServiceFlags nLocalServices)
{
- CAddress ret(CService("0.0.0.0",GetListenPort()),0);
+ CAddress ret(CService(CNetAddr(),GetListenPort()), nLocalServices);
CService addr;
if (GetLocal(addr, paddrPeer))
{
- ret = CAddress(addr);
+ ret = CAddress(addr, nLocalServices);
}
- ret.nServices = nLocalServices;
ret.nTime = GetAdjustedTime();
return ret;
}
-bool RecvLine(SOCKET hSocket, string& strLine)
-{
- strLine = "";
- while (true)
- {
- char c;
- int nBytes = recv(hSocket, &c, 1, 0);
- if (nBytes > 0)
- {
- if (c == '\n')
- continue;
- if (c == '\r')
- return true;
- strLine += c;
- if (strLine.size() >= 9000)
- return true;
- }
- else if (nBytes <= 0)
- {
- boost::this_thread::interruption_point();
- if (nBytes < 0)
- {
- int nErr = WSAGetLastError();
- if (nErr == WSAEMSGSIZE)
- continue;
- if (nErr == WSAEWOULDBLOCK || nErr == WSAEINTR || nErr == WSAEINPROGRESS)
- {
- MilliSleep(10);
- continue;
- }
- }
- if (!strLine.empty())
- return true;
- if (nBytes == 0)
- {
- // socket closed
- LogPrint("net", "socket closed\n");
- return false;
- }
- else
- {
- // socket error
- int nErr = WSAGetLastError();
- LogPrint("net", "recv failed: %s\n", NetworkErrorString(nErr));
- return false;
- }
- }
- }
-}
-
int GetnScore(const CService& addr)
{
LOCK(cs_mapLocalHost);
@@ -219,39 +170,34 @@ int GetnScore(const CService& addr)
// Is our peer's addrLocal potentially useful as an external IP source?
bool IsPeerAddrLocalGood(CNode *pnode)
{
- return fDiscover && pnode->addr.IsRoutable() && pnode->addrLocal.IsRoutable() &&
- !IsLimited(pnode->addrLocal.GetNetwork());
+ CService addrLocal = pnode->GetAddrLocal();
+ return fDiscover && pnode->addr.IsRoutable() && addrLocal.IsRoutable() &&
+ !IsLimited(addrLocal.GetNetwork());
}
// pushes our own address to a peer
-void AdvertizeLocal(CNode *pnode)
+void AdvertiseLocal(CNode *pnode)
{
if (fListen && pnode->fSuccessfullyConnected)
{
- CAddress addrLocal = GetLocalAddress(&pnode->addr);
+ CAddress addrLocal = GetLocalAddress(&pnode->addr, pnode->GetLocalServices());
// If discovery is enabled, sometimes give our peer the address it
// tells us that it sees us as in case it has a better idea of our
// address than we do.
if (IsPeerAddrLocalGood(pnode) && (!addrLocal.IsRoutable() ||
GetRand((GetnScore(addrLocal) > LOCAL_MANUAL) ? 8:2) == 0))
{
- addrLocal.SetIP(pnode->addrLocal);
+ addrLocal.SetIP(pnode->GetAddrLocal());
}
if (addrLocal.IsRoutable())
{
- pnode->PushAddress(addrLocal);
+ LogPrint(BCLog::NET, "AdvertiseLocal: advertising address %s\n", addrLocal.ToString());
+ FastRandomContext insecure_rand;
+ pnode->PushAddress(addrLocal, insecure_rand);
}
}
}
-void SetReachable(enum Network net, bool fFlag)
-{
- LOCK(cs_mapLocalHost);
- vfReachable[net] = fFlag;
- if (net == NET_IPV6 && fFlag)
- vfReachable[NET_IPV4] = true;
-}
-
// learn a new local address
bool AddLocal(const CService& addr, int nScore)
{
@@ -274,7 +220,6 @@ bool AddLocal(const CService& addr, int nScore)
info.nScore = nScore + (fAlready ? 1 : 0);
info.nPort = addr.GetPort();
}
- SetReachable(addr.GetNetwork());
}
return true;
@@ -285,10 +230,18 @@ bool AddLocal(const CNetAddr &addr, int nScore)
return AddLocal(CService(addr, GetListenPort()), nScore);
}
+bool RemoveLocal(const CService& addr)
+{
+ LOCK(cs_mapLocalHost);
+ LogPrintf("RemoveLocal(%s)\n", addr.ToString());
+ mapLocalHost.erase(addr);
+ return true;
+}
+
/** Make a particular network entirely off-limits (no automatic connects to it) */
void SetLimited(enum Network net, bool fLimited)
{
- if (net == NET_UNROUTABLE)
+ if (net == NET_UNROUTABLE || net == NET_INTERNAL)
return;
LOCK(cs_mapLocalHost);
vfLimited[net] = fLimited;
@@ -329,7 +282,7 @@ bool IsLocal(const CService& addr)
bool IsReachable(enum Network net)
{
LOCK(cs_mapLocalHost);
- return vfReachable[net] && !vfLimited[net];
+ return !vfLimited[net];
}
/** check whether a given address is in a network we can probably connect to */
@@ -339,179 +292,387 @@ bool IsReachable(const CNetAddr& addr)
return IsReachable(net);
}
-void AddressCurrentlyConnected(const CService& addr)
+
+CNode* CConnman::FindNode(const CNetAddr& ip)
{
- addrman.Connected(addr);
+ LOCK(cs_vNodes);
+ for (CNode* pnode : vNodes) {
+ if (static_cast<CNetAddr>(pnode->addr) == ip) {
+ return pnode;
+ }
+ }
+ return nullptr;
}
+CNode* CConnman::FindNode(const CSubNet& subNet)
+{
+ LOCK(cs_vNodes);
+ for (CNode* pnode : vNodes) {
+ if (subNet.Match(static_cast<CNetAddr>(pnode->addr))) {
+ return pnode;
+ }
+ }
+ return nullptr;
+}
-uint64_t CNode::nTotalBytesRecv = 0;
-uint64_t CNode::nTotalBytesSent = 0;
-CCriticalSection CNode::cs_totalBytesRecv;
-CCriticalSection CNode::cs_totalBytesSent;
-
-CNode* FindNode(const CNetAddr& ip)
+CNode* CConnman::FindNode(const std::string& addrName)
{
LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes)
- if ((CNetAddr)pnode->addr == ip)
- return (pnode);
- return NULL;
+ for (CNode* pnode : vNodes) {
+ if (pnode->GetAddrName() == addrName) {
+ return pnode;
+ }
+ }
+ return nullptr;
}
-CNode* FindNode(const std::string& addrName)
+CNode* CConnman::FindNode(const CService& addr)
{
LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode->addrName == addrName)
- return (pnode);
- return NULL;
+ for (CNode* pnode : vNodes) {
+ if (static_cast<CService>(pnode->addr) == addr) {
+ return pnode;
+ }
+ }
+ return nullptr;
}
-CNode* FindNode(const CService& addr)
+bool CConnman::CheckIncomingNonce(uint64_t nonce)
{
LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes)
- if ((CService)pnode->addr == addr)
- return (pnode);
- return NULL;
+ for (CNode* pnode : vNodes) {
+ if (!pnode->fSuccessfullyConnected && !pnode->fInbound && pnode->GetLocalNonce() == nonce)
+ return false;
+ }
+ return true;
}
-CNode* ConnectNode(CAddress addrConnect, const char *pszDest)
+/** Get the bind address for a socket as CAddress */
+static CAddress GetBindAddress(SOCKET sock)
{
- if (pszDest == NULL) {
+ CAddress addr_bind;
+ struct sockaddr_storage sockaddr_bind;
+ socklen_t sockaddr_bind_len = sizeof(sockaddr_bind);
+ if (sock != INVALID_SOCKET) {
+ if (!getsockname(sock, (struct sockaddr*)&sockaddr_bind, &sockaddr_bind_len)) {
+ addr_bind.SetSockAddr((const struct sockaddr*)&sockaddr_bind);
+ } else {
+ LogPrint(BCLog::NET, "Warning: getsockname failed\n");
+ }
+ }
+ return addr_bind;
+}
+
+CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure)
+{
+ if (pszDest == nullptr) {
if (IsLocal(addrConnect))
- return NULL;
+ return nullptr;
// Look for an existing connection
- CNode* pnode = FindNode((CService)addrConnect);
+ CNode* pnode = FindNode(static_cast<CService>(addrConnect));
if (pnode)
{
- pnode->AddRef();
- return pnode;
+ LogPrintf("Failed to open new connection, already connected\n");
+ return nullptr;
}
}
/// debug print
- LogPrint("net", "trying connection %s lastseen=%.1fhrs\n",
+ LogPrint(BCLog::NET, "trying connection %s lastseen=%.1fhrs\n",
pszDest ? pszDest : addrConnect.ToString(),
pszDest ? 0.0 : (double)(GetAdjustedTime() - addrConnect.nTime)/3600.0);
+ // Resolve
+ const int default_port = Params().GetDefaultPort();
+ if (pszDest) {
+ std::vector<CService> resolved;
+ if (Lookup(pszDest, resolved, default_port, fNameLookup && !HaveNameProxy(), 256) && !resolved.empty()) {
+ addrConnect = CAddress(resolved[GetRand(resolved.size())], NODE_NONE);
+ if (!addrConnect.IsValid()) {
+ LogPrint(BCLog::NET, "Resolver returned invalid address %s for %s", addrConnect.ToString(), pszDest);
+ return nullptr;
+ }
+ // It is possible that we already have a connection to the IP/port pszDest resolved to.
+ // In that case, drop the connection that was just created, and return the existing CNode instead.
+ // Also store the name we used to connect in that CNode, so that future FindNode() calls to that
+ // name catch this early.
+ LOCK(cs_vNodes);
+ CNode* pnode = FindNode(static_cast<CService>(addrConnect));
+ if (pnode)
+ {
+ pnode->MaybeSetAddrName(std::string(pszDest));
+ LogPrintf("Failed to open new connection, already connected\n");
+ return nullptr;
+ }
+ }
+ }
+
// Connect
- SOCKET hSocket;
- bool proxyConnectionFailed = false;
- if (pszDest ? ConnectSocketByName(addrConnect, hSocket, pszDest, Params().GetDefaultPort(), nConnectTimeout, &proxyConnectionFailed) :
- ConnectSocket(addrConnect, hSocket, nConnectTimeout, &proxyConnectionFailed))
- {
- addrman.Attempt(addrConnect);
+ bool connected = false;
+ SOCKET hSocket = INVALID_SOCKET;
+ proxyType proxy;
+ if (addrConnect.IsValid()) {
+ bool proxyConnectionFailed = false;
+
+ if (GetProxy(addrConnect.GetNetwork(), proxy)) {
+ hSocket = CreateSocket(proxy.proxy);
+ if (hSocket == INVALID_SOCKET) {
+ return nullptr;
+ }
+ connected = ConnectThroughProxy(proxy, addrConnect.ToStringIP(), addrConnect.GetPort(), hSocket, nConnectTimeout, &proxyConnectionFailed);
+ } else {
+ // no proxy needed (none set for target network)
+ hSocket = CreateSocket(addrConnect);
+ if (hSocket == INVALID_SOCKET) {
+ return nullptr;
+ }
+ connected = ConnectSocketDirectly(addrConnect, hSocket, nConnectTimeout);
+ }
+ if (!proxyConnectionFailed) {
+ // If a connection to the node was attempted, and failure (if any) is not caused by a problem connecting to
+ // the proxy, mark this as an attempt.
+ addrman.Attempt(addrConnect, fCountFailure);
+ }
+ } else if (pszDest && GetNameProxy(proxy)) {
+ hSocket = CreateSocket(proxy.proxy);
+ if (hSocket == INVALID_SOCKET) {
+ return nullptr;
+ }
+ std::string host;
+ int port = default_port;
+ SplitHostPort(std::string(pszDest), port, host);
+ connected = ConnectThroughProxy(proxy, host, port, hSocket, nConnectTimeout, nullptr);
+ }
+ if (!connected) {
+ CloseSocket(hSocket);
+ return nullptr;
+ }
- // Add node
- CNode* pnode = new CNode(hSocket, addrConnect, pszDest ? pszDest : "", false);
- pnode->AddRef();
+ // Add node
+ NodeId id = GetNewNodeId();
+ uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
+ CAddress addr_bind = GetBindAddress(hSocket);
+ CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, addr_bind, pszDest ? pszDest : "", false);
+ pnode->AddRef();
- {
- LOCK(cs_vNodes);
- vNodes.push_back(pnode);
- }
+ return pnode;
+}
+
+void CConnman::DumpBanlist()
+{
+ SweepBanned(); // clean unused entries (if bantime has expired)
+
+ if (!BannedSetIsDirty())
+ return;
- pnode->nTimeConnected = GetTime();
+ int64_t nStart = GetTimeMillis();
- return pnode;
- } else if (!proxyConnectionFailed) {
- // If connecting to the node failed, and failure is not caused by a problem connecting to
- // the proxy, mark this as an attempt.
- addrman.Attempt(addrConnect);
+ CBanDB bandb;
+ banmap_t banmap;
+ GetBanned(banmap);
+ if (bandb.Write(banmap)) {
+ SetBannedSetDirty(false);
}
- return NULL;
+ LogPrint(BCLog::NET, "Flushed %d banned node ips/subnets to banlist.dat %dms\n",
+ banmap.size(), GetTimeMillis() - nStart);
}
void CNode::CloseSocketDisconnect()
{
fDisconnect = true;
+ LOCK(cs_hSocket);
if (hSocket != INVALID_SOCKET)
{
- LogPrint("net", "disconnecting peer=%d\n", id);
+ LogPrint(BCLog::NET, "disconnecting peer=%d\n", id);
CloseSocket(hSocket);
}
+}
- // in case this fails, we'll empty the recv buffer when the CNode is deleted
- TRY_LOCK(cs_vRecvMsg, lockRecv);
- if (lockRecv)
- vRecvMsg.clear();
+void CConnman::ClearBanned()
+{
+ {
+ LOCK(cs_setBanned);
+ setBanned.clear();
+ setBannedIsDirty = true;
+ }
+ DumpBanlist(); //store banlist to disk
+ if(clientInterface)
+ clientInterface->BannedListChanged();
}
-void CNode::PushVersion()
+bool CConnman::IsBanned(CNetAddr ip)
{
- int nBestHeight = g_signals.GetHeight().get_value_or(0);
+ LOCK(cs_setBanned);
+ for (const auto& it : setBanned) {
+ CSubNet subNet = it.first;
+ CBanEntry banEntry = it.second;
- /// when NTP implemented, change to just nTime = GetAdjustedTime()
- int64_t nTime = (fInbound ? GetAdjustedTime() : GetTime());
- CAddress addrYou = (addr.IsRoutable() && !IsProxy(addr) ? addr : CAddress(CService("0.0.0.0",0)));
- CAddress addrMe = GetLocalAddress(&addr);
- GetRandBytes((unsigned char*)&nLocalHostNonce, sizeof(nLocalHostNonce));
- if (fLogIPs)
- LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nBestHeight, addrMe.ToString(), addrYou.ToString(), id);
- else
- LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nBestHeight, addrMe.ToString(), id);
- PushMessage("version", PROTOCOL_VERSION, nLocalServices, nTime, addrYou, addrMe,
- nLocalHostNonce, FormatSubVersion(CLIENT_NAME, CLIENT_VERSION, std::vector<string>()), nBestHeight, true);
+ if (subNet.Match(ip) && GetTime() < banEntry.nBanUntil) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool CConnman::IsBanned(CSubNet subnet)
+{
+ LOCK(cs_setBanned);
+ banmap_t::iterator i = setBanned.find(subnet);
+ if (i != setBanned.end())
+ {
+ CBanEntry banEntry = (*i).second;
+ if (GetTime() < banEntry.nBanUntil) {
+ return true;
+ }
+ }
+ return false;
}
+void CConnman::Ban(const CNetAddr& addr, const BanReason &banReason, int64_t bantimeoffset, bool sinceUnixEpoch) {
+ CSubNet subNet(addr);
+ Ban(subNet, banReason, bantimeoffset, sinceUnixEpoch);
+}
+void CConnman::Ban(const CSubNet& subNet, const BanReason &banReason, int64_t bantimeoffset, bool sinceUnixEpoch) {
+ CBanEntry banEntry(GetTime());
+ banEntry.banReason = banReason;
+ if (bantimeoffset <= 0)
+ {
+ bantimeoffset = gArgs.GetArg("-bantime", DEFAULT_MISBEHAVING_BANTIME);
+ sinceUnixEpoch = false;
+ }
+ banEntry.nBanUntil = (sinceUnixEpoch ? 0 : GetTime() )+bantimeoffset;
+ {
+ LOCK(cs_setBanned);
+ if (setBanned[subNet].nBanUntil < banEntry.nBanUntil) {
+ setBanned[subNet] = banEntry;
+ setBannedIsDirty = true;
+ }
+ else
+ return;
+ }
+ if(clientInterface)
+ clientInterface->BannedListChanged();
+ {
+ LOCK(cs_vNodes);
+ for (CNode* pnode : vNodes) {
+ if (subNet.Match(static_cast<CNetAddr>(pnode->addr)))
+ pnode->fDisconnect = true;
+ }
+ }
+ if(banReason == BanReasonManuallyAdded)
+ DumpBanlist(); //store banlist to disk immediately if user requested ban
+}
+
+bool CConnman::Unban(const CNetAddr &addr) {
+ CSubNet subNet(addr);
+ return Unban(subNet);
+}
+bool CConnman::Unban(const CSubNet &subNet) {
+ {
+ LOCK(cs_setBanned);
+ if (!setBanned.erase(subNet))
+ return false;
+ setBannedIsDirty = true;
+ }
+ if(clientInterface)
+ clientInterface->BannedListChanged();
+ DumpBanlist(); //store banlist to disk immediately
+ return true;
+}
-std::map<CNetAddr, int64_t> CNode::setBanned;
-CCriticalSection CNode::cs_setBanned;
+void CConnman::GetBanned(banmap_t &banMap)
+{
+ LOCK(cs_setBanned);
+ // Sweep the banlist so expired bans are not returned
+ SweepBanned();
+ banMap = setBanned; //create a thread safe copy
+}
-void CNode::ClearBanned()
+void CConnman::SetBanned(const banmap_t &banMap)
{
- setBanned.clear();
+ LOCK(cs_setBanned);
+ setBanned = banMap;
+ setBannedIsDirty = true;
}
-bool CNode::IsBanned(CNetAddr ip)
+void CConnman::SweepBanned()
{
- bool fResult = false;
+ int64_t now = GetTime();
+ bool notifyUI = false;
{
LOCK(cs_setBanned);
- std::map<CNetAddr, int64_t>::iterator i = setBanned.find(ip);
- if (i != setBanned.end())
+ banmap_t::iterator it = setBanned.begin();
+ while(it != setBanned.end())
{
- int64_t t = (*i).second;
- if (GetTime() < t)
- fResult = true;
+ CSubNet subNet = (*it).first;
+ CBanEntry banEntry = (*it).second;
+ if(now > banEntry.nBanUntil)
+ {
+ setBanned.erase(it++);
+ setBannedIsDirty = true;
+ notifyUI = true;
+ LogPrint(BCLog::NET, "%s: Removed banned node ip/subnet from banlist.dat: %s\n", __func__, subNet.ToString());
+ }
+ else
+ ++it;
}
}
- return fResult;
+ // update UI
+ if(notifyUI && clientInterface) {
+ clientInterface->BannedListChanged();
+ }
}
-bool CNode::Ban(const CNetAddr &addr) {
- int64_t banTime = GetTime()+GetArg("-bantime", 60*60*24); // Default 24-hour ban
- {
- LOCK(cs_setBanned);
- if (setBanned[addr] < banTime)
- setBanned[addr] = banTime;
- }
- return true;
+bool CConnman::BannedSetIsDirty()
+{
+ LOCK(cs_setBanned);
+ return setBannedIsDirty;
}
+void CConnman::SetBannedSetDirty(bool dirty)
+{
+ LOCK(cs_setBanned); //reuse setBanned lock for the isDirty flag
+ setBannedIsDirty = dirty;
+}
-std::vector<CSubNet> CNode::vWhitelistedRange;
-CCriticalSection CNode::cs_vWhitelistedRange;
-bool CNode::IsWhitelistedRange(const CNetAddr &addr) {
- LOCK(cs_vWhitelistedRange);
- BOOST_FOREACH(const CSubNet& subnet, vWhitelistedRange) {
+bool CConnman::IsWhitelistedRange(const CNetAddr &addr) {
+ for (const CSubNet& subnet : vWhitelistedRange) {
if (subnet.Match(addr))
return true;
}
return false;
}
-void CNode::AddWhitelistedRange(const CSubNet &subnet) {
- LOCK(cs_vWhitelistedRange);
- vWhitelistedRange.push_back(subnet);
+std::string CNode::GetAddrName() const {
+ LOCK(cs_addrName);
+ return addrName;
+}
+
+void CNode::MaybeSetAddrName(const std::string& addrNameIn) {
+ LOCK(cs_addrName);
+ if (addrName.empty()) {
+ addrName = addrNameIn;
+ }
+}
+
+CService CNode::GetAddrLocal() const {
+ LOCK(cs_addrLocal);
+ return addrLocal;
+}
+
+void CNode::SetAddrLocal(const CService& addrLocalIn) {
+ LOCK(cs_addrLocal);
+ if (addrLocal.IsValid()) {
+ error("Addr local already set for node: %i. Refusing to change from %s to %s", id, addrLocal.ToString(), addrLocalIn.ToString());
+ } else {
+ addrLocal = addrLocalIn;
+ }
}
#undef X
@@ -520,16 +681,35 @@ void CNode::copyStats(CNodeStats &stats)
{
stats.nodeid = this->GetId();
X(nServices);
+ X(addr);
+ X(addrBind);
+ {
+ LOCK(cs_filter);
+ X(fRelayTxes);
+ }
X(nLastSend);
X(nLastRecv);
X(nTimeConnected);
- X(addrName);
+ X(nTimeOffset);
+ stats.addrName = GetAddrName();
X(nVersion);
- X(cleanSubVer);
+ {
+ LOCK(cs_SubVer);
+ X(cleanSubVer);
+ }
X(fInbound);
+ X(m_manual_connection);
X(nStartingHeight);
- X(nSendBytes);
- X(nRecvBytes);
+ {
+ LOCK(cs_vSend);
+ X(mapSendBytesPerMsgCmd);
+ X(nSendBytes);
+ }
+ {
+ LOCK(cs_vRecv);
+ X(mapRecvBytesPerMsgCmd);
+ X(nRecvBytes);
+ }
X(fWhitelisted);
// It is common for nodes with good ping times to suddenly become lagged,
@@ -545,22 +725,28 @@ void CNode::copyStats(CNodeStats &stats)
// Raw ping time is in microseconds, but show it to user as whole seconds (Bitcoin users should be well used to small numbers with many decimal places by now :)
stats.dPingTime = (((double)nPingUsecTime) / 1e6);
+ stats.dMinPing = (((double)nMinPingUsecTime) / 1e6);
stats.dPingWait = (((double)nPingUsecWait) / 1e6);
// Leave string empty if addrLocal invalid (not filled in yet)
- stats.addrLocal = addrLocal.IsValid() ? addrLocal.ToString() : "";
+ CService addrLocalUnlocked = GetAddrLocal();
+ stats.addrLocal = addrLocalUnlocked.IsValid() ? addrLocalUnlocked.ToString() : "";
}
#undef X
-// requires LOCK(cs_vRecvMsg)
-bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes)
+bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
{
+ complete = false;
+ int64_t nTimeMicros = GetTimeMicros();
+ LOCK(cs_vRecv);
+ nLastRecv = nTimeMicros / 1000000;
+ nRecvBytes += nBytes;
while (nBytes > 0) {
// get current incomplete message, or create a new one
if (vRecvMsg.empty() ||
vRecvMsg.back().complete())
- vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion));
+ vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION));
CNetMessage& msg = vRecvMsg.back();
@@ -572,18 +758,61 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes)
handled = msg.readData(pch, nBytes);
if (handled < 0)
- return false;
+ return false;
+
+ if (msg.in_data && msg.hdr.nMessageSize > MAX_PROTOCOL_MESSAGE_LENGTH) {
+ LogPrint(BCLog::NET, "Oversized message from peer=%i, disconnecting\n", GetId());
+ return false;
+ }
pch += handled;
nBytes -= handled;
- if (msg.complete())
- msg.nTime = GetTimeMicros();
+ if (msg.complete()) {
+
+ //store received bytes per message command
+ //to prevent a memory DOS, only allow valid commands
+ mapMsgCmdSize::iterator i = mapRecvBytesPerMsgCmd.find(msg.hdr.pchCommand);
+ if (i == mapRecvBytesPerMsgCmd.end())
+ i = mapRecvBytesPerMsgCmd.find(NET_MESSAGE_COMMAND_OTHER);
+ assert(i != mapRecvBytesPerMsgCmd.end());
+ i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE;
+
+ msg.nTime = nTimeMicros;
+ complete = true;
+ }
}
return true;
}
+void CNode::SetSendVersion(int nVersionIn)
+{
+ // Send version may only be changed in the version message, and
+ // only one version message is allowed per session. We can therefore
+ // treat this value as const and even atomic as long as it's only used
+ // once a version message has been successfully processed. Any attempt to
+ // set this twice is an error.
+ if (nSendVersion != 0) {
+ error("Send version already set for node: %i. Refusing to change from %i to %i", id, nSendVersion, nVersionIn);
+ } else {
+ nSendVersion = nVersionIn;
+ }
+}
+
+int CNode::GetSendVersion() const
+{
+ // The send version should always be explicitly set to
+ // INIT_PROTO_VERSION rather than using this value until SetSendVersion
+ // has been called.
+ if (nSendVersion == 0) {
+ error("Requesting unset send version for node: %i. Using %i", id, INIT_PROTO_VERSION);
+ return INIT_PROTO_VERSION;
+ }
+ return nSendVersion;
+}
+
+
int CNetMessage::readHeader(const char *pch, unsigned int nBytes)
{
// copy data to temporary parsing buffer
@@ -601,13 +830,13 @@ int CNetMessage::readHeader(const char *pch, unsigned int nBytes)
try {
hdrbuf >> hdr;
}
- catch (const std::exception &) {
+ catch (const std::exception&) {
return -1;
}
// reject messages larger than MAX_SIZE
if (hdr.nMessageSize > MAX_SIZE)
- return -1;
+ return -1;
// switch state to reading message data
in_data = true;
@@ -625,12 +854,21 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes)
vRecv.resize(std::min(hdr.nMessageSize, nDataPos + nCopy + 256 * 1024));
}
+ hasher.Write((const unsigned char*)pch, nCopy);
memcpy(&vRecv[nDataPos], pch, nCopy);
nDataPos += nCopy;
return nCopy;
}
+const uint256& CNetMessage::GetMessageHash() const
+{
+ assert(complete());
+ if (data_hash.IsNull())
+ hasher.Finalize(data_hash.begin());
+ return data_hash;
+}
+
@@ -640,22 +878,30 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes)
// requires LOCK(cs_vSend)
-void SocketSendData(CNode *pnode)
+size_t CConnman::SocketSendData(CNode *pnode) const
{
- std::deque<CSerializeData>::iterator it = pnode->vSendMsg.begin();
+ auto it = pnode->vSendMsg.begin();
+ size_t nSentSize = 0;
while (it != pnode->vSendMsg.end()) {
- const CSerializeData &data = *it;
+ const auto &data = *it;
assert(data.size() > pnode->nSendOffset);
- int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
+ int nBytes = 0;
+ {
+ LOCK(pnode->cs_hSocket);
+ if (pnode->hSocket == INVALID_SOCKET)
+ break;
+ nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
+ }
if (nBytes > 0) {
- pnode->nLastSend = GetTime();
+ pnode->nLastSend = GetSystemTimeInSeconds();
pnode->nSendBytes += nBytes;
pnode->nSendOffset += nBytes;
- pnode->RecordBytesSent(nBytes);
+ nSentSize += nBytes;
if (pnode->nSendOffset == data.size()) {
pnode->nSendOffset = 0;
pnode->nSendSize -= data.size();
+ pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
it++;
} else {
// could not send full message; stop sending more
@@ -681,14 +927,231 @@ void SocketSendData(CNode *pnode)
assert(pnode->nSendSize == 0);
}
pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it);
+ return nSentSize;
+}
+
+struct NodeEvictionCandidate
+{
+ NodeId id;
+ int64_t nTimeConnected;
+ int64_t nMinPingUsecTime;
+ int64_t nLastBlockTime;
+ int64_t nLastTXTime;
+ bool fRelevantServices;
+ bool fRelayTxes;
+ bool fBloomFilter;
+ CAddress addr;
+ uint64_t nKeyedNetGroup;
+};
+
+static bool ReverseCompareNodeMinPingTime(const NodeEvictionCandidate &a, const NodeEvictionCandidate &b)
+{
+ return a.nMinPingUsecTime > b.nMinPingUsecTime;
+}
+
+static bool ReverseCompareNodeTimeConnected(const NodeEvictionCandidate &a, const NodeEvictionCandidate &b)
+{
+ return a.nTimeConnected > b.nTimeConnected;
}
-static list<CNode*> vNodesDisconnected;
+static bool CompareNetGroupKeyed(const NodeEvictionCandidate &a, const NodeEvictionCandidate &b) {
+ return a.nKeyedNetGroup < b.nKeyedNetGroup;
+}
-void ThreadSocketHandler()
+static bool CompareNodeBlockTime(const NodeEvictionCandidate &a, const NodeEvictionCandidate &b)
+{
+ // There is a fall-through here because it is common for a node to have many peers which have not yet relayed a block.
+ if (a.nLastBlockTime != b.nLastBlockTime) return a.nLastBlockTime < b.nLastBlockTime;
+ if (a.fRelevantServices != b.fRelevantServices) return b.fRelevantServices;
+ return a.nTimeConnected > b.nTimeConnected;
+}
+
+static bool CompareNodeTXTime(const NodeEvictionCandidate &a, const NodeEvictionCandidate &b)
+{
+ // There is a fall-through here because it is common for a node to have more than a few peers that have not yet relayed txn.
+ if (a.nLastTXTime != b.nLastTXTime) return a.nLastTXTime < b.nLastTXTime;
+ if (a.fRelayTxes != b.fRelayTxes) return b.fRelayTxes;
+ if (a.fBloomFilter != b.fBloomFilter) return a.fBloomFilter;
+ return a.nTimeConnected > b.nTimeConnected;
+}
+
+
+//! Sort an array by the specified comparator, then erase the last K elements.
+template<typename T, typename Comparator>
+static void EraseLastKElements(std::vector<T> &elements, Comparator comparator, size_t k)
+{
+ std::sort(elements.begin(), elements.end(), comparator);
+ size_t eraseSize = std::min(k, elements.size());
+ elements.erase(elements.end() - eraseSize, elements.end());
+}
+
+/** Try to find a connection to evict when the node is full.
+ * Extreme care must be taken to avoid opening the node to attacker
+ * triggered network partitioning.
+ * The strategy used here is to protect a small number of peers
+ * for each of several distinct characteristics which are difficult
+ * to forge. In order to partition a node the attacker must be
+ * simultaneously better at all of them than honest peers.
+ */
+bool CConnman::AttemptToEvictConnection()
+{
+ std::vector<NodeEvictionCandidate> vEvictionCandidates;
+ {
+ LOCK(cs_vNodes);
+
+ for (const CNode* node : vNodes) {
+ if (node->fWhitelisted)
+ continue;
+ if (!node->fInbound)
+ continue;
+ if (node->fDisconnect)
+ continue;
+ NodeEvictionCandidate candidate = {node->GetId(), node->nTimeConnected, node->nMinPingUsecTime,
+ node->nLastBlockTime, node->nLastTXTime,
+ HasAllDesirableServiceFlags(node->nServices),
+ node->fRelayTxes, node->pfilter != nullptr, node->addr, node->nKeyedNetGroup};
+ vEvictionCandidates.push_back(candidate);
+ }
+ }
+
+ // Protect connections with certain characteristics
+
+ // Deterministically select 4 peers to protect by netgroup.
+ // An attacker cannot predict which netgroups will be protected
+ EraseLastKElements(vEvictionCandidates, CompareNetGroupKeyed, 4);
+ // Protect the 8 nodes with the lowest minimum ping time.
+ // An attacker cannot manipulate this metric without physically moving nodes closer to the target.
+ EraseLastKElements(vEvictionCandidates, ReverseCompareNodeMinPingTime, 8);
+ // Protect 4 nodes that most recently sent us transactions.
+ // An attacker cannot manipulate this metric without performing useful work.
+ EraseLastKElements(vEvictionCandidates, CompareNodeTXTime, 4);
+ // Protect 4 nodes that most recently sent us blocks.
+ // An attacker cannot manipulate this metric without performing useful work.
+ EraseLastKElements(vEvictionCandidates, CompareNodeBlockTime, 4);
+ // Protect the half of the remaining nodes which have been connected the longest.
+ // This replicates the non-eviction implicit behavior, and precludes attacks that start later.
+ EraseLastKElements(vEvictionCandidates, ReverseCompareNodeTimeConnected, vEvictionCandidates.size() / 2);
+
+ if (vEvictionCandidates.empty()) return false;
+
+ // Identify the network group with the most connections and youngest member.
+ // (vEvictionCandidates is already sorted by reverse connect time)
+ uint64_t naMostConnections;
+ unsigned int nMostConnections = 0;
+ int64_t nMostConnectionsTime = 0;
+ std::map<uint64_t, std::vector<NodeEvictionCandidate> > mapNetGroupNodes;
+ for (const NodeEvictionCandidate &node : vEvictionCandidates) {
+ std::vector<NodeEvictionCandidate> &group = mapNetGroupNodes[node.nKeyedNetGroup];
+ group.push_back(node);
+ int64_t grouptime = group[0].nTimeConnected;
+
+ if (group.size() > nMostConnections || (group.size() == nMostConnections && grouptime > nMostConnectionsTime)) {
+ nMostConnections = group.size();
+ nMostConnectionsTime = grouptime;
+ naMostConnections = node.nKeyedNetGroup;
+ }
+ }
+
+ // Reduce to the network group with the most connections
+ vEvictionCandidates = std::move(mapNetGroupNodes[naMostConnections]);
+
+ // Disconnect from the network group with the most connections
+ NodeId evicted = vEvictionCandidates.front().id;
+ LOCK(cs_vNodes);
+ for (CNode* pnode : vNodes) {
+ if (pnode->GetId() == evicted) {
+ pnode->fDisconnect = true;
+ return true;
+ }
+ }
+ return false;
+}
+
+void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
+ struct sockaddr_storage sockaddr;
+ socklen_t len = sizeof(sockaddr);
+ SOCKET hSocket = accept(hListenSocket.socket, (struct sockaddr*)&sockaddr, &len);
+ CAddress addr;
+ int nInbound = 0;
+ int nMaxInbound = nMaxConnections - (nMaxOutbound + nMaxFeeler);
+
+ if (hSocket != INVALID_SOCKET) {
+ if (!addr.SetSockAddr((const struct sockaddr*)&sockaddr)) {
+ LogPrintf("Warning: Unknown socket family\n");
+ }
+ }
+
+ bool whitelisted = hListenSocket.whitelisted || IsWhitelistedRange(addr);
+ {
+ LOCK(cs_vNodes);
+ for (const CNode* pnode : vNodes) {
+ if (pnode->fInbound) nInbound++;
+ }
+ }
+
+ if (hSocket == INVALID_SOCKET)
+ {
+ int nErr = WSAGetLastError();
+ if (nErr != WSAEWOULDBLOCK)
+ LogPrintf("socket error accept failed: %s\n", NetworkErrorString(nErr));
+ return;
+ }
+
+ if (!fNetworkActive) {
+ LogPrintf("connection from %s dropped: not accepting new connections\n", addr.ToString());
+ CloseSocket(hSocket);
+ return;
+ }
+
+ if (!IsSelectableSocket(hSocket))
+ {
+ LogPrintf("connection from %s dropped: non-selectable socket\n", addr.ToString());
+ CloseSocket(hSocket);
+ return;
+ }
+
+ // According to the internet TCP_NODELAY is not carried into accepted sockets
+ // on all platforms. Set it again here just to be sure.
+ SetSocketNoDelay(hSocket);
+
+ if (IsBanned(addr) && !whitelisted)
+ {
+ LogPrint(BCLog::NET, "connection from %s dropped (banned)\n", addr.ToString());
+ CloseSocket(hSocket);
+ return;
+ }
+
+ if (nInbound >= nMaxInbound)
+ {
+ if (!AttemptToEvictConnection()) {
+ // No connection to evict, disconnect the new connection
+ LogPrint(BCLog::NET, "failed to find an eviction candidate - connection dropped (full)\n");
+ CloseSocket(hSocket);
+ return;
+ }
+ }
+
+ NodeId id = GetNewNodeId();
+ uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
+ CAddress addr_bind = GetBindAddress(hSocket);
+
+ CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, addr_bind, "", true);
+ pnode->AddRef();
+ pnode->fWhitelisted = whitelisted;
+ m_msgproc->InitializeNode(pnode);
+
+ LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToString());
+
+ {
+ LOCK(cs_vNodes);
+ vNodes.push_back(pnode);
+ }
+}
+
+void CConnman::ThreadSocketHandler()
{
unsigned int nPrevNodeCount = 0;
- while (true)
+ while (!interruptNet)
{
//
// Disconnect nodes
@@ -696,11 +1159,10 @@ void ThreadSocketHandler()
{
LOCK(cs_vNodes);
// Disconnect unused nodes
- vector<CNode*> vNodesCopy = vNodes;
- BOOST_FOREACH(CNode* pnode, vNodesCopy)
+ std::vector<CNode*> vNodesCopy = vNodes;
+ for (CNode* pnode : vNodesCopy)
{
- if (pnode->fDisconnect ||
- (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty()))
+ if (pnode->fDisconnect)
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
@@ -712,45 +1174,44 @@ void ThreadSocketHandler()
pnode->CloseSocketDisconnect();
// hold in disconnected pool until all refs are released
- if (pnode->fNetworkNode || pnode->fInbound)
- pnode->Release();
+ pnode->Release();
vNodesDisconnected.push_back(pnode);
}
}
}
{
// Delete disconnected nodes
- list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected;
- BOOST_FOREACH(CNode* pnode, vNodesDisconnectedCopy)
+ std::list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected;
+ for (CNode* pnode : vNodesDisconnectedCopy)
{
// wait until threads are done using it
- if (pnode->GetRefCount() <= 0)
- {
+ if (pnode->GetRefCount() <= 0) {
bool fDelete = false;
{
- TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend)
- {
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
- {
- TRY_LOCK(pnode->cs_inventory, lockInv);
- if (lockInv)
- fDelete = true;
+ TRY_LOCK(pnode->cs_inventory, lockInv);
+ if (lockInv) {
+ TRY_LOCK(pnode->cs_vSend, lockSend);
+ if (lockSend) {
+ fDelete = true;
}
}
}
- if (fDelete)
- {
+ if (fDelete) {
vNodesDisconnected.remove(pnode);
- delete pnode;
+ DeleteNode(pnode);
}
}
}
}
- if(vNodes.size() != nPrevNodeCount) {
- nPrevNodeCount = vNodes.size();
- uiInterface.NotifyNumConnectionsChanged(nPrevNodeCount);
+ size_t vNodesSize;
+ {
+ LOCK(cs_vNodes);
+ vNodesSize = vNodes.size();
+ }
+ if(vNodesSize != nPrevNodeCount) {
+ nPrevNodeCount = vNodesSize;
+ if(clientInterface)
+ clientInterface->NotifyNumConnectionsChanged(nPrevNodeCount);
}
//
@@ -769,57 +1230,56 @@ void ThreadSocketHandler()
SOCKET hSocketMax = 0;
bool have_fds = false;
- BOOST_FOREACH(const ListenSocket& hListenSocket, vhListenSocket) {
+ for (const ListenSocket& hListenSocket : vhListenSocket) {
FD_SET(hListenSocket.socket, &fdsetRecv);
- hSocketMax = max(hSocketMax, hListenSocket.socket);
+ hSocketMax = std::max(hSocketMax, hListenSocket.socket);
have_fds = true;
}
{
LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes)
+ for (CNode* pnode : vNodes)
{
- if (pnode->hSocket == INVALID_SOCKET)
- continue;
- FD_SET(pnode->hSocket, &fdsetError);
- hSocketMax = max(hSocketMax, pnode->hSocket);
- have_fds = true;
-
// Implement the following logic:
// * If there is data to send, select() for sending data. As this only
// happens when optimistic write failed, we choose to first drain the
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
- // * Otherwise, if there is no (complete) message in the receive buffer,
- // or there is space left in the buffer, select() for receiving data.
- // * (if neither of the above applies, there is certainly one message
- // in the receiver buffer ready to be processed).
- // Together, that means that at least one of the following is always possible,
- // so we don't deadlock:
- // * We send some data.
- // * We wait for data to be received (and disconnect after timeout).
- // * We process a message in the buffer (message handler thread).
+ // * Otherwise, if there is space left in the receive buffer, select() for
+ // receiving data.
+ // * Hand off all complete messages to the processor, to be handled without
+ // blocking here.
+
+ bool select_recv = !pnode->fPauseRecv;
+ bool select_send;
{
- TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend && !pnode->vSendMsg.empty()) {
- FD_SET(pnode->hSocket, &fdsetSend);
- continue;
- }
+ LOCK(pnode->cs_vSend);
+ select_send = !pnode->vSendMsg.empty();
}
- {
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv && (
- pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
- pnode->GetTotalRecvSize() <= ReceiveFloodSize()))
- FD_SET(pnode->hSocket, &fdsetRecv);
+
+ LOCK(pnode->cs_hSocket);
+ if (pnode->hSocket == INVALID_SOCKET)
+ continue;
+
+ FD_SET(pnode->hSocket, &fdsetError);
+ hSocketMax = std::max(hSocketMax, pnode->hSocket);
+ have_fds = true;
+
+ if (select_send) {
+ FD_SET(pnode->hSocket, &fdsetSend);
+ continue;
+ }
+ if (select_recv) {
+ FD_SET(pnode->hSocket, &fdsetRecv);
}
}
}
int nSelect = select(have_fds ? hSocketMax + 1 : 0,
&fdsetRecv, &fdsetSend, &fdsetError, &timeout);
- boost::this_thread::interruption_point();
+ if (interruptNet)
+ return;
if (nSelect == SOCKET_ERROR)
{
@@ -832,117 +1292,101 @@ void ThreadSocketHandler()
}
FD_ZERO(&fdsetSend);
FD_ZERO(&fdsetError);
- MilliSleep(timeout.tv_usec/1000);
+ if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
+ return;
}
//
// Accept new connections
//
- BOOST_FOREACH(const ListenSocket& hListenSocket, vhListenSocket)
+ for (const ListenSocket& hListenSocket : vhListenSocket)
{
if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv))
{
- struct sockaddr_storage sockaddr;
- socklen_t len = sizeof(sockaddr);
- SOCKET hSocket = accept(hListenSocket.socket, (struct sockaddr*)&sockaddr, &len);
- CAddress addr;
- int nInbound = 0;
-
- if (hSocket != INVALID_SOCKET)
- if (!addr.SetSockAddr((const struct sockaddr*)&sockaddr))
- LogPrintf("Warning: Unknown socket family\n");
-
- bool whitelisted = hListenSocket.whitelisted || CNode::IsWhitelistedRange(addr);
- {
- LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode->fInbound)
- nInbound++;
- }
-
- if (hSocket == INVALID_SOCKET)
- {
- int nErr = WSAGetLastError();
- if (nErr != WSAEWOULDBLOCK)
- LogPrintf("socket error accept failed: %s\n", NetworkErrorString(nErr));
- }
- else if (nInbound >= nMaxConnections - MAX_OUTBOUND_CONNECTIONS)
- {
- CloseSocket(hSocket);
- }
- else if (CNode::IsBanned(addr) && !whitelisted)
- {
- LogPrintf("connection from %s dropped (banned)\n", addr.ToString());
- CloseSocket(hSocket);
- }
- else
- {
- CNode* pnode = new CNode(hSocket, addr, "", true);
- pnode->AddRef();
- pnode->fWhitelisted = whitelisted;
-
- {
- LOCK(cs_vNodes);
- vNodes.push_back(pnode);
- }
- }
+ AcceptConnection(hListenSocket);
}
}
//
// Service each socket
//
- vector<CNode*> vNodesCopy;
+ std::vector<CNode*> vNodesCopy;
{
LOCK(cs_vNodes);
vNodesCopy = vNodes;
- BOOST_FOREACH(CNode* pnode, vNodesCopy)
+ for (CNode* pnode : vNodesCopy)
pnode->AddRef();
}
- BOOST_FOREACH(CNode* pnode, vNodesCopy)
+ for (CNode* pnode : vNodesCopy)
{
- boost::this_thread::interruption_point();
+ if (interruptNet)
+ return;
//
// Receive
//
- if (pnode->hSocket == INVALID_SOCKET)
- continue;
- if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
+ bool recvSet = false;
+ bool sendSet = false;
+ bool errorSet = false;
{
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
+ LOCK(pnode->cs_hSocket);
+ if (pnode->hSocket == INVALID_SOCKET)
+ continue;
+ recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv);
+ sendSet = FD_ISSET(pnode->hSocket, &fdsetSend);
+ errorSet = FD_ISSET(pnode->hSocket, &fdsetError);
+ }
+ if (recvSet || errorSet)
+ {
+ // typical socket buffer is 8K-64K
+ char pchBuf[0x10000];
+ int nBytes = 0;
{
- {
- // typical socket buffer is 8K-64K
- char pchBuf[0x10000];
- int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
- if (nBytes > 0)
- {
- if (!pnode->ReceiveMsgBytes(pchBuf, nBytes))
- pnode->CloseSocketDisconnect();
- pnode->nLastRecv = GetTime();
- pnode->nRecvBytes += nBytes;
- pnode->RecordBytesRecv(nBytes);
+ LOCK(pnode->cs_hSocket);
+ if (pnode->hSocket == INVALID_SOCKET)
+ continue;
+ nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
+ }
+ if (nBytes > 0)
+ {
+ bool notify = false;
+ if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
+ pnode->CloseSocketDisconnect();
+ RecordBytesRecv(nBytes);
+ if (notify) {
+ size_t nSizeAdded = 0;
+ auto it(pnode->vRecvMsg.begin());
+ for (; it != pnode->vRecvMsg.end(); ++it) {
+ if (!it->complete())
+ break;
+ nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
}
- else if (nBytes == 0)
{
- // socket closed gracefully
- if (!pnode->fDisconnect)
- LogPrint("net", "socket closed\n");
- pnode->CloseSocketDisconnect();
- }
- else if (nBytes < 0)
- {
- // error
- int nErr = WSAGetLastError();
- if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
- {
- if (!pnode->fDisconnect)
- LogPrintf("socket recv error %s\n", NetworkErrorString(nErr));
- pnode->CloseSocketDisconnect();
- }
+ LOCK(pnode->cs_vProcessMsg);
+ pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
+ pnode->nProcessQueueSize += nSizeAdded;
+ pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
}
+ WakeMessageHandler();
+ }
+ }
+ else if (nBytes == 0)
+ {
+ // socket closed gracefully
+ if (!pnode->fDisconnect) {
+ LogPrint(BCLog::NET, "socket closed\n");
+ }
+ pnode->CloseSocketDisconnect();
+ }
+ else if (nBytes < 0)
+ {
+ // error
+ int nErr = WSAGetLastError();
+ if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
+ {
+ if (!pnode->fDisconnect)
+ LogPrintf("socket recv error %s\n", NetworkErrorString(nErr));
+ pnode->CloseSocketDisconnect();
}
}
}
@@ -950,24 +1394,24 @@ void ThreadSocketHandler()
//
// Send
//
- if (pnode->hSocket == INVALID_SOCKET)
- continue;
- if (FD_ISSET(pnode->hSocket, &fdsetSend))
+ if (sendSet)
{
- TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend)
- SocketSendData(pnode);
+ LOCK(pnode->cs_vSend);
+ size_t nBytes = SocketSendData(pnode);
+ if (nBytes) {
+ RecordBytesSent(nBytes);
+ }
}
//
// Inactivity checking
//
- int64_t nTime = GetTime();
+ int64_t nTime = GetSystemTimeInSeconds();
if (nTime - pnode->nTimeConnected > 60)
{
if (pnode->nLastRecv == 0 || pnode->nLastSend == 0)
{
- LogPrint("net", "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->id);
+ LogPrint(BCLog::NET, "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->GetId());
pnode->fDisconnect = true;
}
else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL)
@@ -985,18 +1429,29 @@ void ThreadSocketHandler()
LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart));
pnode->fDisconnect = true;
}
+ else if (!pnode->fSuccessfullyConnected)
+ {
+ LogPrint(BCLog::NET, "version handshake timeout from %d\n", pnode->GetId());
+ pnode->fDisconnect = true;
+ }
}
}
{
LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodesCopy)
+ for (CNode* pnode : vNodesCopy)
pnode->Release();
}
}
}
-
-
+void CConnman::WakeMessageHandler()
+{
+ {
+ std::lock_guard<std::mutex> lock(mutexMsgProc);
+ fMsgProcWake = true;
+ }
+ condMsgProc.notify_one();
+}
@@ -1004,21 +1459,27 @@ void ThreadSocketHandler()
#ifdef USE_UPNP
+static CThreadInterrupt g_upnp_interrupt;
+static std::thread g_upnp_thread;
void ThreadMapPort()
{
std::string port = strprintf("%u", GetListenPort());
- const char * multicastif = 0;
- const char * minissdpdpath = 0;
- struct UPNPDev * devlist = 0;
+ const char * multicastif = nullptr;
+ const char * minissdpdpath = nullptr;
+ struct UPNPDev * devlist = nullptr;
char lanaddr[64];
#ifndef UPNPDISCOVER_SUCCESS
/* miniupnpc 1.5 */
devlist = upnpDiscover(2000, multicastif, minissdpdpath, 0);
-#else
+#elif MINIUPNPC_API_VERSION < 14
/* miniupnpc 1.6 */
int error = 0;
devlist = upnpDiscover(2000, multicastif, minissdpdpath, 0, 0, &error);
+#else
+ /* miniupnpc 1.9.20150730 */
+ int error = 0;
+ devlist = upnpDiscover(2000, multicastif, minissdpdpath, 0, 0, 2, &error);
#endif
struct UPNPUrls urls;
@@ -1037,76 +1498,83 @@ void ThreadMapPort()
{
if(externalIPAddress[0])
{
- LogPrintf("UPnP: ExternalIPAddress = %s\n", externalIPAddress);
- AddLocal(CNetAddr(externalIPAddress), LOCAL_UPNP);
+ CNetAddr resolved;
+ if(LookupHost(externalIPAddress, resolved, false)) {
+ LogPrintf("UPnP: ExternalIPAddress = %s\n", resolved.ToString().c_str());
+ AddLocal(resolved, LOCAL_UPNP);
+ }
}
else
LogPrintf("UPnP: GetExternalIPAddress failed.\n");
}
}
- string strDesc = "Bitcoin " + FormatFullVersion();
+ std::string strDesc = "Bitcoin " + FormatFullVersion();
- try {
- while (true) {
+ do {
#ifndef UPNPDISCOVER_SUCCESS
- /* miniupnpc 1.5 */
- r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype,
- port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0);
+ /* miniupnpc 1.5 */
+ r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype,
+ port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0);
#else
- /* miniupnpc 1.6 */
- r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype,
- port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0, "0");
+ /* miniupnpc 1.6 */
+ r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype,
+ port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0, "0");
#endif
- if(r!=UPNPCOMMAND_SUCCESS)
- LogPrintf("AddPortMapping(%s, %s, %s) failed with code %d (%s)\n",
- port, port, lanaddr, r, strupnperror(r));
- else
- LogPrintf("UPnP Port Mapping successful.\n");;
-
- MilliSleep(20*60*1000); // Refresh every 20 minutes
- }
- }
- catch (boost::thread_interrupted)
- {
- r = UPNP_DeletePortMapping(urls.controlURL, data.first.servicetype, port.c_str(), "TCP", 0);
- LogPrintf("UPNP_DeletePortMapping() returned : %d\n", r);
- freeUPNPDevlist(devlist); devlist = 0;
- FreeUPNPUrls(&urls);
- throw;
+ if(r!=UPNPCOMMAND_SUCCESS)
+ LogPrintf("AddPortMapping(%s, %s, %s) failed with code %d (%s)\n",
+ port, port, lanaddr, r, strupnperror(r));
+ else
+ LogPrintf("UPnP Port Mapping successful.\n");
}
+ while(g_upnp_interrupt.sleep_for(std::chrono::minutes(20)));
+
+ r = UPNP_DeletePortMapping(urls.controlURL, data.first.servicetype, port.c_str(), "TCP", 0);
+ LogPrintf("UPNP_DeletePortMapping() returned: %d\n", r);
+ freeUPNPDevlist(devlist); devlist = nullptr;
+ FreeUPNPUrls(&urls);
} else {
LogPrintf("No valid UPnP IGDs found\n");
- freeUPNPDevlist(devlist); devlist = 0;
+ freeUPNPDevlist(devlist); devlist = nullptr;
if (r != 0)
FreeUPNPUrls(&urls);
}
}
-void MapPort(bool fUseUPnP)
+void StartMapPort()
{
- static boost::thread* upnp_thread = NULL;
+ if (!g_upnp_thread.joinable()) {
+ assert(!g_upnp_interrupt);
+ g_upnp_thread = std::thread((std::bind(&TraceThread<void (*)()>, "upnp", &ThreadMapPort)));
+ }
+}
- if (fUseUPnP)
- {
- if (upnp_thread) {
- upnp_thread->interrupt();
- upnp_thread->join();
- delete upnp_thread;
- }
- upnp_thread = new boost::thread(boost::bind(&TraceThread<void (*)()>, "upnp", &ThreadMapPort));
+void InterruptMapPort()
+{
+ if(g_upnp_thread.joinable()) {
+ g_upnp_interrupt();
}
- else if (upnp_thread) {
- upnp_thread->interrupt();
- upnp_thread->join();
- delete upnp_thread;
- upnp_thread = NULL;
+}
+
+void StopMapPort()
+{
+ if(g_upnp_thread.joinable()) {
+ g_upnp_thread.join();
+ g_upnp_interrupt.reset();
}
}
#else
-void MapPort(bool)
+void StartMapPort()
+{
+ // Intentionally left blank.
+}
+void InterruptMapPort()
+{
+ // Intentionally left blank.
+}
+void StopMapPort()
{
// Intentionally left blank.
}
@@ -1117,43 +1585,64 @@ void MapPort(bool)
-void ThreadDNSAddressSeed()
+void CConnman::ThreadDNSAddressSeed()
{
// goal: only query DNS seeds if address need is acute
+ // Avoiding DNS seeds when we don't need them improves user privacy by
+ // creating fewer identifying DNS requests, reduces trust by giving seeds
+ // less influence on the network topology, and reduces traffic to the seeds.
if ((addrman.size() > 0) &&
- (!GetBoolArg("-forcednsseed", false))) {
- MilliSleep(11 * 1000);
+ (!gArgs.GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) {
+ if (!interruptNet.sleep_for(std::chrono::seconds(11)))
+ return;
LOCK(cs_vNodes);
- if (vNodes.size() >= 2) {
+ int nRelevant = 0;
+ for (auto pnode : vNodes) {
+ nRelevant += pnode->fSuccessfullyConnected && !pnode->fFeeler && !pnode->fOneShot && !pnode->m_manual_connection && !pnode->fInbound;
+ }
+ if (nRelevant >= 2) {
LogPrintf("P2P peers available. Skipped DNS seeding.\n");
return;
}
}
- const vector<CDNSSeedData> &vSeeds = Params().DNSSeeds();
+ const std::vector<std::string> &vSeeds = Params().DNSSeeds();
int found = 0;
LogPrintf("Loading addresses from DNS seeds (could take a while)\n");
- BOOST_FOREACH(const CDNSSeedData &seed, vSeeds) {
+ for (const std::string &seed : vSeeds) {
+ if (interruptNet) {
+ return;
+ }
if (HaveNameProxy()) {
- AddOneShot(seed.host);
+ AddOneShot(seed);
} else {
- vector<CNetAddr> vIPs;
- vector<CAddress> vAdd;
- if (LookupHost(seed.host.c_str(), vIPs))
+ std::vector<CNetAddr> vIPs;
+ std::vector<CAddress> vAdd;
+ ServiceFlags requiredServiceBits = GetDesirableServiceFlags(NODE_NONE);
+ std::string host = strprintf("x%x.%s", requiredServiceBits, seed);
+ CNetAddr resolveSource;
+ if (!resolveSource.SetInternal(host)) {
+ continue;
+ }
+ if (LookupHost(host.c_str(), vIPs, 0, true))
{
- BOOST_FOREACH(CNetAddr& ip, vIPs)
+ for (const CNetAddr& ip : vIPs)
{
int nOneDay = 24*3600;
- CAddress addr = CAddress(CService(ip, Params().GetDefaultPort()));
+ CAddress addr = CAddress(CService(ip, Params().GetDefaultPort()), requiredServiceBits);
addr.nTime = GetTime() - 3*nOneDay - GetRand(4*nOneDay); // use a random age between 3 and 7 days old
vAdd.push_back(addr);
found++;
}
+ addrman.Add(vAdd, resolveSource);
+ } else {
+ // We now avoid directly using results from DNS Seeds which do not support service bit filtering,
+ // instead using them as a oneshot to get nodes with our desired service bits.
+ AddOneShot(seed);
}
- addrman.Add(vAdd, CNetAddr(seed.name, true));
}
}
@@ -1171,20 +1660,26 @@ void ThreadDNSAddressSeed()
-void DumpAddresses()
+void CConnman::DumpAddresses()
{
int64_t nStart = GetTimeMillis();
CAddrDB adb;
adb.Write(addrman);
- LogPrint("net", "Flushed %d addresses to peers.dat %dms\n",
+ LogPrint(BCLog::NET, "Flushed %d addresses to peers.dat %dms\n",
addrman.size(), GetTimeMillis() - nStart);
}
-void static ProcessOneShot()
+void CConnman::DumpData()
+{
+ DumpAddresses();
+ DumpBanlist();
+}
+
+void CConnman::ProcessOneShot()
{
- string strDest;
+ std::string strDest;
{
LOCK(cs_vOneShots);
if (vOneShots.empty())
@@ -1195,49 +1690,88 @@ void static ProcessOneShot()
CAddress addr;
CSemaphoreGrant grant(*semOutbound, true);
if (grant) {
- if (!OpenNetworkConnection(addr, &grant, strDest.c_str(), true))
- AddOneShot(strDest);
+ OpenNetworkConnection(addr, false, &grant, strDest.c_str(), true);
}
}
-void ThreadOpenConnections()
+bool CConnman::GetTryNewOutboundPeer()
+{
+ return m_try_another_outbound_peer;
+}
+
+void CConnman::SetTryNewOutboundPeer(bool flag)
+{
+ m_try_another_outbound_peer = flag;
+ LogPrint(BCLog::NET, "net: setting try another outbound peer=%s\n", flag ? "true" : "false");
+}
+
+// Return the number of peers we have over our outbound connection limit
+// Exclude peers that are marked for disconnect, or are going to be
+// disconnected soon (eg one-shots and feelers)
+// Also exclude peers that haven't finished initial connection handshake yet
+// (so that we don't decide we're over our desired connection limit, and then
+// evict some peer that has finished the handshake)
+int CConnman::GetExtraOutboundCount()
+{
+ int nOutbound = 0;
+ {
+ LOCK(cs_vNodes);
+ for (CNode* pnode : vNodes) {
+ if (!pnode->fInbound && !pnode->m_manual_connection && !pnode->fFeeler && !pnode->fDisconnect && !pnode->fOneShot && pnode->fSuccessfullyConnected) {
+ ++nOutbound;
+ }
+ }
+ }
+ return std::max(nOutbound - nMaxOutbound, 0);
+}
+
+void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
{
// Connect to specific addresses
- if (mapArgs.count("-connect") && mapMultiArgs["-connect"].size() > 0)
+ if (!connect.empty())
{
for (int64_t nLoop = 0;; nLoop++)
{
ProcessOneShot();
- BOOST_FOREACH(string strAddr, mapMultiArgs["-connect"])
+ for (const std::string& strAddr : connect)
{
- CAddress addr;
- OpenNetworkConnection(addr, NULL, strAddr.c_str());
+ CAddress addr(CService(), NODE_NONE);
+ OpenNetworkConnection(addr, false, nullptr, strAddr.c_str(), false, false, true);
for (int i = 0; i < 10 && i < nLoop; i++)
{
- MilliSleep(500);
+ if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
+ return;
}
}
- MilliSleep(500);
+ if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
+ return;
}
}
// Initiate network connections
int64_t nStart = GetTime();
- while (true)
+
+ // Minimum time before next feeler connection (in microseconds).
+ int64_t nNextFeeler = PoissonNextSend(nStart*1000*1000, FEELER_INTERVAL);
+ while (!interruptNet)
{
ProcessOneShot();
- MilliSleep(500);
+ if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
+ return;
CSemaphoreGrant grant(*semOutbound);
- boost::this_thread::interruption_point();
+ if (interruptNet)
+ return;
// Add seed nodes if DNS seeds are all down (an infrastructure attack?).
if (addrman.size() == 0 && (GetTime() - nStart > 60)) {
static bool done = false;
if (!done) {
LogPrintf("Adding fixed seed nodes as DNS doesn't seem to be available.\n");
- addrman.Add(Params().FixedSeeds(), CNetAddr("127.0.0.1"));
+ CNetAddr local;
+ local.SetInternal("fixedseeds");
+ addrman.Add(convertSeed6(Params().FixedSeeds()), local);
done = true;
}
}
@@ -1250,24 +1784,51 @@ void ThreadOpenConnections()
// Only connect out to one peer per network group (/16 for IPv4).
// Do this here so we don't have to critsect vNodes inside mapAddresses critsect.
int nOutbound = 0;
- set<vector<unsigned char> > setConnected;
+ std::set<std::vector<unsigned char> > setConnected;
{
LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes) {
- if (!pnode->fInbound) {
+ for (CNode* pnode : vNodes) {
+ if (!pnode->fInbound && !pnode->m_manual_connection) {
+ // Netgroups for inbound and addnode peers are not excluded because our goal here
+ // is to not use multiple of our limited outbound slots on a single netgroup
+ // but inbound and addnode peers do not use our outbound slots. Inbound peers
+ // also have the added issue that they're attacker controlled and could be used
+ // to prevent us from connecting to particular hosts if we used them here.
setConnected.insert(pnode->addr.GetGroup());
nOutbound++;
}
}
}
- int64_t nANow = GetAdjustedTime();
+ // Feeler Connections
+ //
+ // Design goals:
+ // * Increase the number of connectable addresses in the tried table.
+ //
+ // Method:
+ // * Choose a random address from new and attempt to connect to it if we can connect
+ // successfully it is added to tried.
+ // * Start attempting feeler connections only after node finishes making outbound
+ // connections.
+ // * Only make a feeler connection once every few minutes.
+ //
+ bool fFeeler = false;
+ if (nOutbound >= nMaxOutbound && !GetTryNewOutboundPeer()) {
+ int64_t nTime = GetTimeMicros(); // The current time right now (in microseconds).
+ if (nTime > nNextFeeler) {
+ nNextFeeler = PoissonNextSend(nTime, FEELER_INTERVAL);
+ fFeeler = true;
+ } else {
+ continue;
+ }
+ }
+
+ int64_t nANow = GetAdjustedTime();
int nTries = 0;
- while (true)
+ while (!interruptNet)
{
- // use an nUnkBias between 10 (no outgoing connections) and 90 (8 outgoing connections)
- CAddress addr = addrman.Select(10 + min(nOutbound,8)*10);
+ CAddrInfo addr = addrman.Select(fFeeler);
// if we selected an invalid address, restart
if (!addr.IsValid() || setConnected.count(addr.GetGroup()) || IsLocal(addr))
@@ -1287,6 +1848,15 @@ void ThreadOpenConnections()
if (nANow - addr.nLastTry < 600 && nTries < 30)
continue;
+ // for non-feelers, require all the services we'll want,
+ // for feelers, only require they be a full node (only because most
+ // SPV clients don't have a good address DB available)
+ if (!fFeeler && !HasAllDesirableServiceFlags(addr.nServices)) {
+ continue;
+ } else if (fFeeler && !MayHaveUsefulAddressDB(addr.nServices)) {
+ continue;
+ }
+
// do not allow non-default ports, unless after 50 invalid addresses selected already
if (addr.GetPort() != Params().GetDefaultPort() && nTries < 50)
continue;
@@ -1295,175 +1865,186 @@ void ThreadOpenConnections()
break;
}
- if (addrConnect.IsValid())
- OpenNetworkConnection(addrConnect, &grant);
+ if (addrConnect.IsValid()) {
+
+ if (fFeeler) {
+ // Add small amount of random noise before connection to avoid synchronization.
+ int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000);
+ if (!interruptNet.sleep_for(std::chrono::milliseconds(randsleep)))
+ return;
+ LogPrint(BCLog::NET, "Making feeler connection to %s\n", addrConnect.ToString());
+ }
+
+ OpenNetworkConnection(addrConnect, (int)setConnected.size() >= std::min(nMaxConnections - 1, 2), &grant, nullptr, false, fFeeler);
+ }
}
}
-void ThreadOpenAddedConnections()
+std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo()
{
+ std::vector<AddedNodeInfo> ret;
+
+ std::list<std::string> lAddresses(0);
{
LOCK(cs_vAddedNodes);
- vAddedNodes = mapMultiArgs["-addnode"];
+ ret.reserve(vAddedNodes.size());
+ std::copy(vAddedNodes.cbegin(), vAddedNodes.cend(), std::back_inserter(lAddresses));
}
- if (HaveNameProxy()) {
- while(true) {
- list<string> lAddresses(0);
- {
- LOCK(cs_vAddedNodes);
- BOOST_FOREACH(string& strAddNode, vAddedNodes)
- lAddresses.push_back(strAddNode);
+
+ // Build a map of all already connected addresses (by IP:port and by name) to inbound/outbound and resolved CService
+ std::map<CService, bool> mapConnected;
+ std::map<std::string, std::pair<bool, CService>> mapConnectedByName;
+ {
+ LOCK(cs_vNodes);
+ for (const CNode* pnode : vNodes) {
+ if (pnode->addr.IsValid()) {
+ mapConnected[pnode->addr] = pnode->fInbound;
}
- BOOST_FOREACH(string& strAddNode, lAddresses) {
- CAddress addr;
- CSemaphoreGrant grant(*semOutbound);
- OpenNetworkConnection(addr, &grant, strAddNode.c_str());
- MilliSleep(500);
+ std::string addrName = pnode->GetAddrName();
+ if (!addrName.empty()) {
+ mapConnectedByName[std::move(addrName)] = std::make_pair(pnode->fInbound, static_cast<const CService&>(pnode->addr));
}
- MilliSleep(120000); // Retry every 2 minutes
}
}
- for (unsigned int i = 0; true; i++)
- {
- list<string> lAddresses(0);
- {
- LOCK(cs_vAddedNodes);
- BOOST_FOREACH(string& strAddNode, vAddedNodes)
- lAddresses.push_back(strAddNode);
+ for (const std::string& strAddNode : lAddresses) {
+ CService service(LookupNumeric(strAddNode.c_str(), Params().GetDefaultPort()));
+ if (service.IsValid()) {
+ // strAddNode is an IP:port
+ auto it = mapConnected.find(service);
+ if (it != mapConnected.end()) {
+ ret.push_back(AddedNodeInfo{strAddNode, service, true, it->second});
+ } else {
+ ret.push_back(AddedNodeInfo{strAddNode, CService(), false, false});
+ }
+ } else {
+ // strAddNode is a name
+ auto it = mapConnectedByName.find(strAddNode);
+ if (it != mapConnectedByName.end()) {
+ ret.push_back(AddedNodeInfo{strAddNode, it->second.second, true, it->second.first});
+ } else {
+ ret.push_back(AddedNodeInfo{strAddNode, CService(), false, false});
+ }
}
+ }
- list<vector<CService> > lservAddressesToAdd(0);
- BOOST_FOREACH(string& strAddNode, lAddresses)
- {
- vector<CService> vservNode(0);
- if(Lookup(strAddNode.c_str(), vservNode, Params().GetDefaultPort(), fNameLookup, 0))
- {
- lservAddressesToAdd.push_back(vservNode);
- {
- LOCK(cs_setservAddNodeAddresses);
- BOOST_FOREACH(CService& serv, vservNode)
- setservAddNodeAddresses.insert(serv);
+ return ret;
+}
+
+void CConnman::ThreadOpenAddedConnections()
+{
+ while (true)
+ {
+ CSemaphoreGrant grant(*semAddnode);
+ std::vector<AddedNodeInfo> vInfo = GetAddedNodeInfo();
+ bool tried = false;
+ for (const AddedNodeInfo& info : vInfo) {
+ if (!info.fConnected) {
+ if (!grant.TryAcquire()) {
+ // If we've used up our semaphore and need a new one, lets not wait here since while we are waiting
+ // the addednodeinfo state might change.
+ break;
}
+ tried = true;
+ CAddress addr(CService(), NODE_NONE);
+ OpenNetworkConnection(addr, false, &grant, info.strAddedNode.c_str(), false, false, true);
+ if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
+ return;
}
}
- // Attempt to connect to each IP for each addnode entry until at least one is successful per addnode entry
- // (keeping in mind that addnode entries can have many IPs if fNameLookup)
- {
- LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes)
- for (list<vector<CService> >::iterator it = lservAddressesToAdd.begin(); it != lservAddressesToAdd.end(); it++)
- BOOST_FOREACH(CService& addrNode, *(it))
- if (pnode->addr == addrNode)
- {
- it = lservAddressesToAdd.erase(it);
- it--;
- break;
- }
- }
- BOOST_FOREACH(vector<CService>& vserv, lservAddressesToAdd)
- {
- CSemaphoreGrant grant(*semOutbound);
- OpenNetworkConnection(CAddress(vserv[i % vserv.size()]), &grant);
- MilliSleep(500);
- }
- MilliSleep(120000); // Retry every 2 minutes
+ // Retry every 60 seconds if a connection was attempted, otherwise two seconds
+ if (!interruptNet.sleep_for(std::chrono::seconds(tried ? 60 : 2)))
+ return;
}
}
// if successful, this moves the passed grant to the constructed node
-bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound, const char *pszDest, bool fOneShot)
+void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound, const char *pszDest, bool fOneShot, bool fFeeler, bool manual_connection)
{
//
// Initiate outbound network connection
//
- boost::this_thread::interruption_point();
+ if (interruptNet) {
+ return;
+ }
+ if (!fNetworkActive) {
+ return;
+ }
if (!pszDest) {
if (IsLocal(addrConnect) ||
- FindNode((CNetAddr)addrConnect) || CNode::IsBanned(addrConnect) ||
+ FindNode(static_cast<CNetAddr>(addrConnect)) || IsBanned(addrConnect) ||
FindNode(addrConnect.ToStringIPPort()))
- return false;
- } else if (FindNode(pszDest))
- return false;
+ return;
+ } else if (FindNode(std::string(pszDest)))
+ return;
- CNode* pnode = ConnectNode(addrConnect, pszDest);
- boost::this_thread::interruption_point();
+ CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure);
if (!pnode)
- return false;
+ return;
if (grantOutbound)
grantOutbound->MoveTo(pnode->grantOutbound);
- pnode->fNetworkNode = true;
if (fOneShot)
pnode->fOneShot = true;
+ if (fFeeler)
+ pnode->fFeeler = true;
+ if (manual_connection)
+ pnode->m_manual_connection = true;
- return true;
+ m_msgproc->InitializeNode(pnode);
+ {
+ LOCK(cs_vNodes);
+ vNodes.push_back(pnode);
+ }
}
-
-void ThreadMessageHandler()
+void CConnman::ThreadMessageHandler()
{
- SetThreadPriority(THREAD_PRIORITY_BELOW_NORMAL);
- while (true)
+ while (!flagInterruptMsgProc)
{
- vector<CNode*> vNodesCopy;
+ std::vector<CNode*> vNodesCopy;
{
LOCK(cs_vNodes);
vNodesCopy = vNodes;
- BOOST_FOREACH(CNode* pnode, vNodesCopy) {
+ for (CNode* pnode : vNodesCopy) {
pnode->AddRef();
}
}
- // Poll the connected nodes for messages
- CNode* pnodeTrickle = NULL;
- if (!vNodesCopy.empty())
- pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())];
-
- bool fSleep = true;
+ bool fMoreWork = false;
- BOOST_FOREACH(CNode* pnode, vNodesCopy)
+ for (CNode* pnode : vNodesCopy)
{
if (pnode->fDisconnect)
continue;
// Receive messages
- {
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
- {
- if (!g_signals.ProcessMessages(pnode))
- pnode->CloseSocketDisconnect();
-
- if (pnode->nSendSize < SendBufferSize())
- {
- if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete()))
- {
- fSleep = false;
- }
- }
- }
- }
- boost::this_thread::interruption_point();
-
+ bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc);
+ fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
+ if (flagInterruptMsgProc)
+ return;
// Send messages
{
- TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend)
- g_signals.SendMessages(pnode, pnode == pnodeTrickle);
+ LOCK(pnode->cs_sendProcessing);
+ m_msgproc->SendMessages(pnode, flagInterruptMsgProc);
}
- boost::this_thread::interruption_point();
+
+ if (flagInterruptMsgProc)
+ return;
}
{
LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodesCopy)
+ for (CNode* pnode : vNodesCopy)
pnode->Release();
}
- if (fSleep)
- MilliSleep(100);
+ std::unique_lock<std::mutex> lock(mutexMsgProc);
+ if (!fMoreWork) {
+ condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; });
+ }
+ fMsgProcWake = false;
}
}
@@ -1472,7 +2053,7 @@ void ThreadMessageHandler()
-bool BindListenPort(const CService &addrBind, string& strError, bool fWhitelisted)
+bool CConnman::BindListenPort(const CService &addrBind, std::string& strError, bool fWhitelisted)
{
strError = "";
int nOne = 1;
@@ -1487,31 +2068,21 @@ bool BindListenPort(const CService &addrBind, string& strError, bool fWhiteliste
return false;
}
- SOCKET hListenSocket = socket(((struct sockaddr*)&sockaddr)->sa_family, SOCK_STREAM, IPPROTO_TCP);
+ SOCKET hListenSocket = CreateSocket(addrBind);
if (hListenSocket == INVALID_SOCKET)
{
strError = strprintf("Error: Couldn't open socket for incoming connections (socket returned error %s)", NetworkErrorString(WSAGetLastError()));
LogPrintf("%s\n", strError);
return false;
}
-
#ifndef WIN32
-#ifdef SO_NOSIGPIPE
- // Different way of disabling SIGPIPE on BSD
- setsockopt(hListenSocket, SOL_SOCKET, SO_NOSIGPIPE, (void*)&nOne, sizeof(int));
-#endif
// Allow binding if the port is still in TIME_WAIT state after
- // the program was closed and restarted. Not an issue on windows!
+ // the program was closed and restarted.
setsockopt(hListenSocket, SOL_SOCKET, SO_REUSEADDR, (void*)&nOne, sizeof(int));
+#else
+ setsockopt(hListenSocket, SOL_SOCKET, SO_REUSEADDR, (const char*)&nOne, sizeof(int));
#endif
- // Set to non-blocking, incoming connections will also inherit this
- if (!SetSocketNonBlocking(hListenSocket, true)) {
- strError = strprintf("BindListenPort: Setting listening socket to non-blocking failed, error %s\n", NetworkErrorString(WSAGetLastError()));
- LogPrintf("%s\n", strError);
- return false;
- }
-
// some systems don't have IPV6_V6ONLY but are always v6only; others do have the option
// and enable it by default or not. Try to enable it, if possible.
if (addrBind.IsIPv6()) {
@@ -1532,7 +2103,7 @@ bool BindListenPort(const CService &addrBind, string& strError, bool fWhiteliste
{
int nErr = WSAGetLastError();
if (nErr == WSAEADDRINUSE)
- strError = strprintf(_("Unable to bind to %s on this computer. Bitcoin Core is probably already running."), addrBind.ToString());
+ strError = strprintf(_("Unable to bind to %s on this computer. %s is probably already running."), addrBind.ToString(), _(PACKAGE_NAME));
else
strError = strprintf(_("Unable to bind to %s on this computer (bind returned error %s)"), addrBind.ToString(), NetworkErrorString(nErr));
LogPrintf("%s\n", strError);
@@ -1558,7 +2129,7 @@ bool BindListenPort(const CService &addrBind, string& strError, bool fWhiteliste
return true;
}
-void static Discover(boost::thread_group& threadGroup)
+void Discover()
{
if (!fDiscover)
return;
@@ -1568,10 +2139,10 @@ void static Discover(boost::thread_group& threadGroup)
char pszHostName[256] = "";
if (gethostname(pszHostName, sizeof(pszHostName)) != SOCKET_ERROR)
{
- vector<CNetAddr> vaddr;
- if (LookupHost(pszHostName, vaddr))
+ std::vector<CNetAddr> vaddr;
+ if (LookupHost(pszHostName, vaddr, 0, true))
{
- BOOST_FOREACH (const CNetAddr &addr, vaddr)
+ for (const CNetAddr &addr : vaddr)
{
if (AddLocal(addr, LOCAL_IF))
LogPrintf("%s: %s - %s\n", __func__, pszHostName, addr.ToString());
@@ -1583,9 +2154,9 @@ void static Discover(boost::thread_group& threadGroup)
struct ifaddrs* myaddrs;
if (getifaddrs(&myaddrs) == 0)
{
- for (struct ifaddrs* ifa = myaddrs; ifa != NULL; ifa = ifa->ifa_next)
+ for (struct ifaddrs* ifa = myaddrs; ifa != nullptr; ifa = ifa->ifa_next)
{
- if (ifa->ifa_addr == NULL) continue;
+ if (ifa->ifa_addr == nullptr) continue;
if ((ifa->ifa_flags & IFF_UP) == 0) continue;
if (strcmp(ifa->ifa_name, "lo") == 0) continue;
if (strcmp(ifa->ifa_name, "lo0") == 0) continue;
@@ -1609,72 +2180,193 @@ void static Discover(boost::thread_group& threadGroup)
#endif
}
-void StartNode(boost::thread_group& threadGroup)
+void CConnman::SetNetworkActive(bool active)
+{
+ LogPrint(BCLog::NET, "SetNetworkActive: %s\n", active);
+
+ if (fNetworkActive == active) {
+ return;
+ }
+
+ fNetworkActive = active;
+
+ if (!fNetworkActive) {
+ LOCK(cs_vNodes);
+ // Close sockets to all nodes
+ for (CNode* pnode : vNodes) {
+ pnode->CloseSocketDisconnect();
+ }
+ }
+
+ uiInterface.NotifyNetworkActiveChanged(fNetworkActive);
+}
+
+CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSeed1(nSeed1In)
+{
+ fNetworkActive = true;
+ setBannedIsDirty = false;
+ fAddressesInitialized = false;
+ nLastNodeId = 0;
+ nSendBufferMaxSize = 0;
+ nReceiveFloodSize = 0;
+ flagInterruptMsgProc = false;
+ SetTryNewOutboundPeer(false);
+
+ Options connOptions;
+ Init(connOptions);
+}
+
+NodeId CConnman::GetNewNodeId()
+{
+ return nLastNodeId.fetch_add(1, std::memory_order_relaxed);
+}
+
+
+bool CConnman::Bind(const CService &addr, unsigned int flags) {
+ if (!(flags & BF_EXPLICIT) && IsLimited(addr))
+ return false;
+ std::string strError;
+ if (!BindListenPort(addr, strError, (flags & BF_WHITELIST) != 0)) {
+ if ((flags & BF_REPORT_ERROR) && clientInterface) {
+ clientInterface->ThreadSafeMessageBox(strError, "", CClientUIInterface::MSG_ERROR);
+ }
+ return false;
+ }
+ return true;
+}
+
+bool CConnman::InitBinds(const std::vector<CService>& binds, const std::vector<CService>& whiteBinds) {
+ bool fBound = false;
+ for (const auto& addrBind : binds) {
+ fBound |= Bind(addrBind, (BF_EXPLICIT | BF_REPORT_ERROR));
+ }
+ for (const auto& addrBind : whiteBinds) {
+ fBound |= Bind(addrBind, (BF_EXPLICIT | BF_REPORT_ERROR | BF_WHITELIST));
+ }
+ if (binds.empty() && whiteBinds.empty()) {
+ struct in_addr inaddr_any;
+ inaddr_any.s_addr = INADDR_ANY;
+ fBound |= Bind(CService(in6addr_any, GetListenPort()), BF_NONE);
+ fBound |= Bind(CService(inaddr_any, GetListenPort()), !fBound ? BF_REPORT_ERROR : BF_NONE);
+ }
+ return fBound;
+}
+
+bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
{
- uiInterface.InitMessage(_("Loading addresses..."));
- // Load addresses for peers.dat
+ Init(connOptions);
+
+ {
+ LOCK(cs_totalBytesRecv);
+ nTotalBytesRecv = 0;
+ }
+ {
+ LOCK(cs_totalBytesSent);
+ nTotalBytesSent = 0;
+ nMaxOutboundTotalBytesSentInCycle = 0;
+ nMaxOutboundCycleStartTime = 0;
+ }
+
+ if (fListen && !InitBinds(connOptions.vBinds, connOptions.vWhiteBinds)) {
+ if (clientInterface) {
+ clientInterface->ThreadSafeMessageBox(
+ _("Failed to listen on any port. Use -listen=0 if you want this."),
+ "", CClientUIInterface::MSG_ERROR);
+ }
+ return false;
+ }
+
+ for (const auto& strDest : connOptions.vSeedNodes) {
+ AddOneShot(strDest);
+ }
+
+ if (clientInterface) {
+ clientInterface->InitMessage(_("Loading P2P addresses..."));
+ }
+ // Load addresses from peers.dat
int64_t nStart = GetTimeMillis();
{
CAddrDB adb;
- if (!adb.Read(addrman))
+ if (adb.Read(addrman))
+ LogPrintf("Loaded %i addresses from peers.dat %dms\n", addrman.size(), GetTimeMillis() - nStart);
+ else {
+ addrman.Clear(); // Addrman can be in an inconsistent state after failure, reset it
LogPrintf("Invalid or missing peers.dat; recreating\n");
+ DumpAddresses();
+ }
}
- LogPrintf("Loaded %i addresses from peers.dat %dms\n",
- addrman.size(), GetTimeMillis() - nStart);
+ if (clientInterface)
+ clientInterface->InitMessage(_("Loading banlist..."));
+ // Load addresses from banlist.dat
+ nStart = GetTimeMillis();
+ CBanDB bandb;
+ banmap_t banmap;
+ if (bandb.Read(banmap)) {
+ SetBanned(banmap); // thread save setter
+ SetBannedSetDirty(false); // no need to write down, just read data
+ SweepBanned(); // sweep out unused entries
+
+ LogPrint(BCLog::NET, "Loaded %d banned node ips/subnets from banlist.dat %dms\n",
+ banmap.size(), GetTimeMillis() - nStart);
+ } else {
+ LogPrintf("Invalid or missing banlist.dat; recreating\n");
+ SetBannedSetDirty(true); // force write
+ DumpBanlist();
+ }
+
+ uiInterface.InitMessage(_("Starting network threads..."));
+
fAddressesInitialized = true;
- if (semOutbound == NULL) {
+ if (semOutbound == nullptr) {
// initialize semaphore
- int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, nMaxConnections);
- semOutbound = new CSemaphore(nMaxOutbound);
+ semOutbound = MakeUnique<CSemaphore>(std::min((nMaxOutbound + nMaxFeeler), nMaxConnections));
+ }
+ if (semAddnode == nullptr) {
+ // initialize semaphore
+ semAddnode = MakeUnique<CSemaphore>(nMaxAddnode);
}
-
- if (pnodeLocalHost == NULL)
- pnodeLocalHost = new CNode(INVALID_SOCKET, CAddress(CService("127.0.0.1", 0), nLocalServices));
-
- Discover(threadGroup);
//
// Start threads
//
+ assert(m_msgproc);
+ InterruptSocks5(false);
+ interruptNet.reset();
+ flagInterruptMsgProc = false;
- if (!GetBoolArg("-dnsseed", true))
- LogPrintf("DNS seeding disabled\n");
- else
- threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "dnsseed", &ThreadDNSAddressSeed));
-
- // Map ports with UPnP
- MapPort(GetBoolArg("-upnp", DEFAULT_UPNP));
+ {
+ std::unique_lock<std::mutex> lock(mutexMsgProc);
+ fMsgProcWake = false;
+ }
// Send and receive from sockets, accept connections
- threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "net", &ThreadSocketHandler));
+ threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
+
+ if (!gArgs.GetBoolArg("-dnsseed", true))
+ LogPrintf("DNS seeding disabled\n");
+ else
+ threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this)));
// Initiate outbound connections from -addnode
- threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "addcon", &ThreadOpenAddedConnections));
+ threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this)));
- // Initiate outbound connections
- threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "opencon", &ThreadOpenConnections));
+ if (connOptions.m_use_addrman_outgoing && !connOptions.m_specified_outgoing.empty()) {
+ if (clientInterface) {
+ clientInterface->ThreadSafeMessageBox(
+ _("Cannot provide specific connections and have addrman find outgoing connections at the same."),
+ "", CClientUIInterface::MSG_ERROR);
+ }
+ return false;
+ }
+ if (connOptions.m_use_addrman_outgoing || !connOptions.m_specified_outgoing.empty())
+ threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this, connOptions.m_specified_outgoing)));
// Process messages
- threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "msghand", &ThreadMessageHandler));
+ threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));
// Dump network addresses
- threadGroup.create_thread(boost::bind(&LoopForever<void (*)()>, "dumpaddr", &DumpAddresses, DUMP_ADDRESSES_INTERVAL * 1000));
-}
-
-bool StopNode()
-{
- LogPrintf("StopNode()\n");
- MapPort(false);
- if (semOutbound)
- for (int i=0; i<MAX_OUTBOUND_CONNECTIONS; i++)
- semOutbound->post();
-
- if (fAddressesInitialized)
- {
- DumpAddresses();
- fAddressesInitialized = false;
- }
+ scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL * 1000);
return true;
}
@@ -1686,28 +2378,6 @@ public:
~CNetCleanup()
{
- // Close sockets
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode->hSocket != INVALID_SOCKET)
- CloseSocket(pnode->hSocket);
- BOOST_FOREACH(ListenSocket& hListenSocket, vhListenSocket)
- if (hListenSocket.socket != INVALID_SOCKET)
- if (!CloseSocket(hListenSocket.socket))
- LogPrintf("CloseSocket(hListenSocket) failed with error %s\n", NetworkErrorString(WSAGetLastError()));
-
- // clean up some globals (to help leak detection)
- BOOST_FOREACH(CNode *pnode, vNodes)
- delete pnode;
- BOOST_FOREACH(CNode *pnode, vNodesDisconnected)
- delete pnode;
- vNodes.clear();
- vNodesDisconnected.clear();
- vhListenSocket.clear();
- delete semOutbound;
- semOutbound = NULL;
- delete pnodeLocalHost;
- pnodeLocalHost = NULL;
-
#ifdef WIN32
// Shutdown Windows Sockets
WSACleanup();
@@ -1716,289 +2386,403 @@ public:
}
instance_of_cnetcleanup;
+void CConnman::Interrupt()
+{
+ {
+ std::lock_guard<std::mutex> lock(mutexMsgProc);
+ flagInterruptMsgProc = true;
+ }
+ condMsgProc.notify_all();
+ interruptNet();
+ InterruptSocks5(true);
+ if (semOutbound) {
+ for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++) {
+ semOutbound->post();
+ }
+ }
+ if (semAddnode) {
+ for (int i=0; i<nMaxAddnode; i++) {
+ semAddnode->post();
+ }
+ }
+}
+void CConnman::Stop()
+{
+ if (threadMessageHandler.joinable())
+ threadMessageHandler.join();
+ if (threadOpenConnections.joinable())
+ threadOpenConnections.join();
+ if (threadOpenAddedConnections.joinable())
+ threadOpenAddedConnections.join();
+ if (threadDNSAddressSeed.joinable())
+ threadDNSAddressSeed.join();
+ if (threadSocketHandler.joinable())
+ threadSocketHandler.join();
+ if (fAddressesInitialized)
+ {
+ DumpData();
+ fAddressesInitialized = false;
+ }
-void RelayTransaction(const CTransaction& tx)
+ // Close sockets
+ for (CNode* pnode : vNodes)
+ pnode->CloseSocketDisconnect();
+ for (ListenSocket& hListenSocket : vhListenSocket)
+ if (hListenSocket.socket != INVALID_SOCKET)
+ if (!CloseSocket(hListenSocket.socket))
+ LogPrintf("CloseSocket(hListenSocket) failed with error %s\n", NetworkErrorString(WSAGetLastError()));
+
+ // clean up some globals (to help leak detection)
+ for (CNode *pnode : vNodes) {
+ DeleteNode(pnode);
+ }
+ for (CNode *pnode : vNodesDisconnected) {
+ DeleteNode(pnode);
+ }
+ vNodes.clear();
+ vNodesDisconnected.clear();
+ vhListenSocket.clear();
+ semOutbound.reset();
+ semAddnode.reset();
+}
+
+void CConnman::DeleteNode(CNode* pnode)
{
- CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
- ss.reserve(10000);
- ss << tx;
- RelayTransaction(tx, ss);
+ assert(pnode);
+ bool fUpdateConnectionTime = false;
+ m_msgproc->FinalizeNode(pnode->GetId(), fUpdateConnectionTime);
+ if(fUpdateConnectionTime) {
+ addrman.Connected(pnode->addr);
+ }
+ delete pnode;
}
-void RelayTransaction(const CTransaction& tx, const CDataStream& ss)
+CConnman::~CConnman()
{
- CInv inv(MSG_TX, tx.GetHash());
- {
- LOCK(cs_mapRelay);
- // Expire old relay messages
- while (!vRelayExpiration.empty() && vRelayExpiration.front().first < GetTime())
- {
- mapRelay.erase(vRelayExpiration.front().second);
- vRelayExpiration.pop_front();
- }
+ Interrupt();
+ Stop();
+}
- // Save original serialized message so newer versions are preserved
- mapRelay.insert(std::make_pair(inv, ss));
- vRelayExpiration.push_back(std::make_pair(GetTime() + 15 * 60, inv));
- }
- LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes)
- {
- if(!pnode->fRelayTxes)
- continue;
- LOCK(pnode->cs_filter);
- if (pnode->pfilter)
- {
- if (pnode->pfilter->IsRelevantAndUpdate(tx))
- pnode->PushInventory(inv);
- } else
- pnode->PushInventory(inv);
- }
+size_t CConnman::GetAddressCount() const
+{
+ return addrman.size();
}
-void CNode::RecordBytesRecv(uint64_t bytes)
+void CConnman::SetServices(const CService &addr, ServiceFlags nServices)
{
- LOCK(cs_totalBytesRecv);
- nTotalBytesRecv += bytes;
+ addrman.SetServices(addr, nServices);
}
-void CNode::RecordBytesSent(uint64_t bytes)
+void CConnman::MarkAddressGood(const CAddress& addr)
{
- LOCK(cs_totalBytesSent);
- nTotalBytesSent += bytes;
+ addrman.Good(addr);
}
-uint64_t CNode::GetTotalBytesRecv()
+void CConnman::AddNewAddresses(const std::vector<CAddress>& vAddr, const CAddress& addrFrom, int64_t nTimePenalty)
{
- LOCK(cs_totalBytesRecv);
- return nTotalBytesRecv;
+ addrman.Add(vAddr, addrFrom, nTimePenalty);
}
-uint64_t CNode::GetTotalBytesSent()
+std::vector<CAddress> CConnman::GetAddresses()
{
- LOCK(cs_totalBytesSent);
- return nTotalBytesSent;
+ return addrman.GetAddr();
}
-void CNode::Fuzz(int nChance)
+bool CConnman::AddNode(const std::string& strNode)
{
- if (!fSuccessfullyConnected) return; // Don't fuzz initial handshake
- if (GetRand(nChance) != 0) return; // Fuzz 1 of every nChance messages
+ LOCK(cs_vAddedNodes);
+ for (const std::string& it : vAddedNodes) {
+ if (strNode == it) return false;
+ }
- switch (GetRand(3))
- {
- case 0:
- // xor a random byte with a random value:
- if (!ssSend.empty()) {
- CDataStream::size_type pos = GetRand(ssSend.size());
- ssSend[pos] ^= (unsigned char)(GetRand(256));
- }
- break;
- case 1:
- // delete a random byte:
- if (!ssSend.empty()) {
- CDataStream::size_type pos = GetRand(ssSend.size());
- ssSend.erase(ssSend.begin()+pos);
- }
- break;
- case 2:
- // insert a random byte at a random position
- {
- CDataStream::size_type pos = GetRand(ssSend.size());
- char ch = (char)GetRand(256);
- ssSend.insert(ssSend.begin()+pos, ch);
+ vAddedNodes.push_back(strNode);
+ return true;
+}
+
+bool CConnman::RemoveAddedNode(const std::string& strNode)
+{
+ LOCK(cs_vAddedNodes);
+ for(std::vector<std::string>::iterator it = vAddedNodes.begin(); it != vAddedNodes.end(); ++it) {
+ if (strNode == *it) {
+ vAddedNodes.erase(it);
+ return true;
}
- break;
}
- // Chance of more than one change half the time:
- // (more changes exponentially less likely):
- Fuzz(2);
+ return false;
}
-//
-// CAddrDB
-//
+size_t CConnman::GetNodeCount(NumConnections flags)
+{
+ LOCK(cs_vNodes);
+ if (flags == CConnman::CONNECTIONS_ALL) // Shortcut if we want total
+ return vNodes.size();
-CAddrDB::CAddrDB()
+ int nNum = 0;
+ for (const auto& pnode : vNodes) {
+ if (flags & (pnode->fInbound ? CONNECTIONS_IN : CONNECTIONS_OUT)) {
+ nNum++;
+ }
+ }
+
+ return nNum;
+}
+
+void CConnman::GetNodeStats(std::vector<CNodeStats>& vstats)
{
- pathAddr = GetDataDir() / "peers.dat";
+ vstats.clear();
+ LOCK(cs_vNodes);
+ vstats.reserve(vNodes.size());
+ for (CNode* pnode : vNodes) {
+ vstats.emplace_back();
+ pnode->copyStats(vstats.back());
+ }
}
-bool CAddrDB::Write(const CAddrMan& addr)
+bool CConnman::DisconnectNode(const std::string& strNode)
+{
+ LOCK(cs_vNodes);
+ if (CNode* pnode = FindNode(strNode)) {
+ pnode->fDisconnect = true;
+ return true;
+ }
+ return false;
+}
+bool CConnman::DisconnectNode(NodeId id)
{
- // Generate random temporary filename
- unsigned short randv = 0;
- GetRandBytes((unsigned char*)&randv, sizeof(randv));
- std::string tmpfn = strprintf("peers.dat.%04x", randv);
+ LOCK(cs_vNodes);
+ for(CNode* pnode : vNodes) {
+ if (id == pnode->GetId()) {
+ pnode->fDisconnect = true;
+ return true;
+ }
+ }
+ return false;
+}
- // serialize addresses, checksum data up to that point, then append csum
- CDataStream ssPeers(SER_DISK, CLIENT_VERSION);
- ssPeers << FLATDATA(Params().MessageStart());
- ssPeers << addr;
- uint256 hash = Hash(ssPeers.begin(), ssPeers.end());
- ssPeers << hash;
+void CConnman::RecordBytesRecv(uint64_t bytes)
+{
+ LOCK(cs_totalBytesRecv);
+ nTotalBytesRecv += bytes;
+}
- // open temp output file, and associate with CAutoFile
- boost::filesystem::path pathTmp = GetDataDir() / tmpfn;
- FILE *file = fopen(pathTmp.string().c_str(), "wb");
- CAutoFile fileout(file, SER_DISK, CLIENT_VERSION);
- if (fileout.IsNull())
- return error("%s : Failed to open file %s", __func__, pathTmp.string());
+void CConnman::RecordBytesSent(uint64_t bytes)
+{
+ LOCK(cs_totalBytesSent);
+ nTotalBytesSent += bytes;
- // Write and commit header, data
- try {
- fileout << ssPeers;
- }
- catch (std::exception &e) {
- return error("%s : Serialize or I/O error - %s", __func__, e.what());
+ uint64_t now = GetTime();
+ if (nMaxOutboundCycleStartTime + nMaxOutboundTimeframe < now)
+ {
+ // timeframe expired, reset cycle
+ nMaxOutboundCycleStartTime = now;
+ nMaxOutboundTotalBytesSentInCycle = 0;
}
- FileCommit(fileout.Get());
- fileout.fclose();
- // replace existing peers.dat, if any, with new peers.dat.XXXX
- if (!RenameOver(pathTmp, pathAddr))
- return error("%s : Rename-into-place failed", __func__);
+ // TODO, exclude whitebind peers
+ nMaxOutboundTotalBytesSentInCycle += bytes;
+}
- return true;
+void CConnman::SetMaxOutboundTarget(uint64_t limit)
+{
+ LOCK(cs_totalBytesSent);
+ nMaxOutboundLimit = limit;
}
-bool CAddrDB::Read(CAddrMan& addr)
+uint64_t CConnman::GetMaxOutboundTarget()
{
- // open input file, and associate with CAutoFile
- FILE *file = fopen(pathAddr.string().c_str(), "rb");
- CAutoFile filein(file, SER_DISK, CLIENT_VERSION);
- if (filein.IsNull())
- return error("%s : Failed to open file %s", __func__, pathAddr.string());
+ LOCK(cs_totalBytesSent);
+ return nMaxOutboundLimit;
+}
- // use file size to size memory buffer
- int fileSize = boost::filesystem::file_size(pathAddr);
- int dataSize = fileSize - sizeof(uint256);
- // Don't try to resize to a negative number if file is small
- if (dataSize < 0)
- dataSize = 0;
- vector<unsigned char> vchData;
- vchData.resize(dataSize);
- uint256 hashIn;
+uint64_t CConnman::GetMaxOutboundTimeframe()
+{
+ LOCK(cs_totalBytesSent);
+ return nMaxOutboundTimeframe;
+}
- // read data and checksum from file
- try {
- filein.read((char *)&vchData[0], dataSize);
- filein >> hashIn;
+uint64_t CConnman::GetMaxOutboundTimeLeftInCycle()
+{
+ LOCK(cs_totalBytesSent);
+ if (nMaxOutboundLimit == 0)
+ return 0;
+
+ if (nMaxOutboundCycleStartTime == 0)
+ return nMaxOutboundTimeframe;
+
+ uint64_t cycleEndTime = nMaxOutboundCycleStartTime + nMaxOutboundTimeframe;
+ uint64_t now = GetTime();
+ return (cycleEndTime < now) ? 0 : cycleEndTime - GetTime();
+}
+
+void CConnman::SetMaxOutboundTimeframe(uint64_t timeframe)
+{
+ LOCK(cs_totalBytesSent);
+ if (nMaxOutboundTimeframe != timeframe)
+ {
+ // reset measure-cycle in case of changing
+ // the timeframe
+ nMaxOutboundCycleStartTime = GetTime();
}
- catch (std::exception &e) {
- return error("%s : Deserialize or I/O error - %s", __func__, e.what());
+ nMaxOutboundTimeframe = timeframe;
+}
+
+bool CConnman::OutboundTargetReached(bool historicalBlockServingLimit)
+{
+ LOCK(cs_totalBytesSent);
+ if (nMaxOutboundLimit == 0)
+ return false;
+
+ if (historicalBlockServingLimit)
+ {
+ // keep a large enough buffer to at least relay each block once
+ uint64_t timeLeftInCycle = GetMaxOutboundTimeLeftInCycle();
+ uint64_t buffer = timeLeftInCycle / 600 * MAX_BLOCK_SERIALIZED_SIZE;
+ if (buffer >= nMaxOutboundLimit || nMaxOutboundTotalBytesSentInCycle >= nMaxOutboundLimit - buffer)
+ return true;
}
- filein.fclose();
+ else if (nMaxOutboundTotalBytesSentInCycle >= nMaxOutboundLimit)
+ return true;
- CDataStream ssPeers(vchData, SER_DISK, CLIENT_VERSION);
+ return false;
+}
- // verify stored checksum matches input data
- uint256 hashTmp = Hash(ssPeers.begin(), ssPeers.end());
- if (hashIn != hashTmp)
- return error("%s : Checksum mismatch, data corrupted", __func__);
+uint64_t CConnman::GetOutboundTargetBytesLeft()
+{
+ LOCK(cs_totalBytesSent);
+ if (nMaxOutboundLimit == 0)
+ return 0;
- unsigned char pchMsgTmp[4];
- try {
- // de-serialize file header (network specific magic number) and ..
- ssPeers >> FLATDATA(pchMsgTmp);
+ return (nMaxOutboundTotalBytesSentInCycle >= nMaxOutboundLimit) ? 0 : nMaxOutboundLimit - nMaxOutboundTotalBytesSentInCycle;
+}
- // ... verify the network matches ours
- if (memcmp(pchMsgTmp, Params().MessageStart(), sizeof(pchMsgTmp)))
- return error("%s : Invalid network magic number", __func__);
+uint64_t CConnman::GetTotalBytesRecv()
+{
+ LOCK(cs_totalBytesRecv);
+ return nTotalBytesRecv;
+}
- // de-serialize address data into one CAddrMan object
- ssPeers >> addr;
- }
- catch (std::exception &e) {
- return error("%s : Deserialize or I/O error - %s", __func__, e.what());
- }
+uint64_t CConnman::GetTotalBytesSent()
+{
+ LOCK(cs_totalBytesSent);
+ return nTotalBytesSent;
+}
- return true;
+ServiceFlags CConnman::GetLocalServices() const
+{
+ return nLocalServices;
}
-unsigned int ReceiveFloodSize() { return 1000*GetArg("-maxreceivebuffer", 5*1000); }
-unsigned int SendBufferSize() { return 1000*GetArg("-maxsendbuffer", 1*1000); }
+void CConnman::SetBestHeight(int height)
+{
+ nBestHeight.store(height, std::memory_order_release);
+}
+
+int CConnman::GetBestHeight() const
+{
+ return nBestHeight.load(std::memory_order_acquire);
+}
+
+unsigned int CConnman::GetReceiveFloodSize() const { return nReceiveFloodSize; }
-CNode::CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn, bool fInboundIn) : ssSend(SER_NETWORK, INIT_PROTO_VERSION), setAddrKnown(5000)
+CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress& addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string& addrNameIn, bool fInboundIn) :
+ nTimeConnected(GetSystemTimeInSeconds()),
+ addr(addrIn),
+ addrBind(addrBindIn),
+ fInbound(fInboundIn),
+ nKeyedNetGroup(nKeyedNetGroupIn),
+ addrKnown(5000, 0.001),
+ filterInventoryKnown(50000, 0.000001),
+ id(idIn),
+ nLocalHostNonce(nLocalHostNonceIn),
+ nLocalServices(nLocalServicesIn),
+ nMyStartingHeight(nMyStartingHeightIn),
+ nSendVersion(0)
{
- nServices = 0;
+ nServices = NODE_NONE;
hSocket = hSocketIn;
nRecvVersion = INIT_PROTO_VERSION;
nLastSend = 0;
nLastRecv = 0;
nSendBytes = 0;
nRecvBytes = 0;
- nTimeConnected = GetTime();
- addr = addrIn;
+ nTimeOffset = 0;
addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn;
nVersion = 0;
strSubVer = "";
fWhitelisted = false;
fOneShot = false;
+ m_manual_connection = false;
fClient = false; // set by version message
- fInbound = fInboundIn;
- fNetworkNode = false;
+ fFeeler = false;
fSuccessfullyConnected = false;
fDisconnect = false;
nRefCount = 0;
nSendSize = 0;
nSendOffset = 0;
- hashContinue = 0;
+ hashContinue = uint256();
nStartingHeight = -1;
+ filterInventoryKnown.reset();
+ fSendMempool = false;
fGetAddr = false;
+ nNextLocalAddrSend = 0;
+ nNextAddrSend = 0;
+ nNextInvSend = 0;
fRelayTxes = false;
- setInventoryKnown.max_size(SendBufferSize() / 1000);
- pfilter = new CBloomFilter();
+ fSentAddr = false;
+ pfilter = MakeUnique<CBloomFilter>();
+ timeLastMempoolReq = 0;
+ nLastBlockTime = 0;
+ nLastTXTime = 0;
nPingNonceSent = 0;
nPingUsecStart = 0;
nPingUsecTime = 0;
fPingQueued = false;
-
- {
- LOCK(cs_nLastNodeId);
- id = nLastNodeId++;
+ nMinPingUsecTime = std::numeric_limits<int64_t>::max();
+ minFeeFilter = 0;
+ lastSentFeeFilter = 0;
+ nextSendTimeFeeFilter = 0;
+ fPauseRecv = false;
+ fPauseSend = false;
+ nProcessQueueSize = 0;
+
+ for (const std::string &msg : getAllNetMessageTypes())
+ mapRecvBytesPerMsgCmd[msg] = 0;
+ mapRecvBytesPerMsgCmd[NET_MESSAGE_COMMAND_OTHER] = 0;
+
+ if (fLogIPs) {
+ LogPrint(BCLog::NET, "Added connection to %s peer=%d\n", addrName, id);
+ } else {
+ LogPrint(BCLog::NET, "Added connection peer=%d\n", id);
}
-
- if (fLogIPs)
- LogPrint("net", "Added connection to %s peer=%d\n", addrName, id);
- else
- LogPrint("net", "Added connection peer=%d\n", id);
-
- // Be shy and don't send version until we hear
- if (hSocket != INVALID_SOCKET && !fInbound)
- PushVersion();
-
- GetNodeSignals().InitializeNode(GetId(), this);
}
CNode::~CNode()
{
CloseSocket(hSocket);
-
- if (pfilter)
- delete pfilter;
-
- GetNodeSignals().FinalizeNode(GetId());
}
void CNode::AskFor(const CInv& inv)
{
- if (mapAskFor.size() > MAPASKFOR_MAX_SZ)
+ if (mapAskFor.size() > MAPASKFOR_MAX_SZ || setAskFor.size() > SETASKFOR_MAX_SZ)
+ return;
+ // a peer may not have multiple non-responded queue positions for a single inv item
+ if (!setAskFor.insert(inv.hash).second)
return;
+
// We're using mapAskFor as a priority queue,
// the key is the earliest time the request can be sent
int64_t nRequestTime;
- limitedmap<CInv, int64_t>::const_iterator it = mapAlreadyAskedFor.find(inv);
+ limitedmap<uint256, int64_t>::const_iterator it = mapAlreadyAskedFor.find(inv.hash);
if (it != mapAlreadyAskedFor.end())
nRequestTime = it->second;
else
nRequestTime = 0;
- LogPrint("net", "askfor %s %d (%s) peer=%d\n", inv.ToString(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000), id);
+ LogPrint(BCLog::NET, "askfor %s %d (%s) peer=%d\n", inv.ToString(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000), id);
// Make sure not to reuse time indexes to keep things in the same order
int64_t nNow = GetTimeMicros() - 1000000;
@@ -2012,64 +2796,77 @@ void CNode::AskFor(const CInv& inv)
if (it != mapAlreadyAskedFor.end())
mapAlreadyAskedFor.update(it, nRequestTime);
else
- mapAlreadyAskedFor.insert(std::make_pair(inv, nRequestTime));
+ mapAlreadyAskedFor.insert(std::make_pair(inv.hash, nRequestTime));
mapAskFor.insert(std::make_pair(nRequestTime, inv));
}
-void CNode::BeginMessage(const char* pszCommand) EXCLUSIVE_LOCK_FUNCTION(cs_vSend)
+bool CConnman::NodeFullyConnected(const CNode* pnode)
{
- ENTER_CRITICAL_SECTION(cs_vSend);
- assert(ssSend.size() == 0);
- ssSend << CMessageHeader(pszCommand, 0);
- LogPrint("net", "sending: %s ", pszCommand);
+ return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
}
-void CNode::AbortMessage() UNLOCK_FUNCTION(cs_vSend)
+void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
{
- ssSend.clear();
+ size_t nMessageSize = msg.data.size();
+ size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE;
+ LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.command.c_str()), nMessageSize, pnode->GetId());
- LEAVE_CRITICAL_SECTION(cs_vSend);
+ std::vector<unsigned char> serializedHeader;
+ serializedHeader.reserve(CMessageHeader::HEADER_SIZE);
+ uint256 hash = Hash(msg.data.data(), msg.data.data() + nMessageSize);
+ CMessageHeader hdr(Params().MessageStart(), msg.command.c_str(), nMessageSize);
+ memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE);
- LogPrint("net", "(aborted)\n");
-}
+ CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, serializedHeader, 0, hdr};
-void CNode::EndMessage() UNLOCK_FUNCTION(cs_vSend)
-{
- // The -*messagestest options are intentionally not documented in the help message,
- // since they are only used during development to debug the networking code and are
- // not intended for end-users.
- if (mapArgs.count("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 2)) == 0)
+ size_t nBytesSent = 0;
{
- LogPrint("net", "dropmessages DROPPING SEND MESSAGE\n");
- AbortMessage();
- return;
+ LOCK(pnode->cs_vSend);
+ bool optimisticSend(pnode->vSendMsg.empty());
+
+ //log total amount of bytes per command
+ pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
+ pnode->nSendSize += nTotalSize;
+
+ if (pnode->nSendSize > nSendBufferMaxSize)
+ pnode->fPauseSend = true;
+ pnode->vSendMsg.push_back(std::move(serializedHeader));
+ if (nMessageSize)
+ pnode->vSendMsg.push_back(std::move(msg.data));
+
+ // If write queue empty, attempt "optimistic write"
+ if (optimisticSend == true)
+ nBytesSent = SocketSendData(pnode);
}
- if (mapArgs.count("-fuzzmessagestest"))
- Fuzz(GetArg("-fuzzmessagestest", 10));
-
- if (ssSend.size() == 0)
- return;
-
- // Set the size
- unsigned int nSize = ssSend.size() - CMessageHeader::HEADER_SIZE;
- memcpy((char*)&ssSend[CMessageHeader::MESSAGE_SIZE_OFFSET], &nSize, sizeof(nSize));
+ if (nBytesSent)
+ RecordBytesSent(nBytesSent);
+}
- // Set the checksum
- uint256 hash = Hash(ssSend.begin() + CMessageHeader::HEADER_SIZE, ssSend.end());
- unsigned int nChecksum = 0;
- memcpy(&nChecksum, &hash, sizeof(nChecksum));
- assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum));
- memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], &nChecksum, sizeof(nChecksum));
+bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
+{
+ CNode* found = nullptr;
+ LOCK(cs_vNodes);
+ for (auto&& pnode : vNodes) {
+ if(pnode->GetId() == id) {
+ found = pnode;
+ break;
+ }
+ }
+ return found != nullptr && NodeFullyConnected(found) && func(found);
+}
- LogPrint("net", "(%d bytes) peer=%d\n", nSize, id);
+int64_t PoissonNextSend(int64_t nNow, int average_interval_seconds) {
+ return nNow + (int64_t)(log1p(GetRand(1ULL << 48) * -0.0000000000000035527136788 /* -1/2^48 */) * average_interval_seconds * -1000000.0 + 0.5);
+}
- std::deque<CSerializeData>::iterator it = vSendMsg.insert(vSendMsg.end(), CSerializeData());
- ssSend.GetAndClear(*it);
- nSendSize += (*it).size();
+CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id) const
+{
+ return CSipHasher(nSeed0, nSeed1).Write(id);
+}
- // If write queue empty, attempt "optimistic write"
- if (it == vSendMsg.begin())
- SocketSendData(this);
+uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const
+{
+ std::vector<unsigned char> vchNetGroup(ad.GetGroup());
- LEAVE_CRITICAL_SECTION(cs_vSend);
+ return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup.data(), vchNetGroup.size()).Finalize();
}