First parallel multithreaded version of indexing, which can index my home directory without crashing... Let's checkpoint.

Jean-Francois Dockes 2012-11-01 11:19:48 +01:00
parent 007f0b06c1
commit a11c696554
23 changed files with 697 additions and 431 deletions


@@ -1 +1 @@
1.18.002
1.19.0

src/configure vendored

@@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.69 for Recoll 1.18.0.
# Generated by GNU Autoconf 2.69 for Recoll 1.19.0.
#
#
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
@@ -577,8 +577,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Recoll'
PACKAGE_TARNAME='recoll'
PACKAGE_VERSION='1.18.0'
PACKAGE_STRING='Recoll 1.18.0'
PACKAGE_VERSION='1.19.0'
PACKAGE_STRING='Recoll 1.19.0'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''
@@ -623,6 +623,7 @@ ac_subst_vars='LTLIBOBJS
LIBOBJS
RCLVERSION
NOPYTHON
NOTHREADS
NOPIC
LIBQZEITGEIST
QMAKE_DISABLE_ZEITGEIST
@@ -1278,7 +1279,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
\`configure' configures Recoll 1.18.0 to adapt to many kinds of systems.
\`configure' configures Recoll 1.19.0 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@@ -1343,7 +1344,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
short | recursive ) echo "Configuration of Recoll 1.18.0:";;
short | recursive ) echo "Configuration of Recoll 1.19.0:";;
esac
cat <<\_ACEOF
@@ -1476,7 +1477,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
Recoll configure 1.18.0
Recoll configure 1.19.0
generated by GNU Autoconf 2.69
Copyright (C) 2012 Free Software Foundation, Inc.
@@ -2029,7 +2030,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
It was created by Recoll $as_me 1.18.0, which was
It was created by Recoll $as_me 1.19.0, which was
generated by GNU Autoconf 2.69. Invocation command line was
$ $0 $@
@@ -4357,11 +4358,9 @@ $as_echo "#define RCL_USE_XATTR 1" >>confdefs.h
fi
# Enable use of threads in the indexing pipeline.
# Threads are used in bucket-brigade fashion for the processing steps
# (reading file - text splitting - indexing proper). The performance
# increase can be significant, but this is disabled by default as we
# usually care little about indexing absolute performance (more about
# impact on usability and total resources used).
# This is disabled by default as we usually care little about indexing
# absolute performance (more about impact on usability and total
# resources used).
# Check whether --enable-idxthreads was given.
if test "${enable_idxthreads+set}" = set; then :
enableval=$enable_idxthreads; idxthreadsEnabled=$enableval
@@ -4374,6 +4373,9 @@ if test X$idxthreadsEnabled = Xyes ; then
$as_echo "#define IDX_THREADS 1" >>confdefs.h
NOTHREADS=""
else
NOTHREADS="#"
fi
# Enable CamelCase word splitting. This is optional because it causes
@@ -5743,6 +5745,7 @@ RCLVERSION=`cat VERSION`
# All object files depend on localdefs which has the cc flags. Avoid
@@ -6274,7 +6277,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
This file was extended by Recoll $as_me 1.18.0, which was
This file was extended by Recoll $as_me 1.19.0, which was
generated by GNU Autoconf 2.69. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@@ -6336,7 +6339,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
Recoll config.status 1.18.0
Recoll config.status 1.19.0
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"


@@ -177,12 +177,10 @@ if test X$xattrEnabled = Xyes ; then
AC_DEFINE(RCL_USE_XATTR, 1, [Use file extended attributes])
fi
# Enable use of threads in the indexing pipeline. Threads are used in
# bucket-brigade fashion for the processing steps (reading file - text
# splitting - indexing proper). The performance increase is small in normal
# case (might be a bit more significant if you're using an SSD), and this
# is disabled by default as we usually care little about indexing absolute
# performance (more about impact on usability and total resources used).
# Enable use of threads in the indexing pipeline.
# This is disabled by default as we usually care little about indexing
# absolute performance (more about impact on usability and total
# resources used).
AC_ARG_ENABLE(idxthreads,
AC_HELP_STRING([--enable-idxthreads],
[Enable multithread indexing. This can somewhat boost indexing
@@ -191,6 +189,9 @@ AC_ARG_ENABLE(idxthreads,
if test X$idxthreadsEnabled = Xyes ; then
AC_DEFINE(IDX_THREADS, 1, [Use multiple threads for indexing])
NOTHREADS=""
else
NOTHREADS="#"
fi
# Enable CamelCase word splitting. This is optional because it causes
@@ -557,6 +558,7 @@ AC_SUBST(QMAKE_ENABLE_ZEITGEIST)
AC_SUBST(QMAKE_DISABLE_ZEITGEIST)
AC_SUBST(LIBQZEITGEIST)
AC_SUBST(NOPIC)
AC_SUBST(NOTHREADS)
AC_SUBST(NOPYTHON)
AC_SUBST(RCLVERSION)


@@ -13,7 +13,7 @@ recollindex : $(RECOLLINDEX_OBJS)
$(LIBICONV) $(BDYNAMIC) \
$(LIBFAM) \
$(X_LIBS) $(X_PRE_LIBS) $(X_LIBX11) $(X_EXTRA_LIBS) \
$(LIBSYS)
$(LIBSYS) $(LIBTHREADS)
recollindex.o : recollindex.cpp
$(CXX) $(ALL_CXXFLAGS) -c -o recollindex.o $<
rclmonrcv.o : rclmonrcv.cpp


@@ -218,7 +218,7 @@ bool BeagleQueueIndexer::indexFromCache(const string& udi)
if (!stringlowercmp("bookmark", hittype)) {
// Just index the dotdoc
dotdoc.meta[Rcl::Doc::keybcknd] = "BGL";
return m_db->addOrUpdate(udi, cstr_null, dotdoc);
return m_db->addOrUpdate(m_config, udi, cstr_null, dotdoc);
} else if (stringlowercmp("webhistory", dotdoc.meta[Rcl::Doc::keybght]) ||
(dotdoc.mimetype.compare("text/html") &&
dotdoc.mimetype.compare(cstr_textplain))) {
@@ -248,7 +248,7 @@ bool BeagleQueueIndexer::indexFromCache(const string& udi)
doc.pcbytes = dotdoc.pcbytes;
doc.sig.clear();
doc.meta[Rcl::Doc::keybcknd] = "BGL";
return m_db->addOrUpdate(udi, cstr_null, doc);
return m_db->addOrUpdate(m_config, udi, cstr_null, doc);
}
}
@@ -414,7 +414,7 @@ BeagleQueueIndexer::processone(const string &path,
dotdoc.sig.clear();
dotdoc.meta[Rcl::Doc::keybcknd] = "BGL";
if (!m_db->addOrUpdate(udi, cstr_null, dotdoc))
if (!m_db->addOrUpdate(m_config, udi, cstr_null, dotdoc))
return FsTreeWalker::FtwError;
} else if (stringlowercmp("webhistory", dotdoc.meta[Rcl::Doc::keybght]) ||
@@ -461,7 +461,7 @@ BeagleQueueIndexer::processone(const string &path,
doc.url = dotdoc.url;
doc.meta[Rcl::Doc::keybcknd] = "BGL";
if (!m_db->addOrUpdate(udi, cstr_null, doc))
if (!m_db->addOrUpdate(m_config, udi, cstr_null, doc))
return FsTreeWalker::FtwError;
}


@@ -53,32 +53,78 @@
#define RCL_STTIME st_mtime
#endif // RCL_USE_XATTR
#ifndef NO_NAMESPACES
using namespace std;
#endif /* NO_NAMESPACES */
#ifndef deleteZ
#define deleteZ(X) {delete X;X = 0;}
#ifdef IDX_THREADS
class DbUpdTask {
public:
DbUpdTask(RclConfig *cnf, const string& u, const string& p,
const Rcl::Doc& d)
: udi(u), parent_udi(p), doc(d), config(cnf)
{}
string udi;
string parent_udi;
Rcl::Doc doc;
RclConfig *config;
};
extern void *FsIndexerDbUpdWorker(void*);
class InternfileTask {
public:
InternfileTask(const std::string &f, const struct stat *i_stp)
: fn(f), statbuf(*i_stp)
{}
string fn;
struct stat statbuf;
};
extern void *FsIndexerInternfileWorker(void*);
#endif // IDX_THREADS
// Thread safe variation of the "missing helpers" storage. Only the
// addMissing method needs protection, the rest are called from the
// main thread either before or after the exciting part
class FSIFIMissingStore : public FIMissingStore {
#ifdef IDX_THREADS
PTMutexInit m_mutex;
#endif
public:
virtual void addMissing(const string& prog, const string& mt)
{
#ifdef IDX_THREADS
PTMutexLocker locker(m_mutex);
#endif
FIMissingStore::addMissing(prog, mt);
}
};
FsIndexer::FsIndexer(RclConfig *cnf, Rcl::Db *db, DbIxStatusUpdater *updfunc)
: m_config(cnf), m_db(db), m_updater(updfunc), m_missing(new FIMissingStore)
: m_config(cnf), m_db(db), m_updater(updfunc),
m_missing(new FSIFIMissingStore)
#ifdef IDX_THREADS
, m_wqueue(10)
, m_iwqueue("Internfile", 2), m_dwqueue("Split", 2)
#endif // IDX_THREADS
{
m_havelocalfields = m_config->hasNameAnywhere("localfields");
#ifdef IDX_THREADS
if (!m_wqueue.start(FsIndexerIndexWorker, this)) {
m_loglevel = DebugLog::getdbl()->getlevel();
if (!m_iwqueue.start(4, FsIndexerInternfileWorker, this)) {
LOGERR(("FsIndexer::FsIndexer: worker start failed\n"));
return;
}
if (!m_dwqueue.start(2, FsIndexerDbUpdWorker, this)) {
LOGERR(("FsIndexer::FsIndexer: worker start failed\n"));
return;
}
#endif // IDX_THREADS
}
FsIndexer::~FsIndexer() {
#ifdef IDX_THREADS
void *status = m_wqueue.setTerminateAndWait();
LOGERR(("FsIndexer: worker status: %ld\n", long(status)));
void *status = m_iwqueue.setTerminateAndWait();
LOGINFO(("FsIndexer: internfile worker status: %ld (1->ok)\n",
long(status)));
status = m_dwqueue.setTerminateAndWait();
LOGINFO(("FsIndexer: dbupd worker status: %ld (1->ok)\n", long(status)));
#endif // IDX_THREADS
delete m_missing;
}
@@ -98,10 +144,14 @@ bool FsIndexer::init()
// Recursively index each directory in the topdirs:
bool FsIndexer::index()
{
Chrono chron;
if (!init())
return false;
if (m_updater) {
#ifdef IDX_THREADS
PTMutexLocker locker(m_mutex);
#endif
m_updater->status.reset();
m_updater->status.dbtotdocs = m_db->docCnt();
}
@@ -138,15 +188,21 @@ bool FsIndexer::index()
}
#ifdef IDX_THREADS
m_wqueue.waitIdle();
m_iwqueue.waitIdle();
m_dwqueue.waitIdle();
m_db->waitUpdIdle();
#endif // IDX_THREADS
if (m_missing) {
string missing;
FileInterner::getMissingDescription(m_missing, missing);
m_missing->getMissingDescription(missing);
if (!missing.empty()) {
LOGINFO(("FsIndexer::index missing helper program(s):\n%s\n",
missing.c_str()));
}
m_config->storeMissingHelperDesc(missing);
}
LOGERR(("fsindexer index time: %d mS\n", chron.ms()));
return true;
}
@@ -303,25 +359,54 @@ void FsIndexer::makesig(const struct stat *stp, string& out)
}
#ifdef IDX_THREADS
void *FsIndexerIndexWorker(void * fsp)
void *FsIndexerDbUpdWorker(void * fsp)
{
FsIndexer *fip = (FsIndexer*)fsp;
WorkQueue<IndexingTask*> *tqp = &fip->m_wqueue;
IndexingTask *tsk;
WorkQueue<DbUpdTask*> *tqp = &fip->m_dwqueue;
DebugLog::getdbl()->setloglevel(fip->m_loglevel);
DbUpdTask *tsk;
for (;;) {
if (!tqp->take(&tsk)) {
tqp->workerExit();
return (void*)1;
}
LOGDEB(("FsIndexerIndexWorker: got task, ql %d\n", int(tqp->size())));
if (!fip->m_db->addOrUpdate(tsk->udi, tsk->parent_udi, tsk->doc)) {
tqp->setTerminateAndWait();
LOGDEB(("FsIndexerDbUpdWorker: got task, ql %d\n", int(tqp->size())));
if (!fip->m_db->addOrUpdate(tsk->config, tsk->udi, tsk->parent_udi,
tsk->doc)) {
LOGERR(("FsIndexerDbUpdWorker: addOrUpdate failed\n"));
tqp->workerExit();
return (void*)0;
}
delete tsk;
}
}
void *FsIndexerInternfileWorker(void * fsp)
{
FsIndexer *fip = (FsIndexer*)fsp;
WorkQueue<InternfileTask*> *tqp = &fip->m_iwqueue;
DebugLog::getdbl()->setloglevel(fip->m_loglevel);
TempDir tmpdir;
RclConfig *myconf = new RclConfig(*(fip->m_config));
InternfileTask *tsk;
for (;;) {
if (!tqp->take(&tsk)) {
tqp->workerExit();
return (void*)1;
}
LOGDEB1(("FsIndexerInternfileWorker: fn %s\n", tsk->fn.c_str()));
if (fip->processonefile(myconf, tmpdir, tsk->fn, &tsk->statbuf) !=
FsTreeWalker::FtwOk) {
LOGERR(("FsIndexerInternfileWorker: processone failed\n"));
tqp->workerExit();
return (void*)0;
}
LOGDEB1(("FsIndexerInternfileWorker: done fn %s\n", tsk->fn.c_str()));
delete tsk;
}
}
#endif // IDX_THREADS
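
Taken together, these two workers form the bucket brigade announced by the commit: the tree-walking thread queues InternfileTask objects on m_iwqueue, the internfile workers run processonefile() and queue DbUpdTask objects on m_dwqueue, and the db-update workers call Rcl::Db::addOrUpdate(), which in turn hands the actual Xapian write to the single-worker queue inside Db. The condensed sketch below shows the same wiring pattern in isolation. It is only an illustration: the task types, counts and file names are invented, and the only interface assumed is the WorkQueue class from this commit's workqueue.h.

#include <sstream>
#include <string>
#include <iostream>
#include "workqueue.h"

// Stage 1 payload: a file to read. Stage 2 payload: extracted text.
struct ExtractTask {
    ExtractTask(const std::string& f) : fn(f) {}
    std::string fn;
};
struct WriteTask {
    WriteTask(const std::string& f, const std::string& t) : fn(f), text(t) {}
    std::string fn, text;
};

static WorkQueue<ExtractTask*> extractq("Extract", 10);
static WorkQueue<WriteTask*> writeq("Write", 10);

// Several copies run in parallel, like FsIndexerInternfileWorker.
void *extractWorker(void *)
{
    ExtractTask *t;
    for (;;) {
        if (!extractq.take(&t)) {       // false means shutdown or error
            extractq.workerExit();
            return (void*)1;
        }
        WriteTask *w = new WriteTask(t->fn, "text of " + t->fn);
        delete t;
        if (!writeq.put(w)) {
            extractq.workerExit();
            return (void*)0;
        }
    }
}

// A single copy serializes the writes, like the Db update worker.
void *writeWorker(void *)
{
    WriteTask *w;
    for (;;) {
        if (!writeq.take(&w)) {
            writeq.workerExit();
            return (void*)1;
        }
        std::cout << "indexing " << w->fn << " (" << w->text.size() << " chars)\n";
        delete w;
    }
}

int main()
{
    extractq.start(4, extractWorker, 0);
    writeq.start(1, writeWorker, 0);
    for (int i = 0; i < 20; i++) {
        std::ostringstream fn;
        fn << "file" << i;
        extractq.put(new ExtractTask(fn.str()));
    }
    extractq.waitIdle();    // drain upstream before downstream,
    writeq.waitIdle();      // as FsIndexer::index() does above
    extractq.setTerminateAndWait();
    writeq.setTerminateAndWait();
    return 0;
}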
/// This method gets called for every file and directory found by the
@@ -339,9 +424,14 @@ FsTreeWalker::Status
FsIndexer::processone(const std::string &fn, const struct stat *stp,
FsTreeWalker::CbFlag flg)
{
if (m_updater && !m_updater->update()) {
if (m_updater) {
#ifdef IDX_THREADS
PTMutexLocker locker(m_mutex);
#endif
if (!m_updater->update()) {
return FsTreeWalker::FtwStop;
}
}
// If we're changing directories, possibly adjust parameters (set
// the current directory in configuration object)
@@ -364,6 +454,26 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
return FsTreeWalker::FtwOk;
}
#ifdef IDX_THREADS
InternfileTask *tp = new InternfileTask(fn, stp);
if (!m_iwqueue.put(tp))
return FsTreeWalker::FtwError;
return FsTreeWalker::FtwOk;
#else
return processonefile(m_config, m_tmpdir, fn, stp);
#endif // IDX_THREADS
}
FsTreeWalker::Status
FsIndexer::processonefile(RclConfig *config, TempDir& tmpdir,
const std::string &fn, const struct stat *stp)
{
#ifdef IDX_THREADS
config->setKeyDir(path_getfather(fn));
#endif
////////////////////
// Check db up to date ? Doing this before file type
// identification means that, if usesystemfilecommand is switched
@@ -379,9 +489,20 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
makesig(stp, sig);
string udi;
make_udi(fn, cstr_null, udi);
if (!m_db->needUpdate(udi, sig)) {
bool needupdate;
{
#ifdef IDX_THREADS
PTMutexLocker locker(m_mutex);
#endif
needupdate = m_db->needUpdate(udi, sig);
}
if (!needupdate) {
LOGDEB0(("processone: up to date: %s\n", fn.c_str()));
if (m_updater) {
#ifdef IDX_THREADS
PTMutexLocker locker(m_mutex);
#endif
// Status bar update, abort request etc.
m_updater->status.fn = fn;
++(m_updater->status.filesdone);
@@ -395,7 +516,7 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
LOGDEB0(("processone: processing: [%s] %s\n",
displayableBytes(stp->st_size).c_str(), fn.c_str()));
FileInterner interner(fn, stp, m_config, m_tmpdir, FileInterner::FIF_none);
FileInterner interner(fn, stp, config, tmpdir, FileInterner::FIF_none);
if (!interner.ok()) {
// no indexing whatsoever in this case. This typically means that
// indexallfilenames is not set
@@ -482,17 +603,23 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
make_udi(fn, doc.ipath, udi);
#ifdef IDX_THREADS
IndexingTask *tp = new IndexingTask(udi, doc.ipath.empty() ?
DbUpdTask *tp = new DbUpdTask(config, udi, doc.ipath.empty() ?
cstr_null : parent_udi, doc);
if (!m_wqueue.put(tp))
if (!m_dwqueue.put(tp)) {
LOGERR(("processonefile: wqueue.put failed\n"));
return FsTreeWalker::FtwError;
}
#else
if (!m_db->addOrUpdate(udi, doc.ipath.empty() ? cstr_null : parent_udi, doc))
if (!m_db->addOrUpdate(config, udi, doc.ipath.empty() ? cstr_null :
parent_udi, doc))
return FsTreeWalker::FtwError;
#endif // IDX_THREADS
// Tell what we are doing and check for interrupt request
if (m_updater) {
#ifdef IDX_THREADS
PTMutexLocker locker(m_mutex);
#endif
++(m_updater->status.docsdone);
if (m_updater->status.dbtotdocs < m_updater->status.docsdone)
m_updater->status.dbtotdocs = m_updater->status.docsdone;
@@ -522,11 +649,11 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
// Document signature for up to date checks.
makesig(stp, fileDoc.sig);
#ifdef IDX_THREADS
IndexingTask *tp = new IndexingTask(parent_udi, cstr_null, fileDoc);
if (!m_wqueue.put(tp))
DbUpdTask *tp = new DbUpdTask(config, parent_udi, cstr_null, fileDoc);
if (!m_dwqueue.put(tp))
return FsTreeWalker::FtwError;
#else
if (!m_db->addOrUpdate(parent_udi, cstr_null, fileDoc))
if (!m_db->addOrUpdate(config, parent_udi, cstr_null, fileDoc))
return FsTreeWalker::FtwError;
#endif // IDX_THREADS
}


@@ -17,14 +17,14 @@
#ifndef _fsindexer_h_included_
#define _fsindexer_h_included_
#include <sys/stat.h>
#include <list>
#ifndef NO_NAMESPACES
using std::list;
#endif
#include "indexer.h"
#include "fstreewalk.h"
#ifdef IDX_THREADS
#include "ptmutex.h"
#include "workqueue.h"
#endif // IDX_THREADS
@@ -32,18 +32,8 @@ class DbIxStatusUpdater;
class FIMissingStore;
struct stat;
#ifdef IDX_THREADS
class IndexingTask {
public:
IndexingTask(const string& u, const string& p, const Rcl::Doc& d)
:udi(u), parent_udi(p), doc(d)
{}
string udi;
string parent_udi;
Rcl::Doc doc;
};
extern void *FsIndexerIndexWorker(void*);
#endif // IDX_THREADS
class DbUpdTask;
class InternfileTask;
/** Index selected parts of the file system
@@ -75,11 +65,11 @@ class FsIndexer : public FsTreeWalkerCB {
bool index();
/** Index a list of files. No db cleaning or stemdb updating */
bool indexFiles(list<string> &files, ConfIndexer::IxFlag f =
bool indexFiles(std::list<std::string> &files, ConfIndexer::IxFlag f =
ConfIndexer::IxFNone);
/** Purge a list of files. */
bool purgeFiles(list<string> &files);
bool purgeFiles(std::list<std::string> &files);
/** Tree walker callback method */
FsTreeWalker::Status
@@ -95,9 +85,10 @@ class FsIndexer : public FsTreeWalkerCB {
TempDir m_tmpdir;
string m_reason;
DbIxStatusUpdater *m_updater;
list<string> m_tdl;
std::list<std::string> m_tdl;
FIMissingStore *m_missing;
// The configuration can set attribute fields to be inherited by
// all files in a file system area. Ie: set "rclaptg = thunderbird"
// inside ~/.thunderbird. The boolean is set at init to avoid
@@ -106,14 +97,25 @@ class FsIndexer : public FsTreeWalkerCB {
map<string, string> m_localfields;
#ifdef IDX_THREADS
friend void *FsIndexerIndexWorker(void*);
WorkQueue<IndexingTask*> m_wqueue;
// Used to protect all ops from processonefile to class members:
// m_missing, m_db. It would be possible to be more fine-grained
// but probably not worth it. m_config and m_updater have separate
// protections
PTMutexInit m_mutex;
friend void *FsIndexerDbUpdWorker(void*);
friend void *FsIndexerInternfileWorker(void*);
int m_loglevel;
WorkQueue<InternfileTask*> m_iwqueue;
WorkQueue<DbUpdTask*> m_dwqueue;
#endif // IDX_THREADS
bool init();
void localfieldsfromconf();
void setlocalfields(Rcl::Doc& doc);
string getDbDir() {return m_config->getDbDir();}
FsTreeWalker::Status
processonefile(RclConfig *config, TempDir& tmpdir, const string &fn,
const struct stat *);
};
#endif /* _fsindexer_h_included_ */


@@ -112,7 +112,10 @@ class MyUpdater : public DbIxStatusUpdater {
fprintf(fp, "filesdone = %d\n", status.filesdone);
fprintf(fp, "dbtotdocs = %d\n", status.dbtotdocs);
fprintf(fp, "fn = %s\n", status.fn.c_str());
ftruncate(m_fd, off_t(ftell(fp)));
if (ftruncate(m_fd, off_t(ftell(fp))) < 0) {
// ? kill compiler warning about ignoring ftruncate return
LOGDEB(("Status update: ftruncate failed\n"));
}
// Flush data and closes fd1. m_fd still valid
fclose(fp);
}


@@ -325,6 +325,7 @@ void FileInterner::init(const string &f, const struct stat *stp, RclConfig *cnf,
df->set_docsize(docsize);
if (!df->set_document_file(m_fn)) {
delete df;
LOGERR(("FileInterner:: error converting %s\n", m_fn.c_str()));
return;
}


@@ -22,6 +22,7 @@
#include <iostream>
#include <string>
#include <vector>
#include <list>
using namespace std;
#include "cstr.h"
@@ -44,23 +45,35 @@ using namespace std;
// (think email attachment in email message: 2 rfc822 handlers are
// needed simultaneously)
static multimap<string, Dijon::Filter*> o_handlers;
static list<multimap<string, Dijon::Filter*>::iterator> o_hlru;
typedef list<multimap<string, Dijon::Filter*>::iterator>::iterator hlruit_tp;
static PTMutexInit o_handlers_mutex;
static const unsigned int max_handlers_cache_size = 300;
static const unsigned int max_handlers_cache_size = 100;
/* Look for mime handler in pool */
static Dijon::Filter *getMimeHandlerFromCache(const string& mtype)
static Dijon::Filter *getMimeHandlerFromCache(const string& key)
{
LOGDEB0(("getMimeHandlerFromCache: %s\n", mtype.c_str()));
PTMutexLocker locker(o_handlers_mutex);
map<string, Dijon::Filter *>::iterator it = o_handlers.find(mtype);
LOGDEB(("getMimeHandlerFromCache: %s cache size %u\n",
key.c_str(), o_handlers.size()));
multimap<string, Dijon::Filter *>::iterator it = o_handlers.find(key);
if (it != o_handlers.end()) {
Dijon::Filter *h = it->second;
hlruit_tp it1 = find(o_hlru.begin(), o_hlru.end(), it);
if (it1 != o_hlru.end()) {
o_hlru.erase(it1);
} else {
LOGERR(("getMimeHandlerFromCache: lru position not found\n"));
}
o_handlers.erase(it);
LOGDEB0(("getMimeHandlerFromCache: %s found\n", mtype.c_str()));
LOGDEB(("getMimeHandlerFromCache: %s found size %u\n",
key.c_str(), o_handlers.size()));
return h;
}
LOGDEB(("getMimeHandlerFromCache: %s not found\n", key.c_str()));
return 0;
}
@@ -68,28 +81,40 @@ static Dijon::Filter *getMimeHandlerFromCache(const string& mtype)
void returnMimeHandler(Dijon::Filter *handler)
{
typedef multimap<string, Dijon::Filter*>::value_type value_type;
if (handler) {
if (handler==0)
return;
handler->clear();
PTMutexLocker locker(o_handlers_mutex);
LOGDEB2(("returnMimeHandler: returning filter for %s cache size %d\n",
LOGDEB(("returnMimeHandler: returning filter for %s cache size %d\n",
handler->get_mime_type().c_str(), o_handlers.size()));
// Limit pool size. It's possible for some reason that the
// handler was not found in the cache by getMimeHandler() and
// that a new handler is returned every time. We don't want
// the cache to grow indefinitely. We try to delete an element
// of the same kind, and if this fails, the first at
// hand. Note that going oversize *should not* normally
// happen, we're only being prudent.
// Limit pool size. The pool can grow quite big because there are
// many filter types, each of which can be used in several copies
// at the same time either because it occurs several times in a
// stack (ie mail attachment to mail), or because several threads
// are processing the same mime type at the same time.
multimap<string, Dijon::Filter *>::iterator it;
if (o_handlers.size() >= max_handlers_cache_size) {
map<string, Dijon::Filter *>::iterator it =
o_handlers.find(handler->get_mime_type());
if (it != o_handlers.end())
static int once = 1;
if (once) {
once = 0;
for (it = o_handlers.begin(); it != o_handlers.end(); it++) {
LOGERR(("Cache full key: %s\n", it->first.c_str()));
}
LOGERR(("Cache LRU size: %u\n", o_hlru.size()));
}
if (o_hlru.size() > 0) {
it = o_hlru.back();
o_hlru.pop_back();
delete it->second;
o_handlers.erase(it);
else
o_handlers.erase(o_handlers.begin());
}
o_handlers.insert(value_type(handler->get_mime_type(), handler));
}
it = o_handlers.insert(value_type(handler->get_mime_type(), handler));
o_hlru.push_front(it);
}
void clearMimeHandlerCache()
@@ -279,6 +304,7 @@ Dijon::Filter *getMimeHandler(const string &mtype, RclConfig *cfg,
{bool indexunknown = false;
cfg->getConfParam("indexallfilenames", &indexunknown);
if (indexunknown) {
if ((h = getMimeHandlerFromCache("application/octet-stream")) == 0)
h = new MimeHandlerUnknown(cfg, "application/octet-stream");
goto out;
} else {
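
The reworked cache above is the interesting structure in this file: a multimap keyed by MIME type allows several live handlers for the same type (an email attachment inside an email message needs two rfc822 filters at once, and several threads may now process the same type simultaneously), while a list of multimap iterators, ordered most-recently-returned first, supplies the eviction victim when the pool is full. Below is a generic condensation of that idea, as an illustration only: the class and all names are invented, not recoll code.

#include <algorithm>
#include <list>
#include <map>
#include <string>
#include <utility>

template <class V> class LruPool {
    typedef std::multimap<std::string, V> Map;
    typedef typename Map::iterator MapIt;
    Map m_pool;
    std::list<MapIt> m_lru;   // front = most recently returned entry
    size_t m_max;
public:
    explicit LruPool(size_t mx) : m_max(mx) {}

    // Take an entry out of the pool; V() (e.g. a null pointer) if none.
    V get(const std::string& key)
    {
        MapIt it = m_pool.find(key);
        if (it == m_pool.end())
            return V();
        // Unlink from the LRU list, then from the pool: the caller
        // owns the object until it is put() back.
        typename std::list<MapIt>::iterator lit =
            std::find(m_lru.begin(), m_lru.end(), it);
        if (lit != m_lru.end())
            m_lru.erase(lit);
        V v = it->second;
        m_pool.erase(it);
        return v;
    }

    // Return an entry to the pool, evicting the least recently
    // returned one if the pool is full (the real code above also
    // deletes the evicted handler object).
    void put(const std::string& key, V v)
    {
        if (m_pool.size() >= m_max && !m_lru.empty()) {
            MapIt victim = m_lru.back();
            m_lru.pop_back();
            m_pool.erase(victim);
        }
        m_lru.push_front(m_pool.insert(std::make_pair(key, v)));
    }
};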


@@ -6,3 +6,4 @@ ALL_CXXFLAGS = $(CXXFLAGS) $(COMMONCXXFLAGS) $(LOCALCXXFLAGS) \
-D_GNU_SOURCE
LIBSYS = -lpthread -ldl
LIBSYSTHREADS = -lrt


@@ -25,6 +25,7 @@ libdir = @libdir@
RECOLL_DATADIR = ${datadir}/recoll
@NOPIC@PICFLAGS = $(SYSPICFLAGS)
@NOTHREADS@LIBTHREADS = $(LIBSYSTHREADS)
LOCALCXXFLAGS = -g -O2 -Wall -Wno-unused \
$(INCICONV) $(XAPIANCXXFLAGS) $(X_CFLAGS) \


@@ -280,10 +280,13 @@ Db::Db(RclConfig *cfp)
m_config->getConfParam("idxflushmb", &m_flushMb);
}
#ifdef IDX_THREADS
if (m_ndb && !m_ndb->m_wqueue.start(DbUpdWorker, this)) {
if (m_ndb) {
m_ndb->m_loglevel = DebugLog::getdbl()->getlevel();
if (!m_ndb->m_wqueue.start(1, DbUpdWorker, this)) {
LOGERR(("Db::Db: Worker start failed\n"));
return;
}
}
#endif // IDX_THREADS
}
@@ -461,7 +464,7 @@ int Db::docCnt()
if (!m_ndb || !m_ndb->m_isopen)
return -1;
XAPTRY(res = m_ndb->xdb().get_doccount(), m_ndb->xrdb, m_reason);
XAPTRY(res = m_ndb->xrdb.get_doccount(), m_ndb->xrdb, m_reason);
if (!m_reason.empty()) {
LOGERR(("Db::docCnt: got error: %s\n", m_reason.c_str()));
@@ -788,48 +791,22 @@ void *DbUpdWorker(void* vdbp)
{
Db *dbp = (Db *)vdbp;
WorkQueue<DbUpdTask*> *tqp = &(dbp->m_ndb->m_wqueue);
DbUpdTask *tsk;
DebugLog::getdbl()->setloglevel(dbp->m_ndb->m_loglevel);
DbUpdTask *tsk;
for (;;) {
if (!tqp->take(&tsk)) {
tqp->workerExit();
return (void*)1;
}
LOGDEB(("DbUpdWorker: got task, ql %d\n", int(tqp->size())));
const char *fnc = tsk->udi.c_str();
string ermsg;
// Add db entry or update existing entry:
try {
Xapian::docid did =
dbp->m_ndb->xwdb.replace_document(tsk->uniterm,
tsk->doc);
if (did < dbp->updated.size()) {
dbp->updated[did] = true;
LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
} else {
LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
}
} XCATCHERROR(ermsg);
if (!ermsg.empty()) {
LOGERR(("Db::add: replace_document failed: %s\n", ermsg.c_str()));
ermsg.erase();
// FIXME: is this ever actually needed?
try {
dbp->m_ndb->xwdb.add_document(tsk->doc);
LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n",
fnc));
} XCATCHERROR(ermsg);
if (!ermsg.empty()) {
LOGERR(("Db::add: add_document failed: %s\n", ermsg.c_str()));
if (!dbp->m_ndb->addOrUpdateWrite(tsk->udi, tsk->uniterm,
tsk->doc, tsk->txtlen)) {
LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n"));
tqp->workerExit();
delete tsk;
return (void*)0;
}
}
dbp->maybeflush(tsk->txtlen);
delete tsk;
}
}
@@ -839,26 +816,13 @@ void *DbUpdWorker(void* vdbp)
// the title abstract and body and add special terms for file name,
// date, mime type etc. , create the document data record (more
// metadata), and update database
bool Db::addOrUpdate(const string &udi, const string &parent_udi,
Doc &doc)
bool Db::addOrUpdate(RclConfig *config, const string &udi,
const string &parent_udi, Doc &doc)
{
LOGDEB(("Db::add: udi [%s] parent [%s]\n",
udi.c_str(), parent_udi.c_str()));
if (m_ndb == 0)
return false;
// Check file system full every mbyte of indexed text.
if (m_maxFsOccupPc > 0 &&
(m_occFirstCheck || (m_curtxtsz - m_occtxtsz) / MB >= 1)) {
LOGDEB(("Db::add: checking file system usage\n"));
int pc;
m_occFirstCheck = 0;
if (fsocc(m_basedir, &pc) && pc >= m_maxFsOccupPc) {
LOGERR(("Db::add: stop indexing: file system "
"%d%% full > max %d%%\n", pc, m_maxFsOccupPc));
return false;
}
m_occtxtsz = m_curtxtsz;
}
Xapian::Document newdocument;
@@ -1082,10 +1046,10 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi,
if (!doc.meta[Doc::keyabs].empty())
RECORD_APPEND(record, Doc::keyabs, doc.meta[Doc::keyabs]);
const set<string>& stored = m_config->getStoredFields();
const set<string>& stored = config->getStoredFields();
for (set<string>::const_iterator it = stored.begin();
it != stored.end(); it++) {
string nm = m_config->fieldCanon(*it);
string nm = config->fieldCanon(*it);
if (!doc.meta[*it].empty()) {
string value =
neutchars(truncate_to_word(doc.meta[*it], 150), cstr_nc);
@@ -1125,16 +1089,42 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi,
LOGERR(("Db::addOrUpdate: Can't queue task\n"));
return false;
}
return true;
#else
return m_ndb->addOrUpdateWrite(udi, uniterm, newdocument,
doc.text.length());
#endif // IDX_THREADS
}
bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
Xapian::Document& newdocument, size_t textlen)
{
// Check file system full every mbyte of indexed text. It's a bit wasteful
// to do this after having prepared the document, but it needs to be in
// the single-threaded section.
if (m_rcldb->m_maxFsOccupPc > 0 &&
(m_rcldb->m_occFirstCheck ||
(m_rcldb->m_curtxtsz - m_rcldb->m_occtxtsz) / MB >= 1)) {
LOGDEB(("Db::add: checking file system usage\n"));
int pc;
m_rcldb->m_occFirstCheck = 0;
if (fsocc(m_rcldb->m_basedir, &pc) && pc >= m_rcldb->m_maxFsOccupPc) {
LOGERR(("Db::add: stop indexing: file system "
"%d%% full > max %d%%\n", pc, m_rcldb->m_maxFsOccupPc));
return false;
}
m_rcldb->m_occtxtsz = m_rcldb->m_curtxtsz;
}
const char *fnc = udi.c_str();
string ermsg;
// Add db entry or update existing entry:
try {
Xapian::docid did =
m_ndb->xwdb.replace_document(uniterm, newdocument);
if (did < updated.size()) {
updated[did] = true;
xwdb.replace_document(uniterm, newdocument);
if (did < m_rcldb->updated.size()) {
m_rcldb->updated[did] = true;
LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
} else {
LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
@@ -1146,7 +1136,7 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi,
ermsg.erase();
// FIXME: is this ever actually needed?
try {
m_ndb->xwdb.add_document(newdocument);
xwdb.add_document(newdocument);
LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n",
fnc));
} XCATCHERROR(ermsg);
@@ -1157,11 +1147,16 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi,
}
// Test if we're over the flush threshold (limit memory usage):
maybeflush(doc.text.length());
#endif // IDX_THREADS
return true;
return m_rcldb->maybeflush(textlen);
}
#ifdef IDX_THREADS
void Db::waitUpdIdle()
{
m_ndb->m_wqueue.waitIdle();
}
#endif
// Flush when idxflushmbs is reached
bool Db::maybeflush(off_t moretext)
{
@@ -1233,6 +1228,7 @@ bool Db::needUpdate(const string &udi, const string& sig)
// Set the uptodate flag for doc / pseudo doc
if (m_mode != DbRO) {
#warning we need a lock here !
updated[*docid] = true;
// Set the existence flag for all the subdocs (if any)
@@ -1244,7 +1240,7 @@ bool Db::needUpdate(const string &udi, const string& sig)
for (vector<Xapian::docid>::iterator it = docids.begin();
it != docids.end(); it++) {
if (*it < updated.size()) {
LOGDEB2(("Db::needUpdate: set flag for docid %d\n", *it));
LOGDEB2(("Db::needUpdate: docid %d set\n", *it));
updated[*it] = true;
}
}


@@ -250,8 +250,11 @@ class Db {
/** Add or update document. The Doc class should have been filled as much as
* possible depending on the document type. parent_udi is only
* use for subdocs, else set it to empty */
bool addOrUpdate(const string &udi, const string &parent_udi,
Doc &doc);
bool addOrUpdate(RclConfig *config, const string &udi,
const string &parent_udi, Doc &doc);
#ifdef IDX_THREADS
void waitUpdIdle();
#endif
/** Delete document(s) for given UDI, including subdocs */
bool purgeFile(const string &udi, bool *existed = 0);


@@ -26,10 +26,10 @@
#ifdef IDX_THREADS
#include "workqueue.h"
#include "debuglog.h"
#endif // IDX_THREADS
#include "debuglog.h"
#include "xmacros.h"
#include "ptmutex.h"
namespace Rcl {
@@ -61,6 +61,8 @@ class Db::Native {
bool m_noversionwrite; //Set if open failed because of version mismatch!
#ifdef IDX_THREADS
WorkQueue<DbUpdTask*> m_wqueue;
int m_loglevel;
PTMutexInit m_mutex;
#endif // IDX_THREADS
// Indexing
@@ -76,19 +78,26 @@ class Db::Native {
: m_rcldb(db), m_isopen(false), m_iswritable(false),
m_noversionwrite(false)
#ifdef IDX_THREADS
, m_wqueue(10)
, m_wqueue("DbUpd", 2)
#endif // IDX_THREADS
{ }
{
LOGDEB2(("Native::Native: me %p\n", this));
}
~Native() {
LOGDEB2(("Native::~Native: me %p\n", this));
#ifdef IDX_THREADS
if (m_iswritable) {
void *status = m_wqueue.setTerminateAndWait();
LOGDEB(("Native: worker status %ld\n", long(status)));
LOGDEB2(("Native::~Native: worker status %ld\n", long(status)));
}
#endif // IDX_THREADS
}
// Final steps of doc update, part which need to be single-threaded
bool addOrUpdateWrite(const string& udi, const string& uniterm,
Xapian::Document& doc, size_t txtlen);
bool getPagePositions(Xapian::docid docid, vector<int>& vpos);
int getPageNumberForPosition(const vector<int>& pbreaks, unsigned int pos);


@@ -95,15 +95,16 @@ indexstemminglanguages = english
# appartenance to the list will turn-off both standard accent and case
# processing. Examples:
# Swedish:
# unac_except_trans = åå Åå ää Ää öö Öö
# unac_except_trans = ää Ää öö Öö üü Üü ßss œoe Œoe æae Æae fifi flfl åå Åå
# German:
# unac_except_trans = Ää Öö Üü ää öö üü ßss
# In French, you probably want to decompose oe and ae
# unac_except_trans = œoe Œoe æae Æae
# Actually, this seems a reasonable default for all until someone
# protests. These decompositions are not performed by unac, but I
# cant imagine someone typing the composed forms in a search.
unac_except_trans = ßss œoe Œoe æae ÆAE fifi flfl
# unac_except_trans = ää Ää öö Öö üü Üü ßss œoe Œoe æae Æae fifi flfl
# In French, you probably want to decompose oe and ae and nobody would type
# a German ß
# unac_except_trans = ßss œoe Œoe æae Æae fifi flfl
# Reasonable default for all until someone protests. These decompositions
# are not performed by unac, but I can't imagine someone typing the composed
# forms in a search.
unac_except_trans = ßss œoe Œoe æae Æae fifi flfl
# Maximum expansion count for a single term (ie: when using wildcards).
# We used to not limit this at all (except for filenames where the limit


@@ -17,17 +17,33 @@
#ifndef _WORKQUEUE_H_INCLUDED_
#define _WORKQUEUE_H_INCLUDED_
#include "pthread.h"
#include <pthread.h>
#include <time.h>
#include <string>
#include <queue>
#include <tr1/unordered_map>
#include <tr1/unordered_set>
using std::tr1::unordered_map;
using std::tr1::unordered_set;
using std::queue;
using std::string;
#include "debuglog.h"
#define WORKQUEUE_TIMING
class WQTData {
public:
WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
struct timespec wstart;
};
/**
* A WorkQueue manages the synchronisation around a queue of work items,
* where a single client thread queues tasks and a single worker takes
* and executes them. The goal is to introduce some level of
* parallelism between the successive steps of a previously single
* where a number of client threads queue tasks and a number of worker
* threads take and execute them. The goal is to introduce some level
* of parallelism between the successive steps of a previously single
* threaded pipe-line (data extraction / data preparation / index
* update).
*
@@ -38,9 +54,17 @@ using std::string;
*/
template <class T> class WorkQueue {
public:
WorkQueue(int hi = 0, int lo = 1)
: m_high(hi), m_low(lo), m_size(0), m_worker_up(false),
m_worker_waiting(false), m_jobcnt(0), m_lenacc(0)
/** Create a WorkQueue
* @param name for message printing
* @param hi number of tasks on queue before clients block. Default 0
* meaning no limit.
* @param lo minimum count of tasks before worker starts. Default 1.
*/
WorkQueue(const string& name, int hi = 0, int lo = 1)
: m_name(name), m_high(hi), m_low(lo), m_size(0),
m_workers_waiting(0), m_workers_exited(0), m_jobcnt(0),
m_clientwait(0), m_workerwait(0), m_workerwork(0)
{
m_ok = (pthread_cond_init(&m_cond, 0) == 0) &&
(pthread_mutex_init(&m_mutex, 0) == 0);
@@ -48,30 +72,48 @@ public:
~WorkQueue()
{
if (m_worker_up)
LOGDEB2(("WorkQueue::~WorkQueue: name %s\n", m_name.c_str()));
if (!m_worker_threads.empty())
setTerminateAndWait();
}
/** Start the worker thread. The start_routine will loop
* taking and executing tasks. */
bool start(void *(*start_routine)(void *), void *arg)
/** Start the worker threads.
*
* @param nworkers number of thread copies to start.
* @param start_routine thread function. It should loop
* taking (WorkQueue::take()) and executing tasks.
* @param arg initial parameter to thread function.
* @return true if ok.
*/
bool start(int nworkers, void *(*start_routine)(void *), void *arg)
{
bool status = pthread_create(&m_worker_thread, 0,
start_routine, arg) == 0;
if (status)
m_worker_up = true;
return status;
for (int i = 0; i < nworkers; i++) {
int err;
pthread_t thr;
if ((err = pthread_create(&thr, 0, start_routine, arg))) {
LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
m_name.c_str(), err));
return false;
}
m_worker_threads.insert(pair<pthread_t, WQTData>(thr, WQTData()));
}
return true;
}
/**
* Add item to work queue. Sleep if there are already too many.
* Called from client.
/** Add item to work queue, called from client.
*
* Sleeps if there are already too many.
*/
bool put(T t)
{
if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
return false;
#ifdef WORKQUEUE_TIMING
struct timespec before;
clock_gettime(CLOCK_MONOTONIC, &before);
#endif
while (ok() && m_high > 0 && m_queue.size() >= m_high) {
// Keep the order: we test ok() AFTER the sleep...
if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
@@ -80,112 +122,218 @@ public:
}
}
#ifdef WORKQUEUE_TIMING
struct timespec after;
clock_gettime(CLOCK_MONOTONIC, &after);
m_clientwait += nanodiff(before, after);
#endif
m_queue.push(t);
++m_size;
pthread_cond_broadcast(&m_cond);
// Just wake one worker, there is only one new task.
pthread_cond_signal(&m_cond);
pthread_mutex_unlock(&m_mutex);
return true;
}
/** Wait until the queue is empty and the worker is
* back waiting for task. Called from the client when it needs to
* perform work that couldn't be done in parallel with the
* worker's tasks.
/** Wait until the queue is inactive. Called from client.
*
* Waits until the task queue is empty and the workers are all
* back sleeping. Used by the client to wait for all current work
* to be completed, when it needs to perform work that couldn't be
* done in parallel with the worker's tasks, or before shutting
* down. Work can be resumed after calling this.
*/
bool waitIdle()
{
if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
if (!ok() || pthread_mutex_lock(&m_mutex) != 0) {
LOGERR(("WorkQueue::waitIdle: %s not ok or can't lock\n",
m_name.c_str()));
return false;
}
// We're done when the queue is empty AND the worker is back
// for a task (has finished the last)
while (ok() && (m_queue.size() > 0 || !m_worker_waiting)) {
// We're done when the queue is empty AND all workers are back
// waiting for a task.
while (ok() && (m_queue.size() > 0 ||
m_workers_waiting != m_worker_threads.size())) {
if (pthread_cond_wait(&m_cond, &m_mutex)) {
pthread_mutex_unlock(&m_mutex);
m_ok = false;
LOGERR(("WorkQueue::waitIdle: cond_wait failed\n"));
return false;
}
}
#ifdef WORKQUEUE_TIMING
long long M = 1000000LL;
long long wscl = m_worker_threads.size() * M;
LOGERR(("WorkQueue:%s: clients wait (all) %lld mS, "
"worker wait (avg) %lld mS, worker work (avg) %lld mS\n",
m_name.c_str(), m_clientwait / M, m_workerwait / wscl,
m_workerwork / wscl));
#endif // WORKQUEUE_TIMING
pthread_mutex_unlock(&m_mutex);
return ok();
}
/** Tell the worker to exit, and wait for it. There may still
be tasks on the queue. */
/** Tell the workers to exit, and wait for them. Does not bother about
* tasks possibly remaining on the queue, so should be called
* after waitIdle() for an orderly shutdown.
*/
void* setTerminateAndWait()
{
if (!m_worker_up)
return (void *)0;
LOGDEB(("setTerminateAndWait:%s\n", m_name.c_str()));
pthread_mutex_lock(&m_mutex);
m_ok = false;
pthread_cond_broadcast(&m_cond);
pthread_mutex_unlock(&m_mutex);
void *status;
pthread_join(m_worker_thread, &status);
m_worker_up = false;
return status;
if (m_worker_threads.empty()) {
// Already called ?
return (void*)0;
}
/** Remove task from queue. Sleep if there are not enough. Signal if we go
to sleep on empty queue: client may be waiting for our going idle */
// Wait for all worker threads to have called workerExit()
m_ok = false;
while (m_workers_exited < m_worker_threads.size()) {
pthread_cond_broadcast(&m_cond);
if (pthread_cond_wait(&m_cond, &m_mutex)) {
pthread_mutex_unlock(&m_mutex);
LOGERR(("WorkQueue::setTerminate: cond_wait failed\n"));
return (void*)0;
}
}
// Perform the thread joins and compute overall status
// Workers return (void*)1 if ok
void *statusall = (void*)1;
unordered_map<pthread_t, WQTData>::iterator it;
while (!m_worker_threads.empty()) {
void *status;
it = m_worker_threads.begin();
pthread_join(it->first, &status);
if (status == (void *)0)
statusall = status;
m_worker_threads.erase(it);
}
pthread_mutex_unlock(&m_mutex);
LOGDEB(("setTerminateAndWait:%s done\n", m_name.c_str()));
return statusall;
}
/** Take task from queue. Called from worker.
*
* Sleeps if there are not enough. Signals if we go to sleep on an
* empty queue: the client may be waiting for us to go idle.
*/
bool take(T* tp)
{
if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
return false;
#ifdef WORKQUEUE_TIMING
struct timespec beforesleep;
clock_gettime(CLOCK_MONOTONIC, &beforesleep);
pthread_t me = pthread_self();
unordered_map<pthread_t, WQTData>::iterator it =
m_worker_threads.find(me);
if (it != m_worker_threads.end() &&
it->second.wstart.tv_sec != 0 && it->second.wstart.tv_nsec != 0) {
long long nanos = nanodiff(it->second.wstart, beforesleep);
m_workerwork += nanos;
}
#endif
while (ok() && m_queue.size() < m_low) {
m_worker_waiting = true;
m_workers_waiting++;
if (m_queue.empty())
pthread_cond_broadcast(&m_cond);
if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
// !ok is a normal condition when shutting down
if (ok())
LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n",
m_name.c_str()));
pthread_mutex_unlock(&m_mutex);
m_worker_waiting = false;
m_workers_waiting--;
return false;
}
m_worker_waiting = false;
m_workers_waiting--;
}
++m_jobcnt;
m_lenacc += m_size;
#ifdef WORKQUEUE_TIMING
struct timespec aftersleep;
clock_gettime(CLOCK_MONOTONIC, &aftersleep);
m_workerwait += nanodiff(beforesleep, aftersleep);
it = m_worker_threads.find(me);
if (it != m_worker_threads.end())
it->second.wstart = aftersleep;
#endif
++m_jobcnt;
*tp = m_queue.front();
m_queue.pop();
--m_size;
pthread_cond_broadcast(&m_cond);
// No reason to wake up more than one client thread
pthread_cond_signal(&m_cond);
pthread_mutex_unlock(&m_mutex);
return true;
}
/** Take note of the worker exit. This would normally happen after an
unrecoverable error */
/** Advertise exit and abort queue. Called from worker.
* This would normally happen after an unrecoverable error, or when
* the queue is terminated by the client. Workers never exit normally,
* except when the queue is shut down (at which point m_ok is set to false
* by the shutdown code anyway). The thread must return/exit immediately
* after calling this.
*/
void workerExit()
{
if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
if (pthread_mutex_lock(&m_mutex) != 0)
return;
m_workers_exited++;
m_ok = false;
pthread_cond_broadcast(&m_cond);
pthread_mutex_unlock(&m_mutex);
}
/** Debug only: as the size is returned while the queue is unlocked, there
* is no warranty on its consistency. Not that we use the member size, not
* the container size() call which would need locking.
/** Return current queue size. Debug only.
*
* As the size is returned while the queue is unlocked, there
* is no guarantee of its consistency. Note that we use the member
* size, not the container size() call, which would need locking.
*/
size_t size() {return m_size;}
size_t size()
{
return m_size;
}
private:
bool ok() {return m_ok && m_worker_up;}
bool ok()
{
return m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
}
long long nanodiff(const struct timespec& older,
const struct timespec& newer)
{
return (newer.tv_sec - older.tv_sec) * 1000000000LL
+ newer.tv_nsec - older.tv_nsec;
}
string m_name;
size_t m_high;
size_t m_low;
size_t m_size;
bool m_worker_up;
bool m_worker_waiting;
/* Worker threads currently waiting for a job */
unsigned int m_workers_waiting;
unsigned int m_workers_exited;
/* Stats */
int m_jobcnt;
int m_lenacc;
long long m_clientwait;
long long m_workerwait;
long long m_workerwork;
pthread_t m_worker_thread;
unordered_map<pthread_t, WQTData> m_worker_threads;
queue<T> m_queue;
pthread_cond_t m_cond;
pthread_mutex_t m_mutex;


@@ -36,14 +36,8 @@
topmost section may also exist in older versions.</i></p>
<h2><a name="b_latest">recoll 1.18.002</a></h2>
<h2><a name="b_latest">recoll 1.18.003</a></h2>
<ul>
<li>It seems that a click in the snippets window can crash
recoll the very first time it is used. I could never reproduce
this on later runs and it is not known what causes the
problem. Just restart the application, and things should stay
up the next times.</li>
<li>On systems such as Debian Stable which use Evince version
2.x (not 3.x) as PDF viewer, the default "Open" command for
PDF files will not work. You need to edit the command:
@@ -77,8 +71,18 @@
<h2><a name="b_1_18_0">recoll 1.18.0</a></h2>
<ul>
<li>Thumbnails are not found on newer desktops (e.g. Ubuntu
Quantal) because of a change in the freedesktop.org
"standard".</li>
<li>A bug in extracting search term from click data in the
snippet window results in passing an incorrect term to the
viewer. Only affects non-ascii terms.</li>
<li>Using the snippets window can sometimes crash the
GUI.</li>
<li>Tilde expansion is not properly performed for the
"beaglequeuedir" parameter. This only affects people who
develop scripts over the queue feature.</li>
<li>The missing filter recording code is broken.</li>
<li>Opening embedded documents from the Unity Lens does not
work.</li>
</ul>


@@ -1,6 +1,6 @@
#!/bin/sh
set -x
docdir=/home/dockes/projets/fulltext/recoll/src/doc/user/
docdir=/home/dockes/projets/fulltext/17-MAINT/src/doc/user/
#docdir=/Users/dockes/projets/fulltext/recoll/src/doc/user/
#(cd $docdir;make) || exit 1


@@ -40,20 +40,13 @@
(and understand english, which can probably be assumed, you
being reading this), you can take a little time to translate
the GUI messages file.</p>
<p>The newest versions of the message files follow. There
is an empty one (the xx thing), the others are partially
<p>The newest versions of the message files can be found
in <a href="translations">this directory</a>. There
is an empty one (the xx one), the others are partially
translated, just needing an update for the new messages.</p>
<p>Updating the files can easily be done with
the <span class="application">Qt Linguist</span>. Contact me
for more directions if needed.</p>
<ul>
<li>Blank: <a href="translations/recoll_xx.ts">recoll_xx.ts</a></li>
<li>German: <a href="translations/recoll_de.ts">recoll_de.ts</a></li>
<li>Italian: <a href="translations/recoll_it.ts">recoll_it.ts</a></li>
<li>Russian: <a href="translations/recoll_ru.ts">recoll_ru.ts</a></li>
<li>Turkish: <a href="translations/recoll_tr.ts">recoll_tr.ts</a></li>
<li>Ukrainian: <a href="translations/recoll_uk.ts">recoll_uk.ts</a></li>
</ul>
<h1><a name="development">Development</a></h1>
@@ -67,7 +60,9 @@
tracking system</a>, these are the general areas where help or
ideas are particularly welcome:</p>
<ul>
<li>A better GUI design (both the ergonomy and the appearance).</li>
<li>A better GUI design (both the ergonomics and the
appearance). Adding missing shortcuts or fixing the menu
accelerators, for example, is easy and useful.</li>
<li>More support for the more advanced <span class=
"application">Xapian</span> concepts like relevance
@@ -95,90 +90,10 @@
<p>Reporting crashes is very useful. It can help others, and it
can get your own problem to be solved.</p>
<p>All reports are useful. But, in order to maximize usefulness,
a crash report should include a so-called stack trace, something
that indicates what the program was doing when it
crashed. Getting a useful stack trace is not very difficult,
but it may need a little work on your part (which
will then enable me do my part of the work).</p>
<p>If your distribution includes a separate package for Recoll
debugging symbols, it probably also has a page on its web site
explaining how to use them to get a stack trace. You should
follow these instructions. If there is no debugging package,
you should follow the instructions below. A little
familiarity with the command line will be necessary.</p>
<dl><dt>Compiling and installing a debugging version</dt>
<dd>
<ul>
<li>Obtain the recoll source for the version you are using
(<a
href="http://www.recoll.org/download.html">www.recoll.org</a>),
and extract the source tree.</li>
<li>Follow the instructions for
<a
href="http://www.lesbonscomptes.com/recoll/usermanual/index.html#RCL.INSTALL.BUILDING">
building Recoll from source</a> with the following
modifications:
<ul>
<li>Before running <tt>configure</tt>, edit
the <tt>mk/localdefs.in</tt> file and remove the <tt>-O2</tt>
option(s). </li>
<li>When running <tt>configure</tt>, specify the
standard installation location for your system as a prefix
(to avoid ending up with two installed versions, which
would almost certainly end in confusion). On Linux this
would typically be:
<br><tt>configure --prefix=/usr</tt>.
</li> <li>When installing, arrange for the installed
executables not to be stripped of debugging symbols by
specifying a value for the STRIP environment variable
(ie: <tt>echo</tt> or <tt>ls</tt>): <br><tt>sudo make
install STRIP=ls</tt>
</li>
</ul>
</ul></dd>
<dt>Getting a core dump</dt>
<dd>You will need to run the operation that caused the crash
inside a writable directory, and tell the system that you
accept core dumps. The commands need to be run in a shell
inside a terminal window. Ie:
<pre><tt>
cd
ulimit -c unlimited
recoll #(or recollindex or whatever you want to run).
</tt></pre>
Hopefully, you will succeed in getting the command to crash,
and you will get a core file.
</dd>
<dt>Using gdb to get a stack trace</dt>
<dd>
<ul>
<li>Install <tt>gdb</tt> if it is not already on the system.</li>
<li>Run <tt>gdb</tt> on the command that crashed and the
core file (depending on the system, the core file may be
named "core" or something else, like recollindex.core, or
core.pid), ie:
<br><tt>gdb /usr/bin/recollindex core</tt>
</li>
<li>Inside <tt>gdb</tt>, you need to use different
commands to get a stack trace for <tt>recoll</tt>
and <tt>recollindex</tt>. For <tt>recollindex</tt> you
can use the <tt>bt</tt> command. For <tt>recoll</tt>
use: <br><tt>thread&nbsp;apply&nbsp;all&nbsp;bt&nbsp;full</tt>
</li>
<li>Copy/paste the output to your report email :), and
quit <tt>gdb</tt> ("q").</li>
</ul>
</dd>
</dl>
<p>You will find help and information about producing a useful
problem report on this
<a href="https://bitbucket.org/medoc/recoll/wiki/ProblemSolvingData">
Recoll wiki page</a>.</p>
</div>
</body>


@@ -317,9 +317,9 @@ application/x-tar = execm rcltar
<h2><a name="other">Other features</a></h2>
<ul>
<li>Can use <b>Beagle</b> browser plug-ins to index web
<li>Can use a Firefox extension to index visited Web pages
history. See <a href=
"http://bitbucket.org/medoc/recoll/wiki/IndexBeagleWeb">the
"http://bitbucket.org/medoc/recoll/wiki/IndexWebHistory">the
Wiki</a> for more detail.</li>
<li>Processes all email attachments, and more generally any


@@ -84,13 +84,24 @@
<h2>News</h2>
<div class="news">
<ul>
<li>2012-10-25: the source for <a href="recoll-1.18.002.tar.gz">
recoll 1.18.002</a> is available, and this is a call to
volunteers to test it. There are binary
packages for Ubuntu and Mint Linux users, and I can build
others. See this
<li>2012-10-30: Recoll has a brand new Firefox extension for
indexing visited Web pages. This is very similar to the old
Beagle plugin, but slightly better integrated (no-fuss
installation), and with a new lease on life. See
<a href="https://sourceforge.net/projects/recollfirefox/">
Dave King's project page on sourceforge</a> and the
<a href="https://bitbucket.org/medoc/recoll/wiki/IndexWebHistory">
page about Recoll (trivial) configuration</a>.</li>
<li>2012-10-30: the source for <a href="recoll-1.18.003.tar.gz">
recoll 1.18.003</a> is available, and this is a call to
volunteers to test it. There are binary packages on the recoll
"experimental" PPA for Ubuntu and Mint Linux users, and I can
build others. See this
<a href="http://www.freelists.org/post/recoll-user/recoll-1180-prerelease">
message</a> for more information.</li>
message</a> for more information. Mini-release 003 fixes a GUI
crash and a few other issues in 002.</li>
<li>2012-10-25: a problem with a simple workaround has caused
several reported <span class="important">recollindex


@@ -46,17 +46,16 @@
indexes though.</p>
<p>Case/diacritics sensitivity is off by default for this
release. It can be turned on <em>only</em> by editing
recoll.conf (see the manual). If you do so, you must reset the
index.</p>
release. It can be turned on <em>only</em> by editing recoll.conf
(see the manual). If you do so, you must then reset the index.</p>
<p>Always reset the index if installing over an older version (1.16
and older). The simplest way to do this is to quit all recoll
programs and just delete the index directory (<span
class="literal">rm&nbsp;-rf&nbsp;~/.recoll/xapiandb</span>), then
start recoll or recollindex. <span
class="literal">recollindex&nbsp;-z</span> will do the same in
most, but not all, cases.</p>
<p>Always reset the index if installing over an even older
version (1.16 and older). The simplest way to do this is to
quit all recoll programs and just delete the index directory
(<span class="literal">rm&nbsp;-rf&nbsp;~/.recoll/xapiandb</span>),
then start recoll or
recollindex. <span class="literal">recollindex&nbsp;-z</span>
will do the same in most, but not all, cases.</p>
<p>The subdirectories of xapiandb which were previously used to
store the stem expansion database (stem_english,
@@ -110,18 +109,19 @@
documents, and the <b>evince</b> viewer.</li>
<li>Recoll can now also pass a search string to the native
application.</li>
application (again, works with <b>evince</b>).</li>
<li>There is a list of mime types that should be opened with
<li>There is a list of MIME types that should be opened with
the locally configured application even when <em>Use
Desktop Preferences</em> is checked. This will permit, for
example, using evince for its page access capabilities on
PDF files, while letting the desktop handle all the other
mime types. The list is not empty by default, it contains PDF,
Postscript and DVI, so you may want to reset it after
installation if you want to keep the previous behaviour
(losing the page number functionality). This can be done
from the <em>Preferences->Gui Configuration</em> menu.</li>
example, using <b>evince</b> for its page access
capabilities on PDF files, while letting the desktop handle
all the other MIME types. The list is not empty by default,
it contains PDF, Postscript and DVI, so you may want to
reset it after installation if you want to keep the previous
behaviour (losing the page number functionality). This can
be done from the <em>Preferences->Gui Configuration</em>
menu.</li>
<li>The GUI result list has a new "snippets" window for
documents with page numbers, which let the user choose a
@@ -129,7 +129,8 @@
<li>The advanced search screen now has a history
function. While the focus is in this window, you
can walk the history of searches using the up and down arrows.</li>
can walk the history of searches using the up and down
arrows.</li>
<li>We now allow multiple directory specifications in the query
language, as in: <i>dir:/home/me -dir:tmp</i></li>
@@ -149,7 +150,19 @@
<li>When running in an UTF-8 locale, and after failing to decode a
plain text file as UTF-8, indexing will try again using an 8 bit
character set heuristically chosen according to the locale
country code.</li>
language code. This uses the LANG environment variable.</li>
<li>On initial installation (when the <em>~/.recoll</em>
directory does not exist), recoll will install a list of
characters which should not be stripped of diacritics,
according to the detected national language (based on
$LANG). There are currently specific lists for German (don't
strip the umlauts), and Nordic languages (keep the letters
with circle above in addition to the German list). Other
languages currently only have exceptions which result in
decomposing ligatures (fl, fi etc.). You can have a look at
the standard recoll.conf in /usr/share/recoll/examples for
more information.</li>
<li>A new configuration variable, <tt>maxmemberkbs</tt>, has been
implemented to limit the size of archive members we process. This