Cookbook

This cookbook provides practical recipes for implementing common workflow patterns with Cyclonetix. Each recipe includes task definitions, DAG configurations, and execution instructions to help you solve real-world orchestration challenges.

Contents

  1. AI-Driven Research Pipeline
  2. Low-Latency Trade Pricing with Kafka
  3. Standard ETL Pipeline
  4. Model Training and Deployment Pipeline
  5. Cloud Run Agents
  6. Multi-Cloud Agent Orchestration

AI-Driven Research Pipeline

This recipe demonstrates how to implement an AI-driven research workflow with dynamic decision-making. The pattern consists of an initial research phase that gathers information, followed by an evaluation point that determines which subsequent analysis paths to take.

Components

  1. Initial Research Phase: Collects and processes information
  2. Evaluation Point: Analyzes results and decides on further actions
  3. Specialized Analysis Paths: Different analytical workflows triggered based on evaluation

Task Definitions

First, let’s define the core research tasks:

# data/tasks/research/collect_data.yaml
id: "collect_data"
name: "Collect Research Data"
command: "python research/collect.py --source ${DATA_SOURCE} --output ${OUTPUT_PATH}"
dependencies: []
parameters:
  DATA_SOURCE: "api"
  OUTPUT_PATH: "/data/research/raw"
  QUERY: "default query"
# data/tasks/research/process_data.yaml
id: "process_data"
name: "Process Research Data"
command: "python research/process.py --input ${INPUT_PATH} --output ${OUTPUT_PATH}"
dependencies:
  - "collect_data"
parameters:
  INPUT_PATH: "/data/research/raw"
  OUTPUT_PATH: "/data/research/processed"

Now, let’s create the evaluation point task using Langchain and GPT:

# data/tasks/research/evaluate_findings.yaml
id: "evaluate_findings"
name: "Evaluate Research Findings with GPT"
command: "python research/evaluate_with_gpt.py --input ${INPUT_PATH} --output $CYCLO_EVAL_RESULT"
dependencies:
  - "process_data"
evaluation_point: true
parameters:
  INPUT_PATH: "/data/research/processed"
  GPT_MODEL: "gpt-4"
  OPENAI_API_KEY: "${OPENAI_API_KEY}"
  CONFIDENCE_THRESHOLD: "0.7"

The evaluate_with_gpt.py script uses Langchain and GPT to analyze research findings:

import json
import sys
import os
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from typing import List

# Define the output structure
class AnalysisDecision(BaseModel):
    topics: List[str] = Field(description="List of detected topics in the research")
    confidence_score: float = Field(description="Overall confidence score (0-1)")
    recommended_analyses: List[str] = Field(description="List of recommended analysis types to perform")
    summary: str = Field(description="Brief summary of the key findings")
    additional_context: dict = Field(description="Additional context to pass to subsequent analyses")

# Set up the parser
parser = PydanticOutputParser(pydantic_object=AnalysisDecision)

# Create the prompt template
template = """
You are a research analysis expert. Based on the following research findings, please:
1. Identify the main topics covered
2. Assess the overall confidence and quality of the findings
3. Recommend which specialized analyses should be conducted
4. Provide a brief summary of the key points
5. Suggest additional context that would be valuable for subsequent analyses

Research findings:
{research_findings}

{format_instructions}
"""

prompt = ChatPromptTemplate.from_template(
    template=template,
    partial_variables={"format_instructions": parser.get_format_instructions()}
)

def evaluate_with_gpt(input_path, api_key, model_name, confidence_threshold):
    # Load research findings
    with open(input_path, 'r') as f:
        research_findings = f.read()

    # Initialize the LLM
    llm = ChatOpenAI(temperature=0, model_name=model_name, openai_api_key=api_key)

    # Format the prompt with our research findings
    messages = prompt.format_messages(research_findings=research_findings)

    # Get a response from the LLM
    llm_response = llm(messages)

    # Parse the response
    analysis = parser.parse(llm_response.content)

    # Build result for Cyclonetix
    result = {
        "metadata": {
            "confidence_score": analysis.confidence_score,
            "topics": analysis.topics,
            "summary": analysis.summary
        },
        "context_updates": analysis.additional_context
    }

    # Determine which subsequent analyses to run based on GPT's recommendations
    next_tasks = []

    # Only proceed if confidence is above threshold
    if analysis.confidence_score >= float(confidence_threshold):
        for analysis_type in analysis.recommended_analyses:
            # Map recommended analyses to task IDs
            if analysis_type.lower() == "deep" or analysis_type.lower() == "detailed":
                next_tasks.append("deep_analysis")
            elif analysis_type.lower() == "financial":
                next_tasks.append("financial_analysis")
            elif analysis_type.lower() == "technical":
                next_tasks.append("technical_analysis")
            elif analysis_type.lower() == "competitive" or analysis_type.lower() == "competitor":
                next_tasks.append("competitor_analysis")

    result["next_tasks"] = next_tasks

    return result

def main():
    input_path = os.environ.get("INPUT_PATH")
    api_key = os.environ.get("OPENAI_API_KEY")
    model_name = os.environ.get("GPT_MODEL", "gpt-4")
    confidence_threshold = os.environ.get("CONFIDENCE_THRESHOLD", "0.7")

    result = evaluate_with_gpt(input_path, api_key, model_name, confidence_threshold)

    eval_result_path = os.environ.get("CYCLO_EVAL_RESULT", "eval_result.json")
    with open(eval_result_path, 'w') as f:
        json.dump(result, f)

    # Exit with status code 0 for success
    sys.exit(0)

if __name__ == "__main__":
    main()

Finally, define the specialized analysis tasks:

# data/tasks/research/deep_analysis.yaml
id: "deep_analysis"
name: "Deep Analysis"
command: "python research/deep_analysis.py --input ${INPUT_PATH} --output ${OUTPUT_PATH}"
parameters:
  INPUT_PATH: "/data/research/processed"
  OUTPUT_PATH: "/data/research/deep_analysis"
# data/tasks/research/financial_analysis.yaml
id: "financial_analysis"
name: "Financial Analysis"
command: "python research/financial_analysis.py --input ${INPUT_PATH} --output ${OUTPUT_PATH}"
parameters:
  INPUT_PATH: "/data/research/processed"
  OUTPUT_PATH: "/data/research/financial_analysis"
# data/tasks/research/technical_analysis.yaml
id: "technical_analysis"
name: "Technical Analysis"
command: "python research/technical_analysis.py --input ${INPUT_PATH} --output ${OUTPUT_PATH}"
parameters:
  INPUT_PATH: "/data/research/processed"
  OUTPUT_PATH: "/data/research/technical_analysis"
# data/tasks/research/competitor_analysis.yaml
id: "competitor_analysis"
name: "Competitor Analysis"
command: "python research/competitor_analysis.py --input ${INPUT_PATH} --output ${OUTPUT_PATH}"
parameters:
  INPUT_PATH: "/data/research/processed"
  OUTPUT_PATH: "/data/research/competitor_analysis"

DAG Definition

Create a DAG for the research pipeline:

# data/dags/ai_research_pipeline.yaml
id: "ai_research_pipeline"
name: "AI-Driven Research Pipeline"
description: "Dynamic research workflow with AI-based path selection"
tasks:
  - id: "collect_data"
  - id: "process_data"
  - id: "evaluate_findings"
tags: ["research", "ai", "dynamic"]

We would create additional DAGs for the specialized analysis paths, such as deep_analysis and financial_analysis.

Context Propagation

One of the key advantages of this approach is that the context from the research pipeline is automatically propagated to the subsequent DAGs. In the evaluation point, we add context updates:

result = {
    "metadata": {
        "confidence_score": analysis.confidence_score,
        "topics": analysis.topics,
        "summary": analysis.summary
    },
    "context_updates": analysis.additional_context  # This propagates to downstream tasks
}

These context updates become available as environment variables in all subsequent tasks. For example, the financial analysis task could access GPT-identified financial metrics that weren’t explicitly defined in the original task parameters:

# In financial_analysis.py
import os

# Access context propagated from GPT evaluation
key_financial_metrics = os.environ.get("KEY_FINANCIAL_METRICS")
relevant_companies = os.environ.get("RELEVANT_COMPANIES")
time_period = os.environ.get("ANALYSIS_TIME_PERIOD")

# Use these context variables in the analysis

Execution

Schedule the research pipeline:

./cyclonetix schedule-dag ai_research_pipeline \
  --param collect_data.QUERY="AI trends in financial services" \
  --param evaluate_findings.OPENAI_API_KEY="your-openai-api-key"

The workflow will:

  1. Collect and process the research data
  2. Use Langchain and GPT to evaluate the findings and decide on next steps
  3. Dynamically execute only the relevant analysis paths based on GPT's recommendations
  4. Propagate GPT-generated context to all downstream tasks

This pattern can be extended with:

  - Additional evaluation points after each analysis that use GPT to refine the research direction
  - Aggregation steps that combine results from multiple analyses
  - Notification tasks that alert researchers about significant findings
  - Iterative research loops where GPT can request additional information collection

Low-Latency Trade Pricing with Kafka

This recipe demonstrates how to build a high-performance trade pricing system with Cyclonetix and Kafka, focusing on low latency and high throughput.

Components

  1. Kafka Consumer: Runs on the orchestrator; listens for incoming pricing requests and adds them to the relevant agent queue
  2. Pricing Engine: Calculates prices for trade requests and publishes results back to Kafka
  3. GPU Pricing Engine: Agent running on a GPU-equipped system for highly parallelized pricing calculations (e.g., Monte Carlo simulations); publishes results back to Kafka

Task Definitions

First, let’s define the standard trade pricing task:

# data/tasks/pricing/calculate_price.yaml
id: "calculate_price"
name: "Calculate Trade Price"
command: "rust-pricer --trade-id ${TRADE_ID} --instrument ${INSTRUMENT} --quantity ${QUANTITY} --side ${SIDE}"
dependencies: []
parameters:
  TRADE_ID: ""
  INSTRUMENT: ""
  QUANTITY: ""
  SIDE: "BUY"
  MARKET_DATA_SERVICE: "http://market-data:8080"
queue: "pricing"

Now let’s define a GPU-accelerated pricing task:

# data/tasks/pricing/calculate_portfolio_price.yaml
id: "calculate_portfolio_price"
name: "Calculate Portfolio Price (GPU)"
command: "rust-wgpu-pricer --portfolio ${PORTFOLIO_ID} --simulations ${SIMULATIONS} --mu ${MU} --sigma ${SIGMA}"
dependencies: []
parameters:
  PORTFOLIO_ID: ""
  SIMULATIONS: ""
  MU: ""
  SIGMA: ""
  MARKET_DATA_SERVICE: "http://market-data:8080"
queue: "gpu-pricing"

Configuration for Low Latency

Redis configuration for faster state management:

# Redis configuration for faster state management
backend: "redis"
backend_url: "redis://redis:6379"
serialization_format: "binary"  # Use binary serialization for better performance
redis:
  connection_pool_size: 20

Agent Specialization

Deploy specialized agents for different tasks:

# Start specialized agents for pricing calculations
./cyclonetix --agent --config config.yaml --env CYCLO_AGENT_QUEUES=pricing

# Start specialized agents for GPU pricing calculations
./cyclonetix --agent --config config.yaml --env CYCLO_AGENT_QUEUES=gpu-pricing

Implementation Approach

  1. Use Batching: Process multiple pricing requests in batches when possible
  2. Minimize State Updates: Reduce Redis operations for better performance
  3. Specialized Queues: Separate pricing calculations from I/O operations
  4. Resource Allocation: Dedicate more resources to pricing-critical paths

This pattern offers:

  - High throughput for pricing requests
  - Dynamic scaling based on incoming request volume
  - Separation of concerns between message handling and pricing logic
  - Low-latency response times for time-sensitive trading operations

Standard ETL Pipeline

This recipe demonstrates a typical Extract, Transform, Load (ETL) pipeline implemented with Cyclonetix.

Components

  1. Extract: Pull data from source systems
  2. Transform: Clean, validate, and transform the data
  3. Load: Load processed data into target systems
  4. Validation: Verify data quality and integrity

Task Definitions

Let’s define the extraction tasks:

# data/tasks/etl/extract_customer_data.yaml
id: "extract_customer_data"
name: "Extract Customer Data"
command: "python etl/extract.py --source ${SOURCE} --type customer --output ${OUTPUT_PATH}"
dependencies: []
parameters:
  SOURCE: "postgres://user:pass@db:5432/source_db"
  OUTPUT_PATH: "/data/raw/customers"
  BATCH_SIZE: 10000
queue: "extract"
# data/tasks/etl/extract_order_data.yaml
id: "extract_order_data"
name: "Extract Order Data"
command: "python etl/extract.py --source ${SOURCE} --type order --output ${OUTPUT_PATH}"
dependencies: []
parameters:
  SOURCE: "postgres://user:pass@db:5432/source_db"
  OUTPUT_PATH: "/data/raw/orders"
  BATCH_SIZE: 10000
queue: "extract"

Next, define the validation task:

# data/tasks/etl/validate_raw_data.yaml
id: "validate_raw_data"
name: "Validate Raw Data"
command: "python etl/validate.py --customers ${CUSTOMERS_PATH} --orders ${ORDERS_PATH} --output ${OUTPUT_PATH}"
dependencies:
  - "extract_customer_data"
  - "extract_order_data"
parameters:
  CUSTOMERS_PATH: "/data/raw/customers"
  ORDERS_PATH: "/data/raw/orders"
  OUTPUT_PATH: "/data/validated"
  RULES_CONFIG: "/config/validation_rules.json"
evaluation_point: true
queue: "transform"

The validation script should output a result indicating whether to proceed or handle errors:

# Example validation logic
import json
import os
import sys

def validate_customers(customers_path, rules_config):
    # Implementation of customer data validation
    # Returns (is_valid, errors)
    # ...
    return True, []

def validate_orders(orders_path, rules_config):
    # Implementation of order data validation
    # Returns (is_valid, errors)
    # ...
    return True, []

def validate_data(customers_path, orders_path, rules_config, output_path):
    # Perform validation checks
    customers_valid, customers_errors = validate_customers(customers_path, rules_config)
    orders_valid, orders_errors = validate_orders(orders_path, rules_config)

    # Write validation results
    validation_result = {
        "customers_valid": customers_valid,
        "orders_valid": orders_valid,
        "customers_errors": customers_errors,
        "orders_errors": orders_errors
    }

    with open(os.path.join(output_path, "validation_results.json"), "w") as f:
        json.dump(validation_result, f)

    # Determine next steps
    if customers_valid and orders_valid:
        return {
            "next_tasks": ["transform_data"],
            "metadata": {
                "validation_status": "passed",
                "error_count": 0
            }
        }
    else:
        return {
            "next_tasks": ["handle_validation_errors"],
            "parameters": {
                "handle_validation_errors": {
                    "ERRORS_PATH": os.path.join(output_path, "validation_results.json")
                }
            },
            "metadata": {
                "validation_status": "failed",
                "error_count": len(customers_errors) + len(orders_errors)
            }
        }

# Write result to CYCLO_EVAL_RESULT
result = validate_data(
    os.environ["CUSTOMERS_PATH"],
    os.environ["ORDERS_PATH"],
    os.environ["RULES_CONFIG"],
    os.environ["OUTPUT_PATH"]
)

with open(os.environ["CYCLO_EVAL_RESULT"], "w") as f:
    json.dump(result, f)

# Exit with appropriate code
sys.exit(0 if result["metadata"]["validation_status"] == "passed" else 1)

Define transformation and loading tasks:

# data/tasks/etl/transform_data.yaml
id: "transform_data"
name: "Transform Data"
command: "python etl/transform.py --customers ${CUSTOMERS_PATH} --orders ${ORDERS_PATH} --output ${OUTPUT_PATH}"
dependencies:
  - "validate_raw_data"
parameters:
  CUSTOMERS_PATH: "/data/validated/customers"
  ORDERS_PATH: "/data/validated/orders"
  OUTPUT_PATH: "/data/transformed"
  TRANSFORMATION_CONFIG: "/config/transformation_rules.json"
queue: "transform"
# data/tasks/etl/handle_validation_errors.yaml
id: "handle_validation_errors"
name: "Handle Validation Errors"
command: "python etl/handle_errors.py --errors ${ERRORS_PATH} --notification-endpoint ${NOTIFICATION_ENDPOINT}"
dependencies:
  - "validate_raw_data"
parameters:
  ERRORS_PATH: ""  # Will be set by the evaluation point
  NOTIFICATION_ENDPOINT: "http://notification-service:8080/api/notify"
queue: "error_handling"
# data/tasks/etl/load_data.yaml
id: "load_data"
name: "Load Transformed Data"
command: "python etl/load.py --input ${INPUT_PATH} --target ${TARGET_CONNECTION} --mode ${LOAD_MODE}"
dependencies:
  - "transform_data"
parameters:
  INPUT_PATH: "/data/transformed"
  TARGET_CONNECTION: "postgres://user:pass@warehouse:5432/dw"
  LOAD_MODE: "append"
queue: "load"

DAG Definition

Create a DAG for the ETL pipeline:

# data/dags/daily_etl_pipeline.yaml
id: "daily_etl_pipeline"
name: "Daily ETL Pipeline"
description: "Daily ETL process for customer and order data"
tasks:
  - id: "extract_customer_data"
  - id: "extract_order_data"
  - id: "validate_raw_data"
  - id: "transform_data"
  - id: "handle_validation_errors"
  - id: "load_data"
schedule:
  cron: "0 2 * * *"  # Run daily at 2:00 AM
  timezone: "UTC"
  catchup: false
context: "production"
tags: ["etl", "daily", "production"]

Execution

Schedule the ETL pipeline:

./cyclonetix schedule-dag daily_etl_pipeline

The ETL pipeline demonstrates:

  - Parallel data extraction from multiple sources
  - Data validation with conditional error handling
  - Transformation of validated data
  - Loading into target systems

This pattern can be extended with:

  - Additional validation checks and quality gates
  - Data reconciliation steps
  - Notifications and alerts
  - Incremental loading strategies
  - Multi-source data synchronization

Model Training and Deployment Pipeline

This recipe demonstrates a machine learning workflow with dynamic hyperparameter tuning, comprehensive evaluation, and selective deployment.

Components

  1. Data Preparation: Extracts and prepares training data
  2. Hyperparameter Optimization: Dynamically searches for optimal parameters
  3. Model Training: Trains models with selected parameters
  4. Evaluation and Approval: Evaluates models and approves deployment
  5. Deployment: Deploys models to different environments

Task Definitions

First, let’s define the data preparation tasks:

# data/tasks/ml/prepare_data.yaml
id: "prepare_data"
name: "Prepare Training Data"
command: "python ml/prepare_data.py --source ${DATA_SOURCE} --output ${OUTPUT_PATH} --split ${TRAIN_TEST_SPLIT}"
dependencies: []
parameters:
  DATA_SOURCE: "s3://data-bucket/raw/data.parquet"
  OUTPUT_PATH: "/data/ml/prepared"
  TRAIN_TEST_SPLIT: "0.8"
  FEATURES_CONFIG: "/config/features.json"
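
The preparation script needs to write its train/test splits in the format the downstream hyperparameter search expects: pickled (X, y) tuples named train.pkl and test.pkl. A minimal sketch under the assumption of a parquet source with a "label" column (the column name and use of scikit-learn are assumptions):

# ml/prepare_data.py -- illustrative sketch only
import argparse
import os
import pickle

import pandas as pd
from sklearn.model_selection import train_test_split

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--source", required=True)
    parser.add_argument("--output", required=True)
    parser.add_argument("--split", type=float, default=0.8)
    args = parser.parse_args()

    # Assumption: a parquet dataset with a "label" column; reading s3://
    # paths with pandas requires s3fs to be installed.
    df = pd.read_parquet(args.source)
    X = df.drop(columns=["label"])
    y = df["label"]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, train_size=args.split, random_state=42
    )

    os.makedirs(args.output, exist_ok=True)
    # Pickled (X, y) tuples, matching what the hyperparameter search loads
    with open(os.path.join(args.output, "train.pkl"), "wb") as f:
        pickle.dump((X_train, y_train), f)
    with open(os.path.join(args.output, "test.pkl"), "wb") as f:
        pickle.dump((X_test, y_test), f)

if __name__ == "__main__":
    main()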

Next, define a hyperparameter search task:

# data/tasks/ml/hyperparameter_search.yaml
id: "hyperparameter_search"
name: "Hyperparameter Search"
command: "python ml/hyperparameter_search.py --data ${DATA_PATH} --output ${OUTPUT_PATH} --trials ${NUM_TRIALS}"
dependencies:
  - "prepare_data"
parameters:
  DATA_PATH: "/data/ml/prepared"
  OUTPUT_PATH: "/data/ml/hyperparams"
  NUM_TRIALS: "20"
  SEARCH_SPACE: "/config/search_space.json"
  METRIC: "f1_score"
evaluation_point: true

The hyperparameter search script will test multiple configurations and determine the best ones to use:

import json
import os
import sys
import optuna
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score
import pickle

def objective(trial, X_train, y_train, X_test, y_test):
    # Define hyperparameter search space
    n_estimators = trial.suggest_int('n_estimators', 10, 100)
    max_depth = trial.suggest_int('max_depth', 3, 15)
    min_samples_split = trial.suggest_int('min_samples_split', 2, 10)

    # Train model
    clf = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        min_samples_split=min_samples_split,
        random_state=42
    )
    clf.fit(X_train, y_train)

    # Evaluate
    y_pred = clf.predict(X_test)
    score = f1_score(y_test, y_pred, average='weighted')
    return score

def run_hyperparameter_search(data_path, output_path, num_trials, metric):
    # Load data
    with open(os.path.join(data_path, "train.pkl"), "rb") as f:
        X_train, y_train = pickle.load(f)

    with open(os.path.join(data_path, "test.pkl"), "rb") as f:
        X_test, y_test = pickle.load(f)

    # Create study
    study = optuna.create_study(direction='maximize')
    study.optimize(
        lambda trial: objective(trial, X_train, y_train, X_test, y_test),
        n_trials=num_trials
    )

    # Get the top 3 trials
    top_trials = sorted(study.trials, key=lambda t: t.value, reverse=True)[:3]

    # Prepare results
    results = []
    for i, trial in enumerate(top_trials):
        model_name = f"model_{i+1}"
        params = trial.params
        score = trial.value

        results.append({
            "model_name": model_name,
            "params": params,
            "score": score
        })

        # Save hyperparameter set
        with open(os.path.join(output_path, f"{model_name}.json"), "w") as f:
            json.dump(params, f, indent=2)

    return results

def main():
    data_path = os.environ.get("DATA_PATH")
    output_path = os.environ.get("OUTPUT_PATH")
    num_trials = int(os.environ.get("NUM_TRIALS", "20"))
    metric = os.environ.get("METRIC", "f1_score")

    os.makedirs(output_path, exist_ok=True)

    results = run_hyperparameter_search(data_path, output_path, num_trials, metric)

    # Determine which models to train based on scores
    models_to_train = []
    for result in results:
        if result["score"] > 0.7:  # Only proceed with models that meet threshold
            models_to_train.append(result["model_name"])

    result = {
        "next_tasks": ["train_model"] * len(models_to_train),
        "parameters": {
            f"train_model_{i}": {
                "MODEL_NAME": model_name,
                "HYPERPARAMS_FILE": f"/data/ml/hyperparams/{model_name}.json"
            } for i, model_name in enumerate(models_to_train)
        },
        "metadata": {
            "search_results": results,
            "best_score": results[0]["score"] if results else None
        }
    }

    # Write result to CYCLO_EVAL_RESULT
    eval_result_path = os.environ.get("CYCLO_EVAL_RESULT", "eval_result.json")
    with open(eval_result_path, "w") as f:
        json.dump(result, f)

    sys.exit(0)

if __name__ == "__main__":
    main()

Now, define the model training task:

# data/tasks/ml/train_model.yaml
id: "train_model"
name: "Train ML Model"
command: "python ml/train_model.py --data ${DATA_PATH} --hyperparams ${HYPERPARAMS_FILE} --output ${OUTPUT_PATH} --model-name ${MODEL_NAME}"
dependencies:
  - "hyperparameter_search"
parameters:
  DATA_PATH: "/data/ml/prepared"
  HYPERPARAMS_FILE: ""  # Will be set by the evaluation point
  OUTPUT_PATH: "/data/ml/models"
  MODEL_NAME: ""  # Will be set by the evaluation point

Define a model evaluation task:

# data/tasks/ml/evaluate_models.yaml
id: "evaluate_models"
name: "Evaluate All Models"
command: "python ml/evaluate_models.py --models-dir ${MODELS_DIR} --data ${TEST_DATA} --output $CYCLO_EVAL_RESULT"
dependencies:
  - "train_model"
parameters:
  MODELS_DIR: "/data/ml/models"
  TEST_DATA: "/data/ml/prepared/test.pkl"
evaluation_point: true

The evaluation script selects the best model for deployment:

import json
import os
import sys
import pickle
from sklearn.metrics import classification_report

def evaluate_models(models_dir, test_data_path):
    # Load test data
    with open(test_data_path, "rb") as f:
        X_test, y_test = pickle.load(f)

    # Find all model files
    model_files = [f for f in os.listdir(models_dir) if f.endswith('.pkl')]

    results = []
    for model_file in model_files:
        model_path = os.path.join(models_dir, model_file)
        model_name = os.path.splitext(model_file)[0]

        # Load model
        with open(model_path, "rb") as f:
            model = pickle.load(f)

        # Evaluate
        y_pred = model.predict(X_test)
        report = classification_report(y_test, y_pred, output_dict=True)

        # Add to results
        results.append({
            "model_name": model_name,
            "accuracy": report["accuracy"],
            "weighted_f1": report["weighted avg"]["f1-score"],
            "macro_f1": report["macro avg"]["f1-score"]
        })

    # Sort by weighted F1 score
    results.sort(key=lambda x: x["weighted_f1"], reverse=True)

    # Select the best model
    best_model = results[0] if results else None

    return results, best_model

def main():
    models_dir = os.environ.get("MODELS_DIR")
    test_data_path = os.environ.get("TEST_DATA")

    results, best_model = evaluate_models(models_dir, test_data_path)

    # Determine next tasks based on evaluation
    if best_model and best_model["weighted_f1"] >= 0.8:
        # Good model found, proceed to deployment
        result = {
            "next_tasks": ["deploy_model"],
            "parameters": {
                "deploy_model": {
                    "MODEL_NAME": best_model["model_name"],
                    "MODEL_PATH": f"{models_dir}/{best_model['model_name']}.pkl"
                }
            },
            "metadata": {
                "evaluation_results": results,
                "best_model": best_model
            }
        }
    else:
        # No good model found, request human review
        result = {
            "next_tasks": ["request_human_review"],
            "metadata": {
                "evaluation_results": results,
                "best_model": best_model
            }
        }

    # Write result to CYCLO_EVAL_RESULT
    eval_result_path = os.environ.get("CYCLO_EVAL_RESULT", "eval_result.json")
    with open(eval_result_path, "w") as f:
        json.dump(result, f)

    sys.exit(0)

if __name__ == "__main__":
    main()

Define deployment and review tasks:

# data/tasks/ml/deploy_model.yaml
id: "deploy_model"
name: "Deploy ML Model"
command: "python ml/deploy_model.py --model ${MODEL_PATH} --environment ${ENVIRONMENT} --registry ${MODEL_REGISTRY}"
dependencies:
  - "evaluate_models"
parameters:
  MODEL_PATH: ""  # Will be set by the evaluation point
  MODEL_NAME: ""  # Will be set by the evaluation point
  ENVIRONMENT: "staging"
  MODEL_REGISTRY: "mlflow"
# data/tasks/ml/request_human_review.yaml
id: "request_human_review"
name: "Request Human Review of Models"
command: "python ml/request_review.py --models-dir ${MODELS_DIR} --dashboard-url ${DASHBOARD_URL} --notify ${NOTIFY_EMAIL}"
dependencies:
  - "evaluate_models"
parameters:
  MODELS_DIR: "/data/ml/models"
  DASHBOARD_URL: "https://ml-dashboard.example.com/models"
  NOTIFY_EMAIL: "data-science-team@example.com"
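
The deployment script might register the selected model with MLflow, as the MODEL_REGISTRY parameter suggests. A sketch under that assumption (the tracking URI environment variable and run naming are illustrative):

# ml/deploy_model.py -- illustrative sketch only
# Assumptions: an MLflow tracking server reachable via MLFLOW_TRACKING_URI and
# a scikit-learn model pickled by the training step.
import argparse
import os
import pickle

import mlflow
import mlflow.sklearn

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", required=True)
    parser.add_argument("--environment", default="staging")
    parser.add_argument("--registry", default="mlflow")
    args = parser.parse_args()

    with open(args.model, "rb") as f:
        model = pickle.load(f)

    mlflow.set_tracking_uri(os.environ.get("MLFLOW_TRACKING_URI", "http://mlflow:5000"))

    model_name = os.path.splitext(os.path.basename(args.model))[0]
    with mlflow.start_run(run_name=f"deploy-{model_name}"):
        mlflow.set_tag("target_environment", args.environment)
        # Logging with registered_model_name creates a new registered model
        # version that downstream serving infrastructure can pick up.
        mlflow.sklearn.log_model(model, "model", registered_model_name=model_name)

if __name__ == "__main__":
    main()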

DAG Definition

Create a DAG for the ML pipeline:

# data/dags/ml_training_pipeline.yaml
id: "ml_training_pipeline"
name: "ML Training Pipeline"
description: "End-to-end machine learning training, evaluation and deployment"
tasks:
  - id: "prepare_data"
  - id: "hyperparameter_search"
  - id: "train_model"
  - id: "evaluate_models"
  - id: "deploy_model"
  - id: "request_human_review"
schedule:
  cron: "0 2 * * 1"  # Run weekly on Monday at 2:00 AM
  timezone: "UTC"
  catchup: false
tags: ["ml", "training", "deployment"]

Execution

Schedule the ML pipeline:

./cyclonetix schedule-dag ml_training_pipeline \
  --param prepare_data.DATA_SOURCE="s3://updated-data/latest.parquet"

This ML pipeline demonstrates:

  - Dynamic hyperparameter optimization
  - Parallel model training
  - Automatic model evaluation and selection
  - Conditional deployment based on quality thresholds
  - Human-in-the-loop review when needed

Cloud Run Agents

This recipe demonstrates how to deploy Cyclonetix agents on serverless platforms like Google Cloud Run for scalable task execution.

Components

  1. Cloud Coordinator: A Cyclonetix service that forwards tasks to cloud-native messaging systems
  2. Cloud Messaging: Message queues for task distribution (e.g., Pub/Sub, SQS, Service Bus)
  3. Serverless Agents: Cloud Run instances, Lambda functions, or Azure Functions that execute tasks

Architecture

In this serverless pattern:

  1. A cloud coordinator service acts as an intermediary between Cyclonetix and cloud-native messaging systems
  2. When a task is assigned to a specific queue, the coordinator forwards it to the appropriate cloud messaging system
  3. Serverless instances automatically scale based on message queue depth
  4. The agents execute tasks and publish results back to a results topic
  5. The coordinator updates task status in Cyclonetix state management

Cloud Run Configuration

Deploying the Cloud Coordinator

The cloud coordinator is deployed as a persistent service:

# Deploy the coordinator to Cloud Run
gcloud run deploy cyclonetix-cloud-coordinator \
  --image gcr.io/your-project/cyclonetix-cloud-coordinator \
  --platform managed \
  --memory 512Mi \
  --cpu 1 \
  --min-instances 1 \
  --set-env-vars="REDIS_URL=redis://10.0.0.1:6379,TASK_TOPIC=cyclonetix-tasks,RESULT_TOPIC=cyclonetix-results" \
  --region us-central1
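
Internally, the coordinator simply moves work between Cyclonetix state management and Pub/Sub. A rough sketch of that loop follows, assuming the redis and google-cloud-pubsub clients; the Redis key names and payload fields are illustrative, not Cyclonetix's actual schema:

# cloud_coordinator.py -- illustrative sketch only
import json
import os

import redis
from google.cloud import pubsub_v1

r = redis.Redis.from_url(os.environ["REDIS_URL"])
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

task_topic = publisher.topic_path("your-project", os.environ["TASK_TOPIC"])
result_sub = subscriber.subscription_path("your-project", "cyclonetix-results-sub")

def forward_tasks():
    # Pop queued work from Cyclonetix and publish it to Pub/Sub.
    # "queue:ml" and the JSON payload shape are illustrative placeholders.
    while True:
        _, raw_task = r.brpop("queue:ml")
        publisher.publish(task_topic, raw_task).result()

def handle_result(message):
    # Write an agent's result back so the orchestrator sees task completion.
    result = json.loads(message.data.decode("utf-8"))
    r.hset(
        f"task_status:{result['task_run_id']}",
        mapping={"status": result["status"], "output": result.get("output", "")},
    )
    message.ack()

if __name__ == "__main__":
    streaming_pull = subscriber.subscribe(result_sub, callback=handle_result)
    forward_tasks()  # blocks forever; results are handled on background threads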

Deploying Cloud Run Agents

Agents are deployed as auto-scaling services:

# Deploy the agent to Cloud Run
gcloud run deploy cyclonetix-cloud-run-agent \
  --image gcr.io/your-project/cyclonetix-cloud-run-agent \
  --platform managed \
  --memory 2Gi \
  --cpu 2 \
  --min-instances 0 \
  --max-instances 100 \
  --set-env-vars="TASK_SUBSCRIPTION=cyclonetix-tasks,RESULT_TOPIC=cyclonetix-results" \
  --region us-central1
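
Inside the agent container, a handler receives a task, executes it, and publishes the result. The following sketch assumes Pub/Sub push delivery to the Cloud Run service and a JSON task payload (the payload fields such as task_run_id and parameters are illustrative):

# cloud_run_agent.py -- illustrative sketch only
import base64
import json
import os
import subprocess

from flask import Flask, request
from google.cloud import pubsub_v1

app = Flask(__name__)
publisher = pubsub_v1.PublisherClient()
result_topic = publisher.topic_path("your-project", os.environ["RESULT_TOPIC"])

@app.route("/", methods=["POST"])
def handle_task():
    # Pub/Sub push wraps the payload as {"message": {"data": <base64>}}
    envelope = request.get_json()
    task = json.loads(base64.b64decode(envelope["message"]["data"]))

    # Run the task command with its parameters exposed as environment variables
    proc = subprocess.run(
        task["command"],
        shell=True,
        env={**os.environ, **{k: str(v) for k, v in task.get("parameters", {}).items()}},
        capture_output=True,
        text=True,
    )

    result = {
        "task_run_id": task["task_run_id"],
        "status": "completed" if proc.returncode == 0 else "failed",
        "output": proc.stdout[-4000:],  # keep the result message small
    }
    publisher.publish(result_topic, json.dumps(result).encode("utf-8")).result()

    # Returning 2xx acknowledges the push message
    return ("", 204)

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))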

Setting Up Pub/Sub and Autoscaling

Configure the messaging system and autoscaling:

# Create Pub/Sub topics and subscriptions
gcloud pubsub topics create cyclonetix-tasks
gcloud pubsub topics create cyclonetix-results

gcloud pubsub subscriptions create cyclonetix-tasks-sub \
  --topic=cyclonetix-tasks \
  --ack-deadline=300

gcloud pubsub subscriptions create cyclonetix-results-sub \
  --topic=cyclonetix-results \
  --ack-deadline=60

# Configure Cloud Run autoscaling for the agent
gcloud run services update cyclonetix-cloud-run-agent \
  --no-cpu-throttling \
  --concurrency=10 \
  --min-instances=0 \
  --max-instances=100

Example Task Configuration

Create a task that runs on Cloud Run:

# data/tasks/cloud_run/ml_inference.yaml
id: "ml_inference"
name: "ML Model Inference on Cloud Run"
command: "python3 /scripts/inference.py --model ${MODEL_PATH} --data ${DATA_PATH} --output ${OUTPUT_PATH}"
dependencies: []
parameters:
  MODEL_PATH: "/models/classifier.pkl"
  DATA_PATH: "/data/input.csv"
  OUTPUT_PATH: "/data/predictions.csv"
queue: "ml"  # This queue will be processed by Cloud Run agents

Create a DAG that includes this task:

# data/dags/cloud_run_inference.yaml
id: "cloud_run_inference"
name: "Cloud Run ML Inference"
description: "Run ML inference on Cloud Run"
tasks:
  - id: "ml_inference"
tags: ["ml", "cloud-run", "serverless"]

Benefits of Cloud Run Agents

  1. Zero-Cost When Idle: Agents scale to zero when no tasks are running
  2. Rapid Scaling: Automatically scales up to handle task spikes
  3. Resource Efficiency: Only pay for actual compute time used
  4. Specialized Hardware: Easy access to GPUs and other specialized hardware
  5. No Infrastructure Management: Google handles all infrastructure patching and maintenance

Multi-Cloud Agent Orchestration

This recipe demonstrates how to deploy Cyclonetix agents across multiple cloud providers and on-premises environments, creating a unified task execution platform across hybrid infrastructure.

Components

  1. Cyclonetix Orchestrator: Central orchestration service
  2. Cloud Coordinators: Services that bridge between Cyclonetix and cloud-specific messaging
  3. Cloud-Native Agents: Serverless agents that scale based on queue depth
  4. Persistent Agents: Always-on agents running on VMs or Kubernetes
  5. On-Premises Agents: Agents running in private data centers

Architecture

In this multi-cloud pattern:

  1. Tasks are assigned to queues based on their requirements (cloud provider, hardware, data locality)
  2. Cloud coordinators forward tasks to appropriate cloud messaging systems
  3. Agents on each platform execute tasks from their designated queues
  4. Results flow back through the same path to the central Cyclonetix orchestrator
  5. State management remains consistent across all environments

Multi-Cloud Setup

On-Premises Configuration

On-premises agents connect directly to the Cyclonetix state management:

# On-premises agent configuration (config.yaml)
agent:
  enabled: true
  id: "on-prem-agent-1"
  queues:
    - "on-prem"
    - "secure-data"
  concurrency: 8

state_management:
  type: "redis"
  url: "redis://cyclonetix-redis:6379"
  serialization_format: "json"

AWS Configuration

For AWS, we use a cloud coordinator with SQS:

# Create SQS queues
aws sqs create-queue --queue-name cyclonetix-tasks
aws sqs create-queue --queue-name cyclonetix-results

# Deploy the coordinator to ECS
aws ecs create-service \
  --cluster cyclonetix \
  --service-name aws-coordinator \
  --task-definition cyclonetix-aws-coordinator:1 \
  --desired-count 1 \
  --launch-type FARGATE \
  --network-configuration "awsvpcConfiguration={subnets=[subnet-12345],securityGroups=[sg-12345],assignPublicIp=ENABLED}"

# Deploy agents as Lambda functions
aws lambda create-function \
  --function-name cyclonetix-lambda-agent \
  --runtime provided.al2 \
  --role arn:aws:iam::123456789012:role/cyclonetix-lambda-role \
  --handler not.used.in.provided.runtime \
  --code S3Bucket=cyclonetix-deployment,S3Key=lambda-agent.zip \
  --environment "Variables={TASK_QUEUE=cyclonetix-tasks,RESULT_QUEUE=cyclonetix-results,AGENT_QUEUE=aws}"

Azure Configuration

For Azure, we use a cloud coordinator with Service Bus:

# Create Azure Service Bus
az servicebus namespace create \
  --resource-group cyclonetix-rg \
  --name cyclonetix-bus \
  --location eastus

az servicebus queue create \
  --resource-group cyclonetix-rg \
  --namespace-name cyclonetix-bus \
  --name cyclonetix-tasks

az servicebus queue create \
  --resource-group cyclonetix-rg \
  --namespace-name cyclonetix-bus \
  --name cyclonetix-results

# Deploy the coordinator to Azure Container Instances
az container create \
  --resource-group cyclonetix-rg \
  --name azure-coordinator \
  --image yourregistry.azurecr.io/cyclonetix-azure-coordinator:latest \
  --cpu 1 \
  --memory 1.5 \
  --environment-variables REDIS_URL=redis://cyclonetix-redis:6379 TASK_QUEUE=cyclonetix-tasks RESULT_QUEUE=cyclonetix-results

# Deploy agents as Azure Functions
az functionapp create \
  --resource-group cyclonetix-rg \
  --consumption-plan-location eastus \
  --runtime custom \
  --functions-version 4 \
  --name cyclonetix-function-agent \
  --storage-account cyclonetixstorage

Google Cloud Configuration

For GCP, we use a cloud coordinator with Pub/Sub:

# Create Pub/Sub topics
gcloud pubsub topics create cyclonetix-tasks
gcloud pubsub topics create cyclonetix-results

gcloud pubsub subscriptions create cyclonetix-tasks-sub \
  --topic=cyclonetix-tasks \
  --ack-deadline=300

# Deploy the coordinator to Cloud Run
gcloud run deploy gcp-coordinator \
  --image gcr.io/your-project/cyclonetix-gcp-coordinator \
  --platform managed \
  --memory 512Mi \
  --cpu 1 \
  --min-instances 1 \
  --set-env-vars="REDIS_URL=redis://10.0.0.1:6379,TASK_TOPIC=cyclonetix-tasks,RESULT_TOPIC=cyclonetix-results" \
  --region us-central1

# Deploy agents to Cloud Run
gcloud run deploy gcp-agent \
  --image gcr.io/your-project/cyclonetix-cloud-run-agent \
  --platform managed \
  --memory 2Gi \
  --cpu 2 \
  --min-instances 0 \
  --max-instances 100 \
  --set-env-vars="TASK_SUBSCRIPTION=cyclonetix-tasks,RESULT_TOPIC=cyclonetix-results" \
  --region us-central1

Task Queue Routing

Define tasks with specific cloud destinations:

# data/tasks/aws/sagemaker_training.yaml
id: "sagemaker_training"
name: "SageMaker Model Training"
command: "python3 ml/aws/train_sagemaker.py --data ${DATA_PATH} --model-type ${MODEL_TYPE}"
dependencies: []
parameters:
  DATA_PATH: "s3://cyclonetix-data/training/dataset-1"
  MODEL_TYPE: "xgboost"
  INSTANCE_TYPE: "ml.m5.large"
queue: "aws-ml"  # Routed to AWS Lambda agents
# data/tasks/azure/batch_processing.yaml
id: "azure_batch_processing"
name: "Azure Batch Data Processing"
command: "python3 processing/azure/batch_process.py --input ${INPUT_CONTAINER} --output ${OUTPUT_CONTAINER}"
dependencies: []
parameters:
  INPUT_CONTAINER: "https://cyclonetixstorage.blob.core.windows.net/input"
  OUTPUT_CONTAINER: "https://cyclonetixstorage.blob.core.windows.net/output"
  POOL_ID: "cyclonetix-pool"
queue: "azure-batch"  # Routed to Azure Function agents
# data/tasks/gcp/bigquery_etl.yaml
id: "bigquery_etl"
name: "BigQuery ETL Process"
command: "python3 etl/gcp/bigquery_transform.py --project ${PROJECT_ID} --dataset ${DATASET}"
dependencies: []
parameters:
  PROJECT_ID: "cyclonetix-analytics"
  DATASET: "production_data"
  SQL_SCRIPT: "transforms/daily_aggregation.sql"
queue: "gcp-data"  # Routed to Cloud Run agents
# data/tasks/on_prem/secure_processing.yaml
id: "secure_processing"
name: "Secure Data Processing"
command: "python3 secure/process.py --data ${DATA_PATH} --keys ${KEYS_PATH}"
dependencies: []
parameters:
  DATA_PATH: "/secure/data/customer_records"
  KEYS_PATH: "/secure/keys/encryption_keys"
  OUTPUT_PATH: "/secure/processed/customer_insights"
queue: "secure-data"  # Routed to on-premises agents

Cross-Cloud DAG

Create a DAG that spans multiple cloud environments:

# data/dags/multi_cloud_analytics.yaml
id: "multi_cloud_analytics"
name: "Multi-Cloud Analytics Pipeline"
description: "Analytics workflow spanning multiple cloud environments"
tasks:
  - id: "on_prem_data_prep"
    queue: "on-prem"
  - id: "aws_data_analysis"
    queue: "aws-ml"
    dependencies:
      - "on_prem_data_prep"
  - id: "azure_visualization"
    queue: "azure-batch"
    dependencies:
      - "aws_data_analysis"
  - id: "gcp_dashboard_update"
    queue: "gcp-data"
    dependencies:
      - "azure_visualization"
tags: ["multi-cloud", "analytics", "hybrid"]

Cloud-Specific PaaS Integration

Agents can leverage cloud-specific PaaS offerings:

AWS SageMaker Integration

# Example of AWS Lambda agent invoking SageMaker
import boto3

def start_sagemaker_training(data_path, model_type, instance_type):
    sagemaker = boto3.client('sagemaker')

    response = sagemaker.create_training_job(
        TrainingJobName='cyclonetix-training-job',
        AlgorithmSpecification={
            'TrainingImage': '123456789012.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest',
            'TrainingInputMode': 'File'
        },
        RoleArn='arn:aws:iam::123456789012:role/SageMakerExecutionRole',
        InputDataConfig=[
            {
                'ChannelName': 'train',
                'DataSource': {
                    'S3DataSource': {
                        'S3DataType': 'S3Prefix',
                        'S3Uri': data_path,
                        'S3DataDistributionType': 'FullyReplicated'
                    }
                }
            }
        ],
        OutputDataConfig={
            'S3OutputPath': 's3://cyclonetix-models/output'
        },
        ResourceConfig={
            'InstanceType': instance_type,
            'InstanceCount': 1,
            'VolumeSizeInGB': 30
        },
        StoppingCondition={
            'MaxRuntimeInSeconds': 86400
        },
        HyperParameters={
            'objective': 'binary:logistic',
            'max_depth': '5',
            'eta': '0.2',
            'num_round': '100'
        }
    )

    return response['TrainingJobArn']

Azure ML Integration

# Example of Azure Function agent invoking Azure ML
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, command

def start_azure_ml_job(data_path, model_type):
    credential = DefaultAzureCredential()
    ml_client = MLClient(
        credential=credential,
        subscription_id="subscription-id",
        resource_group_name="resource-group",
        workspace_name="workspace-name"
    )

    job = command(
        code="./src",
        command="python train.py --data ${{inputs.data_path}} --model-type ${{inputs.model_type}}",
        inputs={
            "data_path": data_path,
            "model_type": model_type
        },
        environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1",
        compute="aml-cluster",
        display_name="cyclonetix-training-job"
    )

    returned_job = ml_client.jobs.create_or_update(job)
    return returned_job.name

GCP AI Platform Integration

# Example of Cloud Run agent invoking AI Platform
from google.cloud import aiplatform

def start_vertex_training(project_id, region, data_path, model_type):
    aiplatform.init(project=project_id, location=region)

    custom_job = aiplatform.CustomJob(
        display_name="cyclonetix-training-job",
        worker_pool_specs=[
            {
                "machine_spec": {
                    "machine_type": "n1-standard-4",
                    "accelerator_type": "NVIDIA_TESLA_T4",
                    "accelerator_count": 1,
                },
                "replica_count": 1,
                "python_package_spec": {
                    "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.0-23:latest",
                    "package_uris": ["gs://cyclonetix-packages/trainer-0.1.tar.gz"],
                    "python_module": "trainer.task",
                    "args": [
                        f"--data-path={data_path}",
                        f"--model-type={model_type}",
                        "--epochs=20"
                    ],
                },
            }
        ],
    )

    custom_job.run()
    return custom_job.resource_name

Benefits of Multi-Cloud Agent Orchestration

  1. Optimal Cloud Selection: Route tasks to the most suitable cloud based on requirements
  2. Cost Optimization: Utilize spot instances or serverless offerings from multiple providers
  3. Avoid Vendor Lock-in: Maintain flexibility to move workloads between clouds
  4. Geographical Distribution: Execute tasks in regions closest to data or users
  5. Specialized Services: Use the best-in-class services from each cloud provider
  6. Compliance: Keep sensitive workloads on-premises while using cloud for non-sensitive tasks
  7. Hybrid Strategy: Gradually migrate to cloud while maintaining on-premises systems