1
0
Fork 0
mirror of https://github.com/deltachat/deltachat-core.git synced 2025-10-05 19:42:04 +02:00

allow jobs for different threads

This commit is contained in:
B. Petersen 2018-06-12 23:18:59 +02:00
parent 40dbd099ef
commit aafb6b5915
5 changed files with 80 additions and 55 deletions

View file

@ -25,12 +25,7 @@
#include "mrosnative.h"
/*******************************************************************************
* The job thread
******************************************************************************/
void mrjob_perform(mrmailbox_t* mailbox)
void mrjob_perform(mrmailbox_t* mailbox, int thread)
{
sqlite3_stmt* stmt;
mrjob_t job;
@ -42,15 +37,17 @@ void mrjob_perform(mrmailbox_t* mailbox)
{
/* get next waiting job */
job.m_job_id = 0;
stmt = mrsqlite3_predefine__(mailbox->m_sql, SELECT_iafp_FROM_jobs,
"SELECT id, action, foreign_id, param FROM jobs WHERE desired_timestamp<=? ORDER BY action DESC, id LIMIT 1;");
sqlite3_bind_int64(stmt, 1, time(NULL));
if( sqlite3_step(stmt) == SQLITE_ROW ) {
job.m_job_id = sqlite3_column_int (stmt, 0);
job.m_action = sqlite3_column_int (stmt, 1);
job.m_foreign_id = sqlite3_column_int (stmt, 2);
mrparam_set_packed(job.m_param, (char*)sqlite3_column_text(stmt, 3));
}
stmt = mrsqlite3_prepare_v2_(mailbox->m_sql,
"SELECT id, action, foreign_id, param FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, id LIMIT 1;");
sqlite3_bind_int64(stmt, 1, thread);
sqlite3_bind_int64(stmt, 2, time(NULL));
if( sqlite3_step(stmt) == SQLITE_ROW ) {
job.m_job_id = sqlite3_column_int (stmt, 0);
job.m_action = sqlite3_column_int (stmt, 1);
job.m_foreign_id = sqlite3_column_int (stmt, 2);
mrparam_set_packed(job.m_param, (char*)sqlite3_column_text(stmt, 3));
}
sqlite3_finalize(stmt);
if( job.m_job_id == 0 ) {
break;
@ -71,23 +68,21 @@ void mrjob_perform(mrmailbox_t* mailbox)
/* delete job or execute job later again */
if( job.m_start_again_at ) {
mrsqlite3_lock(mailbox->m_sql);
stmt = mrsqlite3_predefine__(mailbox->m_sql, UPDATE_jobs_SET_dp_WHERE_id,
"UPDATE jobs SET desired_timestamp=?, param=? WHERE id=?;");
sqlite3_bind_int64(stmt, 1, job.m_start_again_at);
sqlite3_bind_text (stmt, 2, job.m_param->m_packed, -1, SQLITE_STATIC);
sqlite3_bind_int (stmt, 3, job.m_job_id);
sqlite3_step(stmt);
mrsqlite3_unlock(mailbox->m_sql);
stmt = mrsqlite3_prepare_v2_(mailbox->m_sql,
"UPDATE jobs SET desired_timestamp=?, param=? WHERE id=?;");
sqlite3_bind_int64(stmt, 1, job.m_start_again_at);
sqlite3_bind_text (stmt, 2, job.m_param->m_packed, -1, SQLITE_STATIC);
sqlite3_bind_int (stmt, 3, job.m_job_id);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
mrmailbox_log_info(mailbox, 0, "Job #%i delayed for %i seconds", (int)job.m_job_id, (int)(job.m_start_again_at-time(NULL)));
}
else {
mrsqlite3_lock(mailbox->m_sql);
stmt = mrsqlite3_predefine__(mailbox->m_sql, DELETE_FROM_jobs_WHERE_id,
"DELETE FROM jobs WHERE id=?;");
sqlite3_bind_int(stmt, 1, job.m_job_id);
sqlite3_step(stmt);
mrsqlite3_unlock(mailbox->m_sql);
stmt = mrsqlite3_prepare_v2_(mailbox->m_sql,
"DELETE FROM jobs WHERE id=?;");
sqlite3_bind_int(stmt, 1, job.m_job_id);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
mrmailbox_log_info(mailbox, 0, "Job #%i done and deleted from database", (int)job.m_job_id);
}
}
@ -107,18 +102,29 @@ uint32_t mrjob_add__(mrmailbox_t* mailbox, int action, int foreign_id, const cha
time_t timestamp = time(NULL);
sqlite3_stmt* stmt;
uint32_t job_id = 0;
int thread;
stmt = mrsqlite3_predefine__(mailbox->m_sql, INSERT_INTO_jobs_aafp,
"INSERT INTO jobs (added_timestamp, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?);");
sqlite3_bind_int64(stmt, 1, timestamp);
sqlite3_bind_int (stmt, 2, action);
sqlite3_bind_int (stmt, 3, foreign_id);
sqlite3_bind_text (stmt, 4, param? param : "", -1, SQLITE_STATIC);
sqlite3_bind_int64(stmt, 5, delay_seconds>0? (timestamp+delay_seconds) : 0);
if( sqlite3_step(stmt) != SQLITE_DONE ) {
if( action >= MR_IMAP_THREAD && action < MR_IMAP_THREAD+1000 ) {
thread = MR_IMAP_THREAD;
}
else if( action >= MR_SMTP_THREAD && action < MR_SMTP_THREAD+1000 ) {
thread = MR_SMTP_THREAD;
}
else {
return 0;
}
stmt = mrsqlite3_prepare_v2_(mailbox->m_sql,
"INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);");
sqlite3_bind_int64(stmt, 1, timestamp);
sqlite3_bind_int (stmt, 2, thread);
sqlite3_bind_int (stmt, 3, action);
sqlite3_bind_int (stmt, 4, foreign_id);
sqlite3_bind_text (stmt, 5, param? param : "", -1, SQLITE_STATIC);
sqlite3_bind_int64(stmt, 6, delay_seconds>0? (timestamp+delay_seconds) : 0);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
job_id = sqlite3_last_insert_rowid(mailbox->m_sql->m_cobj);
mrmailbox_interrupt_idle(mailbox);
@ -169,10 +175,11 @@ void mrjob_kill_actions__(mrmailbox_t* mailbox, int action1, int action2)
return;
}
sqlite3_stmt* stmt = mrsqlite3_predefine__(mailbox->m_sql, DELETE_FROM_jobs_WHERE_action,
sqlite3_stmt* stmt = mrsqlite3_prepare_v2_(mailbox->m_sql,
"DELETE FROM jobs WHERE action=? OR action=?;");
sqlite3_bind_int(stmt, 1, action1);
sqlite3_bind_int(stmt, 2, action2);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
}

View file

@ -26,13 +26,22 @@ extern "C" {
#endif
#define MRJ_DELETE_MSG_ON_IMAP 100 /* low priority ... */
#define MRJ_MARKSEEN_MDN_ON_IMAP 102
#define MRJ_SEND_MDN 105
#define MRJ_MARKSEEN_MSG_ON_IMAP 110
// thread IDs
#define MR_IMAP_THREAD 100
#define MR_SMTP_THREAD 5000
// jobs in the IMAP-thread
#define MRJ_DELETE_MSG_ON_IMAP 110 // low priority ...
#define MRJ_MARKSEEN_MDN_ON_IMAP 120
#define MRJ_MARKSEEN_MSG_ON_IMAP 130
#define MRJ_SEND_MSG_TO_IMAP 700
#define MRJ_SEND_MSG_TO_SMTP 800 /* ... high priority*/
#define MRJ_CONFIGURE_IMAP 900
#define MRJ_CONFIGURE_IMAP 900 // ... high priority
// jobs in the SMTP-thread
#define MRJ_SEND_MDN 5010 // low priority ...
#define MRJ_SEND_MSG_TO_SMTP 5900 // ... high priority
/**
@ -50,7 +59,7 @@ typedef struct mrjob_t
time_t m_start_again_at; /* 1=on next loop, >1=on timestamp, 0=delete job (default) */
} mrjob_t;
void mrjob_perform (mrmailbox_t*);
void mrjob_perform (mrmailbox_t*, int thread);
uint32_t mrjob_add__ (mrmailbox_t*, int action, int foreign_id, const char* param, int delay); /* returns the job_id or 0 on errors. the job may or may not be done if the function returns. */
void mrjob_kill_actions__ (mrmailbox_t*, int action1, int action2); /* delete all pending jobs with the given actions */

View file

@ -27,6 +27,12 @@
#include "mrjob.h"
#include "mrimap.h"
#include "mrsmtp.h"
#include "mrmimefactory.h"
/*******************************************************************************
* High-level IMAP-thread functions
******************************************************************************/
int mrmailbox_ll_connect_to_imap(mrmailbox_t* mailbox, mrjob_t* job /*may be NULL if the function is called directly!*/)
@ -88,11 +94,11 @@ void mrmailbox_perform_jobs(mrmailbox_t* mailbox)
return;
}
mrmailbox_log_info(mailbox, 0, ">>>>> perform-jobs started.");
mrmailbox_log_info(mailbox, 0, ">>>>> perform-IMAP-jobs started.");
mrjob_perform(mailbox);
mrjob_perform(mailbox, MR_IMAP_THREAD);
mrmailbox_log_info(mailbox, 0, "<<<<< perform-jobs ended.");
mrmailbox_log_info(mailbox, 0, "<<<<< perform-IMAP-jobs ended.");
}

View file

@ -421,6 +421,16 @@ int mrsqlite3_open__(mrsqlite3_t* ths, const char* dbfile, int flags)
}
#undef NEW_DB_VERSION
#define NEW_DB_VERSION 40
if( dbversion < NEW_DB_VERSION )
{
mrsqlite3_execute__(ths, "ALTER TABLE jobs ADD COLUMN thread INTEGER DEFAULT 0;");
dbversion = NEW_DB_VERSION;
mrsqlite3_set_config_int__(ths, "dbversion", NEW_DB_VERSION);
}
#undef NEW_DB_VERSION
// (2) updates that require high-level objects (the structure is complete now and all objects are usable)
if( recalc_fingerprints )
{

View file

@ -131,13 +131,6 @@ enum
,SELECT_COUNT_FROM_msgs_mdns_WHERE_m
,DELETE_FROM_msgs_mdns_WHERE_m
,INSERT_INTO_jobs_aafp
,SELECT_MIN_d_FROM_jobs
,SELECT_iafp_FROM_jobs
,DELETE_FROM_jobs_WHERE_id
,DELETE_FROM_jobs_WHERE_action
,UPDATE_jobs_SET_dp_WHERE_id
,SELECT_FROM_leftgrps_WHERE_grpid
,INSERT_INTO_acpeerstates_a