[ADDED] Ability to configure number of connect retries for implicit routes

When a server is told to connect to a server (with auto-discovery),
it tries to connect once. There have been a report where that
connection fails, but would probably succeed if tried again (#408).
This new parameter allows to configure the number of times a failed
implicit connect should be tried.

Resolves #408
This commit is contained in:
Ivan Kozlovic
2016-12-20 18:37:23 -07:00
parent 5dcad241bc
commit a8dfaeae3d
5 changed files with 83 additions and 3 deletions

View File

@@ -50,6 +50,7 @@ Cluster Options:
--routes <rurl-1, rurl-2> Routes to solicit and connect
--cluster <cluster-url> Cluster URL for solicited routes
--no_advertise <bool> Advertise known cluster IPs to clients
--conn_retries <number> For implicit routes, number of connect retries
Common Options:
@@ -110,6 +111,7 @@ func main() {
flag.StringVar(&opts.Cluster.ListenStr, "cluster", "", "Cluster url from which members can solicit routes.")
flag.StringVar(&opts.Cluster.ListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.")
flag.BoolVar(&opts.Cluster.NoAdvertise, "no_advertise", false, "Advertise known cluster IPs to clients.")
flag.IntVar(&opts.Cluster.ConnRetries, "conn_retries", 0, "For implicit routes, number of connect retries")
flag.BoolVar(&showTLSHelp, "help_tls", false, "TLS help.")
flag.BoolVar(&opts.TLS, "tls", false, "Enable TLS.")
flag.BoolVar(&opts.TLSVerify, "tlsverify", false, "Enable TLS with client verification.")

View File

@@ -42,6 +42,7 @@ type ClusterOpts struct {
TLSConfig *tls.Config `json:"-"`
ListenStr string `json:"-"`
NoAdvertise bool `json:"-"`
ConnRetries int `json:"-"`
}
// Options block for gnatsd server.
@@ -314,6 +315,8 @@ func parseCluster(cm map[string]interface{}, opts *Options) error {
opts.Cluster.TLSTimeout = tc.Timeout
case "no_advertise":
opts.Cluster.NoAdvertise = mv.(bool)
case "conn_retries":
opts.Cluster.ConnRetries = int(mv.(int64))
}
}
return nil
@@ -647,6 +650,9 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options {
if flagOpts.Cluster.NoAdvertise {
opts.Cluster.NoAdvertise = true
}
if flagOpts.Cluster.ConnRetries != 0 {
opts.Cluster.ConnRetries = flagOpts.Cluster.ConnRetries
}
if flagOpts.RoutesStr != "" {
mergeRoutes(&opts, flagOpts)
}

View File

@@ -192,6 +192,7 @@ func TestMergeOverrides(t *testing.T) {
MaxPingsOut: 3,
Cluster: ClusterOpts{
NoAdvertise: true,
ConnRetries: 2,
},
}
fopts, err := ProcessConfigFile("./configs/test.conf")
@@ -208,6 +209,7 @@ func TestMergeOverrides(t *testing.T) {
ProfPort: 6789,
Cluster: ClusterOpts{
NoAdvertise: true,
ConnRetries: 2,
},
}
merged := MergeOptions(fopts, opts)

View File

@@ -689,18 +689,25 @@ func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType) {
func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) {
defer s.grWG.Done()
attempts := 0
for s.isRunning() && rURL != nil {
Debugf("Trying to connect to route on %s", rURL.Host)
conn, err := net.DialTimeout("tcp", rURL.Host, DEFAULT_ROUTE_DIAL)
if err != nil {
Debugf("Error trying to connect to route: %v", err)
if !tryForEver {
if s.opts.Cluster.ConnRetries <= 0 {
return
}
attempts++
if attempts > s.opts.Cluster.ConnRetries {
return
}
}
select {
case <-s.rcQuit:
return
case <-time.After(DEFAULT_ROUTE_CONNECT):
if !tryForEver {
return
}
continue
}
}

View File

@@ -3,6 +3,7 @@
package test
import (
"bufio"
"encoding/json"
"fmt"
"io/ioutil"
@@ -638,3 +639,65 @@ func TestSeedReturnIPInInfo(t *testing.T) {
t.Fatalf("Expected IP %s, got %s", s1, s2)
}
}
func TestImplicitRouteRetry(t *testing.T) {
srvSeed, optsSeed := runSeedServer(t)
defer srvSeed.Shutdown()
optsA := nextServerOpts(optsSeed)
optsA.Routes = server.RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port))
optsA.Cluster.ConnRetries = 5
srvA := RunServer(optsA)
defer srvA.Shutdown()
optsB := nextServerOpts(optsA)
rcb := createRouteConn(t, optsSeed.Cluster.Host, optsSeed.Cluster.Port)
defer rcb.Close()
rcbID := "ServerB"
routeBSend, routeBExpect := setupRouteEx(t, rcb, optsB, rcbID)
routeBExpect(infoRe)
// register ourselves via INFO
rbInfo := server.Info{ID: rcbID, Host: optsB.Cluster.Host, Port: optsB.Cluster.Port}
b, _ := json.Marshal(rbInfo)
infoJSON := fmt.Sprintf(server.InfoProto, b)
routeBSend(infoJSON)
routeBSend("PING\r\n")
routeBExpect(pongRe)
// srvA should try to connect. Wait to make sure that it fails.
time.Sleep(1200 * time.Millisecond)
// Setup a fake route listen for routeB
rbListen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", optsB.Cluster.Host, optsB.Cluster.Port))
if err != nil {
t.Fatalf("Error during listen: %v", err)
}
c, err := rbListen.Accept()
if err != nil {
t.Fatalf("Error during accept: %v", err)
}
defer c.Close()
br := bufio.NewReaderSize(c, 32768)
// Consume CONNECT and INFO
for i := 0; i < 2; i++ {
c.SetReadDeadline(time.Now().Add(2 * time.Second))
buf, _, err := br.ReadLine()
c.SetReadDeadline(time.Time{})
if err != nil {
t.Fatalf("Error reading: %v", err)
}
if i == 0 {
continue
}
buf = buf[len("INFO "):]
info := &server.Info{}
if err := json.Unmarshal(buf, info); err != nil {
t.Fatalf("Error during unmarshal: %v", err)
}
// Check INFO is from server A.
if info.ID != srvA.ID() {
t.Fatalf("Expected CONNECT from %v, got CONNECT from %v", srvA.ID(), info.ID)
}
}
}