diff --git a/go.mod b/go.mod index e81923e5..9416450b 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/klauspost/compress v1.11.12 github.com/minio/highwayhash v1.0.1 github.com/nats-io/jwt/v2 v2.0.1 - github.com/nats-io/nats.go v1.10.1-0.20210330002604-882e98e18c99 + github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b diff --git a/go.sum b/go.sum index fafc7029..59b8527e 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,7 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk= @@ -31,7 +32,7 @@ github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1 github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY= github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4/go.mod h1:kauGd7hB5517KeSqspW2U1Mz/jhPbTrE8eOXzUPk1m0= github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8/go.mod h1:/QQ/dpqFavkNhVnjvMILSQ3cj5hlmhB66adlgNbjuoA= -github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc= +github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc= github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I= github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4= @@ -39,8 +40,8 @@ github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0/go.mod h1:VU2zE github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI= github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnCKg9luW1g7hgzPxUjHFRI40EuTSX7RCzgnc74Jk= github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac/go.mod h1:hxFvLNbNmT6UppX5B5Tr/r3g+XSwGjJzFn6mxPNJEHc= -github.com/nats-io/nats.go v1.10.1-0.20210330002604-882e98e18c99 h1:KHz7ujBiN9Zg9lqK5IvxW6ZwhW1v/PIRHCCVHkn0XZ0= -github.com/nats-io/nats.go v1.10.1-0.20210330002604-882e98e18c99/go.mod h1:OieyGzlIObT5YMgJfjuZS4tXG7fUUdRH+hDqioUKbDw= +github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8 h1:z/0dTBxMgMfWOtmpyHrbIDKx2duzrxkUeQYJMUnRPj4= +github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8/go.mod h1:Zq9IEHy7zurF0kFbU5aLIknnFI7guh8ijHk+2v+Vf5g= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= @@ -69,6 +70,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/server/jwt_test.go b/server/jwt_test.go index 02c7e1b0..5b133eaa 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -4405,7 +4405,8 @@ func TestJWTUserRevocation(t *testing.T) { defer ncSys.Close() ncChan := make(chan *nats.Msg, 10) defer close(ncChan) - ncSys.ChanSubscribe(fmt.Sprintf(disconnectEventSubj, apub), ncChan) // observe disconnect message + sub, _ := ncSys.ChanSubscribe(fmt.Sprintf(disconnectEventSubj, apub), ncChan) // observe disconnect message + defer sub.Unsubscribe() // use credentials that will be revoked ans assure that the connection will be disconnected nc := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aCreds1), nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) { @@ -4516,8 +4517,8 @@ func TestJWTAccountOps(t *testing.T) { }) // connect so there is a reason to cache the request and so disconnect can be observed ncA := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aCreds1), nats.NoReconnect(), - nats.DisconnectErrHandler(func(conn *nats.Conn, err error) { - if lErr := conn.LastError(); strings.Contains(lErr.Error(), "Account Authentication Expired") { + nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) { + if err != nil && strings.Contains(err.Error(), "account authentication expired") { disconnectErrChan <- struct{}{} } })) diff --git a/vendor/github.com/nats-io/nats.go/go.mod b/vendor/github.com/nats-io/nats.go/go.mod index 91b5659d..21f393d5 100644 --- a/vendor/github.com/nats-io/nats.go/go.mod +++ b/vendor/github.com/nats-io/nats.go/go.mod @@ -4,7 +4,7 @@ go 1.15 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0 + github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/vendor/github.com/nats-io/nats.go/go.sum b/vendor/github.com/nats-io/nats.go/go.sum index 109fb481..49ca66f9 100644 --- a/vendor/github.com/nats-io/nats.go/go.sum +++ b/vendor/github.com/nats-io/nats.go/go.sum @@ -32,8 +32,8 @@ github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1 github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY= github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4/go.mod h1:kauGd7hB5517KeSqspW2U1Mz/jhPbTrE8eOXzUPk1m0= github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8/go.mod h1:/QQ/dpqFavkNhVnjvMILSQ3cj5hlmhB66adlgNbjuoA= -github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0 h1:ybeT5VFA73CVQb4rCL+48+up91xWheriSBbJ3M2Pzps= -github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc= +github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d h1:Fi5DT3pdyqP280FPGdkQD+bDjfpR5orUhZ2hhVEU/JA= +github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc= github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I= github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4= diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index 58545765..ae2f204f 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -677,6 +677,15 @@ func ExpectLastMsgId(id string) PubOpt { }) } +type ackOpts struct { + ttl time.Duration + ctx context.Context +} + +type AckOpt interface { + configureAck(opts *ackOpts) error +} + // MaxWait sets the maximum amount of time we will wait for a response. type MaxWait time.Duration @@ -703,6 +712,11 @@ func (ttl AckWait) configureSubscribe(opts *subOpts) error { return nil } +func (ttl AckWait) configureAck(opts *ackOpts) error { + opts.ttl = time.Duration(ttl) + return nil +} + // ContextOpt is an option used to set a context.Context. type ContextOpt struct { context.Context @@ -723,6 +737,11 @@ func (ctx ContextOpt) configurePull(opts *pullOpts) error { return nil } +func (ctx ContextOpt) configureAck(opts *ackOpts) error { + opts.ctx = ctx + return nil +} + // Context returns an option that can be used to configure a context. func Context(ctx context.Context) ContextOpt { return ContextOpt{ctx} @@ -814,13 +833,13 @@ func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscripti if cb == nil { return nil, ErrBadSubscription } - return js.subscribe(subj, _EMPTY_, cb, nil, opts) + return js.subscribe(subj, _EMPTY_, cb, nil, false, opts) } // SubscribeSync will create a sync subscription to the appropriate stream and consumer. func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) - return js.subscribe(subj, _EMPTY_, nil, mch, opts) + return js.subscribe(subj, _EMPTY_, nil, mch, true, opts) } // QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics. @@ -828,26 +847,26 @@ func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) if cb == nil { return nil, ErrBadSubscription } - return js.subscribe(subj, queue, cb, nil, opts) + return js.subscribe(subj, queue, cb, nil, false, opts) } // QueueSubscribeSync will create a sync subscription to the appropriate stream and consumer with queue semantics. func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) - return js.subscribe(subj, queue, nil, mch, opts) + return js.subscribe(subj, queue, nil, mch, true, opts) } // Subscribe will create a subscription to the appropriate stream and consumer. func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { - return js.subscribe(subj, _EMPTY_, nil, ch, opts) + return js.subscribe(subj, _EMPTY_, nil, ch, false, opts) } // PullSubscribe creates a pull subscriber. func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) { - return js.subscribe(subj, _EMPTY_, nil, nil, append(opts, Durable(durable))) + return js.subscribe(subj, _EMPTY_, nil, nil, false, append(opts, Durable(durable))) } -func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []SubOpt) (*Subscription, error) { +func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, opts []SubOpt) (*Subscription, error) { cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet} o := subOpts{cfg: &cfg} if len(opts) > 0 { @@ -932,7 +951,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] if isPullMode { sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: true}} } else { - sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js}) + sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, &jsSub{js: js}) if err != nil { return nil, err } @@ -1549,13 +1568,14 @@ func (m *Msg) checkReply() (*js, bool, error) { // ackReply handles all acks. Will do the right thing for pull and sync mode. // It ensures that an ack is only sent a single time, regardless of // how many times it is being called to avoid duplicated acks. -func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error { - var o pubOpts +func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error { + var o ackOpts for _, opt := range opts { - if err := opt.configurePublish(&o); err != nil { + if err := opt.configureAck(&o); err != nil { return err } } + js, _, err := m.checkReply() if err != nil { return err @@ -1570,16 +1590,19 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error { nc := m.Sub.conn m.Sub.mu.Unlock() + usesCtx := o.ctx != nil + usesWait := o.ttl > 0 + sync = sync || usesCtx || usesWait ctx := o.ctx wait := defaultRequestWait - if o.ttl > 0 { + if usesWait { wait = o.ttl } else if js != nil { wait = js.opts.wait } if sync { - if ctx != nil { + if usesCtx { _, err = nc.RequestWithContext(ctx, m.Reply, ackType) } else { _, err = nc.Request(m.Reply, ackType, wait) @@ -1599,33 +1622,33 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error { // Ack acknowledges a message. This tells the server that the message was // successfully processed and it can move on to the next message. -func (m *Msg) Ack() error { - return m.ackReply(ackAck, false) +func (m *Msg) Ack(opts ...AckOpt) error { + return m.ackReply(ackAck, false, opts...) } // Ack is the synchronous version of Ack. This indicates successful message // processing. -func (m *Msg) AckSync(opts ...PubOpt) error { +func (m *Msg) AckSync(opts ...AckOpt) error { return m.ackReply(ackAck, true, opts...) } // Nak negatively acknowledges a message. This tells the server to redeliver // the message. You can configure the number of redeliveries by passing // nats.MaxDeliver when you Subscribe. The default is infinite redeliveries. -func (m *Msg) Nak() error { - return m.ackReply(ackNak, false) +func (m *Msg) Nak(opts ...AckOpt) error { + return m.ackReply(ackNak, false, opts...) } // Term tells the server to not redeliver this message, regardless of the value // of nats.MaxDeliver. -func (m *Msg) Term() error { - return m.ackReply(ackTerm, false) +func (m *Msg) Term(opts ...AckOpt) error { + return m.ackReply(ackTerm, false, opts...) } // InProgress tells the server that this message is being worked on. It resets // the redelivery timer on the server. -func (m *Msg) InProgress() error { - return m.ackReply(ackProgress, false) +func (m *Msg) InProgress(opts ...AckOpt) error { + return m.ackReply(ackProgress, false, opts...) } // MsgMetadata is the JetStream metadata associated with received messages. diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index 57029abd..eabd9adf 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -78,6 +78,9 @@ const ( // AUTHENTICATION_REVOKED_ERR is for when user authorization has been revoked. AUTHENTICATION_REVOKED_ERR = "user authentication revoked" + + // ACCOUNT_AUTHENTICATION_EXPIRED_ERR is for when nats server account authorization has expired. + ACCOUNT_AUTHENTICATION_EXPIRED_ERR = "account authentication expired" ) // Errors @@ -98,6 +101,7 @@ var ( ErrAuthorization = errors.New("nats: authorization violation") ErrAuthExpired = errors.New("nats: authentication expired") ErrAuthRevoked = errors.New("nats: authentication revoked") + ErrAccountAuthExpired = errors.New("nats: account authentication expired") ErrNoServers = errors.New("nats: no servers available for connection") ErrJsonParse = errors.New("nats: connect message, json parse error") ErrChanArg = errors.New("nats: argument needs to be a channel type") @@ -2766,6 +2770,9 @@ func checkAuthError(e string) error { if strings.HasPrefix(e, AUTHENTICATION_REVOKED_ERR) { return ErrAuthRevoked } + if strings.HasPrefix(e, ACCOUNT_AUTHENTICATION_EXPIRED_ERR) { + return ErrAccountAuthExpired + } return nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index e51131d4..26c4e76a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -7,7 +7,7 @@ github.com/minio/highwayhash # github.com/nats-io/jwt/v2 v2.0.1 ## explicit github.com/nats-io/jwt/v2 -# github.com/nats-io/nats.go v1.10.1-0.20210330002604-882e98e18c99 +# github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8 ## explicit github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin