import asyncio import uvloop import orjson from base64 import b64decode from toolz.dicttoolz import keyfilter import asyncstdlib.builtins as a from app.utils.HTTPXClient import HTTPXClient class Releases: """Implements the methods required to get the latest releases and patches from revanced repositories.""" uvloop.install() httpx_client = HTTPXClient.create() async def __get_release(self, repository: str, tag="latest") -> list: """Get assets from latest release in a given repository. Args: repository (str): Github's standard username/repository notation tag (str): lateset(default) - to get latest release prerelease - to get lateset prerelease recent - to get recent release either prerelease or stable whichever is recent version - supply a valid version tag Returns: dict: dictionary of filename and download url """ assets: list = [] url = f"https://api.github.com/repos/{repository}/releases" response = await self.httpx_client.get(url).json() match tag: case "recent": response = await self.httpx_client.get(f"{url}/tags/{response[0]['tag_name']}") case "prerelease": for release in response: if release['prerelease']: response = await self.httpx_client.get(f"{url}/tags/{release['tag_name']}") break case "latest": response = await self.httpx_client.get(f"{url}/latest") case _: response = await self.httpx_client.get(f"{url}/tags/{tag}") if response.status_code == 200: release_assets: dict = response.json()['assets'] release_version: str = response.json()['tag_name'] release_tarball: str = response.json()['tarball_url'] release_timestamp: str = response.json()['published_at'] async def get_asset_data(asset: dict) -> dict: return {'repository': repository, 'version': release_version, 'timestamp': asset['updated_at'], 'name': asset['name'], 'size': asset['size'], 'browser_download_url': asset['browser_download_url'], 'content_type': asset['content_type'] } if release_assets: assets = await asyncio.gather(*[get_asset_data(asset) for asset in release_assets]) else: no_release_assets_data: dict = {'repository': repository, 'version': release_version, 'timestamp': release_timestamp, 'name': f"{repository.split('/')[1]}-{release_version}.tar.gz", 'browser_download_url': release_tarball, 'content_type': 'application/gzip' } assets.append(no_release_assets_data) return assets async def get_latest_releases(self, repositories: list, tag="latest") -> dict: """Runs get_release() asynchronously for each repository. Args: repositories (list): List of repositories in Github's standard username/repository notation Returns: dict: A dictionary containing assets from each repository """ releases: dict[str, list] = {} releases['tools'] = [] results: list = await asyncio.gather(*[self.__get_release(repository, tag) for repository in repositories]) releases['tools'] = [asset for result in results for asset in result] return releases async def __get_patches_json(self) -> dict: """Get revanced-patches repository's README.md. Returns: dict: JSON content """ response = await self.httpx_client.get(f"https://api.github.com/repos/revanced/revanced-patches/contents/patches.json") content = orjson.loads( b64decode(response.json()['content']).decode('utf-8')) return content async def get_patches_json(self) -> dict: """Get patches.json from revanced-patches repository. Returns: dict: Patches available for a given app """ patches: dict = await self.__get_patches_json() return patches async def __get_contributors(self, repository: str) -> list: """Get contributors from a given repository. Args: repository (str): Github's standard username/repository notation Returns: list: a list of dictionaries containing the repository's contributors """ keep: set = {'login', 'avatar_url', 'html_url', 'contributions'} response = await self.httpx_client.get(f"https://api.github.com/repos/{repository}/contributors") # Looping over each contributor, filtering each contributor so that # keyfilter() returns a dictionary with only the key-value pairs that are in the "keep" set. contributors: list = [keyfilter(lambda k: k in keep, contributor) for contributor in response.json()] return contributors async def get_contributors(self, repositories: list) -> dict: """Runs get_contributors() asynchronously for each repository. Args: repositories (list): List of repositories in Github's standard username/repository notation Returns: dict: A dictionary containing the contributors from each repository """ contributors: dict[str, list] contributors = {} contributors['repositories'] = [] revanced_repositories = [ repository for repository in repositories if 'revanced' in repository] results: list[dict] = await asyncio.gather(*[self.__get_contributors(repository) for repository in revanced_repositories]) async for key, value in a.zip(revanced_repositories, results): data = {'name': key, 'contributors': value} contributors['repositories'].append(data) return contributors async def get_commits(self, org: str, repository: str, path: str) -> dict: """Get commit history from a given repository. Args: org (str): Username of the organization | valid values: revanced or vancedapp repository (str): Repository name path (str): Path to the file per_page (int): Number of commits to return since (str): ISO 8601 timestamp Raises: Exception: Raise a generic exception if the organization is not revanced or vancedapp Returns: dict: a dictionary containing the repository's latest commits """ payload: dict = {} payload["repository"] = f"{org}/{repository}" payload["path"] = path payload["commits"] = [] if org == 'revanced' or org == 'vancedapp': _releases = await self.httpx_client.get( f"https://api.github.com/repos/{org}/{repository}/releases?per_page=2" ) if _releases.status_code == 200: releases = _releases.json() if any(releases): since = releases[1]['created_at'] until = releases[0]['created_at'] else: raise ValueError("No releases found") _response = await self.httpx_client.get( f"https://api.github.com/repos/{org}/{repository}/commits?path={path}&since={since}&until={until}" ) if _response.status_code == 200: response = _response.json() async def get_commit_data(commit: dict) -> dict: return {'sha': commit['sha'], 'author': commit['commit']['author']['name'], 'date': commit['commit']['author']['date'], 'message': commit['commit']['message'], 'url': commit['html_url'] } data: list = await asyncio.gather(*[get_commit_data(commit) for commit in response]) payload['commits'].append(data) else: raise ValueError("Error retrieving commits") else: raise ValueError("Invalid organization.") return payload