diff --git a/src/common/rclconfig.cpp b/src/common/rclconfig.cpp index 65457513..d4339edd 100644 --- a/src/common/rclconfig.cpp +++ b/src/common/rclconfig.cpp @@ -343,26 +343,48 @@ bool RclConfig::getConfParam(const string &name, vector *svvp) const return false; return stringToStrings(s, *svvp); } -bool RclConfig::getConfParam(const string &name, list *svvp) const + +bool RclConfig::getConfParam(const string &name, vector *vip) const { - if (!svvp) + if (!vip) return false; - svvp->clear(); - string s; - if (!getConfParam(name, s)) + vip->clear(); + vector vs; + if (!getConfParam(name, &vs)) return false; - return stringToStrings(s, *svvp); + vip->reserve(vs.size()); + for (unsigned int i = 0; i < vs.size(); i++) { + char *ep; + vip->push_back(strtol(vs[i].c_str(), &ep, 0)); + if (ep == vs[i].c_str()) { + LOGDEB(("RclConfig::getConfParam: bad int value in [%s]\n", + name.c_str())); + return false; + } + } + return true; } -list RclConfig::getTopdirs() const +pair RclConfig::getThrConf(ThrStage who) const { - list tdl; + vector vq; + vector vt; + if (!getConfParam("thrQSizes", &vq) || !getConfParam("thrTCounts", &vt)) { + return pair(-1,-1); + } + return pair(vq[who], vt[who]); +} + +vector RclConfig::getTopdirs() const +{ + vector tdl; if (!getConfParam("topdirs", &tdl)) { - LOGERR(("RclConfig::getTopdirs: no top directories in config or bad list format\n")); + LOGERR(("RclConfig::getTopdirs: no top directories in config or " + "bad list format\n")); return tdl; } - for (list::iterator it = tdl.begin(); it != tdl.end(); it++) { + for (vector::iterator it = tdl.begin(); it != tdl.end(); it++) { *it = path_tildexpand(*it); *it = path_canon(*it); } diff --git a/src/common/rclconfig.h b/src/common/rclconfig.h index fdb1ac84..175193e7 100644 --- a/src/common/rclconfig.h +++ b/src/common/rclconfig.h @@ -112,10 +112,14 @@ class RclConfig { bool getConfParam(const string &name, int *value) const; /** Variant with autoconversion to bool */ bool getConfParam(const string &name, bool *value) const; - /** Variant with conversion to string list/vector + /** Variant with conversion to vector * (stringToStrings). Can fail if the string is malformed. */ bool getConfParam(const string &name, vector *value) const; - bool getConfParam(const string &name, list *value) const; + /** Variant with conversion to vector */ + bool getConfParam(const string &name, vector *value) const; + + enum ThrStage {ThrIntern=0, ThrSplit=1, ThrDbWrite=2}; + pair getThrConf(ThrStage who) const; /** * Get list of config names under current sk, with possible @@ -140,7 +144,7 @@ class RclConfig { /** Get list of top directories. This is needed from a number of places * and needs some cleaning-up code. An empty list is always an error, no * need for other status */ - list getTopdirs() const; + vector getTopdirs() const; /** Get database directory */ string getDbDir() const; diff --git a/src/index/fsindexer.cpp b/src/index/fsindexer.cpp index ba0f8b55..2cb49abb 100644 --- a/src/index/fsindexer.cpp +++ b/src/index/fsindexer.cpp @@ -102,33 +102,58 @@ FsIndexer::FsIndexer(RclConfig *cnf, Rcl::Db *db, DbIxStatusUpdater *updfunc) : m_config(cnf), m_db(db), m_updater(updfunc), m_missing(new FSIFIMissingStore) #ifdef IDX_THREADS - , m_iwqueue("Internfile", 2), m_dwqueue("Split", 2) + , m_iwqueue("Internfile", cnf->getThrConf(RclConfig::ThrIntern).first), + m_dwqueue("Split", cnf->getThrConf(RclConfig::ThrSplit).first) #endif // IDX_THREADS { LOGDEB1(("FsIndexer::FsIndexer\n")); m_havelocalfields = m_config->hasNameAnywhere("localfields"); + #ifdef IDX_THREADS m_loglevel = DebugLog::getdbl()->getlevel(); - if (!m_iwqueue.start(4, FsIndexerInternfileWorker, this)) { - LOGERR(("FsIndexer::FsIndexer: worker start failed\n")); - return; - } - if (!m_dwqueue.start(2, FsIndexerDbUpdWorker, this)) { - LOGERR(("FsIndexer::FsIndexer: worker start failed\n")); - return; + m_haveInternQ = m_haveSplitQ = false; + int internqlen = cnf->getThrConf(RclConfig::ThrIntern).first; + int internthreads = cnf->getThrConf(RclConfig::ThrIntern).second; + if (internqlen >= 0) { + if (!m_iwqueue.start(internthreads, FsIndexerInternfileWorker, this)) { + LOGERR(("FsIndexer::FsIndexer: intern worker start failed\n")); + return; + } + m_haveInternQ = true; + } + int splitqlen = cnf->getThrConf(RclConfig::ThrSplit).first; + int splitthreads = cnf->getThrConf(RclConfig::ThrSplit).second; + if (splitqlen >= 0) { + if (!m_dwqueue.start(splitthreads, FsIndexerDbUpdWorker, this)) { + LOGERR(("FsIndexer::FsIndexer: split worker start failed\n")); + return; + } + m_haveSplitQ = true; } + LOGDEB(("FsIndexer: threads: haveIQ %d iql %d iqts %d " + "haveSQ %d sql %d sqts %d\n", m_haveInternQ, internqlen, + internthreads, m_haveSplitQ, splitqlen, splitthreads)); #endif // IDX_THREADS } FsIndexer::~FsIndexer() { LOGDEB1(("FsIndexer::~FsIndexer()\n")); + #ifdef IDX_THREADS - void *status = m_iwqueue.setTerminateAndWait(); - LOGDEB0(("FsIndexer: internfile wrkr status: %ld (1->ok)\n", long(status))); - status = m_dwqueue.setTerminateAndWait(); - LOGDEB0(("FsIndexer: dbupd worker status: %ld (1->ok)\n", long(status))); + void *status; + if (m_haveInternQ) { + status = m_iwqueue.setTerminateAndWait(); + LOGDEB0(("FsIndexer: internfile wrkr status: %ld (1->ok)\n", + long(status))); + } + if (m_haveSplitQ) { + status = m_dwqueue.setTerminateAndWait(); + LOGDEB0(("FsIndexer: dbupd worker status: %ld (1->ok)\n", + long(status))); + } #endif // IDX_THREADS + delete m_missing; } @@ -161,7 +186,7 @@ bool FsIndexer::index() m_walker.setSkippedPaths(m_config->getSkippedPaths()); m_walker.addSkippedPath(path_tildexpand("~/.beagle")); - for (list::const_iterator it = m_tdl.begin(); + for (vector::const_iterator it = m_tdl.begin(); it != m_tdl.end(); it++) { LOGDEB(("FsIndexer::index: Indexing %s into %s\n", it->c_str(), getDbDir().c_str())); @@ -191,8 +216,10 @@ bool FsIndexer::index() } #ifdef IDX_THREADS - m_iwqueue.waitIdle(); - m_dwqueue.waitIdle(); + if (m_haveInternQ) + m_iwqueue.waitIdle(); + if (m_haveSplitQ) + m_dwqueue.waitIdle(); m_db->waitUpdIdle(); #endif // IDX_THREADS @@ -209,13 +236,14 @@ bool FsIndexer::index() return true; } -static bool matchesSkipped(const list& tdl, +static bool matchesSkipped(const vector& tdl, FsTreeWalker& walker, const string& path) { // First check what (if any) topdir this is in: string td; - for (list::const_iterator it = tdl.begin(); it != tdl.end(); it++) { + for (vector::const_iterator it = tdl.begin(); + it != tdl.end(); it++) { if (path.find(*it) == 0) { td = *it; break; @@ -308,10 +336,13 @@ bool FsIndexer::indexFiles(list& files, ConfIndexer::IxFlag flag) } #ifdef IDX_THREADS - m_iwqueue.waitIdle(); - m_dwqueue.waitIdle(); + if (m_haveInternQ) + m_iwqueue.waitIdle(); + if (m_haveSplitQ) + m_dwqueue.waitIdle(); m_db->waitUpdIdle(); #endif // IDX_THREADS + return true; } @@ -372,6 +403,10 @@ void FsIndexer::makesig(const struct stat *stp, string& out) } #ifdef IDX_THREADS +// Called updworker as seen from here, but the first step (and only in +// most meaningful configurations) is doing the word-splitting, which +// is why the task is referred as "Split" in the grand scheme of +// things. An other stage usually deals with the actual index update. void *FsIndexerDbUpdWorker(void * fsp) { recoll_threadinit(); @@ -471,13 +506,18 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp, } #ifdef IDX_THREADS - InternfileTask *tp = new InternfileTask(fn, stp); - if (!m_iwqueue.put(tp)) - return FsTreeWalker::FtwError; - return FsTreeWalker::FtwOk; -#else + if (m_haveInternQ) { + InternfileTask *tp = new InternfileTask(fn, stp); + if (m_iwqueue.put(tp)) { + return FsTreeWalker::FtwOk; + } else { + return FsTreeWalker::FtwError; + } + } +#endif + return processonefile(m_config, m_tmpdir, fn, stp); -#endif // IDX_THREADS + } @@ -505,13 +545,7 @@ FsIndexer::processonefile(RclConfig *config, TempDir& tmpdir, makesig(stp, sig); string udi; make_udi(fn, cstr_null, udi); - bool needupdate; - { -#ifdef IDX_THREADS - PTMutexLocker locker(m_mutex); -#endif - needupdate = m_db->needUpdate(udi, sig); - } + bool needupdate = m_db->needUpdate(udi, sig); if (!needupdate) { LOGDEB0(("processone: up to date: %s\n", fn.c_str())); @@ -619,17 +653,22 @@ FsIndexer::processonefile(RclConfig *config, TempDir& tmpdir, make_udi(fn, doc.ipath, udi); #ifdef IDX_THREADS - DbUpdTask *tp = new DbUpdTask(config, udi, doc.ipath.empty() ? - cstr_null : parent_udi, doc); - if (!m_dwqueue.put(tp)) { - LOGERR(("processonefile: wqueue.put failed\n")); - return FsTreeWalker::FtwError; + if (m_haveSplitQ) { + DbUpdTask *tp = new DbUpdTask(config, udi, doc.ipath.empty() ? + cstr_null : parent_udi, doc); + if (!m_dwqueue.put(tp)) { + LOGERR(("processonefile: wqueue.put failed\n")); + return FsTreeWalker::FtwError; + } + } else { +#endif + if (!m_db->addOrUpdate(config, udi, doc.ipath.empty() ? cstr_null : + parent_udi, doc)) { + return FsTreeWalker::FtwError; + } +#ifdef IDX_THREADS } -#else - if (!m_db->addOrUpdate(config, udi, doc.ipath.empty() ? cstr_null : - parent_udi, doc)) - return FsTreeWalker::FtwError; -#endif // IDX_THREADS +#endif // Tell what we are doing and check for interrupt request if (m_updater) { @@ -664,14 +703,19 @@ FsIndexer::processonefile(RclConfig *config, TempDir& tmpdir, fileDoc.pcbytes = cbuf; // Document signature for up to date checks. makesig(stp, fileDoc.sig); + #ifdef IDX_THREADS - DbUpdTask *tp = new DbUpdTask(config, parent_udi, cstr_null, fileDoc); - if (!m_dwqueue.put(tp)) - return FsTreeWalker::FtwError; -#else + if (m_haveSplitQ) { + DbUpdTask *tp = new DbUpdTask(config, parent_udi, cstr_null, + fileDoc); + if (!m_dwqueue.put(tp)) + return FsTreeWalker::FtwError; + else + return FsTreeWalker::FtwOk; + } +#endif if (!m_db->addOrUpdate(config, parent_udi, cstr_null, fileDoc)) return FsTreeWalker::FtwError; -#endif // IDX_THREADS } return FsTreeWalker::FtwOk; diff --git a/src/index/fsindexer.h b/src/index/fsindexer.h index 6463535d..6ef8071c 100644 --- a/src/index/fsindexer.h +++ b/src/index/fsindexer.h @@ -85,7 +85,7 @@ class FsIndexer : public FsTreeWalkerCB { TempDir m_tmpdir; string m_reason; DbIxStatusUpdater *m_updater; - std::list m_tdl; + std::vector m_tdl; FIMissingStore *m_missing; @@ -107,6 +107,8 @@ class FsIndexer : public FsTreeWalkerCB { int m_loglevel; WorkQueue m_iwqueue; WorkQueue m_dwqueue; + bool m_haveInternQ; + bool m_haveSplitQ; #endif // IDX_THREADS bool init(); diff --git a/src/index/rclmonrcv.cpp b/src/index/rclmonrcv.cpp index 384dff53..7cecd6a6 100644 --- a/src/index/rclmonrcv.cpp +++ b/src/index/rclmonrcv.cpp @@ -172,7 +172,7 @@ void *rclMonRcvRun(void *q) // Get top directories from config - list tdl = lconfig.getTopdirs(); + vector tdl = lconfig.getTopdirs(); if (tdl.empty()) { LOGERR(("rclMonRcvRun:: top directory list (topdirs param.) not" "found in config or Directory list parse error")); @@ -184,7 +184,7 @@ void *rclMonRcvRun(void *q) FsTreeWalker walker; walker.setSkippedPaths(lconfig.getDaemSkippedPaths()); WalkCB walkcb(&lconfig, mon, queue, walker); - for (list::iterator it = tdl.begin(); it != tdl.end(); it++) { + for (vector::iterator it = tdl.begin(); it != tdl.end(); it++) { lconfig.setKeyDir(*it); // Adjust the follow symlinks options bool follow; diff --git a/src/rcldb/rcldb.cpp b/src/rcldb/rcldb.cpp index ef82c39b..4f86dbf2 100644 --- a/src/rcldb/rcldb.cpp +++ b/src/rcldb/rcldb.cpp @@ -127,6 +127,29 @@ static inline string make_parentterm(const string& udi) return pterm; } +Db::Native::Native(Db *db) + : m_rcldb(db), m_isopen(false), m_iswritable(false), + m_noversionwrite(false) +#ifdef IDX_THREADS + , m_wqueue("DbUpd", + m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first), + m_totalworkns(0LL) +#endif // IDX_THREADS +{ + LOGDEB1(("Native::Native: me %p\n", this)); +} + +Db::Native::~Native() +{ + LOGDEB1(("Native::~Native: me %p\n", this)); +#ifdef IDX_THREADS + if (m_haveWriteQ) { + void *status = m_wqueue.setTerminateAndWait(); + LOGDEB2(("Native::~Native: worker status %ld\n", long(status))); + } +#endif // IDX_THREADS +} + #ifdef IDX_THREADS void *DbUpdWorker(void* vdbp) { @@ -143,8 +166,14 @@ void *DbUpdWorker(void* vdbp) return (void*)1; } LOGDEB(("DbUpdWorker: got task, ql %d\n", int(qsz))); - if (!ndbp->addOrUpdateWrite(tsk->udi, tsk->uniterm, - tsk->doc, tsk->txtlen)) { + bool status; + if (tsk->txtlen == (size_t)-1) { + status = ndbp->m_rcldb->purgeFileWrite(tsk->udi, tsk->uniterm); + } else { + status = ndbp->addOrUpdateWrite(tsk->udi, tsk->uniterm, + tsk->doc, tsk->txtlen); + } + if (!status) { LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n")); tqp->workerExit(); delete tsk; @@ -153,35 +182,31 @@ void *DbUpdWorker(void* vdbp) delete tsk; } } -#endif // IDX_THREADS -Db::Native::Native(Db *db) - : m_rcldb(db), m_isopen(false), m_iswritable(false), - m_noversionwrite(false) -#ifdef IDX_THREADS - , m_wqueue("DbUpd", 2), m_totalworkns(0LL) -#endif // IDX_THREADS -{ - LOGDEB1(("Native::Native: me %p\n", this)); -#ifdef IDX_THREADS +void Db::Native::maybeStartThreads() +{ m_loglevel = DebugLog::getdbl()->getlevel(); - if (!m_wqueue.start(1, DbUpdWorker, this)) { - LOGERR(("Db::Db: Worker start failed\n")); - return; + + m_haveWriteQ = false; + const RclConfig *cnf = m_rcldb->m_config; + int writeqlen = cnf->getThrConf(RclConfig::ThrDbWrite).first; + int writethreads = cnf->getThrConf(RclConfig::ThrDbWrite).second; + if (writethreads > 1) { + LOGINFO(("RclDb: write threads count was forced down to 1\n")); + writethreads = 1; } -#endif // IDX_THREADS + if (writeqlen >= 0 && writethreads > 0) { + if (!m_wqueue.start(writethreads, DbUpdWorker, this)) { + LOGERR(("Db::Db: Worker start failed\n")); + return; + } + m_haveWriteQ = true; + } + LOGDEB(("RclDb:: threads: haveWriteQ %d, wqlen %d wqts %d\n", + m_haveWriteQ, writeqlen, writethreads)); } -Db::Native::~Native() -{ - LOGDEB1(("Native::~Native: me %p\n", this)); -#ifdef IDX_THREADS - if (m_iswritable) { - void *status = m_wqueue.setTerminateAndWait(); - LOGDEB2(("Native::~Native: worker status %ld\n", long(status))); - } #endif // IDX_THREADS -} /* See comment in class declaration: return all subdocuments of a * document given by its unique id. @@ -313,11 +338,11 @@ int Db::Native::getPageNumberForPosition(const vector& pbreaks, bool Db::o_inPlaceReset; -Db::Db(RclConfig *cfp) - : m_ndb(0), m_config(cfp), m_idxAbsTruncLen(250), m_synthAbsLen(250), - m_synthAbsWordCtxLen(4), m_flushMb(-1), - m_curtxtsz(0), m_flushtxtsz(0), m_occtxtsz(0), m_occFirstCheck(1), - m_maxFsOccupPc(0), m_mode(Db::DbRO) +Db::Db(const RclConfig *cfp) + : m_ndb(0), m_config(cfp), m_mode(Db::DbRO), m_curtxtsz(0), m_flushtxtsz(0), + m_occtxtsz(0), m_occFirstCheck(1), + m_idxAbsTruncLen(250), m_synthAbsLen(250), m_synthAbsWordCtxLen(4), + m_flushMb(-1), m_maxFsOccupPc(0) { #ifndef RCL_INDEX_STRIPCHARS if (start_of_field_term.empty()) { @@ -390,6 +415,7 @@ bool Db::open(OpenMode mode, OpenError *error) m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY, cstr_RCL_IDX_VERSION); m_ndb->m_iswritable = true; + m_ndb->maybeStartThreads(); // We open a readonly object in all cases (possibly in // addition to the r/w one) because some operations // are faster when performed through a Database: no @@ -424,8 +450,8 @@ bool Db::open(OpenMode mode, OpenError *error) // Check index format version. Must not try to check a just created or // truncated db - if (mode != DbTrunc && m_ndb->xdb().get_doccount() > 0) { - string version = m_ndb->xdb().get_metadata(cstr_RCL_IDX_VERSION_KEY); + if (mode != DbTrunc && m_ndb->xrdb.get_doccount() > 0) { + string version = m_ndb->xrdb.get_metadata(cstr_RCL_IDX_VERSION_KEY); if (version.compare(cstr_RCL_IDX_VERSION)) { m_ndb->m_noversionwrite = true; LOGERR(("Rcl::Db::open: file index [%s], software [%s]\n", @@ -541,7 +567,7 @@ int Db::termDocCnt(const string& _term) return 0; } - XAPTRY(res = m_ndb->xdb().get_termfreq(term), m_ndb->xrdb, m_reason); + XAPTRY(res = m_ndb->xrdb.get_termfreq(term), m_ndb->xrdb, m_reason); if (!m_reason.empty()) { LOGERR(("Db::termDocCnt: got error: %s\n", m_reason.c_str())); @@ -1110,16 +1136,20 @@ bool Db::addOrUpdate(RclConfig *config, const string &udi, newdocument.set_data(record); #ifdef IDX_THREADS - DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, doc.text.length()); - if (!m_ndb->m_wqueue.put(tp)) { - LOGERR(("Db::addOrUpdate:Cant queue task\n")); - return false; + if (m_ndb->m_haveWriteQ) { + DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, + doc.text.length()); + if (!m_ndb->m_wqueue.put(tp)) { + LOGERR(("Db::addOrUpdate:Cant queue task\n")); + return false; + } else { + return true; + } } - return true; -#else +#endif + return m_ndb->addOrUpdateWrite(udi, uniterm, newdocument, doc.text.length()); -#endif // IDX_THREADS } bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm, @@ -1127,7 +1157,13 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm, { #ifdef IDX_THREADS Chrono chron; + // In the case where there is a separate (single) db update + // thread, we only need to protect the update map update below + // (against interaction with threads calling needUpdate()). Else, + // all threads from above need to synchronize here + PTMutexLocker lock(m_mutex, m_haveWriteQ); #endif + // Check file system full every mbyte of indexed text. It's a bit wasteful // to do this after having prepared the document, but it needs to be in // the single-threaded section. @@ -1155,7 +1191,7 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm, #ifdef IDX_THREADS // Need to protect against interaction with the up-to-date checks // which also update the existence map - PTMutexLocker lock(m_rcldb->m_ndb->m_mutex); + PTMutexLocker lock(m_mutex, !m_haveWriteQ); #endif if (did < m_rcldb->updated.size()) { m_rcldb->updated[did] = true; @@ -1191,18 +1227,21 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm, #ifdef IDX_THREADS void Db::waitUpdIdle() { - Chrono chron; - m_ndb->m_wqueue.waitIdle(); - string ermsg; - try { - m_ndb->xwdb.flush(); - } XCATCHERROR(ermsg); - if (!ermsg.empty()) { - LOGERR(("Db::waitUpdIdle: flush() failed: %s\n", ermsg.c_str())); + if (m_ndb->m_haveWriteQ) { + Chrono chron; + m_ndb->m_wqueue.waitIdle(); + // We flush here just for correct measurement of the thread work time + string ermsg; + try { + m_ndb->xwdb.flush(); + } XCATCHERROR(ermsg); + if (!ermsg.empty()) { + LOGERR(("Db::waitUpdIdle: flush() failed: %s\n", ermsg.c_str())); + } + m_ndb->m_totalworkns += chron.nanos(); + LOGDEB(("Db::waitUpdIdle: total work %lld mS\n", + m_ndb->m_totalworkns/1000000)); } - m_ndb->m_totalworkns += chron.nanos(); - LOGDEB(("Db::waitUpdIdle: total work %lld mS\n", - m_ndb->m_totalworkns/1000000)); } #endif @@ -1243,6 +1282,13 @@ bool Db::needUpdate(const string &udi, const string& sig) string uniterm = make_uniterm(udi); string ermsg; +#ifdef IDX_THREADS + // Need to protect against interaction with the doc update/insert + // thread which also updates the existence map, and even multiple + // accesses to the readonly Xapian::Database are not allowed + // anyway + PTMutexLocker lock(m_ndb->m_mutex); +#endif // We look up the document indexed by the uniterm. This is either // the actual document file, or, for a multi-document file, the // pseudo-doc we create to stand for the file itself. @@ -1277,12 +1323,6 @@ bool Db::needUpdate(const string &udi, const string& sig) // Set the uptodate flag for doc / pseudo doc if (m_mode != DbRO) { -#ifdef IDX_THREADS - // Need to protect against interaction with the doc - // update/insert thread which also updates the - // existence map - PTMutexLocker lock(m_ndb->m_mutex); -#endif updated[*docid] = true; // Set the existence flag for all the subdocs (if any) @@ -1372,7 +1412,13 @@ bool Db::purge() return false; #ifdef IDX_THREADS - m_ndb->m_wqueue.waitIdle(); + // If we manage our own write queue, make sure it's drained and closed + if (m_ndb->m_haveWriteQ) + m_ndb->m_wqueue.setTerminateAndWait(); + // else we need to lock out other top level threads. This is just + // a precaution as they should have been waited for by the top + // level actor at this point + PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ); #endif // IDX_THREADS // For xapian versions up to 1.0.1, deleting a non-existant @@ -1390,8 +1436,6 @@ bool Db::purge() // Walk the document array and delete any xapian document whose // flag is not set (we did not see its source during indexing). - // Threads: we do not need a mutex here as the indexing threads - // are necessarily done at this point. int purgecount = 0; for (Xapian::docid docid = 1; docid < updated.size(); ++docid) { if (!updated[docid]) { @@ -1436,6 +1480,30 @@ bool Db::purge() return true; } +// Test for doc existence. +bool Db::docExists(const string& uniterm) +{ +#ifdef IDX_THREADS + // If we're not running our own (single) thread, need to protect + // read db against multiaccess (e.g. from needUpdate(), or this method). + PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ); +#endif + + string ermsg; + try { + Xapian::PostingIterator docid = m_ndb->xrdb.postlist_begin(uniterm); + if (docid == m_ndb->xrdb.postlist_end(uniterm)) { + return false; + } else { + return true; + } + } XCATCHERROR(ermsg); + if (!ermsg.empty()) { + LOGERR(("Db::docExists(%s) %s\n", uniterm.c_str(), ermsg.c_str())); + } + return false; +} + /* Delete document(s) for given unique identifier (doc and descendents) */ bool Db::purgeFile(const string &udi, bool *existed) { @@ -1443,21 +1511,44 @@ bool Db::purgeFile(const string &udi, bool *existed) if (m_ndb == 0 || !m_ndb->m_iswritable) return false; + string uniterm = make_uniterm(udi); + bool exists = docExists(uniterm); + if (existed) + *existed = exists; + if (!exists) + return true; + #ifdef IDX_THREADS - m_ndb->m_wqueue.waitIdle(); + if (m_ndb->m_haveWriteQ) { + Xapian::Document xdoc; + DbUpdTask *tp = new DbUpdTask(udi, uniterm, xdoc, (size_t)-1); + if (!m_ndb->m_wqueue.put(tp)) { + LOGERR(("Db::purgeFile:Cant queue task\n")); + return false; + } else { + return true; + } + } +#endif + + return purgeFileWrite(udi, uniterm); +} + +bool Db::purgeFileWrite(const string& udi, const string& uniterm) +{ +#if defined(IDX_THREADS) + // If we have a write queue we're called from there, and single threaded, no locking. + // Else need to mutex other threads from above + PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ); #endif // IDX_THREADS Xapian::WritableDatabase db = m_ndb->xwdb; - string uniterm = make_uniterm(udi); string ermsg; try { Xapian::PostingIterator docid = db.postlist_begin(uniterm); if (docid == db.postlist_end(uniterm)) { - if (existed) - *existed = false; return true; } - *existed = true; LOGDEB(("purgeFile: delete docid %d\n", *docid)); if (m_flushMb > 0) { Xapian::termcount trms = m_ndb->xwdb.get_doclength(*docid); @@ -1613,7 +1704,7 @@ bool Db::termMatch(MatchType typ, const string &lang, { if (!m_ndb || !m_ndb->m_isopen) return false; - Xapian::Database xdb = m_ndb->xdb(); + Xapian::Database xdb = m_ndb->xrdb; XAPTRY(res.dbdoccount = xdb.get_doccount(); res.dbavgdoclen = xdb.get_avlength(), xdb, m_reason); @@ -1769,7 +1860,7 @@ TermIter *Db::termWalkOpen() return 0; TermIter *tit = new TermIter; if (tit) { - tit->db = m_ndb->xdb(); + tit->db = m_ndb->xrdb; XAPTRY(tit->it = tit->db.allterms_begin(), tit->db, m_reason); if (!m_reason.empty()) { LOGERR(("Db::termWalkOpen: xapian error: %s\n", m_reason.c_str())); @@ -1804,7 +1895,7 @@ bool Db::termExists(const string& word) if (!m_ndb || !m_ndb->m_isopen) return 0; - XAPTRY(if (!m_ndb->xdb().term_exists(word)) return false, + XAPTRY(if (!m_ndb->xrdb.term_exists(word)) return false, m_ndb->xrdb, m_reason); if (!m_reason.empty()) { diff --git a/src/rcldb/rcldb.h b/src/rcldb/rcldb.h index 3f9ce1be..dd846920 100644 --- a/src/rcldb/rcldb.h +++ b/src/rcldb/rcldb.h @@ -182,7 +182,7 @@ class Db { friend class Native; /* General stuff (valid for query or update) ****************************/ - Db(RclConfig *cfp); + Db(const RclConfig *cfp); ~Db(); enum OpenMode {DbRO, DbUpd, DbTrunc}; @@ -356,7 +356,7 @@ class Db { bool stemDiffers(const string& lang, const string& term, const string& base); - RclConfig *getConf() {return m_config;} + const RclConfig *getConf() {return m_config;} /** Activate the "in place reset" mode where all documents are @@ -368,15 +368,31 @@ class Db { /* This has to be public for access by embedded Query::Native */ Native *m_ndb; - + bool purgeFileWrite(const string& udi, const string& uniterm); private: - // Internal form of close, can be called during destruction - bool i_close(bool final); - - RclConfig *m_config; + const RclConfig *m_config; string m_reason; // Error explanation - /* Parameters cached out of the configuration files */ + // Xapian directories for additional databases to query + vector m_extraDbs; + OpenMode m_mode; + // File existence vector: this is filled during the indexing pass. Any + // document whose bit is not set at the end is purged + vector updated; + // Stop terms: those don't get indexed. + StopList m_stops; + // Text bytes indexed since beginning + long long m_curtxtsz; + // Text bytes at last flush + long long m_flushtxtsz; + // Text bytes at last fsoccup check + long long m_occtxtsz; + // First fs occup check ? + int m_occFirstCheck; + + /*************** + * Parameters cached out of the configuration files. Logically const + * after init */ // This is how long an abstract we keep or build from beginning of // text when indexing. It only has an influence on the size of the // db as we are free to shorten it again when displaying @@ -389,32 +405,19 @@ private: int m_synthAbsWordCtxLen; // Flush threshold. Megabytes of text indexed before we flush. int m_flushMb; - // Text bytes indexed since beginning - long long m_curtxtsz; - // Text bytes at last flush - long long m_flushtxtsz; - // Text bytes at last fsoccup check - long long m_occtxtsz; - // First fs occup check ? - int m_occFirstCheck; // Maximum file system occupation percentage int m_maxFsOccupPc; // Database directory string m_basedir; - // Xapian directories for additional databases to query - vector m_extraDbs; - OpenMode m_mode; - // File existence vector: this is filled during the indexing pass. Any - // document whose bit is not set at the end is purged - vector updated; - // Stop terms: those don't get indexed. - StopList m_stops; // When this is set, all documents are considered as needing a reindex. // This implements an alternative to just erasing the index before // beginning, with the advantage that, for small index formats updates, // between releases the index remains available while being recreated. static bool o_inPlaceReset; + /******* End logical constnesss */ + // Internal form of close, can be called during destruction + bool i_close(bool final); // Reinitialize when adding/removing additional dbs bool adjustdbs(); bool stemExpand(const string &lang, const string &s, @@ -422,6 +425,7 @@ private: // Flush when idxflushmb is reached bool maybeflush(off_t moretext); + bool docExists(const string& uniterm); /* Copyconst and assignement private and forbidden */ Db(const Db &) {} diff --git a/src/rcldb/rcldb_p.h b/src/rcldb/rcldb_p.h index 1c0ff74b..cdfda99f 100644 --- a/src/rcldb/rcldb_p.h +++ b/src/rcldb/rcldb_p.h @@ -36,6 +36,8 @@ namespace Rcl { class Query; #ifdef IDX_THREADS +// Task for the index update thread. This can be either and add/update +// or a purge op, in which case the txtlen is (size_t)-1 class DbUpdTask { public: DbUpdTask(const string& ud, const string& un, const Xapian::Document &d, @@ -64,6 +66,8 @@ class Db::Native { int m_loglevel; PTMutexInit m_mutex; long long m_totalworkns; + bool m_haveWriteQ; + void maybeStartThreads(); #endif // IDX_THREADS // Indexing @@ -71,10 +75,6 @@ class Db::Native { // Querying (active even if the wdb is too) Xapian::Database xrdb; - // We sometimes go through the wdb for some query ops, don't - // really know if this makes sense - Xapian::Database& xdb() {return m_iswritable ? xwdb : xrdb;} - Native(Db *db); ~Native(); diff --git a/src/utils/ptmutex.h b/src/utils/ptmutex.h index 67cdb2e3..9aacfc4f 100644 --- a/src/utils/ptmutex.h +++ b/src/utils/ptmutex.h @@ -22,7 +22,8 @@ /// A trivial wrapper/helper for pthread mutex locks -/// Init lock. Used as a single PTMutexInit static object. +/// Lock storage with auto-initialization. Must be created before any +/// lock-using thread of course (possibly as a static object). class PTMutexInit { public: pthread_mutex_t m_mutex; @@ -32,16 +33,21 @@ public: } }; -/// Take the lock when constructed, release when deleted +/// Take the lock when constructed, release when deleted. Can be disabled +/// by constructor params for conditional use. class PTMutexLocker { public: - PTMutexLocker(PTMutexInit& l) : m_lock(l) + // The nolock arg enables conditional locking + PTMutexLocker(PTMutexInit& l, bool nolock = false) + : m_lock(l), m_status(-1) { - m_status = pthread_mutex_lock(&m_lock.m_mutex); + if (!nolock) + m_status = pthread_mutex_lock(&m_lock.m_mutex); } ~PTMutexLocker() { - pthread_mutex_unlock(&m_lock.m_mutex); + if (m_status == 0) + pthread_mutex_unlock(&m_lock.m_mutex); } int ok() {return m_status == 0;} private: diff --git a/src/utils/workqueue.h b/src/utils/workqueue.h index 30d58d22..7c4bbb09 100644 --- a/src/utils/workqueue.h +++ b/src/utils/workqueue.h @@ -57,7 +57,7 @@ public: /** Create a WorkQueue * @param name for message printing * @param hi number of tasks on queue before clients blocks. Default 0 - * meaning no limit. + * meaning no limit. hi == -1 means that the queue is disabled. * @param lo minimum count of tasks before worker starts. Default 1. */ WorkQueue(const string& name, int hi = 0, int lo = 1) @@ -66,14 +66,14 @@ public: m_clients_waiting(0), m_tottasks(0), m_nowake(0), m_workersleeps(0) { - m_ok = (pthread_cond_init(&m_ccond, 0) == 0) && + m_ok = (m_high >= 0) && (pthread_cond_init(&m_ccond, 0) == 0) && (pthread_cond_init(&m_wcond, 0) == 0) && (pthread_mutex_init(&m_mutex, 0) == 0); } ~WorkQueue() { - LOGDEB2(("WorkQueue::~WorkQueue: name %s\n", m_name.c_str())); + LOGDEB2(("WorkQueue::~WorkQueue:%s\n", m_name.c_str())); if (!m_worker_threads.empty()) setTerminateAndWait(); } @@ -88,16 +88,19 @@ public: */ bool start(int nworkers, void *(*start_routine)(void *), void *arg) { + pthread_mutex_lock(&m_mutex); for (int i = 0; i < nworkers; i++) { int err; pthread_t thr; if ((err = pthread_create(&thr, 0, start_routine, arg))) { LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n", m_name.c_str(), err)); + pthread_mutex_unlock(&m_mutex); return false; } m_worker_threads.insert(pair(thr, WQTData())); } + pthread_mutex_unlock(&m_mutex); return true; } @@ -107,8 +110,9 @@ public: */ bool put(T t) { - if (!ok() || pthread_mutex_lock(&m_mutex) != 0) { - LOGERR(("WorkQueue::put: !ok or mutex_lock failed\n")); + if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) { + LOGERR(("WorkQueue::put:%s: !ok or mutex_lock failed\n", + m_name.c_str())); return false; } @@ -141,12 +145,15 @@ public: * back sleeping. Used by the client to wait for all current work * to be completed, when it needs to perform work that couldn't be * done in parallel with the worker's tasks, or before shutting - * down. Work can be resumed after calling this. + * down. Work can be resumed after calling this. Note that the only thread + * which can call it safely is the client just above (which can + control the task flow), else there could be + * tasks in the intermediate queues. */ bool waitIdle() { - if (!ok() || pthread_mutex_lock(&m_mutex) != 0) { - LOGERR(("WorkQueue::waitIdle: %s not ok or can't lock\n", + if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) { + LOGERR(("WorkQueue::waitIdle:%s: not ok or can't lock\n", m_name.c_str())); return false; } @@ -159,7 +166,8 @@ public: if (pthread_cond_wait(&m_ccond, &m_mutex)) { m_clients_waiting--; m_ok = false; - LOGERR(("WorkQueue::waitIdle: cond_wait failed\n")); + LOGERR(("WorkQueue::waitIdle:%s: cond_wait failed\n", + m_name.c_str())); pthread_mutex_unlock(&m_mutex); return false; } @@ -192,7 +200,8 @@ public: m_clients_waiting++; if (pthread_cond_wait(&m_ccond, &m_mutex)) { pthread_mutex_unlock(&m_mutex); - LOGERR(("WorkQueue::setTerminate: cond_wait failed\n")); + LOGERR(("WorkQueue::setTerminate:%s: cond_wait failed\n", + m_name.c_str())); m_clients_waiting--; return (void*)0; } @@ -225,8 +234,10 @@ public: */ bool take(T* tp, size_t *szp = 0) { - if (!ok() || pthread_mutex_lock(&m_mutex) != 0) + if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) { + LOGDEB(("WorkQueue::take:%s: not ok\n", m_name.c_str())); return false; + } while (ok() && m_queue.size() < m_low) { m_workersleeps++; @@ -269,6 +280,7 @@ public: */ void workerExit() { + LOGDEB(("workerExit:%s\n", m_name.c_str())); if (pthread_mutex_lock(&m_mutex) != 0) return; m_workers_exited++; @@ -288,7 +300,13 @@ public: private: bool ok() { - return m_ok && m_workers_exited == 0 && !m_worker_threads.empty(); + bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty(); + if (!isok) { + LOGDEB(("WorkQueue:ok:%s: not ok m_ok %d m_workers_exited %d " + "m_worker_threads size %d\n", m_name.c_str(), + m_ok, m_workers_exited, int(m_worker_threads.size()))); + } + return isok; } long long nanodiff(const struct timespec& older,