bifrost Module Reference
Complete API reference for decorators, context, and SDK modules
Import Structure
```python
# Decorators and context
from bifrost import workflow, param, data_provider, ExecutionContext

# SDK modules (all async except files)
from bifrost import config, secrets, oauth, files

# Types (optional)
from bifrost import Organization
```

Decorators

@workflow

Registers an async function as an executable workflow.

```python
@workflow(name: str, description: str, **options)
```

```python
@workflow(
    name="create_user",
    description="Create new user"
)
async def create_user(context: ExecutionContext, email: str):
    return {"user_id": "123"}
```

```python
@workflow(
    name="create_user",
    description="Create new user",
    category="User Management",
    timeout_seconds=60,
    schedule="0 9 * * *"  # Daily at 9 AM
)
async def create_user(context: ExecutionContext, email: str):
    return {"user_id": "123"}
```

Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | Required | Unique identifier (snake_case) |
| description | str | Required | User-facing description |
| category | str | "General" | Workflow category |
| tags | list[str] | [] | Tags for filtering |
| execution_mode | str | "sync" | "sync" or "async" |
| timeout_seconds | int | 300 | Max execution time in seconds |
| schedule | str | None | Cron expression |
| endpoint_enabled | bool | False | Expose as HTTP endpoint |
| allowed_methods | list[str] | ["POST"] | Allowed HTTP methods |
| public_endpoint | bool | False | Skip authentication |
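The `schedule` option takes a cron expression, and the example above uses `"0 9 * * *"` for a daily 9 AM run. As a rough reading aid, the sketch below splits such an expression into named fields. It assumes the standard five-field layout (minute, hour, day of month, month, day of week); `split_cron` is a hypothetical helper and not part of bifrost, and whether bifrost accepts extended cron syntax is not stated here.

```python
# Field names for a standard five-field cron expression (assumed layout).
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression: str) -> dict[str, str]:
    """Split a five-field cron expression into named fields."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


fields = split_cron("0 9 * * *")
print(fields["minute"], fields["hour"])  # prints: 0 9
```

Read this way, minute `0` and hour `9` with wildcards elsewhere means "every day at 09:00".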
@param
Defines a workflow parameter.

```python
@param(name: str, type: str, **options)
```

Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | Required | Parameter name (must match function arg) |
| type | str | Required | Parameter type (see types below) |
| label | str | Title-cased name | Display label |
| required | bool | False | Whether the parameter is required |
| default_value | Any | None | Default value |
| help_text | str | None | Help tooltip |
| validation | dict | None | Validation rules |
| data_provider | str | None | Data provider name |
Available Types:
- `string`: Text input
- `email`: Email with validation
- `textarea`: Multi-line text
- `number`: Numeric input
- `int`: Integer input
- `float`: Decimal number
- `boolean`: Checkbox
- `select`: Single-select dropdown
- `multi_select`: Multiple-select dropdown
- `date`: Date picker (YYYY-MM-DD)
- `datetime`: Date + time picker
- `json`: JSON data
- `list`: Array/list
- `file`: File upload
Validation Rules:
| Rule | Applies To | Description |
|---|---|---|
| min_length | string | Minimum length |
| max_length | string | Maximum length |
| pattern | string | Regex pattern |
| min | number | Minimum value |
| max | number | Maximum value |
| enum | string | Allowed values |
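To make the rule semantics in the table concrete, here is a minimal sketch of a checker that applies them to a single value. It is an illustration only: `check_value` is a hypothetical helper, bounds are assumed to be inclusive, and `pattern` is assumed to use Python `re.search` semantics. How bifrost itself enforces these rules is not documented on this page.

```python
import re
from typing import Any


def check_value(value: Any, rules: dict) -> list[str]:
    """Return the names of the rules that `value` violates (empty if valid)."""
    failed = []
    # String rules (assumed inclusive bounds)
    if "min_length" in rules and len(value) < rules["min_length"]:
        failed.append("min_length")
    if "max_length" in rules and len(value) > rules["max_length"]:
        failed.append("max_length")
    # Assumption: the pattern may match anywhere unless it is anchored
    if "pattern" in rules and re.search(rules["pattern"], value) is None:
        failed.append("pattern")
    # Numeric rules (assumed inclusive bounds)
    if "min" in rules and value < rules["min"]:
        failed.append("min")
    if "max" in rules and value > rules["max"]:
        failed.append("max")
    if "enum" in rules and value not in rules["enum"]:
        failed.append("enum")
    return failed


print(check_value("ab", {"min_length": 3, "enum": ["abc", "abd"]}))
# prints: ['min_length', 'enum']
print(check_value(42, {"min": 1, "max": 100}))
# prints: []
```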
Example:
```python
@param(
    "email",
    type="email",
    required=True,
    help_text="User's email address",
    validation={"pattern": r"^[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}$"}
)
```

@data_provider

Registers a function that provides dynamic options.

```python
@data_provider(name: str, description: str, **options)
```

Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | Required | Unique identifier |
| description | str | Required | What it provides |
| category | str | "General" | Category |
| cache_ttl_seconds | int | 300 | Cache duration in seconds (0 = no cache) |
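The effect of `cache_ttl_seconds` can be pictured with a small time-based cache. This is a sketch of the general technique, not bifrost's actual caching code: `TTLCache` and its injectable `clock` are hypothetical, and the only behavior taken from the table is that a TTL of 0 disables caching.

```python
import time
from typing import Any, Callable, Optional


class TTLCache:
    """Tiny single-value cache; a TTL of 0 disables caching."""

    def __init__(self, ttl_seconds: int, clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._value: Any = None
        self._stored_at: Optional[float] = None

    def get_or_compute(self, compute: Callable[[], Any]) -> Any:
        now = self.clock()
        fresh = (
            self.ttl_seconds > 0
            and self._stored_at is not None
            and now - self._stored_at < self.ttl_seconds
        )
        if not fresh:
            # Missing, expired, or caching disabled: recompute and store
            self._value = compute()
            self._stored_at = now
        return self._value


calls = []
now = [0.0]  # fake clock so the example does not need to sleep


def load_departments():
    calls.append(1)
    return [{"label": "Engineering", "value": "eng"}]


cache = TTLCache(ttl_seconds=600, clock=lambda: now[0])
cache.get_or_compute(load_departments)  # computes
now[0] = 599.0
cache.get_or_compute(load_departments)  # served from cache
now[0] = 600.0
cache.get_or_compute(load_departments)  # expired, computes again
print(len(calls))  # prints: 2
```

A longer TTL reduces calls to the underlying source at the cost of staler dropdown options.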
Return Format:

```python
[
    {
        "label": "Display Text",
        "value": "actual_value",
        "metadata": {}  # Optional
    }
]
```

Example:

```python
@data_provider(
    name="get_departments",
    description="List departments",
    cache_ttl_seconds=600
)
async def get_departments(context: ExecutionContext):
    return [
        {"label": "Engineering", "value": "eng"},
        {"label": "Sales", "value": "sales"}
    ]
```

ExecutionContext

Access to organization, user, and execution metadata.
Properties:
| Property | Type | Description |
|---|---|---|
| user_id | str | Current user ID |
| email | str | User email |
| name | str | User display name |
| org_id | str \| None | Organization ID |
| org_name | str \| None | Organization name |
| organization | Organization \| None | Full org object |
| scope | str | "GLOBAL" or org ID |
| execution_id | str | Unique execution ID |
| is_platform_admin | bool | Is platform admin |
| is_function_key | bool | Called via API key |
| is_global_scope | bool | Executing in global scope |
Example:
```python
async def my_workflow(context: ExecutionContext):
    org_id = context.org_id
    user_email = context.email
    is_admin = context.is_platform_admin
```

See ExecutionContext API for complete reference.
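Because a workflow body only reads attributes from the context, its logic can be exercised locally with a stand-in object. The sketch below is one way to do that under stated assumptions: `FakeContext` is a hypothetical test helper that mirrors a few of the properties listed above, and `describe_caller` is a plain async function without the `@workflow` decorator, since this page does not say whether decorated functions stay directly callable.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeContext:
    """Hypothetical stand-in exposing a few ExecutionContext properties."""
    user_id: str = "user-1"
    email: str = "dev@example.com"
    name: str = "Dev User"
    org_id: Optional[str] = None
    scope: str = "GLOBAL"
    is_platform_admin: bool = False


async def describe_caller(context) -> dict:
    """Plain async function shaped like a workflow body (no decorator)."""
    return {
        "email": context.email,
        "org": context.org_id or "global",
        "admin": context.is_platform_admin,
    }


result = asyncio.run(describe_caller(FakeContext(org_id="org-42", scope="org-42")))
print(result)
# prints: {'email': 'dev@example.com', 'org': 'org-42', 'admin': False}
```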
SDK Modules
config

Configuration management with org scoping.
Functions:
| Function | Parameters | Returns | Description |
|---|---|---|---|
| get(key, default=None) | key: str, default: Any | Any | Get config value |
| set(key, value) | key: str, value: Any | None | Set config value |
| list() | - | dict[str, Any] | List all configs |
| delete(key) | key: str | bool | Delete config (returns success) |
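The four functions in the table behave like an async key-value store. As an illustration of those signatures, here is a small in-memory test double. `InMemoryConfig` is hypothetical and only mirrors the documented parameters and return types; it does not model org scoping or persistence.

```python
import asyncio
from typing import Any


class InMemoryConfig:
    """Hypothetical test double with the same four async functions."""

    def __init__(self) -> None:
        self._values: dict[str, Any] = {}

    async def get(self, key: str, default: Any = None) -> Any:
        return self._values.get(key, default)

    async def set(self, key: str, value: Any) -> None:
        self._values[key] = value

    async def list(self) -> dict[str, Any]:
        # Return a copy so callers cannot mutate the store
        return dict(self._values)

    async def delete(self, key: str) -> bool:
        # True when the key existed and was removed
        if key in self._values:
            del self._values[key]
            return True
        return False


async def demo() -> tuple:
    config = InMemoryConfig()
    await config.set("api_url", "https://api.example.com")
    url = await config.get("api_url")
    missing = await config.get("timeout", default=30)
    deleted = await config.delete("api_url")
    return url, missing, deleted, await config.list()


print(asyncio.run(demo()))
# prints: ('https://api.example.com', 30, True, {})
```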
```python
from bifrost import config

# Get with default
api_url = await config.get("api_url", default="https://api.example.com")

# Check if exists first
if await config.get("api_key"):
    api_key = await config.get("api_key")
```

```python
from bifrost import config

# Set configuration value
await config.set("api_url", "https://new-api.example.com")
```

```python
from bifrost import config

# Get all configurations for current org
all_configs = await config.list()
for key, value in all_configs.items():
    print(f"{key}: {value}")
```

secrets

Azure Key Vault secret management.
Functions:
| Function | Parameters | Returns | Description |
|---|---|---|---|
| get(key) | key: str | str \| None | Get secret value |
| set(key, value) | key: str, value: str | None | Store secret |
| list() | - | list[str] | List secret keys (not values) |
| delete(key) | key: str | bool | Delete secret (returns success) |
Scoping: Secrets are automatically scoped as `{org_id}--{key}` in Key Vault, so workflow code passes only the bare key.
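The naming convention can be sketched as a pair of helpers. Both functions are hypothetical illustrations of the `{org_id}--{key}` format only: the SDK applies the scoping for you, and how global-scope secrets are named, or how characters Key Vault disallows are handled, is not described here.

```python
def scoped_secret_name(org_id: str, key: str) -> str:
    """Build the Key Vault name using the {org_id}--{key} convention."""
    return f"{org_id}--{key}"


def split_secret_name(name: str) -> tuple[str, str]:
    """Recover (org_id, key); assumes org_id itself contains no '--'."""
    org_id, separator, key = name.partition("--")
    if not separator:
        raise ValueError(f"not a scoped secret name: {name!r}")
    return org_id, key


name = scoped_secret_name("org-42", "api-key")
print(name)                     # prints: org-42--api-key
print(split_secret_name(name))  # prints: ('org-42', 'api-key')
```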
```python
from bifrost import secrets

# Get secret from Key Vault
api_key = await secrets.get("api_key")

if api_key:
    # Use in API call
    headers = {"Authorization": f"Bearer {api_key}"}
```

```python
from bifrost import secrets

# Store secret in Key Vault
await secrets.set("api_key", "new_secret_value")
```

```python
from bifrost import secrets

# List secret keys (values not returned for security)
secret_keys = await secrets.list()
for key in secret_keys:
    print(f"Secret exists: {key}")
```

oauth

OAuth token management.
Functions:
| Function | Parameters | Returns | Description |
|---|---|---|---|
| get_token(provider) | provider: str | dict \| None | Get OAuth token data |
| set_token(provider, token_data) | provider: str, token_data: dict | None | Store token |
| list_providers() | - | list[str] | List provider names |
| delete_token(provider) | provider: str | bool | Delete token (returns success) |
| refresh_token(provider) | provider: str | dict | Refresh and return new token |
Token Data Structure:
The `get_token()` method returns a dictionary with:

```python
{
    "access_token": str,   # Current access token
    "refresh_token": str,  # Refresh token (if available)
    "expires_at": int,     # Unix timestamp
    "token_type": str,     # Usually "Bearer"
    "scope": str           # Granted scopes
}
```

```python
from bifrost import oauth
import aiohttp

# Get OAuth token
token_data = await oauth.get_token("microsoft")
if not token_data:
    raise ValueError("Microsoft OAuth not configured")

# Use in API call
url = "https://graph.microsoft.com/v1.0/me"
headers = {"Authorization": f"Bearer {token_data['access_token']}"}

async with aiohttp.ClientSession() as session:
    async with session.get(url, headers=headers) as resp:
        return await resp.json()
```

```python
import time

from bifrost import oauth

# Refresh token if expired
token_data = await oauth.get_token("microsoft")

if token_data and token_data["expires_at"] < time.time():
    # Token expired, refresh it
    token_data = await oauth.refresh_token("microsoft")
```
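The expiry check above compares `expires_at` with the current time, so a token that expires a few seconds later can still fail mid-request. One common refinement is to refresh slightly early. The helper below is a sketch of that idea: `needs_refresh` and its 60-second default margin are assumptions for illustration, not part of the bifrost API.

```python
import time
from typing import Optional


def needs_refresh(token_data: dict, skew_seconds: int = 60,
                  now: Optional[float] = None) -> bool:
    """True when the token has expired or will expire within `skew_seconds`."""
    # `now` is injectable so the check can be tested without waiting
    current = time.time() if now is None else now
    return token_data["expires_at"] <= current + skew_seconds


token = {"access_token": "abc", "expires_at": 1_700_000_000}
print(needs_refresh(token, now=1_699_999_000))  # prints: False
print(needs_refresh(token, now=1_699_999_950))  # prints: True
```

In a workflow, the result would decide whether to call `oauth.refresh_token(provider)` before building the request headers.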
```python
from bifrost import oauth

# List all configured OAuth providers
providers = await oauth.list_providers()
for provider in providers:
    print(f"Provider configured: {provider}")
```

files

File operations (synchronous).
Functions:
| Function | Parameters | Returns | Description |
|---|---|---|---|
| read(path) | path: str | str | Read file as text |
| read_bytes(path) | path: str | bytes | Read file as bytes |
| write(path, content) | path: str, content: str | None | Write text file |
| write_bytes(path, content) | path: str, content: bytes | None | Write binary file |
| list(directory) | directory: str | list[str] | List directory contents |
| delete(path) | path: str | None | Delete file |
| exists(path) | path: str | bool | Check if file exists |
Location Parameter:
All functions accept an optional `location` parameter:

- `"workspace"` (default): Persistent workspace files
- `"temp"`: Temporary files cleared after execution
```python
from bifrost import files

# Read text file (no await!)
content = files.read("data/config.json")

# Read binary file
image_bytes = files.read_bytes("images/logo.png")
```

```python
from bifrost import files

# Write CSV file
files.write("data/output.csv", "id,name\n1,Alice\n")

# Write binary file
files.write_bytes("images/chart.png", image_data)
```

```python
from bifrost import files

# List directory
items = files.list("data/")
for item in items:
    print(f"Found: {item}")

# Check if exists (named config_text to avoid shadowing the config module)
if files.exists("data/config.json"):
    config_text = files.read("data/config.json")
```

```python
from bifrost import files

# Write to temp location
files.write("temp_report.html", html_content, location="temp")

# Temp files are automatically cleaned up after execution
```

Complete Example
```python
from bifrost import (
    workflow, param, data_provider, ExecutionContext,
    config, secrets, oauth, files
)
import aiohttp
import json  # needed for json.dumps below
import logging

logger = logging.getLogger(__name__)

# Data provider
@data_provider(
    name="get_departments",
    description="List departments",
    cache_ttl_seconds=600
)
async def get_departments(context: ExecutionContext):
    return [
        {"label": "Engineering", "value": "eng"},
        {"label": "Sales", "value": "sales"}
    ]

# Workflow
@workflow(
    name="create_user",
    description="Create user in system",
    category="User Management",
    timeout_seconds=60
)
@param("email", type="email", required=True)
@param("name", type="string", required=True)
@param("department", type="string", data_provider="get_departments")
async def create_user(
    context: ExecutionContext,
    email: str,
    name: str,
    department: str
):
    """Create a new user with department assignment."""

    logger.info("Creating user", extra={
        "email": email,
        "department": department
    })

    try:
        # Get config and secrets (both async!)
        api_url = await config.get("api_url")
        api_key = await secrets.get("api_key")

        # Call external API
        headers = {"Authorization": f"Bearer {api_key}"}
        payload = {"email": email, "name": name, "department": department}

        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{api_url}/users",
                headers=headers,
                json=payload
            ) as resp:
                if resp.status == 201:
                    user = await resp.json()

                    # Write to file (synchronous!)
                    files.write(
                        f"users/{user['id']}.json",
                        json.dumps(user, indent=2)
                    )

                    logger.info(f"User created: {user['id']}")
                    return {"success": True, "user_id": user["id"]}
                else:
                    error = await resp.text()
                    logger.error(f"API error: {error}")
                    return {"success": False, "error": error}

    except Exception as e:
        logger.error(f"Failed: {str(e)}", exc_info=True)
        return {"success": False, "error": str(e)}
```

Type Definitions
Organization

```python
@dataclass
class Organization:
    id: str
    name: str
    is_active: bool
```

ExecutionStatus:

- `PENDING`: Queued for execution
- `RUNNING`: Currently executing
- `SUCCESS`: Completed successfully
- `FAILED`: Failed with error
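For code that reacts to these states, a local enum can make checks less error-prone than comparing raw strings. The following is a sketch only: the class below is a local model of the four documented values, the string values are assumed to equal the names, and `is_finished` is a hypothetical helper. This page does not show how bifrost itself defines `ExecutionStatus`.

```python
from enum import Enum


class ExecutionStatus(str, Enum):
    """Local model of the four documented states (values are assumed)."""
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"


# States after which an execution will not change again
TERMINAL = {ExecutionStatus.SUCCESS, ExecutionStatus.FAILED}


def is_finished(status: ExecutionStatus) -> bool:
    return status in TERMINAL


print(is_finished(ExecutionStatus.RUNNING))    # prints: False
print(is_finished(ExecutionStatus("FAILED")))  # prints: True
```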
Best Practices
- Type Hints: Always use the `ExecutionContext` type hint
- Async/Await: Remember that `config`, `secrets`, and `oauth` are async
- Files Are Sync: No `await` for the `files` module
- Logging: Use `logging.getLogger(__name__)`
- Error Handling: Return error states, log with context
- Secrets: Never log or return secret values
- Caching: Set appropriate cache TTL for data providers
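The "never log secret values" rule is easiest to follow when request headers are redacted before they reach a log call. A minimal sketch, assuming a hypothetical `redact_headers` helper and an illustrative set of sensitive header names:

```python
# Header names treated as sensitive (illustrative, extend as needed)
SENSITIVE_HEADERS = {"authorization", "x-api-key"}


def redact_headers(headers: dict[str, str]) -> dict[str, str]:
    """Return a copy of `headers` that is safe to pass to a logger."""
    return {
        name: "[REDACTED]" if name.lower() in SENSITIVE_HEADERS else value
        for name, value in headers.items()
    }


headers = {"Authorization": "Bearer s3cr3t-t0ken", "Accept": "application/json"}
print(redact_headers(headers))
# prints: {'Authorization': '[REDACTED]', 'Accept': 'application/json'}
```

The original dictionary is left untouched, so the real headers can still be sent with the request.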
Next Steps
- Context API Reference - Detailed ExecutionContext properties
- Decorators Reference - Advanced decorator usage
- Writing Workflows Guide - Practical guide