Spodcast/spodcast/spodcast.py
Frank de Lange 14b213b315 New release, 0.4.1
Changes:
 - added --transcode yes/no to enable transcoding .off into .mp3 for handicapped devices which do not support open codes (iOS, looking at you here)
 - added webcron endpoint to run feed updates in situations where the system scheduler can not be used
 - feed manager is now mostly a single page app with live updates
 - added -v (version) option
 - added versioned updatees for feed and index manager

Fixes:
 - direct login now works as intended

New install requirements:
 - ffmpeg-python
 - setuptools

The change to a SPA was necessitated by the introduction of the `--transcode yes/no` option which (when activated) causes feed updates to take much more time, especially on less powerful hardware. This would cause the feed manager process to timeout before the feeds were updated. This problem is mostly fixed but can still occur in the webcron update process. If this happens the php-fpm and/or web server timeout needs to be increased. This should only happen on slower hardware and/or slow links.
2022-03-01 23:37:26 +00:00

152 lines
6.2 KiB
Python
Executable file

from getpass import getpass
import glob
import hashlib
from logging import Logger
import logging
import os
import os.path
import random
import requests
import sys
import time
from librespot.audio.decoders import VorbisOnlyAudioQuality
from librespot.core import Session
from spodcast.config import Config
from spodcast.const import CREDENTIALS_PREFIX, TYPE, USER_READ_EMAIL, OFFSET, LIMIT
from spodcast.feedgenerator import RSS_FEED_FILE_NAME, RSS_INDEX_CODE, get_index_version
class Spodcast:
SESSION: Session = None
DOWNLOAD_QUALITY = None
CONFIG: Config = Config()
LOG: Logger = None
def __init__(self, args):
Spodcast.CONFIG.load(args)
logging.basicConfig(level=Spodcast.CONFIG.get_log_level())
log = logging.getLogger(__name__)
Spodcast.LOG = log
log.debug("args: %s", args)
if args.prepare_feed is True:
root_path=Spodcast.CONFIG.get_root_path()
os.makedirs(root_path, exist_ok=True)
if os.path.exists(root_path):
index_file_name = os.path.join(root_path, RSS_FEED_FILE_NAME)
if not os.path.isfile(index_file_name) or int(get_index_version(index_file_name)) < Spodcast.CONFIG.get_version_int():
rss_file = open(index_file_name, "w")
rss_file.write(RSS_INDEX_CODE(Spodcast.CONFIG.get_bin_path(), os.path.basename(Spodcast.CONFIG.get_config_path()), Spodcast.CONFIG.get_version_str()))
rss_file.close()
else:
sys.exit(f"Can not create root path {root_path}")
if args.login:
filename = args.login
if os.path.exists(filename):
with open(filename, 'r', encoding='utf-8') as file:
for line in file.readlines():
Spodcast.account(line.strip())
sys.exit(0)
else:
sys.exit(f"Can not read username/password file {filename}")
Spodcast.login()
@classmethod
def account(cls, line):
cred_directory = Config.get_config_dir()
if os.path.isdir(cred_directory):
(username,password) = line.split()
cred_filename = CREDENTIALS_PREFIX + "-" + hashlib.md5(username.encode('utf-8'),usedforsecurity=False).hexdigest() + ".json"
cred_location = os.path.join(cred_directory, cred_filename)
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).build()
session = Session.Builder(conf).user_pass(username, password).create()
if not session.is_valid():
Spodcast.LOG.error("Invalid username/password for username " + username);
@classmethod
def login(cls):
cred_directory = Config.get_config_dir()
credfiles = glob.glob(os.path.join(cred_directory, CREDENTIALS_PREFIX) + "-*.json")
if credfiles:
random.shuffle(credfiles)
for credfile in credfiles:
try:
cred_location = os.path.join(cred_directory, credfile)
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).set_store_credentials(False).build()
session = Session.Builder(conf).stored_file().create()
if session.is_valid():
cls.SESSION = session
return
else:
Spodcast.LOG.warning(f"Invalid credentials in {cred_location}")
except RuntimeError:
Spodcast.LOG.error("RuntimeError")
pass
cred_location = Config.get_credentials_location()
if os.path.isfile(cred_location):
try:
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).set_store_credentials(False).build()
cls.SESSION = Session.Builder(conf).stored_file().create()
return
except RuntimeError:
pass
while True:
user_name = ''
while len(user_name) == 0:
user_name = input('Username: ')
password = getpass()
try:
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).build()
cls.SESSION = Session.Builder(conf).user_pass(user_name, password).create()
return
except RuntimeError:
pass
@classmethod
def get_content_stream(cls, content_id, quality):
return cls.SESSION.content_feeder().load(content_id, VorbisOnlyAudioQuality(quality), False, None)
@classmethod
def __get_auth_token(cls):
return cls.SESSION.tokens().get_token(USER_READ_EMAIL).access_token
@classmethod
def get_auth_header(cls):
return {
'Authorization': f'Bearer {cls.__get_auth_token()}',
'Accept-Language': f'{cls.CONFIG.get_language()}'
}
@classmethod
def get_auth_header_and_params(cls, limit, offset):
return {
'Authorization': f'Bearer {cls.__get_auth_token()}',
'Accept-Language': f'{cls.CONFIG.get_language()}'
}, {LIMIT: limit, OFFSET: offset}
@classmethod
def invoke_url_with_params(cls, url, limit, offset, **kwargs):
headers, params = cls.get_auth_header_and_params(limit=limit, offset=offset)
params.update(kwargs)
return requests.get(url, headers=headers, params=params).json()
@classmethod
def invoke_url(cls, url, tryCount=0):
headers = cls.get_auth_header()
response = requests.get(url, headers=headers)
responsetext = response.text
responsejson = response.json()
if 'error' in responsejson:
if tryCount < (cls.CONFIG.get_retry() - 1):
Spodcast.LOG.warning(f"Spotify API Error (try {tryCount + 1}) ({responsejson['error']['status']}): {responsejson['error']['message']}")
time.sleep(5)
return cls.invoke_url(url, tryCount + 1)
Spodcast.LOG.error(f"Spotify API Error ({responsejson['error']['status']}): {responsejson['error']['message']}")
return responsetext, responsejson