API Reference
Cyclonetix provides a RESTful API for integrating with other systems and programmatically managing workflows. This reference documents all available API endpoints, parameters, and response formats.
API Base URL
By default, the API is available at:
http://<cyclonetix-host>:3000/api
Authentication
When security is enabled, you can authenticate using:
- **API Key**: pass the API key in the `X-API-Key` header:

  `X-API-Key: your-api-key-here`

- **OAuth Token**: after OAuth authentication, use the session cookie or pass the token in the `Authorization` header:

  `Authorization: Bearer your-token-here`
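For example, a minimal Python sketch using the `requests` library (the host, key, and token below are placeholders you would replace for your deployment):

```python
import requests

BASE_URL = "http://cyclonetix-host:3000/api"  # replace with your host
API_KEY = "your-api-key-here"                 # placeholder credential

# API-key authentication: send the key in the X-API-Key header
resp = requests.get(f"{BASE_URL}/health", headers={"X-API-Key": API_KEY})
resp.raise_for_status()
print(resp.json())

# OAuth bearer tokens go in the Authorization header instead:
# headers={"Authorization": "Bearer your-token-here"}
```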
API Endpoints
Task Management
List Tasks
GET /api/tasks
Query parameters:
| Parameter | Description |
|---|---|
| `filter` | Filter tasks by name or ID pattern |
| `queue` | Filter tasks by queue |
| `limit` | Maximum number of tasks to return |
| `offset` | Pagination offset |
Example response:
{
"tasks": [
{
"id": "data_extraction",
"name": "Data Extraction",
"description": "Extract data from source system",
"command": "python extract.py --source ${SOURCE}",
"dependencies": [],
"parameters": {
"SOURCE": "prod_db"
},
"queue": "default"
},
// More tasks...
],
"total": 24,
"limit": 10,
"offset": 0
}
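A sketch of calling this endpoint with the query parameters above (same placeholder base URL and key as in the authentication example):

```python
import requests

BASE_URL = "http://cyclonetix-host:3000/api"
HEADERS = {"X-API-Key": "your-api-key-here"}

# List up to 10 tasks on the "default" queue matching the pattern "data"
resp = requests.get(f"{BASE_URL}/tasks",
                    params={"filter": "data", "queue": "default",
                            "limit": 10, "offset": 0},
                    headers=HEADERS)
resp.raise_for_status()
body = resp.json()
print(f"showing {len(body['tasks'])} of {body['total']} tasks")
```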
Get Task
GET /api/tasks/{task_id}
Example response:
{
"id": "data_extraction",
"name": "Data Extraction",
"description": "Extract data from source system",
"command": "python extract.py --source ${SOURCE}",
"dependencies": [],
"parameters": {
"SOURCE": "prod_db"
},
"queue": "default",
"created_at": "2023-01-01T00:00:00Z",
"updated_at": "2023-01-15T00:00:00Z"
}
Create/Update Task
PUT /api/tasks/{task_id}
Request body:
{
"name": "Data Extraction",
"description": "Extract data from source system",
"command": "python extract.py --source ${SOURCE}",
"dependencies": [],
"parameters": {
"SOURCE": "prod_db"
},
"queue": "default"
}
Response: The created/updated task object.
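For instance, creating (or updating) the task above from Python might look like this sketch, assuming the endpoint accepts the documented request body verbatim:

```python
import requests

BASE_URL = "http://cyclonetix-host:3000/api"
HEADERS = {"X-API-Key": "your-api-key-here"}

task = {
    "name": "Data Extraction",
    "description": "Extract data from source system",
    "command": "python extract.py --source ${SOURCE}",
    "dependencies": [],
    "parameters": {"SOURCE": "prod_db"},
    "queue": "default",
}

# PUT is idempotent: it creates the task if absent, updates it otherwise
resp = requests.put(f"{BASE_URL}/tasks/data_extraction", json=task, headers=HEADERS)
resp.raise_for_status()
print(resp.json())
```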
Delete Task
DELETE /api/tasks/{task_id}
Response:
{
"success": true,
"message": "Task deleted successfully"
}
DAG Management
List DAGs
GET /api/dags
Query parameters:
| Parameter | Description |
|---|---|
| `filter` | Filter DAGs by name or ID pattern |
| `limit` | Maximum number of DAGs to return |
| `offset` | Pagination offset |
Example response:
{
"dags": [
{
"id": "etl_pipeline",
"name": "ETL Pipeline",
"description": "Extract, transform, and load data",
"tasks": ["data_extraction", "data_transformation", "data_loading"],
"tags": ["etl", "daily"]
},
// More DAGs...
],
"total": 12,
"limit": 10,
"offset": 0
}
Get DAG
GET /api/dags/{dag_id}
Example response:
{
"id": "etl_pipeline",
"name": "ETL Pipeline",
"description": "Extract, transform, and load data",
"tasks": [
{
"id": "data_extraction",
"parameters": {
"SOURCE": "prod_db"
}
},
{
"id": "data_transformation",
"parameters": {
"TRANSFORM_TYPE": "standard"
}
},
{
"id": "data_loading",
"parameters": {
"DESTINATION": "data_warehouse"
}
}
],
"context": "production",
"tags": ["etl", "daily"],
"created_at": "2023-01-01T00:00:00Z",
"updated_at": "2023-01-15T00:00:00Z"
}
Create/Update DAG
PUT /api/dags/{dag_id}
Request body:
{
"name": "ETL Pipeline",
"description": "Extract, transform, and load data",
"tasks": [
{
"id": "data_extraction",
"parameters": {
"SOURCE": "prod_db"
}
},
{
"id": "data_transformation",
"parameters": {
"TRANSFORM_TYPE": "standard"
}
},
{
"id": "data_loading",
"parameters": {
"DESTINATION": "data_warehouse"
}
}
],
"context": "production",
"tags": ["etl", "daily"]
}
Response: The created/updated DAG object.
Delete DAG
DELETE /api/dags/{dag_id}
Response:
{
"success": true,
"message": "DAG deleted successfully"
}
Workflow Execution
Schedule Task
POST /api/schedule-task
Request body:
{
"task_id": "data_extraction",
"context": "production",
"env_vars": {
"DATA_DATE": "2023-11-01",
"LOG_LEVEL": "info"
},
"parameters": {
"SOURCE": "prod_db",
"LIMIT": "1000"
},
"run_id": "custom_run_id_20231101", // Optional
"queue": "high_priority", // Optional
"timeout_seconds": 3600, // Optional
"retries": 3, // Optional
"retry_delay_seconds": 300 // Optional
}
Response:
{
"success": true,
"run_id": "custom_run_id_20231101",
"dag_run_id": "dag_57f3e2a1",
"status": "pending",
"start_time": "2023-11-01T12:00:00Z"
}
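As a sketch, scheduling a one-off task run from Python (placeholders as before):

```python
import requests

BASE_URL = "http://cyclonetix-host:3000/api"
HEADERS = {"X-API-Key": "your-api-key-here"}

payload = {
    "task_id": "data_extraction",
    "context": "production",
    "env_vars": {"DATA_DATE": "2023-11-01"},
    "parameters": {"SOURCE": "prod_db"},
    "retries": 3,  # optional, as are run_id/queue/timeout_seconds
}

resp = requests.post(f"{BASE_URL}/schedule-task", json=payload, headers=HEADERS)
resp.raise_for_status()
run = resp.json()
print(f"scheduled {run['run_id']} (status: {run['status']})")
```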
Schedule DAG
POST /api/schedule-dag
Request body:
{
"dag_id": "etl_pipeline",
"context": "production",
"env_vars": {
"DATA_DATE": "2023-11-01"
},
"parameters": {
"data_extraction.SOURCE": "prod_db",
"data_loading.DESTINATION": "data_warehouse"
},
"run_id": "etl_20231101", // Optional
"timeout_seconds": 7200 // Optional
}
Response:
{
"success": true,
"run_id": "etl_20231101",
"status": "pending",
"start_time": "2023-11-01T12:00:00Z",
"tasks": [
{
"id": "data_extraction",
"task_run_id": "data_extraction_a1b2c3d4",
"status": "pending"
},
{
"id": "data_transformation",
"task_run_id": "data_transformation_b2c3d4e5",
"status": "pending"
},
{
"id": "data_loading",
"task_run_id": "data_loading_c3d4e5f6",
"status": "pending"
}
]
}
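Note the `<task_id>.<PARAMETER>` convention for per-task parameter overrides in the request body. A sketch of scheduling the DAG from Python:

```python
import requests

BASE_URL = "http://cyclonetix-host:3000/api"
HEADERS = {"X-API-Key": "your-api-key-here"}

payload = {
    "dag_id": "etl_pipeline",
    "context": "production",
    # per-task parameters are namespaced as "<task_id>.<PARAMETER>"
    "parameters": {
        "data_extraction.SOURCE": "prod_db",
        "data_loading.DESTINATION": "data_warehouse",
    },
    "run_id": "etl_20231101",  # optional
}

resp = requests.post(f"{BASE_URL}/schedule-dag", json=payload, headers=HEADERS)
resp.raise_for_status()
for task in resp.json()["tasks"]:
    print(task["task_run_id"], task["status"])
```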
Cancel Run
POST /api/cancel-run/{run_id}
Query parameters:
| Parameter | Description |
|---|---|
| `force` | Force cancellation even if the run is in a critical section (`true`/`false`) |
Response:
{
"success": true,
"message": "Run cancelled successfully",
"run_id": "etl_20231101"
}
Rerun
POST /api/rerun/{run_id}
Request body:
{
"failed_only": true, // Optional
"from_task": "data_transformation", // Optional
"reset_params": false // Optional
}
Response:
{
"success": true,
"new_run_id": "etl_20231101_rerun",
"original_run_id": "etl_20231101",
"status": "pending",
"start_time": "2023-11-01T14:00:00Z"
}
Run Management
List Runs
GET /api/runs
Query parameters:
| Parameter | Description |
|---|---|
| `status` | Filter by status (`pending`, `running`, `completed`, `failed`) |
| `dag_id` | Filter by DAG ID |
| `since` | Show runs since this time (ISO 8601 format) |
| `until` | Show runs until this time (ISO 8601 format) |
| `limit` | Maximum number of runs to return |
| `offset` | Pagination offset |
Example response:
{
"runs": [
{
"run_id": "etl_20231101",
"dag_id": "etl_pipeline",
"status": "completed",
"start_time": "2023-11-01T12:00:00Z",
"end_time": "2023-11-01T12:30:00Z",
"task_count": 3,
"completed_tasks": 3,
"failed_tasks": 0
},
// More runs...
],
"total": 45,
"limit": 10,
"offset": 0
}
Get Run
GET /api/runs/{run_id}
Query parameters:
| Parameter | Description |
|---|---|
| `include_tasks` | Include task details (`true`/`false`) |
| `include_logs` | Include task logs (`true`/`false`) |
Example response:
{
"run_id": "etl_20231101",
"dag_id": "etl_pipeline",
"status": "completed",
"start_time": "2023-11-01T12:00:00Z",
"end_time": "2023-11-01T12:30:00Z",
"duration_seconds": 1800,
"task_count": 3,
"completed_tasks": 3,
"failed_tasks": 0,
"tasks": [
{
"task_id": "data_extraction",
"task_run_id": "data_extraction_a1b2c3d4",
"status": "completed",
"start_time": "2023-11-01T12:00:00Z",
"end_time": "2023-11-01T12:10:00Z",
"duration_seconds": 600,
"logs": "Starting extraction...\nExtracted 10000 records\nCompleted successfully"
},
// More tasks...
],
"context": {
"DATA_DATE": "2023-11-01",
"ENV": "production"
}
}
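A common pattern is to poll this endpoint until the run finishes. A minimal sketch, treating `completed` and `failed` as the terminal statuses documented above:

```python
import time
import requests

BASE_URL = "http://cyclonetix-host:3000/api"
HEADERS = {"X-API-Key": "your-api-key-here"}

def wait_for_run(run_id: str, poll_seconds: float = 10.0) -> dict:
    """Poll GET /api/runs/{run_id} until the run finishes, then return it."""
    while True:
        resp = requests.get(f"{BASE_URL}/runs/{run_id}", headers=HEADERS)
        resp.raise_for_status()
        run = resp.json()
        if run["status"] in ("completed", "failed"):
            return run
        time.sleep(poll_seconds)

run = wait_for_run("etl_20231101")
print(run["status"], run.get("duration_seconds"))
```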
Get Run Graph
GET /api/runs/{run_id}/graph
Example response:
{
"run_id": "etl_20231101",
"dag_id": "etl_pipeline",
"nodes": [
{
"id": "data_extraction_a1b2c3d4",
"task_id": "data_extraction",
"status": "completed"
},
{
"id": "data_transformation_b2c3d4e5",
"task_id": "data_transformation",
"status": "completed"
},
{
"id": "data_loading_c3d4e5f6",
"task_id": "data_loading",
"status": "completed"
}
],
"edges": [
{
"from": "data_extraction_a1b2c3d4",
"to": "data_transformation_b2c3d4e5"
},
{
"from": "data_transformation_b2c3d4e5",
"to": "data_loading_c3d4e5f6"
}
]
}
Get Task Logs
GET /api/tasks/{task_run_id}/logs
Query parameters:
| Parameter | Description |
|---|---|
| `tail` | Number of lines to return from the end of the log |
| `since` | Return logs since this timestamp (ISO 8601 format) |
Example response:
{
"task_run_id": "data_extraction_a1b2c3d4",
"logs": "Starting extraction...\nExtracted 10000 records\nCompleted successfully",
"has_more": false,
"start_time": "2023-11-01T12:00:00Z",
"end_time": "2023-11-01T12:10:00Z"
}
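For example, tailing the last 50 lines of a task run's log (a sketch with the usual placeholders):

```python
import requests

BASE_URL = "http://cyclonetix-host:3000/api"
HEADERS = {"X-API-Key": "your-api-key-here"}

resp = requests.get(f"{BASE_URL}/tasks/data_extraction_a1b2c3d4/logs",
                    params={"tail": 50}, headers=HEADERS)
resp.raise_for_status()
body = resp.json()
print(body["logs"])
if body["has_more"]:
    print("(older log lines were truncated)")
```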
Agent and Queue Management
List Agents
GET /api/agents
Example response:
{
"agents": [
{
"agent_id": "agent_57f3e2a1",
"last_heartbeat": "2023-11-01T12:35:00Z",
"status": "active",
"assigned_tasks": [
"data_extraction_a1b2c3d4"
],
"queues": [
"default",
"high_memory"
],
"tags": [
"region:us-east",
"type:general"
]
},
// More agents...
]
}
List Queues
GET /api/queues
Example response:
{
"queues": [
{
"name": "default",
"length": 5,
"consumer_count": 3,
"tasks": [
{
"task_run_id": "data_extraction_a1b2c3d4",
"dag_run_id": "etl_20231101",
"position": 0
},
// More tasks...
]
},
// More queues...
]
}
Purge Queue
POST /api/queues/{queue_name}/purge
Response:
{
"success": true,
"message": "Queue purged successfully",
"tasks_removed": 5
}
Context and Parameter Set Management
List Contexts
GET /api/contexts
Example response:
{
"contexts": [
{
"id": "development",
"variables": {
"ENV": "development",
"LOG_LEVEL": "debug"
}
},
{
"id": "production",
"variables": {
"ENV": "production",
"LOG_LEVEL": "info"
}
}
]
}
Get Context
GET /api/contexts/{context_id}
Example response:
{
"id": "production",
"variables": {
"ENV": "production",
"LOG_LEVEL": "info",
"DATA_PATH": "/data/production",
"API_URL": "https://api.example.com",
"MAX_WORKERS": "16"
},
"extends": null,
"created_at": "2023-01-01T00:00:00Z",
"updated_at": "2023-01-15T00:00:00Z"
}
Create/Update Context
PUT /api/contexts/{context_id}
Request body:
{
"variables": {
"ENV": "production",
"LOG_LEVEL": "info",
"DATA_PATH": "/data/production",
"API_URL": "https://api.example.com",
"MAX_WORKERS": "16"
},
"extends": null
}
Response: The created/updated context object.
List Parameter Sets
GET /api/parameter-sets
Example response:
{
"parameter_sets": [
{
"id": "small_training",
"parameters": {
"EPOCHS": "10",
"BATCH_SIZE": "16"
}
},
{
"id": "large_training",
"parameters": {
"EPOCHS": "100",
"BATCH_SIZE": "128"
}
}
]
}
Get Parameter Set
GET /api/parameter-sets/{parameter_set_id}
Example response:
{
"id": "large_training",
"parameters": {
"EPOCHS": "100",
"BATCH_SIZE": "128",
"LEARNING_RATE": "0.001"
},
"created_at": "2023-01-01T00:00:00Z",
"updated_at": "2023-01-15T00:00:00Z"
}
Create/Update Parameter Set
PUT /api/parameter-sets/{parameter_set_id}
Request body:
{
"parameters": {
"EPOCHS": "100",
"BATCH_SIZE": "128",
"LEARNING_RATE": "0.001"
}
}
Response: The created/updated parameter set object.
System Management
System Status
GET /api/status
Example response:
{
"status": "healthy",
"version": "0.1.0",
"uptime_seconds": 86400,
"backend": {
"type": "redis",
"status": "connected",
"latency_ms": 5
},
"components": {
"orchestrator": {
"status": "running",
"count": 1
},
"agents": {
"status": "running",
"count": 3,
"active": 3,
"inactive": 0
},
"ui": {
"status": "running"
}
},
"queues": {
"total_pending_tasks": 10,
"queues": [
{
"name": "default",
"length": 5
},
{
"name": "high_memory",
"length": 3
},
{
"name": "gpu_tasks",
"length": 2
}
]
}
}
Health Check
GET /api/health
Example response:
{
"status": "healthy",
"components": {
"backend": "healthy",
"orchestrator": "healthy",
"agents": "healthy"
}
}
Metrics
GET /api/metrics
Example response:
{
"tasks": {
"pending": 10,
"running": 5,
"completed": 100,
"failed": 2,
"total": 117
},
"runs": {
"pending": 1,
"running": 2,
"completed": 20,
"failed": 1,
"total": 24
},
"performance": {
"average_task_duration_seconds": 45.2,
"average_dag_duration_seconds": 320.5,
"tasks_per_minute": 12.5
},
"resources": {
"cpu_usage_percent": 25.5,
"memory_usage_mb": 1024.5,
"disk_usage_percent": 45.2
}
}
Event Webhooks
Register Webhook
POST /api/webhooks
Request body:
{
"name": "Task Completion Notification",
"url": "https://example.com/webhook",
"events": ["task.completed", "task.failed", "dag.completed"],
"secret": "your-webhook-secret",
"headers": {
"Custom-Header": "custom-value"
},
"filters": {
"dag_id": "etl_pipeline",
"task_id": "data_extraction"
}
}
Response:
{
"id": "webhook_a1b2c3d4",
"name": "Task Completion Notification",
"url": "https://example.com/webhook",
"events": ["task.completed", "task.failed", "dag.completed"],
"filters": {
"dag_id": "etl_pipeline",
"task_id": "data_extraction"
},
"created_at": "2023-11-01T12:00:00Z"
}
List Webhooks
GET /api/webhooks
Example response:
{
"webhooks": [
{
"id": "webhook_a1b2c3d4",
"name": "Task Completion Notification",
"url": "https://example.com/webhook",
"events": ["task.completed", "task.failed", "dag.completed"],
"filters": {
"dag_id": "etl_pipeline",
"task_id": "data_extraction"
},
"created_at": "2023-11-01T12:00:00Z"
},
// More webhooks...
]
}
Delete Webhook
DELETE /api/webhooks/{webhook_id}
Response:
{
"success": true,
"message": "Webhook deleted successfully"
}
Event Types
The following event types can be used with webhooks:
| Event Type | Description |
|---|---|
| `task.pending` | Task is pending execution |
| `task.queued` | Task has been queued |
| `task.running` | Task has started running |
| `task.completed` | Task has completed successfully |
| `task.failed` | Task has failed |
| `dag.pending` | DAG is pending execution |
| `dag.running` | DAG has started running |
| `dag.completed` | DAG has completed successfully |
| `dag.failed` | DAG has failed |
| `agent.registered` | A new agent has registered |
| `agent.offline` | An agent has gone offline |
Webhook Payload Format
When an event occurs, Cyclonetix sends a POST request to the registered webhook URL with the following payload:
{
"event": "task.completed",
"timestamp": "2023-11-01T12:10:00Z",
"data": {
"run_id": "etl_20231101",
"dag_id": "etl_pipeline",
"task_id": "data_extraction",
"task_run_id": "data_extraction_a1b2c3d4",
"status": "completed",
"start_time": "2023-11-01T12:00:00Z",
"end_time": "2023-11-01T12:10:00Z",
"duration_seconds": 600
}
}
For security, Cyclonetix includes an `X-Cyclonetix-Signature` header containing an HMAC-SHA256 signature of the payload, computed with your webhook secret.
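A receiver can verify the signature by recomputing the HMAC over the raw request body. The sketch below assumes the header carries a hex digest; check your deployment if the encoding differs:

```python
import hashlib
import hmac

WEBHOOK_SECRET = b"your-webhook-secret"  # the secret you registered

def verify_signature(raw_body: bytes, signature_header: str) -> bool:
    """Recompute HMAC-SHA256 over the raw payload and compare it to the
    X-Cyclonetix-Signature header value in constant time."""
    expected = hmac.new(WEBHOOK_SECRET, raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_header)
```

Compare with `hmac.compare_digest` rather than `==` to avoid timing side channels.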
Error Handling
All API endpoints return standard HTTP status codes:
- `200 OK`: Request successful
- `201 Created`: Resource created successfully
- `400 Bad Request`: Invalid request parameters
- `401 Unauthorized`: Authentication required
- `403 Forbidden`: Permission denied
- `404 Not Found`: Resource not found
- `409 Conflict`: Resource already exists
- `500 Internal Server Error`: Server error
Error responses have a consistent format:
{
"error": true,
"code": "validation_error",
"message": "Invalid parameter value",
"details": {
"field": "timeout_seconds",
"error": "Must be a positive integer"
}
}
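A client-side sketch of handling that format (the invalid payload here is purely illustrative):

```python
import requests

BASE_URL = "http://cyclonetix-host:3000/api"
HEADERS = {"X-API-Key": "your-api-key-here"}

resp = requests.post(f"{BASE_URL}/schedule-task",
                     json={"task_id": "data_extraction", "timeout_seconds": -1},
                     headers=HEADERS)
if resp.status_code >= 400:
    err = resp.json()
    field = err.get("details", {}).get("field", "?")
    # e.g. "validation_error: Invalid parameter value (timeout_seconds)"
    print(f"{err['code']}: {err['message']} ({field})")
```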
Pagination
List endpoints support pagination with the following parameters:
- `limit`: Maximum number of items to return
- `offset`: Number of items to skip
Response includes metadata for pagination:
{
"items": [...],
"total": 100,
"limit": 10,
"offset": 0,
"next": "/api/tasks?limit=10&offset=10",
"prev": null
}
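A sketch of walking every page of a list endpoint; it steps the offset against the `total` field documented above rather than following the `next` link:

```python
import requests

BASE_URL = "http://cyclonetix-host:3000/api"
HEADERS = {"X-API-Key": "your-api-key-here"}

def iter_runs(page_size: int = 50):
    """Yield every run by stepping the offset until total is exhausted."""
    offset = 0
    while True:
        resp = requests.get(f"{BASE_URL}/runs",
                            params={"limit": page_size, "offset": offset},
                            headers=HEADERS)
        resp.raise_for_status()
        body = resp.json()
        yield from body["runs"]
        offset += page_size
        if offset >= body["total"]:
            break

for run in iter_runs():
    print(run["run_id"], run["status"])
```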
Filtering and Sorting
Many endpoints support filtering and sorting:
- **Filtering**: use query parameters such as `?status=completed&dag_id=etl_pipeline`
- **Sorting**: use the `sort` parameter, e.g. `?sort=start_time:desc` (see the sketch below)
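Combined, a filtered and sorted query might look like this sketch:

```python
import requests

BASE_URL = "http://cyclonetix-host:3000/api"

# Completed runs of etl_pipeline, newest first
resp = requests.get(f"{BASE_URL}/runs",
                    params={"status": "completed", "dag_id": "etl_pipeline",
                            "sort": "start_time:desc"},
                    headers={"X-API-Key": "your-api-key-here"})
resp.raise_for_status()
```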
Next Steps
- Review the CLI Reference for command-line interface options
- Check the Configuration Reference for configuration details
- Explore the YAML Schema for task and DAG definitions