Merge pull request 'TPD-Keys 2.0 Release' (#2) from Beta into main

Reviewed-on: https://cdm-project.com/Decryption-Tools/TPD-Keys/pulls/2
This commit is contained in:
TPD94 2023-12-04 09:50:59 +00:00
commit a97b81a987
14 changed files with 634 additions and 449 deletions

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
WVDs/
venv/
keys/
Config/
__pycache__/
*.pyc

5
Helpers/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from . import wvd_check
from . import api_check
from . import capability_check
from . import database_check
from . import cache_key

21
Helpers/api_check.py Normal file
View File

@ -0,0 +1,21 @@
# Import dependencies
import os
# Define api key check
def api_check():
# Create Config directory if it doesn't exist
if 'Config' not in os.listdir(fr'{os.getcwd()}'):
os.makedirs(f'{os.getcwd()}/Config')
# Create api-key.txt if it doesn't exist
if not os.path.isfile(f'{os.getcwd()}/Config/api-key.txt'):
with open(f'{os.getcwd()}/Config/api-key.txt', 'w') as api_key_text:
api_key_text.write("Place your API key on this line")
return "First run"
# Grab API Key from the text file
with open(f'{os.getcwd()}/Config/api-key.txt') as API_Key_Text:
api_key = API_Key_Text.readline()
if api_key != "Place your API key on this line":
return api_key
else:
return None

13
Helpers/cache_key.py Normal file
View File

@ -0,0 +1,13 @@
# Import dependencies
import sqlite3
import os
# Define cache function
def cache_keys(pssh: str, keys: str):
dbconnection = sqlite3.connect(f"{os.getcwd()}/keys/database.db")
dbcursor = dbconnection.cursor()
dbcursor.execute("INSERT or REPLACE INTO database VALUES (?, ?)", (pssh, keys))
dbconnection.commit()
dbconnection.close()

View File

@ -0,0 +1,27 @@
# Import dependencies
import Helpers
import os
def capability_check():
# Check for .WVD and API Key, exit program if neither exist.
Device = Helpers.wvd_check.wvd_check()
if Device is None:
API_Key = Helpers.api_check.api_check()
if API_Key == "First run" or API_Key == None:
exit(f"No CDM or API key found, please place a CDM in {os.getcwd()}/WVDs or an API key in {os.getcwd()}/Config/api-key.txt")
else:
print("No local device found, remote decryption only.")
print(f'Using API Key: {API_Key}\n')
return None, API_Key
elif Device is not None:
API_Key = Helpers.api_check.api_check()
if API_Key == "First run" or API_Key == None:
print("No API key found, local decryption only.")
print(f'Using device at {Device}\n')
return Device, None
else:
print(f'Local and remote decryption available.')
print(f'Using device at {Device}')
print(f'Using API Key: {API_Key}\n')
return Device, API_Key

19
Helpers/database_check.py Normal file
View File

@ -0,0 +1,19 @@
# Import dependencies
import os
import sqlite3
# Check to see if the database already exists, if not create a keys folder, and create the database.
def database_check():
# Check to see if the "keys" directory exists, if not creates it
if "keys" not in os.listdir(os.getcwd()):
os.makedirs('keys')
# Check to see if a database exists in keys directory, if not create it
if not os.path.isfile(f"{os.getcwd()}/keys/database.db"):
print(f"Creating database.\n")
dbconnection = sqlite3.connect(f"{os.getcwd()}/keys/database.db")
dbcursor = dbconnection.cursor()
dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "pssh" TEXT, "keys" TEXT, PRIMARY KEY("pssh") )')
dbconnection.close()

21
Helpers/wvd_check.py Normal file
View File

@ -0,0 +1,21 @@
# Import dependencies
import os
import glob
# Define WVD device check
def wvd_check():
try:
# Check to see if the WVDs folder exist, if not create it
if 'WVDs' not in os.listdir(fr'{os.getcwd()}'):
os.makedirs(f'{os.getcwd()}/WVDs')
# Use glob to get the name of the .wvd
extracted_device = glob.glob(f'{os.getcwd()}/WVDs/*.wvd')[0]
# Return the device path
return extracted_device
except:
# Check to see if the WVDs folder exist, if not create it
if 'WVDs' not in os.listdir(fr'{os.getcwd()}'):
os.makedirs(f'{os.getcwd()}/WVDs')
# Stop the program and print out instructions
return None

View File

@ -1,21 +1,23 @@
# TPD-Keys
#### Created by @TPD94, proxy function by Radziu
#### Created by @TPD94
## Based on [pywidevine](https://cdm-project.com/Decryption-Tools/pywidevine "pywidevine")
How to use:
1. Create `TPD-Keys` folder.
2. Download and extract `tpd-keys.py`, `requirements.txt` and `License_cURL.py` into the newly created `TPD-Keys` directory
2. Download and extract `TPD-Keys.py`, `requirements.txt` and `License_curl.py` into the newly created `TPD-Keys` directory
3. Install the requirements with `pip install -r requirements.txt`
4. Crete a WVD with pywidevine; `pywidevine create-device -k "/PATH/TO/device_private_key" -c "/PATH/TO/device_client_id_blob" -t "ANDROID" -l 3`
5. Place your .wvd in the root of `TPD-Keys` directory
5. Place your .wvd in `/WVDs` directory, if you do not have this directory, create it or run the program with `python TPD-Keys.py` and it will be created for you
6. Paste any needed headers into `License_cURL.py`
6. Place your API key (if wanted) in `/Config/api-key.txt` if you do not have this file or directory, create it or run the program with `python TPD-Keys.py` and it will be created for you. If you don't have an API key, you can request one via [discord](https://discord.gg/cdrm-project "CDRM-Project")
7. Run with `python tpd-keys.py`
7. Paste dictionaries from license request curl post request into `License_curl.py`
8. Make a selection
8. Run with `python tpd-keys.py`
To view additional options you can use `python tpd-keys.py -h`

157
Sites/Crunchyroll.py Normal file
View File

@ -0,0 +1,157 @@
# Import dependencies
import base64
from pywidevine import PSSH
from pywidevine import Cdm
from pywidevine import Device
import Helpers.cache_key
import requests
# Defining decrypt function for Crunchyroll
def decrypt_crunchyroll(wvd: str = None, license_curl_headers: dict = None):
# Ask for PSSH
input_pssh = input("PSSH: ")
print("\n")
# prepare pssh
pssh = PSSH(input_pssh)
# load device
device = Device.load(wvd)
# load CDM from device
cdm = Cdm.from_device(device)
# open CDM session
session_id = cdm.open()
# get service certificate
service_cert = requests.post(
url="https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine",
data=cdm.service_certificate_challenge,
headers=license_curl_headers
)
if service_cert.status_code != 200:
print("Couldn't retrieve service cert")
else:
service_cert = service_cert.json()["license"]
cdm.set_service_certificate(session_id, service_cert)
# generate license challenge
if service_cert:
challenge = cdm.get_license_challenge(session_id, pssh, privacy_mode=True)
else:
challenge = cdm.get_license_challenge(session_id, pssh)
# send license challenge
license = requests.post(
url="https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine",
data=challenge,
headers=license_curl_headers
)
if license.status_code != 200:
print(license.content)
exit("Could not complete license challenge")
# Extract license from json dict
license = license.json()["license"]
# parse license challenge
cdm.parse_license(session_id, license)
# assign variable for returned keys
returned_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# close session, disposes of session data
cdm.close(session_id)
# Cache the keys
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
# Print out the keys
print(f'Keys:\n{returned_keys}')
# Return the keys for future ripper use.
return returned_keys
# Defining remote decrypt function for Crunchyroll
def decrypt_crunchyroll_remotely(api_key: str = None, license_curl_headers: dict = None):
# Set CDM Project API URL
api_url = "https://api.cdm-project.com"
# Set API device
api_device = "CDM"
# Ask for PSSH
input_pssh = input("PSSH: ")
print("\n")
# Set headers for API key
api_key_headers = {
"X-Secret-Key": api_key
}
# Open CDM session
open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
# Get the session ID from the open CDM session
session_id = open_session.json()["data"]["session_id"]
# Set JSON required to generate a license challenge
generate_challenge_json = {
"session_id": session_id,
"init_data": input_pssh
}
# Generate the license challenge
generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
# Retrieve the challenge and base64 decode it
challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
# Send the challenge to the widevine license server
license = requests.post(
url="https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine",
headers=license_curl_headers,
data=challenge
)
# Retrieve the license message
license = license.json()["license"]
# Set JSON required to parse license message
license_message_json = {
"session_id": session_id,
"license_message": license
}
# Parse the license
requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
# Retrieve the keys
get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
json={"session_id": session_id},
headers=api_key_headers)
# Iterate through the keys, ignoring signing key
returned_keys = ''
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
returned_keys += f"{key['key_id']}:{key['key']}\n"
# Cache the keys
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
# Print out keys
print(f'Keys:\n{returned_keys}')
# Close session
requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers)

158
Sites/Generic.py Normal file
View File

@ -0,0 +1,158 @@
# Import dependencies
from pywidevine import PSSH
from pywidevine import Cdm
from pywidevine import Device
import Helpers.cache_key
import requests
import base64
# Defining decrypt function for Crunchyroll
def decrypt_generic(wvd: str = None, license_curl_headers: dict = None):
# Ask for PSSH:
input_pssh = input("PSSH: ")
# prepare pssh
pssh = PSSH(input_pssh)
license_url = input("License URL: ")
print("\n")
# load device
device = Device.load(wvd)
# load CDM from device
cdm = Cdm.from_device(device)
# open CDM session
session_id = cdm.open()
# get service certificate
service_cert = requests.post(
url=license_url,
data=cdm.service_certificate_challenge,
headers=license_curl_headers
)
if service_cert.status_code != 200:
print("Couldn't retrieve service cert")
else:
service_cert = service_cert.content
cdm.set_service_certificate(session_id, service_cert)
# generate license challenge
if service_cert:
challenge = cdm.get_license_challenge(session_id, pssh, privacy_mode=True)
else:
challenge = cdm.get_license_challenge(session_id, pssh)
# send license challenge
license = requests.post(
url=license_url,
data=challenge,
headers=license_curl_headers
)
if license.status_code != 200:
print(license.content)
exit("Could not complete license challenge")
# Extract license from json dict
license = license.content
# parse license challenge
cdm.parse_license(session_id, license)
# assign variable for returned keys
returned_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# close session, disposes of session data
cdm.close(session_id)
# Cache the keys
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
# Print out the keys
print(f'Keys:\n{returned_keys}')
# Return the keys for future ripper use.
return returned_keys
# Defining remote decrypt function for Crunchyroll
def decrypt_generic_remotely(api_key: str = None, license_curl_headers: dict = None):
# Set CDM Project API URL
api_url = "https://api.cdm-project.com"
# Set API device
api_device = "CDM"
# Ask for PSSH
input_pssh = input("PSSH: ")
input_license_url = input("License URL: ")
print("\n")
# Set headers for API key
api_key_headers = {
"X-Secret-Key": api_key
}
# Open CDM session
open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
# Get the session ID from the open CDM session
session_id = open_session.json()["data"]["session_id"]
# Set JSON required to generate a license challenge
generate_challenge_json = {
"session_id": session_id,
"init_data": input_pssh
}
# Generate the license challenge
generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
# Retrieve the challenge and base64 decode it
challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
# Send the challenge to the widevine license server
license = requests.post(
url=input_license_url,
headers=license_curl_headers,
data=challenge
)
# Retrieve the license message
license = base64.b64encode(license.content).decode()
# Set JSON required to parse license message
license_message_json = {
"session_id": session_id,
"license_message": license
}
# Parse the license
requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
# Retrieve the keys
get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
json={"session_id": session_id},
headers=api_key_headers)
# Iterate through the keys, ignoring signing key
returned_keys = ''
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
returned_keys += f"{key['key_id']}:{key['key']}\n"
# Cache the keys
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
# Print out keys
print(f'Keys:\n{returned_keys}')
# Close session
requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers)

149
Sites/YouTube.py Normal file
View File

@ -0,0 +1,149 @@
# Import dependencies
import json
from pywidevine import PSSH
from pywidevine import Cdm
from pywidevine import Device
import Helpers.cache_key
import requests
import base64
# Defining decrypt function for YouTube
def decrypt_youtube(wvd: str = None, license_curl_headers: dict = None, license_curl_cookies: dict = None, license_curl_json: dict = None):
# prepare pssh
pssh = PSSH("AAAAQXBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAACEiGVlUX01FRElBOjZlMzI4ZWQxYjQ5YmYyMWZI49yVmwY=")
license_url = input("License URL: ")
print("\n")
# load device
device = Device.load(wvd)
# load CDM from device
cdm = Cdm.from_device(device)
# open CDM session
session_id = cdm.open()
# Generate challenge
challenge = cdm.get_license_challenge(session_id, pssh)
# Insert the challenge into the JSON data
license_curl_json["licenseRequest"] = base64.b64encode(challenge).decode()
# send license challenge
license = requests.post(
url=license_url,
headers=license_curl_headers,
cookies=license_curl_cookies,
json=license_curl_json
)
if license.status_code != 200:
print(license.content)
exit("Could not complete license challenge")
# Extract license from json dict
licence = license.json()["license"].replace("-", "+").replace("_", "/")
# parse license challenge
cdm.parse_license(session_id, licence)
# assign variable for returned keys
returned_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# close session, disposes of session data
cdm.close(session_id)
# Cache the keys
Helpers.cache_key.cache_keys(pssh="YouTube", keys=returned_keys)
# Print out the keys
print(f'Keys:\n{returned_keys}')
# Return the keys for future ripper use.
return returned_keys
# Defining remote decrypt function for Crunchyroll
def decrypt_youtube_remotely(api_key: str = None, license_curl_headers: dict = None, license_curl_json: dict = None, license_curl_cookies: dict = None):
# Set CDM Project API URL
api_url = "https://api.cdm-project.com"
# Set API device
api_device = "CDM"
# Ask for PSSH
input_license_url = input("License URL: ")
print("\n")
# Set headers for API key
api_key_headers = {
"X-Secret-Key": api_key
}
# Open CDM session
open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
# Get the session ID from the open CDM session
session_id = open_session.json()["data"]["session_id"]
# Set JSON required to generate a license challenge
generate_challenge_json = {
"session_id": session_id,
"init_data": "AAAAQXBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAACEiGVlUX01FRElBOjZlMzI4ZWQxYjQ5YmYyMWZI49yVmwY="
}
# Generate the license challenge
generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
# Retrieve the challenge and base64 decode it
challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
# Insert the challenge into the JSON data
license_curl_json["licenseRequest"] = base64.b64encode(challenge).decode()
# Send the challenge to the widevine license server
license = requests.post(
url=input_license_url,
headers=license_curl_headers,
json=license_curl_json,
cookies=license_curl_cookies
)
# Retrieve the license message
license = license.json()["license"].replace("-", "+").replace("_", "/")
# Set JSON required to parse license message
license_message_json = {
"session_id": session_id,
"license_message": license
}
# Parse the license
requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
# Retrieve the keys
get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
json={"session_id": session_id},
headers=api_key_headers)
# Iterate through the keys, ignoring signing key
returned_keys = ''
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
returned_keys += f"{key['key_id']}:{key['key']}\n"
# Cache the keys
Helpers.cache_key.cache_keys(pssh="YouTube", keys=returned_keys)
# Print out keys
print(f'Keys:\n{returned_keys}')
# Close session
requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers)

3
Sites/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from . import Crunchyroll
from . import Generic
from . import YouTube

Binary file not shown.

View File

@ -1,443 +1,47 @@
# Import needed libraries
import requests
import json
import httpx
import sqlite3
import License_cURL
import os
from os import urandom
import glob
import inquirer
import uuid
import random
import re
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine.pssh import PSSH
import base64
# Hola proxy
class Settings:
def __init__(self, userCountry: str = None, randomProxy: bool = False) -> None:
self.randomProxy = randomProxy
self.userCountry = userCountry
self.ccgi_url = "https://client.hola.org/client_cgi/"
self.ext_ver = self.get_ext_ver()
self.ext_browser = "chrome"
self.user_uuid = uuid.uuid4().hex
self.user_agent = "Mozilla/5.0 (X11; Fedora; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36"
self.product = "cws"
self.port_type_choice: str
self.zoneAvailable = ["AR", "AT", "AU", "BE", "BG", "BR", "CA", "CH", "CL", "CO", "CZ", "DE", "DK", "ES", "FI",
"FR", "GR", "HK", "HR", "HU", "ID", "IE", "IL", "IN", "IS", "IT", "JP", "KR", "MX", "NL",
"NO", "NZ", "PL", "RO", "RU", "SE", "SG", "SK", "TR", "UK", "US", "GB"]
def get_ext_ver(self) -> str:
about = httpx.get("https://hola.org/access/my/settings#/about").text
if 'window.pub_config.init({"ver":"' in about:
version = about.split('window.pub_config.init({"ver":"')[1].split('"')[0]
return version
# last know working version
return "1.199.485"
class Engine:
def __init__(self, Settings) -> None:
self.settings = Settings
def get_proxy(self, tunnels, tls=False) -> str:
login = f"user-uuid-{self.settings.user_uuid}"
proxies = dict(tunnels)
protocol = "https" if tls else "http"
for k, v in proxies["ip_list"].items():
return "%s://%s:%s@%s:%d" % (
protocol,
login,
proxies["agent_key"],
k if tls else v,
proxies["port"][self.settings.port_type_choice],
)
def generate_session_key(self, timeout: float = 10.0) -> json:
post_data = {"login": "1", "ver": self.settings.ext_ver}
return httpx.post(
f"{self.settings.ccgi_url}background_init?uuid={self.settings.user_uuid}",
json=post_data,
headers={"User-Agent": self.settings.user_agent},
timeout=timeout,
).json()["key"]
def zgettunnels(
self, session_key: str, country: str, timeout: float = 10.0
) -> json:
qs = {
"country": country.lower(),
"limit": 1,
"ping_id": random.random(),
"ext_ver": self.settings.ext_ver,
"browser": self.settings.ext_browser,
"uuid": self.settings.user_uuid,
"session_key": session_key,
}
return httpx.post(
f"{self.settings.ccgi_url}zgettunnels", params=qs, timeout=timeout
).json()
class Hola:
def __init__(self, Settings) -> None:
self.myipUri: str = "https://hola.org/myip.json"
self.settings = Settings
def get_country(self) -> str:
if not self.settings.randomProxy and not self.settings.userCountry:
self.settings.userCountry = httpx.get(self.myipUri).json()["country"]
if (
not self.settings.userCountry in self.settings.zoneAvailable
or self.settings.randomProxy
):
self.settings.userCountry = random.choice(self.settings.zoneAvailable)
return self.settings.userCountry
def init_proxy(data):
settings = Settings(
data["zone"]
)
settings.port_type_choice = data[
"port"
]
hola = Hola(settings)
engine = Engine(settings)
userCountry = hola.get_country()
session_key = engine.generate_session_key()
# time.sleep(10)
tunnels = engine.zgettunnels(session_key, userCountry)
return engine.get_proxy(tunnels)
# Get current working directory
main_directory = os.getcwd()
# Check if API key exists, if not create file
if not os.path.isfile(f"{main_directory}/api-key.txt"):
with open(f'{main_directory}/api-key.txt', 'w') as api_key:
print(f"\nIf you have an API key please place it in api-key.txt")
api_key.write("Delete this and place your API key on this line")
# Check if API key exists
with open(f'{main_directory}/api-key.txt') as api_key:
api_key = api_key.readline()
if api_key == "Delete this and place your API key on this line":
print(f"\nNo API key found!\n")
api_key = ""
# Create database and table for local key caching if they don't exist
if not os.path.isfile(f"{main_directory}/database.db"):
dbconnection = sqlite3.connect("database.db")
dbcursor = dbconnection.cursor()
dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "pssh" TEXT, "keys" TEXT, PRIMARY KEY("pssh") )')
dbconnection.close()
# Define key cache function
def key_cache(pssh: str, db_keys: str):
dbconnection = sqlite3.connect("database.db")
dbcursor = dbconnection.cursor()
dbcursor.execute("INSERT or REPLACE INTO database VALUES (?, ?)", (pssh, db_keys))
dbconnection.commit()
dbconnection.close()
# Making sure a .wvd file exists and using that as the CDM
try:
cdm = glob.glob(f'{main_directory}/*.wvd')[0]
except:
cdm = None
print(f"Please place a WVD in {main_directory}")
print(f"Use option 3 of TPD-Keys and set API key if you do not have your own.")
# Define key retrieval function
def retrieve_keys(proxy_used: str = None, headers: list = None,
json_data: json = None, device: str = cdm):
pssh = input("PSSH: ")
licence_url = input("License URL: ")
if proxy_used is not None:
proxy = init_proxy({"zone": proxy_used, "port": "peer"})
proxies = {
"http": proxy
}
else:
proxies = None
challenge_pssh = PSSH(pssh)
try:
device = Device.load(device)
except:
print(f"Please place a WVD in {main_directory}")
exit()
cdm = Cdm.from_device(device)
session_id = cdm.open()
challenge = cdm.get_license_challenge(session_id, challenge_pssh)
license = requests.post(licence_url, data=challenge, proxies=proxies, headers=headers, json=json_data)
license.raise_for_status()
cdm.parse_license(session_id, license.content)
db_keys = ''
for key in cdm.get_keys(session_id):
if key.type != 'SIGNING':
db_keys += f'{key.kid.hex}:{key.key.hex()}\n'
key_cache(pssh=pssh, db_keys=db_keys)
return db_keys
# Define retrieve keys remotely function
def retrieve_keys_remotely(proxy_used: str = None):
api_url = "https://api.cdrm-project.com"
api_device = "CDM"
pssh = input("PSSH: ")
license_url = input("License URL: ")
if proxy_used is not None:
proxy = init_proxy({"zone": proxy_used, "port": "peer"})
proxies = {
"http": proxy
}
else:
proxies = None
x_headers = {
"X-Secret-Key": api_key
}
open_session = requests.get(url=f"{api_url}/{api_device}/open", headers=x_headers)
session_id = open_session.json()["data"]["session_id"]
license_challenge_json_data = {
"session_id": session_id,
"init_data": pssh
}
licence_challenge = requests.post(url=f"{api_url}/{api_device}/get_license_challenge/AUTOMATIC", headers=x_headers,
json=license_challenge_json_data)
license_message = licence_challenge.json()["data"]["challenge_b64"]
license = requests.post(
headers=License_cURL.headers,
proxies=proxies,
url=license_url,
data=base64.b64decode(license_message)
)
parse_license_json_data = {
"session_id": session_id,
"license_message": f"{base64.b64encode(license.content).decode()}"
}
requests.post(f"{api_url}/{api_device}/parse_license", json=parse_license_json_data,
headers=x_headers)
get_keys = requests.post(f"{api_url}/{api_device}/get_keys/ALL",
json={"session_id": session_id}, headers=x_headers)
db_keys = ''
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
db_keys += f"{key['key_id']}:{key['key']}\n"
key_cache(pssh=pssh, db_keys=db_keys)
requests.get(f"{api_url}/{api_device}/close/{session_id}", headers=x_headers)
return db_keys
# Define retrieve keys remotely VDOCipher function
def retrieve_keys_remotely_vdocipher(proxy_used: str = None):
# Get URL from function
url = input(f"Video URL: ")
# Set the VDOCipher token headers
token_headers = {
'accept': '*/*',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
## Comment this line out if using for anything other than https://www.vdocipher.com/blog/2014/12/add-text-to-videos-with-watermark/
'Origin': f"https://{urandom(8).hex()}.com",
}
# Set the token response
token_response = requests.get(url, cookies=License_cURL.cookies, headers=token_headers)
try:
otp_match = re.findall(r"otp: '(.*)',", token_response.text)[0]
playbackinfo_match = re.findall(r"playbackInfo: '(.*)',", token_response.text)[0]
except IndexError:
try:
otp_match = re.findall(r"otp=(.*)&", token_response.text)[0]
playbackinfo_match = re.findall(r"playbackInfo=(.*)", token_response.text)[0]
except IndexError:
print("\nAn error occured while getting otp/playback")
exit()
# Set the video ID
video_id = json.loads(base64.b64decode(playbackinfo_match).decode())["videoId"]
# Set new token response (1)
token_response = requests.get(f'https://dev.vdocipher.com/api/meta/{video_id}', headers=token_headers)
try:
license_url = token_response.json()["dash"]["licenseServers"]["com.widevine.alpha"].rsplit(":", 1)[0]
mpd = token_response.json()["dash"]["manifest"]
except KeyError:
print("\n An error occured while getting mpd/license url")
# Set new token response (2)
token_response = requests.get(mpd, headers=token_headers)
# Set API URL
api_url = "https://api.cdrm-project.com"
# Set API Device
api_device = "CDM"
# Retrieve PSSH
pssh = re.search(r"<cenc:pssh>(.*)</cenc:pssh>", token_response.text).group(1)
# Check if proxy was used
if proxy_used is not None:
proxy = init_proxy({"zone": proxy_used, "port": "peer"})
proxies = {
"http": proxy
}
else:
proxies = None
# Set API headers
x_headers = {
"X-Secret-Key": api_key
}
# Open API session
open_session = requests.get(url=f"{api_url}/{api_device}/open", headers=x_headers)
# Set the session ID
session_id = open_session.json()["data"]["session_id"]
# Send json data to get license challenge
license_challenge_json_data = {
"session_id": session_id,
"init_data": pssh
}
# Get the license challenge from PSSH
licence_challenge = requests.post(url=f"{api_url}/{api_device}/get_license_challenge/AUTOMATIC", headers=x_headers,
json=license_challenge_json_data)
# Set the final token
token = {
"otp":otp_match,
"playbackInfo":playbackinfo_match,
"href":url,
"tech":"wv",
"licenseRequest":licence_challenge.json()["data"]["challenge_b64"]
}
# Send challenge
license = requests.post(
proxies=proxies,
url=license_url,
json={'token': f'{base64.b64encode(json.dumps(token).encode("utf-8")).decode()}'}
)
# Set the parsing JSON data
parse_license_json_data = {
"session_id": session_id,
"license_message": license.json()["license"]
}
# Send the parsing JSON data
requests.post(f"{api_url}/{api_device}/parse_license", json=parse_license_json_data,
headers=x_headers)
# Get the keys
get_keys = requests.post(f"{api_url}/{api_device}/get_keys/ALL",
json={"session_id": session_id}, headers=x_headers)
# Cache the keys
db_keys = ''
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
db_keys += f"{key['key_id']}:{key['key']}\n"
key_cache(pssh=pssh, db_keys=db_keys)
# Close the session
requests.get(f"{api_url}/{api_device}/close/{session_id}", headers=x_headers)
# Return the keys
return db_keys
# Defining service prompt function
def service_prompt():
service_prompt = [
inquirer.List('Service',
message="Please choose a service",
choices=['Generic', 'Generic with headers from Licence cURL', 'Remote', 'Remote VDOCipher'],
),
]
service_selected = inquirer.prompt(service_prompt)
proxy_needed_prompt = [
inquirer.List('Proxy',
message="Will you need a proxy?",
choices=['Yes', 'No'],
),
]
proxy_needed = inquirer.prompt(proxy_needed_prompt)
if proxy_needed["Proxy"] == "Yes":
allowed_countries = [
"AR", "AT", "AU", "BE", "BG", "BR", "CA", "CH", "CL", "CO", "CZ", "DE", "DK", "ES", "FI",
"FR", "GR", "HK", "HR", "HU", "ID", "IE", "IL", "IN", "IS", "IT", "JP", "KR", "MX", "NL",
"NO", "NZ", "PL", "RO", "RU", "SE", "SG", "SK", "TR", "UK", "US", "GB"
]
proxy_available = [
inquirer.List('Proxys available',
message="Please choose a country",
choices=allowed_countries
),
]
selected_proxy = inquirer.prompt(proxy_available)
return service_selected["Service"], selected_proxy["Proxys available"]
else:
selected_proxy = None
return service_selected["Service"], selected_proxy
# Define variables for the service and proxy wanted
service_selected, selected_proxy = service_prompt()
if service_selected == "Generic":
print(f"\n{retrieve_keys(proxy_used=selected_proxy)}")
elif service_selected == "Generic with headers from Licence cURL":
print(f"\n{retrieve_keys(proxy_used=selected_proxy, headers=License_cURL.headers)}")
elif service_selected == "Remote":
print(f"\n{retrieve_keys_remotely(proxy_used=selected_proxy)}")
elif service_selected == "Remote VDOCipher":
print(f"\n{retrieve_keys_remotely_vdocipher(proxy_used=selected_proxy)}")
# Import dependencies
import Helpers
import license_curl
import Sites
import argparse
# Get device and api key
device, api_key = Helpers.capability_check.capability_check()
# Database check, if it doesn't exist, create it.
Helpers.database_check.database_check()
# Initialize argparse and set variable
parser = argparse.ArgumentParser(description="Options decryption")
# Create mutually exclusive groups for switches
services = parser.add_mutually_exclusive_group()
# Add switches to the mutually exclusive groups
services.add_argument('--crunchyroll', action='store_true', help="Decrypt Crunchyroll")
services.add_argument('--crunchyroll-remote', action='store_true', help="Decrypt Crunchyroll remotely")
services.add_argument('--youtube', action='store_true', help="Decrypt YouTube")
services.add_argument('--youtube-remote', action='store_true', help="Decrypt YouTube remotely")
services.add_argument('--generic-remote', action='store_true', help="Decrypt generic services remotely")
# Assign the switches a variable
switches = parser.parse_args()
# Based on the selected switch within the mutually exclusive group, perform actions
if switches.crunchyroll:
# Perform action for --crunchyroll
Sites.Crunchyroll.decrypt_crunchyroll(wvd=device, license_curl_headers=license_curl.headers)
elif switches.crunchyroll_remote:
# Perform action for --crunchyroll-remote
Sites.Crunchyroll.decrypt_crunchyroll_remotely(api_key=api_key, license_curl_headers=license_curl.headers)
elif switches.youtube:
# Perform action for --youtube
Sites.YouTube.decrypt_youtube(wvd=device, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies)
elif switches.youtube_remote:
# Perform action for --youtube-remote
Sites.YouTube.decrypt_youtube_remotely(api_key=api_key, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies)
elif switches.generic_remote:
# Perform action for --generic-remote
Sites.Generic.decrypt_generic_remotely(api_key=api_key, license_curl_headers=license_curl.headers)
else:
# If no switch is provided, perform a default action
Sites.Generic.decrypt_generic(wvd=device, license_curl_headers=license_curl.headers)