Error Handling
Graceful error handling patterns for Bifrost workflows
Philosophy
Section titled “Philosophy”The execution engine automatically catches and handles exceptions. You have two patterns:
- Raise exceptions for failures → execution status:
FAILED - Return
{"success": False}for partial failures → execution status:COMPLETED_WITH_ERRORS
Basic Patterns
Section titled “Basic Patterns”from bifrost import workflow, ExecutionContextimport logging
logger = logging.getLogger(__name__)
@workflow(name="create_user", description="Create user")async def create_user(context: ExecutionContext, email: str): # Exception automatically caught and logged user = await api.create_user(email) logger.info(f"Created user: {user.id}") return {"user_id": user.id}@workflow(name="bulk_create", description="Create multiple users")async def bulk_create(context: ExecutionContext, emails: list): results = {"created": [], "failed": []}
for email in emails: try: user = await api.create_user(email) results["created"].append(user.id) except Exception as e: results["failed"].append({"email": email, "error": str(e)})
# Indicate partial failure return { "success": len(results["failed"]) == 0, "created": results["created"], "failed": results["failed"] }Validate Early
Section titled “Validate Early”Fail fast with input validation before processing:
@workflow(name="send_email")@param("email", type="email", required=True)async def send_email(context, email: str, subject: str): # Validate inputs if not subject: raise ValueError("Subject is required")
# Proceed with valid inputs await send(email, subject) return {"sent": True}Handle Specific Exceptions
Section titled “Handle Specific Exceptions”Provide context-specific error handling:
import aiohttpfrom aiohttp import ClientError, ClientResponseError
@workflow(name="fetch_data")async def fetch_data(context, url: str): try: async with aiohttp.ClientSession() as session: async with session.get(url) as resp: resp.raise_for_status() return {"data": await resp.json()}
except ClientResponseError as e: if e.status == 404: raise ValueError(f"Resource not found: {url}") elif e.status >= 500: raise RuntimeError(f"Server error: {e.status}") raise
except ClientError: raise ConnectionError("Network error occurred")Partial Success Tracking
Section titled “Partial Success Tracking”Track progress in bulk operations:
@workflow(name="bulk_process")async def bulk_process(context, items: list): results = {"processed": [], "failed": []}
# Process each item and track results for item in items: try: result = await api.process(item) results["processed"].append(result) logger.info(f"Processed item {item.id}") except Exception as e: logger.error(f"Failed item {item.id}: {e}") results["failed"].append({ "item_id": item.id, "error": str(e) })
# Return comprehensive results return { "success": len(results["failed"]) == 0, "total": len(items), "processed": len(results["processed"]), "failed": len(results["failed"]), "details": results }Retry with Backoff
Section titled “Retry with Backoff”For transient errors, use the built-in retry policy or implement custom logic:
@workflow( name="api_call", retry_policy={ "max_attempts": 3, "backoff_seconds": 5 })async def api_call(context, endpoint: str): # Automatic retry on failure return await api.call(endpoint)import asyncio
@workflow(name="custom_retry")async def custom_retry(context, endpoint: str): max_retries = 3 backoff = 1
for attempt in range(max_retries): try: result = await api.call(endpoint) return {"success": True, "data": result}
except TransientError as e: if attempt < max_retries - 1: logger.warning(f"Attempt {attempt + 1} failed, retrying...") await asyncio.sleep(backoff) backoff *= 2 else: raise RuntimeError(f"Max retries exceeded: {e}")
except PermanentError: raise # Don't retry permanent errorsResource Cleanup
Section titled “Resource Cleanup”Always clean up resources with try/finally:
@workflow(name="process_with_lock")async def process_with_lock(context, resource_id: str): lock = None try: lock = await acquire_lock(resource_id) result = await process(resource_id) return {"result": result}
finally: if lock: await release_lock(lock)OAuth Token Handling
Section titled “OAuth Token Handling”OAuth tokens are automatically refreshed by the engine:
from bifrost import oauth
@workflow(name="graph_call")async def graph_call(context, user_id: str): # Get OAuth token (automatically refreshes if expired) token_data = await oauth.get_token("microsoft")
if not token_data: raise ValueError("Microsoft OAuth not configured")
headers = {"Authorization": f"Bearer {token_data['access_token']}"} user = await graph_api.get_user(user_id, headers=headers)
return {"user": user}Logging Best Practices
Section titled “Logging Best Practices”Log at appropriate levels for visibility:
logger = logging.getLogger(__name__)
@workflow(name="example")async def example(context, param: str): # Debug: Hidden from users, for developers only logger.debug(f"Processing param: {param}")
# Info: Visible to users, progress updates logger.info(f"Started processing for org {context.org_id}")
try: result = await do_work(param) logger.info(f"Completed successfully") return {"result": result}
except Exception as e: # Error: Visible to users, includes details logger.error(f"Failed to process: {e}", exc_info=True) raiseError Response Format
Section titled “Error Response Format”Use consistent structure for partial failures:
{ "success": True, "user_id": "123", "email": "user@example.com"}{ "success": False, "message": "8 of 10 users created", "created": [{"id": "1"}, {"id": "2"}], "failed": [ {"email": "invalid", "error": "Invalid format"} ]}Common Patterns
Section titled “Common Patterns”Validation Errors
Section titled “Validation Errors”# ✅ Raise for invalid inputif not user_id or not isinstance(user_id, str): raise ValueError("user_id must be a non-empty string")
# ✅ Or use @param validation@param("user_id", type="string", required=True, validation={"min_length": 1})External API Calls
Section titled “External API Calls”# ✅ Let exceptions bubble for API errorstry: result = await external_api.call()except ApiError as e: # Log context and re-raise logger.error(f"API call failed: {e}", extra={"endpoint": endpoint}) raiseResource Not Found
Section titled “Resource Not Found”# ✅ Raise for missing resourcesuser = await get_user(user_id)if not user: raise ValueError(f"User not found: {user_id}")Platform Bugs
Section titled “Platform Bugs”# ✅ Raise for unexpected conditionsif context is None: raise RuntimeError("ExecutionContext is None - platform bug!")Testing Error Paths
Section titled “Testing Error Paths”Test both success and failure scenarios:
import pytest
async def test_create_user_success(): result = await create_user(context, "test@example.com") assert result["user_id"]
async def test_create_user_invalid_email(): with pytest.raises(ValueError, match="Invalid email"): await create_user(context, "invalid")
async def test_bulk_create_partial_failure(): result = await bulk_create(context, ["valid@test.com", "invalid"]) assert result["success"] is False assert len(result["created"]) == 1 assert len(result["failed"]) == 1When to Use Each Pattern
Section titled “When to Use Each Pattern”- Raise exceptions: Total failure, invalid input, external API errors
- Return
{"success": False}: Partial success in bulk operations - Both: Log errors, re-raise for critical issues
Next Steps
Section titled “Next Steps”- Writing Workflows - Complete workflow guide
- Using Decorators - Advanced patterns
- Logging Guide - Log visibility