Multithreaded indexing seems not to crash anymore thanks to locked existence map

This commit is contained in:
Jean-Francois Dockes 2012-11-02 21:43:51 +01:00
parent a7304b9770
commit 570388de5a
6 changed files with 81 additions and 81 deletions

View file

@ -364,11 +364,15 @@ int main(int argc, char **argv)
} }
o_reexec->atexit(cleanup); o_reexec->atexit(cleanup);
bool rezero(op_flags & OPT_z); bool rezero((op_flags & OPT_z) != 0);
bool inPlaceReset(op_flags & OPT_Z); bool inPlaceReset((op_flags & OPT_Z) != 0);
Pidfile pidfile(config->getPidfile()); Pidfile pidfile(config->getPidfile());
updater = new MyUpdater(config); updater = new MyUpdater(config);
// Log something at LOGINFO to reset the trace file. Else at level
// 3 it's not even truncated if all docs are up to date.
LOGINFO(("recollindex: starting up\n"));
if (setpriority(PRIO_PROCESS, 0, 20) != 0) { if (setpriority(PRIO_PROCESS, 0, 20) != 0) {
LOGINFO(("recollindex: can't setpriority(), errno %d\n", errno)); LOGINFO(("recollindex: can't setpriority(), errno %d\n", errno));
} }
@ -417,7 +421,7 @@ int main(int argc, char **argv)
exit(!createstemdb(config, lang)); exit(!createstemdb(config, lang));
#ifdef RCL_USE_ASPELL #ifdef RCL_USE_ASPELL
} else if (op_flags & OPT_S) { } else if (op_flags & OPT_S) {
makeIndexerOrExit(config, inPlaceReset); makeIndexerOrExit(config, false);
exit(!confindexer->createAspellDict()); exit(!confindexer->createAspellDict());
#endif // ASPELL #endif // ASPELL

View file

@ -102,9 +102,9 @@ void returnMimeHandler(Dijon::Filter *handler)
if (once) { if (once) {
once = 0; once = 0;
for (it = o_handlers.begin(); it != o_handlers.end(); it++) { for (it = o_handlers.begin(); it != o_handlers.end(); it++) {
LOGERR(("Cache full key: %s\n", it->first.c_str())); LOGDEB1(("Cache full. key: %s\n", it->first.c_str()));
} }
LOGERR(("Cache LRU size: %u\n", o_hlru.size())); LOGDEB1(("Cache LRU size: %u\n", o_hlru.size()));
} }
if (o_hlru.size() > 0) { if (o_hlru.size() > 0) {
it = o_hlru.back(); it = o_hlru.back();

View file

@ -1097,8 +1097,11 @@ bool Db::addOrUpdate(RclConfig *config, const string &udi,
} }
bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm, bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
Xapian::Document& newdocument, size_t textlen) Xapian::Document& newdocument, size_t textlen)
{ {
#ifdef IDX_THREADS
Chrono chron;
#endif
// Check file system full every mbyte of indexed text. It's a bit wasteful // Check file system full every mbyte of indexed text. It's a bit wasteful
// to do this after having prepared the document, but it needs to be in // to do this after having prepared the document, but it needs to be in
// the single-threaded section. // the single-threaded section.
@ -1123,6 +1126,11 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
try { try {
Xapian::docid did = Xapian::docid did =
xwdb.replace_document(uniterm, newdocument); xwdb.replace_document(uniterm, newdocument);
#ifdef IDX_THREADS
// Need to protect against interaction with the up-to-date checks
// which also update the existence map
PTMutexLocker lock(m_rcldb->m_ndb->m_mutex);
#endif
if (did < m_rcldb->updated.size()) { if (did < m_rcldb->updated.size()) {
m_rcldb->updated[did] = true; m_rcldb->updated[did] = true;
LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc)); LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
@ -1147,13 +1155,18 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
} }
// Test if we're over the flush threshold (limit memory usage): // Test if we're over the flush threshold (limit memory usage):
return m_rcldb->maybeflush(textlen); bool ret = m_rcldb->maybeflush(textlen);
#ifdef IDX_THREADS
m_totalworkus += chron.micros();
#endif
return ret;
} }
#ifdef IDX_THREADS #ifdef IDX_THREADS
void Db::waitUpdIdle() void Db::waitUpdIdle()
{ {
m_ndb->m_wqueue.waitIdle(); m_ndb->m_wqueue.waitIdle();
LOGDEB(("Db::waitUpdIdle: total work %ll mS\n", m_ndb->m_totalworkus/1000));
} }
#endif #endif
@ -1185,10 +1198,10 @@ bool Db::needUpdate(const string &udi, const string& sig)
if (m_ndb == 0) if (m_ndb == 0)
return false; return false;
// If we are doing an in place reset, no need to test. Note that there is // If we are doing an in place or full reset, no need to
// no need to update the existence map either, it will be done while // test. Note that there is no need to update the existence map
// indexing // either, it will be done when updating the index
if (o_inPlaceReset) if (o_inPlaceReset || m_mode == DbTrunc)
return true; return true;
string uniterm = make_uniterm(udi); string uniterm = make_uniterm(udi);
@ -1202,7 +1215,7 @@ bool Db::needUpdate(const string &udi, const string& sig)
for (int tries = 0; tries < 2; tries++) { for (int tries = 0; tries < 2; tries++) {
try { try {
// Get the doc or pseudo-doc // Get the doc or pseudo-doc
Xapian::PostingIterator docid =m_ndb->xrdb.postlist_begin(uniterm); Xapian::PostingIterator docid = m_ndb->xrdb.postlist_begin(uniterm);
if (docid == m_ndb->xrdb.postlist_end(uniterm)) { if (docid == m_ndb->xrdb.postlist_end(uniterm)) {
// If no document exist with this path, we do need update // If no document exist with this path, we do need update
LOGDEB(("Db::needUpdate:yes (new): [%s]\n", uniterm.c_str())); LOGDEB(("Db::needUpdate:yes (new): [%s]\n", uniterm.c_str()));
@ -1228,7 +1241,12 @@ bool Db::needUpdate(const string &udi, const string& sig)
// Set the uptodate flag for doc / pseudo doc // Set the uptodate flag for doc / pseudo doc
if (m_mode != DbRO) { if (m_mode != DbRO) {
#warning we need a lock here ! #ifdef IDX_THREADS
// Need to protect against interaction with the doc
// update/insert thread which also updates the
// existence map
PTMutexLocker lock(m_ndb->m_mutex);
#endif
updated[*docid] = true; updated[*docid] = true;
// Set the existence flag for all the subdocs (if any) // Set the existence flag for all the subdocs (if any)
@ -1336,6 +1354,8 @@ bool Db::purge()
// Walk the document array and delete any xapian document whose // Walk the document array and delete any xapian document whose
// flag is not set (we did not see its source during indexing). // flag is not set (we did not see its source during indexing).
// Threads: we do not need a mutex here as the indexing threads
// are necessarily done at this point.
int purgecount = 0; int purgecount = 0;
for (Xapian::docid docid = 1; docid < updated.size(); ++docid) { for (Xapian::docid docid = 1; docid < updated.size(); ++docid) {
if (!updated[docid]) { if (!updated[docid]) {

View file

@ -371,7 +371,7 @@ class Db {
Activate the "in place reset" mode where all documents are Activate the "in place reset" mode where all documents are
considered as needing update. This is a global/per-process considered as needing update. This is a global/per-process
option, and can't be reset. It should be set at the start of option, and can't be reset. It should be set at the start of
the indexing pass the indexing pass. 2012-10: no idea why this is done this way...
*/ */
static void setInPlaceReset() {o_inPlaceReset = true;} static void setInPlaceReset() {o_inPlaceReset = true;}

View file

@ -63,6 +63,7 @@ class Db::Native {
WorkQueue<DbUpdTask*> m_wqueue; WorkQueue<DbUpdTask*> m_wqueue;
int m_loglevel; int m_loglevel;
PTMutexInit m_mutex; PTMutexInit m_mutex;
long long m_totalworkus;
#endif // IDX_THREADS #endif // IDX_THREADS
// Indexing // Indexing
@ -78,7 +79,7 @@ class Db::Native {
: m_rcldb(db), m_isopen(false), m_iswritable(false), : m_rcldb(db), m_isopen(false), m_iswritable(false),
m_noversionwrite(false) m_noversionwrite(false)
#ifdef IDX_THREADS #ifdef IDX_THREADS
, m_wqueue("DbUpd", 2) , m_wqueue("DbUpd", 2), m_totalworkus(0LL)
#endif // IDX_THREADS #endif // IDX_THREADS
{ {
LOGDEB2(("Native::Native: me %p\n", this)); LOGDEB2(("Native::Native: me %p\n", this));

View file

@ -31,8 +31,6 @@ using std::string;
#include "debuglog.h" #include "debuglog.h"
#define WORKQUEUE_TIMING
class WQTData { class WQTData {
public: public:
WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;} WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
@ -63,10 +61,11 @@ public:
*/ */
WorkQueue(const string& name, int hi = 0, int lo = 1) WorkQueue(const string& name, int hi = 0, int lo = 1)
: m_name(name), m_high(hi), m_low(lo), m_size(0), : m_name(name), m_high(hi), m_low(lo), m_size(0),
m_workers_waiting(0), m_workers_exited(0), m_jobcnt(0), m_workers_waiting(0), m_workers_exited(0),
m_clientwait(0), m_workerwait(0), m_workerwork(0) m_clients_waiting(0), m_tottasks(0), m_nowake(0)
{ {
m_ok = (pthread_cond_init(&m_cond, 0) == 0) && m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
(pthread_cond_init(&m_wcond, 0) == 0) &&
(pthread_mutex_init(&m_mutex, 0) == 0); (pthread_mutex_init(&m_mutex, 0) == 0);
} }
@ -109,29 +108,26 @@ public:
if (!ok() || pthread_mutex_lock(&m_mutex) != 0) if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
return false; return false;
#ifdef WORKQUEUE_TIMING
struct timespec before;
clock_gettime(CLOCK_MONOTONIC, &before);
#endif
while (ok() && m_high > 0 && m_queue.size() >= m_high) { while (ok() && m_high > 0 && m_queue.size() >= m_high) {
// Keep the order: we test ok() AFTER the sleep... // Keep the order: we test ok() AFTER the sleep...
if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) { m_clients_waiting++;
if (pthread_cond_wait(&m_ccond, &m_mutex) || !ok()) {
pthread_mutex_unlock(&m_mutex); pthread_mutex_unlock(&m_mutex);
m_clients_waiting--;
return false; return false;
} }
m_clients_waiting--;
} }
#ifdef WORKQUEUE_TIMING
struct timespec after;
clock_gettime(CLOCK_MONOTONIC, &after);
m_clientwait += nanodiff(before, after);
#endif
m_queue.push(t); m_queue.push(t);
++m_size; ++m_size;
// Just wake one worker, there is only one new task. if (m_workers_waiting > 0) {
pthread_cond_signal(&m_cond); // Just wake one worker, there is only one new task.
pthread_cond_signal(&m_wcond);
} else {
m_nowake++;
}
pthread_mutex_unlock(&m_mutex); pthread_mutex_unlock(&m_mutex);
return true; return true;
} }
@ -156,23 +152,17 @@ public:
// waiting for a task. // waiting for a task.
while (ok() && (m_queue.size() > 0 || while (ok() && (m_queue.size() > 0 ||
m_workers_waiting != m_worker_threads.size())) { m_workers_waiting != m_worker_threads.size())) {
if (pthread_cond_wait(&m_cond, &m_mutex)) { m_clients_waiting++;
if (pthread_cond_wait(&m_ccond, &m_mutex)) {
m_clients_waiting--;
pthread_mutex_unlock(&m_mutex); pthread_mutex_unlock(&m_mutex);
m_ok = false; m_ok = false;
LOGERR(("WorkQueue::waitIdle: cond_wait failed\n")); LOGERR(("WorkQueue::waitIdle: cond_wait failed\n"));
return false; return false;
} }
m_clients_waiting--;
} }
#ifdef WORKQUEUE_TIMING
long long M = 1000000LL;
long long wscl = m_worker_threads.size() * M;
LOGERR(("WorkQueue:%s: clients wait (all) %lld mS, "
"worker wait (avg) %lld mS, worker work (avg) %lld mS\n",
m_name.c_str(), m_clientwait / M, m_workerwait / wscl,
m_workerwork / wscl));
#endif // WORKQUEUE_TIMING
pthread_mutex_unlock(&m_mutex); pthread_mutex_unlock(&m_mutex);
return ok(); return ok();
} }
@ -195,14 +185,19 @@ public:
// Wait for all worker threads to have called workerExit() // Wait for all worker threads to have called workerExit()
m_ok = false; m_ok = false;
while (m_workers_exited < m_worker_threads.size()) { while (m_workers_exited < m_worker_threads.size()) {
pthread_cond_broadcast(&m_cond); pthread_cond_broadcast(&m_wcond);
if (pthread_cond_wait(&m_cond, &m_mutex)) { m_clients_waiting++;
if (pthread_cond_wait(&m_ccond, &m_mutex)) {
pthread_mutex_unlock(&m_mutex); pthread_mutex_unlock(&m_mutex);
LOGERR(("WorkQueue::setTerminate: cond_wait failed\n")); LOGERR(("WorkQueue::setTerminate: cond_wait failed\n"));
m_clients_waiting--;
return false; return false;
} }
m_clients_waiting--;
} }
LOGDEB(("%s: %u tasks %u nowakes\n", m_name.c_str(), m_tottasks,
m_nowake));
// Perform the thread joins and compute overall status // Perform the thread joins and compute overall status
// Workers return (void*)1 if ok // Workers return (void*)1 if ok
void *statusall = (void*)1; void *statusall = (void*)1;
@ -230,25 +225,11 @@ public:
if (!ok() || pthread_mutex_lock(&m_mutex) != 0) if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
return false; return false;
#ifdef WORKQUEUE_TIMING
struct timespec beforesleep;
clock_gettime(CLOCK_MONOTONIC, &beforesleep);
pthread_t me = pthread_self();
unordered_map<pthread_t, WQTData>::iterator it =
m_worker_threads.find(me);
if (it != m_worker_threads.end() &&
it->second.wstart.tv_sec != 0 && it->second.wstart.tv_nsec != 0) {
long long nanos = nanodiff(it->second.wstart, beforesleep);
m_workerwork += nanos;
}
#endif
while (ok() && m_queue.size() < m_low) { while (ok() && m_queue.size() < m_low) {
m_workers_waiting++; m_workers_waiting++;
if (m_queue.empty()) if (m_queue.empty())
pthread_cond_broadcast(&m_cond); pthread_cond_broadcast(&m_ccond);
if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) { if (pthread_cond_wait(&m_wcond, &m_mutex) || !ok()) {
// !ok is a normal condition when shutting down // !ok is a normal condition when shutting down
if (ok()) if (ok())
LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n", LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n",
@ -260,21 +241,16 @@ public:
m_workers_waiting--; m_workers_waiting--;
} }
#ifdef WORKQUEUE_TIMING m_tottasks++;
struct timespec aftersleep;
clock_gettime(CLOCK_MONOTONIC, &aftersleep);
m_workerwait += nanodiff(beforesleep, aftersleep);
it = m_worker_threads.find(me);
if (it != m_worker_threads.end())
it->second.wstart = aftersleep;
#endif
++m_jobcnt;
*tp = m_queue.front(); *tp = m_queue.front();
m_queue.pop(); m_queue.pop();
--m_size; --m_size;
// No reason to wake up more than one client thread if (m_clients_waiting > 0) {
pthread_cond_signal(&m_cond); // No reason to wake up more than one client thread
pthread_cond_signal(&m_ccond);
} else {
m_nowake++;
}
pthread_mutex_unlock(&m_mutex); pthread_mutex_unlock(&m_mutex);
return true; return true;
} }
@ -292,7 +268,7 @@ public:
return; return;
m_workers_exited++; m_workers_exited++;
m_ok = false; m_ok = false;
pthread_cond_broadcast(&m_cond); pthread_cond_broadcast(&m_ccond);
pthread_mutex_unlock(&m_mutex); pthread_mutex_unlock(&m_mutex);
} }
@ -327,16 +303,15 @@ private:
/* Worker threads currently waiting for a job */ /* Worker threads currently waiting for a job */
unsigned int m_workers_waiting; unsigned int m_workers_waiting;
unsigned int m_workers_exited; unsigned int m_workers_exited;
/* Stats */
int m_jobcnt;
long long m_clientwait;
long long m_workerwait;
long long m_workerwork;
unordered_map<pthread_t, WQTData> m_worker_threads; unordered_map<pthread_t, WQTData> m_worker_threads;
queue<T> m_queue; queue<T> m_queue;
pthread_cond_t m_cond; pthread_cond_t m_ccond;
pthread_cond_t m_wcond;
pthread_mutex_t m_mutex; pthread_mutex_t m_mutex;
unsigned int m_clients_waiting;
unsigned int m_tottasks;
unsigned int m_nowake;
bool m_ok; bool m_ok;
}; };