Cookbook
This cookbook provides practical recipes for implementing common workflow patterns with Cyclonetix. Each recipe includes task definitions, DAG configurations, and execution instructions to help you solve real-world orchestration challenges.
Contents
- AI-Driven Research Pipeline
- Low-Latency Trade Pricing with Kafka
- Standard ETL Pipeline
- Model Training and Deployment Pipeline
- Cloud Run Agents
- Multi-Cloud Agent Orchestration
AI-Driven Research Pipeline
This recipe demonstrates how to implement an AI-driven research workflow with dynamic decision-making. The pattern consists of an initial research phase that gathers information, followed by an evaluation point that determines which subsequent analysis paths to take.
Components
- Initial Research Phase: Collects and processes information
- Evaluation Point: Analyzes results and decides on further actions
- Specialized Analysis Paths: Different analytical workflows triggered based on evaluation
Task Definitions
First, let’s define the core research tasks:
# data/tasks/research/collect_data.yaml
id: "collect_data"
name: "Collect Research Data"
command: "python research/collect.py --source ${DATA_SOURCE} --output ${OUTPUT_PATH}"
dependencies: []
parameters:
  DATA_SOURCE: "api"
  OUTPUT_PATH: "/data/research/raw"
  QUERY: "default query"
# data/tasks/research/process_data.yaml
id: "process_data"
name: "Process Research Data"
command: "python research/process.py --input ${INPUT_PATH} --output ${OUTPUT_PATH}"
dependencies:
  - "collect_data"
parameters:
  INPUT_PATH: "/data/research/raw"
  OUTPUT_PATH: "/data/research/processed"
Now, let’s create the evaluation point task using Langchain and GPT:
# data/tasks/research/evaluate_findings.yaml
id: "evaluate_findings"
name: "Evaluate Research Findings with GPT"
command: "python research/evaluate_with_gpt.py --input ${INPUT_PATH} --output $CYCLO_EVAL_RESULT"
dependencies:
  - "process_data"
evaluation_point: true
parameters:
  INPUT_PATH: "/data/research/processed"
  GPT_MODEL: "gpt-4"
  OPENAI_API_KEY: "${OPENAI_API_KEY}"
  CONFIDENCE_THRESHOLD: "0.7"
The evaluate_with_gpt.py script uses Langchain and GPT to analyze the research findings:
import json
import sys
import os
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from typing import List, Optional

# Define the output structure
class AnalysisDecision(BaseModel):
    topics: List[str] = Field(description="List of detected topics in the research")
    confidence_score: float = Field(description="Overall confidence score (0-1)")
    recommended_analyses: List[str] = Field(description="List of recommended analysis types to perform")
    summary: str = Field(description="Brief summary of the key findings")
    additional_context: dict = Field(description="Additional context to pass to subsequent analyses")

# Set up the parser
parser = PydanticOutputParser(pydantic_object=AnalysisDecision)

# Create the prompt template
template = """
You are a research analysis expert. Based on the following research findings, please:
1. Identify the main topics covered
2. Assess the overall confidence and quality of the findings
3. Recommend which specialized analyses should be conducted
4. Provide a brief summary of the key points
5. Suggest additional context that would be valuable for subsequent analyses

Research findings:
{research_findings}

{format_instructions}
"""

prompt = ChatPromptTemplate.from_template(
    template=template,
    partial_variables={"format_instructions": parser.get_format_instructions()}
)

def evaluate_with_gpt(input_path, api_key, model_name, confidence_threshold):
    # Load research findings
    with open(input_path, 'r') as f:
        research_findings = f.read()

    # Initialize the LLM
    llm = ChatOpenAI(temperature=0, model_name=model_name, openai_api_key=api_key)

    # Format the prompt with our research findings
    messages = prompt.format_messages(research_findings=research_findings)

    # Get a response from the LLM
    llm_response = llm(messages)

    # Parse the response
    analysis = parser.parse(llm_response.content)

    # Build result for Cyclonetix
    result = {
        "metadata": {
            "confidence_score": analysis.confidence_score,
            "topics": analysis.topics,
            "summary": analysis.summary
        },
        "context_updates": analysis.additional_context
    }

    # Determine which subsequent analyses to run based on GPT's recommendations
    next_tasks = []

    # Only proceed if confidence is above threshold
    if analysis.confidence_score >= float(confidence_threshold):
        for analysis_type in analysis.recommended_analyses:
            # Map recommended analyses to task IDs
            if analysis_type.lower() == "deep" or analysis_type.lower() == "detailed":
                next_tasks.append("deep_analysis")
            elif analysis_type.lower() == "financial":
                next_tasks.append("financial_analysis")
            elif analysis_type.lower() == "technical":
                next_tasks.append("technical_analysis")
            elif analysis_type.lower() == "competitive" or analysis_type.lower() == "competitor":
                next_tasks.append("competitor_analysis")

    result["next_tasks"] = next_tasks

    return result

def main():
    input_path = os.environ.get("INPUT_PATH")
    api_key = os.environ.get("OPENAI_API_KEY")
    model_name = os.environ.get("GPT_MODEL", "gpt-4")
    confidence_threshold = os.environ.get("CONFIDENCE_THRESHOLD", "0.7")

    result = evaluate_with_gpt(input_path, api_key, model_name, confidence_threshold)

    eval_result_path = os.environ.get("CYCLO_EVAL_RESULT", "eval_result.json")
    with open(eval_result_path, 'w') as f:
        json.dump(result, f)

    # Exit with status code 0 for success
    sys.exit(0)

if __name__ == "__main__":
    main()
Finally, define the specialized analysis tasks:
# data/tasks/research/deep_analysis.yaml
id: "deep_analysis"
name: "Deep Analysis"
command: "python research/deep_analysis.py --input ${INPUT_PATH} --output ${OUTPUT_PATH}"
parameters:
  INPUT_PATH: "/data/research/processed"
  OUTPUT_PATH: "/data/research/deep_analysis"
# data/tasks/research/financial_analysis.yaml
id: "financial_analysis"
name: "Financial Analysis"
command: "python research/financial_analysis.py --input ${INPUT_PATH} --output ${OUTPUT_PATH}"
parameters:
  INPUT_PATH: "/data/research/processed"
  OUTPUT_PATH: "/data/research/financial_analysis"
# data/tasks/research/technical_analysis.yaml
id: "technical_analysis"
name: "Technical Analysis"
command: "python research/technical_analysis.py --input ${INPUT_PATH} --output ${OUTPUT_PATH}"
parameters:
  INPUT_PATH: "/data/research/processed"
  OUTPUT_PATH: "/data/research/technical_analysis"
# data/tasks/research/competitor_analysis.yaml
id: "competitor_analysis"
name: "Competitor Analysis"
command: "python research/competitor_analysis.py --input ${INPUT_PATH} --output ${OUTPUT_PATH}"
parameters:
  INPUT_PATH: "/data/research/processed"
  OUTPUT_PATH: "/data/research/competitor_analysis"
DAG Definition
Create a DAG for the research pipeline:
# data/dags/ai_research_pipeline.yaml
id: "ai_research_pipeline"
name: "AI-Driven Research Pipeline"
description: "Dynamic research workflow with AI-based path selection"
tasks:
  - id: "collect_data"
  - id: "process_data"
  - id: "evaluate_findings"
tags: ["research", "ai", "dynamic"]
We would create additional DAGs for the specialized analysis paths, such as deep_analysis, financial_analysis, and so on.
Context Propagation
One of the key advantages of this approach is that the context from the research pipeline is automatically propagated to the subsequent DAGs. In the evaluation point, we add context updates:
result = {
    "metadata": {
        "confidence_score": analysis.confidence_score,
        "topics": analysis.topics,
        "summary": analysis.summary
    },
    "context_updates": analysis.additional_context  # This propagates to downstream tasks
}
These context updates become available as environment variables in all subsequent tasks. For example, the financial analysis task could access GPT-identified financial metrics that weren’t explicitly defined in the original task parameters:
# In financial_analysis.py
import os

# Access context propagated from GPT evaluation
key_financial_metrics = os.environ.get("KEY_FINANCIAL_METRICS")
relevant_companies = os.environ.get("RELEVANT_COMPANIES")
time_period = os.environ.get("ANALYSIS_TIME_PERIOD")

# Use these context variables in the analysis
Execution
Schedule the research pipeline:
./cyclonetix schedule-dag ai_research_pipeline \
--param collect_data.QUERY="AI trends in financial services" \
--param evaluate_findings.OPENAI_API_KEY="your-openai-api-key"
The workflow will:
1. Collect and process the research data
2. Use Langchain and GPT to evaluate the findings and decide on next steps
3. Dynamically execute only the relevant analysis paths based on GPT's recommendations
4. Propagate GPT-generated context to all downstream tasks
This pattern can be extended with:
- Additional evaluation points after each analysis that use GPT to refine the research direction
- Aggregation steps that combine results from multiple analyses
- Notification tasks that alert researchers about significant findings
- Iterative research loops where GPT can request additional information collection
Low-Latency Trade Pricing with Kafka
This recipe demonstrates how to build a high-performance trade pricing system with Cyclonetix and Kafka, focusing on low latency and high throughput.
Components
- Kafka Consumer: Runs on the orchestrator; listens for incoming pricing requests and adds them to the relevant agent queue (see the sketch after this list)
- Pricing Engine: Calculates prices for trade requests and publishes results back to Kafka
- GPU Pricing Engine: An agent running on a GPU-equipped machine for highly parallelized pricing calculations (e.g., Monte Carlo simulations); it computes results and publishes them back to Kafka
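The consumer itself is not part of this recipe, but a minimal sketch of one possible implementation is shown below. It assumes a trade-requests topic carrying JSON messages with trade_id, instrument, quantity, and side fields, and a hypothetical trade_pricing DAG that wraps the calculate_price task defined next; adjust names to match your deployment.
# Hypothetical Kafka consumer sketch: forwards pricing requests to Cyclonetix.
# The "trade-requests" topic, message fields, and "trade_pricing" DAG are assumptions.
import json
import subprocess

from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "trade-requests",
    bootstrap_servers=["kafka:9092"],
    group_id="cyclonetix-pricing",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for message in consumer:
    request = message.value
    # Schedule the pricing DAG with parameters taken from the request
    subprocess.run(
        [
            "./cyclonetix", "schedule-dag", "trade_pricing",
            "--param", f"calculate_price.TRADE_ID={request['trade_id']}",
            "--param", f"calculate_price.INSTRUMENT={request['instrument']}",
            "--param", f"calculate_price.QUANTITY={request['quantity']}",
            "--param", f"calculate_price.SIDE={request.get('side', 'BUY')}",
        ],
        check=True,
    )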
Task Definitions
First, let’s define the trade pricing task that the consumer will schedule:
# data/tasks/pricing/calculate_price.yaml
id: "calculate_price"
name: "Calculate Trade Price"
command: "rust-pricer --trade-id ${TRADE_ID} --instrument ${INSTRUMENT} --quantity ${QUANTITY} --side ${SIDE}"
dependencies: []
parameters:
  TRADE_ID: ""
  INSTRUMENT: ""
  QUANTITY: ""
  SIDE: "BUY"
  MARKET_DATA_SERVICE: "http://market-data:8080"
queue: "pricing"
Now let’s define a GPU-accelerated pricing task:
# data/tasks/pricing/calculate_portfolio_price_gpu.yaml
id: "calculate_portfolio_price_gpu"
name: "Calculate Portfolio Price (GPU)"
command: "rust-wgpu-pricer --portfolio ${PORTFOLIO_ID} --simulations ${SIMULATIONS} --mu ${MU} --sigma ${SIGMA}"
dependencies: []
parameters:
  PORTFOLIO_ID: ""
  SIMULATIONS: ""
  MU: ""
  SIGMA: ""
  MARKET_DATA_SERVICE: "http://market-data:8080"
queue: "gpu-pricing"
Configuration for Low Latency
Redis configuration for faster state management:
backend: "redis"
backend_url: "redis://redis:6379"
serialization_format: "binary"  # Use binary serialization for better performance
redis:
  connection_pool_size: 20
Agent Specialization
Deploy specialized agents for different tasks:
# Start specialized agents for pricing calculations
./cyclonetix --agent --config config.yaml --env CYCLO_AGENT_QUEUES=pricing
# Start specialized agents for GPU pricing calculations
./cyclonetix --agent --config config.yaml --env CYCLO_AGENT_QUEUES=gpu-pricing
Implementation Approach
- Use Batching: Process multiple pricing requests in batches when possible (see the sketch after this list)
- Minimize State Updates: Reduce Redis operations for better performance
- Specialized Queues: Separate pricing calculations from I/O operations
- Resource Allocation: Dedicate more resources to pricing-critical paths
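For the batching point above, a rough sketch of draining multiple requests per poll is shown below; the topic name, batch size, and the price_batch entry point are assumptions rather than part of this recipe.
# Illustrative batching sketch: drain up to 500 requests per poll and price them together.
import json
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "trade-requests",
    bootstrap_servers=["kafka:9092"],
    group_id="cyclonetix-pricing-batch",
    enable_auto_commit=False,
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

while True:
    # poll() returns {TopicPartition: [records]} for everything available within the timeout
    batch = consumer.poll(timeout_ms=50, max_records=500)
    requests = [record.value for records in batch.values() for record in records]
    if not requests:
        continue

    price_batch(requests)   # hypothetical batch entry point into the pricing engine
    consumer.commit()       # commit offsets only after the whole batch is priced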
This pattern offers:
- High throughput for pricing requests
- Dynamic scaling based on incoming request volume
- Separation of concerns between message handling and pricing logic
- Low-latency response times for time-sensitive trading operations
Standard ETL Pipeline
This recipe demonstrates a typical Extract, Transform, Load (ETL) pipeline implemented with Cyclonetix.
Components
- Extract: Pull data from source systems
- Transform: Clean, validate, and transform the data
- Load: Load processed data into target systems
- Validation: Verify data quality and integrity
Task Definitions
Let’s define the extraction tasks:
# data/tasks/etl/extract_customer_data.yaml
id: "extract_customer_data"
name: "Extract Customer Data"
command: "python etl/extract.py --source ${SOURCE} --type customer --output ${OUTPUT_PATH}"
dependencies: []
parameters:
  SOURCE: "postgres://user:pass@db:5432/source_db"
  OUTPUT_PATH: "/data/raw/customers"
  BATCH_SIZE: 10000
queue: "extract"
# data/tasks/etl/extract_order_data.yaml
id: "extract_order_data"
name: "Extract Order Data"
command: "python etl/extract.py --source ${SOURCE} --type order --output ${OUTPUT_PATH}"
dependencies: []
parameters:
  SOURCE: "postgres://user:pass@db:5432/source_db"
  OUTPUT_PATH: "/data/raw/orders"
  BATCH_SIZE: 10000
queue: "extract"
Next, define the validation task:
# data/tasks/etl/validate_raw_data.yaml
id: "validate_raw_data"
name: "Validate Raw Data"
command: "python etl/validate.py --customers ${CUSTOMERS_PATH} --orders ${ORDERS_PATH} --output ${OUTPUT_PATH}"
dependencies:
  - "extract_customer_data"
  - "extract_order_data"
parameters:
  CUSTOMERS_PATH: "/data/raw/customers"
  ORDERS_PATH: "/data/raw/orders"
  OUTPUT_PATH: "/data/validated"
  RULES_CONFIG: "/config/validation_rules.json"
evaluation_point: true
queue: "transform"
The validation script should output a result indicating whether to proceed or handle errors:
# Example validation logic
import json
import os
import sys

def validate_customers(customers_path, rules_config):
    # Implementation of customer data validation
    # Returns (is_valid, errors)
    # ...
    return True, []

def validate_orders(orders_path, rules_config):
    # Implementation of order data validation
    # Returns (is_valid, errors)
    # ...
    return True, []

def validate_data(customers_path, orders_path, rules_config, output_path):
    # Perform validation checks
    customers_valid, customers_errors = validate_customers(customers_path, rules_config)
    orders_valid, orders_errors = validate_orders(orders_path, rules_config)

    # Write validation results
    validation_result = {
        "customers_valid": customers_valid,
        "orders_valid": orders_valid,
        "customers_errors": customers_errors,
        "orders_errors": orders_errors
    }

    with open(os.path.join(output_path, "validation_results.json"), "w") as f:
        json.dump(validation_result, f)

    # Determine next steps
    if customers_valid and orders_valid:
        return {
            "next_tasks": ["transform_data"],
            "metadata": {
                "validation_status": "passed",
                "error_count": 0
            }
        }
    else:
        return {
            "next_tasks": ["handle_validation_errors"],
            "parameters": {
                "handle_validation_errors": {
                    "ERRORS_PATH": os.path.join(output_path, "validation_results.json")
                }
            },
            "metadata": {
                "validation_status": "failed",
                "error_count": len(customers_errors) + len(orders_errors)
            }
        }

# Write result to CYCLO_EVAL_RESULT
result = validate_data(
    os.environ["CUSTOMERS_PATH"],
    os.environ["ORDERS_PATH"],
    os.environ["RULES_CONFIG"],
    os.environ["OUTPUT_PATH"]
)

with open(os.environ["CYCLO_EVAL_RESULT"], "w") as f:
    json.dump(result, f)

# Exit with appropriate code
sys.exit(0 if result["metadata"]["validation_status"] == "passed" else 1)
Define transformation and loading tasks:
# data/tasks/etl/transform_data.yaml
id: "transform_data"
name: "Transform Data"
command: "python etl/transform.py --customers ${CUSTOMERS_PATH} --orders ${ORDERS_PATH} --output ${OUTPUT_PATH}"
dependencies:
  - "validate_raw_data"
parameters:
  CUSTOMERS_PATH: "/data/validated/customers"
  ORDERS_PATH: "/data/validated/orders"
  OUTPUT_PATH: "/data/transformed"
  TRANSFORMATION_CONFIG: "/config/transformation_rules.json"
queue: "transform"
# data/tasks/etl/handle_validation_errors.yaml
id: "handle_validation_errors"
name: "Handle Validation Errors"
command: "python etl/handle_errors.py --errors ${ERRORS_PATH} --notification-endpoint ${NOTIFICATION_ENDPOINT}"
dependencies:
  - "validate_raw_data"
parameters:
  ERRORS_PATH: ""  # Will be set by the evaluation point
  NOTIFICATION_ENDPOINT: "http://notification-service:8080/api/notify"
queue: "error_handling"
# data/tasks/etl/load_data.yaml
id: "load_data"
name: "Load Transformed Data"
command: "python etl/load.py --input ${INPUT_PATH} --target ${TARGET_CONNECTION} --mode ${LOAD_MODE}"
dependencies:
  - "transform_data"
parameters:
  INPUT_PATH: "/data/transformed"
  TARGET_CONNECTION: "postgres://user:pass@warehouse:5432/dw"
  LOAD_MODE: "append"
queue: "load"
DAG Definition
Create a DAG for the ETL pipeline:
# data/dags/daily_etl_pipeline.yaml
id: "daily_etl_pipeline"
name: "Daily ETL Pipeline"
description: "Daily ETL process for customer and order data"
tasks:
  - id: "extract_customer_data"
  - id: "extract_order_data"
  - id: "validate_raw_data"
  - id: "transform_data"
  - id: "handle_validation_errors"
  - id: "load_data"
schedule:
  cron: "0 2 * * *"  # Run daily at 2:00 AM
  timezone: "UTC"
  catchup: false
context: "production"
tags: ["etl", "daily", "production"]
Execution
Schedule the ETL pipeline:
./cyclonetix schedule-dag daily_etl_pipeline
The ETL pipeline demonstrates:
- Parallel data extraction from multiple sources
- Data validation with conditional error handling
- Transformation of validated data
- Loading into target systems
This pattern can be extended with:
- Additional validation checks and quality gates
- Data reconciliation steps
- Notifications and alerts
- Incremental loading strategies
- Multi-source data synchronization
Model Training and Deployment Pipeline
This recipe demonstrates a machine learning workflow with dynamic hyperparameter tuning, comprehensive evaluation, and selective deployment.
Components
- Data Preparation: Extracts and prepares training data
- Hyperparameter Optimization: Dynamically searches for optimal parameters
- Model Training: Trains models with selected parameters
- Evaluation and Approval: Evaluates models and approves deployment
- Deployment: Deploys models to different environments
Task Definitions
First, let’s define the data preparation tasks:
# data/tasks/ml/prepare_data.yaml
id: "prepare_data"
name: "Prepare Training Data"
command: "python ml/prepare_data.py --source ${DATA_SOURCE} --output ${OUTPUT_PATH} --split ${TRAIN_TEST_SPLIT}"
dependencies: []
parameters:
  DATA_SOURCE: "s3://data-bucket/raw/data.parquet"
  OUTPUT_PATH: "/data/ml/prepared"
  TRAIN_TEST_SPLIT: "0.8"
  FEATURES_CONFIG: "/config/features.json"
Next, define a hyperparameter search task:
# data/tasks/ml/hyperparameter_search.yaml
id: "hyperparameter_search"
name: "Hyperparameter Search"
command: "python ml/hyperparameter_search.py --data ${DATA_PATH} --output ${OUTPUT_PATH} --trials ${NUM_TRIALS}"
dependencies:
  - "prepare_data"
parameters:
  DATA_PATH: "/data/ml/prepared"
  OUTPUT_PATH: "/data/ml/hyperparams"
  NUM_TRIALS: "20"
  SEARCH_SPACE: "/config/search_space.json"
  METRIC: "f1_score"
evaluation_point: true
The hyperparameter search script will test multiple configurations and determine the best ones to use:
import json
import os
import sys
import numpy as np
import optuna
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score
import pickle

def objective(trial, X_train, y_train, X_test, y_test):
    # Define hyperparameter search space
    n_estimators = trial.suggest_int('n_estimators', 10, 100)
    max_depth = trial.suggest_int('max_depth', 3, 15)
    min_samples_split = trial.suggest_int('min_samples_split', 2, 10)

    # Train model
    clf = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        min_samples_split=min_samples_split,
        random_state=42
    )
    clf.fit(X_train, y_train)

    # Evaluate
    y_pred = clf.predict(X_test)
    score = f1_score(y_test, y_pred, average='weighted')
    return score

def run_hyperparameter_search(data_path, output_path, num_trials, metric):
    # Load data
    with open(os.path.join(data_path, "train.pkl"), "rb") as f:
        X_train, y_train = pickle.load(f)

    with open(os.path.join(data_path, "test.pkl"), "rb") as f:
        X_test, y_test = pickle.load(f)

    # Create study
    study = optuna.create_study(direction='maximize')
    study.optimize(
        lambda trial: objective(trial, X_train, y_train, X_test, y_test),
        n_trials=num_trials
    )

    # Get the top 3 trials
    top_trials = sorted(study.trials, key=lambda t: t.value, reverse=True)[:3]

    # Prepare results
    results = []
    for i, trial in enumerate(top_trials):
        model_name = f"model_{i+1}"
        params = trial.params
        score = trial.value

        results.append({
            "model_name": model_name,
            "params": params,
            "score": score
        })

        # Save hyperparameter set
        with open(os.path.join(output_path, f"{model_name}.json"), "w") as f:
            json.dump(params, f, indent=2)

    return results

def main():
    data_path = os.environ.get("DATA_PATH")
    output_path = os.environ.get("OUTPUT_PATH")
    num_trials = int(os.environ.get("NUM_TRIALS", "20"))
    metric = os.environ.get("METRIC", "f1_score")

    os.makedirs(output_path, exist_ok=True)

    results = run_hyperparameter_search(data_path, output_path, num_trials, metric)

    # Determine which models to train based on scores
    models_to_train = []
    for result in results:
        if result["score"] > 0.7:  # Only proceed with models that meet threshold
            models_to_train.append(result["model_name"])

    result = {
        "next_tasks": ["train_model"] * len(models_to_train),
        "parameters": {
            f"train_model_{i}": {
                "MODEL_NAME": model_name,
                "HYPERPARAMS_FILE": f"/data/ml/hyperparams/{model_name}.json"
            }
            for i, model_name in enumerate(models_to_train)
        },
        "metadata": {
            "search_results": results,
            "best_score": results[0]["score"] if results else None
        }
    }

    # Write result to CYCLO_EVAL_RESULT
    eval_result_path = os.environ.get("CYCLO_EVAL_RESULT", "eval_result.json")
    with open(eval_result_path, "w") as f:
        json.dump(result, f)

    sys.exit(0)

if __name__ == "__main__":
    main()
Now, define the model training task:
# data/tasks/ml/train_model.yaml
id: "train_model"
name: "Train ML Model"
command: "python ml/train_model.py --data ${DATA_PATH} --hyperparams ${HYPERPARAMS_FILE} --output ${OUTPUT_PATH} --model-name ${MODEL_NAME}"
dependencies:
  - "hyperparameter_search"
parameters:
  DATA_PATH: "/data/ml/prepared"
  HYPERPARAMS_FILE: ""  # Will be set by the evaluation point
  OUTPUT_PATH: "/data/ml/models"
  MODEL_NAME: ""  # Will be set by the evaluation point
Define a model evaluation task:
# data/tasks/ml/evaluate_models.yaml
id: "evaluate_models"
name: "Evaluate All Models"
command: "python ml/evaluate_models.py --models-dir ${MODELS_DIR} --data ${TEST_DATA} --output $CYCLO_EVAL_RESULT"
dependencies:
  - "train_model"
parameters:
  MODELS_DIR: "/data/ml/models"
  TEST_DATA: "/data/ml/prepared/test.pkl"
evaluation_point: true
The evaluation script selects the best model for deployment:
import json
import os
import sys
import pickle
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix

def evaluate_models(models_dir, test_data_path):
    # Load test data
    with open(test_data_path, "rb") as f:
        X_test, y_test = pickle.load(f)

    # Find all model files
    model_files = [f for f in os.listdir(models_dir) if f.endswith('.pkl')]

    results = []
    for model_file in model_files:
        model_path = os.path.join(models_dir, model_file)
        model_name = os.path.splitext(model_file)[0]

        # Load model
        with open(model_path, "rb") as f:
            model = pickle.load(f)

        # Evaluate
        y_pred = model.predict(X_test)
        report = classification_report(y_test, y_pred, output_dict=True)

        # Add to results
        results.append({
            "model_name": model_name,
            "accuracy": report["accuracy"],
            "weighted_f1": report["weighted avg"]["f1-score"],
            "macro_f1": report["macro avg"]["f1-score"]
        })

    # Sort by weighted F1 score
    results.sort(key=lambda x: x["weighted_f1"], reverse=True)

    # Select the best model
    best_model = results[0] if results else None

    return results, best_model

def main():
    models_dir = os.environ.get("MODELS_DIR")
    test_data_path = os.environ.get("TEST_DATA")

    results, best_model = evaluate_models(models_dir, test_data_path)

    # Determine next tasks based on evaluation
    if best_model and best_model["weighted_f1"] >= 0.8:
        # Good model found, proceed to deployment
        result = {
            "next_tasks": ["deploy_model"],
            "parameters": {
                "deploy_model": {
                    "MODEL_NAME": best_model["model_name"],
                    "MODEL_PATH": f"{models_dir}/{best_model['model_name']}.pkl"
                }
            },
            "metadata": {
                "evaluation_results": results,
                "best_model": best_model
            }
        }
    else:
        # No good model found, request human review
        result = {
            "next_tasks": ["request_human_review"],
            "metadata": {
                "evaluation_results": results,
                "best_model": best_model
            }
        }

    # Write result to CYCLO_EVAL_RESULT
    eval_result_path = os.environ.get("CYCLO_EVAL_RESULT", "eval_result.json")
    with open(eval_result_path, "w") as f:
        json.dump(result, f)

    sys.exit(0)

if __name__ == "__main__":
    main()
Define deployment and review tasks:
# data/tasks/ml/deploy_model.yaml
id: "deploy_model"
name: "Deploy ML Model"
command: "python ml/deploy_model.py --model ${MODEL_PATH} --environment ${ENVIRONMENT} --registry ${MODEL_REGISTRY}"
dependencies:
  - "evaluate_models"
parameters:
  MODEL_PATH: ""  # Will be set by the evaluation point
  MODEL_NAME: ""  # Will be set by the evaluation point
  ENVIRONMENT: "staging"
  MODEL_REGISTRY: "mlflow"
# data/tasks/ml/request_human_review.yaml
id: "request_human_review"
name: "Request Human Review of Models"
command: "python ml/request_review.py --models-dir ${MODELS_DIR} --dashboard-url ${DASHBOARD_URL} --notify ${NOTIFY_EMAIL}"
dependencies:
  - "evaluate_models"
parameters:
  MODELS_DIR: "/data/ml/models"
  DASHBOARD_URL: "https://ml-dashboard.example.com/models"
  NOTIFY_EMAIL: "data-science-team@example.com"
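The deployment script is likewise not included here. A possible sketch follows, assuming that MODEL_REGISTRY: "mlflow" means registering the pickled model with an MLflow tracking server; the tracking URI and the environment-tagging convention are assumptions.
# Hypothetical sketch of ml/deploy_model.py: registers the selected model in MLflow.
import os
import pickle

import mlflow
import mlflow.sklearn

def main():
    model_path = os.environ["MODEL_PATH"]
    model_name = os.environ["MODEL_NAME"]
    environment = os.environ.get("ENVIRONMENT", "staging")

    with open(model_path, "rb") as f:
        model = pickle.load(f)

    # Assumed tracking server location
    mlflow.set_tracking_uri(os.environ.get("MLFLOW_TRACKING_URI", "http://mlflow:5000"))

    # Log the model and register it under its name in the model registry
    with mlflow.start_run(run_name=f"deploy-{model_name}"):
        mlflow.sklearn.log_model(model, "model", registered_model_name=model_name)

    # Tag the newest registered version with the target environment (e.g. "staging")
    client = mlflow.tracking.MlflowClient()
    latest = client.get_latest_versions(model_name)[0]
    client.set_model_version_tag(model_name, latest.version, "environment", environment)

if __name__ == "__main__":
    main()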
DAG Definition
Create a DAG for the ML pipeline:
# data/dags/ml_training_pipeline.yaml
id: "ml_training_pipeline"
name: "ML Training Pipeline"
description: "End-to-end machine learning training, evaluation and deployment"
tasks:
  - id: "prepare_data"
  - id: "hyperparameter_search"
  - id: "train_model"
  - id: "evaluate_models"
  - id: "deploy_model"
  - id: "request_human_review"
schedule:
  cron: "0 2 * * 1"  # Run weekly on Monday at 2:00 AM
  timezone: "UTC"
  catchup: false
tags: ["ml", "training", "deployment"]
Execution
Schedule the ML pipeline:
./cyclonetix schedule-dag ml_training_pipeline \
--param prepare_data.DATA_SOURCE="s3://updated-data/latest.parquet"
This ML pipeline demonstrates:
- Dynamic hyperparameter optimization
- Parallel model training
- Automatic model evaluation and selection
- Conditional deployment based on quality thresholds
- Human-in-the-loop review when needed
Cloud Run Agents
This recipe demonstrates how to deploy Cyclonetix agents on serverless platforms like Google Cloud Run for scalable task execution.
Components
- Cloud Coordinator: A Cyclonetix service that forwards tasks to cloud-native messaging systems
- Cloud Messaging: Message queues for task distribution (e.g., Pub/Sub, SQS, Service Bus)
- Serverless Agents: Cloud Run instances, Lambda functions, or Azure Functions that execute tasks
Architecture
In this serverless pattern:
- A cloud coordinator service acts as an intermediary between Cyclonetix and cloud-native messaging systems
- When a task is assigned to a specific queue, the coordinator forwards it to the appropriate cloud messaging system (see the sketch after this list)
- Serverless instances automatically scale based on message queue depth
- The agents execute tasks and publish results back to a results topic
- The coordinator updates task status in Cyclonetix state management
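As a rough illustration of the forwarding step, the sketch below publishes a task payload to the Pub/Sub topic created later in this recipe. The project ID, payload shape, and the example task values are assumptions, not the coordinator's actual wire format.
# Hypothetical coordinator-side sketch: forward a queued Cyclonetix task to Pub/Sub.
import json

from google.cloud import pubsub_v1  # pip install google-cloud-pubsub

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("your-project", "cyclonetix-tasks")

def forward_task(task_id: str, command: str, parameters: dict, queue: str) -> str:
    payload = {
        "task_id": task_id,
        "command": command,
        "parameters": parameters,
        "queue": queue,
    }
    # Publish and wait for the server-assigned message ID
    future = publisher.publish(topic_path, data=json.dumps(payload).encode("utf-8"))
    return future.result()

# Example: forward the ml_inference task defined later in this recipe
forward_task(
    "ml_inference",
    "python3 /scripts/inference.py --model ${MODEL_PATH} --data ${DATA_PATH} --output ${OUTPUT_PATH}",
    {"MODEL_PATH": "/models/classifier.pkl", "DATA_PATH": "/data/input.csv", "OUTPUT_PATH": "/data/predictions.csv"},
    "ml",
)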
Cloud Run Configuration
Deploying the Cloud Coordinator
The cloud coordinator is deployed as a persistent service:
# Deploy the coordinator to Cloud Run
gcloud run deploy cyclonetix-cloud-coordinator \
--image gcr.io/your-project/cyclonetix-cloud-coordinator \
--platform managed \
--memory 512Mi \
--cpu 1 \
--min-instances 1 \
--set-env-vars="REDIS_URL=redis://10.0.0.1:6379,TASK_TOPIC=cyclonetix-tasks,RESULT_TOPIC=cyclonetix-results" \
--region us-central1
Deploying Cloud Run Agents
Agents are deployed as auto-scaling services:
# Deploy the agent to Cloud Run
gcloud run deploy cyclonetix-cloud-run-agent \
--image gcr.io/your-project/cyclonetix-cloud-run-agent \
--platform managed \
--memory 2Gi \
--cpu 2 \
--min-instances 0 \
--max-instances 100 \
--set-env-vars="TASK_SUBSCRIPTION=cyclonetix-tasks,RESULT_TOPIC=cyclonetix-results" \
--region us-central1
Setting Up Pub/Sub and Autoscaling
Configure the messaging system and autoscaling:
# Create Pub/Sub topics and subscriptions
gcloud pubsub topics create cyclonetix-tasks
gcloud pubsub topics create cyclonetix-results
gcloud pubsub subscriptions create cyclonetix-tasks-sub \
--topic=cyclonetix-tasks \
--ack-deadline=300
gcloud pubsub subscriptions create cyclonetix-results-sub \
--topic=cyclonetix-results \
--ack-deadline=60
# Configure Cloud Run autoscaling for the agent
gcloud run services update cyclonetix-cloud-run-agent \
--no-cpu-throttling \
--concurrency=10 \
--min-instances=0 \
--max-instances=100
Example Task Configuration
Create a task that runs on Cloud Run:
# data/tasks/cloud_run/ml_inference.yaml
id: "ml_inference"
name: "ML Model Inference on Cloud Run"
command: "python3 /scripts/inference.py --model ${MODEL_PATH} --data ${DATA_PATH} --output ${OUTPUT_PATH}"
dependencies: []
parameters:
  MODEL_PATH: "/models/classifier.pkl"
  DATA_PATH: "/data/input.csv"
  OUTPUT_PATH: "/data/predictions.csv"
queue: "ml"  # This queue will be processed by Cloud Run agents
Create a DAG that includes this task:
# data/dags/cloud_run_inference.yaml
id: "cloud_run_inference"
name: "Cloud Run ML Inference"
description: "Run ML inference on Cloud Run"
tasks:
  - id: "ml_inference"
tags: ["ml", "cloud-run", "serverless"]
Benefits of Cloud Run Agents
- Zero-Cost When Idle: Agents scale to zero when no tasks are running
- Rapid Scaling: Automatically scales up to handle task spikes
- Resource Efficiency: Only pay for actual compute time used
- Specialized Hardware: Easy access to GPUs and other specialized hardware
- No Infrastructure Management: Google handles all infrastructure patching and maintenance
Multi-Cloud Agent Orchestration
This recipe demonstrates how to deploy Cyclonetix agents across multiple cloud providers and on-premises environments, creating a unified task execution platform across hybrid infrastructure.
Components
- Cyclonetix Orchestrator: Central orchestration service
- Cloud Coordinators: Services that bridge between Cyclonetix and cloud-specific messaging
- Cloud-Native Agents: Serverless agents that scale based on queue depth
- Persistent Agents: Always-on agents running on VMs or Kubernetes
- On-Premises Agents: Agents running in private data centers
Architecture
In this multi-cloud pattern:
- Tasks are assigned to queues based on their requirements (cloud provider, hardware, data locality)
- Cloud coordinators forward tasks to appropriate cloud messaging systems
- Agents on each platform execute tasks from their designated queues
- Results flow back through the same path to the central Cyclonetix orchestrator (an agent-side sketch follows this list)
- State management remains consistent across all environments
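Whichever cloud hosts it, the agent side has the same shape: pull a task message, run the command, and publish a result for the coordinator to apply to state. The sketch below uses the AWS SQS queues created in the next subsection; the queue URLs and message formats are assumptions.
# Hypothetical agent-side sketch (AWS flavour): pull a task from SQS, run it, publish the result.
import json
import os
import subprocess

import boto3  # pip install boto3

sqs = boto3.client("sqs")
TASK_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/cyclonetix-tasks"
RESULT_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/cyclonetix-results"

def process_one_task() -> None:
    # Long-poll for a single task message
    response = sqs.receive_message(
        QueueUrl=TASK_QUEUE_URL, MaxNumberOfMessages=1, WaitTimeSeconds=20
    )
    for message in response.get("Messages", []):
        task = json.loads(message["Body"])

        # Run the task command with its parameters exposed as environment variables
        completed = subprocess.run(
            task["command"], shell=True, env={**os.environ, **task.get("parameters", {})}
        )

        # Report success or failure back to the results queue
        sqs.send_message(
            QueueUrl=RESULT_QUEUE_URL,
            MessageBody=json.dumps(
                {"task_id": task["task_id"], "exit_code": completed.returncode}
            ),
        )
        # Remove the task message only after the result has been published
        sqs.delete_message(QueueUrl=TASK_QUEUE_URL, ReceiptHandle=message["ReceiptHandle"])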
Multi-Cloud Setup
On-Premises Configuration
On-premises agents connect directly to the Cyclonetix state management:
# On-premises agent configuration (config.yaml)
agent:
  enabled: true
  id: "on-prem-agent-1"
  queues:
    - "on-prem"
    - "secure-data"
  concurrency: 8
state_management:
  type: "redis"
  url: "redis://cyclonetix-redis:6379"
  serialization_format: "json"
AWS Configuration
For AWS, we use a cloud coordinator with SQS:
# Create SQS queues
aws sqs create-queue --queue-name cyclonetix-tasks
aws sqs create-queue --queue-name cyclonetix-results
# Deploy the coordinator to ECS
aws ecs create-service \
--cluster cyclonetix \
--service-name aws-coordinator \
--task-definition cyclonetix-aws-coordinator:1 \
--desired-count 1 \
--launch-type FARGATE \
--network-configuration "awsvpcConfiguration={subnets=[subnet-12345],securityGroups=[sg-12345],assignPublicIp=ENABLED}"
# Deploy agents as Lambda functions
aws lambda create-function \
--function-name cyclonetix-lambda-agent \
--runtime provided.al2 \
--role arn:aws:iam::123456789012:role/cyclonetix-lambda-role \
--handler not.used.in.provided.runtime \
--code S3Bucket=cyclonetix-deployment,S3Key=lambda-agent.zip \
--environment "Variables={TASK_QUEUE=cyclonetix-tasks,RESULT_QUEUE=cyclonetix-results,AGENT_QUEUE=aws}"
Azure Configuration
For Azure, we use a cloud coordinator with Service Bus:
# Create Azure Service Bus
az servicebus namespace create \
--resource-group cyclonetix-rg \
--name cyclonetix-bus \
--location eastus
az servicebus queue create \
--resource-group cyclonetix-rg \
--namespace-name cyclonetix-bus \
--name cyclonetix-tasks
az servicebus queue create \
--resource-group cyclonetix-rg \
--namespace-name cyclonetix-bus \
--name cyclonetix-results
# Deploy the coordinator to Azure Container Instances
az container create \
--resource-group cyclonetix-rg \
--name azure-coordinator \
--image yourregistry.azurecr.io/cyclonetix-azure-coordinator:latest \
--cpu 1 \
--memory 1.5 \
--environment-variables REDIS_URL=redis://cyclonetix-redis:6379 TASK_QUEUE=cyclonetix-tasks RESULT_QUEUE=cyclonetix-results
# Deploy agents as Azure Functions
az functionapp create \
--resource-group cyclonetix-rg \
--consumption-plan-location eastus \
--runtime custom \
--functions-version 4 \
--name cyclonetix-function-agent \
--storage-account cyclonetixstorage
Google Cloud Configuration
For GCP, we use a cloud coordinator with Pub/Sub:
# Create Pub/Sub topics
gcloud pubsub topics create cyclonetix-tasks
gcloud pubsub topics create cyclonetix-results
gcloud pubsub subscriptions create cyclonetix-tasks-sub \
--topic=cyclonetix-tasks \
--ack-deadline=300
# Deploy the coordinator to Cloud Run
gcloud run deploy gcp-coordinator \
--image gcr.io/your-project/cyclonetix-gcp-coordinator \
--platform managed \
--memory 512Mi \
--cpu 1 \
--min-instances 1 \
--set-env-vars="REDIS_URL=redis://10.0.0.1:6379,TASK_TOPIC=cyclonetix-tasks,RESULT_TOPIC=cyclonetix-results" \
--region us-central1
# Deploy agents to Cloud Run
gcloud run deploy gcp-agent \
--image gcr.io/your-project/cyclonetix-cloud-run-agent \
--platform managed \
--memory 2Gi \
--cpu 2 \
--min-instances 0 \
--max-instances 100 \
--set-env-vars="TASK_SUBSCRIPTION=cyclonetix-tasks,RESULT_TOPIC=cyclonetix-results" \
--region us-central1
Task Queue Routing
Define tasks with specific cloud destinations:
# data/tasks/aws/sagemaker_training.yaml
id: "sagemaker_training"
name: "SageMaker Model Training"
command: "python3 ml/aws/train_sagemaker.py --data ${DATA_PATH} --model-type ${MODEL_TYPE}"
dependencies: []
parameters:
  DATA_PATH: "s3://cyclonetix-data/training/dataset-1"
  MODEL_TYPE: "xgboost"
  INSTANCE_TYPE: "ml.m5.large"
queue: "aws-ml"  # Routed to AWS Lambda agents
# data/tasks/azure/batch_processing.yaml
id: "azure_batch_processing"
name: "Azure Batch Data Processing"
command: "python3 processing/azure/batch_process.py --input ${INPUT_CONTAINER} --output ${OUTPUT_CONTAINER}"
dependencies: []
parameters:
  INPUT_CONTAINER: "https://cyclonetixstorage.blob.core.windows.net/input"
  OUTPUT_CONTAINER: "https://cyclonetixstorage.blob.core.windows.net/output"
  POOL_ID: "cyclonetix-pool"
queue: "azure-batch"  # Routed to Azure Function agents
# data/tasks/gcp/bigquery_etl.yaml
id: "bigquery_etl"
name: "BigQuery ETL Process"
command: "python3 etl/gcp/bigquery_transform.py --project ${PROJECT_ID} --dataset ${DATASET}"
dependencies: []
parameters:
  PROJECT_ID: "cyclonetix-analytics"
  DATASET: "production_data"
  SQL_SCRIPT: "transforms/daily_aggregation.sql"
queue: "gcp-data"  # Routed to Cloud Run agents
# data/tasks/on_prem/secure_processing.yaml
id: "secure_processing"
name: "Secure Data Processing"
command: "python3 secure/process.py --data ${DATA_PATH} --keys ${KEYS_PATH}"
dependencies: []
parameters:
  DATA_PATH: "/secure/data/customer_records"
  KEYS_PATH: "/secure/keys/encryption_keys"
  OUTPUT_PATH: "/secure/processed/customer_insights"
queue: "secure-data"  # Routed to on-premises agents
Cross-Cloud DAG
Create a DAG that spans multiple cloud environments:
# data/dags/multi_cloud_analytics.yaml
id: "multi_cloud_analytics"
name: "Multi-Cloud Analytics Pipeline"
description: "Analytics workflow spanning multiple cloud environments"
tasks:
  - id: "on_prem_data_prep"
    queue: "on-prem"
  - id: "aws_data_analysis"
    queue: "aws-ml"
    dependencies:
      - "on_prem_data_prep"
  - id: "azure_visualization"
    queue: "azure-batch"
    dependencies:
      - "aws_data_analysis"
  - id: "gcp_dashboard_update"
    queue: "gcp-data"
    dependencies:
      - "azure_visualization"
tags: ["multi-cloud", "analytics", "hybrid"]
Cloud-Specific PaaS Integration
Agents can leverage cloud-specific PaaS offerings:
AWS SageMaker Integration
# Example of AWS Lambda agent invoking SageMaker
import boto3

def start_sagemaker_training(data_path, model_type, instance_type):
    sagemaker = boto3.client('sagemaker')

    response = sagemaker.create_training_job(
        TrainingJobName='cyclonetix-training-job',
        AlgorithmSpecification={
            'TrainingImage': '123456789012.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest',
            'TrainingInputMode': 'File'
        },
        RoleArn='arn:aws:iam::123456789012:role/SageMakerExecutionRole',
        InputDataConfig=[
            {
                'ChannelName': 'train',
                'DataSource': {
                    'S3DataSource': {
                        'S3DataType': 'S3Prefix',
                        'S3Uri': data_path,
                        'S3DataDistributionType': 'FullyReplicated'
                    }
                }
            }
        ],
        OutputDataConfig={
            'S3OutputPath': 's3://cyclonetix-models/output'
        },
        ResourceConfig={
            'InstanceType': instance_type,
            'InstanceCount': 1,
            'VolumeSizeInGB': 30
        },
        StoppingCondition={
            'MaxRuntimeInSeconds': 86400
        },
        HyperParameters={
            'objective': 'binary:logistic',
            'max_depth': '5',
            'eta': '0.2',
            'num_round': '100'
        }
    )
    return response['TrainingJobArn']
Azure ML Integration
# Example of Azure Function agent invoking Azure ML
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, command
from azure.ai.ml.entities import Environment, BuildContext
from azure.ai.ml.constants import AssetTypes

def start_azure_ml_job(data_path, model_type):
    credential = DefaultAzureCredential()
    ml_client = MLClient(
        credential=credential,
        subscription_id="subscription-id",
        resource_group_name="resource-group",
        workspace_name="workspace-name"
    )

    job = command(
        code="./src",
        command="python train.py --data ${{inputs.data_path}} --model-type ${{inputs.model_type}}",
        inputs={
            "data_path": data_path,
            "model_type": model_type
        },
        environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1",
        compute="aml-cluster",
        display_name="cyclonetix-training-job"
    )

    returned_job = ml_client.jobs.create_or_update(job)
    return returned_job.name
GCP AI Platform Integration
# Example of Cloud Run agent invoking AI Platform
from google.cloud import aiplatform

def start_vertex_training(project_id, region, data_path, model_type):
    aiplatform.init(project=project_id, location=region)

    custom_job = aiplatform.CustomJob(
        display_name="cyclonetix-training-job",
        worker_pool_specs=[
            {
                "machine_spec": {
                    "machine_type": "n1-standard-4",
                    "accelerator_type": "NVIDIA_TESLA_T4",
                    "accelerator_count": 1,
                },
                "replica_count": 1,
                "python_package_spec": {
                    "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.0-23:latest",
                    "package_uris": ["gs://cyclonetix-packages/trainer-0.1.tar.gz"],
                    "python_module": "trainer.task",
                    "args": [
                        f"--data-path={data_path}",
                        f"--model-type={model_type}",
                        "--epochs=20"
                    ],
                },
            }
        ],
    )

    # run() blocks until the job completes; return the job's resource name for tracking
    custom_job.run()
    return custom_job.resource_name
Benefits of Multi-Cloud Agent Orchestration
- Optimal Cloud Selection: Route tasks to the most suitable cloud based on requirements
- Cost Optimization: Utilize spot instances or serverless offerings from multiple providers
- Avoid Vendor Lock-in: Maintain flexibility to move workloads between clouds
- Geographical Distribution: Execute tasks in regions closest to data or users
- Specialized Services: Use the best-in-class services from each cloud provider
- Compliance: Keep sensitive workloads on-premises while using cloud for non-sensitive tasks
- Hybrid Strategy: Gradually migrate to cloud while maintaining on-premises systems