Skip to content

Workflows

Understanding workflows in Bifrost

Workflows are Python functions decorators to describe functionality to the platform. They:

  • Define and accept parameters that can be exposed forms or API calls
  • Execute business logic
  • Return results
  • Are discoverable without manual registration
from bifrost import workflow
@workflow
async def create_user(email: str, name: str):
"""Create a new user in the system."""
# Business logic here
return {"user_id": "123"}

The decorator automatically infers:

  • name: from the function name (create_user)
  • description: from the docstring
  • parameters: from the function signature (type hints determine field types)

Workflow lifecycle:

  1. Developer writes Python function with @workflow decorator
  2. Platform discovers it on startup and during execution
  3. Workflow appears in UI automatically
  4. Admins will usually create forms and tie them to workflows for execution
  5. Users execute via forms
  6. Results logged and displayed in realtime

Workflows auto-register - no manual configuration needed:

Workflow files are stored in S3 and automatically synced to the /workspace directory in containers.

/workspace
├── user_management.py
├── license_automation.py
└── reporting.py

Platform scans these files, finds @workflow decorators, registers them.

Parameters are automatically extracted from your function signature:

@workflow
async def create_user(
email: str, # Required string
name: str, # Required string
department: str = "IT", # Optional with default
active: bool = True # Optional boolean
):
"""Create a new user."""
pass

To use a data provider for a parameter, configure it in the form builder by selecting the data provider for the field. The form will populate the dropdown dynamically while the workflow receives the selected value:

@workflow
async def create_user(email: str, department: str):
"""Create a new user."""
pass

Every workflow receives ExecutionContext:

  • Current user info
  • Organization context

The choice between Async or Sync is something you can avoid thinking about for the most part.

If a workflow is enabled as an endpoint, it will be synchronous by default. This is because you almost always are using this to get an immediate result.

If the workflow is not an endpoint, it will be asynchronous by default. This is because most other interactions will be with forms which will update in realtime as the workflow runs.

Synchronous:

  • Runs immediately
  • Returns result directly
  • Best for quick operations (< 10s)
@workflow(
name="export_ninjaone_devices_csv",
execution_mode="sync"
)

Asynchronous (default):

  • Queued for background execution
  • Returns execution ID immediately
  • Best for long-running tasks (> 30s)
@workflow(
name="export_ninjaone_devices_csv",
execution_mode="async"
)

Scheduled:

  • Runs on cron schedule
  • Best for recurring tasks
@workflow(
name="export_ninjaone_devices_csv",
schedule="0 9 * * *"
)

Organize workflows by purpose, defined completely by the developer.

Workflows are organization-aware:

  • Access organization-specific data via context.org_id or context.organization
  • Use organization-scoped secrets and config
@workflow(name="get_org_data")
async def get_org_data(context):
# Automatically scoped to current org
data = await db.query(
"SELECT * FROM data WHERE org_id = ?",
context.org_id
)
return {"data": data}

Workflow: Backend Python function (business logic) Form: Frontend UI (user input collection)

Workflows can be:

  • Executed directly via API
  • Triggered by forms
  • Scheduled to run automatically
  • Called from other workflows

Forms always execute workflows, but workflows don’t require forms.

Workflows are stateless but:

  • Log progress with Python logging
  • Return results
  • Use external storage for state
  • Capture runtime varialbes for troubleshooting purposes
import logging
logger = logging.getLogger(__name__)
@workflow(name="process_items")
async def process_items(context, items: list):
logger.info(f"Processing {len(items)} items")
for i, item in enumerate(items):
logger.info(f"Processing item {i+1}/{len(items)}")
# Process item
return {"processed": len(items)}

Logs appear in execution detail view.

Workflows return error states (don’t raise exceptions):

@workflow(name="example")
async def example(context, param: str):
try:
result = await do_work(param)
return {"success": True, "result": result}
except Exception as e:
return {"success": False, "error": str(e)}

This allows:

  • Partial success tracking
  • User-friendly error messages
  • Execution history

Workflows inherit user permissions:

  • Run as executing user
  • Access org-scoped resources only
  • Permission checks can be done via the context
@workflow(name="admin_task")
async def admin_task():
# Check permissions in the workflow
if not context.is_platform_admin:
return {"error": "Admin privileges required"}
# Perform admin operation
pass