From a11c6965543d664b1de20503eb02e180a726008c Mon Sep 17 00:00:00 2001
From: Jean-Francois Dockes
Date: Thu, 1 Nov 2012 11:19:48 +0100
Subject: [PATCH] First parallel multithreaded version of indexing that can
index my home directory without crashing. Checkpoint.
---
src/VERSION | 2 +-
src/configure | 31 +--
src/configure.ac | 14 +-
src/index/Makefile | 2 +-
src/index/beaglequeue.cpp | 8 +-
src/index/fsindexer.cpp | 193 ++++++++++++++---
src/index/fsindexer.h | 44 ++--
src/index/recollindex.cpp | 5 +-
src/internfile/internfile.cpp | 1 +
src/internfile/mimehandler.cpp | 82 +++++---
src/mk/Linux | 1 +
src/mk/localdefs.in | 1 +
src/rcldb/rcldb.cpp | 122 ++++++-----
src/rcldb/rcldb.h | 7 +-
src/rcldb/rcldb_p.h | 19 +-
src/sampleconf/recoll.conf.in | 17 +-
src/utils/workqueue.h | 364 +++++++++++++++++++++++----------
website/BUGS.html | 26 ++-
website/copydocs | 2 +-
website/devel.html | 105 +---------
website/features.html | 4 +-
website/index.html.en | 23 ++-
website/release-1.18.html | 55 +++--
23 files changed, 697 insertions(+), 431 deletions(-)
diff --git a/src/VERSION b/src/VERSION
index 3b4c1669..815d5ca0 100644
--- a/src/VERSION
+++ b/src/VERSION
@@ -1 +1 @@
-1.18.002
+1.19.0
diff --git a/src/configure b/src/configure
index 894af913..378915f1 100755
--- a/src/configure
+++ b/src/configure
@@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
-# Generated by GNU Autoconf 2.69 for Recoll 1.18.0.
+# Generated by GNU Autoconf 2.69 for Recoll 1.19.0.
#
#
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
@@ -577,8 +577,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Recoll'
PACKAGE_TARNAME='recoll'
-PACKAGE_VERSION='1.18.0'
-PACKAGE_STRING='Recoll 1.18.0'
+PACKAGE_VERSION='1.19.0'
+PACKAGE_STRING='Recoll 1.19.0'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''
@@ -623,6 +623,7 @@ ac_subst_vars='LTLIBOBJS
LIBOBJS
RCLVERSION
NOPYTHON
+NOTHREADS
NOPIC
LIBQZEITGEIST
QMAKE_DISABLE_ZEITGEIST
@@ -1278,7 +1279,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
-\`configure' configures Recoll 1.18.0 to adapt to many kinds of systems.
+\`configure' configures Recoll 1.19.0 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@@ -1343,7 +1344,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
- short | recursive ) echo "Configuration of Recoll 1.18.0:";;
+ short | recursive ) echo "Configuration of Recoll 1.19.0:";;
esac
cat <<\_ACEOF
@@ -1476,7 +1477,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
-Recoll configure 1.18.0
+Recoll configure 1.19.0
generated by GNU Autoconf 2.69
Copyright (C) 2012 Free Software Foundation, Inc.
@@ -2029,7 +2030,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
-It was created by Recoll $as_me 1.18.0, which was
+It was created by Recoll $as_me 1.19.0, which was
generated by GNU Autoconf 2.69. Invocation command line was
$ $0 $@
@@ -4357,11 +4358,9 @@ $as_echo "#define RCL_USE_XATTR 1" >>confdefs.h
fi
# Enable use of threads in the indexing pipeline.
-# Threads are used in bucket-brigade fashion for the processing steps
-# (reading file - text splitting - indexing proper). The performance
-# increase can be significant, but this is disabled by default as we
-# usually care little about indexing absolute performance (more about
-# impact on usability and total resources used).
+# This is disabled by default as we usually care little about indexing
+# absolute performance (more about impact on usability and total
+# resources used).
# Check whether --enable-idxthreads was given.
if test "${enable_idxthreads+set}" = set; then :
enableval=$enable_idxthreads; idxthreadsEnabled=$enableval
@@ -4374,6 +4373,9 @@ if test X$idxthreadsEnabled = Xyes ; then
$as_echo "#define IDX_THREADS 1" >>confdefs.h
+ NOTHREADS=""
+else
+ NOTHREADS="#"
fi
# Enable CamelCase word splitting. This is optional because it causes
@@ -5743,6 +5745,7 @@ RCLVERSION=`cat VERSION`
+
# All object files depend on localdefs which has the cc flags. Avoid
@@ -6274,7 +6277,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
-This file was extended by Recoll $as_me 1.18.0, which was
+This file was extended by Recoll $as_me 1.19.0, which was
generated by GNU Autoconf 2.69. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@@ -6336,7 +6339,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
-Recoll config.status 1.18.0
+Recoll config.status 1.19.0
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"
diff --git a/src/configure.ac b/src/configure.ac
index 41c45bb2..9a82d65f 100644
--- a/src/configure.ac
+++ b/src/configure.ac
@@ -177,12 +177,10 @@ if test X$xattrEnabled = Xyes ; then
AC_DEFINE(RCL_USE_XATTR, 1, [Use file extended attributes])
fi
-# Enable use of threads in the indexing pipeline. Threads are used in
-# bucket-brigade fashion for the processing steps (reading file - text
-# splitting - indexing proper). The performance increase is small in normal
-# case (might be a bit more significant if you're using an SSD), and this
-# is disabled by default as we usually care little about indexing absolute
-# performance (more about impact on usability and total resources used).
+# Enable use of threads in the indexing pipeline.
+# This is disabled by default as we usually care little about indexing
+# absolute performance (more about impact on usability and total
+# resources used).
AC_ARG_ENABLE(idxthreads,
AC_HELP_STRING([--enable-idxthreads],
[Enable multithread indexing. This can somewhat boost indexing
@@ -191,6 +189,9 @@ AC_ARG_ENABLE(idxthreads,
if test X$idxthreadsEnabled = Xyes ; then
AC_DEFINE(IDX_THREADS, 1, [Use multiple threads for indexing])
+ NOTHREADS=""
+else
+ NOTHREADS="#"
fi
# Enable CamelCase word splitting. This is optional because it causes
@@ -557,6 +558,7 @@ AC_SUBST(QMAKE_ENABLE_ZEITGEIST)
AC_SUBST(QMAKE_DISABLE_ZEITGEIST)
AC_SUBST(LIBQZEITGEIST)
AC_SUBST(NOPIC)
+AC_SUBST(NOTHREADS)
AC_SUBST(NOPYTHON)
AC_SUBST(RCLVERSION)
diff --git a/src/index/Makefile b/src/index/Makefile
index 04c983d9..e334521a 100644
--- a/src/index/Makefile
+++ b/src/index/Makefile
@@ -13,7 +13,7 @@ recollindex : $(RECOLLINDEX_OBJS)
$(LIBICONV) $(BDYNAMIC) \
$(LIBFAM) \
$(X_LIBS) $(X_PRE_LIBS) $(X_LIBX11) $(X_EXTRA_LIBS) \
- $(LIBSYS)
+ $(LIBSYS) $(LIBTHREADS)
recollindex.o : recollindex.cpp
$(CXX) $(ALL_CXXFLAGS) -c -o recollindex.o $<
rclmonrcv.o : rclmonrcv.cpp
diff --git a/src/index/beaglequeue.cpp b/src/index/beaglequeue.cpp
index 80c721f8..b80e7854 100644
--- a/src/index/beaglequeue.cpp
+++ b/src/index/beaglequeue.cpp
@@ -218,7 +218,7 @@ bool BeagleQueueIndexer::indexFromCache(const string& udi)
if (!stringlowercmp("bookmark", hittype)) {
// Just index the dotdoc
dotdoc.meta[Rcl::Doc::keybcknd] = "BGL";
- return m_db->addOrUpdate(udi, cstr_null, dotdoc);
+ return m_db->addOrUpdate(m_config, udi, cstr_null, dotdoc);
} else if (stringlowercmp("webhistory", dotdoc.meta[Rcl::Doc::keybght]) ||
(dotdoc.mimetype.compare("text/html") &&
dotdoc.mimetype.compare(cstr_textplain))) {
@@ -248,7 +248,7 @@ bool BeagleQueueIndexer::indexFromCache(const string& udi)
doc.pcbytes = dotdoc.pcbytes;
doc.sig.clear();
doc.meta[Rcl::Doc::keybcknd] = "BGL";
- return m_db->addOrUpdate(udi, cstr_null, doc);
+ return m_db->addOrUpdate(m_config, udi, cstr_null, doc);
}
}
@@ -414,7 +414,7 @@ BeagleQueueIndexer::processone(const string &path,
dotdoc.sig.clear();
dotdoc.meta[Rcl::Doc::keybcknd] = "BGL";
- if (!m_db->addOrUpdate(udi, cstr_null, dotdoc))
+ if (!m_db->addOrUpdate(m_config, udi, cstr_null, dotdoc))
return FsTreeWalker::FtwError;
} else if (stringlowercmp("webhistory", dotdoc.meta[Rcl::Doc::keybght]) ||
@@ -461,7 +461,7 @@ BeagleQueueIndexer::processone(const string &path,
doc.url = dotdoc.url;
doc.meta[Rcl::Doc::keybcknd] = "BGL";
- if (!m_db->addOrUpdate(udi, cstr_null, doc))
+ if (!m_db->addOrUpdate(m_config, udi, cstr_null, doc))
return FsTreeWalker::FtwError;
}
diff --git a/src/index/fsindexer.cpp b/src/index/fsindexer.cpp
index 4a40644c..f2345697 100644
--- a/src/index/fsindexer.cpp
+++ b/src/index/fsindexer.cpp
@@ -53,32 +53,78 @@
#define RCL_STTIME st_mtime
#endif // RCL_USE_XATTR
-#ifndef NO_NAMESPACES
using namespace std;
-#endif /* NO_NAMESPACES */
-#ifndef deleteZ
-#define deleteZ(X) {delete X;X = 0;}
+#ifdef IDX_THREADS
+class DbUpdTask {
+public:
+ DbUpdTask(RclConfig *cnf, const string& u, const string& p,
+ const Rcl::Doc& d)
+ : udi(u), parent_udi(p), doc(d), config(cnf)
+ {}
+ string udi;
+ string parent_udi;
+ Rcl::Doc doc;
+ RclConfig *config;
+};
+extern void *FsIndexerDbUpdWorker(void*);
+
+class InternfileTask {
+public:
+ InternfileTask(const std::string &f, const struct stat *i_stp)
+ : fn(f), statbuf(*i_stp)
+ {}
+ string fn;
+ struct stat statbuf;
+};
+extern void *FsIndexerInternfileWorker(void*);
+#endif // IDX_THREADS
+
+// Thread safe variation of the "missing helpers" storage. Only the
+// addMissing method needs protection, the rest are called from the
+// main thread either before or after the exciting part
+class FSIFIMissingStore : public FIMissingStore {
+#ifdef IDX_THREADS
+ PTMutexInit m_mutex;
#endif
+public:
+ virtual void addMissing(const string& prog, const string& mt)
+ {
+#ifdef IDX_THREADS
+ PTMutexLocker locker(m_mutex);
+#endif
+ FIMissingStore::addMissing(prog, mt);
+ }
+};
FsIndexer::FsIndexer(RclConfig *cnf, Rcl::Db *db, DbIxStatusUpdater *updfunc)
- : m_config(cnf), m_db(db), m_updater(updfunc), m_missing(new FIMissingStore)
+ : m_config(cnf), m_db(db), m_updater(updfunc),
+ m_missing(new FSIFIMissingStore)
#ifdef IDX_THREADS
- , m_wqueue(10)
+ , m_iwqueue("Internfile", 2), m_dwqueue("Split", 2)
#endif // IDX_THREADS
{
m_havelocalfields = m_config->hasNameAnywhere("localfields");
#ifdef IDX_THREADS
- if (!m_wqueue.start(FsIndexerIndexWorker, this)) {
+ m_loglevel = DebugLog::getdbl()->getlevel();
+ if (!m_iwqueue.start(4, FsIndexerInternfileWorker, this)) {
+ LOGERR(("FsIndexer::FsIndexer: worker start failed\n"));
+ return;
+ }
+ if (!m_dwqueue.start(2, FsIndexerDbUpdWorker, this)) {
LOGERR(("FsIndexer::FsIndexer: worker start failed\n"));
return;
}
#endif // IDX_THREADS
}
+
FsIndexer::~FsIndexer() {
#ifdef IDX_THREADS
- void *status = m_wqueue.setTerminateAndWait();
- LOGERR(("FsIndexer: worker status: %ld\n", long(status)));
+ void *status = m_iwqueue.setTerminateAndWait();
+ LOGINFO(("FsIndexer: internfile worker status: %ld (1->ok)\n",
+ long(status)));
+ status = m_dwqueue.setTerminateAndWait();
+ LOGINFO(("FsIndexer: dbupd worker status: %ld (1->ok)\n", long(status)));
#endif // IDX_THREADS
delete m_missing;
}
@@ -98,10 +144,14 @@ bool FsIndexer::init()
// Recursively index each directory in the topdirs:
bool FsIndexer::index()
{
+ Chrono chron;
if (!init())
return false;
if (m_updater) {
+#ifdef IDX_THREADS
+ PTMutexLocker locker(m_mutex);
+#endif
m_updater->status.reset();
m_updater->status.dbtotdocs = m_db->docCnt();
}
@@ -138,15 +188,21 @@ bool FsIndexer::index()
}
#ifdef IDX_THREADS
- m_wqueue.waitIdle();
+ m_iwqueue.waitIdle();
+ m_dwqueue.waitIdle();
+ m_db->waitUpdIdle();
#endif // IDX_THREADS
- string missing;
- FileInterner::getMissingDescription(m_missing, missing);
- if (!missing.empty()) {
- LOGINFO(("FsIndexer::index missing helper program(s):\n%s\n",
- missing.c_str()));
+
+ if (m_missing) {
+ string missing;
+ m_missing->getMissingDescription(missing);
+ if (!missing.empty()) {
+ LOGINFO(("FsIndexer::index missing helper program(s):\n%s\n",
+ missing.c_str()));
+ }
+ m_config->storeMissingHelperDesc(missing);
}
- m_config->storeMissingHelperDesc(missing);
+ LOGERR(("fsindexer index time: %d mS\n", chron.ms()));
return true;
}
@@ -303,25 +359,54 @@ void FsIndexer::makesig(const struct stat *stp, string& out)
}
#ifdef IDX_THREADS
-void *FsIndexerIndexWorker(void * fsp)
+void *FsIndexerDbUpdWorker(void * fsp)
{
FsIndexer *fip = (FsIndexer*)fsp;
- WorkQueue *tqp = &fip->m_wqueue;
- IndexingTask *tsk;
+ WorkQueue *tqp = &fip->m_dwqueue;
+ DebugLog::getdbl()->setloglevel(fip->m_loglevel);
+
+ DbUpdTask *tsk;
for (;;) {
if (!tqp->take(&tsk)) {
tqp->workerExit();
return (void*)1;
}
- LOGDEB(("FsIndexerIndexWorker: got task, ql %d\n", int(tqp->size())));
- if (!fip->m_db->addOrUpdate(tsk->udi, tsk->parent_udi, tsk->doc)) {
- tqp->setTerminateAndWait();
+ LOGDEB(("FsIndexerDbUpdWorker: got task, ql %d\n", int(tqp->size())));
+ if (!fip->m_db->addOrUpdate(tsk->config, tsk->udi, tsk->parent_udi,
+ tsk->doc)) {
+ LOGERR(("FsIndexerDbUpdWorker: addOrUpdate failed\n"));
tqp->workerExit();
return (void*)0;
}
delete tsk;
}
}
+
+void *FsIndexerInternfileWorker(void * fsp)
+{
+ FsIndexer *fip = (FsIndexer*)fsp;
+ WorkQueue *tqp = &fip->m_iwqueue;
+ DebugLog::getdbl()->setloglevel(fip->m_loglevel);
+ TempDir tmpdir;
+ RclConfig *myconf = new RclConfig(*(fip->m_config));
+
+ InternfileTask *tsk;
+ for (;;) {
+ if (!tqp->take(&tsk)) {
+ tqp->workerExit();
+ return (void*)1;
+ }
+ LOGDEB1(("FsIndexerInternfileWorker: fn %s\n", tsk->fn.c_str()));
+ if (fip->processonefile(myconf, tmpdir, tsk->fn, &tsk->statbuf) !=
+ FsTreeWalker::FtwOk) {
+ LOGERR(("FsIndexerInternfileWorker: processone failed\n"));
+ tqp->workerExit();
+ return (void*)0;
+ }
+ LOGDEB1(("FsIndexerInternfileWorker: done fn %s\n", tsk->fn.c_str()));
+ delete tsk;
+ }
+}
#endif // IDX_THREADS
/// This method gets called for every file and directory found by the
@@ -339,8 +424,13 @@ FsTreeWalker::Status
FsIndexer::processone(const std::string &fn, const struct stat *stp,
FsTreeWalker::CbFlag flg)
{
- if (m_updater && !m_updater->update()) {
- return FsTreeWalker::FtwStop;
+ if (m_updater) {
+#ifdef IDX_THREADS
+ PTMutexLocker locker(m_mutex);
+#endif
+ if (!m_updater->update()) {
+ return FsTreeWalker::FtwStop;
+ }
}
// If we're changing directories, possibly adjust parameters (set
@@ -364,6 +454,26 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
return FsTreeWalker::FtwOk;
}
+#ifdef IDX_THREADS
+ InternfileTask *tp = new InternfileTask(fn, stp);
+ if (!m_iwqueue.put(tp))
+ return FsTreeWalker::FtwError;
+ return FsTreeWalker::FtwOk;
+#else
+ return processonefile(m_config, m_tmpdir, fn, stp);
+#endif // IDX_THREADS
+}
+
+
+FsTreeWalker::Status
+FsIndexer::processonefile(RclConfig *config, TempDir& tmpdir,
+ const std::string &fn, const struct stat *stp)
+{
+
+#ifdef IDX_THREADS
+ config->setKeyDir(path_getfather(fn));
+#endif
+
////////////////////
// Check db up to date ? Doing this before file type
// identification means that, if usesystemfilecommand is switched
@@ -379,9 +489,20 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
makesig(stp, sig);
string udi;
make_udi(fn, cstr_null, udi);
- if (!m_db->needUpdate(udi, sig)) {
+ bool needupdate;
+ {
+#ifdef IDX_THREADS
+ PTMutexLocker locker(m_mutex);
+#endif
+ needupdate = m_db->needUpdate(udi, sig);
+ }
+
+ if (!needupdate) {
LOGDEB0(("processone: up to date: %s\n", fn.c_str()));
if (m_updater) {
+#ifdef IDX_THREADS
+ PTMutexLocker locker(m_mutex);
+#endif
// Status bar update, abort request etc.
m_updater->status.fn = fn;
++(m_updater->status.filesdone);
@@ -395,7 +516,7 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
LOGDEB0(("processone: processing: [%s] %s\n",
displayableBytes(stp->st_size).c_str(), fn.c_str()));
- FileInterner interner(fn, stp, m_config, m_tmpdir, FileInterner::FIF_none);
+ FileInterner interner(fn, stp, config, tmpdir, FileInterner::FIF_none);
if (!interner.ok()) {
// no indexing whatsoever in this case. This typically means that
// indexallfilenames is not set
@@ -482,17 +603,23 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
make_udi(fn, doc.ipath, udi);
#ifdef IDX_THREADS
- IndexingTask *tp = new IndexingTask(udi, doc.ipath.empty() ?
- cstr_null : parent_udi, doc);
- if (!m_wqueue.put(tp))
+ DbUpdTask *tp = new DbUpdTask(config, udi, doc.ipath.empty() ?
+ cstr_null : parent_udi, doc);
+ if (!m_dwqueue.put(tp)) {
+ LOGERR(("processonefile: wqueue.put failed\n"));
return FsTreeWalker::FtwError;
+ }
#else
- if (!m_db->addOrUpdate(udi, doc.ipath.empty() ? cstr_null : parent_udi, doc))
+ if (!m_db->addOrUpdate(config, udi, doc.ipath.empty() ? cstr_null :
+ parent_udi, doc))
return FsTreeWalker::FtwError;
#endif // IDX_THREADS
// Tell what we are doing and check for interrupt request
if (m_updater) {
+#ifdef IDX_THREADS
+ PTMutexLocker locker(m_mutex);
+#endif
++(m_updater->status.docsdone);
if (m_updater->status.dbtotdocs < m_updater->status.docsdone)
m_updater->status.dbtotdocs = m_updater->status.docsdone;
@@ -522,11 +649,11 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
// Document signature for up to date checks.
makesig(stp, fileDoc.sig);
#ifdef IDX_THREADS
- IndexingTask *tp = new IndexingTask(parent_udi, cstr_null, fileDoc);
- if (!m_wqueue.put(tp))
+ DbUpdTask *tp = new DbUpdTask(config, parent_udi, cstr_null, fileDoc);
+ if (!m_dwqueue.put(tp))
return FsTreeWalker::FtwError;
#else
- if (!m_db->addOrUpdate(parent_udi, cstr_null, fileDoc))
+ if (!m_db->addOrUpdate(config, parent_udi, cstr_null, fileDoc))
return FsTreeWalker::FtwError;
#endif // IDX_THREADS
}
diff --git a/src/index/fsindexer.h b/src/index/fsindexer.h
index 97602ed7..6463535d 100644
--- a/src/index/fsindexer.h
+++ b/src/index/fsindexer.h
@@ -17,14 +17,14 @@
#ifndef _fsindexer_h_included_
#define _fsindexer_h_included_
+#include
+
#include
-#ifndef NO_NAMESPACES
-using std::list;
-#endif
#include "indexer.h"
#include "fstreewalk.h"
#ifdef IDX_THREADS
+#include "ptmutex.h"
#include "workqueue.h"
#endif // IDX_THREADS
@@ -32,18 +32,8 @@ class DbIxStatusUpdater;
class FIMissingStore;
struct stat;
-#ifdef IDX_THREADS
-class IndexingTask {
-public:
- IndexingTask(const string& u, const string& p, const Rcl::Doc& d)
- :udi(u), parent_udi(p), doc(d)
- {}
- string udi;
- string parent_udi;
- Rcl::Doc doc;
-};
-extern void *FsIndexerIndexWorker(void*);
-#endif // IDX_THREADS
+class DbUpdTask;
+class InternfileTask;
/** Index selected parts of the file system
@@ -75,11 +65,11 @@ class FsIndexer : public FsTreeWalkerCB {
bool index();
/** Index a list of files. No db cleaning or stemdb updating */
- bool indexFiles(list &files, ConfIndexer::IxFlag f =
+ bool indexFiles(std::list &files, ConfIndexer::IxFlag f =
ConfIndexer::IxFNone);
/** Purge a list of files. */
- bool purgeFiles(list &files);
+ bool purgeFiles(std::list &files);
/** Tree walker callback method */
FsTreeWalker::Status
@@ -92,12 +82,13 @@ class FsIndexer : public FsTreeWalkerCB {
FsTreeWalker m_walker;
RclConfig *m_config;
Rcl::Db *m_db;
- TempDir m_tmpdir;
+ TempDir m_tmpdir;
string m_reason;
DbIxStatusUpdater *m_updater;
- list m_tdl;
+ std::list m_tdl;
FIMissingStore *m_missing;
+
// The configuration can set attribute fields to be inherited by
// all files in a file system area. Ie: set "rclaptg = thunderbird"
// inside ~/.thunderbird. The boolean is set at init to avoid
@@ -106,14 +97,25 @@ class FsIndexer : public FsTreeWalkerCB {
map m_localfields;
#ifdef IDX_THREADS
- friend void *FsIndexerIndexWorker(void*);
- WorkQueue m_wqueue;
+ // Used to protect all ops from processonefile to class members:
+ // m_missing, m_db. It would be possible to be more fine-grained
+ // but probably not worth it. m_config and m_updater have separate
+ // protections
+ PTMutexInit m_mutex;
+ friend void *FsIndexerDbUpdWorker(void*);
+ friend void *FsIndexerInternfileWorker(void*);
+ int m_loglevel;
+ WorkQueue m_iwqueue;
+ WorkQueue m_dwqueue;
#endif // IDX_THREADS
bool init();
void localfieldsfromconf();
void setlocalfields(Rcl::Doc& doc);
string getDbDir() {return m_config->getDbDir();}
+ FsTreeWalker::Status
+ processonefile(RclConfig *config, TempDir& tmpdir, const string &fn,
+ const struct stat *);
};
#endif /* _fsindexer_h_included_ */
diff --git a/src/index/recollindex.cpp b/src/index/recollindex.cpp
index 2c9d03f6..b823495f 100644
--- a/src/index/recollindex.cpp
+++ b/src/index/recollindex.cpp
@@ -112,7 +112,10 @@ class MyUpdater : public DbIxStatusUpdater {
fprintf(fp, "filesdone = %d\n", status.filesdone);
fprintf(fp, "dbtotdocs = %d\n", status.dbtotdocs);
fprintf(fp, "fn = %s\n", status.fn.c_str());
- ftruncate(m_fd, off_t(ftell(fp)));
+ if (ftruncate(m_fd, off_t(ftell(fp))) < 0) {
+ // ? kill compiler warning about ignoring ftruncate return
+ LOGDEB(("Status update: ftruncate failed\n"));
+ }
// Flush data and closes fd1. m_fd still valid
fclose(fp);
}
diff --git a/src/internfile/internfile.cpp b/src/internfile/internfile.cpp
index 4bcce3f9..748f5f96 100644
--- a/src/internfile/internfile.cpp
+++ b/src/internfile/internfile.cpp
@@ -325,6 +325,7 @@ void FileInterner::init(const string &f, const struct stat *stp, RclConfig *cnf,
df->set_docsize(docsize);
if (!df->set_document_file(m_fn)) {
+ delete df;
LOGERR(("FileInterner:: error converting %s\n", m_fn.c_str()));
return;
}
diff --git a/src/internfile/mimehandler.cpp b/src/internfile/mimehandler.cpp
index 55b2f498..cdc0304b 100644
--- a/src/internfile/mimehandler.cpp
+++ b/src/internfile/mimehandler.cpp
@@ -22,6 +22,7 @@
#include
#include
#include
+#include
using namespace std;
#include "cstr.h"
@@ -44,23 +45,35 @@ using namespace std;
// (think email attachment in email message: 2 rfc822 handlers are
// needed simulteanously)
static multimap o_handlers;
+static list::iterator> o_hlru;
+typedef list::iterator>::iterator hlruit_tp;
+
static PTMutexInit o_handlers_mutex;
-static const unsigned int max_handlers_cache_size = 300;
+static const unsigned int max_handlers_cache_size = 100;
/* Look for mime handler in pool */
-static Dijon::Filter *getMimeHandlerFromCache(const string& mtype)
+static Dijon::Filter *getMimeHandlerFromCache(const string& key)
{
- LOGDEB0(("getMimeHandlerFromCache: %s\n", mtype.c_str()));
-
PTMutexLocker locker(o_handlers_mutex);
- map::iterator it = o_handlers.find(mtype);
+ LOGDEB(("getMimeHandlerFromCache: %s cache size %u\n",
+ key.c_str(), o_handlers.size()));
+
+ multimap::iterator it = o_handlers.find(key);
if (it != o_handlers.end()) {
Dijon::Filter *h = it->second;
+ hlruit_tp it1 = find(o_hlru.begin(), o_hlru.end(), it);
+ if (it1 != o_hlru.end()) {
+ o_hlru.erase(it1);
+ } else {
+ LOGERR(("getMimeHandlerFromCache: lru position not found\n"));
+ }
o_handlers.erase(it);
- LOGDEB0(("getMimeHandlerFromCache: %s found\n", mtype.c_str()));
+ LOGDEB(("getMimeHandlerFromCache: %s found size %u\n",
+ key.c_str(), o_handlers.size()));
return h;
}
+ LOGDEB(("getMimeHandlerFromCache: %s not found\n", key.c_str()));
return 0;
}
@@ -68,28 +81,40 @@ static Dijon::Filter *getMimeHandlerFromCache(const string& mtype)
void returnMimeHandler(Dijon::Filter *handler)
{
typedef multimap::value_type value_type;
- if (handler) {
- handler->clear();
- PTMutexLocker locker(o_handlers_mutex);
- LOGDEB2(("returnMimeHandler: returning filter for %s cache size %d\n",
- handler->get_mime_type().c_str(), o_handlers.size()));
- // Limit pool size. It's possible for some reason that the
- // handler was not found in the cache by getMimeHandler() and
- // that a new handler is returned every time. We don't want
- // the cache to grow indefinitely. We try to delete an element
- // of the same kind, and if this fails, the first at
- // hand. Note that going oversize *should not* normally
- // happen, we're only being prudent.
- if (o_handlers.size() >= max_handlers_cache_size) {
- map::iterator it =
- o_handlers.find(handler->get_mime_type());
- if (it != o_handlers.end())
- o_handlers.erase(it);
- else
- o_handlers.erase(o_handlers.begin());
+
+ if (handler==0)
+ return;
+ handler->clear();
+
+ PTMutexLocker locker(o_handlers_mutex);
+
+ LOGDEB(("returnMimeHandler: returning filter for %s cache size %d\n",
+ handler->get_mime_type().c_str(), o_handlers.size()));
+
+ // Limit pool size. The pool can grow quite big because there are
+ // many filter types, each of which can be used in several copies
+ // at the same time either because it occurs several times in a
+ // stack (ie mail attachment to mail), or because several threads
+ // are processing the same mime type at the same time.
+ multimap::iterator it;
+ if (o_handlers.size() >= max_handlers_cache_size) {
+ static int once = 1;
+ if (once) {
+ once = 0;
+ for (it = o_handlers.begin(); it != o_handlers.end(); it++) {
+ LOGERR(("Cache full key: %s\n", it->first.c_str()));
+ }
+ LOGERR(("Cache LRU size: %u\n", o_hlru.size()));
+ }
+ if (o_hlru.size() > 0) {
+ it = o_hlru.back();
+ o_hlru.pop_back();
+ delete it->second;
+ o_handlers.erase(it);
}
- o_handlers.insert(value_type(handler->get_mime_type(), handler));
}
+ it = o_handlers.insert(value_type(handler->get_mime_type(), handler));
+ o_hlru.push_front(it);
}
void clearMimeHandlerCache()
@@ -203,7 +228,7 @@ Dijon::Filter *getMimeHandler(const string &mtype, RclConfig *cfg,
bool filtertypes)
{
LOGDEB(("getMimeHandler: mtype [%s] filtertypes %d\n",
- mtype.c_str(), filtertypes));
+ mtype.c_str(), filtertypes));
Dijon::Filter *h = 0;
// Get handler definition for mime type. We do this even if an
@@ -279,7 +304,8 @@ Dijon::Filter *getMimeHandler(const string &mtype, RclConfig *cfg,
{bool indexunknown = false;
cfg->getConfParam("indexallfilenames", &indexunknown);
if (indexunknown) {
- h = new MimeHandlerUnknown(cfg, "application/octet-stream");
+ if ((h = getMimeHandlerFromCache("application/octet-stream")) == 0)
+ h = new MimeHandlerUnknown(cfg, "application/octet-stream");
goto out;
} else {
goto out;
diff --git a/src/mk/Linux b/src/mk/Linux
index aa9a9774..a74ca3e1 100644
--- a/src/mk/Linux
+++ b/src/mk/Linux
@@ -6,3 +6,4 @@ ALL_CXXFLAGS = $(CXXFLAGS) $(COMMONCXXFLAGS) $(LOCALCXXFLAGS) \
-D_GNU_SOURCE
LIBSYS = -lpthread -ldl
+LIBSYSTHREADS = -lrt
diff --git a/src/mk/localdefs.in b/src/mk/localdefs.in
index 48838425..e56c711c 100644
--- a/src/mk/localdefs.in
+++ b/src/mk/localdefs.in
@@ -25,6 +25,7 @@ libdir = @libdir@
RECOLL_DATADIR = ${datadir}/recoll
@NOPIC@PICFLAGS = $(SYSPICFLAGS)
+@NOTHREADS@LIBTHREADS = $(LIBSYSTHREADS)
LOCALCXXFLAGS = -g -O2 -Wall -Wno-unused \
$(INCICONV) $(XAPIANCXXFLAGS) $(X_CFLAGS) \
diff --git a/src/rcldb/rcldb.cpp b/src/rcldb/rcldb.cpp
index ee83c7f5..0b4c6b48 100644
--- a/src/rcldb/rcldb.cpp
+++ b/src/rcldb/rcldb.cpp
@@ -280,9 +280,12 @@ Db::Db(RclConfig *cfp)
m_config->getConfParam("idxflushmb", &m_flushMb);
}
#ifdef IDX_THREADS
- if (m_ndb && !m_ndb->m_wqueue.start(DbUpdWorker, this)) {
- LOGERR(("Db::Db: Worker start failed\n"));
- return;
+ if (m_ndb) {
+ m_ndb->m_loglevel = DebugLog::getdbl()->getlevel();
+ if (!m_ndb->m_wqueue.start(1, DbUpdWorker, this)) {
+ LOGERR(("Db::Db: Worker start failed\n"));
+ return;
+ }
}
#endif // IDX_THREADS
}
@@ -461,7 +464,7 @@ int Db::docCnt()
if (!m_ndb || !m_ndb->m_isopen)
return -1;
- XAPTRY(res = m_ndb->xdb().get_doccount(), m_ndb->xrdb, m_reason);
+ XAPTRY(res = m_ndb->xrdb.get_doccount(), m_ndb->xrdb, m_reason);
if (!m_reason.empty()) {
LOGERR(("Db::docCnt: got error: %s\n", m_reason.c_str()));
@@ -788,48 +791,22 @@ void *DbUpdWorker(void* vdbp)
{
Db *dbp = (Db *)vdbp;
WorkQueue *tqp = &(dbp->m_ndb->m_wqueue);
- DbUpdTask *tsk;
+ DebugLog::getdbl()->setloglevel(dbp->m_ndb->m_loglevel);
+ DbUpdTask *tsk;
for (;;) {
if (!tqp->take(&tsk)) {
tqp->workerExit();
return (void*)1;
}
LOGDEB(("DbUpdWorker: got task, ql %d\n", int(tqp->size())));
-
- const char *fnc = tsk->udi.c_str();
- string ermsg;
-
- // Add db entry or update existing entry:
- try {
- Xapian::docid did =
- dbp->m_ndb->xwdb.replace_document(tsk->uniterm,
- tsk->doc);
- if (did < dbp->updated.size()) {
- dbp->updated[did] = true;
- LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
- } else {
- LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
- }
- } XCATCHERROR(ermsg);
-
- if (!ermsg.empty()) {
- LOGERR(("Db::add: replace_document failed: %s\n", ermsg.c_str()));
- ermsg.erase();
- // FIXME: is this ever actually needed?
- try {
- dbp->m_ndb->xwdb.add_document(tsk->doc);
- LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n",
- fnc));
- } XCATCHERROR(ermsg);
- if (!ermsg.empty()) {
- LOGERR(("Db::add: add_document failed: %s\n", ermsg.c_str()));
- tqp->workerExit();
- return (void*)0;
- }
+ if (!dbp->m_ndb->addOrUpdateWrite(tsk->udi, tsk->uniterm,
+ tsk->doc, tsk->txtlen)) {
+ LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n"));
+ tqp->workerExit();
+ delete tsk;
+ return (void*)0;
}
- dbp->maybeflush(tsk->txtlen);
-
delete tsk;
}
}
@@ -839,26 +816,13 @@ void *DbUpdWorker(void* vdbp)
// the title abstract and body and add special terms for file name,
// date, mime type etc. , create the document data record (more
// metadata), and update database
-bool Db::addOrUpdate(const string &udi, const string &parent_udi,
- Doc &doc)
+bool Db::addOrUpdate(RclConfig *config, const string &udi,
+ const string &parent_udi, Doc &doc)
{
LOGDEB(("Db::add: udi [%s] parent [%s]\n",
udi.c_str(), parent_udi.c_str()));
if (m_ndb == 0)
return false;
- // Check file system full every mbyte of indexed text.
- if (m_maxFsOccupPc > 0 &&
- (m_occFirstCheck || (m_curtxtsz - m_occtxtsz) / MB >= 1)) {
- LOGDEB(("Db::add: checking file system usage\n"));
- int pc;
- m_occFirstCheck = 0;
- if (fsocc(m_basedir, &pc) && pc >= m_maxFsOccupPc) {
- LOGERR(("Db::add: stop indexing: file system "
- "%d%% full > max %d%%\n", pc, m_maxFsOccupPc));
- return false;
- }
- m_occtxtsz = m_curtxtsz;
- }
Xapian::Document newdocument;
@@ -1082,10 +1046,10 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi,
if (!doc.meta[Doc::keyabs].empty())
RECORD_APPEND(record, Doc::keyabs, doc.meta[Doc::keyabs]);
- const set& stored = m_config->getStoredFields();
+ const set& stored = config->getStoredFields();
for (set::const_iterator it = stored.begin();
it != stored.end(); it++) {
- string nm = m_config->fieldCanon(*it);
+ string nm = config->fieldCanon(*it);
if (!doc.meta[*it].empty()) {
string value =
neutchars(truncate_to_word(doc.meta[*it], 150), cstr_nc);
@@ -1125,16 +1089,42 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi,
LOGERR(("Db::addOrUpdate:Cant queue task\n"));
return false;
}
+ return true;
#else
+ return m_ndb->addOrUpdateWrite(udi, uniterm, newdocument,
+ doc.text.length());
+#endif // IDX_THREADS
+}
+
+bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
+ Xapian::Document& newdocument, size_t textlen)
+{
+ // Check file system full every mbyte of indexed text. It's a bit wasteful
+ // to do this after having prepared the document, but it needs to be in
+ // the single-threaded section.
+ if (m_rcldb->m_maxFsOccupPc > 0 &&
+ (m_rcldb->m_occFirstCheck ||
+ (m_rcldb->m_curtxtsz - m_rcldb->m_occtxtsz) / MB >= 1)) {
+ LOGDEB(("Db::add: checking file system usage\n"));
+ int pc;
+ m_rcldb->m_occFirstCheck = 0;
+ if (fsocc(m_rcldb->m_basedir, &pc) && pc >= m_rcldb->m_maxFsOccupPc) {
+ LOGERR(("Db::add: stop indexing: file system "
+ "%d%% full > max %d%%\n", pc, m_rcldb->m_maxFsOccupPc));
+ return false;
+ }
+ m_rcldb->m_occtxtsz = m_rcldb->m_curtxtsz;
+ }
+
const char *fnc = udi.c_str();
string ermsg;
// Add db entry or update existing entry:
try {
Xapian::docid did =
- m_ndb->xwdb.replace_document(uniterm, newdocument);
- if (did < updated.size()) {
- updated[did] = true;
+ xwdb.replace_document(uniterm, newdocument);
+ if (did < m_rcldb->updated.size()) {
+ m_rcldb->updated[did] = true;
LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
} else {
LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
@@ -1146,7 +1136,7 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi,
ermsg.erase();
// FIXME: is this ever actually needed?
try {
- m_ndb->xwdb.add_document(newdocument);
+ xwdb.add_document(newdocument);
LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n",
fnc));
} XCATCHERROR(ermsg);
@@ -1157,11 +1147,16 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi,
}
// Test if we're over the flush threshold (limit memory usage):
- maybeflush(doc.text.length());
-#endif // IDX_THREADS
- return true;
+ return m_rcldb->maybeflush(textlen);
}
+#ifdef IDX_THREADS
+void Db::waitUpdIdle()
+{
+ m_ndb->m_wqueue.waitIdle();
+}
+#endif
+
// Flush when idxflushmbs is reached
bool Db::maybeflush(off_t moretext)
{
@@ -1233,6 +1228,7 @@ bool Db::needUpdate(const string &udi, const string& sig)
// Set the uptodate flag for doc / pseudo doc
if (m_mode != DbRO) {
+#warning we need a lock here !
updated[*docid] = true;
// Set the existence flag for all the subdocs (if any)
@@ -1244,7 +1240,7 @@ bool Db::needUpdate(const string &udi, const string& sig)
for (vector::iterator it = docids.begin();
it != docids.end(); it++) {
if (*it < updated.size()) {
- LOGDEB2(("Db::needUpdate: set flag for docid %d\n", *it));
+ LOGDEB2(("Db::needUpdate: docid %d set\n", *it));
updated[*it] = true;
}
}
diff --git a/src/rcldb/rcldb.h b/src/rcldb/rcldb.h
index 20b9e426..4ecc1eea 100644
--- a/src/rcldb/rcldb.h
+++ b/src/rcldb/rcldb.h
@@ -250,8 +250,11 @@ class Db {
/** Add or update document. The Doc class should have been filled as much as
* possible depending on the document type. parent_udi is only
* use for subdocs, else set it to empty */
- bool addOrUpdate(const string &udi, const string &parent_udi,
- Doc &doc);
+ bool addOrUpdate(RclConfig *config, const string &udi,
+ const string &parent_udi, Doc &doc);
+#ifdef IDX_THREADS
+ void waitUpdIdle();
+#endif
/** Delete document(s) for given UDI, including subdocs */
bool purgeFile(const string &udi, bool *existed = 0);
diff --git a/src/rcldb/rcldb_p.h b/src/rcldb/rcldb_p.h
index 0c5a7a88..bb648209 100644
--- a/src/rcldb/rcldb_p.h
+++ b/src/rcldb/rcldb_p.h
@@ -26,10 +26,10 @@
#ifdef IDX_THREADS
#include "workqueue.h"
-#include "debuglog.h"
#endif // IDX_THREADS
-
+#include "debuglog.h"
#include "xmacros.h"
+#include "ptmutex.h"
namespace Rcl {
@@ -61,6 +61,8 @@ class Db::Native {
bool m_noversionwrite; //Set if open failed because of version mismatch!
#ifdef IDX_THREADS
WorkQueue m_wqueue;
+ int m_loglevel;
+ PTMutexInit m_mutex;
#endif // IDX_THREADS
// Indexing
@@ -76,19 +78,26 @@ class Db::Native {
: m_rcldb(db), m_isopen(false), m_iswritable(false),
m_noversionwrite(false)
#ifdef IDX_THREADS
- , m_wqueue(10)
+ , m_wqueue("DbUpd", 2)
#endif // IDX_THREADS
- { }
+ {
+ LOGDEB2(("Native::Native: me %p\n", this));
+ }
~Native() {
+ LOGDEB2(("Native::~Native: me %p\n", this));
#ifdef IDX_THREADS
if (m_iswritable) {
void *status = m_wqueue.setTerminateAndWait();
- LOGDEB(("Native: worker status %ld\n", long(status)));
+ LOGDEB2(("Native::~Native: worker status %ld\n", long(status)));
}
#endif // IDX_THREADS
}
+ // Final steps of doc update, part which needs to be single-threaded
+ bool addOrUpdateWrite(const string& udi, const string& uniterm,
+ Xapian::Document& doc, size_t txtlen);
+
bool getPagePositions(Xapian::docid docid, vector& vpos);
int getPageNumberForPosition(const vector& pbreaks, unsigned int pos);
diff --git a/src/sampleconf/recoll.conf.in b/src/sampleconf/recoll.conf.in
index a4e53318..a9b04712 100644
--- a/src/sampleconf/recoll.conf.in
+++ b/src/sampleconf/recoll.conf.in
@@ -95,15 +95,16 @@ indexstemminglanguages = english
# appartenance to the list will turn-off both standard accent and case
# processing. Examples:
# Swedish:
-# unac_except_trans = åå Åå ää Ää öö Öö
+# unac_except_trans = ää Ää öö Öö üü Üü ßss œoe Œoe æae Æae fifi flfl åå Åå
# German:
-# unac_except_trans = Ää Öö Üü ää öö üü ßss
-# In French, you probably want to decompose oe and ae
-# unac_except_trans = œoe Œoe æae Æae
-# Actually, this seems a reasonable default for all until someone
-# protests. These decompositions are not performed by unac, but I
-# cant imagine someone typing the composed forms in a search.
-unac_except_trans = ßss œoe Œoe æae ÆAE fifi flfl
+# unac_except_trans = ää Ää öö Öö üü Üü ßss œoe Œoe æae Æae fifi flfl
+# In French, you probably want to decompose oe and ae and nobody would type
+# a German ß
+# unac_except_trans = ßss œoe Œoe æae Æae fifi flfl
+# Reasonable default for all until someone protests. These decompositions
+# are not performed by unac, but I can't imagine someone typing the composed
+# forms in a search.
+unac_except_trans = ßss œoe Œoe æae Æae fifi flfl
# Maximum expansion count for a single term (ie: when using wildcards).
# We used to not limit this at all (except for filenames where the limit
diff --git a/src/utils/workqueue.h b/src/utils/workqueue.h
index 946ae513..102f6cd8 100644
--- a/src/utils/workqueue.h
+++ b/src/utils/workqueue.h
@@ -17,17 +17,33 @@
#ifndef _WORKQUEUE_H_INCLUDED_
#define _WORKQUEUE_H_INCLUDED_
-#include "pthread.h"
+#include
+#include
+
#include
#include
+#include
+#include
+using std::tr1::unordered_map;
+using std::tr1::unordered_set;
using std::queue;
using std::string;
+#include "debuglog.h"
+
+#define WORKQUEUE_TIMING
+
+class WQTData {
+ public:
+ WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
+ struct timespec wstart;
+};
+
/**
* A WorkQueue manages the synchronisation around a queue of work items,
- * where a single client thread queues tasks and a single worker takes
- * and executes them. The goal is to introduce some level of
- * parallelism between the successive steps of a previously single
+ * where a number of client threads queue tasks and a number of worker
+ * threads take and execute them. The goal is to introduce some level
+ * of parallelism between the successive steps of a previously single
* threaded pipe-line (data extraction / data preparation / index
* update).
*
@@ -38,154 +54,286 @@ using std::string;
*/
template class WorkQueue {
public:
- WorkQueue(int hi = 0, int lo = 1)
- : m_high(hi), m_low(lo), m_size(0), m_worker_up(false),
- m_worker_waiting(false), m_jobcnt(0), m_lenacc(0)
+
+ /** Create a WorkQueue
+ * @param name for message printing
+ * @param hi number of tasks on queue before clients block. Default 0
+ * meaning no limit.
+ * @param lo minimum count of tasks before worker starts. Default 1.
+ */
+ WorkQueue(const string& name, int hi = 0, int lo = 1)
+ : m_name(name), m_high(hi), m_low(lo), m_size(0),
+ m_workers_waiting(0), m_workers_exited(0), m_jobcnt(0),
+ m_clientwait(0), m_workerwait(0), m_workerwork(0)
{
- m_ok = (pthread_cond_init(&m_cond, 0) == 0) &&
- (pthread_mutex_init(&m_mutex, 0) == 0);
+ m_ok = (pthread_cond_init(&m_cond, 0) == 0) &&
+ (pthread_mutex_init(&m_mutex, 0) == 0);
}
~WorkQueue()
{
- if (m_worker_up)
- setTerminateAndWait();
+ LOGDEB2(("WorkQueue::~WorkQueue: name %s\n", m_name.c_str()));
+ if (!m_worker_threads.empty())
+ setTerminateAndWait();
}
- /** Start the worker thread. The start_routine will loop
- * taking and executing tasks. */
- bool start(void *(*start_routine)(void *), void *arg)
+ /** Start the worker threads.
+ *
+ * @param nworkers number of worker threads to start.
+ * @param start_routine thread function. It should loop
+ * taking (WorkQueue::take()) and executing tasks.
+ * @param arg initial parameter to thread function.
+ * @return true if ok.
+ */
+ bool start(int nworkers, void *(*start_routine)(void *), void *arg)
{
- bool status = pthread_create(&m_worker_thread, 0,
- start_routine, arg) == 0;
- if (status)
- m_worker_up = true;
- return status;
+ for (int i = 0; i < nworkers; i++) {
+ int err;
+ pthread_t thr;
+ if ((err = pthread_create(&thr, 0, start_routine, arg))) {
+ LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
+ m_name.c_str(), err));
+ return false;
+ }
+ m_worker_threads.insert(pair(thr, WQTData()));
+ }
+ return true;
}
- /**
- * Add item to work queue. Sleep if there are already too many.
- * Called from client.
+ /** Add item to work queue, called from client.
+ *
+ * Sleeps if there are already too many.
*/
bool put(T t)
{
- if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
- return false;
+ if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
+ return false;
- while (ok() && m_high > 0 && m_queue.size() >= m_high) {
- // Keep the order: we test ok() AFTER the sleep...
- if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
- pthread_mutex_unlock(&m_mutex);
- return false;
- }
- }
+#ifdef WORKQUEUE_TIMING
+ struct timespec before;
+ clock_gettime(CLOCK_MONOTONIC, &before);
+#endif
- m_queue.push(t);
- ++m_size;
- pthread_cond_broadcast(&m_cond);
- pthread_mutex_unlock(&m_mutex);
- return true;
+ while (ok() && m_high > 0 && m_queue.size() >= m_high) {
+ // Keep the order: we test ok() AFTER the sleep...
+ if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
+ pthread_mutex_unlock(&m_mutex);
+ return false;
+ }
+ }
+
+#ifdef WORKQUEUE_TIMING
+ struct timespec after;
+ clock_gettime(CLOCK_MONOTONIC, &after);
+ m_clientwait += nanodiff(before, after);
+#endif
+
+ m_queue.push(t);
+ ++m_size;
+ // Just wake one worker, there is only one new task.
+ pthread_cond_signal(&m_cond);
+ pthread_mutex_unlock(&m_mutex);
+ return true;
}
- /** Wait until the queue is empty and the worker is
- * back waiting for task. Called from the client when it needs to
- * perform work that couldn't be done in parallel with the
- * worker's tasks.
+ /** Wait until the queue is inactive. Called from client.
+ *
+ * Waits until the task queue is empty and the workers are all
+ * back sleeping. Used by the client to wait for all current work
+ * to be completed, when it needs to perform work that couldn't be
+ * done in parallel with the worker's tasks, or before shutting
+ * down. Work can be resumed after calling this.
*/
bool waitIdle()
{
- if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
- return false;
+ if (!ok() || pthread_mutex_lock(&m_mutex) != 0) {
+ LOGERR(("WorkQueue::waitIdle: %s not ok or can't lock\n",
+ m_name.c_str()));
+ return false;
+ }
- // We're done when the queue is empty AND the worker is back
- // for a task (has finished the last)
- while (ok() && (m_queue.size() > 0 || !m_worker_waiting)) {
- if (pthread_cond_wait(&m_cond, &m_mutex)) {
- pthread_mutex_unlock(&m_mutex);
- return false;
- }
- }
- pthread_mutex_unlock(&m_mutex);
- return ok();
+ // We're done when the queue is empty AND all workers are back
+ // waiting for a task.
+ while (ok() && (m_queue.size() > 0 ||
+ m_workers_waiting != m_worker_threads.size())) {
+ if (pthread_cond_wait(&m_cond, &m_mutex)) {
+ pthread_mutex_unlock(&m_mutex);
+ m_ok = false;
+ LOGERR(("WorkQueue::waitIdle: cond_wait failed\n"));
+ return false;
+ }
+ }
+
+#ifdef WORKQUEUE_TIMING
+ long long M = 1000000LL;
+ long long wscl = m_worker_threads.size() * M;
+ LOGERR(("WorkQueue:%s: clients wait (all) %lld mS, "
+ "worker wait (avg) %lld mS, worker work (avg) %lld mS\n",
+ m_name.c_str(), m_clientwait / M, m_workerwait / wscl,
+ m_workerwork / wscl));
+#endif // WORKQUEUE_TIMING
+
+ pthread_mutex_unlock(&m_mutex);
+ return ok();
}
- /** Tell the worker to exit, and wait for it. There may still
- be tasks on the queue. */
+
+ /** Tell the workers to exit, and wait for them. Does not bother about
+ * tasks possibly remaining on the queue, so should be called
+ * after waitIdle() for an orderly shutdown.
+ */
void* setTerminateAndWait()
{
- if (!m_worker_up)
- return (void *)0;
+ LOGDEB(("setTerminateAndWait:%s\n", m_name.c_str()));
+ pthread_mutex_lock(&m_mutex);
- pthread_mutex_lock(&m_mutex);
- m_ok = false;
- pthread_cond_broadcast(&m_cond);
- pthread_mutex_unlock(&m_mutex);
- void *status;
- pthread_join(m_worker_thread, &status);
- m_worker_up = false;
- return status;
- }
-
- /** Remove task from queue. Sleep if there are not enough. Signal if we go
- to sleep on empty queue: client may be waiting for our going idle */
- bool take(T* tp)
- {
- if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
- return false;
-
- while (ok() && m_queue.size() < m_low) {
- m_worker_waiting = true;
- if (m_queue.empty())
- pthread_cond_broadcast(&m_cond);
- if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
- pthread_mutex_unlock(&m_mutex);
- m_worker_waiting = false;
- return false;
- }
- m_worker_waiting = false;
+ if (m_worker_threads.empty()) {
+ // Already called ?
+ return (void*)0;
}
- ++m_jobcnt;
- m_lenacc += m_size;
+ // Wait for all worker threads to have called workerExit()
+ m_ok = false;
+ while (m_workers_exited < m_worker_threads.size()) {
+ pthread_cond_broadcast(&m_cond);
+ if (pthread_cond_wait(&m_cond, &m_mutex)) {
+ pthread_mutex_unlock(&m_mutex);
+ LOGERR(("WorkQueue::setTerminate: cond_wait failed\n"));
+ return false;
+ }
+ }
- *tp = m_queue.front();
- m_queue.pop();
- --m_size;
-
- pthread_cond_broadcast(&m_cond);
- pthread_mutex_unlock(&m_mutex);
- return true;
+ // Perform the thread joins and compute overall status
+ // Workers return (void*)1 if ok
+ void *statusall = (void*)1;
+ unordered_map::iterator it;
+ while (!m_worker_threads.empty()) {
+ void *status;
+ it = m_worker_threads.begin();
+ pthread_join(it->first, &status);
+ if (status == (void *)0)
+ statusall = status;
+ m_worker_threads.erase(it);
+ }
+ pthread_mutex_unlock(&m_mutex);
+ LOGDEB(("setTerminateAndWait:%s done\n", m_name.c_str()));
+ return statusall;
}
- /** Take note of the worker exit. This would normally happen after an
- unrecoverable error */
+ /** Take task from queue. Called from worker.
+ *
+ * Sleeps if there are not enough. Signals if we go
+ * to sleep on empty queue: client may be waiting for our going idle.
+ */
+ bool take(T* tp)
+ {
+ if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
+ return false;
+
+#ifdef WORKQUEUE_TIMING
+ struct timespec beforesleep;
+ clock_gettime(CLOCK_MONOTONIC, &beforesleep);
+
+ pthread_t me = pthread_self();
+ unordered_map::iterator it =
+ m_worker_threads.find(me);
+ if (it != m_worker_threads.end() &&
+ it->second.wstart.tv_sec != 0 && it->second.wstart.tv_nsec != 0) {
+ long long nanos = nanodiff(it->second.wstart, beforesleep);
+ m_workerwork += nanos;
+ }
+#endif
+
+ while (ok() && m_queue.size() < m_low) {
+ m_workers_waiting++;
+ if (m_queue.empty())
+ pthread_cond_broadcast(&m_cond);
+ if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
+ // !ok is a normal condition when shutting down
+ if (ok())
+ LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n",
+ m_name.c_str()));
+ pthread_mutex_unlock(&m_mutex);
+ m_workers_waiting--;
+ return false;
+ }
+ m_workers_waiting--;
+ }
+
+#ifdef WORKQUEUE_TIMING
+ struct timespec aftersleep;
+ clock_gettime(CLOCK_MONOTONIC, &aftersleep);
+ m_workerwait += nanodiff(beforesleep, aftersleep);
+ it = m_worker_threads.find(me);
+ if (it != m_worker_threads.end())
+ it->second.wstart = aftersleep;
+#endif
+
+ ++m_jobcnt;
+ *tp = m_queue.front();
+ m_queue.pop();
+ --m_size;
+ // No reason to wake up more than one client thread
+ pthread_cond_signal(&m_cond);
+ pthread_mutex_unlock(&m_mutex);
+ return true;
+ }
+
+ /** Advertise exit and abort queue. Called from worker
+ * This would normally happen after an unrecoverable error, or when
+ * the queue is terminated by the client. Workers never exit normally,
+ * except when the queue is shut down (at which point m_ok is set to false
+ * by the shutdown code anyway). The thread must return/exit immediately
+ * after calling this
+ */
void workerExit()
{
- if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
- return;
- m_ok = false;
- pthread_cond_broadcast(&m_cond);
- pthread_mutex_unlock(&m_mutex);
+ if (pthread_mutex_lock(&m_mutex) != 0)
+ return;
+ m_workers_exited++;
+ m_ok = false;
+ pthread_cond_broadcast(&m_cond);
+ pthread_mutex_unlock(&m_mutex);
}
- /** Debug only: as the size is returned while the queue is unlocked, there
- * is no warranty on its consistency. Not that we use the member size, not
- * the container size() call which would need locking.
+ /** Return current queue size. Debug only.
+ *
+ * As the size is returned while the queue is unlocked, there
+ * is no guarantee of its consistency. Note that we use the member
+ * size, not the container size() call, which would need locking.
*/
- size_t size() {return m_size;}
+ size_t size()
+ {
+ return m_size;
+ }
private:
- bool ok() {return m_ok && m_worker_up;}
+ bool ok()
+ {
+ return m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
+ }
+ long long nanodiff(const struct timespec& older,
+ const struct timespec& newer)
+ {
+ return (newer.tv_sec - older.tv_sec) * 1000000000LL
+ + newer.tv_nsec - older.tv_nsec;
+ }
+
+ string m_name;
size_t m_high;
size_t m_low;
size_t m_size;
- bool m_worker_up;
- bool m_worker_waiting;
+ /* Worker threads currently waiting for a job */
+ unsigned int m_workers_waiting;
+ unsigned int m_workers_exited;
+ /* Stats */
int m_jobcnt;
- int m_lenacc;
+ long long m_clientwait;
+ long long m_workerwait;
+ long long m_workerwork;
- pthread_t m_worker_thread;
+ unordered_map m_worker_threads;
queue m_queue;
pthread_cond_t m_cond;
pthread_mutex_t m_mutex;
diff --git a/website/BUGS.html b/website/BUGS.html
index bf928b73..00b9cfd8 100644
--- a/website/BUGS.html
+++ b/website/BUGS.html
@@ -36,14 +36,8 @@
topmost section may also exist in older versions.
-
+
- - It seems that a click in the snippets window can crash
- recoll the very first time it is used. I could never reproduce
- this on later runs and it is not known what causes the
- problem. Just restart the application, and things should stay
- up the next times.
-
- On systems such as Debian Stable which use Evince version
2.x (not 3.x) as PDF viewer, the default "Open" command for
PDF files will not work. You need to edit the command:
@@ -77,10 +71,20 @@
- - The missing filter recording code is broken.
-
- - Opening embedded documents from the Unity Lens does not
- work.
+ - Thumbnails are not found on newer desktops (e.g. Ubuntu
+ Quantal) because of a change in the freedesktop.org
+ "standard".
+ - A bug in extracting search term from click data in the
+ snippet window results in passing an incorrect term to the
+ viewer. Only affects non-ascii terms.
+ - Using the snippets window can sometimes crash the
+ GUI.
+ - Tilde expansion is not properly performed for the
+ "beaglequeuedir" parameter. This only affects people who
+ develop scripts over the queue feature.
+ - The missing filter recording code is broken.
+ - Opening embedded documents from the Unity Lens does not
+ work.
diff --git a/website/copydocs b/website/copydocs
index 0acb8a89..a3f28abf 100644
--- a/website/copydocs
+++ b/website/copydocs
@@ -1,6 +1,6 @@
#!/bin/sh
set -x
-docdir=/home/dockes/projets/fulltext/recoll/src/doc/user/
+docdir=/home/dockes/projets/fulltext/17-MAINT/src/doc/user/
#docdir=/Users/dockes/projets/fulltext/recoll/src/doc/user/
#(cd $docdir;make) || exit 1
diff --git a/website/devel.html b/website/devel.html
index 617e2f71..d720b3eb 100644
--- a/website/devel.html
+++ b/website/devel.html
@@ -40,20 +40,13 @@
(and understand english, which can probably be assumed, you
being reading this), you can take a little time to translate
the GUI messages file.
- The newest versions of the message files follow. There
- is an empty one (the xx thing), the others are partially
+
The newest versions of the message files can be found
+ in this directory. There
+ is an empty one (the xx one), the others are partially
translated, just needing an update for the new messages.
Updating the files can easily be done with
the Qt Linguist. Contact me
for more directions if needed.
-
@@ -67,7 +60,9 @@
tracking system, these are the general areas where help or
ideas are particularly welcome:
- - A better GUI design (both the ergonomy and the appearance).
+ - A better GUI design (both the ergonomy and the
+ appearance). Adding missing shortcuts or fixing the menu
+ accelerators for example is easy and useful.
- More support for the more advanced Xapian concepts like relevance
@@ -95,90 +90,10 @@
Reporting crashes is very useful. It can help others, and it
can get your own problem to be solved.
- All reports are useful. But, in order to maximize usefulness,
- a crash report should include a so-called stack trace, something
- that indicates what the program was doing when it
- crashed. Getting a useful stack trace is not very difficult,
- but it may need a little work on your part (which
- will then enable me do my part of the work).
-
- If your distribution includes a separate package for Recoll
- debugging symbols, it probably also has a page on its web site
- explaining how to use them to get a stack trace. You should
- follow these instructions. If there is no debugging package,
- you should follow the instructions below. A little
- familiarity with the command line will be necessary.
-
- - Compiling and installing a debugging version
- -
-
- - Obtain the recoll source for the version you are using
- (www.recoll.org),
- and extract the source tree.
-
- - Follow the instructions for
-
- building Recoll from source with the following
- modifications:
-
- - Before running configure, edit
- the mk/localdefs.in file and remove the -O2
- option(s).
- - When running configure, specify the
- standard installation location for your system as a prefix
- (to avoid ending up with two installed versions, which
- would almost certainly end in confusion). On Linux this
- would typically be:
-
configure --prefix=/usr.
- - When installing, arrange for the installed
- executables not to be stripped of debugging symbols by
- specifying a value for the STRIP environment variable
- (ie: echo or ls):
sudo make
- install STRIP=ls
-
-
-
-
- - Getting a core dump
- - You will need to run the operation that caused the crash
- inside a writable directory, and tell the system that you
- accept core dumps. The commands need to be run in a shell
- inside a terminal window. Ie:
-
-cd
-ulimit -c unlimited
-recoll #(or recollindex or whatever you want to run).
-
-
- Hopefully, you will succeed in getting the command to crash,
- and you will get a core file.
-
-
- - Using gdb to get a stack trace
- -
-
- - Install gdb if it is not already on the system.
-
- - Run gdb on the command that crashed and the
- core file (depending on the system, the core file may be
- named "core" or something else, like recollindex.core, or
- core.pid), ie:
-
-
gdb /usr/bin/recollindex core
-
- - Inside gdb, you need to use different
- commands to get a stack trace for recoll
- and recollindex. For recollindex you
- can use the bt command. For recoll
- use:
thread apply all bt full
-
- - Copy/paste the output to your report email :), and
- quit gdb ("q").
-
-
-
+ You will find help and information about producing a useful
+ problem report on this
+
+ Recoll wiki page.