Skip to content

Use Decorators

Advanced decorator patterns and best practices

Bifrost uses decorators to define workflows:

  • @workflow - Registers executable workflow (all options optional)
  • @param - Adds advanced parameter features (only when needed)
  • @data_provider - Provides dynamic options for dropdowns

The simplest workflow needs just a decorator and docstring:

from bifrost import workflow
@workflow
async def send_email(recipient: str, subject: str, body: str = ""):
"""Send an email message."""
await email_service.send(recipient, subject, body)
return {"sent": True}

This automatically:

  • Sets name to "send_email" (from function name)
  • Sets description to "Send an email message." (from docstring)
  • Extracts parameters with types and defaults from signature

Run workflows automatically on a schedule:

@workflow(schedule="0 2 * * *") # Daily at 2 AM UTC
async def daily_cleanup():
"""Daily cleanup task."""
await cleanup_old_records()
return {"cleaned": True}

Cron Expressions:

  • 0 9 * * * - Daily at 9 AM
  • 0 */6 * * * - Every 6 hours
  • 0 9 * * 1 - Every Monday at 9 AM
  • 0 0 1 * * - First day of month at midnight

Expose workflows as HTTP endpoints for webhooks:

@workflow(
endpoint_enabled=True,
allowed_methods=["POST"],
public_endpoint=True # Skip authentication
)
async def process_webhook(payload: dict):
"""Process incoming webhook."""
# Available at: POST /api/endpoints/process_webhook
return {"status": "processed"}

endpoint_enabled (bool) : Expose at /api/endpoints/{name} (default: False)

allowed_methods (list) : HTTP methods allowed (default: ["POST"])

public_endpoint (bool) : Skip authentication for webhooks (default: False)

disable_global_key (bool) : Only workflow-specific keys work (default: False)

Automatically retry failed workflows:

@workflow(
retry_policy={
"max_attempts": 3,
"backoff_seconds": 5
}
)
async def api_call():
"""Call external API with retries."""
# Will retry up to 3 times with 5 second delay
pass

Execution mode is auto-selected based on endpoint_enabled:

# Auto-selects sync (endpoint needs immediate response)
@workflow(endpoint_enabled=True)
async def quick_lookup(id: str):
"""Quick lookup endpoint."""
pass
# Auto-selects async (background execution)
@workflow
async def bulk_import(csv_url: str):
"""Import data from CSV."""
pass
# Explicit override when needed
@workflow(
endpoint_enabled=True,
execution_mode="async" # Returns 202 + execution_id
)
async def long_webhook(payload: dict):
"""Webhook that triggers long process."""
pass

Parameters are automatically extracted from your function signature:

@workflow
async def create_user(
email: str, # Required string
name: str, # Required string
department: str = "IT", # Optional with default
active: bool = True, # Optional boolean
tags: list | None = None # Optional list
):
"""Create a new user."""
pass

Type mapping:

  • str → string input
  • int → integer input
  • float → decimal input
  • bool → checkbox
  • dict → JSON editor
  • list → array input

Labels are auto-generated from parameter names:

  • first_name → “First Name”
  • userEmail → “User Email”

Use @param only when you need features beyond type inference:

@data_provider(name="get_departments", description="List departments")
async def get_departments():
return [
{"label": "Engineering", "value": "eng"},
{"label": "Sales", "value": "sales"}
]
@workflow
@param("department", data_provider="get_departments")
async def assign_user(email: str, department: str):
"""Assign user to department."""
pass
@data_provider(name="get_users_by_dept", description="Users by department")
async def get_users_by_dept(department: str):
users = await fetch_users(department)
return [{"label": u.name, "value": u.id} for u in users]
@workflow
@param("department", data_provider="get_departments")
@param("user", data_provider="get_users_by_dept")
async def assign_task(department: str, user: str, task: str):
"""Assign task to user. User dropdown updates based on department."""
pass
@workflow
@param("password", validation={
"min_length": 12,
"max_length": 128,
"pattern": r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])[A-Za-z\d@$!%*?&]+$"
}, help_text="Min 12 chars with uppercase, lowercase, number, and special char")
async def set_password(user_id: str, password: str):
"""Set user password with validation."""
pass
@workflow
@param("message", type="textarea")
@param("contact", type="email")
async def send_message(contact: str, message: str):
"""Send a message."""
pass
@data_provider(
name="get_departments",
description="List departments",
cache_ttl_seconds=3600 # Cache for 1 hour
)
async def get_departments():
# Expensive query, cache results
return await fetch_departments()
@data_provider(name="get_licenses", description="Available licenses")
async def get_licenses():
return [
{
"label": "Microsoft 365 E3",
"value": "SPE_E3",
"metadata": {
"group": "Microsoft",
"available": 50
}
},
{
"label": "Microsoft 365 E5",
"value": "SPE_E5",
"metadata": {
"group": "Microsoft",
"available": 10
}
}
]
  • Use Descriptive Function Names: create_user_in_m365 not proc1
  • Write Clear Docstrings: They become the workflow description
  • Use Type Hints: Required for parameter extraction
  • Set Appropriate Timeouts: Match expected execution time (default: 1800s)
  • Cache Wisely: Balance freshness vs. performance
  • Use @param Sparingly: Only for data providers, validation, or help text

Wrong

@workflow
async def example():
pass # No description in UI!

Correct

@workflow
async def example():
"""Process example data."""
pass

Wrong

@workflow
async def example(name, count): # Parameters won't be extracted!
"""Example workflow."""
pass

Correct

@workflow
async def example(name: str, count: int = 1):
"""Example workflow."""
pass

Wrong

@workflow(execution_mode="sync")
async def bulk_import(data: list): # Will timeout
"""Import thousands of records."""
pass

Correct

@workflow # Auto-selects async
async def bulk_import(data: list):
"""Import thousands of records."""
pass