mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Release route connect Go routines on shutdown
This commit is contained in:
@@ -49,7 +49,7 @@ const (
|
||||
DEFAULT_ROUTE_CONNECT = 1 * time.Second
|
||||
|
||||
// Route dial timeout
|
||||
DEFAULT_ROUTE_DIAL = 2 * time.Second
|
||||
DEFAULT_ROUTE_DIAL = 1 * time.Second
|
||||
|
||||
// Default size of proto to print on parse errors
|
||||
PROTO_SNIPPET_SIZE = 32
|
||||
|
||||
@@ -76,6 +76,8 @@ func (s *Server) createRoute(conn net.Conn, rUrl *url.URL) *client {
|
||||
|
||||
Debug("Route connection created", clientConnStr(c.nc), c.cid)
|
||||
|
||||
c.mu.Lock()
|
||||
|
||||
// Queue Connect proto if we solicited the connection.
|
||||
if didSolicit {
|
||||
r.url = rUrl
|
||||
@@ -91,6 +93,7 @@ func (s *Server) createRoute(conn net.Conn, rUrl *url.URL) *client {
|
||||
ttl := secondsToDuration(s.opts.ClusterAuthTimeout)
|
||||
c.setAuthTimer(ttl)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
// Register with the server.
|
||||
s.mu.Lock()
|
||||
@@ -260,19 +263,21 @@ func (s *Server) StartRouting() {
|
||||
s.solicitRoutes()
|
||||
}
|
||||
|
||||
// FIXME(dlc): Need to shutdown when exiting
|
||||
func (s *Server) connectToRoute(rUrl *url.URL) {
|
||||
for s.isRunning() {
|
||||
Debugf("Trying to connect to route on %s", rUrl.Host)
|
||||
conn, err := net.DialTimeout("tcp", rUrl.Host, DEFAULT_ROUTE_DIAL)
|
||||
if err != nil {
|
||||
Debugf("Error trying to connect to route: %v", err)
|
||||
// FIXME(dlc): wait on kick out
|
||||
time.Sleep(DEFAULT_ROUTE_CONNECT)
|
||||
continue
|
||||
select {
|
||||
case <-s.rcQuit:
|
||||
return
|
||||
case <-time.After(DEFAULT_ROUTE_CONNECT):
|
||||
continue
|
||||
}
|
||||
}
|
||||
// We have a connection here. Go ahead and create it and
|
||||
// exit this func.
|
||||
// We have a route connection here.
|
||||
// Go ahead and create it and exit this func.
|
||||
s.createRoute(conn, rUrl)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -46,6 +46,7 @@ type Server struct {
|
||||
grid uint64
|
||||
routeInfo Info
|
||||
routeInfoJson []byte
|
||||
rcQuit chan bool
|
||||
}
|
||||
|
||||
type stats struct {
|
||||
@@ -92,6 +93,10 @@ func New(opts *Options) *Server {
|
||||
// For tracking routes
|
||||
s.routes = make(map[uint64]*client)
|
||||
|
||||
// Used to kick out all of the route
|
||||
// connect Go routines.
|
||||
s.rcQuit = make(chan bool)
|
||||
|
||||
// Generate the info json
|
||||
b, err := json.Marshal(s.info)
|
||||
if err != nil {
|
||||
@@ -160,12 +165,16 @@ func (s *Server) Shutdown() {
|
||||
s.listener = nil
|
||||
}
|
||||
|
||||
// Kick route AcceptLoop()
|
||||
if s.routeListener != nil {
|
||||
doneExpected++
|
||||
s.routeListener.Close()
|
||||
s.routeListener = nil
|
||||
}
|
||||
|
||||
// Release the solicited routes connect go routines.
|
||||
close(s.rcQuit)
|
||||
|
||||
s.mu.Unlock()
|
||||
|
||||
// Close client connections
|
||||
|
||||
@@ -5,6 +5,7 @@ package test
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
@@ -18,9 +19,9 @@ func runRouteServer(t *testing.T) (*server.Server, *server.Options) {
|
||||
|
||||
// Override for running in Go routine.
|
||||
opts.NoSigs = true
|
||||
opts.Debug = true
|
||||
opts.Trace = true
|
||||
//opts.NoLog = true
|
||||
opts.Debug = true
|
||||
opts.Trace = true
|
||||
opts.NoLog = true
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Error parsing config file: %v\n", err)
|
||||
@@ -44,6 +45,7 @@ func TestRouteGoServerShutdown(t *testing.T) {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
delta := (runtime.NumGoroutine() - base)
|
||||
if delta > 1 {
|
||||
panic("foo")
|
||||
t.Fatalf("%d Go routines still exist post Shutdown()", delta)
|
||||
}
|
||||
}
|
||||
@@ -210,7 +212,7 @@ func TestRouteOnlySendOnce(t *testing.T) {
|
||||
client := createClientConn(t, opts.Host, opts.Port)
|
||||
defer client.Close()
|
||||
|
||||
clientSend, _ := setupConn(t, client)
|
||||
clientSend, clientExpect := setupConn(t, client)
|
||||
|
||||
route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
|
||||
expectAuthRequired(t, route)
|
||||
@@ -220,9 +222,13 @@ func TestRouteOnlySendOnce(t *testing.T) {
|
||||
// Express multiple interest on this route for foo.
|
||||
routeSend("SUB foo RSID:2:1\r\n")
|
||||
routeSend("SUB foo RSID:2:2\r\n")
|
||||
routeSend("PING\r\n")
|
||||
routeExpect(pongRe)
|
||||
|
||||
// Send PUB via client connection
|
||||
clientSend("PUB foo 2\r\nok\r\n")
|
||||
clientSend("PING\r\n")
|
||||
clientExpect(pongRe)
|
||||
|
||||
matches := expectMsgs(1)
|
||||
checkMsg(t, matches[0], "foo", "RSID:2:1", "", "2", "ok")
|
||||
@@ -339,12 +345,13 @@ func TestMultipleRoutesSameId(t *testing.T) {
|
||||
|
||||
route1 := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
|
||||
expectAuthRequired(t, route1)
|
||||
route1Send, route1Expect := setupRouteEx(t, route1, opts, "ROUTE:2222")
|
||||
route1ExpectMsgs := expectMsgsCommand(t, route1Expect)
|
||||
route1Send, _ := setupRouteEx(t, route1, opts, "ROUTE:2222")
|
||||
// route1ExpectMsgs := expectMsgsCommand(t, route1Expect)
|
||||
|
||||
route2 := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
|
||||
expectAuthRequired(t, route2)
|
||||
route2Send, _ := setupRouteEx(t, route2, opts, "ROUTE:2222")
|
||||
// route2ExpectMsgs := expectMsgsCommand(t, route1Expect)
|
||||
|
||||
// Send SUB via route connections
|
||||
sub := "SUB foo RSID:2:22\r\n"
|
||||
@@ -360,17 +367,33 @@ func TestMultipleRoutesSameId(t *testing.T) {
|
||||
|
||||
// Setup a client
|
||||
client := createClientConn(t, opts.Host, opts.Port)
|
||||
clientSend, _ := setupConn(t, client)
|
||||
clientSend, clientExpect := setupConn(t, client)
|
||||
defer client.Close()
|
||||
|
||||
// Send PUB via client connection
|
||||
clientSend("PUB foo 2\r\nok\r\n")
|
||||
clientSend("PING\r\n")
|
||||
clientExpect(pongRe)
|
||||
|
||||
// We should only receive on the first route.
|
||||
route1ExpectMsgs(1)
|
||||
// We should only receive on one route, not both.
|
||||
// Check both manually.
|
||||
route1.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
|
||||
buf, _ := ioutil.ReadAll(route1)
|
||||
route1.SetReadDeadline(time.Time{})
|
||||
if len(buf) <= 0 {
|
||||
route2.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
|
||||
buf, _ = ioutil.ReadAll(route2)
|
||||
route2.SetReadDeadline(time.Time{})
|
||||
if len(buf) <= 0 {
|
||||
t.Fatal("Expected to get one message on a route, received none.")
|
||||
}
|
||||
}
|
||||
|
||||
// Nothing on the second.
|
||||
expectNothing(t, route2)
|
||||
matches := msgRe.FindAllSubmatch(buf, -1)
|
||||
if len(matches) != 1 {
|
||||
t.Fatalf("Expected 1 msg, got %d\n", len(matches))
|
||||
}
|
||||
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
|
||||
}
|
||||
|
||||
func TestRouteResendsLocalSubsOnReconnect(t *testing.T) {
|
||||
|
||||
@@ -286,7 +286,7 @@ var expBuf = make([]byte, 32768)
|
||||
// Test result from server against regexp
|
||||
func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte {
|
||||
// Wait for commands to be processed and results queued for read
|
||||
c.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
c.SetReadDeadline(time.Now().Add(1 * time.Second))
|
||||
defer c.SetReadDeadline(time.Time{})
|
||||
|
||||
n, err := c.Read(expBuf)
|
||||
|
||||
Reference in New Issue
Block a user