Files
chorus-services/modules/shhh/main.py
tony 4511f4c801 Pre-cleanup snapshot - all current files
🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-05 02:32:45 +10:00

182 lines
6.9 KiB
Python

#!/usr/bin/env python3
"""
SHHH Secrets Sentinel - Main Entry Point
Production-ready secrets detection and monitoring system for CHORUS Services.
"""
import asyncio
import argparse
import sys
import yaml
from pathlib import Path
import structlog
from typing import Dict, Any
# Updated imports to bring in the new and modified components
from pipeline.processor import MessageProcessor
from core.hypercore_reader import HypercoreReader
from core.detector import SecretDetector
from core.llm_analyzer import LLMAnalyzer
from core.quarantine import QuarantineManager
from core.sanitized_writer import SanitizedWriter
def setup_logging(log_level: str = "INFO", structured: bool = True):
"""Configure structured logging"""
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer() if structured else structlog.dev.ConsoleRenderer(),
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
def load_config(config_path: str) -> Dict[str, Any]:
"""Load configuration from YAML file"""
try:
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
return config
except FileNotFoundError:
print(f"Configuration file not found: {config_path}, using defaults.")
return get_default_config()
except yaml.YAMLError as e:
print(f"Error parsing configuration file: {e}")
sys.exit(1)
def get_default_config() -> Dict[str, Any]:
"""Get default configuration, updated for the new architecture."""
return {
'primary_log_path': '/path/to/primary/hypercore.log',
'sanitized_log_path': '/path/to/sanitized/hypercore.log',
'database_url': 'postgresql://shhh:password@localhost:5432/shhh_sentinel',
'patterns_file': 'patterns.yaml',
'ollama_endpoint': 'http://localhost:11434/api/generate',
'ollama_model': 'llama3',
'llm_confidence_threshold': 0.90,
'shhh_agent_prompt_file': 'SHHH_SECRETS_SENTINEL_AGENT_PROMPT.md'
}
async def run_monitor_mode(config: Dict[str, Any]):
"""Run in monitoring mode with the new hybrid pipeline."""
logger = structlog.get_logger()
logger.info("Starting SHHH in monitor mode with hybrid pipeline...")
writer = None
try:
# 1. Load System Prompt for LLM
try:
with open(config['shhh_agent_prompt_file'], "r") as f:
ollama_system_prompt = f.read()
except FileNotFoundError:
logger.error(f"LLM prompt file not found at {config['shhh_agent_prompt_file']}. Aborting.")
return
# 2. Instantiation of components
# Note: HypercoreReader and QuarantineManager might need async initialization
# which is not shown here for simplicity, following the plan.
reader = HypercoreReader(config['primary_log_path'])
detector = SecretDetector(config['patterns_file'])
llm_analyzer = LLMAnalyzer(config['ollama_endpoint'], config['ollama_model'], ollama_system_prompt)
quarantine = QuarantineManager(config['database_url'])
writer = SanitizedWriter(config['sanitized_log_path'])
processor = MessageProcessor(
reader=reader,
detector=detector,
llm_analyzer=llm_analyzer,
quarantine=quarantine,
writer=writer,
llm_threshold=config['llm_confidence_threshold']
)
# 3. Execution
logger.info("Starting processor stream...")
await processor.process_stream()
except Exception as e:
logger.error("An error occurred during monitor mode execution.", error=str(e))
finally:
if writer:
writer.close()
logger.info("Monitor mode shut down complete.")
async def run_api_mode(config: Dict[str, Any], host: str, port: int):
"""Run in API mode (dashboard server) - UNCHANGED"""
import uvicorn
from api.main import app
app.state.config = config
uvicorn_config = uvicorn.Config(app=app, host=host, port=port, log_level="info", access_log=True)
server = uvicorn.Server(uvicorn_config)
await server.serve()
async def run_test_mode(config: Dict[str, Any], test_file: str):
"""Run in test mode with sample data - UNCHANGED but may be broken."""
logger = structlog.get_logger()
logger.warning("Test mode may be broken due to recent refactoring.")
# This part of the code would need to be updated to work with the new SecretDetector.
# For now, it remains as it was.
from core.detector import SecretDetector
from datetime import datetime
detector = SecretDetector(config['patterns_file'])
logger.info("Running SHHH in test mode")
# ... (rest of the test mode logic is likely broken and needs updating)
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(description="SHHH Secrets Sentinel")
parser.add_argument('--config', '-c', default='config.yaml', help='Configuration file path')
parser.add_argument('--mode', '-m', choices=['monitor', 'api', 'test'], default='monitor', help='Operation mode')
parser.add_argument('--log-level', '-l', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO', help='Log level')
parser.add_argument('--structured-logs', action='store_true', help='Use structured JSON logging')
parser.add_argument('--host', default='127.0.0.1', help='API server host')
parser.add_argument('--port', '-p', type=int, default=8000, help='API server port')
parser.add_argument('--test-file', help='Test data file for test mode')
parser.add_argument('--version', '-v', action='version', version='SHHH Secrets Sentinel 1.1.0 (Hybrid)')
args = parser.parse_args()
setup_logging(args.log_level, args.structured_logs)
logger = structlog.get_logger()
config = load_config(args.config)
logger.info("Starting SHHH Secrets Sentinel", mode=args.mode, config_file=args.config)
try:
if args.mode == 'monitor':
asyncio.run(run_monitor_mode(config))
elif args.mode == 'api':
asyncio.run(run_api_mode(config, args.host, args.port))
elif args.mode == 'test':
asyncio.run(run_test_mode(config, args.test_file))
except KeyboardInterrupt:
logger.info("Shutting down due to keyboard interrupt.")
except Exception as e:
logger.error("Application failed", error=str(e))
sys.exit(1)
logger.info("SHHH Secrets Sentinel stopped.")
if __name__ == '__main__':
main()