diff --git a/src/index/recollindex.cpp b/src/index/recollindex.cpp index b823495f..bcdf4e59 100644 --- a/src/index/recollindex.cpp +++ b/src/index/recollindex.cpp @@ -364,11 +364,15 @@ int main(int argc, char **argv) } o_reexec->atexit(cleanup); - bool rezero(op_flags & OPT_z); - bool inPlaceReset(op_flags & OPT_Z); + bool rezero((op_flags & OPT_z) != 0); + bool inPlaceReset((op_flags & OPT_Z) != 0); Pidfile pidfile(config->getPidfile()); updater = new MyUpdater(config); + // Log something at LOGINFO to reset the trace file. Else at level + // 3 it's not even truncated if all docs are up to date. + LOGINFO(("recollindex: starting up\n")); + if (setpriority(PRIO_PROCESS, 0, 20) != 0) { LOGINFO(("recollindex: can't setpriority(), errno %d\n", errno)); } @@ -417,7 +421,7 @@ int main(int argc, char **argv) exit(!createstemdb(config, lang)); #ifdef RCL_USE_ASPELL } else if (op_flags & OPT_S) { - makeIndexerOrExit(config, inPlaceReset); + makeIndexerOrExit(config, false); exit(!confindexer->createAspellDict()); #endif // ASPELL diff --git a/src/internfile/mimehandler.cpp b/src/internfile/mimehandler.cpp index cdc0304b..fda87fc0 100644 --- a/src/internfile/mimehandler.cpp +++ b/src/internfile/mimehandler.cpp @@ -102,9 +102,9 @@ void returnMimeHandler(Dijon::Filter *handler) if (once) { once = 0; for (it = o_handlers.begin(); it != o_handlers.end(); it++) { - LOGERR(("Cache full key: %s\n", it->first.c_str())); + LOGDEB1(("Cache full. 
key: %s\n", it->first.c_str())); } - LOGERR(("Cache LRU size: %u\n", o_hlru.size())); + LOGDEB1(("Cache LRU size: %u\n", o_hlru.size())); } if (o_hlru.size() > 0) { it = o_hlru.back(); diff --git a/src/rcldb/rcldb.cpp b/src/rcldb/rcldb.cpp index 0b4c6b48..22ab241d 100644 --- a/src/rcldb/rcldb.cpp +++ b/src/rcldb/rcldb.cpp @@ -1097,8 +1097,11 @@ bool Db::addOrUpdate(RclConfig *config, const string &udi, } bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm, - Xapian::Document& newdocument, size_t textlen) + Xapian::Document& newdocument, size_t textlen) { +#ifdef IDX_THREADS + Chrono chron; +#endif // Check file system full every mbyte of indexed text. It's a bit wasteful // to do this after having prepared the document, but it needs to be in // the single-threaded section. @@ -1123,6 +1126,11 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm, try { Xapian::docid did = xwdb.replace_document(uniterm, newdocument); +#ifdef IDX_THREADS + // Need to protect against interaction with the up-to-date checks + // which also update the existence map + PTMutexLocker lock(m_rcldb->m_ndb->m_mutex); +#endif if (did < m_rcldb->updated.size()) { m_rcldb->updated[did] = true; LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc)); @@ -1147,13 +1155,18 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm, } // Test if we're over the flush threshold (limit memory usage): - return m_rcldb->maybeflush(textlen); + bool ret = m_rcldb->maybeflush(textlen); +#ifdef IDX_THREADS + m_totalworkus += chron.micros(); +#endif + return ret; } #ifdef IDX_THREADS void Db::waitUpdIdle() { m_ndb->m_wqueue.waitIdle(); + LOGDEB(("Db::waitUpdIdle: total work %lld mS\n", m_ndb->m_totalworkus/1000)); /* %lld: m_totalworkus is a long long */ } #endif @@ -1185,10 +1198,10 @@ bool Db::needUpdate(const string &udi, const string& sig) if (m_ndb == 0) return false; - // If we are doing an in place reset, no need to test. 
Note that there is - // no need to update the existence map either, it will be done while - // indexing - if (o_inPlaceReset) + // If we are doing an in place or full reset, no need to + // test. Note that there is no need to update the existence map + // either, it will be done when updating the index + if (o_inPlaceReset || m_mode == DbTrunc) return true; string uniterm = make_uniterm(udi); @@ -1202,7 +1215,7 @@ bool Db::needUpdate(const string &udi, const string& sig) for (int tries = 0; tries < 2; tries++) { try { // Get the doc or pseudo-doc - Xapian::PostingIterator docid =m_ndb->xrdb.postlist_begin(uniterm); + Xapian::PostingIterator docid = m_ndb->xrdb.postlist_begin(uniterm); if (docid == m_ndb->xrdb.postlist_end(uniterm)) { // If no document exist with this path, we do need update LOGDEB(("Db::needUpdate:yes (new): [%s]\n", uniterm.c_str())); @@ -1228,7 +1241,12 @@ bool Db::needUpdate(const string &udi, const string& sig) // Set the uptodate flag for doc / pseudo doc if (m_mode != DbRO) { -#warning we need a lock here ! +#ifdef IDX_THREADS + // Need to protect against interaction with the doc + // update/insert thread which also updates the + // existence map + PTMutexLocker lock(m_ndb->m_mutex); +#endif updated[*docid] = true; // Set the existence flag for all the subdocs (if any) @@ -1336,6 +1354,8 @@ bool Db::purge() // Walk the document array and delete any xapian document whose // flag is not set (we did not see its source during indexing). + // Threads: we do not need a mutex here as the indexing threads + // are necessarily done at this point. int purgecount = 0; for (Xapian::docid docid = 1; docid < updated.size(); ++docid) { if (!updated[docid]) { diff --git a/src/rcldb/rcldb.h b/src/rcldb/rcldb.h index 5a238831..fa9dda09 100644 --- a/src/rcldb/rcldb.h +++ b/src/rcldb/rcldb.h @@ -371,7 +371,7 @@ class Db { Activate the "in place reset" mode where all documents are considered as needing update. 
This is a global/per-process option, and can't be reset. It should be set at the start of - the indexing pass + the indexing pass. 2012-10: no idea why this is done this way... */ static void setInPlaceReset() {o_inPlaceReset = true;} diff --git a/src/rcldb/rcldb_p.h b/src/rcldb/rcldb_p.h index bb648209..d5af6152 100644 --- a/src/rcldb/rcldb_p.h +++ b/src/rcldb/rcldb_p.h @@ -63,6 +63,7 @@ class Db::Native { WorkQueue m_wqueue; int m_loglevel; PTMutexInit m_mutex; + long long m_totalworkus; #endif // IDX_THREADS // Indexing @@ -78,7 +79,7 @@ class Db::Native { : m_rcldb(db), m_isopen(false), m_iswritable(false), m_noversionwrite(false) #ifdef IDX_THREADS - , m_wqueue("DbUpd", 2) + , m_wqueue("DbUpd", 2), m_totalworkus(0LL) #endif // IDX_THREADS { LOGDEB2(("Native::Native: me %p\n", this)); diff --git a/src/utils/workqueue.h b/src/utils/workqueue.h index 102f6cd8..d604dfb8 100644 --- a/src/utils/workqueue.h +++ b/src/utils/workqueue.h @@ -31,8 +31,6 @@ using std::string; #include "debuglog.h" -#define WORKQUEUE_TIMING - class WQTData { public: WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;} @@ -63,10 +61,11 @@ public: */ WorkQueue(const string& name, int hi = 0, int lo = 1) : m_name(name), m_high(hi), m_low(lo), m_size(0), - m_workers_waiting(0), m_workers_exited(0), m_jobcnt(0), - m_clientwait(0), m_workerwait(0), m_workerwork(0) + m_workers_waiting(0), m_workers_exited(0), + m_clients_waiting(0), m_tottasks(0), m_nowake(0) { - m_ok = (pthread_cond_init(&m_cond, 0) == 0) && + m_ok = (pthread_cond_init(&m_ccond, 0) == 0) && + (pthread_cond_init(&m_wcond, 0) == 0) && (pthread_mutex_init(&m_mutex, 0) == 0); } @@ -109,29 +108,26 @@ public: if (!ok() || pthread_mutex_lock(&m_mutex) != 0) return false; -#ifdef WORKQUEUE_TIMING - struct timespec before; - clock_gettime(CLOCK_MONOTONIC, &before); -#endif - while (ok() && m_high > 0 && m_queue.size() >= m_high) { // Keep the order: we test ok() AFTER the sleep... 
- if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) { + m_clients_waiting++; + if (pthread_cond_wait(&m_ccond, &m_mutex) || !ok()) { + m_clients_waiting--; /* update the counter before releasing m_mutex */ pthread_mutex_unlock(&m_mutex); return false; } + m_clients_waiting--; } -#ifdef WORKQUEUE_TIMING - struct timespec after; - clock_gettime(CLOCK_MONOTONIC, &after); - m_clientwait += nanodiff(before, after); -#endif - m_queue.push(t); ++m_size; - // Just wake one worker, there is only one new task. - pthread_cond_signal(&m_cond); + if (m_workers_waiting > 0) { + // Just wake one worker, there is only one new task. + pthread_cond_signal(&m_wcond); + } else { + m_nowake++; + } + pthread_mutex_unlock(&m_mutex); return true; } @@ -156,23 +152,17 @@ public: // waiting for a task. while (ok() && (m_queue.size() > 0 || m_workers_waiting != m_worker_threads.size())) { - if (pthread_cond_wait(&m_cond, &m_mutex)) { + m_clients_waiting++; + if (pthread_cond_wait(&m_ccond, &m_mutex)) { + m_clients_waiting--; pthread_mutex_unlock(&m_mutex); m_ok = false; LOGERR(("WorkQueue::waitIdle: cond_wait failed\n")); return false; } + m_clients_waiting--; } -#ifdef WORKQUEUE_TIMING - long long M = 1000000LL; - long long wscl = m_worker_threads.size() * M; - LOGERR(("WorkQueue:%s: clients wait (all) %lld mS, " - "worker wait (avg) %lld mS, worker work (avg) %lld mS\n", - m_name.c_str(), m_clientwait / M, m_workerwait / wscl, - m_workerwork / wscl)); -#endif // WORKQUEUE_TIMING - pthread_mutex_unlock(&m_mutex); return ok(); } @@ -195,14 +185,19 @@ public: // Wait for all worker threads to have called workerExit() m_ok = false; while (m_workers_exited < m_worker_threads.size()) { - pthread_cond_broadcast(&m_cond); - if (pthread_cond_wait(&m_cond, &m_mutex)) { + pthread_cond_broadcast(&m_wcond); + m_clients_waiting++; + if (pthread_cond_wait(&m_ccond, &m_mutex)) { + m_clients_waiting--; /* update the counter before releasing m_mutex */ pthread_mutex_unlock(&m_mutex); LOGERR(("WorkQueue::setTerminate: cond_wait failed\n")); return false; } + m_clients_waiting--; } + LOGDEB(("%s: %u tasks 
%u nowakes\n", m_name.c_str(), m_tottasks, + m_nowake)); // Perform the thread joins and compute overall status // Workers return (void*)1 if ok void *statusall = (void*)1; @@ -230,25 +225,11 @@ public: if (!ok() || pthread_mutex_lock(&m_mutex) != 0) return false; -#ifdef WORKQUEUE_TIMING - struct timespec beforesleep; - clock_gettime(CLOCK_MONOTONIC, &beforesleep); - - pthread_t me = pthread_self(); - unordered_map::iterator it = - m_worker_threads.find(me); - if (it != m_worker_threads.end() && - it->second.wstart.tv_sec != 0 && it->second.wstart.tv_nsec != 0) { - long long nanos = nanodiff(it->second.wstart, beforesleep); - m_workerwork += nanos; - } -#endif - while (ok() && m_queue.size() < m_low) { m_workers_waiting++; if (m_queue.empty()) - pthread_cond_broadcast(&m_cond); - if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) { + pthread_cond_broadcast(&m_ccond); + if (pthread_cond_wait(&m_wcond, &m_mutex) || !ok()) { // !ok is a normal condition when shutting down if (ok()) LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n", @@ -260,21 +241,16 @@ public: m_workers_waiting--; } -#ifdef WORKQUEUE_TIMING - struct timespec aftersleep; - clock_gettime(CLOCK_MONOTONIC, &aftersleep); - m_workerwait += nanodiff(beforesleep, aftersleep); - it = m_worker_threads.find(me); - if (it != m_worker_threads.end()) - it->second.wstart = aftersleep; -#endif - - ++m_jobcnt; + m_tottasks++; *tp = m_queue.front(); m_queue.pop(); --m_size; - // No reason to wake up more than one client thread - pthread_cond_signal(&m_cond); + if (m_clients_waiting > 0) { + // No reason to wake up more than one client thread + pthread_cond_signal(&m_ccond); + } else { + m_nowake++; + } pthread_mutex_unlock(&m_mutex); return true; } @@ -292,7 +268,7 @@ public: return; m_workers_exited++; m_ok = false; - pthread_cond_broadcast(&m_cond); + pthread_cond_broadcast(&m_ccond); pthread_mutex_unlock(&m_mutex); } @@ -327,16 +303,15 @@ private: /* Worker threads currently waiting for a job */ unsigned int 
m_workers_waiting; unsigned int m_workers_exited; - /* Stats */ - int m_jobcnt; - long long m_clientwait; - long long m_workerwait; - long long m_workerwork; unordered_map m_worker_threads; queue m_queue; - pthread_cond_t m_cond; + pthread_cond_t m_ccond; + pthread_cond_t m_wcond; pthread_mutex_t m_mutex; + unsigned int m_clients_waiting; + unsigned int m_tottasks; + unsigned int m_nowake; bool m_ok; };