added parallelization, minor bugfixes, episode numbering in filename

This commit is contained in:
Quinten0508 2024-01-30 19:19:04 +01:00
commit 8d8eb2d473
18 changed files with 1988 additions and 0 deletions

19
.gitignore vendored Normal file
View File

@ -0,0 +1,19 @@
# Windows
Thumbs.db
desktop.ini
# OS X
.DS_Store
.Spotlight-V100
.Trashes
._*
# Generic
.vscode/
# CDM
**/android_generic/**
N_m3u8DL-RE*
mp4decrypt*
env/
avondshow.txt

45
README.md Normal file
View File

@ -0,0 +1,45 @@
# PyWKS
<p align="center">
<img src="https://cdn.discordapp.com/attachments/826590534151700550/1176511096803307520/image.png?ex=656f2257&is=655cad57&hm=47aca7f30777fa83ac68cc942efa9a85219a3d2fdbf2a307d989971a11987109&" alt="PyWKS Logo">
</p>
PyWKS is a Python script designed for obtaining keys to decrypt encrypted videos. This repository contains an improved version of WKS-KEYS.
> **⚠️ Warning:**
>
> I'm only uploading this for educational purposes.
## Setup
**1. Clone the repository:**
```bash
git clone https://github.com/SASUKE-DUCK/pywks
```
**2. Install dependencies:**
```bash
pip install -r https://github.com/SASUKE-DUCK/pywks/blob/main/requirements.txt
```
**3. Explore the example scripts:**
- `main.py`
- `main_dsnp.py`
For detailed instructions, run the scripts with the `-h` flag.
## Screenshots
### `main.py`
![main.py Screenshot](https://cdn.discordapp.com/attachments/826590534151700550/1168910480878870599/image.png?ex=65537bb7&is=654106b7&hm=2fb9262a79996ee463f8b64caf5495ba8b9bda3a5c57c295b3e30d655f58cd40&)
### `main_dsnp.py`
![main_dsnp.py Screenshot](https://cdn.discordapp.com/attachments/826590534151700550/1168910344941469767/image.png?ex=65537b97&is=65410697&hm=062bbcae4ead2976d94c812c83936bad99b40e1473090932639349bb2aee3e2d&)
## Support and Community
Join our Discord community for support, discussions, and updates!
[![Discord](https://img.shields.io/discord/your-discord-server-id?color=%237289DA&label=Join%20us%20on%20Discord&logo=discord&logoColor=white)](https://discord.gg/utEG7CsAm5)

Binary file not shown.

842
cdm/wks.py Normal file

File diff suppressed because one or more lines are too long

35
cdrm.py Normal file
View File

@ -0,0 +1,35 @@
import argparse
import requests
from cdm.wks import PsshExtractor, get_keys_license_cdrm_project, print_keys_cdrm_project
token = ""
def main():
parser = argparse.ArgumentParser(description="Decrypt Widevine content using MPD URL and License URL")
parser.add_argument("-mpd", required=True, help="URL of the MPD manifest")
parser.add_argument("-lic", required=True, help="URL of the license server")
args = parser.parse_args()
mpd_url = args.mpd
license_url = args.lic
headers_mpd = {
'origin': 'https://play.hbomax.com',
'referer': 'https://play.hbomax.com/',
}
response = requests.get(mpd_url, headers=headers_mpd)
pssh_extractor = PsshExtractor(response.text)
pssh_value = pssh_extractor.extract_pssh()
print("PSSH value:", pssh_value)
headers_license = {
'authorization': f'Bearer {token}',
}
response = get_keys_license_cdrm_project(license_url, headers_license, pssh_value)
print_keys_cdrm_project(response)
if __name__ == "__main__":
main()

56
cdrm_api.py Normal file
View File

@ -0,0 +1,56 @@
import requests
from cdm.wks import PsshExtractor, get_keys_cdrm_api
# HBOMAX Test
def parse_command_line_arguments():
"""Parse command line arguments."""
parser = __import__('argparse').ArgumentParser(description="Decrypt Widevine content using MPD URL and License URL")
parser.add_argument("-mpd", required=True, help="URL of the MPD manifest")
parser.add_argument("-lic", required=True, help="URL of the license server")
return parser.parse_args()
def get_mpd_response(mpd_url):
"""Get MPD manifest response."""
mpd_headers = {
'origin': 'https://play.hbomax.com',
'referer': 'https://play.hbomax.com/',
}
return requests.get(mpd_url, headers=mpd_headers)
def get_license_headers(token):
"""Get headers for the license server request."""
return {
'accept': "*/*", # no delet
'content-length': "316", # no delet
'Connection': 'keep-alive', # no delet
'authorization': f'Bearer {token}',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (Ktesttemp, like Gecko) Chrome/90.0.4430.85 Safari/537.36'
}
def main():
# Parse command line arguments
args = parse_command_line_arguments()
mpd_url, license_url = args.mpd, args.lic
# Get MPD response
mpd_response = get_mpd_response(mpd_url)
# Extract PSSH value from MPD response
pssh_extractor = PsshExtractor(mpd_response.text)
pssh_value = pssh_extractor.extract_pssh()
print("PSSH value:", pssh_value)
# Get headers for the license server request
token = ""
# Update with your actual token
license_headers = get_license_headers(token)
# Call the function in keys.py to get the keys
keys = get_keys_cdrm_api(license_headers, license_url, pssh_value)
# Process each key
for key in keys:
print(f'KEY: {key}')
if __name__ == "__main__":
main()

35
cdrm_cache.py Normal file
View File

@ -0,0 +1,35 @@
import argparse
import requests
from cdm.wks import PsshExtractor, get_keys_cache_cdrm_project
def extract_pssh_value(mpd_url):
headers_mpd = {
'origin': 'https://play.hbomax.com',
'referer': 'https://play.hbomax.com/',
}
response = requests.get(mpd_url, headers=headers_mpd)
if response.status_code == 200:
pssh_extractor = PsshExtractor(response.text)
pssh_value = pssh_extractor.extract_pssh()
return pssh_value
else:
raise ValueError(f"Error: Unable to fetch MPD manifest, Status Code: {response.status_code}")
def main():
parser = argparse.ArgumentParser(description="Decrypt Widevine content using MPD URL and License URL")
parser.add_argument("-mpd", required=True, help="URL of the MPD manifest")
args = parser.parse_args()
mpd_url = args.mpd
try:
pssh_value = extract_pssh_value(mpd_url)
print("PSSH value:", pssh_value)
get_keys_cache_cdrm_project(pssh_value)
except Exception as e:
print(f"An error occurred: {e}")
if __name__ == "__main__":
main()

92
extractwvd.py Normal file
View File

@ -0,0 +1,92 @@
import argparse
import json
from enum import Enum
from pathlib import Path
from construct import BitStruct, Bytes, Const
from construct import Enum as CEnum
from construct import Flag, If, Int8ub, Int16ub, Optional, Padded, Padding, Struct, this
from Cryptodome.PublicKey import RSA
from cdm.wks import ClientIdentification
class DeviceTypes(Enum):
CHROME = 1
ANDROID = 2
WidevineDeviceStruct = Struct(
'signature' / Const(b'WVD'),
'version' / Int8ub,
'type' / CEnum(
Int8ub,
**{t.name: t.value for t in DeviceTypes}
),
'security_level' / Int8ub,
'flags' / Padded(1, Optional(BitStruct(
Padding(7),
'send_key_control_nonce' / Flag
))),
'private_key_len' / Int16ub,
'private_key' / Bytes(this.private_key_len),
'client_id_len' / Int16ub,
'client_id' / Bytes(this.client_id_len),
'vmp_len' / Optional(Int16ub),
'vmp' / If(this.vmp_len, Optional(Bytes(this.vmp_len)))
)
WidevineDeviceStructVersion = 1
def parse_args():
parser = argparse.ArgumentParser(description='Widevine Device Information Parser')
parser.add_argument('file', type=Path, help='Path to WVD file')
return parser.parse_args()
def write_key_and_blob_files(out_dir, device):
private_key_file = out_dir / 'device_private_key'
print(f'\n[INFO] Writing private key to: {private_key_file}')
private_key = RSA.import_key(device.private_key)
private_key_file.write_text(private_key.export_key('PEM').decode())
client_id_blob_file = out_dir / 'device_client_id_blob'
print(f'[INFO] Writing client ID blob to: {client_id_blob_file}')
client_id_blob_file.write_bytes(device.client_id)
if device.vmp:
vmp_blob_file = out_dir / 'device_vmp_blob'
print(f'[INFO] Writing VMP blob to: {vmp_blob_file}')
vmp_blob_file.write_bytes(device.vmp)
def write_json_file(out_dir, name, client_id, device):
wv_json_file = out_dir / 'wv.json'
description = f'{name} ({client_id.Token._DeviceCertificate.SystemId})'
print(f'[INFO] Writing JSON file to: {wv_json_file}')
wv_json_file.write_text(json.dumps({
'name': name,
'description': description,
'security_level': device.security_level,
'session_id_type': device.type.lower(),
'private_key_available': True,
'vmp': bool(device.vmp),
'send_key_control_nonce': device.type == DeviceTypes.ANDROID
}, indent=2))
def main():
args = parse_args()
name = args.file.with_suffix('').name
out_dir = Path.cwd() / 'cdm' / 'devices' / 'android_generic'
out_dir.mkdir(parents=True, exist_ok=True)
with args.file.open('rb') as fd:
device = WidevineDeviceStruct.parse_stream(fd)
print(f'\n[INFO] Starting Widevine Device Information Parsing')
write_key_and_blob_files(out_dir, device)
client_id = ClientIdentification()
client_id.ParseFromString(device.client_id)
write_json_file(out_dir, name, client_id, device)
print('[INFO] Done')
if __name__ == '__main__':
main()

83
gettoken.py Normal file
View File

@ -0,0 +1,83 @@
import requests
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/119.0',
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.5',
# 'Accept-Encoding': 'gzip, deflate, br',
'Referer': 'https://npo.nl/start/serie/nos-journaal/seizoen-328/nos-journaal_91491/afspelen',
'Content-Type': 'application/json',
# 'sentry-trace': 'adf1091e221d423fa031623483d5a75f-b2cd5555b5236103-0',
'baggage': 'sentry-environment=prod,sentry-release=7-1xQyxcfFi-KAQG1GEsk,sentry-public_key=ff5dbc8fc4e94b9390c3581c962b975a,sentry-trace_id=adf1091e221d423fa031623483d5a75f,sentry-transaction=%2Fserie%2F%5BseriesSlug%5D%2F%5B%5B...seriesParams%5D%5D,sentry-sampled=false',
'DNT': '1',
'Connection': 'keep-alive',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
# Requests doesn't support trailers
# 'TE': 'trailers',
}
session = requests.Session()
response = session.get('https://npo.nl/start/api/auth/session', headers=headers)
response_cookies = session.cookies.get_dict()
# dict:
# {
# '__Host-next-auth.csrf-token' : '73bcedab2973fc2547d6a8f4405da66ac1dbeb432e21ba209ed93dac14dfbe88%7Cf4ed5605a6f0b044e80da012b9705c9d915a6603e2109d38ebff47823d58acc3',
# '__Secure-next-auth.callback-url' : 'https%3A%2F%2Fnpo.nl'
# }
csrf = response_cookies["__Host-next-auth.csrf-token"]
print(f'csrf: {csrf}')
cookies2 = {
'__Host-next-auth.csrf-token': csrf,
'__Secure-next-auth.callback-url': 'https%3A%2F%2Fnpo.nl',
# 'CCM_Wrapper_Cache': 'eyJ2ZXIiOiJ2My4yLjgiLCJqc2giOiIiLCJjaWQiOiJWSHNva3FFUkk2TVozbTI1IiwiY29uaWQiOiJXR0ptTCJ9',
'pa_privacy': '%22optin%22',
'_pcid': '%7B%22browserId%22%3A%22lr5dxgvg8okqb7ru%22%2C%22_t%22%3A%22m6tsuy7i%7Clr5dxgvi%22%7D',
'_pctx': '%7Bu%7DN4IgrgzgpgThIC4B2YA2qA05owMoBcBDfSREQpAeyRCwgEt8oBJAE0RXSwH18yBbAGz4IYAJ4B2AFYAfVDACsrAB4BzAG5SQAXyA',
'Cookie_Consent': 'false',
'CCM_ID': 'VHsokqERI6MZ3m25',
'Cookie_Category_Necessary': 'true',
'Cookie_Category_Analytics': 'true',
# 'Cookie_Category_Social': '',
# 'bitmovin_analytics_uuid': 'f6cdf35e-618f-4b82-8bec-2a543be32390',
}
headers2 = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/119.0',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'en-US,en;q=0.5',
# 'Accept-Encoding': 'gzip, deflate, br',
'Content-Type': 'application/json',
# 'sentry-trace': 'da0c64681077477196b6c39ce00fe616-b275f8f0de3ad282-0',
# 'baggage': 'sentry-environment=prod,sentry-release=7-1xQyxcfFi-KAQG1GEsk,sentry-public_key=ff5dbc8fc4e94b9390c3581c962b975a,sentry-trace_id=da0c64681077477196b6c39ce00fe616',
'Origin': 'https://npo.nl',
'DNT': '1',
'Connection': 'keep-alive',
'Referer': 'https://npo.nl/start/serie/nos-journaal/seizoen-328/nos-journaal_91491/afspelen',
# 'Cookie': '__Host-next-auth.csrf-token=e47c6a34c0fe5a4d68821408a2b1271ca16d7ae435f8ca6ddb1f088f00f4ec84%7C8b09f39a2efa3e8229eef01fffb4a41ee08e19996f0e6ca860260e68aac1d682; __Secure-next-auth.callback-url=https%3A%2F%2Fnpo.nl; CCM_Wrapper_Cache=eyJ2ZXIiOiJ2My4yLjgiLCJqc2giOiIiLCJjaWQiOiJWSHNva3FFUkk2TVozbTI1IiwiY29uaWQiOiJXR0ptTCJ9; pa_privacy=%22optin%22; _pcid=%7B%22browserId%22%3A%22lr5dxgvg8okqb7ru%22%2C%22_t%22%3A%22m6tsuy7i%7Clr5dxgvi%22%7D; _pctx=%7Bu%7DN4IgrgzgpgThIC4B2YA2qA05owMoBcBDfSREQpAeyRCwgEt8oBJAE0RXSwH18yBbAGz4IYAJ4B2AFYAfVDACsrAB4BzAG5SQAXyA; Cookie_Consent=false; CCM_ID=VHsokqERI6MZ3m25; Cookie_Category_Necessary=true; Cookie_Category_Analytics=true; Cookie_Category_Social=; bitmovin_analytics_uuid=f6cdf35e-618f-4b82-8bec-2a543be32390',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
# Requests doesn't support trailers
# 'TE': 'trailers',
}
#json_data = {
# 'productId': 'POW_05759713',
#}
response2 = requests.post('https://npo.nl/start/api/domain/player-token', cookies=cookies2, headers=headers2)
#videoplayer token
token = response2.json()["token"]
print(f'token: {token}')

66
init_pssh.py Normal file
View File

@ -0,0 +1,66 @@
import base64
import sys
from pathlib import Path
from google.protobuf.message import DecodeError
from cdm.wks import WidevineCencHeader
# File path to read the raw data
file_path = "your init"
# Read the raw data from the file
raw = Path(file_path).read_bytes()
# Find the offset of 'pssh' in the raw data
pssh_offset = raw.rfind(b'pssh')
if pssh_offset == -1:
print("[ERROR] 'pssh' not found in the file.")
sys.exit(1)
else:
# Extract the PSSH data based on the offset and length information
_start = max(pssh_offset - 4, 0)
_end = min(pssh_offset - 4 + raw[pssh_offset - 1], len(raw))
pssh = raw[_start:_end]
# Display the PSSH data in base64 format
print('\n[INFO] PSSH:', base64.b64encode(pssh).decode('utf-8'))
pssh_b64 = base64.b64encode(pssh)
print("\n[SUCCESS] PSSH extracted successfully.")
# Check if the PSSH data needs modification
if not pssh[12:28] == bytes([237, 239, 139, 169, 121, 214, 74, 206, 163, 200, 39, 220, 213, 29, 33, 237]):
print("[Modifying PSSH data...]")
# Create a new PSSH data with the required modifications
new_pssh = bytearray([0, 0, 0])
new_pssh.append(32 + len(pssh))
new_pssh[4:] = bytearray(b'pssh')
new_pssh[8:] = [0, 0, 0, 0]
new_pssh[13:] = [237, 239, 139, 169, 121, 214, 74, 206, 163, 200, 39, 220, 213, 29, 33, 237]
new_pssh[29:] = [0, 0, 0, 0]
new_pssh[31] = len(pssh)
new_pssh[32:] = pssh
pssh_b64 = base64.b64encode(new_pssh)
print("[Modified PSSH data:", pssh_b64.decode(), "]")
else:
print("[PSSH data doesn't need modification.]")
# Parse the modified or original PSSH data using WidevineCencHeader
parsed_init_data = WidevineCencHeader()
try:
parsed_init_data.ParseFromString(base64.b64decode(pssh_b64))
print("[PSSH data parsed successfully.]")
except (DecodeError, SystemError) as e:
print("[Error parsing PSSH data:", e, "]")
try:
# Attempt to parse PSSH data from byte offset 32
id_bytes = parsed_init_data.ParseFromString(base64.b64decode(pssh_b64)[32:])
print("[PSSH data parsed successfully from byte offset 32.]")
except DecodeError as de:
print("[Error parsing PSSH data from byte offset 32:", de, "]")
sys.exit(1)
# Convert the parsed key_id to a hexadecimal string
key_id_str = ''.join(['{:02x}'.format(b) for b in parsed_init_data.key_id[0]])
print("[Key ID in hexadecimal:", key_id_str, "]")
print("\n[SUCCESS] [Done]\n")

36
ism.py Normal file
View File

@ -0,0 +1,36 @@
import argparse
from cdm.wks import parse_manifest_ism
def main():
# Create an ArgumentParser object and add the 'urls' argument
parser = argparse.ArgumentParser(description='Script for parsing Smooth Streaming manifest URLs.')
parser.add_argument('urls',
help='The URLs to parse. You may need to wrap the URLs in double quotes if you have issues.',
nargs='+')
# Parse the arguments
args = parser.parse_args()
# Iterate over the provided URLs
for manifest_link in args.urls:
kid, stream_info_list, encoded_string = parse_manifest_ism(manifest_link)
# Print information for each stream
for stream_info in stream_info_list:
type_info = stream_info['type']
codec = stream_info['codec']
bitrate = stream_info['bitrate']
resolution = stream_info['resolution']
if type_info == 'video':
print(f'[INFO] VIDEO - Codec: {codec}, Resolution: {resolution}, Bitrate: {bitrate}')
elif type_info == 'audio':
language = stream_info['language']
track_id = stream_info['track_id']
print(f'[INFO] AUDIO - Codec: {codec}, Bitrate: {bitrate}, Language: {language}, Track ID: {track_id}')
# Print PSSH information
print('\n[INFO] PSSH:', encoded_string)
if __name__ == "__main__":
main()

43
main.py Normal file
View File

@ -0,0 +1,43 @@
import argparse
from cdm.wks import WvDecrypt, device_android_generic, PsshExtractor, KeyExtractor
import requests
def get_keys_license(mpd_url, license_url):
response = requests.get(mpd_url)
pssh_extractor = PsshExtractor(response.text)
pssh_value = pssh_extractor.extract_pssh()
print("PSSH value:", pssh_value)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36',
}
cert_b64 = None
key_extractor = KeyExtractor(pssh_value, cert_b64, license_url, headers)
keys = key_extractor.get_keys()
wvdecrypt = WvDecrypt(init_data_b64=pssh_value, cert_data_b64=cert_b64, device=device_android_generic)
raw_challenge = wvdecrypt.get_challenge()
data = raw_challenge
return keys
def main():
parser = argparse.ArgumentParser(description="Decrypt Widevine content using MPD URL and License URL")
parser.add_argument("-mpd", required=True, help="URL of the MPD manifest")
parser.add_argument("-lic", required=True, help="URL of the license server")
args = parser.parse_args()
mpd_url = args.mpd
license_url = args.lic
keys = get_keys_license(mpd_url, license_url)
for key in keys:
if isinstance(key, list):
if key:
for key_str in key:
print(f"KEY: {key_str}")
if __name__ == "__main__":
main()

58
main_dsnp.py Normal file
View File

@ -0,0 +1,58 @@
import argparse
from cdm.wks import KeyExtractor, DataExtractor_DSNP
import requests
token = ""
def get_keys_license(m3u8_url):
response = requests.get(m3u8_url)
content = response.text if response.status_code == 200 else None
data_extractor = DataExtractor_DSNP(content)
if content:
characteristics_list = data_extractor.get_characteristics_list()
if characteristics_list:
print("Choose CHARACTERISTICS Value:")
for i, (characteristics, _) in enumerate(characteristics_list):
print(f"{i + 1}. {characteristics}")
choice = int(input("Enter the number of the CHARACTERISTICS you want: "))
characteristics, base64_data = data_extractor.extract_base64_by_choice(choice)
if characteristics and base64_data:
print("CHARACTERISTICS Value:", characteristics)
print("PSSH value (Base64 Data):", base64_data)
license_url = "https://disney.playback.edge.bamgrid.com/widevine/v1/obtain-license"
headers = {
'authorization': f'Bearer {token}',
'origin': 'https://www.disneyplus.com',
'referer': 'https://www.disneyplus.com/',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36',
}
cert_b64 = None
key_extractor = KeyExtractor(base64_data, cert_b64, license_url, headers)
keys = key_extractor.get_keys()
for key in keys:
if isinstance(key, list):
if key:
for key_str in key:
print(f"KEY: {key_str}")
return base64_data
def main():
parser = argparse.ArgumentParser(description="Decrypt Widevine content using M3U8 URL")
parser.add_argument("-m3u8", required=True, help="URL of the M3U8 manifest")
args = parser.parse_args()
m3u8_url = args.m3u8
pssh_value = get_keys_license(m3u8_url)
if __name__ == "__main__":
main()

42
main_m3u8.py Normal file
View File

@ -0,0 +1,42 @@
from cdm.wks import WvDecrypt, device_android_generic, extract_pssh_m3u8, KeyExtractor
import argparse
import requests
def get_keys_license(m3u8_url, license_url):
response = requests.get(m3u8_url)
pssh_value = extract_pssh_m3u8(response.text)
print("PSSH value:", pssh_value)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36',
}
cert_b64 = None
key_extractor = KeyExtractor(pssh_value, cert_b64, license_url, headers)
keys = key_extractor.get_keys()
wvdecrypt = WvDecrypt(init_data_b64=pssh_value, cert_data_b64=cert_b64, device=device_android_generic)
raw_challenge = wvdecrypt.get_challenge()
data = raw_challenge
return keys
def main():
parser = argparse.ArgumentParser(description="Decrypt Widevine content using M3U8 URL and License URL")
parser.add_argument("-m3u8", required=True, help="URL of the M3U8 manifest")
parser.add_argument("-lic", required=True, help="URL of the license server")
args = parser.parse_args()
m3u8_url = args.m3u8
license_url = args.lic
keys = get_keys_license(m3u8_url, license_url)
for key in keys:
if isinstance(key, list):
if key:
for key_str in key:
print(f"KEY: {key_str}")
if __name__ == "__main__":
main()

317
npo all-in-one.py Normal file
View File

@ -0,0 +1,317 @@
# Pre-requisites:
# * N_m3u8DL-RE and mp4decrypt in current directory
# * ffmpeg in PATH
# PIP Requirements:
# * protobuf
# * bs4
# * xmltodict
# * browser_cookie3
# * requests
# * pycryptodomex
import argparse
import requests
import subprocess
import os
from bs4 import BeautifulSoup
import json
import platform # check for windows OS
import shutil # check for ffmpeg in PATH, part of python std
import browser_cookie3 # cookies for premium accs
from cdm.wks import WvDecrypt, device_android_generic, PsshExtractor, KeyExtractor
import concurrent.futures
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.3',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Cache-Control': 'no-cache',
}
license_url = "https://npo-drm-gateway.samgcloud.nepworldwide.nl/authentication"
if platform.system() == "Windows":
windows_flag = True
else:
windows_flag = False
parser = argparse.ArgumentParser(description='PYWKS-NPO')
parser.add_argument('-url', dest='url', required=False, help='NPO Video URL')
parser.add_argument('-file', dest='file', required=False, help='File with NPO Video URLs, one per line')
args = parser.parse_args()
def parse_url_file(file_path):
with open(file_path, 'r') as file:
urls = [line.strip() for line in file]
return urls
if args.file and args.url:
print("ERR: Please specify just one argument.")
print("-url: input NPO video URL")
print("-file: input a file with NPO video URLS, one per line")
exit()
elif args.file:
urls = parse_url_file(args.file)
elif args.url:
urls = [args.url]
else:
print("ERR: Please input your URL.")
print("-url: input NPO video URL")
print("-file: input a file with NPO video URLS, one per line")
exit()
def find_cookies():
print("NPO Plus subscribers are able to download in 1080p instead of 540p.")
print("Are you an NPO Plus subscriber and logged in on your browser? (y/N)")
userinput = input().lower()
if not userinput or userinput.lower() != 'y':
return
cookies = browser_cookie3.load(domain_name='npo.nl')
return cookies
def find_targetId(url):
# Get full HTML and extract productId and episode number
# "future proof"
response_targetId = requests.get(url)
content = response_targetId.content
try:
url_split = url.split("/")
target_slug = url_split[7]
except:
print("URL invalid.")
print("URL format: https://npo.nl/start/serie/wie-is-de-mol/seizoen-24/wie-is-de-mol_56/afspelen")
print(f"Your URL: {url}")
exit()
soup = BeautifulSoup(content, 'html.parser')
script_tag = soup.find('script', {'id': '__NEXT_DATA__'})
if script_tag:
script_content = script_tag.contents[0]
else:
print("Script tag not found.")
print("Hint: Use the -token <token> argument to supply your own.")
def search(data, target_slug):
if isinstance(data, list):
for item in data:
result = search(item, target_slug)
if result:
return result
elif isinstance(data, dict):
for key, value in data.items():
if key == "slug" and value == target_slug:
return data.get("productId"), data.get("programKey")
else:
result = search(value, target_slug)
if result:
return result
return None
data_dict = json.loads(script_content)
target_product_id = search(data_dict, target_slug)
return target_product_id
def find_CSRF(targetId, plus_cookie):
response_CSRF = requests.get('https://npo.nl/start/api/auth/session', headers=headers, cookies=plus_cookie)
response_cookies = response_CSRF.cookies.get_dict()
csrf = response_cookies["__Host-next-auth.csrf-token"]
csrf_cookies = {
'__Host-next-auth.csrf-token': csrf,
'__Secure-next-auth.callback-url': 'https%3A%2F%2Fnpo.nl',
}
if not plus_cookie:
plus_cookie = csrf_cookies
json_productId = {
'productId': targetId,
}
response_token = requests.post('https://npo.nl/start/api/domain/player-token', cookies=plus_cookie, headers=headers, json=json_productId)
token = response_token.json()["token"]
return token
def find_MPD(token, url, plus_cookie):
headers['Authorization'] = token
json_auth = {
'profileName': 'dash',
'drmType': 'widevine',
'referrerUrl': url,
}
response = requests.post('https://prod.npoplayer.nl/stream-link', headers=headers, json=json_auth, cookies=plus_cookie)
response_data = response.json()
stream_data = response_data.get('stream', {})
if stream_data.get('streamURL'):
return stream_data
else:
print("NO MPD URL - BAD TOKEN")
print(response_data)
exit()
def find_PSSH(mpd):
mpd_url = mpd.get('streamURL')
response = requests.get(mpd_url, headers=headers)
pssh_extractor = PsshExtractor(response.text)
pssh_value = pssh_extractor.extract_pssh()
return pssh_value, mpd_url
def find_key(mpd, pssh):
headers_license = {
'x-custom-data': mpd.get('drmToken'),
'origin': 'https://start-player.npo.nl',
'referer': 'https://start-player.npo.nl/',
}
cert_b64 = None
key_extractor = KeyExtractor(pssh, cert_b64, license_url, headers_license)
keys = key_extractor.get_keys()
wvdecrypt = WvDecrypt(init_data_b64=pssh, cert_data_b64=cert_b64, device=device_android_generic)
raw_challenge = wvdecrypt.get_challenge()
data = raw_challenge
for key in keys:
if isinstance(key, list):
if key:
for key_str in key:
return key_str
def check_prereq():
if windows_flag == True:
prereq_filelist = ['mp4decrypt.exe', 'N_m3u8DL-RE.exe']
else:
prereq_filelist = ['mp4decrypt', 'N_m3u8DL-RE']
for file in prereq_filelist:
if not os.path.isfile(file):
print(f"ERR: {file} not found!")
print("Please check your directory and try again.")
exit()
if shutil.which("ffmpeg") is None:
print("ffmpeg not found in PATH.")
exit()
def create_filename(url, programKey):
# 1 2 3 4 5 6 7 8 (optional)
# create filename based on input URL: https://npo.nl/start/serie /wie-is-de-mol /seizoen-24 /wie-is-de-mol_56 /afspelen
# https://npo.nl/start/serie /de-avondshow-met-arjen-lubach /seizoen-8_1 /de-avondshow-met-arjen-lubach_93 /afspelen
url_split = url.split("/")
title = url_split[7].split("_")[0]
season = url_split[6].split("_")[0]
filename_enc = title + "_" + season + "_ep-" + programKey + "_encrypted"
filename = filename_enc.replace("_encrypted", "")
return filename_enc, filename
def download(mpd_url, filename_enc):
# output: filename.m4a (audio) and filename.mp4 (video)
if windows_flag == True:
subprocess.run(['N_m3u8DL-RE.exe', '--auto-select', '--no-log', '--save-name', filename_enc, mpd_url], stdout=subprocess.DEVNULL)
else:
subprocess.run(['N_m3u8DL-RE', '--auto-select', '--no-log', '--save-name', filename_enc, mpd_url], stdout=subprocess.DEVNULL)
def decrypt(key, filename_enc, filename):
if windows_flag == True:
subprocess.run(['mp4decrypt.exe', '--key', key, str(filename_enc + ".mp4"), str(filename + "_video.mp4")], stdout=subprocess.DEVNULL)
subprocess.run(['mp4decrypt.exe', '--key', key, str(filename_enc + ".m4a"), str(filename + "_audio.m4a")], stdout=subprocess.DEVNULL)
else:
subprocess.run(['mp4decrypt', '--key', key, str(filename_enc + ".mp4"), str(filename + "_video.mp4")], stdout=subprocess.DEVNULL)
subprocess.run(['mp4decrypt', '--key', key, str(filename_enc + ".m4a"), str(filename + "_audio.m4a")], stdout=subprocess.DEVNULL)
def merge(filename):
ffmpeg_command = [
'ffmpeg', '-v', 'quiet', # '-stats',
'-i', filename + "_video.mp4",
'-i', filename + "_audio.m4a",
'-c:v', 'copy',
'-c:a', 'copy',
'-strict', 'experimental',
filename + ".mp4"
]
subprocess.run(ffmpeg_command)
def clean(filename_enc, filename):
os.remove(filename_enc + ".mp4")
os.remove(filename_enc + ".m4a")
os.remove(filename + "_audio.m4a")
os.remove(filename + "_video.mp4")
def check_file(filename):
if not os.path.exists(filename + ".mp4"):
print("File not found. Continue anyway? (y/N)")
userinput = input().lower()
if not userinput or userinput != 'y':
exit()
def execute(url, plus_cookie, process_no):
productId, programKey = find_targetId(url)
token = find_CSRF(productId,plus_cookie)
mpd = find_MPD(token, url, plus_cookie)
pssh, mpd_url = find_PSSH(mpd)
key = find_key(mpd, pssh)
check_prereq()
filename_enc, filename = create_filename(url, programKey)
download(mpd_url, filename_enc)
decrypt(key, filename_enc, filename)
merge(filename)
clean(filename_enc, filename)
check_file(filename)
return process_no # keeps track of process index to return x/y videos completed message
plus_cookie = find_cookies()
max_workers = min(os.cpu_count(), len(urls))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(execute, url, plus_cookie, i + 1) for i, url in enumerate(urls)]
completed_videos = 0
for future in concurrent.futures.as_completed(futures):
result = future.result()
completed_videos += 1
print(f"{completed_videos}/{len(urls)} video{'s'[:len(urls) != 1]} completed")
#########
# NOTES #
#########
# The downloader *should* work across every platform, linux/mac/win.
# It has not been extensively tested on anything but windows. DM me if you need help :D
# Supported browsers for NPO Plus cookies:
# (https://github.com/borisbabic/browser_cookie3#testing-dates--ddmmyy)
# * Chrome
# * Firefox
# * LibreWolf
# * Opera
# * Opera GX
# * Edge
# * Chromium
# * Brave
# * Vivaldi
# * Safari

135
npo.py Normal file
View File

@ -0,0 +1,135 @@
import argparse
import requests
from bs4 import BeautifulSoup
import json
from cdm.wks import WvDecrypt, device_android_generic, PsshExtractor, KeyExtractor
# Parse URL input
parser = argparse.ArgumentParser(description='PYWKS-NPO')
parser.add_argument('-url', dest='url', required=True, help='NPO Video URL')
args = parser.parse_args()
# Get HTML and extract productId
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/119.0',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'same-site',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
}
response_targetId = requests.get(args.url, headers=headers)
content = response_targetId.content
try:
url_split = args.url.split("/")
target_slug = url_split[7]
except:
print("URL invalid.")
print("URL format: https://npo.nl/start/serie/wie-is-de-mol/seizoen-24/wie-is-de-mol_56/afspelen")
print(f"Your URL: {args.url}")
exit()
soup = BeautifulSoup(content, 'html.parser')
script_tag = soup.find('script', {'id': '__NEXT_DATA__'})
if script_tag:
script_content = script_tag.contents[0]
else:
print("Script tag not found.")
def search(data, target_slug):
if isinstance(data, list):
for item in data:
result = search(item, target_slug)
if result:
return result
elif isinstance(data, dict):
for key, value in data.items():
if key == "slug" and value == target_slug:
return data.get("productId")
else:
result = search(value, target_slug)
if result:
return result
return None
data_dict = json.loads(script_content)
target_product_id = search(data_dict, target_slug)
# Get CSRF token
response_CSRF = requests.get('https://npo.nl/start/api/auth/session', headers=headers)
response_cookies = response_CSRF.cookies.get_dict()
csrf = response_cookies["__Host-next-auth.csrf-token"]
# Get player token
cookies = {
'__Host-next-auth.csrf-token': csrf,
'__Secure-next-auth.callback-url': 'https%3A%2F%2Fnpo.nl',
}
json_productId = {
'productId': target_product_id,
}
response_token = requests.post('https://npo.nl/start/api/domain/player-token', cookies=cookies, headers=headers, json=json_productId)
token = response_token.json()["token"]
# Get MPD URL
headers['authorization'] = token
json_auth = {
'profileName': 'dash',
'drmType': 'widevine',
'referrerUrl': args.url,
}
response = requests.post('https://prod.npoplayer.nl/stream-link', headers=headers, json=json_auth)
response_data = response.json()
stream_data = response_data.get('stream', {})
if stream_data.get('streamURL'):
print('MPD URL:', stream_data.get('streamURL'))
else:
print("NO MPD URL - BAD TOKEN")
exit()
# Get PSSH
mpd_url = stream_data.get('streamURL')
license_url = "https://npo-drm-gateway.samgcloud.nepworldwide.nl/authentication"
response = requests.get(mpd_url, headers=headers)
pssh_extractor = PsshExtractor(response.text)
pssh_value = pssh_extractor.extract_pssh()
print("PSSH:", pssh_value)
headers_license = {
'x-custom-data': stream_data.get('drmToken'),
'origin': 'https://start-player.npo.nl',
'referer': 'https://start-player.npo.nl/',
}
# Get Key
cert_b64 = None
key_extractor = KeyExtractor(pssh_value, cert_b64, license_url, headers_license)
keys = key_extractor.get_keys()
wvdecrypt = WvDecrypt(init_data_b64=pssh_value, cert_data_b64=cert_b64, device=device_android_generic)
raw_challenge = wvdecrypt.get_challenge()
data = raw_challenge
for key in keys:
if isinstance(key, list):
if key:
for key_str in key:
print(f"KEY: {key_str}")

78
npo_new.py Normal file
View File

@ -0,0 +1,78 @@
import argparse
import requests
from bs4 import BeautifulSoup
import json
import logging
import coloredlogs
from cdm.wks import WvDecrypt, device_android_generic, PsshExtractor, KeyExtractor
# def search(data, target_slug):
# if isinstance(data, list):
# for item in data:
# result = search(item, target_slug)
# if result:
# return result
# elif isinstance(data, dict):
# for key, value in data.items():
# if key == "slug" and value == target_slug:
# return data.get("productId")
# else:
# result = search(value, target_slug)
# if result:
# return result
# return None
parser = argparse.ArgumentParser(description='PYWKS-NPO')
parser.add_argument('-url', dest='url', required=True, help='NPO Video URL')
parser.add_argument("-logger", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], help="Logger level")
args = parser.parse_args()
LOG_FORMAT = "{asctime} [{levelname[0]}] {name} : {message}"
LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
LOG_STYLE = "{"
coloredlogs.install(level=args.logger, fmt=LOG_FORMAT, datefmt=LOG_DATE_FORMAT, style=LOG_STYLE)
logger = logging.getLogger("NPO")
response_target_id = requests.get(args.url)
content = response_target_id.content
try:
target_slug = args.url.split("/")[7]
except IndexError:
logger.error("Invalid URL format. Example: https://npo.nl/start/serie/wie-is-de-mol/seizoen-24/wie-is-de-mol_56/afspelen")
exit()
soup = BeautifulSoup(content, 'html.parser')
script_tag = soup.find('script', {'id': '__NEXT_DATA__'})
script_content = script_tag.contents[0] if script_tag else logger.error("Script tag not found.")
data_dict = json.loads(script_content)
target_product_id = search(data_dict, target_slug)
if not target_product_id:
logger.error("Failed to retrieve target product ID.")
exit()
response_csrf = requests.get('https://npo.nl/start/api/auth/session')
cookies = {'__Host-next-auth.csrf-token': response_csrf.cookies.get_dict()["__Host-next-auth.csrf-token"],'__Secure-next-auth.callback-url': 'https://npo.nl'}
json_product_id = {'productId': target_product_id}
response_token = requests.post('https://npo.nl/start/api/domain/player-token', cookies=cookies, json=json_product_id)
headers = {'authorization': response_token.json()["token"]}
json_auth = {'profileName': 'dash', 'drmType': 'widevine', 'referrerUrl': args.url}
response = requests.post('https://prod.npoplayer.nl/stream-link', headers=headers, json=json_auth)
stream_data = response.json().get('stream', {})
if not stream_data.get('streamURL'):
logger.error("Failed to retrieve MPD URL. Invalid or expired authentication token.")
exit()
mpd_url = stream_data.get('streamURL')
logger.info(f"MPD URL: {mpd_url}")
license_url = "https://npo-drm-gateway.samgcloud.nepworldwide.nl/authentication"
response = requests.get(mpd_url, headers=headers)
pssh_extractor = PsshExtractor(response.text)
pssh_value = pssh_extractor.extract_pssh()
logger.info(f"PSSH: {pssh_value}")
headers_license = {'x-custom-data': stream_data.get('drmToken'),'origin': 'https://start-player.npo.nl','referer': 'https://start-player.npo.nl/'}
cert_b64 = None
key_extractor = KeyExtractor(pssh_value, cert_b64, license_url, headers_license)
keys = key_extractor.get_keys()
wvdecrypt = WvDecrypt(init_data_b64=pssh_value, cert_data_b64=cert_b64, device=device_android_generic)
raw_challenge = wvdecrypt.get_challenge()
for key in keys:
if isinstance(key, list) and key:
for key_str in key:
logger.info(f"\u251C KEY: {key_str}")

6
requirements.txt Normal file
View File

@ -0,0 +1,6 @@
protobuf
bs4
xmltodict
browser_cookie3
requests
pycryptodomex