diff --git a/Helpers/__init__.py b/Helpers/__init__.py index 1961d81..11feeb4 100644 --- a/Helpers/__init__.py +++ b/Helpers/__init__.py @@ -5,4 +5,5 @@ from . import database_check from . import cache_key from . import mpd_parse from . import download -from . import binary_check \ No newline at end of file +from . import binary_check +from . import os_check \ No newline at end of file diff --git a/Helpers/binary_check.py b/Helpers/binary_check.py index 23e15a9..7eb43f9 100644 --- a/Helpers/binary_check.py +++ b/Helpers/binary_check.py @@ -1,7 +1,11 @@ import os +import subprocess import zipfile import shutil import requests +import tarfile +import stat +from Helpers import os_check from tqdm import tqdm @@ -19,56 +23,113 @@ def create_folders(): # Create / Check binaries function def create_binaries(): + + # Check which OS the host is + operating_system = os_check.get_os_specific() + + # Set binary dictionaries for Windows / Linux + windows_binaries = ["n_m3u8dl-re.exe", "mp4decrypt.exe", "ffmpeg.exe", "yt-dlp.exe"] + linux_binaries = ["n_m3u8dl-re", "mp4decrypt", "ffmpeg"] + if operating_system == "Windows": + binary_list = windows_binaries + if operating_system == "Linux": + binary_list = linux_binaries + # Check if the required binaries exist, if not, download them. # Iterate through required binaries - for binary in ["n_m3u8dl-re.exe", "mp4decrypt.exe", "ffmpeg.exe", "yt-dlp.exe"]: + for binary in binary_list: # Perform checks for each binary if not os.path.isfile(f"{os.getcwd()}/binaries/{binary}"): # FFmpeg - if binary == "ffmpeg.exe": + if binary == "ffmpeg.exe" or binary =="ffmpeg": # Download windows zip file for FFmpeg - ffmpeg_download = requests.get( - "https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-win64-gpl.zip", - stream=True) + if operating_system == "Windows": + ffmpeg_download = requests.get( + "https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-win64-gpl.zip", + stream=True) + if operating_system == "Linux": + ffmpeg_download = requests.get( + "https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-linux64-gpl.tar.xz", + stream=True) total_size = int(ffmpeg_download.headers.get('content-length', 0)) - with open(f"{os.getcwd()}/download/temp/ffmpeg.zip", 'wb') as download: - with tqdm(total=total_size, unit='B', unit_scale=True, - desc="Downloading ffmpeg.zip") as progress_bar: - for data in ffmpeg_download.iter_content(chunk_size=1024): - download.write(data) - progress_bar.update(len(data)) + if operating_system == "Windows": + with open(f"{os.getcwd()}/download/temp/ffmpeg.zip", 'wb') as download: + with tqdm(total=total_size, unit='B', unit_scale=True, + desc="Downloading ffmpeg.zip") as progress_bar: + for data in ffmpeg_download.iter_content(chunk_size=1024): + download.write(data) + progress_bar.update(len(data)) + if operating_system == "Linux": + with open(f"{os.getcwd()}/download/temp/ffmpeg.tar.xz", 'wb') as download: + with tqdm(total=total_size, unit='B', unit_scale=True, + desc="Downloading ffmpeg.tar.xz") as progress_bar: + for data in ffmpeg_download.iter_content(chunk_size=1024): + download.write(data) + progress_bar.update(len(data)) - # Unzip FFmpeg - with zipfile.ZipFile(f"{os.getcwd()}/download/temp/ffmpeg.zip", "r") as ffmpeg_zip: - file_count = len(ffmpeg_zip.infolist()) - with tqdm(total=file_count, unit='file', desc="Extracting ffmpeg.zip") as unzip_progress_bar: - for file in ffmpeg_zip.infolist(): - ffmpeg_zip.extract(file, path=f"{os.getcwd()}/download/temp") - unzip_progress_bar.update(1) + # Unzip FFmpeg if Windows + if operating_system == "Windows": + with zipfile.ZipFile(f"{os.getcwd()}/download/temp/ffmpeg.zip", "r") as ffmpeg_zip: + file_count = len(ffmpeg_zip.infolist()) + with tqdm(total=file_count, unit='file', desc="Extracting ffmpeg.zip") as unzip_progress_bar: + for file in ffmpeg_zip.infolist(): + ffmpeg_zip.extract(file, path=f"{os.getcwd()}/download/temp") + unzip_progress_bar.update(1) - # Copy ffmpeg binary to binaries - shutil.copy2(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-win64-gpl/bin/ffmpeg.exe", - f"{os.getcwd()}/binaries") + # Untar FFmpeg if Linux + if operating_system == "Linux": + with tarfile.open(f"{os.getcwd()}/download/temp/ffmpeg.tar.xz", 'r:xz') as ffmpeg_tar_xz: + file_count = len(ffmpeg_tar_xz.getmembers()) + with tqdm(total=file_count, unit='file', desc=f"Extracting ffmpeg.tar.xz") as untar_xz_progress_bar: + for file in ffmpeg_tar_xz: + ffmpeg_tar_xz.extract(file, path=f"{os.getcwd()}/download/temp") + untar_xz_progress_bar.update(1) - # Remove the zip - os.remove(f"{os.getcwd()}/download/temp/ffmpeg.zip") + # Copy ffmpeg binary to binaries if Windows + if operating_system == "Windows": + shutil.copy2(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-win64-gpl/bin/ffmpeg.exe", + f"{os.getcwd()}/binaries") - # Remove the folder - shutil.rmtree(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-win64-gpl") + # Copy ffmpeg binary to binaries if linux + if operating_system == "Linux": + shutil.copy2(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-linux64-gpl/bin/ffmpeg", + f"{os.getcwd()}/binaries") + os.chmod(f"{os.getcwd()}/binaries/ffmpeg", stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + + # Remove the zip if Windows + if operating_system == "Windows": + os.remove(f"{os.getcwd()}/download/temp/ffmpeg.zip") + + # Remove the .tar.xz and .tar if linux + if operating_system == "Linux": + os.remove(f"{os.getcwd()}/download/temp/ffmpeg.tar.xz") + + # Remove the folder if windows + if operating_system == "Windows": + shutil.rmtree(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-win64-gpl") + + # Remove the folder if linux + if operating_system == "Linux": + shutil.rmtree(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-linux64-gpl") # Print a new line print() # MP4 Decrypt - elif binary == "mp4decrypt.exe": + elif binary == "mp4decrypt.exe" or binary == "mp4decrypt": # Download mp4decrypt zip file - mp4decrypt_download = requests.get( - "https://www.bok.net/Bento4/binaries/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32.zip", stream=True) + if operating_system == "Windows": + mp4decrypt_download = requests.get( + "https://www.bok.net/Bento4/binaries/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32.zip", stream=True) + if operating_system == "Linux": + mp4decrypt_download = requests.get( + "https://www.bok.net/Bento4/binaries/Bento4-SDK-1-6-0-641.x86_64-unknown-linux.zip", + stream=True) total_size = int(mp4decrypt_download.headers.get('content-length', 0)) with open(f"{os.getcwd()}/download/temp/mp4decrypt.zip", 'wb') as download: with tqdm(total=total_size, unit='B', unit_scale=True, @@ -85,77 +146,155 @@ def create_binaries(): mp4decrypt_zip.extract(file, path=f"{os.getcwd()}/download/temp") unzip_progress_bar.update(1) - # Copy mp4decrypt binary to binaries - shutil.copy2( - f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32/bin/mp4decrypt.exe", - f"{os.getcwd()}/binaries") + # Copy mp4decrypt binary to binaries if windows + if operating_system == "Windows": + shutil.copy2( + f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32/bin/mp4decrypt.exe", + f"{os.getcwd()}/binaries") + + # Copy mp4decrypt binary to binaries if Linux + if operating_system == "Linux": + shutil.copy2( + f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-641.x86_64-unknown-linux/bin/mp4decrypt", + f"{os.getcwd()}/binaries") + os.chmod(f"{os.getcwd()}/binaries/mp4decrypt", stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) # Deleting the zip file os.remove(f"{os.getcwd()}/download/temp/mp4decrypt.zip") - # Deleting the directory - shutil.rmtree(f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32") + # Deleting the directory if Windows + if operating_system == "Windows": + shutil.rmtree(f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32") + + # Deleting the directory if Linux + if operating_system == "Linux": + shutil.rmtree(f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-641.x86_64-unknown-linux") # Print a new line print() # n_m3u8dl-re - elif binary == "n_m3u8dl-re.exe": + elif binary == "n_m3u8dl-re.exe" or binary == "n_m3u8dl-re": # Download n_m3u8dl-re zip file - n_m3u8dl_re_download = requests.get( - "https://github.com/nilaoda/N_m3u8DL-RE/releases/download/v0.2.0-beta/N_m3u8DL-RE_Beta_win-x64_20230628.zip", - stream=True) + if operating_system == "Windows": + n_m3u8dl_re_download = requests.get( + "https://github.com/nilaoda/N_m3u8DL-RE/releases/download/v0.2.0-beta/N_m3u8DL-RE_Beta_win-x64_20230628.zip", + stream=True) + if operating_system == "Linux": + n_m3u8dl_re_download = requests.get( + "https://github.com/nilaoda/N_m3u8DL-RE/releases/download/v0.2.0-beta/N_m3u8DL-RE_Beta_linux-x64_20230628.tar.gz", + stream=True) total_size = int(n_m3u8dl_re_download.headers.get('content-length', 0)) - with open(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip", 'wb') as download: - with tqdm(total=total_size, unit='B', unit_scale=True, - desc="Downloading n_m3u8dl-re.zip") as progress_bar: - for data in n_m3u8dl_re_download.iter_content(chunk_size=1024): - download.write(data) - progress_bar.update(len(data)) + if operating_system == "Windows": + with open(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip", 'wb') as download: + with tqdm(total=total_size, unit='B', unit_scale=True, + desc="Downloading n_m3u8dl-re.zip") as progress_bar: + for data in n_m3u8dl_re_download.iter_content(chunk_size=1024): + download.write(data) + progress_bar.update(len(data)) + if operating_system == "Linux": + with open(f"{os.getcwd()}/download/temp/n_m3u8dl-re.tar.gz", 'wb') as download: + with tqdm(total=total_size, unit='B', unit_scale=True, + desc="Downloading n_m3u8dl-re.tar.gz") as progress_bar: + for data in n_m3u8dl_re_download.iter_content(chunk_size=1024): + download.write(data) + progress_bar.update(len(data)) - # Unzip n_m3u8dl-re - with zipfile.ZipFile(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip", "r") as nm3u8dl_re_zip: - file_count = len(nm3u8dl_re_zip.infolist()) - with tqdm(total=file_count, unit='file', desc="Extracting n_m3u8dl-re.zip") as unzip_progress_bar: - for file in nm3u8dl_re_zip.infolist(): - nm3u8dl_re_zip.extract(file, path=f"{os.getcwd()}/download/temp") - unzip_progress_bar.update(1) + # Unzip n_m3u8dl-re if Windows + if operating_system == "Windows": + with zipfile.ZipFile(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip", "r") as nm3u8dl_re_zip: + file_count = len(nm3u8dl_re_zip.infolist()) + with tqdm(total=file_count, unit='file', desc="Extracting n_m3u8dl-re.zip") as unzip_progress_bar: + for file in nm3u8dl_re_zip.infolist(): + nm3u8dl_re_zip.extract(file, path=f"{os.getcwd()}/download/temp") + unzip_progress_bar.update(1) - # Copy n_m3u8dl-re binary to binaries - shutil.copy2(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_win-x64/N_m3u8DL-RE.exe", - f"{os.getcwd()}/binaries") + # Untar n_m3u8dl-re if Linux + if operating_system == "Linux": + with tarfile.open(f"{os.getcwd()}/download/temp/n_m3u8dl-re.tar.gz", 'r:gz') as n_m3u8dl_re_tar_gz: + file_count = len(n_m3u8dl_re_tar_gz.getmembers()) + with tqdm(total=file_count, unit='file', + desc=f"Extracting n_m3u8dl-re.tar.gz") as untar_gz_progress_bar: + for file in n_m3u8dl_re_tar_gz: + n_m3u8dl_re_tar_gz.extract(file, path=f"{os.getcwd()}/download/temp") + untar_gz_progress_bar.update(1) - # Delete zip file - os.remove(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip") + # Copy n_m3u8dl-re binary to binaries if Windows + if operating_system == "Windows": + shutil.copy2(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_win-x64/N_m3u8DL-RE.exe", + f"{os.getcwd()}/binaries") - # Delete directory - shutil.rmtree(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_win-x64") + # Copy n_m3u8dl-re to binaries if Linux + if operating_system == "Linux": + shutil.copy2(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_linux-x64/N_m3u8DL-RE", + f"{os.getcwd()}/binaries") + subprocess.run(['chmod', '+x', f"{os.getcwd()}/binaries/N_m3u8DL-RE"]) + + # Delete zip file if Windows + if operating_system == "Windows": + os.remove(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip") + + # Deleter .tar.gz and .tar file if Linux + if operating_system == "Linux": + os.remove(f"{os.getcwd()}/download/temp/n_m3u8dl-re.tar.gz") + + # Delete directory if Windows + if operating_system == "Windows": + shutil.rmtree(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_win-x64") + + if operating_system == "Linux": + shutil.rmtree(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_linux-x64") # Print a new line print() # YT-DLP - elif binary == "yt-dlp.exe": + elif binary == "yt-dlp.exe" or binary == "yt-dlp": - # Download yt-dlp exe - yt_dlp_download = requests.get( - "https://github.com/yt-dlp/yt-dlp/releases/download/2023.11.16/yt-dlp_x86.exe", - stream=True) + # Download yt-dlp exe if windows + if operating_system == "Windows": + yt_dlp_download = requests.get( + "https://github.com/yt-dlp/yt-dlp/releases/download/2023.11.16/yt-dlp_x86.exe", + stream=True) + if operating_system == "Linux": + yt_dlp_download = requests.get( + "https://github.com/yt-dlp/yt-dlp/releases/download/2023.11.16/yt-dlp_linux", + stream=True) total_size = int(yt_dlp_download.headers.get('content-length', 0)) - with open(f"{os.getcwd()}/download/yt-dlp.exe", 'wb') as download: - with tqdm(total=total_size, unit='B', unit_scale=True, - desc="Downloading yt-dlp") as progress_bar: - for data in yt_dlp_download.iter_content(chunk_size=1024): - download.write(data) - progress_bar.update(len(data)) + if operating_system == "Windows": + with open(f"{os.getcwd()}/download/yt-dlp.exe", 'wb') as download: + with tqdm(total=total_size, unit='B', unit_scale=True, + desc="Downloading yt-dlp") as progress_bar: + for data in yt_dlp_download.iter_content(chunk_size=1024): + download.write(data) + progress_bar.update(len(data)) + if operating_system == "Linux": + with open(f"{os.getcwd()}/download/yt-dlp", 'wb') as download: + with tqdm(total=total_size, unit='B', unit_scale=True, + desc="Downloading yt-dlp") as progress_bar: + for data in yt_dlp_download.iter_content(chunk_size=1024): + download.write(data) + progress_bar.update(len(data)) - # Copy yt-dlp binary to binaries - shutil.copy2(f"{os.getcwd()}/download/yt-dlp.exe", - f"{os.getcwd()}/binaries") + # Copy yt-dlp binary to binaries if Windows + if operating_system == "Windows": + shutil.copy2(f"{os.getcwd()}/download/yt-dlp.exe", + f"{os.getcwd()}/binaries") - # Remove binary from download folder - os.remove(f"{os.getcwd()}/download/yt-dlp.exe") + # Copy yt-dlp binary to binaries if Linux + if operating_system == "Linux": + shutil.copy2(f"{os.getcwd()}/download/yt-dlp", + f"{os.getcwd()}/binaries") + os.chmod(f"{os.getcwd()}/binaries/yt-dlp", stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + + # Remove binary from download folder if Windows + if operating_system == "Windows": + os.remove(f"{os.getcwd()}/download/yt-dlp.exe") + + # Remove binary from download folder if Linux + if operating_system == "Linux": + os.remove(f"{os.getcwd()}/download/yt-dlp") # Print a new line print() diff --git a/Helpers/download.py b/Helpers/download.py index 4c94d35..385e75e 100644 --- a/Helpers/download.py +++ b/Helpers/download.py @@ -1,16 +1,19 @@ import subprocess -from os import urandom import uuid import glob import os import Helpers.binary_check import Sites.Generic import license_curl +import Helpers.os_check # Web Download function generic def web_dl_generic(mpd: str = None, device: str = None, api_key: str = None, remote: bool = False): + # Get the current operating system + operating_system = Helpers.os_check.get_os_specific() + # Check for folders Helpers.binary_check.create_folders() @@ -45,6 +48,10 @@ def web_dl_generic(mpd: str = None, device: str = None, api_key: str = None, rem '--mux-after-done', 'format=mkv' ] + mp4decrypt_keys + if operating_system == "Linux": + n_m3u8dl_re_download[0] = f'{os.getcwd()}/binaries/N_m3u8DL-RE' + n_m3u8dl_re_download[3] = f'{os.getcwd()}/binaries/ffmpeg' + n_m3u8dl_re_download[5] = f'{os.getcwd()}/binaries/mp4decrypt' subprocess.run(n_m3u8dl_re_download) @@ -58,6 +65,9 @@ def web_dl_generic(mpd: str = None, device: str = None, api_key: str = None, rem # Web Download crunchyroll function def web_dl_crunchyroll(mpd: str = None, device: str = None, api_key: str = None, remote: bool = False): + # Get the current operating system + operating_system = Helpers.os_check.get_os_specific() + # Check for folders Helpers.binary_check.create_folders() @@ -94,6 +104,10 @@ def web_dl_crunchyroll(mpd: str = None, device: str = None, api_key: str = None, '--mux-after-done', 'format=mkv' ] + mp4decrypt_keys + if operating_system == "Linux": + n_m3u8dl_re_download[0] = f'{os.getcwd()}/binaries/N_m3u8DL-RE' + n_m3u8dl_re_download[5] = f'{os.getcwd()}/binaries/ffmpeg' + n_m3u8dl_re_download[7] = f'{os.getcwd()}/binaries/mp4decrypt' subprocess.run(n_m3u8dl_re_download) @@ -107,6 +121,9 @@ def web_dl_crunchyroll(mpd: str = None, device: str = None, api_key: str = None, # YouTube Download function generic def youtube_dlp(url: str = None, device: str = None, api_key: str = None, remote: bool = False): + # Get the current operating system + operating_system = Helpers.os_check.get_os_specific() + # Check for folders Helpers.binary_check.create_folders() @@ -136,6 +153,8 @@ def youtube_dlp(url: str = None, device: str = None, api_key: str = None, remote 'res:720', f'{url}' ] + if operating_system == "Linux": + yt_dlp_download[0] = f'{os.getcwd()}/binaries/yt-dlp' # Run yt-dlp subprocess.run(yt_dlp_download) @@ -158,6 +177,8 @@ def youtube_dlp(url: str = None, device: str = None, api_key: str = None, remote f'{file}', f'{os.getcwd()}/download/{file_name}', ] + mp4decrypt_keys + if operating_system == "Linux": + mp4_decrypt[0] = f'{os.getcwd()}/binaries/mp4decrypt' # Run mp4decrypt subprocess.run(mp4_decrypt) @@ -181,6 +202,8 @@ def youtube_dlp(url: str = None, device: str = None, api_key: str = None, remote 'copy', f"{os.getcwd()}/download/{final_mux}.mkv", ] + if operating_system == "Linux": + ffmpeg_merge[0] = f"{os.getcwd()}/binaries/ffmpeg" # Run ffmpeg to merge the files subprocess.run(ffmpeg_merge) diff --git a/Helpers/os_check.py b/Helpers/os_check.py new file mode 100644 index 0000000..6b70249 --- /dev/null +++ b/Helpers/os_check.py @@ -0,0 +1,10 @@ +import os + + +def get_os_specific(): + if os.name == 'nt': # 'nt' stands for Windows + return "Windows" + elif os.name == 'posix': # 'posix' stands for Linux/Unix + return "Linux" + else: + return "Unknown" \ No newline at end of file diff --git a/README.md b/README.md index fe5515c..f1e378f 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,6 @@ How to use: 7. Paste dictionaries from license request curl post request into `License_curl.py` -8. Run with `python tpd-keys.py` +8. Run with `python tpd-keys.py` for just decryption keys or `python tpd-keys.py --web-dl` to get decryption keys plus download the content To view additional options you can use `python tpd-keys.py -h` \ No newline at end of file diff --git a/Sites/Canal.py b/Sites/Canal.py new file mode 100644 index 0000000..acb5299 --- /dev/null +++ b/Sites/Canal.py @@ -0,0 +1,196 @@ +# Import dependencies + +from pywidevine import PSSH +from pywidevine import Cdm +from pywidevine import Device +import requests +import base64 +import os +import Helpers + + +# Defining decrypt function for canal plus +def decrypt_canal_plus(wvd: str = None, license_curl_headers: dict = None, license_curl_json: dict = None, mpd_url: str = None): + + # Exit if no device + if wvd is None: + exit(f"No CDM! to use local decryption place a .wvd in {os.getcwd()}/WVDs") + + # Try getting pssh via MPD URL if web-dl + if mpd_url is not None: + input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url) + if input_pssh is not None: + print(f'\nPSSH found: {input_pssh}') + else: + input_pssh = input(f"\nPSSH not found! Input PSSH: ") + + # Ask for PSSH if just keys function + if mpd_url is None: + # Ask for PSSH if web-dl not selected: + input_pssh = input(f"\nPSSH: ") + + + # prepare pssh + pssh = PSSH(input_pssh) + + # Ask for license URL + license_url = "https://ltv.slc-app-aka.prod.bo.canal.canalplustech.pro/ottlivetv/api/V4/zones/cpfra/locations/FR/devices/11/apps/1/jobs/GetLicence" + + # load device + device = Device.load(wvd) + + # load CDM from device + cdm = Cdm.from_device(device) + + # open CDM session + session_id = cdm.open() + + + challenge = cdm.get_license_challenge(session_id, pssh) + + # Set the challenge in the json data + license_curl_json['ServiceRequest']['InData']['ChallengeInfo'] = base64.b64encode(challenge).decode() + + # send license challenge + license = requests.post( + url=license_url, + headers=license_curl_headers, + json=license_curl_json + ) + + if license.status_code != 200: + print(license.content) + exit("Could not complete license challenge") + + # Extract license from json dict + license = license.json()['ServiceResponse']['OutData']['LicenseInfo'] + + # parse license challenge + cdm.parse_license(session_id, license) + + # assign variable for returned keys + returned_keys = "" + for key in cdm.get_keys(session_id): + if key.type != "SIGNING": + returned_keys += f"{key.kid.hex}:{key.key.hex()}\n" + + # assign variable for mp4decrypt keys + mp4decrypt_keys = [] + for key in cdm.get_keys(session_id): + if key.type != "SIGNING": + mp4decrypt_keys.append('--key') + mp4decrypt_keys.append(f'{key.kid.hex}:{key.key.hex()}') + + # close session, disposes of session data + cdm.close(session_id) + + # Cache the keys + Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys) + + # Print out the keys + print(f'\nKeys:\n{returned_keys}') + + # Return the keys for future ripper use. + return mp4decrypt_keys + + +# Defining remote decrypt function for generic services +def decrypt_generic_remotely(api_key: str = None, license_curl_headers: dict = None, mpd_url: str = None): + + # Exit if no API key + if api_key is None: + exit(f"No API Key! to use remote decryption place an API key in {os.getcwd()}/Config/api-key.txt") + + # Set CDM Project API URL + api_url = "https://api.cdm-project.com" + + # Set API device + api_device = "CDM" + + # Try getting pssh via MPD URL if web-dl + if mpd_url is not None: + input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url) + if input_pssh is not None: + print(f'\nPSSH found: {input_pssh}') + else: + input_pssh = input(f"\nPSSH not found! Input PSSH: ") + + # Ask for PSSH if just keys function + if mpd_url is None: + # Ask for PSSH if web-dl not selected: + input_pssh = input(f"\nPSSH: ") + + # Ask for license URL + input_license_url = input(f"\nLicense URL: ") + + # Set headers for API key + api_key_headers = { + "X-Secret-Key": api_key + } + + # Open CDM session + open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers) + + # Get the session ID from the open CDM session + session_id = open_session.json()["data"]["session_id"] + + # Set JSON required to generate a license challenge + generate_challenge_json = { + "session_id": session_id, + "init_data": input_pssh + } + + # Generate the license challenge + generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json) + + # Retrieve the challenge and base64 decode it + challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"]) + + # Send the challenge to the widevine license server + license = requests.post( + url=input_license_url, + headers=license_curl_headers, + data=challenge + ) + + # Retrieve the license message + license = base64.b64encode(license.content).decode() + + # Set JSON required to parse license message + license_message_json = { + "session_id": session_id, + "license_message": license + } + + # Parse the license + requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json) + + # Retrieve the keys + get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL', + json={"session_id": session_id}, + headers=api_key_headers) + + # Iterate through the keys, ignoring signing key + returned_keys = '' + for key in get_keys.json()["data"]["keys"]: + if not key["type"] == "SIGNING": + returned_keys += f"{key['key_id']}:{key['key']}\n" + + # assign variable for mp4decrypt keys + mp4decrypt_keys = [] + for key in get_keys.json()["data"]["keys"]: + if not key["type"] == "SIGNING": + mp4decrypt_keys.append('--key') + mp4decrypt_keys.append(f"{key['key_id']}:{key['key']}") + + # Cache the keys + Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys) + + # Print out keys + print(f'\nKeys:\n{returned_keys}') + + # Close session + requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers) + + # Return mp4decrypt keys + return mp4decrypt_keys diff --git a/Sites/VDOCipher.py b/Sites/VDOCipher.py new file mode 100644 index 0000000..5125d66 --- /dev/null +++ b/Sites/VDOCipher.py @@ -0,0 +1,259 @@ +# Import dependencies + +from pywidevine import PSSH +from pywidevine import Cdm +from pywidevine import Device +import json +import requests +import base64 +import os +import Helpers +import re + + +# Defining decrypt function for generic services +def decrypt_vdocipher(wvd: str = None, url_curl_headers: dict = None, url_curl_cookies: dict = None, video_url: str = None): + + # Exit if no device + if wvd is None: + exit(f"No CDM! to use local decryption place a .wvd in {os.getcwd()}/WVDs") + + # Ask for URL of web page + if video_url is None: + url = input("URL: ") + + # Send a get request to the URL specified + url_get_request = requests.get(url=url, headers=url_curl_headers, cookies=url_curl_cookies) + + # Try to find the OTP from the get request + try: + otp_match = re.findall(r"otp: '(.*)',", url_get_request.text)[0] + playbackinfo_match = re.findall(r"playbackInfo: '(.*)',", url_get_request.text)[0] + except IndexError: + try: + otp_match = re.findall(r"otp=(.*)&", url_get_request.text)[0] + playbackinfo_match = re.findall(r"playbackInfo=(.*)", url_get_request.text)[0] + except IndexError: + print("\nAn error occured while getting otp/playback") + exit() + + # Get the video id from playbackinfo_match + video_id = json.loads(base64.b64decode(playbackinfo_match).decode())["videoId"] + + # Send a get request to acquire the license URL + license_get_request = requests.get(url=f'https://dev.vdocipher.com/api/meta/{video_id}', headers=url_curl_headers) + + # Try to extract the license URL from the license get request + try: + license_url_match = license_get_request.json()["dash"]["licenseServers"]["com.widevine.alpha"].rsplit(":", 1)[0] + mpd = license_get_request.json()["dash"]["manifest"] + except KeyError: + print("\n An error occured while getting mpd/license url") + + # Send a get request to acquire the MPD + mpd_get_request = requests.get(url=mpd, headers=url_curl_headers, cookies=url_curl_cookies) + + # Regular expression search the mpd get request for PSSH + input_pssh = re.search(r"(.*)", mpd_get_request.text).group(1) + + # prepare pssh + pssh = PSSH(input_pssh) + + # load device + device = Device.load(wvd) + + # load CDM from device + cdm = Cdm.from_device(device) + + # open CDM session + session_id = cdm.open() + + # Set service cert token + service_cert_token = { + "otp": otp_match, + "playbackInfo": playbackinfo_match, + "href": url, + "tech": "wv", + "licenseRequest": f"{base64.b64encode(cdm.service_certificate_challenge).decode()}" + } + + # Convert service cert token to JSON + service_cert_json_data = { + 'token': f'{base64.b64encode(json.dumps(service_cert_token).encode("utf-8")).decode()}', + } + + # get service certificate + service_cert = requests.post( + url=license_url_match, + json=service_cert_json_data, + headers=url_curl_headers + ) + if service_cert.status_code != 200: + print("Couldn't retrieve service cert") + else: + service_cert = service_cert.json()["license"] + cdm.set_service_certificate(session_id, service_cert) + + # Generate license challenge + if service_cert: + challenge = cdm.get_license_challenge(session_id, pssh, privacy_mode=True) + else: + challenge = cdm.get_license_challenge(session_id, pssh) + + # Declare token dictionary for license challenge + token = { + "otp": otp_match, + "playbackInfo": playbackinfo_match, + "href": url, + "tech": "wv", + "licenseRequest": f"{base64.b64encode(challenge).decode()}" + } + + # Convert token dictionary into JSON data + json_data = { + 'token': f'{base64.b64encode(json.dumps(token).encode("utf-8")).decode()}', + } + + # send license challenge + license = requests.post( + url=license_url_match, + headers=url_curl_headers, + cookies=url_curl_cookies, + json=json_data + ) + + if license.status_code != 200: + print(license.content) + exit("Could not complete license challenge") + + # Extract license from json dict + license = license.json()["license"] + + # parse license challenge + cdm.parse_license(session_id, license) + + # assign variable for returned keys + returned_keys = "" + for key in cdm.get_keys(session_id): + if key.type != "SIGNING": + returned_keys += f"{key.kid.hex}:{key.key.hex()}\n" + + # assign variable for mp4decrypt keys + mp4decrypt_keys = [] + for key in cdm.get_keys(session_id): + if key.type != "SIGNING": + mp4decrypt_keys.append('--key') + mp4decrypt_keys.append(f'{key.kid.hex}:{key.key.hex()}') + + # close session, disposes of session data + cdm.close(session_id) + + # Cache the keys + Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys) + + # Print out the keys + print(f'\nKeys:\n{returned_keys}') + + # Return the keys for future ripper use. + return mp4decrypt_keys, mpd + +# Defining remote decrypt function for generic services +def decrypt_generic_remotely(api_key: str = None, license_curl_headers: dict = None, mpd_url: str = None): + + # Exit if no API key + if api_key is None: + exit(f"No API Key! to use remote decryption place an API key in {os.getcwd()}/Config/api-key.txt") + + # Set CDM Project API URL + api_url = "https://api.cdm-project.com" + + # Set API device + api_device = "CDM" + + # Try getting pssh via MPD URL if web-dl + if mpd_url is not None: + input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url) + if input_pssh is not None: + print(f'\nPSSH found: {input_pssh}') + else: + input_pssh = input(f"\nPSSH not found! Input PSSH: ") + + # Ask for PSSH if just keys function + if mpd_url is None: + # Ask for PSSH if web-dl not selected: + input_pssh = input(f"\nPSSH: ") + + # Ask for license URL + input_license_url = input(f"\nLicense URL: ") + + # Set headers for API key + api_key_headers = { + "X-Secret-Key": api_key + } + + # Open CDM session + open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers) + + # Get the session ID from the open CDM session + session_id = open_session.json()["data"]["session_id"] + + # Set JSON required to generate a license challenge + generate_challenge_json = { + "session_id": session_id, + "init_data": input_pssh + } + + # Generate the license challenge + generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json) + + # Retrieve the challenge and base64 decode it + challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"]) + + # Send the challenge to the widevine license server + license = requests.post( + url=input_license_url, + headers=license_curl_headers, + data=challenge + ) + + # Retrieve the license message + license = base64.b64encode(license.content).decode() + + # Set JSON required to parse license message + license_message_json = { + "session_id": session_id, + "license_message": license + } + + # Parse the license + requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json) + + # Retrieve the keys + get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL', + json={"session_id": session_id}, + headers=api_key_headers) + + # Iterate through the keys, ignoring signing key + returned_keys = '' + for key in get_keys.json()["data"]["keys"]: + if not key["type"] == "SIGNING": + returned_keys += f"{key['key_id']}:{key['key']}\n" + + # assign variable for mp4decrypt keys + mp4decrypt_keys = [] + for key in get_keys.json()["data"]["keys"]: + if not key["type"] == "SIGNING": + mp4decrypt_keys.append('--key') + mp4decrypt_keys.append(f"{key['key_id']}:{key['key']}") + + # Cache the keys + Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys) + + # Print out keys + print(f'\nKeys:\n{returned_keys}') + + # Close session + requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers) + + # Return mp4decrypt keys + return mp4decrypt_keys diff --git a/Sites/__init__.py b/Sites/__init__.py index c216588..55e5c46 100644 --- a/Sites/__init__.py +++ b/Sites/__init__.py @@ -1,3 +1,5 @@ from . import Crunchyroll from . import Generic -from . import YouTube \ No newline at end of file +from . import YouTube +from . import VDOCipher +from . import Canal \ No newline at end of file diff --git a/license_curl.py b/license_curl.py index e69de29..780d16d 100644 --- a/license_curl.py +++ b/license_curl.py @@ -0,0 +1,15 @@ +headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0', + 'Accept': '*/*', + 'Accept-Language': 'en-US,en;q=0.5', + # 'Accept-Encoding': 'gzip, deflate, br', + 'Content-Type': 'application/octet-stream', + 'DNT': '1', + 'Sec-Fetch-Dest': 'empty', + 'Sec-Fetch-Mode': 'cors', + 'Sec-Fetch-Site': 'same-site', + 'Sec-GPC': '1', + 'Connection': 'keep-alive', + # Requests doesn't support trailers + # 'TE': 'trailers', +} \ No newline at end of file diff --git a/tpd-keys.py b/tpd-keys.py index c6b6cc0..240c13e 100644 --- a/tpd-keys.py +++ b/tpd-keys.py @@ -1,5 +1,4 @@ # Import dependencies -import os import Helpers import Sites import license_curl @@ -30,8 +29,6 @@ parser.add_argument('--web-dl', help="Web download", action='store_true') # Assign the switches a variable switches = parser.parse_args() - -# Based on the selected switch within the mutually exclusive group, perform actions if switches.crunchyroll: # Perform action for --crunchyroll if switches.web_dl: @@ -51,6 +48,7 @@ elif switches.crunchyroll_remote: else: Sites.Crunchyroll.decrypt_crunchyroll_remotely(api_key=api_key, license_curl_headers=license_curl.headers) + elif switches.youtube: # Perform action for --YouTube if switches.web_dl: