ensure that indexing threads operations are transparent to high level callers such as the real time monitor

This commit is contained in:
Jean-Francois Dockes 2012-11-26 15:40:31 +01:00
parent d7962cb5c2
commit 8ac559677c
5 changed files with 76 additions and 67 deletions

View file

@ -105,6 +105,7 @@ FsIndexer::FsIndexer(RclConfig *cnf, Rcl::Db *db, DbIxStatusUpdater *updfunc)
, m_iwqueue("Internfile", 2), m_dwqueue("Split", 2) , m_iwqueue("Internfile", 2), m_dwqueue("Split", 2)
#endif // IDX_THREADS #endif // IDX_THREADS
{ {
LOGDEB1(("FsIndexer::FsIndexer\n"));
m_havelocalfields = m_config->hasNameAnywhere("localfields"); m_havelocalfields = m_config->hasNameAnywhere("localfields");
#ifdef IDX_THREADS #ifdef IDX_THREADS
m_loglevel = DebugLog::getdbl()->getlevel(); m_loglevel = DebugLog::getdbl()->getlevel();
@ -119,13 +120,14 @@ FsIndexer::FsIndexer(RclConfig *cnf, Rcl::Db *db, DbIxStatusUpdater *updfunc)
#endif // IDX_THREADS #endif // IDX_THREADS
} }
FsIndexer::~FsIndexer() { FsIndexer::~FsIndexer()
{
LOGDEB1(("FsIndexer::~FsIndexer()\n"));
#ifdef IDX_THREADS #ifdef IDX_THREADS
void *status = m_iwqueue.setTerminateAndWait(); void *status = m_iwqueue.setTerminateAndWait();
LOGINFO(("FsIndexer: internfile wrker status: %ld (1->ok)\n", LOGDEB0(("FsIndexer: internfile wrkr status: %ld (1->ok)\n", long(status)));
long(status)));
status = m_dwqueue.setTerminateAndWait(); status = m_dwqueue.setTerminateAndWait();
LOGINFO(("FsIndexer: dbupd worker status: %ld (1->ok)\n", long(status))); LOGDEB0(("FsIndexer: dbupd worker status: %ld (1->ok)\n", long(status)));
#endif // IDX_THREADS #endif // IDX_THREADS
delete m_missing; delete m_missing;
} }
@ -305,6 +307,11 @@ bool FsIndexer::indexFiles(list<string>& files, ConfIndexer::IxFlag flag)
it = files.erase(it); it = files.erase(it);
} }
#ifdef IDX_THREADS
m_iwqueue.waitIdle();
m_dwqueue.waitIdle();
m_db->waitUpdIdle();
#endif // IDX_THREADS
return true; return true;
} }
@ -314,6 +321,7 @@ bool FsIndexer::purgeFiles(list<string>& files)
{ {
if (!init()) if (!init())
return false; return false;
for (list<string>::iterator it = files.begin(); it != files.end(); ) { for (list<string>::iterator it = files.begin(); it != files.end(); ) {
string udi; string udi;
make_udi(*it, cstr_null, udi); make_udi(*it, cstr_null, udi);
@ -378,7 +386,7 @@ void *FsIndexerDbUpdWorker(void * fsp)
tqp->workerExit(); tqp->workerExit();
return (void*)1; return (void*)1;
} }
LOGDEB(("FsIndexerDbUpdWorker: got task, ql %d\n", int(qsz))); LOGDEB0(("FsIndexerDbUpdWorker: task ql %d\n", int(qsz)));
if (!fip->m_db->addOrUpdate(tsk->config, tsk->udi, tsk->parent_udi, if (!fip->m_db->addOrUpdate(tsk->config, tsk->udi, tsk->parent_udi,
tsk->doc)) { tsk->doc)) {
LOGERR(("FsIndexerDbUpdWorker: addOrUpdate failed\n")); LOGERR(("FsIndexerDbUpdWorker: addOrUpdate failed\n"));
@ -404,7 +412,7 @@ void *FsIndexerInternfileWorker(void * fsp)
tqp->workerExit(); tqp->workerExit();
return (void*)1; return (void*)1;
} }
LOGDEB1(("FsIndexerInternfileWorker: fn %s\n", tsk->fn.c_str())); LOGDEB0(("FsIndexerInternfileWorker: task fn %s\n", tsk->fn.c_str()));
if (fip->processonefile(myconf, tmpdir, tsk->fn, &tsk->statbuf) != if (fip->processonefile(myconf, tmpdir, tsk->fn, &tsk->statbuf) !=
FsTreeWalker::FtwOk) { FsTreeWalker::FtwOk) {
LOGERR(("FsIndexerInternfileWorker: processone failed\n")); LOGERR(("FsIndexerInternfileWorker: processone failed\n"));

View file

@ -127,6 +127,62 @@ static inline string make_parentterm(const string& udi)
return pterm; return pterm;
} }
#ifdef IDX_THREADS
void *DbUpdWorker(void* vdbp)
{
recoll_threadinit();
Db::Native *ndbp = (Db::Native *)vdbp;
WorkQueue<DbUpdTask*> *tqp = &(ndbp->m_wqueue);
DebugLog::getdbl()->setloglevel(ndbp->m_loglevel);
DbUpdTask *tsk;
for (;;) {
size_t qsz;
if (!tqp->take(&tsk, &qsz)) {
tqp->workerExit();
return (void*)1;
}
LOGDEB(("DbUpdWorker: got task, ql %d\n", int(qsz)));
if (!ndbp->addOrUpdateWrite(tsk->udi, tsk->uniterm,
tsk->doc, tsk->txtlen)) {
LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n"));
tqp->workerExit();
delete tsk;
return (void*)0;
}
delete tsk;
}
}
#endif // IDX_THREADS
Db::Native::Native(Db *db)
: m_rcldb(db), m_isopen(false), m_iswritable(false),
m_noversionwrite(false)
#ifdef IDX_THREADS
, m_wqueue("DbUpd", 2), m_totalworkns(0LL)
#endif // IDX_THREADS
{
LOGDEB1(("Native::Native: me %p\n", this));
#ifdef IDX_THREADS
m_loglevel = DebugLog::getdbl()->getlevel();
if (!m_wqueue.start(1, DbUpdWorker, this)) {
LOGERR(("Db::Db: Worker start failed\n"));
return;
}
#endif // IDX_THREADS
}
Db::Native::~Native()
{
LOGDEB1(("Native::~Native: me %p\n", this));
#ifdef IDX_THREADS
if (m_iswritable) {
void *status = m_wqueue.setTerminateAndWait();
LOGDEB2(("Native::~Native: worker status %ld\n", long(status)));
}
#endif // IDX_THREADS
}
/* See comment in class declaration: return all subdocuments of a /* See comment in class declaration: return all subdocuments of a
* document given by its unique id. * document given by its unique id.
*/ */
@ -280,15 +336,6 @@ Db::Db(RclConfig *cfp)
m_config->getConfParam("maxfsoccuppc", &m_maxFsOccupPc); m_config->getConfParam("maxfsoccuppc", &m_maxFsOccupPc);
m_config->getConfParam("idxflushmb", &m_flushMb); m_config->getConfParam("idxflushmb", &m_flushMb);
} }
#ifdef IDX_THREADS
if (m_ndb) {
m_ndb->m_loglevel = DebugLog::getdbl()->getlevel();
if (!m_ndb->m_wqueue.start(1, DbUpdWorker, this)) {
LOGERR(("Db::Db: Worker start failed\n"));
return;
}
}
#endif // IDX_THREADS
} }
Db::~Db() Db::~Db()
@ -791,34 +838,6 @@ static const string cstr_nc("\n\r\x0c");
#define RECORD_APPEND(R, NM, VAL) {R += NM + "=" + VAL + "\n";} #define RECORD_APPEND(R, NM, VAL) {R += NM + "=" + VAL + "\n";}
#ifdef IDX_THREADS
void *DbUpdWorker(void* vdbp)
{
recoll_threadinit();
Db *dbp = (Db *)vdbp;
WorkQueue<DbUpdTask*> *tqp = &(dbp->m_ndb->m_wqueue);
DebugLog::getdbl()->setloglevel(dbp->m_ndb->m_loglevel);
DbUpdTask *tsk;
for (;;) {
size_t qsz;
if (!tqp->take(&tsk, &qsz)) {
tqp->workerExit();
return (void*)1;
}
LOGDEB(("DbUpdWorker: got task, ql %d\n", int(qsz)));
if (!dbp->m_ndb->addOrUpdateWrite(tsk->udi, tsk->uniterm,
tsk->doc, tsk->txtlen)) {
LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n"));
tqp->workerExit();
delete tsk;
return (void*)0;
}
delete tsk;
}
}
#endif // IDX_THREADS
// Add document in internal form to the database: index the terms in // Add document in internal form to the database: index the terms in
// the title abstract and body and add special terms for file name, // the title abstract and body and add special terms for file name,
// date, mime type etc. , create the document data record (more // date, mime type etc. , create the document data record (more

View file

@ -123,10 +123,6 @@ public:
double dbavgdoclen; double dbavgdoclen;
}; };
#ifdef IDX_THREADS
extern void *DbUpdWorker(void*);
#endif // IDX_THREADS
inline bool has_prefix(const string& trm) inline bool has_prefix(const string& trm)
{ {
#ifndef RCL_INDEX_STRIPCHARS #ifndef RCL_INDEX_STRIPCHARS
@ -184,9 +180,6 @@ class Db {
// A place for things we don't want visible here. // A place for things we don't want visible here.
class Native; class Native;
friend class Native; friend class Native;
#ifdef IDX_THREADS
friend void *DbUpdWorker(void*);
#endif // IDX_THREADS
/* General stuff (valid for query or update) ****************************/ /* General stuff (valid for query or update) ****************************/
Db(RclConfig *cfp); Db(RclConfig *cfp);

View file

@ -75,25 +75,12 @@ class Db::Native {
// really know if this makes sense // really know if this makes sense
Xapian::Database& xdb() {return m_iswritable ? xwdb : xrdb;} Xapian::Database& xdb() {return m_iswritable ? xwdb : xrdb;}
Native(Db *db) Native(Db *db);
: m_rcldb(db), m_isopen(false), m_iswritable(false), ~Native();
m_noversionwrite(false)
#ifdef IDX_THREADS
, m_wqueue("DbUpd", 2), m_totalworkns(0LL)
#endif // IDX_THREADS
{
LOGDEB2(("Native::Native: me %p\n", this));
}
~Native() {
LOGDEB2(("Native::~Native: me %p\n", this));
#ifdef IDX_THREADS #ifdef IDX_THREADS
if (m_iswritable) { friend void *DbUpdWorker(void*);
void *status = m_wqueue.setTerminateAndWait();
LOGDEB2(("Native::~Native: worker status %ld\n", long(status)));
}
#endif // IDX_THREADS #endif // IDX_THREADS
}
// Final steps of doc update, part which need to be single-threaded // Final steps of doc update, part which need to be single-threaded
bool addOrUpdateWrite(const string& udi, const string& uniterm, bool addOrUpdateWrite(const string& udi, const string& uniterm,

View file

@ -107,8 +107,10 @@ public:
*/ */
bool put(T t) bool put(T t)
{ {
if (!ok() || pthread_mutex_lock(&m_mutex) != 0) if (!ok() || pthread_mutex_lock(&m_mutex) != 0) {
LOGERR(("WorkQueue::put: !ok or mutex_lock failed\n"));
return false; return false;
}
while (ok() && m_high > 0 && m_queue.size() >= m_high) { while (ok() && m_high > 0 && m_queue.size() >= m_high) {
// Keep the order: we test ok() AFTER the sleep... // Keep the order: we test ok() AFTER the sleep...