Cluster startup

This commit is contained in:
Derek Collison
2013-07-27 16:29:25 -07:00
parent dbca2a8faa
commit 5189dba7b6
10 changed files with 154 additions and 22 deletions

View File

@@ -81,6 +81,11 @@ func main() {
s.StartHTTPMonitoring()
}
// Start up clustering as well if needed.
if opts.ClusterPort != 0 {
s.StartCluster()
}
// Profiler
go func() {
log.Println(http.ListenAndServe("localhost:6062", nil))

View File

@@ -88,7 +88,6 @@ func TestClientCreateAndInfo(t *testing.T) {
info.Port != DEFAULT_PORT {
t.Fatalf("INFO inconsistent: %+v\n", info)
}
}
func TestClientConnect(t *testing.T) {

View File

@@ -7,7 +7,7 @@ import (
)
const (
VERSION = "go-0.2.12.alpha.1"
VERSION = "go-0.3.0.alpha.2"
DEFAULT_PORT = 4222
DEFAULT_HOST = "0.0.0.0"

View File

@@ -37,7 +37,7 @@ type Options struct {
ClusterUsername string `json:"-"`
ClusterPassword string `json:"-"`
ClusterAuthTimeout float64 `json:"auth_timeout"`
Routes []*route `json:"-"`
Routes []*url.URL `json:"-"`
}
type authorization struct {
@@ -108,15 +108,14 @@ func parseCluster(cm map[string]interface{}, opts *Options) error {
opts.ClusterAuthTimeout = auth.timeout
case "routes":
ra := mv.([]interface{})
opts.Routes = make([]*route, 0, len(ra))
opts.Routes = make([]*url.URL, 0, len(ra))
for _, r := range ra {
routeUrl := r.(string)
url, err := url.Parse(routeUrl)
if err != nil {
return fmt.Errorf("Error parsing route url [%q]", routeUrl)
}
route := &route{url: url}
opts.Routes = append(opts.Routes, route)
opts.Routes = append(opts.Routes, url)
}
}
}

View File

@@ -1,3 +1,4 @@
// Copyright 2012 Apcera Inc. All rights reserved.
package server

View File

@@ -3,11 +3,36 @@
package server
import (
"bufio"
"fmt"
"net"
"net/url"
"sync"
"sync/atomic"
"time"
)
type route struct {
mu sync.Mutex
url *url.URL
mu sync.Mutex
rid uint64
conn net.Conn
bw *bufio.Writer
srv *Server
url *url.URL
atmr *time.Timer
ptmr *time.Timer
pout int
parseState
stats
}
func (r *route) String() string {
return fmt.Sprintf("rid:%d", r.rid)
}
func (s *Server) createRoute(conn net.Conn) *route {
r := &route{srv: s, conn: conn}
r.rid = atomic.AddUint64(&s.grid, 1)
r.bw = bufio.NewWriterSize(conn, defaultBufSize)
return r
}

View File

@@ -43,6 +43,9 @@ type Server struct {
done chan bool
start time.Time
stats
routeListener net.Listener
grid uint64
}
type stats struct {
@@ -95,6 +98,10 @@ func New(opts *Options) *Server {
s.handleSignals()
Logf("Starting nats-server version %s", VERSION)
s.running = true
return s
}
@@ -140,11 +147,22 @@ func (s *Server) Shutdown() {
clients[i] = c
}
// Kick AcceptLoop()
// Number of done channel responses we expect.
doneExpected := 0
// Kick client AcceptLoop()
if s.listener != nil {
doneExpected++
s.listener.Close()
s.listener = nil
}
if s.routeListener != nil {
doneExpected++
s.routeListener.Close()
s.routeListener = nil
}
s.mu.Unlock()
// Close client connections
@@ -152,13 +170,16 @@ func (s *Server) Shutdown() {
c.closeConnection()
}
<-s.done
// Block until the accept loops exit
for doneExpected > 0 {
<-s.done
doneExpected--
}
}
func (s *Server) AcceptLoop() {
Logf("Starting nats-server version %s on port %d", VERSION, s.opts.Port)
hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.Port)
Logf("Listening for client connections on %s", hp)
l, e := net.Listen("tcp", hp)
if e != nil {
Fatalf("Error listening on port: %d - %v", s.opts.Port, e)
@@ -170,7 +191,6 @@ func (s *Server) AcceptLoop() {
// Setup state that can enable shutdown
s.mu.Lock()
s.listener = l
s.running = true
s.mu.Unlock()
tmpDelay := ACCEPT_MIN_SLEEP
@@ -179,7 +199,7 @@ func (s *Server) AcceptLoop() {
conn, err := l.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
Debug("Temporary Accept Error(%v), sleeping %dms",
Debug("Temporary Client Accept Error(%v), sleeping %dms",
ne, tmpDelay/time.Millisecond)
time.Sleep(tmpDelay)
tmpDelay *= 2
@@ -194,8 +214,53 @@ func (s *Server) AcceptLoop() {
tmpDelay = ACCEPT_MIN_SLEEP
s.createClient(conn)
}
s.done <- true
Log("Server Exiting..")
s.done <- true
}
func (s *Server) routeAcceptLoop() {
hp := fmt.Sprintf("%s:%d", s.opts.ClusterHost, s.opts.ClusterPort)
Logf("Listening for route connections on %s", hp)
l, e := net.Listen("tcp", hp)
if e != nil {
Fatalf("Error listening on router port: %d - %v", s.opts.Port, e)
return
}
// Setup state that can enable shutdown
s.mu.Lock()
s.routeListener = l
s.mu.Unlock()
tmpDelay := ACCEPT_MIN_SLEEP
for s.isRunning() {
_, err := l.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
Debug("Temporary Route Accept Error(%v), sleeping %dms",
ne, tmpDelay/time.Millisecond)
time.Sleep(tmpDelay)
tmpDelay *= 2
if tmpDelay > ACCEPT_MAX_SLEEP {
tmpDelay = ACCEPT_MAX_SLEEP
}
} else if s.isRunning() {
Logf("Accept error: %v", err)
}
continue
}
tmpDelay = ACCEPT_MIN_SLEEP
// s.createRoute(conn)
}
Debug("Router accept loop exiting..")
s.done <- true
}
// StartCluster will start the accept loop on the cluster host:port
// and will actively try to connect to listed routes.
func (s *Server) StartCluster() {
go s.routeAcceptLoop()
}
func (s *Server) StartHTTPMonitoring() {

32
test/configs/cluster.conf Normal file
View File

@@ -0,0 +1,32 @@
# Cluster config file
port: 4242
#net: apcera.me # net interface
authorization {
user: derek
password: bella
timeout: 1
}
cluster {
host: '127.0.0.1'
port: 4244
authorization {
user: route_user
password: top_secret
timeout: 1
}
# Routes are actively solicited and connected to from this server.
# Other servers can connect to us if they supply the correct credentials
# in their routes definitions from above.
routes = [
nats-route://foo:bar@apcera.me:4245
nats-route://foo:bar@apcera.me:4246
]
}

View File

@@ -9,26 +9,27 @@ import (
)
func TestSimpleGoServerShutdown(t *testing.T) {
s := runDefaultServer()
base := runtime.NumGoroutine()
s := runDefaultServer()
s.Shutdown()
time.Sleep(10 * time.Millisecond)
delta := (runtime.NumGoroutine() - base)
if delta > 0 {
if delta > 1 {
t.Fatalf("%d Go routines still exist post Shutdown()", delta)
}
}
func TestGoServerShutdownWithClients(t *testing.T) {
base := runtime.NumGoroutine()
s := runDefaultServer()
for i := 0 ; i < 10 ; i++ {
createClientConn(t, "localhost", 4222)
}
base := runtime.NumGoroutine()
s.Shutdown()
time.Sleep(10 * time.Millisecond)
// Wait longer for client connections
time.Sleep(50 * time.Millisecond)
delta := (runtime.NumGoroutine() - base)
if delta > 0 {
if delta > 1 {
t.Fatalf("%d Go routines still exist post Shutdown()", delta)
}
}

View File

@@ -46,7 +46,12 @@ func RunServer(opts *server.Options) *server.Server {
}
s := server.New(opts)
if s == nil {
panic("No nats server object returned.")
panic("No NATS Server object returned.")
}
// Start up clustering as well if needed.
if opts.ClusterPort != 0 {
s.StartCluster()
}
go s.AcceptLoop()
@@ -64,7 +69,7 @@ func RunServer(opts *server.Options) *server.Server {
conn.Close()
return s
}
panic("Unable to start NATs Server in Go Routine")
panic("Unable to start NATS Server in Go Routine")
}
func startServer(t tLogger, port int, other string) *natsServer {