Compare commits

..

No commits in common. "main" and "v2.1.3" have entirely different histories.
main ... v2.1.3

20 changed files with 496 additions and 1023 deletions

View File

@ -4,57 +4,6 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [2.2.1] - 2025-03-01
### Added
- Added private key function.
### Fixed
- Error extracting functions (symbols) for old libraries.
## [2.2.0] - 2025-01-19
### Added
- Added support for dynamic interception without the need for Ghidra (available only for Frida server versions greater than 16.6.0).
- Support for Android 16 developer version `Backlava` (SDK 36).
### Changed
- Added additional comments to help understand the script.
- Optimized file path management in parameters.
- Refactored the code globally.
- Added glossary documentation for DRM/Widevine.
- Restructured the documentation.
### Fixed
- Fixed inconsistency in logging messages for certain functions.
- Fixed server-generated curl command issues.
## [2.1.5] - 2025-01-12
### Added
- Added private key function.
### Changed
- Searching for the library via pattern rather than by name.
## [2.1.4] - 2024-11-19
### Changed
- Library disabler error messages are now displayed in `DEBUG` mode for improved verbosity.
### Fixed
- Fixed errors in ADB shell messages.
- Resolved issues with executing shell commands via `subprocess`.
## [2.1.3] - 2024-11-03 ## [2.1.3] - 2024-11-03
### Added ### Added
@ -446,10 +395,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Initial release of the project, laying the foundation for future enhancements and features. - Initial release of the project, laying the foundation for future enhancements and features.
[2.2.1]: https://github.com/hyugogirubato/KeyDive/releases/tag/v2.2.1
[2.2.0]: https://github.com/hyugogirubato/KeyDive/releases/tag/v2.2.0
[2.1.5]: https://github.com/hyugogirubato/KeyDive/releases/tag/v2.1.5
[2.1.4]: https://github.com/hyugogirubato/KeyDive/releases/tag/v2.1.4
[2.1.3]: https://github.com/hyugogirubato/KeyDive/releases/tag/v2.1.3 [2.1.3]: https://github.com/hyugogirubato/KeyDive/releases/tag/v2.1.3
[2.1.2]: https://github.com/hyugogirubato/KeyDive/releases/tag/v2.1.2 [2.1.2]: https://github.com/hyugogirubato/KeyDive/releases/tag/v2.1.2
[2.1.1]: https://github.com/hyugogirubato/KeyDive/releases/tag/v2.1.1 [2.1.1]: https://github.com/hyugogirubato/KeyDive/releases/tag/v2.1.1

View File

@ -3,7 +3,7 @@
KeyDive is a sophisticated Python script designed for precise extraction of Widevine L3 DRM (Digital Rights Management) keys from Android devices. This tool leverages the capabilities of the Widevine CDM (Content Decryption Module) to facilitate the recovery of DRM keys, enabling a deeper understanding and analysis of the Widevine L3 DRM implementation across various Android SDK versions. KeyDive is a sophisticated Python script designed for precise extraction of Widevine L3 DRM (Digital Rights Management) keys from Android devices. This tool leverages the capabilities of the Widevine CDM (Content Decryption Module) to facilitate the recovery of DRM keys, enabling a deeper understanding and analysis of the Widevine L3 DRM implementation across various Android SDK versions.
> [!IMPORTANT] > [!IMPORTANT]
> A minimum version of `frida-server 16.6.0` is required for dynamic dumps on OEM API 18+ (SDK > 33). Otherwise, extracted functions from Ghidra are required. > Support for OEM API 18+ (SDK > 33) requires the use of functions extracted from Ghidra.
## Features ## Features
@ -28,27 +28,19 @@ Before you begin, ensure you have the following prerequisites in place:
Follow these steps to set up KeyDive: Follow these steps to set up KeyDive:
1. Ensure all prerequisites are met (see above). 1. Ensure all prerequisites are met (see above).
2. Install KeyDive directly from PyPI: 2. Install KeyDive from PyPI using Poetry:
```shell ```shell
pip install keydive pip install keydive
``` ```
## Usage ## Usage
KeyDive enables secure extraction of Widevine L3 keys in a straightforward sequence: 1. Play a DRM-protected video on the target device.
2. Launch the KeyDive script.
1. Run the KeyDive script: 3. Reload the DRM-protected video on your device.
```bash 4. The script will automatically extract the Widevine L3 keys, saving them as follows:
keydive -kwp - `client_id.bin` - This file contains device identification information.
``` - `private_key.pem` - This file contains the RSA private key for decryption.
2. The script will install and launch the [Kaltura](https://github.com/kaltura/kaltura-device-info-android) DRM test app (if not already installed).
3. Follow these steps within the app:
- **Provision Widevine** (if the device isn't provisioned).
- **Refresh** to intercept the keybox or private key.
- **Test DRM Playback** to extract the challenge.
4. KeyDive automatically captures the Widevine keys, saving them as:
- `client_id.bin` (device identification data).
- `private_key.pem` (RSA private key).
This sequence ensures that the DRM-protected content is active and ready for key extraction by the time the KeyDive script is initiated, optimizing the extraction process. This sequence ensures that the DRM-protected content is active and ready for key extraction by the time the KeyDive script is initiated, optimizing the extraction process.
@ -92,25 +84,25 @@ Advanced:
### Extracting Functions ### Extracting Functions
For advanced users looking to use custom functions with KeyDive, a comprehensive guide on extracting functions from Widevine libraries using Ghidra is available. Please refer to our [Functions Extraction Guide](https://github.com/hyugogirubato/KeyDive/blob/main/docs/advanced/FUNCTIONS.md) for detailed instructions. For advanced users looking to use custom functions with KeyDive, a comprehensive guide on extracting functions from Widevine libraries using Ghidra is available. Please refer to our [Functions Extraction Guide](https://github.com/hyugogirubato/KeyDive/blob/main/docs/FUNCTIONS.md) for detailed instructions.
### Offline Extraction ### Offline Extraction
KeyDive supports offline extraction mode for situations without internet access. This mode allows you to extract DRM keys directly from your Android device. Ensure all necessary dependencies are installed and follow the detailed [Offline Mode Guide](https://github.com/hyugogirubato/KeyDive/blob/main/docs/advanced/OFFLINE.md) for step-by-step instructions. KeyDive supports offline extraction mode for situations without internet access. This mode allows you to extract DRM keys directly from your Android device. Ensure all necessary dependencies are installed and follow the detailed [Offline Mode Guide](https://github.com/hyugogirubato/KeyDive/blob/main/docs/OFFLINE.md) for step-by-step instructions.
### Obtaining Unencrypted Challenge Data ### Obtaining Unencrypted Challenge Data
> [!NOTE] > [!NOTE]
> Usage of unencrypted challenge is not required by default. It is only necessary when the script cannot extract the client id. > Usage of unencrypted challenge is not required by default. It is only necessary when the script cannot extract the client id.
To extract the unencrypted challenge data required for KeyDive's advanced features, follow the steps outlined in our [Challenge Extraction Guide](https://github.com/hyugogirubato/KeyDive/blob/main/docs/advanced/CHALLENGE.md). This data is crucial for analyzing DRM-protected content and enhancing your DRM key extraction capabilities. To extract the unencrypted challenge data required for KeyDive's advanced features, follow the steps outlined in our [Challenge Extraction Guide](https://github.com/hyugogirubato/KeyDive/blob/main/docs/CHALLENGE.md). This data is crucial for analyzing DRM-protected content and enhancing your DRM key extraction capabilities.
### Temporary Disabling L1 for L3 Extraction ### Temporary Disabling L1 for L3 Extraction
> [!WARNING] > [!WARNING]
> Usage of the module is now deprecated because the deactivation of the library was natively added. > Usage of the module is now deprecated because the deactivation of the library was natively added.
Some manufacturers (e.g., Xiaomi) allow the use of L1 keyboxes even after unlocking the bootloader. In such cases, it's necessary to install a Magisk module called [liboemcrypto-disabler](https://github.com/hyugogirubato/KeyDive/blob/main/docs/PACKAGE.md#liboemcrypto-disabler)to temporarily disable L1, thereby facilitating L3 key extraction. Some manufacturers (e.g., Xiaomi) allow the use of L1 keyboxes even after unlocking the bootloader. In such cases, it's necessary to install a Magisk module called [liboemcrypto-disabler](https://github.com/hyugogirubato/KeyDive/blob/main/docs/PACKAGE.md#liboemcrypto-disabler) to temporarily disable L1, thereby facilitating L3 key extraction.
## Disclaimer ## Disclaimer
@ -125,11 +117,12 @@ KeyDive is intended for educational and research purposes only. The use of this
<a href="https://github.com/Nineteen93"><img src="https://images.weserv.nl/?url=avatars.githubusercontent.com/u/107993263?v=4&h=25&w=25&fit=cover&mask=circle&maxage=7d" alt="Nineteen93"/></a> <a href="https://github.com/Nineteen93"><img src="https://images.weserv.nl/?url=avatars.githubusercontent.com/u/107993263?v=4&h=25&w=25&fit=cover&mask=circle&maxage=7d" alt="Nineteen93"/></a>
<a href="https://github.com/sn-o-w"><img src="https://images.weserv.nl/?url=avatars.githubusercontent.com/u/2406819?v=4&h=25&w=25&fit=cover&mask=circle&maxage=7d" alt="sn-o-w"/></a> <a href="https://github.com/sn-o-w"><img src="https://images.weserv.nl/?url=avatars.githubusercontent.com/u/2406819?v=4&h=25&w=25&fit=cover&mask=circle&maxage=7d" alt="sn-o-w"/></a>
## Licensing ## Licensing
This software is licensed under the terms of [MIT License](https://github.com/hyugogirubato/KeyDive/blob/main/LICENSE). This software is licensed under the terms of [MIT License](https://github.com/hyugogirubato/KeyDive/blob/main/LICENSE).
You can find a copy of the license in the LICENSE file in the root folder. You can find a copy of the license in the LICENSE file in the root folder.
--- * * *
© hyugogirubato 2025 © hyugogirubato 2024

View File

@ -8,7 +8,7 @@ This document provides an overview of the external libraries, tools, and applica
A tool designed to root Android Virtual Devices (AVDs). It enables users to gain superuser privileges on their AVDs, essential for accessing and modifying system-level files and settings that are otherwise restricted. A tool designed to root Android Virtual Devices (AVDs). It enables users to gain superuser privileges on their AVDs, essential for accessing and modifying system-level files and settings that are otherwise restricted.
### [DRM Info](https://apkcombo.app/drm-info/com.androidfung.drminfo) ### [DRM Info](https://apkcombo.com/drm-info/com.androidfung.drminfo/download/phone-1.1.9.220313-apk)
An Android application providing detailed information about the device's Digital Rights Management (DRM) modules, including Widevine. Useful for verifying the DRM support level (L1, L2, L3) on the target device. An Android application providing detailed information about the device's Digital Rights Management (DRM) modules, including Widevine. Useful for verifying the DRM support level (L1, L2, L3) on the target device.

View File

@ -1,272 +0,0 @@
drm_serial_number: "\341we\354N\267\362\353`\024;\002\277\025\312\267"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 6172
soc: "HiSilicon 3798M"
manufacturer: "Coolech"
model: "K-Q"
device_type: "tv"
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: ""w\376\333\250|\273\003p\343\350\3533\334\013K"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 7999
soc: "HiSilicon 3798MV100"
manufacturer: "SmarDTV"
model: "DIP4090"
device_type: "stb"
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "\010\017P\315\237\r\315n\371\313\003\276\023R\247Z"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 9173
soc: "HiSilicon 3798MV100"
manufacturer: "SmarDTV"
model: "DIP4090"
device_type: "stb"
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "F7\271J^\335EW\201\336\031\362\322\325\307\205"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 6053
soc: "MTK8639"
manufacturer: "MeeBoss"
model: "M100"
device_type: "tv"
model_year: 2015
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "\301f\217\310\273\243~\272\250$\327\264\205N\200["
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 6054
soc: "MTK8639"
manufacturer: "MeeBoss"
model: "M150"
device_type: "tv"
model_year: 2015
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "9\320n=\330\203=\237\326\005\335\252M\317\271\364"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 5860
soc: "Intel Z2520"
manufacturer: "Micromax"
model: "P666"
device_type: "phone"
model_year: 2014
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "\223n5V\033\3040\266\025\212\223\025)\360\231\377"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 6765
soc: "bcm"
manufacturer: "Roku"
model: "3600"
device_type: "video dongle"
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "\222\322h*\003xo\310\240\367\356\034\021\3034\273"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 6950
soc: "BCM"
manufacturer: "Roku"
model: "4200"
device_type: "video dongle"
model_year: 2015
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "\007\351\241\014\017k\205^\0301\227l\352`f\353"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 8083
soc: "BCM7218"
manufacturer: "Roku"
model: "2700"
device_type: "video dongle"
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "\300\031\010\314\303\347R\316m\W\272\307,\307\350"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 8252
soc: "BCM"
manufacturer: "Roku"
model: "4200"
device_type: "video dongle"
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "\236\360\025d\302\344O\026W0\001BzIV\331"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 8253
soc: "BCM"
manufacturer: "Roku"
model: "3500"
device_type: "video dongle"
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "\315Q\220l\253\352\026\177\262\374\314\365nu\361D"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 8254
soc: "BCM"
manufacturer: "Roku"
model: "2xxx"
device_type: "video dongle"
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "!cj<WP\344_\274\334\252\304\3474\003\215"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 8353
soc: "bcm"
manufacturer: "Roku"
model: "x2400"
device_type: "video dongle"
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "\345\364\353#\335q\342u\301\005\301t\274\374\035\372"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 8354
soc: "bcm"
manufacturer: "Roku"
model: "3400"
device_type: "video dongle"
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "\224\374s\220\304`U.A\231B\254TJ\306\335"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 6966
soc: "MSD6485"
manufacturer: "UMC"
model: "BLA-43-134M-XX"
device_type: "tv"
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "\026\337Z]\347f\274\253\371\2325\215\2212(("
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 6975
soc: "MSD6486"
manufacturer: "UMC"
model: "SHARP-6486-S6-XX"
device_type: "tv"
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "\2620\361Q\327\232\211a\240\001\332\313R\244sp"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 7321
soc: "MSD6486"
manufacturer: "UMC"
model: "ETE-6486-S6-XX"
device_type: "tv"
security_level: LEVEL_2
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED
drm_serial_number: "\226\300\340\001\365\000B\323\205\177\317\006&\341\334`"
deprecated_status: DEPRECATED_VALID
device_info {
system_id: 1070321
soc: "Qualcomm SM4350"
manufacturer: "HMD Global"
model: "TTG"
device_type: "phone"
model_year: 2021
security_level: LEVEL_1
provisioning_method: FACTORY_KEYBOX
}
status: STATUS_RELEASED

View File

@ -1,75 +0,0 @@
# Widevine and DRM Glossary
This document serves as a resource for understanding Widevine Digital Rights Management (DRM), its ecosystem, and associated terminology. The links and descriptions provided aim to help readers navigate the technical and procedural aspects of Widevine and DRM systems more effectively.
---
## **Widevine Digital Rights Management (DRM)**
Widevine is a leading DRM technology developed by Google, designed to protect video content and ensure secure delivery across a wide range of devices. It supports various security levels, enabling seamless integration with content providers while safeguarding intellectual property.
---
### **Core Concepts and Terms**
#### **Certified Widevine Implementation Partner (CWIP)**
The [CWIP program](https://support.google.com/widevine/answer/2938263?hl=en) ensures that individuals and organizations are equipped to install, configure, and troubleshoot Widevine DRM systems effectively. Key objectives of this program include:
- Teaching candidates to implement Widevine systems with precision.
- Enhancing satisfaction for integrators and end-users.
- Ensuring trust among content owners.
#### **Widevine Device Certificate Status List (DCSL)**
The [Widevine DCSL](https://developers.google.com/widevine/drm/overview) provides a detailed list of certified devices, including:
- **System ID**: A unique identifier for devices using Widevine.
- **Security Level**: Defines the degree of hardware-based protection (e.g., L1, L2, or L3).
- **Provisioning Method**: How keys and certificates are deployed (e.g., Factory Keybox).
- **Device Type**: Identifies whether a device is a phone, set-top box, TV, etc.
---
### **Key Technical Terms**
#### **Security Levels (L1, L2, L3)**
Widevine security levels determine the degree of protection applied to content playback:
- **L1**: Uses Trusted Execution Environment (TEE) for all decryption and processing.
- **L2**: Partially relies on the TEE but may use additional secure layers.
- **L3**: Relies entirely on software-based protection, typically for devices without TEE.
#### **Provisioning Methods**
The way keys and certificates are securely delivered to devices:
- **Factory Keybox**: Embedded during manufacturing to ensure hardware-level security.
- **Device Provisioning**: Post-manufacturing certificate injection.
#### **Content Encryption**
Widevine utilizes standardized encryption methods, typically AES (Advanced Encryption Standard), to secure video streams.
#### **DRM Key Types**
- **Content Key**: Used to decrypt protected content.
- **License Key**: Issued by the DRM license server to authorize content playback.
---
### **Security Vulnerabilities and Updates**
#### **CVE-2024-36971**
[Learn More](https://thehackernews.com/2024/08/google-patches-new-android-kernel.html)
A critical vulnerability in the Android kernel exploited in the wild. Google released a patch to address this issue, highlighting:
- The importance of maintaining up-to-date systems.
- Potential risks of targeted attacks using DRM components.
#### **Patching Processes**
Widevine relies on regular updates to mitigate vulnerabilities. This includes:
- Firmware updates for device security components.
- Collaboration with OEMs to ensure ecosystem-wide fixes.
---
### **Further Resources**
- **Widevine Overview**: [Google Developers Documentation](https://developers.google.com/widevine/drm/overview)
- **Understanding DRM**: [Wikipedia - Digital Rights Management](https://en.wikipedia.org/wiki/Digital_rights_management)
- **Android Security Bulletins**: [Google Security Updates](https://source.android.com/security/bulletin)
---
### **Conclusion**
This glossary aims to centralize essential Widevine and DRM-related terms, helping researchers, integrators, and developers understand the ecosystem. For those working with Widevine or exploring DRM technologies, these resources are foundational to a secure and efficient implementation.

View File

Before

Width:  |  Height:  |  Size: 102 KiB

After

Width:  |  Height:  |  Size: 102 KiB

View File

@ -3,23 +3,22 @@ import json
import time import time
import logging import logging
from pathlib import Path
import requests import requests
from pathlib import Path
from flask import Flask, Response, request, redirect from flask import Flask, Response, request, redirect
from keydive.__main__ import configure_logging from keydive.__main__ import configure_logging
# Suppress urllib3 warnings # Suppress urllib3 warnings
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR) logging.getLogger('urllib3.connectionpool').setLevel(logging.ERROR)
# Initialize Flask application # Initialize Flask application
app = Flask(__name__) app = Flask(__name__)
# Define paths, constants, and global flags # Define paths, constants, and global flags
PARENT = Path(__file__).parent PARENT = Path(__file__).parent
VERSION = "1.0.1" VERSION = '1.0.0'
KEYBOX = False KEYBOX = False
DELAY = 10 DELAY = 10
@ -32,7 +31,7 @@ def health_check() -> Response:
Returns: Returns:
Response: A simple "pong" message with a 200 OK status. Response: A simple "pong" message with a 200 OK status.
""" """
return Response(response="pong", status=200, content_type="text/html; charset=utf-8") return Response(response='pong', status=200, content_type='text/html; charset=utf-8')
@app.route('/shaka-demo-assets/angel-one-widevine/<path:file>', methods=['GET']) @app.route('/shaka-demo-assets/angel-one-widevine/<path:file>', methods=['GET'])
@ -41,17 +40,17 @@ def shaka_demo_assets(file) -> Response:
Serves cached assets for Widevine demo content. If the requested file is Serves cached assets for Widevine demo content. If the requested file is
not available locally, it fetches it from a remote server and caches it. not available locally, it fetches it from a remote server and caches it.
Parameters: Args:
file (str): File path requested by the client. file (str): File path requested by the client.
Returns: Returns:
Response: File content as a byte stream, or a 404 error if not found. Response: File content as a byte stream, or a 404 error if not found.
""" """
logger = logging.getLogger("Shaka") logger = logging.getLogger('Shaka')
logger.info("%s %s", request.method, request.path) logger.info('%s %s', request.method, request.path)
try: try:
path = PARENT / ".assets" / file path = PARENT / '.assets' / file
path.parent.mkdir(parents=True, exist_ok=True) path.parent.mkdir(parents=True, exist_ok=True)
if path.is_file(): if path.is_file():
@ -60,20 +59,20 @@ def shaka_demo_assets(file) -> Response:
else: else:
# Fetch the file from remote storage if not cached locally # Fetch the file from remote storage if not cached locally
r = requests.get( r = requests.get(
url=f"https://storage.googleapis.com/shaka-demo-assets/angel-one-widevine/{file}", url=f'https://storage.googleapis.com/shaka-demo-assets/angel-one-widevine/{file}',
headers={ headers={
"Accept": "*/*", 'Accept': '*/*',
"User-Agent": "KalturaDeviceInfo/1.4.1 (Linux;Android 10) ExoPlayerLib/2.9.3" 'User-Agent': 'KalturaDeviceInfo/1.4.1 (Linux;Android 10) ExoPlayerLib/2.9.3'
} }
) )
r.raise_for_status() r.raise_for_status()
path.write_bytes(r.content) # Cache the downloaded content path.write_bytes(r.content) # Cache the downloaded content
content = r.content content = r.content
logger.debug("Downloaded assets: %s", path) logger.debug('Downloaded assets: %s', path)
return Response(response=content, status=200, content_type="application/octet-stream") return Response(response=content, status=200, content_type='application/octet-stream')
except Exception as e: except Exception as e:
return Response(response=str(e), status=404, content_type="text/html; charset=utf-8") return Response(response=str(e), status=404, content_type='text/html; charset=utf-8')
@app.route('/certificateprovisioning/v1/devicecertificates/create', methods=['POST']) @app.route('/certificateprovisioning/v1/devicecertificates/create', methods=['POST'])
@ -87,37 +86,37 @@ def certificate_provisioning() -> Response:
Response: JSON response if provisioning is complete, else a redirection. Response: JSON response if provisioning is complete, else a redirection.
""" """
global KEYBOX, DELAY global KEYBOX, DELAY
logger = logging.getLogger("Google") logger = logging.getLogger('Google')
logger.info("%s %s", request.method, request.path) logger.info('%s %s', request.method, request.path)
if KEYBOX: if KEYBOX:
logger.warning("Provisioning request aborted to prevent keybox spam") logger.warning('Provisioning request aborted to prevent keybox spam')
return Response(response="Internal Server Error", status=500, content_type="text/html; charset=utf-8") return Response(response='Internal Server Error', status=500, content_type='text/html; charset=utf-8')
# Generate a curl command from the incoming request for debugging or testing # Generate a curl command from the incoming request for debugging or testing
user_agent = request.headers.get("User-Agent", "Unknown") user_agent = request.headers.get('User-Agent', 'Unknown')
url = request.url.replace("http://", "https://") url = request.url.replace('http://', 'https://')
prompt = [ prompt = [
'curl', "curl --request POST",
'--request', 'POST', f"--url '{url}'",
'--compressed', "--compressed",
'--header', '"Accept-Encoding: gzip"', "--header 'accept-encoding: gzip'",
'--header', '"Connection: Keep-Alive"', "--header 'connection: Keep-Alive'",
'--header', '"Content-Type: application/x-www-form-urlencoded"', "--header 'content-type: application/x-www-form-urlencoded'",
'--header', '"Host: www.googleapis.com"', "--header 'host: www.googleapis.com'",
'--header', f'"User-Agent: {user_agent}"' f"--header 'user-agent: {user_agent}'"
] ]
# Save the curl command for potential replay or inspection # Save the curl command for potential replay or inspection
curl = PARENT / "curl.txt" curl = PARENT / 'curl.txt'
curl.write_text(" \\\n ".join(prompt)) curl.write_text(' \\\n '.join(prompt))
logger.debug("Saved curl command to: %s", curl) logger.debug('Saved curl command to: %s', curl)
# Wait for provisioning response data with retries # Wait for provisioning response data with retries
logger.warning("Waiting for provisioning response...") logger.warning('Waiting for provisioning response...')
provision = PARENT / "provisioning.json" provision = PARENT / 'provisioning.json'
provision.unlink(missing_ok=True) provision.unlink(missing_ok=True)
provision.write_bytes(b"") # Create empty file for manual input if needed provision.write_bytes(b'') # Create empty file for manual input if needed
# Poll for the presence of a response up to DELAY times with 1-second intervals # Poll for the presence of a response up to DELAY times with 1-second intervals
for _ in range(DELAY): for _ in range(DELAY):
@ -127,13 +126,13 @@ def certificate_provisioning() -> Response:
# Cleanup after successful response # Cleanup after successful response
curl.unlink(missing_ok=True) curl.unlink(missing_ok=True)
provision.unlink(missing_ok=True) provision.unlink(missing_ok=True)
return Response(response=content, status=200, content_type="application/json") return Response(response=content, status=200, content_type='application/json')
except Exception as e: except Exception as e:
pass # Continue waiting if file is empty or not yet ready pass # Continue waiting if file is empty or not yet ready
time.sleep(1) time.sleep(1)
# Redirect to the secure URL if response is not available # Redirect to the secure URL if response is not available
logger.warning("Redirecting to avoid timeout") logger.warning('Redirecting to avoid timeout')
return redirect(url, code=302) return redirect(url, code=302)
@ -143,32 +142,31 @@ def main() -> None:
to set global parameters and configures logging, then starts the Flask server. to set global parameters and configures logging, then starts the Flask server.
""" """
global VERSION, DELAY, KEYBOX global VERSION, DELAY, KEYBOX
parser = argparse.ArgumentParser(description="Local DRM provisioning video player.") parser = argparse.ArgumentParser(description='Local DRM provisioning video player.')
# Global arguments for the application # Global arguments for the application
group_global = parser.add_argument_group("Global") group_global = parser.add_argument_group('Global')
group_global.add_argument('--host', required=False, type=str, default="127.0.0.1", metavar="<host>", help="Host address for the server to bind to.") group_global.add_argument('--host', required=False, type=str, default='127.0.0.1', metavar='<host>', help='Host address for the server to bind to.')
group_global.add_argument('--port', required=False, type=int, default=9090, metavar="<port>", help="Port number for the server to listen on.") group_global.add_argument('--port', required=False, type=int, default=9090, metavar='<port>', help='Port number for the server to listen on.')
group_global.add_argument('-v', '--verbose', required=False, action="store_true", help="Enable verbose logging for detailed debug output.") group_global.add_argument('-v', '--verbose', required=False, action='store_true', help='Enable verbose logging for detailed debug output.')
group_global.add_argument('-l', '--log', required=False, type=Path, metavar="<dir>", help="Directory to store log files.") group_global.add_argument('-l', '--log', required=False, type=Path, metavar='<dir>', help='Directory to store log files.')
group_global.add_argument('--version', required=False, action="store_true", help="Display Server version information.") group_global.add_argument('--version', required=False, action='store_true', help='Display Server version information.')
# Advanced options # Advanced options
group_advanced = parser.add_argument_group("Advanced") group_advanced = parser.add_argument_group('Advanced')
group_advanced.add_argument('-d', '--delay', required=False, type=int, metavar="<delay>", default=10, help="Delay (in seconds) between successive checks for provisioning responses.") group_advanced.add_argument('-d', '--delay', required=False, type=int, metavar='<delay>', default=10, help='Delay (in seconds) between successive checks for provisioning responses.')
group_advanced.add_argument('-k', '--keybox', required=False, action="store_true", help="Enable keybox mode, which aborts provisioning requests to prevent spam.") group_advanced.add_argument('-k', '--keybox', required=False, action='store_true', help='Enable keybox mode, which aborts provisioning requests to prevent spam.')
args = parser.parse_args() args = parser.parse_args()
# Handle version display
if args.version: if args.version:
print(f"Server {VERSION}") print(f'Server {VERSION}')
exit(0) exit(0)
# Configure logging (file and console) # Configure logging
log_path = configure_logging(path=args.log, verbose=args.verbose) log_path = configure_logging(path=args.log, verbose=args.verbose)
logger = logging.getLogger("Server") logger = logging.getLogger('Server')
logger.info("Version: %s", VERSION) logger.info('Version: %s', VERSION)
try: try:
# Set global variables based on parsed arguments # Set global variables based on parsed arguments
@ -176,7 +174,7 @@ def main() -> None:
KEYBOX = args.keybox KEYBOX = args.keybox
# Start Flask app with specified host, port, and debug mode # Start Flask app with specified host, port, and debug mode
logging.getLogger("werkzeug").setLevel(logging.INFO if args.verbose else logging.ERROR) logging.getLogger('werkzeug').setLevel(logging.INFO if args.verbose else logging.ERROR)
app.run(host=args.host, port=args.port, debug=False) app.run(host=args.host, port=args.port, debug=False)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
@ -185,9 +183,9 @@ def main() -> None:
# Final logging and exit # Final logging and exit
if log_path: if log_path:
logger.info("Log file: %s" % log_path) logger.info('Log file: %s' % log_path)
logger.info("Exiting") logger.info('Exiting')
if __name__ == "__main__": if __name__ == '__main__':
main() main()

View File

@ -2,6 +2,5 @@ from .core import Core
from .adb import ADB from .adb import ADB
from .cdm import Cdm from .cdm import Cdm
from .vendor import Vendor from .vendor import Vendor
from .keybox import Keybox
__version__ = "2.2.1" __version__ = '2.1.3'

View File

@ -4,7 +4,6 @@ import time
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Optional
import coloredlogs import coloredlogs
@ -16,16 +15,16 @@ from keydive.constants import CDM_VENDOR_API, DRM_PLAYER
from keydive.core import Core from keydive.core import Core
def configure_logging(path: Path = None, verbose: bool = False) -> Optional[Path]: def configure_logging(path: Path = None, verbose: bool = False) -> Path:
""" """
Configures logging for the application. Configures logging for the application.
Parameters: Args:
path (Path, optional): The directory to store log files. path (Path, optional): The directory to store log files.
verbose (bool, optional): Flag to enable detailed debug logging. verbose (bool, optional): Flag to enable detailed debug logging.
Returns: Returns:
Path: The path of the log file, or None if no log file is created. Path: The path of log file.
""" """
# Set up the root logger with the desired logging level # Set up the root logger with the desired logging level
root_logger = logging.getLogger() root_logger = logging.getLogger()
@ -43,15 +42,15 @@ def configure_logging(path: Path = None, verbose: bool = False) -> Optional[Path
path.mkdir(parents=True, exist_ok=True) path.mkdir(parents=True, exist_ok=True)
# Create a file handler # Create a file handler
file_path = path / ("keydive_%s.log" % datetime.now().strftime("%Y-%m-%d_%H-%M-%S")) file_path = path / ('keydive_%s.log' % datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))
file_path = file_path.resolve(strict=False) file_path = file_path.resolve(strict=False)
file_handler = logging.FileHandler(file_path) file_handler = logging.FileHandler(file_path)
file_handler.setLevel(logging.DEBUG) file_handler.setLevel(logging.DEBUG)
# Set log formatting # Set log formatting
formatter = logging.Formatter( formatter = logging.Formatter(
fmt="%(asctime)s [%(levelname).1s] %(name)s: %(message)s", fmt='%(asctime)s [%(levelname).1s] %(name)s: %(message)s',
datefmt="%Y-%m-%d %H:%M:%S" datefmt='%Y-%m-%d %H:%M:%S'
) )
file_handler.setFormatter(formatter) file_handler.setFormatter(formatter)
@ -60,8 +59,8 @@ def configure_logging(path: Path = None, verbose: bool = False) -> Optional[Path
# Configure coloredlogs for console output # Configure coloredlogs for console output
coloredlogs.install( coloredlogs.install(
fmt="%(asctime)s [%(levelname).1s] %(name)s: %(message)s", fmt='%(asctime)s [%(levelname).1s] %(name)s: %(message)s',
datefmt="%Y-%m-%d %H:%M:%S", datefmt='%Y-%m-%d %H:%M:%S',
level=logging.DEBUG if verbose else logging.INFO, level=logging.DEBUG if verbose else logging.INFO,
logger=root_logger logger=root_logger
) )
@ -75,48 +74,47 @@ def main() -> None:
This application extracts Widevine L3 keys from an Android device. This application extracts Widevine L3 keys from an Android device.
It supports device management via ADB and allows hooking into Widevine processes. It supports device management via ADB and allows hooking into Widevine processes.
""" """
parser = argparse.ArgumentParser(description="Extract Widevine L3 keys from an Android device.") parser = argparse.ArgumentParser(description='Extract Widevine L3 keys from an Android device.')
# Global arguments for the application # Global arguments for the application
group_global = parser.add_argument_group("Global") group_global = parser.add_argument_group('Global')
group_global.add_argument('-d', '--device', required=False, type=str, metavar="<id>", help="Specify the target Android device ID for ADB connection.") group_global.add_argument('-d', '--device', required=False, type=str, metavar='<id>', help='Specify the target Android device ID for ADB connection.')
group_global.add_argument('-v', '--verbose', required=False, action="store_true", help="Enable verbose logging for detailed debug output.") group_global.add_argument('-v', '--verbose', required=False, action='store_true', help='Enable verbose logging for detailed debug output.')
group_global.add_argument('-l', '--log', required=False, type=Path, metavar="<dir>", help="Directory to store log files.") group_global.add_argument('-l', '--log', required=False, type=Path, metavar='<dir>', help='Directory to store log files.')
group_global.add_argument('--delay', required=False, type=float, metavar="<delay>", default=1, help="Delay (in seconds) between process checks.") group_global.add_argument('--delay', required=False, type=float, metavar='<delay>', default=1, help='Delay (in seconds) between process checks.')
group_global.add_argument('--version', required=False, action="store_true", help="Display KeyDive version information.") group_global.add_argument('--version', required=False, action='store_true', help='Display KeyDive version information.')
# Arguments specific to the CDM (Content Decryption Module) # Arguments specific to the CDM (Content Decryption Module)
group_cdm = parser.add_argument_group("Cdm") group_cdm = parser.add_argument_group('Cdm')
group_cdm.add_argument('-o', '--output', required=False, type=Path, default=Path("device"), metavar="<dir>", help="Output directory for extracted data.") group_cdm.add_argument('-o', '--output', required=False, type=Path, default=Path('device'), metavar='<dir>', help='Output directory for extracted data.')
group_cdm.add_argument('-w', '--wvd', required=False, action="store_true", help="Generate a pywidevine WVD device file.") group_cdm.add_argument('-w', '--wvd', required=False, action='store_true', help='Generate a pywidevine WVD device file.')
group_cdm.add_argument('-s', '--skip', required=False, action="store_true", help="Skip auto-detection of the private function.") group_cdm.add_argument('-s', '--skip', required=False, action='store_true', help='Skip auto-detection of the private function.')
group_cdm.add_argument('-a', '--auto', required=False, action="store_true", help="Automatically start the Bitmovin web player.") group_cdm.add_argument('-a', '--auto', required=False, action='store_true', help='Automatically start the Bitmovin web player.')
group_cdm.add_argument('-p', '--player', required=False, action="store_true", help="Install and start the Kaltura app automatically.") group_cdm.add_argument('-p', '--player', required=False, action='store_true', help='Install and start the Kaltura app automatically.')
# Advanced options # Advanced options
group_advanced = parser.add_argument_group("Advanced") group_advanced = parser.add_argument_group('Advanced')
group_advanced.add_argument('-f', '--functions', required=False, type=Path, metavar="<file>", help="Path to Ghidra XML functions file.") group_advanced.add_argument('-f', '--functions', required=False, type=Path, metavar='<file>', help='Path to Ghidra XML functions file.')
group_advanced.add_argument('-k', '--keybox', required=False, action="store_true", help="Enable export of the Keybox data if it is available.") group_advanced.add_argument('-k', '--keybox', required=False, action='store_true', help='Enable export of the Keybox data if it is available.')
group_advanced.add_argument('--challenge', required=False, type=Path, metavar="<file>", help="Path to unencrypted challenge for extracting client ID.") group_advanced.add_argument('--challenge', required=False, type=Path, metavar='<file>', help='Path to unencrypted challenge for extracting client ID.')
group_advanced.add_argument('--private-key', required=False, type=Path, metavar="<file>", help="Path to private key for extracting client ID.") group_advanced.add_argument('--private-key', required=False, type=Path, metavar='<file>', help='Path to private key for extracting client ID.')
args = parser.parse_args() args = parser.parse_args()
# Handle version display
if args.version: if args.version:
print(f"KeyDive {keydive.__version__}") print(f'KeyDive {keydive.__version__}')
exit(0) exit(0)
# Configure logging (file and console) # Configure logging
log_path = configure_logging(path=args.log, verbose=args.verbose) log_path = configure_logging(path=args.log, verbose=args.verbose)
logger = logging.getLogger("KeyDive") logger = logging.getLogger('KeyDive')
logger.info("Version: %s", keydive.__version__) logger.info('Version: %s', keydive.__version__)
try: try:
# Connect to the specified Android device # Connect to the specified Android device
adb = ADB(device=args.device) adb = ADB(device=args.device)
# Initialize Cdm instance for content decryption module (with optional arguments) # Initialize Cdm instance
cdm = Cdm(keybox=args.keybox) cdm = Cdm(keybox=args.keybox)
if args.challenge: if args.challenge:
cdm.set_challenge(data=args.challenge) cdm.set_challenge(data=args.challenge)
@ -126,70 +124,70 @@ def main() -> None:
# Initialize Core instance for interacting with the device # Initialize Core instance for interacting with the device
core = Core(adb=adb, cdm=cdm, functions=args.functions, skip=args.skip) core = Core(adb=adb, cdm=cdm, functions=args.functions, skip=args.skip)
# Setup actions based on user arguments (for DRM player, Bitmovin player, etc.) # Setup actions based on user arguments
if args.player: if args.player:
package = DRM_PLAYER["package"] package = DRM_PLAYER['package']
# Check if the application is already installed # Check if the application is already installed
installed = package in adb.list_applications(user=True, system=False) installed = package in adb.list_applications(user=True, system=False)
if not installed: if not installed:
logger.debug("Application %s not found. Installing...", package) logger.debug('Application %s not found. Installing...', package)
installed = adb.install_application(path=DRM_PLAYER["path"], url=DRM_PLAYER["url"]) installed = adb.install_application(path=DRM_PLAYER['path'], url=DRM_PLAYER['url'])
# Skip starting the application if installation failed # Skip starting the application if installation failed
if installed: if installed:
# Start the application # Start the application
logger.info("Starting application: %s", package) logger.info('Starting application: %s', package)
adb.start_application(package) adb.start_application(package)
elif args.auto: elif args.auto:
logger.info("Opening the Bitmovin web player...") logger.info('Opening the Bitmovin web player...')
adb.open_url("https://bitmovin.com/demos/drm") adb.open_url('https://bitmovin.com/demos/drm')
logger.info("Setup completed") logger.info('Setup completed')
# Process watcher loop: continuously checks for Widevine processes # Process watcher loop
logger.info("Watcher delay: %ss" % args.delay) logger.info('Watcher delay: %ss' % args.delay)
current = None # Variable to track the current Widevine process current = None # Variable to track the current Widevine process
while core.running: while core.running:
# Check if for current process data has been exported # Check if for current process data has been exported
if current and cdm.export(args.output, args.wvd): if current and cdm.export(args.output, args.wvd):
raise KeyboardInterrupt # Stop if export is complete raise KeyboardInterrupt # Stop if export is complete
# Get the currently running Widevine processes
# https://github.com/hyugogirubato/KeyDive/issues/14#issuecomment-2146788792 # https://github.com/hyugogirubato/KeyDive/issues/14#issuecomment-2146788792
# Get the currently running Widevine processes
processes = { processes = {
key: (name, pid) key: (name, pid)
for name, pid in adb.enumerate_processes().items() for name, pid in adb.enumerate_processes().items()
for key in CDM_VENDOR_API.keys() for key in CDM_VENDOR_API.keys()
if key in name or key.replace("-service", "-service-lazy") in name if key in name or key.replace('-service', '-service-lazy') in name
} }
if not processes: if not processes:
raise EnvironmentError("Unable to detect Widevine, refer to https://github.com/hyugogirubato/KeyDive/blob/main/docs/PACKAGE.md#drm-info") raise EnvironmentError('Unable to detect Widevine, refer to https://github.com/hyugogirubato/KeyDive/blob/main/docs/PACKAGE.md#drm-info')
# Check if the current process has changed # Check if the current process has changed
if current and current not in [v[1] for v in processes.values()]: if current and current not in [v[1] for v in processes.values()]:
logger.warning("Widevine process has changed") logger.warning('Widevine process has changed')
current = None current = None
# If current process not found, attempt to hook into the detected processes # If current process not found, attempt to hook into the detected processes
if not current: if not current:
logger.debug("Analysing...") logger.debug('Analysing...')
for key, (name, pid) in processes.items(): for key, (name, pid) in processes.items():
if current: if current:
break break
for vendor in CDM_VENDOR_API[key]: for vendor in CDM_VENDOR_API[key]:
if core.hook_process(pid=pid, vendor=vendor): if core.hook_process(pid=pid, vendor=vendor):
logger.info("Process: %s (%s)", pid, name) logger.info('Process: %s (%s)', pid, name)
current = pid current = pid
break break
elif not core.running: elif not core.running:
raise KeyboardInterrupt raise KeyboardInterrupt
if current: if current:
logger.info("Successfully hooked") logger.info('Successfully hooked')
else: else:
logger.warning("Widevine library not found, searching...") logger.warning('Widevine library not found, searching...')
# Delay before next iteration # Delay before next iteration
time.sleep(args.delay) time.sleep(args.delay)
@ -200,9 +198,9 @@ def main() -> None:
# Final logging and exit # Final logging and exit
if log_path: if log_path:
logger.info("Log file: %s" % log_path) logger.info('Log file: %s' % log_path)
logger.info("Exiting") logger.info('Exiting')
if __name__ == "__main__": if __name__ == '__main__':
main() main()

View File

@ -3,30 +3,29 @@ import re
import shutil import shutil
import subprocess import subprocess
from pathlib import Path
import frida import frida
import requests import requests
from pathlib import Path
from frida.core import Device from frida.core import Device
# Suppress urllib3 warnings # Suppress urllib3 warnings
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR) logging.getLogger('urllib3.connectionpool').setLevel(logging.ERROR)
def shell(prompt: list) -> subprocess.CompletedProcess: def shell(prompt: list) -> subprocess.CompletedProcess:
""" """
Executes a shell command and returns the result. Executes a shell command and returns the completed process.
Parameters: Args:
prompt (list): The command to execute as a list of strings. prompt (list): The command to be executed as a list of strings.
Returns: Returns:
subprocess.CompletedProcess: The result containing return code, stdout, and stderr. subprocess.CompletedProcess: The completed process object containing return code, stdout, and stderr.
""" """
prompt = list(map(str, prompt)) # Ensure all command parts are strings prompt = list(map(str, prompt))
# logging.getLogger("Shell").debug(" ".join(prompt)) # logging.getLogger('Shell').debug(' '.join(prompt))
return subprocess.run(prompt, capture_output=True) # Run the command and capture output return subprocess.run(prompt, shell=True, capture_output=True)
class ADB: class ADB:
@ -36,76 +35,76 @@ class ADB:
def __init__(self, device: str = None, timeout: int = 5): def __init__(self, device: str = None, timeout: int = 5):
""" """
Initializes ADB connection to the device. Initializes the ADB connection to a device.
Parameters: Args:
device (str, optional): Device ID to connect to, defaults to the first USB device. device (str, optional): The ID of the device to connect to. If None, defaults to the first USB device.
timeout (int, optional): Timeout for connection in seconds. Defaults to 5. timeout (int, optional): The timeout for device connection in seconds. Defaults to 5.
Raises: Raises:
EnvironmentError: If ADB is not found in the system path. EnvironmentError: If ADB is not found in the system path.
Exception: If connection to the device fails. Exception: If the connection to the device fails.
""" """
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
# Ensure ADB is available # Ensure ADB is available
if not shutil.which("adb"): if not shutil.which('adb'):
raise EnvironmentError( raise EnvironmentError(
"ADB is not recognized as an environment variable. " 'ADB is not recognized as an environment variable. '
"Ensure ADB is installed and refer to the documentation: " 'Ensure ADB is installed and refer to the documentation: '
"https://github.com/hyugogirubato/KeyDive/blob/main/docs/PACKAGE.md#adb-android-debug-bridge" 'https://github.com/hyugogirubato/KeyDive/blob/main/docs/PACKAGE.md#adb-android-debug-bridge'
) )
# Start the ADB server if not already running # Start the ADB server if not already running
sp = shell(['adb', 'start-server']) sp = shell(['adb', 'start-server'])
if sp.returncode != 0: if sp.returncode != 0:
self.logger.warning("ADB server startup failed (Error: %s)", sp.stdout.decode("utf-8").strip()) self.logger.warning('ADB server startup failed (Error: %s)', sp.stderr.decode('utf-8').strip())
# Connect to device (or default to the first USB device) # Select device based on provided ID or default to the first USB device
try: try:
self.device: Device = frida.get_device(id=device, timeout=timeout) if device else frida.get_usb_device(timeout=timeout) self.device: Device = frida.get_device(id=device, timeout=timeout) if device else frida.get_usb_device(timeout=timeout)
self.logger.info("Connected to device: %s (%s)", self.device.name, self.device.id) self.logger.info('Connected to device: %s (%s)', self.device.name, self.device.id)
except Exception as e: except Exception as e:
self.logger.error("Failed to connect to device: %s", e) self.logger.error('Failed to connect to device: %s', e)
raise e raise e
self.prompt = ['adb', '-s', self.device.id, 'shell'] self.prompt = ['adb', '-s', self.device.id, 'shell']
# Retrieve and log device properties # Obtain device properties
properties = self.device_properties() properties = self.device_properties()
if properties: if properties:
self.logger.info("SDK API: %s", properties.get("ro.build.version.sdk", "Unknown")) self.logger.info('SDK API: %s', properties.get('ro.build.version.sdk', 'Unknown'))
self.logger.info("ABI CPU: %s", properties.get("ro.product.cpu.abi", "Unknown")) self.logger.info('ABI CPU: %s', properties.get('ro.product.cpu.abi', 'Unknown'))
else: else:
self.logger.warning("No device properties retrieved") self.logger.warning('No device properties were retrieved.')
def device_properties(self) -> dict: def device_properties(self) -> dict:
""" """
Retrieves system properties from the device. Retrieves system properties from the connected device using ADB shell commands.
Returns: Returns:
dict: A dictionary mapping property keys to their corresponding values. dict: A dictionary mapping device property keys to their corresponding values.
""" """
# https://source.android.com/docs/core/architecture/configuration/add-system-properties?#shell-commands # https://source.android.com/docs/core/architecture/configuration/add-system-properties?#shell-commands
properties = {} properties = {}
# Execute the shell command to retrieve device properties # Execute shell command to retrieve device properties
sp = shell([*self.prompt, 'getprop']) sp = shell([*self.prompt, 'getprop'])
if sp.returncode != 0: if sp.returncode != 0:
self.logger.error("Failed to retrieve device properties (Error: %s)", sp.stdout.decode("utf-8").strip()) self.logger.error('Failed to retrieve device properties (Error: %s)', sp.stderr.decode('utf-8').strip())
return properties return properties
# Parse the output and cast values accordingly # Parse the output to fill the properties dictionary
for line in sp.stdout.decode("utf-8").splitlines(): for line in sp.stdout.decode('utf-8').splitlines():
match = re.match(r"\[(.*?)\]: \[(.*?)\]", line) match = re.match(r'\[(.*?)\]: \[(.*?)\]', line)
if match: if match:
key, value = match.groups() key, value = match.groups()
# Cast numeric and boolean values where appropriate # Attempt to cast numeric and boolean values
if value.isdigit(): if value.isdigit():
value = int(value) value = int(value)
elif value.lower() in ("true", "false"): elif value.lower() in ('true', 'false'):
value = value.lower() == "true" value = value.lower() == 'true'
properties[key] = value properties[key] = value
@ -113,38 +112,37 @@ class ADB:
def list_applications(self, user: bool = True, system: bool = False) -> dict: def list_applications(self, user: bool = True, system: bool = False) -> dict:
""" """
Lists installed applications on the device, with optional filters for user/system apps. Lists installed applications on the device, filtering by user and/or system apps.
Parameters: Args:
user (bool, optional): Include user-installed apps. Defaults to True. user (bool, optional): Whether to include user-installed applications. Defaults to True.
system (bool, optional): Include system apps. Defaults to False. system (bool, optional): Whether to include system applications. Defaults to False.
Returns: Returns:
dict: A dictionary of application packages and their file paths. dict: A dictionary mapping application packages to their file paths.
""" """
applications = {} applications = {}
# Validate input; return empty dict if no filter is set # Validate input and set the appropriate prompt
if not user and not system: if not user and not system:
return applications return applications
# Set the appropriate shell command based on user/system filters
prompt = [*self.prompt, 'pm', 'list', 'packages', '-f'] prompt = [*self.prompt, 'pm', 'list', 'packages', '-f']
if user and not system: if user and not system:
prompt.append("-3") prompt.append('-3')
elif not user and system: elif not user and system:
prompt.append("-s") prompt.append('-s')
# Execute the shell command to list applications # Execute shell command to list applications
sp = shell(prompt) sp = shell(prompt)
if sp.returncode != 0: if sp.returncode != 0:
self.logger.error("Failed to retrieve app list (Error: %s)", sp.stdout.decode("utf-8").strip()) self.logger.error('Failed to retrieve application list (Error: %s)', sp.stderr.decode('utf-8').strip())
return applications return applications
# Parse and add applications to the dictionary # Parse and store applications in the dictionary
for line in sp.stdout.decode("utf-8").splitlines(): for line in sp.stdout.decode('utf-8').splitlines():
try: try:
path, package = line.strip().split(":", 1)[1].rsplit("=", 1) path, package = line.strip().split(':', 1)[1].rsplit('=', 1)
applications[package] = path applications[package] = path
except Exception as e: except Exception as e:
pass pass
@ -155,68 +153,68 @@ class ADB:
""" """
Starts an application by its package name. Starts an application by its package name.
Parameters: Args:
package (str): The package name of the application. package (str): The package name of the application to start.
Returns: Returns:
bool: True if the app was started successfully, False otherwise. bool: True if the application was started successfully, False otherwise.
""" """
# Get package information using dumpsys # Get package information
sp = shell([*self.prompt, 'dumpsys', 'package', package]) sp = shell([*self.prompt, 'dumpsys', 'package', package])
lines = sp.stdout.decode("utf-8").splitlines() lines = sp.stdout.decode('utf-8').splitlines()
# Remove empty lines to ensure backwards compatibility # Remove empty lines to ensure backwards compatibility
lines = [l.strip() for l in lines if l.strip()] lines = [l.strip() for l in lines if l.strip()]
# Look for MAIN activity to identify entry point # Look for main activity in package information
for i, line in enumerate(lines): for i, line in enumerate(lines):
if "android.intent.action.MAIN" in line: if 'android.intent.action.MAIN' in line:
match = re.search(fr"({package}/[^ ]+)", lines[i + 1]) match = re.search(fr'({package}/[^ ]+)', lines[i + 1])
if match: if match:
# Start the application by its main activity # Attempt to start the application
main_activity = match.group() main_activity = match.group()
sp = shell([*self.prompt, 'am', 'start', '-n', main_activity]) sp = shell([*self.prompt, 'am', 'start', '-n', main_activity])
if sp.returncode == 0: if sp.returncode == 0:
return True return True
self.logger.error("Failed to start app %s (Error: %s)", package, sp.stdout.decode("utf-8").strip()) self.logger.error('Failed to start application %s (Error: %s)', package, sp.stderr.decode('utf-8').strip())
break break
self.logger.error("Package %s not found or no MAIN intent", package) self.logger.error('Package %s not found or has no MAIN intent action.', package)
return False return False
def enumerate_processes(self) -> dict: def enumerate_processes(self) -> dict:
""" """
Lists running processes and maps process names to their PIDs. Lists running processes on the device, mapping process names to PIDs.
Returns: Returns:
dict: Dictionary of process names and corresponding PIDs. dict: A dictionary mapping process names to their corresponding PIDs.
""" """
# https://github.com/frida/frida/issues/1225#issuecomment-604181822 # https://github.com/frida/frida/issues/1225#issuecomment-604181822
processes = {} processes = {}
# Attempt to get the list of processes using the 'ps -A' command # Try to get the list of processes using `ps -A`
prompt = [*self.prompt, 'ps'] prompt = [*self.prompt, 'ps']
sp = shell([*prompt, '-A']) sp = shell([*prompt, '-A'])
lines = sp.stdout.decode("utf-8").splitlines() lines = sp.stdout.decode('utf-8').splitlines()
# If the output has less than 10 lines, retry with a simpler 'ps' command # If the output has less than 10 lines, try the alternative `ps` command
if len(lines) < 10: if len(lines) < 10:
sp = shell(prompt) sp = shell(prompt)
if sp.returncode != 0: if sp.returncode != 0:
self.logger.error("Failed to execute ps command (Error: %s)", sp.stdout.decode("utf-8").strip()) self.logger.error('Failed to execute ps command (Error: %s)', sp.stderr.decode('utf-8').strip())
return processes return processes
lines = sp.stdout.decode("utf-8").splitlines() lines = sp.stdout.decode('utf-8').splitlines()
# Iterate through lines starting from the second line (skipping header) # Iterate through lines starting from the second line (skipping header)
for line in lines[1:]: for line in lines[1:]:
try: try:
parts = line.split() # USER,PID,PPID,VSZ,RSS,WCHAN,ADDR,S,NAME parts = line.split() # USER,PID,PPID,VSZ,RSS,WCHAN,ADDR,S,NAME
pid = int(parts[1]) # Extract PID pid = int(parts[1]) # Extract PID
name = " ".join(parts[8:]).strip() # Extract process name name = ' '.join(parts[8:]).strip() # Extract process name
# Handle cases where process name might be in brackets (e.g., kernel threads) # Handle cases where the name might be in brackets
name = name if name.startswith("[") else Path(name).name name = name if name.startswith('[') else Path(name).name
processes[name] = pid processes[name] = pid
except Exception as e: except Exception as e:
pass pass
@ -225,63 +223,61 @@ class ADB:
def install_application(self, path: Path = None, url: str = None) -> bool: def install_application(self, path: Path = None, url: str = None) -> bool:
""" """
Installs an application on the device either from a local file or by downloading from a URL. Installs an application from a local file path or a URL.
Parameters: Args:
path (Path, optional): The local file path of the APK to install. Defaults to None. path (Path, optional): The local file path of the APK to install. Defaults to None.
url (str, optional): The URL to download the APK from. Defaults to None. url (str, optional): The URL to download the APK from. Defaults to None.
Returns: Returns:
bool: True if the installation was successful, False otherwise. bool: True if the installation was successful, False otherwise.
""" """
# Prepare the shell command for installation
prompt = [*self.prompt[:-1], 'install'] prompt = [*self.prompt[:-1], 'install']
# Install from a local file path if a valid path is provided # Install from a local file path if provided
if path and path.is_file(): if path and path.is_file():
sp = shell([*prompt, path]) # Run the installation command with the local file path sp = shell([*prompt, path])
if sp.returncode == 0: if sp.returncode == 0:
return True return True
self.logger.error("Installation failed for local path: %s (Error: %s)", path, sp.stdout.decode("utf-8").strip()) self.logger.error('Installation failed for local path: %s (Error: %s)', path, sp.stderr.decode('utf-8').strip())
# If URL is provided, attempt to download the APK and install it # Install from a URL if provided
status = False status = False
if url: if url:
file = Path("tmp.apk") # Temporary file to store the downloaded APK file = Path('tmp.apk')
try: try:
# Download the APK from the provided URL r = requests.get(url, headers={'Accept': '*/*', 'User-Agent': 'KeyDive/ADB'})
r = requests.get(url, headers={"Accept": "*/*", "User-Agent": "KeyDive/ADB"})
r.raise_for_status() r.raise_for_status()
# Save the downloaded APK to a temporary file # Write downloaded content to temporary APK file
file.write_bytes(r.content) file.write_bytes(r.content)
# Attempt installation from the downloaded APK # Attempt installation from the downloaded file
status = self.install_application(path=file) status = self.install_application(path=file)
except Exception as e: except Exception as e:
self.logger.error("Failed to download application from URL: %s (Error: %s)", url, e) self.logger.error('Failed to download application from URL: %s (Error: %s)', url, e)
file.unlink(missing_ok=True) # Clean up the temporary file, even if there was an error file.unlink(missing_ok=True)
return status return status
def open_url(self, url: str) -> bool: def open_url(self, url: str) -> bool:
""" """
Opens a specified URL on the device. Opens a specified URL on the connected device.
Parameters: Args:
url (str): The URL to be opened on the device. url (str): The URL to open on the device.
Returns: Returns:
bool: True if the URL was successfully opened, False otherwise. bool: True if the URL was opened successfully, False otherwise.
""" """
# Execute the shell command to open the URL using the Android 'am' (Activity Manager) command. # Execute the shell command to open the URL
sp = shell([*self.prompt, 'am', 'start', '-a', 'android.intent.action.VIEW', '-d', url]) sp = shell([*self.prompt, 'am', 'start', '-a', 'android.intent.action.VIEW', '-d', url])
# Check the result of the command execution and log if there is an error # Check the result and log accordingly
if sp.returncode != 0: if sp.returncode != 0:
self.logger.error("URL open failed for: %s (Return: %s)", url, sp.stdout.decode("utf-8").strip()) self.logger.error('URL open failed for: %s (Return: %s)', url, sp.stderr.decode('utf-8').strip())
return False return False
return True return True
__all__ = ("ADB",) __all__ = ('ADB',)

View File

@ -2,18 +2,17 @@ import base64
import json import json
import logging import logging
from pathlib import Path
from typing import Union from typing import Union
from zlib import crc32 from zlib import crc32
from unidecode import unidecode from unidecode import unidecode
from pathlib import Path
from pathvalidate import sanitize_filepath, sanitize_filename from pathvalidate import sanitize_filepath, sanitize_filename
from Cryptodome.PublicKey import RSA from Cryptodome.PublicKey import RSA
from Cryptodome.PublicKey.RSA import RsaKey from Cryptodome.PublicKey.RSA import RsaKey
from pywidevine.device import Device, DeviceTypes from pywidevine.device import Device, DeviceTypes
from pywidevine.license_protocol_pb2 import ( from pywidevine.license_protocol_pb2 import (SignedMessage, LicenseRequest, ClientIdentification, SignedDrmCertificate,
SignedMessage, LicenseRequest, ClientIdentification, SignedDrmCertificate, DrmCertificate, DrmCertificate, EncryptedClientIdentification)
EncryptedClientIdentification)
from keydive.constants import OEM_CRYPTO_API from keydive.constants import OEM_CRYPTO_API
from keydive.keybox import Keybox from keydive.keybox import Keybox
@ -29,15 +28,15 @@ class Cdm:
""" """
Initializes the Cdm object, setting up a logger and containers for client IDs and private keys. Initializes the Cdm object, setting up a logger and containers for client IDs and private keys.
Parameters: Attributes:
client_id (dict[int, ClientIdentification]): Stores client identification info mapped by key modulus.
private_key (dict[int, RsaKey]): Stores private keys mapped by key modulus.
keybox (bool, optional): Initializes a Keybox instance for secure key management. keybox (bool, optional): Initializes a Keybox instance for secure key management.
""" """
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
# https://github.com/devine-dl/pywidevine # https://github.com/devine-dl/pywidevine
self.client_id: dict[int, ClientIdentification] = {} self.client_id: dict[int, ClientIdentification] = {}
self.private_key: dict[int, RsaKey] = {} self.private_key: dict[int, RsaKey] = {}
# Optionally initialize a Keybox instance for secure key management if 'keybox' is True
self.keybox = Keybox() if keybox else None self.keybox = Keybox() if keybox else None
@staticmethod @staticmethod
@ -45,7 +44,7 @@ class Cdm:
""" """
Converts client identification information to a dictionary. Converts client identification information to a dictionary.
Parameters: Args:
client_id (ClientIdentification): The client identification. client_id (ClientIdentification): The client identification.
Returns: Returns:
@ -58,143 +57,120 @@ class Cdm:
""" """
Converts encrypted client identification information to a dictionary. Converts encrypted client identification information to a dictionary.
Parameters: Args:
encrypted_client_id (EncryptedClientIdentification): The encrypted client identification. encrypted_client_id (EncryptedClientIdentification): The encrypted client identification.
Returns: Returns:
dict: A dictionary of encrypted client information. dict: A dictionary of encrypted client information.
""" """
content = { content = {
"providerId": encrypted_client_id.provider_id, 'providerId': encrypted_client_id.provider_id,
"serviceCertificateSerialNumber": encrypted_client_id.service_certificate_serial_number, 'serviceCertificateSerialNumber': encrypted_client_id.service_certificate_serial_number,
"encryptedClientId": encrypted_client_id.encrypted_client_id, 'encryptedClientId': encrypted_client_id.encrypted_client_id,
"encryptedClientIdIv": encrypted_client_id.encrypted_client_id_iv, 'encryptedClientIdIv': encrypted_client_id.encrypted_client_id_iv,
"encryptedPrivacyKey": encrypted_client_id.encrypted_privacy_key 'encryptedPrivacyKey': encrypted_client_id.encrypted_privacy_key
} }
return { return {
k: base64.b64encode(v).decode("utf-8") if isinstance(v, bytes) else v k: base64.b64encode(v).decode('utf-8') if isinstance(v, bytes) else v
for k, v in content.items() for k, v in content.items()
} }
def set_challenge(self, data: Union[Path, bytes]) -> None: def set_challenge(self, data: Union[Path, bytes]) -> None:
""" """
Sets the challenge data by extracting device information and client ID. Sets the challenge data by extracting device information.
Parameters: Args:
data (Union[Path, bytes]): Challenge data as a file path or raw bytes. data (Union[Path, bytes]): The challenge data as a file path or bytes.
Raises: Raises:
FileNotFoundError: If the file path doesn't exist. FileNotFoundError: If the provided file path does not exist.
Exception: Logs any other exceptions that occur.
""" """
try:
# Check if the data is a Path object, indicating it's a file path
if isinstance(data, Path): if isinstance(data, Path):
if not data.is_file():
raise FileNotFoundError(data)
data = data.read_bytes() data = data.read_bytes()
# Parse the signed message from the data try:
signed_message = SignedMessage() signed_message = SignedMessage()
signed_message.ParseFromString(data) signed_message.ParseFromString(data)
# Parse the license request from the signed message
license_request = LicenseRequest() license_request = LicenseRequest()
license_request.ParseFromString(signed_message.msg) license_request.ParseFromString(signed_message.msg)
# Extract the encrypted client ID, if available
# https://integration.widevine.com/diagnostics # https://integration.widevine.com/diagnostics
encrypted_client_id: EncryptedClientIdentification = license_request.encrypted_client_id encrypted_client_id: EncryptedClientIdentification = license_request.encrypted_client_id
if encrypted_client_id.SerializeToString(): if encrypted_client_id.SerializeToString():
# If encrypted, log the encrypted client ID and indicate encryption self.logger.info('Receive encrypted client id: \n\n%s\n', json.dumps(self.__encrypted_client_info(encrypted_client_id), indent=2))
self.logger.info("Receive encrypted client id: \n\n%s\n", json.dumps(self.__encrypted_client_info(encrypted_client_id), indent=2)) self.logger.warning('The client ID of the challenge is encrypted')
self.logger.warning("The client ID of the challenge is encrypted")
else: else:
# If unencrypted, extract and set the client ID
client_id: ClientIdentification = license_request.client_id client_id: ClientIdentification = license_request.client_id
self.set_client_id(data=client_id) self.set_client_id(data=client_id)
except FileNotFoundError as e:
raise FileNotFoundError(f"Challenge file not found: {data}") from e
except Exception as e: except Exception as e:
self.logger.debug("Failed to set challenge data: %s", e) self.logger.debug('Failed to set challenge data: %s', e)
def set_private_key(self, data: Union[Path, bytes], name: str = None) -> None: def set_private_key(self, data: Union[Path, bytes], name: str = None) -> None:
""" """
Sets the private key from the provided data. Sets the private key from the provided data.
Parameters: Args:
data (Union[Path, bytes]): The private key data, either as a file path or byte data. data (Union[Path, bytes]): The private key data, either as a file path or bytes.
name (str, optional): Function name for verification against known functions. name (str, optional): Function name for verification against known functions.
Raises: Raises:
FileNotFoundError: If the file path doesn't exist. FileNotFoundError: If the provided file path does not exist or is not a file.
Exception: Logs any other exceptions that occur.
""" """
try:
# Check if the data is a Path object, indicating it's a file path
if isinstance(data, Path): if isinstance(data, Path):
if not data.is_file():
raise FileNotFoundError(data)
data = data.read_bytes() data = data.read_bytes()
# Import the private key using the RSA module try:
key = RSA.import_key(data) key = RSA.import_key(data)
# Log the private key if it's not already in the dictionary
if key.n not in self.private_key: if key.n not in self.private_key:
self.logger.info("Receive private key: \n\n%s\n", key.exportKey("PEM").decode("utf-8")) self.logger.info('Receive private key: \n\n%s\n', key.exportKey('PEM').decode('utf-8'))
# If a function name is provided, verify it against known functions
if name and name not in OEM_CRYPTO_API: if name and name not in OEM_CRYPTO_API:
self.logger.warning("The function '%s' does not belong to the referenced functions. Communicate it to the developer to improve the tool.",name) self.logger.warning('The function "%s" does not belong to the referenced functions. Communicate it to the developer to improve the tool.', name)
# Store the private key in the dictionary, using the modulus (key.n) as the key
self.private_key[key.n] = key self.private_key[key.n] = key
except FileNotFoundError as e:
raise FileNotFoundError(f"Private key file not found: {data}") from e
except Exception as e: except Exception as e:
self.logger.debug("Failed to set private key: %s", e) self.logger.debug('Failed to set private key: %s', e)
def set_client_id(self, data: Union[ClientIdentification, bytes]) -> None: def set_client_id(self, data: Union[ClientIdentification, bytes]) -> None:
""" """
Sets the client ID from the provided data. Sets the client ID from the provided data.
Parameters: Args:
data (Union[ClientIdentification, bytes]): The client ID data. data (Union[ClientIdentification, bytes]): The client ID data.
""" """
try: try:
# Check if the provided data is already a `ClientIdentification` object
if isinstance(data, ClientIdentification): if isinstance(data, ClientIdentification):
client_id = data client_id = data
else: else:
# Deserialize the byte data into a `ClientIdentification` object
client_id = ClientIdentification() client_id = ClientIdentification()
client_id.ParseFromString(data) client_id.ParseFromString(data)
# Initialize objects for parsing the DRM certificate and signed certificate
signed_drm_certificate = SignedDrmCertificate() signed_drm_certificate = SignedDrmCertificate()
drm_certificate = DrmCertificate() drm_certificate = DrmCertificate()
# Parse the signed DRM certificate from the client ID token
signed_drm_certificate.ParseFromString(client_id.token) signed_drm_certificate.ParseFromString(client_id.token)
drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate) drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)
# Extract the public key from the DRM certificate
public_key = drm_certificate.public_key public_key = drm_certificate.public_key
key = RSA.importKey(public_key) key = RSA.importKey(public_key)
# Check if this public key has already been recorded and log the client ID info
if key.n not in self.client_id: if key.n not in self.client_id:
self.logger.info("Receive client id: \n\n%s\n", json.dumps(self.__client_info(client_id), indent=2)) self.logger.info('Receive client id: \n\n%s\n', json.dumps(self.__client_info(client_id), indent=2))
# Store the client ID in the client_id dictionary, using the public key modulus (`key.n`) as the key
self.client_id[key.n] = client_id self.client_id[key.n] = client_id
except Exception as e: except Exception as e:
self.logger.debug("Failed to set client ID: %s", e) self.logger.debug('Failed to set client ID: %s', e)
def set_device_id(self, data: bytes) -> None: def set_device_id(self, data: bytes) -> None:
""" """
Sets the device ID in the keybox. Sets the device ID in the keybox if it is enabled.
Parameters: Args:
data (bytes): The device ID to be stored in the keybox. data (bytes): The device ID data to be stored in the keybox.
""" """
if self.keybox: if self.keybox:
self.keybox.set_device_id(data=data) self.keybox.set_device_id(data=data)
@ -203,7 +179,7 @@ class Cdm:
""" """
Sets the keybox data. Sets the keybox data.
Parameters: Args:
data (bytes): The keybox data to be set. data (bytes): The keybox data to be set.
""" """
if self.keybox: if self.keybox:
@ -211,69 +187,56 @@ class Cdm:
def export(self, parent: Path, wvd: bool = False) -> bool: def export(self, parent: Path, wvd: bool = False) -> bool:
""" """
Exports client ID, private key, and optionally WVD files to disk. Exports the client ID and private key to disk.
Parameters: Args:
parent (Path): Directory to export the files to. parent (Path): The parent directory to export the files to.
wvd (bool, optional): Whether to export WVD files. Defaults to False. wvd (bool): Whether to export WVD files.
Returns: Returns:
bool: True if any keys were exported, otherwise False. bool: True if any keys were exported, otherwise False.
""" """
# Find the intersection of client IDs and private keys
keys = self.client_id.keys() & self.private_key.keys() keys = self.client_id.keys() & self.private_key.keys()
for k in keys: for k in keys:
# Retrieve client information based on the client ID
client_info = self.__client_info(self.client_id[k]) client_info = self.__client_info(self.client_id[k])
# https://github.com/devine-dl/pywidevine/blob/master/pywidevine/main.py#L211 # https://github.com/devine-dl/pywidevine/blob/master/pywidevine/main.py#L211
device = Device( device = Device(
client_id=self.client_id[k].SerializeToString(), client_id=self.client_id[k].SerializeToString(),
private_key=self.private_key[k].exportKey("PEM"), private_key=self.private_key[k].exportKey('PEM'),
type_=DeviceTypes.ANDROID, type_=DeviceTypes.ANDROID,
security_level=3, security_level=3,
flags=None flags=None
) )
# Generate a sanitized file path for exporting the data
# https://github.com/hyugogirubato/KeyDive/issues/14#issuecomment-2146958022 # https://github.com/hyugogirubato/KeyDive/issues/14#issuecomment-2146958022
parent = sanitize_filepath(parent / client_info["company_name"] / client_info["model_name"] / str(device.system_id) / str(k)[:10]) parent = sanitize_filepath(parent / client_info['company_name'] / client_info['model_name'] / str(device.system_id) / str(k)[:10])
parent.mkdir(parents=True, exist_ok=True) parent.mkdir(parents=True, exist_ok=True)
# Export the client ID to a binary file path_id_bin = parent / 'client_id.bin'
path_id_bin = parent / "client_id.bin"
path_id_bin.write_bytes(data=device.client_id.SerializeToString()) path_id_bin.write_bytes(data=device.client_id.SerializeToString())
self.logger.info("Exported client ID: %s", path_id_bin) self.logger.info('Exported client ID: %s', path_id_bin)
# Export the private key to a PEM file path_key_bin = parent / 'private_key.pem'
path_key_bin = parent / "private_key.pem" path_key_bin.write_bytes(data=device.private_key.exportKey('PEM'))
path_key_bin.write_bytes(data=device.private_key.exportKey("PEM")) self.logger.info('Exported private key: %s', path_key_bin)
self.logger.info("Exported private key: %s", path_key_bin)
# If the WVD option is enabled, export the WVD file
if wvd: if wvd:
# Serialize the device to WVD format
wvd_bin = device.dumps() wvd_bin = device.dumps()
# Generate a unique name for the WVD file using client and device details
name = f"{client_info['company_name']} {client_info['model_name']}" name = f"{client_info['company_name']} {client_info['model_name']}"
if client_info.get("widevine_cdm_version"): if client_info.get('widevine_cdm_version'):
name += f" {client_info['widevine_cdm_version']}" name += f" {client_info['widevine_cdm_version']}"
name += f" {crc32(wvd_bin).to_bytes(4, 'big').hex()}" name += f" {crc32(wvd_bin).to_bytes(4, 'big').hex()}"
name = unidecode(name.strip().lower().replace(" ", "_")) name = unidecode(name.strip().lower().replace(' ', '_'))
path_wvd = parent / sanitize_filename(f"{name}_{device.system_id}_l{device.security_level}.wvd") path_wvd = parent / sanitize_filename(f'{name}_{device.system_id}_l{device.security_level}.wvd')
# Export the WVD file to disk
path_wvd.write_bytes(data=wvd_bin) path_wvd.write_bytes(data=wvd_bin)
self.logger.info("Exported WVD: %s", path_wvd) self.logger.info('Exported WVD: %s', path_wvd)
# If keybox is available and hasn't been exported, issue a warning
if self.keybox and not self.keybox.export(parent=parent.parent): if self.keybox and not self.keybox.export(parent=parent.parent):
self.logger.warning("The keybox has not been intercepted or decrypted") self.logger.warning('The keybox has not been intercepted or decrypted')
# Return True if any keys were exported, otherwise return False
return len(keys) > 0 return len(keys) > 0
__all__ = ("Cdm",) __all__ = ('Cdm',)

View File

@ -2,144 +2,140 @@ from pathlib import Path
from keydive.vendor import Vendor from keydive.vendor import Vendor
# https://cs.android.com/android/platform/superproject/+/android14-qpr3-release:bionic/libc/libc.map.txt # https://developer.android.com/ndk/guides/cpp-support
NATIVE_C_API = { NATIVE_C_API = {
# BUILT-IN # BUILT-IN
"main", 'main',
# STDIO # STDIO
"fclose", "fflush", "fgetc", "fgetpos", "fgets", "fopen", "fprintf", "fputc", "fputs", "fread", "freopen", 'fclose', 'fflush', 'fgetc', 'fgetpos', 'fgets', 'fopen', 'fprintf', 'fputc', 'fputs', 'fread', 'freopen',
"fscanf", "fseek", "fsetpos", "ftell", "fwrite", "getc", "getchar", "gets", "perror", "printf", "putc", 'fscanf', 'fseek', 'fsetpos', 'ftell', 'fwrite', 'getc', 'getchar', 'gets', 'perror', 'printf', 'putc',
"putchar", "puts", "remove", "rename", "rewind", "scanf", "setbuf", "setvbuf", "sprintf", "sscanf", "tmpfile", 'putchar', 'puts', 'remove', 'rename', 'rewind', 'scanf', 'setbuf', 'setvbuf', 'sprintf', 'sscanf', 'tmpfile',
"tmpnam", "ungetc", "vfprintf", "vprintf", "vsprintf", "fileno", "feof", "ferror", "snprintf", 'tmpnam', 'ungetc', 'vfprintf', 'vprintf', 'vsprintf', 'fileno', 'feof', 'ferror', 'snprintf',
# STDLIB # STDLIB
"abort", "abs", "atexit", "atof", "atoi", "atol", "bsearch", "calloc", "div", "exit", "free", "getenv", "labs", 'abort', 'abs', 'atexit', 'atof', 'atoi', 'atol', 'bsearch', 'calloc', 'div', 'exit', 'free', 'getenv', 'labs',
"ldiv", "malloc", "mblen", "mbstowcs", "mbtowc", "qsort", "rand", "realloc", "srand", "strtod", "strtol", 'ldiv', 'malloc', 'mblen', 'mbstowcs', 'mbtowc', 'qsort', 'rand', 'realloc', 'srand', 'strtod', 'strtol',
"strtoul", "system", "wcstombs", "wctomb", 'strtoul', 'system', 'wcstombs', 'wctomb',
# STRING # STRING
"memchr", "memcmp", "memcpy", "memmove", "memset", "strcat", "strchr", "strcmp", "strcoll", "strcpy", "strcspn", 'memchr', 'memcmp', 'memcpy', 'memmove', 'memset', 'strcat', 'strchr', 'strcmp', 'strcoll', 'strcpy', 'strcspn',
"strerror", "strlen", "strncat", "strncmp", "strncpy", "strpbrk", "strrchr", "strspn", "strstr", "strtok", 'strerror', 'strlen', 'strncat', 'strncmp', 'strncpy', 'strpbrk', 'strrchr', 'strspn', 'strstr', 'strtok',
"strxfrm", "strncasecmp", 'strxfrm', 'strncasecmp',
# MATH # MATH
"acos", "asin", "atan", "atan2", "cos", "cosh", "exp", "fabs", "floor", "fmod", "frexp", "ldexp", "log", 'acos', 'asin', 'atan', 'atan2', 'cos', 'cosh', 'exp', 'fabs', 'floor', 'fmod', 'frexp', 'ldexp', 'log',
"log10", "modf", "pow", "sin", "sinh", "sqrt", "tan", "tanh", 'log10', 'modf', 'pow', 'sin', 'sinh', 'sqrt', 'tan', 'tanh',
# CTYPE # CTYPE
"isalnum", "isalpha", "iscntrl", "isdigit", "isgraph", "islower", "isprint", "ispunct", "isspace", "isupper", 'isalnum', 'isalpha', 'iscntrl', 'isdigit', 'isgraph', 'islower', 'isprint', 'ispunct', 'isspace', 'isupper',
"isxdigit", "tolower", "toupper", 'isxdigit', 'tolower', 'toupper',
# TIME # TIME
"asctime", "clock", "ctime", "difftime", "gmtime", "localtime", "mktime", "strftime", "time", 'asctime', 'clock', 'ctime', 'difftime', 'gmtime', 'localtime', 'mktime', 'strftime', 'time',
# UNISTD # UNISTD
"access", "alarm", "chdir", "chown", "close", "dup", "dup2", "execle", "execv", "execve", "execvp", "fork", 'access', 'alarm', 'chdir', 'chown', 'close', 'dup', 'dup2', 'execle', 'execv', 'execve', 'execvp', 'fork',
"fpathconf", "getcwd", "getegid", "geteuid", "getgid", "getgroups", "getlogin", "getopt", "getpgid", "getpgrp", 'fpathconf', 'getcwd', 'getegid', 'geteuid', 'getgid', 'getgroups', 'getlogin', 'getopt', 'getpgid', 'getpgrp',
"getpid", "getppid", "getuid", "isatty", "lseek", "pathconf", "pause", "pipe", "read", "rmdir", "setgid", 'getpid', 'getppid', 'getuid', 'isatty', 'lseek', 'pathconf', 'pause', 'pipe', 'read', 'rmdir', 'setgid',
"setpgid", "setsid", "setuid", "sleep", "sysconf", "tcgetpgrp", "tcsetpgrp", "ttyname", "ttyname_r", "write", 'setpgid', 'setsid', 'setuid', 'sleep', 'sysconf', 'tcgetpgrp', 'tcsetpgrp', 'ttyname', 'ttyname_r', 'write',
"fsync", "unlink", "syscall", "getpagesize", 'fsync', 'unlink', 'syscall', 'getpagesize',
# FCNTL # FCNTL
"creat", "fcntl", "open", 'creat', 'fcntl', 'open',
# SYS_TYPE # SYS_TYPE
"fd_set", "FD_CLR", "FD_ISSET", "FD_SET", "FD_ZERO", 'fd_set', 'FD_CLR', 'FD_ISSET', 'FD_SET', 'FD_ZERO',
# SYS_STAT # SYS_STAT
"chmod", "fchmod", "fstat", "mkdir", "mkfifo", "stat", "umask", 'chmod', 'fchmod', 'fstat', 'mkdir', 'mkfifo', 'stat', 'umask',
# SYS_TIME # SYS_TIME
"gettimeofday", "select", "settimeofday", 'gettimeofday', 'select', 'settimeofday',
# SIGNAL # SIGNAL
"signal", "raise", "kill", "sigaction", "sigaddset", "sigdelset", "sigemptyset", "sigfillset", "sigismember", 'signal', 'raise', 'kill', 'sigaction', 'sigaddset', 'sigdelset', 'sigemptyset', 'sigfillset', 'sigismember',
"sigpending", "sigprocmask", "sigsuspend", "alarm", "pause", 'sigpending', 'sigprocmask', 'sigsuspend', 'alarm', 'pause',
# SETJMP # SETJMP
"longjmp", "setjmp", 'longjmp', 'setjmp',
# ERRNO # ERRNO
"errno", "strerror", "perror", 'errno', 'strerror', 'perror',
# ASSERT # ASSERT
"assert", 'assert',
# LOCAL # LOCAL
"localeconv", "setlocale", 'localeconv', 'setlocale',
# WCHAR # WCHAR
"btowc", "fgetwc", "fgetws", "fputwc", "fputws", "fwide", "fwprintf", "fwscanf", "getwc", "getwchar", "mbrlen", 'btowc', 'fgetwc', 'fgetws', 'fputwc', 'fputws', 'fwide', 'fwprintf', 'fwscanf', 'getwc', 'getwchar', 'mbrlen',
"mbrtowc", "mbsinit", "mbsrtowcs", "putwc", "putwchar", "swprintf", "swscanf", "ungetwc", "vfwprintf", 'mbrtowc', 'mbsinit', 'mbsrtowcs', 'putwc', 'putwchar', 'swprintf', 'swscanf', 'ungetwc', 'vfwprintf',
"vfwscanf", "vwprintf", "vwscanf", "wcrtomb", "wcscat", "wcschr", "wcscmp", "wcscoll", "wcscpy", "wcscspn", 'vfwscanf', 'vwprintf', 'vwscanf', 'wcrtomb', 'wcscat', 'wcschr', 'wcscmp', 'wcscoll', 'wcscpy', 'wcscspn',
"wcsftime", "wcslen", "wcsncat", "wcsncmp", "wcsncpy", "wcspbrk", "wcsrchr", "wcsrtombs", "wcsspn", "wcsstr", 'wcsftime', 'wcslen', 'wcsncat', 'wcsncmp', 'wcsncpy', 'wcspbrk', 'wcsrchr', 'wcsrtombs', 'wcsspn', 'wcsstr',
"wcstod", "wcstok", "wcstol", "wcstombs", "wcstoul", "wcsxfrm", "wctob", "wmemchr", "wmemcmp", "wmemcpy", 'wcstod', 'wcstok', 'wcstol', 'wcstombs', 'wcstoul', 'wcsxfrm', 'wctob', 'wmemchr', 'wmemcmp', 'wmemcpy',
"wmemmove", "wmemset", "wprintf", "wscanf", 'wmemmove', 'wmemset', 'wprintf', 'wscanf',
# WCTYPE # WCTYPE
"iswalnum", "iswalpha", "iswcntrl", "iswdigit", "iswgraph", "iswlower", "iswprint", "iswpunct", "iswspace", 'iswalnum', 'iswalpha', 'iswcntrl', 'iswdigit', 'iswgraph', 'iswlower', 'iswprint', 'iswpunct', 'iswspace',
"iswupper", "iswxdigit", "towlower", "towupper", "iswctype", "wctype", 'iswupper', 'iswxdigit', 'towlower', 'towupper', 'iswctype', 'wctype',
# STDDEF # STDDEF
"NULL", "offsetof", "ptrdiff_t", "size_t", "wchar_t", 'NULL', 'offsetof', 'ptrdiff_t', 'size_t', 'wchar_t',
# STDARG # STDARG
"va_arg", "va_end", "va_start", 'va_arg', 'va_end', 'va_start',
# DLFCN # DLFCN
"dlclose", "dlerror", "dlopen", "dlsym", 'dlclose', 'dlerror', 'dlopen', 'dlsym',
# DIRENT # DIRENT
"closedir", "opendir", "readdir", 'closedir', 'opendir', 'readdir',
# SYS_SENDFILE # SYS_SENDFILE
"sendfile", 'sendfile',
# SYS_MMAN # SYS_MMAN
"mmap", "mprotect", "munmap", 'mmap', 'mprotect', 'munmap',
# SYS_UTSNAME # SYS_UTSNAME
"uname", 'uname',
# LINK # LINK
"dladdr" 'dladdr'
} }
# https://cs.android.com/search?q=oemcrypto&sq=&ss=android%2Fplatform%2Fsuperproject
OEM_CRYPTO_API = { OEM_CRYPTO_API = {
# Mapping of function names across different API levels (obfuscated names may vary). # Mapping of function names across different API levels (obfuscated names may vary).
"rnmsglvj", "polorucp", "kqzqahjq", "pldrclfq", "kgaitijd", "cwkfcplc", "crhqcdet", "ulns", "dnvffnze", "ygjiljer", 'rnmsglvj', 'polorucp', 'kqzqahjq', 'pldrclfq', 'kgaitijd', 'cwkfcplc', 'crhqcdet', 'ulns', 'dnvffnze', 'ygjiljer',
"qbjxtubz", "qkfrcjtw", "rbhjspoh", "zgtjmxko", "igrqajte", "ofskesua", "qllcoacg", "pukctkiv", "ehdqmfmd", 'qbjxtubz', 'qkfrcjtw', 'rbhjspoh', 'zgtjmxko', 'igrqajte', 'ofskesua', 'qllcoacg', 'pukctkiv', 'ehdqmfmd',
"xftzvkwx", "gndskkuk", "wcggmnnx", "kaatohcz", "ktmgdchz", "jkcwonus", "ehmduqyt", "vewtuecx", "mxrbzntq", 'xftzvkwx', 'gndskkuk', 'wcggmnnx', 'kaatohcz', 'ktmgdchz', 'jkcwonus', 'ehmduqyt', 'vewtuecx'
"isyowgmp", "flzfkhbc", "rtgejgqb", "sxxprljw", "ebxjbtxl", "pcmtpkrj"
# Add more as needed for different versions. # Add more as needed for different versions.
} }
# https://developer.android.com/tools/releases/platforms # https://developer.android.com/tools/releases/platforms
CDM_VENDOR_API = { CDM_VENDOR_API = {
"mediaserver": [ 'mediaserver': [
Vendor(22, 11, "1.0", r"libwvdrmengine(?:@\S+)?\.so") Vendor(22, 11, '1.0', 'libwvdrmengine.so')
], ],
"mediadrmserver": [ 'mediadrmserver': [
Vendor(24, 11, "1.0", r"libwvdrmengine(?:@\S+)?\.so") Vendor(24, 11, '1.0', 'libwvdrmengine.so')
], ],
"android.hardware.drm@1.0-service.widevine": [ 'android.hardware.drm@1.0-service.widevine': [
Vendor(26, 13, "5.1.0", r"libwvhidl(?:@\S+)?\.so") Vendor(26, 13, '5.1.0', 'libwvhidl.so')
], ],
"android.hardware.drm@1.1-service.widevine": [ 'android.hardware.drm@1.1-service.widevine': [
Vendor(28, 14, "14.0.0", r"libwvhidl(?:@\S+)?\.so") Vendor(28, 14, '14.0.0', 'libwvhidl.so')
], ],
"android.hardware.drm@1.2-service.widevine": [ 'android.hardware.drm@1.2-service.widevine': [
Vendor(29, 15, "15.0.0", r"libwvhidl(?:@\S+)?\.so") Vendor(29, 15, '15.0.0', 'libwvhidl.so')
], ],
"android.hardware.drm@1.3-service.widevine": [ 'android.hardware.drm@1.3-service.widevine': [
Vendor(30, 16, "16.0.0", r"libwvhidl(?:@\S+)?\.so") Vendor(30, 16, '16.0.0', 'libwvhidl.so')
], ],
"android.hardware.drm@1.4-service.widevine": [ 'android.hardware.drm@1.4-service.widevine': [
Vendor(31, 16, "16.1.0", r"libwvhidl(?:@\S+)?\.so") Vendor(31, 16, '16.1.0', 'libwvhidl.so')
], ],
"android.hardware.drm-service.widevine": [ 'android.hardware.drm-service.widevine': [
Vendor(33, 17, "17.0.0", r"libwvaidl(?:@\S+)?\.so"), Vendor(33, 17, '17.0.0', 'libwvaidl.so'),
Vendor(34, 18, "18.0.0", r"android\.hardware\.drm-service(?:-lazy)?\.widevine(?:@\S+)?"), Vendor(34, 18, '18.0.0', 'android.hardware.drm-service.widevine'),
Vendor(35, 18, "19.0.1", r"android\.hardware\.drm-service(?:-lazy)?\.widevine(?:@\S+)?") Vendor(35, 18, '19.0.1', 'android.hardware.drm-service.widevine')
] ]
} }
# https://developers.google.com/widevine # https://developers.google.com/widevine
CDM_FUNCTION_API = { CDM_FUNCTION_API = {
"UsePrivacyMode", 'UsePrivacyMode',
"GetCdmClientPropertySet", 'GetCdmClientPropertySet',
"PrepareKeyRequest", 'PrepareKeyRequest',
"getOemcryptoDeviceId", 'getOemcryptoDeviceId',
"lcc07", 'lcc07',
"oecc07", 'oecc07',
"Read", 'Read',
"x1c36", 'x1c36',
"runningcrc" 'runningcrc'
} }
# Maximum clear API level for Keybox
# https://cs.android.com/android/platform/superproject/+/android14-qpr3-release:trusty/user/app/sample/hwcrypto/keybox/keybox.c
KEYBOX_MAX_CLEAR_API = 28 KEYBOX_MAX_CLEAR_API = 28
# https://github.com/kaltura/kaltura-device-info-android # https://github.com/kaltura/kaltura-device-info-android
DRM_PLAYER = { DRM_PLAYER = {
"package": "com.kaltura.kalturadeviceinfo", 'package': 'com.kaltura.kalturadeviceinfo',
"path": Path(__file__).parent.parent / "docs" / "server" / "kaltura.apk", 'path': Path(__file__).parent.parent / 'docs' / 'server' / 'kaltura.apk',
"url": "https://github.com/kaltura/kaltura-device-info-android/releases/download/t3/kaltura-device-info-release.apk" 'url': 'https://github.com/kaltura/kaltura-device-info-android/releases/download/t3/kaltura-device-info-release.apk'
} }

View File

@ -24,59 +24,55 @@ class Core:
""" """
Initializes a Core instance. Initializes a Core instance.
Parameters: Args:
adb (ADB): ADB instance for device communication. adb (ADB): ADB instance for device communication.
cdm (Cdm): Instance for handling DRM-related operations. cdm (Cdm): Instance of Cdm for managing DRM related operations.
functions (Path, optional): Path to Ghidra XML file for symbol extraction. Defaults to None. functions (Path, optional): Path to Ghidra XML functions file for symbol extraction. Defaults to None.
skip (bool, optional): Whether to skip predefined functions (e.g., OEM_CRYPTO_API). Defaults to False. skip (bool, optional): Flag to determine whether to skip predefined functions (e.g., OEM_CRYPTO_API).
""" """
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
self.running = True self.running = True
self.cdm = cdm self.cdm = cdm
self.adb = adb self.adb = adb
# Flag to skip predefined functions based on the vendor's API level
# https://github.com/hyugogirubato/KeyDive/issues/38#issuecomment-2411932679 # https://github.com/hyugogirubato/KeyDive/issues/38#issuecomment-2411932679
# Flag to skip predefined functions based on the vendor's API level
self.skip = skip self.skip = skip
# Load the hook script with relevant data and prepare for injection # Load the hook script and prepare for injection
self.functions = functions self.functions = functions
self.script = self.__prepare_hook_script() self.script = self.__prepare_hook_script()
self.logger.info("Hook script prepared successfully") self.logger.info('Script loaded successfully')
def __prepare_hook_script(self) -> str: def __prepare_hook_script(self) -> str:
""" """
Prepares the hook script by injecting library-specific data. Prepares the hook script content by injecting the library-specific scripts.
Returns: Returns:
str: The finalized hook script content with placeholders replaced. str: The prepared script content.
""" """
# Read the base JavaScript template file content = Path(__file__).with_name('keydive.js').read_text(encoding='utf-8')
content = Path(__file__).with_name("keydive.js").read_text(encoding="utf-8")
# Generate the list of symbols from the functions file
symbols = self.__prepare_symbols(self.functions) symbols = self.__prepare_symbols(self.functions)
# Define the placeholder replacements # Replace placeholders in script template
replacements = { replacements = {
"${OEM_CRYPTO_API}": json.dumps(list(OEM_CRYPTO_API)), '${OEM_CRYPTO_API}': json.dumps(list(OEM_CRYPTO_API)),
"${NATIVE_C_API}": json.dumps(list(NATIVE_C_API)), '${NATIVE_C_API}': json.dumps(list(NATIVE_C_API)),
"${SYMBOLS}": json.dumps(symbols), '${SYMBOLS}': json.dumps(symbols),
"${SKIP}": str(self.skip) '${SKIP}': str(self.skip)
} }
# Replace placeholders in the script content
for placeholder, value in replacements.items(): for placeholder, value in replacements.items():
content = content.replace(placeholder, value, 1) content = content.replace(placeholder, value)
return content return content
def __prepare_symbols(self, path: Path) -> list: def __prepare_symbols(self, path: Path) -> list:
""" """
Extracts relevant functions from a Ghidra XML file. Parses the provided XML functions file to select relevant functions.
Parameters: Args:
path (Path): Path to the Ghidra XML functions file. path (Path): Path to Ghidra XML functions file.
Returns: Returns:
list: List of selected functions as dictionaries. list: List of selected functions as dictionaries.
@ -85,83 +81,70 @@ class Core:
FileNotFoundError: If the functions file is not found. FileNotFoundError: If the functions file is not found.
ValueError: If functions extraction fails. ValueError: If functions extraction fails.
""" """
# Return an empty list if no path is provided
if not path: if not path:
return [] return []
elif not path.is_file():
raise FileNotFoundError('Functions file not found')
try: try:
# Parse the XML file and extract program data program = xmltodict.parse(path.read_bytes())['PROGRAM']
program = xmltodict.parse(path.read_bytes())["PROGRAM"] addr_base = int(program['@IMAGE_BASE'], 16)
addr_base = int(program["@IMAGE_BASE"], 16) # Base address for function addresses functions = program['FUNCTIONS']['FUNCTION']
functions = program["FUNCTIONS"]["FUNCTION"] # List of functions in the XML
# Identify a target function from the predefined OEM_CRYPTO_API list (if not skipped) # Find a target function from a predefined list
target = next((f["@NAME"] for f in functions if f["@NAME"] in OEM_CRYPTO_API and not self.skip), None) target = None if self.skip else next((f['@NAME'] for f in functions if f['@NAME'] in OEM_CRYPTO_API), None)
# Prepare a dictionary to store selected functions # Extract relevant functions
selected = {} selected = {}
for func in functions: for func in functions:
name = func["@NAME"] # Function name name = func['@NAME']
args = len(func.get("REGISTER_VAR", [])) # Number of arguments args = len(func.get('REGISTER_VAR', []))
""" # Add function if it matches specific criteria
Add the function if it matches specific criteria
- Match the target function if identified
- Match API keywords
- Match unnamed functions with 6+ args
"""
if name not in selected and ( if name not in selected and (
name == target name == target
or any(True if self.skip else keyword in name for keyword in CDM_FUNCTION_API) or any(True if self.skip else keyword in name for keyword in CDM_FUNCTION_API)
or (not target and re.match(r"^[a-z]+$", name) and args >= 6) or (not target and re.match(r'^[a-z]+$', name) and args >= 6)
): ):
selected[name] = { selected[name] = {
"type": "function", 'type': 'function',
"name": name, 'name': name,
"address": hex(int(func["@ENTRY_POINT"], 16) - addr_base) # Calculate relative address 'address': hex(int(func['@ENTRY_POINT'], 16) - addr_base)
} }
# Return the list of selected functions
return list(selected.values()) return list(selected.values())
except FileNotFoundError as e:
raise FileNotFoundError(f"Functions file not found: {path}") from e
except Exception as e: except Exception as e:
raise ValueError("Failed to extract functions from Ghidra XML file") from e raise ValueError('Failed to extract functions from Ghidra') from e
def __process_message(self, message: dict, data: bytes) -> None: def __process_message(self, message: dict, data: bytes) -> None:
""" """
Handles messages received from the Frida script. Handles messages received from the Frida script.
Parameters: Args:
message (dict): The message payload. message (dict): The message payload.
data (bytes): The raw data associated with the message. data (bytes): The raw data associated with the message.
""" """
logger = logging.getLogger("Script") logger = logging.getLogger('Script')
level = message.get("payload") level = message.get('payload')
if isinstance(level, int): if isinstance(level, int):
# Log the message based on its severity level # Process logging messages from Frida script
logger.log(level=level, msg=data.decode("utf-8")) logger.log(level=level, msg=data.decode('utf-8'))
if level in (logging.FATAL, logging.CRITICAL): if level in (logging.FATAL, logging.CRITICAL):
self.running = False # Stop the process on critical errors self.running = False
elif isinstance(level, dict) and "private_key" in level: elif isinstance(level, dict) and 'private_key' in level:
# Set the private key in the DRM handler self.cdm.set_private_key(data=data, name=level['private_key'])
self.cdm.set_private_key(data=data, name=level["private_key"]) elif level == 'challenge':
elif level == "challenge":
# Set the challenge data in the DRM handler
self.cdm.set_challenge(data=data) self.cdm.set_challenge(data=data)
elif level == "device_id": elif level == 'device_id':
# Set the device ID in the DRM handler
self.cdm.set_device_id(data) self.cdm.set_device_id(data)
elif level == "keybox": elif level == 'keybox':
# Set the keybox data in the DRM handler
self.cdm.set_keybox(data) self.cdm.set_keybox(data)
def hook_process(self, pid: int, vendor: Vendor, timeout: int = 0) -> bool: def hook_process(self, pid: int, vendor: Vendor, timeout: int = 0) -> bool:
""" """
Hooks into the specified process. Hooks into the specified process.
Parameters: Args:
pid (int): The process ID to hook. pid (int): The process ID to hook.
vendor (Vendor): Instance of Vendor class representing the vendor information. vendor (Vendor): Instance of Vendor class representing the vendor information.
timeout (int, optional): Timeout for attaching to the process. Defaults to 0. timeout (int, optional): Timeout for attaching to the process. Defaults to 0.
@ -170,59 +153,36 @@ class Core:
bool: True if the process was successfully hooked, otherwise False. bool: True if the process was successfully hooked, otherwise False.
""" """
try: try:
# Attach to the target process using the specified PID.
# The 'persist_timeout' parameter ensures the session persists for the given duration.
session: Session = self.adb.device.attach(pid, persist_timeout=timeout) session: Session = self.adb.device.attach(pid, persist_timeout=timeout)
except frida.ServerNotRunningError as e: except frida.ServerNotRunningError as e:
# Handle the case where the Frida server is not running on the device. raise EnvironmentError('Frida server is not running') from e
raise EnvironmentError("Frida server is not running") from e
except Exception as e: except Exception as e:
# Log other exceptions and return False to indicate failure.
self.logger.error(e) self.logger.error(e)
return False return False
# Define a callback to handle when the process is destroyed.
def __process_destroyed() -> None: def __process_destroyed() -> None:
session.detach() session.detach()
# Create a Frida script object using the prepared script content.
script: Script = session.create_script(self.script) script: Script = session.create_script(self.script)
script.on("message", self.__process_message) script.on('message', self.__process_message)
script.on("destroyed", __process_destroyed) script.on('destroyed', __process_destroyed)
script.load() script.load()
# Fetch a list of libraries loaded by the target process. library = script.exports_sync.getlibrary(vendor.name)
libraries = script.exports_sync.getlibraries()
library = next((l for l in libraries if re.match(vendor.pattern, l["name"])), None)
if library: if library:
# Log information about the library if it is found. self.logger.info('Library: %s (%s)', library['name'], library['path'])
self.logger.info("Library: %s (%s)", library["name"], library["path"])
# Retrieve and log the version of the Frida server. # Check if Ghidra XML functions loaded
version = script.exports_sync.getversion() if vendor.oem > 17 and not self.functions:
self.logger.debug(f"Server: %s", version) self.logger.warning('For OEM API > 17, specifying "functions" is required, refer to https://github.com/hyugogirubato/KeyDive/blob/main/docs/FUNCTIONS.md')
elif vendor.oem < 18 and self.functions:
self.logger.warning('The "functions" attribute is deprecated for OEM API < 18')
# Determine if the Frida server version is older than 16.6.0. return script.exports_sync.hooklibrary(vendor.name)
code = tuple(map(int, version.split(".")))
minimum = code[0] > 16 or (code[0] == 16 and code[1] >= 6)
# Warn the user if certain conditions related to the functions option are met.
if minimum and self.functions:
self.logger.warning("The '--functions' option is deprecated starting from Frida 16.6.0")
elif not minimum and vendor.oem < 18 and self.functions:
self.logger.warning("The '--functions' option is deprecated for OEM API < 18")
elif not minimum and vendor.oem > 17 and not self.functions:
self.logger.warning("For OEM API > 17, specifying '--functions' is required. Refer to https://github.com/hyugogirubato/KeyDive/blob/main/docs/FUNCTIONS.md")
# Enable dynamic analysis (symbols) only when necessary
dynamic = minimum and vendor.oem > 17 and not self.functions
return script.exports_sync.hooklibrary(library["name"], dynamic)
# Unload the script if the target library is not found.
script.unload() script.unload()
self.logger.warning("Library not found: %s" % vendor.pattern) self.logger.warning('Library not found: %s' % vendor.name)
return False return False
__all__ = ("Core",) __all__ = ('Core',)

View File

@ -5,20 +5,21 @@ import logging
from json.encoder import encode_basestring_ascii from json.encoder import encode_basestring_ascii
from typing import Literal from typing import Literal
from uuid import UUID from uuid import UUID
from pathlib import Path from pathlib import Path
def bytes2int(value: bytes, byteorder: Literal["big", "little"] = "big", signed: bool = False) -> int: def bytes2int(value: bytes, byteorder: Literal['big', 'little'] = 'big', signed: bool = False) -> int:
""" """
Convert a byte sequence to an integer. Convert bytes to an integer.
Parameters: Args:
value (bytes): The byte sequence to convert. value (bytes): The byte sequence to convert.
byteorder (str, optional): Byte order for conversion. 'big' or 'little'. Defaults to 'big'. byteorder (Literal['big', 'little'], optional): The byte order for conversion. Defaults to 'big'.
signed (bool, optional): Whether the integer is signed. Defaults to False. signed (bool, optional): Indicates if the integer is signed. Defaults to False.
Returns: Returns:
int: The integer representation of the byte sequence. int: The converted integer.
""" """
return int.from_bytes(value, byteorder=byteorder, signed=signed) return int.from_bytes(value, byteorder=byteorder, signed=signed)
@ -30,7 +31,12 @@ class Keybox:
def __init__(self): def __init__(self):
""" """
Initializes the Keybox object, setting up logger and containers for device IDs and keyboxes. Initializes the Keybox object, setting up a logger and containers for device IDs and keyboxes.
Attributes:
logger (Logger): Logger instance for logging messages.
device_id (list[bytes]): List of unique device IDs (32 bytes each).
keybox (dict[bytes, bytes]): Dictionary mapping device IDs to their respective keybox data.
""" """
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
# https://github.com/kaltura/kaltura-device-info-android/blob/master/app/src/main/java/com/kaltura/kalturadeviceinfo/MainActivity.java#L203 # https://github.com/kaltura/kaltura-device-info-android/blob/master/app/src/main/java/com/kaltura/kalturadeviceinfo/MainActivity.java#L203
@ -41,7 +47,7 @@ class Keybox:
""" """
Set the device ID from the provided data. Set the device ID from the provided data.
Parameters: Args:
data (bytes): The device ID, expected to be 32 bytes long. data (bytes): The device ID, expected to be 32 bytes long.
Raises: Raises:
@ -49,22 +55,19 @@ class Keybox:
""" """
try: try:
size = len(data) size = len(data)
# Ensure the device ID is exactly 32 bytes long assert size == 32, f'Invalid keybox length: {size}. Should be 32 bytes'
assert size == 32, f"Invalid device ID length: {size}. Should be 32 bytes"
# Add device ID to the list if it's not already present
if data not in self.device_id: if data not in self.device_id:
self.logger.info("Receive device id: \n\n%s\n", encode_basestring_ascii(data.decode("utf-8"))) self.logger.info('Receive device id: \n\n%s\n', encode_basestring_ascii(data.decode('utf-8')))
self.device_id.append(data) self.device_id.append(data)
except Exception as e: except Exception as e:
self.logger.debug("Failed to set device id: %s", e) self.logger.debug('Failed to set device id: %s', e)
def set_keybox(self, data: bytes) -> None: def set_keybox(self, data: bytes) -> None:
""" """
Set the keybox from the provided data. Set the keybox from the provided data.
Parameters: Args:
data (bytes): The keybox data, expected to be either 128 or 132 bytes long. data (bytes): The keybox data, expected to be either 128 or 132 bytes long.
Raises: Raises:
@ -73,74 +76,68 @@ class Keybox:
# https://github.com/zybpp/Python/tree/master/Python/keybox # https://github.com/zybpp/Python/tree/master/Python/keybox
try: try:
size = len(data) size = len(data)
# Validate the keybox size (128 or 132 bytes) assert size in (128, 132), f'Invalid keybox length: {size}. Should be 128 or 132 bytes'
assert size in (128, 132), f"Invalid keybox length: {size}. Should be 128 or 132 bytes"
# Validate the QSEE-style keybox end if size == 132:
assert size == 128 or data[128:132] == b"LVL1", "QSEE-style keybox must end with bytes 'LVL1'" assert data[128:132] == b"LVL1", 'QSEE-style keybox must end with bytes "LVL1"'
# Validate the keybox magic (should be 'kbox') assert data[120:124] == b"kbox", 'Invalid keybox magic'
assert data[120:124] == b"kbox", "Invalid keybox magic"
device_id = data[0:32] # Extract the device ID from the first 32 bytes device_id = data[0:32]
# Retrieve and log the structured keybox information # Retrieve structured keybox info and log it
infos = self.__keybox_info(data) infos = self.__keybox_info(data)
encrypted = infos["flags"] > 10 # Check if the keybox is encrypted encrypted = infos['flags'] > 10
self.set_device_id(data=device_id) # Set the device ID self.set_device_id(data=device_id)
# Log and store the keybox data if it's a new keybox or the device ID is updated
if (device_id in self.keybox and self.keybox[device_id] != (data, encrypted)) or device_id not in self.keybox: if (device_id in self.keybox and self.keybox[device_id] != (data, encrypted)) or device_id not in self.keybox:
self.logger.info("Receive keybox: \n\n%s\n", json.dumps(infos, indent=2)) self.logger.info('Receive keybox: \n\n%s\n', json.dumps(infos, indent=2))
# Warn if keybox is encrypted and interception of plaintext device token is needed # Warn if flags indicate encryption, which requires an unencrypted device token
if encrypted: if encrypted:
self.logger.warning("Keybox contains encrypted data. Interception of plaintext device token is needed") self.logger.warning('Keybox contains encrypted data. Interception of plaintext device token is needed')
# Store the keybox (encrypted or not) for the device ID # Store or update the keybox for the device if it's not already saved
if (device_id in self.keybox and not encrypted) or device_id not in self.keybox: if (device_id in self.keybox and not encrypted) or device_id not in self.keybox:
self.keybox[device_id] = (data, encrypted) self.keybox[device_id] = (data, encrypted)
except Exception as e: except Exception as e:
self.logger.debug("Failed to set keybox: %s", e) self.logger.debug('Failed to set keybox: %s', e)
@staticmethod @staticmethod
def __keybox_info(data: bytes) -> dict: def __keybox_info(data: bytes) -> dict:
""" """
Extract keybox information from the provided data. Extract keybox information from the provided data.
Parameters: Args:
data (bytes): The keybox data. data (bytes): The keybox data.
Returns: Returns:
dict: A dictionary containing extracted keybox information. dict: A dictionary containing extracted keybox information.
""" """
# https://github.com/wvdumper/dumper/blob/main/Helpers/Keybox.py#L51 # https://github.com/wvdumper/dumper/blob/main/Helpers/Keybox.py#L51
# Extract device-specific information from the keybox data
device_token = data[48:120] device_token = data[48:120]
# Prepare the keybox content dictionary
content = { content = {
"device_id": data[0:32].decode("utf-8"), # Device's unique identifier (32 bytes) 'device_id': data[0:32].decode('utf-8'), # Device's unique identifier (32 bytes)
"device_key": data[32:48], # Device cryptographic key (16 bytes) 'device_key': data[32:48], # Device-specific cryptographic key (16 bytes)
"device_token": device_token, # Token for device authentication (72 bytes) 'device_token': device_token, # Token used for device authentication (72 bytes)
"keybox_tag": data[120:124].decode("utf-8"), # Magic tag (4 bytes) 'keybox_tag': data[120:124].decode('utf-8'), # Magic tag indicating keybox format (4 bytes)
"crc32": bytes2int(data[124:128]), # CRC32 checksum (4 bytes) 'crc32': bytes2int(data[124:128]), # CRC32 checksum for data integrity verification (4 bytes)
"level_tag": data[128:132].decode("utf-8") or None, # Optional level tag (4 bytes) 'level_tag': data[128:132].decode('utf-8') or None, # Optional tag indicating keybox level (4 bytes).
# Extract metadata from the device token (Bytes 48120) # Additional metadata parsed from the device token (Bytes 48120).
"flags": bytes2int(device_token[0:4]), # Device flags (4 bytes) 'flags': bytes2int(device_token[0:4]), # Flags indicating specific device capabilities (4 bytes).
"system_id": bytes2int(device_token[4:8]), # System identifier (4 bytes) 'system_id': bytes2int(device_token[4:8]), # System identifier (4 bytes).
"provisioning_id": UUID(bytes_le=device_token[8:24]), # Provisioning UUID (16 bytes)
"encrypted_bits": device_token[24:72] # Encrypted device-specific information (48 bytes) # Provisioning ID, encrypted and derived from the unique ID in the system.
'provisioning_id': UUID(bytes_le=device_token[8:24]), # Provisioning UUID (16 bytes).
# Encrypted bits containing device key, key hash, and additional flags.
'encrypted_bits': device_token[24:72] ## Encrypted device-specific information (48 bytes).
} }
# https://github.com/ThatNotEasy/Parser-DRM/blob/main/modules/widevine.py#L84 # Encode certain fields in base64 and convert UUIDs to string
# TODO: decrypt device token value
# Encode bytes as base64 and convert UUIDs to string
return { return {
k: base64.b64encode(v).decode("utf-8") if isinstance(v, bytes) else str(v) if isinstance(v, UUID) else v k: base64.b64encode(v).decode('utf-8') if isinstance(v, bytes) else str(v) if isinstance(v, UUID) else v
for k, v in content.items() for k, v in content.items()
} }
@ -148,33 +145,24 @@ class Keybox:
""" """
Export the keybox data to a file in the specified parent directory. Export the keybox data to a file in the specified parent directory.
Parameters: Args:
parent (Path): The parent directory where the keybox data will be saved. parent (Path): The parent directory where the keybox data will be saved.
Returns: Returns:
bool: True if any keybox were exported, otherwise False. bool: True if any keybox were exported, otherwise False.
""" """
# Find matching keyboxes based on the device_id
keys = self.device_id & self.keybox.keys() keys = self.device_id & self.keybox.keys()
for k in keys: for k in keys:
# Create the parent directory if it doesn't exist # Prepare target directory and export file path
parent.mkdir(parents=True, exist_ok=True) parent.mkdir(parents=True, exist_ok=True)
path_keybox_bin = parent / f"keybox.{'enc' if self.keybox[k][1] else 'bin'}"
# Define the export file path and extension (encrypted or binary)
path_keybox_bin = parent / ("keybox." + ("enc" if self.keybox[k][1] else "bin"))
# Write the keybox data to the file
path_keybox_bin.write_bytes(self.keybox[k][0]) path_keybox_bin.write_bytes(self.keybox[k][0])
# Log export status based on whether the keybox is encrypted
if self.keybox[k][1]: if self.keybox[k][1]:
self.logger.warning("Exported encrypted keybox: %s", path_keybox_bin) self.logger.warning('Exported encrypted keybox: %s', path_keybox_bin)
else: else:
self.logger.info("Exported keybox: %s", path_keybox_bin) self.logger.info('Exported keybox: %s', path_keybox_bin)
# Return True if any keyboxes were exported, otherwise False
return len(keys) > 0 return len(keys) > 0
__all__ = ("Keybox",) __all__ = ('Keybox',)

View File

@ -1,5 +1,5 @@
/** /**
* Date: 2025-03-01 * Date: 2024-11-01
* Description: DRM key extraction for research and educational purposes. * Description: DRM key extraction for research and educational purposes.
* Source: https://github.com/hyugogirubato/KeyDive * Source: https://github.com/hyugogirubato/KeyDive
*/ */
@ -65,41 +65,27 @@ const print = (level, message) => {
send(level, message); send(level, message);
} }
const getVersion = () => Frida.version;
// @Utils // @Utils
const getLibraries = () => { const getLibraries = (name) => {
// https://github.com/hyugogirubato/KeyDive/issues/14#issuecomment-2146788792 // https://github.com/hyugogirubato/KeyDive/issues/14#issuecomment-2146788792
try { try {
return Process.enumerateModules(); const libraries = Process.enumerateModules();
return libraries.filter(l => l.name.includes(name));
} catch (e) { } catch (e) {
print(Level.CRITICAL, e.message); print(Level.CRITICAL, e.message);
return []; return [];
} }
} };
const getLibrary = (name) => { const getLibrary = (name) => {
const libraries = getLibraries().filter(l => l.name === name); const libraries = getLibraries(name);
return libraries.length === 1 ? libraries[0] : undefined; return libraries.length === 1 ? libraries[0] : undefined;
} }
const getFunctions = (library, dynamic) => { const getFunctions = (library) => {
try { try {
// https://frida.re/news/2025/01/09/frida-16-6-0-released/ return library.enumerateExports();
const functions = dynamic ? library.enumerateSymbols().map(item => ({
type: item.type,
name: item.name,
address: item.address
})) : [];
library.enumerateExports().forEach(item => {
if (!functions.includes(item)) {
functions.push(item);
}
});
return functions;
} catch (e) { } catch (e) {
print(Level.CRITICAL, e.message); print(Level.CRITICAL, e.message);
return []; return [];
@ -111,7 +97,7 @@ const disableLibrary = (name) => {
const library = getLibrary(name); const library = getLibrary(name);
if (library) { if (library) {
// https://github.com/hyugogirubato/KeyDive/issues/23#issuecomment-2230374415 // https://github.com/hyugogirubato/KeyDive/issues/23#issuecomment-2230374415
const functions = getFunctions(library, false); const functions = getFunctions(library);
const disabled = []; const disabled = [];
functions.forEach(func => { functions.forEach(func => {
@ -130,7 +116,7 @@ const disableLibrary = (name) => {
}); });
print(Level.INFO, `Library ${library.name} (${library.path}) has been disabled`); print(Level.INFO, `Library ${library.name} (${library.path}) has been disabled`);
} else { } else {
print(Level.DEBUG, `Library ${name} was not found`); print(Level.INFO, `Library ${name} was not found`);
} }
} }
@ -395,7 +381,7 @@ const RunningCRC = (address) => {
// @Hooks // @Hooks
const hookLibrary = (name, dynamic) => { const hookLibrary = (name) => {
// https://github.com/poxyran/misc/blob/master/frida-enumerate-imports.py // https://github.com/poxyran/misc/blob/master/frida-enumerate-imports.py
let library = getLibrary(name); let library = getLibrary(name);
if (!library) return false; if (!library) return false;
@ -409,8 +395,7 @@ const hookLibrary = (name, dynamic) => {
address: library.base.add(s.address) address: library.base.add(s.address)
})); }));
} else { } else {
// https://github.com/hyugogirubato/KeyDive/issues/50 functions = getFunctions(library);
functions = getFunctions(library, dynamic);
} }
functions = functions.filter(f => !NATIVE_C_API.includes(f.name)); functions = functions.filter(f => !NATIVE_C_API.includes(f.name));
@ -468,7 +453,6 @@ const hookLibrary = (name, dynamic) => {
// RPC interfaces exposed to external calls. // RPC interfaces exposed to external calls.
rpc.exports = { rpc.exports = {
getversion: getVersion, getlibrary: getLibrary,
getlibraries: getLibraries,
hooklibrary: hookLibrary hooklibrary: hookLibrary
}; };

View File

@ -3,32 +3,32 @@ class Vendor:
Represents a Vendor with SDK, OEM, version, and name attributes. Represents a Vendor with SDK, OEM, version, and name attributes.
""" """
def __init__(self, sdk: int, oem: int, version: str, pattern: str): def __init__(self, sdk: int, oem: int, version: str, name: str):
""" """
Initializes a Vendor instance. Initializes a Vendor instance.
Parameters: Args:
sdk (int): Minimum SDK version required by the vendor. sdk (int): Minimum SDK version required.
oem (int): OEM identifier for the vendor. oem (int): OEM identifier.
version (str): Version of the vendor. version (str): Version of the vendor.
pattern (str): Name pattern of the vendor. name (str): Name of the vendor.
""" """
self.sdk = sdk self.sdk = sdk
self.oem = oem self.oem = oem
self.version = version self.version = version
self.pattern = pattern self.name = name
def __repr__(self) -> str: def __repr__(self) -> str:
""" """
Returns a string representation of the Vendor instance. Returns a string representation of the Vendor instance.
Returns: Returns:
str: String representation of the Vendor instance with its attributes. str: String representation of the Vendor instance.
""" """
return "{name}({items})".format( return '{name}({items})'.format(
name=self.__class__.__name__, name=self.__class__.__name__,
items=", ".join([f"{k}={repr(v)}" for k, v in self.__dict__.items()]) items=', '.join([f'{k}={repr(v)}' for k, v in self.__dict__.items()])
) )
__all__ = ("Vendor",) __all__ = ('Vendor',)

View File

@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry] [tool.poetry]
name = "keydive" name = "keydive"
version = "2.2.1" version = "2.1.3"
description = "Extract Widevine L3 keys from Android devices effortlessly, spanning multiple Android versions for DRM research and education." description = "Extract Widevine L3 keys from Android devices effortlessly, spanning multiple Android versions for DRM research and education."
license = "MIT" license = "MIT"
authors = ["hyugogirubato <65763543+hyugogirubato@users.noreply.github.com>"] authors = ["hyugogirubato <65763543+hyugogirubato@users.noreply.github.com>"]
@ -36,7 +36,7 @@ include = [
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.8" python = "^3.8"
coloredlogs = "^15.0.1" coloredlogs = "^15.0.1"
frida = "^16.6.0" frida = "^16.5.6"
pathlib = "^1.0.1" pathlib = "^1.0.1"
pycryptodomex = "^3.21.0" pycryptodomex = "^3.21.0"
pywidevine = "^1.8.0" pywidevine = "^1.8.0"