Examples

Comprehensive examples demonstrating common use cases, patterns, and best practices for working with the Magpie Cloud SDK.

Use Cases

Real-world examples of how to leverage Magpie Cloud for different scenarios.

CI/Automation

Run automated tests and builds in isolated environments. Perfect for continuous integration pipelines that need clean, reproducible environments.

import sys

from magpie import Magpie

client = Magpie()

# Run the test suite in an isolated environment and block until the job ends
test_job = client.jobs.run_and_wait(
    name="test-suite",
    script="""
        git clone https://github.com/myorg/myrepo.git
        cd myrepo
        pip install -r requirements.txt
        pytest tests/
    """,
    vcpus=4,
    memory_mb=2048,
    timeout_seconds=1800
)

passed = test_job.status == "completed"
print("Tests passed!" if passed else "Tests failed!")

# Show the logs for both outcomes: they are most useful when the run failed
print(test_job.logs)

if not passed:
    # sys.exit() is the supported way to set a script's exit code; the bare
    # exit() helper is installed by the site module for interactive use.
    sys.exit(1)

Build Automation

Compile and package applications in consistent build environments.

from magpie import Magpie

client = Magpie()

# Build a Docker image and save it as a tarball under /workspace
build_job = client.jobs.run_and_wait(
    name="docker-build",
    script="""
        git clone https://github.com/myorg/app.git
        cd app
        docker build -t myapp:latest .
        docker save myapp:latest > /workspace/myapp.tar
    """,
    vcpus=8,
    memory_mb=8192,
    stateful=True,  # Persist the build artifact
    timeout_seconds=3600
)

print(f"Build status: {build_job.status}")
# A real newline escape, so the logs start on their own line instead of
# following a literal backslash-n.
print(f"Build logs:\n{build_job.logs}")

Data Pipelines

Process large datasets with parallel workers using templates for consistent configurations across multiple jobs.

import time

from magpie import Magpie

client = Magpie()

# Statuses treated as final; this is the same set the Batch Processing and
# Monitoring Jobs patterns poll for.
TERMINAL_STATUSES = ("completed", "failed", "cancelled")

# Create a processing template
template = client.templates.create(
    name="data-processor",
    script="""
        import pandas as pd
        import sys

        # Get input file from parameter
        input_file = sys.argv[1]

        # Process the data
        df = pd.read_csv(f"/data/{input_file}")
        df_processed = df.apply(some_processing_function)

        # Save results
        df_processed.to_csv(f"/workspace/{input_file}.processed", index=False)
    """,
    vcpus=2,
    memory_mb=4096,
    stateful=True
)

# Start one job per input file; all are submitted before any is awaited
files = ["data1.csv", "data2.csv", "data3.csv", "data4.csv"]
jobs = [
    client.templates.run(
        template_id=template.id,
        name=f"process-{filename}",
        script_args=[filename]
    )
    for filename in files
]

# Wait for all jobs to complete
for job in jobs:
    status = client.jobs.get_status(job.id)
    # Poll until a final status is reached, rather than only while the job is
    # "running", so a job that has not started yet is not reported too early.
    while status.status not in TERMINAL_STATUSES:
        time.sleep(5)
        status = client.jobs.get_status(job.id)

    print(f"{job.name}: {status.status}")

ETL Pipeline

Extract, transform, and load data with stateful workspaces for caching.

import os
import sys

from magpie import Magpie

client = Magpie()


def require_completed(job, stage):
    """Exit with status 1 unless the job for the given stage completed."""
    if job.status != "completed":
        print(f"ETL {stage} stage ended with status: {job.status}")
        sys.exit(1)


# Extract stage
extract_job = client.jobs.run_and_wait(
    name="etl-extract",
    script="""
        # Download data from S3
        aws s3 sync s3://mybucket/data/ /workspace/raw/

        echo "Extracted $(ls -1 /workspace/raw/ | wc -l) files"
    """,
    env={
        # Read credentials from the local environment instead of hard-coding
        # them; a missing variable raises KeyError before the job is created.
        "AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"],
        "AWS_SECRET_ACCESS_KEY": os.environ["AWS_SECRET_ACCESS_KEY"]
    },
    vcpus=2,
    memory_mb=2048,
    stateful=True
)
# Do not transform data that was never extracted
require_completed(extract_job, "extract")

# Transform stage (uses same workspace)
transform_job = client.jobs.run_and_wait(
    name="etl-transform",
    workspace_id=extract_job.workspace_id,
    script="""
        python /workspace/scripts/transform.py \\
            --input /workspace/raw/ \\
            --output /workspace/processed/
    """,
    vcpus=4,
    memory_mb=8192,
    stateful=True
)
# Do not load data that was never transformed
require_completed(transform_job, "transform")

# Load stage
load_job = client.jobs.run_and_wait(
    name="etl-load",
    workspace_id=transform_job.workspace_id,
    script="""
        # Upload processed data to database
        python /workspace/scripts/load_to_db.py \\
            --data /workspace/processed/
    """,
    vcpus=2,
    memory_mb=4096,
    stateful=True
)

print(f"ETL Pipeline completed: {load_job.status}")

Interactive Development

SSH into persistent virtual machines for debugging, experimentation, and interactive development workflows.

from magpie import Magpie

client = Magpie()

# Create a persistent development environment
dev_job = client.jobs.create(
    name="dev-environment",
    script="sleep infinity",  # Keep the VM running
    vcpus=4,
    memory_mb=8192,
    stateful=True,
    timeout_seconds=0  # No timeout
)

# Get SSH connection details
ssh_info = client.jobs.ssh(dev_job.id)

print("SSH into your development environment:")
print(f"  {ssh_info.command}")
# A real newline escape, so a blank line is printed rather than a literal "\n"
print("\nWorkspace persists at: /workspace")
print(f"Environment ID: {dev_job.id}")

# When you are done, stop the environment with client.jobs.stop(dev_job.id)

Debugging Failed Jobs

Create an interactive environment from a failed job's workspace to debug issues.

import time

from magpie import Magpie

client = Magpie()

# Statuses treated as final; this is the same set the Batch Processing and
# Monitoring Jobs patterns poll for.
TERMINAL_STATUSES = ("completed", "failed", "cancelled")

# Run a job that might fail
job = client.jobs.create(
    name="complex-task",
    script="python /workspace/complex_script.py",
    vcpus=2,
    memory_mb=4096,
    stateful=True
)

# Wait until the job reaches a final status. Polling only while the status is
# "running" would stop immediately for a job that has not started yet.
status = client.jobs.get_status(job.id)
while status.status not in TERMINAL_STATUSES:
    time.sleep(10)
    status = client.jobs.get_status(job.id)

# If it failed, create a debug environment
if status.status == "failed":
    print("Job failed! Creating debug environment...")

    debug_job = client.jobs.create(
        name="debug-session",
        workspace_id=job.workspace_id,  # Use same workspace
        script="sleep infinity",
        vcpus=2,
        memory_mb=4096,
        stateful=True
    )

    ssh_info = client.jobs.ssh(debug_job.id)
    print("Debug environment ready:")
    print(f"  {ssh_info.command}")
    # A real newline escape, so a blank line is printed rather than "\n"
    print("\nThe failed job's workspace is available at /workspace")

Common Patterns

Reusable code patterns for common operations with the Magpie Cloud SDK.

Batch Processing

Process multiple tasks in parallel using Python's concurrent.futures for efficient batch operations.

from magpie import Magpie
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

client = Magpie()

def process_item(item):
    """Run one batch item as a job and report how it ended.

    Args:
        item: Mapping with an 'id' (used in the job name) and the 'script'
            to run.

    Returns:
        Dict with the item ID, the job ID, the final status, and the job's
        logs (None unless the job completed).
    """
    item_id = item['id']
    job = client.jobs.create(
        name=f"process-{item_id}",
        script=item['script'],
        vcpus=2,
        memory_mb=2048
    )

    # Poll every five seconds until the job reports a final status
    finished = ("completed", "failed", "cancelled")
    status = client.jobs.get_status(job.id)
    while status.status not in finished:
        time.sleep(5)
        status = client.jobs.get_status(job.id)

    # Logs are only fetched for jobs that completed
    logs = None
    if status.status == "completed":
        logs = client.jobs.get_logs(job.id).logs

    return {
        "item_id": item_id,
        "job_id": job.id,
        "status": status.status,
        "logs": logs,
    }

# Define batch of items to process
items = [
    {"id": "item1", "script": "python process.py item1"},
    {"id": "item2", "script": "python process.py item2"},
    {"id": "item3", "script": "python process.py item3"},
    {"id": "item4", "script": "python process.py item4"},
    {"id": "item5", "script": "python process.py item5"},
]

# Process in parallel with max 3 concurrent jobs
results = []
with ThreadPoolExecutor(max_workers=3) as executor:
    # Map each future back to its item so failures can be attributed
    futures = {executor.submit(process_item, item): item for item in items}

    # Collect results as they complete
    for future in as_completed(futures):
        item = futures[future]
        try:
            result = future.result()
            results.append(result)
            print(f"Completed {result['item_id']}: {result['status']}")
        except Exception as e:
            # One failing item must not abort the rest of the batch
            print(f"Error processing {item['id']}: {e}")

# Summary (items that raised are absent from results, so they count as failed)
successful = sum(1 for r in results if r['status'] == 'completed')
# A real newline escape, so a blank line is printed rather than a literal "\n"
print(f"\nBatch processing complete: {successful}/{len(items)} successful")

Monitoring Jobs

Poll job status with exponential backoff and progress tracking.

from magpie import Magpie
import time

client = Magpie()

def wait_for_job(job_id, timeout=3600, initial_delay=1, max_delay=30):
    """
    Wait for a job to reach a final status, polling with exponential backoff.

    Args:
        job_id: The job ID to monitor
        timeout: Maximum time to wait in seconds
        initial_delay: Starting delay between polls in seconds
        max_delay: Maximum delay between polls in seconds

    Returns:
        Final job status object

    Raises:
        TimeoutError: If the job has not finished within ``timeout`` seconds
    """
    # Use the monotonic clock so that system clock adjustments cannot shorten
    # or lengthen the measured wait.
    start_time = time.monotonic()
    delay = initial_delay

    print(f"Monitoring job {job_id}...")

    while True:
        # Check if timeout exceeded
        elapsed = time.monotonic() - start_time
        if elapsed > timeout:
            raise TimeoutError(f"Job {job_id} exceeded timeout of {timeout}s")

        # Get current status
        status = client.jobs.get_status(job_id)
        print(f"[{int(elapsed)}s] Status: {status.status}")

        # Check if job is done
        if status.status in ("completed", "failed", "cancelled"):
            return status

        # Wait before next poll (exponential backoff), but never sleep past
        # the deadline, so the timeout is reported on time instead of up to
        # one full delay late.
        remaining = timeout - (time.monotonic() - start_time)
        time.sleep(max(0, min(delay, remaining)))
        delay = min(delay * 1.5, max_delay)

# Example usage
job = client.jobs.create(
    name="long-running-task",
    script="python train_model.py",
    vcpus=4,
    memory_mb=8192
)

try:
    final_status = wait_for_job(job.id, timeout=7200)

    if final_status.status == "completed":
        logs = client.jobs.get_logs(job.id)
        # Real newline escapes below, so blank lines are printed rather than
        # a literal backslash-n.
        print("\nJob completed successfully!")
        print(f"Logs:\n{logs.logs}")
    else:
        print(f"\nJob ended with status: {final_status.status}")

except TimeoutError as e:
    print(f"\nError: {e}")
    # The job is still active after a timeout, so stop it explicitly
    client.jobs.stop(job.id)

Progress Callback Pattern

from magpie import Magpie
import time

client = Magpie()

def monitor_job_with_callback(job_id, callback=None, poll_interval=5):
    """
    Poll a job until it finishes, notifying a callback on status changes.

    Args:
        job_id: The job ID to monitor
        callback: Optional function called with (job_id, status, elapsed_time)
            each time the reported status differs from the previous poll
        poll_interval: Seconds between status checks

    Returns:
        The status object from the poll that reported a final status
    """
    done_states = ("completed", "failed", "cancelled")
    began = time.time()
    previous = None

    while True:
        status = client.jobs.get_status(job_id)
        elapsed = time.time() - began
        current = status.status

        # Notify only on transitions, including the first observed status
        if callback and current != previous:
            callback(job_id, current, elapsed)
        previous = current

        if current in done_states:
            return status

        time.sleep(poll_interval)

# Example callback function
def progress_callback(job_id, status, elapsed):
    """Print a timestamped line describing a job status change.

    The line shows the current local time, the first eight characters of the
    job ID, the new status, and the elapsed time truncated to whole seconds.
    """
    now = time.strftime("%H:%M:%S")
    short_id = job_id[:8]
    seconds = int(elapsed)
    print(f"[{now}] Job {short_id}... -> {status} ({seconds}s)")

# Launch a job, then watch it and report every status change
job_spec = {
    "name": "monitored-job",
    "script": "python process.py",
    "vcpus": 2,
    "memory_mb": 2048,
}
job = client.jobs.create(**job_spec)

final_status = monitor_job_with_callback(job.id, callback=progress_callback)

Environment Variables

Pass configuration and secrets to jobs using environment variables.

import os
import time

from magpie import Magpie

client = Magpie()

# Statuses treated as final; this is the same set the Batch Processing and
# Monitoring Jobs patterns poll for.
TERMINAL_STATUSES = ("completed", "failed", "cancelled")

# Pass environment variables to a job.
# The inline Python is single-quoted for the shell so that its inner double
# quotes survive, and it reports only whether the key is set so the secret
# itself is never written to the job logs.
job = client.jobs.create(
    name="env-example",
    script="""
        echo "Database: $DB_HOST"
        echo "Environment: $ENVIRONMENT"

        # Use in Python scripts
        python -c 'import os; print("API key set:", bool(os.getenv("API_KEY")))'
    """,
    env={
        "DB_HOST": "postgres.example.com",
        "DB_USER": "myapp",
        # From local env; indexing os.environ raises KeyError when a variable
        # is missing instead of silently passing None to the job.
        "DB_PASSWORD": os.environ["DB_PASSWORD"],
        "API_KEY": os.environ["EXTERNAL_API_KEY"],
        "ENVIRONMENT": "production"
    },
    vcpus=1,
    memory_mb=512
)

# Wait for completion: poll until a final status, not only while "running"
status = client.jobs.get_status(job.id)
while status.status not in TERMINAL_STATUSES:
    time.sleep(2)
    status = client.jobs.get_status(job.id)

logs = client.jobs.get_logs(job.id)
print(logs.logs)

Configuration from File

import json

from magpie import Magpie

client = Magpie()

# Load configuration from file (explicit encoding: the default is
# platform-dependent)
with open("config.json", encoding="utf-8") as f:
    config = json.load(f)

# Create env dict from config: one CONFIG_<KEY> variable per top-level entry
env_vars = {
    f"CONFIG_{key.upper()}": str(value)
    for key, value in config.items()
}

# Use in job.
# The config file is written with printf and a double-quoted expansion. A
# here-document with a quoted delimiter (<< 'EOF') disables parameter
# expansion, so it would write the literal text "$CONFIG_JSON" to the file.
job = client.jobs.create(
    name="configured-job",
    script="""
        # Access config values
        echo "Server: $CONFIG_SERVER"
        echo "Port: $CONFIG_PORT"

        # Or use a config file approach
        printf '%s' "$CONFIG_JSON" > /tmp/config.json

        python app.py --config /tmp/config.json
    """,
    env={
        **env_vars,
        "CONFIG_JSON": json.dumps(config)
    },
    vcpus=2,
    memory_mb=2048
)

Error Handling

Robust error handling patterns for production applications using the Magpie Cloud SDK.

Basic Try-Except Pattern

import time

from magpie import Magpie
from magpie.exceptions import (
    AuthenticationError,
    JobError,
    ResourceError,
    ValidationError,
    TimeoutError
)

client = Magpie()

# Statuses treated as final; this is the same set the Batch Processing and
# Monitoring Jobs patterns poll for.
TERMINAL_STATUSES = ("completed", "failed", "cancelled")

try:
    job = client.jobs.create(
        name="error-handling-example",
        script="python my_script.py",
        vcpus=2,
        memory_mb=2048
    )

    # Wait for completion: poll until a final status, not only while
    # "running", so a job that has not started yet is not reported as failed
    status = client.jobs.get_status(job.id)
    while status.status not in TERMINAL_STATUSES:
        time.sleep(5)
        status = client.jobs.get_status(job.id)

    if status.status == "completed":
        logs = client.jobs.get_logs(job.id)
        print("Success!")
        print(logs.logs)
    else:
        print(f"Job failed with status: {status.status}")
        logs = client.jobs.get_logs(job.id)
        # A real newline escape, so the logs start on their own line
        print(f"Logs:\n{logs.logs}")

except AuthenticationError as e:
    print(f"Authentication failed: {e}")
    print("Check your API key")

except ValidationError as e:
    print(f"Invalid parameters: {e}")
    print("Check job configuration")

except ResourceError as e:
    print(f"Resource error: {e}")
    print("Requested resources may not be available")

except JobError as e:
    print(f"Job error: {e}")

except TimeoutError as e:
    # This is the SDK's TimeoutError imported above, which shadows the builtin
    print(f"Operation timed out: {e}")

except Exception as e:
    print(f"Unexpected error: {e}")
    # Log error to monitoring service
    # Clean up resources if needed

Retry Pattern with Exponential Backoff

from magpie import Magpie
from magpie.exceptions import ResourceError, TimeoutError
import time

client = Magpie()

def create_job_with_retry(max_retries=3, base_delay=2):
    """Create the example job, retrying ResourceError and TimeoutError.

    The wait before retry number ``n`` (counting from zero) is
    ``base_delay * 2 ** n`` seconds. Once ``max_retries`` attempts have
    failed, the last error is re-raised. Any other exception is re-raised
    immediately without retrying.
    """
    final_attempt = max_retries - 1

    for attempt in range(max_retries):
        try:
            job = client.jobs.create(
                name="retry-example",
                script="python process.py",
                vcpus=4,
                memory_mb=4096
            )
            print(f"Job created successfully: {job.id}")
            return job

        except (ResourceError, TimeoutError) as err:
            # Out of attempts: report and propagate the last error
            if attempt >= final_attempt:
                print(f"All {max_retries} attempts failed")
                raise

            wait_seconds = base_delay * (2 ** attempt)  # Exponential backoff
            print(f"Attempt {attempt + 1} failed: {err}")
            print(f"Retrying in {wait_seconds}s...")
            time.sleep(wait_seconds)

        except Exception as err:
            # Anything else is treated as non-retryable
            print(f"Non-retryable error: {err}")
            raise

# Example: allow up to five attempts and report a final failure
try:
    job = create_job_with_retry(max_retries=5)
except Exception as err:
    print(f"Failed to create job: {err}")
    # Handle failure (notify, log, etc.)

Common Error Types

Understanding the different error types helps you handle failures appropriately.

| Error Type | Description | Common Causes | Recommended Action |
| --- | --- | --- | --- |
| AuthenticationError | Authentication or authorization failed | Invalid API key, expired key, insufficient permissions | Verify API key, check permissions, regenerate if needed |
| JobError | Job operation failed | Job not found, invalid state transition, execution error | Check job ID, verify job state, review logs |
| ResourceError | Requested resources unavailable | Insufficient quota, resource limits exceeded, capacity issues | Reduce resource requirements, retry later, contact support |
| ValidationError | Invalid request parameters | Missing required fields, invalid values, constraint violations | Check parameter types and values, review documentation |
| TimeoutError | Operation exceeded timeout | Slow network, service unavailable, long-running operation | Increase timeout, check network, retry operation |

Graceful Cleanup Pattern

from magpie import Magpie
import time

client = Magpie()

# Statuses treated as final; this is the same set the Batch Processing and
# Monitoring Jobs patterns poll for.
TERMINAL_STATUSES = ("completed", "failed", "cancelled")

# Stays None until creation succeeds, so the handlers below can tell whether
# there is a job to stop or clean up.
job = None
try:
    # Create job
    job = client.jobs.create(
        name="cleanup-example",
        script="python long_running_task.py",
        vcpus=4,
        memory_mb=8192,
        stateful=True
    )

    # Wait until the job reaches a final status. Polling only while the
    # status is "running" would stop immediately for a job that has not
    # started yet and misreport it as failed.
    status = client.jobs.get_status(job.id)
    while status.status not in TERMINAL_STATUSES:
        time.sleep(10)
        status = client.jobs.get_status(job.id)

    # Check result
    if status.status != "completed":
        # RuntimeError is still handled by the `except Exception` clause below
        raise RuntimeError(f"Job failed with status: {status.status}")

    # Process successful result
    logs = client.jobs.get_logs(job.id)
    print(logs.logs)

except KeyboardInterrupt:
    # A real newline escape, so the message starts on a fresh line
    print("\nInterrupted by user")
    if job:
        print(f"Stopping job {job.id}...")
        client.jobs.stop(job.id)

except Exception as e:
    print(f"Error: {e}")

finally:
    # Always cleanup
    if job:
        try:
            # Delete ephemeral jobs, keep stateful ones for debugging
            job_info = client.jobs.get(job.id)
            if not job_info.stateful:
                print(f"Cleaning up job {job.id}...")
                client.jobs.delete(job.id)
            else:
                print(f"Job {job.id} preserved for debugging")
        except Exception as e:
            # Cleanup is best-effort: report the problem but do not raise
            print(f"Cleanup warning: {e}")

Next Steps