#!/usr/bin/env node /** * BZZZ MCP Server * Model Context Protocol server enabling GPT-4 agents to participate in BZZZ P2P network */ import { Server } from "@modelcontextprotocol/sdk/server/index.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { CallToolRequestSchema, ListToolsRequestSchema } from "@modelcontextprotocol/sdk/types.js"; import { BzzzProtocolTools } from "./tools/protocol-tools.js"; import { AgentManager } from "./agents/agent-manager.js"; import { ConversationManager } from "./conversations/conversation-manager.js"; import { BzzzP2PConnector } from "./p2p/bzzz-connector.js"; import { OpenAIIntegration } from "./ai/openai-integration.js"; import { CostTracker } from "./utils/cost-tracker.js"; import { Logger } from "./utils/logger.js"; import { Config } from "./config/config.js"; class BzzzMcpServer { private server: Server; private protocolTools!: BzzzProtocolTools; private agentManager!: AgentManager; private conversationManager!: ConversationManager; private p2pConnector!: BzzzP2PConnector; private openaiIntegration!: OpenAIIntegration; private costTracker!: CostTracker; private logger: Logger; constructor() { this.logger = new Logger("BzzzMcpServer"); // Initialize server this.server = new Server( { name: "bzzz-mcp-server", version: "1.0.0", }, { capabilities: { tools: {}, resources: {}, }, } ); // Initialize components this.initializeComponents(); this.setupToolHandlers(); this.setupEventHandlers(); } private initializeComponents(): void { const config = Config.getInstance(); // Initialize OpenAI integration this.openaiIntegration = new OpenAIIntegration({ apiKey: config.openai.apiKey, defaultModel: config.openai.defaultModel, maxTokens: config.openai.maxTokens, }); // Initialize cost tracking this.costTracker = new CostTracker({ dailyLimit: config.cost.dailyLimit, monthlyLimit: config.cost.monthlyLimit, warningThreshold: config.cost.warningThreshold, }); // Initialize P2P connector this.p2pConnector = new BzzzP2PConnector({ bzzzNodeUrl: config.bzzz.nodeUrl, networkId: config.bzzz.networkId, }); // Initialize conversation manager this.conversationManager = new ConversationManager({ maxActiveThreads: config.conversation.maxActiveThreads, defaultTimeout: config.conversation.defaultTimeout, escalationRules: config.conversation.escalationRules, }); // Initialize agent manager this.agentManager = new AgentManager({ openaiIntegration: this.openaiIntegration, costTracker: this.costTracker, conversationManager: this.conversationManager, p2pConnector: this.p2pConnector, }); // Initialize protocol tools this.protocolTools = new BzzzProtocolTools({ agentManager: this.agentManager, p2pConnector: this.p2pConnector, conversationManager: this.conversationManager, }); } private setupToolHandlers(): void { // List available tools this.server.setRequestHandler(ListToolsRequestSchema, async () => { return { tools: [ // Protocol tools { name: "bzzz_announce", description: "Announce agent presence and capabilities on the BZZZ network", inputSchema: { type: "object", properties: { agent_id: { type: "string", description: "Unique agent identifier" }, role: { type: "string", description: "Agent role (architect, reviewer, etc.)" }, capabilities: { type: "array", items: { type: "string" }, description: "List of agent capabilities" }, specialization: { type: "string", description: "Agent specialization area" }, max_tasks: { type: "number", default: 3, description: "Maximum concurrent tasks" }, }, required: ["agent_id", "role"], }, }, { name: "bzzz_lookup", description: "Discover agents and resources using semantic addressing", inputSchema: { type: "object", properties: { semantic_address: { type: "string", description: "Format: bzzz://agent:role@project:task/path", }, filter_criteria: { type: "object", properties: { expertise: { type: "array", items: { type: "string" } }, availability: { type: "boolean" }, performance_threshold: { type: "number" }, }, }, }, required: ["semantic_address"], }, }, { name: "bzzz_get", description: "Retrieve content from BZZZ semantic addresses", inputSchema: { type: "object", properties: { address: { type: "string", description: "BZZZ semantic address" }, include_metadata: { type: "boolean", default: true }, max_history: { type: "number", default: 10 }, }, required: ["address"], }, }, { name: "bzzz_post", description: "Post events or messages to BZZZ addresses", inputSchema: { type: "object", properties: { target_address: { type: "string", description: "Target BZZZ address" }, message_type: { type: "string", description: "Type of message" }, content: { type: "object", description: "Message content" }, priority: { type: "string", enum: ["low", "medium", "high", "urgent"], default: "medium" }, thread_id: { type: "string", description: "Optional conversation thread ID" }, }, required: ["target_address", "message_type", "content"], }, }, { name: "bzzz_thread", description: "Manage threaded conversations between agents", inputSchema: { type: "object", properties: { action: { type: "string", enum: ["create", "join", "leave", "list", "summarize"], description: "Thread action to perform" }, thread_id: { type: "string", description: "Thread identifier" }, participants: { type: "array", items: { type: "string" }, description: "List of participant agent IDs" }, topic: { type: "string", description: "Thread topic" }, }, required: ["action"], }, }, { name: "bzzz_subscribe", description: "Subscribe to real-time events from BZZZ network", inputSchema: { type: "object", properties: { event_types: { type: "array", items: { type: "string" }, description: "Types of events to subscribe to" }, filter_address: { type: "string", description: "Optional address filter" }, callback_webhook: { type: "string", description: "Optional webhook URL" }, }, required: ["event_types"], }, }, ], }; }); // Handle tool calls this.server.setRequestHandler(CallToolRequestSchema, async (request) => { const { name, arguments: args } = request.params; try { let result; switch (name) { case "bzzz_announce": result = await this.protocolTools.handleAnnounce(args || {}); break; case "bzzz_lookup": result = await this.protocolTools.handleLookup(args || {}); break; case "bzzz_get": result = await this.protocolTools.handleGet(args || {}); break; case "bzzz_post": result = await this.protocolTools.handlePost(args || {}); break; case "bzzz_thread": result = await this.protocolTools.handleThread(args || {}); break; case "bzzz_subscribe": result = await this.protocolTools.handleSubscribe(args || {}); break; default: throw new Error(`Unknown tool: ${name}`); } return { content: [ { type: "text" as const, text: JSON.stringify(result, null, 2), }, ], }; } catch (error) { this.logger.error(`Tool execution failed for ${name}:`, error); return { content: [ { type: "text" as const, text: `Error: ${error instanceof Error ? error.message : String(error)}`, }, ], isError: true, }; } }); } private setupEventHandlers(): void { // Handle P2P events this.p2pConnector.on("message", (message) => { this.logger.debug("P2P message received:", message); this.conversationManager.handleIncomingMessage(message); }); // Handle conversation events this.conversationManager.on("escalation", (thread, reason) => { this.logger.warn(`Thread ${thread.id} escalated: ${reason}`); this.handleEscalation(thread, reason); }); // Handle cost warnings this.costTracker.on("warning", (usage) => { this.logger.warn("Cost warning:", usage); }); this.costTracker.on("limit_exceeded", (usage) => { this.logger.error("Cost limit exceeded:", usage); // Implement emergency shutdown or throttling }); } private async handleEscalation(thread: any, reason: string): Promise { // Implement human escalation logic this.logger.info(`Escalating thread ${thread.id} to human: ${reason}`); // Could integrate with: // - Slack notifications // - Email alerts // - WHOOSH orchestration system // - N8N workflows } public async start(): Promise { // Connect to BZZZ P2P network await this.p2pConnector.connect(); this.logger.info("Connected to BZZZ P2P network"); // Start conversation manager await this.conversationManager.start(); this.logger.info("Conversation manager started"); // Start agent manager await this.agentManager.start(); this.logger.info("Agent manager started"); // Start MCP server const transport = new StdioServerTransport(); await this.server.connect(transport); this.logger.info("BZZZ MCP Server started and listening"); } public async stop(): Promise { this.logger.info("Shutting down BZZZ MCP Server..."); await this.agentManager.stop(); await this.conversationManager.stop(); await this.p2pConnector.disconnect(); this.logger.info("BZZZ MCP Server stopped"); } } // Start server if run directly if (require.main === module) { const server = new BzzzMcpServer(); process.on("SIGINT", async () => { console.log("Received SIGINT, shutting down gracefully..."); await server.stop(); process.exit(0); }); process.on("SIGTERM", async () => { console.log("Received SIGTERM, shutting down gracefully..."); await server.stop(); process.exit(0); }); server.start().catch((error) => { console.error("Failed to start BZZZ MCP Server:", error); process.exit(1); }); } export { BzzzMcpServer };