mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Add TestRouteNoLeakOnSlowConsumer and TestRouteNoLeakOnAuthTimeout
Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
@@ -14,9 +14,11 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
@@ -1755,3 +1757,81 @@ func TestRouteSaveTLSName(t *testing.T) {
|
||||
reloadUpdateConfig(t, s2, c2And3Conf, fmt.Sprintf(tmpl, "localhost", o1.Cluster.Port))
|
||||
checkClusterFormed(t, s1, s2, s3)
|
||||
}
|
||||
|
||||
func TestRouteNoLeakOnSlowConsumer(t *testing.T) {
|
||||
o1 := DefaultOptions()
|
||||
s1 := RunServer(o1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
o2 := DefaultOptions()
|
||||
o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port))
|
||||
s2 := RunServer(o2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, s1, s2)
|
||||
|
||||
// For any route connections on the first server, drop the write
|
||||
// deadline down and then get the client to try sending something.
|
||||
// This should result in an effectively immediate write timeout,
|
||||
// which will surface as a slow consumer.
|
||||
s1.mu.Lock()
|
||||
for _, cli := range s1.routes {
|
||||
cli.out.wdl = time.Nanosecond
|
||||
cli.sendRTTPing()
|
||||
}
|
||||
s1.mu.Unlock()
|
||||
|
||||
// By now the routes should have gone down, so check that there
|
||||
// aren't any routes listed still.
|
||||
checkFor(t, time.Millisecond*500, time.Millisecond*25, func() error {
|
||||
if nc := s1.NumRoutes(); nc != 0 {
|
||||
return fmt.Errorf("Server 1 should have no route connections, got %v", nc)
|
||||
}
|
||||
if nc := s2.NumRoutes(); nc != 0 {
|
||||
return fmt.Errorf("Server 2 should have no route connections, got %v", nc)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestRouteNoLeakOnAuthTimeout(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.Cluster.Username = "foo"
|
||||
opts.Cluster.Password = "bar"
|
||||
opts.AuthTimeout = 0.01 // Deliberately short timeout
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
c, err := net.Dial("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Cluster.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error connecting: %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
cr := bufio.NewReader(c)
|
||||
|
||||
// Wait for INFO...
|
||||
line, _, _ := cr.ReadLine()
|
||||
var info serverInfo
|
||||
if err = json.Unmarshal(line[5:], &info); err != nil {
|
||||
t.Fatalf("Could not parse INFO json: %v\n", err)
|
||||
}
|
||||
|
||||
// The server will send a PING, too
|
||||
line, _, _ = cr.ReadLine()
|
||||
if string(line) != "PING" {
|
||||
t.Fatalf("Expected 'PING' but got %q", line)
|
||||
}
|
||||
|
||||
// Wait out the clock so we hit the auth timeout
|
||||
time.Sleep(secondsToDuration(opts.AuthTimeout) * 2)
|
||||
line, _, _ = cr.ReadLine()
|
||||
if string(line) != "-ERR 'Authentication Timeout'" {
|
||||
t.Fatalf("Expected '-ERR 'Authentication Timeout'' but got %q", line)
|
||||
}
|
||||
|
||||
// There shouldn't be a route entry as we didn't set up.
|
||||
if nc := s.NumRoutes(); nc != 0 {
|
||||
t.Fatalf("Server should have no route connections, got %v", nc)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user