npo/npo all-in-one.py
quinten 8e46e9159b # fixed and improved!
* fixed cookies for chromium browsers by switching to rookie
* fixed productId which moved around since I coded this script up
* added support for movie-type content e.g. documentaries, which dont have seasons or episodes and are just a single video
* general code cleanup and probably some minor other things
2025-04-17 02:18:04 +02:00

349 lines
12 KiB
Python

# Pre-requisites:
# * N_m3u8DL-RE and mp4decrypt in current directory
# * ffmpeg in PATH
# * pip install -r requirements.txt
from datetime import datetime # unix timestamps from content published dates
import sys # proper process exiting if you messed up!
import argparse # your -url and -file options
import requests # sending web requests
import subprocess # multiprocessing
import os # file operations
import re # regex for filename sanitizing so it'll actually save (thanks "Wie is de Mol? België 2025" - question marks are not allowed)
from unidecode import unidecode # see above
import platform # check for windows OS
import shutil # check for ffmpeg in PATH
import rookiepy # replaced browser_cookie3 with rookiepy
from fake_useragent import UserAgent # sets useragent
import concurrent.futures # concurrent downloads when using a -file
from cdm.wks import WvDecrypt, device_android_generic, PsshExtractor, KeyExtractor
# dont need any of these headers but makes it look like normal clients at least
# for extra "normal behavior": save the UA chosen here in some temp file so we can use the same one every time this utility is run
headers = {
'User-Agent': UserAgent(platforms='pc', min_version=122.0).random,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Cache-Control': 'no-cache',
}
if platform.system() == "Windows":
windows_flag = True
else:
windows_flag = False
parser = argparse.ArgumentParser(description='PYWKS-NPO')
parser.add_argument('-url', dest='url', required=False, help='NPO Video URL')
parser.add_argument('-file', dest='file', required=False, help='File with NPO Video URLs, one per line')
args = parser.parse_args()
def parse_url_file(file_path):
with open(file_path, 'r') as file:
urls = [line.strip() for line in file]
return urls
if args.file and args.url:
print("ERR: Please specify just one argument.")
print("-url: input NPO video URL")
print("-file: input a file with NPO video URLS, one per line")
exit()
elif args.file:
urls = parse_url_file(args.file)
elif args.url:
urls = [args.url]
else:
print("ERR: Please input your URL(s).")
print("-url: input NPO video URL")
print("-file: input a file with NPO video URLS, one per line")
exit()
def find_cookies():
print("NPO Plus subscribers are able to download in 1080p instead of 540p.")
print("Are you an NPO Plus subscriber and logged in on your browser? (y/N)")
userinput = input().lower()
print("\033[F\033[K\033[F\033[K\033[F\033[K")
if not userinput or userinput.lower() != 'y':
return
# Now using rookie instead of browser_cookie3, which supports a TON of browsers and works with chromium again.
# check here for compatibility https://github.com/thewh1teagle/rookie?tab=readme-ov-file#contribute-
cookies = rookiepy.load(["npo.nl"])
cookies = rookiepy.to_cookiejar(cookies)
return cookies
def find_content_type(url):
content_type = url.split("/")[4] # 'video' or 'serie'
return content_type
def find_content_info(url, content_type):
if content_type == 'serie':
# url safetycheck - no way for me to grab the "latest" video from a series without reverse engineering about a megabyte of minified js aafaik :(
if len(url.split("/")) < 8:
print("\n\nERROR: URL invalid!\n" \
"You are currently on the homepage of whatever series it is you want to download, not on the episode-specific page.\n" \
"Please click on the episode you want to download so your url becomes something like ../serie/<serie>/seizoen*/episode/...\n" \
f"Your current url is: {url}")
sys.exit(1)
# grab "slug" from url - not my word this is what they call it
# with the found slug we can grab the productid which we need to make our second request
params = {
'slug': url.split("/")[7]
}
response = requests.get('https://npo.nl/start/api/domain/program-detail', params=params)
data = response.json()
content_info = {
'seasonnumber': data.get('season', {}).get('seasonKey'),
'episodetitle': data.get("title"),
'episodenumber': data.get("programKey"),
}
# some shows have this set to `None`, do better NPO!
published_ts = data.get('publishedDateTime')
if published_ts is not None:
content_info['episodedate'] = datetime.fromtimestamp(published_ts).strftime("%Y-%m-%d")
elif content_type == 'video':
params = {
'slug': url.split("/")[5]
}
response = requests.get('https://npo.nl/start/api/domain/program-detail', params=params)
data = response.json()
content_info = {
'videotitle': data.get("title"),
}
# some videos have this set to `None`, do better NPO!
published_ts = data.get('publishedDateTime')
if published_ts is not None:
content_info['videodate'] = datetime.fromtimestamp(published_ts).strftime("%Y-%m-%d")
productid = data.get("productId")
return productid, content_info
def find_token(productid, plus_cookie):
params = {
'productId': productid,
}
response = requests.get('https://npo.nl/start/api/domain/player-token', params=params, cookies=plus_cookie)
token = response.json().get('jwt')
return token
def find_MPD(token, url):
headers = {
'Authorization': token
}
json_data = {
'profileName': 'dash',
'drmType': 'widevine',
'referrerUrl': url
}
response = requests.post('https://prod.npoplayer.nl/stream-link', headers=headers, json=json_data)
response_data = response.json()
stream_data = response_data.get('stream', {})
if stream_data.get('streamURL'):
return stream_data
else:
print("NO MPD URL - BAD TOKEN")
print(response_data)
print(stream_data.get('streamURL'))
def find_PSSH(mpd):
mpd_url = mpd.get('streamURL')
response = requests.get(mpd_url, headers=headers)
pssh_extractor = PsshExtractor(response.text)
pssh_value = pssh_extractor.extract_pssh()
return pssh_value, mpd_url
def find_key(mpd, pssh):
headers_license = {
'x-custom-data': mpd.get('drmToken'),
'origin': 'https://start-player.npo.nl',
'referer': 'https://start-player.npo.nl/',
}
cert_b64 = None
key_extractor = KeyExtractor(pssh, cert_b64, "https://npo-drm-gateway.samgcloud.nepworldwide.nl/authentication", headers_license)
keys = key_extractor.get_keys()
wvdecrypt = WvDecrypt(init_data_b64=pssh, cert_data_b64=cert_b64, device=device_android_generic)
raw_challenge = wvdecrypt.get_challenge()
data = raw_challenge
for key in keys:
if isinstance(key, list):
if key:
for key_str in key:
return key_str
def check_prereq():
if windows_flag == True:
prereq_filelist = ['mp4decrypt.exe', 'N_m3u8DL-RE.exe']
else:
prereq_filelist = ['mp4decrypt', 'N_m3u8DL-RE']
for file in prereq_filelist:
if not os.path.isfile(file):
print(f"ERR: {file} not found!")
print("Please check your directory and try again.")
sys.exit(1)
if shutil.which("ffmpeg") is None:
print("ffmpeg not found in PATH.")
sys.exit(1)
def create_filename(url, content_info, content_type):
if content_type == 'serie':
# grab slug from url
url_split = url.split("/")
seasontitle = url_split[5].split("_")[0]
filename = f"{seasontitle}_S{content_info['seasonnumber']}E{content_info['episodenumber']}_{content_info['episodetitle']}"
if 'episodedate' in content_info:
filename += f"_{content_info['episodedate']}"
elif content_type == 'video':
filename = f"{content_info['videotitle']}"
if 'videodate' in content_info:
filename += f"_{content_info['videodate']}"
# unidecode converts unicode to ascii (e.g. removes accents on characters)
# "takes a string object, possibly containing non-ASCII characters, and returns a string that can be safely encoded to ASCII"
filename = unidecode(filename).replace(' ', '_')
# remove everything not a-z, A-Z, 0-9, -, _
filename = re.sub(r'[^a-zA-Z0-9\-_]', '', filename)
filename_enc = f"{filename}_encrypted"
return filename_enc, filename
def download(mpd_url, filename_enc, productid, filename):
# output: filename.m4a (audio), filename.mp4 (video), filename.vtt (subtitles)
subtitle_url = f'https://cdn.npoplayer.nl/subtitles/nl/{productid}.vtt'
response = requests.get(subtitle_url)
with open(f"{filename}.vtt", 'wb') as subtitle_file:
subtitle_file.write(response.content)
if windows_flag == True:
subprocess.run(['N_m3u8DL-RE.exe', '--auto-select', '--no-log', '--save-name', filename_enc, mpd_url], stdout=subprocess.DEVNULL)
else:
subprocess.run(['N_m3u8DL-RE', '--auto-select', '--no-log', '--save-name', filename_enc, mpd_url], stdout=subprocess.DEVNULL)
def decrypt(key, filename_enc, filename):
if windows_flag == True:
subprocess.run(['mp4decrypt.exe', '--key', key, str(filename_enc + ".mp4"), str(filename + "_video.mp4")], stdout=subprocess.DEVNULL)
subprocess.run(['mp4decrypt.exe', '--key', key, str(filename_enc + ".m4a"), str(filename + "_audio.m4a")], stdout=subprocess.DEVNULL)
else:
subprocess.run(['mp4decrypt', '--key', key, str(filename_enc + ".mp4"), str(filename + "_video.mp4")], stdout=subprocess.DEVNULL)
subprocess.run(['mp4decrypt', '--key', key, str(filename_enc + ".m4a"), str(filename + "_audio.m4a")], stdout=subprocess.DEVNULL)
def merge(filename):
ffmpeg_command = [
'ffmpeg', '-v', 'quiet', # '-v stats',
'-i', filename + "_video.mp4",
'-i', filename + "_audio.m4a",
'-i', filename + ".vtt", # Subtitle file (seems to be present on NPO's side even if it's empty / the content has no subs)
'-c:v', 'copy', # Copy video codec
'-c:a', 'copy', # Copy audio codec
'-c:s', 'mov_text', # Subtitle codec for MP4
'-map', '0:v:0', # Map video stream
'-map', '1:a:0', # Map audio stream
'-map', '2:s:0', # Map subtitle stream
'-strict', 'experimental',
filename + ".mp4"
]
subprocess.run(ffmpeg_command)
def clean(filename_enc, filename):
os.remove(filename_enc + ".mp4")
os.remove(filename_enc + ".m4a")
os.remove(filename + "_audio.m4a")
os.remove(filename + "_video.mp4")
os.remove(filename + ".vtt")
def check_file(filename):
if not os.path.exists(filename + ".mp4"):
print("File not found. Continue anyway? (y/N)")
userinput = input().lower()
if not userinput or userinput != 'y':
sys.exit(1)
def execute(url, plus_cookie, process_no):
content_type = find_content_type(url)
productid, content_info = find_content_info(url, content_type)
token = find_token(productid, plus_cookie)
mpd = find_MPD(token, url)
pssh, mpd_url = find_PSSH(mpd)
key = find_key(mpd, pssh)
check_prereq()
filename_enc, filename = create_filename(url, content_info, content_type)
download(mpd_url, filename_enc, productid, filename)
decrypt(key, filename_enc, filename)
merge(filename)
clean(filename_enc, filename)
check_file(filename)
return process_no # keeps track of process index to return x/y videos completed message
plus_cookie = find_cookies()
max_workers = min(os.cpu_count(), len(urls))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(execute, url, plus_cookie, i + 1) for i, url in enumerate(urls)]
completed_videos = 0
print(f"0/{len(urls)} videos completed")
for future in concurrent.futures.as_completed(futures):
result = future.result()
completed_videos += 1
print("\033[F\033[K\033[F\033[K")
print(f"{completed_videos}/{len(urls)} video{'s'[:len(urls) != 1]} completed")
#########
# NOTES #
#########
# The downloader *should* work across every platform, linux/mac/win.
# It has not been tested on anything but windows though.
# I've tried my best to comment the code, but I understand if it's messy and overwhelming.
# Most of the lines are either:
# a) getting relevant cookies/keys/urls by mimicking what your browser would do: getting an ID, using that to get a key, using that to get a URl, etc
# b) pre- and post processing: creating nice filenames, extracting info for those filenames, downloading, decrypting, merging files, etc
# However, don't spend hours rummaging through my code, just DM me if you need help :D
# Discord: wtquin