mirror of
https://github.com/revanced/revanced-releases-api.git
synced 2025-05-02 23:24:27 +02:00

* feat: resolves #5, resolves #4 (#7) * Implements client generation and management * fix announcements endpoints * change annoucements model * bump deps * sync with main * refactor: adopt some functional standards in Releases.py * feat: add new workflows * chore: remove unused files * refactor: update build badge * refactor: move files around and delete unused ones * feat: add authentication endpoints * refactor: clean up code on Clients.py controller * fix: fix the client secret update endpoint * refactor: clean up authentication code * feat: add authentication to client endpoints * chore: bump deps * feat: add admin user generation * feature: add /changelogs endpoint (#10) * feat: move endpoints into custom routers, resolves #12 (#14) * refactor: import routers from old branch * refactor: import InternalCache removal * refactor: move routes into dedicated routers * fix: fixes entrypoint * refactor: add documentation and bump libs * docs: update description (#16) * feat: implement cdn mirrors endpoints, closes #15 (#17) * feat: add cdn mirror endpoints * refactor: change API version in docs * docs: fix titles on API docs page Co-authored-by: oSumAtrIX <johan.melkonyan1@web.de>
79 lines
2.7 KiB
Python
79 lines
2.7 KiB
Python
from loguru import logger
|
|
from redis import RedisError
|
|
from argon2.exceptions import VerifyMismatchError
|
|
|
|
class HTTPXLogger():
|
|
"""Logger adapter for HTTPX."""
|
|
|
|
async def log_request(self, request) -> None:
|
|
"""Logs HTTPX requests
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
|
|
logger.info(f"[HTTPX] Request: {request.method} {request.url} - Waiting for response")
|
|
|
|
async def log_response(self, response) -> None:
|
|
"""Logs HTTPX responses
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
request = response.request
|
|
|
|
logger.info(f"[HTTPX] Response: {request.method} {request.url} - Status: {response.status_code} {response.reason_phrase}")
|
|
|
|
class InternalCacheLogger:
|
|
async def log(self, operation: str, result: RedisError | None = None, key: str = "",) -> None:
|
|
"""Logs internal cache operations
|
|
|
|
Args:
|
|
operation (str): Operation name
|
|
key (str): Key used in the operation
|
|
"""
|
|
if type(result) is RedisError:
|
|
logger.error(f"[InternalCache] REDIS {operation} - Failed with error: {result}")
|
|
else:
|
|
logger.info(f"[InternalCache] REDIS {operation} {key} - OK")
|
|
|
|
class UserLogger:
|
|
async def log(self, operation: str, result: RedisError | VerifyMismatchError | None = None,
|
|
key: str = "",) -> None:
|
|
"""Logs internal cache operations
|
|
|
|
Args:
|
|
operation (str): Operation name
|
|
key (str): Key used in the operation
|
|
"""
|
|
if type(result) is RedisError:
|
|
logger.error(f"[User] REDIS {operation} - Failed with error: {result}")
|
|
else:
|
|
logger.info(f"[User] REDIS {operation} {key} - OK")
|
|
|
|
class AnnouncementsLogger:
|
|
async def log(self, operation: str, result: RedisError | None = None, key: str = "") -> None:
|
|
"""Logs internal cache operations
|
|
|
|
Args:
|
|
operation (str): Operation name
|
|
key (str): Key used in the operation
|
|
"""
|
|
if type(result) is RedisError:
|
|
logger.error(f"[User] REDIS {operation} - Failed with error: {result}")
|
|
else:
|
|
logger.info(f"[User] REDIS {operation} {key} - OK")
|
|
|
|
class MirrorsLogger:
|
|
async def log(self, operation: str, result: RedisError | None = None, key: str = "") -> None:
|
|
"""Logs internal cache operations
|
|
|
|
Args:
|
|
operation (str): Operation name
|
|
key (str): Key used in the operation
|
|
"""
|
|
if type(result) is RedisError:
|
|
logger.error(f"[User] REDIS {operation} - Failed with error: {result}")
|
|
else:
|
|
logger.info(f"[User] REDIS {operation} {key} - OK")
|