Cluster Mode & Production Deployment
This example shows how to configure a Cello application for production deployment: running multiple worker processes, shutting down gracefully, configuring TLS with certificate files, and exposing liveness and readiness probes for orchestrators such as Kubernetes.
Features Demonstrated
- Multi-Worker Processes: Autodetecting system CPU counts and configuring process spawning using ClusterConfig.auto().
- Graceful Shutdown: Configuring worker process cleanup grace periods during shutdowns.
- Probe Endpoints: Exposing /health liveness probes and /ready readiness checks.
- Production Tunings: Increasing header limits, connection capacities, and establishing Keep-Alive request caps.
- Prometheus Metrics: Exposing a raw Prometheus format response on /metrics.
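How ClusterConfig.auto() arrives at its worker count is not shown in this example. As a rough illustration of CPU auto-detection in general, the sketch below uses only the Python standard library. The helper name usable_cpus is hypothetical and not part of Cello. It prefers the set of CPUs the process is actually allowed to run on, which on Linux can be smaller than the machine total when the process has been pinned to a subset of cores.

```python
import os


def usable_cpus() -> int:
    """Return the number of CPUs this process may run on (hypothetical helper)."""
    # os.sched_getaffinity is only available on Linux and some other Unix platforms.
    if hasattr(os, "sched_getaffinity"):
        return max(1, len(os.sched_getaffinity(0)))
    # os.cpu_count() may return None when the count cannot be determined.
    return os.cpu_count() or 1


if __name__ == "__main__":
    print(f"usable CPUs: {usable_cpus()}")
```

A CPU-time quota, such as the cpus: '2' limit in the Docker Compose example, throttles the container without changing the affinity mask, so neither call reflects it. In that situation an explicit workers value may be the safer choice.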
Complete Source Code
#!/usr/bin/env python3
"""
Cluster Mode & Production Deployment Demo for Cello v1.0.1.

This example demonstrates production deployment configurations including:
- Cluster mode (multi-worker processes)
- Graceful shutdown handling
- Protocol configuration (HTTP/2, HTTP/3)
- TLS/SSL configuration patterns
- Production-ready settings

Run with:
    python examples/cluster_demo.py

For production with multiple workers:
    python examples/cluster_demo.py --workers 4

Note: Some features like TLS require proper certificates.
"""

from cello import App, Response

# Production configuration imports
from cello import (
    TimeoutConfig,
    LimitsConfig,
    ClusterConfig,
    TlsConfig,
    Http2Config,
    Http3Config,
)

app = App()

# Enable middleware for production
app.enable_cors()
app.enable_logging()
app.enable_compression(min_size=512)

# =============================================================================
# Production Configurations
# =============================================================================

# Timeout configuration - tuned for production
production_timeouts = TimeoutConfig(
    read_header=10,  # 10 seconds for headers
    read_body=60,    # 60 seconds for large uploads
    write=60,        # 60 seconds for large responses
    idle=120,        # 120 seconds idle timeout
    handler=30,      # 30 seconds handler timeout
)

# Limits configuration - production settings
production_limits = LimitsConfig(
    max_header_size=16384,             # 16KB headers
    max_body_size=52428800,            # 50MB body (for file uploads)
    max_connections=50000,             # 50k concurrent connections
    max_requests_per_connection=1000,  # Keep-alive limit
)

# Cluster configuration - auto-detect CPUs
cluster_config = ClusterConfig.auto()

# Cluster configuration - manual settings
cluster_config_manual = ClusterConfig(
    workers=4,               # 4 worker processes
    cpu_affinity=True,       # Pin to CPU cores for performance
    max_restarts=10,         # Restart worker up to 10 times
    graceful_shutdown=True,  # Enable graceful shutdown
    shutdown_timeout=30,     # 30 second grace period
)

# TLS configuration - example (requires actual certificates)
# Place your certificates in a 'certs/' directory relative to your project,
# or use platform-appropriate paths.
import os as _os

_certs_dir = _os.path.join(_os.path.dirname(_os.path.abspath(__file__)), "certs")

tls_config = TlsConfig(
    cert_path=_os.path.join(_certs_dir, "server.crt"),
    key_path=_os.path.join(_certs_dir, "server.key"),
    ca_path=_os.path.join(_certs_dir, "ca.crt"),  # For client cert verification
    min_version="1.2",
    max_version="1.3",
    require_client_cert=False,
)

# HTTP/2 configuration - optimized for performance
http2_config = Http2Config(
    max_concurrent_streams=250,   # More concurrent streams
    initial_window_size=2097152,  # 2MB window
    max_frame_size=32768,         # 32KB frames
    enable_push=False,            # Server push disabled
)

# HTTP/3 configuration - QUIC settings
http3_config = Http3Config(
    max_idle_timeout=60,
    max_udp_payload_size=1350,
    initial_max_streams_bidi=200,
    enable_0rtt=False,  # More secure without 0-RTT
)

# =============================================================================
# Routes
# =============================================================================

@app.get("/")
def home(request):
    """Home endpoint with cluster information."""
    return {
        "message": "Cello Cluster Mode Demo",
        "version": "1.0.1",
        "deployment": {
            "mode": "cluster",
            "features": [
                "Multi-worker processes",
                "Graceful shutdown",
                "CPU affinity",
                "Auto-restart on failure",
            ],
        },
        "endpoints": [
            "GET / - This overview",
            "GET /health - Health check",
            "GET /ready - Readiness probe",
            "GET /config - Configuration info",
            "GET /worker-info - Worker process info",
        ],
    }


@app.get("/health")
def health_check(request):
    """
    Health check endpoint for load balancers.

    Returns 200 if the service is running.
    Use this for Kubernetes liveness probes.
    """
    return {"status": "healthy", "service": "cello"}


@app.get("/ready")
def readiness_check(request):
    """
    Readiness check endpoint.

    Returns 200 if the service is ready to accept traffic.
    Use this for Kubernetes readiness probes.

    In a real app, check database connections, cache connections, etc.
    """
    # Example readiness checks
    checks = {
        "database": True,  # Would check actual DB connection
        "cache": True,     # Would check actual cache connection
        "dependencies": True,
    }

    all_ready = all(checks.values())

    if all_ready:
        return {"status": "ready", "checks": checks}
    else:
        return Response.json(
            {"status": "not_ready", "checks": checks},
            status=503
        )


@app.get("/config")
def show_config(request):
    """Display production configuration."""
    return {
        "timeouts": {
            "read_header": production_timeouts.read_header_timeout,
            "read_body": production_timeouts.read_body_timeout,
            "write": production_timeouts.write_timeout,
            "idle": production_timeouts.idle_timeout,
            "handler": production_timeouts.handler_timeout,
        },
        "limits": {
            "max_header_size": production_limits.max_header_size,
            "max_body_size": production_limits.max_body_size,
            "max_connections": production_limits.max_connections,
            "max_requests_per_connection": production_limits.max_requests_per_connection,
        },
        "cluster": {
            "workers": cluster_config.workers,
            "cpu_affinity": cluster_config.cpu_affinity,
            "graceful_shutdown": cluster_config.graceful_shutdown,
            "shutdown_timeout": cluster_config.shutdown_timeout,
        },
        "http2": {
            "max_concurrent_streams": http2_config.max_concurrent_streams,
            "initial_window_size": http2_config.initial_window_size,
            "max_frame_size": http2_config.max_frame_size,
            "enable_push": http2_config.enable_push,
        },
        "http3": {
            "max_idle_timeout": http3_config.max_idle_timeout,
            "max_udp_payload_size": http3_config.max_udp_payload_size,
            "initial_max_streams_bidi": http3_config.initial_max_streams_bidi,
            "enable_0rtt": http3_config.enable_0rtt,
        },
    }


@app.get("/worker-info")
def worker_info(request):
    """Display worker process information."""
    import os

    def _safe_getppid() -> int:
        """Get parent PID, returning -1 on platforms where it's not reliable."""
        try:
            return os.getppid()
        except (AttributeError, OSError):
            return -1

    return {
        "worker": {
            "pid": os.getpid(),
            "ppid": _safe_getppid(),
        },
        "cluster_config": {
            "workers": cluster_config.workers,
            "cpu_affinity": cluster_config.cpu_affinity,
            "max_restarts": cluster_config.max_restarts,
        },
    }


@app.get("/metrics")
def metrics(request):
    """
    Prometheus-style metrics endpoint.

    In production, you would integrate with actual metrics collection.
    """
    import os

    # Example metrics (would be real values in production)
    metrics_text = f"""# HELP cello_requests_total Total number of requests
# TYPE cello_requests_total counter
cello_requests_total 1000
# HELP cello_request_duration_seconds Request duration
# TYPE cello_request_duration_seconds histogram
cello_request_duration_seconds_bucket{{le="0.01"}} 800
cello_request_duration_seconds_bucket{{le="0.1"}} 950
cello_request_duration_seconds_bucket{{le="1"}} 990
cello_request_duration_seconds_bucket{{le="+Inf"}} 1000
cello_request_duration_seconds_count 1000
cello_request_duration_seconds_sum 50
# HELP cello_active_connections Current active connections
# TYPE cello_active_connections gauge
cello_active_connections 42
# HELP cello_worker_pid Worker process ID
# TYPE cello_worker_pid gauge
cello_worker_pid {os.getpid()}
"""

    response = Response.text(metrics_text)
    response.set_header("Content-Type", "text/plain; version=0.0.4")
    return response

# =============================================================================
# API Routes
# =============================================================================

@app.post("/api/data")
def handle_data(request):
    """Handle data uploads."""
    try:
        data = request.json()
        return {
            "received": True,
            "size": len(str(data)),
            "message": "Data processed successfully",
        }
    except Exception as e:
        return Response.json({"error": str(e)}, status=400)


@app.get("/api/slow")
def slow_endpoint(request):
    """
    Simulates a slow endpoint.

    Useful for testing timeout configurations.
    In production, configure appropriate handler timeouts.
    """
    import time

    time.sleep(2)  # Simulate slow operation
    return {"message": "Slow response completed", "delay_seconds": 2}

# =============================================================================
# Deployment Examples (as comments)
# =============================================================================
"""
Production Deployment Examples:

1. Basic Production Run:
   python app.py --env production --workers 4 --port 8080

2. With Custom Host (for Docker):
   python app.py --host 0.0.0.0 --port 8080 --workers 4

3. Systemd Service Example (/etc/systemd/system/cello.service):
   [Unit]
   Description=Cello Web Application
   After=network.target

   [Service]
   Type=simple
   User=www-data
   Group=www-data
   WorkingDirectory=/opt/myapp
   ExecStart=/opt/myapp/venv/bin/python app.py --env production --workers 4
   Restart=always
   RestartSec=5

   [Install]
   WantedBy=multi-user.target

4. Docker Compose Example:
   version: '3.8'
   services:
     app:
       build: .
       ports:
         - "8080:8080"
       environment:
         - CELLO_ENV=production
       command: python app.py --host 0.0.0.0 --port 8080 --workers 4
       deploy:
         resources:
           limits:
             cpus: '2'
             memory: 1G

5. Kubernetes Deployment:
   apiVersion: apps/v1
   kind: Deployment
   metadata:
     name: cello-app
   spec:
     replicas: 3
     selector:
       matchLabels:
         app: cello-app
     template:
       metadata:
         labels:
           app: cello-app
       spec:
         containers:
         - name: cello
           image: myapp:latest
           ports:
           - containerPort: 8080
           livenessProbe:
             httpGet:
               path: /health
               port: 8080
             initialDelaySeconds: 5
             periodSeconds: 10
           readinessProbe:
             httpGet:
               path: /ready
               port: 8080
             initialDelaySeconds: 5
             periodSeconds: 5
"""

if __name__ == "__main__":
    app.run(host="127.0.0.1", port=8000)
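
The /metrics handler above returns hard-coded sample values. One way to produce the same text format from live numbers is sketched below, using only the standard library. The names render_simple and render_histogram are hypothetical helpers, not Cello or Prometheus client APIs; a real deployment would more likely rely on an established metrics library.

```python
from typing import Sequence


def render_simple(name: str, kind: str, help_text: str, value: float) -> str:
    """Render one counter or gauge sample in the Prometheus text format."""
    return (
        f"# HELP {name} {help_text}\n"
        f"# TYPE {name} {kind}\n"
        f"{name} {value}\n"
    )


def render_histogram(
    name: str,
    help_text: str,
    bounds: Sequence[float],
    observations: Sequence[float],
) -> str:
    """Render a histogram; each bucket counts observations <= its bound."""
    lines = [f"# HELP {name} {help_text}", f"# TYPE {name} histogram"]
    for bound in bounds:
        count = sum(1 for obs in observations if obs <= bound)
        lines.append(f'{name}_bucket{{le="{bound}"}} {count}')
    # The +Inf bucket always equals the total number of observations.
    lines.append(f'{name}_bucket{{le="+Inf"}} {len(observations)}')
    lines.append(f"{name}_count {len(observations)}")
    lines.append(f"{name}_sum {sum(observations)}")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    latencies = [0.004, 0.02, 0.3, 1.7]  # made-up sample data
    print(render_simple("cello_requests_total", "counter",
                        "Total number of requests", len(latencies)), end="")
    print(render_histogram("cello_request_duration_seconds",
                           "Request duration", [0.01, 0.1, 1], latencies), end="")
```

Buckets are cumulative, which is why the sample output in the handler rises from 800 to 1000 and why the +Inf bucket matches the _count line.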
Running This Example
python examples/cluster_demo.py
# Test liveness probe:
curl http://127.0.0.1:8000/health
# Test readiness probe:
curl http://127.0.0.1:8000/ready
# Query configuration details:
curl http://127.0.0.1:8000/config
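
The same checks can be scripted, for instance as a smoke test after a deployment. The sketch below uses only the Python standard library. check_probe is a hypothetical helper name, and the URLs assume the example is listening on 127.0.0.1:8000 as above.

```python
import json
import urllib.error
import urllib.request
from typing import Tuple


def check_probe(url: str, timeout: float = 2.0) -> Tuple[int, dict]:
    """Fetch a probe URL and return (HTTP status, decoded JSON body)."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status, json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as err:
        # urlopen raises on 4xx/5xx, but a 503 from /ready still has a JSON body.
        return err.code, json.loads(err.read().decode("utf-8"))


if __name__ == "__main__":
    for path in ("/health", "/ready"):
        try:
            status, body = check_probe(f"http://127.0.0.1:8000{path}")
            print(path, status, body)
        except OSError as exc:
            # Connection refused, timeouts, and similar network failures.
            print(path, "unreachable:", exc)
```

Catching HTTPError separately matters for the readiness probe: a 503 is an expected answer there, not a transport failure, and its body says which check failed.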
Key Concepts
- CPU Affinity: Pinning each worker to a specific CPU core reduces the overhead of the scheduler migrating workers between cores, such as context switches and cold caches.
- Graceful Timeouts: Setting a shutdown grace period (shutdown_timeout) gives in-flight requests a chance to finish cleanly before worker processes are killed.
- Probe Separation: Checking that the process is alive with a simple liveness handler at /health, while validating dependency connections in the readiness probe at /ready.
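
The readiness handler in this example hard-codes every check to True. A minimal sketch of how real checks might be aggregated is shown below. run_readiness_checks, database_ok and cache_ok are hypothetical names, not part of Cello. Each check is a zero-argument callable, and an exception counts as a failure so that one broken dependency cannot crash the probe itself.

```python
from typing import Callable, Dict, Tuple


def run_readiness_checks(
    checks: Dict[str, Callable[[], bool]],
) -> Tuple[int, dict]:
    """Run each named check and return (HTTP status, response body)."""
    results = {}
    for name, check in checks.items():
        try:
            results[name] = bool(check())
        except Exception:
            # A raising check is treated as an unavailable dependency.
            results[name] = False
    ready = all(results.values())
    status = 200 if ready else 503
    body = {"status": "ready" if ready else "not_ready", "checks": results}
    return status, body


def database_ok() -> bool:
    """Placeholder: a real check would run a cheap query such as SELECT 1."""
    return True


def cache_ok() -> bool:
    """Placeholder that simulates an unreachable cache."""
    raise ConnectionError("cache unreachable")


if __name__ == "__main__":
    print(run_readiness_checks({"database": database_ok, "cache": cache_ok}))
```

In the /ready handler, the returned pair could be passed to Response.json(body, status=status), mirroring the 503 branch already present in the example. The liveness handler stays free of such checks, so a dependency outage takes an instance out of rotation without triggering a restart.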