# Pre-requisites: # * N_m3u8DL-RE and mp4decrypt in current directory # * ffmpeg in PATH # * pip install -r requirements.txt from datetime import datetime # unix timestamps from content published dates import sys # proper process exiting if you messed up! import argparse # your -url and -file options import requests # sending web requests import subprocess # multiprocessing import os # file operations import re # regex for filename sanitizing so it'll actually save (thanks "Wie is de Mol? Belgiƫ 2025" - question marks are not allowed) from unidecode import unidecode # see above import platform # check for windows OS import shutil # check for ffmpeg in PATH import rookiepy # replaced browser_cookie3 with rookiepy from fake_useragent import UserAgent # sets useragent import concurrent.futures # concurrent downloads when using a -file from cdm.wks import WvDecrypt, device_android_generic, PsshExtractor, KeyExtractor # dont need any of these headers but makes it look like normal clients at least # for extra "normal behavior": save the UA chosen here in some temp file so we can use the same one every time this utility is run headers = { 'User-Agent': UserAgent(platforms='pc', min_version=122.0).random, 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Cache-Control': 'no-cache', } if platform.system() == "Windows": windows_flag = True else: windows_flag = False parser = argparse.ArgumentParser(description='PYWKS-NPO') parser.add_argument('-url', dest='url', required=False, help='NPO Video URL') parser.add_argument('-file', dest='file', required=False, help='File with NPO Video URLs, one per line') args = parser.parse_args() def parse_url_file(file_path): with open(file_path, 'r') as file: urls = [line.strip() for line in file] return urls if args.file and args.url: print("ERR: Please specify just one argument.") print("-url: input NPO video URL") print("-file: input a file with NPO video URLS, one per line") exit() elif args.file: urls = parse_url_file(args.file) elif args.url: urls = [args.url] else: print("ERR: Please input your URL(s).") print("-url: input NPO video URL") print("-file: input a file with NPO video URLS, one per line") exit() def find_cookies(): print("NPO Plus subscribers are able to download in 1080p instead of 540p.") print("Are you an NPO Plus subscriber and logged in on your browser? (y/N)") userinput = input().lower() print("\033[F\033[K\033[F\033[K\033[F\033[K") if not userinput or userinput.lower() != 'y': return # Now using rookie instead of browser_cookie3, which supports a TON of browsers and works with chromium again. # check here for compatibility https://github.com/thewh1teagle/rookie?tab=readme-ov-file#contribute- cookies = rookiepy.load(["npo.nl"]) cookies = rookiepy.to_cookiejar(cookies) return cookies def find_content_type(url): content_type = url.split("/")[4] # 'video' or 'serie' return content_type def find_content_info(url, content_type): if content_type == 'serie': # url safetycheck - no way for me to grab the "latest" video from a series without reverse engineering about a megabyte of minified js aafaik :( if len(url.split("/")) < 8: print("\n\nERROR: URL invalid!\n" \ "You are currently on the homepage of whatever series it is you want to download, not on the episode-specific page.\n" \ "Please click on the episode you want to download so your url becomes something like ../serie//seizoen*/episode/...\n" \ f"Your current url is: {url}") sys.exit(1) # grab "slug" from url - not my word this is what they call it # with the found slug we can grab the productid which we need to make our second request params = { 'slug': url.split("/")[7] } response = requests.get('https://npo.nl/start/api/domain/program-detail', params=params) data = response.json() content_info = { 'seasonnumber': data.get('season', {}).get('seasonKey'), 'episodetitle': data.get("title"), 'episodenumber': data.get("programKey"), } # some shows have this set to `None`, do better NPO! published_ts = data.get('publishedDateTime') if published_ts is not None: content_info['episodedate'] = datetime.fromtimestamp(published_ts).strftime("%Y-%m-%d") elif content_type == 'video': params = { 'slug': url.split("/")[5] } response = requests.get('https://npo.nl/start/api/domain/program-detail', params=params) data = response.json() content_info = { 'videotitle': data.get("title"), } # some videos have this set to `None`, do better NPO! published_ts = data.get('publishedDateTime') if published_ts is not None: content_info['videodate'] = datetime.fromtimestamp(published_ts).strftime("%Y-%m-%d") productid = data.get("productId") return productid, content_info def find_token(productid, plus_cookie): params = { 'productId': productid, } response = requests.get('https://npo.nl/start/api/domain/player-token', params=params, cookies=plus_cookie) token = response.json().get('jwt') return token def find_MPD(token, url): headers = { 'Authorization': token } json_data = { 'profileName': 'dash', 'drmType': 'widevine', 'referrerUrl': url } response = requests.post('https://prod.npoplayer.nl/stream-link', headers=headers, json=json_data) response_data = response.json() stream_data = response_data.get('stream', {}) if stream_data.get('streamURL'): return stream_data else: print("NO MPD URL - BAD TOKEN") print(response_data) print(stream_data.get('streamURL')) def find_PSSH(mpd): mpd_url = mpd.get('streamURL') response = requests.get(mpd_url, headers=headers) pssh_extractor = PsshExtractor(response.text) pssh_value = pssh_extractor.extract_pssh() return pssh_value, mpd_url def find_key(mpd, pssh): headers_license = { 'x-custom-data': mpd.get('drmToken'), 'origin': 'https://start-player.npo.nl', 'referer': 'https://start-player.npo.nl/', } cert_b64 = None key_extractor = KeyExtractor(pssh, cert_b64, "https://npo-drm-gateway.samgcloud.nepworldwide.nl/authentication", headers_license) keys = key_extractor.get_keys() wvdecrypt = WvDecrypt(init_data_b64=pssh, cert_data_b64=cert_b64, device=device_android_generic) raw_challenge = wvdecrypt.get_challenge() data = raw_challenge for key in keys: if isinstance(key, list): if key: for key_str in key: return key_str def check_prereq(): if windows_flag == True: prereq_filelist = ['mp4decrypt.exe', 'N_m3u8DL-RE.exe'] else: prereq_filelist = ['mp4decrypt', 'N_m3u8DL-RE'] for file in prereq_filelist: if not os.path.isfile(file): print(f"ERR: {file} not found!") print("Please check your directory and try again.") sys.exit(1) if shutil.which("ffmpeg") is None: print("ffmpeg not found in PATH.") sys.exit(1) def create_filename(url, content_info, content_type): if content_type == 'serie': # grab slug from url url_split = url.split("/") seasontitle = url_split[5].split("_")[0] filename = f"{seasontitle}_S{content_info['seasonnumber']}E{content_info['episodenumber']}_{content_info['episodetitle']}" if 'episodedate' in content_info: filename += f"_{content_info['episodedate']}" elif content_type == 'video': filename = f"{content_info['videotitle']}" if 'videodate' in content_info: filename += f"_{content_info['videodate']}" # unidecode converts unicode to ascii (e.g. removes accents on characters) # "takes a string object, possibly containing non-ASCII characters, and returns a string that can be safely encoded to ASCII" filename = unidecode(filename).replace(' ', '_') # remove everything not a-z, A-Z, 0-9, -, _ filename = re.sub(r'[^a-zA-Z0-9\-_]', '', filename) filename_enc = f"{filename}_encrypted" return filename_enc, filename def download(mpd_url, filename_enc, productid, filename): # output: filename.m4a (audio), filename.mp4 (video), filename.vtt (subtitles) subtitle_url = f'https://cdn.npoplayer.nl/subtitles/nl/{productid}.vtt' response = requests.get(subtitle_url) with open(f"{filename}.vtt", 'wb') as subtitle_file: subtitle_file.write(response.content) if windows_flag == True: subprocess.run(['N_m3u8DL-RE.exe', '--auto-select', '--no-log', '--save-name', filename_enc, mpd_url], stdout=subprocess.DEVNULL) else: subprocess.run(['N_m3u8DL-RE', '--auto-select', '--no-log', '--save-name', filename_enc, mpd_url], stdout=subprocess.DEVNULL) def decrypt(key, filename_enc, filename): if windows_flag == True: subprocess.run(['mp4decrypt.exe', '--key', key, str(filename_enc + ".mp4"), str(filename + "_video.mp4")], stdout=subprocess.DEVNULL) subprocess.run(['mp4decrypt.exe', '--key', key, str(filename_enc + ".m4a"), str(filename + "_audio.m4a")], stdout=subprocess.DEVNULL) else: subprocess.run(['mp4decrypt', '--key', key, str(filename_enc + ".mp4"), str(filename + "_video.mp4")], stdout=subprocess.DEVNULL) subprocess.run(['mp4decrypt', '--key', key, str(filename_enc + ".m4a"), str(filename + "_audio.m4a")], stdout=subprocess.DEVNULL) def merge(filename): ffmpeg_command = [ 'ffmpeg', '-v', 'quiet', # '-v stats', '-i', filename + "_video.mp4", '-i', filename + "_audio.m4a", '-i', filename + ".vtt", # Subtitle file (seems to be present on NPO's side even if it's empty / the content has no subs) '-c:v', 'copy', # Copy video codec '-c:a', 'copy', # Copy audio codec '-c:s', 'mov_text', # Subtitle codec for MP4 '-map', '0:v:0', # Map video stream '-map', '1:a:0', # Map audio stream '-map', '2:s:0', # Map subtitle stream '-strict', 'experimental', filename + ".mp4" ] subprocess.run(ffmpeg_command) def clean(filename_enc, filename): os.remove(filename_enc + ".mp4") os.remove(filename_enc + ".m4a") os.remove(filename + "_audio.m4a") os.remove(filename + "_video.mp4") os.remove(filename + ".vtt") def check_file(filename): if not os.path.exists(filename + ".mp4"): print("File not found. Continue anyway? (y/N)") userinput = input().lower() if not userinput or userinput != 'y': sys.exit(1) def execute(url, plus_cookie, process_no): content_type = find_content_type(url) productid, content_info = find_content_info(url, content_type) token = find_token(productid, plus_cookie) mpd = find_MPD(token, url) pssh, mpd_url = find_PSSH(mpd) key = find_key(mpd, pssh) check_prereq() filename_enc, filename = create_filename(url, content_info, content_type) download(mpd_url, filename_enc, productid, filename) decrypt(key, filename_enc, filename) merge(filename) clean(filename_enc, filename) check_file(filename) return process_no # keeps track of process index to return x/y videos completed message plus_cookie = find_cookies() max_workers = min(os.cpu_count(), len(urls)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [executor.submit(execute, url, plus_cookie, i + 1) for i, url in enumerate(urls)] completed_videos = 0 print(f"0/{len(urls)} videos completed") for future in concurrent.futures.as_completed(futures): result = future.result() completed_videos += 1 print("\033[F\033[K\033[F\033[K") print(f"{completed_videos}/{len(urls)} video{'s'[:len(urls) != 1]} completed") ######### # NOTES # ######### # The downloader *should* work across every platform, linux/mac/win. # It has not been tested on anything but windows though. # I've tried my best to comment the code, but I understand if it's messy and overwhelming. # Most of the lines are either: # a) getting relevant cookies/keys/urls by mimicking what your browser would do: getting an ID, using that to get a key, using that to get a URl, etc # b) pre- and post processing: creating nice filenames, extracting info for those filenames, downloading, decrypting, merging files, etc # However, don't spend hours rummaging through my code, just DM me if you need help :D # Discord: wtquin