diff --git a/README.md b/README.md index aeac2a3..03236a3 100644 --- a/README.md +++ b/README.md @@ -10,8 +10,8 @@ The python package `browser_cookie3` doesn't seem to be functional out of the bo # Setup * `git clone https://gitea.quinten0508.com/quinten/npo` * `cd npo` +* Download [N_m3u8DL-RE](https://github.com/nilaoda/N_m3u8DL-RE) and [mp4decrypt](https://www.bento4.com/downloads/) and put `N_m3u8DL-RE.exe` and `mp4decrypt.exe` in the root project folder * Download [`/cdm/wks.py`](https://github.com/SASUKE-DUCK/pywks/blob/main/cdm/wks.py) and put it in an empty `/cdm` folder within the `npo` folder -* Download [N_m3u8DL-RE](https://github.com/nilaoda/N_m3u8DL-RE) and [mp4decrypt](https://www.bento4.com/downloads/) * Add your own extracted android keys in `cdm/devices/android_generic/` (you can use [KeyDive](https://cdm-project.com/Android-Tools/KeyDive) or [possibly this guide](https://forum.videohelp.com/threads/408031-Dumping-Your-own-L3-CDM-with-Android-Studio) to extract them): * `device_client_id_blob` * `device_private_key` diff --git a/npo all-in-one.py b/npo all-in-one.py index 3dbe927..578aaa9 100644 --- a/npo all-in-one.py +++ b/npo all-in-one.py @@ -4,15 +4,17 @@ # * pip install -r requirements.txt -import argparse -import requests -import subprocess -import os -from bs4 import BeautifulSoup -import json +from datetime import datetime # unix timestamps from content published dates +import sys # proper process exiting if you messed up! +import argparse # your -url and -file options +import requests # sending web requests +import subprocess # multiprocessing +import os # file operations +import re # regex for filename sanitizing so it'll actually save (thanks "Wie is de Mol? 
Belgiƫ 2025" - question marks are not allowed) +from unidecode import unidecode # see above import platform # check for windows OS import shutil # check for ffmpeg in PATH -import browser_cookie3 # cookies for premium accs +import rookiepy # replaced browser_cookie3 with rookiepy from fake_useragent import UserAgent # sets useragent import concurrent.futures # concurrent downloads when using a -file from cdm.wks import WvDecrypt, device_android_generic, PsshExtractor, KeyExtractor @@ -20,16 +22,16 @@ from cdm.wks import WvDecrypt, device_android_generic, PsshExtractor, KeyExtract # dont need any of these headers but makes it look like normal clients at least # for extra "normal behavior": save the UA chosen here in some temp file so we can use the same one every time this utility is run headers = { - 'User-Agent': UserAgent(platforms='pc', min_version=122.0).random, - 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8', - 'Accept-Language': 'en-US,en;q=0.5', - 'Cache-Control': 'no-cache', + 'User-Agent': UserAgent(platforms='pc', min_version=122.0).random, + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8', + 'Accept-Language': 'en-US,en;q=0.5', + 'Cache-Control': 'no-cache', } if platform.system() == "Windows": - windows_flag = True + windows_flag = True else: - windows_flag = False + windows_flag = False parser = argparse.ArgumentParser(description='PYWKS-NPO') @@ -39,248 +41,280 @@ args = parser.parse_args() def parse_url_file(file_path): - with open(file_path, 'r') as file: - urls = [line.strip() for line in file] - return urls + with open(file_path, 'r') as file: + urls = [line.strip() for line in file] + return urls if args.file and args.url: - print("ERR: Please specify just one argument.") - print("-url: input NPO video URL") - print("-file: input a file with NPO video URLS, one per line") - exit() + print("ERR: Please specify just one argument.") + print("-url: 
input NPO video URL") + print("-file: input a file with NPO video URLS, one per line") + exit() elif args.file: - urls = parse_url_file(args.file) + urls = parse_url_file(args.file) elif args.url: - urls = [args.url] + urls = [args.url] else: - print("ERR: Please input your URL(s).") - print("-url: input NPO video URL") - print("-file: input a file with NPO video URLS, one per line") - exit() + print("ERR: Please input your URL(s).") + print("-url: input NPO video URL") + print("-file: input a file with NPO video URLS, one per line") + exit() def find_cookies(): - print("NPO Plus subscribers are able to download in 1080p instead of 540p.") - print("Are you an NPO Plus subscriber and logged in on your browser? (y/N)") - userinput = input().lower() - print("\033[F\033[K\033[F\033[K\033[F\033[K") - if not userinput or userinput.lower() != 'y': - return + print("NPO Plus subscribers are able to download in 1080p instead of 540p.") + print("Are you an NPO Plus subscriber and logged in on your browser? (y/N)") + userinput = input().lower() + print("\033[F\033[K\033[F\033[K\033[F\033[K") + if not userinput or userinput.lower() != 'y': + return -# browser_cookie3.load() should use ALL browsers' cookies. If this doesn't work, replace browser_cookie3.load with browser_cookie3.. -# See notes at the end of this script for possible options. Example: browser_cookie3.chrome or browser_cookie3.librewolf. - cookies = browser_cookie3.load(domain_name='npo.nl') - return cookies +# Now using rookie instead of browser_cookie3, which supports a TON of browsers and works with chromium again. 
+# check here for compatibility https://github.com/thewh1teagle/rookie?tab=readme-ov-file#contribute- + cookies = rookiepy.load(["npo.nl"]) + cookies = rookiepy.to_cookiejar(cookies) + return cookies -def find_targetId(url): - # Get full HTML and extract productId and episode number - # "future proof" :) - response_targetId = requests.get(url) - content = response_targetId.content +def find_content_type(url): + content_type = url.split("/")[4] # 'video' or 'serie' + return content_type - try: - url_split = url.split("/") - target_slug = url_split[7] - except: - print("URL invalid.") - print("URL format: https://npo.nl/start/serie/wie-is-de-mol/seizoen-24/wie-is-de-mol_56/afspelen") - print(f"Your URL: {url}") - exit() +def find_content_info(url, content_type): - soup = BeautifulSoup(content, 'html.parser') - script_tag = soup.find('script', {'id': '__NEXT_DATA__'}) + if content_type == 'serie': + # url safetycheck - no way for me to grab the "latest" video from a series without reverse engineering about a megabyte of minified js aafaik :( + if len(url.split("/")) < 8: + print("\n\nERROR: URL invalid!\n" \ + "You are currently on the homepage of whatever series it is you want to download, not on the episode-specific page.\n" \ + "Please click on the episode you want to download so your url becomes something like ../serie//seizoen*/episode/...\n" \ + f"Your current url is: {url}") + sys.exit(1) - if script_tag: - script_content = script_tag.contents[0] - else: - print("Script tag not found.") + # grab "slug" from url - not my word this is what they call it + # with the found slug we can grab the productid which we need to make our second request + params = { + 'slug': url.split("/")[7] + } + response = requests.get('https://npo.nl/start/api/domain/program-detail', params=params) + data = response.json() + + content_info = { + 'seasonnumber': data.get('season', {}).get('seasonKey'), + 'episodetitle': data.get("title"), + 'episodenumber': data.get("programKey"), + } + 
# some shows have this set to `None`, do better NPO! + published_ts = data.get('publishedDateTime') + if published_ts is not None: + content_info['episodedate'] = datetime.fromtimestamp(published_ts).strftime("%Y-%m-%d") - def search(data, target_slug): - if isinstance(data, list): - for item in data: - result = search(item, target_slug) - if result: - return result - elif isinstance(data, dict): - for key, value in data.items(): - if key == "slug" and value == target_slug: - return data.get("productId"), data.get("programKey") - else: - result = search(value, target_slug) - if result: - return result - return None + elif content_type == 'video': + params = { + 'slug': url.split("/")[5] + } - data_dict = json.loads(script_content) - target_product_id = search(data_dict, target_slug) - return target_product_id + response = requests.get('https://npo.nl/start/api/domain/program-detail', params=params) + data = response.json() + + content_info = { + 'videotitle': data.get("title"), + } + + # some videos have this set to `None`, do better NPO! 
+ published_ts = data.get('publishedDateTime') + if published_ts is not None: + content_info['videodate'] = datetime.fromtimestamp(published_ts).strftime("%Y-%m-%d") -def find_CSRF(targetId, plus_cookie): - response_CSRF = requests.get('https://npo.nl/start/api/auth/session', headers=headers, cookies=plus_cookie) - response_cookies = response_CSRF.cookies.get_dict() - - json_productId = { - 'productId': targetId, - } - - url = f'https://npo.nl/start/api/domain/player-token' - response_token = requests.get(url, cookies=response_cookies, headers=headers, params=json_productId) - token = response_token.json()["jwt"] - return token + productid = data.get("productId") + return productid, content_info -def find_MPD(token, url, plus_cookie): - headers['Authorization'] = token +def find_token(productid, plus_cookie): + params = { + 'productId': productid, + } - json_auth = { - 'profileName': 'dash', - 'drmType': 'widevine', - 'referrerUrl': url, - } - response = requests.post('https://prod.npoplayer.nl/stream-link', headers=headers, json=json_auth, cookies=plus_cookie) - response_data = response.json() - stream_data = response_data.get('stream', {}) + response = requests.get('https://npo.nl/start/api/domain/player-token', params=params, cookies=plus_cookie) + token = response.json().get('jwt') + return token - if stream_data.get('streamURL'): - return stream_data - else: - print("NO MPD URL - BAD TOKEN") - print(response_data) - exit() + + + +def find_MPD(token, url): + headers = { + 'Authorization': token + } + + json_data = { + 'profileName': 'dash', + 'drmType': 'widevine', + 'referrerUrl': url + } + response = requests.post('https://prod.npoplayer.nl/stream-link', headers=headers, json=json_data) + + response_data = response.json() + stream_data = response_data.get('stream', {}) + + if stream_data.get('streamURL'): + return stream_data + else: + print("NO MPD URL - BAD TOKEN") + print(response_data) + print(stream_data.get('streamURL')) def find_PSSH(mpd): - mpd_url = 
mpd.get('streamURL') + mpd_url = mpd.get('streamURL') - response = requests.get(mpd_url, headers=headers) - pssh_extractor = PsshExtractor(response.text) - pssh_value = pssh_extractor.extract_pssh() - return pssh_value, mpd_url + response = requests.get(mpd_url, headers=headers) + pssh_extractor = PsshExtractor(response.text) + pssh_value = pssh_extractor.extract_pssh() + return pssh_value, mpd_url def find_key(mpd, pssh): - headers_license = { - 'x-custom-data': mpd.get('drmToken'), - 'origin': 'https://start-player.npo.nl', - 'referer': 'https://start-player.npo.nl/', - } + headers_license = { + 'x-custom-data': mpd.get('drmToken'), + 'origin': 'https://start-player.npo.nl', + 'referer': 'https://start-player.npo.nl/', + } - cert_b64 = None - key_extractor = KeyExtractor(pssh, cert_b64, "https://npo-drm-gateway.samgcloud.nepworldwide.nl/authentication", headers_license) - keys = key_extractor.get_keys() - wvdecrypt = WvDecrypt(init_data_b64=pssh, cert_data_b64=cert_b64, device=device_android_generic) - raw_challenge = wvdecrypt.get_challenge() - data = raw_challenge - for key in keys: - if isinstance(key, list): - if key: - for key_str in key: - return key_str + cert_b64 = None + key_extractor = KeyExtractor(pssh, cert_b64, "https://npo-drm-gateway.samgcloud.nepworldwide.nl/authentication", headers_license) + keys = key_extractor.get_keys() + wvdecrypt = WvDecrypt(init_data_b64=pssh, cert_data_b64=cert_b64, device=device_android_generic) + raw_challenge = wvdecrypt.get_challenge() + data = raw_challenge + for key in keys: + if isinstance(key, list): + if key: + for key_str in key: + return key_str def check_prereq(): - if windows_flag == True: - prereq_filelist = ['mp4decrypt.exe', 'N_m3u8DL-RE.exe'] - else: - prereq_filelist = ['mp4decrypt', 'N_m3u8DL-RE'] + if windows_flag == True: + prereq_filelist = ['mp4decrypt.exe', 'N_m3u8DL-RE.exe'] + else: + prereq_filelist = ['mp4decrypt', 'N_m3u8DL-RE'] - for file in prereq_filelist: - if not os.path.isfile(file): - 
print(f"ERR: {file} not found!") - print("Please check your directory and try again.") - exit() - if shutil.which("ffmpeg") is None: - print("ffmpeg not found in PATH.") - exit() + for file in prereq_filelist: + if not os.path.isfile(file): + print(f"ERR: {file} not found!") + print("Please check your directory and try again.") + sys.exit(1) + if shutil.which("ffmpeg") is None: + print("ffmpeg not found in PATH.") + sys.exit(1) -def create_filename(url, programKey): -# season title -# 1 2 3 4 5 6 7 8 (optional) -# create filename based on input URL: https://npo.nl/start/serie /wie-is-de-mol /seizoen-24 /wie-is-de-mol_56 /afspelen -# https://npo.nl/start/serie /de-avondshow-met-arjen-lubach /seizoen-8_1 /de-avondshow-met-arjen-lubach_93 /afspelen -# https://npo.nl/start/serie /taarten-van-abel /seizoen-17 /joto /afspelen - url_split = url.split("/") - title = url_split[7].split("_")[0] - season = url_split[6].split("_")[0] - filename_enc = title + "_" + season + "_ep-" + programKey + "_encrypted" - filename = filename_enc.replace("_encrypted", "") - return filename_enc, filename +def create_filename(url, content_info, content_type): + if content_type == 'serie': + # grab slug from url + url_split = url.split("/") + seasontitle = url_split[5].split("_")[0] -def download(mpd_url, filename_enc, productId, filename): + filename = f"{seasontitle}_S{content_info['seasonnumber']}E{content_info['episodenumber']}_{content_info['episodetitle']}" + if 'episodedate' in content_info: + filename += f"_{content_info['episodedate']}" + + elif content_type == 'video': + filename = f"{content_info['videotitle']}" + if 'videodate' in content_info: + filename += f"_{content_info['videodate']}" + + + # unidecode converts unicode to ascii (e.g. 
removes accents on characters) + # "takes a string object, possibly containing non-ASCII characters, and returns a string that can be safely encoded to ASCII" + filename = unidecode(filename).replace(' ', '_') + # remove everything not a-z, A-Z, 0-9, -, _ + filename = re.sub(r'[^a-zA-Z0-9\-_]', '', filename) + filename_enc = f"{filename}_encrypted" + + return filename_enc, filename + + + +def download(mpd_url, filename_enc, productid, filename): # output: filename.m4a (audio), filename.mp4 (video), filename.vtt (subtitles) - subtitle_url = f'https://cdn.npoplayer.nl/subtitles/nl/{productId}.vtt' - response = requests.get(subtitle_url) - with open(f"{filename}.vtt", 'wb') as subtitle_file: - subtitle_file.write(response.content) - if windows_flag == True: - subprocess.run(['N_m3u8DL-RE.exe', '--auto-select', '--no-log', '--save-name', filename_enc, mpd_url], stdout=subprocess.DEVNULL) - else: - subprocess.run(['N_m3u8DL-RE', '--auto-select', '--no-log', '--save-name', filename_enc, mpd_url], stdout=subprocess.DEVNULL) + subtitle_url = f'https://cdn.npoplayer.nl/subtitles/nl/{productid}.vtt' + response = requests.get(subtitle_url) + with open(f"{filename}.vtt", 'wb') as subtitle_file: + subtitle_file.write(response.content) + if windows_flag == True: + subprocess.run(['N_m3u8DL-RE.exe', '--auto-select', '--no-log', '--save-name', filename_enc, mpd_url], stdout=subprocess.DEVNULL) + else: + subprocess.run(['N_m3u8DL-RE', '--auto-select', '--no-log', '--save-name', filename_enc, mpd_url], stdout=subprocess.DEVNULL) def decrypt(key, filename_enc, filename): - if windows_flag == True: - subprocess.run(['mp4decrypt.exe', '--key', key, str(filename_enc + ".mp4"), str(filename + "_video.mp4")], stdout=subprocess.DEVNULL) - subprocess.run(['mp4decrypt.exe', '--key', key, str(filename_enc + ".m4a"), str(filename + "_audio.m4a")], stdout=subprocess.DEVNULL) - else: - subprocess.run(['mp4decrypt', '--key', key, str(filename_enc + ".mp4"), str(filename + "_video.mp4")], 
stdout=subprocess.DEVNULL) - subprocess.run(['mp4decrypt', '--key', key, str(filename_enc + ".m4a"), str(filename + "_audio.m4a")], stdout=subprocess.DEVNULL) + if windows_flag == True: + subprocess.run(['mp4decrypt.exe', '--key', key, str(filename_enc + ".mp4"), str(filename + "_video.mp4")], stdout=subprocess.DEVNULL) + subprocess.run(['mp4decrypt.exe', '--key', key, str(filename_enc + ".m4a"), str(filename + "_audio.m4a")], stdout=subprocess.DEVNULL) + else: + subprocess.run(['mp4decrypt', '--key', key, str(filename_enc + ".mp4"), str(filename + "_video.mp4")], stdout=subprocess.DEVNULL) + subprocess.run(['mp4decrypt', '--key', key, str(filename_enc + ".m4a"), str(filename + "_audio.m4a")], stdout=subprocess.DEVNULL) def merge(filename): - ffmpeg_command = [ - 'ffmpeg', '-v', 'quiet', # '-v stats', - '-i', filename + "_video.mp4", - '-i', filename + "_audio.m4a", - '-i', filename + ".vtt", # Subtitle file - '-c:v', 'copy', # Copy video codec - '-c:a', 'copy', # Copy audio codec - '-c:s', 'mov_text', # Subtitle codec for MP4 - '-map', '0:v:0', # Map video stream - '-map', '1:a:0', # Map audio stream - '-map', '2:s:0', # Map subtitle stream - '-strict', 'experimental', - filename + ".mp4" - ] - - subprocess.run(ffmpeg_command) + ffmpeg_command = [ + 'ffmpeg', '-v', 'quiet', # '-v stats', + '-i', filename + "_video.mp4", + '-i', filename + "_audio.m4a", + '-i', filename + ".vtt", # Subtitle file (seems to be present on NPO's side even if it's empty / the content has no subs) + '-c:v', 'copy', # Copy video codec + '-c:a', 'copy', # Copy audio codec + '-c:s', 'mov_text', # Subtitle codec for MP4 + '-map', '0:v:0', # Map video stream + '-map', '1:a:0', # Map audio stream + '-map', '2:s:0', # Map subtitle stream + '-strict', 'experimental', + filename + ".mp4" + ] + + subprocess.run(ffmpeg_command) def clean(filename_enc, filename): - os.remove(filename_enc + ".mp4") - os.remove(filename_enc + ".m4a") - os.remove(filename + "_audio.m4a") - os.remove(filename + 
"_video.mp4") - os.remove(filename + ".vtt") + os.remove(filename_enc + ".mp4") + os.remove(filename_enc + ".m4a") + os.remove(filename + "_audio.m4a") + os.remove(filename + "_video.mp4") + os.remove(filename + ".vtt") def check_file(filename): - if not os.path.exists(filename + ".mp4"): - print("File not found. Continue anyway? (y/N)") - userinput = input().lower() - if not userinput or userinput != 'y': - exit() + if not os.path.exists(filename + ".mp4"): + print("File not found. Continue anyway? (y/N)") + userinput = input().lower() + if not userinput or userinput != 'y': + sys.exit(1) def execute(url, plus_cookie, process_no): - productId, programKey = find_targetId(url) - token = find_CSRF(productId,plus_cookie) - mpd = find_MPD(token, url, plus_cookie) - pssh, mpd_url = find_PSSH(mpd) - key = find_key(mpd, pssh) - check_prereq() - filename_enc, filename = create_filename(url, programKey) - download(mpd_url, filename_enc, productId, filename) - decrypt(key, filename_enc, filename) - merge(filename) - clean(filename_enc, filename) - check_file(filename) - return process_no # keeps track of process index to return x/y videos completed message + + content_type = find_content_type(url) + productid, content_info = find_content_info(url, content_type) + token = find_token(productid, plus_cookie) + mpd = find_MPD(token, url) + pssh, mpd_url = find_PSSH(mpd) + key = find_key(mpd, pssh) + check_prereq() + + + + filename_enc, filename = create_filename(url, content_info, content_type) + download(mpd_url, filename_enc, productid, filename) + decrypt(key, filename_enc, filename) + merge(filename) + clean(filename_enc, filename) + check_file(filename) + return process_no # keeps track of process index to return x/y videos completed message @@ -288,33 +322,28 @@ plus_cookie = find_cookies() max_workers = min(os.cpu_count(), len(urls)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = [executor.submit(execute, url, plus_cookie, i + 
1) for i, url in enumerate(urls)] + futures = [executor.submit(execute, url, plus_cookie, i + 1) for i, url in enumerate(urls)] + + completed_videos = 0 + print(f"0/{len(urls)} videos completed") + for future in concurrent.futures.as_completed(futures): + result = future.result() + completed_videos += 1 + print("\033[F\033[K\033[F\033[K") + print(f"{completed_videos}/{len(urls)} video{'s'[:len(urls) != 1]} completed") + - completed_videos = 0 - print(f"0/{len(urls)} videos completed") - for future in concurrent.futures.as_completed(futures): - result = future.result() - completed_videos += 1 - print("\033[F\033[K\033[F\033[K") - print(f"{completed_videos}/{len(urls)} video{'s'[:len(urls) != 1]} completed") ######### # NOTES # ######### -# The downloader *should* work across every platform, linux/mac/win. -# It has not been extensively tested on anything but windows. DM me if you need help :D -# Discord: quinten._. (That includes the ._.) +# The downloader *should* work across every platform, linux/mac/win. +# It has not been tested on anything but windows though. +# I've tried my best to comment the code, but I understand if it's messy and overwhelming. 
+# Most of the lines are either: +# a) getting relevant cookies/keys/urls by mimicking what your browser would do: getting an ID, using that to get a key, using that to get a URL, etc +# b) pre- and post-processing: creating nice filenames, extracting info for those filenames, downloading, decrypting, merging files, etc -# Supported browsers for NPO Plus cookies: -# (https://github.com/borisbabic/browser_cookie3#testing-dates--ddmmyy) -# * Chrome -# * Firefox -# * LibreWolf -# * Opera -# * Opera GX -# * Edge -# * Chromium -# * Brave -# * Vivaldi -# * Safari \ No newline at end of file +# However, don't spend hours rummaging through my code, just DM me if you need help :D +# Discord: wtquin \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 874f664..024adb5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,8 @@ -protobuf -bs4 -xmltodict -browser_cookie3 -requests -pycryptodomex -fake-useragent \ No newline at end of file +beautifulsoup4==4.13.4 +fake_useragent==2.2.0 +protobuf==6.30.2 +pycryptodomex==3.22.0 +Requests==2.32.3 +rookiepy==0.5.6 +unidecode==1.3.8 +xmltodict==0.14.2 \ No newline at end of file