From a39e10c1dbc8ecb93926d91854875c0523260d84 Mon Sep 17 00:00:00 2001 From: TPD94 <> Date: Mon, 4 Dec 2023 04:46:41 -0500 Subject: [PATCH] TPD-Keys 2.0 Release Cleaned up code, added remote options for all services, improved modularity --- .gitignore | 6 + Helpers/__init__.py | 5 + Helpers/api_check.py | 21 ++ Helpers/cache_key.py | 13 + Helpers/capability_check.py | 27 ++ Helpers/database_check.py | 19 ++ Helpers/wvd_check.py | 21 ++ README.md | 14 +- Sites/Crunchyroll.py | 157 ++++++++++++ Sites/Generic.py | 158 ++++++++++++ Sites/YouTube.py | 149 +++++++++++ Sites/__init__.py | 3 + requirements.txt | Bin 41 -> 106 bytes tpd-keys.py | 490 ++++-------------------------------- 14 files changed, 634 insertions(+), 449 deletions(-) create mode 100644 .gitignore create mode 100644 Helpers/__init__.py create mode 100644 Helpers/api_check.py create mode 100644 Helpers/cache_key.py create mode 100644 Helpers/capability_check.py create mode 100644 Helpers/database_check.py create mode 100644 Helpers/wvd_check.py create mode 100644 Sites/Crunchyroll.py create mode 100644 Sites/Generic.py create mode 100644 Sites/YouTube.py create mode 100644 Sites/__init__.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d03fac0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +WVDs/ +venv/ +keys/ +Config/ +__pycache__/ +*.pyc \ No newline at end of file diff --git a/Helpers/__init__.py b/Helpers/__init__.py new file mode 100644 index 0000000..7dac765 --- /dev/null +++ b/Helpers/__init__.py @@ -0,0 +1,5 @@ +from . import wvd_check +from . import api_check +from . import capability_check +from . import database_check +from . import cache_key \ No newline at end of file diff --git a/Helpers/api_check.py b/Helpers/api_check.py new file mode 100644 index 0000000..184f9dc --- /dev/null +++ b/Helpers/api_check.py @@ -0,0 +1,21 @@ +# Import dependencies +import os + + +# Define api key check +def api_check(): + # Create Config directory if it doesn't exist + if 'Config' not in os.listdir(fr'{os.getcwd()}'): + os.makedirs(f'{os.getcwd()}/Config') + # Create api-key.txt if it doesn't exist + if not os.path.isfile(f'{os.getcwd()}/Config/api-key.txt'): + with open(f'{os.getcwd()}/Config/api-key.txt', 'w') as api_key_text: + api_key_text.write("Place your API key on this line") + return "First run" + # Grab API Key from the text file + with open(f'{os.getcwd()}/Config/api-key.txt') as API_Key_Text: + api_key = API_Key_Text.readline() + if api_key != "Place your API key on this line": + return api_key + else: + return None diff --git a/Helpers/cache_key.py b/Helpers/cache_key.py new file mode 100644 index 0000000..9eba312 --- /dev/null +++ b/Helpers/cache_key.py @@ -0,0 +1,13 @@ +# Import dependencies + +import sqlite3 +import os + + +# Define cache function +def cache_keys(pssh: str, keys: str): + dbconnection = sqlite3.connect(f"{os.getcwd()}/keys/database.db") + dbcursor = dbconnection.cursor() + dbcursor.execute("INSERT or REPLACE INTO database VALUES (?, ?)", (pssh, keys)) + dbconnection.commit() + dbconnection.close() \ No newline at end of file diff --git a/Helpers/capability_check.py b/Helpers/capability_check.py new file mode 100644 index 0000000..c5c663c --- /dev/null +++ b/Helpers/capability_check.py @@ -0,0 +1,27 @@ +# Import dependencies +import Helpers +import os + + +def capability_check(): + # Check for .WVD and API Key, exit program if neither exist. + Device = Helpers.wvd_check.wvd_check() + if Device is None: + API_Key = Helpers.api_check.api_check() + if API_Key == "First run" or API_Key == None: + exit(f"No CDM or API key found, please place a CDM in {os.getcwd()}/WVDs or an API key in {os.getcwd()}/Config/api-key.txt") + else: + print("No local device found, remote decryption only.") + print(f'Using API Key: {API_Key}\n') + return None, API_Key + elif Device is not None: + API_Key = Helpers.api_check.api_check() + if API_Key == "First run" or API_Key == None: + print("No API key found, local decryption only.") + print(f'Using device at {Device}\n') + return Device, None + else: + print(f'Local and remote decryption available.') + print(f'Using device at {Device}') + print(f'Using API Key: {API_Key}\n') + return Device, API_Key diff --git a/Helpers/database_check.py b/Helpers/database_check.py new file mode 100644 index 0000000..d50f4c9 --- /dev/null +++ b/Helpers/database_check.py @@ -0,0 +1,19 @@ +# Import dependencies + +import os +import sqlite3 + + +# Check to see if the database already exists, if not create a keys folder, and create the database. +def database_check(): + # Check to see if the "keys" directory exists, if not creates it + if "keys" not in os.listdir(os.getcwd()): + os.makedirs('keys') + + # Check to see if a database exists in keys directory, if not create it + if not os.path.isfile(f"{os.getcwd()}/keys/database.db"): + print(f"Creating database.\n") + dbconnection = sqlite3.connect(f"{os.getcwd()}/keys/database.db") + dbcursor = dbconnection.cursor() + dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "pssh" TEXT, "keys" TEXT, PRIMARY KEY("pssh") )') + dbconnection.close() \ No newline at end of file diff --git a/Helpers/wvd_check.py b/Helpers/wvd_check.py new file mode 100644 index 0000000..61839fe --- /dev/null +++ b/Helpers/wvd_check.py @@ -0,0 +1,21 @@ +# Import dependencies +import os +import glob + + +# Define WVD device check +def wvd_check(): + try: + # Check to see if the WVDs folder exist, if not create it + if 'WVDs' not in os.listdir(fr'{os.getcwd()}'): + os.makedirs(f'{os.getcwd()}/WVDs') + # Use glob to get the name of the .wvd + extracted_device = glob.glob(f'{os.getcwd()}/WVDs/*.wvd')[0] + # Return the device path + return extracted_device + except: + # Check to see if the WVDs folder exist, if not create it + if 'WVDs' not in os.listdir(fr'{os.getcwd()}'): + os.makedirs(f'{os.getcwd()}/WVDs') + # Stop the program and print out instructions + return None diff --git a/README.md b/README.md index 929adb7..fe5515c 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,23 @@ # TPD-Keys -#### Created by @TPD94, proxy function by Radziu +#### Created by @TPD94 ## Based on [pywidevine](https://cdm-project.com/Decryption-Tools/pywidevine "pywidevine") How to use: 1. Create `TPD-Keys` folder. -2. Download and extract `tpd-keys.py`, `requirements.txt` and `License_cURL.py` into the newly created `TPD-Keys` directory +2. Download and extract `TPD-Keys.py`, `requirements.txt` and `License_curl.py` into the newly created `TPD-Keys` directory 3. Install the requirements with `pip install -r requirements.txt` 4. Crete a WVD with pywidevine; `pywidevine create-device -k "/PATH/TO/device_private_key" -c "/PATH/TO/device_client_id_blob" -t "ANDROID" -l 3` -5. Place your .wvd in the root of `TPD-Keys` directory +5. Place your .wvd in `/WVDs` directory, if you do not have this directory, create it or run the program with `python TPD-Keys.py` and it will be created for you -6. Paste any needed headers into `License_cURL.py` +6. Place your API key (if wanted) in `/Config/api-key.txt` if you do not have this file or directory, create it or run the program with `python TPD-Keys.py` and it will be created for you. If you don't have an API key, you can request one via [discord](https://discord.gg/cdrm-project "CDRM-Project") -7. Run with `python tpd-keys.py` +7. Paste dictionaries from license request curl post request into `License_curl.py` -8. Make a selection +8. Run with `python tpd-keys.py` + +To view additional options you can use `python tpd-keys.py -h` \ No newline at end of file diff --git a/Sites/Crunchyroll.py b/Sites/Crunchyroll.py new file mode 100644 index 0000000..2b1e99c --- /dev/null +++ b/Sites/Crunchyroll.py @@ -0,0 +1,157 @@ +# Import dependencies +import base64 + +from pywidevine import PSSH +from pywidevine import Cdm +from pywidevine import Device +import Helpers.cache_key +import requests + + +# Defining decrypt function for Crunchyroll +def decrypt_crunchyroll(wvd: str = None, license_curl_headers: dict = None): + + # Ask for PSSH + input_pssh = input("PSSH: ") + print("\n") + + # prepare pssh + pssh = PSSH(input_pssh) + + # load device + device = Device.load(wvd) + + # load CDM from device + cdm = Cdm.from_device(device) + + # open CDM session + session_id = cdm.open() + + # get service certificate + service_cert = requests.post( + url="https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine", + data=cdm.service_certificate_challenge, + headers=license_curl_headers + ) + if service_cert.status_code != 200: + print("Couldn't retrieve service cert") + else: + service_cert = service_cert.json()["license"] + cdm.set_service_certificate(session_id, service_cert) + + # generate license challenge + if service_cert: + challenge = cdm.get_license_challenge(session_id, pssh, privacy_mode=True) + else: + challenge = cdm.get_license_challenge(session_id, pssh) + + # send license challenge + license = requests.post( + url="https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine", + data=challenge, + headers=license_curl_headers + ) + + if license.status_code != 200: + print(license.content) + exit("Could not complete license challenge") + + # Extract license from json dict + license = license.json()["license"] + + # parse license challenge + cdm.parse_license(session_id, license) + + # assign variable for returned keys + returned_keys = "" + for key in cdm.get_keys(session_id): + if key.type != "SIGNING": + returned_keys += f"{key.kid.hex}:{key.key.hex()}\n" + + # close session, disposes of session data + cdm.close(session_id) + + # Cache the keys + Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys) + + # Print out the keys + print(f'Keys:\n{returned_keys}') + + # Return the keys for future ripper use. + return returned_keys + + +# Defining remote decrypt function for Crunchyroll +def decrypt_crunchyroll_remotely(api_key: str = None, license_curl_headers: dict = None): + + # Set CDM Project API URL + api_url = "https://api.cdm-project.com" + + # Set API device + api_device = "CDM" + + # Ask for PSSH + input_pssh = input("PSSH: ") + print("\n") + + # Set headers for API key + api_key_headers = { + "X-Secret-Key": api_key + } + + # Open CDM session + open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers) + + # Get the session ID from the open CDM session + session_id = open_session.json()["data"]["session_id"] + + # Set JSON required to generate a license challenge + generate_challenge_json = { + "session_id": session_id, + "init_data": input_pssh + } + + # Generate the license challenge + generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json) + + # Retrieve the challenge and base64 decode it + challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"]) + + # Send the challenge to the widevine license server + license = requests.post( + url="https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine", + headers=license_curl_headers, + data=challenge + ) + + # Retrieve the license message + license = license.json()["license"] + + # Set JSON required to parse license message + license_message_json = { + "session_id": session_id, + "license_message": license + } + + # Parse the license + requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json) + + # Retrieve the keys + get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL', + json={"session_id": session_id}, + headers=api_key_headers) + + # Iterate through the keys, ignoring signing key + returned_keys = '' + for key in get_keys.json()["data"]["keys"]: + if not key["type"] == "SIGNING": + returned_keys += f"{key['key_id']}:{key['key']}\n" + + # Cache the keys + Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys) + + # Print out keys + print(f'Keys:\n{returned_keys}') + + # Close session + requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers) diff --git a/Sites/Generic.py b/Sites/Generic.py new file mode 100644 index 0000000..98ee831 --- /dev/null +++ b/Sites/Generic.py @@ -0,0 +1,158 @@ +# Import dependencies +from pywidevine import PSSH +from pywidevine import Cdm +from pywidevine import Device +import Helpers.cache_key +import requests +import base64 + + +# Defining decrypt function for Crunchyroll +def decrypt_generic(wvd: str = None, license_curl_headers: dict = None): + + # Ask for PSSH: + input_pssh = input("PSSH: ") + + # prepare pssh + pssh = PSSH(input_pssh) + license_url = input("License URL: ") + print("\n") + + # load device + device = Device.load(wvd) + + # load CDM from device + cdm = Cdm.from_device(device) + + # open CDM session + session_id = cdm.open() + + # get service certificate + service_cert = requests.post( + url=license_url, + data=cdm.service_certificate_challenge, + headers=license_curl_headers + ) + if service_cert.status_code != 200: + print("Couldn't retrieve service cert") + else: + service_cert = service_cert.content + cdm.set_service_certificate(session_id, service_cert) + + # generate license challenge + if service_cert: + challenge = cdm.get_license_challenge(session_id, pssh, privacy_mode=True) + else: + challenge = cdm.get_license_challenge(session_id, pssh) + + # send license challenge + license = requests.post( + url=license_url, + data=challenge, + headers=license_curl_headers + ) + + if license.status_code != 200: + print(license.content) + exit("Could not complete license challenge") + + # Extract license from json dict + license = license.content + + # parse license challenge + cdm.parse_license(session_id, license) + + # assign variable for returned keys + returned_keys = "" + for key in cdm.get_keys(session_id): + if key.type != "SIGNING": + returned_keys += f"{key.kid.hex}:{key.key.hex()}\n" + + # close session, disposes of session data + cdm.close(session_id) + + # Cache the keys + Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys) + + # Print out the keys + print(f'Keys:\n{returned_keys}') + + # Return the keys for future ripper use. + return returned_keys + + +# Defining remote decrypt function for Crunchyroll +def decrypt_generic_remotely(api_key: str = None, license_curl_headers: dict = None): + + # Set CDM Project API URL + api_url = "https://api.cdm-project.com" + + # Set API device + api_device = "CDM" + + # Ask for PSSH + input_pssh = input("PSSH: ") + input_license_url = input("License URL: ") + print("\n") + + # Set headers for API key + api_key_headers = { + "X-Secret-Key": api_key + } + + # Open CDM session + open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers) + + # Get the session ID from the open CDM session + session_id = open_session.json()["data"]["session_id"] + + # Set JSON required to generate a license challenge + generate_challenge_json = { + "session_id": session_id, + "init_data": input_pssh + } + + # Generate the license challenge + generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json) + + # Retrieve the challenge and base64 decode it + challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"]) + + # Send the challenge to the widevine license server + license = requests.post( + url=input_license_url, + headers=license_curl_headers, + data=challenge + ) + + # Retrieve the license message + license = base64.b64encode(license.content).decode() + + # Set JSON required to parse license message + license_message_json = { + "session_id": session_id, + "license_message": license + } + + # Parse the license + requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json) + + # Retrieve the keys + get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL', + json={"session_id": session_id}, + headers=api_key_headers) + + # Iterate through the keys, ignoring signing key + returned_keys = '' + for key in get_keys.json()["data"]["keys"]: + if not key["type"] == "SIGNING": + returned_keys += f"{key['key_id']}:{key['key']}\n" + + # Cache the keys + Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys) + + # Print out keys + print(f'Keys:\n{returned_keys}') + + # Close session + requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers) \ No newline at end of file diff --git a/Sites/YouTube.py b/Sites/YouTube.py new file mode 100644 index 0000000..901ebec --- /dev/null +++ b/Sites/YouTube.py @@ -0,0 +1,149 @@ +# Import dependencies +import json + +from pywidevine import PSSH +from pywidevine import Cdm +from pywidevine import Device +import Helpers.cache_key +import requests +import base64 + + +# Defining decrypt function for YouTube +def decrypt_youtube(wvd: str = None, license_curl_headers: dict = None, license_curl_cookies: dict = None, license_curl_json: dict = None): + + # prepare pssh + pssh = PSSH("AAAAQXBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAACEiGVlUX01FRElBOjZlMzI4ZWQxYjQ5YmYyMWZI49yVmwY=") + license_url = input("License URL: ") + print("\n") + + # load device + device = Device.load(wvd) + + # load CDM from device + cdm = Cdm.from_device(device) + + # open CDM session + session_id = cdm.open() + + # Generate challenge + challenge = cdm.get_license_challenge(session_id, pssh) + + # Insert the challenge into the JSON data + license_curl_json["licenseRequest"] = base64.b64encode(challenge).decode() + + # send license challenge + license = requests.post( + url=license_url, + headers=license_curl_headers, + cookies=license_curl_cookies, + json=license_curl_json + ) + + if license.status_code != 200: + print(license.content) + exit("Could not complete license challenge") + + # Extract license from json dict + licence = license.json()["license"].replace("-", "+").replace("_", "/") + + # parse license challenge + cdm.parse_license(session_id, licence) + + # assign variable for returned keys + returned_keys = "" + for key in cdm.get_keys(session_id): + if key.type != "SIGNING": + returned_keys += f"{key.kid.hex}:{key.key.hex()}\n" + + # close session, disposes of session data + cdm.close(session_id) + + # Cache the keys + Helpers.cache_key.cache_keys(pssh="YouTube", keys=returned_keys) + + # Print out the keys + print(f'Keys:\n{returned_keys}') + + # Return the keys for future ripper use. + return returned_keys + + +# Defining remote decrypt function for Crunchyroll +def decrypt_youtube_remotely(api_key: str = None, license_curl_headers: dict = None, license_curl_json: dict = None, license_curl_cookies: dict = None): + + # Set CDM Project API URL + api_url = "https://api.cdm-project.com" + + # Set API device + api_device = "CDM" + + # Ask for PSSH + input_license_url = input("License URL: ") + print("\n") + + # Set headers for API key + api_key_headers = { + "X-Secret-Key": api_key + } + + # Open CDM session + open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers) + + # Get the session ID from the open CDM session + session_id = open_session.json()["data"]["session_id"] + + # Set JSON required to generate a license challenge + generate_challenge_json = { + "session_id": session_id, + "init_data": "AAAAQXBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAACEiGVlUX01FRElBOjZlMzI4ZWQxYjQ5YmYyMWZI49yVmwY=" + } + + # Generate the license challenge + generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json) + + # Retrieve the challenge and base64 decode it + challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"]) + + # Insert the challenge into the JSON data + license_curl_json["licenseRequest"] = base64.b64encode(challenge).decode() + + # Send the challenge to the widevine license server + license = requests.post( + url=input_license_url, + headers=license_curl_headers, + json=license_curl_json, + cookies=license_curl_cookies + ) + + # Retrieve the license message + license = license.json()["license"].replace("-", "+").replace("_", "/") + + # Set JSON required to parse license message + license_message_json = { + "session_id": session_id, + "license_message": license + } + + # Parse the license + requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json) + + # Retrieve the keys + get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL', + json={"session_id": session_id}, + headers=api_key_headers) + + # Iterate through the keys, ignoring signing key + returned_keys = '' + for key in get_keys.json()["data"]["keys"]: + if not key["type"] == "SIGNING": + returned_keys += f"{key['key_id']}:{key['key']}\n" + + # Cache the keys + Helpers.cache_key.cache_keys(pssh="YouTube", keys=returned_keys) + + # Print out keys + print(f'Keys:\n{returned_keys}') + + # Close session + requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers) \ No newline at end of file diff --git a/Sites/__init__.py b/Sites/__init__.py new file mode 100644 index 0000000..c216588 --- /dev/null +++ b/Sites/__init__.py @@ -0,0 +1,3 @@ +from . import Crunchyroll +from . import Generic +from . import YouTube \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 70e8ecad56b927056b9096493d70b4aabfd49085..68eb7a0df432032281392471aa3f9ec1e451e379 100644 GIT binary patch literal 106 zcmezWuZSU)p^%{zNES1c0C63IErSt*9)mH1A&@j+;AP-qC}5~$C}+rINCB!U1JZdw b8l(=S!W^zDk)eno9jF#W7bB@K0jmH2XW|lh literal 41 wcmXS@EYD0yEz8VH None: - self.randomProxy = randomProxy - self.userCountry = userCountry - self.ccgi_url = "https://client.hola.org/client_cgi/" - self.ext_ver = self.get_ext_ver() - self.ext_browser = "chrome" - self.user_uuid = uuid.uuid4().hex - self.user_agent = "Mozilla/5.0 (X11; Fedora; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36" - self.product = "cws" - self.port_type_choice: str - self.zoneAvailable = ["AR", "AT", "AU", "BE", "BG", "BR", "CA", "CH", "CL", "CO", "CZ", "DE", "DK", "ES", "FI", - "FR", "GR", "HK", "HR", "HU", "ID", "IE", "IL", "IN", "IS", "IT", "JP", "KR", "MX", "NL", - "NO", "NZ", "PL", "RO", "RU", "SE", "SG", "SK", "TR", "UK", "US", "GB"] - - def get_ext_ver(self) -> str: - about = httpx.get("https://hola.org/access/my/settings#/about").text - if 'window.pub_config.init({"ver":"' in about: - version = about.split('window.pub_config.init({"ver":"')[1].split('"')[0] - return version - - # last know working version - return "1.199.485" - - -class Engine: - def __init__(self, Settings) -> None: - self.settings = Settings - - def get_proxy(self, tunnels, tls=False) -> str: - login = f"user-uuid-{self.settings.user_uuid}" - proxies = dict(tunnels) - protocol = "https" if tls else "http" - for k, v in proxies["ip_list"].items(): - return "%s://%s:%s@%s:%d" % ( - protocol, - login, - proxies["agent_key"], - k if tls else v, - proxies["port"][self.settings.port_type_choice], - ) - - def generate_session_key(self, timeout: float = 10.0) -> json: - post_data = {"login": "1", "ver": self.settings.ext_ver} - return httpx.post( - f"{self.settings.ccgi_url}background_init?uuid={self.settings.user_uuid}", - json=post_data, - headers={"User-Agent": self.settings.user_agent}, - timeout=timeout, - ).json()["key"] - - def zgettunnels( - self, session_key: str, country: str, timeout: float = 10.0 - ) -> json: - qs = { - "country": country.lower(), - "limit": 1, - "ping_id": random.random(), - "ext_ver": self.settings.ext_ver, - "browser": self.settings.ext_browser, - "uuid": self.settings.user_uuid, - "session_key": session_key, - } - - return httpx.post( - f"{self.settings.ccgi_url}zgettunnels", params=qs, timeout=timeout - ).json() - - -class Hola: - def __init__(self, Settings) -> None: - self.myipUri: str = "https://hola.org/myip.json" - self.settings = Settings - - def get_country(self) -> str: - - if not self.settings.randomProxy and not self.settings.userCountry: - self.settings.userCountry = httpx.get(self.myipUri).json()["country"] - - if ( - not self.settings.userCountry in self.settings.zoneAvailable - or self.settings.randomProxy - ): - self.settings.userCountry = random.choice(self.settings.zoneAvailable) - - return self.settings.userCountry - - -def init_proxy(data): - settings = Settings( - data["zone"] - ) - settings.port_type_choice = data[ - "port" - ] - - hola = Hola(settings) - engine = Engine(settings) - - userCountry = hola.get_country() - session_key = engine.generate_session_key() - # time.sleep(10) - tunnels = engine.zgettunnels(session_key, userCountry) - - return engine.get_proxy(tunnels) - - -# Get current working directory -main_directory = os.getcwd() - -# Check if API key exists, if not create file -if not os.path.isfile(f"{main_directory}/api-key.txt"): - with open(f'{main_directory}/api-key.txt', 'w') as api_key: - print(f"\nIf you have an API key please place it in api-key.txt") - api_key.write("Delete this and place your API key on this line") - -# Check if API key exists -with open(f'{main_directory}/api-key.txt') as api_key: - api_key = api_key.readline() - if api_key == "Delete this and place your API key on this line": - print(f"\nNo API key found!\n") - api_key = "" - -# Create database and table for local key caching if they don't exist -if not os.path.isfile(f"{main_directory}/database.db"): - dbconnection = sqlite3.connect("database.db") - dbcursor = dbconnection.cursor() - dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "pssh" TEXT, "keys" TEXT, PRIMARY KEY("pssh") )') - dbconnection.close() - - -# Define key cache function - - -def key_cache(pssh: str, db_keys: str): - dbconnection = sqlite3.connect("database.db") - dbcursor = dbconnection.cursor() - dbcursor.execute("INSERT or REPLACE INTO database VALUES (?, ?)", (pssh, db_keys)) - dbconnection.commit() - dbconnection.close() - - -# Making sure a .wvd file exists and using that as the CDM -try: - cdm = glob.glob(f'{main_directory}/*.wvd')[0] -except: - cdm = None - print(f"Please place a WVD in {main_directory}") - print(f"Use option 3 of TPD-Keys and set API key if you do not have your own.") - - -# Define key retrieval function -def retrieve_keys(proxy_used: str = None, headers: list = None, - json_data: json = None, device: str = cdm): - pssh = input("PSSH: ") - licence_url = input("License URL: ") - if proxy_used is not None: - proxy = init_proxy({"zone": proxy_used, "port": "peer"}) - proxies = { - "http": proxy - } - else: - proxies = None - challenge_pssh = PSSH(pssh) - try: - device = Device.load(device) - except: - print(f"Please place a WVD in {main_directory}") - exit() - cdm = Cdm.from_device(device) - session_id = cdm.open() - challenge = cdm.get_license_challenge(session_id, challenge_pssh) - license = requests.post(licence_url, data=challenge, proxies=proxies, headers=headers, json=json_data) - license.raise_for_status() - cdm.parse_license(session_id, license.content) - db_keys = '' - for key in cdm.get_keys(session_id): - if key.type != 'SIGNING': - db_keys += f'{key.kid.hex}:{key.key.hex()}\n' - key_cache(pssh=pssh, db_keys=db_keys) - return db_keys - -# Define retrieve keys remotely function - - -def retrieve_keys_remotely(proxy_used: str = None): - api_url = "https://api.cdrm-project.com" - api_device = "CDM" - pssh = input("PSSH: ") - license_url = input("License URL: ") - if proxy_used is not None: - proxy = init_proxy({"zone": proxy_used, "port": "peer"}) - proxies = { - "http": proxy - } - else: - proxies = None - x_headers = { - "X-Secret-Key": api_key - } - open_session = requests.get(url=f"{api_url}/{api_device}/open", headers=x_headers) - - session_id = open_session.json()["data"]["session_id"] - - license_challenge_json_data = { - "session_id": session_id, - "init_data": pssh - } - - licence_challenge = requests.post(url=f"{api_url}/{api_device}/get_license_challenge/AUTOMATIC", headers=x_headers, - json=license_challenge_json_data) - - license_message = licence_challenge.json()["data"]["challenge_b64"] - - license = requests.post( - headers=License_cURL.headers, - proxies=proxies, - url=license_url, - data=base64.b64decode(license_message) - ) - - parse_license_json_data = { - "session_id": session_id, - "license_message": f"{base64.b64encode(license.content).decode()}" - } - - requests.post(f"{api_url}/{api_device}/parse_license", json=parse_license_json_data, - headers=x_headers) - - get_keys = requests.post(f"{api_url}/{api_device}/get_keys/ALL", - json={"session_id": session_id}, headers=x_headers) - db_keys = '' - for key in get_keys.json()["data"]["keys"]: - if not key["type"] == "SIGNING": - db_keys += f"{key['key_id']}:{key['key']}\n" - key_cache(pssh=pssh, db_keys=db_keys) - - requests.get(f"{api_url}/{api_device}/close/{session_id}", headers=x_headers) - - return db_keys - - -# Define retrieve keys remotely VDOCipher function -def retrieve_keys_remotely_vdocipher(proxy_used: str = None): - - # Get URL from function - url = input(f"Video URL: ") - - # Set the VDOCipher token headers - token_headers = { - 'accept': '*/*', - 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36', - ## Comment this line out if using for anything other than https://www.vdocipher.com/blog/2014/12/add-text-to-videos-with-watermark/ - 'Origin': f"https://{urandom(8).hex()}.com", - } - - # Set the token response - token_response = requests.get(url, cookies=License_cURL.cookies, headers=token_headers) - try: - otp_match = re.findall(r"otp: '(.*)',", token_response.text)[0] - playbackinfo_match = re.findall(r"playbackInfo: '(.*)',", token_response.text)[0] - except IndexError: - try: - otp_match = re.findall(r"otp=(.*)&", token_response.text)[0] - playbackinfo_match = re.findall(r"playbackInfo=(.*)", token_response.text)[0] - except IndexError: - print("\nAn error occured while getting otp/playback") - exit() - - # Set the video ID - video_id = json.loads(base64.b64decode(playbackinfo_match).decode())["videoId"] - - # Set new token response (1) - token_response = requests.get(f'https://dev.vdocipher.com/api/meta/{video_id}', headers=token_headers) - try: - license_url = token_response.json()["dash"]["licenseServers"]["com.widevine.alpha"].rsplit(":", 1)[0] - mpd = token_response.json()["dash"]["manifest"] - except KeyError: - print("\n An error occured while getting mpd/license url") - - # Set new token response (2) - token_response = requests.get(mpd, headers=token_headers) - - # Set API URL - api_url = "https://api.cdrm-project.com" - - # Set API Device - api_device = "CDM" - - # Retrieve PSSH - pssh = re.search(r"(.*)", token_response.text).group(1) - - # Check if proxy was used - if proxy_used is not None: - proxy = init_proxy({"zone": proxy_used, "port": "peer"}) - proxies = { - "http": proxy - } - else: - proxies = None - - # Set API headers - x_headers = { - "X-Secret-Key": api_key - } - - # Open API session - open_session = requests.get(url=f"{api_url}/{api_device}/open", headers=x_headers) - - # Set the session ID - session_id = open_session.json()["data"]["session_id"] - - # Send json data to get license challenge - license_challenge_json_data = { - "session_id": session_id, - "init_data": pssh - } - - # Get the license challenge from PSSH - licence_challenge = requests.post(url=f"{api_url}/{api_device}/get_license_challenge/AUTOMATIC", headers=x_headers, - json=license_challenge_json_data) - - # Set the final token - token = { - "otp":otp_match, - "playbackInfo":playbackinfo_match, - "href":url, - "tech":"wv", - "licenseRequest":licence_challenge.json()["data"]["challenge_b64"] - } - - # Send challenge - license = requests.post( - proxies=proxies, - url=license_url, - json={'token': f'{base64.b64encode(json.dumps(token).encode("utf-8")).decode()}'} - ) - - # Set the parsing JSON data - parse_license_json_data = { - "session_id": session_id, - "license_message": license.json()["license"] - } - - # Send the parsing JSON data - requests.post(f"{api_url}/{api_device}/parse_license", json=parse_license_json_data, - headers=x_headers) - - # Get the keys - get_keys = requests.post(f"{api_url}/{api_device}/get_keys/ALL", - json={"session_id": session_id}, headers=x_headers) - - # Cache the keys - db_keys = '' - for key in get_keys.json()["data"]["keys"]: - if not key["type"] == "SIGNING": - db_keys += f"{key['key_id']}:{key['key']}\n" - key_cache(pssh=pssh, db_keys=db_keys) - - # Close the session - requests.get(f"{api_url}/{api_device}/close/{session_id}", headers=x_headers) - - # Return the keys - return db_keys - -# Defining service prompt function - - -def service_prompt(): - service_prompt = [ - inquirer.List('Service', - message="Please choose a service", - choices=['Generic', 'Generic with headers from Licence cURL', 'Remote', 'Remote VDOCipher'], - ), - ] - service_selected = inquirer.prompt(service_prompt) - - proxy_needed_prompt = [ - inquirer.List('Proxy', - message="Will you need a proxy?", - choices=['Yes', 'No'], - ), - ] - - proxy_needed = inquirer.prompt(proxy_needed_prompt) - if proxy_needed["Proxy"] == "Yes": - allowed_countries = [ - "AR", "AT", "AU", "BE", "BG", "BR", "CA", "CH", "CL", "CO", "CZ", "DE", "DK", "ES", "FI", - "FR", "GR", "HK", "HR", "HU", "ID", "IE", "IL", "IN", "IS", "IT", "JP", "KR", "MX", "NL", - "NO", "NZ", "PL", "RO", "RU", "SE", "SG", "SK", "TR", "UK", "US", "GB" - ] - proxy_available = [ - inquirer.List('Proxys available', - message="Please choose a country", - choices=allowed_countries - ), - ] - selected_proxy = inquirer.prompt(proxy_available) - return service_selected["Service"], selected_proxy["Proxys available"] - else: - selected_proxy = None - return service_selected["Service"], selected_proxy - - -# Define variables for the service and proxy wanted - - -service_selected, selected_proxy = service_prompt() - - -if service_selected == "Generic": - print(f"\n{retrieve_keys(proxy_used=selected_proxy)}") -elif service_selected == "Generic with headers from Licence cURL": - print(f"\n{retrieve_keys(proxy_used=selected_proxy, headers=License_cURL.headers)}") -elif service_selected == "Remote": - print(f"\n{retrieve_keys_remotely(proxy_used=selected_proxy)}") -elif service_selected == "Remote VDOCipher": - print(f"\n{retrieve_keys_remotely_vdocipher(proxy_used=selected_proxy)}") - +# Import dependencies +import Helpers +import license_curl +import Sites +import argparse + +# Get device and api key +device, api_key = Helpers.capability_check.capability_check() + +# Database check, if it doesn't exist, create it. +Helpers.database_check.database_check() + +# Initialize argparse and set variable +parser = argparse.ArgumentParser(description="Options decryption") + +# Create mutually exclusive groups for switches +services = parser.add_mutually_exclusive_group() + +# Add switches to the mutually exclusive groups +services.add_argument('--crunchyroll', action='store_true', help="Decrypt Crunchyroll") +services.add_argument('--crunchyroll-remote', action='store_true', help="Decrypt Crunchyroll remotely") +services.add_argument('--youtube', action='store_true', help="Decrypt YouTube") +services.add_argument('--youtube-remote', action='store_true', help="Decrypt YouTube remotely") +services.add_argument('--generic-remote', action='store_true', help="Decrypt generic services remotely") + +# Assign the switches a variable +switches = parser.parse_args() + +# Based on the selected switch within the mutually exclusive group, perform actions +if switches.crunchyroll: + # Perform action for --crunchyroll + Sites.Crunchyroll.decrypt_crunchyroll(wvd=device, license_curl_headers=license_curl.headers) +elif switches.crunchyroll_remote: + # Perform action for --crunchyroll-remote + Sites.Crunchyroll.decrypt_crunchyroll_remotely(api_key=api_key, license_curl_headers=license_curl.headers) +elif switches.youtube: + # Perform action for --youtube + Sites.YouTube.decrypt_youtube(wvd=device, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies) +elif switches.youtube_remote: + # Perform action for --youtube-remote + Sites.YouTube.decrypt_youtube_remotely(api_key=api_key, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies) +elif switches.generic_remote: + # Perform action for --generic-remote + Sites.Generic.decrypt_generic_remotely(api_key=api_key, license_curl_headers=license_curl.headers) +else: + # If no switch is provided, perform a default action + Sites.Generic.decrypt_generic(wvd=device, license_curl_headers=license_curl.headers) \ No newline at end of file