initial commit

This commit is contained in:
Frank de Lange 2022-02-13 14:05:30 +00:00
commit f59b9683f7
15 changed files with 1542 additions and 0 deletions

0
spodcast/__init__.py Normal file
View file

38
spodcast/__main__.py Normal file
View file

@ -0,0 +1,38 @@
import argparse
from spodcast.app import client
from spodcast.config import CONFIG_VALUES
def main():
parser = argparse.ArgumentParser(prog='spodcast',
description='A caching Spotify podcast to RSS proxy.')
parser.add_argument('-c', '--config-location',
type=str,
help='Specify the spodcast.json location')
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('urls',
type=str,
# action='extend',
default='',
nargs='*',
help='Download podcast episode(s) from a url. Can take multiple urls.')
group.add_argument('-l', '--login',
type=str,
help='Reads username and password from file passed as argument and stores credentials for later use.')
for configkey in CONFIG_VALUES:
parser.add_argument(CONFIG_VALUES[configkey]['arg'],
type=str,
default=None,
help=CONFIG_VALUES[configkey]['help'])
parser.set_defaults(func=client)
args = parser.parse_args()
args.func(args)
if __name__ == '__main__':
main()

24
spodcast/app.py Normal file
View file

@ -0,0 +1,24 @@
import logging
from itertools import islice
from librespot.audio.decoders import AudioQuality
from spodcast.podcast import download_episode, get_show_episodes
from spodcast.utils import regex_input_for_urls
from spodcast.spodcast import Spodcast
log = logging.getLogger(__name__)
def client(args) -> None:
Spodcast(args)
Spodcast.DOWNLOAD_QUALITY = AudioQuality.NORMAL
if args.urls:
for spotify_url in args.urls:
episode_id, show_id = regex_input_for_urls(spotify_url)
log.debug(f"episode_id {episode_id}. show_id {show_id}")
if episode_id is not None:
download_episode(episode_id)
elif show_id is not None:
for episode in islice(get_show_episodes(show_id), Spodcast.CONFIG.get_max_episodes()):
download_episode(episode)

174
spodcast/config.py Normal file
View file

@ -0,0 +1,174 @@
import json
import os
from typing import Any
CONFIG_FILE_PATH = '../spodcast.json'
CONFIG_DIR = 'CONFIG_DIR'
ROOT_PATH = 'ROOT_PATH'
SKIP_EXISTING_FILES = 'SKIP_EXISTING_FILES'
CHUNK_SIZE = 'CHUNK_SIZE'
DOWNLOAD_REAL_TIME = 'DOWNLOAD_REAL_TIME'
LANGUAGE = 'LANGUAGE'
CREDENTIALS_LOCATION = 'CREDENTIALS_LOCATION'
RETRY_ATTEMPTS = 'RETRY_ATTEMPTS'
MAX_EPISODES = 'MAX_EPISODES'
LOG_LEVEL = 'LOG_LEVEL'
ENABLE_RSS_FEED = 'ENABLE_RSS_FEED'
CONFIG_VALUES = {
ROOT_PATH: { 'default': '../Spodcast/',
'type': str,
'arg': '--root-path',
'help': 'set root path for podcast cache' },
SKIP_EXISTING_FILES: { 'default': 'True',
'type': bool,
'arg': '--skip-existing-files',
'help': 'skip files with the same name and size' },
RETRY_ATTEMPTS: { 'default': '5',
'type': int,
'arg': '--retry-attemps',
'help': 'retry count for Spotify API access' },
MAX_EPISODES: { 'default': '1000',
'type': int,
'arg': '--max-episodes',
'help': 'number of episodes to download' },
CHUNK_SIZE: { 'default': '50000',
'type': int,
'arg': '--chunk-size',
'help': 'download chunk size' },
DOWNLOAD_REAL_TIME: { 'default': 'False',
'type': bool,
'arg': '--download-real-time',
'help': 'simulate streaming client' },
LANGUAGE: { 'default': 'en',
'type': str,
'arg': '--language',
'help': 'preferred content language' },
CREDENTIALS_LOCATION: { 'default': 'credentials.json',
'type': str,
'arg': '--credentials-location',
'help': 'path to credentials file' },
ENABLE_RSS_FEED: { 'default': 'True',
'type': bool,
'arg': '--enable-rss-feed',
'help': 'add a (php) RSS feed server and related metadata for feed. To serve the feed, point a web server at the spodcast root path as configured using --root-path.' },
LOG_LEVEL: { 'default': 'warning',
'type': str,
'arg': '--log-level',
'help': 'log level (debug/info/warning/error/critical)' }
}
class Config:
Values = {}
@classmethod
def load(cls, args) -> None:
app_dir = os.path.dirname(__file__)
config_fp = CONFIG_FILE_PATH
if args.config_location:
config_fp = args.config_location
true_config_file_path = os.path.join(app_dir, config_fp)
# Load config from spodcast.json
if not os.path.exists(true_config_file_path):
with open(true_config_file_path, 'w', encoding='utf-8') as config_file:
json.dump(cls.get_default_json(), config_file, indent=4)
cls.Values = cls.get_default_json()
else:
with open(true_config_file_path, encoding='utf-8') as config_file:
jsonvalues = json.load(config_file)
cls.Values = {}
for key in CONFIG_VALUES:
if key in jsonvalues:
cls.Values[key] = cls.parse_arg_value(key, jsonvalues[key])
# Add default values for missing keys
for key in CONFIG_VALUES:
if key not in cls.Values:
cls.Values[key] = cls.parse_arg_value(key, CONFIG_VALUES[key]['default'])
# Override config from commandline arguments
for key in CONFIG_VALUES:
if key.lower() in vars(args) and vars(args)[key.lower()] is not None:
cls.Values[key] = cls.parse_arg_value(key, vars(args)[key.lower()])
# this value should not be overriden
cls.Values[CONFIG_DIR] = os.path.dirname(true_config_file_path)
@classmethod
def get_default_json(cls) -> Any:
r = {}
for key in CONFIG_VALUES:
r[key] = CONFIG_VALUES[key]['default']
return r
@classmethod
def parse_arg_value(cls, key: str, value: Any) -> Any:
if type(value) == CONFIG_VALUES[key]['type']:
return value
if CONFIG_VALUES[key]['type'] == str:
return str(value)
if CONFIG_VALUES[key]['type'] == int:
return int(value)
if CONFIG_VALUES[key]['type'] == bool:
if str(value).lower() in ['yes', 'true', '1']:
return True
if str(value).lower() in ['no', 'false', '0']:
return False
raise ValueError("Not a boolean: " + value)
raise ValueError("Unknown Type: " + value)
@classmethod
def get(cls, key: str) -> Any:
return cls.Values.get(key)
@classmethod
def get_config_dir(cls) -> str:
return cls.get(CONFIG_DIR)
@classmethod
def get_root_path(cls) -> str:
return os.path.join(os.path.dirname(__file__), cls.get(ROOT_PATH))
@classmethod
def get_skip_existing_files(cls) -> bool:
return cls.get(SKIP_EXISTING_FILES)
@classmethod
def get_chunk_size(cls) -> int:
return cls.get(CHUNK_SIZE)
@classmethod
def get_language(cls) -> str:
return cls.get(LANGUAGE)
@classmethod
def get_download_real_time(cls) -> bool:
return cls.get(DOWNLOAD_REAL_TIME)
@classmethod
def get_credentials_location(cls) -> str:
return os.path.join(os.getcwd(), cls.get(CREDENTIALS_LOCATION))
@classmethod
def get_retry_attempts(cls) -> int:
return cls.get(RETRY_ATTEMPTS)
@classmethod
def get_max_episodes(cls) -> int:
return cls.get(MAX_EPISODES)
@classmethod
def get_enable_rss_feed(cls) -> bool:
return cls.get(ENABLE_RSS_FEED)
@classmethod
def get_log_level(cls) -> str:
return str(cls.get(LOG_LEVEL)).upper()

22
spodcast/const.py Normal file
View file

@ -0,0 +1,22 @@
ERROR = 'error'
ITEMS = 'items'
NAME = 'name'
DESCRIPTION = "description"
ID = 'id'
URL = 'url'
URI = 'uri'
EXTERNAL_URLS = 'external_urls'
SPOTIFY = 'spotify'
RELEASE_DATE = 'release_date'
IMAGES = 'images'
LIMIT = 'limit'
OFFSET = 'offset'
CREDENTIALS_PREFIX = 'spodcast-cred'
AUTHORIZATION = 'Authorization'
DURATION_MS = 'duration_ms'
SHOW = 'show'
TYPE = 'type'
USER_READ_EMAIL = 'user-read-email'
PLAYLIST_READ_PRIVATE = 'playlist-read-private'
USER_LIBRARY_READ = 'user-library-read'
FILE_EXISTS = -1

67
spodcast/feedgenerator.py Normal file
View file

@ -0,0 +1,67 @@
RSS_FEED_FILE_NAME = '.index.php'
RSS_FEED_INFO_EXTENSION = 'info'
RSS_FEED_SHOW_INDEX = 'index'
RSS_FEED_CODE = r'''<?php
header("Content-type: text/xml");
$feed_name = "Spodcast autofeed";
$feed_description = "Spodcast autofeed";
$base_url = strtok('https://' . $_SERVER['HTTP_HOST'] . $_SERVER['REQUEST_URI'], '?');
$feed_logo = "$base_url/.image.jpg";
$feed_link = $base_url;
$allowed_extensions = array('mp4','m4a','aac','mp3','ogg');
$sinfo="''' + RSS_FEED_SHOW_INDEX + "." + RSS_FEED_INFO_EXTENSION + r'''";
if(file_exists($sinfo)) {
$json=file_get_contents($sinfo);
$info=json_decode($json);
$feed_name=$info->title;
$feed_description=$info->description;
$feed_logo=$info->image;
$feed_link=$info->link;
}
?>
<?php echo '<?xml version="1.0" encoding="utf-8"?>'; // use php to output the "<?" ?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:media="http://search.yahoo.com/mrss/" xmlns:itunes="http://www.itunes.com/dtds/podcast-1.0.dtd" >
<channel>
<title><?php echo $feed_name; ?></title>
<link><?php echo $feed_link; ?></link>
<image>
<url><?php echo $feed_logo; ?></url>
<title><?php echo $feed_name; ?></title>
<link><?php echo $feed_link; ?></link>
</image>
<description><?php echo $feed_description; ?></description>
<atom:link href="<?php echo $base_url; ?>" rel="self" type="application/rss+xml" />
<?php
$raw_files = scandir ('.');
usort($raw_files, function($a, $b) {
return filemtime($a) < filemtime($b);
});
foreach ($raw_files as &$raw_file) {
$raw_file_info = pathinfo($raw_file);
$extension = strtolower($raw_file_info['extension']);
if(!empty($extension)) {
if(in_array ($extension,$allowed_extensions)) {
$finfo=$raw_file.".''' + RSS_FEED_INFO_EXTENSION + r'''";
if(file_exists($finfo)) {
$json=file_get_contents($finfo);
$info=json_decode($json);
echo " <item>\n";
echo " <title>".$info->title."</title>\n";
echo " <description>".$info->description."</description>\n";
echo " <guid>".$info->guid."</guid>\n";
echo " <link>".$base_url.$info->filename."</link>\n";
echo " <enclosure url=\"".$base_url.$info->filename."\" length=\"".$info->size."\" type=\"".$info->mimetype."\" />\n";
echo " <media:content url=\"".$base_url.$info->filename."\" medium=\"".$info->medium."\" duration=\"".$info->duration."\" type=\"".$info->mimetype."\" />\n";
echo " <pubDate>".$info->date."</pubDate>\n";
echo " <itunes:duration>".$info->duration."</itunes:duration>\n";
echo " </item>\n";
}
}
}
}
?>
</channel>
</rss>'''

186
spodcast/podcast.py Normal file
View file

@ -0,0 +1,186 @@
import json
import logging
import os
import time
from html import escape
import urllib.parse
from librespot.metadata import EpisodeId
from spodcast.const import ERROR, ID, ITEMS, NAME, SHOW, DURATION_MS, DESCRIPTION, RELEASE_DATE, URI, URL, EXTERNAL_URLS, IMAGES, SPOTIFY, FILE_EXISTS
from spodcast.feedgenerator import RSS_FEED_CODE, RSS_FEED_FILE_NAME, RSS_FEED_SHOW_INDEX, RSS_FEED_INFO_EXTENSION
from spodcast.spotapi import EPISODE_INFO_URL, SHOWS_URL, EPISODE_DOWNLOAD_URL, ANON_PODCAST_DOMAIN
from spodcast.utils import clean_filename
from spodcast.spodcast import Spodcast
log = logging.getLogger(__name__)
def get_info(episode_id_str, target="episode"):
log.info("Fetching episode information...")
(raw, info) = Spodcast.invoke_url(f'{EPISODE_INFO_URL}/{episode_id_str}')
if not info:
log.error('INVALID EPISODE ID')
log.debug("episode info: %s", info)
if ERROR in info:
return None, None
if target == "episode":
podcast_name = info[SHOW][NAME]
episode_name = info[NAME]
duration_ms = info[DURATION_MS]
description = info[DESCRIPTION]
release_date = info[RELEASE_DATE]
uri = info[URI]
return podcast_name, duration_ms, episode_name, description, release_date, uri
elif target == "show":
podcast_name = info[SHOW][NAME]
link = info[SHOW][EXTERNAL_URLS][SPOTIFY]
description = info[SHOW][DESCRIPTION]
image = info[SHOW][IMAGES][0][URL]
return podcast_name, link, description, image
def get_show_episodes(show_id_str) -> list:
episodes = []
offset = 0
limit = 50
log.info("Fetching episodes...")
while True:
resp = Spodcast.invoke_url_with_params(
f'{SHOWS_URL}/{show_id_str}/episodes', limit=limit, offset=offset)
offset += limit
for episode in resp[ITEMS]:
episodes.append(episode[ID])
if len(resp[ITEMS]) < limit:
break
return episodes
def download_file(url, filepath):
import functools
import pathlib
import shutil
import requests
r = requests.get(url, stream=True, allow_redirects=True)
if r.status_code != 200:
r.raise_for_status() # Will only raise for 4xx codes, so...
log.error(f"Request to {url} returned status code {r.status_code}")
return
file_size = int(r.headers.get('Content-Length', 0))
if (
os.path.isfile(filepath)
and abs(file_size - os.path.getsize(filepath)) < 1000
and Spodcast.CONFIG.get_skip_existing_files()
):
return filepath, FILE_EXISTS
log.info("Downloading file")
r.raw.read = functools.partial(r.raw.read, decode_content=True)
with open(filepath, "wb") as file:
shutil.copyfileobj(r.raw, file)
return filepath, os.path.getsize(filepath)
def download_stream(stream, filepath):
size = stream.input_stream.size
if (
os.path.isfile(filepath)
and abs(size - os.path.getsize(filepath)) < 1000
and Spodcast.CONFIG.get_skip_existing_files()
):
return filepath, FILE_EXISTS
log.info("Downloading stream")
time_start = time.time()
downloaded = 0
with open(filepath, 'wb') as file:
for _ in range(int(size / Spodcast.CONFIG.get_chunk_size()) + 1):
data = stream.input_stream.stream().read(Spodcast.CONFIG.get_chunk_size())
file.write(data)
downloaded += len(data)
if Spodcast.CONFIG.get_download_real_time():
delta_real = time.time() - time_start
delta_want = (downloaded / size) * (duration_ms/1000)
log.debug(f"realtime enabled, waiting for {delta_real} seconds...")
if delta_want > delta_real:
time.sleep(delta_want - delta_real)
return filepath, downloaded
def download_episode(episode_id) -> None:
podcast_name, duration_ms, episode_name, description, release_date, uri = get_info(episode_id, "episode")
if podcast_name is None:
log.warning('Skipping episode (podcast NOT FOUND)')
elif episode_name is None:
log.warning('Skipping episode (episode NOT FOUND)')
else:
filename = clean_filename(podcast_name + ' - ' + episode_name)
download_url = Spodcast.invoke_url(EPISODE_DOWNLOAD_URL(episode_id))[1]["data"]["episode"]["audio"]["items"][-1]["url"]
log.debug(f"download_url: {download_url}")
show_directory = os.path.realpath(os.path.join(Spodcast.CONFIG.get_root_path(), clean_filename(podcast_name) + '/'))
os.makedirs(show_directory, exist_ok=True)
if ANON_PODCAST_DOMAIN in download_url:
episode_stream_id = EpisodeId.from_base62(episode_id)
stream = Spodcast.get_content_stream(episode_stream_id, Spodcast.DOWNLOAD_QUALITY)
basename = f"{filename}.ogg"
filepath = os.path.join(show_directory, basename)
path, size = download_stream(stream, filepath)
mimetype="audio/ogg"
else:
basename=f"{filename}.mp3"
filepath = os.path.join(show_directory, basename)
path, size = download_file(download_url, filepath)
mimetype="audio/mpeg"
if size == FILE_EXISTS:
log.info(f"Skipped {podcast_name}: {episode_name}")
return
else:
log.warning(f"Downloaded {podcast_name}: {episode_name}")
if Spodcast.CONFIG.get_enable_rss_feed():
episode_info = {
"mimetype": mimetype,
"medium": "audio",
"duration": int(duration_ms/1000),
"date": time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.strptime(release_date, "%Y-%m-%d")),
"title": escape(episode_name), "guid": uri, "description": escape(description),
"filename": urllib.parse.quote(basename),
"size": int(size) }
info_file = open(os.path.join(show_directory, f"{basename}.{RSS_FEED_INFO_EXTENSION}"), "w")
info_file.write(json.dumps(episode_info))
info_file.close()
show_index_file_name = os.path.join(show_directory, f"{RSS_FEED_SHOW_INDEX}.{RSS_FEED_INFO_EXTENSION}")
if not os.path.isfile(show_index_file_name):
podcast_name, link, description, image = get_info(episode_id, "show")
show_info = {
"title": escape(podcast_name),
"link": link,
"description": escape(description),
"image": image }
show_index_file = open(show_index_file_name, "w")
show_index_file.write(json.dumps(show_info))
show_index_file.close()
rss_file_name = os.path.join(show_directory, RSS_FEED_FILE_NAME)
if not os.path.isfile(rss_file_name):
rss_file = open(rss_file_name, "w")
rss_file.write(RSS_FEED_CODE)
rss_file.close()

139
spodcast/spodcast.py Executable file
View file

@ -0,0 +1,139 @@
from getpass import getpass
import glob
import hashlib
from logging import Logger
import logging
import os
import os.path
import random
import requests
import sys
import time
from librespot.audio.decoders import VorbisOnlyAudioQuality
from librespot.core import Session
from spodcast.config import Config
from spodcast.const import CREDENTIALS_PREFIX, TYPE, USER_READ_EMAIL, OFFSET, LIMIT, PLAYLIST_READ_PRIVATE, USER_LIBRARY_READ
class Spodcast:
SESSION: Session = None
DOWNLOAD_QUALITY = None
CONFIG: Config = Config()
LOG: Logger = None
def __init__(self, args):
Spodcast.CONFIG.load(args)
logging.basicConfig(level=Spodcast.CONFIG.get_log_level())
log = logging.getLogger(__name__)
Spodcast.LOG = log
log.debug("args: %s", args)
if args.login:
filename = args.login
if os.path.exists(filename):
with open(filename, 'r', encoding='utf-8') as file:
for line in file.readlines():
Spodcast.account(line.strip())
sys.exit(0)
else:
sys.exit(f"Can not read username/password file {filename}")
Spodcast.login()
@classmethod
def account(cls, line):
cred_directory = Config.get_config_dir()
if os.path.isdir(cred_directory):
(username,password) = line.split()
cred_filename = CREDENTIALS_PREFIX + "-" + hashlib.md5(username.encode('utf-8'),usedforsecurity=False).hexdigest() + ".json"
cred_location = os.path.join(cred_directory, cred_filename)
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).build()
session = Session.Builder(conf).user_pass(username, password).create()
if not session.is_valid():
Spodcast.LOG.error("Invalid username/password for username " + username);
@classmethod
def login(cls):
cred_directory = Config.get_config_dir()
credfiles = glob.glob(os.path.join(cred_directory, CREDENTIALS_PREFIX) + "-*.json")
if credfiles:
random.shuffle(credfiles)
for credfile in credfiles:
try:
cred_location = os.path.join(cred_directory, credfile)
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).set_store_credentials(False).build()
session = Session.Builder(conf).stored_file().create()
if session.is_valid():
cls.SESSION = session
return
else:
Spodcast.LOG.warning(f"Invalid credentials in {cred_location}")
except RuntimeError:
Spodcast.LOG.error("RuntimeError")
pass
cred_location = Config.get_credentials_location()
if os.path.isfile(cred_location):
try:
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).set_store_credentials(False).build()
cls.SESSION = Session.Builder(conf).stored_file().create()
return
except RuntimeError:
pass
while True:
user_name = ''
while len(user_name) == 0:
user_name = input('Username: ')
password = getpass()
try:
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).build()
cls.SESSION = Session.Builder(conf).user_pass(user_name, password).create()
return
except RuntimeError:
pass
@classmethod
def get_content_stream(cls, content_id, quality):
return cls.SESSION.content_feeder().load(content_id, VorbisOnlyAudioQuality(quality), False, None)
@classmethod
def __get_auth_token(cls):
return cls.SESSION.tokens().get_token(USER_READ_EMAIL).access_token
@classmethod
def get_auth_header(cls):
return {
'Authorization': f'Bearer {cls.__get_auth_token()}',
'Accept-Language': f'{cls.CONFIG.get_language()}'
}
@classmethod
def get_auth_header_and_params(cls, limit, offset):
return {
'Authorization': f'Bearer {cls.__get_auth_token()}',
'Accept-Language': f'{cls.CONFIG.get_language()}'
}, {LIMIT: limit, OFFSET: offset}
@classmethod
def invoke_url_with_params(cls, url, limit, offset, **kwargs):
headers, params = cls.get_auth_header_and_params(limit=limit, offset=offset)
params.update(kwargs)
return requests.get(url, headers=headers, params=params).json()
@classmethod
def invoke_url(cls, url, tryCount=0):
headers = cls.get_auth_header()
response = requests.get(url, headers=headers)
responsetext = response.text
responsejson = response.json()
if 'error' in responsejson:
if tryCount < (cls.CONFIG.get_retry_attempts() - 1):
Spodcast.LOG.warning(f"Spotify API Error (try {tryCount + 1}) ({responsejson['error']['status']}): {responsejson['error']['message']}")
time.sleep(5)
return cls.invoke_url(url, tryCount + 1)
Spodcast.LOG.error(f"Spotify API Error ({responsejson['error']['status']}): {responsejson['error']['message']}")
return responsetext, responsejson

9
spodcast/spotapi.py Normal file
View file

@ -0,0 +1,9 @@
EPISODE_INFO_URL = 'https://api.spotify.com/v1/episodes'
SHOWS_URL = 'https://api.spotify.com/v1/shows'
EPISODE_DOWNLOAD_URL = lambda episode_id: f'https://api-partner.spotify.com/pathfinder/v1/query?operationName=getEpisode&variables={{"uri":"spotify:episode:{episode_id}"}}&extensions={{"persistedQuery":{{"version":1,"sha256Hash":"224ba0fd89fcfdfb3a15fa2d82a6112d3f4e2ac88fba5c6713de04d1b72cf482"}}}}'
ANON_PODCAST_DOMAIN = 'anon-podcast.scdn.co'

49
spodcast/utils.py Normal file
View file

@ -0,0 +1,49 @@
import re
import string
import unicodedata
from enum import Enum
from typing import List, Tuple
from .spodcast import Spodcast
valid_filename_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
def regex_input_for_urls(search_input) -> Tuple[str, str, str, str, str, str]:
episode_uri_search = re.search(
r'^spotify:episode:(?P<EpisodeID>[0-9a-zA-Z]{22})$', search_input)
episode_url_search = re.search(
r'^(https?://)?open\.spotify\.com/episode/(?P<EpisodeID>[0-9a-zA-Z]{22})(\?si=.+?)?$',
search_input,
)
show_uri_search = re.search(
r'^spotify:show:(?P<ShowID>[0-9a-zA-Z]{22})$', search_input)
show_url_search = re.search(
r'^(https?://)?open\.spotify\.com/show/(?P<ShowID>[0-9a-zA-Z]{22})(\?si=.+?)?$',
search_input,
)
if episode_uri_search is not None or episode_url_search is not None:
episode_id_str = (episode_uri_search
if episode_uri_search is not None else
episode_url_search).group('EpisodeID')
else:
episode_id_str = None
if show_uri_search is not None or show_url_search is not None:
show_id_str = (show_uri_search
if show_uri_search is not None else
show_url_search).group('ShowID')
else:
show_id_str = None
return episode_id_str, show_id_str
def clean_filename(filename, whitelist=valid_filename_chars, replace=' '):
for r in replace:
filename = filename.replace(r,'_')
cleaned_filename = unicodedata.normalize('NFKD', filename).encode('ASCII', 'ignore').decode()
cleaned_filename = ''.join(c for c in cleaned_filename if c in whitelist)
return cleaned_filename