1
0
Fork 0
mirror of https://github.com/deltachat/deltachat-core.git synced 2025-10-05 10:39:27 +02:00

improve docs and structure

see online snapshot: https://m.devpi.net/hpk/dev/deltachat/0.5.dev0/+doc/index.html
This commit is contained in:
holger krekel 2018-09-14 18:10:01 +02:00
parent 099306408e
commit 8683de19e3
11 changed files with 253 additions and 213 deletions

View file

@ -1,4 +1,4 @@
""" Delta.Chat high level API objects. """
""" Delta.Chat Account class. """
from __future__ import print_function
import threading
@ -12,192 +12,11 @@ except ImportError:
import deltachat
from . import capi
from .cutil import convert_to_bytes_utf8, ffi_unicode, iter_array_and_unref
from .capi import ffi, lib
from .types import cached_property, property_with_doc
import attr
from attr import validators as v
@attr.s
class EventHandler(object):
dc_context = attr.ib(validator=v.instance_of(ffi.CData))
def read_url(self, url):
try:
r = requests.get(url)
except requests.ConnectionError:
return ''
else:
return r.content
def dc_event_http_get(self, data1, data2):
url = data1
content = self.read_url(url)
if not isinstance(content, bytes):
content = content.encode("utf8")
# we need to return a fresh pointer that the core owns
return capi.lib.dupstring_helper(content)
def dc_event_is_offline(self, data1, data2):
return 0 # always online
class EventLogger:
def __init__(self, dc_context, logid=None, debug=True):
self.dc_context = dc_context
self._event_queue = Queue()
self._debug = debug
if logid is None:
logid = str(self.dc_context).strip(">").split()[-1]
self.logid = logid
self._timeout = None
def __call__(self, evt_name, data1, data2):
self._log_event(evt_name, data1, data2)
self._event_queue.put((evt_name, data1, data2))
def set_timeout(self, timeout):
self._timeout = timeout
def get(self, timeout=None, check_error=True):
timeout = timeout or self._timeout
ev = self._event_queue.get(timeout=timeout)
if check_error and ev[0] == "DC_EVENT_ERROR":
raise ValueError("{}({!r},{!r})".format(*ev))
return ev
def get_matching(self, event_name_regex):
rex = re.compile("(?:{}).*".format(event_name_regex))
while 1:
ev = self.get()
if rex.match(ev[0]):
return ev
def _log_event(self, evt_name, data1, data2):
if self._debug:
t = threading.currentThread()
tname = getattr(t, "name", t)
print("[{}-{}] {}({!r},{!r})".format(
tname, self.logid, evt_name, data1, data2))
@attr.s
class Contact(object):
""" Delta-Chat Contact. You obtain instances of it through the :class:`Account`.
:ivar id: integer id of this chat.
"""
dc_context = attr.ib(validator=v.instance_of(ffi.CData))
id = attr.ib(validator=v.instance_of(int))
@cached_property # only get it once because we only free it once
def dc_contact_t(self):
return capi.lib.dc_get_contact(self.dc_context, self.id)
def __del__(self):
if lib is not None and hasattr(self, "_property_cache"):
lib.dc_contact_unref(self.dc_contact_t)
@property_with_doc
def addr(self):
""" normalized e-mail address for this account. """
return ffi_unicode(capi.lib.dc_contact_get_addr(self.dc_contact_t))
@property_with_doc
def display_name(self):
""" display name for this contact. """
return ffi_unicode(capi.lib.dc_contact_get_display_name(self.dc_contact_t))
def is_blocked(self):
""" Return True if the contact is blocked. """
return capi.lib.dc_contact_is_blocked(self.dc_contact_t)
def is_verified(self):
""" Return True if the contact is verified. """
return capi.lib.dc_contact_is_verified(self.dc_contact_t)
@attr.s
class Chat(object):
""" Chat object which manages members and through which you can send and retrieve messages.
"""
dc_context = attr.ib(validator=v.instance_of(ffi.CData))
id = attr.ib(validator=v.instance_of(int))
@cached_property
def dc_chat_t(self):
return capi.lib.dc_get_chat(self.dc_context, self.id)
def __del__(self):
if lib is not None and hasattr(self, "_property_cache"):
lib.dc_chat_unref(self.dc_chat_t)
def is_deaddrop(self):
""" return true if this chat is a deaddrop chat. """
return self.id == lib.DC_CHAT_ID_DEADDROP
def send_text_message(self, msg):
""" send a text message and return the resulting Message instance.
:param msg: unicode text
:returns: the resulting :class:`Message` instance
"""
msg = convert_to_bytes_utf8(msg)
print ("chat id", self.id)
msg_id = capi.lib.dc_send_text_msg(self.dc_context, self.id, msg)
return Message(self.dc_context, msg_id)
def get_messages(self):
""" return list of messages in this chat.
:returns: list of :class:`Message` objects for this chat.
"""
dc_array_t = lib.dc_get_chat_msgs(self.dc_context, self.id, 0, 0)
return list(iter_array_and_unref(dc_array_t, lambda x: Message(self.dc_context, x)))
def count_fresh_messages(self):
""" return number of fresh messages in this chat.
:returns: number of fresh messages
"""
return lib.dc_get_fresh_msg_cnt(self.dc_context, self.id)
def mark_noticed(self):
""" mark all messages in this chat as noticed.
Noticed messages are no longer fresh.
"""
return lib.dc_marknoticed_chat(self.dc_context, self.id)
@attr.s
class Message(object):
""" Message object. """
dc_context = attr.ib(validator=v.instance_of(ffi.CData))
id = attr.ib(validator=v.instance_of(int))
@cached_property
def dc_msg_t(self):
return capi.lib.dc_get_msg(self.dc_context, self.id)
def __del__(self):
if lib is not None and hasattr(self, "_property_cache"):
lib.dc_msg_unref(self.dc_msg_t)
@property_with_doc
def text(self):
"""unicode representation. """
return ffi_unicode(capi.lib.dc_msg_get_text(self.dc_msg_t))
@property
def chat(self):
"""chat this message was posted in.
:returns: :class:`Chat` object
"""
chat_id = capi.lib.dc_msg_get_chat_id(self.dc_msg_t)
return Chat(self.dc_context, chat_id)
from .chatting import Contact, Chat, Message
class Account(object):
@ -402,21 +221,64 @@ class IOThreads:
capi.lib.dc_perform_smtp_idle(self.dc_context)
def convert_to_bytes_utf8(obj):
if obj == ffi.NULL:
return obj
if not isinstance(obj, bytes):
return obj.encode("utf8")
return obj
@attr.s
class EventHandler(object):
dc_context = attr.ib(validator=v.instance_of(ffi.CData))
def read_url(self, url):
try:
r = requests.get(url)
except requests.ConnectionError:
return ''
else:
return r.content
def dc_event_http_get(self, data1, data2):
url = data1
content = self.read_url(url)
if not isinstance(content, bytes):
content = content.encode("utf8")
# we need to return a fresh pointer that the core owns
return capi.lib.dupstring_helper(content)
def dc_event_is_offline(self, data1, data2):
return 0 # always online
def iter_array_and_unref(dc_array_t, constructor):
try:
for i in range(0, lib.dc_array_get_cnt(dc_array_t)):
yield constructor(lib.dc_array_get_id(dc_array_t, i))
finally:
lib.dc_array_unref(dc_array_t)
class EventLogger:
def __init__(self, dc_context, logid=None, debug=True):
self.dc_context = dc_context
self._event_queue = Queue()
self._debug = debug
if logid is None:
logid = str(self.dc_context).strip(">").split()[-1]
self.logid = logid
self._timeout = None
def __call__(self, evt_name, data1, data2):
self._log_event(evt_name, data1, data2)
self._event_queue.put((evt_name, data1, data2))
def ffi_unicode(obj):
return ffi.string(obj).decode("utf8")
def set_timeout(self, timeout):
self._timeout = timeout
def get(self, timeout=None, check_error=True):
timeout = timeout or self._timeout
ev = self._event_queue.get(timeout=timeout)
if check_error and ev[0] == "DC_EVENT_ERROR":
raise ValueError("{}({!r},{!r})".format(*ev))
return ev
def get_matching(self, event_name_regex):
rex = re.compile("(?:{}).*".format(event_name_regex))
while 1:
ev = self.get()
if rex.match(ev[0]):
return ev
def _log_event(self, evt_name, data1, data2):
if self._debug:
t = threading.currentThread()
tname = getattr(t, "name", t)
print("[{}-{}] {}({!r},{!r})".format(
tname, self.logid, evt_name, data1, data2))