""" Age Encryption Service for WHOOSH - Secure master key generation and management. """ import os import subprocess import tempfile import shutil from pathlib import Path from typing import Dict, Optional, Tuple, Any from datetime import datetime import json import hashlib import secrets class AgeService: """ Age encryption service for WHOOSH project security. Handles master key generation, storage, and encryption operations. """ def __init__(self): self.age_binary = self._find_age_binary() self.keys_storage_path = Path("/home/tony/AI/secrets/age_keys") self.keys_storage_path.mkdir(parents=True, exist_ok=True) def _find_age_binary(self) -> str: """Find the age binary on the system.""" for path in ["/usr/bin/age", "/usr/local/bin/age", "age"]: if shutil.which(path if path != "age" else path): return path raise RuntimeError("Age binary not found. Please install age encryption tool.") def generate_master_key_pair(self, project_id: str, passphrase: Optional[str] = None) -> Dict[str, Any]: """ Generate a new Age master key pair for a project. Args: project_id: Unique project identifier passphrase: Optional passphrase for additional security Returns: Dictionary containing key information and storage details """ try: # Generate Age key pair using age-keygen result = subprocess.run( ["age-keygen"], capture_output=True, text=True, check=True ) if result.returncode != 0: raise RuntimeError(f"Age key generation failed: {result.stderr}") # Parse the output to extract public and private keys output_lines = result.stdout.strip().split('\n') # Find the public key line (starts with "# public key:") public_key = None private_key = None for i, line in enumerate(output_lines): if line.startswith("# public key:"): public_key = line.replace("# public key:", "").strip() elif line.startswith("AGE-SECRET-KEY-"): private_key = line.strip() if not public_key or not private_key: raise RuntimeError("Failed to parse Age key generation output") # Generate key metadata key_id = hashlib.sha256(public_key.encode()).hexdigest()[:16] timestamp = datetime.now().isoformat() # Create secure storage for private key private_key_path = self.keys_storage_path / f"{project_id}_{key_id}.key" public_key_path = self.keys_storage_path / f"{project_id}_{key_id}.pub" metadata_path = self.keys_storage_path / f"{project_id}_{key_id}.json" # Encrypt private key with passphrase if provided if passphrase: private_key_content = self._encrypt_private_key(private_key, passphrase) encrypted = True else: private_key_content = private_key encrypted = False # Store private key securely private_key_path.write_text(private_key_content) private_key_path.chmod(0o600) # Owner read/write only # Store public key public_key_path.write_text(public_key) public_key_path.chmod(0o644) # Owner read/write, others read # Create metadata metadata = { "project_id": project_id, "key_id": key_id, "public_key": public_key, "private_key_path": str(private_key_path), "public_key_path": str(public_key_path), "encrypted": encrypted, "created_at": timestamp, "backup_locations": [], "recovery_info": { "security_questions": [], "backup_methods": [] } } # Store metadata metadata_path.write_text(json.dumps(metadata, indent=2)) metadata_path.chmod(0o600) print(f"Age master key pair generated for project {project_id}") print(f" Key ID: {key_id}") print(f" Public key: {public_key}") print(f" Private key stored: {private_key_path}") return { "key_id": key_id, "public_key": public_key, "private_key_stored": True, "private_key_path": str(private_key_path), "public_key_path": str(public_key_path), "encrypted": encrypted, "backup_location": str(self.keys_storage_path), "created_at": timestamp, "metadata": metadata } except subprocess.CalledProcessError as e: raise RuntimeError(f"Age key generation command failed: {e.stderr}") except Exception as e: raise RuntimeError(f"Age key generation failed: {str(e)}") def _encrypt_private_key(self, private_key: str, passphrase: str) -> str: """ Encrypt private key with a passphrase using Age itself. Args: private_key: The raw private key passphrase: Passphrase for encryption Returns: Encrypted private key content """ try: # Create temporary files for input and passphrase with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_input: temp_input.write(private_key) temp_input.flush() with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_passphrase: temp_passphrase.write(passphrase) temp_passphrase.flush() # Use Age with passphrase file to avoid TTY issues env = os.environ.copy() env['SHELL'] = '/bin/bash' # Run age with passphrase from stdin process = subprocess.Popen([ self.age_binary, "-p", # Use passphrase temp_input.name ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env) stdout, stderr = process.communicate(input=passphrase + '\n') # Clean up temp files os.unlink(temp_input.name) os.unlink(temp_passphrase.name) if process.returncode != 0: raise RuntimeError(f"Age encryption failed: {stderr}") return stdout except subprocess.CalledProcessError as e: raise RuntimeError(f"Private key encryption failed: {e.stderr}") except Exception as e: raise RuntimeError(f"Private key encryption error: {str(e)}") def decrypt_private_key(self, project_id: str, key_id: str, passphrase: Optional[str] = None) -> str: """ Decrypt and retrieve a private key. Args: project_id: Project identifier key_id: Key identifier passphrase: Passphrase if key is encrypted Returns: Decrypted private key """ try: private_key_path = self.keys_storage_path / f"{project_id}_{key_id}.key" metadata_path = self.keys_storage_path / f"{project_id}_{key_id}.json" if not private_key_path.exists(): raise RuntimeError(f"Private key not found for project {project_id}") # Load metadata if metadata_path.exists(): metadata = json.loads(metadata_path.read_text()) encrypted = metadata.get("encrypted", False) else: encrypted = False # Read private key content private_key_content = private_key_path.read_text() if encrypted: if not passphrase: raise RuntimeError("Passphrase required for encrypted private key") # Decrypt using Age with proper passphrase handling process = subprocess.Popen([ self.age_binary, "-d", # Decrypt "-p" # Use passphrase ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) # Send passphrase first, then encrypted content input_data = passphrase + '\n' + private_key_content stdout, stderr = process.communicate(input=input_data) if process.returncode != 0: raise RuntimeError(f"Age decryption failed: {stderr}") return stdout.strip() else: return private_key_content.strip() except subprocess.CalledProcessError as e: raise RuntimeError(f"Private key decryption failed: {e.stderr}") except Exception as e: raise RuntimeError(f"Private key retrieval error: {str(e)}") def encrypt_data(self, data: str, recipients: list[str]) -> str: """ Encrypt data for multiple recipients using their public keys. Args: data: Data to encrypt recipients: List of Age public keys Returns: Encrypted data """ try: # Build Age command with recipients cmd = [self.age_binary] for recipient in recipients: cmd.extend(["-r", recipient]) cmd.append("-") # Read from stdin result = subprocess.run( cmd, input=data.encode('utf-8'), capture_output=True, check=True ) # Return base64 encoded encrypted data for safe text handling import base64 return base64.b64encode(result.stdout).decode('ascii') except subprocess.CalledProcessError as e: raise RuntimeError(f"Data encryption failed: {e.stderr}") except Exception as e: raise RuntimeError(f"Data encryption error: {str(e)}") def decrypt_data(self, encrypted_data: str, private_key: str) -> str: """ Decrypt data using a private key. Args: encrypted_data: Age-encrypted data private_key: Age private key for decryption Returns: Decrypted data """ try: import base64 # Decode base64 encrypted data encrypted_bytes = base64.b64decode(encrypted_data.encode('ascii')) with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_key: temp_key.write(private_key) temp_key.flush() result = subprocess.run([ self.age_binary, "-d", # Decrypt "-i", temp_key.name, # Identity file "-" # Read from stdin ], input=encrypted_bytes, capture_output=True, check=True) os.unlink(temp_key.name) return result.stdout.decode('utf-8') except subprocess.CalledProcessError as e: raise RuntimeError(f"Data decryption failed: {e.stderr}") except Exception as e: raise RuntimeError(f"Data decryption error: {str(e)}") def list_project_keys(self, project_id: str) -> list[Dict[str, Any]]: """ List all keys for a project. Args: project_id: Project identifier Returns: List of key information dictionaries """ keys = [] pattern = f"{project_id}_*.json" for metadata_file in self.keys_storage_path.glob(pattern): try: metadata = json.loads(metadata_file.read_text()) keys.append({ "key_id": metadata["key_id"], "public_key": metadata["public_key"], "encrypted": metadata["encrypted"], "created_at": metadata["created_at"], "backup_locations": metadata.get("backup_locations", []) }) except Exception as e: print(f"Error reading key metadata {metadata_file}: {e}") continue return keys def backup_key(self, project_id: str, key_id: str, backup_location: str) -> bool: """ Create a backup of a key pair. Args: project_id: Project identifier key_id: Key identifier backup_location: Path to backup location Returns: Success status """ try: backup_path = Path(backup_location) backup_path.mkdir(parents=True, exist_ok=True) # Files to backup files_to_backup = [ f"{project_id}_{key_id}.key", f"{project_id}_{key_id}.pub", f"{project_id}_{key_id}.json" ] for filename in files_to_backup: source = self.keys_storage_path / filename dest = backup_path / filename if source.exists(): shutil.copy2(source, dest) # Preserve restrictive permissions if filename.endswith('.key') or filename.endswith('.json'): dest.chmod(0o600) else: dest.chmod(0o644) # Update metadata with backup location metadata_path = self.keys_storage_path / f"{project_id}_{key_id}.json" if metadata_path.exists(): metadata = json.loads(metadata_path.read_text()) if backup_location not in metadata.get("backup_locations", []): metadata.setdefault("backup_locations", []).append(backup_location) metadata_path.write_text(json.dumps(metadata, indent=2)) print(f"Key backup created: {backup_path}") return True except Exception as e: print(f"Key backup failed: {e}") return False def generate_recovery_phrase(self, project_id: str, key_id: str) -> str: """ Generate a human-readable recovery phrase for key recovery. Args: project_id: Project identifier key_id: Key identifier Returns: Recovery phrase """ # Create a deterministic but secure recovery phrase seed = f"{project_id}:{key_id}:{datetime.now().isoformat()}" hash_bytes = hashlib.sha256(seed.encode()).digest() # Use a simple word list for recovery phrases words = [ "alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", "hotel", "india", "juliet", "kilo", "lima", "mike", "november", "oscar", "papa", "quebec", "romeo", "sierra", "tango", "uniform", "victor", "whiskey", "xray", "yankee", "zulu" ] # Generate 12-word recovery phrase phrase_words = [] for i in range(12): word_index = hash_bytes[i % len(hash_bytes)] % len(words) phrase_words.append(words[word_index]) recovery_phrase = " ".join(phrase_words) # Store recovery phrase in metadata metadata_path = self.keys_storage_path / f"{project_id}_{key_id}.json" if metadata_path.exists(): metadata = json.loads(metadata_path.read_text()) metadata["recovery_phrase"] = recovery_phrase metadata_path.write_text(json.dumps(metadata, indent=2)) return recovery_phrase def validate_key_access(self, project_id: str, key_id: str) -> Dict[str, Any]: """ Validate access to a key and return status information. Args: project_id: Project identifier key_id: Key identifier Returns: Validation status and information """ try: private_key_path = self.keys_storage_path / f"{project_id}_{key_id}.key" public_key_path = self.keys_storage_path / f"{project_id}_{key_id}.pub" metadata_path = self.keys_storage_path / f"{project_id}_{key_id}.json" status = { "key_id": key_id, "private_key_exists": private_key_path.exists(), "public_key_exists": public_key_path.exists(), "metadata_exists": metadata_path.exists(), "accessible": False, "encrypted": False, "backup_count": 0, "created_at": None } if metadata_path.exists(): metadata = json.loads(metadata_path.read_text()) status["encrypted"] = metadata.get("encrypted", False) status["backup_count"] = len(metadata.get("backup_locations", [])) status["created_at"] = metadata.get("created_at") # Test key accessibility if private_key_path.exists() and public_key_path.exists(): try: # Test encryption/decryption with the key pair public_key = public_key_path.read_text().strip() test_data = "test-encryption-" + secrets.token_hex(8) encrypted = self.encrypt_data(test_data, [public_key]) # For decryption test, we'd need the private key # but we don't want to prompt for passphrase here status["accessible"] = bool(encrypted) except Exception: status["accessible"] = False return status except Exception as e: return { "key_id": key_id, "error": str(e), "accessible": False }