initialize selenium branch

- Removed .env support
+ Added TOML configuration file support
+ Added selenium with "undetected_chrome_driver"
+ Fix changing logging level not working
+ Updated README to reflect code changes
This commit is contained in:
Puyodead1 2022-01-09 13:17:10 -05:00 committed by Puyodead1
parent 7621d078da
commit b922294135
No known key found for this signature in database
GPG Key ID: A4FA4FEC0DD353FC
9 changed files with 425 additions and 160 deletions

View File

@ -1 +0,0 @@
UDEMY_BEARER=Your bearer token here

3
.gitignore vendored
View File

@ -125,8 +125,7 @@ saved/
info.py
.idea/
cookies.txt
selenium_test.py
selenium_data/
config.dev.toml
temp/
*.exe
*.exe

View File

@ -72,8 +72,11 @@ You will need to use a different branch of the program, please see [feat/cookies
# Advanced Usage
```
usage: main.py [-h] -c COURSE_URL [-b BEARER_TOKEN] [-q QUALITY] [-l LANG] [-cd CONCURRENT_DOWNLOADS] [--disable-ipv6] [--skip-lectures] [--download-assets] [--download-captions] [--keep-vtt] [--skip-hls]
[--info] [--id-as-course-name] [-sc] [--save-to-file] [--load-from-file] [--log-level LOG_LEVEL] [--use-h265] [--h265-crf H265_CRF] [--h265-preset H265_PRESET] [--use-nvenc] [-v]
usage: main.py [-h] -c COURSE_URL [-b BEARER_TOKEN] [-u USERNAME] [-p PASSWORD] [-q QUALITY] [-l LANG]
[-cd CONCURRENT_DOWNLOADS] [--disable-ipv6] [--skip-lectures] [--download-assets] [--download-captions]
[--keep-vtt] [--skip-hls] [--info] [--id-as-course-name] [-sc] [--save-to-file] [--load-from-file]
[--log-level LOG_LEVEL] [--use-h265] [--h265-crf H265_CRF] [--h265-preset H265_PRESET] [--use-nvenc]
[-v]
Udemy Downloader
@ -83,9 +86,15 @@ options:
The URL of the course to download
-b BEARER_TOKEN, --bearer BEARER_TOKEN
The Bearer token to use
-u USERNAME, --username USERNAME
username
-p PASSWORD, --password PASSWORD
password
-q QUALITY, --quality QUALITY
Download specific video quality. If the requested quality isn't available, the closest quality will be used. If not specified, the best quality will be downloaded for each lecture
-l LANG, --lang LANG The language to download for captions, specify 'all' to download all captions (Default is 'en')
Download specific video quality. If the requested quality isn't available, the closest quality
will be used. If not specified, the best quality will be downloaded for each lecture
-l LANG, --lang LANG The language to download for captions, specify 'all' to download all captions (Default is
'en')
-cd CONCURRENT_DOWNLOADS, --concurrent-downloads CONCURRENT_DOWNLOADS
The number of maximum concurrent downloads for segments (HLS and DASH, must be a number 1-30)
--disable-ipv6 If specified, ipv6 will be disabled in aria2
@ -93,22 +102,26 @@ options:
--download-assets If specified, lecture assets will be downloaded
--download-captions If specified, captions will be downloaded
--keep-vtt If specified, .vtt files won't be removed
--skip-hls If specified, hls streams will be skipped (faster fetching) (hls streams usually contain 1080p quality for non-drm lectures)
--skip-hls If specified, hls streams will be skipped (faster fetching) (hls streams usually contain 1080p
quality for non-drm lectures)
--info If specified, only course information will be printed, nothing will be downloaded
--id-as-course-name If specified, the course id will be used in place of the course name for the output directory. This is a 'hack' to reduce the path length
--id-as-course-name If specified, the course id will be used in place of the course name for the output directory.
This is a 'hack' to reduce the path length
-sc, --subscription-course
Mark the course as a subscription based course, use this if you are having problems with the program auto detecting it
--save-to-file If specified, course content will be saved to a file that can be loaded later with --load-from-file, this can reduce processing time (Note that asset links expire after a certain
If this course is part of a subscription plan (Personal or Pro Plans)
--save-to-file If specified, course content will be saved to a file that can be loaded later with --load-
from-file, this can reduce processing time (Note that asset links expire after a certain
amount of time)
--load-from-file If specified, course content will be loaded from a previously saved file with --save-to-file, this can reduce processing time (Note that asset links expire after a certain amount of
time)
--load-from-file If specified, course content will be loaded from a previously saved file with --save-to-file,
this can reduce processing time (Note that asset links expire after a certain amount of time)
--log-level LOG_LEVEL
Logging level: one of DEBUG, INFO, ERROR, WARNING, CRITICAL (Default is INFO)
--use-h265 If specified, videos will be encoded with the H.265 codec
--h265-crf H265_CRF Set a custom CRF value for H.265 encoding. FFMPEG default is 28
--h265-preset H265_PRESET
Set a custom preset value for H.265 encoding. FFMPEG default is medium
--use-nvenc Whether to use the NVIDIA hardware transcoding for H.265. Only works if you have a supported NVIDIA GPU and ffmpeg with nvenc support
--use-nvenc Whether to use the NVIDIA hardware transcoding for H.265. Only works if you have a supported
NVIDIA GPU and ffmpeg with nvenc support
-v, --version show program's version number and exit
```
@ -161,6 +174,9 @@ options:
- `python main.py -c <Course URL> --use-h265 --h265-preset faster`
- Encode in H.265 using NVIDIA hardware transcoding:
- `python main.py -c <Course URL> --use-h265 --use-nvenc`
- Specify username and password (only used for subscription based courses):
- `python main.py -c <Course URL> --username cooluser@email.com --password amazingpassword123`
- `python main.py -c <Course URL> -u cooluser@email.com -p amazingpassword123`
If you encounter errors while downloading such as

30
config.toml Normal file
View File

@ -0,0 +1,30 @@
[general]
# Udemy bearer token. A TOML key must always have a value, so this key is left
# commented out; uncomment it and fill in your token to use it.
# ex: bearer_token = "xxxxxxxxxxxxxxxxxxxxx"
# bearer_token = ""
# Preferred video quality. Automatically selects best quality if not set.
# Uncomment to set a value (assumed numeric like the -q/--quality option, e.g. 720 - confirm).
# quality = 720
caption_locale = "en"
concurrent_downloads = 10
disable_ipv6 = false
# whether to skip all lectures, useful if you only want to download captions or assets
skip_lectures = false
download_assets = false
download_captions = false
keep_vtt = false
skip_hls = false
# unused currently
skip_dash = false
# 'cache' course information, note that the download links expire after a certain amount of time so the course will have to be refreshed
save_to_file = false
# load 'cached' course information, note that the download links expire after a certain amount of time so the course will have to be refreshed
load_from_file = false
log_level = "INFO"
id_as_course_name = false
[selenium]
# Login credentials used by selenium. Uncomment and fill in both keys to use them.
# ex: username = "user@email.com"
# username = ""
# ex: password = "myCoolPassword123"
# password = ""
# set to false if you want to see the process, just don't interact with the browser at all or stuff will probably break
headless = true

View File

@ -9,8 +9,10 @@ HEADERS = {
"Accept": "*/*",
"Accept-Encoding": None,
}
LOGIN_URL = "https://www.udemy.com/join/login-popup/?ref=&display_type=popup&loc"
LOGOUT_URL = "https://www.udemy.com/user/logout"
PORTAL_HOME = "https://{portal_name}.udemy.com/"
LOGIN_URL = "https://{portal_name}.udemy.com/join/login-popup/?locale=en_US&response_type=html&next=https%3A%2F%2Fwww.udemy.com%2F"
LOGOUT_URL = "https://{portal_name}.udemy.com/user/logout/"
COURSE_URL = "https://{portal_name}.udemy.com/api-2.0/courses/{course_id}/cached-subscriber-curriculum-items?fields[asset]=results,title,external_url,time_estimation,download_urls,slide_urls,filename,asset_type,captions,media_license_token,course_is_drmed,media_sources,stream_urls,body&fields[chapter]=object_index,title,sort_order&fields[lecture]=id,title,object_index,asset,supplementary_assets,view_html&page_size=10000"
COURSE_INFO_URL = "https://{portal_name}.udemy.com/api-2.0/courses/{course_id}/"
COURSE_SEARCH = "https://{portal_name}.udemy.com/api-2.0/users/me/subscribed-courses?fields[course]=id,url,title,published_title&page=1&page_size=500&search={course_name}"

View File

@ -1,3 +1,3 @@
{
"KeyID": "key"
"key id goes here": "key goes here"
}

480
main.py
View File

@ -4,6 +4,7 @@ import glob
import json
import logging
import os
import random
import re
import subprocess
import sys
@ -14,39 +15,44 @@ from typing import IO
import m3u8
import requests
import toml
import undetected_chromedriver as uc
import yt_dlp
from bs4 import BeautifulSoup
from coloredlogs import ColoredFormatter
from dotenv import load_dotenv
from pathvalidate import sanitize_filename
from requests.exceptions import ConnectionError as conn_error
from selenium.common.exceptions import ElementNotVisibleException
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from tqdm import tqdm
from _version import __version__
from constants import *
from tls import SSLCiphers
from utils import extract_kid
from utils import extract_kid, slow_type
from vtt_to_srt import convert
retry = 3
cookies = ""
downloader = None
logger: logging.Logger = None
dl_assets = False
skip_lectures = False
dl_captions = False
caption_locale = "en"
caption_locale: str = "en"
quality = None
bearer_token = None
portal_name = None
course_name = None
bearer_token: str = None
portal_name: str = None
course_name: str = None
keep_vtt = False
skip_hls = False
concurrent_downloads = 10
disable_ipv6 = False
save_to_file = None
load_from_file = None
course_url = None
course_url: str = None
info = None
keys = {}
id_as_course_name = False
@ -55,6 +61,10 @@ use_h265 = False
h265_crf = 28
h265_preset = "medium"
use_nvenc = False
stream: logging.StreamHandler = None
username: str = None
password: str = None
headless = True
# from https://stackoverflow.com/a/21978778/9785713
@ -65,18 +75,88 @@ def log_subprocess_output(prefix: str, pipe: IO[bytes]):
pipe.flush()
def parse_config():
    """Load settings from the TOML config file into the module-level globals.

    ``config.dev.toml`` takes precedence over ``config.toml`` when it exists.
    When neither file is present a warning is logged and every setting is
    assigned its built-in default, so ``log_level`` (read unconditionally by
    ``pre_run``) is always defined after this call.
    """
    global dl_assets, skip_lectures, dl_captions, caption_locale, quality, bearer_token, keep_vtt, skip_hls, concurrent_downloads, disable_ipv6, load_from_file, save_to_file, id_as_course_name, log_level, username, password, headless

    # Check the development override first so it also works without config.toml.
    if os.path.isfile("config.dev.toml"):
        logger.info("[-] Using development config file")
        parsed_toml = toml.load("config.dev.toml")
    elif os.path.isfile("config.toml"):
        parsed_toml = toml.load("config.toml")
    else:
        logger.warning("[-] Config file not found")
        # Fall through with an empty document so the defaults below apply.
        parsed_toml = {}

    general_config = parsed_toml.get("general", {})
    selenium_config = parsed_toml.get("selenium", {})

    dl_assets = general_config.get("download_assets", False)
    skip_lectures = general_config.get("skip_lectures", False)
    dl_captions = general_config.get("download_captions", False)
    caption_locale = general_config.get("caption_locale", "en")
    quality = general_config.get("quality", None)
    bearer_token = general_config.get("bearer_token", None)
    keep_vtt = general_config.get("keep_vtt", False)
    skip_hls = general_config.get("skip_hls", False)
    # TODO: add support for skipping dash streams ("skip_dash" is not read yet)
    concurrent_downloads = general_config.get("concurrent_downloads", 10)
    disable_ipv6 = general_config.get("disable_ipv6", False)
    load_from_file = general_config.get("load_from_file", None)
    save_to_file = general_config.get("save_to_file", None)
    id_as_course_name = general_config.get("id_as_course_name", False)
    log_level = general_config.get("log_level", "INFO")

    username = selenium_config.get("username", None)
    password = selenium_config.get("password", None)
    headless = selenium_config.get("headless", True)
def create_logger():
    """Build the ``udemy-downloader`` logger and publish it via the globals.

    Sets the root logger level to ``LOG_LEVEL``, then attaches two handlers to
    the ``udemy-downloader`` logger: a colored console handler (stored in the
    global ``stream``) and a plain-text file handler writing to
    ``LOG_FILE_PATH``. The logger itself is stored in the global ``logger``.
    """
    global logger, stream

    logging.root.setLevel(LOG_LEVEL)

    # console output: colored formatter, level tracked separately on the handler
    stream = logging.StreamHandler()
    stream.setLevel(LOG_LEVEL)
    stream.setFormatter(ColoredFormatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT))

    # log file output: same layout without color codes
    file_handler = logging.FileHandler(LOG_FILE_PATH)
    file_handler.setFormatter(logging.Formatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT))

    # construct the logger
    logger = logging.getLogger("udemy-downloader")
    logger.setLevel(LOG_LEVEL)
    logger.addHandler(stream)
    logger.addHandler(file_handler)
# this is the first function that is called, we parse the arguments, setup the logger, and ensure that required directories exist
def pre_run():
global cookies, dl_assets, skip_lectures, dl_captions, caption_locale, quality, bearer_token, portal_name, course_name, keep_vtt, skip_hls, concurrent_downloads, disable_ipv6, load_from_file, save_to_file, bearer_token, course_url, info, logger, keys, id_as_course_name, is_subscription_course, LOG_LEVEL, use_h265, h265_crf, h265_preset, use_nvenc
# make sure the directory exists
if not os.path.exists(DOWNLOAD_DIR):
os.makedirs(DOWNLOAD_DIR)
global dl_assets, skip_lectures, dl_captions, caption_locale, quality, portal_name, course_name, keep_vtt, skip_hls, concurrent_downloads, disable_ipv6, load_from_file, save_to_file, bearer_token, course_url, info, logger, keys, id_as_course_name, is_subscription_course, log_level, use_h265, h265_crf, h265_preset, use_nvenc, username, password
# make sure the logs directory exists
if not os.path.exists(LOG_DIR_PATH):
os.makedirs(LOG_DIR_PATH, exist_ok=True)
# setup a logger
create_logger()
# load config.toml and set initial settings
parse_config()
# make sure the directory exists
if not os.path.exists(DOWNLOAD_DIR):
os.makedirs(DOWNLOAD_DIR)
parser = argparse.ArgumentParser(description="Udemy Downloader")
parser.add_argument("-c", "--course-url", dest="course_url", type=str, help="The URL of the course to download", required=True)
parser.add_argument(
@ -86,6 +166,20 @@ def pre_run():
type=str,
help="The Bearer token to use",
)
parser.add_argument(
"-u",
"--username",
dest="username",
type=str,
help="username",
)
parser.add_argument(
"-p",
"--password",
dest="password",
type=str,
help="password",
)
parser.add_argument(
"-q",
"--quality",
@ -160,8 +254,9 @@ def pre_run():
"--subscription-course",
dest="is_subscription_course",
action="store_true",
help="Mark the course as a subscription based course, use this if you are having problems with the program auto detecting it",
help="If this course is part of a subscription plan (Personal or Pro Plans)",
)
parser.add_argument(
"--save-to-file",
dest="save_to_file",
@ -208,6 +303,7 @@ def pre_run():
)
parser.add_argument("-v", "--version", action="version", version="You are running version {version}".format(version=__version__))
# parse command line arguments, these override the config file settings
args = parser.parse_args()
if args.download_assets:
dl_assets = True
@ -253,48 +349,36 @@ def pre_run():
if args.use_nvenc:
use_nvenc = True
if args.log_level:
if args.log_level.upper() == "DEBUG":
LOG_LEVEL = logging.DEBUG
elif args.log_level.upper() == "INFO":
LOG_LEVEL = logging.INFO
elif args.log_level.upper() == "ERROR":
LOG_LEVEL = logging.ERROR
elif args.log_level.upper() == "WARNING":
LOG_LEVEL = logging.WARNING
elif args.log_level.upper() == "CRITICAL":
LOG_LEVEL = logging.CRITICAL
else:
print(f"Invalid log level: {args.log_level}; Using INFO")
LOG_LEVEL = logging.INFO
# setup a logger
logger = logging.getLogger(__name__)
logging.root.setLevel(LOG_LEVEL)
# create a colored formatter for the console
console_formatter = ColoredFormatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT)
# create a regular non-colored formatter for the log file
file_formatter = logging.Formatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT)
# create a handler for console logging
stream = logging.StreamHandler()
stream.setLevel(LOG_LEVEL)
stream.setFormatter(console_formatter)
# create a handler for file logging
file_handler = logging.FileHandler(LOG_FILE_PATH)
file_handler.setFormatter(file_formatter)
# construct the logger
logger = logging.getLogger("udemy-downloader")
logger.setLevel(LOG_LEVEL)
logger.addHandler(stream)
logger.addHandler(file_handler)
log_level = args.log_level
if args.id_as_course_name:
id_as_course_name = args.id_as_course_name
if args.is_subscription_course:
is_subscription_course = args.is_subscription_course
if args.username:
username = args.username
if args.password:
password = args.password
# parse loglevel string to int
if log_level.upper() == "DEBUG":
logger.setLevel(logging.DEBUG)
stream.setLevel(logging.DEBUG)
elif log_level.upper() == "INFO":
logger.setLevel(logging.INFO)
stream.setLevel(logging.INFO)
elif log_level.upper() == "ERROR":
logger.setLevel(logging.ERROR)
stream.setLevel(logging.ERROR)
elif log_level.upper() == "WARNING":
logger.setLevel(logging.WARNING)
stream.setLevel(logging.WARNING)
elif log_level.upper() == "CRITICAL":
logger.setLevel(logging.CRITICAL)
stream.setLevel(logging.CRITICAL)
else:
logger.warning("Invalid log level: %s; Using INFO", args.log_level)
logger.setLevel(logging.INFO)
stream.setLevel(logging.INFO)
Path(DOWNLOAD_DIR).mkdir(parents=True, exist_ok=True)
Path(SAVED_DIR).mkdir(parents=True, exist_ok=True)
@ -306,15 +390,18 @@ def pre_run():
else:
logger.warning("> Keyfile not found! You won't be able to decrypt videos!")
# Read cookies from file
if os.path.exists(COOKIE_FILE_PATH):
with open(COOKIE_FILE_PATH, encoding="utf8", mode="r") as cookiefile:
cookies = cookiefile.read()
cookies = cookies.rstrip()
else:
logger.warning(
"No cookies.txt file was found, you won't be able to download subscription courses! You can ignore ignore this if you don't plan to download a course included in a subscription plan."
)
class Selenium:
    """Owns the undetected-chromedriver browser used for subscription courses."""

    def __init__(self):
        chrome_options = ChromeOptions()
        chrome_options.add_argument("--profile=Selenium")
        # point Chrome's user data directory at ./selenium_data under the working directory
        profile_dir = os.path.join(os.getcwd(), "selenium_data")
        chrome_options.add_argument(f"--user-data-dir={profile_dir}")
        # `headless` is the module-level setting (config file, default True)
        self._driver = uc.Chrome(options=chrome_options, headless=headless)

    @property
    def driver(self):
        """The underlying Chrome driver instance."""
        return self._driver
class Udemy:
@ -325,13 +412,14 @@ class Udemy:
if not self.session:
self.session, self.bearer_token = self.auth.authenticate(bearer_token=bearer_token)
if self.session and self.bearer_token:
self.session._headers.update({"Authorization": "Bearer {}".format(self.bearer_token)})
self.session._headers.update({"X-Udemy-Authorization": "Bearer {}".format(self.bearer_token)})
logger.info("Login Success")
else:
logger.fatal("Login Failure! You are probably missing an access token!")
sys.exit(1)
if not is_subscription_course:
if self.session and self.bearer_token:
self.session._headers.update({"Authorization": "Bearer {}".format(self.bearer_token)})
self.session._headers.update({"X-Udemy-Authorization": "Bearer {}".format(self.bearer_token)})
logger.info("[+] Login Success")
else:
logger.fatal("[-] Login Failure! You are probably missing an access token!")
sys.exit(1)
def _extract_supplementary_assets(self, supp_assets, lecture_counter):
_temp = []
@ -512,14 +600,15 @@ class Udemy:
for pl in playlists:
resolution = pl.stream_info.resolution
codecs = pl.stream_info.codecs
if not resolution:
continue
if not codecs:
continue
width, height = resolution
if height in seen: continue
if height in seen:
continue
# we need to save the individual playlists to disk also
playlist_path = Path(temp_path, f"index_{asset_id}_{width}x{height}.m3u8")
@ -540,7 +629,7 @@ class Udemy:
}
)
except Exception as error:
logger.error(f"Udemy Says : '{error}' while fetching hls streams..")
logger.error(f"[-] Udemy Says : '{error}' while fetching hls streams..")
return _temp
def _extract_mpd(self, url):
@ -597,14 +686,13 @@ class Udemy:
"download_url": f.get("manifest_url"),
}
)
else:
# ignore audio tracks
elif "audio" not in f.get("format_note"):
# unknown format type
# logger.debug(f"Unknown format type : {f}")
logger.debug(f"[-] Unknown format type : {f}")
continue
except Exception:
logger.exception(f"Error fetching MPD streams")
# We don't delete the mpd file yet because we can use it to download later
logger.exception(f"[-] Error fetching MPD streams")
return _temp
def extract_course_name(self, url):
@ -654,7 +742,7 @@ class Udemy:
try:
resp = self.session._get(url).json()
except conn_error as error:
logger.fatal(f"Udemy Says: Connection error, {error}")
logger.fatal(f"[-] Udemy Says: Connection error, {error}")
time.sleep(0.8)
sys.exit(1)
else:
@ -671,7 +759,7 @@ class Udemy:
else:
resp = resp.json()
except conn_error as error:
logger.fatal(f"Udemy Says: Connection error, {error}")
logger.fatal(f"[-] Udemy Says: Connection error, {error}")
time.sleep(0.8)
sys.exit(1)
except (ValueError, Exception):
@ -680,12 +768,40 @@ class Udemy:
else:
return resp
def _extract_course_json_sub(self, selenium: Selenium, course_id: str, portal_name: str):
    """Fetch the course curriculum JSON through the selenium browser.

    Opens the ``COURSE_URL`` API page for the given portal and course, waits
    for the ``<pre>`` element holding the response, and returns its parsed
    JSON. Raises ``Exception`` when the page title contains "Attention"
    (treated as a Cloudflare captcha) or when no page text could be read;
    logs an error and exits the process when the parsed JSON is empty.
    """
    driver = selenium.driver
    driver.get(COURSE_URL.format(portal_name=portal_name, course_id=course_id))
    # TODO: actually wait for an element
    time.sleep(2)

    if "Attention" in driver.title:
        # cloudflare captcha, panic
        raise Exception("[-] Cloudflare captcha detected!")

    # wait for page load
    WebDriverWait(driver, 60).until(EC.visibility_of_element_located((By.TAG_NAME, "pre")))
    time.sleep(2)

    # TODO: determine if the course content is large

    # the response body is exposed as the text of the <pre> element
    page_text = driver.find_element(By.TAG_NAME, "pre").text
    if not (page_text and isinstance(page_text, str)):
        raise Exception("[-] Could not get page text!")

    page_json = json.loads(page_text)
    if not page_json:
        logger.error("[-] Failed to extract course json!")
        time.sleep(0.8)
        sys.exit(1)
    return page_json
def _extract_large_course_content(self, url):
url = url.replace("10000", "50") if url.endswith("10000") else url
try:
data = self.session._get(url).json()
except conn_error as error:
logger.fatal(f"Udemy Says: Connection error, {error}")
logger.fatal(f"[-] Udemy Says: Connection error, {error}")
time.sleep(0.8)
sys.exit(1)
else:
@ -695,7 +811,7 @@ class Udemy:
try:
resp = self.session._get(_next).json()
except conn_error as error:
logger.fatal(f"Udemy Says: Connection error, {error}")
logger.fatal(f"[-] Udemy Says: Connection error, {error}")
time.sleep(0.8)
sys.exit(1)
else:
@ -825,40 +941,20 @@ class Udemy:
results = webpage.get("results", [])
return results
def _extract_subscription_course_info(self, url):
course_html = self.session._get(url).text
soup = BeautifulSoup(course_html, "lxml")
data = soup.find("div", {"class": "ud-component--course-taking--app"})
if not data:
logger.fatal("Unable to extract arguments from course page! Make sure you have a cookies.txt file!")
self.session.terminate()
sys.exit(1)
data_args = data.attrs["data-module-args"]
data_json = json.loads(data_args)
course_id = data_json.get("courseId", None)
portal_name = self.extract_portal_name(url)
return course_id, portal_name
def _extract_course_info(self, url):
portal_name, course_name = self.extract_course_name(url)
course = {}
if not is_subscription_course:
results = self._subscribed_courses(portal_name=portal_name, course_name=course_name)
results = self._subscribed_courses(portal_name=portal_name, course_name=course_name)
course = self._extract_course(response=results, course_name=course_name)
if not course:
results = self._my_courses(portal_name=portal_name)
course = self._extract_course(response=results, course_name=course_name)
if not course:
results = self._subscribed_collection_courses(portal_name=portal_name)
course = self._extract_course(response=results, course_name=course_name)
if not course:
results = self._archived_courses(portal_name=portal_name)
course = self._extract_course(response=results, course_name=course_name)
if not course:
results = self._my_courses(portal_name=portal_name)
course = self._extract_course(response=results, course_name=course_name)
if not course:
results = self._subscribed_collection_courses(portal_name=portal_name)
course = self._extract_course(response=results, course_name=course_name)
if not course:
results = self._archived_courses(portal_name=portal_name)
course = self._extract_course(response=results, course_name=course_name)
if not course or is_subscription_course:
course_id, portal_name = self._extract_subscription_course_info(url)
course = self._extract_course_info_json(url, course_id, portal_name)
if course:
course.update({"portal_name": portal_name})
@ -979,7 +1075,92 @@ class Udemy:
}
return lecture
def _selenium_login(self, selenium: Selenium, portal_name: str):
    """Log in through the portal's login page using the module-level credentials.

    Loads ``LOGIN_URL`` for ``portal_name``, types the global ``username`` and
    ``password`` into the form with ``slow_type``, submits it, and waits (up to
    60 seconds per wait) for the Udemy home page title to appear.
    """
    driver = selenium.driver

    def _click_and_type(field, value):
        # focus the field with a real mouse move + click, empty it, then type
        ActionChains(driver).move_to_element(field).click().perform()
        field.clear()
        slow_type(field, value)

    # go to the login page
    driver.get(LOGIN_URL.format(portal_name=portal_name))

    # wait for the page to load: the email field must be present
    WebDriverWait(driver, 60).until(EC.presence_of_element_located((By.NAME, "email")))

    # locate the email field, the password field, and the submit button
    email_field = driver.find_element(By.NAME, "email")
    password_field = driver.find_element(By.NAME, "password")
    submit_button = driver.find_element(By.XPATH, '//*[@id="udemy"]/div[1]/div[2]/div/div/form/button')

    _click_and_type(email_field, username)
    _click_and_type(password_field, password)

    # click the submit button
    ActionChains(driver).move_to_element(submit_button).click().perform()

    # TODO: handle failed logins

    # wait for the post-login page to load
    WebDriverWait(driver, 60).until(EC.title_contains("Online Courses - Learn Anything, On Your Schedule | Udemy"))
def _extract_course_info_sub(self, selenium: Selenium, course_url: str):
    """
    Extract course information for subscription based courses using selenium.

    Opens the portal home page, logs in with the module-level ``username`` /
    ``password`` if the browser session is not authenticated, reads the course
    id from the course page, then loads the ``COURSE_INFO_URL`` JSON page.

    Returns a ``(course_id, course)`` tuple where ``course`` is the parsed
    course info JSON with ``portal_name`` added. Raises ``Exception`` when a
    Cloudflare captcha page is detected or the JSON page has no text; exits
    the process when a login is needed but no credentials are available.
    """
    driver = selenium.driver
    portal_name = self.extract_portal_name(course_url)
    portal_url = PORTAL_HOME.format(portal_name=portal_name)
    driver.get(portal_url)

    # wait for the page to load
    WebDriverWait(driver, 60).until(EC.title_contains("Online Courses - Learn Anything, On Your Schedule | Udemy"))

    # we need to check if we are logged in or not
    is_authenticated = driver.execute_script("return window.UD.me.is_authenticated")
    logger.debug("Is Authenticated: %s", is_authenticated)

    if not is_authenticated:
        if not username or not password:
            logger.fatal("Username or password not provided, cannot continue")
            driver.quit()
            sys.exit(1)
        self._selenium_login(selenium, portal_name)

    # go to the course page
    driver.get(course_url)

    def _course_app_or_captcha(drv):
        # `cond_a or cond_b` on two expected-condition objects always evaluates
        # to cond_a, so both checks are done inside one wait predicate instead.
        # find_elements returns an empty list rather than raising when absent.
        if drv.find_elements(By.CLASS_NAME, "ud-component--course-taking--app"):
            return True
        return "Attention" in drv.title

    # wait for either the course app element or a title containing Attention (cloudflare captcha)
    WebDriverWait(driver, 60).until(_course_app_or_captcha)

    # check if we get a cloudflare captcha
    if "Attention" in driver.title:
        # cloudflare captcha, panic
        raise Exception("Cloudflare captcha detected!")

    # get the course app element
    data = driver.find_element(By.CLASS_NAME, "ud-component--course-taking--app")
    # extract the course data attribute
    data_args = data.get_attribute("data-module-args")
    # NOTE(review): this only strips a bare `quot;`; confirm whether the
    # attribute can ever still contain HTML-escaped quotes at this point.
    data_args = data_args.replace("quot;", '"')
    data_json = json.loads(data_args)
    course_id = data_json.get("courseId", None)

    # go to the course info json page
    course_url = COURSE_INFO_URL.format(portal_name=portal_name, course_id=course_id)
    driver.get(course_url)

    # wait for pre tag
    WebDriverWait(driver, 60).until(EC.visibility_of_element_located((By.TAG_NAME, "pre")))

    # get the text from the page
    page_text = driver.find_element(By.TAG_NAME, "pre").text
    if not page_text or not isinstance(page_text, str):
        raise Exception("[-] Could not get page text!")
    course = json.loads(page_text)
    course.update({"portal_name": portal_name})
    return course_id, course
class Session(object):
def __init__(self):
self._headers = HEADERS
@ -994,7 +1175,6 @@ class Session(object):
def _set_auth_headers(self, bearer_token=""):
self._headers["Authorization"] = "Bearer {}".format(bearer_token)
self._headers["X-Udemy-Authorization"] = "Bearer {}".format(bearer_token)
self._headers["Cookie"] = cookies
def _get(self, url):
for i in range(10):
@ -1002,8 +1182,9 @@ class Session(object):
if session.ok or session.status_code in [502, 503]:
return session
if not session.ok:
logger.error("Failed request " + url)
logger.error(f"{session.status_code} {session.reason}, retrying (attempt {i} )...")
logger.error(f"[-] Failed request: {url}")
logger.debug(session.text)
logger.error(f"[-] {session.status_code} {session.reason}, retrying (attempt {i} )...")
time.sleep(0.8)
def _post(self, url, data, redirect=True):
@ -1118,7 +1299,7 @@ class UdemyAuth(object):
return self._session, bearer_token
else:
self._session._set_auth_headers()
return None, None
return self._session, None
def durationtoseconds(period):
@ -1139,7 +1320,7 @@ def durationtoseconds(period):
return total_time
else:
logger.error("Duration Format Error")
logger.error("[-] Duration Format Error")
return None
@ -1168,9 +1349,7 @@ def mux_process(video_title, video_filepath, audio_filepath, output_path):
transcode, video_filepath, audio_filepath, codec, h265_crf, h265_preset, video_title, output_path
)
else:
command = 'ffmpeg -y -i "{}" -i "{}" -c:v copy -c:a copy -fflags +bitexact -map_metadata -1 -metadata title="{}" "{}"'.format(
video_filepath, audio_filepath, video_title, output_path
)
command = 'ffmpeg -y -i "{}" -i "{}" -c:v copy -c:a copy -fflags +bitexact -map_metadata -1 -metadata title="{}" "{}"'.format(video_filepath, audio_filepath, video_title, output_path)
else:
if use_h265:
command = 'nice -n 7 ffmpeg {} -y -i "{}" -i "{}" -c:v libx265 -vtag hvc1 -crf {} -preset {} -c:a copy -fflags +bitexact -map_metadata -1 -metadata title="{}" "{}"'.format(
@ -1195,7 +1374,7 @@ def decrypt(kid, in_filepath, out_filepath):
try:
key = keys[kid.lower()]
except KeyError:
raise KeyError("Key not found")
raise KeyError("[-] Key not found")
if os.name == "nt":
command = f'shaka-packager --enable_raw_key_decryption --keys key_id={kid}:key={key} input="{in_filepath}",stream_selector="0",output="{out_filepath}"'
@ -1323,22 +1502,23 @@ def handle_segments(url, format_id, video_title, output_path, lecture_file_name,
ret_code = process.wait()
logger.info("> Lecture Tracks Downloaded")
logger.debug("[-] Return code: " + str(ret_code))
if ret_code != 0:
logger.warning("Return code from the downloader was non-0 (error), skipping!")
logger.warning("[-] Return code from the downloader was non-0 (error), skipping!")
return
try:
video_kid = extract_kid(video_filepath_enc)
logger.info("KID for video file is: " + video_kid)
except Exception:
logger.exception(f"Error extracting video kid")
logger.exception(f"[-] Error extracting video kid")
return
try:
audio_kid = extract_kid(audio_filepath_enc)
logger.info("KID for audio file is: " + audio_kid)
except Exception:
logger.exception(f"Error extracting audio kid")
logger.exception(f"[-] Error extracting audio kid")
return
try:
@ -1365,7 +1545,7 @@ def handle_segments(url, format_id, video_title, output_path, lecture_file_name,
os.remove(video_filepath_dec)
os.remove(audio_filepath_dec)
except Exception:
logger.exception(f"Error: ")
logger.exception(f"[-] Error: ")
finally:
os.chdir(HOME_DIR)
# if the url is a file url, we need to remove the file after we're done with it
@ -1509,7 +1689,18 @@ def process_lecture(lecture, lecture_path, lecture_file_name, chapter_dir):
source_type = source.get("type")
if source_type == "hls":
temp_filepath = lecture_path.replace(".mp4", ".%(ext)s")
cmd = ["yt-dlp", "--enable-file-urls", "--force-generic-extractor", "--concurrent-fragments", f"{concurrent_downloads}", "--downloader", "aria2c", "-o", f"{temp_filepath}", f"{url}"]
cmd = [
"yt-dlp",
"--enable-file-urls",
"--force-generic-extractor",
"--concurrent-fragments",
f"{concurrent_downloads}",
"--downloader",
"aria2c",
"-o",
f"{temp_filepath}",
f"{url}",
]
if disable_ipv6:
cmd.append("--downloader-args")
cmd.append('aria2c:"--disable-ipv6"')
@ -1743,31 +1934,44 @@ def main():
if save_to_file:
logger.info("> 'save_to_file' was specified, data will be saved to json files")
load_dotenv()
if bearer_token:
bearer_token = bearer_token
else:
bearer_token = os.getenv("UDEMY_BEARER")
udemy = Udemy(bearer_token)
if is_subscription_course:
selenium = Selenium()
logger.info("> Fetching course information, this may take a minute...")
if not load_from_file:
course_id, course_info = udemy._extract_course_info(course_url)
if is_subscription_course:
logger.info("> Fetching course information as a subscription course, this may take a minute...")
course_id, course_info = udemy._extract_course_info_sub(selenium, course_url)
else:
logger.info("> Fetching course information, this may take a minute...")
course_id, course_info = udemy._extract_course_info(course_url)
logger.info("> Course information retrieved!")
if course_info and isinstance(course_info, dict):
title = sanitize_filename(course_info.get("title"))
course_title = course_info.get("published_title")
portal_name = course_info.get("portal_name")
logger.info("> Fetching course content, this may take a minute...")
if load_from_file:
logger.info("> Fetching course content, this may take a minute...")
if is_subscription_course:
# add some delay before switching pages to try and avoid captchas
delay = random.randint(1, 5)
time.sleep(delay)
course_json = udemy._extract_course_json_sub(selenium, course_id, portal_name)
else:
course_json = udemy._extract_course_json(course_url, course_id, portal_name)
else:
logger.info("> Loading cached course content, this may take a minute...")
course_json = json.loads(open(os.path.join(os.getcwd(), "saved", "course_content.json"), encoding="utf8", mode="r").read())
title = course_json.get("title")
course_title = course_json.get("published_title")
portal_name = course_json.get("portal_name")
else:
course_json = udemy._extract_course_json(course_url, course_id, portal_name)
# close selenium if it's running
if selenium:
selenium.driver.quit()
if save_to_file:
with open(os.path.join(os.getcwd(), "saved", "course_content.json"), encoding="utf8", mode="w") as f:
f.write(json.dumps(course_json))

View File

@ -1,7 +1,6 @@
mpegdash
tqdm
requests
python-dotenv
protobuf==3.20.0
webvtt-py
pysrt
@ -15,3 +14,6 @@ lxml
six
pathvalidate
coloredlogs
toml
selenium
undetected-chromedriver

View File

@ -1,8 +1,14 @@
import mp4parse
import codecs
import widevine_pssh_pb2
import base64
import codecs
import os
import random
import time
from selenium.webdriver.remote.webelement import WebElement
import mp4parse
import widevine_pssh_pb2
def extract_kid(mp4_file):
"""
@ -32,4 +38,11 @@ def extract_kid(mp4_file):
return content_id.decode("utf-8")
# No Moof or PSSH header found
return None
return None
def slow_type(element: WebElement, text: str):
    """Send *text* to *element* one character at a time with random pauses.

    After each character the function sleeps for a random 0.1-0.5 seconds
    (in tenth-of-a-second steps). Returns ``None``.
    """
    for key in text:
        element.send_keys(key)
        # randint(1, 5) / 10 -> one of 0.1, 0.2, 0.3, 0.4, 0.5 seconds
        time.sleep(random.randint(1, 5) / 10)