diff options
Diffstat (limited to 'src/net.cpp')
-rw-r--r-- | src/net.cpp | 294 |
1 files changed, 124 insertions, 170 deletions
diff --git a/src/net.cpp b/src/net.cpp index 5ccedb52d5..836f48e769 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -3,18 +3,16 @@ // Distributed under the MIT/X11 software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. -#include "headers.h" #include "irc.h" #include "db.h" #include "net.h" #include "init.h" #include "strlcpy.h" #include "addrman.h" +#include "ui_interface.h" #ifdef WIN32 #include <string.h> -#else -#include <netinet/in.h> #endif #ifdef USE_UPNP @@ -37,7 +35,7 @@ void ThreadOpenAddedConnections2(void* parg); void ThreadMapPort2(void* parg); #endif void ThreadDNSAddressSeed2(void* parg); -bool OpenNetworkConnection(const CAddress& addrConnect); +bool OpenNetworkConnection(const CAddress& addrConnect, bool fUseGrant = true); @@ -66,7 +64,7 @@ map<CInv, int64> mapAlreadyAskedFor; set<CNetAddr> setservAddNodeAddresses; CCriticalSection cs_setservAddNodeAddresses; - +static CSemaphore *semOutbound = NULL; unsigned short GetListenPort() { @@ -271,9 +269,11 @@ void ThreadGetMyExternalIP(void* parg) // setAddrKnown automatically filters any duplicate sends. CAddress addr(addrLocalHost); addr.nTime = GetAdjustedTime(); - CRITICAL_BLOCK(cs_vNodes) + { + LOCK(cs_vNodes); BOOST_FOREACH(CNode* pnode, vNodes) pnode->PushAddress(addr); + } } } } @@ -291,111 +291,12 @@ void AddressCurrentlyConnected(const CService& addr) -void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1) -{ - // If the dialog might get closed before the reply comes back, - // call this in the destructor so it doesn't get called after it's deleted. - CRITICAL_BLOCK(cs_vNodes) - { - BOOST_FOREACH(CNode* pnode, vNodes) - { - CRITICAL_BLOCK(pnode->cs_mapRequests) - { - for (map<uint256, CRequestTracker>::iterator mi = pnode->mapRequests.begin(); mi != pnode->mapRequests.end();) - { - CRequestTracker& tracker = (*mi).second; - if (tracker.fn == fn && tracker.param1 == param1) - pnode->mapRequests.erase(mi++); - else - mi++; - } - } - } - } -} - - - - - - - -// -// Subscription methods for the broadcast and subscription system. -// Channel numbers are message numbers, i.e. MSG_TABLE and MSG_PRODUCT. -// -// The subscription system uses a meet-in-the-middle strategy. -// With 100,000 nodes, if senders broadcast to 1000 random nodes and receivers -// subscribe to 1000 random nodes, 99.995% (1 - 0.99^1000) of messages will get through. -// - -bool AnySubscribed(unsigned int nChannel) -{ - if (pnodeLocalHost->IsSubscribed(nChannel)) - return true; - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode->IsSubscribed(nChannel)) - return true; - return false; -} - -bool CNode::IsSubscribed(unsigned int nChannel) -{ - if (nChannel >= vfSubscribe.size()) - return false; - return vfSubscribe[nChannel]; -} - -void CNode::Subscribe(unsigned int nChannel, unsigned int nHops) -{ - if (nChannel >= vfSubscribe.size()) - return; - - if (!AnySubscribed(nChannel)) - { - // Relay subscribe - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode != this) - pnode->PushMessage("subscribe", nChannel, nHops); - } - - vfSubscribe[nChannel] = true; -} - -void CNode::CancelSubscribe(unsigned int nChannel) -{ - if (nChannel >= vfSubscribe.size()) - return; - - // Prevent from relaying cancel if wasn't subscribed - if (!vfSubscribe[nChannel]) - return; - vfSubscribe[nChannel] = false; - - if (!AnySubscribed(nChannel)) - { - // Relay subscription cancel - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode != this) - pnode->PushMessage("sub-cancel", nChannel); - } -} - - - - - - - CNode* FindNode(const CNetAddr& ip) { - CRITICAL_BLOCK(cs_vNodes) { + LOCK(cs_vNodes); BOOST_FOREACH(CNode* pnode, vNodes) if ((CNetAddr)pnode->addr == ip) return (pnode); @@ -405,8 +306,8 @@ CNode* FindNode(const CNetAddr& ip) CNode* FindNode(const CService& addr) { - CRITICAL_BLOCK(cs_vNodes) { + LOCK(cs_vNodes); BOOST_FOREACH(CNode* pnode, vNodes) if ((CService)pnode->addr == addr) return (pnode); @@ -460,8 +361,10 @@ CNode* ConnectNode(CAddress addrConnect, int64 nTimeout) pnode->AddRef(nTimeout); else pnode->AddRef(); - CRITICAL_BLOCK(cs_vNodes) + { + LOCK(cs_vNodes); vNodes.push_back(pnode); + } pnode->nTimeConnected = GetTime(); return pnode; @@ -488,13 +391,6 @@ void CNode::CloseSocketDisconnect() void CNode::Cleanup() { - // All of a nodes broadcasts and subscriptions are automatically torn down - // when it goes down, so a node has to stay up to keep its broadcast going. - - // Cancel subscriptions - for (unsigned int nChannel = 0; nChannel < vfSubscribe.size(); nChannel++) - if (vfSubscribe[nChannel]) - CancelSubscribe(nChannel); } @@ -524,8 +420,8 @@ void CNode::ClearBanned() bool CNode::IsBanned(CNetAddr ip) { bool fResult = false; - CRITICAL_BLOCK(cs_setBanned) { + LOCK(cs_setBanned); std::map<CNetAddr, int64>::iterator i = setBanned.find(ip); if (i != setBanned.end()) { @@ -549,9 +445,11 @@ bool CNode::Misbehaving(int howmuch) if (nMisbehavior >= GetArg("-banscore", 100)) { int64 banTime = GetTime()+GetArg("-bantime", 60*60*24); // Default 24-hour ban - CRITICAL_BLOCK(cs_setBanned) + { + LOCK(cs_setBanned); if (setBanned[addr] < banTime) setBanned[addr] = banTime; + } CloseSocketDisconnect(); printf("Disconnected %s for misbehavior (score=%d)\n", addr.ToString().c_str(), nMisbehavior); return true; @@ -593,15 +491,15 @@ void ThreadSocketHandler2(void* parg) { printf("ThreadSocketHandler started\n"); list<CNode*> vNodesDisconnected; - int nPrevNodeCount = 0; + unsigned int nPrevNodeCount = 0; loop { // // Disconnect nodes // - CRITICAL_BLOCK(cs_vNodes) { + LOCK(cs_vNodes); // Disconnect unused nodes vector<CNode*> vNodesCopy = vNodes; BOOST_FOREACH(CNode* pnode, vNodesCopy) @@ -612,6 +510,10 @@ void ThreadSocketHandler2(void* parg) // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); + if (pnode->fHasGrant) + semOutbound->post(); + pnode->fHasGrant = false; + // close socket and cleanup pnode->CloseSocketDisconnect(); pnode->Cleanup(); @@ -632,11 +534,23 @@ void ThreadSocketHandler2(void* parg) if (pnode->GetRefCount() <= 0) { bool fDelete = false; - TRY_CRITICAL_BLOCK(pnode->cs_vSend) - TRY_CRITICAL_BLOCK(pnode->cs_vRecv) - TRY_CRITICAL_BLOCK(pnode->cs_mapRequests) - TRY_CRITICAL_BLOCK(pnode->cs_inventory) - fDelete = true; + { + TRY_LOCK(pnode->cs_vSend, lockSend); + if (lockSend) + { + TRY_LOCK(pnode->cs_vRecv, lockRecv); + if (lockRecv) + { + TRY_LOCK(pnode->cs_mapRequests, lockReq); + if (lockReq) + { + TRY_LOCK(pnode->cs_inventory, lockInv); + if (lockInv) + fDelete = true; + } + } + } + } if (fDelete) { vNodesDisconnected.remove(pnode); @@ -674,8 +588,8 @@ void ThreadSocketHandler2(void* parg) hSocketMax = max(hSocketMax, hListenSocket); have_fds = true; } - CRITICAL_BLOCK(cs_vNodes) { + LOCK(cs_vNodes); BOOST_FOREACH(CNode* pnode, vNodes) { if (pnode->hSocket == INVALID_SOCKET) @@ -684,9 +598,11 @@ void ThreadSocketHandler2(void* parg) FD_SET(pnode->hSocket, &fdsetError); hSocketMax = max(hSocketMax, pnode->hSocket); have_fds = true; - TRY_CRITICAL_BLOCK(pnode->cs_vSend) - if (!pnode->vSend.empty()) + { + TRY_LOCK(pnode->cs_vSend, lockSend); + if (lockSend && !pnode->vSend.empty()) FD_SET(pnode->hSocket, &fdsetSend); + } } } @@ -725,10 +641,12 @@ void ThreadSocketHandler2(void* parg) if (hSocket != INVALID_SOCKET) addr = CAddress(sockaddr); - CRITICAL_BLOCK(cs_vNodes) + { + LOCK(cs_vNodes); BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode->fInbound) - nInbound++; + if (pnode->fInbound) + nInbound++; + } if (hSocket == INVALID_SOCKET) { @@ -737,9 +655,11 @@ void ThreadSocketHandler2(void* parg) } else if (nInbound >= GetArg("-maxconnections", 125) - MAX_OUTBOUND_CONNECTIONS) { - CRITICAL_BLOCK(cs_setservAddNodeAddresses) + { + LOCK(cs_setservAddNodeAddresses); if (!setservAddNodeAddresses.count(addr)) closesocket(hSocket); + } } else if (CNode::IsBanned(addr)) { @@ -751,8 +671,10 @@ void ThreadSocketHandler2(void* parg) printf("accepted connection %s\n", addr.ToString().c_str()); CNode* pnode = new CNode(hSocket, addr, true); pnode->AddRef(); - CRITICAL_BLOCK(cs_vNodes) + { + LOCK(cs_vNodes); vNodes.push_back(pnode); + } } } @@ -761,8 +683,8 @@ void ThreadSocketHandler2(void* parg) // Service each socket // vector<CNode*> vNodesCopy; - CRITICAL_BLOCK(cs_vNodes) { + LOCK(cs_vNodes); vNodesCopy = vNodes; BOOST_FOREACH(CNode* pnode, vNodesCopy) pnode->AddRef(); @@ -779,7 +701,8 @@ void ThreadSocketHandler2(void* parg) continue; if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) { - TRY_CRITICAL_BLOCK(pnode->cs_vRecv) + TRY_LOCK(pnode->cs_vRecv, lockRecv); + if (lockRecv) { CDataStream& vRecv = pnode->vRecv; unsigned int nPos = vRecv.size(); @@ -828,7 +751,8 @@ void ThreadSocketHandler2(void* parg) continue; if (FD_ISSET(pnode->hSocket, &fdsetSend)) { - TRY_CRITICAL_BLOCK(pnode->cs_vSend) + TRY_LOCK(pnode->cs_vSend, lockSend); + if (lockSend) { CDataStream& vSend = pnode->vSend; if (!vSend.empty()) @@ -882,8 +806,8 @@ void ThreadSocketHandler2(void* parg) } } } - CRITICAL_BLOCK(cs_vNodes) { + LOCK(cs_vNodes); BOOST_FOREACH(CNode* pnode, vNodesCopy) pnode->Release(); } @@ -1271,7 +1195,7 @@ void ThreadOpenConnections2(void* parg) { CAddress addr(CService(strAddr, GetDefaultPort(), fAllowDNS)); if (addr.IsValid()) - OpenNetworkConnection(addr); + OpenNetworkConnection(addr, false); for (int i = 0; i < 10 && i < nLoop; i++) { Sleep(500); @@ -1286,32 +1210,18 @@ void ThreadOpenConnections2(void* parg) int64 nStart = GetTime(); loop { - int nOutbound = 0; - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; Sleep(500); vnThreadsRunning[THREAD_OPENCONNECTIONS]++; if (fShutdown) return; - // Limit outbound connections - loop - { - nOutbound = 0; - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (!pnode->fInbound) - nOutbound++; - int nMaxOutboundConnections = MAX_OUTBOUND_CONNECTIONS; - nMaxOutboundConnections = min(nMaxOutboundConnections, (int)GetArg("-maxconnections", 125)); - if (nOutbound < nMaxOutboundConnections) - break; - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; - Sleep(2000); - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - if (fShutdown) - return; - } + + vnThreadsRunning[THREAD_OPENCONNECTIONS]--; + semOutbound->wait(); + vnThreadsRunning[THREAD_OPENCONNECTIONS]++; + if (fShutdown) + return; // Add seed nodes if IRC isn't working bool fTOR = (fUseProxy && addrProxy.GetPort() == 9050); @@ -1341,12 +1251,17 @@ void ThreadOpenConnections2(void* parg) // Only connect out to one peer per network group (/16 for IPv4). // Do this here so we don't have to critsect vNodes inside mapAddresses critsect. + int nOutbound = 0; set<vector<unsigned char> > setConnected; - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) + { + LOCK(cs_vNodes); + BOOST_FOREACH(CNode* pnode, vNodes) { if (!pnode->fInbound) { setConnected.insert(pnode->addr.GetGroup()); + nOutbound++; } + } + } int64 nANow = GetAdjustedTime(); @@ -1376,6 +1291,8 @@ void ThreadOpenConnections2(void* parg) if (addrConnect.IsValid()) OpenNetworkConnection(addrConnect); + else + semOutbound->post(); } } @@ -1412,9 +1329,11 @@ void ThreadOpenAddedConnections2(void* parg) if(Lookup(strAddNode.c_str(), vservNode, GetDefaultPort(), fAllowDNS, 0)) { vservAddressesToAdd.push_back(vservNode); - CRITICAL_BLOCK(cs_setservAddNodeAddresses) + { + LOCK(cs_setservAddNodeAddresses); BOOST_FOREACH(CService& serv, vservNode) setservAddNodeAddresses.insert(serv); + } } } loop @@ -1422,7 +1341,8 @@ void ThreadOpenAddedConnections2(void* parg) vector<vector<CService> > vservConnectAddresses = vservAddressesToAdd; // Attempt to connect to each IP for each addnode entry until at least one is successful per addnode entry // (keeping in mind that addnode entries can have many IPs if fAllowDNS) - CRITICAL_BLOCK(cs_vNodes) + { + LOCK(cs_vNodes); BOOST_FOREACH(CNode* pnode, vNodes) for (vector<vector<CService> >::iterator it = vservConnectAddresses.begin(); it != vservConnectAddresses.end(); it++) BOOST_FOREACH(CService& addrNode, *(it)) @@ -1432,8 +1352,10 @@ void ThreadOpenAddedConnections2(void* parg) it--; break; } + } BOOST_FOREACH(vector<CService>& vserv, vservConnectAddresses) { + semOutbound->wait(); OpenNetworkConnection(CAddress(*(vserv.begin()))); Sleep(500); if (fShutdown) @@ -1449,7 +1371,14 @@ void ThreadOpenAddedConnections2(void* parg) } } -bool OpenNetworkConnection(const CAddress& addrConnect) +bool static ReleaseGrant(bool fUseGrant) { + if (fUseGrant) + semOutbound->post(); + return false; +} + +// only call this function when semOutbound has been waited for +bool OpenNetworkConnection(const CAddress& addrConnect, bool fUseGrant) { // // Initiate outbound network connection @@ -1458,7 +1387,7 @@ bool OpenNetworkConnection(const CAddress& addrConnect) return false; if ((CNetAddr)addrConnect == (CNetAddr)addrLocalHost || !addrConnect.IsIPv4() || FindNode((CNetAddr)addrConnect) || CNode::IsBanned(addrConnect)) - return false; + return ReleaseGrant(fUseGrant); vnThreadsRunning[THREAD_OPENCONNECTIONS]--; CNode* pnode = ConnectNode(addrConnect); @@ -1466,7 +1395,13 @@ bool OpenNetworkConnection(const CAddress& addrConnect) if (fShutdown) return false; if (!pnode) - return false; + return ReleaseGrant(fUseGrant); + if (pnode->fHasGrant) { + // node already has connection grant, release the one that was passed to us + ReleaseGrant(fUseGrant); + } else { + pnode->fHasGrant = fUseGrant; + } pnode->fNetworkNode = true; return true; @@ -1505,8 +1440,8 @@ void ThreadMessageHandler2(void* parg) while (!fShutdown) { vector<CNode*> vNodesCopy; - CRITICAL_BLOCK(cs_vNodes) { + LOCK(cs_vNodes); vNodesCopy = vNodes; BOOST_FOREACH(CNode* pnode, vNodesCopy) pnode->AddRef(); @@ -1519,20 +1454,26 @@ void ThreadMessageHandler2(void* parg) BOOST_FOREACH(CNode* pnode, vNodesCopy) { // Receive messages - TRY_CRITICAL_BLOCK(pnode->cs_vRecv) - ProcessMessages(pnode); + { + TRY_LOCK(pnode->cs_vRecv, lockRecv); + if (lockRecv) + ProcessMessages(pnode); + } if (fShutdown) return; // Send messages - TRY_CRITICAL_BLOCK(pnode->cs_vSend) - SendMessages(pnode, pnode == pnodeTrickle); + { + TRY_LOCK(pnode->cs_vSend, lockSend); + if (lockSend) + SendMessages(pnode, pnode == pnodeTrickle); + } if (fShutdown) return; } - CRITICAL_BLOCK(cs_vNodes) { + LOCK(cs_vNodes); BOOST_FOREACH(CNode* pnode, vNodesCopy) pnode->Release(); } @@ -1637,6 +1578,12 @@ bool BindListenPort(string& strError) void StartNode(void* parg) { + if (semOutbound == NULL) { + // initialize semaphore + int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125)); + semOutbound = new CSemaphore(nMaxOutbound); + } + #ifdef USE_UPNP #if USE_UPNP fUseUPnP = GetBoolArg("-upnp", true); @@ -1655,12 +1602,16 @@ void StartNode(void* parg) { vector<CNetAddr> vaddr; if (LookupHost(pszHostName, vaddr)) + { BOOST_FOREACH (const CNetAddr &addr, vaddr) + { if (!addr.IsLocal()) { addrLocalHost.SetIP(addr); break; } + } + } } #else // Get local host IP @@ -1759,6 +1710,9 @@ bool StopNode() fShutdown = true; nTransactionsUpdated++; int64 nStart = GetTime(); + if (semOutbound) + for (int i=0; i<MAX_OUTBOUND_CONNECTIONS; i++) + semOutbound->post(); do { int nThreadsRunning = 0; |