Skip to content

Custom REST APIs

Call any HTTP API from Bifrost workflows

Integrate with any REST API using standard HTTP libraries. This guide shows common patterns for calling external APIs securely.

Use these Python libraries for HTTP requests:

  • aiohttp (recommended for async) - Already installed
  • requests (synchronous) - Already installed
  • httpx (async/sync) - Install if needed

Fetch data from an API:

from bifrost import workflow, ExecutionContext
import aiohttp
@workflow(name="fetch_data", description="Get data from API")
async def fetch_data(context: ExecutionContext):
url = "https://api.example.com/data"
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
resp.raise_for_status()
data = await resp.json()
return {"data": data}

Send data to an API:

from bifrost import workflow, param, ExecutionContext
import aiohttp
@workflow(name="create_item", description="Create item via API")
@param("name", type="string", required=True)
@param("description", type="string", required=True)
async def create_item(
context: ExecutionContext,
name: str,
description: str
):
url = "https://api.example.com/items"
payload = {"name": name, "description": description}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload) as resp:
resp.raise_for_status()
result = await resp.json()
return {"success": True, "item_id": result["id"]}
from bifrost import config
import aiohttp
@workflow(name="call_with_api_key")
async def call_with_api_key(context: ExecutionContext):
api_key = await config.get("api_key")
url = "https://api.example.com/data"
headers = {"X-API-Key": api_key}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as resp:
resp.raise_for_status()
return await resp.json()
from bifrost import secrets
import aiohttp
@workflow(name="call_with_bearer")
async def call_with_bearer(context: ExecutionContext):
token = await secrets.get("api_token")
url = "https://api.example.com/data"
headers = {"Authorization": f"Bearer {token}"}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as resp:
resp.raise_for_status()
return await resp.json()
import aiohttp
from aiohttp import BasicAuth
from bifrost import secrets
@workflow(name="call_with_basic_auth")
async def call_with_basic_auth(context: ExecutionContext):
username = await secrets.get("api_username")
password = await secrets.get("api_password")
url = "https://api.example.com/data"
auth = BasicAuth(username, password)
async with aiohttp.ClientSession() as session:
async with session.get(url, auth=auth) as resp:
resp.raise_for_status()
return await resp.json()

Add URL parameters:

url = "https://api.example.com/users"
params = {"page": 1, "limit": 100, "status": "active"}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as resp:
# Requests: https://api.example.com/users?page=1&limit=100&status=active
return await resp.json()

Send custom headers:

headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"User-Agent": "Bifrost/1.0",
"X-Custom-Header": "value"
}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as resp:
return await resp.json()
payload = {"name": "John", "email": "john@example.com"}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload) as resp:
return await resp.json()

Handle API errors gracefully:

import aiohttp
@workflow(name="safe_api_call")
async def safe_api_call(context: ExecutionContext):
url = "https://api.example.com/data"
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
if resp.status == 200:
data = await resp.json()
return {"success": True, "data": data}
elif resp.status == 404:
return {"success": False, "error": "Resource not found"}
elif resp.status == 401:
return {"success": False, "error": "Unauthorized"}
else:
error = await resp.text()
return {"success": False, "error": error}
except aiohttp.ClientError as e:
return {"success": False, "error": f"Network error: {str(e)}"}

Set request timeouts:

import aiohttp
timeout = aiohttp.ClientTimeout(total=30) # 30 seconds
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url) as resp:
return await resp.json()

Retry failed requests:

import asyncio
import aiohttp
@workflow(name="api_with_retry")
async def api_with_retry(context: ExecutionContext):
url = "https://api.example.com/data"
max_retries = 3
backoff = 1
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
resp.raise_for_status()
return await resp.json()
except aiohttp.ClientError:
if attempt < max_retries - 1:
await asyncio.sleep(backoff)
backoff *= 2 # Exponential backoff
else:
raise

Handle paginated responses:

@workflow(name="get_all_pages")
async def get_all_pages(context: ExecutionContext):
all_items = []
url = "https://api.example.com/items"
page = 1
async with aiohttp.ClientSession() as session:
while True:
params = {"page": page, "limit": 100}
async with session.get(url, params=params) as resp:
resp.raise_for_status()
data = await resp.json()
items = data.get("items", [])
if not items:
break
all_items.extend(items)
page += 1
return {"items": all_items, "total": len(all_items)}

Receive webhook calls:

import hmac
import hashlib
from bifrost import workflow, param, ExecutionContext, config
@workflow(
name="handle_webhook",
endpoint_enabled=True,
allowed_methods=["POST"],
public_endpoint=True
)
@param("payload", type="json", required=True)
@param("signature", type="string", required=True)
async def handle_webhook(
context: ExecutionContext,
payload: dict,
signature: str
):
# Verify signature
secret = await config.get("webhook_secret")
payload_str = str(payload)
expected = hmac.new(
secret.encode(),
payload_str.encode(),
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(expected, signature):
raise ValueError("Invalid signature")
# Process webhook
return {"success": True, "received": payload}

For synchronous operations:

import requests
from bifrost import secrets
@workflow(name="sync_api_call")
async def sync_api_call(context: ExecutionContext):
api_key = await secrets.get("api_key")
response = requests.get(
"https://api.example.com/data",
headers={"Authorization": f"Bearer {api_key}"},
timeout=30
)
response.raise_for_status()
return response.json()
  1. Use aiohttp for async - Workflows are async, use async HTTP clients
  2. Store credentials securely - Use config.get() or secrets.get()
  3. Set timeouts - Prevent hanging on slow APIs
  4. Handle errors - Check status codes and catch exceptions
  5. Use raise_for_status() - Automatically raise on HTTP errors
  6. Implement retries - For transient failures
  7. Respect rate limits - Add delays between requests if needed
import aiohttp
from bifrost import config
@workflow(name="create_stripe_customer")
@param("email", type="email", required=True)
async def create_stripe_customer(context: ExecutionContext, email: str):
api_key = await config.get("stripe_api_key")
url = "https://api.stripe.com/v1/customers"
auth = aiohttp.BasicAuth(api_key, "")
data = {"email": email}
async with aiohttp.ClientSession() as session:
async with session.post(url, auth=auth, data=data) as resp:
resp.raise_for_status()
customer = await resp.json()
return {"customer_id": customer["id"]}
import aiohttp
from bifrost import secrets
@workflow(name="send_email")
@param("to", type="email", required=True)
@param("subject", type="string", required=True)
@param("body", type="string", required=True)
async def send_email(
context: ExecutionContext,
to: str,
subject: str,
body: str
):
api_key = await secrets.get("sendgrid_api_key")
url = "https://api.sendgrid.com/v3/mail/send"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"personalizations": [{"to": [{"email": to}]}],
"from": {"email": "noreply@example.com"},
"subject": subject,
"content": [{"type": "text/plain", "value": body}]
}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=payload) as resp:
resp.raise_for_status()
return {"success": True}

Connection timeout: Increase timeout or check API availability

401 Unauthorized: Verify API key/token is correct and not expired

429 Too Many Requests: Implement rate limiting and retry logic

SSL errors: Update certificates or use ssl=False (not recommended for production)