Files
hive/backend/app/core/security.py
anthonyrawlins b6bff318d9 WIP: Save current work before CHORUS rebrand
- Agent roles integration progress
- Various backend and frontend updates
- Storybook cache cleanup

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-01 02:20:56 +10:00

288 lines
10 KiB
Python

"""
Security utilities for JWT token generation, validation, and API key management.
"""
import os
import uuid
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List
import jwt
from fastapi import HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi import Request
from sqlalchemy.orm import Session
# JWT Configuration
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-this-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))
REFRESH_TOKEN_EXPIRE_DAYS = int(os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", "30"))
# Security scheme
security = HTTPBearer(auto_error=False)
class TokenManager:
"""Manages JWT token creation, validation, and refresh."""
@staticmethod
def create_access_token(
data: Dict[str, Any],
expires_delta: Optional[timedelta] = None
) -> str:
"""Create a JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# Add standard claims
to_encode.update({
"exp": expire,
"iat": datetime.utcnow(),
"type": "access",
"jti": str(uuid.uuid4()), # JWT ID for blacklisting
})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@staticmethod
def create_refresh_token(
user_id: int,
expires_delta: Optional[timedelta] = None
) -> str:
"""Create a JWT refresh token."""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
to_encode = {
"sub": str(user_id),
"exp": expire,
"iat": datetime.utcnow(),
"type": "refresh",
"jti": str(uuid.uuid4()),
}
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@staticmethod
def verify_token(token: str) -> Dict[str, Any]:
"""Verify and decode a JWT token."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has expired"
)
except jwt.JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials"
)
@staticmethod
def extract_user_id(token: str) -> int:
"""Extract user ID from a valid token."""
payload = TokenManager.verify_token(token)
user_id = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token missing user information"
)
return int(user_id)
@staticmethod
def get_token_claims(token: str) -> Dict[str, Any]:
"""Get all claims from a token without verification (for expired tokens)."""
try:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM], options={"verify_exp": False})
except jwt.JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token format"
)
class APIKeyManager:
"""Manages API key generation, validation, and permissions."""
@staticmethod
def generate_api_key() -> tuple[str, str, str]:
"""
Generate a new API key.
Returns: (plain_key, hashed_key, prefix)
"""
from app.models.auth import APIKey
plain_key, hashed_key = APIKey.generate_api_key()
prefix = plain_key[:8] # First 8 characters for identification
return plain_key, hashed_key, prefix
@staticmethod
def validate_api_key(session: Session, api_key: str) -> Optional[Dict[str, Any]]:
"""
Validate an API key and return user/key information.
Returns None if invalid.
"""
from app.models.auth import APIKey, User
# Find API key by trying to match the hash
api_keys = session.query(APIKey).filter(APIKey.is_active == True).all()
for key_record in api_keys:
if APIKey.verify_api_key(api_key, key_record.key_hash):
if not key_record.is_valid():
return None
# Get user information
user = session.query(User).filter(User.id == key_record.user_id).first()
if not user or not user.is_active:
return None
# Record usage
key_record.record_usage()
session.commit()
return {
"user_id": user.id,
"username": user.username,
"api_key_id": key_record.id,
"scopes": key_record.get_scopes(),
"is_superuser": user.is_superuser,
}
return None
@staticmethod
def check_scope_permission(user_scopes: List[str], required_scope: str) -> bool:
"""Check if user has required scope permission."""
# Admin users have all permissions
if "admin" in user_scopes:
return True
# Check for specific scope
if required_scope in user_scopes:
return True
# Check for wildcard permissions (e.g., "workflows:*" covers "workflows:read")
scope_parts = required_scope.split(":")
if len(scope_parts) >= 2:
wildcard_scope = f"{scope_parts[0]}:*"
if wildcard_scope in user_scopes:
return True
return False
class AuthenticationError(Exception):
"""Custom exception for authentication errors."""
def __init__(self, message: str, status_code: int = status.HTTP_401_UNAUTHORIZED):
self.message = message
self.status_code = status_code
super().__init__(self.message)
class AuthManager:
"""Main authentication manager combining JWT and API key auth."""
@staticmethod
def authenticate_request(
session: Session,
authorization: Optional[HTTPAuthorizationCredentials] = None,
api_key: Optional[str] = None
) -> Dict[str, Any]:
"""
Authenticate a request using either Bearer token or API key.
Returns user context information.
"""
# Try API key authentication first
if api_key:
user_context = APIKeyManager.validate_api_key(session, api_key)
if user_context:
user_context["auth_type"] = "api_key"
return user_context
else:
raise AuthenticationError("Invalid API key")
# Try JWT Bearer token authentication
if authorization and authorization.scheme.lower() == "bearer":
try:
payload = TokenManager.verify_token(authorization.credentials)
# Check if token is blacklisted
from app.models.auth import TokenBlacklist
jti = payload.get("jti")
if jti and TokenBlacklist.is_token_blacklisted(session, jti):
raise AuthenticationError("Token has been revoked")
# Get user information
user_id = payload.get("sub") # UUID as string
from app.models.user import User
user = session.query(User).filter(User.id == user_id).first()
if not user or not user.is_active:
raise AuthenticationError("User not found or inactive")
return {
"user_id": user.id,
"username": user.username,
"scopes": ["admin"] if user.is_superuser else [],
"is_superuser": user.is_superuser,
"auth_type": "jwt",
"token_jti": jti,
}
except HTTPException as e:
raise AuthenticationError(e.detail, e.status_code)
raise AuthenticationError("No valid authentication provided")
@staticmethod
def require_scope(required_scope: str):
"""Decorator to require specific scope for an endpoint."""
def decorator(func):
def wrapper(*args, **kwargs):
# This will be implemented in the dependency injection system
return func(*args, **kwargs)
return wrapper
return decorator
def create_token_response(user_id: int, user_data: Dict[str, Any]) -> Dict[str, Any]:
"""Create a complete token response with access and refresh tokens."""
# Create access token with user data
access_token_data = {
"sub": str(user_id),
"username": user_data.get("username"),
"scopes": user_data.get("scopes", []),
}
access_token = TokenManager.create_access_token(access_token_data)
refresh_token = TokenManager.create_refresh_token(user_id)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
"expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60, # seconds
}
def get_password_hash(password: str) -> str:
"""Hash a password for storage."""
from app.models.user import User
return User.hash_password(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
from app.models.auth import pwd_context
return pwd_context.verify(plain_password, hashed_password)