Custom REST APIs
Call any HTTP API from Bifrost workflows
Integrate with any REST API using standard HTTP libraries. This guide shows common patterns for calling external APIs securely.
HTTP Libraries
Section titled “HTTP Libraries”Use these Python libraries for HTTP requests:
- aiohttp (recommended for async) - Already installed
- requests (synchronous) - Already installed
- httpx (async/sync) - Install if needed
Basic GET Request
Section titled “Basic GET Request”Fetch data from an API:
from bifrost import workflow, ExecutionContextimport aiohttp
@workflow(name="fetch_data", description="Get data from API")async def fetch_data(context: ExecutionContext): url = "https://api.example.com/data"
async with aiohttp.ClientSession() as session: async with session.get(url) as resp: resp.raise_for_status() data = await resp.json() return {"data": data}POST Request
Section titled “POST Request”Send data to an API:
from bifrost import workflow, param, ExecutionContextimport aiohttp
@workflow(name="create_item", description="Create item via API")@param("name", type="string", required=True)@param("description", type="string", required=True)async def create_item( context: ExecutionContext, name: str, description: str): url = "https://api.example.com/items" payload = {"name": name, "description": description}
async with aiohttp.ClientSession() as session: async with session.post(url, json=payload) as resp: resp.raise_for_status() result = await resp.json() return {"success": True, "item_id": result["id"]}Authentication Patterns
Section titled “Authentication Patterns”API Key in Header
Section titled “API Key in Header”from bifrost import configimport aiohttp
@workflow(name="call_with_api_key")async def call_with_api_key(context: ExecutionContext): api_key = await config.get("api_key")
url = "https://api.example.com/data" headers = {"X-API-Key": api_key}
async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as resp: resp.raise_for_status() return await resp.json()Bearer Token
Section titled “Bearer Token”from bifrost import secretsimport aiohttp
@workflow(name="call_with_bearer")async def call_with_bearer(context: ExecutionContext): token = await secrets.get("api_token")
url = "https://api.example.com/data" headers = {"Authorization": f"Bearer {token}"}
async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as resp: resp.raise_for_status() return await resp.json()Basic Auth
Section titled “Basic Auth”import aiohttpfrom aiohttp import BasicAuthfrom bifrost import secrets
@workflow(name="call_with_basic_auth")async def call_with_basic_auth(context: ExecutionContext): username = await secrets.get("api_username") password = await secrets.get("api_password")
url = "https://api.example.com/data" auth = BasicAuth(username, password)
async with aiohttp.ClientSession() as session: async with session.get(url, auth=auth) as resp: resp.raise_for_status() return await resp.json()Query Parameters
Section titled “Query Parameters”Add URL parameters:
url = "https://api.example.com/users"params = {"page": 1, "limit": 100, "status": "active"}
async with aiohttp.ClientSession() as session: async with session.get(url, params=params) as resp: # Requests: https://api.example.com/users?page=1&limit=100&status=active return await resp.json()Headers
Section titled “Headers”Send custom headers:
headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "User-Agent": "Bifrost/1.0", "X-Custom-Header": "value"}
async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as resp: return await resp.json()Request Body
Section titled “Request Body”payload = {"name": "John", "email": "john@example.com"}
async with aiohttp.ClientSession() as session:async with session.post(url, json=payload) as resp:return await resp.json()data = {"key1": "value1", "key2": "value2"}
async with aiohttp.ClientSession() as session: async with session.post(url, data=data) as resp: return await resp.json()import aiohttp
data = aiohttp.FormData()data.add_field('file', open('document.pdf', 'rb'), filename='document.pdf')data.add_field('description', 'My file')
async with aiohttp.ClientSession() as session:async with session.post(url, data=data) as resp:return await resp.json()Error Handling
Section titled “Error Handling”Handle API errors gracefully:
import aiohttp
@workflow(name="safe_api_call")async def safe_api_call(context: ExecutionContext): url = "https://api.example.com/data"
try: async with aiohttp.ClientSession() as session: async with session.get(url) as resp: if resp.status == 200: data = await resp.json() return {"success": True, "data": data} elif resp.status == 404: return {"success": False, "error": "Resource not found"} elif resp.status == 401: return {"success": False, "error": "Unauthorized"} else: error = await resp.text() return {"success": False, "error": error}
except aiohttp.ClientError as e: return {"success": False, "error": f"Network error: {str(e)}"}Timeouts
Section titled “Timeouts”Set request timeouts:
import aiohttp
timeout = aiohttp.ClientTimeout(total=30) # 30 seconds
async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(url) as resp: return await resp.json()Retry Logic
Section titled “Retry Logic”Retry failed requests:
import asyncioimport aiohttp
@workflow(name="api_with_retry")async def api_with_retry(context: ExecutionContext): url = "https://api.example.com/data" max_retries = 3 backoff = 1
for attempt in range(max_retries): try: async with aiohttp.ClientSession() as session: async with session.get(url) as resp: resp.raise_for_status() return await resp.json()
except aiohttp.ClientError: if attempt < max_retries - 1: await asyncio.sleep(backoff) backoff *= 2 # Exponential backoff else: raisePagination
Section titled “Pagination”Handle paginated responses:
@workflow(name="get_all_pages")async def get_all_pages(context: ExecutionContext): all_items = [] url = "https://api.example.com/items" page = 1
async with aiohttp.ClientSession() as session: while True: params = {"page": page, "limit": 100}
async with session.get(url, params=params) as resp: resp.raise_for_status() data = await resp.json()
items = data.get("items", []) if not items: break
all_items.extend(items) page += 1
return {"items": all_items, "total": len(all_items)}Webhooks
Section titled “Webhooks”Receive webhook calls:
import hmacimport hashlibfrom bifrost import workflow, param, ExecutionContext, config
@workflow( name="handle_webhook", endpoint_enabled=True, allowed_methods=["POST"], public_endpoint=True)@param("payload", type="json", required=True)@param("signature", type="string", required=True)async def handle_webhook( context: ExecutionContext, payload: dict, signature: str): # Verify signature secret = await config.get("webhook_secret") payload_str = str(payload)
expected = hmac.new( secret.encode(), payload_str.encode(), hashlib.sha256 ).hexdigest()
if not hmac.compare_digest(expected, signature): raise ValueError("Invalid signature")
# Process webhook return {"success": True, "received": payload}Using requests Library
Section titled “Using requests Library”For synchronous operations:
import requestsfrom bifrost import secrets
@workflow(name="sync_api_call")async def sync_api_call(context: ExecutionContext): api_key = await secrets.get("api_key")
response = requests.get( "https://api.example.com/data", headers={"Authorization": f"Bearer {api_key}"}, timeout=30 )
response.raise_for_status() return response.json()Best Practices
Section titled “Best Practices”- Use aiohttp for async - Workflows are async, use async HTTP clients
- Store credentials securely - Use
config.get()orsecrets.get() - Set timeouts - Prevent hanging on slow APIs
- Handle errors - Check status codes and catch exceptions
- Use raise_for_status() - Automatically raise on HTTP errors
- Implement retries - For transient failures
- Respect rate limits - Add delays between requests if needed
Common Patterns
Section titled “Common Patterns”Stripe API
Section titled “Stripe API”import aiohttpfrom bifrost import config
@workflow(name="create_stripe_customer")@param("email", type="email", required=True)async def create_stripe_customer(context: ExecutionContext, email: str): api_key = await config.get("stripe_api_key")
url = "https://api.stripe.com/v1/customers" auth = aiohttp.BasicAuth(api_key, "") data = {"email": email}
async with aiohttp.ClientSession() as session: async with session.post(url, auth=auth, data=data) as resp: resp.raise_for_status() customer = await resp.json() return {"customer_id": customer["id"]}SendGrid Email
Section titled “SendGrid Email”import aiohttpfrom bifrost import secrets
@workflow(name="send_email")@param("to", type="email", required=True)@param("subject", type="string", required=True)@param("body", type="string", required=True)async def send_email( context: ExecutionContext, to: str, subject: str, body: str): api_key = await secrets.get("sendgrid_api_key")
url = "https://api.sendgrid.com/v3/mail/send" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" }
payload = { "personalizations": [{"to": [{"email": to}]}], "from": {"email": "noreply@example.com"}, "subject": subject, "content": [{"type": "text/plain", "value": body}] }
async with aiohttp.ClientSession() as session: async with session.post(url, headers=headers, json=payload) as resp: resp.raise_for_status() return {"success": True}Troubleshooting
Section titled “Troubleshooting”Connection timeout: Increase timeout or check API availability
401 Unauthorized: Verify API key/token is correct and not expired
429 Too Many Requests: Implement rate limiting and retry logic
SSL errors: Update certificates or use ssl=False (not recommended for production)
Next Steps
Section titled “Next Steps”- Store API Keys Securely - Manage API credentials
- Automate Microsoft 365 - Microsoft Graph patterns
- OAuth Integration - OAuth-based APIs