TPD-Keys 2.0 Release

Cleaned up code, added remote options for all services, improved modularity
This commit is contained in:
TPD94 2023-12-04 04:46:41 -05:00
parent 99a75e1671
commit a39e10c1db
14 changed files with 634 additions and 449 deletions

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
WVDs/
venv/
keys/
Config/
__pycache__/
*.pyc

5
Helpers/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from . import wvd_check
from . import api_check
from . import capability_check
from . import database_check
from . import cache_key

21
Helpers/api_check.py Normal file
View File

@ -0,0 +1,21 @@
# Import dependencies
import os
# Define api key check
def api_check():
# Create Config directory if it doesn't exist
if 'Config' not in os.listdir(fr'{os.getcwd()}'):
os.makedirs(f'{os.getcwd()}/Config')
# Create api-key.txt if it doesn't exist
if not os.path.isfile(f'{os.getcwd()}/Config/api-key.txt'):
with open(f'{os.getcwd()}/Config/api-key.txt', 'w') as api_key_text:
api_key_text.write("Place your API key on this line")
return "First run"
# Grab API Key from the text file
with open(f'{os.getcwd()}/Config/api-key.txt') as API_Key_Text:
api_key = API_Key_Text.readline()
if api_key != "Place your API key on this line":
return api_key
else:
return None

13
Helpers/cache_key.py Normal file
View File

@ -0,0 +1,13 @@
# Import dependencies
import sqlite3
import os
# Define cache function
def cache_keys(pssh: str, keys: str):
dbconnection = sqlite3.connect(f"{os.getcwd()}/keys/database.db")
dbcursor = dbconnection.cursor()
dbcursor.execute("INSERT or REPLACE INTO database VALUES (?, ?)", (pssh, keys))
dbconnection.commit()
dbconnection.close()

View File

@ -0,0 +1,27 @@
# Import dependencies
import Helpers
import os
def capability_check():
# Check for .WVD and API Key, exit program if neither exist.
Device = Helpers.wvd_check.wvd_check()
if Device is None:
API_Key = Helpers.api_check.api_check()
if API_Key == "First run" or API_Key == None:
exit(f"No CDM or API key found, please place a CDM in {os.getcwd()}/WVDs or an API key in {os.getcwd()}/Config/api-key.txt")
else:
print("No local device found, remote decryption only.")
print(f'Using API Key: {API_Key}\n')
return None, API_Key
elif Device is not None:
API_Key = Helpers.api_check.api_check()
if API_Key == "First run" or API_Key == None:
print("No API key found, local decryption only.")
print(f'Using device at {Device}\n')
return Device, None
else:
print(f'Local and remote decryption available.')
print(f'Using device at {Device}')
print(f'Using API Key: {API_Key}\n')
return Device, API_Key

19
Helpers/database_check.py Normal file
View File

@ -0,0 +1,19 @@
# Import dependencies
import os
import sqlite3
# Check to see if the database already exists, if not create a keys folder, and create the database.
def database_check():
# Check to see if the "keys" directory exists, if not creates it
if "keys" not in os.listdir(os.getcwd()):
os.makedirs('keys')
# Check to see if a database exists in keys directory, if not create it
if not os.path.isfile(f"{os.getcwd()}/keys/database.db"):
print(f"Creating database.\n")
dbconnection = sqlite3.connect(f"{os.getcwd()}/keys/database.db")
dbcursor = dbconnection.cursor()
dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "pssh" TEXT, "keys" TEXT, PRIMARY KEY("pssh") )')
dbconnection.close()

21
Helpers/wvd_check.py Normal file
View File

@ -0,0 +1,21 @@
# Import dependencies
import os
import glob
# Define WVD device check
def wvd_check():
try:
# Check to see if the WVDs folder exist, if not create it
if 'WVDs' not in os.listdir(fr'{os.getcwd()}'):
os.makedirs(f'{os.getcwd()}/WVDs')
# Use glob to get the name of the .wvd
extracted_device = glob.glob(f'{os.getcwd()}/WVDs/*.wvd')[0]
# Return the device path
return extracted_device
except:
# Check to see if the WVDs folder exist, if not create it
if 'WVDs' not in os.listdir(fr'{os.getcwd()}'):
os.makedirs(f'{os.getcwd()}/WVDs')
# Stop the program and print out instructions
return None

View File

@ -1,21 +1,23 @@
# TPD-Keys
#### Created by @TPD94, proxy function by Radziu
#### Created by @TPD94
## Based on [pywidevine](https://cdm-project.com/Decryption-Tools/pywidevine "pywidevine")
How to use:
1. Create `TPD-Keys` folder.
2. Download and extract `tpd-keys.py`, `requirements.txt` and `License_cURL.py` into the newly created `TPD-Keys` directory
2. Download and extract `TPD-Keys.py`, `requirements.txt` and `License_curl.py` into the newly created `TPD-Keys` directory
3. Install the requirements with `pip install -r requirements.txt`
4. Crete a WVD with pywidevine; `pywidevine create-device -k "/PATH/TO/device_private_key" -c "/PATH/TO/device_client_id_blob" -t "ANDROID" -l 3`
5. Place your .wvd in the root of `TPD-Keys` directory
5. Place your .wvd in `/WVDs` directory, if you do not have this directory, create it or run the program with `python TPD-Keys.py` and it will be created for you
6. Paste any needed headers into `License_cURL.py`
6. Place your API key (if wanted) in `/Config/api-key.txt` if you do not have this file or directory, create it or run the program with `python TPD-Keys.py` and it will be created for you. If you don't have an API key, you can request one via [discord](https://discord.gg/cdrm-project "CDRM-Project")
7. Run with `python tpd-keys.py`
7. Paste dictionaries from license request curl post request into `License_curl.py`
8. Make a selection
8. Run with `python tpd-keys.py`
To view additional options you can use `python tpd-keys.py -h`

157
Sites/Crunchyroll.py Normal file
View File

@ -0,0 +1,157 @@
# Import dependencies
import base64
from pywidevine import PSSH
from pywidevine import Cdm
from pywidevine import Device
import Helpers.cache_key
import requests
# Defining decrypt function for Crunchyroll
def decrypt_crunchyroll(wvd: str = None, license_curl_headers: dict = None):
# Ask for PSSH
input_pssh = input("PSSH: ")
print("\n")
# prepare pssh
pssh = PSSH(input_pssh)
# load device
device = Device.load(wvd)
# load CDM from device
cdm = Cdm.from_device(device)
# open CDM session
session_id = cdm.open()
# get service certificate
service_cert = requests.post(
url="https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine",
data=cdm.service_certificate_challenge,
headers=license_curl_headers
)
if service_cert.status_code != 200:
print("Couldn't retrieve service cert")
else:
service_cert = service_cert.json()["license"]
cdm.set_service_certificate(session_id, service_cert)
# generate license challenge
if service_cert:
challenge = cdm.get_license_challenge(session_id, pssh, privacy_mode=True)
else:
challenge = cdm.get_license_challenge(session_id, pssh)
# send license challenge
license = requests.post(
url="https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine",
data=challenge,
headers=license_curl_headers
)
if license.status_code != 200:
print(license.content)
exit("Could not complete license challenge")
# Extract license from json dict
license = license.json()["license"]
# parse license challenge
cdm.parse_license(session_id, license)
# assign variable for returned keys
returned_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# close session, disposes of session data
cdm.close(session_id)
# Cache the keys
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
# Print out the keys
print(f'Keys:\n{returned_keys}')
# Return the keys for future ripper use.
return returned_keys
# Defining remote decrypt function for Crunchyroll
def decrypt_crunchyroll_remotely(api_key: str = None, license_curl_headers: dict = None):
# Set CDM Project API URL
api_url = "https://api.cdm-project.com"
# Set API device
api_device = "CDM"
# Ask for PSSH
input_pssh = input("PSSH: ")
print("\n")
# Set headers for API key
api_key_headers = {
"X-Secret-Key": api_key
}
# Open CDM session
open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
# Get the session ID from the open CDM session
session_id = open_session.json()["data"]["session_id"]
# Set JSON required to generate a license challenge
generate_challenge_json = {
"session_id": session_id,
"init_data": input_pssh
}
# Generate the license challenge
generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
# Retrieve the challenge and base64 decode it
challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
# Send the challenge to the widevine license server
license = requests.post(
url="https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine",
headers=license_curl_headers,
data=challenge
)
# Retrieve the license message
license = license.json()["license"]
# Set JSON required to parse license message
license_message_json = {
"session_id": session_id,
"license_message": license
}
# Parse the license
requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
# Retrieve the keys
get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
json={"session_id": session_id},
headers=api_key_headers)
# Iterate through the keys, ignoring signing key
returned_keys = ''
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
returned_keys += f"{key['key_id']}:{key['key']}\n"
# Cache the keys
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
# Print out keys
print(f'Keys:\n{returned_keys}')
# Close session
requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers)

158
Sites/Generic.py Normal file
View File

@ -0,0 +1,158 @@
# Import dependencies
from pywidevine import PSSH
from pywidevine import Cdm
from pywidevine import Device
import Helpers.cache_key
import requests
import base64
# Defining decrypt function for Crunchyroll
def decrypt_generic(wvd: str = None, license_curl_headers: dict = None):
# Ask for PSSH:
input_pssh = input("PSSH: ")
# prepare pssh
pssh = PSSH(input_pssh)
license_url = input("License URL: ")
print("\n")
# load device
device = Device.load(wvd)
# load CDM from device
cdm = Cdm.from_device(device)
# open CDM session
session_id = cdm.open()
# get service certificate
service_cert = requests.post(
url=license_url,
data=cdm.service_certificate_challenge,
headers=license_curl_headers
)
if service_cert.status_code != 200:
print("Couldn't retrieve service cert")
else:
service_cert = service_cert.content
cdm.set_service_certificate(session_id, service_cert)
# generate license challenge
if service_cert:
challenge = cdm.get_license_challenge(session_id, pssh, privacy_mode=True)
else:
challenge = cdm.get_license_challenge(session_id, pssh)
# send license challenge
license = requests.post(
url=license_url,
data=challenge,
headers=license_curl_headers
)
if license.status_code != 200:
print(license.content)
exit("Could not complete license challenge")
# Extract license from json dict
license = license.content
# parse license challenge
cdm.parse_license(session_id, license)
# assign variable for returned keys
returned_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# close session, disposes of session data
cdm.close(session_id)
# Cache the keys
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
# Print out the keys
print(f'Keys:\n{returned_keys}')
# Return the keys for future ripper use.
return returned_keys
# Defining remote decrypt function for Crunchyroll
def decrypt_generic_remotely(api_key: str = None, license_curl_headers: dict = None):
# Set CDM Project API URL
api_url = "https://api.cdm-project.com"
# Set API device
api_device = "CDM"
# Ask for PSSH
input_pssh = input("PSSH: ")
input_license_url = input("License URL: ")
print("\n")
# Set headers for API key
api_key_headers = {
"X-Secret-Key": api_key
}
# Open CDM session
open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
# Get the session ID from the open CDM session
session_id = open_session.json()["data"]["session_id"]
# Set JSON required to generate a license challenge
generate_challenge_json = {
"session_id": session_id,
"init_data": input_pssh
}
# Generate the license challenge
generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
# Retrieve the challenge and base64 decode it
challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
# Send the challenge to the widevine license server
license = requests.post(
url=input_license_url,
headers=license_curl_headers,
data=challenge
)
# Retrieve the license message
license = base64.b64encode(license.content).decode()
# Set JSON required to parse license message
license_message_json = {
"session_id": session_id,
"license_message": license
}
# Parse the license
requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
# Retrieve the keys
get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
json={"session_id": session_id},
headers=api_key_headers)
# Iterate through the keys, ignoring signing key
returned_keys = ''
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
returned_keys += f"{key['key_id']}:{key['key']}\n"
# Cache the keys
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
# Print out keys
print(f'Keys:\n{returned_keys}')
# Close session
requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers)

149
Sites/YouTube.py Normal file
View File

@ -0,0 +1,149 @@
# Import dependencies
import json
from pywidevine import PSSH
from pywidevine import Cdm
from pywidevine import Device
import Helpers.cache_key
import requests
import base64
# Defining decrypt function for YouTube
def decrypt_youtube(wvd: str = None, license_curl_headers: dict = None, license_curl_cookies: dict = None, license_curl_json: dict = None):
# prepare pssh
pssh = PSSH("AAAAQXBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAACEiGVlUX01FRElBOjZlMzI4ZWQxYjQ5YmYyMWZI49yVmwY=")
license_url = input("License URL: ")
print("\n")
# load device
device = Device.load(wvd)
# load CDM from device
cdm = Cdm.from_device(device)
# open CDM session
session_id = cdm.open()
# Generate challenge
challenge = cdm.get_license_challenge(session_id, pssh)
# Insert the challenge into the JSON data
license_curl_json["licenseRequest"] = base64.b64encode(challenge).decode()
# send license challenge
license = requests.post(
url=license_url,
headers=license_curl_headers,
cookies=license_curl_cookies,
json=license_curl_json
)
if license.status_code != 200:
print(license.content)
exit("Could not complete license challenge")
# Extract license from json dict
licence = license.json()["license"].replace("-", "+").replace("_", "/")
# parse license challenge
cdm.parse_license(session_id, licence)
# assign variable for returned keys
returned_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# close session, disposes of session data
cdm.close(session_id)
# Cache the keys
Helpers.cache_key.cache_keys(pssh="YouTube", keys=returned_keys)
# Print out the keys
print(f'Keys:\n{returned_keys}')
# Return the keys for future ripper use.
return returned_keys
# Defining remote decrypt function for Crunchyroll
def decrypt_youtube_remotely(api_key: str = None, license_curl_headers: dict = None, license_curl_json: dict = None, license_curl_cookies: dict = None):
# Set CDM Project API URL
api_url = "https://api.cdm-project.com"
# Set API device
api_device = "CDM"
# Ask for PSSH
input_license_url = input("License URL: ")
print("\n")
# Set headers for API key
api_key_headers = {
"X-Secret-Key": api_key
}
# Open CDM session
open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
# Get the session ID from the open CDM session
session_id = open_session.json()["data"]["session_id"]
# Set JSON required to generate a license challenge
generate_challenge_json = {
"session_id": session_id,
"init_data": "AAAAQXBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAACEiGVlUX01FRElBOjZlMzI4ZWQxYjQ5YmYyMWZI49yVmwY="
}
# Generate the license challenge
generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
# Retrieve the challenge and base64 decode it
challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
# Insert the challenge into the JSON data
license_curl_json["licenseRequest"] = base64.b64encode(challenge).decode()
# Send the challenge to the widevine license server
license = requests.post(
url=input_license_url,
headers=license_curl_headers,
json=license_curl_json,
cookies=license_curl_cookies
)
# Retrieve the license message
license = license.json()["license"].replace("-", "+").replace("_", "/")
# Set JSON required to parse license message
license_message_json = {
"session_id": session_id,
"license_message": license
}
# Parse the license
requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
# Retrieve the keys
get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
json={"session_id": session_id},
headers=api_key_headers)
# Iterate through the keys, ignoring signing key
returned_keys = ''
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
returned_keys += f"{key['key_id']}:{key['key']}\n"
# Cache the keys
Helpers.cache_key.cache_keys(pssh="YouTube", keys=returned_keys)
# Print out keys
print(f'Keys:\n{returned_keys}')
# Close session
requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers)

3
Sites/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from . import Crunchyroll
from . import Generic
from . import YouTube

Binary file not shown.

View File

@ -1,443 +1,47 @@
# Import needed libraries
# Import dependencies
import Helpers
import license_curl
import Sites
import argparse
import requests
import json
import httpx
import sqlite3
import License_cURL
import os
from os import urandom
import glob
import inquirer
import uuid
import random
import re
# Get device and api key
device, api_key = Helpers.capability_check.capability_check()
# Database check, if it doesn't exist, create it.
Helpers.database_check.database_check()
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine.pssh import PSSH
import base64
# Initialize argparse and set variable
parser = argparse.ArgumentParser(description="Options decryption")
# Create mutually exclusive groups for switches
services = parser.add_mutually_exclusive_group()
# Hola proxy
# Add switches to the mutually exclusive groups
services.add_argument('--crunchyroll', action='store_true', help="Decrypt Crunchyroll")
services.add_argument('--crunchyroll-remote', action='store_true', help="Decrypt Crunchyroll remotely")
services.add_argument('--youtube', action='store_true', help="Decrypt YouTube")
services.add_argument('--youtube-remote', action='store_true', help="Decrypt YouTube remotely")
services.add_argument('--generic-remote', action='store_true', help="Decrypt generic services remotely")
class Settings:
def __init__(self, userCountry: str = None, randomProxy: bool = False) -> None:
self.randomProxy = randomProxy
self.userCountry = userCountry
self.ccgi_url = "https://client.hola.org/client_cgi/"
self.ext_ver = self.get_ext_ver()
self.ext_browser = "chrome"
self.user_uuid = uuid.uuid4().hex
self.user_agent = "Mozilla/5.0 (X11; Fedora; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36"
self.product = "cws"
self.port_type_choice: str
self.zoneAvailable = ["AR", "AT", "AU", "BE", "BG", "BR", "CA", "CH", "CL", "CO", "CZ", "DE", "DK", "ES", "FI",
"FR", "GR", "HK", "HR", "HU", "ID", "IE", "IL", "IN", "IS", "IT", "JP", "KR", "MX", "NL",
"NO", "NZ", "PL", "RO", "RU", "SE", "SG", "SK", "TR", "UK", "US", "GB"]
# Assign the switches a variable
switches = parser.parse_args()
def get_ext_ver(self) -> str:
about = httpx.get("https://hola.org/access/my/settings#/about").text
if 'window.pub_config.init({"ver":"' in about:
version = about.split('window.pub_config.init({"ver":"')[1].split('"')[0]
return version
# last know working version
return "1.199.485"
class Engine:
def __init__(self, Settings) -> None:
self.settings = Settings
def get_proxy(self, tunnels, tls=False) -> str:
login = f"user-uuid-{self.settings.user_uuid}"
proxies = dict(tunnels)
protocol = "https" if tls else "http"
for k, v in proxies["ip_list"].items():
return "%s://%s:%s@%s:%d" % (
protocol,
login,
proxies["agent_key"],
k if tls else v,
proxies["port"][self.settings.port_type_choice],
)
def generate_session_key(self, timeout: float = 10.0) -> json:
post_data = {"login": "1", "ver": self.settings.ext_ver}
return httpx.post(
f"{self.settings.ccgi_url}background_init?uuid={self.settings.user_uuid}",
json=post_data,
headers={"User-Agent": self.settings.user_agent},
timeout=timeout,
).json()["key"]
def zgettunnels(
self, session_key: str, country: str, timeout: float = 10.0
) -> json:
qs = {
"country": country.lower(),
"limit": 1,
"ping_id": random.random(),
"ext_ver": self.settings.ext_ver,
"browser": self.settings.ext_browser,
"uuid": self.settings.user_uuid,
"session_key": session_key,
}
return httpx.post(
f"{self.settings.ccgi_url}zgettunnels", params=qs, timeout=timeout
).json()
class Hola:
def __init__(self, Settings) -> None:
self.myipUri: str = "https://hola.org/myip.json"
self.settings = Settings
def get_country(self) -> str:
if not self.settings.randomProxy and not self.settings.userCountry:
self.settings.userCountry = httpx.get(self.myipUri).json()["country"]
if (
not self.settings.userCountry in self.settings.zoneAvailable
or self.settings.randomProxy
):
self.settings.userCountry = random.choice(self.settings.zoneAvailable)
return self.settings.userCountry
def init_proxy(data):
settings = Settings(
data["zone"]
)
settings.port_type_choice = data[
"port"
]
hola = Hola(settings)
engine = Engine(settings)
userCountry = hola.get_country()
session_key = engine.generate_session_key()
# time.sleep(10)
tunnels = engine.zgettunnels(session_key, userCountry)
return engine.get_proxy(tunnels)
# Get current working directory
main_directory = os.getcwd()
# Check if API key exists, if not create file
if not os.path.isfile(f"{main_directory}/api-key.txt"):
with open(f'{main_directory}/api-key.txt', 'w') as api_key:
print(f"\nIf you have an API key please place it in api-key.txt")
api_key.write("Delete this and place your API key on this line")
# Check if API key exists
with open(f'{main_directory}/api-key.txt') as api_key:
api_key = api_key.readline()
if api_key == "Delete this and place your API key on this line":
print(f"\nNo API key found!\n")
api_key = ""
# Create database and table for local key caching if they don't exist
if not os.path.isfile(f"{main_directory}/database.db"):
dbconnection = sqlite3.connect("database.db")
dbcursor = dbconnection.cursor()
dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "pssh" TEXT, "keys" TEXT, PRIMARY KEY("pssh") )')
dbconnection.close()
# Define key cache function
def key_cache(pssh: str, db_keys: str):
dbconnection = sqlite3.connect("database.db")
dbcursor = dbconnection.cursor()
dbcursor.execute("INSERT or REPLACE INTO database VALUES (?, ?)", (pssh, db_keys))
dbconnection.commit()
dbconnection.close()
# Making sure a .wvd file exists and using that as the CDM
try:
cdm = glob.glob(f'{main_directory}/*.wvd')[0]
except:
cdm = None
print(f"Please place a WVD in {main_directory}")
print(f"Use option 3 of TPD-Keys and set API key if you do not have your own.")
# Define key retrieval function
def retrieve_keys(proxy_used: str = None, headers: list = None,
json_data: json = None, device: str = cdm):
pssh = input("PSSH: ")
licence_url = input("License URL: ")
if proxy_used is not None:
proxy = init_proxy({"zone": proxy_used, "port": "peer"})
proxies = {
"http": proxy
}
# Based on the selected switch within the mutually exclusive group, perform actions
if switches.crunchyroll:
# Perform action for --crunchyroll
Sites.Crunchyroll.decrypt_crunchyroll(wvd=device, license_curl_headers=license_curl.headers)
elif switches.crunchyroll_remote:
# Perform action for --crunchyroll-remote
Sites.Crunchyroll.decrypt_crunchyroll_remotely(api_key=api_key, license_curl_headers=license_curl.headers)
elif switches.youtube:
# Perform action for --youtube
Sites.YouTube.decrypt_youtube(wvd=device, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies)
elif switches.youtube_remote:
# Perform action for --youtube-remote
Sites.YouTube.decrypt_youtube_remotely(api_key=api_key, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies)
elif switches.generic_remote:
# Perform action for --generic-remote
Sites.Generic.decrypt_generic_remotely(api_key=api_key, license_curl_headers=license_curl.headers)
else:
proxies = None
challenge_pssh = PSSH(pssh)
try:
device = Device.load(device)
except:
print(f"Please place a WVD in {main_directory}")
exit()
cdm = Cdm.from_device(device)
session_id = cdm.open()
challenge = cdm.get_license_challenge(session_id, challenge_pssh)
license = requests.post(licence_url, data=challenge, proxies=proxies, headers=headers, json=json_data)
license.raise_for_status()
cdm.parse_license(session_id, license.content)
db_keys = ''
for key in cdm.get_keys(session_id):
if key.type != 'SIGNING':
db_keys += f'{key.kid.hex}:{key.key.hex()}\n'
key_cache(pssh=pssh, db_keys=db_keys)
return db_keys
# Define retrieve keys remotely function
def retrieve_keys_remotely(proxy_used: str = None):
api_url = "https://api.cdrm-project.com"
api_device = "CDM"
pssh = input("PSSH: ")
license_url = input("License URL: ")
if proxy_used is not None:
proxy = init_proxy({"zone": proxy_used, "port": "peer"})
proxies = {
"http": proxy
}
else:
proxies = None
x_headers = {
"X-Secret-Key": api_key
}
open_session = requests.get(url=f"{api_url}/{api_device}/open", headers=x_headers)
session_id = open_session.json()["data"]["session_id"]
license_challenge_json_data = {
"session_id": session_id,
"init_data": pssh
}
licence_challenge = requests.post(url=f"{api_url}/{api_device}/get_license_challenge/AUTOMATIC", headers=x_headers,
json=license_challenge_json_data)
license_message = licence_challenge.json()["data"]["challenge_b64"]
license = requests.post(
headers=License_cURL.headers,
proxies=proxies,
url=license_url,
data=base64.b64decode(license_message)
)
parse_license_json_data = {
"session_id": session_id,
"license_message": f"{base64.b64encode(license.content).decode()}"
}
requests.post(f"{api_url}/{api_device}/parse_license", json=parse_license_json_data,
headers=x_headers)
get_keys = requests.post(f"{api_url}/{api_device}/get_keys/ALL",
json={"session_id": session_id}, headers=x_headers)
db_keys = ''
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
db_keys += f"{key['key_id']}:{key['key']}\n"
key_cache(pssh=pssh, db_keys=db_keys)
requests.get(f"{api_url}/{api_device}/close/{session_id}", headers=x_headers)
return db_keys
# Define retrieve keys remotely VDOCipher function
def retrieve_keys_remotely_vdocipher(proxy_used: str = None):
# Get URL from function
url = input(f"Video URL: ")
# Set the VDOCipher token headers
token_headers = {
'accept': '*/*',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
## Comment this line out if using for anything other than https://www.vdocipher.com/blog/2014/12/add-text-to-videos-with-watermark/
'Origin': f"https://{urandom(8).hex()}.com",
}
# Set the token response
token_response = requests.get(url, cookies=License_cURL.cookies, headers=token_headers)
try:
otp_match = re.findall(r"otp: '(.*)',", token_response.text)[0]
playbackinfo_match = re.findall(r"playbackInfo: '(.*)',", token_response.text)[0]
except IndexError:
try:
otp_match = re.findall(r"otp=(.*)&", token_response.text)[0]
playbackinfo_match = re.findall(r"playbackInfo=(.*)", token_response.text)[0]
except IndexError:
print("\nAn error occured while getting otp/playback")
exit()
# Set the video ID
video_id = json.loads(base64.b64decode(playbackinfo_match).decode())["videoId"]
# Set new token response (1)
token_response = requests.get(f'https://dev.vdocipher.com/api/meta/{video_id}', headers=token_headers)
try:
license_url = token_response.json()["dash"]["licenseServers"]["com.widevine.alpha"].rsplit(":", 1)[0]
mpd = token_response.json()["dash"]["manifest"]
except KeyError:
print("\n An error occured while getting mpd/license url")
# Set new token response (2)
token_response = requests.get(mpd, headers=token_headers)
# Set API URL
api_url = "https://api.cdrm-project.com"
# Set API Device
api_device = "CDM"
# Retrieve PSSH
pssh = re.search(r"<cenc:pssh>(.*)</cenc:pssh>", token_response.text).group(1)
# Check if proxy was used
if proxy_used is not None:
proxy = init_proxy({"zone": proxy_used, "port": "peer"})
proxies = {
"http": proxy
}
else:
proxies = None
# Set API headers
x_headers = {
"X-Secret-Key": api_key
}
# Open API session
open_session = requests.get(url=f"{api_url}/{api_device}/open", headers=x_headers)
# Set the session ID
session_id = open_session.json()["data"]["session_id"]
# Send json data to get license challenge
license_challenge_json_data = {
"session_id": session_id,
"init_data": pssh
}
# Get the license challenge from PSSH
licence_challenge = requests.post(url=f"{api_url}/{api_device}/get_license_challenge/AUTOMATIC", headers=x_headers,
json=license_challenge_json_data)
# Set the final token
token = {
"otp":otp_match,
"playbackInfo":playbackinfo_match,
"href":url,
"tech":"wv",
"licenseRequest":licence_challenge.json()["data"]["challenge_b64"]
}
# Send challenge
license = requests.post(
proxies=proxies,
url=license_url,
json={'token': f'{base64.b64encode(json.dumps(token).encode("utf-8")).decode()}'}
)
# Set the parsing JSON data
parse_license_json_data = {
"session_id": session_id,
"license_message": license.json()["license"]
}
# Send the parsing JSON data
requests.post(f"{api_url}/{api_device}/parse_license", json=parse_license_json_data,
headers=x_headers)
# Get the keys
get_keys = requests.post(f"{api_url}/{api_device}/get_keys/ALL",
json={"session_id": session_id}, headers=x_headers)
# Cache the keys
db_keys = ''
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
db_keys += f"{key['key_id']}:{key['key']}\n"
key_cache(pssh=pssh, db_keys=db_keys)
# Close the session
requests.get(f"{api_url}/{api_device}/close/{session_id}", headers=x_headers)
# Return the keys
return db_keys
# Defining service prompt function
def service_prompt():
service_prompt = [
inquirer.List('Service',
message="Please choose a service",
choices=['Generic', 'Generic with headers from Licence cURL', 'Remote', 'Remote VDOCipher'],
),
]
service_selected = inquirer.prompt(service_prompt)
proxy_needed_prompt = [
inquirer.List('Proxy',
message="Will you need a proxy?",
choices=['Yes', 'No'],
),
]
proxy_needed = inquirer.prompt(proxy_needed_prompt)
if proxy_needed["Proxy"] == "Yes":
allowed_countries = [
"AR", "AT", "AU", "BE", "BG", "BR", "CA", "CH", "CL", "CO", "CZ", "DE", "DK", "ES", "FI",
"FR", "GR", "HK", "HR", "HU", "ID", "IE", "IL", "IN", "IS", "IT", "JP", "KR", "MX", "NL",
"NO", "NZ", "PL", "RO", "RU", "SE", "SG", "SK", "TR", "UK", "US", "GB"
]
proxy_available = [
inquirer.List('Proxys available',
message="Please choose a country",
choices=allowed_countries
),
]
selected_proxy = inquirer.prompt(proxy_available)
return service_selected["Service"], selected_proxy["Proxys available"]
else:
selected_proxy = None
return service_selected["Service"], selected_proxy
# Define variables for the service and proxy wanted
service_selected, selected_proxy = service_prompt()
if service_selected == "Generic":
print(f"\n{retrieve_keys(proxy_used=selected_proxy)}")
elif service_selected == "Generic with headers from Licence cURL":
print(f"\n{retrieve_keys(proxy_used=selected_proxy, headers=License_cURL.headers)}")
elif service_selected == "Remote":
print(f"\n{retrieve_keys_remotely(proxy_used=selected_proxy)}")
elif service_selected == "Remote VDOCipher":
print(f"\n{retrieve_keys_remotely_vdocipher(proxy_used=selected_proxy)}")
# If no switch is provided, perform a default action
Sites.Generic.decrypt_generic(wvd=device, license_curl_headers=license_curl.headers)