Security Features¶
This example demonstrates how to configure and enforce security practices within a Cello application. It covers strict security headers, building a Content Security Policy (CSP), setting up token bucket and sliding window rate limiters, validating JWT tokens, and configuring secure cookie sessions.
Features Demonstrated¶
- Security Headers: Configuring standard headers such as HSTS, X-Frame-Options, X-Content-Type-Options, and Referrer policies.
- Content Security Policy (CSP): Assembling source rules programmatically to prevent XSS and cross-site injections.
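- Parametrized Rate Limiting: Defining a token bucket limiter configuration for general routes and a sliding window configuration for login paths. The example builds and reports these configurations; enforcement is left to the middleware layer.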
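- JWT Authorization: Extracting bearer tokens from the HTTP Authorization header. Actual JWT decoding and signature verification are shown as a commented PyJWT pattern, not performed by the demo.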
- Session Protections: Setting up secure cookie sessions with HttpOnly, Secure, and Strict SameSite attributes.
Complete Source Code¶
#!/usr/bin/env python3
"""
Security Features Example for Cello v1.0.1.
This example demonstrates security-focused features including:
- Security headers configuration
- Content Security Policy (CSP)
- Rate limiting patterns
- JWT authentication patterns
- Session security
Run with:
python examples/security.py
Then test with:
curl http://127.0.0.1:8000/
curl http://127.0.0.1:8000/secure-headers
curl http://127.0.0.1:8000/api/protected -H "Authorization: Bearer your-token"
"""
import os

from cello import App, Response
# Security configuration imports
from cello import (
    JwtConfig,
    RateLimitConfig,
    SessionConfig,
    SecurityHeadersConfig,
    CSP,
)
# Application instance that every route below is registered on.
app = App()
# Enable middleware
# CORS is limited to one explicit origin rather than a wildcard.
app.enable_cors(origins=["https://example.com"]) # Restrict origins in production
# Turn on the framework's logging middleware.
app.enable_logging()
# =============================================================================
# Security Configurations
# =============================================================================
# JWT Configuration
# The signing secret is taken from the CELLO_JWT_SECRET environment variable so
# a real deployment never depends on a value committed to source control. The
# placeholder is only a fallback for running this demo locally; an unset or
# empty variable falls back to it.
jwt_config = JwtConfig(
    secret=(
        os.environ.get("CELLO_JWT_SECRET")
        or "your-256-bit-secret-change-me-in-production"
    ),
    algorithm="HS256",
    header_name="Authorization",
    cookie_name="auth_token",  # Also check cookies
    leeway=60,  # Allow 60 seconds clock skew
)

# Rate Limiting - Different strategies for different endpoints
# General API traffic: token bucket.
api_rate_limit = RateLimitConfig.token_bucket(
    capacity=100,  # 100 requests
    refill_rate=10,  # 10 requests per second refill
)
# Login attempts: a much tighter sliding window.
login_rate_limit = RateLimitConfig.sliding_window(
    max_requests=5,  # 5 login attempts
    window_secs=300,  # per 5 minutes
)

# Session Configuration - Secure defaults
session_config = SessionConfig(
    cookie_name="cello_session",
    cookie_path="/",
    cookie_domain=None,
    cookie_secure=True,  # HTTPS only
    cookie_http_only=True,  # No JavaScript access
    cookie_same_site="Strict",  # Strict same-site policy
    max_age=3600,  # 1 hour session
)

# Security Headers - Strict configuration
security_headers = SecurityHeadersConfig.secure()

# Content Security Policy - Strict policy
csp = CSP()
csp.default_src(["'self'"])
csp.script_src(["'self'"])
csp.style_src(["'self'", "'unsafe-inline'"])  # For inline styles
csp.img_src(["'self'", "data:", "https:"])
csp.font_src(["'self'", "https://fonts.gstatic.com"])
# Built once at import time; the header helper and the config endpoint reuse it.
csp_header_value = csp.build()
# =============================================================================
# Helper Functions
# =============================================================================
def add_security_headers(response):
    """Apply the configured security headers to ``response`` and return it.

    Values are read from the module-level ``security_headers`` configuration
    and the pre-built ``csp_header_value``. A header whose value is falsy is
    not set.
    """
    frame_policy = security_headers.x_frame_options or "DENY"
    nosniff = "nosniff" if security_headers.x_content_type_options else None

    # Ordered (name, value) pairs; headers are set in this order.
    pairs = [
        ("X-Frame-Options", frame_policy),
        ("X-Content-Type-Options", nosniff),
        ("X-XSS-Protection", security_headers.x_xss_protection),
        ("Referrer-Policy", security_headers.referrer_policy),
        ("Content-Security-Policy", csp_header_value),
    ]

    # HSTS is only emitted when a max-age is configured.
    if security_headers.hsts_max_age:
        directives = [f"max-age={security_headers.hsts_max_age}"]
        if security_headers.hsts_include_subdomains:
            directives.append("includeSubDomains")
        if security_headers.hsts_preload:
            directives.append("preload")
        pairs.append(("Strict-Transport-Security", "; ".join(directives)))

    for name, value in pairs:
        if not value:
            continue
        response.set_header(name, value)
    return response
def verify_jwt_token(request):
    """Extract a bearer token from the request and return ``(payload, error)``.

    This is a pattern example: it shows where JWT verification belongs but
    does not decode or verify the token. Any non-blank token is accepted and
    mapped to a fixed demo payload.

    Args:
        request: Object exposing ``get_header(name)``.

    Returns:
        A 2-tuple ``(payload, error)``. On success ``payload`` is a dict and
        ``error`` is ``None``; on failure ``payload`` is ``None`` and
        ``error`` is a human-readable message.
    """
    auth_header = request.get_header("Authorization")
    if not auth_header:
        return None, "Missing Authorization header"

    # HTTP authentication scheme names are case-insensitive, so "bearer" and
    # "BEARER" must be accepted just like "Bearer".
    scheme, separator, credentials = auth_header.partition(" ")
    if not separator or scheme.lower() != "bearer":
        return None, "Invalid Authorization header format"

    # Strip padding so a whitespace-only credential is not treated as a token.
    token = credentials.strip()

    # In a real app, decode and verify the JWT here
    # Example with PyJWT:
    # try:
    #     payload = jwt.decode(token, jwt_config.secret, algorithms=[jwt_config.algorithm])
    #     return payload, None
    # except jwt.ExpiredSignatureError:
    #     return None, "Token has expired"
    # except jwt.InvalidTokenError:
    #     return None, "Invalid token"

    # For demo purposes, just check if token exists
    if token:
        return {"user_id": 123, "role": "user"}, None
    return None, "Invalid token"
# =============================================================================
# Routes
# =============================================================================
@app.get("/")
def home(request):
    """Return a JSON overview of the demo, with security headers applied."""
    feature_summary = {
        "jwt_auth": "JWT authentication with configurable algorithms",
        "rate_limiting": "Token bucket and sliding window algorithms",
        "session_management": "Secure cookie-based sessions",
        "security_headers": "Industry-standard security headers",
        "csp": "Content Security Policy builder",
    }
    endpoint_list = [
        "GET / - This overview",
        "GET /secure-headers - Response with security headers",
        "GET /api/public - Public endpoint",
        "GET /api/protected - Protected endpoint (needs JWT)",
        "POST /api/login - Login endpoint (rate limited)",
        "GET /config/security - Security configuration info",
    ]
    overview = {
        "message": "Cello Security Features Demo",
        "version": "1.0.1",
        "features": feature_summary,
        "endpoints": endpoint_list,
    }
    return add_security_headers(Response.json(overview))
@app.get("/secure-headers")
def secure_headers_demo(request):
    """Return a JSON description of each security header, with the headers applied."""
    header_purposes = {
        "X-Frame-Options": "Prevents clickjacking attacks",
        "X-Content-Type-Options": "Prevents MIME type sniffing",
        "X-XSS-Protection": "Enables XSS filter in older browsers",
        "Referrer-Policy": "Controls referrer information",
        "Content-Security-Policy": "Prevents XSS and data injection",
        "Strict-Transport-Security": "Enforces HTTPS (when enabled)",
    }
    body = {
        "message": "This response includes security headers",
        "headers_added": header_purposes,
    }
    return add_security_headers(Response.json(body))
@app.get("/api/public")
def public_endpoint(request):
    """Serve the public API payload; no authentication is checked."""
    payload = dict(
        message="This is a public endpoint",
        authenticated=False,
    )
    return payload
@app.get("/api/protected")
def protected_endpoint(request):
    """Serve the protected payload, or a 401 when token verification fails."""
    user, error = verify_jwt_token(request)

    # Success path first: no error means the token check passed.
    if not error:
        return {
            "message": "This is a protected endpoint",
            "authenticated": True,
            "user": user,
        }

    denied = Response.json(
        {"error": "Unauthorized", "detail": error},
        status=401,
    )
    # Tell the client which authentication scheme is expected.
    denied.set_header("WWW-Authenticate", "Bearer")
    return denied
@app.post("/api/login")
def login_endpoint(request):
    """
    Login endpoint with rate limiting pattern.

    Rate limiting would be enforced at the middleware level.
    This shows the pattern for handling login requests.

    Returns:
        - 400 if the body is not parseable JSON, is not a JSON object, or is
          missing ``username`` / ``password``.
        - 401 if the credentials do not match the demo account.
        - A dict with a placeholder token on success.
    """
    # The exception type raised by request.json() is not visible here, so the
    # broad catch is kept and mapped to a client error.
    try:
        data = request.json()
    except Exception:
        return Response.json({"error": "Invalid JSON"}, status=400)

    # Valid JSON is not necessarily an object: a list, string, number or null
    # has no .get(), and would otherwise crash the handler.
    if not isinstance(data, dict):
        return Response.json(
            {"error": "JSON body must be an object"},
            status=400
        )

    username = data.get("username")
    password = data.get("password")
    if not username or not password:
        return Response.json(
            {"error": "Missing username or password"},
            status=400
        )

    # In a real app, verify credentials against database
    # This is a demo - never do this in production!
    if username == "demo" and password == "password":
        # Generate JWT token here
        return {
            "message": "Login successful",
            "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.demo",
            "expires_in": 3600,
        }

    return Response.json(
        {"error": "Invalid credentials"},
        status=401
    )
@app.get("/config/security")
def security_config_info(request):
    """Report the non-secret parts of the security configuration.

    The JWT secret is deliberately left out of the response.
    """
    jwt_info = {
        "algorithm": jwt_config.algorithm,
        "header_name": jwt_config.header_name,
        "cookie_name": jwt_config.cookie_name,
        "leeway_seconds": jwt_config.leeway,
    }

    api_limit_info = {
        "algorithm": api_rate_limit.algorithm,
        "capacity": api_rate_limit.capacity,
        "refill_rate": api_rate_limit.refill_rate,
    }
    # NOTE(review): the sliding-window limit is read from `capacity`;
    # presumably that is where `max_requests` is stored - confirm against
    # RateLimitConfig.
    login_limit_info = {
        "algorithm": login_rate_limit.algorithm,
        "max_requests": login_rate_limit.capacity,
        "window_seconds": login_rate_limit.window_secs,
    }

    session_info = {
        "cookie_name": session_config.cookie_name,
        "cookie_secure": session_config.cookie_secure,
        "cookie_http_only": session_config.cookie_http_only,
        "cookie_same_site": session_config.cookie_same_site,
        "max_age_seconds": session_config.max_age,
    }

    return {
        "jwt": jwt_info,
        "rate_limits": {"api": api_limit_info, "login": login_limit_info},
        "session": session_info,
        "csp_policy": csp_header_value,
    }
# =============================================================================
# Error Handlers (Pattern Examples)
# =============================================================================
@app.get("/error/unauthorized")
def unauthorized_example(request):
    """Return a sample 401 response with a problem-details style body."""
    problem = {
        "type": "/errors/unauthorized",
        "title": "Unauthorized",
        "status": 401,
        "detail": "Authentication is required to access this resource",
    }
    reply = Response.json(problem, status=401)
    for header_name, header_value in (
        ("Content-Type", "application/problem+json"),
        ("WWW-Authenticate", "Bearer"),
    ):
        reply.set_header(header_name, header_value)
    return reply
@app.get("/error/forbidden")
def forbidden_example(request):
    """Return a sample 403 response with a problem-details style body."""
    problem = {
        "type": "/errors/forbidden",
        "title": "Forbidden",
        "status": 403,
        "detail": "You don't have permission to access this resource",
    }
    reply = Response.json(problem, status=403)
    reply.set_header("Content-Type", "application/problem+json")
    return reply
@app.get("/error/rate-limited")
def rate_limited_example(request):
    """Return a sample 429 response with a Retry-After header."""
    problem = {
        "type": "/errors/rate-limited",
        "title": "Too Many Requests",
        "status": 429,
        "detail": "Rate limit exceeded. Please try again later.",
        "retry_after": 60,
    }
    reply = Response.json(problem, status=429)
    reply.set_header("Content-Type", "application/problem+json")
    # Same delay as the body's "retry_after" field, as a header string.
    reply.set_header("Retry-After", "60")
    return reply
if __name__ == "__main__":
    # Startup banner, one entry per printed line; an empty entry prints a
    # blank line.
    banner_lines = (
        "🔐 Cello Security Features Demo",
        "",
        " Try these endpoints:",
        " - GET http://127.0.0.1:8000/",
        " - GET http://127.0.0.1:8000/secure-headers",
        " - GET http://127.0.0.1:8000/api/public",
        " - GET http://127.0.0.1:8000/api/protected",
        " - POST http://127.0.0.1:8000/api/login",
        "",
        " Test protected endpoint with:",
        ' curl -H "Authorization: Bearer test-token" http://127.0.0.1:8000/api/protected',
        "",
    )
    for banner_line in banner_lines:
        print(banner_line)

    # Start the server on the loopback interface only.
    app.run(host="127.0.0.1", port=8000)
Running This Example¶
python examples/security.py
# Test public:
curl http://127.0.0.1:8000/api/public
# Test headers:
curl -I http://127.0.0.1:8000/secure-headers
# Test protected without auth:
curl http://127.0.0.1:8000/api/protected
# Test protected with auth:
curl -H "Authorization: Bearer mock-token-value" http://127.0.0.1:8000/api/protected
Key Concepts¶
- HSTS / Security Headers: X-Frame-Options blocks clickjacking via framing, X-Content-Type-Options stops MIME type sniffing, and Strict-Transport-Security (HSTS) tells browsers to use HTTPS only.
- CSP Builder: A Content Security Policy specifies an allowlist of sources for each resource type, which helps prevent the execution of unauthorized scripts.
- Endpoint-Specific Rate Limits: A token bucket suits general API traffic, while a tight sliding window on login (5 attempts per 5 minutes in this example) limits brute-force authentication attempts once the limiter is enforced by middleware.