Update client

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-01-10 16:58:52 -08:00
parent 08ff14a24e
commit 42818f06c1
6 changed files with 54 additions and 8 deletions

4
go.mod
View File

@@ -7,10 +7,10 @@ require (
github.com/klauspost/compress v1.13.4
github.com/minio/highwayhash v1.0.1
github.com/nats-io/jwt/v2 v2.2.0
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc
github.com/nats-io/nats.go v1.13.1-0.20220106142636-ebcdff697d3f
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
golang.org/x/crypto v0.0.0-20211202192323-5770296d904e
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1
)

4
go.sum
View File

@@ -18,6 +18,8 @@ github.com/nats-io/jwt/v2 v2.2.0 h1:Yg/4WFK6vsqMudRg91eBb7Dh6XeVcDMPHycDE8CfltE=
github.com/nats-io/jwt/v2 v2.2.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc h1:SHr4MUUZJ/fAC0uSm2OzWOJYsHpapmR86mpw7q1qPXU=
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.13.1-0.20220106142636-ebcdff697d3f h1:WtlVcCLq7NOOskKidJ1rRZoXwOTs6RIJSl0R/gIPBMg=
github.com/nats-io/nats.go v1.13.1-0.20220106142636-ebcdff697d3f/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
@@ -25,6 +27,8 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20211202192323-5770296d904e h1:MUP6MR3rJ7Gk9LEia0LP2ytiH6MuCfs7qYz+47jGdD8=
golang.org/x/crypto v0.0.0-20211202192323-5770296d904e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 h1:0es+/5331RGQPcXlMfP+WrnIIS6dNnNRe0WB02W0F4M=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View File

@@ -859,6 +859,11 @@ func (ctx ContextOpt) configurePublish(opts *pubOpts) error {
return nil
}
func (ctx ContextOpt) configureSubscribe(opts *subOpts) error {
opts.ctx = ctx
return nil
}
func (ctx ContextOpt) configurePull(opts *pullOpts) error {
opts.ctx = ctx
return nil
@@ -965,6 +970,9 @@ type jsSub struct {
fcd uint64
fciseq uint64
csfct *time.Timer
// Cancellation function to cancel context on drain/unsubscribe.
cancel func()
}
// Deletes the JS Consumer.
@@ -1243,6 +1251,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
consumer = o.consumer
isDurable = o.cfg.Durable != _EMPTY_
consumerBound = o.bound
ctx = o.ctx
notFoundErr bool
lookupErr bool
nc = js.nc
@@ -1389,6 +1398,13 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
deliver = nc.newInbox()
}
// In case this has a context, then create a child context that
// is possible to cancel via unsubscribe / drain.
var cancel func()
if ctx != nil {
ctx, cancel = context.WithCancel(ctx)
}
jsi := &jsSub{
js: js,
stream: stream,
@@ -1401,6 +1417,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
pull: isPullMode,
nms: nms,
psubj: subj,
cancel: cancel,
}
// Check if we are manual ack.
@@ -1539,6 +1556,14 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
sub.chanSubcheckForFlowControlResponse()
}
// Wait for context to get canceled if there is one.
if ctx != nil {
go func() {
<-ctx.Done()
sub.Unsubscribe()
}()
}
return sub, nil
}
@@ -1813,7 +1838,7 @@ func (sub *Subscription) scheduleFlowControlResponse(reply string) {
func (sub *Subscription) activityCheck() {
sub.mu.Lock()
jsi := sub.jsi
if jsi == nil {
if jsi == nil || sub.closed {
sub.mu.Unlock()
return
}
@@ -1822,10 +1847,9 @@ func (sub *Subscription) activityCheck() {
jsi.hbc.Reset(jsi.hbi)
jsi.active = false
nc := sub.conn
closed := sub.closed
sub.mu.Unlock()
if !active && !closed {
if !active {
nc.mu.Lock()
if errCB := nc.Opts.AsyncErrorCB; errCB != nil {
nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
@@ -1953,6 +1977,7 @@ type subOpts struct {
mack bool
// For an ordered consumer.
ordered bool
ctx context.Context
}
// OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages.

View File

@@ -689,6 +689,9 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
if o.metaOnly {
subOpts = append(subOpts, HeadersOnly())
}
if o.ctx != nil {
subOpts = append(subOpts, Context(o.ctx))
}
sub, err := kv.js.Subscribe(keys, update, subOpts...)
if err != nil {
return nil, err

View File

@@ -1191,7 +1191,7 @@ func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler) {
nc.Opts.DiscoveredServersCB = dscb
}
// SetClosedHandler will set the reconnect event handler.
// SetClosedHandler will set the closed event handler.
func (nc *Conn) SetClosedHandler(cb ConnHandler) {
if nc == nil {
return
@@ -4144,6 +4144,20 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
nc.bw.appendString(fmt.Sprintf(unsubProto, s.sid, maxStr))
nc.kickFlusher()
}
// For JetStream subscriptions cancel the attached context if there is any.
var cancel func()
sub.mu.Lock()
jsi := sub.jsi
if jsi != nil {
cancel = jsi.cancel
jsi.cancel = nil
}
sub.mu.Unlock()
if cancel != nil {
cancel()
}
return nil
}

4
vendor/modules.txt vendored
View File

@@ -9,7 +9,7 @@ github.com/minio/highwayhash
# github.com/nats-io/jwt/v2 v2.2.0
## explicit
github.com/nats-io/jwt/v2
# github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc
# github.com/nats-io/nats.go v1.13.1-0.20220106142636-ebcdff697d3f
## explicit
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin
@@ -20,7 +20,7 @@ github.com/nats-io/nkeys
# github.com/nats-io/nuid v1.0.1
## explicit
github.com/nats-io/nuid
# golang.org/x/crypto v0.0.0-20211202192323-5770296d904e
# golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3
## explicit
golang.org/x/crypto/bcrypt
golang.org/x/crypto/blowfish