 9bdcbe0447
			
		
	
	9bdcbe0447
	
	
	
		
			
			Major integrations and fixes: - Added BACKBEAT SDK integration for P2P operation timing - Implemented beat-aware status tracking for distributed operations - Added Docker secrets support for secure license management - Resolved KACHING license validation via HTTPS/TLS - Updated docker-compose configuration for clean stack deployment - Disabled rollback policies to prevent deployment failures - Added license credential storage (CHORUS-DEV-MULTI-001) Technical improvements: - BACKBEAT P2P operation tracking with phase management - Enhanced configuration system with file-based secrets - Improved error handling for license validation - Clean separation of KACHING and CHORUS deployment stacks 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
		
			
				
	
	
		
			908 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			908 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package yamux
 | |
| 
 | |
| import (
 | |
| 	"bufio"
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"log"
 | |
| 	"math"
 | |
| 	"net"
 | |
| 	"os"
 | |
| 	"runtime/debug"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	pool "github.com/libp2p/go-buffer-pool"
 | |
| )
 | |
| 
 | |
// The MemoryManager allows management of memory allocations.
// Memory is allocated:
//  1. When opening / accepting a new stream. This uses the highest priority.
//  2. When trying to increase the stream receive window. This uses a lower priority.
//
// This is a subset of libp2p's resource manager ResourceScopeSpan interface.
type MemoryManager interface {
	// ReserveMemory asks for size bytes at priority prio. Callers in this
	// package treat a non-nil error as "reservation not granted" and abort
	// the operation that needed the memory.
	ReserveMemory(size int, prio uint8) error

	// ReleaseMemory explicitly releases memory previously reserved with ReserveMemory
	ReleaseMemory(size int)

	// Done ends the span and releases associated resources.
	Done()
}
 | |
| 
 | |
// nullMemoryManagerImpl is a memory manager that performs no accounting:
// every reservation succeeds and releasing is a no-op.
type nullMemoryManagerImpl struct{}

// ReserveMemory always grants the reservation.
func (nullMemoryManagerImpl) ReserveMemory(int, uint8) error {
	return nil
}

// ReleaseMemory does nothing.
func (nullMemoryManagerImpl) ReleaseMemory(int) {
}

// Done does nothing.
func (nullMemoryManagerImpl) Done() {
}

// nullMemoryManager is the shared no-op manager instance.
var nullMemoryManager = new(nullMemoryManagerImpl)
 | |
| 
 | |
| // Session is used to wrap a reliable ordered connection and to
 | |
| // multiplex it into multiple streams.
 | |
| type Session struct {
 | |
| 	rtt int64 // to be accessed atomically, in nanoseconds
 | |
| 
 | |
| 	// remoteGoAway indicates the remote side does
 | |
| 	// not want futher connections. Must be first for alignment.
 | |
| 	remoteGoAway int32
 | |
| 
 | |
| 	// localGoAway indicates that we should stop
 | |
| 	// accepting futher connections. Must be first for alignment.
 | |
| 	localGoAway int32
 | |
| 
 | |
| 	// nextStreamID is the next stream we should
 | |
| 	// send. This depends if we are a client/server.
 | |
| 	nextStreamID uint32
 | |
| 
 | |
| 	// config holds our configuration
 | |
| 	config *Config
 | |
| 
 | |
| 	// logger is used for our logs
 | |
| 	logger *log.Logger
 | |
| 
 | |
| 	// conn is the underlying connection
 | |
| 	conn net.Conn
 | |
| 
 | |
| 	// reader is a buffered reader
 | |
| 	reader io.Reader
 | |
| 
 | |
| 	newMemoryManager func() (MemoryManager, error)
 | |
| 
 | |
| 	// pings is used to track inflight pings
 | |
| 	pingLock   sync.Mutex
 | |
| 	pingID     uint32
 | |
| 	activePing *ping
 | |
| 
 | |
| 	// streams maps a stream id to a stream, and inflight has an entry
 | |
| 	// for any outgoing stream that has not yet been established. Both are
 | |
| 	// protected by streamLock.
 | |
| 	numIncomingStreams uint32
 | |
| 	streams            map[uint32]*Stream
 | |
| 	inflight           map[uint32]struct{}
 | |
| 	streamLock         sync.Mutex
 | |
| 
 | |
| 	// synCh acts like a semaphore. It is sized to the AcceptBacklog which
 | |
| 	// is assumed to be symmetric between the client and server. This allows
 | |
| 	// the client to avoid exceeding the backlog and instead blocks the open.
 | |
| 	synCh chan struct{}
 | |
| 
 | |
| 	// acceptCh is used to pass ready streams to the client
 | |
| 	acceptCh chan *Stream
 | |
| 
 | |
| 	// sendCh is used to send messages
 | |
| 	sendCh chan []byte
 | |
| 
 | |
| 	// pingCh and pingCh are used to send pings and pongs
 | |
| 	pongCh, pingCh chan uint32
 | |
| 
 | |
| 	// recvDoneCh is closed when recv() exits to avoid a race
 | |
| 	// between stream registration and stream shutdown
 | |
| 	recvDoneCh chan struct{}
 | |
| 
 | |
| 	// sendDoneCh is closed when send() exits to avoid a race
 | |
| 	// between returning from a Stream.Write and exiting from the send loop
 | |
| 	// (which may be reading a buffer on-load-from Stream.Write).
 | |
| 	sendDoneCh chan struct{}
 | |
| 
 | |
| 	// client is true if we're the client and our stream IDs should be odd.
 | |
| 	client bool
 | |
| 
 | |
| 	// shutdown is used to safely close a session
 | |
| 	shutdown     bool
 | |
| 	shutdownErr  error
 | |
| 	shutdownCh   chan struct{}
 | |
| 	shutdownLock sync.Mutex
 | |
| 
 | |
| 	// keepaliveTimer is a periodic timer for keepalive messages. It's nil
 | |
| 	// when keepalives are disabled.
 | |
| 	keepaliveLock   sync.Mutex
 | |
| 	keepaliveTimer  *time.Timer
 | |
| 	keepaliveActive bool
 | |
| }
 | |
| 
 | |
| // newSession is used to construct a new session
 | |
| func newSession(config *Config, conn net.Conn, client bool, readBuf int, newMemoryManager func() (MemoryManager, error)) *Session {
 | |
| 	var reader io.Reader = conn
 | |
| 	if readBuf > 0 {
 | |
| 		reader = bufio.NewReaderSize(reader, readBuf)
 | |
| 	}
 | |
| 	if newMemoryManager == nil {
 | |
| 		newMemoryManager = func() (MemoryManager, error) { return nullMemoryManager, nil }
 | |
| 	}
 | |
| 	s := &Session{
 | |
| 		config:           config,
 | |
| 		client:           client,
 | |
| 		logger:           log.New(config.LogOutput, "", log.LstdFlags),
 | |
| 		conn:             conn,
 | |
| 		reader:           reader,
 | |
| 		streams:          make(map[uint32]*Stream),
 | |
| 		inflight:         make(map[uint32]struct{}),
 | |
| 		synCh:            make(chan struct{}, config.AcceptBacklog),
 | |
| 		acceptCh:         make(chan *Stream, config.AcceptBacklog),
 | |
| 		sendCh:           make(chan []byte, 64),
 | |
| 		pongCh:           make(chan uint32, config.PingBacklog),
 | |
| 		pingCh:           make(chan uint32),
 | |
| 		recvDoneCh:       make(chan struct{}),
 | |
| 		sendDoneCh:       make(chan struct{}),
 | |
| 		shutdownCh:       make(chan struct{}),
 | |
| 		newMemoryManager: newMemoryManager,
 | |
| 	}
 | |
| 	if client {
 | |
| 		s.nextStreamID = 1
 | |
| 	} else {
 | |
| 		s.nextStreamID = 2
 | |
| 	}
 | |
| 	if config.EnableKeepAlive {
 | |
| 		s.startKeepalive()
 | |
| 	}
 | |
| 	go s.recv()
 | |
| 	go s.send()
 | |
| 	go s.startMeasureRTT()
 | |
| 	return s
 | |
| }
 | |
| 
 | |
| // IsClosed does a safe check to see if we have shutdown
 | |
| func (s *Session) IsClosed() bool {
 | |
| 	select {
 | |
| 	case <-s.shutdownCh:
 | |
| 		return true
 | |
| 	default:
 | |
| 		return false
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // CloseChan returns a read-only channel which is closed as
 | |
| // soon as the session is closed.
 | |
| func (s *Session) CloseChan() <-chan struct{} {
 | |
| 	return s.shutdownCh
 | |
| }
 | |
| 
 | |
| // NumStreams returns the number of currently open streams
 | |
| func (s *Session) NumStreams() int {
 | |
| 	s.streamLock.Lock()
 | |
| 	num := len(s.streams)
 | |
| 	s.streamLock.Unlock()
 | |
| 	return num
 | |
| }
 | |
| 
 | |
| // Open is used to create a new stream as a net.Conn
 | |
| func (s *Session) Open(ctx context.Context) (net.Conn, error) {
 | |
| 	conn, err := s.OpenStream(ctx)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return conn, nil
 | |
| }
 | |
| 
 | |
| // OpenStream is used to create a new stream
 | |
| func (s *Session) OpenStream(ctx context.Context) (*Stream, error) {
 | |
| 	if s.IsClosed() {
 | |
| 		return nil, s.shutdownErr
 | |
| 	}
 | |
| 	if atomic.LoadInt32(&s.remoteGoAway) == 1 {
 | |
| 		return nil, ErrRemoteGoAway
 | |
| 	}
 | |
| 
 | |
| 	// Block if we have too many inflight SYNs
 | |
| 	select {
 | |
| 	case s.synCh <- struct{}{}:
 | |
| 	case <-ctx.Done():
 | |
| 		return nil, ctx.Err()
 | |
| 	case <-s.shutdownCh:
 | |
| 		return nil, s.shutdownErr
 | |
| 	}
 | |
| 
 | |
| 	span, err := s.newMemoryManager()
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to create resource scope span: %w", err)
 | |
| 	}
 | |
| 	if err := span.ReserveMemory(initialStreamWindow, 255); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| GET_ID:
 | |
| 	// Get an ID, and check for stream exhaustion
 | |
| 	id := atomic.LoadUint32(&s.nextStreamID)
 | |
| 	if id >= math.MaxUint32-1 {
 | |
| 		span.Done()
 | |
| 		return nil, ErrStreamsExhausted
 | |
| 	}
 | |
| 	if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) {
 | |
| 		goto GET_ID
 | |
| 	}
 | |
| 
 | |
| 	// Register the stream
 | |
| 	stream := newStream(s, id, streamInit, initialStreamWindow, span)
 | |
| 	s.streamLock.Lock()
 | |
| 	s.streams[id] = stream
 | |
| 	s.inflight[id] = struct{}{}
 | |
| 	s.streamLock.Unlock()
 | |
| 
 | |
| 	// Send the window update to create
 | |
| 	if err := stream.sendWindowUpdate(ctx.Done()); err != nil {
 | |
| 		defer span.Done()
 | |
| 		select {
 | |
| 		case <-s.synCh:
 | |
| 		default:
 | |
| 			s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore")
 | |
| 		}
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return stream, nil
 | |
| }
 | |
| 
 | |
| // Accept is used to block until the next available stream
 | |
| // is ready to be accepted.
 | |
| func (s *Session) Accept() (net.Conn, error) {
 | |
| 	conn, err := s.AcceptStream()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return conn, err
 | |
| }
 | |
| 
 | |
| // AcceptStream is used to block until the next available stream
 | |
| // is ready to be accepted.
 | |
| func (s *Session) AcceptStream() (*Stream, error) {
 | |
| 	for {
 | |
| 		select {
 | |
| 		case stream := <-s.acceptCh:
 | |
| 			if err := stream.sendWindowUpdate(nil); err != nil {
 | |
| 				// don't return accept errors.
 | |
| 				s.logger.Printf("[WARN] error sending window update before accepting: %s", err)
 | |
| 				continue
 | |
| 			}
 | |
| 			return stream, nil
 | |
| 		case <-s.shutdownCh:
 | |
| 			return nil, s.shutdownErr
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Close is used to close the session and all streams.
 | |
| // Attempts to send a GoAway before closing the connection.
 | |
| func (s *Session) Close() error {
 | |
| 	s.shutdownLock.Lock()
 | |
| 	defer s.shutdownLock.Unlock()
 | |
| 
 | |
| 	if s.shutdown {
 | |
| 		return nil
 | |
| 	}
 | |
| 	s.shutdown = true
 | |
| 	if s.shutdownErr == nil {
 | |
| 		s.shutdownErr = ErrSessionShutdown
 | |
| 	}
 | |
| 	close(s.shutdownCh)
 | |
| 	s.conn.Close()
 | |
| 	s.stopKeepalive()
 | |
| 	<-s.recvDoneCh
 | |
| 	<-s.sendDoneCh
 | |
| 
 | |
| 	s.streamLock.Lock()
 | |
| 	defer s.streamLock.Unlock()
 | |
| 	for id, stream := range s.streams {
 | |
| 		stream.forceClose()
 | |
| 		delete(s.streams, id)
 | |
| 		stream.memorySpan.Done()
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // exitErr is used to handle an error that is causing the
 | |
| // session to terminate.
 | |
| func (s *Session) exitErr(err error) {
 | |
| 	s.shutdownLock.Lock()
 | |
| 	if s.shutdownErr == nil {
 | |
| 		s.shutdownErr = err
 | |
| 	}
 | |
| 	s.shutdownLock.Unlock()
 | |
| 	s.Close()
 | |
| }
 | |
| 
 | |
| // GoAway can be used to prevent accepting further
 | |
| // connections. It does not close the underlying conn.
 | |
| func (s *Session) GoAway() error {
 | |
| 	return s.sendMsg(s.goAway(goAwayNormal), nil, nil)
 | |
| }
 | |
| 
 | |
| // goAway is used to send a goAway message
 | |
| func (s *Session) goAway(reason uint32) header {
 | |
| 	atomic.SwapInt32(&s.localGoAway, 1)
 | |
| 	hdr := encode(typeGoAway, 0, 0, reason)
 | |
| 	return hdr
 | |
| }
 | |
| 
 | |
| func (s *Session) measureRTT() {
 | |
| 	rtt, err := s.Ping()
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	if !atomic.CompareAndSwapInt64(&s.rtt, 0, rtt.Nanoseconds()) {
 | |
| 		prev := atomic.LoadInt64(&s.rtt)
 | |
| 		smoothedRTT := prev/2 + rtt.Nanoseconds()/2
 | |
| 		atomic.StoreInt64(&s.rtt, smoothedRTT)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *Session) startMeasureRTT() {
 | |
| 	s.measureRTT()
 | |
| 	t := time.NewTicker(s.config.MeasureRTTInterval)
 | |
| 	defer t.Stop()
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-s.CloseChan():
 | |
| 			return
 | |
| 		case <-t.C:
 | |
| 			s.measureRTT()
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // 0 if we don't yet have a measurement
 | |
| func (s *Session) getRTT() time.Duration {
 | |
| 	return time.Duration(atomic.LoadInt64(&s.rtt))
 | |
| }
 | |
| 
 | |
| // Ping is used to measure the RTT response time
 | |
| func (s *Session) Ping() (dur time.Duration, err error) {
 | |
| 	// Prepare a ping.
 | |
| 	s.pingLock.Lock()
 | |
| 	// If there's an active ping, jump on the bandwagon.
 | |
| 	if activePing := s.activePing; activePing != nil {
 | |
| 		s.pingLock.Unlock()
 | |
| 		return activePing.wait()
 | |
| 	}
 | |
| 
 | |
| 	// Ok, our job to send the ping.
 | |
| 	activePing := newPing(s.pingID)
 | |
| 	s.pingID++
 | |
| 	s.activePing = activePing
 | |
| 	s.pingLock.Unlock()
 | |
| 
 | |
| 	defer func() {
 | |
| 		// complete ping promise
 | |
| 		activePing.finish(dur, err)
 | |
| 
 | |
| 		// Unset it.
 | |
| 		s.pingLock.Lock()
 | |
| 		s.activePing = nil
 | |
| 		s.pingLock.Unlock()
 | |
| 	}()
 | |
| 
 | |
| 	// Send the ping request, waiting at most one connection write timeout
 | |
| 	// to flush it.
 | |
| 	timer := time.NewTimer(s.config.ConnectionWriteTimeout)
 | |
| 	defer timer.Stop()
 | |
| 	select {
 | |
| 	case s.pingCh <- activePing.id:
 | |
| 	case <-timer.C:
 | |
| 		return 0, ErrTimeout
 | |
| 	case <-s.shutdownCh:
 | |
| 		return 0, s.shutdownErr
 | |
| 	}
 | |
| 
 | |
| 	// The "time" starts once we've actually sent the ping. Otherwise, we'll
 | |
| 	// measure the time it takes to flush the queue as well.
 | |
| 	start := time.Now()
 | |
| 
 | |
| 	// Wait for a response, again waiting at most one write timeout.
 | |
| 	if !timer.Stop() {
 | |
| 		<-timer.C
 | |
| 	}
 | |
| 	timer.Reset(s.config.ConnectionWriteTimeout)
 | |
| 	select {
 | |
| 	case <-activePing.pingResponse:
 | |
| 	case <-timer.C:
 | |
| 		return 0, ErrTimeout
 | |
| 	case <-s.shutdownCh:
 | |
| 		return 0, s.shutdownErr
 | |
| 	}
 | |
| 
 | |
| 	// Compute the RTT
 | |
| 	return time.Since(start), nil
 | |
| }
 | |
| 
 | |
| // startKeepalive starts the keepalive process.
 | |
| func (s *Session) startKeepalive() {
 | |
| 	s.keepaliveLock.Lock()
 | |
| 	defer s.keepaliveLock.Unlock()
 | |
| 	s.keepaliveTimer = time.AfterFunc(s.config.KeepAliveInterval, func() {
 | |
| 		s.keepaliveLock.Lock()
 | |
| 		if s.keepaliveTimer == nil || s.keepaliveActive {
 | |
| 			// keepalives have been stopped or a keepalive is active.
 | |
| 			s.keepaliveLock.Unlock()
 | |
| 			return
 | |
| 		}
 | |
| 		s.keepaliveActive = true
 | |
| 		s.keepaliveLock.Unlock()
 | |
| 
 | |
| 		_, err := s.Ping()
 | |
| 
 | |
| 		s.keepaliveLock.Lock()
 | |
| 		s.keepaliveActive = false
 | |
| 		if s.keepaliveTimer != nil {
 | |
| 			s.keepaliveTimer.Reset(s.config.KeepAliveInterval)
 | |
| 		}
 | |
| 		s.keepaliveLock.Unlock()
 | |
| 
 | |
| 		if err != nil {
 | |
| 			s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
 | |
| 			s.exitErr(ErrKeepAliveTimeout)
 | |
| 		}
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // stopKeepalive stops the keepalive process.
 | |
| func (s *Session) stopKeepalive() {
 | |
| 	s.keepaliveLock.Lock()
 | |
| 	defer s.keepaliveLock.Unlock()
 | |
| 	if s.keepaliveTimer != nil {
 | |
| 		s.keepaliveTimer.Stop()
 | |
| 		s.keepaliveTimer = nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *Session) extendKeepalive() {
 | |
| 	s.keepaliveLock.Lock()
 | |
| 	if s.keepaliveTimer != nil && !s.keepaliveActive {
 | |
| 		// Don't stop the timer and drain the channel. This is an
 | |
| 		// AfterFunc, not a normal timer, and any attempts to drain the
 | |
| 		// channel will block forever.
 | |
| 		//
 | |
| 		// Go will stop the timer for us internally anyways. The docs
 | |
| 		// say one must stop the timer before calling reset but that's
 | |
| 		// to ensure that the timer doesn't end up firing immediately
 | |
| 		// after calling Reset.
 | |
| 		s.keepaliveTimer.Reset(s.config.KeepAliveInterval)
 | |
| 	}
 | |
| 	s.keepaliveLock.Unlock()
 | |
| }
 | |
| 
 | |
| // send sends the header and body.
 | |
| func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}) error {
 | |
| 	select {
 | |
| 	case <-s.shutdownCh:
 | |
| 		return s.shutdownErr
 | |
| 	default:
 | |
| 	}
 | |
| 
 | |
| 	// duplicate as we're sending this async.
 | |
| 	buf := pool.Get(headerSize + len(body))
 | |
| 	copy(buf[:headerSize], hdr[:])
 | |
| 	copy(buf[headerSize:], body)
 | |
| 
 | |
| 	select {
 | |
| 	case <-s.shutdownCh:
 | |
| 		pool.Put(buf)
 | |
| 		return s.shutdownErr
 | |
| 	case s.sendCh <- buf:
 | |
| 		return nil
 | |
| 	case <-deadline:
 | |
| 		pool.Put(buf)
 | |
| 		return ErrTimeout
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // send is a long running goroutine that sends data
 | |
| func (s *Session) send() {
 | |
| 	if err := s.sendLoop(); err != nil {
 | |
| 		s.exitErr(err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *Session) sendLoop() (err error) {
 | |
| 	defer func() {
 | |
| 		if rerr := recover(); rerr != nil {
 | |
| 			fmt.Fprintf(os.Stderr, "caught panic: %s\n%s\n", rerr, debug.Stack())
 | |
| 			err = fmt.Errorf("panic in yamux send loop: %s", rerr)
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	defer close(s.sendDoneCh)
 | |
| 
 | |
| 	// Extend the write deadline if we've passed the halfway point. This can
 | |
| 	// be expensive so this ensures we only have to do this once every
 | |
| 	// ConnectionWriteTimeout/2 (usually 5s).
 | |
| 	var lastWriteDeadline time.Time
 | |
| 	extendWriteDeadline := func() error {
 | |
| 		now := time.Now()
 | |
| 		// If over half of the deadline has elapsed, extend it.
 | |
| 		if now.Add(s.config.ConnectionWriteTimeout / 2).After(lastWriteDeadline) {
 | |
| 			lastWriteDeadline = now.Add(s.config.ConnectionWriteTimeout)
 | |
| 			return s.conn.SetWriteDeadline(lastWriteDeadline)
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	writer := s.conn
 | |
| 
 | |
| 	// FIXME: https://github.com/libp2p/go-libp2p/issues/644
 | |
| 	// Write coalescing is disabled for now.
 | |
| 
 | |
| 	// writer := pool.Writer{W: s.conn}
 | |
| 
 | |
| 	// var writeTimeout *time.Timer
 | |
| 	// var writeTimeoutCh <-chan time.Time
 | |
| 	// if s.config.WriteCoalesceDelay > 0 {
 | |
| 	//	writeTimeout = time.NewTimer(s.config.WriteCoalesceDelay)
 | |
| 	//	defer writeTimeout.Stop()
 | |
| 
 | |
| 	//	writeTimeoutCh = writeTimeout.C
 | |
| 	// } else {
 | |
| 	//	ch := make(chan time.Time)
 | |
| 	//	close(ch)
 | |
| 	//	writeTimeoutCh = ch
 | |
| 	// }
 | |
| 
 | |
| 	for {
 | |
| 		// yield after processing the last message, if we've shutdown.
 | |
| 		// s.sendCh is a buffered channel and Go doesn't guarantee select order.
 | |
| 		select {
 | |
| 		case <-s.shutdownCh:
 | |
| 			return nil
 | |
| 		default:
 | |
| 		}
 | |
| 
 | |
| 		var buf []byte
 | |
| 		// Make sure to send any pings & pongs first so they don't get stuck behind writes.
 | |
| 		select {
 | |
| 		case pingID := <-s.pingCh:
 | |
| 			buf = pool.Get(headerSize)
 | |
| 			hdr := encode(typePing, flagSYN, 0, pingID)
 | |
| 			copy(buf, hdr[:])
 | |
| 		case pingID := <-s.pongCh:
 | |
| 			buf = pool.Get(headerSize)
 | |
| 			hdr := encode(typePing, flagACK, 0, pingID)
 | |
| 			copy(buf, hdr[:])
 | |
| 		default:
 | |
| 			// Then send normal data.
 | |
| 			select {
 | |
| 			case buf = <-s.sendCh:
 | |
| 			case pingID := <-s.pingCh:
 | |
| 				buf = pool.Get(headerSize)
 | |
| 				hdr := encode(typePing, flagSYN, 0, pingID)
 | |
| 				copy(buf, hdr[:])
 | |
| 			case pingID := <-s.pongCh:
 | |
| 				buf = pool.Get(headerSize)
 | |
| 				hdr := encode(typePing, flagACK, 0, pingID)
 | |
| 				copy(buf, hdr[:])
 | |
| 			case <-s.shutdownCh:
 | |
| 				return nil
 | |
| 				// default:
 | |
| 				//	select {
 | |
| 				//	case buf = <-s.sendCh:
 | |
| 				//	case <-s.shutdownCh:
 | |
| 				//		return nil
 | |
| 				//	case <-writeTimeoutCh:
 | |
| 				//		if err := writer.Flush(); err != nil {
 | |
| 				//			if os.IsTimeout(err) {
 | |
| 				//				err = ErrConnectionWriteTimeout
 | |
| 				//			}
 | |
| 				//			return err
 | |
| 				//		}
 | |
| 
 | |
| 				//		select {
 | |
| 				//		case buf = <-s.sendCh:
 | |
| 				//		case <-s.shutdownCh:
 | |
| 				//			return nil
 | |
| 				//		}
 | |
| 
 | |
| 				//		if writeTimeout != nil {
 | |
| 				//			writeTimeout.Reset(s.config.WriteCoalesceDelay)
 | |
| 				//		}
 | |
| 				//	}
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if err := extendWriteDeadline(); err != nil {
 | |
| 			pool.Put(buf)
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		_, err := writer.Write(buf)
 | |
| 		pool.Put(buf)
 | |
| 
 | |
| 		if err != nil {
 | |
| 			if os.IsTimeout(err) {
 | |
| 				err = ErrConnectionWriteTimeout
 | |
| 			}
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // recv is a long running goroutine that accepts new data
 | |
| func (s *Session) recv() {
 | |
| 	if err := s.recvLoop(); err != nil {
 | |
| 		s.exitErr(err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type
 | |
| var (
 | |
| 	handlers = []func(*Session, header) error{
 | |
| 		typeData:         (*Session).handleStreamMessage,
 | |
| 		typeWindowUpdate: (*Session).handleStreamMessage,
 | |
| 		typePing:         (*Session).handlePing,
 | |
| 		typeGoAway:       (*Session).handleGoAway,
 | |
| 	}
 | |
| )
 | |
| 
 | |
| // recvLoop continues to receive data until a fatal error is encountered
 | |
| func (s *Session) recvLoop() (err error) {
 | |
| 	defer func() {
 | |
| 		if rerr := recover(); rerr != nil {
 | |
| 			fmt.Fprintf(os.Stderr, "caught panic: %s\n%s\n", rerr, debug.Stack())
 | |
| 			err = fmt.Errorf("panic in yamux receive loop: %s", rerr)
 | |
| 		}
 | |
| 	}()
 | |
| 	defer close(s.recvDoneCh)
 | |
| 	var hdr header
 | |
| 	for {
 | |
| 		// fmt.Printf("ReadFull from %#v\n", s.reader)
 | |
| 		// Read the header
 | |
| 		if _, err := io.ReadFull(s.reader, hdr[:]); err != nil {
 | |
| 			if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") {
 | |
| 				s.logger.Printf("[ERR] yamux: Failed to read header: %v", err)
 | |
| 			}
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		// Reset the keepalive timer every time we receive data.
 | |
| 		// There's no reason to keepalive if we're active. Worse, if the
 | |
| 		// peer is busy sending us stuff, the pong might get stuck
 | |
| 		// behind a bunch of data.
 | |
| 		s.extendKeepalive()
 | |
| 
 | |
| 		// Verify the version
 | |
| 		if hdr.Version() != protoVersion {
 | |
| 			s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version())
 | |
| 			return ErrInvalidVersion
 | |
| 		}
 | |
| 
 | |
| 		mt := hdr.MsgType()
 | |
| 		if mt < typeData || mt > typeGoAway {
 | |
| 			return ErrInvalidMsgType
 | |
| 		}
 | |
| 
 | |
| 		if err := handlers[mt](s, hdr); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // handleStreamMessage handles either a data or window update frame
 | |
| func (s *Session) handleStreamMessage(hdr header) error {
 | |
| 	// Check for a new stream creation
 | |
| 	id := hdr.StreamID()
 | |
| 	flags := hdr.Flags()
 | |
| 	if flags&flagSYN == flagSYN {
 | |
| 		if err := s.incomingStream(id); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Get the stream
 | |
| 	s.streamLock.Lock()
 | |
| 	stream := s.streams[id]
 | |
| 	s.streamLock.Unlock()
 | |
| 
 | |
| 	// If we do not have a stream, likely we sent a RST
 | |
| 	if stream == nil {
 | |
| 		// Drain any data on the wire
 | |
| 		if hdr.MsgType() == typeData && hdr.Length() > 0 {
 | |
| 			s.logger.Printf("[WARN] yamux: Discarding data for stream: %d", id)
 | |
| 			if _, err := io.CopyN(io.Discard, s.reader, int64(hdr.Length())); err != nil {
 | |
| 				s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err)
 | |
| 				return nil
 | |
| 			}
 | |
| 		} else {
 | |
| 			s.logger.Printf("[WARN] yamux: frame for missing stream: %v", hdr)
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Check if this is a window update
 | |
| 	if hdr.MsgType() == typeWindowUpdate {
 | |
| 		stream.incrSendWindow(hdr, flags)
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Read the new data
 | |
| 	if err := stream.readData(hdr, flags, s.reader); err != nil {
 | |
| 		if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
 | |
| 			s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
 | |
| 		}
 | |
| 		return err
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // handlePing is invoked for a typePing frame
 | |
| func (s *Session) handlePing(hdr header) error {
 | |
| 	flags := hdr.Flags()
 | |
| 	pingID := hdr.Length()
 | |
| 
 | |
| 	// Check if this is a query, respond back in a separate context so we
 | |
| 	// don't interfere with the receiving thread blocking for the write.
 | |
| 	if flags&flagSYN == flagSYN {
 | |
| 		select {
 | |
| 		case s.pongCh <- pingID:
 | |
| 		default:
 | |
| 			s.logger.Printf("[WARN] yamux: dropped ping reply")
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Handle a response
 | |
| 	s.pingLock.Lock()
 | |
| 	// If we have an active ping, and this is a response to that active
 | |
| 	// ping, complete the ping.
 | |
| 	if s.activePing != nil && s.activePing.id == pingID {
 | |
| 		// Don't assume that the peer won't send multiple responses for
 | |
| 		// the same ping.
 | |
| 		select {
 | |
| 		case s.activePing.pingResponse <- struct{}{}:
 | |
| 		default:
 | |
| 		}
 | |
| 	}
 | |
| 	s.pingLock.Unlock()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // handleGoAway is invokde for a typeGoAway frame
 | |
| func (s *Session) handleGoAway(hdr header) error {
 | |
| 	code := hdr.Length()
 | |
| 	switch code {
 | |
| 	case goAwayNormal:
 | |
| 		atomic.SwapInt32(&s.remoteGoAway, 1)
 | |
| 	case goAwayProtoErr:
 | |
| 		s.logger.Printf("[ERR] yamux: received protocol error go away")
 | |
| 		return fmt.Errorf("yamux protocol error")
 | |
| 	case goAwayInternalErr:
 | |
| 		s.logger.Printf("[ERR] yamux: received internal error go away")
 | |
| 		return fmt.Errorf("remote yamux internal error")
 | |
| 	default:
 | |
| 		s.logger.Printf("[ERR] yamux: received unexpected go away")
 | |
| 		return fmt.Errorf("unexpected go away received")
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // incomingStream is used to create a new incoming stream
 | |
| func (s *Session) incomingStream(id uint32) error {
 | |
| 	if s.client != (id%2 == 0) {
 | |
| 		s.logger.Printf("[ERR] yamux: both endpoints are clients")
 | |
| 		return fmt.Errorf("both yamux endpoints are clients")
 | |
| 	}
 | |
| 	// Reject immediately if we are doing a go away
 | |
| 	if atomic.LoadInt32(&s.localGoAway) == 1 {
 | |
| 		hdr := encode(typeWindowUpdate, flagRST, id, 0)
 | |
| 		return s.sendMsg(hdr, nil, nil)
 | |
| 	}
 | |
| 
 | |
| 	// Allocate a new stream
 | |
| 	span, err := s.newMemoryManager()
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to create resource span: %w", err)
 | |
| 	}
 | |
| 	if err := span.ReserveMemory(initialStreamWindow, 255); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	stream := newStream(s, id, streamSYNReceived, initialStreamWindow, span)
 | |
| 
 | |
| 	s.streamLock.Lock()
 | |
| 	defer s.streamLock.Unlock()
 | |
| 
 | |
| 	// Check if stream already exists
 | |
| 	if _, ok := s.streams[id]; ok {
 | |
| 		s.logger.Printf("[ERR] yamux: duplicate stream declared")
 | |
| 		if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
 | |
| 			s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
 | |
| 		}
 | |
| 		span.Done()
 | |
| 		return ErrDuplicateStream
 | |
| 	}
 | |
| 
 | |
| 	if s.numIncomingStreams >= s.config.MaxIncomingStreams {
 | |
| 		// too many active streams at the same time
 | |
| 		s.logger.Printf("[WARN] yamux: MaxIncomingStreams exceeded, forcing stream reset")
 | |
| 		defer span.Done()
 | |
| 		hdr := encode(typeWindowUpdate, flagRST, id, 0)
 | |
| 		return s.sendMsg(hdr, nil, nil)
 | |
| 	}
 | |
| 
 | |
| 	s.numIncomingStreams++
 | |
| 	// Register the stream
 | |
| 	s.streams[id] = stream
 | |
| 
 | |
| 	// Check if we've exceeded the backlog
 | |
| 	select {
 | |
| 	case s.acceptCh <- stream:
 | |
| 		return nil
 | |
| 	default:
 | |
| 		// Backlog exceeded! RST the stream
 | |
| 		defer span.Done()
 | |
| 		s.logger.Printf("[WARN] yamux: backlog exceeded, forcing stream reset")
 | |
| 		s.deleteStream(id)
 | |
| 		hdr := encode(typeWindowUpdate, flagRST, id, 0)
 | |
| 		return s.sendMsg(hdr, nil, nil)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // closeStream is used to close a stream once both sides have
 | |
| // issued a close. If there was an in-flight SYN and the stream
 | |
| // was not yet established, then this will give the credit back.
 | |
| func (s *Session) closeStream(id uint32) {
 | |
| 	s.streamLock.Lock()
 | |
| 	defer s.streamLock.Unlock()
 | |
| 	if _, ok := s.inflight[id]; ok {
 | |
| 		select {
 | |
| 		case <-s.synCh:
 | |
| 		default:
 | |
| 			s.logger.Printf("[ERR] yamux: SYN tracking out of sync")
 | |
| 		}
 | |
| 		delete(s.inflight, id)
 | |
| 	}
 | |
| 	s.deleteStream(id)
 | |
| }
 | |
| 
 | |
| func (s *Session) deleteStream(id uint32) {
 | |
| 	str, ok := s.streams[id]
 | |
| 	if !ok {
 | |
| 		return
 | |
| 	}
 | |
| 	if s.client == (id%2 == 0) {
 | |
| 		if s.numIncomingStreams == 0 {
 | |
| 			s.logger.Printf("[ERR] yamux: numIncomingStreams underflow")
 | |
| 			// prevent the creation of any new streams
 | |
| 			s.numIncomingStreams = math.MaxUint32
 | |
| 		} else {
 | |
| 			s.numIncomingStreams--
 | |
| 		}
 | |
| 	}
 | |
| 	delete(s.streams, id)
 | |
| 	str.memorySpan.Done()
 | |
| }
 | |
| 
 | |
| // establishStream is used to mark a stream that was in the
 | |
| // SYN Sent state as established.
 | |
| func (s *Session) establishStream(id uint32) {
 | |
| 	s.streamLock.Lock()
 | |
| 	if _, ok := s.inflight[id]; ok {
 | |
| 		delete(s.inflight, id)
 | |
| 	} else {
 | |
| 		s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)")
 | |
| 	}
 | |
| 	select {
 | |
| 	case <-s.synCh:
 | |
| 	default:
 | |
| 		s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)")
 | |
| 	}
 | |
| 	s.streamLock.Unlock()
 | |
| }
 |