API Reference

Cyclonetix provides a RESTful API for integrating with other systems and programmatically managing workflows. This reference documents all available API endpoints, parameters, and response formats.

API Base URL

By default, the API is available at:

http://<cyclonetix-host>:3000/api

Authentication

When security is enabled, you can authenticate using:

  1. API Key: Pass the API key in the X-API-Key header:

    X-API-Key: your-api-key-here
  2. OAuth Token: After OAuth authentication, use the session cookie or pass the token in the Authorization header:

    Authorization: Bearer your-token-here

API Endpoints

Task Management

List Tasks

GET /api/tasks

Query parameters:

Parameter Description
filter Filter tasks by name or ID pattern
queue Filter tasks by queue
limit Maximum number of tasks to return
offset Pagination offset

Example response:

{
  "tasks": [
    {
      "id": "data_extraction",
      "name": "Data Extraction",
      "description": "Extract data from source system",
      "command": "python extract.py --source ${SOURCE}",
      "dependencies": [],
      "parameters": {
        "SOURCE": "prod_db"
      },
      "queue": "default"
    },
    // More tasks...
  ],
  "total": 24,
  "limit": 10,
  "offset": 0
}

Get Task

GET /api/tasks/{task_id}

Example response:

{
  "id": "data_extraction",
  "name": "Data Extraction",
  "description": "Extract data from source system",
  "command": "python extract.py --source ${SOURCE}",
  "dependencies": [],
  "parameters": {
    "SOURCE": "prod_db"
  },
  "queue": "default",
  "created_at": "2023-01-01T00:00:00Z",
  "updated_at": "2023-01-15T00:00:00Z"
}

Create/Update Task

PUT /api/tasks/{task_id}

Request body:

{
  "name": "Data Extraction",
  "description": "Extract data from source system",
  "command": "python extract.py --source ${SOURCE}",
  "dependencies": [],
  "parameters": {
    "SOURCE": "prod_db"
  },
  "queue": "default"
}

Response: The created/updated task object.

Delete Task

DELETE /api/tasks/{task_id}

Response:

{
  "success": true,
  "message": "Task deleted successfully"
}

DAG Management

List DAGs

GET /api/dags

Query parameters:

Parameter Description
filter Filter DAGs by name or ID pattern
limit Maximum number of DAGs to return
offset Pagination offset

Example response:

{
  "dags": [
    {
      "id": "etl_pipeline",
      "name": "ETL Pipeline",
      "description": "Extract, transform, and load data",
      "tasks": ["data_extraction", "data_transformation", "data_loading"],
      "tags": ["etl", "daily"]
    },
    // More DAGs...
  ],
  "total": 12,
  "limit": 10,
  "offset": 0
}

Get DAG

GET /api/dags/{dag_id}

Example response:

{
  "id": "etl_pipeline",
  "name": "ETL Pipeline",
  "description": "Extract, transform, and load data",
  "tasks": [
    {
      "id": "data_extraction",
      "parameters": {
        "SOURCE": "prod_db"
      }
    },
    {
      "id": "data_transformation",
      "parameters": {
        "TRANSFORM_TYPE": "standard"
      }
    },
    {
      "id": "data_loading",
      "parameters": {
        "DESTINATION": "data_warehouse"
      }
    }
  ],
  "context": "production",
  "tags": ["etl", "daily"],
  "created_at": "2023-01-01T00:00:00Z",
  "updated_at": "2023-01-15T00:00:00Z"
}

Create/Update DAG

PUT /api/dags/{dag_id}

Request body:

{
  "name": "ETL Pipeline",
  "description": "Extract, transform, and load data",
  "tasks": [
    {
      "id": "data_extraction",
      "parameters": {
        "SOURCE": "prod_db"
      }
    },
    {
      "id": "data_transformation",
      "parameters": {
        "TRANSFORM_TYPE": "standard"
      }
    },
    {
      "id": "data_loading",
      "parameters": {
        "DESTINATION": "data_warehouse"
      }
    }
  ],
  "context": "production",
  "tags": ["etl", "daily"]
}

Response: The created/updated DAG object.

Delete DAG

DELETE /api/dags/{dag_id}

Response:

{
  "success": true,
  "message": "DAG deleted successfully"
}

Workflow Execution

Schedule Task

POST /api/schedule-task

Request body:

{
  "task_id": "data_extraction",
  "context": "production",
  "env_vars": {
    "DATA_DATE": "2023-11-01",
    "LOG_LEVEL": "info"
  },
  "parameters": {
    "SOURCE": "prod_db",
    "LIMIT": "1000"
  },
  "run_id": "custom_run_id_20231101",  // Optional
  "queue": "high_priority",  // Optional
  "timeout_seconds": 3600,  // Optional
  "retries": 3,  // Optional
  "retry_delay_seconds": 300  // Optional
}

Response:

{
  "success": true,
  "run_id": "custom_run_id_20231101",
  "dag_run_id": "dag_57f3e2a1",
  "status": "pending",
  "start_time": "2023-11-01T12:00:00Z"
}

Schedule DAG

POST /api/schedule-dag

Request body:

{
  "dag_id": "etl_pipeline",
  "context": "production",
  "env_vars": {
    "DATA_DATE": "2023-11-01"
  },
  "parameters": {
    "data_extraction.SOURCE": "prod_db",
    "data_loading.DESTINATION": "data_warehouse"
  },
  "run_id": "etl_20231101",  // Optional
  "timeout_seconds": 7200  // Optional
}

Response:

{
  "success": true,
  "run_id": "etl_20231101",
  "status": "pending",
  "start_time": "2023-11-01T12:00:00Z",
  "tasks": [
    {
      "id": "data_extraction",
      "task_run_id": "data_extraction_a1b2c3d4",
      "status": "pending"
    },
    {
      "id": "data_transformation",
      "task_run_id": "data_transformation_b2c3d4e5",
      "status": "pending"
    },
    {
      "id": "data_loading",
      "task_run_id": "data_loading_c3d4e5f6",
      "status": "pending"
    }
  ]
}

Cancel Run

POST /api/cancel-run/{run_id}

Query parameters:

Parameter Description
force Force cancel even if in critical section (true/false)

Response:

{
  "success": true,
  "message": "Run cancelled successfully",
  "run_id": "etl_20231101"
}

Rerun

POST /api/rerun/{run_id}

Request body:

{
  "failed_only": true,  // Optional
  "from_task": "data_transformation",  // Optional
  "reset_params": false  // Optional
}

Response:

{
  "success": true,
  "new_run_id": "etl_20231101_rerun",
  "original_run_id": "etl_20231101",
  "status": "pending",
  "start_time": "2023-11-01T14:00:00Z"
}

Run Management

List Runs

GET /api/runs

Query parameters:

Parameter Description
status Filter by status (pending, running, completed, failed)
dag_id Filter by DAG ID
since Show runs since time (ISO 8601 format)
until Show runs until time (ISO 8601 format)
limit Maximum number of runs to return
offset Pagination offset

Example response:

{
  "runs": [
    {
      "run_id": "etl_20231101",
      "dag_id": "etl_pipeline",
      "status": "completed",
      "start_time": "2023-11-01T12:00:00Z",
      "end_time": "2023-11-01T12:30:00Z",
      "task_count": 3,
      "completed_tasks": 3,
      "failed_tasks": 0
    },
    // More runs...
  ],
  "total": 45,
  "limit": 10,
  "offset": 0
}

Get Run

GET /api/runs/{run_id}

Query parameters:

Parameter Description
include_tasks Include task details (true/false)
include_logs Include task logs (true/false)

Example response:

{
  "run_id": "etl_20231101",
  "dag_id": "etl_pipeline",
  "status": "completed",
  "start_time": "2023-11-01T12:00:00Z",
  "end_time": "2023-11-01T12:30:00Z",
  "duration_seconds": 1800,
  "task_count": 3,
  "completed_tasks": 3,
  "failed_tasks": 0,
  "tasks": [
    {
      "task_id": "data_extraction",
      "task_run_id": "data_extraction_a1b2c3d4",
      "status": "completed",
      "start_time": "2023-11-01T12:00:00Z",
      "end_time": "2023-11-01T12:10:00Z",
      "duration_seconds": 600,
      "logs": "Starting extraction...\nExtracted 10000 records\nCompleted successfully"
    },
    // More tasks...
  ],
  "context": {
    "DATA_DATE": "2023-11-01",
    "ENV": "production"
  }
}

Get Run Graph

GET /api/runs/{run_id}/graph

Example response:

{
  "run_id": "etl_20231101",
  "dag_id": "etl_pipeline",
  "nodes": [
    {
      "id": "data_extraction_a1b2c3d4",
      "task_id": "data_extraction",
      "status": "completed"
    },
    {
      "id": "data_transformation_b2c3d4e5",
      "task_id": "data_transformation",
      "status": "completed"
    },
    {
      "id": "data_loading_c3d4e5f6",
      "task_id": "data_loading",
      "status": "completed"
    }
  ],
  "edges": [
    {
      "from": "data_extraction_a1b2c3d4",
      "to": "data_transformation_b2c3d4e5"
    },
    {
      "from": "data_transformation_b2c3d4e5",
      "to": "data_loading_c3d4e5f6"
    }
  ]
}

Get Task Logs

GET /api/tasks/{task_run_id}/logs

Query parameters:

Parameter Description
tail Number of lines to return from the end
since Return logs since timestamp (ISO 8601 format)

Example response:

{
  "task_run_id": "data_extraction_a1b2c3d4",
  "logs": "Starting extraction...\nExtracted 10000 records\nCompleted successfully",
  "has_more": false,
  "start_time": "2023-11-01T12:00:00Z",
  "end_time": "2023-11-01T12:10:00Z"
}

Agent and Queue Management

List Agents

GET /api/agents

Example response:

{
  "agents": [
    {
      "agent_id": "agent_57f3e2a1",
      "last_heartbeat": "2023-11-01T12:35:00Z",
      "status": "active",
      "assigned_tasks": [
        "data_extraction_a1b2c3d4"
      ],
      "queues": [
        "default",
        "high_memory"
      ],
      "tags": [
        "region:us-east",
        "type:general"
      ]
    },
    // More agents...
  ]
}

List Queues

GET /api/queues

Example response:

{
  "queues": [
    {
      "name": "default",
      "length": 5,
      "consumer_count": 3,
      "tasks": [
        {
          "task_run_id": "data_extraction_a1b2c3d4",
          "dag_run_id": "etl_20231101",
          "position": 0
        },
        // More tasks...
      ]
    },
    // More queues...
  ]
}

Purge Queue

POST /api/queues/{queue_name}/purge

Response:

{
  "success": true,
  "message": "Queue purged successfully",
  "tasks_removed": 5
}

Context and Parameter Set Management

List Contexts

GET /api/contexts

Example response:

{
  "contexts": [
    {
      "id": "development",
      "variables": {
        "ENV": "development",
        "LOG_LEVEL": "debug"
      }
    },
    {
      "id": "production",
      "variables": {
        "ENV": "production",
        "LOG_LEVEL": "info"
      }
    }
  ]
}

Get Context

GET /api/contexts/{context_id}

Example response:

{
  "id": "production",
  "variables": {
    "ENV": "production",
    "LOG_LEVEL": "info",
    "DATA_PATH": "/data/production",
    "API_URL": "https://api.example.com",
    "MAX_WORKERS": "16"
  },
  "extends": null,
  "created_at": "2023-01-01T00:00:00Z",
  "updated_at": "2023-01-15T00:00:00Z"
}

Create/Update Context

PUT /api/contexts/{context_id}

Request body:

{
  "variables": {
    "ENV": "production",
    "LOG_LEVEL": "info",
    "DATA_PATH": "/data/production",
    "API_URL": "https://api.example.com",
    "MAX_WORKERS": "16"
  },
  "extends": null
}

Response: The created/updated context object.

List Parameter Sets

GET /api/parameter-sets

Example response:

{
  "parameter_sets": [
    {
      "id": "small_training",
      "parameters": {
        "EPOCHS": "10",
        "BATCH_SIZE": "16"
      }
    },
    {
      "id": "large_training",
      "parameters": {
        "EPOCHS": "100",
        "BATCH_SIZE": "128"
      }
    }
  ]
}

Get Parameter Set

GET /api/parameter-sets/{parameter_set_id}

Example response:

{
  "id": "large_training",
  "parameters": {
    "EPOCHS": "100",
    "BATCH_SIZE": "128",
    "LEARNING_RATE": "0.001"
  },
  "created_at": "2023-01-01T00:00:00Z",
  "updated_at": "2023-01-15T00:00:00Z"
}

Create/Update Parameter Set

PUT /api/parameter-sets/{parameter_set_id}

Request body:

{
  "parameters": {
    "EPOCHS": "100",
    "BATCH_SIZE": "128",
    "LEARNING_RATE": "0.001"
  }
}

Response: The created/updated parameter set object.

System Management

System Status

GET /api/status

Example response:

{
  "status": "healthy",
  "version": "0.1.0",
  "uptime_seconds": 86400,
  "backend": {
    "type": "redis",
    "status": "connected",
    "latency_ms": 5
  },
  "components": {
    "orchestrator": {
      "status": "running",
      "count": 1
    },
    "agents": {
      "status": "running",
      "count": 3,
      "active": 3,
      "inactive": 0
    },
    "ui": {
      "status": "running"
    }
  },
  "queues": {
    "total_pending_tasks": 10,
    "queues": [
      {
        "name": "default",
        "length": 5
      },
      {
        "name": "high_memory",
        "length": 3
      },
      {
        "name": "gpu_tasks",
        "length": 2
      }
    ]
  }
}

Health Check

GET /api/health

Example response:

{
  "status": "healthy",
  "components": {
    "backend": "healthy",
    "orchestrator": "healthy",
    "agents": "healthy"
  }
}

Metrics

GET /api/metrics

Example response:

{
  "tasks": {
    "pending": 10,
    "running": 5,
    "completed": 100,
    "failed": 2,
    "total": 117
  },
  "runs": {
    "pending": 1,
    "running": 2,
    "completed": 20,
    "failed": 1,
    "total": 24
  },
  "performance": {
    "average_task_duration_seconds": 45.2,
    "average_dag_duration_seconds": 320.5,
    "tasks_per_minute": 12.5
  },
  "resources": {
    "cpu_usage_percent": 25.5,
    "memory_usage_mb": 1024.5,
    "disk_usage_percent": 45.2
  }
}

Event Webhooks

Register Webhook

POST /api/webhooks

Request body:

{
  "name": "Task Completion Notification",
  "url": "https://example.com/webhook",
  "events": ["task.completed", "task.failed", "dag.completed"],
  "secret": "your-webhook-secret",
  "headers": {
    "Custom-Header": "custom-value"
  },
  "filters": {
    "dag_id": "etl_pipeline",
    "task_id": "data_extraction"
  }
}

Response:

{
  "id": "webhook_a1b2c3d4",
  "name": "Task Completion Notification",
  "url": "https://example.com/webhook",
  "events": ["task.completed", "task.failed", "dag.completed"],
  "filters": {
    "dag_id": "etl_pipeline",
    "task_id": "data_extraction"
  },
  "created_at": "2023-11-01T12:00:00Z"
}

List Webhooks

GET /api/webhooks

Example response:

{
  "webhooks": [
    {
      "id": "webhook_a1b2c3d4",
      "name": "Task Completion Notification",
      "url": "https://example.com/webhook",
      "events": ["task.completed", "task.failed", "dag.completed"],
      "filters": {
        "dag_id": "etl_pipeline",
        "task_id": "data_extraction"
      },
      "created_at": "2023-11-01T12:00:00Z"
    },
    // More webhooks...
  ]
}

Delete Webhook

DELETE /api/webhooks/{webhook_id}

Response:

{
  "success": true,
  "message": "Webhook deleted successfully"
}

Event Types

The following event types can be used with webhooks:

Event Type Description
task.pending Task is pending execution
task.queued Task has been queued
task.running Task has started running
task.completed Task has completed successfully
task.failed Task has failed
dag.pending DAG is pending execution
dag.running DAG has started running
dag.completed DAG has completed successfully
dag.failed DAG has failed
agent.registered New agent has registered
agent.offline Agent has gone offline

Webhook Payload Format

When an event occurs, Cyclonetix sends a POST request to the registered webhook URL with the following payload:

{
  "event": "task.completed",
  "timestamp": "2023-11-01T12:10:00Z",
  "data": {
    "run_id": "etl_20231101",
    "dag_id": "etl_pipeline",
    "task_id": "data_extraction",
    "task_run_id": "data_extraction_a1b2c3d4",
    "status": "completed",
    "start_time": "2023-11-01T12:00:00Z",
    "end_time": "2023-11-01T12:10:00Z",
    "duration_seconds": 600
  }
}

For security, Cyclonetix includes an X-Cyclonetix-Signature header with an HMAC-SHA256 signature of the payload using your webhook secret.

Error Handling

All API endpoints return standard HTTP status codes:

  • 200 OK: Request successful
  • 201 Created: Resource created successfully
  • 400 Bad Request: Invalid request parameters
  • 401 Unauthorized: Authentication required
  • 403 Forbidden: Permission denied
  • 404 Not Found: Resource not found
  • 409 Conflict: Resource already exists
  • 500 Internal Server Error: Server error

Error responses have a consistent format:

{
  "error": true,
  "code": "validation_error",
  "message": "Invalid parameter value",
  "details": {
    "field": "timeout_seconds",
    "error": "Must be a positive integer"
  }
}

Pagination

List endpoints support pagination with the following parameters:

  • limit: Maximum number of items to return
  • offset: Number of items to skip

Response includes metadata for pagination:

{
  "items": [...],
  "total": 100,
  "limit": 10,
  "offset": 0,
  "next": "/api/tasks?limit=10&offset=10",
  "prev": null
}

Filtering and Sorting

Many endpoints support filtering and sorting:

  • Filtering: Use query parameters like ?status=completed&dag_id=etl_pipeline
  • Sorting: Use the sort parameter, e.g., ?sort=start_time:desc

Next Steps