Alexandre Teles bdbf99a4f7 basic logging
2022-09-08 00:32:31 -03:00

92 lines
2.7 KiB
Python

import sys
import logging
from loguru import logger
from typing import Optional
from types import FrameType
from redis import RedisError
class InterceptHandler(logging.Handler):
"""Setups a loging handler for uvicorn and FastAPI.
Args:
logging (logging.Handler)
"""
def emit(self, record: logging.LogRecord) -> None:
"""Emit a log record.
Args:
record (LogRecord): Logging record
"""
level: str | int
frame: Optional[FrameType]
depth: int
# Get corresponding Loguru level if it exists
try:
level = logger.level(record.levelname).name
except ValueError:
level = record.levelno
frame = logging.currentframe()
depth = 2
# Find caller from where originated the logged message
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
class HTTPXLogger():
"""Logger adapter for HTTPX."""
async def log_request(self, request) -> None:
"""Logs HTTPX requests
Returns:
None
"""
logger.info(f"[HTTPX] Request: {request.method} {request.url} - Waiting for response")
async def log_response(self, response) -> None:
"""Logs HTTPX responses
Returns:
None
"""
request = response.request
logger.info(f"[HTTPX] Response: {request.method} {request.url} - Status: {response.status_code} {response.reason_phrase}")
class InternalCacheLogger:
async def log(self, operation: str, result: RedisError | None = None, key: str = "",) -> None:
"""Logs internal cache operations
Args:
operation (str): Operation name
key (str): Key used in the operation
"""
if type(result) is RedisError:
logger.error(f"[InternalCache] REDIS {operation} - Failed with error: {result}")
else:
logger.info(f"[InternalCache] REDIS {operation} {key} - OK")
def setup_logging(LOG_LEVEL: str, JSON_LOGS: bool) -> None:
"""Setup logging for uvicorn and FastAPI."""
# intercept everything at the root logger
logging.root.handlers = [InterceptHandler()]
logging.root.setLevel(LOG_LEVEL)
# remove every other logger's handlers
# and propagate to root logger
for name in logging.root.manager.loggerDict.keys():
logging.getLogger(name).handlers = []
logging.getLogger(name).propagate = True
# configure loguru
logger.configure(handlers=[{"sink": sys.stdout, "serialize": JSON_LOGS}])