diff --git a/go.mod b/go.mod index e25284cf..f6d29dd9 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,13 @@ module github.com/nats-io/nats-server/v2 -go 1.16 +go 1.17 require ( github.com/golang/protobuf v1.4.2 // indirect github.com/klauspost/compress v1.13.4 github.com/minio/highwayhash v1.0.1 - github.com/nats-io/jwt/v2 v2.2.0 - github.com/nats-io/nats.go v1.13.1-0.20220106142636-ebcdff697d3f + github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 + github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 diff --git a/go.sum b/go.sum index 1965e7e8..58c1ab13 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,8 @@ github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEE github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt/v2 v2.2.0 h1:Yg/4WFK6vsqMudRg91eBb7Dh6XeVcDMPHycDE8CfltE= -github.com/nats-io/jwt/v2 v2.2.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= +github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc h1:SHr4MUUZJ/fAC0uSm2OzWOJYsHpapmR86mpw7q1qPXU= github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.13.1-0.20220106142636-ebcdff697d3f h1:WtlVcCLq7NOOskKidJ1rRZoXwOTs6RIJSl0R/gIPBMg= diff --git a/server/accounts.go b/server/accounts.go index e4778ae9..d2102589 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -3214,10 +3214,11 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim if ac.Limits.JetStreamLimits.DiskStorage != 0 || ac.Limits.JetStreamLimits.MemoryStorage != 0 { // JetStreamAccountLimits and jwt.JetStreamLimits use same value for unlimited a.jsLimits = &JetStreamAccountLimits{ - MaxMemory: ac.Limits.JetStreamLimits.MemoryStorage, - MaxStore: ac.Limits.JetStreamLimits.DiskStorage, - MaxStreams: int(ac.Limits.JetStreamLimits.Streams), - MaxConsumers: int(ac.Limits.JetStreamLimits.Consumer), + MaxMemory: ac.Limits.JetStreamLimits.MemoryStorage, + MaxStore: ac.Limits.JetStreamLimits.DiskStorage, + MaxStreams: int(ac.Limits.JetStreamLimits.Streams), + MaxConsumers: int(ac.Limits.JetStreamLimits.Consumer), + MaxBytesRequired: ac.Limits.JetStreamLimits.MaxBytesRequired, } } else if a.jsLimits != nil { // covers failed update followed by disable diff --git a/server/filestore.go b/server/filestore.go index bfbc468c..94b17839 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -3171,7 +3171,9 @@ checkCache: mb.mu.Unlock() var ld *LostStreamData if ld, err = mb.rebuildState(); ld != nil { - fs.rebuildState(ld) + // We do not know if fs is locked or not at this point. + // This should be an exceptional condition so do so in Go routine. + go fs.rebuildState(ld) } mb.mu.Lock() } diff --git a/server/jwt_test.go b/server/jwt_test.go index 9c0f395a..a0a42386 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -4259,7 +4259,7 @@ func TestJWTJetStreamLimits(t *testing.T) { sysKp.Seed() sysCreds := genCredsFile(t, sysUserJwt, sysUSeed) // limits to apply and check - limits1 := jwt.JetStreamLimits{MemoryStorage: 1024 * 1024, DiskStorage: 2048 * 1024, Streams: 1, Consumer: 2} + limits1 := jwt.JetStreamLimits{MemoryStorage: 1024 * 1024, DiskStorage: 2048 * 1024, Streams: 1, Consumer: 2, MaxBytesRequired: true} // has valid limits that would fail when incorrectly applied twice limits2 := jwt.JetStreamLimits{MemoryStorage: 4096 * 1024, DiskStorage: 8192 * 1024, Streams: 3, Consumer: 4} // limits exceeding actual configured value of DiskStorage diff --git a/server/opts.go b/server/opts.go index 2881545a..dc170786 100644 --- a/server/opts.go +++ b/server/opts.go @@ -249,6 +249,7 @@ type Options struct { TLSCaCert string `json:"-"` TLSConfig *tls.Config `json:"-"` TLSPinnedCerts PinnedCertSet `json:"-"` + TLSRateLimit int64 `json:"-"` AllowNonTLS bool `json:"-"` WriteDeadline time.Duration `json:"-"` MaxClosedClients int `json:"-"` @@ -513,6 +514,7 @@ type TLSConfigOpts struct { Map bool TLSCheckKnownURLs bool Timeout float64 + RateLimit int64 Ciphers []uint16 CurvePreferences []tls.CurveID PinnedCerts PinnedCertSet @@ -902,6 +904,7 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error o.TLSTimeout = tc.Timeout o.TLSMap = tc.Map o.TLSPinnedCerts = tc.PinnedCerts + o.TLSRateLimit = tc.RateLimit // Need to keep track of path of the original TLS config // and certs path for OCSP Stapling monitoring. @@ -3737,6 +3740,15 @@ func parseTLS(v interface{}, isClientCtx bool) (t *TLSConfigOpts, retErr error) return nil, &configErr{tk, "error parsing tls config, 'timeout' wrong type"} } tc.Timeout = at + case "connection_rate_limit": + at := int64(0) + switch mv := mv.(type) { + case int64: + at = mv + default: + return nil, &configErr{tk, "error parsing tls config, 'connection_rate_limit' wrong type"} + } + tc.RateLimit = at case "pinned_certs": ra, ok := mv.([]interface{}) if !ok { diff --git a/server/rate_counter.go b/server/rate_counter.go new file mode 100644 index 00000000..37b47dc7 --- /dev/null +++ b/server/rate_counter.go @@ -0,0 +1,65 @@ +// Copyright 2021-2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "sync" + "time" +) + +type rateCounter struct { + limit int64 + count int64 + blocked uint64 + end time.Time + interval time.Duration + mu sync.Mutex +} + +func newRateCounter(limit int64) *rateCounter { + return &rateCounter{ + limit: limit, + interval: time.Second, + } +} + +func (r *rateCounter) allow() bool { + now := time.Now() + + r.mu.Lock() + + if now.After(r.end) { + r.count = 0 + r.end = now.Add(r.interval) + } else { + r.count++ + } + allow := r.count < r.limit + if !allow { + r.blocked++ + } + + r.mu.Unlock() + + return allow +} + +func (r *rateCounter) countBlocked() uint64 { + r.mu.Lock() + blocked := r.blocked + r.blocked = 0 + r.mu.Unlock() + + return blocked +} diff --git a/server/rate_counter_test.go b/server/rate_counter_test.go new file mode 100644 index 00000000..80f4a14e --- /dev/null +++ b/server/rate_counter_test.go @@ -0,0 +1,52 @@ +// Copyright 2021-2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "testing" + "time" +) + +func TestRateCounter(t *testing.T) { + counter := newRateCounter(10) + counter.interval = 100 * time.Millisecond + + var i int + for i = 0; i < 10; i++ { + if !counter.allow() { + t.Errorf("counter should allow (iteration %d)", i) + } + } + for i = 0; i < 5; i++ { + if counter.allow() { + t.Errorf("counter should not allow (iteration %d)", i) + } + } + + blocked := counter.countBlocked() + if blocked != 5 { + t.Errorf("Expected blocked = 5, got %d", blocked) + } + + blocked = counter.countBlocked() + if blocked != 0 { + t.Errorf("Expected blocked = 0, got %d", blocked) + } + + time.Sleep(150 * time.Millisecond) + + if !counter.allow() { + t.Errorf("Expected true after current time window expired") + } +} diff --git a/server/server.go b/server/server.go index 6e3ded5e..93d31c6e 100644 --- a/server/server.go +++ b/server/server.go @@ -259,6 +259,8 @@ type Server struct { rerrMu sync.Mutex rerrLast time.Time + connRateCounter *rateCounter + // If there is a system account configured, to still support the $G account, // the server will create a fake user and add it to the list of users. // Keep track of what that user name is for config reload purposes. @@ -365,6 +367,10 @@ func NewServer(opts *Options) (*Server, error) { httpReqStats: make(map[string]uint64), // Used to track HTTP requests } + if opts.TLSRateLimit > 0 { + s.connRateCounter = newRateCounter(opts.tlsConfigOpts.RateLimit) + } + // Trusted root operator keys. if !s.processTrustedKeys() { return nil, fmt.Errorf("Error processing trusted operator keys") @@ -513,6 +519,23 @@ func NewServer(opts *Options) (*Server, error) { return s, nil } +func (s *Server) logRejectedTLSConns() { + defer s.grWG.Done() + t := time.NewTicker(time.Second) + defer t.Stop() + for { + select { + case <-s.quitCh: + return + case <-t.C: + blocked := s.connRateCounter.countBlocked() + if blocked > 0 { + s.Warnf("Rejected %d connections due to TLS rate limiting", blocked) + } + } + } +} + // clusterName returns our cluster name which could be dynamic. func (s *Server) ClusterName() string { s.mu.Lock() @@ -1612,7 +1635,7 @@ func (s *Server) Start() { s.checkResolvePreloads() } - // Log the pid to a file + // Log the pid to a file. if opts.PidFile != _EMPTY_ { if err := s.logPid(); err != nil { s.Fatalf("Could not write pidfile: %v", err) @@ -1631,14 +1654,21 @@ func (s *Server) Start() { s.SetDefaultSystemAccount() } - // start up resolver machinery + // Start monitoring before enabling other subsystems of the + // server to be able to monitor during startup. + if err := s.StartMonitoring(); err != nil { + s.Fatalf("Can't start monitoring: %v", err) + return + } + + // Start up resolver machinery. if ar := s.AccountResolver(); ar != nil { if err := ar.Start(s); err != nil { s.Fatalf("Could not start resolver: %v", err) return } // In operator mode, when the account resolver depends on an external system and - // the system account is the bootstrapping account, start fetching it + // the system account is the bootstrapping account, start fetching it. if len(opts.TrustedOperators) == 1 && opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT { _, isMemResolver := ar.(*MemAccResolver) if v, ok := s.accounts.Load(s.opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == "" { @@ -1726,12 +1756,6 @@ func (s *Server) Start() { // Start OCSP Stapling monitoring for TLS certificates if enabled. s.startOCSPMonitoring() - // Start monitoring if needed. - if err := s.StartMonitoring(); err != nil { - s.Fatalf("Can't start monitoring: %v", err) - return - } - // Start up gateway if needed. Do this before starting the routes, because // we want to resolve the gateway host:port so that this information can // be sent to other routes. @@ -1787,6 +1811,10 @@ func (s *Server) Start() { s.logPorts() } + if opts.TLSRateLimit > 0 { + s.startGoRoutine(s.logRejectedTLSConns) + } + // Wait for clients. s.AcceptLoop(clientListenReady) } @@ -2480,6 +2508,13 @@ func (s *Server) createClient(conn net.Conn) *client { // Check for TLS if !isClosed && tlsRequired { + if s.connRateCounter != nil && !s.connRateCounter.allow() { + c.mu.Unlock() + c.sendErr("Connection throttling is active. Please try again later.") + c.closeConnection(MaxConnectionsExceeded) + return nil + } + // If we have a prebuffer create a multi-reader. if len(pre) > 0 { c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)} diff --git a/test/tls_test.go b/test/tls_test.go index 6e92f827..ac17e722 100644 --- a/test/tls_test.go +++ b/test/tls_test.go @@ -1852,6 +1852,74 @@ func TestTLSPinnedCertsClient(t *testing.T) { nc.Close() } +type captureWarnLogger struct { + dummyLogger + receive chan string +} + +func newCaptureWarnLogger() *captureWarnLogger { + return &captureWarnLogger{ + receive: make(chan string, 100), + } +} + +func (l *captureWarnLogger) Warnf(format string, v ...interface{}) { + l.receive <- fmt.Sprintf(format, v...) +} + +func (l *captureWarnLogger) waitFor(expect string, timeout time.Duration) bool { + for { + select { + case msg := <-l.receive: + if strings.Contains(msg, expect) { + return true + } + case <-time.After(timeout): + return false + } + } +} + +func TestTLSConnectionRate(t *testing.T) { + config := ` + listen: "127.0.0.1:-1" + tls { + cert_file: "./configs/certs/server-cert.pem" + key_file: "./configs/certs/server-key.pem" + connection_rate_limit: 3 + } + ` + + confFileName := createConfFile(t, []byte(config)) + defer removeFile(t, confFileName) + + srv, _ := RunServerWithConfig(confFileName) + logger := newCaptureWarnLogger() + srv.SetLogger(logger, false, false) + defer srv.Shutdown() + + var err error + count := 0 + for count < 10 { + var nc *nats.Conn + nc, err = nats.Connect(srv.ClientURL(), nats.RootCAs("./configs/certs/ca.pem")) + + if err != nil { + break + } + nc.Close() + count++ + } + + if count != 3 { + t.Fatalf("Expected 3 connections per second, got %d (%v)", count, err) + } + + if !logger.waitFor("connections due to TLS rate limiting", time.Second) { + t.Fatalf("did not log 'TLS rate limiting' warning") + } +} + func TestTLSPinnedCertsRoute(t *testing.T) { tmplSeed := ` host: localhost diff --git a/vendor/github.com/minio/highwayhash/go.mod b/vendor/github.com/minio/highwayhash/go.mod deleted file mode 100644 index 3015eba9..00000000 --- a/vendor/github.com/minio/highwayhash/go.mod +++ /dev/null @@ -1,5 +0,0 @@ -module github.com/minio/highwayhash - -go 1.15 - -require golang.org/x/sys v0.0.0-20190130150945-aca44879d564 diff --git a/vendor/github.com/minio/highwayhash/go.sum b/vendor/github.com/minio/highwayhash/go.sum deleted file mode 100644 index b45a8e12..00000000 --- a/vendor/github.com/minio/highwayhash/go.sum +++ /dev/null @@ -1,2 +0,0 @@ -golang.org/x/sys v0.0.0-20190130150945-aca44879d564 h1:o6ENHFwwr1TZ9CUPQcfo1HGvLP1OPsPOTB7xCIOPNmU= -golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/vendor/github.com/nats-io/jwt/v2/account_claims.go b/vendor/github.com/nats-io/jwt/v2/account_claims.go index 04065b6a..8568da16 100644 --- a/vendor/github.com/nats-io/jwt/v2/account_claims.go +++ b/vendor/github.com/nats-io/jwt/v2/account_claims.go @@ -51,15 +51,16 @@ func (n *NatsLimits) IsUnlimited() bool { } type JetStreamLimits struct { - MemoryStorage int64 `json:"mem_storage,omitempty"` // Max number of bytes stored in memory across all streams. (0 means disabled) - DiskStorage int64 `json:"disk_storage,omitempty"` // Max number of bytes stored on disk across all streams. (0 means disabled) - Streams int64 `json:"streams,omitempty"` // Max number of streams - Consumer int64 `json:"consumer,omitempty"` // Max number of consumer + MemoryStorage int64 `json:"mem_storage,omitempty"` // Max number of bytes stored in memory across all streams. (0 means disabled) + DiskStorage int64 `json:"disk_storage,omitempty"` // Max number of bytes stored on disk across all streams. (0 means disabled) + Streams int64 `json:"streams,omitempty"` // Max number of streams + Consumer int64 `json:"consumer,omitempty"` // Max number of consumers + MaxBytesRequired bool `json:"max_bytes_required,omitempty"` // Max bytes required by all Streams } // IsUnlimited returns true if all limits are unlimited func (j *JetStreamLimits) IsUnlimited() bool { - return *j == JetStreamLimits{NoLimit, NoLimit, NoLimit, NoLimit} + return *j == JetStreamLimits{NoLimit, NoLimit, NoLimit, NoLimit, false} } // OperatorLimits are used to limit access by an account @@ -188,7 +189,7 @@ func NewAccountClaims(subject string) *AccountClaims { c.Limits = OperatorLimits{ NatsLimits{NoLimit, NoLimit, NoLimit}, AccountLimits{NoLimit, NoLimit, true, NoLimit, NoLimit}, - JetStreamLimits{0, 0, 0, 0}} + JetStreamLimits{0, 0, 0, 0, false}} c.Subject = subject c.Mappings = Mapping{} return c diff --git a/vendor/github.com/nats-io/jwt/v2/creds_utils.go b/vendor/github.com/nats-io/jwt/v2/creds_utils.go index c532c887..94312ff5 100644 --- a/vendor/github.com/nats-io/jwt/v2/creds_utils.go +++ b/vendor/github.com/nats-io/jwt/v2/creds_utils.go @@ -21,6 +21,7 @@ import ( "fmt" "regexp" "strings" + "time" "github.com/nats-io/nkeys" ) @@ -216,3 +217,49 @@ func ParseDecoratedUserNKey(contents []byte) (nkeys.KeyPair, error) { } return kp, nil } + +// IssueUserJWT takes an account scoped signing key, account id, and use public key (and optionally a user's name, an expiration duration and tags) and returns a valid signed JWT. +// The scopedSigningKey, is a mandatory account scoped signing nkey pair to sign the generated jwt (note that it _must_ be a signing key attached to the account (and a _scoped_ signing key), not the account's private (seed) key). +// The accountId, is a mandatory public account nkey. Will return error when not set or not account nkey. +// The publicUserKey, is a mandatory public user nkey. Will return error when not set or not user nkey. +// The name, is an optional human-readable name. When absent, default to publicUserKey. +// The expirationDuration, is an optional but recommended duration, when the generated jwt needs to expire. If not set, JWT will not expire. +// The tags, is an optional list of tags to be included in the JWT. +// +// Returns: +// string, resulting jwt. +// error, when issues arose. +func IssueUserJWT(scopedSigningKey nkeys.KeyPair, accountId string, publicUserKey string, name string, expirationDuration time.Duration, tags ...string) (string, error) { + + if !nkeys.IsValidPublicAccountKey(accountId) { + return "", errors.New("issueUserJWT requires an account key for the accountId parameter, but got " + nkeys.Prefix(accountId).String()) + } + + if !nkeys.IsValidPublicUserKey(publicUserKey) { + return "", errors.New("issueUserJWT requires an account key for the publicUserKey parameter, but got " + nkeys.Prefix(publicUserKey).String()) + } + + claim := NewUserClaims(publicUserKey) + claim.SetScoped(true) + + if expirationDuration != 0 { + claim.Expires = time.Now().Add(expirationDuration).UTC().Unix() + } + + claim.IssuerAccount = accountId + if name != "" { + claim.Name = name + } else { + claim.Name = publicUserKey + } + + claim.Subject = publicUserKey + claim.Tags = tags + + encoded, err := claim.Encode(scopedSigningKey) + if err != nil { + return "", errors.New("err encoding claim " + err.Error()) + } + + return encoded, nil +} diff --git a/vendor/github.com/nats-io/jwt/v2/decoder_account.go b/vendor/github.com/nats-io/jwt/v2/decoder_account.go index 607d9840..820982c9 100644 --- a/vendor/github.com/nats-io/jwt/v2/decoder_account.go +++ b/vendor/github.com/nats-io/jwt/v2/decoder_account.go @@ -73,7 +73,7 @@ func (oa v1AccountClaims) migrateV1() (*AccountClaims, error) { a.Account.Exports = oa.v1NatsAccount.Exports a.Account.Limits.AccountLimits = oa.v1NatsAccount.Limits.AccountLimits a.Account.Limits.NatsLimits = oa.v1NatsAccount.Limits.NatsLimits - a.Account.Limits.JetStreamLimits = JetStreamLimits{0, 0, 0, 0} + a.Account.Limits.JetStreamLimits = JetStreamLimits{0, 0, 0, 0, false} a.Account.SigningKeys = make(SigningKeys) for _, v := range oa.SigningKeys { a.Account.SigningKeys.Add(v) diff --git a/vendor/github.com/nats-io/jwt/v2/go.mod b/vendor/github.com/nats-io/jwt/v2/go.mod deleted file mode 100644 index 56a8ddb6..00000000 --- a/vendor/github.com/nats-io/jwt/v2/go.mod +++ /dev/null @@ -1,5 +0,0 @@ -module github.com/nats-io/jwt/v2 - -go 1.16 - -require github.com/nats-io/nkeys v0.3.0 diff --git a/vendor/github.com/nats-io/jwt/v2/go.sum b/vendor/github.com/nats-io/jwt/v2/go.sum deleted file mode 100644 index e2eb2522..00000000 --- a/vendor/github.com/nats-io/jwt/v2/go.sum +++ /dev/null @@ -1,9 +0,0 @@ -github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= -github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/vendor/github.com/nats-io/nats.go/go.mod b/vendor/github.com/nats-io/nats.go/go.mod deleted file mode 100644 index 63faedf4..00000000 --- a/vendor/github.com/nats-io/nats.go/go.mod +++ /dev/null @@ -1,8 +0,0 @@ -module github.com/nats-io/nats.go - -go 1.16 - -require ( - github.com/nats-io/nkeys v0.3.0 - github.com/nats-io/nuid v1.0.1 -) diff --git a/vendor/github.com/nats-io/nats.go/go.sum b/vendor/github.com/nats-io/nats.go/go.sum deleted file mode 100644 index 2138ffb5..00000000 --- a/vendor/github.com/nats-io/nats.go/go.sum +++ /dev/null @@ -1,11 +0,0 @@ -github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= -github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= -github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= -github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index 88203e89..38d7be5a 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -859,11 +859,6 @@ func (ctx ContextOpt) configurePublish(opts *pubOpts) error { return nil } -func (ctx ContextOpt) configureSubscribe(opts *subOpts) error { - opts.ctx = ctx - return nil -} - func (ctx ContextOpt) configurePull(opts *pullOpts) error { opts.ctx = ctx return nil @@ -970,9 +965,6 @@ type jsSub struct { fcd uint64 fciseq uint64 csfct *time.Timer - - // Cancellation function to cancel context on drain/unsubscribe. - cancel func() } // Deletes the JS Consumer. @@ -1251,7 +1243,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, consumer = o.consumer isDurable = o.cfg.Durable != _EMPTY_ consumerBound = o.bound - ctx = o.ctx notFoundErr bool lookupErr bool nc = js.nc @@ -1398,13 +1389,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, deliver = nc.newInbox() } - // In case this has a context, then create a child context that - // is possible to cancel via unsubscribe / drain. - var cancel func() - if ctx != nil { - ctx, cancel = context.WithCancel(ctx) - } - jsi := &jsSub{ js: js, stream: stream, @@ -1417,7 +1401,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, pull: isPullMode, nms: nms, psubj: subj, - cancel: cancel, } // Check if we are manual ack. @@ -1556,14 +1539,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, sub.chanSubcheckForFlowControlResponse() } - // Wait for context to get canceled if there is one. - if ctx != nil { - go func() { - <-ctx.Done() - sub.Unsubscribe() - }() - } - return sub, nil } @@ -1838,7 +1813,7 @@ func (sub *Subscription) scheduleFlowControlResponse(reply string) { func (sub *Subscription) activityCheck() { sub.mu.Lock() jsi := sub.jsi - if jsi == nil || sub.closed { + if jsi == nil { sub.mu.Unlock() return } @@ -1847,9 +1822,10 @@ func (sub *Subscription) activityCheck() { jsi.hbc.Reset(jsi.hbi) jsi.active = false nc := sub.conn + closed := sub.closed sub.mu.Unlock() - if !active { + if !active && !closed { nc.mu.Lock() if errCB := nc.Opts.AsyncErrorCB; errCB != nil { nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) }) @@ -1977,7 +1953,6 @@ type subOpts struct { mack bool // For an ordered consumer. ordered bool - ctx context.Context } // OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages. diff --git a/vendor/github.com/nats-io/nats.go/kv.go b/vendor/github.com/nats-io/nats.go/kv.go index a727249f..deaefde2 100644 --- a/vendor/github.com/nats-io/nats.go/kv.go +++ b/vendor/github.com/nats-io/nats.go/kv.go @@ -689,9 +689,6 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { if o.metaOnly { subOpts = append(subOpts, HeadersOnly()) } - if o.ctx != nil { - subOpts = append(subOpts, Context(o.ctx)) - } sub, err := kv.js.Subscribe(keys, update, subOpts...) if err != nil { return nil, err diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index 8b963c57..72759717 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -1191,7 +1191,7 @@ func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler) { nc.Opts.DiscoveredServersCB = dscb } -// SetClosedHandler will set the closed event handler. +// SetClosedHandler will set the reconnect event handler. func (nc *Conn) SetClosedHandler(cb ConnHandler) { if nc == nil { return @@ -4144,20 +4144,6 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error { nc.bw.appendString(fmt.Sprintf(unsubProto, s.sid, maxStr)) nc.kickFlusher() } - - // For JetStream subscriptions cancel the attached context if there is any. - var cancel func() - sub.mu.Lock() - jsi := sub.jsi - if jsi != nil { - cancel = jsi.cancel - jsi.cancel = nil - } - sub.mu.Unlock() - if cancel != nil { - cancel() - } - return nil } diff --git a/vendor/github.com/nats-io/nkeys/go.mod b/vendor/github.com/nats-io/nkeys/go.mod deleted file mode 100644 index 2384db3f..00000000 --- a/vendor/github.com/nats-io/nkeys/go.mod +++ /dev/null @@ -1,5 +0,0 @@ -module github.com/nats-io/nkeys - -go 1.16 - -require golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b diff --git a/vendor/github.com/nats-io/nkeys/go.sum b/vendor/github.com/nats-io/nkeys/go.sum deleted file mode 100644 index 42fec3ae..00000000 --- a/vendor/github.com/nats-io/nkeys/go.sum +++ /dev/null @@ -1,7 +0,0 @@ -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/vendor/modules.txt b/vendor/modules.txt index d591106f..6cadcbda 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,27 +1,27 @@ # github.com/golang/protobuf v1.4.2 -## explicit +## explicit; go 1.9 # github.com/klauspost/compress v1.13.4 -## explicit +## explicit; go 1.13 github.com/klauspost/compress/s2 # github.com/minio/highwayhash v1.0.1 -## explicit +## explicit; go 1.15 github.com/minio/highwayhash -# github.com/nats-io/jwt/v2 v2.2.0 -## explicit +# github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 +## explicit; go 1.16 github.com/nats-io/jwt/v2 -# github.com/nats-io/nats.go v1.13.1-0.20220106142636-ebcdff697d3f -## explicit +# github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc +## explicit; go 1.16 github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin github.com/nats-io/nats.go/util # github.com/nats-io/nkeys v0.3.0 -## explicit +## explicit; go 1.16 github.com/nats-io/nkeys # github.com/nats-io/nuid v1.0.1 ## explicit github.com/nats-io/nuid # golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 -## explicit +## explicit; go 1.17 golang.org/x/crypto/bcrypt golang.org/x/crypto/blowfish golang.org/x/crypto/chacha20 @@ -32,7 +32,7 @@ golang.org/x/crypto/internal/poly1305 golang.org/x/crypto/internal/subtle golang.org/x/crypto/ocsp # golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 -## explicit +## explicit; go 1.17 golang.org/x/sys/cpu golang.org/x/sys/internal/unsafeheader golang.org/x/sys/unix