mirror of
https://code.eliotberriot.com/funkwhale/funkwhale.git
synced 2025-10-04 04:09:16 +02:00
Library follows and user notifications
This commit is contained in:
parent
a879993280
commit
ecd395d6b0
41 changed files with 1191 additions and 347 deletions
|
@ -1,7 +1,13 @@
|
|||
import uuid
|
||||
import logging
|
||||
|
||||
from django.db import transaction, IntegrityError
|
||||
from django.utils import timezone
|
||||
|
||||
from funkwhale_api.common import channels
|
||||
from funkwhale_api.common import utils as funkwhale_utils
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
PUBLIC_ADDRESS = "https://www.w3.org/ns/activitystreams#Public"
|
||||
|
||||
ACTIVITY_TYPES = [
|
||||
|
@ -54,19 +60,10 @@ OBJECT_TYPES = [
|
|||
] + ACTIVITY_TYPES
|
||||
|
||||
|
||||
def deliver(activity, on_behalf_of, to=[]):
|
||||
from . import tasks
|
||||
|
||||
return tasks.send.delay(activity=activity, actor_id=on_behalf_of.pk, to=to)
|
||||
|
||||
|
||||
def accept_follow(follow):
|
||||
from . import serializers
|
||||
|
||||
serializer = serializers.AcceptFollowSerializer(follow)
|
||||
return deliver(serializer.data, to=[follow.actor.fid], on_behalf_of=follow.target)
|
||||
BROADCAST_TO_USER_ACTIVITIES = ["Follow", "Accept"]
|
||||
|
||||
|
||||
@transaction.atomic
|
||||
def receive(activity, on_behalf_of):
|
||||
from . import models
|
||||
from . import serializers
|
||||
|
@ -78,7 +75,14 @@ def receive(activity, on_behalf_of):
|
|||
data=activity, context={"actor": on_behalf_of, "local_recipients": True}
|
||||
)
|
||||
serializer.is_valid(raise_exception=True)
|
||||
copy = serializer.save()
|
||||
try:
|
||||
copy = serializer.save()
|
||||
except IntegrityError:
|
||||
logger.warning(
|
||||
"[federation] Discarding already elivered activity %s",
|
||||
serializer.validated_data.get("id"),
|
||||
)
|
||||
return
|
||||
# we create inbox items for further delivery
|
||||
items = [
|
||||
models.InboxItem(activity=copy, actor=r, type="to")
|
||||
|
@ -93,7 +97,7 @@ def receive(activity, on_behalf_of):
|
|||
models.InboxItem.objects.bulk_create(items)
|
||||
# at this point, we have the activity in database. Even if we crash, it's
|
||||
# okay, as we can retry later
|
||||
tasks.dispatch_inbox.delay(activity_id=copy.pk)
|
||||
funkwhale_utils.on_commit(tasks.dispatch_inbox.delay, activity_id=copy.pk)
|
||||
return copy
|
||||
|
||||
|
||||
|
@ -113,17 +117,64 @@ class Router:
|
|||
|
||||
|
||||
class InboxRouter(Router):
|
||||
@transaction.atomic
|
||||
def dispatch(self, payload, context):
|
||||
"""
|
||||
Receives an Activity payload and some context and trigger our
|
||||
business logic
|
||||
"""
|
||||
from . import api_serializers
|
||||
from . import models
|
||||
|
||||
for route, handler in self.routes:
|
||||
if match_route(route, payload):
|
||||
return handler(payload, context=context)
|
||||
r = handler(payload, context=context)
|
||||
activity_obj = context.get("activity")
|
||||
if activity_obj and r:
|
||||
# handler returned additional data we can use
|
||||
# to update the activity target
|
||||
for key, value in r.items():
|
||||
setattr(activity_obj, key, value)
|
||||
|
||||
update_fields = []
|
||||
for k in r.keys():
|
||||
if k in ["object", "target", "related_object"]:
|
||||
update_fields += [
|
||||
"{}_id".format(k),
|
||||
"{}_content_type".format(k),
|
||||
]
|
||||
else:
|
||||
update_fields.append(k)
|
||||
activity_obj.save(update_fields=update_fields)
|
||||
|
||||
if payload["type"] not in BROADCAST_TO_USER_ACTIVITIES:
|
||||
return
|
||||
|
||||
inbox_items = context.get(
|
||||
"inbox_items", models.InboxItem.objects.none()
|
||||
)
|
||||
for ii in inbox_items:
|
||||
user = ii.actor.get_user()
|
||||
if not user:
|
||||
continue
|
||||
group = "user.{}.inbox".format(user.pk)
|
||||
channels.group_send(
|
||||
group,
|
||||
{
|
||||
"type": "event.send",
|
||||
"text": "",
|
||||
"data": {
|
||||
"type": "inbox.item_added",
|
||||
"item": api_serializers.InboxItemSerializer(ii).data,
|
||||
},
|
||||
},
|
||||
)
|
||||
inbox_items.update(is_delivered=True, last_delivery_date=timezone.now())
|
||||
return
|
||||
|
||||
|
||||
class OutboxRouter(Router):
|
||||
@transaction.atomic
|
||||
def dispatch(self, routing, context):
|
||||
"""
|
||||
Receives a routing payload and some business objects in the context
|
||||
|
@ -140,12 +191,11 @@ class OutboxRouter(Router):
|
|||
# a route can yield zero, one or more activity payloads
|
||||
if e:
|
||||
activities_data.append(e)
|
||||
|
||||
inbox_items_by_activity_uuid = {}
|
||||
prepared_activities = []
|
||||
for activity_data in activities_data:
|
||||
to = activity_data.pop("to", [])
|
||||
cc = activity_data.pop("cc", [])
|
||||
to = activity_data["payload"].pop("to", [])
|
||||
cc = activity_data["payload"].pop("cc", [])
|
||||
a = models.Activity(**activity_data)
|
||||
a.uuid = uuid.uuid4()
|
||||
to_items, new_to = prepare_inbox_items(to, "to")
|
||||
|
@ -160,7 +210,6 @@ class OutboxRouter(Router):
|
|||
prepared_activities.append(a)
|
||||
|
||||
activities = models.Activity.objects.bulk_create(prepared_activities)
|
||||
activities = [a for a in activities if a]
|
||||
|
||||
final_inbox_items = []
|
||||
for a in activities:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue