- Migrated from HIVE branding to WHOOSH across all components
- Enhanced backend API with new services: AI models, BZZZ integration, templates, members
- Added comprehensive testing suite with security, performance, and integration tests
- Improved frontend with new components for project setup, AI models, and team management
- Updated MCP server implementation with WHOOSH-specific tools and resources
- Enhanced deployment configurations with production-ready Docker setups
- Added comprehensive documentation and setup guides
- Implemented age encryption service and UCXL integration

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
592 lines
22 KiB
Python
592 lines
22 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
UCXL Integration Service for WHOOSH
|
|
Connects WHOOSH to the existing UCXL addressing system for distributed artifact storage and retrieval
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import aiohttp
|
|
import hashlib
|
|
from typing import Dict, List, Optional, Any, Union
|
|
from datetime import datetime
|
|
from dataclasses import dataclass, asdict
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
import urllib.parse
|
|
|
|
# Module-level logger named after this module; every log call in this file
# goes through it.
logger = logging.getLogger(__name__)
|
|
|
|
class UCXLProtocol(Enum):
    """URI schemes accepted for UCXL addresses."""

    UCXL = "ucxl"
    UCXL_SECURE = "ucxls"


@dataclass
class UCXLAddress:
    """Structured form of ``ucxl://user:password@PROJECT:COMPONENT/path``.

    Only ``protocol`` is required; every other part is optional and is left
    out of ``to_string()`` when empty or ``None``.
    """

    protocol: UCXLProtocol
    user: Optional[str] = None
    password: Optional[str] = None
    project: Optional[str] = None
    component: Optional[str] = None
    path: Optional[str] = None

    @classmethod
    def parse(cls, address: str) -> 'UCXLAddress':
        """Parse a UCXL address string into its components.

        Args:
            address: Text starting with ``ucxl://`` or ``ucxls://``.

        Returns:
            The parsed address. ``path`` is ``""`` (not ``None``) when the
            address has no ``/`` after the project/component part.

        Raises:
            ValueError: If ``address`` does not start with a UCXL scheme.
        """
        if not address.startswith(('ucxl://', 'ucxls://')):
            raise ValueError(f"Invalid UCXL address: {address}")

        scheme, _, remainder = address.partition("://")
        protocol = UCXLProtocol(scheme)

        # Split the path off first: credentials can only live in the part
        # before the first "/", so an "@" inside the path (for example
        # "docs/user@example.txt") must not be treated as a credential
        # separator.
        authority, _, path = remainder.partition("/")

        user = password = None
        # Use the last "@" so that a password may itself contain "@".
        credentials, at_sign, target = authority.rpartition("@")
        if at_sign:
            user, colon, secret = credentials.partition(":")
            if colon:
                password = secret

        project, colon, component_part = target.partition(":")
        component = component_part if colon else None

        return cls(
            protocol=protocol,
            user=user,
            password=password,
            project=project,
            component=component,
            path=path,
        )

    def to_string(self) -> str:
        """Render the address back into ``scheme://[user[:password]@]PROJECT[:COMPONENT][/path]``.

        The password is emitted only when a user is set, and the component
        only when a project is set.
        """
        parts = [f"{self.protocol.value}://"]

        if self.user:
            parts.append(self.user)
            if self.password:
                parts.append(f":{self.password}")
            parts.append("@")

        if self.project:
            parts.append(self.project)
            if self.component:
                parts.append(f":{self.component}")

        if self.path:
            parts.append(f"/{self.path}")

        return "".join(parts)
|
|
|
|
@dataclass
class UCXLArtifact:
    """Metadata record describing one artifact in the UCXL system.

    Holds descriptive fields only; the artifact's content is not kept on
    this object.
    """

    # UCXL address string identifying the artifact.
    address: str
    # Digest of the artifact content.
    content_hash: str
    # MIME-style content type, e.g. "text/plain".
    content_type: str
    # Content length.
    size: int
    # Creation and last-modification timestamps.
    created_at: datetime
    modified_at: datetime
    # Free-form metadata attached to the artifact.
    metadata: Dict[str, Any]
|
|
|
|
class UCXLIntegrationService:
    """
    Service for integrating WHOOSH with the existing UCXL addressing system.
    Provides distributed artifact storage, retrieval, and temporal navigation.

    All network access goes through one ``aiohttp.ClientSession`` created by
    ``initialize()`` and released by ``cleanup()``. Public coroutines never
    raise: failures are logged and reported through the return value
    (``None``, ``False`` or an empty list).

    Artifact metadata is cached in ``artifact_cache``, bounded by the
    ``cache_size`` config entry (oldest entries are evicted first). The
    ``cache_ttl`` entry is not read anywhere in this class.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Set up in-memory state only; no network I/O happens here.

        Args:
            config: Service configuration. When omitted or empty, the values
                from ``_default_config()`` are used.
        """
        self.config = config or self._default_config()
        self.ucxl_browser_endpoints = self.config.get("ucxl_browser_endpoints", [])
        self.bzzz_gateway_endpoints = self.config.get("bzzz_gateway_endpoints", [])
        self.session: Optional[aiohttp.ClientSession] = None
        self.artifact_cache: Dict[str, UCXLArtifact] = {}
        self.dht_nodes: List[str] = []

    def _default_config(self) -> Dict[str, Any]:
        """Default UCXL integration configuration"""
        return {
            "ucxl_browser_endpoints": [
                "http://192.168.1.27:8080",  # walnut (if UCXL browser running)
                "http://192.168.1.72:8080",  # acacia
                "http://192.168.1.113:8080",  # ironwood
            ],
            "bzzz_gateway_endpoints": [
                "http://192.168.1.27:8080",  # BZZZ gateways for DHT access
                "http://192.168.1.72:8080",
                "http://192.168.1.113:8080",
            ],
            "default_project": "WHOOSH",
            "cache_size": 1000,
            "cache_ttl": 3600,  # 1 hour
            "timeout": 30,
        }

    async def _close_session(self) -> None:
        """Close the HTTP session, if any, and forget the reference."""
        session, self.session = self.session, None
        if session is not None:
            await session.close()

    def _cache_artifact(self, address: str, artifact: "UCXLArtifact") -> None:
        """Insert ``artifact`` into the cache, evicting the oldest entries
        once the configured ``cache_size`` is exceeded."""
        limit = self.config.get("cache_size", 1000)
        # Re-insert so a refreshed entry counts as the newest one; dicts keep
        # insertion order, so the first key is always the oldest.
        self.artifact_cache.pop(address, None)
        self.artifact_cache[address] = artifact
        while self.artifact_cache and len(self.artifact_cache) > limit:
            del self.artifact_cache[next(iter(self.artifact_cache))]

    def _cache_retrieved(self, address: str, data: Dict[str, Any]) -> None:
        """Cache metadata from a retrieved payload on a best-effort basis.

        A payload without ``content_hash`` is not cached. Unparseable
        timestamps only skip caching; they never invalidate the payload.
        """
        content_hash = data.get("content_hash")
        if not content_hash:
            return

        fallback = datetime.utcnow().isoformat()
        try:
            artifact = UCXLArtifact(
                address=address,
                content_hash=content_hash,
                content_type=data.get("content_type", "application/octet-stream"),
                size=data.get("size", 0),
                created_at=datetime.fromisoformat(data.get("created_at", fallback)),
                modified_at=datetime.fromisoformat(data.get("modified_at", fallback)),
                metadata=data.get("metadata", {})
            )
        except (TypeError, ValueError) as e:
            logger.debug(f"Skipping cache for {address}: {e}")
            return

        self._cache_artifact(address, artifact)

    async def initialize(self) -> bool:
        """Initialize UCXL integration service

        Creates the HTTP session, discovers DHT nodes and probes the UCXL
        browser endpoints.

        Returns:
            True on success, False if anything raised (the session is closed
            again in that case).
        """
        try:
            logger.info("🔗 Initializing UCXL Integration Service")

            # A repeated initialize() must not orphan the previous session.
            await self._close_session()

            # Create HTTP session
            self.session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.config.get("timeout", 30))
            )

            # Discover DHT nodes through BZZZ gateways
            await self._discover_dht_nodes()

            # Test connectivity to UCXL systems
            await self._test_ucxl_connectivity()

            logger.info(f"✅ UCXL Integration initialized with {len(self.dht_nodes)} DHT nodes")
            return True

        except Exception as e:
            logger.error(f"❌ Failed to initialize UCXL integration: {e}")
            # Do not leave a half-initialised session open.
            try:
                await self._close_session()
            except Exception as close_error:
                logger.warning(f"⚠️ Failed to close HTTP session: {close_error}")
            return False

    async def _discover_dht_nodes(self) -> None:
        """Discover DHT nodes through BZZZ gateways

        Queries ``/api/dht/nodes`` on every gateway and stores the union of
        the reported nodes in ``self.dht_nodes``. Failing gateways are logged
        and skipped.
        """
        discovered_nodes = set()

        for endpoint in self.bzzz_gateway_endpoints:
            try:
                async with self.session.get(f"{endpoint}/api/dht/nodes") as response:
                    if response.status == 200:
                        data = await response.json()
                        nodes = data.get("nodes", [])
                        discovered_nodes.update(nodes)
                        logger.debug(f"Discovered {len(nodes)} DHT nodes from {endpoint}")

            except Exception as e:
                logger.warning(f"⚠️ Failed to discover DHT nodes from {endpoint}: {e}")

        self.dht_nodes = list(discovered_nodes)
        logger.info(f"🔍 Discovered {len(self.dht_nodes)} DHT nodes")

    async def _test_ucxl_connectivity(self) -> None:
        """Test connectivity to UCXL browser endpoints

        Probes ``/health`` on each endpoint and replaces
        ``self.ucxl_browser_endpoints`` with the ones that answered HTTP 200.
        """
        working_endpoints = []

        for endpoint in self.ucxl_browser_endpoints:
            try:
                async with self.session.get(f"{endpoint}/health") as response:
                    if response.status == 200:
                        working_endpoints.append(endpoint)
                        logger.debug(f"✅ UCXL endpoint online: {endpoint}")
                    else:
                        logger.warning(f"⚠️ UCXL endpoint unhealthy: {endpoint} (HTTP {response.status})")

            except Exception as e:
                logger.warning(f"⚠️ UCXL endpoint unreachable: {endpoint} ({e})")

        # Update working endpoints
        self.ucxl_browser_endpoints = working_endpoints
        logger.info(f"🔗 {len(working_endpoints)} UCXL endpoints available")

    async def store_artifact(
        self,
        project: str,
        component: str,
        path: str,
        content: Union[str, bytes],
        content_type: str = "text/plain",
        metadata: Optional[Dict[str, Any]] = None
    ) -> Optional[str]:
        """
        Store an artifact in the distributed UCXL system
        Returns the UCXL address if successful

        Gateways are tried in order and the first HTTP 201 wins. ``str``
        content is UTF-8 encoded before hashing. In the payload, only
        ``text/*`` content is sent as text; every other content type
        (including ``application/json``) is sent hex-encoded.

        Returns:
            The ``ucxl://PROJECT:COMPONENT/path`` address, or None when no
            gateway accepted the artifact or an error occurred.
        """
        try:
            # Create UCXL address
            address = UCXLAddress(
                protocol=UCXLProtocol.UCXL,
                project=project,
                component=component,
                path=path
            ).to_string()

            # Prepare content
            content_bytes = content.encode('utf-8') if isinstance(content, str) else content

            # Generate content hash
            content_hash = hashlib.sha256(content_bytes).hexdigest()

            # One timestamp for both the payload and the cached record.
            now = datetime.utcnow()
            artifact_metadata = metadata or {}

            # Prepare artifact data
            artifact_data = {
                "address": address,
                "content": content_bytes.decode('utf-8') if content_type.startswith('text/') else content_bytes.hex(),
                "content_type": content_type,
                "content_hash": content_hash,
                "size": len(content_bytes),
                "metadata": artifact_metadata,
                "timestamp": now.isoformat()
            }

            # Try to store through BZZZ gateways (DHT)
            for endpoint in self.bzzz_gateway_endpoints:
                try:
                    async with self.session.post(
                        f"{endpoint}/api/dht/store",
                        json=artifact_data
                    ) as response:
                        if response.status != 201:
                            logger.warning(f"⚠️ Store rejected by {endpoint} (HTTP {response.status})")
                            continue

                        # The response body is not needed, so it is not
                        # parsed: a 201 with an odd body is still a success.
                        logger.info(f"📦 Stored artifact: {address}")

                        # Cache the artifact
                        self._cache_artifact(
                            address,
                            UCXLArtifact(
                                address=address,
                                content_hash=content_hash,
                                content_type=content_type,
                                size=len(content_bytes),
                                created_at=now,
                                modified_at=now,
                                metadata=artifact_metadata
                            )
                        )

                        return address

                except Exception as e:
                    logger.warning(f"⚠️ Failed to store via {endpoint}: {e}")
                    continue

            logger.error("❌ Failed to store artifact in any DHT node")
            return None

        except Exception as e:
            logger.error(f"❌ Error storing artifact: {e}")
            return None

    async def retrieve_artifact(self, address: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve an artifact from the distributed UCXL system
        Returns artifact data if found

        A cache hit returns cached *metadata only* (flagged with
        ``"cached": True``) and no content. Otherwise each gateway is asked
        for the SHA-256 of the address, and the first HTTP 200 payload is
        returned as-is.

        Returns:
            The artifact dict, or None if the address is invalid, no gateway
            has it, or an error occurred.
        """
        try:
            # Check cache first
            cached = self.artifact_cache.get(address)
            if cached is not None:
                logger.debug(f"🎯 Cache hit for {address}")

                # Return cached metadata (actual content retrieval may still need DHT)
                return {
                    "address": address,
                    "content_hash": cached.content_hash,
                    "content_type": cached.content_type,
                    "size": cached.size,
                    "created_at": cached.created_at.isoformat(),
                    "modified_at": cached.modified_at.isoformat(),
                    "metadata": cached.metadata,
                    "cached": True
                }

            # Parse UCXL address (validation only: raises ValueError on a bad scheme)
            UCXLAddress.parse(address)

            # Use address hash as DHT key
            key = hashlib.sha256(address.encode()).hexdigest()

            # Try to retrieve through BZZZ gateways (DHT)
            for endpoint in self.bzzz_gateway_endpoints:
                try:
                    async with self.session.get(
                        f"{endpoint}/api/dht/retrieve/{key}"
                    ) as response:
                        if response.status != 200:
                            continue

                        data = await response.json()
                        logger.info(f"📦 Retrieved artifact: {address}")

                        # Cache the result
                        self._cache_retrieved(address, data)

                        return data

                except Exception as e:
                    logger.warning(f"⚠️ Failed to retrieve from {endpoint}: {e}")
                    continue

            logger.warning(f"⚠️ Artifact not found: {address}")
            return None

        except Exception as e:
            logger.error(f"❌ Error retrieving artifact: {e}")
            return None

    async def list_artifacts(
        self,
        project: Optional[str] = None,
        component: Optional[str] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        List artifacts from the distributed UCXL system
        Optionally filter by project and/or component

        Only the first gateway that answers HTTP 200 is used. Entries without
        an ``address`` and duplicate addresses are dropped.

        Returns:
            At most ``limit`` artifact dicts; an empty list on error.
        """
        try:
            params = {"limit": limit}
            if project:
                params["project"] = project
            if component:
                params["component"] = component

            # Try to list through BZZZ gateways
            all_artifacts = []

            for endpoint in self.bzzz_gateway_endpoints:
                try:
                    async with self.session.get(
                        f"{endpoint}/api/dht/list",
                        params=params
                    ) as response:
                        if response.status == 200:
                            data = await response.json()
                            artifacts = data.get("artifacts", [])
                            all_artifacts.extend(artifacts)
                            logger.debug(f"Listed {len(artifacts)} artifacts from {endpoint}")
                            break  # Use first successful response

                except Exception as e:
                    logger.warning(f"⚠️ Failed to list from {endpoint}: {e}")
                    continue

            # Deduplicate by address
            seen_addresses = set()
            unique_artifacts = []
            for artifact in all_artifacts:
                addr = artifact.get("address")
                if addr and addr not in seen_addresses:
                    seen_addresses.add(addr)
                    unique_artifacts.append(artifact)

            logger.info(f"📋 Listed {len(unique_artifacts)} unique artifacts")
            return unique_artifacts[:limit]

        except Exception as e:
            logger.error(f"❌ Error listing artifacts: {e}")
            return []

    async def resolve_temporal_address(
        self,
        address: str,
        timestamp: Optional[datetime] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Resolve a UCXL address at a specific point in time
        Uses temporal navigation capabilities

        Asks each UCXL browser endpoint's ``/api/temporal/resolve``; if none
        answers HTTP 200, falls back to ``retrieve_artifact(address)``.

        Returns:
            The resolved data, or None if the address is invalid, nothing was
            found, or an error occurred.
        """
        try:
            # Parse address (validation only: raises ValueError on a bad scheme)
            UCXLAddress.parse(address)

            params = {"address": address}
            if timestamp:
                params["timestamp"] = timestamp.isoformat()

            # Try temporal resolution through UCXL browser endpoints
            for endpoint in self.ucxl_browser_endpoints:
                try:
                    async with self.session.get(
                        f"{endpoint}/api/temporal/resolve",
                        params=params
                    ) as response:
                        if response.status == 200:
                            data = await response.json()
                            logger.info(f"🕐 Temporal resolution: {address} @ {timestamp}")
                            return data

                except Exception as e:
                    logger.warning(f"⚠️ Temporal resolution failed via {endpoint}: {e}")
                    continue

            # Fallback to current version
            logger.info(f"🔄 Falling back to current version: {address}")
            return await self.retrieve_artifact(address)

        except Exception as e:
            logger.error(f"❌ Error in temporal resolution: {e}")
            return None

    async def create_project_context(
        self,
        project_name: str,
        description: str,
        components: List[str],
        metadata: Optional[Dict[str, Any]] = None
    ) -> Optional[str]:
        """
        Create a project context in the UCXL system
        Returns the project UCXL address

        The context is stored as ``project.json`` under the ``PROJECT_META``
        component of ``project_name``. Returns None if storing failed.
        """
        try:
            # Create project metadata
            project_data = {
                "name": project_name,
                "description": description,
                "components": components,
                "created_at": datetime.utcnow().isoformat(),
                "metadata": metadata or {}
            }

            # Store as JSON in UCXL system
            address = await self.store_artifact(
                project=project_name,
                component="PROJECT_META",
                path="project.json",
                content=json.dumps(project_data, indent=2),
                content_type="application/json",
                metadata={
                    "type": "project_context",
                    "version": "1.0",
                    "created_by": "WHOOSH"
                }
            )

            if address:
                logger.info(f"📁 Created project context: {project_name} -> {address}")

            return address

        except Exception as e:
            logger.error(f"❌ Error creating project context: {e}")
            return None

    async def link_artifacts(
        self,
        source_address: str,
        target_address: str,
        relationship: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> bool:
        """
        Create a relationship link between two UCXL artifacts

        The link is stored as ``link-<hash>.json`` under ``WHOOSH:LINKS``,
        where ``<hash>`` is the first 16 hex characters of the SHA-256 of
        ``source:target:relationship``. Source, target and relationship are
        also recorded in the stored artifact's metadata.

        Returns:
            True if the link artifact was stored, False otherwise.
        """
        try:
            # Create link metadata
            link_data = {
                "source": source_address,
                "target": target_address,
                "relationship": relationship,
                "created_at": datetime.utcnow().isoformat(),
                "metadata": metadata or {}
            }

            # Generate link address
            link_hash = hashlib.sha256(f"{source_address}:{target_address}:{relationship}".encode()).hexdigest()[:16]

            # Store link in UCXL system
            link_address = await self.store_artifact(
                project="WHOOSH",
                component="LINKS",
                path=f"link-{link_hash}.json",
                content=json.dumps(link_data, indent=2),
                content_type="application/json",
                metadata={
                    "type": "artifact_link",
                    "source": source_address,
                    "target": target_address,
                    "relationship": relationship
                }
            )

            if link_address:
                logger.info(f"🔗 Created artifact link: {source_address} --{relationship}--> {target_address}")
                return True

            return False

        except Exception as e:
            logger.error(f"❌ Error linking artifacts: {e}")
            return False

    async def get_artifact_links(self, address: str) -> List[Dict[str, Any]]:
        """Get all links involving a specific artifact

        Lists ``WHOOSH:LINKS`` and retrieves each entry. A link matches when
        ``address`` is its source or target, looked up both at the top level
        of the retrieved dict and under ``"metadata"`` (where
        ``link_artifacts`` records them, and the only place they appear on a
        cache hit).

        Returns:
            The retrieved dicts of all matching links; an empty list on error.
        """
        try:
            # Search for links in the LINKS component
            all_links = await self.list_artifacts(project="WHOOSH", component="LINKS")

            # Filter links involving this address
            relevant_links = []
            for link_artifact in all_links:
                link_addr = link_artifact.get("address")
                if not link_addr:
                    continue

                # Retrieve link data
                link_data = await self.retrieve_artifact(link_addr)
                if not link_data:
                    continue

                link_meta = link_data.get("metadata")
                if not isinstance(link_meta, dict):
                    link_meta = {}

                endpoints = (
                    link_data.get("source"),
                    link_data.get("target"),
                    link_meta.get("source"),
                    link_meta.get("target"),
                )
                if address in endpoints:
                    relevant_links.append(link_data)

            logger.info(f"🔗 Found {len(relevant_links)} links for {address}")
            return relevant_links

        except Exception as e:
            logger.error(f"❌ Error getting artifact links: {e}")
            return []

    async def get_system_status(self) -> Dict[str, Any]:
        """Get UCXL integration system status

        ``system_health`` is the ratio of discovered DHT nodes to configured
        gateways, capped at 1.0. On error, a dict with ``error`` and a
        ``system_health`` of 0.0 is returned instead.
        """
        try:
            return {
                "ucxl_endpoints": len(self.ucxl_browser_endpoints),
                "dht_nodes": len(self.dht_nodes),
                "bzzz_gateways": len(self.bzzz_gateway_endpoints),
                "cached_artifacts": len(self.artifact_cache),
                "cache_limit": self.config.get("cache_size", 1000),
                "system_health": min(1.0, len(self.dht_nodes) / max(1, len(self.bzzz_gateway_endpoints))),
                "last_update": datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Error getting system status: {e}")
            return {
                "error": str(e),
                "system_health": 0.0,
                "last_update": datetime.utcnow().isoformat()
            }

    async def cleanup(self) -> None:
        """Cleanup UCXL integration resources

        Closes the HTTP session and clears the reference so a closed session
        is never reused. Errors are logged, not raised.
        """
        try:
            await self._close_session()
            logger.info("🧹 UCXL Integration Service cleanup completed")
        except Exception as e:
            logger.error(f"❌ Error during cleanup: {e}")
|
|
|
|
# Shared module-level service instance, built with the default configuration.
# Constructing it performs no network I/O; the HTTP session is only created
# once ``initialize()`` is awaited.
ucxl_service = UCXLIntegrationService()