mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Allow flag to connect to routes in clustered mode
This commit is contained in:
@@ -51,6 +51,7 @@ func main() {
|
||||
flag.BoolVar(&showVersion, "version", false, "Print version information.")
|
||||
flag.BoolVar(&showVersion, "v", false, "Print version information.")
|
||||
flag.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port")
|
||||
flag.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.")
|
||||
|
||||
flag.Usage = server.Usage
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ const (
|
||||
DEFAULT_FLUSH_DEADLINE = 2 * time.Second
|
||||
|
||||
// DEFAULT_HTTP_PORT is the default monitoring port.
|
||||
DEFAULT_HTTP_PORT = 8333
|
||||
DEFAULT_HTTP_PORT = 8222
|
||||
|
||||
// ACCEPT_MIN_SLEEP is the minimum acceptable sleep times on temporary errors.
|
||||
ACCEPT_MIN_SLEEP = 10 * time.Millisecond
|
||||
|
||||
@@ -40,12 +40,13 @@ type Options struct {
|
||||
ClusterUsername string `json:"-"`
|
||||
ClusterPassword string `json:"-"`
|
||||
ClusterAuthTimeout float64 `json:"auth_timeout"`
|
||||
Routes []*url.URL `json:"-"`
|
||||
ProfPort int `json:"-"`
|
||||
PidFile string `json:"-"`
|
||||
LogFile string `json:"-"`
|
||||
Syslog bool `json:"-"`
|
||||
RemoteSyslog string `json:"-"`
|
||||
Routes []*url.URL `json:"-"`
|
||||
RoutesStr string `json:"-"`
|
||||
}
|
||||
|
||||
type authorization struct {
|
||||
@@ -222,9 +223,36 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options {
|
||||
if flagOpts.ProfPort != 0 {
|
||||
opts.ProfPort = flagOpts.ProfPort
|
||||
}
|
||||
if flagOpts.RoutesStr != "" {
|
||||
mergeRoutes(&opts, flagOpts)
|
||||
}
|
||||
return &opts
|
||||
}
|
||||
|
||||
func RoutesFromStr(routesStr string) []*url.URL {
|
||||
routes := strings.Split(routesStr, ",")
|
||||
if len(routes) == 0 {
|
||||
return nil
|
||||
}
|
||||
routeUrls := []*url.URL{}
|
||||
for _, r := range routes {
|
||||
r = strings.TrimSpace(r)
|
||||
u, _ := url.Parse(r)
|
||||
routeUrls = append(routeUrls, u)
|
||||
}
|
||||
return routeUrls
|
||||
}
|
||||
|
||||
// This will merge the flag routes and override anything that was present.
|
||||
func mergeRoutes(opts, flagOpts *Options) {
|
||||
routeUrls := RoutesFromStr(flagOpts.RoutesStr)
|
||||
if routeUrls == nil {
|
||||
return
|
||||
}
|
||||
opts.Routes = routeUrls
|
||||
opts.RoutesStr = flagOpts.RoutesStr
|
||||
}
|
||||
|
||||
func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error) {
|
||||
var cleanRoutes []*url.URL
|
||||
cport := strconv.Itoa(clusterPort)
|
||||
|
||||
@@ -151,3 +151,67 @@ func TestAllowRouteWithDifferentPort(t *testing.T) {
|
||||
t.Fatalf("Wrong number of routes: %d", len(newroutes))
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouteFlagOverride(t *testing.T) {
|
||||
routeFlag := "nats-route://ruser:top_secret@127.0.0.1:8246"
|
||||
rurl, _ := url.Parse(routeFlag)
|
||||
|
||||
golden := &Options{
|
||||
Port: 7222,
|
||||
ClusterHost: "127.0.0.1",
|
||||
ClusterPort: 7244,
|
||||
ClusterUsername: "ruser",
|
||||
ClusterPassword: "top_secret",
|
||||
ClusterAuthTimeout: 0.5,
|
||||
Routes: []*url.URL{rurl},
|
||||
RoutesStr: routeFlag,
|
||||
}
|
||||
|
||||
fopts, err := ProcessConfigFile("./configs/srv_a.conf")
|
||||
if err != nil {
|
||||
t.Fatalf("Received an error reading config file: %v\n", err)
|
||||
}
|
||||
|
||||
// Overrides via flags
|
||||
opts := &Options{
|
||||
RoutesStr: routeFlag,
|
||||
}
|
||||
merged := MergeOptions(fopts, opts)
|
||||
|
||||
if !reflect.DeepEqual(golden, merged) {
|
||||
t.Fatalf("Options are incorrect.\nexpected: %+v\ngot: %+v",
|
||||
golden, merged)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouteFlagOverrideWithMultiple(t *testing.T) {
|
||||
routeFlag := "nats-route://ruser:top_secret@127.0.0.1:8246, nats-route://ruser:top_secret@127.0.0.1:8266"
|
||||
rurls := RoutesFromStr(routeFlag)
|
||||
|
||||
golden := &Options{
|
||||
Port: 7222,
|
||||
ClusterHost: "127.0.0.1",
|
||||
ClusterPort: 7244,
|
||||
ClusterUsername: "ruser",
|
||||
ClusterPassword: "top_secret",
|
||||
ClusterAuthTimeout: 0.5,
|
||||
Routes: rurls,
|
||||
RoutesStr: routeFlag,
|
||||
}
|
||||
|
||||
fopts, err := ProcessConfigFile("./configs/srv_a.conf")
|
||||
if err != nil {
|
||||
t.Fatalf("Received an error reading config file: %v\n", err)
|
||||
}
|
||||
|
||||
// Overrides via flags
|
||||
opts := &Options{
|
||||
RoutesStr: routeFlag,
|
||||
}
|
||||
merged := MergeOptions(fopts, opts)
|
||||
|
||||
if !reflect.DeepEqual(golden, merged) {
|
||||
t.Fatalf("Options are incorrect.\nexpected: %+v\ngot: %+v",
|
||||
golden, merged)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user