Blob Blame History Raw
From 600610444c444d446c092804b575f5f3eeb49917 Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr@06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Fri, 21 May 2010 12:49:22 +0000
Subject: [PATCH 01/13] Added a lock to protect MessageList in MessageStoreImpl and the static variables in JournalImpl; Switched all locks at this level to qpid::sys::Mutex and qpid::sys::ScopedLock for consistency.

git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3980 06e15bec-b515-0410-bef0-cc27a458cf48
---
 lib/JournalImpl.cpp      |   41 +++++++++++++++++++++++++----------------
 lib/JournalImpl.h        |   13 ++++++-------
 lib/MessageStoreImpl.cpp |   32 ++++++++++++++++++++++++--------
 lib/MessageStoreImpl.h   |    1 +
 4 files changed, 56 insertions(+), 31 deletions(-)

diff --git a/lib/JournalImpl.cpp b/lib/JournalImpl.cpp
index eafd807..ed1c334 100644
--- a/lib/JournalImpl.cpp
+++ b/lib/JournalImpl.cpp
@@ -40,12 +40,13 @@ using namespace mrg::journal;
 using qpid::management::ManagementAgent;
 namespace _qmf = qmf::com::redhat::rhm::store;
 
+qpid::sys::Mutex JournalImpl::_static_lock;
 qpid::sys::Timer* JournalImpl::journalTimerPtr = 0;
 u_int32_t JournalImpl::cnt = 0;
 
-void InactivityFireEvent::fire() { slock s(_ife_mutex); if (_parent) _parent->flushFire(); }
+void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); }
 
-void GetEventsFireEvent::fire() { slock s(_gefe_mutex); if (_parent) _parent->getEventsFire(); }
+void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); }
 
 JournalImpl::JournalImpl(const std::string& journalId,
                          const std::string& journalDirectory,
@@ -68,12 +69,15 @@ JournalImpl::JournalImpl(const std::string& journalId,
 {
     getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
     inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
-    if (journalTimerPtr == 0)
-        journalTimerPtr = new qpid::sys::Timer;
-    assert (journalTimerPtr != 0);
-    cnt++;
-    journalTimerPtr->start();
-    journalTimerPtr->add(inactivityFireEventPtr);
+    {
+        qpid::sys::Mutex::ScopedLock sl(_static_lock);
+        if (journalTimerPtr == 0)
+            journalTimerPtr = new qpid::sys::Timer;
+        assert (journalTimerPtr != 0);
+        cnt++;
+        journalTimerPtr->start();
+        journalTimerPtr->add(inactivityFireEventPtr);
+    }
 
     if (_agent != 0)
     {
@@ -112,11 +116,13 @@ JournalImpl::~JournalImpl()
     inactivityFireEventPtr->cancel();
     free_read_buffers();
 
-    // TODO: Make this if() thread-safe
-    if (journalTimerPtr && --cnt == 0)
     {
-        delete journalTimerPtr;
-        journalTimerPtr = 0;
+        qpid::sys::Mutex::ScopedLock sl(_static_lock);
+        if (journalTimerPtr && --cnt == 0)
+        {
+            delete journalTimerPtr;
+            journalTimerPtr = 0;
+        }
     }
 
     if (_mgmtObject != 0) {
@@ -503,7 +509,7 @@ JournalImpl::flush(const bool block_till_aio_cmpl)
 {
     const iores res = jcntl::flush(block_till_aio_cmpl);
     {
-        slock s(_getf_mutex);
+        qpid::sys::Mutex::ScopedLock sl(_getf_lock);
         if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); }
     }
     return res;
@@ -533,7 +539,7 @@ JournalImpl::log(mrg::journal::log_level ll, const char* const log_stmt) const
 void
 JournalImpl::getEventsFire()
 {
-    slock s(_getf_mutex);
+    qpid::sys::Mutex::ScopedLock sl(_getf_lock);
     getEventsTimerSetFlag = false;
     if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(); }
     if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
@@ -552,8 +558,11 @@ JournalImpl::flushFire()
         }
     }
     inactivityFireEventPtr->setupNextFire();
-    assert(journalTimerPtr != 0);
-    journalTimerPtr->add(inactivityFireEventPtr);
+    {
+        qpid::sys::Mutex::ScopedLock sl(_static_lock);
+        assert(journalTimerPtr != 0);
+        journalTimerPtr->add(inactivityFireEventPtr);
+    }
 }
 
 void
diff --git a/lib/JournalImpl.h b/lib/JournalImpl.h
index 2d1b869..a3f5a17 100644
--- a/lib/JournalImpl.h
+++ b/lib/JournalImpl.h
@@ -27,8 +27,6 @@
 #include <set>
 #include "jrnl/enums.hpp"
 #include "jrnl/jcntl.hpp"
-#include "jrnl/slock.hpp"
-#include "jrnl/smutex.hpp"
 #include "DataTokenImpl.h"
 #include "PreparedTransaction.h"
 #include <qpid/broker/PersistableQueue.h>
@@ -47,38 +45,39 @@ namespace mrg {
         class InactivityFireEvent : public qpid::sys::TimerTask
         {
             JournalImpl* _parent;
-            mrg::journal::smutex _ife_mutex;
+            qpid::sys::Mutex _ife_lock;
 
         public:
 	        InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
                 qpid::sys::TimerTask(timeout), _parent(p) {}
             virtual ~InactivityFireEvent() {}
             void fire();
-            inline void cancel() { mrg::journal::slock s(_ife_mutex); _parent = 0; }
+            inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
         };
 
         class GetEventsFireEvent : public qpid::sys::TimerTask
         {
             JournalImpl* _parent;
-            mrg::journal::smutex _gefe_mutex;
+            qpid::sys::Mutex _gefe_lock;
 
         public:
 	        GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
                 qpid::sys::TimerTask(timeout), _parent(p) {}
             virtual ~GetEventsFireEvent() {}
             void fire();
-            inline void cancel() { mrg::journal::slock s(_gefe_mutex); _parent = 0; }
+            inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
         };
 
         class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback
         {
         private:
+            static qpid::sys::Mutex _static_lock;
             static qpid::sys::Timer* journalTimerPtr;
             static u_int32_t cnt;
 
             bool getEventsTimerSetFlag;
             boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
-            mrg::journal::smutex _getf_mutex;
+            qpid::sys::Mutex _getf_lock;
 
             u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
             std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index 9b4bf25..ed3975d 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -362,10 +362,13 @@ void MessageStoreImpl::init()
 void MessageStoreImpl::finalize()
 {
     if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
-    for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
     {
-        JournalImpl* jQueue = i->second;
-        if (jQueue->is_ready()) jQueue->stop(true);
+        qpid::sys::Mutex::ScopedLock sl(journalListLock);
+        for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
+        {
+            JournalImpl* jQueue = i->second;
+            if (jQueue->is_ready()) jQueue->stop(true);
+        }
     }
 
     if (mgmtObject != 0) {
@@ -377,10 +380,13 @@ void MessageStoreImpl::finalize()
 void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles)
 {
     if (isInit) {
-        if (journalList.size()) { // check no queues exist
-            std::ostringstream oss;
-            oss << "truncateInit() called with " << journalList.size() << " queues still in existence";
-            THROW_STORE_EXCEPTION(oss.str());
+        {
+            qpid::sys::Mutex::ScopedLock sl(journalListLock);
+            if (journalList.size()) { // check no queues exist
+                std::ostringstream oss;
+                oss << "truncateInit() called with " << journalList.size() << " queues still in existence";
+                THROW_STORE_EXCEPTION(oss.str());
+            }
         }
         for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
             (*i)->close(0);
@@ -402,6 +408,7 @@ void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles)
 
 void MessageStoreImpl::chkTplStoreInit()
 {
+    // Don't take lock unless necessary
     if (!tplStorePtr->is_ready()) {
         qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
         if (!tplStorePtr->is_ready()) {
@@ -480,6 +487,9 @@ void MessageStoreImpl::create(PersistableQueue& queue,
         jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
                                  std::string("JournalData"), defJournalGetEventsTimeout,
                                  defJournalFlushTimeout, agent);
+    }
+    {
+        qpid::sys::Mutex::ScopedLock sl(journalListLock);
         journalList[queue.getName()]=jQueue;
     }
 
@@ -517,7 +527,10 @@ void MessageStoreImpl::destroy(PersistableQueue& queue)
         JournalImpl* jQueue = static_cast<JournalImpl*>(eqs);
         jQueue->delete_jrnl_files();
         queue.setExternalQueueStore(0); // will delete the journal if exists
-        journalList.erase(journalList.find(queue.getName()));
+        {
+            qpid::sys::Mutex::ScopedLock sl(journalListLock);
+            journalList.erase(journalList.find(queue.getName()));
+        }
     }
 }
 
@@ -759,6 +772,9 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
         {
             qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
             jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
+        }
+        {
+            qpid::sys::Mutex::ScopedLock sl(journalListLock);
             journalList[queueName] = jQueue;
         }
         queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h
index 076a0ca..136659f 100644
--- a/lib/MessageStoreImpl.h
+++ b/lib/MessageStoreImpl.h
@@ -126,6 +126,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
     boost::shared_ptr<TplJournalImpl> tplStorePtr;
     TplRecoverMap tplRecoverMap;
     JournalListMap journalList;
+    qpid::sys::Mutex journalListLock;
 
     IdSequence queueIdSequence;
     IdSequence exchangeIdSequence;
-- 
1.6.6.1

From 1392d6fdbf4625bcbcadc16ba323af467305e4d8 Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr@06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Fri, 21 May 2010 16:46:34 +0000
Subject: [PATCH 02/13] Removed redundant locks; the previous checkin installed the correct lock in JournalImpl::JournalImpl.

git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3982 06e15bec-b515-0410-bef0-cc27a458cf48
---
 lib/MessageStoreImpl.cpp |   24 +++++++-----------------
 lib/MessageStoreImpl.h   |    2 --
 2 files changed, 7 insertions(+), 19 deletions(-)

diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index ed3975d..8cedb51 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -410,12 +410,9 @@ void MessageStoreImpl::chkTplStoreInit()
 {
     // Don't take lock unless necessary
     if (!tplStorePtr->is_ready()) {
-        qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
-        if (!tplStorePtr->is_ready()) {
-            journal::jdir::create_dir(getTplBaseDir());
-            tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
-            if (mgmtObject != 0) mgmtObject->set_tplIsInitialized(true);
-        }
+        journal::jdir::create_dir(getTplBaseDir());
+        tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
+        if (mgmtObject != 0) mgmtObject->set_tplIsInitialized(true);
     }
 }
 
@@ -481,13 +478,8 @@ void MessageStoreImpl::create(PersistableQueue& queue,
         return;
     }
 
-    {
-        // TODO: Is this mutex necessary?
-        qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
-        jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
-                                 std::string("JournalData"), defJournalGetEventsTimeout,
-                                 defJournalFlushTimeout, agent);
-    }
+    jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),  std::string("JournalData"),
+                             defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
     {
         qpid::sys::Mutex::ScopedLock sl(journalListLock);
         journalList[queue.getName()]=jQueue;
@@ -769,10 +761,8 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
             QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue.");
             break;
         }
-        {
-            qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
-            jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
-        }
+        jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"),
+                                 defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
         {
             qpid::sys::Mutex::ScopedLock sl(journalListLock);
             journalList[queueName] = jQueue;
diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h
index 136659f..12e1d97 100644
--- a/lib/MessageStoreImpl.h
+++ b/lib/MessageStoreImpl.h
@@ -93,7 +93,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
     typedef std::map<std::string, TplRecover> TplRecoverMap;
     typedef TplRecoverMap::const_iterator TplRecoverMapCitr;
 
-    typedef std::pair<std::string, JournalImpl*> JournalListPair;
     typedef std::map<std::string, JournalImpl*> JournalListMap;
     typedef JournalListMap::iterator JournalListMapItr;
 
@@ -149,7 +148,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
     const char* envPath;
 
     qmf::com::redhat::rhm::store::Store* mgmtObject;
-    qpid::sys::Mutex jrnlCreateLock;
     qpid::management::ManagementAgent* agent;
 
     // Parameter validation and calculation
-- 
1.6.6.1

From e62a68ecde439e5a98576cbbae65bbc3f3005449 Mon Sep 17 00:00:00 2001
From: aconway <aconway@06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Thu, 27 May 2010 18:06:48 +0000
Subject: [PATCH 03/13] Bug 596765: Remove global shared_ptr to store in store plugin.

The global shared_ptr delays destruction of the store till after the broker is deleted causing core dumps when unregistering management objects.
https://bugzilla.redhat.com/show_bug.cgi?id=596765

git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3995 06e15bec-b515-0410-bef0-cc27a458cf48
---
 lib/MessageStoreImpl.cpp |    2 +-
 lib/StorePlugin.cpp      |    8 +++-----
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index 8cedb51..e7cb405 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -428,11 +428,11 @@ void MessageStoreImpl::open(db_ptr db,
 
 MessageStoreImpl::~MessageStoreImpl()
 {
+    finalize();
     try {
         for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
             (*i)->close(0);
         }
-//        if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
     } catch (const DbException& e) {
         QPID_LOG(error, "Error closing BDB databases: " <<  e.what());
     } catch (const journal::jexception& e) {
diff --git a/lib/StorePlugin.cpp b/lib/StorePlugin.cpp
index 1cbdbff..0fb3512 100644
--- a/lib/StorePlugin.cpp
+++ b/lib/StorePlugin.cpp
@@ -36,16 +36,15 @@ using namespace std;
 struct StorePlugin : public Plugin {
 
     mrg::msgstore::MessageStoreImpl::StoreOptions options;
-    boost::shared_ptr<qpid::broker::MessageStore> store;
 
     Options* getOptions() { return &options; }
 
     void earlyInitialize (Plugin::Target& target)
     {
         Broker* broker = dynamic_cast<Broker*>(&target);
-        store.reset(new mrg::msgstore::MessageStoreImpl ());
+        if (!broker) return;
+        boost::shared_ptr<qpid::broker::MessageStore> store(new mrg::msgstore::MessageStoreImpl ());
         DataDir& dataDir = broker->getDataDir ();
-
         if (options.storeDir.empty ())
         {
             if (!dataDir.isEnabled ())
@@ -67,8 +66,7 @@ struct StorePlugin : public Plugin {
 
     void finalize()
     {
-        MessageStore* sp = store.get();
-        static_cast<mrg::msgstore::MessageStoreImpl*>(sp)->finalize();
+        // This function intentionally left blank
     }
 
     const char* id() {return "StorePlugin";}
-- 
1.6.6.1

From 91dcd499e54fd82b0a808f705b18c2b6c5775bd0 Mon Sep 17 00:00:00 2001
From: aconway <aconway@06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Mon, 31 May 2010 14:08:28 +0000
Subject: [PATCH 04/13] Skip cluster_tests.ShortTests.test_sasl as it depends on a SASL database not available in the store build environment.

git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3996 06e15bec-b515-0410-bef0-cc27a458cf48
---
 tests/cluster/run_python_cluster_tests |    7 ++++++-
 1 files changed, 6 insertions(+), 1 deletions(-)

diff --git a/tests/cluster/run_python_cluster_tests b/tests/cluster/run_python_cluster_tests
index 4bd2126..ce96152 100755
--- a/tests/cluster/run_python_cluster_tests
+++ b/tests/cluster/run_python_cluster_tests
@@ -28,8 +28,13 @@ func_check_qpid_python || exit 0           # A warning, not a failure.
 echo "Running Python cluster tests..."
 OUTDIR=brokertest.tmp
 rm -rf $OUTDIR
-# Ignore tests requiring a store by default. 
+
+# Ignore tests known to fail.
 CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:-"-I ${CLUSTER_TESTS_FAIL}"}
+# Ignore tests that don't work in the store environment
+# SASL test needs sasl test database which is not installed.
+CLUSTER_TESTS_IGNORE="${CLUSTER_TESTS_IGNORE} -i cluster_tests.ShortTests.test_sasl"
+
 CLUSTER_TESTS=${CLUSTER_TESTS:-$*}
 
 TEST_CMD="${QPID_PYTHON_TEST} -m cluster_tests ${CLUSTER_TESTS_IGNORE} ${CLUSTER_TESTS} -DOUTDIR=$OUTDIR"
-- 
1.6.6.1

From d4795a9796726dbdb1f911c81e3b2e899fbfc40e Mon Sep 17 00:00:00 2001
From: aconway <aconway@06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Mon, 31 May 2010 19:31:45 +0000
Subject: [PATCH 05/13] Fix valgrind errors caused by order of destruction issue.

Added a callback so that MessageStoreImpl is informed when JournalImpl
instances are deleted and can remove them from its map.

git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3997 06e15bec-b515-0410-bef0-cc27a458cf48
---
 lib/JournalImpl.cpp      |    7 +-
 lib/JournalImpl.h        |  467 +++++++++++++++++++++++-----------------------
 lib/MessageStoreImpl.cpp |   17 ++-
 lib/MessageStoreImpl.h   |    5 +
 4 files changed, 261 insertions(+), 235 deletions(-)

diff --git a/lib/JournalImpl.cpp b/lib/JournalImpl.cpp
index ed1c334..a660d3c 100644
--- a/lib/JournalImpl.cpp
+++ b/lib/JournalImpl.cpp
@@ -53,7 +53,8 @@ JournalImpl::JournalImpl(const std::string& journalId,
                          const std::string& journalBaseFilename,
                          const qpid::sys::Duration getEventsTimeout,
                          const qpid::sys::Duration flushTimeout,
-                         qpid::management::ManagementAgent* a):
+                         qpid::management::ManagementAgent* a,
+                         DeleteCallback onDelete):
                          jcntl(journalId, journalDirectory, journalBaseFilename),
                          getEventsTimerSetFlag(false),
                          lastReadRid(0),
@@ -65,7 +66,8 @@ JournalImpl::JournalImpl(const std::string& journalId,
                          _dtok(),
                          _external(false),
                          _agent(a),
-                         _mgmtObject(0)
+                         _mgmtObject(0),
+                         deleteCallback(onDelete)
 {
     getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
     inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
@@ -108,6 +110,7 @@ JournalImpl::JournalImpl(const std::string& journalId,
 
 JournalImpl::~JournalImpl()
 {
+    if (deleteCallback) deleteCallback(*this);
     if (_init_flag && !_stop_flag){
     	try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete!
         catch (const jexception& e) { log(LOG_ERROR, e.what()); }
diff --git a/lib/JournalImpl.h b/lib/JournalImpl.h
index a3f5a17..aab8467 100644
--- a/lib/JournalImpl.h
+++ b/lib/JournalImpl.h
@@ -1,25 +1,25 @@
 /*
- Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+  Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
 
- This file is part of the Qpid async store library msgstore.so.
+  This file is part of the Qpid async store library msgstore.so.
 
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
 
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- Lesser General Public License for more details.
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
 
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
- USA
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+  USA
 
- The GNU Lesser General Public License is available in the file COPYING.
- */
+  The GNU Lesser General Public License is available in the file COPYING.
+*/
 
 #ifndef _JournalImpl_
 #define _JournalImpl_
@@ -38,219 +38,228 @@
 #include "qmf/com/redhat/rhm/store/Journal.h"
 
 namespace mrg {
-    namespace msgstore {
-
-        class JournalImpl;
-
-        class InactivityFireEvent : public qpid::sys::TimerTask
-        {
-            JournalImpl* _parent;
-            qpid::sys::Mutex _ife_lock;
-
-        public:
-	        InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
-                qpid::sys::TimerTask(timeout), _parent(p) {}
-            virtual ~InactivityFireEvent() {}
-            void fire();
-            inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
-        };
-
-        class GetEventsFireEvent : public qpid::sys::TimerTask
-        {
-            JournalImpl* _parent;
-            qpid::sys::Mutex _gefe_lock;
-
-        public:
-	        GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
-                qpid::sys::TimerTask(timeout), _parent(p) {}
-            virtual ~GetEventsFireEvent() {}
-            void fire();
-            inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
-        };
-
-        class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback
-        {
-        private:
-            static qpid::sys::Mutex _static_lock;
-            static qpid::sys::Timer* journalTimerPtr;
-            static u_int32_t cnt;
-
-            bool getEventsTimerSetFlag;
-            boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
-            qpid::sys::Mutex _getf_lock;
-
-            u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
-            std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
-
-            bool writeActivityFlag;
-            bool flushTriggeredFlag;
-            boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr;
-
-            // temp local vars for loadMsgContent below
-            void* _xidp;
-            void* _datap;
-            size_t _dlen;
-            mrg::journal::data_tok _dtok;
-            bool _external;
-
-            qpid::management::ManagementAgent* _agent;
-            qmf::com::redhat::rhm::store::Journal* _mgmtObject;
-
-        public:
-            JournalImpl(const std::string& journalId,
-                        const std::string& journalDirectory,
-                        const std::string& journalBaseFilename,
-                        const qpid::sys::Duration getEventsTimeout,
-                        const qpid::sys::Duration flushTimeout,
-                        qpid::management::ManagementAgent* agent);
-
-            virtual ~JournalImpl();
-
-            void initialize(const u_int16_t num_jfiles,
-                            const bool auto_expand,
-                            const u_int16_t ae_max_jfiles,
-                            const u_int32_t jfsize_sblks,
-                            const u_int16_t wcache_num_pages,
-                            const u_int32_t wcache_pgsize_sblks,
-                            mrg::journal::aio_callback* const cbp);
-
-            inline void initialize(const u_int16_t num_jfiles,
-                                   const bool auto_expand,
-                                   const u_int16_t ae_max_jfiles,
-                                   const u_int32_t jfsize_sblks,
-                                   const u_int16_t wcache_num_pages,
-                                   const u_int32_t wcache_pgsize_sblks) {
-                initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
-                        this);
-            }
-
-            void recover(const u_int16_t num_jfiles,
-                         const bool auto_expand,
-                         const u_int16_t ae_max_jfiles,
-                         const u_int32_t jfsize_sblks,
-                         const u_int16_t wcache_num_pages,
-                         const u_int32_t wcache_pgsize_sblks,
-                         mrg::journal::aio_callback* const cbp,
-                         boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
-                         u_int64_t& highest_rid,
-                         u_int64_t queue_id);
-
-            inline void recover(const u_int16_t num_jfiles,
-                                const bool auto_expand,
-                                const u_int16_t ae_max_jfiles,
-                                const u_int32_t jfsize_sblks,
-                                const u_int16_t wcache_num_pages,
-                                const u_int32_t wcache_pgsize_sblks,
-                                boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
-                                u_int64_t& highest_rid,
-                                u_int64_t queue_id) {
-                recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
-                        this, prep_tx_list_ptr, highest_rid, queue_id);
-            }
-
-            void recover_complete();
-
-            // Temporary fn to read and save last msg read from journal so it can be assigned
-            // in chunks. To be replaced when coding to do this direct from the journal is ready.
-            // Returns true if the record is extern, false if local.
-            bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0);
-
-            // Overrides for write inactivity timer
-            void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
-                    const size_t this_data_len, mrg::journal::data_tok* dtokp,
-                    const bool transient = false);
-
-            void enqueue_extern_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
-                    const bool transient = false);
-
-            void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
-                    const size_t this_data_len, mrg::journal::data_tok* dtokp, const std::string& xid,
-                    const bool transient = false);
-
-            void enqueue_extern_txn_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
-                    const std::string& xid, const bool transient = false);
-
-            void dequeue_data_record(mrg::journal::data_tok* const dtokp, const bool txn_coml_commit = false);
-
-            void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
-
-            mrg::journal::iores read_data_record(void** const data_buff, size_t& tot_data_len, void** const xid_buff,
-                    size_t& xid_len, bool& transient, bool& external, mrg::journal::data_tok* const dtokp,
-                    bool ignore_pending_txns = false);
-
-            void txn_abort(mrg::journal::data_tok* const dtokp, const std::string& xid);
-
-            void txn_commit(mrg::journal::data_tok* const dtokp, const std::string& xid);
-
-            void stop(bool block_till_aio_cmpl = false);
-
-            // Logging
-            void log(mrg::journal::log_level level, const std::string& log_stmt) const;
-            void log(mrg::journal::log_level level, const char* const log_stmt) const;
-
-            // Overrides for get_events timer
-            mrg::journal::iores flush(const bool block_till_aio_cmpl = false);
-
-            // TimerTask callback
-            void getEventsFire();
-            void flushFire();
-
-            // AIO callbacks
-            virtual void wr_aio_cb(std::vector<mrg::journal::data_tok*>& dtokl);
-            virtual void rd_aio_cb(std::vector<u_int16_t>& pil);
-
-            qpid::management::ManagementObject* GetManagementObject (void) const
-                { return _mgmtObject; }
-
-            qpid::management::Manageable::status_t ManagementMethod (uint32_t,
-                                                                     qpid::management::Args&,
-                                                                     std::string&);
-
-        private:
-            void free_read_buffers();
-
-            inline void setGetEventTimer()
-            {
-                assert(journalTimerPtr != 0);
-                getEventsFireEventsPtr->setupNextFire();
-                journalTimerPtr->add(getEventsFireEventsPtr);
-                getEventsTimerSetFlag = true;
-            }
-            void handleIoResult(const mrg::journal::iores r);
-
-            // Management instrumentation callbacks overridden from jcntl
-            inline void instr_incr_outstanding_aio_cnt() {
-                if (_mgmtObject != 0) _mgmtObject->inc_outstandingAIOs();
-            }
-            inline void instr_decr_outstanding_aio_cnt() {
-                if (_mgmtObject != 0) _mgmtObject->dec_outstandingAIOs();
-            }
-        }; // class JournalImpl
-
-        class TplJournalImpl : public JournalImpl
-        {
-        public:
-            TplJournalImpl(const std::string& journalId,
-                           const std::string& journalDirectory,
-                           const std::string& journalBaseFilename,
-                           const qpid::sys::Duration getEventsTimeout,
-                           const qpid::sys::Duration flushTimeout,
-                           qpid::management::ManagementAgent* agent) :
-                JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent)
-            {}
-
-            ~TplJournalImpl() {}
-
-            // Special version of read_data_record that ignores transactions - needed when reading the TPL
-            inline mrg::journal::iores read_data_record(void** const datapp, std::size_t& dsize,
-                    void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
-                    mrg::journal::data_tok* const dtokp) {
-                return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
-            }
-            inline void read_reset() { _rmgr.invalidate(); }
-        }; // class TplJournalImpl
-
-    } // namespace msgstore
+namespace msgstore {
+
+class JournalImpl;
+
+class InactivityFireEvent : public qpid::sys::TimerTask
+{
+    JournalImpl* _parent;
+    qpid::sys::Mutex _ife_lock;
+
+  public:
+    InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
+        qpid::sys::TimerTask(timeout), _parent(p) {}
+    virtual ~InactivityFireEvent() {}
+    void fire();
+    inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
+};
+
+class GetEventsFireEvent : public qpid::sys::TimerTask
+{
+    JournalImpl* _parent;
+    qpid::sys::Mutex _gefe_lock;
+
+  public:
+    GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
+        qpid::sys::TimerTask(timeout), _parent(p) {}
+    virtual ~GetEventsFireEvent() {}
+    void fire();
+    inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
+};
+
+class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback
+{
+  public:
+    typedef boost::function<void (JournalImpl&)> DeleteCallback;
+    
+  private:
+    static qpid::sys::Mutex _static_lock;
+    static qpid::sys::Timer* journalTimerPtr;
+    static u_int32_t cnt;
+
+    bool getEventsTimerSetFlag;
+    boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
+    qpid::sys::Mutex _getf_lock;
+
+    u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
+    std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
+
+    bool writeActivityFlag;
+    bool flushTriggeredFlag;
+    boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr;
+
+    // temp local vars for loadMsgContent below
+    void* _xidp;
+    void* _datap;
+    size_t _dlen;
+    mrg::journal::data_tok _dtok;
+    bool _external;
+
+    qpid::management::ManagementAgent* _agent;
+    qmf::com::redhat::rhm::store::Journal* _mgmtObject;
+    DeleteCallback deleteCallback;
+    
+  public:
+    
+    JournalImpl(const std::string& journalId,
+                const std::string& journalDirectory,
+                const std::string& journalBaseFilename,
+                const qpid::sys::Duration getEventsTimeout,
+                const qpid::sys::Duration flushTimeout,
+                qpid::management::ManagementAgent* agent,
+                DeleteCallback deleteCallback=DeleteCallback() );
+
+    virtual ~JournalImpl();
+
+    void initialize(const u_int16_t num_jfiles,
+                    const bool auto_expand,
+                    const u_int16_t ae_max_jfiles,
+                    const u_int32_t jfsize_sblks,
+                    const u_int16_t wcache_num_pages,
+                    const u_int32_t wcache_pgsize_sblks,
+                    mrg::journal::aio_callback* const cbp);
+
+    inline void initialize(const u_int16_t num_jfiles,
+                           const bool auto_expand,
+                           const u_int16_t ae_max_jfiles,
+                           const u_int32_t jfsize_sblks,
+                           const u_int16_t wcache_num_pages,
+                           const u_int32_t wcache_pgsize_sblks) {
+        initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+                   this);
+    }
+
+    void recover(const u_int16_t num_jfiles,
+                 const bool auto_expand,
+                 const u_int16_t ae_max_jfiles,
+                 const u_int32_t jfsize_sblks,
+                 const u_int16_t wcache_num_pages,
+                 const u_int32_t wcache_pgsize_sblks,
+                 mrg::journal::aio_callback* const cbp,
+                 boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
+                 u_int64_t& highest_rid,
+                 u_int64_t queue_id);
+
+    inline void recover(const u_int16_t num_jfiles,
+                        const bool auto_expand,
+                        const u_int16_t ae_max_jfiles,
+                        const u_int32_t jfsize_sblks,
+                        const u_int16_t wcache_num_pages,
+                        const u_int32_t wcache_pgsize_sblks,
+                        boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
+                        u_int64_t& highest_rid,
+                        u_int64_t queue_id) {
+        recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+                this, prep_tx_list_ptr, highest_rid, queue_id);
+    }
+
+    void recover_complete();
+
+    // Temporary fn to read and save last msg read from journal so it can be assigned
+    // in chunks. To be replaced when coding to do this direct from the journal is ready.
+    // Returns true if the record is extern, false if local.
+    bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0);
+
+    // Overrides for write inactivity timer
+    void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
+                             const size_t this_data_len, mrg::journal::data_tok* dtokp,
+                             const bool transient = false);
+
+    void enqueue_extern_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
+                                    const bool transient = false);
+
+    void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
+                                 const size_t this_data_len, mrg::journal::data_tok* dtokp, const std::string& xid,
+                                 const bool transient = false);
+
+    void enqueue_extern_txn_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
+                                        const std::string& xid, const bool transient = false);
+
+    void dequeue_data_record(mrg::journal::data_tok* const dtokp, const bool txn_coml_commit = false);
+
+    void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
+
+    mrg::journal::iores read_data_record(void** const data_buff, size_t& tot_data_len, void** const xid_buff,
+                                         size_t& xid_len, bool& transient, bool& external, mrg::journal::data_tok* const dtokp,
+                                         bool ignore_pending_txns = false);
+
+    void txn_abort(mrg::journal::data_tok* const dtokp, const std::string& xid);
+
+    void txn_commit(mrg::journal::data_tok* const dtokp, const std::string& xid);
+
+    void stop(bool block_till_aio_cmpl = false);
+
+    // Logging
+    void log(mrg::journal::log_level level, const std::string& log_stmt) const;
+    void log(mrg::journal::log_level level, const char* const log_stmt) const;
+
+    // Overrides for get_events timer
+    mrg::journal::iores flush(const bool block_till_aio_cmpl = false);
+
+    // TimerTask callback
+    void getEventsFire();
+    void flushFire();
+
+    // AIO callbacks
+    virtual void wr_aio_cb(std::vector<mrg::journal::data_tok*>& dtokl);
+    virtual void rd_aio_cb(std::vector<u_int16_t>& pil);
+
+    qpid::management::ManagementObject* GetManagementObject (void) const
+    { return _mgmtObject; }
+
+    qpid::management::Manageable::status_t ManagementMethod (uint32_t,
+                                                             qpid::management::Args&,
+                                                             std::string&);
+
+    void resetDeleteCallback() { deleteCallback = DeleteCallback(); }
+
+  private:
+    void free_read_buffers();
+
+    inline void setGetEventTimer()
+    {
+        assert(journalTimerPtr != 0);
+        getEventsFireEventsPtr->setupNextFire();
+        journalTimerPtr->add(getEventsFireEventsPtr);
+        getEventsTimerSetFlag = true;
+    }
+    void handleIoResult(const mrg::journal::iores r);
+
+    // Management instrumentation callbacks overridden from jcntl
+    inline void instr_incr_outstanding_aio_cnt() {
+        if (_mgmtObject != 0) _mgmtObject->inc_outstandingAIOs();
+    }
+    inline void instr_decr_outstanding_aio_cnt() {
+        if (_mgmtObject != 0) _mgmtObject->dec_outstandingAIOs();
+    }
+
+}; // class JournalImpl
+
+class TplJournalImpl : public JournalImpl
+{
+  public:
+    TplJournalImpl(const std::string& journalId,
+                   const std::string& journalDirectory,
+                   const std::string& journalBaseFilename,
+                   const qpid::sys::Duration getEventsTimeout,
+                   const qpid::sys::Duration flushTimeout,
+                   qpid::management::ManagementAgent* agent) :
+        JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent)
+    {}
+
+    ~TplJournalImpl() {}
+
+    // Special version of read_data_record that ignores transactions - needed when reading the TPL
+    inline mrg::journal::iores read_data_record(void** const datapp, std::size_t& dsize,
+                                                void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
+                                                mrg::journal::data_tok* const dtokp) {
+        return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
+    }
+    inline void read_reset() { _rmgr.invalidate(); }
+}; // class TplJournalImpl
+
+} // namespace msgstore
 } // namespace mrg
 
 #endif
diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index e7cb405..04297e8 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -367,6 +367,7 @@ void MessageStoreImpl::finalize()
         for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
         {
             JournalImpl* jQueue = i->second;
+            jQueue->resetDeleteCallback();
             if (jQueue->is_ready()) jQueue->stop(true);
         }
     }
@@ -479,7 +480,8 @@ void MessageStoreImpl::create(PersistableQueue& queue,
     }
 
     jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),  std::string("JournalData"),
-                             defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
+                             defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
+                             boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
     {
         qpid::sys::Mutex::ScopedLock sl(journalListLock);
         journalList[queue.getName()]=jQueue;
@@ -521,7 +523,7 @@ void MessageStoreImpl::destroy(PersistableQueue& queue)
         queue.setExternalQueueStore(0); // will delete the journal if exists
         {
             qpid::sys::Mutex::ScopedLock sl(journalListLock);
-            journalList.erase(journalList.find(queue.getName()));
+            journalList.erase(queue.getName());
         }
     }
 }
@@ -762,7 +764,8 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
             break;
         }
         jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"),
-                                 defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
+                                 defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
+                                 boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
         {
             qpid::sys::Mutex::ScopedLock sl(journalListLock);
             journalList[queueName] = jQueue;
@@ -1644,6 +1647,11 @@ std::string MessageStoreImpl::getJrnlHashDir(const std::string& queueName) //for
 
 std::string MessageStoreImpl::getStoreDir() const { return storeDir; }
 
+void MessageStoreImpl::journalDeleted(JournalImpl& j) {
+    qpid::sys::Mutex::ScopedLock sl(journalListLock);
+    journalList.erase(j.id());
+}
+
 MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
                                              qpid::Options(name),
                                              numJrnlFiles(defNumJrnlFiles),
@@ -1668,7 +1676,7 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
 //                 "If no|false|0, the number of journal files will remain fixed (num-jfiles).")
 //         ("max-auto-expand-jfiles", qpid::optValue(autoJrnlExpandMaxFiles, "N"),
 //                 "Maximum number of journal files allowed from auto-expanding; must be greater than --num-jfiles parameter.")
-        ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
+         ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
                 "Default size for each journal file in multiples of read pages (1 read page = 64kiB)")
         ("truncate", qpid::optValue(truncateFlag, "yes|no"),
                 "If yes|true|1, will truncate the store (discard any existing records). If no|false|0, will preserve "
@@ -1687,3 +1695,4 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
                 "Lower values decrease latency at the expense of throughput.")
         ;
 }
+
diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h
index 12e1d97..d650020 100644
--- a/lib/MessageStoreImpl.h
+++ b/lib/MessageStoreImpl.h
@@ -149,6 +149,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
 
     qmf::com::redhat::rhm::store::Store* mgmtObject;
     qpid::management::ManagementAgent* agent;
+    
 
     // Parameter validation and calculation
     static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,
@@ -359,6 +360,10 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
         { return qpid::management::Manageable::STATUS_OK; }
 
     std::string getStoreDir() const;
+
+  private:
+    void journalDeleted(JournalImpl&);
+
 }; // class MessageStoreImpl
 
 } // namespace msgstore
-- 
1.6.6.1

From 821aca95eeb79c7c83bc6d47176d188d8a8a1ba1 Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr@06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Tue, 1 Jun 2010 16:02:36 +0000
Subject: [PATCH 06/13] Fix for Bug 598557: "qpidd --no-data dir with store loaded segfaults".

git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3998 06e15bec-b515-0410-bef0-cc27a458cf48
---
 lib/MessageStoreImpl.cpp |    2 +-
 1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index 04297e8..e3b2599 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -361,7 +361,7 @@ void MessageStoreImpl::init()
 
 void MessageStoreImpl::finalize()
 {
-    if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
+    if (tplStorePtr.get() && tplStorePtr->is_ready()) tplStorePtr->stop(true);
     {
         qpid::sys::Mutex::ScopedLock sl(journalListLock);
         for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
-- 
1.6.6.1

From 3e2438975882993c66f4f8b9fb40dd37da3fe95e Mon Sep 17 00:00:00 2001
From: gordonsim <gordonsim@06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Wed, 2 Jun 2010 19:33:45 +0000
Subject: [PATCH 07/13] Set reliability for link to prevent auto-delete being set on subscription queues (broker now cleans these up even for connections that are open when broker is shutdown)

git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4005 06e15bec-b515-0410-bef0-cc27a458cf48
---
 tests/python_tests/client_persistence.py |    6 +++---
 tests/python_tests/store_test.py         |    4 +++-
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/tests/python_tests/client_persistence.py b/tests/python_tests/client_persistence.py
index f16e548..dc197dc 100644
--- a/tests/python_tests/client_persistence.py
+++ b/tests/python_tests/client_persistence.py
@@ -103,9 +103,9 @@ class ExchangeQueueTests(StoreTest):
         broker = self.broker(store_args(), name="testFanout", expect=EXPECT_EXIT_OK)
         ssn = broker.connect().session()
         snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: topic, x-declare: {type: fanout}}}")
-        ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True}}")
-        ssn.receiver("TestFanoutExchange; {link: {name: \"q2\", durable: True}}")
-        ssn.receiver("TestFanoutExchange; {link: {name: \"q3\", durable: True}}")
+        ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True, reliability:at-least-once}}")
+        ssn.receiver("TestFanoutExchange; {link: {name: \"q2\", durable: True, reliability:at-least-once}}")
+        ssn.receiver("TestFanoutExchange; {link: {name: \"q3\", durable: True, reliability:at-least-once}}")
         msg1 = Message("Msg1", durable=True, correlation_id="Msg0001")
         snd.send(msg1)
         msg2 = Message("Msg2", durable=True, correlation_id="Msg0002")
diff --git a/tests/python_tests/store_test.py b/tests/python_tests/store_test.py
index 61d3687..87dcefa 100644
--- a/tests/python_tests/store_test.py
+++ b/tests/python_tests/store_test.py
@@ -301,9 +301,11 @@ class StoreTest(BrokerTest):
         x_bindings_list = []
         for binding in binding_list:
             x_bindings_list.append("{exchange: %s, key: %s}" % binding)
+        if durable: reliability = 'at-least-once'
+        else: reliability = None
         return self.addr_fmt(node_name, create_policy=create_policy, delete_policy=delete_policy, mode=mode, link=True,
                              link_name=link_name, durable=durable, x_declare_list=x_declare_list,
-                             x_bindings_list=x_bindings_list)
+                             x_bindings_list=x_bindings_list, link_reliability=reliability)
     
     def check_message(self, broker, queue, exp_msg, transactional=False, empty=False, ack=True, browse=False):
         """Check that a message is on a queue by dequeuing it and comparing it to the expected message"""
-- 
1.6.6.1

From 22c33fd72f2e8e32a6537b1891934834dc7f2d95 Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr@06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Fri, 4 Jun 2010 17:37:23 +0000
Subject: [PATCH 08/13] Fixes for various Coverity-indicated problems: 11689(MessageStoreImpl.cpp), 11691(jdir.cpp) and 11688(JournalImpl.cpp).

git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4008 06e15bec-b515-0410-bef0-cc27a458cf48
---
 lib/JournalImpl.cpp      |    4 +++-
 lib/MessageStoreImpl.cpp |    7 +++----
 lib/jrnl/jdir.cpp        |    1 +
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/lib/JournalImpl.cpp b/lib/JournalImpl.cpp
index a660d3c..5e1ed7a 100644
--- a/lib/JournalImpl.cpp
+++ b/lib/JournalImpl.cpp
@@ -498,7 +498,9 @@ JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid)
 void
 JournalImpl::stop(bool block_till_aio_cmpl)
 {
-    (dynamic_cast<InactivityFireEvent*>(inactivityFireEventPtr.get()))->cancel();
+    InactivityFireEvent* ifep = dynamic_cast<InactivityFireEvent*>(inactivityFireEventPtr.get());
+    assert(ifep); // dynamic_cast can return null if the cast fails
+    ifep->cancel();
     jcntl::stop(block_till_aio_cmpl);
 
     if (_mgmtObject != 0) {
diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index e3b2599..2262b0d 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -193,8 +193,7 @@ void MessageStoreImpl::chkJrnlAutoExpandOptions(const StoreOptions* opts,
                 << JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value.");
         return;
     }
-    u_int16_t q = opts->autoJrnlExpandMaxFiles;
-    if (q && q == defAutoJrnlExpandMaxFiles && numJrnlFiles != defTplNumJrnlFiles) {
+    if (p && p == defAutoJrnlExpandMaxFiles && numJrnlFiles != defTplNumJrnlFiles) {
         // num-jfiles is different from the default AND max-auto-expand-jfiles is still at default
         // change value of max-auto-expand-jfiles
         autoJrnlExpand = true;
@@ -1327,10 +1326,10 @@ void MessageStoreImpl::store(const PersistableQueue* queue,
                 }
             }
         } else {
-            THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: queue NULL.");
+            THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL."));
        }
     } catch (const journal::jexception& e) {
-        THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: " +
+        THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": MessageStoreImpl::store() failed: " +
                               e.what());
     }
 }
diff --git a/lib/jrnl/jdir.cpp b/lib/jrnl/jdir.cpp
index 74651bd..d26cef0 100644
--- a/lib/jrnl/jdir.cpp
+++ b/lib/jrnl/jdir.cpp
@@ -202,6 +202,7 @@ jdir::push_down(const std::string& dirname, const std::string& target_dir, const
             break;
         }
     }
+    close_dir(dir, dirname, "push_down");
     return bak_dir_name;
 }
 
-- 
1.6.6.1

From fbd21b64d3caa4c6a1937dcc560973de1c173b2d Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr@06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Fri, 4 Jun 2010 18:43:28 +0000
Subject: [PATCH 09/13] Further tidy-up: closing directory handles in exception paths

git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4009 06e15bec-b515-0410-bef0-cc27a458cf48
---
 lib/jrnl/jdir.cpp |    5 +++++
 1 files changed, 5 insertions(+), 0 deletions(-)

diff --git a/lib/jrnl/jdir.cpp b/lib/jrnl/jdir.cpp
index d26cef0..b718f74 100644
--- a/lib/jrnl/jdir.cpp
+++ b/lib/jrnl/jdir.cpp
@@ -152,6 +152,7 @@ jdir::clear_dir(const std::string& dirname, const std::string&
                     newname << bak_dir << "/" << entry->d_name;
                     if (::rename(oldname.str().c_str(), newname.str().c_str()))
                     {
+                        ::closedir(dir);
                         std::ostringstream oss;
                         oss << "file=\"" << oldname.str() << "\" dest=\"" <<
                                 newname.str() << "\"" << FORMAT_SYSERR(errno);
@@ -195,6 +196,7 @@ jdir::push_down(const std::string& dirname, const std::string& target_dir, const
             newname << bak_dir_name << "/" << target_dir;
             if (::rename(oldname.str().c_str(), newname.str().c_str()))
             {
+                ::closedir(dir);
                 std::ostringstream oss;
                 oss << "file=\"" << oldname.str() << "\" dest=\"" <<  newname.str() << "\"" << FORMAT_SYSERR(errno);
                 throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "jdir", "push_down");
@@ -284,6 +286,7 @@ jdir::delete_dir(const std::string& dirname, bool children_only)
                 std::string full_name(dirname + "/" + entry->d_name);
                 if (::stat(full_name.c_str(), &s))
                 {
+                    ::closedir(dir);
                     std::ostringstream oss;
                     oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno);
                     throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir");
@@ -294,6 +297,7 @@ jdir::delete_dir(const std::string& dirname, bool children_only)
                 {
                     if(::unlink(full_name.c_str()))
                     {
+                        ::closedir(dir);
                         std::ostringstream oss;
                         oss << "unlink: file=\"" << entry->d_name << "\"" << FORMAT_SYSERR(errno);
                         throw jexception(jerrno::JERR_JDIR_UNLINK, oss.str(), "jdir", "delete_dir");
@@ -305,6 +309,7 @@ jdir::delete_dir(const std::string& dirname, bool children_only)
                 }
                 else // all other types, throw up!
                 {
+                    ::closedir(dir);
                     std::ostringstream oss;
                     oss << "file=\"" << entry->d_name << "\" is not a dir, file or slink.";
                     oss << " (mode=0x" << std::hex << s.st_mode << std::dec << ")";
-- 
1.6.6.1

From 7d09142721e18ff5769b9c35c446eab723793d44 Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr@06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Tue, 8 Jun 2010 19:11:00 +0000
Subject: [PATCH 10/13] Fix for a recent regression in r.3982 in which a lock wich protects the TPL from being initialized by multiple threads was erroneously removed. The lock is now replaced.

git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4017 06e15bec-b515-0410-bef0-cc27a458cf48
---
 lib/MessageStoreImpl.cpp |    3 ++-
 lib/MessageStoreImpl.h   |    1 +
 2 files changed, 3 insertions(+), 1 deletions(-)

diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index 2262b0d..5f98055 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -408,7 +408,8 @@ void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles)
 
 void MessageStoreImpl::chkTplStoreInit()
 {
-    // Don't take lock unless necessary
+    // Prevent multiple threads from late-initializing the TPL
+    qpid::sys::Mutex::ScopedLock sl(tplInitLock);
     if (!tplStorePtr->is_ready()) {
         journal::jdir::create_dir(getTplBaseDir());
         tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h
index d650020..2659f32 100644
--- a/lib/MessageStoreImpl.h
+++ b/lib/MessageStoreImpl.h
@@ -124,6 +124,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
     // Pointer to Transaction Prepared List (TPL) journal instance
     boost::shared_ptr<TplJournalImpl> tplStorePtr;
     TplRecoverMap tplRecoverMap;
+    qpid::sys::Mutex tplInitLock;
     JournalListMap journalList;
     qpid::sys::Mutex journalListLock;
 
-- 
1.6.6.1

From 1bb317d8e88c910b5247b54a9530a5505fb67168 Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr@06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Thu, 17 Jun 2010 18:58:04 +0000
Subject: [PATCH 11/13] Added variable MSGSTORE_VERSION_INFO to control msgstore.so.x.x.x lib version numbers

git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4034 06e15bec-b515-0410-bef0-cc27a458cf48
---
 configure.ac    |    9 +--------
 lib/Makefile.am |    6 ++++--
 2 files changed, 5 insertions(+), 10 deletions(-)

diff --git a/configure.ac b/configure.ac
index 9a32097..3c014d9 100644
--- a/configure.ac
+++ b/configure.ac
@@ -21,7 +21,7 @@ dnl The GNU Lesser General Public License is available in the file COPYING.
 dnl 
 dnl Process this file with autoconf to produce a configure script.
 
-AC_INIT([msg-store], [0.6], [rhemrg-users-list@redhat.com])
+AC_INIT([msg-store], [0.7], [rhemrg-users-list@redhat.com])
 AC_CONFIG_AUX_DIR([build-aux])
 AM_INIT_AUTOMAKE([dist-bzip2])
 
@@ -201,13 +201,6 @@ if test x$DB_CXX_HEADER_PREFIX = x; then
 fi
 AC_SUBST(DB_CXX_HEADER_PREFIX)
 
-# Set the argument to be used in "libtool -version-info ARG".
-QPID_CURRENT=1
-QPID_REVISION=0
-QPID_AGE=1
-LIBTOOL_VERSION_INFO_ARG=$QPID_CURRENT:$QPID_REVISION:$QPID_AGE
-AC_SUBST(LIBTOOL_VERSION_INFO_ARG)
-
 gl_CLOCK_TIME
 
 # We use valgrind for the tests.  See if it's available.
diff --git a/lib/Makefile.am b/lib/Makefile.am
index ab72d96..8f0301b 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -34,8 +34,10 @@ msgstore_la_LIBADD =         \
   $(LIB_CLOCK_GETTIME)          \
   $(QPID_LIBS)
 
-msgstore_la_LDFLAGS =        \
-  $(PLUGINLDFLAGS)
+MSGSTORE_VERSION_INFO = 1:0:0
+msgstore_la_LDFLAGS =            \
+  $(PLUGINLDFLAGS)               \
+  -version-info $(MSGSTORE_VERSION_INFO)
 
 msgstore_la_SOURCES =        \
   StorePlugin.cpp               \
-- 
1.6.6.1

From 01305c0b44a6167ca587ddd940361bd623677564 Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr@06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Fri, 18 Jun 2010 14:06:28 +0000
Subject: [PATCH 12/13] Removed the lib version info from the previous checkin

git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4036 06e15bec-b515-0410-bef0-cc27a458cf48
---
 lib/Makefile.am |   10 ++++------
 1 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/lib/Makefile.am b/lib/Makefile.am
index 8f0301b..95428f1 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -27,19 +27,17 @@ PLUGINLDFLAGS=-no-undefined -module -avoid-version
 dmoduledir=$(libdir)/qpid/daemon
 dmodule_LTLIBRARIES = msgstore.la
  
-msgstore_la_LIBADD =         \
+msgstore_la_LIBADD =            \
   $(APR_LIBS)                   \
   $(LIB_DLOPEN)                 \
   $(LIB_BERKELEY_DB)            \
   $(LIB_CLOCK_GETTIME)          \
   $(QPID_LIBS)
 
-MSGSTORE_VERSION_INFO = 1:0:0
-msgstore_la_LDFLAGS =            \
-  $(PLUGINLDFLAGS)               \
-  -version-info $(MSGSTORE_VERSION_INFO)
+msgstore_la_LDFLAGS =           \
+  $(PLUGINLDFLAGS)
 
-msgstore_la_SOURCES =        \
+msgstore_la_SOURCES =           \
   StorePlugin.cpp               \
   BindingDbt.cpp                \
   BufferValue.cpp               \
-- 
1.6.6.1

From fe4143cc7226143cb3eb025efcf0e6a8d873866d Mon Sep 17 00:00:00 2001
From: aconway <aconway@06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Mon, 28 Jun 2010 18:18:31 +0000
Subject: [PATCH 13/13] Bug 607748 - Crash on exit in store cluster tests.

This is an order-of-static-destructors problem.

This is an order-of-static-destructors problem. Fixed by having the
store use the broker's Timer. This ensures orderly shut down as the
brokers destructor will destroy the store first and then the timer.

git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4053 06e15bec-b515-0410-bef0-cc27a458cf48
---
 lib/JournalImpl.cpp          |   31 ++++++----------------------
 lib/JournalImpl.h            |   17 ++++++++++-----
 lib/MessageStoreImpl.cpp     |    9 ++++---
 lib/MessageStoreImpl.h       |    7 +++++-
 lib/StorePlugin.cpp          |    2 +-
 tests/OrderingTest.cpp       |    7 ++++-
 tests/SimpleTest.cpp         |   45 ++++++++++++++++++++++-------------------
 tests/TransactionalTest.cpp  |    9 +++++--
 tests/TwoPhaseCommitTest.cpp |    9 +++++--
 9 files changed, 71 insertions(+), 65 deletions(-)

diff --git a/lib/JournalImpl.cpp b/lib/JournalImpl.cpp
index 5e1ed7a..962125b 100644
--- a/lib/JournalImpl.cpp
+++ b/lib/JournalImpl.cpp
@@ -33,6 +33,7 @@
 #include "qmf/com/redhat/rhm/store/EventFull.h"
 #include "qmf/com/redhat/rhm/store/EventRecovered.h"
 #include "qpid/sys/Monitor.h"
+#include "qpid/sys/Timer.h"
 #include "StoreException.h"
 
 using namespace mrg::msgstore;
@@ -40,15 +41,12 @@ using namespace mrg::journal;
 using qpid::management::ManagementAgent;
 namespace _qmf = qmf::com::redhat::rhm::store;
 
-qpid::sys::Mutex JournalImpl::_static_lock;
-qpid::sys::Timer* JournalImpl::journalTimerPtr = 0;
-u_int32_t JournalImpl::cnt = 0;
-
 void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); }
 
 void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); }
 
-JournalImpl::JournalImpl(const std::string& journalId,
+JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
+                         const std::string& journalId,
                          const std::string& journalDirectory,
                          const std::string& journalBaseFilename,
                          const qpid::sys::Duration getEventsTimeout,
@@ -56,6 +54,7 @@ JournalImpl::JournalImpl(const std::string& journalId,
                          qpid::management::ManagementAgent* a,
                          DeleteCallback onDelete):
                          jcntl(journalId, journalDirectory, journalBaseFilename),
+                         timer(timer_),
                          getEventsTimerSetFlag(false),
                          lastReadRid(0),
                          writeActivityFlag(false),
@@ -72,13 +71,8 @@ JournalImpl::JournalImpl(const std::string& journalId,
     getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
     inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
     {
-        qpid::sys::Mutex::ScopedLock sl(_static_lock);
-        if (journalTimerPtr == 0)
-            journalTimerPtr = new qpid::sys::Timer;
-        assert (journalTimerPtr != 0);
-        cnt++;
-        journalTimerPtr->start();
-        journalTimerPtr->add(inactivityFireEventPtr);
+        timer.start();
+        timer.add(inactivityFireEventPtr);
     }
 
     if (_agent != 0)
@@ -119,15 +113,6 @@ JournalImpl::~JournalImpl()
     inactivityFireEventPtr->cancel();
     free_read_buffers();
 
-    {
-        qpid::sys::Mutex::ScopedLock sl(_static_lock);
-        if (journalTimerPtr && --cnt == 0)
-        {
-            delete journalTimerPtr;
-            journalTimerPtr = 0;
-        }
-    }
-
     if (_mgmtObject != 0) {
         _mgmtObject->resourceDestroy();
         _mgmtObject = 0;
@@ -564,9 +549,7 @@ JournalImpl::flushFire()
     }
     inactivityFireEventPtr->setupNextFire();
     {
-        qpid::sys::Mutex::ScopedLock sl(_static_lock);
-        assert(journalTimerPtr != 0);
-        journalTimerPtr->add(inactivityFireEventPtr);
+        timer.add(inactivityFireEventPtr);
     }
 }
 
diff --git a/lib/JournalImpl.h b/lib/JournalImpl.h
index aab8467..b85cf02 100644
--- a/lib/JournalImpl.h
+++ b/lib/JournalImpl.h
@@ -37,6 +37,10 @@
 #include "qpid/management/Manageable.h"
 #include "qmf/com/redhat/rhm/store/Journal.h"
 
+namespace qpid { namespace sys {
+class Timer;
+}}
+
 namespace mrg {
 namespace msgstore {
 
@@ -75,9 +79,9 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal
     
   private:
     static qpid::sys::Mutex _static_lock;
-    static qpid::sys::Timer* journalTimerPtr;
     static u_int32_t cnt;
 
+    qpid::sys::Timer& timer;
     bool getEventsTimerSetFlag;
     boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
     qpid::sys::Mutex _getf_lock;
@@ -102,7 +106,8 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal
     
   public:
     
-    JournalImpl(const std::string& journalId,
+    JournalImpl(qpid::sys::Timer& timer,
+                const std::string& journalId,
                 const std::string& journalDirectory,
                 const std::string& journalBaseFilename,
                 const qpid::sys::Duration getEventsTimeout,
@@ -219,9 +224,8 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal
 
     inline void setGetEventTimer()
     {
-        assert(journalTimerPtr != 0);
         getEventsFireEventsPtr->setupNextFire();
-        journalTimerPtr->add(getEventsFireEventsPtr);
+        timer.add(getEventsFireEventsPtr);
         getEventsTimerSetFlag = true;
     }
     void handleIoResult(const mrg::journal::iores r);
@@ -239,13 +243,14 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal
 class TplJournalImpl : public JournalImpl
 {
   public:
-    TplJournalImpl(const std::string& journalId,
+    TplJournalImpl(qpid::sys::Timer& timer,
+                   const std::string& journalId,
                    const std::string& journalDirectory,
                    const std::string& journalBaseFilename,
                    const qpid::sys::Duration getEventsTimeout,
                    const qpid::sys::Duration flushTimeout,
                    qpid::management::ManagementAgent* agent) :
-        JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent)
+        JournalImpl(timer, journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent)
     {}
 
     ~TplJournalImpl() {}
diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index 5f98055..e4f98b5 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -62,7 +62,7 @@ MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid,
                                                               tpc_flag(_tpc_flag)
 {}
 
-MessageStoreImpl::MessageStoreImpl(const char* envpath) :
+MessageStoreImpl::MessageStoreImpl(qpid::sys::Timer& timer_, const char* envpath) :
                                  numJrnlFiles(0),
                                  autoJrnlExpand(false),
                                  autoJrnlExpandMaxFiles(0),
@@ -77,6 +77,7 @@ MessageStoreImpl::MessageStoreImpl(const char* envpath) :
                                  highestRid(0),
                                  isInit(false),
                                  envPath(envpath),
+                                 timer(timer_),
                                  mgmtObject(0),
                                  agent(0)
 {}
@@ -339,7 +340,7 @@ void MessageStoreImpl::init()
         open(mappingDb, txn.get(), "mappings.db", true);
         open(bindingDb, txn.get(), "bindings.db", true);
         open(generalDb, txn.get(), "general.db",  false);
-        tplStorePtr.reset(new TplJournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, agent));
+        tplStorePtr.reset(new TplJournalImpl(timer, "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, agent));
         txn.commit();
     } catch (const journal::jexception& e) {
         QPID_LOG(error, "Journal Exception occurred while initializing store: " << e);
@@ -479,7 +480,7 @@ void MessageStoreImpl::create(PersistableQueue& queue,
         return;
     }
 
-    jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),  std::string("JournalData"),
+    jQueue = new JournalImpl(timer, queue.getName(), getJrnlDir(queue),  std::string("JournalData"),
                              defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
                              boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
     {
@@ -763,7 +764,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
             QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue.");
             break;
         }
-        jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"),
+        jQueue = new JournalImpl(timer, queueName, getJrnlHashDir(queueName), std::string("JournalData"),
                                  defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
                                  boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
         {
diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h
index 2659f32..8e46dd2 100644
--- a/lib/MessageStoreImpl.h
+++ b/lib/MessageStoreImpl.h
@@ -45,6 +45,10 @@
 #define DB_BUFFER_SMALL ENOMEM
 #endif
 
+namespace qpid { namespace sys {
+class Timer;
+}}
+
 namespace mrg {
 namespace msgstore {
 
@@ -147,6 +151,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
     u_int64_t highestRid;
     bool isInit;
     const char* envPath;
+    qpid::sys::Timer& timer;
 
     qmf::com::redhat::rhm::store::Store* mgmtObject;
     qpid::management::ManagementAgent* agent;
@@ -266,7 +271,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
   public:
     typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;
 
-    MessageStoreImpl(const char* envpath = 0);
+    MessageStoreImpl(qpid::sys::Timer& timer, const char* envpath = 0);
 
     virtual ~MessageStoreImpl();
 
diff --git a/lib/StorePlugin.cpp b/lib/StorePlugin.cpp
index 0fb3512..8231bd6 100644
--- a/lib/StorePlugin.cpp
+++ b/lib/StorePlugin.cpp
@@ -43,7 +43,7 @@ struct StorePlugin : public Plugin {
     {
         Broker* broker = dynamic_cast<Broker*>(&target);
         if (!broker) return;
-        boost::shared_ptr<qpid::broker::MessageStore> store(new mrg::msgstore::MessageStoreImpl ());
+        boost::shared_ptr<qpid::broker::MessageStore> store(new mrg::msgstore::MessageStoreImpl (broker->getTimer()));
         DataDir& dataDir = broker->getDataDir ();
         if (options.storeDir.empty ())
         {
diff --git a/tests/OrderingTest.cpp b/tests/OrderingTest.cpp
index 16f88d0..10fda1d 100644
--- a/tests/OrderingTest.cpp
+++ b/tests/OrderingTest.cpp
@@ -30,6 +30,9 @@
 #include <qpid/broker/RecoveryManagerImpl.h>
 #include <qpid/framing/AMQHeaderBody.h>
 #include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+qpid::sys::Timer timer;
 
 #define SET_LOG_LEVEL(level) \
     qpid::log::Options opts(""); \
@@ -59,7 +62,7 @@ int counter = 1;
 
 void setup()
 {
-    store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl());
+    store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(timer));
     store->init(test_dir, 4, 1, true); // truncate store
 
     queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));
@@ -98,7 +101,7 @@ void restart()
     queue.reset();
     store.reset();
 
-    store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl());
+    store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(timer));
     store->init(test_dir, 4, 1);
     ExchangeRegistry exchanges;
     LinkRegistry links;
diff --git a/tests/SimpleTest.cpp b/tests/SimpleTest.cpp
index 4d5f155..c62869d 100644
--- a/tests/SimpleTest.cpp
+++ b/tests/SimpleTest.cpp
@@ -32,6 +32,9 @@
 #include <qpid/framing/AMQHeaderBody.h>
 #include <qpid/framing/FieldTable.h>
 #include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+qpid::sys::Timer timer;
 
 #define SET_LOG_LEVEL(level) \
     qpid::log::Options opts(""); \
@@ -92,7 +95,7 @@ void bindAndUnbind(const string& exchangeName, const string& queueName,
                    const string& key, const FieldTable& args)
 {
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
         Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
@@ -102,7 +105,7 @@ void bindAndUnbind(const string& exchangeName, const string& queueName,
         store.bind(*exchange, *queue, key, args);
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         ExchangeRegistry exchanges;
         QueueRegistry queues;
@@ -121,7 +124,7 @@ void bindAndUnbind(const string& exchangeName, const string& queueName,
         store.unbind(*exchange, *queue, key, args);
     }
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         ExchangeRegistry exchanges;
         QueueRegistry queues;
@@ -148,7 +151,7 @@ QPID_AUTO_TEST_CASE(CreateDelete)
     SET_LOG_LEVEL("error+"); // This only needs to be set once.
 
     cout << test_filename << ".CreateDelete: " << flush;
-    MessageStoreImpl store;
+    MessageStoreImpl store(timer);
     store.init(test_dir, 4, 1, true); // truncate store
     string name("CreateDeleteQueue");
     Queue queue(name, 0, &store, 0);
@@ -164,7 +167,7 @@ QPID_AUTO_TEST_CASE(CreateDelete)
 QPID_AUTO_TEST_CASE(EmptyRecover)
 {
     cout << test_filename << ".EmptyRecover: " << flush;
-    MessageStoreImpl store;
+    MessageStoreImpl store(timer);
     store.init(test_dir, 4, 1, true); // truncate store
     QueueRegistry registry;
     registry.setStore (&store);
@@ -181,7 +184,7 @@ QPID_AUTO_TEST_CASE(QueueCreate)
     uint64_t id(0);
     string name("MyDurableQueue");
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         Queue queue(name, 0, &store, 0);
         store.create(queue, qpid::framing::FieldTable());
@@ -189,7 +192,7 @@ QPID_AUTO_TEST_CASE(QueueCreate)
         id = queue.getPersistenceId();
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         QueueRegistry registry;
         registry.setStore (&store);
@@ -209,7 +212,7 @@ QPID_AUTO_TEST_CASE(QueueCreateWithSettings)
     std::auto_ptr<QueuePolicy> policy( QueuePolicy::createQueuePolicy(101, 202));
     string name("MyDurableQueue");
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         Queue queue(name, 0, &store, 0);
         FieldTable settings;
@@ -218,7 +221,7 @@ QPID_AUTO_TEST_CASE(QueueCreateWithSettings)
         BOOST_REQUIRE(queue.getPersistenceId());
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         QueueRegistry registry;
         registry.setStore (&store);
@@ -239,14 +242,14 @@ QPID_AUTO_TEST_CASE(QueueDestroy)
 
     string name("MyDurableQueue");
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         Queue queue(name, 0, &store, 0);
         store.create(queue, qpid::framing::FieldTable());
         store.destroy(queue);
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         QueueRegistry registry;
         registry.setStore (&store);
@@ -272,7 +275,7 @@ QPID_AUTO_TEST_CASE(Enqueue)
     string data1("abcdefg");
     string data2("hijklmn");
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
         FieldTable settings;
@@ -290,7 +293,7 @@ QPID_AUTO_TEST_CASE(Enqueue)
         queue->enqueue(0, msg);
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         QueueRegistry registry;
         registry.setStore (&store);
@@ -331,7 +334,7 @@ QPID_AUTO_TEST_CASE(Dequeue)
         string routingKey("MyRoutingKey");
         Uuid messageId(true);
         string data("abcdefg");
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
         FieldTable settings;
@@ -347,7 +350,7 @@ QPID_AUTO_TEST_CASE(Dequeue)
         queue->dequeue(0, qm);
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         QueueRegistry registry;
         registry.setStore (&store);
@@ -370,7 +373,7 @@ QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy)
     FieldTable args;
     args.setString("a", "A");
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         ExchangeRegistry registry;
         Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
@@ -379,7 +382,7 @@ QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy)
         BOOST_REQUIRE(id);
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         ExchangeRegistry registry;
 
@@ -393,7 +396,7 @@ QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy)
         store.destroy(*exchange);
     }
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         ExchangeRegistry registry;
 
@@ -441,7 +444,7 @@ QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind)
     string key("my-routing-key");
     FieldTable args;
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
         Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
@@ -455,7 +458,7 @@ QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind)
         store.destroy(*queue1);
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         ExchangeRegistry exchanges;
         QueueRegistry queues;
@@ -472,7 +475,7 @@ QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind)
         store.destroy(*exchange);
     }
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         ExchangeRegistry exchanges;
         QueueRegistry queues;
diff --git a/tests/TransactionalTest.cpp b/tests/TransactionalTest.cpp
index d6f6d7f..ac5a6b6 100644
--- a/tests/TransactionalTest.cpp
+++ b/tests/TransactionalTest.cpp
@@ -32,6 +32,9 @@
 #include "qpid/framing/AMQHeaderBody.h"
 #include "qpid/log/Statement.h"
 #include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+qpid::sys::Timer timer;
 
 #define SET_LOG_LEVEL(level) \
     qpid::log::Options opts(""); \
@@ -69,7 +72,7 @@ class TestTxnCtxt : public TxnCtxt
 class TestMessageStore: public MessageStoreImpl
 {
   public:
-    TestMessageStore(const char* envpath = 0) : MessageStoreImpl(envpath) {}
+    TestMessageStore(qpid::sys::Timer& timer, const char* envpath = 0) : MessageStoreImpl(timer, envpath) {}
     std::auto_ptr<qpid::broker::TransactionContext> begin() {
         checkInit();
         // pass sequence number for c/a
@@ -109,7 +112,7 @@ Queue::shared_ptr queueB;
 template <class T>
 void setup()
 {
-    store = std::auto_ptr<T>(new T());
+    store = std::auto_ptr<T>(new T(timer));
     store->init(test_dir, 4, 1, true); // truncate store
 
     //create two queues:
@@ -128,7 +131,7 @@ void restart()
     queues.reset();
     store.reset();
 
-    store = std::auto_ptr<T>(new T());
+    store = std::auto_ptr<T>(new T(timer));
     store->init(test_dir, 4, 1);
     queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
     ExchangeRegistry exchanges;
diff --git a/tests/TwoPhaseCommitTest.cpp b/tests/TwoPhaseCommitTest.cpp
index 86d3976..f442310 100644
--- a/tests/TwoPhaseCommitTest.cpp
+++ b/tests/TwoPhaseCommitTest.cpp
@@ -32,6 +32,9 @@
 #include "qpid/log/Statement.h"
 #include "TxnCtxt.h"
 #include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+qpid::sys::Timer timer;
 
 #define SET_LOG_LEVEL(level) \
     qpid::log::Options opts(""); \
@@ -182,7 +185,7 @@ class TwoPhaseCommitTest
     class TestMessageStore: public MessageStoreImpl
     {
       public:
-        TestMessageStore(const char* envpath = 0) : MessageStoreImpl(envpath) {}
+        TestMessageStore(qpid::sys::Timer& timer, const char* envpath = 0) : MessageStoreImpl(timer, envpath) {}
         std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid) {
             checkInit();
             IdSequence* jtx = &messageIdSequence;
@@ -325,7 +328,7 @@ class TwoPhaseCommitTest
     template <class T>
     void setup()
     {
-        store = std::auto_ptr<T>(new T());
+        store = std::auto_ptr<T>(new T(timer));
         store->init(test_dir, 4, 1, true); // truncate store
 
         //create two queues:
@@ -353,7 +356,7 @@ class TwoPhaseCommitTest
         queues.reset();
         links.reset();
 
-        store = std::auto_ptr<T>(new T());
+        store = std::auto_ptr<T>(new T(timer));
         store->init(test_dir, 4, 1);
         sys::Timer t;
         ExchangeRegistry exchanges;
-- 
1.6.6.1