diff --git a/src/common/rclconfig.cpp b/src/common/rclconfig.cpp index d4339edd..ff414b43 100644 --- a/src/common/rclconfig.cpp +++ b/src/common/rclconfig.cpp @@ -1197,7 +1197,6 @@ void RclConfig::initFrom(const RclConfig& r) m_datadir = r.m_datadir; m_keydir = r.m_keydir; m_cdirs = r.m_cdirs; - // We should use reference-counted objects instead! if (r.m_conf) m_conf = new ConfStack(*(r.m_conf)); if (r.mimemap) diff --git a/src/index/indexer.cpp b/src/index/indexer.cpp index 48327739..4cd9f013 100644 --- a/src/index/indexer.cpp +++ b/src/index/indexer.cpp @@ -64,6 +64,7 @@ bool ConfIndexer::index(bool resetbefore, ixType typestorun) deleteZ(m_fsindexer); m_fsindexer = new FsIndexer(m_config, &m_db, m_updater); if (!m_fsindexer || !m_fsindexer->index()) { + m_db.close(); return false; } } @@ -72,6 +73,7 @@ bool ConfIndexer::index(bool resetbefore, ixType typestorun) deleteZ(m_beagler); m_beagler = new BeagleQueueIndexer(m_config, &m_db, m_updater); if (!m_beagler || !m_beagler->index()) { + m_db.close(); return false; } } @@ -79,8 +81,10 @@ bool ConfIndexer::index(bool resetbefore, ixType typestorun) if (typestorun == IxTAll) { // Get rid of all database entries that don't exist in the // filesystem anymore. Only if all *configured* indexers ran. - if (m_updater && !m_updater->update(DbIxStatus::DBIXS_PURGE, string())) + if (m_updater && !m_updater->update(DbIxStatus::DBIXS_PURGE, string())) { + m_db.close(); return false; + } m_db.purge(); } diff --git a/src/index/recollindex.cpp b/src/index/recollindex.cpp index bcdf4e59..f7f42ace 100644 --- a/src/index/recollindex.cpp +++ b/src/index/recollindex.cpp @@ -471,8 +471,7 @@ int main(int argc, char **argv) } deleteZ(confindexer); o_reexec->insertArgs(vector(1, "-n")); - LOGDEB(("recollindex: calling reexec after init path with " - "option -n\n")); + LOGINFO(("recollindex: reexecuting with -n after initial full pass\n")); // Note that -n will be inside the reexec when we come // back, but the monitor will explicitely strip it before // starting a config change exec to ensure that we do a diff --git a/src/rcldb/rcldb.cpp b/src/rcldb/rcldb.cpp index 4f86dbf2..38f4a646 100644 --- a/src/rcldb/rcldb.cpp +++ b/src/rcldb/rcldb.cpp @@ -131,8 +131,7 @@ Db::Native::Native(Db *db) : m_rcldb(db), m_isopen(false), m_iswritable(false), m_noversionwrite(false) #ifdef IDX_THREADS - , m_wqueue("DbUpd", - m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first), + , m_wqueue("DbUpd", m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first), m_totalworkns(0LL) #endif // IDX_THREADS { @@ -477,7 +476,7 @@ bool Db::open(OpenMode mode, OpenError *error) // Note: xapian has no close call, we delete and recreate the db bool Db::close() { - LOGDEB2(("Db::close()\n")); + LOGDEB1(("Db::close()\n")); return i_close(false); } bool Db::i_close(bool final) diff --git a/src/utils/ptmutex.h b/src/utils/ptmutex.h index 9aacfc4f..4cd9324f 100644 --- a/src/utils/ptmutex.h +++ b/src/utils/ptmutex.h @@ -27,9 +27,10 @@ class PTMutexInit { public: pthread_mutex_t m_mutex; + int m_status; PTMutexInit() { - pthread_mutex_init(&m_mutex, 0); + m_status = pthread_mutex_init(&m_mutex, 0); } }; @@ -50,6 +51,11 @@ public: pthread_mutex_unlock(&m_lock.m_mutex); } int ok() {return m_status == 0;} + // For pthread_cond_wait etc. + pthread_mutex_t *getMutex() + { + return &m_lock.m_mutex; + } private: PTMutexInit& m_lock; int m_status; diff --git a/src/utils/workqueue.h b/src/utils/workqueue.h index 7c4bbb09..61c32154 100644 --- a/src/utils/workqueue.h +++ b/src/utils/workqueue.h @@ -30,6 +30,7 @@ using std::queue; using std::string; #include "debuglog.h" +#include "ptmutex.h" /// Just an initialized timespec. Not really used any more. class WQTData { @@ -67,8 +68,7 @@ public: m_workersleeps(0) { m_ok = (m_high >= 0) && (pthread_cond_init(&m_ccond, 0) == 0) && - (pthread_cond_init(&m_wcond, 0) == 0) && - (pthread_mutex_init(&m_mutex, 0) == 0); + (pthread_cond_init(&m_wcond, 0) == 0); } ~WorkQueue() @@ -88,19 +88,17 @@ public: */ bool start(int nworkers, void *(*start_routine)(void *), void *arg) { - pthread_mutex_lock(&m_mutex); + PTMutexLocker lock(m_mutex); for (int i = 0; i < nworkers; i++) { int err; pthread_t thr; if ((err = pthread_create(&thr, 0, start_routine, arg))) { LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n", m_name.c_str(), err)); - pthread_mutex_unlock(&m_mutex); return false; } m_worker_threads.insert(pair(thr, WQTData())); } - pthread_mutex_unlock(&m_mutex); return true; } @@ -110,7 +108,8 @@ public: */ bool put(T t) { - if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) { + PTMutexLocker lock(m_mutex); + if (!lock.ok() || !ok()) { LOGERR(("WorkQueue::put:%s: !ok or mutex_lock failed\n", m_name.c_str())); return false; @@ -119,9 +118,8 @@ public: while (ok() && m_high > 0 && m_queue.size() >= m_high) { // Keep the order: we test ok() AFTER the sleep... m_clients_waiting++; - if (pthread_cond_wait(&m_ccond, &m_mutex) || !ok()) { + if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) { m_clients_waiting--; - pthread_mutex_unlock(&m_mutex); return false; } m_clients_waiting--; @@ -135,7 +133,6 @@ public: m_nowake++; } - pthread_mutex_unlock(&m_mutex); return true; } @@ -152,7 +149,8 @@ public: */ bool waitIdle() { - if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) { + PTMutexLocker lock(m_mutex); + if (!lock.ok() || !ok()) { LOGERR(("WorkQueue::waitIdle:%s: not ok or can't lock\n", m_name.c_str())); return false; @@ -163,18 +161,16 @@ public: while (ok() && (m_queue.size() > 0 || m_workers_waiting != m_worker_threads.size())) { m_clients_waiting++; - if (pthread_cond_wait(&m_ccond, &m_mutex)) { + if (pthread_cond_wait(&m_ccond, lock.getMutex())) { m_clients_waiting--; m_ok = false; LOGERR(("WorkQueue::waitIdle:%s: cond_wait failed\n", m_name.c_str())); - pthread_mutex_unlock(&m_mutex); return false; } m_clients_waiting--; } - pthread_mutex_unlock(&m_mutex); return ok(); } @@ -185,8 +181,8 @@ public: */ void* setTerminateAndWait() { + PTMutexLocker lock(m_mutex); LOGDEB(("setTerminateAndWait:%s\n", m_name.c_str())); - pthread_mutex_lock(&m_mutex); if (m_worker_threads.empty()) { // Already called ? @@ -198,8 +194,7 @@ public: while (m_workers_exited < m_worker_threads.size()) { pthread_cond_broadcast(&m_wcond); m_clients_waiting++; - if (pthread_cond_wait(&m_ccond, &m_mutex)) { - pthread_mutex_unlock(&m_mutex); + if (pthread_cond_wait(&m_ccond, lock.getMutex())) { LOGERR(("WorkQueue::setTerminate:%s: cond_wait failed\n", m_name.c_str())); m_clients_waiting--; @@ -222,8 +217,13 @@ public: statusall = status; m_worker_threads.erase(it); } + + // Reset to start state. + m_workers_waiting = m_workers_exited = m_clients_waiting = m_tottasks = + m_nowake = m_workersleeps = 0; + m_ok = true; + LOGDEB(("setTerminateAndWait:%s done\n", m_name.c_str())); - pthread_mutex_unlock(&m_mutex); return statusall; } @@ -234,7 +234,8 @@ public: */ bool take(T* tp, size_t *szp = 0) { - if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) { + PTMutexLocker lock(m_mutex); + if (!lock.ok() || !ok()) { LOGDEB(("WorkQueue::take:%s: not ok\n", m_name.c_str())); return false; } @@ -244,13 +245,12 @@ public: m_workers_waiting++; if (m_queue.empty()) pthread_cond_broadcast(&m_ccond); - if (pthread_cond_wait(&m_wcond, &m_mutex) || !ok()) { + if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) { // !ok is a normal condition when shutting down if (ok()) LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n", m_name.c_str())); m_workers_waiting--; - pthread_mutex_unlock(&m_mutex); return false; } m_workers_waiting--; @@ -267,7 +267,6 @@ public: } else { m_nowake++; } - pthread_mutex_unlock(&m_mutex); return true; } @@ -281,19 +280,16 @@ public: void workerExit() { LOGDEB(("workerExit:%s\n", m_name.c_str())); - if (pthread_mutex_lock(&m_mutex) != 0) - return; + PTMutexLocker lock(m_mutex); m_workers_exited++; m_ok = false; pthread_cond_broadcast(&m_ccond); - pthread_mutex_unlock(&m_mutex); } size_t qsize() { - pthread_mutex_lock(&m_mutex); + PTMutexLocker lock(m_mutex); size_t sz = m_queue.size(); - pthread_mutex_unlock(&m_mutex); return sz; } @@ -330,7 +326,7 @@ private: queue m_queue; pthread_cond_t m_ccond; pthread_cond_t m_wcond; - pthread_mutex_t m_mutex; + PTMutexInit m_mutex; unsigned int m_clients_waiting; unsigned int m_tottasks; unsigned int m_nowake;