aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/net.cpp66
-rw-r--r--src/net.h1
-rw-r--r--src/util.h71
3 files changed, 91 insertions, 47 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 8603514f91..67427a3e8e 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -35,7 +35,7 @@ void ThreadOpenAddedConnections2(void* parg);
void ThreadMapPort2(void* parg);
#endif
void ThreadDNSAddressSeed2(void* parg);
-bool OpenNetworkConnection(const CAddress& addrConnect, const char *strDest = NULL, bool fOneShot = false);
+bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound = NULL, const char *strDest = NULL, bool fOneShot = false);
@@ -66,10 +66,7 @@ CCriticalSection cs_vOneShots;
set<CNetAddr> setservAddNodeAddresses;
CCriticalSection cs_setservAddNodeAddresses;
-static CWaitableCriticalSection csOutbound;
-static int nOutbound = 0;
-static CConditionVariable condOutbound;
-
+static CSemaphore *semOutbound = NULL;
void AddOneShot(string strDest)
{
@@ -463,10 +460,6 @@ CNode* ConnectNode(CAddress addrConnect, const char *pszDest, int64 nTimeout)
LOCK(cs_vNodes);
vNodes.push_back(pnode);
}
- {
- WAITABLE_LOCK(csOutbound);
- nOutbound++;
- }
pnode->nTimeConnected = GetTime();
return pnode;
@@ -612,14 +605,8 @@ void ThreadSocketHandler2(void* parg)
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
- if (!pnode->fInbound)
- {
- WAITABLE_LOCK(csOutbound);
- nOutbound--;
-
- // Connection slot(s) were removed, notify connection creator(s)
- NOTIFY(condOutbound);
- }
+ // release outbound grant (if any)
+ pnode->grantOutbound.Release();
// close socket and cleanup
pnode->CloseSocketDisconnect();
@@ -1295,8 +1282,11 @@ void static ProcessOneShot()
vOneShots.pop_front();
}
CAddress addr;
- if (!OpenNetworkConnection(addr, strDest.c_str(), true))
- AddOneShot(strDest);
+ CSemaphoreGrant grant(*semOutbound, true);
+ if (grant) {
+ if (!OpenNetworkConnection(addr, &grant, strDest.c_str(), true))
+ AddOneShot(strDest);
+ }
}
void ThreadOpenConnections2(void* parg)
@@ -1312,7 +1302,7 @@ void ThreadOpenConnections2(void* parg)
BOOST_FOREACH(string strAddr, mapMultiArgs["-connect"])
{
CAddress addr;
- OpenNetworkConnection(addr, strAddr.c_str());
+ OpenNetworkConnection(addr, NULL, strAddr.c_str());
for (int i = 0; i < 10 && i < nLoop; i++)
{
Sleep(500);
@@ -1335,13 +1325,9 @@ void ThreadOpenConnections2(void* parg)
if (fShutdown)
return;
- // Limit outbound connections
- int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125));
+
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
- {
- WAITABLE_LOCK(csOutbound);
- WAIT(condOutbound, fShutdown || nOutbound < nMaxOutbound);
- }
+ CSemaphoreGrant grant(*semOutbound);
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
if (fShutdown)
return;
@@ -1374,11 +1360,15 @@ void ThreadOpenConnections2(void* parg)
// Only connect to one address per a.b.?.? range.
// Do this here so we don't have to critsect vNodes inside mapAddresses critsect.
+ int nOutbound = 0;
set<vector<unsigned char> > setConnected;
{
LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes)
+ BOOST_FOREACH(CNode* pnode, vNodes) {
setConnected.insert(pnode->addr.GetGroup());
+ if (!pnode->fInbound)
+ nOutbound++;
+ }
}
int64 nANow = GetAdjustedTime();
@@ -1408,7 +1398,7 @@ void ThreadOpenConnections2(void* parg)
}
if (addrConnect.IsValid())
- OpenNetworkConnection(addrConnect);
+ OpenNetworkConnection(addrConnect, &grant);
}
}
@@ -1442,7 +1432,8 @@ void ThreadOpenAddedConnections2(void* parg)
while(!fShutdown) {
BOOST_FOREACH(string& strAddNode, mapMultiArgs["-addnode"]) {
CAddress addr;
- OpenNetworkConnection(addr, strAddNode.c_str());
+ CSemaphoreGrant grant(*semOutbound);
+ OpenNetworkConnection(addr, &grant, strAddNode.c_str());
Sleep(500);
}
vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
@@ -1485,7 +1476,8 @@ void ThreadOpenAddedConnections2(void* parg)
}
BOOST_FOREACH(vector<CService>& vserv, vservConnectAddresses)
{
- OpenNetworkConnection(CAddress(*(vserv.begin())));
+ CSemaphoreGrant grant(*semOutbound);
+ OpenNetworkConnection(CAddress(*(vserv.begin())), &grant);
Sleep(500);
if (fShutdown)
return;
@@ -1500,7 +1492,8 @@ void ThreadOpenAddedConnections2(void* parg)
}
}
-bool OpenNetworkConnection(const CAddress& addrConnect, const char *strDest, bool fOneShot)
+// if succesful, this moves the passed grant to the constructed node
+bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound, const char *strDest, bool fOneShot)
{
//
// Initiate outbound network connection
@@ -1522,6 +1515,8 @@ bool OpenNetworkConnection(const CAddress& addrConnect, const char *strDest, boo
return false;
if (!pnode)
return false;
+ if (grantOutbound)
+ grantOutbound->MoveTo(pnode->grantOutbound);
pnode->fNetworkNode = true;
if (fOneShot)
pnode->fOneShot = true;
@@ -1770,6 +1765,12 @@ void StartNode(void* parg)
#endif
#endif
+ if (semOutbound == NULL) {
+ // initialize semaphore
+ int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125));
+ semOutbound = new CSemaphore(nMaxOutbound);
+ }
+
if (pnodeLocalHost == NULL)
pnodeLocalHost = new CNode(INVALID_SOCKET, CAddress(CService("127.0.0.1", 0), nLocalServices));
@@ -1823,7 +1824,8 @@ bool StopNode()
fShutdown = true;
nTransactionsUpdated++;
int64 nStart = GetTime();
- NOTIFY_ALL(condOutbound);
+ for (int i=0; i<MAX_OUTBOUND_CONNECTIONS; i++)
+ semOutbound->post();
do
{
int nThreadsRunning = 0;
diff --git a/src/net.h b/src/net.h
index a00dd1b8cc..4e4ea31ead 100644
--- a/src/net.h
+++ b/src/net.h
@@ -147,6 +147,7 @@ public:
bool fNetworkNode;
bool fSuccessfullyConnected;
bool fDisconnect;
+ CSemaphoreGrant grantOutbound;
protected:
int nRefCount;
diff --git a/src/util.h b/src/util.h
index ebd574f896..61ff553539 100644
--- a/src/util.h
+++ b/src/util.h
@@ -23,7 +23,7 @@ typedef int pid_t; /* define for windows compatiblity */
#include <boost/filesystem/path.hpp>
#include <boost/interprocess/sync/interprocess_recursive_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
-#include <boost/interprocess/sync/interprocess_condition.hpp>
+#include <boost/interprocess/sync/interprocess_semaphore.hpp>
#include <boost/interprocess/sync/lock_options.hpp>
#include <boost/date_time/gregorian/gregorian_types.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
@@ -275,24 +275,10 @@ public:
};
typedef CMutexLock<CCriticalSection> CCriticalBlock;
-typedef CMutexLock<CWaitableCriticalSection> CWaitableCriticalBlock;
-typedef boost::interprocess::interprocess_condition CConditionVariable;
-
-/** Wait for a given condition inside a WAITABLE_CRITICAL_BLOCK */
-#define WAIT(name,condition) \
- do { while(!(condition)) { (name).wait(waitablecriticalblock.GetLock()); } } while(0)
-
-/** Notify waiting threads that a condition may hold now */
-#define NOTIFY(name) \
- do { (name).notify_one(); } while(0)
-
-#define NOTIFY_ALL(name) \
- do { (name).notify_all(); } while(0)
#define LOCK(cs) CCriticalBlock criticalblock(cs, #cs, __FILE__, __LINE__)
#define LOCK2(cs1,cs2) CCriticalBlock criticalblock1(cs1, #cs1, __FILE__, __LINE__),criticalblock2(cs2, #cs2, __FILE__, __LINE__)
#define TRY_LOCK(cs,name) CCriticalBlock name(cs, #cs, __FILE__, __LINE__, true)
-#define WAITABLE_LOCK(cs) CWaitableCriticalBlock waitablecriticalblock(cs, #cs, __FILE__, __LINE__)
#define ENTER_CRITICAL_SECTION(cs) \
{ \
@@ -306,6 +292,61 @@ typedef boost::interprocess::interprocess_condition CConditionVariable;
LeaveCritical(); \
}
+typedef boost::interprocess::interprocess_semaphore CSemaphore;
+
+/** RAII-style semaphore lock */
+class CSemaphoreGrant
+{
+private:
+ CSemaphore *sem;
+ bool fHaveGrant;
+
+public:
+ void Acquire() {
+ if (fHaveGrant)
+ return;
+ sem->wait();
+ fHaveGrant = true;
+ }
+
+ void Release() {
+ if (!fHaveGrant)
+ return;
+ sem->post();
+ fHaveGrant = false;
+ }
+
+ bool TryAcquire() {
+ if (!fHaveGrant && sem->try_wait())
+ fHaveGrant = true;
+ return fHaveGrant;
+ }
+
+ void MoveTo(CSemaphoreGrant &grant) {
+ grant.Release();
+ grant.sem = sem;
+ grant.fHaveGrant = fHaveGrant;
+ sem = NULL;
+ fHaveGrant = false;
+ }
+
+ CSemaphoreGrant() : sem(NULL), fHaveGrant(false) {}
+
+ CSemaphoreGrant(CSemaphore &sema, bool fTry = false) : sem(&sema), fHaveGrant(false) {
+ if (fTry)
+ TryAcquire();
+ else
+ Acquire();
+ }
+
+ ~CSemaphoreGrant() {
+ Release();
+ }
+
+ operator bool() {
+ return fHaveGrant;
+ }
+};
inline std::string i64tostr(int64 n)
{