Added code to purge obsolete messages when a compound document (esp. an mbox) is shortened and only a partial update is performed (no general purge). Previously, the orphaned subdocuments could remain in the index indefinitely: they only went away when the file was actually reindexed during a full pass.

This commit is contained in:
Jean-Francois Dockes 2013-04-22 11:32:49 +02:00
parent 39e2db774a
commit 4d6cdc7e61
7 changed files with 291 additions and 74 deletions

View file

@ -314,6 +314,8 @@ bool FsIndexer::indexFiles(list<string>& files, ConfIndexer::IxFlag flag)
if (m_config->getConfParam("idxabsmlen", &abslen))
m_db->setAbstractParams(abslen, -1, -1);
m_purgeCandidates.setRecord(true);
// We use an FsTreeWalker just for handling the skipped path/name lists
FsTreeWalker walker;
walker.setSkippedPaths(m_config->getSkippedPaths());
@ -365,6 +367,21 @@ out:
m_dwqueue.waitIdle();
m_db->waitUpdIdle();
#endif // IDX_THREADS
// Purge possible orphan documents
if (ret == true) {
LOGDEB(("Indexfiles: purging orphans\n"));
const vector<string>& purgecandidates = m_purgeCandidates.getCandidates();
for (vector<string>::const_iterator it = purgecandidates.begin();
it != purgecandidates.end(); it++) {
LOGDEB(("Indexfiles: purging orphans for %s\n", it->c_str()));
m_db->purgeOrphans(*it);
}
#ifdef IDX_THREADS
m_db->waitUpdIdle();
#endif // IDX_THREADS
}
LOGDEB(("FsIndexer::indexFiles: done\n"));
return ret;
}
@ -622,6 +639,27 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
return processonefile(m_config, fn, stp, m_localfields, m_mdreapers);
}
// Compute the UTF-8 version of a file name, for indexing.
//
// Only the simple file name (as returned by path_getsimple()) is
// transcoded, not the full path: the full path used to be processed
// here, but using only the file name was judged to make more sense.
// The source charset is the one returned by
// config->getDefCharset(true) (the one from the locale, according to
// the original author's note).
//
// @param config configuration object, used to obtain the charset.
// @param fn file path. Only its simple name part is transcoded.
// @return the transcoded name, whatever transcode() stored in the
//   output string. A transcoding failure is only logged: the file name
//   possibly won't be indexed, which is no big deal.
static string compute_utf8fn(RclConfig *config, const string& fn)
{
    string charset = config->getDefCharset(true);
    // Extract the simple name once: it is needed for the conversion and
    // for every log message below.
    const string simplefn = path_getsimple(fn);
    string utf8fn;
    // Initialized so that the test below never reads an indeterminate
    // value, even if transcode() returned success without setting it.
    int ercnt = 0;
    if (!transcode(simplefn, utf8fn, charset, "UTF-8", &ercnt)) {
        LOGERR(("processone: fn transcode failure from [%s] to UTF-8: %s\n",
                charset.c_str(), simplefn.c_str()));
    } else if (ercnt) {
        LOGDEB(("processone: fn transcode %d errors from [%s] to UTF-8: %s\n",
                ercnt, charset.c_str(), simplefn.c_str()));
    }
    LOGDEB2(("processone: fn transcoded from [%s] to [%s] (%s->%s)\n",
             simplefn.c_str(), utf8fn.c_str(), charset.c_str(),
             "UTF-8"));
    return utf8fn;
}
FsTreeWalker::Status
FsIndexer::processonefile(RclConfig *config,
@ -644,7 +682,8 @@ FsIndexer::processonefile(RclConfig *config,
makesig(stp, sig);
string udi;
make_udi(fn, cstr_null, udi);
bool needupdate = m_db->needUpdate(udi, sig);
bool existingDoc;
bool needupdate = m_db->needUpdate(udi, sig, &existingDoc);
if (!needupdate) {
LOGDEB0(("processone: up to date: %s\n", fn.c_str()));
@ -673,32 +712,19 @@ FsIndexer::processonefile(RclConfig *config,
}
interner.setMissingStore(m_missing);
// File name transcoded to utf8 for indexing.
// If this fails, the file name won't be indexed, no big deal
// Note that we used to do the full path here, but I ended up believing
// that it made more sense to use only the file name
// The charset is used is the one from the locale.
string charset = config->getDefCharset(true);
string utf8fn; int ercnt;
if (!transcode(path_getsimple(fn), utf8fn, charset, "UTF-8", &ercnt)) {
LOGERR(("processone: fn transcode failure from [%s] to UTF-8: %s\n",
charset.c_str(), path_getsimple(fn).c_str()));
} else if (ercnt) {
LOGDEB(("processone: fn transcode %d errors from [%s] to UTF-8: %s\n",
ercnt, charset.c_str(), path_getsimple(fn).c_str()));
}
LOGDEB2(("processone: fn transcoded from [%s] to [%s] (%s->%s)\n",
path_getsimple(fn).c_str(), utf8fn.c_str(), charset.c_str(),
"UTF-8"));
string utf8fn = compute_utf8fn(config, fn);
// parent_udi is initially the same as udi, it will be used if there
// are subdocs.
string parent_udi = udi;
string parent_udi;
make_udi(fn, cstr_null, parent_udi);
Rcl::Doc doc;
char ascdate[30];
sprintf(ascdate, "%ld", long(stp->st_mtime));
FileInterner::Status fis = FileInterner::FIAgain;
bool hadNullIpath = false;
bool hadNonNullIpath = false;
while (fis == FileInterner::FIAgain) {
doc.erase();
try {
@ -708,7 +734,7 @@ FsIndexer::processonefile(RclConfig *config,
return FsTreeWalker::FtwStop;
}
// Index at least the file name even if there was an error.
// We index at least the file name even if there was an error.
// We'll change the signature to ensure that the indexing will
// be retried every time.
@ -718,6 +744,9 @@ FsIndexer::processonefile(RclConfig *config,
hadNullIpath = true;
if (m_havemdreapers)
reapmetadata(mdreapers, fn, doc);
} else {
hadNonNullIpath = true;
make_udi(fn, doc.ipath, udi);
}
// Set file name, mod time and url if not done by filter
@ -732,11 +761,9 @@ FsIndexer::processonefile(RclConfig *config,
char cbuf[100];
sprintf(cbuf, OFFTPC, stp->st_size);
doc.pcbytes = cbuf;
// Document signature for up to date checks: concatenate
// m/ctime and size. Looking for changes only, no need to
// parseback so no need for reversible formatting. Also set,
// but never used, for subdocs.
makesig(stp, doc.sig);
// Document signature for up to date checks. All subdocs inherit the
// file's.
doc.sig = sig;
// If there was an error, ensure indexing will be
// retried. This is for the once missing, later installed
@ -750,14 +777,13 @@ FsIndexer::processonefile(RclConfig *config,
// Possibly add fields from local config
if (m_havelocalfields)
setlocalfields(localfields, doc);
// Add document to database. If there is an ipath, add it as a children
// of the file document.
string udi;
make_udi(fn, doc.ipath, udi);
#ifdef IDX_THREADS
if (m_haveSplitQ) {
DbUpdTask *tp = new DbUpdTask(udi, doc.ipath.empty() ? cstr_null : parent_udi, doc);
DbUpdTask *tp = new DbUpdTask(udi, doc.ipath.empty() ?
cstr_null : parent_udi, doc);
if (!m_dwqueue.put(tp)) {
LOGERR(("processonefile: wqueue.put failed\n"));
return FsTreeWalker::FtwError;
@ -789,6 +815,15 @@ FsIndexer::processonefile(RclConfig *config,
}
}
// If this doc existed and it's a container, recording for
// possible subdoc purge (this will be used only if we don't do a
// db-wide purge, e.g. if we're called from indexfiles()).
LOGDEB2(("processOnefile: existingDoc %d hadNonNullIpath %d\n",
existingDoc, hadNonNullIpath));
if (existingDoc && hadNonNullIpath) {
m_purgeCandidates.record(parent_udi);
}
// If we had no instance with a null ipath, we create an empty
// document to stand for the file itself, to be used mainly for up
// to date checks. Typically this happens for an mbox file.
@ -806,8 +841,7 @@ FsIndexer::processonefile(RclConfig *config,
char cbuf[100];
sprintf(cbuf, OFFTPC, stp->st_size);
fileDoc.pcbytes = cbuf;
// Document signature for up to date checks.
makesig(stp, fileDoc.sig);
fileDoc.sig = sig;
#ifdef IDX_THREADS
if (m_haveSplitQ) {

View file

@ -83,13 +83,49 @@ class FsIndexer : public FsTreeWalkerCB {
};
private:
// Collects the UDIs of documents which may need a subdocument purge.
//
// Recording is off by default and is switched with setRecord(). While
// it is off, record() does nothing. When IDX_THREADS is defined,
// additions to the list are serialized with a mutex. getCandidates()
// does no locking and returns a reference to the internal list.
class PurgeCandidateRecorder {
public:
    PurgeCandidateRecorder()
        : m_enabled(false)
    {
    }

    // Turn recording on or off.
    void setRecord(bool onoff)
    {
        m_enabled = onoff;
    }

    // Append udi to the candidates list if recording is enabled.
    void record(const string& udi)
    {
        // The flag is tested without holding the lock: per the original
        // author's note, it is set at init time and never changed
        // afterwards.
        if (m_enabled) {
#ifdef IDX_THREADS
            PTMutexLocker guard(m_lock);
#endif
            m_udis.push_back(udi);
        }
    }

    // Access the recorded UDIs, in recording order. No lock is taken
    // here.
    const vector<string>& getCandidates()
    {
        return m_udis;
    }

private:
#ifdef IDX_THREADS
    // Protects additions to m_udis
    PTMutexInit m_lock;
#endif
    // True if record() should actually store its argument
    bool m_enabled;
    // Recorded candidate UDIs
    std::vector<std::string> m_udis;
};
FsTreeWalker m_walker;
RclConfig *m_config;
Rcl::Db *m_db;
string m_reason;
DbIxStatusUpdater *m_updater;
// Top/start directories list
std::vector<std::string> m_tdl;
// Store for missing filters and associated mime types
FIMissingStore *m_missing;
// Recorder for files that may need subdoc purging.
PurgeCandidateRecorder m_purgeCandidates;
// The configuration can set attribute fields to be inherited by
// all files in a file system area. Ie: set "rclaptg = thunderbird"

View file

@ -149,23 +149,34 @@ void *DbUpdWorker(void* vdbp)
WorkQueue<DbUpdTask*> *tqp = &(ndbp->m_wqueue);
DebugLog::getdbl()->setloglevel(ndbp->m_loglevel);
DbUpdTask *tsk;
DbUpdTask *tsk = 0;
for (;;) {
size_t qsz;
size_t qsz = -1;
if (!tqp->take(&tsk, &qsz)) {
tqp->workerExit();
return (void*)1;
}
LOGDEB(("DbUpdWorker: got task, ql %d\n", int(qsz)));
bool status;
if (tsk->txtlen == (size_t)-1) {
status = ndbp->m_rcldb->purgeFileWrite(tsk->udi, tsk->uniterm);
} else {
bool status = false;
switch (tsk->op) {
case DbUpdTask::AddOrUpdate:
LOGDEB(("DbUpdWorker: got add/update task, ql %d\n", int(qsz)));
status = ndbp->addOrUpdateWrite(tsk->udi, tsk->uniterm,
tsk->doc, tsk->txtlen);
break;
case DbUpdTask::Delete:
LOGDEB(("DbUpdWorker: got delete task, ql %d\n", int(qsz)));
status = ndbp->purgeFileWrite(false, tsk->udi, tsk->uniterm);
break;
case DbUpdTask::PurgeOrphans:
LOGDEB(("DbUpdWorker: got orphans purge task, ql %d\n", int(qsz)));
status = ndbp->purgeFileWrite(true, tsk->udi, tsk->uniterm);
break;
default:
LOGERR(("DbUpdWorker: unknown op %d !!\n", tsk->op));
break;
}
if (!status) {
LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n"));
LOGERR(("DbUpdWorker: xxWrite failed\n"));
tqp->workerExit();
delete tsk;
return (void*)0;
@ -1151,8 +1162,8 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi, Doc &doc)
#ifdef IDX_THREADS
if (m_ndb->m_havewriteq) {
DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument,
doc.text.length());
DbUpdTask *tp = new DbUpdTask(DbUpdTask::AddOrUpdate, udi, uniterm,
newdocument, doc.text.length());
if (!m_ndb->m_wqueue.put(tp)) {
LOGERR(("Db::addOrUpdate:Cant queue task\n"));
return false;
@ -1292,7 +1303,7 @@ bool Db::doFlush()
}
// Test if doc given by udi has changed since last indexed (test sigs)
bool Db::needUpdate(const string &udi, const string& sig)
bool Db::needUpdate(const string &udi, const string& sig, bool *existed)
{
if (m_ndb == 0)
return false;
@ -1300,8 +1311,12 @@ bool Db::needUpdate(const string &udi, const string& sig)
// If we are doing an in place or full reset, no need to
// test. Note that there is no need to update the existence map
// either, it will be done when updating the index
if (o_inPlaceReset || m_mode == DbTrunc)
if (o_inPlaceReset || m_mode == DbTrunc) {
// For in place reset, pretend the doc existed, to enable subdoc purge
if (existed)
*existed = o_inPlaceReset;
return true;
}
string uniterm = make_uniterm(udi);
string ermsg;
@ -1325,9 +1340,13 @@ bool Db::needUpdate(const string &udi, const string& sig)
if (docid == m_ndb->xrdb.postlist_end(uniterm)) {
// If no document exist with this path, we do need update
LOGDEB(("Db::needUpdate:yes (new): [%s]\n", uniterm.c_str()));
if (existed)
*existed = false;
return true;
}
Xapian::Document doc = m_ndb->xrdb.get_document(*docid);
if (existed)
*existed = true;
// Retrieve old file/doc signature from value
string osig = doc.get_value(VALUE_SIG);
@ -1542,8 +1561,8 @@ bool Db::purgeFile(const string &udi, bool *existed)
#ifdef IDX_THREADS
if (m_ndb->m_havewriteq) {
Xapian::Document xdoc;
DbUpdTask *tp = new DbUpdTask(udi, uniterm, xdoc, (size_t)-1);
DbUpdTask *tp = new DbUpdTask(DbUpdTask::Delete, udi, uniterm,
Xapian::Document(), (size_t)-1);
if (!m_ndb->m_wqueue.put(tp)) {
LOGERR(("Db::purgeFile:Cant queue task\n"));
return false;
@ -1552,49 +1571,98 @@ bool Db::purgeFile(const string &udi, bool *existed)
}
}
#endif
return purgeFileWrite(udi, uniterm);
/* We get here if IDX_THREADS is not defined or there is no queue */
return m_ndb->purgeFileWrite(false, udi, uniterm);
}
bool Db::purgeFileWrite(const string& udi, const string& uniterm)
/* Delete subdocs with an out of date sig. We do this to purge
   obsolete subdocs during a partial update where no general purge
   will be done.

   @param udi unique identifier of the container (file-level) document.
   @return false if the index is not open or not writable, or if the
     task could not be queued. When a write queue is in use, true only
     means that the purge task was queued. Else, this is the return
     value of Native::purgeFileWrite(). */
bool Db::purgeOrphans(const string &udi)
{
    LOGDEB(("Db:purgeOrphans: [%s]\n", udi.c_str()));
    if (m_ndb == 0 || !m_ndb->m_iswritable)
        return false;

    string uniterm = make_uniterm(udi);

#ifdef IDX_THREADS
    if (m_ndb->m_havewriteq) {
        // No document data or text length for a purge: pass an empty
        // document and (size_t)-1.
        DbUpdTask *tp = new DbUpdTask(DbUpdTask::PurgeOrphans, udi, uniterm,
                                      Xapian::Document(), (size_t)-1);
        if (!m_ndb->m_wqueue.put(tp)) {
            // NOTE(review): tp is not freed here. Whether put() disposes
            // of the task on failure is not visible from this code:
            // confirm against the WorkQueue implementation.
            LOGERR(("Db::purgeOrphans:Cant queue task\n"));
            return false;
        }
        return true;
    }
#endif

    /* We get here if IDX_THREADS is not defined or there is no queue */
    return m_ndb->purgeFileWrite(true, udi, uniterm);
}
bool Db::Native::purgeFileWrite(bool orphansOnly, const string& udi,
const string& uniterm)
{
#if defined(IDX_THREADS)
// We need a mutex even if we have a write queue (so we can only
// be called by a single thread) to protect against multiple accesses
// to xrdb from subDocs(), which is also called from needupdate()
// (called from outside the write thread!)
PTMutexLocker lock(m_ndb->m_mutex);
PTMutexLocker lock(m_mutex);
#endif // IDX_THREADS
Xapian::WritableDatabase db = m_ndb->xwdb;
string ermsg;
try {
Xapian::PostingIterator docid = db.postlist_begin(uniterm);
if (docid == db.postlist_end(uniterm)) {
Xapian::PostingIterator docid = xwdb.postlist_begin(uniterm);
if (docid == xwdb.postlist_end(uniterm)) {
return true;
}
LOGDEB(("purgeFile: delete docid %d\n", *docid));
if (m_flushMb > 0) {
Xapian::termcount trms = m_ndb->xwdb.get_doclength(*docid);
maybeflush(trms * 5);
if (m_rcldb->m_flushMb > 0) {
Xapian::termcount trms = xwdb.get_doclength(*docid);
m_rcldb->maybeflush(trms * 5);
}
string sig;
if (orphansOnly) {
Xapian::Document doc = xwdb.get_document(*docid);
sig = doc.get_value(VALUE_SIG);
if (sig.empty()) {
LOGINFO(("purgeFileWrite: got empty sig\n"));
return false;
}
} else {
LOGDEB(("purgeFile: delete docid %d\n", *docid));
xwdb.delete_document(*docid);
}
db.delete_document(*docid);
vector<Xapian::docid> docids;
m_ndb->subDocs(udi, docids);
subDocs(udi, docids);
LOGDEB(("purgeFile: subdocs cnt %d\n", docids.size()));
for (vector<Xapian::docid>::iterator it = docids.begin();
it != docids.end(); it++) {
LOGDEB(("Db::purgeFile: delete subdoc %d\n", *it));
if (m_flushMb > 0) {
Xapian::termcount trms = m_ndb->xwdb.get_doclength(*it);
maybeflush(trms * 5);
if (m_rcldb->m_flushMb > 0) {
Xapian::termcount trms = xwdb.get_doclength(*it);
m_rcldb->maybeflush(trms * 5);
}
string subdocsig;
if (orphansOnly) {
Xapian::Document doc = xwdb.get_document(*it);
subdocsig = doc.get_value(VALUE_SIG);
if (subdocsig.empty()) {
LOGINFO(("purgeFileWrite: got empty sig for subdoc??\n"));
continue;
}
}
if (!orphansOnly || sig != subdocsig) {
LOGDEB(("Db::purgeFile: delete subdoc %d\n", *it));
xwdb.delete_document(*it);
}
db.delete_document(*it);
}
return true;
} XCATCHERROR(ermsg);
if (!ermsg.empty()) {
LOGERR(("Db::purgeFile: %s\n", ermsg.c_str()));
LOGERR(("Db::purgeFileWrite: %s\n", ermsg.c_str()));
}
return false;
}

View file

@ -232,7 +232,7 @@ class Db {
* Side-effect: set the existence flag for the file document
* and all subdocs if any (for later use by 'purge()')
*/
bool needUpdate(const string &udi, const string& sig);
bool needUpdate(const string &udi, const string& sig, bool *existed=0);
/** Add or update document identified by unique identifier.
* @param config Config object to use. Can be the same as the member config
@ -260,6 +260,10 @@ class Db {
/** Delete document(s) for given UDI, including subdocs */
bool purgeFile(const string &udi, bool *existed = 0);
/** Delete subdocs with an out of date sig. We do this to purge
obsolete subdocs during a partial update where no general purge
will be done */
bool purgeOrphans(const string &udi);
/** Remove documents that no longer exist in the file system. This
* depends on the update map, which is built during
@ -442,7 +446,6 @@ private:
#ifdef IDX_THREADS
friend void *DbUpdWorker(void*);
#endif // IDX_THREADS
bool purgeFileWrite(const string& udi, const string& uniterm);
// Internal form of close, can be called during destruction
bool i_close(bool final);

View file

@ -36,17 +36,32 @@ namespace Rcl {
class Query;
#ifdef IDX_THREADS
// Task for the index update thread. This can be either and add/update
// or a purge op, in which case the txtlen is (size_t)-1
// Task for the index update thread. This can be
// - add/update for a new or updated document
// - delete for a deleted document
// - purgeOrphans when a multidoc file is updated during a partial pass (no
// general purge). We want to remove subDocs that possibly don't
// exist anymore. We find them by their different sig.
// txtlen and doc are only valid for add/update. Otherwise, txtlen is
// (size_t)-1 and doc is empty.
class DbUpdTask {
public:
DbUpdTask(const string& ud, const string& un, const Xapian::Document &d,
size_t tl)
: udi(ud), uniterm(un), doc(d), txtlen(tl)
enum Op {AddOrUpdate, Delete, PurgeOrphans};
// Note that udi and uniterm are strictly equivalent and are
// passed both just to avoid recomputing uniterm which is
// available on the caller site.
DbUpdTask(Op _op, const string& ud, const string& un,
const Xapian::Document &d, size_t tl)
: op(_op), udi(ud), uniterm(un), doc(d), txtlen(tl)
{}
// Udi and uniterm equivalently designate the doc
Op op;
string udi;
string uniterm;
Xapian::Document doc;
// txtlen is used to update the flush interval. It's -1 for a
// purge because we actually don't know it, and the code fakes a
// text length based on the term count.
size_t txtlen;
};
#endif // IDX_THREADS
@ -86,6 +101,8 @@ class Db::Native {
bool addOrUpdateWrite(const string& udi, const string& uniterm,
Xapian::Document& doc, size_t txtlen);
bool purgeFileWrite(bool onlyOrphans, const string& udi,
const string& uniterm);
bool getPagePositions(Xapian::docid docid, vector<int>& vpos);
int getPageNumberForPosition(const vector<int>& pbreaks, unsigned int pos);

View file

@ -0,0 +1,41 @@
#!/bin/sh

# Partial purge test: when a compound file (an mbox here) is shortened
# and then reindexed individually, the message which disappeared from
# it must disappear from the index too.
#
# NOTE(review): initvariables and checkresult, and the tstdata,
# mystderr, mystdout, myname and mydiffs variables, are presumably all
# provided by shared.sh: verify there.

topdir=$(dirname "$0")/..
. "$topdir/shared.sh"

initvariables "$0"

# Kept with its trailing slash so that the paths given to the commands
# below are exactly the same as before.
d=${tstdata}/partialpurge/

# Check that partial purge works: the message orphaned by shortening
# the mbox should not exist in the index any more
(
    # Start with the long version of the test mbox
    cp "$d/longmbox" "$d/testmbox"
    recollindex -Zi "$d/testmbox"

    echo Should have 2 results: testmbox and longmbox:
    recollq -q deletedmessageuniqueterm

    echo
    echo Changing file and reindexing
    # Replace the test mbox with the short version and index it again
    cp "$d/shortmbox" "$d/testmbox"
    recollindex -Zi "$d/testmbox"

    echo Should have 1 result: longmbox:
    recollq -q deletedmessageuniqueterm

    echo
    echo Purging whole test file
    recollindex -e "$d/testmbox"

    echo Should have 1 result: longmbox:
    recollq -q deletedmessageuniqueterm

    echo Should have 2 results: longmbox shortmbox:
    recollq -q stablemessageuniqueterm

) 2> "$mystderr" | grep -E -v '^Recoll query: ' > "$mystdout"

# Compare with the reference output, ignoring white space differences
diff -w "${myname}.txt" "$mystdout" > "$mydiffs" 2>&1

checkresult

View file

@ -0,0 +1,18 @@
Should have 2 results: testmbox and longmbox:
2 results
message/rfc822 [file:///home/dockes/projets/fulltext/testrecoll/partialpurge/longmbox] [This email goes: deletedmessageuniqueterm] 755 bytes
message/rfc822 [file:///home/dockes/projets/fulltext/testrecoll/partialpurge/testmbox] [This email goes: deletedmessageuniqueterm] 755 bytes
Changing file and reindexing
Should have 1 result: longmbox:
1 results
message/rfc822 [file:///home/dockes/projets/fulltext/testrecoll/partialpurge/longmbox] [This email goes: deletedmessageuniqueterm] 755 bytes
Purging whole test file
Should have 1 result: longmbox:
1 results
message/rfc822 [file:///home/dockes/projets/fulltext/testrecoll/partialpurge/longmbox] [This email goes: deletedmessageuniqueterm] 755 bytes
Should have 2 results: longmbox shortmbox:
2 results
message/rfc822 [file:///home/dockes/projets/fulltext/testrecoll/partialpurge/longmbox] [This email remains: stablemessageuniqueterm] 759 bytes
message/rfc822 [file:///home/dockes/projets/fulltext/testrecoll/partialpurge/shortmbox] [This email remains: stablemessageuniqueterm] 1173 bytes