
Changes: - added --transcode yes/no to enable transcoding .off into .mp3 for handicapped devices which do not support open codes (iOS, looking at you here) - added webcron endpoint to run feed updates in situations where the system scheduler can not be used - feed manager is now mostly a single page app with live updates - added -v (version) option - added versioned updatees for feed and index manager Fixes: - direct login now works as intended New install requirements: - ffmpeg-python - setuptools The change to a SPA was necessitated by the introduction of the `--transcode yes/no` option which (when activated) causes feed updates to take much more time, especially on less powerful hardware. This would cause the feed manager process to timeout before the feeds were updated. This problem is mostly fixed but can still occur in the webcron update process. If this happens the php-fpm and/or web server timeout needs to be increased. This should only happen on slower hardware and/or slow links.
152 lines
6.2 KiB
Python
Executable file
152 lines
6.2 KiB
Python
Executable file
from getpass import getpass
|
|
import glob
|
|
import hashlib
|
|
from logging import Logger
|
|
import logging
|
|
import os
|
|
import os.path
|
|
import random
|
|
import requests
|
|
import sys
|
|
import time
|
|
|
|
from librespot.audio.decoders import VorbisOnlyAudioQuality
|
|
from librespot.core import Session
|
|
|
|
from spodcast.config import Config
|
|
from spodcast.const import CREDENTIALS_PREFIX, TYPE, USER_READ_EMAIL, OFFSET, LIMIT
|
|
from spodcast.feedgenerator import RSS_FEED_FILE_NAME, RSS_INDEX_CODE, get_index_version
|
|
|
|
class Spodcast:
|
|
SESSION: Session = None
|
|
DOWNLOAD_QUALITY = None
|
|
CONFIG: Config = Config()
|
|
LOG: Logger = None
|
|
|
|
def __init__(self, args):
|
|
Spodcast.CONFIG.load(args)
|
|
logging.basicConfig(level=Spodcast.CONFIG.get_log_level())
|
|
log = logging.getLogger(__name__)
|
|
Spodcast.LOG = log
|
|
log.debug("args: %s", args)
|
|
if args.prepare_feed is True:
|
|
root_path=Spodcast.CONFIG.get_root_path()
|
|
os.makedirs(root_path, exist_ok=True)
|
|
if os.path.exists(root_path):
|
|
index_file_name = os.path.join(root_path, RSS_FEED_FILE_NAME)
|
|
if not os.path.isfile(index_file_name) or int(get_index_version(index_file_name)) < Spodcast.CONFIG.get_version_int():
|
|
rss_file = open(index_file_name, "w")
|
|
rss_file.write(RSS_INDEX_CODE(Spodcast.CONFIG.get_bin_path(), os.path.basename(Spodcast.CONFIG.get_config_path()), Spodcast.CONFIG.get_version_str()))
|
|
rss_file.close()
|
|
else:
|
|
sys.exit(f"Can not create root path {root_path}")
|
|
|
|
if args.login:
|
|
filename = args.login
|
|
if os.path.exists(filename):
|
|
with open(filename, 'r', encoding='utf-8') as file:
|
|
for line in file.readlines():
|
|
Spodcast.account(line.strip())
|
|
sys.exit(0)
|
|
else:
|
|
sys.exit(f"Can not read username/password file {filename}")
|
|
|
|
Spodcast.login()
|
|
|
|
@classmethod
|
|
def account(cls, line):
|
|
cred_directory = Config.get_config_dir()
|
|
if os.path.isdir(cred_directory):
|
|
(username,password) = line.split()
|
|
cred_filename = CREDENTIALS_PREFIX + "-" + hashlib.md5(username.encode('utf-8'),usedforsecurity=False).hexdigest() + ".json"
|
|
cred_location = os.path.join(cred_directory, cred_filename)
|
|
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).build()
|
|
session = Session.Builder(conf).user_pass(username, password).create()
|
|
if not session.is_valid():
|
|
Spodcast.LOG.error("Invalid username/password for username " + username);
|
|
|
|
@classmethod
|
|
def login(cls):
|
|
cred_directory = Config.get_config_dir()
|
|
credfiles = glob.glob(os.path.join(cred_directory, CREDENTIALS_PREFIX) + "-*.json")
|
|
if credfiles:
|
|
random.shuffle(credfiles)
|
|
for credfile in credfiles:
|
|
try:
|
|
cred_location = os.path.join(cred_directory, credfile)
|
|
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).set_store_credentials(False).build()
|
|
session = Session.Builder(conf).stored_file().create()
|
|
if session.is_valid():
|
|
cls.SESSION = session
|
|
return
|
|
else:
|
|
Spodcast.LOG.warning(f"Invalid credentials in {cred_location}")
|
|
except RuntimeError:
|
|
Spodcast.LOG.error("RuntimeError")
|
|
pass
|
|
|
|
cred_location = Config.get_credentials_location()
|
|
|
|
if os.path.isfile(cred_location):
|
|
try:
|
|
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).set_store_credentials(False).build()
|
|
cls.SESSION = Session.Builder(conf).stored_file().create()
|
|
return
|
|
except RuntimeError:
|
|
pass
|
|
while True:
|
|
user_name = ''
|
|
while len(user_name) == 0:
|
|
user_name = input('Username: ')
|
|
password = getpass()
|
|
try:
|
|
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).build()
|
|
cls.SESSION = Session.Builder(conf).user_pass(user_name, password).create()
|
|
return
|
|
except RuntimeError:
|
|
pass
|
|
|
|
@classmethod
|
|
def get_content_stream(cls, content_id, quality):
|
|
return cls.SESSION.content_feeder().load(content_id, VorbisOnlyAudioQuality(quality), False, None)
|
|
|
|
@classmethod
|
|
def __get_auth_token(cls):
|
|
return cls.SESSION.tokens().get_token(USER_READ_EMAIL).access_token
|
|
|
|
@classmethod
|
|
def get_auth_header(cls):
|
|
return {
|
|
'Authorization': f'Bearer {cls.__get_auth_token()}',
|
|
'Accept-Language': f'{cls.CONFIG.get_language()}'
|
|
}
|
|
|
|
@classmethod
|
|
def get_auth_header_and_params(cls, limit, offset):
|
|
return {
|
|
'Authorization': f'Bearer {cls.__get_auth_token()}',
|
|
'Accept-Language': f'{cls.CONFIG.get_language()}'
|
|
}, {LIMIT: limit, OFFSET: offset}
|
|
|
|
@classmethod
|
|
def invoke_url_with_params(cls, url, limit, offset, **kwargs):
|
|
headers, params = cls.get_auth_header_and_params(limit=limit, offset=offset)
|
|
params.update(kwargs)
|
|
return requests.get(url, headers=headers, params=params).json()
|
|
|
|
@classmethod
|
|
def invoke_url(cls, url, tryCount=0):
|
|
headers = cls.get_auth_header()
|
|
response = requests.get(url, headers=headers)
|
|
responsetext = response.text
|
|
responsejson = response.json()
|
|
|
|
if 'error' in responsejson:
|
|
if tryCount < (cls.CONFIG.get_retry() - 1):
|
|
Spodcast.LOG.warning(f"Spotify API Error (try {tryCount + 1}) ({responsejson['error']['status']}): {responsejson['error']['message']}")
|
|
time.sleep(5)
|
|
return cls.invoke_url(url, tryCount + 1)
|
|
|
|
Spodcast.LOG.error(f"Spotify API Error ({responsejson['error']['status']}): {responsejson['error']['message']}")
|
|
|
|
return responsetext, responsejson
|