# fixed and improved!

* fixed cookies for chromium browsers by switching to rookie
* fixed productId which moved around since I coded this script up
* added support for movie-type content e.g. documentaries, which dont have seasons or episodes and are just a single video
* general code cleanup and probably some minor other things
This commit is contained in:
quinten 2025-04-17 02:18:04 +02:00
parent 5ff9d7f5cf
commit 8e46e9159b
3 changed files with 261 additions and 231 deletions

View File

@ -10,8 +10,8 @@ The python package `browser_cookie3` doesn't seem to be functional out of the bo
# Setup # Setup
* `git clone https://gitea.quinten0508.com/quinten/npo` * `git clone https://gitea.quinten0508.com/quinten/npo`
* `cd npo` * `cd npo`
* Download [N_m3u8DL-RE](https://github.com/nilaoda/N_m3u8DL-RE) and [mp4decrypt](https://www.bento4.com/downloads/) and put `N_m3u8DL-RE.exe` and `mp4decrypt.exe` in the root project folder
* Download [`/cdm/wks.py`](https://github.com/SASUKE-DUCK/pywks/blob/main/cdm/wks.py) and put it in an empty `/cdm` folder within the `npo` folder * Download [`/cdm/wks.py`](https://github.com/SASUKE-DUCK/pywks/blob/main/cdm/wks.py) and put it in an empty `/cdm` folder within the `npo` folder
* Download [N_m3u8DL-RE](https://github.com/nilaoda/N_m3u8DL-RE) and [mp4decrypt](https://www.bento4.com/downloads/)
* Add your own extracted android keys in `cdm/devices/android_generic/` (you can use [KeyDive](https://cdm-project.com/Android-Tools/KeyDive) or [possibly this guide](https://forum.videohelp.com/threads/408031-Dumping-Your-own-L3-CDM-with-Android-Studio) to extract them): * Add your own extracted android keys in `cdm/devices/android_generic/` (you can use [KeyDive](https://cdm-project.com/Android-Tools/KeyDive) or [possibly this guide](https://forum.videohelp.com/threads/408031-Dumping-Your-own-L3-CDM-with-Android-Studio) to extract them):
* `device_client_id_blob` * `device_client_id_blob`
* `device_private_key` * `device_private_key`

View File

@ -4,15 +4,17 @@
# * pip install -r requirements.txt # * pip install -r requirements.txt
import argparse from datetime import datetime # unix timestamps from content published dates
import requests import sys # proper process exiting if you messed up!
import subprocess import argparse # your -url and -file options
import os import requests # sending web requests
from bs4 import BeautifulSoup import subprocess # multiprocessing
import json import os # file operations
import re # regex for filename sanitizing so it'll actually save (thanks "Wie is de Mol? België 2025" - question marks are not allowed)
from unidecode import unidecode # see above
import platform # check for windows OS import platform # check for windows OS
import shutil # check for ffmpeg in PATH import shutil # check for ffmpeg in PATH
import browser_cookie3 # cookies for premium accs import rookiepy # replaced browser_cookie3 with rookiepy
from fake_useragent import UserAgent # sets useragent from fake_useragent import UserAgent # sets useragent
import concurrent.futures # concurrent downloads when using a -file import concurrent.futures # concurrent downloads when using a -file
from cdm.wks import WvDecrypt, device_android_generic, PsshExtractor, KeyExtractor from cdm.wks import WvDecrypt, device_android_generic, PsshExtractor, KeyExtractor
@ -20,16 +22,16 @@ from cdm.wks import WvDecrypt, device_android_generic, PsshExtractor, KeyExtract
# dont need any of these headers but makes it look like normal clients at least # dont need any of these headers but makes it look like normal clients at least
# for extra "normal behavior": save the UA chosen here in some temp file so we can use the same one every time this utility is run # for extra "normal behavior": save the UA chosen here in some temp file so we can use the same one every time this utility is run
headers = { headers = {
'User-Agent': UserAgent(platforms='pc', min_version=122.0).random, 'User-Agent': UserAgent(platforms='pc', min_version=122.0).random,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5', 'Accept-Language': 'en-US,en;q=0.5',
'Cache-Control': 'no-cache', 'Cache-Control': 'no-cache',
} }
if platform.system() == "Windows": if platform.system() == "Windows":
windows_flag = True windows_flag = True
else: else:
windows_flag = False windows_flag = False
parser = argparse.ArgumentParser(description='PYWKS-NPO') parser = argparse.ArgumentParser(description='PYWKS-NPO')
@ -39,248 +41,280 @@ args = parser.parse_args()
def parse_url_file(file_path): def parse_url_file(file_path):
with open(file_path, 'r') as file: with open(file_path, 'r') as file:
urls = [line.strip() for line in file] urls = [line.strip() for line in file]
return urls return urls
if args.file and args.url: if args.file and args.url:
print("ERR: Please specify just one argument.") print("ERR: Please specify just one argument.")
print("-url: input NPO video URL") print("-url: input NPO video URL")
print("-file: input a file with NPO video URLS, one per line") print("-file: input a file with NPO video URLS, one per line")
exit() exit()
elif args.file: elif args.file:
urls = parse_url_file(args.file) urls = parse_url_file(args.file)
elif args.url: elif args.url:
urls = [args.url] urls = [args.url]
else: else:
print("ERR: Please input your URL(s).") print("ERR: Please input your URL(s).")
print("-url: input NPO video URL") print("-url: input NPO video URL")
print("-file: input a file with NPO video URLS, one per line") print("-file: input a file with NPO video URLS, one per line")
exit() exit()
def find_cookies(): def find_cookies():
print("NPO Plus subscribers are able to download in 1080p instead of 540p.") print("NPO Plus subscribers are able to download in 1080p instead of 540p.")
print("Are you an NPO Plus subscriber and logged in on your browser? (y/N)") print("Are you an NPO Plus subscriber and logged in on your browser? (y/N)")
userinput = input().lower() userinput = input().lower()
print("\033[F\033[K\033[F\033[K\033[F\033[K") print("\033[F\033[K\033[F\033[K\033[F\033[K")
if not userinput or userinput.lower() != 'y': if not userinput or userinput.lower() != 'y':
return return
# browser_cookie3.load() should use ALL browsers' cookies. If this doesn't work, replace browser_cookie3.load with browser_cookie3.<browser>. # Now using rookie instead of browser_cookie3, which supports a TON of browsers and works with chromium again.
# See notes at the end of this script for possible options. Example: browser_cookie3.chrome or browser_cookie3.librewolf. # check here for compatibility https://github.com/thewh1teagle/rookie?tab=readme-ov-file#contribute-
cookies = browser_cookie3.load(domain_name='npo.nl') cookies = rookiepy.load(["npo.nl"])
return cookies cookies = rookiepy.to_cookiejar(cookies)
return cookies
def find_targetId(url): def find_content_type(url):
# Get full HTML and extract productId and episode number content_type = url.split("/")[4] # 'video' or 'serie'
# "future proof" :) return content_type
response_targetId = requests.get(url)
content = response_targetId.content
try: def find_content_info(url, content_type):
url_split = url.split("/")
target_slug = url_split[7]
except:
print("URL invalid.")
print("URL format: https://npo.nl/start/serie/wie-is-de-mol/seizoen-24/wie-is-de-mol_56/afspelen")
print(f"Your URL: {url}")
exit()
soup = BeautifulSoup(content, 'html.parser') if content_type == 'serie':
script_tag = soup.find('script', {'id': '__NEXT_DATA__'}) # url safetycheck - no way for me to grab the "latest" video from a series without reverse engineering about a megabyte of minified js aafaik :(
if len(url.split("/")) < 8:
print("\n\nERROR: URL invalid!\n" \
"You are currently on the homepage of whatever series it is you want to download, not on the episode-specific page.\n" \
"Please click on the episode you want to download so your url becomes something like ../serie/<serie>/seizoen*/episode/...\n" \
f"Your current url is: {url}")
sys.exit(1)
if script_tag: # grab "slug" from url - not my word this is what they call it
script_content = script_tag.contents[0] # with the found slug we can grab the productid which we need to make our second request
else: params = {
print("Script tag not found.") 'slug': url.split("/")[7]
}
response = requests.get('https://npo.nl/start/api/domain/program-detail', params=params)
data = response.json()
content_info = {
'seasonnumber': data.get('season', {}).get('seasonKey'),
'episodetitle': data.get("title"),
'episodenumber': data.get("programKey"),
}
# some shows have this set to `None`, do better NPO!
published_ts = data.get('publishedDateTime')
if published_ts is not None:
content_info['episodedate'] = datetime.fromtimestamp(published_ts).strftime("%Y-%m-%d")
def search(data, target_slug): elif content_type == 'video':
if isinstance(data, list): params = {
for item in data: 'slug': url.split("/")[5]
result = search(item, target_slug) }
if result:
return result
elif isinstance(data, dict):
for key, value in data.items():
if key == "slug" and value == target_slug:
return data.get("productId"), data.get("programKey")
else:
result = search(value, target_slug)
if result:
return result
return None
data_dict = json.loads(script_content) response = requests.get('https://npo.nl/start/api/domain/program-detail', params=params)
target_product_id = search(data_dict, target_slug) data = response.json()
return target_product_id
content_info = {
'videotitle': data.get("title"),
}
# some videos have this set to `None`, do better NPO!
published_ts = data.get('publishedDateTime')
if published_ts is not None:
content_info['videodate'] = datetime.fromtimestamp(published_ts).strftime("%Y-%m-%d")
def find_CSRF(targetId, plus_cookie): productid = data.get("productId")
response_CSRF = requests.get('https://npo.nl/start/api/auth/session', headers=headers, cookies=plus_cookie) return productid, content_info
response_cookies = response_CSRF.cookies.get_dict()
json_productId = {
'productId': targetId,
}
url = f'https://npo.nl/start/api/domain/player-token'
response_token = requests.get(url, cookies=response_cookies, headers=headers, params=json_productId)
token = response_token.json()["jwt"]
return token
def find_MPD(token, url, plus_cookie): def find_token(productid, plus_cookie):
headers['Authorization'] = token params = {
'productId': productid,
}
json_auth = { response = requests.get('https://npo.nl/start/api/domain/player-token', params=params, cookies=plus_cookie)
'profileName': 'dash', token = response.json().get('jwt')
'drmType': 'widevine', return token
'referrerUrl': url,
}
response = requests.post('https://prod.npoplayer.nl/stream-link', headers=headers, json=json_auth, cookies=plus_cookie)
response_data = response.json()
stream_data = response_data.get('stream', {})
if stream_data.get('streamURL'):
return stream_data
else:
print("NO MPD URL - BAD TOKEN") def find_MPD(token, url):
print(response_data) headers = {
exit() 'Authorization': token
}
json_data = {
'profileName': 'dash',
'drmType': 'widevine',
'referrerUrl': url
}
response = requests.post('https://prod.npoplayer.nl/stream-link', headers=headers, json=json_data)
response_data = response.json()
stream_data = response_data.get('stream', {})
if stream_data.get('streamURL'):
return stream_data
else:
print("NO MPD URL - BAD TOKEN")
print(response_data)
print(stream_data.get('streamURL'))
def find_PSSH(mpd): def find_PSSH(mpd):
mpd_url = mpd.get('streamURL') mpd_url = mpd.get('streamURL')
response = requests.get(mpd_url, headers=headers) response = requests.get(mpd_url, headers=headers)
pssh_extractor = PsshExtractor(response.text) pssh_extractor = PsshExtractor(response.text)
pssh_value = pssh_extractor.extract_pssh() pssh_value = pssh_extractor.extract_pssh()
return pssh_value, mpd_url return pssh_value, mpd_url
def find_key(mpd, pssh): def find_key(mpd, pssh):
headers_license = { headers_license = {
'x-custom-data': mpd.get('drmToken'), 'x-custom-data': mpd.get('drmToken'),
'origin': 'https://start-player.npo.nl', 'origin': 'https://start-player.npo.nl',
'referer': 'https://start-player.npo.nl/', 'referer': 'https://start-player.npo.nl/',
} }
cert_b64 = None cert_b64 = None
key_extractor = KeyExtractor(pssh, cert_b64, "https://npo-drm-gateway.samgcloud.nepworldwide.nl/authentication", headers_license) key_extractor = KeyExtractor(pssh, cert_b64, "https://npo-drm-gateway.samgcloud.nepworldwide.nl/authentication", headers_license)
keys = key_extractor.get_keys() keys = key_extractor.get_keys()
wvdecrypt = WvDecrypt(init_data_b64=pssh, cert_data_b64=cert_b64, device=device_android_generic) wvdecrypt = WvDecrypt(init_data_b64=pssh, cert_data_b64=cert_b64, device=device_android_generic)
raw_challenge = wvdecrypt.get_challenge() raw_challenge = wvdecrypt.get_challenge()
data = raw_challenge data = raw_challenge
for key in keys: for key in keys:
if isinstance(key, list): if isinstance(key, list):
if key: if key:
for key_str in key: for key_str in key:
return key_str return key_str
def check_prereq(): def check_prereq():
if windows_flag == True: if windows_flag == True:
prereq_filelist = ['mp4decrypt.exe', 'N_m3u8DL-RE.exe'] prereq_filelist = ['mp4decrypt.exe', 'N_m3u8DL-RE.exe']
else: else:
prereq_filelist = ['mp4decrypt', 'N_m3u8DL-RE'] prereq_filelist = ['mp4decrypt', 'N_m3u8DL-RE']
for file in prereq_filelist: for file in prereq_filelist:
if not os.path.isfile(file): if not os.path.isfile(file):
print(f"ERR: {file} not found!") print(f"ERR: {file} not found!")
print("Please check your directory and try again.") print("Please check your directory and try again.")
exit() sys.exit(1)
if shutil.which("ffmpeg") is None: if shutil.which("ffmpeg") is None:
print("ffmpeg not found in PATH.") print("ffmpeg not found in PATH.")
exit() sys.exit(1)
def create_filename(url, programKey): def create_filename(url, content_info, content_type):
# season title if content_type == 'serie':
# 1 2 3 4 5 6 7 8 (optional) # grab slug from url
# create filename based on input URL: https://npo.nl/start/serie /wie-is-de-mol /seizoen-24 /wie-is-de-mol_56 /afspelen url_split = url.split("/")
# https://npo.nl/start/serie /de-avondshow-met-arjen-lubach /seizoen-8_1 /de-avondshow-met-arjen-lubach_93 /afspelen seasontitle = url_split[5].split("_")[0]
# https://npo.nl/start/serie /taarten-van-abel /seizoen-17 /joto /afspelen
url_split = url.split("/")
title = url_split[7].split("_")[0]
season = url_split[6].split("_")[0]
filename_enc = title + "_" + season + "_ep-" + programKey + "_encrypted"
filename = filename_enc.replace("_encrypted", "")
return filename_enc, filename
def download(mpd_url, filename_enc, productId, filename): filename = f"{seasontitle}_S{content_info['seasonnumber']}E{content_info['episodenumber']}_{content_info['episodetitle']}"
if 'episodedate' in content_info:
filename += f"_{content_info['episodedate']}"
elif content_type == 'video':
filename = f"{content_info['videotitle']}"
if 'videodate' in content_info:
filename += f"_{content_info['videodate']}"
# unidecode converts unicode to ascii (e.g. removes accents on characters)
# "takes a string object, possibly containing non-ASCII characters, and returns a string that can be safely encoded to ASCII"
filename = unidecode(filename).replace(' ', '_')
# remove everything not a-z, A-Z, 0-9, -, _
filename = re.sub(r'[^a-zA-Z0-9\-_]', '', filename)
filename_enc = f"{filename}_encrypted"
return filename_enc, filename
def download(mpd_url, filename_enc, productid, filename):
# output: filename.m4a (audio), filename.mp4 (video), filename.vtt (subtitles) # output: filename.m4a (audio), filename.mp4 (video), filename.vtt (subtitles)
subtitle_url = f'https://cdn.npoplayer.nl/subtitles/nl/{productId}.vtt' subtitle_url = f'https://cdn.npoplayer.nl/subtitles/nl/{productid}.vtt'
response = requests.get(subtitle_url) response = requests.get(subtitle_url)
with open(f"{filename}.vtt", 'wb') as subtitle_file: with open(f"{filename}.vtt", 'wb') as subtitle_file:
subtitle_file.write(response.content) subtitle_file.write(response.content)
if windows_flag == True: if windows_flag == True:
subprocess.run(['N_m3u8DL-RE.exe', '--auto-select', '--no-log', '--save-name', filename_enc, mpd_url], stdout=subprocess.DEVNULL) subprocess.run(['N_m3u8DL-RE.exe', '--auto-select', '--no-log', '--save-name', filename_enc, mpd_url], stdout=subprocess.DEVNULL)
else: else:
subprocess.run(['N_m3u8DL-RE', '--auto-select', '--no-log', '--save-name', filename_enc, mpd_url], stdout=subprocess.DEVNULL) subprocess.run(['N_m3u8DL-RE', '--auto-select', '--no-log', '--save-name', filename_enc, mpd_url], stdout=subprocess.DEVNULL)
def decrypt(key, filename_enc, filename): def decrypt(key, filename_enc, filename):
if windows_flag == True: if windows_flag == True:
subprocess.run(['mp4decrypt.exe', '--key', key, str(filename_enc + ".mp4"), str(filename + "_video.mp4")], stdout=subprocess.DEVNULL) subprocess.run(['mp4decrypt.exe', '--key', key, str(filename_enc + ".mp4"), str(filename + "_video.mp4")], stdout=subprocess.DEVNULL)
subprocess.run(['mp4decrypt.exe', '--key', key, str(filename_enc + ".m4a"), str(filename + "_audio.m4a")], stdout=subprocess.DEVNULL) subprocess.run(['mp4decrypt.exe', '--key', key, str(filename_enc + ".m4a"), str(filename + "_audio.m4a")], stdout=subprocess.DEVNULL)
else: else:
subprocess.run(['mp4decrypt', '--key', key, str(filename_enc + ".mp4"), str(filename + "_video.mp4")], stdout=subprocess.DEVNULL) subprocess.run(['mp4decrypt', '--key', key, str(filename_enc + ".mp4"), str(filename + "_video.mp4")], stdout=subprocess.DEVNULL)
subprocess.run(['mp4decrypt', '--key', key, str(filename_enc + ".m4a"), str(filename + "_audio.m4a")], stdout=subprocess.DEVNULL) subprocess.run(['mp4decrypt', '--key', key, str(filename_enc + ".m4a"), str(filename + "_audio.m4a")], stdout=subprocess.DEVNULL)
def merge(filename): def merge(filename):
ffmpeg_command = [ ffmpeg_command = [
'ffmpeg', '-v', 'quiet', # '-v stats', 'ffmpeg', '-v', 'quiet', # '-v stats',
'-i', filename + "_video.mp4", '-i', filename + "_video.mp4",
'-i', filename + "_audio.m4a", '-i', filename + "_audio.m4a",
'-i', filename + ".vtt", # Subtitle file '-i', filename + ".vtt", # Subtitle file (seems to be present on NPO's side even if it's empty / the content has no subs)
'-c:v', 'copy', # Copy video codec '-c:v', 'copy', # Copy video codec
'-c:a', 'copy', # Copy audio codec '-c:a', 'copy', # Copy audio codec
'-c:s', 'mov_text', # Subtitle codec for MP4 '-c:s', 'mov_text', # Subtitle codec for MP4
'-map', '0:v:0', # Map video stream '-map', '0:v:0', # Map video stream
'-map', '1:a:0', # Map audio stream '-map', '1:a:0', # Map audio stream
'-map', '2:s:0', # Map subtitle stream '-map', '2:s:0', # Map subtitle stream
'-strict', 'experimental', '-strict', 'experimental',
filename + ".mp4" filename + ".mp4"
] ]
subprocess.run(ffmpeg_command) subprocess.run(ffmpeg_command)
def clean(filename_enc, filename): def clean(filename_enc, filename):
os.remove(filename_enc + ".mp4") os.remove(filename_enc + ".mp4")
os.remove(filename_enc + ".m4a") os.remove(filename_enc + ".m4a")
os.remove(filename + "_audio.m4a") os.remove(filename + "_audio.m4a")
os.remove(filename + "_video.mp4") os.remove(filename + "_video.mp4")
os.remove(filename + ".vtt") os.remove(filename + ".vtt")
def check_file(filename): def check_file(filename):
if not os.path.exists(filename + ".mp4"): if not os.path.exists(filename + ".mp4"):
print("File not found. Continue anyway? (y/N)") print("File not found. Continue anyway? (y/N)")
userinput = input().lower() userinput = input().lower()
if not userinput or userinput != 'y': if not userinput or userinput != 'y':
exit() sys.exit(1)
def execute(url, plus_cookie, process_no): def execute(url, plus_cookie, process_no):
productId, programKey = find_targetId(url)
token = find_CSRF(productId,plus_cookie) content_type = find_content_type(url)
mpd = find_MPD(token, url, plus_cookie) productid, content_info = find_content_info(url, content_type)
pssh, mpd_url = find_PSSH(mpd) token = find_token(productid, plus_cookie)
key = find_key(mpd, pssh) mpd = find_MPD(token, url)
check_prereq() pssh, mpd_url = find_PSSH(mpd)
filename_enc, filename = create_filename(url, programKey) key = find_key(mpd, pssh)
download(mpd_url, filename_enc, productId, filename) check_prereq()
decrypt(key, filename_enc, filename)
merge(filename)
clean(filename_enc, filename)
check_file(filename) filename_enc, filename = create_filename(url, content_info, content_type)
return process_no # keeps track of process index to return x/y videos completed message download(mpd_url, filename_enc, productid, filename)
decrypt(key, filename_enc, filename)
merge(filename)
clean(filename_enc, filename)
check_file(filename)
return process_no # keeps track of process index to return x/y videos completed message
@ -288,33 +322,28 @@ plus_cookie = find_cookies()
max_workers = min(os.cpu_count(), len(urls)) max_workers = min(os.cpu_count(), len(urls))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(execute, url, plus_cookie, i + 1) for i, url in enumerate(urls)] futures = [executor.submit(execute, url, plus_cookie, i + 1) for i, url in enumerate(urls)]
completed_videos = 0
print(f"0/{len(urls)} videos completed")
for future in concurrent.futures.as_completed(futures):
result = future.result()
completed_videos += 1
print("\033[F\033[K\033[F\033[K")
print(f"{completed_videos}/{len(urls)} video{'s'[:len(urls) != 1]} completed")
completed_videos = 0
print(f"0/{len(urls)} videos completed")
for future in concurrent.futures.as_completed(futures):
result = future.result()
completed_videos += 1
print("\033[F\033[K\033[F\033[K")
print(f"{completed_videos}/{len(urls)} video{'s'[:len(urls) != 1]} completed")
######### #########
# NOTES # # NOTES #
######### #########
# The downloader *should* work across every platform, linux/mac/win. # The downloader *should* work across every platform, linux/mac/win.
# It has not been extensively tested on anything but windows. DM me if you need help :D # It has not been tested on anything but windows though.
# Discord: quinten._. (That includes the ._.) # I've tried my best to comment the code, but I understand if it's messy and overwhelming.
# Most of the lines are either:
# a) getting relevant cookies/keys/urls by mimicking what your browser would do: getting an ID, using that to get a key, using that to get a URl, etc
# b) pre- and post processing: creating nice filenames, extracting info for those filenames, downloading, decrypting, merging files, etc
# Supported browsers for NPO Plus cookies: # However, don't spend hours rummaging through my code, just DM me if you need help :D
# (https://github.com/borisbabic/browser_cookie3#testing-dates--ddmmyy) # Discord: wtquin
# * Chrome
# * Firefox
# * LibreWolf
# * Opera
# * Opera GX
# * Edge
# * Chromium
# * Brave
# * Vivaldi
# * Safari

View File

@ -1,7 +1,8 @@
protobuf beautifulsoup4==4.13.4
bs4 fake_useragent==2.2.0
xmltodict protobuf==6.30.2
browser_cookie3 pycryptodomex==3.22.0
requests Requests==2.32.3
pycryptodomex rookiepy==0.5.6
fake-useragent unidecode==1.3.8
xmltodict==0.14.2