10 Commits

Author SHA1 Message Date
anthonyrawlins
e94df4be6b fix(docs): Correct Mermaid syntax with user-provided fixes 2025-07-17 20:21:50 +10:00
anthonyrawlins
786e890808 fix(docs): Correct unterminated link in architecture diagram 2025-07-17 20:19:12 +10:00
Anthony Rawlins
baa26a2aab Update SYSTEM_ARCHITECTURE.md 2025-07-17 20:08:53 +10:00
anthonyrawlins
8934aae6c6 fix(docs): Overwrite diagrams to fix persistent syntax errors 2025-07-17 20:03:54 +10:00
anthonyrawlins
4960f5578f fix(docs): Remove superfluous 'end' from flowchart diagram 2025-07-17 19:44:51 +10:00
anthonyrawlins
4766b6dc19 fix(docs): Correct Mermaid syntax in architecture diagram 2025-07-17 19:42:09 +10:00
anthonyrawlins
3914eafad6 fix(docs): Correct Mermaid syntax in architecture diagram 2025-07-17 15:24:05 +10:00
anthonyrawlins
0eca6c781d docs: Add system architecture and task flow diagrams 2025-07-17 15:21:43 +10:00
anthonyrawlins
6993a7f945 refactor: Parameterize hardcoded values and resolve Integration duality 2025-07-17 15:12:04 +10:00
anthonyrawlins
d1d61c063b Fix critical issues breaking task execution cycle
- Fix branch name validation by hashing peer IDs using SHA256
- Fix Hive API claiming error by using correct 'task_number' parameter
- Improve console app display with 300% wider columns and adaptive width
- Add GitHub CLI integration to sandbox with token authentication
- Enhance system prompt with collaboration guidelines and help escalation
- Fix sandbox lifecycle to preserve work even if PR creation fails

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-14 22:06:50 +10:00
27 changed files with 586 additions and 3620 deletions

View File

@@ -1,192 +0,0 @@
# Project Bzzz: Decentralized Task Execution Network - Development Plan
## 1. Overview & Vision
This document outlines the development plan for **Project Bzzz**, a decentralized task execution network designed to enhance the existing **Hive Cluster**.
The vision is to evolve from a centrally coordinated system to a resilient, peer-to-peer (P2P) mesh of autonomous agents. This architecture eliminates single points of failure, improves scalability, and allows for dynamic, collaborative task resolution. Bzzz will complement the existing N8N orchestration layer, acting as a powerful, self-organizing execution fabric.
---
## 2. Core Architecture
The system is built on three key pillars: decentralized networking, GitHub-native task management, and verifiable, distributed logging.
| Component | Technology | Purpose |
| :--- | :--- | :--- |
| **Networking** | **libp2p** | For peer discovery (mDNS, DHT), identity, and secure P2P communication. |
| **Task Management** | **GitHub Issues** | The single source of truth for task definition, allocation, and tracking. |
| **Messaging** | **libp2p Pub/Sub** | For broadcasting capabilities and coordinating collaborative help requests. |
| **Logging** | **Hypercore Protocol** | For creating a tamper-proof, decentralized, and replicable logging system for debugging. |
---
## 3. Architectural Refinements & Key Features
Based on our analysis, the following refinements will be adopted:
### 3.1. Task Allocation via GitHub Assignment
To prevent race conditions and simplify logic, we will use GitHub's native issue assignment mechanism as an atomic lock. The `task_claim` pub/sub topic is no longer needed.
**Workflow:**
1. A `bzzz-agent` discovers a new, *unassigned* issue in the target repository.
2. The agent immediately attempts to **assign itself** to the issue via the GitHub API.
3. **Success:** If the assignment succeeds, the agent has exclusive ownership of the task and begins execution.
4. **Failure:** If the assignment fails (because another agent was faster), the agent logs the contention and looks for another task.
### 3.2. Collaborative Task Execution with Hop Limit
The `task_help_request` feature enables agents to collaborate on complex tasks. To prevent infinite request loops and network flooding, we will implement a **hop limit**.
- **Hop Limit:** A `task_help_request` will be discarded after being forwarded **3 times**.
- If a task cannot be completed after 3 help requests, it will be marked as "failed," and a comment will be added to the GitHub issue for manual review.
### 3.3. Decentralized Logging with Hypercore
To solve the challenge of debugging a distributed system, each agent will manage its own secure, append-only log stream using the Hypercore Protocol.
- **Log Creation:** Each agent generates a `hypercore` and broadcasts its public key via the `capabilities` message.
- **Log Replication:** Any other agent (or a dedicated monitoring node) can use this key to replicate the log stream in real-time or after the fact.
- **Benefits:** This creates a verifiable and resilient audit trail for every agent's actions, which is invaluable for debugging without relying on a centralized logging server.
---
## 4. Integration with the Hive Ecosystem
Bzzz is designed to integrate seamlessly with the existing cluster infrastructure.
### 4.1. Deployment Strategy: Docker + Host Networking (PREFERRED APPROACH)
Based on comprehensive analysis of the existing Hive infrastructure and Bzzz's P2P requirements, we will use a **hybrid deployment approach** that combines Docker containerization with host networking:
```yaml
# Docker Compose configuration for bzzz-agent
services:
bzzz-agent:
image: registry.home.deepblack.cloud/tony/bzzz-agent:latest
network_mode: "host" # Direct host network access for P2P
volumes:
- ./data:/app/data
- /var/run/docker.sock:/var/run/docker.sock # Docker API access
environment:
- NODE_ID=${HOSTNAME}
- GITHUB_TOKEN_FILE=/run/secrets/github-token
secrets:
- github-token
restart: unless-stopped
deploy:
placement:
constraints:
- node.role == worker # Deploy on all worker nodes
```
**Rationale for Docker + Host Networking:**
-**P2P Networking Advantages**: Direct access to host networking enables efficient mDNS discovery, NAT traversal, and lower latency communication
-**Infrastructure Consistency**: Maintains Docker Swarm deployment patterns and existing operational procedures
-**Resource Efficiency**: Eliminates Docker overlay network overhead for P2P communication
-**Best of Both Worlds**: Container portability and management with native network performance
### 4.2. Cluster Integration Points
- **Phased Rollout:** Deploy `bzzz-agent` containers across all cluster nodes (ACACIA, WALNUT, IRONWOOD, ROSEWOOD, FORSTEINET) using Docker Swarm
- **Network Architecture**: Leverages existing 192.168.1.0/24 LAN for P2P mesh communication
- **Resource Coordination**: Agents discover and utilize existing Ollama endpoints (port 11434) and CLI tools
- **Storage Integration**: Uses NFS shares (/rust/containers/) for shared configuration and Hypercore log storage
### 4.3. Integration with Existing Services
- **N8N as a Task Initiator:** High-level workflows in N8N will now terminate by creating a detailed GitHub Issue. This action triggers the Bzzz mesh, which handles the execution and reports back by creating a Pull Request.
- **Hive Coexistence**: Bzzz will run alongside existing Hive services on different ports, allowing gradual migration of workloads
- **The "Mesh Visualizer":** A dedicated monitoring dashboard will be created. It will:
1. Subscribe to the `capabilities` pub/sub topic to visualize the live network topology.
2. Replicate and display the Hypercore log streams from all active agents.
3. Integrate with existing Grafana dashboards for unified monitoring
---
## 5. Security Strategy
- **GitHub Token Management:** Agents will use short-lived, fine-grained Personal Access Tokens. These tokens will be stored securely in **HashiCorp Vault** or a similar secrets management tool, and retrieved by the agent at runtime.
- **Network Security:** All peer-to-peer communication is automatically **encrypted end-to-end** by `libp2p`.
---
## 6. Recommended Tech Stack
| Category | Recommendation | Notes |
| :--- | :--- | :--- |
| **Language** | **Go** or **Rust** | Strongly recommended for performance, concurrency, and system-level programming. |
| **Networking** | `go-libp2p` / `rust-libp2p` | The official and most mature implementations. |
| **Logging** | `hypercore-go` / `hypercore-rs` | Libraries for implementing the Hypercore Protocol. |
| **GitHub API** | `go-github` / `octokit.rs` | Official and community-maintained clients for interacting with GitHub. |
---
## 7. Development Milestones
This 8-week plan incorporates the refined architecture.
| Week | Deliverables | Key Features |
| :--- | :--- | :--- |
| **1** | **P2P Foundation & Logging** | Setup libp2p peer discovery and establish a **Hypercore log stream** for each agent. |
| **2** | **Capability Broadcasting** | Implement `capability_detector` and broadcast agent status via pub/sub. |
| **3** | **GitHub Task Claiming** | Ingest issues from GitHub and implement the **assignment-based task claiming** logic. |
| **4** | **Core Task Execution** | Integrate local CLIs (Ollama, etc.) to perform basic tasks based on issue content. |
| **5** | **GitHub Result Workflow** | Implement logic to create Pull Requests or follow-up issues upon task completion. |
| **6** | **Collaborative Help** | Implement the `task_help_request` and `task_help_response` flow with the **hop limit**. |
| **7** | **Monitoring & Visualization** | Build the first version of the **Mesh Visualizer** dashboard to display agent status and logs. |
| **8** | **Deployment & Testing** | Package the agent as a Docker container with host networking, write Docker Swarm deployment guide, and conduct end-to-end testing across cluster nodes. |
---
## 8. Potential Risks & Mitigation
- **Network Partitions ("Split-Brain"):**
- **Risk:** A network partition could lead to two separate meshes trying to work on the same task.
- **Mitigation:** Using GitHub's issue assignment as the atomic lock effectively solves this. The first agent to successfully claim the issue wins, regardless of network state.
- **Dependency on GitHub:**
- **Risk:** The system's ability to acquire new tasks is dependent on the availability of the GitHub API.
- **Mitigation:** This is an accepted trade-off for gaining a robust, native task management platform. Agents can be designed to continue working on already-claimed tasks during a GitHub outage.
- **Debugging Complexity:**
- **Risk:** Debugging distributed systems remains challenging.
- **Mitigation:** The Hypercore-based logging system provides a powerful and verifiable audit trail, which is a significant step towards mitigating this complexity. The Mesh Visualizer will also be a critical tool.
- **Docker Host Networking Security:**
- **Risk:** Host networking mode exposes containers directly to the host network, reducing isolation.
- **Mitigation:**
- Implement strict firewall rules on each node
- Use libp2p's built-in encryption for all P2P communication
- Run containers with restricted user privileges (non-root)
- Regular security audits of exposed ports and services
---
## 9. Migration Strategy from Hive
### 9.1. Gradual Transition Plan
1. **Phase 1: Parallel Deployment** (Weeks 1-2)
- Deploy Bzzz agents alongside existing Hive infrastructure
- Use different port ranges to avoid conflicts
- Monitor resource usage and network performance
2. **Phase 2: Simple Task Migration** (Weeks 3-4)
- Route basic code generation tasks through GitHub issues → Bzzz
- Keep complex multi-agent workflows in existing Hive + n8n
- Compare performance metrics between systems
3. **Phase 3: Workflow Integration** (Weeks 5-6)
- Modify n8n workflows to create GitHub issues as final step
- Implement Bzzz → Hive result reporting for hybrid workflows
- Test end-to-end task lifecycle
4. **Phase 4: Full Migration** (Weeks 7-8)
- Migrate majority of workloads to Bzzz mesh
- Retain Hive for monitoring and dashboard functionality
- Plan eventual deprecation of centralized coordinator
### 9.2. Compatibility Layer
- **API Bridge**: Maintain existing Hive API endpoints that proxy to Bzzz mesh
- **Data Migration**: Export task history and agent configurations from PostgreSQL
- **Monitoring Continuity**: Integrate Bzzz metrics into existing Grafana dashboards

View File

@@ -7,6 +7,15 @@ RUN apt-get update && apt-get install -y \
git \
curl \
tree \
wget \
&& rm -rf /var/lib/apt/lists/*
# Install GitHub CLI
RUN curl -fsSL https://cli.github.com/packages/githubcli-archive-keyring.gpg | dd of=/usr/share/keyrings/githubcli-archive-keyring.gpg \
&& chmod go+r /usr/share/keyrings/githubcli-archive-keyring.gpg \
&& echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/githubcli-archive-keyring.gpg] https://cli.github.com/packages stable main" | tee /etc/apt/sources.list.d/github-cli.list > /dev/null \
&& apt-get update \
&& apt-get install -y gh \
&& rm -rf /var/lib/apt/lists/*
# Create a non-root user for the agent to run as

View File

@@ -1,232 +0,0 @@
# 🐝 Bzzz-Hive Integration TODOs
**Updated**: January 13, 2025
**Context**: Dynamic Multi-Repository Task Discovery via Hive API
---
## 🎯 **HIGH PRIORITY: Dynamic Repository Management**
### **1. Hive API Client Integration**
- [ ] **Create Hive API client**
```go
// pkg/hive/client.go
type HiveClient struct {
BaseURL string
APIKey string
HTTPClient *http.Client
}
func (c *HiveClient) GetActiveRepositories() ([]Repository, error)
func (c *HiveClient) GetProjectTasks(projectID int) ([]Task, error)
func (c *HiveClient) ClaimTask(projectID, taskID int, agentID string) error
func (c *HiveClient) UpdateTaskStatus(projectID, taskID int, status string) error
```
### **2. Configuration Management**
- [ ] **Remove hardcoded repository configuration**
- [ ] Remove repository settings from main.go
- [ ] Create Hive API endpoint configuration
- [ ] Add API authentication configuration
- [ ] Support multiple simultaneous repository polling
- [ ] **Environment/Config file support**
```go
type Config struct {
HiveAPI struct {
BaseURL string `yaml:"base_url"`
APIKey string `yaml:"api_key"`
} `yaml:"hive_api"`
Agent struct {
ID string `yaml:"id"`
Capabilities []string `yaml:"capabilities"`
PollInterval string `yaml:"poll_interval"`
} `yaml:"agent"`
}
```
### **3. Multi-Repository Task Coordination**
- [ ] **Enhance GitHub Integration service**
```go
// github/integration.go modifications
type Integration struct {
hiveClient *hive.HiveClient
repositories map[int]*RepositoryClient // projectID -> GitHub client
// ... existing fields
}
func (i *Integration) pollHiveForRepositories() error
func (i *Integration) syncRepositoryClients() error
func (i *Integration) aggregateTasksFromAllRepos() ([]*Task, error)
```
---
## 🔧 **MEDIUM PRIORITY: Enhanced Task Management**
### **4. Repository-Aware Task Processing**
- [ ] **Extend Task structure**
```go
type Task struct {
// Existing fields...
ProjectID int `json:"project_id"`
ProjectName string `json:"project_name"`
GitURL string `json:"git_url"`
Owner string `json:"owner"`
Repository string `json:"repository"`
Branch string `json:"branch"`
}
```
### **5. Intelligent Task Routing**
- [ ] **Project-aware task filtering**
- [ ] Filter tasks by agent capabilities per project
- [ ] Consider project-specific requirements
- [ ] Implement project priority weighting
- [ ] Add load balancing across projects
### **6. Cross-Repository Coordination**
- [ ] **Enhanced meta-discussion for multi-project coordination**
```go
type ProjectContext struct {
ProjectID int
GitURL string
TaskCount int
ActiveAgents []string
}
func (i *Integration) announceProjectStatus(ctx ProjectContext) error
func (i *Integration) coordinateAcrossProjects() error
```
---
## 🚀 **LOW PRIORITY: Advanced Features**
### **7. Project-Specific Configuration**
- [ ] **Per-project agent specialization**
- [ ] Different capabilities per project type
- [ ] Project-specific model preferences
- [ ] Custom escalation rules per project
- [ ] Project-aware conversation limits
### **8. Enhanced Monitoring & Metrics**
- [ ] **Multi-project performance tracking**
- [ ] Tasks completed per project
- [ ] Agent efficiency across projects
- [ ] Cross-project collaboration metrics
- [ ] Project-specific escalation rates
### **9. Advanced Task Coordination**
- [ ] **Cross-project dependencies**
- [ ] Detect related tasks across repositories
- [ ] Coordinate dependent task execution
- [ ] Share knowledge between project contexts
- [ ] Manage resource allocation across projects
---
## 📋 **IMPLEMENTATION PLAN**
### **Phase 1: Core Hive Integration (Week 1)**
1. **Day 1-2**: Create Hive API client and configuration management
2. **Day 3-4**: Modify GitHub integration to use dynamic repositories
3. **Day 5**: Test with single active project from Hive
4. **Day 6-7**: Multi-repository polling and task aggregation
### **Phase 2: Enhanced Coordination (Week 2)**
1. **Day 1-3**: Repository-aware task processing and routing
2. **Day 4-5**: Cross-repository meta-discussion enhancements
3. **Day 6-7**: Project-specific escalation and configuration
### **Phase 3: Advanced Features (Week 3)**
1. **Day 1-3**: Performance monitoring and metrics
2. **Day 4-5**: Cross-project dependency management
3. **Day 6-7**: Production testing and optimization
---
## 🔧 **CODE STRUCTURE CHANGES**
### **New Files to Create:**
```
pkg/
├── hive/
│ ├── client.go # Hive API client
│ ├── models.go # Hive data structures
│ └── config.go # Hive configuration
├── config/
│ ├── config.go # Configuration management
│ └── defaults.go # Default configuration
└── repository/
├── manager.go # Multi-repository management
├── router.go # Task routing logic
└── coordinator.go # Cross-repository coordination
```
### **Files to Modify:**
```
main.go # Remove hardcoded repo config
github/integration.go # Add Hive client integration
github/client.go # Support multiple repository configs
pubsub/pubsub.go # Enhanced project context messaging
```
---
## 📊 **TESTING STRATEGY**
### **Unit Tests**
- [ ] Hive API client functionality
- [ ] Multi-repository configuration loading
- [ ] Task aggregation and routing logic
- [ ] Project-aware filtering algorithms
### **Integration Tests**
- [ ] End-to-end Hive API communication
- [ ] Multi-repository GitHub integration
- [ ] Cross-project task coordination
- [ ] P2P coordination with project context
### **System Tests**
- [ ] Full workflow: Hive project activation → Bzzz task discovery → coordination
- [ ] Performance under multiple active projects
- [ ] Failure scenarios (Hive API down, GitHub rate limits)
- [ ] Escalation workflows across different projects
---
## ✅ **SUCCESS CRITERIA**
### **Phase 1 Complete When:**
- [ ] Bzzz agents query Hive API for active repositories
- [ ] Agents can discover tasks from multiple GitHub repositories
- [ ] Task claims are reported back to Hive system
- [ ] Configuration is fully dynamic (no hardcoded repositories)
### **Phase 2 Complete When:**
- [ ] Agents coordinate effectively across multiple projects
- [ ] Task routing considers project-specific requirements
- [ ] Meta-discussions include project context
- [ ] Performance metrics track multi-project activity
### **Full Integration Complete When:**
- [ ] System scales to 10+ active projects simultaneously
- [ ] Cross-project coordination is seamless
- [ ] Escalation workflows are project-aware
- [ ] Analytics provide comprehensive project insights
---
## 🔧 **IMMEDIATE NEXT STEPS**
1. **Create Hive API client** (`pkg/hive/client.go`)
2. **Implement configuration management** (`pkg/config/config.go`)
3. **Modify main.go** to use dynamic repository discovery
4. **Test with single Hive project** to validate integration
5. **Extend to multiple repositories** once basic flow works
---
**Next Action**: Implement Hive API client and remove hardcoded repository configuration from main.go.

View File

@@ -1,138 +0,0 @@
# Bzzz P2P Coordination System - Progress Report
## Overview
This report documents the implementation and testing progress of the Bzzz P2P mesh coordination system with meta-thinking capabilities (Antennae framework).
## Major Accomplishments
### 1. High-Priority Feature Implementation ✅
- **Fixed stub function implementations** in `github/integration.go`
- Implemented proper task filtering based on agent capabilities
- Added task announcement logic for P2P coordination
- Enhanced capability-based task matching with keyword analysis
- **Completed Hive API client integration**
- Extended PostgreSQL database schema for bzzz integration
- Updated ProjectService to use database instead of filesystem scanning
- Implemented secure Docker secrets for GitHub token access
- **Removed hardcoded repository configuration**
- Dynamic repository discovery via Hive API
- Database-driven project management
### 2. Security Enhancements ✅
- **Docker Secrets Implementation**
- Replaced filesystem-based GitHub token access with Docker secrets
- Updated docker-compose.swarm.yml with proper secrets configuration
- Enhanced security posture for credential management
### 3. Database Integration ✅
- **Extended Hive Database Schema**
- Added bzzz-specific fields to projects table
- Inserted Hive repository as test project with 9 bzzz-task labeled issues
- Successful GitHub API integration showing real issue discovery
### 4. Independent Testing Infrastructure ✅
- **Mock Hive API Server** (`mock-hive-server.py`)
- Provides fake projects and tasks for real bzzz coordination
- Comprehensive task simulation with realistic coordination scenarios
- Background task generation for dynamic testing
- Enhanced with work capture endpoints:
- `/api/bzzz/projects/<id>/submit-work` - Capture actual agent work/code
- `/api/bzzz/projects/<id>/create-pr` - Capture pull request content
- `/api/bzzz/projects/<id>/coordination-discussion` - Log coordination discussions
- `/api/bzzz/projects/<id>/log-prompt` - Log agent prompts and model usage
- **Real-Time Monitoring Dashboard** (`cmd/bzzz-monitor.py`)
- btop/nvtop-style console interface for coordination monitoring
- Real coordination channel metrics and message rate tracking
- Compact timestamp display and efficient space utilization
- Live agent activity and P2P network status monitoring
### 5. P2P Network Verification ✅
- **Confirmed Multi-Node Operation**
- WALNUT, ACACIA, IRONWOOD nodes running as systemd services
- 2 connected peers with regular availability broadcasts
- P2P mesh discovery and communication functioning correctly
### 6. Cross-Repository Coordination Framework ✅
- **Antennae Meta-Discussion System**
- Advanced cross-repository coordination capabilities
- Dependency detection and conflict resolution
- AI-powered coordination plan generation
- Consensus detection algorithms
## Current System Status
### Working Components
1. ✅ P2P mesh networking (libp2p + mDNS)
2. ✅ Agent availability broadcasting
3. ✅ Database-driven repository discovery
4. ✅ Secure credential management
5. ✅ Real-time monitoring infrastructure
6. ✅ Mock API testing framework
7. ✅ Work capture endpoints (ready for use)
### Identified Issues
1.**GitHub Repository Verification Failures**
- Mock repositories (e.g., `mock-org/hive`) return 404 errors
- Prevents agents from proceeding with task discovery
- Need local Git hosting solution
2.**Task Claim Logic Incomplete**
- Agents broadcast availability but don't actively claim tasks
- Missing integration between P2P discovery and task claiming
- Need to enhance bzzz binary task claim workflow
3.**Docker Overlay Network Issues**
- Some connectivity issues between services
- May impact agent coordination in containerized environments
## File Locations and Key Components
### Core Implementation Files
- `/home/tony/AI/projects/Bzzz/github/integration.go` - Enhanced task filtering and P2P coordination
- `/home/tony/AI/projects/hive/backend/app/services/project_service.py` - Database-driven project service
- `/home/tony/AI/projects/hive/docker-compose.swarm.yml` - Docker secrets configuration
### Testing and Monitoring
- `/home/tony/AI/projects/Bzzz/mock-hive-server.py` - Mock API with work capture
- `/home/tony/AI/projects/Bzzz/cmd/bzzz-monitor.py` - Real-time coordination dashboard
- `/home/tony/AI/projects/Bzzz/scripts/trigger_mock_coordination.sh` - Coordination test script
### Configuration
- `/etc/systemd/system/bzzz.service.d/mock-api.conf` - Systemd override for mock API testing
- `/tmp/bzzz_agent_work/` - Directory for captured agent work (when functioning)
- `/tmp/bzzz_pull_requests/` - Directory for captured pull requests
- `/tmp/bzzz_agent_prompts/` - Directory for captured agent prompts and model usage
## Technical Achievements
### Database Schema Extensions
```sql
-- Extended projects table with bzzz integration fields
ALTER TABLE projects ADD COLUMN bzzz_enabled BOOLEAN DEFAULT false;
ALTER TABLE projects ADD COLUMN ready_to_claim BOOLEAN DEFAULT false;
ALTER TABLE projects ADD COLUMN private_repo BOOLEAN DEFAULT false;
ALTER TABLE projects ADD COLUMN github_token_required BOOLEAN DEFAULT false;
```
### Docker Secrets Integration
```yaml
secrets:
- github_token
environment:
- GITHUB_TOKEN_FILE=/run/secrets/github_token
```
### P2P Network Statistics
- **Active Nodes**: 3 (WALNUT, ACACIA, IRONWOOD)
- **Connected Peers**: 2 per node
- **Network Protocol**: libp2p with mDNS discovery
- **Message Broadcasting**: Availability, capability, coordination
## Next Steps Required
See PROJECT_TODOS.md for comprehensive task list.
## Summary
The Bzzz P2P coordination system has a solid foundation with working P2P networking, database integration, secure credential management, and comprehensive testing infrastructure. The main blockers are the need for a local Git hosting solution and completion of the task claim logic in the bzzz binary.

View File

@@ -1,224 +0,0 @@
🐝 Project: Bzzz — P2P Task Coordination System
## 🔧 Architecture Overview (libp2p + pubsub + JSON)
This system will compliment and partially replace elements of the Hive Software System. This is intended to be a replacement for the multitude of MCP, and API calls to the ollama and gemini-cli agents over port 11434 etc. By replacing the master/slave paradigm with a mesh network we allow each node to trigger workflows or respond to calls for work as availability dictates rather than being stuck in endless timeouts awaiting responses. We also eliminate the central coordinator as a single point of failure.
### 📂 Components
#### 1. **Peer Node**
Each machine runs a P2P agent that:
- Connects to other peers via libp2p
- Subscribes to pubsub topics
- Periodically broadcasts status/capabilities
- Receives and executes tasks
- Publishes task results as GitHub pull requests or issues
- Can request assistance from other peers
- Monitors a GitHub repository for new issues (task source)
Each node uses a dedicated GitHub account with:
- A personal access token (fine-scoped to repo/PRs)
- A configured `.gitconfig` for commit identity
#### 2. **libp2p Network**
- All peers discover each other using mDNS, Bootstrap peers, or DHT
- Peer identity is cryptographic (libp2p peer ID)
- Communication is encrypted end-to-end
#### 3. **GitHub Integration**
- Tasks are sourced from GitHub Issues in a designated repository
- Nodes will claim and respond to tasks by:
- Forking the repository (once)
- Creating a working branch
- Making changes to files as instructed by task input
- Committing changes using their GitHub identity
- Creating a pull request or additional GitHub issues
- Publishing final result as a PR, issue(s), or failure report
#### 4. **PubSub Topics**
| Topic | Direction | Purpose |
|------------------|------------------|---------------------------------------------|
| `capabilities` | Peer → All Peers | Broadcast available models, status |
| `task_broadcast` | Peer → All Peers | Publish a GitHub issue as task |
| `task_claim` | Peer → All Peers | Claim responsibility for a task |
| `task_result` | Peer → All Peers | Share PR, issue, or failure result |
| `presence_ping` | Peer → All Peers | Lightweight presence signal |
| `task_help_request` | Peer → All Peers | Request assistance for a task |
| `task_help_response`| Peer → All Peers | Offer help or handle sub-task |
### 📊 Data Flow Diagram
```
+------------------+ libp2p +------------------+
| Peer A |<------------------->| Peer B |
| |<------------------->| |
| - Publishes: | | - Publishes: |
| capabilities | | task_result |
| task_broadcast | | capabilities |
| help_request | | help_response |
| - Subscribes to: | | - Subscribes to: |
| task_result | | task_broadcast |
| help_request | | help_request |
+------------------+ +------------------+
^ ^
| |
| |
+----------------------+-----------------+
|
v
+------------------+
| Peer C |
+------------------+
```
### 📂 Sample JSON Messages
#### `capabilities`
```json
{
"type": "capabilities",
"node_id": "pi-node-1",
"cpu": 43.5,
"gpu": 2.3,
"models": ["llama3", "mistral"],
"installed": ["ollama", "gemini-cli"],
"status": "idle",
"timestamp": "2025-07-12T01:23:45Z"
}
```
#### `task_broadcast`
```json
{
"type": "task",
"task_id": "#42",
"repo": "example-org/task-repo",
"issue_url": "https://github.com/example-org/task-repo/issues/42",
"model": "ollama",
"input": "Add unit tests to utils module",
"params": {"branch_prefix": "task-42-"},
"timestamp": "2025-07-12T02:00:00Z"
}
```
#### `task_claim`
```json
{
"type": "task_claim",
"task_id": "#42",
"node_id": "pi-node-2",
"timestamp": "2025-07-12T02:00:03Z"
}
```
#### `task_result`
```json
{
"type": "task_result",
"task_id": "#42",
"node_id": "pi-node-2",
"result_type": "pull_request",
"result_url": "https://github.com/example-org/task-repo/pull/97",
"duration_ms": 15830,
"timestamp": "2025-07-12T02:10:05Z"
}
```
#### `task_help_request`
```json
{
"type": "task_help_request",
"task_id": "#42",
"from_node": "pi-node-2",
"reason": "Long-running task or missing capability",
"requested_capability": "claude-cli",
"timestamp": "2025-07-12T02:05:00Z"
}
```
#### `task_help_response`
```json
{
"type": "task_help_response",
"task_id": "#42",
"from_node": "pi-node-3",
"can_help": true,
"capabilities": ["claude-cli"],
"eta_seconds": 30,
"timestamp": "2025-07-12T02:05:02Z"
}
```
---
## 🚀 Development Brief
### 🧱 Tech Stack
- **Language**: Node.js (or Go/Rust)
- **Networking**: libp2p
- **Messaging**: pubsub with JSON
- **Task Execution**: Local CLI (ollama, gemini, claude)
- **System Monitoring**: `os-utils`, `psutil`, `nvidia-smi`
- **Runtime**: systemd services on Linux
- **GitHub Interaction**: `octokit` (Node), Git CLI
### 🛠 Key Modules
#### 1. `peer_agent.js`
- Initializes libp2p node
- Joins pubsub topics
- Periodically publishes capabilities
- Listens for tasks, runs them, and reports PR/results
- Handles help requests and responses
#### 2. `capability_detector.js`
- Detects:
- CPU/GPU load
- Installed models (via `ollama list`)
- Installed CLIs (`which gemini`, `which claude`)
#### 3. `task_executor.js`
- Parses GitHub issue input
- Forks repo (if needed)
- Creates working branch, applies changes
- Commits changes using local Git identity
- Pushes branch and creates pull request or follow-up issues
#### 4. `github_bot.js`
- Authenticates GitHub API client
- Watches for new issues in repo
- Publishes them as `task_broadcast`
- Handles PR/issue creation and error handling
#### 5. `state_manager.js`
- Keeps internal view of network state
- Tracks peers capabilities, liveness
- Matches help requests to eligible peers
### 📆 Milestones
| Week | Deliverables |
| ---- | ------------------------------------------------------------ |
| 1 | libp2p peer bootstrapping + pubsub skeleton |
| 2 | JSON messaging spec + capability broadcasting |
| 3 | GitHub issue ingestion + task broadcast |
| 4 | CLI integration with Ollama/Gemini/Claude |
| 5 | GitHub PR/issue/failure workflows |
| 6 | Help request/response logic, delegation framework |
| 7 | systemd setup, CLI utilities, and resilience |
| 8 | End-to-end testing, GitHub org coordination, deployment guide|
---
Would you like a prototype `task_help_request` matchmaking function or sample test matrix for capability validation?

View File

@@ -1,165 +0,0 @@
# Bzzz Antennae Monitoring Dashboard
A real-time console monitoring dashboard for the Bzzz P2P coordination system, similar to btop/nvtop for system monitoring.
## Features
🔍 **Real-time P2P Status**
- Connected peer count with history graph
- Node ID and network status
- Hive API connectivity status
🤖 **Agent Activity Monitoring**
- Live agent availability updates
- Agent status distribution (ready/working/busy)
- Recent activity tracking
🎯 **Coordination Activity**
- Task announcements and completions
- Coordination session tracking
- Message flow statistics
📊 **Visual Elements**
- ASCII graphs for historical data
- Color-coded status indicators
- Live activity log with timestamps
## Usage
### Basic Usage
```bash
# Run with default 1-second refresh rate
python3 cmd/bzzz-monitor.py
# Custom refresh rate (2 seconds)
python3 cmd/bzzz-monitor.py --refresh-rate 2.0
# Disable colors for logging/screenshots
python3 cmd/bzzz-monitor.py --no-color
```
### Installation as System Command
```bash
# Copy to system bin
sudo cp cmd/bzzz-monitor.py /usr/local/bin/bzzz-monitor
sudo chmod +x /usr/local/bin/bzzz-monitor
# Now run from anywhere
bzzz-monitor
```
## Dashboard Layout
```
┌─ Bzzz P2P Coordination Monitor ─┐
│ Uptime: 0:02:15 │ Node: 12*SEE3To... │
└───────────────────────────────────┘
P2P Network Status
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Connected Peers: 2
Hive API Status: Offline (Overlay Network Issues)
Peer History (last 20 samples):
███▇▆▆▇████▇▆▇███▇▆▇ (1-3 peers)
Agent Activity
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Recent Updates (1m): 8
Ready: 6
Working: 2
Coordination Activity
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Total Messages: 45
Total Tasks: 12
Active Sessions: 1
Recent Tasks (5m): 8
Recent Activity
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
11:10:35 [AVAIL] Agent acacia-node... status: ready
11:10:33 [TASK] Task announcement: hive#15 - WebSocket support
11:10:30 [COORD] Meta-coordination session started
11:10:28 [AVAIL] Agent ironwood-node... status: working
11:10:25 [ERROR] Failed to get active repositories: API 404
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Press Ctrl+C to exit | Refresh rate: 1.0s
```
## Monitoring Data Sources
The dashboard pulls data from:
1. **Systemd Service Logs**: `journalctl -u bzzz.service`
2. **P2P Network Status**: Extracted from bzzz log messages
3. **Agent Availability**: Parsed from availability_broadcast messages
4. **Task Activity**: Detected from task/repository-related log entries
5. **Error Tracking**: Monitors for failures and connection issues
## Color Coding
- 🟢 **Green**: Good status, active connections, ready agents
- 🟡 **Yellow**: Working status, moderate activity
- 🔴 **Red**: Errors, failed connections, busy agents
- 🔵 **Blue**: Information, neutral data
- 🟣 **Magenta**: Coordination-specific activity
- 🔷 **Cyan**: Network and P2P data
## Real-time Updates
The dashboard updates every 1-2 seconds by default and tracks:
- **P2P Connections**: Shows immediate peer join/leave events
- **Agent Status**: Real-time availability broadcasts from all nodes
- **Task Flow**: Live task announcements and coordination activity
- **System Health**: Continuous monitoring of service status and errors
## Performance
- **Low Resource Usage**: Python-based with minimal CPU/memory impact
- **Efficient Parsing**: Only processes recent logs (last 30-50 lines)
- **Responsive UI**: Fast refresh rates without overwhelming the terminal
- **Historical Data**: Maintains rolling buffers for trend analysis
## Troubleshooting
### No Data Appearing
```bash
# Check if bzzz service is running
systemctl status bzzz.service
# Verify log access permissions
journalctl -u bzzz.service --since "1 minute ago"
```
### High CPU Usage
```bash
# Reduce refresh rate
bzzz-monitor --refresh-rate 5.0
```
### Color Issues
```bash
# Disable colors
bzzz-monitor --no-color
# Check terminal color support
echo $TERM
```
## Integration
The monitor works alongside:
- **Live Bzzz System**: Monitors real P2P mesh (WALNUT/ACACIA/IRONWOOD)
- **Test Suite**: Can monitor test coordination scenarios
- **Development**: Perfect for debugging antennae coordination logic
## Future Enhancements
- 📈 Export metrics to CSV/JSON
- 🔔 Alert system for critical events
- 📊 Web-based dashboard version
- 🎯 Coordination session drill-down
- 📱 Mobile-friendly output

View File

@@ -1,112 +0,0 @@
# Bzzz + Antennae Development Task Backlog
Based on the UNIFIED_DEVELOPMENT_PLAN.md, here are the development tasks ready for distribution to the Hive cluster:
## Week 1-2: Foundation Tasks
### Task 1: P2P Networking Foundation 🔧
**Assigned to**: WALNUT (Advanced Coding - starcoder2:15b)
**Priority**: 5 (Critical)
**Objective**: Design and implement core P2P networking foundation for Project Bzzz using libp2p in Go
**Requirements**:
- Use go-libp2p library for mesh networking
- Implement mDNS peer discovery for local network (192.168.1.0/24)
- Create secure encrypted P2P connections with peer identity
- Design pub/sub topics for both task coordination (Bzzz) and meta-discussion (Antennae)
- Prepare for Docker + host networking deployment
- Create modular Go code structure in `/home/tony/AI/projects/Bzzz/`
**Deliverables**:
- `main.go` - Entry point and peer initialization
- `p2p/` - P2P networking module with libp2p integration
- `discovery/` - mDNS peer discovery implementation
- `pubsub/` - Pub/sub messaging for capability broadcasting
- `go.mod` - Go module definition with dependencies
- `Dockerfile` - Container with host networking support
### Task 2: Distributed Logging System 📊
**Assigned to**: IRONWOOD (Reasoning Analysis - phi4:14b)
**Priority**: 4 (High)
**Dependencies**: Task 1 (P2P Foundation)
**Objective**: Architect and implement Hypercore-based distributed logging system
**Requirements**:
- Design append-only log streams using Hypercore Protocol
- Implement public key broadcasting for log identity
- Create log replication capabilities between peers
- Store both execution logs (Bzzz) and discussion transcripts (Antennae)
- Ensure tamper-proof audit trails for debugging
- Integrate with P2P capability detection module
**Deliverables**:
- `logging/` - Hypercore-based logging module
- `replication/` - Log replication and synchronization
- `audit/` - Tamper-proof audit trail verification
- Documentation on log schema and replication protocol
### Task 3: GitHub Integration Module 📋
**Assigned to**: ACACIA (Code Review/Docs - codellama)
**Priority**: 4 (High)
**Dependencies**: Task 1 (P2P Foundation)
**Objective**: Implement GitHub integration for atomic task claiming and collaborative workflows
**Requirements**:
- Create atomic issue assignment mechanism (GitHub's native assignment)
- Implement repository forking, branch creation, and commit workflows
- Generate pull requests with discussion transcript links
- Handle task result posting and failure reporting
- Use GitHub API for all interactions
- Include comprehensive error handling and retry logic
**Deliverables**:
- `github/` - GitHub API integration module
- `workflows/` - Repository and branch management
- `tasks/` - Task claiming and result posting
- Integration tests with GitHub API
- Documentation on GitHub workflow process
## Week 3-4: Integration Tasks
### Task 4: Meta-Discussion Implementation 💬
**Assigned to**: IRONWOOD (Reasoning Analysis)
**Priority**: 3 (Medium)
**Dependencies**: Task 1, Task 2
**Objective**: Implement Antennae meta-discussion layer for collaborative reasoning
**Requirements**:
- Create structured messaging for agent collaboration
- Implement "propose plan" and "objection period" logic
- Add hop limits (3 hops) and participant caps for safety
- Design escalation paths to human intervention
- Integrate with Hypercore logging for discussion transcripts
### Task 5: End-to-End Integration 🔄
**Assigned to**: WALNUT (Advanced Coding)
**Priority**: 2 (Normal)
**Dependencies**: All previous tasks
**Objective**: Integrate all components and create working Bzzz+Antennae system
**Requirements**:
- Combine P2P networking, logging, and GitHub integration
- Implement full task lifecycle with meta-discussion
- Create Docker Swarm deployment configuration
- Add monitoring and health checks
- Comprehensive testing across cluster nodes
## Current Status
**Hive Cluster Ready**: 3 agents registered with proper specializations
- walnut: starcoder2:15b (kernel_dev)
- ironwood: phi4:14b (reasoning)
- acacia: codellama (docs_writer)
**Authentication Working**: Dev user and API access configured
⚠️ **Task Submission**: Need to resolve API endpoint issues for automated task distribution
**Next Steps**:
1. Fix task creation API endpoint issues
2. Submit tasks to respective agents based on specializations
3. Monitor execution and coordinate between agents
4. Test the collaborative reasoning (Antennae) layer once P2P foundation is complete

BIN
bzzz-hive

Binary file not shown.

View File

@@ -1,523 +0,0 @@
#!/usr/bin/env python3
"""
Bzzz Antennae Real-time Monitoring Dashboard
Similar to btop/nvtop for system monitoring, but for P2P coordination activity
Usage: python3 bzzz-monitor.py [--refresh-rate 1.0]
"""
import argparse
import json
import os
import subprocess
import sys
import time
from collections import defaultdict, deque
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
# Color codes for terminal output
class Colors:
RESET = '\033[0m'
BOLD = '\033[1m'
DIM = '\033[2m'
# Standard colors
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
# Bright colors
BRIGHT_RED = '\033[91m'
BRIGHT_GREEN = '\033[92m'
BRIGHT_YELLOW = '\033[93m'
BRIGHT_BLUE = '\033[94m'
BRIGHT_MAGENTA = '\033[95m'
BRIGHT_CYAN = '\033[96m'
BRIGHT_WHITE = '\033[97m'
# Background colors
BG_RED = '\033[41m'
BG_GREEN = '\033[42m'
BG_YELLOW = '\033[43m'
BG_BLUE = '\033[44m'
BG_MAGENTA = '\033[45m'
BG_CYAN = '\033[46m'
class BzzzMonitor:
def __init__(self, refresh_rate: float = 1.0):
self.refresh_rate = refresh_rate
self.start_time = datetime.now()
# Data storage
self.p2p_history = deque(maxlen=60) # Last 60 data points
self.availability_history = deque(maxlen=100)
self.task_history = deque(maxlen=50)
self.error_history = deque(maxlen=20)
self.coordination_sessions = {}
self.agent_stats = defaultdict(lambda: {'messages': 0, 'tasks': 0, 'last_seen': None})
# Current stats
self.current_peers = 0
self.current_node_id = "Unknown"
self.total_messages = 0
self.total_tasks = 0
self.total_errors = 0
self.api_status = "Unknown"
# Real coordination channel counters
self.channel_stats = {
'bzzz_coordination': {'messages': 0, 'rate': 0, 'last_count': 0},
'antennae_meta': {'messages': 0, 'rate': 0, 'last_count': 0},
'availability_broadcasts': {'messages': 0, 'rate': 0, 'last_count': 0},
'capability_broadcasts': {'messages': 0, 'rate': 0, 'last_count': 0},
'task_announcements': {'messages': 0, 'rate': 0, 'last_count': 0},
'coordination_sessions': {'messages': 0, 'rate': 0, 'last_count': 0}
}
self.last_rate_update = datetime.now()
# Terminal size
self.update_terminal_size()
def update_terminal_size(self):
"""Get current terminal size"""
try:
result = subprocess.run(['stty', 'size'], capture_output=True, text=True)
lines, cols = map(int, result.stdout.strip().split())
self.terminal_height = lines
self.terminal_width = cols
except:
self.terminal_height = 24
self.terminal_width = 80
def clear_screen(self):
"""Clear the terminal screen"""
print('\033[2J\033[H', end='')
def get_bzzz_status(self) -> Dict:
"""Get current bzzz service status"""
try:
# Get systemd status
result = subprocess.run(['systemctl', 'is-active', 'bzzz.service'],
capture_output=True, text=True)
service_status = result.stdout.strip()
# Get recent logs for analysis
result = subprocess.run([
'journalctl', '-u', 'bzzz.service', '--since', '30 seconds ago', '-n', '50'
], capture_output=True, text=True)
recent_logs = result.stdout
return {
'service_status': service_status,
'logs': recent_logs,
'timestamp': datetime.now()
}
except Exception as e:
return {
'service_status': 'error',
'logs': f"Error getting status: {e}",
'timestamp': datetime.now()
}
def parse_logs(self, logs: str):
"""Parse bzzz logs and extract coordination data"""
lines = logs.split('\n')
for line in lines:
timestamp = datetime.now()
# Extract node ID
if 'Node Status - ID:' in line:
try:
node_part = line.split('Node Status - ID: ')[1].split(',')[0]
self.current_node_id = node_part.strip()
except:
pass
# Extract peer count
if 'Connected Peers:' in line:
try:
peer_count = int(line.split('Connected Peers: ')[1].split()[0])
self.current_peers = peer_count
self.p2p_history.append({
'timestamp': timestamp,
'peers': peer_count
})
except:
pass
# Track availability broadcasts (agent activity)
if 'availability_broadcast' in line:
self.channel_stats['availability_broadcasts']['messages'] += 1
try:
# Extract agent info from availability broadcast
if 'node_id:' in line and 'status:' in line:
agent_id = "unknown"
status = "unknown"
# Parse the log line for agent details
parts = line.split('node_id:')
if len(parts) > 1:
agent_part = parts[1].split()[0].strip('<>')
agent_id = agent_part
if 'status:' in line:
status_part = line.split('status:')[1].split()[0]
status = status_part
self.availability_history.append({
'timestamp': timestamp,
'agent_id': agent_id,
'status': status
})
self.agent_stats[agent_id]['last_seen'] = timestamp
self.agent_stats[agent_id]['messages'] += 1
except:
pass
# Track capability broadcasts
if 'capability_broadcast' in line or 'Capabilities changed' in line:
self.channel_stats['capability_broadcasts']['messages'] += 1
# Track coordination messages
if any(coord_keyword in line.lower() for coord_keyword in ['coordination', 'meta-discussion', 'antennae']):
if 'antennae' in line.lower() or 'meta-discussion' in line.lower():
self.channel_stats['antennae_meta']['messages'] += 1
else:
self.channel_stats['bzzz_coordination']['messages'] += 1
# Track task activity
if any(keyword in line.lower() for keyword in ['task', 'repository', 'github']):
self.task_history.append({
'timestamp': timestamp,
'activity': line.strip()
})
self.total_tasks += 1
# Track specific task announcements
if any(announce_keyword in line.lower() for announce_keyword in ['task announcement', 'announcing task', 'new task']):
self.channel_stats['task_announcements']['messages'] += 1
# Track coordination sessions
if any(session_keyword in line.lower() for session_keyword in ['session start', 'coordination session', 'meta coordination']):
self.channel_stats['coordination_sessions']['messages'] += 1
# Track errors
if any(keyword in line.lower() for keyword in ['error', 'failed', 'cannot']):
self.error_history.append({
'timestamp': timestamp,
'error': line.strip()
})
self.total_errors += 1
# Check API status
if 'Failed to get active repositories' in line:
self.api_status = "Offline (Overlay Network Issues)"
elif 'API request failed' in line:
self.api_status = "Error"
# Track coordination activity (when antennae system is active)
if any(keyword in line.lower() for keyword in ['coordination', 'antennae', 'meta']):
# This would track actual coordination sessions
pass
def draw_header(self):
"""Draw the header section"""
uptime = datetime.now() - self.start_time
uptime_str = str(uptime).split('.')[0] # Remove microseconds
header = f"{Colors.BOLD}{Colors.BRIGHT_CYAN}┌─ Bzzz P2P Coordination Monitor ─┐{Colors.RESET}"
status_line = f"{Colors.CYAN}{Colors.RESET} Uptime: {uptime_str} {Colors.CYAN}{Colors.RESET} Node: {self.current_node_id[:12]}... {Colors.CYAN}{Colors.RESET}"
separator = f"{Colors.CYAN}└───────────────────────────────────┘{Colors.RESET}"
print(header)
print(status_line)
print(separator)
print()
def draw_p2p_status(self):
"""Draw P2P network status section"""
print(f"{Colors.BOLD}{Colors.BRIGHT_GREEN}P2P Network Status{Colors.RESET}")
print("" * 30)
# Current peers
peer_color = Colors.BRIGHT_GREEN if self.current_peers > 0 else Colors.BRIGHT_RED
print(f"Connected Peers: {peer_color}{self.current_peers}{Colors.RESET}")
# API Status
api_color = Colors.BRIGHT_RED if "Error" in self.api_status or "Offline" in self.api_status else Colors.BRIGHT_GREEN
print(f"Hive API Status: {api_color}{self.api_status}{Colors.RESET}")
# Peer connection history (mini graph)
if len(self.p2p_history) > 1:
print(f"\nPeer History (last {len(self.p2p_history)} samples):")
self.draw_mini_graph([p['peers'] for p in self.p2p_history], "peers")
print()
def draw_agent_activity(self):
"""Draw agent activity section"""
print(f"{Colors.BOLD}{Colors.BRIGHT_YELLOW}Agent Activity{Colors.RESET}")
print("" * 30)
if not self.availability_history:
print(f"{Colors.DIM}No recent agent activity{Colors.RESET}")
else:
# Recent availability updates
recent_count = len([a for a in self.availability_history if
datetime.now() - a['timestamp'] < timedelta(minutes=1)])
print(f"Recent Updates (1m): {Colors.BRIGHT_YELLOW}{recent_count}{Colors.RESET}")
# Agent status summary
agent_counts = defaultdict(int)
for activity in list(self.availability_history)[-10:]: # Last 10 activities
if activity['status'] in ['ready', 'working', 'busy']:
agent_counts[activity['status']] += 1
for status, count in agent_counts.items():
status_color = {
'ready': Colors.BRIGHT_GREEN,
'working': Colors.BRIGHT_YELLOW,
'busy': Colors.BRIGHT_RED
}.get(status, Colors.WHITE)
print(f" {status.title()}: {status_color}{count}{Colors.RESET}")
print()
def update_message_rates(self):
"""Calculate message rates for each channel"""
now = datetime.now()
time_diff = (now - self.last_rate_update).total_seconds()
if time_diff >= 10: # Update rates every 10 seconds
for channel, stats in self.channel_stats.items():
msg_diff = stats['messages'] - stats['last_count']
stats['rate'] = msg_diff / time_diff if time_diff > 0 else 0
stats['last_count'] = stats['messages']
self.last_rate_update = now
def draw_coordination_channels(self):
"""Draw real coordination channel statistics"""
print(f"{Colors.BOLD}{Colors.BRIGHT_MAGENTA}Coordination Channels{Colors.RESET}")
print("" * 50)
self.update_message_rates()
# Display channel stats in compact format
channels = [
('Availability', 'availability_broadcasts', Colors.GREEN),
('Capabilities', 'capability_broadcasts', Colors.BLUE),
('Bzzz Coord', 'bzzz_coordination', Colors.YELLOW),
('Antennae', 'antennae_meta', Colors.MAGENTA),
('Task Ann.', 'task_announcements', Colors.CYAN),
('Sessions', 'coordination_sessions', Colors.WHITE)
]
for name, key, color in channels:
stats = self.channel_stats[key]
rate_str = f"{stats['rate']:.1f}/s" if stats['rate'] > 0 else "0/s"
print(f"{name.ljust(11)}: {color}{str(stats['messages']).rjust(4)}{Colors.RESET} msgs {Colors.DIM}({rate_str}){Colors.RESET}")
print()
def draw_coordination_status(self):
"""Draw coordination activity section"""
print(f"{Colors.BOLD}{Colors.BRIGHT_CYAN}System Status{Colors.RESET}")
print("" * 30)
# Total coordination stats
total_coord_msgs = (self.channel_stats['bzzz_coordination']['messages'] +
self.channel_stats['antennae_meta']['messages'])
print(f"Coordination Messages: {Colors.BRIGHT_CYAN}{total_coord_msgs}{Colors.RESET}")
print(f"Active Sessions: {Colors.BRIGHT_GREEN}{len(self.coordination_sessions)}{Colors.RESET}")
# Recent task activity
if self.task_history:
recent_tasks = len([t for t in self.task_history if
datetime.now() - t['timestamp'] < timedelta(minutes=5)])
print(f"Recent Tasks (5m): {Colors.BRIGHT_YELLOW}{recent_tasks}{Colors.RESET}")
print()
def draw_recent_activity(self):
"""Draw recent activity log with compact timestamps"""
print(f"{Colors.BOLD}{Colors.BRIGHT_WHITE}Recent Activity{Colors.RESET}")
print("" * 50)
# Combine and sort recent activities
all_activities = []
# Add availability updates
for activity in list(self.availability_history)[-5:]:
all_activities.append({
'time': activity['timestamp'],
'type': 'AVAIL',
'message': f"Agent {activity['agent_id'][-6:]} status: {activity['status']}",
'color': Colors.GREEN
})
# Add task activities
for activity in list(self.task_history)[-3:]:
# Extract meaningful info from log line
msg = activity['activity']
if 'availability_broadcast' in msg:
continue # Skip these, we show them in AVAIL
elif 'Connected Peers:' in msg:
peers = msg.split('Connected Peers: ')[1].split()[0] if 'Connected Peers: ' in msg else "?"
msg = f"P2P: {peers} peers connected"
elif 'repository' in msg.lower():
msg = "Repository sync activity"
elif 'Failed' in msg:
continue # These go to errors
else:
msg = msg.split(': ', 1)[-1] if ': ' in msg else msg
msg = msg[:60] + "..." if len(msg) > 60 else msg
all_activities.append({
'time': activity['timestamp'],
'type': 'TASK',
'message': msg,
'color': Colors.YELLOW
})
# Add errors
for error in list(self.error_history)[-3:]:
err_msg = error['error']
if 'Failed to get active repositories' in err_msg:
err_msg = "Hive API request failed"
elif 'Failed to create GitHub client' in err_msg:
err_msg = "GitHub verification failed (expected)"
else:
err_msg = err_msg.split(': ', 1)[-1] if ': ' in err_msg else err_msg
err_msg = err_msg[:60] + "..." if len(err_msg) > 60 else err_msg
all_activities.append({
'time': error['timestamp'],
'type': 'ERROR',
'message': err_msg,
'color': Colors.RED
})
# Sort by time and show most recent
all_activities.sort(key=lambda x: x['time'], reverse=True)
for activity in all_activities[:8]: # Show last 8 activities
# Use relative time for more space
now = datetime.now()
diff = now - activity['time']
if diff.total_seconds() < 60:
time_str = f"{int(diff.total_seconds())}s"
elif diff.total_seconds() < 3600:
time_str = f"{int(diff.total_seconds()//60)}m"
else:
time_str = f"{int(diff.total_seconds()//3600)}h"
type_str = f"[{activity['type']}]".ljust(7)
print(f"{Colors.DIM}{time_str.rjust(3)}{Colors.RESET} {activity['color']}{type_str}{Colors.RESET} {activity['message']}")
print()
def draw_mini_graph(self, data: List[int], label: str):
"""Draw a simple ASCII graph"""
if not data or len(data) < 2:
return
max_val = max(data) if data else 1
min_val = min(data) if data else 0
range_val = max_val - min_val if max_val != min_val else 1
# Normalize to 0-10 scale for display
normalized = [int(((val - min_val) / range_val) * 10) for val in data]
# Draw graph
graph_chars = ['', '', '', '', '', '', '', '']
graph_line = ""
for val in normalized:
if val == 0:
graph_line += ""
elif val >= len(graph_chars):
graph_line += ""
else:
graph_line += graph_chars[val]
print(f"{Colors.CYAN}{graph_line}{Colors.RESET} ({min_val}-{max_val} {label})")
def draw_footer(self):
"""Draw footer with controls"""
print("" * 50)
print(f"{Colors.DIM}Press Ctrl+C to exit | Refresh rate: {self.refresh_rate}s{Colors.RESET}")
def run(self):
"""Main monitoring loop"""
try:
while True:
self.clear_screen()
self.update_terminal_size()
# Get fresh data
status = self.get_bzzz_status()
if status['logs']:
self.parse_logs(status['logs'])
# Draw dashboard
self.draw_header()
self.draw_p2p_status()
self.draw_coordination_channels()
self.draw_agent_activity()
self.draw_coordination_status()
self.draw_recent_activity()
self.draw_footer()
# Wait for next refresh
time.sleep(self.refresh_rate)
except KeyboardInterrupt:
print(f"\n{Colors.BRIGHT_CYAN}🛑 Bzzz Monitor stopped{Colors.RESET}")
sys.exit(0)
except Exception as e:
print(f"\n{Colors.BRIGHT_RED}❌ Error: {e}{Colors.RESET}")
sys.exit(1)
def main():
parser = argparse.ArgumentParser(description='Bzzz P2P Coordination Monitor')
parser.add_argument('--refresh-rate', type=float, default=1.0,
help='Refresh rate in seconds (default: 1.0)')
parser.add_argument('--no-color', action='store_true',
help='Disable colored output')
args = parser.parse_args()
# Disable colors if requested
if args.no_color:
for attr in dir(Colors):
if not attr.startswith('_'):
setattr(Colors, attr, '')
# Check if bzzz service exists
try:
result = subprocess.run(['systemctl', 'status', 'bzzz.service'],
capture_output=True, text=True)
if result.returncode != 0 and 'not be found' in result.stderr:
print(f"{Colors.BRIGHT_RED}❌ Bzzz service not found. Is it installed and running?{Colors.RESET}")
sys.exit(1)
except Exception as e:
print(f"{Colors.BRIGHT_RED}❌ Error checking bzzz service: {e}{Colors.RESET}")
sys.exit(1)
print(f"{Colors.BRIGHT_CYAN}🚀 Starting Bzzz Monitor...{Colors.RESET}")
time.sleep(1)
monitor = BzzzMonitor(refresh_rate=args.refresh_rate)
monitor.run()
if __name__ == '__main__':
main()

View File

@@ -1,254 +0,0 @@
#!/usr/bin/env python3
"""
Advanced Meta Discussion Demo for Bzzz P2P Mesh
Shows cross-repository coordination and dependency detection
"""
import json
import time
from datetime import datetime
def demo_cross_repository_coordination():
"""Demonstrate advanced meta discussion features"""
print("🎯 ADVANCED BZZZ META DISCUSSION DEMO")
print("=" * 60)
print("Scenario: Multi-repository microservices coordination")
print()
# Simulate multiple repositories in the system
repositories = {
"api-gateway": {
"agent": "walnut-12345",
"capabilities": ["code-generation", "api-design", "security"],
"current_task": {
"id": 42,
"title": "Implement OAuth2 authentication flow",
"description": "Add OAuth2 support to API gateway with JWT tokens",
"labels": ["security", "api", "authentication"]
}
},
"user-service": {
"agent": "acacia-67890",
"capabilities": ["code-analysis", "database", "microservices"],
"current_task": {
"id": 87,
"title": "Update user schema for OAuth integration",
"description": "Add OAuth provider fields to user table",
"labels": ["database", "schema", "authentication"]
}
},
"notification-service": {
"agent": "ironwood-54321",
"capabilities": ["advanced-reasoning", "integration", "messaging"],
"current_task": {
"id": 156,
"title": "Secure webhook endpoints with JWT",
"description": "Validate JWT tokens on webhook endpoints",
"labels": ["security", "webhook", "authentication"]
}
}
}
print("📋 ACTIVE TASKS ACROSS REPOSITORIES:")
for repo, info in repositories.items():
task = info["current_task"]
print(f" 🔧 {repo}: #{task['id']} - {task['title']}")
print(f" Agent: {info['agent']} | Labels: {', '.join(task['labels'])}")
print()
# Demo 1: Dependency Detection
print("🔍 PHASE 1: DEPENDENCY DETECTION")
print("-" * 40)
dependencies = [
{
"task1": "api-gateway/#42",
"task2": "user-service/#87",
"relationship": "API_Contract",
"reason": "OAuth implementation requires coordinated schema changes",
"confidence": 0.9
},
{
"task1": "api-gateway/#42",
"task2": "notification-service/#156",
"relationship": "Security_Compliance",
"reason": "Both implement JWT token validation",
"confidence": 0.85
}
]
for dep in dependencies:
print(f"🔗 DEPENDENCY DETECTED:")
print(f" {dep['task1']}{dep['task2']}")
print(f" Type: {dep['relationship']} (confidence: {dep['confidence']})")
print(f" Reason: {dep['reason']}")
print()
# Demo 2: Coordination Session Creation
print("🎯 PHASE 2: COORDINATION SESSION INITIATED")
print("-" * 40)
session_id = f"coord_oauth_{int(time.time())}"
print(f"📝 Session ID: {session_id}")
print(f"📅 Created: {datetime.now().strftime('%H:%M:%S')}")
print(f"👥 Participants: walnut-12345, acacia-67890, ironwood-54321")
print()
# Demo 3: AI-Generated Coordination Plan
print("🤖 PHASE 3: AI-GENERATED COORDINATION PLAN")
print("-" * 40)
coordination_plan = """
COORDINATION PLAN: OAuth2 Implementation Across Services
1. EXECUTION ORDER:
- Phase 1: user-service (schema changes)
- Phase 2: api-gateway (OAuth implementation)
- Phase 3: notification-service (JWT validation)
2. SHARED ARTIFACTS:
- JWT token format specification
- OAuth2 endpoint documentation
- Database schema migration scripts
- Shared security configuration
3. COORDINATION REQUIREMENTS:
- walnut-12345: Define JWT token structure before implementation
- acacia-67890: Migrate user schema first, share field mappings
- ironwood-54321: Wait for JWT format, implement validation
4. POTENTIAL CONFLICTS:
- JWT payload structure disagreements
- Token expiration time mismatches
- Security scope definition conflicts
5. SUCCESS CRITERIA:
- All services use consistent JWT format
- OAuth flow works end-to-end
- Security audit passes on all endpoints
- Integration tests pass across all services
"""
print(coordination_plan)
# Demo 4: Agent Coordination Messages
print("💬 PHASE 4: AGENT COORDINATION MESSAGES")
print("-" * 40)
messages = [
{
"timestamp": "14:32:01",
"from": "walnut-12345 (api-gateway)",
"type": "proposal",
"content": "I propose using RS256 JWT tokens with 15min expiry. Standard claims: sub, iat, exp, scope."
},
{
"timestamp": "14:32:45",
"from": "acacia-67890 (user-service)",
"type": "question",
"content": "Should we store the OAuth provider info in the user table or separate table? Also need refresh token strategy."
},
{
"timestamp": "14:33:20",
"from": "ironwood-54321 (notification-service)",
"type": "agreement",
"content": "RS256 sounds good. For webhooks, I'll validate signature and check 'webhook' scope. Need the public key endpoint."
},
{
"timestamp": "14:34:10",
"from": "walnut-12345 (api-gateway)",
"type": "response",
"content": "Separate oauth_providers table is better for multiple providers. Public key at /.well-known/jwks.json"
},
{
"timestamp": "14:34:55",
"from": "acacia-67890 (user-service)",
"type": "agreement",
"content": "Agreed on separate table. I'll create migration script and share the schema. ETA: 2 hours."
}
]
for msg in messages:
print(f"[{msg['timestamp']}] {msg['from']} ({msg['type']}):")
print(f" {msg['content']}")
print()
# Demo 5: Automatic Resolution Detection
print("✅ PHASE 5: COORDINATION RESOLUTION")
print("-" * 40)
print("🔍 ANALYSIS: Consensus detected")
print(" - All agents agreed on JWT format (RS256)")
print(" - Database strategy decided (separate oauth_providers table)")
print(" - Public key endpoint established (/.well-known/jwks.json)")
print(" - Implementation order confirmed")
print()
print("📋 COORDINATION COMPLETE:")
print(" - Session status: RESOLVED")
print(" - Resolution: Consensus reached on OAuth implementation")
print(" - Next steps: acacia-67890 starts schema migration")
print(" - Dependencies: walnut-12345 waits for schema completion")
print()
# Demo 6: Alternative - Escalation Scenario
print("🚨 ALTERNATIVE: ESCALATION SCENARIO")
print("-" * 40)
escalation_scenario = """
ESCALATION TRIGGERED: Security Implementation Conflict
Reason: Agents cannot agree on JWT token expiration time
- walnut-12345 wants 15 minutes (high security)
- acacia-67890 wants 4 hours (user experience)
- ironwood-54321 wants 1 hour (compromise)
Messages exceeded threshold: 12 messages without consensus
Human expert summoned via N8N webhook to deepblack.cloud
Escalation webhook payload:
{
"session_id": "coord_oauth_1752401234",
"conflict_type": "security_policy_disagreement",
"agents_involved": ["walnut-12345", "acacia-67890", "ironwood-54321"],
"repositories": ["api-gateway", "user-service", "notification-service"],
"issue_summary": "JWT expiration time conflict preventing OAuth implementation",
"requires_human_decision": true,
"urgency": "medium"
}
"""
print(escalation_scenario)
# Demo 7: System Capabilities Summary
print("🎯 ADVANCED META DISCUSSION CAPABILITIES")
print("-" * 40)
capabilities = [
"✅ Cross-repository dependency detection",
"✅ Intelligent task relationship analysis",
"✅ AI-generated coordination plans",
"✅ Multi-agent conversation management",
"✅ Consensus detection and resolution",
"✅ Automatic escalation to humans",
"✅ Session lifecycle management",
"✅ Hop-limited message propagation",
"✅ Custom dependency rules",
"✅ Project-aware coordination"
]
for cap in capabilities:
print(f" {cap}")
print()
print("🚀 PRODUCTION READY:")
print(" - P2P mesh infrastructure: ✅ Deployed")
print(" - Antennae meta-discussion: ✅ Active")
print(" - Dependency detection: ✅ Implemented")
print(" - Coordination sessions: ✅ Functional")
print(" - Human escalation: ✅ N8N integrated")
print()
print("🎯 Ready for real cross-repository coordination!")
if __name__ == "__main__":
demo_cross_repository_coordination()

View File

@@ -0,0 +1,94 @@
# Bzzz System Architecture & Flow
This document contains diagrams to visualize the architecture and data flows of the Bzzz distributed task coordination system.
---
### ✅ Fixed **Component Architecture Diagram**
```mermaid
graph TD
subgraph External_Systems ["External Systems"]
GitHub[(GitHub Repositories)] -- "Tasks (Issues/PRs)" --> BzzzAgent
HiveAPI[Hive REST API] -- "Repo Lists & Status Updates" --> BzzzAgent
N8N([N8N Webhooks])
Ollama[Ollama API]
end
subgraph Bzzz_Agent_Node ["Bzzz Agent Node"]
BzzzAgent[Bzzz Agent]
BzzzAgent -- "Manages" --> P2P
BzzzAgent -- "Uses" --> Integration
BzzzAgent -- "Uses" --> Executor
BzzzAgent -- "Uses" --> Logging
P2P(P2P/PubSub Layer) -- "Discovers Peers" --> Discovery
P2P -- "Communicates via" --> Antennae
Integration(GitHub Integration) -- "Polls for Tasks" --> HiveAPI
Integration -- "Claims Tasks" --> GitHub
Executor(Task Executor) -- "Runs Commands In" --> Sandbox
Executor -- "Gets Next Command From" --> Reasoning
Reasoning(Reasoning Module) -- "Sends Prompts To" --> Ollama
Sandbox(Docker Sandbox) -->|Isolated| Executor
Logging(Hypercore Logging) -->|Creates Audit Trail| BzzzAgent
Discovery(mDNS Discovery)
end
BzzzAgent -- "P2P Comms" --> OtherAgent[Other Bzzz Agent]
OtherAgent -- "P2P Comms" --> BzzzAgent
Executor -- "Escalates To" --> N8N
classDef internal fill:#D6EAF8,stroke:#2E86C1,stroke-width:2px;
class BzzzAgent,P2P,Integration,Executor,Reasoning,Sandbox,Logging,Discovery internal
classDef external fill:#E8DAEF,stroke:#8E44AD,stroke-width:2px;
class GitHub,HiveAPI,N8N,Ollama external
```
---
### ✅ Fixed **Task Execution Flowchart**
```mermaid
flowchart TD
A[Start: Unassigned Task on GitHub] --> B{Bzzz Agent Polls Hive API}
B --> C{Discovers Active Repositories}
C --> D{Polls Repos for Suitable Tasks}
D --> E{Task Found?}
E -- No --> B
E -- Yes --> F[Agent Claims Task via GitHub API]
F --> G[Report Claim to Hive API]
G --> H[Announce Claim on P2P PubSub]
H --> I[Create Docker Sandbox]
I --> J[Clone Repository]
J --> K{Generate Next Command via Reasoning/Ollama}
K --> L{Is Task Complete?}
L -- No --> M[Execute Command in Sandbox]
M --> N[Feed Output Back to Reasoning]
N --> K
L -- Yes --> O[Create Branch & Commit Changes]
O --> P[Push Branch to GitHub]
P --> Q[Create Pull Request]
Q --> R[Report Completion to Hive API]
R --> S[Announce Completion on PubSub]
S --> T[Destroy Docker Sandbox]
T --> Z[End]
K -- "Needs Help" --> MD1
%% Meta-Discussion Loop (Separate Cluster)
subgraph Meta_Discussion ["Meta-Discussion (Antennae)"]
MD1{Agent Proposes Plan} -->|PubSub| MD2[Other Agents Review]
MD2 -->|Feedback| MD1
MD1 -->|Stuck?| MD3{Escalate to N8N}
end
H -.-> MD1
```

View File

@@ -13,19 +13,27 @@ import (
const maxIterations = 10 // Prevents infinite loops
// ExecuteTask manages the entire lifecycle of a task using a sandboxed environment.
func ExecuteTask(ctx context.Context, task *types.EnhancedTask, hlog *logging.HypercoreLog) (string, error) {
// 1. Create the sandbox environment
sb, err := sandbox.CreateSandbox(ctx, "") // Use default image for now
if err != nil {
return "", fmt.Errorf("failed to create sandbox: %w", err)
// ExecuteTaskResult contains the result of task execution
type ExecuteTaskResult struct {
BranchName string
Sandbox *sandbox.Sandbox
}
defer sb.DestroySandbox()
// ExecuteTask manages the entire lifecycle of a task using a sandboxed environment.
// Returns sandbox reference so it can be destroyed after PR creation
func ExecuteTask(ctx context.Context, task *types.EnhancedTask, hlog *logging.HypercoreLog, agentConfig *config.AgentConfig) (*ExecuteTaskResult, error) {
// 1. Create the sandbox environment
sb, err := sandbox.CreateSandbox(ctx, "", agentConfig) // Use default image for now
if err != nil {
return nil, fmt.Errorf("failed to create sandbox: %w", err)
}
// NOTE: Do NOT defer destroy here - let caller handle it
// 2. Clone the repository inside the sandbox
cloneCmd := fmt.Sprintf("git clone %s .", task.GitURL)
if _, err := sb.RunCommand(cloneCmd); err != nil {
return "", fmt.Errorf("failed to clone repository in sandbox: %w", err)
sb.DestroySandbox() // Clean up on error
return nil, fmt.Errorf("failed to clone repository in sandbox: %w", err)
}
hlog.Append(logging.TaskProgress, map[string]interface{}{"task_id": task.Number, "status": "cloned repo"})
@@ -35,7 +43,8 @@ func ExecuteTask(ctx context.Context, task *types.EnhancedTask, hlog *logging.Hy
// a. Generate the next command based on the task and previous output
nextCommand, err := generateNextCommand(ctx, task, lastCommandOutput)
if err != nil {
return "", fmt.Errorf("failed to generate next command: %w", err)
sb.DestroySandbox() // Clean up on error
return nil, fmt.Errorf("failed to generate next command: %w", err)
}
hlog.Append(logging.TaskProgress, map[string]interface{}{
@@ -65,34 +74,56 @@ func ExecuteTask(ctx context.Context, task *types.EnhancedTask, hlog *logging.Hy
// 4. Create a new branch and commit the changes
branchName := fmt.Sprintf("bzzz-task-%d", task.Number)
if _, err := sb.RunCommand(fmt.Sprintf("git checkout -b %s", branchName)); err != nil {
return "", fmt.Errorf("failed to create branch: %w", err)
sb.DestroySandbox() // Clean up on error
return nil, fmt.Errorf("failed to create branch: %w", err)
}
if _, err := sb.RunCommand("git add ."); err != nil {
return "", fmt.Errorf("failed to add files: %w", err)
sb.DestroySandbox() // Clean up on error
return nil, fmt.Errorf("failed to add files: %w", err)
}
commitCmd := fmt.Sprintf("git commit -m 'feat: resolve task #%d'", task.Number)
if _, err := sb.RunCommand(commitCmd); err != nil {
return "", fmt.Errorf("failed to commit changes: %w", err)
sb.DestroySandbox() // Clean up on error
return nil, fmt.Errorf("failed to commit changes: %w", err)
}
// 5. Push the new branch
if _, err := sb.RunCommand(fmt.Sprintf("git push origin %s", branchName)); err != nil {
return "", fmt.Errorf("failed to push branch: %w", err)
sb.DestroySandbox() // Clean up on error
return nil, fmt.Errorf("failed to push branch: %w", err)
}
hlog.Append(logging.TaskProgress, map[string]interface{}{"task_id": task.Number, "status": "pushed changes"})
return branchName, nil
return &ExecuteTaskResult{
BranchName: branchName,
Sandbox: sb,
}, nil
}
// generateNextCommand uses the LLM to decide the next command to execute.
func generateNextCommand(ctx context.Context, task *types.EnhancedTask, lastOutput string) (string, error) {
prompt := fmt.Sprintf(
"You are an AI developer in a sandboxed shell environment. Your goal is to solve the following GitHub issue:\n\n"+
"You are an AI developer agent in the Bzzz P2P distributed development network, working in a sandboxed shell environment.\n\n"+
"TASK DETAILS:\n"+
"Title: %s\nDescription: %s\n\n"+
"You can only interact with the system by issuing shell commands. "+
"The previous command output was:\n---\n%s\n---\n"+
"Based on this, what is the single next shell command you should run? "+
"If you believe the task is complete and ready for a pull request, respond with 'TASK_COMPLETE'.",
"CAPABILITIES & RESOURCES:\n"+
"- You can issue shell commands to solve this GitHub issue\n"+
"- You are part of a collaborative P2P mesh with other AI agents\n"+
"- If stuck, you can ask for help by using keywords: 'stuck', 'help', 'clarification needed', 'manual intervention'\n"+
"- Complex problems automatically escalate to human experts via N8N webhooks\n"+
"- You have access to git, build tools, editors, and development utilities\n"+
"- GitHub CLI (gh) is available for creating PRs: use 'gh pr create --title \"title\" --body \"description\"'\n"+
"- GitHub authentication is configured automatically\n"+
"- Work is preserved even if issues occur - your changes are committed and pushed\n\n"+
"COLLABORATION GUIDELINES:\n"+
"- Use clear, descriptive commit messages\n"+
"- Break complex problems into smaller steps\n"+
"- Ask for help early if you encounter unfamiliar technologies\n"+
"- Document your reasoning in commands where helpful\n\n"+
"PREVIOUS OUTPUT:\n---\n%s\n---\n\n"+
"Based on this context, what is the single next shell command you should run?\n"+
"If you believe the task is complete and ready for a pull request, respond with 'TASK_COMPLETE'.\n"+
"If you need help, include relevant keywords in your response.",
task.Title, task.Description, lastOutput,
)

View File

@@ -2,6 +2,7 @@ package github
import (
"context"
"crypto/sha256"
"fmt"
"time"
@@ -181,7 +182,7 @@ func (c *Client) ClaimTask(issueNumber int, agentID string) (*Task, error) {
// Attempt atomic assignment using GitHub's native assignment
// GitHub only accepts existing usernames, so we'll assign to the repo owner
githubAssignee := "anthonyrawlins"
githubAssignee := c.config.Assignee
issueRequest := &github.IssueRequest{
Assignee: &githubAssignee,
}
@@ -321,9 +322,16 @@ func (c *Client) ListAvailableTasks() ([]*Task, error) {
return tasks, nil
}
// hashAgentID creates a short hash of the agent ID for safe branch naming
func hashAgentID(agentID string) string {
hash := sha256.Sum256([]byte(agentID))
return fmt.Sprintf("%x", hash[:8]) // Use first 8 bytes (16 hex chars)
}
// createTaskBranch creates a new branch for task work
func (c *Client) createTaskBranch(issueNumber int, agentID string) error {
branchName := fmt.Sprintf("%s%d-%s", c.config.BranchPrefix, issueNumber, agentID)
hashedAgentID := hashAgentID(agentID)
branchName := fmt.Sprintf("%s%d-%s", c.config.BranchPrefix, issueNumber, hashedAgentID)
// Get the base branch reference
baseRef, _, err := c.client.Git.GetRef(

View File

@@ -1,455 +0,0 @@
package github
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/anthonyrawlins/bzzz/executor"
"github.com/anthonyrawlins/bzzz/logging"
"github.com/anthonyrawlins/bzzz/pkg/hive"
"github.com/anthonyrawlins/bzzz/pkg/types"
"github.com/anthonyrawlins/bzzz/pubsub"
"github.com/libp2p/go-libp2p/core/peer"
)
// HiveIntegration handles dynamic repository discovery via Hive API
type HiveIntegration struct {
hiveClient *hive.HiveClient
githubToken string
pubsub *pubsub.PubSub
hlog *logging.HypercoreLog
ctx context.Context
config *IntegrationConfig
// Repository management
repositories map[int]*RepositoryClient // projectID -> GitHub client
repositoryLock sync.RWMutex
// Conversation tracking
activeDiscussions map[string]*Conversation // "projectID:taskID" -> conversation
discussionLock sync.RWMutex
}
// RepositoryClient wraps a GitHub client for a specific repository
type RepositoryClient struct {
Client *Client
Repository hive.Repository
LastSync time.Time
}
// NewHiveIntegration creates a new Hive-based GitHub integration
func NewHiveIntegration(ctx context.Context, hiveClient *hive.HiveClient, githubToken string, ps *pubsub.PubSub, hlog *logging.HypercoreLog, config *IntegrationConfig) *HiveIntegration {
if config.PollInterval == 0 {
config.PollInterval = 30 * time.Second
}
if config.MaxTasks == 0 {
config.MaxTasks = 3
}
return &HiveIntegration{
hiveClient: hiveClient,
githubToken: githubToken,
pubsub: ps,
hlog: hlog,
ctx: ctx,
config: config,
repositories: make(map[int]*RepositoryClient),
activeDiscussions: make(map[string]*Conversation),
}
}
// Start begins the Hive-GitHub integration
func (hi *HiveIntegration) Start() {
fmt.Printf("🔗 Starting Hive-GitHub integration for agent: %s\n", hi.config.AgentID)
// Register the handler for incoming meta-discussion messages
hi.pubsub.SetAntennaeMessageHandler(hi.handleMetaDiscussion)
// Start repository discovery and task polling
go hi.repositoryDiscoveryLoop()
go hi.taskPollingLoop()
}
// repositoryDiscoveryLoop periodically discovers active repositories from Hive
func (hi *HiveIntegration) repositoryDiscoveryLoop() {
ticker := time.NewTicker(5 * time.Minute) // Check for new repositories every 5 minutes
defer ticker.Stop()
// Initial discovery
hi.syncRepositories()
for {
select {
case <-hi.ctx.Done():
return
case <-ticker.C:
hi.syncRepositories()
}
}
}
// syncRepositories synchronizes the list of active repositories from Hive
func (hi *HiveIntegration) syncRepositories() {
repositories, err := hi.hiveClient.GetActiveRepositories(hi.ctx)
if err != nil {
fmt.Printf("❌ Failed to get active repositories: %v\n", err)
return
}
hi.repositoryLock.Lock()
defer hi.repositoryLock.Unlock()
// Track which repositories we've seen
currentRepos := make(map[int]bool)
for _, repo := range repositories {
currentRepos[repo.ProjectID] = true
// Check if we already have a client for this repository
if _, exists := hi.repositories[repo.ProjectID]; !exists {
// Create new GitHub client for this repository
githubConfig := &Config{
AccessToken: hi.githubToken,
Owner: repo.Owner,
Repository: repo.Repository,
BaseBranch: repo.Branch,
}
client, err := NewClient(hi.ctx, githubConfig)
if err != nil {
fmt.Printf("❌ Failed to create GitHub client for %s/%s: %v\n", repo.Owner, repo.Repository, err)
continue
}
hi.repositories[repo.ProjectID] = &RepositoryClient{
Client: client,
Repository: repo,
LastSync: time.Now(),
}
fmt.Printf("✅ Added repository: %s/%s (Project ID: %d)\n", repo.Owner, repo.Repository, repo.ProjectID)
}
}
// Remove repositories that are no longer active
for projectID := range hi.repositories {
if !currentRepos[projectID] {
delete(hi.repositories, projectID)
fmt.Printf("🗑️ Removed inactive repository (Project ID: %d)\n", projectID)
}
}
fmt.Printf("📊 Repository sync complete: %d active repositories\n", len(hi.repositories))
}
// taskPollingLoop periodically polls all repositories for available tasks
func (hi *HiveIntegration) taskPollingLoop() {
ticker := time.NewTicker(hi.config.PollInterval)
defer ticker.Stop()
for {
select {
case <-hi.ctx.Done():
return
case <-ticker.C:
hi.pollAllRepositories()
}
}
}
// pollAllRepositories checks all active repositories for available tasks
func (hi *HiveIntegration) pollAllRepositories() {
hi.repositoryLock.RLock()
repositories := make([]*RepositoryClient, 0, len(hi.repositories))
for _, repo := range hi.repositories {
repositories = append(repositories, repo)
}
hi.repositoryLock.RUnlock()
if len(repositories) == 0 {
return
}
fmt.Printf("🔍 Polling %d repositories for available tasks...\n", len(repositories))
var allTasks []*types.EnhancedTask
// Collect tasks from all repositories
for _, repoClient := range repositories {
tasks, err := hi.getRepositoryTasks(repoClient)
if err != nil {
fmt.Printf("❌ Failed to get tasks from %s/%s: %v\n",
repoClient.Repository.Owner, repoClient.Repository.Repository, err)
continue
}
allTasks = append(allTasks, tasks...)
}
if len(allTasks) == 0 {
return
}
fmt.Printf("📋 Found %d total available tasks across all repositories\n", len(allTasks))
// Apply filtering and selection
suitableTasks := hi.filterSuitableTasks(allTasks)
if len(suitableTasks) == 0 {
fmt.Printf("⚠️ No suitable tasks for agent capabilities: %v\n", hi.config.Capabilities)
return
}
// Select and claim the highest priority task
task := suitableTasks[0]
hi.claimAndExecuteTask(task)
}
// getRepositoryTasks fetches available tasks from a specific repository
func (hi *HiveIntegration) getRepositoryTasks(repoClient *RepositoryClient) ([]*types.EnhancedTask, error) {
// Get tasks from GitHub
githubTasks, err := repoClient.Client.ListAvailableTasks()
if err != nil {
return nil, err
}
// Convert to enhanced tasks with project context
var enhancedTasks []*types.EnhancedTask
for _, task := range githubTasks {
enhancedTask := &types.EnhancedTask{
ID: task.ID,
Number: task.Number,
Title: task.Title,
Description: task.Description,
State: task.State,
Labels: task.Labels,
Assignee: task.Assignee,
CreatedAt: task.CreatedAt,
UpdatedAt: task.UpdatedAt,
TaskType: task.TaskType,
Priority: task.Priority,
Requirements: task.Requirements,
Deliverables: task.Deliverables,
Context: task.Context,
ProjectID: repoClient.Repository.ProjectID,
GitURL: repoClient.Repository.GitURL,
Repository: repoClient.Repository,
}
enhancedTasks = append(enhancedTasks, enhancedTask)
}
return enhancedTasks, nil
}
// filterSuitableTasks filters tasks based on agent capabilities
func (hi *HiveIntegration) filterSuitableTasks(tasks []*types.EnhancedTask) []*types.EnhancedTask {
var suitable []*types.EnhancedTask
for _, task := range tasks {
if hi.canHandleTaskType(task.TaskType) {
suitable = append(suitable, task)
}
}
return suitable
}
// canHandleTaskType checks if this agent can handle the given task type
func (hi *HiveIntegration) canHandleTaskType(taskType string) bool {
for _, capability := range hi.config.Capabilities {
if capability == taskType || capability == "general" || capability == "task-coordination" {
return true
}
}
return false
}
// claimAndExecuteTask claims a task and begins execution
func (hi *HiveIntegration) claimAndExecuteTask(task *types.EnhancedTask) {
hi.repositoryLock.RLock()
repoClient, exists := hi.repositories[task.ProjectID]
hi.repositoryLock.RUnlock()
if !exists {
fmt.Printf("❌ Repository client not found for project %d\n", task.ProjectID)
return
}
// Claim the task in GitHub
_, err := repoClient.Client.ClaimTask(task.Number, hi.config.AgentID)
if err != nil {
fmt.Printf("❌ Failed to claim task %d in %s/%s: %v\n",
task.Number, task.Repository.Owner, task.Repository.Repository, err)
return
}
fmt.Printf("✋ Claimed task #%d from %s/%s: %s\n",
task.Number, task.Repository.Owner, task.Repository.Repository, task.Title)
// Log the claim
hi.hlog.Append(logging.TaskClaimed, map[string]interface{}{
"task_id": task.Number,
"repository": fmt.Sprintf("%s/%s", task.Repository.Owner, task.Repository.Repository),
"title": task.Title,
})
// Report claim to Hive
if err := hi.hiveClient.ClaimTask(hi.ctx, task.ProjectID, task.Number, hi.config.AgentID); err != nil {
fmt.Printf("⚠️ Failed to report task claim to Hive: %v\n", err)
}
// Start task execution
go hi.executeTask(task, repoClient)
}
// executeTask executes a claimed task with reasoning and coordination
func (hi *HiveIntegration) executeTask(task *types.EnhancedTask, repoClient *RepositoryClient) {
// Define the dynamic topic for this task
taskTopic := fmt.Sprintf("bzzz/meta/issue/%d", task.Number)
hi.pubsub.JoinDynamicTopic(taskTopic)
defer hi.pubsub.LeaveDynamicTopic(taskTopic)
fmt.Printf("🚀 Starting execution of task #%d in sandbox...\n", task.Number)
// The executor now handles the entire iterative process.
branchName, err := executor.ExecuteTask(hi.ctx, task, hi.hlog)
if err != nil {
fmt.Printf("❌ Failed to execute task #%d: %v\n", task.Number, err)
hi.hlog.Append(logging.TaskFailed, map[string]interface{}{"task_id": task.Number, "reason": "task execution failed in sandbox"})
return
}
// Create a pull request
pr, err := repoClient.Client.CreatePullRequest(task.Number, branchName, hi.config.AgentID)
if err != nil {
fmt.Printf("❌ Failed to create pull request for task #%d: %v\n", task.Number, err)
hi.hlog.Append(logging.TaskFailed, map[string]interface{}{"task_id": task.Number, "reason": "failed to create pull request"})
return
}
fmt.Printf("✅ Successfully created pull request for task #%d: %s\n", task.Number, pr.GetHTMLURL())
hi.hlog.Append(logging.TaskCompleted, map[string]interface{}{
"task_id": task.Number,
"pr_url": pr.GetHTMLURL(),
"pr_number": pr.GetNumber(),
})
// Report completion to Hive
if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, task.ProjectID, task.Number, "completed", map[string]interface{}{
"pull_request_url": pr.GetHTMLURL(),
}); err != nil {
fmt.Printf("⚠️ Failed to report task completion to Hive: %v\n", err)
}
}
// requestAssistance publishes a help request to the task-specific topic.
func (hi *HiveIntegration) requestAssistance(task *types.EnhancedTask, reason, topic string) {
fmt.Printf("🆘 Agent %s is requesting assistance for task #%d: %s\n", hi.config.AgentID, task.Number, reason)
hi.hlog.Append(logging.TaskHelpRequested, map[string]interface{}{
"task_id": task.Number,
"reason": reason,
})
helpRequest := map[string]interface{}{
"issue_id": task.Number,
"repository": fmt.Sprintf("%s/%s", task.Repository.Owner, task.Repository.Repository),
"reason": reason,
}
hi.pubsub.PublishToDynamicTopic(topic, pubsub.TaskHelpRequest, helpRequest)
}
// handleMetaDiscussion handles all incoming messages from dynamic and static topics.
func (hi *HiveIntegration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) {
switch msg.Type {
case pubsub.TaskHelpRequest:
hi.handleHelpRequest(msg, from)
case pubsub.TaskHelpResponse:
hi.handleHelpResponse(msg, from)
default:
// Handle other meta-discussion messages (e.g., peer feedback)
}
}
// handleHelpRequest is called when another agent requests assistance.
func (hi *HiveIntegration) handleHelpRequest(msg pubsub.Message, from peer.ID) {
issueID, _ := msg.Data["issue_id"].(float64)
reason, _ := msg.Data["reason"].(string)
fmt.Printf("🙋 Received help request for task #%d from %s: %s\n", int(issueID), from.ShortString(), reason)
// Simple logic: if we are not busy, we can help.
// A more advanced agent would check its capabilities against the reason.
canHelp := true // Placeholder for more complex logic
if canHelp {
fmt.Printf("✅ Agent %s can help with task #%d\n", hi.config.AgentID, int(issueID))
hi.hlog.Append(logging.TaskHelpOffered, map[string]interface{}{
"task_id": int(issueID),
"requester_id": from.ShortString(),
})
response := map[string]interface{}{
"issue_id": issueID,
"can_help": true,
"capabilities": hi.config.Capabilities,
}
taskTopic := fmt.Sprintf("bzzz/meta/issue/%d", int(issueID))
hi.pubsub.PublishToDynamicTopic(taskTopic, pubsub.TaskHelpResponse, response)
}
}
// handleHelpResponse is called when an agent receives an offer for help.
func (hi *HiveIntegration) handleHelpResponse(msg pubsub.Message, from peer.ID) {
issueID, _ := msg.Data["issue_id"].(float64)
canHelp, _ := msg.Data["can_help"].(bool)
if canHelp {
fmt.Printf("🤝 Received help offer for task #%d from %s\n", int(issueID), from.ShortString())
hi.hlog.Append(logging.TaskHelpReceived, map[string]interface{}{
"task_id": int(issueID),
"helper_id": from.ShortString(),
})
// In a full implementation, the agent would now delegate a sub-task
// or use the helper's capabilities. For now, we just log it.
}
}
// shouldEscalate determines if a task needs human intervention
func (hi *HiveIntegration) shouldEscalate(response string, history []string) bool {
// Check for escalation keywords
lowerResponse := strings.ToLower(response)
keywords := []string{"stuck", "help", "human", "escalate", "clarification needed", "manual intervention"}
for _, keyword := range keywords {
if strings.Contains(lowerResponse, keyword) {
return true
}
}
// Check conversation length
if len(history) >= 10 {
return true
}
return false
}
// triggerHumanEscalation sends escalation to Hive and N8N
func (hi *HiveIntegration) triggerHumanEscalation(projectID int, convo *Conversation, reason string) {
hi.hlog.Append(logging.Escalation, map[string]interface{}{
"task_id": convo.TaskID,
"reason": reason,
})
// Report to Hive system
if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, projectID, convo.TaskID, "escalated", map[string]interface{}{
"escalation_reason": reason,
"conversation_length": len(convo.History),
"escalated_by": hi.config.AgentID,
}); err != nil {
fmt.Printf("⚠️ Failed to report escalation to Hive: %v\n", err)
}
fmt.Printf("✅ Task #%d in project %d escalated for human intervention\n", convo.TaskID, projectID)
}

View File

@@ -1,60 +1,48 @@
package github
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/anthonyrawlins/bzzz/executor"
"github.com/anthonyrawlins/bzzz/logging"
"github.com/anthonyrawlins/bzzz/pkg/hive"
"github.com/anthonyrawlins/bzzz/pkg/types"
"github.com/anthonyrawlins/bzzz/pubsub"
"github.com/anthonyrawlins/bzzz/reasoning"
"github.com/libp2p/go-libp2p/core/peer"
)
const (
// humanEscalationWebhookURL is the N8N webhook for escalating tasks.
humanEscalationWebhookURL = "https://n8n.home.deepblack.cloud/webhook-test/human-escalation"
// conversationHistoryLimit is the number of messages before auto-escalation.
conversationHistoryLimit = 10
)
var escalationKeywords = []string{"stuck", "help", "human", "escalate", "clarification needed", "manual intervention"}
// Conversation represents the history of a discussion for a task.
type Conversation struct {
TaskID int
TaskTitle string
TaskDescription string
History []string
LastUpdated time.Time
IsEscalated bool
}
// Integration handles the integration between GitHub tasks and Bzzz P2P coordination
// Integration handles dynamic repository discovery via Hive API
type Integration struct {
client *Client
hiveClient *hive.HiveClient
githubToken string
pubsub *pubsub.PubSub
hlog *logging.HypercoreLog
ctx context.Context
config *IntegrationConfig
agentConfig *config.AgentConfig
activeDiscussions map[int]*Conversation
// Repository management
repositories map[int]*RepositoryClient // projectID -> GitHub client
repositoryLock sync.RWMutex
// Conversation tracking
activeDiscussions map[string]*Conversation // "projectID:taskID" -> conversation
discussionLock sync.RWMutex
}
// IntegrationConfig holds configuration for GitHub-Bzzz integration
type IntegrationConfig struct {
PollInterval time.Duration
MaxTasks int
AgentID string
Capabilities []string
// RepositoryClient wraps a GitHub client for a specific repository
type RepositoryClient struct {
Client *Client
Repository hive.Repository
LastSync time.Time
}
// NewIntegration creates a new GitHub-Bzzz integration
func NewIntegration(ctx context.Context, client *Client, ps *pubsub.PubSub, config *IntegrationConfig) *Integration {
// NewIntegration creates a new Hive-based GitHub integration
func NewIntegration(ctx context.Context, hiveClient *hive.HiveClient, githubToken string, ps *pubsub.PubSub, hlog *logging.HypercoreLog, config *IntegrationConfig, agentConfig *config.AgentConfig) *Integration {
if config.PollInterval == 0 {
config.PollInterval = 30 * time.Second
}
@@ -63,234 +51,215 @@ func NewIntegration(ctx context.Context, client *Client, ps *pubsub.PubSub, conf
}
return &Integration{
client: client,
hiveClient: hiveClient,
githubToken: githubToken,
pubsub: ps,
hlog: hlog,
ctx: ctx,
config: config,
activeDiscussions: make(map[int]*Conversation),
agentConfig: agentConfig,
repositories: make(map[int]*RepositoryClient),
activeDiscussions: make(map[string]*Conversation),
}
}
// Start begins the GitHub-Bzzz integration
func (i *Integration) Start() {
fmt.Printf("🔗 Starting GitHub-Bzzz integration for agent: %s\n", i.config.AgentID)
i.pubsub.SetAntennaeMessageHandler(i.handleMetaDiscussion)
go i.pollForTasks()
// Start begins the Hive-GitHub integration
func (hi *Integration) Start() {
fmt.Printf("🔗 Starting Hive-GitHub integration for agent: %s\n", hi.config.AgentID)
// Register the handler for incoming meta-discussion messages
hi.pubsub.SetAntennaeMessageHandler(hi.handleMetaDiscussion)
// Start repository discovery and task polling
go hi.repositoryDiscoveryLoop()
go hi.taskPollingLoop()
}
// pollForTasks periodically checks GitHub for available tasks
func (i *Integration) pollForTasks() {
ticker := time.NewTicker(i.config.PollInterval)
// repositoryDiscoveryLoop periodically discovers active repositories from Hive
func (hi *Integration) repositoryDiscoveryLoop() {
ticker := time.NewTicker(5 * time.Minute) // Check for new repositories every 5 minutes
defer ticker.Stop()
// Initial discovery
hi.syncRepositories()
for {
select {
case <-hi.ctx.Done():
return
case <-ticker.C:
hi.syncRepositories()
}
}
}
// syncRepositories synchronizes the list of active repositories from Hive
func (hi *Integration) syncRepositories() {
repositories, err := hi.hiveClient.GetActiveRepositories(hi.ctx)
if err != nil {
fmt.Printf("❌ Failed to get active repositories: %v\n", err)
return
}
hi.repositoryLock.Lock()
defer hi.repositoryLock.Unlock()
// Track which repositories we've seen
currentRepos := make(map[int]bool)
for _, repo := range repositories {
currentRepos[repo.ProjectID] = true
// Check if we already have a client for this repository
if _, exists := hi.repositories[repo.ProjectID]; !exists {
// Create new GitHub client for this repository
githubConfig := &Config{
AccessToken: hi.githubToken,
Owner: repo.Owner,
Repository: repo.Repository,
BaseBranch: repo.Branch,
}
client, err := NewClient(hi.ctx, githubConfig)
if err != nil {
fmt.Printf("❌ Failed to create GitHub client for %s/%s: %v\n", repo.Owner, repo.Repository, err)
continue
}
hi.repositories[repo.ProjectID] = &RepositoryClient{
Client: client,
Repository: repo,
LastSync: time.Now(),
}
fmt.Printf("✅ Added repository: %s/%s (Project ID: %d)\n", repo.Owner, repo.Repository, repo.ProjectID)
}
}
// Remove repositories that are no longer active
for projectID := range hi.repositories {
if !currentRepos[projectID] {
delete(hi.repositories, projectID)
fmt.Printf("🗑️ Removed inactive repository (Project ID: %d)\n", projectID)
}
}
fmt.Printf("📊 Repository sync complete: %d active repositories\n", len(hi.repositories))
}
// taskPollingLoop periodically polls all repositories for available tasks
func (hi *Integration) taskPollingLoop() {
ticker := time.NewTicker(hi.config.PollInterval)
defer ticker.Stop()
for {
select {
case <-i.ctx.Done():
case <-hi.ctx.Done():
return
case <-ticker.C:
if err := i.checkAndClaimTasks(); err != nil {
fmt.Printf("❌ Error checking tasks: %v\n", err)
}
hi.pollAllRepositories()
}
}
}
// checkAndClaimTasks looks for available tasks and claims suitable ones
func (i *Integration) checkAndClaimTasks() error {
tasks, err := i.client.ListAvailableTasks()
// pollAllRepositories checks all active repositories for available tasks
func (hi *Integration) pollAllRepositories() {
hi.repositoryLock.RLock()
repositories := make([]*RepositoryClient, 0, len(hi.repositories))
for _, repo := range hi.repositories {
repositories = append(repositories, repo)
}
hi.repositoryLock.RUnlock()
if len(repositories) == 0 {
return
}
fmt.Printf("🔍 Polling %d repositories for available tasks...\n", len(repositories))
var allTasks []*types.EnhancedTask
// Collect tasks from all repositories
for _, repoClient := range repositories {
tasks, err := hi.getRepositoryTasks(repoClient)
if err != nil {
return fmt.Errorf("failed to list tasks: %w", err)
fmt.Printf("❌ Failed to get tasks from %s/%s: %v\n",
repoClient.Repository.Owner, repoClient.Repository.Repository, err)
continue
}
if len(tasks) == 0 {
return nil
allTasks = append(allTasks, tasks...)
}
suitableTasks := i.filterSuitableTasks(tasks)
if len(allTasks) == 0 {
return
}
fmt.Printf("📋 Found %d total available tasks across all repositories\n", len(allTasks))
// Apply filtering and selection
suitableTasks := hi.filterSuitableTasks(allTasks)
if len(suitableTasks) == 0 {
return nil
fmt.Printf("⚠️ No suitable tasks for agent capabilities: %v\n", hi.config.Capabilities)
return
}
// Select and claim the highest priority task
task := suitableTasks[0]
claimedTask, err := i.client.ClaimTask(task.Number, i.config.AgentID)
hi.claimAndExecuteTask(task)
}
// getRepositoryTasks fetches available tasks from a specific repository
func (hi *Integration) getRepositoryTasks(repoClient *RepositoryClient) ([]*types.EnhancedTask, error) {
// Get tasks from GitHub
githubTasks, err := repoClient.Client.ListAvailableTasks()
if err != nil {
return fmt.Errorf("failed to claim task %d: %w", task.Number, err)
}
fmt.Printf("✋ Claimed task #%d: %s\n", claimedTask.Number, claimedTask.Title)
go i.executeTask(claimedTask)
return nil
return nil, err
}
// executeTask starts the task by generating and proposing a plan.
func (i *Integration) executeTask(task *Task) {
fmt.Printf("🚀 Starting execution of task #%d: %s\n", task.Number, task.Title)
prompt := fmt.Sprintf("You are an expert AI developer. Based on the following GitHub issue, create a concise, step-by-step plan to resolve it. Issue Title: %s. Issue Body: %s.", task.Title, task.Description)
plan, err := reasoning.GenerateResponse(i.ctx, "phi3", prompt)
if err != nil {
fmt.Printf("❌ Failed to generate execution plan for task #%d: %v\n", task.Number, err)
return
}
fmt.Printf("📝 Generated Plan for task #%d:\n%s\n", task.Number, plan)
i.discussionLock.Lock()
i.activeDiscussions[task.Number] = &Conversation{
TaskID: task.Number,
TaskTitle: task.Title,
TaskDescription: task.Description,
History: []string{fmt.Sprintf("Plan by %s: %s", i.config.AgentID, plan)},
LastUpdated: time.Now(),
}
i.discussionLock.Unlock()
metaMsg := map[string]interface{}{
"issue_id": task.Number,
"message": "Here is my proposed plan of action. What are your thoughts?",
"plan": plan,
}
if err := i.pubsub.PublishAntennaeMessage(pubsub.MetaDiscussion, metaMsg); err != nil {
fmt.Printf("⚠️ Failed to publish plan to meta-discussion channel: %v\n", err)
// Convert to enhanced tasks with project context
var enhancedTasks []*types.EnhancedTask
for _, task := range githubTasks {
enhancedTask := &types.EnhancedTask{
ID: task.ID,
Number: task.Number,
Title: task.Title,
Description: task.Description,
State: task.State,
Labels: task.Labels,
Assignee: task.Assignee,
CreatedAt: task.CreatedAt,
UpdatedAt: task.UpdatedAt,
TaskType: task.TaskType,
Priority: task.Priority,
Requirements: task.Requirements,
Deliverables: task.Deliverables,
Context: task.Context,
ProjectID: repoClient.Repository.ProjectID,
GitURL: repoClient.Repository.GitURL,
Repository: repoClient.Repository,
}
enhancedTasks = append(enhancedTasks, enhancedTask)
}
// handleMetaDiscussion is the core handler for incoming Antennae messages.
func (i *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) {
issueID, ok := msg.Data["issue_id"].(float64)
if !ok {
return
}
taskID := int(issueID)
i.discussionLock.Lock()
convo, exists := i.activeDiscussions[taskID]
if !exists || convo.IsEscalated {
i.discussionLock.Unlock()
return
return enhancedTasks, nil
}
incomingMessage, _ := msg.Data["message"].(string)
convo.History = append(convo.History, fmt.Sprintf("Response from %s: %s", from.ShortString(), incomingMessage))
convo.LastUpdated = time.Now()
i.discussionLock.Unlock()
fmt.Printf("🎯 Received peer feedback for task #%d. Reasoning about a response...\n", taskID)
historyStr := strings.Join(convo.History, "\n")
prompt := fmt.Sprintf(
"You are an AI developer collaborating on a task. "+
"This is the original task: Title: %s, Body: %s. "+
"This is the conversation so far:\n%s\n\n"+
"Based on the last message, provide a concise and helpful response.",
convo.TaskTitle, convo.TaskDescription, historyStr,
)
response, err := reasoning.GenerateResponse(i.ctx, "phi3", prompt)
if err != nil {
fmt.Printf("❌ Failed to generate response for task #%d: %v\n", taskID, err)
return
}
// Check if the situation requires human intervention
if i.shouldEscalate(response, convo.History) {
fmt.Printf("🚨 Escalating task #%d for human review.\n", taskID)
convo.IsEscalated = true
go i.triggerHumanEscalation(convo, response)
return
}
fmt.Printf("💬 Sending response for task #%d...\n", taskID)
responseMsg := map[string]interface{}{
"issue_id": taskID,
"message": response,
}
if err := i.pubsub.PublishAntennaeMessage(pubsub.MetaDiscussion, responseMsg); err != nil {
fmt.Printf("⚠️ Failed to publish response for task #%d: %v\n", taskID, err)
}
}
// shouldEscalate determines if a task needs human intervention.
func (i *Integration) shouldEscalate(response string, history []string) bool {
// Rule 1: Check for keywords in the latest response
lowerResponse := strings.ToLower(response)
for _, keyword := range escalationKeywords {
if strings.Contains(lowerResponse, keyword) {
return true
}
}
// Rule 2: Check if the conversation is too long
if len(history) >= conversationHistoryLimit {
return true
}
return false
}
// triggerHumanEscalation sends the conversation details to the N8N webhook.
func (i *Integration) triggerHumanEscalation(convo *Conversation, reason string) {
// 1. Announce the escalation to other agents
escalationMsg := map[string]interface{}{
"issue_id": convo.TaskID,
"message": "This task has been escalated for human review. No further automated action will be taken.",
"reason": reason,
}
if err := i.pubsub.PublishAntennaeMessage(pubsub.MetaDiscussion, escalationMsg); err != nil {
fmt.Printf("⚠️ Failed to publish escalation message for task #%d: %v\n", convo.TaskID, err)
}
// 2. Send the payload to the N8N webhook
payload := map[string]interface{}{
"task_id": convo.TaskID,
"task_title": convo.TaskTitle,
"escalation_agent": i.config.AgentID,
"reason": reason,
"history": strings.Join(convo.History, "\n"),
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
fmt.Printf("❌ Failed to marshal escalation payload for task #%d: %v\n", convo.TaskID, err)
return
}
req, err := http.NewRequestWithContext(i.ctx, "POST", humanEscalationWebhookURL, bytes.NewBuffer(payloadBytes))
if err != nil {
fmt.Printf("❌ Failed to create escalation request for task #%d: %v\n", convo.TaskID, err)
return
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
fmt.Printf("❌ Failed to send escalation webhook for task #%d: %v\n", convo.TaskID, err)
return
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
fmt.Printf("⚠️ Human escalation webhook for task #%d returned non-2xx status: %d\n", convo.TaskID, resp.StatusCode)
} else {
fmt.Printf("✅ Successfully escalated task #%d to human administrator.\n", convo.TaskID)
}
}
// filterSuitableTasks filters tasks based on agent capabilities and task labels
func (i *Integration) filterSuitableTasks(tasks []*Task) []*Task {
var suitable []*Task
// filterSuitableTasks filters tasks based on agent capabilities
func (hi *Integration) filterSuitableTasks(tasks []*types.EnhancedTask) []*types.EnhancedTask {
var suitable []*types.EnhancedTask
for _, task := range tasks {
// Check if we can handle this task based on its labels or title keywords
if i.canHandleTask(task) {
if hi.canHandleTaskType(task.TaskType) {
suitable = append(suitable, task)
}
}
fmt.Printf("🔍 Filtered %d suitable tasks from %d total tasks\n", len(suitable), len(tasks))
return suitable
}
// canHandleTaskType checks if this agent can handle the given task type
func (i *Integration) canHandleTaskType(taskType string) bool {
for _, capability := range i.config.Capabilities {
func (hi *Integration) canHandleTaskType(taskType string) bool {
for _, capability := range hi.config.Capabilities {
if capability == taskType || capability == "general" || capability == "task-coordination" {
return true
}
@@ -298,67 +267,206 @@ func (i *Integration) canHandleTaskType(taskType string) bool {
return false
}
// canHandleTask determines if this agent can handle a specific task
func (i *Integration) canHandleTask(task *Task) bool {
// Check task labels for capability matches
for _, label := range task.Labels {
if i.canHandleTaskType(label) {
// claimAndExecuteTask claims a task and begins execution
func (hi *Integration) claimAndExecuteTask(task *types.EnhancedTask) {
hi.repositoryLock.RLock()
repoClient, exists := hi.repositories[task.ProjectID]
hi.repositoryLock.RUnlock()
if !exists {
fmt.Printf("❌ Repository client not found for project %d\n", task.ProjectID)
return
}
// Claim the task in GitHub
_, err := repoClient.Client.ClaimTask(task.Number, hi.config.AgentID)
if err != nil {
fmt.Printf("❌ Failed to claim task %d in %s/%s: %v\n",
task.Number, task.Repository.Owner, task.Repository.Repository, err)
return
}
fmt.Printf("✋ Claimed task #%d from %s/%s: %s\n",
task.Number, task.Repository.Owner, task.Repository.Repository, task.Title)
// Log the claim
hi.hlog.Append(logging.TaskClaimed, map[string]interface{}{
"task_id": task.Number,
"repository": fmt.Sprintf("%s/%s", task.Repository.Owner, task.Repository.Repository),
"title": task.Title,
})
// Report claim to Hive
if err := hi.hiveClient.ClaimTask(hi.ctx, task.ProjectID, task.Number, hi.config.AgentID); err != nil {
fmt.Printf("⚠️ Failed to report task claim to Hive: %v\n", err)
}
// Start task execution
go hi.executeTask(task, repoClient)
}
// executeTask executes a claimed task with reasoning and coordination
func (hi *Integration) executeTask(task *types.EnhancedTask, repoClient *RepositoryClient) {
// Define the dynamic topic for this task
taskTopic := fmt.Sprintf("bzzz/meta/issue/%d", task.Number)
hi.pubsub.JoinDynamicTopic(taskTopic)
defer hi.pubsub.LeaveDynamicTopic(taskTopic)
fmt.Printf("🚀 Starting execution of task #%d in sandbox...\n", task.Number)
// The executor now handles the entire iterative process.
result, err := executor.ExecuteTask(hi.ctx, task, hi.hlog, hi.agentConfig)
if err != nil {
fmt.Printf("❌ Failed to execute task #%d: %v\n", task.Number, err)
hi.hlog.Append(logging.TaskFailed, map[string]interface{}{"task_id": task.Number, "reason": "task execution failed in sandbox"})
return
}
// Ensure sandbox cleanup happens regardless of PR creation success/failure
defer result.Sandbox.DestroySandbox()
// Create a pull request
pr, err := repoClient.Client.CreatePullRequest(task.Number, result.BranchName, hi.config.AgentID)
if err != nil {
fmt.Printf("❌ Failed to create pull request for task #%d: %v\n", task.Number, err)
fmt.Printf("📝 Note: Branch '%s' has been pushed to repository and work is preserved\n", result.BranchName)
// Escalate PR creation failure to humans via N8N webhook
escalationReason := fmt.Sprintf("Failed to create pull request: %v. Task execution completed successfully and work is preserved in branch '%s', but PR creation failed.", err, result.BranchName)
hi.requestAssistance(task, escalationReason, fmt.Sprintf("bzzz/meta/issue/%d", task.Number))
hi.hlog.Append(logging.TaskFailed, map[string]interface{}{
"task_id": task.Number,
"reason": "failed to create pull request",
"branch_name": result.BranchName,
"work_preserved": true,
"escalated": true,
})
return
}
fmt.Printf("✅ Successfully created pull request for task #%d: %s\n", task.Number, pr.GetHTMLURL())
hi.hlog.Append(logging.TaskCompleted, map[string]interface{}{
"task_id": task.Number,
"pr_url": pr.GetHTMLURL(),
"pr_number": pr.GetNumber(),
})
// Report completion to Hive
if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, task.ProjectID, task.Number, "completed", map[string]interface{}{
"pull_request_url": pr.GetHTMLURL(),
}); err != nil {
fmt.Printf("⚠️ Failed to report task completion to Hive: %v\n", err)
}
}
// requestAssistance publishes a help request to the task-specific topic.
func (hi *Integration) requestAssistance(task *types.EnhancedTask, reason, topic string) {
fmt.Printf("🆘 Agent %s is requesting assistance for task #%d: %s\n", hi.config.AgentID, task.Number, reason)
hi.hlog.Append(logging.TaskHelpRequested, map[string]interface{}{
"task_id": task.Number,
"reason": reason,
})
helpRequest := map[string]interface{}{
"issue_id": task.Number,
"repository": fmt.Sprintf("%s/%s", task.Repository.Owner, task.Repository.Repository),
"reason": reason,
}
hi.pubsub.PublishToDynamicTopic(topic, pubsub.TaskHelpRequest, helpRequest)
}
// handleMetaDiscussion handles all incoming messages from dynamic and static topics.
func (hi *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) {
switch msg.Type {
case pubsub.TaskHelpRequest:
hi.handleHelpRequest(msg, from)
case pubsub.TaskHelpResponse:
hi.handleHelpResponse(msg, from)
default:
// Handle other meta-discussion messages (e.g., peer feedback)
}
}
// handleHelpRequest is called when another agent requests assistance.
func (hi *Integration) handleHelpRequest(msg pubsub.Message, from peer.ID) {
issueID, _ := msg.Data["issue_id"].(float64)
reason, _ := msg.Data["reason"].(string)
fmt.Printf("🙋 Received help request for task #%d from %s: %s\n", int(issueID), from.ShortString(), reason)
// Simple logic: if we are not busy, we can help.
// TODO: A more advanced agent would check its capabilities against the reason.
canHelp := true // Placeholder for more complex logic
if canHelp {
fmt.Printf("✅ Agent %s can help with task #%d\n", hi.config.AgentID, int(issueID))
hi.hlog.Append(logging.TaskHelpOffered, map[string]interface{}{
"task_id": int(issueID),
"requester_id": from.ShortString(),
})
response := map[string]interface{}{
"issue_id": issueID,
"can_help": true,
"capabilities": hi.config.Capabilities,
}
taskTopic := fmt.Sprintf("bzzz/meta/issue/%d", int(issueID))
hi.pubsub.PublishToDynamicTopic(taskTopic, pubsub.TaskHelpResponse, response)
}
}
// handleHelpResponse is called when an agent receives an offer for help.
func (hi *Integration) handleHelpResponse(msg pubsub.Message, from peer.ID) {
issueID, _ := msg.Data["issue_id"].(float64)
canHelp, _ := msg.Data["can_help"].(bool)
if canHelp {
fmt.Printf("🤝 Received help offer for task #%d from %s\n", int(issueID), from.ShortString())
hi.hlog.Append(logging.TaskHelpReceived, map[string]interface{}{
"task_id": int(issueID),
"helper_id": from.ShortString(),
})
// In a full implementation, the agent would now delegate a sub-task
// or use the helper's capabilities. For now, we just log it.
}
}
// shouldEscalate determines if a task needs human intervention
func (hi *Integration) shouldEscalate(response string, history []string) bool {
// Check for escalation keywords
lowerResponse := strings.ToLower(response)
keywords := []string{"stuck", "help", "human", "escalate", "clarification needed", "manual intervention"}
for _, keyword := range keywords {
if strings.Contains(lowerResponse, keyword) {
return true
}
}
// Check title/description for keyword matches based on capabilities
taskText := strings.ToLower(task.Title + " " + task.Description)
for _, capability := range i.config.Capabilities {
switch capability {
case "code-generation", "coding":
if strings.Contains(taskText, "code") || strings.Contains(taskText, "implement") ||
strings.Contains(taskText, "develop") || strings.Contains(taskText, "write") {
// Check conversation length
if len(history) >= 10 {
return true
}
case "code-analysis", "review":
if strings.Contains(taskText, "review") || strings.Contains(taskText, "analyze") ||
strings.Contains(taskText, "audit") || strings.Contains(taskText, "refactor") {
return true
}
case "debugging", "bug-fix":
if strings.Contains(taskText, "bug") || strings.Contains(taskText, "fix") ||
strings.Contains(taskText, "error") || strings.Contains(taskText, "debug") {
return true
}
case "testing":
if strings.Contains(taskText, "test") || strings.Contains(taskText, "spec") ||
strings.Contains(taskText, "validation") {
return true
}
case "documentation":
if strings.Contains(taskText, "doc") || strings.Contains(taskText, "readme") ||
strings.Contains(taskText, "guide") || strings.Contains(taskText, "manual") {
return true
}
case "general", "task-coordination", "meta-discussion":
// These capabilities can handle any task
return true
}
}
// If no specific match, check if we have general capabilities
return i.canHandleTaskType("general")
return false
}
// announceTaskClaim broadcasts task claim to P2P mesh for coordination
func (i *Integration) announceTaskClaim(task *Task) error {
claimData := map[string]interface{}{
"task_id": task.ID,
"task_number": task.Number,
"task_title": task.Title,
"agent_id": i.config.AgentID,
"timestamp": time.Now().Unix(),
"repository": fmt.Sprintf("%s/%s", task.Labels[0], task.Labels[1]), // Assuming owner/repo in labels
"action": "claimed",
// triggerHumanEscalation sends escalation to Hive and N8N
func (hi *Integration) triggerHumanEscalation(projectID int, convo *Conversation, reason string) {
hi.hlog.Append(logging.Escalation, map[string]interface{}{
"task_id": convo.TaskID,
"reason": reason,
})
// Report to Hive system
if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, projectID, convo.TaskID, "escalated", map[string]interface{}{
"escalation_reason": reason,
"conversation_length": len(convo.History),
"escalated_by": hi.config.AgentID,
}); err != nil {
fmt.Printf("⚠️ Failed to report escalation to Hive: %v\n", err)
}
fmt.Printf("📢 Announcing task claim to P2P mesh: Task #%d\n", task.Number)
return i.pubsub.PublishBzzzMessage(pubsub.TaskClaim, claimData)
fmt.Printf("✅ Task #%d in project %d escalated for human intervention\n", convo.TaskID, projectID)
}

1
go.mod
View File

@@ -41,6 +41,7 @@ require (
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
github.com/huin/goupnp v1.3.0 // indirect

2
go.sum
View File

@@ -219,6 +219,8 @@ github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE0
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=

View File

@@ -141,7 +141,7 @@ func main() {
}
// Initialize dynamic GitHub integration
var ghIntegration *github.HiveIntegration
var ghIntegration *github.Integration
if githubToken != "" {
// Use agent ID from config (auto-generated from node ID)
agentID := cfg.Agent.ID
@@ -156,7 +156,7 @@ func main() {
MaxTasks: cfg.Agent.MaxTasks,
}
ghIntegration = github.NewHiveIntegration(ctx, hiveClient, githubToken, ps, hlog, integrationConfig)
ghIntegration = github.NewIntegration(ctx, hiveClient, githubToken, ps, hlog, integrationConfig, &cfg.Agent)
// Start the integration service
ghIntegration.Start()
@@ -339,7 +339,7 @@ func announceCapabilitiesOnChange(ps *pubsub.PubSub, nodeID string, cfg *config.
cfg.Agent.Models = validModels
// Configure reasoning module with available models and webhook
reasoning.SetModelConfig(validModels, cfg.Agent.ModelSelectionWebhook)
reasoning.SetModelConfig(validModels, cfg.Agent.ModelSelectionWebhook, cfg.Agent.DefaultReasoningModel)
}
// Get current capabilities

View File

@@ -1,702 +0,0 @@
#!/usr/bin/env python3
"""
Mock Hive API Server for Bzzz Testing
This simulates what the real Hive API would provide to bzzz agents:
- Active repositories with bzzz-enabled tasks
- Fake GitHub issues with bzzz-task labels
- Task dependencies and coordination scenarios
The real bzzz agents will consume this fake data and do actual coordination.
"""
import json
import random
import time
from datetime import datetime, timedelta
from flask import Flask, jsonify, request
from threading import Thread
app = Flask(__name__)
# Mock data for repositories and tasks
MOCK_REPOSITORIES = [
{
"project_id": 1,
"name": "hive-coordination-platform",
"git_url": "https://github.com/mock/hive",
"owner": "mock-org",
"repository": "hive",
"branch": "main",
"bzzz_enabled": True,
"ready_to_claim": True,
"private_repo": False,
"github_token_required": False
},
{
"project_id": 2,
"name": "bzzz-p2p-system",
"git_url": "https://github.com/mock/bzzz",
"owner": "mock-org",
"repository": "bzzz",
"branch": "main",
"bzzz_enabled": True,
"ready_to_claim": True,
"private_repo": False,
"github_token_required": False
},
{
"project_id": 3,
"name": "distributed-ai-development",
"git_url": "https://github.com/mock/distributed-ai-dev",
"owner": "mock-org",
"repository": "distributed-ai-dev",
"branch": "main",
"bzzz_enabled": True,
"ready_to_claim": True,
"private_repo": False,
"github_token_required": False
},
{
"project_id": 4,
"name": "infrastructure-automation",
"git_url": "https://github.com/mock/infra-automation",
"owner": "mock-org",
"repository": "infra-automation",
"branch": "main",
"bzzz_enabled": True,
"ready_to_claim": True,
"private_repo": False,
"github_token_required": False
}
]
# Mock tasks with realistic coordination scenarios
MOCK_TASKS = {
1: [ # hive tasks
{
"number": 15,
"title": "Add WebSocket support for real-time coordination",
"description": "Implement WebSocket endpoints for real-time agent coordination messages",
"state": "open",
"labels": ["bzzz-task", "feature", "realtime", "coordination"],
"created_at": "2025-01-14T10:00:00Z",
"updated_at": "2025-01-14T10:30:00Z",
"html_url": "https://github.com/mock/hive/issues/15",
"is_claimed": False,
"assignees": [],
"task_type": "feature",
"dependencies": [
{
"repository": "bzzz",
"task_number": 23,
"dependency_type": "api_contract"
}
]
},
{
"number": 16,
"title": "Implement agent authentication system",
"description": "Add secure JWT-based authentication for bzzz agents accessing Hive APIs",
"state": "open",
"labels": ["bzzz-task", "security", "auth", "high-priority"],
"created_at": "2025-01-14T09:30:00Z",
"updated_at": "2025-01-14T10:45:00Z",
"html_url": "https://github.com/mock/hive/issues/16",
"is_claimed": False,
"assignees": [],
"task_type": "security",
"dependencies": []
},
{
"number": 17,
"title": "Create coordination metrics dashboard",
"description": "Build dashboard showing cross-repository coordination statistics",
"state": "open",
"labels": ["bzzz-task", "dashboard", "metrics", "ui"],
"created_at": "2025-01-14T11:00:00Z",
"updated_at": "2025-01-14T11:15:00Z",
"html_url": "https://github.com/mock/hive/issues/17",
"is_claimed": False,
"assignees": [],
"task_type": "feature",
"dependencies": [
{
"repository": "bzzz",
"task_number": 24,
"dependency_type": "api_contract"
}
]
}
],
2: [ # bzzz tasks
{
"number": 23,
"title": "Define coordination API contract",
"description": "Standardize API contract for cross-repository coordination messaging",
"state": "open",
"labels": ["bzzz-task", "api", "coordination", "blocker"],
"created_at": "2025-01-14T09:00:00Z",
"updated_at": "2025-01-14T10:00:00Z",
"html_url": "https://github.com/mock/bzzz/issues/23",
"is_claimed": False,
"assignees": [],
"task_type": "api_design",
"dependencies": []
},
{
"number": 24,
"title": "Implement dependency detection algorithm",
"description": "Auto-detect task dependencies across repositories using graph analysis",
"state": "open",
"labels": ["bzzz-task", "algorithm", "coordination", "complex"],
"created_at": "2025-01-14T10:15:00Z",
"updated_at": "2025-01-14T10:30:00Z",
"html_url": "https://github.com/mock/bzzz/issues/24",
"is_claimed": False,
"assignees": [],
"task_type": "feature",
"dependencies": [
{
"repository": "bzzz",
"task_number": 23,
"dependency_type": "api_contract"
}
]
},
{
"number": 25,
"title": "Add consensus algorithm for coordination",
"description": "Implement distributed consensus for multi-agent task coordination",
"state": "open",
"labels": ["bzzz-task", "consensus", "distributed-systems", "hard"],
"created_at": "2025-01-14T11:30:00Z",
"updated_at": "2025-01-14T11:45:00Z",
"html_url": "https://github.com/mock/bzzz/issues/25",
"is_claimed": False,
"assignees": [],
"task_type": "feature",
"dependencies": []
}
],
3: [ # distributed-ai-dev tasks
{
"number": 8,
"title": "Add support for bzzz coordination",
"description": "Integrate with bzzz P2P coordination system for distributed AI development",
"state": "open",
"labels": ["bzzz-task", "integration", "p2p", "ai"],
"created_at": "2025-01-14T10:45:00Z",
"updated_at": "2025-01-14T11:00:00Z",
"html_url": "https://github.com/mock/distributed-ai-dev/issues/8",
"is_claimed": False,
"assignees": [],
"task_type": "integration",
"dependencies": [
{
"repository": "bzzz",
"task_number": 23,
"dependency_type": "api_contract"
},
{
"repository": "hive",
"task_number": 16,
"dependency_type": "security"
}
]
},
{
"number": 9,
"title": "Implement AI model coordination",
"description": "Enable coordination between AI models across different development environments",
"state": "open",
"labels": ["bzzz-task", "ai-coordination", "models", "complex"],
"created_at": "2025-01-14T11:15:00Z",
"updated_at": "2025-01-14T11:30:00Z",
"html_url": "https://github.com/mock/distributed-ai-dev/issues/9",
"is_claimed": False,
"assignees": [],
"task_type": "feature",
"dependencies": [
{
"repository": "distributed-ai-dev",
"task_number": 8,
"dependency_type": "integration"
}
]
}
],
4: [ # infra-automation tasks
{
"number": 12,
"title": "Automate bzzz deployment across cluster",
"description": "Create automated deployment scripts for bzzz agents on all cluster nodes",
"state": "open",
"labels": ["bzzz-task", "deployment", "automation", "devops"],
"created_at": "2025-01-14T12:00:00Z",
"updated_at": "2025-01-14T12:15:00Z",
"html_url": "https://github.com/mock/infra-automation/issues/12",
"is_claimed": False,
"assignees": [],
"task_type": "infrastructure",
"dependencies": [
{
"repository": "hive",
"task_number": 16,
"dependency_type": "security"
}
]
}
]
}
# Track claimed tasks
claimed_tasks = {}
@app.route('/health', methods=['GET'])
def health():
"""Health check endpoint"""
return jsonify({"status": "healthy", "service": "mock-hive-api", "timestamp": datetime.now().isoformat()})
@app.route('/api/bzzz/active-repos', methods=['GET'])
def get_active_repositories():
"""Return mock active repositories for bzzz consumption"""
print(f"[{datetime.now().strftime('%H:%M:%S')}] 📡 Bzzz requested active repositories")
# Randomly vary the number of available repos for more realistic testing
available_repos = random.sample(MOCK_REPOSITORIES, k=random.randint(2, len(MOCK_REPOSITORIES)))
return jsonify({"repositories": available_repos})
@app.route('/api/bzzz/projects/<int:project_id>/tasks', methods=['GET'])
def get_project_tasks(project_id):
"""Return mock bzzz-task labeled issues for a specific project"""
print(f"[{datetime.now().strftime('%H:%M:%S')}] 📋 Bzzz requested tasks for project {project_id}")
if project_id not in MOCK_TASKS:
return jsonify([])
# Return tasks, updating claim status
tasks = []
for task in MOCK_TASKS[project_id]:
task_copy = task.copy()
claim_key = f"{project_id}-{task['number']}"
# Check if task is claimed
if claim_key in claimed_tasks:
claim_info = claimed_tasks[claim_key]
# Tasks expire after 30 minutes if not updated
if datetime.now() - claim_info['claimed_at'] < timedelta(minutes=30):
task_copy['is_claimed'] = True
task_copy['assignees'] = [claim_info['agent_id']]
else:
# Claim expired
del claimed_tasks[claim_key]
task_copy['is_claimed'] = False
task_copy['assignees'] = []
tasks.append(task_copy)
return jsonify(tasks)
@app.route('/api/bzzz/projects/<int:project_id>/claim', methods=['POST'])
def claim_task(project_id):
"""Register task claim with mock Hive system"""
data = request.get_json()
task_number = data.get('task_number')
agent_id = data.get('agent_id')
print(f"[{datetime.now().strftime('%H:%M:%S')}] 🎯 Agent {agent_id} claiming task {project_id}#{task_number}")
if not task_number or not agent_id:
return jsonify({"error": "task_number and agent_id are required"}), 400
claim_key = f"{project_id}-{task_number}"
# Check if already claimed
if claim_key in claimed_tasks:
existing_claim = claimed_tasks[claim_key]
if datetime.now() - existing_claim['claimed_at'] < timedelta(minutes=30):
return jsonify({
"error": "Task already claimed",
"claimed_by": existing_claim['agent_id'],
"claimed_at": existing_claim['claimed_at'].isoformat()
}), 409
# Register the claim
claim_id = f"{project_id}-{task_number}-{agent_id}-{int(time.time())}"
claimed_tasks[claim_key] = {
"agent_id": agent_id,
"claimed_at": datetime.now(),
"claim_id": claim_id
}
print(f"[{datetime.now().strftime('%H:%M:%S')}] ✅ Task {project_id}#{task_number} claimed by {agent_id}")
return jsonify({"success": True, "claim_id": claim_id})
@app.route('/api/bzzz/projects/<int:project_id>/status', methods=['PUT'])
def update_task_status(project_id):
"""Update task status in mock Hive system"""
data = request.get_json()
task_number = data.get('task_number')
status = data.get('status')
metadata = data.get('metadata', {})
print(f"[{datetime.now().strftime('%H:%M:%S')}] 📊 Task {project_id}#{task_number} status: {status}")
if not task_number or not status:
return jsonify({"error": "task_number and status are required"}), 400
# Log status update
if status == "completed":
claim_key = f"{project_id}-{task_number}"
if claim_key in claimed_tasks:
agent_id = claimed_tasks[claim_key]['agent_id']
print(f"[{datetime.now().strftime('%H:%M:%S')}] 🎉 Task {project_id}#{task_number} completed by {agent_id}")
del claimed_tasks[claim_key] # Remove claim
elif status == "escalated":
print(f"[{datetime.now().strftime('%H:%M:%S')}] 🚨 Task {project_id}#{task_number} escalated: {metadata}")
return jsonify({"success": True})
@app.route('/api/bzzz/coordination-log', methods=['POST'])
def log_coordination_activity():
"""Log coordination activity for monitoring"""
data = request.get_json()
activity_type = data.get('type', 'unknown')
details = data.get('details', {})
print(f"[{datetime.now().strftime('%H:%M:%S')}] 🧠 Coordination: {activity_type} - {details}")
# Save coordination activity to file
save_coordination_work(activity_type, details)
return jsonify({"success": True, "logged": True})
@app.route('/api/bzzz/projects/<int:project_id>/submit-work', methods=['POST'])
def submit_work(project_id):
"""Endpoint for agents to submit their actual work/code/solutions"""
data = request.get_json()
task_number = data.get('task_number')
agent_id = data.get('agent_id')
work_type = data.get('work_type', 'code') # code, documentation, configuration, etc.
content = data.get('content', '')
files = data.get('files', {}) # Dictionary of filename -> content
commit_message = data.get('commit_message', '')
description = data.get('description', '')
print(f"[{datetime.now().strftime('%H:%M:%S')}] 📝 Work submission: {agent_id} -> Project {project_id} Task {task_number}")
print(f" Type: {work_type}, Files: {len(files)}, Content length: {len(content)}")
# Save the actual work content
work_data = {
"project_id": project_id,
"task_number": task_number,
"agent_id": agent_id,
"work_type": work_type,
"content": content,
"files": files,
"commit_message": commit_message,
"description": description,
"submitted_at": datetime.now().isoformat()
}
save_agent_work(work_data)
return jsonify({
"success": True,
"work_id": f"{project_id}-{task_number}-{int(time.time())}",
"message": "Work submitted successfully to mock repository"
})
@app.route('/api/bzzz/projects/<int:project_id>/create-pr', methods=['POST'])
def create_pull_request(project_id):
"""Endpoint for agents to submit pull request content"""
data = request.get_json()
task_number = data.get('task_number')
agent_id = data.get('agent_id')
pr_title = data.get('title', '')
pr_description = data.get('description', '')
files_changed = data.get('files_changed', {})
branch_name = data.get('branch_name', f"bzzz-task-{task_number}")
print(f"[{datetime.now().strftime('%H:%M:%S')}] 🔀 Pull Request: {agent_id} -> Project {project_id}")
print(f" Title: {pr_title}")
print(f" Files changed: {len(files_changed)}")
# Save the pull request content
pr_data = {
"project_id": project_id,
"task_number": task_number,
"agent_id": agent_id,
"title": pr_title,
"description": pr_description,
"files_changed": files_changed,
"branch_name": branch_name,
"created_at": datetime.now().isoformat(),
"status": "open"
}
save_pull_request(pr_data)
return jsonify({
"success": True,
"pr_number": random.randint(100, 999),
"pr_url": f"https://github.com/mock/{get_repo_name(project_id)}/pull/{random.randint(100, 999)}",
"message": "Pull request created successfully in mock repository"
})
@app.route('/api/bzzz/projects/<int:project_id>/coordination-discussion', methods=['POST'])
def log_coordination_discussion(project_id):
"""Endpoint for agents to log coordination discussions and decisions"""
data = request.get_json()
discussion_type = data.get('type', 'general') # dependency_analysis, conflict_resolution, etc.
participants = data.get('participants', [])
messages = data.get('messages', [])
decisions = data.get('decisions', [])
context = data.get('context', {})
print(f"[{datetime.now().strftime('%H:%M:%S')}] 💬 Coordination Discussion: Project {project_id}")
print(f" Type: {discussion_type}, Participants: {len(participants)}, Messages: {len(messages)}")
# Save coordination discussion
discussion_data = {
"project_id": project_id,
"type": discussion_type,
"participants": participants,
"messages": messages,
"decisions": decisions,
"context": context,
"timestamp": datetime.now().isoformat()
}
save_coordination_discussion(discussion_data)
return jsonify({"success": True, "logged": True})
@app.route('/api/bzzz/projects/<int:project_id>/log-prompt', methods=['POST'])
def log_agent_prompt(project_id):
"""Endpoint for agents to log the prompts they are receiving/generating"""
data = request.get_json()
task_number = data.get('task_number')
agent_id = data.get('agent_id')
prompt_type = data.get('prompt_type', 'task_analysis') # task_analysis, coordination, meta_thinking
prompt_content = data.get('prompt_content', '')
context = data.get('context', {})
model_used = data.get('model_used', 'unknown')
print(f"[{datetime.now().strftime('%H:%M:%S')}] 🧠 Prompt Log: {agent_id} -> {prompt_type}")
print(f" Model: {model_used}, Task: {project_id}#{task_number}")
print(f" Prompt length: {len(prompt_content)} chars")
# Save the prompt data
prompt_data = {
"project_id": project_id,
"task_number": task_number,
"agent_id": agent_id,
"prompt_type": prompt_type,
"prompt_content": prompt_content,
"context": context,
"model_used": model_used,
"timestamp": datetime.now().isoformat()
}
save_agent_prompt(prompt_data)
return jsonify({"success": True, "logged": True})
def save_agent_prompt(prompt_data):
"""Save agent prompts to files for analysis"""
import os
timestamp = datetime.now()
work_dir = "/tmp/bzzz_agent_prompts"
os.makedirs(work_dir, exist_ok=True)
# Create filename with project, task, and timestamp
project_id = prompt_data["project_id"]
task_number = prompt_data["task_number"]
agent_id = prompt_data["agent_id"].replace("/", "_") # Clean agent ID for filename
prompt_type = prompt_data["prompt_type"]
filename = f"prompt_{prompt_type}_p{project_id}_t{task_number}_{agent_id}_{timestamp.strftime('%H%M%S')}.json"
prompt_file = os.path.join(work_dir, filename)
with open(prompt_file, "w") as f:
json.dump(prompt_data, f, indent=2)
print(f" 💾 Saved prompt to: {prompt_file}")
# Also save to daily log
log_file = os.path.join(work_dir, f"agent_prompts_log_{timestamp.strftime('%Y%m%d')}.jsonl")
with open(log_file, "a") as f:
f.write(json.dumps(prompt_data) + "\n")
def save_agent_work(work_data):
"""Save actual agent work submissions to files"""
import os
timestamp = datetime.now()
work_dir = "/tmp/bzzz_agent_work"
os.makedirs(work_dir, exist_ok=True)
# Create filename with project, task, and timestamp
project_id = work_data["project_id"]
task_number = work_data["task_number"]
agent_id = work_data["agent_id"].replace("/", "_") # Clean agent ID for filename
filename = f"work_p{project_id}_t{task_number}_{agent_id}_{timestamp.strftime('%H%M%S')}.json"
work_file = os.path.join(work_dir, filename)
with open(work_file, "w") as f:
json.dump(work_data, f, indent=2)
print(f" 💾 Saved work to: {work_file}")
# Also save to daily log
log_file = os.path.join(work_dir, f"agent_work_log_{timestamp.strftime('%Y%m%d')}.jsonl")
with open(log_file, "a") as f:
f.write(json.dumps(work_data) + "\n")
def save_pull_request(pr_data):
"""Save pull request content to files"""
import os
timestamp = datetime.now()
work_dir = "/tmp/bzzz_pull_requests"
os.makedirs(work_dir, exist_ok=True)
# Create filename with project, task, and timestamp
project_id = pr_data["project_id"]
task_number = pr_data["task_number"]
agent_id = pr_data["agent_id"].replace("/", "_") # Clean agent ID for filename
filename = f"pr_p{project_id}_t{task_number}_{agent_id}_{timestamp.strftime('%H%M%S')}.json"
pr_file = os.path.join(work_dir, filename)
with open(pr_file, "w") as f:
json.dump(pr_data, f, indent=2)
print(f" 💾 Saved PR to: {pr_file}")
# Also save to daily log
log_file = os.path.join(work_dir, f"pull_requests_log_{timestamp.strftime('%Y%m%d')}.jsonl")
with open(log_file, "a") as f:
f.write(json.dumps(pr_data) + "\n")
def save_coordination_discussion(discussion_data):
"""Save coordination discussions to files"""
import os
timestamp = datetime.now()
work_dir = "/tmp/bzzz_coordination_discussions"
os.makedirs(work_dir, exist_ok=True)
# Create filename with project and timestamp
project_id = discussion_data["project_id"]
discussion_type = discussion_data["type"]
filename = f"discussion_{discussion_type}_p{project_id}_{timestamp.strftime('%H%M%S')}.json"
discussion_file = os.path.join(work_dir, filename)
with open(discussion_file, "w") as f:
json.dump(discussion_data, f, indent=2)
print(f" 💾 Saved discussion to: {discussion_file}")
# Also save to daily log
log_file = os.path.join(work_dir, f"coordination_discussions_{timestamp.strftime('%Y%m%d')}.jsonl")
with open(log_file, "a") as f:
f.write(json.dumps(discussion_data) + "\n")
def get_repo_name(project_id):
"""Get repository name from project ID"""
repo_map = {
1: "hive",
2: "bzzz",
3: "distributed-ai-dev",
4: "infra-automation"
}
return repo_map.get(project_id, "unknown-repo")
def save_coordination_work(activity_type, details):
"""Save coordination work to files for analysis"""
timestamp = datetime.now()
work_dir = "/tmp/bzzz_coordination_work"
os.makedirs(work_dir, exist_ok=True)
# Create detailed log entry
work_entry = {
"timestamp": timestamp.isoformat(),
"type": activity_type,
"details": details,
"session_id": details.get("session_id", "unknown")
}
# Save to daily log file
log_file = os.path.join(work_dir, f"coordination_work_{timestamp.strftime('%Y%m%d')}.jsonl")
with open(log_file, "a") as f:
f.write(json.dumps(work_entry) + "\n")
# Save individual work items to separate files
if activity_type in ["code_generation", "task_solution", "pull_request_content"]:
work_file = os.path.join(work_dir, f"{activity_type}_{timestamp.strftime('%H%M%S')}.json")
with open(work_file, "w") as f:
json.dump(work_entry, f, indent=2)
def start_background_task_updates():
"""Background thread to simulate changing task priorities and new tasks"""
def background_updates():
while True:
time.sleep(random.randint(60, 180)) # Every 1-3 minutes
# Occasionally add a new urgent task
if random.random() < 0.3: # 30% chance
project_id = random.choice([1, 2, 3, 4])
urgent_task = {
"number": random.randint(100, 999),
"title": f"URGENT: {random.choice(['Critical bug fix', 'Security patch', 'Production issue', 'Integration failure'])}",
"description": "High priority task requiring immediate attention",
"state": "open",
"labels": ["bzzz-task", "urgent", "critical"],
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"html_url": f"https://github.com/mock/repo/issues/{random.randint(100, 999)}",
"is_claimed": False,
"assignees": [],
"task_type": "bug",
"dependencies": []
}
if project_id not in MOCK_TASKS:
MOCK_TASKS[project_id] = []
MOCK_TASKS[project_id].append(urgent_task)
print(f"[{datetime.now().strftime('%H:%M:%S')}] 🚨 NEW URGENT TASK: Project {project_id} - {urgent_task['title']}")
thread = Thread(target=background_updates, daemon=True)
thread.start()
if __name__ == '__main__':
print("🚀 Starting Mock Hive API Server for Bzzz Testing")
print("=" * 50)
print("This server provides fake projects and tasks to real bzzz agents")
print("Real bzzz coordination will happen with this simulated data")
print("")
print("Available endpoints:")
print(" GET /health - Health check")
print(" GET /api/bzzz/active-repos - Active repositories")
print(" GET /api/bzzz/projects/<id>/tasks - Project tasks")
print(" POST /api/bzzz/projects/<id>/claim - Claim task")
print(" PUT /api/bzzz/projects/<id>/status - Update task status")
print(" POST /api/bzzz/projects/<id>/submit-work - Submit actual work/code")
print(" POST /api/bzzz/projects/<id>/create-pr - Submit pull request content")
print(" POST /api/bzzz/projects/<id>/coordination-discussion - Log coordination discussions")
print(" POST /api/bzzz/projects/<id>/log-prompt - Log agent prompts and model usage")
print(" POST /api/bzzz/coordination-log - Log coordination activity")
print("")
print("Starting background task updates...")
start_background_task_updates()
print(f"🌟 Mock Hive API running on http://localhost:5000")
print("Configure bzzz to use: BZZZ_HIVE_API_URL=http://localhost:5000")
print("")
app.run(host='0.0.0.0', port=5000, debug=False)

View File

@@ -36,6 +36,8 @@ type AgentConfig struct {
Models []string `yaml:"models"`
Specialization string `yaml:"specialization"`
ModelSelectionWebhook string `yaml:"model_selection_webhook"`
DefaultReasoningModel string `yaml:"default_reasoning_model"`
SandboxImage string `yaml:"sandbox_image"`
}
// GitHubConfig holds GitHub integration settings
@@ -44,6 +46,7 @@ type GitHubConfig struct {
UserAgent string `yaml:"user_agent"`
Timeout time.Duration `yaml:"timeout"`
RateLimit bool `yaml:"rate_limit"`
Assignee string `yaml:"assignee"`
}
// P2PConfig holds P2P networking configuration
@@ -107,12 +110,15 @@ func getDefaultConfig() *Config {
Models: []string{"phi3", "llama3.1"},
Specialization: "general_developer",
ModelSelectionWebhook: "https://n8n.home.deepblack.cloud/webhook/model-selection",
DefaultReasoningModel: "phi3",
SandboxImage: "registry.home.deepblack.cloud/tony/bzzz-sandbox:latest",
},
GitHub: GitHubConfig{
TokenFile: "/home/tony/AI/secrets/passwords_and_tokens/gh-token",
UserAgent: "Bzzz-P2P-Agent/1.0",
Timeout: 30 * time.Second,
RateLimit: true,
Assignee: "anthonyrawlins",
},
P2P: P2PConfig{
ServiceTag: "bzzz-peer-discovery",

View File

@@ -49,7 +49,7 @@ type ActiveRepositoriesResponse struct {
// TaskClaimRequest represents a task claim request to Hive
type TaskClaimRequest struct {
TaskID int `json:"task_id"`
TaskNumber int `json:"task_number"`
AgentID string `json:"agent_id"`
ClaimedAt int64 `json:"claimed_at"`
}
@@ -133,7 +133,7 @@ func (c *HiveClient) ClaimTask(ctx context.Context, projectID, taskID int, agent
url := fmt.Sprintf("%s/api/bzzz/projects/%d/claim", c.BaseURL, projectID)
claimRequest := TaskClaimRequest{
TaskID: taskID,
TaskNumber: taskID,
AgentID: agentID,
ClaimedAt: time.Now().Unix(),
}

View File

@@ -18,6 +18,7 @@ const (
var (
availableModels []string
modelWebhookURL string
defaultModel string
)
// OllamaRequest represents the request payload for the Ollama API.
@@ -84,9 +85,10 @@ func GenerateResponse(ctx context.Context, model, prompt string) (string, error)
}
// SetModelConfig configures the available models and webhook URL for smart model selection
func SetModelConfig(models []string, webhookURL string) {
func SetModelConfig(models []string, webhookURL, defaultReasoningModel string) {
availableModels = models
modelWebhookURL = webhookURL
defaultModel = defaultReasoningModel
}
// selectBestModel calls the model selection webhook to choose the best model for a prompt
@@ -96,7 +98,7 @@ func selectBestModel(availableModels []string, prompt string) string {
if len(availableModels) > 0 {
return availableModels[0]
}
return "phi3" // Last resort fallback
return defaultModel // Last resort fallback
}
requestPayload := map[string]interface{}{

View File

@@ -8,17 +8,13 @@ import (
"io"
"os"
"path/filepath"
"strings"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
)
const (
// DefaultDockerImage is the image used if a task does not specify one.
DefaultDockerImage = "registry.home.deepblack.cloud/tony/bzzz-sandbox:latest"
)
// Sandbox represents a stateful, isolated execution environment for a single task.
type Sandbox struct {
ID string // The ID of the running container.
@@ -36,9 +32,9 @@ type CommandResult struct {
}
// CreateSandbox provisions a new Docker container for a task.
func CreateSandbox(ctx context.Context, taskImage string) (*Sandbox, error) {
func CreateSandbox(ctx context.Context, taskImage string, agentConfig *config.AgentConfig) (*Sandbox, error) {
if taskImage == "" {
taskImage = DefaultDockerImage
taskImage = agentConfig.SandboxImage
}
// Create a new Docker client
@@ -53,6 +49,16 @@ func CreateSandbox(ctx context.Context, taskImage string) (*Sandbox, error) {
return nil, fmt.Errorf("failed to create temp dir for sandbox: %w", err)
}
// Read GitHub token for authentication
githubToken := os.Getenv("BZZZ_GITHUB_TOKEN")
if githubToken == "" {
// Try to read from file
tokenBytes, err := os.ReadFile("/home/tony/AI/secrets/passwords_and_tokens/gh-token")
if err == nil {
githubToken = strings.TrimSpace(string(tokenBytes))
}
}
// Define container configuration
containerConfig := &container.Config{
Image: taskImage,
@@ -60,6 +66,10 @@ func CreateSandbox(ctx context.Context, taskImage string) (*Sandbox, error) {
OpenStdin: true,
WorkingDir: "/home/agent/work",
User: "agent",
Env: []string{
"GITHUB_TOKEN=" + githubToken,
"GH_TOKEN=" + githubToken,
},
}
// Define host configuration (e.g., volume mounts, resource limits)

View File

@@ -1,21 +0,0 @@
hive_api:
base_url: "https://hive.home.deepblack.cloud"
api_key: ""
timeout: "30s"
agent:
id: "test-agent"
capabilities: ["task-coordination", "meta-discussion", "general"]
models: ["phi3"]
specialization: "general_developer"
poll_interval: "60s"
max_tasks: 1
github:
token_file: ""
p2p:
escalation_webhook: "https://n8n.home.deepblack.cloud/webhook-test/human-escalation"
logging:
level: "debug"

View File

@@ -1,94 +0,0 @@
#!/usr/bin/env python3
"""
Test script for Bzzz-Hive API integration.
Tests the newly created API endpoints for dynamic repository discovery.
"""
import sys
import os
sys.path.append('/home/tony/AI/projects/hive/backend')
from app.services.project_service import ProjectService
import json
def test_project_service():
"""Test the ProjectService with Bzzz integration methods."""
print("🧪 Testing ProjectService with Bzzz integration...")
service = ProjectService()
# Test 1: Get all projects
print("\n📁 Testing get_all_projects()...")
projects = service.get_all_projects()
print(f"Found {len(projects)} total projects")
# Find projects with GitHub repos
github_projects = [p for p in projects if p.get('github_repo')]
print(f"Found {len(github_projects)} projects with GitHub repositories:")
for project in github_projects:
print(f" - {project['name']}: {project['github_repo']}")
# Test 2: Get active repositories for Bzzz
print("\n🐝 Testing get_bzzz_active_repositories()...")
try:
active_repos = service.get_bzzz_active_repositories()
print(f"Found {len(active_repos)} repositories ready for Bzzz coordination:")
for repo in active_repos:
print(f"\n 📦 Repository: {repo['name']}")
print(f" Owner: {repo['owner']}")
print(f" Repository: {repo['repository']}")
print(f" Git URL: {repo['git_url']}")
print(f" Ready to claim: {repo['ready_to_claim']}")
print(f" Project ID: {repo['project_id']}")
except Exception as e:
print(f"❌ Error testing active repositories: {e}")
# Test 3: Get bzzz-task issues for the hive project specifically
print("\n🎯 Testing get_bzzz_project_tasks() for 'hive' project...")
try:
hive_tasks = service.get_bzzz_project_tasks('hive')
print(f"Found {len(hive_tasks)} bzzz-task issues in hive project:")
for task in hive_tasks:
print(f"\n 🎫 Issue #{task['number']}: {task['title']}")
print(f" State: {task['state']}")
print(f" Labels: {task['labels']}")
print(f" Task Type: {task['task_type']}")
print(f" Claimed: {task['is_claimed']}")
if task['assignees']:
print(f" Assignees: {', '.join(task['assignees'])}")
print(f" URL: {task['html_url']}")
except Exception as e:
print(f"❌ Error testing hive project tasks: {e}")
# Test 4: Simulate API endpoint response format
print("\n📡 Testing API endpoint response format...")
try:
active_repos = service.get_bzzz_active_repositories()
api_response = {"repositories": active_repos}
print("API Response Preview (first 500 chars):")
response_json = json.dumps(api_response, indent=2)
print(response_json[:500] + "..." if len(response_json) > 500 else response_json)
except Exception as e:
print(f"❌ Error formatting API response: {e}")
def main():
print("🚀 Starting Bzzz-Hive API Integration Test")
print("="*50)
try:
test_project_service()
print("\n✅ Test completed successfully!")
except Exception as e:
print(f"\n❌ Test failed with error: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

View File

@@ -1,98 +0,0 @@
#!/usr/bin/env python3
"""
Test script to trigger and observe bzzz meta discussion
"""
import json
import time
import requests
from datetime import datetime
def test_meta_discussion():
"""Test the Antennae meta discussion by simulating a complex task"""
print("🎯 Testing Bzzz Antennae Meta Discussion")
print("=" * 50)
# Test 1: Check if the P2P mesh is active
print("1. Checking P2P mesh status...")
# We can't directly inject into the P2P mesh from here, but we can:
# - Check the bzzz service logs for meta discussion activity
# - Create a mock scenario description
mock_scenario = {
"task_type": "complex_architecture_design",
"description": "Design a microservices architecture for a distributed AI system with P2P coordination",
"complexity": "high",
"requires_collaboration": True,
"estimated_agents_needed": 3
}
print(f"📋 Mock Complex Task:")
print(f" Type: {mock_scenario['task_type']}")
print(f" Description: {mock_scenario['description']}")
print(f" Complexity: {mock_scenario['complexity']}")
print(f" Collaboration Required: {mock_scenario['requires_collaboration']}")
# Test 2: Demonstrate what would happen in meta discussion
print("\n2. Simulating Antennae Meta Discussion Flow:")
print(" 🤖 Agent A (walnut): 'I'll handle the API gateway design'")
print(" 🤖 Agent B (acacia): 'I can work on the data layer architecture'")
print(" 🤖 Agent C (ironwood): 'I'll focus on the P2P coordination logic'")
print(" 🎯 Meta Discussion: Agents coordinate task splitting and dependencies")
# Test 3: Show escalation scenario
print("\n3. Human Escalation Scenario:")
print(" ⚠️ Agents detect conflicting approaches to distributed consensus")
print(" 🚨 Automatic escalation triggered after 3 rounds of discussion")
print(" 👤 Human expert summoned via N8N webhook")
# Test 4: Check current bzzz logs for any meta discussion activity
print("\n4. Checking recent bzzz activity...")
try:
# This would show any recent meta discussion logs
import subprocess
result = subprocess.run([
'journalctl', '-u', 'bzzz.service', '--no-pager', '-l', '-n', '20'
], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
logs = result.stdout
if 'meta' in logs.lower() or 'antennae' in logs.lower():
print(" ✅ Found meta discussion activity in logs!")
# Show relevant lines
for line in logs.split('\n'):
if 'meta' in line.lower() or 'antennae' in line.lower():
print(f" 📝 {line}")
else:
print(" No recent meta discussion activity (expected - no active tasks)")
else:
print(" ⚠️ Could not access bzzz logs")
except Exception as e:
print(f" ⚠️ Error checking logs: {e}")
# Test 5: Show what capabilities support meta discussion
print("\n5. Meta Discussion Capabilities:")
capabilities = [
"meta-discussion",
"task-coordination",
"collaborative-reasoning",
"human-escalation",
"cross-repository-coordination"
]
for cap in capabilities:
print(f"{cap}")
print("\n🎯 Meta Discussion Test Complete!")
print("\nTo see meta discussion in action:")
print("1. Configure repositories in Hive with 'bzzz_enabled: true'")
print("2. Create complex GitHub issues labeled 'bzzz-task'")
print("3. Watch agents coordinate via Antennae P2P channel")
print("4. Monitor logs: journalctl -u bzzz.service -f | grep -i meta")
if __name__ == "__main__":
test_meta_discussion()

View File

@@ -1,95 +0,0 @@
#!/usr/bin/env python3
"""
Simple test to check GitHub API access for bzzz-task issues.
"""
import requests
from pathlib import Path
def get_github_token():
"""Get GitHub token from secrets file."""
try:
# Try gh-token first
gh_token_path = Path("/home/tony/AI/secrets/passwords_and_tokens/gh-token")
if gh_token_path.exists():
return gh_token_path.read_text().strip()
# Try GitHub token
github_token_path = Path("/home/tony/AI/secrets/passwords_and_tokens/github-token")
if github_token_path.exists():
return github_token_path.read_text().strip()
# Fallback to GitLab token if GitHub token doesn't exist
gitlab_token_path = Path("/home/tony/AI/secrets/passwords_and_tokens/claude-gitlab-token")
if gitlab_token_path.exists():
return gitlab_token_path.read_text().strip()
except Exception:
pass
return None
def test_github_bzzz_tasks():
"""Test fetching bzzz-task issues from GitHub."""
token = get_github_token()
if not token:
print("❌ No GitHub token found")
return
print("🐙 Testing GitHub API access for bzzz-task issues...")
# Test with the hive repository
repo = "anthonyrawlins/hive"
url = f"https://api.github.com/repos/{repo}/issues"
headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
# First, get all open issues
print(f"\n📊 Fetching all open issues from {repo}...")
response = requests.get(url, headers=headers, params={"state": "open"}, timeout=10)
if response.status_code == 200:
all_issues = response.json()
print(f"Found {len(all_issues)} total open issues")
# Show all labels used in the repository
all_labels = set()
for issue in all_issues:
for label in issue.get('labels', []):
all_labels.add(label['name'])
print(f"All labels in use: {sorted(all_labels)}")
else:
print(f"❌ Failed to fetch issues: {response.status_code} - {response.text}")
return
# Now test for bzzz-task labeled issues
print(f"\n🐝 Fetching bzzz-task labeled issues from {repo}...")
response = requests.get(url, headers=headers, params={"labels": "bzzz-task", "state": "open"}, timeout=10)
if response.status_code == 200:
bzzz_issues = response.json()
print(f"Found {len(bzzz_issues)} issues with 'bzzz-task' label")
if not bzzz_issues:
print(" No issues found with 'bzzz-task' label")
print(" You can create test issues with this label for testing")
for issue in bzzz_issues:
print(f"\n 🎫 Issue #{issue['number']}: {issue['title']}")
print(f" State: {issue['state']}")
print(f" Labels: {[label['name'] for label in issue.get('labels', [])]}")
print(f" Assignees: {[assignee['login'] for assignee in issue.get('assignees', [])]}")
print(f" URL: {issue['html_url']}")
else:
print(f"❌ Failed to fetch bzzz-task issues: {response.status_code} - {response.text}")
def main():
print("🚀 Simple GitHub API Test for Bzzz Integration")
print("="*50)
test_github_bzzz_tasks()
if __name__ == "__main__":
main()