Replaced pthread with std:: thread and mutex

This commit is contained in:
Jean-Francois Dockes 2016-07-12 18:08:21 +02:00
parent dfcd57f665
commit e0473b5606
33 changed files with 367 additions and 633 deletions

View file

@ -222,7 +222,6 @@ utils/netcon.cpp \
utils/netcon.h \
utils/pathut.cpp \
utils/pathut.h \
utils/ptmutex.h \
utils/pxattr.cpp \
utils/pxattr.h \
utils/rclionice.cpp \

View file

@ -21,6 +21,8 @@
#ifdef RCL_USE_ASPELL
#include <mutex>
#include <unistd.h>
#include <dlfcn.h>
#include <stdlib.h>
@ -32,7 +34,6 @@
#include "rclaspell.h"
#include "log.h"
#include "unacpp.h"
#include "ptmutex.h"
using namespace std;
@ -63,7 +64,7 @@ public:
};
static AspellApi aapi;
static PTMutexInit o_aapi_mutex;
static std::mutex o_aapi_mutex;
#define NMTOPTR(NM, TP) \
if ((aapi.NM = TP dlsym(m_data->m_handle, #NM)) == 0) { \
@ -114,7 +115,7 @@ Aspell::~Aspell()
bool Aspell::init(string &reason)
{
PTMutexLocker locker(o_aapi_mutex);
std::unique_lock<std::mutex> locker(o_aapi_mutex);
deleteZ(m_data);
// Language: we get this from the configuration, else from the NLS

View file

@ -32,9 +32,6 @@ overriden in the c++ code by ifdefs _WIN32 anyway */
/* Define to 1 if you have the `dl' library (-ldl). */
#define HAVE_LIBDL 1
/* Define to 1 if you have the `pthread' library (-lpthread). */
#define HAVE_LIBPTHREAD 1
/* Define to 1 if you have the `z' library (-lz). */
#define HAVE_LIBZ 1

View file

@ -1,30 +1,10 @@
#ifdef HAVE_CXX0X_UNORDERED
# define UNORDERED_MAP_INCLUDE <unordered_map>
# define UNORDERED_SET_INCLUDE <unordered_set>
# define STD_UNORDERED_MAP std::unordered_map
# define STD_UNORDERED_SET std::unordered_set
#elif defined(HAVE_TR1_UNORDERED)
# define UNORDERED_MAP_INCLUDE <tr1/unordered_map>
# define UNORDERED_SET_INCLUDE <tr1/unordered_set>
# define STD_UNORDERED_MAP std::tr1::unordered_map
# define STD_UNORDERED_SET std::tr1::unordered_set
#else
# define UNORDERED_MAP_INCLUDE <map>
# define UNORDERED_SET_INCLUDE <set>
# define STD_UNORDERED_MAP std::map
# define STD_UNORDERED_SET std::set
#endif
#define UNORDERED_MAP_INCLUDE <unordered_map>
#define UNORDERED_SET_INCLUDE <unordered_set>
#define STD_UNORDERED_MAP std::unordered_map
#define STD_UNORDERED_SET std::unordered_set
#define MEMORY_INCLUDE <memory>
#define STD_SHARED_PTR std::shared_ptr
#ifdef HAVE_SHARED_PTR_STD
# define MEMORY_INCLUDE <memory>
# define STD_SHARED_PTR std::shared_ptr
#elif defined(HAVE_SHARED_PTR_TR1)
# define MEMORY_INCLUDE <tr1/memory>
# define STD_SHARED_PTR std::tr1::shared_ptr
#else
# define MEMORY_INCLUDE "refcntr.h"
# define STD_SHARED_PTR RefCntr
#endif
#ifdef _WIN32
#include "safewindows.h"

View file

@ -22,12 +22,13 @@
#endif
#include <signal.h>
#include <locale.h>
#include <pthread.h>
#include <cstdlib>
#if !defined(PUTENV_ARG_CONST)
#include <string.h>
#endif
#include <thread>
#include "log.h"
#include "rclconfig.h"
#include "rclinit.h"
@ -37,7 +38,7 @@
#include "smallut.h"
#include "execmd.h"
static pthread_t mainthread_id;
std::thread::id mainthread_id;
// Signal etc. processing. We want to be able to properly close the
// index if we are currently writing to it.
@ -309,10 +310,8 @@ RclConfig *recollinit(RclInitFlags flags,
// threads don't try to do it at once).
config->getDefCharset();
mainthread_id = pthread_self();
mainthread_id = std::this_thread::get_id();
// Init unac locking
unac_init_mt();
// Init smallut and pathut static values
pathut_init_mt();
smallut_init_mt();
@ -388,6 +387,6 @@ void recoll_threadinit()
bool recoll_ismainthread()
{
return pthread_equal(pthread_self(), mainthread_id);
return std::this_thread::get_id() == mainthread_id;
}

View file

@ -16,16 +16,17 @@
*/
#include "autoconfig.h"
#include <mutex>
#include "rcldoc.h"
#include "fetcher.h"
#include "bglfetcher.h"
#include "log.h"
#include "ptmutex.h"
#include "beaglequeuecache.h"
// We use a single beagle cache object to access beagle data. We protect it
// against multiple thread access.
static PTMutexInit o_beagler_mutex;
static std::mutex o_beagler_mutex;
bool BGLDocFetcher::fetch(RclConfig* cnf, const Rcl::Doc& idoc, RawDoc& out)
{
@ -36,7 +37,7 @@ bool BGLDocFetcher::fetch(RclConfig* cnf, const Rcl::Doc& idoc, RawDoc& out)
}
Rcl::Doc dotdoc;
{
PTMutexLocker locker(o_beagler_mutex);
std::unique_lock<std::mutex> locker(o_beagler_mutex);
// Retrieve from our webcache (beagle data). The beagler
// object is created at the first call of this routine and
// deleted when the program exits.

View file

@ -85,13 +85,13 @@ extern void *FsIndexerInternfileWorker(void*);
// main thread either before or after the exciting part
class FSIFIMissingStore : public FIMissingStore {
#ifdef IDX_THREADS
PTMutexInit m_mutex;
std::mutex m_mutex;
#endif
public:
virtual void addMissing(const string& prog, const string& mt)
{
#ifdef IDX_THREADS
PTMutexLocker locker(m_mutex);
std::unique_lock<std::mutex> locker(m_mutex);
#endif
FIMissingStore::addMissing(prog, mt);
}
@ -178,7 +178,7 @@ bool FsIndexer::index(int flags)
if (m_updater) {
#ifdef IDX_THREADS
PTMutexLocker locker(m_updater->m_mutex);
std::unique_lock<std::mutex> locker(m_updater->m_mutex);
#endif
m_updater->status.dbtotdocs = m_db->docCnt();
}
@ -554,7 +554,7 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
{
if (m_updater) {
#ifdef IDX_THREADS
PTMutexLocker locker(m_updater->m_mutex);
std::unique_lock<std::mutex> locker(m_updater->m_mutex);
#endif
if (!m_updater->update()) {
return FsTreeWalker::FtwStop;
@ -668,7 +668,7 @@ FsIndexer::processonefile(RclConfig *config,
LOGDEB0("processone: up to date: " << (fn) << "\n" );
if (m_updater) {
#ifdef IDX_THREADS
PTMutexLocker locker(m_updater->m_mutex);
std::unique_lock<std::mutex> locker(m_updater->m_mutex);
#endif
// Status bar update, abort request etc.
m_updater->status.fn = fn;
@ -799,7 +799,7 @@ FsIndexer::processonefile(RclConfig *config,
// Tell what we are doing and check for interrupt request
if (m_updater) {
#ifdef IDX_THREADS
PTMutexLocker locker(m_updater->m_mutex);
std::unique_lock<std::mutex> locker(m_updater->m_mutex);
#endif
++(m_updater->status.docsdone);
if (m_updater->status.dbtotdocs < m_updater->status.docsdone)

View file

@ -18,11 +18,11 @@
#define _fsindexer_h_included_
#include <list>
#include <mutex>
#include "indexer.h"
#include "fstreewalk.h"
#ifdef IDX_THREADS
#include "ptmutex.h"
#include "workqueue.h"
#endif // IDX_THREADS
@ -93,7 +93,7 @@ class FsIndexer : public FsTreeWalkerCB {
if (!dorecord)
return;
#ifdef IDX_THREADS
PTMutexLocker locker(mutex);
std::unique_lock<std::mutex> locker(mutex);
#endif
udis.push_back(udi);
}
@ -103,7 +103,7 @@ class FsIndexer : public FsTreeWalkerCB {
}
private:
#ifdef IDX_THREADS
PTMutexInit mutex;
std::mutex mutex;
#endif
bool dorecord;
std::vector<std::string> udis;

View file

@ -22,6 +22,7 @@
#include <list>
#include <map>
#include <vector>
#include <mutex>
using std::string;
using std::list;
@ -30,9 +31,6 @@ using std::vector;
#include "rcldb.h"
#include "rcldoc.h"
#ifdef IDX_THREADS
#include "ptmutex.h"
#endif
class FsIndexer;
class BeagleQueueIndexer;
@ -66,7 +64,7 @@ class DbIxStatus {
class DbIxStatusUpdater {
public:
#ifdef IDX_THREADS
PTMutexInit m_mutex;
std::mutex m_mutex;
#endif
DbIxStatus status;
virtual ~DbIxStatusUpdater(){}
@ -75,7 +73,7 @@ class DbIxStatusUpdater {
virtual bool update(DbIxStatus::Phase phase, const string& fn)
{
#ifdef IDX_THREADS
PTMutexLocker lock(m_mutex);
std::unique_lock<std::mutex> lock(m_mutex);
#endif
status.phase = phase;
status.fn = fn;

View file

@ -32,6 +32,7 @@
#include <time.h>
#include <string>
#include <map>
#include <mutex>
#include "rclconfig.h"
@ -78,16 +79,12 @@ class RclMonEventQueue {
public:
RclMonEventQueue();
~RclMonEventQueue();
/** Unlock queue and wait until there are new events.
* Returns with the queue locked */
bool wait(int secs = -1, bool *timedout = 0);
/** Unlock queue */
bool unlock();
/** Lock queue. */
bool lock();
/** Lock queue and add event. */
/** Wait for event or timeout. Returns with the queue locked */
std::unique_lock<std::mutex> wait(int secs = -1, bool *timedout = 0);
/** Add event. */
bool pushEvent(const RclMonEvent &ev);
void setTerminate(); /* To all threads: end processing */
/** To all threads: end processing */
void setTerminate();
bool ok();
bool empty();
RclMonEvent pop();

View file

@ -24,7 +24,6 @@
* initialization function.
*/
#include <pthread.h>
#include <errno.h>
#include <fnmatch.h>
#include "safeunistd.h"
@ -34,6 +33,11 @@
#include <cstdlib>
#include <list>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
using std::list;
using std::vector;
@ -51,8 +55,6 @@ using std::vector;
typedef unsigned long mttcast;
static pthread_t rcv_thrid;
// Seconds between auxiliary db (stem, spell) updates:
static const int dfltauxinterval = 60 *60;
static int auxinterval = dfltauxinterval;
@ -135,13 +137,13 @@ public:
vector<DelayPat> m_delaypats;
RclConfig *m_config;
bool m_ok;
pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
std::mutex m_mutex;
std::condition_variable m_cond;
RclEQData()
: m_config(0), m_ok(false)
: m_config(0), m_ok(true)
{
if (!pthread_mutex_init(&m_mutex, 0) && !pthread_cond_init(&m_cond, 0))
m_ok = true;
}
void readDelayPats(int dfltsecs);
DelayPat searchDelayPats(const string& path)
@ -192,8 +194,8 @@ void RclEQData::readDelayPats(int dfltsecs)
// when necessary.
void RclEQData::delayInsert(const queue_type::iterator &qit)
{
MONDEB(("RclEQData::delayInsert: minclock %lu\n",
(mttcast)qit->second.m_minclock));
MONDEB("RclEQData::delayInsert: minclock " << qit->second.m_minclock <<
std::endl);
for (delays_type::iterator dit = m_delays.begin();
dit != m_delays.end(); dit++) {
queue_type::iterator qit1 = *dit;
@ -222,62 +224,33 @@ void RclMonEventQueue::setopts(int opts)
}
/** Wait until there is something to process on the queue, or timeout.
* Must be called with the queue locked
* returns a queue lock
*/
bool RclMonEventQueue::wait(int seconds, bool *top)
std::unique_lock<std::mutex> RclMonEventQueue::wait(int seconds, bool *top)
{
MONDEB(("RclMonEventQueue::wait\n"));
std::unique_lock<std::mutex> lock(m_data->m_mutex);
MONDEB("RclMonEventQueue::wait, seconds: " << seconds << std::endl);
if (!empty()) {
MONDEB(("RclMonEventQueue:: imm return\n"));
return true;
MONDEB("RclMonEventQueue:: immediate return\n");
return lock;
}
int err;
if (seconds > 0) {
struct timespec to;
to.tv_sec = time(0L) + seconds;
to.tv_nsec = 0;
if (top)
*top = false;
if ((err =
pthread_cond_timedwait(&m_data->m_cond, &m_data->m_mutex, &to))) {
if (err == ETIMEDOUT) {
*top = true;
MONDEB(("RclMonEventQueue:: timeout\n"));
return true;
}
LOGERR("RclMonEventQueue::wait:pthread_cond_timedwait failedwith err " << (err) << "\n" );
return false;
}
if (m_data->m_cond.wait_for(lock, std::chrono::seconds(seconds)) ==
std::cv_status::timeout) {
*top = true;
MONDEB("RclMonEventQueue:: timeout\n");
return lock;
}
} else {
if ((err = pthread_cond_wait(&m_data->m_cond, &m_data->m_mutex))) {
LOGERR("RclMonEventQueue::wait: pthread_cond_wait failedwith err " << (err) << "\n" );
return false;
}
m_data->m_cond.wait(lock);
}
MONDEB(("RclMonEventQueue:: normal return\n"));
return true;
}
bool RclMonEventQueue::lock()
{
MONDEB(("RclMonEventQueue:: lock\n"));
if (pthread_mutex_lock(&m_data->m_mutex)) {
LOGERR("RclMonEventQueue::lock: pthread_mutex_lock failed\n" );
return false;
}
MONDEB(("RclMonEventQueue:: lock return\n"));
return true;
}
bool RclMonEventQueue::unlock()
{
MONDEB(("RclMonEventQueue:: unlock\n"));
if (pthread_mutex_unlock(&m_data->m_mutex)) {
LOGERR("RclMonEventQueue::lock: pthread_mutex_unlock failed\n" );
return false;
}
return true;
MONDEB("RclMonEventQueue:: non-timeout return\n");
return lock;
}
void RclMonEventQueue::setConfig(RclConfig *cnf)
@ -312,37 +285,36 @@ bool RclMonEventQueue::ok()
void RclMonEventQueue::setTerminate()
{
MONDEB(("RclMonEventQueue:: setTerminate\n"));
lock();
MONDEB("RclMonEventQueue:: setTerminate\n");
std::unique_lock<std::mutex> lock(m_data->m_mutex);
m_data->m_ok = false;
pthread_cond_broadcast(&m_data->m_cond);
unlock();
m_data->m_cond.notify_all();
}
// Must be called with the queue locked
bool RclMonEventQueue::empty()
{
if (m_data == 0) {
MONDEB(("RclMonEventQueue::empty(): true (m_data==0)\n"));
MONDEB("RclMonEventQueue::empty(): true (m_data==0)\n");
return true;
}
if (!m_data->m_iqueue.empty()) {
MONDEB(("RclMonEventQueue::empty(): false (m_iqueue not empty)\n"));
MONDEB("RclMonEventQueue::empty(): false (m_iqueue not empty)\n");
return true;
}
if (m_data->m_dqueue.empty()) {
MONDEB(("RclMonEventQueue::empty(): true (m_Xqueue both empty)\n"));
MONDEB("RclMonEventQueue::empty(): true (m_Xqueue both empty)\n");
return true;
}
// Only dqueue has events. Have to check the delays (only the
// first, earliest one):
queue_type::iterator qit = *(m_data->m_delays.begin());
if (qit->second.m_minclock > time(0)) {
MONDEB(("RclMonEventQueue::empty(): true (no delay ready %lu)\n",
(mttcast)qit->second.m_minclock));
MONDEB("RclMonEventQueue::empty(): true (no delay ready " <<
qit->second.m_minclock << ")\n");
return true;
}
MONDEB(("RclMonEventQueue::empty(): returning false (delay expired)\n"));
MONDEB("RclMonEventQueue::empty(): returning false (delay expired)\n");
return false;
}
@ -352,15 +324,15 @@ bool RclMonEventQueue::empty()
RclMonEvent RclMonEventQueue::pop()
{
time_t now = time(0);
MONDEB(("RclMonEventQueue::pop(), now %lu\n", (mttcast)now));
MONDEB("RclMonEventQueue::pop(), now " << now << std::endl);
// Look at the delayed events, get rid of the expired/unactive
// ones, possibly return an expired/needidx one.
while (!m_data->m_delays.empty()) {
delays_type::iterator dit = m_data->m_delays.begin();
queue_type::iterator qit = *dit;
MONDEB(("RclMonEventQueue::pop(): in delays: evt minclock %lu\n",
(mttcast)qit->second.m_minclock));
MONDEB("RclMonEventQueue::pop(): in delays: evt minclock " <<
qit->second.m_minclock << std::endl);
if (qit->second.m_minclock <= now) {
if (qit->second.m_needidx) {
RclMonEvent ev = qit->second;
@ -399,8 +371,8 @@ RclMonEvent RclMonEventQueue::pop()
// special processing to limit their reindexing rate.
bool RclMonEventQueue::pushEvent(const RclMonEvent &ev)
{
MONDEB(("RclMonEventQueue::pushEvent for %s\n", ev.m_path.c_str()));
lock();
MONDEB("RclMonEventQueue::pushEvent for " << ev.m_path << std::endl);
std::unique_lock<std::mutex> lock(m_data->m_mutex);
DelayPat pat = m_data->searchDelayPats(ev.m_path);
if (pat.seconds != 0) {
@ -432,8 +404,7 @@ bool RclMonEventQueue::pushEvent(const RclMonEvent &ev)
m_data->m_iqueue[ev.m_path] = ev;
}
pthread_cond_broadcast(&m_data->m_cond);
unlock();
m_data->m_cond.notify_all();
return true;
}
@ -482,19 +453,12 @@ bool startMonitor(RclConfig *conf, int opts)
if (!conf->getConfParam("monixinterval", &ixinterval))
ixinterval = dfltixinterval;
rclEQ.setConfig(conf);
rclEQ.setopts(opts);
if (pthread_create(&rcv_thrid, 0, &rclMonRcvRun, &rclEQ) != 0) {
LOGERR("startMonitor: cant create event-receiving thread\n" );
return false;
}
if (!rclEQ.lock()) {
LOGERR("startMonitor: cant lock queue ???\n" );
return false;
}
std::thread treceive(rclMonRcvRun, &rclEQ);
treceive.detach();
LOGDEB("start_monitoring: entering main loop\n" );
bool timedout;
@ -504,61 +468,62 @@ bool startMonitor(RclConfig *conf, int opts)
list<string> modified;
list<string> deleted;
;
// Set a relatively short timeout for better monitoring of exit requests
while (rclEQ.wait(2, &timedout)) {
// Queue is locked.
while (true) {
{
std::unique_lock<std::mutex> lock = rclEQ.wait(2, &timedout);
// x11IsAlive() can't be called from ok() because both threads call it
// and Xlib is not multithreaded.
// x11IsAlive() can't be called from ok() because both
// threads call it and Xlib is not multithreaded.
#ifndef _WIN32
bool x11dead = !(opts & RCLMON_NOX11) && !x11IsAlive();
if (x11dead)
LOGDEB("RclMonprc: x11 is dead\n" );
bool x11dead = !(opts & RCLMON_NOX11) && !x11IsAlive();
if (x11dead)
LOGDEB("RclMonprc: x11 is dead\n" );
#else
bool x11dead = false;
bool x11dead = false;
#endif
if (!rclEQ.ok() || x11dead) {
rclEQ.unlock();
break;
}
if (!rclEQ.ok() || x11dead) {
break;
}
// Process event queue
for (;;) {
// Retrieve event
RclMonEvent ev = rclEQ.pop();
if (ev.m_path.empty())
break;
switch (ev.evtype()) {
case RclMonEvent::RCLEVT_MODIFY:
case RclMonEvent::RCLEVT_DIRCREATE:
LOGDEB0("Monitor: Modify/Check on " << (ev.m_path) << "\n" );
modified.push_back(ev.m_path);
break;
case RclMonEvent::RCLEVT_DELETE:
LOGDEB0("Monitor: Delete on " << (ev.m_path) << "\n" );
// If this is for a directory (which the caller should
// tell us because he knows), we should purge the db
// of all the subtree, because on a directory rename,
// inotify will only generate one event for the
// renamed top, not the subentries. This is relatively
// complicated to do though, and we currently do not
// do it, and just wait for a restart to do a full run and
// purge.
deleted.push_back(ev.m_path);
if (ev.evflags() & RclMonEvent::RCLEVT_ISDIR) {
vector<string> paths;
if (subtreelist(conf, ev.m_path, paths)) {
deleted.insert(deleted.end(),
paths.begin(), paths.end());
}
}
break;
default:
LOGDEB("Monitor: got Other on [" << (ev.m_path) << "]\n" );
}
}
// Unlock queue before processing lists
rclEQ.unlock();
// Process event queue
for (;;) {
// Retrieve event
RclMonEvent ev = rclEQ.pop();
if (ev.m_path.empty())
break;
switch (ev.evtype()) {
case RclMonEvent::RCLEVT_MODIFY:
case RclMonEvent::RCLEVT_DIRCREATE:
LOGDEB0("Monitor: Modify/Check on " << ev.m_path << "\n");
modified.push_back(ev.m_path);
break;
case RclMonEvent::RCLEVT_DELETE:
LOGDEB0("Monitor: Delete on " << (ev.m_path) << "\n" );
// If this is for a directory (which the caller should
// tell us because he knows), we should purge the db
// of all the subtree, because on a directory rename,
// inotify will only generate one event for the
// renamed top, not the subentries. This is relatively
// complicated to do though, and we currently do not
// do it, and just wait for a restart to do a full run and
// purge.
deleted.push_back(ev.m_path);
if (ev.evflags() & RclMonEvent::RCLEVT_ISDIR) {
vector<string> paths;
if (subtreelist(conf, ev.m_path, paths)) {
deleted.insert(deleted.end(),
paths.begin(), paths.end());
}
}
break;
default:
LOGDEB("Monitor: got Other on [" << (ev.m_path) << "]\n" );
}
}
}
// Process. We don't do this every time but let the lists accumulate
// a little, this saves processing. Start at once if list is big.
@ -608,8 +573,6 @@ bool startMonitor(RclConfig *conf, int opts)
o_reexec->removeArg("-n");
o_reexec->reexec();
}
// Lock queue before waiting again
rclEQ.lock();
}
LOGDEB("Rclmonprc: calling queue setTerminate\n" );
rclEQ.setTerminate();
@ -619,7 +582,6 @@ bool startMonitor(RclConfig *conf, int opts)
// during our limited time window for exiting. To be reviewed if
// we ever need several monitor invocations in the same process
// (can't foresee any reason why we'd want to do this).
// pthread_join(rcv_thrid, 0);
LOGDEB("Monitor: returning\n" );
return true;
}

View file

@ -35,6 +35,7 @@
#include <cstring>
#include <map>
#include <mutex>
#include "cstr.h"
#include "mimehandler.h"
@ -45,7 +46,6 @@
#include "rclconfig.h"
#include "md5ut.h"
#include "conftree.h"
#include "ptmutex.h"
using namespace std;
@ -65,7 +65,7 @@ public:
private: FILE **m_fpp;
};
static PTMutexInit o_mcache_mutex;
static std::mutex o_mcache_mutex;
/**
* Handles a cache for message numbers to offset translations. Permits direct
@ -105,7 +105,7 @@ public:
LOGDEB0("MboxCache::get_offsets: init failed\n" );
return -1;
}
PTMutexLocker locker(o_mcache_mutex);
std::unique_lock<std::mutex> locker(o_mcache_mutex);
string fn = makefilename(udi);
FILE *fp = 0;
if ((fp = fopen(fn.c_str(), "r")) == 0) {
@ -149,7 +149,7 @@ public:
return;
if (fsize < m_minfsize)
return;
PTMutexLocker locker(o_mcache_mutex);
std::unique_lock<std::mutex> locker(o_mcache_mutex);
string fn = makefilename(udi);
FILE *fp;
if ((fp = fopen(fn.c_str(), "w")) == 0) {
@ -179,7 +179,7 @@ public:
// Check state, possibly initialize
bool ok(RclConfig *config) {
PTMutexLocker locker(o_mcache_mutex);
std::unique_lock<std::mutex> locker(o_mcache_mutex);
if (m_minfsize == -1)
return false;
if (!m_ok) {
@ -393,11 +393,11 @@ basic_regex<char> minifromregex;
#endif
static bool regcompiled;
static PTMutexInit o_regex_mutex;
static std::mutex o_regex_mutex;
static void compileregexes()
{
PTMutexLocker locker(o_regex_mutex);
std::unique_lock<std::mutex> locker(o_regex_mutex);
// As the initial test of regcompiled is unprotected the value may
// have changed while we were waiting for the lock. Test again now
// that we are alone.

View file

@ -23,6 +23,7 @@
#include <string>
#include <vector>
#include <list>
#include <mutex>
using namespace std;
#include "cstr.h"
@ -40,7 +41,6 @@ using namespace std;
#include "mh_symlink.h"
#include "mh_unknown.h"
#include "mh_null.h"
#include "ptmutex.h"
// Performance help: we use a pool of already known and created
// handlers. There can be several instances for a given mime type
@ -50,14 +50,14 @@ static multimap<string, RecollFilter*> o_handlers;
static list<multimap<string, RecollFilter*>::iterator> o_hlru;
typedef list<multimap<string, RecollFilter*>::iterator>::iterator hlruit_tp;
static PTMutexInit o_handlers_mutex;
static std::mutex o_handlers_mutex;
static const unsigned int max_handlers_cache_size = 100;
/* Look for mime handler in pool */
static RecollFilter *getMimeHandlerFromCache(const string& key)
{
PTMutexLocker locker(o_handlers_mutex);
std::unique_lock<std::mutex> locker(o_handlers_mutex);
string xdigest;
MD5HexPrint(key, xdigest);
LOGDEB("getMimeHandlerFromCache: " << (xdigest) << " cache size " << (o_handlers.size()) << "\n" );
@ -90,7 +90,7 @@ void returnMimeHandler(RecollFilter *handler)
}
handler->clear();
PTMutexLocker locker(o_handlers_mutex);
std::unique_lock<std::mutex> locker(o_handlers_mutex);
LOGDEB("returnMimeHandler: returning filter for " << (handler->get_mime_type()) << " cache size " << (o_handlers.size()) << "\n" );
@ -124,7 +124,7 @@ void clearMimeHandlerCache()
{
LOGDEB("clearMimeHandlerCache()\n" );
multimap<string, RecollFilter *>::iterator it;
PTMutexLocker locker(o_handlers_mutex);
std::unique_lock<std::mutex> locker(o_handlers_mutex);
for (it = o_handlers.begin(); it != o_handlers.end(); it++) {
delete it->second;
}

View file

@ -39,7 +39,7 @@ bool Uncomp::uncompressfile(const string& ifn,
const vector<string>& cmdv, string& tfile)
{
if (m_docache) {
PTMutexLocker lock(o_cache.m_lock);
std::unique_lock<std::mutex> lock(o_cache.m_lock);
if (!o_cache.m_srcpath.compare(ifn)) {
m_dir = o_cache.m_dir;
m_tfile = tfile = o_cache.m_tfile;
@ -123,7 +123,7 @@ bool Uncomp::uncompressfile(const string& ifn,
Uncomp::~Uncomp()
{
if (m_docache) {
PTMutexLocker lock(o_cache.m_lock);
std::unique_lock<std::mutex> lock(o_cache.m_lock);
delete o_cache.m_dir;
o_cache.m_dir = m_dir;
o_cache.m_tfile = m_tfile;

View file

@ -19,10 +19,10 @@
#include <vector>
#include <string>
#include <mutex>
#include "pathut.h"
#include "rclutil.h"
#include "ptmutex.h"
/// Uncompression script interface.
class Uncomp {
@ -58,7 +58,7 @@ private:
{
delete m_dir;
}
PTMutexInit m_lock;
std::mutex m_lock;
TempDir *m_dir;
std::string m_tfile;
std::string m_srcpath;

View file

@ -50,14 +50,14 @@
extern RclConfig *theconfig;
PTMutexInit thetempfileslock;
std::mutex thetempfileslock;
static vector<TempFile> o_tempfiles;
/* Keep an array of temporary files for deletion at exit. It happens that we
erase some of them before exiting (ie: when closing a preview tab), we don't
reuse the array holes for now */
void rememberTempFile(TempFile temp)
{
PTMutexLocker locker(thetempfileslock);
std::unique_lock<std::mutex> locker(thetempfileslock);
o_tempfiles.push_back(temp);
}
@ -65,7 +65,7 @@ void forgetTempFile(string &fn)
{
if (fn.empty())
return;
PTMutexLocker locker(thetempfileslock);
std::unique_lock<std::mutex> locker(thetempfileslock);
for (vector<TempFile>::iterator it = o_tempfiles.begin();
it != o_tempfiles.end(); it++) {
if ((*it) && !fn.compare((*it)->filename())) {
@ -77,7 +77,7 @@ void forgetTempFile(string &fn)
void deleteAllTempFiles()
{
PTMutexLocker locker(thetempfileslock);
std::unique_lock<std::mutex> locker(thetempfileslock);
o_tempfiles.clear();
}

View file

@ -22,7 +22,6 @@
#include "rclconfig.h"
#include "rcldb.h"
#include "rclutil.h"
#include "ptmutex.h"
#include <QString>

View file

@ -22,7 +22,7 @@
#include "log.h"
#include "internfile.h"
PTMutexInit DocSequence::o_dblock;
std::mutex DocSequence::o_dblock;
string DocSequence::o_sort_trans;
string DocSequence::o_filt_trans;
@ -46,7 +46,7 @@ bool DocSequence::getEnclosing(Rcl::Doc& doc, Rcl::Doc& pdoc)
LOGERR("DocSequence::getEnclosing: no db\n" );
return false;
}
PTMutexLocker locker(o_dblock);
std::unique_lock<std::mutex> locker(o_dblock);
string udi;
if (!FileInterner::getEnclosingUDI(doc, udi))
return false;

View file

@ -22,11 +22,11 @@
#include <string>
#include <list>
#include <vector>
#include MEMORY_INCLUDE
#include <mutex>
#include <memory>
#include "rcldoc.h"
#include "hldata.h"
#include "ptmutex.h"
// Need this for the "Snippet" class def.
#include "rclquery.h"
@ -167,7 +167,7 @@ class DocSequence {
protected:
friend class DocSeqModifier;
virtual Rcl::Db *getDb() = 0;
static PTMutexInit o_dblock;
static std::mutex o_dblock;
static std::string o_sort_trans;
static std::string o_filt_trans;
std::string m_reason;

View file

@ -53,7 +53,7 @@ string DocSequenceDb::getDescription()
bool DocSequenceDb::getDoc(int num, Rcl::Doc &doc, string *sh)
{
PTMutexLocker locker(o_dblock);
std::unique_lock<std::mutex> locker(o_dblock);
if (!setQuery())
return false;
if (sh) sh->erase();
@ -62,7 +62,7 @@ bool DocSequenceDb::getDoc(int num, Rcl::Doc &doc, string *sh)
int DocSequenceDb::getResCnt()
{
PTMutexLocker locker(o_dblock);
std::unique_lock<std::mutex> locker(o_dblock);
if (!setQuery())
return false;
if (m_rescnt < 0) {
@ -78,7 +78,7 @@ static const string cstr_mre("[...]");
bool DocSequenceDb::getAbstract(Rcl::Doc &doc, vector<Rcl::Snippet>& vpabs)
{
LOGDEB("DocSequenceDb::getAbstract/pair\n" );
PTMutexLocker locker(o_dblock);
std::unique_lock<std::mutex> locker(o_dblock);
if (!setQuery())
return false;
@ -108,7 +108,7 @@ bool DocSequenceDb::getAbstract(Rcl::Doc &doc, vector<Rcl::Snippet>& vpabs)
bool DocSequenceDb::getAbstract(Rcl::Doc &doc, vector<string>& vabs)
{
PTMutexLocker locker(o_dblock);
std::unique_lock<std::mutex> locker(o_dblock);
if (!setQuery())
return false;
if (m_q->whatDb() &&
@ -122,7 +122,7 @@ bool DocSequenceDb::getAbstract(Rcl::Doc &doc, vector<string>& vabs)
int DocSequenceDb::getFirstMatchPage(Rcl::Doc &doc, string& term)
{
PTMutexLocker locker(o_dblock);
std::unique_lock<std::mutex> locker(o_dblock);
if (!setQuery())
return false;
if (m_q->whatDb()) {
@ -138,7 +138,7 @@ Rcl::Db *DocSequenceDb::getDb()
list<string> DocSequenceDb::expand(Rcl::Doc &doc)
{
PTMutexLocker locker(o_dblock);
std::unique_lock<std::mutex> locker(o_dblock);
if (!setQuery())
return list<string>();
vector<string> v = m_q->expand(doc);
@ -161,7 +161,7 @@ string DocSequenceDb::title()
bool DocSequenceDb::setFiltSpec(const DocSeqFiltSpec &fs)
{
LOGDEB("DocSequenceDb::setFiltSpec\n" );
PTMutexLocker locker(o_dblock);
std::unique_lock<std::mutex> locker(o_dblock);
if (fs.isNotNull()) {
// We build a search spec by adding a filtering layer to the base one.
m_fsdata = STD_SHARED_PTR<Rcl::SearchData>(
@ -209,7 +209,7 @@ bool DocSequenceDb::setFiltSpec(const DocSeqFiltSpec &fs)
bool DocSequenceDb::setSortSpec(const DocSeqSortSpec &spec)
{
LOGDEB("DocSequenceDb::setSortSpec: fld [" << (spec.field) << "] " << (spec.desc ? "desc" : "asc") << "\n" );
PTMutexLocker locker(o_dblock);
std::unique_lock<std::mutex> locker(o_dblock);
if (spec.isNotNull()) {
m_q->setSortBy(spec.field, !spec.desc);
m_isSorted = true;
@ -239,7 +239,7 @@ bool DocSequenceDb::setQuery()
bool DocSequenceDb::docDups(const Rcl::Doc& doc, std::vector<Rcl::Doc>& dups)
{
if (m_q->whatDb()) {
PTMutexLocker locker(o_dblock);
std::unique_lock<std::mutex> locker(o_dblock);
return m_q->whatDb()->docDups(doc, dups);
} else {
return false;

View file

@ -52,7 +52,6 @@ using namespace std;
#include "md5ut.h"
#include "rclversion.h"
#include "cancelcheck.h"
#include "ptmutex.h"
#include "termproc.h"
#include "expansiondbs.h"
#include "rclinit.h"
@ -571,7 +570,7 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
{
#ifdef IDX_THREADS
Chrono chron;
PTMutexLocker lock(m_mutex);
std::unique_lock<std::mutex> lock(m_mutex);
#endif
STD_SHARED_PTR<Xapian::Document> doc_cleaner(newdocument_ptr);
@ -640,7 +639,7 @@ bool Db::Native::purgeFileWrite(bool orphansOnly, const string& udi,
// be called by a single thread) to protect about multiple acces
// to xrdb from subDocs() which is also called from needupdate()
// (called from outside the write thread !
PTMutexLocker lock(m_mutex);
std::unique_lock<std::mutex> lock(m_mutex);
#endif // IDX_THREADS
string ermsg;
@ -1586,7 +1585,7 @@ bool Db::Native::docToXdocXattrOnly(TextSplitDb *splitter, const string &udi,
{
LOGDEB0("Db::docToXdocXattrOnly\n" );
#ifdef IDX_THREADS
PTMutexLocker lock(m_mutex);
std::unique_lock<std::mutex> lock(m_mutex);
#endif
// Read existing document and its data record
@ -1714,7 +1713,7 @@ void Db::setExistingFlags(const string& udi, unsigned int docid)
return;
}
#ifdef IDX_THREADS
PTMutexLocker lock(m_ndb->m_mutex);
std::unique_lock<std::mutex> lock(m_ndb->m_mutex);
#endif
i_setExistingFlags(udi, docid);
}
@ -1774,7 +1773,7 @@ bool Db::needUpdate(const string &udi, const string& sig,
// thread which also updates the existence map, and even multiple
// accesses to the readonly Xapian::Database are not allowed
// anyway
PTMutexLocker lock(m_ndb->m_mutex);
std::unique_lock<std::mutex> lock(m_ndb->m_mutex);
#endif
// Try to find the document indexed by the uniterm.
@ -1890,7 +1889,7 @@ bool Db::purge()
// else we need to lock out other top level threads. This is just
// a precaution as they should have been waited for by the top
// level actor at this point
PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_havewriteq);
std::unique_lock<std::mutex> lock(m_ndb->m_mutex);
#endif // IDX_THREADS
// For xapian versions up to 1.0.1, deleting a non-existant
@ -1957,7 +1956,7 @@ bool Db::docExists(const string& uniterm)
{
#ifdef IDX_THREADS
// Need to protect read db against multiaccess.
PTMutexLocker lock(m_ndb->m_mutex);
std::unique_lock<std::mutex> lock(m_ndb->m_mutex);
#endif
string ermsg;

View file

@ -21,6 +21,7 @@
#include "autoconfig.h"
#include <map>
#include <mutex>
#include <xapian.h>
@ -28,7 +29,6 @@
#include "workqueue.h"
#endif // IDX_THREADS
#include "xmacros.h"
#include "ptmutex.h"
namespace Rcl {
@ -81,7 +81,7 @@ class Db::Native {
bool m_noversionwrite; //Set if open failed because of version mismatch!
#ifdef IDX_THREADS
WorkQueue<DbUpdTask*> m_wqueue;
PTMutexInit m_mutex;
std::mutex m_mutex;
long long m_totalworkns;
bool m_havewriteq;
void maybeStartThreads();

View file

@ -30,25 +30,24 @@
#include <string>
#include <algorithm>
#include <iostream>
#include UNORDERED_MAP_INCLUDE
#include "smallut.h"
#include <unordered_map>
#include <mutex>
using std::string;
using std::vector;
#include "smallut.h"
/*
Storage for the exception translations. These are chars which
should not be translated according to what UnicodeData says, but
instead according to some local rule. There will usually be very
few of them, but they must be looked up for every translated char.
*/
STD_UNORDERED_MAP<unsigned short, string> except_trans;
std::unordered_map<unsigned short, string> except_trans;
static inline bool is_except_char(unsigned short c, string& trans)
{
STD_UNORDERED_MAP<unsigned short, string>::const_iterator it
= except_trans.find(c);
auto it = except_trans.find(c);
if (it == except_trans.end())
return false;
trans = it->second;
@ -76,7 +75,6 @@ static inline bool is_except_char(unsigned short c, string& trans)
#include <stdio.h>
#include <stdarg.h>
#endif /* HAVE_VSNPRINTF */
#include <pthread.h>
#include "unac.h"
#include "unac_version.h"
@ -14314,14 +14312,7 @@ int fold_string_utf16(const char* in, size_t in_length,
static const char *utf16be = "UTF-16BE";
static iconv_t u8tou16_cd = (iconv_t)-1;
static iconv_t u16tou8_cd = (iconv_t)-1;
static pthread_mutex_t o_unac_mutex;
static int unac_mutex_is_init;
// Call this or take your chances with the auto init.
void unac_init_mt()
{
pthread_mutex_init(&o_unac_mutex, 0);
unac_mutex_is_init = 1;
}
static std::mutex o_unac_mutex;
/*
* Convert buffer <in> containing string encoded in charset <from> into
@ -14343,14 +14334,7 @@ static int convert(const char* from, const char* to,
int from_utf16, from_utf8, to_utf16, to_utf8, u8tou16, u16tou8;
const char space[] = { 0x00, 0x20 };
/* Note: better call explicit unac_init_mt() before starting threads than
rely on this.
*/
if (unac_mutex_is_init == 0) {
pthread_mutex_init(&o_unac_mutex, 0);
unac_mutex_is_init = 1;
}
pthread_mutex_lock(&o_unac_mutex);
std::unique_lock<std::mutex> lock(o_unac_mutex);
if (!strcmp(utf16be, from)) {
from_utf8 = 0;
@ -14494,7 +14478,6 @@ static int convert(const char* from, const char* to,
ret = 0;
out:
pthread_mutex_unlock(&o_unac_mutex);
return ret;
}

View file

@ -111,9 +111,6 @@ int fold_string(const char* charset,
const char* in, size_t in_length,
char** out, size_t* out_length);
/* To be called before starting threads in mt programs */
void unac_init_mt();
#ifdef BUILDING_RECOLL
#include <string>
/**

View file

@ -17,10 +17,6 @@
#ifndef _LOG_H_X_INCLUDED_
#define _LOG_H_X_INCLUDED_
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <fstream>
#include <iostream>
#include <string>

View file

@ -1,105 +0,0 @@
/* Copyright (C) 2004 J.F.Dockes
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the
* Free Software Foundation, Inc.,
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
//
// Small test program to evaluate the cost of using mutex locks: calls
// to methods doing a small (150 bytes) base64 encoding job + string
// manips, with and without locking. The performance cost is
// negligible on all machines I tested (around 0.3% to 2% depending on
// the system and machine), but not inexistent, you would not want
// this in a tight loop.
#include <stdio.h>
#include <stdlib.h>
#include <string>
using namespace std;
#include "ptmutex.h"
#include "base64.h"
static char *thisprog;
static char usage [] =
"ptmutex [-l] count\n"
"\n"
;
static void
Usage(void)
{
fprintf(stderr, "%s: usage:\n%s", thisprog, usage);
exit(1);
}
static int op_flags;
#define OPT_MOINS 0x1
#define OPT_l 0x2
static const string convertbuffer =
"* The recoll GUI program sometimes crashes when running a query while\
the indexing thread is active. Possible workarounds:";
static PTMutexInit o_lock;
void workerlock(string& out)
{
PTMutexLocker locker(o_lock);
base64_encode(convertbuffer, out);
}
void workernolock(string& out)
{
base64_encode(convertbuffer, out);
}
int main(int argc, char **argv)
{
int count = 0;
thisprog = argv[0];
argc--; argv++;
while (argc > 0 && **argv == '-') {
(*argv)++;
if (!(**argv))
/* Cas du "adb - core" */
Usage();
while (**argv)
switch (*(*argv)++) {
case 'l': op_flags |= OPT_l; break;
default: Usage(); break;
}
b1: argc--; argv++;
}
if (argc != 1)
Usage();
count = atoi(*argv++);argc--;
if (op_flags & OPT_l) {
fprintf(stderr, "Looping %d, locking\n", count);
for (int i = 0; i < count; i++) {
string s;
workerlock(s);
}
} else {
fprintf(stderr, "Looping %d, no locking\n", count);
for (int i = 0; i < count; i++) {
string s;
workernolock(s);
}
}
exit(0);
}

View file

@ -1,64 +0,0 @@
/* Copyright (C) 2011 J.F.Dockes
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the
* Free Software Foundation, Inc.,
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#ifndef _PTMUTEX_H_INCLUDED_
#define _PTMUTEX_H_INCLUDED_
#include <pthread.h>
/// A trivial wrapper/helper for pthread mutex locks
/// Lock storage with auto-initialization. Must be created before any
/// lock-using thread of course (possibly as a static object).
class PTMutexInit {
public:
pthread_mutex_t m_mutex;
int m_status;
PTMutexInit()
{
m_status = pthread_mutex_init(&m_mutex, 0);
}
};
/// Take the lock when constructed, release when deleted. Can be disabled
/// by constructor params for conditional use.
class PTMutexLocker {
public:
// The nolock arg enables conditional locking
PTMutexLocker(PTMutexInit& l, bool nolock = false)
: m_lock(l), m_status(-1)
{
if (!nolock)
m_status = pthread_mutex_lock(&m_lock.m_mutex);
}
~PTMutexLocker()
{
if (m_status == 0)
pthread_mutex_unlock(&m_lock.m_mutex);
}
int ok() {return m_status == 0;}
// For pthread_cond_wait etc.
pthread_mutex_t *getMutex()
{
return &m_lock.m_mutex;
}
private:
PTMutexInit& m_lock;
int m_status;
};
#endif /* _PTMUTEX_H_INCLUDED_ */

View file

@ -35,7 +35,8 @@
#include <errno.h>
#include <sys/types.h>
#include "safesysstat.h"
#include "ptmutex.h"
#include <mutex>
#include "rclutil.h"
#include "pathut.h"
@ -238,8 +239,8 @@ bool maketmpdir(string& tdir, string& reason)
// mkdir. try to make sure that we at least don't shoot ourselves
// in the foot
#if !defined(HAVE_MKDTEMP) || defined(_WIN32)
static PTMutexInit mlock;
PTMutexLocker lock(mlock);
static std::mutex mmutex;
std::unique_lock lock(mmutex);
#endif
if (!
@ -261,8 +262,8 @@ bool maketmpdir(string& tdir, string& reason)
// There is a race condition between name computation and
// mkdir. try to make sure that we at least don't shoot ourselves
// in the foot
static PTMutexInit mlock;
PTMutexLocker lock(mlock);
static std::mutex mmutex;
std::unique_lock lock(mmutex);
tdir = path_wingettempfilename(TEXT("rcltmp"));
#endif
@ -287,8 +288,8 @@ TempFileInternal::TempFileInternal(const string& suffix)
// well. There is a race condition between name computation and
// file creation. try to make sure that we at least don't shoot
// our own selves in the foot. maybe we'll use mkstemps one day.
static PTMutexInit mlock;
PTMutexLocker lock(mlock);
static std::mutex mmutex;
std::unique_lock<std::mutex> lock(mmutex);
#ifndef _WIN32
string filename = path_cat(tmplocation(), "rcltmpfXXXXXX");

View file

@ -20,7 +20,7 @@
#include <string>
#include <iostream>
#include <mutex>
using std::string;
#include <errno.h>
@ -28,7 +28,7 @@ using std::string;
#include "transcode.h"
#include "log.h"
#include "ptmutex.h"
#ifdef RCL_ICONV_INBUF_CONST
#define ICV_P2_TYPE const char**
#else
@ -53,8 +53,8 @@ bool transcode(const string &in, string &out, const string &icode,
static iconv_t ic = (iconv_t)-1;
static string cachedicode;
static string cachedocode;
static PTMutexInit o_cachediconv_mutex;
PTMutexLocker locker(o_cachediconv_mutex);
static std::mutex o_cachediconv_mutex;
std::unique_lock<std::mutex> lock(o_cachediconv_mutex);
#else
iconv_t ic;
#endif

View file

@ -1,4 +1,4 @@
/* Copyright (C) 2012 J.F.Dockes
/* Copyright (C) 2012-2016 J.F.Dockes
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
@ -17,15 +17,15 @@
#ifndef _WORKQUEUE_H_INCLUDED_
#define _WORKQUEUE_H_INCLUDED_
#include <pthread.h>
#include <time.h>
#include <thread>
#include <future>
#include <string>
#include <queue>
#include <list>
#include <mutex>
#include <condition_variable>
#include "log.h"
#include "ptmutex.h"
/**
* A WorkQueue manages the synchronisation around a queue of work items,
@ -39,51 +39,48 @@
* the client or worker sets an end condition on the queue. A second
* queue could conceivably be used for returning individual task
* status.
*
* The strange thread functions argument and return values
* comes from compatibility with an earlier pthread-based
* implementation.
*/
template <class T> class WorkQueue {
public:
/** Create a WorkQueue
* @param name for message printing
* @param hi number of tasks on queue before clients blocks. Default 0
* @param hi number of tasks on queue before clients blocks. Default 0
* meaning no limit. hi == -1 means that the queue is disabled.
* @param lo minimum count of tasks before worker starts. Default 1.
*/
WorkQueue(const std::string& name, size_t hi = 0, size_t lo = 1)
: m_name(name), m_high(hi), m_low(lo),
m_workers_exited(0), m_clients_waiting(0), m_workers_waiting(0),
m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0)
{
m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
(pthread_cond_init(&m_wcond, 0) == 0);
: m_name(name), m_high(hi), m_low(lo), m_workers_exited(0),
m_ok(true), m_clients_waiting(0), m_workers_waiting(0),
m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0) {
}
~WorkQueue()
{
LOGDEB2("WorkQueue::~WorkQueue:" << (m_name) << "\n" );
if (!m_worker_threads.empty())
~WorkQueue() {
if (!m_worker_threads.empty()) {
setTerminateAndWait();
}
}
/** Start the worker threads.
/** Start the worker threads.
*
* @param nworkers number of threads copies to start.
* @param start_routine thread function. It should loop
* taking (QueueWorker::take()) and executing tasks.
* taking (QueueWorker::take()) and executing tasks.
* @param arg initial parameter to thread function.
* @return true if ok.
*/
bool start(int nworkers, void *(*start_routine)(void *), void *arg)
{
PTMutexLocker lock(m_mutex);
for (int i = 0; i < nworkers; i++) {
int err;
pthread_t thr;
if ((err = pthread_create(&thr, 0, start_routine, arg))) {
LOGERR("WorkQueue:" << (m_name) << ": pthread_create failed, err " << (err) << "\n" );
return false;
}
m_worker_threads.push_back(thr);
bool start(int nworkers, void *(workproc)(void *), void *arg) {
std::unique_lock<std::mutex> lock(m_mutex);
for (int i = 0; i < nworkers; i++) {
std::packaged_task<void *(void *)> task(workproc);
Worker w;
w.res = task.get_future();
w.thr = std::thread(std::move(task), arg);
m_worker_threads.push_back(std::move(w));
}
return true;
}
@ -92,32 +89,37 @@ public:
*
* Sleeps if there are already too many.
*/
bool put(T t)
{
PTMutexLocker lock(m_mutex);
if (!lock.ok() || !ok()) {
LOGERR("WorkQueue::put:" << (m_name) << ": !ok or mutex_lock failed\n" );
bool put(T t, bool flushprevious = false) {
std::unique_lock<std::mutex> lock(m_mutex);
if (!ok()) {
LOGERR("WorkQueue::put:" << m_name << ": !ok\n");
return false;
}
}
while (ok() && m_high > 0 && m_queue.size() >= m_high) {
m_clientsleeps++;
m_clientsleeps++;
// Keep the order: we test ok() AFTER the sleep...
m_clients_waiting++;
if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) {
m_clients_waiting--;
m_clients_waiting++;
m_ccond.wait(lock);
if (!ok()) {
m_clients_waiting--;
return false;
}
m_clients_waiting--;
m_clients_waiting--;
}
if (flushprevious) {
while (!m_queue.empty()) {
m_queue.pop();
}
}
m_queue.push(t);
if (m_workers_waiting > 0) {
// Just wake one worker, there is only one new task.
pthread_cond_signal(&m_wcond);
} else {
m_nowake++;
}
if (m_workers_waiting > 0) {
// Just wake one worker, there is only one new task.
m_wcond.notify_one();
} else {
m_nowake++;
}
return true;
}
@ -133,127 +135,138 @@ public:
* (which can control the task flow), else there could be
* tasks in the intermediate queues.
* To rephrase: there is no warranty on return that the queue is actually
* idle EXCEPT if the caller knows that no jobs are still being created.
* idle EXCEPT if the caller knows that no jobs are still being created.
* It would be possible to transform this into a safe call if some kind
* of suspend condition was set on the queue by waitIdle(), to be reset by
* of suspend condition was set on the queue by waitIdle(), to be reset by
* some kind of "resume" call. Not currently the case.
*/
bool waitIdle()
{
PTMutexLocker lock(m_mutex);
if (!lock.ok() || !ok()) {
LOGERR("WorkQueue::waitIdle:" << (m_name) << ": not ok or can't lock\n" );
bool waitIdle() {
std::unique_lock<std::mutex> lock(m_mutex);
if (!ok()) {
LOGERR("WorkQueue::waitIdle:" << m_name << ": not ok\n");
return false;
}
// We're done when the queue is empty AND all workers are back
// waiting for a task.
while (ok() && (m_queue.size() > 0 ||
while (ok() && (m_queue.size() > 0 ||
m_workers_waiting != m_worker_threads.size())) {
m_clients_waiting++;
if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
m_clients_waiting--;
m_ok = false;
LOGERR("WorkQueue::waitIdle:" << (m_name) << ": cond_wait failed\n" );
return false;
}
m_clients_waiting--;
m_clients_waiting++;
m_ccond.wait(lock);
m_clients_waiting--;
}
return ok();
}
/** Tell the workers to exit, and wait for them.
/** Tell the workers to exit, and wait for them.
*
* Does not bother about tasks possibly remaining on the queue, so
* should be called after waitIdle() for an orderly shutdown.
*/
void* setTerminateAndWait()
{
PTMutexLocker lock(m_mutex);
LOGDEB("setTerminateAndWait:" << (m_name) << "\n" );
void *setTerminateAndWait() {
std::unique_lock<std::mutex> lock(m_mutex);
LOGDEB("setTerminateAndWait:" << m_name << "\n");
if (m_worker_threads.empty()) {
// Already called ?
return (void*)0;
}
if (m_worker_threads.empty()) {
// Already called ?
return (void*)0;
}
// Wait for all worker threads to have called workerExit()
// Wait for all worker threads to have called workerExit()
m_ok = false;
while (m_workers_exited < m_worker_threads.size()) {
pthread_cond_broadcast(&m_wcond);
m_clients_waiting++;
if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
LOGERR("WorkQueue::setTerminate:" << (m_name) << ": cond_wait failed\n" );
m_clients_waiting--;
return (void*)0;
}
m_clients_waiting--;
m_wcond.notify_all();
m_clients_waiting++;
m_ccond.wait(lock);
m_clients_waiting--;
}
LOGINFO("" << (m_name) << ": tasks " << (m_tottasks) << " nowakes " << (m_nowake) << " wsleeps " << (m_workersleeps) << " csleeps " << (m_clientsleeps) << "\n" );
// Perform the thread joins and compute overall status
LOGINFO("" << m_name << ": tasks " << m_tottasks << " nowakes " <<
m_nowake << " wsleeps " << m_workersleeps << " csleeps " <<
m_clientsleeps << "\n");
// Perform the thread joins and compute overall status
// Workers return (void*)1 if ok
void *statusall = (void*)1;
std::list<pthread_t>::iterator it;
while (!m_worker_threads.empty()) {
void *status;
it = m_worker_threads.begin();
pthread_join(*it, &status);
if (status == (void *)0)
void *status = m_worker_threads.front().res.get();
m_worker_threads.front().thr.join();
if (status == (void *)0) {
statusall = status;
m_worker_threads.erase(it);
}
m_worker_threads.pop_front();
}
// Reset to start state.
m_workers_exited = m_clients_waiting = m_workers_waiting =
m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
// Reset to start state.
m_workers_exited = m_clients_waiting = m_workers_waiting =
m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
m_ok = true;
LOGDEB("setTerminateAndWait:" << (m_name) << " done\n" );
LOGDEB("setTerminateAndWait:" << m_name << " done\n");
return statusall;
}
/** Take task from queue. Called from worker.
*
* Sleeps if there are not enough. Signal if we go to sleep on empty
*
* Sleeps if there are not enough. Signal if we go to sleep on empty
* queue: client may be waiting for our going idle.
*/
bool take(T* tp, size_t *szp = 0)
{
PTMutexLocker lock(m_mutex);
if (!lock.ok() || !ok()) {
LOGDEB("WorkQueue::take:" << (m_name) << ": not ok\n" );
bool take(T* tp, size_t *szp = 0) {
std::unique_lock<std::mutex> lock(m_mutex);
if (!ok()) {
LOGDEB("WorkQueue::take:" << m_name << ": not ok\n");
return false;
}
}
while (ok() && m_queue.size() < m_low) {
m_workersleeps++;
m_workersleeps++;
m_workers_waiting++;
if (m_queue.empty())
pthread_cond_broadcast(&m_ccond);
if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
// !ok is a normal condition when shutting down
if (ok())
LOGERR("WorkQueue::take:" << (m_name) << ": cond_wait failed or !ok\n" );
if (m_queue.empty()) {
m_ccond.notify_all();
}
m_wcond.wait(lock);
if (!ok()) {
// !ok is a normal condition when shutting down
m_workers_waiting--;
return false;
}
m_workers_waiting--;
}
m_tottasks++;
m_tottasks++;
*tp = m_queue.front();
if (szp)
*szp = m_queue.size();
if (szp) {
*szp = m_queue.size();
}
m_queue.pop();
if (m_clients_waiting > 0) {
// No reason to wake up more than one client thread
pthread_cond_signal(&m_ccond);
} else {
m_nowake++;
}
if (m_clients_waiting > 0) {
// No reason to wake up more than one client thread
m_ccond.notify_one();
} else {
m_nowake++;
}
return true;
}
bool waitminsz(size_t sz) {
std::unique_lock<std::mutex> lock(m_mutex);
if (!ok()) {
return false;
}
while (ok() && m_queue.size() < sz) {
m_workersleeps++;
m_workers_waiting++;
if (m_queue.empty()) {
m_ccond.notify_all();
}
m_wcond.wait(lock);
if (!ok()) {
m_workers_waiting--;
return false;
}
m_workers_waiting--;
}
return true;
}
@ -265,58 +278,57 @@ public:
* false by the shutdown code anyway). The thread must return/exit
* immediately after calling this.
*/
void workerExit()
{
LOGDEB("workerExit:" << (m_name) << "\n" );
PTMutexLocker lock(m_mutex);
void workerExit() {
LOGDEB("workerExit:" << m_name << "\n");
std::unique_lock<std::mutex> lock(m_mutex);
m_workers_exited++;
m_ok = false;
pthread_cond_broadcast(&m_ccond);
m_ccond.notify_all();
}
size_t qsize()
{
PTMutexLocker lock(m_mutex);
size_t sz = m_queue.size();
return sz;
size_t qsize() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.size();
}
private:
bool ok()
{
bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
if (!isok) {
LOGDEB("WorkQueue:ok:" << (m_name) << ": not ok m_ok " << (m_ok) << " m_workers_exited " << (m_workers_exited) << " m_worker_threads size " << (int(m_worker_threads.size())) << "\n" );
}
bool ok() {
bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
if (!isok) {
LOGDEB("WorkQueue:ok:" << m_name << ": not ok m_ok " << m_ok <<
" m_workers_exited " << m_workers_exited <<
" m_worker_threads size " << m_worker_threads.size() <<
"\n");
}
return isok;
}
long long nanodiff(const struct timespec& older,
const struct timespec& newer)
{
return (newer.tv_sec - older.tv_sec) * 1000000000LL
+ newer.tv_nsec - older.tv_nsec;
}
struct Worker {
std::thread thr;
std::future<void *> res;
};
// Configuration
std::string m_name;
size_t m_high;
size_t m_low;
size_t m_low;
// Status
// Worker threads having called exit
// Worker threads having called exit. Used to decide when we're done
unsigned int m_workers_exited;
// Status
bool m_ok;
// Per-thread data. The data is not used currently, this could be
// a set<pthread_t>
std::list<pthread_t> m_worker_threads;
// Our threads.
std::list<Worker> m_worker_threads;
// Synchronization
// Jobs input queue
std::queue<T> m_queue;
pthread_cond_t m_ccond;
pthread_cond_t m_wcond;
PTMutexInit m_mutex;
// Synchronization
std::condition_variable m_ccond;
std::condition_variable m_wcond;
std::mutex m_mutex;
// Client/Worker threads currently waiting for a job
unsigned int m_clients_waiting;
unsigned int m_workers_waiting;

View file

@ -30,9 +30,11 @@
#include <string>
#include <algorithm>
#include <iostream>
#include UNORDERED_MAP_INCLUDE
#include <unordered_map>
#include <mutex>
using std::string;
using std::vector;
#include "smallut.h"
@ -42,11 +44,10 @@ using std::string;
instead according to some local rule. There will usually be very
few of them, but they must be looked up for every translated char.
*/
STD_UNORDERED_MAP<unsigned short, string> except_trans;
std::unordered_map<unsigned short, string> except_trans;
static inline bool is_except_char(unsigned short c, string& trans)
{
STD_UNORDERED_MAP<unsigned short, string>::const_iterator it
= except_trans.find(c);
auto it = except_trans.find(c);
if (it == except_trans.end())
return false;
trans = it->second;
@ -74,7 +75,6 @@ static inline bool is_except_char(unsigned short c, string& trans)
#include <stdio.h>
#include <stdarg.h>
#endif /* HAVE_VSNPRINTF */
#include <pthread.h>
#include "unac.h"
#include "unac_version.h"
@ -14312,14 +14312,7 @@ int fold_string_utf16(const char* in, size_t in_length,
static const char *utf16be = "UTF-16BE";
static iconv_t u8tou16_cd = (iconv_t)-1;
static iconv_t u16tou8_cd = (iconv_t)-1;
static pthread_mutex_t o_unac_mutex;
static int unac_mutex_is_init;
// Call this or take your chances with the auto init.
void unac_init_mt()
{
pthread_mutex_init(&o_unac_mutex, 0);
unac_mutex_is_init = 1;
}
static std::mutex o_unac_mutex;
/*
* Convert buffer <in> containing string encoded in charset <from> into
@ -14341,14 +14334,7 @@ static int convert(const char* from, const char* to,
int from_utf16, from_utf8, to_utf16, to_utf8, u8tou16, u16tou8;
const char space[] = { 0x00, 0x20 };
/* Note: better call explicit unac_init_mt() before starting threads than
rely on this.
*/
if (unac_mutex_is_init == 0) {
pthread_mutex_init(&o_unac_mutex, 0);
unac_mutex_is_init = 1;
}
pthread_mutex_lock(&o_unac_mutex);
std::unique_lock<std::mutex> lock(o_unac_mutex);
if (!strcmp(utf16be, from)) {
from_utf8 = 0;
@ -14492,7 +14478,6 @@ static int convert(const char* from, const char* to,
ret = 0;
out:
pthread_mutex_unlock(&o_unac_mutex);
return ret;
}

View file

@ -111,10 +111,7 @@ int fold_string(const char* charset,
const char* in, size_t in_length,
char** out, size_t* out_length);
/* To be called before starting threads in mt programs */
void unac_init_mt();
#ifdef RECOLL_DATADIR
#ifdef BUILDING_RECOLL
#include <string>
/**
* Set exceptions for unaccenting, for characters which should not be
@ -128,7 +125,7 @@ void unac_init_mt();
* can't be an exception character, deal with it...
*/
void unac_set_except_translations(const char *spectrans);
#endif /* RECOLL_DATADIR */
#endif /* BUILDING_RECOLL */
/*
* Return unac version number.