mirror of
https://code.eliotberriot.com/funkwhale/funkwhale.git
synced 2025-10-03 06:09:16 +02:00
Compare commits
21 commits
6e1b10d204
...
30aa85397f
Author | SHA1 | Date | |
---|---|---|---|
![]() |
30aa85397f | ||
![]() |
da12e745bf | ||
![]() |
cab28a475c | ||
![]() |
2830134a75 | ||
![]() |
c5d8470a0e | ||
![]() |
4c779520ec | ||
![]() |
5555382f64 | ||
![]() |
5f8c01c065 | ||
![]() |
677c0274f4 | ||
![]() |
35ad9985e4 | ||
![]() |
3f619c1f00 | ||
![]() |
434cc031ea | ||
![]() |
91875150fb | ||
![]() |
7af11d3fdb | ||
![]() |
dc19381cd2 | ||
![]() |
ba91029b46 | ||
![]() |
ad7e9af1d2 | ||
![]() |
4186e65b05 | ||
![]() |
9b8df0ee7c | ||
![]() |
53e4a2464c | ||
![]() |
706815115a |
7 changed files with 252 additions and 3 deletions
|
@ -1,4 +1,5 @@
|
|||
from config import plugins
|
||||
import funkwhale_api
|
||||
from .funkwhale_startup import PLUGIN
|
||||
from .client import ListenBrainzClient, Track
|
||||
|
||||
|
@ -21,7 +22,10 @@ def get_track(track):
|
|||
title = track.title
|
||||
album = None
|
||||
additional_info = {
|
||||
"listening_from": "Funkwhale",
|
||||
"media_player": "Funkwhale",
|
||||
"media_player_version": funkwhale_api.__version__,
|
||||
"submission_client": "Funkwhale ListenBrainz plugin",
|
||||
"submission_client_version": PLUGIN["version"],
|
||||
"tracknumber": track.position,
|
||||
"discnumber": track.disc_number,
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ PLUGIN = plugins.get_plugin_config(
|
|||
label="ListenBrainz",
|
||||
description="A plugin that allows you to submit your listens to ListenBrainz.",
|
||||
homepage="https://docs.funkwhale.audio/users/builtinplugins.html#listenbrainz-plugin", # noqa
|
||||
version="0.1",
|
||||
version="0.2",
|
||||
user=True,
|
||||
conf=[
|
||||
{
|
||||
|
|
|
@ -1,8 +1,14 @@
|
|||
from xml.etree.ElementTree import Element
|
||||
|
||||
from django.db.models.fields import CharField, IntegerField
|
||||
|
||||
from rest_framework import serializers
|
||||
|
||||
from funkwhale_api.federation import serializers as federation_serializers
|
||||
from funkwhale_api.music.models import Track
|
||||
from funkwhale_api.music.serializers import TrackSerializer
|
||||
from funkwhale_api.playlists import utils
|
||||
from funkwhale_api.playlists.models import Playlist
|
||||
from funkwhale_api.users.serializers import UserBasicSerializer
|
||||
|
||||
from . import models
|
||||
|
@ -115,3 +121,33 @@ class PlaylistAddManySerializer(serializers.Serializer):
|
|||
|
||||
class Meta:
|
||||
fields = "allow_duplicates"
|
||||
|
||||
|
||||
class XspfSerializer(serializers.Serializer):
|
||||
title = CharField()
|
||||
playlist_id = IntegerField()
|
||||
class Meta:
|
||||
fields = (
|
||||
"title",
|
||||
"playlist_id",
|
||||
)
|
||||
|
||||
def get_title():
|
||||
return "test"
|
||||
|
||||
def generate_xspf_from_playlist():
|
||||
"""
|
||||
This returns a string containing playlist data in xspf format
|
||||
"""
|
||||
fw_playlist = Playlist.objects.get(id=playlist_id)
|
||||
plt_tracks = fw_playlist.playlist_tracks.prefetch_related("track")
|
||||
xspf_playlist = Element("playlist")
|
||||
xspf_tracklist = utils.write_xspf_headers(
|
||||
xspf_playlist, fw_playlist.name, str(fw_playlist.creation_date)
|
||||
)
|
||||
|
||||
for plt_track in plt_tracks:
|
||||
track = plt_track.track
|
||||
utils.write_xspf_track_data(track, xspf_tracklist)
|
||||
return utils.prettify(xspf_playlist)
|
||||
|
||||
|
|
178
api/funkwhale_api/playlists/utils.py
Normal file
178
api/funkwhale_api/playlists/utils.py
Normal file
|
@ -0,0 +1,178 @@
|
|||
import logging
|
||||
import re
|
||||
from datetime import datetime
|
||||
|
||||
# /!\ The next import have xml vulnerabilities but this shouldn't have security implication in funkwhale
|
||||
# since there are only used to generate xspf file.
|
||||
from xml.etree.ElementTree import Element, SubElement
|
||||
|
||||
from defusedxml import ElementTree as etree
|
||||
from defusedxml import minidom
|
||||
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
|
||||
|
||||
from funkwhale_api.music.models import Album, Artist, Track
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def clean_namespace_xspf(xspf_file):
|
||||
"""
|
||||
This will delete any namaespace found in the xspf file. It will also delete any encoding info.
|
||||
This way xspf file will be compatible with our get_track_id_from_xspf function.
|
||||
"""
|
||||
file = open(xspf_file)
|
||||
with file as f:
|
||||
xspf_str = f.read()
|
||||
xspf_data = re.sub('xmlns="http://xspf.org/ns/0/"', "", xspf_str)
|
||||
# This is needed because lxml error : "ValueError: Unicode strings with encoding declaration are
|
||||
# not supported. Please use bytes input or XML fragments without declaration."
|
||||
xspf_data = re.sub("'encoding='.'", "", xspf_data)
|
||||
return xspf_data
|
||||
|
||||
|
||||
def album_exist(track, artist_id):
|
||||
try:
|
||||
album = track.find(".//album").text
|
||||
except AttributeError as e:
|
||||
logger.info(
|
||||
f"Couldn't find the following attribute while parsing the xml : {e!r}. No album information."
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
album_id = Album.objects.get(title=album, artist_id=artist_id)
|
||||
except Exception as e:
|
||||
logger.info(f"Error while quering database for album : {e!r}")
|
||||
return
|
||||
except MultipleObjectsReturned as e:
|
||||
album_id = Album.objects.filter(title=album, artist_id=artist_id).first
|
||||
return album_id
|
||||
return album_id
|
||||
|
||||
|
||||
def get_track_id_from_xspf(xspf_file):
|
||||
"""
|
||||
Return a list of funkwhale tracks id from a xspf file. Tracks not found in database are ignored.
|
||||
Usefull to generate playlist from xspf files.
|
||||
"""
|
||||
track_list = []
|
||||
xspf_data_clean = clean_namespace_xspf(xspf_file)
|
||||
tree = etree.fromstring(xspf_data_clean)
|
||||
tracks = tree.findall(".//track")
|
||||
added_track_count = 0
|
||||
|
||||
for track in tracks:
|
||||
track_id = ""
|
||||
# Getting metadata of the xspf file
|
||||
try:
|
||||
artist = track.find(".//creator").text
|
||||
title = track.find(".//title").text
|
||||
except AttributeError as e:
|
||||
logger.info(
|
||||
f"Couldn't find the following attribute while parsing the xml file for artist and title data : {e!r}. \
|
||||
Switching to next track..."
|
||||
)
|
||||
continue
|
||||
|
||||
# Finding track id in the db
|
||||
try:
|
||||
artist_id = Artist.objects.get(name=artist)
|
||||
except Exception as e:
|
||||
logger.info(f"Error while quering database : {e!r}. Switching to next track.")
|
||||
continue
|
||||
except MultipleObjectsReturned as e:
|
||||
artist_id = Artist.objects.filter(name=artist).first()
|
||||
|
||||
album_id = album_exist(track, artist_id)
|
||||
if album_id:
|
||||
try:
|
||||
track_id = Track.objects.get(
|
||||
title=title, artist=artist_id.id, album=album_id.id
|
||||
)
|
||||
except ObjectDoesNotExist as e :
|
||||
logger.info(f"Couldn't find track in the database : {e!r}. Trying without album...")
|
||||
except MultipleObjectsReturned as e:
|
||||
track_id = Track.objects.filter(
|
||||
title=title, artist=artist_id.id, album=album_id.id
|
||||
).first()
|
||||
|
||||
else:
|
||||
try:
|
||||
track_id = Track.objects.get(title=title, artist=artist_id.id)
|
||||
except ObjectDoesNotExist as e:
|
||||
logger.info(f"Couldn't find track in the database : {e!r}")
|
||||
continue
|
||||
except MultipleObjectsReturned as e:
|
||||
track_id = Track.objects.filter(title=title, artist=artist_id.id).first()
|
||||
|
||||
if track_id:
|
||||
track_list.append(track_id.id)
|
||||
added_track_count = added_track_count + 1
|
||||
|
||||
logger.info(
|
||||
str(len(tracks))
|
||||
+ " tracks where found in xspf file. "
|
||||
+ str(added_track_count)
|
||||
+ " are gonna be added to playlist."
|
||||
)
|
||||
return track_list
|
||||
|
||||
|
||||
def generate_xspf_from_tracks_ids(tracks_ids):
|
||||
"""
|
||||
This returns a string containing playlist data in xspf format. It's used for test purposes.
|
||||
"""
|
||||
xspf_title = "An automated generated playlist"
|
||||
now = datetime.now()
|
||||
xspf_date = now.strftime("%m/%d/%Y")
|
||||
xspf_playlist = Element("playlist")
|
||||
xspf_tracklist = write_xspf_headers(xspf_playlist, xspf_title, xspf_date)
|
||||
|
||||
for track_id in tracks_ids:
|
||||
try:
|
||||
track = Track.objects.get(id=track_id)
|
||||
write_xspf_track_data(track, xspf_tracklist)
|
||||
except ObjectDoesNotExist as e:
|
||||
logger.info(f"Error while quering database : {e!r}")
|
||||
return prettify(xspf_playlist)
|
||||
|
||||
|
||||
def write_xspf_headers(xspf_playlist, xspf_title, xspf_date):
|
||||
"""
|
||||
This generate the playlist metadata and return a trackList subelement used to insert each track
|
||||
into the playlist
|
||||
"""
|
||||
xspf_playlist.set("version", "1")
|
||||
title_xspf = SubElement(xspf_playlist, "title")
|
||||
title_xspf.text = xspf_title
|
||||
date_xspf = SubElement(xspf_playlist, "date")
|
||||
date_xspf.text = xspf_date
|
||||
trackList_xspf = SubElement(xspf_playlist, "trackList")
|
||||
return trackList_xspf
|
||||
|
||||
|
||||
def write_xspf_track_data(track, trackList_xspf):
|
||||
"""
|
||||
Insert a track into the trackList subelement of a xspf file
|
||||
"""
|
||||
track_xspf = SubElement(trackList_xspf, "track")
|
||||
location_xspf = SubElement(track_xspf, "location")
|
||||
location_xspf.text = "https://" + track.domain_name + track.listen_url
|
||||
title_xspf = SubElement(track_xspf, "title")
|
||||
title_xspf.text = str(track.title)
|
||||
creator_xspf = SubElement(track_xspf, "creator")
|
||||
creator_xspf.text = str(track.artist)
|
||||
if str(track.album) == "[non-album tracks]":
|
||||
return
|
||||
else:
|
||||
album_xspf = SubElement(track_xspf, "album")
|
||||
album_xspf.text = str(track.album)
|
||||
|
||||
|
||||
def prettify(elem):
|
||||
"""
|
||||
Return a pretty-printed XML string for the Element.
|
||||
"""
|
||||
rough_string = etree.tostring(elem, "utf-8")
|
||||
reparsed = minidom.parseString(rough_string)
|
||||
return reparsed.toprettyxml(indent=" ")
|
|
@ -1,5 +1,7 @@
|
|||
from defusedxml import ElementTree as etree
|
||||
|
||||
from funkwhale_api.federation import serializers as federation_serializers
|
||||
from funkwhale_api.playlists import serializers
|
||||
from funkwhale_api.playlists import models, serializers
|
||||
from funkwhale_api.users import serializers as users_serializers
|
||||
|
||||
|
||||
|
@ -91,3 +93,14 @@ def test_playlist_serializer(factories, to_api_date):
|
|||
serializer = serializers.PlaylistSerializer(playlist)
|
||||
|
||||
assert serializer.data == expected
|
||||
|
||||
|
||||
def test_generate_xspf_from_playlist(factories):
|
||||
playlist = factories["playlists.PlaylistTrack"]()
|
||||
playlist_factory = models.Playlist.objects.get()
|
||||
xspf_test = serializers.PlaylistSerializer.generate_xspf_from_playlist(playlist.id)
|
||||
tree = etree.fromstring(xspf_test)
|
||||
track1 = playlist_factory.playlist_tracks.get(id=1)
|
||||
track1_name = track1.track
|
||||
assert playlist_factory.name == tree.findtext("./title")
|
||||
assert track1_name.title == tree.findtext("./trackList/track/title")
|
||||
|
|
17
api/tests/playlists/test_utils.py
Normal file
17
api/tests/playlists/test_utils.py
Normal file
|
@ -0,0 +1,17 @@
|
|||
import os
|
||||
|
||||
from funkwhale_api.playlists import utils
|
||||
|
||||
|
||||
def test_get_track_id_from_xspf(factories, tmp_path):
|
||||
track1 = factories["music.Track"]()
|
||||
track2 = factories["music.Track"]()
|
||||
tracks_ids = [track1.id, track2.id]
|
||||
xspf_content = utils.generate_xspf_from_tracks_ids(tracks_ids)
|
||||
f = open("test.xspf", "w")
|
||||
f.write(xspf_content)
|
||||
f.close()
|
||||
xspf_file = "test.xspf"
|
||||
expected = [track1.id, track2.id]
|
||||
assert utils.get_track_id_from_xspf(xspf_file) == expected
|
||||
os.remove("test.xspf")
|
1
changes/changelog.d/1610.enhancement
Normal file
1
changes/changelog.d/1610.enhancement
Normal file
|
@ -0,0 +1 @@
|
|||
ListenBrainz: Submit media player and submission client information
|
Loading…
Add table
Add a link
Reference in a new issue