Skip to content

bifrost Module Reference

Complete API reference for decorators, context, and SDK modules

# Decorators and context proxy
from bifrost import workflow, data_provider, context
# SDK modules (all async)
from bifrost import ai, knowledge, config, integrations, files
from bifrost import organizations, users, roles, forms, executions, workflows
# Types (optional, for type hints)
from bifrost import (
ExecutionContext, Organization, Role, UserPublic, FormPublic,
WorkflowMetadata, WorkflowExecution, IntegrationData, OAuthCredentials,
ConfigData, AIResponse, AIStreamChunk, KnowledgeDocument, NamespaceInfo,
)
# Enums
from bifrost import ExecutionStatus, ConfigType, FormFieldType
# Errors
from bifrost import UserError, WorkflowError, ValidationError, IntegrationError, ConfigurationError

Registers an async function as an executable workflow. All parameters are optional - the decorator auto-infers name, description, and parameters from your function.

@workflow(**options)
@workflow
async def create_user(email: str, name: str):
"""Create a new user in the system."""
return {"user_id": "123"}

Parameters:

ParameterTypeDefaultDescription
namestrAutoAuto-derived from function name
descriptionstrAutoAuto-derived from docstring
categorystr”General”Workflow category
tagslist[str][]Tags for filtering
execution_modestrAutoAuto-selects based on endpoint_enabled
timeout_secondsint1800Max execution time (30 min default)
schedulestrNoneCron expression
endpoint_enabledboolFalseExpose as HTTP endpoint
allowed_methodslist[str][“POST”]Allowed HTTP methods
public_endpointboolFalseSkip authentication

Registers a function that provides dynamic options.

@data_provider(name: str | None = None, description: str | None = None)

Parameters:

ParameterTypeDefaultDescription
namestrNoneUnique identifier (auto-derived from function name if not provided)
descriptionstrNoneWhat it provides (auto-derived from docstring if not provided)

Return Format:

[
{
"label": "Display Text",
"value": "actual_value",
"metadata": {} # Optional
}
]

Example:

from bifrost import data_provider, context
@data_provider(
name="get_departments",
description="List departments"
)
async def get_departments():
"""List available departments."""
# Access context via proxy if needed
org_id = context.org_id
return [
{"label": "Engineering", "value": "eng"},
{"label": "Sales", "value": "sales"}
]

Access organization, user, and execution metadata via the context proxy. No need to pass it as a parameter.

from bifrost import context
@workflow
async def my_workflow(name: str):
# Access context properties directly
org_id = context.org_id
user_email = context.email
is_admin = context.is_platform_admin

Properties:

PropertyTypeDescription
user_idstrCurrent user ID
emailstrUser email
namestrUser display name
org_idstr | NoneOrganization ID
org_namestr | NoneOrganization name
organizationOrganization | NoneFull org object
scopestr”GLOBAL” or org ID
execution_idstrUnique execution ID
is_platform_adminboolIs platform admin
is_function_keyboolCalled via API key
is_global_scopeboolExecuting in global scope

See Context API for complete reference.

LLM completions and streaming with optional RAG support.

from bifrost import ai
# Simple completion
response = await ai.complete("Summarize this text")
print(response.content)
# Structured output
from pydantic import BaseModel
class Analysis(BaseModel):
sentiment: str
score: float
result = await ai.complete("Analyze: Great!", response_format=Analysis)
# With knowledge base (RAG)
response = await ai.complete("What is the policy?", knowledge=["policies"])
# Streaming
async for chunk in ai.stream("Write a story"):
print(chunk.content, end="")

See AI Module Reference for full API.

Vector storage and semantic search for RAG.

from bifrost import knowledge
# Store document
doc_id = await knowledge.store("Content...", namespace="docs", key="doc-1")
# Semantic search
results = await knowledge.search("query", namespace="docs", limit=5)
for doc in results:
print(f"{doc.score}: {doc.content[:100]}")

See Knowledge Module Reference for full API.

Configuration and secrets management with org scoping.

Functions:

FunctionParametersReturnsDescription
get(key, default=None)key: str, default: AnyAnyGet config value
set(key, value, is_secret=False)key: str, value: Any, is_secret: boolNoneSet config value (use is_secret=True for sensitive data)
list()-dict[str, Any]List all configs
delete(key)key: strboolDelete config (returns success)
from bifrost import config
# Get with default
api_url = await config.get("api_url", default="https://api.example.com")
# Get a secret (retrieved the same way)
api_key = await config.get("api_key")

Access integration configurations and OAuth credentials.

Functions:

FunctionParametersReturnsDescription
get(name, scope=None)name: str, scope: str | NoneIntegrationData | NoneGet integration with OAuth tokens

IntegrationData Structure:

{
"integration_id": str, # Integration ID
"entity_id": str | None, # External entity ID (tenant, company)
"entity_name": str | None, # Display name
"config": dict, # Merged configuration
"oauth": { # OAuth credentials (if configured)
"access_token": str, # Decrypted
"refresh_token": str, # Decrypted
"expires_at": str, # ISO timestamp
...
}
}
from bifrost import integrations
import httpx
# Get integration with OAuth
integration = await integrations.get("Microsoft_Graph")
if not integration or not integration.oauth:
raise ValueError("Integration not configured")
# Use access_token for API calls
async with httpx.AsyncClient() as client:
response = await client.get(
"https://graph.microsoft.com/v1.0/me",
headers={"Authorization": f"Bearer {integration.oauth.access_token}"}
)
return response.json()

See Integrations Module Reference for full API.

File operations for workspace and temporary storage.

Functions:

FunctionParametersReturnsDescription
read(path, location, mode)path: str, location: str, mode: strstrRead file as text
read_bytes(path, location, mode)path: str, location: str, mode: strbytesRead file as bytes
write(path, content, location, mode)path: str, content: str, location: str, mode: strNoneWrite text file
write_bytes(path, content, location, mode)path: str, content: bytes, location: str, mode: strNoneWrite binary file
list(directory, location, mode)directory: str, location: str, mode: strlist[str]List directory contents
delete(path, location, mode)path: str, location: str, mode: strNoneDelete file
exists(path, location, mode)path: str, location: str, mode: strboolCheck if file exists

Location Parameter:

All functions accept an optional location parameter:

  • "workspace" (default) - Persistent workspace files
  • "temp" - Temporary files cleared periodically
  • "uploads" - Files uploaded via form file fields (read-only)

Mode Parameter:

All functions accept an optional mode parameter:

  • "cloud" (default) - S3 storage (for platform execution)
  • "local" - Local filesystem (for CLI/local development)
from bifrost import files
# Read text file
content = await files.read("data/config.json")
# Read binary file
image_bytes = await files.read_bytes("images/logo.png")
# Read uploaded file from form
uploaded = await files.read("uploads/form_id/uuid/file.txt", location="uploads")

Organization management (platform admin only).

Functions:

FunctionParametersReturnsDescription
create(name, domain, is_active)name: str, domain: str | None, is_active: boolOrganizationCreate organization
get(org_id)org_id: strOrganizationGet organization by ID
list()-list[Organization]List all organizations
update(org_id, **updates)org_id: str, updates: dictOrganizationUpdate organization
delete(org_id)org_id: strboolSoft delete organization

Example:

from bifrost import organizations
# Create organization
org = await organizations.create("Acme Corp", domain="acme.com")
# List organizations
orgs = await organizations.list()
for org in orgs:
print(f"{org.name}: {org.domain}")

User management (platform admin only).

Functions:

FunctionParametersReturnsDescription
create(email, name, is_superuser, org_id, is_active)email: str, name: str, …UserPublicCreate user
get(user_id)user_id: strUserPublic | NoneGet user by ID
list(org_id)org_id: str | Nonelist[UserPublic]List users
update(user_id, **updates)user_id: str, updates: dictUserPublicUpdate user
delete(user_id)user_id: strboolSoft delete user

Example:

from bifrost import users
# Create user
user = await users.create(
email="john@example.com",
name="John Doe",
org_id="org-123"
)
# List users in organization
org_users = await users.list(org_id="org-123")

Role management with user and form assignments.

Functions:

FunctionParametersReturnsDescription
create(name, description)name: str, description: strRoleCreate role
get(role_id)role_id: strRoleGet role by ID
list()-list[Role]List roles in current org
update(role_id, **updates)role_id: str, updates: dictRoleUpdate role
delete(role_id)role_id: strNoneSoft delete role
list_users(role_id)role_id: strlist[str]List user IDs assigned to role
list_forms(role_id)role_id: strlist[str]List form IDs assigned to role
assign_users(role_id, user_ids)role_id: str, user_ids: list[str]NoneAssign users to role
assign_forms(role_id, form_ids)role_id: str, form_ids: list[str]NoneAssign forms to role

Example:

from bifrost import roles
# Create role
role = await roles.create("Manager", description="Can manage team data")
# Assign users and forms
await roles.assign_users(role.id, ["user-1", "user-2"])
await roles.assign_forms(role.id, ["form-1", "form-2"])

Form definitions (read-only access).

Functions:

FunctionParametersReturnsDescription
list()-list[FormPublic]List available forms
get(form_id)form_id: strFormPublicGet form by ID

Example:

from bifrost import forms
# List forms
all_forms = await forms.list()
for form in all_forms:
print(f"{form.name}: {form.description}")
# Get specific form
form = await forms.get("form-123")
print(form.form_schema)

Workflow execution history.

Functions:

FunctionParametersReturnsDescription
list(workflow_name, status, start_date, end_date, limit)filters: variouslist[WorkflowExecution]List executions with filtering
get(execution_id)execution_id: strWorkflowExecutionGet execution details

Example:

from bifrost import executions
# List recent executions
recent = await executions.list(limit=10)
for exec in recent:
print(f"{exec.workflow_name}: {exec.status}")
# Get failed executions
failed = await executions.list(status="Failed")
# Get specific execution
details = await executions.get("exec-123")
print(details.result)

Workflow metadata and execution status.

Functions:

FunctionParametersReturnsDescription
list()-list[WorkflowMetadata]List all workflows
get(execution_id)execution_id: strWorkflowExecutionGet execution details

Example:

from bifrost import workflows
# List all workflows
wf_list = await workflows.list()
for wf in wf_list:
print(f"{wf.name}: {wf.description}")
if wf.schedule:
print(f" Scheduled: {wf.schedule}")
from bifrost import workflow, data_provider, context, config, files
import aiohttp
import json
import logging
logger = logging.getLogger(__name__)
# Data provider - no context parameter needed
@data_provider(
name="get_departments",
description="List departments",
cache_ttl_seconds=600
)
async def get_departments():
return [
{"label": "Engineering", "value": "eng"},
{"label": "Sales", "value": "sales"}
]
# Workflow - parameters auto-inferred from function signature
@workflow(category="User Management", timeout_seconds=60)
async def create_user(
email: str,
name: str,
department: str = "General"
):
"""Create a new user with department assignment."""
logger.info("Creating user", extra={
"email": email,
"department": department,
"org_id": context.org_id # Access context via proxy
})
try:
# Get config values (async!)
api_url = await config.get("api_url")
api_key = await config.get("api_key") # Secrets retrieved same as config
# Call external API
headers = {"Authorization": f"Bearer {api_key}"}
payload = {"email": email, "name": name, "department": department}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{api_url}/users",
headers=headers,
json=payload
) as resp:
if resp.status == 201:
user = await resp.json()
# Write to file (async)
await files.write(
f"users/{user['id']}.json",
json.dumps(user, indent=2)
)
logger.info(f"User created: {user['id']}")
return {"success": True, "user_id": user["id"]}
else:
error = await resp.text()
logger.error(f"API error: {error}")
return {"success": False, "error": error}
except Exception as e:
logger.error(f"Failed: {str(e)}", exc_info=True)
return {"success": False, "error": str(e)}
class Organization(BaseModel):
id: str
name: str
is_active: bool
domain: str | None
class Role(BaseModel):
id: str
name: str
description: str
organization_id: str
is_active: bool
class UserPublic(BaseModel):
id: str
email: str
name: str
organization_id: str | None
is_active: bool
is_superuser: bool
class FormPublic(BaseModel):
id: str
name: str
description: str | None
workflow_id: str | None
form_schema: dict | None
access_level: str
organization_id: str | None
is_active: bool
class WorkflowMetadata(BaseModel):
id: str
name: str
description: str | None
category: str
tags: list[str]
execution_mode: str
timeout_seconds: int
schedule: str | None
endpoint_enabled: bool
is_tool: bool
time_saved: int
class WorkflowExecution(BaseModel):
execution_id: str
workflow_name: str
org_id: str | None
executed_by: str
status: str
input_data: dict
result: Any
error_message: str | None
duration_ms: int | None
started_at: datetime | None
completed_at: datetime | None

ExecutionStatus:

  • PENDING - Queued for execution
  • RUNNING - Currently executing
  • COMPLETED - Completed successfully
  • FAILED - Failed with error
  • CANCELLED - Cancelled by user

ConfigType:

  • STRING - Plain text value
  • SECRET - Encrypted value
  • JSON - JSON object
  • BOOL - Boolean value
  • INT - Integer value

FormFieldType:

  • TEXT - Text input
  • NUMBER - Number input
  • SELECT - Dropdown select
  • CHECKBOX - Checkbox
  • DATE - Date picker
class UserError(Exception):
"""User-facing error with formatted message."""
class WorkflowError(Exception):
"""Workflow execution error."""
class ValidationError(Exception):
"""Validation error."""
class IntegrationError(Exception):
"""Integration error."""
class ConfigurationError(Exception):
"""Configuration error."""
  1. Context Proxy: Use from bifrost import context to access execution context
  2. All Async: All SDK modules are async and require await
  3. Logging: Use logging.getLogger(__name__)
  4. Error Handling: Return error states, log with context
  5. Secrets: Use config.set(key, value, is_secret=True) for sensitive data; never log or return secrets
  6. Caching: Set appropriate cache TTL for data providers
  7. Integrations: Use integrations.get() to access OAuth tokens and config for external services