# File: HCFS/hcfs-python/tests/test_embeddings.py
# Snapshot: 2025-07-30 09:34:16 +10:00 (605 lines, 22 KiB, Python)

"""
Test suite for Embedding System functionality.
Tests covering:
- Embedding generation and caching
- Vector database operations
- Semantic and hybrid search
- Performance characteristics
- Async compatibility
"""
import pytest
import tempfile
import shutil
import numpy as np
from pathlib import Path
import time
import threading
import concurrent.futures
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from hcfs.core.context_db import Context
from hcfs.core.context_db_optimized_fixed import OptimizedContextDatabase
from hcfs.core.embeddings_optimized import OptimizedEmbeddingManager, VectorSearchResult
class TestEmbeddingGeneration:
    """Tests for single, cached and batched embedding generation.

    Every test receives the ``temp_embedding_system`` fixture, a
    ``(context_db, embedding_manager)`` tuple backed by throwaway database
    files.
    """

    @pytest.fixture
    def temp_embedding_system(self, tmp_path):
        """Create a context database and embedding manager for one test.

        The database files live under pytest's per-test ``tmp_path``, so
        pytest owns creation and cleanup of the directory. Nothing is leaked
        when constructing the manager raises, and no manual ``rmtree`` has to
        run against files that may still be open.

        Returns:
            tuple: ``(context_db, embedding_manager)``.
        """
        context_db = OptimizedContextDatabase(str(tmp_path / "test_context.db"))
        embedding_manager = OptimizedEmbeddingManager(
            context_db,
            model_name="mini",
            vector_db_path=str(tmp_path / "test_vectors.db"),
            cache_size=100,
            batch_size=4,
        )
        return context_db, embedding_manager

    def test_single_embedding_generation(self, temp_embedding_system):
        """A single text yields a finite 384-element numpy vector."""
        _, embedding_manager = temp_embedding_system

        embedding = embedding_manager.generate_embedding("Test embedding generation")

        assert isinstance(embedding, np.ndarray)
        assert embedding.shape == (384,)  # MiniLM dimension
        assert not np.isnan(embedding).any()
        assert not np.isinf(embedding).any()

    def test_embedding_caching(self, temp_embedding_system):
        """Requesting the same text twice returns equal vectors, faster."""
        _, embedding_manager = temp_embedding_system
        text = "Test caching functionality"

        # perf_counter is monotonic and high resolution; time.time() can be
        # too coarse to measure a cache hit and may jump with clock changes.
        started = time.perf_counter()
        embedding1 = embedding_manager.generate_embedding(text)  # cache miss
        first_time = time.perf_counter() - started

        started = time.perf_counter()
        embedding2 = embedding_manager.generate_embedding(text)  # cache hit
        second_time = time.perf_counter() - started

        # Verify embeddings are identical
        assert np.allclose(embedding1, embedding2)
        # Cache should be significantly faster: at least 10x
        assert second_time < first_time * 0.1

    def test_batch_embedding_generation(self, temp_embedding_system):
        """A batch call returns one 384-element vector per input text."""
        _, embedding_manager = temp_embedding_system
        texts = [
            "First test text",
            "Second test text",
            "Third test text",
            "Fourth test text",
        ]

        embeddings = embedding_manager.generate_embeddings_batch(texts)

        assert len(embeddings) == len(texts)
        assert all(isinstance(emb, np.ndarray) for emb in embeddings)
        assert all(emb.shape == (384,) for emb in embeddings)
        # Verify embeddings are different for different texts
        assert not np.allclose(embeddings[0], embeddings[1])

    def test_batch_vs_individual_performance(self, temp_embedding_system):
        """Batch generation matches per-text results and is at least 2x faster."""
        _, embedding_manager = temp_embedding_system
        texts = [f"Performance test text {i}" for i in range(8)]

        # Individual processing
        started = time.perf_counter()
        individual_embeddings = [
            embedding_manager.generate_embedding(text, use_cache=False)
            for text in texts
        ]
        individual_time = time.perf_counter() - started

        # Clear cache to ensure fair comparison
        embedding_manager.vector_cache.clear()

        # Batch processing
        started = time.perf_counter()
        batch_embeddings = embedding_manager.generate_embeddings_batch(
            texts, use_cache=False
        )
        batch_time = time.perf_counter() - started

        # Verify results are equivalent
        assert len(individual_embeddings) == len(batch_embeddings)
        for single, batched in zip(individual_embeddings, batch_embeddings):
            assert np.allclose(single, batched, rtol=1e-5)

        # Same check as ``individual_time / batch_time > 2.0``, written as a
        # multiplication so a zero batch time cannot raise ZeroDivisionError.
        assert individual_time > 2.0 * batch_time
class TestVectorDatabase:
    """Tests for storing, retrieving, searching and indexing embeddings.

    Random vectors come from a seeded ``numpy`` generator so that a failing
    run can be reproduced exactly.
    """

    @pytest.fixture
    def temp_vector_system(self, tmp_path):
        """Create a context database and embedding manager for one test.

        The database files live under pytest's per-test ``tmp_path``, so no
        directory is leaked when setup fails and no manual cleanup is needed.

        Returns:
            tuple: ``(context_db, embedding_manager)``.
        """
        context_db = OptimizedContextDatabase(str(tmp_path / "test_context.db"))
        embedding_manager = OptimizedEmbeddingManager(
            context_db,
            model_name="mini",
            vector_db_path=str(tmp_path / "test_vectors.db"),
            cache_size=50,
        )
        return context_db, embedding_manager

    def test_embedding_storage_retrieval(self, temp_vector_system):
        """A stored vector is read back unchanged (within float tolerance)."""
        _, embedding_manager = temp_vector_system
        rng = np.random.default_rng(0)

        # Create test embedding
        test_embedding = rng.random(384, dtype=np.float32)
        context_id = 123

        embedding_manager.store_embedding(context_id, test_embedding)
        retrieved = embedding_manager.get_embedding(context_id)

        assert retrieved is not None
        assert np.allclose(test_embedding, retrieved, rtol=1e-6)

    def test_batch_embedding_storage(self, temp_vector_system):
        """Every vector written by a batch store can be read back."""
        _, embedding_manager = temp_vector_system
        rng = np.random.default_rng(1)

        # Create test embeddings as (context_id, vector) pairs
        embeddings_data = [
            (i, rng.random(384, dtype=np.float32))
            for i in range(10, 20)
        ]

        embedding_manager.store_embeddings_batch(embeddings_data)

        # Verify all were stored
        for context_id, original_embedding in embeddings_data:
            retrieved = embedding_manager.get_embedding(context_id)
            assert retrieved is not None
            assert np.allclose(original_embedding, retrieved, rtol=1e-6)

    def test_vector_similarity_search(self, temp_vector_system):
        """Search returns score-ordered results with the reference first."""
        _, embedding_manager = temp_vector_system
        rng = np.random.default_rng(2)

        # Create and store reference embeddings
        reference_embedding = rng.random(384, dtype=np.float32)
        similar_embedding = reference_embedding + rng.random(384, dtype=np.float32) * 0.1
        different_embedding = rng.random(384, dtype=np.float32)
        embedding_manager.store_embedding(1, reference_embedding)
        embedding_manager.store_embedding(2, similar_embedding)
        embedding_manager.store_embedding(3, different_embedding)

        # Query with a lightly perturbed copy of the reference vector
        query_embedding = reference_embedding + rng.random(384, dtype=np.float32) * 0.05
        results = embedding_manager.vector_similarity_search(query_embedding, top_k=3)

        # Fail with a clear assertion, not an IndexError on results[0] below.
        assert len(results) > 0
        assert len(results) <= 3
        assert all(isinstance(result, VectorSearchResult) for result in results)

        # Results should be sorted by similarity (highest first)
        scores = [result.score for result in results]
        assert scores == sorted(scores, reverse=True)

        # Reference embedding should be most similar
        assert results[0].context_id == 1

    def test_embeddings_index_building(self, temp_vector_system):
        """Building the index creates an embedding for every stored context."""
        context_db, embedding_manager = temp_vector_system

        # Create and store test contexts, keeping the ids the database returns
        contexts = [
            Context(None, f"/test/{i}", f"Test content {i}", f"Summary {i}", "user", 1)
            for i in range(5)
        ]
        context_ids = [context_db.store_context(context) for context in contexts]

        # Build embeddings index
        index_stats = embedding_manager.build_embeddings_index(batch_size=2)

        assert index_stats["total_processed"] == 5
        assert index_stats["embeddings_per_second"] > 0

        # Verify embeddings were created
        for context_id in context_ids:
            embedding = embedding_manager.get_embedding(context_id)
            assert embedding is not None
            assert embedding.shape == (384,)
class TestSemanticSearch:
    """Tests for semantic and hybrid search over a small indexed corpus."""

    @pytest.fixture
    def temp_search_system(self, tmp_path):
        """Create a search system pre-loaded with five indexed contexts.

        The database files live under pytest's per-test ``tmp_path``, so no
        directory is leaked when storing contexts or building the index
        raises, and no manual cleanup is needed.

        Returns:
            tuple: ``(context_db, embedding_manager)`` with the index built.
        """
        context_db = OptimizedContextDatabase(str(tmp_path / "search_test.db"))
        embedding_manager = OptimizedEmbeddingManager(
            context_db,
            model_name="mini",
            vector_db_path=str(tmp_path / "search_vectors.db"),
        )

        # Create test contexts spanning three topic areas: /ml, /web and /db
        test_contexts = [
            Context(None, "/ml/algorithms", "Machine learning algorithms and models", "ML summary", "user1", 1),
            Context(None, "/ml/neural", "Neural networks and deep learning", "NN summary", "user1", 1),
            Context(None, "/web/api", "RESTful API development", "API summary", "user2", 1),
            Context(None, "/web/frontend", "Frontend web development", "Frontend summary", "user2", 1),
            Context(None, "/db/sql", "SQL database queries", "SQL summary", "user3", 1),
        ]

        # Store contexts and build embeddings
        for context in test_contexts:
            context_db.store_context(context)
        embedding_manager.build_embeddings_index()

        return context_db, embedding_manager

    def test_semantic_search_accuracy(self, temp_search_system):
        """An ML query ranks a context under ``/ml/`` first."""
        _, embedding_manager = temp_search_system

        # Search for ML-related content
        results = embedding_manager.semantic_search_optimized(
            "machine learning models", top_k=3, include_contexts=True
        )

        assert len(results) > 0
        assert all(isinstance(result, VectorSearchResult) for result in results)
        assert all(result.context is not None for result in results)

        # Top results should be ML-related
        top_result = results[0]
        assert "/ml/" in top_result.context.path
        assert top_result.score > 0.3  # Reasonable similarity threshold

    def test_semantic_search_with_path_filter(self, temp_search_system):
        """``path_prefix`` limits results to contexts under that prefix."""
        _, embedding_manager = temp_search_system

        # Search only in web-related paths
        results = embedding_manager.semantic_search_optimized(
            "development", path_prefix="/web", top_k=5, include_contexts=True
        )

        assert len(results) > 0
        # All results should be from /web paths
        for result in results:
            assert result.context.path.startswith("/web")

    def test_hybrid_search_functionality(self, temp_search_system):
        """Hybrid search returns results whose metadata carries both scores."""
        _, embedding_manager = temp_search_system

        results = embedding_manager.hybrid_search_optimized(
            "neural network algorithms",
            top_k=3,
            semantic_weight=0.7,
        )

        assert len(results) > 0
        assert all(isinstance(result, VectorSearchResult) for result in results)

        # Check that metadata includes both scores; results without metadata
        # are skipped rather than failed.
        for result in results:
            if result.metadata:
                assert "semantic_score" in result.metadata
                assert "bm25_score" in result.metadata
                assert "semantic_weight" in result.metadata

    def test_search_performance(self, temp_search_system):
        """Semantic and hybrid search both finish within their time budgets."""
        _, embedding_manager = temp_search_system
        query = "database optimization"

        # perf_counter is monotonic, so a wall-clock adjustment during the
        # test cannot distort the measured durations.
        started = time.perf_counter()
        semantic_results = embedding_manager.semantic_search_optimized(query, top_k=5)
        semantic_time = time.perf_counter() - started

        started = time.perf_counter()
        hybrid_results = embedding_manager.hybrid_search_optimized(query, top_k=5)
        hybrid_time = time.perf_counter() - started

        assert semantic_time < 1.0  # Should be under 1 second
        assert hybrid_time < 2.0  # Hybrid search can be slightly slower
        assert len(semantic_results) > 0
        assert len(hybrid_results) > 0
class TestConcurrentOperations:
    """Tests that drive one embedding manager from several threads at once."""

    @pytest.fixture
    def temp_concurrent_system(self, tmp_path):
        """Create a context database and embedding manager for one test.

        The database files live under pytest's per-test ``tmp_path``, so no
        directory is leaked when setup fails and no manual cleanup is needed.

        Returns:
            tuple: ``(context_db, embedding_manager)``.
        """
        context_db = OptimizedContextDatabase(str(tmp_path / "concurrent_test.db"))
        embedding_manager = OptimizedEmbeddingManager(
            context_db,
            model_name="mini",
            vector_db_path=str(tmp_path / "concurrent_vectors.db"),
            cache_size=100,
        )
        return context_db, embedding_manager

    def test_concurrent_embedding_generation(self, temp_concurrent_system):
        """Three threads each generate three valid embeddings."""
        _, embedding_manager = temp_concurrent_system

        def generate_embeddings(worker_id):
            """Return ``(text, embedding)`` pairs for three worker-specific texts."""
            results = []
            for i in range(3):
                text = f"Worker {worker_id} text {i}"
                results.append((text, embedding_manager.generate_embedding(text)))
            return results

        # Run concurrent workers
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            futures = [executor.submit(generate_embeddings, i) for i in range(3)]
            all_results = [future.result() for future in futures]

        assert len(all_results) == 3
        assert all(len(worker_results) == 3 for worker_results in all_results)

        # Verify all embeddings are valid
        for worker_results in all_results:
            for _text, embedding in worker_results:
                assert isinstance(embedding, np.ndarray)
                assert embedding.shape == (384,)

    def test_concurrent_vector_operations(self, temp_concurrent_system):
        """Threads store and read back vectors under disjoint id ranges."""
        _, embedding_manager = temp_concurrent_system

        def vector_operations(worker_id):
            """Store five vectors, then read them back.

            Returns:
                tuple: ``(stored, retrieved)`` lists of ``(context_id, vector)``.
            """
            # One seeded generator per worker: the data is reproducible and
            # no generator is shared between threads.
            rng = np.random.default_rng(worker_id)
            base_id = worker_id * 100

            # Store embeddings
            stored = []
            for i in range(5):
                context_id = base_id + i
                embedding = rng.random(384, dtype=np.float32)
                embedding_manager.store_embedding(context_id, embedding)
                stored.append((context_id, embedding))

            # Retrieve embeddings
            retrieved = [
                (context_id, embedding_manager.get_embedding(context_id))
                for context_id, _ in stored
            ]
            return stored, retrieved

        # Run concurrent operations
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            futures = [executor.submit(vector_operations, i) for i in range(3)]
            all_results = [future.result() for future in futures]

        # Verify all operations completed successfully
        for stored, retrieved in all_results:
            assert len(stored) == 5
            assert len(retrieved) == 5
            for (stored_id, stored_emb), (retrieved_id, retrieved_emb) in zip(stored, retrieved):
                assert stored_id == retrieved_id
                # A missing vector should fail here, not as a TypeError
                # raised inside np.allclose.
                assert retrieved_emb is not None
                assert np.allclose(stored_emb, retrieved_emb, rtol=1e-6)

    def test_concurrent_search_operations(self, temp_concurrent_system):
        """Four threads each run three searches against a shared index."""
        context_db, embedding_manager = temp_concurrent_system

        # Set up test data
        contexts = [
            Context(None, f"/concurrent/{i}", f"Concurrent test content {i}", f"Summary {i}", "user", 1)
            for i in range(10)
        ]
        for context in contexts:
            context_db.store_context(context)
        embedding_manager.build_embeddings_index()

        def search_worker(worker_id):
            """Return ``(query, result_count)`` for three worker-specific queries."""
            queries = [f"concurrent test {worker_id}", f"content {worker_id}", f"summary {worker_id}"]
            return [
                (query, len(embedding_manager.semantic_search_optimized(query, top_k=3)))
                for query in queries
            ]

        # Run concurrent searches
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            futures = [executor.submit(search_worker, i) for i in range(4)]
            all_results = [future.result() for future in futures]

        # Verify all searches completed
        for worker_results in all_results:
            assert len(worker_results) == 3
            for _query, result_count in worker_results:
                # Should have some results, and never more than top_k. The
                # previous ``>= 0`` check could not fail for any length.
                assert 0 < result_count <= 3
class TestEmbeddingStatistics:
    """Tests for the statistics reported by the manager and its vector cache."""

    @pytest.fixture
    def temp_stats_system(self, tmp_path):
        """Create a context database and embedding manager for one test.

        The database files live under pytest's per-test ``tmp_path``, so no
        directory is leaked when setup fails and no manual cleanup is needed.

        Returns:
            tuple: ``(context_db, embedding_manager)``.
        """
        context_db = OptimizedContextDatabase(str(tmp_path / "stats_test.db"))
        embedding_manager = OptimizedEmbeddingManager(
            context_db,
            model_name="mini",
            vector_db_path=str(tmp_path / "stats_vectors.db"),
        )
        return context_db, embedding_manager

    def test_statistics_collection(self, temp_stats_system):
        """``get_statistics`` reports database, cache and model sections."""
        context_db, embedding_manager = temp_stats_system

        # Create some test data
        contexts = [
            Context(None, f"/stats/{i}", f"Stats test {i}", f"Summary {i}", "user", 1)
            for i in range(5)
        ]
        for context in contexts:
            context_db.store_context(context)
        embedding_manager.build_embeddings_index()

        # Get statistics
        stats = embedding_manager.get_statistics()

        for section in ("database_stats", "cache_stats", "current_model"):
            assert section in stats

        db_stats = stats["database_stats"]
        assert db_stats["total_embeddings"] == 5
        assert db_stats["unique_models"] >= 1
        assert db_stats["average_dimension"] == 384

        cache_stats = stats["cache_stats"]
        for key in ("size", "max_size", "hit_rate"):
            assert key in cache_stats

    def test_cache_statistics(self, temp_stats_system):
        """The cache is non-empty after use and never exceeds its maximum."""
        _, embedding_manager = temp_stats_system

        # Generate some embeddings to populate cache
        texts = [f"Cache test {i}" for i in range(10)]
        for text in texts:
            embedding_manager.generate_embedding(text)

        # Access some cached embeddings
        for text in texts[:5]:
            embedding_manager.generate_embedding(text)  # Cache hits

        cache_stats = embedding_manager.vector_cache.stats()
        assert cache_stats["size"] <= cache_stats["max_size"]
        assert cache_stats["size"] > 0

    def test_cleanup_operations(self, temp_stats_system):
        """Clearing the cache empties it without changing the stored count."""
        _, embedding_manager = temp_stats_system
        # Seeded so the stored vectors are identical on every run.
        rng = np.random.default_rng(0)

        # Store some test embeddings
        for i in range(10):
            embedding_manager.store_embedding(i, rng.random(384, dtype=np.float32))

        # Get initial count
        initial_count = embedding_manager.get_statistics()["database_stats"]["total_embeddings"]

        # Clear cache
        embedding_manager.vector_cache.clear()

        # Cache should be empty
        assert embedding_manager.vector_cache.stats()["size"] == 0

        # But embeddings should still be in database
        final_count = embedding_manager.get_statistics()["database_stats"]["total_embeddings"]
        assert final_count == initial_count
def run_embedding_tests():
    """Run this module's tests in a pytest subprocess and print the output.

    pytest is started with the current interpreter, verbose output and short
    tracebacks, using the directory two levels above this file as its working
    directory. Captured stdout is always printed; stderr is printed only when
    it is non-empty.

    Returns:
        bool: True when pytest exits with status 0. False when it exits with
        any other status or when running it raises an exception.
    """
    import subprocess
    import sys

    # Resolve to an absolute path first: the subprocess runs with a different
    # working directory, so a relative ``__file__`` would be looked up from
    # the wrong place.
    test_file = Path(__file__).resolve()

    try:
        # Run pytest on this module
        result = subprocess.run(
            [sys.executable, "-m", "pytest", str(test_file), "-v", "--tb=short"],
            capture_output=True,
            text=True,
            cwd=test_file.parent.parent,
        )
        print("EMBEDDING SYSTEM TEST RESULTS")
        print("=" * 50)
        print(result.stdout)
        if result.stderr:
            print("ERRORS:")
            print(result.stderr)
        return result.returncode == 0
    except Exception as e:
        # Deliberately broad: this is the script-level boundary, and any
        # failure to launch or report is turned into a False result.
        print(f"Failed to run tests: {e}")
        return False
if __name__ == "__main__":
    # Use sys.exit rather than the bare ``exit`` builtin: ``exit`` is injected
    # by the ``site`` module for interactive use and is missing under
    # ``python -S`` or in embedded interpreters.
    success = run_embedding_tests()
    sys.exit(0 if success else 1)