diff --git a/src/index/fsindexer.cpp b/src/index/fsindexer.cpp index 920898ce..ba0f8b55 100644 --- a/src/index/fsindexer.cpp +++ b/src/index/fsindexer.cpp @@ -105,6 +105,7 @@ FsIndexer::FsIndexer(RclConfig *cnf, Rcl::Db *db, DbIxStatusUpdater *updfunc) , m_iwqueue("Internfile", 2), m_dwqueue("Split", 2) #endif // IDX_THREADS { + LOGDEB1(("FsIndexer::FsIndexer\n")); m_havelocalfields = m_config->hasNameAnywhere("localfields"); #ifdef IDX_THREADS m_loglevel = DebugLog::getdbl()->getlevel(); @@ -119,13 +120,14 @@ FsIndexer::FsIndexer(RclConfig *cnf, Rcl::Db *db, DbIxStatusUpdater *updfunc) #endif // IDX_THREADS } -FsIndexer::~FsIndexer() { +FsIndexer::~FsIndexer() +{ + LOGDEB1(("FsIndexer::~FsIndexer()\n")); #ifdef IDX_THREADS void *status = m_iwqueue.setTerminateAndWait(); - LOGINFO(("FsIndexer: internfile wrker status: %ld (1->ok)\n", - long(status))); + LOGDEB0(("FsIndexer: internfile wrkr status: %ld (1->ok)\n", long(status))); status = m_dwqueue.setTerminateAndWait(); - LOGINFO(("FsIndexer: dbupd worker status: %ld (1->ok)\n", long(status))); + LOGDEB0(("FsIndexer: dbupd worker status: %ld (1->ok)\n", long(status))); #endif // IDX_THREADS delete m_missing; } @@ -305,6 +307,11 @@ bool FsIndexer::indexFiles(list& files, ConfIndexer::IxFlag flag) it = files.erase(it); } +#ifdef IDX_THREADS + m_iwqueue.waitIdle(); + m_dwqueue.waitIdle(); + m_db->waitUpdIdle(); +#endif // IDX_THREADS return true; } @@ -314,6 +321,7 @@ bool FsIndexer::purgeFiles(list& files) { if (!init()) return false; + for (list::iterator it = files.begin(); it != files.end(); ) { string udi; make_udi(*it, cstr_null, udi); @@ -378,7 +386,7 @@ void *FsIndexerDbUpdWorker(void * fsp) tqp->workerExit(); return (void*)1; } - LOGDEB(("FsIndexerDbUpdWorker: got task, ql %d\n", int(qsz))); + LOGDEB0(("FsIndexerDbUpdWorker: task ql %d\n", int(qsz))); if (!fip->m_db->addOrUpdate(tsk->config, tsk->udi, tsk->parent_udi, tsk->doc)) { LOGERR(("FsIndexerDbUpdWorker: addOrUpdate failed\n")); @@ -404,7 +412,7 @@ void *FsIndexerInternfileWorker(void * fsp) tqp->workerExit(); return (void*)1; } - LOGDEB1(("FsIndexerInternfileWorker: fn %s\n", tsk->fn.c_str())); + LOGDEB0(("FsIndexerInternfileWorker: task fn %s\n", tsk->fn.c_str())); if (fip->processonefile(myconf, tmpdir, tsk->fn, &tsk->statbuf) != FsTreeWalker::FtwOk) { LOGERR(("FsIndexerInternfileWorker: processone failed\n")); diff --git a/src/rcldb/rcldb.cpp b/src/rcldb/rcldb.cpp index 3252248d..ef82c39b 100644 --- a/src/rcldb/rcldb.cpp +++ b/src/rcldb/rcldb.cpp @@ -127,6 +127,62 @@ static inline string make_parentterm(const string& udi) return pterm; } +#ifdef IDX_THREADS +void *DbUpdWorker(void* vdbp) +{ + recoll_threadinit(); + Db::Native *ndbp = (Db::Native *)vdbp; + WorkQueue *tqp = &(ndbp->m_wqueue); + DebugLog::getdbl()->setloglevel(ndbp->m_loglevel); + + DbUpdTask *tsk; + for (;;) { + size_t qsz; + if (!tqp->take(&tsk, &qsz)) { + tqp->workerExit(); + return (void*)1; + } + LOGDEB(("DbUpdWorker: got task, ql %d\n", int(qsz))); + if (!ndbp->addOrUpdateWrite(tsk->udi, tsk->uniterm, + tsk->doc, tsk->txtlen)) { + LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n")); + tqp->workerExit(); + delete tsk; + return (void*)0; + } + delete tsk; + } +} +#endif // IDX_THREADS + +Db::Native::Native(Db *db) + : m_rcldb(db), m_isopen(false), m_iswritable(false), + m_noversionwrite(false) +#ifdef IDX_THREADS + , m_wqueue("DbUpd", 2), m_totalworkns(0LL) +#endif // IDX_THREADS +{ + LOGDEB1(("Native::Native: me %p\n", this)); +#ifdef IDX_THREADS + m_loglevel = DebugLog::getdbl()->getlevel(); + if (!m_wqueue.start(1, DbUpdWorker, this)) { + LOGERR(("Db::Db: Worker start failed\n")); + return; + } +#endif // IDX_THREADS +} + +Db::Native::~Native() +{ + LOGDEB1(("Native::~Native: me %p\n", this)); +#ifdef IDX_THREADS + if (m_iswritable) { + void *status = m_wqueue.setTerminateAndWait(); + LOGDEB2(("Native::~Native: worker status %ld\n", long(status))); + } +#endif // IDX_THREADS +} + /* See comment in class declaration: return all subdocuments of a * document given by its unique id. */ @@ -280,15 +336,6 @@ Db::Db(RclConfig *cfp) m_config->getConfParam("maxfsoccuppc", &m_maxFsOccupPc); m_config->getConfParam("idxflushmb", &m_flushMb); } -#ifdef IDX_THREADS - if (m_ndb) { - m_ndb->m_loglevel = DebugLog::getdbl()->getlevel(); - if (!m_ndb->m_wqueue.start(1, DbUpdWorker, this)) { - LOGERR(("Db::Db: Worker start failed\n")); - return; - } - } -#endif // IDX_THREADS } Db::~Db() @@ -791,34 +838,6 @@ static const string cstr_nc("\n\r\x0c"); #define RECORD_APPEND(R, NM, VAL) {R += NM + "=" + VAL + "\n";} -#ifdef IDX_THREADS -void *DbUpdWorker(void* vdbp) -{ - recoll_threadinit(); - Db *dbp = (Db *)vdbp; - WorkQueue *tqp = &(dbp->m_ndb->m_wqueue); - DebugLog::getdbl()->setloglevel(dbp->m_ndb->m_loglevel); - - DbUpdTask *tsk; - for (;;) { - size_t qsz; - if (!tqp->take(&tsk, &qsz)) { - tqp->workerExit(); - return (void*)1; - } - LOGDEB(("DbUpdWorker: got task, ql %d\n", int(qsz))); - if (!dbp->m_ndb->addOrUpdateWrite(tsk->udi, tsk->uniterm, - tsk->doc, tsk->txtlen)) { - LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n")); - tqp->workerExit(); - delete tsk; - return (void*)0; - } - delete tsk; - } -} -#endif // IDX_THREADS - // Add document in internal form to the database: index the terms in // the title abstract and body and add special terms for file name, // date, mime type etc. , create the document data record (more diff --git a/src/rcldb/rcldb.h b/src/rcldb/rcldb.h index 9e00cff0..3f9ce1be 100644 --- a/src/rcldb/rcldb.h +++ b/src/rcldb/rcldb.h @@ -123,10 +123,6 @@ public: double dbavgdoclen; }; -#ifdef IDX_THREADS -extern void *DbUpdWorker(void*); -#endif // IDX_THREADS - inline bool has_prefix(const string& trm) { #ifndef RCL_INDEX_STRIPCHARS @@ -184,9 +180,6 @@ class Db { // A place for things we don't want visible here. class Native; friend class Native; -#ifdef IDX_THREADS - friend void *DbUpdWorker(void*); -#endif // IDX_THREADS /* General stuff (valid for query or update) ****************************/ Db(RclConfig *cfp); diff --git a/src/rcldb/rcldb_p.h b/src/rcldb/rcldb_p.h index 4eaecea5..1c0ff74b 100644 --- a/src/rcldb/rcldb_p.h +++ b/src/rcldb/rcldb_p.h @@ -75,25 +75,12 @@ class Db::Native { // really know if this makes sense Xapian::Database& xdb() {return m_iswritable ? xwdb : xrdb;} - Native(Db *db) - : m_rcldb(db), m_isopen(false), m_iswritable(false), - m_noversionwrite(false) -#ifdef IDX_THREADS - , m_wqueue("DbUpd", 2), m_totalworkns(0LL) -#endif // IDX_THREADS - { - LOGDEB2(("Native::Native: me %p\n", this)); - } + Native(Db *db); + ~Native(); - ~Native() { - LOGDEB2(("Native::~Native: me %p\n", this)); #ifdef IDX_THREADS - if (m_iswritable) { - void *status = m_wqueue.setTerminateAndWait(); - LOGDEB2(("Native::~Native: worker status %ld\n", long(status))); - } + friend void *DbUpdWorker(void*); #endif // IDX_THREADS - } // Final steps of doc update, part which need to be single-threaded bool addOrUpdateWrite(const string& udi, const string& uniterm, diff --git a/src/utils/workqueue.h b/src/utils/workqueue.h index b6478bb2..30d58d22 100644 --- a/src/utils/workqueue.h +++ b/src/utils/workqueue.h @@ -107,8 +107,10 @@ public: */ bool put(T t) { - if (!ok() || pthread_mutex_lock(&m_mutex) != 0) + if (!ok() || pthread_mutex_lock(&m_mutex) != 0) { + LOGERR(("WorkQueue::put: !ok or mutex_lock failed\n")); return false; + } while (ok() && m_high > 0 && m_queue.size() >= m_high) { // Keep the order: we test ok() AFTER the sleep...