# Testing Framework Integration
This guide covers advanced testing configuration for Merobox, including custom testing utilities, performance testing, and integration with popular testing frameworks.
## Advanced Testing Configuration
Configure Merobox for complex testing scenarios:
### Basic Testing Setup

```python
# conftest.py
import pytest

from merobox.testing import cluster, workflow


@pytest.fixture(scope="session")
def production_cluster():
    """Production-like cluster for integration tests."""
    with cluster(
        count=3,
        prefix="prod",
        image="ghcr.io/calimero-network/merod:latest",
        chain_id="mainnet-1",
        wait_for_ready=True,
    ) as test_env:
        yield test_env


@pytest.fixture(scope="function")
def test_workflow():
    """Run a complex workflow for each test."""
    with workflow(
        "workflows/test-setup.yml",
        prefix="test",
        scope="function",
    ) as env:
        yield env
```
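As a quick illustration, a test can consume the `production_cluster` fixture directly. The sketch below assumes the cluster environment exposes an `endpoints` mapping from node names to RPC URLs, as used in the pytest examples later in this guide:

```python
# test_smoke.py -- minimal sketch of a test that uses the fixture above.
def test_all_nodes_have_endpoints(production_cluster):
    endpoints = production_cluster["endpoints"]
    # Three nodes were requested in the fixture, so expect three endpoints.
    assert len(endpoints) == 3
    for node_name, url in endpoints.items():
        assert url, f"missing endpoint for {node_name}"
```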
### Advanced Cluster Configuration

```python
# Advanced cluster setup
@pytest.fixture(scope="session")
def advanced_cluster():
    """Advanced cluster with custom configuration."""
    with cluster(
        count=5,
        prefix="advanced",
        image="ghcr.io/calimero-network/merod:edge",
        chain_id="testnet-1",
        wait_for_ready=True,
        resources={
            "memory": "2G",
            "cpus": "1.0",
        },
        environment={
            "RUST_LOG": "debug",
            "CALIMERO_TEST_MODE": "true",
        },
        networks=["calimero-test", "calimero-internal"],
        volumes={
            "test-data": "/calimero/test-data",
        },
    ) as test_env:
        yield test_env
```
### Multi-Environment Testing

```python
# Multi-environment testing
@pytest.fixture(params=["development", "staging", "production"])
def environment_cluster(request):
    """Test against different environments."""
    env_config = {
        "development": {
            "image": "ghcr.io/calimero-network/merod:dev",
            "count": 2,
            "resources": {"memory": "1G", "cpus": "0.5"},
        },
        "staging": {
            "image": "ghcr.io/calimero-network/merod:staging",
            "count": 3,
            "resources": {"memory": "2G", "cpus": "1.0"},
        },
        "production": {
            "image": "ghcr.io/calimero-network/merod:latest",
            "count": 5,
            "resources": {"memory": "4G", "cpus": "2.0"},
        },
    }

    config = env_config[request.param]
    with cluster(
        count=config["count"],
        prefix=request.param,
        image=config["image"],
        resources=config["resources"],
        wait_for_ready=True,
    ) as test_env:
        test_env["environment"] = request.param
        yield test_env
```
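Because the fixture is parametrized, any test that requests it runs once per environment. A minimal sketch; the endpoint-count check is a hypothetical assertion that assumes the `endpoints` mapping shown later in this guide:

```python
# test_environments.py -- runs three times, once per environment parameter.
def test_cluster_matches_environment(environment_cluster):
    env_name = environment_cluster["environment"]
    assert env_name in ("development", "staging", "production")

    # Hypothetical check: the endpoints mapping should match the node count
    # configured for this environment (2, 3, or 5 nodes).
    expected = {"development": 2, "staging": 3, "production": 5}[env_name]
    assert len(environment_cluster["endpoints"]) == expected
```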
## Custom Test Utilities
Create custom testing utilities for your specific needs:
### Basic Test Utilities

```python
# test_utils.py
import time

import requests

from merobox.commands.utils import get_node_rpc_url
from merobox.testing import cluster


class TestEnvironment:
    def __init__(self, node_count=2):
        self.node_count = node_count
        self._cluster_cm = None
        self.env = None

    def __enter__(self):
        # Keep a handle on the context manager so it can be exited later,
        # and store the environment dict it yields for use by the helpers.
        self._cluster_cm = cluster(count=self.node_count)
        self.env = self._cluster_cm.__enter__()
        return self

    def __exit__(self, *args):
        if self._cluster_cm:
            self._cluster_cm.__exit__(*args)

    def get_node_endpoint(self, node_name):
        """Get the RPC endpoint for a specific node."""
        return get_node_rpc_url(node_name, self.env["manager"])

    def wait_for_node_ready(self, node_name, timeout=60):
        """Wait for a node to be ready."""
        endpoint = self.get_node_endpoint(node_name)
        start_time = time.time()

        while time.time() - start_time < timeout:
            try:
                response = requests.get(f"{endpoint}/health", timeout=5)
                if response.status_code == 200:
                    return True
            except requests.RequestException:
                pass
            time.sleep(1)

        raise TimeoutError(f"Node {node_name} not ready after {timeout} seconds")

    def get_all_endpoints(self):
        """Get all node endpoints."""
        endpoints = {}
        for i in range(1, self.node_count + 1):
            node_name = f"calimero-node-{i}"
            endpoints[node_name] = self.get_node_endpoint(node_name)
        return endpoints
```
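A usage sketch for the helper above (same module or imported from `test_utils.py`); it assumes node containers follow the `calimero-node-<n>` naming used throughout this guide:

```python
def test_with_custom_environment():
    with TestEnvironment(node_count=2) as env:
        env.wait_for_node_ready("calimero-node-1")
        endpoints = env.get_all_endpoints()
        assert len(endpoints) == 2
```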
### Advanced Test Utilities

```python
# advanced_test_utils.py
import asyncio
import logging

import aiohttp

from merobox.commands.utils import get_node_rpc_url
from merobox.testing import cluster


class AsyncTestEnvironment:
    def __init__(self, node_count=2):
        self.node_count = node_count
        self._cluster_cm = None
        self.env = None
        self.session = None

    async def __aenter__(self):
        # The cluster helper is a synchronous context manager; keep a handle
        # on it and store the environment dict it yields for the helpers.
        self._cluster_cm = cluster(count=self.node_count)
        self.env = self._cluster_cm.__enter__()

        # Create async HTTP session
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *args):
        if self.session:
            await self.session.close()
        if self._cluster_cm:
            self._cluster_cm.__exit__(*args)

    def get_node_endpoint(self, node_name):
        """Get the RPC endpoint for a specific node."""
        return get_node_rpc_url(node_name, self.env["manager"])

    async def call_node_method(self, node_name, method, params=None):
        """Call a method on a specific node."""
        endpoint = self.get_node_endpoint(node_name)
        url = f"{endpoint}/rpc"

        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or [],
            "id": 1,
        }

        async with self.session.post(url, json=payload) as response:
            return await response.json()

    async def wait_for_consensus(self, timeout=60):
        """Wait for all nodes to reach consensus."""
        start_time = asyncio.get_event_loop().time()

        while asyncio.get_event_loop().time() - start_time < timeout:
            try:
                # Check consensus on all nodes
                tasks = []
                for i in range(1, self.node_count + 1):
                    node_name = f"calimero-node-{i}"
                    tasks.append(self.call_node_method(node_name, "get_consensus_status"))

                results = await asyncio.gather(*tasks, return_exceptions=True)

                # Check whether every node reports consensus
                if all(
                    not isinstance(result, Exception)
                    and result.get("result", {}).get("consensus")
                    for result in results
                ):
                    return True
            except Exception as e:
                logging.warning(f"Error checking consensus: {e}")

            await asyncio.sleep(1)

        raise TimeoutError(f"Consensus not reached after {timeout} seconds")

    async def get_cluster_health(self):
        """Get health status of all nodes."""
        health_status = {}
        for i in range(1, self.node_count + 1):
            node_name = f"calimero-node-{i}"
            endpoint = self.get_node_endpoint(node_name)
            try:
                async with self.session.get(f"{endpoint}/health") as response:
                    health_status[node_name] = await response.json()
            except Exception as e:
                health_status[node_name] = {"error": str(e)}
        return health_status
```
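A usage sketch for the async helper (requires `pytest-asyncio` or an equivalent async test runner; `get_consensus_status` is the hypothetical RPC method used above):

```python
import pytest


@pytest.mark.asyncio
async def test_cluster_reaches_consensus():
    async with AsyncTestEnvironment(node_count=3) as env:
        await env.wait_for_consensus(timeout=120)
        health = await env.get_cluster_health()
        assert all("error" not in status for status in health.values())
```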
### Test Data Management

```python
# test_data_utils.py
import json
import os
import shutil
import tempfile
from pathlib import Path


class TestDataManager:
    def __init__(self, base_dir=None):
        self.base_dir = base_dir or tempfile.mkdtemp()
        self.test_data = {}

    def create_test_application(self, name, version="1.0.0"):
        """Create a test WASM application."""
        app_data = {
            "name": name,
            "version": version,
            "metadata": {
                "description": f"Test application {name}",
                "author": "test",
                "created_at": "2024-01-01T00:00:00Z",
            },
        }

        # Placeholder WASM payload; real tests would point at a built artifact.
        app_file = Path(self.base_dir) / f"{name}.wasm"
        app_file.write_bytes(b"fake wasm content")

        self.test_data[name] = {
            "file": str(app_file),
            "data": app_data,
        }
        return str(app_file)

    def create_test_workflow(self, name, steps):
        """Create a test workflow file."""
        workflow_data = {
            "name": f"Test Workflow {name}",
            "description": f"Test workflow for {name}",
            "steps": steps,
        }

        # JSON is a subset of YAML, so the generated file is still valid YAML.
        workflow_file = Path(self.base_dir) / f"{name}_workflow.yml"
        workflow_file.write_text(json.dumps(workflow_data, indent=2))
        return str(workflow_file)

    def cleanup(self):
        """Clean up test data."""
        if os.path.exists(self.base_dir):
            shutil.rmtree(self.base_dir)
```
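To guarantee cleanup even when a test fails, the manager can be wrapped in a fixture; a minimal sketch:

```python
import pytest


@pytest.fixture
def test_data_manager():
    """Yield a TestDataManager and remove its files after the test."""
    manager = TestDataManager()
    try:
        yield manager
    finally:
        manager.cleanup()
```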
## Performance Testing
Configure Merobox for performance testing:
### Basic Performance Test

```yaml
# performance-test.yml
description: Performance testing configuration
name: Performance Test

# High-performance node configuration
nodes:
  count: 5
  image: ghcr.io/calimero-network/merod:edge
  resources:
    memory: '2G'
    cpus: '1.0'
  environment:
    RUST_LOG: 'info'
    CALIMERO_PERF_MODE: 'true'

# Performance monitoring
monitoring:
  enabled: true
  metrics:
    - cpu_usage
    - memory_usage
    - network_io
    - disk_io
  interval: 5

steps:
  - name: Performance Test
    type: repeat
    count: 1000
    parallel: 10
    steps:
      - name: Load Test
        type: call
        node: calimero-node-{{iteration % 5 + 1}}
        method: load_test
        args:
          load: '{{iteration}}'
```
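This workflow can also be driven from a test session via the `workflow` helper shown earlier; a minimal sketch, assuming `performance-test.yml` sits in the project root:

```python
# conftest.py (sketch) -- run the performance workflow once per session.
import pytest

from merobox.testing import workflow


@pytest.fixture(scope="session")
def performance_workflow():
    with workflow("performance-test.yml", prefix="perf") as env:
        yield env
```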
### Advanced Performance Testing

```python
# performance_test.py
import asyncio
import statistics
import time

import aiohttp

from merobox.commands.utils import get_node_rpc_url
from merobox.testing import cluster


class PerformanceTest:
    def __init__(self, node_count=5, test_duration=300):
        self.node_count = node_count
        self.test_duration = test_duration
        self.results = []

    async def run_load_test(self, cluster_env):
        """Run load test against cluster."""
        session = aiohttp.ClientSession()
        try:
            # Get all node endpoints
            endpoints = []
            for i in range(1, self.node_count + 1):
                node_name = f"calimero-node-{i}"
                endpoints.append(get_node_rpc_url(node_name, cluster_env["manager"]))

            # Run load test
            start_time = time.time()
            tasks = []

            while time.time() - start_time < self.test_duration:
                # Create load test tasks
                for endpoint in endpoints:
                    tasks.append(self._make_request(session, endpoint))

                # Wait for batch to complete
                batch_results = await asyncio.gather(*tasks, return_exceptions=True)
                self.results.extend(
                    [r for r in batch_results if not isinstance(r, Exception)]
                )
                tasks = []

                await asyncio.sleep(0.1)  # Small delay between batches
        finally:
            await session.close()

    async def _make_request(self, session, endpoint):
        """Make a single request to test performance."""
        start_time = time.time()
        try:
            async with session.post(
                f"{endpoint}/rpc",
                json={
                    "jsonrpc": "2.0",
                    "method": "get_status",
                    "params": [],
                    "id": 1,
                },
            ) as response:
                await response.json()
            return time.time() - start_time
        except Exception:
            return None

    def analyze_results(self):
        """Analyze performance test results."""
        if not self.results:
            return {"error": "No results to analyze"}

        response_times = [r for r in self.results if r is not None]

        return {
            "total_requests": len(self.results),
            "successful_requests": len(response_times),
            "failed_requests": len(self.results) - len(response_times),
            "average_response_time": statistics.mean(response_times),
            "median_response_time": statistics.median(response_times),
            "p95_response_time": self._percentile(response_times, 95),
            "p99_response_time": self._percentile(response_times, 99),
            "requests_per_second": len(response_times) / self.test_duration,
        }

    def _percentile(self, data, percentile):
        """Calculate percentile of data."""
        sorted_data = sorted(data)
        index = int(len(sorted_data) * percentile / 100)
        return sorted_data[min(index, len(sorted_data) - 1)]


# Usage
async def test_performance():
    with cluster(count=5, prefix="perf") as cluster_env:
        perf_test = PerformanceTest(node_count=5, test_duration=60)
        await perf_test.run_load_test(cluster_env)

        results = perf_test.analyze_results()
        print(f"Performance results: {results}")
```
### Stress Testing

```python
# stress_test.py
import asyncio
import random

from merobox.testing import cluster


class StressTest:
    def __init__(self, node_count=3, max_concurrent=100):
        self.node_count = node_count
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def run_stress_test(self, cluster_env):
        """Run stress test with increasing load."""
        # Gradually increase load
        for load_level in range(10, self.max_concurrent + 1, 10):
            print(f"Testing with {load_level} concurrent requests...")

            # Create tasks for this load level
            level_tasks = [self._stress_request(cluster_env) for _ in range(load_level)]

            # Run tasks concurrently
            results = await asyncio.gather(*level_tasks, return_exceptions=True)

            # Analyze results
            success_count = sum(1 for r in results if not isinstance(r, Exception))
            print(f"Load level {load_level}: {success_count}/{load_level} successful")

            # Wait before next load level
            await asyncio.sleep(5)

    async def _stress_request(self, cluster_env):
        """Make a stress test request."""
        async with self.semaphore:
            # Randomly select a node
            node_id = random.randint(1, self.node_count)
            node_name = f"calimero-node-{node_id}"

            # Make request with random delay
            await asyncio.sleep(random.uniform(0, 0.1))

            # Simulate request
            try:
                # Your actual request logic here
                return "success"
            except Exception as e:
                return e
```
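A driver sketch for the stress test, placed in the same module so it can reuse the `cluster` and `asyncio` imports above:

```python
async def run_stress_suite():
    with cluster(count=3, prefix="stress") as cluster_env:
        stress = StressTest(node_count=3, max_concurrent=50)
        await stress.run_stress_test(cluster_env)


if __name__ == "__main__":
    asyncio.run(run_stress_suite())
```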
## Integration with Testing Frameworks
### Pytest Integration

```python
# conftest.py
import pytest

from merobox.testing import pytest_cluster, pytest_workflow

# Pytest fixtures
merobox_cluster = pytest_cluster(count=3, scope="session")
merobox_workflow = pytest_workflow("workflows/test.yml", scope="function")


# Custom fixtures
@pytest.fixture
def test_data():
    """Provide test data for tests."""
    return {
        "test_app": "test-application.wasm",
        "test_config": {"setting": "value"},
        "test_params": ["param1", "param2"],
    }


@pytest.fixture
def mock_external_service():
    """Mock external service for testing."""
    # Your mock setup here
    pass
```
### Test Cases

```python
# test_integration.py
import pytest

from merobox.testing import cluster


class TestCalimeroIntegration:
    def test_basic_cluster_operation(self, merobox_cluster):
        """Test basic cluster operations."""
        endpoints = merobox_cluster["endpoints"]
        assert len(endpoints) == 3

        # Test each node
        for node_name, endpoint in endpoints.items():
            assert endpoint is not None
            assert "calimero-node" in node_name

    def test_workflow_execution(self, merobox_workflow):
        """Test workflow execution."""
        workflow_result = merobox_workflow["workflow_result"]
        assert workflow_result is True

        # Check workflow outputs
        assert "node_endpoints" in merobox_workflow
        assert len(merobox_workflow["node_endpoints"]) > 0

    def test_application_installation(self, merobox_cluster, test_data):
        """Test application installation."""
        # Install test application
        # Your installation logic here
        pass

    def test_performance_requirements(self, merobox_cluster):
        """Test performance requirements."""
        # Performance testing logic
        pass

    @pytest.mark.asyncio
    async def test_async_operations(self, merobox_cluster):
        """Test async operations."""
        # Async testing logic
        pass
```
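As one way to fill in the async placeholder above, a test can probe each node's health endpoint concurrently. A sketch assuming the `endpoints` mapping and `pytest-asyncio`:

```python
import asyncio

import aiohttp
import pytest


@pytest.mark.asyncio
async def test_all_nodes_healthy(merobox_cluster):
    """Probe every node's /health endpoint concurrently."""
    endpoints = merobox_cluster["endpoints"]

    async def check(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{url}/health", timeout=aiohttp.ClientTimeout(total=5)
            ) as resp:
                return resp.status

    statuses = await asyncio.gather(*(check(url) for url in endpoints.values()))
    assert all(status == 200 for status in statuses)
```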
### Continuous Integration

```yaml
# .github/workflows/test.yml
name: Test

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install merobox

      - name: Run tests
        run: |
          pytest tests/ -v --cov=merobox

      - name: Run performance tests
        run: |
          pytest tests/performance/ -v -m performance
```
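For the `-m performance` selection to run without unknown-marker warnings, the marker has to be registered. One way is a `pytest_configure` hook in `conftest.py`; a minimal sketch:

```python
# conftest.py -- register the custom marker used by the CI job above.
def pytest_configure(config):
    config.addinivalue_line(
        "markers", "performance: marks long-running performance tests"
    )
```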
## Best Practices
### Test Organization
- Modular Tests: Organize tests into logical modules
- Test Data: Use consistent test data management
- Fixtures: Create reusable test fixtures
- Documentation: Document test cases and their purpose
### Performance Testing
- Baseline Metrics: Establish baseline performance metrics
- Load Patterns: Test various load patterns and scenarios
- Resource Monitoring: Monitor resource usage during tests
- Regression Testing: Detect performance regressions
### Error Handling
- Test Failures: Handle test failures gracefully
- Cleanup: Ensure proper cleanup after tests
- Debugging: Provide good debugging information
- Retry Logic: Implement retry logic for flaky tests (see the sketch after this list)
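For the retry-logic point, plugins such as `pytest-rerunfailures` are a common option; where a plugin is not a good fit, a small decorator around a flaky helper works too. A sketch of the decorator approach:

```python
import functools
import time


def retry(attempts=3, delay=1.0, exceptions=(AssertionError,)):
    """Re-run a flaky helper a few times before giving up."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for _ in range(attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    last_error = exc
                    time.sleep(delay)
            raise last_error
        return wrapper
    return decorator
```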
## Next Steps
Now that you understand testing framework integration:
- Resource Management - Resource limits and monitoring
- Security Configuration - Security settings and policies
- Advanced Configuration - Other advanced features