From d2545cbf7f7dfae4b1d01c299ed0e17d71664439 Mon Sep 17 00:00:00 2001 From: Hiram Chirino Date: Tue, 30 Oct 2012 16:56:52 -0400 Subject: [PATCH 5/6] Added a DB:SuspendCompations() and DB:ResumeCompactions() methods. Fixes issue #184 https://code.google.com/p/leveldb/issues/detail?id=184 --- db/db_impl.cc | 36 ++++++++++++++++++++++++++++++++++++ db/db_impl.h | 9 +++++++++ db/db_test.cc | 4 ++++ include/leveldb/db.h | 6 ++++++ 4 files changed, 55 insertions(+) diff --git a/db/db_impl.cc b/db/db_impl.cc index 395d317..593a5b5 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -124,6 +124,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) dbname_(dbname), db_lock_(NULL), shutting_down_(NULL), + suspend_cv(&suspend_mutex), + suspend_count(0), + suspended(false), bg_cv_(&mutex_), mem_(new MemTable(internal_comparator_)), imm_(NULL), @@ -1398,6 +1401,39 @@ void DBImpl::GetApproximateSizes( } } +void DBImpl::SuspendCompactions() { + MutexLock l(& suspend_mutex); + env_->Schedule(&SuspendWork, this); + suspend_count++; + while( !suspended ) { + suspend_cv.Wait(); + } +} +void DBImpl::SuspendWork(void* db) { + reinterpret_cast(db)->SuspendCallback(); +} +void DBImpl::SuspendCallback() { + MutexLock l(&suspend_mutex); + Log(options_.info_log, "Compactions suspended"); + suspended = true; + suspend_cv.SignalAll(); + while( suspend_count > 0 ) { + suspend_cv.Wait(); + } + suspended = false; + suspend_cv.SignalAll(); + Log(options_.info_log, "Compactions resumed"); +} +void DBImpl::ResumeCompactions() { + MutexLock l(&suspend_mutex); + suspend_count--; + suspend_cv.SignalAll(); + while( suspended ) { + suspend_cv.Wait(); + } +} + + // Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { diff --git a/db/db_impl.h b/db/db_impl.h index 3c8d711..9f3949c 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -41,6 +41,8 @@ class DBImpl : public DB { virtual bool GetProperty(const Slice& property, std::string* value); virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes); virtual void CompactRange(const Slice* begin, const Slice* end); + virtual void SuspendCompactions(); + virtual void ResumeCompactions(); // Extra methods (for testing) that are not in the public DB interface @@ -125,6 +127,13 @@ class DBImpl : public DB { // Lock over the persistent DB state. Non-NULL iff successfully acquired. FileLock* db_lock_; + port::Mutex suspend_mutex; + port::CondVar suspend_cv; + int suspend_count; + bool suspended; + static void SuspendWork(void* db); + void SuspendCallback(); + // State below is protected by mutex_ port::Mutex mutex_; port::AtomicPointer shutting_down_; diff --git a/db/db_test.cc b/db/db_test.cc index 49aae04..782be38 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1801,6 +1801,10 @@ class ModelDB: public DB { explicit ModelDB(const Options& options): options_(options) { } ~ModelDB() { } + + virtual void SuspendCompactions() {} + virtual void ResumeCompactions() {} + virtual Status Put(const WriteOptions& o, const Slice& k, const Slice& v) { return DB::Put(o, k, v); } diff --git a/include/leveldb/db.h b/include/leveldb/db.h index da8b11a..089707c 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -140,6 +140,12 @@ class DB { // db->CompactRange(NULL, NULL); virtual void CompactRange(const Slice* begin, const Slice* end) = 0; + // Suspends the background compaction thread. This methods + // returns once suspended. + virtual void SuspendCompactions() = 0; + // Resumes a suspended background compation thread. + virtual void ResumeCompactions() = 0; + private: // No copying allowed DB(const DB&); -- 1.8.3.1