aboutsummaryrefslogtreecommitdiff
path: root/src/leveldb/port/port_win.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/leveldb/port/port_win.cc')
-rw-r--r--src/leveldb/port/port_win.cc119
1 files changed, 43 insertions, 76 deletions
diff --git a/src/leveldb/port/port_win.cc b/src/leveldb/port/port_win.cc
index 786cd6018a..99c1d8e346 100644
--- a/src/leveldb/port/port_win.cc
+++ b/src/leveldb/port/port_win.cc
@@ -37,102 +37,96 @@ namespace leveldb {
namespace port {
Mutex::Mutex() :
- mutex_(::CreateMutex(NULL, FALSE, NULL)) {
- assert(mutex_);
+ cs_(NULL) {
+ assert(!cs_);
+ cs_ = static_cast<void *>(new CRITICAL_SECTION());
+ ::InitializeCriticalSection(static_cast<CRITICAL_SECTION *>(cs_));
+ assert(cs_);
}
Mutex::~Mutex() {
- assert(mutex_);
- ::CloseHandle(mutex_);
+ assert(cs_);
+ ::DeleteCriticalSection(static_cast<CRITICAL_SECTION *>(cs_));
+ delete static_cast<CRITICAL_SECTION *>(cs_);
+ cs_ = NULL;
+ assert(!cs_);
}
void Mutex::Lock() {
- assert(mutex_);
- ::WaitForSingleObject(mutex_, INFINITE);
+ assert(cs_);
+ ::EnterCriticalSection(static_cast<CRITICAL_SECTION *>(cs_));
}
void Mutex::Unlock() {
- assert(mutex_);
- ::ReleaseMutex(mutex_);
+ assert(cs_);
+ ::LeaveCriticalSection(static_cast<CRITICAL_SECTION *>(cs_));
}
void Mutex::AssertHeld() {
- assert(mutex_);
+ assert(cs_);
assert(1);
}
CondVar::CondVar(Mutex* mu) :
waiting_(0),
mu_(mu),
- sema_(::CreateSemaphore(NULL, 0, 0x7fffffff, NULL)),
- event_(::CreateEvent(NULL, FALSE, FALSE, NULL)),
- broadcasted_(false){
+ sem1_(::CreateSemaphore(NULL, 0, 10000, NULL)),
+ sem2_(::CreateSemaphore(NULL, 0, 10000, NULL)) {
assert(mu_);
}
CondVar::~CondVar() {
- ::CloseHandle(sema_);
- ::CloseHandle(event_);
+ ::CloseHandle(sem1_);
+ ::CloseHandle(sem2_);
}
void CondVar::Wait() {
+ mu_->AssertHeld();
+
wait_mtx_.Lock();
++waiting_;
- assert(waiting_ > 0);
wait_mtx_.Unlock();
- ::SignalObjectAndWait(mu_->mutex_, sema_, INFINITE, FALSE);
-
- wait_mtx_.Lock();
- bool last = broadcasted_ && (--waiting_ == 0);
- assert(waiting_ >= 0);
- wait_mtx_.Unlock();
+ mu_->Unlock();
- // we leave this function with the mutex held
- if (last)
- {
- ::SignalObjectAndWait(event_, mu_->mutex_, INFINITE, FALSE);
- }
- else
- {
- ::WaitForSingleObject(mu_->mutex_, INFINITE);
- }
+ // initiate handshake
+ ::WaitForSingleObject(sem1_, INFINITE);
+ ::ReleaseSemaphore(sem2_, 1, NULL);
+ mu_->Lock();
}
void CondVar::Signal() {
wait_mtx_.Lock();
- bool waiters = waiting_ > 0;
- wait_mtx_.Unlock();
+ if (waiting_ > 0) {
+ --waiting_;
- if (waiters)
- {
- ::ReleaseSemaphore(sema_, 1, 0);
+ // finalize handshake
+ ::ReleaseSemaphore(sem1_, 1, NULL);
+ ::WaitForSingleObject(sem2_, INFINITE);
}
+ wait_mtx_.Unlock();
}
void CondVar::SignalAll() {
wait_mtx_.Lock();
-
- broadcasted_ = (waiting_ > 0);
-
- if (broadcasted_)
- {
- // release all
- ::ReleaseSemaphore(sema_, waiting_, 0);
- wait_mtx_.Unlock();
- ::WaitForSingleObject(event_, INFINITE);
- broadcasted_ = false;
- }
- else
- {
- wait_mtx_.Unlock();
+ for(long i = 0; i < waiting_; ++i) {
+ ::ReleaseSemaphore(sem1_, 1, NULL);
+ while(waiting_ > 0) {
+ --waiting_;
+ ::WaitForSingleObject(sem2_, INFINITE);
+ }
}
+ wait_mtx_.Unlock();
}
AtomicPointer::AtomicPointer(void* v) {
Release_Store(v);
}
+void InitOnce(OnceType* once, void (*initializer)()) {
+ once->InitOnce(initializer);
+}
+
void* AtomicPointer::Acquire_Load() const {
void * p = NULL;
InterlockedExchangePointer(&p, rep_);
@@ -151,32 +145,5 @@ void AtomicPointer::NoBarrier_Store(void* v) {
rep_ = v;
}
-enum InitializationState
-{
- Uninitialized = 0,
- Running = 1,
- Initialized = 2
-};
-
-void InitOnce(OnceType* once, void (*initializer)()) {
-
- assert(Uninitialized == LEVELDB_ONCE_INIT);
-
- InitializationState state = static_cast<InitializationState>(InterlockedCompareExchange(once, Running, Uninitialized));
-
- if (state == Uninitialized) {
- initializer();
- *once = Initialized;
- }
-
- if (state == Running) {
- while(*once != Initialized) {
- Sleep(0); // yield
- }
- }
-
- assert(*once == Initialized);
-}
-
}
}