Database & Redis Integration¶
Cello's data-layer helpers give you production-ready database and cache connectivity through simple configuration objects and a single enable_* call. This example wires up a PostgreSQL connection pool, an async Redis client, and demonstrates how the @transactional decorator wraps an async handler in a database transaction — so errors automatically trigger a rollback without any try/except boilerplate in your business logic.
Features Demonstrated¶
DatabaseConfig— typed configuration for PostgreSQL with pool size and connection lifetime settingsRedisConfig— typed configuration for Redis with pool size controlapp.enable_database(config)— initialises the connection pool and makes it available to handlersapp.enable_redis(config)— initialises the async Redis client@transactionaldecorator fromcello.database— wraps an async handler in an atomic DB transaction- CRUD endpoints for a users resource with proper HTTP status codes (
201,404) - Key/value cache endpoints backed by an in-memory mock (drop-in for real Redis calls)
- Fund-transfer endpoint showing transactional integrity with balance validation
Complete Source Code¶
#!/usr/bin/env python3
"""
Database & Redis Integration Demo for Cello v1.0.1.
Run with: python examples/database_demo.py
Test:
curl http://127.0.0.1:8000/users
curl -X POST http://127.0.0.1:8000/users -d '{"name": "Alice", "email": "alice@example.com"}'
curl http://127.0.0.1:8000/cache/test
curl -X POST http://127.0.0.1:8000/transfer -d '{"from": 1, "to": 2, "amount": 100}'
"""
from cello import App, Response, DatabaseConfig, RedisConfig
from cello.database import transactional
app = App()
db_config = DatabaseConfig(url="postgresql://user:password@localhost:5432/mydb", pool_size=20, max_lifetime_secs=1800)
redis_config = RedisConfig(url="redis://localhost:6379", pool_size=10)
app.enable_database(db_config)
app.enable_redis(redis_config)
app.enable_cors()
app.enable_logging()
mock_users = [
{"id": 1, "name": "Alice", "email": "alice@example.com", "balance": 1000},
{"id": 2, "name": "Bob", "email": "bob@example.com", "balance": 500},
{"id": 3, "name": "Charlie","email":"charlie@example.com","balance": 750},
]
mock_cache = {}
next_id = 4
@app.get("/")
def home(request):
return {"message": "Cello v1.0.1 - Data Layer Demo",
"features": {"database": "Connection pooling", "redis": "Async Redis client", "transactions": "@transactional decorator"}}
@app.get("/users")
def list_users(request):
return {"users": mock_users, "count": len(mock_users)}
@app.get("/users/{id}")
def get_user(request):
user_id = int(request.params.get("id", 0))
user = next((u for u in mock_users if u["id"] == user_id), None)
if user is None:
return Response.json({"error": "User not found"}, status=404)
return {"user": user}
@app.post("/users")
def create_user(request):
global next_id
data = request.json()
user = {"id": next_id, "name": data.get("name","Anonymous"), "email": data.get("email",""), "balance": 0}
mock_users.append(user)
next_id += 1
return Response.json({"user": user, "created": True}, status=201)
@app.get("/cache/{key}")
def get_cached(request):
key = request.params.get("key", "")
value = mock_cache.get(key)
if value is None:
return Response.json({"key": key, "hit": False}, status=404)
return {"key": key, "value": value, "hit": True}
@app.post("/cache")
def set_cached(request):
data = request.json()
key, value = data.get("key",""), data.get("value","")
if not key:
return Response.json({"error": "Key required"}, status=400)
mock_cache[key] = value
return {"key": key, "stored": True}
@app.post("/transfer")
@transactional
async def transfer_funds(request):
data = request.json()
from_id, to_id, amount = int(data["from"]), int(data["to"]), float(data["amount"])
if amount <= 0:
return Response.json({"error": "Amount must be positive"}, status=400)
from_user = next((u for u in mock_users if u["id"] == from_id), None)
to_user = next((u for u in mock_users if u["id"] == to_id), None)
if not from_user or not to_user:
return Response.json({"error": "User not found"}, status=404)
if from_user["balance"] < amount:
return Response.json({"error": "Insufficient funds"}, status=400)
from_user["balance"] -= amount
to_user["balance"] += amount
return {"success": True, "transfer": {"from": {"id": from_id, "new_balance": from_user["balance"]},
"to": {"id": to_id, "new_balance": to_user["balance"]}, "amount": amount}}
@app.get("/db/status")
def database_status(request):
return {"database": {"status": "connected", "pool_size": db_config.pool_size}}
@app.get("/redis/status")
def redis_status(request):
return {"redis": {"status": "connected", "cached_keys": len(mock_cache)}}
if __name__ == "__main__":
app.run(host="127.0.0.1", port=8000)
Running This Example¶
Key Concepts¶
DatabaseConfig/RedisConfig— using typed configuration objects (rather than raw strings) lets Cello validate settings at startup and surface misconfiguration errors before the first request arrives.pool_sizeandmax_lifetime_secs— connection pools prevent the overhead of opening a new TCP connection on every request;max_lifetime_secsevicts stale connections so the pool stays healthy across long-running processes.@transactionalon an async handler — the decorator opens a transaction before the handler body runs and commits on success; any unhandled exception triggers an automatic rollback, keeping your data consistent without explicitBEGIN/COMMIT/ROLLBACKcalls.- Returning
Response.json(..., status=404)— returning aResponseobject directly from a handler bypasses Cello's automatic JSON serialisation, giving you full control over status codes and headers for error cases. /db/statusand/redis/statusendpoints — exposing connection pool metadata as dedicated health endpoints is a common pattern for readiness probes in container orchestration environments.