implemented configuration for thread structure. Still crashes, but not often...

This commit is contained in:
Jean-Francois Dockes 2012-11-29 17:26:48 +01:00
parent 30516ceb52
commit 2f800e3eb3
10 changed files with 368 additions and 177 deletions

View file

@ -343,26 +343,48 @@ bool RclConfig::getConfParam(const string &name, vector<string> *svvp) const
return false;
return stringToStrings(s, *svvp);
}
bool RclConfig::getConfParam(const string &name, list<string> *svvp) const
bool RclConfig::getConfParam(const string &name, vector<int> *vip) const
{
if (!svvp)
if (!vip)
return false;
svvp->clear();
string s;
if (!getConfParam(name, s))
vip->clear();
vector<string> vs;
if (!getConfParam(name, &vs))
return false;
return stringToStrings(s, *svvp);
vip->reserve(vs.size());
for (unsigned int i = 0; i < vs.size(); i++) {
char *ep;
vip->push_back(strtol(vs[i].c_str(), &ep, 0));
if (ep == vs[i].c_str()) {
LOGDEB(("RclConfig::getConfParam: bad int value in [%s]\n",
name.c_str()));
return false;
}
}
return true;
}
list<string> RclConfig::getTopdirs() const
pair<int,int> RclConfig::getThrConf(ThrStage who) const
{
list<string> tdl;
vector<int> vq;
vector<int> vt;
if (!getConfParam("thrQSizes", &vq) || !getConfParam("thrTCounts", &vt)) {
return pair<int,int>(-1,-1);
}
return pair<int,int>(vq[who], vt[who]);
}
vector<string> RclConfig::getTopdirs() const
{
vector<string> tdl;
if (!getConfParam("topdirs", &tdl)) {
LOGERR(("RclConfig::getTopdirs: no top directories in config or bad list format\n"));
LOGERR(("RclConfig::getTopdirs: no top directories in config or "
"bad list format\n"));
return tdl;
}
for (list<string>::iterator it = tdl.begin(); it != tdl.end(); it++) {
for (vector<string>::iterator it = tdl.begin(); it != tdl.end(); it++) {
*it = path_tildexpand(*it);
*it = path_canon(*it);
}

View file

@ -112,10 +112,14 @@ class RclConfig {
bool getConfParam(const string &name, int *value) const;
/** Variant with autoconversion to bool */
bool getConfParam(const string &name, bool *value) const;
/** Variant with conversion to string list/vector
/** Variant with conversion to vector<string>
* (stringToStrings). Can fail if the string is malformed. */
bool getConfParam(const string &name, vector<string> *value) const;
bool getConfParam(const string &name, list<string> *value) const;
/** Variant with conversion to vector<int> */
bool getConfParam(const string &name, vector<int> *value) const;
enum ThrStage {ThrIntern=0, ThrSplit=1, ThrDbWrite=2};
pair<int, int> getThrConf(ThrStage who) const;
/**
* Get list of config names under current sk, with possible
@ -140,7 +144,7 @@ class RclConfig {
/** Get list of top directories. This is needed from a number of places
* and needs some cleaning-up code. An empty list is always an error, no
* need for other status */
list<string> getTopdirs() const;
vector<string> getTopdirs() const;
/** Get database directory */
string getDbDir() const;

View file

@ -102,33 +102,58 @@ FsIndexer::FsIndexer(RclConfig *cnf, Rcl::Db *db, DbIxStatusUpdater *updfunc)
: m_config(cnf), m_db(db), m_updater(updfunc),
m_missing(new FSIFIMissingStore)
#ifdef IDX_THREADS
, m_iwqueue("Internfile", 2), m_dwqueue("Split", 2)
, m_iwqueue("Internfile", cnf->getThrConf(RclConfig::ThrIntern).first),
m_dwqueue("Split", cnf->getThrConf(RclConfig::ThrSplit).first)
#endif // IDX_THREADS
{
LOGDEB1(("FsIndexer::FsIndexer\n"));
m_havelocalfields = m_config->hasNameAnywhere("localfields");
#ifdef IDX_THREADS
m_loglevel = DebugLog::getdbl()->getlevel();
if (!m_iwqueue.start(4, FsIndexerInternfileWorker, this)) {
LOGERR(("FsIndexer::FsIndexer: worker start failed\n"));
m_haveInternQ = m_haveSplitQ = false;
int internqlen = cnf->getThrConf(RclConfig::ThrIntern).first;
int internthreads = cnf->getThrConf(RclConfig::ThrIntern).second;
if (internqlen >= 0) {
if (!m_iwqueue.start(internthreads, FsIndexerInternfileWorker, this)) {
LOGERR(("FsIndexer::FsIndexer: intern worker start failed\n"));
return;
}
if (!m_dwqueue.start(2, FsIndexerDbUpdWorker, this)) {
LOGERR(("FsIndexer::FsIndexer: worker start failed\n"));
m_haveInternQ = true;
}
int splitqlen = cnf->getThrConf(RclConfig::ThrSplit).first;
int splitthreads = cnf->getThrConf(RclConfig::ThrSplit).second;
if (splitqlen >= 0) {
if (!m_dwqueue.start(splitthreads, FsIndexerDbUpdWorker, this)) {
LOGERR(("FsIndexer::FsIndexer: split worker start failed\n"));
return;
}
m_haveSplitQ = true;
}
LOGDEB(("FsIndexer: threads: haveIQ %d iql %d iqts %d "
"haveSQ %d sql %d sqts %d\n", m_haveInternQ, internqlen,
internthreads, m_haveSplitQ, splitqlen, splitthreads));
#endif // IDX_THREADS
}
FsIndexer::~FsIndexer()
{
LOGDEB1(("FsIndexer::~FsIndexer()\n"));
#ifdef IDX_THREADS
void *status = m_iwqueue.setTerminateAndWait();
LOGDEB0(("FsIndexer: internfile wrkr status: %ld (1->ok)\n", long(status)));
void *status;
if (m_haveInternQ) {
status = m_iwqueue.setTerminateAndWait();
LOGDEB0(("FsIndexer: internfile wrkr status: %ld (1->ok)\n",
long(status)));
}
if (m_haveSplitQ) {
status = m_dwqueue.setTerminateAndWait();
LOGDEB0(("FsIndexer: dbupd worker status: %ld (1->ok)\n", long(status)));
LOGDEB0(("FsIndexer: dbupd worker status: %ld (1->ok)\n",
long(status)));
}
#endif // IDX_THREADS
delete m_missing;
}
@ -161,7 +186,7 @@ bool FsIndexer::index()
m_walker.setSkippedPaths(m_config->getSkippedPaths());
m_walker.addSkippedPath(path_tildexpand("~/.beagle"));
for (list<string>::const_iterator it = m_tdl.begin();
for (vector<string>::const_iterator it = m_tdl.begin();
it != m_tdl.end(); it++) {
LOGDEB(("FsIndexer::index: Indexing %s into %s\n", it->c_str(),
getDbDir().c_str()));
@ -191,7 +216,9 @@ bool FsIndexer::index()
}
#ifdef IDX_THREADS
if (m_haveInternQ)
m_iwqueue.waitIdle();
if (m_haveSplitQ)
m_dwqueue.waitIdle();
m_db->waitUpdIdle();
#endif // IDX_THREADS
@ -209,13 +236,14 @@ bool FsIndexer::index()
return true;
}
static bool matchesSkipped(const list<string>& tdl,
static bool matchesSkipped(const vector<string>& tdl,
FsTreeWalker& walker,
const string& path)
{
// First check what (if any) topdir this is in:
string td;
for (list<string>::const_iterator it = tdl.begin(); it != tdl.end(); it++) {
for (vector<string>::const_iterator it = tdl.begin();
it != tdl.end(); it++) {
if (path.find(*it) == 0) {
td = *it;
break;
@ -308,10 +336,13 @@ bool FsIndexer::indexFiles(list<string>& files, ConfIndexer::IxFlag flag)
}
#ifdef IDX_THREADS
if (m_haveInternQ)
m_iwqueue.waitIdle();
if (m_haveSplitQ)
m_dwqueue.waitIdle();
m_db->waitUpdIdle();
#endif // IDX_THREADS
return true;
}
@ -372,6 +403,10 @@ void FsIndexer::makesig(const struct stat *stp, string& out)
}
#ifdef IDX_THREADS
// Called updworker as seen from here, but the first step (and only in
// most meaningful configurations) is doing the word-splitting, which
// is why the task is referred as "Split" in the grand scheme of
// things. An other stage usually deals with the actual index update.
void *FsIndexerDbUpdWorker(void * fsp)
{
recoll_threadinit();
@ -471,13 +506,18 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
}
#ifdef IDX_THREADS
if (m_haveInternQ) {
InternfileTask *tp = new InternfileTask(fn, stp);
if (!m_iwqueue.put(tp))
return FsTreeWalker::FtwError;
if (m_iwqueue.put(tp)) {
return FsTreeWalker::FtwOk;
#else
} else {
return FsTreeWalker::FtwError;
}
}
#endif
return processonefile(m_config, m_tmpdir, fn, stp);
#endif // IDX_THREADS
}
@ -505,13 +545,7 @@ FsIndexer::processonefile(RclConfig *config, TempDir& tmpdir,
makesig(stp, sig);
string udi;
make_udi(fn, cstr_null, udi);
bool needupdate;
{
#ifdef IDX_THREADS
PTMutexLocker locker(m_mutex);
#endif
needupdate = m_db->needUpdate(udi, sig);
}
bool needupdate = m_db->needUpdate(udi, sig);
if (!needupdate) {
LOGDEB0(("processone: up to date: %s\n", fn.c_str()));
@ -619,17 +653,22 @@ FsIndexer::processonefile(RclConfig *config, TempDir& tmpdir,
make_udi(fn, doc.ipath, udi);
#ifdef IDX_THREADS
if (m_haveSplitQ) {
DbUpdTask *tp = new DbUpdTask(config, udi, doc.ipath.empty() ?
cstr_null : parent_udi, doc);
if (!m_dwqueue.put(tp)) {
LOGERR(("processonefile: wqueue.put failed\n"));
return FsTreeWalker::FtwError;
}
#else
} else {
#endif
if (!m_db->addOrUpdate(config, udi, doc.ipath.empty() ? cstr_null :
parent_udi, doc))
parent_udi, doc)) {
return FsTreeWalker::FtwError;
#endif // IDX_THREADS
}
#ifdef IDX_THREADS
}
#endif
// Tell what we are doing and check for interrupt request
if (m_updater) {
@ -664,14 +703,19 @@ FsIndexer::processonefile(RclConfig *config, TempDir& tmpdir,
fileDoc.pcbytes = cbuf;
// Document signature for up to date checks.
makesig(stp, fileDoc.sig);
#ifdef IDX_THREADS
DbUpdTask *tp = new DbUpdTask(config, parent_udi, cstr_null, fileDoc);
if (m_haveSplitQ) {
DbUpdTask *tp = new DbUpdTask(config, parent_udi, cstr_null,
fileDoc);
if (!m_dwqueue.put(tp))
return FsTreeWalker::FtwError;
#else
else
return FsTreeWalker::FtwOk;
}
#endif
if (!m_db->addOrUpdate(config, parent_udi, cstr_null, fileDoc))
return FsTreeWalker::FtwError;
#endif // IDX_THREADS
}
return FsTreeWalker::FtwOk;

View file

@ -85,7 +85,7 @@ class FsIndexer : public FsTreeWalkerCB {
TempDir m_tmpdir;
string m_reason;
DbIxStatusUpdater *m_updater;
std::list<std::string> m_tdl;
std::vector<std::string> m_tdl;
FIMissingStore *m_missing;
@ -107,6 +107,8 @@ class FsIndexer : public FsTreeWalkerCB {
int m_loglevel;
WorkQueue<InternfileTask*> m_iwqueue;
WorkQueue<DbUpdTask*> m_dwqueue;
bool m_haveInternQ;
bool m_haveSplitQ;
#endif // IDX_THREADS
bool init();

View file

@ -172,7 +172,7 @@ void *rclMonRcvRun(void *q)
// Get top directories from config
list<string> tdl = lconfig.getTopdirs();
vector<string> tdl = lconfig.getTopdirs();
if (tdl.empty()) {
LOGERR(("rclMonRcvRun:: top directory list (topdirs param.) not"
"found in config or Directory list parse error"));
@ -184,7 +184,7 @@ void *rclMonRcvRun(void *q)
FsTreeWalker walker;
walker.setSkippedPaths(lconfig.getDaemSkippedPaths());
WalkCB walkcb(&lconfig, mon, queue, walker);
for (list<string>::iterator it = tdl.begin(); it != tdl.end(); it++) {
for (vector<string>::iterator it = tdl.begin(); it != tdl.end(); it++) {
lconfig.setKeyDir(*it);
// Adjust the follow symlinks options
bool follow;

View file

@ -127,6 +127,29 @@ static inline string make_parentterm(const string& udi)
return pterm;
}
Db::Native::Native(Db *db)
: m_rcldb(db), m_isopen(false), m_iswritable(false),
m_noversionwrite(false)
#ifdef IDX_THREADS
, m_wqueue("DbUpd",
m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first),
m_totalworkns(0LL)
#endif // IDX_THREADS
{
LOGDEB1(("Native::Native: me %p\n", this));
}
Db::Native::~Native()
{
LOGDEB1(("Native::~Native: me %p\n", this));
#ifdef IDX_THREADS
if (m_haveWriteQ) {
void *status = m_wqueue.setTerminateAndWait();
LOGDEB2(("Native::~Native: worker status %ld\n", long(status)));
}
#endif // IDX_THREADS
}
#ifdef IDX_THREADS
void *DbUpdWorker(void* vdbp)
{
@ -143,8 +166,14 @@ void *DbUpdWorker(void* vdbp)
return (void*)1;
}
LOGDEB(("DbUpdWorker: got task, ql %d\n", int(qsz)));
if (!ndbp->addOrUpdateWrite(tsk->udi, tsk->uniterm,
tsk->doc, tsk->txtlen)) {
bool status;
if (tsk->txtlen == (size_t)-1) {
status = ndbp->m_rcldb->purgeFileWrite(tsk->udi, tsk->uniterm);
} else {
status = ndbp->addOrUpdateWrite(tsk->udi, tsk->uniterm,
tsk->doc, tsk->txtlen);
}
if (!status) {
LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n"));
tqp->workerExit();
delete tsk;
@ -153,35 +182,31 @@ void *DbUpdWorker(void* vdbp)
delete tsk;
}
}
#endif // IDX_THREADS
Db::Native::Native(Db *db)
: m_rcldb(db), m_isopen(false), m_iswritable(false),
m_noversionwrite(false)
#ifdef IDX_THREADS
, m_wqueue("DbUpd", 2), m_totalworkns(0LL)
#endif // IDX_THREADS
void Db::Native::maybeStartThreads()
{
LOGDEB1(("Native::Native: me %p\n", this));
#ifdef IDX_THREADS
m_loglevel = DebugLog::getdbl()->getlevel();
if (!m_wqueue.start(1, DbUpdWorker, this)) {
m_haveWriteQ = false;
const RclConfig *cnf = m_rcldb->m_config;
int writeqlen = cnf->getThrConf(RclConfig::ThrDbWrite).first;
int writethreads = cnf->getThrConf(RclConfig::ThrDbWrite).second;
if (writethreads > 1) {
LOGINFO(("RclDb: write threads count was forced down to 1\n"));
writethreads = 1;
}
if (writeqlen >= 0 && writethreads > 0) {
if (!m_wqueue.start(writethreads, DbUpdWorker, this)) {
LOGERR(("Db::Db: Worker start failed\n"));
return;
}
#endif // IDX_THREADS
m_haveWriteQ = true;
}
LOGDEB(("RclDb:: threads: haveWriteQ %d, wqlen %d wqts %d\n",
m_haveWriteQ, writeqlen, writethreads));
}
Db::Native::~Native()
{
LOGDEB1(("Native::~Native: me %p\n", this));
#ifdef IDX_THREADS
if (m_iswritable) {
void *status = m_wqueue.setTerminateAndWait();
LOGDEB2(("Native::~Native: worker status %ld\n", long(status)));
}
#endif // IDX_THREADS
}
/* See comment in class declaration: return all subdocuments of a
* document given by its unique id.
@ -313,11 +338,11 @@ int Db::Native::getPageNumberForPosition(const vector<int>& pbreaks,
bool Db::o_inPlaceReset;
Db::Db(RclConfig *cfp)
: m_ndb(0), m_config(cfp), m_idxAbsTruncLen(250), m_synthAbsLen(250),
m_synthAbsWordCtxLen(4), m_flushMb(-1),
m_curtxtsz(0), m_flushtxtsz(0), m_occtxtsz(0), m_occFirstCheck(1),
m_maxFsOccupPc(0), m_mode(Db::DbRO)
Db::Db(const RclConfig *cfp)
: m_ndb(0), m_config(cfp), m_mode(Db::DbRO), m_curtxtsz(0), m_flushtxtsz(0),
m_occtxtsz(0), m_occFirstCheck(1),
m_idxAbsTruncLen(250), m_synthAbsLen(250), m_synthAbsWordCtxLen(4),
m_flushMb(-1), m_maxFsOccupPc(0)
{
#ifndef RCL_INDEX_STRIPCHARS
if (start_of_field_term.empty()) {
@ -390,6 +415,7 @@ bool Db::open(OpenMode mode, OpenError *error)
m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY,
cstr_RCL_IDX_VERSION);
m_ndb->m_iswritable = true;
m_ndb->maybeStartThreads();
// We open a readonly object in all cases (possibly in
// addition to the r/w one) because some operations
// are faster when performed through a Database: no
@ -424,8 +450,8 @@ bool Db::open(OpenMode mode, OpenError *error)
// Check index format version. Must not try to check a just created or
// truncated db
if (mode != DbTrunc && m_ndb->xdb().get_doccount() > 0) {
string version = m_ndb->xdb().get_metadata(cstr_RCL_IDX_VERSION_KEY);
if (mode != DbTrunc && m_ndb->xrdb.get_doccount() > 0) {
string version = m_ndb->xrdb.get_metadata(cstr_RCL_IDX_VERSION_KEY);
if (version.compare(cstr_RCL_IDX_VERSION)) {
m_ndb->m_noversionwrite = true;
LOGERR(("Rcl::Db::open: file index [%s], software [%s]\n",
@ -541,7 +567,7 @@ int Db::termDocCnt(const string& _term)
return 0;
}
XAPTRY(res = m_ndb->xdb().get_termfreq(term), m_ndb->xrdb, m_reason);
XAPTRY(res = m_ndb->xrdb.get_termfreq(term), m_ndb->xrdb, m_reason);
if (!m_reason.empty()) {
LOGERR(("Db::termDocCnt: got error: %s\n", m_reason.c_str()));
@ -1110,16 +1136,20 @@ bool Db::addOrUpdate(RclConfig *config, const string &udi,
newdocument.set_data(record);
#ifdef IDX_THREADS
DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, doc.text.length());
if (m_ndb->m_haveWriteQ) {
DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument,
doc.text.length());
if (!m_ndb->m_wqueue.put(tp)) {
LOGERR(("Db::addOrUpdate:Cant queue task\n"));
return false;
}
} else {
return true;
#else
}
}
#endif
return m_ndb->addOrUpdateWrite(udi, uniterm, newdocument,
doc.text.length());
#endif // IDX_THREADS
}
bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
@ -1127,7 +1157,13 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
{
#ifdef IDX_THREADS
Chrono chron;
// In the case where there is a separate (single) db update
// thread, we only need to protect the update map update below
// (against interaction with threads calling needUpdate()). Else,
// all threads from above need to synchronize here
PTMutexLocker lock(m_mutex, m_haveWriteQ);
#endif
// Check file system full every mbyte of indexed text. It's a bit wasteful
// to do this after having prepared the document, but it needs to be in
// the single-threaded section.
@ -1155,7 +1191,7 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
#ifdef IDX_THREADS
// Need to protect against interaction with the up-to-date checks
// which also update the existence map
PTMutexLocker lock(m_rcldb->m_ndb->m_mutex);
PTMutexLocker lock(m_mutex, !m_haveWriteQ);
#endif
if (did < m_rcldb->updated.size()) {
m_rcldb->updated[did] = true;
@ -1191,8 +1227,10 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
#ifdef IDX_THREADS
void Db::waitUpdIdle()
{
if (m_ndb->m_haveWriteQ) {
Chrono chron;
m_ndb->m_wqueue.waitIdle();
// We flush here just for correct measurement of the thread work time
string ermsg;
try {
m_ndb->xwdb.flush();
@ -1203,6 +1241,7 @@ void Db::waitUpdIdle()
m_ndb->m_totalworkns += chron.nanos();
LOGDEB(("Db::waitUpdIdle: total work %lld mS\n",
m_ndb->m_totalworkns/1000000));
}
}
#endif
@ -1243,6 +1282,13 @@ bool Db::needUpdate(const string &udi, const string& sig)
string uniterm = make_uniterm(udi);
string ermsg;
#ifdef IDX_THREADS
// Need to protect against interaction with the doc update/insert
// thread which also updates the existence map, and even multiple
// accesses to the readonly Xapian::Database are not allowed
// anyway
PTMutexLocker lock(m_ndb->m_mutex);
#endif
// We look up the document indexed by the uniterm. This is either
// the actual document file, or, for a multi-document file, the
// pseudo-doc we create to stand for the file itself.
@ -1277,12 +1323,6 @@ bool Db::needUpdate(const string &udi, const string& sig)
// Set the uptodate flag for doc / pseudo doc
if (m_mode != DbRO) {
#ifdef IDX_THREADS
// Need to protect against interaction with the doc
// update/insert thread which also updates the
// existence map
PTMutexLocker lock(m_ndb->m_mutex);
#endif
updated[*docid] = true;
// Set the existence flag for all the subdocs (if any)
@ -1372,7 +1412,13 @@ bool Db::purge()
return false;
#ifdef IDX_THREADS
m_ndb->m_wqueue.waitIdle();
// If we manage our own write queue, make sure it's drained and closed
if (m_ndb->m_haveWriteQ)
m_ndb->m_wqueue.setTerminateAndWait();
// else we need to lock out other top level threads. This is just
// a precaution as they should have been waited for by the top
// level actor at this point
PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ);
#endif // IDX_THREADS
// For xapian versions up to 1.0.1, deleting a non-existant
@ -1390,8 +1436,6 @@ bool Db::purge()
// Walk the document array and delete any xapian document whose
// flag is not set (we did not see its source during indexing).
// Threads: we do not need a mutex here as the indexing threads
// are necessarily done at this point.
int purgecount = 0;
for (Xapian::docid docid = 1; docid < updated.size(); ++docid) {
if (!updated[docid]) {
@ -1436,6 +1480,30 @@ bool Db::purge()
return true;
}
// Test for doc existence.
bool Db::docExists(const string& uniterm)
{
#ifdef IDX_THREADS
// If we're not running our own (single) thread, need to protect
// read db against multiaccess (e.g. from needUpdate(), or this method).
PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ);
#endif
string ermsg;
try {
Xapian::PostingIterator docid = m_ndb->xrdb.postlist_begin(uniterm);
if (docid == m_ndb->xrdb.postlist_end(uniterm)) {
return false;
} else {
return true;
}
} XCATCHERROR(ermsg);
if (!ermsg.empty()) {
LOGERR(("Db::docExists(%s) %s\n", uniterm.c_str(), ermsg.c_str()));
}
return false;
}
/* Delete document(s) for given unique identifier (doc and descendents) */
bool Db::purgeFile(const string &udi, bool *existed)
{
@ -1443,21 +1511,44 @@ bool Db::purgeFile(const string &udi, bool *existed)
if (m_ndb == 0 || !m_ndb->m_iswritable)
return false;
string uniterm = make_uniterm(udi);
bool exists = docExists(uniterm);
if (existed)
*existed = exists;
if (!exists)
return true;
#ifdef IDX_THREADS
m_ndb->m_wqueue.waitIdle();
if (m_ndb->m_haveWriteQ) {
Xapian::Document xdoc;
DbUpdTask *tp = new DbUpdTask(udi, uniterm, xdoc, (size_t)-1);
if (!m_ndb->m_wqueue.put(tp)) {
LOGERR(("Db::purgeFile:Cant queue task\n"));
return false;
} else {
return true;
}
}
#endif
return purgeFileWrite(udi, uniterm);
}
bool Db::purgeFileWrite(const string& udi, const string& uniterm)
{
#if defined(IDX_THREADS)
// If we have a write queue we're called from there, and single threaded, no locking.
// Else need to mutex other threads from above
PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ);
#endif // IDX_THREADS
Xapian::WritableDatabase db = m_ndb->xwdb;
string uniterm = make_uniterm(udi);
string ermsg;
try {
Xapian::PostingIterator docid = db.postlist_begin(uniterm);
if (docid == db.postlist_end(uniterm)) {
if (existed)
*existed = false;
return true;
}
*existed = true;
LOGDEB(("purgeFile: delete docid %d\n", *docid));
if (m_flushMb > 0) {
Xapian::termcount trms = m_ndb->xwdb.get_doclength(*docid);
@ -1613,7 +1704,7 @@ bool Db::termMatch(MatchType typ, const string &lang,
{
if (!m_ndb || !m_ndb->m_isopen)
return false;
Xapian::Database xdb = m_ndb->xdb();
Xapian::Database xdb = m_ndb->xrdb;
XAPTRY(res.dbdoccount = xdb.get_doccount();
res.dbavgdoclen = xdb.get_avlength(), xdb, m_reason);
@ -1769,7 +1860,7 @@ TermIter *Db::termWalkOpen()
return 0;
TermIter *tit = new TermIter;
if (tit) {
tit->db = m_ndb->xdb();
tit->db = m_ndb->xrdb;
XAPTRY(tit->it = tit->db.allterms_begin(), tit->db, m_reason);
if (!m_reason.empty()) {
LOGERR(("Db::termWalkOpen: xapian error: %s\n", m_reason.c_str()));
@ -1804,7 +1895,7 @@ bool Db::termExists(const string& word)
if (!m_ndb || !m_ndb->m_isopen)
return 0;
XAPTRY(if (!m_ndb->xdb().term_exists(word)) return false,
XAPTRY(if (!m_ndb->xrdb.term_exists(word)) return false,
m_ndb->xrdb, m_reason);
if (!m_reason.empty()) {

View file

@ -182,7 +182,7 @@ class Db {
friend class Native;
/* General stuff (valid for query or update) ****************************/
Db(RclConfig *cfp);
Db(const RclConfig *cfp);
~Db();
enum OpenMode {DbRO, DbUpd, DbTrunc};
@ -356,7 +356,7 @@ class Db {
bool stemDiffers(const string& lang, const string& term,
const string& base);
RclConfig *getConf() {return m_config;}
const RclConfig *getConf() {return m_config;}
/**
Activate the "in place reset" mode where all documents are
@ -368,15 +368,31 @@ class Db {
/* This has to be public for access by embedded Query::Native */
Native *m_ndb;
bool purgeFileWrite(const string& udi, const string& uniterm);
private:
// Internal form of close, can be called during destruction
bool i_close(bool final);
RclConfig *m_config;
const RclConfig *m_config;
string m_reason; // Error explanation
/* Parameters cached out of the configuration files */
// Xapian directories for additional databases to query
vector<string> m_extraDbs;
OpenMode m_mode;
// File existence vector: this is filled during the indexing pass. Any
// document whose bit is not set at the end is purged
vector<bool> updated;
// Stop terms: those don't get indexed.
StopList m_stops;
// Text bytes indexed since beginning
long long m_curtxtsz;
// Text bytes at last flush
long long m_flushtxtsz;
// Text bytes at last fsoccup check
long long m_occtxtsz;
// First fs occup check ?
int m_occFirstCheck;
/***************
* Parameters cached out of the configuration files. Logically const
* after init */
// This is how long an abstract we keep or build from beginning of
// text when indexing. It only has an influence on the size of the
// db as we are free to shorten it again when displaying
@ -389,32 +405,19 @@ private:
int m_synthAbsWordCtxLen;
// Flush threshold. Megabytes of text indexed before we flush.
int m_flushMb;
// Text bytes indexed since beginning
long long m_curtxtsz;
// Text bytes at last flush
long long m_flushtxtsz;
// Text bytes at last fsoccup check
long long m_occtxtsz;
// First fs occup check ?
int m_occFirstCheck;
// Maximum file system occupation percentage
int m_maxFsOccupPc;
// Database directory
string m_basedir;
// Xapian directories for additional databases to query
vector<string> m_extraDbs;
OpenMode m_mode;
// File existence vector: this is filled during the indexing pass. Any
// document whose bit is not set at the end is purged
vector<bool> updated;
// Stop terms: those don't get indexed.
StopList m_stops;
// When this is set, all documents are considered as needing a reindex.
// This implements an alternative to just erasing the index before
// beginning, with the advantage that, for small index formats updates,
// between releases the index remains available while being recreated.
static bool o_inPlaceReset;
/******* End logical constnesss */
// Internal form of close, can be called during destruction
bool i_close(bool final);
// Reinitialize when adding/removing additional dbs
bool adjustdbs();
bool stemExpand(const string &lang, const string &s,
@ -422,6 +425,7 @@ private:
// Flush when idxflushmb is reached
bool maybeflush(off_t moretext);
bool docExists(const string& uniterm);
/* Copyconst and assignement private and forbidden */
Db(const Db &) {}

View file

@ -36,6 +36,8 @@ namespace Rcl {
class Query;
#ifdef IDX_THREADS
// Task for the index update thread. This can be either and add/update
// or a purge op, in which case the txtlen is (size_t)-1
class DbUpdTask {
public:
DbUpdTask(const string& ud, const string& un, const Xapian::Document &d,
@ -64,6 +66,8 @@ class Db::Native {
int m_loglevel;
PTMutexInit m_mutex;
long long m_totalworkns;
bool m_haveWriteQ;
void maybeStartThreads();
#endif // IDX_THREADS
// Indexing
@ -71,10 +75,6 @@ class Db::Native {
// Querying (active even if the wdb is too)
Xapian::Database xrdb;
// We sometimes go through the wdb for some query ops, don't
// really know if this makes sense
Xapian::Database& xdb() {return m_iswritable ? xwdb : xrdb;}
Native(Db *db);
~Native();

View file

@ -22,7 +22,8 @@
/// A trivial wrapper/helper for pthread mutex locks
/// Init lock. Used as a single PTMutexInit static object.
/// Lock storage with auto-initialization. Must be created before any
/// lock-using thread of course (possibly as a static object).
class PTMutexInit {
public:
pthread_mutex_t m_mutex;
@ -32,15 +33,20 @@ public:
}
};
/// Take the lock when constructed, release when deleted
/// Take the lock when constructed, release when deleted. Can be disabled
/// by constructor params for conditional use.
class PTMutexLocker {
public:
PTMutexLocker(PTMutexInit& l) : m_lock(l)
// The nolock arg enables conditional locking
PTMutexLocker(PTMutexInit& l, bool nolock = false)
: m_lock(l), m_status(-1)
{
if (!nolock)
m_status = pthread_mutex_lock(&m_lock.m_mutex);
}
~PTMutexLocker()
{
if (m_status == 0)
pthread_mutex_unlock(&m_lock.m_mutex);
}
int ok() {return m_status == 0;}

View file

@ -57,7 +57,7 @@ public:
/** Create a WorkQueue
* @param name for message printing
* @param hi number of tasks on queue before clients blocks. Default 0
* meaning no limit.
* meaning no limit. hi == -1 means that the queue is disabled.
* @param lo minimum count of tasks before worker starts. Default 1.
*/
WorkQueue(const string& name, int hi = 0, int lo = 1)
@ -66,14 +66,14 @@ public:
m_clients_waiting(0), m_tottasks(0), m_nowake(0),
m_workersleeps(0)
{
m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
m_ok = (m_high >= 0) && (pthread_cond_init(&m_ccond, 0) == 0) &&
(pthread_cond_init(&m_wcond, 0) == 0) &&
(pthread_mutex_init(&m_mutex, 0) == 0);
}
~WorkQueue()
{
LOGDEB2(("WorkQueue::~WorkQueue: name %s\n", m_name.c_str()));
LOGDEB2(("WorkQueue::~WorkQueue:%s\n", m_name.c_str()));
if (!m_worker_threads.empty())
setTerminateAndWait();
}
@ -88,16 +88,19 @@ public:
*/
bool start(int nworkers, void *(*start_routine)(void *), void *arg)
{
pthread_mutex_lock(&m_mutex);
for (int i = 0; i < nworkers; i++) {
int err;
pthread_t thr;
if ((err = pthread_create(&thr, 0, start_routine, arg))) {
LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
m_name.c_str(), err));
pthread_mutex_unlock(&m_mutex);
return false;
}
m_worker_threads.insert(pair<pthread_t, WQTData>(thr, WQTData()));
}
pthread_mutex_unlock(&m_mutex);
return true;
}
@ -107,8 +110,9 @@ public:
*/
bool put(T t)
{
if (!ok() || pthread_mutex_lock(&m_mutex) != 0) {
LOGERR(("WorkQueue::put: !ok or mutex_lock failed\n"));
if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) {
LOGERR(("WorkQueue::put:%s: !ok or mutex_lock failed\n",
m_name.c_str()));
return false;
}
@ -141,12 +145,15 @@ public:
* back sleeping. Used by the client to wait for all current work
* to be completed, when it needs to perform work that couldn't be
* done in parallel with the worker's tasks, or before shutting
* down. Work can be resumed after calling this.
* down. Work can be resumed after calling this. Note that the only thread
* which can call it safely is the client just above (which can
control the task flow), else there could be
* tasks in the intermediate queues.
*/
bool waitIdle()
{
if (!ok() || pthread_mutex_lock(&m_mutex) != 0) {
LOGERR(("WorkQueue::waitIdle: %s not ok or can't lock\n",
if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) {
LOGERR(("WorkQueue::waitIdle:%s: not ok or can't lock\n",
m_name.c_str()));
return false;
}
@ -159,7 +166,8 @@ public:
if (pthread_cond_wait(&m_ccond, &m_mutex)) {
m_clients_waiting--;
m_ok = false;
LOGERR(("WorkQueue::waitIdle: cond_wait failed\n"));
LOGERR(("WorkQueue::waitIdle:%s: cond_wait failed\n",
m_name.c_str()));
pthread_mutex_unlock(&m_mutex);
return false;
}
@ -192,7 +200,8 @@ public:
m_clients_waiting++;
if (pthread_cond_wait(&m_ccond, &m_mutex)) {
pthread_mutex_unlock(&m_mutex);
LOGERR(("WorkQueue::setTerminate: cond_wait failed\n"));
LOGERR(("WorkQueue::setTerminate:%s: cond_wait failed\n",
m_name.c_str()));
m_clients_waiting--;
return (void*)0;
}
@ -225,8 +234,10 @@ public:
*/
bool take(T* tp, size_t *szp = 0)
{
if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) {
LOGDEB(("WorkQueue::take:%s: not ok\n", m_name.c_str()));
return false;
}
while (ok() && m_queue.size() < m_low) {
m_workersleeps++;
@ -269,6 +280,7 @@ public:
*/
void workerExit()
{
LOGDEB(("workerExit:%s\n", m_name.c_str()));
if (pthread_mutex_lock(&m_mutex) != 0)
return;
m_workers_exited++;
@ -288,7 +300,13 @@ public:
private:
bool ok()
{
return m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
if (!isok) {
LOGDEB(("WorkQueue:ok:%s: not ok m_ok %d m_workers_exited %d "
"m_worker_threads size %d\n", m_name.c_str(),
m_ok, m_workers_exited, int(m_worker_threads.size())));
}
return isok;
}
long long nanodiff(const struct timespec& older,