mirror of
https://code.eliotberriot.com/funkwhale/funkwhale.git
synced 2025-10-04 15:39:16 +02:00
Logic to refetch remote entities
This commit is contained in:
parent
63b1007596
commit
cdc617be27
23 changed files with 632 additions and 9 deletions
|
@ -1,9 +1,11 @@
|
|||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import requests
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import transaction
|
||||
from django.db.models import Q, F
|
||||
from django.utils import timezone
|
||||
from dynamic_preferences.registries import global_preferences_registry
|
||||
|
@ -16,6 +18,7 @@ from funkwhale_api.music import models as music_models
|
|||
from funkwhale_api.taskapp import celery
|
||||
|
||||
from . import actors
|
||||
from . import jsonld
|
||||
from . import keys
|
||||
from . import models, signing
|
||||
from . import serializers
|
||||
|
@ -278,3 +281,83 @@ def rotate_actor_key(actor):
|
|||
actor.private_key = pair[0].decode()
|
||||
actor.public_key = pair[1].decode()
|
||||
actor.save(update_fields=["private_key", "public_key"])
|
||||
|
||||
|
||||
@celery.app.task(name="federation.fetch")
|
||||
@transaction.atomic
|
||||
@celery.require_instance(
|
||||
models.Fetch.objects.filter(status="pending").select_related("actor"), "fetch"
|
||||
)
|
||||
def fetch(fetch):
|
||||
actor = fetch.actor
|
||||
auth = signing.get_auth(actor.private_key, actor.private_key_id)
|
||||
|
||||
def error(code, **kwargs):
|
||||
fetch.status = "errored"
|
||||
fetch.fetch_date = timezone.now()
|
||||
fetch.detail = {"error_code": code}
|
||||
fetch.detail.update(kwargs)
|
||||
fetch.save(update_fields=["fetch_date", "status", "detail"])
|
||||
|
||||
try:
|
||||
response = session.get_session().get(
|
||||
auth=auth,
|
||||
url=fetch.url,
|
||||
timeout=5,
|
||||
verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL,
|
||||
headers={"Content-Type": "application/activity+json"},
|
||||
)
|
||||
logger.debug("Remote answered with %s", response.status_code)
|
||||
response.raise_for_status()
|
||||
except requests.exceptions.HTTPError as e:
|
||||
return error("http", status_code=e.response.status_code if e.response else None)
|
||||
except requests.exceptions.Timeout:
|
||||
return error("timeout")
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
return error("connection", message=str(e))
|
||||
except requests.RequestException as e:
|
||||
return error("request", message=str(e))
|
||||
except Exception as e:
|
||||
return error("unhandled", message=str(e))
|
||||
|
||||
try:
|
||||
payload = response.json()
|
||||
except json.decoder.JSONDecodeError:
|
||||
return error("invalid_json")
|
||||
|
||||
try:
|
||||
doc = jsonld.expand(payload)
|
||||
except ValueError:
|
||||
return error("invalid_jsonld")
|
||||
|
||||
try:
|
||||
type = doc.get("@type", [])[0]
|
||||
except IndexError:
|
||||
return error("missing_jsonld_type")
|
||||
try:
|
||||
serializer_class = fetch.serializers[type]
|
||||
model = serializer_class.Meta.model
|
||||
except (KeyError, AttributeError):
|
||||
fetch.status = "skipped"
|
||||
fetch.fetch_date = timezone.now()
|
||||
fetch.detail = {"reason": "unhandled_type", "type": type}
|
||||
return fetch.save(update_fields=["fetch_date", "status", "detail"])
|
||||
try:
|
||||
id = doc.get("@id")
|
||||
except IndexError:
|
||||
existing = None
|
||||
else:
|
||||
existing = model.objects.filter(fid=id).first()
|
||||
|
||||
serializer = serializer_class(existing, data=payload)
|
||||
if not serializer.is_valid():
|
||||
return error("validation", validation_errors=serializer.errors)
|
||||
try:
|
||||
serializer.save()
|
||||
except Exception as e:
|
||||
error("save", message=str(e))
|
||||
raise
|
||||
|
||||
fetch.status = "finished"
|
||||
fetch.fetch_date = timezone.now()
|
||||
return fetch.save(update_fields=["fetch_date", "status"])
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue