Use Decorators

Advanced decorator patterns and best practices

Bifrost uses three decorators to define workflows:

  • @workflow - Registers an executable workflow
  • @param - Defines input parameters
  • @data_provider - Provides dynamic options

Run workflows automatically on a schedule:

@workflow(
    name="daily_cleanup",
    description="Daily cleanup task",
    schedule="0 2 * * *"  # Daily at 2 AM UTC
)
async def daily_cleanup(context):
    # Runs automatically
    pass

Cron Expressions:

  • 0 9 * * * - Daily at 9 AM
  • 0 */6 * * * - Every 6 hours
  • 0 9 * * 1 - Every Monday at 9 AM
  • 0 0 1 * * - First day of month at midnight
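
The expressions above all have five space-separated fields. As a reading aid, here is a minimal sketch that labels each field, assuming the standard five-field cron layout (minute, hour, day of month, month, day of week). The helper `split_cron` is hypothetical and not part of Bifrost.

```python
# Field names for a standard five-field cron expression, in order.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression: str) -> dict[str, str]:
    """Map each field of a five-field cron expression to its name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


# "Every Monday at 9 AM": minute 0, hour 9, day_of_week 1.
print(split_cron("0 9 * * 1"))
```

Reading an expression field by field this way makes it easier to spot a swapped minute and hour before a schedule goes live.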

Expose workflows as HTTP endpoints for webhooks:

@workflow(
    name="process_webhook",
    description="Process incoming webhook",
    endpoint_enabled=True,
    allowed_methods=["POST"],
    public_endpoint=True  # Skip authentication
)
async def process_webhook(context, payload: dict):
    # Available at: POST /api/endpoints/process_webhook
    return {"status": "processed"}

endpoint_enabled (bool) : Expose at /api/endpoints/{name} (default: False)

allowed_methods (list) : HTTP methods allowed (default: ["POST"])

public_endpoint (bool) : Skip authentication for webhooks (default: False)

disable_global_key (bool) : Only workflow-specific keys work (default: False)
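
To illustrate the /api/endpoints/{name} route from the caller's side, the sketch below builds (but does not send) a POST request with Python's standard library. The host `bifrost.example.com` and the JSON body shape are assumptions for illustration. No authentication header is shown, which only fits a workflow declared with public_endpoint=True.

```python
import json
import urllib.request

# Hypothetical base URL; replace with your own Bifrost host.
BASE_URL = "https://bifrost.example.com"


def build_webhook_request(workflow_name: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request to a workflow endpoint."""
    return urllib.request.Request(
        url=f"{BASE_URL}/api/endpoints/{workflow_name}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_webhook_request("process_webhook", {"event": "ping"})
print(request.get_method(), request.full_url)
```

Passing the request to `urllib.request.urlopen` would send it. A method not listed in allowed_methods should be expected to be rejected.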

Automatically retry failed workflows:

@workflow(
    name="api_call",
    description="Call external API with retries",
    retry_policy={
        "max_attempts": 3,
        "backoff_seconds": 5
    }
)
async def api_call(context):
    # Will retry up to 3 times with 5 second delay
    pass
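
To make the policy's two settings concrete, here is a rough sketch of a retry loop with a fixed delay. It is not Bifrost's implementation: `run_with_retry` is a hypothetical helper, and it assumes that max_attempts counts the first call and that the delay between attempts is constant.

```python
import asyncio


async def run_with_retry(func, max_attempts: int, backoff_seconds: float):
    """Call `func` until it succeeds or `max_attempts` calls have failed."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await func()
        except Exception:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error.
            await asyncio.sleep(backoff_seconds)


calls = 0


async def flaky():
    """Fail twice, then succeed, to exercise the retry loop."""
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("temporary failure")
    return "ok"


# backoff_seconds=0 keeps the demonstration instant.
result = asyncio.run(run_with_retry(flaky, max_attempts=3, backoff_seconds=0))
print(result, calls)
```

Because a retried workflow runs its body more than once, any side effects inside it should be safe to repeat.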

Control how workflows execute:

# Sync: Returns result immediately (best for endpoints)
@workflow(
    name="quick_lookup",
    description="Quick lookup",
    execution_mode="sync",
    endpoint_enabled=True
)
async def quick_lookup(context):
    pass

# Async: Queued for background execution (best for forms)
@workflow(
    name="bulk_import",
    description="Bulk import",
    execution_mode="async"
)
async def bulk_import(context):
    pass

# Auto: Defaults based on endpoint_enabled
@workflow(
    name="auto_mode",
    description="Mode chosen automatically"
    # execution_mode auto-selects:
    # - "sync" if endpoint_enabled=True
    # - "async" if endpoint_enabled=False
)
async def auto_mode(context):
    pass
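
The auto-selection rule can be written as a small function. This is only a sketch of the rule as described here; `resolve_execution_mode` is a hypothetical name, not a Bifrost function.

```python
from typing import Optional


def resolve_execution_mode(execution_mode: Optional[str], endpoint_enabled: bool) -> str:
    """Return the explicit mode if given, else pick one from endpoint_enabled."""
    if execution_mode is not None:
        return execution_mode
    return "sync" if endpoint_enabled else "async"


print(resolve_execution_mode(None, endpoint_enabled=True))
print(resolve_execution_mode(None, endpoint_enabled=False))
print(resolve_execution_mode("async", endpoint_enabled=True))
```

The last call shows that an explicit execution_mode is taken to win over the endpoint-based default.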

Use data providers with parameters:

@data_provider(name="get_users_by_dept")
@param("department", type="string", required=True)
async def get_users_by_dept(context, department: str):
    # Fetch users for specific department
    users = await fetch_users(department)
    return [{"label": u.name, "value": u.id} for u in users]

@workflow(name="assign_to_user")
@param("department", type="string", data_provider="get_departments")
@param("user", type="string", data_provider="get_users_by_dept")
async def assign_to_user(context, department: str, user: str):
    # User dropdown populates based on selected department
    pass

Combine multiple validation rules:

@param(
    "password",
    type="string",
    required=True,
    validation={
        "min_length": 12,
        "max_length": 128,
        "pattern": r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])[A-Za-z\d@$!%*?&]+$"
    },
    help_text="Min 12 chars with uppercase, lowercase, number, and special char"
)
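
To see which inputs these rules accept, the sketch below applies the same length limits and pattern with Python's `re` module. `check_password` is a hypothetical helper, and it assumes the pattern must match the whole value. Bifrost's own validator may behave differently.

```python
import re

# The same rules as in the @param example.
PASSWORD_RULES = {
    "min_length": 12,
    "max_length": 128,
    "pattern": r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])[A-Za-z\d@$!%*?&]+$",
}


def check_password(value: str, rules: dict = PASSWORD_RULES) -> bool:
    """Return True if `value` satisfies the length limits and the pattern."""
    if not rules["min_length"] <= len(value) <= rules["max_length"]:
        return False
    return re.fullmatch(rules["pattern"], value) is not None


print(check_password("Str0ng!Passw0rd"))   # meets every rule
print(check_password("Short1!a"))          # too short
print(check_password("alllowercase1234!")) # no uppercase letter
print(check_password("Valid#Passw0rd!"))   # '#' is outside the allowed set
```

The final case shows a detail the help text does not mention: the pattern only permits the special characters @$!%*?&, so any other symbol is rejected.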

Set defaults based on context:

@workflow(name="create_ticket", description="Create a ticket")
@param(
    "assignee",
    type="string",
    default_value=None  # Set in workflow logic
)
async def create_ticket(context, assignee: str | None = None):
    # Default to current user if not specified
    if not assignee:
        assignee = context.user_id
    pass

Control cache behavior:

@data_provider(
    name="get_departments",
    cache_ttl_seconds=3600  # Cache for 1 hour
)
async def get_departments(context):
    # Expensive query, cache results
    return await fetch_departments()
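
As a mental model for cache_ttl_seconds, here is a minimal time-to-live cache. It is a sketch, not Bifrost's cache. `TTLCache` and the fake clock are hypothetical, and the real cache may key entries differently (for example per organization or per parameter values).

```python
import time


class TTLCache:
    """Keep one value per key until its time-to-live expires."""

    def __init__(self, ttl_seconds: float, clock=time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._entries = {}  # key -> (expires_at, value)

    def get_or_compute(self, key, compute):
        now = self.clock()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]  # Still fresh: reuse the cached value.
        value = compute()
        self._entries[key] = (now + self.ttl_seconds, value)
        return value


# A fake clock makes expiry observable without sleeping.
fake_now = [0.0]
cache = TTLCache(ttl_seconds=3600, clock=lambda: fake_now[0])
query_count = 0


def fetch_departments():
    """Stand-in for the expensive query; counts how often it runs."""
    global query_count
    query_count += 1
    return ["IT", "HR"]


cache.get_or_compute("get_departments", fetch_departments)
cache.get_or_compute("get_departments", fetch_departments)  # served from cache
fake_now[0] = 3601.0  # just past the one-hour TTL
cache.get_or_compute("get_departments", fetch_departments)  # recomputed
print(query_count)
```

The trade-off is visible here: a longer TTL means fewer queries, but users may see options that are up to one TTL out of date.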

Create options with metadata for grouping:

@data_provider(name="get_apps_by_category")
async def get_apps_by_category(context):
    return [
        {
            "label": "Microsoft 365",
            "value": "m365",
            "metadata": {
                "group": "Productivity",
                "icon": "microsoft"
            }
        },
        {
            "label": "Slack",
            "value": "slack",
            "metadata": {
                "group": "Communication",
                "icon": "slack"
            }
        }
    ]
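
To show what a consumer could do with the `group` metadata, the sketch below buckets option labels by group. `group_options` and the "Other" fallback are hypothetical. How the Bifrost form actually renders groups is not specified here.

```python
from collections import defaultdict


def group_options(options: list) -> dict:
    """Bucket option labels by their metadata 'group' value."""
    groups = defaultdict(list)
    for option in options:
        # Options without metadata fall into a catch-all bucket (an assumption).
        group = option.get("metadata", {}).get("group", "Other")
        groups[group].append(option["label"])
    return dict(groups)


options = [
    {"label": "Microsoft 365", "value": "m365",
     "metadata": {"group": "Productivity", "icon": "microsoft"}},
    {"label": "Slack", "value": "slack",
     "metadata": {"group": "Communication", "icon": "slack"}},
]
print(group_options(options))
```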

Filter options based on user/org:

@data_provider(name="get_available_licenses")
async def get_available_licenses(context):
    # Only show licenses available to this org
    licenses = await fetch_org_licenses(context.org_id)
    # Filter by what user can assign
    if not context.is_platform_admin:
        licenses = [lic for lic in licenses if not lic.requires_admin]
    return [
        {"label": lic.name, "value": lic.sku}
        for lic in licenses
    ]
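
The filtering step can be tried in isolation with stand-in data. In this sketch the `License` and `Context` dataclasses are hypothetical substitutes for the real objects and model only the attributes used above.

```python
from dataclasses import dataclass


@dataclass
class License:
    name: str
    sku: str
    requires_admin: bool


@dataclass
class Context:
    org_id: str
    is_platform_admin: bool


def to_options(licenses: list, context: Context) -> list:
    """Hide admin-only licenses from non-admins, then format as options."""
    visible = [
        lic for lic in licenses
        if context.is_platform_admin or not lic.requires_admin
    ]
    return [{"label": lic.name, "value": lic.sku} for lic in visible]


licenses = [
    License("Basic", "basic-sku", requires_admin=False),
    License("Enterprise", "ent-sku", requires_admin=True),
]
print(to_options(licenses, Context("org-1", is_platform_admin=False)))
print(to_options(licenses, Context("org-1", is_platform_admin=True)))
```

Keeping the filter in a plain function like this makes the provider's output easy to unit test without a live context.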

Parameters are applied bottom-to-top, so decorators appear in reverse order:

@workflow(name="example")
@param("third", type="string")   # Shows FIRST in form
@param("second", type="string")  # Shows SECOND
@param("first", type="string")   # Shows LAST
async def example(context, first, second, third):
    pass
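
The bottom-to-top rule is standard Python behavior and can be observed with a stand-in decorator. The `param` below is a hypothetical recorder, not Bifrost's @param, and it shows only the order in which the decorators are applied.

```python
applied = []  # Names in the order their decorators are applied.


def param(name):
    """Hypothetical stand-in that records when it is applied."""
    def decorator(func):
        applied.append(name)
        return func
    return decorator


@param("third")
@param("second")
@param("first")
def example(first, second, third):
    pass


# The decorator closest to the function is applied first.
print(applied)
```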

Best practices:

  • Use Descriptive Names: create_user_in_m365 not proc1
  • Set Appropriate Timeouts: Match expected execution time (default: 1800s)
  • Cache Wisely: Balance freshness vs. performance (default: 300s)
  • Validate Early: Use @param validation before workflow runs
  • Document Parameters: Use help_text for user guidance
  • Test Data Providers: Verify correct format with label and value
  • Use Retry Policies: For unreliable external APIs
  • Schedule in UTC: Avoid timezone confusion

Wrong

@workflow() # Missing name and description

Correct

@workflow(name="example", description="Example workflow")

Wrong

@param("email", type="text") # No such type

Correct

@param("email", type="email") # Valid: string, int, bool, float, json, list, email

Wrong

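@workflow(
    name="bulk_import",
    description="Import records in bulk",
    execution_mode="sync",
    timeout_seconds=1800
)
async def bulk_import(context):  # Blocks the caller for up to 30 minutes and may time out
    pass

Correct

@workflow(
    name="bulk_import",
    description="Import records in bulk",
    execution_mode="async",
    timeout_seconds=1800
)
async def bulk_import(context):  # Runs in background
    pass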