diff --git a/pkg/mcp/server.go b/pkg/mcp/server.go index f45da5d..617eda2 100644 --- a/pkg/mcp/server.go +++ b/pkg/mcp/server.go @@ -102,6 +102,7 @@ const ( StatusCollaborating AgentStatus = "collaborating" StatusEscalating AgentStatus = "escalating" StatusTerminating AgentStatus = "terminating" + StatusOffline AgentStatus = "offline" ) // AgentTask represents a task being worked on by an agent @@ -427,7 +428,7 @@ func (s *McpServer) processMCPMessage(message map[string]interface{}) (map[strin case "tools/call": return s.callTool(params) case "resources/list": - return s.listResources(), nil + return s.listResources() case "resources/read": return s.readResource(params) default: @@ -625,4 +626,347 @@ type Relation struct { Type string Strength float64 Evidence []string +} + +// REST API handlers + +func (s *McpServer) handleAgentsAPI(w http.ResponseWriter, r *http.Request) { + s.agentsMutex.RLock() + defer s.agentsMutex.RUnlock() + + agents := make([]map[string]interface{}, 0, len(s.agents)) + for _, agent := range s.agents { + agent.mutex.RLock() + agents = append(agents, map[string]interface{}{ + "id": agent.ID, + "role": agent.Role, + "status": agent.Status, + "specialization": agent.Specialization, + "capabilities": agent.Capabilities, + "current_tasks": len(agent.CurrentTasks), + "max_tasks": agent.MaxTasks, + }) + agent.mutex.RUnlock() + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "agents": agents, + "total": len(agents), + }) +} + +func (s *McpServer) handleConversationsAPI(w http.ResponseWriter, r *http.Request) { + // Collect all active conversation threads from agents + conversations := make([]map[string]interface{}, 0) + + s.agentsMutex.RLock() + for _, agent := range s.agents { + agent.mutex.RLock() + for threadID, thread := range agent.ActiveThreads { + conversations = append(conversations, map[string]interface{}{ + "id": threadID, + "topic": thread.Topic, + "state": thread.State, + "participants": len(thread.Participants), + "created_at": thread.CreatedAt, + }) + } + agent.mutex.RUnlock() + } + s.agentsMutex.RUnlock() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "conversations": conversations, + "total": len(conversations), + }) +} + +func (s *McpServer) handleStatsAPI(w http.ResponseWriter, r *http.Request) { + s.stats.mutex.RLock() + defer s.stats.mutex.RUnlock() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "start_time": s.stats.StartTime, + "uptime_seconds": time.Since(s.stats.StartTime).Seconds(), + "total_requests": s.stats.TotalRequests, + "active_agents": s.stats.ActiveAgents, + "messages_processed": s.stats.MessagesProcessed, + "tokens_consumed": s.stats.TokensConsumed, + "average_cost_per_task": s.stats.AverageCostPerTask, + "error_rate": s.stats.ErrorRate, + }) +} + +func (s *McpServer) handleHealthCheck(w http.ResponseWriter, r *http.Request) { + s.agentsMutex.RLock() + agentCount := len(s.agents) + s.agentsMutex.RUnlock() + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "status": "healthy", + "active_agents": agentCount, + "uptime": time.Since(s.stats.StartTime).String(), + }) +} + +// Message handlers + +func (s *McpServer) handleBzzzMessages() { + // Subscribe to BZZZ messages via pubsub + if s.pubsub == nil { + return + } + + // Listen for BZZZ coordination messages + for { + select { + case <-s.ctx.Done(): + return + default: + // Process BZZZ messages from pubsub + time.Sleep(1 * time.Second) + } + } +} + +func (s *McpServer) handleHmmmMessages() { + // Subscribe to HMMM messages via pubsub + if s.pubsub == nil { + return + } + + // Listen for HMMM discussion messages + for { + select { + case <-s.ctx.Done(): + return + default: + // Process HMMM messages from pubsub + time.Sleep(1 * time.Second) + } + } +} + +func (s *McpServer) periodicTasks() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + // Update agent statistics + s.agentsMutex.RLock() + s.stats.mutex.Lock() + s.stats.ActiveAgents = len(s.agents) + s.stats.mutex.Unlock() + s.agentsMutex.RUnlock() + + // Re-announce agents periodically + s.agentsMutex.RLock() + for _, agent := range s.agents { + if time.Since(agent.LastAnnouncement) > 5*time.Minute { + s.announceAgent(agent) + } + } + s.agentsMutex.RUnlock() + } + } +} + +// Agent management + +func (s *McpServer) stopAgent(agent *GPTAgent) { + agent.mutex.Lock() + defer agent.mutex.Unlock() + + // Update status + agent.Status = StatusOffline + + // Clean up active tasks + for taskID := range agent.CurrentTasks { + delete(agent.CurrentTasks, taskID) + } + + // Clean up active threads + for threadID := range agent.ActiveThreads { + delete(agent.ActiveThreads, threadID) + } + + s.hlog.Append(logging.PeerLeft, map[string]interface{}{ + "agent_id": agent.ID, + "role": string(agent.Role), + }) +} + +func (s *McpServer) initiateCollaboration(thread *ConversationThread) error { + // Send collaboration invitation to all participants + for _, participant := range thread.Participants { + s.agentsMutex.RLock() + agent, exists := s.agents[participant.AgentID] + s.agentsMutex.RUnlock() + + if !exists { + continue + } + + // Update participant status + agent.mutex.Lock() + participant.Status = ParticipantStatusActive + agent.mutex.Unlock() + + // Log collaboration start + s.hlog.Append(logging.Collaboration, map[string]interface{}{ + "event": "collaboration_started", + "thread_id": thread.ID, + "agent_id": agent.ID, + "role": string(agent.Role), + }) + } + + return nil +} + +// MCP tool listing + +func (s *McpServer) listTools() map[string]interface{} { + return map[string]interface{}{ + "tools": []map[string]interface{}{ + { + "name": "chorus_announce", + "description": "Announce agent availability to CHORUS network", + "parameters": map[string]interface{}{ + "agent_id": "string", + "capabilities": "array", + "specialization": "string", + }, + }, + { + "name": "chorus_lookup", + "description": "Look up available agents by capability or role", + "parameters": map[string]interface{}{ + "capability": "string", + "role": "string", + }, + }, + { + "name": "chorus_get", + "description": "Retrieve context or data from CHORUS DHT", + "parameters": map[string]interface{}{ + "key": "string", + }, + }, + { + "name": "chorus_store", + "description": "Store data in CHORUS DHT", + "parameters": map[string]interface{}{ + "key": "string", + "value": "string", + }, + }, + { + "name": "chorus_collaborate", + "description": "Request multi-agent collaboration on a task", + "parameters": map[string]interface{}{ + "task": "object", + "required_roles": "array", + }, + }, + }, + } +} + +// MCP resource handling + +func (s *McpServer) listResources() (map[string]interface{}, error) { + return map[string]interface{}{ + "resources": []map[string]interface{}{ + { + "uri": "chorus://agents", + "name": "Available Agents", + "description": "List of all available CHORUS agents", + "mimeType": "application/json", + }, + { + "uri": "chorus://dht", + "name": "DHT Storage", + "description": "Access to distributed hash table storage", + "mimeType": "application/json", + }, + }, + }, nil +} + +func (s *McpServer) readResource(params map[string]interface{}) (map[string]interface{}, error) { + uri, ok := params["uri"].(string) + if !ok { + return nil, fmt.Errorf("missing uri parameter") + } + + switch uri { + case "chorus://agents": + s.agentsMutex.RLock() + defer s.agentsMutex.RUnlock() + + agents := make([]map[string]interface{}, 0, len(s.agents)) + for _, agent := range s.agents { + agents = append(agents, map[string]interface{}{ + "id": agent.ID, + "role": agent.Role, + "status": agent.Status, + }) + } + return map[string]interface{}{"agents": agents}, nil + + case "chorus://dht": + return map[string]interface{}{"message": "DHT access not implemented"}, nil + + default: + return nil, fmt.Errorf("unknown resource: %s", uri) + } +} + +// BZZZ tool handlers + +func (s *McpServer) handleBzzzLookup(params map[string]interface{}) (map[string]interface{}, error) { + // Stub: Lookup agents or resources via BZZZ + return map[string]interface{}{ + "results": []interface{}{}, + }, nil +} + +func (s *McpServer) handleBzzzGet(params map[string]interface{}) (map[string]interface{}, error) { + // Stub: Get data from BZZZ system + return map[string]interface{}{ + "data": nil, + }, nil +} + +func (s *McpServer) handleBzzzPost(params map[string]interface{}) (map[string]interface{}, error) { + // Stub: Post data to BZZZ system + return map[string]interface{}{ + "success": false, + "message": "not implemented", + }, nil +} + +func (s *McpServer) handleBzzzThread(params map[string]interface{}) (map[string]interface{}, error) { + // Stub: Handle BZZZ thread operations + return map[string]interface{}{ + "thread": nil, + }, nil +} + +func (s *McpServer) handleBzzzSubscribe(params map[string]interface{}) (map[string]interface{}, error) { + // Stub: Subscribe to BZZZ events + return map[string]interface{}{ + "subscribed": false, + "message": "not implemented", + }, nil } \ No newline at end of file