"""Download media referenced by a URL (Reddit, YouTube, Redgifs, Imgur, Gfycat)."""
import logging
import os
import re
import shutil
from enum import Enum
from typing import Optional
from urllib.parse import urlparse

import youtube_dl
import requests
from praw import Reddit

logger = logging.getLogger(__name__)


class SourceType(Enum):
    """Kind of media host a URL points at, as detected from the URL text."""

    VREDDIT = 1
    IREDDIT = 2
    YOUTUBE = 4
    REDGIFS = 5
    IMAGURJPG = 6
    GFYCAT = 7
    GREDDIT = 8
    UNKNOWN = 1000


# youtube_dl output template: files are written as ``source_<id>.<ext>``.
OUTTMPL = 'source_%(id)s.%(ext)s'

# Seconds to wait for the server when fetching a raw file, so that a stalled
# connection raises instead of blocking forever.
REQUEST_TIMEOUT = 30

# Ordered (pattern, type) pairs; the first pattern that matches wins.
_SOURCE_PATTERNS = (
    (re.compile(r"^.*v\.redd\.it.*$"), SourceType.VREDDIT),
    (re.compile(r"^.*i\.redd\.it.*\.(jpg|jpeg)$"), SourceType.IREDDIT),
    (re.compile(r"^.*\.youtube\.com.*$"), SourceType.YOUTUBE),
    (re.compile(r"^.*redgifs\.com.*$"), SourceType.REDGIFS),
    (re.compile(r"^.*i\.imgur\.com.*\.(jpg|jpeg)$"), SourceType.IMAGURJPG),
    (re.compile(r"^.*gfycat\.com.*$"), SourceType.GFYCAT),
    (re.compile(r"^.*www\.reddit\.com/gallery.*$"), SourceType.GREDDIT),
)


def _ydl_opts(fmt):
    """Return youtube_dl options that select ``fmt`` and merge into an mp4."""
    return {
        'format': fmt,
        'merge_output_format': 'mp4',
        'outtmpl': OUTTMPL,
    }


class Downloader:
    """Fetch the media behind ``url`` into the current working directory.

    Usable as a context manager: leaving the ``with`` block calls
    :meth:`delete`, which removes every file recorded in ``paths``.

    Attributes set by ``__init__``: ``reddit``, ``url``, ``source_type``,
    ``downloaded`` (True once a download has completed) and ``paths`` (the
    files this instance wrote or expects youtube_dl to have written).
    """

    reddit: Reddit
    # NOTE(review): ``username`` and ``post_id`` are declared but never
    # assigned anywhere in this class; presumably set by callers - confirm.
    username: str
    downloaded: bool
    post_id: str
    source_type: SourceType
    paths: list[str]

    def __init__(self, url: str, reddit: Reddit):
        """Store the URL and Reddit client and classify the URL."""
        self.reddit = reddit
        self.downloaded = False
        self.url = url
        self.source_type = self._get_source_type(url)
        self.paths = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised inside the ``with`` body
        # still propagates after the files are cleaned up.
        self.delete()

    def download(self):
        """Download the media for ``self.url`` according to its source type.

        Best effort: any exception raised by the handler is logged and
        leaves ``downloaded`` False; it is not re-raised. A URL of type
        ``SourceType.UNKNOWN`` is ignored.
        """
        handlers = {
            SourceType.VREDDIT: self._download_vreddit,
            SourceType.REDGIFS: self._download_redgifs,
            SourceType.GFYCAT: self._download_gifycat,
            SourceType.YOUTUBE: self._download_youtube,
            SourceType.IMAGURJPG: self._download_raw_file,
            SourceType.IREDDIT: self._download_raw_file,
            SourceType.GREDDIT: self._download_gallery_reddit,
        }
        handler = handlers.get(self.source_type)
        if handler is None:
            return
        try:
            handler()
        except Exception:
            # Keep the failure visible instead of discarding it silently.
            logger.exception("Failed to download %s", self.url)
            self.downloaded = False

    def delete(self):
        """Remove every file listed in ``paths`` that still exists."""
        for path in self.paths:
            try:
                os.unlink(path)
            except FileNotFoundError:
                # Already gone (never created, or removed by someone else).
                pass

    def _download_youtube_dls(self, ydl_opts):
        """Download ``self.url`` with youtube_dl and record the output paths.

        Each recorded path is the prepared filename with its extension
        replaced by ``.mp4``.
        """
        with youtube_dl.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(self.url, download=True)
            if info.get('_type', None) == 'playlist':
                entries = info['entries']
            else:
                entries = [info]
            for entry in entries:
                filename = ydl.prepare_filename(entry)
                self.paths.append(f'{os.path.splitext(filename)[0]}.mp4')
        self.downloaded = True

    def _download_redgifs(self):
        """Download a Redgifs URL via youtube_dl using the 'best' format."""
        self._download_youtube_dls(_ydl_opts('best'))

    def _download_gifycat(self):
        """Download a Gfycat URL via youtube_dl using the 'best' format."""
        self._download_youtube_dls(_ydl_opts('best'))

    def _download_vreddit(self):
        """Download a v.redd.it URL, preferring video+audio over video only."""
        self._download_youtube_dls(_ydl_opts('bestvideo+bestaudio/bestvideo'))

    def _download_youtube(self):
        """Download a YouTube URL, preferring mp4 video with m4a audio."""
        self._download_youtube_dls(_ydl_opts(
            'bestvideo[ext=mp4]+bestaudio[ext=m4a]/bestvideo+bestaudio'))

    def _download_raw_file(self, url: Optional[str] = None):
        """Stream a file over HTTP into ``source_<basename of the URL path>``.

        Args:
            url: Address to fetch; defaults to ``self.url``.

        Sets ``downloaded`` to True only after the body has been written.
        A non-200 response sets it to False without raising.
        """
        target = self.url if url is None else url
        path = f'source_{os.path.basename(urlparse(target).path)}'
        # The context manager releases the streamed connection on every path.
        with requests.get(target, stream=True,
                          timeout=REQUEST_TIMEOUT) as response:
            if response.status_code != 200:
                self.downloaded = False
                return
            response.raw.decode_content = True
            # Register the path before writing so that delete() also removes
            # a partial file if the copy fails midway.
            self.paths.append(path)
            with open(path, 'wb') as f:
                shutil.copyfileobj(response.raw, f)
        self.downloaded = True

    def _download_gallery_reddit(self):
        """Download every image listed in a Reddit gallery submission.

        ``downloaded`` ends up True only if the gallery has at least one
        item and every item was fetched successfully.
        """
        submission = self.reddit.submission(url=self.url)
        results = []
        for item in submission.media_metadata.values():
            # Pass the image URL explicitly rather than swapping self.url,
            # so self.url stays intact even if a download raises.
            self._download_raw_file(item['s']['u'])
            results.append(self.downloaded)
        self.downloaded = bool(results) and all(results)

    @staticmethod
    def _get_source_type(url):
        """Return the SourceType of the first pattern matching ``url``.

        Returns ``SourceType.UNKNOWN`` when no pattern matches.
        """
        for pattern, source_type in _SOURCE_PATTERNS:
            if pattern.match(url):
                return source_type
        return SourceType.UNKNOWN