Write Workflows
Complete guide to writing Bifrost workflows with decorators, parameters, and best practices
Basic Structure
Every workflow follows this pattern:

    from bifrost import workflow
    import logging

    logger = logging.getLogger(__name__)

    @workflow(category="Category Name")
    async def my_workflow(param1: str, param2: int = 10):
        """Docstring explaining workflow purpose - this becomes the description."""
        logger.info(f"Processing {param1}")
        return {"result": "success"}

The decorator automatically infers:
- name: from the function name (my_workflow)
- description: from the docstring
- parameters: from the function signature with type hints
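To make the inference concrete, the sketch below uses Python's standard inspect module to pull the same three pieces of metadata from a function. It is an illustration only: describe_workflow is a hypothetical helper, not part of the Bifrost SDK, and the real decorator may work differently.

```python
import inspect


def describe_workflow(func):
    """Hypothetical helper: collect the metadata a decorator could infer."""
    signature = inspect.signature(func)
    parameters = []
    for name, param in signature.parameters.items():
        has_default = param.default is not inspect.Parameter.empty
        annotation = param.annotation
        parameters.append({
            "name": name,
            # Use the type's short name when the annotation is a class.
            "type": getattr(annotation, "__name__", str(annotation)),
            "required": not has_default,
            "default": param.default if has_default else None,
        })
    return {
        "name": func.__name__,                      # from the function name
        "description": inspect.getdoc(func) or "",  # from the docstring
        "parameters": parameters,                   # from the signature
    }


async def my_workflow(param1: str, param2: int = 10):
    """Docstring explaining workflow purpose."""
    return {"result": "success"}


metadata = describe_workflow(my_workflow)
```

Here param1 comes out as a required string, and param2 as an optional integer with a default of 10.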
Decorator Reference
@workflow Options
Every option is optional; the decorator infers name and description from your function.
name (str): Auto-derived from the function name if not provided
description (str): Auto-derived from the docstring if not provided
execution_mode (str): Auto-selected based on endpoint_enabled ("sync" when the endpoint is enabled, otherwise "async")
category (str): Grouping category (default: "General")
tags (list): List of tags for filtering
timeout_seconds (int): Maximum execution time (default: 1800 = 30 min)
schedule (str): Cron expression for scheduled runs
endpoint_enabled (bool): Expose as an HTTP endpoint (default: False)
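The execution_mode default can be read as a simple rule, based on the Execution Modes section of this guide: endpoint-enabled workflows run synchronously, and everything else runs asynchronously. The sketch below expresses that rule; resolve_execution_mode is a hypothetical helper written for illustration, not a Bifrost function.

```python
from typing import Optional


def resolve_execution_mode(
    execution_mode: Optional[str] = None,
    endpoint_enabled: bool = False,
) -> str:
    """Hypothetical helper: pick the mode when none is given explicitly."""
    if execution_mode is not None:
        # An explicit setting always wins over the inferred default.
        return execution_mode
    return "sync" if endpoint_enabled else "async"


default_mode = resolve_execution_mode()
endpoint_mode = resolve_execution_mode(endpoint_enabled=True)
explicit_mode = resolve_execution_mode("scheduled", endpoint_enabled=True)
```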
Parameter Inference
Parameters are automatically extracted from your function signature:

    @workflow
    async def create_user(
        email: str,               # Required string, label: "Email"
        name: str,                # Required string, label: "Name"
        department: str = "IT",   # Optional with default
        active: bool = True       # Optional boolean
    ):
        """Create a new user."""
        pass

@param for Advanced Features
Use @param only when you need data providers, validation, or help text:

    @workflow
    @param("department", data_provider="get_departments")
    @param("email", validation={"pattern": r".*@company\.com$"}, help_text="Must be company email")
    async def create_user(email: str, department: str):
        """Create a new user."""
        pass

Validation Rules
    # String validation
    @param("username", validation={
        "min_length": 3,
        "max_length": 50,
        "pattern": r"^[a-zA-Z0-9_]+$"
    })

    # Number validation
    @param("quantity", validation={
        "min": 1,
        "max": 1000
    })

    # Enum options
    @param("status", validation={
        "enum": ["active", "inactive", "pending"]
    })

Execution Context
Access organization, user, and execution metadata via the context proxy:

    from bifrost import context, workflow

    @workflow
    async def my_workflow(name: str):
        # Organization
        org_id = context.org_id
        org_name = context.org_name

        # User
        user_id = context.user_id
        email = context.email
        user_name = context.name

        # Execution
        execution_id = context.execution_id
        is_admin = context.is_platform_admin
        is_global = context.is_global_scope

SDK Modules
Access config, OAuth, and files through the SDK modules:

    from bifrost import config, oauth, files, workflow

    @workflow
    async def my_workflow():
        """Example workflow using SDK modules."""
        # Configuration (async)
        api_url = await config.get("api_url", default="https://api.example.com")
        await config.set("api_url", "https://api.example.com")

        # Secrets (stored encrypted in database)
        api_key = await config.get("api_key")
        await config.set("api_key", "secret_value", is_secret=True)

        # OAuth connection (async)
        conn = await oauth.get("microsoft")
        if conn:
            access_token = conn["access_token"]
            # Use access_token for API calls

        # File operations (synchronous)
        files.write("data/output.txt", "content", location="workspace")
        content = files.read("data/output.txt", location="workspace")

Execution Modes
Synchronous
Recommended only for endpoint-enabled workflows, which use this mode by default. The result is returned immediately.

    @workflow(execution_mode="sync", timeout_seconds=30)
    async def quick_lookup(id: str):
        # database stands in for your own data access layer
        result = await database.get(id)
        return {"result": result}

Asynchronous
Best for long-running operations (more than 30 seconds), and the default unless the workflow is exposed as an endpoint. The workflow is queued for background execution and reports real-time updates in the UI.

    @workflow(execution_mode="async", timeout_seconds=1800)
    async def bulk_import(csv_url: str):
        # fetch_csv and import_item stand in for your own helpers
        items = await fetch_csv(csv_url)
        for item in items:
            await import_item(item)
        return {"imported": len(items)}

Scheduled
Run automatically on a schedule using cron expressions.

    @workflow(
        execution_mode="scheduled",
        schedule="0 9 * * *",    # Daily at 9 AM UTC
        expose_in_forms=False    # Hide from manual execution
    )
    async def daily_report():
        report = await generate_report()
        await send_report(report)
        return {"status": "sent"}

Data Providers
Provide dynamic options for dropdowns:

- Create a data provider:

      from bifrost import data_provider

      @data_provider(
          name="get_departments",
          description="List departments",
          cache_ttl_seconds=600
      )
      async def get_departments():
          return [
              {"label": "Engineering", "value": "eng"},
              {"label": "Sales", "value": "sales"}
          ]

- Use it in a workflow with @param:

      @workflow
      @param("department", data_provider="get_departments")
      async def assign_user(department: str, user_email: str):
          """Assign user to department."""
          # department contains the selected value ("eng" or "sales")
          return {"assigned_to": department}
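A data provider will often build its options from real records rather than a hard-coded list. The following is a minimal sketch, assuming the records arrive as dictionaries; to_options and the sample field names are hypothetical, and only the label/value output shape comes from the example above.

```python
from typing import Any, Dict, Iterable, List


def to_options(
    records: Iterable[Dict[str, Any]],
    label_key: str,
    value_key: str,
) -> List[Dict[str, Any]]:
    """Hypothetical helper: map raw records to label/value dropdown options."""
    options = [
        {"label": str(record[label_key]), "value": record[value_key]}
        for record in records
    ]
    # Sort by label so the dropdown order is stable between calls.
    return sorted(options, key=lambda option: option["label"].lower())


# Sample records; the field names are made up for this sketch.
departments = [
    {"display_name": "Sales", "code": "sales"},
    {"display_name": "Engineering", "code": "eng"},
]
options = to_options(departments, label_key="display_name", value_key="code")
```

The body of a data provider could then return the result of such a helper instead of a literal list.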
Return Values
Always return a dictionary:

    # Success
    return {"success": True, "result": data}

    # Workflow still succeeds, but indicates failure
    return {"success": False, "error": "User not found"}

    # Nested data
    return {
        "user": {"id": "123", "email": "user@example.com"},
        "items": [1, 2, 3]
    }

Error Handling
Exceptions are automatically caught and handled by the execution engine. You can optionally return {"success": False} to indicate partial failures:

    import logging
    from bifrost import workflow

    logger = logging.getLogger(__name__)

    @workflow
    async def create_user(email: str, name: str):
        """Create a new user in the system."""
        # Raised exceptions are automatically caught and logged
        user = await create_user_in_system(email, name)
        logger.info(f"Created user: {user.id}")
        return {"user_id": user.id}

        # Or indicate partial failure (execution status: COMPLETED_WITH_ERRORS)
        # return {"success": False, "error": "User created but email failed"}

Logging
Use Python's logging module for visibility into execution:

    import logging
    from bifrost import workflow

    logger = logging.getLogger(__name__)

    @workflow
    async def process_data(items: list):
        """Process a list of data items."""
        logger.debug("Starting detailed processing...")    # Hidden from users
        logger.info(f"Processing {len(items)} items...")   # Visible to users
        logger.warning("Item 5 failed, continuing...")     # Visible to users
        logger.error("Critical failure detected")          # Visible to users

Best Practices
- Single Responsibility: One workflow, one task. Keep workflows focused and composable.
- Validate Early: Check inputs before processing to fail fast and provide clear feedback.
- Log for Users: Use logger.info() for user-facing progress updates. Use logger.debug() for developer debugging.
- Let Exceptions Bubble: Raised exceptions are automatically handled - no need to catch and return error dicts.
- Use Type Hints: Enable IDE autocomplete and error detection with proper typing.
- Avoid Secrets in Returns: Never return credentials or PII in workflow results.
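As an illustration of Validate Early and Let Exceptions Bubble working together, the sketch below checks an input up front and raises on failure instead of returning an error dict. validate_username is a hypothetical helper; its limits mirror the string rules in the Validation Rules section, but how Bifrost enforces @param validation internally is not covered in this guide.

```python
import re

# Same character set as the username pattern in the Validation Rules section.
USERNAME_PATTERN = re.compile(r"^[a-zA-Z0-9_]+$")


def validate_username(username: str, min_length: int = 3, max_length: int = 50) -> str:
    """Hypothetical helper: return the username unchanged or raise ValueError."""
    if not min_length <= len(username) <= max_length:
        raise ValueError(
            f"username must be {min_length}-{max_length} characters, got {len(username)}"
        )
    if not USERNAME_PATTERN.fullmatch(username):
        raise ValueError("username may only contain letters, digits, and underscores")
    return username
```

Calling a check like this at the top of a workflow stops a bad value before any side effects occur, and the raised ValueError is left for the execution engine to handle.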
Common Patterns
List Processing

    async def process_items(items: list):
        results = []
        errors = []

        # Collect per-item failures so one bad item does not stop the batch
        for item in items:
            try:
                result = await process_item(item)
                results.append(result)
            except Exception as e:
                errors.append({"item": item, "error": str(e)})

        return {
            "processed": len(results),
            "failed": len(errors),
            "results": results,
            "errors": errors
        }

Chained Operations
    async def create_user_with_license(email: str, sku: str):
        # Step 1
        logger.info("Creating user...")
        user = await create_user(email)

        # Step 2
        logger.info("Assigning license...")
        await assign_license(user.id, sku)

        return {"user_id": user.id, "license": sku}

Next Steps
- Using Decorators - Advanced decorator features
- Error Handling - Comprehensive error patterns
- SDK Reference - Full API documentation