Update Go client

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-08-19 18:19:39 -07:00
parent 7dcd75aa1d
commit 143f145364
5 changed files with 154 additions and 135 deletions

2
go.mod
View File

@@ -7,7 +7,7 @@ require (
github.com/klauspost/compress v1.11.12
github.com/minio/highwayhash v1.0.1
github.com/nats-io/jwt/v2 v2.0.3
github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9
github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e

4
go.sum
View File

@@ -26,6 +26,10 @@ github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19 h1:9WQzXoYc37xB
github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9 h1:aJYmbbVrq6rsFGAvQnAvoChjkjUOJGqVBdQ47vbEWD4=
github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.11.1-0.20210817171728-9d3a000c8a66 h1:J3LNTmD/AUgjKJjZK2IEsGl2GD1znemMOq64ZKu83ok=
github.com/nats-io/nats.go v1.11.1-0.20210817171728-9d3a000c8a66/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0 h1:lffRgFiHXqxwf8lYNSXXeOZdOgAIOabGwOSwdttqCn0=
github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=

View File

@@ -260,6 +260,7 @@ type pubOpts struct {
lid string // Expected last msgId
str string // Expected stream name
seq uint64 // Expected last sequence
lss uint64 // Expected last sequence per subject
}
// pubAckResponse is the ack response from the JetStream API when publishing a message.
@@ -278,10 +279,11 @@ type PubAck struct {
// Headers for published messages.
const (
MsgIdHdr = "Nats-Msg-Id"
ExpectedStreamHdr = "Nats-Expected-Stream"
ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
MsgIdHdr = "Nats-Msg-Id"
ExpectedStreamHdr = "Nats-Expected-Stream"
ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
)
// PublishMsg publishes a Msg to a stream from JetStream.
@@ -317,6 +319,9 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
if o.seq > 0 {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
}
if o.lss > 0 {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10))
}
var resp *Msg
var err error
@@ -618,6 +623,9 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if o.seq > 0 {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
}
if o.lss > 0 {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10))
}
// Reply
if m.Reply != _EMPTY_ {
@@ -687,6 +695,14 @@ func ExpectLastSequence(seq uint64) PubOpt {
})
}
// ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.
func ExpectLastSequencePerSubject(seq uint64) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.lss = seq
return nil
})
}
// ExpectLastMsgId sets the expected last msgId in the response from the publish.
func ExpectLastMsgId(id string) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
@@ -772,6 +788,7 @@ func Context(ctx context.Context) ContextOpt {
// ConsumerConfig is the configuration of a JetStream consumer.
type ConsumerConfig struct {
Durable string `json:"durable_name,omitempty"`
Description string `json:"description,omitempty"`
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
@@ -1012,7 +1029,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// If no stream name is specified, or if option SubjectIsDelivery is
// specified, the subject cannot be empty.
if subj == _EMPTY_ && (o.stream == _EMPTY_ || o.subjIsDelivery) {
if subj == _EMPTY_ && o.stream == _EMPTY_ {
return nil, fmt.Errorf("nats: subject required")
}
@@ -1027,7 +1044,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy)
}
// No deliver subject should be provided
if o.cfg.DeliverSubject != _EMPTY_ || o.subjIsDelivery {
if o.cfg.DeliverSubject != _EMPTY_ {
return nil, ErrPullSubscribeToPushConsumer
}
}
@@ -1081,6 +1098,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 {
return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer")
}
// No deliver subject, we pick our own.
if o.cfg.DeliverSubject != _EMPTY_ {
return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer")
}
// Queue groups not allowed.
if queue != _EMPTY_ {
return nil, fmt.Errorf("nats: queues not be set for an ordered consumer")
@@ -1106,95 +1127,89 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
hbi = o.cfg.Heartbeat
}
// With this option, we go directly create the NATS subscription
// and skip all lookup/create.
if o.subjIsDelivery {
deliver = subj
// In case a consumer has not been set explicitly, then the
// durable name will be used as the consumer name.
if consumer == _EMPTY_ {
consumer = o.cfg.Durable
}
// Find the stream mapped to the subject if not bound to a stream already.
if o.stream == _EMPTY_ {
stream, err = js.lookupStreamBySubject(subj)
if err != nil {
return nil, err
}
} else {
// In case a consumer has not been set explicitly, then the
// durable name will be used as the consumer name.
if consumer == _EMPTY_ {
consumer = o.cfg.Durable
stream = o.stream
}
// With an explicit durable name, we can lookup the consumer first
// to which it should be attaching to.
if consumer != _EMPTY_ {
info, err = js.ConsumerInfo(stream, consumer)
notFoundErr = errors.Is(err, ErrConsumerNotFound)
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
}
switch {
case info != nil:
deliver, err = processConsInfo(info, isPullMode, subj, queue)
if err != nil {
return nil, err
}
// Find the stream mapped to the subject if not bound to a stream already.
if o.stream == _EMPTY_ {
stream, err = js.lookupStreamBySubject(subj)
if err != nil {
return nil, err
}
} else {
stream = o.stream
icfg := &info.Config
hasFC, hbi = icfg.FlowControl, icfg.Heartbeat
hasHeartbeats = hbi > 0
case (err != nil && !notFoundErr) || (notFoundErr && consumerBound):
// If the consumer is being bound and we got an error on pull subscribe then allow the error.
if !(isPullMode && lookupErr && consumerBound) {
return nil, err
}
// With an explicit durable name, we can lookup the consumer first
// to which it should be attaching to.
if consumer != _EMPTY_ {
info, err = js.ConsumerInfo(stream, consumer)
notFoundErr = errors.Is(err, ErrConsumerNotFound)
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
}
switch {
case info != nil:
deliver, err = processConsInfo(info, isPullMode, subj, queue)
if err != nil {
return nil, err
}
icfg := &info.Config
hasFC, hbi = icfg.FlowControl, icfg.Heartbeat
hasHeartbeats = hbi > 0
case (err != nil && !notFoundErr) || (notFoundErr && consumerBound):
// If the consumer is being bound and we got an error on pull subscribe then allow the error.
if !(isPullMode && lookupErr && consumerBound) {
return nil, err
}
default:
// Attempt to create consumer if not found nor using Bind.
shouldCreate = true
if o.cfg.DeliverSubject != _EMPTY_ {
deliver = o.cfg.DeliverSubject
} else if !isPullMode {
deliver = nc.newInbox()
cfg.DeliverSubject = deliver
}
// Do filtering always, server will clear as needed.
cfg.FilterSubject = subj
// Pass the queue to the consumer config
if queue != _EMPTY_ {
cfg.DeliverGroup = queue
}
// If not set default to ack explicit.
if cfg.AckPolicy == ackPolicyNotSet {
cfg.AckPolicy = AckExplicitPolicy
}
// If we have acks at all and the MaxAckPending is not set go ahead
// and set to the internal max.
// TODO(dlc) - We should be able to update this if client updates PendingLimits.
if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy {
if !isPullMode && cb != nil && hasFC {
cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16
} else if ch != nil {
cfg.MaxAckPending = cap(ch)
} else {
cfg.MaxAckPending = DefaultSubPendingMsgsLimit
}
}
// Create request here.
ccreq = &createConsumerRequest{
Stream: stream,
Config: &cfg,
}
hbi = cfg.Heartbeat
}
if isPullMode {
nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer)
default:
// Attempt to create consumer if not found nor using Bind.
shouldCreate = true
if o.cfg.DeliverSubject != _EMPTY_ {
deliver = o.cfg.DeliverSubject
} else if !isPullMode {
deliver = nc.newInbox()
cfg.DeliverSubject = deliver
}
// Do filtering always, server will clear as needed.
cfg.FilterSubject = subj
// Pass the queue to the consumer config
if queue != _EMPTY_ {
cfg.DeliverGroup = queue
}
// If not set default to ack explicit.
if cfg.AckPolicy == ackPolicyNotSet {
cfg.AckPolicy = AckExplicitPolicy
}
// If we have acks at all and the MaxAckPending is not set go ahead
// and set to the internal max.
// TODO(dlc) - We should be able to update this if client updates PendingLimits.
if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy {
if !isPullMode && cb != nil && hasFC {
cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16
} else if ch != nil {
cfg.MaxAckPending = cap(ch)
} else {
cfg.MaxAckPending = DefaultSubPendingMsgsLimit
}
}
// Create request here.
ccreq = &createConsumerRequest{
Stream: stream,
Config: &cfg,
}
hbi = cfg.Heartbeat
}
if isPullMode {
nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer)
deliver = nc.newInbox()
}
jsi := &jsSub{
@@ -1671,10 +1686,6 @@ type subOpts struct {
mack bool
// For an ordered consumer.
ordered bool
// Means that the subject passed to subscribe call will be used
// for the low level NATS subscription and no stream nor consumer
// lookup/creation will be done.
subjIsDelivery bool
}
// OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages.
@@ -1731,6 +1742,15 @@ func DeliverLast() SubOpt {
})
}
// DeliverLastPerSubject configures a Consumer to receive messages
// starting with the latest one for each filtered subject.
func DeliverLastPerSubject() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.DeliverPolicy = DeliverLastPerSubjectPolicy
return nil
})
}
// DeliverNew configures a Consumer to receive messages
// published after the subscription.
func DeliverNew() SubOpt {
@@ -1893,21 +1913,6 @@ func DeliverSubject(subject string) SubOpt {
})
}
// SubjectIsDelivery specifies that the subject parameter in the subscribe
// call shall be used to create the NATS subscription and matches the
// JetStream consumer's deliver subject.
//
// NOTE: This is an "expert" API and should only be used when consumer lookup or
// creation by the library is not possible (for instance cross accounts).
// Since no lookup of the JetStream consumer is done, there is no way for
// the library to check the validity of this JetStream subscription.
func SubjectIsDelivery() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.subjIsDelivery = true
return nil
})
}
func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
@@ -2296,12 +2301,12 @@ const (
)
func getMetadataFields(subject string) ([]string, error) {
const noDomainNoHashsExpectedAckTokens = 9
const withDomainNoHashExpectedAckTokens = 10
const withDomainAndHashExpectedAckTokens = 11
const v1TokenCounts = 9
const v2TokenCounts = 12
const noDomainName = "_"
const btsep = '.'
tsa := [withDomainAndHashExpectedAckTokens]string{}
tsa := [v2TokenCounts]string{}
start, tokens := 0, tsa[:0]
for i := 0; i < len(subject); i++ {
if subject[i] == btsep {
@@ -2311,37 +2316,38 @@ func getMetadataFields(subject string) ([]string, error) {
}
tokens = append(tokens, subject[start:])
//
// Newer server will include an account hash in the subject, and possibly the domain.
// So the subject could be:
// Newer server will include the domain name and account hash in the subject,
// and a token at the end.
//
// no domain: $JS.ACK.<account hash>.<stream>.<consumer>...
// with domain: $JS.ACK.<domain>.<account hash>.<stream>.<consumer>...
// Old subject was:
// $JS.ACK.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<tm>.<pending>
//
// So old server number of tokens is 9, newer is 10 or 11.
// New subject would be:
// $JS.ACK.<domain>.<account hash>.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<tm>.<pending>.<a token with a random value>
//
// v1 has 9 tokens, v2 has 12.
//
l := len(tokens)
if l < noDomainNoHashsExpectedAckTokens || l > withDomainAndHashExpectedAckTokens {
// If lower than 9 or more than 9 but less than 12, report an error
if l < v1TokenCounts || (l > v1TokenCounts && l < v2TokenCounts) {
return nil, ErrNotJSMessage
}
if tokens[0] != "$JS" || tokens[1] != "ACK" {
return nil, ErrNotJSMessage
}
// To make the rest of the library agnostic of that, we always return the tokens
// as if it is coming from a new server will all possible tokens. If domain or account
// hash are not specified, the tokens at those locations will simply be empty.
if l == noDomainNoHashsExpectedAckTokens || l == withDomainNoHashExpectedAckTokens {
// For v1 style, we insert 2 empty tokens (domain and hash) so that the
// rest of the library references known fields at a constant location.
if l == 9 {
// Extend the array (we know the backend is big enough)
// Compute how many tokens we need to insert.
itc := withDomainAndHashExpectedAckTokens - l
for i := 0; i < itc; i++ {
tokens = append(tokens, _EMPTY_)
}
tokens = append(tokens, _EMPTY_, _EMPTY_)
// Move to the right anything that is after "ACK" token.
copy(tokens[ackDomainTokenPos+itc:], tokens[ackDomainTokenPos:])
// Set the missing tokens to empty
for i := 0; i < itc; i++ {
tokens[ackDomainTokenPos+i] = _EMPTY_
}
copy(tokens[ackDomainTokenPos+2:], tokens[ackDomainTokenPos:])
// Clear the domain and hash tokens
tokens[ackDomainTokenPos], tokens[ackAccHashTokenPos] = _EMPTY_, _EMPTY_
} else if tokens[ackDomainTokenPos] == noDomainName {
// If domain is "_", replace with empty value.
tokens[ackDomainTokenPos] = _EMPTY_
}
return tokens, nil
}
@@ -2522,6 +2528,10 @@ const (
// DeliverByStartTimePolicy will deliver messages starting from a given
// time.
DeliverByStartTimePolicy
// DeliverLastPerSubjectPolicy will start the consumer with the last message
// for all subjects received.
DeliverLastPerSubjectPolicy
)
func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
@@ -2536,6 +2546,8 @@ func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
*p = DeliverByStartSequencePolicy
case jsonString("by_start_time"):
*p = DeliverByStartTimePolicy
case jsonString("last_per_subject"):
*p = DeliverLastPerSubjectPolicy
}
return nil
@@ -2553,6 +2565,8 @@ func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
return json.Marshal("by_start_sequence")
case DeliverByStartTimePolicy:
return json.Marshal("by_start_time")
case DeliverLastPerSubjectPolicy:
return json.Marshal("last_per_subject")
default:
return nil, fmt.Errorf("nats: unknown deliver policy %v", p)
}

View File

@@ -75,6 +75,7 @@ type JetStreamManager interface {
// given the name will be used as the only subject.
type StreamConfig struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`

2
vendor/modules.txt vendored
View File

@@ -9,7 +9,7 @@ github.com/minio/highwayhash
# github.com/nats-io/jwt/v2 v2.0.3
## explicit
github.com/nats-io/jwt/v2
# github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9
# github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0
## explicit
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin