diff --git a/server/configs/cluster.conf b/server/configs/cluster.conf new file mode 100644 index 00000000..99275dbd --- /dev/null +++ b/server/configs/cluster.conf @@ -0,0 +1,35 @@ + +# Cluster config file + +port: 4242 +net: apcera.me # net interface + +authorization { + user: derek + password: bella + timeout: 1 +} + +pid_file: '/tmp/nats_cluster_test.pid' +log_file: '/tmp/nats_cluster_test.log' + +cluster { + host: '127.0.0.1' + port: 4244 + + authorization { + user: route_user + password: top_secret + timeout: 1 + } + + # Routes are actively solicited and connected to from this server. + # Other servers can connect to us if they supply the correct credentials + # in their routes definitions from above. + + routes = [ + nats-route://foo:bar@apcera.me:4245 + nats-route://foo:bar@apcera.me:4246 + ] +} + diff --git a/server/opts.go b/server/opts.go index ee49be22..ea496ac6 100644 --- a/server/opts.go +++ b/server/opts.go @@ -3,7 +3,10 @@ package server import ( + "fmt" + "io/ioutil" + "net/url" "strings" "time" @@ -11,24 +14,36 @@ import ( ) type Options struct { - Host string `json:"addr"` - Port int `json:"port"` - Trace bool `json:"-"` - Debug bool `json:"-"` - NoLog bool `json:"-"` - NoSigs bool `json:"-"` - Logtime bool `json:"-"` - MaxConn int `json:"max_connections"` - Username string `json:"user,omitempty"` - Password string `json:"-"` - Authorization string `json:"-"` - PingInterval time.Duration `json:"ping_interval"` - MaxPingsOut int `json:"ping_max"` - HttpPort int `json:"http_port"` - SslTimeout float64 `json:"ssl_timeout"` - AuthTimeout float64 `json:"auth_timeout"` - MaxControlLine int `json:"max_control_line"` - MaxPayload int `json:"max_payload"` + Host string `json:"addr"` + Port int `json:"port"` + Trace bool `json:"-"` + Debug bool `json:"-"` + NoLog bool `json:"-"` + NoSigs bool `json:"-"` + Logtime bool `json:"-"` + MaxConn int `json:"max_connections"` + Username string `json:"user,omitempty"` + Password string `json:"-"` + Authorization string `json:"-"` + PingInterval time.Duration `json:"ping_interval"` + MaxPingsOut int `json:"ping_max"` + HttpPort int `json:"http_port"` + SslTimeout float64 `json:"ssl_timeout"` + AuthTimeout float64 `json:"auth_timeout"` + MaxControlLine int `json:"max_control_line"` + MaxPayload int `json:"max_payload"` + ClusterHost string `json:"addr"` + ClusterPort int `json:"port"` + ClusterUsername string `json:"-"` + ClusterPassword string `json:"-"` + ClusterAuthTimeout float64 `json:"auth_timeout"` + Routes []*route `json:"-"` +} + +type authorization struct { + user string + pass string + timeout float64 } // FIXME(dlc): Hacky @@ -63,28 +78,74 @@ func ProcessConfigFile(configFile string) (*Options, error) { opts.Logtime = v.(bool) case "authorization": am := v.(map[string]interface{}) - for mk, mv := range am { - switch strings.ToLower(mk) { - case "user", "username": - opts.Username = mv.(string) - case "pass", "password": - opts.Password = mv.(string) - case "timeout": - at := float64(1) - switch mv.(type) { - case int64: - at = float64(mv.(int64)) - case float64: - at = mv.(float64) - } - opts.AuthTimeout = at / float64(time.Second) - } + auth := parseAuthorization(am) + opts.Username = auth.user + opts.Password = auth.pass + opts.AuthTimeout = auth.timeout + case "cluster": + cm := v.(map[string]interface{}) + if err := parseCluster(cm, opts); err != nil { + return nil, err } } } return opts, nil } +// parseCluster will parse the cluster config. +func parseCluster(cm map[string]interface{}, opts *Options) error { + for mk, mv := range cm { + switch strings.ToLower(mk) { + case "port": + opts.ClusterPort = int(mv.(int64)) + case "host", "net": + opts.ClusterHost = mv.(string) + case "authorization": + am := mv.(map[string]interface{}) + auth := parseAuthorization(am) + opts.ClusterUsername = auth.user + opts.ClusterPassword = auth.pass + opts.ClusterAuthTimeout = auth.timeout + case "routes": + ra := mv.([]interface{}) + opts.Routes = make([]*route, 0, len(ra)) + for _, r := range ra { + routeUrl := r.(string) + url, err := url.Parse(routeUrl) + if err != nil { + return fmt.Errorf("Error parsing route url [%q]", routeUrl) + } + route := &route{url: url} + opts.Routes = append(opts.Routes, route) + } + } + } + return nil +} + +// Helper function to parse Authorization configs. +func parseAuthorization(am map[string]interface{}) authorization { + auth := authorization{} + for mk, mv := range am { + switch strings.ToLower(mk) { + case "user", "username": + auth.user = mv.(string) + case "pass", "password": + auth.pass = mv.(string) + case "timeout": + at := float64(1) + switch mv.(type) { + case int64: + at = float64(mv.(int64)) + case float64: + at = mv.(float64) + } + auth.timeout = at / float64(time.Second) + } + } + return auth +} + // Will merge two options giving preference to the flagOpts if the item is present. func MergeOptions(fileOpts, flagOpts *Options) *Options { if fileOpts == nil { diff --git a/server/route.go b/server/route.go new file mode 100644 index 00000000..78e49712 --- /dev/null +++ b/server/route.go @@ -0,0 +1,13 @@ +// Copyright 2013 Apcera Inc. All rights reserved. + +package server + +import ( + "net/url" + "sync" +) + +type route struct { + mu sync.Mutex + url *url.URL +} diff --git a/server/routes_test.go b/server/routes_test.go new file mode 100644 index 00000000..f1856777 --- /dev/null +++ b/server/routes_test.go @@ -0,0 +1,43 @@ +// Copyright 2013 Apcera Inc. All rights reserved. + +package server + +import ( + "net/url" + "reflect" + "testing" + "time" +) + +func TestRouteConfig(t *testing.T) { + opts, err := ProcessConfigFile("./configs/cluster.conf") + if err != nil { + t.Fatalf("Received an error reading route config file: %v\n", err) + } + + golden := &Options{ + Host: "apcera.me", + Port: 4242, + Username: "derek", + Password: "bella", + AuthTimeout: 1.0 / float64(time.Second), + ClusterHost: "127.0.0.1", + ClusterPort: 4244, + ClusterUsername: "route_user", + ClusterPassword: "top_secret", + ClusterAuthTimeout: 1.0 / float64(time.Second), + } + + // Setup URLs + r1, _ := url.Parse("nats-route://foo:bar@apcera.me:4245") + r2, _ := url.Parse("nats-route://foo:bar@apcera.me:4246") + + golden.Routes = []*route{ + &route{url: r1}, &route{url: r2}, + } + + if !reflect.DeepEqual(golden, opts) { + t.Fatalf("Options are incorrect from config file.\nexpected: %+v\ngot: %+v", + golden, opts) + } +}