 6933a6ccb1
			
		
	
	6933a6ccb1
	
	
	
		
			
			- Complete Gemini CLI agent adapter with SSH execution - CLI agent factory with connection pooling - SSH executor with AsyncSSH for remote CLI execution - Backend integration with CLI agent manager - MCP server updates with CLI agent tools - Frontend UI updates for mixed agent types - Database migrations for CLI agent support - Docker deployment with CLI source integration - Comprehensive documentation and testing 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
		
			
				
	
	
	
		
			29 KiB
		
	
	
	
	
	
	
	
			
		
		
	
	
			29 KiB
		
	
	
	
	
	
	
	
🧪 CCLI Testing Strategy
Project: Gemini CLI Agent Integration
Version: 1.0
Testing Philosophy: Fail Fast, Test Early, Protect Production
🎯 Testing Objectives
Primary Goals
- Zero Impact: Ensure CLI agent integration doesn't affect existing Ollama agents
- Reliability: Validate CLI agents work consistently under various conditions
- Performance: Ensure CLI agents meet performance requirements
- Security: Verify SSH connections and authentication are secure
- Scalability: Test concurrent execution and resource usage
Quality Gates
- Unit Tests: ≥90% code coverage for CLI agent components
- Integration Tests: 100% of CLI agent workflows tested end-to-end
- Performance Tests: CLI agents perform within 150% of Ollama baseline
- Security Tests: All SSH connections and authentication validated
- Load Tests: System stable under 10x normal load with CLI agents
📋 Test Categories
1. 🔧 Unit Tests
1.1 CLI Agent Adapter Tests
# File: src/tests/test_gemini_cli_agent.py
import pytest
from unittest.mock import Mock, AsyncMock
from src.agents.gemini_cli_agent import GeminiCliAgent, GeminiCliConfig
class TestGeminiCliAgent:
    @pytest.fixture
    def agent_config(self):
        return GeminiCliConfig(
            host="test-host",
            node_path="/test/node",
            gemini_path="/test/gemini",
            node_version="v22.14.0",
            model="gemini-2.5-pro"
        )
    
    @pytest.fixture
    def agent(self, agent_config):
        return GeminiCliAgent(agent_config, "test_specialty")
    
    async def test_execute_task_success(self, agent, mocker):
        """Test successful task execution"""
        mock_ssh_execute = mocker.patch.object(agent, '_ssh_execute')
        mock_ssh_execute.return_value = Mock(
            stdout="Test response",
            returncode=0,
            duration=1.5
        )
        
        result = await agent.execute_task("Test prompt")
        
        assert result["status"] == "completed"
        assert result["response"] == "Test response"
        assert result["execution_time"] == 1.5
        assert result["model"] == "gemini-2.5-pro"
    
    async def test_execute_task_failure(self, agent, mocker):
        """Test task execution failure handling"""
        mock_ssh_execute = mocker.patch.object(agent, '_ssh_execute')
        mock_ssh_execute.side_effect = Exception("SSH connection failed")
        
        result = await agent.execute_task("Test prompt")
        
        assert result["status"] == "failed"
        assert "SSH connection failed" in result["error"]
    
    async def test_concurrent_task_limit(self, agent):
        """Test concurrent task execution limits"""
        agent.config.max_concurrent = 2
        
        # Start 2 tasks
        task1 = agent.execute_task("Task 1")
        task2 = agent.execute_task("Task 2")
        
        # Third task should fail
        with pytest.raises(Exception, match="maximum concurrent tasks"):
            await agent.execute_task("Task 3")
1.2 SSH Executor Tests
# File: src/tests/test_ssh_executor.py
import pytest
from src.executors.ssh_executor import SSHExecutor, SSHResult
class TestSSHExecutor:
    @pytest.fixture
    def executor(self):
        return SSHExecutor(connection_pool_size=2)
    
    async def test_connection_pooling(self, executor, mocker):
        """Test SSH connection pooling"""
        mock_connect = mocker.patch('asyncssh.connect')
        mock_conn = AsyncMock()
        mock_connect.return_value = mock_conn
        
        # Execute multiple commands on same host
        await executor.execute("test-host", "command1")
        await executor.execute("test-host", "command2")
        
        # Should reuse connection
        assert mock_connect.call_count == 1
    
    async def test_command_timeout(self, executor, mocker):
        """Test command timeout handling"""
        mock_connect = mocker.patch('asyncssh.connect')
        mock_conn = AsyncMock()
        mock_conn.run.side_effect = asyncio.TimeoutError()
        mock_connect.return_value = mock_conn
        
        with pytest.raises(Exception, match="SSH command timeout"):
            await executor.execute("test-host", "slow-command", timeout=1)
1.3 Agent Factory Tests
# File: src/tests/test_cli_agent_factory.py
from src.agents.cli_agent_factory import CliAgentFactory
class TestCliAgentFactory:
    def test_create_known_agent(self):
        """Test creating predefined agents"""
        agent = CliAgentFactory.create_agent("walnut-gemini", "general_ai")
        
        assert agent.config.host == "walnut"
        assert agent.config.node_version == "v22.14.0"
        assert agent.specialization == "general_ai"
    
    def test_create_unknown_agent(self):
        """Test error handling for unknown agents"""
        with pytest.raises(ValueError, match="Unknown CLI agent"):
            CliAgentFactory.create_agent("nonexistent-agent", "test")
2. 🔗 Integration Tests
2.1 End-to-End CLI Agent Execution
# File: src/tests/integration/test_cli_agent_integration.py
import pytest
from backend.app.core.hive_coordinator import HiveCoordinator
from backend.app.models.agent import Agent, AgentType
class TestCliAgentIntegration:
    @pytest.fixture
    async def coordinator(self):
        coordinator = HiveCoordinator()
        await coordinator.initialize()
        return coordinator
    
    @pytest.fixture
    def cli_agent(self):
        return Agent(
            id="test-cli-agent",
            endpoint="cli://test-host",
            model="gemini-2.5-pro",
            specialty="general_ai",
            agent_type=AgentType.CLI_GEMINI,
            cli_config={
                "host": "test-host",
                "node_path": "/test/node",
                "gemini_path": "/test/gemini",
                "node_version": "v22.14.0"
            }
        )
    
    async def test_cli_task_execution(self, coordinator, cli_agent):
        """Test complete CLI task execution workflow"""
        task = coordinator.create_task(
            task_type=AgentType.CLI_GEMINI,
            context={"prompt": "What is 2+2?"},
            priority=3
        )
        
        result = await coordinator.execute_task(task, cli_agent)
        
        assert result["status"] == "completed"
        assert "response" in result
        assert task.status == TaskStatus.COMPLETED
2.2 Mixed Agent Type Coordination
# File: src/tests/integration/test_mixed_agent_coordination.py
class TestMixedAgentCoordination:
    async def test_ollama_and_cli_agents_together(self, coordinator):
        """Test Ollama and CLI agents working together"""
        # Create tasks for both agent types
        ollama_task = coordinator.create_task(
            task_type=AgentType.PYTORCH_DEV,
            context={"prompt": "Generate Python code"},
            priority=3
        )
        
        cli_task = coordinator.create_task(
            task_type=AgentType.CLI_GEMINI,
            context={"prompt": "Analyze this code"},
            priority=3
        )
        
        # Execute tasks concurrently
        ollama_result, cli_result = await asyncio.gather(
            coordinator.process_task(ollama_task),
            coordinator.process_task(cli_task)
        )
        
        assert ollama_result["status"] == "completed"
        assert cli_result["status"] == "completed"
2.3 MCP Server CLI Agent Support
// File: mcp-server/src/tests/integration/test_cli_agent_mcp.test.ts
describe('MCP CLI Agent Integration', () => {
    let hiveTools: HiveTools;
    
    beforeEach(() => {
        hiveTools = new HiveTools(mockHiveClient);
    });
    
    test('should execute task on CLI agent', async () => {
        const result = await hiveTools.executeTool('hive_create_task', {
            type: 'cli_gemini',
            priority: 3,
            objective: 'Test CLI agent execution'
        });
        
        expect(result.isError).toBe(false);
        expect(result.content[0].text).toContain('Task created successfully');
    });
    
    test('should discover both Ollama and CLI agents', async () => {
        const result = await hiveTools.executeTool('hive_get_agents', {});
        
        expect(result.isError).toBe(false);
        const agents = JSON.parse(result.content[0].text);
        
        // Should include both types
        expect(agents.some(a => a.agent_type === 'ollama')).toBe(true);
        expect(agents.some(a => a.agent_type === 'cli_gemini')).toBe(true);
    });
});
3. 📊 Performance Tests
3.1 Response Time Benchmarking
# File: scripts/benchmark-response-times.sh
#!/bin/bash
echo "🏃 CLI Agent Response Time Benchmarking"
# Test single task execution times
benchmark_single_task() {
    local agent_type=$1
    local iterations=10
    local total_time=0
    
    echo "Benchmarking $agent_type agent (${iterations} iterations)..."
    
    for i in $(seq 1 $iterations); do
        start_time=$(date +%s.%N)
        
        curl -s -X POST http://localhost:8000/api/tasks \
            -H "Content-Type: application/json" \
            -d "{
                \"agent_type\": \"$agent_type\",
                \"prompt\": \"What is the capital of France?\",
                \"priority\": 3
            }" > /dev/null
        
        end_time=$(date +%s.%N)
        duration=$(echo "$end_time - $start_time" | bc)
        total_time=$(echo "$total_time + $duration" | bc)
        
        echo "Iteration $i: ${duration}s"
    done
    
    average_time=$(echo "scale=2; $total_time / $iterations" | bc)
    echo "$agent_type average response time: ${average_time}s"
}
# Run benchmarks
benchmark_single_task "ollama"
benchmark_single_task "cli_gemini"
# Compare results
echo "📊 Performance Comparison Complete"
3.2 Concurrent Execution Testing
# File: scripts/test_concurrent_execution.py
import asyncio
import aiohttp
import time
from typing import List, Tuple
async def test_concurrent_cli_agents():
    """Test concurrent CLI agent execution under load"""
    
    async def execute_task(session: aiohttp.ClientSession, task_id: int) -> Tuple[int, float, str]:
        start_time = time.time()
        
        async with session.post(
            'http://localhost:8000/api/tasks',
            json={
                'agent_type': 'cli_gemini',
                'prompt': f'Process task {task_id}',
                'priority': 3
            }
        ) as response:
            result = await response.json()
            duration = time.time() - start_time
            status = result.get('status', 'unknown')
            
            return task_id, duration, status
    
    # Test various concurrency levels
    concurrency_levels = [1, 2, 4, 8, 16]
    
    for concurrency in concurrency_levels:
        print(f"\n🔄 Testing {concurrency} concurrent CLI agent tasks...")
        
        async with aiohttp.ClientSession() as session:
            tasks = [
                execute_task(session, i) 
                for i in range(concurrency)
            ]
            
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Analyze results
            successful_tasks = [r for r in results if isinstance(r, tuple) and r[2] == 'completed']
            failed_tasks = [r for r in results if not isinstance(r, tuple) or r[2] != 'completed']
            
            if successful_tasks:
                avg_duration = sum(r[1] for r in successful_tasks) / len(successful_tasks)
                print(f"  ✅ {len(successful_tasks)}/{concurrency} tasks successful")
                print(f"  ⏱️  Average duration: {avg_duration:.2f}s")
            
            if failed_tasks:
                print(f"  ❌ {len(failed_tasks)} tasks failed")
if __name__ == "__main__":
    asyncio.run(test_concurrent_cli_agents())
3.3 Resource Usage Monitoring
# File: scripts/monitor_resource_usage.py
import psutil
import time
import asyncio
from typing import Dict, List
class ResourceMonitor:
    def __init__(self):
        self.baseline_metrics = self.get_system_metrics()
        
    def get_system_metrics(self) -> Dict:
        """Get current system resource usage"""
        return {
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory_percent': psutil.virtual_memory().percent,
            'network_io': psutil.net_io_counters(),
            'ssh_connections': self.count_ssh_connections()
        }
    
    def count_ssh_connections(self) -> int:
        """Count active SSH connections"""
        connections = psutil.net_connections()
        ssh_conns = [c for c in connections if c.laddr and c.laddr.port == 22]
        return len(ssh_conns)
    
    async def monitor_during_cli_execution(self, duration_minutes: int = 10):
        """Monitor resource usage during CLI agent execution"""
        print(f"🔍 Monitoring resources for {duration_minutes} minutes...")
        
        metrics_history = []
        end_time = time.time() + (duration_minutes * 60)
        
        while time.time() < end_time:
            current_metrics = self.get_system_metrics()
            metrics_history.append({
                'timestamp': time.time(),
                **current_metrics
            })
            
            print(f"CPU: {current_metrics['cpu_percent']}%, "
                  f"Memory: {current_metrics['memory_percent']}%, "
                  f"SSH Connections: {current_metrics['ssh_connections']}")
            
            await asyncio.sleep(30)  # Sample every 30 seconds
        
        self.analyze_resource_usage(metrics_history)
    
    def analyze_resource_usage(self, metrics_history: List[Dict]):
        """Analyze resource usage patterns"""
        if not metrics_history:
            return
        
        avg_cpu = sum(m['cpu_percent'] for m in metrics_history) / len(metrics_history)
        max_cpu = max(m['cpu_percent'] for m in metrics_history)
        
        avg_memory = sum(m['memory_percent'] for m in metrics_history) / len(metrics_history)
        max_memory = max(m['memory_percent'] for m in metrics_history)
        
        max_ssh_conns = max(m['ssh_connections'] for m in metrics_history)
        
        print(f"\n📊 Resource Usage Analysis:")
        print(f"  CPU - Average: {avg_cpu:.1f}%, Peak: {max_cpu:.1f}%")
        print(f"  Memory - Average: {avg_memory:.1f}%, Peak: {max_memory:.1f}%")
        print(f"  SSH Connections - Peak: {max_ssh_conns}")
        
        # Check if within acceptable limits
        if max_cpu > 80:
            print("  ⚠️  High CPU usage detected")
        if max_memory > 85:
            print("  ⚠️  High memory usage detected")
        if max_ssh_conns > 20:
            print("  ⚠️  High SSH connection count")
4. 🔒 Security Tests
4.1 SSH Authentication Testing
# File: src/tests/security/test_ssh_security.py
import pytest
from src.executors.ssh_executor import SSHExecutor
class TestSSHSecurity:
    async def test_key_based_authentication(self):
        """Test SSH key-based authentication"""
        executor = SSHExecutor()
        
        # Should succeed with proper key
        result = await executor.execute("walnut", "echo 'test'")
        assert result.returncode == 0
    
    async def test_connection_timeout(self):
        """Test SSH connection timeout handling"""
        executor = SSHExecutor()
        
        with pytest.raises(Exception, match="timeout"):
            await executor.execute("invalid-host", "echo 'test'", timeout=5)
    
    async def test_command_injection_prevention(self):
        """Test prevention of command injection"""
        executor = SSHExecutor()
        
        # Malicious command should be properly escaped
        malicious_input = "test'; rm -rf /; echo 'evil"
        result = await executor.execute("walnut", f"echo '{malicious_input}'")
        
        # Should not execute the rm command
        assert "evil" in result.stdout
        assert result.returncode == 0
4.2 Network Security Testing
# File: scripts/test-network-security.sh
#!/bin/bash
echo "🔒 Network Security Testing for CLI Agents"
# Test SSH connection encryption
test_ssh_encryption() {
    echo "Testing SSH connection encryption..."
    
    # Capture network traffic during SSH session
    timeout 10s tcpdump -i any -c 20 port 22 > /tmp/ssh_traffic.log 2>&1 &
    tcpdump_pid=$!
    
    # Execute CLI command
    ssh walnut "echo 'test connection'" > /dev/null 2>&1
    
    # Stop traffic capture
    kill $tcpdump_pid 2>/dev/null
    
    # Verify encrypted traffic (should not contain plaintext)
    if grep -q "test connection" /tmp/ssh_traffic.log; then
        echo "❌ SSH traffic appears to be unencrypted"
        return 1
    else
        echo "✅ SSH traffic is properly encrypted"
        return 0
    fi
}
# Test connection limits
test_connection_limits() {
    echo "Testing SSH connection limits..."
    
    # Try to open many connections
    for i in {1..25}; do
        ssh -o ConnectTimeout=5 walnut "sleep 1" &
    done
    
    wait
    echo "✅ Connection limit testing completed"
}
# Run security tests
test_ssh_encryption
test_connection_limits
echo "🔒 Network security testing completed"
5. 🚀 Load Tests
5.1 Sustained Load Testing
# File: scripts/load_test_sustained.py
import asyncio
import aiohttp
import random
import time
from dataclasses import dataclass
from typing import List, Dict
@dataclass
class LoadTestConfig:
    duration_minutes: int = 30
    requests_per_second: int = 2
    cli_agent_percentage: int = 30  # 30% CLI, 70% Ollama
    
class SustainedLoadTester:
    def __init__(self, config: LoadTestConfig):
        self.config = config
        self.results = []
        
    async def generate_load(self):
        """Generate sustained load on the system"""
        end_time = time.time() + (self.config.duration_minutes * 60)
        task_counter = 0
        
        async with aiohttp.ClientSession() as session:
            while time.time() < end_time:
                # Determine agent type based on percentage
                use_cli = random.randint(1, 100) <= self.config.cli_agent_percentage
                agent_type = "cli_gemini" if use_cli else "ollama"
                
                # Create task
                task = asyncio.create_task(
                    self.execute_single_request(session, agent_type, task_counter)
                )
                
                task_counter += 1
                
                # Maintain request rate
                await asyncio.sleep(1.0 / self.config.requests_per_second)
        
        # Wait for all tasks to complete
        await asyncio.gather(*asyncio.all_tasks(), return_exceptions=True)
        
        self.analyze_results()
    
    async def execute_single_request(self, session: aiohttp.ClientSession, 
                                   agent_type: str, task_id: int):
        """Execute a single request and record metrics"""
        start_time = time.time()
        
        try:
            async with session.post(
                'http://localhost:8000/api/tasks',
                json={
                    'agent_type': agent_type,
                    'prompt': f'Load test task {task_id}',
                    'priority': 3
                },
                timeout=aiohttp.ClientTimeout(total=60)
            ) as response:
                result = await response.json()
                duration = time.time() - start_time
                
                self.results.append({
                    'task_id': task_id,
                    'agent_type': agent_type,
                    'duration': duration,
                    'status': response.status,
                    'success': response.status == 200
                })
                
        except Exception as e:
            duration = time.time() - start_time
            self.results.append({
                'task_id': task_id,
                'agent_type': agent_type,
                'duration': duration,
                'status': 0,
                'success': False,
                'error': str(e)
            })
    
    def analyze_results(self):
        """Analyze load test results"""
        if not self.results:
            print("No results to analyze")
            return
        
        total_requests = len(self.results)
        successful_requests = sum(1 for r in self.results if r['success'])
        
        cli_results = [r for r in self.results if r['agent_type'] == 'cli_gemini']
        ollama_results = [r for r in self.results if r['agent_type'] == 'ollama']
        
        print(f"\n📊 Load Test Results:")
        print(f"  Total Requests: {total_requests}")
        print(f"  Success Rate: {successful_requests/total_requests*100:.1f}%")
        
        if cli_results:
            cli_avg_duration = sum(r['duration'] for r in cli_results) / len(cli_results)
            cli_success_rate = sum(1 for r in cli_results if r['success']) / len(cli_results)
            print(f"  CLI Agents - Count: {len(cli_results)}, "
                  f"Avg Duration: {cli_avg_duration:.2f}s, "
                  f"Success Rate: {cli_success_rate*100:.1f}%")
        
        if ollama_results:
            ollama_avg_duration = sum(r['duration'] for r in ollama_results) / len(ollama_results)
            ollama_success_rate = sum(1 for r in ollama_results if r['success']) / len(ollama_results)
            print(f"  Ollama Agents - Count: {len(ollama_results)}, "
                  f"Avg Duration: {ollama_avg_duration:.2f}s, "
                  f"Success Rate: {ollama_success_rate*100:.1f}%")
# Run load test
if __name__ == "__main__":
    config = LoadTestConfig(
        duration_minutes=30,
        requests_per_second=2,
        cli_agent_percentage=30
    )
    
    tester = SustainedLoadTester(config)
    asyncio.run(tester.generate_load())
6. 🧪 Chaos Testing
6.1 Network Interruption Testing
# File: scripts/chaos-test-network.sh
#!/bin/bash
echo "🌪️  Chaos Testing: Network Interruptions"
# Function to simulate network latency
simulate_network_latency() {
    local target_host=$1
    local delay_ms=$2
    local duration_seconds=$3
    
    echo "Adding ${delay_ms}ms latency to $target_host for ${duration_seconds}s..."
    
    # Add network delay (requires root/sudo)
    sudo tc qdisc add dev eth0 root netem delay ${delay_ms}ms
    
    # Wait for specified duration
    sleep $duration_seconds
    
    # Remove network delay
    sudo tc qdisc del dev eth0 root netem
    
    echo "Network latency removed"
}
# Function to simulate network packet loss
simulate_packet_loss() {
    local loss_percentage=$1
    local duration_seconds=$2
    
    echo "Simulating ${loss_percentage}% packet loss for ${duration_seconds}s..."
    
    sudo tc qdisc add dev eth0 root netem loss ${loss_percentage}%
    sleep $duration_seconds
    sudo tc qdisc del dev eth0 root netem
    
    echo "Packet loss simulation ended"
}
# Test CLI agent resilience during network issues
test_cli_resilience_during_network_chaos() {
    echo "Testing CLI agent resilience during network chaos..."
    
    # Start background CLI agent tasks
    for i in {1..5}; do
        {
            curl -X POST http://localhost:8000/api/tasks \
                -H "Content-Type: application/json" \
                -d "{\"agent_type\": \"cli_gemini\", \"prompt\": \"Chaos test task $i\"}" \
                > /tmp/chaos_test_$i.log 2>&1
        } &
    done
    
    # Introduce network chaos
    sleep 2
    simulate_network_latency "walnut" 500 10  # 500ms delay for 10 seconds
    sleep 5
    simulate_packet_loss 10 5  # 10% packet loss for 5 seconds
    
    # Wait for all tasks to complete
    wait
    
    # Analyze results
    echo "Analyzing chaos test results..."
    for i in {1..5}; do
        if grep -q "completed" /tmp/chaos_test_$i.log; then
            echo "  Task $i: ✅ Completed successfully despite network chaos"
        else
            echo "  Task $i: ❌ Failed during network chaos"
        fi
    done
}
# Note: This script requires root privileges for network simulation
if [[ $EUID -eq 0 ]]; then
    test_cli_resilience_during_network_chaos
else
    echo "⚠️  Chaos testing requires root privileges for network simulation"
    echo "Run with: sudo $0"
fi
📊 Test Automation & CI/CD Integration
Automated Test Pipeline
# File: .github/workflows/ccli-tests.yml
name: CCLI Integration Tests
on:
  push:
    branches: [ feature/gemini-cli-integration ]
  pull_request:
    branches: [ master ]
jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: 3.11
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-asyncio pytest-mock
      - name: Run unit tests
        run: pytest src/tests/ -v --cov=src --cov-report=xml
      - name: Upload coverage
        uses: codecov/codecov-action@v1
  integration-tests:
    runs-on: ubuntu-latest
    needs: unit-tests
    steps:
      - uses: actions/checkout@v2
      - name: Start test environment
        run: docker-compose -f docker-compose.test.yml up -d
      - name: Wait for services
        run: sleep 30
      - name: Run integration tests
        run: pytest src/tests/integration/ -v
      - name: Cleanup
        run: docker-compose -f docker-compose.test.yml down
  security-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Run security scan
        run: |
          pip install bandit safety
          bandit -r src/
          safety check
Test Reporting Dashboard
# File: scripts/generate_test_report.py
import json
import datetime
from pathlib import Path
class TestReportGenerator:
    def __init__(self):
        self.results = {
            'timestamp': datetime.datetime.now().isoformat(),
            'test_suites': {}
        }
    
    def add_test_suite(self, suite_name: str, results: dict):
        """Add test suite results to the report"""
        self.results['test_suites'][suite_name] = {
            'total_tests': results.get('total', 0),
            'passed': results.get('passed', 0),
            'failed': results.get('failed', 0),
            'success_rate': results.get('passed', 0) / max(results.get('total', 1), 1),
            'duration': results.get('duration', 0),
            'details': results.get('details', [])
        }
    
    def generate_html_report(self, output_path: str):
        """Generate HTML test report"""
        html_content = self._build_html_report()
        
        with open(output_path, 'w') as f:
            f.write(html_content)
    
    def _build_html_report(self) -> str:
        """Build HTML report content"""
        # HTML report template with test results
        return f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>CCLI Test Report</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 40px; }}
                .success {{ color: green; }}
                .failure {{ color: red; }}
                .suite {{ margin: 20px 0; padding: 15px; border: 1px solid #ddd; }}
            </style>
        </head>
        <body>
            <h1>🧪 CCLI Test Report</h1>
            <p>Generated: {self.results['timestamp']}</p>
            {self._generate_suite_summaries()}
        </body>
        </html>
        """
    
    def _generate_suite_summaries(self) -> str:
        """Generate HTML for test suite summaries"""
        html = ""
        for suite_name, results in self.results['test_suites'].items():
            status_class = "success" if results['success_rate'] >= 0.95 else "failure"
            html += f"""
            <div class="suite">
                <h2>{suite_name}</h2>
                <p class="{status_class}">
                    {results['passed']}/{results['total']} tests passed 
                    ({results['success_rate']*100:.1f}%)
                </p>
                <p>Duration: {results['duration']:.2f}s</p>
            </div>
            """
        return html
🎯 Success Criteria & Exit Conditions
Test Completion Criteria
- Unit Tests: ≥90% code coverage achieved
- Integration Tests: All CLI agent workflows tested successfully
- Performance Tests: CLI agents within 150% of Ollama baseline
- Security Tests: All SSH connections validated and secure
- Load Tests: System stable under 10x normal load
- Chaos Tests: System recovers gracefully from network issues
Go/No-Go Decision Points
- After Unit Testing: Proceed if >90% coverage and all tests pass
- After Integration Testing: Proceed if CLI agents work with existing system
- After Performance Testing: Proceed if performance within acceptable limits
- After Security Testing: Proceed if no security vulnerabilities found
- After Load Testing: Proceed if system handles production-like load
Rollback Triggers
- Any test category has <80% success rate
- CLI agent performance >200% of Ollama baseline
- Security vulnerabilities discovered
- System instability under normal load
- Negative impact on existing Ollama agents
This comprehensive testing strategy ensures the CLI agent integration is thoroughly validated before production deployment while maintaining the stability and performance of the existing Hive system.