Files
CHORUS/pkg/protocol/integration.go
anthonyrawlins 9bdcbe0447 Integrate BACKBEAT SDK and resolve KACHING license validation
Major integrations and fixes:
- Added BACKBEAT SDK integration for P2P operation timing
- Implemented beat-aware status tracking for distributed operations
- Added Docker secrets support for secure license management
- Resolved KACHING license validation via HTTPS/TLS
- Updated docker-compose configuration for clean stack deployment
- Disabled rollback policies to prevent deployment failures
- Added license credential storage (CHORUS-DEV-MULTI-001)

Technical improvements:
- BACKBEAT P2P operation tracking with phase management
- Enhanced configuration system with file-based secrets
- Improved error handling for license validation
- Clean separation of KACHING and CHORUS deployment stacks

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-06 07:56:26 +10:00

338 lines
8.7 KiB
Go

package protocol
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"chorus/pkg/config"
"chorus/pkg/dht"
"chorus/p2p"
"github.com/libp2p/go-libp2p/core/peer"
)
// ProtocolManager manages the CHORUS v2 protocol components
type ProtocolManager struct {
config *config.Config
node *p2p.Node
resolver *Resolver
enabled bool
// Local peer information
localPeer *PeerCapability
}
// NewProtocolManager creates a new protocol manager
func NewProtocolManager(cfg *config.Config, node *p2p.Node) (*ProtocolManager, error) {
if cfg == nil || node == nil {
return nil, fmt.Errorf("config and node are required")
}
pm := &ProtocolManager{
config: cfg,
node: node,
enabled: cfg.V2.Enabled,
}
// Only initialize if v2 protocol is enabled
if pm.enabled {
if err := pm.initialize(); err != nil {
return nil, fmt.Errorf("failed to initialize protocol manager: %w", err)
}
}
return pm, nil
}
// initialize sets up the protocol components
func (pm *ProtocolManager) initialize() error {
// Create resolver
resolverOpts := []ResolverOption{
WithCacheTTL(pm.config.V2.URIResolution.CacheTTL),
WithMaxPeersPerResult(pm.config.V2.URIResolution.MaxPeersPerResult),
}
// Set default strategy
switch pm.config.V2.URIResolution.DefaultStrategy {
case "exact":
resolverOpts = append(resolverOpts, WithDefaultStrategy(StrategyExact))
case "priority":
resolverOpts = append(resolverOpts, WithDefaultStrategy(StrategyPriority))
case "load_balance":
resolverOpts = append(resolverOpts, WithDefaultStrategy(StrategyLoadBalance))
default:
resolverOpts = append(resolverOpts, WithDefaultStrategy(StrategyBestMatch))
}
pm.resolver = NewResolver(pm.node.Host().Peerstore(), resolverOpts...)
// Initialize local peer information
pm.localPeer = &PeerCapability{
PeerID: pm.node.ID(),
Agent: pm.config.Agent.ID,
Role: pm.config.Agent.Role,
Capabilities: pm.config.Agent.Capabilities,
Models: pm.config.Agent.Models,
Specialization: pm.config.Agent.Specialization,
LastSeen: time.Now(),
Status: "ready",
Metadata: make(map[string]string),
}
// Add project information if available
if project := pm.getProjectFromConfig(); project != "" {
pm.localPeer.Metadata["project"] = project
}
// Register local peer
pm.resolver.RegisterPeer(pm.node.ID(), pm.localPeer)
return nil
}
// IsEnabled returns whether the v2 protocol is enabled
func (pm *ProtocolManager) IsEnabled() bool {
return pm.enabled
}
// ResolveURI resolves a CHORUS:// URI to peer addresses
func (pm *ProtocolManager) ResolveURI(ctx context.Context, uriStr string) (*ResolutionResult, error) {
if !pm.enabled {
return nil, fmt.Errorf("v2 protocol not enabled")
}
return pm.resolver.ResolveString(ctx, uriStr)
}
// RegisterPeer registers a peer's capabilities
func (pm *ProtocolManager) RegisterPeer(peerID peer.ID, capabilities *PeerCapability) {
if !pm.enabled {
return
}
pm.resolver.RegisterPeer(peerID, capabilities)
// Announce to DHT if enabled
if pm.node.IsDHTEnabled() {
pm.announcePeerToDHT(context.Background(), capabilities)
}
}
// UpdateLocalPeerStatus updates the local peer's status
func (pm *ProtocolManager) UpdateLocalPeerStatus(status string) {
if !pm.enabled {
return
}
pm.localPeer.Status = status
pm.localPeer.LastSeen = time.Now()
pm.resolver.RegisterPeer(pm.node.ID(), pm.localPeer)
}
// GetLocalPeer returns the local peer information
func (pm *ProtocolManager) GetLocalPeer() *PeerCapability {
return pm.localPeer
}
// GetAllPeers returns all known peers
func (pm *ProtocolManager) GetAllPeers() map[peer.ID]*PeerCapability {
if !pm.enabled {
return make(map[peer.ID]*PeerCapability)
}
return pm.resolver.GetPeerCapabilities()
}
// HandlePeerCapabilityMessage handles incoming peer capability messages
func (pm *ProtocolManager) HandlePeerCapabilityMessage(peerID peer.ID, data []byte) error {
if !pm.enabled {
return nil // Silently ignore if v2 not enabled
}
var capability PeerCapability
if err := json.Unmarshal(data, &capability); err != nil {
return fmt.Errorf("failed to unmarshal capability message: %w", err)
}
capability.PeerID = peerID
capability.LastSeen = time.Now()
pm.resolver.RegisterPeer(peerID, &capability)
return nil
}
// AnnounceCapabilities announces the local peer's capabilities
func (pm *ProtocolManager) AnnounceCapabilities() error {
if !pm.enabled {
return nil
}
// Update local peer information
pm.localPeer.LastSeen = time.Now()
// Announce to DHT if enabled
if pm.node.IsDHTEnabled() {
return pm.announcePeerToDHT(context.Background(), pm.localPeer)
}
return nil
}
// announcePeerToDHT announces a peer's capabilities to the DHT
func (pm *ProtocolManager) announcePeerToDHT(ctx context.Context, capability *PeerCapability) error {
dht := pm.node.DHT()
if dht == nil {
return fmt.Errorf("DHT not available")
}
// Register peer with role-based and capability-based keys
if capability.Role != "" {
dht.RegisterPeer(capability.PeerID, capability.Agent, capability.Role, capability.Capabilities)
if err := dht.AnnounceRole(ctx, capability.Role); err != nil {
// Log error but don't fail
}
}
// Announce each capability
for _, cap := range capability.Capabilities {
if err := dht.AnnounceCapability(ctx, cap); err != nil {
// Log error but don't fail
}
}
// Announce general peer presence
if err := dht.Provide(ctx, "CHORUS:peer"); err != nil {
// Log error but don't fail
}
return nil
}
// FindPeersByRole finds peers with a specific role
func (pm *ProtocolManager) FindPeersByRole(ctx context.Context, role string) ([]*PeerCapability, error) {
if !pm.enabled {
return nil, fmt.Errorf("v2 protocol not enabled")
}
// First try DHT if available
if pm.node.IsDHTEnabled() {
dhtPeers, err := pm.node.DHT().FindPeersByRole(ctx, role)
if err == nil && len(dhtPeers) > 0 {
// Convert DHT peer info to capabilities
var capabilities []*PeerCapability
for _, dhtPeer := range dhtPeers {
cap := &PeerCapability{
PeerID: dhtPeer.ID,
Agent: dhtPeer.Agent,
Role: dhtPeer.Role,
LastSeen: dhtPeer.LastSeen,
Metadata: make(map[string]string),
}
capabilities = append(capabilities, cap)
}
return capabilities, nil
}
}
// Fall back to local resolver
var result []*PeerCapability
for _, peer := range pm.resolver.GetPeerCapabilities() {
if peer.Role == role || role == "*" {
result = append(result, peer)
}
}
return result, nil
}
// ValidateURI validates a CHORUS:// URI
func (pm *ProtocolManager) ValidateURI(uriStr string) error {
if !pm.enabled {
return fmt.Errorf("v2 protocol not enabled")
}
_, err := ParseBzzzURI(uriStr)
return err
}
// CreateURI creates a CHORUS:// URI with the given components
func (pm *ProtocolManager) CreateURI(agent, role, project, task, path string) (*BzzzURI, error) {
if !pm.enabled {
return nil, fmt.Errorf("v2 protocol not enabled")
}
// Use configured defaults if components are empty
if agent == "" {
agent = pm.config.V2.SemanticAddressing.DefaultAgent
}
if role == "" {
role = pm.config.V2.SemanticAddressing.DefaultRole
}
if project == "" {
project = pm.config.V2.SemanticAddressing.DefaultProject
}
return NewBzzzURI(agent, role, project, task, path), nil
}
// GetFeatureFlags returns the current feature flags
func (pm *ProtocolManager) GetFeatureFlags() map[string]bool {
return pm.config.V2.FeatureFlags
}
// IsFeatureEnabled checks if a specific feature is enabled
func (pm *ProtocolManager) IsFeatureEnabled(feature string) bool {
if !pm.enabled {
return false
}
enabled, exists := pm.config.V2.FeatureFlags[feature]
return exists && enabled
}
// Close shuts down the protocol manager
func (pm *ProtocolManager) Close() error {
if pm.resolver != nil {
return pm.resolver.Close()
}
return nil
}
// getProjectFromConfig extracts project information from configuration
func (pm *ProtocolManager) getProjectFromConfig() string {
// Try to infer project from agent ID or other configuration
if pm.config.Agent.ID != "" {
parts := strings.Split(pm.config.Agent.ID, "-")
if len(parts) > 0 {
return parts[0]
}
}
// Default project if none can be inferred
return "CHORUS"
}
// GetStats returns protocol statistics
func (pm *ProtocolManager) GetStats() map[string]interface{} {
stats := map[string]interface{}{
"enabled": pm.enabled,
"local_peer": pm.localPeer,
"known_peers": len(pm.resolver.GetPeerCapabilities()),
}
if pm.node.IsDHTEnabled() {
dht := pm.node.DHT()
stats["dht_enabled"] = true
stats["dht_bootstrapped"] = dht.IsBootstrapped()
stats["dht_size"] = dht.GetDHTSize()
stats["dht_connected_peers"] = len(dht.GetConnectedPeers())
} else {
stats["dht_enabled"] = false
}
return stats
}