aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPieter Wuille <pieter.wuille@gmail.com>2012-04-06 04:11:14 -0700
committerPieter Wuille <pieter.wuille@gmail.com>2012-04-06 04:11:14 -0700
commit9362da78b0d8f0dd7bbad259adcb3e4d0cf58e56 (patch)
tree9abfb65cfff9b76217e72e55608baa357274f1f1
parent9682087b65b820f24579871946731445302a1a5a (diff)
parent092631f0ba0030dec1ccfa66e206fb2a6c9bf73b (diff)
Merge pull request #1033 from sipa/wait
Condition variables instead of polling
-rw-r--r--src/net.cpp41
-rw-r--r--src/util.cpp56
-rw-r--r--src/util.h157
3 files changed, 132 insertions, 122 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 7dc2d4c22a..59bace41bb 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -64,6 +64,9 @@ map<CInv, int64> mapAlreadyAskedFor;
set<CNetAddr> setservAddNodeAddresses;
CCriticalSection cs_setservAddNodeAddresses;
+static CWaitableCriticalSection csOutbound;
+static int nOutbound = 0;
+static CConditionVariable condOutbound;
unsigned short GetListenPort()
@@ -361,6 +364,8 @@ CNode* ConnectNode(CAddress addrConnect, int64 nTimeout)
pnode->AddRef();
CRITICAL_BLOCK(cs_vNodes)
vNodes.push_back(pnode);
+ WAITABLE_CRITICAL_BLOCK(csOutbound)
+ nOutbound++;
pnode->nTimeConnected = GetTime();
return pnode;
@@ -504,6 +509,15 @@ void ThreadSocketHandler2(void* parg)
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
+ if (!pnode->fInbound)
+ WAITABLE_CRITICAL_BLOCK(csOutbound)
+ {
+ nOutbound--;
+
+ // Connection slot(s) were removed, notify connection creator(s)
+ NOTIFY(condOutbound);
+ }
+
// close socket and cleanup
pnode->CloseSocketDisconnect();
pnode->Cleanup();
@@ -1172,8 +1186,6 @@ void ThreadOpenConnections2(void* parg)
int64 nStart = GetTime();
loop
{
- int nOutbound = 0;
-
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
Sleep(500);
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
@@ -1181,23 +1193,13 @@ void ThreadOpenConnections2(void* parg)
return;
// Limit outbound connections
- loop
- {
- nOutbound = 0;
- CRITICAL_BLOCK(cs_vNodes)
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (!pnode->fInbound)
- nOutbound++;
- int nMaxOutboundConnections = MAX_OUTBOUND_CONNECTIONS;
- nMaxOutboundConnections = min(nMaxOutboundConnections, (int)GetArg("-maxconnections", 125));
- if (nOutbound < nMaxOutboundConnections)
- break;
- vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
- Sleep(2000);
- vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
- if (fShutdown)
- return;
- }
+ int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125));
+ vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
+ WAITABLE_CRITICAL_BLOCK(csOutbound)
+ WAIT(condOutbound, fShutdown || nOutbound < nMaxOutbound);
+ vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
+ if (fShutdown)
+ return;
bool fAddSeeds = false;
@@ -1646,6 +1648,7 @@ bool StopNode()
fShutdown = true;
nTransactionsUpdated++;
int64 nStart = GetTime();
+ NOTIFY_ALL(condOutbound);
do
{
int nThreadsRunning = 0;
diff --git a/src/util.cpp b/src/util.cpp
index 5c47551526..d55e7ae10e 100644
--- a/src/util.cpp
+++ b/src/util.cpp
@@ -1183,62 +1183,14 @@ static void pop_lock()
dd_mutex.unlock();
}
-void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine)
+void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs)
{
- push_lock(this, CLockLocation(pszName, pszFile, nLine));
-#ifdef DEBUG_LOCKCONTENTION
- bool result = mutex.try_lock();
- if (!result)
- {
- printf("LOCKCONTENTION: %s\n", pszName);
- printf("Locker: %s:%d\n", pszFile, nLine);
- mutex.lock();
- printf("Locked\n");
- }
-#else
- mutex.lock();
-#endif
-}
-void CCriticalSection::Leave()
-{
- mutex.unlock();
- pop_lock();
-}
-bool CCriticalSection::TryEnter(const char* pszName, const char* pszFile, int nLine)
-{
- push_lock(this, CLockLocation(pszName, pszFile, nLine));
- bool result = mutex.try_lock();
- if (!result) pop_lock();
- return result;
+ push_lock(cs, CLockLocation(pszName, pszFile, nLine));
}
-#else
-
-void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine)
+void LeaveCritical()
{
-#ifdef DEBUG_LOCKCONTENTION
- bool result = mutex.try_lock();
- if (!result)
- {
- printf("LOCKCONTENTION: %s\n", pszName);
- printf("Locker: %s:%d\n", pszFile, nLine);
- mutex.lock();
- }
-#else
- mutex.lock();
-#endif
-}
-
-void CCriticalSection::Leave()
-{
- mutex.unlock();
-}
-
-bool CCriticalSection::TryEnter(const char*, const char*, int)
-{
- bool result = mutex.try_lock();
- return result;
+ pop_lock();
}
#endif /* DEBUG_LOCKORDER */
-
diff --git a/src/util.h b/src/util.h
index e4cf83f433..635790b71b 100644
--- a/src/util.h
+++ b/src/util.h
@@ -20,6 +20,9 @@ typedef int pid_t; /* define for windows compatiblity */
#include <boost/thread.hpp>
#include <boost/interprocess/sync/interprocess_recursive_mutex.hpp>
+#include <boost/interprocess/sync/scoped_lock.hpp>
+#include <boost/interprocess/sync/interprocess_condition.hpp>
+#include <boost/interprocess/sync/lock_options.hpp>
#include <boost/date_time/gregorian/gregorian_types.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
@@ -180,82 +183,134 @@ void AddTimeData(const CNetAddr& ip, int64 nTime);
+/** Wrapped boost mutex: supports recursive locking, but no waiting */
+typedef boost::interprocess::interprocess_recursive_mutex CCriticalSection;
-/** Wrapper to automatically initialize mutex. */
-class CCriticalSection
-{
-protected:
- boost::interprocess::interprocess_recursive_mutex mutex;
-public:
- explicit CCriticalSection() { }
- ~CCriticalSection() { }
- void Enter(const char* pszName, const char* pszFile, int nLine);
- void Leave();
- bool TryEnter(const char* pszName, const char* pszFile, int nLine);
-};
+/** Wrapped boost mutex: supports waiting but not recursive locking */
+typedef boost::interprocess::interprocess_mutex CWaitableCriticalSection;
-/** RAII object that acquires mutex. Needed for exception safety. */
-class CCriticalBlock
-{
-protected:
- CCriticalSection* pcs;
+#ifdef DEBUG_LOCKORDER
+void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs);
+void LeaveCritical();
+#else
+void static inline EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs) {}
+void static inline LeaveCritical() {}
+#endif
+/** Wrapper around boost::interprocess::scoped_lock */
+template<typename Mutex>
+class CMutexLock
+{
+private:
+ boost::interprocess::scoped_lock<Mutex> lock;
public:
- CCriticalBlock(CCriticalSection& csIn, const char* pszName, const char* pszFile, int nLine)
+
+ void Enter(const char* pszName, const char* pszFile, int nLine)
{
- pcs = &csIn;
- pcs->Enter(pszName, pszFile, nLine);
+ if (!lock.owns())
+ {
+ EnterCritical(pszName, pszFile, nLine, (void*)(lock.mutex()));
+#ifdef DEBUG_LOCKCONTENTION
+ if (!lock.try_lock())
+ {
+ printf("LOCKCONTENTION: %s\n", pszName);
+ printf("Locker: %s:%d\n", pszFile, nLine);
+ }
+#endif
+ lock.lock();
+ }
}
- operator bool() const
+ void Leave()
{
- return true;
+ if (lock.owns())
+ {
+ lock.unlock();
+ LeaveCritical();
+ }
}
- ~CCriticalBlock()
+ bool TryEnter(const char* pszName, const char* pszFile, int nLine)
{
- pcs->Leave();
+ if (!lock.owns())
+ {
+ EnterCritical(pszName, pszFile, nLine, (void*)(lock.mutex()));
+ lock.try_lock();
+ if (!lock.owns())
+ LeaveCritical();
+ }
+ return lock.owns();
}
-};
-
-#define CRITICAL_BLOCK(cs) \
- if (CCriticalBlock criticalblock = CCriticalBlock(cs, #cs, __FILE__, __LINE__))
-
-#define ENTER_CRITICAL_SECTION(cs) \
- (cs).Enter(#cs, __FILE__, __LINE__)
-
-#define LEAVE_CRITICAL_SECTION(cs) \
- (cs).Leave()
-/** RAII object that tries to acquire mutex. Needed for exception safety. */
-class CTryCriticalBlock
-{
-protected:
- CCriticalSection* pcs;
+ CMutexLock(Mutex& mutexIn, const char* pszName, const char* pszFile, int nLine, bool fTry = false) : lock(mutexIn, boost::interprocess::defer_lock)
+ {
+ if (fTry)
+ TryEnter(pszName, pszFile, nLine);
+ else
+ Enter(pszName, pszFile, nLine);
+ }
-public:
- CTryCriticalBlock(CCriticalSection& csIn, const char* pszName, const char* pszFile, int nLine)
+ ~CMutexLock()
{
- pcs = (csIn.TryEnter(pszName, pszFile, nLine) ? &csIn : NULL);
+ if (lock.owns())
+ LeaveCritical();
}
- operator bool() const
+ operator bool()
{
- return Entered();
+ return lock.owns();
}
- ~CTryCriticalBlock()
+ boost::interprocess::scoped_lock<Mutex> &GetLock()
{
- if (pcs)
- {
- pcs->Leave();
- }
+ return lock;
}
- bool Entered() const { return pcs != NULL; }
};
+typedef CMutexLock<CCriticalSection> CCriticalBlock;
+typedef CMutexLock<CWaitableCriticalSection> CWaitableCriticalBlock;
+typedef boost::interprocess::interprocess_condition CConditionVariable;
+
+/** Wait for a given condition inside a WAITABLE_CRITICAL_BLOCK */
+#define WAIT(name,condition) \
+ do { while(!(condition)) { (name).wait(waitablecriticalblock.GetLock()); } } while(0)
+
+/** Notify waiting threads that a condition may hold now */
+#define NOTIFY(name) \
+ do { (name).notify_one(); } while(0)
+
+#define NOTIFY_ALL(name) \
+ do { (name).notify_all(); } while(0)
+
+#define CRITICAL_BLOCK(cs) \
+ for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \
+ for (CCriticalBlock criticalblock(cs, #cs, __FILE__, __LINE__); fcriticalblockonce; fcriticalblockonce=false)
+
+#define WAITABLE_CRITICAL_BLOCK(cs) \
+ for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by WAITABLE_CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \
+ for (CWaitableCriticalBlock waitablecriticalblock(cs, #cs, __FILE__, __LINE__); fcriticalblockonce; fcriticalblockonce=false)
+
+#define ENTER_CRITICAL_SECTION(cs) \
+ { \
+ EnterCritical(#cs, __FILE__, __LINE__, (void*)(&cs)); \
+ (cs).lock(); \
+ }
+
+#define LEAVE_CRITICAL_SECTION(cs) \
+ { \
+ (cs).unlock(); \
+ LeaveCritical(); \
+ }
+
#define TRY_CRITICAL_BLOCK(cs) \
- if (CTryCriticalBlock criticalblock = CTryCriticalBlock(cs, #cs, __FILE__, __LINE__))
+ for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by TRY_CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \
+ for (CCriticalBlock criticalblock(cs, #cs, __FILE__, __LINE__, true); fcriticalblockonce && (fcriticalblockonce = criticalblock); fcriticalblockonce=false)
+
+
+// This is exactly like std::string, but with a custom allocator.
+// (secure_allocator<> is defined in serialize.h)
+typedef std::basic_string<char, std::char_traits<char>, secure_allocator<char> > SecureString;
+