Use PTMutexLocker in workqueue, and make it restartable

This commit is contained in:
Jean-Francois Dockes 2012-11-30 07:35:03 +01:00
parent 2f800e3eb3
commit 45d56f17de
6 changed files with 38 additions and 35 deletions

View file

@ -1197,7 +1197,6 @@ void RclConfig::initFrom(const RclConfig& r)
m_datadir = r.m_datadir; m_datadir = r.m_datadir;
m_keydir = r.m_keydir; m_keydir = r.m_keydir;
m_cdirs = r.m_cdirs; m_cdirs = r.m_cdirs;
// We should use reference-counted objects instead!
if (r.m_conf) if (r.m_conf)
m_conf = new ConfStack<ConfTree>(*(r.m_conf)); m_conf = new ConfStack<ConfTree>(*(r.m_conf));
if (r.mimemap) if (r.mimemap)

View file

@ -64,6 +64,7 @@ bool ConfIndexer::index(bool resetbefore, ixType typestorun)
deleteZ(m_fsindexer); deleteZ(m_fsindexer);
m_fsindexer = new FsIndexer(m_config, &m_db, m_updater); m_fsindexer = new FsIndexer(m_config, &m_db, m_updater);
if (!m_fsindexer || !m_fsindexer->index()) { if (!m_fsindexer || !m_fsindexer->index()) {
m_db.close();
return false; return false;
} }
} }
@ -72,6 +73,7 @@ bool ConfIndexer::index(bool resetbefore, ixType typestorun)
deleteZ(m_beagler); deleteZ(m_beagler);
m_beagler = new BeagleQueueIndexer(m_config, &m_db, m_updater); m_beagler = new BeagleQueueIndexer(m_config, &m_db, m_updater);
if (!m_beagler || !m_beagler->index()) { if (!m_beagler || !m_beagler->index()) {
m_db.close();
return false; return false;
} }
} }
@ -79,8 +81,10 @@ bool ConfIndexer::index(bool resetbefore, ixType typestorun)
if (typestorun == IxTAll) { if (typestorun == IxTAll) {
// Get rid of all database entries that don't exist in the // Get rid of all database entries that don't exist in the
// filesystem anymore. Only if all *configured* indexers ran. // filesystem anymore. Only if all *configured* indexers ran.
if (m_updater && !m_updater->update(DbIxStatus::DBIXS_PURGE, string())) if (m_updater && !m_updater->update(DbIxStatus::DBIXS_PURGE, string())) {
m_db.close();
return false; return false;
}
m_db.purge(); m_db.purge();
} }

View file

@ -471,8 +471,7 @@ int main(int argc, char **argv)
} }
deleteZ(confindexer); deleteZ(confindexer);
o_reexec->insertArgs(vector<string>(1, "-n")); o_reexec->insertArgs(vector<string>(1, "-n"));
LOGDEB(("recollindex: calling reexec after init path with " LOGINFO(("recollindex: reexecuting with -n after initial full pass\n"));
"option -n\n"));
// Note that -n will be inside the reexec when we come // Note that -n will be inside the reexec when we come
// back, but the monitor will explicitely strip it before // back, but the monitor will explicitely strip it before
// starting a config change exec to ensure that we do a // starting a config change exec to ensure that we do a

View file

@ -131,8 +131,7 @@ Db::Native::Native(Db *db)
: m_rcldb(db), m_isopen(false), m_iswritable(false), : m_rcldb(db), m_isopen(false), m_iswritable(false),
m_noversionwrite(false) m_noversionwrite(false)
#ifdef IDX_THREADS #ifdef IDX_THREADS
, m_wqueue("DbUpd", , m_wqueue("DbUpd", m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first),
m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first),
m_totalworkns(0LL) m_totalworkns(0LL)
#endif // IDX_THREADS #endif // IDX_THREADS
{ {
@ -477,7 +476,7 @@ bool Db::open(OpenMode mode, OpenError *error)
// Note: xapian has no close call, we delete and recreate the db // Note: xapian has no close call, we delete and recreate the db
bool Db::close() bool Db::close()
{ {
LOGDEB2(("Db::close()\n")); LOGDEB1(("Db::close()\n"));
return i_close(false); return i_close(false);
} }
bool Db::i_close(bool final) bool Db::i_close(bool final)

View file

@ -27,9 +27,10 @@
class PTMutexInit { class PTMutexInit {
public: public:
pthread_mutex_t m_mutex; pthread_mutex_t m_mutex;
int m_status;
PTMutexInit() PTMutexInit()
{ {
pthread_mutex_init(&m_mutex, 0); m_status = pthread_mutex_init(&m_mutex, 0);
} }
}; };
@ -50,6 +51,11 @@ public:
pthread_mutex_unlock(&m_lock.m_mutex); pthread_mutex_unlock(&m_lock.m_mutex);
} }
int ok() {return m_status == 0;} int ok() {return m_status == 0;}
// For pthread_cond_wait etc.
pthread_mutex_t *getMutex()
{
return &m_lock.m_mutex;
}
private: private:
PTMutexInit& m_lock; PTMutexInit& m_lock;
int m_status; int m_status;

View file

@ -30,6 +30,7 @@ using std::queue;
using std::string; using std::string;
#include "debuglog.h" #include "debuglog.h"
#include "ptmutex.h"
/// Just an initialized timespec. Not really used any more. /// Just an initialized timespec. Not really used any more.
class WQTData { class WQTData {
@ -67,8 +68,7 @@ public:
m_workersleeps(0) m_workersleeps(0)
{ {
m_ok = (m_high >= 0) && (pthread_cond_init(&m_ccond, 0) == 0) && m_ok = (m_high >= 0) && (pthread_cond_init(&m_ccond, 0) == 0) &&
(pthread_cond_init(&m_wcond, 0) == 0) && (pthread_cond_init(&m_wcond, 0) == 0);
(pthread_mutex_init(&m_mutex, 0) == 0);
} }
~WorkQueue() ~WorkQueue()
@ -88,19 +88,17 @@ public:
*/ */
bool start(int nworkers, void *(*start_routine)(void *), void *arg) bool start(int nworkers, void *(*start_routine)(void *), void *arg)
{ {
pthread_mutex_lock(&m_mutex); PTMutexLocker lock(m_mutex);
for (int i = 0; i < nworkers; i++) { for (int i = 0; i < nworkers; i++) {
int err; int err;
pthread_t thr; pthread_t thr;
if ((err = pthread_create(&thr, 0, start_routine, arg))) { if ((err = pthread_create(&thr, 0, start_routine, arg))) {
LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n", LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
m_name.c_str(), err)); m_name.c_str(), err));
pthread_mutex_unlock(&m_mutex);
return false; return false;
} }
m_worker_threads.insert(pair<pthread_t, WQTData>(thr, WQTData())); m_worker_threads.insert(pair<pthread_t, WQTData>(thr, WQTData()));
} }
pthread_mutex_unlock(&m_mutex);
return true; return true;
} }
@ -110,7 +108,8 @@ public:
*/ */
bool put(T t) bool put(T t)
{ {
if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) { PTMutexLocker lock(m_mutex);
if (!lock.ok() || !ok()) {
LOGERR(("WorkQueue::put:%s: !ok or mutex_lock failed\n", LOGERR(("WorkQueue::put:%s: !ok or mutex_lock failed\n",
m_name.c_str())); m_name.c_str()));
return false; return false;
@ -119,9 +118,8 @@ public:
while (ok() && m_high > 0 && m_queue.size() >= m_high) { while (ok() && m_high > 0 && m_queue.size() >= m_high) {
// Keep the order: we test ok() AFTER the sleep... // Keep the order: we test ok() AFTER the sleep...
m_clients_waiting++; m_clients_waiting++;
if (pthread_cond_wait(&m_ccond, &m_mutex) || !ok()) { if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) {
m_clients_waiting--; m_clients_waiting--;
pthread_mutex_unlock(&m_mutex);
return false; return false;
} }
m_clients_waiting--; m_clients_waiting--;
@ -135,7 +133,6 @@ public:
m_nowake++; m_nowake++;
} }
pthread_mutex_unlock(&m_mutex);
return true; return true;
} }
@ -152,7 +149,8 @@ public:
*/ */
bool waitIdle() bool waitIdle()
{ {
if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) { PTMutexLocker lock(m_mutex);
if (!lock.ok() || !ok()) {
LOGERR(("WorkQueue::waitIdle:%s: not ok or can't lock\n", LOGERR(("WorkQueue::waitIdle:%s: not ok or can't lock\n",
m_name.c_str())); m_name.c_str()));
return false; return false;
@ -163,18 +161,16 @@ public:
while (ok() && (m_queue.size() > 0 || while (ok() && (m_queue.size() > 0 ||
m_workers_waiting != m_worker_threads.size())) { m_workers_waiting != m_worker_threads.size())) {
m_clients_waiting++; m_clients_waiting++;
if (pthread_cond_wait(&m_ccond, &m_mutex)) { if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
m_clients_waiting--; m_clients_waiting--;
m_ok = false; m_ok = false;
LOGERR(("WorkQueue::waitIdle:%s: cond_wait failed\n", LOGERR(("WorkQueue::waitIdle:%s: cond_wait failed\n",
m_name.c_str())); m_name.c_str()));
pthread_mutex_unlock(&m_mutex);
return false; return false;
} }
m_clients_waiting--; m_clients_waiting--;
} }
pthread_mutex_unlock(&m_mutex);
return ok(); return ok();
} }
@ -185,8 +181,8 @@ public:
*/ */
void* setTerminateAndWait() void* setTerminateAndWait()
{ {
PTMutexLocker lock(m_mutex);
LOGDEB(("setTerminateAndWait:%s\n", m_name.c_str())); LOGDEB(("setTerminateAndWait:%s\n", m_name.c_str()));
pthread_mutex_lock(&m_mutex);
if (m_worker_threads.empty()) { if (m_worker_threads.empty()) {
// Already called ? // Already called ?
@ -198,8 +194,7 @@ public:
while (m_workers_exited < m_worker_threads.size()) { while (m_workers_exited < m_worker_threads.size()) {
pthread_cond_broadcast(&m_wcond); pthread_cond_broadcast(&m_wcond);
m_clients_waiting++; m_clients_waiting++;
if (pthread_cond_wait(&m_ccond, &m_mutex)) { if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
pthread_mutex_unlock(&m_mutex);
LOGERR(("WorkQueue::setTerminate:%s: cond_wait failed\n", LOGERR(("WorkQueue::setTerminate:%s: cond_wait failed\n",
m_name.c_str())); m_name.c_str()));
m_clients_waiting--; m_clients_waiting--;
@ -222,8 +217,13 @@ public:
statusall = status; statusall = status;
m_worker_threads.erase(it); m_worker_threads.erase(it);
} }
// Reset to start state.
m_workers_waiting = m_workers_exited = m_clients_waiting = m_tottasks =
m_nowake = m_workersleeps = 0;
m_ok = true;
LOGDEB(("setTerminateAndWait:%s done\n", m_name.c_str())); LOGDEB(("setTerminateAndWait:%s done\n", m_name.c_str()));
pthread_mutex_unlock(&m_mutex);
return statusall; return statusall;
} }
@ -234,7 +234,8 @@ public:
*/ */
bool take(T* tp, size_t *szp = 0) bool take(T* tp, size_t *szp = 0)
{ {
if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) { PTMutexLocker lock(m_mutex);
if (!lock.ok() || !ok()) {
LOGDEB(("WorkQueue::take:%s: not ok\n", m_name.c_str())); LOGDEB(("WorkQueue::take:%s: not ok\n", m_name.c_str()));
return false; return false;
} }
@ -244,13 +245,12 @@ public:
m_workers_waiting++; m_workers_waiting++;
if (m_queue.empty()) if (m_queue.empty())
pthread_cond_broadcast(&m_ccond); pthread_cond_broadcast(&m_ccond);
if (pthread_cond_wait(&m_wcond, &m_mutex) || !ok()) { if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
// !ok is a normal condition when shutting down // !ok is a normal condition when shutting down
if (ok()) if (ok())
LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n", LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n",
m_name.c_str())); m_name.c_str()));
m_workers_waiting--; m_workers_waiting--;
pthread_mutex_unlock(&m_mutex);
return false; return false;
} }
m_workers_waiting--; m_workers_waiting--;
@ -267,7 +267,6 @@ public:
} else { } else {
m_nowake++; m_nowake++;
} }
pthread_mutex_unlock(&m_mutex);
return true; return true;
} }
@ -281,19 +280,16 @@ public:
void workerExit() void workerExit()
{ {
LOGDEB(("workerExit:%s\n", m_name.c_str())); LOGDEB(("workerExit:%s\n", m_name.c_str()));
if (pthread_mutex_lock(&m_mutex) != 0) PTMutexLocker lock(m_mutex);
return;
m_workers_exited++; m_workers_exited++;
m_ok = false; m_ok = false;
pthread_cond_broadcast(&m_ccond); pthread_cond_broadcast(&m_ccond);
pthread_mutex_unlock(&m_mutex);
} }
size_t qsize() size_t qsize()
{ {
pthread_mutex_lock(&m_mutex); PTMutexLocker lock(m_mutex);
size_t sz = m_queue.size(); size_t sz = m_queue.size();
pthread_mutex_unlock(&m_mutex);
return sz; return sz;
} }
@ -330,7 +326,7 @@ private:
queue<T> m_queue; queue<T> m_queue;
pthread_cond_t m_ccond; pthread_cond_t m_ccond;
pthread_cond_t m_wcond; pthread_cond_t m_wcond;
pthread_mutex_t m_mutex; PTMutexInit m_mutex;
unsigned int m_clients_waiting; unsigned int m_clients_waiting;
unsigned int m_tottasks; unsigned int m_tottasks;
unsigned int m_nowake; unsigned int m_nowake;