- Migrated from HIVE branding to WHOOSH across all components - Enhanced backend API with new services: AI models, BZZZ integration, templates, members - Added comprehensive testing suite with security, performance, and integration tests - Improved frontend with new components for project setup, AI models, and team management - Updated MCP server implementation with WHOOSH-specific tools and resources - Enhanced deployment configurations with production-ready Docker setups - Added comprehensive documentation and setup guides - Implemented age encryption service and UCXL integration 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
491 lines
19 KiB
Python
491 lines
19 KiB
Python
"""
|
|
Age Encryption Service for WHOOSH - Secure master key generation and management.
|
|
"""
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Dict, Optional, Tuple, Any
|
|
from datetime import datetime
|
|
import json
|
|
import hashlib
|
|
import secrets
|
|
|
|
class AgeService:
|
|
"""
|
|
Age encryption service for WHOOSH project security.
|
|
Handles master key generation, storage, and encryption operations.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.age_binary = self._find_age_binary()
|
|
self.keys_storage_path = Path("/home/tony/AI/secrets/age_keys")
|
|
self.keys_storage_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
def _find_age_binary(self) -> str:
|
|
"""Find the age binary on the system."""
|
|
for path in ["/usr/bin/age", "/usr/local/bin/age", "age"]:
|
|
if shutil.which(path if path != "age" else path):
|
|
return path
|
|
raise RuntimeError("Age binary not found. Please install age encryption tool.")
|
|
|
|
def generate_master_key_pair(self, project_id: str, passphrase: Optional[str] = None) -> Dict[str, Any]:
|
|
"""
|
|
Generate a new Age master key pair for a project.
|
|
|
|
Args:
|
|
project_id: Unique project identifier
|
|
passphrase: Optional passphrase for additional security
|
|
|
|
Returns:
|
|
Dictionary containing key information and storage details
|
|
"""
|
|
try:
|
|
# Generate Age key pair using age-keygen
|
|
result = subprocess.run(
|
|
["age-keygen"],
|
|
capture_output=True,
|
|
text=True,
|
|
check=True
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
raise RuntimeError(f"Age key generation failed: {result.stderr}")
|
|
|
|
# Parse the output to extract public and private keys
|
|
output_lines = result.stdout.strip().split('\n')
|
|
|
|
# Find the public key line (starts with "# public key:")
|
|
public_key = None
|
|
private_key = None
|
|
|
|
for i, line in enumerate(output_lines):
|
|
if line.startswith("# public key:"):
|
|
public_key = line.replace("# public key:", "").strip()
|
|
elif line.startswith("AGE-SECRET-KEY-"):
|
|
private_key = line.strip()
|
|
|
|
if not public_key or not private_key:
|
|
raise RuntimeError("Failed to parse Age key generation output")
|
|
|
|
# Generate key metadata
|
|
key_id = hashlib.sha256(public_key.encode()).hexdigest()[:16]
|
|
timestamp = datetime.now().isoformat()
|
|
|
|
# Create secure storage for private key
|
|
private_key_path = self.keys_storage_path / f"{project_id}_{key_id}.key"
|
|
public_key_path = self.keys_storage_path / f"{project_id}_{key_id}.pub"
|
|
metadata_path = self.keys_storage_path / f"{project_id}_{key_id}.json"
|
|
|
|
# Encrypt private key with passphrase if provided
|
|
if passphrase:
|
|
private_key_content = self._encrypt_private_key(private_key, passphrase)
|
|
encrypted = True
|
|
else:
|
|
private_key_content = private_key
|
|
encrypted = False
|
|
|
|
# Store private key securely
|
|
private_key_path.write_text(private_key_content)
|
|
private_key_path.chmod(0o600) # Owner read/write only
|
|
|
|
# Store public key
|
|
public_key_path.write_text(public_key)
|
|
public_key_path.chmod(0o644) # Owner read/write, others read
|
|
|
|
# Create metadata
|
|
metadata = {
|
|
"project_id": project_id,
|
|
"key_id": key_id,
|
|
"public_key": public_key,
|
|
"private_key_path": str(private_key_path),
|
|
"public_key_path": str(public_key_path),
|
|
"encrypted": encrypted,
|
|
"created_at": timestamp,
|
|
"backup_locations": [],
|
|
"recovery_info": {
|
|
"security_questions": [],
|
|
"backup_methods": []
|
|
}
|
|
}
|
|
|
|
# Store metadata
|
|
metadata_path.write_text(json.dumps(metadata, indent=2))
|
|
metadata_path.chmod(0o600)
|
|
|
|
print(f"Age master key pair generated for project {project_id}")
|
|
print(f" Key ID: {key_id}")
|
|
print(f" Public key: {public_key}")
|
|
print(f" Private key stored: {private_key_path}")
|
|
|
|
return {
|
|
"key_id": key_id,
|
|
"public_key": public_key,
|
|
"private_key_stored": True,
|
|
"private_key_path": str(private_key_path),
|
|
"public_key_path": str(public_key_path),
|
|
"encrypted": encrypted,
|
|
"backup_location": str(self.keys_storage_path),
|
|
"created_at": timestamp,
|
|
"metadata": metadata
|
|
}
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError(f"Age key generation command failed: {e.stderr}")
|
|
except Exception as e:
|
|
raise RuntimeError(f"Age key generation failed: {str(e)}")
|
|
|
|
def _encrypt_private_key(self, private_key: str, passphrase: str) -> str:
|
|
"""
|
|
Encrypt private key with a passphrase using Age itself.
|
|
|
|
Args:
|
|
private_key: The raw private key
|
|
passphrase: Passphrase for encryption
|
|
|
|
Returns:
|
|
Encrypted private key content
|
|
"""
|
|
try:
|
|
# Create temporary files for input and passphrase
|
|
with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_input:
|
|
temp_input.write(private_key)
|
|
temp_input.flush()
|
|
|
|
with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_passphrase:
|
|
temp_passphrase.write(passphrase)
|
|
temp_passphrase.flush()
|
|
|
|
# Use Age with passphrase file to avoid TTY issues
|
|
env = os.environ.copy()
|
|
env['SHELL'] = '/bin/bash'
|
|
|
|
# Run age with passphrase from stdin
|
|
process = subprocess.Popen([
|
|
self.age_binary,
|
|
"-p", # Use passphrase
|
|
temp_input.name
|
|
], stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE, text=True, env=env)
|
|
|
|
stdout, stderr = process.communicate(input=passphrase + '\n')
|
|
|
|
# Clean up temp files
|
|
os.unlink(temp_input.name)
|
|
os.unlink(temp_passphrase.name)
|
|
|
|
if process.returncode != 0:
|
|
raise RuntimeError(f"Age encryption failed: {stderr}")
|
|
|
|
return stdout
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError(f"Private key encryption failed: {e.stderr}")
|
|
except Exception as e:
|
|
raise RuntimeError(f"Private key encryption error: {str(e)}")
|
|
|
|
def decrypt_private_key(self, project_id: str, key_id: str, passphrase: Optional[str] = None) -> str:
|
|
"""
|
|
Decrypt and retrieve a private key.
|
|
|
|
Args:
|
|
project_id: Project identifier
|
|
key_id: Key identifier
|
|
passphrase: Passphrase if key is encrypted
|
|
|
|
Returns:
|
|
Decrypted private key
|
|
"""
|
|
try:
|
|
private_key_path = self.keys_storage_path / f"{project_id}_{key_id}.key"
|
|
metadata_path = self.keys_storage_path / f"{project_id}_{key_id}.json"
|
|
|
|
if not private_key_path.exists():
|
|
raise RuntimeError(f"Private key not found for project {project_id}")
|
|
|
|
# Load metadata
|
|
if metadata_path.exists():
|
|
metadata = json.loads(metadata_path.read_text())
|
|
encrypted = metadata.get("encrypted", False)
|
|
else:
|
|
encrypted = False
|
|
|
|
# Read private key content
|
|
private_key_content = private_key_path.read_text()
|
|
|
|
if encrypted:
|
|
if not passphrase:
|
|
raise RuntimeError("Passphrase required for encrypted private key")
|
|
|
|
# Decrypt using Age with proper passphrase handling
|
|
process = subprocess.Popen([
|
|
self.age_binary,
|
|
"-d", # Decrypt
|
|
"-p" # Use passphrase
|
|
], stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE, text=True)
|
|
|
|
# Send passphrase first, then encrypted content
|
|
input_data = passphrase + '\n' + private_key_content
|
|
stdout, stderr = process.communicate(input=input_data)
|
|
|
|
if process.returncode != 0:
|
|
raise RuntimeError(f"Age decryption failed: {stderr}")
|
|
|
|
return stdout.strip()
|
|
else:
|
|
return private_key_content.strip()
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError(f"Private key decryption failed: {e.stderr}")
|
|
except Exception as e:
|
|
raise RuntimeError(f"Private key retrieval error: {str(e)}")
|
|
|
|
def encrypt_data(self, data: str, recipients: list[str]) -> str:
|
|
"""
|
|
Encrypt data for multiple recipients using their public keys.
|
|
|
|
Args:
|
|
data: Data to encrypt
|
|
recipients: List of Age public keys
|
|
|
|
Returns:
|
|
Encrypted data
|
|
"""
|
|
try:
|
|
# Build Age command with recipients
|
|
cmd = [self.age_binary]
|
|
for recipient in recipients:
|
|
cmd.extend(["-r", recipient])
|
|
cmd.append("-") # Read from stdin
|
|
|
|
result = subprocess.run(
|
|
cmd,
|
|
input=data.encode('utf-8'),
|
|
capture_output=True,
|
|
check=True
|
|
)
|
|
|
|
# Return base64 encoded encrypted data for safe text handling
|
|
import base64
|
|
return base64.b64encode(result.stdout).decode('ascii')
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError(f"Data encryption failed: {e.stderr}")
|
|
except Exception as e:
|
|
raise RuntimeError(f"Data encryption error: {str(e)}")
|
|
|
|
def decrypt_data(self, encrypted_data: str, private_key: str) -> str:
|
|
"""
|
|
Decrypt data using a private key.
|
|
|
|
Args:
|
|
encrypted_data: Age-encrypted data
|
|
private_key: Age private key for decryption
|
|
|
|
Returns:
|
|
Decrypted data
|
|
"""
|
|
try:
|
|
import base64
|
|
|
|
# Decode base64 encrypted data
|
|
encrypted_bytes = base64.b64decode(encrypted_data.encode('ascii'))
|
|
|
|
with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_key:
|
|
temp_key.write(private_key)
|
|
temp_key.flush()
|
|
|
|
result = subprocess.run([
|
|
self.age_binary,
|
|
"-d", # Decrypt
|
|
"-i", temp_key.name, # Identity file
|
|
"-" # Read from stdin
|
|
], input=encrypted_bytes, capture_output=True, check=True)
|
|
|
|
os.unlink(temp_key.name)
|
|
return result.stdout.decode('utf-8')
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError(f"Data decryption failed: {e.stderr}")
|
|
except Exception as e:
|
|
raise RuntimeError(f"Data decryption error: {str(e)}")
|
|
|
|
def list_project_keys(self, project_id: str) -> list[Dict[str, Any]]:
|
|
"""
|
|
List all keys for a project.
|
|
|
|
Args:
|
|
project_id: Project identifier
|
|
|
|
Returns:
|
|
List of key information dictionaries
|
|
"""
|
|
keys = []
|
|
pattern = f"{project_id}_*.json"
|
|
|
|
for metadata_file in self.keys_storage_path.glob(pattern):
|
|
try:
|
|
metadata = json.loads(metadata_file.read_text())
|
|
keys.append({
|
|
"key_id": metadata["key_id"],
|
|
"public_key": metadata["public_key"],
|
|
"encrypted": metadata["encrypted"],
|
|
"created_at": metadata["created_at"],
|
|
"backup_locations": metadata.get("backup_locations", [])
|
|
})
|
|
except Exception as e:
|
|
print(f"Error reading key metadata {metadata_file}: {e}")
|
|
continue
|
|
|
|
return keys
|
|
|
|
def backup_key(self, project_id: str, key_id: str, backup_location: str) -> bool:
|
|
"""
|
|
Create a backup of a key pair.
|
|
|
|
Args:
|
|
project_id: Project identifier
|
|
key_id: Key identifier
|
|
backup_location: Path to backup location
|
|
|
|
Returns:
|
|
Success status
|
|
"""
|
|
try:
|
|
backup_path = Path(backup_location)
|
|
backup_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Files to backup
|
|
files_to_backup = [
|
|
f"{project_id}_{key_id}.key",
|
|
f"{project_id}_{key_id}.pub",
|
|
f"{project_id}_{key_id}.json"
|
|
]
|
|
|
|
for filename in files_to_backup:
|
|
source = self.keys_storage_path / filename
|
|
dest = backup_path / filename
|
|
|
|
if source.exists():
|
|
shutil.copy2(source, dest)
|
|
# Preserve restrictive permissions
|
|
if filename.endswith('.key') or filename.endswith('.json'):
|
|
dest.chmod(0o600)
|
|
else:
|
|
dest.chmod(0o644)
|
|
|
|
# Update metadata with backup location
|
|
metadata_path = self.keys_storage_path / f"{project_id}_{key_id}.json"
|
|
if metadata_path.exists():
|
|
metadata = json.loads(metadata_path.read_text())
|
|
if backup_location not in metadata.get("backup_locations", []):
|
|
metadata.setdefault("backup_locations", []).append(backup_location)
|
|
metadata_path.write_text(json.dumps(metadata, indent=2))
|
|
|
|
print(f"Key backup created: {backup_path}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"Key backup failed: {e}")
|
|
return False
|
|
|
|
def generate_recovery_phrase(self, project_id: str, key_id: str) -> str:
|
|
"""
|
|
Generate a human-readable recovery phrase for key recovery.
|
|
|
|
Args:
|
|
project_id: Project identifier
|
|
key_id: Key identifier
|
|
|
|
Returns:
|
|
Recovery phrase
|
|
"""
|
|
# Create a deterministic but secure recovery phrase
|
|
seed = f"{project_id}:{key_id}:{datetime.now().isoformat()}"
|
|
hash_bytes = hashlib.sha256(seed.encode()).digest()
|
|
|
|
# Use a simple word list for recovery phrases
|
|
words = [
|
|
"alpha", "bravo", "charlie", "delta", "echo", "foxtrot",
|
|
"golf", "hotel", "india", "juliet", "kilo", "lima",
|
|
"mike", "november", "oscar", "papa", "quebec", "romeo",
|
|
"sierra", "tango", "uniform", "victor", "whiskey", "xray",
|
|
"yankee", "zulu"
|
|
]
|
|
|
|
# Generate 12-word recovery phrase
|
|
phrase_words = []
|
|
for i in range(12):
|
|
word_index = hash_bytes[i % len(hash_bytes)] % len(words)
|
|
phrase_words.append(words[word_index])
|
|
|
|
recovery_phrase = " ".join(phrase_words)
|
|
|
|
# Store recovery phrase in metadata
|
|
metadata_path = self.keys_storage_path / f"{project_id}_{key_id}.json"
|
|
if metadata_path.exists():
|
|
metadata = json.loads(metadata_path.read_text())
|
|
metadata["recovery_phrase"] = recovery_phrase
|
|
metadata_path.write_text(json.dumps(metadata, indent=2))
|
|
|
|
return recovery_phrase
|
|
|
|
def validate_key_access(self, project_id: str, key_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Validate access to a key and return status information.
|
|
|
|
Args:
|
|
project_id: Project identifier
|
|
key_id: Key identifier
|
|
|
|
Returns:
|
|
Validation status and information
|
|
"""
|
|
try:
|
|
private_key_path = self.keys_storage_path / f"{project_id}_{key_id}.key"
|
|
public_key_path = self.keys_storage_path / f"{project_id}_{key_id}.pub"
|
|
metadata_path = self.keys_storage_path / f"{project_id}_{key_id}.json"
|
|
|
|
status = {
|
|
"key_id": key_id,
|
|
"private_key_exists": private_key_path.exists(),
|
|
"public_key_exists": public_key_path.exists(),
|
|
"metadata_exists": metadata_path.exists(),
|
|
"accessible": False,
|
|
"encrypted": False,
|
|
"backup_count": 0,
|
|
"created_at": None
|
|
}
|
|
|
|
if metadata_path.exists():
|
|
metadata = json.loads(metadata_path.read_text())
|
|
status["encrypted"] = metadata.get("encrypted", False)
|
|
status["backup_count"] = len(metadata.get("backup_locations", []))
|
|
status["created_at"] = metadata.get("created_at")
|
|
|
|
# Test key accessibility
|
|
if private_key_path.exists() and public_key_path.exists():
|
|
try:
|
|
# Test encryption/decryption with the key pair
|
|
public_key = public_key_path.read_text().strip()
|
|
test_data = "test-encryption-" + secrets.token_hex(8)
|
|
|
|
encrypted = self.encrypt_data(test_data, [public_key])
|
|
|
|
# For decryption test, we'd need the private key
|
|
# but we don't want to prompt for passphrase here
|
|
status["accessible"] = bool(encrypted)
|
|
|
|
except Exception:
|
|
status["accessible"] = False
|
|
|
|
return status
|
|
|
|
except Exception as e:
|
|
return {
|
|
"key_id": key_id,
|
|
"error": str(e),
|
|
"accessible": False
|
|
} |