diff --git a/gnatsd.go b/gnatsd.go index 50f6ea3a..2330272b 100644 --- a/gnatsd.go +++ b/gnatsd.go @@ -81,6 +81,11 @@ func main() { s.StartHTTPMonitoring() } + // Start up clustering as well if needed. + if opts.ClusterPort != 0 { + s.StartCluster() + } + // Profiler go func() { log.Println(http.ListenAndServe("localhost:6062", nil)) diff --git a/server/client_test.go b/server/client_test.go index e75b384f..b3f25220 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -88,7 +88,6 @@ func TestClientCreateAndInfo(t *testing.T) { info.Port != DEFAULT_PORT { t.Fatalf("INFO inconsistent: %+v\n", info) } - } func TestClientConnect(t *testing.T) { diff --git a/server/const.go b/server/const.go index add6e6af..250c0464 100644 --- a/server/const.go +++ b/server/const.go @@ -7,7 +7,7 @@ import ( ) const ( - VERSION = "go-0.2.12.alpha.1" + VERSION = "go-0.3.0.alpha.2" DEFAULT_PORT = 4222 DEFAULT_HOST = "0.0.0.0" diff --git a/server/opts.go b/server/opts.go index ea496ac6..4cc87540 100644 --- a/server/opts.go +++ b/server/opts.go @@ -37,7 +37,7 @@ type Options struct { ClusterUsername string `json:"-"` ClusterPassword string `json:"-"` ClusterAuthTimeout float64 `json:"auth_timeout"` - Routes []*route `json:"-"` + Routes []*url.URL `json:"-"` } type authorization struct { @@ -108,15 +108,14 @@ func parseCluster(cm map[string]interface{}, opts *Options) error { opts.ClusterAuthTimeout = auth.timeout case "routes": ra := mv.([]interface{}) - opts.Routes = make([]*route, 0, len(ra)) + opts.Routes = make([]*url.URL, 0, len(ra)) for _, r := range ra { routeUrl := r.(string) url, err := url.Parse(routeUrl) if err != nil { return fmt.Errorf("Error parsing route url [%q]", routeUrl) } - route := &route{url: url} - opts.Routes = append(opts.Routes, route) + opts.Routes = append(opts.Routes, url) } } } diff --git a/server/parser_test.go b/server/parser_test.go index 6302794d..b52bf9da 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -1,3 +1,4 @@ + // Copyright 2012 Apcera Inc. All rights reserved. package server diff --git a/server/route.go b/server/route.go index 78e49712..cd679f6f 100644 --- a/server/route.go +++ b/server/route.go @@ -3,11 +3,36 @@ package server import ( + "bufio" + "fmt" + "net" "net/url" "sync" + "sync/atomic" + "time" ) type route struct { - mu sync.Mutex - url *url.URL + mu sync.Mutex + rid uint64 + conn net.Conn + bw *bufio.Writer + srv *Server + url *url.URL + atmr *time.Timer + ptmr *time.Timer + pout int + parseState + stats +} + +func (r *route) String() string { + return fmt.Sprintf("rid:%d", r.rid) +} + +func (s *Server) createRoute(conn net.Conn) *route { + r := &route{srv: s, conn: conn} + r.rid = atomic.AddUint64(&s.grid, 1) + r.bw = bufio.NewWriterSize(conn, defaultBufSize) + return r } diff --git a/server/server.go b/server/server.go index 7c5e46af..22691ff5 100644 --- a/server/server.go +++ b/server/server.go @@ -43,6 +43,9 @@ type Server struct { done chan bool start time.Time stats + + routeListener net.Listener + grid uint64 } type stats struct { @@ -95,6 +98,10 @@ func New(opts *Options) *Server { s.handleSignals() + Logf("Starting nats-server version %s", VERSION) + + s.running = true + return s } @@ -140,11 +147,22 @@ func (s *Server) Shutdown() { clients[i] = c } - // Kick AcceptLoop() + // Number of done channel responses we expect. + doneExpected := 0 + + // Kick client AcceptLoop() if s.listener != nil { + doneExpected++ s.listener.Close() s.listener = nil } + + if s.routeListener != nil { + doneExpected++ + s.routeListener.Close() + s.routeListener = nil + } + s.mu.Unlock() // Close client connections @@ -152,13 +170,16 @@ func (s *Server) Shutdown() { c.closeConnection() } - <-s.done + // Block until the accept loops exit + for doneExpected > 0 { + <-s.done + doneExpected-- + } } func (s *Server) AcceptLoop() { - Logf("Starting nats-server version %s on port %d", VERSION, s.opts.Port) - hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.Port) + Logf("Listening for client connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { Fatalf("Error listening on port: %d - %v", s.opts.Port, e) @@ -170,7 +191,6 @@ func (s *Server) AcceptLoop() { // Setup state that can enable shutdown s.mu.Lock() s.listener = l - s.running = true s.mu.Unlock() tmpDelay := ACCEPT_MIN_SLEEP @@ -179,7 +199,7 @@ func (s *Server) AcceptLoop() { conn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { - Debug("Temporary Accept Error(%v), sleeping %dms", + Debug("Temporary Client Accept Error(%v), sleeping %dms", ne, tmpDelay/time.Millisecond) time.Sleep(tmpDelay) tmpDelay *= 2 @@ -194,8 +214,53 @@ func (s *Server) AcceptLoop() { tmpDelay = ACCEPT_MIN_SLEEP s.createClient(conn) } - s.done <- true Log("Server Exiting..") + s.done <- true +} + +func (s *Server) routeAcceptLoop() { + hp := fmt.Sprintf("%s:%d", s.opts.ClusterHost, s.opts.ClusterPort) + Logf("Listening for route connections on %s", hp) + l, e := net.Listen("tcp", hp) + if e != nil { + Fatalf("Error listening on router port: %d - %v", s.opts.Port, e) + return + } + + // Setup state that can enable shutdown + s.mu.Lock() + s.routeListener = l + s.mu.Unlock() + + tmpDelay := ACCEPT_MIN_SLEEP + + for s.isRunning() { + _, err := l.Accept() + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Temporary() { + Debug("Temporary Route Accept Error(%v), sleeping %dms", + ne, tmpDelay/time.Millisecond) + time.Sleep(tmpDelay) + tmpDelay *= 2 + if tmpDelay > ACCEPT_MAX_SLEEP { + tmpDelay = ACCEPT_MAX_SLEEP + } + } else if s.isRunning() { + Logf("Accept error: %v", err) + } + continue + } + tmpDelay = ACCEPT_MIN_SLEEP + // s.createRoute(conn) + } + Debug("Router accept loop exiting..") + s.done <- true +} + +// StartCluster will start the accept loop on the cluster host:port +// and will actively try to connect to listed routes. +func (s *Server) StartCluster() { + go s.routeAcceptLoop() } func (s *Server) StartHTTPMonitoring() { diff --git a/test/configs/cluster.conf b/test/configs/cluster.conf new file mode 100644 index 00000000..563d36ad --- /dev/null +++ b/test/configs/cluster.conf @@ -0,0 +1,32 @@ + +# Cluster config file + +port: 4242 +#net: apcera.me # net interface + +authorization { + user: derek + password: bella + timeout: 1 +} + +cluster { + host: '127.0.0.1' + port: 4244 + + authorization { + user: route_user + password: top_secret + timeout: 1 + } + + # Routes are actively solicited and connected to from this server. + # Other servers can connect to us if they supply the correct credentials + # in their routes definitions from above. + + routes = [ + nats-route://foo:bar@apcera.me:4245 + nats-route://foo:bar@apcera.me:4246 + ] +} + diff --git a/test/gosrv_test.go b/test/gosrv_test.go index 848673fd..5419745f 100644 --- a/test/gosrv_test.go +++ b/test/gosrv_test.go @@ -9,26 +9,27 @@ import ( ) func TestSimpleGoServerShutdown(t *testing.T) { - s := runDefaultServer() base := runtime.NumGoroutine() + s := runDefaultServer() s.Shutdown() time.Sleep(10 * time.Millisecond) delta := (runtime.NumGoroutine() - base) - if delta > 0 { + if delta > 1 { t.Fatalf("%d Go routines still exist post Shutdown()", delta) } } func TestGoServerShutdownWithClients(t *testing.T) { + base := runtime.NumGoroutine() s := runDefaultServer() for i := 0 ; i < 10 ; i++ { createClientConn(t, "localhost", 4222) } - base := runtime.NumGoroutine() s.Shutdown() - time.Sleep(10 * time.Millisecond) + // Wait longer for client connections + time.Sleep(50 * time.Millisecond) delta := (runtime.NumGoroutine() - base) - if delta > 0 { + if delta > 1 { t.Fatalf("%d Go routines still exist post Shutdown()", delta) } } diff --git a/test/test.go b/test/test.go index 91938b2f..209a4765 100644 --- a/test/test.go +++ b/test/test.go @@ -46,7 +46,12 @@ func RunServer(opts *server.Options) *server.Server { } s := server.New(opts) if s == nil { - panic("No nats server object returned.") + panic("No NATS Server object returned.") + } + + // Start up clustering as well if needed. + if opts.ClusterPort != 0 { + s.StartCluster() } go s.AcceptLoop() @@ -64,7 +69,7 @@ func RunServer(opts *server.Options) *server.Server { conn.Close() return s } - panic("Unable to start NATs Server in Go Routine") + panic("Unable to start NATS Server in Go Routine") } func startServer(t tLogger, port int, other string) *natsServer {