Executions Module
SDK reference for querying workflow execution history and live logs
The executions module lets workflows query historical executions and stream their own in-flight logs. Platform admins see all executions in their scope; regular users see only their own.
Import

```python
from bifrost import executions
```

Method Index

| Method | Returns | Description |
|---|---|---|
| `executions.list()` | `list[WorkflowExecution]` | List executions with optional filters |
| `executions.get()` | `WorkflowExecution` | Get a single execution by ID |
| `executions.get_current_logs()` | `list[ExecutionLog]` | Read logs from the current (or specified) execution’s Redis stream |

executions.list()

```python
async def list(
    workflow_name: str | None = None,
    status: str | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    limit: int = 50,
) -> list[WorkflowExecution]
```

| Parameter | Type | Description |
|---|---|---|
| `workflow_name` | `str \| None` | Filter by workflow name |
| `status` | `str \| None` | Filter by status (e.g. `"Failed"`, `"Completed"`) |
| `start_date` | `str \| None` | ISO-format lower bound on `started_at` |
| `end_date` | `str \| None` | ISO-format upper bound on `started_at` |
| `limit` | `int` | Max results (default 50, capped at 1000) |

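The filters can be combined in a single call. The sketch below is one way to count failed executions per workflow since a given date. `count_failures_by_workflow` and its `executions_api` parameter are hypothetical names introduced for illustration; the parameter stands in for the imported `executions` module so that the helper can also be exercised with a stub.

```python
from collections import Counter


async def count_failures_by_workflow(executions_api, start_date: str) -> Counter:
    """Count failed executions per workflow since start_date.

    Hypothetical helper: executions_api is any object exposing the async
    list() method documented above, e.g. the bifrost executions module.
    """
    failed = await executions_api.list(
        status="Failed",
        start_date=start_date,
        limit=1000,  # the documented cap on results
    )
    # Tally by the workflow_name field of each WorkflowExecution.
    return Counter(e.workflow_name for e in failed)
```

Inside a workflow this would be called as `await count_failures_by_workflow(executions, "2026-04-25")`. Because `limit` is capped at 1000, the counts are presumably incomplete when more than 1000 executions match.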

```python
recent = await executions.list(limit=10)
failed_today = await executions.list(status="Failed", start_date="2026-04-25")
```

executions.get()

```python
async def get(execution_id: str) -> WorkflowExecution
```

Raises `ValueError` if the execution is not found and `PermissionError` if access is denied.

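Since a missing ID and a denied ID raise different exceptions, a caller can handle them separately. A minimal sketch, assuming a missing execution should become `None` while a permission failure should still propagate; `get_or_none` and `executions_api` are hypothetical names, with the latter standing in for the `executions` module:

```python
async def get_or_none(executions_api, execution_id: str):
    """Return the execution, or None when the ID does not exist.

    Hypothetical helper. PermissionError is deliberately not caught,
    so access problems stay visible to the caller.
    """
    try:
        return await executions_api.get(execution_id)
    except ValueError:
        # Documented behaviour: ValueError means "not found".
        return None
```

Whether to swallow the `ValueError` at all is a design choice; a workflow that requires the execution to exist may be better served by letting both exceptions propagate.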

```python
detail = await executions.get("a1b2c3d4-...")
print(detail.status, detail.duration_ms)
```

executions.get_current_logs()
Read structured log entries from the Redis stream for an execution. Useful for in-flight progress checks, sub-workflow context passing, or post-mortem inspection.

```python
async def get_current_logs(
    execution_id: str | None = None,
    start: str = "0",
    count: int = 100,
) -> list[ExecutionLog]
```

| Parameter | Type | Description |
|---|---|---|
| `execution_id` | `str \| None` | Defaults to the current execution ID from context |
| `start` | `str` | Stream ID to start at (`"0"` = beginning) |
| `count` | `int` | Max entries to read |

When called outside a workflow context with no execution_id, raises RuntimeError.
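The `start` and `count` parameters suggest that a long stream can be read in pages. The sketch below is one possible approach, not a documented pattern: it is not stated here whether `start` is inclusive, so the loop resumes from the last entry ID it saw and skips any entry already collected, which works under either behaviour. `read_all_logs`, `executions_api`, and `page_size` are hypothetical names.

```python
async def read_all_logs(executions_api, execution_id=None, page_size: int = 100):
    """Collect every log entry currently in the stream, page by page.

    Hypothetical helper; executions_api stands in for the bifrost
    executions module. Assumes entry IDs are unique within a stream.
    """
    if page_size < 2:
        # With an inclusive start, a page of 1 would only ever return
        # the entry we already have, so no progress could be made.
        raise ValueError("page_size must be at least 2")

    collected = []
    seen_ids = set()
    start = "0"  # documented value for "beginning of the stream"
    while True:
        page = await executions_api.get_current_logs(
            execution_id=execution_id, start=start, count=page_size
        )
        fresh = [entry for entry in page if entry.id not in seen_ids]
        if not fresh:
            break  # caught up: nothing new since the last page
        collected.extend(fresh)
        seen_ids.update(entry.id for entry in fresh)
        start = fresh[-1].id  # resume from the last entry seen
    return collected
```

For an execution that is still running, the loop stops as soon as it catches up with the stream, so later entries need another call.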

```python
logs = await executions.get_current_logs()
for entry in logs:
    print(f"[{entry.level}] {entry.message}")
```

WorkflowExecution

| Field | Type | Description |
|---|---|---|
| `execution_id` | `str` | Unique execution ID |
| `workflow_name` | `str` | Workflow name |
| `org_id` | `str \| None` | Organization ID |
| `form_id` | `str \| None` | Form ID if triggered via form |
| `executed_by` | `str` | User ID who executed |
| `executed_by_name` | `str` | Display name |
| `status` | `str` | Current status |
| `input_data` | `dict` | Input parameters |
| `result` | `Any` | Execution result |
| `result_type` | `str \| None` | Hint for rendering result |
| `error_message` | `str \| None` | Error if failed |
| `duration_ms` | `int \| None` | Total duration |
| `started_at`, `completed_at` | `datetime \| None` | Timestamps |
| `logs` | `list[dict] \| None` | Captured log entries |
| `variables` | `dict \| None` | Runtime variables |
| `session_id` | `str \| None` | CLI session ID |
| `peak_memory_bytes`, `cpu_total_seconds` | | Resource metrics |

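Several of these fields are optional, so code that aggregates over executions needs to skip `None` values. A small sketch, assuming only the `duration_ms` field described above; `average_duration_ms` is a hypothetical helper, not part of the SDK:

```python
from statistics import mean


def average_duration_ms(execution_list):
    """Average duration_ms over executions that have one, else None.

    Hypothetical helper. Executions without a recorded duration
    (duration_ms is None) are skipped.
    """
    durations = [e.duration_ms for e in execution_list if e.duration_ms is not None]
    return mean(durations) if durations else None
```

It can be fed directly with the result of `executions.list()`, for example `average_duration_ms(await executions.list(status="Completed"))`.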
ExecutionLog

| Field | Type | Description |
|---|---|---|
| `id` | `str` | Stream entry ID |
| `execution_id` | `str` | Execution UUID |
| `level` | `str` | `"INFO"`, `"WARNING"`, `"ERROR"`, `"DEBUG"`, `"CRITICAL"` |
| `message` | `str` | Log message |
| `metadata` | `dict \| None` | Optional JSON metadata |
| `timestamp` | `str` | ISO timestamp |
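
Because `level` is a plain string, filtering by severity needs an explicit ordering. The sketch below assumes the five levels rank the same way as in Python's standard `logging` module; `LEVEL_ORDER` and `at_least` are hypothetical names introduced for illustration.

```python
# Assumed severity ranking, mirroring Python's logging module.
LEVEL_ORDER = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40, "CRITICAL": 50}


def at_least(logs, minimum: str):
    """Return the entries whose level is at or above `minimum`.

    Hypothetical helper. Entries with an unrecognised level are
    treated as lowest severity and filtered out.
    """
    threshold = LEVEL_ORDER[minimum]
    return [entry for entry in logs if LEVEL_ORDER.get(entry.level, 0) >= threshold]
```

For example, `at_least(await executions.get_current_logs(), "WARNING")` would keep only warnings, errors, and critical entries.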