
Workflow Engine Troubleshooting

Diagnose and fix workflow execution and discovery issues

The workflow engine executes your workflows in an isolated environment. This guide helps you diagnose and fix workflow-specific issues.

Symptom: Workflow doesn’t appear in UI or API /api/discovery list

Workflows must be in discoverable directories:

✅ /home/workflows/my_workflow.py
✅ /platform/examples/my_workflow.py
✅ /home/data_providers/my_provider.py
❌ /workspace/my_workflow.py (wrong location)
❌ /other/my_workflow.py (not scanned)

Filenames matter too:

✅ my_workflow.py (discovered)
✅ user_onboarding.py (discovered)
❌ _my_workflow.py (ignored, starts with _)
❌ __pycache__.py (ignored)
❌ my_workflow (no .py) (ignored)

Files starting with underscore are skipped to avoid discovering helper/utility files.
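The filename rules above can be sketched as a filter function; this is illustrative only, and the engine's actual discovery code may differ.

```python
from pathlib import Path

# Hypothetical sketch of the discovery filename filter described above.
def is_discoverable(path: str) -> bool:
    """Return True if a file would be picked up by workflow discovery."""
    name = Path(path).name
    if not name.endswith(".py"):
        return False  # no .py extension -> ignored
    if name.startswith("_"):
        return False  # helper/utility files are skipped
    return True

print(is_discoverable("my_workflow.py"))   # True
print(is_discoverable("_my_workflow.py"))  # False
print(is_discoverable("my_workflow"))      # False
```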

# ✅ Correct
from bifrost import workflow

@workflow(name="test", description="Test workflow")
async def test(context):
    pass

# ❌ Wrong: No decorator
async def test(context):
    pass

# ❌ Wrong: Decorator commented out
# @workflow(name="test", description="Test")
async def test(context):
    pass

# ❌ Wrong: Wrong import
from shared.decorators import workflow  # Import restriction violation!

When the Function App starts or deploys, it logs discovery:

# Get logs
az functionapp log tail --resource-group <rg> --name <name>

# Look for:
# "Discovered: workspace.workflows.my_workflow"
# "Failed to import: workspace.workflows.broken_file"
# "Import error in workspace.workflows.my_workflow"

If import fails:

  1. Check Python syntax errors
  2. Look at full error message for import issues
  3. Verify imports are allowed by the import restrictions
# ✅ Correct
@workflow(
    name="create_user",         # Unique identifier
    description="Create user",  # Human-readable
    category="users"            # Optional
)
async def create_user(context):
    pass

# ❌ Missing required fields
@workflow(name="create_user")  # Missing description
async def create_user(context):
    pass

# ❌ Wrong field types
@workflow(
    name=123,  # Should be a string
    description="Create user"
)
async def create_user(context):
    pass

Symptom: Workflow appears but parameters are missing from form

Parameters must be stacked below @workflow:

# ✅ Correct order (workflow first, then params)
@workflow(name="test")
@param("email", type="email", required=True)
@param("name", type="string", required=True)
async def test(context, email: str, name: str):
    pass

# ❌ Wrong: params before workflow
@param("email", type="email", required=True)
@workflow(name="test")
async def test(context, email: str):
    pass

The order matters because decorators execute bottom-up in Python, and @workflow collects the parameters attached to the function.
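The bottom-up order is easy to see with plain stand-in decorators (argument-free here, unlike the real bifrost ones):

```python
# Minimal demonstration of Python's bottom-up decorator execution order.
order = []

def workflow(fn):
    order.append("workflow")
    return fn

def param(fn):
    order.append("param")
    return fn

@workflow  # runs second (outermost)
@param     # runs first (closest to the function)
def test():
    pass

print(order)  # ['param', 'workflow'] -- @param runs first, so @workflow
              # sees the parameter metadata already attached
```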

# ✅ Correct: param name matches function argument
@param("email", type="email")
async def create_user(context, email: str):
    pass

# ❌ Wrong: param name doesn't match argument
@param("recipient_email", type="email")      # Called "recipient_email"
async def create_user(context, email: str):  # But arg is "email"
    pass

# ❌ Wrong: parameter in decorator but not in function
@param("phone", type="string")
async def create_user(context, email: str):  # "phone" not in args
    pass

# ✅ Minimal valid param
@param("email", type="email")

# ✅ Full-featured param
@param(
    name="email",                       # Matches function arg
    type="email",                       # Valid type
    label="Email Address",              # UI label
    required=True,                      # Is required?
    help_text="User's email",           # Help text
    validation={"pattern": "..."},      # Custom validation
    data_provider="get_email_domains",  # Dropdown provider
    default_value="user@example.com"    # Default value
)

# ❌ Invalid: Unknown field
@param("email", type="email", unknown_field="value")

# ❌ Invalid: Invalid type
@param("email", type="unknown_type")  # Should be: email, string, int, etc.

Valid parameter types:

string → Text input
int → Integer number
float → Decimal number
bool → True/False checkbox
email → Email address with validation
json → JSON object
list → Array of values
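Behind those types, the engine presumably coerces raw form values before calling your workflow. A minimal sketch of such coercion, for illustration only (the real validation is likely richer):

```python
import json

# Hypothetical coercion table for the parameter types listed above;
# not the engine's actual code.
COERCERS = {
    "string": str,
    "int": int,
    "float": float,
    "bool": lambda v: str(v).lower() in ("true", "1", "yes"),
    "json": json.loads,
    "list": lambda v: v.split(",") if isinstance(v, str) else list(v),
}

def coerce(value, param_type):
    """Convert a raw form value to the declared parameter type."""
    return COERCERS[param_type](value)

print(coerce("42", "int"))     # 42
print(coerce("true", "bool"))  # True
print(coerce("a,b", "list"))   # ['a', 'b']
```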
# If using data_provider, it must exist and be discoverable

# ✅ Correct: Data provider exists
@param("license", data_provider="get_available_licenses")

# ❌ Wrong: Data provider doesn't exist
@param("license", data_provider="invalid_provider_name")

# ❌ Wrong: Typo in provider name
@param("license", data_provider="get_availble_licenses")  # "availble" typo

Symptom: Workflow executes but returns error

Each execution has detailed logs:

UI: Workflows → [Select workflow] → Execution History → [Click execution]
Shows: logs, checkpoints, variables, errors

Log levels:

INFO → Normal operation messages
WARNING → Something unexpected but recoverable
ERROR → Operation failed

The error message tells you what went wrong:

❌ "NameError: name 'context' is not defined"
Fix: Function must accept context as first parameter
❌ "TypeError: missing required argument 'email'"
Fix: Form didn't pass email parameter
❌ "ImportError: cannot import name 'msgraph'"
Fix: Use oauth module: from bifrost import oauth
❌ "asyncio.TimeoutError"
Fix: Workflow took longer than timeout (increase timeout_seconds)
❌ "Exception: User does not have E5 license"
Fix: Your workflow logic raised this error (check your code)
# ✅ Correct: Must accept context
async def my_workflow(context: ExecutionContext):
    pass

# ✅ Correct: With parameters
async def my_workflow(context: ExecutionContext, email: str, name: str):
    pass

# ❌ Wrong: Missing context
async def my_workflow(email: str):
    pass

# ❌ Wrong: Not async
def my_workflow(context):  # Should be async
    pass

# ❌ Will crash the workflow
async def my_workflow(context):
    result = await some_api_call()
    # If some_api_call fails, the exception bubbles up
    return result.data  # AttributeError if result is None

# ✅ Better: Handle exceptions
async def my_workflow(context):
    try:
        result = await some_api_call()
        if result is None:
            logger.error("API returned None")
            return {"error": "API failed"}
        return result.data
    except Exception as e:
        logger.error(f"API call failed: {e}")
        return {"error": str(e)}

Symptom: “ImportError: cannot import…” when executing workflow

Different directories have different allowed imports:

In /home code (user workflows):

# ✅ Allowed
from bifrost import workflow, param, ExecutionContext
import requests
import aiohttp
from datetime import datetime

# ❌ Not allowed
from shared.registry import get_registry            # Internal
from functions.http.discovery import get_discovery  # HTTP layer

In /platform code (examples, integrations):

# ✅ Allowed
from bifrost import workflow, param
from shared.handlers import some_handler  # Can access shared/*
import requests

# ❌ Not allowed
from functions.http.discovery import get_discovery  # HTTP layer

# ✅ Correct: bifrost SDK is the public API
from bifrost import (
    workflow,
    param,
    ExecutionContext,
    get_registry,  # Available in the public API
)

# ❌ Wrong: Not in the public API
from bifrost.internal import something_internal

# ❌ Wrong: Module doesn't exist
from bifrost.nonexistent import something

Third-party packages are available if in requirements.txt:

# requirements.txt (in bifrost-api)
requests==2.31.0
aiohttp==3.9.0
azure-storage-blob==12.18.0
pydantic==2.5.0

# ✅ Available
import requests
import aiohttp
from azure.storage.blob import BlobServiceClient

# ❌ Not available (not in requirements.txt)
import pandas  # Not installed

Symptom: “Execution timeout” or “asyncio.TimeoutError”

Each workflow has a maximum execution time:

@workflow(
    name="my_workflow",
    timeout_seconds=300  # Default: 5 minutes
)
async def my_workflow(context):
    pass

If the workflow takes longer than timeout_seconds, it’s killed.
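One way such a limit can be enforced is asyncio.wait_for; the following is a standalone sketch, not the engine's actual code.

```python
import asyncio

# Illustrative runner: cancel the workflow coroutine once the
# timeout elapses, the way timeout_seconds behaves.
async def slow_workflow(context):
    await asyncio.sleep(10)  # pretend this is real work
    return {"done": True}

async def run_with_timeout(workflow_fn, context, timeout_seconds):
    try:
        return await asyncio.wait_for(workflow_fn(context), timeout_seconds)
    except asyncio.TimeoutError:
        return {"error": f"execution exceeded {timeout_seconds}s"}

result = asyncio.run(run_with_timeout(slow_workflow, None, 0.1))
print(result)  # {'error': 'execution exceeded 0.1s'}
```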

# For longer operations
@workflow(
    name="bulk_import",
    timeout_seconds=1800,   # 30 minutes
    execution_mode="async"  # Run in background
)
async def bulk_import(context):
    # Has up to 30 minutes
    pass

# ❌ Slow: One user at a time
async def import_users(context):
    for user in users:
        await create_user(user)  # Wait for each one

# ✅ Fast: Parallel operations
async def import_users(context):
    tasks = [create_user(user) for user in users]
    await asyncio.gather(*tasks)  # Do them all at once

# ✅ Good for long workflows
@workflow(
    execution_mode="async",  # Runs in background
    timeout_seconds=3600     # 1 hour
)
async def import_all_users(context):
    pass

# ❌ Risky: Sync mode has hard limits
@workflow(
    execution_mode="sync",  # Blocks the user's HTTP request
    timeout_seconds=300     # Azure Functions hard timeout
)
async def import_all_users(context):
    pass

Symptom: “context is None” or “Organization not found”

# ✅ Correct: context provided by framework
async def my_workflow(context: ExecutionContext):
    org_id = context.org_id            # "acme-corp"
    org_name = context.org_name        # "Acme Corporation"
    email = context.executed_by_email  # "user@example.com"

# ❌ Wrong: Trying to access a context that's None
async def my_workflow(context):
    if context is None:  # This shouldn't happen
        logger.error("Context is None")

Some operations require organization context:

# ✅ Correct: Workflow requires org
@workflow(
    name="create_user",
    requires_org=True  # Must have organization context
)
async def create_user(context: ExecutionContext):
    await config.get("api_key")  # Uses org context

# ⚠️ Risky: Global workflow
@workflow(
    name="system_health",
    requires_org=False  # Can run globally
)
async def system_health(context):
    # context might be None or have no org_id
    pass

# ✅ Correct: With default value
value = config.get("api_key", default="https://api.example.com")

# ✅ Correct: With error handling
try:
    api_key = config.get("api_key")
except KeyError:
    logger.error("api_key not configured")
    return {"error": "Configuration missing"}

# ❌ Wrong: No fallback
api_key = config.get("api_key")  # Raises if not found

Symptom: Workflow takes much longer than expected

@workflow(name="test")
async def test(context):
    # Add timing logs
    logger.info("Starting step 1")
    result1 = await step1()
    logger.info("Starting step 2")
    result2 = await step2()
    # Look in the execution log to see timing:
    # "Starting step 1" at T+0s
    # "Starting step 2" at T+5s  <- Step 1 took 5 seconds

# ❌ Slow: Sequential API calls
for user in users:
    await create_m365_user(user)  # Wait 1-2s
    await assign_license(user)    # Wait 1-2s
    # = 3-4s per user

# ✅ Fast: Parallel operations
async def create_and_assign(user):
    user_result = await create_m365_user(user)
    return await assign_license(user_result)

tasks = [create_and_assign(user) for user in users]
await asyncio.gather(*tasks)
# All 100 users in ~2-3s instead of 300-400s

A slow workflow usually means:
- Waiting for external API calls
- Network latency
- Rate limiting (retry delays)

Solutions:
- Use asyncio.gather() for parallel requests
- Add retry logic with backoff
- Cache frequently accessed data
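The retry-with-backoff idea can be sketched as a small helper; retry_with_backoff is illustrative, not part of the bifrost SDK.

```python
import asyncio
import random

# Retry a flaky async call with exponential backoff plus jitter.
async def retry_with_backoff(coro_fn, retries=3, base_delay=0.5):
    for attempt in range(retries):
        try:
            return await coro_fn()
        except Exception:
            if attempt == retries - 1:
                raise  # out of retries -> surface the error
            # Backoff grows per attempt: base, 2x base, 4x base, ...
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)

# Usage: succeeds on the third attempt after two transient failures
calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "ok"

print(asyncio.run(retry_with_backoff(flaky, base_delay=0.01)))  # ok
```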

Symptom: Dropdown in form is empty or showing error

# Check if data provider is discoverable
curl http://localhost:7071/api/data-providers
# Should include:
{
  "dataProviders": [
    {"name": "get_available_licenses", ...},
    {"name": "get_departments", ...}
  ]
}
# ✅ Correct format
@data_provider(name="get_licenses")
async def get_licenses(context):
    return [
        {"label": "E3", "value": "SPE_E3", "metadata": {}},
        {"label": "E5", "value": "SPE_E5", "metadata": {}}
    ]

# ❌ Wrong: Not a list of dicts
@data_provider(name="get_licenses")
async def get_licenses(context):
    return ["E3", "E5"]  # Missing label/value structure

# ❌ Wrong: Returns None
@data_provider(name="get_licenses")
async def get_licenses(context):
    # Missing return statement
    pass

Data providers are cached:

@data_provider(
    name="get_licenses",
    cache_ttl_seconds=300  # Cached for 5 minutes
)
async def get_licenses(context):
    pass

# If data doesn't update:
# - Wait 5 minutes for the cache to expire
# - Or reduce cache_ttl_seconds to 0 for testing
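The caching behavior implied by cache_ttl_seconds can be sketched as a simple TTL cache; this is an illustration, and the engine's actual caching layer may differ.

```python
import time

# Illustrative TTL cache: serve a cached value until it is older
# than ttl_seconds, then refetch.
_cache = {}

def cached_call(key, fetch, ttl_seconds=300):
    now = time.monotonic()
    entry = _cache.get(key)
    if entry is not None and now - entry[0] < ttl_seconds:
        return entry[1]  # still fresh: serve the cached value
    value = fetch()      # expired or missing: refetch
    _cache[key] = (now, value)
    return value

# Usage: the second call within the TTL never hits the fetch function
hits = {"n": 0}

def fetch_licenses():
    hits["n"] += 1
    return ["SPE_E3", "SPE_E5"]

cached_call("licenses", fetch_licenses, ttl_seconds=300)
cached_call("licenses", fetch_licenses, ttl_seconds=300)
print(hits["n"])  # 1 -- second call served from cache
```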

Symptom: Changes to workflow file don’t appear after saving

Issue 1: File Not Reloaded During Development


During local development with func start:

Expected workflow reload:
1. Edit workflow file
2. Save file
3. Reload page
4. New version appears
If not working:
- Restart `func start` (stops function app, starts fresh)
- Check file is actually saved
- Verify no syntax errors preventing import

In production, changes require redeployment:

After pushing code changes:

1. Commit to Git
2. GitHub Actions CI/CD triggers
3. Builds and deploys the new version
4. Function App restarts with the new code
5. Discovery runs and picks up the changes

Discovery happens at startup, not continuously.

Symptom: “Cannot read/write files” or “Permission denied”

Bifrost provides isolated file systems:

/tmp/org-12345/workflow-abc123/my_file.txt

# ✅ Correct: Use paths provided by context
file_path = context.get_temp_file_path("my_file.txt")

# ✅ Correct: Use the workspace path (if configured)
file_path = context.get_workspace_file_path("uploads/document.pdf")

# ❌ Wrong: Direct filesystem access
with open("/etc/sensitive_file") as f:  # Denied by the import restrictor
    pass

# ❌ Wrong: Hardcoded paths
with open("/tmp/my_file.txt") as f:  # May not exist or lack permissions
    pass

Temp directory (/tmp): Limited by Azure Functions (usually 512 MB)
Workspace (/workspace): Limited by the Azure Files quota (depends on config)

# Be careful with large files
file_size_mb = 100  # 100 MB
if file_size_mb > 500:
    logger.error("File too large for temp storage")
    return {"error": "File too large"}
| Issue                   | Most Likely Cause                      | Fix                                    |
| ----------------------- | -------------------------------------- | -------------------------------------- |
| Workflow not discovered | Wrong location or no decorator         | Check file location and decorator      |
| Parameters missing      | Decorator order wrong or name mismatch | Check @param order, verify names match |
| Import error            | Not using the public API               | Use bifrost.* instead of shared.*      |
| Timeout                 | Workflow too long                      | Increase timeout or use async mode     |
| Context None            | Workflow doesn't require org           | Add requires_org=True                  |
| Slow execution          | Sequential operations                  | Use asyncio.gather() for parallel work |
| Data provider empty     | Wrong format or exception              | Check return format and logs           |
| File access denied      | Using restricted paths                 | Use context.get_temp_file_path()       |
  • Execution Logs: Workflows page → Execution History → [Click execution]
  • Discovery Logs: Azure Portal → Function App → Log Stream
  • Debugging: Add logging statements throughout workflow using Python’s logging module
  • Check Status: curl http://your-bifrost.com/api/health