 131868bdca
			
		
	
	131868bdca
	
	
	
		
			
			Major security, observability, and configuration improvements:
## Security Hardening
- Implemented configurable CORS (no more wildcards)
- Added comprehensive auth middleware for admin endpoints
- Enhanced webhook HMAC validation
- Added input validation and rate limiting
- Security headers and CSP policies
## Configuration Management
- Made N8N webhook URL configurable (WHOOSH_N8N_BASE_URL)
- Replaced all hardcoded endpoints with environment variables
- Added feature flags for LLM vs heuristic composition
- Gitea fetch hardening with EAGER_FILTER and FULL_RESCAN options
## API Completeness
- Implemented GetCouncilComposition function
- Added GET /api/v1/councils/{id} endpoint
- Council artifacts API (POST/GET /api/v1/councils/{id}/artifacts)
- /admin/health/details endpoint with component status
- Database lookup for repository URLs (no hardcoded fallbacks)
## Observability & Performance
- Added OpenTelemetry distributed tracing with goal/pulse correlation
- Performance optimization database indexes
- Comprehensive health monitoring
- Enhanced logging and error handling
## Infrastructure
- Production-ready P2P discovery (replaces mock implementation)
- Removed unused Redis configuration
- Enhanced Docker Swarm integration
- Added migration files for performance indexes
## Code Quality
- Comprehensive input validation
- Graceful error handling and failsafe fallbacks
- Backwards compatibility maintained
- Following security best practices
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
		
	
		
			
				
	
	
		
			3881 lines
		
	
	
		
			110 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			3881 lines
		
	
	
		
			110 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2020-2023 The NATS Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package nats
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"context"
 | |
| 	"crypto/sha256"
 | |
| 	"encoding/json"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"math/rand"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/nats-io/nats.go/internal/parser"
 | |
| 	"github.com/nats-io/nuid"
 | |
| )
 | |
| 
 | |
| // JetStream allows persistent messaging through JetStream.
 | |
| //
 | |
| // NOTE: JetStream is part of legacy API.
 | |
| // Users are encouraged to switch to the new JetStream API for enhanced capabilities and
 | |
| // simplified API. Please refer to the `jetstream` package.
 | |
| // See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
 | |
| type JetStream interface {
 | |
| 	// Publish publishes a message to JetStream.
 | |
| 	Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)
 | |
| 
 | |
| 	// PublishMsg publishes a Msg to JetStream.
 | |
| 	PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error)
 | |
| 
 | |
| 	// PublishAsync publishes a message to JetStream and returns a PubAckFuture.
 | |
| 	// The data should not be changed until the PubAckFuture has been processed.
 | |
| 	PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)
 | |
| 
 | |
| 	// PublishMsgAsync publishes a Msg to JetStream and returns a PubAckFuture.
 | |
| 	// The message should not be changed until the PubAckFuture has been processed.
 | |
| 	PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)
 | |
| 
 | |
| 	// PublishAsyncPending returns the number of async publishes outstanding for this context.
 | |
| 	PublishAsyncPending() int
 | |
| 
 | |
| 	// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd.
 | |
| 	PublishAsyncComplete() <-chan struct{}
 | |
| 
 | |
| 	// Subscribe creates an async Subscription for JetStream.
 | |
| 	// The stream and consumer names can be provided with the nats.Bind() option.
 | |
| 	// For creating an ephemeral (where the consumer name is picked by the server),
 | |
| 	// you can provide the stream name with nats.BindStream().
 | |
| 	// If no stream name is specified, the library will attempt to figure out which
 | |
| 	// stream the subscription is for. See important notes below for more details.
 | |
| 	//
 | |
| 	// IMPORTANT NOTES:
 | |
| 	// * If none of the options Bind() nor Durable() are specified, the library will
 | |
| 	// send a request to the server to create an ephemeral JetStream consumer,
 | |
| 	// which will be deleted after an Unsubscribe() or Drain(), or automatically
 | |
| 	// by the server after a short period of time after the NATS subscription is
 | |
| 	// gone.
 | |
| 	// * If Durable() option is specified, the library will attempt to lookup a JetStream
 | |
| 	// consumer with this name, and if found, will bind to it and not attempt to
 | |
| 	// delete it. However, if not found, the library will send a request to
 | |
| 	// create such durable JetStream consumer. Note that the library will delete
 | |
| 	// the JetStream consumer after an Unsubscribe() or Drain() only if it
 | |
| 	// created the durable consumer while subscribing. If the durable consumer
 | |
| 	// already existed prior to subscribing it won't be deleted.
 | |
| 	// * If Bind() option is provided, the library will attempt to lookup the
 | |
| 	// consumer with the given name, and if successful, bind to it. If the lookup fails,
 | |
| 	// then the Subscribe() call will return an error.
 | |
| 	Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
 | |
| 
 | |
| 	// SubscribeSync creates a Subscription that can be used to process messages synchronously.
 | |
| 	// See important note in Subscribe()
 | |
| 	SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)
 | |
| 
 | |
| 	// ChanSubscribe creates channel based Subscription.
 | |
| 	// See important note in Subscribe()
 | |
| 	ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
 | |
| 
 | |
| 	// ChanQueueSubscribe creates channel based Subscription with a queue group.
 | |
| 	// See important note in QueueSubscribe()
 | |
| 	ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
 | |
| 
 | |
| 	// QueueSubscribe creates a Subscription with a queue group.
 | |
| 	// If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
 | |
| 	// See important note in Subscribe()
 | |
| 	QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
 | |
| 
 | |
| 	// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
 | |
| 	// See important note in QueueSubscribe()
 | |
| 	QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)
 | |
| 
 | |
| 	// PullSubscribe creates a Subscription that can fetch messages.
 | |
| 	// See important note in Subscribe(). Additionally, for an ephemeral pull consumer, the "durable" value must be
 | |
| 	// set to an empty string.
 | |
| 	PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
 | |
| }
 | |
| 
 | |
| // JetStreamContext allows JetStream messaging and stream management.
 | |
| //
 | |
| // NOTE: JetStreamContext is part of legacy API.
 | |
| // Users are encouraged to switch to the new JetStream API for enhanced capabilities and
 | |
| // simplified API. Please refer to the `jetstream` package.
 | |
| // See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
 | |
| type JetStreamContext interface {
 | |
| 	JetStream
 | |
| 	JetStreamManager
 | |
| 	KeyValueManager
 | |
| 	ObjectStoreManager
 | |
| }
 | |
| 
 | |
// JetStream request subjects (relative to the API prefix) and client-side
// tuning defaults. Names ending in T are fmt templates.
const (
	// ---- API prefixes ----

	// defaultAPIPrefix is prepended to API subjects unless overridden.
	defaultAPIPrefix = "$JS.API."

	// jsDomainT builds the API prefix from a domain name alone.
	jsDomainT = "$JS.%s.API."

	// jsExtDomainT builds the External APIPrefix of a StreamSource
	// (same as jsDomainT but without the trailing dot).
	jsExtDomainT = "$JS.%s.API"

	// ---- account ----

	// apiAccountInfo requests general JetStream information.
	apiAccountInfo = "INFO"

	// ---- consumers ----

	// apiConsumerCreateT creates a consumer; arguments are stream name and
	// consumer name.
	apiConsumerCreateT = "CONSUMER.CREATE.%s.%s"

	// apiConsumerCreateWithFilterSubjectT creates a consumer; arguments are
	// stream name, consumer name and filter subject.
	apiConsumerCreateWithFilterSubjectT = "CONSUMER.CREATE.%s.%s.%s"

	// apiLegacyConsumerCreateT creates a consumer through the legacy
	// endpoint that supports ephemerals on nats-server older than v2.9.0.
	apiLegacyConsumerCreateT = "CONSUMER.CREATE.%s"

	// apiDurableCreateT creates a durable consumer through the legacy
	// endpoint used by nats-server older than v2.9.0.
	apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s"

	// apiConsumerInfoT is the consumer info endpoint.
	apiConsumerInfoT = "CONSUMER.INFO.%s.%s"

	// apiRequestNextT is the prefix used to request the next message(s) for
	// a consumer in worker/pull mode.
	apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"

	// apiConsumerDeleteT deletes a consumer.
	apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"

	// apiConsumerListT returns detailed information for all consumers.
	apiConsumerListT = "CONSUMER.LIST.%s"

	// apiConsumerNamesT returns the names of all consumers of a stream.
	apiConsumerNamesT = "CONSUMER.NAMES.%s"

	// ---- streams ----

	// apiStreams can look a stream up by subject.
	apiStreams = "STREAM.NAMES"

	// apiStreamCreateT creates a new stream.
	apiStreamCreateT = "STREAM.CREATE.%s"

	// apiStreamInfoT returns information about a stream.
	apiStreamInfoT = "STREAM.INFO.%s"

	// apiStreamUpdateT updates an existing stream.
	apiStreamUpdateT = "STREAM.UPDATE.%s"

	// apiStreamDeleteT deletes a stream.
	apiStreamDeleteT = "STREAM.DELETE.%s"

	// apiStreamPurgeT purges a stream.
	apiStreamPurgeT = "STREAM.PURGE.%s"

	// apiStreamListT returns detailed information for all streams.
	apiStreamListT = "STREAM.LIST"

	// ---- messages ----

	// apiMsgGetT fetches a message.
	apiMsgGetT = "STREAM.MSG.GET.%s"

	// apiDirectMsgGetT performs a direct get of a message.
	apiDirectMsgGetT = "DIRECT.GET.%s"

	// apiDirectMsgGetLastBySubjectT performs a direct get of a message by
	// subject.
	apiDirectMsgGetLastBySubjectT = "DIRECT.GET.%s.%s"

	// apiMsgDeleteT removes a message.
	apiMsgDeleteT = "STREAM.MSG.DELETE.%s"

	// ---- client-side tuning ----

	// orderedHeartbeatsInterval is how often the server should send
	// heartbeats while idle.
	orderedHeartbeatsInterval = 5 * time.Second

	// hbcThresh scales the threshold for missed heartbeats or inactivity.
	hbcThresh = 2

	// chanSubFCCheckInterval is the period of the flow-control check for
	// channel subscriptions. The channel is user provided, so sub.delivered
	// cannot be updated as for other subscription types; flow control is
	// therefore checked on incoming messages and also from a go routine.
	// Without the latter the subscription could stall until a new message
	// or heartbeat/fc arrives.
	chanSubFCCheckInterval = 250 * time.Millisecond

	// DefaultPubRetryWait is the default pause between Publish retries,
	// applied iff the error is NoResponders.
	DefaultPubRetryWait = 250 * time.Millisecond

	// DefaultPubRetryAttempts is the default number of Publish retries.
	DefaultPubRetryAttempts = 2

	// defaultAsyncPubAckInflight is the default number of async pub acks
	// that may be in flight.
	defaultAsyncPubAckInflight = 4000
)
 | |
| 
 | |
// Kinds of control messages known so far: heartbeat (1) and flow control (2).
const (
	jsCtrlHB = iota + 1
	jsCtrlFC
)
 | |
| 
 | |
| // js is an internal struct from a JetStreamContext.
 | |
| type js struct {
 | |
| 	nc   *Conn
 | |
| 	opts *jsOpts
 | |
| 
 | |
| 	// For async publish context.
 | |
| 	mu             sync.RWMutex
 | |
| 	rpre           string
 | |
| 	rsub           *Subscription
 | |
| 	pafs           map[string]*pubAckFuture
 | |
| 	stc            chan struct{}
 | |
| 	dch            chan struct{}
 | |
| 	rr             *rand.Rand
 | |
| 	connStatusCh   chan (Status)
 | |
| 	replyPrefix    string
 | |
| 	replyPrefixLen int
 | |
| }
 | |
| 
 | |
| type jsOpts struct {
 | |
| 	ctx context.Context
 | |
| 	// For importing JetStream from other accounts.
 | |
| 	pre string
 | |
| 	// Amount of time to wait for API requests.
 | |
| 	wait time.Duration
 | |
| 	// For async publish error handling.
 | |
| 	aecb MsgErrHandler
 | |
| 	// Max async pub ack in flight
 | |
| 	maxpa int
 | |
| 	// the domain that produced the pre
 | |
| 	domain string
 | |
| 	// enables protocol tracing
 | |
| 	ctrace      ClientTrace
 | |
| 	shouldTrace bool
 | |
| 	// purgeOpts contains optional stream purge options
 | |
| 	purgeOpts *StreamPurgeRequest
 | |
| 	// streamInfoOpts contains optional stream info options
 | |
| 	streamInfoOpts *StreamInfoRequest
 | |
| 	// streamListSubject is used for subject filtering when listing streams / stream names
 | |
| 	streamListSubject string
 | |
| 	// For direct get message requests
 | |
| 	directGet bool
 | |
| 	// For direct get next message
 | |
| 	directNextFor string
 | |
| 
 | |
| 	// featureFlags are used to enable/disable specific JetStream features
 | |
| 	featureFlags featureFlags
 | |
| }
 | |
| 
 | |
// Default durations: defaultRequestWait is the initial API request wait of a
// new context; defaultAccountCheck is not referenced in the visible part of
// the file.
const (
	defaultRequestWait  = 5 * time.Second
	defaultAccountCheck = 20 * time.Second
)
 | |
| 
 | |
| // JetStream returns a JetStreamContext for messaging and stream management.
 | |
| // Errors are only returned if inconsistent options are provided.
 | |
| //
 | |
| // NOTE: JetStreamContext is part of legacy API.
 | |
| // Users are encouraged to switch to the new JetStream API for enhanced capabilities and
 | |
| // simplified API. Please refer to the `jetstream` package.
 | |
| // See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
 | |
| func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
 | |
| 	js := &js{
 | |
| 		nc: nc,
 | |
| 		opts: &jsOpts{
 | |
| 			pre:   defaultAPIPrefix,
 | |
| 			wait:  defaultRequestWait,
 | |
| 			maxpa: defaultAsyncPubAckInflight,
 | |
| 		},
 | |
| 	}
 | |
| 	inboxPrefix := InboxPrefix
 | |
| 	if js.nc.Opts.InboxPrefix != _EMPTY_ {
 | |
| 		inboxPrefix = js.nc.Opts.InboxPrefix + "."
 | |
| 	}
 | |
| 	js.replyPrefix = inboxPrefix
 | |
| 	js.replyPrefixLen = len(js.replyPrefix) + aReplyTokensize + 1
 | |
| 
 | |
| 	for _, opt := range opts {
 | |
| 		if err := opt.configureJSContext(js.opts); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 	return js, nil
 | |
| }
 | |
| 
 | |
| // JSOpt configures a JetStreamContext.
 | |
| type JSOpt interface {
 | |
| 	configureJSContext(opts *jsOpts) error
 | |
| }
 | |
| 
 | |
| // jsOptFn configures an option for the JetStreamContext.
 | |
| type jsOptFn func(opts *jsOpts) error
 | |
| 
 | |
| func (opt jsOptFn) configureJSContext(opts *jsOpts) error {
 | |
| 	return opt(opts)
 | |
| }
 | |
| 
 | |
// featureFlags toggles optional JetStream client behaviors.
type featureFlags struct {
	// useDurableConsumerCreate is switched on by UseLegacyDurableConsumers.
	useDurableConsumerCreate bool
}
 | |
| 
 | |
| // UseLegacyDurableConsumers makes JetStream use the legacy (pre nats-server v2.9.0) subjects for consumer creation.
 | |
| // If this option is used when creating JetStremContext, $JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer> will be used
 | |
| // to create a consumer with Durable provided, rather than $JS.API.CONSUMER.CREATE.<stream>.<consumer>.
 | |
| func UseLegacyDurableConsumers() JSOpt {
 | |
| 	return jsOptFn(func(opts *jsOpts) error {
 | |
| 		opts.featureFlags.useDurableConsumerCreate = true
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // ClientTrace can be used to trace API interactions for the JetStream Context.
 | |
| type ClientTrace struct {
 | |
| 	RequestSent      func(subj string, payload []byte)
 | |
| 	ResponseReceived func(subj string, payload []byte, hdr Header)
 | |
| }
 | |
| 
 | |
| func (ct ClientTrace) configureJSContext(js *jsOpts) error {
 | |
| 	js.ctrace = ct
 | |
| 	js.shouldTrace = true
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Domain changes the domain part of JetStream API prefix.
 | |
| func Domain(domain string) JSOpt {
 | |
| 	if domain == _EMPTY_ {
 | |
| 		return APIPrefix(_EMPTY_)
 | |
| 	}
 | |
| 
 | |
| 	return jsOptFn(func(js *jsOpts) error {
 | |
| 		js.domain = domain
 | |
| 		js.pre = fmt.Sprintf(jsDomainT, domain)
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| 
 | |
| }
 | |
| 
 | |
| func (s *StreamPurgeRequest) configureJSContext(js *jsOpts) error {
 | |
| 	js.purgeOpts = s
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s *StreamInfoRequest) configureJSContext(js *jsOpts) error {
 | |
| 	js.streamInfoOpts = s
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // APIPrefix changes the default prefix used for the JetStream API.
 | |
| func APIPrefix(pre string) JSOpt {
 | |
| 	return jsOptFn(func(js *jsOpts) error {
 | |
| 		if pre == _EMPTY_ {
 | |
| 			return nil
 | |
| 		}
 | |
| 
 | |
| 		js.pre = pre
 | |
| 		if !strings.HasSuffix(js.pre, ".") {
 | |
| 			js.pre = js.pre + "."
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // DirectGet is an option that can be used to make GetMsg() or GetLastMsg()
 | |
| // retrieve message directly from a group of servers (leader and replicas)
 | |
| // if the stream was created with the AllowDirect option.
 | |
| func DirectGet() JSOpt {
 | |
| 	return jsOptFn(func(js *jsOpts) error {
 | |
| 		js.directGet = true
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // DirectGetNext is an option that can be used to make GetMsg() retrieve message
 | |
| // directly from a group of servers (leader and replicas) if the stream was
 | |
| // created with the AllowDirect option.
 | |
| // The server will find the next message matching the filter `subject` starting
 | |
| // at the start sequence (argument in GetMsg()). The filter `subject` can be a
 | |
| // wildcard.
 | |
| func DirectGetNext(subject string) JSOpt {
 | |
| 	return jsOptFn(func(js *jsOpts) error {
 | |
| 		js.directGet = true
 | |
| 		js.directNextFor = subject
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // StreamListFilter is an option that can be used to configure `StreamsInfo()` and `StreamNames()` requests.
 | |
| // It allows filtering the returned streams by subject associated with each stream.
 | |
| // Wildcards can be used. For example, `StreamListFilter(FOO.*.A) will return
 | |
| // all streams which have at least one subject matching the provided pattern (e.g. FOO.TEST.A).
 | |
| func StreamListFilter(subject string) JSOpt {
 | |
| 	return jsOptFn(func(opts *jsOpts) error {
 | |
| 		opts.streamListSubject = subject
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (js *js) apiSubj(subj string) string {
 | |
| 	if js.opts.pre == _EMPTY_ {
 | |
| 		return subj
 | |
| 	}
 | |
| 	var b strings.Builder
 | |
| 	b.WriteString(js.opts.pre)
 | |
| 	b.WriteString(subj)
 | |
| 	return b.String()
 | |
| }
 | |
| 
 | |
| // PubOpt configures options for publishing JetStream messages.
 | |
| type PubOpt interface {
 | |
| 	configurePublish(opts *pubOpts) error
 | |
| }
 | |
| 
 | |
| // pubOptFn is a function option used to configure JetStream Publish.
 | |
| type pubOptFn func(opts *pubOpts) error
 | |
| 
 | |
| func (opt pubOptFn) configurePublish(opts *pubOpts) error {
 | |
| 	return opt(opts)
 | |
| }
 | |
| 
 | |
// pubOpts collects the per-publish settings produced by PubOpt values.
type pubOpts struct {
	ctx context.Context // request context; mutually exclusive with ttl
	ttl time.Duration   // request timeout; mutually exclusive with ctx

	id  string  // message id, sent in the Nats-Msg-Id header
	lid string  // expected last msgId
	str string  // expected stream name
	seq *uint64 // expected last sequence
	lss *uint64 // expected last sequence per subject

	// Retry policy for Publish on a NoResponders error.
	rwait time.Duration // wait between attempts
	rnum  int           // number of attempts

	// stallWait is the max wait of an async pub ack.
	stallWait time.Duration
}
 | |
| 
 | |
| // pubAckResponse is the ack response from the JetStream API when publishing a message.
 | |
| type pubAckResponse struct {
 | |
| 	apiResponse
 | |
| 	*PubAck
 | |
| }
 | |
| 
 | |
// PubAck is the acknowledgement received after a message was published
// successfully. The JSON names follow the struct tags.
type PubAck struct {
	// Stream is the "stream" field of the ack.
	Stream string `json:"stream"`
	// Sequence is the "seq" field of the ack.
	Sequence uint64 `json:"seq"`
	// Duplicate is the optional "duplicate" field of the ack.
	Duplicate bool `json:"duplicate,omitempty"`
	// Domain is the optional "domain" field of the ack.
	Domain string `json:"domain,omitempty"`
}
 | |
| 
 | |
// Header names used on published messages.
const (
	// MsgIdHdr carries the message id.
	MsgIdHdr = "Nats-Msg-Id"
	// ExpectedStreamHdr carries the expected stream name.
	ExpectedStreamHdr = "Nats-Expected-Stream"
	// ExpectedLastSeqHdr carries the expected last sequence.
	ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
	// ExpectedLastSubjSeqHdr carries the expected last sequence per subject.
	ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
	// ExpectedLastMsgIdHdr carries the expected last message id.
	ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
	// MsgRollup is the rollup header; see MsgRollupSubject and MsgRollupAll.
	MsgRollup = "Nats-Rollup"
)
 | |
| 
 | |
// Header names found on republished messages and direct get responses.
const (
	JSStream       = "Nats-Stream"        // stream header
	JSSequence     = "Nats-Sequence"      // sequence header
	JSTimeStamp    = "Nats-Time-Stamp"    // time stamp header
	JSSubject      = "Nats-Subject"       // subject header
	JSLastSequence = "Nats-Last-Sequence" // last sequence header
)
 | |
| 
 | |
// MsgSize names the header included in a consumer's delivered message when
// HeadersOnly was requested.
const MsgSize = "Nats-Msg-Size"
 | |
| 
 | |
// Rollup scopes: a rollup applies either to the subject only or to all messages.
const (
	MsgRollupSubject = "sub" // subject only
	MsgRollupAll     = "all" // all messages
)
 | |
| 
 | |
| // PublishMsg publishes a Msg to a stream from JetStream.
 | |
| func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
 | |
| 	var o = pubOpts{rwait: DefaultPubRetryWait, rnum: DefaultPubRetryAttempts}
 | |
| 	if len(opts) > 0 {
 | |
| 		if m.Header == nil {
 | |
| 			m.Header = Header{}
 | |
| 		}
 | |
| 		for _, opt := range opts {
 | |
| 			if err := opt.configurePublish(&o); err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	// Check for option collisions. Right now just timeout and context.
 | |
| 	if o.ctx != nil && o.ttl != 0 {
 | |
| 		return nil, ErrContextAndTimeout
 | |
| 	}
 | |
| 	if o.ttl == 0 && o.ctx == nil {
 | |
| 		o.ttl = js.opts.wait
 | |
| 	}
 | |
| 	if o.stallWait > 0 {
 | |
| 		return nil, fmt.Errorf("nats: stall wait cannot be set to sync publish")
 | |
| 	}
 | |
| 
 | |
| 	if o.id != _EMPTY_ {
 | |
| 		m.Header.Set(MsgIdHdr, o.id)
 | |
| 	}
 | |
| 	if o.lid != _EMPTY_ {
 | |
| 		m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
 | |
| 	}
 | |
| 	if o.str != _EMPTY_ {
 | |
| 		m.Header.Set(ExpectedStreamHdr, o.str)
 | |
| 	}
 | |
| 	if o.seq != nil {
 | |
| 		m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10))
 | |
| 	}
 | |
| 	if o.lss != nil {
 | |
| 		m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
 | |
| 	}
 | |
| 
 | |
| 	var resp *Msg
 | |
| 	var err error
 | |
| 
 | |
| 	if o.ttl > 0 {
 | |
| 		resp, err = js.nc.RequestMsg(m, time.Duration(o.ttl))
 | |
| 	} else {
 | |
| 		resp, err = js.nc.RequestMsgWithContext(o.ctx, m)
 | |
| 	}
 | |
| 
 | |
| 	if err != nil {
 | |
| 		for r, ttl := 0, o.ttl; errors.Is(err, ErrNoResponders) && (r < o.rnum || o.rnum < 0); r++ {
 | |
| 			// To protect against small blips in leadership changes etc, if we get a no responders here retry.
 | |
| 			if o.ctx != nil {
 | |
| 				select {
 | |
| 				case <-o.ctx.Done():
 | |
| 				case <-time.After(o.rwait):
 | |
| 				}
 | |
| 			} else {
 | |
| 				time.Sleep(o.rwait)
 | |
| 			}
 | |
| 			if o.ttl > 0 {
 | |
| 				ttl -= o.rwait
 | |
| 				if ttl <= 0 {
 | |
| 					err = ErrTimeout
 | |
| 					break
 | |
| 				}
 | |
| 				resp, err = js.nc.RequestMsg(m, time.Duration(ttl))
 | |
| 			} else {
 | |
| 				resp, err = js.nc.RequestMsgWithContext(o.ctx, m)
 | |
| 			}
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			if errors.Is(err, ErrNoResponders) {
 | |
| 				err = ErrNoStreamResponse
 | |
| 			}
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	var pa pubAckResponse
 | |
| 	if err := json.Unmarshal(resp.Data, &pa); err != nil {
 | |
| 		return nil, ErrInvalidJSAck
 | |
| 	}
 | |
| 	if pa.Error != nil {
 | |
| 		return nil, pa.Error
 | |
| 	}
 | |
| 	if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
 | |
| 		return nil, ErrInvalidJSAck
 | |
| 	}
 | |
| 	return pa.PubAck, nil
 | |
| }
 | |
| 
 | |
| // Publish publishes a message to a stream from JetStream.
 | |
| func (js *js) Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) {
 | |
| 	return js.PublishMsg(&Msg{Subject: subj, Data: data}, opts...)
 | |
| }
 | |
| 
 | |
| // PubAckFuture is a future for a PubAck.
 | |
| type PubAckFuture interface {
 | |
| 	// Ok returns a receive only channel that can be used to get a PubAck.
 | |
| 	Ok() <-chan *PubAck
 | |
| 
 | |
| 	// Err returns a receive only channel that can be used to get the error from an async publish.
 | |
| 	Err() <-chan error
 | |
| 
 | |
| 	// Msg returns the message that was sent to the server.
 | |
| 	Msg() *Msg
 | |
| }
 | |
| 
 | |
| type pubAckFuture struct {
 | |
| 	js     *js
 | |
| 	msg    *Msg
 | |
| 	pa     *PubAck
 | |
| 	st     time.Time
 | |
| 	err    error
 | |
| 	errCh  chan error
 | |
| 	doneCh chan *PubAck
 | |
| }
 | |
| 
 | |
| func (paf *pubAckFuture) Ok() <-chan *PubAck {
 | |
| 	paf.js.mu.Lock()
 | |
| 	defer paf.js.mu.Unlock()
 | |
| 
 | |
| 	if paf.doneCh == nil {
 | |
| 		paf.doneCh = make(chan *PubAck, 1)
 | |
| 		if paf.pa != nil {
 | |
| 			paf.doneCh <- paf.pa
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return paf.doneCh
 | |
| }
 | |
| 
 | |
| func (paf *pubAckFuture) Err() <-chan error {
 | |
| 	paf.js.mu.Lock()
 | |
| 	defer paf.js.mu.Unlock()
 | |
| 
 | |
| 	if paf.errCh == nil {
 | |
| 		paf.errCh = make(chan error, 1)
 | |
| 		if paf.err != nil {
 | |
| 			paf.errCh <- paf.err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return paf.errCh
 | |
| }
 | |
| 
 | |
| func (paf *pubAckFuture) Msg() *Msg {
 | |
| 	paf.js.mu.RLock()
 | |
| 	defer paf.js.mu.RUnlock()
 | |
| 	return paf.msg
 | |
| }
 | |
| 
 | |
// aReplyTokensize is the length, in bytes, of each random token used in async
// reply subjects; a fixed size allows quick token lookup.
const aReplyTokensize = 6
 | |
| 
 | |
| func (js *js) newAsyncReply() string {
 | |
| 	js.mu.Lock()
 | |
| 	if js.rsub == nil {
 | |
| 		// Create our wildcard reply subject.
 | |
| 		sha := sha256.New()
 | |
| 		sha.Write([]byte(nuid.Next()))
 | |
| 		b := sha.Sum(nil)
 | |
| 		for i := 0; i < aReplyTokensize; i++ {
 | |
| 			b[i] = rdigits[int(b[i]%base)]
 | |
| 		}
 | |
| 		js.rpre = fmt.Sprintf("%s%s.", js.replyPrefix, b[:aReplyTokensize])
 | |
| 		sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", js.rpre), js.handleAsyncReply)
 | |
| 		if err != nil {
 | |
| 			js.mu.Unlock()
 | |
| 			return _EMPTY_
 | |
| 		}
 | |
| 		js.rsub = sub
 | |
| 		js.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
 | |
| 	}
 | |
| 	if js.connStatusCh == nil {
 | |
| 		js.connStatusCh = js.nc.StatusChanged(RECONNECTING, CLOSED)
 | |
| 		go js.resetPendingAcksOnReconnect()
 | |
| 	}
 | |
| 	var sb strings.Builder
 | |
| 	sb.WriteString(js.rpre)
 | |
| 	rn := js.rr.Int63()
 | |
| 	var b [aReplyTokensize]byte
 | |
| 	for i, l := 0, rn; i < len(b); i++ {
 | |
| 		b[i] = rdigits[l%base]
 | |
| 		l /= base
 | |
| 	}
 | |
| 	sb.Write(b[:])
 | |
| 	js.mu.Unlock()
 | |
| 	return sb.String()
 | |
| }
 | |
| 
 | |
| func (js *js) resetPendingAcksOnReconnect() {
 | |
| 	js.mu.Lock()
 | |
| 	connStatusCh := js.connStatusCh
 | |
| 	js.mu.Unlock()
 | |
| 	for {
 | |
| 		newStatus, ok := <-connStatusCh
 | |
| 		if !ok || newStatus == CLOSED {
 | |
| 			return
 | |
| 		}
 | |
| 		js.mu.Lock()
 | |
| 		errCb := js.opts.aecb
 | |
| 		for id, paf := range js.pafs {
 | |
| 			paf.err = ErrDisconnected
 | |
| 			if paf.errCh != nil {
 | |
| 				paf.errCh <- paf.err
 | |
| 			}
 | |
| 			if errCb != nil {
 | |
| 				// clear reply subject so that new one is created on republish
 | |
| 				js.mu.Unlock()
 | |
| 				errCb(js, paf.msg, ErrDisconnected)
 | |
| 				js.mu.Lock()
 | |
| 			}
 | |
| 			delete(js.pafs, id)
 | |
| 		}
 | |
| 		if js.dch != nil {
 | |
| 			close(js.dch)
 | |
| 			js.dch = nil
 | |
| 		}
 | |
| 		js.mu.Unlock()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (js *js) cleanupReplySub() {
 | |
| 	js.mu.Lock()
 | |
| 	if js.rsub != nil {
 | |
| 		js.rsub.Unsubscribe()
 | |
| 		js.rsub = nil
 | |
| 	}
 | |
| 	if js.connStatusCh != nil {
 | |
| 		close(js.connStatusCh)
 | |
| 		js.connStatusCh = nil
 | |
| 	}
 | |
| 	js.mu.Unlock()
 | |
| }
 | |
| 
 | |
| // registerPAF will register for a PubAckFuture.
 | |
| func (js *js) registerPAF(id string, paf *pubAckFuture) (int, int) {
 | |
| 	js.mu.Lock()
 | |
| 	if js.pafs == nil {
 | |
| 		js.pafs = make(map[string]*pubAckFuture)
 | |
| 	}
 | |
| 	paf.js = js
 | |
| 	js.pafs[id] = paf
 | |
| 	np := len(js.pafs)
 | |
| 	maxpa := js.opts.maxpa
 | |
| 	js.mu.Unlock()
 | |
| 	return np, maxpa
 | |
| }
 | |
| 
 | |
| // Lock should be held.
 | |
| func (js *js) getPAF(id string) *pubAckFuture {
 | |
| 	if js.pafs == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 	return js.pafs[id]
 | |
| }
 | |
| 
 | |
| // clearPAF will remove a PubAckFuture that was registered.
 | |
| func (js *js) clearPAF(id string) {
 | |
| 	js.mu.Lock()
 | |
| 	delete(js.pafs, id)
 | |
| 	js.mu.Unlock()
 | |
| }
 | |
| 
 | |
| // PublishAsyncPending returns how many PubAckFutures are pending.
 | |
| func (js *js) PublishAsyncPending() int {
 | |
| 	js.mu.RLock()
 | |
| 	defer js.mu.RUnlock()
 | |
| 	return len(js.pafs)
 | |
| }
 | |
| 
 | |
| func (js *js) asyncStall() <-chan struct{} {
 | |
| 	js.mu.Lock()
 | |
| 	if js.stc == nil {
 | |
| 		js.stc = make(chan struct{})
 | |
| 	}
 | |
| 	stc := js.stc
 | |
| 	js.mu.Unlock()
 | |
| 	return stc
 | |
| }
 | |
| 
 | |
| // handleAsyncReply handles an async reply from PublishAsync: it resolves the pending future keyed by the reply subject suffix, unregisters it, wakes stalled publishers and completion waiters, and then records either the PubAck or an error on the future.
 | |
| func (js *js) handleAsyncReply(m *Msg) {
 | |
| 	if len(m.Subject) <= js.replyPrefixLen { // no id after the reply prefix, nothing to match
 | |
| 		return
 | |
| 	}
 | |
| 	id := m.Subject[js.replyPrefixLen:] // same slicing PublishMsgAsync uses to derive the registration id
 | |
| 
 | |
| 	js.mu.Lock() // released on every path below: directly, or inside doErr
 | |
| 	paf := js.getPAF(id)
 | |
| 	if paf == nil { // unknown id (not registered or already cleared): ignore the reply
 | |
| 		js.mu.Unlock()
 | |
| 		return
 | |
| 	}
 | |
| 	// Remove the future from the pending set before signaling anyone.
 | |
| 	delete(js.pafs, id)
 | |
| 
 | |
| 	// Check on anyone stalled and waiting: closing stc releases every publisher blocked on asyncStall().
 | |
| 	if js.stc != nil && len(js.pafs) < js.opts.maxpa {
 | |
| 		close(js.stc)
 | |
| 		js.stc = nil // a fresh channel is created by the next asyncStall() call
 | |
| 	}
 | |
| 	// Check on anyone waiting on done status (PublishAsyncComplete).
 | |
| 	if js.dch != nil && len(js.pafs) == 0 {
 | |
| 		dch := js.dch
 | |
| 		js.dch = nil
 | |
| 		// Defer here so error is processed and can be checked before waiters are released.
 | |
| 		defer close(dch)
 | |
| 	}
 | |
| 
 | |
| 	doErr := func(err error) { // records err on the future, releases js.mu, then runs the user callback
 | |
| 		paf.err = err
 | |
| 		if paf.errCh != nil {
 | |
| 			paf.errCh <- paf.err // sent while js.mu is held; NOTE(review): presumably errCh is buffered, confirm where it is created
 | |
| 		}
 | |
| 		cb := js.opts.aecb // read under the lock, invoked after unlocking
 | |
| 		js.mu.Unlock()
 | |
| 		if cb != nil {
 | |
| 			cb(paf.js, paf.msg, err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Process no responders etc: an empty payload with the no-responders status header.
 | |
| 	if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
 | |
| 		doErr(ErrNoResponders)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	var pa pubAckResponse
 | |
| 	if err := json.Unmarshal(m.Data, &pa); err != nil { // the decode error itself is not surfaced, only ErrInvalidJSAck
 | |
| 		doErr(ErrInvalidJSAck)
 | |
| 		return
 | |
| 	}
 | |
| 	if pa.Error != nil { // error carried in the decoded response
 | |
| 		doErr(pa.Error)
 | |
| 		return
 | |
| 	}
 | |
| 	if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ { // an ack without a stream name is treated as invalid
 | |
| 		doErr(ErrInvalidJSAck)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// So here we have received a proper puback.
 | |
| 	paf.pa = pa.PubAck
 | |
| 	if paf.doneCh != nil {
 | |
| 		paf.doneCh <- paf.pa // sent while js.mu is held; NOTE(review): presumably doneCh is buffered, confirm where it is created
 | |
| 	}
 | |
| 	js.mu.Unlock()
 | |
| }
 | |
| 
 | |
| // MsgErrHandler is the callback type used to report asynchronous errors from
 | |
| // JetStream PublishAsync. It is invoked with a JetStream value, the original
 | |
| // message sent to the server (for possible retransmitting) and the error encountered.
 | |
| type MsgErrHandler func(JetStream, *Msg, error)
 | |
| 
 | |
| // PublishAsyncErrHandler sets the error handler for async publishes in JetStream.
 | |
| func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt {
 | |
| 	return jsOptFn(func(js *jsOpts) error {
 | |
| 		js.aecb = cb
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // PublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time.
 | |
| func PublishAsyncMaxPending(max int) JSOpt {
 | |
| 	return jsOptFn(func(js *jsOpts) error {
 | |
| 		if max < 1 {
 | |
| 			return errors.New("nats: max ack pending should be >= 1")
 | |
| 		}
 | |
| 		js.maxpa = max
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // PublishAsync publishes a message to JetStream and returns a PubAckFuture
 | |
| func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) {
 | |
| 	return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...)
 | |
| }
 | |
| 
 | |
| const defaultStallWait = 200 * time.Millisecond // stall wait used by PublishMsgAsync when no StallWait option is supplied
 | |
| 
 | |
| func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
 | |
| 	var o pubOpts
 | |
| 	if len(opts) > 0 {
 | |
| 		if m.Header == nil {
 | |
| 			m.Header = Header{}
 | |
| 		}
 | |
| 		for _, opt := range opts {
 | |
| 			if err := opt.configurePublish(&o); err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Timeouts and contexts do not make sense for these.
 | |
| 	if o.ttl != 0 || o.ctx != nil {
 | |
| 		return nil, ErrContextAndTimeout
 | |
| 	}
 | |
| 	stallWait := defaultStallWait
 | |
| 	if o.stallWait > 0 {
 | |
| 		stallWait = o.stallWait
 | |
| 	}
 | |
| 
 | |
| 	// FIXME(dlc) - Make common.
 | |
| 	if o.id != _EMPTY_ {
 | |
| 		m.Header.Set(MsgIdHdr, o.id)
 | |
| 	}
 | |
| 	if o.lid != _EMPTY_ {
 | |
| 		m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
 | |
| 	}
 | |
| 	if o.str != _EMPTY_ {
 | |
| 		m.Header.Set(ExpectedStreamHdr, o.str)
 | |
| 	}
 | |
| 	if o.seq != nil {
 | |
| 		m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10))
 | |
| 	}
 | |
| 	if o.lss != nil {
 | |
| 		m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
 | |
| 	}
 | |
| 
 | |
| 	// Reply
 | |
| 	if m.Reply != _EMPTY_ {
 | |
| 		return nil, errors.New("nats: reply subject should be empty")
 | |
| 	}
 | |
| 	reply := m.Reply
 | |
| 	m.Reply = js.newAsyncReply()
 | |
| 	defer func() { m.Reply = reply }()
 | |
| 
 | |
| 	if m.Reply == _EMPTY_ {
 | |
| 		return nil, errors.New("nats: error creating async reply handler")
 | |
| 	}
 | |
| 
 | |
| 	id := m.Reply[js.replyPrefixLen:]
 | |
| 	paf := &pubAckFuture{msg: m, st: time.Now()}
 | |
| 	numPending, maxPending := js.registerPAF(id, paf)
 | |
| 
 | |
| 	if maxPending > 0 && numPending >= maxPending {
 | |
| 		select {
 | |
| 		case <-js.asyncStall():
 | |
| 		case <-time.After(stallWait):
 | |
| 			js.clearPAF(id)
 | |
| 			return nil, errors.New("nats: stalled with too many outstanding async published messages")
 | |
| 		}
 | |
| 	}
 | |
| 	if err := js.nc.PublishMsg(m); err != nil {
 | |
| 		js.clearPAF(id)
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return paf, nil
 | |
| }
 | |
| 
 | |
| // PublishAsyncComplete returns a channel that will be closed when all outstanding messages have been ack'd.
 | |
| func (js *js) PublishAsyncComplete() <-chan struct{} {
 | |
| 	js.mu.Lock()
 | |
| 	defer js.mu.Unlock()
 | |
| 	if js.dch == nil {
 | |
| 		js.dch = make(chan struct{})
 | |
| 	}
 | |
| 	dch := js.dch
 | |
| 	if len(js.pafs) == 0 {
 | |
| 		close(js.dch)
 | |
| 		js.dch = nil
 | |
| 	}
 | |
| 	return dch
 | |
| }
 | |
| 
 | |
| // MsgId sets the message ID used for deduplication.
 | |
| func MsgId(id string) PubOpt {
 | |
| 	return pubOptFn(func(opts *pubOpts) error {
 | |
| 		opts.id = id
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // ExpectStream sets the expected stream to respond from the publish.
 | |
| func ExpectStream(stream string) PubOpt {
 | |
| 	return pubOptFn(func(opts *pubOpts) error {
 | |
| 		opts.str = stream
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // ExpectLastSequence sets the expected sequence in the response from the publish.
 | |
| func ExpectLastSequence(seq uint64) PubOpt {
 | |
| 	return pubOptFn(func(opts *pubOpts) error {
 | |
| 		opts.seq = &seq
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.
 | |
| func ExpectLastSequencePerSubject(seq uint64) PubOpt {
 | |
| 	return pubOptFn(func(opts *pubOpts) error {
 | |
| 		opts.lss = &seq
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // ExpectLastMsgId sets the expected last msgId in the response from the publish.
 | |
| func ExpectLastMsgId(id string) PubOpt {
 | |
| 	return pubOptFn(func(opts *pubOpts) error {
 | |
| 		opts.lid = id
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // RetryWait sets the retry wait time when ErrNoResponders is encountered.
 | |
| func RetryWait(dur time.Duration) PubOpt {
 | |
| 	return pubOptFn(func(opts *pubOpts) error {
 | |
| 		opts.rwait = dur
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // RetryAttempts sets the retry number of attempts when ErrNoResponders is encountered.
 | |
| func RetryAttempts(num int) PubOpt {
 | |
| 	return pubOptFn(func(opts *pubOpts) error {
 | |
| 		opts.rnum = num
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // StallWait sets the max wait when the producer becomes stall producing messages.
 | |
| func StallWait(ttl time.Duration) PubOpt {
 | |
| 	return pubOptFn(func(opts *pubOpts) error {
 | |
| 		if ttl <= 0 {
 | |
| 			return fmt.Errorf("nats: stall wait should be more than 0")
 | |
| 		}
 | |
| 		opts.stallWait = ttl
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| type ackOpts struct { // ackOpts collects the settings applied by AckOpt values
 | |
| 	ttl      time.Duration // set by AckWait.configureAck
 | |
| 	ctx      context.Context // set by ContextOpt.configureAck
 | |
| 	nakDelay time.Duration // set by nakDelay.configureAck
 | |
| }
 | |
| 
 | |
| // AckOpt is the interface implemented by options that can be passed when acknowledging a message.
 | |
| type AckOpt interface {
 | |
| 	configureAck(opts *ackOpts) error // applies the option to opts
 | |
| }
 | |
| 
 | |
| // MaxWait sets the maximum amount of time we will wait for a response.
 | |
| type MaxWait time.Duration
 | |
| 
 | |
| func (ttl MaxWait) configureJSContext(js *jsOpts) error {
 | |
| 	js.wait = time.Duration(ttl)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (ttl MaxWait) configurePull(opts *pullOpts) error {
 | |
| 	opts.ttl = time.Duration(ttl)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // AckWait sets the maximum amount of time we will wait for an ack.
 | |
| type AckWait time.Duration
 | |
| 
 | |
| func (ttl AckWait) configurePublish(opts *pubOpts) error {
 | |
| 	opts.ttl = time.Duration(ttl)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (ttl AckWait) configureSubscribe(opts *subOpts) error {
 | |
| 	opts.cfg.AckWait = time.Duration(ttl)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (ttl AckWait) configureAck(opts *ackOpts) error {
 | |
| 	opts.ttl = time.Duration(ttl)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // ContextOpt is an option used to set a context.Context.
 | |
| type ContextOpt struct {
 | |
| 	context.Context
 | |
| }
 | |
| 
 | |
| func (ctx ContextOpt) configureJSContext(opts *jsOpts) error {
 | |
| 	opts.ctx = ctx
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (ctx ContextOpt) configurePublish(opts *pubOpts) error {
 | |
| 	opts.ctx = ctx
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (ctx ContextOpt) configureSubscribe(opts *subOpts) error {
 | |
| 	opts.ctx = ctx
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (ctx ContextOpt) configurePull(opts *pullOpts) error {
 | |
| 	opts.ctx = ctx
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (ctx ContextOpt) configureAck(opts *ackOpts) error {
 | |
| 	opts.ctx = ctx
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Context returns an option that can be used to configure a context for APIs
 | |
| // that are context aware such as those part of the JetStream interface.
 | |
| func Context(ctx context.Context) ContextOpt {
 | |
| 	return ContextOpt{ctx}
 | |
| }
 | |
| 
 | |
| type nakDelay time.Duration
 | |
| 
 | |
| func (d nakDelay) configureAck(opts *ackOpts) error {
 | |
| 	opts.nakDelay = time.Duration(d)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Subscribe
 | |
| 
 | |
| // ConsumerConfig is the configuration of a JetStream consumer. It is JSON encoded using the field tags below; most fields are omitted from the encoding when they hold their zero value.
 | |
| type ConsumerConfig struct {
 | |
| 	Durable         string          `json:"durable_name,omitempty"`
 | |
| 	Name            string          `json:"name,omitempty"`
 | |
| 	Description     string          `json:"description,omitempty"`
 | |
| 	DeliverPolicy   DeliverPolicy   `json:"deliver_policy"`
 | |
| 	OptStartSeq     uint64          `json:"opt_start_seq,omitempty"`
 | |
| 	OptStartTime    *time.Time      `json:"opt_start_time,omitempty"`
 | |
| 	AckPolicy       AckPolicy       `json:"ack_policy"`
 | |
| 	AckWait         time.Duration   `json:"ack_wait,omitempty"`
 | |
| 	MaxDeliver      int             `json:"max_deliver,omitempty"`
 | |
| 	BackOff         []time.Duration `json:"backoff,omitempty"`
 | |
| 	FilterSubject   string          `json:"filter_subject,omitempty"` // when set on an existing consumer, processConsInfo requires the subscribe subject to equal it
 | |
| 	FilterSubjects  []string        `json:"filter_subjects,omitempty"`
 | |
| 	ReplayPolicy    ReplayPolicy    `json:"replay_policy"`
 | |
| 	RateLimit       uint64          `json:"rate_limit_bps,omitempty"` // Bits per sec
 | |
| 	SampleFrequency string          `json:"sample_freq,omitempty"`
 | |
| 	MaxWaiting      int             `json:"max_waiting,omitempty"`
 | |
| 	MaxAckPending   int             `json:"max_ack_pending,omitempty"`
 | |
| 	FlowControl     bool            `json:"flow_control,omitempty"`
 | |
| 	Heartbeat       time.Duration   `json:"idle_heartbeat,omitempty"`
 | |
| 	HeadersOnly     bool            `json:"headers_only,omitempty"`
 | |
| 
 | |
| 	// Pull based options.
 | |
| 	MaxRequestBatch    int           `json:"max_batch,omitempty"`
 | |
| 	MaxRequestExpires  time.Duration `json:"max_expires,omitempty"`
 | |
| 	MaxRequestMaxBytes int           `json:"max_bytes,omitempty"`
 | |
| 
 | |
| 	// Push based consumers. A non-empty DeliverSubject is what processConsInfo uses to tell a push consumer from a pull consumer.
 | |
| 	DeliverSubject string `json:"deliver_subject,omitempty"`
 | |
| 	DeliverGroup   string `json:"deliver_group,omitempty"` // must equal the queue name of a queue subscription binding to the consumer
 | |
| 
 | |
| 	// Inactivity threshold.
 | |
| 	InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
 | |
| 
 | |
| 	// Generally inherited by parent stream and other markers, now can be configured directly.
 | |
| 	Replicas int `json:"num_replicas"` // no omitempty: always present in the encoded JSON
 | |
| 	// Force memory storage.
 | |
| 	MemoryStorage bool `json:"mem_storage,omitempty"`
 | |
| 
 | |
| 	// Metadata is additional metadata for the Consumer.
 | |
| 	// Keys starting with `_nats` are reserved.
 | |
| 	// NOTE: Metadata requires nats-server v2.10.0+
 | |
| 	Metadata map[string]string `json:"metadata,omitempty"`
 | |
| }
 | |
| 
 | |
| // ConsumerInfo is the info from a JetStream consumer, decoded from JSON using the field tags below.
 | |
| type ConsumerInfo struct {
 | |
| 	Stream         string         `json:"stream_name"`
 | |
| 	Name           string         `json:"name"`
 | |
| 	Created        time.Time      `json:"created"`
 | |
| 	Config         ConsumerConfig `json:"config"` // the consumer's configuration; processConsInfo validates subscribe requests against it
 | |
| 	Delivered      SequenceInfo   `json:"delivered"`
 | |
| 	AckFloor       SequenceInfo   `json:"ack_floor"`
 | |
| 	NumAckPending  int            `json:"num_ack_pending"`
 | |
| 	NumRedelivered int            `json:"num_redelivered"`
 | |
| 	NumWaiting     int            `json:"num_waiting"`
 | |
| 	NumPending     uint64         `json:"num_pending"`
 | |
| 	Cluster        *ClusterInfo   `json:"cluster,omitempty"`
 | |
| 	PushBound      bool           `json:"push_bound,omitempty"` // when true, processConsInfo rejects a new non-queue subscription to a consumer without a deliver group
 | |
| }
 | |
| 
 | |
| // SequenceInfo has both the consumer and the stream sequence and last activity.
 | |
| type SequenceInfo struct {
 | |
| 	Consumer uint64     `json:"consumer_seq"`
 | |
| 	Stream   uint64     `json:"stream_seq"`
 | |
| 	Last     *time.Time `json:"last_active,omitempty"` // pointer so that a missing last-activity time stays nil
 | |
| }
 | |
| 
 | |
| // SequencePair includes the consumer and stream sequence info from a JetStream consumer. It carries the same two sequences as SequenceInfo, without the last activity time.
 | |
| type SequencePair struct {
 | |
| 	Consumer uint64 `json:"consumer_seq"`
 | |
| 	Stream   uint64 `json:"stream_seq"`
 | |
| }
 | |
| 
 | |
| // nextRequest is the JSON request body for getting next messages for pull based consumers from JetStream. Every field is omitted from the encoding when it holds its zero value.
 | |
| type nextRequest struct {
 | |
| 	Expires   time.Duration `json:"expires,omitempty"`
 | |
| 	Batch     int           `json:"batch,omitempty"`
 | |
| 	NoWait    bool          `json:"no_wait,omitempty"`
 | |
| 	MaxBytes  int           `json:"max_bytes,omitempty"`
 | |
| 	Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
 | |
| }
 | |
| 
 | |
| // jsSub includes JetStream subscription info. It is reached through Subscription.jsi.
 | |
| type jsSub struct {
 | |
| 	js *js // JetStream context; deleteConsumer uses it to issue the DeleteConsumer call
 | |
| 
 | |
| 	// For pull subscribers, this is the next message subject to send requests to.
 | |
| 	nms string
 | |
| 
 | |
| 	psubj    string // the subject that was passed by user to the subscribe calls
 | |
| 	consumer string // consumer name; deleteConsumer is a no-op when it is empty
 | |
| 	stream   string // stream name; deleteConsumer is a no-op when it is empty
 | |
| 	deliver  string
 | |
| 	pull     bool
 | |
| 	dc       bool // Delete JS consumer
 | |
| 	ackNone  bool
 | |
| 
 | |
| 	// This is ConsumerInfo's Pending+Consumer.Delivered that we get from the
 | |
| 	// add consumer response. Note that some versions of the server gather the
 | |
| 	// consumer info *after* the creation of the consumer, which means that
 | |
| 	// some messages may have been already delivered. So the sum of the two
 | |
| 	// is a more accurate representation of the number of messages pending or
 | |
| 	// in the process of being delivered to the subscription when created.
 | |
| 	pending uint64
 | |
| 
 | |
| 	// Ordered consumers
 | |
| 	ordered bool
 | |
| 	dseq    uint64
 | |
| 	sseq    uint64
 | |
| 	ccreq   *createConsumerRequest
 | |
| 
 | |
| 	// Heartbeats and Flow Control handling from push consumers.
 | |
| 	hbc    *time.Timer
 | |
| 	hbi    time.Duration
 | |
| 	active bool
 | |
| 	cmeta  string
 | |
| 	fcr    string
 | |
| 	fcd    uint64
 | |
| 	fciseq uint64
 | |
| 	csfct  *time.Timer
 | |
| 
 | |
| 	// Cancellation function to cancel context on drain/unsubscribe.
 | |
| 	cancel func()
 | |
| }
 | |
| 
 | |
| // Deletes the JS Consumer.
 | |
| // No connection nor subscription lock must be held on entry.
 | |
| func (sub *Subscription) deleteConsumer() error {
 | |
| 	sub.mu.Lock()
 | |
| 	jsi := sub.jsi
 | |
| 	if jsi == nil {
 | |
| 		sub.mu.Unlock()
 | |
| 		return nil
 | |
| 	}
 | |
| 	if jsi.stream == _EMPTY_ || jsi.consumer == _EMPTY_ {
 | |
| 		sub.mu.Unlock()
 | |
| 		return nil
 | |
| 	}
 | |
| 	stream, consumer := jsi.stream, jsi.consumer
 | |
| 	js := jsi.js
 | |
| 	sub.mu.Unlock()
 | |
| 
 | |
| 	return js.DeleteConsumer(stream, consumer)
 | |
| }
 | |
| 
 | |
| // SubOpt configures options for subscribing to JetStream consumers.
 | |
| type SubOpt interface {
 | |
| 	configureSubscribe(opts *subOpts) error
 | |
| }
 | |
| 
 | |
| // subOptFn is a function option used to configure a JetStream Subscribe.
 | |
| type subOptFn func(opts *subOpts) error
 | |
| 
 | |
| func (opt subOptFn) configureSubscribe(opts *subOpts) error {
 | |
| 	return opt(opts)
 | |
| }
 | |
| 
 | |
| // Subscribe creates an async Subscription for JetStream.
 | |
| // The stream and consumer names can be provided with the nats.Bind() option.
 | |
| // For creating an ephemeral (where the consumer name is picked by the server),
 | |
| // you can provide the stream name with nats.BindStream().
 | |
| // If no stream name is specified, the library will attempt to figure out which
 | |
| // stream the subscription is for. See important notes below for more details.
 | |
| //
 | |
| // IMPORTANT NOTES:
 | |
| // * If none of the options Bind() nor Durable() are specified, the library will
 | |
| // send a request to the server to create an ephemeral JetStream consumer,
 | |
| // which will be deleted after an Unsubscribe() or Drain(), or automatically
 | |
| // by the server after a short period of time after the NATS subscription is
 | |
| // gone.
 | |
| // * If Durable() option is specified, the library will attempt to lookup a JetStream
 | |
| // consumer with this name, and if found, will bind to it and not attempt to
 | |
| // delete it. However, if not found, the library will send a request to create
 | |
| // such durable JetStream consumer. The library will delete the JetStream consumer
 | |
| // after an Unsubscribe() or Drain().
 | |
| // * If Bind() option is provided, the library will attempt to lookup the
 | |
| // consumer with the given name, and if successful, bind to it. If the lookup fails,
 | |
| // then the Subscribe() call will return an error.
 | |
| func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
 | |
| 	if cb == nil {
 | |
| 		return nil, ErrBadSubscription
 | |
| 	}
 | |
| 	return js.subscribe(subj, _EMPTY_, cb, nil, false, false, opts)
 | |
| }
 | |
| 
 | |
| // SubscribeSync creates a Subscription that can be used to process messages synchronously.
 | |
| // See important note in Subscribe()
 | |
| func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) {
 | |
| 	mch := make(chan *Msg, js.nc.Opts.SubChanLen)
 | |
| 	return js.subscribe(subj, _EMPTY_, nil, mch, true, false, opts)
 | |
| }
 | |
| 
 | |
| // QueueSubscribe creates a Subscription with a queue group.
 | |
| // If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
 | |
| // See important note in Subscribe()
 | |
| func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
 | |
| 	if cb == nil {
 | |
| 		return nil, ErrBadSubscription
 | |
| 	}
 | |
| 	return js.subscribe(subj, queue, cb, nil, false, false, opts)
 | |
| }
 | |
| 
 | |
| // QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
 | |
| // See important note in QueueSubscribe()
 | |
| func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) {
 | |
| 	mch := make(chan *Msg, js.nc.Opts.SubChanLen)
 | |
| 	return js.subscribe(subj, queue, nil, mch, true, false, opts)
 | |
| }
 | |
| 
 | |
| // ChanSubscribe creates channel based Subscription.
 | |
| // Using ChanSubscribe without buffered capacity is not recommended since
 | |
| // it will be prone to dropping messages with a slow consumer error.  Make sure to give the channel enough
 | |
| // capacity to handle bursts in traffic, for example other Subscribe APIs use a default of 512k capacity in comparison.
 | |
| // See important note in Subscribe()
 | |
| func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
 | |
| 	return js.subscribe(subj, _EMPTY_, nil, ch, false, false, opts)
 | |
| }
 | |
| 
 | |
| // ChanQueueSubscribe creates channel based Subscription with a queue group.
 | |
| // See important note in QueueSubscribe()
 | |
| func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
 | |
| 	return js.subscribe(subj, queue, nil, ch, false, false, opts)
 | |
| }
 | |
| 
 | |
| // PullSubscribe creates a Subscription that can fetch messages.
 | |
| // See important note in Subscribe()
 | |
| func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) {
 | |
| 	mch := make(chan *Msg, js.nc.Opts.SubChanLen)
 | |
| 	if durable != "" {
 | |
| 		opts = append(opts, Durable(durable))
 | |
| 	}
 | |
| 	return js.subscribe(subj, _EMPTY_, nil, mch, true, true, opts)
 | |
| }
 | |
| 
 | |
| func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode bool, subj, queue string) (string, error) {
 | |
| 	ccfg := &info.Config
 | |
| 
 | |
| 	// Make sure this new subject matches or is a subset.
 | |
| 	if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
 | |
| 		return _EMPTY_, ErrSubjectMismatch
 | |
| 	}
 | |
| 
 | |
| 	// Prevent binding a subscription against incompatible consumer types.
 | |
| 	if isPullMode && ccfg.DeliverSubject != _EMPTY_ {
 | |
| 		return _EMPTY_, ErrPullSubscribeToPushConsumer
 | |
| 	} else if !isPullMode && ccfg.DeliverSubject == _EMPTY_ {
 | |
| 		return _EMPTY_, ErrPullSubscribeRequired
 | |
| 	}
 | |
| 
 | |
| 	// If pull mode, nothing else to check here.
 | |
| 	if isPullMode {
 | |
| 		return _EMPTY_, checkConfig(ccfg, userCfg)
 | |
| 	}
 | |
| 
 | |
| 	// At this point, we know the user wants push mode, and the JS consumer is
 | |
| 	// really push mode.
 | |
| 
 | |
| 	dg := info.Config.DeliverGroup
 | |
| 	if dg == _EMPTY_ {
 | |
| 		// Prevent an user from attempting to create a queue subscription on
 | |
| 		// a JS consumer that was not created with a deliver group.
 | |
| 		if queue != _EMPTY_ {
 | |
| 			return _EMPTY_, fmt.Errorf("cannot create a queue subscription for a consumer without a deliver group")
 | |
| 		} else if info.PushBound {
 | |
| 			// Need to reject a non queue subscription to a non queue consumer
 | |
| 			// if the consumer is already bound.
 | |
| 			return _EMPTY_, fmt.Errorf("consumer is already bound to a subscription")
 | |
| 		}
 | |
| 	} else {
 | |
| 		// If the JS consumer has a deliver group, we need to fail a non queue
 | |
| 		// subscription attempt:
 | |
| 		if queue == _EMPTY_ {
 | |
| 			return _EMPTY_, fmt.Errorf("cannot create a subscription for a consumer with a deliver group %q", dg)
 | |
| 		} else if queue != dg {
 | |
| 			// Here the user's queue group name does not match the one associated
 | |
| 			// with the JS consumer.
 | |
| 			return _EMPTY_, fmt.Errorf("cannot create a queue subscription %q for a consumer with a deliver group %q",
 | |
| 				queue, dg)
 | |
| 		}
 | |
| 	}
 | |
| 	if err := checkConfig(ccfg, userCfg); err != nil {
 | |
| 		return _EMPTY_, err
 | |
| 	}
 | |
| 	return ccfg.DeliverSubject, nil
 | |
| }
 | |
| 
 | |
| func checkConfig(s, u *ConsumerConfig) error {
 | |
| 	makeErr := func(fieldName string, usrVal, srvVal any) error {
 | |
| 		return fmt.Errorf("configuration requests %s to be %v, but consumer's value is %v", fieldName, usrVal, srvVal)
 | |
| 	}
 | |
| 
 | |
| 	if u.Durable != _EMPTY_ && u.Durable != s.Durable {
 | |
| 		return makeErr("durable", u.Durable, s.Durable)
 | |
| 	}
 | |
| 	if u.Description != _EMPTY_ && u.Description != s.Description {
 | |
| 		return makeErr("description", u.Description, s.Description)
 | |
| 	}
 | |
| 	if u.DeliverPolicy != deliverPolicyNotSet && u.DeliverPolicy != s.DeliverPolicy {
 | |
| 		return makeErr("deliver policy", u.DeliverPolicy, s.DeliverPolicy)
 | |
| 	}
 | |
| 	if u.OptStartSeq > 0 && u.OptStartSeq != s.OptStartSeq {
 | |
| 		return makeErr("optional start sequence", u.OptStartSeq, s.OptStartSeq)
 | |
| 	}
 | |
| 	if u.OptStartTime != nil && !u.OptStartTime.IsZero() && !(*u.OptStartTime).Equal(*s.OptStartTime) {
 | |
| 		return makeErr("optional start time", u.OptStartTime, s.OptStartTime)
 | |
| 	}
 | |
| 	if u.AckPolicy != ackPolicyNotSet && u.AckPolicy != s.AckPolicy {
 | |
| 		return makeErr("ack policy", u.AckPolicy, s.AckPolicy)
 | |
| 	}
 | |
| 	if u.AckWait > 0 && u.AckWait != s.AckWait {
 | |
| 		return makeErr("ack wait", u.AckWait, s.AckWait)
 | |
| 	}
 | |
| 	if u.MaxDeliver > 0 && u.MaxDeliver != s.MaxDeliver {
 | |
| 		return makeErr("max deliver", u.MaxDeliver, s.MaxDeliver)
 | |
| 	}
 | |
| 	if u.ReplayPolicy != replayPolicyNotSet && u.ReplayPolicy != s.ReplayPolicy {
 | |
| 		return makeErr("replay policy", u.ReplayPolicy, s.ReplayPolicy)
 | |
| 	}
 | |
| 	if u.RateLimit > 0 && u.RateLimit != s.RateLimit {
 | |
| 		return makeErr("rate limit", u.RateLimit, s.RateLimit)
 | |
| 	}
 | |
| 	if u.SampleFrequency != _EMPTY_ && u.SampleFrequency != s.SampleFrequency {
 | |
| 		return makeErr("sample frequency", u.SampleFrequency, s.SampleFrequency)
 | |
| 	}
 | |
| 	if u.MaxWaiting > 0 && u.MaxWaiting != s.MaxWaiting {
 | |
| 		return makeErr("max waiting", u.MaxWaiting, s.MaxWaiting)
 | |
| 	}
 | |
| 	if u.MaxAckPending > 0 && u.MaxAckPending != s.MaxAckPending {
 | |
| 		return makeErr("max ack pending", u.MaxAckPending, s.MaxAckPending)
 | |
| 	}
 | |
| 	// For flow control, we want to fail if the user explicit wanted it, but
 | |
| 	// it is not set in the existing consumer. If it is not asked by the user,
 | |
| 	// the library still handles it and so no reason to fail.
 | |
| 	if u.FlowControl && !s.FlowControl {
 | |
| 		return makeErr("flow control", u.FlowControl, s.FlowControl)
 | |
| 	}
 | |
| 	if u.Heartbeat > 0 && u.Heartbeat != s.Heartbeat {
 | |
| 		return makeErr("heartbeat", u.Heartbeat, s.Heartbeat)
 | |
| 	}
 | |
| 	if u.Replicas > 0 && u.Replicas != s.Replicas {
 | |
| 		return makeErr("replicas", u.Replicas, s.Replicas)
 | |
| 	}
 | |
| 	if u.MemoryStorage && !s.MemoryStorage {
 | |
| 		return makeErr("memory storage", u.MemoryStorage, s.MemoryStorage)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt) (*Subscription, error) {
 | |
| 	cfg := ConsumerConfig{
 | |
| 		DeliverPolicy: deliverPolicyNotSet,
 | |
| 		AckPolicy:     ackPolicyNotSet,
 | |
| 		ReplayPolicy:  replayPolicyNotSet,
 | |
| 	}
 | |
| 	o := subOpts{cfg: &cfg}
 | |
| 	if len(opts) > 0 {
 | |
| 		for _, opt := range opts {
 | |
| 			if opt == nil {
 | |
| 				continue
 | |
| 			}
 | |
| 			if err := opt.configureSubscribe(&o); err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// If no stream name is specified, the subject cannot be empty.
 | |
| 	if subj == _EMPTY_ && o.stream == _EMPTY_ {
 | |
| 		return nil, fmt.Errorf("nats: subject required")
 | |
| 	}
 | |
| 
 | |
| 	// Note that these may change based on the consumer info response we may get.
 | |
| 	hasHeartbeats := o.cfg.Heartbeat > 0
 | |
| 	hasFC := o.cfg.FlowControl
 | |
| 
 | |
| 	// Some checks for pull subscribers
 | |
| 	if isPullMode {
 | |
| 		// No deliver subject should be provided
 | |
| 		if o.cfg.DeliverSubject != _EMPTY_ {
 | |
| 			return nil, ErrPullSubscribeToPushConsumer
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Some check/setting specific to queue subs
 | |
| 	if queue != _EMPTY_ {
 | |
| 		// Queue subscriber cannot have HB or FC (since messages will be randomly dispatched
 | |
| 		// to members). We may in the future have a separate NATS subscription that all members
 | |
| 		// would subscribe to and server would send on.
 | |
| 		if o.cfg.Heartbeat > 0 || o.cfg.FlowControl {
 | |
| 			// Not making this a public ErrXXX in case we allow in the future.
 | |
| 			return nil, fmt.Errorf("nats: queue subscription doesn't support idle heartbeat nor flow control")
 | |
| 		}
 | |
| 
 | |
| 		// If this is a queue subscription and no consumer nor durable name was specified,
 | |
| 		// then we will use the queue name as a durable name.
 | |
| 		if o.consumer == _EMPTY_ && o.cfg.Durable == _EMPTY_ {
 | |
| 			if err := checkConsumerName(queue); err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			o.cfg.Durable = queue
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	var (
 | |
| 		err           error
 | |
| 		shouldCreate  bool
 | |
| 		info          *ConsumerInfo
 | |
| 		deliver       string
 | |
| 		stream        = o.stream
 | |
| 		consumer      = o.consumer
 | |
| 		isDurable     = o.cfg.Durable != _EMPTY_
 | |
| 		consumerBound = o.bound
 | |
| 		ctx           = o.ctx
 | |
| 		skipCInfo     = o.skipCInfo
 | |
| 		notFoundErr   bool
 | |
| 		lookupErr     bool
 | |
| 		nc            = js.nc
 | |
| 		nms           string
 | |
| 		hbi           time.Duration
 | |
| 		ccreq         *createConsumerRequest // In case we need to hold onto it for ordered consumers.
 | |
| 		maxap         int
 | |
| 	)
 | |
| 
 | |
| 	// Do some quick checks here for ordered consumers. We do these here instead of spread out
 | |
| 	// in the individual SubOpts.
 | |
| 	if o.ordered {
 | |
| 		// Make sure we are not durable.
 | |
| 		if isDurable {
 | |
| 			return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer")
 | |
| 		}
 | |
| 		// Check ack policy.
 | |
| 		if o.cfg.AckPolicy != ackPolicyNotSet {
 | |
| 			return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer")
 | |
| 		}
 | |
| 		// Check max deliver.
 | |
| 		if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 {
 | |
| 			return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer")
 | |
| 		}
 | |
| 		// No deliver subject, we pick our own.
 | |
| 		if o.cfg.DeliverSubject != _EMPTY_ {
 | |
| 			return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer")
 | |
| 		}
 | |
| 		// Queue groups not allowed.
 | |
| 		if queue != _EMPTY_ {
 | |
| 			return nil, fmt.Errorf("nats: queues not be set for an ordered consumer")
 | |
| 		}
 | |
| 		// Check for bound consumers.
 | |
| 		if consumer != _EMPTY_ {
 | |
| 			return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer")
 | |
| 		}
 | |
| 		// Check for pull mode.
 | |
| 		if isPullMode {
 | |
| 			return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer")
 | |
| 		}
 | |
| 		// Setup how we need it to be here.
 | |
| 		o.cfg.FlowControl = true
 | |
| 		o.cfg.AckPolicy = AckNonePolicy
 | |
| 		o.cfg.MaxDeliver = 1
 | |
| 		o.cfg.AckWait = 22 * time.Hour // Just set to something known, not utilized.
 | |
| 		// Force R1 and MemoryStorage for these.
 | |
| 		o.cfg.Replicas = 1
 | |
| 		o.cfg.MemoryStorage = true
 | |
| 
 | |
| 		if !hasHeartbeats {
 | |
| 			o.cfg.Heartbeat = orderedHeartbeatsInterval
 | |
| 		}
 | |
| 		hasFC, hasHeartbeats = true, true
 | |
| 		o.mack = true // To avoid auto-ack wrapping call below.
 | |
| 		hbi = o.cfg.Heartbeat
 | |
| 	}
 | |
| 
 | |
| 	// In case a consumer has not been set explicitly, then the
 | |
| 	// durable name will be used as the consumer name.
 | |
| 	if consumer == _EMPTY_ {
 | |
| 		consumer = o.cfg.Durable
 | |
| 	}
 | |
| 
 | |
| 	// Find the stream mapped to the subject if not bound to a stream already.
 | |
| 	if stream == _EMPTY_ {
 | |
| 		stream, err = js.StreamNameBySubject(subj)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// With an explicit durable name, we can lookup the consumer first
 | |
| 	// to which it should be attaching to.
 | |
| 	// If SkipConsumerLookup was used, do not call consumer info.
 | |
| 	if consumer != _EMPTY_ && !o.skipCInfo {
 | |
| 		info, err = js.ConsumerInfo(stream, consumer)
 | |
| 		notFoundErr = errors.Is(err, ErrConsumerNotFound)
 | |
| 		lookupErr = err == ErrJetStreamNotEnabled || errors.Is(err, ErrTimeout) || errors.Is(err, context.DeadlineExceeded)
 | |
| 	}
 | |
| 
 | |
| 	switch {
 | |
| 	case info != nil:
 | |
| 		deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		icfg := &info.Config
 | |
| 		hasFC, hbi = icfg.FlowControl, icfg.Heartbeat
 | |
| 		hasHeartbeats = hbi > 0
 | |
| 		maxap = icfg.MaxAckPending
 | |
| 	case (err != nil && !notFoundErr) || (notFoundErr && consumerBound):
 | |
| 		// If the consumer is being bound and we got an error on pull subscribe then allow the error.
 | |
| 		if !(isPullMode && lookupErr && consumerBound) {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	case skipCInfo:
 | |
| 		// When skipping consumer info, need to rely on the manually passed sub options
 | |
| 		// to match the expected behavior from the subscription.
 | |
| 		hasFC, hbi = o.cfg.FlowControl, o.cfg.Heartbeat
 | |
| 		hasHeartbeats = hbi > 0
 | |
| 		maxap = o.cfg.MaxAckPending
 | |
| 		deliver = o.cfg.DeliverSubject
 | |
| 		if consumerBound {
 | |
| 			break
 | |
| 		}
 | |
| 
 | |
| 		// When not bound to a consumer already, proceed to create.
 | |
| 		fallthrough
 | |
| 	default:
 | |
| 		// Attempt to create consumer if not found nor using Bind.
 | |
| 		shouldCreate = true
 | |
| 		if o.cfg.DeliverSubject != _EMPTY_ {
 | |
| 			deliver = o.cfg.DeliverSubject
 | |
| 		} else if !isPullMode {
 | |
| 			deliver = nc.NewInbox()
 | |
| 			cfg.DeliverSubject = deliver
 | |
| 		}
 | |
| 		// Do filtering always, server will clear as needed.
 | |
| 		cfg.FilterSubject = subj
 | |
| 
 | |
| 		// Pass the queue to the consumer config
 | |
| 		if queue != _EMPTY_ {
 | |
| 			cfg.DeliverGroup = queue
 | |
| 		}
 | |
| 
 | |
| 		// If not set, default to deliver all
 | |
| 		if cfg.DeliverPolicy == deliverPolicyNotSet {
 | |
| 			cfg.DeliverPolicy = DeliverAllPolicy
 | |
| 		}
 | |
| 		// If not set, default to ack explicit.
 | |
| 		if cfg.AckPolicy == ackPolicyNotSet {
 | |
| 			cfg.AckPolicy = AckExplicitPolicy
 | |
| 		}
 | |
| 		// If not set, default to instant
 | |
| 		if cfg.ReplayPolicy == replayPolicyNotSet {
 | |
| 			cfg.ReplayPolicy = ReplayInstantPolicy
 | |
| 		}
 | |
| 
 | |
| 		// If we have acks at all and the MaxAckPending is not set go ahead
 | |
| 		// and set to the internal max for channel based consumers
 | |
| 		if cfg.MaxAckPending == 0 && ch != nil && cfg.AckPolicy != AckNonePolicy {
 | |
| 			cfg.MaxAckPending = cap(ch)
 | |
| 		}
 | |
| 		// Create request here.
 | |
| 		ccreq = &createConsumerRequest{
 | |
| 			Stream: stream,
 | |
| 			Config: &cfg,
 | |
| 		}
 | |
| 		hbi = cfg.Heartbeat
 | |
| 	}
 | |
| 
 | |
| 	if isPullMode {
 | |
| 		nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer)
 | |
| 		deliver = nc.NewInbox()
 | |
| 		// for pull consumers, create a wildcard subscription to differentiate pull requests
 | |
| 		deliver += ".*"
 | |
| 	}
 | |
| 
 | |
| 	// In case this has a context, then create a child context that
 | |
| 	// is possible to cancel via unsubscribe / drain.
 | |
| 	var cancel func()
 | |
| 	if ctx != nil {
 | |
| 		ctx, cancel = context.WithCancel(ctx)
 | |
| 	}
 | |
| 
 | |
| 	jsi := &jsSub{
 | |
| 		js:       js,
 | |
| 		stream:   stream,
 | |
| 		consumer: consumer,
 | |
| 		deliver:  deliver,
 | |
| 		hbi:      hbi,
 | |
| 		ordered:  o.ordered,
 | |
| 		ccreq:    ccreq,
 | |
| 		dseq:     1,
 | |
| 		pull:     isPullMode,
 | |
| 		nms:      nms,
 | |
| 		psubj:    subj,
 | |
| 		cancel:   cancel,
 | |
| 		ackNone:  o.cfg.AckPolicy == AckNonePolicy,
 | |
| 	}
 | |
| 
 | |
| 	// Auto acknowledge unless manual ack is set or policy is set to AckNonePolicy
 | |
| 	if cb != nil && !o.mack && o.cfg.AckPolicy != AckNonePolicy {
 | |
| 		ocb := cb
 | |
| 		cb = func(m *Msg) { ocb(m); m.Ack() }
 | |
| 	}
 | |
| 	sub, err := nc.subscribe(deliver, queue, cb, ch, isSync, jsi)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// If we fail and we had the sub we need to cleanup, but can't just do a straight Unsubscribe or Drain.
 | |
| 	// We need to clear the jsi so we do not remove any durables etc.
 | |
| 	cleanUpSub := func() {
 | |
| 		if sub != nil {
 | |
| 			sub.mu.Lock()
 | |
| 			sub.jsi = nil
 | |
| 			sub.mu.Unlock()
 | |
| 			sub.Unsubscribe()
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// If we are creating or updating let's process that request.
 | |
| 	consName := o.cfg.Name
 | |
| 	if shouldCreate {
 | |
| 		if cfg.Durable != "" {
 | |
| 			consName = cfg.Durable
 | |
| 		} else if consName == "" {
 | |
| 			consName = getHash(nuid.Next())
 | |
| 		}
 | |
| 		info, err := js.upsertConsumer(stream, consName, ccreq.Config)
 | |
| 		if err != nil {
 | |
| 			var apiErr *APIError
 | |
| 			if ok := errors.As(err, &apiErr); !ok {
 | |
| 				cleanUpSub()
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			if consumer == _EMPTY_ ||
 | |
| 				(apiErr.ErrorCode != JSErrCodeConsumerAlreadyExists && apiErr.ErrorCode != JSErrCodeConsumerNameExists) {
 | |
| 				cleanUpSub()
 | |
| 				if errors.Is(apiErr, ErrStreamNotFound) {
 | |
| 					return nil, ErrStreamNotFound
 | |
| 				}
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			// We will not be using this sub here if we were push based.
 | |
| 			if !isPullMode {
 | |
| 				cleanUpSub()
 | |
| 			}
 | |
| 
 | |
| 			info, err = js.ConsumerInfo(stream, consumer)
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 
 | |
| 			if !isPullMode {
 | |
| 				// We can't reuse the channel, so if one was passed, we need to create a new one.
 | |
| 				if isSync {
 | |
| 					ch = make(chan *Msg, cap(ch))
 | |
| 				} else if ch != nil {
 | |
| 					// User provided (ChanSubscription), simply try to drain it.
 | |
| 					for done := false; !done; {
 | |
| 						select {
 | |
| 						case <-ch:
 | |
| 						default:
 | |
| 							done = true
 | |
| 						}
 | |
| 					}
 | |
| 				}
 | |
| 				jsi.deliver = deliver
 | |
| 				jsi.hbi = info.Config.Heartbeat
 | |
| 
 | |
| 				// Recreate the subscription here.
 | |
| 				sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
 | |
| 				if err != nil {
 | |
| 					return nil, err
 | |
| 				}
 | |
| 				hasFC = info.Config.FlowControl
 | |
| 				hasHeartbeats = info.Config.Heartbeat > 0
 | |
| 			}
 | |
| 		} else {
 | |
| 			// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
 | |
| 			sub.mu.Lock()
 | |
| 			sub.jsi.dc = true
 | |
| 			sub.jsi.pending = info.NumPending + info.Delivered.Consumer
 | |
| 			// If this is an ephemeral, we did not have a consumer name, we get it from the info
 | |
| 			// after the AddConsumer returns.
 | |
| 			if consumer == _EMPTY_ {
 | |
| 				sub.jsi.consumer = info.Name
 | |
| 				if isPullMode {
 | |
| 					sub.jsi.nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, info.Name)
 | |
| 				}
 | |
| 			}
 | |
| 			sub.mu.Unlock()
 | |
| 		}
 | |
| 		// Capture max ack pending from the info response here which covers both
 | |
| 		// success and failure followed by consumer lookup.
 | |
| 		maxap = info.Config.MaxAckPending
 | |
| 	}
 | |
| 
 | |
| 	// If maxap is greater than the default sub's pending limit, use that.
 | |
| 	if maxap > DefaultSubPendingMsgsLimit {
 | |
| 		// For bytes limit, use the min of maxp*1MB or DefaultSubPendingBytesLimit
 | |
| 		bl := maxap * 1024 * 1024
 | |
| 		if bl < DefaultSubPendingBytesLimit {
 | |
| 			bl = DefaultSubPendingBytesLimit
 | |
| 		}
 | |
| 		if err := sub.SetPendingLimits(maxap, bl); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Do heartbeats last if needed.
 | |
| 	if hasHeartbeats {
 | |
| 		sub.scheduleHeartbeatCheck()
 | |
| 	}
 | |
| 	// For ChanSubscriptions, if we know that there is flow control, we will
 | |
| 	// start a go routine that evaluates the number of delivered messages
 | |
| 	// and process flow control.
 | |
| 	if sub.Type() == ChanSubscription && hasFC {
 | |
| 		sub.chanSubcheckForFlowControlResponse()
 | |
| 	}
 | |
| 
 | |
| 	// Wait for context to get canceled if there is one.
 | |
| 	if ctx != nil {
 | |
| 		go func() {
 | |
| 			<-ctx.Done()
 | |
| 			sub.Unsubscribe()
 | |
| 		}()
 | |
| 	}
 | |
| 
 | |
| 	return sub, nil
 | |
| }
 | |
| 
 | |
| // InitialConsumerPending returns the number of messages pending to be
 | |
| // delivered to the consumer when the subscription was created.
 | |
| func (sub *Subscription) InitialConsumerPending() (uint64, error) {
 | |
| 	sub.mu.Lock()
 | |
| 	defer sub.mu.Unlock()
 | |
| 	if sub.jsi == nil || sub.jsi.consumer == _EMPTY_ {
 | |
| 		return 0, fmt.Errorf("%w: not a JetStream subscription", ErrTypeSubscription)
 | |
| 	}
 | |
| 	return sub.jsi.pending, nil
 | |
| }
 | |
| 
 | |
| // This long-lived routine is used per ChanSubscription to check
 | |
| // on the number of delivered messages and check for flow control response.
 | |
| func (sub *Subscription) chanSubcheckForFlowControlResponse() {
 | |
| 	sub.mu.Lock()
 | |
| 	// We don't use defer since if we need to send an RC reply, we need
 | |
| 	// to do it outside the sub's lock. So doing explicit unlock...
 | |
| 	if sub.closed {
 | |
| 		sub.mu.Unlock()
 | |
| 		return
 | |
| 	}
 | |
| 	var fcReply string
 | |
| 	var nc *Conn
 | |
| 
 | |
| 	jsi := sub.jsi
 | |
| 	if jsi.csfct == nil {
 | |
| 		jsi.csfct = time.AfterFunc(chanSubFCCheckInterval, sub.chanSubcheckForFlowControlResponse)
 | |
| 	} else {
 | |
| 		fcReply = sub.checkForFlowControlResponse()
 | |
| 		nc = sub.conn
 | |
| 		// Do the reset here under the lock, it's ok...
 | |
| 		jsi.csfct.Reset(chanSubFCCheckInterval)
 | |
| 	}
 | |
| 	sub.mu.Unlock()
 | |
| 	// This call will return an error (which we don't care here)
 | |
| 	// if nc is nil or fcReply is empty.
 | |
| 	nc.Publish(fcReply, nil)
 | |
| }
 | |
| 
 | |
// ErrConsumerSequenceMismatch represents an error from a consumer
// that received a Heartbeat including sequence different to the
// one expected from the view of the client.
type ErrConsumerSequenceMismatch struct {
	// StreamResumeSequence is the stream sequence from where the consumer
	// should resume consuming from the stream.
	StreamResumeSequence uint64

	// ConsumerSequence is the sequence of the consumer that is behind.
	ConsumerSequence uint64

	// LastConsumerSequence is the sequence of the consumer when the heartbeat
	// was received.
	LastConsumerSequence uint64
}

// Error implements the error interface. The message reports the consumer
// sequence, how many sequences it is behind the one announced by the
// heartbeat, and the stream sequence to restart from.
//
// The "behind" count is clamped to zero when LastConsumerSequence is not
// greater than ConsumerSequence (for example when the heartbeat carried no
// usable last-sequence value); an unguarded unsigned subtraction would wrap
// around and print a meaningless, enormous number.
func (ecs *ErrConsumerSequenceMismatch) Error() string {
	var behind uint64
	if ecs.LastConsumerSequence > ecs.ConsumerSequence {
		behind = ecs.LastConsumerSequence - ecs.ConsumerSequence
	}
	return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
		ecs.ConsumerSequence,
		behind,
		ecs.StreamResumeSequence,
	)
}
 | |
| 
 | |
| // isJSControlMessage will return true if this is an empty control status message
 | |
| // and indicate what type of control message it is, say jsCtrlHB or jsCtrlFC
 | |
| func isJSControlMessage(msg *Msg) (bool, int) {
 | |
| 	if len(msg.Data) > 0 || msg.Header.Get(statusHdr) != controlMsg {
 | |
| 		return false, 0
 | |
| 	}
 | |
| 	val := msg.Header.Get(descrHdr)
 | |
| 	if strings.HasPrefix(val, "Idle") {
 | |
| 		return true, jsCtrlHB
 | |
| 	}
 | |
| 	if strings.HasPrefix(val, "Flow") {
 | |
| 		return true, jsCtrlFC
 | |
| 	}
 | |
| 	return true, 0
 | |
| }
 | |
| 
 | |
| // Keeps track of the incoming message's reply subject so that the consumer's
 | |
| // state (deliver sequence, etc..) can be checked against heartbeats.
 | |
| // We will also bump the incoming data message sequence that is used in FC cases.
 | |
| // Runs under the subscription lock
 | |
| func (sub *Subscription) trackSequences(reply string) {
 | |
| 	// For flow control, keep track of incoming message sequence.
 | |
| 	sub.jsi.fciseq++
 | |
| 	sub.jsi.cmeta = reply
 | |
| }
 | |
| 
 | |
| // Check to make sure messages are arriving in order.
 | |
| // Returns true if the sub had to be replaced. Will cause upper layers to return.
 | |
| // The caller has verified that sub.jsi != nil and that this is not a control message.
 | |
| // Lock should be held.
 | |
| func (sub *Subscription) checkOrderedMsgs(m *Msg) bool {
 | |
| 	// Ignore msgs with no reply like HBs and flow control, they are handled elsewhere.
 | |
| 	if m.Reply == _EMPTY_ {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	// Normal message here.
 | |
| 	tokens, err := parser.GetMetadataFields(m.Reply)
 | |
| 	if err != nil {
 | |
| 		return false
 | |
| 	}
 | |
| 	sseq, dseq := parser.ParseNum(tokens[parser.AckStreamSeqTokenPos]), parser.ParseNum(tokens[parser.AckConsumerSeqTokenPos])
 | |
| 
 | |
| 	jsi := sub.jsi
 | |
| 	if dseq != jsi.dseq {
 | |
| 		sub.resetOrderedConsumer(jsi.sseq + 1)
 | |
| 		return true
 | |
| 	}
 | |
| 	// Update our tracking here.
 | |
| 	jsi.dseq, jsi.sseq = dseq+1, sseq
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| // Update and replace sid.
 | |
| // Lock should be held on entry but will be unlocked to prevent lock inversion.
 | |
| func (sub *Subscription) applyNewSID() (osid int64) {
 | |
| 	nc := sub.conn
 | |
| 	sub.mu.Unlock()
 | |
| 
 | |
| 	nc.subsMu.Lock()
 | |
| 	osid = sub.sid
 | |
| 	delete(nc.subs, osid)
 | |
| 	// Place new one.
 | |
| 	nc.ssid++
 | |
| 	nsid := nc.ssid
 | |
| 	nc.subs[nsid] = sub
 | |
| 	nc.subsMu.Unlock()
 | |
| 
 | |
| 	sub.mu.Lock()
 | |
| 	sub.sid = nsid
 | |
| 	return osid
 | |
| }
 | |
| 
 | |
// resetOrderedConsumer is called when a gap has been detected with an
// ordered consumer. It creates a new consumer and rewires the low level
// subscription: the subscription gets a new sid and a new inbox, and a
// goroutine then writes the UNSUB/SUB protocol and requests a new consumer
// that starts at stream sequence sseq.
//
// If an auto-unsubscribe limit (sub.max) is set and has already been
// reached, only an UNSUB for the current sid is queued and nothing is
// recreated.
//
// Lock should be held. Note that the lock is released and re-acquired
// inside applyNewSID.
func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
	nc := sub.conn
	if sub.jsi == nil || nc == nil || sub.closed {
		return
	}

	var maxStr string
	// If there was an AUTO_UNSUB done, we need to adjust the new value
	// to send after the SUB for the new sid.
	if sub.max > 0 {
		if sub.jsi.fciseq < sub.max {
			adjustedMax := sub.max - sub.jsi.fciseq
			maxStr = strconv.Itoa(int(adjustedMax))
		} else {
			// We are already at the max, so we should just unsub the
			// existing sub and be done
			go func(sid int64) {
				nc.mu.Lock()
				nc.bw.appendString(fmt.Sprintf(unsubProto, sid, _EMPTY_))
				nc.kickFlusher()
				nc.mu.Unlock()
			}(sub.sid)
			return
		}
	}

	// Quick unsubscribe. Since we know this is a simple push subscriber we do in place.
	osid := sub.applyNewSID()

	// Grab new inbox.
	newDeliver := nc.NewInbox()
	sub.Subject = newDeliver

	// Snapshot the new sid under sub lock.
	nsid := sub.sid

	// We are still in the low level readLoop for the connection so we need
	// to spin a go routine to try to create the new consumer.
	go func() {
		// Unsubscribe and subscribe with new inbox and sid.
		// Remap a new low level sub into this sub since its client accessible.
		// This is done here in this go routine to prevent lock inversion.
		nc.mu.Lock()
		nc.bw.appendString(fmt.Sprintf(unsubProto, osid, _EMPTY_))
		nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, nsid))
		if maxStr != _EMPTY_ {
			// Re-apply the remaining auto-unsubscribe budget to the new sid.
			nc.bw.appendString(fmt.Sprintf(unsubProto, nsid, maxStr))
		}
		nc.kickFlusher()
		nc.mu.Unlock()

		// pushErr reports a failure to recreate the consumer through the
		// async error path and then unsubscribes the subscription.
		pushErr := func(err error) {
			nc.handleConsumerSequenceMismatch(sub, fmt.Errorf("%w: recreating ordered consumer", err))
			nc.unsubscribe(sub, 0, true)
		}

		sub.mu.Lock()
		jsi := sub.jsi
		// Reset some items in jsi.
		jsi.dseq = 1
		jsi.cmeta = _EMPTY_
		jsi.fcr, jsi.fcd = _EMPTY_, 0
		jsi.deliver = newDeliver
		// Reset consumer request for starting policy.
		cfg := jsi.ccreq.Config
		cfg.DeliverSubject = newDeliver
		cfg.DeliverPolicy = DeliverByStartSequencePolicy
		cfg.OptStartSeq = sseq
		// In case the consumer was created with a start time, we need to clear it
		// since we are now using a start sequence.
		cfg.OptStartTime = nil

		js := jsi.js
		sub.mu.Unlock()

		// NOTE(review): the lock is released just above and immediately
		// re-acquired here; the two critical sections look mergeable, but
		// confirm nothing relies on the gap before changing it.
		sub.mu.Lock()
		// Attempt to delete the existing consumer.
		// We don't wait for the response since even if it's unsuccessful,
		// inactivity threshold will kick in and delete it.
		if jsi.consumer != _EMPTY_ {
			go js.DeleteConsumer(jsi.stream, jsi.consumer)
		}
		jsi.consumer = ""
		sub.mu.Unlock()
		// The replacement consumer gets a newly generated name.
		consName := getHash(nuid.Next())
		cinfo, err := js.upsertConsumer(jsi.stream, consName, cfg)
		if err != nil {
			var apiErr *APIError
			if errors.Is(err, ErrJetStreamNotEnabled) || errors.Is(err, ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
				// if creating consumer failed, retry
				// NOTE(review): no retry happens in this function itself; it
				// just returns without reporting. Presumably a later
				// heartbeat/activity check triggers another reset — confirm.
				return
			} else if errors.As(err, &apiErr) && apiErr.ErrorCode == JSErrCodeInsufficientResourcesErr {
				// retry for insufficient resources, as it may mean that client is connected to a running
				// server in cluster while the server hosting R1 JetStream resources is restarting
				return
			}
			// Any other error is surfaced and the subscription is torn down.
			pushErr(err)
			return
		}

		// Record the name of the newly created consumer.
		sub.mu.Lock()
		jsi.consumer = cinfo.Name
		sub.mu.Unlock()
	}()
}
 | |
| 
 | |
| // For jetstream subscriptions, returns the number of delivered messages.
 | |
| // For ChanSubscription, this value is computed based on the known number
 | |
| // of messages added to the channel minus the current size of that channel.
 | |
| // Lock held on entry
 | |
| func (sub *Subscription) getJSDelivered() uint64 {
 | |
| 	if sub.typ == ChanSubscription {
 | |
| 		return sub.jsi.fciseq - uint64(len(sub.mch))
 | |
| 	}
 | |
| 	return sub.delivered
 | |
| }
 | |
| 
 | |
| // checkForFlowControlResponse will check to see if we should send a flow control response
 | |
| // based on the subscription current delivered index and the target.
 | |
| // Runs under subscription lock
 | |
| func (sub *Subscription) checkForFlowControlResponse() string {
 | |
| 	// Caller has verified that there is a sub.jsi and fc
 | |
| 	jsi := sub.jsi
 | |
| 	jsi.active = true
 | |
| 	if sub.getJSDelivered() >= jsi.fcd {
 | |
| 		fcr := jsi.fcr
 | |
| 		jsi.fcr, jsi.fcd = _EMPTY_, 0
 | |
| 		return fcr
 | |
| 	}
 | |
| 	return _EMPTY_
 | |
| }
 | |
| 
 | |
| // Record an inbound flow control message.
 | |
| // Runs under subscription lock
 | |
| func (sub *Subscription) scheduleFlowControlResponse(reply string) {
 | |
| 	sub.jsi.fcr, sub.jsi.fcd = reply, sub.jsi.fciseq
 | |
| }
 | |
| 
 | |
| // Checks for activity from our consumer.
 | |
| // If we do not think we are active send an async error.
 | |
| func (sub *Subscription) activityCheck() {
 | |
| 	sub.mu.Lock()
 | |
| 	jsi := sub.jsi
 | |
| 	if jsi == nil || sub.closed {
 | |
| 		sub.mu.Unlock()
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	active := jsi.active
 | |
| 	jsi.hbc.Reset(jsi.hbi * hbcThresh)
 | |
| 	jsi.active = false
 | |
| 	nc := sub.conn
 | |
| 	sub.mu.Unlock()
 | |
| 
 | |
| 	if !active {
 | |
| 		if !jsi.ordered || nc.Status() != CONNECTED {
 | |
| 			nc.mu.Lock()
 | |
| 			if errCB := nc.Opts.AsyncErrorCB; errCB != nil {
 | |
| 				nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
 | |
| 			}
 | |
| 			nc.mu.Unlock()
 | |
| 			return
 | |
| 		}
 | |
| 		sub.mu.Lock()
 | |
| 		sub.resetOrderedConsumer(jsi.sseq + 1)
 | |
| 		sub.mu.Unlock()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // scheduleHeartbeatCheck sets up the timer check to make sure we are active
 | |
| // or receiving idle heartbeats..
 | |
| func (sub *Subscription) scheduleHeartbeatCheck() {
 | |
| 	sub.mu.Lock()
 | |
| 	defer sub.mu.Unlock()
 | |
| 
 | |
| 	jsi := sub.jsi
 | |
| 	if jsi == nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if jsi.hbc == nil {
 | |
| 		jsi.hbc = time.AfterFunc(jsi.hbi*hbcThresh, sub.activityCheck)
 | |
| 	} else {
 | |
| 		jsi.hbc.Reset(jsi.hbi * hbcThresh)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer.
 | |
| func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) {
 | |
| 	nc.mu.Lock()
 | |
| 	errCB := nc.Opts.AsyncErrorCB
 | |
| 	if errCB != nil {
 | |
| 		nc.ach.push(func() { errCB(nc, sub, err) })
 | |
| 	}
 | |
| 	nc.mu.Unlock()
 | |
| }
 | |
| 
 | |
| // checkForSequenceMismatch will make sure we have not missed any messages since last seen.
 | |
| func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) {
 | |
| 	// Process heartbeat received, get latest control metadata if present.
 | |
| 	s.mu.Lock()
 | |
| 	ctrl, ordered := jsi.cmeta, jsi.ordered
 | |
| 	jsi.active = true
 | |
| 	s.mu.Unlock()
 | |
| 
 | |
| 	if ctrl == _EMPTY_ {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	tokens, err := parser.GetMetadataFields(ctrl)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// Consumer sequence.
 | |
| 	var ldseq string
 | |
| 	dseq := tokens[parser.AckConsumerSeqTokenPos]
 | |
| 	hdr := msg.Header[lastConsumerSeqHdr]
 | |
| 	if len(hdr) == 1 {
 | |
| 		ldseq = hdr[0]
 | |
| 	}
 | |
| 
 | |
| 	// Detect consumer sequence mismatch and whether
 | |
| 	// should restart the consumer.
 | |
| 	if ldseq != dseq {
 | |
| 		// Dispatch async error including details such as
 | |
| 		// from where the consumer could be restarted.
 | |
| 		sseq := parser.ParseNum(tokens[parser.AckStreamSeqTokenPos])
 | |
| 		if ordered {
 | |
| 			s.mu.Lock()
 | |
| 			s.resetOrderedConsumer(jsi.sseq + 1)
 | |
| 			s.mu.Unlock()
 | |
| 		} else {
 | |
| 			ecs := &ErrConsumerSequenceMismatch{
 | |
| 				StreamResumeSequence: uint64(sseq),
 | |
| 				ConsumerSequence:     parser.ParseNum(dseq),
 | |
| 				LastConsumerSequence: parser.ParseNum(ldseq),
 | |
| 			}
 | |
| 			nc.handleConsumerSequenceMismatch(s, ecs)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
// streamRequest is a JSON payload with a single optional subject field.
// NOTE(review): presumably the request body used to look up streams by
// subject — confirm against its callers.
type streamRequest struct {
	// Subject is omitted from the JSON encoding when empty.
	Subject string `json:"subject,omitempty"`
}
 | |
| 
 | |
// streamNamesResponse is a JSON response that embeds the common apiResponse
// and apiPaged fields and carries a list of stream names.
type streamNamesResponse struct {
	apiResponse
	apiPaged
	// Streams holds the stream names, decoded from the "streams" key.
	Streams []string `json:"streams"`
}
 | |
| 
 | |
| type subOpts struct {
 | |
| 	// For attaching.
 | |
| 	stream, consumer string
 | |
| 	// For creating or updating.
 | |
| 	cfg *ConsumerConfig
 | |
| 	// For binding a subscription to a consumer without creating it.
 | |
| 	bound bool
 | |
| 	// For manual ack
 | |
| 	mack bool
 | |
| 	// For an ordered consumer.
 | |
| 	ordered bool
 | |
| 	ctx     context.Context
 | |
| 
 | |
| 	// To disable calling ConsumerInfo
 | |
| 	skipCInfo bool
 | |
| }
 | |
| 
 | |
| // SkipConsumerLookup will omit looking up consumer when [Bind], [Durable]
 | |
| // or [ConsumerName] are provided.
 | |
| //
 | |
| // NOTE: This setting may cause an existing consumer to be overwritten. Also,
 | |
| // because consumer lookup is skipped, all consumer options like AckPolicy,
 | |
| // DeliverSubject etc. need to be provided even if consumer already exists.
 | |
| func SkipConsumerLookup() SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.skipCInfo = true
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // OrderedConsumer will create a FIFO direct/ephemeral consumer for in order delivery of messages.
 | |
| // There are no redeliveries and no acks, and flow control and heartbeats will be added but
 | |
| // will be taken care of without additional client code.
 | |
| func OrderedConsumer() SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.ordered = true
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // ManualAck disables auto ack functionality for async subscriptions.
 | |
| func ManualAck() SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.mack = true
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // Description will set the description for the created consumer.
 | |
| func Description(description string) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.Description = description
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // Durable defines the consumer name for JetStream durable subscribers.
 | |
| // This function will return ErrInvalidConsumerName if the name contains
 | |
| // any dot ".".
 | |
| func Durable(consumer string) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		if opts.cfg.Durable != _EMPTY_ {
 | |
| 			return fmt.Errorf("nats: option Durable set more than once")
 | |
| 		}
 | |
| 		if opts.consumer != _EMPTY_ && opts.consumer != consumer {
 | |
| 			return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.consumer, consumer)
 | |
| 		}
 | |
| 		if err := checkConsumerName(consumer); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		opts.cfg.Durable = consumer
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // DeliverAll will configure a Consumer to receive all the
 | |
| // messages from a Stream.
 | |
| func DeliverAll() SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.DeliverPolicy = DeliverAllPolicy
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // DeliverLast configures a Consumer to receive messages
 | |
| // starting with the latest one.
 | |
| func DeliverLast() SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.DeliverPolicy = DeliverLastPolicy
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // DeliverLastPerSubject configures a Consumer to receive messages
 | |
| // starting with the latest one for each filtered subject.
 | |
| func DeliverLastPerSubject() SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.DeliverPolicy = DeliverLastPerSubjectPolicy
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // DeliverNew configures a Consumer to receive messages
 | |
| // published after the subscription.
 | |
| func DeliverNew() SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.DeliverPolicy = DeliverNewPolicy
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // StartSequence configures a Consumer to receive
 | |
| // messages from a start sequence.
 | |
| func StartSequence(seq uint64) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.DeliverPolicy = DeliverByStartSequencePolicy
 | |
| 		opts.cfg.OptStartSeq = seq
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // StartTime configures a Consumer to receive
 | |
| // messages from a start time.
 | |
| func StartTime(startTime time.Time) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.DeliverPolicy = DeliverByStartTimePolicy
 | |
| 		opts.cfg.OptStartTime = &startTime
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // AckNone requires no acks for delivered messages.
 | |
| func AckNone() SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.AckPolicy = AckNonePolicy
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // AckAll when acking a sequence number, this implicitly acks all sequences
 | |
| // below this one as well.
 | |
| func AckAll() SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.AckPolicy = AckAllPolicy
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // AckExplicit requires ack or nack for all messages.
 | |
| func AckExplicit() SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.AckPolicy = AckExplicitPolicy
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // MaxDeliver sets the number of redeliveries for a message.
 | |
| func MaxDeliver(n int) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.MaxDeliver = n
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // MaxAckPending sets the number of outstanding acks that are allowed before
 | |
| // message delivery is halted.
 | |
| func MaxAckPending(n int) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.MaxAckPending = n
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // ReplayOriginal replays the messages at the original speed.
 | |
| func ReplayOriginal() SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.ReplayPolicy = ReplayOriginalPolicy
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // ReplayInstant replays the messages as fast as possible.
 | |
| func ReplayInstant() SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.ReplayPolicy = ReplayInstantPolicy
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // RateLimit is the Bits per sec rate limit applied to a push consumer.
 | |
| func RateLimit(n uint64) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.RateLimit = n
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // BackOff is an array of time durations that represent the time to delay based on delivery count.
 | |
| func BackOff(backOff []time.Duration) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.BackOff = backOff
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // BindStream binds a consumer to a stream explicitly based on a name.
 | |
| // When a stream name is not specified, the library uses the subscribe
 | |
| // subject as a way to find the stream name. It is done by making a request
 | |
| // to the server to get list of stream names that have a filter for this
 | |
| // subject. If the returned list contains a single stream, then this
 | |
| // stream name will be used, otherwise the `ErrNoMatchingStream` is returned.
 | |
| // To avoid the stream lookup, provide the stream name with this function.
 | |
| // See also `Bind()`.
 | |
| func BindStream(stream string) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		if opts.stream != _EMPTY_ && opts.stream != stream {
 | |
| 			return fmt.Errorf("nats: duplicate stream name (%s and %s)", opts.stream, stream)
 | |
| 		}
 | |
| 
 | |
| 		opts.stream = stream
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // Bind binds a subscription to an existing consumer from a stream without attempting to create.
 | |
| // The first argument is the stream name and the second argument will be the consumer name.
 | |
| func Bind(stream, consumer string) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		if stream == _EMPTY_ {
 | |
| 			return ErrStreamNameRequired
 | |
| 		}
 | |
| 		if consumer == _EMPTY_ {
 | |
| 			return ErrConsumerNameRequired
 | |
| 		}
 | |
| 
 | |
| 		// In case of pull subscribers, the durable name is a required parameter
 | |
| 		// so check that they are not different.
 | |
| 		if opts.cfg.Durable != _EMPTY_ && opts.cfg.Durable != consumer {
 | |
| 			return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.cfg.Durable, consumer)
 | |
| 		}
 | |
| 		if opts.stream != _EMPTY_ && opts.stream != stream {
 | |
| 			return fmt.Errorf("nats: duplicate stream name (%s and %s)", opts.stream, stream)
 | |
| 		}
 | |
| 		opts.stream = stream
 | |
| 		opts.consumer = consumer
 | |
| 		opts.bound = true
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // EnableFlowControl enables flow control for a push based consumer.
 | |
| func EnableFlowControl() SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.FlowControl = true
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // IdleHeartbeat enables push based consumers to have idle heartbeats delivered.
 | |
| // For pull consumers, idle heartbeat has to be set on each [Fetch] call.
 | |
| func IdleHeartbeat(duration time.Duration) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.Heartbeat = duration
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // DeliverSubject specifies the JetStream consumer deliver subject.
 | |
| //
 | |
| // This option is used only in situations where the consumer does not exist
 | |
| // and a creation request is sent to the server. If not provided, an inbox
 | |
| // will be selected.
 | |
| // If a consumer exists, then the NATS subscription will be created on
 | |
| // the JetStream consumer's DeliverSubject, not necessarily this subject.
 | |
| func DeliverSubject(subject string) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.DeliverSubject = subject
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // HeadersOnly() will instruct the consumer to only deliver headers and no payloads.
 | |
| func HeadersOnly() SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.HeadersOnly = true
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // MaxRequestBatch sets the maximum pull consumer batch size that a Fetch()
 | |
| // can request.
 | |
| func MaxRequestBatch(max int) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.MaxRequestBatch = max
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // MaxRequestExpires sets the maximum pull consumer request expiration that a
 | |
| // Fetch() can request (using the Fetch's timeout value).
 | |
| func MaxRequestExpires(max time.Duration) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.MaxRequestExpires = max
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // MaxRequesMaxBytes sets the maximum pull consumer request bytes that a
 | |
| // Fetch() can receive.
 | |
| func MaxRequestMaxBytes(bytes int) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.MaxRequestMaxBytes = bytes
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // InactiveThreshold indicates how long the server should keep a consumer
 | |
| // after detecting a lack of activity. In NATS Server 2.8.4 and earlier, this
 | |
| // option only applies to ephemeral consumers. In NATS Server 2.9.0 and later,
 | |
| // this option applies to both ephemeral and durable consumers, allowing durable
 | |
| // consumers to also be deleted automatically after the inactivity threshold has
 | |
| // passed.
 | |
| func InactiveThreshold(threshold time.Duration) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		if threshold < 0 {
 | |
| 			return fmt.Errorf("invalid InactiveThreshold value (%v), needs to be greater or equal to 0", threshold)
 | |
| 		}
 | |
| 		opts.cfg.InactiveThreshold = threshold
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // ConsumerReplicas sets the number of replica count for a consumer.
 | |
| func ConsumerReplicas(replicas int) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		if replicas < 1 {
 | |
| 			return fmt.Errorf("invalid ConsumerReplicas value (%v), needs to be greater than 0", replicas)
 | |
| 		}
 | |
| 		opts.cfg.Replicas = replicas
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // ConsumerMemoryStorage sets the memory storage to true for a consumer.
 | |
| func ConsumerMemoryStorage() SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.MemoryStorage = true
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // ConsumerName sets the name for a consumer.
 | |
| func ConsumerName(name string) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.Name = name
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // ConsumerFilterSubjects can be used to set multiple subject filters on the consumer.
 | |
| // It has to be used in conjunction with [nats.BindStream] and
 | |
| // with empty 'subject' parameter.
 | |
| func ConsumerFilterSubjects(subjects ...string) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.FilterSubjects = subjects
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
 | |
| 	sub.mu.Lock()
 | |
| 	// TODO(dlc) - Better way to mark especially if we attach.
 | |
| 	if sub.jsi == nil || sub.jsi.consumer == _EMPTY_ {
 | |
| 		sub.mu.Unlock()
 | |
| 		return nil, ErrTypeSubscription
 | |
| 	}
 | |
| 
 | |
| 	// Consumer info lookup should fail if in direct mode.
 | |
| 	js := sub.jsi.js
 | |
| 	stream, consumer := sub.jsi.stream, sub.jsi.consumer
 | |
| 	sub.mu.Unlock()
 | |
| 
 | |
| 	return js.getConsumerInfo(stream, consumer)
 | |
| }
 | |
| 
 | |
// pullOpts collects the per-request settings that PullOpt values apply
// before a pull request (Fetch / FetchBatch) is issued.
type pullOpts struct {
	// maxBytes is forwarded as the MaxBytes field of the pull request
	// (set by PullMaxBytes).
	maxBytes int
	// ttl is an explicit request timeout; when zero, the JetStream context's
	// default wait is used. It is mutually exclusive with ctx.
	ttl time.Duration
	// ctx is a caller supplied context; it is mutually exclusive with ttl.
	ctx context.Context
	// hb is the idle heartbeat interval (set by PullHeartbeat).
	hb time.Duration
}
 | |
| 
 | |
// PullOpt are the options that can be passed when pulling a batch of messages.
// Each option applies itself to the pullOpts of a single request and may
// return an error to reject an invalid value.
type PullOpt interface {
	configurePull(opts *pullOpts) error
}
 | |
| 
 | |
| // PullMaxWaiting defines the max inflight pull requests.
 | |
| func PullMaxWaiting(n int) SubOpt {
 | |
| 	return subOptFn(func(opts *subOpts) error {
 | |
| 		opts.cfg.MaxWaiting = n
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| type PullHeartbeat time.Duration
 | |
| 
 | |
| func (h PullHeartbeat) configurePull(opts *pullOpts) error {
 | |
| 	if h <= 0 {
 | |
| 		return fmt.Errorf("%w: idle heartbeat has to be greater than 0", ErrInvalidArg)
 | |
| 	}
 | |
| 	opts.hb = time.Duration(h)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // PullMaxBytes defines the max bytes allowed for a fetch request.
 | |
| type PullMaxBytes int
 | |
| 
 | |
| func (n PullMaxBytes) configurePull(opts *pullOpts) error {
 | |
| 	opts.maxBytes = int(n)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
var (
	// errNoMessages is an error that a Fetch request using no_wait can receive
	// to signal that there are no more messages available (status 404).
	errNoMessages = errors.New("nats: no messages")

	// errRequestsPending is an error that represents a sub.Fetch request that
	// was using no_wait together with an expires time and got discarded by
	// the server (status 408).
	errRequestsPending = errors.New("nats: requests pending")
)
 | |
| 
 | |
| // Returns if the given message is a user message or not, and if
 | |
| // `checkSts` is true, returns appropriate error based on the
 | |
| // content of the status (404, etc..)
 | |
| func checkMsg(msg *Msg, checkSts, isNoWait bool) (usrMsg bool, err error) {
 | |
| 	// Assume user message
 | |
| 	usrMsg = true
 | |
| 
 | |
| 	// If payload or no header, consider this a user message
 | |
| 	if len(msg.Data) > 0 || len(msg.Header) == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 	// Look for status header
 | |
| 	val := msg.Header.Get(statusHdr)
 | |
| 	// If not present, then this is considered a user message
 | |
| 	if val == _EMPTY_ {
 | |
| 		return
 | |
| 	}
 | |
| 	// At this point, this is not a user message since there is
 | |
| 	// no payload and a "Status" header.
 | |
| 	usrMsg = false
 | |
| 
 | |
| 	// If we don't care about status, we are done.
 | |
| 	if !checkSts {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// if it's a heartbeat message, report as not user msg
 | |
| 	if isHb, _ := isJSControlMessage(msg); isHb {
 | |
| 		return
 | |
| 	}
 | |
| 	switch val {
 | |
| 	case noResponders:
 | |
| 		err = ErrNoResponders
 | |
| 	case noMessagesSts:
 | |
| 		// 404 indicates that there are no messages.
 | |
| 		err = errNoMessages
 | |
| 	case reqTimeoutSts:
 | |
| 		// In case of a fetch request with no wait request and expires time,
 | |
| 		// need to skip 408 errors and retry.
 | |
| 		if isNoWait {
 | |
| 			err = errRequestsPending
 | |
| 		} else {
 | |
| 			// Older servers may send a 408 when a request in the server was expired
 | |
| 			// and interest is still found, which will be the case for our
 | |
| 			// implementation. Regardless, ignore 408 errors until receiving at least
 | |
| 			// one message when making requests without no_wait.
 | |
| 			err = ErrTimeout
 | |
| 		}
 | |
| 	case jetStream409Sts:
 | |
| 		if strings.Contains(strings.ToLower(msg.Header.Get(descrHdr)), "consumer deleted") {
 | |
| 			err = ErrConsumerDeleted
 | |
| 			break
 | |
| 		}
 | |
| 
 | |
| 		if strings.Contains(strings.ToLower(msg.Header.Get(descrHdr)), "leadership change") {
 | |
| 			err = ErrConsumerLeadershipChanged
 | |
| 			break
 | |
| 		}
 | |
| 		fallthrough
 | |
| 	default:
 | |
| 		err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr))
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
// Fetch pulls a batch of messages from a stream for a pull consumer.
//
// It first collects messages already pending on the subscription and, if
// the batch is still incomplete, sends a pull request and waits for the
// remaining messages until the batch is full or an error occurs.
//
// It returns ErrBadSubscription for a nil subscription, ErrInvalidArg when
// batch is less than 1, ErrContextAndTimeout when both a context and a
// timeout option are given, and ErrTypeSubscription when the subscription
// is not a pull subscription. If at least one message was collected, the
// messages are returned with a nil error even if an error occurred later.
func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
	if sub == nil {
		return nil, ErrBadSubscription
	}
	if batch < 1 {
		return nil, ErrInvalidArg
	}

	// Apply the pull options; any option may reject its value.
	var o pullOpts
	for _, opt := range opts {
		if err := opt.configurePull(&o); err != nil {
			return nil, err
		}
	}
	if o.ctx != nil && o.ttl != 0 {
		return nil, ErrContextAndTimeout
	}

	sub.mu.Lock()
	jsi := sub.jsi
	// Reject if this is not a pull subscription. Note that sub.typ is SyncSubscription,
	// so check for jsi.pull boolean instead.
	if jsi == nil || !jsi.pull {
		sub.mu.Unlock()
		return nil, ErrTypeSubscription
	}

	// Snapshot everything needed from the subscription while holding the lock.
	nc := sub.conn
	nms := sub.jsi.nms
	rply, _ := newFetchInbox(jsi.deliver)
	js := sub.jsi.js
	// pmc is true when messages are already queued on the subscription channel.
	pmc := len(sub.mch) > 0

	// All fetch requests have an expiration, in case of no explicit expiration
	// then the default timeout of the JetStream context is used.
	ttl := o.ttl
	if ttl == 0 {
		ttl = js.opts.wait
	}
	sub.mu.Unlock()

	// Use the given context or setup a default one for the span
	// of the pull batch request.
	var (
		ctx    = o.ctx
		err    error
		cancel context.CancelFunc
	)
	if ctx == nil {
		ctx, cancel = context.WithTimeout(context.Background(), ttl)
	} else if _, hasDeadline := ctx.Deadline(); !hasDeadline {
		// Prevent from passing the background context which will just block
		// and cannot be canceled either.
		if octx, ok := ctx.(ContextOpt); ok && octx.Context == context.Background() {
			return nil, ErrNoDeadlineContext
		}

		// If the context did not have a deadline, then create a new child context
		// that will use the default timeout from the JS context.
		ctx, cancel = context.WithTimeout(ctx, ttl)
	} else {
		// The context already has a deadline; wrap it only so that it can be
		// canceled locally (e.g. by the heartbeat timer below).
		ctx, cancel = context.WithCancel(ctx)
	}
	defer cancel()

	// if heartbeat is set, validate it against the context timeout
	if o.hb > 0 {
		deadline, _ := ctx.Deadline()
		if 2*o.hb >= time.Until(deadline) {
			return nil, fmt.Errorf("%w: idle heartbeat value too large", ErrInvalidArg)
		}
	}

	// Check if context not done already before making the request.
	select {
	case <-ctx.Done():
		if o.ctx != nil { // Timeout or Cancel triggered by context object option
			err = ctx.Err()
		} else { // Timeout triggered by timeout option
			err = ErrTimeout
		}
	default:
	}
	if err != nil {
		return nil, err
	}

	var (
		msgs = make([]*Msg, 0, batch)
		msg  *Msg
	)
	// First phase: collect messages that were already pending, without waiting.
	for pmc && len(msgs) < batch {
		// Check next msg with booleans that say that this is an internal call
		// for a pull subscribe (so don't reject it) and don't wait if there
		// are no messages.
		msg, err = sub.nextMsgWithContext(ctx, true, false)
		if err != nil {
			// Running out of pending messages is not an error at this stage.
			if errors.Is(err, errNoMessages) {
				err = nil
			}
			break
		}
		// Check msg but just to determine if this is a user message
		// or status message, however, we don't care about values of status
		// messages at this point in the Fetch() call, so checkMsg can't
		// return an error.
		if usrMsg, _ := checkMsg(msg, false, false); usrMsg {
			msgs = append(msgs, msg)
		}
	}
	// hbTimer fires when no message arrived within twice the heartbeat
	// interval; hbErr (guarded by hbLock) records that this happened.
	var hbTimer *time.Timer
	var hbErr error
	sub.mu.Lock()
	subClosed := sub.closed || sub.draining
	sub.mu.Unlock()
	if subClosed {
		err = errors.Join(ErrBadSubscription, ErrSubscriptionClosed)
	}
	hbLock := sync.Mutex{}
	// Second phase: request the missing messages from the server.
	if err == nil && len(msgs) < batch && !subClosed {
		// For batch real size of 1, it does not make sense to set no_wait in
		// the request.
		noWait := batch-len(msgs) > 1

		var nr nextRequest

		// sendReq publishes a pull request for the messages still missing
		// and (re)arms the heartbeat timer when heartbeats are enabled.
		sendReq := func() error {
			// The current deadline for the context will be used
			// to set the expires TTL for a fetch request.
			deadline, _ := ctx.Deadline()
			ttl = time.Until(deadline)

			// Check if context has already been canceled or expired.
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}

			// Make our request expiration a bit shorter than the current timeout.
			expires := ttl
			if ttl >= 20*time.Millisecond {
				expires = ttl - 10*time.Millisecond
			}

			nr.Batch = batch - len(msgs)
			nr.Expires = expires
			nr.NoWait = noWait
			nr.MaxBytes = o.maxBytes
			// Only request heartbeats if two intervals fit in the expiration.
			if 2*o.hb < expires {
				nr.Heartbeat = o.hb
			} else {
				nr.Heartbeat = 0
			}
			req, _ := json.Marshal(nr)
			if err := nc.PublishRequest(nms, rply, req); err != nil {
				return err
			}
			if o.hb > 0 {
				if hbTimer == nil {
					hbTimer = time.AfterFunc(2*o.hb, func() {
						// Record the missed heartbeat and cancel the context
						// to unblock the waiting nextMsgWithContext call.
						hbLock.Lock()
						hbErr = ErrNoHeartbeat
						hbLock.Unlock()
						cancel()
					})
				} else {
					hbTimer.Reset(2 * o.hb)
				}
			}
			return nil
		}

		err = sendReq()
		for err == nil && len(msgs) < batch {
			// Ask for next message and wait if there are no messages
			msg, err = sub.nextMsgWithContext(ctx, true, true)
			if err == nil {
				// Any received message (user or status) resets the heartbeat timer.
				if hbTimer != nil {
					hbTimer.Reset(2 * o.hb)
				}
				var usrMsg bool

				usrMsg, err = checkMsg(msg, true, noWait)
				if err == nil && usrMsg {
					msgs = append(msgs, msg)
				} else if noWait && (errors.Is(err, errNoMessages) || errors.Is(err, errRequestsPending)) && len(msgs) == 0 {
					// If we have a 404/408 for our "no_wait" request and have
					// not collected any message, then resend request to
					// wait this time.
					noWait = false
					err = sendReq()
				} else if errors.Is(err, ErrTimeout) && len(msgs) == 0 {
					// If we get a 408, we will bail if we already collected some
					// messages, otherwise ignore and go back calling nextMsg.
					err = nil
				}
			}
		}
		if hbTimer != nil {
			hbTimer.Stop()
		}
	}
	// If there is at least a message added to msgs, then need to return OK and no error
	if err != nil && len(msgs) == 0 {
		hbLock.Lock()
		defer hbLock.Unlock()
		// A missed heartbeat takes precedence over the context error it caused.
		if hbErr != nil {
			return nil, hbErr
		}
		return nil, o.checkCtxErr(err)
	}
	return msgs, nil
}
 | |
| 
 | |
| // newFetchInbox returns subject used as reply subject when sending pull requests
 | |
| // as well as request ID. For non-wildcard subject, request ID is empty and
 | |
| // passed subject is not transformed
 | |
| func newFetchInbox(subj string) (string, string) {
 | |
| 	if !strings.HasSuffix(subj, ".*") {
 | |
| 		return subj, ""
 | |
| 	}
 | |
| 	reqID := nuid.Next()
 | |
| 	var sb strings.Builder
 | |
| 	sb.WriteString(subj[:len(subj)-1])
 | |
| 	sb.WriteString(reqID)
 | |
| 	return sb.String(), reqID
 | |
| }
 | |
| 
 | |
// subjectMatchesReqID reports whether the last dot-separated token of
// subject equals reqID. A subject without any "." never matches.
func subjectMatchesReqID(subject, reqID string) bool {
	lastDot := strings.LastIndex(subject, ".")
	if lastDot < 0 {
		// Single-token subject: there is no request ID token to compare.
		return false
	}
	return subject[lastDot+1:] == reqID
}
 | |
| 
 | |
// MessageBatch provides methods to retrieve messages consumed using [Subscription.FetchBatch].
type MessageBatch interface {
	// Messages returns a channel on which messages will be published.
	Messages() <-chan *Msg

	// Error returns an error encountered when fetching messages.
	Error() error

	// Done signals end of execution.
	Done() <-chan struct{}
}
 | |
| 
 | |
// messageBatch is the MessageBatch implementation returned by FetchBatch.
type messageBatch struct {
	// msgs carries the fetched user messages.
	msgs chan *Msg
	// err holds the error, if any, encountered while fetching.
	err error
	// done receives a value once fetching has finished.
	done chan struct{}
}

// Messages returns the receive-only view of the message channel.
func (mb *messageBatch) Messages() <-chan *Msg {
	return mb.msgs
}

// Error returns the error recorded while fetching, or nil.
func (mb *messageBatch) Error() error {
	return mb.err
}

// Done returns the channel signaled when fetching has finished.
func (mb *messageBatch) Done() <-chan struct{} {
	return mb.done
}
 | |
| 
 | |
// FetchBatch pulls a batch of messages from a stream for a pull consumer.
// Unlike [Subscription.Fetch], it is non blocking and returns [MessageBatch],
// allowing to retrieve incoming messages from a channel.
// The returned channel is always closed after all messages for a batch have been
// delivered by the server - it is safe to iterate over it using range.
//
// To avoid using default JetStream timeout as fetch expiry time, use [nats.MaxWait]
// or [nats.Context] (with deadline set).
//
// This method will not return error in case of pull request expiry (even if there are no messages).
// Any other error encountered when receiving messages will cause FetchBatch to stop receiving new messages.
//
// Argument validation mirrors Fetch: ErrBadSubscription for a nil
// subscription, ErrInvalidArg when batch is less than 1,
// ErrContextAndTimeout when both a context and a timeout option are given,
// and ErrTypeSubscription when the subscription is not a pull subscription.
func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, error) {
	if sub == nil {
		return nil, ErrBadSubscription
	}
	if batch < 1 {
		return nil, ErrInvalidArg
	}

	// Apply the pull options; any option may reject its value.
	var o pullOpts
	for _, opt := range opts {
		if err := opt.configurePull(&o); err != nil {
			return nil, err
		}
	}
	if o.ctx != nil && o.ttl != 0 {
		return nil, ErrContextAndTimeout
	}
	sub.mu.Lock()
	jsi := sub.jsi
	// Reject if this is not a pull subscription. Note that sub.typ is SyncSubscription,
	// so check for jsi.pull boolean instead.
	if jsi == nil || !jsi.pull {
		sub.mu.Unlock()
		return nil, ErrTypeSubscription
	}

	// Snapshot everything needed from the subscription while holding the lock.
	nc := sub.conn
	nms := sub.jsi.nms
	rply, reqID := newFetchInbox(sub.jsi.deliver)
	js := sub.jsi.js
	// pmc is true when messages are already queued on the subscription channel.
	pmc := len(sub.mch) > 0

	// All fetch requests have an expiration, in case of no explicit expiration
	// then the default timeout of the JetStream context is used.
	ttl := o.ttl
	if ttl == 0 {
		ttl = js.opts.wait
	}
	sub.mu.Unlock()

	// Use the given context or setup a default one for the span
	// of the pull batch request.
	var (
		ctx           = o.ctx
		cancel        context.CancelFunc
		cancelContext = true
	)
	if ctx == nil {
		ctx, cancel = context.WithTimeout(context.Background(), ttl)
	} else if _, hasDeadline := ctx.Deadline(); !hasDeadline {
		// Prevent from passing the background context which will just block
		// and cannot be canceled either.
		if octx, ok := ctx.(ContextOpt); ok && octx.Context == context.Background() {
			return nil, ErrNoDeadlineContext
		}

		// If the context did not have a deadline, then create a new child context
		// that will use the default timeout from the JS context.
		ctx, cancel = context.WithTimeout(ctx, ttl)
	} else {
		// The context already has a deadline; wrap it only so that it can be
		// canceled locally.
		ctx, cancel = context.WithCancel(ctx)
	}
	defer func() {
		// only cancel the context here if we are sure the fetching goroutine has not been started yet
		if cancelContext {
			cancel()
		}
	}()

	// if heartbeat is set, validate it against the context timeout
	if o.hb > 0 {
		deadline, _ := ctx.Deadline()
		if 2*o.hb >= time.Until(deadline) {
			return nil, fmt.Errorf("%w: idle heartbeat value too large", ErrInvalidArg)
		}
	}

	// Check if context not done already before making the request.
	select {
	case <-ctx.Done():
		if o.ctx != nil { // Timeout or Cancel triggered by context object option
			return nil, ctx.Err()
		} else { // Timeout triggered by timeout option
			return nil, ErrTimeout
		}
	default:
	}

	// msgs is buffered to the full batch size so sends below never block;
	// done has capacity 1 so the completion signal never blocks either.
	result := &messageBatch{
		msgs: make(chan *Msg, batch),
		done: make(chan struct{}, 1),
	}
	var msg *Msg
	// First phase: collect messages that were already pending, without waiting.
	for pmc && len(result.msgs) < batch {
		// Check next msg with booleans that say that this is an internal call
		// for a pull subscribe (so don't reject it) and don't wait if there
		// are no messages.
		// Note: msg and err declared here are local to the loop body and
		// shadow the outer msg.
		msg, err := sub.nextMsgWithContext(ctx, true, false)
		if err != nil {
			// Running out of pending messages is not an error at this stage.
			if errors.Is(err, errNoMessages) {
				err = nil
			}
			result.err = err
			break
		}
		// Check msg but just to determine if this is a user message
		// or status message, however, we don't care about values of status
		// messages at this point in the Fetch() call, so checkMsg can't
		// return an error.
		if usrMsg, _ := checkMsg(msg, false, false); usrMsg {
			result.msgs <- msg
		}
	}
	sub.mu.Lock()
	subClosed := sub.closed || sub.draining
	sub.mu.Unlock()
	// Finish immediately if the batch is already complete, an error was
	// recorded, or the subscription is closed or draining.
	if len(result.msgs) == batch || result.err != nil || subClosed {
		close(result.msgs)
		if subClosed && len(result.msgs) == 0 {
			return nil, errors.Join(ErrBadSubscription, ErrSubscriptionClosed)
		}
		result.done <- struct{}{}
		return result, nil
	}

	deadline, _ := ctx.Deadline()
	ttl = time.Until(deadline)

	// Make our request expiration a bit shorter than the current timeout.
	expires := ttl
	if ttl >= 20*time.Millisecond {
		expires = ttl - 10*time.Millisecond
	}

	// Only the messages still missing from the batch are requested.
	requestBatch := batch - len(result.msgs)
	req := nextRequest{
		Expires:   expires,
		Batch:     requestBatch,
		MaxBytes:  o.maxBytes,
		Heartbeat: o.hb,
	}
	reqJSON, err := json.Marshal(req)
	if err != nil {
		close(result.msgs)
		result.done <- struct{}{}
		result.err = err
		return result, nil
	}
	if err := nc.PublishRequest(nms, rply, reqJSON); err != nil {
		// With nothing collected, report the publish error directly;
		// otherwise hand back what was collected along with the error.
		if len(result.msgs) == 0 {
			return nil, err
		}
		close(result.msgs)
		result.done <- struct{}{}
		result.err = err
		return result, nil
	}
	// hbTimer fires when no message arrived within twice the heartbeat
	// interval; hbErr (guarded by hbLock) records that this happened.
	var hbTimer *time.Timer
	var hbErr error
	hbLock := sync.Mutex{}
	if o.hb > 0 {
		hbTimer = time.AfterFunc(2*o.hb, func() {
			// Record the missed heartbeat and cancel the context to unblock
			// the waiting nextMsgWithContext call in the goroutine below.
			hbLock.Lock()
			hbErr = ErrNoHeartbeat
			hbLock.Unlock()
			cancel()
		})
	}
	// From here on the goroutine owns the context and cancels it on exit.
	cancelContext = false
	go func() {
		defer cancel()
		// requestMsgs counts user messages received for this pull request.
		var requestMsgs int
		for requestMsgs < requestBatch {
			// Ask for next message and wait if there are no messages
			// (assigns the msg and err variables of the enclosing function).
			msg, err = sub.nextMsgWithContext(ctx, true, true)
			if err != nil {
				break
			}
			// Any received message (user or status) resets the heartbeat timer.
			if hbTimer != nil {
				hbTimer.Reset(2 * o.hb)
			}
			var usrMsg bool

			usrMsg, err = checkMsg(msg, true, false)
			if err != nil {
				if errors.Is(err, ErrTimeout) {
					if reqID != "" && !subjectMatchesReqID(msg.Subject, reqID) {
						// ignore timeout message from server if it comes from a different pull request
						continue
					}
					// Expiry of our own pull request ends the batch without error.
					err = nil
				}
				break
			}
			if usrMsg {
				result.msgs <- msg
				requestMsgs++
			}
		}
		if err != nil {
			hbLock.Lock()
			// A missed heartbeat takes precedence over the context error it caused.
			if hbErr != nil {
				result.err = hbErr
			} else {
				result.err = o.checkCtxErr(err)
			}
			hbLock.Unlock()
		}
		close(result.msgs)
		result.done <- struct{}{}
	}()
	return result, nil
}
 | |
| 
 | |
| // checkCtxErr is used to determine whether ErrTimeout should be returned in case of context timeout
 | |
| func (o *pullOpts) checkCtxErr(err error) error {
 | |
| 	if o.ctx == nil && errors.Is(err, context.DeadlineExceeded) {
 | |
| 		return ErrTimeout
 | |
| 	}
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), js.opts.wait)
 | |
| 	defer cancel()
 | |
| 	return js.getConsumerInfoContext(ctx, stream, consumer)
 | |
| }
 | |
| 
 | |
| func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer string) (*ConsumerInfo, error) {
 | |
| 	ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer)
 | |
| 	resp, err := js.apiRequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil)
 | |
| 	if err != nil {
 | |
| 		if errors.Is(err, ErrNoResponders) {
 | |
| 			err = ErrJetStreamNotEnabled
 | |
| 		}
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	var info consumerResponse
 | |
| 	if err := json.Unmarshal(resp.Data, &info); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if info.Error != nil {
 | |
| 		if errors.Is(info.Error, ErrConsumerNotFound) {
 | |
| 			return nil, ErrConsumerNotFound
 | |
| 		}
 | |
| 		if errors.Is(info.Error, ErrStreamNotFound) {
 | |
| 			return nil, ErrStreamNotFound
 | |
| 		}
 | |
| 		return nil, info.Error
 | |
| 	}
 | |
| 	if info.Error == nil && info.ConsumerInfo == nil {
 | |
| 		return nil, ErrConsumerNotFound
 | |
| 	}
 | |
| 	return info.ConsumerInfo, nil
 | |
| }
 | |
| 
 | |
| // a RequestWithContext with tracing via TraceCB
 | |
| func (js *js) apiRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
 | |
| 	if js.opts.shouldTrace {
 | |
| 		ctrace := js.opts.ctrace
 | |
| 		if ctrace.RequestSent != nil {
 | |
| 			ctrace.RequestSent(subj, data)
 | |
| 		}
 | |
| 	}
 | |
| 	resp, err := js.nc.RequestWithContext(ctx, subj, data)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if js.opts.shouldTrace {
 | |
| 		ctrace := js.opts.ctrace
 | |
| 		if ctrace.RequestSent != nil {
 | |
| 			ctrace.ResponseReceived(subj, resp.Data, resp.Header)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return resp, nil
 | |
| }
 | |
| 
 | |
| func (m *Msg) checkReply() error {
 | |
| 	if m == nil || m.Sub == nil {
 | |
| 		return ErrMsgNotBound
 | |
| 	}
 | |
| 	if m.Reply == _EMPTY_ {
 | |
| 		return ErrMsgNoReply
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // ackReply handles all acks. Will do the right thing for pull and sync mode.
 | |
| // It ensures that an ack is only sent a single time, regardless of
 | |
| // how many times it is being called to avoid duplicated acks.
 | |
| func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
 | |
| 	var o ackOpts
 | |
| 	for _, opt := range opts {
 | |
| 		if err := opt.configureAck(&o); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if err := m.checkReply(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	var ackNone bool
 | |
| 	var js *js
 | |
| 
 | |
| 	sub := m.Sub
 | |
| 	sub.mu.Lock()
 | |
| 	nc := sub.conn
 | |
| 	if jsi := sub.jsi; jsi != nil {
 | |
| 		js = jsi.js
 | |
| 		ackNone = jsi.ackNone
 | |
| 	}
 | |
| 	sub.mu.Unlock()
 | |
| 
 | |
| 	// Skip if already acked.
 | |
| 	if atomic.LoadUint32(&m.ackd) == 1 {
 | |
| 		return ErrMsgAlreadyAckd
 | |
| 	}
 | |
| 	if ackNone {
 | |
| 		return ErrCantAckIfConsumerAckNone
 | |
| 	}
 | |
| 
 | |
| 	usesCtx := o.ctx != nil
 | |
| 	usesWait := o.ttl > 0
 | |
| 
 | |
| 	// Only allow either AckWait or Context option to set the timeout.
 | |
| 	if usesWait && usesCtx {
 | |
| 		return ErrContextAndTimeout
 | |
| 	}
 | |
| 
 | |
| 	sync = sync || usesCtx || usesWait
 | |
| 	ctx := o.ctx
 | |
| 	wait := defaultRequestWait
 | |
| 	if usesWait {
 | |
| 		wait = o.ttl
 | |
| 	} else if js != nil {
 | |
| 		wait = js.opts.wait
 | |
| 	}
 | |
| 
 | |
| 	var body []byte
 | |
| 	var err error
 | |
| 	// This will be > 0 only when called from NakWithDelay()
 | |
| 	if o.nakDelay > 0 {
 | |
| 		body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, o.nakDelay.Nanoseconds()))
 | |
| 	} else {
 | |
| 		body = ackType
 | |
| 	}
 | |
| 
 | |
| 	if sync {
 | |
| 		if usesCtx {
 | |
| 			_, err = nc.RequestWithContext(ctx, m.Reply, body)
 | |
| 		} else {
 | |
| 			_, err = nc.Request(m.Reply, body, wait)
 | |
| 		}
 | |
| 	} else {
 | |
| 		err = nc.Publish(m.Reply, body)
 | |
| 	}
 | |
| 
 | |
| 	// Mark that the message has been acked unless it is ackProgress
 | |
| 	// which can be sent many times.
 | |
| 	if err == nil && !bytes.Equal(ackType, ackProgress) {
 | |
| 		atomic.StoreUint32(&m.ackd, 1)
 | |
| 	}
 | |
| 
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Ack acknowledges a message. This tells the server that the message was
 | |
| // successfully processed and it can move on to the next message.
 | |
| func (m *Msg) Ack(opts ...AckOpt) error {
 | |
| 	return m.ackReply(ackAck, false, opts...)
 | |
| }
 | |
| 
 | |
| // AckSync is the synchronous version of Ack. This indicates successful message
 | |
| // processing.
 | |
| func (m *Msg) AckSync(opts ...AckOpt) error {
 | |
| 	return m.ackReply(ackAck, true, opts...)
 | |
| }
 | |
| 
 | |
| // Nak negatively acknowledges a message. This tells the server to redeliver
 | |
| // the message. You can configure the number of redeliveries by passing
 | |
| // nats.MaxDeliver when you Subscribe. The default is infinite redeliveries.
 | |
| func (m *Msg) Nak(opts ...AckOpt) error {
 | |
| 	return m.ackReply(ackNak, false, opts...)
 | |
| }
 | |
| 
 | |
| // Nak negatively acknowledges a message. This tells the server to redeliver
 | |
| // the message after the give `delay` duration. You can configure the number
 | |
| // of redeliveries by passing nats.MaxDeliver when you Subscribe.
 | |
| // The default is infinite redeliveries.
 | |
| func (m *Msg) NakWithDelay(delay time.Duration, opts ...AckOpt) error {
 | |
| 	if delay > 0 {
 | |
| 		opts = append(opts, nakDelay(delay))
 | |
| 	}
 | |
| 	return m.ackReply(ackNak, false, opts...)
 | |
| }
 | |
| 
 | |
| // Term tells the server to not redeliver this message, regardless of the value
 | |
| // of nats.MaxDeliver.
 | |
| func (m *Msg) Term(opts ...AckOpt) error {
 | |
| 	return m.ackReply(ackTerm, false, opts...)
 | |
| }
 | |
| 
 | |
| // InProgress tells the server that this message is being worked on. It resets
 | |
| // the redelivery timer on the server.
 | |
| func (m *Msg) InProgress(opts ...AckOpt) error {
 | |
| 	return m.ackReply(ackProgress, false, opts...)
 | |
| }
 | |
| 
 | |
// MsgMetadata is the JetStream metadata associated with received messages.
// All fields are parsed from the tokens of the message's reply subject
// (see Msg.Metadata).
type MsgMetadata struct {
	// Sequence holds the stream and consumer sequence numbers.
	Sequence SequencePair
	// NumDelivered is parsed from the "num delivered" token.
	NumDelivered uint64
	// NumPending is parsed from the "num pending" token.
	NumPending uint64
	// Timestamp is built from the timestamp token, interpreted as
	// nanoseconds since the Unix epoch.
	Timestamp time.Time
	// Stream is the stream name token.
	Stream string
	// Consumer is the consumer name token.
	Consumer string
	// Domain is the domain token.
	Domain string
}
 | |
| 
 | |
| // Metadata retrieves the metadata from a JetStream message. This method will
 | |
| // return an error for non-JetStream Msgs.
 | |
| func (m *Msg) Metadata() (*MsgMetadata, error) {
 | |
| 	if err := m.checkReply(); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	tokens, err := parser.GetMetadataFields(m.Reply)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	meta := &MsgMetadata{
 | |
| 		Domain:       tokens[parser.AckDomainTokenPos],
 | |
| 		NumDelivered: parser.ParseNum(tokens[parser.AckNumDeliveredTokenPos]),
 | |
| 		NumPending:   parser.ParseNum(tokens[parser.AckNumPendingTokenPos]),
 | |
| 		Timestamp:    time.Unix(0, int64(parser.ParseNum(tokens[parser.AckTimestampSeqTokenPos]))),
 | |
| 		Stream:       tokens[parser.AckStreamTokenPos],
 | |
| 		Consumer:     tokens[parser.AckConsumerTokenPos],
 | |
| 	}
 | |
| 	meta.Sequence.Stream = parser.ParseNum(tokens[parser.AckStreamSeqTokenPos])
 | |
| 	meta.Sequence.Consumer = parser.ParseNum(tokens[parser.AckConsumerSeqTokenPos])
 | |
| 	return meta, nil
 | |
| }
 | |
| 
 | |
// AckPolicy determines how the consumer should acknowledge delivered messages.
type AckPolicy int

const (
	// AckNonePolicy means delivered messages need no acknowledgement.
	AckNonePolicy AckPolicy = iota

	// AckAllPolicy means that acking a sequence number implicitly acks every
	// sequence below it as well.
	AckAllPolicy

	// AckExplicitPolicy means every message must be acked or nacked.
	AckExplicitPolicy

	// ackPolicyNotSet is a sentinel used for the configuration mismatch check.
	ackPolicyNotSet = 99
)

// jsonString wraps s in double quotes. The content of s is not escaped.
func jsonString(s string) string {
	return `"` + s + `"`
}

// UnmarshalJSON decodes the JSON strings "none", "all" and "explicit". Any
// other input yields an error and leaves the receiver unchanged.
func (p *AckPolicy) UnmarshalJSON(data []byte) error {
	var decoded AckPolicy
	switch string(data) {
	case jsonString("none"):
		decoded = AckNonePolicy
	case jsonString("all"):
		decoded = AckAllPolicy
	case jsonString("explicit"):
		decoded = AckExplicitPolicy
	default:
		return fmt.Errorf("nats: can not unmarshal %q", data)
	}

	*p = decoded
	return nil
}

// MarshalJSON encodes the policy as "none", "all" or "explicit". Any other
// value results in an error.
func (p AckPolicy) MarshalJSON() ([]byte, error) {
	var name string
	switch p {
	case AckNonePolicy:
		name = "none"
	case AckAllPolicy:
		name = "all"
	case AckExplicitPolicy:
		name = "explicit"
	default:
		return nil, fmt.Errorf("nats: unknown acknowledgement policy %v", p)
	}
	return json.Marshal(name)
}

// String returns a human-readable name for the policy.
func (p AckPolicy) String() string {
	switch p {
	case AckNonePolicy:
		return "AckNone"
	case AckAllPolicy:
		return "AckAll"
	case AckExplicitPolicy:
		return "AckExplicit"
	case ackPolicyNotSet:
		return "Not Initialized"
	}
	return "Unknown AckPolicy"
}
 | |
| 
 | |
// ReplayPolicy determines how the consumer should replay messages it already
// has queued in the stream.
type ReplayPolicy int

const (
	// ReplayInstantPolicy replays messages as fast as possible.
	ReplayInstantPolicy ReplayPolicy = iota

	// ReplayOriginalPolicy keeps the same timing with which the messages
	// were received.
	ReplayOriginalPolicy

	// replayPolicyNotSet is a sentinel used for the configuration mismatch
	// check.
	replayPolicyNotSet = 99
)

// UnmarshalJSON decodes the JSON strings "instant" and "original". Any other
// input yields an error and leaves the receiver unchanged.
func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
	var decoded ReplayPolicy
	switch string(data) {
	case `"instant"`:
		decoded = ReplayInstantPolicy
	case `"original"`:
		decoded = ReplayOriginalPolicy
	default:
		return fmt.Errorf("nats: can not unmarshal %q", data)
	}

	*p = decoded
	return nil
}

// MarshalJSON encodes the policy as "instant" or "original". Any other value
// results in an error.
func (p ReplayPolicy) MarshalJSON() ([]byte, error) {
	var name string
	switch p {
	case ReplayInstantPolicy:
		name = "instant"
	case ReplayOriginalPolicy:
		name = "original"
	default:
		return nil, fmt.Errorf("nats: unknown replay policy %v", p)
	}
	return json.Marshal(name)
}
 | |
| 
 | |
// Payloads handed to ackReply by the acknowledgement methods on Msg.
var (
	// ackAck is sent by Ack and AckSync.
	ackAck = []byte("+ACK")
	// ackNak is sent by Nak and NakWithDelay.
	ackNak = []byte("-NAK")
	// ackProgress is sent by InProgress.
	ackProgress = []byte("+WPI")
	// ackTerm is sent by Term.
	ackTerm = []byte("+TERM")
)
 | |
| 
 | |
// DeliverPolicy determines how the consumer should select the first message to deliver.
type DeliverPolicy int

const (
	// DeliverAllPolicy starts delivering messages from the very beginning of a
	// stream. This is the default.
	DeliverAllPolicy DeliverPolicy = iota

	// DeliverLastPolicy will start the consumer with the last sequence
	// received.
	DeliverLastPolicy

	// DeliverNewPolicy will only deliver new messages that are sent after the
	// consumer is created.
	DeliverNewPolicy

	// DeliverByStartSequencePolicy will deliver messages starting from a given
	// sequence.
	DeliverByStartSequencePolicy

	// DeliverByStartTimePolicy will deliver messages starting from a given
	// time.
	DeliverByStartTimePolicy

	// DeliverLastPerSubjectPolicy will start the consumer with the last message
	// for all subjects received.
	DeliverLastPerSubjectPolicy

	// deliverPolicyNotSet is a sentinel used for the configuration mismatch
	// check.
	deliverPolicyNotSet = 99
)

// UnmarshalJSON decodes a JSON string into the matching DeliverPolicy. The
// value "undefined" is accepted as an alias for "all". Any other unrecognized
// input is rejected with an error and leaves the receiver unchanged, which is
// how the other policy decoders in this file behave; previously such input
// was silently ignored.
func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
	switch string(data) {
	case `"all"`, `"undefined"`:
		*p = DeliverAllPolicy
	case `"last"`:
		*p = DeliverLastPolicy
	case `"new"`:
		*p = DeliverNewPolicy
	case `"by_start_sequence"`:
		*p = DeliverByStartSequencePolicy
	case `"by_start_time"`:
		*p = DeliverByStartTimePolicy
	case `"last_per_subject"`:
		*p = DeliverLastPerSubjectPolicy
	default:
		return fmt.Errorf("nats: can not unmarshal %q", data)
	}

	return nil
}

// MarshalJSON encodes the policy as its JSON string name. Values outside the
// declared policies result in an error.
func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
	switch p {
	case DeliverAllPolicy:
		return json.Marshal("all")
	case DeliverLastPolicy:
		return json.Marshal("last")
	case DeliverNewPolicy:
		return json.Marshal("new")
	case DeliverByStartSequencePolicy:
		return json.Marshal("by_start_sequence")
	case DeliverByStartTimePolicy:
		return json.Marshal("by_start_time")
	case DeliverLastPerSubjectPolicy:
		return json.Marshal("last_per_subject")
	default:
		return nil, fmt.Errorf("nats: unknown deliver policy %v", p)
	}
}
 | |
| 
 | |
// RetentionPolicy determines how messages in a set are retained.
type RetentionPolicy int

const (
	// LimitsPolicy (default) retains messages until any given limit is
	// reached. This could be one of MaxMsgs, MaxBytes, or MaxAge.
	LimitsPolicy RetentionPolicy = iota
	// InterestPolicy allows a message to be removed once all known
	// observables have acknowledged it.
	InterestPolicy
	// WorkQueuePolicy allows a message to be removed once the first worker
	// or subscriber acknowledges it.
	WorkQueuePolicy
)

// DiscardPolicy determines how to proceed when limits of messages or bytes
// are reached.
type DiscardPolicy int

const (
	// DiscardOld removes older messages to get back within the limits. This
	// is the default.
	DiscardOld DiscardPolicy = iota
	// DiscardNew fails to store new messages.
	DiscardNew
)

// JSON names of the retention policies.
const (
	limitsPolicyString    = "limits"
	interestPolicyString  = "interest"
	workQueuePolicyString = "workqueue"
)

// String returns a human-readable name for the retention policy.
func (rp RetentionPolicy) String() string {
	switch rp {
	case LimitsPolicy:
		return "Limits"
	case InterestPolicy:
		return "Interest"
	case WorkQueuePolicy:
		return "WorkQueue"
	}
	return "Unknown Retention Policy"
}

// MarshalJSON encodes the retention policy as its JSON string name. Unknown
// values result in an error.
func (rp RetentionPolicy) MarshalJSON() ([]byte, error) {
	var name string
	switch rp {
	case LimitsPolicy:
		name = limitsPolicyString
	case InterestPolicy:
		name = interestPolicyString
	case WorkQueuePolicy:
		name = workQueuePolicyString
	default:
		return nil, fmt.Errorf("nats: can not marshal %v", rp)
	}
	return json.Marshal(name)
}

// UnmarshalJSON decodes a quoted retention policy name. Unknown input yields
// an error and leaves the receiver unchanged.
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
	var decoded RetentionPolicy
	switch string(data) {
	case `"` + limitsPolicyString + `"`:
		decoded = LimitsPolicy
	case `"` + interestPolicyString + `"`:
		decoded = InterestPolicy
	case `"` + workQueuePolicyString + `"`:
		decoded = WorkQueuePolicy
	default:
		return fmt.Errorf("nats: can not unmarshal %q", data)
	}
	*rp = decoded
	return nil
}

// String returns a human-readable name for the discard policy.
func (dp DiscardPolicy) String() string {
	switch dp {
	case DiscardOld:
		return "DiscardOld"
	case DiscardNew:
		return "DiscardNew"
	}
	return "Unknown Discard Policy"
}

// MarshalJSON encodes the discard policy as "old" or "new". Unknown values
// result in an error.
func (dp DiscardPolicy) MarshalJSON() ([]byte, error) {
	var name string
	switch dp {
	case DiscardOld:
		name = "old"
	case DiscardNew:
		name = "new"
	default:
		return nil, fmt.Errorf("nats: can not marshal %v", dp)
	}
	return json.Marshal(name)
}

// UnmarshalJSON decodes a quoted discard policy name. The comparison is done
// on the lower-cased input, so the match is case-insensitive. Unknown input
// yields an error and leaves the receiver unchanged.
func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error {
	var decoded DiscardPolicy
	switch strings.ToLower(string(data)) {
	case `"old"`:
		decoded = DiscardOld
	case `"new"`:
		decoded = DiscardNew
	default:
		return fmt.Errorf("nats: can not unmarshal %q", data)
	}
	*dp = decoded
	return nil
}
 | |
| 
 | |
// StorageType determines how messages are stored for retention.
type StorageType int

const (
	// FileStorage selects on-disk storage. It is the default.
	FileStorage StorageType = iota
	// MemoryStorage selects in-memory-only storage.
	MemoryStorage
)

// JSON names of the storage types.
const (
	memoryStorageString = "memory"
	fileStorageString   = "file"
)

// String returns a human-readable name for the storage type.
func (st StorageType) String() string {
	switch st {
	case FileStorage:
		return "File"
	case MemoryStorage:
		return "Memory"
	}
	return "Unknown Storage Type"
}

// MarshalJSON encodes the storage type as "file" or "memory". Unknown values
// result in an error.
func (st StorageType) MarshalJSON() ([]byte, error) {
	var name string
	switch st {
	case FileStorage:
		name = fileStorageString
	case MemoryStorage:
		name = memoryStorageString
	default:
		return nil, fmt.Errorf("nats: can not marshal %v", st)
	}
	return json.Marshal(name)
}

// UnmarshalJSON decodes a quoted storage type name. Unknown input yields an
// error and leaves the receiver unchanged.
func (st *StorageType) UnmarshalJSON(data []byte) error {
	var decoded StorageType
	switch string(data) {
	case `"` + fileStorageString + `"`:
		decoded = FileStorage
	case `"` + memoryStorageString + `"`:
		decoded = MemoryStorage
	default:
		return fmt.Errorf("nats: can not unmarshal %q", data)
	}
	*st = decoded
	return nil
}
 | |
| 
 | |
// StoreCompression identifies a compression algorithm. Its JSON form is the
// string "none" or "s2".
type StoreCompression uint8

const (
	// NoCompression is the zero value; it is encoded as "none".
	NoCompression StoreCompression = iota
	// S2Compression is encoded as "s2".
	S2Compression
)

// String returns a human-readable name for the algorithm.
func (alg StoreCompression) String() string {
	switch alg {
	case NoCompression:
		return "None"
	case S2Compression:
		return "S2"
	}
	return "Unknown StoreCompression"
}

// MarshalJSON encodes the algorithm as "none" or "s2". Unknown values result
// in an error.
func (alg StoreCompression) MarshalJSON() ([]byte, error) {
	switch alg {
	case NoCompression:
		return json.Marshal("none")
	case S2Compression:
		return json.Marshal("s2")
	}
	return nil, fmt.Errorf("unknown compression algorithm")
}

// UnmarshalJSON decodes a JSON string naming the algorithm. It fails if the
// input is not a JSON string or names an unknown algorithm; in both cases the
// receiver is left unchanged.
func (alg *StoreCompression) UnmarshalJSON(b []byte) error {
	var name string
	if err := json.Unmarshal(b, &name); err != nil {
		return err
	}

	var decoded StoreCompression
	switch name {
	case "none":
		decoded = NoCompression
	case "s2":
		decoded = S2Compression
	default:
		return fmt.Errorf("unknown compression algorithm")
	}
	*alg = decoded
	return nil
}
 | |
| 
 | |
| // Length of our hash used for named consumers.
 | |
| const nameHashLen = 8
 | |
| 
 | |
| // Computes a hash for the given `name`.
 | |
| func getHash(name string) string {
 | |
| 	sha := sha256.New()
 | |
| 	sha.Write([]byte(name))
 | |
| 	b := sha.Sum(nil)
 | |
| 	for i := 0; i < nameHashLen; i++ {
 | |
| 		b[i] = rdigits[int(b[i]%base)]
 | |
| 	}
 | |
| 	return string(b[:nameHashLen])
 | |
| }
 |