Skip to content

Error Handling

Graceful error handling patterns for Bifrost workflows

The execution engine automatically catches and handles exceptions. You have two patterns:

  1. Raise exceptions for failures → execution status: FAILED
  2. Return {"success": False} for partial failures → execution status: COMPLETED_WITH_ERRORS
from bifrost import workflow, ExecutionContext
import logging
logger = logging.getLogger(__name__)
@workflow(name="create_user", description="Create user")
async def create_user(context: ExecutionContext, email: str):
    """Create a user for ``email`` and return the new user's id.

    No exception handling is done here on purpose: any error raised by
    the API call propagates out of the workflow, where (per this guide)
    the execution engine catches and logs it.

    Returns:
        A dict of the form ``{"user_id": <id of the created user>}``.
    """
    created = await api.create_user(email)
    logger.info(f"Created user: {created.id}")
    payload = {"user_id": created.id}
    return payload

Fail fast with input validation before processing:

@workflow(name="send_email")
@param("email", type="email", required=True)
async def send_email(context, email: str, subject: str):
    """Send a message to ``email`` once the subject has been validated.

    Raises:
        ValueError: If ``subject`` is empty (or otherwise falsy).

    Returns:
        ``{"sent": True}`` after the send call completes.
    """
    # Fail fast: reject bad input before doing any work.
    subject_missing = not subject
    if subject_missing:
        raise ValueError("Subject is required")

    # Input is valid from here on.
    await send(email, subject)
    outcome = {"sent": True}
    return outcome

Provide context-specific error handling:

import aiohttp
from aiohttp import ClientError, ClientResponseError
@workflow(name="fetch_data")
async def fetch_data(context, url: str):
    """Fetch ``url`` and return its JSON body as ``{"data": ...}``.

    aiohttp failures are translated into built-in exceptions. Each
    translated exception is explicitly chained (``raise ... from e``) to
    the aiohttp error that triggered it, so the root cause is preserved
    as ``__cause__`` and shown in tracebacks.

    Raises:
        ValueError: The response status was 404.
        RuntimeError: The response status was 500 or above.
        ClientResponseError: Any other error status; re-raised unchanged.
        ConnectionError: Any other aiohttp ``ClientError``.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                resp.raise_for_status()
                return {"data": await resp.json()}
    except ClientResponseError as e:
        # Handled before the more general ClientError clause below so that
        # HTTP status errors get a status-specific mapping.
        if e.status == 404:
            raise ValueError(f"Resource not found: {url}") from e
        if e.status >= 500:
            raise RuntimeError(f"Server error: {e.status}") from e
        # Remaining statuses have no dedicated mapping: propagate as-is.
        raise
    except ClientError as e:
        raise ConnectionError("Network error occurred") from e

Track progress in bulk operations:

@workflow(name="bulk_process")
async def bulk_process(context, items: list):
    """Process every item, collecting successes and failures separately.

    A failure on one item is logged and recorded but does not stop the
    remaining items from being processed.

    Returns:
        A summary dict with:
        ``success``   - True only when no item failed,
        ``total``     - number of input items,
        ``processed`` - count of successful items,
        ``failed``    - count of failed items,
        ``details``   - ``{"processed": [...results], "failed": [...]}``
                        where each failure is
                        ``{"item_id": ..., "error": <str of exception>}``.
    """
    succeeded = []
    failures = []

    for item in items:
        try:
            outcome = await api.process(item)
            succeeded.append(outcome)
            logger.info(f"Processed item {item.id}")
        except Exception as exc:
            # Record the failure and keep going with the next item.
            logger.error(f"Failed item {item.id}: {exc}")
            failures.append({"item_id": item.id, "error": str(exc)})

    return {
        "success": not failures,
        "total": len(items),
        "processed": len(succeeded),
        "failed": len(failures),
        "details": {"processed": succeeded, "failed": failures},
    }

For transient errors, use the built-in retry policy or implement custom logic:

@workflow(
    name="api_call",
    retry_policy={"max_attempts": 3, "backoff_seconds": 5},
)
async def api_call(context, endpoint: str):
    """Call ``endpoint`` through the API client and return its result.

    Retrying is not implemented in the body; it is requested declaratively
    through the ``retry_policy`` passed to ``@workflow`` above
    (``max_attempts`` of 3, ``backoff_seconds`` of 5).
    """
    response = await api.call(endpoint)
    return response

Always clean up resources with try/finally:

@workflow(name="process_with_lock")
async def process_with_lock(context, resource_id: str):
    """Process ``resource_id`` while holding its lock.

    The lock is released in ``finally`` so that it is given back whether
    processing succeeds or raises. If acquiring the lock itself fails,
    there is nothing to release and the release step is skipped.

    Returns:
        ``{"result": <value returned by process()>}``.
    """
    held_lock = None
    try:
        held_lock = await acquire_lock(resource_id)
        return {"result": await process(resource_id)}
    finally:
        # Only release when a (truthy) lock was actually obtained.
        if held_lock:
            await release_lock(held_lock)

OAuth tokens are automatically refreshed by the engine:

from bifrost import oauth
@workflow(name="graph_call")
async def graph_call(context, user_id: str):
    """Look up ``user_id`` via the graph API using the Microsoft OAuth token.

    Raises:
        ValueError: If no token data is returned for ``"microsoft"``.

    Returns:
        ``{"user": <value returned by graph_api.get_user()>}``.
    """
    # Per this guide, the engine refreshes an expired token automatically.
    token_data = await oauth.get_token("microsoft")
    if not token_data:
        raise ValueError("Microsoft OAuth not configured")

    access_token = token_data['access_token']
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    profile = await graph_api.get_user(user_id, headers=auth_headers)
    return {"user": profile}

Log at appropriate levels for visibility:

logger = logging.getLogger(__name__)


@workflow(name="example")
async def example(context, param: str):
    """Run ``do_work(param)``, logging at a level suited to each event.

    Level usage, as described by this guide:
      * debug - developer-only detail, hidden from users
      * info  - progress updates visible to users
      * error - failures visible to users, with traceback attached

    Raises:
        Exception: Whatever ``do_work`` raises is logged and re-raised
            unchanged.

    Returns:
        ``{"result": <value returned by do_work()>}``.
    """
    # Debug: Hidden from users, for developers only
    logger.debug(f"Processing param: {param}")
    # Info: Visible to users, progress updates
    logger.info(f"Started processing for org {context.org_id}")
    try:
        result = await do_work(param)
        # Plain string: there is nothing to interpolate in this message.
        logger.info("Completed successfully")
        return {"result": result}
    except Exception as e:
        # Error: Visible to users, includes details.
        # exc_info=True attaches the active traceback to the log record.
        logger.error(f"Failed to process: {e}", exc_info=True)
        raise

Use a consistent structure for results, including partial failures:

# Example result payload: a "success" flag alongside the returned data.
{
"success": True,
"user_id": "123",
"email": "user@example.com"
}
# NOTE: the snippets below are independent illustrative fragments, not one
# runnable script; names such as user_id, endpoint and context are assumed
# to come from the enclosing workflow.
# ✅ Raise for invalid input
# Rejects falsy values (None, "") as well as anything that is not a str.
if not user_id or not isinstance(user_id, str):
raise ValueError("user_id must be a non-empty string")
# ✅ Or use @param validation
# Declarative alternative to the manual check above.
@param("user_id", type="string", required=True, validation={"min_length": 1})
# ✅ Let exceptions bubble for API errors
try:
result = await external_api.call()
except ApiError as e:
# Log context and re-raise
# The bare raise re-raises the same ApiError after it has been logged.
logger.error(f"API call failed: {e}", extra={"endpoint": endpoint})
raise
# ✅ Raise for missing resources
# A falsy lookup result is treated as "not found".
user = await get_user(user_id)
if not user:
raise ValueError(f"User not found: {user_id}")
# ✅ Raise for unexpected conditions
if context is None:
raise RuntimeError("ExecutionContext is None - platform bug!")

Test both success and failure scenarios:

import pytest
async def test_create_user_success():
    """A valid email yields a result carrying a truthy ``user_id``."""
    created = await create_user(context, "test@example.com")
    new_user_id = created["user_id"]
    assert new_user_id
async def test_create_user_invalid_email():
    """A malformed email must raise ValueError matching "Invalid email"."""
    expect_invalid_email = pytest.raises(ValueError, match="Invalid email")
    with expect_invalid_email:
        await create_user(context, "invalid")
async def test_bulk_create_partial_failure():
    """One good and one bad email: overall failure, one entry in each list."""
    emails = ["valid@test.com", "invalid"]
    outcome = await bulk_create(context, emails)

    assert outcome["success"] is False
    created_count = len(outcome["created"])
    assert created_count == 1
    failed_count = len(outcome["failed"])
    assert failed_count == 1
  • Raise exceptions: Total failure, invalid input, external API errors
  • Return {"success": False}: Partial success in bulk operations
  • Both: Log errors, re-raise for critical issues