mirror of
https://cdm-project.com/Decryption-Tools/TPD-Keys.git
synced 2025-04-30 00:24:25 +02:00
Merge pull request 'Beta branch' (#1) from Beta into main
Reviewed-on: https://cdm-project.com/Decryption-Tools/TPD-Keys/pulls/1
This commit is contained in:
commit
58e41f459e
@ -1,15 +0,0 @@
|
||||
headers = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0',
|
||||
'Accept': '*/*',
|
||||
'Accept-Language': 'en-US,en;q=0.5',
|
||||
# 'Accept-Encoding': 'gzip, deflate, br',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'DNT': '1',
|
||||
'Sec-Fetch-Dest': 'empty',
|
||||
'Sec-Fetch-Mode': 'cors',
|
||||
'Sec-Fetch-Site': 'same-site',
|
||||
'Sec-GPC': '1',
|
||||
'Connection': 'keep-alive',
|
||||
# Requests doesn't support trailers
|
||||
# 'TE': 'trailers',
|
||||
}
|
5
License_cURL.py
Normal file
5
License_cURL.py
Normal file
@ -0,0 +1,5 @@
|
||||
headers = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0',
|
||||
'Accept': '*/*',
|
||||
'Accept-Language': 'en-US,en;q=0.5',
|
||||
}
|
11
README.md
11
README.md
@ -6,20 +6,15 @@
|
||||
How to use:
|
||||
1. Create `TPD-Keys` folder.
|
||||
|
||||
2. Download and extract `tpd-keys.py`, `requirements.txt` and `DRMHeaders.py` into the newly created `TPD-Keys` directory
|
||||
2. Download and extract `tpd-keys.py`, `requirements.txt` and `License_cURL.py` into the newly created `TPD-Keys` directory
|
||||
|
||||
3. Install the requirements with `pip install -r requirements.txt`
|
||||
|
||||
4. Crete a WVD with pywidevine; `pywidevine create-device -k "/PATH/TO/device_private_key" -c "/PATH/TO/device_client_id_blob" -t "ANDROID" -l 3`
|
||||
|
||||
5. Replace `MyWVD= "/PATH/TO/WVD.wvd"` with the path to your `.wvd` on line 15 of tpd-keys.py
|
||||
5. Place your .wvd in the root of `TPD-Keys` directory
|
||||
|
||||
> For instance
|
||||
> `MyWVD = "C:\Users\TPD94\Desktop\AndroidDeivce.wvd"`
|
||||
> or if it is located in the same folder
|
||||
> `MyWVD = "AndroidDeivce.wvd"`
|
||||
|
||||
6. Paste any needed headers into `DRMHeaders.py`
|
||||
6. Paste any needed headers into `License_cURL.py`
|
||||
|
||||
7. Run with `python tpd-keys.py`
|
||||
|
||||
|
@ -2,3 +2,4 @@ pywidevine
|
||||
requests
|
||||
httpx
|
||||
pyyaml
|
||||
inquirer
|
758
tpd-keys.py
758
tpd-keys.py
@ -1,38 +1,26 @@
|
||||
import base64
|
||||
import hmac
|
||||
import hashlib
|
||||
import sys
|
||||
import time
|
||||
# Import needed libraries
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
import requests
|
||||
import json
|
||||
import httpx
|
||||
import sqlite3
|
||||
import License_cURL
|
||||
import os
|
||||
from os import urandom
|
||||
import glob
|
||||
import inquirer
|
||||
import uuid
|
||||
import random
|
||||
import re
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
from urllib.parse import urlparse
|
||||
else:
|
||||
from urlparse import urlparse
|
||||
|
||||
from pywidevine.cdm import Cdm
|
||||
from pywidevine.device import Device
|
||||
from pywidevine.pssh import PSSH
|
||||
from base64 import b64encode
|
||||
import re
|
||||
import requests
|
||||
import json
|
||||
import random
|
||||
import uuid
|
||||
import httpx
|
||||
import sqlite3
|
||||
import DRMHeaders
|
||||
from requests.utils import dict_from_cookiejar
|
||||
import http
|
||||
from os import urandom
|
||||
import base64
|
||||
|
||||
MyWVD = "/PATH/TO/WVD.wvd"
|
||||
|
||||
dbconnection = sqlite3.connect("database.db")
|
||||
dbcursor = dbconnection.cursor()
|
||||
dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "pssh" TEXT, "keys" TEXT, PRIMARY KEY("pssh") )')
|
||||
dbconnection.close()
|
||||
# Hola proxy
|
||||
|
||||
class Settings:
|
||||
def __init__(self, userCountry: str = None, randomProxy: bool = False) -> None:
|
||||
@ -86,9 +74,8 @@ class Engine:
|
||||
).json()["key"]
|
||||
|
||||
def zgettunnels(
|
||||
self, session_key: str, country: str, timeout: float = 10.0
|
||||
self, session_key: str, country: str, timeout: float = 10.0
|
||||
) -> json:
|
||||
|
||||
qs = {
|
||||
"country": country.lower(),
|
||||
"limit": 1,
|
||||
@ -115,559 +102,342 @@ class Hola:
|
||||
self.settings.userCountry = httpx.get(self.myipUri).json()["country"]
|
||||
|
||||
if (
|
||||
not self.settings.userCountry in self.settings.zoneAvailable
|
||||
or self.settings.randomProxy
|
||||
not self.settings.userCountry in self.settings.zoneAvailable
|
||||
or self.settings.randomProxy
|
||||
):
|
||||
self.settings.userCountry = random.choice(self.settings.zoneAvailable)
|
||||
|
||||
return self.settings.userCountry
|
||||
|
||||
def cache_key(cpssh: str, ckeys: str):
|
||||
dbconnection = sqlite3.connect("database.db")
|
||||
dbcursor = dbconnection.cursor()
|
||||
dbcursor.execute("INSERT or REPLACE INTO database VALUES (?, ?)", (cpssh, ckeys))
|
||||
dbconnection.commit()
|
||||
dbconnection.close()
|
||||
|
||||
|
||||
def init_proxy(data):
|
||||
settings = Settings(
|
||||
data["zone"]
|
||||
) # True if you want random proxy each request / "DE" for a proxy with region of your choice (German here) / False if you wish to have a proxy localized to your IP address
|
||||
)
|
||||
settings.port_type_choice = data[
|
||||
"port"
|
||||
] # direct return datacenter ipinfo, peer "residential" (can fail sometime)
|
||||
]
|
||||
|
||||
hola = Hola(settings)
|
||||
engine = Engine(settings)
|
||||
|
||||
userCountry = hola.get_country()
|
||||
session_key = engine.generate_session_key()
|
||||
# time.sleep(10)
|
||||
# time.sleep(10)
|
||||
tunnels = engine.zgettunnels(session_key, userCountry)
|
||||
|
||||
return engine.get_proxy(tunnels)
|
||||
|
||||
def calculate_signature(method, url, headers, payload, timestamp=None):
|
||||
app_id = 'SKYSHOWTIME-ANDROID-v1'
|
||||
signature_key = bytearray('jfj9qGg6aDHaBbFpH6wNEvN6cHuHtZVppHRvBgZs', 'utf-8')
|
||||
sig_version = '1.0'
|
||||
|
||||
if not timestamp:
|
||||
timestamp = int(time.time())
|
||||
# Get current working directory
|
||||
main_directory = os.getcwd()
|
||||
|
||||
if url.startswith('http'):
|
||||
parsed_url = urlparse(url)
|
||||
path = parsed_url.path
|
||||
else:
|
||||
path = url
|
||||
# Check if API key exists, if not create file
|
||||
if not os.path.isfile(f"{main_directory}/api-key.txt"):
|
||||
with open(f'{main_directory}/api-key.txt', 'w') as api_key:
|
||||
print(f"\nIf you have an API key please place it in api-key.txt")
|
||||
api_key.write("Delete this and place your API key on this line")
|
||||
|
||||
#print('path: {}'.format(path))
|
||||
# Check if API key exists
|
||||
with open(f'{main_directory}/api-key.txt') as api_key:
|
||||
api_key = api_key.readline()
|
||||
if api_key == "Delete this and place your API key on this line":
|
||||
print(f"\nNo API key found!\n")
|
||||
api_key = ""
|
||||
|
||||
text_headers = ''
|
||||
for key in sorted(headers.keys()):
|
||||
if key.lower().startswith('x-skyott'):
|
||||
text_headers += key + ': ' + headers[key] + '\n'
|
||||
#print(text_headers)
|
||||
headers_md5 = hashlib.md5(text_headers.encode()).hexdigest()
|
||||
#print(headers_md5)
|
||||
|
||||
if sys.version_info[0] > 2 and isinstance(payload, str):
|
||||
payload = payload.encode('utf-8')
|
||||
payload_md5 = hashlib.md5(payload).hexdigest()
|
||||
|
||||
to_hash = ('{method}\n{path}\n{response_code}\n{app_id}\n{version}\n{headers_md5}\n'
|
||||
'{timestamp}\n{payload_md5}\n').format(method=method, path=path,
|
||||
response_code='', app_id=app_id, version=sig_version,
|
||||
headers_md5=headers_md5, timestamp=timestamp, payload_md5=payload_md5)
|
||||
#print(to_hash)
|
||||
|
||||
hashed = hmac.new(signature_key, to_hash.encode('utf8'), hashlib.sha1).digest()
|
||||
signature = base64.b64encode(hashed).decode('utf8')
|
||||
|
||||
return 'SkyOTT client="{}",signature="{}",timestamp="{}",version="{}"'.format(app_id, signature, timestamp, sig_version)
|
||||
# Create database and table for local key caching if they don't exist
|
||||
if not os.path.isfile(f"{main_directory}/database.db"):
|
||||
dbconnection = sqlite3.connect("database.db")
|
||||
dbcursor = dbconnection.cursor()
|
||||
dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "pssh" TEXT, "keys" TEXT, PRIMARY KEY("pssh") )')
|
||||
dbconnection.close()
|
||||
|
||||
|
||||
allowed_countries = [
|
||||
"AR", "AT", "AU", "BE", "BG", "BR", "CA", "CH", "CL", "CO", "CZ", "DE", "DK", "ES", "FI",
|
||||
"FR", "GR", "HK", "HR", "HU", "ID", "IE", "IL", "IN", "IS", "IT", "JP", "KR", "MX", "NL",
|
||||
"NO", "NZ", "PL", "RO", "RU", "SE", "SG", "SK", "TR", "UK", "US", "GB"
|
||||
]
|
||||
# Define key cache function
|
||||
|
||||
print("Welcome to TPD-Keys! \n")
|
||||
print("[Generic Services]")
|
||||
print("1. Generic without any headers")
|
||||
print("2. Generic with generic headers")
|
||||
print("3. Generic with headers from DRMHeaders.py")
|
||||
print("4. JSON Widevine challenge, headers from DRMHeaders.py \n")
|
||||
print("[Specific Services]")
|
||||
print("5. Canal+ Live TV")
|
||||
print("6. YouTube VOD")
|
||||
print("7. VDOCipher")
|
||||
print("8. SkyShowTime\n")
|
||||
selection = int(input("Please choose a service: "))
|
||||
|
||||
if selection == 1:
|
||||
print("")
|
||||
print("Generic without headers.")
|
||||
ipssh = input("PSSH: ")
|
||||
pssh = PSSH(ipssh)
|
||||
device = Device.load(MyWVD)
|
||||
cdm = Cdm.from_device(device)
|
||||
session_id = cdm.open()
|
||||
challenge = cdm.get_license_challenge(session_id, pssh)
|
||||
ilicurl = input("License URL: ")
|
||||
country_code = input("Proxy? (2 letter country code or N for no): ")
|
||||
if len(country_code) == 2 and country_code.upper() in allowed_countries:
|
||||
proxy = init_proxy({"zone": country_code, "port": "peer"})
|
||||
def key_cache(pssh: str, db_keys: str):
|
||||
dbconnection = sqlite3.connect("database.db")
|
||||
dbcursor = dbconnection.cursor()
|
||||
dbcursor.execute("INSERT or REPLACE INTO database VALUES (?, ?)", (pssh, db_keys))
|
||||
dbconnection.commit()
|
||||
dbconnection.close()
|
||||
|
||||
|
||||
# Making sure a .wvd file exists and using that as the CDM
|
||||
try:
|
||||
cdm = glob.glob(f'{main_directory}/*.wvd')[0]
|
||||
except:
|
||||
cdm = None
|
||||
print(f"Please place a WVD in {main_directory}")
|
||||
print(f"Use option 3 of TPD-Keys and set API key if you do not have your own.")
|
||||
|
||||
|
||||
# Define key retrieval function
|
||||
def retrieve_keys(proxy_used: str = None, headers: list = None,
|
||||
json_data: json = None, device: str = cdm):
|
||||
pssh = input("PSSH: ")
|
||||
licence_url = input("License URL: ")
|
||||
if proxy_used is not None:
|
||||
proxy = init_proxy({"zone": proxy_used, "port": "peer"})
|
||||
proxies = {
|
||||
"http": proxy
|
||||
}
|
||||
print(f"Using proxy {proxies['http']}")
|
||||
licence = requests.post(ilicurl, proxies=proxies, data=challenge)
|
||||
else:
|
||||
print("Proxy-less request.")
|
||||
licence = requests.post(ilicurl, data=challenge)
|
||||
licence.raise_for_status()
|
||||
cdm.parse_license(session_id, licence.content)
|
||||
fkeys = ""
|
||||
for key in cdm.get_keys(session_id):
|
||||
if key.type != 'SIGNING':
|
||||
fkeys += key.kid.hex + ":" + key.key.hex() + "\n"
|
||||
cache_key(ipssh, fkeys)
|
||||
print("")
|
||||
print(fkeys)
|
||||
cdm.close(session_id)
|
||||
|
||||
elif selection == 2:
|
||||
print("")
|
||||
print("Generic with generic headers.")
|
||||
ipssh = input("PSSH: ")
|
||||
pssh = PSSH(ipssh)
|
||||
device = Device.load(MyWVD)
|
||||
proxies = None
|
||||
challenge_pssh = PSSH(pssh)
|
||||
try:
|
||||
device = Device.load(device)
|
||||
except:
|
||||
print(f"Please place a WVD in {main_directory}")
|
||||
exit()
|
||||
cdm = Cdm.from_device(device)
|
||||
session_id = cdm.open()
|
||||
challenge = cdm.get_license_challenge(session_id, pssh)
|
||||
ilicurl = input("License URL: ")
|
||||
country_code = input("Proxy? (2 letter country code or N for no): ")
|
||||
generic_headers = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/109.0',
|
||||
'Accept': '*/*',
|
||||
'Accept-Language': 'en-US,en;q=0.5',
|
||||
challenge = cdm.get_license_challenge(session_id, challenge_pssh)
|
||||
license = requests.post(licence_url, data=challenge, proxies=proxies, headers=headers, json=json_data)
|
||||
license.raise_for_status()
|
||||
cdm.parse_license(session_id, license.content)
|
||||
db_keys = ''
|
||||
for key in cdm.get_keys(session_id):
|
||||
if key.type != 'SIGNING':
|
||||
db_keys += f'{key.kid.hex}:{key.key.hex()}\n'
|
||||
key_cache(pssh=pssh, db_keys=db_keys)
|
||||
return db_keys
|
||||
|
||||
# Define retrieve keys remotely function
|
||||
|
||||
|
||||
def retrieve_keys_remotely(proxy_used: str = None):
|
||||
api_url = "https://api.cdrm-project.com"
|
||||
api_device = "CDM"
|
||||
pssh = input("PSSH: ")
|
||||
license_url = input("License URL: ")
|
||||
if proxy_used is not None:
|
||||
proxy = init_proxy({"zone": proxy_used, "port": "peer"})
|
||||
proxies = {
|
||||
"http": proxy
|
||||
}
|
||||
else:
|
||||
proxies = None
|
||||
x_headers = {
|
||||
"X-Secret-Key": api_key
|
||||
}
|
||||
if len(country_code) == 2 and country_code.upper() in allowed_countries:
|
||||
proxy = init_proxy({"zone": country_code, "port": "peer"})
|
||||
proxies = {
|
||||
"http": proxy
|
||||
}
|
||||
print(f"Using proxy {proxies['http']}")
|
||||
licence = requests.post(ilicurl, data=challenge, headers=generic_headers, proxies=proxies)
|
||||
else:
|
||||
print("Proxy-less request.")
|
||||
licence = requests.post(ilicurl, data=challenge, headers=generic_headers)
|
||||
licence.raise_for_status()
|
||||
cdm.parse_license(session_id, licence.content)
|
||||
fkeys = ""
|
||||
for key in cdm.get_keys(session_id):
|
||||
if key.type != 'SIGNING':
|
||||
fkeys += key.kid.hex + ":" + key.key.hex() + "\n"
|
||||
cache_key(ipssh, fkeys)
|
||||
print("")
|
||||
print(fkeys)
|
||||
cdm.close(session_id)
|
||||
open_session = requests.get(url=f"{api_url}/{api_device}/open", headers=x_headers)
|
||||
|
||||
elif selection == 3:
|
||||
print("")
|
||||
print("Generic with headers from DRMHeaders.py")
|
||||
ipssh = input("PSSH: ")
|
||||
pssh = PSSH(ipssh)
|
||||
device = Device.load(MyWVD)
|
||||
cdm = Cdm.from_device(device)
|
||||
session_id = cdm.open()
|
||||
challenge = cdm.get_license_challenge(session_id, pssh)
|
||||
ilicurl = input("License URL: ")
|
||||
country_code = input("Proxy? (2 letter country code or N for no): ")
|
||||
if len(country_code) == 2 and country_code.upper() in allowed_countries:
|
||||
proxy = init_proxy({"zone": country_code, "port": "peer"})
|
||||
proxies = {
|
||||
"http": proxy
|
||||
}
|
||||
print(f"Using proxy {proxies['http']}")
|
||||
licence = requests.post(ilicurl, data=challenge, headers=DRMHeaders.headers, proxies=proxies)
|
||||
else:
|
||||
print("Proxy-less request.")
|
||||
licence = requests.post(ilicurl, data=challenge, headers=DRMHeaders.headers)
|
||||
licence.raise_for_status()
|
||||
cdm.parse_license(session_id, licence.content)
|
||||
fkeys = ""
|
||||
for key in cdm.get_keys(session_id):
|
||||
if key.type != 'SIGNING':
|
||||
fkeys += key.kid.hex + ":" + key.key.hex() + "\n"
|
||||
cache_key(ipssh, fkeys)
|
||||
print("")
|
||||
print(fkeys)
|
||||
cdm.close(session_id)
|
||||
session_id = open_session.json()["data"]["session_id"]
|
||||
|
||||
elif selection == 4:
|
||||
print("")
|
||||
print("JSON Widevine challenge, headers from DRMHeaders.py")
|
||||
ipssh = input("PSSH: ")
|
||||
pssh = PSSH(ipssh)
|
||||
device = Device.load(MyWVD)
|
||||
cdm = Cdm.from_device(device)
|
||||
session_id = cdm.open()
|
||||
challenge = cdm.get_license_challenge(session_id, pssh)
|
||||
request = b64encode(challenge)
|
||||
ilicurl = input("License URL: ")
|
||||
pid = input("releasePid: ")
|
||||
country_code = input("Proxy? (2 letter country code or N for no): ")
|
||||
if len(country_code) == 2 and country_code.upper() in allowed_countries:
|
||||
proxy = init_proxy({"zone": country_code, "port": "peer"})
|
||||
proxies = {
|
||||
"http": proxy
|
||||
}
|
||||
print(f"Using proxy {proxies['http']}")
|
||||
licence = requests.post(ilicurl, headers=DRMHeaders.headers, proxies=proxies, json={
|
||||
"getRawWidevineLicense":
|
||||
{
|
||||
'releasePid': pid,
|
||||
'widevineChallenge': str(request, "utf-8")
|
||||
}
|
||||
})
|
||||
else:
|
||||
print("Proxy-less request.")
|
||||
licence = requests.post(ilicurl, headers=DRMHeaders.headers, json={
|
||||
"getRawWidevineLicense":
|
||||
{
|
||||
'releasePid': pid,
|
||||
'widevineChallenge': str(request, "utf-8")
|
||||
}
|
||||
})
|
||||
licence.raise_for_status()
|
||||
cdm.parse_license(session_id, licence.content)
|
||||
fkeys = ""
|
||||
for key in cdm.get_keys(session_id):
|
||||
if key.type != 'SIGNING':
|
||||
fkeys += key.kid.hex + ":" + key.key.hex() + "\n"
|
||||
cache_key(ipssh, fkeys)
|
||||
print("")
|
||||
print(fkeys)
|
||||
cdm.close(session_id)
|
||||
license_challenge_json_data = {
|
||||
"session_id": session_id,
|
||||
"init_data": pssh
|
||||
}
|
||||
|
||||
elif selection == 5:
|
||||
print("")
|
||||
print("Canal+ Live TV")
|
||||
ipssh = input("PSSH: ")
|
||||
channel = input("Channel: ")
|
||||
live_token = input("Live Token: ")
|
||||
pssh = PSSH(ipssh)
|
||||
device = Device.load(MyWVD)
|
||||
cdm = Cdm.from_device(device)
|
||||
session_id = cdm.open()
|
||||
challenge = cdm.get_license_challenge(session_id, pssh)
|
||||
request = b64encode(challenge)
|
||||
ilicurl = "https://secure-browser.canalplus-bo.net/WebPortal/ottlivetv/api/V4/zones/cpfra/devices/31/apps/1/jobs/GetLicence"
|
||||
country_code = input("Proxy? (2 letter country code or N for no): ")
|
||||
if len(country_code) == 2 and country_code.upper() in allowed_countries:
|
||||
proxy = init_proxy({"zone": country_code, "port": "peer"})
|
||||
proxies = {
|
||||
"http": proxy
|
||||
}
|
||||
print(f"Using proxy {proxies['http']}")
|
||||
licence = requests.post(ilicurl, headers=DRMHeaders.headers, proxies=proxies, json={
|
||||
'ServiceRequest': {
|
||||
'InData': {
|
||||
'EpgId': channel,
|
||||
'LiveToken': live_token,
|
||||
'UserKeyId': '_sdivii9vz',
|
||||
'DeviceKeyId': '1676391356366-a3a5a7d663de',
|
||||
'ChallengeInfo': f'{base64.b64encode(challenge).decode()}',
|
||||
'Mode': 'MKPL',
|
||||
},
|
||||
},
|
||||
})
|
||||
else:
|
||||
print("Proxy-less request.")
|
||||
licence = requests.post(ilicurl, headers=DRMHeaders.headers, json={
|
||||
'ServiceRequest': {
|
||||
'InData': {
|
||||
'EpgId': channel,
|
||||
'LiveToken': live_token,
|
||||
'UserKeyId': '_jprs988fy',
|
||||
'DeviceKeyId': '1678334845207-61e4e804264c',
|
||||
'ChallengeInfo': f'{base64.b64encode(challenge).decode()}',
|
||||
'Mode': 'MKPL',
|
||||
},
|
||||
},
|
||||
})
|
||||
licence.raise_for_status()
|
||||
cdm.parse_license(session_id, licence.json()["ServiceResponse"]["OutData"]["LicenseInfo"])
|
||||
fkeys = ""
|
||||
for key in cdm.get_keys(session_id):
|
||||
if key.type != 'SIGNING':
|
||||
fkeys += key.kid.hex + ":" + key.key.hex() + "\n"
|
||||
cache_key(ipssh, fkeys)
|
||||
print("")
|
||||
print(fkeys)
|
||||
cdm.close(session_id)
|
||||
licence_challenge = requests.post(url=f"{api_url}/{api_device}/get_license_challenge/AUTOMATIC", headers=x_headers,
|
||||
json=license_challenge_json_data)
|
||||
|
||||
elif selection == 6:
|
||||
print("")
|
||||
print("YouTube")
|
||||
ipssh = "AAAAQXBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAACEiGVlUX01FRElBOjZlMzI4ZWQxYjQ5YmYyMWZI49yVmwY="
|
||||
ilicurl = input("License URL: ")
|
||||
pssh = PSSH(ipssh)
|
||||
device = Device.load(MyWVD)
|
||||
cdm = Cdm.from_device(device)
|
||||
session_id = cdm.open()
|
||||
challenge = cdm.get_license_challenge(session_id, pssh)
|
||||
country_code = input("Proxy? (2 letter country code or N for no): ")
|
||||
json_data = DRMHeaders.json_data
|
||||
json_data["licenseRequest"] = base64.b64encode(challenge).decode("utf-8")
|
||||
if len(country_code) == 2 and country_code.upper() in allowed_countries:
|
||||
proxy = init_proxy({"zone": country_code, "port": "peer"})
|
||||
proxies = {
|
||||
"http": proxy
|
||||
}
|
||||
print(f"Using proxy {proxies['http']}")
|
||||
licence = requests.post(ilicurl, cookies=DRMHeaders.cookies, headers=DRMHeaders.headers, proxies=proxies, json=json_data)
|
||||
else:
|
||||
print("Proxy-less request.")
|
||||
licence = requests.post(ilicurl, cookies=DRMHeaders.cookies, headers=DRMHeaders.headers, json=json_data)
|
||||
license_message = licence_challenge.json()["data"]["challenge_b64"]
|
||||
|
||||
licence.raise_for_status()
|
||||
cdm.parse_license(session_id, licence.json()["license"].replace("-", "+").replace("_", "/"))
|
||||
fkeys = ""
|
||||
for key in cdm.get_keys(session_id):
|
||||
if key.type != 'SIGNING':
|
||||
fkeys += key.kid.hex + ":" + key.key.hex() + "\n"
|
||||
cache_key(ipssh, fkeys)
|
||||
print("")
|
||||
print(fkeys)
|
||||
cdm.close(session_id)
|
||||
license = requests.post(
|
||||
headers=License_cURL.headers,
|
||||
proxies=proxies,
|
||||
url=license_url,
|
||||
data=base64.b64decode(license_message)
|
||||
)
|
||||
|
||||
elif selection == 7:
|
||||
parse_license_json_data = {
|
||||
"session_id": session_id,
|
||||
"license_message": f"{base64.b64encode(license.content).decode()}"
|
||||
}
|
||||
|
||||
url = input("\nEnter the URL: ")
|
||||
requests.post(f"{api_url}/{api_device}/parse_license", json=parse_license_json_data,
|
||||
headers=x_headers)
|
||||
|
||||
headers = {
|
||||
get_keys = requests.post(f"{api_url}/{api_device}/get_keys/ALL",
|
||||
json={"session_id": session_id}, headers=x_headers)
|
||||
db_keys = ''
|
||||
for key in get_keys.json()["data"]["keys"]:
|
||||
if not key["type"] == "SIGNING":
|
||||
db_keys += f"{key['key_id']}:{key['key']}\n"
|
||||
key_cache(pssh=pssh, db_keys=db_keys)
|
||||
|
||||
requests.get(f"{api_url}/{api_device}/close/{session_id}", headers=x_headers)
|
||||
|
||||
return db_keys
|
||||
|
||||
|
||||
# Define retrieve keys remotely VDOCipher function
|
||||
def retrieve_keys_remotely_vdocipher(proxy_used: str = None):
|
||||
|
||||
# Get URL from function
|
||||
url = input(f"Video URL: ")
|
||||
|
||||
# Set the VDOCipher token headers
|
||||
token_headers = {
|
||||
'accept': '*/*',
|
||||
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
|
||||
## Comment this line out if using for anything other than https://www.vdocipher.com/blog/2014/12/add-text-to-videos-with-watermark/
|
||||
'Origin': f"https://{urandom(8).hex()}.com",
|
||||
}
|
||||
|
||||
response = requests.get(url, cookies=DRMHeaders.cookies, headers=headers)
|
||||
|
||||
# Set the token response
|
||||
token_response = requests.get(url, cookies=License_cURL.cookies, headers=token_headers)
|
||||
try:
|
||||
otp_match = re.findall(r"otp: '(.*)',", response.text)[0]
|
||||
playbackinfo_match = re.findall(r"playbackInfo: '(.*)',", response.text)[0]
|
||||
otp_match = re.findall(r"otp: '(.*)',", token_response.text)[0]
|
||||
playbackinfo_match = re.findall(r"playbackInfo: '(.*)',", token_response.text)[0]
|
||||
except IndexError:
|
||||
try:
|
||||
otp_match = re.findall(r"otp=(.*)&", response.text)[0]
|
||||
playbackinfo_match = re.findall(r"playbackInfo=(.*)", response.text)[0]
|
||||
otp_match = re.findall(r"otp=(.*)&", token_response.text)[0]
|
||||
playbackinfo_match = re.findall(r"playbackInfo=(.*)", token_response.text)[0]
|
||||
except IndexError:
|
||||
print("\nAn error occured while getting otp/playback")
|
||||
exit()
|
||||
|
||||
# Set the video ID
|
||||
video_id = json.loads(base64.b64decode(playbackinfo_match).decode())["videoId"]
|
||||
|
||||
response = requests.get(f'https://dev.vdocipher.com/api/meta/{video_id}', headers=headers)
|
||||
|
||||
# Set new token response (1)
|
||||
token_response = requests.get(f'https://dev.vdocipher.com/api/meta/{video_id}', headers=token_headers)
|
||||
try:
|
||||
lic_url = response.json()["dash"]["licenseServers"]["com.widevine.alpha"].rsplit(":", 1)[0]
|
||||
mpd = response.json()["dash"]["manifest"]
|
||||
license_url = token_response.json()["dash"]["licenseServers"]["com.widevine.alpha"].rsplit(":", 1)[0]
|
||||
mpd = token_response.json()["dash"]["manifest"]
|
||||
except KeyError:
|
||||
print("\n An error occured while getting mpd/license url")
|
||||
|
||||
response = requests.get(mpd, headers=headers)
|
||||
# Set new token response (2)
|
||||
token_response = requests.get(mpd, headers=token_headers)
|
||||
|
||||
pssh = re.search(r"<cenc:pssh>(.*)</cenc:pssh>", response.text).group(1)
|
||||
device = Device.load(MyWVD)
|
||||
cdm = Cdm.from_device(device)
|
||||
session_id = cdm.open()
|
||||
challenge = cdm.get_license_challenge(session_id, PSSH(pssh))
|
||||
# Set API URL
|
||||
api_url = "https://api.cdrm-project.com"
|
||||
|
||||
# Set API Device
|
||||
api_device = "CDM"
|
||||
|
||||
# Retrieve PSSH
|
||||
pssh = re.search(r"<cenc:pssh>(.*)</cenc:pssh>", token_response.text).group(1)
|
||||
|
||||
# Check if proxy was used
|
||||
if proxy_used is not None:
|
||||
proxy = init_proxy({"zone": proxy_used, "port": "peer"})
|
||||
proxies = {
|
||||
"http": proxy
|
||||
}
|
||||
else:
|
||||
proxies = None
|
||||
|
||||
# Set API headers
|
||||
x_headers = {
|
||||
"X-Secret-Key": api_key
|
||||
}
|
||||
|
||||
# Open API session
|
||||
open_session = requests.get(url=f"{api_url}/{api_device}/open", headers=x_headers)
|
||||
|
||||
# Set the session ID
|
||||
session_id = open_session.json()["data"]["session_id"]
|
||||
|
||||
# Send json data to get license challenge
|
||||
license_challenge_json_data = {
|
||||
"session_id": session_id,
|
||||
"init_data": pssh
|
||||
}
|
||||
|
||||
# Get the license challenge from PSSH
|
||||
licence_challenge = requests.post(url=f"{api_url}/{api_device}/get_license_challenge/AUTOMATIC", headers=x_headers,
|
||||
json=license_challenge_json_data)
|
||||
|
||||
# Set the final token
|
||||
token = {
|
||||
"otp":otp_match,
|
||||
"playbackInfo":playbackinfo_match,
|
||||
"href":url,
|
||||
"tech":"wv",
|
||||
"licenseRequest":f"{base64.b64encode(challenge).decode()}"
|
||||
"licenseRequest":licence_challenge.json()["data"]["challenge_b64"]
|
||||
}
|
||||
|
||||
json_data = {
|
||||
'token': f'{base64.b64encode(json.dumps(token).encode("utf-8")).decode()}',
|
||||
# Send challenge
|
||||
license = requests.post(
|
||||
proxies=proxies,
|
||||
url=license_url,
|
||||
json={'token': f'{base64.b64encode(json.dumps(token).encode("utf-8")).decode()}'}
|
||||
)
|
||||
|
||||
# Set the parsing JSON data
|
||||
parse_license_json_data = {
|
||||
"session_id": session_id,
|
||||
"license_message": license.json()["license"]
|
||||
}
|
||||
|
||||
response = requests.post(lic_url, headers=headers, json=json_data)
|
||||
# Send the parsing JSON data
|
||||
requests.post(f"{api_url}/{api_device}/parse_license", json=parse_license_json_data,
|
||||
headers=x_headers)
|
||||
|
||||
try:
|
||||
lic_b64 = response.json()["license"]
|
||||
except KeyError:
|
||||
print(f'An error occured while getting license: {response.json()["message"]}')
|
||||
exit()
|
||||
# Get the keys
|
||||
get_keys = requests.post(f"{api_url}/{api_device}/get_keys/ALL",
|
||||
json={"session_id": session_id}, headers=x_headers)
|
||||
|
||||
cdm.parse_license(session_id, lic_b64)
|
||||
fkeys = ""
|
||||
# Cache the keys
|
||||
db_keys = ''
|
||||
for key in get_keys.json()["data"]["keys"]:
|
||||
if not key["type"] == "SIGNING":
|
||||
db_keys += f"{key['key_id']}:{key['key']}\n"
|
||||
key_cache(pssh=pssh, db_keys=db_keys)
|
||||
|
||||
print(f"\nMPD Link:")
|
||||
print(f"{mpd}\n")
|
||||
# Close the session
|
||||
requests.get(f"{api_url}/{api_device}/close/{session_id}", headers=x_headers)
|
||||
|
||||
for key in cdm.get_keys(session_id):
|
||||
if key.type != 'SIGNING':
|
||||
fkeys += key.kid.hex + ":" + key.key.hex() + "\n"
|
||||
cache_key(pssh, fkeys)
|
||||
print("")
|
||||
print(fkeys)
|
||||
cdm.close(session_id)
|
||||
# Return the keys
|
||||
return db_keys
|
||||
|
||||
elif selection == 8:
|
||||
print("")
|
||||
print("SkyShowTime")
|
||||
|
||||
token_url = 'https://ovp.skyshowtime.com/auth/tokens'
|
||||
vod_url= 'https://ovp.skyshowtime.com/video/playouts/vod'
|
||||
|
||||
cookies = DRMHeaders.cookies
|
||||
region = cookies['activeTerritory']
|
||||
|
||||
#Getting tokens
|
||||
headers = {
|
||||
'accept': 'application/vnd.tokens.v1+json',
|
||||
'content-type': 'application/vnd.tokens.v1+json',
|
||||
}
|
||||
|
||||
post_data = {
|
||||
"auth": {
|
||||
"authScheme": 'MESSO',
|
||||
"authIssuer": 'NOWTV',
|
||||
"provider": 'SKYSHOWTIME',
|
||||
"providerTerritory": region,
|
||||
"proposition": 'SKYSHOWTIME',
|
||||
},
|
||||
"device": {
|
||||
"type": 'MOBILE',
|
||||
"platform": 'ANDROID',
|
||||
"id": 'Z-sKxKApSe7c3dAMGAYtVU8NmWKDcWrCKobKpnVTLqc', #Value seems to be irrelavant
|
||||
"drmDeviceId": 'UNKNOWN'
|
||||
}
|
||||
}
|
||||
post_data = json.dumps(post_data)
|
||||
headers['x-sky-signature'] = calculate_signature('POST', token_url, headers, post_data)
|
||||
userToken = json.loads(requests.post(token_url, cookies=cookies, headers=headers, data=post_data).content)['userToken']
|
||||
del headers
|
||||
del post_data
|
||||
# Defining service prompt function
|
||||
|
||||
|
||||
#Getting license and manifest url
|
||||
video_url = input("Enter SkyShowTime video url: ")
|
||||
content_id = video_url.split("/")[6]
|
||||
provider_variant_id = video_url.split("/")[7][:36]
|
||||
def service_prompt():
|
||||
service_prompt = [
|
||||
inquirer.List('Service',
|
||||
message="Please choose a service",
|
||||
choices=['Generic', 'Generic with headers from Licence cURL', 'Remote', 'Remote VDOCipher'],
|
||||
),
|
||||
]
|
||||
service_selected = inquirer.prompt(service_prompt)
|
||||
|
||||
post_data = {
|
||||
"providerVariantId": provider_variant_id,
|
||||
"device": {
|
||||
"capabilities": [
|
||||
{
|
||||
"transport": "DASH",
|
||||
"protection": "NONE",
|
||||
"vcodec": "H265",
|
||||
"acodec": "AAC",
|
||||
"container": "ISOBMFF"
|
||||
},
|
||||
{
|
||||
"transport": "DASH",
|
||||
"protection": "WIDEVINE",
|
||||
"vcodec": "H265",
|
||||
"acodec": "AAC",
|
||||
"container": "ISOBMFF"
|
||||
},
|
||||
{
|
||||
"transport": "DASH",
|
||||
"protection": "NONE",
|
||||
"vcodec": "H264",
|
||||
"acodec": "AAC",
|
||||
"container": "ISOBMFF"
|
||||
},
|
||||
{
|
||||
"transport": "DASH",
|
||||
"protection": "WIDEVINE",
|
||||
"vcodec": "H264",
|
||||
"acodec": "AAC",
|
||||
"container": "ISOBMFF"
|
||||
}
|
||||
],
|
||||
"model": "SM-N986B",
|
||||
"maxVideoFormat": "HD",
|
||||
"hdcpEnabled": 'false',
|
||||
"supportedColourSpaces": [
|
||||
"SDR"
|
||||
proxy_needed_prompt = [
|
||||
inquirer.List('Proxy',
|
||||
message="Will you need a proxy?",
|
||||
choices=['Yes', 'No'],
|
||||
),
|
||||
]
|
||||
|
||||
proxy_needed = inquirer.prompt(proxy_needed_prompt)
|
||||
if proxy_needed["Proxy"] == "Yes":
|
||||
allowed_countries = [
|
||||
"AR", "AT", "AU", "BE", "BG", "BR", "CA", "CH", "CL", "CO", "CZ", "DE", "DK", "ES", "FI",
|
||||
"FR", "GR", "HK", "HR", "HU", "ID", "IE", "IL", "IN", "IS", "IT", "JP", "KR", "MX", "NL",
|
||||
"NO", "NZ", "PL", "RO", "RU", "SE", "SG", "SK", "TR", "UK", "US", "GB"
|
||||
]
|
||||
},
|
||||
"client": {
|
||||
"thirdParties": [
|
||||
"COMSCORE",
|
||||
"CONVIVA",
|
||||
"FREEWHEEL"
|
||||
proxy_available = [
|
||||
inquirer.List('Proxys available',
|
||||
message="Please choose a country",
|
||||
choices=allowed_countries
|
||||
),
|
||||
]
|
||||
},
|
||||
"personaParentalControlRating": 9
|
||||
}
|
||||
post_data = json.dumps(post_data)
|
||||
|
||||
headers = {
|
||||
'accept': 'application/vnd.playvod.v1+json',
|
||||
'content-type': 'application/vnd.playvod.v1+json',
|
||||
'x-skyott-activeterritory': region,
|
||||
'x-skyott-agent': 'skyshowtime.mobile.android',
|
||||
'x-skyott-country': region,
|
||||
'x-skyott-device': 'MOBILE',
|
||||
'x-skyott-platform': 'ANDROID',
|
||||
'x-skyott-proposition': 'SKYSHOWTIME',
|
||||
'x-skyott-provider': 'SKYSHOWTIME',
|
||||
'x-skyott-territory': region,
|
||||
'x-skyott-usertoken': userToken,
|
||||
}
|
||||
headers['x-sky-signature'] = calculate_signature('POST', vod_url, headers, post_data)
|
||||
|
||||
vod_request = json.loads(requests.post(vod_url, headers=headers, data=post_data).content)
|
||||
|
||||
license_url = vod_request['protection']['licenceAcquisitionUrl']
|
||||
manifest_url = vod_request['asset']['endpoints'][0]['url']
|
||||
|
||||
#Getting pssh
|
||||
|
||||
manifest = requests.get(manifest_url).content
|
||||
pssh = PSSH(BeautifulSoup(manifest, 'html.parser').findAll('cenc:pssh')[1].text)
|
||||
selected_proxy = inquirer.prompt(proxy_available)
|
||||
return service_selected["Service"], selected_proxy["Proxys available"]
|
||||
else:
|
||||
selected_proxy = None
|
||||
return service_selected["Service"], selected_proxy
|
||||
|
||||
|
||||
#CDM processing
|
||||
# Define variables for the service and proxy wanted
|
||||
|
||||
device = Device.load(MyWVD)
|
||||
cdm = Cdm.from_device(device)
|
||||
session_id = cdm.open()
|
||||
challenge = cdm.get_license_challenge(session_id, pssh)
|
||||
|
||||
headers = {
|
||||
'Content-Type':'application/octet-stream',
|
||||
}
|
||||
service_selected, selected_proxy = service_prompt()
|
||||
|
||||
licence = requests.post(license_url, headers=headers, data=challenge)
|
||||
licence.raise_for_status()
|
||||
cdm.parse_license(session_id, licence.content)
|
||||
|
||||
fkeys = ""
|
||||
if service_selected == "Generic":
|
||||
print(f"\n{retrieve_keys(proxy_used=selected_proxy)}")
|
||||
elif service_selected == "Generic with headers from Licence cURL":
|
||||
print(f"\n{retrieve_keys(proxy_used=selected_proxy, headers=License_cURL.headers)}")
|
||||
elif service_selected == "Remote":
|
||||
print(f"\n{retrieve_keys_remotely(proxy_used=selected_proxy)}")
|
||||
elif service_selected == "Remote VDOCipher":
|
||||
print(f"\n{retrieve_keys_remotely_vdocipher(proxy_used=selected_proxy)}")
|
||||
|
||||
print(f"\nMPD Link:")
|
||||
print(f"{manifest_url}\n")
|
||||
|
||||
for key in cdm.get_keys(session_id):
|
||||
if key.type != 'SIGNING':
|
||||
fkeys += key.kid.hex + ":" + key.key.hex()
|
||||
cache_key(str(pssh), fkeys)
|
||||
print(fkeys)
|
||||
cdm.close(session_id)
|
||||
|
||||
else:
|
||||
print("Invalid selection")
|
Loading…
x
Reference in New Issue
Block a user