m_havewriteq was not always initialized, resulting in stray flush call

This commit is contained in:
Jean-Francois Dockes 2012-12-28 16:34:15 +01:00
parent 2172c5d38d
commit f52bdf5cbb
2 changed files with 19 additions and 15 deletions

View file

@ -132,7 +132,8 @@ Db::Native::Native(Db *db)
m_noversionwrite(false) m_noversionwrite(false)
#ifdef IDX_THREADS #ifdef IDX_THREADS
, m_wqueue("DbUpd", m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first), , m_wqueue("DbUpd", m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first),
m_totalworkns(0LL) m_loglevel(4),
m_totalworkns(0LL), m_havewriteq(false)
#endif // IDX_THREADS #endif // IDX_THREADS
{ {
LOGDEB1(("Native::Native: me %p\n", this)); LOGDEB1(("Native::Native: me %p\n", this));
@ -142,7 +143,7 @@ Db::Native::~Native()
{ {
LOGDEB1(("Native::~Native: me %p\n", this)); LOGDEB1(("Native::~Native: me %p\n", this));
#ifdef IDX_THREADS #ifdef IDX_THREADS
if (m_haveWriteQ) { if (m_havewriteq) {
void *status = m_wqueue.setTerminateAndWait(); void *status = m_wqueue.setTerminateAndWait();
LOGDEB2(("Native::~Native: worker status %ld\n", long(status))); LOGDEB2(("Native::~Native: worker status %ld\n", long(status)));
} }
@ -186,7 +187,7 @@ void Db::Native::maybeStartThreads()
{ {
m_loglevel = DebugLog::getdbl()->getlevel(); m_loglevel = DebugLog::getdbl()->getlevel();
m_haveWriteQ = false; m_havewriteq = false;
const RclConfig *cnf = m_rcldb->m_config; const RclConfig *cnf = m_rcldb->m_config;
int writeqlen = cnf->getThrConf(RclConfig::ThrDbWrite).first; int writeqlen = cnf->getThrConf(RclConfig::ThrDbWrite).first;
int writethreads = cnf->getThrConf(RclConfig::ThrDbWrite).second; int writethreads = cnf->getThrConf(RclConfig::ThrDbWrite).second;
@ -199,10 +200,10 @@ void Db::Native::maybeStartThreads()
LOGERR(("Db::Db: Worker start failed\n")); LOGERR(("Db::Db: Worker start failed\n"));
return; return;
} }
m_haveWriteQ = true; m_havewriteq = true;
} }
LOGDEB(("RclDb:: threads: haveWriteQ %d, wqlen %d wqts %d\n", LOGDEB(("RclDb:: threads: haveWriteQ %d, wqlen %d wqts %d\n",
m_haveWriteQ, writeqlen, writethreads)); m_havewriteq, writeqlen, writethreads));
} }
#endif // IDX_THREADS #endif // IDX_THREADS
@ -500,6 +501,9 @@ bool Db::i_close(bool final)
m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY, cstr_RCL_IDX_VERSION); m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY, cstr_RCL_IDX_VERSION);
LOGDEB(("Rcl::Db:close: xapian will close. May take some time\n")); LOGDEB(("Rcl::Db:close: xapian will close. May take some time\n"));
} }
#ifdef IDX_THREADS
waitUpdIdle();
#endif
// Used to do a flush here. Cant see why it should be necessary. // Used to do a flush here. Cant see why it should be necessary.
deleteZ(m_ndb); deleteZ(m_ndb);
if (w) if (w)
@ -1138,7 +1142,7 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi, Doc &doc)
newdocument.set_data(record); newdocument.set_data(record);
#ifdef IDX_THREADS #ifdef IDX_THREADS
if (m_ndb->m_haveWriteQ) { if (m_ndb->m_havewriteq) {
DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument,
doc.text.length()); doc.text.length());
if (!m_ndb->m_wqueue.put(tp)) { if (!m_ndb->m_wqueue.put(tp)) {
@ -1163,7 +1167,7 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
// thread, we only need to protect the update map update below // thread, we only need to protect the update map update below
// (against interaction with threads calling needUpdate()). Else, // (against interaction with threads calling needUpdate()). Else,
// all threads from above need to synchronize here // all threads from above need to synchronize here
PTMutexLocker lock(m_mutex, m_haveWriteQ); PTMutexLocker lock(m_mutex, m_havewriteq);
#endif #endif
// Check file system full every mbyte of indexed text. It's a bit wasteful // Check file system full every mbyte of indexed text. It's a bit wasteful
@ -1193,7 +1197,7 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
#ifdef IDX_THREADS #ifdef IDX_THREADS
// Need to protect against interaction with the up-to-date checks // Need to protect against interaction with the up-to-date checks
// which also update the existence map // which also update the existence map
PTMutexLocker lock(m_mutex, !m_haveWriteQ); PTMutexLocker lock(m_mutex, !m_havewriteq);
#endif #endif
if (did < m_rcldb->updated.size()) { if (did < m_rcldb->updated.size()) {
m_rcldb->updated[did] = true; m_rcldb->updated[did] = true;
@ -1229,7 +1233,7 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
#ifdef IDX_THREADS #ifdef IDX_THREADS
void Db::waitUpdIdle() void Db::waitUpdIdle()
{ {
if (m_ndb->m_haveWriteQ) { if (m_ndb->m_iswritable && m_ndb->m_havewriteq) {
Chrono chron; Chrono chron;
m_ndb->m_wqueue.waitIdle(); m_ndb->m_wqueue.waitIdle();
// We flush here just for correct measurement of the thread work time // We flush here just for correct measurement of the thread work time
@ -1415,12 +1419,12 @@ bool Db::purge()
#ifdef IDX_THREADS #ifdef IDX_THREADS
// If we manage our own write queue, make sure it's drained and closed // If we manage our own write queue, make sure it's drained and closed
if (m_ndb->m_haveWriteQ) if (m_ndb->m_havewriteq)
m_ndb->m_wqueue.setTerminateAndWait(); m_ndb->m_wqueue.setTerminateAndWait();
// else we need to lock out other top level threads. This is just // else we need to lock out other top level threads. This is just
// a precaution as they should have been waited for by the top // a precaution as they should have been waited for by the top
// level actor at this point // level actor at this point
PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ); PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_havewriteq);
#endif // IDX_THREADS #endif // IDX_THREADS
// For xapian versions up to 1.0.1, deleting a non-existant // For xapian versions up to 1.0.1, deleting a non-existant
@ -1488,7 +1492,7 @@ bool Db::docExists(const string& uniterm)
#ifdef IDX_THREADS #ifdef IDX_THREADS
// If we're not running our own (single) thread, need to protect // If we're not running our own (single) thread, need to protect
// read db against multiaccess (e.g. from needUpdate(), or this method). // read db against multiaccess (e.g. from needUpdate(), or this method).
PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ); PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_havewriteq);
#endif #endif
string ermsg; string ermsg;
@ -1521,7 +1525,7 @@ bool Db::purgeFile(const string &udi, bool *existed)
return true; return true;
#ifdef IDX_THREADS #ifdef IDX_THREADS
if (m_ndb->m_haveWriteQ) { if (m_ndb->m_havewriteq) {
Xapian::Document xdoc; Xapian::Document xdoc;
DbUpdTask *tp = new DbUpdTask(udi, uniterm, xdoc, (size_t)-1); DbUpdTask *tp = new DbUpdTask(udi, uniterm, xdoc, (size_t)-1);
if (!m_ndb->m_wqueue.put(tp)) { if (!m_ndb->m_wqueue.put(tp)) {
@ -1542,7 +1546,7 @@ bool Db::purgeFileWrite(const string& udi, const string& uniterm)
// If we have a write queue we're called from there, and single // If we have a write queue we're called from there, and single
// threaded, no locking. Else need to mutex other threads from // threaded, no locking. Else need to mutex other threads from
// above // above
PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ); PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_havewriteq);
#endif // IDX_THREADS #endif // IDX_THREADS
Xapian::WritableDatabase db = m_ndb->xwdb; Xapian::WritableDatabase db = m_ndb->xwdb;

View file

@ -66,7 +66,7 @@ class Db::Native {
int m_loglevel; int m_loglevel;
PTMutexInit m_mutex; PTMutexInit m_mutex;
long long m_totalworkns; long long m_totalworkns;
bool m_haveWriteQ; bool m_havewriteq;
void maybeStartThreads(); void maybeStartThreads();
#endif // IDX_THREADS #endif // IDX_THREADS