// Copyright 2019-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package server import ( "bytes" "encoding/binary" "encoding/json" "errors" "fmt" "math/rand" "reflect" "sort" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/nats-io/nats-server/v2/server/avl" "github.com/nats-io/nuid" "golang.org/x/time/rate" ) // Headers sent with Request Timeout const ( JSPullRequestPendingMsgs = "Nats-Pending-Messages" JSPullRequestPendingBytes = "Nats-Pending-Bytes" ) // Headers sent when batch size was completed, but there were remaining bytes. const JsPullRequestRemainingBytesT = "NATS/1.0 409 Batch Completed\r\n%s: %d\r\n%s: %d\r\n\r\n" type ConsumerInfo struct { Stream string `json:"stream_name"` Name string `json:"name"` Created time.Time `json:"created"` Config *ConsumerConfig `json:"config,omitempty"` Delivered SequenceInfo `json:"delivered"` AckFloor SequenceInfo `json:"ack_floor"` NumAckPending int `json:"num_ack_pending"` NumRedelivered int `json:"num_redelivered"` NumWaiting int `json:"num_waiting"` NumPending uint64 `json:"num_pending"` Cluster *ClusterInfo `json:"cluster,omitempty"` PushBound bool `json:"push_bound,omitempty"` // TimeStamp indicates when the info was gathered TimeStamp time.Time `json:"ts"` } type ConsumerConfig struct { // Durable is deprecated. All consumers should have names, picked by clients. Durable string `json:"durable_name,omitempty"` Name string `json:"name,omitempty"` Description string `json:"description,omitempty"` DeliverPolicy DeliverPolicy `json:"deliver_policy"` OptStartSeq uint64 `json:"opt_start_seq,omitempty"` OptStartTime *time.Time `json:"opt_start_time,omitempty"` AckPolicy AckPolicy `json:"ack_policy"` AckWait time.Duration `json:"ack_wait,omitempty"` MaxDeliver int `json:"max_deliver,omitempty"` BackOff []time.Duration `json:"backoff,omitempty"` FilterSubject string `json:"filter_subject,omitempty"` FilterSubjects []string `json:"filter_subjects,omitempty"` ReplayPolicy ReplayPolicy `json:"replay_policy"` RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec SampleFrequency string `json:"sample_freq,omitempty"` MaxWaiting int `json:"max_waiting,omitempty"` MaxAckPending int `json:"max_ack_pending,omitempty"` Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` FlowControl bool `json:"flow_control,omitempty"` HeadersOnly bool `json:"headers_only,omitempty"` // Pull based options. MaxRequestBatch int `json:"max_batch,omitempty"` MaxRequestExpires time.Duration `json:"max_expires,omitempty"` MaxRequestMaxBytes int `json:"max_bytes,omitempty"` // Push based consumers. DeliverSubject string `json:"deliver_subject,omitempty"` DeliverGroup string `json:"deliver_group,omitempty"` // Ephemeral inactivity threshold. InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` // Generally inherited by parent stream and other markers, now can be configured directly. Replicas int `json:"num_replicas"` // Force memory storage. MemoryStorage bool `json:"mem_storage,omitempty"` // Don't add to general clients. Direct bool `json:"direct,omitempty"` // Metadata is additional metadata for the Consumer. Metadata map[string]string `json:"metadata,omitempty"` } // SequenceInfo has both the consumer and the stream sequence and last activity. type SequenceInfo struct { Consumer uint64 `json:"consumer_seq"` Stream uint64 `json:"stream_seq"` Last *time.Time `json:"last_active,omitempty"` } type CreateConsumerRequest struct { Stream string `json:"stream_name"` Config ConsumerConfig `json:"config"` Action ConsumerAction `json:"action"` } type ConsumerAction int const ( ActionCreateOrUpdate ConsumerAction = iota ActionUpdate ActionCreate ) const ( actionUpdateString = "update" actionCreateString = "create" actionCreateOrUpdateString = "" ) func (a ConsumerAction) String() string { switch a { case ActionCreateOrUpdate: return actionCreateOrUpdateString case ActionCreate: return actionCreateString case ActionUpdate: return actionUpdateString } return actionCreateOrUpdateString } func (a ConsumerAction) MarshalJSON() ([]byte, error) { switch a { case ActionCreate: return json.Marshal(actionCreateString) case ActionUpdate: return json.Marshal(actionUpdateString) case ActionCreateOrUpdate: return json.Marshal(actionCreateOrUpdateString) default: return nil, fmt.Errorf("can not marshal %v", a) } } func (a *ConsumerAction) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString("create"): *a = ActionCreate case jsonString("update"): *a = ActionUpdate case jsonString(""): *a = ActionCreateOrUpdate default: return fmt.Errorf("unknown consumer action: %v", string(data)) } return nil } // ConsumerNakOptions is for optional NAK values, e.g. delay. type ConsumerNakOptions struct { Delay time.Duration `json:"delay"` } // DeliverPolicy determines how the consumer should select the first message to deliver. type DeliverPolicy int const ( // DeliverAll will be the default so can be omitted from the request. DeliverAll DeliverPolicy = iota // DeliverLast will start the consumer with the last sequence received. DeliverLast // DeliverNew will only deliver new messages that are sent after the consumer is created. DeliverNew // DeliverByStartSequence will look for a defined starting sequence to start. DeliverByStartSequence // DeliverByStartTime will select the first messsage with a timestamp >= to StartTime. DeliverByStartTime // DeliverLastPerSubject will start the consumer with the last message for all subjects received. DeliverLastPerSubject ) func (dp DeliverPolicy) String() string { switch dp { case DeliverAll: return "all" case DeliverLast: return "last" case DeliverNew: return "new" case DeliverByStartSequence: return "by_start_sequence" case DeliverByStartTime: return "by_start_time" case DeliverLastPerSubject: return "last_per_subject" default: return "undefined" } } // AckPolicy determines how the consumer should acknowledge delivered messages. type AckPolicy int const ( // AckNone requires no acks for delivered messages. AckNone AckPolicy = iota // AckAll when acking a sequence number, this implicitly acks all sequences below this one as well. AckAll // AckExplicit requires ack or nack for all messages. AckExplicit ) func (a AckPolicy) String() string { switch a { case AckNone: return "none" case AckAll: return "all" default: return "explicit" } } // ReplayPolicy determines how the consumer should replay messages it already has queued in the stream. type ReplayPolicy int const ( // ReplayInstant will replay messages as fast as possible. ReplayInstant ReplayPolicy = iota // ReplayOriginal will maintain the same timing as the messages were received. ReplayOriginal ) func (r ReplayPolicy) String() string { switch r { case ReplayInstant: return "instant" default: return "original" } } // OK const OK = "+OK" // Ack responses. Note that a nil or no payload is same as AckAck var ( // Ack AckAck = []byte("+ACK") // nil or no payload to ack subject also means ACK AckOK = []byte(OK) // deprecated but +OK meant ack as well. // Nack AckNak = []byte("-NAK") // Progress indicator AckProgress = []byte("+WPI") // Ack + Deliver the next message(s). AckNext = []byte("+NXT") // Terminate delivery of the message. AckTerm = []byte("+TERM") ) // Calculate accurate replicas for the consumer config with the parent stream config. func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int { if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas { if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy && consCfg.Replicas == 0 { // Matches old-school ephemerals only, where the replica count is 0. return 1 } return strCfg.Replicas } return consCfg.Replicas } // Consumer is a jetstream consumer. type consumer struct { // Atomic used to notify that we want to process an ack. // This will be checked in checkPending to abort processing // and let ack be processed in priority. awl int64 mu sync.RWMutex js *jetStream mset *stream acc *Account srv *Server client *client sysc *client sid int name string stream string sseq uint64 // next stream sequence subjf subjectFilters // subject filters and their sequences dseq uint64 // delivered consumer sequence adflr uint64 // ack delivery floor asflr uint64 // ack store floor npc int64 // Num Pending Count npf uint64 // Num Pending Floor Sequence dsubj string qgroup string lss *lastSeqSkipList rlimit *rate.Limiter reqSub *subscription ackSub *subscription ackReplyT string ackSubj string nextMsgSubj string nextMsgReqs *ipQueue[*nextMsgReq] maxp int pblimit int maxpb int pbytes int fcsz int fcid string fcSub *subscription outq *jsOutQ pending map[uint64]*Pending ptmr *time.Timer rdq []uint64 rdqi avl.SequenceSet rdc map[uint64]uint64 maxdc uint64 waiting *waitQueue cfg ConsumerConfig ici *ConsumerInfo store ConsumerStore active bool replay bool dtmr *time.Timer gwdtmr *time.Timer dthresh time.Duration mch chan struct{} qch chan struct{} inch chan bool sfreq int32 ackEventT string nakEventT string deliveryExcEventT string created time.Time ldt time.Time lat time.Time closed bool // Clustered. ca *consumerAssignment node RaftNode infoSub *subscription lqsent time.Time prm map[string]struct{} prOk bool uch chan struct{} retention RetentionPolicy monitorWg sync.WaitGroup inMonitor bool // R>1 proposals pch chan struct{} phead *proposal ptail *proposal // Ack queue ackMsgs *ipQueue[*jsAckMsg] // for stream signaling when multiple filters are set. sigSubs []*subscription } // A single subject filter. type subjectFilter struct { subject string nextSeq uint64 currentSeq uint64 pmsg *jsPubMsg err error hasWildcard bool } type subjectFilters []*subjectFilter // subjects is a helper function used for updating consumers. // It is not used and should not be used in hotpath. func (s subjectFilters) subjects() []string { subjects := make([]string, 0, len(s)) for _, filter := range s { subjects = append(subjects, filter.subject) } return subjects } type proposal struct { data []byte next *proposal } const ( // JsAckWaitDefault is the default AckWait, only applicable on explicit ack policy consumers. JsAckWaitDefault = 30 * time.Second // JsDeleteWaitTimeDefault is the default amount of time we will wait for non-durable // consumers to be in an inactive state before deleting them. JsDeleteWaitTimeDefault = 5 * time.Second // JsFlowControlMaxPending specifies default pending bytes during flow control that can be // outstanding. JsFlowControlMaxPending = 32 * 1024 * 1024 // JsDefaultMaxAckPending is set for consumers with explicit ack that do not set the max ack pending. JsDefaultMaxAckPending = 1000 ) // Helper function to set consumer config defaults from above. func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig, lim *JSLimitOpts, accLim *JetStreamAccountLimits) { // Set to default if not specified. if config.DeliverSubject == _EMPTY_ && config.MaxWaiting == 0 { config.MaxWaiting = JSWaitQueueDefaultMax } // Setup proper default for ack wait if we are in explicit ack mode. if config.AckWait == 0 && (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) { config.AckWait = JsAckWaitDefault } // Setup default of -1, meaning no limit for MaxDeliver. if config.MaxDeliver == 0 { config.MaxDeliver = -1 } // If BackOff was specified that will override the AckWait and the MaxDeliver. if len(config.BackOff) > 0 { config.AckWait = config.BackOff[0] } if config.MaxAckPending == 0 { config.MaxAckPending = streamCfg.ConsumerLimits.MaxAckPending } if config.InactiveThreshold == 0 { config.InactiveThreshold = streamCfg.ConsumerLimits.InactiveThreshold } // Set proper default for max ack pending if we are ack explicit and none has been set. if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 { accPending := JsDefaultMaxAckPending if lim.MaxAckPending > 0 && lim.MaxAckPending < accPending { accPending = lim.MaxAckPending } if accLim.MaxAckPending > 0 && accLim.MaxAckPending < accPending { accPending = accLim.MaxAckPending } config.MaxAckPending = accPending } // if applicable set max request batch size if config.DeliverSubject == _EMPTY_ && config.MaxRequestBatch == 0 && lim.MaxRequestBatch > 0 { config.MaxRequestBatch = lim.MaxRequestBatch } } // Check the consumer config. If we are recovering don't check filter subjects. func checkConsumerCfg( config *ConsumerConfig, srvLim *JSLimitOpts, cfg *StreamConfig, _ *Account, accLim *JetStreamAccountLimits, isRecovering bool, ) *ApiError { // Check if replicas is defined but exceeds parent stream. if config.Replicas > 0 && config.Replicas > cfg.Replicas { return NewJSConsumerReplicasExceedsStreamError() } // Check that it is not negative if config.Replicas < 0 { return NewJSReplicasCountCannotBeNegativeError() } // If the stream is interest or workqueue retention make sure the replicas // match that of the stream. This is REQUIRED for now. if cfg.Retention == InterestPolicy || cfg.Retention == WorkQueuePolicy { // Only error here if not recovering. // We handle recovering in a different spot to allow consumer to come up // if previous version allowed it to be created. We do not want it to not come up. if !isRecovering && config.Replicas != 0 && config.Replicas != cfg.Replicas { return NewJSConsumerReplicasShouldMatchStreamError() } } // Check if we have a BackOff defined that MaxDeliver is within range etc. if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver <= lbo { return NewJSConsumerMaxDeliverBackoffError() } if len(config.Description) > JSMaxDescriptionLen { return NewJSConsumerDescriptionTooLongError(JSMaxDescriptionLen) } // For now expect a literal subject if its not empty. Empty means work queue mode (pull mode). if config.DeliverSubject != _EMPTY_ { if !subjectIsLiteral(config.DeliverSubject) { return NewJSConsumerDeliverToWildcardsError() } if !IsValidSubject(config.DeliverSubject) { return NewJSConsumerInvalidDeliverSubjectError() } if deliveryFormsCycle(cfg, config.DeliverSubject) { return NewJSConsumerDeliverCycleError() } if config.MaxWaiting != 0 { return NewJSConsumerPushMaxWaitingError() } if config.MaxAckPending > 0 && config.AckPolicy == AckNone { return NewJSConsumerMaxPendingAckPolicyRequiredError() } if config.Heartbeat > 0 && config.Heartbeat < 100*time.Millisecond { return NewJSConsumerSmallHeartbeatError() } } else { // Pull mode with work queue retention from the stream requires an explicit ack. if config.AckPolicy == AckNone && cfg.Retention == WorkQueuePolicy { return NewJSConsumerPullRequiresAckError() } if config.RateLimit > 0 { return NewJSConsumerPullWithRateLimitError() } if config.MaxWaiting < 0 { return NewJSConsumerMaxWaitingNegativeError() } if config.Heartbeat > 0 { return NewJSConsumerHBRequiresPushError() } if config.FlowControl { return NewJSConsumerFCRequiresPushError() } if config.MaxRequestBatch < 0 { return NewJSConsumerMaxRequestBatchNegativeError() } if config.MaxRequestExpires != 0 && config.MaxRequestExpires < time.Millisecond { return NewJSConsumerMaxRequestExpiresToSmallError() } if srvLim.MaxRequestBatch > 0 && config.MaxRequestBatch > srvLim.MaxRequestBatch { return NewJSConsumerMaxRequestBatchExceededError(srvLim.MaxRequestBatch) } } if srvLim.MaxAckPending > 0 && config.MaxAckPending > srvLim.MaxAckPending { return NewJSConsumerMaxPendingAckExcessError(srvLim.MaxAckPending) } if accLim.MaxAckPending > 0 && config.MaxAckPending > accLim.MaxAckPending { return NewJSConsumerMaxPendingAckExcessError(accLim.MaxAckPending) } if cfg.ConsumerLimits.MaxAckPending > 0 && config.MaxAckPending > cfg.ConsumerLimits.MaxAckPending { return NewJSConsumerMaxPendingAckExcessError(cfg.ConsumerLimits.MaxAckPending) } if cfg.ConsumerLimits.InactiveThreshold > 0 && config.InactiveThreshold > cfg.ConsumerLimits.InactiveThreshold { return NewJSConsumerInactiveThresholdExcessError(cfg.ConsumerLimits.InactiveThreshold) } // Direct need to be non-mapped ephemerals. if config.Direct { if config.DeliverSubject == _EMPTY_ { return NewJSConsumerDirectRequiresPushError() } if isDurableConsumer(config) { return NewJSConsumerDirectRequiresEphemeralError() } } // Do not allow specifying both FilterSubject and FilterSubjects, // as that's probably unintentional without any difference from passing // all filters in FilterSubjects. if config.FilterSubject != _EMPTY_ && len(config.FilterSubjects) > 0 { return NewJSConsumerDuplicateFilterSubjectsError() } if config.FilterSubject != _EMPTY_ && !IsValidSubject(config.FilterSubject) { return NewJSStreamInvalidConfigError(ErrBadSubject) } // We treat FilterSubjects: []string{""} as a misconfig, so we validate against it. for _, filter := range config.FilterSubjects { if filter == _EMPTY_ { return NewJSConsumerEmptyFilterError() } } subjectFilters := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects) // Check subject filters overlap. for outer, subject := range subjectFilters { if !IsValidSubject(subject) { return NewJSStreamInvalidConfigError(ErrBadSubject) } for inner, ssubject := range subjectFilters { if inner != outer && subjectIsSubsetMatch(subject, ssubject) { return NewJSConsumerOverlappingSubjectFiltersError() } } } // Helper function to formulate similar errors. badStart := func(dp, start string) error { return fmt.Errorf("consumer delivery policy is deliver %s, but optional start %s is also set", dp, start) } notSet := func(dp, notSet string) error { return fmt.Errorf("consumer delivery policy is deliver %s, but optional %s is not set", dp, notSet) } // Check on start position conflicts. switch config.DeliverPolicy { case DeliverAll: if config.OptStartSeq > 0 { return NewJSConsumerInvalidPolicyError(badStart("all", "sequence")) } if config.OptStartTime != nil { return NewJSConsumerInvalidPolicyError(badStart("all", "time")) } case DeliverLast: if config.OptStartSeq > 0 { return NewJSConsumerInvalidPolicyError(badStart("last", "sequence")) } if config.OptStartTime != nil { return NewJSConsumerInvalidPolicyError(badStart("last", "time")) } case DeliverLastPerSubject: if config.OptStartSeq > 0 { return NewJSConsumerInvalidPolicyError(badStart("last per subject", "sequence")) } if config.OptStartTime != nil { return NewJSConsumerInvalidPolicyError(badStart("last per subject", "time")) } if config.FilterSubject == _EMPTY_ && len(config.FilterSubjects) == 0 { return NewJSConsumerInvalidPolicyError(notSet("last per subject", "filter subject")) } case DeliverNew: if config.OptStartSeq > 0 { return NewJSConsumerInvalidPolicyError(badStart("new", "sequence")) } if config.OptStartTime != nil { return NewJSConsumerInvalidPolicyError(badStart("new", "time")) } case DeliverByStartSequence: if config.OptStartSeq == 0 { return NewJSConsumerInvalidPolicyError(notSet("by start sequence", "start sequence")) } if config.OptStartTime != nil { return NewJSConsumerInvalidPolicyError(badStart("by start sequence", "time")) } case DeliverByStartTime: if config.OptStartTime == nil { return NewJSConsumerInvalidPolicyError(notSet("by start time", "start time")) } if config.OptStartSeq != 0 { return NewJSConsumerInvalidPolicyError(badStart("by start time", "start sequence")) } } if config.SampleFrequency != _EMPTY_ { s := strings.TrimSuffix(config.SampleFrequency, "%") if sampleFreq, err := strconv.Atoi(s); err != nil || sampleFreq < 0 { return NewJSConsumerInvalidSamplingError(err) } } // We reject if flow control is set without heartbeats. if config.FlowControl && config.Heartbeat == 0 { return NewJSConsumerWithFlowControlNeedsHeartbeatsError() } if config.Durable != _EMPTY_ && config.Name != _EMPTY_ { if config.Name != config.Durable { return NewJSConsumerCreateDurableAndNameMismatchError() } } var metadataLen int for k, v := range config.Metadata { metadataLen += len(k) + len(v) } if metadataLen > JSMaxMetadataLen { return NewJSConsumerMetadataLengthError(fmt.Sprintf("%dKB", JSMaxMetadataLen/1024)) } return nil } func (mset *stream) addConsumerWithAction(config *ConsumerConfig, action ConsumerAction) (*consumer, error) { return mset.addConsumerWithAssignment(config, _EMPTY_, nil, false, action) } func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) { return mset.addConsumerWithAction(config, ActionCreateOrUpdate) } func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool, action ConsumerAction) (*consumer, error) { mset.mu.RLock() s, jsa, tierName, cfg, acc := mset.srv, mset.jsa, mset.tier, mset.cfg, mset.acc retention := cfg.Retention mset.mu.RUnlock() // If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn. // This can happen on startup with restored state where on meta replay we still do not have // the assignment. Running in single server mode this always returns true. if oname != _EMPTY_ && !jsa.consumerAssigned(mset.name(), oname) { s.Debugf("Consumer %q > %q does not seem to be assigned to this server", mset.name(), oname) } if config == nil { return nil, NewJSConsumerConfigRequiredError() } jsa.usageMu.RLock() selectedLimits, limitsFound := jsa.limits[tierName] jsa.usageMu.RUnlock() if !limitsFound { return nil, NewJSNoLimitsError() } srvLim := &s.getOpts().JetStreamLimits // Make sure we have sane defaults. Do so with the JS lock, otherwise a // badly timed meta snapshot can result in a race condition. mset.js.mu.Lock() setConsumerConfigDefaults(config, &mset.cfg, srvLim, &selectedLimits) mset.js.mu.Unlock() if err := checkConsumerCfg(config, srvLim, &cfg, acc, &selectedLimits, isRecovering); err != nil { return nil, err } sampleFreq := 0 if config.SampleFrequency != _EMPTY_ { // Can't fail as checkConsumerCfg checks correct format sampleFreq, _ = strconv.Atoi(strings.TrimSuffix(config.SampleFrequency, "%")) } // Grab the client, account and server reference. c := mset.client if c == nil { return nil, NewJSStreamInvalidError() } var accName string c.mu.Lock() s, a := c.srv, c.acc if a != nil { accName = a.Name } c.mu.Unlock() // Hold mset lock here. mset.mu.Lock() if mset.client == nil || mset.store == nil || mset.consumers == nil { mset.mu.Unlock() return nil, NewJSStreamInvalidError() } // If this one is durable and already exists, we let that be ok as long as only updating what should be allowed. var cName string if isDurableConsumer(config) { cName = config.Durable } else if config.Name != _EMPTY_ { cName = config.Name } if cName != _EMPTY_ { if eo, ok := mset.consumers[cName]; ok { mset.mu.Unlock() if action == ActionCreate && !reflect.DeepEqual(*config, eo.config()) { return nil, NewJSConsumerAlreadyExistsError() } // Check for overlapping subjects. if mset.cfg.Retention == WorkQueuePolicy { subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects) if !mset.partitionUnique(cName, subjects) { return nil, NewJSConsumerWQConsumerNotUniqueError() } } err := eo.updateConfig(config) if err == nil { return eo, nil } return nil, NewJSConsumerCreateError(err, Unless(err)) } } if action == ActionUpdate { mset.mu.Unlock() return nil, NewJSConsumerDoesNotExistError() } // Check for any limits, if the config for the consumer sets a limit we check against that // but if not we use the value from account limits, if account limits is more restrictive // than stream config we prefer the account limits to handle cases where account limits are // updated during the lifecycle of the stream maxc := mset.cfg.MaxConsumers if maxc <= 0 || (selectedLimits.MaxConsumers > 0 && selectedLimits.MaxConsumers < maxc) { maxc = selectedLimits.MaxConsumers } if maxc > 0 && mset.numPublicConsumers() >= maxc { mset.mu.Unlock() return nil, NewJSMaximumConsumersLimitError() } // Check on stream type conflicts with WorkQueues. if mset.cfg.Retention == WorkQueuePolicy && !config.Direct { // Force explicit acks here. if config.AckPolicy != AckExplicit { mset.mu.Unlock() return nil, NewJSConsumerWQRequiresExplicitAckError() } subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects) if len(mset.consumers) > 0 { if len(subjects) == 0 { mset.mu.Unlock() return nil, NewJSConsumerWQMultipleUnfilteredError() } else if !mset.partitionUnique(cName, subjects) { // Prior to v2.9.7, on a stream with WorkQueue policy, the servers // were not catching the error of having multiple consumers with // overlapping filter subjects depending on the scope, for instance // creating "foo.*.bar" and then "foo.>" was not detected, while // "foo.>" and then "foo.*.bar" would have been. Failing here // in recovery mode would leave the rejected consumer in a bad state, // so we will simply warn here, asking the user to remove this // consumer administratively. Otherwise, if this is the creation // of a new consumer, we will return the error. if isRecovering { s.Warnf("Consumer %q > %q has a filter subject that overlaps "+ "with other consumers, which is not allowed for a stream "+ "with WorkQueue policy, it should be administratively deleted", cfg.Name, cName) } else { // We have a partition but it is not unique amongst the others. mset.mu.Unlock() return nil, NewJSConsumerWQConsumerNotUniqueError() } } } if config.DeliverPolicy != DeliverAll { mset.mu.Unlock() return nil, NewJSConsumerWQConsumerNotDeliverAllError() } } // Set name, which will be durable name if set, otherwise we create one at random. o := &consumer{ mset: mset, js: s.getJetStream(), acc: a, srv: s, client: s.createInternalJetStreamClient(), sysc: s.createInternalJetStreamClient(), cfg: *config, dsubj: config.DeliverSubject, outq: mset.outq, active: true, qch: make(chan struct{}), uch: make(chan struct{}, 1), mch: make(chan struct{}, 1), sfreq: int32(sampleFreq), maxdc: uint64(config.MaxDeliver), maxp: config.MaxAckPending, retention: retention, created: time.Now().UTC(), } // Bind internal client to the user account. o.client.registerWithAccount(a) // Bind to the system account. o.sysc.registerWithAccount(s.SystemAccount()) if isDurableConsumer(config) { if len(config.Durable) > JSMaxNameLen { mset.mu.Unlock() o.deleteWithoutAdvisory() return nil, NewJSConsumerNameTooLongError(JSMaxNameLen) } o.name = config.Durable } else if oname != _EMPTY_ { o.name = oname } else { if config.Name != _EMPTY_ { o.name = config.Name } else { // Legacy ephemeral auto-generated. for { o.name = createConsumerName() if _, ok := mset.consumers[o.name]; !ok { break } } config.Name = o.name } } // Create ackMsgs queue now that we have a consumer name o.ackMsgs = newIPQueue[*jsAckMsg](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, mset.cfg.Name)) // Create our request waiting queue. if o.isPullMode() { o.waiting = newWaitQueue(config.MaxWaiting) // Create our internal queue for next msg requests. o.nextMsgReqs = newIPQueue[*nextMsgReq](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' pull requests", accName, o.name, mset.cfg.Name)) } // already under lock, mset.Name() would deadlock o.stream = mset.cfg.Name o.ackEventT = JSMetricConsumerAckPre + "." + o.stream + "." + o.name o.nakEventT = JSAdvisoryConsumerMsgNakPre + "." + o.stream + "." + o.name o.deliveryExcEventT = JSAdvisoryConsumerMaxDeliveryExceedPre + "." + o.stream + "." + o.name if !isValidName(o.name) { mset.mu.Unlock() o.deleteWithoutAdvisory() return nil, NewJSConsumerBadDurableNameError() } // Setup our storage if not a direct consumer. if !config.Direct { store, err := mset.store.ConsumerStore(o.name, config) if err != nil { mset.mu.Unlock() o.deleteWithoutAdvisory() return nil, NewJSConsumerStoreFailedError(err) } o.store = store } subjects := gatherSubjectFilters(o.cfg.FilterSubject, o.cfg.FilterSubjects) for _, filter := range subjects { sub := &subjectFilter{ subject: filter, hasWildcard: subjectHasWildcard(filter), } o.subjf = append(o.subjf, sub) } if o.store != nil && o.store.HasState() { // Restore our saved state. o.mu.Lock() o.readStoredState(0) o.mu.Unlock() } else { // Select starting sequence number o.selectStartingSeqNo() } // Now register with mset and create the ack subscription. // Check if we already have this one registered. if eo, ok := mset.consumers[o.name]; ok { mset.mu.Unlock() if !o.isDurable() || !o.isPushMode() { o.name = _EMPTY_ // Prevent removal since same name. o.deleteWithoutAdvisory() return nil, NewJSConsumerNameExistError() } // If we are here we have already registered this durable. If it is still active that is an error. if eo.isActive() { o.name = _EMPTY_ // Prevent removal since same name. o.deleteWithoutAdvisory() return nil, NewJSConsumerExistingActiveError() } // Since we are here this means we have a potentially new durable so we should update here. // Check that configs are the same. if !configsEqualSansDelivery(o.cfg, eo.cfg) { o.name = _EMPTY_ // Prevent removal since same name. o.deleteWithoutAdvisory() return nil, NewJSConsumerReplacementWithDifferentNameError() } // Once we are here we have a replacement push-based durable. eo.updateDeliverSubject(o.cfg.DeliverSubject) return eo, nil } // Set up the ack subscription for this consumer. Will use wildcard for all acks. // We will remember the template to generate replies with sequence numbers and use // that to scanf them back in. // Escape '%' in consumer and stream names, as `pre` is used as a template later // in consumer.ackReply(), resulting in erroneous formatting of the ack subject. mn := strings.ReplaceAll(mset.cfg.Name, "%", "%%") pre := fmt.Sprintf(jsAckT, mn, strings.ReplaceAll(o.name, "%", "%%")) o.ackReplyT = fmt.Sprintf("%s.%%d.%%d.%%d.%%d.%%d", pre) o.ackSubj = fmt.Sprintf("%s.*.*.*.*.*", pre) o.nextMsgSubj = fmt.Sprintf(JSApiRequestNextT, mn, o.name) // Check/update the inactive threshold o.updateInactiveThreshold(&o.cfg) if o.isPushMode() { // Check if we are running only 1 replica and that the delivery subject has interest. // Check in place here for interest. Will setup properly in setLeader. if config.replicas(&mset.cfg) == 1 { r := o.acc.sl.Match(o.cfg.DeliverSubject) if !o.hasDeliveryInterest(len(r.psubs)+len(r.qsubs) > 0) { // Let the interest come to us eventually, but setup delete timer. o.updateDeliveryInterest(false) } } } // Set our ca. if ca != nil { o.setConsumerAssignment(ca) } // Check if we have a rate limit set. if config.RateLimit != 0 { o.setRateLimit(config.RateLimit) } mset.setConsumer(o) mset.mu.Unlock() if config.Direct || (!s.JetStreamIsClustered() && s.standAloneMode()) { o.setLeader(true) } // This is always true in single server mode. if o.IsLeader() { // Send advisory. var suppress bool if !s.standAloneMode() && ca == nil { suppress = true } else if ca != nil { suppress = ca.responded } if !suppress { o.sendCreateAdvisory() } } return o, nil } // Updates the consumer `dthresh` delete timer duration and set // cfg.InactiveThreshold to JsDeleteWaitTimeDefault for ephemerals // if not explicitly already specified by the user. // Lock should be held. func (o *consumer) updateInactiveThreshold(cfg *ConsumerConfig) { // Ephemerals will always have inactive thresholds. if !o.isDurable() && cfg.InactiveThreshold <= 0 { // Add in 1 sec of jitter above and beyond the default of 5s. o.dthresh = JsDeleteWaitTimeDefault + 100*time.Millisecond + time.Duration(rand.Int63n(900))*time.Millisecond // Only stamp config with default sans jitter. cfg.InactiveThreshold = JsDeleteWaitTimeDefault } else if cfg.InactiveThreshold > 0 { // Add in up to 1 sec of jitter if pull mode. if o.isPullMode() { o.dthresh = cfg.InactiveThreshold + 100*time.Millisecond + time.Duration(rand.Int63n(900))*time.Millisecond } else { o.dthresh = cfg.InactiveThreshold } } else if cfg.InactiveThreshold <= 0 { // We accept InactiveThreshold be set to 0 (for durables) o.dthresh = 0 } } func (o *consumer) consumerAssignment() *consumerAssignment { o.mu.RLock() defer o.mu.RUnlock() return o.ca } func (o *consumer) setConsumerAssignment(ca *consumerAssignment) { o.mu.Lock() defer o.mu.Unlock() o.ca = ca if ca == nil { return } // Set our node. o.node = ca.Group.node // Trigger update chan. select { case o.uch <- struct{}{}: default: } } func (o *consumer) updateC() <-chan struct{} { o.mu.RLock() defer o.mu.RUnlock() return o.uch } // checkQueueInterest will check on our interest's queue group status. // Lock should be held. func (o *consumer) checkQueueInterest() { if !o.active || o.cfg.DeliverSubject == _EMPTY_ { return } subj := o.dsubj if subj == _EMPTY_ { subj = o.cfg.DeliverSubject } if rr := o.acc.sl.Match(subj); len(rr.qsubs) > 0 { // Just grab first if qsubs := rr.qsubs[0]; len(qsubs) > 0 { if sub := rr.qsubs[0][0]; len(sub.queue) > 0 { o.qgroup = string(sub.queue) } } } } // clears our node if we have one. When we scale down to 1. func (o *consumer) clearNode() { o.mu.Lock() defer o.mu.Unlock() if o.node != nil { o.node.Delete() o.node = nil } } // IsLeader will return if we are the current leader. func (o *consumer) IsLeader() bool { o.mu.RLock() defer o.mu.RUnlock() return o.isLeader() } // Lock should be held. func (o *consumer) isLeader() bool { if o.node != nil { return o.node.Leader() } return true } func (o *consumer) setLeader(isLeader bool) { o.mu.RLock() mset := o.mset isRunning := o.ackSub != nil o.mu.RUnlock() // If we are here we have a change in leader status. if isLeader { if mset == nil { return } if isRunning { // If we detect we are scaling up, make sure to create clustered routines and channels. o.mu.Lock() if o.node != nil && o.pch == nil { // We are moving from R1 to clustered. o.pch = make(chan struct{}, 1) go o.loopAndForwardProposals(o.qch) if o.phead != nil { select { case o.pch <- struct{}{}: default: } } } o.mu.Unlock() return } mset.mu.RLock() s, jsa, stream, lseq := mset.srv, mset.jsa, mset.cfg.Name, mset.lseq mset.mu.RUnlock() // Register as a leader with our parent stream. mset.setConsumerAsLeader(o) o.mu.Lock() o.rdq = nil o.rdqi.Empty() // Restore our saved state. During non-leader status we just update our underlying store. o.readStoredState(lseq) // Setup initial num pending. o.streamNumPending() // Cleanup lss when we take over in clustered mode. if o.hasSkipListPending() && o.sseq >= o.lss.resume { o.lss = nil } // Update the group on the our starting sequence if we are starting but we skipped some in the stream. if o.dseq == 1 && o.sseq > 1 { o.updateSkipped(o.sseq) } // Do info sub. if o.infoSub == nil && jsa != nil { isubj := fmt.Sprintf(clusterConsumerInfoT, jsa.acc(), stream, o.name) // Note below the way we subscribe here is so that we can send requests to ourselves. o.infoSub, _ = s.systemSubscribe(isubj, _EMPTY_, false, o.sysc, o.handleClusterConsumerInfoRequest) } var err error if o.cfg.AckPolicy != AckNone { if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil { o.mu.Unlock() o.deleteWithoutAdvisory() return } } // Setup the internal sub for next message requests regardless. // Will error if wrong mode to provide feedback to users. if o.reqSub, err = o.subscribeInternal(o.nextMsgSubj, o.processNextMsgReq); err != nil { o.mu.Unlock() o.deleteWithoutAdvisory() return } // Check on flow control settings. if o.cfg.FlowControl { o.setMaxPendingBytes(JsFlowControlMaxPending) fcsubj := fmt.Sprintf(jsFlowControl, stream, o.name) if o.fcSub, err = o.subscribeInternal(fcsubj, o.processFlowControl); err != nil { o.mu.Unlock() o.deleteWithoutAdvisory() return } } // If push mode, register for notifications on interest. if o.isPushMode() { o.inch = make(chan bool, 8) o.acc.sl.registerNotification(o.cfg.DeliverSubject, o.cfg.DeliverGroup, o.inch) if o.active = <-o.inch; o.active { o.checkQueueInterest() } // Check gateways in case they are enabled. if s.gateway.enabled { if !o.active { o.active = s.hasGatewayInterest(o.acc.Name, o.cfg.DeliverSubject) } stopAndClearTimer(&o.gwdtmr) o.gwdtmr = time.AfterFunc(time.Second, func() { o.watchGWinterest() }) } } if o.dthresh > 0 && (o.isPullMode() || !o.active) { // Pull consumer. We run the dtmr all the time for this one. stopAndClearTimer(&o.dtmr) o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } // If we are not in ReplayInstant mode mark us as in replay state until resolved. if o.cfg.ReplayPolicy != ReplayInstant { o.replay = true } // Recreate quit channel. o.qch = make(chan struct{}) qch := o.qch node := o.node if node != nil && o.pch == nil { o.pch = make(chan struct{}, 1) } pullMode := o.isPullMode() o.mu.Unlock() // Snapshot initial info. o.infoWithSnap(true) // These are the labels we will use to annotate our goroutines. labels := pprofLabels{ "type": "consumer", "account": mset.accName(), "stream": mset.name(), "consumer": o.name, } // Now start up Go routine to deliver msgs. go func() { setGoRoutineLabels(labels) o.loopAndGatherMsgs(qch) }() // Now start up Go routine to process acks. go func() { setGoRoutineLabels(labels) o.processInboundAcks(qch) }() if pullMode { // Now start up Go routine to process inbound next message requests. go func() { setGoRoutineLabels(labels) o.processInboundNextMsgReqs(qch) }() } // If we are R>1 spin up our proposal loop. if node != nil { // Determine if we can send pending requests info to the group. // They must be on server versions >= 2.7.1 o.checkAndSetPendingRequestsOk() o.checkPendingRequests() go func() { setGoRoutineLabels(labels) o.loopAndForwardProposals(qch) }() } } else { // Shutdown the go routines and the subscriptions. o.mu.Lock() if o.qch != nil { close(o.qch) o.qch = nil } // Stop any inactivity timers. Should only be running on leaders. stopAndClearTimer(&o.dtmr) // Make sure to clear out any re-deliver queues stopAndClearTimer(&o.ptmr) o.rdq = nil o.rdqi.Empty() o.pending = nil // ok if they are nil, we protect inside unsubscribe() o.unsubscribe(o.ackSub) o.unsubscribe(o.reqSub) o.unsubscribe(o.fcSub) o.ackSub, o.reqSub, o.fcSub = nil, nil, nil if o.infoSub != nil { o.srv.sysUnsubscribe(o.infoSub) o.infoSub = nil } // Reset waiting if we are in pull mode. if o.isPullMode() { o.waiting = newWaitQueue(o.cfg.MaxWaiting) o.nextMsgReqs.drain() } else if o.srv.gateway.enabled { stopAndClearTimer(&o.gwdtmr) } o.mu.Unlock() // Unregister as a leader with our parent stream. if mset != nil { mset.removeConsumerAsLeader(o) } } } // This is coming on the wire so do not block here. func (o *consumer) handleClusterConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { go o.infoWithSnapAndReply(false, reply) } // Lock should be held. func (o *consumer) subscribeInternal(subject string, cb msgHandler) (*subscription, error) { c := o.client if c == nil { return nil, fmt.Errorf("invalid consumer") } if !c.srv.EventsEnabled() { return nil, ErrNoSysAccount } if cb == nil { return nil, fmt.Errorf("undefined message handler") } o.sid++ // Now create the subscription return c.processSub([]byte(subject), nil, []byte(strconv.Itoa(o.sid)), cb, false) } // Unsubscribe from our subscription. // Lock should be held. func (o *consumer) unsubscribe(sub *subscription) { if sub == nil || o.client == nil { return } o.client.processUnsub(sub.sid) } // We need to make sure we protect access to the outq. // Do all advisory sends here. func (o *consumer) sendAdvisory(subj string, msg []byte) { o.outq.sendMsg(subj, msg) } func (o *consumer) sendDeleteAdvisoryLocked() { e := JSConsumerActionAdvisory{ TypedEvent: TypedEvent{ Type: JSConsumerActionAdvisoryType, ID: nuid.Next(), Time: time.Now().UTC(), }, Stream: o.stream, Consumer: o.name, Action: DeleteEvent, Domain: o.srv.getOpts().JetStreamDomain, } j, err := json.Marshal(e) if err != nil { return } subj := JSAdvisoryConsumerDeletedPre + "." + o.stream + "." + o.name o.sendAdvisory(subj, j) } func (o *consumer) sendCreateAdvisory() { o.mu.Lock() defer o.mu.Unlock() e := JSConsumerActionAdvisory{ TypedEvent: TypedEvent{ Type: JSConsumerActionAdvisoryType, ID: nuid.Next(), Time: time.Now().UTC(), }, Stream: o.stream, Consumer: o.name, Action: CreateEvent, Domain: o.srv.getOpts().JetStreamDomain, } j, err := json.Marshal(e) if err != nil { return } subj := JSAdvisoryConsumerCreatedPre + "." + o.stream + "." + o.name o.sendAdvisory(subj, j) } // Created returns created time. func (o *consumer) createdTime() time.Time { o.mu.Lock() created := o.created o.mu.Unlock() return created } // Internal to allow creation time to be restored. func (o *consumer) setCreatedTime(created time.Time) { o.mu.Lock() o.created = created o.mu.Unlock() } // This will check for extended interest in a subject. If we have local interest we just return // that, but in the absence of local interest and presence of gateways or service imports we need // to check those as well. func (o *consumer) hasDeliveryInterest(localInterest bool) bool { o.mu.Lock() mset := o.mset if mset == nil { o.mu.Unlock() return false } acc := o.acc deliver := o.cfg.DeliverSubject o.mu.Unlock() if localInterest { return true } // If we are here check gateways. if s := acc.srv; s != nil && s.hasGatewayInterest(acc.Name, deliver) { return true } return false } func (s *Server) hasGatewayInterest(account, subject string) bool { gw := s.gateway if !gw.enabled { return false } gw.RLock() defer gw.RUnlock() for _, gwc := range gw.outo { psi, qr := gwc.gatewayInterest(account, subject) if psi || qr != nil { return true } } return false } // This processes an update to the local interest for a deliver subject. func (o *consumer) updateDeliveryInterest(localInterest bool) bool { interest := o.hasDeliveryInterest(localInterest) o.mu.Lock() defer o.mu.Unlock() mset := o.mset if mset == nil || o.isPullMode() { return false } if interest && !o.active { o.signalNewMessages() } // Update active status, if not active clear any queue group we captured. if o.active = interest; !o.active { o.qgroup = _EMPTY_ } else { o.checkQueueInterest() } // If the delete timer has already been set do not clear here and return. // Note that durable can now have an inactive threshold, so don't check // for durable status, instead check for dthresh > 0. if o.dtmr != nil && o.dthresh > 0 && !interest { return true } // Stop and clear the delete timer always. stopAndClearTimer(&o.dtmr) // If we do not have interest anymore and have a delete threshold set, then set // a timer to delete us. We wait for a bit in case of server reconnect. if !interest && o.dthresh > 0 { o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) return true } return false } func (o *consumer) deleteNotActive() { o.mu.Lock() if o.mset == nil { o.mu.Unlock() return } // Push mode just look at active. if o.isPushMode() { // If we are active simply return. if o.active { o.mu.Unlock() return } } else { // Pull mode. elapsed := time.Since(o.waiting.last) if elapsed <= o.cfg.InactiveThreshold { // These need to keep firing so reset but use delta. if o.dtmr != nil { o.dtmr.Reset(o.dthresh - elapsed) } else { o.dtmr = time.AfterFunc(o.dthresh-elapsed, o.deleteNotActive) } o.mu.Unlock() return } // Check if we still have valid requests waiting. if o.checkWaitingForInterest() { if o.dtmr != nil { o.dtmr.Reset(o.dthresh) } else { o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } o.mu.Unlock() return } } s, js := o.mset.srv, o.srv.js.Load() acc, stream, name, isDirect := o.acc.Name, o.stream, o.name, o.cfg.Direct o.mu.Unlock() // If we are clustered, check if we still have this consumer assigned. // If we do forward a proposal to delete ourselves to the metacontroller leader. if !isDirect && s.JetStreamIsClustered() { js.mu.RLock() var ( cca consumerAssignment meta RaftNode removeEntry []byte ) ca, cc := js.consumerAssignment(acc, stream, name), js.cluster if ca != nil && cc != nil { meta = cc.meta cca = *ca cca.Reply = _EMPTY_ removeEntry = encodeDeleteConsumerAssignment(&cca) meta.ForwardProposal(removeEntry) } js.mu.RUnlock() if ca != nil && cc != nil { // Check to make sure we went away. // Don't think this needs to be a monitored go routine. go func() { const ( startInterval = 30 * time.Second maxInterval = 5 * time.Minute ) jitter := time.Duration(rand.Int63n(int64(startInterval))) interval := startInterval + jitter ticker := time.NewTicker(interval) defer ticker.Stop() for range ticker.C { js.mu.RLock() if js.shuttingDown { js.mu.RUnlock() return } nca := js.consumerAssignment(acc, stream, name) js.mu.RUnlock() // Make sure this is not a new consumer with the same name. if nca != nil && nca == ca { s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name) meta.ForwardProposal(removeEntry) if interval < maxInterval { interval *= 2 ticker.Reset(interval) } continue } // We saw that consumer has been removed, all done. return } }() } } // We will delete here regardless. o.delete() } func (o *consumer) watchGWinterest() { pa := o.isActive() // If there is no local interest... if o.hasNoLocalInterest() { o.updateDeliveryInterest(false) if !pa && o.isActive() { o.signalNewMessages() } } // We want this to always be running so we can also pick up on interest returning. o.mu.Lock() if o.gwdtmr != nil { o.gwdtmr.Reset(time.Second) } else { stopAndClearTimer(&o.gwdtmr) o.gwdtmr = time.AfterFunc(time.Second, func() { o.watchGWinterest() }) } o.mu.Unlock() } // Config returns the consumer's configuration. func (o *consumer) config() ConsumerConfig { o.mu.Lock() defer o.mu.Unlock() return o.cfg } // Force expiration of all pending. // Lock should be held. func (o *consumer) forceExpirePending() { var expired []uint64 for seq := range o.pending { if !o.onRedeliverQueue(seq) { expired = append(expired, seq) } } if len(expired) > 0 { sort.Slice(expired, func(i, j int) bool { return expired[i] < expired[j] }) o.addToRedeliverQueue(expired...) // Now we should update the timestamp here since we are redelivering. // We will use an incrementing time to preserve order for any other redelivery. off := time.Now().UnixNano() - o.pending[expired[0]].Timestamp for _, seq := range expired { if p, ok := o.pending[seq]; ok && p != nil { p.Timestamp += off } } o.ptmr.Reset(o.ackWait(0)) } o.signalNewMessages() } // Acquire proper locks and update rate limit. // Will use what is in config. func (o *consumer) setRateLimitNeedsLocks() { o.mu.RLock() mset := o.mset o.mu.RUnlock() if mset == nil { return } mset.mu.RLock() o.mu.Lock() o.setRateLimit(o.cfg.RateLimit) o.mu.Unlock() mset.mu.RUnlock() } // Set the rate limiter // Both mset and consumer lock should be held. func (o *consumer) setRateLimit(bps uint64) { if bps == 0 { o.rlimit = nil return } // TODO(dlc) - Make sane values or error if not sane? // We are configured in bits per sec so adjust to bytes. rl := rate.Limit(bps / 8) mset := o.mset // Burst should be set to maximum msg size for this account, etc. var burst int if mset.cfg.MaxMsgSize > 0 { burst = int(mset.cfg.MaxMsgSize) } else if mset.jsa.account.limits.mpay > 0 { burst = int(mset.jsa.account.limits.mpay) } else { s := mset.jsa.account.srv burst = int(s.getOpts().MaxPayload) } o.rlimit = rate.NewLimiter(rl, burst) } // Check if new consumer config allowed vs old. func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error { if reflect.DeepEqual(cfg, ncfg) { return nil } // Something different, so check since we only allow certain things to be updated. if cfg.DeliverPolicy != ncfg.DeliverPolicy { return errors.New("deliver policy can not be updated") } if cfg.OptStartSeq != ncfg.OptStartSeq { return errors.New("start sequence can not be updated") } if cfg.OptStartTime != ncfg.OptStartTime { return errors.New("start time can not be updated") } if cfg.AckPolicy != ncfg.AckPolicy { return errors.New("ack policy can not be updated") } if cfg.ReplayPolicy != ncfg.ReplayPolicy { return errors.New("replay policy can not be updated") } if cfg.Heartbeat != ncfg.Heartbeat { return errors.New("heart beats can not be updated") } if cfg.FlowControl != ncfg.FlowControl { return errors.New("flow control can not be updated") } if cfg.MaxWaiting != ncfg.MaxWaiting { return errors.New("max waiting can not be updated") } // Deliver Subject is conditional on if its bound. if cfg.DeliverSubject != ncfg.DeliverSubject { if cfg.DeliverSubject == _EMPTY_ { return errors.New("can not update pull consumer to push based") } if ncfg.DeliverSubject == _EMPTY_ { return errors.New("can not update push consumer to pull based") } rr := acc.sl.Match(cfg.DeliverSubject) if len(rr.psubs)+len(rr.qsubs) != 0 { return NewJSConsumerNameExistError() } } // Check if BackOff is defined, MaxDeliver is within range. if lbo := len(ncfg.BackOff); lbo > 0 && ncfg.MaxDeliver <= lbo { return NewJSConsumerMaxDeliverBackoffError() } return nil } // Update the config based on the new config, or error if update not allowed. func (o *consumer) updateConfig(cfg *ConsumerConfig) error { o.mu.Lock() defer o.mu.Unlock() if err := o.acc.checkNewConsumerConfig(&o.cfg, cfg); err != nil { return err } if o.store != nil { // Update local state always. if err := o.store.UpdateConfig(cfg); err != nil { return err } } // DeliverSubject if cfg.DeliverSubject != o.cfg.DeliverSubject { o.updateDeliverSubjectLocked(cfg.DeliverSubject) } // MaxAckPending if cfg.MaxAckPending != o.cfg.MaxAckPending { o.maxp = cfg.MaxAckPending o.signalNewMessages() } // AckWait if cfg.AckWait != o.cfg.AckWait { if o.ptmr != nil { o.ptmr.Reset(100 * time.Millisecond) } } // Rate Limit if cfg.RateLimit != o.cfg.RateLimit { // We need both locks here so do in Go routine. go o.setRateLimitNeedsLocks() } if cfg.SampleFrequency != o.cfg.SampleFrequency { s := strings.TrimSuffix(cfg.SampleFrequency, "%") // String has been already verified for validity up in the stack, so no // need to check for error here. sampleFreq, _ := strconv.Atoi(s) o.sfreq = int32(sampleFreq) } // Set MaxDeliver if changed if cfg.MaxDeliver != o.cfg.MaxDeliver { o.maxdc = uint64(cfg.MaxDeliver) } // Set InactiveThreshold if changed. if val := cfg.InactiveThreshold; val != o.cfg.InactiveThreshold { o.updateInactiveThreshold(cfg) stopAndClearTimer(&o.dtmr) // Restart timer only if we are the leader. if o.isLeader() && o.dthresh > 0 { o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } } // Check for Subject Filters update. newSubjects := gatherSubjectFilters(cfg.FilterSubject, cfg.FilterSubjects) if !subjectSliceEqual(newSubjects, o.subjf.subjects()) { newSubjf := make(subjectFilters, 0, len(newSubjects)) for _, newFilter := range newSubjects { fs := &subjectFilter{ subject: newFilter, hasWildcard: subjectHasWildcard(newFilter), } // If given subject was present, we will retain its fields values // so `getNextMgs` can take advantage of already buffered `pmsgs`. for _, oldFilter := range o.subjf { if oldFilter.subject == newFilter { fs.currentSeq = oldFilter.currentSeq fs.nextSeq = oldFilter.nextSeq fs.pmsg = oldFilter.pmsg } continue } newSubjf = append(newSubjf, fs) } // Make sure we have correct signaling setup. // Consumer lock can not be held. mset := o.mset o.mu.Unlock() mset.swapSigSubs(o, newSubjf.subjects()) o.mu.Lock() // When we're done with signaling, we can replace the subjects. // If filters were removed, set `o.subjf` to nil. if len(newSubjf) == 0 { o.subjf = nil } else { o.subjf = newSubjf } } // Record new config for others that do not need special handling. // Allowed but considered no-op, [Description, SampleFrequency, MaxWaiting, HeadersOnly] o.cfg = *cfg // Re-calculate num pending on update. o.streamNumPending() return nil } // This is a config change for the delivery subject for a // push based consumer. func (o *consumer) updateDeliverSubject(newDeliver string) { // Update the config and the dsubj o.mu.Lock() defer o.mu.Unlock() o.updateDeliverSubjectLocked(newDeliver) } // This is a config change for the delivery subject for a // push based consumer. func (o *consumer) updateDeliverSubjectLocked(newDeliver string) { if o.closed || o.isPullMode() || o.cfg.DeliverSubject == newDeliver { return } // Force redeliver of all pending on change of delivery subject. if len(o.pending) > 0 { o.forceExpirePending() } o.acc.sl.clearNotification(o.dsubj, o.cfg.DeliverGroup, o.inch) o.dsubj, o.cfg.DeliverSubject = newDeliver, newDeliver // When we register new one it will deliver to update state loop. o.acc.sl.registerNotification(newDeliver, o.cfg.DeliverGroup, o.inch) } // Check that configs are equal but allow delivery subjects to be different. func configsEqualSansDelivery(a, b ConsumerConfig) bool { // These were copied in so can set Delivery here. a.DeliverSubject, b.DeliverSubject = _EMPTY_, _EMPTY_ return reflect.DeepEqual(a, b) } // Helper to send a reply to an ack. func (o *consumer) sendAckReply(subj string) { o.mu.Lock() defer o.mu.Unlock() o.sendAdvisory(subj, nil) } type jsAckMsg struct { subject string reply string hdr int msg []byte } var jsAckMsgPool sync.Pool func newJSAckMsg(subj, reply string, hdr int, msg []byte) *jsAckMsg { var m *jsAckMsg am := jsAckMsgPool.Get() if am != nil { m = am.(*jsAckMsg) } else { m = &jsAckMsg{} } // When getting something from a pool it is critical that all fields are // initialized. Doing this way guarantees that if someone adds a field to // the structure, the compiler will fail the build if this line is not updated. (*m) = jsAckMsg{subj, reply, hdr, msg} return m } func (am *jsAckMsg) returnToPool() { if am == nil { return } am.subject, am.reply, am.hdr, am.msg = _EMPTY_, _EMPTY_, -1, nil jsAckMsgPool.Put(am) } // Push the ack message to the consumer's ackMsgs queue func (o *consumer) pushAck(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { atomic.AddInt64(&o.awl, 1) o.ackMsgs.push(newJSAckMsg(subject, reply, c.pa.hdr, copyBytes(rmsg))) } // Processes a message for the ack reply subject delivered with a message. func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) { defer atomic.AddInt64(&o.awl, -1) var msg []byte if hdr > 0 { msg = rmsg[hdr:] } else { msg = rmsg } sseq, dseq, dc := ackReplyInfo(subject) skipAckReply := sseq == 0 switch { case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK): o.processAckMsg(sseq, dseq, dc, true) case bytes.HasPrefix(msg, AckNext): o.processAckMsg(sseq, dseq, dc, true) o.processNextMsgRequest(reply, msg[len(AckNext):]) skipAckReply = true case bytes.HasPrefix(msg, AckNak): o.processNak(sseq, dseq, dc, msg) case bytes.Equal(msg, AckProgress): o.progressUpdate(sseq) case bytes.Equal(msg, AckTerm): o.processTerm(sseq, dseq, dc) } // Ack the ack if requested. if len(reply) > 0 && !skipAckReply { o.sendAckReply(reply) } } // Used to process a working update to delay redelivery. func (o *consumer) progressUpdate(seq uint64) { o.mu.Lock() defer o.mu.Unlock() if p, ok := o.pending[seq]; ok { p.Timestamp = time.Now().UnixNano() // Update store system. o.updateDelivered(p.Sequence, seq, 1, p.Timestamp) } } // Lock should be held. func (o *consumer) updateSkipped(seq uint64) { // Clustered mode and R>1 only. if o.node == nil || !o.isLeader() { return } var b [1 + 8]byte b[0] = byte(updateSkipOp) var le = binary.LittleEndian le.PutUint64(b[1:], seq) o.propose(b[:]) } func (o *consumer) loopAndForwardProposals(qch chan struct{}) { o.mu.RLock() node, pch := o.node, o.pch o.mu.RUnlock() if node == nil || pch == nil { return } forwardProposals := func() error { o.mu.Lock() if o.node != node || node.State() != Leader { o.mu.Unlock() return errors.New("no longer leader") } proposal := o.phead o.phead, o.ptail = nil, nil o.mu.Unlock() // 256k max for now per batch. const maxBatch = 256 * 1024 var entries []*Entry for sz := 0; proposal != nil; proposal = proposal.next { entry := entryPool.Get().(*Entry) entry.Type, entry.Data = EntryNormal, proposal.data entries = append(entries, entry) sz += len(proposal.data) if sz > maxBatch { node.ProposeDirect(entries) // We need to re-create `entries` because there is a reference // to it in the node's pae map. sz, entries = 0, nil } } if len(entries) > 0 { node.ProposeDirect(entries) } return nil } // In case we have anything pending on entry. forwardProposals() for { select { case <-qch: forwardProposals() return case <-pch: if err := forwardProposals(); err != nil { return } } } } // Lock should be held. func (o *consumer) propose(entry []byte) { var notify bool p := &proposal{data: entry} if o.phead == nil { o.phead = p notify = true } else { o.ptail.next = p } o.ptail = p // Kick our looper routine if needed. if notify { select { case o.pch <- struct{}{}: default: } } } // Lock should be held. func (o *consumer) updateDelivered(dseq, sseq, dc uint64, ts int64) { // Clustered mode and R>1. if o.node != nil { // Inline for now, use variable compression. var b [4*binary.MaxVarintLen64 + 1]byte b[0] = byte(updateDeliveredOp) n := 1 n += binary.PutUvarint(b[n:], dseq) n += binary.PutUvarint(b[n:], sseq) n += binary.PutUvarint(b[n:], dc) n += binary.PutVarint(b[n:], ts) o.propose(b[:n]) } if o.store != nil { // Update local state always. o.store.UpdateDelivered(dseq, sseq, dc, ts) } // Update activity. o.ldt = time.Now() } // Lock should be held. func (o *consumer) updateAcks(dseq, sseq uint64) { if o.node != nil { // Inline for now, use variable compression. var b [2*binary.MaxVarintLen64 + 1]byte b[0] = byte(updateAcksOp) n := 1 n += binary.PutUvarint(b[n:], dseq) n += binary.PutUvarint(b[n:], sseq) o.propose(b[:n]) } else if o.store != nil { o.store.UpdateAcks(dseq, sseq) } // Update activity. o.lat = time.Now() } // Communicate to the cluster an addition of a pending request. // Lock should be held. func (o *consumer) addClusterPendingRequest(reply string) { if o.node == nil || !o.pendingRequestsOk() { return } b := make([]byte, len(reply)+1) b[0] = byte(addPendingRequest) copy(b[1:], reply) o.propose(b) } // Communicate to the cluster a removal of a pending request. // Lock should be held. func (o *consumer) removeClusterPendingRequest(reply string) { if o.node == nil || !o.pendingRequestsOk() { return } b := make([]byte, len(reply)+1) b[0] = byte(removePendingRequest) copy(b[1:], reply) o.propose(b) } // Set whether or not we can send pending requests to followers. func (o *consumer) setPendingRequestsOk(ok bool) { o.mu.Lock() o.prOk = ok o.mu.Unlock() } // Lock should be held. func (o *consumer) pendingRequestsOk() bool { return o.prOk } // Set whether or not we can send info about pending pull requests to our group. // Will require all peers have a minimum version. func (o *consumer) checkAndSetPendingRequestsOk() { o.mu.RLock() s, isValid := o.srv, o.mset != nil o.mu.RUnlock() if !isValid { return } if ca := o.consumerAssignment(); ca != nil && len(ca.Group.Peers) > 1 { for _, pn := range ca.Group.Peers { if si, ok := s.nodeToInfo.Load(pn); ok { if !versionAtLeast(si.(nodeInfo).version, 2, 7, 1) { // We expect all of our peers to eventually be up to date. // So check again in awhile. time.AfterFunc(eventsHBInterval, func() { o.checkAndSetPendingRequestsOk() }) o.setPendingRequestsOk(false) return } } } } o.setPendingRequestsOk(true) } // On leadership change make sure we alert the pending requests that they are no longer valid. func (o *consumer) checkPendingRequests() { o.mu.Lock() defer o.mu.Unlock() if o.mset == nil || o.outq == nil { return } hdr := []byte("NATS/1.0 409 Leadership Change\r\n\r\n") for reply := range o.prm { o.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) } o.prm = nil } // This will release any pending pull requests if applicable. // Should be called only by the leader being deleted or stopped. // Lock should be held. func (o *consumer) releaseAnyPendingRequests() { if o.mset == nil || o.outq == nil || o.waiting.len() == 0 { return } hdr := []byte("NATS/1.0 409 Consumer Deleted\r\n\r\n") wq := o.waiting o.waiting = nil for i, rp := 0, wq.rp; i < wq.n; i++ { if wr := wq.reqs[rp]; wr != nil { o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) wr.recycle() } rp = (rp + 1) % cap(wq.reqs) } } // Process a NAK. func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) { o.mu.Lock() defer o.mu.Unlock() // Check for out of range. if dseq <= o.adflr || dseq > o.dseq { return } // If we are explicit ack make sure this is still on our pending list. if _, ok := o.pending[sseq]; !ok { return } // Deliver an advisory e := JSConsumerDeliveryNakAdvisory{ TypedEvent: TypedEvent{ Type: JSConsumerDeliveryNakAdvisoryType, ID: nuid.Next(), Time: time.Now().UTC(), }, Stream: o.stream, Consumer: o.name, ConsumerSeq: dseq, StreamSeq: sseq, Deliveries: dc, Domain: o.srv.getOpts().JetStreamDomain, } j, err := json.Marshal(e) if err != nil { return } o.sendAdvisory(o.nakEventT, j) // Check to see if we have delays attached. if len(nak) > len(AckNak) { arg := bytes.TrimSpace(nak[len(AckNak):]) if len(arg) > 0 { var d time.Duration var err error if arg[0] == '{' { var nd ConsumerNakOptions if err = json.Unmarshal(arg, &nd); err == nil { d = nd.Delay } } else { d, err = time.ParseDuration(string(arg)) } if err != nil { // Treat this as normal NAK. o.srv.Warnf("JetStream consumer '%s > %s > %s' bad NAK delay value: %q", o.acc.Name, o.stream, o.name, arg) } else { // We have a parsed duration that the user wants us to wait before retrying. // Make sure we are not on the rdq. o.removeFromRedeliverQueue(sseq) if p, ok := o.pending[sseq]; ok { // now - ackWait is expired now, so offset from there. p.Timestamp = time.Now().Add(-o.cfg.AckWait).Add(d).UnixNano() // Update store system which will update followers as well. o.updateDelivered(p.Sequence, sseq, dc, p.Timestamp) if o.ptmr != nil { // Want checkPending to run and figure out the next timer ttl. // TODO(dlc) - We could optimize this maybe a bit more and track when we expect the timer to fire. o.ptmr.Reset(10 * time.Millisecond) } } // Nothing else for use to do now so return. return } } } // If already queued up also ignore. if !o.onRedeliverQueue(sseq) { o.addToRedeliverQueue(sseq) } o.signalNewMessages() } // Process a TERM func (o *consumer) processTerm(sseq, dseq, dc uint64) { // Treat like an ack to suppress redelivery. o.processAckMsg(sseq, dseq, dc, false) o.mu.Lock() defer o.mu.Unlock() // Deliver an advisory e := JSConsumerDeliveryTerminatedAdvisory{ TypedEvent: TypedEvent{ Type: JSConsumerDeliveryTerminatedAdvisoryType, ID: nuid.Next(), Time: time.Now().UTC(), }, Stream: o.stream, Consumer: o.name, ConsumerSeq: dseq, StreamSeq: sseq, Deliveries: dc, Domain: o.srv.getOpts().JetStreamDomain, } j, err := json.Marshal(e) if err != nil { return } subj := JSAdvisoryConsumerMsgTerminatedPre + "." + o.stream + "." + o.name o.sendAdvisory(subj, j) } // Introduce a small delay in when timer fires to check pending. // Allows bursts to be treated in same time frame. const ackWaitDelay = time.Millisecond // ackWait returns how long to wait to fire the pending timer. func (o *consumer) ackWait(next time.Duration) time.Duration { if next > 0 { return next + ackWaitDelay } return o.cfg.AckWait + ackWaitDelay } // Due to bug in calculation of sequences on restoring redelivered let's do quick sanity check. // Lock should be held. func (o *consumer) checkRedelivered(slseq uint64) { var lseq uint64 if mset := o.mset; mset != nil { lseq = slseq } var shouldUpdateState bool for sseq := range o.rdc { if sseq < o.asflr || (lseq > 0 && sseq > lseq) { delete(o.rdc, sseq) o.removeFromRedeliverQueue(sseq) shouldUpdateState = true } } if shouldUpdateState { if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed { s, acc, mset, name := o.srv, o.acc, o.mset, o.name s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.cfg.Name, name, err) } } } // This will restore the state from disk. // Lock should be held. func (o *consumer) readStoredState(slseq uint64) error { if o.store == nil { return nil } state, err := o.store.State() if err == nil { o.applyState(state) if len(o.rdc) > 0 { o.checkRedelivered(slseq) } } return err } // Apply the consumer stored state. // Lock should be held. func (o *consumer) applyState(state *ConsumerState) { if state == nil { return } // If o.sseq is greater don't update. Don't go backwards on o.sseq. if o.sseq <= state.Delivered.Stream { o.sseq = state.Delivered.Stream + 1 } o.dseq = state.Delivered.Consumer + 1 o.adflr = state.AckFloor.Consumer o.asflr = state.AckFloor.Stream o.pending = state.Pending o.rdc = state.Redelivered // Setup tracking timer if we have restored pending. if len(o.pending) > 0 { // This is on startup or leader change. We want to check pending // sooner in case there are inconsistencies etc. Pick between 500ms - 1.5s delay := 500*time.Millisecond + time.Duration(rand.Int63n(1000))*time.Millisecond // If normal is lower than this just use that. if o.cfg.AckWait < delay { delay = o.ackWait(0) } if o.ptmr == nil { o.ptmr = time.AfterFunc(delay, o.checkPending) } else { o.ptmr.Reset(delay) } } } // Sets our store state from another source. Used in clustered mode on snapshot restore. // Lock should be held. func (o *consumer) setStoreState(state *ConsumerState) error { if state == nil || o.store == nil { return nil } o.applyState(state) return o.store.Update(state) } // Update our state to the store. func (o *consumer) writeStoreState() error { o.mu.Lock() defer o.mu.Unlock() return o.writeStoreStateUnlocked() } // Update our state to the store. // Lock should be held. func (o *consumer) writeStoreStateUnlocked() error { if o.store == nil { return nil } state := ConsumerState{ Delivered: SequencePair{ Consumer: o.dseq - 1, Stream: o.sseq - 1, }, AckFloor: SequencePair{ Consumer: o.adflr, Stream: o.asflr, }, Pending: o.pending, Redelivered: o.rdc, } return o.store.Update(&state) } // Returns an initial info. Only applicable for non-clustered consumers. // We will clear after we return it, so one shot. func (o *consumer) initialInfo() *ConsumerInfo { o.mu.Lock() ici := o.ici o.ici = nil // gc friendly o.mu.Unlock() if ici == nil { ici = o.info() } return ici } // Clears our initial info. // Used when we have a leader change in cluster mode but do not send a response. func (o *consumer) clearInitialInfo() { o.mu.Lock() o.ici = nil // gc friendly o.mu.Unlock() } // Info returns our current consumer state. func (o *consumer) info() *ConsumerInfo { return o.infoWithSnap(false) } func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo { return o.infoWithSnapAndReply(snap, _EMPTY_) } func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo { o.mu.Lock() mset := o.mset if o.closed || mset == nil || mset.srv == nil { o.mu.Unlock() return nil } js := o.js if js == nil { o.mu.Unlock() return nil } // Capture raftGroup. var rg *raftGroup if o.ca != nil { rg = o.ca.Group } cfg := o.cfg info := &ConsumerInfo{ Stream: o.stream, Name: o.name, Created: o.created, Config: &cfg, Delivered: SequenceInfo{ Consumer: o.dseq - 1, Stream: o.sseq - 1, }, AckFloor: SequenceInfo{ Consumer: o.adflr, Stream: o.asflr, }, NumAckPending: len(o.pending), NumRedelivered: len(o.rdc), NumPending: o.checkNumPending(), PushBound: o.isPushMode() && o.active, TimeStamp: time.Now().UTC(), } // If we are replicated and we are not the leader we need to pull certain data from our store. if rg != nil && rg.node != nil && !o.isLeader() && o.store != nil { state, _ := o.store.BorrowState() info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream info.NumAckPending = len(state.Pending) info.NumRedelivered = len(state.Redelivered) } // Adjust active based on non-zero etc. Also make UTC here. if !o.ldt.IsZero() { ldt := o.ldt.UTC() // This copies as well. info.Delivered.Last = &ldt } if !o.lat.IsZero() { lat := o.lat.UTC() // This copies as well. info.AckFloor.Last = &lat } // If we are a pull mode consumer, report on number of waiting requests. if o.isPullMode() { o.processWaiting(false) info.NumWaiting = o.waiting.len() } // If we were asked to snapshot do so here. if snap { o.ici = info } sysc := o.sysc o.mu.Unlock() // Do cluster. if rg != nil { info.Cluster = js.clusterInfo(rg) } // If we have a reply subject send the response here. if reply != _EMPTY_ && sysc != nil { sysc.sendInternalMsg(reply, _EMPTY_, nil, info) } return info } // Will signal us that new messages are available. Will break out of waiting. func (o *consumer) signalNewMessages() { // Kick our new message channel select { case o.mch <- struct{}{}: default: } } // shouldSample lets us know if we are sampling metrics on acks. func (o *consumer) shouldSample() bool { switch { case o.sfreq <= 0: return false case o.sfreq >= 100: return true } // TODO(ripienaar) this is a tad slow so we need to rethink here, however this will only // hit for those with sampling enabled and its not the default return rand.Int31n(100) <= o.sfreq } func (o *consumer) sampleAck(sseq, dseq, dc uint64) { if !o.shouldSample() { return } now := time.Now().UTC() unow := now.UnixNano() e := JSConsumerAckMetric{ TypedEvent: TypedEvent{ Type: JSConsumerAckMetricType, ID: nuid.Next(), Time: now, }, Stream: o.stream, Consumer: o.name, ConsumerSeq: dseq, StreamSeq: sseq, Delay: unow - o.pending[sseq].Timestamp, Deliveries: dc, Domain: o.srv.getOpts().JetStreamDomain, } j, err := json.Marshal(e) if err != nil { return } o.sendAdvisory(o.ackEventT, j) } func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) { o.mu.Lock() if o.closed { o.mu.Unlock() return } var sagap uint64 var needSignal bool switch o.cfg.AckPolicy { case AckExplicit: if p, ok := o.pending[sseq]; ok { if doSample { o.sampleAck(sseq, dseq, dc) } if o.maxp > 0 && len(o.pending) >= o.maxp { needSignal = true } delete(o.pending, sseq) // Use the original deliver sequence from our pending record. dseq = p.Sequence } if len(o.pending) == 0 { o.adflr, o.asflr = o.dseq-1, o.sseq-1 } else if dseq == o.adflr+1 { o.adflr, o.asflr = dseq, sseq for ss := sseq + 1; ss < o.sseq; ss++ { if p, ok := o.pending[ss]; ok { if p.Sequence > 0 { o.adflr, o.asflr = p.Sequence-1, ss-1 } break } } } // We do these regardless. delete(o.rdc, sseq) o.removeFromRedeliverQueue(sseq) case AckAll: // no-op if dseq <= o.adflr || sseq <= o.asflr { o.mu.Unlock() return } if o.maxp > 0 && len(o.pending) >= o.maxp { needSignal = true } sagap = sseq - o.asflr o.adflr, o.asflr = dseq, sseq for seq := sseq; seq > sseq-sagap; seq-- { delete(o.pending, seq) delete(o.rdc, seq) o.removeFromRedeliverQueue(seq) } case AckNone: // FIXME(dlc) - This is error but do we care? o.mu.Unlock() return } // Update underlying store. o.updateAcks(dseq, sseq) mset := o.mset clustered := o.node != nil // In case retention changes for a stream, this ought to have been updated // using the consumer lock to avoid a race. retention := o.retention o.mu.Unlock() // Let the owning stream know if we are interest or workqueue retention based. // If this consumer is clustered this will be handled by processReplicatedAck // after the ack has propagated. if !clustered && mset != nil && retention != LimitsPolicy { if sagap > 1 { // FIXME(dlc) - This is very inefficient, will need to fix. for seq := sseq; seq > sseq-sagap; seq-- { mset.ackMsg(o, seq) } } else { mset.ackMsg(o, sseq) } } // If we had max ack pending set and were at limit we need to unblock ourselves. if needSignal { o.signalNewMessages() } } // Determine if this is a truly filtered consumer. Modern clients will place filtered subjects // even if the stream only has a single non-wildcard subject designation. // Read lock should be held. func (o *consumer) isFiltered() bool { if o.subjf == nil { return false } // If we are here we want to check if the filtered subject is // a direct match for our only listed subject. mset := o.mset if mset == nil { return true } // `isFiltered` need to be performant, so we do // as any checks as possible to avoid unnecessary work. // Here we avoid iteration over slices if there is only one subject in stream // and one filter for the consumer. if len(mset.cfg.Subjects) == 1 && len(o.subjf) == 1 { return mset.cfg.Subjects[0] != o.subjf[0].subject } // if the list is not equal length, we can return early, as this is filtered. if len(mset.cfg.Subjects) != len(o.subjf) { return true } // if in rare case scenario that user passed all stream subjects as consumer filters, // we need to do a more expensive operation. // reflect.DeepEqual would return false if the filters are the same, but in different order // so it can't be used here. cfilters := make(map[string]struct{}, len(o.subjf)) for _, val := range o.subjf { cfilters[val.subject] = struct{}{} } for _, val := range mset.cfg.Subjects { if _, ok := cfilters[val]; !ok { return true } } return false } // Check if we need an ack for this store seq. // This is called for interest based retention streams to remove messages. func (o *consumer) needAck(sseq uint64, subj string) bool { var needAck bool var asflr, osseq uint64 var pending map[uint64]*Pending o.mu.RLock() defer o.mu.RUnlock() isFiltered := o.isFiltered() if isFiltered && o.mset == nil { return false } // Check if we are filtered, and if so check if this is even applicable to us. if isFiltered { if subj == _EMPTY_ { var svp StoreMsg if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil { return false } subj = svp.subj } if !o.isFilteredMatch(subj) { return false } } if o.isLeader() { asflr, osseq = o.asflr, o.sseq pending = o.pending } else { if o.store == nil { return false } state, err := o.store.BorrowState() if err != nil || state == nil { // Fall back to what we track internally for now. return sseq > o.asflr && !o.isFiltered() } // If loading state as here, the osseq is +1. asflr, osseq, pending = state.AckFloor.Stream, state.Delivered.Stream+1, state.Pending } switch o.cfg.AckPolicy { case AckNone, AckAll: needAck = sseq > asflr case AckExplicit: if sseq > asflr { if sseq >= osseq { needAck = true } else { _, needAck = pending[sseq] } } } return needAck } // Helper for the next message requests. func nextReqFromMsg(msg []byte) (time.Time, int, int, bool, time.Duration, time.Time, error) { req := bytes.TrimSpace(msg) switch { case len(req) == 0: return time.Time{}, 1, 0, false, 0, time.Time{}, nil case req[0] == '{': var cr JSApiConsumerGetNextRequest if err := json.Unmarshal(req, &cr); err != nil { return time.Time{}, -1, 0, false, 0, time.Time{}, err } var hbt time.Time if cr.Heartbeat > 0 { if cr.Heartbeat*2 > cr.Expires { return time.Time{}, 1, 0, false, 0, time.Time{}, errors.New("heartbeat value too large") } hbt = time.Now().Add(cr.Heartbeat) } if cr.Expires == time.Duration(0) { return time.Time{}, cr.Batch, cr.MaxBytes, cr.NoWait, cr.Heartbeat, hbt, nil } return time.Now().Add(cr.Expires), cr.Batch, cr.MaxBytes, cr.NoWait, cr.Heartbeat, hbt, nil default: if n, err := strconv.Atoi(string(req)); err == nil { return time.Time{}, n, 0, false, 0, time.Time{}, nil } } return time.Time{}, 1, 0, false, 0, time.Time{}, nil } // Represents a request that is on the internal waiting queue type waitingRequest struct { acc *Account interest string reply string n int // For batching d int b int // For max bytes tracking. expires time.Time received time.Time hb time.Duration hbt time.Time noWait bool } // sync.Pool for waiting requests. var wrPool = sync.Pool{ New: func() interface{} { return new(waitingRequest) }, } // Recycle this request. This request can not be accessed after this call. func (wr *waitingRequest) recycleIfDone() bool { if wr != nil && wr.n <= 0 { wr.recycle() return true } return false } // Force a recycle. func (wr *waitingRequest) recycle() { if wr != nil { wr.acc, wr.interest, wr.reply = nil, _EMPTY_, _EMPTY_ wrPool.Put(wr) } } // waiting queue for requests that are waiting for new messages to arrive. type waitQueue struct { rp, wp, n int last time.Time reqs []*waitingRequest } // Create a new ring buffer with at most max items. func newWaitQueue(max int) *waitQueue { return &waitQueue{rp: -1, reqs: make([]*waitingRequest, max)} } var ( errWaitQueueFull = errors.New("wait queue is full") errWaitQueueNil = errors.New("wait queue is nil") ) // Adds in a new request. func (wq *waitQueue) add(wr *waitingRequest) error { if wq == nil { return errWaitQueueNil } if wq.isFull() { return errWaitQueueFull } wq.reqs[wq.wp] = wr // TODO(dlc) - Could make pow2 and get rid of mod. wq.wp = (wq.wp + 1) % cap(wq.reqs) // Adjust read pointer if we were empty. if wq.rp < 0 { wq.rp = 0 } // Track last active via when we receive a request. wq.last = wr.received wq.n++ return nil } func (wq *waitQueue) isFull() bool { return wq.n == cap(wq.reqs) } func (wq *waitQueue) isEmpty() bool { return wq.len() == 0 } func (wq *waitQueue) len() int { if wq == nil { return 0 } return wq.n } // Peek will return the next request waiting or nil if empty. func (wq *waitQueue) peek() *waitingRequest { if wq == nil { return nil } var wr *waitingRequest if wq.rp >= 0 { wr = wq.reqs[wq.rp] } return wr } // pop will return the next request and move the read cursor. // This will now place a request that still has pending items at the ends of the list. func (wq *waitQueue) pop() *waitingRequest { wr := wq.peek() if wr != nil { wr.d++ wr.n-- // Always remove current now on a pop, and move to end if still valid. // If we were the only one don't need to remove since this can be a no-op. if wr.n > 0 && wq.n > 1 { wq.removeCurrent() wq.add(wr) } else if wr.n <= 0 { wq.removeCurrent() } } return wr } // Removes the current read pointer (head FIFO) entry. func (wq *waitQueue) removeCurrent() { if wq.rp < 0 { return } wq.reqs[wq.rp] = nil wq.rp = (wq.rp + 1) % cap(wq.reqs) wq.n-- // Check if we are empty. if wq.n == 0 { wq.rp, wq.wp = -1, 0 } } // Will compact when we have interior deletes. func (wq *waitQueue) compact() { if wq.isEmpty() { return } nreqs, i := make([]*waitingRequest, cap(wq.reqs)), 0 for j, rp := 0, wq.rp; j < wq.n; j++ { if wr := wq.reqs[rp]; wr != nil { nreqs[i] = wr i++ } rp = (rp + 1) % cap(wq.reqs) } // Reset here. wq.rp, wq.wp, wq.n, wq.reqs = 0, i, i, nreqs } // Return the map of pending requests keyed by the reply subject. // No-op if push consumer or invalid etc. func (o *consumer) pendingRequests() map[string]*waitingRequest { if o.waiting == nil { return nil } wq, m := o.waiting, make(map[string]*waitingRequest) for i, rp := 0, wq.rp; i < wq.n; i++ { if wr := wq.reqs[rp]; wr != nil { m[wr.reply] = wr } rp = (rp + 1) % cap(wq.reqs) } return m } // Return next waiting request. This will check for expirations but not noWait or interest. // That will be handled by processWaiting. // Lock should be held. func (o *consumer) nextWaiting(sz int) *waitingRequest { if o.waiting == nil || o.waiting.isEmpty() { return nil } for wr := o.waiting.peek(); !o.waiting.isEmpty(); wr = o.waiting.peek() { if wr == nil { break } // Check if we have max bytes set. if wr.b > 0 { if sz <= wr.b { wr.b -= sz // If we are right now at zero, set batch to 1 to deliver this one but stop after. if wr.b == 0 { wr.n = 1 } } else { // Since we can't send that message to the requestor, we need to // notify that we are closing the request. const maxBytesT = "NATS/1.0 409 Message Size Exceeds MaxBytes\r\n%s: %d\r\n%s: %d\r\n\r\n" hdr := []byte(fmt.Sprintf(maxBytesT, JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)) o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) // Remove the current one, no longer valid due to max bytes limit. o.waiting.removeCurrent() if o.node != nil { o.removeClusterPendingRequest(wr.reply) } wr.recycle() continue } } if wr.expires.IsZero() || time.Now().Before(wr.expires) { rr := wr.acc.sl.Match(wr.interest) if len(rr.psubs)+len(rr.qsubs) > 0 { return o.waiting.pop() } else if time.Since(wr.received) < defaultGatewayRecentSubExpiration && (o.srv.leafNodeEnabled || o.srv.gateway.enabled) { return o.waiting.pop() } else if o.srv.gateway.enabled && o.srv.hasGatewayInterest(wr.acc.Name, wr.interest) { return o.waiting.pop() } } else { // We do check for expiration in `processWaiting`, but it is possible to hit the expiry here, and not there. hdr := []byte(fmt.Sprintf("NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)) o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) o.waiting.removeCurrent() if o.node != nil { o.removeClusterPendingRequest(wr.reply) } wr.recycle() continue } if wr.interest != wr.reply { const intExpT = "NATS/1.0 408 Interest Expired\r\n%s: %d\r\n%s: %d\r\n\r\n" hdr := []byte(fmt.Sprintf(intExpT, JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)) o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) } // Remove the current one, no longer valid. o.waiting.removeCurrent() if o.node != nil { o.removeClusterPendingRequest(wr.reply) } wr.recycle() } return nil } // Next message request. type nextMsgReq struct { reply string msg []byte } var nextMsgReqPool sync.Pool func newNextMsgReq(reply string, msg []byte) *nextMsgReq { var nmr *nextMsgReq m := nextMsgReqPool.Get() if m != nil { nmr = m.(*nextMsgReq) } else { nmr = &nextMsgReq{} } // When getting something from a pool it is critical that all fields are // initialized. Doing this way guarantees that if someone adds a field to // the structure, the compiler will fail the build if this line is not updated. (*nmr) = nextMsgReq{reply, msg} return nmr } func (nmr *nextMsgReq) returnToPool() { if nmr == nil { return } nmr.reply, nmr.msg = _EMPTY_, nil nextMsgReqPool.Put(nmr) } // processNextMsgReq will process a request for the next message available. A nil message payload means deliver // a single message. If the payload is a formal request or a number parseable with Atoi(), then we will send a // batch of messages without requiring another request to this endpoint, or an ACK. func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _, reply string, msg []byte) { if reply == _EMPTY_ { return } // Short circuit error here. if o.nextMsgReqs == nil { hdr := []byte("NATS/1.0 409 Consumer is push based\r\n\r\n") o.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) return } _, msg = c.msgParts(msg) o.nextMsgReqs.push(newNextMsgReq(reply, copyBytes(msg))) } func (o *consumer) processNextMsgRequest(reply string, msg []byte) { o.mu.Lock() defer o.mu.Unlock() mset := o.mset if mset == nil { return } sendErr := func(status int, description string) { hdr := []byte(fmt.Sprintf("NATS/1.0 %d %s\r\n\r\n", status, description)) o.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) } if o.isPushMode() || o.waiting == nil { sendErr(409, "Consumer is push based") return } // Check payload here to see if they sent in batch size or a formal request. expires, batchSize, maxBytes, noWait, hb, hbt, err := nextReqFromMsg(msg) if err != nil { sendErr(400, fmt.Sprintf("Bad Request - %v", err)) return } // Check for request limits if o.cfg.MaxRequestBatch > 0 && batchSize > o.cfg.MaxRequestBatch { sendErr(409, fmt.Sprintf("Exceeded MaxRequestBatch of %d", o.cfg.MaxRequestBatch)) return } if !expires.IsZero() && o.cfg.MaxRequestExpires > 0 && expires.After(time.Now().Add(o.cfg.MaxRequestExpires)) { sendErr(409, fmt.Sprintf("Exceeded MaxRequestExpires of %v", o.cfg.MaxRequestExpires)) return } if maxBytes > 0 && o.cfg.MaxRequestMaxBytes > 0 && maxBytes > o.cfg.MaxRequestMaxBytes { sendErr(409, fmt.Sprintf("Exceeded MaxRequestMaxBytes of %v", o.cfg.MaxRequestMaxBytes)) return } // If we have the max number of requests already pending try to expire. if o.waiting.isFull() { // Try to expire some of the requests. o.processWaiting(false) } // If the request is for noWait and we have pending requests already, check if we have room. if noWait { msgsPending := o.numPending() + uint64(len(o.rdq)) // If no pending at all, decide what to do with request. // If no expires was set then fail. if msgsPending == 0 && expires.IsZero() { o.waiting.last = time.Now() sendErr(404, "No Messages") return } if msgsPending > 0 { _, _, batchPending, _ := o.processWaiting(false) if msgsPending < uint64(batchPending) { o.waiting.last = time.Now() sendErr(408, "Requests Pending") return } } // If we are here this should be considered a one-shot situation. // We will wait for expires but will return as soon as we have any messages. } // If we receive this request though an account export, we need to track that interest subject and account. acc, interest := trackDownAccountAndInterest(o.acc, reply) // Create a waiting request. wr := wrPool.Get().(*waitingRequest) wr.acc, wr.interest, wr.reply, wr.n, wr.d, wr.noWait, wr.expires, wr.hb, wr.hbt = acc, interest, reply, batchSize, 0, noWait, expires, hb, hbt wr.b = maxBytes wr.received = time.Now() if err := o.waiting.add(wr); err != nil { sendErr(409, "Exceeded MaxWaiting") return } o.signalNewMessages() // If we are clustered update our followers about this request. if o.node != nil { o.addClusterPendingRequest(wr.reply) } } func trackDownAccountAndInterest(acc *Account, interest string) (*Account, string) { for strings.HasPrefix(interest, replyPrefix) { oa := acc oa.mu.RLock() if oa.exports.responses == nil { oa.mu.RUnlock() break } si := oa.exports.responses[interest] if si == nil { oa.mu.RUnlock() break } acc, interest = si.acc, si.to oa.mu.RUnlock() } return acc, interest } // Increase the delivery count for this message. // ONLY used on redelivery semantics. // Lock should be held. func (o *consumer) incDeliveryCount(sseq uint64) uint64 { if o.rdc == nil { o.rdc = make(map[uint64]uint64) } o.rdc[sseq] += 1 return o.rdc[sseq] + 1 } // Used if we have to adjust on failed delivery or bad lookups. // Those failed attempts should not increase deliver count. // Lock should be held. func (o *consumer) decDeliveryCount(sseq uint64) { if o.rdc == nil { return } if dc, ok := o.rdc[sseq]; ok { if dc == 1 { delete(o.rdc, sseq) } else { o.rdc[sseq] -= 1 } } } // send a delivery exceeded advisory. func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) { e := JSConsumerDeliveryExceededAdvisory{ TypedEvent: TypedEvent{ Type: JSConsumerDeliveryExceededAdvisoryType, ID: nuid.Next(), Time: time.Now().UTC(), }, Stream: o.stream, Consumer: o.name, StreamSeq: sseq, Deliveries: dc, Domain: o.srv.getOpts().JetStreamDomain, } j, err := json.Marshal(e) if err != nil { return } o.sendAdvisory(o.deliveryExcEventT, j) } // Check to see if the candidate subject matches a filter if its present. // Lock should be held. func (o *consumer) isFilteredMatch(subj string) bool { // No filter is automatic match. if o.subjf == nil { return true } for _, filter := range o.subjf { if !filter.hasWildcard && subj == filter.subject { return true } } // It's quicker to first check for non-wildcard filters, then // iterate again to check for subset match. // TODO(dlc) at speed might be better to just do a sublist with L2 and/or possibly L1. for _, filter := range o.subjf { if subjectIsSubsetMatch(subj, filter.subject) { return true } } return false } var ( errMaxAckPending = errors.New("max ack pending reached") errBadConsumer = errors.New("consumer not valid") errNoInterest = errors.New("consumer requires interest for delivery subject when ephemeral") ) // Get next available message from underlying store. // Is partition aware and redeliver aware. // Lock should be held. func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { if o.mset == nil || o.mset.store == nil { return nil, 0, errBadConsumer } // Process redelivered messages before looking at possibly "skip list" (deliver last per subject) if o.hasRedeliveries() { var seq, dc uint64 for seq = o.getNextToRedeliver(); seq > 0; seq = o.getNextToRedeliver() { dc = o.incDeliveryCount(seq) if o.maxdc > 0 && dc > o.maxdc { // Only send once if dc == o.maxdc+1 { o.notifyDeliveryExceeded(seq, dc-1) } // Make sure to remove from pending. if p, ok := o.pending[seq]; ok && p != nil { delete(o.pending, seq) o.updateDelivered(p.Sequence, seq, dc, p.Timestamp) } continue } if seq > 0 { pmsg := getJSPubMsgFromPool() sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg) if sm == nil || err != nil { pmsg.returnToPool() pmsg, dc = nil, 0 // Adjust back deliver count. o.decDeliveryCount(seq) } return pmsg, dc, err } } } // Check if we have max pending. if o.maxp > 0 && len(o.pending) >= o.maxp { // maxp only set when ack policy != AckNone and user set MaxAckPending // Stall if we have hit max pending. return nil, 0, errMaxAckPending } if o.hasSkipListPending() { seq := o.lss.seqs[0] if len(o.lss.seqs) == 1 { o.sseq = o.lss.resume o.lss = nil o.updateSkipped(o.sseq) } else { o.lss.seqs = o.lss.seqs[1:] } pmsg := getJSPubMsgFromPool() sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg) if sm == nil || err != nil { pmsg.returnToPool() } o.sseq++ return pmsg, 1, err } // Hold onto this since we release the lock. store := o.mset.store // If no filters are specified, optimize to fetch just non-filtered messages. if len(o.subjf) == 0 { // Grab next message applicable to us. // We will unlock here in case lots of contention, e.g. WQ. o.mu.Unlock() pmsg := getJSPubMsgFromPool() sm, sseq, err := store.LoadNextMsg(_EMPTY_, false, o.sseq, &pmsg.StoreMsg) if sm == nil { pmsg.returnToPool() pmsg = nil } o.mu.Lock() if sseq >= o.sseq { o.sseq = sseq + 1 if err == ErrStoreEOF { o.updateSkipped(o.sseq) } } return pmsg, 1, err } // if we have filters, iterate over filters and optimize by buffering found messages. for _, filter := range o.subjf { if filter.nextSeq < o.sseq { // o.subjf should always point to the right starting point for reading messages // if anything modified it, make sure our sequence do not start earlier. filter.nextSeq = o.sseq } // if this subject didn't fetch any message before, do it now if filter.pmsg == nil { // We will unlock here in case lots of contention, e.g. WQ. filterSubject, filterWC, nextSeq := filter.subject, filter.hasWildcard, filter.nextSeq o.mu.Unlock() pmsg := getJSPubMsgFromPool() sm, sseq, err := store.LoadNextMsg(filterSubject, filterWC, nextSeq, &pmsg.StoreMsg) o.mu.Lock() filter.err = err if sm != nil { filter.pmsg = pmsg } else { pmsg.returnToPool() pmsg = nil } if sseq >= filter.nextSeq { filter.nextSeq = sseq + 1 } // If we're sure that this filter has continuous sequence of messages, skip looking up other filters. if nextSeq == sseq && err != ErrStoreEOF { break } } } // Don't sort the o.subjf if it's only one entry // Sort uses `reflect` and can noticeably slow down fetching, // even if len == 0 or 1. // TODO(tp): we should have sort based off generics for server // to avoid reflection. if len(o.subjf) > 1 { sort.Slice(o.subjf, func(i, j int) bool { if o.subjf[j].pmsg != nil && o.subjf[i].pmsg == nil { return false } if o.subjf[i].pmsg != nil && o.subjf[j].pmsg == nil { return true } return o.subjf[j].nextSeq > o.subjf[i].nextSeq }) } // Grab next message applicable to us. // Sort sequences first, to grab the first message. filter := o.subjf[0] err := filter.err // This means we got a message in this subject fetched. if filter.pmsg != nil { filter.currentSeq = filter.nextSeq o.sseq = filter.currentSeq returned := filter.pmsg filter.pmsg = nil return returned, 1, err } if err == ErrStoreEOF { o.updateSkipped(filter.nextSeq) } // set o.sseq to the first subject sequence if filter.nextSeq > o.sseq { o.sseq = filter.nextSeq } return nil, 0, err } // Will check for expiration and lack of interest on waiting requests. // Will also do any heartbeats and return the next expiration or HB interval. func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { var fexp time.Time if o.srv == nil || o.waiting.isEmpty() { return 0, 0, 0, fexp } var expired, brp int s, now := o.srv, time.Now() // Signals interior deletes, which we will compact if needed. var hid bool remove := func(wr *waitingRequest, i int) { if i == o.waiting.rp { o.waiting.removeCurrent() } else { o.waiting.reqs[i] = nil hid = true } if o.node != nil { o.removeClusterPendingRequest(wr.reply) } expired++ wr.recycle() } wq := o.waiting for i, rp, n := 0, wq.rp, wq.n; i < n; rp = (rp + 1) % cap(wq.reqs) { wr := wq.reqs[rp] // Check expiration. if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) { hdr := []byte(fmt.Sprintf("NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)) o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) remove(wr, rp) i++ continue } // Now check interest. rr := wr.acc.sl.Match(wr.interest) interest := len(rr.psubs)+len(rr.qsubs) > 0 if !interest && (s.leafNodeEnabled || s.gateway.enabled) { // If we are here check on gateways and leaf nodes (as they can mask gateways on the other end). // If we have interest or the request is too young break and do not expire. if time.Since(wr.received) < defaultGatewayRecentSubExpiration { interest = true } else if s.gateway.enabled && s.hasGatewayInterest(wr.acc.Name, wr.interest) { interest = true } } // If interest, update batch pending requests counter and update fexp timer. if interest { brp += wr.n if !wr.hbt.IsZero() { if now.After(wr.hbt) { // Fire off a heartbeat here. o.sendIdleHeartbeat(wr.reply) // Update next HB. wr.hbt = now.Add(wr.hb) } if fexp.IsZero() || wr.hbt.Before(fexp) { fexp = wr.hbt } } if !wr.expires.IsZero() && (fexp.IsZero() || wr.expires.Before(fexp)) { fexp = wr.expires } i++ continue } // No more interest here so go ahead and remove this one from our list. remove(wr, rp) i++ } // If we have interior deletes from out of order invalidation, compact the waiting queue. if hid { o.waiting.compact() } return expired, wq.len(), brp, fexp } // Will check to make sure those waiting still have registered interest. func (o *consumer) checkWaitingForInterest() bool { o.processWaiting(true) return o.waiting.len() > 0 } // Lock should be held. func (o *consumer) hbTimer() (time.Duration, *time.Timer) { if o.cfg.Heartbeat == 0 { return 0, nil } return o.cfg.Heartbeat, time.NewTimer(o.cfg.Heartbeat) } // Check here for conditions when our ack floor may have drifted below the streams first sequence. // In general this is accounted for in normal operations, but if the consumer misses the signal from // the stream it will not clear the message and move the ack state. // Should only be called from consumer leader. func (o *consumer) checkAckFloor() { o.mu.RLock() mset, closed, asflr, numPending := o.mset, o.closed, o.asflr, len(o.pending) o.mu.RUnlock() if asflr == 0 || closed || mset == nil { return } var ss StreamState mset.store.FastState(&ss) // If our floor is equal or greater that is normal and nothing for us to do. if ss.FirstSeq == 0 || asflr >= ss.FirstSeq-1 { return } // Check which linear space is less to walk. if ss.FirstSeq-asflr-1 < uint64(numPending) { // Process all messages that no longer exist. for seq := asflr + 1; seq < ss.FirstSeq; seq++ { // Check if this message was pending. o.mu.RLock() p, isPending := o.pending[seq] var rdc uint64 = 1 if o.rdc != nil { rdc = o.rdc[seq] } o.mu.RUnlock() // If it was pending for us, get rid of it. if isPending { o.processTerm(seq, p.Sequence, rdc) } } } else if numPending > 0 { // here it is shorter to walk pending. // toTerm is seq, dseq, rcd for each entry. toTerm := make([]uint64, 0, numPending*3) o.mu.RLock() for seq, p := range o.pending { if seq < ss.FirstSeq { var dseq uint64 = 1 if p != nil { dseq = p.Sequence } var rdc uint64 = 1 if o.rdc != nil { rdc = o.rdc[seq] } toTerm = append(toTerm, seq, dseq, rdc) } } o.mu.RUnlock() for i := 0; i < len(toTerm); i += 3 { seq, dseq, rdc := toTerm[i], toTerm[i+1], toTerm[i+2] o.processTerm(seq, dseq, rdc) } } // Do one final check here. o.mu.Lock() defer o.mu.Unlock() // If we are here, and this should be rare, we still are off with our ack floor. // We will set it explicitly to 1 behind our current lowest in pending, or if // pending is empty, to our current delivered -1. if o.asflr < ss.FirstSeq-1 { var psseq, pdseq uint64 for seq, p := range o.pending { if psseq == 0 || seq < psseq { psseq, pdseq = seq, p.Sequence } } // If we still have none, set to current delivered -1. if psseq == 0 { psseq, pdseq = o.sseq-1, o.dseq-1 // If still not adjusted. if psseq < ss.FirstSeq-1 { psseq, pdseq = ss.FirstSeq-1, ss.FirstSeq-1 } } o.asflr, o.adflr = psseq, pdseq } } func (o *consumer) processInboundAcks(qch chan struct{}) { // Grab the server lock to watch for server quit. o.mu.RLock() s, mset := o.srv, o.mset hasInactiveThresh := o.cfg.InactiveThreshold > 0 o.mu.RUnlock() if s == nil || mset == nil { return } // We will check this on entry and periodically. o.checkAckFloor() // How often we will check for ack floor drift. // Spread these out for large numbers on a server restart. delta := time.Duration(rand.Int63n(int64(time.Minute))) var ackFloorCheck = time.Minute + delta for { select { case <-o.ackMsgs.ch: acks := o.ackMsgs.pop() for _, ack := range acks { o.processAck(ack.subject, ack.reply, ack.hdr, ack.msg) ack.returnToPool() } o.ackMsgs.recycle(&acks) // If we have an inactiveThreshold set, mark our activity. if hasInactiveThresh { o.suppressDeletion() } case <-time.After(ackFloorCheck): o.checkAckFloor() case <-qch: return case <-s.quitCh: return } } } // Process inbound next message requests. func (o *consumer) processInboundNextMsgReqs(qch chan struct{}) { // Grab the server lock to watch for server quit. o.mu.RLock() s := o.srv o.mu.RUnlock() for { select { case <-o.nextMsgReqs.ch: reqs := o.nextMsgReqs.pop() for _, req := range reqs { o.processNextMsgRequest(req.reply, req.msg) req.returnToPool() } o.nextMsgReqs.recycle(&reqs) case <-qch: return case <-s.quitCh: return } } } // Suppress auto cleanup on ack activity of any kind. func (o *consumer) suppressDeletion() { o.mu.Lock() defer o.mu.Unlock() if o.closed { return } if o.isPushMode() && o.dtmr != nil { // if dtmr is not nil we have started the countdown, simply reset to threshold. o.dtmr.Reset(o.dthresh) } else if o.isPullMode() && o.waiting != nil { // Pull mode always has timer running, just update last on waiting queue. o.waiting.last = time.Now() } } func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { // On startup check to see if we are in a reply situation where replay policy is not instant. var ( lts int64 // last time stamp seen, used for replay. lseq uint64 ) o.mu.RLock() mset := o.mset getLSeq := o.replay o.mu.RUnlock() // consumer is closed when mset is set to nil. if mset == nil { return } if getLSeq { lseq = mset.state().LastSeq } o.mu.Lock() s := o.srv // need to check again if consumer is closed if o.mset == nil { o.mu.Unlock() return } // For idle heartbeat support. var hbc <-chan time.Time hbd, hb := o.hbTimer() if hb != nil { hbc = hb.C } // Interest changes. inch := o.inch o.mu.Unlock() // Grab the stream's retention policy mset.mu.RLock() rp := mset.cfg.Retention mset.mu.RUnlock() var err error // Deliver all the msgs we have now, once done or on a condition, we wait for new ones. for { var ( pmsg *jsPubMsg dc uint64 dsubj string ackReply string delay time.Duration sz int wrn, wrb int ) o.mu.Lock() // consumer is closed when mset is set to nil. if o.mset == nil { o.mu.Unlock() return } // Clear last error. err = nil // If we are in push mode and not active or under flowcontrol let's stop sending. if o.isPushMode() { if !o.active || (o.maxpb > 0 && o.pbytes > o.maxpb) { goto waitForMsgs } } else if o.waiting.isEmpty() { // If we are in pull mode and no one is waiting already break and wait. goto waitForMsgs } // Grab our next msg. pmsg, dc, err = o.getNextMsg() // We can release the lock now under getNextMsg so need to check this condition again here. // consumer is closed when mset is set to nil. if o.mset == nil { o.mu.Unlock() return } // On error either wait or return. if err != nil || pmsg == nil { // On EOF we can optionally fast sync num pending state. if err == ErrStoreEOF { o.checkNumPendingOnEOF() } if err == ErrStoreMsgNotFound || err == errDeletedMsg || err == ErrStoreEOF || err == errMaxAckPending { goto waitForMsgs } else if err == errPartialCache { s.Warnf("Unexpected partial cache error looking up message for consumer '%s > %s > %s'", o.mset.acc, o.mset.cfg.Name, o.cfg.Name) goto waitForMsgs } else { s.Errorf("Received an error looking up message for consumer '%s > %s > %s': %v", o.mset.acc, o.mset.cfg.Name, o.cfg.Name, err) goto waitForMsgs } } // Update our cached num pending here first. if dc == 1 { o.npc-- } // Pre-calculate ackReply ackReply = o.ackReply(pmsg.seq, o.dseq, dc, pmsg.ts, o.numPending()) // If headers only do not send msg payload. // Add in msg size itself as header. if o.cfg.HeadersOnly { convertToHeadersOnly(pmsg) } // Calculate payload size. This can be calculated on client side. // We do not include transport subject here since not generally known on client. sz = len(pmsg.subj) + len(ackReply) + len(pmsg.hdr) + len(pmsg.msg) if o.isPushMode() { dsubj = o.dsubj } else if wr := o.nextWaiting(sz); wr != nil { wrn, wrb = wr.n, wr.b dsubj = wr.reply if done := wr.recycleIfDone(); done && o.node != nil { o.removeClusterPendingRequest(dsubj) } else if !done && wr.hb > 0 { wr.hbt = time.Now().Add(wr.hb) } } else { if o.subjf != nil { for i, filter := range o.subjf { if subjectIsSubsetMatch(pmsg.subj, filter.subject) { o.subjf[i].currentSeq-- o.subjf[i].nextSeq-- break } } } // We will redo this one. o.sseq-- if dc == 1 { o.npc++ } pmsg.returnToPool() goto waitForMsgs } // If we are in a replay scenario and have not caught up check if we need to delay here. if o.replay && lts > 0 { if delay = time.Duration(pmsg.ts - lts); delay > time.Millisecond { o.mu.Unlock() select { case <-qch: pmsg.returnToPool() return case <-time.After(delay): } o.mu.Lock() } } // Track this regardless. lts = pmsg.ts // If we have a rate limit set make sure we check that here. if o.rlimit != nil { now := time.Now() r := o.rlimit.ReserveN(now, sz) delay := r.DelayFrom(now) if delay > 0 { o.mu.Unlock() select { case <-qch: pmsg.returnToPool() return case <-time.After(delay): } o.mu.Lock() } } // Do actual delivery. o.deliverMsg(dsubj, ackReply, pmsg, dc, rp) // If given request fulfilled batch size, but there are still pending bytes, send information about it. if wrn <= 0 && wrb > 0 { o.outq.send(newJSPubMsg(dsubj, _EMPTY_, _EMPTY_, []byte(fmt.Sprintf(JsPullRequestRemainingBytesT, JSPullRequestPendingMsgs, wrn, JSPullRequestPendingBytes, wrb)), nil, nil, 0)) } // Reset our idle heartbeat timer if set. if hb != nil { hb.Reset(hbd) } o.mu.Unlock() continue waitForMsgs: // If we were in a replay state check to see if we are caught up. If so clear. if o.replay && o.sseq > lseq { o.replay = false } // Make sure to process any expired requests that are pending. var wrExp <-chan time.Time if o.isPullMode() { // Dont expire oneshots if we are here because of max ack pending limit. _, _, _, fexp := o.processWaiting(err != errMaxAckPending) if !fexp.IsZero() { expires := time.Until(fexp) if expires <= 0 { expires = time.Millisecond } wrExp = time.NewTimer(expires).C } } // We will wait here for new messages to arrive. mch, odsubj := o.mch, o.cfg.DeliverSubject o.mu.Unlock() select { case <-mch: // Messages are waiting. case interest := <-inch: // inch can be nil on pull-based, but then this will // just block and not fire. o.updateDeliveryInterest(interest) case <-qch: return case <-wrExp: o.mu.Lock() o.processWaiting(true) o.mu.Unlock() case <-hbc: if o.isActive() { o.mu.RLock() o.sendIdleHeartbeat(odsubj) o.mu.RUnlock() } // Reset our idle heartbeat timer. hb.Reset(hbd) } } } // Lock should be held. func (o *consumer) sendIdleHeartbeat(subj string) { const t = "NATS/1.0 100 Idle Heartbeat\r\n%s: %d\r\n%s: %d\r\n\r\n" sseq, dseq := o.sseq-1, o.dseq-1 hdr := []byte(fmt.Sprintf(t, JSLastConsumerSeq, dseq, JSLastStreamSeq, sseq)) if fcp := o.fcid; fcp != _EMPTY_ { // Add in that we are stalled on flow control here. addOn := []byte(fmt.Sprintf("%s: %s\r\n\r\n", JSConsumerStalled, fcp)) hdr = append(hdr[:len(hdr)-LEN_CR_LF], []byte(addOn)...) } o.outq.send(newJSPubMsg(subj, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) } func (o *consumer) ackReply(sseq, dseq, dc uint64, ts int64, pending uint64) string { return fmt.Sprintf(o.ackReplyT, dc, sseq, dseq, ts, pending) } // Used mostly for testing. Sets max pending bytes for flow control setups. func (o *consumer) setMaxPendingBytes(limit int) { o.pblimit = limit o.maxpb = limit / 16 if o.maxpb == 0 { o.maxpb = 1 } } // Does some sanity checks to see if we should re-calculate. // Since there is a race when decrementing when there is contention at the beginning of the stream. // The race is a getNextMsg skips a deleted msg, and then the decStreamPending call fires. // This does some quick sanity checks to see if we should re-calculate num pending. // Lock should be held. func (o *consumer) checkNumPending() uint64 { if o.mset != nil { var state StreamState o.mset.store.FastState(&state) if o.sseq > state.LastSeq && o.npc != 0 || o.npc > int64(state.Msgs) { // Re-calculate. o.streamNumPending() } } return o.numPending() } // Lock should be held. func (o *consumer) numPending() uint64 { if o.npc < 0 { return 0 } return uint64(o.npc) } // This will do a quick sanity check on num pending when we encounter // and EOF in the loop and gather. // Lock should be held. func (o *consumer) checkNumPendingOnEOF() { if o.mset == nil { return } var state StreamState o.mset.store.FastState(&state) if o.sseq > state.LastSeq && o.npc != 0 { // We know here we can reset our running state for num pending. o.npc, o.npf = 0, state.LastSeq } } // Call into streamNumPending after acquiring the consumer lock. func (o *consumer) streamNumPendingLocked() uint64 { o.mu.Lock() defer o.mu.Unlock() return o.streamNumPending() } // Will force a set from the stream store of num pending. // Depends on delivery policy, for last per subject we calculate differently. // Lock should be held. func (o *consumer) streamNumPending() uint64 { if o.mset == nil || o.mset.store == nil { o.npc, o.npf = 0, 0 return 0 } isLastPerSubject := o.cfg.DeliverPolicy == DeliverLastPerSubject // Deliver Last Per Subject calculates num pending differently. if isLastPerSubject { o.npc, o.npf = 0, 0 // Consumer without filters. if o.subjf == nil { npc, npf := o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject) o.npc, o.npf = int64(npc), npf return o.numPending() } // Consumer with filters. for _, filter := range o.subjf { npc, npf := o.mset.store.NumPending(o.sseq, filter.subject, isLastPerSubject) o.npc += int64(npc) if npf > o.npf { o.npf = npf // Always last } } return o.numPending() } // Every other Delivery Policy is handled here. // Consumer without filters. if o.subjf == nil { npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject) o.npc, o.npf = int64(npc), npf return o.numPending() } // Consumer with filters. o.npc, o.npf = 0, 0 for _, filter := range o.subjf { // We might loose state of o.subjf, so if we do recover from o.sseq if filter.currentSeq < o.sseq { filter.currentSeq = o.sseq } npc, npf := o.mset.store.NumPending(filter.currentSeq, filter.subject, isLastPerSubject) o.npc += int64(npc) if npf > o.npf { o.npf = npf // Always last } } return o.numPending() } func convertToHeadersOnly(pmsg *jsPubMsg) { // If headers only do not send msg payload. // Add in msg size itself as header. hdr, msg := pmsg.hdr, pmsg.msg var bb bytes.Buffer if len(hdr) == 0 { bb.WriteString(hdrLine) } else { bb.Write(hdr) bb.Truncate(len(hdr) - LEN_CR_LF) } bb.WriteString(JSMsgSize) bb.WriteString(": ") bb.WriteString(strconv.FormatInt(int64(len(msg)), 10)) bb.WriteString(CR_LF) bb.WriteString(CR_LF) // Replace underlying buf which we can use directly when we send. // TODO(dlc) - Probably just use directly when forming bytes.Buffer? pmsg.buf = pmsg.buf[:0] pmsg.buf = append(pmsg.buf, bb.Bytes()...) // Replace with new header. pmsg.hdr = pmsg.buf // Cancel msg payload pmsg.msg = nil } // Deliver a msg to the consumer. // Lock should be held and o.mset validated to be non-nil. func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, rp RetentionPolicy) { if o.mset == nil { pmsg.returnToPool() return } dseq := o.dseq o.dseq++ pmsg.dsubj, pmsg.reply, pmsg.o = dsubj, ackReply, o psz := pmsg.size() if o.maxpb > 0 { o.pbytes += psz } mset := o.mset ap := o.cfg.AckPolicy // Cant touch pmsg after this sending so capture what we need. seq, ts := pmsg.seq, pmsg.ts // Send message. o.outq.send(pmsg) if ap == AckExplicit || ap == AckAll { o.trackPending(seq, dseq) } else if ap == AckNone { o.adflr = dseq o.asflr = seq } // Flow control. if o.maxpb > 0 && o.needFlowControl(psz) { o.sendFlowControl() } // If pull mode and we have inactivity threshold, signaled by dthresh, update last activity. if o.isPullMode() && o.dthresh > 0 { o.waiting.last = time.Now() } // FIXME(dlc) - Capture errors? o.updateDelivered(dseq, seq, dc, ts) // If we are ack none and mset is interest only we should make sure stream removes interest. if ap == AckNone && rp != LimitsPolicy { if o.node == nil || o.cfg.Direct { mset.ackq.push(seq) } else { o.updateAcks(dseq, seq) } } } func (o *consumer) needFlowControl(sz int) bool { if o.maxpb == 0 { return false } // Decide whether to send a flow control message which we will need the user to respond. // We send when we are over 50% of our current window limit. if o.fcid == _EMPTY_ && o.pbytes > o.maxpb/2 { return true } // If we have an existing outstanding FC, check to see if we need to expand the o.fcsz if o.fcid != _EMPTY_ && (o.pbytes-o.fcsz) >= o.maxpb { o.fcsz += sz } return false } func (o *consumer) processFlowControl(_ *subscription, c *client, _ *Account, subj, _ string, _ []byte) { o.mu.Lock() defer o.mu.Unlock() // Ignore if not the latest we have sent out. if subj != o.fcid { return } // For slow starts and ramping up. if o.maxpb < o.pblimit { o.maxpb *= 2 if o.maxpb > o.pblimit { o.maxpb = o.pblimit } } // Update accounting. o.pbytes -= o.fcsz if o.pbytes < 0 { o.pbytes = 0 } o.fcid, o.fcsz = _EMPTY_, 0 o.signalNewMessages() } // Lock should be held. func (o *consumer) fcReply() string { var sb strings.Builder sb.WriteString(jsFlowControlPre) sb.WriteString(o.stream) sb.WriteByte(btsep) sb.WriteString(o.name) sb.WriteByte(btsep) var b [4]byte rn := rand.Int63() for i, l := 0, rn; i < len(b); i++ { b[i] = digits[l%base] l /= base } sb.Write(b[:]) return sb.String() } // sendFlowControl will send a flow control packet to the consumer. // Lock should be held. func (o *consumer) sendFlowControl() { if !o.isPushMode() { return } subj, rply := o.cfg.DeliverSubject, o.fcReply() o.fcsz, o.fcid = o.pbytes, rply hdr := []byte("NATS/1.0 100 FlowControl Request\r\n\r\n") o.outq.send(newJSPubMsg(subj, _EMPTY_, rply, hdr, nil, nil, 0)) } // Tracks our outstanding pending acks. Only applicable to AckExplicit mode. // Lock should be held. func (o *consumer) trackPending(sseq, dseq uint64) { if o.pending == nil { o.pending = make(map[uint64]*Pending) } if o.ptmr == nil { o.ptmr = time.AfterFunc(o.ackWait(0), o.checkPending) } if p, ok := o.pending[sseq]; ok { p.Timestamp = time.Now().UnixNano() p.Sequence = dseq } else { o.pending[sseq] = &Pending{dseq, time.Now().UnixNano()} } } // Credit back a failed delivery. // lock should be held. func (o *consumer) creditWaitingRequest(reply string) { for i, rp := 0, o.waiting.rp; i < o.waiting.n; i++ { if wr := o.waiting.reqs[rp]; wr != nil { if wr.reply == reply { wr.n++ wr.d-- return } } rp = (rp + 1) % cap(o.waiting.reqs) } } // didNotDeliver is called when a delivery for a consumer message failed. // Depending on our state, we will process the failure. func (o *consumer) didNotDeliver(seq uint64, subj string) { o.mu.Lock() mset := o.mset if mset == nil { o.mu.Unlock() return } // Adjust back deliver count. o.decDeliveryCount(seq) var checkDeliveryInterest bool if o.isPushMode() { o.active = false checkDeliveryInterest = true } else if o.pending != nil { o.creditWaitingRequest(subj) // pull mode and we have pending. if _, ok := o.pending[seq]; ok { // We found this messsage on pending, we need // to queue it up for immediate redelivery since // we know it was not delivered. if !o.onRedeliverQueue(seq) { o.addToRedeliverQueue(seq) o.signalNewMessages() } } } o.mu.Unlock() // If we do not have interest update that here. if checkDeliveryInterest && o.hasNoLocalInterest() { o.updateDeliveryInterest(false) } } // Lock should be held. func (o *consumer) addToRedeliverQueue(seqs ...uint64) { o.rdq = append(o.rdq, seqs...) for _, seq := range seqs { o.rdqi.Insert(seq) } } // Lock should be held. func (o *consumer) hasRedeliveries() bool { return len(o.rdq) > 0 } func (o *consumer) getNextToRedeliver() uint64 { if len(o.rdq) == 0 { return 0 } seq := o.rdq[0] if len(o.rdq) == 1 { o.rdq = nil o.rdqi.Empty() } else { o.rdq = append(o.rdq[:0], o.rdq[1:]...) o.rdqi.Delete(seq) } return seq } // This checks if we already have this sequence queued for redelivery. // FIXME(dlc) - This is O(n) but should be fast with small redeliver size. // Lock should be held. func (o *consumer) onRedeliverQueue(seq uint64) bool { return o.rdqi.Exists(seq) } // Remove a sequence from the redelivery queue. // Lock should be held. func (o *consumer) removeFromRedeliverQueue(seq uint64) bool { if !o.onRedeliverQueue(seq) { return false } for i, rseq := range o.rdq { if rseq == seq { if len(o.rdq) == 1 { o.rdq = nil o.rdqi.Empty() } else { o.rdq = append(o.rdq[:i], o.rdq[i+1:]...) o.rdqi.Delete(seq) } return true } } return false } // Checks the pending messages. func (o *consumer) checkPending() { o.mu.RLock() mset := o.mset // On stop, mset and timer will be nil. if o.closed || mset == nil || o.ptmr == nil { stopAndClearTimer(&o.ptmr) o.mu.RUnlock() return } o.mu.RUnlock() var shouldUpdateState bool var state StreamState mset.store.FastState(&state) fseq := state.FirstSeq o.mu.Lock() defer o.mu.Unlock() now := time.Now().UnixNano() ttl := int64(o.cfg.AckWait) next := int64(o.ackWait(0)) // However, if there is backoff, initializes with the largest backoff. // It will be adjusted as needed. if l := len(o.cfg.BackOff); l > 0 { next = int64(o.cfg.BackOff[l-1]) } // Since we can update timestamps, we have to review all pending. // We will now bail if we see an ack pending in bound to us via o.awl. var expired []uint64 check := len(o.pending) > 1024 for seq, p := range o.pending { if check && atomic.LoadInt64(&o.awl) > 0 { o.ptmr.Reset(100 * time.Millisecond) return } // Check if these are no longer valid. if seq < fseq || seq <= o.asflr { delete(o.pending, seq) delete(o.rdc, seq) o.removeFromRedeliverQueue(seq) shouldUpdateState = true // Check if we need to move ack floors. if seq > o.asflr { o.asflr = seq } if p.Sequence > o.adflr { o.adflr = p.Sequence } continue } elapsed, deadline := now-p.Timestamp, ttl if len(o.cfg.BackOff) > 0 { // This is ok even if o.rdc is nil, we would get dc == 0, which is what we want. dc := int(o.rdc[seq]) // This will be the index for the next backoff, will set to last element if needed. nbi := dc + 1 if dc+1 >= len(o.cfg.BackOff) { dc = len(o.cfg.BackOff) - 1 nbi = dc } deadline = int64(o.cfg.BackOff[dc]) // Set `next` to the next backoff (if smaller than current `next` value). if nextBackoff := int64(o.cfg.BackOff[nbi]); nextBackoff < next { next = nextBackoff } } if elapsed >= deadline { if !o.onRedeliverQueue(seq) { expired = append(expired, seq) } } else if deadline-elapsed < next { // Update when we should fire next. next = deadline - elapsed } } if len(expired) > 0 { // We need to sort. sort.Slice(expired, func(i, j int) bool { return expired[i] < expired[j] }) o.addToRedeliverQueue(expired...) // Now we should update the timestamp here since we are redelivering. // We will use an incrementing time to preserve order for any other redelivery. off := now - o.pending[expired[0]].Timestamp for _, seq := range expired { if p, ok := o.pending[seq]; ok { p.Timestamp += off } } o.signalNewMessages() } if len(o.pending) > 0 { delay := time.Duration(next) if o.ptmr == nil { o.ptmr = time.AfterFunc(delay, o.checkPending) } else { o.ptmr.Reset(o.ackWait(delay)) } } else { // Make sure to stop timer and clear out any re delivery queues stopAndClearTimer(&o.ptmr) o.rdq = nil o.rdqi.Empty() o.pending = nil } // Update our state if needed. if shouldUpdateState { if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed { s, acc, mset, name := o.srv, o.acc, o.mset, o.name s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.cfg.Name, name, err) } } } // SeqFromReply will extract a sequence number from a reply subject. func (o *consumer) seqFromReply(reply string) uint64 { _, dseq, _ := ackReplyInfo(reply) return dseq } // StreamSeqFromReply will extract the stream sequence from the reply subject. func (o *consumer) streamSeqFromReply(reply string) uint64 { sseq, _, _ := ackReplyInfo(reply) return sseq } // Quick parser for positive numbers in ack reply encoding. func parseAckReplyNum(d string) (n int64) { if len(d) == 0 { return -1 } for _, dec := range d { if dec < asciiZero || dec > asciiNine { return -1 } n = n*10 + (int64(dec) - asciiZero) } return n } const expectedNumReplyTokens = 9 // Grab encoded information in the reply subject for a delivered message. func replyInfo(subject string) (sseq, dseq, dc uint64, ts int64, pending uint64) { tsa := [expectedNumReplyTokens]string{} start, tokens := 0, tsa[:0] for i := 0; i < len(subject); i++ { if subject[i] == btsep { tokens = append(tokens, subject[start:i]) start = i + 1 } } tokens = append(tokens, subject[start:]) if len(tokens) != expectedNumReplyTokens || tokens[0] != "$JS" || tokens[1] != "ACK" { return 0, 0, 0, 0, 0 } // TODO(dlc) - Should we error if we do not match consumer name? // stream is tokens[2], consumer is 3. dc = uint64(parseAckReplyNum(tokens[4])) sseq, dseq = uint64(parseAckReplyNum(tokens[5])), uint64(parseAckReplyNum(tokens[6])) ts = parseAckReplyNum(tokens[7]) pending = uint64(parseAckReplyNum(tokens[8])) return sseq, dseq, dc, ts, pending } func ackReplyInfo(subject string) (sseq, dseq, dc uint64) { tsa := [expectedNumReplyTokens]string{} start, tokens := 0, tsa[:0] for i := 0; i < len(subject); i++ { if subject[i] == btsep { tokens = append(tokens, subject[start:i]) start = i + 1 } } tokens = append(tokens, subject[start:]) if len(tokens) != expectedNumReplyTokens || tokens[0] != "$JS" || tokens[1] != "ACK" { return 0, 0, 0 } dc = uint64(parseAckReplyNum(tokens[4])) sseq, dseq = uint64(parseAckReplyNum(tokens[5])), uint64(parseAckReplyNum(tokens[6])) return sseq, dseq, dc } // NextSeq returns the next delivered sequence number for this consumer. func (o *consumer) nextSeq() uint64 { o.mu.RLock() dseq := o.dseq o.mu.RUnlock() return dseq } // Used to hold skip list when deliver policy is last per subject. type lastSeqSkipList struct { resume uint64 seqs []uint64 } // Will create a skip list for us from a store's subjects state. func createLastSeqSkipList(mss map[string]SimpleState) []uint64 { seqs := make([]uint64, 0, len(mss)) for _, ss := range mss { seqs = append(seqs, ss.Last) } sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] }) return seqs } // Let's us know we have a skip list, which is for deliver last per subject and we are just starting. // Lock should be held. func (o *consumer) hasSkipListPending() bool { return o.lss != nil && len(o.lss.seqs) > 0 } // Will select the starting sequence. func (o *consumer) selectStartingSeqNo() { if o.mset == nil || o.mset.store == nil { o.sseq = 1 } else { var state StreamState o.mset.store.FastState(&state) if o.cfg.OptStartSeq == 0 { if o.cfg.DeliverPolicy == DeliverAll { o.sseq = state.FirstSeq } else if o.cfg.DeliverPolicy == DeliverLast { if o.subjf == nil { o.sseq = state.LastSeq return } // If we are partitioned here this will be properly set when we become leader. for _, filter := range o.subjf { ss := o.mset.store.FilteredState(1, filter.subject) filter.nextSeq = ss.Last if ss.Last > o.sseq { o.sseq = ss.Last } } } else if o.cfg.DeliverPolicy == DeliverLastPerSubject { if o.subjf == nil { if mss := o.mset.store.SubjectsState(o.cfg.FilterSubject); len(mss) > 0 { o.lss = &lastSeqSkipList{ resume: state.LastSeq, seqs: createLastSeqSkipList(mss), } o.sseq = o.lss.seqs[0] } else { // If no mapping info just set to last. o.sseq = state.LastSeq } return } lss := &lastSeqSkipList{ resume: state.LastSeq, } for _, filter := range o.subjf { if mss := o.mset.store.SubjectsState(filter.subject); len(mss) > 0 { lss.seqs = append(lss.seqs, createLastSeqSkipList(mss)...) } } if len(lss.seqs) == 0 { o.sseq = state.LastSeq } // Sort the skip list sort.Slice(lss.seqs, func(i, j int) bool { return lss.seqs[j] > lss.seqs[i] }) o.lss = lss if len(o.lss.seqs) != 0 { o.sseq = o.lss.seqs[0] } } else if o.cfg.OptStartTime != nil { // If we are here we are time based. // TODO(dlc) - Once clustered can't rely on this. o.sseq = o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime) } else { // DeliverNew o.sseq = state.LastSeq + 1 } } else { o.sseq = o.cfg.OptStartSeq } if state.FirstSeq == 0 { o.sseq = 1 for _, filter := range o.subjf { filter.nextSeq = 1 } } else if o.sseq < state.FirstSeq { o.sseq = state.FirstSeq } else if o.sseq > state.LastSeq { o.sseq = state.LastSeq + 1 } for _, filter := range o.subjf { if state.FirstSeq == 0 { filter.nextSeq = 1 } if filter.nextSeq < state.FirstSeq { filter.nextSeq = state.FirstSeq } if filter.nextSeq > state.LastSeq { filter.nextSeq = state.LastSeq + 1 } } } if o.subjf != nil { sort.Slice(o.subjf, func(i, j int) bool { return o.subjf[j].nextSeq > o.subjf[i].nextSeq }) } // Always set delivery sequence to 1. o.dseq = 1 // Set ack delivery floor to delivery-1 o.adflr = o.dseq - 1 // Set ack store floor to store-1 o.asflr = o.sseq - 1 // Set our starting sequence state. if o.store != nil && o.sseq > 0 { o.store.SetStarting(o.sseq - 1) } } // Test whether a config represents a durable subscriber. func isDurableConsumer(config *ConsumerConfig) bool { return config != nil && config.Durable != _EMPTY_ } func (o *consumer) isDurable() bool { return o.cfg.Durable != _EMPTY_ } // Are we in push mode, delivery subject, etc. func (o *consumer) isPushMode() bool { return o.cfg.DeliverSubject != _EMPTY_ } func (o *consumer) isPullMode() bool { return o.cfg.DeliverSubject == _EMPTY_ } // Name returns the name of this consumer. func (o *consumer) String() string { o.mu.RLock() n := o.name o.mu.RUnlock() return n } func createConsumerName() string { return getHash(nuid.Next()) } // deleteConsumer will delete the consumer from this stream. func (mset *stream) deleteConsumer(o *consumer) error { return o.delete() } func (o *consumer) getStream() *stream { o.mu.RLock() mset := o.mset o.mu.RUnlock() return mset } func (o *consumer) streamName() string { o.mu.RLock() mset := o.mset o.mu.RUnlock() if mset != nil { return mset.name() } return _EMPTY_ } // Active indicates if this consumer is still active. func (o *consumer) isActive() bool { o.mu.RLock() active := o.active && o.mset != nil o.mu.RUnlock() return active } // hasNoLocalInterest return true if we have no local interest. func (o *consumer) hasNoLocalInterest() bool { o.mu.RLock() rr := o.acc.sl.Match(o.cfg.DeliverSubject) o.mu.RUnlock() return len(rr.psubs)+len(rr.qsubs) == 0 } // This is when the underlying stream has been purged. // sseq is the new first seq for the stream after purge. // Lock should be held. func (o *consumer) purge(sseq uint64, slseq uint64) { // Do not update our state unless we know we are the leader. if !o.isLeader() { return } // Signals all have been purged for this consumer. if sseq == 0 { sseq = slseq + 1 } o.mu.Lock() // Do not go backwards if o.sseq < sseq { o.sseq = sseq } if o.asflr < sseq { o.asflr = sseq - 1 // We need to remove those no longer relevant from pending. for seq, p := range o.pending { if seq <= o.asflr { if p.Sequence > o.adflr { o.adflr = p.Sequence if o.adflr > o.dseq { o.dseq = o.adflr } } delete(o.pending, seq) delete(o.rdc, seq) // rdq handled below. } } } // This means we can reset everything at this point. if len(o.pending) == 0 { o.pending, o.rdc = nil, nil } // We need to remove all those being queued for redelivery under o.rdq if len(o.rdq) > 0 { rdq := o.rdq o.rdq = nil o.rdqi.Empty() for _, sseq := range rdq { if sseq >= o.sseq { o.addToRedeliverQueue(sseq) } } } // Grab some info in case of error below. s, acc, mset, name := o.srv, o.acc, o.mset, o.name o.mu.Unlock() if err := o.writeStoreState(); err != nil && s != nil && mset != nil { s.Warnf("Consumer '%s > %s > %s' error on write store state from purge: %v", acc, mset.name(), name, err) } } func stopAndClearTimer(tp **time.Timer) { if *tp == nil { return } // Will get drained in normal course, do not try to // drain here. (*tp).Stop() *tp = nil } // Stop will shutdown the consumer for the associated stream. func (o *consumer) stop() error { return o.stopWithFlags(false, false, true, false) } func (o *consumer) deleteWithoutAdvisory() error { return o.stopWithFlags(true, false, true, false) } // Delete will delete the consumer for the associated stream and send advisories. func (o *consumer) delete() error { return o.stopWithFlags(true, false, true, true) } // To test for closed state. func (o *consumer) isClosed() bool { o.mu.RLock() defer o.mu.RUnlock() return o.closed } func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { o.mu.Lock() js := o.js if o.closed { o.mu.Unlock() return nil } o.closed = true // Check if we are the leader and are being deleted. if dflag && o.isLeader() { // If we are clustered and node leader (probable from above), stepdown. if node := o.node; node != nil && node.Leader() { node.StepDown() } if advisory { o.sendDeleteAdvisoryLocked() } if o.isPullMode() { // Release any pending. o.releaseAnyPendingRequests() } } if o.qch != nil { close(o.qch) o.qch = nil } a := o.acc store := o.store mset := o.mset o.mset = nil o.active = false o.unsubscribe(o.ackSub) o.unsubscribe(o.reqSub) o.unsubscribe(o.fcSub) o.ackSub = nil o.reqSub = nil o.fcSub = nil if o.infoSub != nil { o.srv.sysUnsubscribe(o.infoSub) o.infoSub = nil } c := o.client o.client = nil sysc := o.sysc o.sysc = nil stopAndClearTimer(&o.ptmr) stopAndClearTimer(&o.dtmr) stopAndClearTimer(&o.gwdtmr) delivery := o.cfg.DeliverSubject o.waiting = nil // Break us out of the readLoop. if doSignal { o.signalNewMessages() } n := o.node qgroup := o.cfg.DeliverGroup o.ackMsgs.unregister() if o.nextMsgReqs != nil { o.nextMsgReqs.unregister() } // For cleaning up the node assignment. var ca *consumerAssignment if dflag { ca = o.ca } sigSubs := o.sigSubs o.mu.Unlock() if c != nil { c.closeConnection(ClientClosed) } if sysc != nil { sysc.closeConnection(ClientClosed) } if delivery != _EMPTY_ { a.sl.clearNotification(delivery, qgroup, o.inch) } var rp RetentionPolicy if mset != nil { if len(sigSubs) > 0 { mset.removeConsumerAsLeader(o) } mset.mu.Lock() mset.removeConsumer(o) rp = mset.cfg.Retention mset.mu.Unlock() } // We need to optionally remove all messages since we are interest based retention. // We will do this consistently on all replicas. Note that if in clustered mode the // non-leader consumers will need to restore state first. if dflag && rp == InterestPolicy { state := mset.state() stop := state.LastSeq o.mu.Lock() if !o.isLeader() { o.readStoredState(stop) } start := o.asflr o.mu.Unlock() // Make sure we start at worst with first sequence in the stream. if start < state.FirstSeq { start = state.FirstSeq } var rmseqs []uint64 mset.mu.Lock() for seq := start; seq <= stop; seq++ { if mset.noInterest(seq, o) { rmseqs = append(rmseqs, seq) } } mset.mu.Unlock() // These can be removed. for _, seq := range rmseqs { mset.store.RemoveMsg(seq) } } // Cluster cleanup. if n != nil { if dflag { n.Delete() } else { // Try to install snapshot on clean exit if o.store != nil && (o.retention != LimitsPolicy || n.NeedSnapshot()) { if snap, err := o.store.EncodedState(); err == nil { n.InstallSnapshot(snap) } } n.Stop() } } if ca != nil { js.mu.Lock() if ca.Group != nil { ca.Group.node = nil } js.mu.Unlock() } // Clean up our store. var err error if store != nil { if dflag { if sdflag { err = store.StreamDelete() } else { err = store.Delete() } } else { err = store.Stop() } } return err } // Check that we do not form a cycle by delivering to a delivery subject // that is part of the interest group. func deliveryFormsCycle(cfg *StreamConfig, deliverySubject string) bool { for _, subject := range cfg.Subjects { if subjectIsSubsetMatch(deliverySubject, subject) { return true } } return false } // switchToEphemeral is called on startup when recovering ephemerals. func (o *consumer) switchToEphemeral() { o.mu.Lock() o.cfg.Durable = _EMPTY_ store, ok := o.store.(*consumerFileStore) rr := o.acc.sl.Match(o.cfg.DeliverSubject) // Setup dthresh. o.updateInactiveThreshold(&o.cfg) o.mu.Unlock() // Update interest o.updateDeliveryInterest(len(rr.psubs)+len(rr.qsubs) > 0) // Write out new config if ok { store.updateConfig(o.cfg) } } // RequestNextMsgSubject returns the subject to request the next message when in pull or worker mode. // Returns empty otherwise. func (o *consumer) requestNextMsgSubject() string { return o.nextMsgSubj } func (o *consumer) decStreamPending(sseq uint64, subj string) { o.mu.Lock() // Update our cached num pending only if we think deliverMsg has not done so. if sseq >= o.sseq && o.isFilteredMatch(subj) { o.npc-- } // Check if this message was pending. p, wasPending := o.pending[sseq] var rdc uint64 = 1 if o.rdc != nil { rdc = o.rdc[sseq] } o.mu.Unlock() // If it was pending process it like an ack. // TODO(dlc) - we could do a term here instead with a reason to generate the advisory. if wasPending { // We could have lock for stream so do this in a go routine. // TODO(dlc) - We should do this with ipq vs naked go routines. go o.processTerm(sseq, p.Sequence, rdc) } } func (o *consumer) account() *Account { o.mu.RLock() a := o.acc o.mu.RUnlock() return a } // Creates a sublist for consumer. // All subjects share the same callback. func (o *consumer) signalSubs() []*subscription { o.mu.Lock() defer o.mu.Unlock() if o.sigSubs != nil { return o.sigSubs } subs := []*subscription{} if o.subjf == nil { subs = append(subs, &subscription{subject: []byte(fwcs), icb: o.processStreamSignal}) o.sigSubs = subs return subs } for _, filter := range o.subjf { subs = append(subs, &subscription{subject: []byte(filter.subject), icb: o.processStreamSignal}) } o.sigSubs = subs return subs } // This is what will be called when our parent stream wants to kick us regarding a new message. // We know that we are the leader and that this subject matches us by how the parent handles registering // us with the signaling sublist. // We do need the sequence of the message however and we use the msg as the encoded seq. func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, subject, _ string, seqb []byte) { var le = binary.LittleEndian seq := le.Uint64(seqb) o.mu.Lock() defer o.mu.Unlock() if o.mset == nil { return } if seq > o.npf { o.npc++ } if seq < o.sseq { return } if o.isPushMode() && o.active || o.isPullMode() && !o.waiting.isEmpty() { o.signalNewMessages() } } // Used to compare if two multiple filtered subject lists are equal. func subjectSliceEqual(slice1 []string, slice2 []string) bool { if len(slice1) != len(slice2) { return false } set2 := make(map[string]struct{}, len(slice2)) for _, val := range slice2 { set2[val] = struct{}{} } for _, val := range slice1 { if _, ok := set2[val]; !ok { return false } } return true } // Utility for simpler if conditions in Consumer config checks. // In future iteration, we can immediately create `o.subjf` and // use it to validate things. func gatherSubjectFilters(filter string, filters []string) []string { if filter != _EMPTY_ { filters = append(filters, filter) } // list of filters should never contain non-empty filter. return filters } // Will check if we are running in the monitor already and if not set the appropriate flag. func (o *consumer) checkInMonitor() bool { o.mu.Lock() defer o.mu.Unlock() if o.inMonitor { return true } o.inMonitor = true return false } // Clear us being in the monitor routine. func (o *consumer) clearMonitorRunning() { o.mu.Lock() defer o.mu.Unlock() o.inMonitor = false } // Test whether we are in the monitor routine. func (o *consumer) isMonitorRunning() bool { o.mu.RLock() defer o.mu.RUnlock() return o.inMonitor } // If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent. func (o *consumer) checkStateForInterestStream() { o.mu.Lock() // See if we need to process this update if our parent stream is not a limits policy stream. mset := o.mset shouldProcessState := mset != nil && o.retention != LimitsPolicy if o.closed || !shouldProcessState { o.mu.Unlock() return } state, err := o.store.State() o.mu.Unlock() if err != nil { return } // We should make sure to update the acks. var ss StreamState mset.store.FastState(&ss) asflr := state.AckFloor.Stream for seq := ss.FirstSeq; seq <= asflr; seq++ { mset.ackMsg(o, seq) } o.mu.RLock() // See if we need to process this update if our parent stream is not a limits policy stream. state, _ = o.store.State() o.mu.RUnlock() // If we have pending, we will need to walk through to delivered in case we missed any of those acks as well. if state != nil && len(state.Pending) > 0 { for seq := state.AckFloor.Stream + 1; seq <= state.Delivered.Stream; seq++ { if _, ok := state.Pending[seq]; !ok { mset.ackMsg(o, seq) } } } }