Write Workflows

Complete guide to writing Bifrost workflows with decorators, parameters, and best practices

Every workflow follows this pattern:

from bifrost import workflow, param, ExecutionContext
import logging

logger = logging.getLogger(__name__)

@workflow(
    name="unique_name",             # Required: snake_case identifier
    description="What it does",     # Required: shown in UI
    category="Category Name",       # Optional: for grouping
    timeout_seconds=300             # Optional: max execution time
)
@param("param1", type="string", required=True)
async def unique_name(context: ExecutionContext, param1: str):
    """Docstring explaining workflow purpose."""
    logger.info(f"Processing {param1}")
    return {"result": "success"}

Required @workflow arguments:

name (str): Unique identifier in snake_case

description (str): User-facing description shown in the UI

@param arguments:

name (str): Parameter name (must match the function argument)

type (str): One of string, int, float, bool, email, json, list

@param("username", type="string",
    validation={
        "min_length": 3,
        "max_length": 50,
        "pattern": r"^[a-zA-Z0-9_]+$"
    }
)
@param("quantity", type="int",
    validation={
        "min": 1,
        "max": 1000
    }
)
@param("status", type="string",
    validation={
        "enum": ["active", "inactive", "pending"]
    }
)
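To make the meaning of these rule keys concrete, here is a minimal stand-alone sketch of how such a validation dict could be checked. It is not Bifrost's validator: `check_value` is a hypothetical helper, and it assumes `pattern` must match the whole value and that `min`/`max` are inclusive.

```python
import re
from typing import Any


def check_value(value: Any, rules: dict) -> list[str]:
    """Return a list of problems; an empty list means the value passes."""
    problems = []
    # Length rules apply to strings.
    if "min_length" in rules and len(value) < rules["min_length"]:
        problems.append(f"shorter than {rules['min_length']} characters")
    if "max_length" in rules and len(value) > rules["max_length"]:
        problems.append(f"longer than {rules['max_length']} characters")
    # Assumption: the pattern has to match the entire value.
    if "pattern" in rules and not re.fullmatch(rules["pattern"], value):
        problems.append("does not match the required pattern")
    # Assumption: numeric bounds are inclusive.
    if "min" in rules and value < rules["min"]:
        problems.append(f"below minimum {rules['min']}")
    if "max" in rules and value > rules["max"]:
        problems.append(f"above maximum {rules['max']}")
    if "enum" in rules and value not in rules["enum"]:
        problems.append(f"not one of {rules['enum']}")
    return problems


USERNAME_RULES = {"min_length": 3, "max_length": 50, "pattern": r"^[a-zA-Z0-9_]+$"}
QUANTITY_RULES = {"min": 1, "max": 1000}
STATUS_RULES = {"enum": ["active", "inactive", "pending"]}
```

Calling `check_value("alice_01", USERNAME_RULES)` returns an empty list, while a value such as `"ab"` reports the length problem.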

Access organization, user, and execution metadata:

async def my_workflow(context: ExecutionContext):
    # Organization
    org_id = context.org_id
    org_name = context.org_name
    # User
    user_id = context.user_id
    user_email = context.email
    user_name = context.name
    # Execution
    execution_id = context.execution_id
    is_admin = context.is_platform_admin
    is_global = context.is_global_scope
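When trying out workflow logic locally, a stub object with the same attribute names can stand in for the real context. The sketch below is an assumption-laden illustration: `StubContext` and `whoami` are hypothetical names, the field values are made up, and the function is called undecorated, outside the Bifrost engine.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StubContext:
    """Hypothetical stand-in exposing the attribute names listed above."""
    org_id: str = "org-123"
    org_name: str = "Example Org"
    user_id: str = "user-456"
    email: str = "dev@example.com"
    name: str = "Dev User"
    execution_id: str = "exec-789"
    is_platform_admin: bool = False
    is_global_scope: bool = False


async def whoami(context) -> dict:
    """Undecorated workflow body that only reads context attributes."""
    return {
        "org_id": context.org_id,
        "execution_id": context.execution_id,
        "admin": context.is_platform_admin,
    }


# Run the coroutine directly with the stub instead of a real execution.
result = asyncio.run(whoami(StubContext(is_platform_admin=True)))
```

This only checks the function's own logic; it says nothing about how the platform populates the real `ExecutionContext`.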

Access config, secrets, OAuth, and files via SDK:

from bifrost import config, secrets, oauth, files

async def my_workflow(context: ExecutionContext):
    # Configuration (async)
    api_url = await config.get("api_url", default="https://api.example.com")
    await config.set("api_url", "https://api.example.com")
    # Secrets from Key Vault (async)
    api_key = await secrets.get("bifrost-global-api-key")
    await secrets.set("bifrost-global-api-key", "secret_value")
    # OAuth tokens (async)
    token_data = await oauth.get_token("microsoft")
    if token_data:
        access_token = token_data["access_token"]
        # Use access_token for API calls
    # File operations (synchronous)
    files.write("data/output.txt", "content", location="workspace")
    content = files.read("data/output.txt", location="workspace")

Synchronous mode (execution_mode="sync") is recommended only for endpoint-enabled workflows, which use it by default. The result is returned immediately.

@workflow(execution_mode="sync", timeout_seconds=30)
async def quick_lookup(context: ExecutionContext, id: str):
    result = await database.get(id)
    return {"result": result}

Asynchronous mode (execution_mode="async") is best for long-running operations (more than 30 seconds) and is the default unless the workflow is used as an endpoint. The workflow is queued for background execution, and the UI shows real-time updates as it runs.

@workflow(execution_mode="async", timeout_seconds=1800)
async def bulk_import(context: ExecutionContext, csv_url: str):
    items = await fetch_csv(csv_url)
    for item in items:
        await import_item(item)
    return {"imported": len(items)}

Scheduled mode (execution_mode="scheduled") runs the workflow automatically on a schedule defined by a cron expression.

@workflow(
    execution_mode="scheduled",
    schedule="0 9 * * *",       # Daily at 9 AM UTC
    expose_in_forms=False       # Hide from manual execution
)
async def daily_report(context: ExecutionContext):
    report = await generate_report()
    await send_report(report)
    return {"status": "sent"}
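The schedule string in the example has five space-separated fields. Assuming Bifrost follows the standard five-field cron layout (minute, hour, day of month, month, day of week), the small helper below labels each field so a schedule can be sanity-checked before deployment. `split_cron` is a hypothetical name and does no range validation.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression: str) -> dict:
    """Label the five fields of a standard cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


daily_at_nine = split_cron("0 9 * * *")
```

For `"0 9 * * *"` the minute field is `0` and the hour field is `9`, with wildcards for the remaining three fields, which matches the "Daily at 9 AM" comment in the example.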

Provide dynamic options for dropdowns:

  1. Create a data provider:

    from bifrost import data_provider, ExecutionContext

    @data_provider(
        name="get_departments",
        description="List departments",
        cache_ttl_seconds=600
    )
    async def get_departments(context: ExecutionContext):
        return [
            {"label": "Engineering", "value": "eng"},
            {"label": "Sales", "value": "sales"}
        ]
  2. Use in workflow:

    @workflow(name="assign_user")
    @param("department", type="string", data_provider="get_departments")
    async def assign_user(context: ExecutionContext, department: str):
        # department contains selected value ("eng" or "sales")
        return {"assigned_to": department}
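In practice the options usually come from records fetched elsewhere rather than a hard-coded list. The sketch below shows one way to map arbitrary records onto the label/value shape used above. `to_options` and the sample rows are hypothetical and not part of the Bifrost SDK.

```python
def to_options(rows: list, label_key: str, value_key: str) -> list:
    """Convert records into the {"label": ..., "value": ...} option shape."""
    return [
        {"label": str(row[label_key]), "value": row[value_key]}
        for row in rows
    ]


# Hypothetical records, e.g. the parsed response of a directory API.
departments = [
    {"display_name": "Engineering", "code": "eng", "headcount": 42},
    {"display_name": "Sales", "code": "sales", "headcount": 17},
]

options = to_options(departments, label_key="display_name", value_key="code")
```

A data provider could return `options` directly; extra fields such as `headcount` are dropped so only what the dropdown needs is exposed.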

Always return a dictionary:

return {"success": True, "result": data}

Exceptions are automatically caught and handled by the execution engine. You can optionally return {"success": False} to indicate partial failures:

import logging
from bifrost import workflow, param, ExecutionContext

logger = logging.getLogger(__name__)

@workflow(name="create_user", description="Create a new user")
@param("email", type="email", required=True)
async def create_user(context: ExecutionContext, email: str):
    # Raised exceptions are automatically caught and logged
    user = await create_user_in_system(email)
    logger.info(f"Created user: {user.id}")
    return {"user_id": user.id}
    # Or indicate partial failure (execution status: COMPLETED_WITH_ERRORS)
    # return {"success": False, "error": "User created but email failed"}
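The difference between letting an exception propagate and reporting a partial failure can be sketched without the SDK. In the stand-alone example below every helper (`create_user_in_system`, `send_welcome_email`, `EmailError`) is a hypothetical stub: a failure in the critical step is left to raise, while a failure in the follow-up step is turned into a `{"success": False}` result.

```python
import asyncio


class EmailError(Exception):
    """Hypothetical error raised when the welcome email cannot be sent."""


async def create_user_in_system(email: str) -> dict:
    # Hypothetical stand-in for a real directory call.
    return {"id": "u-1", "email": email}


async def send_welcome_email(email: str) -> None:
    # Hypothetical stand-in that fails for one specific domain.
    if email.endswith("@bounce.example"):
        raise EmailError("mailbox rejected the message")


async def create_user_body(email: str) -> dict:
    # Critical step: any exception here is left to propagate.
    user = await create_user_in_system(email)
    try:
        await send_welcome_email(email)
    except EmailError as exc:
        # The user exists, so report a partial failure instead of raising.
        return {
            "success": False,
            "user_id": user["id"],
            "error": f"User created but email failed: {exc}",
        }
    return {"success": True, "user_id": user["id"]}


ok = asyncio.run(create_user_body("new.hire@example.com"))
partial = asyncio.run(create_user_body("new.hire@bounce.example"))
```

Here `ok` reports success, while `partial` carries `"success": False` together with the user ID, so the caller can see that the account exists even though the email step failed.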

Use Python’s logging module for visibility into execution:

import logging
from bifrost import workflow, ExecutionContext

logger = logging.getLogger(__name__)

@workflow(name="process_data")
async def process_data(context: ExecutionContext):
    logger.debug("Starting detailed processing...")   # Hidden from users
    logger.info("Processing item 1 of 10...")         # Visible to users
    logger.warning("Item 5 failed, continuing...")    # Visible to users
    logger.error("Critical failure detected")         # Visible to users

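To preview locally which messages a user would see, one option is to attach a handler that only accepts INFO and above. This is a sketch using the standard `logging` module, not a description of how Bifrost collects logs. `ListHandler` and the logger name are hypothetical, and the INFO threshold is taken from the comments in the example above.

```python
import logging


class ListHandler(logging.Handler):
    """Collect (level, message) pairs in memory for inspection."""

    def __init__(self, level=logging.NOTSET):
        super().__init__(level)
        self.messages = []

    def emit(self, record):
        self.messages.append((record.levelname, record.getMessage()))


logger = logging.getLogger("workflow_preview")
logger.setLevel(logging.DEBUG)   # let every record reach the handlers
logger.propagate = False         # keep the preview out of the root logger

# Assumption: users see INFO and above, developers see everything.
user_view = ListHandler(level=logging.INFO)
logger.addHandler(user_view)

logger.debug("Starting detailed processing...")
logger.info("Processing item 1 of 10...")
logger.warning("Item 5 failed, continuing...")
```

After these calls `user_view.messages` holds the INFO and WARNING entries but not the DEBUG one.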
  • Single Responsibility: One workflow, one task. Keep workflows focused and composable.
  • Validate Early: Check inputs before processing to fail fast and provide clear feedback.
  • Log for Users: Use logger.info() for user-facing progress updates. Use logger.debug() for developer debugging.
  • Let Exceptions Bubble: Raised exceptions are automatically handled - no need to catch and return error dicts.
  • Use Type Hints: Enable IDE autocomplete and error detection with proper typing.
  • Avoid Secrets in Returns: Never return credentials or PII in workflow results.
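As one way to apply the last point, a result dict can be passed through a redaction step before it is returned. The helper below is a hypothetical sketch: `redact` and the `SENSITIVE_KEYS` set are not part of Bifrost, and the key list would need to be adapted to the data a workflow actually handles.

```python
# Hypothetical list of keys that should never appear in a workflow result.
SENSITIVE_KEYS = {"password", "secret", "token", "api_key", "access_token"}


def redact(result: dict) -> dict:
    """Return a copy of result with sensitive values masked, including nested dicts."""
    cleaned = {}
    for key, value in result.items():
        if key.lower() in SENSITIVE_KEYS:
            cleaned[key] = "[REDACTED]"
        elif isinstance(value, dict):
            cleaned[key] = redact(value)
        else:
            cleaned[key] = value
    return cleaned


raw = {
    "user_id": "u-1",
    "access_token": "abc123",
    "meta": {"api_key": "k-9", "region": "eu"},
}
safe = redact(raw)
```

The original dict is left untouched, so `raw` can still be used internally while `safe` is what the workflow returns.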

To process a batch without letting one bad item stop the run, collect per-item errors and return a summary:

async def process_items(context: ExecutionContext, items: list):
    results = []
    errors = []
    for item in items:
        try:
            result = await process_item(item)
            results.append(result)
        except Exception as e:
            # Record the failure and continue with the next item
            errors.append({"item": item, "error": str(e)})
    return {
        "processed": len(results),
        "failed": len(errors),
        "results": results,
        "errors": errors
    }
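When items are independent, the same summary can be built concurrently. The variant below swaps the sequential loop for `asyncio.gather(..., return_exceptions=True)`, which returns exceptions as values in input order instead of raising. `process_item` here is a hypothetical stub, and the function takes no context so it can run outside Bifrost.

```python
import asyncio


async def process_item(item: int) -> int:
    # Hypothetical worker: odd numbers fail, even numbers are doubled.
    if item % 2:
        raise ValueError(f"cannot process {item}")
    return item * 2


async def process_items_concurrently(items: list) -> dict:
    # Exceptions come back as values, in the same order as the inputs.
    outcomes = await asyncio.gather(
        *(process_item(item) for item in items),
        return_exceptions=True,
    )
    results, errors = [], []
    for item, outcome in zip(items, outcomes):
        if isinstance(outcome, Exception):
            errors.append({"item": item, "error": str(outcome)})
        else:
            results.append(outcome)
    return {
        "processed": len(results),
        "failed": len(errors),
        "results": results,
        "errors": errors,
    }


summary = asyncio.run(process_items_concurrently([1, 2, 3, 4]))
```

This trades ordering of side effects for speed, so the sequential version remains the safer choice when downstream systems are rate limited.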

For multi-step workflows, log each step so users can follow progress:

async def create_user_with_license(context: ExecutionContext, email: str, sku: str):
    # Step 1: create the account
    logger.info("Creating user...")
    user = await create_user(email)
    # Step 2: assign the license to the new account
    logger.info("Assigning license...")
    await assign_license(user.id, sku)
    return {"user_id": user.id, "license": sku}