Pre-cleanup snapshot — all current files

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
tony
2025-08-05 02:32:45 +10:00
parent 26079aa8da
commit 4511f4c801
32 changed files with 5072 additions and 0 deletions

View File

@@ -0,0 +1,6 @@
# SHHH Core Module
"""
Core components for the SHHH Secrets Sentinel system.
"""
# Package version string; bump manually on release.
__version__ = "1.0.0"

View File

@@ -0,0 +1,52 @@
import re
import yaml
from pathlib import Path
class SecretDetector:
    """
    A simplified secret detection engine using configurable regex patterns.

    Patterns are loaded once from a YAML file and pre-compiled; `scan`
    reports matches with metadata and `redact` masks a secret value in text.
    """

    def __init__(self, patterns_file: str = "patterns.yaml"):
        self.patterns_file = Path(patterns_file)
        self.patterns = self._load_patterns()

    def _load_patterns(self) -> dict:
        """Load detection patterns from YAML configuration.

        Returns a mapping of pattern name -> properties dict. Active patterns
        get a pre-compiled regex stored under 'compiled_regex'. On any load
        or parse error an empty dict is returned so the detector degrades
        gracefully instead of crashing the pipeline.
        """
        try:
            with open(self.patterns_file, 'r') as f:
                config = yaml.safe_load(f)
            # safe_load returns None for an empty file; treat as "no patterns".
            patterns = (config or {}).get('patterns', {})
            # Pre-compile regex for efficiency so scan() never recompiles.
            for name, props in patterns.items():
                if props.get('active', True):
                    props['compiled_regex'] = re.compile(props['regex'])
            return patterns
        except Exception as e:
            print(f"[ERROR] Failed to load patterns from {self.patterns_file}: {e}")
            return {}

    def scan(self, text: str) -> list[dict]:
        """Scan *text* and return a list of found secrets with metadata.

        At most one match (the first) is reported per pattern; inactive or
        uncompiled patterns are skipped.
        """
        matches = []
        for pattern_name, pattern in self.patterns.items():
            if pattern.get('active', True) and 'compiled_regex' in pattern:
                regex_match = pattern['compiled_regex'].search(text)
                if regex_match:
                    matches.append({
                        "secret_type": pattern_name,
                        "value": regex_match.group(0),
                        "confidence": pattern.get("confidence", 0.8),
                        "severity": pattern.get("severity", "MEDIUM")
                    })
        return matches

    def redact(self, text: str, secret_value: str) -> str:
        """Redact every occurrence of *secret_value* within *text*.

        Secrets shorter than 12 characters are fully masked. The previous
        cut-off of 8 was a leak: an 8-character secret was "redacted" to its
        first 4 + last 4 characters, i.e. revealed entirely, and 9-11 char
        secrets leaked most of their content. With a 12-character minimum,
        at most two thirds of a secret is ever shown.
        """
        if len(secret_value) < 12:
            return text.replace(secret_value, "[REDACTED]")
        redacted_str = secret_value[:4] + "****" + secret_value[-4:]
        return text.replace(secret_value, f"[REDACTED:{redacted_str}]")

View File

@@ -0,0 +1,35 @@
import asyncio
from datetime import datetime
class LogEntry:
    """A mock log entry object for testing purposes."""

    def __init__(self, content):
        self.content = content
        self.timestamp = datetime.now()
        # Default fields mirroring what the processor expects on a real entry;
        # built fresh per instance so mutable values are never shared.
        defaults = {
            "source_agent": "mock_agent",
            "message_type": "mock_message",
            "metadata": {},
            "is_bzzz_message": False,
            "bzzz_message_id": None,
        }
        for field_name, value in defaults.items():
            setattr(self, field_name, value)
class HypercoreReader:
    """
    A simplified, mock HypercoreReader that replays a plain text file as a
    stream of log entries, so the pipeline can be exercised without a real
    hypercore backend.
    """

    def __init__(self, log_path: str, **kwargs):
        # Extra kwargs are accepted (and ignored) to mirror the real reader.
        self.log_path = log_path

    async def stream_entries(self):
        """Async generator yielding one LogEntry per line of the backing file."""
        try:
            handle = open(self.log_path, 'r')
        except FileNotFoundError:
            print(f"[ERROR] Hypercore log file not found at: {self.log_path}")
            return
        with handle:
            for raw_line in handle:
                yield LogEntry(raw_line.strip())
                # Brief pause per entry to simulate async streaming.
                await asyncio.sleep(0.01)

View File

@@ -0,0 +1,44 @@
import requests
import json
class LLMAnalyzer:
    """Analyzes text for secrets using a local LLM via Ollama."""

    def __init__(self, endpoint: str, model: str, system_prompt: str):
        self.endpoint = endpoint
        self.model = model
        self.system_prompt = system_prompt

    def analyze(self, text: str) -> dict:
        """
        Sends text to the Ollama API for analysis and returns a structured JSON response.
        Returns:
            A dictionary like:
            {
                "secret_found": bool,
                "secret_type": str,
                "confidence_score": float,
                "severity": str
            }
            Returns a default "not found" response on error.
        """
        request_body = {
            "model": self.model,
            "system": self.system_prompt,
            "prompt": f"Log entry: \"{text}\"\n\nAnalyze this for secrets and respond with only the required JSON.",
            "format": "json",
            "stream": False,
        }
        try:
            resp = requests.post(self.endpoint, json=request_body, timeout=15)
            resp.raise_for_status()
            # Ollama wraps the model's JSON answer in a string "response" field,
            # which must itself be parsed.
            return json.loads(resp.json().get("response", "{}"))
        except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
            print(f"[ERROR] LLMAnalyzer failed: {e}")
            # Fallback: if the LLM is unreachable or returns garbage, assume
            # no secret was found so the pipeline is never blocked.
            return {"secret_found": False}

View File

@@ -0,0 +1,22 @@
from datetime import datetime
class QuarantineManager:
    """
    A simplified, mock QuarantineManager for testing purposes.
    It prints quarantined messages to the console instead of saving to a database.
    """

    def __init__(self, database_url: str, **kwargs):
        # No real storage: just acknowledge the (unused) connection string.
        print(f"[MockQuarantine] Initialized with db_url: {database_url}")

    def quarantine_message(self, message, secret_type: str, severity: str, redacted_content: str):
        """
        Prints a quarantined message to the console.
        """
        alert_lines = (
            "\n--- QUARANTINE ALERT ---",
            f"Timestamp: {datetime.now().isoformat()}",
            f"Severity: {severity}",
            f"Secret Type: {secret_type}",
            f"Original Content (from mock): {message.content}",
            f"Redacted Content: {redacted_content}",
            "------------------------\n",
        )
        for line in alert_lines:
            print(line)

View File

@@ -0,0 +1,16 @@
class SanitizedWriter:
    """Writes log entries to the sanitized sister hypercore log.

    Placeholder for real hypercore writing: entries are appended to a plain
    text file, one per line. Usable as a context manager so the underlying
    file handle is always released even when callers forget close().
    """

    def __init__(self, sanitized_log_path: str):
        self.log_path = sanitized_log_path
        # Explicit encoding so output does not depend on the platform default.
        self.log_file = open(self.log_path, "a", encoding="utf-8")

    def write(self, log_entry: str):
        """Writes a single log entry to the sanitized stream.

        Flushes after each entry so the line reaches the file immediately.
        """
        self.log_file.write(log_entry + "\n")
        self.log_file.flush()

    def close(self):
        """Close the underlying file; safe to call more than once."""
        if not self.log_file.closed:
            self.log_file.close()

    def __enter__(self):
        # Context-manager support fixes the handle leak on error paths.
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False