bifrost Module Reference
Complete API reference for decorators, context, and SDK modules
Import Structure
Section titled “Import Structure”# Decorators and context proxyfrom bifrost import workflow, data_provider, context
# SDK modules (all async)from bifrost import ai, knowledge, config, integrations, filesfrom bifrost import organizations, users, roles, forms, executions, workflows
# Types (optional, for type hints)from bifrost import ( ExecutionContext, Organization, Role, UserPublic, FormPublic, WorkflowMetadata, WorkflowExecution, IntegrationData, OAuthCredentials, ConfigData, AIResponse, AIStreamChunk, KnowledgeDocument, NamespaceInfo,)
# Enumsfrom bifrost import ExecutionStatus, ConfigType, FormFieldType
# Errorsfrom bifrost import UserError, WorkflowError, ValidationError, IntegrationError, ConfigurationErrorDecorators
Section titled “Decorators”@workflow
Section titled “@workflow”Registers an async function as an executable workflow. All parameters are optional - the decorator auto-infers name, description, and parameters from your function.
@workflow(**options)@workflowasync def create_user(email: str, name: str): """Create a new user in the system.""" return {"user_id": "123"}@workflow( category="User Management", timeout_seconds=60, schedule="0 9 * * *" # Daily at 9 AM)async def create_user(email: str, name: str): """Create a new user in the system.""" return {"user_id": "123"}Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | Auto | Auto-derived from function name |
description | str | Auto | Auto-derived from docstring |
category | str | ”General” | Workflow category |
tags | list[str] | [] | Tags for filtering |
execution_mode | str | Auto | Auto-selects based on endpoint_enabled |
timeout_seconds | int | 1800 | Max execution time (30 min default) |
schedule | str | None | Cron expression |
endpoint_enabled | bool | False | Expose as HTTP endpoint |
allowed_methods | list[str] | [“POST”] | Allowed HTTP methods |
public_endpoint | bool | False | Skip authentication |
@data_provider
Section titled “@data_provider”Registers a function that provides dynamic options.
@data_provider(name: str | None = None, description: str | None = None)Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | None | Unique identifier (auto-derived from function name if not provided) |
description | str | None | What it provides (auto-derived from docstring if not provided) |
Return Format:
[ { "label": "Display Text", "value": "actual_value", "metadata": {} # Optional }]Example:
from bifrost import data_provider, context
@data_provider( name="get_departments", description="List departments")async def get_departments(): """List available departments.""" # Access context via proxy if needed org_id = context.org_id return [ {"label": "Engineering", "value": "eng"}, {"label": "Sales", "value": "sales"} ]Context
Section titled “Context”Access organization, user, and execution metadata via the context proxy. No need to pass it as a parameter.
from bifrost import context
@workflowasync def my_workflow(name: str): # Access context properties directly org_id = context.org_id user_email = context.email is_admin = context.is_platform_adminProperties:
| Property | Type | Description |
|---|---|---|
user_id | str | Current user ID |
email | str | User email |
name | str | User display name |
org_id | str | None | Organization ID |
org_name | str | None | Organization name |
organization | Organization | None | Full org object |
scope | str | ”GLOBAL” or org ID |
execution_id | str | Unique execution ID |
is_platform_admin | bool | Is platform admin |
is_function_key | bool | Called via API key |
is_global_scope | bool | Executing in global scope |
See Context API for complete reference.
SDK Modules
Section titled “SDK Modules”LLM completions and streaming with optional RAG support.
from bifrost import ai
# Simple completionresponse = await ai.complete("Summarize this text")print(response.content)
# Structured outputfrom pydantic import BaseModelclass Analysis(BaseModel): sentiment: str score: float
result = await ai.complete("Analyze: Great!", response_format=Analysis)
# With knowledge base (RAG)response = await ai.complete("What is the policy?", knowledge=["policies"])
# Streamingasync for chunk in ai.stream("Write a story"): print(chunk.content, end="")See AI Module Reference for full API.
knowledge
Section titled “knowledge”Vector storage and semantic search for RAG.
from bifrost import knowledge
# Store documentdoc_id = await knowledge.store("Content...", namespace="docs", key="doc-1")
# Semantic searchresults = await knowledge.search("query", namespace="docs", limit=5)for doc in results: print(f"{doc.score}: {doc.content[:100]}")See Knowledge Module Reference for full API.
config
Section titled “config”Configuration and secrets management with org scoping.
Functions:
| Function | Parameters | Returns | Description |
|---|---|---|---|
get(key, default=None) | key: str, default: Any | Any | Get config value |
set(key, value, is_secret=False) | key: str, value: Any, is_secret: bool | None | Set config value (use is_secret=True for sensitive data) |
list() | - | dict[str, Any] | List all configs |
delete(key) | key: str | bool | Delete config (returns success) |
from bifrost import config
# Get with defaultapi_url = await config.get("api_url", default="https://api.example.com")
# Get a secret (retrieved the same way)api_key = await config.get("api_key")from bifrost import config
# Set regular configuration valueawait config.set("api_url", "https://new-api.example.com")
# Set a secret (encrypted in database)await config.set("api_key", "sk_live_xxxx", is_secret=True)from bifrost import config
# Get all configurations for current orgall_configs = await config.list()for key, value in all_configs.items(): print(f"{key}: {value}")integrations
Section titled “integrations”Access integration configurations and OAuth credentials.
Functions:
| Function | Parameters | Returns | Description |
|---|---|---|---|
get(name, scope=None) | name: str, scope: str | None | IntegrationData | None | Get integration with OAuth tokens |
IntegrationData Structure:
{ "integration_id": str, # Integration ID "entity_id": str | None, # External entity ID (tenant, company) "entity_name": str | None, # Display name "config": dict, # Merged configuration "oauth": { # OAuth credentials (if configured) "access_token": str, # Decrypted "refresh_token": str, # Decrypted "expires_at": str, # ISO timestamp ... }}from bifrost import integrationsimport httpx
# Get integration with OAuthintegration = await integrations.get("Microsoft_Graph")if not integration or not integration.oauth: raise ValueError("Integration not configured")
# Use access_token for API callsasync with httpx.AsyncClient() as client: response = await client.get( "https://graph.microsoft.com/v1.0/me", headers={"Authorization": f"Bearer {integration.oauth.access_token}"} ) return response.json()from bifrost import integrations
# Get integration with configintegration = await integrations.get("HaloPSA")if not integration: raise ValueError("HaloPSA not configured")
# Access config valuesbase_url = integration.config.get("base_url")tenant_id = integration.entity_id
# Use OAuth tokentoken = integration.oauth.access_tokenSee Integrations Module Reference for full API.
File operations for workspace and temporary storage.
Functions:
| Function | Parameters | Returns | Description |
|---|---|---|---|
read(path, location, mode) | path: str, location: str, mode: str | str | Read file as text |
read_bytes(path, location, mode) | path: str, location: str, mode: str | bytes | Read file as bytes |
write(path, content, location, mode) | path: str, content: str, location: str, mode: str | None | Write text file |
write_bytes(path, content, location, mode) | path: str, content: bytes, location: str, mode: str | None | Write binary file |
list(directory, location, mode) | directory: str, location: str, mode: str | list[str] | List directory contents |
delete(path, location, mode) | path: str, location: str, mode: str | None | Delete file |
exists(path, location, mode) | path: str, location: str, mode: str | bool | Check if file exists |
Location Parameter:
All functions accept an optional location parameter:
"workspace"(default) - Persistent workspace files"temp"- Temporary files cleared periodically"uploads"- Files uploaded via form file fields (read-only)
Mode Parameter:
All functions accept an optional mode parameter:
"cloud"(default) - S3 storage (for platform execution)"local"- Local filesystem (for CLI/local development)
from bifrost import files
# Read text filecontent = await files.read("data/config.json")
# Read binary fileimage_bytes = await files.read_bytes("images/logo.png")
# Read uploaded file from formuploaded = await files.read("uploads/form_id/uuid/file.txt", location="uploads")from bifrost import files
# Write CSV fileawait files.write("data/output.csv", "id,name\n1,Alice\n")
# Write binary fileawait files.write_bytes("images/chart.png", image_data)from bifrost import files
# List directoryitems = await files.list("data/")for item in items: print(f"Found: {item}")
# Check if existsif await files.exists("data/config.json"): content = await files.read("data/config.json")from bifrost import files
# Write to temp locationawait files.write("temp_report.html", html_content, location="temp")
# Temp files are automatically cleaned up periodicallyorganizations
Section titled “organizations”Organization management (platform admin only).
Functions:
| Function | Parameters | Returns | Description |
|---|---|---|---|
create(name, domain, is_active) | name: str, domain: str | None, is_active: bool | Organization | Create organization |
get(org_id) | org_id: str | Organization | Get organization by ID |
list() | - | list[Organization] | List all organizations |
update(org_id, **updates) | org_id: str, updates: dict | Organization | Update organization |
delete(org_id) | org_id: str | bool | Soft delete organization |
Example:
from bifrost import organizations
# Create organizationorg = await organizations.create("Acme Corp", domain="acme.com")
# List organizationsorgs = await organizations.list()for org in orgs: print(f"{org.name}: {org.domain}")User management (platform admin only).
Functions:
| Function | Parameters | Returns | Description |
|---|---|---|---|
create(email, name, is_superuser, org_id, is_active) | email: str, name: str, … | UserPublic | Create user |
get(user_id) | user_id: str | UserPublic | None | Get user by ID |
list(org_id) | org_id: str | None | list[UserPublic] | List users |
update(user_id, **updates) | user_id: str, updates: dict | UserPublic | Update user |
delete(user_id) | user_id: str | bool | Soft delete user |
Example:
from bifrost import users
# Create useruser = await users.create( email="john@example.com", name="John Doe", org_id="org-123")
# List users in organizationorg_users = await users.list(org_id="org-123")Role management with user and form assignments.
Functions:
| Function | Parameters | Returns | Description |
|---|---|---|---|
create(name, description) | name: str, description: str | Role | Create role |
get(role_id) | role_id: str | Role | Get role by ID |
list() | - | list[Role] | List roles in current org |
update(role_id, **updates) | role_id: str, updates: dict | Role | Update role |
delete(role_id) | role_id: str | None | Soft delete role |
list_users(role_id) | role_id: str | list[str] | List user IDs assigned to role |
list_forms(role_id) | role_id: str | list[str] | List form IDs assigned to role |
assign_users(role_id, user_ids) | role_id: str, user_ids: list[str] | None | Assign users to role |
assign_forms(role_id, form_ids) | role_id: str, form_ids: list[str] | None | Assign forms to role |
Example:
from bifrost import roles
# Create rolerole = await roles.create("Manager", description="Can manage team data")
# Assign users and formsawait roles.assign_users(role.id, ["user-1", "user-2"])await roles.assign_forms(role.id, ["form-1", "form-2"])Form definitions (read-only access).
Functions:
| Function | Parameters | Returns | Description |
|---|---|---|---|
list() | - | list[FormPublic] | List available forms |
get(form_id) | form_id: str | FormPublic | Get form by ID |
Example:
from bifrost import forms
# List formsall_forms = await forms.list()for form in all_forms: print(f"{form.name}: {form.description}")
# Get specific formform = await forms.get("form-123")print(form.form_schema)executions
Section titled “executions”Workflow execution history.
Functions:
| Function | Parameters | Returns | Description |
|---|---|---|---|
list(workflow_name, status, start_date, end_date, limit) | filters: various | list[WorkflowExecution] | List executions with filtering |
get(execution_id) | execution_id: str | WorkflowExecution | Get execution details |
Example:
from bifrost import executions
# List recent executionsrecent = await executions.list(limit=10)for exec in recent: print(f"{exec.workflow_name}: {exec.status}")
# Get failed executionsfailed = await executions.list(status="Failed")
# Get specific executiondetails = await executions.get("exec-123")print(details.result)workflows
Section titled “workflows”Workflow metadata and execution status.
Functions:
| Function | Parameters | Returns | Description |
|---|---|---|---|
list() | - | list[WorkflowMetadata] | List all workflows |
get(execution_id) | execution_id: str | WorkflowExecution | Get execution details |
Example:
from bifrost import workflows
# List all workflowswf_list = await workflows.list()for wf in wf_list: print(f"{wf.name}: {wf.description}") if wf.schedule: print(f" Scheduled: {wf.schedule}")Complete Example
Section titled “Complete Example”from bifrost import workflow, data_provider, context, config, filesimport aiohttpimport jsonimport logging
logger = logging.getLogger(__name__)
# Data provider - no context parameter needed@data_provider( name="get_departments", description="List departments", cache_ttl_seconds=600)async def get_departments(): return [ {"label": "Engineering", "value": "eng"}, {"label": "Sales", "value": "sales"} ]
# Workflow - parameters auto-inferred from function signature@workflow(category="User Management", timeout_seconds=60)async def create_user( email: str, name: str, department: str = "General"): """Create a new user with department assignment."""
logger.info("Creating user", extra={ "email": email, "department": department, "org_id": context.org_id # Access context via proxy })
try: # Get config values (async!) api_url = await config.get("api_url") api_key = await config.get("api_key") # Secrets retrieved same as config
# Call external API headers = {"Authorization": f"Bearer {api_key}"} payload = {"email": email, "name": name, "department": department}
async with aiohttp.ClientSession() as session: async with session.post( f"{api_url}/users", headers=headers, json=payload ) as resp: if resp.status == 201: user = await resp.json()
# Write to file (async) await files.write( f"users/{user['id']}.json", json.dumps(user, indent=2) )
logger.info(f"User created: {user['id']}") return {"success": True, "user_id": user["id"]} else: error = await resp.text() logger.error(f"API error: {error}") return {"success": False, "error": error}
except Exception as e: logger.error(f"Failed: {str(e)}", exc_info=True) return {"success": False, "error": str(e)}Type Definitions
Section titled “Type Definitions”Organization
Section titled “Organization”class Organization(BaseModel): id: str name: str is_active: bool domain: str | Noneclass Role(BaseModel): id: str name: str description: str organization_id: str is_active: boolUserPublic
Section titled “UserPublic”class UserPublic(BaseModel): id: str email: str name: str organization_id: str | None is_active: bool is_superuser: boolFormPublic
Section titled “FormPublic”class FormPublic(BaseModel): id: str name: str description: str | None workflow_id: str | None form_schema: dict | None access_level: str organization_id: str | None is_active: boolWorkflowMetadata
Section titled “WorkflowMetadata”class WorkflowMetadata(BaseModel): id: str name: str description: str | None category: str tags: list[str] execution_mode: str timeout_seconds: int schedule: str | None endpoint_enabled: bool is_tool: bool time_saved: intWorkflowExecution
Section titled “WorkflowExecution”class WorkflowExecution(BaseModel): execution_id: str workflow_name: str org_id: str | None executed_by: str status: str input_data: dict result: Any error_message: str | None duration_ms: int | None started_at: datetime | None completed_at: datetime | NoneExecutionStatus:
PENDING- Queued for executionRUNNING- Currently executingCOMPLETED- Completed successfullyFAILED- Failed with errorCANCELLED- Cancelled by user
ConfigType:
STRING- Plain text valueSECRET- Encrypted valueJSON- JSON objectBOOL- Boolean valueINT- Integer value
FormFieldType:
TEXT- Text inputNUMBER- Number inputSELECT- Dropdown selectCHECKBOX- CheckboxDATE- Date picker
Errors
Section titled “Errors”class UserError(Exception): """User-facing error with formatted message."""
class WorkflowError(Exception): """Workflow execution error."""
class ValidationError(Exception): """Validation error."""
class IntegrationError(Exception): """Integration error."""
class ConfigurationError(Exception): """Configuration error."""Best Practices
Section titled “Best Practices”- Context Proxy: Use
from bifrost import contextto access execution context - All Async: All SDK modules are async and require
await - Logging: Use
logging.getLogger(__name__) - Error Handling: Return error states, log with context
- Secrets: Use
config.set(key, value, is_secret=True)for sensitive data; never log or return secrets - Caching: Set appropriate cache TTL for data providers
- Integrations: Use
integrations.get()to access OAuth tokens and config for external services
Next Steps
Section titled “Next Steps”- Context API Reference - Detailed ExecutionContext properties
- Decorators Reference - Advanced decorator usage
- Writing Workflows Guide - Practical guide