🎭 CHORUS now contains full BZZZ functionality adapted for containers Core systems ported: - P2P networking (libp2p with DHT and PubSub) - Task coordination (COOEE protocol) - HMMM collaborative reasoning - SHHH encryption and security - SLURP admin election system - UCXL content addressing - UCXI server integration - Hypercore logging system - Health monitoring and graceful shutdown - License validation with KACHING Container adaptations: - Environment variable configuration (no YAML files) - Container-optimized logging to stdout/stderr - Auto-generated agent IDs for container deployments - Docker-first architecture All proven BZZZ P2P protocols, AI integration, and collaboration features are now available in containerized form. Next: Build and test container deployment. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
262 lines
6.1 KiB
Go
262 lines
6.1 KiB
Go
package dht
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// DHTStats represents common DHT statistics across implementations
|
|
type DHTStats struct {
|
|
TotalKeys int `json:"total_keys"`
|
|
TotalPeers int `json:"total_peers"`
|
|
Latency time.Duration `json:"latency"`
|
|
ErrorCount int `json:"error_count"`
|
|
ErrorRate float64 `json:"error_rate"`
|
|
Uptime time.Duration `json:"uptime"`
|
|
}
|
|
|
|
// MockDHT implements the DHT interface for testing purposes
|
|
// It provides the same interface as the real DHT but operates in-memory
|
|
type MockDHT struct {
|
|
storage map[string][]byte
|
|
providers map[string][]string // key -> list of peer IDs
|
|
peers map[string]*MockPeer
|
|
latency time.Duration
|
|
failureRate float64
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
type MockPeer struct {
|
|
ID string
|
|
Address string
|
|
Online bool
|
|
}
|
|
|
|
// NewMockDHT creates a new mock DHT instance
|
|
func NewMockDHT() *MockDHT {
|
|
return &MockDHT{
|
|
storage: make(map[string][]byte),
|
|
providers: make(map[string][]string),
|
|
peers: make(map[string]*MockPeer),
|
|
latency: 10 * time.Millisecond, // Default 10ms latency
|
|
failureRate: 0.0, // No failures by default
|
|
}
|
|
}
|
|
|
|
// SetLatency configures network latency simulation
|
|
func (m *MockDHT) SetLatency(latency time.Duration) {
|
|
m.latency = latency
|
|
}
|
|
|
|
// SetFailureRate configures failure simulation (0.0 = no failures, 1.0 = always fail)
|
|
func (m *MockDHT) SetFailureRate(rate float64) {
|
|
m.failureRate = rate
|
|
}
|
|
|
|
// simulateNetworkConditions applies latency and potential failures
|
|
func (m *MockDHT) simulateNetworkConditions(ctx context.Context) error {
|
|
// Check for context cancellation
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
|
|
// Simulate network latency
|
|
if m.latency > 0 {
|
|
select {
|
|
case <-time.After(m.latency):
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Simulate network failures
|
|
if m.failureRate > 0 && rand.Float64() < m.failureRate {
|
|
return fmt.Errorf("mock network failure (simulated)")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// PutValue stores a key-value pair in the DHT
|
|
func (m *MockDHT) PutValue(ctx context.Context, key string, value []byte) error {
|
|
if err := m.simulateNetworkConditions(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
m.storage[key] = make([]byte, len(value))
|
|
copy(m.storage[key], value)
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetValue retrieves a value from the DHT
|
|
func (m *MockDHT) GetValue(ctx context.Context, key string) ([]byte, error) {
|
|
if err := m.simulateNetworkConditions(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.mutex.RLock()
|
|
defer m.mutex.RUnlock()
|
|
|
|
value, exists := m.storage[key]
|
|
if !exists {
|
|
return nil, fmt.Errorf("key not found: %s", key)
|
|
}
|
|
|
|
// Return a copy to prevent external modification
|
|
result := make([]byte, len(value))
|
|
copy(result, value)
|
|
return result, nil
|
|
}
|
|
|
|
// Provide announces that this node can provide the given key
|
|
func (m *MockDHT) Provide(ctx context.Context, key string) error {
|
|
if err := m.simulateNetworkConditions(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
// Mock peer ID for this node
|
|
peerID := "mock-peer-local"
|
|
|
|
if _, exists := m.providers[key]; !exists {
|
|
m.providers[key] = make([]string, 0)
|
|
}
|
|
|
|
// Add peer to providers list if not already present
|
|
for _, existingPeer := range m.providers[key] {
|
|
if existingPeer == peerID {
|
|
return nil // Already providing
|
|
}
|
|
}
|
|
|
|
m.providers[key] = append(m.providers[key], peerID)
|
|
return nil
|
|
}
|
|
|
|
// FindProviders finds peers that can provide the given key
|
|
func (m *MockDHT) FindProviders(ctx context.Context, key string, limit int) ([]string, error) {
|
|
if err := m.simulateNetworkConditions(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.mutex.RLock()
|
|
defer m.mutex.RUnlock()
|
|
|
|
providers, exists := m.providers[key]
|
|
if !exists {
|
|
return []string{}, nil
|
|
}
|
|
|
|
// Apply limit if specified
|
|
if limit > 0 && len(providers) > limit {
|
|
result := make([]string, limit)
|
|
copy(result, providers[:limit])
|
|
return result, nil
|
|
}
|
|
|
|
// Return copy of providers
|
|
result := make([]string, len(providers))
|
|
copy(result, providers)
|
|
return result, nil
|
|
}
|
|
|
|
// AddPeer adds a mock peer to the network
|
|
func (m *MockDHT) AddPeer(peerID, address string) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
m.peers[peerID] = &MockPeer{
|
|
ID: peerID,
|
|
Address: address,
|
|
Online: true,
|
|
}
|
|
}
|
|
|
|
// RemovePeer removes a mock peer from the network
|
|
func (m *MockDHT) RemovePeer(peerID string) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
delete(m.peers, peerID)
|
|
|
|
// Remove from all provider lists
|
|
for key, providers := range m.providers {
|
|
filtered := make([]string, 0, len(providers))
|
|
for _, provider := range providers {
|
|
if provider != peerID {
|
|
filtered = append(filtered, provider)
|
|
}
|
|
}
|
|
m.providers[key] = filtered
|
|
}
|
|
}
|
|
|
|
// GetPeers returns all mock peers
|
|
func (m *MockDHT) GetPeers() map[string]*MockPeer {
|
|
m.mutex.RLock()
|
|
defer m.mutex.RUnlock()
|
|
|
|
result := make(map[string]*MockPeer)
|
|
for id, peer := range m.peers {
|
|
result[id] = &MockPeer{
|
|
ID: peer.ID,
|
|
Address: peer.Address,
|
|
Online: peer.Online,
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
// ListKeys returns all stored keys (for testing purposes)
|
|
func (m *MockDHT) ListKeys() []string {
|
|
m.mutex.RLock()
|
|
defer m.mutex.RUnlock()
|
|
|
|
keys := make([]string, 0, len(m.storage))
|
|
for key := range m.storage {
|
|
keys = append(keys, key)
|
|
}
|
|
return keys
|
|
}
|
|
|
|
// Clear removes all data from the mock DHT
|
|
func (m *MockDHT) Clear() {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
m.storage = make(map[string][]byte)
|
|
m.providers = make(map[string][]string)
|
|
m.peers = make(map[string]*MockPeer)
|
|
}
|
|
|
|
// GetStats returns statistics about the mock DHT
|
|
func (m *MockDHT) GetStats() DHTStats {
|
|
m.mutex.RLock()
|
|
defer m.mutex.RUnlock()
|
|
|
|
return DHTStats{
|
|
TotalKeys: len(m.storage),
|
|
TotalPeers: len(m.peers),
|
|
Latency: m.latency,
|
|
ErrorCount: 0, // Mock DHT doesn't simulate errors in stats
|
|
ErrorRate: m.failureRate,
|
|
Uptime: time.Hour, // Mock uptime
|
|
}
|
|
}
|
|
|
|
type MockDHTStats struct {
|
|
TotalKeys int `json:"total_keys"`
|
|
TotalPeers int `json:"total_peers"`
|
|
TotalProviders int `json:"total_providers"`
|
|
Latency time.Duration `json:"latency"`
|
|
FailureRate float64 `json:"failure_rate"`
|
|
} |