- Migrated from HIVE branding to WHOOSH across all components
- Enhanced backend API with new services: AI models, BZZZ integration, templates, members
- Added comprehensive testing suite with security, performance, and integration tests
- Improved frontend with new components for project setup, AI models, and team management
- Updated MCP server implementation with WHOOSH-specific tools and resources
- Enhanced deployment configurations with production-ready Docker setups
- Added comprehensive documentation and setup guides
- Implemented age encryption service and UCXL integration

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
431 lines
18 KiB
Python
431 lines
18 KiB
Python
"""
GITEA Service for WHOOSH - Integrates with GITEA for repository and project management.
Uses the existing BZZZ GITEA client implementation for consistency.
"""
|
|
import json
import os
import re
import subprocess
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional, Any

import requests

from app.models.project import Project
|
|
|
|
|
|
class GiteaService:
    """GITEA service for WHOOSH project management.

    Wraps the GITEA REST API (v1) to create repositories, manage issues and
    coordinate BZZZ tasks. A BZZZ task is an issue carrying the label stored
    in ``bzzz_labels["task"]``.

    Failure convention: API failures are reported on stdout and surface to
    callers as ``None`` (single objects) or ``[]`` (collections). The only
    exceptions raised are for programming/configuration errors: a missing
    token or an unsupported HTTP method.
    """

    DEFAULT_BASE_URL = "http://ironwood:3000"
    REQUEST_TIMEOUT = 30  # seconds, applied to every HTTP call

    # Only these verbs send the ``data`` payload as a JSON body.
    _BODY_METHODS = frozenset({"POST", "PUT", "PATCH"})
    _SUPPORTED_METHODS = _BODY_METHODS | {"GET", "DELETE"}

    # Token files, checked in order before the GITEA_TOKEN environment variable.
    _TOKEN_FILES = (
        Path("/run/secrets/gitea_token"),  # Docker secret (most secure)
        Path("/home/tony/chorus/business/secrets/gitea-token"),  # primary
        Path("/home/tony/AI/secrets/passwords_and_tokens/gitea-token"),  # fallback
    )

    # (bzzz_labels key, hex colour without '#', description)
    _LABEL_STYLES = (
        ("task", "0366d6", "Task available for BZZZ agent coordination"),
        ("in_progress", "fbca04", "Task currently being worked on"),
        ("completed", "28a745", "Task completed by BZZZ agent"),
        ("frontend", "e99695", "Frontend development task"),
        ("backend", "5319e7", "Backend development task"),
        ("security", "d93f0b", "Security-related task"),
        ("design", "f9d0c4", "UI/UX design task"),
        ("devops", "0e8a16", "DevOps and infrastructure task"),
        ("documentation", "0075ca", "Documentation task"),
        ("bug", "d73a4a", "Bug fix task"),
        ("enhancement", "a2eeef", "Feature enhancement task"),
        ("architecture", "5319e7", "System architecture task"),
    )

    # Priority value accepted by create_bzzz_task -> label name.
    _PRIORITY_LABELS = {"high": "priority-high", "low": "priority-low"}

    # (label name, hex colour without '#', description)
    _PRIORITY_LABEL_STYLES = (
        ("priority-high", "b60205", "High priority task"),
        ("priority-low", "c2e0c6", "Low priority task"),
    )

    # Task types in priority order: the first rule that matches wins.
    _TASK_TYPE_KEYWORDS = (
        ("bug", ("bug", "error", "fix")),
        ("security", ("security", "vulnerability", "auth")),
        ("architecture", ("architecture", "system", "design")),
        ("frontend", ("frontend", "ui", "react", "vue")),
        ("backend", ("backend", "api", "server")),
        ("devops", ("devops", "deployment", "ci", "cd", "docker")),
        ("documentation", ("docs", "documentation", "readme")),
        ("enhancement", ("enhancement", "feature", "improvement")),
        ("design", ("design", "ux", "mockup")),
    )

    def __init__(self, base_url: Optional[str] = None, token: Optional[str] = None):
        """Configure the client.

        Args:
            base_url: Root URL of the GITEA instance. Defaults to
                ``DEFAULT_BASE_URL``; a trailing slash is ignored.
            token: API token. When omitted it is looked up via
                ``_get_gitea_token`` (secret files, then environment).
        """
        self.gitea_base_url = (base_url or self.DEFAULT_BASE_URL).rstrip("/")
        self.gitea_api_base = f"{self.gitea_base_url}/api/v1"
        self.gitea_token = token if token else self._get_gitea_token()

        # Default BZZZ task labels
        self.bzzz_labels = {
            "task": "bzzz-task",
            "in_progress": "in-progress",
            "completed": "completed",
            "frontend": "frontend",
            "backend": "backend",
            "security": "security",
            "design": "design",
            "devops": "devops",
            "documentation": "documentation",
            "bug": "bug",
            "enhancement": "enhancement",
            "architecture": "architecture",
        }

    def _get_gitea_token(self) -> Optional[str]:
        """Get GITEA token from secrets or environment.

        Sources are tried in order: each path in ``_TOKEN_FILES``, then the
        ``GITEA_TOKEN`` environment variable. An unreadable or empty source
        is skipped so that it cannot hide a valid token further down.

        Returns:
            The stripped token, or ``None`` when no source yields one.
        """
        for secret_path in self._TOKEN_FILES:
            try:
                if not secret_path.exists():
                    continue
                token = secret_path.read_text().strip()
            except (OSError, UnicodeError) as e:
                print(f"Error reading GITEA token: {e}")
                continue
            if token:
                return token

        token = (os.getenv("GITEA_TOKEN") or "").strip()
        if token:
            return token

        print("Warning: No GITEA token found. Repository operations will be limited.")
        return None

    def _make_api_request(self, method: str, endpoint: str, data: Optional[Dict] = None,
                          params: Optional[Dict] = None) -> Optional[Any]:
        """Make authenticated API request to GITEA.

        Args:
            method: HTTP verb (case-insensitive): GET, POST, PUT, PATCH or DELETE.
            endpoint: Path relative to the API base; a leading slash is ignored.
            data: JSON body, sent only for POST/PUT/PATCH.
            params: Query parameters; URL-encoded by ``requests``.

        Returns:
            The decoded JSON payload (dict or list) on a 2xx response, ``{}``
            when a 2xx response has no body, or ``None`` on a non-2xx status,
            a transport error, or an undecodable body.

        Raises:
            Exception: If no token is configured.
            ValueError: If ``method`` is not a supported verb.
        """
        if not self.gitea_token:
            raise Exception("GITEA token required for API operations")

        # Validate outside the try block: a bad verb is a caller bug and must
        # propagate rather than be reported as a failed request.
        verb = method.upper()
        if verb not in self._SUPPORTED_METHODS:
            raise ValueError(f"Unsupported HTTP method: {method}")

        url = f"{self.gitea_api_base}/{endpoint.lstrip('/')}"
        request_kwargs = {
            "headers": {
                "Authorization": f"token {self.gitea_token}",
                "Content-Type": "application/json",
                "Accept": "application/json",
            },
            "timeout": self.REQUEST_TIMEOUT,
        }
        if params:
            request_kwargs["params"] = params
        if verb in self._BODY_METHODS:
            request_kwargs["json"] = data

        try:
            response = requests.request(verb, url, **request_kwargs)
        except requests.RequestException as e:
            print(f"Error making GITEA API request to {url}: {e}")
            return None

        if not 200 <= response.status_code < 300:
            print(f"GITEA API error: {response.status_code} - {response.text}")
            return None
        if not response.content:
            return {}

        try:
            return response.json()
        except ValueError as e:  # body was not valid JSON
            print(f"Error making GITEA API request to {url}: {e}")
            return None

    def create_repository(self, owner: str, repo_name: str, description: str = "",
                          private: bool = False, auto_init: bool = True) -> Optional[Dict]:
        """Create a new repository in GITEA.

        Creation is attempted under the organization ``owner`` first. If that
        fails for any reason, the repository is created under the
        authenticated user instead, so the resulting owner may differ from
        ``owner``; the real one is reported in the ``owner`` key.

        Returns:
            A summary dict (id, name, full_name, html_url, clone_url, ssh_url,
            default_branch, private, owner, labels_configured), or ``None``
            when both creation attempts fail.
        """
        data = {
            "name": repo_name,
            "description": description,
            "private": private,
            "auto_init": auto_init,
            "gitignores": "Python,Node,Go,Rust",  # Common gitignore templates
            "license": "MIT",  # Default to MIT license
            "readme": "Default",
        }

        # Try to create under organization first, fallback to user
        result = self._make_api_request("POST", f"orgs/{owner}/repos", data)
        if not result:
            result = self._make_api_request("POST", "user/repos", data)
        if not result:
            return None

        # After the user fallback the repo lives under the token's account,
        # so take owner/name from the response instead of the request.
        actual_owner = (result.get("owner") or {}).get("login") or owner
        actual_name = result.get("name") or repo_name
        print(f"Created GITEA repository: {actual_owner}/{actual_name}")

        # Set up BZZZ labels after repo creation
        labels_ok = self._setup_bzzz_labels(actual_owner, actual_name)

        return {
            "id": result.get("id"),
            "name": result.get("name"),
            "full_name": result.get("full_name"),
            "html_url": result.get("html_url"),
            "clone_url": result.get("clone_url"),
            "ssh_url": result.get("ssh_url"),
            "default_branch": result.get("default_branch", "main"),
            "private": result.get("private", False),
            "owner": actual_owner,
            "labels_configured": labels_ok,
        }

    def _setup_bzzz_labels(self, owner: str, repo_name: str) -> bool:
        """Set up BZZZ task coordination labels in the repository.

        Creates one label per entry in ``_LABEL_STYLES`` plus the priority
        labels that ``create_bzzz_task`` attaches.

        Returns:
            ``True`` only if every label was created.
        """
        labels_data = [
            {"name": self.bzzz_labels[key], "color": f"#{color}", "description": text}
            for key, color, text in self._LABEL_STYLES
        ]
        labels_data.extend(
            {"name": name, "color": f"#{color}", "description": text}
            for name, color, text in self._PRIORITY_LABEL_STYLES
        )
        # NOTE(review): colours carry the '#' prefix shown in the GITEA API
        # example; some server versions may reject the bare form — confirm.

        endpoint = f"repos/{owner}/{repo_name}/labels"
        success_count = sum(
            1 for label_data in labels_data
            if self._make_api_request("POST", endpoint, label_data)
        )

        print(f"Set up {success_count}/{len(labels_data)} BZZZ labels for {owner}/{repo_name}")
        return success_count == len(labels_data)

    def _resolve_label_ids(self, owner: str, repo_name: str, labels: List[Any]) -> List[int]:
        """Translate label names into the numeric ids the issue API expects.

        Integer entries are treated as ids and passed through. Names that do
        not exist in the repository are skipped with a warning. The
        repository's labels are fetched at most once, and only if a name
        needs resolving.
        """
        ids_by_name = None
        label_ids = []
        for label in labels:
            if isinstance(label, int) and not isinstance(label, bool):
                label_ids.append(label)
                continue
            if ids_by_name is None:
                existing = self._make_api_request(
                    "GET", f"repos/{owner}/{repo_name}/labels", params={"limit": 50}
                )
                ids_by_name = {
                    entry.get("name"): entry.get("id")
                    for entry in (existing if isinstance(existing, list) else [])
                }
            label_id = ids_by_name.get(label)
            if label_id is None:
                print(f"Warning: label '{label}' not found in {owner}/{repo_name}; skipping")
            else:
                label_ids.append(label_id)
        return label_ids

    def create_issue(self, owner: str, repo_name: str, title: str, body: str = "",
                     labels: Optional[List[str]] = None, assignees: Optional[List[str]] = None) -> Optional[Dict]:
        """Create an issue in the repository.

        Args:
            labels: Label names (or numeric ids). Names are resolved to ids
                first because the create-issue endpoint takes ids only.
            assignees: Login names to assign.

        Returns:
            A summary dict (id, number, title, body, state, html_url,
            created_at, updated_at), or ``None`` on failure.
        """
        data = {
            "title": title,
            "body": body,
            "labels": self._resolve_label_ids(owner, repo_name, labels) if labels else [],
            "assignees": assignees or [],
        }

        result = self._make_api_request("POST", f"repos/{owner}/{repo_name}/issues", data)
        if not result:
            return None

        return {
            "id": result.get("id"),
            "number": result.get("number"),
            "title": result.get("title"),
            "body": result.get("body"),
            "state": result.get("state"),
            "html_url": result.get("html_url"),
            "created_at": result.get("created_at"),
            "updated_at": result.get("updated_at"),
        }

    def get_repository_info(self, owner: str, repo_name: str) -> Optional[Dict]:
        """Get repository information.

        Returns:
            A summary dict of the repository, or ``None`` when it cannot be
            fetched (not found, access denied, transport error).
        """
        result = self._make_api_request("GET", f"repos/{owner}/{repo_name}")
        if not result:
            return None

        return {
            "id": result.get("id"),
            "name": result.get("name"),
            "full_name": result.get("full_name"),
            "description": result.get("description"),
            "html_url": result.get("html_url"),
            "clone_url": result.get("clone_url"),
            "ssh_url": result.get("ssh_url"),
            "default_branch": result.get("default_branch", "main"),
            "private": result.get("private", False),
            "stars_count": result.get("stars_count", 0),
            "forks_count": result.get("forks_count", 0),
            "open_issues_count": result.get("open_issues_count", 0),
            "created_at": result.get("created_at"),
            "updated_at": result.get("updated_at"),
        }

    def list_repositories(self, owner: Optional[str] = None) -> List[Dict]:
        """List repositories for user or organization.

        With ``owner`` the organization endpoint is tried first and the user
        endpoint second; without it the authenticated user's repositories
        are listed. Only the page returned by a single request is included.

        Returns:
            A list of repository summary dicts; empty on failure.
        """
        if owner:
            result = self._make_api_request("GET", f"orgs/{owner}/repos")
            if not result:
                # Not an organization (or it has no repos): try as a user.
                result = self._make_api_request("GET", f"users/{owner}/repos")
        else:
            result = self._make_api_request("GET", "user/repos")

        if not isinstance(result, list):
            return []

        return [
            {
                "id": repo.get("id"),
                "name": repo.get("name"),
                "full_name": repo.get("full_name"),
                "description": repo.get("description"),
                "html_url": repo.get("html_url"),
                "clone_url": repo.get("clone_url"),
                "default_branch": repo.get("default_branch", "main"),
                "private": repo.get("private", False),
                "created_at": repo.get("created_at"),
                "updated_at": repo.get("updated_at"),
            }
            for repo in result
        ]

    def get_bzzz_tasks(self, owner: str, repo_name: str, state: str = "open") -> List[Dict]:
        """Get BZZZ tasks (issues with bzzz-task label) from repository.

        Args:
            state: Issue state filter passed through to the API.

        Returns:
            A list of task dicts (see ``_issue_to_task``); empty on failure.
            Only the page returned by a single request is included.
        """
        result = self._make_api_request(
            "GET",
            f"repos/{owner}/{repo_name}/issues",
            params={"state": state, "labels": self.bzzz_labels["task"]},
        )
        if not isinstance(result, list):
            return []
        return [self._issue_to_task(issue) for issue in result]

    def _issue_to_task(self, issue: Dict) -> Dict:
        """Project a raw issue payload onto the BZZZ task shape.

        ``labels`` and ``assignees`` are treated as empty when the payload
        has them missing or set to ``null``.
        """
        labels = issue.get("labels") or []
        assignees = issue.get("assignees") or []
        return {
            "id": issue.get("id"),
            "number": issue.get("number"),
            "title": issue.get("title"),
            "body": issue.get("body"),
            "state": issue.get("state"),
            "labels": [label.get("name") for label in labels],
            "assignees": [assignee.get("login") for assignee in assignees],
            "html_url": issue.get("html_url"),
            "created_at": issue.get("created_at"),
            "updated_at": issue.get("updated_at"),
            # A task counts as claimed as soon as anyone is assigned to it.
            "is_claimed": bool(assignees),
            "task_type": self._determine_task_type(issue),
        }

    @staticmethod
    def _mentions(keyword: str, text: str) -> bool:
        """Return whether ``text`` (already lower-cased) mentions ``keyword``.

        Keywords of up to three characters must match a whole word, with an
        optional s/es/ed/ing ending, so "ui" does not match "build" and "ci"
        does not match "specific". Longer keywords match as substrings.
        """
        if len(keyword) > 3:
            return keyword in text
        pattern = rf"\b{re.escape(keyword)}(?:s|es|ed|ing)?\b"
        return re.search(pattern, text) is not None

    def _determine_task_type(self, issue: Dict) -> str:
        """Determine task type from issue labels and content.

        Resolution order:
          1. A label named exactly like a task type wins outright.
          2. Otherwise the first rule in ``_TASK_TYPE_KEYWORDS`` with a
             keyword that equals a label, or is mentioned in the title or
             body, wins.
          3. Otherwise the type is ``"general"``.

        Missing or ``null`` title, body and labels are treated as empty.
        """
        labels = {
            (label.get("name") or "").lower()
            for label in (issue.get("labels") or [])
        }
        title_lower = (issue.get("title") or "").lower()
        body_lower = (issue.get("body") or "").lower()

        # Explicit type labels beat heuristics; without this pass a "design"
        # label would be claimed by the earlier "architecture" rule.
        for task_type, _ in self._TASK_TYPE_KEYWORDS:
            if task_type in labels:
                return task_type

        for task_type, keywords in self._TASK_TYPE_KEYWORDS:
            for keyword in keywords:
                if (keyword in labels
                        or self._mentions(keyword, title_lower)
                        or self._mentions(keyword, body_lower)):
                    return task_type

        return "general"

    def create_bzzz_task(self, owner: str, repo_name: str, title: str, description: str,
                         task_type: str = "general", priority: str = "medium") -> Optional[Dict]:
        """Create a new BZZZ task (issue with bzzz-task label).

        Args:
            task_type: Key of ``bzzz_labels``; unknown types add no label.
            priority: ``"high"`` or ``"low"`` adds the matching priority
                label; any other value adds none.

        Returns:
            The summary dict from ``create_issue``, or ``None`` on failure.
        """
        labels = [self.bzzz_labels["task"]]

        # Add type-specific label, without repeating the task label itself.
        type_label = self.bzzz_labels.get(task_type)
        if type_label and type_label not in labels:
            labels.append(type_label)

        # Add priority label
        priority_label = self._PRIORITY_LABELS.get(priority)
        if priority_label:
            labels.append(priority_label)

        return self.create_issue(owner, repo_name, title, description, labels)

    def setup_project_repository(self, project_data: Dict) -> Optional[Dict]:
        """Complete project repository setup with WHOOSH integration.

        Creates the repository (which also installs the BZZZ labels) and
        files an initial high-priority "architecture" task describing the
        setup work.

        Args:
            project_data: Reads ``name`` (required), ``description``,
                ``owner`` (default ``"whoosh"``) and ``private``.

        Returns:
            A dict with the repository summary, the initial issue (``None``
            if it could not be created), URLs and label status; or ``None``
            when the name is empty, repository creation fails, or an
            unexpected error occurs.
        """
        try:
            # Extract project details
            display_name = (project_data.get("name") or "").strip()
            project_name = display_name.lower().replace(" ", "-")
            if not project_name:
                print("Error setting up project repository: project name is required")
                return None
            description = project_data.get("description") or ""
            owner = project_data.get("owner", "whoosh")  # Default to whoosh organization
            private = project_data.get("private", False)

            # Create repository
            repo_info = self.create_repository(
                owner=owner,
                repo_name=project_name,
                description=description,
                private=private,
                auto_init=True,
            )
            if not repo_info:
                return None

            # The repo may have been created under the user account instead
            # of the requested organization; address it where it really is.
            repo_owner = repo_info.get("owner") or owner
            created_name = repo_info.get("name") or project_name

            issue_body = "\n".join([
                f"# {display_name}",
                "",
                description,
                "",
                "## Initial Setup Tasks",
                "",
                "- [ ] Set up project structure",
                "- [ ] Configure development environment",
                "- [ ] Add README documentation",
                "- [ ] Set up CI/CD pipeline",
                "- [ ] Configure testing framework",
                "",
                "This issue tracks the initial project setup. Additional tasks will be created as needed.",
                "",
                "---",
                "*Created by WHOOSH Project Setup Wizard*",
                "",
            ])

            # Create initial project structure issue
            initial_issue = self.create_bzzz_task(
                owner=repo_owner,
                repo_name=created_name,
                title="🚀 Project Setup and Initial Structure",
                description=issue_body,
                task_type="architecture",
                priority="high",
            )

            return {
                "repository": repo_info,
                "initial_issue": initial_issue,
                "gitea_url": f"{self.gitea_base_url}/{repo_owner}/{created_name}",
                "clone_url": repo_info.get("clone_url"),
                "bzzz_enabled": True,
                "labels_configured": bool(repo_info.get("labels_configured")),
            }

        except Exception as e:
            # Boundary method: callers expect None rather than an exception.
            print(f"Error setting up project repository: {e}")
            return None

    def validate_repository_access(self, owner: str, repo_name: str) -> Dict[str, Any]:
        """Validate access to a repository and return status information.

        Returns:
            ``{"accessible": True, ...}`` with the repository summary, whether
            the BZZZ task label exists, and the open BZZZ task count; or
            ``{"accessible": False, "error": ...}`` on any failure.
        """
        try:
            repo_info = self.get_repository_info(owner, repo_name)
            if not repo_info:
                return {
                    "accessible": False,
                    "error": "Repository not found or access denied",
                }

            # Check if BZZZ labels exist
            labels_result = self._make_api_request("GET", f"repos/{owner}/{repo_name}/labels")
            label_names = set()
            if isinstance(labels_result, list):
                label_names = {label.get("name") for label in labels_result}
            bzzz_labels_exist = self.bzzz_labels["task"] in label_names

            # Get task count
            bzzz_tasks = self.get_bzzz_tasks(owner, repo_name)

            return {
                "accessible": True,
                "repository": repo_info,
                "bzzz_labels_configured": bzzz_labels_exist,
                "bzzz_task_count": len(bzzz_tasks),
                "bzzz_ready": bzzz_labels_exist,
            }

        except Exception as e:
            # Boundary method: report the failure instead of raising.
            return {
                "accessible": False,
                "error": str(e),
            }

    def get_project_git_url(self, owner: str, repo_name: str, use_ssh: bool = False) -> Optional[str]:
        """Get the appropriate git URL for cloning.

        Returns:
            The SSH URL when ``use_ssh`` is true, otherwise the HTTP clone
            URL; ``None`` when the repository cannot be fetched.
        """
        repo_info = self.get_repository_info(owner, repo_name)
        if not repo_info:
            return None
        return repo_info.get("ssh_url" if use_ssh else "clone_url")