[REFACTOR] Split metadata and download into two separate steps (#540)

This commit is contained in:
Jesse Bannon
2023-03-13 22:41:09 -07:00
committed by GitHub
parent 1922296c31
commit e69933d525
12 changed files with 610 additions and 468 deletions

View File

@@ -1,17 +1,12 @@
import abc
import contextlib
import copy
import json
import os
import time
from abc import ABC
from contextlib import contextmanager
from pathlib import Path
from typing import Callable
from typing import Dict
from typing import Generator
from typing import Generic
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
from typing import Set
@@ -19,19 +14,16 @@ from typing import Tuple
from typing import Type
from typing import TypeVar
import yt_dlp as ytdl
from yt_dlp.utils import ExistingVideoReached
from yt_dlp.utils import MaxDownloadsReached
from yt_dlp.utils import RejectedVideoReached
from ytdl_sub.config.preset_options import AddsVariablesMixin
from ytdl_sub.config.preset_options import Overrides
from ytdl_sub.downloaders.generic.validators import MultiUrlValidator
from ytdl_sub.downloaders.generic.validators import UrlThumbnailListValidator
from ytdl_sub.downloaders.generic.validators import UrlValidator
from ytdl_sub.downloaders.ytdl_options_builder import YTDLOptionsBuilder
from ytdl_sub.downloaders.ytdlp import YTDLP
from ytdl_sub.entries.entry import Entry
from ytdl_sub.entries.entry_parent import EntryParent
from ytdl_sub.entries.variables.kwargs import COLLECTION_URL
from ytdl_sub.entries.variables.kwargs import COMMENTS
from ytdl_sub.entries.variables.kwargs import DOWNLOAD_INDEX
from ytdl_sub.entries.variables.kwargs import PLAYLIST_ENTRY
@@ -39,14 +31,13 @@ from ytdl_sub.entries.variables.kwargs import REQUESTED_SUBTITLES
from ytdl_sub.entries.variables.kwargs import SOURCE_ENTRY
from ytdl_sub.entries.variables.kwargs import SPONSORBLOCK_CHAPTERS
from ytdl_sub.entries.variables.kwargs import UPLOAD_DATE_INDEX
from ytdl_sub.thread.log_entries_downloaded_listener import LogEntriesDownloadedListener
from ytdl_sub.utils.exceptions import FileNotDownloadedException
from ytdl_sub.plugins.plugin import Plugin
from ytdl_sub.plugins.plugin import PluginOptions
from ytdl_sub.utils.file_handler import FileHandler
from ytdl_sub.utils.file_handler import FileMetadata
from ytdl_sub.utils.logger import Logger
from ytdl_sub.utils.thumbnail import ThumbnailTypes
from ytdl_sub.utils.thumbnail import convert_download_thumbnail
from ytdl_sub.utils.thumbnail import convert_url_thumbnail
from ytdl_sub.utils.thumbnail import download_and_convert_url_thumbnail
from ytdl_sub.validators.strict_dict_validator import StrictDictValidator
from ytdl_sub.ytdl_additions.enhanced_download_archive import DownloadArchiver
from ytdl_sub.ytdl_additions.enhanced_download_archive import EnhancedDownloadArchive
@@ -97,7 +88,25 @@ class URLDownloadState:
def __init__(self, entries_total: int):
    """Track download progress for a single collection URL."""
    self.entries_total = entries_total
    # Counts every entry visited, including already-downloaded ones
    self.entries_downloaded = 0
    # Thumbnail file names already saved for this URL, to avoid re-downloading
    self.thumbnails_downloaded: Set[str] = set()
class EmptyPluginOptions(PluginOptions):
    """Plugin options for plugins that expose no user-facing YAML keys.

    Downloader-added plugins are configured via ``__init__`` arguments rather
    than YAML (see the usage in this file), so their options object is empty.
    """

    # NOTE(review): "no-op" appears to be a sentinel so the optional key set is
    # non-empty — confirm against PluginOptions validation behavior.
    _optional_keys = {"no-op"}
class BaseDownloaderPlugin(Plugin[EmptyPluginOptions], ABC):
    """Base class for plugins that a downloader injects into a subscription."""

    def __init__(
        self,
        overrides: Overrides,
        enhanced_download_archive: EnhancedDownloadArchive,
    ):
        # Downloader plugins carry no user-facing YAML options; all of their
        # configuration arrives through __init__ arguments instead.
        empty_options = EmptyPluginOptions(name=self.__class__.__name__, value={})
        super().__init__(
            plugin_options=empty_options,
            overrides=overrides,
            enhanced_download_archive=enhanced_download_archive,
        )
class BaseDownloader(DownloadArchiver, Generic[DownloaderOptionsT], ABC):
@@ -118,9 +127,148 @@ class BaseDownloader(DownloadArchiver, Generic[DownloaderOptionsT], ABC):
self._metadata_ytdl_options_builder = metadata_ytdl_options
@abc.abstractmethod
def download(self) -> Iterable[Entry] | Iterable[Tuple[Entry, FileMetadata]]:
def download_metadata(self) -> Iterable[Entry]:
"""Gathers metadata of all entries to download"""
@abc.abstractmethod
def download(self, entry: Entry) -> Entry:
"""The function to perform the download of all media entries"""
# pylint: disable=no-self-use
def added_plugins(self) -> List[BaseDownloaderPlugin]:
    """Add these plugins from the Downloader to the subscription.

    Default is no extra plugins; downloader subclasses override this (e.g. the
    yt-dlp downloader contributes thumbnail and collection-variable plugins).
    """
    return []
# pylint: enable=no-self-use
class YtDlpThumbnailPlugin(BaseDownloaderPlugin):
    """
    Downloader-added plugin that saves collection-level thumbnails (e.g. channel
    avatar/banner images) into the output directory as entries are processed.
    """

    def __init__(
        self,
        overrides: Overrides,
        enhanced_download_archive: EnhancedDownloadArchive,
        collection_urls: List[UrlValidator],
    ):
        super().__init__(
            overrides=overrides,
            enhanced_download_archive=enhanced_download_archive,
        )
        # Thumbnail file names already saved, so non-LATEST_ENTRY thumbnails
        # are only downloaded once
        self._thumbnails_downloaded: Set[str] = set()
        # Resolved (formatter-applied) URL -> validator, to match an entry back
        # to the collection URL it originated from
        self._collection_url_mapping: Dict[str, UrlValidator] = {
            self.overrides.apply_formatter(collection_url.url): collection_url
            for collection_url in collection_urls
        }

    def _download_parent_thumbnails(
        self,
        thumbnail_list_info: UrlThumbnailListValidator,
        entry: Entry,
        parent: EntryParent,
    ) -> None:
        """
        Downloads and moves channel avatar and banner images to the output directory.
        """
        for thumbnail_info in thumbnail_list_info.list:
            thumbnail_name = self.overrides.apply_formatter(thumbnail_info.name, entry=entry)
            thumbnail_id = self.overrides.apply_formatter(thumbnail_info.uid)

            # If latest entry, always update the thumbnail on each entry
            if thumbnail_id == ThumbnailTypes.LATEST_ENTRY:
                # Make sure the entry's thumbnail is converted to jpg
                convert_download_thumbnail(entry, error_if_not_found=False)

                # always save in dry-run even if it doesn't exist...
                if self.is_dry_run or os.path.isfile(entry.get_download_thumbnail_path()):
                    self.save_file(
                        file_name=entry.get_download_thumbnail_name(),
                        output_file_name=thumbnail_name,
                        copy_file=True,
                    )
                    self._thumbnails_downloaded.add(thumbnail_name)
                continue

            # If not latest entry and the thumbnail has already been downloaded, then skip
            if thumbnail_name in self._thumbnails_downloaded:
                continue

            if (thumbnail_url := parent.get_thumbnail_url(thumbnail_id=thumbnail_id)) is None:
                download_logger.debug("Failed to find thumbnail id '%s'", thumbnail_id)
                continue

            if download_and_convert_url_thumbnail(
                thumbnail_url=thumbnail_url,
                output_thumbnail_path=str(Path(self.working_directory) / thumbnail_name),
            ):
                self.save_file(file_name=thumbnail_name)
                self._thumbnails_downloaded.add(thumbnail_name)
            else:
                download_logger.debug("Failed to download thumbnail id '%s'", thumbnail_id)

    def _download_url_thumbnails(self, collection_url: UrlValidator, entry: Entry) -> None:
        """
        After all media entries have been downloaded, post processed, and moved to the output
        directory, run this function. This lets the downloader add any extra files directly to the
        output directory, for things like YT channel image, banner.
        """
        # Playlist- and source-level thumbnails are handled independently; an
        # entry may carry either, both, or neither parent payload
        if entry.kwargs_contains(PLAYLIST_ENTRY):
            self._download_parent_thumbnails(
                thumbnail_list_info=collection_url.playlist_thumbnails,
                entry=entry,
                parent=EntryParent(
                    entry.kwargs(PLAYLIST_ENTRY), working_directory=self.working_directory
                ),
            )

        if entry.kwargs_contains(SOURCE_ENTRY):
            self._download_parent_thumbnails(
                thumbnail_list_info=collection_url.source_thumbnails,
                entry=entry,
                parent=EntryParent(
                    entry.kwargs(SOURCE_ENTRY), working_directory=self.working_directory
                ),
            )

    def modify_entry(self, entry: Entry) -> Optional[Entry]:
        """
        Use the entry to download thumbnails (or move if LATEST_ENTRY)
        """
        # Only act on entries that came from one of this plugin's collection URLs
        if entry.kwargs(COLLECTION_URL) in self._collection_url_mapping:
            self._download_url_thumbnails(
                collection_url=self._collection_url_mapping[entry.kwargs(COLLECTION_URL)],
                entry=entry,
            )
        return entry
class YtDlpCollectionVariablePlugin(BaseDownloaderPlugin):
    """
    Downloader-added plugin that stamps a collection URL's user-defined
    variables onto each entry's metadata.
    """

    def __init__(
        self,
        overrides: Overrides,
        enhanced_download_archive: EnhancedDownloadArchive,
        collection_urls: List[UrlValidator],
    ):
        super().__init__(
            overrides=overrides,
            enhanced_download_archive=enhanced_download_archive,
        )
        # NOTE: removed unused `_thumbnails_downloaded` attribute — it was
        # copy-pasted from YtDlpThumbnailPlugin and never read in this class.
        # Resolved (formatter-applied) URL -> validator, to match an entry back
        # to the collection URL it originated from
        self._collection_url_mapping: Dict[str, UrlValidator] = {
            self.overrides.apply_formatter(collection_url.url): collection_url
            for collection_url in collection_urls
        }

    def modify_entry_metadata(self, entry: Entry) -> Optional[Entry]:
        """
        Add collection variables to the entry.

        Entries whose COLLECTION_URL is not in the mapping pass through unchanged.
        """
        collection_url: Optional[UrlValidator] = self._collection_url_mapping.get(
            entry.kwargs(COLLECTION_URL)
        )

        if collection_url:
            entry.add_variables(variables_to_add=collection_url.variables.dict_with_format_strings)
        return entry
class YtDlpDownloader(BaseDownloader[DownloaderOptionsT], ABC):
"""
@@ -128,8 +276,24 @@ class YtDlpDownloader(BaseDownloader[DownloaderOptionsT], ABC):
and should translate that to list of Entry objects.
"""
_extract_entry_num_retries: int = 5
_extract_entry_retry_wait_sec: int = 5
def added_plugins(self) -> List[BaseDownloaderPlugin]:
    """
    Plugins the yt-dlp downloader contributes to the subscription:
      1. URL thumbnail download plugin
      2. Collection variable plugin to add to each entry

    Return annotation narrowed from ``List[Plugin]`` to
    ``List[BaseDownloaderPlugin]`` for consistency with the base-class
    declaration; both returned plugins are BaseDownloaderPlugin subclasses.
    """
    return [
        YtDlpThumbnailPlugin(
            overrides=self.overrides,
            enhanced_download_archive=self._enhanced_download_archive,
            collection_urls=self.collection.urls.list,
        ),
        YtDlpCollectionVariablePlugin(
            overrides=self.overrides,
            enhanced_download_archive=self._enhanced_download_archive,
            collection_urls=self.collection.urls.list,
        ),
    ]
@classmethod
def ytdl_option_defaults(cls) -> Dict:
@@ -199,18 +363,6 @@ class YtDlpDownloader(BaseDownloader[DownloaderOptionsT], ABC):
.to_dict()
)
@classmethod
@contextmanager
def ytdl_downloader(cls, ytdl_options_overrides: Dict) -> ytdl.YoutubeDL:
    """
    Context manager yielding a configured ``yt_dlp.YoutubeDL`` instance with
    yt-dlp's own log output routed through this project's logger.
    """
    download_logger.debug("ytdl_options: %s", str(ytdl_options_overrides))
    # Hand yt-dlp a deep copy: yt-dlp may mutate the options dict it is given
    options_copy = copy.deepcopy(ytdl_options_overrides)
    with Logger.handle_external_logs(name="yt-dlp"), ytdl.YoutubeDL(options_copy) as downloader:
        yield downloader
@property
def is_dry_run(self) -> bool:
"""
@@ -229,177 +381,6 @@ class YtDlpDownloader(BaseDownloader[DownloaderOptionsT], ABC):
"""
return self.download_ytdl_options.get("writethumbnail", False)
def extract_info(self, ytdl_options_overrides: Dict, **kwargs) -> Dict:
    """
    Thin wrapper around ``yt_dlp.YoutubeDL.extract_info``.

    Parameters
    ----------
    ytdl_options_overrides
        Optional. Dict containing ytdl args to override other predefined ytdl args
    **kwargs
        Arguments passed directly to YoutubeDL extract_info
    """
    with self.ytdl_downloader(ytdl_options_overrides) as downloader:
        return downloader.extract_info(**kwargs)
def extract_info_with_retry(
    self,
    ytdl_options_overrides: Dict,
    is_downloaded_fn: Optional[Callable[[], bool]] = None,
    is_thumbnail_downloaded_fn: Optional[Callable[[], bool]] = None,
    **kwargs,
) -> Dict:
    """
    Wrapper around yt_dlp.YoutubeDL.YoutubeDL.extract_info
    All kwargs will passed to the extract_info function.

    This should be used when downloading a single entry. Checks if the entry's video
    and thumbnail files exist - retry if they do not.

    Parameters
    ----------
    ytdl_options_overrides
        Dict containing ytdl args to override other predefined ytdl args
    is_downloaded_fn
        Optional. Function to check if the entry is downloaded
    is_thumbnail_downloaded_fn
        Optional. Function to check if the entry thumbnail is downloaded
    **kwargs
        Arguments passed directly to YoutubeDL extract_info

    Raises
    ------
    FileNotDownloadedException
        If the entry fails to download after all retries
    """
    num_tries = 0
    copied_ytdl_options_overrides = copy.deepcopy(ytdl_options_overrides)

    # NOTE: dropped the never-assigned `entry_files_exist` loop flag — success
    # exits via `return` inside the loop, so the flag was dead.
    while num_tries < self._extract_entry_num_retries:
        entry_dict = self.extract_info(
            ytdl_options_overrides=copied_ytdl_options_overrides, **kwargs
        )

        is_downloaded = is_downloaded_fn is None or is_downloaded_fn()
        is_thumbnail_downloaded = (
            is_thumbnail_downloaded_fn is None or is_thumbnail_downloaded_fn()
        )
        if is_downloaded and is_thumbnail_downloaded:
            return entry_dict

        # If the video file is downloaded but the thumbnail is not, then do not download
        # the video again
        if is_downloaded and not is_thumbnail_downloaded:
            copied_ytdl_options_overrides["skip_download"] = True
            copied_ytdl_options_overrides["writethumbnail"] = True

        time.sleep(self._extract_entry_retry_wait_sec)
        num_tries += 1

        # Remove the download archive so it can retry without thinking its already downloaded,
        # even though it is not
        copied_ytdl_options_overrides.pop("download_archive", None)

        # BUG FIX: previously compared against `_extract_entry_retry_wait_sec`
        # (the wrong constant — only coincidentally equal in value); the retry
        # log must be gated on the retry *count* limit.
        if num_tries < self._extract_entry_num_retries:
            download_logger.debug(
                "Failed to download entry. Retrying %d / %d",
                num_tries,
                self._extract_entry_num_retries,
            )

    error_dict = {"ytdl_options": ytdl_options_overrides, "kwargs": kwargs}
    raise FileNotDownloadedException(
        f"yt-dlp failed to download an entry with these arguments: {error_dict}"
    )
def _get_entry_dicts_from_info_json_files(self) -> List[Dict]:
    """
    Load every ``*.info.json`` file in the working directory.

    Returns
    -------
    List of all info.json files read as JSON dicts
    """
    entry_dicts: List[Dict] = []
    for file_name in os.listdir(self.working_directory):
        if not file_name.endswith(".info.json"):
            continue
        info_json_path = Path(self.working_directory) / file_name
        with open(info_json_path, "r", encoding="utf-8") as file:
            entry_dicts.append(json.load(file))
    return entry_dicts
@contextlib.contextmanager
def _listen_and_log_downloaded_info_json(self, log_prefix: Optional[str]):
    """
    Context manager that starts a separate thread listening for new .info.json
    files and logs their titles as they appear. No-op when log_prefix is falsy.
    """
    if not log_prefix:
        yield
        return

    listener = LogEntriesDownloadedListener(
        working_directory=self.working_directory,
        log_prefix=log_prefix,
    )
    listener.start()

    try:
        yield
    finally:
        # Signal the listener thread to stop once downloading is finished
        listener.complete = True
def extract_info_via_info_json(
    self,
    ytdl_options_overrides: Dict,
    log_prefix_on_info_json_dl: Optional[str] = None,
    **kwargs,
) -> List[Dict]:
    """
    Wrapper around yt_dlp.YoutubeDL.YoutubeDL.extract_info with infojson enabled. Entry dicts
    are extracted via reading all info.json files in the working directory rather than
    from the output of extract_info.

    This allows us to catch RejectedVideoReached and ExistingVideoReached exceptions, and
    simply ignore while still being able to read downloaded entry metadata.

    Parameters
    ----------
    ytdl_options_overrides
        Dict containing ytdl args to override other predefined ytdl args
    log_prefix_on_info_json_dl
        Optional. Spin a new thread to listen for new info.json files. Log
        f'{log_prefix_on_info_json_dl} {title}' when a new one appears
    **kwargs
        Arguments passed directly to YoutubeDL extract_info
    """
    try:
        with self._listen_and_log_downloaded_info_json(log_prefix=log_prefix_on_info_json_dl):
            # Return value intentionally discarded — entry dicts are read back
            # from the info.json files on disk instead
            _ = self.extract_info(ytdl_options_overrides=ytdl_options_overrides, **kwargs)
    # These yt-dlp "break" exceptions signal early stopping, not failure:
    # swallow them so the metadata gathered so far is still usable
    except RejectedVideoReached:
        download_logger.debug(
            "RejectedVideoReached, stopping additional downloads "
            "(Can be disable by setting `ytdl_options.break_on_reject` to False)."
        )
    except ExistingVideoReached:
        download_logger.debug(
            "ExistingVideoReached, stopping additional downloads. "
            "(Can be disable by setting `ytdl_options.break_on_existing` to False)."
        )
    except MaxDownloadsReached:
        download_logger.info("MaxDownloadsReached, stopping additional downloads.")

    return self._get_entry_dicts_from_info_json_files()
###############################################################################################
# DOWNLOAD FUNCTIONS
@@ -459,17 +440,140 @@ class YtDlpDownloader(BaseDownloader[DownloaderOptionsT], ABC):
FileHandler.delete(info_json_file)
def _extract_entry_info_with_retry(self, entry: Entry) -> Entry:
download_entry_dict = self.extract_info_with_retry(
download_entry_dict = YTDLP.extract_info_with_retry(
ytdl_options_overrides=self.download_ytdl_options,
is_downloaded_fn=None if self.is_dry_run else entry.is_downloaded,
is_thumbnail_downloaded_fn=None
if (self.is_dry_run or not self.is_entry_thumbnails_enabled)
else entry.is_thumbnail_downloaded,
url=entry.webpage_url,
ytdl_options_overrides=self.download_ytdl_options,
)
return Entry(download_entry_dict, working_directory=self.working_directory)
def _download_entry(self, entry: Entry) -> Entry:
def _iterate_child_entries(
    self, url_validator: UrlValidator, entries: List[Entry]
) -> Iterator[Entry]:
    """
    Yield entries that still need downloading, counting every visited entry
    (including already-downloaded ones) toward the URL's progress state.
    """
    ordered = reversed(entries) if url_validator.download_reverse else entries
    for entry in ordered:
        self._url_state.entries_downloaded += 1

        if self._is_downloaded(entry):
            download_logger.info(
                "Already downloaded entry %d/%d: %s",
                self._url_state.entries_downloaded,
                self._url_state.entries_total,
                entry.title,
            )
            continue

        yield entry
        # Mark only after the consumer has finished processing the yielded entry
        self._mark_downloaded(entry)
def _iterate_parent_entry(
    self, url_validator: UrlValidator, parent: EntryParent
) -> Iterator[Entry]:
    """Depth-first traversal yielding downloadable entries under ``parent``."""
    yield from self._iterate_child_entries(
        url_validator=url_validator, entries=parent.entry_children()
    )

    # Recurse into nested parent entries, bottom-most first
    for nested_parent in reversed(parent.parent_children()):
        yield from self._iterate_parent_entry(
            url_validator=url_validator, parent=nested_parent
        )
def _download_url_metadata(
    self, collection_url: UrlValidator
) -> Tuple[List[EntryParent], List[Entry]]:
    """
    Downloads only info.json files and forms EntryParent trees.

    Returns a tuple of (parent trees, orphan entries that belong to no parent).
    """
    url = self.overrides.apply_formatter(collection_url.url)

    # Isolate the download archive so other collection URLs are unaffected
    with self._separate_download_archives():
        entry_dicts = YTDLP.extract_info_via_info_json(
            working_directory=self.working_directory,
            ytdl_options_overrides=self.metadata_ytdl_options,
            log_prefix_on_info_json_dl="Downloading metadata for",
            url=url,
        )

    parents = EntryParent.from_entry_dicts(
        url=url,
        entry_dicts=entry_dicts,
        working_directory=self.working_directory,
    )
    orphans = EntryParent.from_entry_dicts_with_no_parents(
        parents=parents, entry_dicts=entry_dicts, working_directory=self.working_directory
    )

    return parents, orphans
def _iterate_entries(
    self,
    url_validator: UrlValidator,
    parents: List[EntryParent],
    orphans: List[Entry],
) -> Iterator[Entry]:
    """
    Downloads the leaf entries from EntryParent trees, then the orphan entries.
    """
    # Delete info json files afterwards so other collection URLs do not use them
    with self._separate_download_archives(clear_info_json_files=True):
        for parent in parents:
            yield from self._iterate_parent_entry(
                url_validator=url_validator, parent=parent
            )

        yield from self._iterate_child_entries(url_validator=url_validator, entries=orphans)
def download_metadata(self) -> Iterable[Entry]:
    """Gathers metadata of all entries to download across all collection URLs."""
    # download the bottom-most urls first since they are top-priority
    for collection_url in reversed(self.collection.urls.list):
        parents, orphan_entries = self._download_url_metadata(collection_url=collection_url)

        # TODO: Encapsulate this logic into its own class
        self._url_state = URLDownloadState(
            entries_total=sum(parent.num_children() for parent in parents) + len(orphan_entries)
        )

        download_logger.info(
            "Beginning downloads for %s", self.overrides.apply_formatter(collection_url.url)
        )
        for entry in self._iterate_entries(
            url_validator=collection_url, parents=parents, orphans=orphan_entries
        ):
            # Add the collection URL to the info_dict to trace where it came from
            entry.add_kwargs(
                {COLLECTION_URL: self.overrides.apply_formatter(collection_url.url)}
            )
            yield entry
def download(self, entry: Entry) -> Entry:
"""
Parameters
----------
entry
Entry to download
Returns
-------
The entry that was downloaded successfully
"""
download_logger.info(
"Downloading entry %d/%d: %s",
self._url_state.entries_downloaded,
self._url_state.entries_total,
entry.title,
)
download_entry = self._extract_entry_info_with_retry(entry=entry)
upload_date_idx = self._enhanced_download_archive.mapping.get_num_entries_with_upload_date(
@@ -492,228 +596,3 @@ class YtDlpDownloader(BaseDownloader[DownloaderOptionsT], ABC):
)
return entry
def _download_entries(
    self, url_validator: UrlValidator, entries: List[Entry]
) -> Generator[Entry, None, None]:
    """
    Download each entry (reversed if the URL requests reverse order), skipping
    entries already present in the download archive.
    """
    entries_to_iterate = entries
    if url_validator.download_reverse:
        entries_to_iterate = reversed(entries)

    for entry in entries_to_iterate:
        # Counts every entry visited, downloaded or not
        self._url_state.entries_downloaded += 1

        if self._is_downloaded(entry):
            download_logger.info(
                "Already downloaded entry %d/%d: %s",
                self._url_state.entries_downloaded,
                self._url_state.entries_total,
                entry.title,
            )
            continue

        download_logger.info(
            "Downloading entry %d/%d: %s",
            self._url_state.entries_downloaded,
            self._url_state.entries_total,
            entry.title,
        )
        yield self._download_entry(entry)
        # Mark only after the consumer has finished with the yielded entry
        self._mark_downloaded(entry)
def _download_parent_entry(
    self, url_validator: UrlValidator, parent: EntryParent
) -> Generator[Entry, None, None]:
    """Depth-first traversal downloading entries under ``parent``."""
    yield from self._download_entries(
        url_validator=url_validator, entries=parent.entry_children()
    )

    # Recurse into nested parent entries, bottom-most first
    for nested_parent in reversed(parent.parent_children()):
        yield from self._download_parent_entry(
            url_validator=url_validator, parent=nested_parent
        )
def _set_collection_variables(self, collection_url: UrlValidator, entry: Entry | EntryParent):
    """
    Recursively stamp the collection URL's user-defined variables onto an
    entry, or onto every leaf entry beneath an EntryParent.
    """
    if isinstance(entry, EntryParent):
        # Recurse through nested parents, then stamp this parent's direct children
        for nested_parent in entry.parent_children():
            self._set_collection_variables(collection_url, nested_parent)
        for leaf_entry in entry.entry_children():
            leaf_entry.add_variables(
                variables_to_add=collection_url.variables.dict_with_format_strings
            )
    elif isinstance(entry, Entry):
        entry.add_variables(variables_to_add=collection_url.variables.dict_with_format_strings)
def _download_url_metadata(
    self, collection_url: UrlValidator
) -> Tuple[List[EntryParent], List[Entry]]:
    """
    Downloads only info.json files and forms EntryParent trees.

    Returns a tuple of (parent trees, orphan entries that belong to no parent),
    with collection variables already applied to every entry.
    """
    url = self.overrides.apply_formatter(collection_url.url)

    # Isolate the download archive so other collection URLs are unaffected
    with self._separate_download_archives():
        entry_dicts = self.extract_info_via_info_json(
            ytdl_options_overrides=self.metadata_ytdl_options,
            url=url,
            log_prefix_on_info_json_dl="Downloading metadata for",
        )

    parents = EntryParent.from_entry_dicts(
        url=url,
        entry_dicts=entry_dicts,
        working_directory=self.working_directory,
    )
    orphans = EntryParent.from_entry_dicts_with_no_parents(
        parents=parents, entry_dicts=entry_dicts, working_directory=self.working_directory
    )

    # Stamp collection variables on everything before downloading begins
    for parent_entry in parents:
        self._set_collection_variables(collection_url, parent_entry)
    for entry in orphans:
        self._set_collection_variables(collection_url, entry)

    return parents, orphans
def _download(
    self,
    url_validator: UrlValidator,
    parents: List[EntryParent],
    orphans: List[Entry],
) -> Generator[Entry, None, None]:
    """
    Downloads the leaf entries from EntryParent trees, then the orphan entries.
    """
    # Delete info json files afterwards so other collection URLs do not use them
    with self._separate_download_archives(clear_info_json_files=True):
        for parent in parents:
            yield from self._download_parent_entry(
                url_validator=url_validator, parent=parent
            )

        yield from self._download_entries(url_validator=url_validator, entries=orphans)
def download(
    self,
) -> Iterable[Entry] | Iterable[Tuple[Entry, FileMetadata]]:
    """The function to perform the download of all media entries"""
    # download the bottom-most urls first since they are top-priority
    for collection_url in reversed(self.collection.urls.list):
        parents, orphan_entries = self._download_url_metadata(collection_url=collection_url)

        # TODO: Encapsulate this logic into its own class
        self._url_state = URLDownloadState(
            entries_total=sum(parent.num_children() for parent in parents) + len(orphan_entries)
        )

        download_logger.info(
            "Beginning downloads for %s", self.overrides.apply_formatter(collection_url.url)
        )
        for entry in self._download(
            url_validator=collection_url, parents=parents, orphans=orphan_entries
        ):
            # Update thumbnails in case of last_entry
            self._download_url_thumbnails(collection_url=collection_url, entry=entry)
            yield entry
@classmethod
def _download_thumbnail(
    cls,
    thumbnail_url: str,
    output_thumbnail_path: str,
) -> Optional[bool]:
    """
    Downloads a thumbnail and stores it in the output directory.

    Parameters
    ----------
    thumbnail_url:
        Url of the thumbnail
    output_thumbnail_path:
        Path to store the thumbnail after downloading

    Returns
    -------
    True if the thumbnail converted. None if it is missing or failed.
    """
    if thumbnail_url:
        return convert_url_thumbnail(
            thumbnail_url=thumbnail_url, output_thumbnail_path=output_thumbnail_path
        )
    # No URL to fetch from — treat as missing
    return None
def _download_parent_thumbnails(
    self,
    thumbnail_list_info: UrlThumbnailListValidator,
    entry: Entry,
    parent: EntryParent,
) -> None:
    """
    Downloads and moves channel avatar and banner images to the output directory.
    """
    for thumbnail_info in thumbnail_list_info.list:
        thumbnail_name = self.overrides.apply_formatter(thumbnail_info.name, entry=entry)
        thumbnail_id = self.overrides.apply_formatter(thumbnail_info.uid)

        # If latest entry, always update the thumbnail on each entry
        if thumbnail_id == ThumbnailTypes.LATEST_ENTRY:
            # Make sure the entry's thumbnail is converted to jpg
            convert_download_thumbnail(entry, error_if_not_found=False)

            # always save in dry-run even if it doesn't exist...
            if self.is_dry_run or os.path.isfile(entry.get_download_thumbnail_path()):
                self.save_file(
                    file_name=entry.get_download_thumbnail_name(),
                    output_file_name=thumbnail_name,
                    copy_file=True,
                )
                self._url_state.thumbnails_downloaded.add(thumbnail_name)
            continue

        # If not latest entry and the thumbnail has already been downloaded, then skip
        if thumbnail_name in self._url_state.thumbnails_downloaded:
            continue

        if (thumbnail_url := parent.get_thumbnail_url(thumbnail_id=thumbnail_id)) is None:
            download_logger.debug("Failed to find thumbnail id '%s'", thumbnail_id)
            continue

        if self._download_thumbnail(
            thumbnail_url=thumbnail_url,
            output_thumbnail_path=str(Path(self.working_directory) / thumbnail_name),
        ):
            self.save_file(file_name=thumbnail_name)
            self._url_state.thumbnails_downloaded.add(thumbnail_name)
        else:
            download_logger.debug("Failed to download thumbnail id '%s'", thumbnail_id)
def _download_url_thumbnails(self, collection_url: UrlValidator, entry: Entry) -> None:
    """
    After all media entries have been downloaded, post processed, and moved to the output
    directory, run this function. This lets the downloader add any extra files directly to the
    output directory, for things like YT channel image, banner.
    """
    # Playlist- and source-level thumbnails are handled independently; an entry
    # may carry either, both, or neither parent payload
    if entry.kwargs_contains(PLAYLIST_ENTRY):
        self._download_parent_thumbnails(
            thumbnail_list_info=collection_url.playlist_thumbnails,
            entry=entry,
            parent=EntryParent(
                entry.kwargs(PLAYLIST_ENTRY), working_directory=self.working_directory
            ),
        )

    if entry.kwargs_contains(SOURCE_ENTRY):
        self._download_parent_thumbnails(
            thumbnail_list_info=collection_url.source_thumbnails,
            entry=entry,
            parent=EntryParent(
                entry.kwargs(SOURCE_ENTRY), working_directory=self.working_directory
            ),
        )

View File

@@ -0,0 +1,227 @@
import contextlib
import copy
import json
import os
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
import yt_dlp as ytdl
from yt_dlp.utils import ExistingVideoReached
from yt_dlp.utils import MaxDownloadsReached
from yt_dlp.utils import RejectedVideoReached
from ytdl_sub.thread.log_entries_downloaded_listener import LogEntriesDownloadedListener
from ytdl_sub.utils.exceptions import FileNotDownloadedException
from ytdl_sub.utils.logger import Logger
class YTDLP:
    """
    Class-level wrapper around ``yt_dlp.YoutubeDL`` that adds external-log
    routing, retry-on-missing-file behavior, and info.json-based metadata
    extraction. All methods are classmethods; no instances are created.
    """

    # Retry tuning for extract_info_with_retry
    _EXTRACT_ENTRY_NUM_RETRIES: int = 5
    _EXTRACT_ENTRY_RETRY_WAIT_SEC: int = 5

    logger = Logger.get(name="yt-dlp-downloader")

    @classmethod
    @contextmanager
    def ytdlp_downloader(cls, ytdl_options_overrides: Dict) -> ytdl.YoutubeDL:
        """
        Context manager to interact with yt_dlp.
        """
        cls.logger.debug("ytdl_options: %s", str(ytdl_options_overrides))
        with Logger.handle_external_logs(name="yt-dlp"):
            # Deep copy ytdl_options in case yt-dlp modifies the dict
            with ytdl.YoutubeDL(copy.deepcopy(ytdl_options_overrides)) as ytdl_downloader:
                yield ytdl_downloader

    @classmethod
    def extract_info(cls, ytdl_options_overrides: Dict, **kwargs) -> Dict:
        """
        Wrapper around yt_dlp.YoutubeDL.YoutubeDL.extract_info
        All kwargs will passed to the extract_info function.

        Parameters
        ----------
        ytdl_options_overrides
            Optional. Dict containing ytdl args to override other predefined ytdl args
        **kwargs
            Arguments passed directly to YoutubeDL extract_info
        """
        with cls.ytdlp_downloader(ytdl_options_overrides) as ytdlp:
            return ytdlp.extract_info(**kwargs)

    @classmethod
    def extract_info_with_retry(
        cls,
        ytdl_options_overrides: Dict,
        is_downloaded_fn: Optional[Callable[[], bool]] = None,
        is_thumbnail_downloaded_fn: Optional[Callable[[], bool]] = None,
        **kwargs,
    ) -> Dict:
        """
        Wrapper around yt_dlp.YoutubeDL.YoutubeDL.extract_info
        All kwargs will passed to the extract_info function.

        This should be used when downloading a single entry. Checks if the entry's video
        and thumbnail files exist - retry if they do not.

        Parameters
        ----------
        ytdl_options_overrides
            Dict containing ytdl args to override other predefined ytdl args
        is_downloaded_fn
            Optional. Function to check if the entry is downloaded
        is_thumbnail_downloaded_fn
            Optional. Function to check if the entry thumbnail is downloaded
        **kwargs
            Arguments passed directly to YoutubeDL extract_info

        Raises
        ------
        FileNotDownloadedException
            If the entry fails to download after all retries
        """
        num_tries = 0
        # NOTE(review): entry_files_exist is never set True — success exits via
        # return, so the loop condition effectively depends on num_tries only
        entry_files_exist = False
        copied_ytdl_options_overrides = copy.deepcopy(ytdl_options_overrides)

        while not entry_files_exist and num_tries < cls._EXTRACT_ENTRY_NUM_RETRIES:
            entry_dict = cls.extract_info(
                ytdl_options_overrides=copied_ytdl_options_overrides, **kwargs
            )

            is_downloaded = is_downloaded_fn is None or is_downloaded_fn()
            is_thumbnail_downloaded = (
                is_thumbnail_downloaded_fn is None or is_thumbnail_downloaded_fn()
            )
            if is_downloaded and is_thumbnail_downloaded:
                return entry_dict

            # If the video file is downloaded but the thumbnail is not, then do not download
            # the video again
            if is_downloaded and not is_thumbnail_downloaded:
                copied_ytdl_options_overrides["skip_download"] = True
                copied_ytdl_options_overrides["writethumbnail"] = True

            time.sleep(cls._EXTRACT_ENTRY_RETRY_WAIT_SEC)
            num_tries += 1

            # Remove the download archive so it can retry without thinking its already downloaded,
            # even though it is not
            if "download_archive" in copied_ytdl_options_overrides:
                del copied_ytdl_options_overrides["download_archive"]

            if num_tries < cls._EXTRACT_ENTRY_NUM_RETRIES:
                cls.logger.debug(
                    "Failed to download entry. Retrying %d / %d",
                    num_tries,
                    cls._EXTRACT_ENTRY_NUM_RETRIES,
                )

        error_dict = {"ytdl_options": ytdl_options_overrides, "kwargs": kwargs}
        raise FileNotDownloadedException(
            f"yt-dlp failed to download an entry with these arguments: {error_dict}"
        )

    @classmethod
    def _get_entry_dicts_from_info_json_files(cls, working_directory: str) -> List[Dict]:
        """
        Parameters
        ----------
        working_directory
            Directory that info json files are located

        Returns
        -------
        List of all info.json files read as JSON dicts
        """
        entry_dicts: List[Dict] = []
        info_json_paths = [
            Path(working_directory) / file_name
            for file_name in os.listdir(working_directory)
            if file_name.endswith(".info.json")
        ]

        for info_json_path in info_json_paths:
            with open(info_json_path, "r", encoding="utf-8") as file:
                entry_dicts.append(json.load(file))

        return entry_dicts

    @classmethod
    @contextlib.contextmanager
    def _listen_and_log_downloaded_info_json(
        cls, working_directory: str, log_prefix: Optional[str]
    ):
        """
        Context manager that starts a separate thread that listens for new .info.json files,
        prints their titles as they appear. No-op when log_prefix is falsy.
        """
        if not log_prefix:
            yield
            return

        info_json_listener = LogEntriesDownloadedListener(
            working_directory=working_directory,
            log_prefix=log_prefix,
        )
        info_json_listener.start()

        try:
            yield
        finally:
            # Signal the listener thread to stop once downloading is finished
            info_json_listener.complete = True

    @classmethod
    def extract_info_via_info_json(
        cls,
        working_directory: str,
        ytdl_options_overrides: Dict,
        log_prefix_on_info_json_dl: Optional[str] = None,
        **kwargs,
    ) -> List[Dict]:
        """
        Wrapper around yt_dlp.YoutubeDL.YoutubeDL.extract_info with infojson enabled. Entry dicts
        are extracted via reading all info.json files in the working directory rather than
        from the output of extract_info.

        This allows us to catch RejectedVideoReached and ExistingVideoReached exceptions, and
        simply ignore while still being able to read downloaded entry metadata.

        Parameters
        ----------
        working_directory
            Directory that info json files reside in
        ytdl_options_overrides
            Dict containing ytdl args to override other predefined ytdl args
        log_prefix_on_info_json_dl
            Optional. Spin a new thread to listen for new info.json files. Log
            f'{log_prefix_on_info_json_dl} {title}' when a new one appears
        **kwargs
            Arguments passed directly to YoutubeDL extract_info
        """
        try:
            with cls._listen_and_log_downloaded_info_json(
                working_directory=working_directory, log_prefix=log_prefix_on_info_json_dl
            ):
                # Return value intentionally discarded — entry dicts are read
                # back from the info.json files on disk instead
                _ = cls.extract_info(ytdl_options_overrides=ytdl_options_overrides, **kwargs)
        # These yt-dlp "break" exceptions signal early stopping, not failure:
        # swallow them so the metadata gathered so far is still usable
        except RejectedVideoReached:
            cls.logger.debug(
                "RejectedVideoReached, stopping additional downloads "
                "(Can be disable by setting `ytdl_options.break_on_reject` to False)."
            )
        except ExistingVideoReached:
            cls.logger.debug(
                "ExistingVideoReached, stopping additional downloads. "
                "(Can be disable by setting `ytdl_options.break_on_existing` to False)."
            )
        except MaxDownloadsReached:
            cls.logger.info("MaxDownloadsReached, stopping additional downloads.")

        return cls._get_entry_dicts_from_info_json_files(working_directory=working_directory)

View File

@@ -40,6 +40,7 @@ PLAYLIST_UPLOADER = _("playlist_uploader")
PLAYLIST_UPLOADER_ID = _("playlist_uploader_id")
PLAYLIST_UPLOADER_URL = _("playlist_uploader_url")
COLLECTION_URL = _("collection_url", backend=True)
DOWNLOAD_INDEX = _("download_index", backend=True)
UPLOAD_DATE_INDEX = _("upload_date_index", backend=True)
REQUESTED_SUBTITLES = _("requested_subtitles", backend=True)

View File

@@ -113,9 +113,24 @@ class Plugin(DownloadArchiver, Generic[PluginOptionsT], ABC):
"""
return []
def modify_entry_metadata(self, entry: Entry) -> Optional[Entry]:
    """
    Hook invoked once entry metadata has been gathered, before any download occurs.
    Override to adjust the metadata or to filter the entry out entirely.

    Parameters
    ----------
    entry
        Entry metadata to modify

    Returns
    -------
    The entry or None, indicating not to download it.
    """
    # Default implementation passes the entry through unchanged
    return entry
def modify_entry(self, entry: Entry) -> Optional[Entry]:
"""
For each entry downloaded, modify the entry in some way before sending it to
After each entry is downloaded, modify the entry in some way before sending it to
post-processing.
Parameters

View File

@@ -174,6 +174,15 @@ class SubscriptionDownload(BaseSubscription, ABC):
FileHandler.delete(entry.get_download_thumbnail_path())
FileHandler.delete(entry.get_download_info_json_path())
@classmethod
def _preprocess_entry(cls, plugins: List[Plugin], entry: Entry) -> Optional[Entry]:
    """
    Run every plugin's ``modify_entry_metadata`` hook over the entry, in order.

    Returns the (possibly modified) entry, or None if any plugin filtered it out.
    """
    current_entry: Optional[Entry] = entry
    for plugin in plugins:
        current_entry = plugin.modify_entry_metadata(current_entry)
        if current_entry is None:
            # A plugin rejected the entry; stop processing and skip it
            return None
    return current_entry
def _post_process_entry(
self, plugins: List[Plugin], dry_run: bool, entry: Entry, entry_metadata: FileMetadata
):
@@ -276,19 +285,23 @@ class SubscriptionDownload(BaseSubscription, ABC):
dry_run=dry_run,
)
with self._subscription_download_context_managers():
downloader = self.downloader_class(
download_options=self.downloader_options,
enhanced_download_archive=self._enhanced_download_archive,
download_ytdl_options=subscription_ytdl_options.download_builder(),
metadata_ytdl_options=subscription_ytdl_options.metadata_builder(),
overrides=self.overrides,
)
downloader = self.downloader_class(
download_options=self.downloader_options,
enhanced_download_archive=self._enhanced_download_archive,
download_ytdl_options=subscription_ytdl_options.download_builder(),
metadata_ytdl_options=subscription_ytdl_options.metadata_builder(),
overrides=self.overrides,
)
# This could be cleaned up....
plugins.extend(downloader.added_plugins())
for entry in downloader.download():
with self._subscription_download_context_managers():
for entry in downloader.download_metadata():
if (entry := self._preprocess_entry(plugins=plugins, entry=entry)) is None:
continue
entry = downloader.download(entry)
entry_metadata = FileMetadata()
if isinstance(entry, tuple):
entry, entry_metadata = entry
if split_plugin := _get_split_plugin(plugins):
self._process_split_entry(

View File

@@ -47,7 +47,9 @@ def convert_download_thumbnail(entry: Entry, error_if_not_found: bool = True) ->
@retry(times=3, exceptions=(Exception,))
def convert_url_thumbnail(thumbnail_url: str, output_thumbnail_path: str) -> Optional[bool]:
def download_and_convert_url_thumbnail(
thumbnail_url: Optional[str], output_thumbnail_path: str
) -> Optional[bool]:
"""
Downloads and converts a thumbnail from a url into a jpg
@@ -62,6 +64,9 @@ def convert_url_thumbnail(thumbnail_url: str, output_thumbnail_path: str) -> Opt
-------
True to indicate it converted the thumbnail from url. None if the retry failed.
"""
if not thumbnail_url:
return None
# timeout after 8 seconds
with urlopen(thumbnail_url, timeout=1.0) as file:
with tempfile.NamedTemporaryFile(delete=False) as thumbnail:

View File

@@ -4,6 +4,7 @@ from expected_download import assert_expected_downloads
from expected_transaction_log import assert_transaction_log_matches
import ytdl_sub.downloaders.downloader
from ytdl_sub.downloaders.ytdlp import YTDLP
from ytdl_sub.subscriptions.subscription import Subscription
@@ -65,7 +66,7 @@ class TestBandcamp:
# Ensure another invocation will hit ExistingVideoReached
if not dry_run:
with assert_logs(
logger=ytdl_sub.downloaders.downloader.download_logger,
logger=YTDLP.logger,
expected_message="ExistingVideoReached, stopping additional downloads",
log_level="debug",
):

View File

@@ -7,6 +7,7 @@ from expected_download import assert_expected_downloads
from expected_transaction_log import assert_transaction_log_matches
import ytdl_sub.downloaders.downloader
from ytdl_sub.downloaders.ytdlp import YTDLP
from ytdl_sub.subscriptions.subscription import Subscription
@@ -68,7 +69,7 @@ class TestDateRange:
if not dry_run:
# try downloading again, ensure nothing more was downloaded
with assert_logs(
logger=ytdl_sub.downloaders.downloader.download_logger,
logger=YTDLP.logger,
expected_message="ExistingVideoReached, stopping additional downloads",
log_level="debug",
):
@@ -139,7 +140,7 @@ class TestDateRange:
# First, download recent vids. Always download since we want to test dry-run
# on the rolling recent portion.
with assert_logs(
logger=ytdl_sub.downloaders.downloader.download_logger,
logger=YTDLP.logger,
expected_message="RejectedVideoReached, stopping additional downloads",
log_level="debug",
):
@@ -159,7 +160,7 @@ class TestDateRange:
# Then, download the rolling recent vids subscription. This should remove one of the
# two videos
with assert_logs(
logger=ytdl_sub.downloaders.downloader.download_logger,
logger=YTDLP.logger,
expected_message="ExistingVideoReached, stopping additional downloads",
log_level="debug",
):
@@ -186,7 +187,7 @@ class TestDateRange:
# existing
if not dry_run:
with assert_logs(
logger=ytdl_sub.downloaders.downloader.download_logger,
logger=YTDLP.logger,
expected_message="ExistingVideoReached, stopping additional downloads",
log_level="debug",
):

View File

@@ -5,6 +5,7 @@ from expected_download import assert_expected_downloads
from expected_transaction_log import assert_transaction_log_matches
import ytdl_sub.downloaders.downloader
from ytdl_sub.downloaders.ytdlp import YTDLP
from ytdl_sub.subscriptions.subscription import Subscription
@@ -83,7 +84,7 @@ class TestPlaylist:
# Ensure another invocation will hit ExistingVideoReached
if not dry_run:
with assert_logs(
logger=ytdl_sub.downloaders.downloader.download_logger,
logger=YTDLP.logger,
expected_message="ExistingVideoReached, stopping additional downloads",
log_level="debug",
):
@@ -131,7 +132,7 @@ class TestPlaylist:
if not dry_run:
# Ensure another invocation will hit ExistingVideoReached
with assert_logs(
logger=ytdl_sub.downloaders.downloader.download_logger,
logger=YTDLP.logger,
expected_message="ExistingVideoReached, stopping additional downloads",
log_level="debug",
):

View File

@@ -1,6 +1,5 @@
import json
import os.path
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import List

View File

@@ -12,6 +12,7 @@ from resources import copy_file_fixture
from ytdl_sub.config.config_file import ConfigFile
from ytdl_sub.downloaders.downloader import YtDlpDownloader
from ytdl_sub.downloaders.ytdlp import YTDLP
from ytdl_sub.entries.variables.kwargs import DESCRIPTION
from ytdl_sub.entries.variables.kwargs import EPOCH
from ytdl_sub.entries.variables.kwargs import EXT
@@ -100,23 +101,22 @@ def mock_entry_dict_factory(mock_downloaded_file_path) -> Callable:
@pytest.fixture
def mock_download_collection_thumbnail(mock_downloaded_file_path):
def _mock_download_thumbnail(output_path: str) -> bool:
# mock_file_factory(file_name=output_path.split("/")[-1])
output_name = os.path.basename(output_path)
def _mock_download_and_convert_url_thumbnail(
thumbnail_url: str, output_thumbnail_path: str
) -> bool:
_ = thumbnail_url
output_name = os.path.basename(output_thumbnail_path)
if "poster" in output_name or "show" in output_name:
copy_file_fixture(fixture_name="poster.jpg", output_file_path=output_path)
copy_file_fixture(fixture_name="poster.jpg", output_file_path=output_thumbnail_path)
return True
elif "fanart" in output_name:
copy_file_fixture(fixture_name="fanart.jpeg", output_file_path=output_path)
copy_file_fixture(fixture_name="fanart.jpeg", output_file_path=output_thumbnail_path)
return True
return False
with patch.object(
YtDlpDownloader,
"_download_thumbnail",
new=lambda _, thumbnail_url, output_thumbnail_path: _mock_download_thumbnail(
output_thumbnail_path
),
with patch(
"ytdl_sub.downloaders.downloader.download_and_convert_url_thumbnail",
new=_mock_download_and_convert_url_thumbnail,
):
yield # TODO: create file here
@@ -126,11 +126,9 @@ def mock_download_collection_entries(
mock_download_collection_thumbnail, mock_entry_dict_factory: Callable, working_directory: str
):
@contextlib.contextmanager
def _mock_download_collection_entries_factory(is_youtube_channel: bool):
def _mock_download_collection_entries_factory(is_youtube_channel: bool, num_urls: int = 1):
def _write_entries_to_working_dir(*args, **kwargs) -> List[Dict]:
if (len(args[0].collection.urls.list) == 1) or (
"season.2" in kwargs["url"] and len(args[0].download_options.urls.list) > 1
):
if num_urls == 1 or ("season.2" in kwargs["url"] and num_urls > 1):
return [
mock_entry_dict_factory(
uid="21-1",
@@ -202,7 +200,7 @@ def mock_download_collection_entries(
]
with patch.object(
YtDlpDownloader, "extract_info_via_info_json", new=_write_entries_to_working_dir
YTDLP, "extract_info_via_info_json", new=_write_entries_to_working_dir
), patch.object(
YtDlpDownloader, "_extract_entry_info_with_retry", new=lambda _, entry: entry
):

View File

@@ -215,7 +215,9 @@ class TestPrebuiltTvShowCollectionPresets:
},
)
with mock_download_collection_entries(is_youtube_channel=is_youtube_channel):
with mock_download_collection_entries(
is_youtube_channel=is_youtube_channel, num_urls=len(season_indices)
):
transaction_log = subscription.download(dry_run=False)
assert_transaction_log_matches(