#!/usr/bin/env python3
"""
WHOOSH Security Audit Suite - Phase 5.3
Comprehensive security testing and vulnerability assessment.
"""
import json
import re
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse

import requests
class WHOOSHSecurityAuditor:
    """Black-box HTTP security auditor for a running WHOOSH instance.

    Every ``test_*`` method probes one area over HTTP, records findings
    through :meth:`log_vulnerability` and returns ``True`` when that area is
    considered acceptable. ``security_score`` starts at 100 and is lowered by
    each finding; it can drop below zero (the final report clamps it at 0).

    Findings accumulate on the instance, so use a fresh auditor per run.
    """

    # Points added to the score (all negative) per finding, keyed by severity.
    SCORE_IMPACT = {'CRITICAL': -25, 'HIGH': -15, 'MEDIUM': -10, 'LOW': -5}
    SEVERITY_EMOJI = {'CRITICAL': '🚨', 'HIGH': '❌', 'MEDIUM': '⚠️', 'LOW': '🟡'}

    # Endpoints that must not answer 200 to an unauthenticated GET.
    SENSITIVE_ENDPOINTS = [
        "/api/projects/setup",
        "/api/members",
        "/api/crypto/generate-age-keys",
    ]

    SQL_PAYLOADS = [
        "'; DROP TABLE users; --",
        "1' OR '1'='1",
        "UNION SELECT * FROM users",
        "'; INSERT INTO",
    ]
    XSS_PAYLOADS = [
        "<script>alert('xss')</script>",
        "javascript:alert('xss')",
        "<img src=x onerror=alert('xss')>",
        "'><script>alert('xss')</script>",
    ]

    # Regexes matched against the lower-cased body of an error response.
    SENSITIVE_PATTERNS = [
        r'traceback',
        r'stack trace',
        r'/home/\w+',
        r'password',
        r'secret',
        r'private.*key',
        r'database.*error',
    ]

    # Header -> accepted value(s); None means "presence is enough".
    SECURITY_HEADERS = {
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': ['DENY', 'SAMEORIGIN'],
        'X-XSS-Protection': '1; mode=block',
        'Strict-Transport-Security': None,  # Only audited for HTTPS targets
        'Content-Security-Policy': None,
        'Referrer-Policy': 'strict-origin-when-cross-origin',
    }

    def __init__(self, base_url: str = "http://localhost:8087"):
        """Create an auditor for the service rooted at *base_url*."""
        self.base_url = base_url
        self.vulnerabilities = []
        self.security_score = 100

    def log_vulnerability(self, severity: str, category: str, description: str,
                          details: Optional[Dict] = None):
        """Record a finding, adjust the score and echo it to stdout.

        Args:
            severity: One of ``LOW``, ``MEDIUM``, ``HIGH``, ``CRITICAL``. Any
                other value is recorded but does not change the score.
            category: Area the finding belongs to (e.g. ``"CORS"``).
            description: Human-readable summary.
            details: Optional extra data; stored as ``{}`` when omitted.
        """
        self.vulnerabilities.append({
            'severity': severity,
            'category': category,
            'description': description,
            'details': details or {},
            'timestamp': datetime.now().isoformat(),
        })
        self.security_score += self.SCORE_IMPACT.get(severity, 0)
        print(f"{self.SEVERITY_EMOJI.get(severity, '⚠️')} {severity}: {description}")

    def test_cors_configuration(self) -> bool:
        """Send a CORS preflight to ``/api/templates`` and audit the answer.

        Returns:
            ``True`` when CORS headers are present and not overly permissive,
            ``False`` when they are missing, permissive, or the probe failed.
        """
        print("\n🔒 CORS CONFIGURATION AUDIT")
        # A preflight without an Origin header is not a CORS request, so a
        # server may legitimately omit every CORS header from its response.
        probe_origin = "https://cors-audit.invalid"
        try:
            response = requests.options(
                f"{self.base_url}/api/templates",
                headers={
                    "Origin": probe_origin,
                    "Access-Control-Request-Method": "GET",
                },
                timeout=5,
            )
            cors_headers = {
                name: value
                for name, value in response.headers.items()
                if 'access-control' in name.lower()
            }
            return self._evaluate_cors_headers(cors_headers, probe_origin)
        except Exception as e:
            self.log_vulnerability(
                "MEDIUM",
                "CORS",
                f"Unable to test CORS configuration: {e}",
                {"error": str(e)}
            )
            return False

    def _evaluate_cors_headers(self, cors_headers: Dict[str, str],
                               probe_origin: Optional[str] = None) -> bool:
        """Log findings for a set of CORS response headers; True if acceptable."""
        if not cors_headers:
            self.log_vulnerability(
                "MEDIUM",
                "CORS",
                "CORS headers not configured - potential cross-origin issues",
                {"missing_headers": ["Access-Control-Allow-Origin"]}
            )
            return False

        # Header names are case-insensitive; normalise before looking them up.
        normalized = {name.lower(): value for name, value in cors_headers.items()}
        origin_header = normalized.get('access-control-allow-origin', '')
        credentials = normalized.get('access-control-allow-credentials', '').lower()

        permissive = False
        if origin_header == '*':
            permissive = True
            self.log_vulnerability(
                "HIGH",
                "CORS",
                "CORS configured to allow all origins (*) - security risk",
                {"cors_origin": origin_header}
            )
            if credentials == 'true':
                self.log_vulnerability(
                    "CRITICAL",
                    "CORS",
                    "CORS allows credentials with wildcard origin - critical security flaw",
                    {"cors_credentials": credentials, "cors_origin": origin_header}
                )
        elif probe_origin and origin_header == probe_origin:
            # Echoing an arbitrary Origin back is a wildcard in disguise.
            permissive = True
            self.log_vulnerability(
                "HIGH",
                "CORS",
                "CORS reflects arbitrary origins - equivalent to allowing all origins",
                {"cors_origin": origin_header}
            )
            if credentials == 'true':
                self.log_vulnerability(
                    "CRITICAL",
                    "CORS",
                    "CORS allows credentials for arbitrary reflected origins - critical security flaw",
                    {"cors_credentials": credentials, "cors_origin": origin_header}
                )

        if permissive:
            print(f"⚠️ CORS headers present but overly permissive: {len(cors_headers)} headers configured")
        else:
            print(f"✅ CORS headers present: {len(cors_headers)} headers configured")
        return not permissive

    def test_authentication_security(self) -> bool:
        """Check that sensitive endpoints reject unauthenticated GET requests.

        Returns:
            ``True`` when no endpoint answered 200, ``False`` otherwise.
        """
        print("\n🔐 AUTHENTICATION SECURITY AUDIT")
        try:
            unprotected_endpoints = []
            for endpoint in self.SENSITIVE_ENDPOINTS:
                try:
                    response = requests.get(f"{self.base_url}{endpoint}", timeout=5)
                except requests.exceptions.RequestException:
                    # Endpoint not available in test mode - this is expected
                    print(f"⚪ {endpoint} not available in test mode")
                    continue

                status = response.status_code
                if status == 200:
                    unprotected_endpoints.append(endpoint)
                    self.log_vulnerability(
                        "HIGH",
                        "Authentication",
                        f"Sensitive endpoint {endpoint} accessible without authentication",
                        {"endpoint": endpoint, "status_code": status}
                    )
                elif status in (401, 403, 422):
                    print(f"✅ {endpoint} properly protected (Status: {status})")
                else:
                    # e.g. 404/405: neither exposed nor proven protected.
                    print(f"⚪ {endpoint} returned status {status} - protection not verified")
            return not unprotected_endpoints
        except Exception as e:
            self.log_vulnerability(
                "MEDIUM",
                "Authentication",
                f"Authentication testing failed: {e}",
                {"error": str(e)}
            )
            return False

    @staticmethod
    def _payload_reflected(payload: str, body: str) -> bool:
        """Return True if a non-blank *payload* appears verbatim in *body*.

        Blank payloads are rejected because ``"" in body`` is always true and
        would produce a false positive on every response.
        """
        return bool(payload.strip()) and payload in body

    def test_input_validation(self) -> bool:
        """Send SQL-injection and XSS strings to the template search.

        A payload echoed back unchanged in the response body is logged as a
        potential injection issue.

        Returns:
            ``True`` when no payload was reflected (including when testing was
            limited by errors), ``False`` otherwise.
        """
        print("\n🛡️ INPUT VALIDATION AUDIT")
        try:
            payloads = self.SQL_PAYLOADS + self.XSS_PAYLOADS
            vulnerable_endpoints = []
            failed_requests = 0
            for payload in payloads:
                try:
                    response = requests.get(
                        f"{self.base_url}/api/templates",
                        params={"search": payload},
                        timeout=5
                    )
                except requests.exceptions.RequestException:
                    failed_requests += 1
                    continue
                if self._payload_reflected(payload, response.text):
                    vulnerable_endpoints.append(f"/api/templates?search={payload}")
                    self.log_vulnerability(
                        "HIGH",
                        "Input Validation",
                        "Potential injection vulnerability - payload reflected",
                        {"payload": payload, "endpoint": "/api/templates"}
                    )

            if failed_requests:
                print(f"⚪ {failed_requests} of {len(payloads)} payload requests could not be sent")
            if not vulnerable_endpoints and failed_requests < len(payloads):
                print("✅ No obvious injection vulnerabilities found")
            return not vulnerable_endpoints
        except Exception as e:
            self.log_vulnerability(
                "LOW",
                "Input Validation",
                f"Input validation testing limited: {e}",
                {"error": str(e)}
            )
            return True  # Don't fail the test for testing limitations

    def test_information_disclosure(self) -> bool:
        """Look for leaked internals in error responses, headers and docs.

        Returns:
            Always ``True``; findings are reported through the vulnerability
            log rather than the return value.
        """
        print("\n🔍 INFORMATION DISCLOSURE AUDIT")
        try:
            # A request for a missing resource exercises the error handler.
            response = requests.get(f"{self.base_url}/api/nonexistent", timeout=5)
            response_text = response.text.lower()
            for pattern in self.SENSITIVE_PATTERNS:
                if re.search(pattern, response_text):
                    self.log_vulnerability(
                        "MEDIUM",
                        "Information Disclosure",
                        f"Sensitive information in error response: {pattern}",
                        {"pattern": pattern, "status_code": response.status_code}
                    )

            server_headers = response.headers.get('Server', '')
            if server_headers and 'uvicorn' in server_headers.lower():
                self.log_vulnerability(
                    "LOW",
                    "Information Disclosure",
                    "Server version information disclosed in headers",
                    {"server_header": server_headers}
                )

            docs_response = requests.get(f"{self.base_url}/docs", timeout=5)
            if docs_response.status_code == 200:
                # Possibly intentional during development, so informational only.
                print("⚠️ API documentation publicly accessible")
                print("   Consider restricting access in production environment")

            print("✅ Information disclosure audit completed")
            return True
        except Exception as e:
            self.log_vulnerability(
                "LOW",
                "Information Disclosure",
                f"Information disclosure testing limited: {e}",
                {"error": str(e)}
            )
            return True

    def test_rate_limiting(self, request_count: int = 50,
                           rps_threshold: float = 20.0) -> bool:
        """Fire sequential requests at ``/health`` and look for throttling.

        Args:
            request_count: Number of requests to send (at least 1).
            rps_threshold: Achieved requests-per-second above which the absence
                of any 429 response is reported as missing rate limiting.

        Returns:
            Always ``True``; findings are reported through the vulnerability log.

        Raises:
            ValueError: If *request_count* is smaller than 1.
        """
        if request_count < 1:
            raise ValueError("request_count must be at least 1")

        print("\n⚡ RATE LIMITING AUDIT")
        try:
            # perf_counter is monotonic, unlike wall-clock time.time().
            started = time.perf_counter()
            statuses = [
                requests.get(f"{self.base_url}/health", timeout=1).status_code
                for _ in range(request_count)
            ]
            # Clamp so an unmeasurably short run cannot divide by zero.
            duration = max(time.perf_counter() - started, 1e-9)
            requests_per_second = request_count / duration
            rate_limited = statuses.count(429)

            if rate_limited == 0 and requests_per_second > rps_threshold:
                self.log_vulnerability(
                    "MEDIUM",
                    "Rate Limiting",
                    "No rate limiting detected - potential DoS vulnerability",
                    {"rps": requests_per_second, "total_requests": request_count}
                )
            else:
                print("✅ Rate limiting appears active or requests naturally throttled")
                print(f"   Request rate: {requests_per_second:.1f} RPS, {rate_limited} rate limited")
            return True
        except Exception as e:
            self.log_vulnerability(
                "LOW",
                "Rate Limiting",
                f"Rate limiting testing failed: {e}",
                {"error": str(e)}
            )
            return True

    def _audit_security_headers(self, headers) -> List[str]:
        """Log missing or weak security headers; return the missing names.

        *headers* is any mapping of header name to value. Names and expected
        values are compared case-insensitively. ``Strict-Transport-Security``
        is only required when ``base_url`` uses HTTPS.
        """
        present = {name.lower(): value for name, value in headers.items()}
        is_https = urlparse(self.base_url).scheme.lower() == 'https'

        missing_headers = []
        for header, expected in self.SECURITY_HEADERS.items():
            if header == 'Strict-Transport-Security' and not is_https:
                continue

            value = present.get(header.lower())
            if value is None:
                missing_headers.append(header)
                severity = "MEDIUM" if header in ('X-Content-Type-Options', 'X-Frame-Options') else "LOW"
                self.log_vulnerability(
                    severity,
                    "Security Headers",
                    f"Missing security header: {header}",
                    {"missing_header": header}
                )
                continue

            if expected is None:
                continue
            accepted = [expected] if isinstance(expected, str) else list(expected)
            if value.strip().lower() not in [item.lower() for item in accepted]:
                self.log_vulnerability(
                    "LOW",
                    "Security Headers",
                    f"Suboptimal {header} value: {value}",
                    {"header": header, "value": value, "expected": expected}
                )
        return missing_headers

    def test_secure_headers(self) -> bool:
        """Audit the security headers returned by ``/health``.

        Returns:
            ``True`` when fewer than three required headers are missing (or
            the probe failed), ``False`` otherwise.
        """
        print("\n📋 SECURITY HEADERS AUDIT")
        try:
            response = requests.get(f"{self.base_url}/health", timeout=5)
            missing_headers = self._audit_security_headers(response.headers)
            if not missing_headers:
                print("✅ All important security headers present")
            else:
                print(f"⚠️ Missing {len(missing_headers)} security headers")
            return len(missing_headers) < 3
        except Exception as e:
            self.log_vulnerability(
                "LOW",
                "Security Headers",
                f"Security headers testing failed: {e}",
                {"error": str(e)}
            )
            return True

    def _security_grade(self, critical_vulns: int, high_vulns: int) -> str:
        """Map finding counts and the current score to a letter grade."""
        if critical_vulns > 0:
            return "F"
        if high_vulns > 2:
            return "D"
        if self.security_score >= 90:
            return "A"
        if self.security_score >= 80:
            return "B"
        if self.security_score >= 70:
            return "C"
        return "D"

    def run_comprehensive_audit(self) -> Dict:
        """Run every audit and assemble the report.

        Returns:
            A JSON-serialisable dict with the keys ``security_score`` (clamped
            at 0), ``security_grade``, ``test_results``, ``test_pass_rate``,
            ``vulnerabilities``, ``vulnerability_summary``, ``recommendations``
            and ``audit_timestamp``.
        """
        print("🔒 WHOOSH SECURITY AUDIT SUITE")
        print("=" * 60)
        print(f"Target: {self.base_url}")
        print(f"Started: {datetime.now().isoformat()}")

        test_results = {
            'CORS Configuration': self.test_cors_configuration(),
            'Authentication Security': self.test_authentication_security(),
            'Input Validation': self.test_input_validation(),
            'Information Disclosure': self.test_information_disclosure(),
            'Rate Limiting': self.test_rate_limiting(),
            'Security Headers': self.test_secure_headers()
        }

        passed_tests = sum(1 for passed in test_results.values() if passed)
        test_pass_rate = (passed_tests / len(test_results)) * 100

        severity_counts = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0}
        for vuln in self.vulnerabilities:
            if vuln['severity'] in severity_counts:
                severity_counts[vuln['severity']] += 1

        return {
            'security_score': max(0, self.security_score),
            'security_grade': self._security_grade(
                severity_counts['CRITICAL'], severity_counts['HIGH']
            ),
            'test_results': test_results,
            'test_pass_rate': test_pass_rate,
            'vulnerabilities': self.vulnerabilities,
            'vulnerability_summary': {
                'critical': severity_counts['CRITICAL'],
                'high': severity_counts['HIGH'],
                'medium': severity_counts['MEDIUM'],
                'low': severity_counts['LOW']
            },
            'recommendations': self._generate_security_recommendations(),
            'audit_timestamp': datetime.now().isoformat()
        }

    def _generate_security_recommendations(self) -> List[str]:
        """Build recommendations: one per affected category plus general advice."""
        affected = {vuln['category'] for vuln in self.vulnerabilities}
        by_category = [
            ('CORS',
             "Configure CORS properly with specific origins instead of wildcards"),
            ('Authentication',
             "Implement proper authentication middleware for all sensitive endpoints"),
            ('Input Validation',
             "Strengthen input validation and sanitization across all endpoints"),
            ('Information Disclosure',
             "Suppress stack traces, internal paths and server version details in responses"),
            ('Security Headers',
             "Implement missing security headers to prevent common web attacks"),
            ('Rate Limiting',
             "Implement rate limiting to prevent abuse and DoS attacks"),
        ]
        recommendations = [text for category, text in by_category if category in affected]

        # Always recommend these for production
        recommendations.extend([
            "Enable HTTPS/TLS encryption for all communications",
            "Implement comprehensive logging and monitoring",
            "Regular security updates and dependency scanning",
            "Consider Web Application Firewall (WAF) for additional protection"
        ])
        return recommendations
def main():
    """Audit the default target, print a summary and save a JSON report.

    The full report is written to ``security_audit_results_<unix-time>.json``
    in the current working directory.

    Returns:
        int: ``0`` when the security grade is ``A`` or ``B``, otherwise ``1``;
        intended to be used as the process exit status.
    """
    auditor = WHOOSHSecurityAuditor()

    # Run comprehensive audit
    results = auditor.run_comprehensive_audit()

    # Print summary
    print("\n📊 SECURITY AUDIT SUMMARY")
    print("=" * 60)
    print(f"Security Score: {results['security_score']}/100")
    print(f"Security Grade: {results['security_grade']}")
    print(f"Test Pass Rate: {results['test_pass_rate']:.1f}%")

    print("\nVulnerabilities Found:")
    summary = results['vulnerability_summary']
    print(f"  🚨 Critical: {summary['critical']}")
    print(f"  ❌ High: {summary['high']}")
    print(f"  ⚠️ Medium: {summary['medium']}")
    print(f"  🟡 Low: {summary['low']}")

    if results['recommendations']:
        print("\n💡 SECURITY RECOMMENDATIONS:")
        for rec in results['recommendations']:
            print(f"  • {rec}")

    # Save detailed results; pin the encoding so the report does not depend
    # on the platform's default locale.
    timestamp = int(time.time())
    filename = f"security_audit_results_{timestamp}.json"
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2)
    print(f"\n📄 Detailed audit results saved to: {filename}")

    # Exit code based on security grade
    if results['security_grade'] in ['A', 'B']:
        print("🎉 SECURITY AUDIT PASSED!")
        return 0
    print("⚠️ SECURITY ISSUES DETECTED - REVIEW REQUIRED")
    return 1
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the exit status.
    raise SystemExit(main())