diff options
Diffstat (limited to 'src/net.cpp')
-rw-r--r-- | src/net.cpp | 2599 |
1 files changed, 1698 insertions, 901 deletions
diff --git a/src/net.cpp b/src/net.cpp index 42b3c30fb7..201914685c 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1,19 +1,24 @@ // Copyright (c) 2009-2010 Satoshi Nakamoto -// Copyright (c) 2009-2014 The Bitcoin developers -// Distributed under the MIT/X11 software license, see the accompanying +// Copyright (c) 2009-2017 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #if defined(HAVE_CONFIG_H) -#include "config/bitcoin-config.h" +#include <config/bitcoin-config.h> #endif -#include "net.h" +#include <net.h> -#include "addrman.h" -#include "chainparams.h" -#include "clientversion.h" -#include "primitives/transaction.h" -#include "ui_interface.h" +#include <chainparams.h> +#include <clientversion.h> +#include <consensus/consensus.h> +#include <crypto/common.h> +#include <crypto/sha256.h> +#include <primitives/transaction.h> +#include <netbase.h> +#include <scheduler.h> +#include <ui_interface.h> +#include <utilstrencodings.h> #ifdef WIN32 #include <string.h> @@ -28,16 +33,24 @@ #include <miniupnpc/upnperrors.h> #endif -#include <boost/filesystem.hpp> -#include <boost/thread.hpp> -// Dump addresses to peers.dat every 15 minutes (900s) +#include <math.h> + +// Dump addresses to peers.dat and banlist.dat every 15 minutes (900s) #define DUMP_ADDRESSES_INTERVAL 900 -#if !defined(HAVE_MSG_NOSIGNAL) && !defined(MSG_NOSIGNAL) +// We add a random period time (0 to 1 seconds) to feeler connections to prevent synchronization. +#define FEELER_SLEEP_WINDOW 1 + +#if !defined(HAVE_MSG_NOSIGNAL) #define MSG_NOSIGNAL 0 #endif +// MSG_DONTWAIT is not available on some platforms, if it doesn't exist define it as 0 +#if !defined(HAVE_MSG_DONTWAIT) +#define MSG_DONTWAIT 0 +#endif + // Fix for ancient MinGW versions, that don't have defined these in ws2tcpip.h. // Todo: Can be removed when our pull-tester is upgraded to a modern MinGW version. #ifdef WIN32 @@ -49,63 +62,32 @@ #endif #endif -using namespace boost; -using namespace std; - -namespace { - const int MAX_OUTBOUND_CONNECTIONS = 8; - - struct ListenSocket { - SOCKET socket; - bool whitelisted; +/** Used to pass flags to the Bind() function */ +enum BindFlags { + BF_NONE = 0, + BF_EXPLICIT = (1U << 0), + BF_REPORT_ERROR = (1U << 1), + BF_WHITELIST = (1U << 2), +}; - ListenSocket(SOCKET socket, bool whitelisted) : socket(socket), whitelisted(whitelisted) {} - }; -} +const static std::string NET_MESSAGE_COMMAND_OTHER = "*other*"; +static const uint64_t RANDOMIZER_ID_NETGROUP = 0x6c0edd8036ef4036ULL; // SHA256("netgroup")[0:8] +static const uint64_t RANDOMIZER_ID_LOCALHOSTNONCE = 0xd93e69e2bbfa5735ULL; // SHA256("localhostnonce")[0:8] // // Global state variables // bool fDiscover = true; bool fListen = true; -uint64_t nLocalServices = NODE_NETWORK; +bool fRelayTxes = true; CCriticalSection cs_mapLocalHost; -map<CNetAddr, LocalServiceInfo> mapLocalHost; -static bool vfReachable[NET_MAX] = {}; +std::map<CNetAddr, LocalServiceInfo> mapLocalHost; static bool vfLimited[NET_MAX] = {}; -static CNode* pnodeLocalHost = NULL; -uint64_t nLocalHostNonce = 0; -static std::vector<ListenSocket> vhListenSocket; -CAddrMan addrman; -int nMaxConnections = 125; -bool fAddressesInitialized = false; - -vector<CNode*> vNodes; -CCriticalSection cs_vNodes; -map<CInv, CDataStream> mapRelay; -deque<pair<int64_t, CInv> > vRelayExpiration; -CCriticalSection cs_mapRelay; -limitedmap<CInv, int64_t> mapAlreadyAskedFor(MAX_INV_SZ); - -static deque<string> vOneShots; -CCriticalSection cs_vOneShots; - -set<CNetAddr> setservAddNodeAddresses; -CCriticalSection cs_setservAddNodeAddresses; - -vector<std::string> vAddedNodes; -CCriticalSection cs_vAddedNodes; - -NodeId nLastNodeId = 0; -CCriticalSection cs_nLastNodeId; +std::string strSubVersion; -static CSemaphore *semOutbound = NULL; +limitedmap<uint256, int64_t> mapAlreadyAskedFor(MAX_INV_SZ); -// Signals for message handling -static CNodeSignals g_signals; -CNodeSignals& GetNodeSignals() { return g_signals; } - -void AddOneShot(string strDest) +void CConnman::AddOneShot(const std::string& strDest) { LOCK(cs_vOneShots); vOneShots.push_back(strDest); @@ -113,7 +95,7 @@ void AddOneShot(string strDest) unsigned short GetListenPort() { - return (unsigned short)(GetArg("-port", Params().GetDefaultPort())); + return (unsigned short)(gArgs.GetArg("-port", Params().GetDefaultPort())); } // find 'best' local address for a particular peer @@ -126,13 +108,13 @@ bool GetLocal(CService& addr, const CNetAddr *paddrPeer) int nBestReachability = -1; { LOCK(cs_mapLocalHost); - for (map<CNetAddr, LocalServiceInfo>::iterator it = mapLocalHost.begin(); it != mapLocalHost.end(); it++) + for (const auto& entry : mapLocalHost) { - int nScore = (*it).second.nScore; - int nReachability = (*it).first.GetReachabilityFrom(paddrPeer); + int nScore = entry.second.nScore; + int nReachability = entry.first.GetReachabilityFrom(paddrPeer); if (nReachability > nBestReachability || (nReachability == nBestReachability && nScore > nBestScore)) { - addr = CService((*it).first, (*it).second.nPort); + addr = CService(entry.first, entry.second.nPort); nBestReachability = nReachability; nBestScore = nScore; } @@ -141,73 +123,42 @@ bool GetLocal(CService& addr, const CNetAddr *paddrPeer) return nBestScore >= 0; } +//! Convert the pnSeeds6 array into usable address objects. +static std::vector<CAddress> convertSeed6(const std::vector<SeedSpec6> &vSeedsIn) +{ + // It'll only connect to one or two seed nodes because once it connects, + // it'll get a pile of addresses with newer timestamps. + // Seed nodes are given a random 'last seen time' of between one and two + // weeks ago. + const int64_t nOneWeek = 7*24*60*60; + std::vector<CAddress> vSeedsOut; + vSeedsOut.reserve(vSeedsIn.size()); + for (const auto& seed_in : vSeedsIn) { + struct in6_addr ip; + memcpy(&ip, seed_in.addr, sizeof(ip)); + CAddress addr(CService(ip, seed_in.port), GetDesirableServiceFlags(NODE_NONE)); + addr.nTime = GetTime() - GetRand(nOneWeek) - nOneWeek; + vSeedsOut.push_back(addr); + } + return vSeedsOut; +} + // get best local address for a particular peer as a CAddress // Otherwise, return the unroutable 0.0.0.0 but filled in with // the normal parameters, since the IP may be changed to a useful // one by discovery. -CAddress GetLocalAddress(const CNetAddr *paddrPeer) +CAddress GetLocalAddress(const CNetAddr *paddrPeer, ServiceFlags nLocalServices) { - CAddress ret(CService("0.0.0.0",GetListenPort()),0); + CAddress ret(CService(CNetAddr(),GetListenPort()), nLocalServices); CService addr; if (GetLocal(addr, paddrPeer)) { - ret = CAddress(addr); + ret = CAddress(addr, nLocalServices); } - ret.nServices = nLocalServices; ret.nTime = GetAdjustedTime(); return ret; } -bool RecvLine(SOCKET hSocket, string& strLine) -{ - strLine = ""; - while (true) - { - char c; - int nBytes = recv(hSocket, &c, 1, 0); - if (nBytes > 0) - { - if (c == '\n') - continue; - if (c == '\r') - return true; - strLine += c; - if (strLine.size() >= 9000) - return true; - } - else if (nBytes <= 0) - { - boost::this_thread::interruption_point(); - if (nBytes < 0) - { - int nErr = WSAGetLastError(); - if (nErr == WSAEMSGSIZE) - continue; - if (nErr == WSAEWOULDBLOCK || nErr == WSAEINTR || nErr == WSAEINPROGRESS) - { - MilliSleep(10); - continue; - } - } - if (!strLine.empty()) - return true; - if (nBytes == 0) - { - // socket closed - LogPrint("net", "socket closed\n"); - return false; - } - else - { - // socket error - int nErr = WSAGetLastError(); - LogPrint("net", "recv failed: %s\n", NetworkErrorString(nErr)); - return false; - } - } - } -} - int GetnScore(const CService& addr) { LOCK(cs_mapLocalHost); @@ -219,39 +170,34 @@ int GetnScore(const CService& addr) // Is our peer's addrLocal potentially useful as an external IP source? bool IsPeerAddrLocalGood(CNode *pnode) { - return fDiscover && pnode->addr.IsRoutable() && pnode->addrLocal.IsRoutable() && - !IsLimited(pnode->addrLocal.GetNetwork()); + CService addrLocal = pnode->GetAddrLocal(); + return fDiscover && pnode->addr.IsRoutable() && addrLocal.IsRoutable() && + !IsLimited(addrLocal.GetNetwork()); } // pushes our own address to a peer -void AdvertizeLocal(CNode *pnode) +void AdvertiseLocal(CNode *pnode) { if (fListen && pnode->fSuccessfullyConnected) { - CAddress addrLocal = GetLocalAddress(&pnode->addr); + CAddress addrLocal = GetLocalAddress(&pnode->addr, pnode->GetLocalServices()); // If discovery is enabled, sometimes give our peer the address it // tells us that it sees us as in case it has a better idea of our // address than we do. if (IsPeerAddrLocalGood(pnode) && (!addrLocal.IsRoutable() || GetRand((GetnScore(addrLocal) > LOCAL_MANUAL) ? 8:2) == 0)) { - addrLocal.SetIP(pnode->addrLocal); + addrLocal.SetIP(pnode->GetAddrLocal()); } if (addrLocal.IsRoutable()) { - pnode->PushAddress(addrLocal); + LogPrint(BCLog::NET, "AdvertiseLocal: advertising address %s\n", addrLocal.ToString()); + FastRandomContext insecure_rand; + pnode->PushAddress(addrLocal, insecure_rand); } } } -void SetReachable(enum Network net, bool fFlag) -{ - LOCK(cs_mapLocalHost); - vfReachable[net] = fFlag; - if (net == NET_IPV6 && fFlag) - vfReachable[NET_IPV4] = true; -} - // learn a new local address bool AddLocal(const CService& addr, int nScore) { @@ -274,7 +220,6 @@ bool AddLocal(const CService& addr, int nScore) info.nScore = nScore + (fAlready ? 1 : 0); info.nPort = addr.GetPort(); } - SetReachable(addr.GetNetwork()); } return true; @@ -285,10 +230,18 @@ bool AddLocal(const CNetAddr &addr, int nScore) return AddLocal(CService(addr, GetListenPort()), nScore); } +bool RemoveLocal(const CService& addr) +{ + LOCK(cs_mapLocalHost); + LogPrintf("RemoveLocal(%s)\n", addr.ToString()); + mapLocalHost.erase(addr); + return true; +} + /** Make a particular network entirely off-limits (no automatic connects to it) */ void SetLimited(enum Network net, bool fLimited) { - if (net == NET_UNROUTABLE) + if (net == NET_UNROUTABLE || net == NET_INTERNAL) return; LOCK(cs_mapLocalHost); vfLimited[net] = fLimited; @@ -329,7 +282,7 @@ bool IsLocal(const CService& addr) bool IsReachable(enum Network net) { LOCK(cs_mapLocalHost); - return vfReachable[net] && !vfLimited[net]; + return !vfLimited[net]; } /** check whether a given address is in a network we can probably connect to */ @@ -339,179 +292,387 @@ bool IsReachable(const CNetAddr& addr) return IsReachable(net); } -void AddressCurrentlyConnected(const CService& addr) + +CNode* CConnman::FindNode(const CNetAddr& ip) { - addrman.Connected(addr); + LOCK(cs_vNodes); + for (CNode* pnode : vNodes) { + if (static_cast<CNetAddr>(pnode->addr) == ip) { + return pnode; + } + } + return nullptr; } +CNode* CConnman::FindNode(const CSubNet& subNet) +{ + LOCK(cs_vNodes); + for (CNode* pnode : vNodes) { + if (subNet.Match(static_cast<CNetAddr>(pnode->addr))) { + return pnode; + } + } + return nullptr; +} -uint64_t CNode::nTotalBytesRecv = 0; -uint64_t CNode::nTotalBytesSent = 0; -CCriticalSection CNode::cs_totalBytesRecv; -CCriticalSection CNode::cs_totalBytesSent; - -CNode* FindNode(const CNetAddr& ip) +CNode* CConnman::FindNode(const std::string& addrName) { LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) - if ((CNetAddr)pnode->addr == ip) - return (pnode); - return NULL; + for (CNode* pnode : vNodes) { + if (pnode->GetAddrName() == addrName) { + return pnode; + } + } + return nullptr; } -CNode* FindNode(const std::string& addrName) +CNode* CConnman::FindNode(const CService& addr) { LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode->addrName == addrName) - return (pnode); - return NULL; + for (CNode* pnode : vNodes) { + if (static_cast<CService>(pnode->addr) == addr) { + return pnode; + } + } + return nullptr; } -CNode* FindNode(const CService& addr) +bool CConnman::CheckIncomingNonce(uint64_t nonce) { LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) - if ((CService)pnode->addr == addr) - return (pnode); - return NULL; + for (CNode* pnode : vNodes) { + if (!pnode->fSuccessfullyConnected && !pnode->fInbound && pnode->GetLocalNonce() == nonce) + return false; + } + return true; } -CNode* ConnectNode(CAddress addrConnect, const char *pszDest) +/** Get the bind address for a socket as CAddress */ +static CAddress GetBindAddress(SOCKET sock) { - if (pszDest == NULL) { + CAddress addr_bind; + struct sockaddr_storage sockaddr_bind; + socklen_t sockaddr_bind_len = sizeof(sockaddr_bind); + if (sock != INVALID_SOCKET) { + if (!getsockname(sock, (struct sockaddr*)&sockaddr_bind, &sockaddr_bind_len)) { + addr_bind.SetSockAddr((const struct sockaddr*)&sockaddr_bind); + } else { + LogPrint(BCLog::NET, "Warning: getsockname failed\n"); + } + } + return addr_bind; +} + +CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure) +{ + if (pszDest == nullptr) { if (IsLocal(addrConnect)) - return NULL; + return nullptr; // Look for an existing connection - CNode* pnode = FindNode((CService)addrConnect); + CNode* pnode = FindNode(static_cast<CService>(addrConnect)); if (pnode) { - pnode->AddRef(); - return pnode; + LogPrintf("Failed to open new connection, already connected\n"); + return nullptr; } } /// debug print - LogPrint("net", "trying connection %s lastseen=%.1fhrs\n", + LogPrint(BCLog::NET, "trying connection %s lastseen=%.1fhrs\n", pszDest ? pszDest : addrConnect.ToString(), pszDest ? 0.0 : (double)(GetAdjustedTime() - addrConnect.nTime)/3600.0); + // Resolve + const int default_port = Params().GetDefaultPort(); + if (pszDest) { + std::vector<CService> resolved; + if (Lookup(pszDest, resolved, default_port, fNameLookup && !HaveNameProxy(), 256) && !resolved.empty()) { + addrConnect = CAddress(resolved[GetRand(resolved.size())], NODE_NONE); + if (!addrConnect.IsValid()) { + LogPrint(BCLog::NET, "Resolver returned invalid address %s for %s", addrConnect.ToString(), pszDest); + return nullptr; + } + // It is possible that we already have a connection to the IP/port pszDest resolved to. + // In that case, drop the connection that was just created, and return the existing CNode instead. + // Also store the name we used to connect in that CNode, so that future FindNode() calls to that + // name catch this early. + LOCK(cs_vNodes); + CNode* pnode = FindNode(static_cast<CService>(addrConnect)); + if (pnode) + { + pnode->MaybeSetAddrName(std::string(pszDest)); + LogPrintf("Failed to open new connection, already connected\n"); + return nullptr; + } + } + } + // Connect - SOCKET hSocket; - bool proxyConnectionFailed = false; - if (pszDest ? ConnectSocketByName(addrConnect, hSocket, pszDest, Params().GetDefaultPort(), nConnectTimeout, &proxyConnectionFailed) : - ConnectSocket(addrConnect, hSocket, nConnectTimeout, &proxyConnectionFailed)) - { - addrman.Attempt(addrConnect); + bool connected = false; + SOCKET hSocket = INVALID_SOCKET; + proxyType proxy; + if (addrConnect.IsValid()) { + bool proxyConnectionFailed = false; + + if (GetProxy(addrConnect.GetNetwork(), proxy)) { + hSocket = CreateSocket(proxy.proxy); + if (hSocket == INVALID_SOCKET) { + return nullptr; + } + connected = ConnectThroughProxy(proxy, addrConnect.ToStringIP(), addrConnect.GetPort(), hSocket, nConnectTimeout, &proxyConnectionFailed); + } else { + // no proxy needed (none set for target network) + hSocket = CreateSocket(addrConnect); + if (hSocket == INVALID_SOCKET) { + return nullptr; + } + connected = ConnectSocketDirectly(addrConnect, hSocket, nConnectTimeout); + } + if (!proxyConnectionFailed) { + // If a connection to the node was attempted, and failure (if any) is not caused by a problem connecting to + // the proxy, mark this as an attempt. + addrman.Attempt(addrConnect, fCountFailure); + } + } else if (pszDest && GetNameProxy(proxy)) { + hSocket = CreateSocket(proxy.proxy); + if (hSocket == INVALID_SOCKET) { + return nullptr; + } + std::string host; + int port = default_port; + SplitHostPort(std::string(pszDest), port, host); + connected = ConnectThroughProxy(proxy, host, port, hSocket, nConnectTimeout, nullptr); + } + if (!connected) { + CloseSocket(hSocket); + return nullptr; + } - // Add node - CNode* pnode = new CNode(hSocket, addrConnect, pszDest ? pszDest : "", false); - pnode->AddRef(); + // Add node + NodeId id = GetNewNodeId(); + uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); + CAddress addr_bind = GetBindAddress(hSocket); + CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, addr_bind, pszDest ? pszDest : "", false); + pnode->AddRef(); - { - LOCK(cs_vNodes); - vNodes.push_back(pnode); - } + return pnode; +} + +void CConnman::DumpBanlist() +{ + SweepBanned(); // clean unused entries (if bantime has expired) + + if (!BannedSetIsDirty()) + return; - pnode->nTimeConnected = GetTime(); + int64_t nStart = GetTimeMillis(); - return pnode; - } else if (!proxyConnectionFailed) { - // If connecting to the node failed, and failure is not caused by a problem connecting to - // the proxy, mark this as an attempt. - addrman.Attempt(addrConnect); + CBanDB bandb; + banmap_t banmap; + GetBanned(banmap); + if (bandb.Write(banmap)) { + SetBannedSetDirty(false); } - return NULL; + LogPrint(BCLog::NET, "Flushed %d banned node ips/subnets to banlist.dat %dms\n", + banmap.size(), GetTimeMillis() - nStart); } void CNode::CloseSocketDisconnect() { fDisconnect = true; + LOCK(cs_hSocket); if (hSocket != INVALID_SOCKET) { - LogPrint("net", "disconnecting peer=%d\n", id); + LogPrint(BCLog::NET, "disconnecting peer=%d\n", id); CloseSocket(hSocket); } +} - // in case this fails, we'll empty the recv buffer when the CNode is deleted - TRY_LOCK(cs_vRecvMsg, lockRecv); - if (lockRecv) - vRecvMsg.clear(); +void CConnman::ClearBanned() +{ + { + LOCK(cs_setBanned); + setBanned.clear(); + setBannedIsDirty = true; + } + DumpBanlist(); //store banlist to disk + if(clientInterface) + clientInterface->BannedListChanged(); } -void CNode::PushVersion() +bool CConnman::IsBanned(CNetAddr ip) { - int nBestHeight = g_signals.GetHeight().get_value_or(0); + LOCK(cs_setBanned); + for (const auto& it : setBanned) { + CSubNet subNet = it.first; + CBanEntry banEntry = it.second; - /// when NTP implemented, change to just nTime = GetAdjustedTime() - int64_t nTime = (fInbound ? GetAdjustedTime() : GetTime()); - CAddress addrYou = (addr.IsRoutable() && !IsProxy(addr) ? addr : CAddress(CService("0.0.0.0",0))); - CAddress addrMe = GetLocalAddress(&addr); - GetRandBytes((unsigned char*)&nLocalHostNonce, sizeof(nLocalHostNonce)); - if (fLogIPs) - LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nBestHeight, addrMe.ToString(), addrYou.ToString(), id); - else - LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nBestHeight, addrMe.ToString(), id); - PushMessage("version", PROTOCOL_VERSION, nLocalServices, nTime, addrYou, addrMe, - nLocalHostNonce, FormatSubVersion(CLIENT_NAME, CLIENT_VERSION, std::vector<string>()), nBestHeight, true); + if (subNet.Match(ip) && GetTime() < banEntry.nBanUntil) { + return true; + } + } + return false; +} + +bool CConnman::IsBanned(CSubNet subnet) +{ + LOCK(cs_setBanned); + banmap_t::iterator i = setBanned.find(subnet); + if (i != setBanned.end()) + { + CBanEntry banEntry = (*i).second; + if (GetTime() < banEntry.nBanUntil) { + return true; + } + } + return false; } +void CConnman::Ban(const CNetAddr& addr, const BanReason &banReason, int64_t bantimeoffset, bool sinceUnixEpoch) { + CSubNet subNet(addr); + Ban(subNet, banReason, bantimeoffset, sinceUnixEpoch); +} +void CConnman::Ban(const CSubNet& subNet, const BanReason &banReason, int64_t bantimeoffset, bool sinceUnixEpoch) { + CBanEntry banEntry(GetTime()); + banEntry.banReason = banReason; + if (bantimeoffset <= 0) + { + bantimeoffset = gArgs.GetArg("-bantime", DEFAULT_MISBEHAVING_BANTIME); + sinceUnixEpoch = false; + } + banEntry.nBanUntil = (sinceUnixEpoch ? 0 : GetTime() )+bantimeoffset; + { + LOCK(cs_setBanned); + if (setBanned[subNet].nBanUntil < banEntry.nBanUntil) { + setBanned[subNet] = banEntry; + setBannedIsDirty = true; + } + else + return; + } + if(clientInterface) + clientInterface->BannedListChanged(); + { + LOCK(cs_vNodes); + for (CNode* pnode : vNodes) { + if (subNet.Match(static_cast<CNetAddr>(pnode->addr))) + pnode->fDisconnect = true; + } + } + if(banReason == BanReasonManuallyAdded) + DumpBanlist(); //store banlist to disk immediately if user requested ban +} + +bool CConnman::Unban(const CNetAddr &addr) { + CSubNet subNet(addr); + return Unban(subNet); +} +bool CConnman::Unban(const CSubNet &subNet) { + { + LOCK(cs_setBanned); + if (!setBanned.erase(subNet)) + return false; + setBannedIsDirty = true; + } + if(clientInterface) + clientInterface->BannedListChanged(); + DumpBanlist(); //store banlist to disk immediately + return true; +} -std::map<CNetAddr, int64_t> CNode::setBanned; -CCriticalSection CNode::cs_setBanned; +void CConnman::GetBanned(banmap_t &banMap) +{ + LOCK(cs_setBanned); + // Sweep the banlist so expired bans are not returned + SweepBanned(); + banMap = setBanned; //create a thread safe copy +} -void CNode::ClearBanned() +void CConnman::SetBanned(const banmap_t &banMap) { - setBanned.clear(); + LOCK(cs_setBanned); + setBanned = banMap; + setBannedIsDirty = true; } -bool CNode::IsBanned(CNetAddr ip) +void CConnman::SweepBanned() { - bool fResult = false; + int64_t now = GetTime(); + bool notifyUI = false; { LOCK(cs_setBanned); - std::map<CNetAddr, int64_t>::iterator i = setBanned.find(ip); - if (i != setBanned.end()) + banmap_t::iterator it = setBanned.begin(); + while(it != setBanned.end()) { - int64_t t = (*i).second; - if (GetTime() < t) - fResult = true; + CSubNet subNet = (*it).first; + CBanEntry banEntry = (*it).second; + if(now > banEntry.nBanUntil) + { + setBanned.erase(it++); + setBannedIsDirty = true; + notifyUI = true; + LogPrint(BCLog::NET, "%s: Removed banned node ip/subnet from banlist.dat: %s\n", __func__, subNet.ToString()); + } + else + ++it; } } - return fResult; + // update UI + if(notifyUI && clientInterface) { + clientInterface->BannedListChanged(); + } } -bool CNode::Ban(const CNetAddr &addr) { - int64_t banTime = GetTime()+GetArg("-bantime", 60*60*24); // Default 24-hour ban - { - LOCK(cs_setBanned); - if (setBanned[addr] < banTime) - setBanned[addr] = banTime; - } - return true; +bool CConnman::BannedSetIsDirty() +{ + LOCK(cs_setBanned); + return setBannedIsDirty; } +void CConnman::SetBannedSetDirty(bool dirty) +{ + LOCK(cs_setBanned); //reuse setBanned lock for the isDirty flag + setBannedIsDirty = dirty; +} -std::vector<CSubNet> CNode::vWhitelistedRange; -CCriticalSection CNode::cs_vWhitelistedRange; -bool CNode::IsWhitelistedRange(const CNetAddr &addr) { - LOCK(cs_vWhitelistedRange); - BOOST_FOREACH(const CSubNet& subnet, vWhitelistedRange) { +bool CConnman::IsWhitelistedRange(const CNetAddr &addr) { + for (const CSubNet& subnet : vWhitelistedRange) { if (subnet.Match(addr)) return true; } return false; } -void CNode::AddWhitelistedRange(const CSubNet &subnet) { - LOCK(cs_vWhitelistedRange); - vWhitelistedRange.push_back(subnet); +std::string CNode::GetAddrName() const { + LOCK(cs_addrName); + return addrName; +} + +void CNode::MaybeSetAddrName(const std::string& addrNameIn) { + LOCK(cs_addrName); + if (addrName.empty()) { + addrName = addrNameIn; + } +} + +CService CNode::GetAddrLocal() const { + LOCK(cs_addrLocal); + return addrLocal; +} + +void CNode::SetAddrLocal(const CService& addrLocalIn) { + LOCK(cs_addrLocal); + if (addrLocal.IsValid()) { + error("Addr local already set for node: %i. Refusing to change from %s to %s", id, addrLocal.ToString(), addrLocalIn.ToString()); + } else { + addrLocal = addrLocalIn; + } } #undef X @@ -520,16 +681,35 @@ void CNode::copyStats(CNodeStats &stats) { stats.nodeid = this->GetId(); X(nServices); + X(addr); + X(addrBind); + { + LOCK(cs_filter); + X(fRelayTxes); + } X(nLastSend); X(nLastRecv); X(nTimeConnected); - X(addrName); + X(nTimeOffset); + stats.addrName = GetAddrName(); X(nVersion); - X(cleanSubVer); + { + LOCK(cs_SubVer); + X(cleanSubVer); + } X(fInbound); + X(m_manual_connection); X(nStartingHeight); - X(nSendBytes); - X(nRecvBytes); + { + LOCK(cs_vSend); + X(mapSendBytesPerMsgCmd); + X(nSendBytes); + } + { + LOCK(cs_vRecv); + X(mapRecvBytesPerMsgCmd); + X(nRecvBytes); + } X(fWhitelisted); // It is common for nodes with good ping times to suddenly become lagged, @@ -545,22 +725,28 @@ void CNode::copyStats(CNodeStats &stats) // Raw ping time is in microseconds, but show it to user as whole seconds (Bitcoin users should be well used to small numbers with many decimal places by now :) stats.dPingTime = (((double)nPingUsecTime) / 1e6); + stats.dMinPing = (((double)nMinPingUsecTime) / 1e6); stats.dPingWait = (((double)nPingUsecWait) / 1e6); // Leave string empty if addrLocal invalid (not filled in yet) - stats.addrLocal = addrLocal.IsValid() ? addrLocal.ToString() : ""; + CService addrLocalUnlocked = GetAddrLocal(); + stats.addrLocal = addrLocalUnlocked.IsValid() ? addrLocalUnlocked.ToString() : ""; } #undef X -// requires LOCK(cs_vRecvMsg) -bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes) +bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete) { + complete = false; + int64_t nTimeMicros = GetTimeMicros(); + LOCK(cs_vRecv); + nLastRecv = nTimeMicros / 1000000; + nRecvBytes += nBytes; while (nBytes > 0) { // get current incomplete message, or create a new one if (vRecvMsg.empty() || vRecvMsg.back().complete()) - vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion)); + vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION)); CNetMessage& msg = vRecvMsg.back(); @@ -572,18 +758,61 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes) handled = msg.readData(pch, nBytes); if (handled < 0) - return false; + return false; + + if (msg.in_data && msg.hdr.nMessageSize > MAX_PROTOCOL_MESSAGE_LENGTH) { + LogPrint(BCLog::NET, "Oversized message from peer=%i, disconnecting\n", GetId()); + return false; + } pch += handled; nBytes -= handled; - if (msg.complete()) - msg.nTime = GetTimeMicros(); + if (msg.complete()) { + + //store received bytes per message command + //to prevent a memory DOS, only allow valid commands + mapMsgCmdSize::iterator i = mapRecvBytesPerMsgCmd.find(msg.hdr.pchCommand); + if (i == mapRecvBytesPerMsgCmd.end()) + i = mapRecvBytesPerMsgCmd.find(NET_MESSAGE_COMMAND_OTHER); + assert(i != mapRecvBytesPerMsgCmd.end()); + i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE; + + msg.nTime = nTimeMicros; + complete = true; + } } return true; } +void CNode::SetSendVersion(int nVersionIn) +{ + // Send version may only be changed in the version message, and + // only one version message is allowed per session. We can therefore + // treat this value as const and even atomic as long as it's only used + // once a version message has been successfully processed. Any attempt to + // set this twice is an error. + if (nSendVersion != 0) { + error("Send version already set for node: %i. Refusing to change from %i to %i", id, nSendVersion, nVersionIn); + } else { + nSendVersion = nVersionIn; + } +} + +int CNode::GetSendVersion() const +{ + // The send version should always be explicitly set to + // INIT_PROTO_VERSION rather than using this value until SetSendVersion + // has been called. + if (nSendVersion == 0) { + error("Requesting unset send version for node: %i. Using %i", id, INIT_PROTO_VERSION); + return INIT_PROTO_VERSION; + } + return nSendVersion; +} + + int CNetMessage::readHeader(const char *pch, unsigned int nBytes) { // copy data to temporary parsing buffer @@ -601,13 +830,13 @@ int CNetMessage::readHeader(const char *pch, unsigned int nBytes) try { hdrbuf >> hdr; } - catch (const std::exception &) { + catch (const std::exception&) { return -1; } // reject messages larger than MAX_SIZE if (hdr.nMessageSize > MAX_SIZE) - return -1; + return -1; // switch state to reading message data in_data = true; @@ -625,12 +854,21 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes) vRecv.resize(std::min(hdr.nMessageSize, nDataPos + nCopy + 256 * 1024)); } + hasher.Write((const unsigned char*)pch, nCopy); memcpy(&vRecv[nDataPos], pch, nCopy); nDataPos += nCopy; return nCopy; } +const uint256& CNetMessage::GetMessageHash() const +{ + assert(complete()); + if (data_hash.IsNull()) + hasher.Finalize(data_hash.begin()); + return data_hash; +} + @@ -640,22 +878,30 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes) // requires LOCK(cs_vSend) -void SocketSendData(CNode *pnode) +size_t CConnman::SocketSendData(CNode *pnode) const { - std::deque<CSerializeData>::iterator it = pnode->vSendMsg.begin(); + auto it = pnode->vSendMsg.begin(); + size_t nSentSize = 0; while (it != pnode->vSendMsg.end()) { - const CSerializeData &data = *it; + const auto &data = *it; assert(data.size() > pnode->nSendOffset); - int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); + int nBytes = 0; + { + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) + break; + nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); + } if (nBytes > 0) { - pnode->nLastSend = GetTime(); + pnode->nLastSend = GetSystemTimeInSeconds(); pnode->nSendBytes += nBytes; pnode->nSendOffset += nBytes; - pnode->RecordBytesSent(nBytes); + nSentSize += nBytes; if (pnode->nSendOffset == data.size()) { pnode->nSendOffset = 0; pnode->nSendSize -= data.size(); + pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize; it++; } else { // could not send full message; stop sending more @@ -681,14 +927,231 @@ void SocketSendData(CNode *pnode) assert(pnode->nSendSize == 0); } pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it); + return nSentSize; +} + +struct NodeEvictionCandidate +{ + NodeId id; + int64_t nTimeConnected; + int64_t nMinPingUsecTime; + int64_t nLastBlockTime; + int64_t nLastTXTime; + bool fRelevantServices; + bool fRelayTxes; + bool fBloomFilter; + CAddress addr; + uint64_t nKeyedNetGroup; +}; + +static bool ReverseCompareNodeMinPingTime(const NodeEvictionCandidate &a, const NodeEvictionCandidate &b) +{ + return a.nMinPingUsecTime > b.nMinPingUsecTime; +} + +static bool ReverseCompareNodeTimeConnected(const NodeEvictionCandidate &a, const NodeEvictionCandidate &b) +{ + return a.nTimeConnected > b.nTimeConnected; } -static list<CNode*> vNodesDisconnected; +static bool CompareNetGroupKeyed(const NodeEvictionCandidate &a, const NodeEvictionCandidate &b) { + return a.nKeyedNetGroup < b.nKeyedNetGroup; +} -void ThreadSocketHandler() +static bool CompareNodeBlockTime(const NodeEvictionCandidate &a, const NodeEvictionCandidate &b) +{ + // There is a fall-through here because it is common for a node to have many peers which have not yet relayed a block. + if (a.nLastBlockTime != b.nLastBlockTime) return a.nLastBlockTime < b.nLastBlockTime; + if (a.fRelevantServices != b.fRelevantServices) return b.fRelevantServices; + return a.nTimeConnected > b.nTimeConnected; +} + +static bool CompareNodeTXTime(const NodeEvictionCandidate &a, const NodeEvictionCandidate &b) +{ + // There is a fall-through here because it is common for a node to have more than a few peers that have not yet relayed txn. + if (a.nLastTXTime != b.nLastTXTime) return a.nLastTXTime < b.nLastTXTime; + if (a.fRelayTxes != b.fRelayTxes) return b.fRelayTxes; + if (a.fBloomFilter != b.fBloomFilter) return a.fBloomFilter; + return a.nTimeConnected > b.nTimeConnected; +} + + +//! Sort an array by the specified comparator, then erase the last K elements. +template<typename T, typename Comparator> +static void EraseLastKElements(std::vector<T> &elements, Comparator comparator, size_t k) +{ + std::sort(elements.begin(), elements.end(), comparator); + size_t eraseSize = std::min(k, elements.size()); + elements.erase(elements.end() - eraseSize, elements.end()); +} + +/** Try to find a connection to evict when the node is full. + * Extreme care must be taken to avoid opening the node to attacker + * triggered network partitioning. + * The strategy used here is to protect a small number of peers + * for each of several distinct characteristics which are difficult + * to forge. In order to partition a node the attacker must be + * simultaneously better at all of them than honest peers. + */ +bool CConnman::AttemptToEvictConnection() +{ + std::vector<NodeEvictionCandidate> vEvictionCandidates; + { + LOCK(cs_vNodes); + + for (const CNode* node : vNodes) { + if (node->fWhitelisted) + continue; + if (!node->fInbound) + continue; + if (node->fDisconnect) + continue; + NodeEvictionCandidate candidate = {node->GetId(), node->nTimeConnected, node->nMinPingUsecTime, + node->nLastBlockTime, node->nLastTXTime, + HasAllDesirableServiceFlags(node->nServices), + node->fRelayTxes, node->pfilter != nullptr, node->addr, node->nKeyedNetGroup}; + vEvictionCandidates.push_back(candidate); + } + } + + // Protect connections with certain characteristics + + // Deterministically select 4 peers to protect by netgroup. + // An attacker cannot predict which netgroups will be protected + EraseLastKElements(vEvictionCandidates, CompareNetGroupKeyed, 4); + // Protect the 8 nodes with the lowest minimum ping time. + // An attacker cannot manipulate this metric without physically moving nodes closer to the target. + EraseLastKElements(vEvictionCandidates, ReverseCompareNodeMinPingTime, 8); + // Protect 4 nodes that most recently sent us transactions. + // An attacker cannot manipulate this metric without performing useful work. + EraseLastKElements(vEvictionCandidates, CompareNodeTXTime, 4); + // Protect 4 nodes that most recently sent us blocks. + // An attacker cannot manipulate this metric without performing useful work. + EraseLastKElements(vEvictionCandidates, CompareNodeBlockTime, 4); + // Protect the half of the remaining nodes which have been connected the longest. + // This replicates the non-eviction implicit behavior, and precludes attacks that start later. + EraseLastKElements(vEvictionCandidates, ReverseCompareNodeTimeConnected, vEvictionCandidates.size() / 2); + + if (vEvictionCandidates.empty()) return false; + + // Identify the network group with the most connections and youngest member. + // (vEvictionCandidates is already sorted by reverse connect time) + uint64_t naMostConnections; + unsigned int nMostConnections = 0; + int64_t nMostConnectionsTime = 0; + std::map<uint64_t, std::vector<NodeEvictionCandidate> > mapNetGroupNodes; + for (const NodeEvictionCandidate &node : vEvictionCandidates) { + std::vector<NodeEvictionCandidate> &group = mapNetGroupNodes[node.nKeyedNetGroup]; + group.push_back(node); + int64_t grouptime = group[0].nTimeConnected; + + if (group.size() > nMostConnections || (group.size() == nMostConnections && grouptime > nMostConnectionsTime)) { + nMostConnections = group.size(); + nMostConnectionsTime = grouptime; + naMostConnections = node.nKeyedNetGroup; + } + } + + // Reduce to the network group with the most connections + vEvictionCandidates = std::move(mapNetGroupNodes[naMostConnections]); + + // Disconnect from the network group with the most connections + NodeId evicted = vEvictionCandidates.front().id; + LOCK(cs_vNodes); + for (CNode* pnode : vNodes) { + if (pnode->GetId() == evicted) { + pnode->fDisconnect = true; + return true; + } + } + return false; +} + +void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { + struct sockaddr_storage sockaddr; + socklen_t len = sizeof(sockaddr); + SOCKET hSocket = accept(hListenSocket.socket, (struct sockaddr*)&sockaddr, &len); + CAddress addr; + int nInbound = 0; + int nMaxInbound = nMaxConnections - (nMaxOutbound + nMaxFeeler); + + if (hSocket != INVALID_SOCKET) { + if (!addr.SetSockAddr((const struct sockaddr*)&sockaddr)) { + LogPrintf("Warning: Unknown socket family\n"); + } + } + + bool whitelisted = hListenSocket.whitelisted || IsWhitelistedRange(addr); + { + LOCK(cs_vNodes); + for (const CNode* pnode : vNodes) { + if (pnode->fInbound) nInbound++; + } + } + + if (hSocket == INVALID_SOCKET) + { + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK) + LogPrintf("socket error accept failed: %s\n", NetworkErrorString(nErr)); + return; + } + + if (!fNetworkActive) { + LogPrintf("connection from %s dropped: not accepting new connections\n", addr.ToString()); + CloseSocket(hSocket); + return; + } + + if (!IsSelectableSocket(hSocket)) + { + LogPrintf("connection from %s dropped: non-selectable socket\n", addr.ToString()); + CloseSocket(hSocket); + return; + } + + // According to the internet TCP_NODELAY is not carried into accepted sockets + // on all platforms. Set it again here just to be sure. + SetSocketNoDelay(hSocket); + + if (IsBanned(addr) && !whitelisted) + { + LogPrint(BCLog::NET, "connection from %s dropped (banned)\n", addr.ToString()); + CloseSocket(hSocket); + return; + } + + if (nInbound >= nMaxInbound) + { + if (!AttemptToEvictConnection()) { + // No connection to evict, disconnect the new connection + LogPrint(BCLog::NET, "failed to find an eviction candidate - connection dropped (full)\n"); + CloseSocket(hSocket); + return; + } + } + + NodeId id = GetNewNodeId(); + uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); + CAddress addr_bind = GetBindAddress(hSocket); + + CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, addr_bind, "", true); + pnode->AddRef(); + pnode->fWhitelisted = whitelisted; + m_msgproc->InitializeNode(pnode); + + LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToString()); + + { + LOCK(cs_vNodes); + vNodes.push_back(pnode); + } +} + +void CConnman::ThreadSocketHandler() { unsigned int nPrevNodeCount = 0; - while (true) + while (!interruptNet) { // // Disconnect nodes @@ -696,11 +1159,10 @@ void ThreadSocketHandler() { LOCK(cs_vNodes); // Disconnect unused nodes - vector<CNode*> vNodesCopy = vNodes; - BOOST_FOREACH(CNode* pnode, vNodesCopy) + std::vector<CNode*> vNodesCopy = vNodes; + for (CNode* pnode : vNodesCopy) { - if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty())) + if (pnode->fDisconnect) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -712,45 +1174,44 @@ void ThreadSocketHandler() pnode->CloseSocketDisconnect(); // hold in disconnected pool until all refs are released - if (pnode->fNetworkNode || pnode->fInbound) - pnode->Release(); + pnode->Release(); vNodesDisconnected.push_back(pnode); } } } { // Delete disconnected nodes - list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected; - BOOST_FOREACH(CNode* pnode, vNodesDisconnectedCopy) + std::list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected; + for (CNode* pnode : vNodesDisconnectedCopy) { // wait until threads are done using it - if (pnode->GetRefCount() <= 0) - { + if (pnode->GetRefCount() <= 0) { bool fDelete = false; { - TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend) - { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) - { - TRY_LOCK(pnode->cs_inventory, lockInv); - if (lockInv) - fDelete = true; + TRY_LOCK(pnode->cs_inventory, lockInv); + if (lockInv) { + TRY_LOCK(pnode->cs_vSend, lockSend); + if (lockSend) { + fDelete = true; } } } - if (fDelete) - { + if (fDelete) { vNodesDisconnected.remove(pnode); - delete pnode; + DeleteNode(pnode); } } } } - if(vNodes.size() != nPrevNodeCount) { - nPrevNodeCount = vNodes.size(); - uiInterface.NotifyNumConnectionsChanged(nPrevNodeCount); + size_t vNodesSize; + { + LOCK(cs_vNodes); + vNodesSize = vNodes.size(); + } + if(vNodesSize != nPrevNodeCount) { + nPrevNodeCount = vNodesSize; + if(clientInterface) + clientInterface->NotifyNumConnectionsChanged(nPrevNodeCount); } // @@ -769,57 +1230,56 @@ void ThreadSocketHandler() SOCKET hSocketMax = 0; bool have_fds = false; - BOOST_FOREACH(const ListenSocket& hListenSocket, vhListenSocket) { + for (const ListenSocket& hListenSocket : vhListenSocket) { FD_SET(hListenSocket.socket, &fdsetRecv); - hSocketMax = max(hSocketMax, hListenSocket.socket); + hSocketMax = std::max(hSocketMax, hListenSocket.socket); have_fds = true; } { LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) + for (CNode* pnode : vNodes) { - if (pnode->hSocket == INVALID_SOCKET) - continue; - FD_SET(pnode->hSocket, &fdsetError); - hSocketMax = max(hSocketMax, pnode->hSocket); - have_fds = true; - // Implement the following logic: // * If there is data to send, select() for sending data. As this only // happens when optimistic write failed, we choose to first drain the // write buffer in this case before receiving more. This avoids // needlessly queueing received data, if the remote peer is not themselves // receiving data. This means properly utilizing TCP flow control signalling. - // * Otherwise, if there is no (complete) message in the receive buffer, - // or there is space left in the buffer, select() for receiving data. - // * (if neither of the above applies, there is certainly one message - // in the receiver buffer ready to be processed). - // Together, that means that at least one of the following is always possible, - // so we don't deadlock: - // * We send some data. - // * We wait for data to be received (and disconnect after timeout). - // * We process a message in the buffer (message handler thread). + // * Otherwise, if there is space left in the receive buffer, select() for + // receiving data. + // * Hand off all complete messages to the processor, to be handled without + // blocking here. + + bool select_recv = !pnode->fPauseRecv; + bool select_send; { - TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend && !pnode->vSendMsg.empty()) { - FD_SET(pnode->hSocket, &fdsetSend); - continue; - } + LOCK(pnode->cs_vSend); + select_send = !pnode->vSendMsg.empty(); } - { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv && ( - pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() || - pnode->GetTotalRecvSize() <= ReceiveFloodSize())) - FD_SET(pnode->hSocket, &fdsetRecv); + + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) + continue; + + FD_SET(pnode->hSocket, &fdsetError); + hSocketMax = std::max(hSocketMax, pnode->hSocket); + have_fds = true; + + if (select_send) { + FD_SET(pnode->hSocket, &fdsetSend); + continue; + } + if (select_recv) { + FD_SET(pnode->hSocket, &fdsetRecv); } } } int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); - boost::this_thread::interruption_point(); + if (interruptNet) + return; if (nSelect == SOCKET_ERROR) { @@ -832,117 +1292,101 @@ void ThreadSocketHandler() } FD_ZERO(&fdsetSend); FD_ZERO(&fdsetError); - MilliSleep(timeout.tv_usec/1000); + if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000))) + return; } // // Accept new connections // - BOOST_FOREACH(const ListenSocket& hListenSocket, vhListenSocket) + for (const ListenSocket& hListenSocket : vhListenSocket) { if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv)) { - struct sockaddr_storage sockaddr; - socklen_t len = sizeof(sockaddr); - SOCKET hSocket = accept(hListenSocket.socket, (struct sockaddr*)&sockaddr, &len); - CAddress addr; - int nInbound = 0; - - if (hSocket != INVALID_SOCKET) - if (!addr.SetSockAddr((const struct sockaddr*)&sockaddr)) - LogPrintf("Warning: Unknown socket family\n"); - - bool whitelisted = hListenSocket.whitelisted || CNode::IsWhitelistedRange(addr); - { - LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode->fInbound) - nInbound++; - } - - if (hSocket == INVALID_SOCKET) - { - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK) - LogPrintf("socket error accept failed: %s\n", NetworkErrorString(nErr)); - } - else if (nInbound >= nMaxConnections - MAX_OUTBOUND_CONNECTIONS) - { - CloseSocket(hSocket); - } - else if (CNode::IsBanned(addr) && !whitelisted) - { - LogPrintf("connection from %s dropped (banned)\n", addr.ToString()); - CloseSocket(hSocket); - } - else - { - CNode* pnode = new CNode(hSocket, addr, "", true); - pnode->AddRef(); - pnode->fWhitelisted = whitelisted; - - { - LOCK(cs_vNodes); - vNodes.push_back(pnode); - } - } + AcceptConnection(hListenSocket); } } // // Service each socket // - vector<CNode*> vNodesCopy; + std::vector<CNode*> vNodesCopy; { LOCK(cs_vNodes); vNodesCopy = vNodes; - BOOST_FOREACH(CNode* pnode, vNodesCopy) + for (CNode* pnode : vNodesCopy) pnode->AddRef(); } - BOOST_FOREACH(CNode* pnode, vNodesCopy) + for (CNode* pnode : vNodesCopy) { - boost::this_thread::interruption_point(); + if (interruptNet) + return; // // Receive // - if (pnode->hSocket == INVALID_SOCKET) - continue; - if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) + bool recvSet = false; + bool sendSet = false; + bool errorSet = false; { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) + continue; + recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv); + sendSet = FD_ISSET(pnode->hSocket, &fdsetSend); + errorSet = FD_ISSET(pnode->hSocket, &fdsetError); + } + if (recvSet || errorSet) + { + // typical socket buffer is 8K-64K + char pchBuf[0x10000]; + int nBytes = 0; { - { - // typical socket buffer is 8K-64K - char pchBuf[0x10000]; - int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); - if (nBytes > 0) - { - if (!pnode->ReceiveMsgBytes(pchBuf, nBytes)) - pnode->CloseSocketDisconnect(); - pnode->nLastRecv = GetTime(); - pnode->nRecvBytes += nBytes; - pnode->RecordBytesRecv(nBytes); + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) + continue; + nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); + } + if (nBytes > 0) + { + bool notify = false; + if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) + pnode->CloseSocketDisconnect(); + RecordBytesRecv(nBytes); + if (notify) { + size_t nSizeAdded = 0; + auto it(pnode->vRecvMsg.begin()); + for (; it != pnode->vRecvMsg.end(); ++it) { + if (!it->complete()) + break; + nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; } - else if (nBytes == 0) { - // socket closed gracefully - if (!pnode->fDisconnect) - LogPrint("net", "socket closed\n"); - pnode->CloseSocketDisconnect(); - } - else if (nBytes < 0) - { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) - { - if (!pnode->fDisconnect) - LogPrintf("socket recv error %s\n", NetworkErrorString(nErr)); - pnode->CloseSocketDisconnect(); - } + LOCK(pnode->cs_vProcessMsg); + pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + pnode->nProcessQueueSize += nSizeAdded; + pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; } + WakeMessageHandler(); + } + } + else if (nBytes == 0) + { + // socket closed gracefully + if (!pnode->fDisconnect) { + LogPrint(BCLog::NET, "socket closed\n"); + } + pnode->CloseSocketDisconnect(); + } + else if (nBytes < 0) + { + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) + { + if (!pnode->fDisconnect) + LogPrintf("socket recv error %s\n", NetworkErrorString(nErr)); + pnode->CloseSocketDisconnect(); } } } @@ -950,24 +1394,24 @@ void ThreadSocketHandler() // // Send // - if (pnode->hSocket == INVALID_SOCKET) - continue; - if (FD_ISSET(pnode->hSocket, &fdsetSend)) + if (sendSet) { - TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend) - SocketSendData(pnode); + LOCK(pnode->cs_vSend); + size_t nBytes = SocketSendData(pnode); + if (nBytes) { + RecordBytesSent(nBytes); + } } // // Inactivity checking // - int64_t nTime = GetTime(); + int64_t nTime = GetSystemTimeInSeconds(); if (nTime - pnode->nTimeConnected > 60) { if (pnode->nLastRecv == 0 || pnode->nLastSend == 0) { - LogPrint("net", "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->id); + LogPrint(BCLog::NET, "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->GetId()); pnode->fDisconnect = true; } else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL) @@ -985,18 +1429,29 @@ void ThreadSocketHandler() LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart)); pnode->fDisconnect = true; } + else if (!pnode->fSuccessfullyConnected) + { + LogPrint(BCLog::NET, "version handshake timeout from %d\n", pnode->GetId()); + pnode->fDisconnect = true; + } } } { LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodesCopy) + for (CNode* pnode : vNodesCopy) pnode->Release(); } } } - - +void CConnman::WakeMessageHandler() +{ + { + std::lock_guard<std::mutex> lock(mutexMsgProc); + fMsgProcWake = true; + } + condMsgProc.notify_one(); +} @@ -1004,21 +1459,27 @@ void ThreadSocketHandler() #ifdef USE_UPNP +static CThreadInterrupt g_upnp_interrupt; +static std::thread g_upnp_thread; void ThreadMapPort() { std::string port = strprintf("%u", GetListenPort()); - const char * multicastif = 0; - const char * minissdpdpath = 0; - struct UPNPDev * devlist = 0; + const char * multicastif = nullptr; + const char * minissdpdpath = nullptr; + struct UPNPDev * devlist = nullptr; char lanaddr[64]; #ifndef UPNPDISCOVER_SUCCESS /* miniupnpc 1.5 */ devlist = upnpDiscover(2000, multicastif, minissdpdpath, 0); -#else +#elif MINIUPNPC_API_VERSION < 14 /* miniupnpc 1.6 */ int error = 0; devlist = upnpDiscover(2000, multicastif, minissdpdpath, 0, 0, &error); +#else + /* miniupnpc 1.9.20150730 */ + int error = 0; + devlist = upnpDiscover(2000, multicastif, minissdpdpath, 0, 0, 2, &error); #endif struct UPNPUrls urls; @@ -1037,76 +1498,83 @@ void ThreadMapPort() { if(externalIPAddress[0]) { - LogPrintf("UPnP: ExternalIPAddress = %s\n", externalIPAddress); - AddLocal(CNetAddr(externalIPAddress), LOCAL_UPNP); + CNetAddr resolved; + if(LookupHost(externalIPAddress, resolved, false)) { + LogPrintf("UPnP: ExternalIPAddress = %s\n", resolved.ToString().c_str()); + AddLocal(resolved, LOCAL_UPNP); + } } else LogPrintf("UPnP: GetExternalIPAddress failed.\n"); } } - string strDesc = "Bitcoin " + FormatFullVersion(); + std::string strDesc = "Bitcoin " + FormatFullVersion(); - try { - while (true) { + do { #ifndef UPNPDISCOVER_SUCCESS - /* miniupnpc 1.5 */ - r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype, - port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0); + /* miniupnpc 1.5 */ + r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype, + port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0); #else - /* miniupnpc 1.6 */ - r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype, - port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0, "0"); + /* miniupnpc 1.6 */ + r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype, + port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0, "0"); #endif - if(r!=UPNPCOMMAND_SUCCESS) - LogPrintf("AddPortMapping(%s, %s, %s) failed with code %d (%s)\n", - port, port, lanaddr, r, strupnperror(r)); - else - LogPrintf("UPnP Port Mapping successful.\n");; - - MilliSleep(20*60*1000); // Refresh every 20 minutes - } - } - catch (boost::thread_interrupted) - { - r = UPNP_DeletePortMapping(urls.controlURL, data.first.servicetype, port.c_str(), "TCP", 0); - LogPrintf("UPNP_DeletePortMapping() returned : %d\n", r); - freeUPNPDevlist(devlist); devlist = 0; - FreeUPNPUrls(&urls); - throw; + if(r!=UPNPCOMMAND_SUCCESS) + LogPrintf("AddPortMapping(%s, %s, %s) failed with code %d (%s)\n", + port, port, lanaddr, r, strupnperror(r)); + else + LogPrintf("UPnP Port Mapping successful.\n"); } + while(g_upnp_interrupt.sleep_for(std::chrono::minutes(20))); + + r = UPNP_DeletePortMapping(urls.controlURL, data.first.servicetype, port.c_str(), "TCP", 0); + LogPrintf("UPNP_DeletePortMapping() returned: %d\n", r); + freeUPNPDevlist(devlist); devlist = nullptr; + FreeUPNPUrls(&urls); } else { LogPrintf("No valid UPnP IGDs found\n"); - freeUPNPDevlist(devlist); devlist = 0; + freeUPNPDevlist(devlist); devlist = nullptr; if (r != 0) FreeUPNPUrls(&urls); } } -void MapPort(bool fUseUPnP) +void StartMapPort() { - static boost::thread* upnp_thread = NULL; + if (!g_upnp_thread.joinable()) { + assert(!g_upnp_interrupt); + g_upnp_thread = std::thread((std::bind(&TraceThread<void (*)()>, "upnp", &ThreadMapPort))); + } +} - if (fUseUPnP) - { - if (upnp_thread) { - upnp_thread->interrupt(); - upnp_thread->join(); - delete upnp_thread; - } - upnp_thread = new boost::thread(boost::bind(&TraceThread<void (*)()>, "upnp", &ThreadMapPort)); +void InterruptMapPort() +{ + if(g_upnp_thread.joinable()) { + g_upnp_interrupt(); } - else if (upnp_thread) { - upnp_thread->interrupt(); - upnp_thread->join(); - delete upnp_thread; - upnp_thread = NULL; +} + +void StopMapPort() +{ + if(g_upnp_thread.joinable()) { + g_upnp_thread.join(); + g_upnp_interrupt.reset(); } } #else -void MapPort(bool) +void StartMapPort() +{ + // Intentionally left blank. +} +void InterruptMapPort() +{ + // Intentionally left blank. +} +void StopMapPort() { // Intentionally left blank. } @@ -1117,43 +1585,64 @@ void MapPort(bool) -void ThreadDNSAddressSeed() +void CConnman::ThreadDNSAddressSeed() { // goal: only query DNS seeds if address need is acute + // Avoiding DNS seeds when we don't need them improves user privacy by + // creating fewer identifying DNS requests, reduces trust by giving seeds + // less influence on the network topology, and reduces traffic to the seeds. if ((addrman.size() > 0) && - (!GetBoolArg("-forcednsseed", false))) { - MilliSleep(11 * 1000); + (!gArgs.GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) { + if (!interruptNet.sleep_for(std::chrono::seconds(11))) + return; LOCK(cs_vNodes); - if (vNodes.size() >= 2) { + int nRelevant = 0; + for (auto pnode : vNodes) { + nRelevant += pnode->fSuccessfullyConnected && !pnode->fFeeler && !pnode->fOneShot && !pnode->m_manual_connection && !pnode->fInbound; + } + if (nRelevant >= 2) { LogPrintf("P2P peers available. Skipped DNS seeding.\n"); return; } } - const vector<CDNSSeedData> &vSeeds = Params().DNSSeeds(); + const std::vector<std::string> &vSeeds = Params().DNSSeeds(); int found = 0; LogPrintf("Loading addresses from DNS seeds (could take a while)\n"); - BOOST_FOREACH(const CDNSSeedData &seed, vSeeds) { + for (const std::string &seed : vSeeds) { + if (interruptNet) { + return; + } if (HaveNameProxy()) { - AddOneShot(seed.host); + AddOneShot(seed); } else { - vector<CNetAddr> vIPs; - vector<CAddress> vAdd; - if (LookupHost(seed.host.c_str(), vIPs)) + std::vector<CNetAddr> vIPs; + std::vector<CAddress> vAdd; + ServiceFlags requiredServiceBits = GetDesirableServiceFlags(NODE_NONE); + std::string host = strprintf("x%x.%s", requiredServiceBits, seed); + CNetAddr resolveSource; + if (!resolveSource.SetInternal(host)) { + continue; + } + if (LookupHost(host.c_str(), vIPs, 0, true)) { - BOOST_FOREACH(CNetAddr& ip, vIPs) + for (const CNetAddr& ip : vIPs) { int nOneDay = 24*3600; - CAddress addr = CAddress(CService(ip, Params().GetDefaultPort())); + CAddress addr = CAddress(CService(ip, Params().GetDefaultPort()), requiredServiceBits); addr.nTime = GetTime() - 3*nOneDay - GetRand(4*nOneDay); // use a random age between 3 and 7 days old vAdd.push_back(addr); found++; } + addrman.Add(vAdd, resolveSource); + } else { + // We now avoid directly using results from DNS Seeds which do not support service bit filtering, + // instead using them as a oneshot to get nodes with our desired service bits. + AddOneShot(seed); } - addrman.Add(vAdd, CNetAddr(seed.name, true)); } } @@ -1171,20 +1660,26 @@ void ThreadDNSAddressSeed() -void DumpAddresses() +void CConnman::DumpAddresses() { int64_t nStart = GetTimeMillis(); CAddrDB adb; adb.Write(addrman); - LogPrint("net", "Flushed %d addresses to peers.dat %dms\n", + LogPrint(BCLog::NET, "Flushed %d addresses to peers.dat %dms\n", addrman.size(), GetTimeMillis() - nStart); } -void static ProcessOneShot() +void CConnman::DumpData() +{ + DumpAddresses(); + DumpBanlist(); +} + +void CConnman::ProcessOneShot() { - string strDest; + std::string strDest; { LOCK(cs_vOneShots); if (vOneShots.empty()) @@ -1195,49 +1690,88 @@ void static ProcessOneShot() CAddress addr; CSemaphoreGrant grant(*semOutbound, true); if (grant) { - if (!OpenNetworkConnection(addr, &grant, strDest.c_str(), true)) - AddOneShot(strDest); + OpenNetworkConnection(addr, false, &grant, strDest.c_str(), true); } } -void ThreadOpenConnections() +bool CConnman::GetTryNewOutboundPeer() +{ + return m_try_another_outbound_peer; +} + +void CConnman::SetTryNewOutboundPeer(bool flag) +{ + m_try_another_outbound_peer = flag; + LogPrint(BCLog::NET, "net: setting try another outbound peer=%s\n", flag ? "true" : "false"); +} + +// Return the number of peers we have over our outbound connection limit +// Exclude peers that are marked for disconnect, or are going to be +// disconnected soon (eg one-shots and feelers) +// Also exclude peers that haven't finished initial connection handshake yet +// (so that we don't decide we're over our desired connection limit, and then +// evict some peer that has finished the handshake) +int CConnman::GetExtraOutboundCount() +{ + int nOutbound = 0; + { + LOCK(cs_vNodes); + for (CNode* pnode : vNodes) { + if (!pnode->fInbound && !pnode->m_manual_connection && !pnode->fFeeler && !pnode->fDisconnect && !pnode->fOneShot && pnode->fSuccessfullyConnected) { + ++nOutbound; + } + } + } + return std::max(nOutbound - nMaxOutbound, 0); +} + +void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) { // Connect to specific addresses - if (mapArgs.count("-connect") && mapMultiArgs["-connect"].size() > 0) + if (!connect.empty()) { for (int64_t nLoop = 0;; nLoop++) { ProcessOneShot(); - BOOST_FOREACH(string strAddr, mapMultiArgs["-connect"]) + for (const std::string& strAddr : connect) { - CAddress addr; - OpenNetworkConnection(addr, NULL, strAddr.c_str()); + CAddress addr(CService(), NODE_NONE); + OpenNetworkConnection(addr, false, nullptr, strAddr.c_str(), false, false, true); for (int i = 0; i < 10 && i < nLoop; i++) { - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } // Initiate network connections int64_t nStart = GetTime(); - while (true) + + // Minimum time before next feeler connection (in microseconds). + int64_t nNextFeeler = PoissonNextSend(nStart*1000*1000, FEELER_INTERVAL); + while (!interruptNet) { ProcessOneShot(); - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; CSemaphoreGrant grant(*semOutbound); - boost::this_thread::interruption_point(); + if (interruptNet) + return; // Add seed nodes if DNS seeds are all down (an infrastructure attack?). if (addrman.size() == 0 && (GetTime() - nStart > 60)) { static bool done = false; if (!done) { LogPrintf("Adding fixed seed nodes as DNS doesn't seem to be available.\n"); - addrman.Add(Params().FixedSeeds(), CNetAddr("127.0.0.1")); + CNetAddr local; + local.SetInternal("fixedseeds"); + addrman.Add(convertSeed6(Params().FixedSeeds()), local); done = true; } } @@ -1250,24 +1784,51 @@ void ThreadOpenConnections() // Only connect out to one peer per network group (/16 for IPv4). // Do this here so we don't have to critsect vNodes inside mapAddresses critsect. int nOutbound = 0; - set<vector<unsigned char> > setConnected; + std::set<std::vector<unsigned char> > setConnected; { LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) { - if (!pnode->fInbound) { + for (CNode* pnode : vNodes) { + if (!pnode->fInbound && !pnode->m_manual_connection) { + // Netgroups for inbound and addnode peers are not excluded because our goal here + // is to not use multiple of our limited outbound slots on a single netgroup + // but inbound and addnode peers do not use our outbound slots. Inbound peers + // also have the added issue that they're attacker controlled and could be used + // to prevent us from connecting to particular hosts if we used them here. setConnected.insert(pnode->addr.GetGroup()); nOutbound++; } } } - int64_t nANow = GetAdjustedTime(); + // Feeler Connections + // + // Design goals: + // * Increase the number of connectable addresses in the tried table. + // + // Method: + // * Choose a random address from new and attempt to connect to it if we can connect + // successfully it is added to tried. + // * Start attempting feeler connections only after node finishes making outbound + // connections. + // * Only make a feeler connection once every few minutes. + // + bool fFeeler = false; + if (nOutbound >= nMaxOutbound && !GetTryNewOutboundPeer()) { + int64_t nTime = GetTimeMicros(); // The current time right now (in microseconds). + if (nTime > nNextFeeler) { + nNextFeeler = PoissonNextSend(nTime, FEELER_INTERVAL); + fFeeler = true; + } else { + continue; + } + } + + int64_t nANow = GetAdjustedTime(); int nTries = 0; - while (true) + while (!interruptNet) { - // use an nUnkBias between 10 (no outgoing connections) and 90 (8 outgoing connections) - CAddress addr = addrman.Select(10 + min(nOutbound,8)*10); + CAddrInfo addr = addrman.Select(fFeeler); // if we selected an invalid address, restart if (!addr.IsValid() || setConnected.count(addr.GetGroup()) || IsLocal(addr)) @@ -1287,6 +1848,15 @@ void ThreadOpenConnections() if (nANow - addr.nLastTry < 600 && nTries < 30) continue; + // for non-feelers, require all the services we'll want, + // for feelers, only require they be a full node (only because most + // SPV clients don't have a good address DB available) + if (!fFeeler && !HasAllDesirableServiceFlags(addr.nServices)) { + continue; + } else if (fFeeler && !MayHaveUsefulAddressDB(addr.nServices)) { + continue; + } + // do not allow non-default ports, unless after 50 invalid addresses selected already if (addr.GetPort() != Params().GetDefaultPort() && nTries < 50) continue; @@ -1295,175 +1865,186 @@ void ThreadOpenConnections() break; } - if (addrConnect.IsValid()) - OpenNetworkConnection(addrConnect, &grant); + if (addrConnect.IsValid()) { + + if (fFeeler) { + // Add small amount of random noise before connection to avoid synchronization. + int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000); + if (!interruptNet.sleep_for(std::chrono::milliseconds(randsleep))) + return; + LogPrint(BCLog::NET, "Making feeler connection to %s\n", addrConnect.ToString()); + } + + OpenNetworkConnection(addrConnect, (int)setConnected.size() >= std::min(nMaxConnections - 1, 2), &grant, nullptr, false, fFeeler); + } } } -void ThreadOpenAddedConnections() +std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() { + std::vector<AddedNodeInfo> ret; + + std::list<std::string> lAddresses(0); { LOCK(cs_vAddedNodes); - vAddedNodes = mapMultiArgs["-addnode"]; + ret.reserve(vAddedNodes.size()); + std::copy(vAddedNodes.cbegin(), vAddedNodes.cend(), std::back_inserter(lAddresses)); } - if (HaveNameProxy()) { - while(true) { - list<string> lAddresses(0); - { - LOCK(cs_vAddedNodes); - BOOST_FOREACH(string& strAddNode, vAddedNodes) - lAddresses.push_back(strAddNode); + + // Build a map of all already connected addresses (by IP:port and by name) to inbound/outbound and resolved CService + std::map<CService, bool> mapConnected; + std::map<std::string, std::pair<bool, CService>> mapConnectedByName; + { + LOCK(cs_vNodes); + for (const CNode* pnode : vNodes) { + if (pnode->addr.IsValid()) { + mapConnected[pnode->addr] = pnode->fInbound; } - BOOST_FOREACH(string& strAddNode, lAddresses) { - CAddress addr; - CSemaphoreGrant grant(*semOutbound); - OpenNetworkConnection(addr, &grant, strAddNode.c_str()); - MilliSleep(500); + std::string addrName = pnode->GetAddrName(); + if (!addrName.empty()) { + mapConnectedByName[std::move(addrName)] = std::make_pair(pnode->fInbound, static_cast<const CService&>(pnode->addr)); } - MilliSleep(120000); // Retry every 2 minutes } } - for (unsigned int i = 0; true; i++) - { - list<string> lAddresses(0); - { - LOCK(cs_vAddedNodes); - BOOST_FOREACH(string& strAddNode, vAddedNodes) - lAddresses.push_back(strAddNode); + for (const std::string& strAddNode : lAddresses) { + CService service(LookupNumeric(strAddNode.c_str(), Params().GetDefaultPort())); + if (service.IsValid()) { + // strAddNode is an IP:port + auto it = mapConnected.find(service); + if (it != mapConnected.end()) { + ret.push_back(AddedNodeInfo{strAddNode, service, true, it->second}); + } else { + ret.push_back(AddedNodeInfo{strAddNode, CService(), false, false}); + } + } else { + // strAddNode is a name + auto it = mapConnectedByName.find(strAddNode); + if (it != mapConnectedByName.end()) { + ret.push_back(AddedNodeInfo{strAddNode, it->second.second, true, it->second.first}); + } else { + ret.push_back(AddedNodeInfo{strAddNode, CService(), false, false}); + } } + } - list<vector<CService> > lservAddressesToAdd(0); - BOOST_FOREACH(string& strAddNode, lAddresses) - { - vector<CService> vservNode(0); - if(Lookup(strAddNode.c_str(), vservNode, Params().GetDefaultPort(), fNameLookup, 0)) - { - lservAddressesToAdd.push_back(vservNode); - { - LOCK(cs_setservAddNodeAddresses); - BOOST_FOREACH(CService& serv, vservNode) - setservAddNodeAddresses.insert(serv); + return ret; +} + +void CConnman::ThreadOpenAddedConnections() +{ + while (true) + { + CSemaphoreGrant grant(*semAddnode); + std::vector<AddedNodeInfo> vInfo = GetAddedNodeInfo(); + bool tried = false; + for (const AddedNodeInfo& info : vInfo) { + if (!info.fConnected) { + if (!grant.TryAcquire()) { + // If we've used up our semaphore and need a new one, lets not wait here since while we are waiting + // the addednodeinfo state might change. + break; } + tried = true; + CAddress addr(CService(), NODE_NONE); + OpenNetworkConnection(addr, false, &grant, info.strAddedNode.c_str(), false, false, true); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } - // Attempt to connect to each IP for each addnode entry until at least one is successful per addnode entry - // (keeping in mind that addnode entries can have many IPs if fNameLookup) - { - LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) - for (list<vector<CService> >::iterator it = lservAddressesToAdd.begin(); it != lservAddressesToAdd.end(); it++) - BOOST_FOREACH(CService& addrNode, *(it)) - if (pnode->addr == addrNode) - { - it = lservAddressesToAdd.erase(it); - it--; - break; - } - } - BOOST_FOREACH(vector<CService>& vserv, lservAddressesToAdd) - { - CSemaphoreGrant grant(*semOutbound); - OpenNetworkConnection(CAddress(vserv[i % vserv.size()]), &grant); - MilliSleep(500); - } - MilliSleep(120000); // Retry every 2 minutes + // Retry every 60 seconds if a connection was attempted, otherwise two seconds + if (!interruptNet.sleep_for(std::chrono::seconds(tried ? 60 : 2))) + return; } } // if successful, this moves the passed grant to the constructed node -bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound, const char *pszDest, bool fOneShot) +void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound, const char *pszDest, bool fOneShot, bool fFeeler, bool manual_connection) { // // Initiate outbound network connection // - boost::this_thread::interruption_point(); + if (interruptNet) { + return; + } + if (!fNetworkActive) { + return; + } if (!pszDest) { if (IsLocal(addrConnect) || - FindNode((CNetAddr)addrConnect) || CNode::IsBanned(addrConnect) || + FindNode(static_cast<CNetAddr>(addrConnect)) || IsBanned(addrConnect) || FindNode(addrConnect.ToStringIPPort())) - return false; - } else if (FindNode(pszDest)) - return false; + return; + } else if (FindNode(std::string(pszDest))) + return; - CNode* pnode = ConnectNode(addrConnect, pszDest); - boost::this_thread::interruption_point(); + CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure); if (!pnode) - return false; + return; if (grantOutbound) grantOutbound->MoveTo(pnode->grantOutbound); - pnode->fNetworkNode = true; if (fOneShot) pnode->fOneShot = true; + if (fFeeler) + pnode->fFeeler = true; + if (manual_connection) + pnode->m_manual_connection = true; - return true; + m_msgproc->InitializeNode(pnode); + { + LOCK(cs_vNodes); + vNodes.push_back(pnode); + } } - -void ThreadMessageHandler() +void CConnman::ThreadMessageHandler() { - SetThreadPriority(THREAD_PRIORITY_BELOW_NORMAL); - while (true) + while (!flagInterruptMsgProc) { - vector<CNode*> vNodesCopy; + std::vector<CNode*> vNodesCopy; { LOCK(cs_vNodes); vNodesCopy = vNodes; - BOOST_FOREACH(CNode* pnode, vNodesCopy) { + for (CNode* pnode : vNodesCopy) { pnode->AddRef(); } } - // Poll the connected nodes for messages - CNode* pnodeTrickle = NULL; - if (!vNodesCopy.empty()) - pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())]; - - bool fSleep = true; + bool fMoreWork = false; - BOOST_FOREACH(CNode* pnode, vNodesCopy) + for (CNode* pnode : vNodesCopy) { if (pnode->fDisconnect) continue; // Receive messages - { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) - { - if (!g_signals.ProcessMessages(pnode)) - pnode->CloseSocketDisconnect(); - - if (pnode->nSendSize < SendBufferSize()) - { - if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete())) - { - fSleep = false; - } - } - } - } - boost::this_thread::interruption_point(); - + bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc); + fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); + if (flagInterruptMsgProc) + return; // Send messages { - TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend) - g_signals.SendMessages(pnode, pnode == pnodeTrickle); + LOCK(pnode->cs_sendProcessing); + m_msgproc->SendMessages(pnode, flagInterruptMsgProc); } - boost::this_thread::interruption_point(); + + if (flagInterruptMsgProc) + return; } { LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodesCopy) + for (CNode* pnode : vNodesCopy) pnode->Release(); } - if (fSleep) - MilliSleep(100); + std::unique_lock<std::mutex> lock(mutexMsgProc); + if (!fMoreWork) { + condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; }); + } + fMsgProcWake = false; } } @@ -1472,7 +2053,7 @@ void ThreadMessageHandler() -bool BindListenPort(const CService &addrBind, string& strError, bool fWhitelisted) +bool CConnman::BindListenPort(const CService &addrBind, std::string& strError, bool fWhitelisted) { strError = ""; int nOne = 1; @@ -1487,31 +2068,21 @@ bool BindListenPort(const CService &addrBind, string& strError, bool fWhiteliste return false; } - SOCKET hListenSocket = socket(((struct sockaddr*)&sockaddr)->sa_family, SOCK_STREAM, IPPROTO_TCP); + SOCKET hListenSocket = CreateSocket(addrBind); if (hListenSocket == INVALID_SOCKET) { strError = strprintf("Error: Couldn't open socket for incoming connections (socket returned error %s)", NetworkErrorString(WSAGetLastError())); LogPrintf("%s\n", strError); return false; } - #ifndef WIN32 -#ifdef SO_NOSIGPIPE - // Different way of disabling SIGPIPE on BSD - setsockopt(hListenSocket, SOL_SOCKET, SO_NOSIGPIPE, (void*)&nOne, sizeof(int)); -#endif // Allow binding if the port is still in TIME_WAIT state after - // the program was closed and restarted. Not an issue on windows! + // the program was closed and restarted. setsockopt(hListenSocket, SOL_SOCKET, SO_REUSEADDR, (void*)&nOne, sizeof(int)); +#else + setsockopt(hListenSocket, SOL_SOCKET, SO_REUSEADDR, (const char*)&nOne, sizeof(int)); #endif - // Set to non-blocking, incoming connections will also inherit this - if (!SetSocketNonBlocking(hListenSocket, true)) { - strError = strprintf("BindListenPort: Setting listening socket to non-blocking failed, error %s\n", NetworkErrorString(WSAGetLastError())); - LogPrintf("%s\n", strError); - return false; - } - // some systems don't have IPV6_V6ONLY but are always v6only; others do have the option // and enable it by default or not. Try to enable it, if possible. if (addrBind.IsIPv6()) { @@ -1532,7 +2103,7 @@ bool BindListenPort(const CService &addrBind, string& strError, bool fWhiteliste { int nErr = WSAGetLastError(); if (nErr == WSAEADDRINUSE) - strError = strprintf(_("Unable to bind to %s on this computer. Bitcoin Core is probably already running."), addrBind.ToString()); + strError = strprintf(_("Unable to bind to %s on this computer. %s is probably already running."), addrBind.ToString(), _(PACKAGE_NAME)); else strError = strprintf(_("Unable to bind to %s on this computer (bind returned error %s)"), addrBind.ToString(), NetworkErrorString(nErr)); LogPrintf("%s\n", strError); @@ -1558,7 +2129,7 @@ bool BindListenPort(const CService &addrBind, string& strError, bool fWhiteliste return true; } -void static Discover(boost::thread_group& threadGroup) +void Discover() { if (!fDiscover) return; @@ -1568,10 +2139,10 @@ void static Discover(boost::thread_group& threadGroup) char pszHostName[256] = ""; if (gethostname(pszHostName, sizeof(pszHostName)) != SOCKET_ERROR) { - vector<CNetAddr> vaddr; - if (LookupHost(pszHostName, vaddr)) + std::vector<CNetAddr> vaddr; + if (LookupHost(pszHostName, vaddr, 0, true)) { - BOOST_FOREACH (const CNetAddr &addr, vaddr) + for (const CNetAddr &addr : vaddr) { if (AddLocal(addr, LOCAL_IF)) LogPrintf("%s: %s - %s\n", __func__, pszHostName, addr.ToString()); @@ -1583,9 +2154,9 @@ void static Discover(boost::thread_group& threadGroup) struct ifaddrs* myaddrs; if (getifaddrs(&myaddrs) == 0) { - for (struct ifaddrs* ifa = myaddrs; ifa != NULL; ifa = ifa->ifa_next) + for (struct ifaddrs* ifa = myaddrs; ifa != nullptr; ifa = ifa->ifa_next) { - if (ifa->ifa_addr == NULL) continue; + if (ifa->ifa_addr == nullptr) continue; if ((ifa->ifa_flags & IFF_UP) == 0) continue; if (strcmp(ifa->ifa_name, "lo") == 0) continue; if (strcmp(ifa->ifa_name, "lo0") == 0) continue; @@ -1609,72 +2180,193 @@ void static Discover(boost::thread_group& threadGroup) #endif } -void StartNode(boost::thread_group& threadGroup) +void CConnman::SetNetworkActive(bool active) +{ + LogPrint(BCLog::NET, "SetNetworkActive: %s\n", active); + + if (fNetworkActive == active) { + return; + } + + fNetworkActive = active; + + if (!fNetworkActive) { + LOCK(cs_vNodes); + // Close sockets to all nodes + for (CNode* pnode : vNodes) { + pnode->CloseSocketDisconnect(); + } + } + + uiInterface.NotifyNetworkActiveChanged(fNetworkActive); +} + +CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSeed1(nSeed1In) +{ + fNetworkActive = true; + setBannedIsDirty = false; + fAddressesInitialized = false; + nLastNodeId = 0; + nSendBufferMaxSize = 0; + nReceiveFloodSize = 0; + flagInterruptMsgProc = false; + SetTryNewOutboundPeer(false); + + Options connOptions; + Init(connOptions); +} + +NodeId CConnman::GetNewNodeId() +{ + return nLastNodeId.fetch_add(1, std::memory_order_relaxed); +} + + +bool CConnman::Bind(const CService &addr, unsigned int flags) { + if (!(flags & BF_EXPLICIT) && IsLimited(addr)) + return false; + std::string strError; + if (!BindListenPort(addr, strError, (flags & BF_WHITELIST) != 0)) { + if ((flags & BF_REPORT_ERROR) && clientInterface) { + clientInterface->ThreadSafeMessageBox(strError, "", CClientUIInterface::MSG_ERROR); + } + return false; + } + return true; +} + +bool CConnman::InitBinds(const std::vector<CService>& binds, const std::vector<CService>& whiteBinds) { + bool fBound = false; + for (const auto& addrBind : binds) { + fBound |= Bind(addrBind, (BF_EXPLICIT | BF_REPORT_ERROR)); + } + for (const auto& addrBind : whiteBinds) { + fBound |= Bind(addrBind, (BF_EXPLICIT | BF_REPORT_ERROR | BF_WHITELIST)); + } + if (binds.empty() && whiteBinds.empty()) { + struct in_addr inaddr_any; + inaddr_any.s_addr = INADDR_ANY; + fBound |= Bind(CService(in6addr_any, GetListenPort()), BF_NONE); + fBound |= Bind(CService(inaddr_any, GetListenPort()), !fBound ? BF_REPORT_ERROR : BF_NONE); + } + return fBound; +} + +bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) { - uiInterface.InitMessage(_("Loading addresses...")); - // Load addresses for peers.dat + Init(connOptions); + + { + LOCK(cs_totalBytesRecv); + nTotalBytesRecv = 0; + } + { + LOCK(cs_totalBytesSent); + nTotalBytesSent = 0; + nMaxOutboundTotalBytesSentInCycle = 0; + nMaxOutboundCycleStartTime = 0; + } + + if (fListen && !InitBinds(connOptions.vBinds, connOptions.vWhiteBinds)) { + if (clientInterface) { + clientInterface->ThreadSafeMessageBox( + _("Failed to listen on any port. Use -listen=0 if you want this."), + "", CClientUIInterface::MSG_ERROR); + } + return false; + } + + for (const auto& strDest : connOptions.vSeedNodes) { + AddOneShot(strDest); + } + + if (clientInterface) { + clientInterface->InitMessage(_("Loading P2P addresses...")); + } + // Load addresses from peers.dat int64_t nStart = GetTimeMillis(); { CAddrDB adb; - if (!adb.Read(addrman)) + if (adb.Read(addrman)) + LogPrintf("Loaded %i addresses from peers.dat %dms\n", addrman.size(), GetTimeMillis() - nStart); + else { + addrman.Clear(); // Addrman can be in an inconsistent state after failure, reset it LogPrintf("Invalid or missing peers.dat; recreating\n"); + DumpAddresses(); + } } - LogPrintf("Loaded %i addresses from peers.dat %dms\n", - addrman.size(), GetTimeMillis() - nStart); + if (clientInterface) + clientInterface->InitMessage(_("Loading banlist...")); + // Load addresses from banlist.dat + nStart = GetTimeMillis(); + CBanDB bandb; + banmap_t banmap; + if (bandb.Read(banmap)) { + SetBanned(banmap); // thread save setter + SetBannedSetDirty(false); // no need to write down, just read data + SweepBanned(); // sweep out unused entries + + LogPrint(BCLog::NET, "Loaded %d banned node ips/subnets from banlist.dat %dms\n", + banmap.size(), GetTimeMillis() - nStart); + } else { + LogPrintf("Invalid or missing banlist.dat; recreating\n"); + SetBannedSetDirty(true); // force write + DumpBanlist(); + } + + uiInterface.InitMessage(_("Starting network threads...")); + fAddressesInitialized = true; - if (semOutbound == NULL) { + if (semOutbound == nullptr) { // initialize semaphore - int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, nMaxConnections); - semOutbound = new CSemaphore(nMaxOutbound); + semOutbound = MakeUnique<CSemaphore>(std::min((nMaxOutbound + nMaxFeeler), nMaxConnections)); + } + if (semAddnode == nullptr) { + // initialize semaphore + semAddnode = MakeUnique<CSemaphore>(nMaxAddnode); } - - if (pnodeLocalHost == NULL) - pnodeLocalHost = new CNode(INVALID_SOCKET, CAddress(CService("127.0.0.1", 0), nLocalServices)); - - Discover(threadGroup); // // Start threads // + assert(m_msgproc); + InterruptSocks5(false); + interruptNet.reset(); + flagInterruptMsgProc = false; - if (!GetBoolArg("-dnsseed", true)) - LogPrintf("DNS seeding disabled\n"); - else - threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "dnsseed", &ThreadDNSAddressSeed)); - - // Map ports with UPnP - MapPort(GetBoolArg("-upnp", DEFAULT_UPNP)); + { + std::unique_lock<std::mutex> lock(mutexMsgProc); + fMsgProcWake = false; + } // Send and receive from sockets, accept connections - threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "net", &ThreadSocketHandler)); + threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this))); + + if (!gArgs.GetBoolArg("-dnsseed", true)) + LogPrintf("DNS seeding disabled\n"); + else + threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this))); // Initiate outbound connections from -addnode - threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "addcon", &ThreadOpenAddedConnections)); + threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this))); - // Initiate outbound connections - threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "opencon", &ThreadOpenConnections)); + if (connOptions.m_use_addrman_outgoing && !connOptions.m_specified_outgoing.empty()) { + if (clientInterface) { + clientInterface->ThreadSafeMessageBox( + _("Cannot provide specific connections and have addrman find outgoing connections at the same."), + "", CClientUIInterface::MSG_ERROR); + } + return false; + } + if (connOptions.m_use_addrman_outgoing || !connOptions.m_specified_outgoing.empty()) + threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this, connOptions.m_specified_outgoing))); // Process messages - threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "msghand", &ThreadMessageHandler)); + threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this))); // Dump network addresses - threadGroup.create_thread(boost::bind(&LoopForever<void (*)()>, "dumpaddr", &DumpAddresses, DUMP_ADDRESSES_INTERVAL * 1000)); -} - -bool StopNode() -{ - LogPrintf("StopNode()\n"); - MapPort(false); - if (semOutbound) - for (int i=0; i<MAX_OUTBOUND_CONNECTIONS; i++) - semOutbound->post(); - - if (fAddressesInitialized) - { - DumpAddresses(); - fAddressesInitialized = false; - } + scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL * 1000); return true; } @@ -1686,28 +2378,6 @@ public: ~CNetCleanup() { - // Close sockets - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode->hSocket != INVALID_SOCKET) - CloseSocket(pnode->hSocket); - BOOST_FOREACH(ListenSocket& hListenSocket, vhListenSocket) - if (hListenSocket.socket != INVALID_SOCKET) - if (!CloseSocket(hListenSocket.socket)) - LogPrintf("CloseSocket(hListenSocket) failed with error %s\n", NetworkErrorString(WSAGetLastError())); - - // clean up some globals (to help leak detection) - BOOST_FOREACH(CNode *pnode, vNodes) - delete pnode; - BOOST_FOREACH(CNode *pnode, vNodesDisconnected) - delete pnode; - vNodes.clear(); - vNodesDisconnected.clear(); - vhListenSocket.clear(); - delete semOutbound; - semOutbound = NULL; - delete pnodeLocalHost; - pnodeLocalHost = NULL; - #ifdef WIN32 // Shutdown Windows Sockets WSACleanup(); @@ -1716,289 +2386,403 @@ public: } instance_of_cnetcleanup; +void CConnman::Interrupt() +{ + { + std::lock_guard<std::mutex> lock(mutexMsgProc); + flagInterruptMsgProc = true; + } + condMsgProc.notify_all(); + interruptNet(); + InterruptSocks5(true); + if (semOutbound) { + for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++) { + semOutbound->post(); + } + } + if (semAddnode) { + for (int i=0; i<nMaxAddnode; i++) { + semAddnode->post(); + } + } +} +void CConnman::Stop() +{ + if (threadMessageHandler.joinable()) + threadMessageHandler.join(); + if (threadOpenConnections.joinable()) + threadOpenConnections.join(); + if (threadOpenAddedConnections.joinable()) + threadOpenAddedConnections.join(); + if (threadDNSAddressSeed.joinable()) + threadDNSAddressSeed.join(); + if (threadSocketHandler.joinable()) + threadSocketHandler.join(); + if (fAddressesInitialized) + { + DumpData(); + fAddressesInitialized = false; + } -void RelayTransaction(const CTransaction& tx) + // Close sockets + for (CNode* pnode : vNodes) + pnode->CloseSocketDisconnect(); + for (ListenSocket& hListenSocket : vhListenSocket) + if (hListenSocket.socket != INVALID_SOCKET) + if (!CloseSocket(hListenSocket.socket)) + LogPrintf("CloseSocket(hListenSocket) failed with error %s\n", NetworkErrorString(WSAGetLastError())); + + // clean up some globals (to help leak detection) + for (CNode *pnode : vNodes) { + DeleteNode(pnode); + } + for (CNode *pnode : vNodesDisconnected) { + DeleteNode(pnode); + } + vNodes.clear(); + vNodesDisconnected.clear(); + vhListenSocket.clear(); + semOutbound.reset(); + semAddnode.reset(); +} + +void CConnman::DeleteNode(CNode* pnode) { - CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); - ss.reserve(10000); - ss << tx; - RelayTransaction(tx, ss); + assert(pnode); + bool fUpdateConnectionTime = false; + m_msgproc->FinalizeNode(pnode->GetId(), fUpdateConnectionTime); + if(fUpdateConnectionTime) { + addrman.Connected(pnode->addr); + } + delete pnode; } -void RelayTransaction(const CTransaction& tx, const CDataStream& ss) +CConnman::~CConnman() { - CInv inv(MSG_TX, tx.GetHash()); - { - LOCK(cs_mapRelay); - // Expire old relay messages - while (!vRelayExpiration.empty() && vRelayExpiration.front().first < GetTime()) - { - mapRelay.erase(vRelayExpiration.front().second); - vRelayExpiration.pop_front(); - } + Interrupt(); + Stop(); +} - // Save original serialized message so newer versions are preserved - mapRelay.insert(std::make_pair(inv, ss)); - vRelayExpiration.push_back(std::make_pair(GetTime() + 15 * 60, inv)); - } - LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) - { - if(!pnode->fRelayTxes) - continue; - LOCK(pnode->cs_filter); - if (pnode->pfilter) - { - if (pnode->pfilter->IsRelevantAndUpdate(tx)) - pnode->PushInventory(inv); - } else - pnode->PushInventory(inv); - } +size_t CConnman::GetAddressCount() const +{ + return addrman.size(); } -void CNode::RecordBytesRecv(uint64_t bytes) +void CConnman::SetServices(const CService &addr, ServiceFlags nServices) { - LOCK(cs_totalBytesRecv); - nTotalBytesRecv += bytes; + addrman.SetServices(addr, nServices); } -void CNode::RecordBytesSent(uint64_t bytes) +void CConnman::MarkAddressGood(const CAddress& addr) { - LOCK(cs_totalBytesSent); - nTotalBytesSent += bytes; + addrman.Good(addr); } -uint64_t CNode::GetTotalBytesRecv() +void CConnman::AddNewAddresses(const std::vector<CAddress>& vAddr, const CAddress& addrFrom, int64_t nTimePenalty) { - LOCK(cs_totalBytesRecv); - return nTotalBytesRecv; + addrman.Add(vAddr, addrFrom, nTimePenalty); } -uint64_t CNode::GetTotalBytesSent() +std::vector<CAddress> CConnman::GetAddresses() { - LOCK(cs_totalBytesSent); - return nTotalBytesSent; + return addrman.GetAddr(); } -void CNode::Fuzz(int nChance) +bool CConnman::AddNode(const std::string& strNode) { - if (!fSuccessfullyConnected) return; // Don't fuzz initial handshake - if (GetRand(nChance) != 0) return; // Fuzz 1 of every nChance messages + LOCK(cs_vAddedNodes); + for (const std::string& it : vAddedNodes) { + if (strNode == it) return false; + } - switch (GetRand(3)) - { - case 0: - // xor a random byte with a random value: - if (!ssSend.empty()) { - CDataStream::size_type pos = GetRand(ssSend.size()); - ssSend[pos] ^= (unsigned char)(GetRand(256)); - } - break; - case 1: - // delete a random byte: - if (!ssSend.empty()) { - CDataStream::size_type pos = GetRand(ssSend.size()); - ssSend.erase(ssSend.begin()+pos); - } - break; - case 2: - // insert a random byte at a random position - { - CDataStream::size_type pos = GetRand(ssSend.size()); - char ch = (char)GetRand(256); - ssSend.insert(ssSend.begin()+pos, ch); + vAddedNodes.push_back(strNode); + return true; +} + +bool CConnman::RemoveAddedNode(const std::string& strNode) +{ + LOCK(cs_vAddedNodes); + for(std::vector<std::string>::iterator it = vAddedNodes.begin(); it != vAddedNodes.end(); ++it) { + if (strNode == *it) { + vAddedNodes.erase(it); + return true; } - break; } - // Chance of more than one change half the time: - // (more changes exponentially less likely): - Fuzz(2); + return false; } -// -// CAddrDB -// +size_t CConnman::GetNodeCount(NumConnections flags) +{ + LOCK(cs_vNodes); + if (flags == CConnman::CONNECTIONS_ALL) // Shortcut if we want total + return vNodes.size(); -CAddrDB::CAddrDB() + int nNum = 0; + for (const auto& pnode : vNodes) { + if (flags & (pnode->fInbound ? CONNECTIONS_IN : CONNECTIONS_OUT)) { + nNum++; + } + } + + return nNum; +} + +void CConnman::GetNodeStats(std::vector<CNodeStats>& vstats) { - pathAddr = GetDataDir() / "peers.dat"; + vstats.clear(); + LOCK(cs_vNodes); + vstats.reserve(vNodes.size()); + for (CNode* pnode : vNodes) { + vstats.emplace_back(); + pnode->copyStats(vstats.back()); + } } -bool CAddrDB::Write(const CAddrMan& addr) +bool CConnman::DisconnectNode(const std::string& strNode) +{ + LOCK(cs_vNodes); + if (CNode* pnode = FindNode(strNode)) { + pnode->fDisconnect = true; + return true; + } + return false; +} +bool CConnman::DisconnectNode(NodeId id) { - // Generate random temporary filename - unsigned short randv = 0; - GetRandBytes((unsigned char*)&randv, sizeof(randv)); - std::string tmpfn = strprintf("peers.dat.%04x", randv); + LOCK(cs_vNodes); + for(CNode* pnode : vNodes) { + if (id == pnode->GetId()) { + pnode->fDisconnect = true; + return true; + } + } + return false; +} - // serialize addresses, checksum data up to that point, then append csum - CDataStream ssPeers(SER_DISK, CLIENT_VERSION); - ssPeers << FLATDATA(Params().MessageStart()); - ssPeers << addr; - uint256 hash = Hash(ssPeers.begin(), ssPeers.end()); - ssPeers << hash; +void CConnman::RecordBytesRecv(uint64_t bytes) +{ + LOCK(cs_totalBytesRecv); + nTotalBytesRecv += bytes; +} - // open temp output file, and associate with CAutoFile - boost::filesystem::path pathTmp = GetDataDir() / tmpfn; - FILE *file = fopen(pathTmp.string().c_str(), "wb"); - CAutoFile fileout(file, SER_DISK, CLIENT_VERSION); - if (fileout.IsNull()) - return error("%s : Failed to open file %s", __func__, pathTmp.string()); +void CConnman::RecordBytesSent(uint64_t bytes) +{ + LOCK(cs_totalBytesSent); + nTotalBytesSent += bytes; - // Write and commit header, data - try { - fileout << ssPeers; - } - catch (std::exception &e) { - return error("%s : Serialize or I/O error - %s", __func__, e.what()); + uint64_t now = GetTime(); + if (nMaxOutboundCycleStartTime + nMaxOutboundTimeframe < now) + { + // timeframe expired, reset cycle + nMaxOutboundCycleStartTime = now; + nMaxOutboundTotalBytesSentInCycle = 0; } - FileCommit(fileout.Get()); - fileout.fclose(); - // replace existing peers.dat, if any, with new peers.dat.XXXX - if (!RenameOver(pathTmp, pathAddr)) - return error("%s : Rename-into-place failed", __func__); + // TODO, exclude whitebind peers + nMaxOutboundTotalBytesSentInCycle += bytes; +} - return true; +void CConnman::SetMaxOutboundTarget(uint64_t limit) +{ + LOCK(cs_totalBytesSent); + nMaxOutboundLimit = limit; } -bool CAddrDB::Read(CAddrMan& addr) +uint64_t CConnman::GetMaxOutboundTarget() { - // open input file, and associate with CAutoFile - FILE *file = fopen(pathAddr.string().c_str(), "rb"); - CAutoFile filein(file, SER_DISK, CLIENT_VERSION); - if (filein.IsNull()) - return error("%s : Failed to open file %s", __func__, pathAddr.string()); + LOCK(cs_totalBytesSent); + return nMaxOutboundLimit; +} - // use file size to size memory buffer - int fileSize = boost::filesystem::file_size(pathAddr); - int dataSize = fileSize - sizeof(uint256); - // Don't try to resize to a negative number if file is small - if (dataSize < 0) - dataSize = 0; - vector<unsigned char> vchData; - vchData.resize(dataSize); - uint256 hashIn; +uint64_t CConnman::GetMaxOutboundTimeframe() +{ + LOCK(cs_totalBytesSent); + return nMaxOutboundTimeframe; +} - // read data and checksum from file - try { - filein.read((char *)&vchData[0], dataSize); - filein >> hashIn; +uint64_t CConnman::GetMaxOutboundTimeLeftInCycle() +{ + LOCK(cs_totalBytesSent); + if (nMaxOutboundLimit == 0) + return 0; + + if (nMaxOutboundCycleStartTime == 0) + return nMaxOutboundTimeframe; + + uint64_t cycleEndTime = nMaxOutboundCycleStartTime + nMaxOutboundTimeframe; + uint64_t now = GetTime(); + return (cycleEndTime < now) ? 0 : cycleEndTime - GetTime(); +} + +void CConnman::SetMaxOutboundTimeframe(uint64_t timeframe) +{ + LOCK(cs_totalBytesSent); + if (nMaxOutboundTimeframe != timeframe) + { + // reset measure-cycle in case of changing + // the timeframe + nMaxOutboundCycleStartTime = GetTime(); } - catch (std::exception &e) { - return error("%s : Deserialize or I/O error - %s", __func__, e.what()); + nMaxOutboundTimeframe = timeframe; +} + +bool CConnman::OutboundTargetReached(bool historicalBlockServingLimit) +{ + LOCK(cs_totalBytesSent); + if (nMaxOutboundLimit == 0) + return false; + + if (historicalBlockServingLimit) + { + // keep a large enough buffer to at least relay each block once + uint64_t timeLeftInCycle = GetMaxOutboundTimeLeftInCycle(); + uint64_t buffer = timeLeftInCycle / 600 * MAX_BLOCK_SERIALIZED_SIZE; + if (buffer >= nMaxOutboundLimit || nMaxOutboundTotalBytesSentInCycle >= nMaxOutboundLimit - buffer) + return true; } - filein.fclose(); + else if (nMaxOutboundTotalBytesSentInCycle >= nMaxOutboundLimit) + return true; - CDataStream ssPeers(vchData, SER_DISK, CLIENT_VERSION); + return false; +} - // verify stored checksum matches input data - uint256 hashTmp = Hash(ssPeers.begin(), ssPeers.end()); - if (hashIn != hashTmp) - return error("%s : Checksum mismatch, data corrupted", __func__); +uint64_t CConnman::GetOutboundTargetBytesLeft() +{ + LOCK(cs_totalBytesSent); + if (nMaxOutboundLimit == 0) + return 0; - unsigned char pchMsgTmp[4]; - try { - // de-serialize file header (network specific magic number) and .. - ssPeers >> FLATDATA(pchMsgTmp); + return (nMaxOutboundTotalBytesSentInCycle >= nMaxOutboundLimit) ? 0 : nMaxOutboundLimit - nMaxOutboundTotalBytesSentInCycle; +} - // ... verify the network matches ours - if (memcmp(pchMsgTmp, Params().MessageStart(), sizeof(pchMsgTmp))) - return error("%s : Invalid network magic number", __func__); +uint64_t CConnman::GetTotalBytesRecv() +{ + LOCK(cs_totalBytesRecv); + return nTotalBytesRecv; +} - // de-serialize address data into one CAddrMan object - ssPeers >> addr; - } - catch (std::exception &e) { - return error("%s : Deserialize or I/O error - %s", __func__, e.what()); - } +uint64_t CConnman::GetTotalBytesSent() +{ + LOCK(cs_totalBytesSent); + return nTotalBytesSent; +} - return true; +ServiceFlags CConnman::GetLocalServices() const +{ + return nLocalServices; } -unsigned int ReceiveFloodSize() { return 1000*GetArg("-maxreceivebuffer", 5*1000); } -unsigned int SendBufferSize() { return 1000*GetArg("-maxsendbuffer", 1*1000); } +void CConnman::SetBestHeight(int height) +{ + nBestHeight.store(height, std::memory_order_release); +} + +int CConnman::GetBestHeight() const +{ + return nBestHeight.load(std::memory_order_acquire); +} + +unsigned int CConnman::GetReceiveFloodSize() const { return nReceiveFloodSize; } -CNode::CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn, bool fInboundIn) : ssSend(SER_NETWORK, INIT_PROTO_VERSION), setAddrKnown(5000) +CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress& addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string& addrNameIn, bool fInboundIn) : + nTimeConnected(GetSystemTimeInSeconds()), + addr(addrIn), + addrBind(addrBindIn), + fInbound(fInboundIn), + nKeyedNetGroup(nKeyedNetGroupIn), + addrKnown(5000, 0.001), + filterInventoryKnown(50000, 0.000001), + id(idIn), + nLocalHostNonce(nLocalHostNonceIn), + nLocalServices(nLocalServicesIn), + nMyStartingHeight(nMyStartingHeightIn), + nSendVersion(0) { - nServices = 0; + nServices = NODE_NONE; hSocket = hSocketIn; nRecvVersion = INIT_PROTO_VERSION; nLastSend = 0; nLastRecv = 0; nSendBytes = 0; nRecvBytes = 0; - nTimeConnected = GetTime(); - addr = addrIn; + nTimeOffset = 0; addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn; nVersion = 0; strSubVer = ""; fWhitelisted = false; fOneShot = false; + m_manual_connection = false; fClient = false; // set by version message - fInbound = fInboundIn; - fNetworkNode = false; + fFeeler = false; fSuccessfullyConnected = false; fDisconnect = false; nRefCount = 0; nSendSize = 0; nSendOffset = 0; - hashContinue = 0; + hashContinue = uint256(); nStartingHeight = -1; + filterInventoryKnown.reset(); + fSendMempool = false; fGetAddr = false; + nNextLocalAddrSend = 0; + nNextAddrSend = 0; + nNextInvSend = 0; fRelayTxes = false; - setInventoryKnown.max_size(SendBufferSize() / 1000); - pfilter = new CBloomFilter(); + fSentAddr = false; + pfilter = MakeUnique<CBloomFilter>(); + timeLastMempoolReq = 0; + nLastBlockTime = 0; + nLastTXTime = 0; nPingNonceSent = 0; nPingUsecStart = 0; nPingUsecTime = 0; fPingQueued = false; - - { - LOCK(cs_nLastNodeId); - id = nLastNodeId++; + nMinPingUsecTime = std::numeric_limits<int64_t>::max(); + minFeeFilter = 0; + lastSentFeeFilter = 0; + nextSendTimeFeeFilter = 0; + fPauseRecv = false; + fPauseSend = false; + nProcessQueueSize = 0; + + for (const std::string &msg : getAllNetMessageTypes()) + mapRecvBytesPerMsgCmd[msg] = 0; + mapRecvBytesPerMsgCmd[NET_MESSAGE_COMMAND_OTHER] = 0; + + if (fLogIPs) { + LogPrint(BCLog::NET, "Added connection to %s peer=%d\n", addrName, id); + } else { + LogPrint(BCLog::NET, "Added connection peer=%d\n", id); } - - if (fLogIPs) - LogPrint("net", "Added connection to %s peer=%d\n", addrName, id); - else - LogPrint("net", "Added connection peer=%d\n", id); - - // Be shy and don't send version until we hear - if (hSocket != INVALID_SOCKET && !fInbound) - PushVersion(); - - GetNodeSignals().InitializeNode(GetId(), this); } CNode::~CNode() { CloseSocket(hSocket); - - if (pfilter) - delete pfilter; - - GetNodeSignals().FinalizeNode(GetId()); } void CNode::AskFor(const CInv& inv) { - if (mapAskFor.size() > MAPASKFOR_MAX_SZ) + if (mapAskFor.size() > MAPASKFOR_MAX_SZ || setAskFor.size() > SETASKFOR_MAX_SZ) + return; + // a peer may not have multiple non-responded queue positions for a single inv item + if (!setAskFor.insert(inv.hash).second) return; + // We're using mapAskFor as a priority queue, // the key is the earliest time the request can be sent int64_t nRequestTime; - limitedmap<CInv, int64_t>::const_iterator it = mapAlreadyAskedFor.find(inv); + limitedmap<uint256, int64_t>::const_iterator it = mapAlreadyAskedFor.find(inv.hash); if (it != mapAlreadyAskedFor.end()) nRequestTime = it->second; else nRequestTime = 0; - LogPrint("net", "askfor %s %d (%s) peer=%d\n", inv.ToString(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000), id); + LogPrint(BCLog::NET, "askfor %s %d (%s) peer=%d\n", inv.ToString(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000), id); // Make sure not to reuse time indexes to keep things in the same order int64_t nNow = GetTimeMicros() - 1000000; @@ -2012,64 +2796,77 @@ void CNode::AskFor(const CInv& inv) if (it != mapAlreadyAskedFor.end()) mapAlreadyAskedFor.update(it, nRequestTime); else - mapAlreadyAskedFor.insert(std::make_pair(inv, nRequestTime)); + mapAlreadyAskedFor.insert(std::make_pair(inv.hash, nRequestTime)); mapAskFor.insert(std::make_pair(nRequestTime, inv)); } -void CNode::BeginMessage(const char* pszCommand) EXCLUSIVE_LOCK_FUNCTION(cs_vSend) +bool CConnman::NodeFullyConnected(const CNode* pnode) { - ENTER_CRITICAL_SECTION(cs_vSend); - assert(ssSend.size() == 0); - ssSend << CMessageHeader(pszCommand, 0); - LogPrint("net", "sending: %s ", pszCommand); + return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; } -void CNode::AbortMessage() UNLOCK_FUNCTION(cs_vSend) +void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { - ssSend.clear(); + size_t nMessageSize = msg.data.size(); + size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE; + LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.command.c_str()), nMessageSize, pnode->GetId()); - LEAVE_CRITICAL_SECTION(cs_vSend); + std::vector<unsigned char> serializedHeader; + serializedHeader.reserve(CMessageHeader::HEADER_SIZE); + uint256 hash = Hash(msg.data.data(), msg.data.data() + nMessageSize); + CMessageHeader hdr(Params().MessageStart(), msg.command.c_str(), nMessageSize); + memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE); - LogPrint("net", "(aborted)\n"); -} + CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, serializedHeader, 0, hdr}; -void CNode::EndMessage() UNLOCK_FUNCTION(cs_vSend) -{ - // The -*messagestest options are intentionally not documented in the help message, - // since they are only used during development to debug the networking code and are - // not intended for end-users. - if (mapArgs.count("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 2)) == 0) + size_t nBytesSent = 0; { - LogPrint("net", "dropmessages DROPPING SEND MESSAGE\n"); - AbortMessage(); - return; + LOCK(pnode->cs_vSend); + bool optimisticSend(pnode->vSendMsg.empty()); + + //log total amount of bytes per command + pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; + pnode->nSendSize += nTotalSize; + + if (pnode->nSendSize > nSendBufferMaxSize) + pnode->fPauseSend = true; + pnode->vSendMsg.push_back(std::move(serializedHeader)); + if (nMessageSize) + pnode->vSendMsg.push_back(std::move(msg.data)); + + // If write queue empty, attempt "optimistic write" + if (optimisticSend == true) + nBytesSent = SocketSendData(pnode); } - if (mapArgs.count("-fuzzmessagestest")) - Fuzz(GetArg("-fuzzmessagestest", 10)); - - if (ssSend.size() == 0) - return; - - // Set the size - unsigned int nSize = ssSend.size() - CMessageHeader::HEADER_SIZE; - memcpy((char*)&ssSend[CMessageHeader::MESSAGE_SIZE_OFFSET], &nSize, sizeof(nSize)); + if (nBytesSent) + RecordBytesSent(nBytesSent); +} - // Set the checksum - uint256 hash = Hash(ssSend.begin() + CMessageHeader::HEADER_SIZE, ssSend.end()); - unsigned int nChecksum = 0; - memcpy(&nChecksum, &hash, sizeof(nChecksum)); - assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum)); - memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], &nChecksum, sizeof(nChecksum)); +bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func) +{ + CNode* found = nullptr; + LOCK(cs_vNodes); + for (auto&& pnode : vNodes) { + if(pnode->GetId() == id) { + found = pnode; + break; + } + } + return found != nullptr && NodeFullyConnected(found) && func(found); +} - LogPrint("net", "(%d bytes) peer=%d\n", nSize, id); +int64_t PoissonNextSend(int64_t nNow, int average_interval_seconds) { + return nNow + (int64_t)(log1p(GetRand(1ULL << 48) * -0.0000000000000035527136788 /* -1/2^48 */) * average_interval_seconds * -1000000.0 + 0.5); +} - std::deque<CSerializeData>::iterator it = vSendMsg.insert(vSendMsg.end(), CSerializeData()); - ssSend.GetAndClear(*it); - nSendSize += (*it).size(); +CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id) const +{ + return CSipHasher(nSeed0, nSeed1).Write(id); +} - // If write queue empty, attempt "optimistic write" - if (it == vSendMsg.begin()) - SocketSendData(this); +uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const +{ + std::vector<unsigned char> vchNetGroup(ad.GetGroup()); - LEAVE_CRITICAL_SECTION(cs_vSend); + return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup.data(), vchNetGroup.size()).Finalize(); } |