Skip to main content
Version: Next

Testing Framework Integration

This guide covers advanced testing configuration for Merobox, including custom testing utilities, performance testing, and integration with popular testing frameworks.

Advanced Testing Configuration

Configure Merobox for complex testing scenarios:

Basic Testing Setup

# conftest.py
import pytest
from merobox.testing import cluster, workflow

@pytest.fixture(scope="session")
def production_cluster():
"""Production-like cluster for integration tests."""
with cluster(
count=3,
prefix="prod",
image="ghcr.io/calimero-network/merod:latest",
chain_id="mainnet-1",
wait_for_ready=True
) as test_env:
yield test_env

@pytest.fixture(scope="function")
def test_workflow():
"""Run a complex workflow for each test."""
with workflow(
"workflows/test-setup.yml",
prefix="test",
scope="function"
) as env:
yield env

Advanced Cluster Configuration

# Advanced cluster setup
@pytest.fixture(scope="session")
def advanced_cluster():
"""Advanced cluster with custom configuration."""
with cluster(
count=5,
prefix="advanced",
image="ghcr.io/calimero-network/merod:edge",
chain_id="testnet-1",
wait_for_ready=True,
resources={
"memory": "2G",
"cpus": "1.0"
},
environment={
"RUST_LOG": "debug",
"CALIMERO_TEST_MODE": "true"
},
networks=["calimero-test", "calimero-internal"],
volumes={
"test-data": "/calimero/test-data"
}
) as test_env:
yield test_env

Multi-Environment Testing

# Multi-environment testing
@pytest.fixture(params=["development", "staging", "production"])
def environment_cluster(request):
"""Test against different environments."""
env_config = {
"development": {
"image": "ghcr.io/calimero-network/merod:dev",
"count": 2,
"resources": {"memory": "1G", "cpus": "0.5"}
},
"staging": {
"image": "ghcr.io/calimero-network/merod:staging",
"count": 3,
"resources": {"memory": "2G", "cpus": "1.0"}
},
"production": {
"image": "ghcr.io/calimero-network/merod:latest",
"count": 5,
"resources": {"memory": "4G", "cpus": "2.0"}
}
}

config = env_config[request.param]
with cluster(
count=config["count"],
prefix=request.param,
image=config["image"],
resources=config["resources"],
wait_for_ready=True
) as test_env:
test_env["environment"] = request.param
yield test_env

Custom Test Utilities

Create custom testing utilities for your specific needs:

Basic Test Utilities

# test_utils.py
from merobox.testing import cluster
from merobox.commands.utils import get_node_rpc_url
import time
import requests

class TestEnvironment:
def __init__(self, node_count=2):
self.node_count = node_count
self.cluster = None

def __enter__(self):
self.cluster = cluster(count=self.node_count)
return self.cluster.__enter__()

def __exit__(self, *args):
if self.cluster:
self.cluster.__exit__(*args)

def get_node_endpoint(self, node_name):
"""Get the RPC endpoint for a specific node."""
return get_node_rpc_url(node_name, self.cluster["manager"])

def wait_for_node_ready(self, node_name, timeout=60):
"""Wait for a node to be ready."""
endpoint = self.get_node_endpoint(node_name)
start_time = time.time()

while time.time() - start_time < timeout:
try:
response = requests.get(f"{endpoint}/health", timeout=5)
if response.status_code == 200:
return True
except requests.RequestException:
pass
time.sleep(1)

raise TimeoutError(f"Node {node_name} not ready after {timeout} seconds")

def get_all_endpoints(self):
"""Get all node endpoints."""
endpoints = {}
for i in range(1, self.node_count + 1):
node_name = f"calimero-node-{i}"
endpoints[node_name] = self.get_node_endpoint(node_name)
return endpoints

Advanced Test Utilities

# advanced_test_utils.py
import asyncio
import aiohttp
from merobox.testing import cluster
from merobox.commands.utils import get_node_rpc_url
import json
import logging

class AsyncTestEnvironment:
def __init__(self, node_count=2):
self.node_count = node_count
self.cluster = None
self.session = None

async def __aenter__(self):
self.cluster = cluster(count=self.node_count)
self.cluster.__enter__()

# Create async HTTP session
self.session = aiohttp.ClientSession()
return self

async def __aexit__(self, *args):
if self.session:
await self.session.close()
if self.cluster:
self.cluster.__exit__(*args)

async def get_node_endpoint(self, node_name):
"""Get the RPC endpoint for a specific node."""
return get_node_rpc_url(node_name, self.cluster["manager"])

async def call_node_method(self, node_name, method, params=None):
"""Call a method on a specific node."""
endpoint = await self.get_node_endpoint(node_name)
url = f"{endpoint}/rpc"

payload = {
"jsonrpc": "2.0",
"method": method,
"params": params or [],
"id": 1
}

async with self.session.post(url, json=payload) as response:
return await response.json()

async def wait_for_consensus(self, timeout=60):
"""Wait for all nodes to reach consensus."""
start_time = asyncio.get_event_loop().time()

while asyncio.get_event_loop().time() - start_time < timeout:
try:
# Check consensus on all nodes
tasks = []
for i in range(1, self.node_count + 1):
node_name = f"calimero-node-{i}"
task = self.call_node_method(node_name, "get_consensus_status")
tasks.append(task)

results = await asyncio.gather(*tasks, return_exceptions=True)

# Check if all nodes have consensus
if all(not isinstance(result, Exception) and result.get("result", {}).get("consensus") for result in results):
return True

except Exception as e:
logging.warning(f"Error checking consensus: {e}")

await asyncio.sleep(1)

raise TimeoutError(f"Consensus not reached after {timeout} seconds")

async def get_cluster_health(self):
"""Get health status of all nodes."""
tasks = []
for i in range(1, self.node_count + 1):
node_name = f"calimero-node-{i}"
endpoint = await self.get_node_endpoint(node_name)
task = self.session.get(f"{endpoint}/health")
tasks.append((node_name, task))

health_status = {}
for node_name, task in tasks:
try:
response = await task
health_status[node_name] = await response.json()
except Exception as e:
health_status[node_name] = {"error": str(e)}

return health_status

Test Data Management

# test_data_utils.py
import json
import tempfile
import os
from pathlib import Path

class TestDataManager:
def __init__(self, base_dir=None):
self.base_dir = base_dir or tempfile.mkdtemp()
self.test_data = {}

def create_test_application(self, name, version="1.0.0"):
"""Create a test WASM application."""
app_data = {
"name": name,
"version": version,
"metadata": {
"description": f"Test application {name}",
"author": "test",
"created_at": "2024-01-01T00:00:00Z"
}
}

app_file = Path(self.base_dir) / f"{name}.wasm"
app_file.write_bytes(b"fake wasm content")

self.test_data[name] = {
"file": str(app_file),
"data": app_data
}

return str(app_file)

def create_test_workflow(self, name, steps):
"""Create a test workflow file."""
workflow_data = {
"name": f"Test Workflow {name}",
"description": f"Test workflow for {name}",
"steps": steps
}

workflow_file = Path(self.base_dir) / f"{name}_workflow.yml"
workflow_file.write_text(json.dumps(workflow_data, indent=2))

return str(workflow_file)

def cleanup(self):
"""Clean up test data."""
import shutil
if os.path.exists(self.base_dir):
shutil.rmtree(self.base_dir)

Performance Testing

Configure Merobox for performance testing:

Basic Performance Test

# performance-test.yml
description: Performance testing configuration
name: Performance Test

# High-performance node configuration
nodes:
count: 5
image: ghcr.io/calimero-network/merod:edge
resources:
memory: '2G'
cpus: '1.0'
environment:
RUST_LOG: 'info'
CALIMERO_PERF_MODE: 'true'

# Performance monitoring
monitoring:
enabled: true
metrics:
- cpu_usage
- memory_usage
- network_io
- disk_io
interval: 5

steps:
- name: Performance Test
type: repeat
count: 1000
parallel: 10
steps:
- name: Load Test
type: call
node: calimero-node-{{iteration % 5 + 1}}
method: load_test
args:
load: '{{iteration}}'

Advanced Performance Testing

# performance_test.py
import asyncio
import aiohttp
import time
import statistics
from merobox.testing import cluster

class PerformanceTest:
def __init__(self, node_count=5, test_duration=300):
self.node_count = node_count
self.test_duration = test_duration
self.results = []

async def run_load_test(self, cluster_env):
"""Run load test against cluster."""
session = aiohttp.ClientSession()

try:
# Get all node endpoints
endpoints = []
for i in range(1, self.node_count + 1):
node_name = f"calimero-node-{i}"
endpoint = get_node_rpc_url(node_name, cluster_env["manager"])
endpoints.append(endpoint)

# Run load test
start_time = time.time()
tasks = []

while time.time() - start_time < self.test_duration:
# Create load test tasks
for endpoint in endpoints:
task = self._make_request(session, endpoint)
tasks.append(task)

# Wait for batch to complete
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
self.results.extend([r for r in batch_results if not isinstance(r, Exception)])

tasks = []
await asyncio.sleep(0.1) # Small delay between batches

finally:
await session.close()

async def _make_request(self, session, endpoint):
"""Make a single request to test performance."""
start_time = time.time()

try:
async with session.post(f"{endpoint}/rpc", json={
"jsonrpc": "2.0",
"method": "get_status",
"params": [],
"id": 1
}) as response:
await response.json()
return time.time() - start_time
except Exception as e:
return None

def analyze_results(self):
"""Analyze performance test results."""
if not self.results:
return {"error": "No results to analyze"}

response_times = [r for r in self.results if r is not None]

return {
"total_requests": len(self.results),
"successful_requests": len(response_times),
"failed_requests": len(self.results) - len(response_times),
"average_response_time": statistics.mean(response_times),
"median_response_time": statistics.median(response_times),
"p95_response_time": self._percentile(response_times, 95),
"p99_response_time": self._percentile(response_times, 99),
"requests_per_second": len(response_times) / self.test_duration
}

def _percentile(self, data, percentile):
"""Calculate percentile of data."""
sorted_data = sorted(data)
index = int(len(sorted_data) * percentile / 100)
return sorted_data[min(index, len(sorted_data) - 1)]

# Usage
async def test_performance():
with cluster(count=5, prefix="perf") as cluster_env:
perf_test = PerformanceTest(node_count=5, test_duration=60)
await perf_test.run_load_test(cluster_env)
results = perf_test.analyze_results()
print(f"Performance results: {results}")

Stress Testing

# stress_test.py
import asyncio
import random
from merobox.testing import cluster

class StressTest:
def __init__(self, node_count=3, max_concurrent=100):
self.node_count = node_count
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)

async def run_stress_test(self, cluster_env):
"""Run stress test with increasing load."""
tasks = []

# Gradually increase load
for load_level in range(10, self.max_concurrent + 1, 10):
print(f"Testing with {load_level} concurrent requests...")

# Create tasks for this load level
level_tasks = []
for _ in range(load_level):
task = self._stress_request(cluster_env)
level_tasks.append(task)

# Run tasks concurrently
results = await asyncio.gather(*level_tasks, return_exceptions=True)

# Analyze results
success_count = sum(1 for r in results if not isinstance(r, Exception))
print(f"Load level {load_level}: {success_count}/{load_level} successful")

# Wait before next load level
await asyncio.sleep(5)

async def _stress_request(self, cluster_env):
"""Make a stress test request."""
async with self.semaphore:
# Randomly select a node
node_id = random.randint(1, self.node_count)
node_name = f"calimero-node-{node_id}"

# Make request with random delay
await asyncio.sleep(random.uniform(0, 0.1))

# Simulate request
try:
# Your actual request logic here
return "success"
except Exception as e:
return e

Integration with Testing Frameworks

Pytest Integration

# conftest.py
import pytest
from merobox.testing import pytest_cluster, pytest_workflow

# Pytest fixtures
merobox_cluster = pytest_cluster(count=3, scope="session")
merobox_workflow = pytest_workflow("workflows/test.yml", scope="function")

# Custom fixtures
@pytest.fixture
def test_data():
"""Provide test data for tests."""
return {
"test_app": "test-application.wasm",
"test_config": {"setting": "value"},
"test_params": ["param1", "param2"]
}

@pytest.fixture
def mock_external_service():
"""Mock external service for testing."""
# Your mock setup here
pass

Test Cases

# test_integration.py
import pytest
from merobox.testing import cluster

class TestCalimeroIntegration:
def test_basic_cluster_operation(self, merobox_cluster):
"""Test basic cluster operations."""
endpoints = merobox_cluster["endpoints"]
assert len(endpoints) == 3

# Test each node
for node_name, endpoint in endpoints.items():
assert endpoint is not None
assert "calimero-node" in node_name

def test_workflow_execution(self, merobox_workflow):
"""Test workflow execution."""
workflow_result = merobox_workflow["workflow_result"]
assert workflow_result is True

# Check workflow outputs
assert "node_endpoints" in merobox_workflow
assert len(merobox_workflow["node_endpoints"]) > 0

def test_application_installation(self, merobox_cluster, test_data):
"""Test application installation."""
# Install test application
# Your installation logic here
pass

def test_performance_requirements(self, merobox_cluster):
"""Test performance requirements."""
# Performance testing logic
pass

@pytest.mark.asyncio
async def test_async_operations(self, merobox_cluster):
"""Test async operations."""
# Async testing logic
pass

Continuous Integration

# .github/workflows/test.yml
name: Test

on: [push, pull_request]

jobs:
test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'

- name: Install dependencies
run: |
pip install -r requirements.txt
pip install merobox

- name: Run tests
run: |
pytest tests/ -v --cov=merobox

- name: Run performance tests
run: |
pytest tests/performance/ -v -m performance

Best Practices

Test Organization

  1. Modular Tests: Organize tests into logical modules
  2. Test Data: Use consistent test data management
  3. Fixtures: Create reusable test fixtures
  4. Documentation: Document test cases and their purpose

Performance Testing

  1. Baseline Metrics: Establish baseline performance metrics
  2. Load Patterns: Test various load patterns and scenarios
  3. Resource Monitoring: Monitor resource usage during tests
  4. Regression Testing: Detect performance regressions

Error Handling

  1. Test Failures: Handle test failures gracefully
  2. Cleanup: Ensure proper cleanup after tests
  3. Debugging: Provide good debugging information
  4. Retry Logic: Implement retry logic for flaky tests

Next Steps

Now that you understand testing framework integration:

Was this page helpful?
Need some help? Check Support page