diff options
Diffstat (limited to 'src/net.cpp')
-rw-r--r-- | src/net.cpp | 1023 |
1 files changed, 429 insertions, 594 deletions
diff --git a/src/net.cpp b/src/net.cpp index f83f39a67d..1335804b06 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1,5 +1,5 @@ // Copyright (c) 2009-2010 Satoshi Nakamoto -// Copyright (c) 2009-2018 The Bitcoin Core developers +// Copyright (c) 2009-2019 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. @@ -9,6 +9,7 @@ #include <net.h> +#include <banman.h> #include <chainparams.h> #include <clientversion.h> #include <consensus/consensus.h> @@ -18,7 +19,7 @@ #include <netbase.h> #include <scheduler.h> #include <ui_interface.h> -#include <utilstrencodings.h> +#include <util/strencodings.h> #ifdef WIN32 #include <string.h> @@ -26,6 +27,10 @@ #include <fcntl.h> #endif +#ifdef USE_POLL +#include <poll.h> +#endif + #ifdef USE_UPNP #include <miniupnpc/miniupnpc.h> #include <miniupnpc/miniwget.h> @@ -33,11 +38,12 @@ #include <miniupnpc/upnperrors.h> #endif +#include <unordered_map> #include <math.h> -// Dump addresses to peers.dat and banlist.dat every 15 minutes (900s) -#define DUMP_ADDRESSES_INTERVAL 900 +// Dump addresses to peers.dat every 15 minutes (900s) +static constexpr int DUMP_PEERS_INTERVAL = 15 * 60; // We add a random period time (0 to 1 seconds) to feeler connections to prevent synchronization. #define FEELER_SLEEP_WINDOW 1 @@ -52,17 +58,6 @@ #define MSG_DONTWAIT 0 #endif -// Fix for ancient MinGW versions, that don't have defined these in ws2tcpip.h. -// Todo: Can be removed when our pull-tester is upgraded to a modern MinGW version. -#ifdef WIN32 -#ifndef PROTECTION_LEVEL_UNRESTRICTED -#define PROTECTION_LEVEL_UNRESTRICTED 10 -#endif -#ifndef IPV6_PROTECTION_LEVEL -#define IPV6_PROTECTION_LEVEL 23 -#endif -#endif - /** Used to pass flags to the Bind() function */ enum BindFlags { BF_NONE = 0, @@ -71,7 +66,11 @@ enum BindFlags { BF_WHITELIST = (1U << 2), }; -const static std::string NET_MESSAGE_COMMAND_OTHER = "*other*"; +// The set of sockets cannot be modified while waiting +// The sleep time needs to be small to avoid new sockets stalling +static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 50; + +const std::string NET_MESSAGE_COMMAND_OTHER = "*other*"; static const uint64_t RANDOMIZER_ID_NETGROUP = 0x6c0edd8036ef4036ULL; // SHA256("netgroup")[0:8] static const uint64_t RANDOMIZER_ID_LOCALHOSTNONCE = 0xd93e69e2bbfa5735ULL; // SHA256("localhostnonce")[0:8] @@ -82,12 +81,10 @@ bool fDiscover = true; bool fListen = true; bool fRelayTxes = true; CCriticalSection cs_mapLocalHost; -std::map<CNetAddr, LocalServiceInfo> mapLocalHost; -static bool vfLimited[NET_MAX] = {}; +std::map<CNetAddr, LocalServiceInfo> mapLocalHost GUARDED_BY(cs_mapLocalHost); +static bool vfLimited[NET_MAX] GUARDED_BY(cs_mapLocalHost) = {}; std::string strSubVersion; -limitedmap<uint256, int64_t> mapAlreadyAskedFor(MAX_INV_SZ); - void CConnman::AddOneShot(const std::string& strDest) { LOCK(cs_vOneShots); @@ -134,11 +131,12 @@ static std::vector<CAddress> convertSeed6(const std::vector<SeedSpec6> &vSeedsIn const int64_t nOneWeek = 7*24*60*60; std::vector<CAddress> vSeedsOut; vSeedsOut.reserve(vSeedsIn.size()); + FastRandomContext rng; for (const auto& seed_in : vSeedsIn) { struct in6_addr ip; memcpy(&ip, seed_in.addr, sizeof(ip)); CAddress addr(CService(ip, seed_in.port), GetDesirableServiceFlags(NODE_NONE)); - addr.nTime = GetTime() - GetRand(nOneWeek) - nOneWeek; + addr.nTime = GetTime() - rng.randrange(nOneWeek) - nOneWeek; vSeedsOut.push_back(addr); } return vSeedsOut; @@ -163,8 +161,7 @@ CAddress GetLocalAddress(const CNetAddr *paddrPeer, ServiceFlags nLocalServices) static int GetnScore(const CService& addr) { LOCK(cs_mapLocalHost); - if (mapLocalHost.count(addr) == LOCAL_NONE) - return 0; + if (mapLocalHost.count(addr) == 0) return 0; return mapLocalHost[addr].nScore; } @@ -173,7 +170,7 @@ bool IsPeerAddrLocalGood(CNode *pnode) { CService addrLocal = pnode->GetAddrLocal(); return fDiscover && pnode->addr.IsRoutable() && addrLocal.IsRoutable() && - !IsLimited(addrLocal.GetNetwork()); + IsReachable(addrLocal.GetNetwork()); } // pushes our own address to a peer @@ -189,16 +186,16 @@ void AdvertiseLocal(CNode *pnode) // If discovery is enabled, sometimes give our peer the address it // tells us that it sees us as in case it has a better idea of our // address than we do. + FastRandomContext rng; if (IsPeerAddrLocalGood(pnode) && (!addrLocal.IsRoutable() || - GetRand((GetnScore(addrLocal) > LOCAL_MANUAL) ? 8:2) == 0)) + rng.randbits((GetnScore(addrLocal) > LOCAL_MANUAL) ? 3 : 1) == 0)) { addrLocal.SetIP(pnode->GetAddrLocal()); } if (addrLocal.IsRoutable() || gArgs.GetBoolArg("-addrmantest", false)) { LogPrint(BCLog::NET, "AdvertiseLocal: advertising address %s\n", addrLocal.ToString()); - FastRandomContext insecure_rand; - pnode->PushAddress(addrLocal, insecure_rand); + pnode->PushAddress(addrLocal, rng); } } } @@ -212,7 +209,7 @@ bool AddLocal(const CService& addr, int nScore) if (!fDiscover && nScore < LOCAL_MANUAL) return false; - if (IsLimited(addr)) + if (!IsReachable(addr)) return false; LogPrintf("AddLocal(%s,%i)\n", addr.ToString(), nScore); @@ -242,24 +239,23 @@ void RemoveLocal(const CService& addr) mapLocalHost.erase(addr); } -/** Make a particular network entirely off-limits (no automatic connects to it) */ -void SetLimited(enum Network net, bool fLimited) +void SetReachable(enum Network net, bool reachable) { if (net == NET_UNROUTABLE || net == NET_INTERNAL) return; LOCK(cs_mapLocalHost); - vfLimited[net] = fLimited; + vfLimited[net] = !reachable; } -bool IsLimited(enum Network net) +bool IsReachable(enum Network net) { LOCK(cs_mapLocalHost); - return vfLimited[net]; + return !vfLimited[net]; } -bool IsLimited(const CNetAddr &addr) +bool IsReachable(const CNetAddr &addr) { - return IsLimited(addr.GetNetwork()); + return IsReachable(addr.GetNetwork()); } /** vote for a local address */ @@ -282,21 +278,6 @@ bool IsLocal(const CService& addr) return mapLocalHost.count(addr) > 0; } -/** check whether a given network is one we can probably connect to */ -bool IsReachable(enum Network net) -{ - LOCK(cs_mapLocalHost); - return !vfLimited[net]; -} - -/** check whether a given address is in a network we can probably connect to */ -bool IsReachable(const CNetAddr& addr) -{ - enum Network net = addr.GetNetwork(); - return IsReachable(net); -} - - CNode* CConnman::FindNode(const CNetAddr& ip) { LOCK(cs_vNodes); @@ -463,26 +444,6 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo return pnode; } -void CConnman::DumpBanlist() -{ - SweepBanned(); // clean unused entries (if bantime has expired) - - if (!BannedSetIsDirty()) - return; - - int64_t nStart = GetTimeMillis(); - - CBanDB bandb; - banmap_t banmap; - GetBanned(banmap); - if (bandb.Write(banmap)) { - SetBannedSetDirty(false); - } - - LogPrint(BCLog::NET, "Flushed %d banned node ips/subnets to banlist.dat %dms\n", - banmap.size(), GetTimeMillis() - nStart); -} - void CNode::CloseSocketDisconnect() { fDisconnect = true; @@ -494,157 +455,6 @@ void CNode::CloseSocketDisconnect() } } -void CConnman::ClearBanned() -{ - { - LOCK(cs_setBanned); - setBanned.clear(); - setBannedIsDirty = true; - } - DumpBanlist(); //store banlist to disk - if(clientInterface) - clientInterface->BannedListChanged(); -} - -bool CConnman::IsBanned(CNetAddr ip) -{ - LOCK(cs_setBanned); - for (const auto& it : setBanned) { - CSubNet subNet = it.first; - CBanEntry banEntry = it.second; - - if (subNet.Match(ip) && GetTime() < banEntry.nBanUntil) { - return true; - } - } - return false; -} - -bool CConnman::IsBanned(CSubNet subnet) -{ - LOCK(cs_setBanned); - banmap_t::iterator i = setBanned.find(subnet); - if (i != setBanned.end()) - { - CBanEntry banEntry = (*i).second; - if (GetTime() < banEntry.nBanUntil) { - return true; - } - } - return false; -} - -void CConnman::Ban(const CNetAddr& addr, const BanReason &banReason, int64_t bantimeoffset, bool sinceUnixEpoch) { - CSubNet subNet(addr); - Ban(subNet, banReason, bantimeoffset, sinceUnixEpoch); -} - -void CConnman::Ban(const CSubNet& subNet, const BanReason &banReason, int64_t bantimeoffset, bool sinceUnixEpoch) { - CBanEntry banEntry(GetTime()); - banEntry.banReason = banReason; - if (bantimeoffset <= 0) - { - bantimeoffset = gArgs.GetArg("-bantime", DEFAULT_MISBEHAVING_BANTIME); - sinceUnixEpoch = false; - } - banEntry.nBanUntil = (sinceUnixEpoch ? 0 : GetTime() )+bantimeoffset; - - { - LOCK(cs_setBanned); - if (setBanned[subNet].nBanUntil < banEntry.nBanUntil) { - setBanned[subNet] = banEntry; - setBannedIsDirty = true; - } - else - return; - } - if(clientInterface) - clientInterface->BannedListChanged(); - { - LOCK(cs_vNodes); - for (CNode* pnode : vNodes) { - if (subNet.Match(static_cast<CNetAddr>(pnode->addr))) - pnode->fDisconnect = true; - } - } - if(banReason == BanReasonManuallyAdded) - DumpBanlist(); //store banlist to disk immediately if user requested ban -} - -bool CConnman::Unban(const CNetAddr &addr) { - CSubNet subNet(addr); - return Unban(subNet); -} - -bool CConnman::Unban(const CSubNet &subNet) { - { - LOCK(cs_setBanned); - if (!setBanned.erase(subNet)) - return false; - setBannedIsDirty = true; - } - if(clientInterface) - clientInterface->BannedListChanged(); - DumpBanlist(); //store banlist to disk immediately - return true; -} - -void CConnman::GetBanned(banmap_t &banMap) -{ - LOCK(cs_setBanned); - // Sweep the banlist so expired bans are not returned - SweepBanned(); - banMap = setBanned; //create a thread safe copy -} - -void CConnman::SetBanned(const banmap_t &banMap) -{ - LOCK(cs_setBanned); - setBanned = banMap; - setBannedIsDirty = true; -} - -void CConnman::SweepBanned() -{ - int64_t now = GetTime(); - bool notifyUI = false; - { - LOCK(cs_setBanned); - banmap_t::iterator it = setBanned.begin(); - while(it != setBanned.end()) - { - CSubNet subNet = (*it).first; - CBanEntry banEntry = (*it).second; - if(now > banEntry.nBanUntil) - { - setBanned.erase(it++); - setBannedIsDirty = true; - notifyUI = true; - LogPrint(BCLog::NET, "%s: Removed banned node ip/subnet from banlist.dat: %s\n", __func__, subNet.ToString()); - } - else - ++it; - } - } - // update UI - if(notifyUI && clientInterface) { - clientInterface->BannedListChanged(); - } -} - -bool CConnman::BannedSetIsDirty() -{ - LOCK(cs_setBanned); - return setBannedIsDirty; -} - -void CConnman::SetBannedSetDirty(bool dirty) -{ - LOCK(cs_setBanned); //reuse setBanned lock for the isDirty flag - setBannedIsDirty = dirty; -} - - bool CConnman::IsWhitelistedRange(const CNetAddr &addr) { for (const CSubNet& subnet : vWhitelistedRange) { if (subnet.Match(addr)) @@ -715,7 +525,10 @@ void CNode::copyStats(CNodeStats &stats) X(nRecvBytes); } X(fWhitelisted); - X(minFeeFilter); + { + LOCK(cs_feeFilter); + X(minFeeFilter); + } // It is common for nodes with good ping times to suddenly become lagged, // due to a new block arriving or other large transfer. @@ -774,7 +587,6 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete nBytes -= handled; if (msg.complete()) { - //store received bytes per message command //to prevent a memory DOS, only allow valid commands mapMsgCmdSize::iterator i = mapRecvBytesPerMsgCmd.find(msg.hdr.pchCommand); @@ -874,16 +686,7 @@ const uint256& CNetMessage::GetMessageHash() const return data_hash; } - - - - - - - - -// requires LOCK(cs_vSend) -size_t CConnman::SocketSendData(CNode *pnode) const +size_t CConnman::SocketSendData(CNode *pnode) const EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_vSend) { auto it = pnode->vSendMsg.begin(); size_t nSentSize = 0; @@ -947,6 +750,7 @@ struct NodeEvictionCandidate bool fBloomFilter; CAddress addr; uint64_t nKeyedNetGroup; + bool prefer_evict; }; static bool ReverseCompareNodeMinPingTime(const NodeEvictionCandidate &a, const NodeEvictionCandidate &b) @@ -1011,10 +815,12 @@ bool CConnman::AttemptToEvictConnection() continue; if (node->fDisconnect) continue; + LOCK(node->cs_filter); NodeEvictionCandidate candidate = {node->GetId(), node->nTimeConnected, node->nMinPingUsecTime, node->nLastBlockTime, node->nLastTXTime, HasAllDesirableServiceFlags(node->nServices), - node->fRelayTxes, node->pfilter != nullptr, node->addr, node->nKeyedNetGroup}; + node->fRelayTxes, node->pfilter != nullptr, node->addr, node->nKeyedNetGroup, + node->m_prefer_evict}; vEvictionCandidates.push_back(candidate); } } @@ -1039,6 +845,14 @@ bool CConnman::AttemptToEvictConnection() if (vEvictionCandidates.empty()) return false; + // If any remaining peers are preferred for eviction consider only them. + // This happens after the other preferences since if a peer is really the best by other criteria (esp relaying blocks) + // then we probably don't want to evict it no matter what. + if (std::any_of(vEvictionCandidates.begin(),vEvictionCandidates.end(),[](NodeEvictionCandidate const &n){return n.prefer_evict;})) { + vEvictionCandidates.erase(std::remove_if(vEvictionCandidates.begin(),vEvictionCandidates.end(), + [](NodeEvictionCandidate const &n){return !n.prefer_evict;}),vEvictionCandidates.end()); + } + // Identify the network group with the most connections and youngest member. // (vEvictionCandidates is already sorted by reverse connect time) uint64_t naMostConnections; @@ -1119,7 +933,11 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { // on all platforms. Set it again here just to be sure. SetSocketNoDelay(hSocket); - if (IsBanned(addr) && !whitelisted) + int bannedlevel = m_banman ? m_banman->IsBannedLevel(addr) : 0; + + // Don't accept connections from banned peers, but if our inbound slots aren't almost full, accept + // if the only banning reason was an automatic misbehavior ban. + if (!whitelisted && bannedlevel > ((nInbound + 1 < nMaxInbound) ? 1 : 0)) { LogPrint(BCLog::NET, "connection from %s dropped (banned)\n", addr.ToString()); CloseSocket(hSocket); @@ -1143,6 +961,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, addr_bind, "", true); pnode->AddRef(); pnode->fWhitelisted = whitelisted; + pnode->m_prefer_evict = bannedlevel > 0; m_msgproc->InitializeNode(pnode); LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToString()); @@ -1153,310 +972,409 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { } } -void CConnman::ThreadSocketHandler() +void CConnman::DisconnectNodes() { - unsigned int nPrevNodeCount = 0; - while (!interruptNet) { - // - // Disconnect nodes - // - { - LOCK(cs_vNodes); + LOCK(cs_vNodes); - if (!fNetworkActive) { - // Disconnect any connected nodes - for (CNode* pnode : vNodes) { - if (!pnode->fDisconnect) { - LogPrint(BCLog::NET, "Network not active, dropping peer=%d\n", pnode->GetId()); - pnode->fDisconnect = true; - } + if (!fNetworkActive) { + // Disconnect any connected nodes + for (CNode* pnode : vNodes) { + if (!pnode->fDisconnect) { + LogPrint(BCLog::NET, "Network not active, dropping peer=%d\n", pnode->GetId()); + pnode->fDisconnect = true; } } + } - // Disconnect unused nodes - std::vector<CNode*> vNodesCopy = vNodes; - for (CNode* pnode : vNodesCopy) + // Disconnect unused nodes + std::vector<CNode*> vNodesCopy = vNodes; + for (CNode* pnode : vNodesCopy) + { + if (pnode->fDisconnect) { - if (pnode->fDisconnect) - { - // remove from vNodes - vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); + // remove from vNodes + vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); - // release outbound grant (if any) - pnode->grantOutbound.Release(); + // release outbound grant (if any) + pnode->grantOutbound.Release(); - // close socket and cleanup - pnode->CloseSocketDisconnect(); + // close socket and cleanup + pnode->CloseSocketDisconnect(); - // hold in disconnected pool until all refs are released - pnode->Release(); - vNodesDisconnected.push_back(pnode); - } + // hold in disconnected pool until all refs are released + pnode->Release(); + vNodesDisconnected.push_back(pnode); } } + } + { + // Delete disconnected nodes + std::list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected; + for (CNode* pnode : vNodesDisconnectedCopy) { - // Delete disconnected nodes - std::list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected; - for (CNode* pnode : vNodesDisconnectedCopy) - { - // wait until threads are done using it - if (pnode->GetRefCount() <= 0) { - bool fDelete = false; - { - TRY_LOCK(pnode->cs_inventory, lockInv); - if (lockInv) { - TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend) { - fDelete = true; - } + // wait until threads are done using it + if (pnode->GetRefCount() <= 0) { + bool fDelete = false; + { + TRY_LOCK(pnode->cs_inventory, lockInv); + if (lockInv) { + TRY_LOCK(pnode->cs_vSend, lockSend); + if (lockSend) { + fDelete = true; } } - if (fDelete) { - vNodesDisconnected.remove(pnode); - DeleteNode(pnode); - } + } + if (fDelete) { + vNodesDisconnected.remove(pnode); + DeleteNode(pnode); } } } - size_t vNodesSize; + } +} + +void CConnman::NotifyNumConnectionsChanged() +{ + size_t vNodesSize; + { + LOCK(cs_vNodes); + vNodesSize = vNodes.size(); + } + if(vNodesSize != nPrevNodeCount) { + nPrevNodeCount = vNodesSize; + if(clientInterface) + clientInterface->NotifyNumConnectionsChanged(vNodesSize); + } +} + +void CConnman::InactivityCheck(CNode *pnode) +{ + int64_t nTime = GetSystemTimeInSeconds(); + if (nTime - pnode->nTimeConnected > m_peer_connect_timeout) + { + if (pnode->nLastRecv == 0 || pnode->nLastSend == 0) { - LOCK(cs_vNodes); - vNodesSize = vNodes.size(); + LogPrint(BCLog::NET, "socket no message in first %i seconds, %d %d from %d\n", m_peer_connect_timeout, pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->GetId()); + pnode->fDisconnect = true; } - if(vNodesSize != nPrevNodeCount) { - nPrevNodeCount = vNodesSize; - if(clientInterface) - clientInterface->NotifyNumConnectionsChanged(vNodesSize); + else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL) + { + LogPrintf("socket sending timeout: %is\n", nTime - pnode->nLastSend); + pnode->fDisconnect = true; } - - // - // Find which sockets have data to receive - // - struct timeval timeout; - timeout.tv_sec = 0; - timeout.tv_usec = 50000; // frequency to poll pnode->vSend - - fd_set fdsetRecv; - fd_set fdsetSend; - fd_set fdsetError; - FD_ZERO(&fdsetRecv); - FD_ZERO(&fdsetSend); - FD_ZERO(&fdsetError); - SOCKET hSocketMax = 0; - bool have_fds = false; - - for (const ListenSocket& hListenSocket : vhListenSocket) { - FD_SET(hListenSocket.socket, &fdsetRecv); - hSocketMax = std::max(hSocketMax, hListenSocket.socket); - have_fds = true; + else if (nTime - pnode->nLastRecv > (pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL : 90*60)) + { + LogPrintf("socket receive timeout: %is\n", nTime - pnode->nLastRecv); + pnode->fDisconnect = true; + } + else if (pnode->nPingNonceSent && pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 < GetTimeMicros()) + { + LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart)); + pnode->fDisconnect = true; } + else if (!pnode->fSuccessfullyConnected) + { + LogPrint(BCLog::NET, "version handshake timeout from %d\n", pnode->GetId()); + pnode->fDisconnect = true; + } + } +} +bool CConnman::GenerateSelectSet(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set) +{ + for (const ListenSocket& hListenSocket : vhListenSocket) { + recv_set.insert(hListenSocket.socket); + } + + { + LOCK(cs_vNodes); + for (CNode* pnode : vNodes) { - LOCK(cs_vNodes); - for (CNode* pnode : vNodes) + // Implement the following logic: + // * If there is data to send, select() for sending data. As this only + // happens when optimistic write failed, we choose to first drain the + // write buffer in this case before receiving more. This avoids + // needlessly queueing received data, if the remote peer is not themselves + // receiving data. This means properly utilizing TCP flow control signalling. + // * Otherwise, if there is space left in the receive buffer, select() for + // receiving data. + // * Hand off all complete messages to the processor, to be handled without + // blocking here. + + bool select_recv = !pnode->fPauseRecv; + bool select_send; { - // Implement the following logic: - // * If there is data to send, select() for sending data. As this only - // happens when optimistic write failed, we choose to first drain the - // write buffer in this case before receiving more. This avoids - // needlessly queueing received data, if the remote peer is not themselves - // receiving data. This means properly utilizing TCP flow control signalling. - // * Otherwise, if there is space left in the receive buffer, select() for - // receiving data. - // * Hand off all complete messages to the processor, to be handled without - // blocking here. - - bool select_recv = !pnode->fPauseRecv; - bool select_send; - { - LOCK(pnode->cs_vSend); - select_send = !pnode->vSendMsg.empty(); - } - - LOCK(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) - continue; + LOCK(pnode->cs_vSend); + select_send = !pnode->vSendMsg.empty(); + } - FD_SET(pnode->hSocket, &fdsetError); - hSocketMax = std::max(hSocketMax, pnode->hSocket); - have_fds = true; + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) + continue; - if (select_send) { - FD_SET(pnode->hSocket, &fdsetSend); - continue; - } - if (select_recv) { - FD_SET(pnode->hSocket, &fdsetRecv); - } + error_set.insert(pnode->hSocket); + if (select_send) { + send_set.insert(pnode->hSocket); + continue; + } + if (select_recv) { + recv_set.insert(pnode->hSocket); } } + } - int nSelect = select(have_fds ? hSocketMax + 1 : 0, - &fdsetRecv, &fdsetSend, &fdsetError, &timeout); - if (interruptNet) + return !recv_set.empty() || !send_set.empty() || !error_set.empty(); +} + +#ifdef USE_POLL +void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set) +{ + std::set<SOCKET> recv_select_set, send_select_set, error_select_set; + if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) { + interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); + return; + } + + std::unordered_map<SOCKET, struct pollfd> pollfds; + for (SOCKET socket_id : recv_select_set) { + pollfds[socket_id].fd = socket_id; + pollfds[socket_id].events |= POLLIN; + } + + for (SOCKET socket_id : send_select_set) { + pollfds[socket_id].fd = socket_id; + pollfds[socket_id].events |= POLLOUT; + } + + for (SOCKET socket_id : error_select_set) { + pollfds[socket_id].fd = socket_id; + // These flags are ignored, but we set them for clarity + pollfds[socket_id].events |= POLLERR|POLLHUP; + } + + std::vector<struct pollfd> vpollfds; + vpollfds.reserve(pollfds.size()); + for (auto it : pollfds) { + vpollfds.push_back(std::move(it.second)); + } + + if (poll(vpollfds.data(), vpollfds.size(), SELECT_TIMEOUT_MILLISECONDS) < 0) return; + + if (interruptNet) return; + + for (struct pollfd pollfd_entry : vpollfds) { + if (pollfd_entry.revents & POLLIN) recv_set.insert(pollfd_entry.fd); + if (pollfd_entry.revents & POLLOUT) send_set.insert(pollfd_entry.fd); + if (pollfd_entry.revents & (POLLERR|POLLHUP)) error_set.insert(pollfd_entry.fd); + } +} +#else +void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set) +{ + std::set<SOCKET> recv_select_set, send_select_set, error_select_set; + if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) { + interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); + return; + } + + // + // Find which sockets have data to receive + // + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = SELECT_TIMEOUT_MILLISECONDS * 1000; // frequency to poll pnode->vSend + + fd_set fdsetRecv; + fd_set fdsetSend; + fd_set fdsetError; + FD_ZERO(&fdsetRecv); + FD_ZERO(&fdsetSend); + FD_ZERO(&fdsetError); + SOCKET hSocketMax = 0; + + for (SOCKET hSocket : recv_select_set) { + FD_SET(hSocket, &fdsetRecv); + hSocketMax = std::max(hSocketMax, hSocket); + } + + for (SOCKET hSocket : send_select_set) { + FD_SET(hSocket, &fdsetSend); + hSocketMax = std::max(hSocketMax, hSocket); + } + + for (SOCKET hSocket : error_select_set) { + FD_SET(hSocket, &fdsetError); + hSocketMax = std::max(hSocketMax, hSocket); + } + + int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); + + if (interruptNet) + return; + + if (nSelect == SOCKET_ERROR) + { + int nErr = WSAGetLastError(); + LogPrintf("socket select error %s\n", NetworkErrorString(nErr)); + for (unsigned int i = 0; i <= hSocketMax; i++) + FD_SET(i, &fdsetRecv); + FD_ZERO(&fdsetSend); + FD_ZERO(&fdsetError); + if (!interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS))) return; + } - if (nSelect == SOCKET_ERROR) - { - if (have_fds) - { - int nErr = WSAGetLastError(); - LogPrintf("socket select error %s\n", NetworkErrorString(nErr)); - for (unsigned int i = 0; i <= hSocketMax; i++) - FD_SET(i, &fdsetRecv); - } - FD_ZERO(&fdsetSend); - FD_ZERO(&fdsetError); - if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000))) - return; + for (SOCKET hSocket : recv_select_set) { + if (FD_ISSET(hSocket, &fdsetRecv)) { + recv_set.insert(hSocket); } + } - // - // Accept new connections - // - for (const ListenSocket& hListenSocket : vhListenSocket) + for (SOCKET hSocket : send_select_set) { + if (FD_ISSET(hSocket, &fdsetSend)) { + send_set.insert(hSocket); + } + } + + for (SOCKET hSocket : error_select_set) { + if (FD_ISSET(hSocket, &fdsetError)) { + error_set.insert(hSocket); + } + } +} +#endif + +void CConnman::SocketHandler() +{ + std::set<SOCKET> recv_set, send_set, error_set; + SocketEvents(recv_set, send_set, error_set); + + if (interruptNet) return; + + // + // Accept new connections + // + for (const ListenSocket& hListenSocket : vhListenSocket) + { + if (hListenSocket.socket != INVALID_SOCKET && recv_set.count(hListenSocket.socket) > 0) { - if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv)) - { - AcceptConnection(hListenSocket); - } + AcceptConnection(hListenSocket); } + } + + // + // Service each socket + // + std::vector<CNode*> vNodesCopy; + { + LOCK(cs_vNodes); + vNodesCopy = vNodes; + for (CNode* pnode : vNodesCopy) + pnode->AddRef(); + } + for (CNode* pnode : vNodesCopy) + { + if (interruptNet) + return; // - // Service each socket + // Receive // - std::vector<CNode*> vNodesCopy; + bool recvSet = false; + bool sendSet = false; + bool errorSet = false; { - LOCK(cs_vNodes); - vNodesCopy = vNodes; - for (CNode* pnode : vNodesCopy) - pnode->AddRef(); + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) + continue; + recvSet = recv_set.count(pnode->hSocket) > 0; + sendSet = send_set.count(pnode->hSocket) > 0; + errorSet = error_set.count(pnode->hSocket) > 0; } - for (CNode* pnode : vNodesCopy) + if (recvSet || errorSet) { - if (interruptNet) - return; - - // - // Receive - // - bool recvSet = false; - bool sendSet = false; - bool errorSet = false; + // typical socket buffer is 8K-64K + char pchBuf[0x10000]; + int nBytes = 0; { LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) continue; - recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv); - sendSet = FD_ISSET(pnode->hSocket, &fdsetSend); - errorSet = FD_ISSET(pnode->hSocket, &fdsetError); + nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); } - if (recvSet || errorSet) + if (nBytes > 0) { - // typical socket buffer is 8K-64K - char pchBuf[0x10000]; - int nBytes = 0; - { - LOCK(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) - continue; - nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); - } - if (nBytes > 0) - { - bool notify = false; - if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) - pnode->CloseSocketDisconnect(); - RecordBytesRecv(nBytes); - if (notify) { - size_t nSizeAdded = 0; - auto it(pnode->vRecvMsg.begin()); - for (; it != pnode->vRecvMsg.end(); ++it) { - if (!it->complete()) - break; - nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; - } - { - LOCK(pnode->cs_vProcessMsg); - pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); - pnode->nProcessQueueSize += nSizeAdded; - pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; - } - WakeMessageHandler(); - } - } - else if (nBytes == 0) - { - // socket closed gracefully - if (!pnode->fDisconnect) { - LogPrint(BCLog::NET, "socket closed\n"); - } + bool notify = false; + if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); - } - else if (nBytes < 0) - { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) + RecordBytesRecv(nBytes); + if (notify) { + size_t nSizeAdded = 0; + auto it(pnode->vRecvMsg.begin()); + for (; it != pnode->vRecvMsg.end(); ++it) { + if (!it->complete()) + break; + nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; + } { - if (!pnode->fDisconnect) - LogPrintf("socket recv error %s\n", NetworkErrorString(nErr)); - pnode->CloseSocketDisconnect(); + LOCK(pnode->cs_vProcessMsg); + pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + pnode->nProcessQueueSize += nSizeAdded; + pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; } + WakeMessageHandler(); } } - - // - // Send - // - if (sendSet) + else if (nBytes == 0) { - LOCK(pnode->cs_vSend); - size_t nBytes = SocketSendData(pnode); - if (nBytes) { - RecordBytesSent(nBytes); + // socket closed gracefully + if (!pnode->fDisconnect) { + LogPrint(BCLog::NET, "socket closed\n"); } + pnode->CloseSocketDisconnect(); } - - // - // Inactivity checking - // - int64_t nTime = GetSystemTimeInSeconds(); - if (nTime - pnode->nTimeConnected > 60) + else if (nBytes < 0) { - if (pnode->nLastRecv == 0 || pnode->nLastSend == 0) - { - LogPrint(BCLog::NET, "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->GetId()); - pnode->fDisconnect = true; - } - else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL) - { - LogPrintf("socket sending timeout: %is\n", nTime - pnode->nLastSend); - pnode->fDisconnect = true; - } - else if (nTime - pnode->nLastRecv > (pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL : 90*60)) - { - LogPrintf("socket receive timeout: %is\n", nTime - pnode->nLastRecv); - pnode->fDisconnect = true; - } - else if (pnode->nPingNonceSent && pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 < GetTimeMicros()) - { - LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart)); - pnode->fDisconnect = true; - } - else if (!pnode->fSuccessfullyConnected) + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) { - LogPrint(BCLog::NET, "version handshake timeout from %d\n", pnode->GetId()); - pnode->fDisconnect = true; + if (!pnode->fDisconnect) + LogPrintf("socket recv error %s\n", NetworkErrorString(nErr)); + pnode->CloseSocketDisconnect(); } } } + + // + // Send + // + if (sendSet) { - LOCK(cs_vNodes); - for (CNode* pnode : vNodesCopy) - pnode->Release(); + LOCK(pnode->cs_vSend); + size_t nBytes = SocketSendData(pnode); + if (nBytes) { + RecordBytesSent(nBytes); + } } + + InactivityCheck(pnode); + } + { + LOCK(cs_vNodes); + for (CNode* pnode : vNodesCopy) + pnode->Release(); + } +} + +void CConnman::ThreadSocketHandler() +{ + while (!interruptNet) + { + DisconnectNodes(); + NotifyNumConnectionsChanged(); + SocketHandler(); } } @@ -1688,12 +1606,6 @@ void CConnman::DumpAddresses() addrman.size(), GetTimeMillis() - nStart); } -void CConnman::DumpData() -{ - DumpAddresses(); - DumpBanlist(); -} - void CConnman::ProcessOneShot() { std::string strDest; @@ -1853,9 +1765,15 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) addr = addrman.Select(fFeeler); } - // if we selected an invalid address, restart - if (!addr.IsValid() || setConnected.count(addr.GetGroup()) || IsLocal(addr)) + // Require outbound connections, other than feelers, to be to distinct network groups + if (!fFeeler && setConnected.count(addr.GetGroup())) { break; + } + + // if we selected an invalid or local address, restart + if (!addr.IsValid() || IsLocal(addr)) { + break; + } // If we didn't find an appropriate destination after trying 100 addresses fetched from addrman, // stop this loop, and let the outer loop run again (which sleeps, adds seed nodes, recalculates @@ -1864,7 +1782,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) if (nTries > 100) break; - if (IsLimited(addr)) + if (!IsReachable(addr)) continue; // only consider very recently tried nodes after 30 failed attempts @@ -1998,7 +1916,7 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai } if (!pszDest) { if (IsLocal(addrConnect) || - FindNode(static_cast<CNetAddr>(addrConnect)) || IsBanned(addrConnect) || + FindNode(static_cast<CNetAddr>(addrConnect)) || (m_banman && m_banman->IsBanned(addrConnect)) || FindNode(addrConnect.ToStringIPPort())) return; } else if (FindNode(std::string(pszDest))) @@ -2213,13 +2131,6 @@ void CConnman::SetNetworkActive(bool active) CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSeed1(nSeed1In) { - fNetworkActive = true; - setBannedIsDirty = false; - fAddressesInitialized = false; - nLastNodeId = 0; - nSendBufferMaxSize = 0; - nReceiveFloodSize = 0; - flagInterruptMsgProc = false; SetTryNewOutboundPeer(false); Options connOptions; @@ -2233,7 +2144,7 @@ NodeId CConnman::GetNewNodeId() bool CConnman::Bind(const CService &addr, unsigned int flags) { - if (!(flags & BF_EXPLICIT) && IsLimited(addr)) + if (!(flags & BF_EXPLICIT) && !IsReachable(addr)) return false; std::string strError; if (!BindListenPort(addr, strError, (flags & BF_WHITELIST) != 0)) { @@ -2306,24 +2217,6 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) DumpAddresses(); } } - if (clientInterface) - clientInterface->InitMessage(_("Loading banlist...")); - // Load addresses from banlist.dat - nStart = GetTimeMillis(); - CBanDB bandb; - banmap_t banmap; - if (bandb.Read(banmap)) { - SetBanned(banmap); // thread save setter - SetBannedSetDirty(false); // no need to write down, just read data - SweepBanned(); // sweep out unused entries - - LogPrint(BCLog::NET, "Loaded %d banned node ips/subnets from banlist.dat %dms\n", - banmap.size(), GetTimeMillis() - nStart); - } else { - LogPrintf("Invalid or missing banlist.dat; recreating\n"); - SetBannedSetDirty(true); // force write - DumpBanlist(); - } uiInterface.InitMessage(_("Starting network threads...")); @@ -2377,7 +2270,7 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this))); // Dump network addresses - scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL * 1000); + scheduler.scheduleEvery(std::bind(&CConnman::DumpAddresses, this), DUMP_PEERS_INTERVAL * 1000); return true; } @@ -2436,7 +2329,7 @@ void CConnman::Stop() if (fAddressesInitialized) { - DumpData(); + DumpAddresses(); fAddressesInitialized = false; } @@ -2563,6 +2456,25 @@ bool CConnman::DisconnectNode(const std::string& strNode) } return false; } + +bool CConnman::DisconnectNode(const CSubNet& subnet) +{ + bool disconnected = false; + LOCK(cs_vNodes); + for (CNode* pnode : vNodes) { + if (subnet.Match(pnode->addr)) { + pnode->fDisconnect = true; + disconnected = true; + } + } + return disconnected; +} + +bool CConnman::DisconnectNode(const CNetAddr& addr) +{ + return DisconnectNode(CSubNet(addr)); +} + bool CConnman::DisconnectNode(NodeId id) { LOCK(cs_vNodes); @@ -2700,8 +2612,8 @@ int CConnman::GetBestHeight() const unsigned int CConnman::GetReceiveFloodSize() const { return nReceiveFloodSize; } -CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress& addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string& addrNameIn, bool fInboundIn) : - nTimeConnected(GetSystemTimeInSeconds()), +CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress& addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress& addrBindIn, const std::string& addrNameIn, bool fInboundIn) + : nTimeConnected(GetSystemTimeInSeconds()), addr(addrIn), addrBind(addrBindIn), fInbound(fInboundIn), @@ -2711,56 +2623,13 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn id(idIn), nLocalHostNonce(nLocalHostNonceIn), nLocalServices(nLocalServicesIn), - nMyStartingHeight(nMyStartingHeightIn), - nSendVersion(0) + nMyStartingHeight(nMyStartingHeightIn) { - nServices = NODE_NONE; hSocket = hSocketIn; - nRecvVersion = INIT_PROTO_VERSION; - nLastSend = 0; - nLastRecv = 0; - nSendBytes = 0; - nRecvBytes = 0; - nTimeOffset = 0; addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn; - nVersion = 0; - strSubVer = ""; - fWhitelisted = false; - fOneShot = false; - m_manual_connection = false; - fClient = false; // set by version message - m_limited_node = false; // set by version message - fFeeler = false; - fSuccessfullyConnected = false; - fDisconnect = false; - nRefCount = 0; - nSendSize = 0; - nSendOffset = 0; hashContinue = uint256(); - nStartingHeight = -1; filterInventoryKnown.reset(); - fSendMempool = false; - fGetAddr = false; - nNextLocalAddrSend = 0; - nNextAddrSend = 0; - nNextInvSend = 0; - fRelayTxes = false; - fSentAddr = false; pfilter = MakeUnique<CBloomFilter>(); - timeLastMempoolReq = 0; - nLastBlockTime = 0; - nLastTXTime = 0; - nPingNonceSent = 0; - nPingUsecStart = 0; - nPingUsecTime = 0; - fPingQueued = false; - nMinPingUsecTime = std::numeric_limits<int64_t>::max(); - minFeeFilter = 0; - lastSentFeeFilter = 0; - nextSendTimeFeeFilter = 0; - fPauseRecv = false; - fPauseSend = false; - nProcessQueueSize = 0; for (const std::string &msg : getAllNetMessageTypes()) mapRecvBytesPerMsgCmd[msg] = 0; @@ -2778,40 +2647,6 @@ CNode::~CNode() CloseSocket(hSocket); } -void CNode::AskFor(const CInv& inv) -{ - if (mapAskFor.size() > MAPASKFOR_MAX_SZ || setAskFor.size() > SETASKFOR_MAX_SZ) - return; - // a peer may not have multiple non-responded queue positions for a single inv item - if (!setAskFor.insert(inv.hash).second) - return; - - // We're using mapAskFor as a priority queue, - // the key is the earliest time the request can be sent - int64_t nRequestTime; - limitedmap<uint256, int64_t>::const_iterator it = mapAlreadyAskedFor.find(inv.hash); - if (it != mapAlreadyAskedFor.end()) - nRequestTime = it->second; - else - nRequestTime = 0; - LogPrint(BCLog::NET, "askfor %s %d (%s) peer=%d\n", inv.ToString(), nRequestTime, FormatISO8601Time(nRequestTime/1000000), id); - - // Make sure not to reuse time indexes to keep things in the same order - int64_t nNow = GetTimeMicros() - 1000000; - static int64_t nLastTime; - ++nLastTime; - nNow = std::max(nNow, nLastTime); - nLastTime = nNow; - - // Each retry is 2 minutes after the last - nRequestTime = std::max(nRequestTime + 2 * 60 * 1000000, nNow); - if (it != mapAlreadyAskedFor.end()) - mapAlreadyAskedFor.update(it, nRequestTime); - else - mapAlreadyAskedFor.insert(std::make_pair(inv.hash, nRequestTime)); - mapAskFor.insert(std::make_pair(nRequestTime, inv)); -} - bool CConnman::NodeFullyConnected(const CNode* pnode) { return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; |