Examples
Comprehensive examples demonstrating common use cases, patterns, and best practices for working with the Magpie Cloud SDK.
Use Cases
Real-world examples of how to leverage Magpie Cloud for different scenarios.
CI/Automation
Run automated tests and builds in isolated environments. Perfect for continuous integration pipelines that need clean, reproducible environments.
from magpie import Magpie

client = Magpie()

# Run the test suite in a clean, isolated environment
test_job = client.jobs.run_and_wait(
    name="test-suite",
    script="""
git clone https://github.com/myorg/myrepo.git
cd myrepo
pip install -r requirements.txt
pytest tests/
""",
    vcpus=4,
    memory_mb=2048,
    timeout_seconds=1800  # fail fast if the suite hangs (30 minutes)
)

if test_job.status == "completed":
    print("Tests passed!")
    print(test_job.logs)
else:
    print("Tests failed!")
    # Show the logs on failure too — otherwise there is nothing to diagnose with
    print(test_job.logs)
    exit(1)

Build Automation
Compile and package applications in consistent build environments.
from magpie import Magpie

client = Magpie()

# Build a Docker image and export it as a tarball. The workspace is
# stateful so the artifact outlives the job itself.
build_job = client.jobs.run_and_wait(
    name="docker-build",
    script="""
git clone https://github.com/myorg/app.git
cd app
docker build -t myapp:latest .
docker save myapp:latest > /workspace/myapp.tar
""",
    vcpus=8,
    memory_mb=8192,
    stateful=True,  # keep the workspace so /workspace/myapp.tar survives
    timeout_seconds=3600
)

print(f"Build status: {build_job.status}")
print(f"Build logs:\\n{build_job.logs}")

Data Pipelines
Process large datasets with parallel workers using templates for consistent configurations across multiple jobs.
from magpie import Magpie
import time  # needed for the polling loop below

client = Magpie()

# Create a processing template
template = client.templates.create(
    name="data-processor",
    script="""
import pandas as pd
import sys
# Get input file from parameter
input_file = sys.argv[1]
# Process the data
df = pd.read_csv(f"/data/{input_file}")
df_processed = df.apply(some_processing_function)
# Save results
df_processed.to_csv(f"/workspace/{input_file}.processed", index=False)
""",
    vcpus=2,
    memory_mb=4096,
    stateful=True
)

# Process multiple files in parallel
files = ["data1.csv", "data2.csv", "data3.csv", "data4.csv"]
jobs = []
for file in files:
    job = client.templates.run(
        template_id=template.id,
        name=f"process-{file}",
        script_args=[file]
    )
    jobs.append(job)

# Wait for every job to reach a terminal state. Checking only for
# "running" would exit early for jobs that are still queued/pending.
for job in jobs:
    status = client.jobs.get_status(job.id)
    while status.status not in ("completed", "failed", "cancelled"):
        time.sleep(5)
        status = client.jobs.get_status(job.id)
    print(f"{job.name}: {status.status}")

ETL Pipeline
Extract, transform, and load data with stateful workspaces for caching.
from magpie import Magpie

client = Magpie()

# --- Extract: pull raw data down from S3 into the workspace ---
extract_job = client.jobs.run_and_wait(
    name="etl-extract",
    script="""
# Download data from S3
aws s3 sync s3://mybucket/data/ /workspace/raw/
echo "Extracted $(ls -1 /workspace/raw/ | wc -l) files"
""",
    env={
        "AWS_ACCESS_KEY_ID": "your_key",
        "AWS_SECRET_ACCESS_KEY": "your_secret"
    },
    vcpus=2,
    memory_mb=2048,
    stateful=True
)

# --- Transform: reuse the extract job's workspace so /workspace/raw is visible ---
transform_job = client.jobs.run_and_wait(
    name="etl-transform",
    workspace_id=extract_job.workspace_id,
    script="""
python /workspace/scripts/transform.py \\
--input /workspace/raw/ \\
--output /workspace/processed/
""",
    vcpus=4,
    memory_mb=8192,
    stateful=True
)

# --- Load: push the transformed data into the database ---
load_job = client.jobs.run_and_wait(
    name="etl-load",
    workspace_id=transform_job.workspace_id,
    script="""
# Upload processed data to database
python /workspace/scripts/load_to_db.py \\
--data /workspace/processed/
""",
    vcpus=2,
    memory_mb=4096,
    stateful=True
)
print(f"ETL Pipeline completed: {load_job.status}")

Interactive Development
SSH into persistent virtual machines for debugging, experimentation, and interactive development workflows.
from magpie import Magpie

client = Magpie()

# Launch a long-lived VM to use as a development box.
# "sleep infinity" keeps the instance alive until it is stopped explicitly.
dev_job = client.jobs.create(
    name="dev-environment",
    script="sleep infinity",  # Keep the VM running
    vcpus=4,
    memory_mb=8192,
    stateful=True,
    timeout_seconds=0  # No timeout
)

# Fetch the SSH command for connecting to the box
ssh_info = client.jobs.ssh(dev_job.id)

print(f"SSH into your development environment:")
print(f" {ssh_info.command}")
print(f"\\nWorkspace persists at: /workspace")
print(f"Environment ID: {dev_job.id}")

# Later, you can stop the environment when done
# client.jobs.stop(dev_job.id)

Debugging Failed Jobs
Create an interactive environment from a failed job's workspace to debug issues.
from magpie import Magpie
import time  # needed for the polling loop below

client = Magpie()

# Run a job that might fail
job = client.jobs.create(
    name="complex-task",
    script="python /workspace/complex_script.py",
    vcpus=2,
    memory_mb=4096,
    stateful=True
)

# Poll until the job reaches a terminal state. Checking only for
# "running" would exit early for jobs that are still queued/pending.
status = client.jobs.get_status(job.id)
while status.status not in ("completed", "failed", "cancelled"):
    time.sleep(10)
    status = client.jobs.get_status(job.id)

# If it failed, spin up an interactive session on the same workspace
if status.status == "failed":
    print("Job failed! Creating debug environment...")
    debug_job = client.jobs.create(
        name="debug-session",
        workspace_id=job.workspace_id,  # Use same workspace
        script="sleep infinity",
        vcpus=2,
        memory_mb=4096,
        stateful=True
    )
    ssh_info = client.jobs.ssh(debug_job.id)
    print(f"Debug environment ready:")
    print(f" {ssh_info.command}")
    print(f"\\nThe failed job's workspace is available at /workspace")

Common Patterns
Reusable code patterns for common operations with the Magpie Cloud SDK.
Batch Processing
Process multiple tasks in parallel using Python's concurrent.futures for efficient batch operations.
from magpie import Magpie
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

client = Magpie()

def process_item(item):
    """Run a single work item as a Magpie job and block until it finishes."""
    job = client.jobs.create(
        name=f"process-{item['id']}",
        script=item['script'],
        vcpus=2,
        memory_mb=2048
    )
    # Poll until the job reaches a terminal state
    while True:
        status = client.jobs.get_status(job.id)
        if status.status in ["completed", "failed", "cancelled"]:
            break
        time.sleep(5)
    succeeded = status.status == "completed"
    return {
        "item_id": item['id'],
        "job_id": job.id,
        "status": status.status,
        "logs": client.jobs.get_logs(job.id).logs if succeeded else None
    }

# The batch of work to run
items = [
    {"id": "item1", "script": "python process.py item1"},
    {"id": "item2", "script": "python process.py item2"},
    {"id": "item3", "script": "python process.py item3"},
    {"id": "item4", "script": "python process.py item4"},
    {"id": "item5", "script": "python process.py item5"},
]

# Fan out with at most 3 jobs in flight at once
results = []
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(process_item, item): item for item in items}
    for future in as_completed(futures):
        item = futures[future]
        try:
            result = future.result()
            results.append(result)
            print(f"Completed {result['item_id']}: {result['status']}")
        except Exception as e:
            print(f"Error processing {item['id']}: {e}")

# Summary
successful = sum(1 for r in results if r['status'] == 'completed')
print(f"\\nBatch processing complete: {successful}/{len(items)} successful")

Monitoring Jobs
Poll job status with exponential backoff and progress tracking.
from magpie import Magpie
import time

client = Magpie()

def wait_for_job(job_id, timeout=3600, initial_delay=1, max_delay=30):
    """
    Block until a job finishes, polling with exponential backoff.

    Args:
        job_id: The job ID to monitor
        timeout: Maximum time to wait in seconds
        initial_delay: Starting delay between polls in seconds
        max_delay: Maximum delay between polls in seconds

    Returns:
        Final job status object

    Raises:
        TimeoutError: if the job has not finished within `timeout` seconds.
    """
    started = time.time()
    delay = initial_delay
    print(f"Monitoring job {job_id}...")

    while True:
        # Give up once the overall budget is spent
        elapsed = time.time() - started
        if elapsed > timeout:
            raise TimeoutError(f"Job {job_id} exceeded timeout of {timeout}s")

        status = client.jobs.get_status(job_id)
        print(f"[{int(elapsed)}s] Status: {status.status}")

        if status.status in ["completed", "failed", "cancelled"]:
            return status

        # Sleep, then grow the interval by 50% up to the cap
        time.sleep(delay)
        delay = min(delay * 1.5, max_delay)

# Example usage
job = client.jobs.create(
    name="long-running-task",
    script="python train_model.py",
    vcpus=4,
    memory_mb=8192
)

try:
    final_status = wait_for_job(job.id, timeout=7200)
    if final_status.status == "completed":
        logs = client.jobs.get_logs(job.id)
        print(f"\\nJob completed successfully!")
        print(f"Logs:\\n{logs.logs}")
    else:
        print(f"\\nJob ended with status: {final_status.status}")
except TimeoutError as e:
    print(f"\\nError: {e}")
    client.jobs.stop(job.id)

Progress Callback Pattern
from magpie import Magpie
import time

client = Magpie()

def monitor_job_with_callback(job_id, callback=None, poll_interval=5):
    """
    Watch a job and invoke `callback` whenever its status changes.

    Args:
        job_id: The job ID to monitor
        callback: Function called with (job_id, status, elapsed_time)
        poll_interval: Seconds between status checks
    """
    started = time.time()
    previous = None

    while True:
        status = client.jobs.get_status(job_id)
        elapsed = time.time() - started

        # Only fire the callback on a transition, not on every poll
        if callback and status.status != previous:
            callback(job_id, status.status, elapsed)
            previous = status.status

        # Stop once the job reaches a terminal state
        if status.status in ["completed", "failed", "cancelled"]:
            return status

        time.sleep(poll_interval)

# Example callback function
def progress_callback(job_id, status, elapsed):
    timestamp = time.strftime("%H:%M:%S")
    print(f"[{timestamp}] Job {job_id[:8]}... -> {status} ({int(elapsed)}s)")

# Use it
job = client.jobs.create(
    name="monitored-job",
    script="python process.py",
    vcpus=2,
    memory_mb=2048
)
final_status = monitor_job_with_callback(job.id, callback=progress_callback)

Environment Variables
Pass configuration and secrets to jobs using environment variables.
from magpie import Magpie
import os
import time  # needed for the polling loop below

client = Magpie()

# Pass environment variables to a job
job = client.jobs.create(
    name="env-example",
    script="""
echo "Database: $DB_HOST"
echo "Environment: $ENVIRONMENT"
# Use in Python scripts
python -c "import os; print(f'API Key: {os.getenv(\"API_KEY\")}')"
""",
    env={
        "DB_HOST": "postgres.example.com",
        "DB_USER": "myapp",
        "DB_PASSWORD": os.getenv("DB_PASSWORD"),  # From local env
        "API_KEY": os.getenv("EXTERNAL_API_KEY"),
        "ENVIRONMENT": "production"
    },
    vcpus=1,
    memory_mb=512
)

# Poll until the job reaches a terminal state. Checking only for
# "running" would exit early for jobs that are still queued/pending.
status = client.jobs.get_status(job.id)
while status.status not in ("completed", "failed", "cancelled"):
    time.sleep(2)
    status = client.jobs.get_status(job.id)

logs = client.jobs.get_logs(job.id)
print(logs.logs)

Configuration from File
from magpie import Magpie
import json
client = Magpie()
# Load configuration from file
with open("config.json") as f:
config = json.load(f)
# Create env dict from config
env_vars = {
"CONFIG_" + k.upper(): str(v)
for k, v in config.items()
}
# Use in job
job = client.jobs.create(
name="configured-job",
script="""
# Access config values
echo "Server: $CONFIG_SERVER"
echo "Port: $CONFIG_PORT"
# Or use a config file approach
cat > /tmp/config.json << 'EOF'
$CONFIG_JSON
EOF
python app.py --config /tmp/config.json
""",
env={
**env_vars,
"CONFIG_JSON": json.dumps(config)
},
vcpus=2,
memory_mb=2048
)

Error Handling
Robust error handling patterns for production applications using the Magpie Cloud SDK.
Basic Try-Except Pattern
from magpie import Magpie
from magpie.exceptions import (
    AuthenticationError,
    JobError,
    ResourceError,
    ValidationError,
    TimeoutError
)
import time  # needed for the polling loop below

client = Magpie()

try:
    job = client.jobs.create(
        name="error-handling-example",
        script="python my_script.py",
        vcpus=2,
        memory_mb=2048
    )

    # Poll until the job reaches a terminal state. Checking only for
    # "running" would exit early for jobs that are still queued/pending.
    status = client.jobs.get_status(job.id)
    while status.status not in ("completed", "failed", "cancelled"):
        time.sleep(5)
        status = client.jobs.get_status(job.id)

    if status.status == "completed":
        logs = client.jobs.get_logs(job.id)
        print("Success!")
        print(logs.logs)
    else:
        print(f"Job failed with status: {status.status}")
        logs = client.jobs.get_logs(job.id)
        print(f"Logs:\\n{logs.logs}")

except AuthenticationError as e:
    print(f"Authentication failed: {e}")
    print("Check your API key")
except ValidationError as e:
    print(f"Invalid parameters: {e}")
    print("Check job configuration")
except ResourceError as e:
    print(f"Resource error: {e}")
    print("Requested resources may not be available")
except JobError as e:
    print(f"Job error: {e}")
except TimeoutError as e:
    print(f"Operation timed out: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
    # Log error to monitoring service
    # Clean up resources if needed

Retry Pattern with Exponential Backoff
from magpie import Magpie
from magpie.exceptions import ResourceError, TimeoutError
import time

client = Magpie()

def create_job_with_retry(max_retries=3, base_delay=2):
    """Create a job, retrying transient failures with exponential backoff."""
    for attempt in range(max_retries):
        try:
            job = client.jobs.create(
                name="retry-example",
                script="python process.py",
                vcpus=4,
                memory_mb=4096
            )
            print(f"Job created successfully: {job.id}")
            return job
        except (ResourceError, TimeoutError) as e:
            # Transient errors: back off and try again unless out of attempts
            if attempt == max_retries - 1:
                print(f"All {max_retries} attempts failed")
                raise
            delay = base_delay * (2 ** attempt)  # Exponential backoff
            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {delay}s...")
            time.sleep(delay)
        except Exception as e:
            # Anything else is considered permanent — surface it immediately
            print(f"Non-retryable error: {e}")
            raise

# Use it
try:
    job = create_job_with_retry(max_retries=5)
except Exception as e:
    print(f"Failed to create job: {e}")
    # Handle failure (notify, log, etc.)

Common Error Types
Understanding the different error types helps you handle failures appropriately.
| Error Type | Description | Common Causes | Recommended Action |
|---|---|---|---|
| AuthenticationError | Authentication or authorization failed | Invalid API key, expired key, insufficient permissions | Verify API key, check permissions, regenerate if needed |
| JobError | Job operation failed | Job not found, invalid state transition, execution error | Check job ID, verify job state, review logs |
| ResourceError | Requested resources unavailable | Insufficient quota, resource limits exceeded, capacity issues | Reduce resource requirements, retry later, contact support |
| ValidationError | Invalid request parameters | Missing required fields, invalid values, constraint violations | Check parameter types and values, review documentation |
| TimeoutError | Operation exceeded timeout | Slow network, service unavailable, long-running operation | Increase timeout, check network, retry operation |
Graceful Cleanup Pattern
from magpie import Magpie
import time

client = Magpie()

job = None
try:
    # Create job
    job = client.jobs.create(
        name="cleanup-example",
        script="python long_running_task.py",
        vcpus=4,
        memory_mb=8192,
        stateful=True
    )

    # Poll until the job reaches a terminal state. Checking only for
    # "running" would treat a still-queued ("pending") job as finished
    # and wrongly report it as a failure below.
    status = client.jobs.get_status(job.id)
    while status.status not in ("completed", "failed", "cancelled"):
        time.sleep(10)
        status = client.jobs.get_status(job.id)

    # Check result
    if status.status != "completed":
        raise Exception(f"Job failed with status: {status.status}")

    # Process successful result
    logs = client.jobs.get_logs(job.id)
    print(logs.logs)

except KeyboardInterrupt:
    print("\\nInterrupted by user")
    if job:
        print(f"Stopping job {job.id}...")
        client.jobs.stop(job.id)
except Exception as e:
    print(f"Error: {e}")
finally:
    # Always cleanup
    if job:
        try:
            # Delete ephemeral jobs, keep stateful ones for debugging
            job_info = client.jobs.get(job.id)
            if not job_info.stateful:
                print(f"Cleaning up job {job.id}...")
                client.jobs.delete(job.id)
            else:
                print(f"Job {job.id} preserved for debugging")
        except Exception as e:
            print(f"Cleanup warning: {e}")