revanced-releases-api/modules/InternalCache.py
2022-09-04 02:32:25 -03:00

38 lines
1.1 KiB
Python

import os
import toml
import msgpack
import aioredis
# Load config
config: dict = toml.load("config.toml")
# Redis connection parameters
redis_config: dict[ str, str | int ] = {
"url": f"redis://{os.environ['REDIS_URL']}",
"port": os.environ['REDIS_PORT'],
"database": config['internal-cache']['database'],
}
class InternalCache:
"""Implements an internal cache for ReVanced Releases API."""
redis_url = f"{redis_config['url']}:{redis_config['port']}/{redis_config['database']}"
redis = aioredis.from_url(redis_url, encoding="utf-8", decode_responses=True)
async def store(self, key: str, value: dict) -> None:
await self.redis.set(key, msgpack.packb(value), ex=config['internal-cache']['expire'])
async def delete(self, key: str) -> None:
await self.redis.delete(key)
async def update(self, key: str, value: dict) -> None:
await self.redis.set(key, msgpack.packb(value), ex=config['internal-cache']['expire'])
async def get(self, key: str) -> dict:
return msgpack.unpackb(await self.redis.get(key))