timing and traces

This commit is contained in:
Jean-Francois Dockes 2012-11-26 09:16:06 +01:00
parent d084ac9899
commit e412dc0d20
4 changed files with 41 additions and 28 deletions

View file

@ -5,5 +5,5 @@ include $(depth)/mk/localdefs
ALL_CXXFLAGS = $(CXXFLAGS) $(COMMONCXXFLAGS) $(LOCALCXXFLAGS) \
-D_GNU_SOURCE
LIBSYS = -lpthread -ldl
LIBSYS = -lpthread -ldl -lrt
LIBSYSTHREADS = -lrt

View file

@ -801,11 +801,12 @@ void *DbUpdWorker(void* vdbp)
DbUpdTask *tsk;
for (;;) {
if (!tqp->take(&tsk)) {
size_t qsz;
if (!tqp->take(&tsk, &qsz)) {
tqp->workerExit();
return (void*)1;
}
LOGDEB(("DbUpdWorker: got task, ql %d\n", int(tqp->size())));
LOGDEB(("DbUpdWorker: got task, ql %d\n", int(qsz)));
if (!dbp->m_ndb->addOrUpdateWrite(tsk->udi, tsk->uniterm,
tsk->doc, tsk->txtlen)) {
LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n"));
@ -1163,7 +1164,7 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
// Test if we're over the flush threshold (limit memory usage):
bool ret = m_rcldb->maybeflush(textlen);
#ifdef IDX_THREADS
m_totalworkus += chron.micros();
m_totalworkns += chron.nanos();
#endif
return ret;
}
@ -1171,9 +1172,18 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
#ifdef IDX_THREADS
void Db::waitUpdIdle()
{
Chrono chron;
m_ndb->m_wqueue.waitIdle();
string ermsg;
try {
m_ndb->xwdb.flush();
} XCATCHERROR(ermsg);
if (!ermsg.empty()) {
LOGERR(("Db::waitUpdIdle: flush() failed: %s\n", ermsg.c_str()));
}
m_ndb->m_totalworkns += chron.nanos();
LOGDEB(("Db::waitUpdIdle: total work %lld mS\n",
m_ndb->m_totalworkus/1000));
m_ndb->m_totalworkns/1000000));
}
#endif

View file

@ -63,7 +63,7 @@ class Db::Native {
WorkQueue<DbUpdTask*> m_wqueue;
int m_loglevel;
PTMutexInit m_mutex;
long long m_totalworkus;
long long m_totalworkns;
#endif // IDX_THREADS
// Indexing
@ -79,7 +79,7 @@ class Db::Native {
: m_rcldb(db), m_isopen(false), m_iswritable(false),
m_noversionwrite(false)
#ifdef IDX_THREADS
, m_wqueue("DbUpd", 2), m_totalworkus(0LL)
, m_wqueue("DbUpd", 2), m_totalworkns(0LL)
#endif // IDX_THREADS
{
LOGDEB2(("Native::Native: me %p\n", this));

View file

@ -31,6 +31,7 @@ using std::string;
#include "debuglog.h"
/// Just an initialized timespec. Not really used any more.
class WQTData {
public:
WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
@ -40,7 +41,7 @@ class WQTData {
/**
* A WorkQueue manages the synchronisation around a queue of work items,
* where a number of client threads queue tasks and a number of worker
* threads takes and executes them. The goal is to introduce some level
* threads take and execute them. The goal is to introduce some level
* of parallelism between the successive steps of a previously single
* threaded pipe-line (data extraction / data preparation / index
* update).
@ -60,9 +61,10 @@ public:
* @param lo minimum count of tasks before worker starts. Default 1.
*/
WorkQueue(const string& name, int hi = 0, int lo = 1)
: m_name(name), m_high(hi), m_low(lo), m_size(0),
: m_name(name), m_high(hi), m_low(lo),
m_workers_waiting(0), m_workers_exited(0),
m_clients_waiting(0), m_tottasks(0), m_nowake(0)
m_clients_waiting(0), m_tottasks(0), m_nowake(0),
m_workersleeps(0)
{
m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
(pthread_cond_init(&m_wcond, 0) == 0) &&
@ -112,15 +114,14 @@ public:
// Keep the order: we test ok() AFTER the sleep...
m_clients_waiting++;
if (pthread_cond_wait(&m_ccond, &m_mutex) || !ok()) {
pthread_mutex_unlock(&m_mutex);
m_clients_waiting--;
pthread_mutex_unlock(&m_mutex);
return false;
}
m_clients_waiting--;
}
m_queue.push(t);
++m_size;
if (m_workers_waiting > 0) {
// Just wake one worker, there is only one new task.
pthread_cond_signal(&m_wcond);
@ -155,9 +156,9 @@ public:
m_clients_waiting++;
if (pthread_cond_wait(&m_ccond, &m_mutex)) {
m_clients_waiting--;
pthread_mutex_unlock(&m_mutex);
m_ok = false;
LOGERR(("WorkQueue::waitIdle: cond_wait failed\n"));
pthread_mutex_unlock(&m_mutex);
return false;
}
m_clients_waiting--;
@ -196,8 +197,8 @@ public:
m_clients_waiting--;
}
LOGDEB(("%s: %u tasks %u nowakes\n", m_name.c_str(), m_tottasks,
m_nowake));
LOGDEB(("%s: %u tasks %u nowakes %u wsleeps \n", m_name.c_str(),
m_tottasks, m_nowake, m_workersleeps));
// Perform the thread joins and compute overall status
// Workers return (void*)1 if ok
void *statusall = (void*)1;
@ -210,8 +211,8 @@ public:
statusall = status;
m_worker_threads.erase(it);
}
pthread_mutex_unlock(&m_mutex);
LOGDEB(("setTerminateAndWait:%s done\n", m_name.c_str()));
pthread_mutex_unlock(&m_mutex);
return statusall;
}
@ -220,12 +221,13 @@ public:
* Sleeps if there are not enough. Signal if we go
* to sleep on empty queue: client may be waiting for our going idle.
*/
bool take(T* tp)
bool take(T* tp, size_t *szp = 0)
{
if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
return false;
while (ok() && m_queue.size() < m_low) {
m_workersleeps++;
m_workers_waiting++;
if (m_queue.empty())
pthread_cond_broadcast(&m_ccond);
@ -234,8 +236,8 @@ public:
if (ok())
LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n",
m_name.c_str()));
pthread_mutex_unlock(&m_mutex);
m_workers_waiting--;
pthread_mutex_unlock(&m_mutex);
return false;
}
m_workers_waiting--;
@ -243,8 +245,9 @@ public:
m_tottasks++;
*tp = m_queue.front();
if (szp)
*szp = m_queue.size();
m_queue.pop();
--m_size;
if (m_clients_waiting > 0) {
// No reason to wake up more than one client thread
pthread_cond_signal(&m_ccond);
@ -272,15 +275,12 @@ public:
pthread_mutex_unlock(&m_mutex);
}
/** Return current queue size. Debug only.
*
* As the size is returned while the queue is unlocked, there
* is no warranty on its consistency. Not that we use the member
* size, not the container size() call which would need locking.
*/
size_t size()
size_t qsize()
{
return m_size;
pthread_mutex_lock(&m_mutex);
size_t sz = m_queue.size();
pthread_mutex_unlock(&m_mutex);
return sz;
}
private:
@ -299,11 +299,13 @@ private:
string m_name;
size_t m_high;
size_t m_low;
size_t m_size;
/* Worker threads currently waiting for a job */
unsigned int m_workers_waiting;
unsigned int m_workers_exited;
// Per-thread data. The data is not used currently, this could be
// a set<pthread_t>
unordered_map<pthread_t, WQTData> m_worker_threads;
queue<T> m_queue;
pthread_cond_t m_ccond;
@ -312,6 +314,7 @@ private:
unsigned int m_clients_waiting;
unsigned int m_tottasks;
unsigned int m_nowake;
unsigned int m_workersleeps;
bool m_ok;
};