Merge from main

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-01-13 10:01:33 -08:00
25 changed files with 324 additions and 135 deletions

6
go.mod
View File

@@ -1,13 +1,13 @@
module github.com/nats-io/nats-server/v2
go 1.16
go 1.17
require (
github.com/golang/protobuf v1.4.2 // indirect
github.com/klauspost/compress v1.13.4
github.com/minio/highwayhash v1.0.1
github.com/nats-io/jwt/v2 v2.2.0
github.com/nats-io/nats.go v1.13.1-0.20220106142636-ebcdff697d3f
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3

4
go.sum
View File

@@ -14,8 +14,8 @@ github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEE
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.0 h1:Yg/4WFK6vsqMudRg91eBb7Dh6XeVcDMPHycDE8CfltE=
github.com/nats-io/jwt/v2 v2.2.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc h1:SHr4MUUZJ/fAC0uSm2OzWOJYsHpapmR86mpw7q1qPXU=
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.13.1-0.20220106142636-ebcdff697d3f h1:WtlVcCLq7NOOskKidJ1rRZoXwOTs6RIJSl0R/gIPBMg=

View File

@@ -3214,10 +3214,11 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
if ac.Limits.JetStreamLimits.DiskStorage != 0 || ac.Limits.JetStreamLimits.MemoryStorage != 0 {
// JetStreamAccountLimits and jwt.JetStreamLimits use same value for unlimited
a.jsLimits = &JetStreamAccountLimits{
MaxMemory: ac.Limits.JetStreamLimits.MemoryStorage,
MaxStore: ac.Limits.JetStreamLimits.DiskStorage,
MaxStreams: int(ac.Limits.JetStreamLimits.Streams),
MaxConsumers: int(ac.Limits.JetStreamLimits.Consumer),
MaxMemory: ac.Limits.JetStreamLimits.MemoryStorage,
MaxStore: ac.Limits.JetStreamLimits.DiskStorage,
MaxStreams: int(ac.Limits.JetStreamLimits.Streams),
MaxConsumers: int(ac.Limits.JetStreamLimits.Consumer),
MaxBytesRequired: ac.Limits.JetStreamLimits.MaxBytesRequired,
}
} else if a.jsLimits != nil {
// covers failed update followed by disable

View File

@@ -3171,7 +3171,9 @@ checkCache:
mb.mu.Unlock()
var ld *LostStreamData
if ld, err = mb.rebuildState(); ld != nil {
fs.rebuildState(ld)
// We do not know if fs is locked or not at this point.
// This should be an exceptional condition so do so in Go routine.
go fs.rebuildState(ld)
}
mb.mu.Lock()
}

View File

@@ -4259,7 +4259,7 @@ func TestJWTJetStreamLimits(t *testing.T) {
sysKp.Seed()
sysCreds := genCredsFile(t, sysUserJwt, sysUSeed)
// limits to apply and check
limits1 := jwt.JetStreamLimits{MemoryStorage: 1024 * 1024, DiskStorage: 2048 * 1024, Streams: 1, Consumer: 2}
limits1 := jwt.JetStreamLimits{MemoryStorage: 1024 * 1024, DiskStorage: 2048 * 1024, Streams: 1, Consumer: 2, MaxBytesRequired: true}
// has valid limits that would fail when incorrectly applied twice
limits2 := jwt.JetStreamLimits{MemoryStorage: 4096 * 1024, DiskStorage: 8192 * 1024, Streams: 3, Consumer: 4}
// limits exceeding actual configured value of DiskStorage

View File

@@ -249,6 +249,7 @@ type Options struct {
TLSCaCert string `json:"-"`
TLSConfig *tls.Config `json:"-"`
TLSPinnedCerts PinnedCertSet `json:"-"`
TLSRateLimit int64 `json:"-"`
AllowNonTLS bool `json:"-"`
WriteDeadline time.Duration `json:"-"`
MaxClosedClients int `json:"-"`
@@ -513,6 +514,7 @@ type TLSConfigOpts struct {
Map bool
TLSCheckKnownURLs bool
Timeout float64
RateLimit int64
Ciphers []uint16
CurvePreferences []tls.CurveID
PinnedCerts PinnedCertSet
@@ -902,6 +904,7 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
o.TLSTimeout = tc.Timeout
o.TLSMap = tc.Map
o.TLSPinnedCerts = tc.PinnedCerts
o.TLSRateLimit = tc.RateLimit
// Need to keep track of path of the original TLS config
// and certs path for OCSP Stapling monitoring.
@@ -3737,6 +3740,15 @@ func parseTLS(v interface{}, isClientCtx bool) (t *TLSConfigOpts, retErr error)
return nil, &configErr{tk, "error parsing tls config, 'timeout' wrong type"}
}
tc.Timeout = at
case "connection_rate_limit":
at := int64(0)
switch mv := mv.(type) {
case int64:
at = mv
default:
return nil, &configErr{tk, "error parsing tls config, 'connection_rate_limit' wrong type"}
}
tc.RateLimit = at
case "pinned_certs":
ra, ok := mv.([]interface{})
if !ok {

65
server/rate_counter.go Normal file
View File

@@ -0,0 +1,65 @@
// Copyright 2021-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"sync"
"time"
)
type rateCounter struct {
limit int64
count int64
blocked uint64
end time.Time
interval time.Duration
mu sync.Mutex
}
func newRateCounter(limit int64) *rateCounter {
return &rateCounter{
limit: limit,
interval: time.Second,
}
}
func (r *rateCounter) allow() bool {
now := time.Now()
r.mu.Lock()
if now.After(r.end) {
r.count = 0
r.end = now.Add(r.interval)
} else {
r.count++
}
allow := r.count < r.limit
if !allow {
r.blocked++
}
r.mu.Unlock()
return allow
}
func (r *rateCounter) countBlocked() uint64 {
r.mu.Lock()
blocked := r.blocked
r.blocked = 0
r.mu.Unlock()
return blocked
}

View File

@@ -0,0 +1,52 @@
// Copyright 2021-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"testing"
"time"
)
func TestRateCounter(t *testing.T) {
counter := newRateCounter(10)
counter.interval = 100 * time.Millisecond
var i int
for i = 0; i < 10; i++ {
if !counter.allow() {
t.Errorf("counter should allow (iteration %d)", i)
}
}
for i = 0; i < 5; i++ {
if counter.allow() {
t.Errorf("counter should not allow (iteration %d)", i)
}
}
blocked := counter.countBlocked()
if blocked != 5 {
t.Errorf("Expected blocked = 5, got %d", blocked)
}
blocked = counter.countBlocked()
if blocked != 0 {
t.Errorf("Expected blocked = 0, got %d", blocked)
}
time.Sleep(150 * time.Millisecond)
if !counter.allow() {
t.Errorf("Expected true after current time window expired")
}
}

View File

@@ -259,6 +259,8 @@ type Server struct {
rerrMu sync.Mutex
rerrLast time.Time
connRateCounter *rateCounter
// If there is a system account configured, to still support the $G account,
// the server will create a fake user and add it to the list of users.
// Keep track of what that user name is for config reload purposes.
@@ -365,6 +367,10 @@ func NewServer(opts *Options) (*Server, error) {
httpReqStats: make(map[string]uint64), // Used to track HTTP requests
}
if opts.TLSRateLimit > 0 {
s.connRateCounter = newRateCounter(opts.tlsConfigOpts.RateLimit)
}
// Trusted root operator keys.
if !s.processTrustedKeys() {
return nil, fmt.Errorf("Error processing trusted operator keys")
@@ -513,6 +519,23 @@ func NewServer(opts *Options) (*Server, error) {
return s, nil
}
func (s *Server) logRejectedTLSConns() {
defer s.grWG.Done()
t := time.NewTicker(time.Second)
defer t.Stop()
for {
select {
case <-s.quitCh:
return
case <-t.C:
blocked := s.connRateCounter.countBlocked()
if blocked > 0 {
s.Warnf("Rejected %d connections due to TLS rate limiting", blocked)
}
}
}
}
// clusterName returns our cluster name which could be dynamic.
func (s *Server) ClusterName() string {
s.mu.Lock()
@@ -1612,7 +1635,7 @@ func (s *Server) Start() {
s.checkResolvePreloads()
}
// Log the pid to a file
// Log the pid to a file.
if opts.PidFile != _EMPTY_ {
if err := s.logPid(); err != nil {
s.Fatalf("Could not write pidfile: %v", err)
@@ -1631,14 +1654,21 @@ func (s *Server) Start() {
s.SetDefaultSystemAccount()
}
// start up resolver machinery
// Start monitoring before enabling other subsystems of the
// server to be able to monitor during startup.
if err := s.StartMonitoring(); err != nil {
s.Fatalf("Can't start monitoring: %v", err)
return
}
// Start up resolver machinery.
if ar := s.AccountResolver(); ar != nil {
if err := ar.Start(s); err != nil {
s.Fatalf("Could not start resolver: %v", err)
return
}
// In operator mode, when the account resolver depends on an external system and
// the system account is the bootstrapping account, start fetching it
// the system account is the bootstrapping account, start fetching it.
if len(opts.TrustedOperators) == 1 && opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT {
_, isMemResolver := ar.(*MemAccResolver)
if v, ok := s.accounts.Load(s.opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == "" {
@@ -1726,12 +1756,6 @@ func (s *Server) Start() {
// Start OCSP Stapling monitoring for TLS certificates if enabled.
s.startOCSPMonitoring()
// Start monitoring if needed.
if err := s.StartMonitoring(); err != nil {
s.Fatalf("Can't start monitoring: %v", err)
return
}
// Start up gateway if needed. Do this before starting the routes, because
// we want to resolve the gateway host:port so that this information can
// be sent to other routes.
@@ -1787,6 +1811,10 @@ func (s *Server) Start() {
s.logPorts()
}
if opts.TLSRateLimit > 0 {
s.startGoRoutine(s.logRejectedTLSConns)
}
// Wait for clients.
s.AcceptLoop(clientListenReady)
}
@@ -2480,6 +2508,13 @@ func (s *Server) createClient(conn net.Conn) *client {
// Check for TLS
if !isClosed && tlsRequired {
if s.connRateCounter != nil && !s.connRateCounter.allow() {
c.mu.Unlock()
c.sendErr("Connection throttling is active. Please try again later.")
c.closeConnection(MaxConnectionsExceeded)
return nil
}
// If we have a prebuffer create a multi-reader.
if len(pre) > 0 {
c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}

View File

@@ -1852,6 +1852,74 @@ func TestTLSPinnedCertsClient(t *testing.T) {
nc.Close()
}
type captureWarnLogger struct {
dummyLogger
receive chan string
}
func newCaptureWarnLogger() *captureWarnLogger {
return &captureWarnLogger{
receive: make(chan string, 100),
}
}
func (l *captureWarnLogger) Warnf(format string, v ...interface{}) {
l.receive <- fmt.Sprintf(format, v...)
}
func (l *captureWarnLogger) waitFor(expect string, timeout time.Duration) bool {
for {
select {
case msg := <-l.receive:
if strings.Contains(msg, expect) {
return true
}
case <-time.After(timeout):
return false
}
}
}
func TestTLSConnectionRate(t *testing.T) {
config := `
listen: "127.0.0.1:-1"
tls {
cert_file: "./configs/certs/server-cert.pem"
key_file: "./configs/certs/server-key.pem"
connection_rate_limit: 3
}
`
confFileName := createConfFile(t, []byte(config))
defer removeFile(t, confFileName)
srv, _ := RunServerWithConfig(confFileName)
logger := newCaptureWarnLogger()
srv.SetLogger(logger, false, false)
defer srv.Shutdown()
var err error
count := 0
for count < 10 {
var nc *nats.Conn
nc, err = nats.Connect(srv.ClientURL(), nats.RootCAs("./configs/certs/ca.pem"))
if err != nil {
break
}
nc.Close()
count++
}
if count != 3 {
t.Fatalf("Expected 3 connections per second, got %d (%v)", count, err)
}
if !logger.waitFor("connections due to TLS rate limiting", time.Second) {
t.Fatalf("did not log 'TLS rate limiting' warning")
}
}
func TestTLSPinnedCertsRoute(t *testing.T) {
tmplSeed := `
host: localhost

View File

@@ -1,5 +0,0 @@
module github.com/minio/highwayhash
go 1.15
require golang.org/x/sys v0.0.0-20190130150945-aca44879d564

View File

@@ -1,2 +0,0 @@
golang.org/x/sys v0.0.0-20190130150945-aca44879d564 h1:o6ENHFwwr1TZ9CUPQcfo1HGvLP1OPsPOTB7xCIOPNmU=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View File

@@ -51,15 +51,16 @@ func (n *NatsLimits) IsUnlimited() bool {
}
type JetStreamLimits struct {
MemoryStorage int64 `json:"mem_storage,omitempty"` // Max number of bytes stored in memory across all streams. (0 means disabled)
DiskStorage int64 `json:"disk_storage,omitempty"` // Max number of bytes stored on disk across all streams. (0 means disabled)
Streams int64 `json:"streams,omitempty"` // Max number of streams
Consumer int64 `json:"consumer,omitempty"` // Max number of consumer
MemoryStorage int64 `json:"mem_storage,omitempty"` // Max number of bytes stored in memory across all streams. (0 means disabled)
DiskStorage int64 `json:"disk_storage,omitempty"` // Max number of bytes stored on disk across all streams. (0 means disabled)
Streams int64 `json:"streams,omitempty"` // Max number of streams
Consumer int64 `json:"consumer,omitempty"` // Max number of consumers
MaxBytesRequired bool `json:"max_bytes_required,omitempty"` // Max bytes required by all Streams
}
// IsUnlimited returns true if all limits are unlimited
func (j *JetStreamLimits) IsUnlimited() bool {
return *j == JetStreamLimits{NoLimit, NoLimit, NoLimit, NoLimit}
return *j == JetStreamLimits{NoLimit, NoLimit, NoLimit, NoLimit, false}
}
// OperatorLimits are used to limit access by an account
@@ -188,7 +189,7 @@ func NewAccountClaims(subject string) *AccountClaims {
c.Limits = OperatorLimits{
NatsLimits{NoLimit, NoLimit, NoLimit},
AccountLimits{NoLimit, NoLimit, true, NoLimit, NoLimit},
JetStreamLimits{0, 0, 0, 0}}
JetStreamLimits{0, 0, 0, 0, false}}
c.Subject = subject
c.Mappings = Mapping{}
return c

View File

@@ -21,6 +21,7 @@ import (
"fmt"
"regexp"
"strings"
"time"
"github.com/nats-io/nkeys"
)
@@ -216,3 +217,49 @@ func ParseDecoratedUserNKey(contents []byte) (nkeys.KeyPair, error) {
}
return kp, nil
}
// IssueUserJWT takes an account scoped signing key, account id, and use public key (and optionally a user's name, an expiration duration and tags) and returns a valid signed JWT.
// The scopedSigningKey, is a mandatory account scoped signing nkey pair to sign the generated jwt (note that it _must_ be a signing key attached to the account (and a _scoped_ signing key), not the account's private (seed) key).
// The accountId, is a mandatory public account nkey. Will return error when not set or not account nkey.
// The publicUserKey, is a mandatory public user nkey. Will return error when not set or not user nkey.
// The name, is an optional human-readable name. When absent, default to publicUserKey.
// The expirationDuration, is an optional but recommended duration, when the generated jwt needs to expire. If not set, JWT will not expire.
// The tags, is an optional list of tags to be included in the JWT.
//
// Returns:
// string, resulting jwt.
// error, when issues arose.
func IssueUserJWT(scopedSigningKey nkeys.KeyPair, accountId string, publicUserKey string, name string, expirationDuration time.Duration, tags ...string) (string, error) {
if !nkeys.IsValidPublicAccountKey(accountId) {
return "", errors.New("issueUserJWT requires an account key for the accountId parameter, but got " + nkeys.Prefix(accountId).String())
}
if !nkeys.IsValidPublicUserKey(publicUserKey) {
return "", errors.New("issueUserJWT requires an account key for the publicUserKey parameter, but got " + nkeys.Prefix(publicUserKey).String())
}
claim := NewUserClaims(publicUserKey)
claim.SetScoped(true)
if expirationDuration != 0 {
claim.Expires = time.Now().Add(expirationDuration).UTC().Unix()
}
claim.IssuerAccount = accountId
if name != "" {
claim.Name = name
} else {
claim.Name = publicUserKey
}
claim.Subject = publicUserKey
claim.Tags = tags
encoded, err := claim.Encode(scopedSigningKey)
if err != nil {
return "", errors.New("err encoding claim " + err.Error())
}
return encoded, nil
}

View File

@@ -73,7 +73,7 @@ func (oa v1AccountClaims) migrateV1() (*AccountClaims, error) {
a.Account.Exports = oa.v1NatsAccount.Exports
a.Account.Limits.AccountLimits = oa.v1NatsAccount.Limits.AccountLimits
a.Account.Limits.NatsLimits = oa.v1NatsAccount.Limits.NatsLimits
a.Account.Limits.JetStreamLimits = JetStreamLimits{0, 0, 0, 0}
a.Account.Limits.JetStreamLimits = JetStreamLimits{0, 0, 0, 0, false}
a.Account.SigningKeys = make(SigningKeys)
for _, v := range oa.SigningKeys {
a.Account.SigningKeys.Add(v)

View File

@@ -1,5 +0,0 @@
module github.com/nats-io/jwt/v2
go 1.16
require github.com/nats-io/nkeys v0.3.0

View File

@@ -1,9 +0,0 @@
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@@ -1,8 +0,0 @@
module github.com/nats-io/nats.go
go 1.16
require (
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
)

View File

@@ -1,11 +0,0 @@
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@@ -859,11 +859,6 @@ func (ctx ContextOpt) configurePublish(opts *pubOpts) error {
return nil
}
func (ctx ContextOpt) configureSubscribe(opts *subOpts) error {
opts.ctx = ctx
return nil
}
func (ctx ContextOpt) configurePull(opts *pullOpts) error {
opts.ctx = ctx
return nil
@@ -970,9 +965,6 @@ type jsSub struct {
fcd uint64
fciseq uint64
csfct *time.Timer
// Cancellation function to cancel context on drain/unsubscribe.
cancel func()
}
// Deletes the JS Consumer.
@@ -1251,7 +1243,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
consumer = o.consumer
isDurable = o.cfg.Durable != _EMPTY_
consumerBound = o.bound
ctx = o.ctx
notFoundErr bool
lookupErr bool
nc = js.nc
@@ -1398,13 +1389,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
deliver = nc.newInbox()
}
// In case this has a context, then create a child context that
// is possible to cancel via unsubscribe / drain.
var cancel func()
if ctx != nil {
ctx, cancel = context.WithCancel(ctx)
}
jsi := &jsSub{
js: js,
stream: stream,
@@ -1417,7 +1401,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
pull: isPullMode,
nms: nms,
psubj: subj,
cancel: cancel,
}
// Check if we are manual ack.
@@ -1556,14 +1539,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
sub.chanSubcheckForFlowControlResponse()
}
// Wait for context to get canceled if there is one.
if ctx != nil {
go func() {
<-ctx.Done()
sub.Unsubscribe()
}()
}
return sub, nil
}
@@ -1838,7 +1813,7 @@ func (sub *Subscription) scheduleFlowControlResponse(reply string) {
func (sub *Subscription) activityCheck() {
sub.mu.Lock()
jsi := sub.jsi
if jsi == nil || sub.closed {
if jsi == nil {
sub.mu.Unlock()
return
}
@@ -1847,9 +1822,10 @@ func (sub *Subscription) activityCheck() {
jsi.hbc.Reset(jsi.hbi)
jsi.active = false
nc := sub.conn
closed := sub.closed
sub.mu.Unlock()
if !active {
if !active && !closed {
nc.mu.Lock()
if errCB := nc.Opts.AsyncErrorCB; errCB != nil {
nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
@@ -1977,7 +1953,6 @@ type subOpts struct {
mack bool
// For an ordered consumer.
ordered bool
ctx context.Context
}
// OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages.

View File

@@ -689,9 +689,6 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
if o.metaOnly {
subOpts = append(subOpts, HeadersOnly())
}
if o.ctx != nil {
subOpts = append(subOpts, Context(o.ctx))
}
sub, err := kv.js.Subscribe(keys, update, subOpts...)
if err != nil {
return nil, err

View File

@@ -1191,7 +1191,7 @@ func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler) {
nc.Opts.DiscoveredServersCB = dscb
}
// SetClosedHandler will set the closed event handler.
// SetClosedHandler will set the reconnect event handler.
func (nc *Conn) SetClosedHandler(cb ConnHandler) {
if nc == nil {
return
@@ -4144,20 +4144,6 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
nc.bw.appendString(fmt.Sprintf(unsubProto, s.sid, maxStr))
nc.kickFlusher()
}
// For JetStream subscriptions cancel the attached context if there is any.
var cancel func()
sub.mu.Lock()
jsi := sub.jsi
if jsi != nil {
cancel = jsi.cancel
jsi.cancel = nil
}
sub.mu.Unlock()
if cancel != nil {
cancel()
}
return nil
}

View File

@@ -1,5 +0,0 @@
module github.com/nats-io/nkeys
go 1.16
require golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b

View File

@@ -1,7 +0,0 @@
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

20
vendor/modules.txt vendored
View File

@@ -1,27 +1,27 @@
# github.com/golang/protobuf v1.4.2
## explicit
## explicit; go 1.9
# github.com/klauspost/compress v1.13.4
## explicit
## explicit; go 1.13
github.com/klauspost/compress/s2
# github.com/minio/highwayhash v1.0.1
## explicit
## explicit; go 1.15
github.com/minio/highwayhash
# github.com/nats-io/jwt/v2 v2.2.0
## explicit
# github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296
## explicit; go 1.16
github.com/nats-io/jwt/v2
# github.com/nats-io/nats.go v1.13.1-0.20220106142636-ebcdff697d3f
## explicit
# github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc
## explicit; go 1.16
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin
github.com/nats-io/nats.go/util
# github.com/nats-io/nkeys v0.3.0
## explicit
## explicit; go 1.16
github.com/nats-io/nkeys
# github.com/nats-io/nuid v1.0.1
## explicit
github.com/nats-io/nuid
# golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3
## explicit
## explicit; go 1.17
golang.org/x/crypto/bcrypt
golang.org/x/crypto/blowfish
golang.org/x/crypto/chacha20
@@ -32,7 +32,7 @@ golang.org/x/crypto/internal/poly1305
golang.org/x/crypto/internal/subtle
golang.org/x/crypto/ocsp
# golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1
## explicit
## explicit; go 1.17
golang.org/x/sys/cpu
golang.org/x/sys/internal/unsafeheader
golang.org/x/sys/unix