From c92b4453e13b6c94d7c75d7e07bc47ac8f4699f1 Mon Sep 17 00:00:00 2001
From: TPD94 <>
Date: Fri, 8 Dec 2023 13:53:24 -0500
Subject: [PATCH] Added WEB-DL
---
.gitignore | 7 +
Helpers/__init__.py | 8 +
Helpers/api_check.py | 21 ++
Helpers/binary_check.py | 161 ++++++++++++
Helpers/cache_key.py | 13 +
Helpers/capability_check.py | 27 ++
Helpers/database_check.py | 19 ++
Helpers/download.py | 193 ++++++++++++++
Helpers/mpd_parse.py | 28 ++
Helpers/wvd_check.py | 21 ++
License_cURL.py | 5 -
README.md | 14 +-
Sites/Crunchyroll.py | 201 +++++++++++++++
Sites/Generic.py | 207 +++++++++++++++
Sites/YouTube.py | 180 +++++++++++++
Sites/__init__.py | 3 +
requirements.txt | Bin 41 -> 134 bytes
tpd-keys.py | 502 ++++++------------------------------
18 files changed, 1172 insertions(+), 438 deletions(-)
create mode 100644 .gitignore
create mode 100644 Helpers/__init__.py
create mode 100644 Helpers/api_check.py
create mode 100644 Helpers/binary_check.py
create mode 100644 Helpers/cache_key.py
create mode 100644 Helpers/capability_check.py
create mode 100644 Helpers/database_check.py
create mode 100644 Helpers/download.py
create mode 100644 Helpers/mpd_parse.py
create mode 100644 Helpers/wvd_check.py
create mode 100644 Sites/Crunchyroll.py
create mode 100644 Sites/Generic.py
create mode 100644 Sites/YouTube.py
create mode 100644 Sites/__init__.py
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..ccaa126
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+WVDs/
+venv/
+keys/
+Config/
+download/
+binaries/
+*.pyc
\ No newline at end of file
diff --git a/Helpers/__init__.py b/Helpers/__init__.py
new file mode 100644
index 0000000..1961d81
--- /dev/null
+++ b/Helpers/__init__.py
@@ -0,0 +1,8 @@
+from . import wvd_check
+from . import api_check
+from . import capability_check
+from . import database_check
+from . import cache_key
+from . import mpd_parse
+from . import download
+from . import binary_check
\ No newline at end of file
diff --git a/Helpers/api_check.py b/Helpers/api_check.py
new file mode 100644
index 0000000..184f9dc
--- /dev/null
+++ b/Helpers/api_check.py
@@ -0,0 +1,21 @@
+# Import dependencies
+import os
+
+
+# Define api key check
+def api_check():
+ # Create Config directory if it doesn't exist
+ if 'Config' not in os.listdir(fr'{os.getcwd()}'):
+ os.makedirs(f'{os.getcwd()}/Config')
+ # Create api-key.txt if it doesn't exist
+ if not os.path.isfile(f'{os.getcwd()}/Config/api-key.txt'):
+ with open(f'{os.getcwd()}/Config/api-key.txt', 'w') as api_key_text:
+ api_key_text.write("Place your API key on this line")
+ return "First run"
+ # Grab API Key from the text file
+ with open(f'{os.getcwd()}/Config/api-key.txt') as API_Key_Text:
+ api_key = API_Key_Text.readline()
+ if api_key != "Place your API key on this line":
+ return api_key
+ else:
+ return None
diff --git a/Helpers/binary_check.py b/Helpers/binary_check.py
new file mode 100644
index 0000000..23e15a9
--- /dev/null
+++ b/Helpers/binary_check.py
@@ -0,0 +1,161 @@
+import os
+import zipfile
+import shutil
+import requests
+from tqdm import tqdm
+
+
+# Create / Check folders function
+def create_folders():
+ # Check for required directories
+ for directory in ['binaries', 'download']:
+ # Create them if they don't exist
+ if directory not in os.listdir(os.getcwd()):
+ os.makedirs(f'{os.getcwd()}/{directory}')
+ # If they do exist, check for temp, create if it doesn't exist
+ if 'temp' not in f'{os.listdir(os.getcwd())}/download':
+ os.makedirs(f'{os.getcwd()}/download/temp')
+
+
+# Create / Check binaries function
+def create_binaries():
+ # Check if the required binaries exist, if not, download them.
+
+ # Iterate through required binaries
+ for binary in ["n_m3u8dl-re.exe", "mp4decrypt.exe", "ffmpeg.exe", "yt-dlp.exe"]:
+
+ # Perform checks for each binary
+ if not os.path.isfile(f"{os.getcwd()}/binaries/{binary}"):
+
+ # FFmpeg
+ if binary == "ffmpeg.exe":
+
+ # Download windows zip file for FFmpeg
+ ffmpeg_download = requests.get(
+ "https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-win64-gpl.zip",
+ stream=True)
+ total_size = int(ffmpeg_download.headers.get('content-length', 0))
+ with open(f"{os.getcwd()}/download/temp/ffmpeg.zip", 'wb') as download:
+ with tqdm(total=total_size, unit='B', unit_scale=True,
+ desc="Downloading ffmpeg.zip") as progress_bar:
+ for data in ffmpeg_download.iter_content(chunk_size=1024):
+ download.write(data)
+ progress_bar.update(len(data))
+
+ # Unzip FFmpeg
+ with zipfile.ZipFile(f"{os.getcwd()}/download/temp/ffmpeg.zip", "r") as ffmpeg_zip:
+ file_count = len(ffmpeg_zip.infolist())
+ with tqdm(total=file_count, unit='file', desc="Extracting ffmpeg.zip") as unzip_progress_bar:
+ for file in ffmpeg_zip.infolist():
+ ffmpeg_zip.extract(file, path=f"{os.getcwd()}/download/temp")
+ unzip_progress_bar.update(1)
+
+ # Copy ffmpeg binary to binaries
+ shutil.copy2(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-win64-gpl/bin/ffmpeg.exe",
+ f"{os.getcwd()}/binaries")
+
+ # Remove the zip
+ os.remove(f"{os.getcwd()}/download/temp/ffmpeg.zip")
+
+ # Remove the folder
+ shutil.rmtree(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-win64-gpl")
+
+ # Print a new line
+ print()
+
+ # MP4 Decrypt
+ elif binary == "mp4decrypt.exe":
+
+ # Download mp4decrypt zip file
+ mp4decrypt_download = requests.get(
+ "https://www.bok.net/Bento4/binaries/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32.zip", stream=True)
+ total_size = int(mp4decrypt_download.headers.get('content-length', 0))
+ with open(f"{os.getcwd()}/download/temp/mp4decrypt.zip", 'wb') as download:
+ with tqdm(total=total_size, unit='B', unit_scale=True,
+ desc="Downloading mp4decrypt.zip") as progress_bar:
+ for data in mp4decrypt_download.iter_content(chunk_size=1024):
+ download.write(data)
+ progress_bar.update(len(data))
+
+ # Unzip mp4decrypt
+ with zipfile.ZipFile(f"{os.getcwd()}/download/temp/mp4decrypt.zip", "r") as mp4decrypt_zip:
+ file_count = len(mp4decrypt_zip.infolist())
+ with tqdm(total=file_count, unit='file', desc="Extracting mp4decrypt.zip") as unzip_progress_bar:
+ for file in mp4decrypt_zip.infolist():
+ mp4decrypt_zip.extract(file, path=f"{os.getcwd()}/download/temp")
+ unzip_progress_bar.update(1)
+
+ # Copy mp4decrypt binary to binaries
+ shutil.copy2(
+ f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32/bin/mp4decrypt.exe",
+ f"{os.getcwd()}/binaries")
+
+ # Deleting the zip file
+ os.remove(f"{os.getcwd()}/download/temp/mp4decrypt.zip")
+
+ # Deleting the directory
+ shutil.rmtree(f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32")
+
+ # Print a new line
+ print()
+
+ # n_m3u8dl-re
+ elif binary == "n_m3u8dl-re.exe":
+
+ # Download n_m3u8dl-re zip file
+ n_m3u8dl_re_download = requests.get(
+ "https://github.com/nilaoda/N_m3u8DL-RE/releases/download/v0.2.0-beta/N_m3u8DL-RE_Beta_win-x64_20230628.zip",
+ stream=True)
+ total_size = int(n_m3u8dl_re_download.headers.get('content-length', 0))
+ with open(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip", 'wb') as download:
+ with tqdm(total=total_size, unit='B', unit_scale=True,
+ desc="Downloading n_m3u8dl-re.zip") as progress_bar:
+ for data in n_m3u8dl_re_download.iter_content(chunk_size=1024):
+ download.write(data)
+ progress_bar.update(len(data))
+
+ # Unzip n_m3u8dl-re
+ with zipfile.ZipFile(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip", "r") as nm3u8dl_re_zip:
+ file_count = len(nm3u8dl_re_zip.infolist())
+ with tqdm(total=file_count, unit='file', desc="Extracting n_m3u8dl-re.zip") as unzip_progress_bar:
+ for file in nm3u8dl_re_zip.infolist():
+ nm3u8dl_re_zip.extract(file, path=f"{os.getcwd()}/download/temp")
+ unzip_progress_bar.update(1)
+
+ # Copy n_m3u8dl-re binary to binaries
+ shutil.copy2(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_win-x64/N_m3u8DL-RE.exe",
+ f"{os.getcwd()}/binaries")
+
+ # Delete zip file
+ os.remove(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip")
+
+ # Delete directory
+ shutil.rmtree(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_win-x64")
+
+ # Print a new line
+ print()
+
+ # YT-DLP
+ elif binary == "yt-dlp.exe":
+
+ # Download yt-dlp exe
+ yt_dlp_download = requests.get(
+ "https://github.com/yt-dlp/yt-dlp/releases/download/2023.11.16/yt-dlp_x86.exe",
+ stream=True)
+ total_size = int(yt_dlp_download.headers.get('content-length', 0))
+ with open(f"{os.getcwd()}/download/yt-dlp.exe", 'wb') as download:
+ with tqdm(total=total_size, unit='B', unit_scale=True,
+ desc="Downloading yt-dlp") as progress_bar:
+ for data in yt_dlp_download.iter_content(chunk_size=1024):
+ download.write(data)
+ progress_bar.update(len(data))
+
+ # Copy yt-dlp binary to binaries
+ shutil.copy2(f"{os.getcwd()}/download/yt-dlp.exe",
+ f"{os.getcwd()}/binaries")
+
+ # Remove binary from download folder
+ os.remove(f"{os.getcwd()}/download/yt-dlp.exe")
+
+ # Print a new line
+ print()
diff --git a/Helpers/cache_key.py b/Helpers/cache_key.py
new file mode 100644
index 0000000..9eba312
--- /dev/null
+++ b/Helpers/cache_key.py
@@ -0,0 +1,13 @@
+# Import dependencies
+
+import sqlite3
+import os
+
+
+# Define cache function
+def cache_keys(pssh: str, keys: str):
+ dbconnection = sqlite3.connect(f"{os.getcwd()}/keys/database.db")
+ dbcursor = dbconnection.cursor()
+ dbcursor.execute("INSERT or REPLACE INTO database VALUES (?, ?)", (pssh, keys))
+ dbconnection.commit()
+ dbconnection.close()
\ No newline at end of file
diff --git a/Helpers/capability_check.py b/Helpers/capability_check.py
new file mode 100644
index 0000000..c5c663c
--- /dev/null
+++ b/Helpers/capability_check.py
@@ -0,0 +1,27 @@
+# Import dependencies
+import Helpers
+import os
+
+
+def capability_check():
+ # Check for .WVD and API Key, exit program if neither exist.
+ Device = Helpers.wvd_check.wvd_check()
+ if Device is None:
+ API_Key = Helpers.api_check.api_check()
+ if API_Key == "First run" or API_Key == None:
+ exit(f"No CDM or API key found, please place a CDM in {os.getcwd()}/WVDs or an API key in {os.getcwd()}/Config/api-key.txt")
+ else:
+ print("No local device found, remote decryption only.")
+ print(f'Using API Key: {API_Key}\n')
+ return None, API_Key
+ elif Device is not None:
+ API_Key = Helpers.api_check.api_check()
+ if API_Key == "First run" or API_Key == None:
+ print("No API key found, local decryption only.")
+ print(f'Using device at {Device}\n')
+ return Device, None
+ else:
+ print(f'Local and remote decryption available.')
+ print(f'Using device at {Device}')
+ print(f'Using API Key: {API_Key}\n')
+ return Device, API_Key
diff --git a/Helpers/database_check.py b/Helpers/database_check.py
new file mode 100644
index 0000000..d50f4c9
--- /dev/null
+++ b/Helpers/database_check.py
@@ -0,0 +1,19 @@
+# Import dependencies
+
+import os
+import sqlite3
+
+
+# Check to see if the database already exists, if not create a keys folder, and create the database.
+def database_check():
+ # Check to see if the "keys" directory exists, if not creates it
+ if "keys" not in os.listdir(os.getcwd()):
+ os.makedirs('keys')
+
+ # Check to see if a database exists in keys directory, if not create it
+ if not os.path.isfile(f"{os.getcwd()}/keys/database.db"):
+ print(f"Creating database.\n")
+ dbconnection = sqlite3.connect(f"{os.getcwd()}/keys/database.db")
+ dbcursor = dbconnection.cursor()
+ dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "pssh" TEXT, "keys" TEXT, PRIMARY KEY("pssh") )')
+ dbconnection.close()
\ No newline at end of file
diff --git a/Helpers/download.py b/Helpers/download.py
new file mode 100644
index 0000000..3673d44
--- /dev/null
+++ b/Helpers/download.py
@@ -0,0 +1,193 @@
+import subprocess
+from os import urandom
+import uuid
+import glob
+import os
+import Helpers.binary_check
+import Sites.Generic
+import license_curl
+
+
+# Web Download function generic
+def web_dl_generic(mpd: str = None, device: str = None, api_key: str = None, remote: bool = False):
+
+ # Check for folders
+ Helpers.binary_check.create_folders()
+
+ # Check for binaries
+ Helpers.binary_check.create_binaries()
+
+ # Create random download name
+ download_name = str(uuid.uuid4())
+
+ # Retrieve the keys
+ if not remote:
+ mp4decrypt_keys = Sites.Generic.decrypt_generic(mpd_url=mpd, wvd=device, license_curl_headers=license_curl.headers)
+ if remote:
+ mp4decrypt_keys = Sites.Generic.decrypt_generic_remotely(api_key=api_key, license_curl_headers=license_curl.headers, mpd_url=mpd)
+
+ # Define n_m3u8dl-re download parameters
+ n_m3u8dl_re_download = [
+ f'{os.getcwd()}/binaries/n_m3u8dl-re.exe',
+ f'{mpd}',
+ '--ffmpeg-binary-path',
+ f'{os.getcwd()}/binaries/ffmpeg.exe',
+ '--decryption-binary-path',
+ f'{os.getcwd()}/binaries/mp4decrypt.exe',
+ '--tmp-dir',
+ f'{os.getcwd()}/download/temp',
+ '--save-dir',
+ f'{os.getcwd()}/download',
+ '--save-name',
+ f'{download_name}',
+ '--binary-merge',
+ 'True',
+ '--mux-after-done',
+ 'format=mkv'
+ ] + mp4decrypt_keys
+
+ subprocess.run(n_m3u8dl_re_download)
+
+ try:
+ download_name = glob.glob(f'{os.getcwd()}/download/{download_name}.*')
+ return download_name
+ except:
+ return f'Failed to download!'
+
+
+# Web Download crunchyroll function
+def web_dl_crunchyroll(mpd: str = None, device: str = None, api_key: str = None, remote: bool = False):
+
+ # Check for folders
+ Helpers.binary_check.create_folders()
+
+ # Check for binaries
+ Helpers.binary_check.create_binaries()
+
+ # Create random download name
+ download_name = str(uuid.uuid4())
+
+ # Retrieve the keys
+ if not remote:
+ mp4decrypt_keys = Sites.Crunchyroll.decrypt_crunchyroll(mpd_url=mpd, wvd=device, license_curl_headers=license_curl.headers)
+ if remote:
+ mp4decrypt_keys = Sites.Crunchyroll.decrypt_crunchyroll_remotely(api_key=api_key, license_curl_headers=license_curl.headers, mpd_url=mpd)
+
+ # Define n_m3u8dl-re download parameters
+ n_m3u8dl_re_download = [
+ f'{os.getcwd()}/binaries/n_m3u8dl-re.exe',
+ f'--header',
+ f'authorization: {license_curl.headers["authorization"]}',
+ f'{mpd}',
+ '--ffmpeg-binary-path',
+ f'{os.getcwd()}/binaries/ffmpeg.exe',
+ '--decryption-binary-path',
+ f'{os.getcwd()}/binaries/mp4decrypt.exe',
+ '--tmp-dir',
+ f'{os.getcwd()}/download/temp',
+ '--save-dir',
+ f'{os.getcwd()}/download',
+ '--save-name',
+ f'{download_name}',
+ '--binary-merge',
+ 'True',
+ '--mux-after-done',
+ 'format=mkv'
+ ] + mp4decrypt_keys
+
+ subprocess.run(n_m3u8dl_re_download)
+
+ try:
+ download_name = glob.glob(f'{os.getcwd()}/download/{download_name}.*')
+ return download_name
+ except:
+ return f'Failed to download!'
+
+
+# YouTube Download function generic
+def youtube_dlp(url: str = None, device: str = None, api_key: str = None, remote: bool = False):
+
+ # Check for folders
+ Helpers.binary_check.create_folders()
+
+ # Check for binaries
+ Helpers.binary_check.create_binaries()
+
+ # Create random download name
+ download_name = str(uuid.uuid4())
+
+ # Retrieve the keys
+ if not remote:
+ mp4decrypt_keys = Sites.YouTube.decrypt_youtube(wvd=device, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies)
+ if remote:
+ mp4decrypt_keys = Sites.YouTube.decrypt_youtube_remotely(api_key=api_key, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies)
+
+ # Define yt-dlp download parameters
+ yt_dlp_download = [
+ f'{os.getcwd()}/binaries/yt-dlp.exe',
+ '-f',
+ 'bv*+ba/b',
+ '--allow-u',
+ '-o',
+ f'{os.getcwd()}/download/{download_name}.%(ext)s',
+ '-S',
+ 'ext',
+ '-S',
+ 'res:720',
+ f'{url}'
+ ]
+
+ # Run yt-dlp
+ subprocess.run(yt_dlp_download)
+
+ # Get the names of the newly downloaded files
+ files = glob.glob(f'{os.getcwd()}/download/{download_name}.*')
+
+ # Declare empty list for decrypted files location to be stored
+ decrypted_files = []
+
+ # Iterate through all the files and decrypt them
+ for file in files:
+
+ # Assign file name variable to be appended to decrypted files list
+ file_name = str(uuid.uuid4())
+
+ # define mp4 decrypt parameters
+ mp4_decrypt = [
+ f'{os.getcwd()}/binaries/mp4decrypt.exe',
+ f'{file}',
+ f'{os.getcwd()}/download/{file_name}',
+ ] + mp4decrypt_keys
+
+ # Run mp4decrypt
+ subprocess.run(mp4_decrypt)
+
+ # Append the file to the decrypted file list
+ decrypted_files.append(f'{os.getcwd()}/download/{file_name}')
+
+ # Declare a final mux variable
+ final_mux = str(uuid.uuid4())
+
+ # Define ffmpeg parameters
+ ffmpeg_merge = [
+ f"{os.getcwd()}/binaries/ffmpeg.exe",
+ '-i',
+ f"{decrypted_files[0]}",
+ '-i',
+ f"{decrypted_files[1]}",
+ '-vcodec',
+ 'copy',
+ '-acodec',
+ 'copy',
+ f"{os.getcwd()}/downloads/{final_mux}.mkv",
+ ]
+
+ # Run ffmpeg to merge the files
+ subprocess.run(ffmpeg_merge)
+
+ # Try to get a download name and return it
+ download_name = glob.glob(f'{os.getcwd()}/download/{final_mux}.*')
+ if download_name:
+ return download_name
+ else:
+ return f"Couldn't complete download!"
diff --git a/Helpers/mpd_parse.py b/Helpers/mpd_parse.py
new file mode 100644
index 0000000..b69a391
--- /dev/null
+++ b/Helpers/mpd_parse.py
@@ -0,0 +1,28 @@
+import requests
+import re
+
+
+# Define MPD / m3u8 PSSH parser
+def parse_pssh(manifest_url):
+ manifest = manifest_url
+ try:
+ response = requests.get(manifest)
+ except:
+ pssh = input("Couldn't retrieve manifest, please input PSSH: ")
+ return pssh
+ try:
+ matches = re.finditer(r'(.*))>(?P(.*))', response.text)
+ pssh_list = []
+
+ for match in matches:
+ if match.group and not match.group("pssh") in pssh_list and len(match.group("pssh")) < 300:
+ pssh_list.append(match.group("pssh"))
+
+ if len(pssh_list) < 1:
+ matches = re.finditer(r'URI="data:text/plain;base64,(?P(.*))"', response.text)
+ for match in matches:
+ if match.group("pssh") and match.group("pssh").upper().startswith("A") and len(match.group("pssh")) < 300:
+ pssh_list.append(match.group("pssh"))
+ return f'{pssh_list[0]}'
+ except:
+ return None
\ No newline at end of file
diff --git a/Helpers/wvd_check.py b/Helpers/wvd_check.py
new file mode 100644
index 0000000..61839fe
--- /dev/null
+++ b/Helpers/wvd_check.py
@@ -0,0 +1,21 @@
+# Import dependencies
+import os
+import glob
+
+
+# Define WVD device check
+def wvd_check():
+ try:
+ # Check to see if the WVDs folder exist, if not create it
+ if 'WVDs' not in os.listdir(fr'{os.getcwd()}'):
+ os.makedirs(f'{os.getcwd()}/WVDs')
+ # Use glob to get the name of the .wvd
+ extracted_device = glob.glob(f'{os.getcwd()}/WVDs/*.wvd')[0]
+ # Return the device path
+ return extracted_device
+ except:
+ # Check to see if the WVDs folder exist, if not create it
+ if 'WVDs' not in os.listdir(fr'{os.getcwd()}'):
+ os.makedirs(f'{os.getcwd()}/WVDs')
+ # Stop the program and print out instructions
+ return None
diff --git a/License_cURL.py b/License_cURL.py
index cebd5a8..e69de29 100644
--- a/License_cURL.py
+++ b/License_cURL.py
@@ -1,5 +0,0 @@
-headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0',
- 'Accept': '*/*',
- 'Accept-Language': 'en-US,en;q=0.5',
-}
\ No newline at end of file
diff --git a/README.md b/README.md
index 929adb7..fe5515c 100644
--- a/README.md
+++ b/README.md
@@ -1,21 +1,23 @@
# TPD-Keys
-#### Created by @TPD94, proxy function by Radziu
+#### Created by @TPD94
## Based on [pywidevine](https://cdm-project.com/Decryption-Tools/pywidevine "pywidevine")
How to use:
1. Create `TPD-Keys` folder.
-2. Download and extract `tpd-keys.py`, `requirements.txt` and `License_cURL.py` into the newly created `TPD-Keys` directory
+2. Download and extract `TPD-Keys.py`, `requirements.txt` and `License_curl.py` into the newly created `TPD-Keys` directory
3. Install the requirements with `pip install -r requirements.txt`
4. Crete a WVD with pywidevine; `pywidevine create-device -k "/PATH/TO/device_private_key" -c "/PATH/TO/device_client_id_blob" -t "ANDROID" -l 3`
-5. Place your .wvd in the root of `TPD-Keys` directory
+5. Place your .wvd in `/WVDs` directory, if you do not have this directory, create it or run the program with `python TPD-Keys.py` and it will be created for you
-6. Paste any needed headers into `License_cURL.py`
+6. Place your API key (if wanted) in `/Config/api-key.txt` if you do not have this file or directory, create it or run the program with `python TPD-Keys.py` and it will be created for you. If you don't have an API key, you can request one via [discord](https://discord.gg/cdrm-project "CDRM-Project")
-7. Run with `python tpd-keys.py`
+7. Paste dictionaries from license request curl post request into `License_curl.py`
-8. Make a selection
+8. Run with `python tpd-keys.py`
+
+To view additional options you can use `python tpd-keys.py -h`
\ No newline at end of file
diff --git a/Sites/Crunchyroll.py b/Sites/Crunchyroll.py
new file mode 100644
index 0000000..96dec30
--- /dev/null
+++ b/Sites/Crunchyroll.py
@@ -0,0 +1,201 @@
+# Import dependencies
+
+from pywidevine import PSSH
+from pywidevine import Cdm
+from pywidevine import Device
+import requests
+import base64
+import os
+import Helpers
+
+
+# Defining decrypt function for Crunchyroll
+def decrypt_crunchyroll(wvd: str = None, license_curl_headers: dict = None, mpd_url: str = None):
+
+ # Exit if no device
+ if wvd is None:
+ exit(f"No CDM! to use local decryption place a .wvd in {os.getcwd()}/WVDs")
+
+ # Try getting pssh via MPD URL if web-dl
+ if mpd_url is not None:
+ input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url)
+ if input_pssh is not None:
+ print(f'\nPSSH found: {input_pssh}')
+ else:
+ input_pssh = input(f"\nPSSH not found! Input PSSH: ")
+
+ # Ask for PSSH if just keys function
+ if mpd_url is None:
+ # Ask for PSSH if web-dl not selected:
+ input_pssh = input(f"\nPSSH: ")
+
+ # prepare pssh
+ pssh = PSSH(input_pssh)
+
+ # load device
+ device = Device.load(wvd)
+
+ # load CDM from device
+ cdm = Cdm.from_device(device)
+
+ # open CDM session
+ session_id = cdm.open()
+
+ # get service certificate
+ service_cert = requests.post(
+ url="https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine",
+ data=cdm.service_certificate_challenge,
+ headers=license_curl_headers
+ )
+ if service_cert.status_code != 200:
+ print("Couldn't retrieve service cert")
+ else:
+ service_cert = service_cert.json()["license"]
+ cdm.set_service_certificate(session_id, service_cert)
+
+ # generate license challenge
+ if service_cert:
+ challenge = cdm.get_license_challenge(session_id, pssh, privacy_mode=True)
+ else:
+ challenge = cdm.get_license_challenge(session_id, pssh)
+
+ # send license challenge
+ license = requests.post(
+ url="https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine",
+ data=challenge,
+ headers=license_curl_headers
+ )
+
+ if license.status_code != 200:
+ print(license.content)
+ exit("Could not complete license challenge")
+
+ # Extract license from json dict
+ license = license.json()["license"]
+
+ # parse license challenge
+ cdm.parse_license(session_id, license)
+
+ # assign variable for returned keys
+ returned_keys = ""
+ for key in cdm.get_keys(session_id):
+ if key.type != "SIGNING":
+ returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
+
+ # assign variable for mp4decrypt keys
+ mp4decrypt_keys = []
+ for key in cdm.get_keys(session_id):
+ if key.type != "SIGNING":
+ mp4decrypt_keys.append('--key')
+ mp4decrypt_keys.append(f'{key.kid.hex}:{key.key.hex()}')
+
+ # close session, disposes of session data
+ cdm.close(session_id)
+
+ # Cache the keys
+ Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
+
+ # Print out the keys
+ print(f'\nKeys:\n{returned_keys}')
+
+ # Return the keys for future ripper use.
+ return mp4decrypt_keys
+
+
+# Defining remote decrypt function for Crunchyroll
+def decrypt_crunchyroll_remotely(api_key: str = None, license_curl_headers: dict = None, mpd_url: str = None):
+
+ # Exit if no device
+ if api_key is None:
+ exit(f"No API Key! to use remote decryption place an API key in {os.getcwd()}/Config/api-key.txt")
+
+ # Set CDM Project API URL
+ api_url = "https://api.cdm-project.com"
+
+ # Set API device
+ api_device = "CDM"
+
+ # Try getting pssh via MPD URL if web-dl
+ if mpd_url is not None:
+ input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url)
+ if input_pssh is not None:
+ print(f'\nPSSH found: {input_pssh}')
+ else:
+ input_pssh = input(f"\nPSSH not found! Input PSSH: ")
+
+ # Ask for PSSH if just keys function
+ if mpd_url is None:
+ # Ask for PSSH if web-dl not selected:
+ input_pssh = input(f"\nPSSH: ")
+
+ # Set headers for API key
+ api_key_headers = {
+ "X-Secret-Key": api_key
+ }
+
+ # Open CDM session
+ open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
+
+ # Get the session ID from the open CDM session
+ session_id = open_session.json()["data"]["session_id"]
+
+ # Set JSON required to generate a license challenge
+ generate_challenge_json = {
+ "session_id": session_id,
+ "init_data": input_pssh
+ }
+
+ # Generate the license challenge
+ generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
+
+ # Retrieve the challenge and base64 decode it
+ challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
+
+ # Send the challenge to the widevine license server
+ license = requests.post(
+ url="https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine",
+ headers=license_curl_headers,
+ data=challenge
+ )
+
+ # Retrieve the license message
+ license = license.json()["license"]
+
+ # Set JSON required to parse license message
+ license_message_json = {
+ "session_id": session_id,
+ "license_message": license
+ }
+
+ # Parse the license
+ requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
+
+ # Retrieve the keys
+ get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
+ json={"session_id": session_id},
+ headers=api_key_headers)
+
+ # Iterate through the keys, ignoring signing key
+ returned_keys = ''
+ for key in get_keys.json()["data"]["keys"]:
+ if not key["type"] == "SIGNING":
+ returned_keys += f"{key['key_id']}:{key['key']}\n"
+
+ # assign variable for mp4decrypt keys
+ mp4decrypt_keys = []
+ for key in get_keys.json()["data"]["keys"]:
+ if not key["type"] == "SIGNING":
+ mp4decrypt_keys.append('--key')
+ mp4decrypt_keys.append(f"{key['key_id']}:{key['key']}")
+
+ # Cache the keys
+ Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
+
+ # Print out keys
+ print(f'\nKeys:\n{returned_keys}')
+
+ # Close session
+ requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers)
+
+ # return mp4decrypt keys
+ return mp4decrypt_keys
diff --git a/Sites/Generic.py b/Sites/Generic.py
new file mode 100644
index 0000000..db0e958
--- /dev/null
+++ b/Sites/Generic.py
@@ -0,0 +1,207 @@
+# Import dependencies
+
+from pywidevine import PSSH
+from pywidevine import Cdm
+from pywidevine import Device
+import requests
+import base64
+import os
+import Helpers
+
+
+# Defining decrypt function for generic services
+def decrypt_generic(wvd: str = None, license_curl_headers: dict = None, mpd_url: str = None):
+
+ # Exit if no device
+ if wvd is None:
+ exit(f"No CDM! to use local decryption place a .wvd in {os.getcwd()}/WVDs")
+
+ # Try getting pssh via MPD URL if web-dl
+ if mpd_url is not None:
+ input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url)
+ if input_pssh is not None:
+ print(f'\nPSSH found: {input_pssh}')
+ else:
+ input_pssh = input(f"\nPSSH not found! Input PSSH: ")
+
+ # Ask for PSSH if just keys function
+ if mpd_url is None:
+ # Ask for PSSH if web-dl not selected:
+ input_pssh = input(f"\nPSSH: ")
+
+
+ # prepare pssh
+ pssh = PSSH(input_pssh)
+
+ # Ask for license URL
+ license_url = input(f"\nLicense URL: ")
+
+ # load device
+ device = Device.load(wvd)
+
+ # load CDM from device
+ cdm = Cdm.from_device(device)
+
+ # open CDM session
+ session_id = cdm.open()
+
+ # get service certificate
+ service_cert = requests.post(
+ url=license_url,
+ data=cdm.service_certificate_challenge,
+ headers=license_curl_headers
+ )
+ if service_cert.status_code != 200:
+ print("Couldn't retrieve service cert")
+ else:
+ service_cert = service_cert.content
+ cdm.set_service_certificate(session_id, service_cert)
+
+ # generate license challenge
+ if service_cert:
+ challenge = cdm.get_license_challenge(session_id, pssh, privacy_mode=True)
+ else:
+ challenge = cdm.get_license_challenge(session_id, pssh)
+
+ # send license challenge
+ license = requests.post(
+ url=license_url,
+ data=challenge,
+ headers=license_curl_headers
+ )
+
+ if license.status_code != 200:
+ print(license.content)
+ exit("Could not complete license challenge")
+
+ # Extract license from json dict
+ license = license.content
+
+ # parse license challenge
+ cdm.parse_license(session_id, license)
+
+ # assign variable for returned keys
+ returned_keys = ""
+ for key in cdm.get_keys(session_id):
+ if key.type != "SIGNING":
+ returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
+
+ # assign variable for mp4decrypt keys
+ mp4decrypt_keys = []
+ for key in cdm.get_keys(session_id):
+ if key.type != "SIGNING":
+ mp4decrypt_keys.append('--key')
+ mp4decrypt_keys.append(f'{key.kid.hex}:{key.key.hex()}')
+
+ # close session, disposes of session data
+ cdm.close(session_id)
+
+ # Cache the keys
+ Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
+
+ # Print out the keys
+ print(f'\nKeys:\n{returned_keys}')
+
+ # Return the keys for future ripper use.
+ return mp4decrypt_keys
+
+# Defining remote decrypt function for generic services
+def decrypt_generic_remotely(api_key: str = None, license_curl_headers: dict = None, mpd_url: str = None):
+
+ # Exit if no API key
+ if api_key is None:
+ exit(f"No API Key! to use remote decryption place an API key in {os.getcwd()}/Config/api-key.txt")
+
+ # Set CDM Project API URL
+ api_url = "https://api.cdm-project.com"
+
+ # Set API device
+ api_device = "CDM"
+
+ # Try getting pssh via MPD URL if web-dl
+ if mpd_url is not None:
+ input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url)
+ if input_pssh is not None:
+ print(f'\nPSSH found: {input_pssh}')
+ else:
+ input_pssh = input(f"\nPSSH not found! Input PSSH: ")
+
+ # Ask for PSSH if just keys function
+ if mpd_url is None:
+ # Ask for PSSH if web-dl not selected:
+ input_pssh = input(f"\nPSSH: ")
+
+ # Ask for license URL
+ input_license_url = input(f"\nLicense URL: ")
+
+ # Set headers for API key
+ api_key_headers = {
+ "X-Secret-Key": api_key
+ }
+
+ # Open CDM session
+ open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
+
+ # Get the session ID from the open CDM session
+ session_id = open_session.json()["data"]["session_id"]
+
+ # Set JSON required to generate a license challenge
+ generate_challenge_json = {
+ "session_id": session_id,
+ "init_data": input_pssh
+ }
+
+ # Generate the license challenge
+ generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
+
+ # Retrieve the challenge and base64 decode it
+ challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
+
+ # Send the challenge to the widevine license server
+ license = requests.post(
+ url=input_license_url,
+ headers=license_curl_headers,
+ data=challenge
+ )
+
+ # Retrieve the license message
+ license = base64.b64encode(license.content).decode()
+
+ # Set JSON required to parse license message
+ license_message_json = {
+ "session_id": session_id,
+ "license_message": license
+ }
+
+ # Parse the license
+ requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
+
+ # Retrieve the keys
+ get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
+ json={"session_id": session_id},
+ headers=api_key_headers)
+
+ # Iterate through the keys, ignoring signing key
+ returned_keys = ''
+ for key in get_keys.json()["data"]["keys"]:
+ if not key["type"] == "SIGNING":
+ returned_keys += f"{key['key_id']}:{key['key']}\n"
+
+ # assign variable for mp4decrypt keys
+ mp4decrypt_keys = []
+ for key in get_keys.json()["data"]["keys"]:
+ if not key["type"] == "SIGNING":
+ mp4decrypt_keys.append('--key')
+ mp4decrypt_keys.append(f"{key['key_id']}:{key['key']}")
+
+ # Cache the keys
+ Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
+
+ # Print out keys
+ print(f'\nKeys:\n{returned_keys}')
+
+ # Close session
+ requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers)
+
+ # Return mp4decrypt keys
+ return mp4decrypt_keys
diff --git a/Sites/YouTube.py b/Sites/YouTube.py
new file mode 100644
index 0000000..5c62e88
--- /dev/null
+++ b/Sites/YouTube.py
@@ -0,0 +1,180 @@
+# Import dependencies
+
+from pywidevine import PSSH
+from pywidevine import Cdm
+from pywidevine import Device
+import requests
+import base64
+import os
+import Helpers
+
+
+# Defining decrypt function for YouTube
+def decrypt_youtube(wvd: str = None, license_curl_headers: dict = None, license_curl_cookies: dict = None, license_curl_json: dict = None):
+
+ # Exit if no device
+ if wvd is None:
+ exit(f"No CDM! to use local decryption place a .wvd in {os.getcwd()}/WVDs")
+
+ # prepare pssh
+ pssh = PSSH("AAAAQXBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAACEiGVlUX01FRElBOjZlMzI4ZWQxYjQ5YmYyMWZI49yVmwY=")
+
+ # Ask for the license URL
+ license_url = input("License URL: ")
+
+ # Print a new line between PSSH/License URL and keys
+ print("\n")
+
+ # load device
+ device = Device.load(wvd)
+
+ # load CDM from device
+ cdm = Cdm.from_device(device)
+
+ # open CDM session
+ session_id = cdm.open()
+
+ # Generate challenge
+ challenge = cdm.get_license_challenge(session_id, pssh)
+
+ # Insert the challenge into the JSON data
+ license_curl_json["licenseRequest"] = base64.b64encode(challenge).decode()
+
+ # send license challenge
+ license = requests.post(
+ url=license_url,
+ headers=license_curl_headers,
+ cookies=license_curl_cookies,
+ json=license_curl_json
+ )
+
+ if license.status_code != 200:
+ print(license.content)
+ exit("Could not complete license challenge")
+
+ # Extract license from json dict
+ licence = license.json()["license"].replace("-", "+").replace("_", "/")
+
+ # parse license challenge
+ cdm.parse_license(session_id, licence)
+
+ # assign variable for mp4decrypt keys
+ mp4decrypt_keys = []
+ for key in cdm.get_keys(session_id):
+ if key.type != "SIGNING":
+ mp4decrypt_keys.append('--key')
+ mp4decrypt_keys.append(f'{key.kid.hex}:{key.key.hex()}')
+
+ # assign variable for returned keys
+ returned_keys = ""
+ for key in cdm.get_keys(session_id):
+ if key.type != "SIGNING":
+ returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
+
+ # close session, disposes of session data
+ cdm.close(session_id)
+
+ # Cache the keys
+ Helpers.cache_key.cache_keys(pssh="YouTube", keys=returned_keys)
+
+ # Print out the keys
+ print(f'Keys:\n{returned_keys}')
+
+ # Return the keys for future ripper use.
+ return mp4decrypt_keys
+
+
+# Defining remote decrypt function for YouTube
+def decrypt_youtube_remotely(api_key: str = None, license_curl_headers: dict = None, license_curl_json: dict = None, license_curl_cookies: dict = None):
+
+ # Exit if no API key
+ if api_key is None:
+ exit(f"No API Key! to use remote decryption place an API key in {os.getcwd()}/Config/api-key.txt")
+
+ # Set CDM Project API URL
+ api_url = "https://api.cdm-project.com"
+
+ # Set API device
+ api_device = "CDM"
+
+ # Ask for License URL
+ input_license_url = input("License URL: ")
+
+ # Print a line between license URL and keys
+ print("\n")
+
+ # Set headers for API key
+ api_key_headers = {
+ "X-Secret-Key": api_key
+ }
+
+ # Open CDM session
+ open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
+
+ # Get the session ID from the open CDM session
+ session_id = open_session.json()["data"]["session_id"]
+
+ # Set JSON required to generate a license challenge
+ generate_challenge_json = {
+ "session_id": session_id,
+ "init_data": "AAAAQXBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAACEiGVlUX01FRElBOjZlMzI4ZWQxYjQ5YmYyMWZI49yVmwY="
+ }
+
+ # Generate the license challenge
+ generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
+
+ # Retrieve the challenge and base64 decode it
+ challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
+
+ # Insert the challenge into the JSON data
+ license_curl_json["licenseRequest"] = base64.b64encode(challenge).decode()
+
+ # Send the challenge to the widevine license server
+ license = requests.post(
+ url=input_license_url,
+ headers=license_curl_headers,
+ json=license_curl_json,
+ cookies=license_curl_cookies
+ )
+
+ # Retrieve the license message
+ license = license.json()["license"].replace("-", "+").replace("_", "/")
+
+ # Set JSON required to parse license message
+ license_message_json = {
+ "session_id": session_id,
+ "license_message": license
+ }
+
+ # Parse the license
+ requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
+
+ # Retrieve the keys
+ get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
+ json={"session_id": session_id},
+ headers=api_key_headers)
+
+ # assign variable for mp4decrypt keys
+ mp4decrypt_keys = []
+ for key in get_keys.json()["data"]["keys"]:
+ if not key["type"] == "SIGNING":
+ mp4decrypt_keys.append('--key')
+ mp4decrypt_keys.append(f"{key['key_id']}:{key['key']}")
+
+ # Iterate through the keys, ignoring signing key
+ returned_keys = ''
+ for key in get_keys.json()["data"]["keys"]:
+ if not key["type"] == "SIGNING":
+ returned_keys += f"{key['key_id']}:{key['key']}\n"
+
+ # Cache the keys
+ Helpers.cache_key.cache_keys(pssh="YouTube", keys=returned_keys)
+
+ # Print out keys
+ print(f'Keys:\n{returned_keys}')
+
+ # Close session
+ requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers)
+
+ # Return mp4decrypt keys
+ return mp4decrypt_keys
diff --git a/Sites/__init__.py b/Sites/__init__.py
new file mode 100644
index 0000000..c216588
--- /dev/null
+++ b/Sites/__init__.py
@@ -0,0 +1,3 @@
+from . import Crunchyroll
+from . import Generic
+from . import YouTube
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 70e8ecad56b927056b9096493d70b4aabfd49085..9931c2c1519419c4188facb0f1f83f0c45282e95 100644
GIT binary patch
literal 134
zcmYk#Jqmy@42I!%7QBl1@8n?wMI01utK#C()ucmlNIpoOkoPeRZ0yo4PU@y5C!rvd
oo)QzWa&a=zi#jA1Np&ikpgIhrSWWvAWe`q2_00ZOr!|$|eWx@QEC2ui
literal 41
wcmXS@EYD0yEz8VH None:
- self.randomProxy = randomProxy
- self.userCountry = userCountry
- self.ccgi_url = "https://client.hola.org/client_cgi/"
- self.ext_ver = self.get_ext_ver()
- self.ext_browser = "chrome"
- self.user_uuid = uuid.uuid4().hex
- self.user_agent = "Mozilla/5.0 (X11; Fedora; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36"
- self.product = "cws"
- self.port_type_choice: str
- self.zoneAvailable = ["AR", "AT", "AU", "BE", "BG", "BR", "CA", "CH", "CL", "CO", "CZ", "DE", "DK", "ES", "FI",
- "FR", "GR", "HK", "HR", "HU", "ID", "IE", "IL", "IN", "IS", "IT", "JP", "KR", "MX", "NL",
- "NO", "NZ", "PL", "RO", "RU", "SE", "SG", "SK", "TR", "UK", "US", "GB"]
-
- def get_ext_ver(self) -> str:
- about = httpx.get("https://hola.org/access/my/settings#/about").text
- if 'window.pub_config.init({"ver":"' in about:
- version = about.split('window.pub_config.init({"ver":"')[1].split('"')[0]
- return version
-
- # last know working version
- return "1.199.485"
-
-
-class Engine:
- def __init__(self, Settings) -> None:
- self.settings = Settings
-
- def get_proxy(self, tunnels, tls=False) -> str:
- login = f"user-uuid-{self.settings.user_uuid}"
- proxies = dict(tunnels)
- protocol = "https" if tls else "http"
- for k, v in proxies["ip_list"].items():
- return "%s://%s:%s@%s:%d" % (
- protocol,
- login,
- proxies["agent_key"],
- k if tls else v,
- proxies["port"][self.settings.port_type_choice],
- )
-
- def generate_session_key(self, timeout: float = 10.0) -> json:
- post_data = {"login": "1", "ver": self.settings.ext_ver}
- return httpx.post(
- f"{self.settings.ccgi_url}background_init?uuid={self.settings.user_uuid}",
- json=post_data,
- headers={"User-Agent": self.settings.user_agent},
- timeout=timeout,
- ).json()["key"]
-
- def zgettunnels(
- self, session_key: str, country: str, timeout: float = 10.0
- ) -> json:
- qs = {
- "country": country.lower(),
- "limit": 1,
- "ping_id": random.random(),
- "ext_ver": self.settings.ext_ver,
- "browser": self.settings.ext_browser,
- "uuid": self.settings.user_uuid,
- "session_key": session_key,
- }
-
- return httpx.post(
- f"{self.settings.ccgi_url}zgettunnels", params=qs, timeout=timeout
- ).json()
-
-
-class Hola:
- def __init__(self, Settings) -> None:
- self.myipUri: str = "https://hola.org/myip.json"
- self.settings = Settings
-
- def get_country(self) -> str:
-
- if not self.settings.randomProxy and not self.settings.userCountry:
- self.settings.userCountry = httpx.get(self.myipUri).json()["country"]
-
- if (
- not self.settings.userCountry in self.settings.zoneAvailable
- or self.settings.randomProxy
- ):
- self.settings.userCountry = random.choice(self.settings.zoneAvailable)
-
- return self.settings.userCountry
-
-
-def init_proxy(data):
- settings = Settings(
- data["zone"]
- )
- settings.port_type_choice = data[
- "port"
- ]
-
- hola = Hola(settings)
- engine = Engine(settings)
-
- userCountry = hola.get_country()
- session_key = engine.generate_session_key()
- # time.sleep(10)
- tunnels = engine.zgettunnels(session_key, userCountry)
-
- return engine.get_proxy(tunnels)
-
-
-# Get current working directory
-main_directory = os.getcwd()
-
-# Check if API key exists, if not create file
-if not os.path.isfile(f"{main_directory}/api-key.txt"):
- with open(f'{main_directory}/api-key.txt', 'w') as api_key:
- print(f"\nIf you have an API key please place it in api-key.txt")
- api_key.write("Delete this and place your API key on this line")
-
-# Check if API key exists
-with open(f'{main_directory}/api-key.txt') as api_key:
- api_key = api_key.readline()
- if api_key == "Delete this and place your API key on this line":
- print(f"\nNo API key found!\n")
- api_key = ""
-
-# Create database and table for local key caching if they don't exist
-if not os.path.isfile(f"{main_directory}/database.db"):
- dbconnection = sqlite3.connect("database.db")
- dbcursor = dbconnection.cursor()
- dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "pssh" TEXT, "keys" TEXT, PRIMARY KEY("pssh") )')
- dbconnection.close()
-
-
-# Define key cache function
-
-
-def key_cache(pssh: str, db_keys: str):
- dbconnection = sqlite3.connect("database.db")
- dbcursor = dbconnection.cursor()
- dbcursor.execute("INSERT or REPLACE INTO database VALUES (?, ?)", (pssh, db_keys))
- dbconnection.commit()
- dbconnection.close()
-
-
-# Making sure a .wvd file exists and using that as the CDM
-try:
- cdm = glob.glob(f'{main_directory}/*.wvd')[0]
-except:
- cdm = None
- print(f"Please place a WVD in {main_directory}")
- print(f"Use option 3 of TPD-Keys and set API key if you do not have your own.")
-
-
-# Define key retrieval function
-def retrieve_keys(proxy_used: str = None, headers: list = None,
- json_data: json = None, device: str = cdm):
- pssh = input("PSSH: ")
- licence_url = input("License URL: ")
- if proxy_used is not None:
- proxy = init_proxy({"zone": proxy_used, "port": "peer"})
- proxies = {
- "http": proxy
- }
+# Based on the selected switch within the mutually exclusive group, perform actions
+if switches.crunchyroll:
+ # Perform action for --crunchyroll
+ if switches.web_dl:
+ mpd = input("MPD URL: ")
+ file = Helpers.download.web_dl_crunchyroll(mpd=mpd, device=device)
+ print(f'Saved at {file[0]}')
else:
- proxies = None
- challenge_pssh = PSSH(pssh)
- try:
- device = Device.load(device)
- except:
- print(f"Please place a WVD in {main_directory}")
- exit()
- cdm = Cdm.from_device(device)
- session_id = cdm.open()
- challenge = cdm.get_license_challenge(session_id, challenge_pssh)
- license = requests.post(licence_url, data=challenge, proxies=proxies, headers=headers, json=json_data)
- license.raise_for_status()
- cdm.parse_license(session_id, license.content)
- db_keys = ''
- for key in cdm.get_keys(session_id):
- if key.type != 'SIGNING':
- db_keys += f'{key.kid.hex}:{key.key.hex()}\n'
- key_cache(pssh=pssh, db_keys=db_keys)
- return db_keys
-
-# Define retrieve keys remotely function
+ Sites.Crunchyroll.decrypt_crunchyroll(wvd=device, license_curl_headers=license_curl.headers)
-def retrieve_keys_remotely(proxy_used: str = None):
- api_url = "https://api.cdrm-project.com"
- api_device = "CDM"
- pssh = input("PSSH: ")
- license_url = input("License URL: ")
- if proxy_used is not None:
- proxy = init_proxy({"zone": proxy_used, "port": "peer"})
- proxies = {
- "http": proxy
- }
+elif switches.crunchyroll_remote:
+ # Perform action for --crunchyroll-remote
+ if switches.web_dl:
+ mpd = input("MPD URL: ")
+ file = Helpers.download.web_dl_crunchyroll(mpd=mpd, api_key=api_key, remote=True)
+ print(f'Saved at {file[0]}')
else:
- proxies = None
- x_headers = {
- "X-Secret-Key": api_key
- }
- open_session = requests.get(url=f"{api_url}/{api_device}/open", headers=x_headers)
+ Sites.Crunchyroll.decrypt_crunchyroll_remotely(api_key=api_key, license_curl_headers=license_curl.headers)
- session_id = open_session.json()["data"]["session_id"]
-
- license_challenge_json_data = {
- "session_id": session_id,
- "init_data": pssh
- }
-
- licence_challenge = requests.post(url=f"{api_url}/{api_device}/get_license_challenge/AUTOMATIC", headers=x_headers,
- json=license_challenge_json_data)
-
- license_message = licence_challenge.json()["data"]["challenge_b64"]
-
- license = requests.post(
- headers=License_cURL.headers,
- proxies=proxies,
- url=license_url,
- data=base64.b64decode(license_message)
- )
-
- parse_license_json_data = {
- "session_id": session_id,
- "license_message": f"{base64.b64encode(license.content).decode()}"
- }
-
- requests.post(f"{api_url}/{api_device}/parse_license", json=parse_license_json_data,
- headers=x_headers)
-
- get_keys = requests.post(f"{api_url}/{api_device}/get_keys/ALL",
- json={"session_id": session_id}, headers=x_headers)
- db_keys = ''
- for key in get_keys.json()["data"]["keys"]:
- if not key["type"] == "SIGNING":
- db_keys += f"{key['key_id']}:{key['key']}\n"
- key_cache(pssh=pssh, db_keys=db_keys)
-
- requests.get(f"{api_url}/{api_device}/close/{session_id}", headers=x_headers)
-
- return db_keys
-
-
-# Define retrieve keys remotely VDOCipher function
-def retrieve_keys_remotely_vdocipher(proxy_used: str = None):
-
- # Get URL from function
- url = input(f"Video URL: ")
-
- # Set the VDOCipher token headers
- token_headers = {
- 'accept': '*/*',
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
- ## Comment this line out if using for anything other than https://www.vdocipher.com/blog/2014/12/add-text-to-videos-with-watermark/
- 'Origin': f"https://{urandom(8).hex()}.com",
- }
-
- # Set the token response
- token_response = requests.get(url, cookies=License_cURL.cookies, headers=token_headers)
- try:
- otp_match = re.findall(r"otp: '(.*)',", token_response.text)[0]
- playbackinfo_match = re.findall(r"playbackInfo: '(.*)',", token_response.text)[0]
- except IndexError:
- try:
- otp_match = re.findall(r"otp=(.*)&", token_response.text)[0]
- playbackinfo_match = re.findall(r"playbackInfo=(.*)", token_response.text)[0]
- except IndexError:
- print("\nAn error occured while getting otp/playback")
- exit()
-
- # Set the video ID
- video_id = json.loads(base64.b64decode(playbackinfo_match).decode())["videoId"]
-
- # Set new token response (1)
- token_response = requests.get(f'https://dev.vdocipher.com/api/meta/{video_id}', headers=token_headers)
- try:
- license_url = token_response.json()["dash"]["licenseServers"]["com.widevine.alpha"].rsplit(":", 1)[0]
- mpd = token_response.json()["dash"]["manifest"]
- except KeyError:
- print("\n An error occured while getting mpd/license url")
-
- # Set new token response (2)
- token_response = requests.get(mpd, headers=token_headers)
-
- # Set API URL
- api_url = "https://api.cdrm-project.com"
-
- # Set API Device
- api_device = "CDM"
-
- # Retrieve PSSH
- pssh = re.search(r"(.*)", token_response.text).group(1)
-
- # Check if proxy was used
- if proxy_used is not None:
- proxy = init_proxy({"zone": proxy_used, "port": "peer"})
- proxies = {
- "http": proxy
- }
+elif switches.youtube:
+ # Perform action for --YouTube
+ if switches.web_dl:
+ url = input("YouTube URL: ")
+ file = Helpers.download.youtube_dlp(url=url, device=device)
+ print(f'Saved at {file}')
else:
- proxies = None
-
- # Set API headers
- x_headers = {
- "X-Secret-Key": api_key
- }
-
- # Open API session
- open_session = requests.get(url=f"{api_url}/{api_device}/open", headers=x_headers)
-
- # Set the session ID
- session_id = open_session.json()["data"]["session_id"]
-
- # Send json data to get license challenge
- license_challenge_json_data = {
- "session_id": session_id,
- "init_data": pssh
- }
-
- # Get the license challenge from PSSH
- licence_challenge = requests.post(url=f"{api_url}/{api_device}/get_license_challenge/AUTOMATIC", headers=x_headers,
- json=license_challenge_json_data)
-
- # Set the final token
- token = {
- "otp":otp_match,
- "playbackInfo":playbackinfo_match,
- "href":url,
- "tech":"wv",
- "licenseRequest":licence_challenge.json()["data"]["challenge_b64"]
- }
-
- # Send challenge
- license = requests.post(
- proxies=proxies,
- url=license_url,
- json={'token': f'{base64.b64encode(json.dumps(token).encode("utf-8")).decode()}'}
- )
-
- # Set the parsing JSON data
- parse_license_json_data = {
- "session_id": session_id,
- "license_message": license.json()["license"]
- }
-
- # Send the parsing JSON data
- requests.post(f"{api_url}/{api_device}/parse_license", json=parse_license_json_data,
- headers=x_headers)
-
- # Get the keys
- get_keys = requests.post(f"{api_url}/{api_device}/get_keys/ALL",
- json={"session_id": session_id}, headers=x_headers)
-
- # Cache the keys
- db_keys = ''
- for key in get_keys.json()["data"]["keys"]:
- if not key["type"] == "SIGNING":
- db_keys += f"{key['key_id']}:{key['key']}\n"
- key_cache(pssh=pssh, db_keys=db_keys)
-
- # Close the session
- requests.get(f"{api_url}/{api_device}/close/{session_id}", headers=x_headers)
-
- # Return the keys
- return db_keys
-
-# Defining service prompt function
+ Sites.YouTube.decrypt_youtube(wvd=device, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies)
-def service_prompt():
- service_prompt = [
- inquirer.List('Service',
- message="Please choose a service",
- choices=['Generic', 'Generic with headers from Licence cURL', 'Remote', 'Remote VDOCipher'],
- ),
- ]
- service_selected = inquirer.prompt(service_prompt)
-
- proxy_needed_prompt = [
- inquirer.List('Proxy',
- message="Will you need a proxy?",
- choices=['Yes', 'No'],
- ),
- ]
-
- proxy_needed = inquirer.prompt(proxy_needed_prompt)
- if proxy_needed["Proxy"] == "Yes":
- allowed_countries = [
- "AR", "AT", "AU", "BE", "BG", "BR", "CA", "CH", "CL", "CO", "CZ", "DE", "DK", "ES", "FI",
- "FR", "GR", "HK", "HR", "HU", "ID", "IE", "IL", "IN", "IS", "IT", "JP", "KR", "MX", "NL",
- "NO", "NZ", "PL", "RO", "RU", "SE", "SG", "SK", "TR", "UK", "US", "GB"
- ]
- proxy_available = [
- inquirer.List('Proxys available',
- message="Please choose a country",
- choices=allowed_countries
- ),
- ]
- selected_proxy = inquirer.prompt(proxy_available)
- return service_selected["Service"], selected_proxy["Proxys available"]
+elif switches.youtube_remote:
+ # Perform action for --youtube-remote
+ if switches.web_dl:
+ url = input("YouTube URL: ")
+ file = Helpers.download.youtube_dlp(url=url, api_key=api_key, remote=True)
+ print(f'Saved at {file}')
else:
- selected_proxy = None
- return service_selected["Service"], selected_proxy
+ Sites.YouTube.decrypt_youtube_remotely(api_key=api_key, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies)
-# Define variables for the service and proxy wanted
+elif switches.generic_remote:
+ # Perform action for --generic-remote
+ if switches.web_dl:
+ mpd = input("MPD URL: ")
+ file = Helpers.download.web_dl_generic(mpd=mpd, api_key=api_key, remote=True)
+ print(f'Saved at {file[0]}')
+ else:
+ Sites.Generic.decrypt_generic_remotely(api_key=api_key, license_curl_headers=license_curl.headers)
-service_selected, selected_proxy = service_prompt()
-
-
-if service_selected == "Generic":
- print(f"\n{retrieve_keys(proxy_used=selected_proxy)}")
-elif service_selected == "Generic with headers from Licence cURL":
- print(f"\n{retrieve_keys(proxy_used=selected_proxy, headers=License_cURL.headers)}")
-elif service_selected == "Remote":
- print(f"\n{retrieve_keys_remotely(proxy_used=selected_proxy)}")
-elif service_selected == "Remote VDOCipher":
- print(f"\n{retrieve_keys_remotely_vdocipher(proxy_used=selected_proxy)}")
-
+else:
+ # If no switch is provided, perform a default action
+ if switches.web_dl:
+ mpd = input("MPD URL: ")
+ file = Helpers.download.web_dl_generic(mpd=mpd, device=device)
+ print(f'Saved at {file[0]}')
+ else:
+ Sites.Generic.decrypt_generic(wvd=device, license_curl_headers=license_curl.headers)