"""
MIT License

Copyright (c) 2021 Puyodead1

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import argparse
import json
import os
import subprocess
import sys
import time
from html.parser import HTMLParser as compat_HTMLParser

import requests
from dotenv import load_dotenv
from requests.exceptions import ConnectionError as conn_error
from tqdm import tqdm

from sanitize import sanitize
from utils import extract_kid, decrypt, merge, remove_files, _clean, check_for_aria, check_for_ffmpeg, check_for_mp4decrypt
from vtt_to_srt import convert
from Udemy import Udemy
from version import __version__

# Paths are resolved once, relative to the directory the script is started in.
home_dir = os.getcwd()
download_dir = os.path.join(os.getcwd(), "out_dir")
saved_dir = os.path.join(os.getcwd(), "saved")
keyfile_path = os.path.join(os.getcwd(), "keyfile.json")
cookiefile_path = os.path.join(os.getcwd(), "cookies.txt")
course_info_path = os.path.join(saved_dir, "course_info.json")
course_content_path = os.path.join(saved_dir, "course_content.json")
_udemy_path = os.path.join(saved_dir, "_udemy.json")

# Module-level state; most of it is filled in by process_args() and
# UdemyDownloader() and then read by the download/parse functions.
udemy = None
parser = None
iknowwhatimdoing = False
retry = 3
_udemy = {}
course_url = None
downloader = None
dl_assets = False
skip_lectures = False
dl_captions = False
caption_locale = "en"
course = None
resource = None
quality = None
bearer_token = None
course_info = None
course_content = None
portal_name = None
keys = None
course_id = None
course_title = None
title = None
course_name = None
keep_vtt = False
skip_hls = False
print_info = False
load_from_file = False
save_to_file = False
concurrent_connections = 10
access_token = None
use_h265 = False
h265_crf = "28"
ffmpeg_preset = "medium"
h265_encoder = "copy"
ffmpeg_framerate = "30"
cookies = ""
disable_ipv6 = False


def download_segments(url, format_id, video_title, output_path,
                      lecture_file_name, chapter_dir):
    """Download the encrypted tracks of a DRM lecture, decrypt and merge them.

    Runs yt-dlp inside ``chapter_dir``, looks up the decryption key for each
    track's KID in the global ``keys`` mapping, decrypts audio and video,
    merges them into ``output_path`` and removes the temporary files when
    every step returned 0. Errors are printed, not raised.

    Args:
        url: Manifest URL handed to yt-dlp.
        format_id: Value passed to yt-dlp's ``-f`` option.
        video_title: Title forwarded to ``merge``.
        output_path: Destination path forwarded to ``merge``.
        lecture_file_name: Lecture file name; "%" and ".mp4" are stripped to
            form the base name of the temporary files.
        chapter_dir: Directory the temporary files are written to.
    """
    os.chdir(os.path.join(chapter_dir))
    # try/finally so that the working directory is restored on every exit
    # path, including the early return after a failed download.
    try:
        file_name = lecture_file_name.replace("%", "").replace(".mp4", "")
        video_filepath_enc = file_name + ".encrypted.mp4"
        audio_filepath_enc = file_name + ".encrypted.m4a"
        video_filepath_dec = file_name + ".decrypted.mp4"
        audio_filepath_dec = file_name + ".decrypted.m4a"
        print("> Downloading Lecture Tracks...")
        args = [
            "yt-dlp", "--force-generic-extractor", "--allow-unplayable-formats",
            "--concurrent-fragments", f"{concurrent_connections}", "--downloader",
            "aria2c", "--fixup", "never", "-k", "-o",
            f"{file_name}.encrypted.%(ext)s", "-f", format_id
        ]
        if disable_ipv6:
            args.append("--downloader-args")
            args.append("aria2c:\"--disable-ipv6\"")
        args.append(f"{url}")
        ret_code = subprocess.Popen(args).wait()
        print("> Lecture Tracks Downloaded")

        print("Return code: " + str(ret_code))
        if ret_code != 0:
            print("Return code from the downloader was non-0 (error), skipping!")
            return

        # keys stays None when no keyfile was loaded; treat that as "no keys"
        # so the lookup below reports a missing key instead of a TypeError.
        key_map = keys or {}

        # tries to decrypt audio and video, and then merge them
        try:
            # tries to decrypt audio
            try:
                audio_kid = extract_kid(audio_filepath_enc)
                print("KID for audio file is: " + audio_kid)
                audio_key = key_map[audio_kid.lower()]

                print("> Decrypting audio...")
                ret_code = decrypt(
                    audio_key, audio_filepath_enc, audio_filepath_dec)
                if ret_code != 0:
                    print(
                        "WARN: Decrypting returned a non-0 result code which usually indicated an error!")
                else:
                    print("Decryption complete")
            except KeyError:
                print("Audio key not found!")
                raise RuntimeError("No audio key")

            # tries to decrypt video
            try:
                video_kid = extract_kid(video_filepath_enc)
                print("KID for video file is: " + video_kid)
                video_key = key_map[video_kid.lower()]

                print("> Decrypting video...")
                ret_code2 = decrypt(
                    video_key, video_filepath_enc, video_filepath_dec)
                if ret_code2 != 0:
                    print(
                        "WARN: Decrypting returned a non-0 result code which usually indicated an error!")
                else:
                    print("Decryption complete")
            except KeyError:
                print("Video key not found!")
                raise RuntimeError("No video key")

            # tries to merge audio and video
            # NOTE(review): the merge is attempted even when a decrypt step
            # returned a non-0 code; only the cleanup below is gated on all
            # three return codes.
            print("> Merging audio and video files...")
            ret_code3 = merge(video_title=video_title,
                              video_filepath=video_filepath_dec,
                              audio_filepath=audio_filepath_dec,
                              output_path=output_path,
                              use_h265=use_h265,
                              h265_crf=h265_crf,
                              ffmpeg_preset=ffmpeg_preset,
                              h265_encoder=h265_encoder,
                              ffmpeg_framerate=ffmpeg_framerate)
            if ret_code3 != 0:
                print(
                    "WARN: Merging returned a non-0 result code which usually indicated an error!")

            if ret_code == 0 and ret_code2 == 0 and ret_code3 == 0:
                print("> Cleaning up...")
                # remove all the temporary files left over after decryption
                # and merging if there were no errors
                remove_files((video_filepath_enc, video_filepath_dec,
                              audio_filepath_enc, audio_filepath_dec))
                print("> Cleanup complete")
        except Exception as e:
            print(e)
    finally:
        os.chdir(home_dir)


def download(url, path, filename):
    """Download ``url`` to ``path`` with resume support and a progress bar.

    Args:
        url: URL to fetch; must answer HEAD with a Content-Length header.
        path: Local file path; an existing partial file is appended to.
        filename: Label shown on the progress bar.

    Returns:
        The total size in bytes reported by the server.

    @author Puyodead1
    """
    file_size = int(requests.head(url).headers["Content-Length"])
    first_byte = os.path.getsize(path) if os.path.exists(path) else 0
    if first_byte >= file_size:
        return file_size
    header = {"Range": "bytes=%s-%s" % (first_byte, file_size)}
    pbar = tqdm(total=file_size,
                initial=first_byte,
                unit='B',
                unit_scale=True,
                desc=filename)
    try:
        res = requests.get(url, headers=header, stream=True)
        res.raise_for_status()
        # Binary append mode: open() rejects an ``encoding`` argument here.
        with open(path, mode='ab') as f:
            for chunk in res.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
                    # the last chunk may be shorter than the chunk size
                    pbar.update(len(chunk))
    finally:
        pbar.close()
    return file_size


def download_aria(url, file_dir, filename):
    """Download ``url`` into ``file_dir``/``filename`` using aria2c.

    Returns:
        The aria2c process return code.

    @author Puyodead1
    """
    print(" > Downloading File...")
    args = [
        "aria2c", url, "-o", filename, "-d", file_dir, "-j16", "-s20", "-x16",
        "-c", "--auto-file-renaming=false", "--summary-interval=0"
    ]
    if disable_ipv6:
        args.append("--disable-ipv6")
    ret_code = subprocess.Popen(args).wait()
    print(" > File Downloaded")

    print("Return code: " + str(ret_code))
    return ret_code


def process_caption(caption, lecture_title, lecture_dir, tries=0):
    """Download one caption track and convert it to SRT when it is VTT.

    Args:
        caption: Dict with "language", "extension" and "download_url".
        lecture_title: Lecture title used to build the caption file name.
        lecture_dir: Directory the caption is written to.
        tries: Number of download attempts already made (used for retries).
    """
    filename = f"%s_%s.%s" % (sanitize(lecture_title), caption.get("language"),
                              caption.get("extension"))
    filename_no_ext = f"%s_%s" % (sanitize(lecture_title),
                                  caption.get("language"))
    filepath = os.path.join(lecture_dir, filename)

    if os.path.isfile(filepath):
        print(" > Caption '%s' already downloaded." % filename)
        return

    print(f" > Downloading caption: '%s'" % filename)
    try:
        ret_code = download_aria(caption.get("download_url"), lecture_dir,
                                 filename)
        if ret_code != 0:
            # aria2c reports failure through its exit code, not an exception
            raise RuntimeError(f"aria2c exited with code {ret_code}")
    except Exception as e:
        if tries >= 3:
            print(
                f" > Error downloading caption: {e}. Exceeded retries, skipping."
            )
            return
        print(
            f" > Error downloading caption: {e}. Will retry {3-tries} more times."
        )
        # The retry handles conversion itself, so stop here afterwards.
        process_caption(caption, lecture_title, lecture_dir, tries + 1)
        return

    if caption.get("extension") == "vtt":
        try:
            print(" > Converting caption to SRT format...")
            convert(lecture_dir, filename_no_ext)
            print(" > Caption conversion complete.")
            if not keep_vtt:
                os.remove(filepath)
        except Exception as e:
            print(f" > Error converting caption: {e}")


def _select_source(sources, best_index):
    """Return the source closest to the requested ``quality``.

    Falls back to ``sources[best_index]`` when no quality was requested.
    """
    if isinstance(quality, int):
        return min(sources,
                   key=lambda x: abs(int(x.get("height")) - quality))
    return sources[best_index]


def _process_assets(lecture, chapter_dir):
    """Download or record every asset attached to ``lecture``."""
    assets = lecture.get("assets") or []
    print(" > Processing {} asset(s) for lecture...".format(len(assets)))

    for asset in assets:
        asset_type = asset.get("type")
        filename = asset.get("filename")
        download_url = asset.get("download_url")

        if asset_type == "article":
            # TODO: article assets are not handled yet.
            print(
                "If you're seeing this message, that means that you reached a secret area that I haven't finished! jk I haven't implemented handling for this asset type, please report this at https://github.com/Puyodead1/udemy-downloader/issues so I can add it. When reporting, please provide the following information: "
            )
            print("AssetType: Article; AssetData: ", asset)
        elif asset_type == "video":
            print(
                "If you're seeing this message, that means that you reached an asset type that I haven't implemented handling for, please report this at https://github.com/Puyodead1/udemy-downloader/issues so I can add it. When reporting, please provide the following information: "
            )
            print("AssetType: Video; AssetData: ", asset)
        elif asset_type in ("audio", "e-book", "file", "presentation"):
            try:
                download_aria(download_url, chapter_dir, filename)
            except Exception as e:
                print("> Error downloading asset: ", e)
                continue
        elif asset_type == "external_link":
            # External links are appended to one text file per chapter,
            # skipping names that are already listed in it.
            filepath = os.path.join(chapter_dir, filename)
            savedirs, name = os.path.split(filepath)
            links_path = os.path.join(savedirs, u"external-assets-links.txt")
            file_data = []
            if os.path.isfile(links_path):
                with open(links_path, encoding="utf-8",
                          errors="ignore") as f:
                    file_data = [i.strip().lower() for i in f if i]

            content = u"\n{}\n{}\n".format(name, download_url)
            if name.lower() not in file_data:
                with open(links_path, 'a', encoding="utf-8",
                          errors="ignore") as f:
                    f.write(content)


def process_lecture(lecture, lecture_path, lecture_file_name, chapter_dir):
    """Download assets, captions and the video of a single lecture.

    Args:
        lecture: Parsed lecture dict as built by ``process_course``.
        lecture_path: Full path of the final lecture file.
        lecture_file_name: Sanitized file name of the lecture.
        chapter_dir: Directory of the chapter the lecture belongs to.
    """
    lecture_title = lecture.get("lecture_title")
    is_encrypted = lecture.get("is_encrypted")
    lecture_sources = lecture.get("video_sources") or []

    if dl_assets:
        _process_assets(lecture, chapter_dir)

    subtitles = lecture.get("subtitles")
    if dl_captions and subtitles:
        selected_subtitles = []
        print("Processing {} caption(s)...".format(len(subtitles)))
        for subtitle in subtitles:
            lang = subtitle.get("language")
            if lang == caption_locale or caption_locale == "all":
                selected_subtitles.append(subtitle)
                process_caption(subtitle, lecture_title, chapter_dir)
        print("Selected {} captions".format(len(selected_subtitles)))

    if is_encrypted:
        if len(lecture_sources) > 0:
            # last index is the best quality
            source = _select_source(lecture_sources, -1)
            print(f" > Lecture '%s' has DRM, attempting to download" %
                  lecture_title)
            download_segments(source.get("download_url"),
                              source.get("format_id"), lecture_title,
                              lecture_path, lecture_file_name, chapter_dir)
        else:
            print(f" > Lecture '%s' is missing media links" % lecture_title)
            print(len(lecture_sources))
        return

    sources = sorted(lecture.get("sources") or [],
                     key=lambda x: int(x.get("height")),
                     reverse=True)
    if not sources:
        print(" > Missing sources for lecture", lecture)
        return

    if os.path.isfile(lecture_path):
        print(" > Lecture '%s' is already downloaded, skipping..." %
              lecture_title)
        return

    print(" > Lecture doesn't have DRM, attempting to download...")
    # first index is the best quality
    source = _select_source(sources, 0)
    try:
        print(" ====== Selected quality: ", source.get("type"),
              source.get("height"))
        url = source.get("download_url")
        source_type = source.get("type")
        if source_type == "hls":
            temp_filepath = lecture_path.replace(".mp4", ".%(ext)s")
            args = [
                "yt-dlp", "--force-generic-extractor",
                "--concurrent-fragments", f"{concurrent_connections}",
                "--downloader", "aria2c", "-o", f"{temp_filepath}"
            ]
            if disable_ipv6:
                args.append("--downloader-args")
                args.append("aria2c:\"--disable-ipv6\"")
            args.append(f"{url}")
            ret_code = subprocess.Popen(args).wait()
            if ret_code == 0:
                print(" > HLS Download success")
                # Re-encode into a sibling file, then swap it into place.
                ffmpeg_command = [
                    "ffmpeg", "-i", lecture_path, "-c:v", h265_encoder,
                    "-c:a", "copy", "-filter:v", "fps=" + ffmpeg_framerate,
                    lecture_path + ".mp4"
                ]
                ret_code = subprocess.Popen(ffmpeg_command).wait()
                if ret_code == 0:
                    os.remove(lecture_path)
                    os.rename(lecture_path + ".mp4", lecture_path)
                    print("Encoding done")
                else:
                    print("Encoding returned a non-0 code")
        else:
            # Save under the sanitized name so that the "already downloaded"
            # check on lecture_path matches on the next run.
            download_aria(url, chapter_dir, lecture_file_name)
    except EnvironmentError as e:
        print(f" > Error downloading lecture: ", e)


def parse():
    """Walk the parsed course in ``_udemy`` and download every lecture."""
    total_chapters = _udemy.get("total_chapters")
    total_lectures = _udemy.get("total_lectures")
    print(f"Chapter(s) ({total_chapters})")
    print(f"Lecture(s) ({total_lectures})")

    course_name = str(_udemy.get("course_id"))
    course_dir = os.path.join(download_dir, course_name)
    if not os.path.exists(course_dir):
        os.mkdir(course_dir)

    for chapter in _udemy.get("chapters"):
        chapter_title = chapter.get("chapter_title")
        chapter_index = chapter.get("chapter_index")
        chapter_dir = os.path.join(course_dir, chapter_title)
        if not os.path.exists(chapter_dir):
            os.mkdir(chapter_dir)
        print(
            f"======= Processing chapter {chapter_index} of {total_chapters} ======="
        )

        for lecture in chapter.get("lectures"):
            lecture_title = lecture.get("lecture_title")
            lecture_index = lecture.get("lecture_index")
            lecture_extension = lecture.get("extension")
            # video lectures dont have an extension property, so we assume
            # its mp4
            extension = "mp4"
            if lecture_extension is not None:
                extension = lecture_extension
            lecture_file_name = sanitize(lecture_title + "." + extension)
            lecture_path = os.path.join(chapter_dir, lecture_file_name)

            print(
                f" > Processing lecture {lecture_index} of {total_lectures}")
            if skip_lectures:
                continue

            # Check if the lecture is already downloaded
            if os.path.isfile(lecture_path):
                print(" > Lecture '%s' is already downloaded, skipping..." %
                      lecture_title)
                continue

            if extension == "html":
                # Article lectures carry their body inline; non-ASCII
                # characters are dropped before writing.
                html_content = (lecture.get("html_content") or "").encode(
                    "ascii", "ignore").decode("utf8")
                lecture_path = os.path.join(
                    chapter_dir, "{}.html".format(sanitize(lecture_title)))
                try:
                    with open(lecture_path, 'w', encoding="utf8") as f:
                        f.write(html_content)
                except Exception as e:
                    print(" > Failed to write html file: ", e)
                    continue
            else:
                process_lecture(lecture, lecture_path, lecture_file_name,
                                chapter_dir)


def _append_chapter(entry, chapter_id):
    """Append a new chapter built from ``entry`` to ``_udemy["chapters"]``."""
    chapter_index = entry.get("object_index")
    chapter_title = "{0:02d} - ".format(chapter_index) + _clean(
        entry.get("title"))
    _udemy["chapters"].append({
        "chapter_title": chapter_title,
        "chapter_id": chapter_id,
        "chapter_index": chapter_index,
        "lectures": []
    })


def _build_lecture(entry, asset, lecture_counter, extracted_assets):
    """Build the lecture dict stored in ``_udemy`` for one lecture entry."""
    common = {
        "index": lecture_counter,
        "lecture_index": entry.get("object_index"),
        "lecture_id": entry.get("id"),
        "lecture_title": "{0:03d} ".format(lecture_counter) +
        _clean(entry.get("title")),
        "assets": extracted_assets,
        "assets_count": len(extracted_assets),
    }
    # Fallback for lectures without playable media: keep the article body.
    html_lecture = dict(common, **{
        "html_content": asset.get("body"),
        "extension": "html",
        "subtitle_count": 0,
        "sources_count": 0,
        "is_encrypted": False,
        "asset_id": asset.get("id")
    })

    if asset.get("stream_urls") is not None:
        # not encrypted
        data = asset.get("stream_urls")
        if not (data and isinstance(data, dict)):
            return html_lecture
        sources = udemy._extract_sources(data.get("Video"), skip_hls)
        source_key = "sources"
        is_encrypted = False
    else:
        # encrypted
        data = asset.get("media_sources")
        if not (data and isinstance(data, list)):
            return html_lecture
        sources = udemy._extract_media_sources(data)
        source_key = "video_sources"
        is_encrypted = True

    subtitles = udemy._extract_subtitles(asset.get("captions"))
    return dict(common, **{
        source_key: sources,
        "subtitles": subtitles,
        "subtitle_count": len(subtitles),
        "sources_count": len(sources),
        "is_encrypted": is_encrypted,
        "asset_id": asset.get("id")
    })


def _extract_lecture_assets(asset, supp_assets):
    """Return the downloadable assets for a lecture, by asset type."""
    # Either key may be absent; the original called .lower() on a possible
    # None and left the call off the second attribute.
    asset_type = (asset.get("asset_type") or asset.get("assetType")
                  or "").lower()
    if asset_type in ("article", "video"):
        if isinstance(supp_assets, list) and len(supp_assets) > 0:
            return udemy._extract_supplementary_assets(supp_assets)
    elif asset_type == "e-book":
        return udemy._extract_ebook(asset)
    elif asset_type == "file":
        return udemy._extract_file(asset)
    elif asset_type == "presentation":
        return udemy._extract_ppt(asset)
    elif asset_type == "audio":
        return udemy._extract_audio(asset)
    return []


def process_course():
    """Convert the raw curriculum in ``course`` into ``_udemy["chapters"]``.

    Also fills in ``_udemy["total_chapters"]`` and ``_udemy["total_lectures"]``.
    """
    global _udemy
    lecture_counter = 0
    counter = -1  # index of the chapter currently being filled
    lectures = []
    total_entries = len(course)

    for entry_index, entry in enumerate(course):
        clazz = entry.get("_class")
        asset = entry.get("asset")
        supp_assets = entry.get("supplementary_assets")

        if clazz == "chapter":
            lecture_counter = 0
            lectures = []
            _append_chapter(entry, entry.get("id"))
            counter += 1
        elif clazz == "lecture":
            lecture_counter += 1
            lecture_id = entry.get("id")
            if len(_udemy["chapters"]) == 0:
                # The course starts without a chapter entry; create one from
                # the lecture itself so there is somewhere to attach it.
                lectures = []
                _append_chapter(entry, lecture_id)
                counter += 1

            if lecture_id:
                print(f"Processing {entry_index} of {total_entries}")
                if isinstance(asset, dict):
                    extracted = _extract_lecture_assets(asset, supp_assets)
                    lectures.append(
                        _build_lecture(entry, asset, lecture_counter,
                                       extracted))

            _udemy["chapters"][counter]["lectures"] = lectures
            _udemy["chapters"][counter]["lecture_count"] = len(lectures)
        elif clazz == "quiz":
            lecture_id = entry.get("id")
            if len(_udemy["chapters"]) == 0:
                lectures = []
                lecture_counter = 0
                _append_chapter(entry, lecture_id)
                counter += 1

            _udemy["chapters"][counter]["lectures"] = lectures
            # Same key as the lecture branch so the totals below see it.
            _udemy["chapters"][counter]["lecture_count"] = len(lectures)

    _udemy["total_chapters"] = len(_udemy["chapters"])
    _udemy["total_lectures"] = sum(
        chapter.get("lecture_count", 0) for chapter in _udemy["chapters"]
        if chapter)


def _load_json(path):
    """Read and parse the JSON file at ``path``."""
    with open(path, encoding="utf8", mode='r') as f:
        return json.loads(f.read())


def get_course_information():
    """Populate ``course_info`` and the identifiers derived from it."""
    global course_info, course_id, title, course_title, portal_name
    if load_from_file:
        if os.path.exists(course_info_path):
            course_info = _load_json(course_info_path)
        else:
            print("course_info.json not found, falling back to fetching")
            course_info = udemy._extract_course_info(course_url)
    else:
        course_info = udemy._extract_course_info(course_url)
    course_id = course_info.get("id")
    title = _clean(course_info.get("title"))
    course_title = course_info.get("published_title")
    portal_name = course_info.get("portal_name")


def get_course_content():
    """Populate ``course_content`` from the saved file or from Udemy."""
    global course_content
    if load_from_file:
        if os.path.exists(course_content_path):
            course_content = _load_json(course_content_path)
        else:
            print("course_content.json not found, falling back to fetching")
            course_content = udemy._extract_course_json(
                course_url, course_id, portal_name)
    else:
        course_content = udemy._extract_course_json(course_url, course_id,
                                                    portal_name)


def parse_data():
    """Load the parsed course from disk when requested, else build it."""
    global _udemy
    if load_from_file and os.path.exists(_udemy_path):
        _udemy = _load_json(_udemy_path)
    else:
        process_course()


def _print_course_info():
    """Print a summary of chapters, lectures, captions and qualities."""
    print("\n\n\n\n")
    course_title = _udemy.get("title")
    chapter_count = _udemy.get("total_chapters")
    lecture_count = _udemy.get("total_lectures")

    print("> Course: {}".format(course_title))
    print("> Total Chapters: {}".format(chapter_count))
    print("> Total Lectures: {}".format(lecture_count))
    print("\n")

    chapters = _udemy.get("chapters")
    for chapter in chapters:
        chapter_title = chapter.get("chapter_title")
        chapter_index = chapter.get("chapter_index")
        chapter_lecture_count = chapter.get("lecture_count")
        chapter_lectures = chapter.get("lectures")

        print("> Chapter: {} ({} of {})".format(chapter_title, chapter_index,
                                                chapter_count))

        for lecture in chapter_lectures:
            # html (article) lectures have nothing to report here
            if lecture.get("extension"):
                continue

            lecture_title = lecture.get("lecture_title")
            lecture_index = lecture.get("index")
            lecture_asset_count = lecture.get("assets_count")
            lecture_is_encrypted = lecture.get("is_encrypted")
            lecture_subtitles = lecture.get("subtitles") or []

            source_key = "video_sources" if lecture_is_encrypted else "sources"
            # Recomputed for every lecture so a lecture without sources
            # prints an empty list instead of the previous lecture's values.
            lecture_qualities = [
                "{}@{}x{}".format(x.get("type"), x.get("width"),
                                  x.get("height"))
                for x in sorted(lecture.get(source_key) or [],
                                key=lambda x: int(x.get("height")),
                                reverse=True)
            ]

            print(" > Lecture: {} ({} of {})".format(lecture_title,
                                                     lecture_index,
                                                     chapter_lecture_count))
            print(" > DRM: {}".format(lecture_is_encrypted))
            print(" > Asset Count: {}".format(lecture_asset_count))
            print(" > Captions: {}".format(
                [x.get("language") for x in lecture_subtitles]))
            print(" > Qualities: {}".format(lecture_qualities))

        if chapter_index != chapter_count:
            print("\n\n")


def setup_parser():
    """Create the global argument ``parser`` and register all options."""
    global parser
    parser = argparse.ArgumentParser(description='Udemy Downloader')
    parser.add_argument("-c",
                        "--course-url",
                        dest="course_url",
                        type=str,
                        help="The URL of the course to download",
                        required=True)
    parser.add_argument(
        "-b",
        "--bearer",
        dest="bearer_token",
        type=str,
        help="The Bearer token to use",
    )
    parser.add_argument(
        "-q",
        "--quality",
        dest="quality",
        type=int,
        help="Download specific video quality. If the requested quality isn't available, the closest quality will be used. If not specified, the best quality will be downloaded for each lecture",
    )
    parser.add_argument(
        "-l",
        "--lang",
        dest="lang",
        type=str,
        help="The language to download for captions, specify 'all' to download all captions (Default is 'en')",
    )
    parser.add_argument(
        "-cd",
        "--concurrent-connections",
        dest="concurrent_connections",
        type=int,
        help="The number of maximum concurrent connections per download for segments (HLS and DASH, must be a number 1-30)",
    )
    parser.add_argument(
        "--skip-lectures",
        dest="skip_lectures",
        action="store_true",
        help="If specified, lectures won't be downloaded",
    )
    parser.add_argument(
        "--download-assets",
        dest="download_assets",
        action="store_true",
        help="If specified, lecture assets will be downloaded",
    )
    parser.add_argument(
        "--download-captions",
        dest="download_captions",
        action="store_true",
        help="If specified, captions will be downloaded",
    )
    parser.add_argument(
        "--keep-vtt",
        dest="keep_vtt",
        action="store_true",
        help="If specified, .vtt files won't be removed",
    )
    parser.add_argument(
        "--skip-hls",
        dest="skip_hls",
        action="store_true",
        help="If specified, hls streams will be skipped (faster fetching) (hls streams usually contain 1080p quality for non-drm lectures)",
    )
    parser.add_argument(
        "--info",
        dest="print_info",
        action="store_true",
        help="If specified, only course information will be printed, nothing will be downloaded",
    )
    parser.add_argument(
        "--use-h265",
        dest="use_h265",
        action="store_true",
        help="If specified, videos will be encoded with the H.265 codec",
    )
    parser.add_argument(
        "--h265-crf",
        dest="h265_crf",
        type=str,
        default="28",
        help="Set a custom CRF value for H.265 encoding. FFMPEG default is 28",
    )
    # No argparse default: the module-level "medium" applies unless the user
    # passes a value, which lets process_args() pick "p4" for hevc_nvenc.
    parser.add_argument(
        "--ffmpeg-preset",
        dest="ffmpeg_preset",
        type=str,
        default=None,
        help="Set a custom preset value for encoding. This can vary depending on the encoder",
    )
    parser.add_argument(
        "--ffmpeg-framerate",
        dest="ffmpeg_framerate",
        type=str,
        default="30",
        help="Changes the FPS used for encoding. FFMPEG default is 30",
    )
    # NOTE(review): this default means h265_encoder is always "libx265"
    # unless overridden, even without --use-h265 — confirm that is intended.
    parser.add_argument(
        "--h265-encoder",
        dest="h265_encoder",
        type=str,
        default="libx265",
        help="Changes the HEVC encder that is used. Default is copy when not using h265, otherwise the default is libx265",
    )
    parser.add_argument(
        "--disable-ipv6",
        dest="disable_ipv6",
        action="store_true",
        help="If specified, ipv6 will be disabled in aria2",
    )
    parser.add_argument(
        "--iknowwhatimdoing",
        dest="iknowwhatimdoing",
        action="store_true",
        help=argparse.SUPPRESS,
    )
    parser.add_argument(
        "--save-to-file",
        dest="save_to_file",
        action="store_true",
        help=argparse.SUPPRESS,
    )
    parser.add_argument(
        "--load-from-file",
        dest="load_from_file",
        action="store_true",
        help=argparse.SUPPRESS,
    )
    parser.add_argument("-v",
                        "--version",
                        action="version",
                        version='You are running version {version}'.format(
                            version=__version__))


def process_args(args):
    """Copy parsed command-line ``args`` into the module-level settings."""
    global course_url, bearer_token, dl_assets, dl_captions, caption_locale, skip_lectures, quality, keep_vtt, skip_hls, print_info, load_from_file, save_to_file, concurrent_connections, use_h265, h265_crf, ffmpeg_preset, iknowwhatimdoing, h265_encoder, ffmpeg_framerate, disable_ipv6

    course_url = args.course_url
    if args.download_assets:
        dl_assets = True
    if args.lang:
        caption_locale = args.lang
    if args.download_captions:
        dl_captions = True
    if args.skip_lectures:
        skip_lectures = True
    if args.quality:
        quality = args.quality
    if args.keep_vtt:
        keep_vtt = args.keep_vtt
    if args.skip_hls:
        skip_hls = args.skip_hls
    if args.print_info:
        print_info = args.print_info
    if args.load_from_file:
        load_from_file = args.load_from_file
    if args.save_to_file:
        save_to_file = args.save_to_file
    if args.concurrent_connections:
        concurrent_connections = args.concurrent_connections

        if concurrent_connections <= 0:
            # if the user gave a number that is less than or equal to 0, set
            # cc to default of 10
            concurrent_connections = 10
        elif concurrent_connections > 30:
            # if the user gave a number thats greater than 30, set cc to the
            # max of 30
            concurrent_connections = 30
    if args.use_h265:
        use_h265 = True
        h265_encoder = "libx265"
    if args.h265_crf:
        h265_crf = args.h265_crf
        print("> Selected CRF: " + h265_crf)
    if args.iknowwhatimdoing:
        iknowwhatimdoing = args.iknowwhatimdoing
    if args.ffmpeg_framerate:
        ffmpeg_framerate = args.ffmpeg_framerate
        print("> Selected Framerate: " + ffmpeg_framerate)
    if args.h265_encoder:
        h265_encoder = args.h265_encoder
        print("> Selected HEVC Encoder: " + h265_encoder)
        if h265_encoder == "hevc_nvenc":
            ffmpeg_preset = "p4"
            print(" > Default preset for hevc_nvenc is p4")
    if args.ffmpeg_preset:
        # Only set when the user passed --ffmpeg-preset, so it overrides the
        # encoder-specific default above without clobbering it otherwise.
        ffmpeg_preset = args.ffmpeg_preset
        print("> Selected HEVC Encoder Preset: " + ffmpeg_preset)
    if args.disable_ipv6:
        disable_ipv6 = args.disable_ipv6
    if args.load_from_file:
        print(
            "> 'load_from_file' was specified, data will be loaded from json files instead of fetched"
        )
    if args.save_to_file:
        print(
            "> 'save_to_file' was specified, data will be saved to json files")

    if args.bearer_token:
        bearer_token = args.bearer_token
    else:
        bearer_token = os.getenv("UDEMY_BEARER")


def ensure_dependencies_installed():
    """Exit with status 1 when aria2c, ffmpeg or mp4decrypt is missing."""
    aria_ret_val = check_for_aria()
    if not aria_ret_val:
        print("> Aria2c is missing from your system or path!")
        sys.exit(1)

    ffmpeg_ret_val = check_for_ffmpeg()
    if not ffmpeg_ret_val:
        print("> FFMPEG is missing from your system or path!")
        sys.exit(1)

    mp4decrypt_ret_val = check_for_mp4decrypt()
    if not mp4decrypt_ret_val:
        print(
            "> MP4Decrypt is missing from your system or path! (This is part of Bento4 tools)"
        )
        sys.exit(1)


def check_dirs():
    """Create the ``saved`` and download directories if they are missing."""
    os.makedirs(saved_dir, exist_ok=True)
    os.makedirs(download_dir, exist_ok=True)


def try_load_keys():
    """Load the decryption key mapping from ``keyfile_path`` into ``keys``."""
    global keys
    with open(keyfile_path, encoding="utf8", mode='r') as f:
        keys = json.loads(f.read())


def try_load_cookies():
    """Load the raw contents of ``cookiefile_path`` into ``cookies``."""
    global cookies
    with open(cookiefile_path, encoding="utf8", mode='r') as f:
        cookies = f.read()


def UdemyDownloader():
    """Entry point: parse arguments, fetch the course and download it."""
    global udemy, course, resource

    # loads the .env file
    load_dotenv()

    check_dirs()

    # Creates a new parser and sets up the arguments
    setup_parser()

    # parses the arguments and sets all the variables
    args = parser.parse_args()
    process_args(args=args)

    if not os.path.exists(cookiefile_path):
        print("No cookies.txt file was found, you won't be able to download subscription courses! You can ignore ignore this if you don't plan to download a course included in a subscription plan.")
    else:
        try_load_cookies()

    # warn that the keyfile is not found
    if not os.path.exists(keyfile_path):
        print("!!! Keyfile not found! This means you probably didn't rename the keyfile correctly, DRM lecture decryption will fail! If you aren't downloading DRM encrypted courses, you can ignore this message. !!!")
        if not iknowwhatimdoing:
            print("Waiting for 10 seconds...")
            time.sleep(10)
    else:
        try_load_keys()

    # ensure 3rd party binaries are installed
    ensure_dependencies_installed()

    udemy = Udemy(access_token=bearer_token, cookies=cookies)

    print("> Fetching course information, this may take a minute...")
    get_course_information()
    if not isinstance(course_info, dict):
        print("> Failed to get course information")
        sys.exit(1)
    print("> Course information retrieved!")

    if save_to_file:
        with open(course_info_path, encoding="utf8", mode='w') as f:
            f.write(json.dumps(course_info))
        print("Saved course info to file")

    print("> Fetching course content, this may take a minute...")
    get_course_content()
    if not isinstance(course_content, dict):
        print("> Failed to get course content")
        sys.exit(1)
    print("> Course content retrieved!")

    if save_to_file:
        with open(course_content_path, encoding="utf8", mode='w') as f:
            f.write(json.dumps(course_content))
        print("Saved course content to file")

    course = course_content.get("results")
    resource = course_content.get("detail")

    _udemy["access_token"] = access_token
    _udemy["course_id"] = course_id
    _udemy["title"] = title
    _udemy["course_title"] = course_title
    _udemy["chapters"] = []

    if resource:
        print("> Trying to logout")
        udemy.session.terminate()
        print("> Logged out.")

    if course:
        print("> Processing course data, this may take a minute. ")
        parse_data()

    if save_to_file:
        with open(_udemy_path, encoding="utf8", mode='w') as f:
            f.write(json.dumps(_udemy))
        print("Saved parsed data to file")

    if print_info:
        _print_course_info()
    else:
        parse()


if __name__ == "__main__":
    UdemyDownloader()