Resolve "Per-user libraries" (use !368 instead)

This commit is contained in:
Eliot Berriot 2018-09-06 18:35:02 +00:00
parent b0ca181016
commit 2ea21994ee
144 changed files with 6749 additions and 5347 deletions

View file

@ -1,92 +1,23 @@
import datetime
import json
import logging
import os
from django.conf import settings
from django.db.models import Q
from django.db.models import Q, F
from django.utils import timezone
from dynamic_preferences.registries import global_preferences_registry
from requests.exceptions import RequestException
from funkwhale_api.common import session
from funkwhale_api.music import models as music_models
from funkwhale_api.taskapp import celery
from . import actors
from . import library as lb
from . import models, signing
from . import routes
logger = logging.getLogger(__name__)
@celery.app.task(
name="federation.send",
autoretry_for=[RequestException],
retry_backoff=30,
max_retries=5,
)
@celery.require_instance(models.Actor, "actor")
def send(activity, actor, to):
logger.info("Preparing activity delivery to %s", to)
auth = signing.get_auth(actor.private_key, actor.private_key_id)
for url in to:
recipient_actor = actors.get_actor(url)
logger.debug("delivering to %s", recipient_actor.inbox_url)
logger.debug("activity content: %s", json.dumps(activity))
response = session.get_session().post(
auth=auth,
json=activity,
url=recipient_actor.inbox_url,
timeout=5,
verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL,
headers={"Content-Type": "application/activity+json"},
)
response.raise_for_status()
logger.debug("Remote answered with %s", response.status_code)
@celery.app.task(
name="federation.scan_library",
autoretry_for=[RequestException],
retry_backoff=30,
max_retries=5,
)
@celery.require_instance(models.Library, "library")
def scan_library(library, until=None):
if not library.federation_enabled:
return
data = lb.get_library_data(library.url)
scan_library_page.delay(library_id=library.id, page_url=data["first"], until=until)
library.fetched_date = timezone.now()
library.tracks_count = data["totalItems"]
library.save(update_fields=["fetched_date", "tracks_count"])
@celery.app.task(
name="federation.scan_library_page",
autoretry_for=[RequestException],
retry_backoff=30,
max_retries=5,
)
@celery.require_instance(models.Library, "library")
def scan_library_page(library, page_url, until=None):
if not library.federation_enabled:
return
data = lb.get_library_page(library, page_url)
lts = []
for item_serializer in data["items"]:
item_date = item_serializer.validated_data["published"]
if until and item_date < until:
return
lts.append(item_serializer.save())
next_page = data.get("next")
if next_page and next_page != page_url:
scan_library_page.delay(library_id=library.id, page_url=next_page)
@celery.app.task(name="federation.clean_music_cache")
def clean_music_cache():
preferences = global_preferences_registry.manager()
@ -96,23 +27,22 @@ def clean_music_cache():
limit = timezone.now() - datetime.timedelta(minutes=delay)
candidates = (
models.LibraryTrack.objects.filter(
music_models.TrackFile.objects.filter(
Q(audio_file__isnull=False)
& (
Q(local_track_file__accessed_date__lt=limit)
| Q(local_track_file__accessed_date=None)
)
& (Q(accessed_date__lt=limit) | Q(accessed_date=None))
)
.local(False)
.exclude(audio_file="")
.only("audio_file", "id")
.order_by("id")
)
for lt in candidates:
lt.audio_file.delete()
for tf in candidates:
tf.audio_file.delete()
# we also delete orphaned files, if any
storage = models.LibraryTrack._meta.get_field("audio_file").storage
files = get_files(storage, "federation_cache")
existing = models.LibraryTrack.objects.filter(audio_file__in=files)
files = get_files(storage, "federation_cache/tracks")
existing = music_models.TrackFile.objects.filter(audio_file__in=files)
missing = set(files) - set(existing.values_list("audio_file", flat=True))
for m in missing:
storage.delete(m)
@ -130,3 +60,100 @@ def get_files(storage, *parts):
for dir in dirs:
files += get_files(storage, *(list(parts) + [dir]))
return [os.path.join(parts[-1], path) for path in files]
@celery.app.task(name="federation.dispatch_inbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def dispatch_inbox(activity):
"""
Given an activity instance, triggers our internal delivery logic (follow
creation, etc.)
"""
try:
routes.inbox.dispatch(
activity.payload,
context={
"actor": activity.actor,
"inbox_items": list(activity.inbox_items.local().select_related()),
},
)
except Exception:
activity.inbox_items.local().update(
delivery_attempts=F("delivery_attempts") + 1,
last_delivery_date=timezone.now(),
)
raise
else:
activity.inbox_items.local().update(
delivery_attempts=F("delivery_attempts") + 1,
last_delivery_date=timezone.now(),
is_delivered=True,
)
@celery.app.task(name="federation.dispatch_outbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def dispatch_outbox(activity):
"""
Deliver a local activity to its recipients
"""
inbox_items = activity.inbox_items.all().select_related("actor")
local_recipients_items = [ii for ii in inbox_items if ii.actor.is_local]
if local_recipients_items:
dispatch_inbox.delay(activity_id=activity.pk)
remote_recipients_items = [ii for ii in inbox_items if not ii.actor.is_local]
shared_inbox_urls = {
ii.actor.shared_inbox_url
for ii in remote_recipients_items
if ii.actor.shared_inbox_url
}
inbox_urls = {
ii.actor.inbox_url
for ii in remote_recipients_items
if not ii.actor.shared_inbox_url
}
for url in shared_inbox_urls:
deliver_to_remote_inbox.delay(activity_id=activity.pk, shared_inbox_url=url)
for url in inbox_urls:
deliver_to_remote_inbox.delay(activity_id=activity.pk, inbox_url=url)
@celery.app.task(
name="federation.deliver_to_remote_inbox",
autoretry_for=[RequestException],
retry_backoff=30,
max_retries=5,
)
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def deliver_to_remote_inbox(activity, inbox_url=None, shared_inbox_url=None):
url = inbox_url or shared_inbox_url
actor = activity.actor
inbox_items = activity.inbox_items.filter(is_delivered=False)
if inbox_url:
inbox_items = inbox_items.filter(actor__inbox_url=inbox_url)
else:
inbox_items = inbox_items.filter(actor__shared_inbox_url=shared_inbox_url)
logger.info("Preparing activity delivery to %s", url)
auth = signing.get_auth(actor.private_key, actor.private_key_id)
try:
response = session.get_session().post(
auth=auth,
json=activity.payload,
url=url,
timeout=5,
verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL,
headers={"Content-Type": "application/activity+json"},
)
logger.debug("Remote answered with %s", response.status_code)
response.raise_for_status()
except Exception:
inbox_items.update(
last_delivery_date=timezone.now(),
delivery_attempts=F("delivery_attempts") + 1,
)
raise
else:
inbox_items.update(last_delivery_date=timezone.now(), is_delivered=True)