Database Integration¶
Connect your Cello application to a PostgreSQL database using an async connection pool. This example covers pool setup and teardown, full CRUD endpoints for a products resource, and robust error handling so database failures are surfaced cleanly to API clients.
Complete Example¶
import asyncpg
from cello import Cello, Request, Response
app = Cello()
# -------------------------------------------------------------------
# Connection pool — created once on startup, shared across requests
# -------------------------------------------------------------------
DB_URL = "postgresql://user:password@localhost:5432/mydb"
pool: asyncpg.Pool | None = None
@app.on_startup
async def startup():
global pool
pool = await asyncpg.create_pool(
dsn=DB_URL,
min_size=2, # Keep at least 2 connections warm
max_size=10, # Never exceed 10 simultaneous connections
command_timeout=30,
)
# Ensure the products table exists
async with pool.acquire() as conn:
await conn.execute("""
CREATE TABLE IF NOT EXISTS products (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
price NUMERIC(10, 2) NOT NULL,
stock INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW()
)
""")
@app.on_shutdown
async def shutdown():
if pool:
await pool.close()
# -------------------------------------------------------------------
# Helper
# -------------------------------------------------------------------
def row_to_dict(record: asyncpg.Record) -> dict:
"""Convert an asyncpg Record to a plain dict (JSON-serialisable)."""
return {
"id": record["id"],
"name": record["name"],
"price": float(record["price"]),
"stock": record["stock"],
"created_at": record["created_at"].isoformat(),
}
# -------------------------------------------------------------------
# GET /products — list all products
# -------------------------------------------------------------------
@app.get("/products")
async def list_products(request: Request):
try:
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT id, name, price, stock, created_at FROM products ORDER BY id"
)
return Response.json([row_to_dict(r) for r in rows])
except Exception as exc:
return Response.json(
{"error": "Failed to fetch products", "detail": str(exc)},
status=500,
)
# -------------------------------------------------------------------
# POST /products — create a new product
# -------------------------------------------------------------------
@app.post("/products")
async def create_product(request: Request):
body = await request.json()
name = body.get("name")
price = body.get("price")
stock = body.get("stock", 0)
if not name or price is None:
return Response.json(
{"error": "Both 'name' and 'price' are required"},
status=400,
)
try:
price = float(price)
stock = int(stock)
except (TypeError, ValueError):
return Response.json(
{"error": "'price' must be a number and 'stock' must be an integer"},
status=400,
)
try:
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO products (name, price, stock)
VALUES ($1, $2, $3)
RETURNING id, name, price, stock, created_at
""",
name, price, stock,
)
return Response.json(row_to_dict(row), status=201)
except Exception as exc:
return Response.json(
{"error": "Failed to create product", "detail": str(exc)},
status=500,
)
# -------------------------------------------------------------------
# DELETE /products/{id} — remove a product by primary key
# -------------------------------------------------------------------
@app.delete("/products/{id}")
async def delete_product(request: Request, id: int):
try:
async with pool.acquire() as conn:
result = await conn.execute(
"DELETE FROM products WHERE id = $1", id
)
# asyncpg returns "DELETE <count>" as the status string
deleted_count = int(result.split()[-1])
if deleted_count == 0:
return Response.json(
{"error": f"Product {id} not found"},
status=404,
)
return Response.json({"message": f"Product {id} deleted"})
except Exception as exc:
return Response.json(
{"error": "Failed to delete product", "detail": str(exc)},
status=500,
)
# -------------------------------------------------------------------
# Entry point
# -------------------------------------------------------------------
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000)
Key Concepts¶
asyncpg.create_pool— creates an async connection pool with configurablemin_sizeandmax_size. The pool is shared across all requests, avoiding the overhead of opening a new connection per request.@app.on_startup/@app.on_shutdown— lifecycle hooks that run once when the server starts and stops, perfect for initialising (and cleanly closing) the pool.pool.acquire()— borrows a connection from the pool for the duration of theasync withblock, then returns it automatically. The pool handles queuing if all connections are in use.- Parameterised queries (
$1,$2, …) — asyncpg uses positional placeholders. Passing values as arguments (never via string formatting) prevents SQL injection. RETURNINGclause — letsINSERThand back the newly created row in one round-trip, avoiding a secondSELECT.- Typed error responses — each endpoint wraps its database call in a
try/exceptand returns a structured JSON error with an appropriate HTTP status code (400,404, or500).
Running This Example¶
# Install dependencies
pip install cello asyncpg
# Ensure PostgreSQL is running and update DB_URL in the script, then:
python examples/basic/database.py
Try the endpoints:
# Create a product
curl -s -X POST http://localhost:8000/products \
-H "Content-Type: application/json" \
-d '{"name": "Widget", "price": 9.99, "stock": 42}' | jq .
# List all products
curl -s http://localhost:8000/products | jq .
# Delete a product (replace 1 with the actual id)
curl -s -X DELETE http://localhost:8000/products/1 | jq .