diff --git a/README.md b/README.md index 0e8ec2f..0ecb91b 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,12 @@ # Udemy Downloader with DRM support +[![forthebadge](https://forthebadge.com/images/badges/built-with-love.svg)](https://forthebadge.com) +[![forthebadge](https://forthebadge.com/images/badges/designed-in-ms-paint.svg)](https://forthebadge.com) +[![forthebadge](https://forthebadge.com/images/badges/made-with-python.svg)](https://forthebadge.com) +[![forthebadge](https://forthebadge.com/images/badges/approved-by-george-costanza.svg)](https://forthebadge.com) +![GitHub forks](https://img.shields.io/github/forks/Puyodead1/udemy-downloader?style=for-the-badge) +![GitHub Repo stars](https://img.shields.io/github/stars/Puyodead1/udemy-downloader?style=for-the-badge) ![GitHub](https://img.shields.io/github/license/Puyodead1/udemy-downloader?style=for-the-badge) -![GitHub top language](https://img.shields.io/github/languages/top/Puyodead1/udemy-downloader?style=for-the-badge) -![Codacy grade](https://img.shields.io/codacy/grade/e14b03f576ab4b1897624dcdf6dd9557?style=for-the-badge) # NOTE diff --git a/main.py b/main.py index c13c0fa..27cfe4b 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,6 @@ -import os, requests, json, glob, argparse, sys, re, time, asyncio, json, cloudscraper, m3u8 +import os, requests, shutil, json, glob, urllib.request, argparse, sys, datetime +from sanitize_filename import sanitize +import urllib.request from tqdm import tqdm from dotenv import load_dotenv from mpegdash.parser import MPEGDASHParser @@ -10,728 +12,19 @@ from sanitize import sanitize, slugify, SLUG_OK from pyffmpeg import FFMPeg as FFMPEG import subprocess -home_dir = os.getcwd() +course_id = None +header_bearer = None download_dir = os.path.join(os.getcwd(), "out_dir") -working_dir = os.path.join(os.getcwd(), "working_dir") -keyfile_path = os.path.join(os.getcwd(), "keyfile.json") +working_dir = os.path.join(os.getcwd(), "working_dir") # set the folder to download segments for DRM videos retry = 3 -downloader = None -HEADERS = { - "Origin": "www.udemy.com", - "User-Agent": - "Mozilla/5.0 (Windows NT 6.3; Win64; x64; rv:85.0) Gecko/20100101 Firefox/85.0", - "Accept": "*/*", - "Accept-Encoding": None, -} -LOGIN_URL = "https://www.udemy.com/join/login-popup/?ref=&display_type=popup&loc" -LOGOUT_URL = "https://www.udemy.com/user/logout" -COURSE_URL = "https://{portal_name}.udemy.com/api-2.0/courses/{course_id}/cached-subscriber-curriculum-items?fields[asset]=results,title,external_url,time_estimation,download_urls,slide_urls,filename,asset_type,captions,media_license_token,course_is_drmed,media_sources,stream_urls,body&fields[chapter]=object_index,title,sort_order&fields[lecture]=id,title,object_index,asset,supplementary_assets,view_html&page_size=10000" -COURSE_SEARCH = "https://{portal_name}.udemy.com/api-2.0/users/me/subscribed-courses?fields[course]=id,url,title,published_title&page=1&page_size=500&search={course_name}" -SUBSCRIBED_COURSES = "https://www.udemy.com/api-2.0/users/me/subscribed-courses/?ordering=-last_accessed&fields[course]=id,title,url&page=1&page_size=12" -MY_COURSES_URL = "https://{portal_name}.udemy.com/api-2.0/users/me/subscribed-courses?fields[course]=id,url,title,published_title&ordering=-last_accessed,-access_time&page=1&page_size=10000" -COLLECTION_URL = "https://{portal_name}.udemy.com/api-2.0/users/me/subscribed-courses-collections/?collection_has_courses=True&course_limit=20&fields[course]=last_accessed_time,title,published_title&fields[user_has_subscribed_courses_collection]=@all&page=1&page_size=1000" - - -def _clean(text): - ok = re.compile(r'[^\\/:*?"<>|]') - text = "".join(x if ok.match(x) else "_" for x in text) - text = re.sub(r"\.+$", "", text.strip()) - return text - - -def _sanitize(self, unsafetext): - text = sanitize( - slugify(unsafetext, lower=False, spaces=True, ok=SLUG_OK + "().[]")) - return text - - -class Udemy: - def __init__(self, access_token): - self.session = None - self.access_token = None - self.auth = UdemyAuth(cache_session=False) - if not self.session: - self.session, self.access_token = self.auth.authenticate( - access_token=access_token) - - if self.session and self.access_token: - self.session._headers.update( - {"Authorization": "Bearer {}".format(self.access_token)}) - self.session._headers.update({ - "X-Udemy-Authorization": - "Bearer {}".format(self.access_token) - }) - print("Login Success") - else: - print("Login Failure!") - sys.exit(1) - - def _extract_supplementary_assets(self, supp_assets): - _temp = [] - for entry in supp_assets: - title = _clean(entry.get("title")) - filename = entry.get("filename") - download_urls = entry.get("download_urls") - external_url = entry.get("external_url") - asset_type = entry.get("asset_type").lower() - if asset_type == "file": - if download_urls and isinstance(download_urls, dict): - extension = filename.rsplit( - ".", 1)[-1] if "." in filename else "" - download_url = download_urls.get("File", [])[0].get("file") - _temp.append({ - "type": "file", - "title": title, - "filename": filename, - "extension": extension, - "download_url": download_url, - }) - elif asset_type == "sourcecode": - if download_urls and isinstance(download_urls, dict): - extension = filename.rsplit( - ".", 1)[-1] if "." in filename else "" - download_url = download_urls.get("SourceCode", - [])[0].get("file") - _temp.append({ - "type": "source_code", - "title": title, - "filename": filename, - "extension": extension, - "download_url": download_url, - }) - elif asset_type == "externallink": - _temp.append({ - "type": "external_link", - "title": title, - "filename": filename, - "extension": "txt", - "download_url": external_url, - }) - return _temp - - def _extract_ppt(self, assets): - _temp = [] - download_urls = assets.get("download_urls") - filename = assets.get("filename") - if download_urls and isinstance(download_urls, dict): - extension = filename.rsplit(".", 1)[-1] if "." in filename else "" - download_url = download_urls.get("Presentation", [])[0].get("file") - _temp.append({ - "type": "presentation", - "filename": filename, - "extension": extension, - "download_url": download_url, - }) - return _temp - - def _extract_file(self, assets): - _temp = [] - download_urls = assets.get("download_urls") - filename = assets.get("filename") - if download_urls and isinstance(download_urls, dict): - extension = filename.rsplit(".", 1)[-1] if "." in filename else "" - download_url = download_urls.get("File", [])[0].get("file") - _temp.append({ - "type": "file", - "filename": filename, - "extension": extension, - "download_url": download_url, - }) - return _temp - - def _extract_ebook(self, assets): - _temp = [] - download_urls = assets.get("download_urls") - filename = assets.get("filename") - if download_urls and isinstance(download_urls, dict): - extension = filename.rsplit(".", 1)[-1] if "." in filename else "" - download_url = download_urls.get("E-Book", [])[0].get("file") - _temp.append({ - "type": "ebook", - "filename": filename, - "extension": extension, - "download_url": download_url, - }) - return _temp - - def _extract_audio(self, assets): - _temp = [] - download_urls = assets.get("download_urls") - filename = assets.get("filename") - if download_urls and isinstance(download_urls, dict): - extension = filename.rsplit(".", 1)[-1] if "." in filename else "" - download_url = download_urls.get("Audio", [])[0].get("file") - _temp.append({ - "type": "audio", - "filename": filename, - "extension": extension, - "download_url": download_url, - }) - return _temp - - def _extract_sources(self, sources, skip_hls): - _temp = [] - if sources and isinstance(sources, list): - for source in sources: - label = source.get("label") - download_url = source.get("file") - if not download_url: - continue - if label.lower() == "audio": - continue - height = label if label else None - if height == "2160": - width = "3840" - elif height == "1440": - width = "2560" - elif height == "1080": - width = "1920" - elif height == "720": - width = "1280" - elif height == "480": - width = "854" - elif height == "360": - width = "640" - elif height == "240": - width = "426" - else: - width = "256" - if (source.get("type") == "application/x-mpegURL" - or "m3u8" in download_url): - if not skip_hls: - out = self._extract_m3u8(download_url) - if out: - _temp.extend(out) - else: - _type = source.get("type") - _temp.append({ - "type": "video", - "height": height, - "width": width, - "extension": _type.replace("video/", ""), - "download_url": download_url, - }) - return _temp - - def _extract_media_sources(self, sources): - _audio = [] - _video = [] - if sources and isinstance(sources, list): - for source in sources: - _type = source.get("type") - src = source.get("src") - - if _type == "application/dash+xml": - video, audio = self._extract_mpd(src) - if video and audio: - _video.extend(video) - _audio.extend(audio) - return (_video, _audio) - - def _extract_subtitles(self, tracks): - _temp = [] - if tracks and isinstance(tracks, list): - for track in tracks: - if not isinstance(track, dict): - continue - if track.get("_class") != "caption": - continue - download_url = track.get("url") - if not download_url or not isinstance(download_url, str): - continue - lang = (track.get("language") or track.get("srclang") - or track.get("label") - or track["locale_id"].split("_")[0]) - ext = "vtt" if "vtt" in download_url.rsplit(".", - 1)[-1] else "srt" - _temp.append({ - "type": "subtitle", - "language": lang, - "extension": ext, - "download_url": download_url, - }) - return _temp - - def _extract_m3u8(self, url): - """extracts m3u8 streams""" - _temp = [] - try: - resp = self.session._get(url) - resp.raise_for_status() - raw_data = resp.text - m3u8_object = m3u8.loads(raw_data) - playlists = m3u8_object.playlists - seen = set() - for pl in playlists: - resolution = pl.stream_info.resolution - codecs = pl.stream_info.codecs - if not resolution: - continue - if not codecs: - continue - width, height = resolution - download_url = pl.uri - if height not in seen: - seen.add(height) - _temp.append({ - "type": "hls", - "height": height, - "width": width, - "extension": "mp4", - "download_url": download_url, - }) - except Exception as error: - print(f"Udemy Says : '{error}' while fetching hls streams..") - return _temp - - def _extract_mpd(self, url): - """extract mpd streams""" - _video = [] - _audio = [] - try: - resp = self.session._get(url) - resp.raise_for_status() - raw_data = resp.text - mpd_object = MPEGDASHParser.parse(raw_data) - seen = set() - for period in mpd_object.periods: - for adapt_set in period.adaptation_sets: - content_type = adapt_set.mime_type - if content_type == "video/mp4": - for rep in adapt_set.representations: - for segment in rep.segment_templates: - segment_count = 1 - timeline = segment.segment_timelines[0] - segment_count += len(timeline.Ss) - for s in timeline.Ss: - if s.r: - segment_count += s.r - - segment_extension = segment.media.split( - ".")[-1] - height = rep.height - width = rep.width - - if height not in seen: - seen.add(height) - _video.append({ - "type": - "dash", - "content_type": - "video", - "height": - height, - "width": - width, - "extension": - segment_extension, - "segment_count": - segment_count, - "media": - segment.media, - "initialization": - segment.initialization - }) - elif content_type == "audio/mp4": - for rep in adapt_set.representations: - for segment in rep.segment_templates: - segment_count = 1 - timeline = segment.segment_timelines[0] - segment_count += len(timeline.Ss) - for s in timeline.Ss: - if s.r: - segment_count += s.r - - segment_extension = segment.media.split( - ".")[-1] - - _audio.append({ - "type": - "dash", - "content_type": - "audio", - "extension": - segment_extension, - "segment_count": - segment_count, - "media": - segment.media, - "initialization": - segment.initialization - }) - except Exception as error: - print(f"Udemy Says : '{error}' while fetching mpd manifest") - return (_video, _audio) - - def extract_course_name(self, url): - """ - @author r0oth3x49 - """ - obj = re.search( - r"(?i)(?://(?P.+?).udemy.com/(?:course(/draft)*/)?(?P[a-zA-Z0-9_-]+))", - url, - ) - if obj: - return obj.group("portal_name"), obj.group("name_or_id") - - def _subscribed_courses(self, portal_name, course_name): - results = [] - self.session._headers.update({ - "Host": - "{portal_name}.udemy.com".format(portal_name=portal_name), - "Referer": - "https://{portal_name}.udemy.com/home/my-courses/search/?q={course_name}" - .format(portal_name=portal_name, course_name=course_name), - }) - url = COURSE_SEARCH.format(portal_name=portal_name, - course_name=course_name) - try: - webpage = self.session._get(url).json() - except conn_error as error: - print(f"Udemy Says: Connection error, {error}") - time.sleep(0.8) - sys.exit(0) - except (ValueError, Exception) as error: - print(f"Udemy Says: {error} on {url}") - time.sleep(0.8) - sys.exit(0) - else: - results = webpage.get("results", []) - return results - - def _extract_course_json(self, url, course_id, portal_name): - self.session._headers.update({"Referer": url}) - url = COURSE_URL.format(portal_name=portal_name, course_id=course_id) - try: - resp = self.session._get(url) - if resp.status_code in [502, 503]: - print( - "> The course content is large, using large content extractor..." - ) - resp = self._extract_large_course_content(url=url) - else: - resp = resp.json() - except conn_error as error: - print(f"Udemy Says: Connection error, {error}") - time.sleep(0.8) - sys.exit(0) - except (ValueError, Exception): - resp = self._extract_large_course_content(url=url) - return resp - else: - return resp - - def _extract_large_course_content(self, url): - url = url.replace("10000", "50") if url.endswith("10000") else url - try: - data = self.session._get(url).json() - except conn_error as error: - print(f"Udemy Says: Connection error, {error}") - time.sleep(0.8) - sys.exit(0) - else: - _next = data.get("next") - while _next: - print("Downloading course information.. ") - try: - resp = self.session._get(_next).json() - except conn_error as error: - print(f"Udemy Says: Connection error, {error}") - time.sleep(0.8) - sys.exit(0) - else: - _next = resp.get("next") - results = resp.get("results") - if results and isinstance(results, list): - for d in resp["results"]: - data["results"].append(d) - return data - - def __extract_course(self, response, course_name): - _temp = {} - if response: - for entry in response: - course_id = str(entry.get("id")) - published_title = entry.get("published_title") - if course_name in (published_title, course_id): - _temp = entry - break - return _temp - - def _my_courses(self, portal_name): - results = [] - try: - url = MY_COURSES_URL.format(portal_name=portal_name) - webpage = self.session._get(url).json() - except conn_error as error: - print(f"Udemy Says: Connection error, {error}") - time.sleep(0.8) - sys.exit(0) - except (ValueError, Exception) as error: - print(f"Udemy Says: {error}") - time.sleep(0.8) - sys.exit(0) - else: - results = webpage.get("results", []) - return results - - def _subscribed_collection_courses(self, portal_name): - url = COLLECTION_URL.format(portal_name=portal_name) - courses_lists = [] - try: - webpage = self.session._get(url).json() - except conn_error as error: - print(f"Udemy Says: Connection error, {error}") - time.sleep(0.8) - sys.exit(0) - except (ValueError, Exception) as error: - print(f"Udemy Says: {error}") - time.sleep(0.8) - sys.exit(0) - else: - results = webpage.get("results", []) - if results: - [ - courses_lists.extend(courses.get("courses", [])) - for courses in results if courses.get("courses", []) - ] - return courses_lists - - def _archived_courses(self, portal_name): - results = [] - try: - url = MY_COURSES_URL.format(portal_name=portal_name) - url = f"{url}&is_archived=true" - webpage = self.session._get(url).json() - except conn_error as error: - print(f"Udemy Says: Connection error, {error}") - time.sleep(0.8) - sys.exit(0) - except (ValueError, Exception) as error: - print(f"Udemy Says: {error}") - time.sleep(0.8) - sys.exit(0) - else: - results = webpage.get("results", []) - return results - - def _extract_course_info(self, url): - portal_name, course_name = self.extract_course_name(url) - course = {} - results = self._subscribed_courses(portal_name=portal_name, - course_name=course_name) - course = self.__extract_course(response=results, - course_name=course_name) - if not course: - results = self._my_courses(portal_name=portal_name) - course = self.__extract_course(response=results, - course_name=course_name) - if not course: - results = self._subscribed_collection_courses( - portal_name=portal_name) - course = self.__extract_course(response=results, - course_name=course_name) - if not course: - results = self._archived_courses(portal_name=portal_name) - course = self.__extract_course(response=results, - course_name=course_name) - - if course: - course.update({"portal_name": portal_name}) - return course.get("id"), course - if not course: - print("Downloading course information, course id not found .. ") - print( - "It seems either you are not enrolled or you have to visit the course atleast once while you are logged in.", - ) - print("Trying to logout now...", ) - self.session.terminate() - print("Logged out successfully.", ) - sys.exit(0) - - -class Session(object): - def __init__(self): - self._headers = HEADERS - self._session = requests.sessions.Session() - - def _set_auth_headers(self, access_token="", client_id=""): - self._headers["Authorization"] = "Bearer {}".format(access_token) - self._headers["X-Udemy-Authorization"] = "Bearer {}".format( - access_token) - - def _get(self, url): - session = self._session.get(url, headers=self._headers) - if session.ok or session.status_code in [502, 503]: - return session - if not session.ok: - raise Exception(f"{session.status_code} {session.reason}") - - def _post(self, url, data, redirect=True): - session = self._session.post(url, - data, - headers=self._headers, - allow_redirects=redirect) - if session.ok: - return session - if not session.ok: - raise Exception(f"{session.status_code} {session.reason}") - - def terminate(self): - self._set_auth_headers() - return - - -# Thanks to a great open source utility youtube-dl .. -class HTMLAttributeParser(compat_HTMLParser): # pylint: disable=W - """Trivial HTML parser to gather the attributes for a single element""" - def __init__(self): - self.attrs = {} - compat_HTMLParser.__init__(self) - - def handle_starttag(self, tag, attrs): - self.attrs = dict(attrs) - - -def extract_attributes(html_element): - """Given a string for an HTML element such as - - Decode and return a dictionary of attributes. - { - 'a': 'foo', 'b': 'bar', c: 'baz', d: 'boz', - 'empty': '', 'noval': None, 'entity': '&', - 'sq': '"', 'dq': '\'' - }. - NB HTMLParser is stricter in Python 2.6 & 3.2 than in later versions, - but the cases in the unit test will work for all of 2.6, 2.7, 3.2-3.5. - """ - parser = HTMLAttributeParser() - try: - parser.feed(html_element) - parser.close() - except Exception: # pylint: disable=W - pass - return parser.attrs - - -def hidden_inputs(html): - html = re.sub(r"", "", html) - hidden_inputs = {} # pylint: disable=W - for entry in re.findall(r"(?i)(]+>)", html): - attrs = extract_attributes(entry) - if not entry: - continue - if attrs.get("type") not in ("hidden", "submit"): - continue - name = attrs.get("name") or attrs.get("id") - value = attrs.get("value") - if name and value is not None: - hidden_inputs[name] = value - return hidden_inputs - - -def search_regex(pattern, - string, - name, - default=object(), - fatal=True, - flags=0, - group=None): - """ - Perform a regex search on the given string, using a single or a list of - patterns returning the first matching group. - In case of failure return a default value or raise a WARNING or a - RegexNotFoundError, depending on fatal, specifying the field name. - """ - if isinstance(pattern, str): - mobj = re.search(pattern, string, flags) - else: - for p in pattern: - mobj = re.search(p, string, flags) - if mobj: - break - - _name = name - - if mobj: - if group is None: - # return the first matching group - return next(g for g in mobj.groups() if g is not None) - else: - return mobj.group(group) - elif default is not object(): - return default - elif fatal: - print("[-] Unable to extract %s" % _name) - exit(0) - else: - print("[-] unable to extract %s" % _name) - exit(0) - - -class UdemyAuth(object): - def __init__(self, username="", password="", cache_session=False): - self.username = username - self.password = password - self._cache = cache_session - self._session = Session() - self._cloudsc = cloudscraper.create_scraper() - - def _form_hidden_input(self, form_id): - try: - resp = self._cloudsc.get(LOGIN_URL) - resp.raise_for_status() - webpage = resp.text - except conn_error as error: - raise error - else: - login_form = hidden_inputs( - search_regex( - r'(?is)]+?id=(["\'])%s\1[^>]*>(?P
.+?)
' - % form_id, - webpage, - "%s form" % form_id, - group="form", - )) - login_form.update({ - "email": self.username, - "password": self.password - }) - return login_form - - def authenticate(self, access_token="", client_id=""): - if not access_token and not client_id: - data = self._form_hidden_input(form_id="login-form") - self._cloudsc.headers.update({"Referer": LOGIN_URL}) - auth_response = self._cloudsc.post(LOGIN_URL, - data=data, - allow_redirects=False) - auth_cookies = auth_response.cookies - - access_token = auth_cookies.get("access_token", "") - client_id = auth_cookies.get("client_id", "") - - if access_token: - # dump cookies to configs - # if self._cache: - # _ = to_configs( - # username=self.username, - # password=self.password, - # cookies=f"access_token={access_token}", - # ) - self._session._set_auth_headers(access_token=access_token, - client_id=client_id) - self._session._session.cookies.update( - {"access_token": access_token}) - return self._session, access_token - else: - self._session._set_auth_headers() - return None, None - +home_dir = os.getcwd() +keyfile_path = os.path.join(os.getcwd(), "keyfile.json") +dl_assets = False +dl_captions = False +skip_lectures = False +caption_locale = "en" +quality = None # None will download the best possible +valid_qualities = [144, 360, 480, 720, 1080] if not os.path.exists(working_dir): os.makedirs(working_dir) @@ -771,6 +64,58 @@ def durationtoseconds(period): return None +def download_media(filename, url, lecture_working_dir, epoch=0): + if (os.path.isfile(filename)): + print("Segment already downloaded.. skipping..") + else: + media = requests.get(url, stream=True) + media_length = int(media.headers.get("content-length")) + if media.status_code == 200: + if (os.path.isfile(filename) + and os.path.getsize(filename) >= media_length): + print("Segment already downloaded.. skipping write to disk..") + else: + try: + pbar = tqdm(total=media_length, + initial=0, + unit='B', + unit_scale=True, + desc=filename) + with open(os.path.join(lecture_working_dir, filename), + 'wb') as video_file: + for chunk in media.iter_content(chunk_size=1024): + if chunk: + video_file.write(chunk) + pbar.update(1024) + pbar.close() + print("Segment downloaded: " + filename) + return False #Successfully downloaded the file + except: + print( + "Connection error: Reattempting download of segment..") + download_media(filename, url, lecture_working_dir, + epoch + 1) + + if os.path.getsize(filename) >= media_length: + pass + else: + print("Segment is faulty.. Redownloading...") + download_media(filename, url, lecture_working_dir, epoch + 1) + elif (media.status_code == 404): + print("Probably end hit!\n", url) + return True #Probably hit the last of the file + else: + if (epoch > retry): + exit("Error fetching segment, exceeded retry times.") + print("Error fetching segment file.. Redownloading...") + download_media(filename, url, lecture_working_dir, epoch + 1) + + +""" +@author Jayapraveen +""" + + def cleanup(path): """ @author Jayapraveen @@ -784,20 +129,17 @@ def cleanup(path): os.removedirs(path) -def mux_process(video_title, lecture_working_dir, output_path): - """ - @author Jayapraveen - """ +""" +@author Jayapraveen +""" + + +def mux_process(video_title, lecture_working_dir, outfile): + time_stamp = datetime.datetime.now().isoformat()+'Z' if os.name == "nt": - command = "ffmpeg -y -i \"{}\" -i \"{}\" -acodec copy -vcodec copy -fflags +bitexact -map_metadata -1 -metadata title=\"{}\" \"{}\"".format( - os.path.join(lecture_working_dir, "decrypted_audio.mp4"), - os.path.join(lecture_working_dir, "decrypted_video.mp4"), - video_title, output_path) + command = f"ffmpeg -y -i \"{lecture_working_dir}\\decrypted_audio.mp4\" -i \"{lecture_working_dir}\\decrypted_video.mp4\" -acodec copy -vcodec copy -fflags +bitexact -map_metadata -1 -metadata title=\"{video_title}\" -metadata creation_time=\"{time_stamp}\" \"{outfile}\"" else: - command = "nice -n 7 ffmpeg -y -i \"{}\" -i \"{}\" -acodec copy -vcodec copy -fflags +bitexact -map_metadata -1 -metadata title=\"{}\" \"{}\"".format( - os.path.join(lecture_working_dir, "decrypted_audio.mp4"), - os.path.join(lecture_working_dir, "decrypted_video.mp4"), - video_title, output_path) + command = f"nice -n 7 ffmpeg -y -i \"{lecture_working_dir}//decrypted_audio.mp4\" -i \"{lecture_working_dir}//decrypted_video.mp4\" -acodec copy -vcodec copy -fflags +bitexact -map_metadata -1 -metadata title=\"{video_title}\" -metadata creation_time=\"{time_stamp}\" \"{outfile}\"" os.system(command) @@ -808,56 +150,16 @@ def decrypt(kid, filename, lecture_working_dir): print("> Decrypting, this might take a minute...") try: key = keyfile[kid.lower()] - if (os.name == "nt"): - os.system(f"mp4decrypt --key 1:%s \"%s\" \"%s\"" % - (key, - os.path.join(lecture_working_dir, - "encrypted_{}.mp4".format(filename)), - os.path.join(lecture_working_dir, - "decrypted_{}.mp4".format(filename)))) - else: - os.system(f"nice -n 7 mp4decrypt --key 1:%s \"%s\" \"%s\"" % - (key, - os.path.join(lecture_working_dir, - "encrypted_{}.mp4".format(filename)), - os.path.join(lecture_working_dir, - "decrypted_{}.mp4".format(filename)))) - print("> Decryption complete") - except KeyError: - raise KeyError("Key not found") - - -def handle_segments(video_source, audio_source, video_title, - lecture_working_dir, output_path): - """ - @author Jayapraveen - """ - no_vid_segments = video_source.get("segment_count") - no_aud_segments = audio_source.get("segment_count") - - audio_media = audio_source.get("media") - audio_init = audio_source.get("initialization") - audio_extension = audio_source.get("extension") - - video_media = video_source.get("media") - video_init = video_source.get("initialization") - video_extension = video_source.get("extension") - - audio_urls = audio_init + "\n dir={}\n out=audio_0.mp4\n".format( - lecture_working_dir) - video_urls = video_init + "\n dir={}\n out=video_0.mp4\n".format( - lecture_working_dir) - - list_path = os.path.join(lecture_working_dir, "list.txt") - - for i in range(1, no_aud_segments): - audio_urls += audio_media.replace( - "$Number$", str(i)) + "\n dir={}\n out=audio_{}.mp4\n".format( - lecture_working_dir, i) - for i in range(1, no_vid_segments): - video_urls += video_media.replace( - "$Number$", str(i)) + "\n dir={}\n out=video_{}.mp4\n".format( - lecture_working_dir, i) + except KeyError as error: + exit("Key not found") + if (os.name == "nt"): + os.system( + f"mp4decrypt --key 1:{key} \"{lecture_working_dir}\\encrypted_{filename}.mp4\" \"{lecture_working_dir}\\decrypted_{filename}.mp4\"" + ) + else: + os.system( + f"nice -n 7 mp4decrypt --key 1:{key} \"{lecture_working_dir}//encrypted_{filename}.mp4\" \"{lecture_working_dir}//decrypted_{filename}.mp4\"" + ) with open(list_path, 'w') as f: f.write("{}\n{}".format(audio_urls, video_urls)) @@ -872,12 +174,14 @@ def handle_segments(video_source, audio_source, video_title, print("Return code: " + str(ret_code)) - os.remove(list_path) - - video_kid = extract_kid(os.path.join(lecture_working_dir, "video_0.mp4")) +def handle_irregular_segments(media_info, video_title, lecture_working_dir, + output_path): + no_segment, video_url, video_init, video_extension, no_segment, audio_url, audio_init, audio_extension = media_info + download_media("video_0.seg.mp4", video_init, lecture_working_dir) + video_kid = extract_kid(os.path.join(lecture_working_dir, "video_0.seg.mp4")) print("KID for video file is: " + video_kid) - - audio_kid = extract_kid(os.path.join(lecture_working_dir, "audio_0.mp4")) + download_media("audio_0.seg.mp4", audio_init, lecture_working_dir) + audio_kid = extract_kid(os.path.join(lecture_working_dir, "audio_0.seg.mp4")) print("KID for audio file is: " + audio_kid) os.chdir(lecture_working_dir) @@ -982,25 +286,15 @@ def download(url, path, filename): return file_size -def download_aria(url, file_dir, filename): - """ - @author Puyodead1 - """ - print(" > Downloading File...") - ret_code = subprocess.Popen([ - "aria2c", url, "-o", filename, "-d", file_dir, "-j16", "-s20", "-x16", - "-c", "--auto-file-renaming=false", "--summary-interval=0" - ]).wait() - print(" > File Downloaded") - - print("Return code: " + str(ret_code)) - - -def process_caption(caption, lecture_title, lecture_dir, keep_vtt, tries=0): - filename = f"%s_%s.%s" % (sanitize(lecture_title), caption.get("language"), - caption.get("extension")) - filename_no_ext = f"%s_%s" % (sanitize(lecture_title), - caption.get("language")) +def process_caption(caption, + lecture_index, + lecture_title, + lecture_dir, + tries=0): + filename = f"%s. %s_%s.%s" % (lecture_index, sanitize(lecture_title), + caption.get("locale_id"), caption.get("ext")) + filename_no_ext = f"%s. %s_%s" % (lecture_index, sanitize(lecture_title), + caption.get("locale_id")) filepath = os.path.join(lecture_dir, filename) if os.path.isfile(filepath): @@ -1064,15 +358,10 @@ def process_lecture(lecture, lecture_path, lecture_dir, quality, access_token): else: print(f" > Lecture '%s' is missing media links" % lecture_title) - print(len(lecture_audio_sources), len(lecture_video_sources)) - else: - sources = lecture.get("sources") - sources = sorted(sources, - key=lambda x: int(x.get("height")), - reverse=True) - if sources: - lecture_working_dir = os.path.join(working_dir, - str(lecture.get("asset_id"))) + lecture_working_dir = os.path.join( + working_dir, str(lecture_asset["id"]) + ) # set the folder to download ephemeral files + media_sources = lecture_asset["media_sources"] if not os.path.exists(lecture_working_dir): os.mkdir(lecture_working_dir) if not os.path.isfile(lecture_path): @@ -1145,9 +434,9 @@ def parse_new(_udemy, quality, skip_lectures, dl_assets, dl_captions, lecture_path = os.path.join( chapter_dir, "{}.html".format(sanitize(lecture_title))) try: - with open(lecture_path, 'w') as f: - f.write(html_content) - f.close() + download(download_url, + os.path.join(lecture_dir, asset_filename), + asset_filename) except Exception as e: print(" > Failed to write html file: ", e) continue @@ -1171,41 +460,44 @@ def parse_new(_udemy, quality, skip_lectures, dl_assets, dl_captions, print( "If you're seeing this message, that means that you reached a secret area that I haven't finished! jk I haven't implemented handling for this asset type, please report this at https://github.com/Puyodead1/udemy-downloader/issues so I can add it. When reporting, please provide the following information: " ) - print("AssetType: Article; AssetData: ", asset) - # html_content = lecture.get("html_content") - # lecture_path = os.path.join( - # chapter_dir, "{}.html".format(sanitize(lecture_title))) - # try: - # with open(lecture_path, 'w') as f: - # f.write(html_content) - # f.close() - # except Exception as e: - # print("Failed to write html file: ", e) - # continue - elif asset_type == "video": - print( - "If you're seeing this message, that means that you reached a secret area that I haven't finished! jk I haven't implemented handling for this asset type, please report this at https://github.com/Puyodead1/udemy-downloader/issues so I can add it. When reporting, please provide the following information: " - ) - print("AssetType: Video; AssetData: ", asset) - elif asset_type == "audio" or asset_type == "e-book" or asset_type == "file" or asset_type == "presentation": - try: - download_aria(download_url, chapter_dir, filename) - except Exception as e: - print("> Error downloading asset: ", e) - continue - elif asset_type == "external_link": - filepath = os.path.join(chapter_dir, filename) - savedirs, name = os.path.split(filepath) - filename = u"external-assets-links.txt" - filename = os.path.join(savedirs, filename) - file_data = [] - if os.path.isfile(filename): - file_data = [ - i.strip().lower() - for i in open(filename, - encoding="utf-8", - errors="ignore") if i - ] + continue + elif asset["asset_type"] == "Article": + assets.append(asset) + asset_path = os.path.join(lecture_dir, + sanitize(lecture_title)) + with open(asset_path, 'w') as f: + f.write(asset["body"]) + elif asset["asset_type"] == "ExternalLink": + assets.append(asset) + asset_path = os.path.join(lecture_dir, f"{lecture_index}. External URLs.txt") + with open(asset_path, 'a') as f: + f.write(f"%s : %s\n" % + (asset["title"], asset["external_url"])) + print("> Found %s assets for lecture '%s'" % + (len(assets), lecture_title)) + + # process captions + if dl_captions: + captions = [] + for caption in lecture_asset.get("captions"): + if not isinstance(caption, dict): + continue + if caption.get("_class") != "caption": + continue + download_url = caption.get("url") + if not download_url or not isinstance(download_url, str): + continue + lang = (caption.get("language") or caption.get("srclang") + or caption.get("label") + or caption.get("locale_id").split("_")[0]) + ext = "vtt" if "vtt" in download_url.rsplit(".", 1)[-1] else "srt" + if caption_locale == "all" or caption_locale == lang: + captions.append({ + "language": lang, + "locale_id": caption.get("locale_id"), + "ext": ext, + "url": download_url + }) content = u"\n{}\n{}\n".format(name, download_url) if name.lower() not in file_data: @@ -1225,6 +517,27 @@ def parse_new(_udemy, quality, skip_lectures, dl_assets, dl_captions, process_caption(subtitle, lecture_title, chapter_dir, keep_vtt) +def parse(data): + course_dir = os.path.join(download_dir, course_id) + if not os.path.exists(course_dir): + os.mkdir(course_dir) + chapters = [] + lectures = [] + + for obj in data: + if obj["_class"] == "chapter": + obj["lectures"] = [] + chapters.append(obj) + elif obj["_class"] == "lecture" and obj["asset"][ + "asset_type"] == "Video": + try: + chapters[-1]["lectures"].append(obj) + except IndexError: + # This is caused by there not being a starting chapter + lectures.append(obj) + lecture_index = lectures.index(obj) + 1 + lecture_path = os.path.join(course_dir, f'{lecture_index}. {sanitize(obj["title"])}.mp4') + process_lecture(obj, lecture_index, lecture_path, download_dir) def course_info(course_data): print("\n\n\n\n") @@ -1239,60 +552,17 @@ def course_info(course_data): chapters = course_data.get("chapters") for chapter in chapters: - chapter_title = chapter.get("chapter_title") - chapter_index = chapter.get("chapter_index") - chapter_lecture_count = chapter.get("lecture_count") - chapter_lectures = chapter.get("lectures") + chapter_dir = os.path.join(course_dir, f'{chapters.index(chapter) + 1}. {sanitize(chapter["title"])}') + if not os.path.exists(chapter_dir): + os.mkdir(chapter_dir) - print("> Chapter: {} ({} of {})".format(chapter_title, chapter_index, - chapter_count)) - - for lecture in chapter_lectures: - lecture_title = lecture.get("lecture_title") - lecture_index = lecture.get("index") - lecture_asset_count = lecture.get("assets_count") - lecture_is_encrypted = lecture.get("is_encrypted") - lecture_subtitles = lecture.get("subtitles") - lecture_extension = lecture.get("extension") - lecture_sources = lecture.get("sources") - lecture_video_sources = lecture.get("video_sources") - - if lecture_sources: - lecture_sources = sorted(lecture.get("sources"), - key=lambda x: int(x.get("height")), - reverse=True) - if lecture_video_sources: - lecture_video_sources = sorted( - lecture.get("video_sources"), - key=lambda x: int(x.get("height")), - reverse=True) - - if lecture_is_encrypted: - lecture_qualities = [ - "{}@{}x{}".format(x.get("type"), x.get("width"), - x.get("height")) - for x in lecture_video_sources - ] - elif not lecture_is_encrypted and lecture_sources: - lecture_qualities = [ - "{}@{}x{}".format(x.get("type"), x.get("height"), - x.get("width")) for x in lecture_sources - ] - - if lecture_extension: - continue - - print(" > Lecture: {} ({} of {})".format(lecture_title, - lecture_index, - chapter_lecture_count)) - print(" > DRM: {}".format(lecture_is_encrypted)) - print(" > Asset Count: {}".format(lecture_asset_count)) - print(" > Captions: {}".format( - [x.get("language") for x in lecture_subtitles])) - print(" > Qualities: {}".format(lecture_qualities)) - - if chapter_index != chapter_count: - print("\n\n") + for lecture in chapter["lectures"]: + lecture_index = chapter["lectures"].index(lecture) + 1 + lecture_path = os.path.join(chapter_dir, f'{lecture_index}. {sanitize(lecture["title"])}.mp4') + process_lecture(lecture, lecture_index, lecture_path, chapter_dir) + print("\n\n\n\n\n\n\n\n=====================") + print("All downloads completed for course!") + print("=====================") if __name__ == "__main__": diff --git a/vtt_to_srt.py b/vtt_to_srt.py index 1e2cc28..297424e 100644 --- a/vtt_to_srt.py +++ b/vtt_to_srt.py @@ -6,8 +6,8 @@ from pysrt.srttime import SubRipTime def convert(directory, filename): index = 0 - vtt_filepath = os.path.join(directory, filename + ".vtt") - srt_filepath = os.path.join(directory, filename + ".srt") + vtt_filepath = os.path.join(directory, f"{filename}.vtt") + srt_filepath = os.path.join(directory, f"{filename}.srt") srt = open(srt_filepath, "w") for caption in WebVTT().read(vtt_filepath):