diff --git a/main.go b/main.go index e56318c7..2165c1dd 100644 --- a/main.go +++ b/main.go @@ -50,6 +50,7 @@ Cluster Options: --routes Routes to solicit and connect --cluster Cluster URL for solicited routes --no_advertise Advertise known cluster IPs to clients + --conn_retries For implicit routes, number of connect retries Common Options: @@ -110,6 +111,7 @@ func main() { flag.StringVar(&opts.Cluster.ListenStr, "cluster", "", "Cluster url from which members can solicit routes.") flag.StringVar(&opts.Cluster.ListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.") flag.BoolVar(&opts.Cluster.NoAdvertise, "no_advertise", false, "Advertise known cluster IPs to clients.") + flag.IntVar(&opts.Cluster.ConnRetries, "conn_retries", 0, "For implicit routes, number of connect retries") flag.BoolVar(&showTLSHelp, "help_tls", false, "TLS help.") flag.BoolVar(&opts.TLS, "tls", false, "Enable TLS.") flag.BoolVar(&opts.TLSVerify, "tlsverify", false, "Enable TLS with client verification.") diff --git a/server/opts.go b/server/opts.go index 455d496e..f012a84d 100644 --- a/server/opts.go +++ b/server/opts.go @@ -42,6 +42,7 @@ type ClusterOpts struct { TLSConfig *tls.Config `json:"-"` ListenStr string `json:"-"` NoAdvertise bool `json:"-"` + ConnRetries int `json:"-"` } // Options block for gnatsd server. @@ -314,6 +315,8 @@ func parseCluster(cm map[string]interface{}, opts *Options) error { opts.Cluster.TLSTimeout = tc.Timeout case "no_advertise": opts.Cluster.NoAdvertise = mv.(bool) + case "conn_retries": + opts.Cluster.ConnRetries = int(mv.(int64)) } } return nil @@ -647,6 +650,9 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options { if flagOpts.Cluster.NoAdvertise { opts.Cluster.NoAdvertise = true } + if flagOpts.Cluster.ConnRetries != 0 { + opts.Cluster.ConnRetries = flagOpts.Cluster.ConnRetries + } if flagOpts.RoutesStr != "" { mergeRoutes(&opts, flagOpts) } diff --git a/server/opts_test.go b/server/opts_test.go index 54ac3c2a..508f3a55 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -192,6 +192,7 @@ func TestMergeOverrides(t *testing.T) { MaxPingsOut: 3, Cluster: ClusterOpts{ NoAdvertise: true, + ConnRetries: 2, }, } fopts, err := ProcessConfigFile("./configs/test.conf") @@ -208,6 +209,7 @@ func TestMergeOverrides(t *testing.T) { ProfPort: 6789, Cluster: ClusterOpts{ NoAdvertise: true, + ConnRetries: 2, }, } merged := MergeOptions(fopts, opts) diff --git a/server/route.go b/server/route.go index f9e04343..b55ffe0b 100644 --- a/server/route.go +++ b/server/route.go @@ -689,18 +689,25 @@ func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType) { func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) { defer s.grWG.Done() + attempts := 0 for s.isRunning() && rURL != nil { Debugf("Trying to connect to route on %s", rURL.Host) conn, err := net.DialTimeout("tcp", rURL.Host, DEFAULT_ROUTE_DIAL) if err != nil { Debugf("Error trying to connect to route: %v", err) + if !tryForEver { + if s.opts.Cluster.ConnRetries <= 0 { + return + } + attempts++ + if attempts > s.opts.Cluster.ConnRetries { + return + } + } select { case <-s.rcQuit: return case <-time.After(DEFAULT_ROUTE_CONNECT): - if !tryForEver { - return - } continue } } diff --git a/test/route_discovery_test.go b/test/route_discovery_test.go index 64b4434d..8197a638 100644 --- a/test/route_discovery_test.go +++ b/test/route_discovery_test.go @@ -3,6 +3,7 @@ package test import ( + "bufio" "encoding/json" "fmt" "io/ioutil" @@ -638,3 +639,65 @@ func TestSeedReturnIPInInfo(t *testing.T) { t.Fatalf("Expected IP %s, got %s", s1, s2) } } + +func TestImplicitRouteRetry(t *testing.T) { + srvSeed, optsSeed := runSeedServer(t) + defer srvSeed.Shutdown() + + optsA := nextServerOpts(optsSeed) + optsA.Routes = server.RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port)) + optsA.Cluster.ConnRetries = 5 + srvA := RunServer(optsA) + defer srvA.Shutdown() + + optsB := nextServerOpts(optsA) + rcb := createRouteConn(t, optsSeed.Cluster.Host, optsSeed.Cluster.Port) + defer rcb.Close() + rcbID := "ServerB" + routeBSend, routeBExpect := setupRouteEx(t, rcb, optsB, rcbID) + routeBExpect(infoRe) + // register ourselves via INFO + rbInfo := server.Info{ID: rcbID, Host: optsB.Cluster.Host, Port: optsB.Cluster.Port} + b, _ := json.Marshal(rbInfo) + infoJSON := fmt.Sprintf(server.InfoProto, b) + routeBSend(infoJSON) + routeBSend("PING\r\n") + routeBExpect(pongRe) + + // srvA should try to connect. Wait to make sure that it fails. + time.Sleep(1200 * time.Millisecond) + + // Setup a fake route listen for routeB + rbListen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", optsB.Cluster.Host, optsB.Cluster.Port)) + if err != nil { + t.Fatalf("Error during listen: %v", err) + } + c, err := rbListen.Accept() + if err != nil { + t.Fatalf("Error during accept: %v", err) + } + defer c.Close() + + br := bufio.NewReaderSize(c, 32768) + // Consume CONNECT and INFO + for i := 0; i < 2; i++ { + c.SetReadDeadline(time.Now().Add(2 * time.Second)) + buf, _, err := br.ReadLine() + c.SetReadDeadline(time.Time{}) + if err != nil { + t.Fatalf("Error reading: %v", err) + } + if i == 0 { + continue + } + buf = buf[len("INFO "):] + info := &server.Info{} + if err := json.Unmarshal(buf, info); err != nil { + t.Fatalf("Error during unmarshal: %v", err) + } + // Check INFO is from server A. + if info.ID != srvA.ID() { + t.Fatalf("Expected CONNECT from %v, got CONNECT from %v", srvA.ID(), info.ID) + } + } +}