152 lines
6.1 KiB
Python
Executable file
152 lines
6.1 KiB
Python
Executable file
from getpass import getpass
|
|
import glob
|
|
import hashlib
|
|
from logging import Logger
|
|
import logging
|
|
import os
|
|
import os.path
|
|
import random
|
|
import requests
|
|
import sys
|
|
import time
|
|
|
|
from librespot.audio.decoders import VorbisOnlyAudioQuality
|
|
from librespot.core import Session
|
|
|
|
from spodcast.config import Config
|
|
from spodcast.const import CREDENTIALS_PREFIX, TYPE, USER_READ_EMAIL, OFFSET, LIMIT
|
|
from spodcast.feedgenerator import RSS_FEED_FILE_NAME, RSS_INDEX_CODE
|
|
|
|
class Spodcast:
|
|
SESSION: Session = None
|
|
DOWNLOAD_QUALITY = None
|
|
CONFIG: Config = Config()
|
|
LOG: Logger = None
|
|
|
|
def __init__(self, args):
|
|
Spodcast.CONFIG.load(args)
|
|
logging.basicConfig(level=Spodcast.CONFIG.get_log_level())
|
|
log = logging.getLogger(__name__)
|
|
Spodcast.LOG = log
|
|
log.debug("args: %s", args)
|
|
if args.prepare_feed is True:
|
|
root_path=Spodcast.CONFIG.get_root_path()
|
|
os.makedirs(root_path, exist_ok=True)
|
|
if os.path.exists(root_path):
|
|
index_file_name = os.path.join(root_path, RSS_FEED_FILE_NAME)
|
|
if not os.path.isfile(index_file_name):
|
|
rss_file = open(index_file_name, "w")
|
|
rss_file.write(RSS_INDEX_CODE(Spodcast.CONFIG.get_bin_path(), os.path.basename(Spodcast.CONFIG.get_config_path())))
|
|
rss_file.close()
|
|
else:
|
|
sys.exit(f"Can not create root path {root_path}")
|
|
|
|
if args.login:
|
|
filename = args.login
|
|
if os.path.exists(filename):
|
|
with open(filename, 'r', encoding='utf-8') as file:
|
|
for line in file.readlines():
|
|
Spodcast.account(line.strip())
|
|
sys.exit(0)
|
|
else:
|
|
sys.exit(f"Can not read username/password file {filename}")
|
|
|
|
Spodcast.login()
|
|
|
|
@classmethod
|
|
def account(cls, line):
|
|
cred_directory = Config.get_config_dir()
|
|
if os.path.isdir(cred_directory):
|
|
(username,password) = line.split()
|
|
cred_filename = CREDENTIALS_PREFIX + "-" + hashlib.md5(username.encode('utf-8'),usedforsecurity=False).hexdigest() + ".json"
|
|
cred_location = os.path.join(cred_directory, cred_filename)
|
|
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).build()
|
|
session = Session.Builder(conf).user_pass(username, password).create()
|
|
if not session.is_valid():
|
|
Spodcast.LOG.error("Invalid username/password for username " + username);
|
|
|
|
@classmethod
|
|
def login(cls):
|
|
cred_directory = Config.get_config_dir()
|
|
credfiles = glob.glob(os.path.join(cred_directory, CREDENTIALS_PREFIX) + "-*.json")
|
|
if credfiles:
|
|
random.shuffle(credfiles)
|
|
for credfile in credfiles:
|
|
try:
|
|
cred_location = os.path.join(cred_directory, credfile)
|
|
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).set_store_credentials(False).build()
|
|
session = Session.Builder(conf).stored_file().create()
|
|
if session.is_valid():
|
|
cls.SESSION = session
|
|
return
|
|
else:
|
|
Spodcast.LOG.warning(f"Invalid credentials in {cred_location}")
|
|
except RuntimeError:
|
|
Spodcast.LOG.error("RuntimeError")
|
|
pass
|
|
|
|
cred_location = Config.get_credentials_location()
|
|
|
|
if os.path.isfile(cred_location):
|
|
try:
|
|
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).set_store_credentials(False).build()
|
|
cls.SESSION = Session.Builder(conf).stored_file().create()
|
|
return
|
|
except RuntimeError:
|
|
pass
|
|
while True:
|
|
user_name = ''
|
|
while len(user_name) == 0:
|
|
user_name = input('Username: ')
|
|
password = getpass()
|
|
try:
|
|
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).build()
|
|
cls.SESSION = Session.Builder(conf).user_pass(user_name, password).create()
|
|
return
|
|
except RuntimeError:
|
|
pass
|
|
|
|
@classmethod
|
|
def get_content_stream(cls, content_id, quality):
|
|
return cls.SESSION.content_feeder().load(content_id, VorbisOnlyAudioQuality(quality), False, None)
|
|
|
|
@classmethod
|
|
def __get_auth_token(cls):
|
|
return cls.SESSION.tokens().get_token(USER_READ_EMAIL).access_token
|
|
|
|
@classmethod
|
|
def get_auth_header(cls):
|
|
return {
|
|
'Authorization': f'Bearer {cls.__get_auth_token()}',
|
|
'Accept-Language': f'{cls.CONFIG.get_language()}'
|
|
}
|
|
|
|
@classmethod
|
|
def get_auth_header_and_params(cls, limit, offset):
|
|
return {
|
|
'Authorization': f'Bearer {cls.__get_auth_token()}',
|
|
'Accept-Language': f'{cls.CONFIG.get_language()}'
|
|
}, {LIMIT: limit, OFFSET: offset}
|
|
|
|
@classmethod
|
|
def invoke_url_with_params(cls, url, limit, offset, **kwargs):
|
|
headers, params = cls.get_auth_header_and_params(limit=limit, offset=offset)
|
|
params.update(kwargs)
|
|
return requests.get(url, headers=headers, params=params).json()
|
|
|
|
@classmethod
|
|
def invoke_url(cls, url, tryCount=0):
|
|
headers = cls.get_auth_header()
|
|
response = requests.get(url, headers=headers)
|
|
responsetext = response.text
|
|
responsejson = response.json()
|
|
|
|
if 'error' in responsejson:
|
|
if tryCount < (cls.CONFIG.get_retry_attempts() - 1):
|
|
Spodcast.LOG.warning(f"Spotify API Error (try {tryCount + 1}) ({responsejson['error']['status']}): {responsejson['error']['message']}")
|
|
time.sleep(5)
|
|
return cls.invoke_url(url, tryCount + 1)
|
|
|
|
Spodcast.LOG.error(f"Spotify API Error ({responsejson['error']['status']}): {responsejson['error']['message']}")
|
|
|
|
return responsetext, responsejson
|