From 4aefcdcc3d2c3a34802d58e0788df602dcdc9739 Mon Sep 17 00:00:00 2001 From: Colin Coe Date: Sat, 31 Aug 2024 21:27:43 -0600 Subject: [PATCH] Bump version to 2024.8.1 in preparation for next release (#197) * Bump version to 2024.8.1 --- pyinstaller.py | 2 +- tidal_wave/main.py | 2 +- tidal_wave/mix.py | 37 +++--- tidal_wave/playlist.py | 273 +++++++++++++++++++++-------------------- tidal_wave/track.py | 30 ++--- 5 files changed, 179 insertions(+), 165 deletions(-) diff --git a/pyinstaller.py b/pyinstaller.py index 70e47df..ea3a800 100644 --- a/pyinstaller.py +++ b/pyinstaller.py @@ -29,7 +29,7 @@ from tidal_wave.utils import is_tidal_api_reachable from tidal_wave.video import Video -__version__ = "2024.7.1" +__version__ = "2024.8.1" # https://typer.tiangolo.com/tutorial/options/version/#fix-with-is_eager diff --git a/tidal_wave/main.py b/tidal_wave/main.py index f1f53f5..ab9e201 100644 --- a/tidal_wave/main.py +++ b/tidal_wave/main.py @@ -29,7 +29,7 @@ from .utils import is_tidal_api_reachable from .video import Video -__version__ = "2024.7.1" +__version__ = "2024.8.1" # https://typer.tiangolo.com/tutorial/options/version/#fix-with-is_eager diff --git a/tidal_wave/mix.py b/tidal_wave/mix.py index 57abb23..493dc0a 100644 --- a/tidal_wave/mix.py +++ b/tidal_wave/mix.py @@ -1,24 +1,24 @@ -from dataclasses import dataclass import json import logging -from pathlib import Path import shutil import sys +from dataclasses import dataclass +from pathlib import Path from types import SimpleNamespace from typing import Dict, List, Optional, Set, Tuple, Union from uuid import uuid4 +from requests import HTTPError, Session + from .media import AudioFormat from .models import ( TracksEndpointResponseJSON, VideosEndpointResponseJSON, ) from .track import Track -from .utils import replace_illegal_characters, TIDAL_API_URL +from .utils import TIDAL_API_URL, replace_illegal_characters from .video import Video -from requests import HTTPError, Session - logger = logging.getLogger("__name__") # union type for type hinting @@ -52,7 +52,7 @@ def set_items(self, session: Session): mix_items: Optional[MixesItemsResponseJSON] = retrieve_mix_items( session=session, mix_id=self.mix_id, transparent=self.transparent ) - except TidalMixException as tme: + except TidalMixError as tme: logger.exception(tme.args[0]) self.items = tuple() else: @@ -326,7 +326,7 @@ def get_elements( self.files: List[Optional[str]] = files -class TidalMixException(Exception): +class TidalMixError(Exception): pass @@ -463,10 +463,11 @@ def mix_items_response_json_maker( try: mix_item = TracksEndpointResponseJSON.from_dict(namespace.item) except Exception as e: - logger.warning( - f"TidalPlaylistException: unable to parse playlist item [i] " + _msg: str = ( + f"TidalMixError: unable to parse playlist item [i] " f"with type '{namespace.type}'" ) + logger.warning(_msg) logger.debug(e) # value stays None else: @@ -475,18 +476,18 @@ def mix_items_response_json_maker( try: mix_item = VideosEndpointResponseJSON.from_dict(namespace.item) except Exception as e: - logger.warning( - f"TidalMixException: unable to parse mix item [i] " + _msg: str = ( + f"TidalMixError: unable to parse mix item [i] " f"with type '{namespace.type}'" ) + logger.warning(_msg) logger.debug(e) # value stays None else: mixes_items[i] = mix_item else: continue # value stays None - else: - init_args["items"] = tuple(mixes_items) + init_args["items"] = tuple(mixes_items) return MixesItemsResponseJSON(**init_args) @@ -505,11 +506,11 @@ def retrieve_mix_items( session=session, mix_id=mix_id, transparent=transparent ) if mixes_response is None: - raise TidalMixException(f"Could not retrieve the items in mix '{mix_id}'") + raise TidalMixError(f"Could not retrieve the items in mix '{mix_id}'") total_number_of_items: Optional[int] = mixes_response.get("totalNumberOfItems") if total_number_of_items is None: - raise TidalMixException( + raise TidalMixError( f"TIDAL API did not respond with number of items in mix '{mix_id}'" ) else: @@ -517,7 +518,7 @@ def retrieve_mix_items( num_items: int = len(mixes_response.get("items", [])) if num_items == 0: - raise TidalMixException( + raise TidalMixError( f"TIDAL API did not return any mix items for mix '{mix_id}'" ) @@ -537,7 +538,7 @@ def retrieve_mix_items( items_to_retrieve -= 100 else: logger.exception( - TidalMixException( + TidalMixError( f"Could not retrieve more than {len(items_list)} " f"elements of mix '{mix_id}'. Continuing " "without the remaining " @@ -554,6 +555,6 @@ def retrieve_mix_items( mix_items_response_json_maker(mixes_response=all_items_mixes_response) ) except Exception as e: - logger.exception(TidalMixException(e.args[0])) + logger.exception(TidalMixError(e.args[0])) finally: return mixes_items_response_json diff --git a/tidal_wave/playlist.py b/tidal_wave/playlist.py index a8e8c29..b010c21 100644 --- a/tidal_wave/playlist.py +++ b/tidal_wave/playlist.py @@ -1,15 +1,23 @@ +"""Retrieve playlist data from the TIDAL API.""" + from __future__ import annotations -from dataclasses import dataclass + import json import logging import math -from pathlib import Path import shutil import sys +from dataclasses import dataclass +from pathlib import Path from types import SimpleNamespace +from typing import TYPE_CHECKING + from uuid import uuid4 -from .media import AudioFormat +import ffmpeg +import mutagen +from requests import HTTPError, Session + from .models import ( PlaylistsEndpointResponseJSON, TracksEndpointResponseJSON, @@ -18,16 +26,15 @@ from .requesting import request_playlists from .track import Track from .utils import ( + TIDAL_API_URL, download_cover_image, replace_illegal_characters, temporary_file, - TIDAL_API_URL, ) from .video import Video -import ffmpeg -import mutagen -from requests import HTTPError, Session +if TYPE_CHECKING: + from .media import AudioFormat logger = logging.getLogger("__name__") @@ -41,10 +48,12 @@ def __post_init__(self): self.playlist_dir: Path | None = None self.playlist_cover_saved: bool = False - def set_metadata(self, session: Session): - """Request from TIDAL API /playlists endpoint""" + def set_metadata(self, session: Session) -> None: + """Request from TIDAL API /playlists endpoint.""" self.metadata: PlaylistsEndpointResponseJSON | None = request_playlists( - session=session, playlist_id=self.playlist_id, transparent=self.transparent + session=session, + playlist_id=self.playlist_id, + transparent=self.transparent, ) if self.metadata is None: @@ -53,28 +62,30 @@ def set_metadata(self, session: Session): self.name = replace_illegal_characters(self.metadata.title) def set_items(self, session: Session): - """Uses data from TIDAL API /playlists/items endpoint to - populate self.items. If 'totalNumberOfItems' field returned + """Populate self.items using data from TIDAL API /playlists/items endpoint. + + If 'totalNumberOfItems' field returned has value greater than 100, multiple requests of size <= 100 will be - sent to the endpoint until all items for the playlist are retrieved.""" + sent to the endpoint until all items for the playlist are retrieved. + """ playlist_items: PlaylistsItemsResponseJSON | None = retrieve_playlist_items( - session=session, playlist_id=self.playlist_id + session=session, playlist_id=self.playlist_id, ) if playlist_items is None: - self.items = tuple() + self.items = () else: - self.items: tuple[PlaylistItem | None] = tuple( - filter(None, playlist_items.items) + self.items: tuple[PlaylistItem] = tuple( + filter(None, playlist_items.items), ) def set_playlist_dir(self, out_dir: Path): - """Populates self.playlist_dir based on self.name, self.playlist_id""" + """Populate self.playlist_dir based on self.name, self.playlist_id.""" playlist_substring: str = f"{self.name} [{self.playlist_id}]" self.playlist_dir: Path = out_dir / "Playlists" / playlist_substring self.playlist_dir.mkdir(parents=True, exist_ok=True) def save_cover_image(self, session: Session, out_dir: Path): - """Requests self.metadata.image and attempts to write it to disk""" + """Request self.metadata.image and attempts to write it to disk.""" if self.playlist_dir is None: self.set_playlist_dir(out_dir=out_dir) self.cover_path: Path = self.playlist_dir / "cover.jpg" @@ -89,27 +100,33 @@ def save_cover_image(self, session: Session, out_dir: Path): self.playlist_cover_saved = True def save_description(self): - """Requests self.metadata.description and attempts to write it to disk""" + """Request self.metadata.description and attempts to write it to disk.""" self.description_path: Path = self.playlist_dir / "PlaylistDescription.txt" - if self.metadata.description is not None and len(self.metadata.description) > 0: - if not self.description_path.exists(): - self.description_path.write_text(f"{self.metadata.description}\n") + if ( + self.metadata.description is not None + and len(self.metadata.description) > 0 + and not self.description_path.exists() + ): + self.description_path.write_text(f"{self.metadata.description}\n") def get_items( - self, session: Session, audio_format: AudioFormat, no_extra_files: bool - ): + self, + session: Session, + audio_format: AudioFormat, + no_extra_files: bool, + ) -> tuple[Track | Video | None] | None: """Using either Track.get() or Video.get(), attempt to request the data for each track or video in self.items. If no_extra_files is True, do not attempt to retrieve or save any of: playlist description text, playlist m3u8 text, playlist cover image.""" if len(self.items) == 0: - return + return None tracks_videos: list = [None] * len(self.items) for i, item in enumerate(self.items): if item is None: tracks_videos[i] = None continue - elif isinstance(item, TracksEndpointResponseJSON): + if isinstance(item, TracksEndpointResponseJSON): track: Track = Track(track_id=item.id, transparent=self.transparent) track.get( session=session, @@ -131,10 +148,7 @@ def get_items( else: tracks_videos[i] = None continue - else: - self.tracks_videos: tuple[Track | Video | None] = tuple( - tracks_videos - ) + self.tracks_videos: tuple[Track | Video | None] = tuple(tracks_videos) return tracks_videos def flatten_playlist_dir(self): @@ -146,13 +160,13 @@ def flatten_playlist_dir(self): subdirectories created""" files: list[dict[int, str | None]] = [None] * len(self.tracks_videos) if len(self.tracks_videos) == 0: - return + return None subdirs: set[Path] = set() for i, tv in enumerate(self.tracks_videos, 1): - if getattr(tv, "outfile") is None: + if tv.outfile is None: try: - getattr(tv, "album_dir") + _ = tv.album_dir except AttributeError: pass else: @@ -187,14 +201,13 @@ def flatten_playlist_dir(self): new_path.write_bytes(_path.read_bytes()) _path.unlink() files[i - 1] = {i: str(new_path.absolute())} - else: - self.files: list[dict[int, str | None]] = files + self.files: list[dict[int, str | None]] = files # Find all subdirectories written to for tv in self.tracks_videos: if isinstance(tv, Track): try: - getattr(tv, "album_dir") + _ = tv.album_dir except AttributeError: pass else: @@ -211,32 +224,29 @@ def flatten_playlist_dir(self): if p.name == "cover.jpg": continue artist_images.add(p) - else: - for artist_image_path in artist_images: - if artist_image_path.exists(): - shutil.copyfile( - artist_image_path.absolute(), - self.playlist_dir / artist_image_path.name, - ) + for artist_image_path in artist_images: + if artist_image_path.exists(): + shutil.copyfile( + artist_image_path.absolute(), + self.playlist_dir / artist_image_path.name, + ) artist_bios: set[Path] = set() for subdir in subdirs: for p in subdir.glob("*bio.json"): artist_bios.add(p) - else: - for artist_bio_path in artist_bios: - if artist_bio_path.exists(): - shutil.copyfile( - artist_bio_path.absolute(), - self.playlist_dir / artist_bio_path.name, - ) + for artist_bio_path in artist_bios: + if artist_bio_path.exists(): + shutil.copyfile( + artist_bio_path.absolute(), + self.playlist_dir / artist_bio_path.name, + ) # Remove all subdirs for subdir in subdirs: if subdir.exists(): shutil.rmtree(subdir) - else: - return self.playlist_dir + return self.playlist_dir def craft_m3u8_text(self): """This method creates a file called playlist.m3u8 in self.playlist_dir @@ -246,14 +256,15 @@ def craft_m3u8_text(self): temporary directory because .m4a files cannot be read with mutagen.""" m3u_text: str = f"#EXTM3U\n#EXTENC:UTF-8\n#EXTIMG:{str(self.cover_path.absolute())}\n#PLAYLIST:{self.name}\n" - logger.info( + _msg: str = ( f"Creating .m3u8 playlist file for Playlist with ID '{self.playlist_id}'" ) + logger.info(_msg) for d in self.files: file: str = next(iter(d.values())) if file is None: continue - elif file.endswith(".flac"): + if file.endswith(".flac"): m = mutagen.File(file) artist: str = m.get("artist", [""])[0] title: str = m.get("title", [""])[0] @@ -299,10 +310,9 @@ def craft_m3u8_text(self): f"{artist} - {title}\n{file}\n" ) m3u_text += extinf - else: - return m3u_text + return m3u_text - def dumps(self): + def dumps(self) -> str: """This method emulates the stdlib json.dumps(). In particular, it returns the JSON-formatted string of the self.files attribute, which is an array of objects with one key each: the index in the @@ -325,8 +335,9 @@ def get( out_dir: Path, no_extra_files: bool, ): - """The main method of this class, executing a number of other methods - in a row: + """Execute a number of other methods in a row. + + The methods are: - self.set_metadata() - self.set_items() - self.set_playlist_dir() @@ -347,40 +358,45 @@ def get( self.set_playlist_dir(out_dir) if self.get_items(session, audio_format, no_extra_files) is None: - logger.critical(f"Could not retrieve playlist with ID '{self.playlist_id}'") + _msg: str = f"Could not retrieve playlist with ID '{self.playlist_id}'" + logger.critical(_msg) self.files = {} return self.flatten_playlist_dir() - logger.info(f"Playlist files written to '{self.playlist_dir}'") + _msg: str = f"Playlist files written to '{self.playlist_dir}'" + logger.info(_msg) if not no_extra_files: try: self.save_description() except Exception: - pass + logger.exception() else: - logger.info( + _msg: str = ( "Playlist description written to " f"{self.playlist_dir / 'PlaylistDescription.txt'}" ) + logger.info(_msg) self.save_cover_image(session, out_dir) try: m3u8_text: str = self.craft_m3u8_text() except Exception as e: - logger.warning( + _msg: str = ( "Unable to create playlist.m3u8 file for " f"playlist with ID '{self.playlist_id}'" ) + logger.warning(_msg) logger.debug(e) else: (self.playlist_dir / "playlist.m3u8").write_text(m3u8_text) - logger.info( + _msg: str = ( "Playlist M3U file written to " f"{self.playlist_dir / 'playlist.m3u8'}" ) + logger.info(_msg) def get_elements( self, @@ -388,10 +404,11 @@ def get_elements( audio_format: AudioFormat, out_dir: Path, no_extra_files: bool, - ): - """The main method of this class when no_flatten is True at - the program top level. It executes a number of other methods - in a row: + ) -> None: + """Execute a number of other methods in a row. + + The main method of this class when no_flatten is True at + the program top level. The methods executed are: - self.set_metadata() - self.set_items() """ @@ -405,14 +422,13 @@ def get_elements( if len(self.items) == 0: self.files = {} return - else: - files: list[dict[int, str | None]] = [None] * len(self.items) + files: list[dict[int, str | None]] = [None] * len(self.items) for i, item in enumerate(self.items): if item is None: files[i] = {i: None} continue - elif isinstance(item, TracksEndpointResponseJSON): + if isinstance(item, TracksEndpointResponseJSON): track: Track = Track(track_id=item.id, transparent=self.transparent) track_file: str | None = track.get( session=session, @@ -434,12 +450,11 @@ def get_elements( else: files[i] = {i: None} continue - else: - self.files: list[dict[int, str | None]] = files + self.files: list[dict[int, str | None]] = files -class TidalPlaylistException(Exception): - pass +class TidalPlaylistError(Exception): + """Catch-all custom exception for retrieval of playlist data from TIDAL API.""" def request_playlists_items( @@ -462,51 +477,47 @@ def request_playlists_items( json_name: str = f"playlists-{playlist_id}-items_{uuid4().hex}.json" data: dict | None = None - logger.info(f"Requesting from TIDAL API: playlists/{playlist_id}/items") - with session.get(**kwargs) as resp: + _msg: str = f"Requesting from TIDAL API: playlists/{playlist_id}/items" + logger.info(_msg) + with session.get(**kwargs) as r: try: - resp.raise_for_status() - except HTTPError as he: - if resp.status_code == 404: - logger.warning( - f"404 Client Error: not found for TIDAL API endpoint playlists/{playlist_id}/items" + r.raise_for_status() + except HTTPError: + if r.status_code == 404: + _msg: str = ( + "404 Client Error: not found for TIDAL API endpoint " + f"playlists/{playlist_id}/items" ) + logger.warning(_msg) else: - logger.exception(he) - else: - if transparent: - Path(json_name).write_text( - json.dumps(resp.json(), ensure_ascii=True, indent=4, sort_keys=True) - ) - data = resp.json() - logger.debug( - f"{resp.status_code} response from TIDAL API to request: playlists/{playlist_id}/items" - ) - else: - data = resp.json() - logger.debug( - f"{resp.status_code} response from TIDAL API to request: playlists/{playlist_id}/items" - ) - finally: - return data + logger.exception() + + _msg: str = ( + f"{r.status_code} response from TIDAL API to request: " + f"playlists/{playlist_id}/items" + ) + if transparent: + Path(json_name).write_text( + json.dumps(r.json(), ensure_ascii=True, indent=4, sort_keys=True), + ) + data = r.json() + logger.debug(_msg) + return data @dataclass(frozen=True) class PlaylistsItemsResponseJSON: - """The response from the TIDAL API endpoint /playlists//items - is modeled by this class.""" + """Represent the response from the TIDAL API endpoint /playlists//items.""" limit: int offset: int total_number_of_items: int - items: tuple[ - TracksEndpointResponseJSON | VideosEndpointResponseJSON | None - ] + items: tuple[TracksEndpointResponseJSON | VideosEndpointResponseJSON | None] def playlists_items_response_json_maker( playlists_response: dict[str, int | list[dict]], -) -> "PlaylistsItemsResponseJSON": +) -> PlaylistsItemsResponseJSON | None: """This function massages the response from the TIDAL API endpoint /playlists/items into a format that PlaylistsItemsResponseJSON.__init__() can ingest, and then returns a PlaylistsItemsResponseJSON instance""" @@ -520,7 +531,7 @@ def playlists_items_response_json_maker( SimpleNamespace(**d) for d in playlists_response["items"] ) if len(items) == 0: - return + return None playlist_items: list[ TracksEndpointResponseJSON | VideosEndpointResponseJSON | None @@ -531,10 +542,11 @@ def playlists_items_response_json_maker( try: playlist_item = TracksEndpointResponseJSON.from_dict(namespace.item) except Exception as e: - logger.warning( - f"TidalPlaylistException: unable to parse playlist item {i} " + _msg: str = ( + f"TidalPlaylistError: unable to parse playlist item {i} " f"with type '{namespace.type}'" ) + logger.warning(_msg) logger.debug(e) # value stays None else: @@ -543,18 +555,18 @@ def playlists_items_response_json_maker( try: playlist_item = VideosEndpointResponseJSON.from_dict(namespace.item) except Exception as e: - logger.warning( - f"TidalPlaylistException: unable to parse playlist item {i} " + _msg: str = ( + f"TidalPlaylistError: unable to parse playlist item {i} " f"with type '{namespace.type}'" ) + logger.warning(_msg) logger.debug(e) # value stays None else: playlist_items[i] = playlist_item else: continue # value stays None - else: - init_args["items"] = tuple(playlist_items) + init_args["items"] = tuple(playlist_items) return PlaylistsItemsResponseJSON(**init_args) @@ -574,20 +586,21 @@ def retrieve_playlist_items( session=session, playlist_id=playlist_id, transparent=transparent ) if playlists_response is None: - raise TidalPlaylistException( - f"Could not retrieve the items in playlist '{playlist_id}'" - ) + _msg: str = f"Could not retrieve the items in playlist '{playlist_id}'" + raise TidalPlaylistError(_msg) total_number_of_items: int | None = playlists_response.get("totalNumberOfItems") - logger.info( + _msg: str = ( f"Playlist '{playlist_id}' is comprised of {total_number_of_items} items" ) + logger.info(_msg) if total_number_of_items is None: - raise TidalPlaylistException( - f"TIDAL API did not respond with number of items in playlist '{playlist_id}'" + _msg: str = ( + "TIDAL API did not respond with number of items " + f"in playlist '{playlist_id}'" ) - else: - items_to_retrieve: int = total_number_of_items + raise TidalPlaylistError(_msg) + items_to_retrieve: int = total_number_of_items all_items_playlist_response: dict = playlists_response @@ -605,26 +618,24 @@ def retrieve_playlist_items( items_to_retrieve -= 100 else: logger.exception( - TidalPlaylistException( + TidalPlaylistError( f"Could not retrieve more than {len(items_list)} " f"elements of playlist '{playlist_id}'. Continuing " "without the remaining " - f"{total_number_of_items - len(items_list)}" - ) + f"{total_number_of_items - len(items_list)}", + ), ) - else: - all_items_playlist_response["items"] = items_list + all_items_playlist_response["items"] = items_list try: playlists_items_response_json: PlaylistsItemsResponseJSON | None = ( playlists_items_response_json_maker( - playlists_response=all_items_playlist_response + playlists_response=all_items_playlist_response, ) ) except Exception as e: - logger.exception(TidalPlaylistException(e.args[0])) - finally: - return playlists_items_response_json + logger.exception(TidalPlaylistError(e.args[0])) + return playlists_items_response_json # union type for type hinting diff --git a/tidal_wave/track.py b/tidal_wave/track.py index bc46db1..eb18b26 100644 --- a/tidal_wave/track.py +++ b/tidal_wave/track.py @@ -1,22 +1,29 @@ -from dataclasses import dataclass import json import logging -from pathlib import Path import re import shlex import shutil import subprocess import sys +from dataclasses import dataclass +from pathlib import Path from typing import Dict, Iterable, List, Optional, Union +import ffmpeg +import mutagen +from Crypto.Cipher import AES +from Crypto.Util import Counter +from mutagen.mp4 import MP4Cover +from requests import RequestException, Session + from .dash import ( - manifester, JSONDASHManifest, Manifest, TidalManifestError, XMLDASHManifest, + manifester, ) -from .media import AudioFormat, TAG_MAPPING +from .media import TAG_MAPPING, AudioFormat from .models import ( AlbumsEndpointResponseJSON, ArtistsBioResponseJSON, @@ -36,14 +43,7 @@ request_stream, request_tracks, ) -from .utils import download_cover_image, temporary_file, IMAGE_URL - -import ffmpeg -import mutagen -from mutagen.mp4 import MP4Cover -from requests import RequestException, Session -from Crypto.Cipher import AES -from Crypto.Util import Counter +from .utils import IMAGE_URL, download_cover_image, temporary_file logger = logging.getLogger("__name__") @@ -153,10 +153,12 @@ def set_album_dir(self, out_dir: Path): out_dir. In particular, self.album_dir is a subdirectory of out_dir based on the name of the album's artist""" artist_substring: str = self.album.artist.name.replace("..", "").replace( - "/", "and" + "/", + "and", ) album_substring: str = ( - f"{self.album.name} " f"[{self.album.id}] [{self.album.release_date.year}]" + f"{self.album.name} " + f"[{self.album.id}] [{self.album.release_date.year}]" ) self.album_dir: Path = out_dir / artist_substring / album_substring self.album_dir.mkdir(parents=True, exist_ok=True)