mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
@@ -35,7 +35,7 @@ builds:
|
||||
goarch: 386
|
||||
|
||||
nfpms:
|
||||
- name_template: '{{.ProjectName}}-{{.Tag}}-{{.Arch}}{{if .Arm}}{{.Arm}}{{end}}'
|
||||
- file_name_template: '{{.ProjectName}}-{{.Tag}}-{{.Arch}}{{if .Arm}}{{.Arm}}{{end}}'
|
||||
homepage: https://nats.io
|
||||
description: High-Performance server for NATS, the cloud native messaging system.
|
||||
maintainer: Ivan Kozlovic <ivan@synadia.com>
|
||||
|
||||
@@ -31,9 +31,8 @@ script:
|
||||
|
||||
deploy:
|
||||
provider: script
|
||||
skip_cleanup: false
|
||||
cleanup: true
|
||||
script: curl -sL http://git.io/goreleaser | bash
|
||||
verbose: true
|
||||
on:
|
||||
tags: true
|
||||
condition: $TRAVIS_GO_VERSION =~ 1.13
|
||||
|
||||
@@ -29,8 +29,8 @@ If you are interested in contributing to NATS, read about our...
|
||||
[Fossa-Image]: https://app.fossa.io/api/projects/git%2Bgithub.com%2Fnats-io%2Fgnatsd.svg?type=shield
|
||||
[Build-Status-Url]: https://travis-ci.org/nats-io/nats-server
|
||||
[Build-Status-Image]: https://travis-ci.org/nats-io/nats-server.svg?branch=master
|
||||
[Release-Url]: https://github.com/nats-io/nats-server/releases/tag/v2.1.6
|
||||
[Release-image]: https://img.shields.io/badge/release-v2.1.6-1eb0fc.svg
|
||||
[Release-Url]: https://github.com/nats-io/nats-server/releases/tag/v2.1.7
|
||||
[Release-image]: https://img.shields.io/badge/release-v2.1.7-1eb0fc.svg
|
||||
[Coverage-Url]: https://coveralls.io/r/nats-io/nats-server?branch=master
|
||||
[Coverage-image]: https://coveralls.io/repos/github/nats-io/nats-server/badge.svg?branch=master
|
||||
[ReportCard-Url]: https://goreportcard.com/report/nats-io/nats-server
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM golang:1.13.9-alpine3.11 AS builder
|
||||
FROM golang:1.13.10-alpine3.11 AS builder
|
||||
|
||||
WORKDIR $GOPATH/src/github.com/nats-io/nats-server
|
||||
|
||||
|
||||
6
go.mod
6
go.mod
@@ -1,13 +1,11 @@
|
||||
module github.com/nats-io/nats-server/v2
|
||||
|
||||
require (
|
||||
github.com/golang/protobuf v1.3.5 // indirect
|
||||
github.com/nats-io/jwt v0.3.2
|
||||
github.com/nats-io/nats.go v1.9.2
|
||||
github.com/nats-io/nats.go v1.10.0
|
||||
github.com/nats-io/nkeys v0.1.4
|
||||
github.com/nats-io/nuid v1.0.1
|
||||
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59
|
||||
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e
|
||||
google.golang.org/protobuf v1.22.0 // indirect
|
||||
)
|
||||
|
||||
go 1.13
|
||||
|
||||
25
go.sum
25
go.sum
@@ -1,9 +1,17 @@
|
||||
github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls=
|
||||
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
|
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
|
||||
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
|
||||
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
|
||||
github.com/golang/protobuf v1.4.0 h1:oOuy+ugB+P/kBdUnG5QaMXSIyJ1q38wWSojYCb3z5VQ=
|
||||
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
|
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
|
||||
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
|
||||
github.com/nats-io/nats.go v1.9.2 h1:oDeERm3NcZVrPpdR/JpGdWHMv3oJ8yY30YwxKq+DU2s=
|
||||
github.com/nats-io/nats.go v1.9.2/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
|
||||
github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY=
|
||||
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
|
||||
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
|
||||
github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA=
|
||||
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
|
||||
@@ -19,3 +27,12 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e h1:D5TXcfTk7xF7hvieo4QErS3qqCB4teTffacDWr7CI+0=
|
||||
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
|
||||
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
|
||||
google.golang.org/protobuf v1.22.0 h1:cJv5/xdbk1NnMPR1VP9+HU6gupuG9MLBoH1r6RHZ2MY=
|
||||
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
|
||||
@@ -40,7 +40,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.1.6"
|
||||
VERSION = "2.1.7"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
5
vendor/github.com/nats-io/nats.go/.travis.yml
generated
vendored
5
vendor/github.com/nats-io/nats.go/.travis.yml
generated
vendored
@@ -1,8 +1,7 @@
|
||||
language: go
|
||||
sudo: false
|
||||
go:
|
||||
- 1.14.x
|
||||
- 1.13.x
|
||||
- 1.12.x
|
||||
env:
|
||||
- GO111MODULE=off
|
||||
go_import_path: github.com/nats-io/nats.go
|
||||
@@ -21,4 +20,4 @@ before_script:
|
||||
script:
|
||||
- go test -i -race ./...
|
||||
- go test -v -run=TestNoRace -p=1 ./...
|
||||
- if [[ "$TRAVIS_GO_VERSION" =~ 1.13 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi
|
||||
- if [[ "$TRAVIS_GO_VERSION" =~ 1.14 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi
|
||||
|
||||
14
vendor/github.com/nats-io/nats.go/README.md
generated
vendored
14
vendor/github.com/nats-io/nats.go/README.md
generated
vendored
@@ -20,7 +20,7 @@ When using or transitioning to Go modules support:
|
||||
```bash
|
||||
# Go client latest or explicit version
|
||||
go get github.com/nats-io/nats.go/@latest
|
||||
go get github.com/nats-io/nats.go/@v1.9.2
|
||||
go get github.com/nats-io/nats.go/@v1.10.0
|
||||
|
||||
# For latest NATS Server, add /v2 at the end
|
||||
go get github.com/nats-io/nats-server/v2
|
||||
@@ -325,6 +325,18 @@ nc, err := nats.Connect(servers)
|
||||
// This example means 10 seconds total per backend.
|
||||
nc, err = nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2 * time.Second))
|
||||
|
||||
// You can also add some jitter for the reconnection.
|
||||
// This call will add up to 500 milliseconds for non TLS connections and 2 seconds for TLS connections.
|
||||
// If not specified, the library defaults to 100 milliseconds and 1 second, respectively.
|
||||
nc, err = nats.Connect(servers, nats.ReconnectJitter(500*time.Millisecond, 2*time.Second))
|
||||
|
||||
// You can also specify a custom reconnect delay handler. If set, the library will invoke it when it has tried
|
||||
// all URLs in its list. The value returned will be used as the total sleep time, so add your own jitter.
|
||||
// The library will pass the number of times it went through the whole list.
|
||||
nc, err = nats.Connect(servers, nats.CustomReconnectDelay(func(attempts int) time.Duration {
|
||||
return someBackoffFunction(attempts)
|
||||
}))
|
||||
|
||||
// Optionally disable randomization of the server pool
|
||||
nc, err = nats.Connect(servers, nats.DontRandomize())
|
||||
|
||||
|
||||
253
vendor/github.com/nats-io/nats.go/nats.go
generated
vendored
253
vendor/github.com/nats-io/nats.go/nats.go
generated
vendored
@@ -1,4 +1,4 @@
|
||||
// Copyright 2012-2019 The NATS Authors
|
||||
// Copyright 2012-2020 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -45,19 +45,21 @@ import (
|
||||
|
||||
// Default Constants
|
||||
const (
|
||||
Version = "1.9.2"
|
||||
DefaultURL = "nats://127.0.0.1:4222"
|
||||
DefaultPort = 4222
|
||||
DefaultMaxReconnect = 60
|
||||
DefaultReconnectWait = 2 * time.Second
|
||||
DefaultTimeout = 2 * time.Second
|
||||
DefaultPingInterval = 2 * time.Minute
|
||||
DefaultMaxPingOut = 2
|
||||
DefaultMaxChanLen = 8192 // 8k
|
||||
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
|
||||
RequestChanLen = 8
|
||||
DefaultDrainTimeout = 30 * time.Second
|
||||
LangString = "go"
|
||||
Version = "1.10.0"
|
||||
DefaultURL = "nats://127.0.0.1:4222"
|
||||
DefaultPort = 4222
|
||||
DefaultMaxReconnect = 60
|
||||
DefaultReconnectWait = 2 * time.Second
|
||||
DefaultReconnectJitter = 100 * time.Millisecond
|
||||
DefaultReconnectJitterTLS = time.Second
|
||||
DefaultTimeout = 2 * time.Second
|
||||
DefaultPingInterval = 2 * time.Minute
|
||||
DefaultMaxPingOut = 2
|
||||
DefaultMaxChanLen = 8192 // 8k
|
||||
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
|
||||
RequestChanLen = 8
|
||||
DefaultDrainTimeout = 30 * time.Second
|
||||
LangString = "go"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -116,6 +118,8 @@ var (
|
||||
ErrTokenAlreadySet = errors.New("nats: token and token handler both set")
|
||||
ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection")
|
||||
ErrMsgNoReply = errors.New("nats: message does not have a reply")
|
||||
ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server")
|
||||
ErrDisconnected = errors.New("nats: server is disconnected")
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -125,15 +129,17 @@ func init() {
|
||||
// GetDefaultOptions returns default configuration options for the client.
|
||||
func GetDefaultOptions() Options {
|
||||
return Options{
|
||||
AllowReconnect: true,
|
||||
MaxReconnect: DefaultMaxReconnect,
|
||||
ReconnectWait: DefaultReconnectWait,
|
||||
Timeout: DefaultTimeout,
|
||||
PingInterval: DefaultPingInterval,
|
||||
MaxPingsOut: DefaultMaxPingOut,
|
||||
SubChanLen: DefaultMaxChanLen,
|
||||
ReconnectBufSize: DefaultReconnectBufSize,
|
||||
DrainTimeout: DefaultDrainTimeout,
|
||||
AllowReconnect: true,
|
||||
MaxReconnect: DefaultMaxReconnect,
|
||||
ReconnectWait: DefaultReconnectWait,
|
||||
ReconnectJitter: DefaultReconnectJitter,
|
||||
ReconnectJitterTLS: DefaultReconnectJitterTLS,
|
||||
Timeout: DefaultTimeout,
|
||||
PingInterval: DefaultPingInterval,
|
||||
MaxPingsOut: DefaultMaxPingOut,
|
||||
SubChanLen: DefaultMaxChanLen,
|
||||
ReconnectBufSize: DefaultReconnectBufSize,
|
||||
DrainTimeout: DefaultDrainTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -180,6 +186,12 @@ type SignatureHandler func([]byte) ([]byte, error)
|
||||
// AuthTokenHandler is used to generate a new token.
|
||||
type AuthTokenHandler func() string
|
||||
|
||||
// ReconnectDelayHandler is used to get from the user the desired
|
||||
// delay the library should pause before attempting to reconnect
|
||||
// again. Note that this is invoked after the library tried the
|
||||
// whole list of URLs and failed to reconnect.
|
||||
type ReconnectDelayHandler func(attempts int) time.Duration
|
||||
|
||||
// asyncCB is used to preserve order for async callbacks.
|
||||
type asyncCB struct {
|
||||
f func()
|
||||
@@ -256,6 +268,24 @@ type Options struct {
|
||||
// to a server that we were already connected to previously.
|
||||
ReconnectWait time.Duration
|
||||
|
||||
// CustomReconnectDelayCB is invoked after the library tried every
|
||||
// URL in the server list and failed to reconnect. It passes to the
|
||||
// user the current number of attempts. This function returns the
|
||||
// amount of time the library will sleep before attempting to reconnect
|
||||
// again. It is strongly recommended that this value contains some
|
||||
// jitter to prevent all connections to attempt reconnecting at the same time.
|
||||
CustomReconnectDelayCB ReconnectDelayHandler
|
||||
|
||||
// ReconnectJitter sets the upper bound for a random delay added to
|
||||
// ReconnectWait during a reconnect when no TLS is used.
|
||||
// Note that any jitter is capped with ReconnectJitterMax.
|
||||
ReconnectJitter time.Duration
|
||||
|
||||
// ReconnectJitterTLS sets the upper bound for a random delay added to
|
||||
// ReconnectWait during a reconnect when TLS is used.
|
||||
// Note that any jitter is capped with ReconnectJitterMax.
|
||||
ReconnectJitterTLS time.Duration
|
||||
|
||||
// Timeout sets the timeout for a Dial operation on a connection.
|
||||
Timeout time.Duration
|
||||
|
||||
@@ -409,6 +439,7 @@ type Conn struct {
|
||||
ptmr *time.Timer
|
||||
pout int
|
||||
ar bool // abort reconnect
|
||||
rqch chan struct{}
|
||||
|
||||
// New style response handler
|
||||
respSub string // The wildcard subject
|
||||
@@ -486,17 +517,16 @@ type Statistics struct {
|
||||
|
||||
// Tracks individual backend servers.
|
||||
type srv struct {
|
||||
url *url.URL
|
||||
didConnect bool
|
||||
reconnects int
|
||||
lastAttempt time.Time
|
||||
lastErr error
|
||||
isImplicit bool
|
||||
tlsName string
|
||||
url *url.URL
|
||||
didConnect bool
|
||||
reconnects int
|
||||
lastErr error
|
||||
isImplicit bool
|
||||
tlsName string
|
||||
}
|
||||
|
||||
type serverInfo struct {
|
||||
Id string `json:"server_id"`
|
||||
ID string `json:"server_id"`
|
||||
Host string `json:"host"`
|
||||
Port uint `json:"port"`
|
||||
Version string `json:"version"`
|
||||
@@ -506,6 +536,7 @@ type serverInfo struct {
|
||||
ConnectURLs []string `json:"connect_urls,omitempty"`
|
||||
Proto int `json:"proto,omitempty"`
|
||||
CID uint64 `json:"client_id,omitempty"`
|
||||
ClientIP string `json:"client_ip,omitempty"`
|
||||
Nonce string `json:"nonce,omitempty"`
|
||||
}
|
||||
|
||||
@@ -669,6 +700,24 @@ func MaxReconnects(max int) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// ReconnectJitter is an Option to set the upper bound of a random delay added ReconnectWait.
|
||||
func ReconnectJitter(jitter, jitterForTLS time.Duration) Option {
|
||||
return func(o *Options) error {
|
||||
o.ReconnectJitter = jitter
|
||||
o.ReconnectJitterTLS = jitterForTLS
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// CustomReconnectDelay is an Option to set the CustomReconnectDelayCB option.
|
||||
// See CustomReconnectDelayCB Option for more details.
|
||||
func CustomReconnectDelay(cb ReconnectDelayHandler) Option {
|
||||
return func(o *Options) error {
|
||||
o.CustomReconnectDelayCB = cb
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// PingInterval is an Option to set the period for client ping commands.
|
||||
func PingInterval(t time.Duration) Option {
|
||||
return func(o *Options) error {
|
||||
@@ -1123,7 +1172,7 @@ func (nc *Conn) setupServerPool() error {
|
||||
|
||||
// Randomize if allowed to
|
||||
if !nc.Opts.NoRandomize {
|
||||
nc.shufflePool()
|
||||
nc.shufflePool(0)
|
||||
}
|
||||
|
||||
// Normally, if this one is set, Options.Servers should not be,
|
||||
@@ -1220,14 +1269,16 @@ func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
|
||||
}
|
||||
|
||||
// shufflePool swaps randomly elements in the server pool
|
||||
func (nc *Conn) shufflePool() {
|
||||
if len(nc.srvPool) <= 1 {
|
||||
// The `offset` value indicates that the shuffling should start at
|
||||
// this offset and leave the elements from [0..offset) intact.
|
||||
func (nc *Conn) shufflePool(offset int) {
|
||||
if len(nc.srvPool) <= offset+1 {
|
||||
return
|
||||
}
|
||||
source := rand.NewSource(time.Now().UnixNano())
|
||||
r := rand.New(source)
|
||||
for i := range nc.srvPool {
|
||||
j := r.Intn(i + 1)
|
||||
for i := offset; i < len(nc.srvPool); i++ {
|
||||
j := offset + r.Intn(i+1-offset)
|
||||
nc.srvPool[i], nc.srvPool[j] = nc.srvPool[j], nc.srvPool[i]
|
||||
}
|
||||
}
|
||||
@@ -1249,8 +1300,6 @@ func (nc *Conn) createConn() (err error) {
|
||||
}
|
||||
if _, cur := nc.currentServer(); cur == nil {
|
||||
return ErrNoServers
|
||||
} else {
|
||||
cur.lastAttempt = time.Now()
|
||||
}
|
||||
|
||||
// We will auto-expand host names if they resolve to multiple IPs
|
||||
@@ -1384,7 +1433,7 @@ func (nc *Conn) ConnectedServerId() string {
|
||||
if nc.status != CONNECTED {
|
||||
return _EMPTY_
|
||||
}
|
||||
return nc.info.Id
|
||||
return nc.info.ID
|
||||
}
|
||||
|
||||
// Low level setup for structs, etc
|
||||
@@ -1393,6 +1442,7 @@ func (nc *Conn) setup() {
|
||||
nc.pongs = make([]chan struct{}, 0, 8)
|
||||
|
||||
nc.fch = make(chan struct{}, flushChanSize)
|
||||
nc.rqch = make(chan struct{})
|
||||
|
||||
// Setup scratch outbound buffer for PUB
|
||||
pub := nc.scratch[:len(_PUB_P_)]
|
||||
@@ -1450,6 +1500,7 @@ func (nc *Conn) connect() error {
|
||||
// For first connect we walk all servers in the pool and try
|
||||
// to connect immediately.
|
||||
nc.mu.Lock()
|
||||
defer nc.mu.Unlock()
|
||||
nc.initc = true
|
||||
// The pool may change inside the loop iteration due to INFO protocol.
|
||||
for i := 0; i < len(nc.srvPool); i++ {
|
||||
@@ -1463,8 +1514,8 @@ func (nc *Conn) connect() error {
|
||||
err = nc.processConnectInit()
|
||||
|
||||
if err == nil {
|
||||
nc.srvPool[i].didConnect = true
|
||||
nc.srvPool[i].reconnects = 0
|
||||
nc.current.didConnect = true
|
||||
nc.current.reconnects = 0
|
||||
nc.current.lastErr = nil
|
||||
returnedErr = nil
|
||||
break
|
||||
@@ -1487,7 +1538,7 @@ func (nc *Conn) connect() error {
|
||||
if returnedErr == nil && nc.status != CONNECTED {
|
||||
returnedErr = ErrNoServers
|
||||
}
|
||||
nc.mu.Unlock()
|
||||
|
||||
return returnedErr
|
||||
}
|
||||
|
||||
@@ -1814,33 +1865,63 @@ func (nc *Conn) doReconnect(err error) {
|
||||
// This is used to wait on go routines exit if we start them in the loop
|
||||
// but an error occurs after that.
|
||||
waitForGoRoutines := false
|
||||
var rt *time.Timer
|
||||
// Channel used to kick routine out of sleep when conn is closed.
|
||||
rqch := nc.rqch
|
||||
// Counter that is increased when the whole list of servers has been tried.
|
||||
var wlf int
|
||||
|
||||
for len(nc.srvPool) > 0 {
|
||||
var jitter time.Duration
|
||||
var rw time.Duration
|
||||
// If a custom reconnect delay handler is set, this takes precedence.
|
||||
crd := nc.Opts.CustomReconnectDelayCB
|
||||
if crd == nil {
|
||||
rw = nc.Opts.ReconnectWait
|
||||
// TODO: since we sleep only after the whole list has been tried, we can't
|
||||
// rely on individual *srv to know if it is a TLS or non-TLS url.
|
||||
// We have to pick which type of jitter to use, for now, we use these hints:
|
||||
jitter = nc.Opts.ReconnectJitter
|
||||
if nc.Opts.Secure || nc.Opts.TLSConfig != nil {
|
||||
jitter = nc.Opts.ReconnectJitterTLS
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; len(nc.srvPool) > 0; {
|
||||
cur, err := nc.selectNextServer()
|
||||
if err != nil {
|
||||
nc.err = err
|
||||
break
|
||||
}
|
||||
|
||||
sleepTime := int64(0)
|
||||
|
||||
// Sleep appropriate amount of time before the
|
||||
// connection attempt if connecting to same server
|
||||
// we just got disconnected from..
|
||||
if time.Since(cur.lastAttempt) < nc.Opts.ReconnectWait {
|
||||
sleepTime = int64(nc.Opts.ReconnectWait - time.Since(cur.lastAttempt))
|
||||
}
|
||||
|
||||
// On Windows, createConn() will take more than a second when no
|
||||
// server is running at that address. So it could be that the
|
||||
// time elapsed between reconnect attempts is always > than
|
||||
// the set option. Release the lock to give a chance to a parallel
|
||||
// nc.Close() to break the loop.
|
||||
doSleep := i+1 >= len(nc.srvPool)
|
||||
nc.mu.Unlock()
|
||||
if sleepTime <= 0 {
|
||||
|
||||
if !doSleep {
|
||||
i++
|
||||
// Release the lock to give a chance to a concurrent nc.Close() to break the loop.
|
||||
runtime.Gosched()
|
||||
} else {
|
||||
time.Sleep(time.Duration(sleepTime))
|
||||
i = 0
|
||||
var st time.Duration
|
||||
if crd != nil {
|
||||
wlf++
|
||||
st = crd(wlf)
|
||||
} else {
|
||||
st = rw
|
||||
if jitter > 0 {
|
||||
st += time.Duration(rand.Int63n(int64(jitter)))
|
||||
}
|
||||
}
|
||||
if rt == nil {
|
||||
rt = time.NewTimer(st)
|
||||
} else {
|
||||
rt.Reset(st)
|
||||
}
|
||||
select {
|
||||
case <-rqch:
|
||||
rt.Stop()
|
||||
case <-rt.C:
|
||||
}
|
||||
}
|
||||
// If the readLoop, etc.. go routines were started, wait for them to complete.
|
||||
if waitForGoRoutines {
|
||||
@@ -2427,8 +2508,14 @@ func (nc *Conn) processInfo(info string) error {
|
||||
}
|
||||
nc.addURLToPool(fmt.Sprintf("%s://%s", nc.connScheme(), curl), true, saveTLS)
|
||||
}
|
||||
if hasNew && !nc.initc && nc.Opts.DiscoveredServersCB != nil {
|
||||
nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) })
|
||||
if hasNew {
|
||||
// Randomize the pool if allowed but leave the first URL in place.
|
||||
if !nc.Opts.NoRandomize {
|
||||
nc.shufflePool(1)
|
||||
}
|
||||
if !nc.initc && nc.Opts.DiscoveredServersCB != nil {
|
||||
nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) })
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -2748,7 +2835,7 @@ func (nc *Conn) oldRequest(subj string, data []byte, timeout time.Duration) (*Ms
|
||||
inbox := NewInbox()
|
||||
ch := make(chan *Msg, RequestChanLen)
|
||||
|
||||
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, false)
|
||||
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -3541,10 +3628,25 @@ func (nc *Conn) FlushTimeout(timeout time.Duration) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// RTT calculates the round trip time between this client and the server.
|
||||
func (nc *Conn) RTT() (time.Duration, error) {
|
||||
if nc.IsClosed() {
|
||||
return 0, ErrConnectionClosed
|
||||
}
|
||||
if nc.IsReconnecting() {
|
||||
return 0, ErrDisconnected
|
||||
}
|
||||
start := time.Now()
|
||||
if err := nc.FlushTimeout(10 * time.Second); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return time.Since(start), nil
|
||||
}
|
||||
|
||||
// Flush will perform a round trip to the server and return when it
|
||||
// receives the internal reply.
|
||||
func (nc *Conn) Flush() error {
|
||||
return nc.FlushTimeout(60 * time.Second)
|
||||
return nc.FlushTimeout(10 * time.Second)
|
||||
}
|
||||
|
||||
// Buffered will return the number of bytes buffered to be sent to the server.
|
||||
@@ -3636,9 +3738,13 @@ func (nc *Conn) close(status Status, doCBs bool, err error) {
|
||||
|
||||
// Kick the Go routines so they fall out.
|
||||
nc.kickFlusher()
|
||||
nc.mu.Unlock()
|
||||
|
||||
nc.mu.Lock()
|
||||
// If the reconnect timer is waiting between a reconnect attempt,
|
||||
// this will kick it out.
|
||||
if nc.rqch != nil {
|
||||
close(nc.rqch)
|
||||
nc.rqch = nil
|
||||
}
|
||||
|
||||
// Clear any queued pongs, e.g. pending flush calls.
|
||||
nc.clearPendingFlushCalls()
|
||||
@@ -4005,10 +4111,25 @@ func (nc *Conn) Barrier(f func()) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetClientIP returns the client IP as known by the server.
|
||||
// Supported as of server version 2.1.6.
|
||||
func (nc *Conn) GetClientIP() (net.IP, error) {
|
||||
nc.mu.RLock()
|
||||
defer nc.mu.RUnlock()
|
||||
if nc.isClosed() {
|
||||
return nil, ErrConnectionClosed
|
||||
}
|
||||
if nc.info.ClientIP == "" {
|
||||
return nil, ErrClientIPNotSupported
|
||||
}
|
||||
ip := net.ParseIP(nc.info.ClientIP)
|
||||
return ip, nil
|
||||
}
|
||||
|
||||
// GetClientID returns the client ID assigned by the server to which
|
||||
// the client is currently connected to. Note that the value may change if
|
||||
// the client reconnects.
|
||||
// This function returns ErrNoClientIDReturned if the server is of a
|
||||
// This function returns ErrClientIDNotSupported if the server is of a
|
||||
// version prior to 1.2.0.
|
||||
func (nc *Conn) GetClientID() (uint64, error) {
|
||||
nc.mu.RLock()
|
||||
|
||||
2
vendor/github.com/nats-io/nats.go/parser.go
generated
vendored
2
vendor/github.com/nats-io/nats.go/parser.go
generated
vendored
@@ -24,7 +24,7 @@ type msgArg struct {
|
||||
size int
|
||||
}
|
||||
|
||||
const MAX_CONTROL_LINE_SIZE = 1024
|
||||
const MAX_CONTROL_LINE_SIZE = 4096
|
||||
|
||||
type parseState struct {
|
||||
state int
|
||||
|
||||
10
vendor/modules.txt
vendored
10
vendor/modules.txt
vendored
@@ -1,19 +1,27 @@
|
||||
# github.com/golang/protobuf v1.3.5
|
||||
## explicit
|
||||
# github.com/nats-io/jwt v0.3.2
|
||||
## explicit
|
||||
github.com/nats-io/jwt
|
||||
# github.com/nats-io/nats.go v1.9.2
|
||||
# github.com/nats-io/nats.go v1.10.0
|
||||
## explicit
|
||||
github.com/nats-io/nats.go
|
||||
github.com/nats-io/nats.go/encoders/builtin
|
||||
github.com/nats-io/nats.go/util
|
||||
# github.com/nats-io/nkeys v0.1.4
|
||||
## explicit
|
||||
github.com/nats-io/nkeys
|
||||
# github.com/nats-io/nuid v1.0.1
|
||||
## explicit
|
||||
github.com/nats-io/nuid
|
||||
# golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59
|
||||
## explicit
|
||||
golang.org/x/crypto/bcrypt
|
||||
golang.org/x/crypto/blowfish
|
||||
golang.org/x/crypto/ed25519
|
||||
golang.org/x/crypto/ed25519/internal/edwards25519
|
||||
# golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e
|
||||
## explicit
|
||||
golang.org/x/sys/windows
|
||||
golang.org/x/sys/windows/registry
|
||||
golang.org/x/sys/windows/svc
|
||||
|
||||
Reference in New Issue
Block a user