aboutsummaryrefslogtreecommitdiff
path: root/src/net.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net.cpp')
-rw-r--r--src/net.cpp294
1 files changed, 124 insertions, 170 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 5ccedb52d5..836f48e769 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -3,18 +3,16 @@
// Distributed under the MIT/X11 software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
-#include "headers.h"
#include "irc.h"
#include "db.h"
#include "net.h"
#include "init.h"
#include "strlcpy.h"
#include "addrman.h"
+#include "ui_interface.h"
#ifdef WIN32
#include <string.h>
-#else
-#include <netinet/in.h>
#endif
#ifdef USE_UPNP
@@ -37,7 +35,7 @@ void ThreadOpenAddedConnections2(void* parg);
void ThreadMapPort2(void* parg);
#endif
void ThreadDNSAddressSeed2(void* parg);
-bool OpenNetworkConnection(const CAddress& addrConnect);
+bool OpenNetworkConnection(const CAddress& addrConnect, bool fUseGrant = true);
@@ -66,7 +64,7 @@ map<CInv, int64> mapAlreadyAskedFor;
set<CNetAddr> setservAddNodeAddresses;
CCriticalSection cs_setservAddNodeAddresses;
-
+static CSemaphore *semOutbound = NULL;
unsigned short GetListenPort()
{
@@ -271,9 +269,11 @@ void ThreadGetMyExternalIP(void* parg)
// setAddrKnown automatically filters any duplicate sends.
CAddress addr(addrLocalHost);
addr.nTime = GetAdjustedTime();
- CRITICAL_BLOCK(cs_vNodes)
+ {
+ LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes)
pnode->PushAddress(addr);
+ }
}
}
}
@@ -291,111 +291,12 @@ void AddressCurrentlyConnected(const CService& addr)
-void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1)
-{
- // If the dialog might get closed before the reply comes back,
- // call this in the destructor so it doesn't get called after it's deleted.
- CRITICAL_BLOCK(cs_vNodes)
- {
- BOOST_FOREACH(CNode* pnode, vNodes)
- {
- CRITICAL_BLOCK(pnode->cs_mapRequests)
- {
- for (map<uint256, CRequestTracker>::iterator mi = pnode->mapRequests.begin(); mi != pnode->mapRequests.end();)
- {
- CRequestTracker& tracker = (*mi).second;
- if (tracker.fn == fn && tracker.param1 == param1)
- pnode->mapRequests.erase(mi++);
- else
- mi++;
- }
- }
- }
- }
-}
-
-
-
-
-
-
-
-//
-// Subscription methods for the broadcast and subscription system.
-// Channel numbers are message numbers, i.e. MSG_TABLE and MSG_PRODUCT.
-//
-// The subscription system uses a meet-in-the-middle strategy.
-// With 100,000 nodes, if senders broadcast to 1000 random nodes and receivers
-// subscribe to 1000 random nodes, 99.995% (1 - 0.99^1000) of messages will get through.
-//
-
-bool AnySubscribed(unsigned int nChannel)
-{
- if (pnodeLocalHost->IsSubscribed(nChannel))
- return true;
- CRITICAL_BLOCK(cs_vNodes)
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode->IsSubscribed(nChannel))
- return true;
- return false;
-}
-
-bool CNode::IsSubscribed(unsigned int nChannel)
-{
- if (nChannel >= vfSubscribe.size())
- return false;
- return vfSubscribe[nChannel];
-}
-
-void CNode::Subscribe(unsigned int nChannel, unsigned int nHops)
-{
- if (nChannel >= vfSubscribe.size())
- return;
-
- if (!AnySubscribed(nChannel))
- {
- // Relay subscribe
- CRITICAL_BLOCK(cs_vNodes)
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode != this)
- pnode->PushMessage("subscribe", nChannel, nHops);
- }
-
- vfSubscribe[nChannel] = true;
-}
-
-void CNode::CancelSubscribe(unsigned int nChannel)
-{
- if (nChannel >= vfSubscribe.size())
- return;
-
- // Prevent from relaying cancel if wasn't subscribed
- if (!vfSubscribe[nChannel])
- return;
- vfSubscribe[nChannel] = false;
-
- if (!AnySubscribed(nChannel))
- {
- // Relay subscription cancel
- CRITICAL_BLOCK(cs_vNodes)
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode != this)
- pnode->PushMessage("sub-cancel", nChannel);
- }
-}
-
-
-
-
-
-
-
CNode* FindNode(const CNetAddr& ip)
{
- CRITICAL_BLOCK(cs_vNodes)
{
+ LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes)
if ((CNetAddr)pnode->addr == ip)
return (pnode);
@@ -405,8 +306,8 @@ CNode* FindNode(const CNetAddr& ip)
CNode* FindNode(const CService& addr)
{
- CRITICAL_BLOCK(cs_vNodes)
{
+ LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes)
if ((CService)pnode->addr == addr)
return (pnode);
@@ -460,8 +361,10 @@ CNode* ConnectNode(CAddress addrConnect, int64 nTimeout)
pnode->AddRef(nTimeout);
else
pnode->AddRef();
- CRITICAL_BLOCK(cs_vNodes)
+ {
+ LOCK(cs_vNodes);
vNodes.push_back(pnode);
+ }
pnode->nTimeConnected = GetTime();
return pnode;
@@ -488,13 +391,6 @@ void CNode::CloseSocketDisconnect()
void CNode::Cleanup()
{
- // All of a nodes broadcasts and subscriptions are automatically torn down
- // when it goes down, so a node has to stay up to keep its broadcast going.
-
- // Cancel subscriptions
- for (unsigned int nChannel = 0; nChannel < vfSubscribe.size(); nChannel++)
- if (vfSubscribe[nChannel])
- CancelSubscribe(nChannel);
}
@@ -524,8 +420,8 @@ void CNode::ClearBanned()
bool CNode::IsBanned(CNetAddr ip)
{
bool fResult = false;
- CRITICAL_BLOCK(cs_setBanned)
{
+ LOCK(cs_setBanned);
std::map<CNetAddr, int64>::iterator i = setBanned.find(ip);
if (i != setBanned.end())
{
@@ -549,9 +445,11 @@ bool CNode::Misbehaving(int howmuch)
if (nMisbehavior >= GetArg("-banscore", 100))
{
int64 banTime = GetTime()+GetArg("-bantime", 60*60*24); // Default 24-hour ban
- CRITICAL_BLOCK(cs_setBanned)
+ {
+ LOCK(cs_setBanned);
if (setBanned[addr] < banTime)
setBanned[addr] = banTime;
+ }
CloseSocketDisconnect();
printf("Disconnected %s for misbehavior (score=%d)\n", addr.ToString().c_str(), nMisbehavior);
return true;
@@ -593,15 +491,15 @@ void ThreadSocketHandler2(void* parg)
{
printf("ThreadSocketHandler started\n");
list<CNode*> vNodesDisconnected;
- int nPrevNodeCount = 0;
+ unsigned int nPrevNodeCount = 0;
loop
{
//
// Disconnect nodes
//
- CRITICAL_BLOCK(cs_vNodes)
{
+ LOCK(cs_vNodes);
// Disconnect unused nodes
vector<CNode*> vNodesCopy = vNodes;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
@@ -612,6 +510,10 @@ void ThreadSocketHandler2(void* parg)
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
+ if (pnode->fHasGrant)
+ semOutbound->post();
+ pnode->fHasGrant = false;
+
// close socket and cleanup
pnode->CloseSocketDisconnect();
pnode->Cleanup();
@@ -632,11 +534,23 @@ void ThreadSocketHandler2(void* parg)
if (pnode->GetRefCount() <= 0)
{
bool fDelete = false;
- TRY_CRITICAL_BLOCK(pnode->cs_vSend)
- TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
- TRY_CRITICAL_BLOCK(pnode->cs_mapRequests)
- TRY_CRITICAL_BLOCK(pnode->cs_inventory)
- fDelete = true;
+ {
+ TRY_LOCK(pnode->cs_vSend, lockSend);
+ if (lockSend)
+ {
+ TRY_LOCK(pnode->cs_vRecv, lockRecv);
+ if (lockRecv)
+ {
+ TRY_LOCK(pnode->cs_mapRequests, lockReq);
+ if (lockReq)
+ {
+ TRY_LOCK(pnode->cs_inventory, lockInv);
+ if (lockInv)
+ fDelete = true;
+ }
+ }
+ }
+ }
if (fDelete)
{
vNodesDisconnected.remove(pnode);
@@ -674,8 +588,8 @@ void ThreadSocketHandler2(void* parg)
hSocketMax = max(hSocketMax, hListenSocket);
have_fds = true;
}
- CRITICAL_BLOCK(cs_vNodes)
{
+ LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes)
{
if (pnode->hSocket == INVALID_SOCKET)
@@ -684,9 +598,11 @@ void ThreadSocketHandler2(void* parg)
FD_SET(pnode->hSocket, &fdsetError);
hSocketMax = max(hSocketMax, pnode->hSocket);
have_fds = true;
- TRY_CRITICAL_BLOCK(pnode->cs_vSend)
- if (!pnode->vSend.empty())
+ {
+ TRY_LOCK(pnode->cs_vSend, lockSend);
+ if (lockSend && !pnode->vSend.empty())
FD_SET(pnode->hSocket, &fdsetSend);
+ }
}
}
@@ -725,10 +641,12 @@ void ThreadSocketHandler2(void* parg)
if (hSocket != INVALID_SOCKET)
addr = CAddress(sockaddr);
- CRITICAL_BLOCK(cs_vNodes)
+ {
+ LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode->fInbound)
- nInbound++;
+ if (pnode->fInbound)
+ nInbound++;
+ }
if (hSocket == INVALID_SOCKET)
{
@@ -737,9 +655,11 @@ void ThreadSocketHandler2(void* parg)
}
else if (nInbound >= GetArg("-maxconnections", 125) - MAX_OUTBOUND_CONNECTIONS)
{
- CRITICAL_BLOCK(cs_setservAddNodeAddresses)
+ {
+ LOCK(cs_setservAddNodeAddresses);
if (!setservAddNodeAddresses.count(addr))
closesocket(hSocket);
+ }
}
else if (CNode::IsBanned(addr))
{
@@ -751,8 +671,10 @@ void ThreadSocketHandler2(void* parg)
printf("accepted connection %s\n", addr.ToString().c_str());
CNode* pnode = new CNode(hSocket, addr, true);
pnode->AddRef();
- CRITICAL_BLOCK(cs_vNodes)
+ {
+ LOCK(cs_vNodes);
vNodes.push_back(pnode);
+ }
}
}
@@ -761,8 +683,8 @@ void ThreadSocketHandler2(void* parg)
// Service each socket
//
vector<CNode*> vNodesCopy;
- CRITICAL_BLOCK(cs_vNodes)
{
+ LOCK(cs_vNodes);
vNodesCopy = vNodes;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
pnode->AddRef();
@@ -779,7 +701,8 @@ void ThreadSocketHandler2(void* parg)
continue;
if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
{
- TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
+ TRY_LOCK(pnode->cs_vRecv, lockRecv);
+ if (lockRecv)
{
CDataStream& vRecv = pnode->vRecv;
unsigned int nPos = vRecv.size();
@@ -828,7 +751,8 @@ void ThreadSocketHandler2(void* parg)
continue;
if (FD_ISSET(pnode->hSocket, &fdsetSend))
{
- TRY_CRITICAL_BLOCK(pnode->cs_vSend)
+ TRY_LOCK(pnode->cs_vSend, lockSend);
+ if (lockSend)
{
CDataStream& vSend = pnode->vSend;
if (!vSend.empty())
@@ -882,8 +806,8 @@ void ThreadSocketHandler2(void* parg)
}
}
}
- CRITICAL_BLOCK(cs_vNodes)
{
+ LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodesCopy)
pnode->Release();
}
@@ -1271,7 +1195,7 @@ void ThreadOpenConnections2(void* parg)
{
CAddress addr(CService(strAddr, GetDefaultPort(), fAllowDNS));
if (addr.IsValid())
- OpenNetworkConnection(addr);
+ OpenNetworkConnection(addr, false);
for (int i = 0; i < 10 && i < nLoop; i++)
{
Sleep(500);
@@ -1286,32 +1210,18 @@ void ThreadOpenConnections2(void* parg)
int64 nStart = GetTime();
loop
{
- int nOutbound = 0;
-
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
Sleep(500);
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
if (fShutdown)
return;
- // Limit outbound connections
- loop
- {
- nOutbound = 0;
- CRITICAL_BLOCK(cs_vNodes)
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (!pnode->fInbound)
- nOutbound++;
- int nMaxOutboundConnections = MAX_OUTBOUND_CONNECTIONS;
- nMaxOutboundConnections = min(nMaxOutboundConnections, (int)GetArg("-maxconnections", 125));
- if (nOutbound < nMaxOutboundConnections)
- break;
- vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
- Sleep(2000);
- vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
- if (fShutdown)
- return;
- }
+
+ vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
+ semOutbound->wait();
+ vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
+ if (fShutdown)
+ return;
// Add seed nodes if IRC isn't working
bool fTOR = (fUseProxy && addrProxy.GetPort() == 9050);
@@ -1341,12 +1251,17 @@ void ThreadOpenConnections2(void* parg)
// Only connect out to one peer per network group (/16 for IPv4).
// Do this here so we don't have to critsect vNodes inside mapAddresses critsect.
+ int nOutbound = 0;
set<vector<unsigned char> > setConnected;
- CRITICAL_BLOCK(cs_vNodes)
- BOOST_FOREACH(CNode* pnode, vNodes)
+ {
+ LOCK(cs_vNodes);
+ BOOST_FOREACH(CNode* pnode, vNodes) {
if (!pnode->fInbound) {
setConnected.insert(pnode->addr.GetGroup());
+ nOutbound++;
}
+ }
+ }
int64 nANow = GetAdjustedTime();
@@ -1376,6 +1291,8 @@ void ThreadOpenConnections2(void* parg)
if (addrConnect.IsValid())
OpenNetworkConnection(addrConnect);
+ else
+ semOutbound->post();
}
}
@@ -1412,9 +1329,11 @@ void ThreadOpenAddedConnections2(void* parg)
if(Lookup(strAddNode.c_str(), vservNode, GetDefaultPort(), fAllowDNS, 0))
{
vservAddressesToAdd.push_back(vservNode);
- CRITICAL_BLOCK(cs_setservAddNodeAddresses)
+ {
+ LOCK(cs_setservAddNodeAddresses);
BOOST_FOREACH(CService& serv, vservNode)
setservAddNodeAddresses.insert(serv);
+ }
}
}
loop
@@ -1422,7 +1341,8 @@ void ThreadOpenAddedConnections2(void* parg)
vector<vector<CService> > vservConnectAddresses = vservAddressesToAdd;
// Attempt to connect to each IP for each addnode entry until at least one is successful per addnode entry
// (keeping in mind that addnode entries can have many IPs if fAllowDNS)
- CRITICAL_BLOCK(cs_vNodes)
+ {
+ LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes)
for (vector<vector<CService> >::iterator it = vservConnectAddresses.begin(); it != vservConnectAddresses.end(); it++)
BOOST_FOREACH(CService& addrNode, *(it))
@@ -1432,8 +1352,10 @@ void ThreadOpenAddedConnections2(void* parg)
it--;
break;
}
+ }
BOOST_FOREACH(vector<CService>& vserv, vservConnectAddresses)
{
+ semOutbound->wait();
OpenNetworkConnection(CAddress(*(vserv.begin())));
Sleep(500);
if (fShutdown)
@@ -1449,7 +1371,14 @@ void ThreadOpenAddedConnections2(void* parg)
}
}
-bool OpenNetworkConnection(const CAddress& addrConnect)
+bool static ReleaseGrant(bool fUseGrant) {
+ if (fUseGrant)
+ semOutbound->post();
+ return false;
+}
+
+// only call this function when semOutbound has been waited for
+bool OpenNetworkConnection(const CAddress& addrConnect, bool fUseGrant)
{
//
// Initiate outbound network connection
@@ -1458,7 +1387,7 @@ bool OpenNetworkConnection(const CAddress& addrConnect)
return false;
if ((CNetAddr)addrConnect == (CNetAddr)addrLocalHost || !addrConnect.IsIPv4() ||
FindNode((CNetAddr)addrConnect) || CNode::IsBanned(addrConnect))
- return false;
+ return ReleaseGrant(fUseGrant);
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
CNode* pnode = ConnectNode(addrConnect);
@@ -1466,7 +1395,13 @@ bool OpenNetworkConnection(const CAddress& addrConnect)
if (fShutdown)
return false;
if (!pnode)
- return false;
+ return ReleaseGrant(fUseGrant);
+ if (pnode->fHasGrant) {
+ // node already has connection grant, release the one that was passed to us
+ ReleaseGrant(fUseGrant);
+ } else {
+ pnode->fHasGrant = fUseGrant;
+ }
pnode->fNetworkNode = true;
return true;
@@ -1505,8 +1440,8 @@ void ThreadMessageHandler2(void* parg)
while (!fShutdown)
{
vector<CNode*> vNodesCopy;
- CRITICAL_BLOCK(cs_vNodes)
{
+ LOCK(cs_vNodes);
vNodesCopy = vNodes;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
pnode->AddRef();
@@ -1519,20 +1454,26 @@ void ThreadMessageHandler2(void* parg)
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
// Receive messages
- TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
- ProcessMessages(pnode);
+ {
+ TRY_LOCK(pnode->cs_vRecv, lockRecv);
+ if (lockRecv)
+ ProcessMessages(pnode);
+ }
if (fShutdown)
return;
// Send messages
- TRY_CRITICAL_BLOCK(pnode->cs_vSend)
- SendMessages(pnode, pnode == pnodeTrickle);
+ {
+ TRY_LOCK(pnode->cs_vSend, lockSend);
+ if (lockSend)
+ SendMessages(pnode, pnode == pnodeTrickle);
+ }
if (fShutdown)
return;
}
- CRITICAL_BLOCK(cs_vNodes)
{
+ LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodesCopy)
pnode->Release();
}
@@ -1637,6 +1578,12 @@ bool BindListenPort(string& strError)
void StartNode(void* parg)
{
+ if (semOutbound == NULL) {
+ // initialize semaphore
+ int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125));
+ semOutbound = new CSemaphore(nMaxOutbound);
+ }
+
#ifdef USE_UPNP
#if USE_UPNP
fUseUPnP = GetBoolArg("-upnp", true);
@@ -1655,12 +1602,16 @@ void StartNode(void* parg)
{
vector<CNetAddr> vaddr;
if (LookupHost(pszHostName, vaddr))
+ {
BOOST_FOREACH (const CNetAddr &addr, vaddr)
+ {
if (!addr.IsLocal())
{
addrLocalHost.SetIP(addr);
break;
}
+ }
+ }
}
#else
// Get local host IP
@@ -1759,6 +1710,9 @@ bool StopNode()
fShutdown = true;
nTransactionsUpdated++;
int64 nStart = GetTime();
+ if (semOutbound)
+ for (int i=0; i<MAX_OUTBOUND_CONNECTIONS; i++)
+ semOutbound->post();
do
{
int nThreadsRunning = 0;