better stats in workqueue

This commit is contained in:
Jean-Francois Dockes 2012-12-03 19:11:59 +01:00
parent 9c6b88d2f2
commit 98c38513e7
2 changed files with 21 additions and 12 deletions

View file

@ -1241,8 +1241,8 @@ void Db::waitUpdIdle()
LOGERR(("Db::waitUpdIdle: flush() failed: %s\n", ermsg.c_str())); LOGERR(("Db::waitUpdIdle: flush() failed: %s\n", ermsg.c_str()));
} }
m_ndb->m_totalworkns += chron.nanos(); m_ndb->m_totalworkns += chron.nanos();
LOGDEB(("Db::waitUpdIdle: total work %lld mS\n", LOGINFO(("Db::waitUpdIdle: total xapian work %lld mS\n",
m_ndb->m_totalworkns/1000000)); m_ndb->m_totalworkns/1000000));
} }
} }
#endif #endif

View file

@ -63,9 +63,8 @@ public:
*/ */
WorkQueue(const string& name, int hi = 0, int lo = 1) WorkQueue(const string& name, int hi = 0, int lo = 1)
: m_name(name), m_high(hi), m_low(lo), : m_name(name), m_high(hi), m_low(lo),
m_workers_waiting(0), m_workers_exited(0), m_workers_exited(0), m_clients_waiting(0), m_workers_waiting(0),
m_clients_waiting(0), m_tottasks(0), m_nowake(0), m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0)
m_workersleeps(0)
{ {
m_ok = (m_high >= 0) && (pthread_cond_init(&m_ccond, 0) == 0) && m_ok = (m_high >= 0) && (pthread_cond_init(&m_ccond, 0) == 0) &&
(pthread_cond_init(&m_wcond, 0) == 0); (pthread_cond_init(&m_wcond, 0) == 0);
@ -116,6 +115,7 @@ public:
} }
while (ok() && m_high > 0 && m_queue.size() >= m_high) { while (ok() && m_high > 0 && m_queue.size() >= m_high) {
m_clientsleeps++;
// Keep the order: we test ok() AFTER the sleep... // Keep the order: we test ok() AFTER the sleep...
m_clients_waiting++; m_clients_waiting++;
if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) { if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) {
@ -203,8 +203,9 @@ public:
m_clients_waiting--; m_clients_waiting--;
} }
LOGDEB(("%s: %u tasks %u nowakes %u wsleeps \n", m_name.c_str(), LOGINFO(("%s: tasks %u nowakes %u wsleeps %u csleeps %u\n",
m_tottasks, m_nowake, m_workersleeps)); m_name.c_str(), m_tottasks, m_nowake, m_workersleeps,
m_clientsleeps));
// Perform the thread joins and compute overall status // Perform the thread joins and compute overall status
// Workers return (void*)1 if ok // Workers return (void*)1 if ok
void *statusall = (void*)1; void *statusall = (void*)1;
@ -219,8 +220,8 @@ public:
} }
// Reset to start state. // Reset to start state.
m_workers_waiting = m_workers_exited = m_clients_waiting = m_tottasks = m_workers_exited = m_clients_waiting = m_workers_waiting =
m_nowake = m_workersleeps = 0; m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
m_ok = true; m_ok = true;
LOGDEB(("setTerminateAndWait:%s done\n", m_name.c_str())); LOGDEB(("setTerminateAndWait:%s done\n", m_name.c_str()));
@ -312,26 +313,34 @@ private:
+ newer.tv_nsec - older.tv_nsec; + newer.tv_nsec - older.tv_nsec;
} }
// Configuration
string m_name; string m_name;
size_t m_high; size_t m_high;
size_t m_low; size_t m_low;
/* Worker threads currently waiting for a job */ // Status
unsigned int m_workers_waiting; // Worker threads having called exit
unsigned int m_workers_exited; unsigned int m_workers_exited;
bool m_ok;
// Per-thread data. The data is not used currently, this could be // Per-thread data. The data is not used currently, this could be
// a set<pthread_t> // a set<pthread_t>
unordered_map<pthread_t, WQTData> m_worker_threads; unordered_map<pthread_t, WQTData> m_worker_threads;
// Synchronization
queue<T> m_queue; queue<T> m_queue;
pthread_cond_t m_ccond; pthread_cond_t m_ccond;
pthread_cond_t m_wcond; pthread_cond_t m_wcond;
PTMutexInit m_mutex; PTMutexInit m_mutex;
// Client/Worker threads currently waiting for a job
unsigned int m_clients_waiting; unsigned int m_clients_waiting;
unsigned int m_workers_waiting;
// Statistics
unsigned int m_tottasks; unsigned int m_tottasks;
unsigned int m_nowake; unsigned int m_nowake;
unsigned int m_workersleeps; unsigned int m_workersleeps;
bool m_ok; unsigned int m_clientsleeps;
}; };
#endif /* _WORKQUEUE_H_INCLUDED_ */ #endif /* _WORKQUEUE_H_INCLUDED_ */