 9bdcbe0447
			
		
	
	9bdcbe0447
	
	
	
		
			
			Major integrations and fixes: - Added BACKBEAT SDK integration for P2P operation timing - Implemented beat-aware status tracking for distributed operations - Added Docker secrets support for secure license management - Resolved KACHING license validation via HTTPS/TLS - Updated docker-compose configuration for clean stack deployment - Disabled rollback policies to prevent deployment failures - Added license credential storage (CHORUS-DEV-MULTI-001) Technical improvements: - BACKBEAT P2P operation tracking with phase management - Enhanced configuration system with file-based secrets - Improved error handling for license validation - Clean separation of KACHING and CHORUS deployment stacks 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
		
			
				
	
	
		
			641 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			641 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package identify
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"golang.org/x/exp/slices"
 | |
| 
 | |
| 	"github.com/libp2p/go-libp2p/core/event"
 | |
| 	"github.com/libp2p/go-libp2p/core/host"
 | |
| 	"github.com/libp2p/go-libp2p/core/network"
 | |
| 	"github.com/libp2p/go-libp2p/core/peerstore"
 | |
| 	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
 | |
| 
 | |
| 	ma "github.com/multiformats/go-multiaddr"
 | |
| 	manet "github.com/multiformats/go-multiaddr/net"
 | |
| )
 | |
| 
 | |
| // ActivationThresh sets how many times an address must be seen as "activated"
 | |
| // and therefore advertised to other peers as an address that the local peer
 | |
| // can be contacted on. The "seen" events expire by default after 40 minutes
 | |
| // (OwnObservedAddressTTL * ActivationThreshold). The are cleaned up during
 | |
| // the GC rounds set by GCInterval.
 | |
| var ActivationThresh = 4
 | |
| 
 | |
| // GCInterval specicies how often to make a round cleaning seen events and
 | |
| // observed addresses. An address will be cleaned if it has not been seen in
 | |
| // OwnObservedAddressTTL (10 minutes). A "seen" event will be cleaned up if
 | |
| // it is older than OwnObservedAddressTTL * ActivationThresh (40 minutes).
 | |
| var GCInterval = 10 * time.Minute
 | |
| 
 | |
| // observedAddrManagerWorkerChannelSize defines how many addresses can be enqueued
 | |
| // for adding to an ObservedAddrManager.
 | |
| var observedAddrManagerWorkerChannelSize = 16
 | |
| 
 | |
| // maxObservedAddrsPerIPAndTransport is the maximum number of observed addresses
 | |
| // we will return for each (IPx/TCP or UDP) group.
 | |
| var maxObservedAddrsPerIPAndTransport = 2
 | |
| 
 | |
| // observation records an address observation from an "observer" (where every IP
 | |
| // address is a unique observer).
 | |
| type observation struct {
 | |
| 	// seenTime is the last time this observation was made.
 | |
| 	seenTime time.Time
 | |
| 	// inbound indicates whether or not this observation has been made from
 | |
| 	// an inbound connection. This remains true even if we an observation
 | |
| 	// from a subsequent outbound connection.
 | |
| 	inbound bool
 | |
| }
 | |
| 
 | |
| // observedAddr is an entry for an address reported by our peers.
 | |
| // We only use addresses that:
 | |
| //   - have been observed at least 4 times in last 40 minutes. (counter symmetric nats)
 | |
| //   - have been observed at least once recently (10 minutes), because our position in the
 | |
| //     network, or network port mapppings, may have changed.
 | |
| type observedAddr struct {
 | |
| 	addr       ma.Multiaddr
 | |
| 	seenBy     map[string]observation // peer(observer) address -> observation info
 | |
| 	lastSeen   time.Time
 | |
| 	numInbound int
 | |
| }
 | |
| 
 | |
| func (oa *observedAddr) activated() bool {
 | |
| 
 | |
| 	// We only activate if other peers observed the same address
 | |
| 	// of ours at least 4 times. SeenBy peers are removed by GC if
 | |
| 	// they say the address more than ttl*ActivationThresh
 | |
| 	return len(oa.seenBy) >= ActivationThresh
 | |
| }
 | |
| 
 | |
| // GroupKey returns the group in which this observation belongs. Currently, an
 | |
| // observed address's group is just the address with all ports set to 0. This
 | |
| // means we can advertise the most commonly observed external ports without
 | |
| // advertising _every_ observed port.
 | |
| func (oa *observedAddr) groupKey() string {
 | |
| 	key := make([]byte, 0, len(oa.addr.Bytes()))
 | |
| 	ma.ForEach(oa.addr, func(c ma.Component) bool {
 | |
| 		switch proto := c.Protocol(); proto.Code {
 | |
| 		case ma.P_TCP, ma.P_UDP:
 | |
| 			key = append(key, proto.VCode...)
 | |
| 			key = append(key, 0, 0) // zero in two bytes
 | |
| 		default:
 | |
| 			key = append(key, c.Bytes()...)
 | |
| 		}
 | |
| 		return true
 | |
| 	})
 | |
| 
 | |
| 	return string(key)
 | |
| }
 | |
| 
 | |
| type newObservation struct {
 | |
| 	conn     network.Conn
 | |
| 	observed ma.Multiaddr
 | |
| }
 | |
| 
 | |
| // ObservedAddrManager keeps track of a ObservedAddrs.
 | |
| type ObservedAddrManager struct {
 | |
| 	host host.Host
 | |
| 
 | |
| 	closeOnce sync.Once
 | |
| 	refCount  sync.WaitGroup
 | |
| 	ctx       context.Context // the context is canceled when Close is called
 | |
| 	ctxCancel context.CancelFunc
 | |
| 
 | |
| 	// latest observation from active connections
 | |
| 	// we'll "re-observe" these when we gc
 | |
| 	activeConnsMu sync.Mutex
 | |
| 	// active connection -> most recent observation
 | |
| 	activeConns map[network.Conn]ma.Multiaddr
 | |
| 
 | |
| 	mu     sync.RWMutex
 | |
| 	closed bool
 | |
| 	// local(internal) address -> list of observed(external) addresses
 | |
| 	addrs        map[string][]*observedAddr
 | |
| 	ttl          time.Duration
 | |
| 	refreshTimer *time.Timer
 | |
| 
 | |
| 	// this is the worker channel
 | |
| 	wch chan newObservation
 | |
| 
 | |
| 	reachabilitySub event.Subscription
 | |
| 	reachability    network.Reachability
 | |
| 
 | |
| 	currentUDPNATDeviceType  network.NATDeviceType
 | |
| 	currentTCPNATDeviceType  network.NATDeviceType
 | |
| 	emitNATDeviceTypeChanged event.Emitter
 | |
| }
 | |
| 
 | |
| // NewObservedAddrManager returns a new address manager using
 | |
| // peerstore.OwnObservedAddressTTL as the TTL.
 | |
| func NewObservedAddrManager(host host.Host) (*ObservedAddrManager, error) {
 | |
| 	oas := &ObservedAddrManager{
 | |
| 		addrs:       make(map[string][]*observedAddr),
 | |
| 		ttl:         peerstore.OwnObservedAddrTTL,
 | |
| 		wch:         make(chan newObservation, observedAddrManagerWorkerChannelSize),
 | |
| 		host:        host,
 | |
| 		activeConns: make(map[network.Conn]ma.Multiaddr),
 | |
| 		// refresh every ttl/2 so we don't forget observations from connected peers
 | |
| 		refreshTimer: time.NewTimer(peerstore.OwnObservedAddrTTL / 2),
 | |
| 	}
 | |
| 	oas.ctx, oas.ctxCancel = context.WithCancel(context.Background())
 | |
| 
 | |
| 	reachabilitySub, err := host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged), eventbus.Name("identify (obsaddr)"))
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to subscribe to reachability event: %s", err)
 | |
| 	}
 | |
| 	oas.reachabilitySub = reachabilitySub
 | |
| 
 | |
| 	emitter, err := host.EventBus().Emitter(new(event.EvtNATDeviceTypeChanged), eventbus.Stateful)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to create emitter for NATDeviceType: %s", err)
 | |
| 	}
 | |
| 	oas.emitNATDeviceTypeChanged = emitter
 | |
| 
 | |
| 	oas.host.Network().Notify((*obsAddrNotifiee)(oas))
 | |
| 	oas.refCount.Add(1)
 | |
| 	go oas.worker()
 | |
| 	return oas, nil
 | |
| }
 | |
| 
 | |
| // AddrsFor return all activated observed addresses associated with the given
 | |
| // (resolved) listen address.
 | |
| func (oas *ObservedAddrManager) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) {
 | |
| 	oas.mu.RLock()
 | |
| 	defer oas.mu.RUnlock()
 | |
| 
 | |
| 	if len(oas.addrs) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	observedAddrs, ok := oas.addrs[string(addr.Bytes())]
 | |
| 	if !ok {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	return oas.filter(observedAddrs)
 | |
| }
 | |
| 
 | |
| // Addrs return all activated observed addresses
 | |
| func (oas *ObservedAddrManager) Addrs() []ma.Multiaddr {
 | |
| 	oas.mu.RLock()
 | |
| 	defer oas.mu.RUnlock()
 | |
| 
 | |
| 	if len(oas.addrs) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	var allObserved []*observedAddr
 | |
| 	for _, addrs := range oas.addrs {
 | |
| 		allObserved = append(allObserved, addrs...)
 | |
| 	}
 | |
| 	return oas.filter(allObserved)
 | |
| }
 | |
| 
 | |
| func (oas *ObservedAddrManager) filter(observedAddrs []*observedAddr) []ma.Multiaddr {
 | |
| 	pmap := make(map[string][]*observedAddr)
 | |
| 	now := time.Now()
 | |
| 
 | |
| 	for i := range observedAddrs {
 | |
| 		a := observedAddrs[i]
 | |
| 		if now.Sub(a.lastSeen) <= oas.ttl && a.activated() {
 | |
| 			// group addresses by their IPX/Transport Protocol(TCP or UDP) pattern.
 | |
| 			pat := a.groupKey()
 | |
| 			pmap[pat] = append(pmap[pat], a)
 | |
| 
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	addrs := make([]ma.Multiaddr, 0, len(observedAddrs))
 | |
| 	for pat := range pmap {
 | |
| 		s := pmap[pat]
 | |
| 
 | |
| 		slices.SortFunc(s, func(first, second *observedAddr) int {
 | |
| 			// We prefer inbound connection observations over outbound.
 | |
| 			if first.numInbound > second.numInbound {
 | |
| 				return -1
 | |
| 			}
 | |
| 			// For ties, we prefer the ones with more votes.
 | |
| 			if first.numInbound == second.numInbound && len(first.seenBy) > len(second.seenBy) {
 | |
| 				return -1
 | |
| 			}
 | |
| 			return 1
 | |
| 		})
 | |
| 
 | |
| 		for i := 0; i < maxObservedAddrsPerIPAndTransport && i < len(s); i++ {
 | |
| 			addrs = append(addrs, s[i].addr)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return addrs
 | |
| }
 | |
| 
 | |
| // Record records an address observation, if valid.
 | |
| func (oas *ObservedAddrManager) Record(conn network.Conn, observed ma.Multiaddr) {
 | |
| 	select {
 | |
| 	case oas.wch <- newObservation{
 | |
| 		conn:     conn,
 | |
| 		observed: observed,
 | |
| 	}:
 | |
| 	default:
 | |
| 		log.Debugw("dropping address observation due to full buffer",
 | |
| 			"from", conn.RemoteMultiaddr(),
 | |
| 			"observed", observed,
 | |
| 		)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (oas *ObservedAddrManager) worker() {
 | |
| 	defer oas.refCount.Done()
 | |
| 
 | |
| 	ticker := time.NewTicker(GCInterval)
 | |
| 	defer ticker.Stop()
 | |
| 
 | |
| 	subChan := oas.reachabilitySub.Out()
 | |
| 	for {
 | |
| 		select {
 | |
| 		case evt, ok := <-subChan:
 | |
| 			if !ok {
 | |
| 				subChan = nil
 | |
| 				continue
 | |
| 			}
 | |
| 			ev := evt.(event.EvtLocalReachabilityChanged)
 | |
| 			oas.reachability = ev.Reachability
 | |
| 		case obs := <-oas.wch:
 | |
| 			oas.maybeRecordObservation(obs.conn, obs.observed)
 | |
| 		case <-ticker.C:
 | |
| 			oas.gc()
 | |
| 		case <-oas.refreshTimer.C:
 | |
| 			oas.refresh()
 | |
| 		case <-oas.ctx.Done():
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (oas *ObservedAddrManager) refresh() {
 | |
| 	oas.activeConnsMu.Lock()
 | |
| 	recycledObservations := make([]newObservation, 0, len(oas.activeConns))
 | |
| 	for conn, observed := range oas.activeConns {
 | |
| 		recycledObservations = append(recycledObservations, newObservation{
 | |
| 			conn:     conn,
 | |
| 			observed: observed,
 | |
| 		})
 | |
| 	}
 | |
| 	oas.activeConnsMu.Unlock()
 | |
| 
 | |
| 	oas.mu.Lock()
 | |
| 	defer oas.mu.Unlock()
 | |
| 	for _, obs := range recycledObservations {
 | |
| 		oas.recordObservationUnlocked(obs.conn, obs.observed)
 | |
| 	}
 | |
| 	// refresh every ttl/2 so we don't forget observations from connected peers
 | |
| 	oas.refreshTimer.Reset(oas.ttl / 2)
 | |
| }
 | |
| 
 | |
| func (oas *ObservedAddrManager) gc() {
 | |
| 	oas.mu.Lock()
 | |
| 	defer oas.mu.Unlock()
 | |
| 
 | |
| 	now := time.Now()
 | |
| 	for local, observedAddrs := range oas.addrs {
 | |
| 		filteredAddrs := observedAddrs[:0]
 | |
| 		for _, a := range observedAddrs {
 | |
| 			// clean up SeenBy set
 | |
| 			for k, ob := range a.seenBy {
 | |
| 				if now.Sub(ob.seenTime) > oas.ttl*time.Duration(ActivationThresh) {
 | |
| 					delete(a.seenBy, k)
 | |
| 					if ob.inbound {
 | |
| 						a.numInbound--
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			// leave only alive observed addresses
 | |
| 			if now.Sub(a.lastSeen) <= oas.ttl {
 | |
| 				filteredAddrs = append(filteredAddrs, a)
 | |
| 			}
 | |
| 		}
 | |
| 		if len(filteredAddrs) > 0 {
 | |
| 			oas.addrs[local] = filteredAddrs
 | |
| 		} else {
 | |
| 			delete(oas.addrs, local)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (oas *ObservedAddrManager) addConn(conn network.Conn, observed ma.Multiaddr) {
 | |
| 	oas.activeConnsMu.Lock()
 | |
| 	defer oas.activeConnsMu.Unlock()
 | |
| 
 | |
| 	// We need to make sure we haven't received a disconnect event for this
 | |
| 	// connection yet. The only way to do that right now is to make sure the
 | |
| 	// swarm still has the connection.
 | |
| 	//
 | |
| 	// Doing this under a lock that we _also_ take in a disconnect event
 | |
| 	// handler ensures everything happens in the right order.
 | |
| 	for _, c := range oas.host.Network().ConnsToPeer(conn.RemotePeer()) {
 | |
| 		if c == conn {
 | |
| 			oas.activeConns[conn] = observed
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (oas *ObservedAddrManager) removeConn(conn network.Conn) {
 | |
| 	// DO NOT remove this lock.
 | |
| 	// This ensures we don't call addConn at the same time:
 | |
| 	// 1. see that we have a connection and pause inside addConn right before recording it.
 | |
| 	// 2. process a disconnect event.
 | |
| 	// 3. record the connection (leaking it).
 | |
| 
 | |
| 	oas.activeConnsMu.Lock()
 | |
| 	delete(oas.activeConns, conn)
 | |
| 	oas.activeConnsMu.Unlock()
 | |
| }
 | |
| 
 | |
| type normalizeMultiaddrer interface {
 | |
| 	NormalizeMultiaddr(addr ma.Multiaddr) ma.Multiaddr
 | |
| }
 | |
| 
 | |
| type addrsProvider interface {
 | |
| 	Addrs() []ma.Multiaddr
 | |
| }
 | |
| 
 | |
| type listenAddrsProvider interface {
 | |
| 	ListenAddresses() []ma.Multiaddr
 | |
| 	InterfaceListenAddresses() ([]ma.Multiaddr, error)
 | |
| }
 | |
| 
 | |
| func shouldRecordObservation(host addrsProvider, network listenAddrsProvider, conn network.ConnMultiaddrs, observed ma.Multiaddr) bool {
 | |
| 	// First, determine if this observation is even worth keeping...
 | |
| 
 | |
| 	// Ignore observations from loopback nodes. We already know our loopback
 | |
| 	// addresses.
 | |
| 	if manet.IsIPLoopback(observed) {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	// Provided by NAT64 peers, these addresses are specific to the peer and not publicly routable
 | |
| 	if manet.IsNAT64IPv4ConvertedIPv6Addr(observed) {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	// we should only use ObservedAddr when our connection's LocalAddr is one
 | |
| 	// of our ListenAddrs. If we Dial out using an ephemeral addr, knowing that
 | |
| 	// address's external mapping is not very useful because the port will not be
 | |
| 	// the same as the listen addr.
 | |
| 	ifaceaddrs, err := network.InterfaceListenAddresses()
 | |
| 	if err != nil {
 | |
| 		log.Infof("failed to get interface listen addrs", err)
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	normalizer, canNormalize := host.(normalizeMultiaddrer)
 | |
| 
 | |
| 	if canNormalize {
 | |
| 		for i, a := range ifaceaddrs {
 | |
| 			ifaceaddrs[i] = normalizer.NormalizeMultiaddr(a)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	local := conn.LocalMultiaddr()
 | |
| 	if canNormalize {
 | |
| 		local = normalizer.NormalizeMultiaddr(local)
 | |
| 	}
 | |
| 
 | |
| 	listenAddrs := network.ListenAddresses()
 | |
| 	if canNormalize {
 | |
| 		for i, a := range listenAddrs {
 | |
| 			listenAddrs[i] = normalizer.NormalizeMultiaddr(a)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if !ma.Contains(ifaceaddrs, local) && !ma.Contains(listenAddrs, local) {
 | |
| 		// not in our list
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	hostAddrs := host.Addrs()
 | |
| 	if canNormalize {
 | |
| 		for i, a := range hostAddrs {
 | |
| 			hostAddrs[i] = normalizer.NormalizeMultiaddr(a)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// We should reject the connection if the observation doesn't match the
 | |
| 	// transports of one of our advertised addresses.
 | |
| 	if !HasConsistentTransport(observed, hostAddrs) &&
 | |
| 		!HasConsistentTransport(observed, listenAddrs) {
 | |
| 		log.Debugw(
 | |
| 			"observed multiaddr doesn't match the transports of any announced addresses",
 | |
| 			"from", conn.RemoteMultiaddr(),
 | |
| 			"observed", observed,
 | |
| 		)
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (oas *ObservedAddrManager) maybeRecordObservation(conn network.Conn, observed ma.Multiaddr) {
 | |
| 	shouldRecord := shouldRecordObservation(oas.host, oas.host.Network(), conn, observed)
 | |
| 	if shouldRecord {
 | |
| 		// Ok, the observation is good, record it.
 | |
| 		log.Debugw("added own observed listen addr", "observed", observed)
 | |
| 		defer oas.addConn(conn, observed)
 | |
| 
 | |
| 		oas.mu.Lock()
 | |
| 		defer oas.mu.Unlock()
 | |
| 		oas.recordObservationUnlocked(conn, observed)
 | |
| 
 | |
| 		if oas.reachability == network.ReachabilityPrivate {
 | |
| 			oas.emitAllNATTypes()
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (oas *ObservedAddrManager) recordObservationUnlocked(conn network.Conn, observed ma.Multiaddr) {
 | |
| 	now := time.Now()
 | |
| 	observerString := observerGroup(conn.RemoteMultiaddr())
 | |
| 	localString := string(conn.LocalMultiaddr().Bytes())
 | |
| 	ob := observation{
 | |
| 		seenTime: now,
 | |
| 		inbound:  conn.Stat().Direction == network.DirInbound,
 | |
| 	}
 | |
| 
 | |
| 	// check if observed address seen yet, if so, update it
 | |
| 	for _, observedAddr := range oas.addrs[localString] {
 | |
| 		if observedAddr.addr.Equal(observed) {
 | |
| 			// Don't trump an outbound observation with an inbound
 | |
| 			// one.
 | |
| 			wasInbound := observedAddr.seenBy[observerString].inbound
 | |
| 			isInbound := ob.inbound
 | |
| 			ob.inbound = isInbound || wasInbound
 | |
| 
 | |
| 			if !wasInbound && isInbound {
 | |
| 				observedAddr.numInbound++
 | |
| 			}
 | |
| 
 | |
| 			observedAddr.seenBy[observerString] = ob
 | |
| 			observedAddr.lastSeen = now
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// observed address not seen yet, append it
 | |
| 	oa := &observedAddr{
 | |
| 		addr: observed,
 | |
| 		seenBy: map[string]observation{
 | |
| 			observerString: ob,
 | |
| 		},
 | |
| 		lastSeen: now,
 | |
| 	}
 | |
| 	if ob.inbound {
 | |
| 		oa.numInbound++
 | |
| 	}
 | |
| 	oas.addrs[localString] = append(oas.addrs[localString], oa)
 | |
| }
 | |
| 
 | |
| // For a given transport Protocol (TCP/UDP):
 | |
| //
 | |
| // 1. If we have an activated address, we are behind an Cone NAT.
 | |
| // With regards to RFC 3489, this could be either a Full Cone NAT, a Restricted Cone NAT or a
 | |
| // Port Restricted Cone NAT. However, we do NOT differentiate between them here and simply classify all such NATs as a Cone NAT.
 | |
| //
 | |
| // 2. If four different peers observe a different address for us on outbound connections, we
 | |
| // are MOST probably behind a Symmetric NAT.
 | |
| //
 | |
| // Please see the documentation on the enumerations for `network.NATDeviceType` for more details about these NAT Device types
 | |
| // and how they relate to NAT traversal via Hole Punching.
 | |
| func (oas *ObservedAddrManager) emitAllNATTypes() {
 | |
| 	var allObserved []*observedAddr
 | |
| 	for _, addrs := range oas.addrs {
 | |
| 		allObserved = append(allObserved, addrs...)
 | |
| 	}
 | |
| 
 | |
| 	hasChanged, natType := oas.emitSpecificNATType(allObserved, ma.P_TCP, network.NATTransportTCP, oas.currentTCPNATDeviceType)
 | |
| 	if hasChanged {
 | |
| 		oas.currentTCPNATDeviceType = natType
 | |
| 	}
 | |
| 
 | |
| 	hasChanged, natType = oas.emitSpecificNATType(allObserved, ma.P_UDP, network.NATTransportUDP, oas.currentUDPNATDeviceType)
 | |
| 	if hasChanged {
 | |
| 		oas.currentUDPNATDeviceType = natType
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // returns true along with the new NAT device type if the NAT device type for the given protocol has changed.
 | |
| // returns false otherwise.
 | |
| func (oas *ObservedAddrManager) emitSpecificNATType(addrs []*observedAddr, protoCode int, transportProto network.NATTransportProtocol,
 | |
| 	currentNATType network.NATDeviceType) (bool, network.NATDeviceType) {
 | |
| 	now := time.Now()
 | |
| 	seenBy := make(map[string]struct{})
 | |
| 	cnt := 0
 | |
| 
 | |
| 	for _, oa := range addrs {
 | |
| 		_, err := oa.addr.ValueForProtocol(protoCode)
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		// if we have an activated addresses, it's a Cone NAT.
 | |
| 		if now.Sub(oa.lastSeen) <= oas.ttl && oa.activated() {
 | |
| 			if currentNATType != network.NATDeviceTypeCone {
 | |
| 				oas.emitNATDeviceTypeChanged.Emit(event.EvtNATDeviceTypeChanged{
 | |
| 					TransportProtocol: transportProto,
 | |
| 					NatDeviceType:     network.NATDeviceTypeCone,
 | |
| 				})
 | |
| 				return true, network.NATDeviceTypeCone
 | |
| 			}
 | |
| 
 | |
| 			// our current NAT Device Type is already CONE, nothing to do here.
 | |
| 			return false, 0
 | |
| 		}
 | |
| 
 | |
| 		// An observed address on an outbound connection that has ONLY been seen by one peer
 | |
| 		if now.Sub(oa.lastSeen) <= oas.ttl && oa.numInbound == 0 && len(oa.seenBy) == 1 {
 | |
| 			cnt++
 | |
| 			for s := range oa.seenBy {
 | |
| 				seenBy[s] = struct{}{}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// If four different peers observe a different address for us on each of four outbound connections, we
 | |
| 	// are MOST probably behind a Symmetric NAT.
 | |
| 	if cnt >= ActivationThresh && len(seenBy) >= ActivationThresh {
 | |
| 		if currentNATType != network.NATDeviceTypeSymmetric {
 | |
| 			oas.emitNATDeviceTypeChanged.Emit(event.EvtNATDeviceTypeChanged{
 | |
| 				TransportProtocol: transportProto,
 | |
| 				NatDeviceType:     network.NATDeviceTypeSymmetric,
 | |
| 			})
 | |
| 			return true, network.NATDeviceTypeSymmetric
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return false, 0
 | |
| }
 | |
| 
 | |
| func (oas *ObservedAddrManager) Close() error {
 | |
| 	oas.closeOnce.Do(func() {
 | |
| 		oas.ctxCancel()
 | |
| 
 | |
| 		oas.mu.Lock()
 | |
| 		oas.closed = true
 | |
| 		oas.refreshTimer.Stop()
 | |
| 		oas.mu.Unlock()
 | |
| 
 | |
| 		oas.refCount.Wait()
 | |
| 		oas.reachabilitySub.Close()
 | |
| 		oas.host.Network().StopNotify((*obsAddrNotifiee)(oas))
 | |
| 	})
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // observerGroup is a function that determines what part of
 | |
| // a multiaddr counts as a different observer. for example,
 | |
| // two ipfs nodes at the same IP/TCP transport would get
 | |
| // the exact same NAT mapping; they would count as the
 | |
| // same observer. This may protect against NATs who assign
 | |
| // different ports to addresses at different IP hosts, but
 | |
| // not TCP ports.
 | |
| //
 | |
| // Here, we use the root multiaddr address. This is mostly
 | |
| // IP addresses. In practice, this is what we want.
 | |
| func observerGroup(m ma.Multiaddr) string {
 | |
| 	// TODO: If IPv6 rolls out we should mark /64 routing zones as one group
 | |
| 	first, _ := ma.SplitFirst(m)
 | |
| 	return string(first.Bytes())
 | |
| }
 | |
| 
 | |
| // SetTTL sets the TTL of an observed address manager.
 | |
| func (oas *ObservedAddrManager) SetTTL(ttl time.Duration) {
 | |
| 	oas.mu.Lock()
 | |
| 	defer oas.mu.Unlock()
 | |
| 	if oas.closed {
 | |
| 		return
 | |
| 	}
 | |
| 	oas.ttl = ttl
 | |
| 	// refresh every ttl/2 so we don't forget observations from connected peers
 | |
| 	oas.refreshTimer.Reset(ttl / 2)
 | |
| }
 | |
| 
 | |
| // TTL gets the TTL of an observed address manager.
 | |
| func (oas *ObservedAddrManager) TTL() time.Duration {
 | |
| 	oas.mu.RLock()
 | |
| 	defer oas.mu.RUnlock()
 | |
| 	return oas.ttl
 | |
| }
 | |
| 
 | |
| type obsAddrNotifiee ObservedAddrManager
 | |
| 
 | |
| func (on *obsAddrNotifiee) Listen(n network.Network, a ma.Multiaddr)      {}
 | |
| func (on *obsAddrNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {}
 | |
| func (on *obsAddrNotifiee) Connected(n network.Network, v network.Conn)   {}
 | |
| func (on *obsAddrNotifiee) Disconnected(n network.Network, v network.Conn) {
 | |
| 	(*ObservedAddrManager)(on).removeConn(v)
 | |
| }
 |