From 99a75e1671c00c1f679015d11807a5a5172a5cb2 Mon Sep 17 00:00:00 2001
From: TPD94 <>
Date: Fri, 17 Nov 2023 00:44:00 -0500
Subject: [PATCH] Beta branch
---
DRMHeaders.py | 15 -
License_cURL.py | 5 +
README.md | 11 +-
requirements.txt | 3 +-
tpd-keys.py | 782 +++++++++++++++++------------------------------
5 files changed, 286 insertions(+), 530 deletions(-)
delete mode 100644 DRMHeaders.py
create mode 100644 License_cURL.py
diff --git a/DRMHeaders.py b/DRMHeaders.py
deleted file mode 100644
index 6906b45..0000000
--- a/DRMHeaders.py
+++ /dev/null
@@ -1,15 +0,0 @@
-headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0',
- 'Accept': '*/*',
- 'Accept-Language': 'en-US,en;q=0.5',
- # 'Accept-Encoding': 'gzip, deflate, br',
- 'Content-Type': 'application/octet-stream',
- 'DNT': '1',
- 'Sec-Fetch-Dest': 'empty',
- 'Sec-Fetch-Mode': 'cors',
- 'Sec-Fetch-Site': 'same-site',
- 'Sec-GPC': '1',
- 'Connection': 'keep-alive',
- # Requests doesn't support trailers
- # 'TE': 'trailers',
-}
\ No newline at end of file
diff --git a/License_cURL.py b/License_cURL.py
new file mode 100644
index 0000000..cebd5a8
--- /dev/null
+++ b/License_cURL.py
@@ -0,0 +1,5 @@
+headers = {
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0',
+ 'Accept': '*/*',
+ 'Accept-Language': 'en-US,en;q=0.5',
+}
\ No newline at end of file
diff --git a/README.md b/README.md
index 20ea8b7..929adb7 100644
--- a/README.md
+++ b/README.md
@@ -6,20 +6,15 @@
How to use:
1. Create `TPD-Keys` folder.
-2. Download and extract `tpd-keys.py`, `requirements.txt` and `DRMHeaders.py` into the newly created `TPD-Keys` directory
+2. Download and extract `tpd-keys.py`, `requirements.txt` and `License_cURL.py` into the newly created `TPD-Keys` directory
3. Install the requirements with `pip install -r requirements.txt`
4. Crete a WVD with pywidevine; `pywidevine create-device -k "/PATH/TO/device_private_key" -c "/PATH/TO/device_client_id_blob" -t "ANDROID" -l 3`
-5. Replace `MyWVD= "/PATH/TO/WVD.wvd"` with the path to your `.wvd` on line 15 of tpd-keys.py
+5. Place your .wvd in the root of `TPD-Keys` directory
-> For instance
-> `MyWVD = "C:\Users\TPD94\Desktop\AndroidDeivce.wvd"`
-> or if it is located in the same folder
-> `MyWVD = "AndroidDeivce.wvd"`
-
-6. Paste any needed headers into `DRMHeaders.py`
+6. Paste any needed headers into `License_cURL.py`
7. Run with `python tpd-keys.py`
diff --git a/requirements.txt b/requirements.txt
index e32acbd..70e8eca 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
pywidevine
requests
httpx
-pyyaml
\ No newline at end of file
+pyyaml
+inquirer
\ No newline at end of file
diff --git a/tpd-keys.py b/tpd-keys.py
index e4d8879..ec1967f 100644
--- a/tpd-keys.py
+++ b/tpd-keys.py
@@ -1,38 +1,26 @@
-import base64
-import hmac
-import hashlib
-import sys
-import time
+# Import needed libraries
-from bs4 import BeautifulSoup
+import requests
+import json
+import httpx
+import sqlite3
+import License_cURL
+import os
+from os import urandom
+import glob
+import inquirer
+import uuid
+import random
+import re
-if sys.version_info[0] >= 3:
- from urllib.parse import urlparse
-else:
- from urlparse import urlparse
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine.pssh import PSSH
-from base64 import b64encode
-import re
-import requests
-import json
-import random
-import uuid
-import httpx
-import sqlite3
-import DRMHeaders
-from requests.utils import dict_from_cookiejar
-import http
-from os import urandom
+import base64
-MyWVD = "/PATH/TO/WVD.wvd"
-dbconnection = sqlite3.connect("database.db")
-dbcursor = dbconnection.cursor()
-dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "pssh" TEXT, "keys" TEXT, PRIMARY KEY("pssh") )')
-dbconnection.close()
+# Hola proxy
class Settings:
def __init__(self, userCountry: str = None, randomProxy: bool = False) -> None:
@@ -86,9 +74,8 @@ class Engine:
).json()["key"]
def zgettunnels(
- self, session_key: str, country: str, timeout: float = 10.0
+ self, session_key: str, country: str, timeout: float = 10.0
) -> json:
-
qs = {
"country": country.lower(),
"limit": 1,
@@ -115,559 +102,342 @@ class Hola:
self.settings.userCountry = httpx.get(self.myipUri).json()["country"]
if (
- not self.settings.userCountry in self.settings.zoneAvailable
- or self.settings.randomProxy
+ not self.settings.userCountry in self.settings.zoneAvailable
+ or self.settings.randomProxy
):
self.settings.userCountry = random.choice(self.settings.zoneAvailable)
return self.settings.userCountry
-def cache_key(cpssh: str, ckeys: str):
- dbconnection = sqlite3.connect("database.db")
- dbcursor = dbconnection.cursor()
- dbcursor.execute("INSERT or REPLACE INTO database VALUES (?, ?)", (cpssh, ckeys))
- dbconnection.commit()
- dbconnection.close()
-
def init_proxy(data):
settings = Settings(
data["zone"]
- ) # True if you want random proxy each request / "DE" for a proxy with region of your choice (German here) / False if you wish to have a proxy localized to your IP address
+ )
settings.port_type_choice = data[
"port"
- ] # direct return datacenter ipinfo, peer "residential" (can fail sometime)
+ ]
hola = Hola(settings)
engine = Engine(settings)
userCountry = hola.get_country()
session_key = engine.generate_session_key()
-# time.sleep(10)
+ # time.sleep(10)
tunnels = engine.zgettunnels(session_key, userCountry)
return engine.get_proxy(tunnels)
-
-def calculate_signature(method, url, headers, payload, timestamp=None):
- app_id = 'SKYSHOWTIME-ANDROID-v1'
- signature_key = bytearray('jfj9qGg6aDHaBbFpH6wNEvN6cHuHtZVppHRvBgZs', 'utf-8')
- sig_version = '1.0'
-
- if not timestamp:
- timestamp = int(time.time())
-
- if url.startswith('http'):
- parsed_url = urlparse(url)
- path = parsed_url.path
- else:
- path = url
-
- #print('path: {}'.format(path))
-
- text_headers = ''
- for key in sorted(headers.keys()):
- if key.lower().startswith('x-skyott'):
- text_headers += key + ': ' + headers[key] + '\n'
- #print(text_headers)
- headers_md5 = hashlib.md5(text_headers.encode()).hexdigest()
- #print(headers_md5)
-
- if sys.version_info[0] > 2 and isinstance(payload, str):
- payload = payload.encode('utf-8')
- payload_md5 = hashlib.md5(payload).hexdigest()
-
- to_hash = ('{method}\n{path}\n{response_code}\n{app_id}\n{version}\n{headers_md5}\n'
- '{timestamp}\n{payload_md5}\n').format(method=method, path=path,
- response_code='', app_id=app_id, version=sig_version,
- headers_md5=headers_md5, timestamp=timestamp, payload_md5=payload_md5)
- #print(to_hash)
-
- hashed = hmac.new(signature_key, to_hash.encode('utf8'), hashlib.sha1).digest()
- signature = base64.b64encode(hashed).decode('utf8')
-
- return 'SkyOTT client="{}",signature="{}",timestamp="{}",version="{}"'.format(app_id, signature, timestamp, sig_version)
-allowed_countries = [
- "AR", "AT", "AU", "BE", "BG", "BR", "CA", "CH", "CL", "CO", "CZ", "DE", "DK", "ES", "FI",
- "FR", "GR", "HK", "HR", "HU", "ID", "IE", "IL", "IN", "IS", "IT", "JP", "KR", "MX", "NL",
- "NO", "NZ", "PL", "RO", "RU", "SE", "SG", "SK", "TR", "UK", "US", "GB"
- ]
+# Get current working directory
+main_directory = os.getcwd()
-print("Welcome to TPD-Keys! \n")
-print("[Generic Services]")
-print("1. Generic without any headers")
-print("2. Generic with generic headers")
-print("3. Generic with headers from DRMHeaders.py")
-print("4. JSON Widevine challenge, headers from DRMHeaders.py \n")
-print("[Specific Services]")
-print("5. Canal+ Live TV")
-print("6. YouTube VOD")
-print("7. VDOCipher")
-print("8. SkyShowTime\n")
-selection = int(input("Please choose a service: "))
+# Check if API key exists, if not create file
+if not os.path.isfile(f"{main_directory}/api-key.txt"):
+ with open(f'{main_directory}/api-key.txt', 'w') as api_key:
+ print(f"\nIf you have an API key please place it in api-key.txt")
+ api_key.write("Delete this and place your API key on this line")
-if selection == 1:
- print("")
- print("Generic without headers.")
- ipssh = input("PSSH: ")
- pssh = PSSH(ipssh)
- device = Device.load(MyWVD)
- cdm = Cdm.from_device(device)
- session_id = cdm.open()
- challenge = cdm.get_license_challenge(session_id, pssh)
- ilicurl = input("License URL: ")
- country_code = input("Proxy? (2 letter country code or N for no): ")
- if len(country_code) == 2 and country_code.upper() in allowed_countries:
- proxy = init_proxy({"zone": country_code, "port": "peer"})
+# Check if API key exists
+with open(f'{main_directory}/api-key.txt') as api_key:
+ api_key = api_key.readline()
+ if api_key == "Delete this and place your API key on this line":
+ print(f"\nNo API key found!\n")
+ api_key = ""
+
+# Create database and table for local key caching if they don't exist
+if not os.path.isfile(f"{main_directory}/database.db"):
+ dbconnection = sqlite3.connect("database.db")
+ dbcursor = dbconnection.cursor()
+ dbcursor.execute('CREATE TABLE IF NOT EXISTS "DATABASE" ( "pssh" TEXT, "keys" TEXT, PRIMARY KEY("pssh") )')
+ dbconnection.close()
+
+
+# Define key cache function
+
+
+def key_cache(pssh: str, db_keys: str):
+ dbconnection = sqlite3.connect("database.db")
+ dbcursor = dbconnection.cursor()
+ dbcursor.execute("INSERT or REPLACE INTO database VALUES (?, ?)", (pssh, db_keys))
+ dbconnection.commit()
+ dbconnection.close()
+
+
+# Making sure a .wvd file exists and using that as the CDM
+try:
+ cdm = glob.glob(f'{main_directory}/*.wvd')[0]
+except:
+ cdm = None
+ print(f"Please place a WVD in {main_directory}")
+ print(f"Use option 3 of TPD-Keys and set API key if you do not have your own.")
+
+
+# Define key retrieval function
+def retrieve_keys(proxy_used: str = None, headers: list = None,
+ json_data: json = None, device: str = cdm):
+ pssh = input("PSSH: ")
+ licence_url = input("License URL: ")
+ if proxy_used is not None:
+ proxy = init_proxy({"zone": proxy_used, "port": "peer"})
proxies = {
"http": proxy
}
- print(f"Using proxy {proxies['http']}")
- licence = requests.post(ilicurl, proxies=proxies, data=challenge)
else:
- print("Proxy-less request.")
- licence = requests.post(ilicurl, data=challenge)
- licence.raise_for_status()
- cdm.parse_license(session_id, licence.content)
- fkeys = ""
- for key in cdm.get_keys(session_id):
- if key.type != 'SIGNING':
- fkeys += key.kid.hex + ":" + key.key.hex() + "\n"
- cache_key(ipssh, fkeys)
- print("")
- print(fkeys)
- cdm.close(session_id)
-
-elif selection == 2:
- print("")
- print("Generic with generic headers.")
- ipssh = input("PSSH: ")
- pssh = PSSH(ipssh)
- device = Device.load(MyWVD)
+ proxies = None
+ challenge_pssh = PSSH(pssh)
+ try:
+ device = Device.load(device)
+ except:
+ print(f"Please place a WVD in {main_directory}")
+ exit()
cdm = Cdm.from_device(device)
session_id = cdm.open()
- challenge = cdm.get_license_challenge(session_id, pssh)
- ilicurl = input("License URL: ")
- country_code = input("Proxy? (2 letter country code or N for no): ")
- generic_headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/109.0',
- 'Accept': '*/*',
- 'Accept-Language': 'en-US,en;q=0.5',
+ challenge = cdm.get_license_challenge(session_id, challenge_pssh)
+ license = requests.post(licence_url, data=challenge, proxies=proxies, headers=headers, json=json_data)
+ license.raise_for_status()
+ cdm.parse_license(session_id, license.content)
+ db_keys = ''
+ for key in cdm.get_keys(session_id):
+ if key.type != 'SIGNING':
+ db_keys += f'{key.kid.hex}:{key.key.hex()}\n'
+ key_cache(pssh=pssh, db_keys=db_keys)
+ return db_keys
+
+# Define retrieve keys remotely function
+
+
+def retrieve_keys_remotely(proxy_used: str = None):
+ api_url = "https://api.cdrm-project.com"
+ api_device = "CDM"
+ pssh = input("PSSH: ")
+ license_url = input("License URL: ")
+ if proxy_used is not None:
+ proxy = init_proxy({"zone": proxy_used, "port": "peer"})
+ proxies = {
+ "http": proxy
+ }
+ else:
+ proxies = None
+ x_headers = {
+ "X-Secret-Key": api_key
}
- if len(country_code) == 2 and country_code.upper() in allowed_countries:
- proxy = init_proxy({"zone": country_code, "port": "peer"})
- proxies = {
- "http": proxy
- }
- print(f"Using proxy {proxies['http']}")
- licence = requests.post(ilicurl, data=challenge, headers=generic_headers, proxies=proxies)
- else:
- print("Proxy-less request.")
- licence = requests.post(ilicurl, data=challenge, headers=generic_headers)
- licence.raise_for_status()
- cdm.parse_license(session_id, licence.content)
- fkeys = ""
- for key in cdm.get_keys(session_id):
- if key.type != 'SIGNING':
- fkeys += key.kid.hex + ":" + key.key.hex() + "\n"
- cache_key(ipssh, fkeys)
- print("")
- print(fkeys)
- cdm.close(session_id)
+ open_session = requests.get(url=f"{api_url}/{api_device}/open", headers=x_headers)
-elif selection == 3:
- print("")
- print("Generic with headers from DRMHeaders.py")
- ipssh = input("PSSH: ")
- pssh = PSSH(ipssh)
- device = Device.load(MyWVD)
- cdm = Cdm.from_device(device)
- session_id = cdm.open()
- challenge = cdm.get_license_challenge(session_id, pssh)
- ilicurl = input("License URL: ")
- country_code = input("Proxy? (2 letter country code or N for no): ")
- if len(country_code) == 2 and country_code.upper() in allowed_countries:
- proxy = init_proxy({"zone": country_code, "port": "peer"})
- proxies = {
- "http": proxy
- }
- print(f"Using proxy {proxies['http']}")
- licence = requests.post(ilicurl, data=challenge, headers=DRMHeaders.headers, proxies=proxies)
- else:
- print("Proxy-less request.")
- licence = requests.post(ilicurl, data=challenge, headers=DRMHeaders.headers)
- licence.raise_for_status()
- cdm.parse_license(session_id, licence.content)
- fkeys = ""
- for key in cdm.get_keys(session_id):
- if key.type != 'SIGNING':
- fkeys += key.kid.hex + ":" + key.key.hex() + "\n"
- cache_key(ipssh, fkeys)
- print("")
- print(fkeys)
- cdm.close(session_id)
+ session_id = open_session.json()["data"]["session_id"]
-elif selection == 4:
- print("")
- print("JSON Widevine challenge, headers from DRMHeaders.py")
- ipssh = input("PSSH: ")
- pssh = PSSH(ipssh)
- device = Device.load(MyWVD)
- cdm = Cdm.from_device(device)
- session_id = cdm.open()
- challenge = cdm.get_license_challenge(session_id, pssh)
- request = b64encode(challenge)
- ilicurl = input("License URL: ")
- pid = input("releasePid: ")
- country_code = input("Proxy? (2 letter country code or N for no): ")
- if len(country_code) == 2 and country_code.upper() in allowed_countries:
- proxy = init_proxy({"zone": country_code, "port": "peer"})
- proxies = {
- "http": proxy
- }
- print(f"Using proxy {proxies['http']}")
- licence = requests.post(ilicurl, headers=DRMHeaders.headers, proxies=proxies, json={
- "getRawWidevineLicense":
- {
- 'releasePid': pid,
- 'widevineChallenge': str(request, "utf-8")
- }
- })
- else:
- print("Proxy-less request.")
- licence = requests.post(ilicurl, headers=DRMHeaders.headers, json={
- "getRawWidevineLicense":
- {
- 'releasePid': pid,
- 'widevineChallenge': str(request, "utf-8")
- }
- })
- licence.raise_for_status()
- cdm.parse_license(session_id, licence.content)
- fkeys = ""
- for key in cdm.get_keys(session_id):
- if key.type != 'SIGNING':
- fkeys += key.kid.hex + ":" + key.key.hex() + "\n"
- cache_key(ipssh, fkeys)
- print("")
- print(fkeys)
- cdm.close(session_id)
+ license_challenge_json_data = {
+ "session_id": session_id,
+ "init_data": pssh
+ }
-elif selection == 5:
- print("")
- print("Canal+ Live TV")
- ipssh = input("PSSH: ")
- channel = input("Channel: ")
- live_token = input("Live Token: ")
- pssh = PSSH(ipssh)
- device = Device.load(MyWVD)
- cdm = Cdm.from_device(device)
- session_id = cdm.open()
- challenge = cdm.get_license_challenge(session_id, pssh)
- request = b64encode(challenge)
- ilicurl = "https://secure-browser.canalplus-bo.net/WebPortal/ottlivetv/api/V4/zones/cpfra/devices/31/apps/1/jobs/GetLicence"
- country_code = input("Proxy? (2 letter country code or N for no): ")
- if len(country_code) == 2 and country_code.upper() in allowed_countries:
- proxy = init_proxy({"zone": country_code, "port": "peer"})
- proxies = {
- "http": proxy
- }
- print(f"Using proxy {proxies['http']}")
- licence = requests.post(ilicurl, headers=DRMHeaders.headers, proxies=proxies, json={
- 'ServiceRequest': {
- 'InData': {
- 'EpgId': channel,
- 'LiveToken': live_token,
- 'UserKeyId': '_sdivii9vz',
- 'DeviceKeyId': '1676391356366-a3a5a7d663de',
- 'ChallengeInfo': f'{base64.b64encode(challenge).decode()}',
- 'Mode': 'MKPL',
- },
- },
- })
- else:
- print("Proxy-less request.")
- licence = requests.post(ilicurl, headers=DRMHeaders.headers, json={
- 'ServiceRequest': {
- 'InData': {
- 'EpgId': channel,
- 'LiveToken': live_token,
- 'UserKeyId': '_jprs988fy',
- 'DeviceKeyId': '1678334845207-61e4e804264c',
- 'ChallengeInfo': f'{base64.b64encode(challenge).decode()}',
- 'Mode': 'MKPL',
- },
- },
- })
- licence.raise_for_status()
- cdm.parse_license(session_id, licence.json()["ServiceResponse"]["OutData"]["LicenseInfo"])
- fkeys = ""
- for key in cdm.get_keys(session_id):
- if key.type != 'SIGNING':
- fkeys += key.kid.hex + ":" + key.key.hex() + "\n"
- cache_key(ipssh, fkeys)
- print("")
- print(fkeys)
- cdm.close(session_id)
+ licence_challenge = requests.post(url=f"{api_url}/{api_device}/get_license_challenge/AUTOMATIC", headers=x_headers,
+ json=license_challenge_json_data)
-elif selection == 6:
- print("")
- print("YouTube")
- ipssh = "AAAAQXBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAACEiGVlUX01FRElBOjZlMzI4ZWQxYjQ5YmYyMWZI49yVmwY="
- ilicurl = input("License URL: ")
- pssh = PSSH(ipssh)
- device = Device.load(MyWVD)
- cdm = Cdm.from_device(device)
- session_id = cdm.open()
- challenge = cdm.get_license_challenge(session_id, pssh)
- country_code = input("Proxy? (2 letter country code or N for no): ")
- json_data = DRMHeaders.json_data
- json_data["licenseRequest"] = base64.b64encode(challenge).decode("utf-8")
- if len(country_code) == 2 and country_code.upper() in allowed_countries:
- proxy = init_proxy({"zone": country_code, "port": "peer"})
- proxies = {
- "http": proxy
- }
- print(f"Using proxy {proxies['http']}")
- licence = requests.post(ilicurl, cookies=DRMHeaders.cookies, headers=DRMHeaders.headers, proxies=proxies, json=json_data)
- else:
- print("Proxy-less request.")
- licence = requests.post(ilicurl, cookies=DRMHeaders.cookies, headers=DRMHeaders.headers, json=json_data)
+ license_message = licence_challenge.json()["data"]["challenge_b64"]
- licence.raise_for_status()
- cdm.parse_license(session_id, licence.json()["license"].replace("-", "+").replace("_", "/"))
- fkeys = ""
- for key in cdm.get_keys(session_id):
- if key.type != 'SIGNING':
- fkeys += key.kid.hex + ":" + key.key.hex() + "\n"
- cache_key(ipssh, fkeys)
- print("")
- print(fkeys)
- cdm.close(session_id)
+ license = requests.post(
+ headers=License_cURL.headers,
+ proxies=proxies,
+ url=license_url,
+ data=base64.b64decode(license_message)
+ )
-elif selection == 7:
+ parse_license_json_data = {
+ "session_id": session_id,
+ "license_message": f"{base64.b64encode(license.content).decode()}"
+ }
- url = input("\nEnter the URL: ")
+ requests.post(f"{api_url}/{api_device}/parse_license", json=parse_license_json_data,
+ headers=x_headers)
- headers = {
+ get_keys = requests.post(f"{api_url}/{api_device}/get_keys/ALL",
+ json={"session_id": session_id}, headers=x_headers)
+ db_keys = ''
+ for key in get_keys.json()["data"]["keys"]:
+ if not key["type"] == "SIGNING":
+ db_keys += f"{key['key_id']}:{key['key']}\n"
+ key_cache(pssh=pssh, db_keys=db_keys)
+
+ requests.get(f"{api_url}/{api_device}/close/{session_id}", headers=x_headers)
+
+ return db_keys
+
+
+# Define retrieve keys remotely VDOCipher function
+def retrieve_keys_remotely_vdocipher(proxy_used: str = None):
+
+ # Get URL from function
+ url = input(f"Video URL: ")
+
+ # Set the VDOCipher token headers
+ token_headers = {
'accept': '*/*',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
## Comment this line out if using for anything other than https://www.vdocipher.com/blog/2014/12/add-text-to-videos-with-watermark/
'Origin': f"https://{urandom(8).hex()}.com",
}
- response = requests.get(url, cookies=DRMHeaders.cookies, headers=headers)
-
+ # Set the token response
+ token_response = requests.get(url, cookies=License_cURL.cookies, headers=token_headers)
try:
- otp_match = re.findall(r"otp: '(.*)',", response.text)[0]
- playbackinfo_match = re.findall(r"playbackInfo: '(.*)',", response.text)[0]
+ otp_match = re.findall(r"otp: '(.*)',", token_response.text)[0]
+ playbackinfo_match = re.findall(r"playbackInfo: '(.*)',", token_response.text)[0]
except IndexError:
try:
- otp_match = re.findall(r"otp=(.*)&", response.text)[0]
- playbackinfo_match = re.findall(r"playbackInfo=(.*)", response.text)[0]
+ otp_match = re.findall(r"otp=(.*)&", token_response.text)[0]
+ playbackinfo_match = re.findall(r"playbackInfo=(.*)", token_response.text)[0]
except IndexError:
print("\nAn error occured while getting otp/playback")
exit()
+ # Set the video ID
video_id = json.loads(base64.b64decode(playbackinfo_match).decode())["videoId"]
- response = requests.get(f'https://dev.vdocipher.com/api/meta/{video_id}', headers=headers)
-
+ # Set new token response (1)
+ token_response = requests.get(f'https://dev.vdocipher.com/api/meta/{video_id}', headers=token_headers)
try:
- lic_url = response.json()["dash"]["licenseServers"]["com.widevine.alpha"].rsplit(":", 1)[0]
- mpd = response.json()["dash"]["manifest"]
+ license_url = token_response.json()["dash"]["licenseServers"]["com.widevine.alpha"].rsplit(":", 1)[0]
+ mpd = token_response.json()["dash"]["manifest"]
except KeyError:
print("\n An error occured while getting mpd/license url")
- response = requests.get(mpd, headers=headers)
+ # Set new token response (2)
+ token_response = requests.get(mpd, headers=token_headers)
- pssh = re.search(r"(.*)", response.text).group(1)
- device = Device.load(MyWVD)
- cdm = Cdm.from_device(device)
- session_id = cdm.open()
- challenge = cdm.get_license_challenge(session_id, PSSH(pssh))
+ # Set API URL
+ api_url = "https://api.cdrm-project.com"
+ # Set API Device
+ api_device = "CDM"
+
+ # Retrieve PSSH
+ pssh = re.search(r"(.*)", token_response.text).group(1)
+
+ # Check if proxy was used
+ if proxy_used is not None:
+ proxy = init_proxy({"zone": proxy_used, "port": "peer"})
+ proxies = {
+ "http": proxy
+ }
+ else:
+ proxies = None
+
+ # Set API headers
+ x_headers = {
+ "X-Secret-Key": api_key
+ }
+
+ # Open API session
+ open_session = requests.get(url=f"{api_url}/{api_device}/open", headers=x_headers)
+
+ # Set the session ID
+ session_id = open_session.json()["data"]["session_id"]
+
+ # Send json data to get license challenge
+ license_challenge_json_data = {
+ "session_id": session_id,
+ "init_data": pssh
+ }
+
+ # Get the license challenge from PSSH
+ licence_challenge = requests.post(url=f"{api_url}/{api_device}/get_license_challenge/AUTOMATIC", headers=x_headers,
+ json=license_challenge_json_data)
+
+ # Set the final token
token = {
"otp":otp_match,
"playbackInfo":playbackinfo_match,
"href":url,
"tech":"wv",
- "licenseRequest":f"{base64.b64encode(challenge).decode()}"
+ "licenseRequest":licence_challenge.json()["data"]["challenge_b64"]
}
- json_data = {
- 'token': f'{base64.b64encode(json.dumps(token).encode("utf-8")).decode()}',
+ # Send challenge
+ license = requests.post(
+ proxies=proxies,
+ url=license_url,
+ json={'token': f'{base64.b64encode(json.dumps(token).encode("utf-8")).decode()}'}
+ )
+
+ # Set the parsing JSON data
+ parse_license_json_data = {
+ "session_id": session_id,
+ "license_message": license.json()["license"]
}
- response = requests.post(lic_url, headers=headers, json=json_data)
+ # Send the parsing JSON data
+ requests.post(f"{api_url}/{api_device}/parse_license", json=parse_license_json_data,
+ headers=x_headers)
- try:
- lic_b64 = response.json()["license"]
- except KeyError:
- print(f'An error occured while getting license: {response.json()["message"]}')
- exit()
+ # Get the keys
+ get_keys = requests.post(f"{api_url}/{api_device}/get_keys/ALL",
+ json={"session_id": session_id}, headers=x_headers)
- cdm.parse_license(session_id, lic_b64)
- fkeys = ""
+ # Cache the keys
+ db_keys = ''
+ for key in get_keys.json()["data"]["keys"]:
+ if not key["type"] == "SIGNING":
+ db_keys += f"{key['key_id']}:{key['key']}\n"
+ key_cache(pssh=pssh, db_keys=db_keys)
- print(f"\nMPD Link:")
- print(f"{mpd}\n")
+ # Close the session
+ requests.get(f"{api_url}/{api_device}/close/{session_id}", headers=x_headers)
- for key in cdm.get_keys(session_id):
- if key.type != 'SIGNING':
- fkeys += key.kid.hex + ":" + key.key.hex() + "\n"
- cache_key(pssh, fkeys)
- print("")
- print(fkeys)
- cdm.close(session_id)
+ # Return the keys
+ return db_keys
-elif selection == 8:
- print("")
- print("SkyShowTime")
-
- token_url = 'https://ovp.skyshowtime.com/auth/tokens'
- vod_url= 'https://ovp.skyshowtime.com/video/playouts/vod'
-
- cookies = DRMHeaders.cookies
- region = cookies['activeTerritory']
-
- #Getting tokens
- headers = {
- 'accept': 'application/vnd.tokens.v1+json',
- 'content-type': 'application/vnd.tokens.v1+json',
- }
-
- post_data = {
- "auth": {
- "authScheme": 'MESSO',
- "authIssuer": 'NOWTV',
- "provider": 'SKYSHOWTIME',
- "providerTerritory": region,
- "proposition": 'SKYSHOWTIME',
- },
- "device": {
- "type": 'MOBILE',
- "platform": 'ANDROID',
- "id": 'Z-sKxKApSe7c3dAMGAYtVU8NmWKDcWrCKobKpnVTLqc', #Value seems to be irrelavant
- "drmDeviceId": 'UNKNOWN'
- }
- }
- post_data = json.dumps(post_data)
- headers['x-sky-signature'] = calculate_signature('POST', token_url, headers, post_data)
- userToken = json.loads(requests.post(token_url, cookies=cookies, headers=headers, data=post_data).content)['userToken']
- del headers
- del post_data
-
-
- #Getting license and manifest url
- video_url = input("Enter SkyShowTime video url: ")
- content_id = video_url.split("/")[6]
- provider_variant_id = video_url.split("/")[7][:36]
-
- post_data = {
- "providerVariantId": provider_variant_id,
- "device": {
- "capabilities": [
- {
- "transport": "DASH",
- "protection": "NONE",
- "vcodec": "H265",
- "acodec": "AAC",
- "container": "ISOBMFF"
- },
- {
- "transport": "DASH",
- "protection": "WIDEVINE",
- "vcodec": "H265",
- "acodec": "AAC",
- "container": "ISOBMFF"
- },
- {
- "transport": "DASH",
- "protection": "NONE",
- "vcodec": "H264",
- "acodec": "AAC",
- "container": "ISOBMFF"
- },
- {
- "transport": "DASH",
- "protection": "WIDEVINE",
- "vcodec": "H264",
- "acodec": "AAC",
- "container": "ISOBMFF"
- }
- ],
- "model": "SM-N986B",
- "maxVideoFormat": "HD",
- "hdcpEnabled": 'false',
- "supportedColourSpaces": [
- "SDR"
+# Defining service prompt function
+
+
+def service_prompt():
+ service_prompt = [
+ inquirer.List('Service',
+ message="Please choose a service",
+ choices=['Generic', 'Generic with headers from Licence cURL', 'Remote', 'Remote VDOCipher'],
+ ),
+ ]
+ service_selected = inquirer.prompt(service_prompt)
+
+ proxy_needed_prompt = [
+ inquirer.List('Proxy',
+ message="Will you need a proxy?",
+ choices=['Yes', 'No'],
+ ),
+ ]
+
+ proxy_needed = inquirer.prompt(proxy_needed_prompt)
+ if proxy_needed["Proxy"] == "Yes":
+ allowed_countries = [
+ "AR", "AT", "AU", "BE", "BG", "BR", "CA", "CH", "CL", "CO", "CZ", "DE", "DK", "ES", "FI",
+ "FR", "GR", "HK", "HR", "HU", "ID", "IE", "IL", "IN", "IS", "IT", "JP", "KR", "MX", "NL",
+ "NO", "NZ", "PL", "RO", "RU", "SE", "SG", "SK", "TR", "UK", "US", "GB"
]
- },
- "client": {
- "thirdParties": [
- "COMSCORE",
- "CONVIVA",
- "FREEWHEEL"
+ proxy_available = [
+ inquirer.List('Proxys available',
+ message="Please choose a country",
+ choices=allowed_countries
+ ),
]
- },
- "personaParentalControlRating": 9
- }
- post_data = json.dumps(post_data)
-
- headers = {
- 'accept': 'application/vnd.playvod.v1+json',
- 'content-type': 'application/vnd.playvod.v1+json',
- 'x-skyott-activeterritory': region,
- 'x-skyott-agent': 'skyshowtime.mobile.android',
- 'x-skyott-country': region,
- 'x-skyott-device': 'MOBILE',
- 'x-skyott-platform': 'ANDROID',
- 'x-skyott-proposition': 'SKYSHOWTIME',
- 'x-skyott-provider': 'SKYSHOWTIME',
- 'x-skyott-territory': region,
- 'x-skyott-usertoken': userToken,
- }
- headers['x-sky-signature'] = calculate_signature('POST', vod_url, headers, post_data)
+ selected_proxy = inquirer.prompt(proxy_available)
+ return service_selected["Service"], selected_proxy["Proxys available"]
+ else:
+ selected_proxy = None
+ return service_selected["Service"], selected_proxy
- vod_request = json.loads(requests.post(vod_url, headers=headers, data=post_data).content)
-
- license_url = vod_request['protection']['licenceAcquisitionUrl']
- manifest_url = vod_request['asset']['endpoints'][0]['url']
-
- #Getting pssh
-
- manifest = requests.get(manifest_url).content
- pssh = PSSH(BeautifulSoup(manifest, 'html.parser').findAll('cenc:pssh')[1].text)
-
-
- #CDM processing
-
- device = Device.load(MyWVD)
- cdm = Cdm.from_device(device)
- session_id = cdm.open()
- challenge = cdm.get_license_challenge(session_id, pssh)
-
- headers = {
- 'Content-Type':'application/octet-stream',
- }
-
- licence = requests.post(license_url, headers=headers, data=challenge)
- licence.raise_for_status()
- cdm.parse_license(session_id, licence.content)
-
- fkeys = ""
- print(f"\nMPD Link:")
- print(f"{manifest_url}\n")
+# Define variables for the service and proxy wanted
+
+
+service_selected, selected_proxy = service_prompt()
+
+
+if service_selected == "Generic":
+ print(f"\n{retrieve_keys(proxy_used=selected_proxy)}")
+elif service_selected == "Generic with headers from Licence cURL":
+ print(f"\n{retrieve_keys(proxy_used=selected_proxy, headers=License_cURL.headers)}")
+elif service_selected == "Remote":
+ print(f"\n{retrieve_keys_remotely(proxy_used=selected_proxy)}")
+elif service_selected == "Remote VDOCipher":
+ print(f"\n{retrieve_keys_remotely_vdocipher(proxy_used=selected_proxy)}")
- for key in cdm.get_keys(session_id):
- if key.type != 'SIGNING':
- fkeys += key.kid.hex + ":" + key.key.hex()
- cache_key(str(pssh), fkeys)
- print(fkeys)
- cdm.close(session_id)
-
-else:
- print("Invalid selection")
\ No newline at end of file