diff --git a/go.mod b/go.mod index 40320e0d..7dd4f91b 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/klauspost/compress v1.11.12 github.com/minio/highwayhash v1.0.1 github.com/nats-io/jwt/v2 v2.0.3 - github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30 + github.com/nats-io/nats.go v1.11.1-0.20210803204434-91bdffe39f41 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e diff --git a/go.sum b/go.sum index b0ec2ea4..46e4fff1 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30 h1:9GqilBhZaR3xYis0JgMlJjNw933WIobdjKhilXm+Vls= github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.11.1-0.20210803204434-91bdffe39f41 h1:GUUkiOgD00OMr4foruBN6YG1be3lFnHl0LJIoEs8cQg= +github.com/nats-io/nats.go v1.11.1-0.20210803204434-91bdffe39f41/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= diff --git a/vendor/github.com/nats-io/nats.go/context.go b/vendor/github.com/nats-io/nats.go/context.go index 666a483a..b098a6d7 100644 --- a/vendor/github.com/nats-io/nats.go/context.go +++ b/vendor/github.com/nats-io/nats.go/context.go @@ -92,7 +92,7 @@ func (nc *Conn) requestWithContext(ctx context.Context, subj string, hdr, data [ // oldRequestWithContext utilizes inbox and subscription per request. func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, data []byte) (*Msg, error) { - inbox := NewInbox() + inbox := nc.newInbox() ch := make(chan *Msg, RequestChanLen) s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil) diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index 7a0b8e47..0e68192a 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -695,7 +695,7 @@ func ExpectLastSequence(seq uint64) PubOpt { }) } -// ExpectLastMsgId sets the expected sequence in the response from the publish. +// ExpectLastMsgId sets the expected last msgId in the response from the publish. func ExpectLastMsgId(id string) PubOpt { return pubOptFn(func(opts *pubOpts) error { opts.lid = id @@ -1048,7 +1048,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync // to which it should be attaching to. if consumer != _EMPTY_ { info, err = js.ConsumerInfo(stream, consumer) - notFoundErr = err != nil && strings.Contains(err.Error(), "consumer not found") + notFoundErr = errors.Is(err, ErrConsumerNotFound) lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded } @@ -1194,6 +1194,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } attached = true } else { + if cinfo.Error.Code == 404 { + return nil, ErrStreamNotFound + } return nil, fmt.Errorf("nats: %s", cinfo.Error.Description) } } @@ -1355,6 +1358,7 @@ func (js *js) lookupStreamBySubject(subj string) (string, error) { if err := json.Unmarshal(resp.Data, &slr); err != nil { return _EMPTY_, err } + if slr.Error != nil || len(slr.Streams) != 1 { return _EMPTY_, ErrNoMatchingStream } @@ -1889,6 +1893,9 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin return nil, err } if info.Error != nil { + if info.Error.Code == 404 { + return nil, ErrConsumerNotFound + } return nil, fmt.Errorf("nats: %s", info.Error.Description) } return info.ConsumerInfo, nil diff --git a/vendor/github.com/nats-io/nats.go/jsm.go b/vendor/github.com/nats-io/nats.go/jsm.go index e485ae14..00ea173a 100644 --- a/vendor/github.com/nats-io/nats.go/jsm.go +++ b/vendor/github.com/nats-io/nats.go/jsm.go @@ -74,23 +74,24 @@ type JetStreamManager interface { // There are sensible defaults for most. If no subjects are // given the name will be used as the only subject. type StreamConfig struct { - Name string `json:"name"` - Subjects []string `json:"subjects,omitempty"` - Retention RetentionPolicy `json:"retention"` - MaxConsumers int `json:"max_consumers"` - MaxMsgs int64 `json:"max_msgs"` - MaxBytes int64 `json:"max_bytes"` - Discard DiscardPolicy `json:"discard"` - MaxAge time.Duration `json:"max_age"` - MaxMsgSize int32 `json:"max_msg_size,omitempty"` - Storage StorageType `json:"storage"` - Replicas int `json:"num_replicas"` - NoAck bool `json:"no_ack,omitempty"` - Template string `json:"template_owner,omitempty"` - Duplicates time.Duration `json:"duplicate_window,omitempty"` - Placement *Placement `json:"placement,omitempty"` - Mirror *StreamSource `json:"mirror,omitempty"` - Sources []*StreamSource `json:"sources,omitempty"` + Name string `json:"name"` + Subjects []string `json:"subjects,omitempty"` + Retention RetentionPolicy `json:"retention"` + MaxConsumers int `json:"max_consumers"` + MaxMsgs int64 `json:"max_msgs"` + MaxBytes int64 `json:"max_bytes"` + Discard DiscardPolicy `json:"discard"` + MaxAge time.Duration `json:"max_age"` + MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"` + MaxMsgSize int32 `json:"max_msg_size,omitempty"` + Storage StorageType `json:"storage"` + Replicas int `json:"num_replicas"` + NoAck bool `json:"no_ack,omitempty"` + Template string `json:"template_owner,omitempty"` + Duplicates time.Duration `json:"duplicate_window,omitempty"` + Placement *Placement `json:"placement,omitempty"` + Mirror *StreamSource `json:"mirror,omitempty"` + Sources []*StreamSource `json:"sources,omitempty"` } // Placement is used to guide placement of streams in clustered JetStream. @@ -258,6 +259,9 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C return nil, err } if info.Error != nil { + if info.Error.Code == 404 { + return nil, ErrConsumerNotFound + } return nil, errors.New(info.Error.Description) } return info.ConsumerInfo, nil @@ -292,7 +296,11 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error { if err := json.Unmarshal(r.Data, &resp); err != nil { return err } + if resp.Error != nil { + if resp.Error.Code == 404 { + return ErrConsumerNotFound + } return errors.New(resp.Error.Description) } return nil @@ -559,6 +567,7 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { if resp.Error != nil { return nil, errors.New(resp.Error.Description) } + return resp.StreamInfo, nil } @@ -587,8 +596,12 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { return nil, err } if resp.Error != nil { + if resp.Error.Code == 404 { + return nil, ErrStreamNotFound + } return nil, errors.New(resp.Error.Description) } + return resp.StreamInfo, nil } @@ -701,7 +714,11 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error { if err := json.Unmarshal(r.Data, &resp); err != nil { return err } + if resp.Error != nil { + if resp.Error.Code == 404 { + return ErrStreamNotFound + } return errors.New(resp.Error.Description) } return nil diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index 50216834..ce8db129 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -146,6 +146,8 @@ var ( ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response") ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported") ErrStreamNameRequired = errors.New("nats: stream name is required") + ErrStreamNotFound = errors.New("nats: stream not found") + ErrConsumerNotFound = errors.New("nats: consumer not found") ErrConsumerNameRequired = errors.New("nats: consumer name is required") ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required") ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required") @@ -433,6 +435,9 @@ type Options struct { // For websocket connections, indicates to the server that the connection // supports compression. If the server does too, then data will be compressed. Compression bool + + // InboxPrefix allows the default _INBOX prefix to be customized + InboxPrefix string } const ( @@ -494,11 +499,13 @@ type Conn struct { ws bool // true if a websocket connection // New style response handler - respSub string // The wildcard subject - respScanf string // The scanf template to extract mux token - respMux *Subscription // A single response subscription - respMap map[string]chan *Msg // Request map for the response msg channels - respRand *rand.Rand // Used for generating suffix + respSub string // The wildcard subject + respSubPrefix string // the wildcard prefix including trailing . + respSubLen int // the length of the wildcard prefix excluding trailing . + respScanf string // The scanf template to extract mux token + respMux *Subscription // A single response subscription + respMap map[string]chan *Msg // Request map for the response msg channels + respRand *rand.Rand // Used for generating suffix } type natsReader struct { @@ -1101,6 +1108,17 @@ func Compression(enabled bool) Option { } } +// CustomInboxPrefix configures the request + reply inbox prefix +func CustomInboxPrefix(p string) Option { + return func(o *Options) error { + if p == "" || strings.Contains(p, ">") || strings.Contains(p, "*") || strings.HasSuffix(p, ".") { + return fmt.Errorf("nats: invald custom prefix") + } + o.InboxPrefix = p + return nil + } +} + // Handler processing // SetDisconnectHandler will set the disconnect event handler. @@ -3120,10 +3138,7 @@ func decodeHeadersMsg(data []byte) (Header, error) { // // https://golang.org/pkg/net/textproto/#Reader.ReadMIMEHeader func readMIMEHeader(tp *textproto.Reader) (textproto.MIMEHeader, error) { - var ( - m = make(textproto.MIMEHeader) - strs []string - ) + m := make(textproto.MIMEHeader) for { kv, err := tp.ReadLine() if len(kv) == 0 { @@ -3145,16 +3160,7 @@ func readMIMEHeader(tp *textproto.Reader) (textproto.MIMEHeader, error) { i++ } value := string(kv[i:]) - vv := m[key] - if vv == nil && len(strs) > 0 { - // Single value header. - vv, strs = strs[:1:1], strs[1:] - vv[0] = value - m[key] = vv - } else { - // Multi value header. - m[key] = append(vv, value) - } + m[key] = append(m[key], value) if err != nil { return m, err } @@ -3350,7 +3356,8 @@ func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Ms // Create new literal Inbox and map to a chan msg. mch := make(chan *Msg, RequestChanLen) respInbox := nc.newRespInbox() - token := respInbox[respInboxPrefixLen:] + token := respInbox[nc.respSubLen:] + nc.respMap[token] = mch if nc.respMux == nil { // Create the response subscription we will use for all new style responses. @@ -3457,7 +3464,7 @@ func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration) // with the Inbox reply and return the first reply received. // This is optimized for the case of multiple responses. func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) { - inbox := NewInbox() + inbox := nc.newInbox() ch := make(chan *Msg, RequestChanLen) s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil) @@ -3477,12 +3484,10 @@ func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) // InboxPrefix is the prefix for all inbox subjects. const ( - InboxPrefix = "_INBOX." - inboxPrefixLen = len(InboxPrefix) - respInboxPrefixLen = inboxPrefixLen + nuidSize + 1 - replySuffixLen = 8 // Gives us 62^8 - rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" - base = 62 + InboxPrefix = "_INBOX." + inboxPrefixLen = len(InboxPrefix) + rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + base = 62 ) // NewInbox will return an inbox string which can be used for directed replies from @@ -3497,10 +3502,23 @@ func NewInbox() string { return string(b[:]) } +func (nc *Conn) newInbox() string { + if nc.Opts.InboxPrefix == _EMPTY_ { + return NewInbox() + } + + var sb strings.Builder + sb.WriteString(nc.Opts.InboxPrefix) + sb.WriteByte('.') + sb.WriteString(nuid.Next()) + return sb.String() +} + // Function to init new response structures. func (nc *Conn) initNewResp() { - // _INBOX wildcard - nc.respSub = fmt.Sprintf("%s.*", NewInbox()) + nc.respSubPrefix = fmt.Sprintf("%s.", nc.newInbox()) + nc.respSubLen = len(nc.respSubPrefix) + nc.respSub = fmt.Sprintf("%s*", nc.respSubPrefix) nc.respMap = make(map[string]chan *Msg) nc.respRand = rand.New(rand.NewSource(time.Now().UnixNano())) } @@ -3512,15 +3530,17 @@ func (nc *Conn) newRespInbox() string { if nc.respMap == nil { nc.initNewResp() } - var b [respInboxPrefixLen + replySuffixLen]byte - pres := b[:respInboxPrefixLen] - copy(pres, nc.respSub) + + var sb strings.Builder + sb.WriteString(nc.respSubPrefix) + rn := nc.respRand.Int63() - for i, l := respInboxPrefixLen, rn; i < len(b); i++ { - b[i] = rdigits[l%base] - l /= base + for i := 0; i < nuidSize; i++ { + sb.WriteByte(rdigits[rn%base]) + rn /= base } - return string(b[:]) + + return sb.String() } // NewRespInbox is the new format used for _INBOX. diff --git a/vendor/modules.txt b/vendor/modules.txt index 50cad5d3..87318715 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -9,7 +9,7 @@ github.com/minio/highwayhash # github.com/nats-io/jwt/v2 v2.0.3 ## explicit github.com/nats-io/jwt/v2 -# github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30 +# github.com/nats-io/nats.go v1.11.1-0.20210803204434-91bdffe39f41 ## explicit github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin