diff --git a/pkg/slurp/distribution/consistent_hash.go b/pkg/slurp/distribution/consistent_hash.go index ea00807..cec4e23 100644 --- a/pkg/slurp/distribution/consistent_hash.go +++ b/pkg/slurp/distribution/consistent_hash.go @@ -1,3 +1,6 @@ +//go:build slurp_full +// +build slurp_full + // Package distribution provides consistent hashing for distributed context placement package distribution diff --git a/pkg/slurp/distribution/coordinator.go b/pkg/slurp/distribution/coordinator.go index 950ee7d..e47753e 100644 --- a/pkg/slurp/distribution/coordinator.go +++ b/pkg/slurp/distribution/coordinator.go @@ -1,3 +1,6 @@ +//go:build slurp_full +// +build slurp_full + // Package distribution provides centralized coordination for distributed context operations package distribution diff --git a/pkg/slurp/distribution/dht.go b/pkg/slurp/distribution/dht.go index 2885477..b8d59a6 100644 --- a/pkg/slurp/distribution/dht.go +++ b/pkg/slurp/distribution/dht.go @@ -2,17 +2,8 @@ package distribution import ( "context" - "crypto/sha256" - "encoding/hex" - "encoding/json" - "fmt" - "sync" "time" - "chorus/pkg/config" - "chorus/pkg/crypto" - "chorus/pkg/dht" - "chorus/pkg/election" slurpContext "chorus/pkg/slurp/context" "chorus/pkg/ucxl" ) diff --git a/pkg/slurp/distribution/dht_impl.go b/pkg/slurp/distribution/dht_impl.go index 3f7d152..a171858 100644 --- a/pkg/slurp/distribution/dht_impl.go +++ b/pkg/slurp/distribution/dht_impl.go @@ -1,3 +1,6 @@ +//go:build slurp_full +// +build slurp_full + // Package distribution provides DHT-based context distribution implementation package distribution diff --git a/pkg/slurp/distribution/distribution_stub.go b/pkg/slurp/distribution/distribution_stub.go new file mode 100644 index 0000000..2f658c0 --- /dev/null +++ b/pkg/slurp/distribution/distribution_stub.go @@ -0,0 +1,453 @@ +//go:build !slurp_full +// +build !slurp_full + +package distribution + +import ( + "context" + "sync" + "time" + + "chorus/pkg/config" + "chorus/pkg/crypto" + "chorus/pkg/dht" + "chorus/pkg/election" + slurpContext "chorus/pkg/slurp/context" + "chorus/pkg/ucxl" +) + +// DHTContextDistributor provides an in-memory stub implementation that satisfies the +// ContextDistributor interface when the full libp2p-based stack is unavailable. +type DHTContextDistributor struct { + mu sync.RWMutex + dht dht.DHT + config *config.Config + storage map[string]*slurpContext.ContextNode + stats *DistributionStatistics + policy *ReplicationPolicy +} + +// NewDHTContextDistributor returns a stub distributor that stores contexts in-memory. +func NewDHTContextDistributor( + dhtInstance dht.DHT, + roleCrypto *crypto.RoleCrypto, + electionManager election.Election, + cfg *config.Config, +) (*DHTContextDistributor, error) { + return &DHTContextDistributor{ + dht: dhtInstance, + config: cfg, + storage: make(map[string]*slurpContext.ContextNode), + stats: &DistributionStatistics{CollectedAt: time.Now()}, + policy: &ReplicationPolicy{ + DefaultFactor: 1, + MinFactor: 1, + MaxFactor: 1, + }, + }, nil +} + +func (d *DHTContextDistributor) Start(ctx context.Context) error { return nil } +func (d *DHTContextDistributor) Stop(ctx context.Context) error { return nil } + +func (d *DHTContextDistributor) DistributeContext(ctx context.Context, node *slurpContext.ContextNode, roles []string) error { + if node == nil { + return nil + } + d.mu.Lock() + defer d.mu.Unlock() + key := node.UCXLAddress.String() + d.storage[key] = node + d.stats.TotalDistributions++ + d.stats.SuccessfulDistributions++ + return nil +} + +func (d *DHTContextDistributor) RetrieveContext(ctx context.Context, address ucxl.Address, role string) (*slurpContext.ResolvedContext, error) { + d.mu.RLock() + defer d.mu.RUnlock() + if node, ok := d.storage[address.String()]; ok { + return &slurpContext.ResolvedContext{ + UCXLAddress: address, + Summary: node.Summary, + Purpose: node.Purpose, + Technologies: append([]string{}, node.Technologies...), + Tags: append([]string{}, node.Tags...), + Insights: append([]string{}, node.Insights...), + ResolvedAt: time.Now(), + }, nil + } + return nil, nil +} + +func (d *DHTContextDistributor) UpdateContext(ctx context.Context, node *slurpContext.ContextNode, roles []string) (*ConflictResolution, error) { + if err := d.DistributeContext(ctx, node, roles); err != nil { + return nil, err + } + return &ConflictResolution{Address: node.UCXLAddress, ResolutionType: ResolutionMerged, ResolvedAt: time.Now(), Confidence: 1.0}, nil +} + +func (d *DHTContextDistributor) DeleteContext(ctx context.Context, address ucxl.Address) error { + d.mu.Lock() + defer d.mu.Unlock() + delete(d.storage, address.String()) + return nil +} + +func (d *DHTContextDistributor) ListDistributedContexts(ctx context.Context, role string, criteria *DistributionCriteria) ([]*DistributedContextInfo, error) { + d.mu.RLock() + defer d.mu.RUnlock() + infos := make([]*DistributedContextInfo, 0, len(d.storage)) + for _, node := range d.storage { + infos = append(infos, &DistributedContextInfo{ + Address: node.UCXLAddress, + Roles: append([]string{}, role), + ReplicaCount: 1, + HealthyReplicas: 1, + LastUpdated: time.Now(), + }) + } + return infos, nil +} + +func (d *DHTContextDistributor) Sync(ctx context.Context) (*SyncResult, error) { + return &SyncResult{SyncedContexts: len(d.storage), SyncedAt: time.Now()}, nil +} + +func (d *DHTContextDistributor) Replicate(ctx context.Context, address ucxl.Address, replicationFactor int) error { + return nil +} + +func (d *DHTContextDistributor) GetReplicaHealth(ctx context.Context, address ucxl.Address) (*ReplicaHealth, error) { + d.mu.RLock() + defer d.mu.RUnlock() + _, ok := d.storage[address.String()] + return &ReplicaHealth{ + Address: address, + TotalReplicas: boolToInt(ok), + HealthyReplicas: boolToInt(ok), + FailedReplicas: 0, + OverallHealth: healthFromBool(ok), + LastChecked: time.Now(), + }, nil +} + +func (d *DHTContextDistributor) GetDistributionStats() (*DistributionStatistics, error) { + d.mu.RLock() + defer d.mu.RUnlock() + statsCopy := *d.stats + statsCopy.LastSyncTime = time.Now() + return &statsCopy, nil +} + +func (d *DHTContextDistributor) SetReplicationPolicy(policy *ReplicationPolicy) error { + d.mu.Lock() + defer d.mu.Unlock() + if policy != nil { + d.policy = policy + } + return nil +} + +func boolToInt(ok bool) int { + if ok { + return 1 + } + return 0 +} + +func healthFromBool(ok bool) HealthStatus { + if ok { + return HealthHealthy + } + return HealthDegraded +} + +// Replication manager stub ---------------------------------------------------------------------- + +type stubReplicationManager struct { + policy *ReplicationPolicy +} + +func newStubReplicationManager(policy *ReplicationPolicy) *stubReplicationManager { + if policy == nil { + policy = &ReplicationPolicy{DefaultFactor: 1, MinFactor: 1, MaxFactor: 1} + } + return &stubReplicationManager{policy: policy} +} + +func NewReplicationManager(dhtInstance dht.DHT, cfg *config.Config) (ReplicationManager, error) { + return newStubReplicationManager(nil), nil +} + +func (rm *stubReplicationManager) EnsureReplication(ctx context.Context, address ucxl.Address, factor int) error { + return nil +} + +func (rm *stubReplicationManager) RepairReplicas(ctx context.Context, address ucxl.Address) (*RepairResult, error) { + return &RepairResult{ + Address: address.String(), + RepairSuccessful: true, + RepairedAt: time.Now(), + }, nil +} + +func (rm *stubReplicationManager) BalanceReplicas(ctx context.Context) (*RebalanceResult, error) { + return &RebalanceResult{RebalanceTime: time.Millisecond, RebalanceSuccessful: true}, nil +} + +func (rm *stubReplicationManager) GetReplicationStatus(ctx context.Context, address ucxl.Address) (*ReplicationStatus, error) { + return &ReplicationStatus{ + Address: address.String(), + DesiredReplicas: rm.policy.DefaultFactor, + CurrentReplicas: rm.policy.DefaultFactor, + HealthyReplicas: rm.policy.DefaultFactor, + ReplicaDistribution: map[string]int{}, + Status: "nominal", + }, nil +} + +func (rm *stubReplicationManager) SetReplicationFactor(factor int) error { + if factor < 1 { + factor = 1 + } + rm.policy.DefaultFactor = factor + return nil +} + +func (rm *stubReplicationManager) GetReplicationStats() (*ReplicationStatistics, error) { + return &ReplicationStatistics{LastUpdated: time.Now()}, nil +} + +// Conflict resolver stub ------------------------------------------------------------------------ + +type ConflictResolverImpl struct{} + +func NewConflictResolver(dhtInstance dht.DHT, cfg *config.Config) (ConflictResolver, error) { + return &ConflictResolverImpl{}, nil +} + +func (cr *ConflictResolverImpl) ResolveConflict(ctx context.Context, local, remote *slurpContext.ContextNode) (*ConflictResolution, error) { + return &ConflictResolution{Address: local.UCXLAddress, ResolutionType: ResolutionMerged, MergedContext: local, ResolvedAt: time.Now(), Confidence: 1.0}, nil +} + +func (cr *ConflictResolverImpl) DetectConflicts(ctx context.Context, update *slurpContext.ContextNode) ([]*PotentialConflict, error) { + return []*PotentialConflict{}, nil +} + +func (cr *ConflictResolverImpl) MergeContexts(ctx context.Context, contexts []*slurpContext.ContextNode) (*slurpContext.ContextNode, error) { + if len(contexts) == 0 { + return nil, nil + } + return contexts[0], nil +} + +func (cr *ConflictResolverImpl) GetConflictHistory(ctx context.Context, address ucxl.Address) ([]*ConflictResolution, error) { + return []*ConflictResolution{}, nil +} + +func (cr *ConflictResolverImpl) SetResolutionStrategy(strategy *ResolutionStrategy) error { + return nil +} + +// Gossip protocol stub ------------------------------------------------------------------------- + +type stubGossipProtocol struct{} + +func NewGossipProtocol(dhtInstance dht.DHT, cfg *config.Config) (GossipProtocol, error) { + return &stubGossipProtocol{}, nil +} + +func (gp *stubGossipProtocol) StartGossip(ctx context.Context) error { return nil } +func (gp *stubGossipProtocol) StopGossip(ctx context.Context) error { return nil } +func (gp *stubGossipProtocol) GossipMetadata(ctx context.Context, peer string) error { return nil } +func (gp *stubGossipProtocol) GetGossipState() (*GossipState, error) { + return &GossipState{}, nil +} +func (gp *stubGossipProtocol) SetGossipInterval(interval time.Duration) error { return nil } +func (gp *stubGossipProtocol) GetGossipStats() (*GossipStatistics, error) { + return &GossipStatistics{LastUpdated: time.Now()}, nil +} + +// Network manager stub ------------------------------------------------------------------------- + +type stubNetworkManager struct { + dht dht.DHT +} + +func NewNetworkManager(dhtInstance dht.DHT, cfg *config.Config) (NetworkManager, error) { + return &stubNetworkManager{dht: dhtInstance}, nil +} + +func (nm *stubNetworkManager) DetectPartition(ctx context.Context) (*PartitionInfo, error) { + return &PartitionInfo{DetectedAt: time.Now()}, nil +} + +func (nm *stubNetworkManager) GetTopology(ctx context.Context) (*NetworkTopology, error) { + return &NetworkTopology{UpdatedAt: time.Now()}, nil +} + +func (nm *stubNetworkManager) GetPeers(ctx context.Context) ([]*PeerInfo, error) { + return []*PeerInfo{}, nil +} + +func (nm *stubNetworkManager) CheckConnectivity(ctx context.Context, peers []string) (*ConnectivityReport, error) { + report := &ConnectivityReport{ + TotalPeers: len(peers), + ReachablePeers: len(peers), + PeerResults: make(map[string]*ConnectivityResult), + TestedAt: time.Now(), + } + for _, id := range peers { + report.PeerResults[id] = &ConnectivityResult{PeerID: id, Reachable: true, TestedAt: time.Now()} + } + return report, nil +} + +func (nm *stubNetworkManager) RecoverFromPartition(ctx context.Context) (*RecoveryResult, error) { + return &RecoveryResult{RecoverySuccessful: true, RecoveredAt: time.Now()}, nil +} + +func (nm *stubNetworkManager) GetNetworkStats() (*NetworkStatistics, error) { + return &NetworkStatistics{LastUpdated: time.Now(), LastHealthCheck: time.Now()}, nil +} + +// Vector clock stub --------------------------------------------------------------------------- + +type defaultVectorClockManager struct { + mu sync.Mutex + clocks map[string]*VectorClock +} + +func NewVectorClockManager(dhtInstance dht.DHT, nodeID string) (VectorClockManager, error) { + return &defaultVectorClockManager{clocks: make(map[string]*VectorClock)}, nil +} + +func (vcm *defaultVectorClockManager) GetClock(nodeID string) (*VectorClock, error) { + vcm.mu.Lock() + defer vcm.mu.Unlock() + if clock, ok := vcm.clocks[nodeID]; ok { + return clock, nil + } + clock := &VectorClock{Clock: map[string]int64{nodeID: time.Now().Unix()}, UpdatedAt: time.Now()} + vcm.clocks[nodeID] = clock + return clock, nil +} + +func (vcm *defaultVectorClockManager) UpdateClock(nodeID string, clock *VectorClock) error { + vcm.mu.Lock() + defer vcm.mu.Unlock() + vcm.clocks[nodeID] = clock + return nil +} + +func (vcm *defaultVectorClockManager) CompareClock(clock1, clock2 *VectorClock) ClockRelation { + return ClockConcurrent +} +func (vcm *defaultVectorClockManager) MergeClock(clocks []*VectorClock) *VectorClock { + return &VectorClock{Clock: make(map[string]int64), UpdatedAt: time.Now()} +} + +// Coordinator stub ---------------------------------------------------------------------------- + +type DistributionCoordinator struct { + config *config.Config + distributor ContextDistributor + stats *CoordinationStatistics + metrics *PerformanceMetrics +} + +func NewDistributionCoordinator( + cfg *config.Config, + dhtInstance dht.DHT, + roleCrypto *crypto.RoleCrypto, + electionManager election.Election, +) (*DistributionCoordinator, error) { + distributor, err := NewDHTContextDistributor(dhtInstance, roleCrypto, electionManager, cfg) + if err != nil { + return nil, err + } + return &DistributionCoordinator{ + config: cfg, + distributor: distributor, + stats: &CoordinationStatistics{LastUpdated: time.Now()}, + metrics: &PerformanceMetrics{CollectedAt: time.Now()}, + }, nil +} + +func (dc *DistributionCoordinator) Start(ctx context.Context) error { return nil } +func (dc *DistributionCoordinator) Stop(ctx context.Context) error { return nil } + +func (dc *DistributionCoordinator) DistributeContext(ctx context.Context, request *DistributionRequest) (*DistributionResult, error) { + if request == nil || request.ContextNode == nil { + return &DistributionResult{Success: true, CompletedAt: time.Now()}, nil + } + if err := dc.distributor.DistributeContext(ctx, request.ContextNode, request.TargetRoles); err != nil { + return nil, err + } + return &DistributionResult{Success: true, DistributedNodes: []string{"local"}, CompletedAt: time.Now()}, nil +} + +func (dc *DistributionCoordinator) CoordinateReplication(ctx context.Context, address ucxl.Address, factor int) (*RebalanceResult, error) { + return &RebalanceResult{RebalanceTime: time.Millisecond, RebalanceSuccessful: true}, nil +} + +func (dc *DistributionCoordinator) ResolveConflicts(ctx context.Context, conflicts []*PotentialConflict) ([]*ConflictResolution, error) { + resolutions := make([]*ConflictResolution, 0, len(conflicts)) + for _, conflict := range conflicts { + resolutions = append(resolutions, &ConflictResolution{Address: conflict.Address, ResolutionType: ResolutionMerged, ResolvedAt: time.Now(), Confidence: 1.0}) + } + return resolutions, nil +} + +func (dc *DistributionCoordinator) GetClusterHealth() (*ClusterHealth, error) { + return &ClusterHealth{OverallStatus: HealthHealthy, LastUpdated: time.Now()}, nil +} + +func (dc *DistributionCoordinator) GetCoordinationStats() (*CoordinationStatistics, error) { + return dc.stats, nil +} + +func (dc *DistributionCoordinator) GetPerformanceMetrics() (*PerformanceMetrics, error) { + return dc.metrics, nil +} + +// Minimal type definitions (mirroring slurp_full variants) -------------------------------------- + +type CoordinationStatistics struct { + TasksProcessed int + LastUpdated time.Time +} + +type PerformanceMetrics struct { + CollectedAt time.Time +} + +type ClusterHealth struct { + OverallStatus HealthStatus + HealthyNodes int + UnhealthyNodes int + LastUpdated time.Time + ComponentHealth map[string]*ComponentHealth + Alerts []string +} + +type ComponentHealth struct { + ComponentType string + Status string + HealthScore float64 + LastCheck time.Time +} + +type DistributionRequest struct { + RequestID string + ContextNode *slurpContext.ContextNode + TargetRoles []string +} + +type DistributionResult struct { + RequestID string + Success bool + DistributedNodes []string + CompletedAt time.Time +} diff --git a/pkg/slurp/distribution/gossip.go b/pkg/slurp/distribution/gossip.go index 4f36cab..cf74f74 100644 --- a/pkg/slurp/distribution/gossip.go +++ b/pkg/slurp/distribution/gossip.go @@ -1,3 +1,6 @@ +//go:build slurp_full +// +build slurp_full + // Package distribution provides gossip protocol for metadata synchronization package distribution @@ -9,8 +12,8 @@ import ( "sync" "time" - "chorus/pkg/dht" "chorus/pkg/config" + "chorus/pkg/dht" "chorus/pkg/ucxl" ) @@ -33,14 +36,14 @@ type GossipProtocolImpl struct { // GossipMessage represents a message in the gossip protocol type GossipMessage struct { - MessageID string `json:"message_id"` - MessageType GossipMessageType `json:"message_type"` - SenderID string `json:"sender_id"` - Timestamp time.Time `json:"timestamp"` - TTL int `json:"ttl"` - VectorClock map[string]int64 `json:"vector_clock"` - Payload map[string]interface{} `json:"payload"` - Metadata *GossipMessageMetadata `json:"metadata"` + MessageID string `json:"message_id"` + MessageType GossipMessageType `json:"message_type"` + SenderID string `json:"sender_id"` + Timestamp time.Time `json:"timestamp"` + TTL int `json:"ttl"` + VectorClock map[string]int64 `json:"vector_clock"` + Payload map[string]interface{} `json:"payload"` + Metadata *GossipMessageMetadata `json:"metadata"` } // GossipMessageType represents different types of gossip messages @@ -57,26 +60,26 @@ const ( // GossipMessageMetadata contains metadata about gossip messages type GossipMessageMetadata struct { - Priority Priority `json:"priority"` - Reliability bool `json:"reliability"` - Encrypted bool `json:"encrypted"` - Compressed bool `json:"compressed"` - OriginalSize int `json:"original_size"` - CompressionType string `json:"compression_type"` + Priority Priority `json:"priority"` + Reliability bool `json:"reliability"` + Encrypted bool `json:"encrypted"` + Compressed bool `json:"compressed"` + OriginalSize int `json:"original_size"` + CompressionType string `json:"compression_type"` } // ContextMetadata represents metadata about a distributed context type ContextMetadata struct { - Address ucxl.Address `json:"address"` - Version int64 `json:"version"` - LastUpdated time.Time `json:"last_updated"` - UpdatedBy string `json:"updated_by"` - Roles []string `json:"roles"` - Size int64 `json:"size"` - Checksum string `json:"checksum"` - ReplicationNodes []string `json:"replication_nodes"` - VectorClock map[string]int64 `json:"vector_clock"` - Status MetadataStatus `json:"status"` + Address ucxl.Address `json:"address"` + Version int64 `json:"version"` + LastUpdated time.Time `json:"last_updated"` + UpdatedBy string `json:"updated_by"` + Roles []string `json:"roles"` + Size int64 `json:"size"` + Checksum string `json:"checksum"` + ReplicationNodes []string `json:"replication_nodes"` + VectorClock map[string]int64 `json:"vector_clock"` + Status MetadataStatus `json:"status"` } // MetadataStatus represents the status of context metadata @@ -84,16 +87,16 @@ type MetadataStatus string const ( MetadataStatusActive MetadataStatus = "active" - MetadataStatusDeprecated MetadataStatus = "deprecated" + MetadataStatusDeprecated MetadataStatus = "deprecated" MetadataStatusDeleted MetadataStatus = "deleted" MetadataStatusConflicted MetadataStatus = "conflicted" ) // FailureDetector detects failed nodes in the network type FailureDetector struct { - mu sync.RWMutex - suspectedNodes map[string]time.Time - failedNodes map[string]time.Time + mu sync.RWMutex + suspectedNodes map[string]time.Time + failedNodes map[string]time.Time heartbeatTimeout time.Duration failureThreshold time.Duration } @@ -441,9 +444,9 @@ func (gp *GossipProtocolImpl) sendHeartbeat(ctx context.Context) { TTL: 1, // Heartbeats don't propagate VectorClock: gp.getVectorClock(), Payload: map[string]interface{}{ - "status": "alive", - "load": gp.calculateNodeLoad(), - "version": "1.0.0", + "status": "alive", + "load": gp.calculateNodeLoad(), + "version": "1.0.0", "capabilities": []string{"context_distribution", "replication"}, }, Metadata: &GossipMessageMetadata{ @@ -679,4 +682,4 @@ func min(a, b int) int { return a } return b -} \ No newline at end of file +} diff --git a/pkg/slurp/distribution/monitoring.go b/pkg/slurp/distribution/monitoring.go index ea440cd..043ba6b 100644 --- a/pkg/slurp/distribution/monitoring.go +++ b/pkg/slurp/distribution/monitoring.go @@ -1,3 +1,6 @@ +//go:build slurp_full +// +build slurp_full + // Package distribution provides comprehensive monitoring and observability for distributed context operations package distribution diff --git a/pkg/slurp/distribution/network.go b/pkg/slurp/distribution/network.go index bffbb8d..f599da1 100644 --- a/pkg/slurp/distribution/network.go +++ b/pkg/slurp/distribution/network.go @@ -1,3 +1,6 @@ +//go:build slurp_full +// +build slurp_full + // Package distribution provides network management for distributed context operations package distribution diff --git a/pkg/slurp/distribution/replication.go b/pkg/slurp/distribution/replication.go index 9af4263..4f66430 100644 --- a/pkg/slurp/distribution/replication.go +++ b/pkg/slurp/distribution/replication.go @@ -1,3 +1,6 @@ +//go:build slurp_full +// +build slurp_full + // Package distribution provides replication management for distributed contexts package distribution diff --git a/pkg/slurp/distribution/security.go b/pkg/slurp/distribution/security.go index 79bb814..018a1c8 100644 --- a/pkg/slurp/distribution/security.go +++ b/pkg/slurp/distribution/security.go @@ -1,3 +1,6 @@ +//go:build slurp_full +// +build slurp_full + // Package distribution provides comprehensive security for distributed context operations package distribution