Update nats.go version

Signed-off-by: Waldemar Quevedo <wally@synadia.com>
This commit is contained in:
Waldemar Quevedo
2021-09-16 15:50:39 -07:00
parent 670a81e213
commit 0bef39ab5b
5 changed files with 75 additions and 39 deletions

View File

@@ -125,24 +125,51 @@ type JetStream interface {
PublishAsyncComplete() <-chan struct{}
// Subscribe creates an async Subscription for JetStream.
// The stream and consumer names can be provided with the nats.Bind() option.
// For creating an ephemeral (where the consumer name is picked by the server),
// you can provide the stream name with nats.BindStream().
// If no stream name is specified, the library will attempt to figure out which
// stream the subscription is for. See important notes below for more details.
//
// IMPORTANT NOTES:
// * If none of the options Bind() nor Durable() are specified, the library will
// send a request to the server to create an ephemeral JetStream consumer,
// which will be deleted after an Unsubscribe() or Drain(), or automatically
// by the server after a short period of time after the NATS subscription is
// gone.
// * If Durable() option is specified, the library will attempt to lookup a JetStream
// consumer with this name, and if found, will bind to it and not attempt to
// delete it. However, if not found, the library will send a request to create
// such durable JetStream consumer. The library will delete the JetStream consumer
// after an Unsubscribe() or Drain().
// * If Bind() option is provided, the library will attempt to lookup the
// consumer with the given name, and if successful, bind to it. If the lookup fails,
// then the Subscribe() call will return an error.
Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
// SubscribeSync creates a Subscription that can be used to process messages synchronously.
// See important note in Subscribe()
SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)
// ChanSubscribe creates channel based Subscription.
// See important note in Subscribe()
ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
// ChanQueueSubscribe creates channel based Subscription with a queue group.
// See important note in QueueSubscribe()
ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
// QueueSubscribe creates a Subscription with a queue group.
// If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
// See important note in Subscribe()
QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
// See important note in QueueSubscribe()
QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)
// PullSubscribe creates a Subscription that can fetch messages.
// See important note in Subscribe()
PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
}
@@ -900,8 +927,7 @@ func (opt subOptFn) configureSubscribe(opts *subOpts) error {
return opt(opts)
}
// Subscribe will create a subscription to the appropriate stream and consumer.
//
// Subscribe creates an async Subscription for JetStream.
// The stream and consumer names can be provided with the nats.Bind() option.
// For creating an ephemeral (where the consumer name is picked by the server),
// you can provide the stream name with nats.BindStream().
@@ -909,19 +935,19 @@ func (opt subOptFn) configureSubscribe(opts *subOpts) error {
// stream the subscription is for. See important notes below for more details.
//
// IMPORTANT NOTES:
// * If Bind() and Durable() options are not specified, the library will
// * If none of the options Bind() nor Durable() are specified, the library will
// send a request to the server to create an ephemeral JetStream consumer,
// which will be deleted after an Unsubscribe() or Drain(), or automatically
// by the server after a short period of time after the NATS subscription is
// gone.
// * If Durable() only is specified, the library will attempt to lookup a JetStream
// consumer with this name and if found, will bind to it and not attempt to
// * If Durable() option is specified, the library will attempt to lookup a JetStream
// consumer with this name, and if found, will bind to it and not attempt to
// delete it. However, if not found, the library will send a request to create
// such durable JetStream consumer, but will still attempt to delete it after
// an Unsubscribe() or Drain().
// such durable JetStream consumer. The library will delete the JetStream consumer
// after an Unsubscribe() or Drain().
// * If Bind() option is provided, the library will attempt to lookup the
// consumer with the given name, and if the lookup fails, then the Subscribe()
// call will return an error.
// consumer with the given name, and if successful, bind to it. If the lookup fails,
// then the Subscribe() call will return an error.
func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
if cb == nil {
return nil, ErrBadSubscription
@@ -929,15 +955,15 @@ func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscripti
return js.subscribe(subj, _EMPTY_, cb, nil, false, false, opts)
}
// SubscribeSync will create a sync subscription to the appropriate stream and consumer.
// SubscribeSync creates a Subscription that can be used to process messages synchronously.
// See important note in Subscribe()
func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) {
mch := make(chan *Msg, js.nc.Opts.SubChanLen)
return js.subscribe(subj, _EMPTY_, nil, mch, true, false, opts)
}
// QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics.
// If not optional durable name or binding option is specified, the queue name will be used as a durable name.
// QueueSubscribe creates a Subscription with a queue group.
// If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
// See important note in Subscribe()
func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
if cb == nil {
@@ -946,28 +972,27 @@ func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt)
return js.subscribe(subj, queue, cb, nil, false, false, opts)
}
// QueueSubscribeSync will create a sync subscription to the appropriate stream and consumer with queue semantics.
// If not optional durable name or binding option is specified, the queue name will be used as a durable name.
// See important note in Subscribe()
// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
// See important note in QueueSubscribe()
func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) {
mch := make(chan *Msg, js.nc.Opts.SubChanLen)
return js.subscribe(subj, queue, nil, mch, true, false, opts)
}
// ChanSubscribe will create a subscription to the appropriate stream and consumer using a channel.
// ChanSubscribe creates channel based Subscription.
// See important note in Subscribe()
func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
return js.subscribe(subj, _EMPTY_, nil, ch, false, false, opts)
}
// ChanQueueSubscribe will create a subscription to the appropriate stream and consumer using a channel.
// If not optional durable name or binding option is specified, the queue name will be used as a durable name.
// See important note in Subscribe()
// ChanQueueSubscribe creates channel based Subscription with a queue group.
// See important note in QueueSubscribe()
func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
return js.subscribe(subj, queue, nil, ch, false, false, opts)
}
// PullSubscribe creates a pull subscriber.
// PullSubscribe creates a Subscription that can fetch messages.
// See important note in Subscribe()
func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) {
mch := make(chan *Msg, js.nc.Opts.SubChanLen)
return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable)))
@@ -2080,9 +2105,9 @@ func checkMsg(msg *Msg, checkSts bool) (usrMsg bool, err error) {
case reqTimeoutSts:
// Older servers may send a 408 when a request in the server was expired
// and interest is still found, which will be the case for our
// implementation. Regardless, ignore 408 errors, the caller will
// go back to wait for the next message.
err = nil
// implementation. Regardless, ignore 408 errors until receiving at least
// one message.
err = ErrTimeout
default:
err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr))
}
@@ -2227,6 +2252,10 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// wait this time.
noWait = false
err = sendReq()
} else if err == ErrTimeout && len(msgs) == 0 {
// If we get a 408, we will bail if we already collected some
// messages, otherwise ignore and go back calling nextMsg.
err = nil
}
}
}

View File

@@ -53,6 +53,7 @@ const (
wsContinuationFrame = 0
wsMaxFrameHeaderSize = 14
wsMaxControlPayloadSize = 125
wsCloseSatusSize = 2
// From https://tools.ietf.org/html/rfc6455#section-11.7
wsCloseStatusNormalClosure = 1000
@@ -372,7 +373,6 @@ func (r *websocketReader) handleControlFrame(frameType wsOpCode, buf []byte, pos
var payload []byte
var err error
statusPos := pos
if rem > 0 {
payload, pos, err = wsGet(r.r, buf, pos, rem)
if err != nil {
@@ -382,17 +382,24 @@ func (r *websocketReader) handleControlFrame(frameType wsOpCode, buf []byte, pos
switch frameType {
case wsCloseMessage:
status := wsCloseStatusNoStatusReceived
body := ""
// If there is a payload, it should contain 2 unsigned bytes
// that represent the status code and then optional payload.
if len(payload) >= 2 {
status = int(binary.BigEndian.Uint16(buf[statusPos : statusPos+2]))
body = string(buf[statusPos+2 : statusPos+len(payload)])
if body != "" && !utf8.ValidString(body) {
// https://tools.ietf.org/html/rfc6455#section-5.5.1
// If body is present, it must be a valid utf8
status = wsCloseStatusInvalidPayloadData
body = "invalid utf8 body in close frame"
var body string
lp := len(payload)
// If there is a payload, the status is represented as a 2-byte
// unsigned integer (in network byte order). Then, there may be an
// optional body.
hasStatus, hasBody := lp >= wsCloseSatusSize, lp > wsCloseSatusSize
if hasStatus {
// Decode the status
status = int(binary.BigEndian.Uint16(payload[:wsCloseSatusSize]))
// Now if there is a body, capture it and make sure this is a valid UTF-8.
if hasBody {
body = string(payload[wsCloseSatusSize:])
if !utf8.ValidString(body) {
// https://tools.ietf.org/html/rfc6455#section-5.5.1
// If body is present, it must be a valid utf8
status = wsCloseStatusInvalidPayloadData
body = "invalid utf8 body in close frame"
}
}
}
r.nc.wsEnqueueCloseMsg(status, body)