""" handles moving tv downloads """ import logging import os import re import subprocess from time import sleep import requests from src.config import get_config class Static: """ staticmethods collection used from EpisodeIdentify """ @staticmethod def split_file_name(filename): """ takes the file name, returns showname, season, episode and id_style based on regex match """ multi_reg = r'[sS][0-9]{1,3}[eE][0-9]{1,3}-?[eE][0-9]{1,3}' if re.compile(multi_reg).findall(filename): # S01E01-02 season_id_pattern = re.compile(multi_reg) season_id = season_id_pattern.findall(filename)[0] get_s_nr = re.compile(r'[0-9]{1,3}') season = str(get_s_nr.findall(season_id)[0]) e_list = get_s_nr.findall(season_id)[1:] episode = ' '.join(e_list) id_style = 'multi' elif re.compile(r'[sS][0-9]{1,3} ?[eE][0-9]{1,3}').findall(filename): # S01E01 season_id_pattern = re.compile(r'[sS]\d{1,3} ?[eE]\d{1,3}') season_id = season_id_pattern.findall(filename)[0] get_s_nr = re.compile(r'[0-9]{1,3}') season = str(get_s_nr.findall(season_id)[0]) episode = str(get_s_nr.findall(season_id)[1]) id_style = 'se' elif re.compile(r'[0-9]{4}.[0-9]{2}.[0-9]{2}').findall(filename): # YYYY.MM.DD season_id_pattern = re.compile(r'[0-9]{4}.[0-9]{2}.[0-9]{2}') season_id = season_id_pattern.findall(filename)[0] season = "NA" episode = "NA" id_style = 'year' elif re.compile(r'0?[0-9][xX][0-9]{1,2}').findall(filename): # 01X01 season_id_pattern = re.compile(r'0?[0-9][xX][0-9]{2}') season_id = season_id_pattern.findall(filename)[0] get_s_nr = re.compile(r'[0-9]{1,3}') season = str(get_s_nr.findall(season_id)[0]) episode = str(get_s_nr.findall(season_id)[1]) id_style = 'se' elif re.compile(r'[sS][0-9]{1,3}[. ]?[eE][0-9]{1,3}').findall(filename): # S01*E01 season_id_pattern = re.compile(r'[sS]\d{1,3}[. ]?[eE]\d{1,3}') season_id = season_id_pattern.findall(filename)[0] get_s_nr = re.compile(r'[0-9]{1,3}') season = str(get_s_nr.findall(season_id)[0]) episode = str(get_s_nr.findall(season_id)[1]) id_style = 'se' else: # id syle not dealt with print('season episode id failed for:') print(filename) raise ValueError return season, episode, season_id, id_style @staticmethod def showname_encoder(showname): """ encodes showname for best possible match """ # tvmaze doesn't like years in showname showname = showname.strip().rstrip('-').rstrip(".").strip().lower() year_pattern = re.compile(r'\(?[0-9]{4}\)?') year = year_pattern.findall(showname) if year and year[0] != showname: showname = showname.rstrip(year[0]).strip() # find acronym acronym = [i for i in showname.split(".") if len(i) == 1] # clean up encoded = showname.replace(" ", "%20") encoded = encoded.replace(".", "%20").replace("'", "%27") # put acronym back if acronym: to_replace = "%20".join(acronym) original_acronym = ".".join(acronym) encoded = encoded.replace(to_replace, original_acronym) return encoded @staticmethod def tvmaze_request(url): """ call the api with back_off on rate limit and user-agent """ headers = { 'User-Agent': 'https://github.com/bbilly1/media_organizer' } # retry up to 5 times for i in range(5): response = requests.get(url, headers=headers) if response.ok: # all good break if response.status_code == 429: # rate limited print('hit tvmaze rate limiting, slowing down') else: # general fail print('request failed with url:\n' + url) # slow down back_off = (i + 1) ** 2 sleep(back_off) request = response.json() return request class Episode: """ describes single episode """ def __init__(self, filename, discovered): self.filename = filename self.discovered = discovered self.file_parsed = self.parse_filename() showname = self.file_parsed['showname'] show_id = None showname_clean = None for i in discovered: if showname == i['showname']: # found it show_id = i['show_id'] showname_clean = i['showname_clean'] break if not show_id and not showname_clean: self.all_results = self.get_show_id() self.episode_details = self.get_ep_details(show_id, showname_clean) def parse_filename(self): """ parse the file name into its parts """ filename = self.filename season, episode, season_id, id_style = Static.split_file_name(filename) showname = filename.split(season_id)[0] if 'aka' in showname.lower().split(): showname = showname.lower().split('aka')[0] # build file_parsed dict file_parsed = { 'season': season, 'episode': episode, 'season_id': season_id, 'id_style': id_style, 'showname': Static.showname_encoder(showname), 'ext': os.path.splitext(filename)[1] } # return dict return file_parsed def get_show_id(self): """ return dict of matches """ showname = self.file_parsed['showname'] url = 'http://api.tvmaze.com/search/shows?q=' + showname request = Static.tvmaze_request(url) # loop through results all_results = [] for idx, result in enumerate(request): desc_raw = result['show']['summary'] # filter out basic html tags try: desc = re.sub('<[^<]+?>', '', desc_raw) except TypeError: desc = desc_raw result_dict = { 'list_id': idx, 'show_id': result['show']['id'], 'showname_clean': result['show']['name'], 'status': result['show']['status'], 'desc': desc } all_results.append(result_dict) # return all_results dict return all_results def pick_show_id(self): """ simple menu to pick matching show manually """ all_results = self.all_results filename = self.filename # more than one possibility if len(all_results) > 1: print(f'\nfilename: {filename}') # print menu for i in all_results: list_id = i['list_id'] showname_clean = i['showname_clean'] message = f'[{list_id}] {showname_clean}' print(message) print('[?] show more\n') # select select = input('select: ') # long menu with desc if select == '?': # print menu for i in all_results[:5]: list_id = i['list_id'] showname_clean = i['showname_clean'] status = i['status'] desc = i['desc'] message = (f'[{list_id}] {showname_clean},' + f'status: {status}\n{desc}\n') print(message) # select select = input('select: ') else: # only one possibility select = 0 # build string based on selected index = int(select) show_id = all_results[index]['show_id'] showname_clean = all_results[index]['showname_clean'] # return tuble return show_id, showname_clean def get_ep_details(self, show_id=None, showname_clean=None): """ build the show details dict""" if not show_id and not showname_clean: show_id, showname_clean = self.pick_show_id() season, episode, episode_name = self.get_episode_name(show_id) episode_details = { 'show_id': show_id, 'showname_clean': showname_clean, 'season': season, 'episode': episode, 'episode_name': episode_name } return episode_details def multi_parser(self, show_id): """ parse multi episode files names for get_episode_name() """ file_parsed = self.file_parsed season = file_parsed['season'] episode_list = file_parsed['episode'].split() # loop through all episodes episode_name_list = [] for episode in episode_list: url = (f'http://api.tvmaze.com/shows/{show_id}/episodebynumber?' f'season={season}&number={episode}') request = Static.tvmaze_request(url) episode_name = request['name'] episode_name_list.append(episode_name) episode = '-E'.join(episode_list) episode_name = ', '.join(episode_name_list) return season, episode, episode_name def get_episode_name(self, show_id): """ find episode based on show_id and id_style """ file_parsed = self.file_parsed id_style = file_parsed['id_style'] # multi episode filename if id_style == 'multi': # build and return tuple on multi episode season, episode, episode_name = self.multi_parser(show_id) return season, episode, episode_name # season - episode based if id_style == 'se': season = file_parsed['season'] episode = file_parsed['episode'] url = (f'http://api.tvmaze.com/shows/{show_id}/episodebynumber?' f'season={season}&number={episode}') request = Static.tvmaze_request(url) # returns a dict show_response = request # date based elif id_style == 'year': date_raw = file_parsed['season_id'] year, month, day = date_raw.split('.') url = (f'https://api.tvmaze.com/shows/{show_id}/episodesbydate?' f'date={year}-{month}-{day}') request = Static.tvmaze_request(url) # returns a list show_response = request[0] # build and return tuple season = str(show_response['season']).zfill(2) episode = str(show_response['number']).zfill(2) episode_name = show_response['name'].replace('/', '-') return season, episode, episode_name class TvHandler: """ handles the tv sort classes """ CONFIG = get_config() def __init__(self): self.pending = self.get_pending() self.discovered = [] def get_pending(self): """ return how many shows are pending """ tv_downpath = self.CONFIG['media']['tv_downpath'] pending = len(os.listdir(tv_downpath)) return pending def move_to_sort(self): """ move tv files to sortpath """ tv_downpath = self.CONFIG['media']['tv_downpath'] ext = self.CONFIG['media']['ext'] min_file_size = self.CONFIG['media']['min_file_size'] sortpath = self.CONFIG['media']['sortpath'] # walk through tv_downpath for dirpath, _, filenames in os.walk(tv_downpath): for filename in filenames: path = os.path.join(dirpath, filename) _, extension = os.path.splitext(path) extension = extension.lstrip('.').lower() f_size = os.stat(path).st_size if (extension in ext and 'sample' not in filename and f_size > min_file_size): move_to = os.path.join(sortpath, filename) os.rename(path, move_to) pending = sorted(os.listdir(sortpath)) return pending def episode_identify(self, to_rename): """ loops through the pending list """ identified = [] for filename in to_rename: episode = Episode(filename, self.discovered) # add to discovered showname = episode.file_parsed['showname'] showname_clean = episode.episode_details['showname_clean'] discovered_item = { 'showname': showname, 'showname_clean': showname_clean, 'show_id': episode.episode_details['show_id'] } self.discovered.append(discovered_item) identified.append(episode) print(filename) return identified def episode_rename(self, identified): """ make folder and rename files as identified """ sortpath = self.CONFIG['media']['sortpath'] renamed = [] for episode in identified: # build vars ext = episode.file_parsed['ext'] showname_clean = episode.episode_details['showname_clean'] season = episode.episode_details['season'] season_int = int(season) episode_id = episode.episode_details['episode'] episode_name = episode.episode_details['episode_name'] # build paths old_file = os.path.join(sortpath, episode.filename) new_folder = os.path.join(sortpath, showname_clean, f'Season {season_int}') new_file_name = (f'{showname_clean} - S{season}E{episode_id} - ' + f'{episode_name}{ext}') new_file = os.path.join(new_folder, new_file_name) # do it os.makedirs(new_folder, exist_ok=True) os.rename(old_file, new_file) # finish up renamed.append(new_file) logging.info( 'tv:from [%s] to [%s]', episode.filename, new_file_name ) return renamed def move_to_archive(self): """ moves the renamed files to the archive """ sortpath = self.CONFIG['media']['sortpath'] tvpath = self.CONFIG['media']['tvpath'] print() for dirpath, _, filenames in os.walk(sortpath): for show in sorted(filenames): print(show) input('\ncontinue?') # apply for dirpath, _, filenames in os.walk(sortpath): for show in filenames: # make folders folder_name = dirpath.lstrip(sortpath) new_folder = os.path.join(tvpath, folder_name) os.makedirs(new_folder, exist_ok=True) # move file old_file = os.path.join(sortpath, dirpath, show) new_file = os.path.join(new_folder, show) os.rename(old_file, new_file) def clean_up(self): """ clean up download and sort folder """ sortpath = self.CONFIG['media']['sortpath'] tv_downpath = self.CONFIG['media']['tv_downpath'] to_clean_list = os.listdir(sortpath) for to_clean in to_clean_list: to_trash = os.path.join(sortpath, to_clean) subprocess.call(["trash", to_trash]) to_clean_list = os.listdir(tv_downpath) for to_clean in to_clean_list: to_trash = os.path.join(tv_downpath, to_clean) subprocess.call(["trash", to_trash]) def main(): """ main function to sort tv shows """ handler = TvHandler() if not handler.pending: print('no tvshows to sort') return to_rename = handler.move_to_sort() if to_rename: identified = handler.episode_identify(to_rename) renamed = handler.episode_rename(identified) if renamed: handler.move_to_archive() print(f'renamed {len(renamed)} tv episodes') handler.clean_up()