diff --git a/go.mod b/go.mod index 7d0784ac..b0de92b0 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/klauspost/compress v1.11.12 github.com/minio/highwayhash v1.0.1 github.com/nats-io/jwt/v2 v2.0.3 - github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19 + github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e diff --git a/go.sum b/go.sum index cf931aad..e77ed911 100644 --- a/go.sum +++ b/go.sum @@ -24,6 +24,8 @@ github.com/nats-io/nats.go v1.11.1-0.20210810010129-d1955c8653ca h1:9MdvV5kneekw github.com/nats-io/nats.go v1.11.1-0.20210810010129-d1955c8653ca/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19 h1:9WQzXoYc37xBQ9YoQSSc1aoMJCvzX5OmirlivU0GEFU= github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9 h1:aJYmbbVrq6rsFGAvQnAvoChjkjUOJGqVBdQ47vbEWD4= +github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= diff --git a/vendor/github.com/nats-io/nats.go/.travis.yml b/vendor/github.com/nats-io/nats.go/.travis.yml index 89c5c11f..bec2429b 100644 --- a/vendor/github.com/nats-io/nats.go/.travis.yml +++ b/vendor/github.com/nats-io/nats.go/.travis.yml @@ -15,5 +15,5 @@ before_script: - find . -type f -name "*.go" | xargs misspell -error -locale US - staticcheck ./... script: -- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -- if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast; fi +- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -vet=off +- if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off; fi diff --git a/vendor/github.com/nats-io/nats.go/context.go b/vendor/github.com/nats-io/nats.go/context.go index 53e5ebb0..aa8c00eb 100644 --- a/vendor/github.com/nats-io/nats.go/context.go +++ b/vendor/github.com/nats-io/nats.go/context.go @@ -122,12 +122,7 @@ func (s *Subscription) nextMsgWithContext(ctx context.Context, pullSubInternal, } s.mu.Lock() - err := s.validateNextMsgState() - // Unless this is from an internal call, reject use of this API. - // Users should use Fetch() instead. - if err == nil && !pullSubInternal && s.jsi != nil && s.jsi.pull { - err = ErrTypeSubscription - } + err := s.validateNextMsgState(pullSubInternal) if err != nil { s.mu.Unlock() return nil, err diff --git a/vendor/github.com/nats-io/nats.go/go_test.mod b/vendor/github.com/nats-io/nats.go/go_test.mod index a0a0b93b..3904ad4e 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.mod +++ b/vendor/github.com/nats-io/nats.go/go_test.mod @@ -4,7 +4,7 @@ go 1.16 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.3.4 + github.com/nats-io/nats-server/v2 v2.3.5-0.20210815013007-eb8aeb217178 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/vendor/github.com/nats-io/nats.go/go_test.sum b/vendor/github.com/nats-io/nats.go/go_test.sum index 5997e883..32fa570a 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.sum +++ b/vendor/github.com/nats-io/nats.go/go_test.sum @@ -17,9 +17,9 @@ github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= -github.com/nats-io/nats-server/v2 v2.3.4 h1:WcNa6HDFX8gjZPHb8CJ9wxRHEjJSlhWUb/MKb6/mlUY= -github.com/nats-io/nats-server/v2 v2.3.4/go.mod h1:3mtbaN5GkCo/Z5T3nNj0I0/W1fPkKzLiDC6jjWJKp98= -github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats-server/v2 v2.3.5-0.20210815013007-eb8aeb217178 h1:6/bt9zMGA1D/i3ROeq8GjF8Tig5BVFh4V3gI+qpoWIs= +github.com/nats-io/nats-server/v2 v2.3.5-0.20210815013007-eb8aeb217178/go.mod h1:7mTh0KSxKc63xAVop97cFCIGRkWCv6HoX9lMXRSNOhU= +github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index 4309740f..92b6e544 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -96,6 +96,12 @@ const ( hbcThresh = 2 ) +// Types of control messages, so far heartbeat and flow control +const ( + jsCtrlHB = 1 + jsCtrlFC = 2 +) + // JetStream allows persistent messaging through JetStream. type JetStream interface { // Publish publishes a message to JetStream. @@ -267,6 +273,7 @@ type PubAck struct { Stream string `json:"stream"` Sequence uint64 `json:"seq"` Duplicate bool `json:"duplicate,omitempty"` + Domain string `json:"domain,omitempty"` } // Headers for published messages. @@ -766,6 +773,7 @@ func Context(ctx context.Context) ContextOpt { type ConsumerConfig struct { Durable string `json:"durable_name,omitempty"` DeliverSubject string `json:"deliver_subject,omitempty"` + DeliverGroup string `json:"deliver_group,omitempty"` DeliverPolicy DeliverPolicy `json:"deliver_policy"` OptStartSeq uint64 `json:"opt_start_seq,omitempty"` OptStartTime *time.Time `json:"opt_start_time,omitempty"` @@ -795,12 +803,14 @@ type ConsumerInfo struct { NumWaiting int `json:"num_waiting"` NumPending uint64 `json:"num_pending"` Cluster *ClusterInfo `json:"cluster,omitempty"` + PushBound bool `json:"push_bound,omitempty"` } // SequencePair includes the consumer and stream sequence info from a JetStream consumer. type SequencePair struct { - Consumer uint64 `json:"consumer_seq"` - Stream uint64 `json:"stream_seq"` + Consumer uint64 `json:"consumer_seq"` + Stream uint64 `json:"stream_seq"` + Last *time.Time `json:"last_active,omitempty"` } // nextRequest is for getting next messages for pull based consumers from JetStream. @@ -812,7 +822,6 @@ type nextRequest struct { // jsSub includes JetStream subscription info. type jsSub struct { - mu sync.RWMutex js *js // For pull subscribers, this is the next message subject to send requests to. @@ -823,8 +832,7 @@ type jsSub struct { stream string deliver string pull bool - durable bool - attached bool + dc bool // Delete JS consumer // Ordered consumers ordered bool @@ -835,29 +843,24 @@ type jsSub struct { // Heartbeats and Flow Control handling from push consumers. hbc *time.Timer hbi time.Duration - hbs bool active bool - fc bool cmeta string - fcs map[uint64]string + fcr string + fcd uint64 } -func (jsi *jsSub) unsubscribe(drainMode bool) error { - jsi.mu.Lock() - durable, attached := jsi.durable, jsi.attached - stream, consumer := jsi.stream, jsi.consumer - js := jsi.js - if jsi.hbc != nil { - jsi.hbc.Stop() - jsi.hbc = nil - } - jsi.mu.Unlock() - - if drainMode && (durable || attached) { - // Skip deleting consumer for durables/attached - // consumers when using drain mode. +// Deletes the JS Consumer. +// No connection nor subscription lock must be held on entry. +func (sub *Subscription) deleteConsumer() error { + sub.mu.Lock() + jsi := sub.jsi + if jsi == nil { + sub.mu.Unlock() return nil } + stream, consumer := jsi.stream, jsi.consumer + js := jsi.js + sub.mu.Unlock() return js.DeleteConsumer(stream, consumer) } @@ -875,6 +878,27 @@ func (opt subOptFn) configureSubscribe(opts *subOpts) error { } // Subscribe will create a subscription to the appropriate stream and consumer. +// +// The stream and consumer names can be provided with the nats.Bind() option. +// For creating an ephemeral (where the consumer name is picked by the server), +// you can provide the stream name with nats.BindStream(). +// If no stream name is specified, the library will attempt to figure out which +// stream the subscription is for. See important notes below for more details. +// +// IMPORTANT NOTES: +// * If Bind() and Durable() options are not specified, the library will +// send a request to the server to create an ephemeral JetStream consumer, +// which will be deleted after an Unsubscribe() or Drain(), or automatically +// by the server after a short period of time after the NATS subscription is +// gone. +// * If Durable() only is specified, the library will attempt to lookup a JetStream +// consumer with this name and if found, will bind to it and not attempt to +// delete it. However, if not found, the library will send a request to create +// such durable JetStream consumer, but will still attempt to delete it after +// an Unsubscribe() or Drain(). +// * If Bind() option is provided, the library will attempt to lookup the +// consumer with the given name, and if the lookup fails, then the Subscribe() +// call will return an error. func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { if cb == nil { return nil, ErrBadSubscription @@ -883,12 +907,15 @@ func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscripti } // SubscribeSync will create a sync subscription to the appropriate stream and consumer. +// See important note in Subscribe() func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) return js.subscribe(subj, _EMPTY_, nil, mch, true, false, opts) } // QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics. +// If not optional durable name or binding option is specified, the queue name will be used as a durable name. +// See important note in Subscribe() func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { if cb == nil { return nil, ErrBadSubscription @@ -897,17 +924,22 @@ func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) } // QueueSubscribeSync will create a sync subscription to the appropriate stream and consumer with queue semantics. +// If not optional durable name or binding option is specified, the queue name will be used as a durable name. +// See important note in Subscribe() func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) return js.subscribe(subj, queue, nil, mch, true, false, opts) } // ChanSubscribe will create a subscription to the appropriate stream and consumer using a channel. +// See important note in Subscribe() func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { return js.subscribe(subj, _EMPTY_, nil, ch, false, false, opts) } // ChanQueueSubscribe will create a subscription to the appropriate stream and consumer using a channel. +// If not optional durable name or binding option is specified, the queue name will be used as a durable name. +// See important note in Subscribe() func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { return js.subscribe(subj, queue, nil, ch, false, false, opts) } @@ -918,6 +950,55 @@ func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable))) } +func processConsInfo(info *ConsumerInfo, isPullMode bool, subj, queue string) (string, error) { + ccfg := &info.Config + + // Make sure this new subject matches or is a subset. + if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { + return _EMPTY_, ErrSubjectMismatch + } + + // Prevent binding a subscription against incompatible consumer types. + if isPullMode && ccfg.DeliverSubject != _EMPTY_ { + return _EMPTY_, ErrPullSubscribeToPushConsumer + } else if !isPullMode && ccfg.DeliverSubject == _EMPTY_ { + return _EMPTY_, ErrPullSubscribeRequired + } + + // If pull mode, nothing else to check here. + if isPullMode { + return _EMPTY_, nil + } + + // At this point, we know the user wants push mode, and the JS consumer is + // really push mode. + + dg := info.Config.DeliverGroup + if dg == _EMPTY_ { + // Prevent an user from attempting to create a queue subscription on + // a JS consumer that was not created with a deliver group. + if queue != _EMPTY_ { + return _EMPTY_, fmt.Errorf("cannot create a queue subscription for a consumer without a deliver group") + } else if info.PushBound { + // Need to reject a non queue subscription to a non queue consumer + // if the consumer is already bound. + return _EMPTY_, fmt.Errorf("consumer is already bound to a subscription") + } + } else { + // If the JS consumer has a deliver group, we need to fail a non queue + // subscription attempt: + if queue == _EMPTY_ { + return _EMPTY_, fmt.Errorf("cannot create a subscription for a consumer with a deliver group %q", dg) + } else if queue != dg { + // Here the user's queue group name does not match the one associated + // with the JS consumer. + return _EMPTY_, fmt.Errorf("cannot create a queue subscription %q for a consumer with a deliver group %q", + queue, dg) + } + } + return ccfg.DeliverSubject, nil +} + func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt) (*Subscription, error) { cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet} o := subOpts{cfg: &cfg} @@ -929,20 +1010,50 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } } - badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy + // If no stream name is specified, or if option SubjectIsDelivery is + // specified, the subject cannot be empty. + if subj == _EMPTY_ && (o.stream == _EMPTY_ || o.subjIsDelivery) { + return nil, fmt.Errorf("nats: subject required") + } + + // Note that these may change based on the consumer info response we may get. hasHeartbeats := o.cfg.Heartbeat > 0 hasFC := o.cfg.FlowControl - if isPullMode && badPullAck { - return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy) + + // Some checks for pull subscribers + if isPullMode { + // Check for bad ack policy + if o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy { + return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy) + } + // No deliver subject should be provided + if o.cfg.DeliverSubject != _EMPTY_ || o.subjIsDelivery { + return nil, ErrPullSubscribeToPushConsumer + } + } + + // Some check/setting specific to queue subs + if queue != _EMPTY_ { + // Queue subscriber cannot have HB or FC (since messages will be randomly dispatched + // to members). We may in the future have a separate NATS subscription that all members + // would subscribe to and server would send on. + if o.cfg.Heartbeat > 0 || o.cfg.FlowControl { + // Not making this a public ErrXXX in case we allow in the future. + return nil, fmt.Errorf("nats: queue subscription doesn't support idle heartbeat nor flow control") + } + + // If this is a queue subscription and no consumer nor durable name was specified, + // then we will use the queue name as a durable name. + if queue != _EMPTY_ && o.consumer == _EMPTY_ && o.cfg.Durable == _EMPTY_ { + o.cfg.Durable = queue + } } var ( err error shouldCreate bool - ccfg *ConsumerConfig info *ConsumerInfo deliver string - attached bool stream = o.stream consumer = o.consumer isDurable = o.cfg.Durable != _EMPTY_ @@ -951,6 +1062,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, lookupErr bool nc = js.nc nms string + hbi time.Duration + ccreq *createConsumerRequest // In case we need to hold onto it for ordered consumers. ) // Do some quick checks here for ordered consumers. We do these here instead of spread out @@ -990,126 +1103,106 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } hasFC, hasHeartbeats = true, true o.mack = true // To avoid auto-ack wrapping call below. + hbi = o.cfg.Heartbeat } - // In case a consumer has not been set explicitly, then the - // durable name will be used as the consumer name. - if consumer == _EMPTY_ { - consumer = o.cfg.Durable - } - - // Find the stream mapped to the subject if not bound to a stream already. - if o.stream == _EMPTY_ { - stream, err = js.lookupStreamBySubject(subj) - if err != nil { - return nil, err - } + // With this option, we go directly create the NATS subscription + // and skip all lookup/create. + if o.subjIsDelivery { + deliver = subj } else { - stream = o.stream - } - - // With an explicit durable name, we can lookup the consumer first - // to which it should be attaching to. - if consumer != _EMPTY_ { - info, err = js.ConsumerInfo(stream, consumer) - notFoundErr = errors.Is(err, ErrConsumerNotFound) - lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded - } - - switch { - case info != nil: - // Attach using the found consumer config. - ccfg = &info.Config - attached = true - - // Make sure this new subject matches or is a subset. - if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { - return nil, ErrSubjectMismatch + // In case a consumer has not been set explicitly, then the + // durable name will be used as the consumer name. + if consumer == _EMPTY_ { + consumer = o.cfg.Durable } - // Prevent binding a subscription against incompatible consumer types. - if isPullMode && ccfg.DeliverSubject != _EMPTY_ { - return nil, ErrPullSubscribeToPushConsumer - } else if !isPullMode && ccfg.DeliverSubject == _EMPTY_ { - return nil, ErrPullSubscribeRequired - } - if ccfg.DeliverSubject != _EMPTY_ { - deliver = ccfg.DeliverSubject - } else if !isPullMode { - deliver = nc.newInbox() - } - case (err != nil && !notFoundErr) || (notFoundErr && consumerBound): - // If the consumer is being bound and we got an error on pull subscribe then allow the error. - if !(isPullMode && lookupErr && consumerBound) { - return nil, err - } - default: - // Attempt to create consumer if not found nor using Bind. - shouldCreate = true - if o.cfg.DeliverSubject != _EMPTY_ { - deliver = o.cfg.DeliverSubject - } else if !isPullMode { - deliver = nc.newInbox() - } - } - - var sub *Subscription - - // Check if we are manual ack. - if cb != nil && !o.mack { - ocb := cb - cb = func(m *Msg) { ocb(m); m.Ack() } - } - - // In case we need to hold onto it for ordered consumers. - var ccreq *createConsumerRequest - - // If we are creating or updating let's update cfg. - if shouldCreate { - if !isPullMode { - cfg.DeliverSubject = deliver - } - // Do filtering always, server will clear as needed. - cfg.FilterSubject = subj - - // If not set default to ack explicit. - if cfg.AckPolicy == ackPolicyNotSet { - cfg.AckPolicy = AckExplicitPolicy - } - // If we have acks at all and the MaxAckPending is not set go ahead - // and set to the internal max. - // TODO(dlc) - We should be able to update this if client updates PendingLimits. - if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy { - if !isPullMode && cb != nil && hasFC { - cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16 - } else if ch != nil { - cfg.MaxAckPending = cap(ch) - } else { - cfg.MaxAckPending = DefaultSubPendingMsgsLimit + // Find the stream mapped to the subject if not bound to a stream already. + if o.stream == _EMPTY_ { + stream, err = js.lookupStreamBySubject(subj) + if err != nil { + return nil, err } + } else { + stream = o.stream } - // Create request here. - ccreq = &createConsumerRequest{ - Stream: stream, - Config: &cfg, - } - } - if isPullMode { - nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer) - deliver = nc.newInbox() + // With an explicit durable name, we can lookup the consumer first + // to which it should be attaching to. + if consumer != _EMPTY_ { + info, err = js.ConsumerInfo(stream, consumer) + notFoundErr = errors.Is(err, ErrConsumerNotFound) + lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded + } + + switch { + case info != nil: + deliver, err = processConsInfo(info, isPullMode, subj, queue) + if err != nil { + return nil, err + } + icfg := &info.Config + hasFC, hbi = icfg.FlowControl, icfg.Heartbeat + hasHeartbeats = hbi > 0 + case (err != nil && !notFoundErr) || (notFoundErr && consumerBound): + // If the consumer is being bound and we got an error on pull subscribe then allow the error. + if !(isPullMode && lookupErr && consumerBound) { + return nil, err + } + default: + // Attempt to create consumer if not found nor using Bind. + shouldCreate = true + if o.cfg.DeliverSubject != _EMPTY_ { + deliver = o.cfg.DeliverSubject + } else if !isPullMode { + deliver = nc.newInbox() + cfg.DeliverSubject = deliver + } + + // Do filtering always, server will clear as needed. + cfg.FilterSubject = subj + + // Pass the queue to the consumer config + if queue != _EMPTY_ { + cfg.DeliverGroup = queue + } + + // If not set default to ack explicit. + if cfg.AckPolicy == ackPolicyNotSet { + cfg.AckPolicy = AckExplicitPolicy + } + // If we have acks at all and the MaxAckPending is not set go ahead + // and set to the internal max. + // TODO(dlc) - We should be able to update this if client updates PendingLimits. + if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy { + if !isPullMode && cb != nil && hasFC { + cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16 + } else if ch != nil { + cfg.MaxAckPending = cap(ch) + } else { + cfg.MaxAckPending = DefaultSubPendingMsgsLimit + } + } + // Create request here. + ccreq = &createConsumerRequest{ + Stream: stream, + Config: &cfg, + } + hbi = cfg.Heartbeat + } + + if isPullMode { + nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer) + deliver = nc.newInbox() + } } jsi := &jsSub{ js: js, stream: stream, consumer: consumer, - durable: isDurable, - attached: attached, deliver: deliver, - hbs: hasHeartbeats, - hbi: o.cfg.Heartbeat, - fc: hasFC, + hbi: hbi, ordered: o.ordered, ccreq: ccreq, dseq: 1, @@ -1118,7 +1211,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, psubj: subj, } - sub, err = nc.subscribe(deliver, queue, cb, ch, isSync, jsi) + // Check if we are manual ack. + if cb != nil && !o.mack { + ocb := cb + cb = func(m *Msg) { ocb(m); m.Ack() } + } + sub, err := nc.subscribe(deliver, queue, cb, ch, isSync, jsi) if err != nil { return nil, err } @@ -1176,9 +1274,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if !isPullMode { cleanUpSub() } - // Multiple subscribers could compete in creating the first consumer - // that will be shared using the same durable name. If this happens, then - // do a lookup of the consumer info and resubscribe using the latest info. if consumer != _EMPTY_ && (strings.Contains(cinfo.Error.Description, `consumer already exists`) || strings.Contains(cinfo.Error.Description, `consumer name already in use`)) { @@ -1187,23 +1282,16 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if err != nil { return nil, err } - ccfg = &info.Config - - // Validate that the original subject does still match. - if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { - return nil, ErrSubjectMismatch + deliver, err = processConsInfo(info, isPullMode, subj, queue) + if err != nil { + return nil, err } - - // Update attached status. - jsi.attached = true - - // Use the deliver subject from latest consumer config to attach. - if info.Config.DeliverSubject != _EMPTY_ { + if !isPullMode { // We can't reuse the channel, so if one was passed, we need to create a new one. if ch != nil { ch = make(chan *Msg, cap(ch)) } - jsi.deliver = info.Config.DeliverSubject + jsi.deliver = deliver // Recreate the subscription here. sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi) if err != nil { @@ -1216,10 +1304,15 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } return nil, fmt.Errorf("nats: %s", cinfo.Error.Description) } - } else if consumer == _EMPTY_ { - // Update our consumer name here which is filled in when we create the consumer. + } else { + // Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain() sub.mu.Lock() - sub.jsi.consumer = info.Name + sub.jsi.dc = true + // If this is an ephemeral, we did not have a consumer name, we get it from the info + // after the AddConsumer returns. + if consumer == _EMPTY_ { + sub.jsi.consumer = info.Name + } sub.mu.Unlock() } } @@ -1256,23 +1349,36 @@ func (ecs *ErrConsumerSequenceMismatch) Error() string { ) } -// isControlMessage will return true if this is an empty control status message. -func isControlMessage(msg *Msg) bool { - return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg +// isJSControlMessage will return true if this is an empty control status message +// and indicate what type of control message it is, say jsCtrlHB or jsCtrlFC +func isJSControlMessage(msg *Msg) (bool, int) { + if len(msg.Data) > 0 || msg.Header.Get(statusHdr) != controlMsg { + return false, 0 + } + val := msg.Header.Get(descrHdr) + if strings.HasPrefix(val, "Idle") { + return true, jsCtrlHB + } + if strings.HasPrefix(val, "Flow") { + return true, jsCtrlFC + } + return true, 0 } -func (jsi *jsSub) trackSequences(reply string) { - jsi.mu.Lock() - jsi.cmeta = reply - jsi.mu.Unlock() +// Keeps track of the incoming message's reply subject so that the consumer's +// state (deliver sequence, etc..) can be checked against heartbeats. +// Runs under the subscription lock +func (sub *Subscription) trackSequences(reply string) { + sub.jsi.cmeta = reply } // Check to make sure messages are arriving in order. // Returns true if the sub had to be replaced. Will cause upper layers to return. +// The caller has verified that sub.jsi != nil and that this is not a control message. // Lock should be held. func (sub *Subscription) checkOrderedMsgs(m *Msg) bool { // Ignore msgs with no reply like HBs and flowcontrol, they are handled elsewhere. - if m.Reply == _EMPTY_ || sub.jsi == nil || isControlMessage(m) { + if m.Reply == _EMPTY_ { return false } @@ -1281,19 +1387,15 @@ func (sub *Subscription) checkOrderedMsgs(m *Msg) bool { if err != nil { return false } - sseq, dseq := uint64(parseNum(tokens[5])), uint64(parseNum(tokens[6])) + sseq, dseq := uint64(parseNum(tokens[ackStreamSeqTokenPos])), uint64(parseNum(tokens[ackConsumerSeqTokenPos])) jsi := sub.jsi - jsi.mu.Lock() if dseq != jsi.dseq { - rseq := jsi.sseq + 1 - jsi.mu.Unlock() - sub.resetOrderedConsumer(rseq) + sub.resetOrderedConsumer(jsi.sseq + 1) return true } // Update our tracking here. jsi.dseq, jsi.sseq = dseq+1, sseq - jsi.mu.Unlock() return false } @@ -1322,8 +1424,7 @@ func (sub *Subscription) applyNewSID() (osid int64) { // Lock should be held. func (sub *Subscription) resetOrderedConsumer(sseq uint64) { nc := sub.conn - closed := sub.closed - if sub.jsi == nil || nc == nil || closed { + if sub.jsi == nil || nc == nil || sub.closed { return } @@ -1334,8 +1435,8 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { newDeliver := nc.newInbox() sub.Subject = newDeliver - // Snapshot jsi under sub lock here. - jsi := sub.jsi + // Snapshot the new sid under sub lock. + nsid := sub.sid // We are still in the low level readloop for the connection so we need // to spin a go routine to try to create the new consumer. @@ -1345,7 +1446,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { // This is done here in this go routine to prevent lock inversion. nc.mu.Lock() nc.bw.appendString(fmt.Sprintf(unsubProto, osid, _EMPTY_)) - nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, sub.sid)) + nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, nsid)) nc.kickFlusher() nc.mu.Unlock() @@ -1354,11 +1455,12 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { nc.unsubscribe(sub, 0, true) } - jsi.mu.Lock() + sub.mu.Lock() + jsi := sub.jsi // Reset some items in jsi. jsi.dseq = 1 jsi.cmeta = _EMPTY_ - jsi.fcs = nil + jsi.fcr, jsi.fcd = _EMPTY_, 0 jsi.deliver = newDeliver // Reset consumer request for starting policy. cfg := jsi.ccreq.Config @@ -1369,7 +1471,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { ccSubj := fmt.Sprintf(apiConsumerCreateT, jsi.stream) j, err := json.Marshal(jsi.ccreq) js := jsi.js - jsi.mu.Unlock() + sub.mu.Unlock() if err != nil { pushErr(err) @@ -1397,86 +1499,70 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { return } - jsi.mu.Lock() + sub.mu.Lock() jsi.consumer = cinfo.Name - jsi.mu.Unlock() + sub.mu.Unlock() }() } // checkForFlowControlResponse will check to see if we should send a flow control response -// based on the delivered index. -// Lock should be held. -func (sub *Subscription) checkForFlowControlResponse(delivered uint64) { - jsi, nc := sub.jsi, sub.conn - if jsi == nil { - return - } - - jsi.mu.Lock() - defer jsi.mu.Unlock() - - if len(jsi.fcs) == 0 { - return - } - - if reply := jsi.fcs[delivered]; reply != _EMPTY_ { - delete(jsi.fcs, delivered) - nc.Publish(reply, nil) +// based on the subscription current delivered index and the target. +// Runs under subscription lock +func (sub *Subscription) checkForFlowControlResponse() string { + // Caller has verified that there is a sub.jsi and fc + jsi := sub.jsi + if jsi.fcd == sub.delivered { + fcr := jsi.fcr + jsi.fcr, jsi.fcd = _EMPTY_, 0 + return fcr } + return _EMPTY_ } // Record an inbound flow control message. -func (jsi *jsSub) scheduleFlowControlResponse(dfuture uint64, reply string) { - jsi.mu.Lock() - if jsi.fcs == nil { - jsi.fcs = make(map[uint64]string) - } - jsi.fcs[dfuture] = reply - jsi.mu.Unlock() +// Runs under subscription lock +func (sub *Subscription) scheduleFlowControlResponse(dfuture uint64, reply string) { + jsi := sub.jsi + jsi.fcr, jsi.fcd = reply, dfuture } // Checks for activity from our consumer. // If we do not think we are active send an async error. func (sub *Subscription) activityCheck() { + sub.mu.Lock() jsi := sub.jsi if jsi == nil { + sub.mu.Unlock() return } - jsi.mu.Lock() active := jsi.active jsi.hbc.Reset(jsi.hbi) jsi.active = false - jsi.mu.Unlock() + nc := sub.conn + closed := sub.closed + sub.mu.Unlock() - if !active { - sub.mu.Lock() - nc := sub.conn - closed := sub.closed - sub.mu.Unlock() - - if !closed { - nc.mu.Lock() - errCB := nc.Opts.AsyncErrorCB - if errCB != nil { - nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) }) - } - nc.mu.Unlock() + if !active && !closed { + nc.mu.Lock() + if errCB := nc.Opts.AsyncErrorCB; errCB != nil { + nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) }) } + nc.mu.Unlock() } } // scheduleHeartbeatCheck sets up the timer check to make sure we are active // or receiving idle heartbeats.. func (sub *Subscription) scheduleHeartbeatCheck() { + sub.mu.Lock() + defer sub.mu.Unlock() + jsi := sub.jsi if jsi == nil { return } - jsi.mu.Lock() - defer jsi.mu.Unlock() - if jsi.hbc == nil { jsi.hbc = time.AfterFunc(jsi.hbi*hbcThresh, sub.activityCheck) } else { @@ -1497,10 +1583,10 @@ func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) { // checkForSequenceMismatch will make sure we have not missed any messages since last seen. func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) { // Process heartbeat received, get latest control metadata if present. - jsi.mu.Lock() + s.mu.Lock() ctrl, ordered := jsi.cmeta, jsi.ordered jsi.active = true - jsi.mu.Unlock() + s.mu.Unlock() if ctrl == _EMPTY_ { return @@ -1513,7 +1599,7 @@ func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) // Consumer sequence. var ldseq string - dseq := tokens[6] + dseq := tokens[ackConsumerSeqTokenPos] hdr := msg.Header[lastConsumerSeqHdr] if len(hdr) == 1 { ldseq = hdr[0] @@ -1524,7 +1610,7 @@ func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) if ldseq != dseq { // Dispatch async error including details such as // from where the consumer could be restarted. - sseq := parseNum(tokens[5]) + sseq := parseNum(tokens[ackStreamSeqTokenPos]) if ordered { s.mu.Lock() s.resetOrderedConsumer(jsi.sseq + 1) @@ -1585,6 +1671,10 @@ type subOpts struct { mack bool // For an ordered consumer. ordered bool + // Means that the subject passed to subscribe call will be used + // for the low level NATS subscription and no stream nor consumer + // lookup/creation will be done. + subjIsDelivery bool } // OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages. @@ -1729,6 +1819,13 @@ func RateLimit(n uint64) SubOpt { } // BindStream binds a consumer to a stream explicitly based on a name. +// When a stream name is not specified, the library uses the subscribe +// subject as a way to find the stream name. It is done by making a request +// to the server to get list of stream names that have a fileter for this +// subject. If the returned list contains a single stream, then this +// stream name will be used, otherwise the `ErrNoMatchingStream` is returned. +// To avoid the stream lookup, provide the stream name with this function. +// See also `Bind()`. func BindStream(stream string) SubOpt { return subOptFn(func(opts *subOpts) error { if opts.stream != _EMPTY_ && opts.stream != stream { @@ -1782,6 +1879,35 @@ func IdleHeartbeat(duration time.Duration) SubOpt { }) } +// DeliverSubject specifies the JetStream consumer deliver subject. +// +// This option is used only in situations where the consumer does not exist +// and a creation request is sent to the server. If not provided, an inbox +// will be selected. +// If a consumer exists, then the NATS subscription will be created on +// the JetStream consumer's DeliverSubject, not necessarily this subject. +func DeliverSubject(subject string) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.DeliverSubject = subject + return nil + }) +} + +// SubjectIsDelivery specifies that the subject parameter in the subscribe +// call shall be used to create the NATS subscription and matches the +// JetStream consumer's deliver subject. +// +// NOTE: This is an "expert" API and should only be used when consumer lookup or +// creation by the library is not possible (for instance cross accounts). +// Since no lookup of the JetStream consumer is done, there is no way for +// the library to check the validity of this JetStream subscription. +func SubjectIsDelivery() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.subjIsDelivery = true + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. @@ -2154,13 +2280,28 @@ type MsgMetadata struct { Timestamp time.Time Stream string Consumer string + Domain string } -func getMetadataFields(subject string) ([]string, error) { - const expectedTokens = 9 - const btsep = '.' +const ( + ackDomainTokenPos = 2 + ackAccHashTokenPos = 3 + ackStreamTokenPos = 4 + ackConsumerTokenPos = 5 + ackNumDeliveredTokenPos = 6 + ackStreamSeqTokenPos = 7 + ackConsumerSeqTokenPos = 8 + ackTimestampSeqTokenPos = 9 + ackNumPendingTokenPos = 10 +) - tsa := [expectedTokens]string{} +func getMetadataFields(subject string) ([]string, error) { + const noDomainNoHashsExpectedAckTokens = 9 + const withDomainNoHashExpectedAckTokens = 10 + const withDomainAndHashExpectedAckTokens = 11 + + const btsep = '.' + tsa := [withDomainAndHashExpectedAckTokens]string{} start, tokens := 0, tsa[:0] for i := 0; i < len(subject); i++ { if subject[i] == btsep { @@ -2169,9 +2310,39 @@ func getMetadataFields(subject string) ([]string, error) { } } tokens = append(tokens, subject[start:]) - if len(tokens) != expectedTokens || tokens[0] != "$JS" || tokens[1] != "ACK" { + // + // Newer server will include an account hash in the subject, and possibly the domain. + // So the subject could be: + // + // no domain: $JS.ACK...... + // with domain: $JS.ACK....... + // + // So old server number of tokens is 9, newer is 10 or 11. + // + l := len(tokens) + if l < noDomainNoHashsExpectedAckTokens || l > withDomainAndHashExpectedAckTokens { return nil, ErrNotJSMessage } + if tokens[0] != "$JS" || tokens[1] != "ACK" { + return nil, ErrNotJSMessage + } + // To make the rest of the library agnostic of that, we always return the tokens + // as if it is coming from a new server will all possible tokens. If domain or account + // hash are not specified, the tokens at those locations will simply be empty. + if l == noDomainNoHashsExpectedAckTokens || l == withDomainNoHashExpectedAckTokens { + // Extend the array (we know the backend is big enough) + // Compute how many tokens we need to insert. + itc := withDomainAndHashExpectedAckTokens - l + for i := 0; i < itc; i++ { + tokens = append(tokens, _EMPTY_) + } + // Move to the right anything that is after "ACK" token. + copy(tokens[ackDomainTokenPos+itc:], tokens[ackDomainTokenPos:]) + // Set the missing tokens to empty + for i := 0; i < itc; i++ { + tokens[ackDomainTokenPos+i] = _EMPTY_ + } + } return tokens, nil } @@ -2188,14 +2359,15 @@ func (m *Msg) Metadata() (*MsgMetadata, error) { } meta := &MsgMetadata{ - NumDelivered: uint64(parseNum(tokens[4])), - NumPending: uint64(parseNum(tokens[8])), - Timestamp: time.Unix(0, parseNum(tokens[7])), - Stream: tokens[2], - Consumer: tokens[3], + Domain: tokens[ackDomainTokenPos], + NumDelivered: uint64(parseNum(tokens[ackNumDeliveredTokenPos])), + NumPending: uint64(parseNum(tokens[ackNumPendingTokenPos])), + Timestamp: time.Unix(0, parseNum(tokens[ackTimestampSeqTokenPos])), + Stream: tokens[ackStreamTokenPos], + Consumer: tokens[ackConsumerTokenPos], } - meta.Sequence.Stream = uint64(parseNum(tokens[5])) - meta.Sequence.Consumer = uint64(parseNum(tokens[6])) + meta.Sequence.Stream = uint64(parseNum(tokens[ackStreamSeqTokenPos])) + meta.Sequence.Consumer = uint64(parseNum(tokens[ackConsumerSeqTokenPos])) return meta, nil } diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index 093a11de..6f9067c3 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -2574,21 +2574,22 @@ func (nc *Conn) waitForMsgs(s *Subscription) { mcb := s.mcb max = s.max closed = s.closed + var fcReply string if !s.closed { s.delivered++ delivered = s.delivered if s.jsi != nil { - s.jsi.mu.Lock() - needCheck := s.jsi.fc && len(s.jsi.fcs) > 0 + fcReply = s.checkForFlowControlResponse() s.jsi.active = true - s.jsi.mu.Unlock() - if needCheck { - s.checkForFlowControlResponse(delivered) - } } } s.mu.Unlock() + // Respond to flow control if applicable + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + if closed { break } @@ -2688,8 +2689,8 @@ func (nc *Conn) processMsg(data []byte) { var h Header var err error var ctrlMsg bool - var hasFC bool - var hasHBs bool + var ctrlType int + var fcReply string if nc.ps.ma.hdr > 0 { hbuf := msgPayload[:nc.ps.ma.hdr] @@ -2728,9 +2729,18 @@ func (nc *Conn) processMsg(data []byte) { // Skip flow control messages in case of using a JetStream context. jsi := sub.jsi if jsi != nil { - ctrlMsg, hasHBs, hasFC = isControlMessage(m), jsi.hbs, jsi.fc + // There has to be a header for it to be a control message. + if h != nil { + ctrlMsg, ctrlType = isJSControlMessage(m) + if ctrlMsg && ctrlType == jsCtrlHB { + // Check if the hearbeat has a "Consumer Stalled" header, if + // so, the value is the FC reply to send a nil message to. + // We will send it at the end of this function. + fcReply = m.Header.Get(consumerStalledHdr) + } + } // Check for ordered consumer here. If checkOrdered returns true that means it detected a gap. - if jsi.ordered && sub.checkOrderedMsgs(m) { + if !ctrlMsg && jsi.ordered && sub.checkOrderedMsgs(m) { sub.mu.Unlock() return } @@ -2777,19 +2787,19 @@ func (nc *Conn) processMsg(data []byte) { sub.pTail = m } } - if jsi != nil && hasHBs { + if jsi != nil { // Store the ACK metadata from the message to // compare later on with the received heartbeat. - jsi.trackSequences(m.Reply) + sub.trackSequences(m.Reply) } - } else if hasFC && m.Reply != _EMPTY_ { + } else if ctrlType == jsCtrlFC && m.Reply != _EMPTY_ { // This is a flow control message. // If we have no pending, go ahead and send in place. if sub.pMsgs <= 0 { - nc.Publish(m.Reply, nil) + fcReply = m.Reply } else { // Schedule a reply after the previous message is delivered. - jsi.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply) + sub.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply) } } @@ -2797,8 +2807,12 @@ func (nc *Conn) processMsg(data []byte) { sub.sc = false sub.mu.Unlock() + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + // Handle control heartbeat messages. - if ctrlMsg && hasHBs && m.Reply == _EMPTY_ { + if ctrlMsg && ctrlType == jsCtrlHB && m.Reply == _EMPTY_ { nc.checkForSequenceMismatch(m, sub, jsi) } @@ -3161,6 +3175,7 @@ const ( descrHdr = "Description" lastConsumerSeqHdr = "Nats-Last-Consumer" lastStreamSeqHdr = "Nats-Last-Stream" + consumerStalledHdr = "Nats-Consumer-Stalled" noResponders = "503" noMessagesSts = "404" reqTimeoutSts = "408" @@ -3808,6 +3823,12 @@ func (nc *Conn) removeSub(s *Subscription) { } s.mch = nil + // If JS subscription then stop HB timer. + if jsi := s.jsi; jsi != nil && jsi.hbc != nil { + jsi.hbc.Stop() + jsi.hbc = nil + } + // Mark as invalid s.closed = true if s.pCond != nil { @@ -3857,6 +3878,15 @@ func (s *Subscription) IsValid() bool { // Drain will remove interest but continue callbacks until all messages // have been processed. +// +// For a JetStream subscription, if the library has created the JetStream +// consumer, the library will send a DeleteConsumer request to the server +// when the Drain operation completes. If a failure occurs when deleting +// the JetStream consumer, an error will be reported to the asynchronous +// error callback. +// If you do not wish the JetStream consumer to be automatically deleted, +// ensure that the consumer is not created by the library, which means +// create the consumer with AddConsumer and bind to this consumer. func (s *Subscription) Drain() error { if s == nil { return ErrBadSubscription @@ -3871,6 +3901,15 @@ func (s *Subscription) Drain() error { } // Unsubscribe will remove interest in the given subject. +// +// For a JetStream subscription, if the library has created the JetStream +// consumer, it will send a DeleteConsumer request to the server (if the +// unsubscribe itself was successful). If the delete operation fails, the +// error will be returned. +// If you do not wish the JetStream consumer to be automatically deleted, +// ensure that the consumer is not created by the library, which means +// create the consumer with AddConsumer and bind to this consumer (using +// the nats.Bind() option). func (s *Subscription) Unsubscribe() error { if s == nil { return ErrBadSubscription @@ -3878,6 +3917,7 @@ func (s *Subscription) Unsubscribe() error { s.mu.Lock() conn := s.conn closed := s.closed + dc := s.jsi != nil && s.jsi.dc s.mu.Unlock() if conn == nil || conn.IsClosed() { return ErrConnectionClosed @@ -3888,7 +3928,11 @@ func (s *Subscription) Unsubscribe() error { if conn.IsDraining() { return ErrConnectionDraining } - return conn.unsubscribe(s, 0, false) + err := conn.unsubscribe(s, 0, false) + if err == nil && dc { + err = s.deleteConsumer() + } + return err } // checkDrained will watch for a subscription to be fully drained @@ -3902,6 +3946,12 @@ func (nc *Conn) checkDrained(sub *Subscription) { // is correct and the server will not send additional information. nc.Flush() + sub.mu.Lock() + // For JS subscriptions, check if we are going to delete the + // JS consumer when drain completes. + dc := sub.jsi != nil && sub.jsi.dc + sub.mu.Unlock() + // Once we are here we just wait for Pending to reach 0 or // any other state to exit this go routine. for { @@ -3921,6 +3971,15 @@ func (nc *Conn) checkDrained(sub *Subscription) { nc.mu.Lock() nc.removeSub(sub) nc.mu.Unlock() + if dc { + if err := sub.deleteConsumer(); err != nil { + nc.mu.Lock() + if errCB := nc.Opts.AsyncErrorCB; errCB != nil { + nc.ach.push(func() { errCB(nc, sub, err) }) + } + nc.mu.Unlock() + } + } return } @@ -3959,18 +4018,6 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error { sub.mu.Unlock() } - // For JetStream consumers, need to clean up ephemeral consumers - // or delete durable ones if called with Unsubscribe. - sub.mu.Lock() - jsi := sub.jsi - sub.mu.Unlock() - if jsi != nil && maxStr == _EMPTY_ { - err := jsi.unsubscribe(drainMode) - if err != nil { - return err - } - } - nc.mu.Lock() // ok here, but defer is expensive defer nc.mu.Unlock() @@ -4014,7 +4061,7 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) { } s.mu.Lock() - err := s.validateNextMsgState() + err := s.validateNextMsgState(false) if err != nil { s.mu.Unlock() return nil, err @@ -4065,7 +4112,7 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) { // validateNextMsgState checks whether the subscription is in a valid // state to call NextMsg and be delivered another message synchronously. // This should be called while holding the lock. -func (s *Subscription) validateNextMsgState() error { +func (s *Subscription) validateNextMsgState(pullSubInternal bool) error { if s.connClosed { return ErrConnectionClosed } @@ -4083,7 +4130,11 @@ func (s *Subscription) validateNextMsgState() error { s.sc = false return ErrSlowConsumer } - + // Unless this is from an internal call, reject use of this API. + // Users should use Fetch() instead. + if !pullSubInternal && s.jsi != nil && s.jsi.pull { + return ErrTypeSubscription + } return nil } @@ -4108,17 +4159,13 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { nc := s.conn max := s.max + var fcReply string // Update some stats. s.delivered++ delivered := s.delivered if s.jsi != nil { - s.jsi.mu.Lock() - needCheck := s.jsi.fc && len(s.jsi.fcs) > 0 + fcReply = s.checkForFlowControlResponse() s.jsi.active = true - s.jsi.mu.Unlock() - if needCheck { - s.checkForFlowControlResponse(delivered) - } } if s.typ == SyncSubscription { @@ -4127,6 +4174,10 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { } s.mu.Unlock() + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + if max > 0 { if delivered > max { return ErrMaxMessages @@ -4729,6 +4780,8 @@ func (nc *Conn) drainConnection() { // will be drained and can not publish any additional messages. Upon draining // of the publishers, the connection will be closed. Use the ClosedCB() // option to know when the connection has moved from draining to closed. +// +// See note in Subscription.Drain for JetStream subscriptions. func (nc *Conn) Drain() error { nc.mu.Lock() if nc.isClosed() { diff --git a/vendor/modules.txt b/vendor/modules.txt index cf59f433..30be76e0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -9,7 +9,7 @@ github.com/minio/highwayhash # github.com/nats-io/jwt/v2 v2.0.3 ## explicit github.com/nats-io/jwt/v2 -# github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19 +# github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9 ## explicit github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin