Added Linux support
This commit is contained in:
TPD94 2023-12-25 03:18:50 -05:00
parent 3efaadffc5
commit 24b61e279f
10 changed files with 724 additions and 81 deletions

View File

@ -5,4 +5,5 @@ from . import database_check
from . import cache_key from . import cache_key
from . import mpd_parse from . import mpd_parse
from . import download from . import download
from . import binary_check from . import binary_check
from . import os_check

View File

@ -1,7 +1,11 @@
import os import os
import subprocess
import zipfile import zipfile
import shutil import shutil
import requests import requests
import tarfile
import stat
from Helpers import os_check
from tqdm import tqdm from tqdm import tqdm
@ -19,56 +23,113 @@ def create_folders():
# Create / Check binaries function # Create / Check binaries function
def create_binaries(): def create_binaries():
# Check which OS the host is
operating_system = os_check.get_os_specific()
# Set binary dictionaries for Windows / Linux
windows_binaries = ["n_m3u8dl-re.exe", "mp4decrypt.exe", "ffmpeg.exe", "yt-dlp.exe"]
linux_binaries = ["n_m3u8dl-re", "mp4decrypt", "ffmpeg"]
if operating_system == "Windows":
binary_list = windows_binaries
if operating_system == "Linux":
binary_list = linux_binaries
# Check if the required binaries exist, if not, download them. # Check if the required binaries exist, if not, download them.
# Iterate through required binaries # Iterate through required binaries
for binary in ["n_m3u8dl-re.exe", "mp4decrypt.exe", "ffmpeg.exe", "yt-dlp.exe"]: for binary in binary_list:
# Perform checks for each binary # Perform checks for each binary
if not os.path.isfile(f"{os.getcwd()}/binaries/{binary}"): if not os.path.isfile(f"{os.getcwd()}/binaries/{binary}"):
# FFmpeg # FFmpeg
if binary == "ffmpeg.exe": if binary == "ffmpeg.exe" or binary =="ffmpeg":
# Download windows zip file for FFmpeg # Download windows zip file for FFmpeg
ffmpeg_download = requests.get( if operating_system == "Windows":
"https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-win64-gpl.zip", ffmpeg_download = requests.get(
stream=True) "https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-win64-gpl.zip",
stream=True)
if operating_system == "Linux":
ffmpeg_download = requests.get(
"https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-linux64-gpl.tar.xz",
stream=True)
total_size = int(ffmpeg_download.headers.get('content-length', 0)) total_size = int(ffmpeg_download.headers.get('content-length', 0))
with open(f"{os.getcwd()}/download/temp/ffmpeg.zip", 'wb') as download: if operating_system == "Windows":
with tqdm(total=total_size, unit='B', unit_scale=True, with open(f"{os.getcwd()}/download/temp/ffmpeg.zip", 'wb') as download:
desc="Downloading ffmpeg.zip") as progress_bar: with tqdm(total=total_size, unit='B', unit_scale=True,
for data in ffmpeg_download.iter_content(chunk_size=1024): desc="Downloading ffmpeg.zip") as progress_bar:
download.write(data) for data in ffmpeg_download.iter_content(chunk_size=1024):
progress_bar.update(len(data)) download.write(data)
progress_bar.update(len(data))
if operating_system == "Linux":
with open(f"{os.getcwd()}/download/temp/ffmpeg.tar.xz", 'wb') as download:
with tqdm(total=total_size, unit='B', unit_scale=True,
desc="Downloading ffmpeg.tar.xz") as progress_bar:
for data in ffmpeg_download.iter_content(chunk_size=1024):
download.write(data)
progress_bar.update(len(data))
# Unzip FFmpeg # Unzip FFmpeg if Windows
with zipfile.ZipFile(f"{os.getcwd()}/download/temp/ffmpeg.zip", "r") as ffmpeg_zip: if operating_system == "Windows":
file_count = len(ffmpeg_zip.infolist()) with zipfile.ZipFile(f"{os.getcwd()}/download/temp/ffmpeg.zip", "r") as ffmpeg_zip:
with tqdm(total=file_count, unit='file', desc="Extracting ffmpeg.zip") as unzip_progress_bar: file_count = len(ffmpeg_zip.infolist())
for file in ffmpeg_zip.infolist(): with tqdm(total=file_count, unit='file', desc="Extracting ffmpeg.zip") as unzip_progress_bar:
ffmpeg_zip.extract(file, path=f"{os.getcwd()}/download/temp") for file in ffmpeg_zip.infolist():
unzip_progress_bar.update(1) ffmpeg_zip.extract(file, path=f"{os.getcwd()}/download/temp")
unzip_progress_bar.update(1)
# Copy ffmpeg binary to binaries # Untar FFmpeg if Linux
shutil.copy2(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-win64-gpl/bin/ffmpeg.exe", if operating_system == "Linux":
f"{os.getcwd()}/binaries") with tarfile.open(f"{os.getcwd()}/download/temp/ffmpeg.tar.xz", 'r:xz') as ffmpeg_tar_xz:
file_count = len(ffmpeg_tar_xz.getmembers())
with tqdm(total=file_count, unit='file', desc=f"Extracting ffmpeg.tar.xz") as untar_xz_progress_bar:
for file in ffmpeg_tar_xz:
ffmpeg_tar_xz.extract(file, path=f"{os.getcwd()}/download/temp")
untar_xz_progress_bar.update(1)
# Remove the zip # Copy ffmpeg binary to binaries if Windows
os.remove(f"{os.getcwd()}/download/temp/ffmpeg.zip") if operating_system == "Windows":
shutil.copy2(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-win64-gpl/bin/ffmpeg.exe",
f"{os.getcwd()}/binaries")
# Remove the folder # Copy ffmpeg binary to binaries if linux
shutil.rmtree(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-win64-gpl") if operating_system == "Linux":
shutil.copy2(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-linux64-gpl/bin/ffmpeg",
f"{os.getcwd()}/binaries")
os.chmod(f"{os.getcwd()}/binaries/ffmpeg", stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
# Remove the zip if Windows
if operating_system == "Windows":
os.remove(f"{os.getcwd()}/download/temp/ffmpeg.zip")
# Remove the .tar.xz and .tar if linux
if operating_system == "Linux":
os.remove(f"{os.getcwd()}/download/temp/ffmpeg.tar.xz")
# Remove the folder if windows
if operating_system == "Windows":
shutil.rmtree(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-win64-gpl")
# Remove the folder if linux
if operating_system == "Linux":
shutil.rmtree(f"{os.getcwd()}/download/temp/ffmpeg-master-latest-linux64-gpl")
# Print a new line # Print a new line
print() print()
# MP4 Decrypt # MP4 Decrypt
elif binary == "mp4decrypt.exe": elif binary == "mp4decrypt.exe" or binary == "mp4decrypt":
# Download mp4decrypt zip file # Download mp4decrypt zip file
mp4decrypt_download = requests.get( if operating_system == "Windows":
"https://www.bok.net/Bento4/binaries/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32.zip", stream=True) mp4decrypt_download = requests.get(
"https://www.bok.net/Bento4/binaries/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32.zip", stream=True)
if operating_system == "Linux":
mp4decrypt_download = requests.get(
"https://www.bok.net/Bento4/binaries/Bento4-SDK-1-6-0-641.x86_64-unknown-linux.zip",
stream=True)
total_size = int(mp4decrypt_download.headers.get('content-length', 0)) total_size = int(mp4decrypt_download.headers.get('content-length', 0))
with open(f"{os.getcwd()}/download/temp/mp4decrypt.zip", 'wb') as download: with open(f"{os.getcwd()}/download/temp/mp4decrypt.zip", 'wb') as download:
with tqdm(total=total_size, unit='B', unit_scale=True, with tqdm(total=total_size, unit='B', unit_scale=True,
@ -85,77 +146,155 @@ def create_binaries():
mp4decrypt_zip.extract(file, path=f"{os.getcwd()}/download/temp") mp4decrypt_zip.extract(file, path=f"{os.getcwd()}/download/temp")
unzip_progress_bar.update(1) unzip_progress_bar.update(1)
# Copy mp4decrypt binary to binaries # Copy mp4decrypt binary to binaries if windows
shutil.copy2( if operating_system == "Windows":
f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32/bin/mp4decrypt.exe", shutil.copy2(
f"{os.getcwd()}/binaries") f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32/bin/mp4decrypt.exe",
f"{os.getcwd()}/binaries")
# Copy mp4decrypt binary to binaries if Linux
if operating_system == "Linux":
shutil.copy2(
f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-641.x86_64-unknown-linux/bin/mp4decrypt",
f"{os.getcwd()}/binaries")
os.chmod(f"{os.getcwd()}/binaries/mp4decrypt", stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
# Deleting the zip file # Deleting the zip file
os.remove(f"{os.getcwd()}/download/temp/mp4decrypt.zip") os.remove(f"{os.getcwd()}/download/temp/mp4decrypt.zip")
# Deleting the directory # Deleting the directory if Windows
shutil.rmtree(f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32") if operating_system == "Windows":
shutil.rmtree(f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-639.x86_64-microsoft-win32")
# Deleting the directory if Linux
if operating_system == "Linux":
shutil.rmtree(f"{os.getcwd()}/download/temp/Bento4-SDK-1-6-0-641.x86_64-unknown-linux")
# Print a new line # Print a new line
print() print()
# n_m3u8dl-re # n_m3u8dl-re
elif binary == "n_m3u8dl-re.exe": elif binary == "n_m3u8dl-re.exe" or binary == "n_m3u8dl-re":
# Download n_m3u8dl-re zip file # Download n_m3u8dl-re zip file
n_m3u8dl_re_download = requests.get( if operating_system == "Windows":
"https://github.com/nilaoda/N_m3u8DL-RE/releases/download/v0.2.0-beta/N_m3u8DL-RE_Beta_win-x64_20230628.zip", n_m3u8dl_re_download = requests.get(
stream=True) "https://github.com/nilaoda/N_m3u8DL-RE/releases/download/v0.2.0-beta/N_m3u8DL-RE_Beta_win-x64_20230628.zip",
stream=True)
if operating_system == "Linux":
n_m3u8dl_re_download = requests.get(
"https://github.com/nilaoda/N_m3u8DL-RE/releases/download/v0.2.0-beta/N_m3u8DL-RE_Beta_linux-x64_20230628.tar.gz",
stream=True)
total_size = int(n_m3u8dl_re_download.headers.get('content-length', 0)) total_size = int(n_m3u8dl_re_download.headers.get('content-length', 0))
with open(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip", 'wb') as download: if operating_system == "Windows":
with tqdm(total=total_size, unit='B', unit_scale=True, with open(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip", 'wb') as download:
desc="Downloading n_m3u8dl-re.zip") as progress_bar: with tqdm(total=total_size, unit='B', unit_scale=True,
for data in n_m3u8dl_re_download.iter_content(chunk_size=1024): desc="Downloading n_m3u8dl-re.zip") as progress_bar:
download.write(data) for data in n_m3u8dl_re_download.iter_content(chunk_size=1024):
progress_bar.update(len(data)) download.write(data)
progress_bar.update(len(data))
if operating_system == "Linux":
with open(f"{os.getcwd()}/download/temp/n_m3u8dl-re.tar.gz", 'wb') as download:
with tqdm(total=total_size, unit='B', unit_scale=True,
desc="Downloading n_m3u8dl-re.tar.gz") as progress_bar:
for data in n_m3u8dl_re_download.iter_content(chunk_size=1024):
download.write(data)
progress_bar.update(len(data))
# Unzip n_m3u8dl-re # Unzip n_m3u8dl-re if Windows
with zipfile.ZipFile(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip", "r") as nm3u8dl_re_zip: if operating_system == "Windows":
file_count = len(nm3u8dl_re_zip.infolist()) with zipfile.ZipFile(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip", "r") as nm3u8dl_re_zip:
with tqdm(total=file_count, unit='file', desc="Extracting n_m3u8dl-re.zip") as unzip_progress_bar: file_count = len(nm3u8dl_re_zip.infolist())
for file in nm3u8dl_re_zip.infolist(): with tqdm(total=file_count, unit='file', desc="Extracting n_m3u8dl-re.zip") as unzip_progress_bar:
nm3u8dl_re_zip.extract(file, path=f"{os.getcwd()}/download/temp") for file in nm3u8dl_re_zip.infolist():
unzip_progress_bar.update(1) nm3u8dl_re_zip.extract(file, path=f"{os.getcwd()}/download/temp")
unzip_progress_bar.update(1)
# Copy n_m3u8dl-re binary to binaries # Untar n_m3u8dl-re if Linux
shutil.copy2(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_win-x64/N_m3u8DL-RE.exe", if operating_system == "Linux":
f"{os.getcwd()}/binaries") with tarfile.open(f"{os.getcwd()}/download/temp/n_m3u8dl-re.tar.gz", 'r:gz') as n_m3u8dl_re_tar_gz:
file_count = len(n_m3u8dl_re_tar_gz.getmembers())
with tqdm(total=file_count, unit='file',
desc=f"Extracting n_m3u8dl-re.tar.gz") as untar_gz_progress_bar:
for file in n_m3u8dl_re_tar_gz:
n_m3u8dl_re_tar_gz.extract(file, path=f"{os.getcwd()}/download/temp")
untar_gz_progress_bar.update(1)
# Delete zip file # Copy n_m3u8dl-re binary to binaries if Windows
os.remove(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip") if operating_system == "Windows":
shutil.copy2(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_win-x64/N_m3u8DL-RE.exe",
f"{os.getcwd()}/binaries")
# Delete directory # Copy n_m3u8dl-re to binaries if Linux
shutil.rmtree(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_win-x64") if operating_system == "Linux":
shutil.copy2(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_linux-x64/N_m3u8DL-RE",
f"{os.getcwd()}/binaries")
subprocess.run(['chmod', '+x', f"{os.getcwd()}/binaries/N_m3u8DL-RE"])
# Delete zip file if Windows
if operating_system == "Windows":
os.remove(f"{os.getcwd()}/download/temp/n_m3u8dl-re.zip")
# Deleter .tar.gz and .tar file if Linux
if operating_system == "Linux":
os.remove(f"{os.getcwd()}/download/temp/n_m3u8dl-re.tar.gz")
# Delete directory if Windows
if operating_system == "Windows":
shutil.rmtree(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_win-x64")
if operating_system == "Linux":
shutil.rmtree(f"{os.getcwd()}/download/temp/N_m3u8DL-RE_Beta_linux-x64")
# Print a new line # Print a new line
print() print()
# YT-DLP # YT-DLP
elif binary == "yt-dlp.exe": elif binary == "yt-dlp.exe" or binary == "yt-dlp":
# Download yt-dlp exe # Download yt-dlp exe if windows
yt_dlp_download = requests.get( if operating_system == "Windows":
"https://github.com/yt-dlp/yt-dlp/releases/download/2023.11.16/yt-dlp_x86.exe", yt_dlp_download = requests.get(
stream=True) "https://github.com/yt-dlp/yt-dlp/releases/download/2023.11.16/yt-dlp_x86.exe",
stream=True)
if operating_system == "Linux":
yt_dlp_download = requests.get(
"https://github.com/yt-dlp/yt-dlp/releases/download/2023.11.16/yt-dlp_linux",
stream=True)
total_size = int(yt_dlp_download.headers.get('content-length', 0)) total_size = int(yt_dlp_download.headers.get('content-length', 0))
with open(f"{os.getcwd()}/download/yt-dlp.exe", 'wb') as download: if operating_system == "Windows":
with tqdm(total=total_size, unit='B', unit_scale=True, with open(f"{os.getcwd()}/download/yt-dlp.exe", 'wb') as download:
desc="Downloading yt-dlp") as progress_bar: with tqdm(total=total_size, unit='B', unit_scale=True,
for data in yt_dlp_download.iter_content(chunk_size=1024): desc="Downloading yt-dlp") as progress_bar:
download.write(data) for data in yt_dlp_download.iter_content(chunk_size=1024):
progress_bar.update(len(data)) download.write(data)
progress_bar.update(len(data))
if operating_system == "Linux":
with open(f"{os.getcwd()}/download/yt-dlp", 'wb') as download:
with tqdm(total=total_size, unit='B', unit_scale=True,
desc="Downloading yt-dlp") as progress_bar:
for data in yt_dlp_download.iter_content(chunk_size=1024):
download.write(data)
progress_bar.update(len(data))
# Copy yt-dlp binary to binaries # Copy yt-dlp binary to binaries if Windows
shutil.copy2(f"{os.getcwd()}/download/yt-dlp.exe", if operating_system == "Windows":
f"{os.getcwd()}/binaries") shutil.copy2(f"{os.getcwd()}/download/yt-dlp.exe",
f"{os.getcwd()}/binaries")
# Remove binary from download folder # Copy yt-dlp binary to binaries if Linux
os.remove(f"{os.getcwd()}/download/yt-dlp.exe") if operating_system == "Linux":
shutil.copy2(f"{os.getcwd()}/download/yt-dlp",
f"{os.getcwd()}/binaries")
os.chmod(f"{os.getcwd()}/binaries/yt-dlp", stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
# Remove binary from download folder if Windows
if operating_system == "Windows":
os.remove(f"{os.getcwd()}/download/yt-dlp.exe")
# Remove binary from download folder if Linux
if operating_system == "Linux":
os.remove(f"{os.getcwd()}/download/yt-dlp")
# Print a new line # Print a new line
print() print()

View File

@ -1,16 +1,19 @@
import subprocess import subprocess
from os import urandom
import uuid import uuid
import glob import glob
import os import os
import Helpers.binary_check import Helpers.binary_check
import Sites.Generic import Sites.Generic
import license_curl import license_curl
import Helpers.os_check
# Web Download function generic # Web Download function generic
def web_dl_generic(mpd: str = None, device: str = None, api_key: str = None, remote: bool = False): def web_dl_generic(mpd: str = None, device: str = None, api_key: str = None, remote: bool = False):
# Get the current operating system
operating_system = Helpers.os_check.get_os_specific()
# Check for folders # Check for folders
Helpers.binary_check.create_folders() Helpers.binary_check.create_folders()
@ -45,6 +48,10 @@ def web_dl_generic(mpd: str = None, device: str = None, api_key: str = None, rem
'--mux-after-done', '--mux-after-done',
'format=mkv' 'format=mkv'
] + mp4decrypt_keys ] + mp4decrypt_keys
if operating_system == "Linux":
n_m3u8dl_re_download[0] = f'{os.getcwd()}/binaries/N_m3u8DL-RE'
n_m3u8dl_re_download[3] = f'{os.getcwd()}/binaries/ffmpeg'
n_m3u8dl_re_download[5] = f'{os.getcwd()}/binaries/mp4decrypt'
subprocess.run(n_m3u8dl_re_download) subprocess.run(n_m3u8dl_re_download)
@ -58,6 +65,9 @@ def web_dl_generic(mpd: str = None, device: str = None, api_key: str = None, rem
# Web Download crunchyroll function # Web Download crunchyroll function
def web_dl_crunchyroll(mpd: str = None, device: str = None, api_key: str = None, remote: bool = False): def web_dl_crunchyroll(mpd: str = None, device: str = None, api_key: str = None, remote: bool = False):
# Get the current operating system
operating_system = Helpers.os_check.get_os_specific()
# Check for folders # Check for folders
Helpers.binary_check.create_folders() Helpers.binary_check.create_folders()
@ -94,6 +104,10 @@ def web_dl_crunchyroll(mpd: str = None, device: str = None, api_key: str = None,
'--mux-after-done', '--mux-after-done',
'format=mkv' 'format=mkv'
] + mp4decrypt_keys ] + mp4decrypt_keys
if operating_system == "Linux":
n_m3u8dl_re_download[0] = f'{os.getcwd()}/binaries/N_m3u8DL-RE'
n_m3u8dl_re_download[5] = f'{os.getcwd()}/binaries/ffmpeg'
n_m3u8dl_re_download[7] = f'{os.getcwd()}/binaries/mp4decrypt'
subprocess.run(n_m3u8dl_re_download) subprocess.run(n_m3u8dl_re_download)
@ -107,6 +121,9 @@ def web_dl_crunchyroll(mpd: str = None, device: str = None, api_key: str = None,
# YouTube Download function generic # YouTube Download function generic
def youtube_dlp(url: str = None, device: str = None, api_key: str = None, remote: bool = False): def youtube_dlp(url: str = None, device: str = None, api_key: str = None, remote: bool = False):
# Get the current operating system
operating_system = Helpers.os_check.get_os_specific()
# Check for folders # Check for folders
Helpers.binary_check.create_folders() Helpers.binary_check.create_folders()
@ -136,6 +153,8 @@ def youtube_dlp(url: str = None, device: str = None, api_key: str = None, remote
'res:720', 'res:720',
f'{url}' f'{url}'
] ]
if operating_system == "Linux":
yt_dlp_download[0] = f'{os.getcwd()}/binaries/yt-dlp'
# Run yt-dlp # Run yt-dlp
subprocess.run(yt_dlp_download) subprocess.run(yt_dlp_download)
@ -158,6 +177,8 @@ def youtube_dlp(url: str = None, device: str = None, api_key: str = None, remote
f'{file}', f'{file}',
f'{os.getcwd()}/download/{file_name}', f'{os.getcwd()}/download/{file_name}',
] + mp4decrypt_keys ] + mp4decrypt_keys
if operating_system == "Linux":
mp4_decrypt[0] = f'{os.getcwd()}/binaries/mp4decrypt'
# Run mp4decrypt # Run mp4decrypt
subprocess.run(mp4_decrypt) subprocess.run(mp4_decrypt)
@ -181,6 +202,8 @@ def youtube_dlp(url: str = None, device: str = None, api_key: str = None, remote
'copy', 'copy',
f"{os.getcwd()}/download/{final_mux}.mkv", f"{os.getcwd()}/download/{final_mux}.mkv",
] ]
if operating_system == "Linux":
ffmpeg_merge[0] = f"{os.getcwd()}/binaries/ffmpeg"
# Run ffmpeg to merge the files # Run ffmpeg to merge the files
subprocess.run(ffmpeg_merge) subprocess.run(ffmpeg_merge)

10
Helpers/os_check.py Normal file
View File

@ -0,0 +1,10 @@
import os
def get_os_specific():
if os.name == 'nt': # 'nt' stands for Windows
return "Windows"
elif os.name == 'posix': # 'posix' stands for Linux/Unix
return "Linux"
else:
return "Unknown"

View File

@ -18,6 +18,6 @@ How to use:
7. Paste dictionaries from license request curl post request into `License_curl.py` 7. Paste dictionaries from license request curl post request into `License_curl.py`
8. Run with `python tpd-keys.py` 8. Run with `python tpd-keys.py` for just decryption keys or `python tpd-keys.py --web-dl` to get decryption keys plus download the content
To view additional options you can use `python tpd-keys.py -h` To view additional options you can use `python tpd-keys.py -h`

196
Sites/Canal.py Normal file
View File

@ -0,0 +1,196 @@
# Import dependencies
from pywidevine import PSSH
from pywidevine import Cdm
from pywidevine import Device
import requests
import base64
import os
import Helpers
# Defining decrypt function for canal plus
def decrypt_canal_plus(wvd: str = None, license_curl_headers: dict = None, license_curl_json: dict = None, mpd_url: str = None):
# Exit if no device
if wvd is None:
exit(f"No CDM! to use local decryption place a .wvd in {os.getcwd()}/WVDs")
# Try getting pssh via MPD URL if web-dl
if mpd_url is not None:
input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url)
if input_pssh is not None:
print(f'\nPSSH found: {input_pssh}')
else:
input_pssh = input(f"\nPSSH not found! Input PSSH: ")
# Ask for PSSH if just keys function
if mpd_url is None:
# Ask for PSSH if web-dl not selected:
input_pssh = input(f"\nPSSH: ")
# prepare pssh
pssh = PSSH(input_pssh)
# Ask for license URL
license_url = "https://ltv.slc-app-aka.prod.bo.canal.canalplustech.pro/ottlivetv/api/V4/zones/cpfra/locations/FR/devices/11/apps/1/jobs/GetLicence"
# load device
device = Device.load(wvd)
# load CDM from device
cdm = Cdm.from_device(device)
# open CDM session
session_id = cdm.open()
challenge = cdm.get_license_challenge(session_id, pssh)
# Set the challenge in the json data
license_curl_json['ServiceRequest']['InData']['ChallengeInfo'] = base64.b64encode(challenge).decode()
# send license challenge
license = requests.post(
url=license_url,
headers=license_curl_headers,
json=license_curl_json
)
if license.status_code != 200:
print(license.content)
exit("Could not complete license challenge")
# Extract license from json dict
license = license.json()['ServiceResponse']['OutData']['LicenseInfo']
# parse license challenge
cdm.parse_license(session_id, license)
# assign variable for returned keys
returned_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# assign variable for mp4decrypt keys
mp4decrypt_keys = []
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
mp4decrypt_keys.append('--key')
mp4decrypt_keys.append(f'{key.kid.hex}:{key.key.hex()}')
# close session, disposes of session data
cdm.close(session_id)
# Cache the keys
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
# Print out the keys
print(f'\nKeys:\n{returned_keys}')
# Return the keys for future ripper use.
return mp4decrypt_keys
# Defining remote decrypt function for generic services
def decrypt_generic_remotely(api_key: str = None, license_curl_headers: dict = None, mpd_url: str = None):
# Exit if no API key
if api_key is None:
exit(f"No API Key! to use remote decryption place an API key in {os.getcwd()}/Config/api-key.txt")
# Set CDM Project API URL
api_url = "https://api.cdm-project.com"
# Set API device
api_device = "CDM"
# Try getting pssh via MPD URL if web-dl
if mpd_url is not None:
input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url)
if input_pssh is not None:
print(f'\nPSSH found: {input_pssh}')
else:
input_pssh = input(f"\nPSSH not found! Input PSSH: ")
# Ask for PSSH if just keys function
if mpd_url is None:
# Ask for PSSH if web-dl not selected:
input_pssh = input(f"\nPSSH: ")
# Ask for license URL
input_license_url = input(f"\nLicense URL: ")
# Set headers for API key
api_key_headers = {
"X-Secret-Key": api_key
}
# Open CDM session
open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
# Get the session ID from the open CDM session
session_id = open_session.json()["data"]["session_id"]
# Set JSON required to generate a license challenge
generate_challenge_json = {
"session_id": session_id,
"init_data": input_pssh
}
# Generate the license challenge
generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
# Retrieve the challenge and base64 decode it
challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
# Send the challenge to the widevine license server
license = requests.post(
url=input_license_url,
headers=license_curl_headers,
data=challenge
)
# Retrieve the license message
license = base64.b64encode(license.content).decode()
# Set JSON required to parse license message
license_message_json = {
"session_id": session_id,
"license_message": license
}
# Parse the license
requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
# Retrieve the keys
get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
json={"session_id": session_id},
headers=api_key_headers)
# Iterate through the keys, ignoring signing key
returned_keys = ''
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
returned_keys += f"{key['key_id']}:{key['key']}\n"
# assign variable for mp4decrypt keys
mp4decrypt_keys = []
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
mp4decrypt_keys.append('--key')
mp4decrypt_keys.append(f"{key['key_id']}:{key['key']}")
# Cache the keys
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
# Print out keys
print(f'\nKeys:\n{returned_keys}')
# Close session
requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers)
# Return mp4decrypt keys
return mp4decrypt_keys

259
Sites/VDOCipher.py Normal file
View File

@ -0,0 +1,259 @@
# Import dependencies
from pywidevine import PSSH
from pywidevine import Cdm
from pywidevine import Device
import json
import requests
import base64
import os
import Helpers
import re
# Defining decrypt function for generic services
def decrypt_vdocipher(wvd: str = None, url_curl_headers: dict = None, url_curl_cookies: dict = None, video_url: str = None):
# Exit if no device
if wvd is None:
exit(f"No CDM! to use local decryption place a .wvd in {os.getcwd()}/WVDs")
# Ask for URL of web page
if video_url is None:
url = input("URL: ")
# Send a get request to the URL specified
url_get_request = requests.get(url=url, headers=url_curl_headers, cookies=url_curl_cookies)
# Try to find the OTP from the get request
try:
otp_match = re.findall(r"otp: '(.*)',", url_get_request.text)[0]
playbackinfo_match = re.findall(r"playbackInfo: '(.*)',", url_get_request.text)[0]
except IndexError:
try:
otp_match = re.findall(r"otp=(.*)&", url_get_request.text)[0]
playbackinfo_match = re.findall(r"playbackInfo=(.*)", url_get_request.text)[0]
except IndexError:
print("\nAn error occured while getting otp/playback")
exit()
# Get the video id from playbackinfo_match
video_id = json.loads(base64.b64decode(playbackinfo_match).decode())["videoId"]
# Send a get request to acquire the license URL
license_get_request = requests.get(url=f'https://dev.vdocipher.com/api/meta/{video_id}', headers=url_curl_headers)
# Try to extract the license URL from the license get request
try:
license_url_match = license_get_request.json()["dash"]["licenseServers"]["com.widevine.alpha"].rsplit(":", 1)[0]
mpd = license_get_request.json()["dash"]["manifest"]
except KeyError:
print("\n An error occured while getting mpd/license url")
# Send a get request to acquire the MPD
mpd_get_request = requests.get(url=mpd, headers=url_curl_headers, cookies=url_curl_cookies)
# Regular expression search the mpd get request for PSSH
input_pssh = re.search(r"<cenc:pssh>(.*)</cenc:pssh>", mpd_get_request.text).group(1)
# prepare pssh
pssh = PSSH(input_pssh)
# load device
device = Device.load(wvd)
# load CDM from device
cdm = Cdm.from_device(device)
# open CDM session
session_id = cdm.open()
# Set service cert token
service_cert_token = {
"otp": otp_match,
"playbackInfo": playbackinfo_match,
"href": url,
"tech": "wv",
"licenseRequest": f"{base64.b64encode(cdm.service_certificate_challenge).decode()}"
}
# Convert service cert token to JSON
service_cert_json_data = {
'token': f'{base64.b64encode(json.dumps(service_cert_token).encode("utf-8")).decode()}',
}
# get service certificate
service_cert = requests.post(
url=license_url_match,
json=service_cert_json_data,
headers=url_curl_headers
)
if service_cert.status_code != 200:
print("Couldn't retrieve service cert")
else:
service_cert = service_cert.json()["license"]
cdm.set_service_certificate(session_id, service_cert)
# Generate license challenge
if service_cert:
challenge = cdm.get_license_challenge(session_id, pssh, privacy_mode=True)
else:
challenge = cdm.get_license_challenge(session_id, pssh)
# Declare token dictionary for license challenge
token = {
"otp": otp_match,
"playbackInfo": playbackinfo_match,
"href": url,
"tech": "wv",
"licenseRequest": f"{base64.b64encode(challenge).decode()}"
}
# Convert token dictionary into JSON data
json_data = {
'token': f'{base64.b64encode(json.dumps(token).encode("utf-8")).decode()}',
}
# send license challenge
license = requests.post(
url=license_url_match,
headers=url_curl_headers,
cookies=url_curl_cookies,
json=json_data
)
if license.status_code != 200:
print(license.content)
exit("Could not complete license challenge")
# Extract license from json dict
license = license.json()["license"]
# parse license challenge
cdm.parse_license(session_id, license)
# assign variable for returned keys
returned_keys = ""
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
returned_keys += f"{key.kid.hex}:{key.key.hex()}\n"
# assign variable for mp4decrypt keys
mp4decrypt_keys = []
for key in cdm.get_keys(session_id):
if key.type != "SIGNING":
mp4decrypt_keys.append('--key')
mp4decrypt_keys.append(f'{key.kid.hex}:{key.key.hex()}')
# close session, disposes of session data
cdm.close(session_id)
# Cache the keys
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
# Print out the keys
print(f'\nKeys:\n{returned_keys}')
# Return the keys for future ripper use.
return mp4decrypt_keys, mpd
# Defining remote decrypt function for generic services
def decrypt_generic_remotely(api_key: str = None, license_curl_headers: dict = None, mpd_url: str = None):
# Exit if no API key
if api_key is None:
exit(f"No API Key! to use remote decryption place an API key in {os.getcwd()}/Config/api-key.txt")
# Set CDM Project API URL
api_url = "https://api.cdm-project.com"
# Set API device
api_device = "CDM"
# Try getting pssh via MPD URL if web-dl
if mpd_url is not None:
input_pssh = Helpers.mpd_parse.parse_pssh(mpd_url)
if input_pssh is not None:
print(f'\nPSSH found: {input_pssh}')
else:
input_pssh = input(f"\nPSSH not found! Input PSSH: ")
# Ask for PSSH if just keys function
if mpd_url is None:
# Ask for PSSH if web-dl not selected:
input_pssh = input(f"\nPSSH: ")
# Ask for license URL
input_license_url = input(f"\nLicense URL: ")
# Set headers for API key
api_key_headers = {
"X-Secret-Key": api_key
}
# Open CDM session
open_session = requests.get(url=f'{api_url}/{api_device}/open', headers=api_key_headers)
# Get the session ID from the open CDM session
session_id = open_session.json()["data"]["session_id"]
# Set JSON required to generate a license challenge
generate_challenge_json = {
"session_id": session_id,
"init_data": input_pssh
}
# Generate the license challenge
generate_challenge = requests.post(url=f'{api_url}/{api_device}/get_license_challenge/AUTOMATIC', headers=api_key_headers, json=generate_challenge_json)
# Retrieve the challenge and base64 decode it
challenge = base64.b64decode(generate_challenge.json()["data"]["challenge_b64"])
# Send the challenge to the widevine license server
license = requests.post(
url=input_license_url,
headers=license_curl_headers,
data=challenge
)
# Retrieve the license message
license = base64.b64encode(license.content).decode()
# Set JSON required to parse license message
license_message_json = {
"session_id": session_id,
"license_message": license
}
# Parse the license
requests.post(url=f'{api_url}/{api_device}/parse_license', headers=api_key_headers, json=license_message_json)
# Retrieve the keys
get_keys = requests.post(url=f'{api_url}/{api_device}/get_keys/ALL',
json={"session_id": session_id},
headers=api_key_headers)
# Iterate through the keys, ignoring signing key
returned_keys = ''
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
returned_keys += f"{key['key_id']}:{key['key']}\n"
# assign variable for mp4decrypt keys
mp4decrypt_keys = []
for key in get_keys.json()["data"]["keys"]:
if not key["type"] == "SIGNING":
mp4decrypt_keys.append('--key')
mp4decrypt_keys.append(f"{key['key_id']}:{key['key']}")
# Cache the keys
Helpers.cache_key.cache_keys(pssh=input_pssh, keys=returned_keys)
# Print out keys
print(f'\nKeys:\n{returned_keys}')
# Close session
requests.get(url=f'{api_url}/{api_device}/close/{session_id}', headers=api_key_headers)
# Return mp4decrypt keys
return mp4decrypt_keys

View File

@ -1,3 +1,5 @@
from . import Crunchyroll from . import Crunchyroll
from . import Generic from . import Generic
from . import YouTube from . import YouTube
from . import VDOCipher
from . import Canal

View File

@ -0,0 +1,15 @@
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0',
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.5',
# 'Accept-Encoding': 'gzip, deflate, br',
'Content-Type': 'application/octet-stream',
'DNT': '1',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'Sec-GPC': '1',
'Connection': 'keep-alive',
# Requests doesn't support trailers
# 'TE': 'trailers',
}

View File

@ -1,5 +1,4 @@
# Import dependencies # Import dependencies
import os
import Helpers import Helpers
import Sites import Sites
import license_curl import license_curl
@ -30,8 +29,6 @@ parser.add_argument('--web-dl', help="Web download", action='store_true')
# Assign the switches a variable # Assign the switches a variable
switches = parser.parse_args() switches = parser.parse_args()
# Based on the selected switch within the mutually exclusive group, perform actions
if switches.crunchyroll: if switches.crunchyroll:
# Perform action for --crunchyroll # Perform action for --crunchyroll
if switches.web_dl: if switches.web_dl:
@ -51,6 +48,7 @@ elif switches.crunchyroll_remote:
else: else:
Sites.Crunchyroll.decrypt_crunchyroll_remotely(api_key=api_key, license_curl_headers=license_curl.headers) Sites.Crunchyroll.decrypt_crunchyroll_remotely(api_key=api_key, license_curl_headers=license_curl.headers)
elif switches.youtube: elif switches.youtube:
# Perform action for --YouTube # Perform action for --YouTube
if switches.web_dl: if switches.web_dl: