"""
Test suite for Embedding System functionality.

Tests covering:
- Embedding generation and caching
- Vector database operations
- Semantic and hybrid search
- Performance characteristics
- Concurrent (thread-pool) access
- Statistics and cache monitoring
"""

import concurrent.futures
import contextlib
import shutil
import subprocess
import sys
import tempfile
import threading
import time
from pathlib import Path

import numpy as np
import pytest

sys.path.insert(0, str(Path(__file__).parent.parent))

from hcfs.core.context_db import Context  # noqa: E402
from hcfs.core.context_db_optimized_fixed import OptimizedContextDatabase  # noqa: E402
from hcfs.core.embeddings_optimized import (  # noqa: E402
    OptimizedEmbeddingManager,
    VectorSearchResult,
)

# Embedding dimension asserted throughout for the "mini" (MiniLM) model.
EMBEDDING_DIM = 384


@contextlib.contextmanager
def _temp_embedding_system(db_name, vector_db_name, **manager_kwargs):
    """Yield ``(context_db, embedding_manager)`` backed by a temp directory.

    Args:
        db_name: File name of the context database inside the temp directory.
        vector_db_name: File name of the vector database inside the temp
            directory.
        **manager_kwargs: Extra keyword arguments forwarded unchanged to
            ``OptimizedEmbeddingManager`` (e.g. ``cache_size``, ``batch_size``).

    The directory is removed on exit, including when construction of the
    database or manager raises, so failed setups do not leak temp files.
    """
    temp_dir = Path(tempfile.mkdtemp())
    try:
        context_db = OptimizedContextDatabase(str(temp_dir / db_name))
        embedding_manager = OptimizedEmbeddingManager(
            context_db,
            model_name="mini",
            vector_db_path=str(temp_dir / vector_db_name),
            **manager_kwargs,
        )
        yield context_db, embedding_manager
    finally:
        shutil.rmtree(temp_dir)


def _random_embedding(rng):
    """Return a float32 vector of length ``EMBEDDING_DIM`` drawn from *rng*.

    Values are uniform in [0, 1), matching what ``np.random.rand`` produced
    before, but a seeded generator makes any failure reproducible.
    """
    return rng.random(EMBEDDING_DIM).astype(np.float32)


def _timed(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` and return ``(result, elapsed_seconds)``.

    Uses ``time.perf_counter`` because it is monotonic and high-resolution;
    ``time.time`` can jump and may be too coarse for sub-millisecond cache hits.
    """
    start = time.perf_counter()
    result = func(*args, **kwargs)
    return result, time.perf_counter() - start


def _store_contexts(context_db, contexts):
    """Store each context in *context_db* and return the resulting ids in order."""
    return [context_db.store_context(context) for context in contexts]


class TestEmbeddingGeneration:
    """Test embedding generation functionality."""

    @pytest.fixture
    def temp_embedding_system(self):
        """Create temporary embedding system."""
        with _temp_embedding_system(
            "test_context.db", "test_vectors.db", cache_size=100, batch_size=4
        ) as system:
            yield system

    def test_single_embedding_generation(self, temp_embedding_system):
        """Test generating a single embedding."""
        _, embedding_manager = temp_embedding_system

        text = "Test embedding generation"
        embedding = embedding_manager.generate_embedding(text)

        assert isinstance(embedding, np.ndarray)
        assert embedding.shape == (EMBEDDING_DIM,)
        assert not np.isnan(embedding).any()
        assert not np.isinf(embedding).any()

    def test_embedding_caching(self, temp_embedding_system):
        """Test embedding caching functionality."""
        _, embedding_manager = temp_embedding_system

        text = "Test caching functionality"

        # First generation (cache miss), then second generation (cache hit).
        embedding1, first_time = _timed(embedding_manager.generate_embedding, text)
        embedding2, second_time = _timed(embedding_manager.generate_embedding, text)

        # Verify embeddings are identical
        assert np.allclose(embedding1, embedding2)

        # Cache should be significantly faster: at least 10x.
        assert second_time < first_time * 0.1, (
            f"cache hit took {second_time:.6f}s vs miss {first_time:.6f}s"
        )

    def test_batch_embedding_generation(self, temp_embedding_system):
        """Test batch embedding generation."""
        _, embedding_manager = temp_embedding_system

        texts = [
            "First test text",
            "Second test text",
            "Third test text",
            "Fourth test text",
        ]

        embeddings = embedding_manager.generate_embeddings_batch(texts)

        assert len(embeddings) == len(texts)
        assert all(isinstance(emb, np.ndarray) for emb in embeddings)
        assert all(emb.shape == (EMBEDDING_DIM,) for emb in embeddings)

        # Verify embeddings are different for different texts
        assert not np.allclose(embeddings[0], embeddings[1])

    def test_batch_vs_individual_performance(self, temp_embedding_system):
        """Test batch processing performance."""
        _, embedding_manager = temp_embedding_system

        texts = [f"Performance test text {i}" for i in range(8)]

        # Individual processing
        individual_embeddings, individual_time = _timed(
            lambda: [
                embedding_manager.generate_embedding(text, use_cache=False)
                for text in texts
            ]
        )

        # Clear cache to ensure fair comparison
        embedding_manager.vector_cache.clear()

        # Batch processing
        batch_embeddings, batch_time = _timed(
            embedding_manager.generate_embeddings_batch, texts, use_cache=False
        )

        # Verify results are equivalent
        assert len(individual_embeddings) == len(batch_embeddings)
        for ind, batch in zip(individual_embeddings, batch_embeddings):
            assert np.allclose(ind, batch, rtol=1e-5)

        # Batch should be at least 2x faster. Written as a multiplication so a
        # zero-length batch measurement cannot raise ZeroDivisionError.
        assert individual_time > 2.0 * batch_time, (
            f"individual {individual_time:.6f}s vs batch {batch_time:.6f}s"
        )


class TestVectorDatabase:
    """Test vector database operations."""

    @pytest.fixture
    def temp_vector_system(self):
        """Create temporary vector database system."""
        with _temp_embedding_system(
            "test_context.db", "test_vectors.db", cache_size=50
        ) as system:
            yield system

    def test_embedding_storage_retrieval(self, temp_vector_system):
        """Test storing and retrieving embeddings."""
        _, embedding_manager = temp_vector_system

        # Create test embedding
        test_embedding = _random_embedding(np.random.default_rng(0))
        context_id = 123

        # Store embedding
        embedding_manager.store_embedding(context_id, test_embedding)

        # Retrieve embedding
        retrieved = embedding_manager.get_embedding(context_id)

        assert retrieved is not None
        assert np.allclose(test_embedding, retrieved, rtol=1e-6)

    def test_batch_embedding_storage(self, temp_vector_system):
        """Test batch embedding storage."""
        _, embedding_manager = temp_vector_system

        # Create test embeddings
        rng = np.random.default_rng(1)
        embeddings_data = [(i, _random_embedding(rng)) for i in range(10, 20)]

        # Store batch
        embedding_manager.store_embeddings_batch(embeddings_data)

        # Verify all were stored
        for context_id, original_embedding in embeddings_data:
            retrieved = embedding_manager.get_embedding(context_id)
            assert retrieved is not None
            assert np.allclose(original_embedding, retrieved, rtol=1e-6)

    def test_vector_similarity_search(self, temp_vector_system):
        """Test vector similarity search."""
        _, embedding_manager = temp_vector_system

        # Create and store reference embeddings
        rng = np.random.default_rng(2)
        reference_embedding = _random_embedding(rng)
        similar_embedding = reference_embedding + _random_embedding(rng) * 0.1
        different_embedding = _random_embedding(rng)

        embedding_manager.store_embedding(1, reference_embedding)
        embedding_manager.store_embedding(2, similar_embedding)
        embedding_manager.store_embedding(3, different_embedding)

        # Search for similar embeddings
        query_embedding = reference_embedding + _random_embedding(rng) * 0.05
        results = embedding_manager.vector_similarity_search(query_embedding, top_k=3)

        # Fail with a clear assertion rather than an IndexError on results[0].
        assert results, "similarity search returned no results"
        assert len(results) <= 3
        assert all(isinstance(result, VectorSearchResult) for result in results)

        # Results should be sorted by similarity (highest first)
        scores = [result.score for result in results]
        assert scores == sorted(scores, reverse=True)

        # Reference embedding should be most similar
        assert results[0].context_id == 1

    def test_embeddings_index_building(self, temp_vector_system):
        """Test building embeddings index."""
        context_db, embedding_manager = temp_vector_system

        # Create test contexts
        contexts = [
            Context(None, f"/test/{i}", f"Test content {i}", f"Summary {i}", "user", 1)
            for i in range(5)
        ]
        context_ids = _store_contexts(context_db, contexts)

        # Build embeddings index
        index_stats = embedding_manager.build_embeddings_index(batch_size=2)

        assert index_stats["total_processed"] == 5
        assert index_stats["embeddings_per_second"] > 0

        # Verify embeddings were created
        for context_id in context_ids:
            embedding = embedding_manager.get_embedding(context_id)
            assert embedding is not None
            assert embedding.shape == (EMBEDDING_DIM,)


class TestSemanticSearch:
    """Test semantic search functionality."""

    @pytest.fixture
    def temp_search_system(self):
        """Create search system with test data."""
        with _temp_embedding_system("search_test.db", "search_vectors.db") as system:
            context_db, embedding_manager = system

            # Create test contexts
            test_contexts = [
                Context(None, "/ml/algorithms", "Machine learning algorithms and models",
                        "ML summary", "user1", 1),
                Context(None, "/ml/neural", "Neural networks and deep learning",
                        "NN summary", "user1", 1),
                Context(None, "/web/api", "RESTful API development",
                        "API summary", "user2", 1),
                Context(None, "/web/frontend", "Frontend web development",
                        "Frontend summary", "user2", 1),
                Context(None, "/db/sql", "SQL database queries",
                        "SQL summary", "user3", 1),
            ]

            # Store contexts and build embeddings
            _store_contexts(context_db, test_contexts)
            embedding_manager.build_embeddings_index()

            yield system

    def test_semantic_search_accuracy(self, temp_search_system):
        """Test semantic search accuracy."""
        _, embedding_manager = temp_search_system

        # Search for ML-related content
        results = embedding_manager.semantic_search_optimized(
            "machine learning models", top_k=3, include_contexts=True
        )

        assert len(results) > 0
        assert all(isinstance(result, VectorSearchResult) for result in results)
        assert all(result.context is not None for result in results)

        # Top results should be ML-related
        top_result = results[0]
        assert "/ml/" in top_result.context.path
        assert top_result.score > 0.3  # Reasonable similarity threshold

    def test_semantic_search_with_path_filter(self, temp_search_system):
        """Test semantic search with path filtering."""
        _, embedding_manager = temp_search_system

        # Search only in web-related paths
        results = embedding_manager.semantic_search_optimized(
            "development", path_prefix="/web", top_k=5, include_contexts=True
        )

        assert len(results) > 0
        # All results should be from /web paths
        for result in results:
            assert result.context.path.startswith("/web")

    def test_hybrid_search_functionality(self, temp_search_system):
        """Test hybrid search combining semantic and BM25."""
        _, embedding_manager = temp_search_system

        results = embedding_manager.hybrid_search_optimized(
            "neural network algorithms", top_k=3, semantic_weight=0.7
        )

        assert len(results) > 0
        assert all(isinstance(result, VectorSearchResult) for result in results)

        # Check that metadata includes both scores (only when metadata is set).
        for result in results:
            if result.metadata:
                assert "semantic_score" in result.metadata
                assert "bm25_score" in result.metadata
                assert "semantic_weight" in result.metadata

    def test_search_performance(self, temp_search_system):
        """Test search performance characteristics."""
        _, embedding_manager = temp_search_system

        query = "database optimization"

        # Time semantic search
        semantic_results, semantic_time = _timed(
            embedding_manager.semantic_search_optimized, query, top_k=5
        )

        # Time hybrid search
        hybrid_results, hybrid_time = _timed(
            embedding_manager.hybrid_search_optimized, query, top_k=5
        )

        assert semantic_time < 1.0  # Should be under 1 second
        assert hybrid_time < 2.0  # Hybrid search can be slightly slower
        assert len(semantic_results) > 0
        assert len(hybrid_results) > 0


class TestConcurrentOperations:
    """Test concurrent embedding operations."""

    @pytest.fixture
    def temp_concurrent_system(self):
        """Create system for concurrent testing."""
        with _temp_embedding_system(
            "concurrent_test.db", "concurrent_vectors.db", cache_size=100
        ) as system:
            yield system

    def test_concurrent_embedding_generation(self, temp_concurrent_system):
        """Test concurrent embedding generation."""
        _, embedding_manager = temp_concurrent_system

        def generate_embeddings(worker_id):
            results = []
            for i in range(3):
                text = f"Worker {worker_id} text {i}"
                embedding = embedding_manager.generate_embedding(text)
                results.append((text, embedding))
            return results

        # Run concurrent workers
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            futures = [executor.submit(generate_embeddings, i) for i in range(3)]
            all_results = [future.result() for future in futures]

        assert len(all_results) == 3
        assert all(len(worker_results) == 3 for worker_results in all_results)

        # Verify all embeddings are valid
        for worker_results in all_results:
            for _text, embedding in worker_results:
                assert isinstance(embedding, np.ndarray)
                assert embedding.shape == (EMBEDDING_DIM,)

    def test_concurrent_vector_operations(self, temp_concurrent_system):
        """Test concurrent vector database operations."""
        _, embedding_manager = temp_concurrent_system

        def vector_operations(worker_id):
            # Each worker owns its generator: numpy Generators must not be
            # shared across threads, and per-worker seeds keep runs repeatable.
            rng = np.random.default_rng(worker_id)
            base_id = worker_id * 100

            # Store embeddings
            stored = []
            for i in range(5):
                context_id = base_id + i
                embedding = _random_embedding(rng)
                embedding_manager.store_embedding(context_id, embedding)
                stored.append((context_id, embedding))

            # Retrieve embeddings
            retrieved = [
                (context_id, embedding_manager.get_embedding(context_id))
                for context_id, _original in stored
            ]

            return stored, retrieved

        # Run concurrent operations
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            futures = [executor.submit(vector_operations, i) for i in range(3)]
            all_results = [future.result() for future in futures]

        # Verify all operations completed successfully
        for stored, retrieved in all_results:
            assert len(stored) == 5
            assert len(retrieved) == 5

            for (stored_id, stored_emb), (retrieved_id, retrieved_emb) in zip(
                stored, retrieved
            ):
                assert stored_id == retrieved_id
                assert np.allclose(stored_emb, retrieved_emb, rtol=1e-6)

    def test_concurrent_search_operations(self, temp_concurrent_system):
        """Test concurrent search operations."""
        context_db, embedding_manager = temp_concurrent_system

        # Set up test data
        contexts = [
            Context(None, f"/concurrent/{i}", f"Concurrent test content {i}",
                    f"Summary {i}", "user", 1)
            for i in range(10)
        ]
        _store_contexts(context_db, contexts)
        embedding_manager.build_embeddings_index()

        def search_worker(worker_id):
            queries = [
                f"concurrent test {worker_id}",
                f"content {worker_id}",
                f"summary {worker_id}",
            ]
            results = []
            for query in queries:
                search_results = embedding_manager.semantic_search_optimized(
                    query, top_k=3
                )
                results.append((query, len(search_results)))
            return results

        # Run concurrent searches
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            futures = [executor.submit(search_worker, i) for i in range(4)]
            all_results = [future.result() for future in futures]

        # Verify all searches completed
        for worker_results in all_results:
            assert len(worker_results) == 3
            for query, result_count in worker_results:
                # Should have some results. The previous `>= 0` check was
                # always true and therefore verified nothing.
                assert result_count > 0, f"no results for query {query!r}"


class TestEmbeddingStatistics:
    """Test embedding system statistics and monitoring."""

    @pytest.fixture
    def temp_stats_system(self):
        """Create system for statistics testing."""
        with _temp_embedding_system("stats_test.db", "stats_vectors.db") as system:
            yield system

    def test_statistics_collection(self, temp_stats_system):
        """Test statistics collection."""
        context_db, embedding_manager = temp_stats_system

        # Create some test data
        contexts = [
            Context(None, f"/stats/{i}", f"Stats test {i}", f"Summary {i}", "user", 1)
            for i in range(5)
        ]
        _store_contexts(context_db, contexts)
        embedding_manager.build_embeddings_index()

        # Get statistics
        stats = embedding_manager.get_statistics()

        assert "database_stats" in stats
        assert "cache_stats" in stats
        assert "current_model" in stats

        db_stats = stats["database_stats"]
        assert db_stats["total_embeddings"] == 5
        assert db_stats["unique_models"] >= 1
        assert db_stats["average_dimension"] == EMBEDDING_DIM

        cache_stats = stats["cache_stats"]
        assert "size" in cache_stats
        assert "max_size" in cache_stats
        assert "hit_rate" in cache_stats

    def test_cache_statistics(self, temp_stats_system):
        """Test cache statistics tracking."""
        _, embedding_manager = temp_stats_system

        # Generate some embeddings to populate cache
        texts = [f"Cache test {i}" for i in range(10)]
        for text in texts:
            embedding_manager.generate_embedding(text)

        # Access some cached embeddings
        for text in texts[:5]:
            embedding_manager.generate_embedding(text)  # Cache hits

        cache_stats = embedding_manager.vector_cache.stats()
        assert cache_stats["size"] <= cache_stats["max_size"]
        assert cache_stats["size"] > 0

    def test_cleanup_operations(self, temp_stats_system):
        """Test cleanup operations."""
        _, embedding_manager = temp_stats_system

        # Store some test embeddings
        rng = np.random.default_rng(3)
        for i in range(10):
            embedding_manager.store_embedding(i, _random_embedding(rng))

        # Get initial count
        initial_stats = embedding_manager.get_statistics()
        initial_count = initial_stats["database_stats"]["total_embeddings"]

        # Clear cache
        embedding_manager.vector_cache.clear()

        # Cache should be empty
        cache_stats = embedding_manager.vector_cache.stats()
        assert cache_stats["size"] == 0

        # But embeddings should still be in database
        final_stats = embedding_manager.get_statistics()
        final_count = final_stats["database_stats"]["total_embeddings"]
        assert final_count == initial_count


def run_embedding_tests():
    """Run all embedding tests in a pytest subprocess and print its output.

    Returns:
        True when pytest exits with return code 0, False when it exits
        non-zero or the subprocess could not be launched.
    """
    # Resolve first: a relative ``__file__`` combined with a changed ``cwd``
    # would otherwise point pytest at a path that does not exist.
    test_file = Path(__file__).resolve()

    try:
        # Run pytest on this module
        result = subprocess.run(
            [sys.executable, "-m", "pytest", str(test_file), "-v", "--tb=short"],
            capture_output=True,
            text=True,
            cwd=test_file.parent.parent,
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"Failed to run tests: {e}")
        return False

    print("EMBEDDING SYSTEM TEST RESULTS")
    print("=" * 50)
    print(result.stdout)

    if result.stderr:
        print("ERRORS:")
        print(result.stderr)

    return result.returncode == 0


if __name__ == "__main__":
    # sys.exit rather than the site-provided ``exit`` helper, which is meant
    # for the interactive interpreter and is absent under ``python -S``.
    sys.exit(0 if run_embedding_tests() else 1)