mirror of
https://code.eliotberriot.com/funkwhale/funkwhale.git
synced 2025-10-04 11:09:16 +02:00
See #170: fetching remote objects
This commit is contained in:
parent
65097f6297
commit
c2eeee5eb1
30 changed files with 1173 additions and 169 deletions
|
@ -14,6 +14,7 @@ from requests.exceptions import RequestException
|
|||
from funkwhale_api.common import preferences
|
||||
from funkwhale_api.common import session
|
||||
from funkwhale_api.common import utils as common_utils
|
||||
from funkwhale_api.moderation import mrf
|
||||
from funkwhale_api.music import models as music_models
|
||||
from funkwhale_api.taskapp import celery
|
||||
|
||||
|
@ -24,6 +25,7 @@ from . import models, signing
|
|||
from . import serializers
|
||||
from . import routes
|
||||
from . import utils
|
||||
from . import webfinger
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -285,24 +287,45 @@ def rotate_actor_key(actor):
|
|||
@celery.app.task(name="federation.fetch")
|
||||
@transaction.atomic
|
||||
@celery.require_instance(
|
||||
models.Fetch.objects.filter(status="pending").select_related("actor"), "fetch"
|
||||
models.Fetch.objects.filter(status="pending").select_related("actor"),
|
||||
"fetch_obj",
|
||||
"fetch_id",
|
||||
)
|
||||
def fetch(fetch):
|
||||
actor = fetch.actor
|
||||
auth = signing.get_auth(actor.private_key, actor.private_key_id)
|
||||
|
||||
def fetch(fetch_obj):
|
||||
def error(code, **kwargs):
|
||||
fetch.status = "errored"
|
||||
fetch.fetch_date = timezone.now()
|
||||
fetch.detail = {"error_code": code}
|
||||
fetch.detail.update(kwargs)
|
||||
fetch.save(update_fields=["fetch_date", "status", "detail"])
|
||||
fetch_obj.status = "errored"
|
||||
fetch_obj.fetch_date = timezone.now()
|
||||
fetch_obj.detail = {"error_code": code}
|
||||
fetch_obj.detail.update(kwargs)
|
||||
fetch_obj.save(update_fields=["fetch_date", "status", "detail"])
|
||||
|
||||
url = fetch_obj.url
|
||||
mrf_check_url = url
|
||||
if not mrf_check_url.startswith("webfinger://"):
|
||||
payload, updated = mrf.inbox.apply({"id": mrf_check_url})
|
||||
if not payload:
|
||||
return error("blocked", message="Blocked by MRF")
|
||||
|
||||
actor = fetch_obj.actor
|
||||
if settings.FEDERATION_AUTHENTIFY_FETCHES:
|
||||
auth = signing.get_auth(actor.private_key, actor.private_key_id)
|
||||
else:
|
||||
auth = None
|
||||
try:
|
||||
if url.startswith("webfinger://"):
|
||||
# we first grab the correpsonding webfinger representation
|
||||
# to get the ActivityPub actor ID
|
||||
webfinger_data = webfinger.get_resource(
|
||||
"acct:" + url.replace("webfinger://", "")
|
||||
)
|
||||
url = webfinger.get_ap_url(webfinger_data["links"])
|
||||
if not url:
|
||||
return error("webfinger", message="Invalid or missing webfinger data")
|
||||
payload, updated = mrf.inbox.apply({"id": url})
|
||||
if not payload:
|
||||
return error("blocked", message="Blocked by MRF")
|
||||
response = session.get_session().get(
|
||||
auth=auth,
|
||||
url=fetch.url,
|
||||
headers={"Content-Type": "application/activity+json"},
|
||||
auth=auth, url=url, headers={"Accept": "application/activity+json"},
|
||||
)
|
||||
logger.debug("Remote answered with %s", response.status_code)
|
||||
response.raise_for_status()
|
||||
|
@ -320,8 +343,19 @@ def fetch(fetch):
|
|||
try:
|
||||
payload = response.json()
|
||||
except json.decoder.JSONDecodeError:
|
||||
# we attempt to extract a <link rel=alternate> that points
|
||||
# to an activity pub resource, if possible, and retry with this URL
|
||||
alternate_url = utils.find_alternate(response.text)
|
||||
if alternate_url:
|
||||
fetch_obj.url = alternate_url
|
||||
fetch_obj.save(update_fields=["url"])
|
||||
return fetch(fetch_id=fetch_obj.pk)
|
||||
return error("invalid_json")
|
||||
|
||||
payload, updated = mrf.inbox.apply(payload)
|
||||
if not payload:
|
||||
return error("blocked", message="Blocked by MRF")
|
||||
|
||||
try:
|
||||
doc = jsonld.expand(payload)
|
||||
except ValueError:
|
||||
|
@ -332,13 +366,13 @@ def fetch(fetch):
|
|||
except IndexError:
|
||||
return error("missing_jsonld_type")
|
||||
try:
|
||||
serializer_class = fetch.serializers[type]
|
||||
serializer_class = fetch_obj.serializers[type]
|
||||
model = serializer_class.Meta.model
|
||||
except (KeyError, AttributeError):
|
||||
fetch.status = "skipped"
|
||||
fetch.fetch_date = timezone.now()
|
||||
fetch.detail = {"reason": "unhandled_type", "type": type}
|
||||
return fetch.save(update_fields=["fetch_date", "status", "detail"])
|
||||
fetch_obj.status = "skipped"
|
||||
fetch_obj.fetch_date = timezone.now()
|
||||
fetch_obj.detail = {"reason": "unhandled_type", "type": type}
|
||||
return fetch_obj.save(update_fields=["fetch_date", "status", "detail"])
|
||||
try:
|
||||
id = doc.get("@id")
|
||||
except IndexError:
|
||||
|
@ -350,11 +384,14 @@ def fetch(fetch):
|
|||
if not serializer.is_valid():
|
||||
return error("validation", validation_errors=serializer.errors)
|
||||
try:
|
||||
serializer.save()
|
||||
obj = serializer.save()
|
||||
except Exception as e:
|
||||
error("save", message=str(e))
|
||||
raise
|
||||
|
||||
fetch.status = "finished"
|
||||
fetch.fetch_date = timezone.now()
|
||||
return fetch.save(update_fields=["fetch_date", "status"])
|
||||
fetch_obj.object = obj
|
||||
fetch_obj.status = "finished"
|
||||
fetch_obj.fetch_date = timezone.now()
|
||||
return fetch_obj.save(
|
||||
update_fields=["fetch_date", "status", "object_id", "object_content_type"]
|
||||
)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue