diff --git a/.env.sample b/.env.sample
index 93971fb..b5cc685 100644
--- a/.env.sample
+++ b/.env.sample
@@ -1,2 +1 @@
-UDEMY_BEARER=enter bearer token without the Bearer prefix
-UDEMY_COURSE_ID=course id goes here
\ No newline at end of file
+UDEMY_BEARER=Your bearer token here
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 1e71896..3a296b0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -119,4 +119,7 @@ test_data.json
out_dir
working_dir
manifest.mpd
-.vscode
\ No newline at end of file
+.vscode
+saved
+*.aria2
+info.py
\ No newline at end of file
diff --git a/README.md b/README.md
index 44905b2..0d8c7ef 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,5 @@
# Udemy Downloader with DRM support
+
[](https://forthebadge.com)
[](https://forthebadge.com)
[](https://forthebadge.com)
@@ -6,6 +7,7 @@



+
# NOTE
This program is WIP, the code is provided as-is and I am not held resposible for any legal issues resulting from the use of this program.
@@ -21,11 +23,11 @@ All code is licensed under the MIT license
# Description
Simple program to download a Udemy course, has support for DRM videos but requires the user to aquire the decryption key (for legal reasons).
-Current only Windows is supported but with some small modifications it should work on linux also (and maybe mac)
+Windows is the primary development OS, but I've made an effort to support linux also.
# Requirements
-1. You would need to download `ffmpeg` and `mp4decrypter`from Bento4 SDK and ensure they are in path (typing their name in cmd invokes them).
+1. You would need to download `ffmpeg`, `aria2c` and `mp4decrypter` (from Bento4 SDK) and ensure they are in path (typing their name in cmd should invoke them).
# Usage
@@ -35,96 +37,91 @@ You will need to get a few things before you can use this program:
- Decryption Key ID
- Decryption Key
-- Udemy Course ID
-- Udemy Bearer Token
+- Udemy Course URL
+- Udemy Bearer Token (aka access token for udemy-dl users)
### Setting up
-- rename `.env.sample` to `.env`
+- rename `.env.sample` to `.env` _(you only need to do this if you plan to use the .env file to store your bearer token)_
- rename `keyfile.example.json` to `keyfile.json`
-### Aquire bearer token
+### Acquire Bearer Token
-- open dev tools
-- go to network tab
-- in the search field, enter `api-2.0/courses`
-- 
-- click a random request
-- locate the `Request Headers` section
-- copy the the text after `Authorization`, it should look like `Bearer xxxxxxxxxxx`
-- 
-- enter this in the `.env` file after `UDEMY_BEARER=` (you can also pass this as an argument, see advanced usage for more information)
-
-### Aquire Course ID
-
-- Follow above before following this
-- locate the request url field
-- 
-- copy the number after `/api-2.0/courses/` as seen highlighed in the above picture
-- enter this in the `.env` file after `UDEMY_COURSE_ID=` (you can also pass this as an argument, see advanced usage for more information)
+- Firefox: [Udemy-DL Guide](https://github.com/r0oth3x49/udemy-dl/issues/389#issuecomment-491903900)
+- Chrome: [Udemy-DL Guide](https://github.com/r0oth3x49/udemy-dl/issues/389#issuecomment-492569372)
+- If you want to use the .env file to store your Bearer Token, edit the .env and add your token.
### Key ID and Key
-It is up to you to aquire the key and key id.
+It is up to you to acquire the key and key id. Please don't ask me for help acquiring these; decrypting DRM-protected content can be considered piracy.
- Enter the key and key id in the `keyfile.json`
-- 
+- 
- 
### Start Downloading
-You can now run `python main.py` to start downloading. The course will download to `out_dir`, chapters are seperated into folders.
+You can now run the program, see the examples below. The course will download to `out_dir`.
# Advanced Usage
```
-usage: main.py [-h] [-d] [-b BEARER_TOKEN] [-c COURSE_ID] [-q QUALITY] [-l LANG] [--skip-lectures] [--download-assets] [--download-captions]
+usage: main.py [-h] -c COURSE_URL [-b BEARER_TOKEN] [-q QUALITY] [-l LANG] [--skip-lectures] [--download-assets] [--download-captions]
+ [--keep-vtt] [--skip-hls] [--info]
Udemy Downloader
optional arguments:
-h, --help show this help message and exit
- -d, --debug Use test_data.json rather than fetch from the udemy api.
+ -c COURSE_URL, --course-url COURSE_URL
+ The URL of the course to download
-b BEARER_TOKEN, --bearer BEARER_TOKEN
The Bearer token to use
- -c COURSE_ID, --course-id COURSE_ID
- The ID of the course to download
-q QUALITY, --quality QUALITY
- Download specific video quality. (144, 360, 480, 720, 1080)
- -l LANG, --lang LANG The language to download for captions (Default is en)
- --skip-lectures If specified, lectures won't be downloaded.
- --download-assets If specified, lecture assets will be downloaded.
- --download-captions If specified, captions will be downloaded.
+ Download specific video quality. If the requested quality isn't available, the closest quality will be used. If not
+ specified, the best quality will be downloaded for each lecture
+ -l LANG, --lang LANG The language to download for captions, specify 'all' to download all captions (Default is 'en')
+ --skip-lectures If specified, lectures won't be downloaded
+ --download-assets If specified, lecture assets will be downloaded
+ --download-captions If specified, captions will be downloaded
+ --keep-vtt If specified, .vtt files won't be removed
+ --skip-hls If specified, hls streams will be skipped (faster fetching) (hls streams usually contain 1080p quality for non-drm
+ lectures)
+ --info If specified, only course information will be printed, nothing will be downloaded
```
- Passing a Bearer Token and Course ID as an argument
- - `python main.py -b <Bearer Token> -c <Course ID>`
+ - `python main.py -c <Course URL> -b <Bearer Token>`
+ - `python main.py -c https://www.udemy.com/courses/myawesomecourse -b <Bearer Token>`
- Download a specific quality
- - `python main.py -q 720`
+ - `python main.py -c <Course URL> -q 720`
- Download assets along with lectures
- - `python main.py --download-assets`
+ - `python main.py -c <Course URL> --download-assets`
- Download assets and specify a quality
- - `python main.py -q 360 --download-assets`
+ - `python main.py -c <Course URL> -q 360 --download-assets`
- Download captions (Defaults to English)
- - `python main.py --download-captions`
+ - `python main.py -c <Course URL> --download-captions`
- Download captions with specific language
- - `python main.py --download-captions -l en` - English subtitles
- - `python main.py --download-captions -l es` - Spanish subtitles
- - `python main.py --download-captions -l it` - Italian subtitles
- - `python main.py --download-captions -l pl` - Polish Subtitles
- - `python main.py --download-captions -l all` - Downloads all subtitles
+ - `python main.py -c <Course URL> --download-captions -l en` - English subtitles
+ - `python main.py -c <Course URL> --download-captions -l es` - Spanish subtitles
+ - `python main.py -c <Course URL> --download-captions -l it` - Italian subtitles
+ - `python main.py -c <Course URL> --download-captions -l pl` - Polish Subtitles
+ - `python main.py -c <Course URL> --download-captions -l all` - Downloads all subtitles
- etc
- Skip downloading lecture videos
- - `python main.py --skip-lectures --download-captions` - Downloads only captions
- - `python main.py --skip-lectures --download-assets` - Downloads only assets
-
-# Getting an error about "Accepting the latest terms of service"?
-
-- If you are using Udemy business, you must edit `main.py` and change `udemy.com` to `.udemy.com`
+ - `python main.py -c <Course URL> --skip-lectures --download-captions` - Downloads only captions
+ - `python main.py -c <Course URL> --skip-lectures --download-assets` - Downloads only assets
+- Keep .VTT caption files:
+ - `python main.py -c <Course URL> --download-captions --keep-vtt`
+- Skip parsing HLS Streams (HLS streams usually contain 1080p quality for Non-DRM lectures):
+ - `python main.py -c <Course URL> --skip-hls`
+- Print course information only:
+ - `python main.py -c <Course URL> --info`
# Credits
- https://github.com/Jayapraveen/Drm-Dash-stream-downloader - For the original code which this is based on
- https://github.com/alastairmccormack/pywvpssh - For code related to PSSH extraction
-- https://github.com/alastairmccormack/pymp4parse/ - For code related to mp4 box parsing (used by pywvpssh)
+- https://github.com/alastairmccormack/pymp4parse - For code related to mp4 box parsing (used by pywvpssh)
- https://github.com/lbrayner/vtt-to-srt - For code related to converting subtitles from vtt to srt format
+- https://github.com/r0oth3x49/udemy-dl - For some of the information related to using the udemy api
diff --git a/dashdownloader_multisegment.py b/dashdownloader_multisegment.py
deleted file mode 100644
index c253ee9..0000000
--- a/dashdownloader_multisegment.py
+++ /dev/null
@@ -1,203 +0,0 @@
-#dashdrmmultisegmentdownloader
-import os,requests,shutil,json,glob
-from mpegdash.parser import MPEGDASHParser
-from mpegdash.nodes import Descriptor
-from mpegdash.utils import (
- parse_attr_value, parse_child_nodes, parse_node_value,
- write_attr_value, write_child_node, write_node_value
-)
-from utils import extract_kid
-
-#global ids
-retry = 3
-download_dir = os.path.join(os.getcwd(), 'out_dir') # set the folder to output
-working_dir = os.path.join(os.getcwd(), "working_dir") # set the folder to download ephemeral files
-keyfile_path = os.path.join(os.getcwd(), "keyfile.json")
-
-if not os.path.exists(working_dir):
- os.makedirs(working_dir)
-
-#Get the keys
-with open(keyfile_path,'r') as keyfile:
- keyfile = keyfile.read()
-keyfile = json.loads(keyfile)
-
-
-#Patching the Mpegdash lib for keyID
-def __init__(self):
- self.scheme_id_uri = '' # xs:anyURI (required)
- self.value = None # xs:string
- self.id = None # xs:string
- self.key_id = None # xs:string
-
-def parse(self, xmlnode):
- self.scheme_id_uri = parse_attr_value(xmlnode, 'schemeIdUri', str)
- self.value = parse_attr_value(xmlnode, 'value', str)
- self.id = parse_attr_value(xmlnode, 'id', str)
- self.key_id = parse_attr_value(xmlnode, 'cenc:default_KID', str)
-
-def write(self, xmlnode):
- write_attr_value(xmlnode, 'schemeIdUri', self.scheme_id_uri)
- write_attr_value(xmlnode, 'value', self.value)
- write_attr_value(xmlnode, 'id', self.id)
- write_attr_value(xmlnode, 'cenc:default_KID', self.key_id)
-
-Descriptor.__init__ = __init__
-Descriptor.parse = parse
-Descriptor.write = write
-
-def durationtoseconds(period):
- #Duration format in PTxDxHxMxS
- if(period[:2] == "PT"):
- period = period[2:]
- day = int(period.split("D")[0] if 'D' in period else 0)
- hour = int(period.split("H")[0].split("D")[-1] if 'H' in period else 0)
- minute = int(period.split("M")[0].split("H")[-1] if 'M' in period else 0)
- second = period.split("S")[0].split("M")[-1]
- print("Total time: " + str(day) + " days " + str(hour) + " hours " + str(minute) + " minutes and " + str(second) + " seconds")
- total_time = float(str((day * 24 * 60 * 60) + (hour * 60 * 60) + (minute * 60) + (int(second.split('.')[0]))) + '.' + str(int(second.split('.')[-1])))
- return total_time
-
- else:
- print("Duration Format Error")
- return None
-
-def download_media(filename,url,epoch = 0):
- if(os.path.isfile(filename)):
- print("Segment already downloaded.. skipping..")
- else:
- media = requests.get(url, stream=True)
- media_length = int(media.headers.get("content-length"))
- if media.status_code == 200:
- if(os.path.isfile(filename) and os.path.getsize(filename) >= media_length):
- print("Segment already downloaded.. skipping write to disk..")
- else:
- try:
- with open(filename, 'wb') as video_file:
- shutil.copyfileobj(media.raw, video_file)
- print("Segment downloaded: " + filename)
- return False #Successfully downloaded the file
- except:
- print("Connection error: Reattempting download of segment..")
- download_media(filename,url, epoch + 1)
-
- if os.path.getsize(filename) >= media_length:
- pass
- else:
- print("Segment is faulty.. Redownloading...")
- download_media(filename,url, epoch + 1)
- elif(media.status_code == 404):
- print("Probably end hit!\n",url)
- return True #Probably hit the last of the file
- else:
- if (epoch > retry):
- exit("Error fetching segment, exceeded retry times.")
- print("Error fetching segment file.. Redownloading...")
- download_media(filename,url, epoch + 1)
-
-def cleanup(path):
- leftover_files = glob.glob(path + '/*.mp4', recursive=True)
- mpd_files = glob.glob(path + '/*.mpd', recursive=True)
- leftover_files = leftover_files + mpd_files
- for file_list in leftover_files:
- try:
- os.remove(file_list)
- except OSError:
- print(f"Error deleting file: {file_list}")
-
-def mux_process(video_title,outfile):
- if os.name == "nt":
- command = f"ffmpeg -y -i decrypted_audio.mp4 -i decrypted_video.mp4 -acodec copy -vcodec copy -fflags +bitexact -map_metadata -1 -metadata title=\"{video_title}\" -metadata creation_time=2020-00-00T70:05:30.000000Z \"{outfile}.mp4\""
- else:
- command = f"nice -n 7 ffmpeg -y -i decrypted_audio.mp4 -i decrypted_video.mp4 -acodec copy -vcodec copy -fflags +bitexact -map_metadata -1 -metadata title=\"{video_title}\" -metadata creation_time=2020-00-00T70:05:30.000000Z {outfile}.mp4"
- os.system(command)
-
-def decrypt(kid,filename):
- try:
- key = keyfile[kid.lower()]
- except KeyError as error:
- exit("Key not found")
- if(os.name == "nt"):
- os.system(f"mp4decrypt --key 1:{key} encrypted_{filename}.mp4 decrypted_{filename}.mp4")
- else:
- os.system(f"nice -n 7 mp4decrypt --key 1:{key} encrypted_{filename}.mp4 decrypted_{filename}.mp4")
-
-
-def handle_irregular_segments(media_info,video_title,output_path):
- no_segment,video_url,video_init,video_extension,no_segment,audio_url,audio_init,audio_extension = media_info
- download_media("video_0.seg.mp4",video_init)
- video_kid = extract_kid("video_0.seg.mp4")
- print("KID for video file is: " + video_kid)
- download_media("audio_0.seg.mp4",audio_init)
- audio_kid = extract_kid("audio_0.seg.mp4")
- print("KID for audio file is: " + audio_kid)
- for count in range(1,no_segment):
- video_segment_url = video_url.replace("$Number$",str(count))
- audio_segment_url = audio_url.replace("$Number$",str(count))
- video_status = download_media(f"video_{str(count)}.seg.{video_extension}",video_segment_url)
- audio_status = download_media(f"audio_{str(count)}.seg.{audio_extension}",audio_segment_url)
- if(video_status):
- if os.name == "nt":
- video_concat_command = "copy /b " + "+".join([f"video_{i}.seg.{video_extension}" for i in range(0,count)]) + " encrypted_video.mp4"
- audio_concat_command = "copy /b " + "+".join([f"audio_{i}.seg.{audio_extension}" for i in range(0,count)]) + " encrypted_audio.mp4"
- else:
- video_concat_command = "cat " + " ".join([f"video_{i}.seg.{video_extension}" for i in range(0,count)]) + " > encrypted_video.mp4"
- audio_concat_command = "cat " + " ".join([f"audio_{i}.seg.{audio_extension}" for i in range(0,count)]) + " > encrypted_audio.mp4"
- print(video_concat_command)
- print(audio_concat_command)
- os.system(video_concat_command)
- os.system(audio_concat_command)
- decrypt(video_kid,"video")
- decrypt(audio_kid,"audio")
- mux_process(video_title,output_path)
- break
-
-
-def manifest_parser(mpd_url):
- video = []
- audio = []
- manifest = requests.get(mpd_url).text
- with open("manifest.mpd",'w') as manifest_handler:
- manifest_handler.write(manifest)
- mpd = MPEGDASHParser.parse("./manifest.mpd")
- running_time = durationtoseconds(mpd.media_presentation_duration)
- for period in mpd.periods:
- for adapt_set in period.adaptation_sets:
- print("Processing " + adapt_set.mime_type)
- content_type = adapt_set.mime_type
- repr = adapt_set.representations[-1] # Max Quality
- for segment in repr.segment_templates:
- if(segment.duration):
- print("Media segments are of equal timeframe")
- segment_time = segment.duration / segment.timescale
- total_segments = running_time / segment_time
- else:
- print("Media segments are of inequal timeframe")
-
- approx_no_segments = round(running_time / 6) + 20 # aproximate of 6 sec per segment
- print("Expected No of segments:",approx_no_segments)
- if(content_type == "audio/mp4"):
- segment_extension = segment.media.split(".")[-1]
- audio.append(approx_no_segments)
- audio.append(segment.media)
- audio.append(segment.initialization)
- audio.append(segment_extension)
- elif(content_type == "video/mp4"):
- segment_extension = segment.media.split(".")[-1]
- video.append(approx_no_segments)
- video.append(segment.media)
- video.append(segment.initialization)
- video.append(segment_extension)
- return video + audio
-
-
-
-if __name__ == "__main__":
- mpd = "mpd url"
- base_url = mpd.split("index.mpd")[0]
- os.chdir(working_dir)
- media_info = manifest_parser(mpd)
- video_title = "175. Inverse Transforming Vectors" # the video title that gets embeded into the mp4 file metadata
- output_path = os.path.join(download_dir, "175. Inverse Transforming Vectors") # video title used in the filename, dont append .mp4
- handle_irregular_segments(media_info,video_title,output_path)
- cleanup(working_dir)
diff --git a/main.py b/main.py
index a2f8aa5..5d0305b 100644
--- a/main.py
+++ b/main.py
@@ -1,29 +1,794 @@
-import os, requests, shutil, json, glob, urllib.request, argparse, sys, datetime
-from sanitize_filename import sanitize
-import urllib.request
+import os, requests, json, glob, argparse, sys, re, time, asyncio, json, cloudscraper, m3u8
from tqdm import tqdm
from dotenv import load_dotenv
from mpegdash.parser import MPEGDASHParser
-from mpegdash.nodes import Descriptor
-from mpegdash.utils import (parse_attr_value, parse_child_nodes,
- parse_node_value, write_attr_value,
- write_child_node, write_node_value)
from utils import extract_kid
from vtt_to_srt import convert
+from requests.exceptions import ConnectionError as conn_error
+from html.parser import HTMLParser as compat_HTMLParser
+from sanitize import sanitize, slugify, SLUG_OK
+from pyffmpeg import FFMPeg as FFMPEG
+import subprocess
-course_id = None
-header_bearer = None
-download_dir = os.path.join(os.getcwd(), "out_dir")
-working_dir = os.path.join(os.getcwd(), "working_dir") # set the folder to download segments for DRM videos
-retry = 3
home_dir = os.getcwd()
+download_dir = os.path.join(os.getcwd(), "out_dir")
+working_dir = os.path.join(os.getcwd(), "working_dir")
keyfile_path = os.path.join(os.getcwd(), "keyfile.json")
-dl_assets = False
-dl_captions = False
-skip_lectures = False
-caption_locale = "en"
-quality = None # None will download the best possible
-valid_qualities = [144, 360, 480, 720, 1080]
+# NOTE(review): no consumer of `retry` or `downloader` is visible in this chunk.
+retry = 3
+downloader = None
+# Default HTTP request headers. The code that sends them is not visible in this
+# chunk (presumably the session created by UdemyAuth — confirm).
+HEADERS = {
+ "Origin": "www.udemy.com",
+ "User-Agent":
+ "Mozilla/5.0 (Windows NT 6.3; Win64; x64; rv:85.0) Gecko/20100101 Firefox/85.0",
+ "Accept": "*/*",
+ "Accept-Encoding": None,
+}
+# Udemy endpoint templates. The {portal_name}, {course_id} and {course_name}
+# placeholders are filled in with str.format() by the Udemy methods below.
+LOGIN_URL = "https://www.udemy.com/join/login-popup/?ref=&display_type=popup&loc"
+LOGOUT_URL = "https://www.udemy.com/user/logout"
+COURSE_URL = "https://{portal_name}.udemy.com/api-2.0/courses/{course_id}/cached-subscriber-curriculum-items?fields[asset]=results,title,external_url,time_estimation,download_urls,slide_urls,filename,asset_type,captions,media_license_token,course_is_drmed,media_sources,stream_urls,body&fields[chapter]=object_index,title,sort_order&fields[lecture]=id,title,object_index,asset,supplementary_assets,view_html&page_size=10000"
+COURSE_SEARCH = "https://{portal_name}.udemy.com/api-2.0/users/me/subscribed-courses?fields[course]=id,url,title,published_title&page=1&page_size=500&search={course_name}"
+SUBSCRIBED_COURSES = "https://www.udemy.com/api-2.0/users/me/subscribed-courses/?ordering=-last_accessed&fields[course]=id,title,url&page=1&page_size=12"
+MY_COURSES_URL = "https://{portal_name}.udemy.com/api-2.0/users/me/subscribed-courses?fields[course]=id,url,title,published_title&ordering=-last_accessed,-access_time&page=1&page_size=10000"
+COLLECTION_URL = "https://{portal_name}.udemy.com/api-2.0/users/me/subscribed-courses-collections/?collection_has_courses=True&course_limit=20&fields[course]=last_accessed_time,title,published_title&fields[user_has_subscribed_courses_collection]=@all&page=1&page_size=1000"
+
+
+def _clean(text):
+ ok = re.compile(r'[^\\/:*?"<>|]')
+ text = "".join(x if ok.match(x) else "_" for x in text)
+ text = re.sub(r"\.+$", "", text.strip())
+ return text
+
+
+def _sanitize(self, unsafetext):
+ text = sanitize(
+ slugify(unsafetext, lower=False, spaces=True, ok=SLUG_OK + "().[]"))
+ return text
+
+
+class Udemy:
+ def __init__(self, access_token):
+ """Authenticate with ``access_token`` and install it on the session headers.
+
+ Prints "Login Success" when authentication yields both a session and a
+ token; otherwise prints "Login Failure!" and exits with status 1.
+ """
+ self.session = None
+ self.access_token = None
+ self.auth = UdemyAuth(cache_session=False)
+ # self.session was set to None just above, so this check always passes.
+ if not self.session:
+ self.session, self.access_token = self.auth.authenticate(
+ access_token=access_token)
+
+ if self.session and self.access_token:
+ # The same bearer value is sent under both header names.
+ self.session._headers.update(
+ {"Authorization": "Bearer {}".format(self.access_token)})
+ self.session._headers.update({
+ "X-Udemy-Authorization":
+ "Bearer {}".format(self.access_token)
+ })
+ print("Login Success")
+ else:
+ print("Login Failure!")
+ sys.exit(1)
+
+ def _extract_supplementary_assets(self, supp_assets):
+ """Convert supplementary-asset entries into download descriptors.
+
+ ``supp_assets`` is iterated as a sequence of dicts. Entries whose
+ "asset_type" (lower-cased) is "file", "sourcecode" or "externallink" can
+ each contribute one dict with the keys type, title, filename, extension
+ and download_url; other asset types contribute nothing. Returns the list.
+ """
+ _temp = []
+ for entry in supp_assets:
+ title = _clean(entry.get("title"))
+ filename = entry.get("filename")
+ download_urls = entry.get("download_urls")
+ external_url = entry.get("external_url")
+ asset_type = entry.get("asset_type").lower()
+ if asset_type == "file":
+ if download_urls and isinstance(download_urls, dict):
+ extension = filename.rsplit(
+ ".", 1)[-1] if "." in filename else ""
+ # NOTE(review): [0] raises IndexError when the "File" key is absent or empty.
+ download_url = download_urls.get("File", [])[0].get("file")
+ _temp.append({
+ "type": "file",
+ "title": title,
+ "filename": filename,
+ "extension": extension,
+ "download_url": download_url,
+ })
+ elif asset_type == "sourcecode":
+ if download_urls and isinstance(download_urls, dict):
+ extension = filename.rsplit(
+ ".", 1)[-1] if "." in filename else ""
+ # NOTE(review): same IndexError risk for a missing "SourceCode" key.
+ download_url = download_urls.get("SourceCode",
+ [])[0].get("file")
+ _temp.append({
+ "type": "source_code",
+ "title": title,
+ "filename": filename,
+ "extension": extension,
+ "download_url": download_url,
+ })
+ elif asset_type == "externallink":
+ # External links carry the link itself as download_url, with a fixed "txt" extension.
+ _temp.append({
+ "type": "external_link",
+ "title": title,
+ "filename": filename,
+ "extension": "txt",
+ "download_url": external_url,
+ })
+ return _temp
+
+ def _extract_ppt(self, assets):
+ """Return a list with one "presentation" download descriptor for ``assets``.
+
+ The list is empty when ``assets`` has no dict under "download_urls".
+ """
+ _temp = []
+ download_urls = assets.get("download_urls")
+ filename = assets.get("filename")
+ if download_urls and isinstance(download_urls, dict):
+ # Extension is the text after the last "." in filename, or "" if there is none.
+ extension = filename.rsplit(".", 1)[-1] if "." in filename else ""
+ # NOTE(review): [0] raises IndexError when "Presentation" is absent or empty.
+ download_url = download_urls.get("Presentation", [])[0].get("file")
+ _temp.append({
+ "type": "presentation",
+ "filename": filename,
+ "extension": extension,
+ "download_url": download_url,
+ })
+ return _temp
+
+ def _extract_file(self, assets):
+ """Return a list with one "file" download descriptor for ``assets``.
+
+ The list is empty when ``assets`` has no dict under "download_urls".
+ """
+ _temp = []
+ download_urls = assets.get("download_urls")
+ filename = assets.get("filename")
+ if download_urls and isinstance(download_urls, dict):
+ # Extension is the text after the last "." in filename, or "" if there is none.
+ extension = filename.rsplit(".", 1)[-1] if "." in filename else ""
+ # NOTE(review): [0] raises IndexError when "File" is absent or empty.
+ download_url = download_urls.get("File", [])[0].get("file")
+ _temp.append({
+ "type": "file",
+ "filename": filename,
+ "extension": extension,
+ "download_url": download_url,
+ })
+ return _temp
+
+ def _extract_ebook(self, assets):
+ """Return a list with one "ebook" download descriptor for ``assets``.
+
+ The list is empty when ``assets`` has no dict under "download_urls".
+ """
+ _temp = []
+ download_urls = assets.get("download_urls")
+ filename = assets.get("filename")
+ if download_urls and isinstance(download_urls, dict):
+ # Extension is the text after the last "." in filename, or "" if there is none.
+ extension = filename.rsplit(".", 1)[-1] if "." in filename else ""
+ # NOTE(review): [0] raises IndexError when "E-Book" is absent or empty.
+ download_url = download_urls.get("E-Book", [])[0].get("file")
+ _temp.append({
+ "type": "ebook",
+ "filename": filename,
+ "extension": extension,
+ "download_url": download_url,
+ })
+ return _temp
+
+ def _extract_audio(self, assets):
+ """Return a list with one "audio" download descriptor for ``assets``.
+
+ The list is empty when ``assets`` has no dict under "download_urls".
+ """
+ _temp = []
+ download_urls = assets.get("download_urls")
+ filename = assets.get("filename")
+ if download_urls and isinstance(download_urls, dict):
+ # Extension is the text after the last "." in filename, or "" if there is none.
+ extension = filename.rsplit(".", 1)[-1] if "." in filename else ""
+ # NOTE(review): [0] raises IndexError when "Audio" is absent or empty.
+ download_url = download_urls.get("Audio", [])[0].get("file")
+ _temp.append({
+ "type": "audio",
+ "filename": filename,
+ "extension": extension,
+ "download_url": download_url,
+ })
+ return _temp
+
+ def _extract_sources(self, sources, skip_hls):
+ _temp = []
+ if sources and isinstance(sources, list):
+ for source in sources:
+ label = source.get("label")
+ download_url = source.get("file")
+ if not download_url:
+ continue
+ if label.lower() == "audio":
+ continue
+ height = label if label else None
+ if height == "2160":
+ width = "3840"
+ elif height == "1440":
+ width = "2560"
+ elif height == "1080":
+ width = "1920"
+ elif height == "720":
+ width = "1280"
+ elif height == "480":
+ width = "854"
+ elif height == "360":
+ width = "640"
+ elif height == "240":
+ width = "426"
+ else:
+ width = "256"
+ if (source.get("type") == "application/x-mpegURL"
+ or "m3u8" in download_url):
+ if not skip_hls:
+ out = self._extract_m3u8(download_url)
+ if out:
+ _temp.extend(out)
+ else:
+ _type = source.get("type")
+ _temp.append({
+ "type": "video",
+ "height": height,
+ "width": width,
+ "extension": _type.replace("video/", ""),
+ "download_url": download_url,
+ })
+ return _temp
+
+ def _extract_media_sources(self, sources):
+ """Collect DASH video and audio entries from ``sources``.
+
+ Only sources whose "type" is "application/dash+xml" are processed; their
+ "src" URL is handed to ``self._extract_mpd``. Returns ``(video, audio)``
+ as two lists.
+ """
+ _audio = []
+ _video = []
+ if sources and isinstance(sources, list):
+ for source in sources:
+ _type = source.get("type")
+ src = source.get("src")
+
+ if _type == "application/dash+xml":
+ video, audio = self._extract_mpd(src)
+ # A manifest contributes only when it produced both video and audio entries.
+ if video and audio:
+ _video.extend(video)
+ _audio.extend(audio)
+ return (_video, _audio)
+
+ def _extract_subtitles(self, tracks):
+ """Build subtitle download descriptors from ``tracks``.
+
+ Tracks that are not dicts, whose "_class" is not "caption", or that
+ have no string "url" are skipped. Returns a list of dicts with the keys
+ type, language, extension and download_url.
+ """
+ _temp = []
+ if tracks and isinstance(tracks, list):
+ for track in tracks:
+ if not isinstance(track, dict):
+ continue
+ if track.get("_class") != "caption":
+ continue
+ download_url = track.get("url")
+ if not download_url or not isinstance(download_url, str):
+ continue
+ # Language falls back through language -> srclang -> label -> locale_id prefix.
+ # NOTE(review): track["locale_id"] raises KeyError if all four are missing.
+ lang = (track.get("language") or track.get("srclang")
+ or track.get("label")
+ or track["locale_id"].split("_")[0])
+ # "vtt" when the text after the URL's last "." contains "vtt", otherwise "srt".
+ ext = "vtt" if "vtt" in download_url.rsplit(".",
+ 1)[-1] else "srt"
+ _temp.append({
+ "type": "subtitle",
+ "language": lang,
+ "extension": ext,
+ "download_url": download_url,
+ })
+ return _temp
+
+ def _extract_m3u8(self, url):
+ """Fetch the HLS playlist at ``url`` and return one entry per distinct height.
+
+ Variant playlists lacking a resolution or codecs value are skipped. Any
+ exception is printed rather than raised, and whatever was collected up
+ to that point is returned.
+ """
+ _temp = []
+ try:
+ resp = self.session._get(url)
+ resp.raise_for_status()
+ raw_data = resp.text
+ m3u8_object = m3u8.loads(raw_data)
+ playlists = m3u8_object.playlists
+ # Heights already emitted; only the first variant of each height is kept.
+ seen = set()
+ for pl in playlists:
+ resolution = pl.stream_info.resolution
+ codecs = pl.stream_info.codecs
+ if not resolution:
+ continue
+ if not codecs:
+ continue
+ width, height = resolution
+ download_url = pl.uri
+ if height not in seen:
+ seen.add(height)
+ _temp.append({
+ "type": "hls",
+ "height": height,
+ "width": width,
+ "extension": "mp4",
+ "download_url": download_url,
+ })
+ except Exception as error:
+ print(f"Udemy Says : '{error}' while fetching hls streams..")
+ return _temp
+
+ def _extract_mpd(self, url):
+ """Fetch the MPEG-DASH manifest at ``url`` and describe its streams.
+
+ Returns ``(video, audio)``: two lists of dicts built from the segment
+ templates of the "video/mp4" and "audio/mp4" adaptation sets. Video
+ entries are de-duplicated by height; audio entries are not. Any
+ exception is printed rather than raised, and whatever was collected up
+ to that point is returned.
+ """
+ _video = []
+ _audio = []
+ try:
+ resp = self.session._get(url)
+ resp.raise_for_status()
+ raw_data = resp.text
+ mpd_object = MPEGDASHParser.parse(raw_data)
+ # Video heights already emitted.
+ seen = set()
+ for period in mpd_object.periods:
+ for adapt_set in period.adaptation_sets:
+ content_type = adapt_set.mime_type
+ if content_type == "video/mp4":
+ for rep in adapt_set.representations:
+ for segment in rep.segment_templates:
+ # segment_count = 1 + number of S entries + sum of their repeat counts (s.r).
+ # Only the first segment timeline is read.
+ segment_count = 1
+ timeline = segment.segment_timelines[0]
+ segment_count += len(timeline.Ss)
+ for s in timeline.Ss:
+ if s.r:
+ segment_count += s.r
+
+ segment_extension = segment.media.split(
+ ".")[-1]
+ height = rep.height
+ width = rep.width
+
+ if height not in seen:
+ seen.add(height)
+ _video.append({
+ "type":
+ "dash",
+ "content_type":
+ "video",
+ "height":
+ height,
+ "width":
+ width,
+ "extension":
+ segment_extension,
+ "segment_count":
+ segment_count,
+ "media":
+ segment.media,
+ "initialization":
+ segment.initialization
+ })
+ elif content_type == "audio/mp4":
+ for rep in adapt_set.representations:
+ for segment in rep.segment_templates:
+ # Same segment counting as in the video branch.
+ segment_count = 1
+ timeline = segment.segment_timelines[0]
+ segment_count += len(timeline.Ss)
+ for s in timeline.Ss:
+ if s.r:
+ segment_count += s.r
+
+ segment_extension = segment.media.split(
+ ".")[-1]
+
+ _audio.append({
+ "type":
+ "dash",
+ "content_type":
+ "audio",
+ "extension":
+ segment_extension,
+ "segment_count":
+ segment_count,
+ "media":
+ segment.media,
+ "initialization":
+ segment.initialization
+ })
+ except Exception as error:
+ print(f"Udemy Says : '{error}' while fetching mpd manifest")
+ return (_video, _audio)
+
+ def extract_course_name(self, url):
+ """
+ @author r0oth3x49
+ """
+ obj = re.search(
+ r"(?i)(?://(?P.+?).udemy.com/(?:course(/draft)*/)?(?P[a-zA-Z0-9_-]+))",
+ url,
+ )
+ if obj:
+ return obj.group("portal_name"), obj.group("name_or_id")
+
+ def _subscribed_courses(self, portal_name, course_name):
+ """Search the user's subscribed courses on ``portal_name`` for ``course_name``.
+
+ Sets the session's Host and Referer headers for the portal, requests
+ COURSE_SEARCH and returns the "results" list of the JSON response.
+ On any error the message is printed and the process exits.
+ """
+ results = []
+ self.session._headers.update({
+ "Host":
+ "{portal_name}.udemy.com".format(portal_name=portal_name),
+ "Referer":
+ "https://{portal_name}.udemy.com/home/my-courses/search/?q={course_name}"
+ .format(portal_name=portal_name, course_name=course_name),
+ })
+ url = COURSE_SEARCH.format(portal_name=portal_name,
+ course_name=course_name)
+ try:
+ webpage = self.session._get(url).json()
+ except conn_error as error:
+ print(f"Udemy Says: Connection error, {error}")
+ time.sleep(0.8)
+ # NOTE(review): exits with status 0 even though this is a failure path.
+ sys.exit(0)
+ except (ValueError, Exception) as error:
+ # ValueError is already covered by Exception; this clause catches every remaining error.
+ print(f"Udemy Says: {error} on {url}")
+ time.sleep(0.8)
+ sys.exit(0)
+ else:
+ results = webpage.get("results", [])
+ return results
+
+ def _extract_course_json(self, url, course_id, portal_name):
+ """Fetch the curriculum JSON for ``course_id`` on ``portal_name``.
+
+ ``url`` is used only as the Referer header; the request goes to
+ COURSE_URL. The paged extractor ``_extract_large_course_content`` is used
+ instead when the server answers 502/503 or when any non-connection
+ exception is raised. A connection error is printed and the process exits.
+ """
+ self.session._headers.update({"Referer": url})
+ # From here on `url` is the API endpoint, not the page URL passed in.
+ url = COURSE_URL.format(portal_name=portal_name, course_id=course_id)
+ try:
+ resp = self.session._get(url)
+ if resp.status_code in [502, 503]:
+ print(
+ "> The course content is large, using large content extractor..."
+ )
+ resp = self._extract_large_course_content(url=url)
+ else:
+ resp = resp.json()
+ except conn_error as error:
+ print(f"Udemy Says: Connection error, {error}")
+ time.sleep(0.8)
+ # NOTE(review): exits with status 0 even though this is a failure path.
+ sys.exit(0)
+ except (ValueError, Exception):
+ # ValueError is already covered by Exception; any other failure falls back to paged fetching.
+ resp = self._extract_large_course_content(url=url)
+ return resp
+ else:
+ return resp
+
+ def _extract_large_course_content(self, url):
+ """Fetch course content page by page and merge the pages.
+
+ Requests ``url`` (with "10000" rewritten to "50" when the URL ends in
+ "10000"), then follows each response's "next" link, appending every
+ page's "results" to the first page's "results". Returns the merged first
+ response. A connection error is printed and the process exits.
+ """
+ # NOTE(review): str.replace rewrites every "10000" in the URL, not only the trailing value.
+ url = url.replace("10000", "50") if url.endswith("10000") else url
+ try:
+ data = self.session._get(url).json()
+ except conn_error as error:
+ print(f"Udemy Says: Connection error, {error}")
+ time.sleep(0.8)
+ sys.exit(0)
+ else:
+ _next = data.get("next")
+ while _next:
+ print("Downloading course information.. ")
+ try:
+ resp = self.session._get(_next).json()
+ except conn_error as error:
+ print(f"Udemy Says: Connection error, {error}")
+ time.sleep(0.8)
+ sys.exit(0)
+ else:
+ _next = resp.get("next")
+ results = resp.get("results")
+ if results and isinstance(results, list):
+ for d in resp["results"]:
+ data["results"].append(d)
+ return data
+
+ def __extract_course(self, response, course_name):
+ """Return the first entry of ``response`` that matches ``course_name``.
+
+ An entry matches when ``course_name`` equals its "published_title" or
+ the string form of its "id". Returns an empty dict when nothing matches
+ or ``response`` is empty.
+ """
+ # The leading double underscore makes Python name-mangle this method inside the class.
+ _temp = {}
+ if response:
+ for entry in response:
+ course_id = str(entry.get("id"))
+ published_title = entry.get("published_title")
+ if course_name in (published_title, course_id):
+ _temp = entry
+ break
+ return _temp
+
+ def _my_courses(self, portal_name):
+ """Return the "results" list of the user's subscribed courses on ``portal_name``.
+
+ On any error the message is printed and the process exits.
+ """
+ # NOTE(review): an identical definition of this method appears again further
+ # down in the class; the later one is the one that ends up bound.
+ results = []
+ try:
+ url = MY_COURSES_URL.format(portal_name=portal_name)
+ webpage = self.session._get(url).json()
+ except conn_error as error:
+ print(f"Udemy Says: Connection error, {error}")
+ time.sleep(0.8)
+ # NOTE(review): exits with status 0 even though this is a failure path.
+ sys.exit(0)
+ except (ValueError, Exception) as error:
+ print(f"Udemy Says: {error}")
+ time.sleep(0.8)
+ sys.exit(0)
+ else:
+ results = webpage.get("results", [])
+ return results
+
+ def _subscribed_collection_courses(self, portal_name):
+ """Return the courses from all of the user's course collections on ``portal_name``.
+
+ Requests COLLECTION_URL and concatenates the "courses" list of every
+ collection in "results". On any error the message is printed and the
+ process exits.
+ """
+ # NOTE(review): an identical definition of this method appears again further
+ # down in the class; the later one is the one that ends up bound.
+ url = COLLECTION_URL.format(portal_name=portal_name)
+ courses_lists = []
+ try:
+ webpage = self.session._get(url).json()
+ except conn_error as error:
+ print(f"Udemy Says: Connection error, {error}")
+ time.sleep(0.8)
+ sys.exit(0)
+ except (ValueError, Exception) as error:
+ print(f"Udemy Says: {error}")
+ time.sleep(0.8)
+ sys.exit(0)
+ else:
+ results = webpage.get("results", [])
+ if results:
+ # The list comprehension is used only for its extend() side effect; its value is discarded.
+ [
+ courses_lists.extend(courses.get("courses", []))
+ for courses in results if courses.get("courses", [])
+ ]
+ return courses_lists
+
+ def _archived_courses(self, portal_name):
+ results = []
+ try:
+ url = MY_COURSES_URL.format(portal_name=portal_name)
+ url = f"{url}&is_archived=true"
+ webpage = self.session._get(url).json()
+ except conn_error as error:
+ print(f"Udemy Says: Connection error, {error}")
+ time.sleep(0.8)
+ sys.exit(0)
+ except (ValueError, Exception) as error:
+ print(f"Udemy Says: {error}")
+ time.sleep(0.8)
+ sys.exit(0)
+ else:
+ results = webpage.get("results", [])
+ return results
+
+ def _my_courses(self, portal_name):
+ results = []
+ try:
+ url = MY_COURSES_URL.format(portal_name=portal_name)
+ webpage = self.session._get(url).json()
+ except conn_error as error:
+ print(f"Udemy Says: Connection error, {error}")
+ time.sleep(0.8)
+ sys.exit(0)
+ except (ValueError, Exception) as error:
+ print(f"Udemy Says: {error}")
+ time.sleep(0.8)
+ sys.exit(0)
+ else:
+ results = webpage.get("results", [])
+ return results
+
+ def _subscribed_collection_courses(self, portal_name):
+ url = COLLECTION_URL.format(portal_name=portal_name)
+ courses_lists = []
+ try:
+ webpage = self.session._get(url).json()
+ except conn_error as error:
+ print(f"Udemy Says: Connection error, {error}")
+ time.sleep(0.8)
+ sys.exit(0)
+ except (ValueError, Exception) as error:
+ print(f"Udemy Says: {error}")
+ time.sleep(0.8)
+ sys.exit(0)
+ else:
+ results = webpage.get("results", [])
+ if results:
+ [
+ courses_lists.extend(courses.get("courses", []))
+ for courses in results if courses.get("courses", [])
+ ]
+ return courses_lists
+
+ def _archived_courses(self, portal_name):
+ results = []
+ try:
+ url = MY_COURSES_URL.format(portal_name=portal_name)
+ url = f"{url}&is_archived=true"
+ webpage = self.session._get(url).json()
+ except conn_error as error:
+ print(f"Udemy Says: Connection error, {error}")
+ time.sleep(0.8)
+ sys.exit(0)
+ except (ValueError, Exception) as error:
+ print(f"Udemy Says: {error}")
+ time.sleep(0.8)
+ sys.exit(0)
+ else:
+ results = webpage.get("results", [])
+ return results
+
+ def _extract_course_info(self, url):
+     """Locate the course referenced by ``url`` among the account listings.
+
+     The listings are tried in order - subscribed courses, my courses,
+     subscribed collections, archived courses - and the search stops at
+     the first one that contains the course. On success the entry gets a
+     ``portal_name`` key and ``(course id, course entry)`` is returned.
+     Otherwise the session is terminated and the program exits.
+     """
+     portal_name, course_name = self.extract_course_name(url)
+     # Listings are fetched lazily, so a later one is only requested when
+     # the earlier ones did not contain the course.
+     listings = (
+         lambda: self._subscribed_courses(portal_name=portal_name,
+                                          course_name=course_name),
+         lambda: self._my_courses(portal_name=portal_name),
+         lambda: self._subscribed_collection_courses(
+             portal_name=portal_name),
+         lambda: self._archived_courses(portal_name=portal_name),
+     )
+     course = {}
+     for fetch_listing in listings:
+         course = self.__extract_course(response=fetch_listing(),
+                                        course_name=course_name)
+         if course:
+             break
+
+     if course:
+         course.update({"portal_name": portal_name})
+         return course.get("id"), course
+
+     print("Downloading course information, course id not found .. ")
+     print(
+         "It seems either you are not enrolled or you have to visit the course atleast once while you are logged in.",
+     )
+     print("Trying to logout now...", )
+     self.session.terminate()
+     print("Logged out successfully.", )
+     sys.exit(0)
+
+
+class Session(object):
+    """Thin wrapper around a ``requests`` session with one header set."""
+
+    def __init__(self):
+        # NOTE(review): this binds the HEADERS object itself, not a copy, so
+        # _set_auth_headers() also mutates HEADERS - confirm this is intended.
+        self._headers = HEADERS
+        self._session = requests.sessions.Session()
+
+    def _set_auth_headers(self, access_token="", client_id=""):
+        """Set both bearer headers from ``access_token``.
+
+        ``client_id`` is accepted but not used.
+        """
+        bearer = "Bearer {}".format(access_token)
+        self._headers["Authorization"] = bearer
+        self._headers["X-Udemy-Authorization"] = bearer
+
+    def _get(self, url):
+        """GET ``url``; return the response if it is OK or a 502/503.
+
+        Any other non-OK response raises ``Exception("<status> <reason>")``.
+        """
+        response = self._session.get(url, headers=self._headers)
+        if response.ok or response.status_code in [502, 503]:
+            return response
+        raise Exception(f"{response.status_code} {response.reason}")
+
+    def _post(self, url, data, redirect=True):
+        """POST ``data`` to ``url``; return the response only when it is OK.
+
+        A non-OK response raises ``Exception("<status> <reason>")``.
+        """
+        response = self._session.post(url,
+                                      data,
+                                      headers=self._headers,
+                                      allow_redirects=redirect)
+        if not response.ok:
+            raise Exception(f"{response.status_code} {response.reason}")
+        return response
+
+    def terminate(self):
+        """Reset both bearer headers to an empty token."""
+        self._set_auth_headers()
+
+
+# Thanks to a great open source utility youtube-dl ..
+class HTMLAttributeParser(compat_HTMLParser):  # pylint: disable=W
+    """Minimal HTML parser that records the attributes of a start tag.
+
+    ``attrs`` holds the attributes of the most recently seen start tag as a
+    dict, so feeding a single element leaves that element's attributes.
+    """
+
+    def __init__(self):
+        self.attrs = {}
+        super().__init__()
+
+    def handle_starttag(self, tag, attrs):
+        self.attrs = dict(attrs)
+
+
+def extract_attributes(html_element):
+    """Parse one HTML element string and return its attributes as a dict.
+
+    The string is fed to an HTMLAttributeParser and whatever attributes it
+    collected are returned. Parser errors are deliberately ignored (best
+    effort), so malformed input yields whatever was gathered before the
+    failure, possibly an empty dict.
+    """
+    collector = HTMLAttributeParser()
+    try:
+        collector.feed(html_element)
+        collector.close()
+    except Exception:  # pylint: disable=W
+        pass
+    return collector.attrs
+
+
+def hidden_inputs(html):
+    """Collect the hidden and submit ``<input>`` fields of ``html``.
+
+    Returns a dict mapping each field's ``name`` (or ``id`` when it has no
+    name) to its ``value``. Inputs of any other type, without a name/id, or
+    without a ``value`` attribute are skipped.
+    """
+    # Drop HTML comments first so commented-out inputs are not picked up.
+    html = re.sub(r"<!--(?:(?!<!--).)*-->", "", html)
+    hidden_inputs = {}  # pylint: disable=W
+    for entry in re.findall(r"(?i)(<input[^>]+>)", html):
+        attrs = extract_attributes(entry)
+        if attrs.get("type") not in ("hidden", "submit"):
+            continue
+        name = attrs.get("name") or attrs.get("id")
+        value = attrs.get("value")
+        if name and value is not None:
+            hidden_inputs[name] = value
+    return hidden_inputs
+
+
+# Sentinel meaning "no default supplied". It must be one shared object: a
+# fresh object() created at comparison time is never identical to the one
+# bound as the parameter default.
+_NO_DEFAULT = object()
+
+
+def search_regex(pattern,
+                 string,
+                 name,
+                 default=_NO_DEFAULT,
+                 fatal=True,
+                 flags=0,
+                 group=None):
+    """
+    Perform a regex search on the given string, using a single or a list of
+    patterns, and return the first matching group.
+
+    pattern: one pattern string, or an iterable of pattern strings that are
+        tried in order until one matches.
+    name: field name used in the failure message.
+    default: returned on failure when supplied (``None`` is a valid default).
+    fatal: on failure without a default, print a message and exit when true;
+        otherwise print the message and return ``None``.
+    group: group to return from the match; by default the first group that
+        is not ``None``.
+    """
+    patterns = [pattern] if isinstance(pattern, str) else pattern
+    mobj = None  # stays None when nothing matches or the list is empty
+    for p in patterns:
+        mobj = re.search(p, string, flags)
+        if mobj:
+            break
+
+    if mobj:
+        if group is None:
+            # return the first matching group
+            return next(g for g in mobj.groups() if g is not None)
+        return mobj.group(group)
+    if default is not _NO_DEFAULT:
+        return default
+    if fatal:
+        print("[-] Unable to extract %s" % name)
+        exit(0)
+    print("[-] unable to extract %s" % name)
+    return None
+
+
+class UdemyAuth(object):
+    """Obtains an authenticated Session from credentials or an access token."""
+
+    def __init__(self, username="", password="", cache_session=False):
+        self.username = username
+        self.password = password
+        # Stored only; nothing in this class reads it (caching is not
+        # implemented here).
+        self._cache = cache_session
+        self._session = Session()
+        self._cloudsc = cloudscraper.create_scraper()
+
+    def _form_hidden_input(self, form_id):
+        """Fetch the login page and build the POST payload for ``form_id``.
+
+        Returns the hidden/submit inputs of the form whose ``id`` attribute
+        equals ``form_id``, with ``email`` and ``password`` added. Request
+        errors from the scraper or ``raise_for_status`` propagate.
+        """
+        resp = self._cloudsc.get(LOGIN_URL)
+        resp.raise_for_status()
+        webpage = resp.text
+        login_form = hidden_inputs(
+            search_regex(
+                r'(?is)<form[^>]+?id=(["\'])%s\1[^>]*>(?P<form>.+?)</form>'
+                % form_id,
+                webpage,
+                "%s form" % form_id,
+                group="form",
+            ))
+        login_form.update({
+            "email": self.username,
+            "password": self.password
+        })
+        return login_form
+
+    def authenticate(self, access_token="", client_id=""):
+        """Return ``(session, access_token)``, or ``(None, None)`` on failure.
+
+        When neither ``access_token`` nor ``client_id`` is given, the login
+        form is posted with the stored credentials and both values are read
+        from the response cookies. With a token in hand, the session's auth
+        headers and ``access_token`` cookie are set.
+        """
+        if not access_token and not client_id:
+            data = self._form_hidden_input(form_id="login-form")
+            self._cloudsc.headers.update({"Referer": LOGIN_URL})
+            auth_response = self._cloudsc.post(LOGIN_URL,
+                                               data=data,
+                                               allow_redirects=False)
+            auth_cookies = auth_response.cookies
+
+            access_token = auth_cookies.get("access_token", "")
+            client_id = auth_cookies.get("client_id", "")
+
+        if not access_token:
+            self._session._set_auth_headers()
+            return None, None
+
+        # TODO: persisting the token when self._cache is set is not
+        # implemented.
+        self._session._set_auth_headers(access_token=access_token,
+                                        client_id=client_id)
+        self._session._session.cookies.update(
+            {"access_token": access_token})
+        return self._session, access_token
+
if not os.path.exists(working_dir):
os.makedirs(working_dir)
@@ -35,12 +800,13 @@ if not os.path.exists(download_dir):
with open(keyfile_path, 'r') as keyfile:
keyfile = keyfile.read()
keyfile = json.loads(keyfile)
-"""
-@author Jayapraveen
-"""
def durationtoseconds(period):
+ """
+ @author Jayapraveen
+ """
+
#Duration format in PTxDxHxMxS
if (period[:2] == "PT"):
period = period[2:]
@@ -62,234 +828,192 @@ def durationtoseconds(period):
return None
-def download_media(filename, url, lecture_working_dir, epoch=0):
- if (os.path.isfile(filename)):
- print("Segment already downloaded.. skipping..")
- else:
- media = requests.get(url, stream=True)
- media_length = int(media.headers.get("content-length"))
- if media.status_code == 200:
- if (os.path.isfile(filename)
- and os.path.getsize(filename) >= media_length):
- print("Segment already downloaded.. skipping write to disk..")
- else:
- try:
- pbar = tqdm(total=media_length,
- initial=0,
- unit='B',
- unit_scale=True,
- desc=filename)
- with open(os.path.join(lecture_working_dir, filename),
- 'wb') as video_file:
- for chunk in media.iter_content(chunk_size=1024):
- if chunk:
- video_file.write(chunk)
- pbar.update(1024)
- pbar.close()
- print("Segment downloaded: " + filename)
- return False #Successfully downloaded the file
- except:
- print(
- "Connection error: Reattempting download of segment..")
- download_media(filename, url, lecture_working_dir,
- epoch + 1)
-
- if os.path.getsize(filename) >= media_length:
- pass
- else:
- print("Segment is faulty.. Redownloading...")
- download_media(filename, url, lecture_working_dir, epoch + 1)
- elif (media.status_code == 404):
- print("Probably end hit!\n", url)
- return True #Probably hit the last of the file
- else:
- if (epoch > retry):
- exit("Error fetching segment, exceeded retry times.")
- print("Error fetching segment file.. Redownloading...")
- download_media(filename, url, lecture_working_dir, epoch + 1)
-
-
-"""
-@author Jayapraveen
-"""
-
-
def cleanup(path):
+ """
+ Delete the ``*.mp4`` files directly inside ``path``, then remove ``path``.
+
+ A file that cannot be deleted is reported and skipped. Removing ``path``
+ raises OSError when the directory is still not empty afterwards.
+
+ @author Jayapraveen
+ """
leftover_files = glob.glob(path + '/*.mp4', recursive=True)
- mpd_files = glob.glob(path + '/*.mpd', recursive=True)
- leftover_files = leftover_files + mpd_files
for file_list in leftover_files:
try:
os.remove(file_list)
except OSError:
print(f"Error deleting file: {file_list}")
+ # os.removedirs() would also prune every parent directory that is left
+ # empty (the shared working directory included); remove only this one.
+ os.rmdir(path)
-"""
-@author Jayapraveen
-"""
-
-
-def mux_process(video_title, lecture_working_dir, outfile):
- time_stamp = datetime.datetime.now().isoformat()+'Z'
+def mux_process(video_title, lecture_working_dir, output_path):
+ """
+ Mux the decrypted audio and video of a lecture into ``output_path``.
+
+ Runs ffmpeg (through ``nice -n 7`` when not on Windows) on
+ ``decrypted_audio.mp4`` and ``decrypted_video.mp4`` inside
+ ``lecture_working_dir``, copying both streams and setting the title
+ metadata to ``video_title``. The exit status of ffmpeg is not checked.
+
+ @author Jayapraveen
+ """
+ # NOTE(review): the command is a shell string built from video_title and
+ # output_path without escaping; a double quote in either would break it -
+ # confirm titles are sanitized by the caller.
if os.name == "nt":
- command = f"ffmpeg -y -i \"{lecture_working_dir}\\decrypted_audio.mp4\" -i \"{lecture_working_dir}\\decrypted_video.mp4\" -acodec copy -vcodec copy -fflags +bitexact -map_metadata -1 -metadata title=\"{video_title}\" -metadata creation_time=\"{time_stamp}\" \"{outfile}\""
+ command = "ffmpeg -y -i \"{}\" -i \"{}\" -acodec copy -vcodec copy -fflags +bitexact -map_metadata -1 -metadata title=\"{}\" \"{}\"".format(
+ os.path.join(lecture_working_dir, "decrypted_audio.mp4"),
+ os.path.join(lecture_working_dir, "decrypted_video.mp4"),
+ video_title, output_path)
else:
- command = f"nice -n 7 ffmpeg -y -i \"{lecture_working_dir}//decrypted_audio.mp4\" -i \"{lecture_working_dir}//decrypted_video.mp4\" -acodec copy -vcodec copy -fflags +bitexact -map_metadata -1 -metadata title=\"{video_title}\" -metadata creation_time=\"{time_stamp}\" \"{outfile}\""
+ command = "nice -n 7 ffmpeg -y -i \"{}\" -i \"{}\" -acodec copy -vcodec copy -fflags +bitexact -map_metadata -1 -metadata title=\"{}\" \"{}\"".format(
+ os.path.join(lecture_working_dir, "decrypted_audio.mp4"),
+ os.path.join(lecture_working_dir, "decrypted_video.mp4"),
+ video_title, output_path)
os.system(command)
-"""
-@author Jayapraveen
-"""
-
-
def decrypt(kid, filename, lecture_working_dir):
+ """
+ Decrypt ``encrypted_<filename>.mp4`` into ``decrypted_<filename>.mp4``.
+
+ The key is looked up in ``keyfile`` under the lower-cased ``kid`` and
+ passed to mp4decrypt (through ``nice -n 7`` when not on Windows); both
+ files live in ``lecture_working_dir``. The exit status of mp4decrypt is
+ not checked.
+
+ Raises KeyError("Key not found") when a KeyError occurs, e.g. when
+ ``kid`` has no entry in ``keyfile``.
+
+ @author Jayapraveen
+ """
+ print("> Decrypting, this might take a minute...")
try:
key = keyfile[kid.lower()]
- except KeyError as error:
- exit("Key not found")
- if (os.name == "nt"):
- os.system(
- f"mp4decrypt --key 1:{key} \"{lecture_working_dir}\\encrypted_{filename}.mp4\" \"{lecture_working_dir}\\decrypted_{filename}.mp4\""
- )
- else:
- os.system(
- f"nice -n 7 mp4decrypt --key 1:{key} \"{lecture_working_dir}//encrypted_{filename}.mp4\" \"{lecture_working_dir}//decrypted_{filename}.mp4\""
- )
+ if (os.name == "nt"):
+ # The f prefix is inert here; the values are filled in by the % operator.
+ os.system(f"mp4decrypt --key 1:%s \"%s\" \"%s\"" %
+ (key,
+ os.path.join(lecture_working_dir,
+ "encrypted_{}.mp4".format(filename)),
+ os.path.join(lecture_working_dir,
+ "decrypted_{}.mp4".format(filename))))
+ else:
+ os.system(f"nice -n 7 mp4decrypt --key 1:%s \"%s\" \"%s\"" %
+ (key,
+ os.path.join(lecture_working_dir,
+ "encrypted_{}.mp4".format(filename)),
+ os.path.join(lecture_working_dir,
+ "decrypted_{}.mp4".format(filename))))
+ print("> Decryption complete")
+ except KeyError:
+ raise KeyError("Key not found")
-"""
-@author Jayapraveen
-"""
+def handle_segments(video_source, audio_source, video_title,
+ lecture_working_dir, output_path):
+ """
+ Download, join, decrypt and mux the segments of one DRM lecture.
+
+ An aria2c input list is written with the initialization URL of each
+ source as segment 0 and the ``$Number$``-expanded media URL for segments
+ 1 .. segment_count - 1. After the download the KIDs are read from
+ ``video_0.mp4`` / ``audio_0.mp4``, the segments are concatenated into
+ ``encrypted_video.mp4`` / ``encrypted_audio.mp4``, and the result is
+ decrypted, muxed into ``output_path`` and the working directory cleaned.
+ Errors from decrypting, muxing or cleaning up are printed, not raised.
+
+ @author Jayapraveen
+ """
+ no_vid_segments = video_source.get("segment_count")
+ no_aud_segments = audio_source.get("segment_count")
+ audio_media = audio_source.get("media")
+ audio_init = audio_source.get("initialization")
+ audio_extension = audio_source.get("extension")
-def handle_irregular_segments(media_info, video_title, lecture_working_dir,
- output_path):
- no_segment, video_url, video_init, video_extension, no_segment, audio_url, audio_init, audio_extension = media_info
- download_media("video_0.seg.mp4", video_init, lecture_working_dir)
- video_kid = extract_kid(os.path.join(lecture_working_dir, "video_0.seg.mp4"))
+ video_media = video_source.get("media")
+ video_init = video_source.get("initialization")
+ video_extension = video_source.get("extension")
+
+ audio_urls = audio_init + "\n dir={}\n out=audio_0.mp4\n".format(
+ lecture_working_dir)
+ video_urls = video_init + "\n dir={}\n out=video_0.mp4\n".format(
+ lecture_working_dir)
+
+ list_path = os.path.join(lecture_working_dir, "list.txt")
+
+ for i in range(1, no_aud_segments):
+ audio_urls += audio_media.replace(
+ "$Number$", str(i)) + "\n dir={}\n out=audio_{}.mp4\n".format(
+ lecture_working_dir, i)
+ for i in range(1, no_vid_segments):
+ video_urls += video_media.replace(
+ "$Number$", str(i)) + "\n dir={}\n out=video_{}.mp4\n".format(
+ lecture_working_dir, i)
+
+ with open(list_path, 'w') as f:
+ f.write("{}\n{}".format(audio_urls, video_urls))
+ f.close()
+
+ print("> Downloading Lecture Segments...")
+ ret_code = subprocess.Popen([
+ "aria2c", "-i", list_path, "-j16", "-s20", "-x16", "-c",
+ "--auto-file-renaming=false", "--summary-interval=0"
+ ]).wait()
+ print("> Lecture Segments Downloaded")
+
+ print("Return code: " + str(ret_code))
+
+ os.remove(list_path)
+
+ video_kid = extract_kid(os.path.join(lecture_working_dir, "video_0.mp4"))
print("KID for video file is: " + video_kid)
- download_media("audio_0.seg.mp4", audio_init, lecture_working_dir)
- audio_kid = extract_kid(os.path.join(lecture_working_dir, "audio_0.seg.mp4"))
+
+ audio_kid = extract_kid(os.path.join(lecture_working_dir, "audio_0.mp4"))
print("KID for audio file is: " + audio_kid)
- for count in range(1, no_segment):
- video_segment_url = video_url.replace("$Number$", str(count))
- audio_segment_url = audio_url.replace("$Number$", str(count))
- video_status = download_media(
- f"video_{str(count)}.seg.{video_extension}", video_segment_url,
- lecture_working_dir)
- audio_status = download_media(
- f"audio_{str(count)}.seg.{audio_extension}", audio_segment_url,
- lecture_working_dir)
- os.chdir(lecture_working_dir)
- if (video_status):
- if os.name == "nt":
- video_concat_command = "copy /b " + "+".join([
- f"video_{i}.seg.{video_extension}"
- for i in range(0, count)
- ]) + " encrypted_video.mp4"
- audio_concat_command = "copy /b " + "+".join([
- f"audio_{i}.seg.{audio_extension}"
- for i in range(0, count)
- ]) + " encrypted_audio.mp4"
- else:
- video_concat_command = "cat " + " ".join([
- f"video_{i}.seg.{video_extension}"
- for i in range(0, count)
- ]) + " > encrypted_video.mp4"
- audio_concat_command = "cat " + " ".join([
- f"audio_{i}.seg.{audio_extension}"
- for i in range(0, count)
- ]) + " > encrypted_audio.mp4"
- os.system(video_concat_command)
- os.system(audio_concat_command)
- decrypt(video_kid, "video", lecture_working_dir)
- decrypt(audio_kid, "audio", lecture_working_dir)
- os.chdir(home_dir)
- mux_process(video_title, lecture_working_dir, output_path)
- break
+
+ os.chdir(lecture_working_dir)
+
+ # NOTE(review): segments are saved as *_<i>.mp4 above but joined as
+ # *_<i>.<extension> below - confirm "extension" is always "mp4".
+ if os.name == "nt":
+ video_concat_command = "copy /b " + "+".join([
+ f"video_{i}.{video_extension}" for i in range(0, no_vid_segments)
+ ]) + " encrypted_video.mp4"
+ audio_concat_command = "copy /b " + "+".join([
+ f"audio_{i}.{audio_extension}" for i in range(0, no_aud_segments)
+ ]) + " encrypted_audio.mp4"
+ else:
+ # Each stream is joined with its own segment count, as on Windows.
+ video_concat_command = "cat " + " ".join([
+ f"video_{i}.{video_extension}" for i in range(0, no_vid_segments)
+ ]) + " > encrypted_video.mp4"
+ audio_concat_command = "cat " + " ".join([
+ f"audio_{i}.{audio_extension}" for i in range(0, no_aud_segments)
+ ]) + " > encrypted_audio.mp4"
+ os.system(video_concat_command)
+ os.system(audio_concat_command)
+ os.chdir(home_dir)
+ try:
+ decrypt(video_kid, "video", lecture_working_dir)
+ decrypt(audio_kid, "audio", lecture_working_dir)
os.chdir(home_dir)
mux_process(video_title, lecture_working_dir, output_path)
+ cleanup(lecture_working_dir)
+ except Exception as e:
+ print(f"Error: ", e)
-"""
-@author Jayapraveen
-"""
+def check_for_aria():
+    """Report whether ``aria2c`` can be launched.
+
+    Runs ``aria2c -v`` and waits for it. Returns False only when the
+    executable is not found; any other exception is reported and treated
+    as "available".
+    """
+    probe = ["aria2c", "-v"]
+    try:
+        process = subprocess.Popen(probe,
+                                   stdout=subprocess.DEVNULL,
+                                   stdin=subprocess.DEVNULL)
+        process.wait()
+    except FileNotFoundError:
+        return False
+    except Exception as e:
+        print(
+            "> Unexpected exception while checking for Aria2c, please tell the program author about this! ",
+            e)
+    return True
-def manifest_parser(mpd_url):
- video = []
- audio = []
- manifest = requests.get(mpd_url).text
- with open("manifest.mpd", 'w') as manifest_handler:
- manifest_handler.write(manifest)
- mpd = MPEGDASHParser.parse("./manifest.mpd")
- running_time = durationtoseconds(mpd.media_presentation_duration)
- for period in mpd.periods:
- for adapt_set in period.adaptation_sets:
- print("Processing " + adapt_set.mime_type)
- content_type = adapt_set.mime_type
- if quality and content_type == "video/mp4":
- print(adapt_set.representations[0].height, quality)
- repr = next((x for x in adapt_set.representations
- if x.height == quality), None)
- if not repr:
- qualities = []
- for rep in adapt_set.representations:
- qualities.append(rep.height)
- print(quality, qualities)
- if quality < qualities[0]:
- # they want a lower quality than whats available
- repr = adapt_set.representations[0] # Lowest Quality
- elif quality > qualities[-1]:
- # they want a higher quality than whats available
- repr = adapt_set.representations[-1] # Max Quality
- print(
- "> Could not find video with requested quality, falling back to closest!"
- )
- print("> Using quality of %s" % repr.height)
- else:
- print("> Found MPD representation with quality %s" %
- repr.height)
- else:
- repr = adapt_set.representations[-1] # Max Quality
- print("> Using max quality of %s" % repr.height)
- for segment in repr.segment_templates:
- if (segment.duration):
- print("Media segments are of equal timeframe")
- segment_time = segment.duration / segment.timescale
- total_segments = running_time / segment_time
- else:
- print("Media segments are of inequal timeframe")
-
- approx_no_segments = round(
- running_time /
- 6) + 10 # aproximate of 6 sec per segment
- print("Expected No of segments:", approx_no_segments)
- if (content_type == "audio/mp4"):
- segment_extension = segment.media.split(".")[-1]
- audio.append(approx_no_segments)
- audio.append(segment.media)
- audio.append(segment.initialization)
- audio.append(segment_extension)
- elif (content_type == "video/mp4"):
- segment_extension = segment.media.split(".")[-1]
- video.append(approx_no_segments)
- video.append(segment.media)
- video.append(segment.initialization)
- video.append(segment_extension)
- return video + audio
+def check_for_ffmpeg():
+    """Report whether ``ffmpeg`` can be launched.
+
+    Runs ``ffmpeg`` without arguments and waits for it. Returns False only
+    when the executable is not found; any other exception is reported and
+    treated as "available".
+    """
+    probe = ["ffmpeg"]
+    try:
+        process = subprocess.Popen(probe,
+                                   stdout=subprocess.DEVNULL,
+                                   stdin=subprocess.DEVNULL)
+        process.wait()
+    except FileNotFoundError:
+        return False
+    except Exception as e:
+        print(
+            "> Unexpected exception while checking for FFMPEG, please tell the program author about this! ",
+            e)
+    return True
-"""
-@author Puyodead1
-"""
+def check_for_mp4decrypt():
+    """Report whether ``mp4decrypt`` can be launched.
+
+    Runs ``mp4decrypt`` without arguments and waits for it. Returns False
+    only when the executable is not found; any other exception is reported
+    and treated as "available".
+    """
+    probe = ["mp4decrypt"]
+    try:
+        process = subprocess.Popen(probe,
+                                   stdout=subprocess.DEVNULL,
+                                   stdin=subprocess.DEVNULL)
+        process.wait()
+    except FileNotFoundError:
+        return False
+    except Exception as e:
+        print(
+            "> Unexpected exception while checking for MP4Decrypt, please tell the program author about this! ",
+            e)
+    return True
def download(url, path, filename):
"""
- @param: url to download file
- @param: path place to put the file
- @oaram: filename used for progress bar
+ @author Puyodead1
"""
file_size = int(requests.head(url).headers["Content-Length"])
if os.path.exists(path):
@@ -315,205 +1039,327 @@ def download(url, path, filename):
return file_size
-def process_caption(caption,
- lecture_index,
- lecture_title,
- lecture_dir,
- tries=0):
- filename = f"%s. %s_%s.%s" % (lecture_index, sanitize(lecture_title),
- caption.get("locale_id"), caption.get("ext"))
- filename_no_ext = f"%s. %s_%s" % (lecture_index, sanitize(lecture_title),
- caption.get("locale_id"))
+def download_aria(url, file_dir, filename):
+    """
+    Download ``url`` to ``filename`` inside ``file_dir`` with aria2c.
+
+    Raises RuntimeError when aria2c exits with a non-zero status, so that
+    callers wrapping this call in try/except can report or retry the
+    failure instead of treating it as a finished download.
+
+    @author Puyodead1
+    """
+    print(" > Downloading File...")
+    ret_code = subprocess.Popen([
+        "aria2c", url, "-o", filename, "-d", file_dir, "-j16", "-s20", "-x16",
+        "-c", "--auto-file-renaming=false", "--summary-interval=0"
+    ]).wait()
+    if ret_code != 0:
+        # Without this check every outcome was reported as "File Downloaded".
+        print("Return code: " + str(ret_code))
+        raise RuntimeError(
+            "aria2c exited with return code {}".format(ret_code))
+    print(" > File Downloaded")
+
+    print("Return code: " + str(ret_code))
+
+
+def process_caption(caption, lecture_title, lecture_dir, keep_vtt, tries=0):
+ """
+ Download one caption into ``lecture_dir`` and convert VTT captions to SRT.
+
+ The file is named ``<sanitized title>_<language>.<extension>``. A failed
+ download is retried until ``tries`` reaches 3, after which the caption is
+ skipped. After a VTT caption is converted, the ``.vtt`` file is removed
+ unless ``keep_vtt`` is true.
+ """
+ filename = f"%s_%s.%s" % (sanitize(lecture_title), caption.get("language"),
+ caption.get("extension"))
+ filename_no_ext = f"%s_%s" % (sanitize(lecture_title),
+ caption.get("language"))
filepath = os.path.join(lecture_dir, filename)
if os.path.isfile(filepath):
- print("> Captions '%s' already downloaded." % filename)
+ print(" > Caption '%s' already downloaded." % filename)
else:
- print(f"> Downloading captions: '%s'" % filename)
+ print(f" > Downloading caption: '%s'" % filename)
try:
- download(caption.get("url"), filepath, filename)
+ download_aria(caption.get("download_url"), lecture_dir, filename)
except Exception as e:
if tries >= 3:
print(
- f"> Error downloading captions: {e}. Exceeded retries, skipping."
+ f" > Error downloading caption: {e}. Exceeded retries, skipping."
)
return
else:
print(
- f"> Error downloading captions: {e}. Will retry {3-tries} more times."
+ f" > Error downloading caption: {e}. Will retry {3-tries} more times."
)
- process_caption(caption, lecture_index, lecture_title,
- lecture_dir, tries + 1)
- if caption.get("ext") == "vtt":
+ # The retry does its own conversion; return so that this frame does
+ # not fall through and convert (and delete) the caption a second time.
+ return process_caption(caption, lecture_title, lecture_dir, keep_vtt,
+ tries + 1)
+ if caption.get("extension") == "vtt":
try:
- print("> Converting captions to SRT format...")
+ print(" > Converting caption to SRT format...")
convert(lecture_dir, filename_no_ext)
- print("> Caption conversion complete.")
- os.remove(filepath)
+ print(" > Caption conversion complete.")
+ if not keep_vtt:
+ os.remove(filepath)
except Exception as e:
- print(f"> Error converting captions: {e}")
+ print(f" > Error converting caption: {e}")
-def process_lecture(lecture, lecture_index, lecture_path, lecture_dir):
- lecture_title = lecture["title"]
- lecture_asset = lecture["asset"]
- if not skip_lectures:
- if lecture_asset["media_license_token"] == None:
- # not encrypted
- media_sources = lecture_asset["media_sources"]
- if quality: # if quality is specified, try to find the requested quality
- lecture_url = next(
- (x["src"]
- for x in media_sources if x["label"] == str(quality)),
- media_sources[0]["src"]
- ) # find the quality requested or return the best available
- else:
- lecture_url = media_sources[0][
- "src"] # best quality is the first index
+def process_lecture(lecture, lecture_path, lecture_dir, quality, access_token):
+ """
+ Download one lecture to ``lecture_path`` unless that file already exists.
+
+ Encrypted lectures need both audio and video sources; the last entry of
+ each list is used, or the video source whose height is closest to
+ ``quality`` when ``quality`` is an int, and the download is delegated to
+ handle_segments. Unencrypted lectures use the tallest entry of
+ ``sources`` (or the one closest to ``quality``): HLS sources go through
+ FFMPEG and are renamed on success, everything else through download_aria.
+ Download errors of unencrypted lectures are printed, not raised.
+ """
+ lecture_title = lecture.get("lecture_title")
+ is_encrypted = lecture.get("is_encrypted")
+ lecture_video_sources = lecture.get("video_sources")
+ lecture_audio_sources = lecture.get("audio_sources")
+
+ if is_encrypted:
+ if len(lecture_audio_sources) > 0 and len(lecture_video_sources) > 0:
+ lecture_working_dir = os.path.join(working_dir,
+ str(lecture.get("asset_id")))
if not os.path.isfile(lecture_path):
- try:
- download(lecture_url, lecture_path, lecture_title)
- except Exception as e:
- # We could add a retry here
- print(f"> Error downloading lecture: {e}. Skipping...")
- else:
- print(f"> Lecture '%s' is already downloaded, skipping..." %
+ video_source = lecture_video_sources[
+ -1] # last index is the best quality
+ audio_source = lecture_audio_sources[-1]
+ if isinstance(quality, int):
+ video_source = min(
+ lecture_video_sources,
+ key=lambda x: abs(int(x.get("height")) - quality))
+ if not os.path.exists(lecture_working_dir):
+ os.mkdir(lecture_working_dir)
+ print(f" > Lecture '%s' has DRM, attempting to download" %
lecture_title)
+ handle_segments(video_source, audio_source, lecture_title,
+ lecture_working_dir, lecture_path)
+ else:
+ print(
+ " > Lecture '%s' is already downloaded, skipping..." %
+ lecture_title)
else:
- # encrypted
- print(f"> Lecture '%s' has DRM, attempting to download" %
+ print(f" > Lecture '%s' is missing media links" %
lecture_title)
- lecture_working_dir = os.path.join(
- working_dir, str(lecture_asset["id"])
- ) # set the folder to download ephemeral files
- media_sources = lecture_asset["media_sources"]
+ print(len(lecture_audio_sources), len(lecture_video_sources))
+ else:
+ # A lecture without "sources" must reach the "Missing sources" branch
+ # below instead of crashing inside sorted().
+ sources = lecture.get("sources") or []
+ sources = sorted(sources,
+ key=lambda x: int(x.get("height")),
+ reverse=True)
+ if sources:
+ lecture_working_dir = os.path.join(working_dir,
+ str(lecture.get("asset_id")))
if not os.path.exists(lecture_working_dir):
os.mkdir(lecture_working_dir)
if not os.path.isfile(lecture_path):
- mpd_url = next((x["src"] for x in media_sources
- if x["type"] == "application/dash+xml"), None)
- if not mpd_url:
- print(
- "> Couldn't find dash url for lecture '%s', skipping...",
- lecture_title)
- return
- media_info = manifest_parser(mpd_url)
- handle_irregular_segments(media_info, lecture_title,
- lecture_working_dir, lecture_path)
- cleanup(lecture_working_dir)
+ print(
+ " > Lecture doesn't have DRM, attempting to download..."
+ )
+ source = sources[0] # first index is the best quality
+ if isinstance(quality, int):
+ source = min(
+ sources,
+ key=lambda x: abs(int(x.get("height")) - quality))
+ try:
+ print(" ====== Selected quality: ",
+ source.get("type"), source.get("height"))
+ url = source.get("download_url")
+ source_type = source.get("type")
+ if source_type == "hls":
+ temp_filepath = lecture_path.replace(".mp4", "")
+ temp_filepath = temp_filepath + ".hls-part.mp4"
+ retVal = FFMPEG(None, url, access_token,
+ temp_filepath).download()
+ if retVal:
+ os.rename(temp_filepath, lecture_path)
+ print(" > HLS Download success")
+ else:
+ # Save under the file name of lecture_path so the "already
+ # downloaded" check above matches on the next run.
+ download_aria(url, lecture_dir, os.path.basename(lecture_path))
+ except Exception as e:
+ print(f" > Error downloading lecture: ", e)
else:
- print("> Lecture '%s' is already downloaded, skipping..." %
- lecture_title)
-
- # process assets
- if dl_assets:
- assets = []
- all_assets = lecture["supplementary_assets"]
- for asset in all_assets:
- if asset["asset_type"] == "File":
- assets.append(asset)
- asset_filename = asset["filename"]
- download_url = next((x["file"]
- for x in asset["download_urls"]["File"]
- if x["label"] == "download"), None)
- if download_url:
- try:
- download(download_url,
- os.path.join(lecture_dir, asset_filename),
- asset_filename)
- except Exception as e:
- print(
- f"> Error downloading lecture asset: {e}. Skipping"
- )
- continue
- elif asset["asset_type"] == "Article":
- assets.append(asset)
- asset_path = os.path.join(lecture_dir,
- sanitize(lecture_title))
- with open(asset_path, 'w') as f:
- f.write(asset["body"])
- elif asset["asset_type"] == "ExternalLink":
- assets.append(asset)
- asset_path = os.path.join(lecture_dir, f"{lecture_index}. External URLs.txt")
- with open(asset_path, 'a') as f:
- f.write(f"%s : %s\n" %
- (asset["title"], asset["external_url"]))
- print("> Found %s assets for lecture '%s'" %
- (len(assets), lecture_title))
-
- # process captions
- if dl_captions:
- captions = []
- for caption in lecture_asset.get("captions"):
- if not isinstance(caption, dict):
- continue
- if caption.get("_class") != "caption":
- continue
- download_url = caption.get("url")
- if not download_url or not isinstance(download_url, str):
- continue
- lang = (caption.get("language") or caption.get("srclang")
- or caption.get("label")
- or caption.get("locale_id").split("_")[0])
- ext = "vtt" if "vtt" in download_url.rsplit(".", 1)[-1] else "srt"
- if caption_locale == "all" or caption_locale == lang:
- captions.append({
- "language": lang,
- "locale_id": caption.get("locale_id"),
- "ext": ext,
- "url": download_url
- })
-
- for caption in captions:
- process_caption(caption, lecture_index, lecture_title, lecture_dir)
+ print(
+ " > Lecture '%s' is already downloaded, skipping..." %
+ lecture_title)
+ else:
+ print(" > Missing sources for lecture", lecture)
-def parse(data):
- course_dir = os.path.join(download_dir, course_id)
+def parse_new(_udemy, quality, skip_lectures, dl_assets, dl_captions,
+ caption_locale, keep_vtt, access_token):
+ total_chapters = _udemy.get("total_chapters")
+ total_lectures = _udemy.get("total_lectures")
+ print(f"Chapter(s) ({total_chapters})")
+ print(f"Lecture(s) ({total_lectures})")
+
+ course_name = _udemy.get("course_title")
+ course_dir = os.path.join(download_dir, course_name)
if not os.path.exists(course_dir):
os.mkdir(course_dir)
- chapters = []
- lectures = []
- for obj in data:
- if obj["_class"] == "chapter":
- obj["lectures"] = []
- chapters.append(obj)
- elif obj["_class"] == "lecture" and obj["asset"][
- "asset_type"] == "Video":
- try:
- chapters[-1]["lectures"].append(obj)
- except IndexError:
- # This is caused by there not being a starting chapter
- lectures.append(obj)
- lecture_index = lectures.index(obj) + 1
- lecture_path = os.path.join(course_dir, f'{lecture_index}. {sanitize(obj["title"])}.mp4')
- process_lecture(obj, lecture_index, lecture_path, download_dir)
-
- for chapter in chapters:
- chapter_dir = os.path.join(course_dir, f'{chapters.index(chapter) + 1}. {sanitize(chapter["title"])}')
+ for chapter in _udemy.get("chapters"):
+ chapter_title = chapter.get("chapter_title")
+ chapter_index = chapter.get("chapter_index")
+ chapter_dir = os.path.join(course_dir, chapter_title)
if not os.path.exists(chapter_dir):
os.mkdir(chapter_dir)
+ print(
+ f"======= Processing chapter {chapter_index} of {total_chapters} ======="
+ )
- for lecture in chapter["lectures"]:
- lecture_index = chapter["lectures"].index(lecture) + 1
- lecture_path = os.path.join(chapter_dir, f'{lecture_index}. {sanitize(lecture["title"])}.mp4')
- process_lecture(lecture, lecture_index, lecture_path, chapter_dir)
- print("\n\n\n\n\n\n\n\n=====================")
- print("All downloads completed for course!")
- print("=====================")
+ for lecture in chapter.get("lectures"):
+ lecture_title = lecture.get("lecture_title")
+ lecture_index = lecture.get("lecture_index")
+
+ extension = lecture.get("extension")
+ print(
+ f" > Processing lecture {lecture_index} of {total_lectures}")
+ if not skip_lectures:
+ if extension == "html":
+ html_content = lecture.get("html_content").encode(
+ "ascii", "ignore").decode("utf8")
+ lecture_path = os.path.join(
+ chapter_dir, "{}.html".format(sanitize(lecture_title)))
+ try:
+ with open(lecture_path, 'w') as f:
+ f.write(html_content)
+ f.close()
+ except Exception as e:
+ print(" > Failed to write html file: ", e)
+ continue
+ else:
+ lecture_path = os.path.join(
+ chapter_dir, "{}.mp4".format(sanitize(lecture_title)))
+ process_lecture(lecture, lecture_path, chapter_dir,
+ quality, access_token)
+
+ if dl_assets:
+ assets = lecture.get("assets")
+ print(" > Processing {} asset(s) for lecture...".format(
+ len(assets)))
+
+ for asset in assets:
+ asset_type = asset.get("type")
+ filename = asset.get("filename")
+ download_url = asset.get("download_url")
+
+ if asset_type == "article":
+ print(
+ "If you're seeing this message, that means that you reached a secret area that I haven't finished! jk I haven't implemented handling for this asset type, please report this at https://github.com/Puyodead1/udemy-downloader/issues so I can add it. When reporting, please provide the following information: "
+ )
+ print("AssetType: Article; AssetData: ", asset)
+ # html_content = lecture.get("html_content")
+ # lecture_path = os.path.join(
+ # chapter_dir, "{}.html".format(sanitize(lecture_title)))
+ # try:
+ # with open(lecture_path, 'w') as f:
+ # f.write(html_content)
+ # f.close()
+ # except Exception as e:
+ # print("Failed to write html file: ", e)
+ # continue
+ elif asset_type == "video":
+ print(
+ "If you're seeing this message, that means that you reached a secret area that I haven't finished! jk I haven't implemented handling for this asset type, please report this at https://github.com/Puyodead1/udemy-downloader/issues so I can add it. When reporting, please provide the following information: "
+ )
+ print("AssetType: Video; AssetData: ", asset)
+ elif asset_type == "audio" or asset_type == "e-book" or asset_type == "file" or asset_type == "presentation":
+ try:
+ download_aria(download_url, chapter_dir, filename)
+ except Exception as e:
+ print("> Error downloading asset: ", e)
+ continue
+ elif asset_type == "external_link":
+ filepath = os.path.join(chapter_dir, filename)
+ savedirs, name = os.path.split(filepath)
+ filename = u"external-assets-links.txt"
+ filename = os.path.join(savedirs, filename)
+ file_data = []
+ if os.path.isfile(filename):
+ file_data = [
+ i.strip().lower()
+ for i in open(filename,
+ encoding="utf-8",
+ errors="ignore") if i
+ ]
+
+ content = u"\n{}\n{}\n".format(name, download_url)
+ if name.lower() not in file_data:
+ with open(filename,
+ 'a',
+ encoding="utf-8",
+ errors="ignore") as f:
+ f.write(content)
+ f.close()
+
+ subtitles = lecture.get("subtitles")
+ if dl_captions and subtitles:
+ print("Processing {} caption(s)...".format(len(subtitles)))
+ for subtitle in subtitles:
+ lang = subtitle.get("language")
+ if lang == caption_locale or caption_locale == "all":
+ process_caption(subtitle, lecture_title, chapter_dir,
+ keep_vtt)
+
+
+def course_info(course_data):
+    """Print a summary of a parsed course without downloading anything.
+
+    Reads "title", "total_chapters", "total_lectures" and "chapters" from
+    course_data. Lectures that carry an "extension" value are skipped; for
+    every other lecture the DRM flag, asset count, caption languages and
+    the available qualities (highest first) are printed.
+    """
+    print("\n\n\n\n")
+    course_title = course_data.get("title")
+    chapter_count = course_data.get("total_chapters")
+    lecture_count = course_data.get("total_lectures")
+
+    print("> Course: {}".format(course_title))
+    print("> Total Chapters: {}".format(chapter_count))
+    print("> Total Lectures: {}".format(lecture_count))
+    print("\n")
+
+    for chapter in course_data.get("chapters"):
+        chapter_title = chapter.get("chapter_title")
+        chapter_index = chapter.get("chapter_index")
+        chapter_lecture_count = chapter.get("lecture_count")
+
+        print("> Chapter: {} ({} of {})".format(chapter_title, chapter_index,
+                                                chapter_count))
+
+        for lecture in chapter.get("lectures"):
+            # Lectures with an extension are not reported; skip them early.
+            if lecture.get("extension"):
+                continue
+
+            lecture_title = lecture.get("lecture_title")
+            lecture_index = lecture.get("index")
+            lecture_asset_count = lecture.get("assets_count")
+            lecture_is_encrypted = lecture.get("is_encrypted")
+            lecture_subtitles = lecture.get("subtitles") or []
+
+            # Encrypted lectures keep their streams under "video_sources",
+            # plain ones under "sources". Falling back to [] avoids
+            # iterating None and avoids reusing the previous lecture's
+            # qualities when a lecture has no streams at all.
+            if lecture_is_encrypted:
+                streams = lecture.get("video_sources") or []
+            else:
+                streams = lecture.get("sources") or []
+            streams = sorted(streams,
+                             key=lambda x: int(x.get("height")),
+                             reverse=True)
+
+            # Always report width x height; the non-DRM branch used to
+            # pass the two values in swapped order.
+            lecture_qualities = [
+                "{}@{}x{}".format(x.get("type"), x.get("width"),
+                                  x.get("height")) for x in streams
+            ]
+
+            print(" > Lecture: {} ({} of {})".format(lecture_title,
+                                                     lecture_index,
+                                                     chapter_lecture_count))
+            print(" > DRM: {}".format(lecture_is_encrypted))
+            print(" > Asset Count: {}".format(lecture_asset_count))
+            print(" > Captions: {}".format(
+                [x.get("language") for x in lecture_subtitles]))
+            print(" > Qualities: {}".format(lecture_qualities))
+
+        if chapter_index != chapter_count:
+            print("\n\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Udemy Downloader')
- parser.add_argument(
- "-d",
- "--debug",
- dest="debug",
- action="store_true",
- help="Use test_data.json rather than fetch from the udemy api.",
- )
+ parser.add_argument("-c",
+ "--course-url",
+ dest="course_url",
+ type=str,
+ help="The URL of the course to download",
+ required=True)
parser.add_argument(
"-b",
"--bearer",
@@ -521,45 +1367,84 @@ if __name__ == "__main__":
type=str,
help="The Bearer token to use",
)
- parser.add_argument(
- "-c",
- "--course-id",
- dest="course_id",
- type=str,
- help="The ID of the course to download",
- )
parser.add_argument(
"-q",
"--quality",
dest="quality",
type=int,
- help="Download specific video quality. (144, 360, 480, 720, 1080)",
+ help=
+ "Download specific video quality. If the requested quality isn't available, the closest quality will be used. If not specified, the best quality will be downloaded for each lecture",
)
parser.add_argument(
"-l",
"--lang",
dest="lang",
type=str,
- help="The language to download for captions (Default is en)",
+ help=
+ "The language to download for captions, specify 'all' to download all captions (Default is 'en')",
)
parser.add_argument(
"--skip-lectures",
dest="skip_lectures",
action="store_true",
- help="If specified, lectures won't be downloaded.",
+ help="If specified, lectures won't be downloaded",
)
parser.add_argument(
"--download-assets",
dest="download_assets",
action="store_true",
- help="If specified, lecture assets will be downloaded.",
+ help="If specified, lecture assets will be downloaded",
)
parser.add_argument(
"--download-captions",
dest="download_captions",
action="store_true",
- help="If specified, captions will be downloaded.",
+ help="If specified, captions will be downloaded",
)
+ parser.add_argument(
+ "--keep-vtt",
+ dest="keep_vtt",
+ action="store_true",
+ help="If specified, .vtt files won't be removed",
+ )
+ parser.add_argument(
+ "--skip-hls",
+ dest="skip_hls",
+ action="store_true",
+ help=
+ "If specified, hls streams will be skipped (faster fetching) (hls streams usually contain 1080p quality for non-drm lectures)",
+ )
+ parser.add_argument(
+ "--info",
+ dest="info",
+ action="store_true",
+ help=
+ "If specified, only course information will be printed, nothing will be downloaded",
+ )
+
+ parser.add_argument(
+ "--save-to-file",
+ dest="save_to_file",
+ action="store_true",
+ help=argparse.SUPPRESS,
+ )
+ parser.add_argument(
+ "--load-from-file",
+ dest="load_from_file",
+ action="store_true",
+ help=argparse.SUPPRESS,
+ )
+
+ dl_assets = False
+ skip_lectures = False
+ dl_captions = False
+ caption_locale = "en"
+ quality = None
+ bearer_token = None
+ portal_name = None
+ course_name = None
+ keep_vtt = False
+ skip_hls = False
args = parser.parse_args()
if args.download_assets:
@@ -571,50 +1456,318 @@ if __name__ == "__main__":
if args.skip_lectures:
skip_lectures = True
if args.quality:
- if not args.quality in valid_qualities:
- print("Invalid quality specified! %s" % quality)
- sys.exit(1)
- else:
- quality = args.quality
+ quality = args.quality
+ if args.keep_vtt:
+ keep_vtt = args.keep_vtt
+ if args.skip_hls:
+ skip_hls = args.skip_hls
+
+ aria_ret_val = check_for_aria()
+ if not aria_ret_val:
+ print("> Aria2c is missing from your system or path!")
+ sys.exit(1)
+
+    import shutil  # stdlib PATH lookup; the original re-ran check_for_aria() here, so ffmpeg was never checked
+    if not shutil.which("ffmpeg"):
+        print("> FFMPEG is missing from your system or path!")
+        sys.exit(1)
+
+ mp4decrypt_ret_val = check_for_mp4decrypt()
+ if not mp4decrypt_ret_val:
+ print(
+ "> MP4Decrypt is missing from your system or path! (This is part of Bento4 tools)"
+ )
+ sys.exit(1)
+
+ if args.load_from_file:
+ print(
+ "> 'load_from_file' was specified, data will be loaded from json files instead of fetched"
+ )
+ if args.save_to_file:
+ print(
+ "> 'save_to_file' was specified, data will be saved to json files")
+
+ if not os.path.isfile(keyfile_path):
+ print("> Keyfile not found! Did you rename the file correctly?")
+ sys.exit(1)
load_dotenv()
+ access_token = None
if args.bearer_token:
- header_bearer = f"Bearer %s" % args.bearer_token
+ access_token = args.bearer_token
else:
- header_bearer = f"Bearer %s" % os.getenv("UDEMY_BEARER")
- if args.course_id:
- course_id = args.course_id
+ access_token = os.getenv("UDEMY_BEARER")
+
+ udemy = Udemy(access_token)
+
+ print("> Fetching course information, this may take a minute...")
+    if not args.load_from_file:
+        course_id, course_meta = udemy._extract_course_info(args.course_url)  # must not rebind course_info(): --info calls that function later
+        print("> Course information retrieved!")
+        if course_meta and isinstance(course_meta, dict):
+            title = _clean(course_meta.get("title"))
+            course_title = course_meta.get("published_title")
+            portal_name = course_meta.get("portal_name")
+
+ print("> Fetching course content, this may take a minute...")
+ if args.load_from_file:
+ course_json = json.loads(
+ open(os.path.join(os.getcwd(), "saved", "course_content.json"),
+ 'r').read())
+ title = course_json.get("title")
+ course_title = course_json.get("published_title")
+ portal_name = course_json.get("portal_name")
else:
- course_id = os.getenv("UDEMY_COURSE_ID")
+ course_json = udemy._extract_course_json(args.course_url, course_id,
+ portal_name)
+ if args.save_to_file:
+ with open(os.path.join(os.getcwd(), "saved", "course_content.json"),
+ 'w') as f:
+ f.write(json.dumps(course_json))
+ f.close()
- if not course_id:
- print("> Missing Course ID!")
- sys.exit(1)
- if not header_bearer:
- print("> Missing Bearer Token!")
- sys.exit(1)
+ print("> Course content retrieved!")
+ course = course_json.get("results")
+ resource = course_json.get("detail")
- print(f"> Using course ID {course_id}")
-
- if args.debug:
- # this is for development purposes so we dont need to make tons of requests when testing
- # course data json is just stored and read from a file
- with open("test_data.json", encoding="utf8") as f:
- data = json.loads(f.read())["results"]
- parse(data)
- else:
- print("Fetching Course data, this may take a minute...")
- r = requests.get(
- f"https://udemy.com/api-2.0/courses/{course_id}/cached-subscriber-curriculum-items?fields[asset]=results,title,external_url,time_estimation,download_urls,slide_urls,filename,asset_type,captions,media_license_token,course_is_drmed,media_sources,stream_urls,body&fields[chapter]=object_index,title,sort_order&fields[lecture]=id,title,object_index,asset,supplementary_assets,view_html&page_size=10000"
- .format(course_id),
- headers={
- "Authorization": header_bearer,
- "x-udemy-authorization": header_bearer
- })
- if r.status_code == 200:
- print("Course data retrieved!")
- data = r.json()
- parse(data["results"])
+ if args.load_from_file:
+ _udemy = json.loads(
+ open(os.path.join(os.getcwd(), "saved", "_udemy.json")).read())
+ if args.info:
+ course_info(_udemy)
else:
- print("An error occurred while trying to fetch the course data! " +
- r.text)
+ parse_new(_udemy, quality, skip_lectures, dl_assets, dl_captions,
+ caption_locale, keep_vtt, access_token)
+ else:
+ _udemy = {}
+ _udemy["access_token"] = access_token
+ _udemy["course_id"] = course_id
+ _udemy["title"] = title
+ _udemy["course_title"] = course_title
+ _udemy["chapters"] = []
+ counter = -1
+
+ if resource:
+ print("> Trying to logout")
+ udemy.session.terminate()
+ print("> Logged out.")
+
+ if course:
+ print("> Processing course data, this may take a minute. ")
+ lecture_counter = 0
+ for entry in course:
+ clazz = entry.get("_class")
+ asset = entry.get("asset")
+ supp_assets = entry.get("supplementary_assets")
+
+ if clazz == "chapter":
+ lecture_counter = 0
+ lectures = []
+ chapter_index = entry.get("object_index")
+ chapter_title = "{0:02d} ".format(chapter_index) + _clean(
+ entry.get("title"))
+
+ if chapter_title not in _udemy["chapters"]:
+ _udemy["chapters"].append({
+ "chapter_title": chapter_title,
+ "chapter_id": entry.get("id"),
+ "chapter_index": chapter_index,
+ "lectures": []
+ })
+ counter += 1
+ elif clazz == "lecture":
+ lecture_counter += 1
+ lecture_id = entry.get("id")
+ if len(_udemy["chapters"]) == 0:
+ lectures = []
+ chapter_index = entry.get("object_index")
+ chapter_title = "{0:02d} ".format(
+ chapter_index) + _clean(entry.get("title"))
+ if chapter_title not in _udemy["chapters"]:
+ _udemy["chapters"].append({
+ "chapter_title": chapter_title,
+ "chapter_id": lecture_id,
+ "chapter_index": chapter_index,
+ "lectures": []
+ })
+ counter += 1
+
+ if lecture_id:
+ retVal = []
+
+ if isinstance(asset, dict):
+                            asset_type = (asset.get("asset_type")  # either key may be absent; fall back to "" before lowering
+                                          or asset.get("assetType") or "").lower()
+ if asset_type == "article":
+ if isinstance(supp_assets,
+ list) and len(supp_assets) > 0:
+ retVal = udemy._extract_supplementary_assets(
+ supp_assets)
+ elif asset_type == "video":
+ if isinstance(supp_assets,
+ list) and len(supp_assets) > 0:
+ retVal = udemy._extract_supplementary_assets(
+ supp_assets)
+ elif asset_type == "e-book":
+ retVal = udemy._extract_ebook(asset)
+ elif asset_type == "file":
+ retVal = udemy._extract_file(asset)
+ elif asset_type == "presentation":
+ retVal = udemy._extract_ppt(asset)
+ elif asset_type == "audio":
+ retVal = udemy._extract_audio(asset)
+
+ lecture_index = entry.get("object_index")
+ lecture_title = "{0:03d} ".format(
+ lecture_counter) + _clean(entry.get("title"))
+
+ if asset.get("stream_urls") != None:
+ # not encrypted
+ data = asset.get("stream_urls")
+ if data and isinstance(data, dict):
+ sources = data.get("Video")
+ tracks = asset.get("captions")
+ #duration = asset.get("time_estimation")
+ sources = udemy._extract_sources(
+ sources, skip_hls)
+ subtitles = udemy._extract_subtitles(tracks)
+ sources_count = len(sources)
+ subtitle_count = len(subtitles)
+ lectures.append({
+ "index": lecture_counter,
+ "lecture_index": lecture_index,
+ "lecture_id": lecture_id,
+ "lecture_title": lecture_title,
+ # "duration": duration,
+ "assets": retVal,
+ "assets_count": len(retVal),
+ "sources": sources,
+ "subtitles": subtitles,
+ "subtitle_count": subtitle_count,
+ "sources_count": sources_count,
+ "is_encrypted": False,
+ "asset_id": asset.get("id")
+ })
+ else:
+ lectures.append({
+ "index":
+ lecture_counter,
+ "lecture_index":
+ lecture_index,
+ "lectures_id":
+ lecture_id,
+ "lecture_title":
+ lecture_title,
+ "html_content":
+ asset.get("body"),
+ "extension":
+ "html",
+ "assets":
+ retVal,
+ "assets_count":
+ len(retVal),
+ "subtitle_count":
+ 0,
+ "sources_count":
+ 0,
+ "is_encrypted":
+ False,
+ "asset_id":
+ asset.get("id")
+ })
+ else:
+ # encrypted
+ data = asset.get("media_sources")
+ if data and isinstance(data, list):
+ video_media_sources, audio_media_sources = udemy._extract_media_sources(
+ data)
+ tracks = asset.get("captions")
+ # duration = asset.get("time_estimation")
+ subtitles = udemy._extract_subtitles(tracks)
+ sources_count = len(video_media_sources)
+ subtitle_count = len(subtitles)
+ lectures.append({
+ "index": lecture_counter,
+ "lecture_index": lecture_index,
+ "lectures_id": lecture_id,
+ "lecture_title": lecture_title,
+ # "duration": duration,
+ "assets": retVal,
+ "assets_count": len(retVal),
+ "video_sources": video_media_sources,
+ "audio_sources": audio_media_sources,
+ "subtitles": subtitles,
+ "subtitle_count": subtitle_count,
+ "sources_count": sources_count,
+ "is_encrypted": True,
+ "asset_id": asset.get("id")
+ })
+ else:
+ lectures.append({
+ "index":
+ lecture_counter,
+ "lecture_index":
+ lecture_index,
+ "lectures_id":
+ lecture_id,
+ "lecture_title":
+ lecture_title,
+ "html_content":
+ asset.get("body"),
+ "extension":
+ "html",
+ "assets":
+ retVal,
+ "assets_count":
+ len(retVal),
+ "subtitle_count":
+ 0,
+ "sources_count":
+ 0,
+ "is_encrypted":
+ False,
+ "asset_id":
+ asset.get("id")
+ })
+ _udemy["chapters"][counter]["lectures"] = lectures
+ _udemy["chapters"][counter]["lecture_count"] = len(
+ lectures)
+ elif clazz == "quiz":
+ lecture_id = entry.get("id")
+ if len(_udemy["chapters"]) == 0:
+ lectures = []
+ chapter_index = entry.get("object_index")
+ chapter_title = "{0:02d} ".format(
+ chapter_index) + _clean(entry.get("title"))
+ if chapter_title not in _udemy["chapters"]:
+ lecture_counter = 0
+ _udemy["chapters"].append({
+ "chapter_title": chapter_title,
+ "chapter_id": lecture_id,
+ "chapter_index": chapter_index,
+ "lectures": [],
+ })
+ counter += 1
+
+ _udemy["chapters"][counter]["lectures"] = lectures
+ _udemy["chapters"][counter]["lectures_count"] = len(
+ lectures)
+
+ _udemy["total_chapters"] = len(_udemy["chapters"])
+ _udemy["total_lectures"] = sum([
+ entry.get("lecture_count", 0) for entry in _udemy["chapters"]
+ if entry
+ ])
+
+ if args.save_to_file:
+ with open(os.path.join(os.getcwd(), "saved", "_udemy.json"),
+ 'w') as f:
+ f.write(json.dumps(_udemy))
+ f.close()
+ print("Saved parsed data to json")
+
+ if args.info:
+ course_info(_udemy)
+ else:
+ parse_new(_udemy, quality, skip_lectures, dl_assets, dl_captions,
+ caption_locale, keep_vtt, access_token)
diff --git a/pyffmpeg.py b/pyffmpeg.py
new file mode 100644
index 0000000..cd04951
--- /dev/null
+++ b/pyffmpeg.py
@@ -0,0 +1,277 @@
+#!/usr/bin/python3
+# pylint: disable=R,C,W,E
+"""
+Author : Nasir Khan (r0ot h3x49)
+Github : https://github.com/r0oth3x49
+License : MIT
+Copyright (c) 2018-2025 Nasir Khan (r0ot h3x49)
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the
+Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
+and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
+ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH
+THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+"""
+import re
+import time
+import subprocess
+import sys
+from colorama import Fore, Style
+
+
+class FFMPeg:
+    """Copy a remote stream to a file with ffmpeg and draw a progress bar.
+
+    The stream at url is requested with an "Authorization: Bearer <token>"
+    header and written to filepath without re-encoding. duration (seconds)
+    is the fallback total length used when ffmpeg's "Duration:" banner is
+    not found. quiet and callback are stored but not used by this class.
+    """
+
+    # key=value pairs written by "ffmpeg -progress"
+    _PROGRESS_PATTERN = re.compile(
+        r"(frame|fps|total_size|out_time|bitrate|speed|progress)\s*\=\s*(\S+)")
+    # "Duration: HH:MM:SS.xx" banner; compiled once instead of once per call
+    _DURATION_PATTERN = re.compile(r"Duration: (\d{2}):(\d{2}):(\d{2})\.\d{2}")
+
+    def __init__(self,
+                 duration,
+                 url,
+                 token,
+                 filepath,
+                 quiet=False,
+                 callback=lambda *x: None):
+        self.url = url
+        self.filepath = filepath
+        self.quiet = quiet
+        self.duration = duration
+        self.callback = callback
+        self.token = token
+
+    def _command(self):
+        """Return the ffmpeg argument list for this download.
+
+        ffmpeg.exe -headers "Authorization: Bearer {token}" -i "" -c copy -bsf:a aac_adtstoasc out.mp4
+        """
+        command = [
+            "ffmpeg",
+            "-headers",
+            f"Authorization: Bearer {self.token}",
+            "-i",
+            f"{self.url}",
+            "-c",
+            "copy",
+            "-bsf:a",
+            "aac_adtstoasc",
+            f"{self.filepath}",
+            "-y",
+            "-progress",
+            "pipe:2",
+        ]
+        return command
+
+    def _fetch_total_duration(self, line):
+        """Return the total length in seconds found in a "Duration:" line.
+
+        Falls back to self.duration when line carries no such banner.
+        """
+        mobj = self._DURATION_PATTERN.search(line)
+        if not mobj:
+            return self.duration
+        hours, mins, secs = (int(part) for part in mobj.groups())
+        # An hour is 3600 seconds; the hours field used to be scaled by 60.
+        return hours * 3600 + mins * 60 + secs
+
+    def _fetch_current_duration_done(self, time_str):
+        """Convert an "HH:MM:SS[.fraction]" out_time value to whole seconds."""
+        hours, mins, secs = time_str.split(":")[:3]
+        return int(hours) * 3600 + int(mins) * 60 + int(secs.split(".")[0])
+
+    def _prepare_time_str(self, secs):
+        """Format seconds as "MM:SSs" or "HH:MM:SSs" ("--:--:--" past 99h)."""
+        (mins, secs) = divmod(secs, 60)
+        (hours, mins) = divmod(mins, 60)
+        # Early returns keep the placeholder from being overwritten below.
+        if hours > 99:
+            return "--:--:--"
+        if hours == 0:
+            return "%02d:%02ds" % (mins, secs)
+        return "%02d:%02d:%02ds" % (hours, mins, secs)
+
+    def _progress(self,
+                  iterations,
+                  total,
+                  bytesdone,
+                  speed,
+                  elapsed,
+                  bar_length=30,
+                  fps=None):
+        """Work out the figures for one progress update and draw the bar.
+
+        iterations and total are seconds of media done and overall,
+        bytesdone and speed are ffmpeg's total_size and bitrate figures and
+        elapsed is the wall-clock time since the download started. Can
+        raise ZeroDivisionError when total is 0 or no media time has been
+        processed yet; download() treats a failed redraw as non-fatal.
+        """
+        filled_length = int(round(bar_length * iterations / float(total)))
+        percents = format(100.00 * (iterations * 1.0 / float(total)), ".2f")
+
+        if bytesdone <= 1048576:
+            _receiving = round(float(bytesdone) / 1024.00, 2)
+            _received = format(
+                _receiving if _receiving < 1024.00 else _receiving / 1024.00,
+                ".2f")
+            suffix_recvd = "KB" if _receiving < 1024.00 else "MB"
+        else:
+            _receiving = round(float(bytesdone) / 1048576, 2)
+            _received = format(
+                _receiving if _receiving < 1024.00 else _receiving / 1024.00,
+                ".2f")
+            suffix_recvd = "MB" if _receiving < 1024.00 else "GB"
+
+        suffix_rate = "Kb/s" if speed < 1024.00 else "Mb/s"
+        if fps:
+            suffix_rate += f" {fps}/fps"
+        if elapsed:
+            # media seconds processed per wall-clock second so far
+            pace = float(iterations) / elapsed
+            eta = (total - iterations) / pace
+        else:
+            eta = 0
+        rate = format(speed if speed < 1024.00 else speed / 1024.00, ".2f")
+        (mins, secs) = divmod(eta, 60)
+        (hours, mins) = divmod(mins, 60)
+        # elif: the "--:--:--" placeholder used to be overwritten by the
+        # following if/else pair.
+        if hours > 99:
+            eta = "--:--:--"
+        elif hours == 0:
+            eta = "eta %02d:%02ds" % (mins, secs)
+        else:
+            eta = "eta %02d:%02d:%02ds" % (hours, mins, secs)
+        if secs == 0:
+            eta = "\n"
+
+        total_time = self._prepare_time_str(total)
+        done_time = self._prepare_time_str(iterations)
+        downloaded = f"{total_time}/{done_time}"
+
+        received_bytes = str(_received) + str(suffix_recvd)
+        percents = f"{received_bytes} {percents}"
+
+        self.hls_progress(
+            downloaded=downloaded,
+            percents=percents,
+            filled_length=filled_length,
+            rate=str(rate) + str(suffix_rate),
+            suffix=eta,
+            bar_length=bar_length,
+        )
+
+    def hls_progress(self,
+                     downloaded,
+                     percents,
+                     filled_length,
+                     rate,
+                     suffix,
+                     bar_length=30):
+        """Write one colored progress line to stdout and flush it."""
+        bar = (Fore.CYAN + Style.DIM + "#" * filled_length + Fore.WHITE +
+               Style.DIM + "-" * (bar_length - filled_length))
+        sys.stdout.write(
+            "\033[2K\033[1G\r\r{}{}[{}{}*{}{}] : {}{}{} {}% |{}{}{}| {} {}".
+            format(
+                Fore.CYAN,
+                Style.DIM,
+                Fore.MAGENTA,
+                Style.BRIGHT,
+                Fore.CYAN,
+                Style.DIM,
+                Fore.GREEN,
+                Style.BRIGHT,
+                downloaded,
+                percents,
+                bar,
+                Fore.GREEN,
+                Style.BRIGHT,
+                rate,
+                suffix,
+            ))
+        sys.stdout.flush()
+
+    def _parse_progress(self, line):
+        """Return the progress key/value pairs found in line as a dict."""
+        return dict(self._PROGRESS_PATTERN.findall(line))
+
+    def _draw_progress(self, *args, **kwargs):
+        """Call _progress(), ignoring failures: the bar is best-effort."""
+        try:
+            self._progress(*args, **kwargs)
+        except Exception:
+            # A bad figure (e.g. zero duration) must not abort the download;
+            # KeyboardInterrupt is not an Exception and still propagates.
+            pass
+
+    def download(self):
+        """Run ffmpeg and draw progress until it reports "progress=end".
+
+        Returns {"status": "True", "msg": "download"} on success, or a dict
+        with "status": "False" when ffmpeg's output ends before that marker.
+        KeyboardInterrupt is propagated to the caller.
+        """
+        total_time = None
+        t0 = time.time()
+        progress_lines = []
+        retVal = {}
+        bytes_done = 0
+        download_speed = 0
+        # stdout is discarded: progress goes to stderr ("pipe:2") and nothing
+        # ever read the stdout pipe.
+        with subprocess.Popen(self._command(),
+                              stdout=subprocess.DEVNULL,
+                              stderr=subprocess.PIPE) as proc:
+            while True:
+                elapsed = time.time() - t0
+                raw = proc.stderr.readline()
+                if not raw:
+                    # EOF without "progress=end": ffmpeg stopped early. The
+                    # loop used to spin on empty reads forever in this case.
+                    retVal = {
+                        "status": "False",
+                        "msg": "Error: ffmpeg exited before finishing",
+                    }
+                    break
+                line = raw.decode("utf-8", errors="replace").strip()
+                if not total_time:
+                    total_time = self._fetch_total_duration(line)
+                if "progress=end" in line:
+                    self._draw_progress(total_time, total_time, bytes_done,
+                                        download_speed, elapsed)
+                    retVal = {"status": "True", "msg": "download"}
+                    break
+                if "progress" not in line:
+                    progress_lines.append(line)
+                    continue
+                # A "progress=..." line closes one report: parse what was
+                # collected since the previous one.
+                items = self._parse_progress("\n".join(progress_lines))
+                progress_lines = []
+                if not items:
+                    continue
+                try:
+                    secs = self._fetch_current_duration_done(items.get("out_time"))
+                    _tsize = items.get("total_size").lower().replace("kb", "")
+                    _brate = items.get("bitrate").lower().replace("kbits/s", "")
+                    bytes_now = float(_tsize) if _tsize != "n/a" else 0
+                    speed_now = float(_brate) if _brate != "n/a" else 0
+                except (AttributeError, ValueError):
+                    # Incomplete report (missing key or "N/A" value): skip it.
+                    continue
+                bytes_done, download_speed = bytes_now, speed_now
+                self._draw_progress(secs, total_time, bytes_done,
+                                    download_speed, elapsed,
+                                    fps=items.get("fps"))
+        return retVal
diff --git a/requirements.txt b/requirements.txt
index 23940f4..446e5b6 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,4 +5,6 @@ requests
python-dotenv
protobuf
webvtt-py
-pysrt
\ No newline at end of file
+pysrt
+m3u8
+colorama
\ No newline at end of file
diff --git a/sanitize.py b/sanitize.py
new file mode 100644
index 0000000..bd9aafb
--- /dev/null
+++ b/sanitize.py
@@ -0,0 +1,136 @@
+# This file is from https://github.com/r0oth3x49/udemy-dl/blob/master/udemy/sanitize.py
+
+from __future__ import unicode_literals
+
+import re
+import six
+import unicodedata
+from unidecode import unidecode
+
+
+def smart_text(s, encoding="utf-8", errors="strict"):
+    """Coerce s to text; bytes are decoded with encoding and errors."""
+    text_type = six.text_type
+    if isinstance(s, text_type):
+        return s
+
+    if isinstance(s, six.string_types):
+        # a string type that is not already text: plain conversion
+        return text_type(s)
+    if six.PY3:
+        if isinstance(s, bytes):
+            return text_type(s, encoding, errors)
+        return text_type(s)
+    # the remaining branches only run when six.PY3 is false
+    if hasattr(s, "__unicode__"):
+        return text_type(s)
+    return text_type(bytes(s), encoding, errors)
+
+
+# Extra characters outside of alphanumerics that we'll allow.
+SLUG_OK = "-_~"
+
+
+def slugify(s,
+            ok=SLUG_OK,
+            lower=True,
+            spaces=False,
+            only_ascii=False,
+            space_replacement="-"):
+    """
+    Creates a unicode slug for given string with several options.
+    L and N signify letter/number.
+    http://www.unicode.org/reports/tr44/tr44-4.html#GC_Values_Table
+    :param s: Your unicode string.
+    :param ok: Extra characters outside of alphanumerics to be allowed. Default is '-_~'
+    :param lower: Lower the output string. Default is True
+    :param spaces: True allows spaces, False replaces a space with the "space_replacement" param
+    :param only_ascii: True to replace non-ASCII unicode characters with their ASCII representations.
+    :param space_replacement: Char used to replace spaces if "spaces" is False.
+                              Default is dash ("-") or first char in ok if dash not allowed
+    :type s: String
+    :type ok: String
+    :type lower: Bool
+    :type spaces: Bool
+    :type only_ascii: Bool
+    :type space_replacement: String
+    :return: Slugified unicode string
+    :raises ValueError: when only_ascii is True and a decodable "ok" is not pure ASCII
+    """
+    if only_ascii and ok != SLUG_OK and hasattr(ok, "decode"):
+        try:
+            ok.decode("ascii")
+        except UnicodeError:  # decode() can raise UnicodeDecodeError as well as UnicodeEncodeError
+            raise ValueError(
+                ('You can not use "only_ascii=True" with '
+                 'a non ascii available chars in "ok" ("%s" given)') % ok)
+
+    rv = []
+    for c in unicodedata.normalize("NFKC", smart_text(s)):
+        cat = unicodedata.category(c)[0]
+        if cat in "LN" or c in ok:
+            rv.append(c)
+        elif cat == "Z":  # space
+            rv.append(" ")
+    new = "".join(rv).strip()
+
+    if only_ascii:
+        new = unidecode(new)
+    if not spaces:
+        if space_replacement and space_replacement not in ok:
+            space_replacement = ok[0] if ok else ""
+        # Raw pattern plus re.escape(): "\s" is not a valid str escape, and a
+        # replacement such as "]" or "^" has to be matched literally.
+        pattern = r"[%s\s]+" % re.escape(space_replacement)
+        new = re.sub(pattern, space_replacement, new)
+    if lower:
+        new = new.lower()
+
+    return new
+
+
+def sanitize(title):
+    """Return title made safe for use as a file name.
+
+    Every character above code point 128 is replaced by the ASCII stand-in
+    listed in _locale, or by its decimal code point when none is listed.
+    Afterwards each of the characters \\ / : * ? " < > becomes "_".
+    """
+
+    # decimal code point -> replacement; the "168u"/"168U" keys stand for a
+    # diaeresis (code point 168) followed by the letter u/U
+    _locale = {
+        "194": "A", "199": "C", "286": "G", "304": "I",
+        "206": "I", "214": "O", "350": "S", "219": "U",
+        "226": "a", "231": "c", "287": "g", "305": "i",
+        "238": "i", "246": "o", "351": "s", "251": "u",
+        "191": "", "225": "a", "233": "e", "237": "i",
+        "243": "o", "250": "u", "252": "u", "168u": "u",
+        "241": "n", "193": "A", "201": "E", "205": "I",
+        "211": "O", "218": "U", "220": "U", "168U": "U",
+        "209": "N", "223": "ss",
+    }
+
+    # Map character by character. The previous version rendered the whole
+    # title to one string of digits and ran str.replace() over it, which
+    # also rewrote digits typed in the title ("250 tips" became "u tips").
+    pieces = []
+    skip_next = False
+    for pos, char in enumerate(title):
+        if skip_next:
+            skip_next = False
+            continue
+        code = str(ord(char))
+        following = title[pos + 1:pos + 2]
+        if ord(char) <= 128:
+            pieces.append(char)
+        elif following and code + following in _locale:
+            pieces.append(_locale[code + following])
+            skip_next = True
+        else:
+            pieces.append(_locale.get(code, code))
+    _temp = "".join(pieces)
+
+    ok = re.compile(r'[^\\/:*?"<>]')
+    _title = "".join(x if ok.match(x) else "_" for x in _temp)
+    return _title
\ No newline at end of file
diff --git a/vtt_to_srt.py b/vtt_to_srt.py
index 297424e..1e2cc28 100644
--- a/vtt_to_srt.py
+++ b/vtt_to_srt.py
@@ -6,8 +6,8 @@ from pysrt.srttime import SubRipTime
def convert(directory, filename):
index = 0
- vtt_filepath = os.path.join(directory, f"{filename}.vtt")
- srt_filepath = os.path.join(directory, f"{filename}.srt")
+ vtt_filepath = os.path.join(directory, filename + ".vtt")
+ srt_filepath = os.path.join(directory, filename + ".srt")
srt = open(srt_filepath, "w")
for caption in WebVTT().read(vtt_filepath):