Merge pull request #1446 from nats-io/ldm_websocket

LameDuckMode takes into account websocket accept loop
This commit is contained in:
Ivan Kozlovic
2020-06-02 18:30:03 -06:00
committed by GitHub
4 changed files with 69 additions and 40 deletions

View File

@@ -2706,14 +2706,23 @@ func (s *Server) lameDuckMode() {
}
s.Noticef("Entering lame duck mode, stop accepting new clients")
s.ldm = true
s.ldmCh = make(chan bool, 1)
expected := 1
s.listener.Close()
s.listener = nil
if s.websocket.server != nil {
expected++
s.websocket.server.Close()
s.websocket.server = nil
s.websocket.listener = nil
}
s.ldmCh = make(chan bool, expected)
s.mu.Unlock()
// Wait for accept loop to be done to make sure that no new
// Wait for accept loops to be done to make sure that no new
// client can connect
<-s.ldmCh
for i := 0; i < expected; i++ {
<-s.ldmCh
}
s.mu.Lock()
// Need to recheck few things

View File

@@ -884,7 +884,7 @@ func TestLameDuckModeInfo(t *testing.T) {
atomic.StoreInt64(&lameDuckModeInitialDelay, int64(5*time.Second))
defer atomic.StoreInt64(&lameDuckModeInitialDelay, lameDuckModeDefaultInitialDelay)
optsA := DefaultOptions()
optsA := testWSOptions()
optsA.Cluster.Host = "127.0.0.1"
optsA.Cluster.Port = -1
optsA.LameDuckDuration = 50 * time.Millisecond
@@ -893,19 +893,28 @@ func TestLameDuckModeInfo(t *testing.T) {
defer srvA.Shutdown()
curla := fmt.Sprintf("127.0.0.1:%d", optsA.Port)
wscurla := fmt.Sprintf("127.0.0.1:%d", optsA.Websocket.Port)
c, err := net.Dial("tcp", curla)
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer c.Close()
client := bufio.NewReaderSize(c, maxBufSize)
getInfo := func() *serverInfo {
wsconn, wsclient := testWSCreateClient(t, false, false, optsA.Websocket.Host, optsA.Websocket.Port)
defer wsconn.Close()
getInfo := func(ws bool) *serverInfo {
t.Helper()
l, err := client.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving info from server: %v\n", err)
var l string
var err error
if ws {
l = string(testWSReadFrame(t, wsclient))
} else {
l, err = client.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving info from server: %v\n", err)
}
}
var info serverInfo
if err = json.Unmarshal([]byte(l[5:]), &info); err != nil {
@@ -913,33 +922,38 @@ func TestLameDuckModeInfo(t *testing.T) {
}
return &info
}
getInfo()
getInfo(false)
c.Write([]byte("CONNECT {\"protocol\":1,\"verbose\":false}\r\nPING\r\n"))
client.ReadString('\n')
optsB := DefaultOptions()
optsB := testWSOptions()
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
srvB := RunServer(optsB)
defer srvB.Shutdown()
checkClusterFormed(t, srvA, srvB)
checkConnectURLs := func(expected []string) *serverInfo {
checkConnectURLs := func(expected [][]string) *serverInfo {
t.Helper()
sort.Strings(expected)
si := getInfo()
sort.Strings(si.ConnectURLs)
if !reflect.DeepEqual(expected, si.ConnectURLs) {
t.Fatalf("Expected %q, got %q", expected, si.ConnectURLs)
var si *serverInfo
for i, ws := range []bool{false, true} {
sort.Strings(expected[i])
si = getInfo(ws)
sort.Strings(si.ConnectURLs)
if !reflect.DeepEqual(expected[i], si.ConnectURLs) {
t.Fatalf("Expected %q, got %q", expected, si.ConnectURLs)
}
}
return si
}
curlb := fmt.Sprintf("127.0.0.1:%d", optsB.Port)
expected := []string{curla, curlb}
wscurlb := fmt.Sprintf("127.0.0.1:%d", optsB.Websocket.Port)
expected := [][]string{{curla, curlb}, {wscurla, wscurlb}}
checkConnectURLs(expected)
optsC := DefaultOptions()
optsC := testWSOptions()
optsC.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
srvC := RunServer(optsC)
defer srvC.Shutdown()
@@ -947,10 +961,11 @@ func TestLameDuckModeInfo(t *testing.T) {
checkClusterFormed(t, srvA, srvB, srvC)
curlc := fmt.Sprintf("127.0.0.1:%d", optsC.Port)
expected = append(expected, curlc)
wscurlc := fmt.Sprintf("127.0.0.1:%d", optsC.Websocket.Port)
expected = [][]string{{curla, curlb, curlc}, {wscurla, wscurlb, wscurlc}}
checkConnectURLs(expected)
optsD := DefaultOptions()
optsD := testWSOptions()
optsD.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
srvD := RunServer(optsD)
defer srvD.Shutdown()
@@ -958,7 +973,8 @@ func TestLameDuckModeInfo(t *testing.T) {
checkClusterFormed(t, srvA, srvB, srvC, srvD)
curld := fmt.Sprintf("127.0.0.1:%d", optsD.Port)
expected = append(expected, curld)
wscurld := fmt.Sprintf("127.0.0.1:%d", optsD.Websocket.Port)
expected = [][]string{{curla, curlb, curlc, curld}, {wscurla, wscurlb, wscurlc, wscurld}}
checkConnectURLs(expected)
// Now lame duck server A and C. We should have client connected to A
@@ -981,7 +997,7 @@ func TestLameDuckModeInfo(t *testing.T) {
srvA.lameDuckMode()
}()
expected = []string{curlb, curlc, curld}
expected = [][]string{{curlb, curlc, curld}, {wscurlb, wscurlc, wscurld}}
si := checkConnectURLs(expected)
if !si.LameDuckMode {
t.Fatal("Expected LameDuckMode to be true, it was not")
@@ -995,7 +1011,7 @@ func TestLameDuckModeInfo(t *testing.T) {
srvC.lameDuckMode()
}()
expected = []string{curlb, curld}
expected = [][]string{{curlb, curld}, {wscurlb, wscurld}}
si = checkConnectURLs(expected)
// This update should not say that it is LDM.
if si.LameDuckMode {
@@ -1005,7 +1021,7 @@ func TestLameDuckModeInfo(t *testing.T) {
// Now shutdown D, and we also should get an update.
srvD.Shutdown()
expected = []string{curlb}
expected = [][]string{{curlb}, {wscurlb}}
si = checkConnectURLs(expected)
// This update should not say that it is LDM.
if si.LameDuckMode {

View File

@@ -836,6 +836,13 @@ func (s *Server) startWebsocketServer() {
if err := hs.Serve(hl); err != http.ErrServerClosed {
s.Fatalf("websocket listener error: %v", err)
}
if s.isLameDuckMode() {
// Signal that we are not accepting new clients
s.ldmCh <- true
// Now wait for the Shutdown...
<-s.quitCh
return
}
s.done <- true
})
}

View File

@@ -1623,7 +1623,7 @@ func testWSCreateClient(t testing.TB, compress, web bool, host string, port int)
}
// Wait for the PONG
if msg := testWSReadFrame(t, br); !bytes.HasPrefix(msg, []byte("PONG\r\n")) {
t.Fatalf("Expected INFO, got %s", msg)
t.Fatalf("Expected PONG, got %s", msg)
}
return wsc, br
}
@@ -2350,20 +2350,17 @@ func TestWSCompressionWithPartialWrite(t *testing.T) {
t.Fatal("Did not get the ping response")
}
ws.mu.Lock()
pb := ws.out.pb
wf := ws.ws.frames
fs := ws.ws.fs
ws.mu.Unlock()
if pb != 0 {
t.Fatalf("Expected pb to be 0, got %v", pb)
}
if len(wf) != 0 {
t.Fatalf("Should not be any frames left to send, got %v", wf)
}
if fs != 0 {
t.Fatalf("Frame size should be 0, got %v", fs)
}
checkFor(t, time.Second, 15*time.Millisecond, func() error {
ws.mu.Lock()
pb := ws.out.pb
wf := ws.ws.frames
fs := ws.ws.fs
ws.mu.Unlock()
if pb != 0 || len(wf) != 0 || fs != 0 {
return fmt.Errorf("Expected pb, wf and fs to be 0, got %v, %v, %v", pb, wf, fs)
}
return nil
})
}
func TestWSCompressionFrameSizeLimit(t *testing.T) {