diff --git a/go.mod b/go.mod index 8dbd89c5..c060aafa 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/klauspost/compress v1.13.4 github.com/minio/highwayhash v1.0.1 github.com/nats-io/jwt/v2 v2.0.3 - github.com/nats-io/nats.go v1.12.1 + github.com/nats-io/nats.go v1.12.2-0.20210916222008-92921544b891 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e diff --git a/go.sum b/go.sum index c68eeb46..22c748b5 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,8 @@ github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= -github.com/nats-io/nats.go v1.12.1 h1:+0ndxwUPz3CmQ2vjbXdkC1fo3FdiOQDim4gl3Mge8Qo= -github.com/nats-io/nats.go v1.12.1/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.12.2-0.20210916222008-92921544b891 h1:Aw3d+rbmA3BxN9AgUn/fapMmYgoCpTsOR9HQO6l3uNM= +github.com/nats-io/nats.go v1.12.2-0.20210916222008-92921544b891/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index 539b55e6..4d4adbda 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -125,24 +125,51 @@ type JetStream interface { PublishAsyncComplete() <-chan struct{} // Subscribe creates an async Subscription for JetStream. + // The stream and consumer names can be provided with the nats.Bind() option. + // For creating an ephemeral (where the consumer name is picked by the server), + // you can provide the stream name with nats.BindStream(). + // If no stream name is specified, the library will attempt to figure out which + // stream the subscription is for. See important notes below for more details. + // + // IMPORTANT NOTES: + // * If none of the options Bind() nor Durable() are specified, the library will + // send a request to the server to create an ephemeral JetStream consumer, + // which will be deleted after an Unsubscribe() or Drain(), or automatically + // by the server after a short period of time after the NATS subscription is + // gone. + // * If Durable() option is specified, the library will attempt to lookup a JetStream + // consumer with this name, and if found, will bind to it and not attempt to + // delete it. However, if not found, the library will send a request to create + // such durable JetStream consumer. The library will delete the JetStream consumer + // after an Unsubscribe() or Drain(). + // * If Bind() option is provided, the library will attempt to lookup the + // consumer with the given name, and if successful, bind to it. If the lookup fails, + // then the Subscribe() call will return an error. Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) // SubscribeSync creates a Subscription that can be used to process messages synchronously. + // See important note in Subscribe() SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) // ChanSubscribe creates channel based Subscription. + // See important note in Subscribe() ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) // ChanQueueSubscribe creates channel based Subscription with a queue group. + // See important note in QueueSubscribe() ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) // QueueSubscribe creates a Subscription with a queue group. + // If no optional durable name nor binding options are specified, the queue name will be used as a durable name. + // See important note in Subscribe() QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) // QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously. + // See important note in QueueSubscribe() QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) // PullSubscribe creates a Subscription that can fetch messages. + // See important note in Subscribe() PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) } @@ -900,8 +927,7 @@ func (opt subOptFn) configureSubscribe(opts *subOpts) error { return opt(opts) } -// Subscribe will create a subscription to the appropriate stream and consumer. -// +// Subscribe creates an async Subscription for JetStream. // The stream and consumer names can be provided with the nats.Bind() option. // For creating an ephemeral (where the consumer name is picked by the server), // you can provide the stream name with nats.BindStream(). @@ -909,19 +935,19 @@ func (opt subOptFn) configureSubscribe(opts *subOpts) error { // stream the subscription is for. See important notes below for more details. // // IMPORTANT NOTES: -// * If Bind() and Durable() options are not specified, the library will +// * If none of the options Bind() nor Durable() are specified, the library will // send a request to the server to create an ephemeral JetStream consumer, // which will be deleted after an Unsubscribe() or Drain(), or automatically // by the server after a short period of time after the NATS subscription is // gone. -// * If Durable() only is specified, the library will attempt to lookup a JetStream -// consumer with this name and if found, will bind to it and not attempt to +// * If Durable() option is specified, the library will attempt to lookup a JetStream +// consumer with this name, and if found, will bind to it and not attempt to // delete it. However, if not found, the library will send a request to create -// such durable JetStream consumer, but will still attempt to delete it after -// an Unsubscribe() or Drain(). +// such durable JetStream consumer. The library will delete the JetStream consumer +// after an Unsubscribe() or Drain(). // * If Bind() option is provided, the library will attempt to lookup the -// consumer with the given name, and if the lookup fails, then the Subscribe() -// call will return an error. +// consumer with the given name, and if successful, bind to it. If the lookup fails, +// then the Subscribe() call will return an error. func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { if cb == nil { return nil, ErrBadSubscription @@ -929,15 +955,15 @@ func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscripti return js.subscribe(subj, _EMPTY_, cb, nil, false, false, opts) } -// SubscribeSync will create a sync subscription to the appropriate stream and consumer. +// SubscribeSync creates a Subscription that can be used to process messages synchronously. // See important note in Subscribe() func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) return js.subscribe(subj, _EMPTY_, nil, mch, true, false, opts) } -// QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics. -// If not optional durable name or binding option is specified, the queue name will be used as a durable name. +// QueueSubscribe creates a Subscription with a queue group. +// If no optional durable name nor binding options are specified, the queue name will be used as a durable name. // See important note in Subscribe() func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { if cb == nil { @@ -946,28 +972,27 @@ func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) return js.subscribe(subj, queue, cb, nil, false, false, opts) } -// QueueSubscribeSync will create a sync subscription to the appropriate stream and consumer with queue semantics. -// If not optional durable name or binding option is specified, the queue name will be used as a durable name. -// See important note in Subscribe() +// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously. +// See important note in QueueSubscribe() func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) return js.subscribe(subj, queue, nil, mch, true, false, opts) } -// ChanSubscribe will create a subscription to the appropriate stream and consumer using a channel. +// ChanSubscribe creates channel based Subscription. // See important note in Subscribe() func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { return js.subscribe(subj, _EMPTY_, nil, ch, false, false, opts) } -// ChanQueueSubscribe will create a subscription to the appropriate stream and consumer using a channel. -// If not optional durable name or binding option is specified, the queue name will be used as a durable name. -// See important note in Subscribe() +// ChanQueueSubscribe creates channel based Subscription with a queue group. +// See important note in QueueSubscribe() func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { return js.subscribe(subj, queue, nil, ch, false, false, opts) } -// PullSubscribe creates a pull subscriber. +// PullSubscribe creates a Subscription that can fetch messages. +// See important note in Subscribe() func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable))) @@ -2080,9 +2105,9 @@ func checkMsg(msg *Msg, checkSts bool) (usrMsg bool, err error) { case reqTimeoutSts: // Older servers may send a 408 when a request in the server was expired // and interest is still found, which will be the case for our - // implementation. Regardless, ignore 408 errors, the caller will - // go back to wait for the next message. - err = nil + // implementation. Regardless, ignore 408 errors until receiving at least + // one message. + err = ErrTimeout default: err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr)) } @@ -2227,6 +2252,10 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { // wait this time. noWait = false err = sendReq() + } else if err == ErrTimeout && len(msgs) == 0 { + // If we get a 408, we will bail if we already collected some + // messages, otherwise ignore and go back calling nextMsg. + err = nil } } } diff --git a/vendor/github.com/nats-io/nats.go/ws.go b/vendor/github.com/nats-io/nats.go/ws.go index 4231f102..a35ce084 100644 --- a/vendor/github.com/nats-io/nats.go/ws.go +++ b/vendor/github.com/nats-io/nats.go/ws.go @@ -53,6 +53,7 @@ const ( wsContinuationFrame = 0 wsMaxFrameHeaderSize = 14 wsMaxControlPayloadSize = 125 + wsCloseSatusSize = 2 // From https://tools.ietf.org/html/rfc6455#section-11.7 wsCloseStatusNormalClosure = 1000 @@ -372,7 +373,6 @@ func (r *websocketReader) handleControlFrame(frameType wsOpCode, buf []byte, pos var payload []byte var err error - statusPos := pos if rem > 0 { payload, pos, err = wsGet(r.r, buf, pos, rem) if err != nil { @@ -382,17 +382,24 @@ func (r *websocketReader) handleControlFrame(frameType wsOpCode, buf []byte, pos switch frameType { case wsCloseMessage: status := wsCloseStatusNoStatusReceived - body := "" - // If there is a payload, it should contain 2 unsigned bytes - // that represent the status code and then optional payload. - if len(payload) >= 2 { - status = int(binary.BigEndian.Uint16(buf[statusPos : statusPos+2])) - body = string(buf[statusPos+2 : statusPos+len(payload)]) - if body != "" && !utf8.ValidString(body) { - // https://tools.ietf.org/html/rfc6455#section-5.5.1 - // If body is present, it must be a valid utf8 - status = wsCloseStatusInvalidPayloadData - body = "invalid utf8 body in close frame" + var body string + lp := len(payload) + // If there is a payload, the status is represented as a 2-byte + // unsigned integer (in network byte order). Then, there may be an + // optional body. + hasStatus, hasBody := lp >= wsCloseSatusSize, lp > wsCloseSatusSize + if hasStatus { + // Decode the status + status = int(binary.BigEndian.Uint16(payload[:wsCloseSatusSize])) + // Now if there is a body, capture it and make sure this is a valid UTF-8. + if hasBody { + body = string(payload[wsCloseSatusSize:]) + if !utf8.ValidString(body) { + // https://tools.ietf.org/html/rfc6455#section-5.5.1 + // If body is present, it must be a valid utf8 + status = wsCloseStatusInvalidPayloadData + body = "invalid utf8 body in close frame" + } } } r.nc.wsEnqueueCloseMsg(status, body) diff --git a/vendor/modules.txt b/vendor/modules.txt index d2d059eb..23a2df8c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -9,7 +9,7 @@ github.com/minio/highwayhash # github.com/nats-io/jwt/v2 v2.0.3 ## explicit github.com/nats-io/jwt/v2 -# github.com/nats-io/nats.go v1.12.1 +# github.com/nats-io/nats.go v1.12.2-0.20210916222008-92921544b891 ## explicit github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin