Secrets encryption

This commit is contained in:
Ivan Golikov 2025-01-03 12:04:51 +01:00
parent 30887db256
commit ba7537e15e
5 changed files with 33 additions and 5 deletions

View file

@ -88,5 +88,8 @@ Configuration is done via environment variables.
Environment variables:
- `REDIS_URL`: URL for Redis access. Check what values are supported [here](https://redis.readthedocs.io/en/stable/connections.html#redis.Redis.from_url).
- `SECRETS_ENCRYPTION_KEY`: Key used for encrypting stored data.
You can also declare these variables in a `.env` file in the working directory.
Protect this file (or other source from where `SECRETS_ENCRYPTION_KEY` is read by application)
from being read by unauthorized parties.

View file

@ -0,0 +1,8 @@
from cryptography.fernet import Fernet
from pssecret_server.settings import Settings, get_settings
from typing import Annotated
from fastapi import Depends
def get_fernet(settings: Annotated[Settings, Depends(get_settings)]) -> Fernet:
return Fernet(settings.secrets_encryption_key)

View file

@ -1,16 +1,19 @@
from typing import Annotated
from cryptography.fernet import Fernet
from fastapi import Depends, FastAPI
from fastapi.exceptions import HTTPException
from redis.asyncio import Redis
from pssecret_server.fernet import get_fernet
from pssecret_server.models import Secret, SecretSaveResult
from pssecret_server.redis_db import get_redis
from pssecret_server.utils import save_secret
from pssecret_server.utils import decrypt_secret, encrypt_secret, save_secret
app = FastAPI()
RedisDep = Annotated[Redis, Depends(get_redis)]
FernetDep = Annotated[Fernet, Depends(get_fernet)]
@app.post(
@ -23,7 +26,8 @@ RedisDep = Annotated[Redis, Depends(get_redis)]
),
response_model=SecretSaveResult,
)
async def set_secret(data: Secret, redis: RedisDep) -> dict[str, str]:
async def set_secret(data: Secret, redis: RedisDep, fernet: FernetDep) -> dict[str, str]:
data = encrypt_secret(data, fernet)
return {
"key": await save_secret(data, redis),
}
@ -40,12 +44,12 @@ async def set_secret(data: Secret, redis: RedisDep) -> dict[str, str]:
response_model=Secret,
responses={404: {"description": "The item was not found"}},
)
async def get_secret(secret_key: str, redis: RedisDep) -> dict[str, bytes]:
async def get_secret(secret_key: str, redis: RedisDep, fernet: FernetDep) -> dict[str, bytes]:
data: bytes | None = await redis.getdel(secret_key)
if data is None:
raise HTTPException(404)
return {
"data": data,
"data": decrypt_secret(data, fernet),
}

View file

@ -1,9 +1,12 @@
from pydantic import RedisDsn
from pydantic_settings import BaseSettings
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env")
redis_url: RedisDsn = RedisDsn("redis://localhost")
secrets_encryption_key: bytes
def get_settings() -> Settings:

View file

@ -1,10 +1,20 @@
from uuid import uuid4
from redis.asyncio import Redis
from cryptography.fernet import Fernet
from pssecret_server.models import Secret
def encrypt_secret(data: Secret, fernet: Fernet) -> Secret:
data.data = fernet.encrypt(data.data.encode()).decode()
return data
def decrypt_secret(secret: bytes, fernet: Fernet) -> bytes:
return fernet.decrypt(secret)
async def get_new_key(redis: Redis) -> str:
"""Returns free Redis key"""
while True: