1
0
Fork 0
mirror of https://github.com/deltachat/deltachat-core.git synced 2025-10-06 03:50:08 +02:00

Merge pull request #417 from deltachat/backoff

use exponential backoff
This commit is contained in:
holger krekel 2018-11-12 21:52:03 +01:00 committed by GitHub
commit 502e44ab17
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 165 additions and 38 deletions

View file

@ -411,7 +411,7 @@ char* dc_cmdline(dc_context_t* context, const char* cmdline)
"configure\n"
"connect\n"
"disconnect\n"
"poll\n"
"maybenetwork\n"
"help imex (Import/Export)\n"
"==============================Chat commands==\n"
"listchats [<query>]\n"
@ -639,6 +639,11 @@ char* dc_cmdline(dc_context_t* context, const char* cmdline)
ret = COMMAND_FAILED;
}
}
else if (strcmp(cmd, "maybenetwork")==0)
{
dc_maybe_network(context);
ret = COMMAND_SUCCEEDED;
}
/*******************************************************************************
* Chat commands

View file

@ -35,6 +35,10 @@ endif
# pthreads is not a real dependency
pthreads = dependency('threads')
# declare option to link against libm
cc = meson.get_compiler('c')
math = cc.find_library('m')
# zlib should move grow static-pic-lib support and be handled like
# this as well.
zlib = dependency('zlib', fallback: ['zlib', 'zlib_dep'])

View file

@ -50,6 +50,7 @@ struct _dc_context
dc_imap_t* imap; /**< Internal IMAP object, never NULL */
pthread_mutex_t imapidle_condmutex;
int perform_imap_jobs_needed;
int probe_imap_network; /**< if this flag is set, the imap-job timeouts are bypassed and messages are sent until they fail */
dc_smtp_t* smtp; /**< Internal SMTP object, never NULL */
pthread_cond_t smtpidle_cond;
@ -60,6 +61,7 @@ struct _dc_context
#define DC_JOBS_NEEDED_AT_ONCE 1
#define DC_JOBS_NEEDED_AVOID_DOS 2
int perform_smtp_jobs_needed;
int probe_smtp_network; /**< if this flag is set, the smtp-job timeouts are bypassed and messages are sent until they fail */
dc_callback_t cb; /**< Internal */

View file

@ -1,5 +1,6 @@
#include <stdarg.h>
#include <unistd.h>
#include <math.h>
#include "dc_context.h"
#include "dc_loginparam.h"
#include "dc_job.h"
@ -409,6 +410,48 @@ static void dc_suspend_smtp_thread(dc_context_t* context, int suspend)
******************************************************************************/
static time_t get_backoff_time_offset(int c_tries)
{
#define MULTIPLY 60
#define JOB_RETRIES 16 // results in ~3 weeks for the last backoff timespan
int N = (int)pow((double)2, c_tries) - 1;
int r = rand() % (N+1);
time_t seconds = r * MULTIPLY;
if (seconds<1) {
seconds = 1;
}
return seconds;
}
static time_t get_next_wakeup_time(dc_context_t* context, int thread)
{
time_t wakeup_time = 0;
sqlite3_stmt* stmt = NULL;
stmt = dc_sqlite3_prepare(context->sql,
"SELECT MIN(desired_timestamp)"
" FROM jobs"
" WHERE thread=?;");
sqlite3_bind_int(stmt, 1, thread);
if (sqlite3_step(stmt)==SQLITE_ROW) {
wakeup_time = sqlite3_column_int(stmt, 0);
}
if (wakeup_time==0) {
wakeup_time = time(NULL) + 10*60;
}
sqlite3_finalize(stmt);
return wakeup_time;
}
void dc_job_add(dc_context_t* context, int action, int foreign_id, const char* param, int delay_seconds)
{
time_t timestamp = time(NULL);
@ -432,7 +475,7 @@ void dc_job_add(dc_context_t* context, int action, int foreign_id, const char* p
sqlite3_bind_int (stmt, 3, action);
sqlite3_bind_int (stmt, 4, foreign_id);
sqlite3_bind_text (stmt, 5, param? param : "", -1, SQLITE_STATIC);
sqlite3_bind_int64(stmt, 6, delay_seconds>0? (timestamp+delay_seconds) : 0);
sqlite3_bind_int64(stmt, 6, timestamp+delay_seconds);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
@ -447,12 +490,16 @@ void dc_job_add(dc_context_t* context, int action, int foreign_id, const char* p
static void dc_job_update(dc_context_t* context, const dc_job_t* job)
{
sqlite3_stmt* update_stmt = dc_sqlite3_prepare(context->sql,
"UPDATE jobs SET desired_timestamp=0, param=? WHERE id=?;");
sqlite3_bind_text (update_stmt, 1, job->param->packed, -1, SQLITE_STATIC);
sqlite3_bind_int (update_stmt, 2, job->job_id);
sqlite3_step(update_stmt);
sqlite3_finalize(update_stmt);
sqlite3_stmt* stmt = dc_sqlite3_prepare(context->sql,
"UPDATE jobs"
" SET desired_timestamp=?, tries=?, param=?"
" WHERE id=?;");
sqlite3_bind_int64(stmt, 1, job->desired_timestamp);
sqlite3_bind_int64(stmt, 2, job->tries);
sqlite3_bind_text (stmt, 3, job->param->packed, -1, SQLITE_STATIC);
sqlite3_bind_int (stmt, 4, job->job_id);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
}
@ -494,7 +541,7 @@ void dc_job_kill_actions(dc_context_t* context, int action1, int action2)
}
static void dc_job_perform(dc_context_t* context, int thread)
static void dc_job_perform(dc_context_t* context, int thread, int probe_network)
{
sqlite3_stmt* select_stmt = NULL;
dc_job_t job;
@ -508,16 +555,37 @@ static void dc_job_perform(dc_context_t* context, int thread)
goto cleanup;
}
select_stmt = dc_sqlite3_prepare(context->sql,
"SELECT id, action, foreign_id, param FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp;");
sqlite3_bind_int64(select_stmt, 1, thread);
sqlite3_bind_int64(select_stmt, 2, time(NULL));
if (probe_network==0) {
// processing for first-try and after backoff-timeouts:
// process jobs in the order they were added.
#define FIELDS "id, action, foreign_id, param, added_timestamp, desired_timestamp, tries"
select_stmt = dc_sqlite3_prepare(context->sql,
"SELECT " FIELDS " FROM jobs"
" WHERE thread=? AND desired_timestamp<=?"
" ORDER BY action DESC, added_timestamp;");
sqlite3_bind_int64(select_stmt, 1, thread);
sqlite3_bind_int64(select_stmt, 2, time(NULL));
}
else {
// processing after call to dc_maybe_network():
// process _all_ pending jobs that failed before
// in the order of their backoff-times.
select_stmt = dc_sqlite3_prepare(context->sql,
"SELECT " FIELDS " FROM jobs"
" WHERE thread=? AND tries>0"
" ORDER BY desired_timestamp, action DESC;");
sqlite3_bind_int64(select_stmt, 1, thread);
}
while (sqlite3_step(select_stmt)==SQLITE_ROW)
{
job.job_id = sqlite3_column_int (select_stmt, 0);
job.action = sqlite3_column_int (select_stmt, 1);
job.foreign_id = sqlite3_column_int (select_stmt, 2);
dc_param_set_packed(job.param, (char*)sqlite3_column_text(select_stmt, 3));
job.job_id = sqlite3_column_int (select_stmt, 0);
job.action = sqlite3_column_int (select_stmt, 1);
job.foreign_id = sqlite3_column_int (select_stmt, 2);
dc_param_set_packed(job.param, (char*)sqlite3_column_text (select_stmt, 3));
job.added_timestamp = sqlite3_column_int64(select_stmt, 4);
job.desired_timestamp = sqlite3_column_int64(select_stmt, 5);
job.tries = sqlite3_column_int (select_stmt, 6);
dc_log_info(context, 0, "%s-job #%i, action %i started...", THREAD_STR, (int)job.job_id, (int)job.action);
@ -562,20 +630,19 @@ static void dc_job_perform(dc_context_t* context, int thread)
}
else if (job.try_again==DC_AT_ONCE || job.try_again==DC_STANDARD_DELAY)
{
// Define the number of job-retries, each retry may result in 2 tries (for fast network-failure-recover).
// The first job-retries are done asap, the last retry is delayed about a minute.
// Network errors do not count as failed tries.
#define JOB_RETRIES 3
int tries = job.tries + 1;
int is_online = dc_is_online(context)? 1 : 0;
int tries_while_online = dc_param_get_int(job.param, DC_PARAM_TIMES, 0) + is_online;
if( tries < JOB_RETRIES ) {
job.tries = tries;
time_t time_offset = get_backoff_time_offset(tries);
job.desired_timestamp = job.added_timestamp + time_offset;
if( tries_while_online < JOB_RETRIES ) {
dc_param_set_int(job.param, DC_PARAM_TIMES, tries_while_online);
dc_job_update(context, &job);
dc_log_info(context, 0, "%s-job #%i not succeeded on try #%i.", THREAD_STR, (int)job.job_id, tries_while_online);
dc_log_info(context, 0, "%s-job #%i not succeeded on try #%i, retry in ADD_TIME+%i (in %i seconds).", THREAD_STR, (int)job.job_id,
tries, time_offset, (job.added_timestamp+time_offset)-time(NULL));
if (thread==DC_SMTP_THREAD && is_online && tries_while_online<(JOB_RETRIES-1)) {
if (thread==DC_SMTP_THREAD && tries<(JOB_RETRIES-1)) {
pthread_mutex_lock(&context->smtpidle_condmutex);
context->perform_smtp_jobs_needed = DC_JOBS_NEEDED_AVOID_DOS;
pthread_mutex_unlock(&context->smtpidle_condmutex);
@ -587,6 +654,14 @@ static void dc_job_perform(dc_context_t* context, int thread)
}
dc_job_delete(context, &job);
}
if (probe_network) {
// on dc_maybe_network() we stop trying here;
// these jobs are already tried once.
// otherwise, we just continue with the next job
// to give other jobs a chance being tried at least once.
goto cleanup;
}
}
else
{
@ -622,10 +697,13 @@ void dc_perform_imap_jobs(dc_context_t* context)
dc_log_info(context, 0, "IMAP-jobs started...");
pthread_mutex_lock(&context->imapidle_condmutex);
int probe_imap_network = context->probe_imap_network;
context->probe_imap_network = 0;
context->perform_imap_jobs_needed = 0;
pthread_mutex_unlock(&context->imapidle_condmutex);
dc_job_perform(context, DC_IMAP_THREAD);
dc_job_perform(context, DC_IMAP_THREAD, probe_imap_network);
dc_log_info(context, 0, "IMAP-jobs ended.");
}
@ -779,6 +857,9 @@ void dc_interrupt_imap_idle(dc_context_t* context)
void dc_perform_smtp_jobs(dc_context_t* context)
{
pthread_mutex_lock(&context->smtpidle_condmutex);
int probe_smtp_network = context->probe_smtp_network;
context->probe_smtp_network = 0;
context->perform_smtp_jobs_needed = 0;
if (context->smtp_suspended) {
dc_log_info(context, 0, "SMTP-jobs suspended.");
@ -789,7 +870,7 @@ void dc_perform_smtp_jobs(dc_context_t* context)
pthread_mutex_unlock(&context->smtpidle_condmutex);
dc_log_info(context, 0, "SMTP-jobs started...");
dc_job_perform(context, DC_SMTP_THREAD);
dc_job_perform(context, DC_SMTP_THREAD, probe_smtp_network);
dc_log_info(context, 0, "SMTP-jobs ended.");
pthread_mutex_lock(&context->smtpidle_condmutex);
@ -829,7 +910,7 @@ void dc_perform_smtp_idle(dc_context_t* context)
int r = 0;
struct timespec wakeup_at;
memset(&wakeup_at, 0, sizeof(wakeup_at));
wakeup_at.tv_sec = time(NULL) + ((context->perform_smtp_jobs_needed==DC_JOBS_NEEDED_AVOID_DOS)? 2 : DC_SMTP_IDLE_SEC);
wakeup_at.tv_sec = get_next_wakeup_time(context, DC_SMTP_THREAD)+1;
while (context->smtpidle_condflag==0 && r==0) {
r = pthread_cond_timedwait(&context->smtpidle_cond, &context->smtpidle_condmutex, &wakeup_at); // unlock mutex -> wait -> lock mutex
}
@ -899,3 +980,31 @@ void dc_interrupt_smtp_idle(dc_context_t* context)
pthread_mutex_unlock(&context->smtpidle_condmutex);
}
/**
* This function can be called whenever there is a hint
* that the network is available again.
* The library will try to send pending messages out.
*
* @memberof dc_context_t
* @param context The context as created by dc_context_new().
* @return None.
*/
void dc_maybe_network(dc_context_t* context)
{
// the following flags are forwarded to dc_job_perform() and make sure,
// sending is tried independingly of retry-count or timeouts.
// if the first messages comes through, the others are be retried as well.
pthread_mutex_lock(&context->smtpidle_condmutex);
context->probe_smtp_network = 1;
pthread_mutex_unlock(&context->smtpidle_condmutex);
pthread_mutex_lock(&context->imapidle_condmutex);
context->probe_imap_network = 1;
pthread_mutex_unlock(&context->imapidle_condmutex);
dc_interrupt_smtp_idle(context);
dc_interrupt_imap_idle(context);
}

View file

@ -30,12 +30,6 @@ extern "C" {
#define DC_SMTP_TIMEOUT_SEC 10
// this is the timeout after which dc_perform_smtp_idle() returns at latest.
// this timeout should not be too large as this might be the only option to perform
// jobs that failed on the first execution.
#define DC_SMTP_IDLE_SEC 60
/**
* Library-internal.
*/
@ -46,6 +40,9 @@ typedef struct dc_job_t
uint32_t job_id;
int action;
uint32_t foreign_id;
time_t desired_timestamp;
time_t added_timestamp;
int tries;
dc_param_t* param;
int try_again;

View file

@ -40,7 +40,6 @@ typedef struct dc_param_t
#define DC_PARAM_SERVER_FOLDER 'Z' /* for jobs */
#define DC_PARAM_SERVER_UID 'z' /* for jobs */
#define DC_PARAM_TIMES 't' /* for jobs: times a job was tried */
#define DC_PARAM_UNPROMOTED 'U' /* for groups */
#define DC_PARAM_PROFILE_IMAGE 'i' /* for groups and contacts */

View file

@ -484,6 +484,15 @@ int dc_sqlite3_open(dc_sqlite3_t* sql, const char* dbfile, int flags)
}
#undef NEW_DB_VERSION
#define NEW_DB_VERSION 47
if (dbversion < NEW_DB_VERSION)
{
dc_sqlite3_execute(sql, "ALTER TABLE jobs ADD COLUMN tries INTEGER DEFAULT 0;");
dbversion = NEW_DB_VERSION;
dc_sqlite3_set_config_int(sql, "dbversion", NEW_DB_VERSION);
}
#undef NEW_DB_VERSION
// (2) updates that require high-level objects
// (the structure is complete now and all objects are usable)

View file

@ -227,6 +227,8 @@ void dc_perform_smtp_jobs (dc_context_t*);
void dc_perform_smtp_idle (dc_context_t*);
void dc_interrupt_smtp_idle (dc_context_t*);
void dc_maybe_network (dc_context_t*);
// handle chatlists
#define DC_GCL_ARCHIVED_ONLY 0x01

View file

@ -39,7 +39,7 @@ lib_src = [
'dc_tools.c',
]
lib_deps = [pthreads, zlib, openssl, sasl, sqlite, etpan, netpgp]
lib_deps = [pthreads, zlib, openssl, sasl, sqlite, etpan, netpgp, math]
lib_inc = include_directories('.')
lib = library(
'deltachat', lib_src,