mirror of
https://github.com/deltachat/deltachat-core.git
synced 2025-10-04 18:29:19 +02:00
70 lines
1.9 KiB
Python
70 lines
1.9 KiB
Python
import pytest
|
|
|
|
import deltachat
|
|
from deltachat import capi
|
|
import threading
|
|
|
|
|
|
@pytest.fixture
|
|
def register_dc_callback(monkeypatch):
|
|
"""Register a callback for a given context.
|
|
|
|
This is a function-scoped fixture and the function will be
|
|
unregisterd automatically on fixture teardown.
|
|
"""
|
|
def register_dc_callback(ctx, func):
|
|
monkeypatch.setitem(deltachat._DC_CALLBACK_MAP, ctx, func)
|
|
return register_dc_callback
|
|
|
|
|
|
def pytest_addoption(parser):
|
|
parser.addoption("--user", action="store", default=None,
|
|
help="user and domain of test account: example user@example.org")
|
|
parser.addoption("--password", action="store", default=None)
|
|
|
|
|
|
@pytest.fixture
|
|
def userpassword(pytestconfig):
|
|
user = pytestconfig.getoption("--user")
|
|
passwd = pytestconfig.getoption("--password")
|
|
if user and passwd:
|
|
return user, passwd
|
|
pytest.skip("specify a test account with --user and --password options")
|
|
|
|
|
|
|
|
def imap_thread(context, quitflag):
|
|
print ("starting imap thread")
|
|
while not quitflag.is_set():
|
|
capi.lib.dc_perform_imap_jobs(context)
|
|
capi.lib.dc_perform_imap_fetch(context)
|
|
capi.lib.dc_perform_imap_idle(context)
|
|
|
|
|
|
def smtp_thread(context, quitflag):
|
|
print ("starting smtp thread")
|
|
while not quitflag.is_set():
|
|
capi.lib.dc_perform_smtp_jobs(context)
|
|
capi.lib.dc_perform_smtp_idle(context)
|
|
|
|
|
|
@pytest.fixture
|
|
def dc_context():
|
|
ctx = capi.lib.dc_context_new(capi.lib.py_dc_callback,
|
|
capi.ffi.NULL, capi.ffi.NULL)
|
|
yield ctx
|
|
capi.lib.dc_close(ctx)
|
|
|
|
|
|
@pytest.fixture
|
|
def dc_threads(dc_context):
|
|
quitflag = threading.Event()
|
|
t1 = threading.Thread(target=imap_thread, name="imap", args=[dc_context, quitflag])
|
|
t1.setDaemon(1)
|
|
t1.start()
|
|
t2 = threading.Thread(target=smtp_thread, name="smtp", args=[dc_context, quitflag])
|
|
t2.setDaemon(1)
|
|
t2.start()
|
|
yield
|
|
quitflag.set()
|
|
|