Skip to content

ExecutionContext API

Complete reference for the ExecutionContext object

ExecutionContext provides access to organization, user, and execution information in every workflow and data provider.

from bifrost import workflow, ExecutionContext
@workflow(name="example")
async def example(context: ExecutionContext):
# Access context properties
org_id = context.org_id
user_id = context.user_id
PropertyTypeDescriptionExample
user_idstrCurrent user’s ID"user-123"
emailstrUser’s email address"alice@example.com"
namestrUser’s display name"Alice Smith"
PropertyTypeDescriptionExample
org_idstr | NoneOrganization ID"org-456"
org_namestr | NoneOrganization name"Acme Corp"
organizationOrganization | NoneFull organization objectSee Organization type
scopestrScope identifier"org-456" or "GLOBAL"
PropertyTypeDescriptionExample
execution_idstrUnique execution ID"exec-789"
PropertyTypeDescription
is_platform_adminboolUser is platform administrator
is_function_keyboolCalled via API key (not user)
is_global_scopeboolExecuting in global scope (no org)

Returns organization ID or None for global scope.

org_id: str | None = context.org_id
if context.org_id:
# Organization-scoped operation
users = await get_org_users(context.org_id)
else:
# Global operation
all_orgs = await get_all_organizations()

Returns organization display name.

org_name: str | None = context.org_name
logger.info(f"Executing for {context.org_name}")

Check if executing in global (platform-wide) scope.

if context.is_global_scope:
# Platform-level operation
pass
else:
# Organization-level operation
pass

When context.organization is not None:

PropertyTypeDescription
idstrOrganization ID
namestrDisplay name
is_activeboolOrganization is active
if context.organization:
org_name = context.organization.name
is_active = context.organization.is_active
@workflow(name="admin_task")
async def admin_task(context: ExecutionContext):
if not context.is_platform_admin:
return {
"success": False,
"error": "Admin privileges required"
}
# Perform admin operation
return {"success": True}

Data providers receive the same context:

from bifrost import data_provider, ExecutionContext
@data_provider(name="get_org_users")
async def get_org_users(context: ExecutionContext):
# Filter users by organization
users = await db.query(
"SELECT id, name FROM users WHERE org_id = ?",
context.org_id
)
return [
{"label": u["name"], "value": u["id"]}
for u in users
]
Old PropertyNew Property
executed_byuser_id
executed_by_emailemail
executed_by_namename
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ExecutionContext:
user_id: str
email: str
name: str
scope: str
organization: Organization | None
is_platform_admin: bool
is_function_key: bool
execution_id: str
@property
def org_id(self) -> str | None:
return self.organization.id if self.organization else None
@property
def org_name(self) -> str | None:
return self.organization.name if self.organization else None
@property
def is_global_scope(self) -> bool:
return self.scope == "GLOBAL"
  1. Always Check org_id: Verify org context before org-scoped operations
  2. Log Context: Include context in log messages for debugging
  3. Type Hints: Always use ExecutionContext type hint
  4. Don’t Mutate: Context is read-only, don’t try to modify it
  5. Use Properties: Use context.org_id not context.organization.id
  6. Never Log Secrets: Don’t include sensitive data in log messages