From 4d6cdc7e616e9c22dc7edd66ab8a5a72ab5c039b Mon Sep 17 00:00:00 2001
From: Jean-Francois Dockes
Date: Mon, 22 Apr 2013 11:32:49 +0200
Subject: [PATCH] added code to purge obsolete messages when a compound
 document (esp. mbox) is shortened and a partial update is performed (no
 general purge). Otherwise the orphan docs remained in the index potentially
 forever (an actual reindex of the file by a full pass was needed to make
 them go away)
---
 src/index/fsindexer.cpp             | 102 ++++++++++++++-------
 src/index/fsindexer.h               |  36 ++++++++
 src/rcldb/rcldb.cpp                 | 134 +++++++++++++++++++++-------
 src/rcldb/rcldb.h                   |   7 +-
 src/rcldb/rcldb_p.h                 |  27 ++++--
 tests/partialpurge/partialpurge.sh  |  41 +++++++++
 tests/partialpurge/partialpurge.txt |  18 ++++
 7 files changed, 291 insertions(+), 74 deletions(-)
 create mode 100755 tests/partialpurge/partialpurge.sh
 create mode 100644 tests/partialpurge/partialpurge.txt

diff --git a/src/index/fsindexer.cpp b/src/index/fsindexer.cpp
index 3c91e449..e880e5be 100644
--- a/src/index/fsindexer.cpp
+++ b/src/index/fsindexer.cpp
@@ -313,7 +313,9 @@ bool FsIndexer::indexFiles(list<string>& files, ConfIndexer::IxFlag flag)
     int abslen;
     if (m_config->getConfParam("idxabsmlen", &abslen))
 	m_db->setAbstractParams(abslen, -1, -1);
-    
+
+    m_purgeCandidates.setRecord(true);
+
     // We use an FsTreeWalker just for handling the skipped path/name lists
     FsTreeWalker walker;
     walker.setSkippedPaths(m_config->getSkippedPaths());
@@ -365,6 +367,21 @@ out:
     m_dwqueue.waitIdle();
     m_db->waitUpdIdle();
 #endif // IDX_THREADS
+
+    // Purge possible orphan documents
+    if (ret == true) {
+	LOGDEB(("Indexfiles: purging orphans\n"));
+	const vector<string>& purgecandidates = m_purgeCandidates.getCandidates();
+	for (vector<string>::const_iterator it = purgecandidates.begin();
+	     it != purgecandidates.end(); it++) {
+	    LOGDEB(("Indexfiles: purging orphans for %s\n", it->c_str()));
+	    m_db->purgeOrphans(*it);
+	}
+#ifdef IDX_THREADS
+	m_db->waitUpdIdle();
+#endif // IDX_THREADS
+    }
+
     LOGDEB(("FsIndexer::indexFiles: done\n"));
     return ret;
 }
@@ -622,6 +639,27 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
     return processonefile(m_config, fn, stp, m_localfields, m_mdreapers);
 }
 
+// File name transcoded to utf8 for indexing. If this fails, the file
+// name won't be indexed, no big deal. Note that we used to do the full
+// path here, but I ended up believing that it made more sense to use
+// only the file name. The charset used is the one from the locale.
+static string compute_utf8fn(RclConfig *config, const string& fn)
+{
+    string charset = config->getDefCharset(true);
+    string utf8fn;
+    int ercnt;
+    if (!transcode(path_getsimple(fn), utf8fn, charset, "UTF-8", &ercnt)) {
+	LOGERR(("processone: fn transcode failure from [%s] to UTF-8: %s\n",
+		charset.c_str(), path_getsimple(fn).c_str()));
+    } else if (ercnt) {
+	LOGDEB(("processone: fn transcode %d errors from [%s] to UTF-8: %s\n",
+		ercnt, charset.c_str(), path_getsimple(fn).c_str()));
+    }
+    LOGDEB2(("processone: fn transcoded from [%s] to [%s] (%s->%s)\n",
+	     path_getsimple(fn).c_str(), utf8fn.c_str(), charset.c_str(),
+	     "UTF-8"));
+    return utf8fn;
+}
 
 FsTreeWalker::Status
 FsIndexer::processonefile(RclConfig *config,
@@ -644,7 +682,8 @@ FsIndexer::processonefile(RclConfig *config,
     makesig(stp, sig);
     string udi;
     make_udi(fn, cstr_null, udi);
-    bool needupdate = m_db->needUpdate(udi, sig);
+    bool existingDoc;
+    bool needupdate = m_db->needUpdate(udi, sig, &existingDoc);
 
     if (!needupdate) {
 	LOGDEB0(("processone: up to date: %s\n", fn.c_str()));
@@ -673,32 +712,19 @@ FsIndexer::processonefile(RclConfig *config,
     }
     interner.setMissingStore(m_missing);
 
-    // File name transcoded to utf8 for indexing.
-    // If this fails, the file name won't be indexed, no big deal
-    // Note that we used to do the full path here, but I ended up believing
-    // that it made more sense to use only the file name
-    // The charset is used is the one from the locale.
-    string charset = config->getDefCharset(true);
-    string utf8fn; int ercnt;
-    if (!transcode(path_getsimple(fn), utf8fn, charset, "UTF-8", &ercnt)) {
-	LOGERR(("processone: fn transcode failure from [%s] to UTF-8: %s\n",
-		charset.c_str(), path_getsimple(fn).c_str()));
-    } else if (ercnt) {
-	LOGDEB(("processone: fn transcode %d errors from [%s] to UTF-8: %s\n",
-		ercnt, charset.c_str(), path_getsimple(fn).c_str()));
-    }
-    LOGDEB2(("processone: fn transcoded from [%s] to [%s] (%s->%s)\n",
-	     path_getsimple(fn).c_str(), utf8fn.c_str(), charset.c_str(),
-	     "UTF-8"));
+    string utf8fn = compute_utf8fn(config, fn);
+
+    // parent_udi is initially the same as udi; it will be used if there
+    // are subdocs.
+    string parent_udi = udi;
 
-    string parent_udi;
-    make_udi(fn, cstr_null, parent_udi);
     Rcl::Doc doc;
     char ascdate[30];
     sprintf(ascdate, "%ld", long(stp->st_mtime));
     FileInterner::Status fis = FileInterner::FIAgain;
     bool hadNullIpath = false;
+    bool hadNonNullIpath = false;
     while (fis == FileInterner::FIAgain) {
 	doc.erase();
 	try {
@@ -708,7 +734,7 @@ FsIndexer::processonefile(RclConfig *config,
 	    return FsTreeWalker::FtwStop;
 	}
 
-	// Index at least the file name even if there was an error.
+	// We index at least the file name even if there was an error.
 	// We'll change the signature to ensure that the indexing will
 	// be retried every time.
 
@@ -718,7 +744,10 @@ FsIndexer::processonefile(RclConfig *config,
 	    hadNullIpath = true;
 	    if (m_havemdreapers)
 		reapmetadata(mdreapers, fn, doc);
-	}
+	} else {
+	    hadNonNullIpath = true;
+	    make_udi(fn, doc.ipath, udi);
+	}
 
 	// Set file name, mod time and url if not done by filter
 	if (doc.fmtime.empty())
@@ -732,11 +761,9 @@ FsIndexer::processonefile(RclConfig *config,
 	char cbuf[100];
 	sprintf(cbuf, OFFTPC, stp->st_size);
 	doc.pcbytes = cbuf;
-	// Document signature for up to date checks: concatenate
-	// m/ctime and size. Looking for changes only, no need to
-	// parseback so no need for reversible formatting. Also set,
-	// but never used, for subdocs.
-	makesig(stp, doc.sig);
+	// Document signature for up to date checks.
+	// All subdocs inherit the file's.
+	doc.sig = sig;
 
 	// If there was an error, ensure indexing will be
 	// retried. This is for the once missing, later installed
 	// Possibly add fields from local config
 	if (m_havelocalfields)
 	    setlocalfields(localfields, doc);
+
 	// Add document to database. If there is an ipath, add it as a children
 	// of the file document.
-	string udi;
-	make_udi(fn, doc.ipath, udi);
-
 #ifdef IDX_THREADS
 	if (m_haveSplitQ) {
-	    DbUpdTask *tp = new DbUpdTask(udi, doc.ipath.empty() ? cstr_null : parent_udi, doc);
+	    DbUpdTask *tp = new DbUpdTask(udi, doc.ipath.empty() ?
+					  cstr_null : parent_udi, doc);
 	    if (!m_dwqueue.put(tp)) {
 		LOGERR(("processonefile: wqueue.put failed\n"));
 		return FsTreeWalker::FtwError;
@@ -789,6 +815,15 @@
 	    }
 	}
 
+    // If this doc existed and it's a container, record it for
+    // possible subdoc purging (this will be used only if we don't do a
+    // db-wide purge, e.g. if we're called from indexfiles()).
+    LOGDEB2(("processOnefile: existingDoc %d hadNonNullIpath %d\n",
+	     existingDoc, hadNonNullIpath));
+    if (existingDoc && hadNonNullIpath) {
+	m_purgeCandidates.record(parent_udi);
+    }
+
     // If we had no instance with a null ipath, we create an empty
     // document to stand for the file itself, to be used mainly for up
     // to date checks. Typically this happens for an mbox file.
@@ -806,8 +841,7 @@ FsIndexer::processonefile(RclConfig *config,
 	char cbuf[100];
 	sprintf(cbuf, OFFTPC, stp->st_size);
 	fileDoc.pcbytes = cbuf;
-	// Document signature for up to date checks.
-	makesig(stp, fileDoc.sig);
+	fileDoc.sig = sig;
 
 #ifdef IDX_THREADS
 	if (m_haveSplitQ) {
diff --git a/src/index/fsindexer.h b/src/index/fsindexer.h
index a65630d0..0269152e 100644
--- a/src/index/fsindexer.h
+++ b/src/index/fsindexer.h
@@ -83,13 +83,49 @@ class FsIndexer : public FsTreeWalkerCB {
     };
 
 private:
+
+    class PurgeCandidateRecorder {
+    public:
+	PurgeCandidateRecorder()
+	    : dorecord(false) {}
+	void setRecord(bool onoff)
+	{
+	    dorecord = onoff;
+	}
+	void record(const string& udi)
+	{
+	    // This test does not need to be protected: the value is set at
+	    // init and never changed.
+	    if (!dorecord)
+		return;
+#ifdef IDX_THREADS
+	    PTMutexLocker locker(mutex);
+#endif
+	    udis.push_back(udi);
+	}
+	const vector<string>& getCandidates()
+	{
+	    return udis;
+	}
+    private:
+#ifdef IDX_THREADS
+	PTMutexInit mutex;
+#endif
+	bool dorecord;
+	std::vector<string> udis;
+    };
+
     FsTreeWalker m_walker;
     RclConfig *m_config;
     Rcl::Db *m_db;
     string m_reason;
     DbIxStatusUpdater *m_updater;
+    // Top/start directories list
     std::vector<string> m_tdl;
+    // Store for missing filters and associated mime types
     FIMissingStore *m_missing;
+    // Recorder for files that may need subdoc purging.
+    PurgeCandidateRecorder m_purgeCandidates;
 
     // The configuration can set attribute fields to be inherited by
     // all files in a file system area.
     // Ie: set "rclaptg = thunderbird"
diff --git a/src/rcldb/rcldb.cpp b/src/rcldb/rcldb.cpp
index cbc2573b..d7656c5c 100644
--- a/src/rcldb/rcldb.cpp
+++ b/src/rcldb/rcldb.cpp
@@ -149,23 +149,34 @@ void *DbUpdWorker(void* vdbp)
     WorkQueue<DbUpdTask*> *tqp = &(ndbp->m_wqueue);
     DebugLog::getdbl()->setloglevel(ndbp->m_loglevel);
 
-    DbUpdTask *tsk;
+    DbUpdTask *tsk = 0;
     for (;;) {
-	size_t qsz;
+	size_t qsz = -1;
 	if (!tqp->take(&tsk, &qsz)) {
 	    tqp->workerExit();
 	    return (void*)1;
 	}
-	LOGDEB(("DbUpdWorker: got task, ql %d\n", int(qsz)));
-	bool status;
-	if (tsk->txtlen == (size_t)-1) {
-	    status = ndbp->m_rcldb->purgeFileWrite(tsk->udi, tsk->uniterm);
-	} else {
+	bool status = false;
+	switch (tsk->op) {
+	case DbUpdTask::AddOrUpdate:
+	    LOGDEB(("DbUpdWorker: got add/update task, ql %d\n", int(qsz)));
 	    status = ndbp->addOrUpdateWrite(tsk->udi, tsk->uniterm, tsk->doc,
 					    tsk->txtlen);
+	    break;
+	case DbUpdTask::Delete:
+	    LOGDEB(("DbUpdWorker: got delete task, ql %d\n", int(qsz)));
+	    status = ndbp->purgeFileWrite(false, tsk->udi, tsk->uniterm);
+	    break;
+	case DbUpdTask::PurgeOrphans:
+	    LOGDEB(("DbUpdWorker: got orphans purge task, ql %d\n", int(qsz)));
+	    status = ndbp->purgeFileWrite(true, tsk->udi, tsk->uniterm);
+	    break;
+	default:
+	    LOGERR(("DbUpdWorker: unknown op %d !!\n", tsk->op));
+	    break;
 	}
 	if (!status) {
-	    LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n"));
+	    LOGERR(("DbUpdWorker: xxWrite failed\n"));
 	    tqp->workerExit();
 	    delete tsk;
 	    return (void*)0;
@@ -1151,8 +1162,8 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi, Doc &doc)
 
 #ifdef IDX_THREADS
     if (m_ndb->m_havewriteq) {
-	DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument,
-				      doc.text.length());
+	DbUpdTask *tp = new DbUpdTask(DbUpdTask::AddOrUpdate, udi, uniterm,
+				      newdocument, doc.text.length());
 	if (!m_ndb->m_wqueue.put(tp)) {
 	    LOGERR(("Db::addOrUpdate:Cant queue task\n"));
 	    return false;
@@ -1292,7 +1303,7 @@ bool Db::doFlush()
 }
 
 // Test if doc given by udi has changed since last indexed (test sigs)
-bool Db::needUpdate(const string &udi, const string& sig)
+bool Db::needUpdate(const string &udi, const string& sig, bool *existed)
 {
     if (m_ndb == 0)
 	return false;
@@ -1300,8 +1311,12 @@ bool Db::needUpdate(const string &udi, const string& sig)
 
     // If we are doing an in place or full reset, no need to
     // test.
    // Note that there is no need to update the existence map
    // either, it will be done when updating the index
-    if (o_inPlaceReset || m_mode == DbTrunc)
+    if (o_inPlaceReset || m_mode == DbTrunc) {
+	// For in place reset, pretend the doc existed, to enable subdoc purge
+	if (existed)
+	    *existed = o_inPlaceReset;
 	return true;
+    }
 
     string uniterm = make_uniterm(udi);
     string ermsg;
@@ -1325,9 +1340,13 @@ bool Db::needUpdate(const string &udi, const string& sig)
     if (docid == m_ndb->xrdb.postlist_end(uniterm)) {
 	// If no document exist with this path, we do need update
 	LOGDEB(("Db::needUpdate:yes (new): [%s]\n", uniterm.c_str()));
+	if (existed)
+	    *existed = false;
 	return true;
     }
     Xapian::Document doc = m_ndb->xrdb.get_document(*docid);
+    if (existed)
+	*existed = true;
 
     // Retrieve old file/doc signature from value
     string osig = doc.get_value(VALUE_SIG);
@@ -1542,8 +1561,8 @@ bool Db::purgeFile(const string &udi, bool *existed)
 
 #ifdef IDX_THREADS
     if (m_ndb->m_havewriteq) {
-	Xapian::Document xdoc;
-	DbUpdTask *tp = new DbUpdTask(udi, uniterm, xdoc, (size_t)-1);
+	DbUpdTask *tp = new DbUpdTask(DbUpdTask::Delete, udi, uniterm,
+				      Xapian::Document(), (size_t)-1);
 	if (!m_ndb->m_wqueue.put(tp)) {
 	    LOGERR(("Db::purgeFile:Cant queue task\n"));
 	    return false;
@@ -1552,49 +1571,98 @@ bool Db::purgeFile(const string &udi, bool *existed)
 	}
     }
 #endif
-
-    return purgeFileWrite(udi, uniterm);
+    /* We get here if IDX_THREADS is not defined or there is no queue */
+    return m_ndb->purgeFileWrite(false, udi, uniterm);
 }
 
-bool Db::purgeFileWrite(const string& udi, const string& uniterm)
+/* Delete subdocs with an out of date sig. We do this to purge
+   obsolete subdocs during a partial update where no general purge
+   will be done */
+bool Db::purgeOrphans(const string &udi)
+{
+    LOGDEB(("Db:purgeOrphans: [%s]\n", udi.c_str()));
+    if (m_ndb == 0 || !m_ndb->m_iswritable)
+	return false;
+
+    string uniterm = make_uniterm(udi);
+
+#ifdef IDX_THREADS
+    if (m_ndb->m_havewriteq) {
+	DbUpdTask *tp = new DbUpdTask(DbUpdTask::PurgeOrphans, udi, uniterm,
+				      Xapian::Document(), (size_t)-1);
+	if (!m_ndb->m_wqueue.put(tp)) {
+	    LOGERR(("Db::purgeFile:Cant queue task\n"));
+	    return false;
+	} else {
+	    return true;
+	}
+    }
+#endif
+    /* We get here if IDX_THREADS is not defined or there is no queue */
+    return m_ndb->purgeFileWrite(true, udi, uniterm);
+}
+
+bool Db::Native::purgeFileWrite(bool orphansOnly, const string& udi,
+				const string& uniterm)
 {
 #if defined(IDX_THREADS)
     // We need a mutex even if we have a write queue (so we can only
     // be called by a single thread) to protect about multiple acces
     // to xrdb from subDocs() which is also called from needupdate()
    // (called from outside the write thread !
-    PTMutexLocker lock(m_ndb->m_mutex);
+    PTMutexLocker lock(m_mutex);
 #endif // IDX_THREADS
 
-    Xapian::WritableDatabase db = m_ndb->xwdb;
     string ermsg;
     try {
-	Xapian::PostingIterator docid = db.postlist_begin(uniterm);
-	if (docid == db.postlist_end(uniterm)) {
+	Xapian::PostingIterator docid = xwdb.postlist_begin(uniterm);
+	if (docid == xwdb.postlist_end(uniterm)) {
 	    return true;
 	}
-	LOGDEB(("purgeFile: delete docid %d\n", *docid));
-	if (m_flushMb > 0) {
-	    Xapian::termcount trms = m_ndb->xwdb.get_doclength(*docid);
-	    maybeflush(trms * 5);
+	if (m_rcldb->m_flushMb > 0) {
+	    Xapian::termcount trms = xwdb.get_doclength(*docid);
+	    m_rcldb->maybeflush(trms * 5);
+	}
+	string sig;
+	if (orphansOnly) {
+	    Xapian::Document doc = xwdb.get_document(*docid);
+	    sig = doc.get_value(VALUE_SIG);
+	    if (sig.empty()) {
+		LOGINFO(("purgeFileWrite: got empty sig\n"));
+		return false;
+	    }
+	} else {
+	    LOGDEB(("purgeFile: delete docid %d\n", *docid));
+	    xwdb.delete_document(*docid);
 	}
-	db.delete_document(*docid);
 
 	vector<Xapian::docid> docids;
-	m_ndb->subDocs(udi, docids);
+	subDocs(udi, docids);
 	LOGDEB(("purgeFile: subdocs cnt %d\n", docids.size()));
 	for (vector<Xapian::docid>::iterator it = docids.begin();
 	     it != docids.end(); it++) {
-	    LOGDEB(("Db::purgeFile: delete subdoc %d\n", *it));
-	    if (m_flushMb > 0) {
-		Xapian::termcount trms = m_ndb->xwdb.get_doclength(*it);
-		maybeflush(trms * 5);
+	    if (m_rcldb->m_flushMb > 0) {
+		Xapian::termcount trms = xwdb.get_doclength(*it);
+		m_rcldb->maybeflush(trms * 5);
+	    }
+	    string subdocsig;
+	    if (orphansOnly) {
+		Xapian::Document doc = xwdb.get_document(*it);
+		subdocsig = doc.get_value(VALUE_SIG);
+		if (subdocsig.empty()) {
+		    LOGINFO(("purgeFileWrite: got empty sig for subdoc??\n"));
+		    continue;
+		}
+	    }
+
+	    if (!orphansOnly || sig != subdocsig) {
+		LOGDEB(("Db::purgeFile: delete subdoc %d\n", *it));
+		xwdb.delete_document(*it);
 	    }
-	    db.delete_document(*it);
 	}
 	return true;
     } XCATCHERROR(ermsg);
     if (!ermsg.empty()) {
-	LOGERR(("Db::purgeFile: %s\n", ermsg.c_str()));
+	LOGERR(("Db::purgeFileWrite: %s\n", ermsg.c_str()));
     }
     return false;
 }
diff --git a/src/rcldb/rcldb.h b/src/rcldb/rcldb.h
index cdf8b9e6..2d105f7c 100644
--- a/src/rcldb/rcldb.h
+++ b/src/rcldb/rcldb.h
@@ -232,7 +232,7 @@ class Db {
      * Side-effect: set the existence flag for the file document
      * and all subdocs if any (for later use by 'purge()')
      */
-    bool needUpdate(const string &udi, const string& sig);
+    bool needUpdate(const string &udi, const string& sig, bool *existed=0);
 
     /** Add or update document identified by unique identifier.
      * @param config Config object to use. Can be the same as the member config
@@ -260,6 +260,10 @@ class Db {
 
     /** Delete document(s) for given UDI, including subdocs */
     bool purgeFile(const string &udi, bool *existed = 0);
+    /** Delete subdocs with an out of date sig. We do this to purge
+	obsolete subdocs during a partial update where no general purge
+	will be done */
+    bool purgeOrphans(const string &udi);
 
     /** Remove documents that no longer exist in the file system.
      * This depends on the update map, which is built during
@@ -442,7 +446,6 @@ private:
 #ifdef IDX_THREADS
     friend void *DbUpdWorker(void*);
 #endif // IDX_THREADS
-    bool purgeFileWrite(const string& udi, const string& uniterm);
 
     // Internal form of close, can be called during destruction
     bool i_close(bool final);
diff --git a/src/rcldb/rcldb_p.h b/src/rcldb/rcldb_p.h
index 94c38ca5..b0ee3cd1 100644
--- a/src/rcldb/rcldb_p.h
+++ b/src/rcldb/rcldb_p.h
@@ -36,17 +36,32 @@ namespace Rcl {
 class Query;
 
 #ifdef IDX_THREADS
-// Task for the index update thread. This can be either and add/update
-// or a purge op, in which case the txtlen is (size_t)-1
+// Task for the index update thread. This can be:
+// - add/update for a new or updated document
+// - delete for a deleted document
+// - purgeOrphans when a multidoc file is updated during a partial pass (no
+//   general purge). We want to remove subDocs that possibly don't
+//   exist anymore. We find them by their different sig.
+// txtlen and doc are only valid for add/update; else len is (size_t)-1 and
+// doc is empty.
class DbUpdTask {
public:
-    DbUpdTask(const string& ud, const string& un, const Xapian::Document &d,
-	      size_t tl)
-	: udi(ud), uniterm(un), doc(d), txtlen(tl)
+    enum Op {AddOrUpdate, Delete, PurgeOrphans};
+    // Note that udi and uniterm are strictly equivalent and are both
+    // passed just to avoid recomputing uniterm, which is available at
+    // the caller's site.
+    DbUpdTask(Op _op, const string& ud, const string& un,
+	      const Xapian::Document &d, size_t tl)
+	: op(_op), udi(ud), uniterm(un), doc(d), txtlen(tl)
     {}
+    // Udi and uniterm equivalently designate the doc
+    Op op;
     string udi;
     string uniterm;
     Xapian::Document doc;
+    // txtlen is used to update the flush interval. It's -1 for a
+    // purge because we actually don't know it, and the code fakes a
+    // text length based on the term count.
     size_t txtlen;
 };
 #endif // IDX_THREADS
@@ -86,6 +101,8 @@ class Db::Native {
 
     bool addOrUpdateWrite(const string& udi, const string& uniterm,
			  Xapian::Document& doc, size_t txtlen);
+    bool purgeFileWrite(bool onlyOrphans, const string& udi,
+			const string& uniterm);
     bool getPagePositions(Xapian::docid docid, vector<int>& vpos);
     int getPageNumberForPosition(const vector<int>& pbreaks,
				 unsigned int pos);
diff --git a/tests/partialpurge/partialpurge.sh b/tests/partialpurge/partialpurge.sh
new file mode 100755
index 00000000..49111c70
--- /dev/null
+++ b/tests/partialpurge/partialpurge.sh
@@ -0,0 +1,41 @@
+#!/bin/sh
+
+topdir=`dirname $0`/..
+. $topdir/shared.sh
+
+initvariables $0
+
+d=${tstdata}/partialpurge/
+
+# Check that partial purge works: the message orphaned by shortening
+# the mbox should not exist in the index any more
+(
+    cp $d/longmbox $d/testmbox
+    recollindex -Zi $d/testmbox
+
+    echo Should have 2 results: testmbox and longmbox:
+    recollq -q deletedmessageuniqueterm
+
+    echo
+    echo Changing file and reindexing
+    cp $d/shortmbox $d/testmbox
+    recollindex -Zi $d/testmbox
+
+    echo Should have 1 result: longmbox:
+    recollq -q deletedmessageuniqueterm
+
+    echo
+    echo Purging whole test file
+    recollindex -e $d/testmbox
+
+    echo Should have 1 result: longmbox:
+    recollq -q deletedmessageuniqueterm
+
+    echo Should have 2 results: longmbox shortmbox:
+    recollq -q stablemessageuniqueterm
+
+) 2> $mystderr | egrep -v '^Recoll query: ' > $mystdout
+
+diff -w ${myname}.txt $mystdout > $mydiffs 2>&1
+
+checkresult
diff --git a/tests/partialpurge/partialpurge.txt b/tests/partialpurge/partialpurge.txt
new file mode 100644
index 00000000..b73bd045
--- /dev/null
+++ b/tests/partialpurge/partialpurge.txt
@@ -0,0 +1,18 @@
+Should have 2 results: testmbox and longmbox:
+2 results
+message/rfc822 [file:///home/dockes/projets/fulltext/testrecoll/partialpurge/longmbox] [This email goes: deletedmessageuniqueterm] 755 bytes
+message/rfc822 [file:///home/dockes/projets/fulltext/testrecoll/partialpurge/testmbox] [This email goes: deletedmessageuniqueterm] 755 bytes
+
+Changing file and reindexing
+Should have 1 result: longmbox:
+1 results
+message/rfc822 [file:///home/dockes/projets/fulltext/testrecoll/partialpurge/longmbox] [This email goes: deletedmessageuniqueterm] 755 bytes
+
+Purging whole test file
+Should have 1 result: longmbox:
+1 results
+message/rfc822 [file:///home/dockes/projets/fulltext/testrecoll/partialpurge/longmbox] [This email goes: deletedmessageuniqueterm] 755 bytes
+Should have 2 results: longmbox shortmbox:
+2 results
+message/rfc822 [file:///home/dockes/projets/fulltext/testrecoll/partialpurge/longmbox] [This email remains: stablemessageuniqueterm] 759 bytes
+message/rfc822 [file:///home/dockes/projets/fulltext/testrecoll/partialpurge/shortmbox] [This email remains: stablemessageuniqueterm] 1173 bytes
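
Note on the purge rule implemented above: when a container file is reindexed, every subdoc that is still present receives the file's new signature (doc.sig = sig in processonefile()), so any subdoc still carrying an older sig is an orphan, and purgeFileWrite(true, ...) deletes it. The following minimal, self-contained C++ sketch illustrates just that comparison with a hypothetical in-memory map standing in for the index; FakeDoc and this purgeOrphans() are illustrative assumptions, not the actual Rcl::Db or Xapian API.

#include <iostream>
#include <map>
#include <string>

// Hypothetical stand-in for an indexed document: parent udi plus stored sig.
struct FakeDoc {
    std::string parent; // parent udi, empty for a top-level file document
    std::string sig;    // signature stored with the document
};

// Mirrors the orphansOnly branch of purgeFileWrite(): delete subdocs of
// parentUdi whose sig no longer matches the parent's current sig.
static void purgeOrphans(std::map<std::string, FakeDoc>& index,
                         const std::string& parentUdi)
{
    const std::string parentSig = index[parentUdi].sig;
    for (std::map<std::string, FakeDoc>::iterator it = index.begin();
         it != index.end(); ) {
        if (it->second.parent == parentUdi && it->second.sig != parentSig) {
            // Orphan: left over from a previous version of the container.
            index.erase(it++);
        } else {
            ++it;
        }
    }
}

int main()
{
    std::map<std::string, FakeDoc> index;
    index["/mbox"].sig = "sig2";                       // file doc, just reindexed
    index["/mbox|1"].parent = "/mbox";
    index["/mbox|1"].sig = "sig2";                     // message still in the mbox
    index["/mbox|2"].parent = "/mbox";
    index["/mbox|2"].sig = "sig1";                     // message removed from the mbox

    purgeOrphans(index, "/mbox");

    for (std::map<std::string, FakeDoc>::const_iterator it = index.begin();
         it != index.end(); ++it)
        std::cout << it->first << " sig=" << it->second.sig << "\n";
    // Prints /mbox and /mbox|1 only: /mbox|2 is purged as an orphan.
    return 0;
}

This is also why processonefile() only records a purge candidate when the document already existed and produced subdocs: a brand new file cannot have stale subdocs to compare against.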