diff --git a/.goreleaser.yml b/.goreleaser.yml index 01f06bc0..fc9fad12 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -35,7 +35,7 @@ builds: goarch: 386 nfpms: - - name_template: '{{.ProjectName}}-{{.Tag}}-{{.Arch}}{{if .Arm}}{{.Arm}}{{end}}' + - file_name_template: '{{.ProjectName}}-{{.Tag}}-{{.Arch}}{{if .Arm}}{{.Arm}}{{end}}' homepage: https://nats.io description: High-Performance server for NATS, the cloud native messaging system. maintainer: Ivan Kozlovic diff --git a/.travis.yml b/.travis.yml index 780ea3a3..fa2bf4d1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,9 +31,8 @@ script: deploy: provider: script - skip_cleanup: false + cleanup: true script: curl -sL http://git.io/goreleaser | bash - verbose: true on: tags: true condition: $TRAVIS_GO_VERSION =~ 1.13 diff --git a/README.md b/README.md index 374da37b..3aac13d3 100644 --- a/README.md +++ b/README.md @@ -29,8 +29,8 @@ If you are interested in contributing to NATS, read about our... [Fossa-Image]: https://app.fossa.io/api/projects/git%2Bgithub.com%2Fnats-io%2Fgnatsd.svg?type=shield [Build-Status-Url]: https://travis-ci.org/nats-io/nats-server [Build-Status-Image]: https://travis-ci.org/nats-io/nats-server.svg?branch=master -[Release-Url]: https://github.com/nats-io/nats-server/releases/tag/v2.1.6 -[Release-image]: https://img.shields.io/badge/release-v2.1.6-1eb0fc.svg +[Release-Url]: https://github.com/nats-io/nats-server/releases/tag/v2.1.7 +[Release-image]: https://img.shields.io/badge/release-v2.1.7-1eb0fc.svg [Coverage-Url]: https://coveralls.io/r/nats-io/nats-server?branch=master [Coverage-image]: https://coveralls.io/repos/github/nats-io/nats-server/badge.svg?branch=master [ReportCard-Url]: https://goreportcard.com/report/nats-io/nats-server diff --git a/docker/Dockerfile.alpine b/docker/Dockerfile.alpine index 4622c4e4..90c4a4ea 100644 --- a/docker/Dockerfile.alpine +++ b/docker/Dockerfile.alpine @@ -1,4 +1,4 @@ -FROM golang:1.13.9-alpine3.11 AS builder +FROM golang:1.13.10-alpine3.11 AS builder WORKDIR $GOPATH/src/github.com/nats-io/nats-server diff --git a/go.mod b/go.mod index e13b6394..37e824dd 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,11 @@ module github.com/nats-io/nats-server/v2 require ( - github.com/golang/protobuf v1.3.5 // indirect github.com/nats-io/jwt v0.3.2 - github.com/nats-io/nats.go v1.9.2 + github.com/nats-io/nats.go v1.10.0 github.com/nats-io/nkeys v0.1.4 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e + google.golang.org/protobuf v1.22.0 // indirect ) - -go 1.13 diff --git a/go.sum b/go.sum index 5cce3487..00112e77 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,17 @@ -github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= -github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0 h1:oOuy+ugB+P/kBdUnG5QaMXSIyJ1q38wWSojYCb3z5VQ= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= -github.com/nats-io/nats.go v1.9.2 h1:oDeERm3NcZVrPpdR/JpGdWHMv3oJ8yY30YwxKq+DU2s= -github.com/nats-io/nats.go v1.9.2/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= +github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY= +github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= @@ -19,3 +27,12 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e h1:D5TXcfTk7xF7hvieo4QErS3qqCB4teTffacDWr7CI+0= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0 h1:cJv5/xdbk1NnMPR1VP9+HU6gupuG9MLBoH1r6RHZ2MY= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= diff --git a/server/const.go b/server/const.go index 1daa9357..6edd64ab 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.1.6" + VERSION = "2.1.7" // PROTO is the currently supported protocol. // 0 was the original diff --git a/vendor/github.com/nats-io/nats.go/.travis.yml b/vendor/github.com/nats-io/nats.go/.travis.yml index b5228774..3b00ae19 100644 --- a/vendor/github.com/nats-io/nats.go/.travis.yml +++ b/vendor/github.com/nats-io/nats.go/.travis.yml @@ -1,8 +1,7 @@ language: go -sudo: false go: +- 1.14.x - 1.13.x -- 1.12.x env: - GO111MODULE=off go_import_path: github.com/nats-io/nats.go @@ -21,4 +20,4 @@ before_script: script: - go test -i -race ./... - go test -v -run=TestNoRace -p=1 ./... -- if [[ "$TRAVIS_GO_VERSION" =~ 1.13 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi +- if [[ "$TRAVIS_GO_VERSION" =~ 1.14 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi diff --git a/vendor/github.com/nats-io/nats.go/README.md b/vendor/github.com/nats-io/nats.go/README.md index c059c253..5eb7c968 100644 --- a/vendor/github.com/nats-io/nats.go/README.md +++ b/vendor/github.com/nats-io/nats.go/README.md @@ -20,7 +20,7 @@ When using or transitioning to Go modules support: ```bash # Go client latest or explicit version go get github.com/nats-io/nats.go/@latest -go get github.com/nats-io/nats.go/@v1.9.2 +go get github.com/nats-io/nats.go/@v1.10.0 # For latest NATS Server, add /v2 at the end go get github.com/nats-io/nats-server/v2 @@ -325,6 +325,18 @@ nc, err := nats.Connect(servers) // This example means 10 seconds total per backend. nc, err = nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2 * time.Second)) +// You can also add some jitter for the reconnection. +// This call will add up to 500 milliseconds for non TLS connections and 2 seconds for TLS connections. +// If not specified, the library defaults to 100 milliseconds and 1 second, respectively. +nc, err = nats.Connect(servers, nats.ReconnectJitter(500*time.Millisecond, 2*time.Second)) + +// You can also specify a custom reconnect delay handler. If set, the library will invoke it when it has tried +// all URLs in its list. The value returned will be used as the total sleep time, so add your own jitter. +// The library will pass the number of times it went through the whole list. +nc, err = nats.Connect(servers, nats.CustomReconnectDelay(func(attempts int) time.Duration { + return someBackoffFunction(attempts) +})) + // Optionally disable randomization of the server pool nc, err = nats.Connect(servers, nats.DontRandomize()) diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index f741894b..d0d569cc 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -1,4 +1,4 @@ -// Copyright 2012-2019 The NATS Authors +// Copyright 2012-2020 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -45,19 +45,21 @@ import ( // Default Constants const ( - Version = "1.9.2" - DefaultURL = "nats://127.0.0.1:4222" - DefaultPort = 4222 - DefaultMaxReconnect = 60 - DefaultReconnectWait = 2 * time.Second - DefaultTimeout = 2 * time.Second - DefaultPingInterval = 2 * time.Minute - DefaultMaxPingOut = 2 - DefaultMaxChanLen = 8192 // 8k - DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB - RequestChanLen = 8 - DefaultDrainTimeout = 30 * time.Second - LangString = "go" + Version = "1.10.0" + DefaultURL = "nats://127.0.0.1:4222" + DefaultPort = 4222 + DefaultMaxReconnect = 60 + DefaultReconnectWait = 2 * time.Second + DefaultReconnectJitter = 100 * time.Millisecond + DefaultReconnectJitterTLS = time.Second + DefaultTimeout = 2 * time.Second + DefaultPingInterval = 2 * time.Minute + DefaultMaxPingOut = 2 + DefaultMaxChanLen = 8192 // 8k + DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB + RequestChanLen = 8 + DefaultDrainTimeout = 30 * time.Second + LangString = "go" ) const ( @@ -116,6 +118,8 @@ var ( ErrTokenAlreadySet = errors.New("nats: token and token handler both set") ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection") ErrMsgNoReply = errors.New("nats: message does not have a reply") + ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server") + ErrDisconnected = errors.New("nats: server is disconnected") ) func init() { @@ -125,15 +129,17 @@ func init() { // GetDefaultOptions returns default configuration options for the client. func GetDefaultOptions() Options { return Options{ - AllowReconnect: true, - MaxReconnect: DefaultMaxReconnect, - ReconnectWait: DefaultReconnectWait, - Timeout: DefaultTimeout, - PingInterval: DefaultPingInterval, - MaxPingsOut: DefaultMaxPingOut, - SubChanLen: DefaultMaxChanLen, - ReconnectBufSize: DefaultReconnectBufSize, - DrainTimeout: DefaultDrainTimeout, + AllowReconnect: true, + MaxReconnect: DefaultMaxReconnect, + ReconnectWait: DefaultReconnectWait, + ReconnectJitter: DefaultReconnectJitter, + ReconnectJitterTLS: DefaultReconnectJitterTLS, + Timeout: DefaultTimeout, + PingInterval: DefaultPingInterval, + MaxPingsOut: DefaultMaxPingOut, + SubChanLen: DefaultMaxChanLen, + ReconnectBufSize: DefaultReconnectBufSize, + DrainTimeout: DefaultDrainTimeout, } } @@ -180,6 +186,12 @@ type SignatureHandler func([]byte) ([]byte, error) // AuthTokenHandler is used to generate a new token. type AuthTokenHandler func() string +// ReconnectDelayHandler is used to get from the user the desired +// delay the library should pause before attempting to reconnect +// again. Note that this is invoked after the library tried the +// whole list of URLs and failed to reconnect. +type ReconnectDelayHandler func(attempts int) time.Duration + // asyncCB is used to preserve order for async callbacks. type asyncCB struct { f func() @@ -256,6 +268,24 @@ type Options struct { // to a server that we were already connected to previously. ReconnectWait time.Duration + // CustomReconnectDelayCB is invoked after the library tried every + // URL in the server list and failed to reconnect. It passes to the + // user the current number of attempts. This function returns the + // amount of time the library will sleep before attempting to reconnect + // again. It is strongly recommended that this value contains some + // jitter to prevent all connections to attempt reconnecting at the same time. + CustomReconnectDelayCB ReconnectDelayHandler + + // ReconnectJitter sets the upper bound for a random delay added to + // ReconnectWait during a reconnect when no TLS is used. + // Note that any jitter is capped with ReconnectJitterMax. + ReconnectJitter time.Duration + + // ReconnectJitterTLS sets the upper bound for a random delay added to + // ReconnectWait during a reconnect when TLS is used. + // Note that any jitter is capped with ReconnectJitterMax. + ReconnectJitterTLS time.Duration + // Timeout sets the timeout for a Dial operation on a connection. Timeout time.Duration @@ -409,6 +439,7 @@ type Conn struct { ptmr *time.Timer pout int ar bool // abort reconnect + rqch chan struct{} // New style response handler respSub string // The wildcard subject @@ -486,17 +517,16 @@ type Statistics struct { // Tracks individual backend servers. type srv struct { - url *url.URL - didConnect bool - reconnects int - lastAttempt time.Time - lastErr error - isImplicit bool - tlsName string + url *url.URL + didConnect bool + reconnects int + lastErr error + isImplicit bool + tlsName string } type serverInfo struct { - Id string `json:"server_id"` + ID string `json:"server_id"` Host string `json:"host"` Port uint `json:"port"` Version string `json:"version"` @@ -506,6 +536,7 @@ type serverInfo struct { ConnectURLs []string `json:"connect_urls,omitempty"` Proto int `json:"proto,omitempty"` CID uint64 `json:"client_id,omitempty"` + ClientIP string `json:"client_ip,omitempty"` Nonce string `json:"nonce,omitempty"` } @@ -669,6 +700,24 @@ func MaxReconnects(max int) Option { } } +// ReconnectJitter is an Option to set the upper bound of a random delay added ReconnectWait. +func ReconnectJitter(jitter, jitterForTLS time.Duration) Option { + return func(o *Options) error { + o.ReconnectJitter = jitter + o.ReconnectJitterTLS = jitterForTLS + return nil + } +} + +// CustomReconnectDelay is an Option to set the CustomReconnectDelayCB option. +// See CustomReconnectDelayCB Option for more details. +func CustomReconnectDelay(cb ReconnectDelayHandler) Option { + return func(o *Options) error { + o.CustomReconnectDelayCB = cb + return nil + } +} + // PingInterval is an Option to set the period for client ping commands. func PingInterval(t time.Duration) Option { return func(o *Options) error { @@ -1123,7 +1172,7 @@ func (nc *Conn) setupServerPool() error { // Randomize if allowed to if !nc.Opts.NoRandomize { - nc.shufflePool() + nc.shufflePool(0) } // Normally, if this one is set, Options.Servers should not be, @@ -1220,14 +1269,16 @@ func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error { } // shufflePool swaps randomly elements in the server pool -func (nc *Conn) shufflePool() { - if len(nc.srvPool) <= 1 { +// The `offset` value indicates that the shuffling should start at +// this offset and leave the elements from [0..offset) intact. +func (nc *Conn) shufflePool(offset int) { + if len(nc.srvPool) <= offset+1 { return } source := rand.NewSource(time.Now().UnixNano()) r := rand.New(source) - for i := range nc.srvPool { - j := r.Intn(i + 1) + for i := offset; i < len(nc.srvPool); i++ { + j := offset + r.Intn(i+1-offset) nc.srvPool[i], nc.srvPool[j] = nc.srvPool[j], nc.srvPool[i] } } @@ -1249,8 +1300,6 @@ func (nc *Conn) createConn() (err error) { } if _, cur := nc.currentServer(); cur == nil { return ErrNoServers - } else { - cur.lastAttempt = time.Now() } // We will auto-expand host names if they resolve to multiple IPs @@ -1384,7 +1433,7 @@ func (nc *Conn) ConnectedServerId() string { if nc.status != CONNECTED { return _EMPTY_ } - return nc.info.Id + return nc.info.ID } // Low level setup for structs, etc @@ -1393,6 +1442,7 @@ func (nc *Conn) setup() { nc.pongs = make([]chan struct{}, 0, 8) nc.fch = make(chan struct{}, flushChanSize) + nc.rqch = make(chan struct{}) // Setup scratch outbound buffer for PUB pub := nc.scratch[:len(_PUB_P_)] @@ -1450,6 +1500,7 @@ func (nc *Conn) connect() error { // For first connect we walk all servers in the pool and try // to connect immediately. nc.mu.Lock() + defer nc.mu.Unlock() nc.initc = true // The pool may change inside the loop iteration due to INFO protocol. for i := 0; i < len(nc.srvPool); i++ { @@ -1463,8 +1514,8 @@ func (nc *Conn) connect() error { err = nc.processConnectInit() if err == nil { - nc.srvPool[i].didConnect = true - nc.srvPool[i].reconnects = 0 + nc.current.didConnect = true + nc.current.reconnects = 0 nc.current.lastErr = nil returnedErr = nil break @@ -1487,7 +1538,7 @@ func (nc *Conn) connect() error { if returnedErr == nil && nc.status != CONNECTED { returnedErr = ErrNoServers } - nc.mu.Unlock() + return returnedErr } @@ -1814,33 +1865,63 @@ func (nc *Conn) doReconnect(err error) { // This is used to wait on go routines exit if we start them in the loop // but an error occurs after that. waitForGoRoutines := false + var rt *time.Timer + // Channel used to kick routine out of sleep when conn is closed. + rqch := nc.rqch + // Counter that is increased when the whole list of servers has been tried. + var wlf int - for len(nc.srvPool) > 0 { + var jitter time.Duration + var rw time.Duration + // If a custom reconnect delay handler is set, this takes precedence. + crd := nc.Opts.CustomReconnectDelayCB + if crd == nil { + rw = nc.Opts.ReconnectWait + // TODO: since we sleep only after the whole list has been tried, we can't + // rely on individual *srv to know if it is a TLS or non-TLS url. + // We have to pick which type of jitter to use, for now, we use these hints: + jitter = nc.Opts.ReconnectJitter + if nc.Opts.Secure || nc.Opts.TLSConfig != nil { + jitter = nc.Opts.ReconnectJitterTLS + } + } + + for i := 0; len(nc.srvPool) > 0; { cur, err := nc.selectNextServer() if err != nil { nc.err = err break } - sleepTime := int64(0) - - // Sleep appropriate amount of time before the - // connection attempt if connecting to same server - // we just got disconnected from.. - if time.Since(cur.lastAttempt) < nc.Opts.ReconnectWait { - sleepTime = int64(nc.Opts.ReconnectWait - time.Since(cur.lastAttempt)) - } - - // On Windows, createConn() will take more than a second when no - // server is running at that address. So it could be that the - // time elapsed between reconnect attempts is always > than - // the set option. Release the lock to give a chance to a parallel - // nc.Close() to break the loop. + doSleep := i+1 >= len(nc.srvPool) nc.mu.Unlock() - if sleepTime <= 0 { + + if !doSleep { + i++ + // Release the lock to give a chance to a concurrent nc.Close() to break the loop. runtime.Gosched() } else { - time.Sleep(time.Duration(sleepTime)) + i = 0 + var st time.Duration + if crd != nil { + wlf++ + st = crd(wlf) + } else { + st = rw + if jitter > 0 { + st += time.Duration(rand.Int63n(int64(jitter))) + } + } + if rt == nil { + rt = time.NewTimer(st) + } else { + rt.Reset(st) + } + select { + case <-rqch: + rt.Stop() + case <-rt.C: + } } // If the readLoop, etc.. go routines were started, wait for them to complete. if waitForGoRoutines { @@ -2427,8 +2508,14 @@ func (nc *Conn) processInfo(info string) error { } nc.addURLToPool(fmt.Sprintf("%s://%s", nc.connScheme(), curl), true, saveTLS) } - if hasNew && !nc.initc && nc.Opts.DiscoveredServersCB != nil { - nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) }) + if hasNew { + // Randomize the pool if allowed but leave the first URL in place. + if !nc.Opts.NoRandomize { + nc.shufflePool(1) + } + if !nc.initc && nc.Opts.DiscoveredServersCB != nil { + nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) }) + } } return nil @@ -2748,7 +2835,7 @@ func (nc *Conn) oldRequest(subj string, data []byte, timeout time.Duration) (*Ms inbox := NewInbox() ch := make(chan *Msg, RequestChanLen) - s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, false) + s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true) if err != nil { return nil, err } @@ -3541,10 +3628,25 @@ func (nc *Conn) FlushTimeout(timeout time.Duration) (err error) { return } +// RTT calculates the round trip time between this client and the server. +func (nc *Conn) RTT() (time.Duration, error) { + if nc.IsClosed() { + return 0, ErrConnectionClosed + } + if nc.IsReconnecting() { + return 0, ErrDisconnected + } + start := time.Now() + if err := nc.FlushTimeout(10 * time.Second); err != nil { + return 0, err + } + return time.Since(start), nil +} + // Flush will perform a round trip to the server and return when it // receives the internal reply. func (nc *Conn) Flush() error { - return nc.FlushTimeout(60 * time.Second) + return nc.FlushTimeout(10 * time.Second) } // Buffered will return the number of bytes buffered to be sent to the server. @@ -3636,9 +3738,13 @@ func (nc *Conn) close(status Status, doCBs bool, err error) { // Kick the Go routines so they fall out. nc.kickFlusher() - nc.mu.Unlock() - nc.mu.Lock() + // If the reconnect timer is waiting between a reconnect attempt, + // this will kick it out. + if nc.rqch != nil { + close(nc.rqch) + nc.rqch = nil + } // Clear any queued pongs, e.g. pending flush calls. nc.clearPendingFlushCalls() @@ -4005,10 +4111,25 @@ func (nc *Conn) Barrier(f func()) error { return nil } +// GetClientIP returns the client IP as known by the server. +// Supported as of server version 2.1.6. +func (nc *Conn) GetClientIP() (net.IP, error) { + nc.mu.RLock() + defer nc.mu.RUnlock() + if nc.isClosed() { + return nil, ErrConnectionClosed + } + if nc.info.ClientIP == "" { + return nil, ErrClientIPNotSupported + } + ip := net.ParseIP(nc.info.ClientIP) + return ip, nil +} + // GetClientID returns the client ID assigned by the server to which // the client is currently connected to. Note that the value may change if // the client reconnects. -// This function returns ErrNoClientIDReturned if the server is of a +// This function returns ErrClientIDNotSupported if the server is of a // version prior to 1.2.0. func (nc *Conn) GetClientID() (uint64, error) { nc.mu.RLock() diff --git a/vendor/github.com/nats-io/nats.go/parser.go b/vendor/github.com/nats-io/nats.go/parser.go index a4b3ea0e..630142a3 100644 --- a/vendor/github.com/nats-io/nats.go/parser.go +++ b/vendor/github.com/nats-io/nats.go/parser.go @@ -24,7 +24,7 @@ type msgArg struct { size int } -const MAX_CONTROL_LINE_SIZE = 1024 +const MAX_CONTROL_LINE_SIZE = 4096 type parseState struct { state int diff --git a/vendor/modules.txt b/vendor/modules.txt index 8cf331f6..249024a0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,19 +1,27 @@ +# github.com/golang/protobuf v1.3.5 +## explicit # github.com/nats-io/jwt v0.3.2 +## explicit github.com/nats-io/jwt -# github.com/nats-io/nats.go v1.9.2 +# github.com/nats-io/nats.go v1.10.0 +## explicit github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin github.com/nats-io/nats.go/util # github.com/nats-io/nkeys v0.1.4 +## explicit github.com/nats-io/nkeys # github.com/nats-io/nuid v1.0.1 +## explicit github.com/nats-io/nuid # golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 +## explicit golang.org/x/crypto/bcrypt golang.org/x/crypto/blowfish golang.org/x/crypto/ed25519 golang.org/x/crypto/ed25519/internal/edwards25519 # golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e +## explicit golang.org/x/sys/windows golang.org/x/sys/windows/registry golang.org/x/sys/windows/svc