From 42818f06c1b16168e6951bef0f4a62e88b56708b Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 10 Jan 2022 16:58:52 -0800 Subject: [PATCH] Update client Signed-off-by: Derek Collison --- go.mod | 4 +-- go.sum | 4 +++ vendor/github.com/nats-io/nats.go/js.go | 31 ++++++++++++++++++++--- vendor/github.com/nats-io/nats.go/kv.go | 3 +++ vendor/github.com/nats-io/nats.go/nats.go | 16 +++++++++++- vendor/modules.txt | 4 +-- 6 files changed, 54 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 6619a037..e25284cf 100644 --- a/go.mod +++ b/go.mod @@ -7,10 +7,10 @@ require ( github.com/klauspost/compress v1.13.4 github.com/minio/highwayhash v1.0.1 github.com/nats-io/jwt/v2 v2.2.0 - github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc + github.com/nats-io/nats.go v1.13.1-0.20220106142636-ebcdff697d3f github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 - golang.org/x/crypto v0.0.0-20211202192323-5770296d904e + golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 ) diff --git a/go.sum b/go.sum index 5136dc1b..1965e7e8 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/nats-io/jwt/v2 v2.2.0 h1:Yg/4WFK6vsqMudRg91eBb7Dh6XeVcDMPHycDE8CfltE= github.com/nats-io/jwt/v2 v2.2.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc h1:SHr4MUUZJ/fAC0uSm2OzWOJYsHpapmR86mpw7q1qPXU= github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.13.1-0.20220106142636-ebcdff697d3f h1:WtlVcCLq7NOOskKidJ1rRZoXwOTs6RIJSl0R/gIPBMg= +github.com/nats-io/nats.go v1.13.1-0.20220106142636-ebcdff697d3f/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -25,6 +27,8 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20211202192323-5770296d904e h1:MUP6MR3rJ7Gk9LEia0LP2ytiH6MuCfs7qYz+47jGdD8= golang.org/x/crypto v0.0.0-20211202192323-5770296d904e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 h1:0es+/5331RGQPcXlMfP+WrnIIS6dNnNRe0WB02W0F4M= +golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index 38d7be5a..88203e89 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -859,6 +859,11 @@ func (ctx ContextOpt) configurePublish(opts *pubOpts) error { return nil } +func (ctx ContextOpt) configureSubscribe(opts *subOpts) error { + opts.ctx = ctx + return nil +} + func (ctx ContextOpt) configurePull(opts *pullOpts) error { opts.ctx = ctx return nil @@ -965,6 +970,9 @@ type jsSub struct { fcd uint64 fciseq uint64 csfct *time.Timer + + // Cancellation function to cancel context on drain/unsubscribe. + cancel func() } // Deletes the JS Consumer. @@ -1243,6 +1251,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, consumer = o.consumer isDurable = o.cfg.Durable != _EMPTY_ consumerBound = o.bound + ctx = o.ctx notFoundErr bool lookupErr bool nc = js.nc @@ -1389,6 +1398,13 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, deliver = nc.newInbox() } + // In case this has a context, then create a child context that + // is possible to cancel via unsubscribe / drain. + var cancel func() + if ctx != nil { + ctx, cancel = context.WithCancel(ctx) + } + jsi := &jsSub{ js: js, stream: stream, @@ -1401,6 +1417,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, pull: isPullMode, nms: nms, psubj: subj, + cancel: cancel, } // Check if we are manual ack. @@ -1539,6 +1556,14 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, sub.chanSubcheckForFlowControlResponse() } + // Wait for context to get canceled if there is one. + if ctx != nil { + go func() { + <-ctx.Done() + sub.Unsubscribe() + }() + } + return sub, nil } @@ -1813,7 +1838,7 @@ func (sub *Subscription) scheduleFlowControlResponse(reply string) { func (sub *Subscription) activityCheck() { sub.mu.Lock() jsi := sub.jsi - if jsi == nil { + if jsi == nil || sub.closed { sub.mu.Unlock() return } @@ -1822,10 +1847,9 @@ func (sub *Subscription) activityCheck() { jsi.hbc.Reset(jsi.hbi) jsi.active = false nc := sub.conn - closed := sub.closed sub.mu.Unlock() - if !active && !closed { + if !active { nc.mu.Lock() if errCB := nc.Opts.AsyncErrorCB; errCB != nil { nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) }) @@ -1953,6 +1977,7 @@ type subOpts struct { mack bool // For an ordered consumer. ordered bool + ctx context.Context } // OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages. diff --git a/vendor/github.com/nats-io/nats.go/kv.go b/vendor/github.com/nats-io/nats.go/kv.go index deaefde2..a727249f 100644 --- a/vendor/github.com/nats-io/nats.go/kv.go +++ b/vendor/github.com/nats-io/nats.go/kv.go @@ -689,6 +689,9 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { if o.metaOnly { subOpts = append(subOpts, HeadersOnly()) } + if o.ctx != nil { + subOpts = append(subOpts, Context(o.ctx)) + } sub, err := kv.js.Subscribe(keys, update, subOpts...) if err != nil { return nil, err diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index 72759717..8b963c57 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -1191,7 +1191,7 @@ func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler) { nc.Opts.DiscoveredServersCB = dscb } -// SetClosedHandler will set the reconnect event handler. +// SetClosedHandler will set the closed event handler. func (nc *Conn) SetClosedHandler(cb ConnHandler) { if nc == nil { return @@ -4144,6 +4144,20 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error { nc.bw.appendString(fmt.Sprintf(unsubProto, s.sid, maxStr)) nc.kickFlusher() } + + // For JetStream subscriptions cancel the attached context if there is any. + var cancel func() + sub.mu.Lock() + jsi := sub.jsi + if jsi != nil { + cancel = jsi.cancel + jsi.cancel = nil + } + sub.mu.Unlock() + if cancel != nil { + cancel() + } + return nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index c78f5afe..d591106f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -9,7 +9,7 @@ github.com/minio/highwayhash # github.com/nats-io/jwt/v2 v2.2.0 ## explicit github.com/nats-io/jwt/v2 -# github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc +# github.com/nats-io/nats.go v1.13.1-0.20220106142636-ebcdff697d3f ## explicit github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin @@ -20,7 +20,7 @@ github.com/nats-io/nkeys # github.com/nats-io/nuid v1.0.1 ## explicit github.com/nats-io/nuid -# golang.org/x/crypto v0.0.0-20211202192323-5770296d904e +# golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 ## explicit golang.org/x/crypto/bcrypt golang.org/x/crypto/blowfish