From 143f145364861dbf46485801ac67bd6713b9e18c Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 19 Aug 2021 18:19:39 -0700 Subject: [PATCH] Update Go client Signed-off-by: Derek Collison --- go.mod | 2 +- go.sum | 4 + vendor/github.com/nats-io/nats.go/js.go | 280 ++++++++++++----------- vendor/github.com/nats-io/nats.go/jsm.go | 1 + vendor/modules.txt | 2 +- 5 files changed, 154 insertions(+), 135 deletions(-) diff --git a/go.mod b/go.mod index b0de92b0..eb92a1b0 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/klauspost/compress v1.11.12 github.com/minio/highwayhash v1.0.1 github.com/nats-io/jwt/v2 v2.0.3 - github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9 + github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e diff --git a/go.sum b/go.sum index e77ed911..ca43c6c6 100644 --- a/go.sum +++ b/go.sum @@ -26,6 +26,10 @@ github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19 h1:9WQzXoYc37xB github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9 h1:aJYmbbVrq6rsFGAvQnAvoChjkjUOJGqVBdQ47vbEWD4= github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.11.1-0.20210817171728-9d3a000c8a66 h1:J3LNTmD/AUgjKJjZK2IEsGl2GD1znemMOq64ZKu83ok= +github.com/nats-io/nats.go v1.11.1-0.20210817171728-9d3a000c8a66/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0 h1:lffRgFiHXqxwf8lYNSXXeOZdOgAIOabGwOSwdttqCn0= +github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index 92b6e544..cc92f8c8 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -260,6 +260,7 @@ type pubOpts struct { lid string // Expected last msgId str string // Expected stream name seq uint64 // Expected last sequence + lss uint64 // Expected last sequence per subject } // pubAckResponse is the ack response from the JetStream API when publishing a message. @@ -278,10 +279,11 @@ type PubAck struct { // Headers for published messages. const ( - MsgIdHdr = "Nats-Msg-Id" - ExpectedStreamHdr = "Nats-Expected-Stream" - ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence" - ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id" + MsgIdHdr = "Nats-Msg-Id" + ExpectedStreamHdr = "Nats-Expected-Stream" + ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence" + ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence" + ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id" ) // PublishMsg publishes a Msg to a stream from JetStream. @@ -317,6 +319,9 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { if o.seq > 0 { m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10)) } + if o.lss > 0 { + m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10)) + } var resp *Msg var err error @@ -618,6 +623,9 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { if o.seq > 0 { m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10)) } + if o.lss > 0 { + m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10)) + } // Reply if m.Reply != _EMPTY_ { @@ -687,6 +695,14 @@ func ExpectLastSequence(seq uint64) PubOpt { }) } +// ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish. +func ExpectLastSequencePerSubject(seq uint64) PubOpt { + return pubOptFn(func(opts *pubOpts) error { + opts.lss = seq + return nil + }) +} + // ExpectLastMsgId sets the expected last msgId in the response from the publish. func ExpectLastMsgId(id string) PubOpt { return pubOptFn(func(opts *pubOpts) error { @@ -772,6 +788,7 @@ func Context(ctx context.Context) ContextOpt { // ConsumerConfig is the configuration of a JetStream consumer. type ConsumerConfig struct { Durable string `json:"durable_name,omitempty"` + Description string `json:"description,omitempty"` DeliverSubject string `json:"deliver_subject,omitempty"` DeliverGroup string `json:"deliver_group,omitempty"` DeliverPolicy DeliverPolicy `json:"deliver_policy"` @@ -1012,7 +1029,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // If no stream name is specified, or if option SubjectIsDelivery is // specified, the subject cannot be empty. - if subj == _EMPTY_ && (o.stream == _EMPTY_ || o.subjIsDelivery) { + if subj == _EMPTY_ && o.stream == _EMPTY_ { return nil, fmt.Errorf("nats: subject required") } @@ -1027,7 +1044,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy) } // No deliver subject should be provided - if o.cfg.DeliverSubject != _EMPTY_ || o.subjIsDelivery { + if o.cfg.DeliverSubject != _EMPTY_ { return nil, ErrPullSubscribeToPushConsumer } } @@ -1081,6 +1098,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 { return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer") } + // No deliver subject, we pick our own. + if o.cfg.DeliverSubject != _EMPTY_ { + return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer") + } // Queue groups not allowed. if queue != _EMPTY_ { return nil, fmt.Errorf("nats: queues not be set for an ordered consumer") @@ -1106,95 +1127,89 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, hbi = o.cfg.Heartbeat } - // With this option, we go directly create the NATS subscription - // and skip all lookup/create. - if o.subjIsDelivery { - deliver = subj + // In case a consumer has not been set explicitly, then the + // durable name will be used as the consumer name. + if consumer == _EMPTY_ { + consumer = o.cfg.Durable + } + + // Find the stream mapped to the subject if not bound to a stream already. + if o.stream == _EMPTY_ { + stream, err = js.lookupStreamBySubject(subj) + if err != nil { + return nil, err + } } else { - // In case a consumer has not been set explicitly, then the - // durable name will be used as the consumer name. - if consumer == _EMPTY_ { - consumer = o.cfg.Durable + stream = o.stream + } + + // With an explicit durable name, we can lookup the consumer first + // to which it should be attaching to. + if consumer != _EMPTY_ { + info, err = js.ConsumerInfo(stream, consumer) + notFoundErr = errors.Is(err, ErrConsumerNotFound) + lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded + } + + switch { + case info != nil: + deliver, err = processConsInfo(info, isPullMode, subj, queue) + if err != nil { + return nil, err } - - // Find the stream mapped to the subject if not bound to a stream already. - if o.stream == _EMPTY_ { - stream, err = js.lookupStreamBySubject(subj) - if err != nil { - return nil, err - } - } else { - stream = o.stream + icfg := &info.Config + hasFC, hbi = icfg.FlowControl, icfg.Heartbeat + hasHeartbeats = hbi > 0 + case (err != nil && !notFoundErr) || (notFoundErr && consumerBound): + // If the consumer is being bound and we got an error on pull subscribe then allow the error. + if !(isPullMode && lookupErr && consumerBound) { + return nil, err } - - // With an explicit durable name, we can lookup the consumer first - // to which it should be attaching to. - if consumer != _EMPTY_ { - info, err = js.ConsumerInfo(stream, consumer) - notFoundErr = errors.Is(err, ErrConsumerNotFound) - lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded - } - - switch { - case info != nil: - deliver, err = processConsInfo(info, isPullMode, subj, queue) - if err != nil { - return nil, err - } - icfg := &info.Config - hasFC, hbi = icfg.FlowControl, icfg.Heartbeat - hasHeartbeats = hbi > 0 - case (err != nil && !notFoundErr) || (notFoundErr && consumerBound): - // If the consumer is being bound and we got an error on pull subscribe then allow the error. - if !(isPullMode && lookupErr && consumerBound) { - return nil, err - } - default: - // Attempt to create consumer if not found nor using Bind. - shouldCreate = true - if o.cfg.DeliverSubject != _EMPTY_ { - deliver = o.cfg.DeliverSubject - } else if !isPullMode { - deliver = nc.newInbox() - cfg.DeliverSubject = deliver - } - - // Do filtering always, server will clear as needed. - cfg.FilterSubject = subj - - // Pass the queue to the consumer config - if queue != _EMPTY_ { - cfg.DeliverGroup = queue - } - - // If not set default to ack explicit. - if cfg.AckPolicy == ackPolicyNotSet { - cfg.AckPolicy = AckExplicitPolicy - } - // If we have acks at all and the MaxAckPending is not set go ahead - // and set to the internal max. - // TODO(dlc) - We should be able to update this if client updates PendingLimits. - if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy { - if !isPullMode && cb != nil && hasFC { - cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16 - } else if ch != nil { - cfg.MaxAckPending = cap(ch) - } else { - cfg.MaxAckPending = DefaultSubPendingMsgsLimit - } - } - // Create request here. - ccreq = &createConsumerRequest{ - Stream: stream, - Config: &cfg, - } - hbi = cfg.Heartbeat - } - - if isPullMode { - nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer) + default: + // Attempt to create consumer if not found nor using Bind. + shouldCreate = true + if o.cfg.DeliverSubject != _EMPTY_ { + deliver = o.cfg.DeliverSubject + } else if !isPullMode { deliver = nc.newInbox() + cfg.DeliverSubject = deliver } + + // Do filtering always, server will clear as needed. + cfg.FilterSubject = subj + + // Pass the queue to the consumer config + if queue != _EMPTY_ { + cfg.DeliverGroup = queue + } + + // If not set default to ack explicit. + if cfg.AckPolicy == ackPolicyNotSet { + cfg.AckPolicy = AckExplicitPolicy + } + // If we have acks at all and the MaxAckPending is not set go ahead + // and set to the internal max. + // TODO(dlc) - We should be able to update this if client updates PendingLimits. + if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy { + if !isPullMode && cb != nil && hasFC { + cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16 + } else if ch != nil { + cfg.MaxAckPending = cap(ch) + } else { + cfg.MaxAckPending = DefaultSubPendingMsgsLimit + } + } + // Create request here. + ccreq = &createConsumerRequest{ + Stream: stream, + Config: &cfg, + } + hbi = cfg.Heartbeat + } + + if isPullMode { + nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer) + deliver = nc.newInbox() } jsi := &jsSub{ @@ -1671,10 +1686,6 @@ type subOpts struct { mack bool // For an ordered consumer. ordered bool - // Means that the subject passed to subscribe call will be used - // for the low level NATS subscription and no stream nor consumer - // lookup/creation will be done. - subjIsDelivery bool } // OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages. @@ -1731,6 +1742,15 @@ func DeliverLast() SubOpt { }) } +// DeliverLastPerSubject configures a Consumer to receive messages +// starting with the latest one for each filtered subject. +func DeliverLastPerSubject() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.DeliverPolicy = DeliverLastPerSubjectPolicy + return nil + }) +} + // DeliverNew configures a Consumer to receive messages // published after the subscription. func DeliverNew() SubOpt { @@ -1893,21 +1913,6 @@ func DeliverSubject(subject string) SubOpt { }) } -// SubjectIsDelivery specifies that the subject parameter in the subscribe -// call shall be used to create the NATS subscription and matches the -// JetStream consumer's deliver subject. -// -// NOTE: This is an "expert" API and should only be used when consumer lookup or -// creation by the library is not possible (for instance cross accounts). -// Since no lookup of the JetStream consumer is done, there is no way for -// the library to check the validity of this JetStream subscription. -func SubjectIsDelivery() SubOpt { - return subOptFn(func(opts *subOpts) error { - opts.subjIsDelivery = true - return nil - }) -} - func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. @@ -2296,12 +2301,12 @@ const ( ) func getMetadataFields(subject string) ([]string, error) { - const noDomainNoHashsExpectedAckTokens = 9 - const withDomainNoHashExpectedAckTokens = 10 - const withDomainAndHashExpectedAckTokens = 11 + const v1TokenCounts = 9 + const v2TokenCounts = 12 + const noDomainName = "_" const btsep = '.' - tsa := [withDomainAndHashExpectedAckTokens]string{} + tsa := [v2TokenCounts]string{} start, tokens := 0, tsa[:0] for i := 0; i < len(subject); i++ { if subject[i] == btsep { @@ -2311,37 +2316,38 @@ func getMetadataFields(subject string) ([]string, error) { } tokens = append(tokens, subject[start:]) // - // Newer server will include an account hash in the subject, and possibly the domain. - // So the subject could be: + // Newer server will include the domain name and account hash in the subject, + // and a token at the end. // - // no domain: $JS.ACK...... - // with domain: $JS.ACK....... + // Old subject was: + // $JS.ACK....... // - // So old server number of tokens is 9, newer is 10 or 11. + // New subject would be: + // $JS.ACK.......... + // + // v1 has 9 tokens, v2 has 12. // l := len(tokens) - if l < noDomainNoHashsExpectedAckTokens || l > withDomainAndHashExpectedAckTokens { + // If lower than 9 or more than 9 but less than 12, report an error + if l < v1TokenCounts || (l > v1TokenCounts && l < v2TokenCounts) { return nil, ErrNotJSMessage } if tokens[0] != "$JS" || tokens[1] != "ACK" { return nil, ErrNotJSMessage } - // To make the rest of the library agnostic of that, we always return the tokens - // as if it is coming from a new server will all possible tokens. If domain or account - // hash are not specified, the tokens at those locations will simply be empty. - if l == noDomainNoHashsExpectedAckTokens || l == withDomainNoHashExpectedAckTokens { + // For v1 style, we insert 2 empty tokens (domain and hash) so that the + // rest of the library references known fields at a constant location. + if l == 9 { // Extend the array (we know the backend is big enough) - // Compute how many tokens we need to insert. - itc := withDomainAndHashExpectedAckTokens - l - for i := 0; i < itc; i++ { - tokens = append(tokens, _EMPTY_) - } + tokens = append(tokens, _EMPTY_, _EMPTY_) // Move to the right anything that is after "ACK" token. - copy(tokens[ackDomainTokenPos+itc:], tokens[ackDomainTokenPos:]) - // Set the missing tokens to empty - for i := 0; i < itc; i++ { - tokens[ackDomainTokenPos+i] = _EMPTY_ - } + copy(tokens[ackDomainTokenPos+2:], tokens[ackDomainTokenPos:]) + // Clear the domain and hash tokens + tokens[ackDomainTokenPos], tokens[ackAccHashTokenPos] = _EMPTY_, _EMPTY_ + + } else if tokens[ackDomainTokenPos] == noDomainName { + // If domain is "_", replace with empty value. + tokens[ackDomainTokenPos] = _EMPTY_ } return tokens, nil } @@ -2522,6 +2528,10 @@ const ( // DeliverByStartTimePolicy will deliver messages starting from a given // time. DeliverByStartTimePolicy + + // DeliverLastPerSubjectPolicy will start the consumer with the last message + // for all subjects received. + DeliverLastPerSubjectPolicy ) func (p *DeliverPolicy) UnmarshalJSON(data []byte) error { @@ -2536,6 +2546,8 @@ func (p *DeliverPolicy) UnmarshalJSON(data []byte) error { *p = DeliverByStartSequencePolicy case jsonString("by_start_time"): *p = DeliverByStartTimePolicy + case jsonString("last_per_subject"): + *p = DeliverLastPerSubjectPolicy } return nil @@ -2553,6 +2565,8 @@ func (p DeliverPolicy) MarshalJSON() ([]byte, error) { return json.Marshal("by_start_sequence") case DeliverByStartTimePolicy: return json.Marshal("by_start_time") + case DeliverLastPerSubjectPolicy: + return json.Marshal("last_per_subject") default: return nil, fmt.Errorf("nats: unknown deliver policy %v", p) } diff --git a/vendor/github.com/nats-io/nats.go/jsm.go b/vendor/github.com/nats-io/nats.go/jsm.go index 00ea173a..f40086b2 100644 --- a/vendor/github.com/nats-io/nats.go/jsm.go +++ b/vendor/github.com/nats-io/nats.go/jsm.go @@ -75,6 +75,7 @@ type JetStreamManager interface { // given the name will be used as the only subject. type StreamConfig struct { Name string `json:"name"` + Description string `json:"description,omitempty"` Subjects []string `json:"subjects,omitempty"` Retention RetentionPolicy `json:"retention"` MaxConsumers int `json:"max_consumers"` diff --git a/vendor/modules.txt b/vendor/modules.txt index 30be76e0..9e7626eb 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -9,7 +9,7 @@ github.com/minio/highwayhash # github.com/nats-io/jwt/v2 v2.0.3 ## explicit github.com/nats-io/jwt/v2 -# github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9 +# github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0 ## explicit github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin