Compare commits

...

5 Commits
v1.22 ... main

Author SHA1 Message Date
TPD94
221a2b74be v1.32
- Removed service certificate from generic

- Added some JSON response handlers for generic script

- Fixed Udemy in GUI mode
2024-02-12 16:20:18 -05:00
TPD94
6b52e6ecf0 v1.31
Added Udemy support, added PSSH parse for CLI
2024-01-22 20:47:58 -05:00
TPD94
92f8f24429 Fixed GUI bug 2024-01-08 23:18:50 -05:00
TPD94
36a98db32e Update version 2024-01-08 23:07:12 -05:00
TPD94
c9b4298078 v1.3
- Improved error handling (hopefully no more GUI crashes!)
- Added RTE and AstroGo service scripts
- Added RTE Decrypt/Web-DL/GUI functions
- Added Taskbar / WM Icon
- Changed theme
2024-01-08 23:04:29 -05:00
15 changed files with 1129 additions and 122 deletions

View File

@ -1,6 +1,5 @@
# Import dependencies # Import dependencies
import os import os
from sys import exit
# Define api key check # Define api key check

View File

@ -6,11 +6,10 @@ import Helpers.binary_check
import Sites.Generic import Sites.Generic
import license_curl import license_curl
import Helpers.os_check import Helpers.os_check
from sys import exit
# Web Download function generic # Web Download function generic
def web_dl_generic(mpd: str = None, device: str = None, api_key: str = None, remote: bool = False): def web_dl_generic(mpd: str = None, device: str = None, api_key: str = None, remote: bool = False, site: str = None):
# Get the current operating system # Get the current operating system
operating_system = Helpers.os_check.get_os_specific() operating_system = Helpers.os_check.get_os_specific()
@ -26,9 +25,19 @@ def web_dl_generic(mpd: str = None, device: str = None, api_key: str = None, rem
# Retrieve the keys # Retrieve the keys
if not remote: if not remote:
mp4decrypt_keys, _ = Sites.Generic.decrypt_generic(mpd_url=mpd, wvd=device, license_curl_headers=license_curl.headers) if not site:
mp4decrypt_keys, _ = Sites.Generic.decrypt_generic(mpd_url=mpd, wvd=device, license_curl_headers=license_curl.headers)
if site == 'rte':
mp4decrypt_keys, _ = Sites.RTE.decrypt_rte(mpd_url=mpd, wvd=device, license_curl_headers=license_curl.headers)
if site == 'udemy':
mp4decrypt_keys, _ = Sites.Udemy.decrypt_udemy(mpd_url=mpd, wvd=device, license_curl_headers=license_curl.headers, license_curl_cookies=license_curl.cookies)
if remote: if remote:
mp4decrypt_keys, _ = Sites.Generic.decrypt_generic_remotely(api_key=api_key, license_curl_headers=license_curl.headers, mpd_url=mpd) if not site:
mp4decrypt_keys, _ = Sites.Generic.decrypt_generic_remotely(api_key=api_key, license_curl_headers=license_curl.headers, mpd_url=mpd)
if site == 'rte':
mp4decrypt_keys, _ = Sites.RTE.decrypt_rte_remotely(mpd_url=mpd, api_key=api_key, license_curl_headers=license_curl.headers)
if site == 'udemy':
mp4decrypt_keys, _ = Sites.Udemy.decrypt_udemy_remotely(mpd_url=mpd, api_key=api_key, license_curl_headers=license_curl.headers, license_curl_cookies=license_curl.cookies)
# Define n_m3u8dl-re download parameters # Define n_m3u8dl-re download parameters
n_m3u8dl_re_download = [ n_m3u8dl_re_download = [

View File

@ -1,4 +1,6 @@
import PySimpleGUI as sg import PySimpleGUI as sg
import assets.images
import Sites import Sites
import ast import ast
import webbrowser import webbrowser
@ -15,54 +17,55 @@ def clean_dict(dict: str = None):
def start_gui(wvd: str = None, api_key: str = None): def start_gui(wvd: str = None, api_key: str = None):
sg.theme('Dark Black') # Add theme
sg.theme('Dark Amber') # Add theme
# the layout # the layout
left_frame_normal = sg.Col([ left_frame_normal = sg.Col([
[sg.Text('PSSH:'), sg.Text(size=(15, 1), key='-PSSH_TEXT-', expand_x=True, expand_y=True)], [sg.Text('PSSH:'), sg.Text(key='-PSSH_TEXT-')],
[sg.Input(key="-PSSH-")], [sg.Input(key="-PSSH-")],
[sg.VPush()], [sg.VPush()],
[sg.Text(text='License URL:'), sg.Text(size=(15, 1), key='-LIC_URL_TEXT-', expand_x=True, expand_y=True)], [sg.Text(text='License URL:'), sg.Text(key='-LIC_URL_TEXT-')],
[sg.Input(key='-LIC_URL-')], [sg.Input(key='-LIC_URL-')],
[sg.VPush()], [sg.VPush()],
[sg.Text('Keys:')], [sg.Text('Keys:')],
[sg.Output(size=(45, 6), key='-OUTPUT-', expand_y=True, expand_x=True)], [sg.Output(key='-OUTPUT-', expand_y=True, expand_x=True)],
[sg.Button('Decrypt'), sg.Button('Reset')] [sg.Button('Decrypt'), sg.Button('Reset')]
], expand_x=True, expand_y=True) ], expand_x=True, expand_y=True)
right_frame = [ right_frame = [
[sg.Text('headers =')], [sg.Text('headers =', key='-HEADERS_TEXT-')],
[sg.Multiline(key='-HEADERS-', size=(50, 10), expand_x=True, expand_y=True)], [sg.Multiline(key='-HEADERS-', expand_x=True, expand_y=True, tooltip=f"Paste headers dictionary starting with the first curly brace '{{' and ending with the last curly brace '}}'")],
[sg.Text('json =', key='-JSON_TEXT-', visible=False)], [sg.Text('json =', key='-JSON_TEXT-', visible=False)],
[sg.Multiline(key='-JSON-', size=(50, 10), visible=False, expand_x=True, expand_y=True)], [sg.Multiline(key='-JSON-', visible=False, expand_x=True, expand_y=True, tooltip=f"Paste json dictionary starting with the first curly brace '{{' and ending with the last curly brace '}}'")],
[sg.Text('cookies =', key='-COOKIES_TEXT-', visible=False)], [sg.Text('cookies =', key='-COOKIES_TEXT-', visible=False)],
[sg.Multiline(key='-COOKIES-', size=(50, 10), visible=False, expand_x=True, expand_y=True)], [sg.Multiline(key='-COOKIES-', visible=False, expand_x=True, expand_y=True, tooltip=f"Paste cookies dictionary starting with the first curly brace '{{' and ending with the last curly brace '}}'")],
[sg.Combo(values=['Generic', 'Crunchyroll', 'YouTube'], default_value='Generic', key='-OPTIONS-', [sg.Text('releasePid:', key='-PID_TEXT-', visible=False)],
[sg.Input(key='-PID-', visible=False, expand_x=True, expand_y=False)],
[sg.Combo(values=['Generic', 'Crunchyroll', 'YouTube', 'RTE', 'Udemy'], default_value='Generic', key='-OPTIONS-',
enable_events=True), sg.Push(), sg.Checkbox(text="Use CDM-Project API", key='-USE_API-')] enable_events=True), sg.Push(), sg.Checkbox(text="Use CDM-Project API", key='-USE_API-')]
] ]
if wvd is None: if wvd is None and api_key is not None:
right_frame[6] = [sg.Combo(values=['Generic', 'Crunchyroll', 'YouTube'], default_value='Generic', key='-OPTIONS-', right_frame[8] = [sg.Combo(values=['Generic', 'Crunchyroll', 'YouTube', 'RTE', 'Udemy'], default_value='Generic', key='-OPTIONS-',
enable_events=True), sg.Push(), sg.Checkbox(text="Use CDM-Project API", key='-USE_API-', default=True, disabled=True)] enable_events=True), sg.Push(), sg.Checkbox(text="Use CDM-Project API", key='-USE_API-', default=True, disabled=True)]
if api_key is None: if api_key is None and wvd is not None:
right_frame[6] = [sg.Combo(values=['Generic', 'Crunchyroll', 'YouTube'], default_value='Generic', key='-OPTIONS-', right_frame[8] = [sg.Combo(values=['Generic', 'Crunchyroll', 'YouTube', 'RTE', 'Udemy'], default_value='Generic', key='-OPTIONS-',
enable_events=True), sg.Push(), sg.Checkbox(text="Use CDM-Project API", key='-USE_API-', default=False, disabled=True)] enable_events=True), sg.Push(), sg.Checkbox(text="Use CDM-Project API", key='-USE_API-', default=False, disabled=True)]
right_frame_normal = sg.Col(right_frame, expand_x=True, expand_y=True) right_frame_normal = sg.Col(right_frame, expand_x=True, expand_y=True)
window_layout = [ window_layout = [
[sg.MenubarCustom([['About', ['Discord', 'CDM-Project', 'CDRM-Project', 'Source Code', 'Version']]], k='-MENUBAR-', p=0, )], [sg.MenubarCustom([['About', ['Discord', 'CDM-Project', 'CDRM-Project', 'Source Code', 'Version']]], k='-MENUBAR-')],
[left_frame_normal, right_frame_normal] [left_frame_normal, right_frame_normal]
] ]
# the window # the window
window = sg.Window('TPD-Keys', layout=window_layout, resizable=True, size=(800, 800)) window = sg.Window('TPD-Keys', layout=window_layout, resizable=True, size=(800, 800), icon=assets.images.taskbar)
# the event loop # the event loop
while True: while True:
if wvd is None and api_key is None: if wvd is None and api_key is None:
sg.popup(title="TPD-Keys", custom_text="No CDM or API key found!") sg.popup('Error!', 'No CDM or API Key found', custom_text="Close", icon=assets.images.taskbar)
break break
event, values = window.read() event, values = window.read()
@ -148,11 +151,11 @@ def start_gui(wvd: str = None, api_key: str = None):
except Exception as error: except Exception as error:
window['-OUTPUT-'].update(f"{error}") window['-OUTPUT-'].update(f"{error}")
# Error for no Headers - Generic # Error for no Headers - Crunchyroll
if values['-PSSH-'] != '' and values['-OPTIONS-'] == 'Crunchyroll' and values['-HEADERS-'] == '': if values['-PSSH-'] != '' and values['-OPTIONS-'] == 'Crunchyroll' and values['-HEADERS-'] == '':
window['-OUTPUT-'].update(f"No Headers provided") window['-OUTPUT-'].update(f"No Headers provided")
# Error for no PSSH # Error for no PSSH - Crunchyroll
if values['-PSSH-'] == '' and values['-OPTIONS-'] == 'Crunchyroll': if values['-PSSH-'] == '' and values['-OPTIONS-'] == 'Crunchyroll':
window['-OUTPUT-'].update(f"No PSSH provided") window['-OUTPUT-'].update(f"No PSSH provided")
@ -180,88 +183,179 @@ def start_gui(wvd: str = None, api_key: str = None):
except Exception as error: except Exception as error:
window['-OUTPUT-'].update(f"{error}") window['-OUTPUT-'].update(f"{error}")
# Error for no Headers - Crunchyroll # Error for no Headers - YouTube
if values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'YouTube' and values['-HEADERS-'] == '' and values['-JSON-'] != '' and values['-COOKIES-'] != '': if values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'YouTube' and values['-HEADERS-'] == '' and values['-JSON-'] != '' and values['-COOKIES-'] != '':
window['-OUTPUT-'].update(f"No Headers provided") window['-OUTPUT-'].update(f"No Headers provided")
# Error for no Headers or JSON - Crunchyroll # Error for no JSON - YouTube
if values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'YouTube' and values['-HEADERS-'] == '' and values['-JSON-'] == '' and values['-COOKIES-'] != '':
window['-OUTPUT-'].update(f"No Headers or JSON provided")
# Error for no Headers or Cookies - Crunchyroll
if values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'YouTube' and values['-HEADERS-'] == '' and values['-JSON-'] != '' and values['-COOKIES-'] == '':
window['-OUTPUT-'].update(f"No Headers or Cookies provided")
# Error for no JSON - Crunchyroll
if values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'YouTube' and values['-JSON-'] == '' and values['-HEADERS-'] != '' and values['-COOKIES-'] != '': if values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'YouTube' and values['-JSON-'] == '' and values['-HEADERS-'] != '' and values['-COOKIES-'] != '':
window['-OUTPUT-'].update(f"No JSON provided") window['-OUTPUT-'].update(f"No JSON provided")
# Error for no JSON or Headers - Crunchyroll # Error for no Cookies - YouTube
if values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'YouTube' and values['-JSON-'] == '' and values['-HEADERS-'] == '' and values['-COOKIES-'] != '':
window['-OUTPUT-'].update(f"No JSON or Headers provided")
# Error for no JSON or Cookies - Crunchyroll
if values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'YouTube' and values['-JSON-'] == '' and values['-HEADERS-'] != '' and values['-COOKIES-'] == '':
window['-OUTPUT-'].update(f"No JSON or Cookies provided")
# Error for no Cookies - Crunchyroll
if values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'YouTube' and values['-COOKIES-'] == '' and values['-HEADERS-'] != '' and values['-JSON-'] != '': if values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'YouTube' and values['-COOKIES-'] == '' and values['-HEADERS-'] != '' and values['-JSON-'] != '':
window['-OUTPUT-'].update(f"No Cookies provided") window['-OUTPUT-'].update(f"No Cookies provided")
# Error for no Cookies or Headers - Crunchyroll # Error if Headers, Cookies and JSON are empty - YouTube
if values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'YouTube' and values['-COOKIES-'] == '' and values['-HEADERS-'] == '' and values['-JSON-'] != '':
window['-OUTPUT-'].update(f"No Cookies or Headers provided")
# Error for no Cookies or JSON - Crunchyroll
if values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'YouTube' and values['-COOKIES-'] == '' and values['-HEADERS-'] != '' and values['-JSON-'] == '':
window['-OUTPUT-'].update(f"No Cookies or JSON provided")
# Error if Headers, Cookies and JSON are empty - Crunchyroll
if values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'YouTube' and values['-HEADERS-'] == '' and values['-JSON-'] == '' and values['-COOKIES-'] == '': if values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'YouTube' and values['-HEADERS-'] == '' and values['-JSON-'] == '' and values['-COOKIES-'] == '':
window['-OUTPUT-'].update(f"No dictionaries provided") window['-OUTPUT-'].update(f"Not all dictionaries provided")
# Error if no license URL - Crunchyroll # Error if no license URL - YouTube
if values['-LIC_URL-'] == '' and values['-OPTIONS-'] == 'YouTube': if values['-LIC_URL-'] == '' and values['-OPTIONS-'] == 'YouTube':
window['-OUTPUT-'].update(f"No license URL provided") window['-OUTPUT-'].update(f"No license URL provided")
# Action for decrypt if RTE is selected
if values['-PSSH-'] != '' and values['-LIC_URL-'] != '' and values['-PID-'] != '' and values['-OPTIONS-'] == 'RTE':
if not values['-USE_API-']:
try:
_, key_out = Sites.RTE.decrypt_rte(wvd=wvd, in_pssh=values['-PSSH-'],
in_license_url=values['-LIC_URL-'], release_pid=values['-PID-'])
window['-OUTPUT-'].update(f"{key_out}")
except Exception as error:
window['-OUTPUT-'].update(f"{error}")
if values['-USE_API-']:
try:
_, key_out = Sites.RTE.decrypt_rte_remotely(api_key=api_key, in_pssh=values['-PSSH-'],
in_license_url=values['-LIC_URL-'], release_pid=values['-PID-'])
window['-OUTPUT-'].update(f"{key_out}")
except Exception as error:
window['-OUTPUT-'].update(f"{error}")
# Error if no values - RTE
if values['-PSSH-'] == '' and values['-LIC_URL-'] == '' and values['-PID-'] == '' and values['-OPTIONS-'] == 'RTE':
window['-OUTPUT-'].update(f"No fields provided!")
# Error if no PSSH - RTE
if values['-PSSH-'] == '' and values['-LIC_URL-'] != '' and values['-PID-'] != '' and values['-OPTIONS-'] == 'RTE':
window['-OUTPUT-'].update(f"No PSSH provided")
# Error if no License URL - RTE
if values['-PSSH-'] != '' and values['-LIC_URL-'] == '' and values['-PID-'] != '' and values['-OPTIONS-'] == 'RTE':
window['-OUTPUT-'].update(f"No license URL provided")
# Error if no PID - RTE
if values['-PSSH-'] != '' and values['-LIC_URL-'] != '' and values['-PID-'] == '' and values['-OPTIONS-'] == 'RTE':
window['-OUTPUT-'].update(f"No PID provided")
# Action for Decrypt for Udemy decrypt if fields are filled out
if values['-PSSH-'] != '' and values['-OPTIONS-'] == 'Udemy' and values['-LIC_URL-'] != '' and values['-HEADERS-'] != '' and values['-COOKIES-'] != '':
if not values['-USE_API-']:
try:
_, key_out = Sites.Udemy.decrypt_udemy(wvd=wvd, in_pssh=values['-PSSH-'], in_license_url=values['-LIC_URL-'],
license_curl_headers=ast.literal_eval(clean_dict(dict=values['-HEADERS-'])),
license_curl_cookies=ast.literal_eval(clean_dict(dict=values['-COOKIES-'])))
window['-OUTPUT-'].update(f"{key_out}")
except Exception as error:
window['-OUTPUT-'].update(f"{error}")
if values['-USE_API-']:
if api_key is None:
window['-OUTPUT-'].update(f"No API key")
if api_key is not None:
try:
_, key_out = Sites.Udemy.decrypt_udemy_remotely(api_key=api_key, in_pssh=values['-PSSH-'], in_license_url=values['-LIC_URL-'],
license_curl_headers=ast.literal_eval(clean_dict(dict=values['-HEADERS-'])),
license_curl_cookies=ast.literal_eval(clean_dict(dict=values['-COOKIES-'])))
window['-OUTPUT-'].update(f"{key_out}")
except Exception as error:
window['-OUTPUT-'].update(f"{error}")
# Error for no PSSH - Udemy
if values['-PSSH-'] == '' and values['-OPTIONS-'] == 'Udemy':
window['-OUTPUT-'].update(f"No PSSH provided")
# Error for no License URL - Udemy
if values['-LIC_URL-'] == '' and values['-OPTIONS-'] == 'Udemy':
window['-OUTPUT-'].update(f"No License URL provided")
# Error for no Headers - Udemy
if values['-PSSH-'] != '' and values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'Udemy' and values['-HEADERS-'] == '':
window['-OUTPUT-'].update(f"No Headers provided")
# Error for no Cookies - Udemy
if values['-PSSH-'] != '' and values['-LIC_URL-'] != '' and values['-OPTIONS-'] == 'Udemy' and values['-COOKIES-'] == '':
window['-OUTPUT-'].update(f"No Cookies provided")
# Actions for reset button # Actions for reset button
if event == 'Reset': if event == 'Reset':
window['-PSSH-'].update(value="", disabled=False) window['-PSSH-'].update(value="", disabled=False)
window['-LIC_URL-'].update(value="", disabled=False) window['-LIC_URL-'].update(value="", disabled=False)
window['-OUTPUT-'].update(value="", disabled=False) window['-OUTPUT-'].update(value="", disabled=False)
window['-HEADERS-'].update(value="", disabled=False) window['-HEADERS_TEXT-'].update(visible=True)
window['-HEADERS-'].update(value="", visible=True)
window['-OPTIONS-'].update(value="Generic", disabled=False) window['-OPTIONS-'].update(value="Generic", disabled=False)
window['-JSON-'].update(visible=False) window['-JSON-'].update(visible=False)
window['-JSON_TEXT-'].update(visible=False) window['-JSON_TEXT-'].update(visible=False)
window['-COOKIES-'].update(visible=False) window['-COOKIES-'].update(visible=False)
window['-COOKIES_TEXT-'].update(visible=False) window['-COOKIES_TEXT-'].update(visible=False)
window['-PID_TEXT-'].update(visible=False)
window['-PID-'].update(visible=False, value="")
# Actions for Crunchyroll selector # Actions for Crunchyroll selector
if event == '-OPTIONS-' and values['-OPTIONS-'] == 'Crunchyroll': if event == '-OPTIONS-' and values['-OPTIONS-'] == 'Crunchyroll':
window['-PSSH-'].update(value="", disabled=False) window['-PSSH-'].update(value="", disabled=False)
window['-LIC_URL-'].update(value="", disabled=True) window['-LIC_URL-'].update(value="", disabled=True)
window['-HEADERS_TEXT-'].update(visible=True)
window['-HEADERS-'].update(value="", visible=True)
window['-JSON-'].update(value="", visible=False) window['-JSON-'].update(value="", visible=False)
window['-JSON_TEXT-'].update(visible=False) window['-JSON_TEXT-'].update(visible=False)
window['-COOKIES-'].update(value="", visible=False) window['-COOKIES-'].update(value="", visible=False)
window['-COOKIES_TEXT-'].update(visible=False) window['-COOKIES_TEXT-'].update(visible=False)
window['-PID_TEXT-'].update(visible=False)
window['-PID-'].update(visible=False, value="")
# Actions for Generic selector # Actions for Generic selector
if event == '-OPTIONS-' and values['-OPTIONS-'] == 'Generic': if event == '-OPTIONS-' and values['-OPTIONS-'] == 'Generic':
window['-PSSH-'].update(value="", disabled=False) window['-PSSH-'].update(value="", disabled=False)
window['-LIC_URL-'].update(value="", disabled=False) window['-LIC_URL-'].update(value="", disabled=False)
window['-HEADERS_TEXT-'].update(visible=True)
window['-HEADERS-'].update(visible=True)
window['-JSON-'].update(value="", visible=False) window['-JSON-'].update(value="", visible=False)
window['-JSON_TEXT-'].update(visible=False) window['-JSON_TEXT-'].update(visible=False)
window['-COOKIES-'].update(value="", visible=False) window['-COOKIES-'].update(value="", visible=False)
window['-COOKIES_TEXT-'].update(visible=False) window['-COOKIES_TEXT-'].update(visible=False)
window['-PID_TEXT-'].update(visible=False)
window['-PID-'].update(visible=False, value="")
# Actions for RTE selector
if event == '-OPTIONS-' and values['-OPTIONS-'] == 'RTE':
window['-PSSH-'].update(value="", disabled=False)
window['-LIC_URL-'].update(value="", disabled=False)
window['-HEADERS_TEXT-'].update(visible=False)
window['-HEADERS-'].update(value="", visible=False)
window['-JSON-'].update(value="", visible=False)
window['-JSON_TEXT-'].update(visible=False)
window['-COOKIES-'].update(value="", visible=False)
window['-COOKIES_TEXT-'].update(visible=False)
window['-PID_TEXT-'].update(visible=True)
window['-PID-'].update(visible=True, value="")
# Actions for Udemy selector
if event == '-OPTIONS-' and values['-OPTIONS-'] == 'Udemy':
window['-PSSH-'].update(value="", disabled=False)
window['-LIC_URL-'].update(value="", disabled=False)
window['-HEADERS_TEXT-'].update(visible=True)
window['-HEADERS-'].update(value="", visible=True)
window['-JSON-'].update(value="", visible=False)
window['-JSON_TEXT-'].update(visible=False)
window['-COOKIES-'].update(value="", visible=True)
window['-COOKIES_TEXT-'].update(visible=True)
window['-PID_TEXT-'].update(visible=False)
window['-PID-'].update(visible=False, value="")
# Actions for YouTube selector # Actions for YouTube selector
if event == '-OPTIONS-' and values['-OPTIONS-'] == 'YouTube': if event == '-OPTIONS-' and values['-OPTIONS-'] == 'YouTube':
window['-PSSH-'].update(value="", disabled=True) window['-PSSH-'].update(value="", disabled=True)
window['-LIC_URL-'].update(value="", disabled=False) window['-LIC_URL-'].update(value="", disabled=False)
window['-HEADERS_TEXT-'].update(visible=True)
window['-HEADERS-'].update(value="", visible=True)
window['-JSON-'].update(visible=True) window['-JSON-'].update(visible=True)
window['-JSON_TEXT-'].update(visible=True) window['-JSON_TEXT-'].update(visible=True)
window['-COOKIES-'].update(visible=True) window['-COOKIES-'].update(visible=True)
window['-COOKIES_TEXT-'].update(visible=True) window['-COOKIES_TEXT-'].update(visible=True)
window['-PID_TEXT-'].update(visible=False)
window['-PID-'].update(visible=False, value="")
# Actions for MenuBar # Actions for MenuBar
if event == 'Discord': if event == 'Discord':
@ -273,7 +367,7 @@ def start_gui(wvd: str = None, api_key: str = None):
if event == 'Source Code': if event == 'Source Code':
webbrowser.open(url='https://cdm-project.com/Decryption-Tools/TPD-Keys') webbrowser.open(url='https://cdm-project.com/Decryption-Tools/TPD-Keys')
if event == 'Version': if event == 'Version':
sg.popup('Version 1.22', custom_text='Close', grab_anywhere=True) sg.popup('Version 1.32', custom_text='Close', grab_anywhere=True)
# 4 - the close # 4 - the close
window.close() window.close()

View File

@ -4,10 +4,10 @@ from sys import exit
# Define MPD / m3u8 PSSH parser # Define MPD / m3u8 PSSH parser
def parse_pssh(manifest_url): def parse_pssh(manifest_url, license_headers: dict = None):
manifest = manifest_url manifest = manifest_url
try: try:
response = requests.get(manifest) response = requests.get(manifest, headers=license_headers)
except: except:
pssh = input("Couldn't retrieve manifest, please input PSSH: ") pssh = input("Couldn't retrieve manifest, please input PSSH: ")
return pssh return pssh

View File

@ -1,5 +1,4 @@
import os import os
from sys import exit
def get_os_specific(): def get_os_specific():

View File

@ -1,7 +1,6 @@
# Import dependencies # Import dependencies
import os import os
import glob import glob
from sys import exit
# Define WVD device check # Define WVD device check

255
Sites/AstroGo.py Normal file
View File

@ -0,0 +1,255 @@
# Import dependencies
from pywidevine import PSSH
from pywidevine import Cdm
from pywidevine import Device
import requests
import base64
import os
import Helpers
from sys import exit
# Defining decrypt function for generic services
def decrypt_astrogo(wvd: str = None, license_curl_headers: dict = None, license_curl_json: dict = None, mpd_url: str = None,
in_pssh: str = None, in_license_url: str = None):
# Exit if no device
if wvd is None:
exit(f"No CDM! to use local decryption place a .wvd in {os.getcwd()}/WVDs")
# Try getting pssh via MPD URL if web-dl
if mpd_url is not None:
input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url)
if input_pssh is not None:
print(f'\nPSSH found: {input_pssh}')
else:
input_pssh = input(f"\nPSSH not found! Input PSSH: ")
# Ask for PSSH if just keys function
if mpd_url is None and in_pssh is None:
# Ask for PSSH if web-dl not selected:
input_pssh = input(f"\nPSSH: ")
# prepare pssh
if in_pssh is None:
try:
pssh = PSSH(input_pssh)
except Exception as error:
print(f'an error occurred!\n{error}')
return None, error
if in_pssh is not None:
try:
pssh = PSSH(in_pssh)
except Exception as error:
print(f'an error occurred!\n{error}')
return None, error
# Ask for license URL
if in_license_url is None:
license_url = input(f"\nLicense URL: ")
if in_license_url is not None:
license_url = in_license_url
# load device
device = Device.load(wvd)
# load CDM from device
cdm = Cdm.from_device(device)
# open CDM session
session_id = cdm.open()
# Set challenge
challenge = cdm.get_license_challenge(session_id, pssh)
# Insert challenge into JSON
license_curl_json["licenseChallenge"] = base64.b64encode(challenge).decode()
# send license challenge
license = requests.post(
url=license_url,
headers=license_curl_headers,
json=license_curl_json
)
if license.status_code != 200:
print(f'An error occurred!\n{license.content}')
return license.content
# Extract license from content
license = license.json()["licenseData"]
# parse license challenge
try:
cdm.parse_license(session_id, license)
except Exception as error:
print(f'an error occurred!\n{error}')
return None, error
# assign variable for returned keys
returned_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# assign variable for mp4decrypt keys
mp4decrypt_keys = []
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
mp4decrypt_keys.append('--key')
mp4decrypt_keys.append(f'{key.kid.hex}:{key.key.hex()}')
# close session, disposes of session data
cdm.close(session_id)
# Cache the keys
if in_pssh is None:
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
if in_pssh is not None:
Helpers.cache_key.cache_keys(pssh=in_pssh, keys=returned_keys)
# Print out the keys
print(f'\nKeys:\n{returned_keys}')
# Return the keys for future ripper use.
return mp4decrypt_keys, returned_keys
# Defining remote decrypt function for generic services
def decrypt_generic_remotely(api_key: str = None, license_curl_headers: dict = None, license_curl_json: dict = None, mpd_url: str = None,
in_pssh: str = None, in_license_url: str = None):
# Exit if no API key
if api_key is None:
exit(f"No API Key! to use remote decryption place an API key in {os.getcwd()}/Config/api-key.txt")
# Set CDM Project API URL
api_url = "https://api.cdm-project.com"
# Set API device
api_device = "CDM"
# Try getting pssh via MPD URL if web-dl
if mpd_url is not None and in_pssh is None:
input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url)
if input_pssh is not None:
print(f'\nPSSH found: {input_pssh}')
else:
input_pssh = input(f"\nPSSH not found! Input PSSH: ")
# Ask for PSSH if just keys function
if mpd_url is None and in_pssh is None:
# Ask for PSSH if web-dl not selected:
input_pssh = input(f"\nPSSH: ")
if in_pssh is not None:
input_pssh = in_pssh
# Ask for license URL
if in_license_url is None:
input_license_url = input(f"\nLicense URL: ")
if in_license_url is not None:
input_license_url = in_license_url
# Set headers for API key
api_key_headers = {
"X-Secret-Key": api_key
}
# Open CDM session
open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
# Error handling
if open_session.status_code != 200:
print(f"An error occurred!\n{open_session.content}")
return None, open_session.content
# Get the session ID from the open CDM session
session_id = open_session.json()["data"]["session_id"]
# Set JSON required to generate a license challenge
generate_challenge_json = {
"session_id": session_id,
"init_data": input_pssh
}
# Generate the license challenge
generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
# Error handling
if generate_challenge.status_code != 200:
print(f"An error occurred!\n{generate_challenge.content}")
return None, generate_challenge.content
# Retrieve the challenge and base64 decode it
challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
# Insert challenge into JSON
license_curl_json["licenseChallenge"] = base64.b64encode(challenge).decode()
# Send the challenge to the widevine license server
license = requests.post(
url=input_license_url,
headers=license_curl_headers,
json=license_curl_json
)
# Error handling
if license.status_code != 200:
print(f"An error occurred!\n{license.content}")
return None, license.content
# Retrieve the license message
license = license.json()["licenseData"]
license = base64.b64encode(license).decode()
# Set JSON required to parse license message
license_message_json = {
"session_id": session_id,
"license_message": license
}
# Parse the license
parse = requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
# Error handling
if parse.status_code != 200:
print(f"An error occurred!\n{parse.content}")
return None, parse.content
# Retrieve the keys
get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
json={"session_id": session_id},
headers=api_key_headers)
# Error handling
if get_keys.status_code != 200:
print(f"An error occurred!\n{get_keys.content}")
return None, get_keys.content
# Iterate through the keys, ignoring signing key
returned_keys = ''
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
returned_keys += f"{key['key_id']}:{key['key']}\n"
# assign variable for mp4decrypt keys
mp4decrypt_keys = []
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
mp4decrypt_keys.append('--key')
mp4decrypt_keys.append(f"{key['key_id']}:{key['key']}")
# Cache the keys
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
# Print out keys
print(f'\nKeys:\n{returned_keys}')
# Close session
requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers)
# Return mp4decrypt keys
return mp4decrypt_keys, returned_keys

View File

@ -9,6 +9,7 @@ import os
import Helpers import Helpers
from sys import exit from sys import exit
# Defining decrypt function for Crunchyroll # Defining decrypt function for Crunchyroll
def decrypt_crunchyroll(wvd: str = None, license_curl_headers: dict = None, mpd_url: str = None, def decrypt_crunchyroll(wvd: str = None, license_curl_headers: dict = None, mpd_url: str = None,
in_pssh: str = None): in_pssh: str = None):
@ -19,7 +20,7 @@ def decrypt_crunchyroll(wvd: str = None, license_curl_headers: dict = None, mpd_
# Try getting pssh via MPD URL if web-dl # Try getting pssh via MPD URL if web-dl
if mpd_url is not None: if mpd_url is not None:
input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url) input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url, license_headers=license_curl_headers)
if input_pssh is not None: if input_pssh is not None:
print(f'\nPSSH found: {input_pssh}') print(f'\nPSSH found: {input_pssh}')
else: else:
@ -32,9 +33,17 @@ def decrypt_crunchyroll(wvd: str = None, license_curl_headers: dict = None, mpd_
# prepare pssh # prepare pssh
if in_pssh is None: if in_pssh is None:
pssh = PSSH(input_pssh) try:
pssh = PSSH(input_pssh)
except Exception as error:
print(f"An error occurred!\n{error}")
return None, error
if in_pssh is not None: if in_pssh is not None:
pssh = PSSH(in_pssh) try:
pssh = PSSH(in_pssh)
except Exception as error:
print(f"An error occurred!\n{error}")
return None, error
# load device # load device
device = Device.load(wvd) device = Device.load(wvd)
@ -52,7 +61,8 @@ def decrypt_crunchyroll(wvd: str = None, license_curl_headers: dict = None, mpd_
headers=license_curl_headers headers=license_curl_headers
) )
if service_cert.status_code != 200: if service_cert.status_code != 200:
print("Couldn't retrieve service cert") print(f"Couldn't retrieve service cert\n{service_cert.content}")
return None, service_cert.content
else: else:
service_cert = service_cert.json()["license"] service_cert = service_cert.json()["license"]
cdm.set_service_certificate(session_id, service_cert) cdm.set_service_certificate(session_id, service_cert)
@ -71,8 +81,8 @@ def decrypt_crunchyroll(wvd: str = None, license_curl_headers: dict = None, mpd_
) )
if license.status_code != 200: if license.status_code != 200:
print(license.content) print(f"An error occurred!\n{license.content}")
exit("Could not complete license challenge") return None, license.content
# Extract license from json dict # Extract license from json dict
license = license.json()["license"] license = license.json()["license"]
@ -125,7 +135,7 @@ def decrypt_crunchyroll_remotely(api_key: str = None, license_curl_headers: dict
# Try getting pssh via MPD URL if web-dl # Try getting pssh via MPD URL if web-dl
if mpd_url is not None and in_pssh is None: if mpd_url is not None and in_pssh is None:
input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url) input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url, license_headers=license_curl_headers)
if input_pssh is not None: if input_pssh is not None:
print(f'\nPSSH found: {input_pssh}') print(f'\nPSSH found: {input_pssh}')
else: else:
@ -147,6 +157,11 @@ def decrypt_crunchyroll_remotely(api_key: str = None, license_curl_headers: dict
# Open CDM session # Open CDM session
open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers) open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
# Error handling
if open_session.status_code != 200:
print(f"An error occurred!\n{open_session.content}")
return None, open_session.content
# Get the session ID from the open CDM session # Get the session ID from the open CDM session
session_id = open_session.json()["data"]["session_id"] session_id = open_session.json()["data"]["session_id"]
@ -159,6 +174,11 @@ def decrypt_crunchyroll_remotely(api_key: str = None, license_curl_headers: dict
# Generate the license challenge # Generate the license challenge
generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json) generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
# Error handling
if generate_challenge.status_code != 200:
print(f"An error occurred!\n{generate_challenge.content}")
return None, generate_challenge.content
# Retrieve the challenge and base64 decode it # Retrieve the challenge and base64 decode it
challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"]) challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
@ -169,6 +189,10 @@ def decrypt_crunchyroll_remotely(api_key: str = None, license_curl_headers: dict
data=challenge data=challenge
) )
if license.status_code != 200:
print(f"An error occurred!\n{license.content}")
return None, license.content
# Retrieve the license message # Retrieve the license message
license = license.json()["license"] license = license.json()["license"]
@ -179,13 +203,21 @@ def decrypt_crunchyroll_remotely(api_key: str = None, license_curl_headers: dict
} }
# Parse the license # Parse the license
requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json) parse = requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
# Error handling
if parse.status_code != 200:
print(f"An error occurred!\n{parse.content}")
# Retrieve the keys # Retrieve the keys
get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL', get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
json={"session_id": session_id}, json={"session_id": session_id},
headers=api_key_headers) headers=api_key_headers)
# Error handling
if get_keys.status_code != 200:
print(f"An error occurred!\n{get_keys.content}")
# Iterate through the keys, ignoring signing key # Iterate through the keys, ignoring signing key
returned_keys = '' returned_keys = ''
for key in get_keys.json()["data"]["keys"]: for key in get_keys.json()["data"]["keys"]:
@ -206,7 +238,11 @@ def decrypt_crunchyroll_remotely(api_key: str = None, license_curl_headers: dict
print(f'\nKeys:\n{returned_keys}') print(f'\nKeys:\n{returned_keys}')
# Close session # Close session
requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers) close_session = requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers)
# Error handling
if close_session.status_code != 200:
print(f"An error occurred!\n{close_session.content}")
# return mp4decrypt keys # return mp4decrypt keys
return mp4decrypt_keys, returned_keys return mp4decrypt_keys, returned_keys

View File

@ -31,12 +31,19 @@ def decrypt_generic(wvd: str = None, license_curl_headers: dict = None, mpd_url:
# Ask for PSSH if web-dl not selected: # Ask for PSSH if web-dl not selected:
input_pssh = input(f"\nPSSH: ") input_pssh = input(f"\nPSSH: ")
# prepare pssh # prepare pssh
if in_pssh is None: if in_pssh is None:
pssh = PSSH(input_pssh) try:
pssh = PSSH(input_pssh)
except Exception as error:
print(f'an error occurred!\n{error}')
return None, error
if in_pssh is not None: if in_pssh is not None:
pssh = PSSH(in_pssh) try:
pssh = PSSH(in_pssh)
except Exception as error:
print(f'an error occurred!\n{error}')
return None, error
# Ask for license URL # Ask for license URL
if in_license_url is None: if in_license_url is None:
@ -53,23 +60,8 @@ def decrypt_generic(wvd: str = None, license_curl_headers: dict = None, mpd_url:
# open CDM session # open CDM session
session_id = cdm.open() session_id = cdm.open()
# get service certificate # Generate the challenge
service_cert = requests.post( challenge = cdm.get_license_challenge(session_id, pssh)
url=license_url,
data=cdm.service_certificate_challenge,
headers=license_curl_headers
)
if service_cert.status_code != 200:
print("Couldn't retrieve service cert")
else:
service_cert = service_cert.content
cdm.set_service_certificate(session_id, service_cert)
# generate license challenge
if service_cert:
challenge = cdm.get_license_challenge(session_id, pssh, privacy_mode=True)
else:
challenge = cdm.get_license_challenge(session_id, pssh)
# send license challenge # send license challenge
license = requests.post( license = requests.post(
@ -79,14 +71,25 @@ def decrypt_generic(wvd: str = None, license_curl_headers: dict = None, mpd_url:
) )
if license.status_code != 200: if license.status_code != 200:
print(license.content) print(f'An error occurred!\n{license.content}')
exit("Could not complete license challenge") return license.content
# Extract license from json dict
license = license.content
# parse license challenge # parse license challenge
cdm.parse_license(session_id, license) try:
cdm.parse_license(session_id, license.content)
except:
try:
cdm.parse_license(session_id, license.json().get('license'))
except:
try:
cdm.parse_license(session_id, license.json().get('licenseData'))
except:
try:
cdm.parse_license(session_id, license.json().get('widevine2License'))
except Exception as error:
print(f'an error occurred!\n{error}')
return None, error
# assign variable for returned keys # assign variable for returned keys
returned_keys = "" returned_keys = ""
@ -161,6 +164,11 @@ def decrypt_generic_remotely(api_key: str = None, license_curl_headers: dict = N
# Open CDM session # Open CDM session
open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers) open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
# Error handling
if open_session.status_code != 200:
print(f"An error occurred!\n{open_session.content}")
return None, open_session.content
# Get the session ID from the open CDM session # Get the session ID from the open CDM session
session_id = open_session.json()["data"]["session_id"] session_id = open_session.json()["data"]["session_id"]
@ -173,6 +181,11 @@ def decrypt_generic_remotely(api_key: str = None, license_curl_headers: dict = N
# Generate the license challenge # Generate the license challenge
generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json) generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
# Error handling
if generate_challenge.status_code != 200:
print(f"An error occurred!\n{generate_challenge.content}")
return None, generate_challenge.content
# Retrieve the challenge and base64 decode it # Retrieve the challenge and base64 decode it
challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"]) challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
@ -183,6 +196,10 @@ def decrypt_generic_remotely(api_key: str = None, license_curl_headers: dict = N
data=challenge data=challenge
) )
if license.status_code != 200:
print(f'An error occurred!\n{license.content}')
return None, license.content
# Retrieve the license message # Retrieve the license message
license = base64.b64encode(license.content).decode() license = base64.b64encode(license.content).decode()
@ -193,13 +210,23 @@ def decrypt_generic_remotely(api_key: str = None, license_curl_headers: dict = N
} }
# Parse the license # Parse the license
requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json) parse = requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
# Error handling
if parse.status_code != 200:
print(f"An error occurred!\n{parse.content}")
return None, parse.content
# Retrieve the keys # Retrieve the keys
get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL', get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
json={"session_id": session_id}, json={"session_id": session_id},
headers=api_key_headers) headers=api_key_headers)
# Error handling
if get_keys.status_code != 200:
print(f"An error occurred!\n{get_keys.content}")
return None, get_keys.content
# Iterate through the keys, ignoring signing key # Iterate through the keys, ignoring signing key
returned_keys = '' returned_keys = ''
for key in get_keys.json()["data"]["keys"]: for key in get_keys.json()["data"]["keys"]:

256
Sites/RTE.py Normal file
View File

@ -0,0 +1,256 @@
from pywidevine import PSSH
from pywidevine import Cdm
from pywidevine import Device
import requests
import base64
import os
import Helpers
from sys import exit
# Defining decrypt function for generic services
def decrypt_rte(wvd: str = None, license_curl_headers: dict = None, mpd_url: str = None,
in_pssh: str = None, in_license_url: str = None, release_pid: str = None):
# Exit if no device
if wvd is None:
exit(f"No CDM! to use local decryption place a .wvd in {os.getcwd()}/WVDs")
# Try getting pssh via MPD URL if web-dl
if mpd_url is not None:
input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url)
if input_pssh is not None:
print(f'\nPSSH found: {input_pssh}')
else:
input_pssh = input(f"\nPSSH not found! Input PSSH: ")
# Ask for PSSH if just keys function
if mpd_url is None and in_pssh is None:
# Ask for PSSH if web-dl not selected:
input_pssh = input(f"\nPSSH: ")
# prepare pssh
if in_pssh is None:
try:
pssh = PSSH(input_pssh)
except Exception as error:
print(f'an error occurred!\n{error}')
return None, error
if in_pssh is not None:
try:
pssh = PSSH(in_pssh)
except Exception as error:
print(f'an error occurred!\n{error}')
return None, error
# Ask for license URL
if in_license_url is None:
license_url = input(f"\nLicense URL: ")
if in_license_url is not None:
license_url = in_license_url
# Ask for release PID
if release_pid is None:
pid = input(f"\nRelease PID: ")
if release_pid is not None:
pid = release_pid
# load device
device = Device.load(wvd)
# load CDM from device
cdm = Cdm.from_device(device)
# open CDM session
session_id = cdm.open()
# Set the challenge
challenge = cdm.get_license_challenge(session_id, pssh)
# send license challenge
license = requests.post(
url=license_url,
headers=license_curl_headers,
data=f'{{"getWidevineLicense":{{"releasePid":"{pid}","widevineChallenge":"{base64.b64encode(challenge).decode()}"}}}}'
)
if license.status_code != 200:
print(f'An error occurred!\n{license.content}')
return license.content
# Extract license from content
license = license.json()['getWidevineLicenseResponse']['license']
# parse license challenge
try:
cdm.parse_license(session_id, license)
except Exception as error:
print(f'an error occurred!\n{error}')
return None, error
# assign variable for returned keys
returned_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# assign variable for mp4decrypt keys
mp4decrypt_keys = []
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
mp4decrypt_keys.append('--key')
mp4decrypt_keys.append(f'{key.kid.hex}:{key.key.hex()}')
# close session, disposes of session data
cdm.close(session_id)
# Cache the keys
if in_pssh is None:
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
if in_pssh is not None:
Helpers.cache_key.cache_keys(pssh=in_pssh, keys=returned_keys)
# Print out the keys
print(f'\nKeys:\n{returned_keys}')
# Return the keys for future ripper use.
return mp4decrypt_keys, returned_keys
# Defining remote decrypt function for generic services
def decrypt_rte_remotely(api_key: str = None, license_curl_headers: dict = None, mpd_url: str = None,
in_pssh: str = None, in_license_url: str = None, release_pid: str = None):
# Exit if no API key
if api_key is None:
exit(f"No API Key! to use remote decryption place an API key in {os.getcwd()}/Config/api-key.txt")
# Set CDM Project API URL
api_url = "https://api.cdm-project.com"
# Set API device
api_device = "CDM"
# Try getting pssh via MPD URL if web-dl
if mpd_url is not None and in_pssh is None:
input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url)
if input_pssh is not None:
print(f'\nPSSH found: {input_pssh}')
else:
input_pssh = input(f"\nPSSH not found! Input PSSH: ")
# Ask for PSSH if just keys function
if mpd_url is None and in_pssh is None:
# Ask for PSSH if web-dl not selected:
input_pssh = input(f"\nPSSH: ")
if in_pssh is not None:
input_pssh = in_pssh
# Ask for license URL
if in_license_url is None:
input_license_url = input(f"\nLicense URL: ")
if in_license_url is not None:
input_license_url = in_license_url
# Ask for release PID
if release_pid is None:
pid = input(f"\nRelease PID: ")
if release_pid is not None:
pid = release_pid
# Set headers for API key
api_key_headers = {
"X-Secret-Key": api_key
}
# Open CDM session
open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
# Error handling
if open_session.status_code != 200:
print(f"An error occurred!\n{open_session.content}")
return None, open_session.content
# Get the session ID from the open CDM session
session_id = open_session.json()["data"]["session_id"]
# Set JSON required to generate a license challenge
generate_challenge_json = {
"session_id": session_id,
"init_data": input_pssh
}
# Generate the license challenge
generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
# Error handling
if generate_challenge.status_code != 200:
print(f"An error occurred!\n{generate_challenge.content}")
return None, generate_challenge.content
# Retrieve the challenge and base64 decode it
challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
# Send the challenge to the widevine license server
license = requests.post(
url=input_license_url,
headers=license_curl_headers,
data=f'{{"getWidevineLicense":{{"releasePid":"{pid}","widevineChallenge":"{base64.b64encode(challenge).decode()}"}}}}'
)
if license.status_code != 200:
print(f'An error occurred!\n{license.content}')
return None, license.content
# Retrieve the license message
license = license.json()['getWidevineLicenseResponse']['license']
# Set JSON required to parse license message
license_message_json = {
"session_id": session_id,
"license_message": license
}
# Parse the license
parse = requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
# Error handling
if parse.status_code != 200:
print(f"An error occurred!\n{parse.content}")
return None, parse.content
# Retrieve the keys
get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
json={"session_id": session_id},
headers=api_key_headers)
# Error handling
if get_keys.status_code != 200:
print(f"An error occurred!\n{get_keys.content}")
return None, get_keys.content
# Iterate through the keys, ignoring signing key
returned_keys = ''
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
returned_keys += f"{key['key_id']}:{key['key']}\n"
# assign variable for mp4decrypt keys
mp4decrypt_keys = []
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
mp4decrypt_keys.append('--key')
mp4decrypt_keys.append(f"{key['key_id']}:{key['key']}")
# Cache the keys
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
# Print out keys
print(f'\nKeys:\n{returned_keys}')
# Close session
requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers)
# Return mp4decrypt keys
return mp4decrypt_keys, returned_keys

249
Sites/Udemy.py Normal file
View File

@ -0,0 +1,249 @@
# Import dependencies
from pywidevine import PSSH
from pywidevine import Cdm
from pywidevine import Device
import requests
import base64
import os
import Helpers
from sys import exit
# Defining decrypt function for generic services
def decrypt_udemy(wvd: str = None, license_curl_headers: dict = None, license_curl_cookies: str = None, mpd_url: str = None,
in_pssh: str = None, in_license_url: str = None):
# Exit if no device
if wvd is None:
exit(f"No CDM! to use local decryption place a .wvd in {os.getcwd()}/WVDs")
# Try getting pssh via MPD URL if web-dl
if mpd_url is not None:
input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url)
if input_pssh is not None:
print(f'\nPSSH found: {input_pssh}')
else:
input_pssh = input(f"\nPSSH not found! Input PSSH: ")
# Ask for PSSH if just keys function
if mpd_url is None and in_pssh is None:
# Ask for PSSH if web-dl not selected:
input_pssh = input(f"\nPSSH: ")
# prepare pssh
if in_pssh is None:
try:
pssh = PSSH(input_pssh)
except Exception as error:
print(f'an error occurred!\n{error}')
return None, error
if in_pssh is not None:
try:
pssh = PSSH(in_pssh)
except Exception as error:
print(f'an error occurred!\n{error}')
return None, error
# Ask for license URL
if in_license_url is None:
license_url = input(f"\nLicense URL: ")
if in_license_url is not None:
license_url = in_license_url
# load device
device = Device.load(wvd)
# load CDM from device
cdm = Cdm.from_device(device)
# open CDM session
session_id = cdm.open()
# generate license challenge
challenge = cdm.get_license_challenge(session_id, pssh)
# send license challenge
license = requests.post(
url=license_url,
data=challenge,
headers=license_curl_headers,
cookies=license_curl_cookies
)
if license.status_code != 200:
print(f'An error occurred!\n{license.content}')
return license.content
# Extract license from content
license = license.content
# parse license challenge
try:
cdm.parse_license(session_id, license)
except Exception as error:
print(f'an error occurred!\n{error}')
return None, error
# assign variable for returned keys
returned_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# assign variable for mp4decrypt keys
mp4decrypt_keys = []
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
mp4decrypt_keys.append('--key')
mp4decrypt_keys.append(f'{key.kid.hex}:{key.key.hex()}')
# close session, disposes of session data
cdm.close(session_id)
# Cache the keys
if in_pssh is None:
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
if in_pssh is not None:
Helpers.cache_key.cache_keys(pssh=in_pssh, keys=returned_keys)
# Print out the keys
print(f'\nKeys:\n{returned_keys}')
# Return the keys for future ripper use.
return mp4decrypt_keys, returned_keys
# Defining remote decrypt function for generic services
def decrypt_udemy_remotely(api_key: str = None, license_curl_headers: dict = None, license_curl_cookies: dict = None, mpd_url: str = None,
in_pssh: str = None, in_license_url: str = None):
# Exit if no API key
if api_key is None:
exit(f"No API Key! to use remote decryption place an API key in {os.getcwd()}/Config/api-key.txt")
# Set CDM Project API URL
api_url = "https://api.cdm-project.com"
# Set API device
api_device = "CDM"
# Try getting pssh via MPD URL if web-dl
if mpd_url is not None and in_pssh is None:
input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url)
if input_pssh is not None:
print(f'\nPSSH found: {input_pssh}')
else:
input_pssh = input(f"\nPSSH not found! Input PSSH: ")
# Ask for PSSH if just keys function
if mpd_url is None and in_pssh is None:
# Ask for PSSH if web-dl not selected:
input_pssh = input(f"\nPSSH: ")
if in_pssh is not None:
input_pssh = in_pssh
# Ask for license URL
if in_license_url is None:
input_license_url = input(f"\nLicense URL: ")
if in_license_url is not None:
input_license_url = in_license_url
# Set headers for API key
api_key_headers = {
"X-Secret-Key": api_key
}
# Open CDM session
open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
# Error handling
if open_session.status_code != 200:
print(f"An error occurred!\n{open_session.content}")
return None, open_session.content
# Get the session ID from the open CDM session
session_id = open_session.json()["data"]["session_id"]
# Set JSON required to generate a license challenge
generate_challenge_json = {
"session_id": session_id,
"init_data": input_pssh
}
# Generate the license challenge
generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
# Error handling
if generate_challenge.status_code != 200:
print(f"An error occurred!\n{generate_challenge.content}")
return None, generate_challenge.content
# Retrieve the challenge and base64 decode it
challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
# Send the challenge to the widevine license server
license = requests.post(
url=input_license_url,
headers=license_curl_headers,
cookies=license_curl_cookies,
data=challenge
)
if license.status_code != 200:
print(f'An error occurred!\n{license.content}')
return None, license.content
# Retrieve the license message
license = base64.b64encode(license.content).decode()
# Set JSON required to parse license message
license_message_json = {
"session_id": session_id,
"license_message": license
}
# Parse the license
parse = requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
# Error handling
if parse.status_code != 200:
print(f"An error occurred!\n{parse.content}")
return None, parse.content
# Retrieve the keys
get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
json={"session_id": session_id},
headers=api_key_headers)
# Error handling
if get_keys.status_code != 200:
print(f"An error occurred!\n{get_keys.content}")
return None, get_keys.content
# Iterate through the keys, ignoring signing key
returned_keys = ''
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
returned_keys += f"{key['key_id']}:{key['key']}\n"
# assign variable for mp4decrypt keys
mp4decrypt_keys = []
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
mp4decrypt_keys.append('--key')
mp4decrypt_keys.append(f"{key['key_id']}:{key['key']}")
# Cache the keys
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
# Print out keys
print(f'\nKeys:\n{returned_keys}')
# Close session
requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers)
# Return mp4decrypt keys
return mp4decrypt_keys, returned_keys

View File

@ -54,8 +54,8 @@ def decrypt_youtube(wvd: str = None, license_curl_headers: dict = None, license_
) )
if license.status_code != 200: if license.status_code != 200:
print(license.content) print(f"An error occurred!\n{license.content}")
exit("Could not complete license challenge") return None, license.content
# Extract license from json dict # Extract license from json dict
licence = license.json()["license"].replace("-", "+").replace("_", "/") licence = license.json()["license"].replace("-", "+").replace("_", "/")
@ -120,9 +120,19 @@ def decrypt_youtube_remotely(api_key: str = None, license_curl_headers: dict = N
# Open CDM session # Open CDM session
open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers) open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
# Error handling
if open_session.status_code != 200:
print(f"An error occurred!\n{open_session.content}")
return None, open_session.content
# Get the session ID from the open CDM session # Get the session ID from the open CDM session
session_id = open_session.json()["data"]["session_id"] session_id = open_session.json()["data"]["session_id"]
# Error handling
if session_id.status_code != 200:
print(f"An error occurred!\n{session_id.content}")
return None, session_id.content
# Set JSON required to generate a license challenge # Set JSON required to generate a license challenge
generate_challenge_json = { generate_challenge_json = {
"session_id": session_id, "session_id": session_id,
@ -132,6 +142,10 @@ def decrypt_youtube_remotely(api_key: str = None, license_curl_headers: dict = N
# Generate the license challenge # Generate the license challenge
generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json) generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
if generate_challenge.status_code != 200:
print(f"An error occurred!\n{generate_challenge.content}")
return None, generate_challenge.content
# Retrieve the challenge and base64 decode it # Retrieve the challenge and base64 decode it
challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"]) challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
@ -146,6 +160,11 @@ def decrypt_youtube_remotely(api_key: str = None, license_curl_headers: dict = N
cookies=license_curl_cookies cookies=license_curl_cookies
) )
# Error handling
if license.status_code != 200:
print(f"An error occurred!\n{license.content}")
return None, license.content
# Retrieve the license message # Retrieve the license message
license = license.json()["license"].replace("-", "+").replace("_", "/") license = license.json()["license"].replace("-", "+").replace("_", "/")
@ -156,13 +175,23 @@ def decrypt_youtube_remotely(api_key: str = None, license_curl_headers: dict = N
} }
# Parse the license # Parse the license
requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json) parse = requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
# Error handling
if parse.status_code != 200:
print(f"An error occurred!\n{parse.content}")
return None, parse.content
# Retrieve the keys # Retrieve the keys
get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL', get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
json={"session_id": session_id}, json={"session_id": session_id},
headers=api_key_headers) headers=api_key_headers)
# Error handling
if get_keys.status_code != 200:
print(f"An error occurred!\n{get_keys.content}")
return None, get_keys.content
# assign variable for mp4decrypt keys # assign variable for mp4decrypt keys
mp4decrypt_keys = [] mp4decrypt_keys = []
for key in get_keys.json()["data"]["keys"]: for key in get_keys.json()["data"]["keys"]:

View File

@ -2,4 +2,7 @@ from . import Crunchyroll
from . import Generic from . import Generic
from . import YouTube from . import YouTube
from . import VDOCipher from . import VDOCipher
from . import Canal from . import Canal
from . import RTE
from . import AstroGo
from . import Udemy

1
assets/images.py Normal file

File diff suppressed because one or more lines are too long

View File

@ -3,7 +3,6 @@ import Helpers
import Sites import Sites
import license_curl import license_curl
import argparse import argparse
from sys import exit
# Get device and api key # Get device and api key
device, api_key = Helpers.capability_check.capability_check() device, api_key = Helpers.capability_check.capability_check()
@ -20,10 +19,16 @@ services = parser.add_mutually_exclusive_group()
# Add switches to the mutually exclusive groups # Add switches to the mutually exclusive groups
services.add_argument('--crunchyroll', action='store_true', help="Decrypt Crunchyroll") services.add_argument('--crunchyroll', action='store_true', help="Decrypt Crunchyroll")
services.add_argument('--crunchyroll-remote', action='store_true', help="Decrypt Crunchyroll remotely") services.add_argument('--crunchyroll-remote', action='store_true', help="Decrypt Crunchyroll remotely")
services.add_argument('--youtube', action='store_true', help="Decrypt YouTube")
services.add_argument('--youtube-remote', action='store_true', help="Decrypt YouTube remotely")
services.add_argument('--generic', action='store_true', help="Decrypt generic services") services.add_argument('--generic', action='store_true', help="Decrypt generic services")
services.add_argument('--generic-remote', action='store_true', help="Decrypt generic services remotely") services.add_argument('--generic-remote', action='store_true', help="Decrypt generic services remotely")
services.add_argument('--pssh', action='store_true', help="Parse a PSSH from an MPD link.")
services.add_argument('--rte', action='store_true', help="Decrypt RTE")
services.add_argument('--rte-remote', action='store_true', help="Decrypt RTE remotely")
services.add_argument('--udemy', action='store_true', help="Decrypt Udemy")
services.add_argument('--udemy-remote', action='store_true', help="Decrypt Udemy remotely")
services.add_argument('--youtube', action='store_true', help="Decrypt YouTube")
services.add_argument('--youtube-remote', action='store_true', help="Decrypt YouTube remotely")
# Add web download switch # Add web download switch
parser.add_argument('--web-dl', help="Web download", action='store_true') parser.add_argument('--web-dl', help="Web download", action='store_true')
@ -51,6 +56,70 @@ elif switches.crunchyroll_remote:
Sites.Crunchyroll.decrypt_crunchyroll_remotely(api_key=api_key, license_curl_headers=license_curl.headers) Sites.Crunchyroll.decrypt_crunchyroll_remotely(api_key=api_key, license_curl_headers=license_curl.headers)
elif switches.generic:
# Perform action for --generic
if switches.web_dl:
mpd = input("MPD URL: ")
file = Helpers.download.web_dl_generic(mpd=mpd, device=device)
print(f'Saved at {file[0]}')
else:
Sites.Generic.decrypt_generic(wvd=device, license_curl_headers=license_curl.headers)
elif switches.generic_remote:
# Perform action for --generic-remote
if switches.web_dl:
mpd = input("MPD URL: ")
file = Helpers.download.web_dl_generic(mpd=mpd, api_key=api_key, remote=True)
print(f'Saved at {file[0]}')
else:
Sites.Generic.decrypt_generic_remotely(api_key=api_key, license_curl_headers=license_curl.headers)
elif switches.pssh:
# Perform action for --pssh
mpd = input("MPD URL: ")
print(f'PSSH: {Helpers.mpd_parse.parse_pssh(manifest_url=mpd)}')
elif switches.rte:
# Perform action for --rte
if switches.web_dl:
mpd = input("MPD URL: ")
file = Helpers.download.web_dl_generic(mpd=mpd, device=device, site='rte')
print(f'Saved at {file[0]}')
else:
Sites.RTE.decrypt_rte(wvd=device)
elif switches.rte_remote:
# Perform action for --rte-remote
if switches.web_dl:
mpd = input("MPD URL: ")
file = Helpers.download.web_dl_generic(mpd=mpd, api_key=api_key, remote=True, site='rte')
print(f'Saved at {file[0]}')
else:
Sites.RTE.decrypt_rte_remotely(api_key=api_key)
elif switches.udemy:
# Perform action for --udemy
if switches.web_dl:
mpd = input("MPD URL: ")
file = Helpers.download.web_dl_generic(mpd=mpd, device=device, site='udemy')
print(f'Saved at {file[0]}')
else:
Sites.Udemy.decrypt_udemy(wvd=device, license_curl_headers=license_curl.headers, license_curl_cookies=license_curl.cookies)
elif switches.udemy_remote:
# Perform action for --udemy-remote
if switches.web_dl:
mpd = input("MPD URL: ")
file = Helpers.download.web_dl_generic(mpd=mpd, api_key=api_key, remote=True, site='udemy')
print(f'Saved at {file[0]}')
else:
Sites.Udemy.decrypt_udemy_remotely(api_key=api_key, license_curl_headers=license_curl.headers, license_curl_cookies=license_curl.cookies)
elif switches.youtube: elif switches.youtube:
# Perform action for --YouTube # Perform action for --YouTube
if switches.web_dl: if switches.web_dl:
@ -71,24 +140,6 @@ elif switches.youtube_remote:
Sites.YouTube.decrypt_youtube_remotely(api_key=api_key, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies) Sites.YouTube.decrypt_youtube_remotely(api_key=api_key, license_curl_headers=license_curl.headers, license_curl_json=license_curl.json_data, license_curl_cookies=license_curl.cookies)
elif switches.generic_remote:
# Perform action for --generic-remote
if switches.web_dl:
mpd = input("MPD URL: ")
file = Helpers.download.web_dl_generic(mpd=mpd, api_key=api_key, remote=True)
print(f'Saved at {file[0]}')
else:
Sites.Generic.decrypt_generic_remotely(api_key=api_key, license_curl_headers=license_curl.headers)
elif switches.generic:
# If no switch is provided, perform a default action
if switches.web_dl:
mpd = input("MPD URL: ")
file = Helpers.download.web_dl_generic(mpd=mpd, device=device)
print(f'Saved at {file[0]}')
else:
Sites.Generic.decrypt_generic(wvd=device, license_curl_headers=license_curl.headers)
else: else:
# Perform default action if no switch is provided
Helpers.gui.start_gui(wvd=device, api_key=api_key) Helpers.gui.start_gui(wvd=device, api_key=api_key)