From 2875f22e5a542f824abe43ec366a63bc39334d4f Mon Sep 17 00:00:00 2001 From: "B. Petersen" Date: Mon, 18 Jun 2018 20:28:40 +0200 Subject: [PATCH] perform_jobs() tries to execute every job once; the oldest jobs are tried first --- src/mrjob.c | 52 +++++++++++++++++++++------------------------------- 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/src/mrjob.c b/src/mrjob.c index d5ac48bd..4df1971a 100644 --- a/src/mrjob.c +++ b/src/mrjob.c @@ -595,7 +595,7 @@ void mrjob_kill_actions(mrmailbox_t* mailbox, int action1, int action2) static void mrjob_perform(mrmailbox_t* mailbox, int thread) { - sqlite3_stmt* stmt; + sqlite3_stmt* select_stmt = NULL; mrjob_t job; memset(&job, 0, sizeof(mrjob_t)); @@ -605,27 +605,17 @@ static void mrjob_perform(mrmailbox_t* mailbox, int thread) goto cleanup; } - while( 1 ) + select_stmt = mrsqlite3_prepare_v2_(mailbox->m_sql, + "SELECT id, action, foreign_id, param FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp;"); + sqlite3_bind_int64(select_stmt, 1, thread); + sqlite3_bind_int64(select_stmt, 2, time(NULL)); + while( sqlite3_step(select_stmt) == SQLITE_ROW ) { - // get next waiting job - job.m_job_id = 0; - stmt = mrsqlite3_prepare_v2_(mailbox->m_sql, - "SELECT id, action, foreign_id, param FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, id LIMIT 1;"); - sqlite3_bind_int64(stmt, 1, thread); - sqlite3_bind_int64(stmt, 2, time(NULL)); - if( sqlite3_step(stmt) == SQLITE_ROW ) { - job.m_job_id = sqlite3_column_int (stmt, 0); - job.m_action = sqlite3_column_int (stmt, 1); - job.m_foreign_id = sqlite3_column_int (stmt, 2); - mrparam_set_packed(job.m_param, (char*)sqlite3_column_text(stmt, 3)); - } - sqlite3_finalize(stmt); + job.m_job_id = sqlite3_column_int (select_stmt, 0); + job.m_action = sqlite3_column_int (select_stmt, 1); + job.m_foreign_id = sqlite3_column_int (select_stmt, 2); + mrparam_set_packed(job.m_param, (char*)sqlite3_column_text(select_stmt, 3)); - if( job.m_job_id == 0 ) { - break; - } - - // execute job mrmailbox_log_info(mailbox, 0, "Executing job #%i, action %i...", (int)job.m_job_id, (int)job.m_action); job.m_start_again_at = 0; switch( job.m_action ) { @@ -638,29 +628,29 @@ static void mrjob_perform(mrmailbox_t* mailbox, int thread) case MRJ_CONFIGURE_IMAP: mrjob_do_MRJ_CONFIGURE_IMAP (mailbox, &job); break; } - // delete job or execute job later again if( job.m_start_again_at ) { - stmt = mrsqlite3_prepare_v2_(mailbox->m_sql, + sqlite3_stmt* update_stmt = mrsqlite3_prepare_v2_(mailbox->m_sql, "UPDATE jobs SET desired_timestamp=?, param=? WHERE id=?;"); - sqlite3_bind_int64(stmt, 1, job.m_start_again_at); - sqlite3_bind_text (stmt, 2, job.m_param->m_packed, -1, SQLITE_STATIC); - sqlite3_bind_int (stmt, 3, job.m_job_id); - sqlite3_step(stmt); - sqlite3_finalize(stmt); + sqlite3_bind_int64(update_stmt, 1, job.m_start_again_at); + sqlite3_bind_text (update_stmt, 2, job.m_param->m_packed, -1, SQLITE_STATIC); + sqlite3_bind_int (update_stmt, 3, job.m_job_id); + sqlite3_step(update_stmt); + sqlite3_finalize(update_stmt); mrmailbox_log_info(mailbox, 0, "Job #%i delayed for %i seconds", (int)job.m_job_id, (int)(job.m_start_again_at-time(NULL))); } else { - stmt = mrsqlite3_prepare_v2_(mailbox->m_sql, + sqlite3_stmt* delete_stmt = mrsqlite3_prepare_v2_(mailbox->m_sql, "DELETE FROM jobs WHERE id=?;"); - sqlite3_bind_int(stmt, 1, job.m_job_id); - sqlite3_step(stmt); - sqlite3_finalize(stmt); + sqlite3_bind_int(delete_stmt, 1, job.m_job_id); + sqlite3_step(delete_stmt); + sqlite3_finalize(delete_stmt); mrmailbox_log_info(mailbox, 0, "Job #%i done and deleted from database", (int)job.m_job_id); } } cleanup: mrparam_unref(job.m_param); + if( select_stmt ) { sqlite3_finalize(select_stmt); } }