Files
nats-server/server/server_test.go
Ivan Kozlovic b4128693ed Ensure file path is correct during stream restore
Also had to change all references from `path.` to `filepath.` when
dealing with files, so that it works properly on Windows.

Fixed also lots of tests to defer the shutdown of the server
after the removal of the storage, and fixed some config files
directories to use the single quote `'` to surround the file path,
again to work on Windows.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2022-03-09 13:31:51 -07:00

2000 lines
54 KiB
Go

// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"net"
"net/url"
"os"
"reflect"
"runtime"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/nats-io/nats.go"
)
func checkFor(t *testing.T, totalWait, sleepDur time.Duration, f func() error) {
t.Helper()
timeout := time.Now().Add(totalWait)
var err error
for time.Now().Before(timeout) {
err = f()
if err == nil {
return
}
time.Sleep(sleepDur)
}
if err != nil {
t.Fatal(err.Error())
}
}
func DefaultOptions() *Options {
return &Options{
Host: "127.0.0.1",
Port: -1,
HTTPPort: -1,
Cluster: ClusterOpts{Port: -1, Name: "abc"},
NoLog: true,
NoSigs: true,
Debug: true,
Trace: true,
}
}
// New Go Routine based server
func RunServer(opts *Options) *Server {
if opts == nil {
opts = DefaultOptions()
}
s, err := NewServer(opts)
if err != nil || s == nil {
panic(fmt.Sprintf("No NATS Server object returned: %v", err))
}
if !opts.NoLog {
s.ConfigureLogger()
}
// Run server in Go routine.
go s.Start()
// Wait for accept loop(s) to be started
if err := s.readyForConnections(10 * time.Second); err != nil {
panic(err)
}
return s
}
// LoadConfig loads a configuration from a filename
func LoadConfig(configFile string) (opts *Options) {
opts, err := ProcessConfigFile(configFile)
if err != nil {
panic(fmt.Sprintf("Error processing configuration file: %v", err))
}
opts.NoSigs, opts.NoLog = true, opts.LogFile == _EMPTY_
return
}
// RunServerWithConfig starts a new Go routine based server with a configuration file.
func RunServerWithConfig(configFile string) (srv *Server, opts *Options) {
opts = LoadConfig(configFile)
srv = RunServer(opts)
return
}
func TestVersionMatchesTag(t *testing.T) {
tag := os.Getenv("TRAVIS_TAG")
// Travis started to return '' when no tag is set. Support both now.
if tag == "" || tag == "''" {
t.SkipNow()
}
// We expect a tag of the form vX.Y.Z. If that's not the case,
// we need someone to have a look. So fail if first letter is not
// a `v`
if tag[0] != 'v' {
t.Fatalf("Expect tag to start with `v`, tag is: %s", tag)
}
// Strip the `v` from the tag for the version comparison.
if VERSION != tag[1:] {
t.Fatalf("Version (%s) does not match tag (%s)", VERSION, tag[1:])
}
}
func TestStartProfiler(t *testing.T) {
s := New(DefaultOptions())
s.StartProfiler()
s.mu.Lock()
s.profiler.Close()
s.mu.Unlock()
}
func TestStartupAndShutdown(t *testing.T) {
opts := DefaultOptions()
opts.NoSystemAccount = true
s := RunServer(opts)
defer s.Shutdown()
if !s.isRunning() {
t.Fatal("Could not run server")
}
// Debug stuff.
numRoutes := s.NumRoutes()
if numRoutes != 0 {
t.Fatalf("Expected numRoutes to be 0 vs %d\n", numRoutes)
}
numRemotes := s.NumRemotes()
if numRemotes != 0 {
t.Fatalf("Expected numRemotes to be 0 vs %d\n", numRemotes)
}
numClients := s.NumClients()
if numClients != 0 && numClients != 1 {
t.Fatalf("Expected numClients to be 1 or 0 vs %d\n", numClients)
}
numSubscriptions := s.NumSubscriptions()
if numSubscriptions != 0 {
t.Fatalf("Expected numSubscriptions to be 0 vs %d\n", numSubscriptions)
}
}
func TestTLSVersions(t *testing.T) {
for _, test := range []struct {
name string
value uint16
expected string
}{
{"1.0", tls.VersionTLS10, "1.0"},
{"1.1", tls.VersionTLS11, "1.1"},
{"1.2", tls.VersionTLS12, "1.2"},
{"1.3", tls.VersionTLS13, "1.3"},
{"unknown", 0x999, "Unknown [0x999]"},
} {
t.Run(test.name, func(t *testing.T) {
if v := tlsVersion(test.value); v != test.expected {
t.Fatalf("Expected value 0x%x to be %q, got %q", test.value, test.expected, v)
}
})
}
}
func TestTlsCipher(t *testing.T) {
if strings.Compare(tlsCipher(0x0005), "TLS_RSA_WITH_RC4_128_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0x000a), "TLS_RSA_WITH_3DES_EDE_CBC_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0x002f), "TLS_RSA_WITH_AES_128_CBC_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0x0035), "TLS_RSA_WITH_AES_256_CBC_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc007), "TLS_ECDHE_ECDSA_WITH_RC4_128_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc009), "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc00a), "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc011), "TLS_ECDHE_RSA_WITH_RC4_128_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc012), "TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc013), "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc014), "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA") != 0 {
t.Fatalf("IUnknownnvalid tls cipher")
}
if strings.Compare(tlsCipher(0xc02f), "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc02b), "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc030), "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc02c), "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0x1301), "TLS_AES_128_GCM_SHA256") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0x1302), "TLS_AES_256_GCM_SHA384") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0x1303), "TLS_CHACHA20_POLY1305_SHA256") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0x9999), "Unknown [0x9999]") != 0 {
t.Fatalf("Expected an unknown cipher")
}
}
func TestGetConnectURLs(t *testing.T) {
opts := DefaultOptions()
opts.Port = 4222
var globalIP net.IP
checkGlobalConnectURLs := func() {
s := New(opts)
defer s.Shutdown()
s.mu.Lock()
urls := s.getClientConnectURLs()
s.mu.Unlock()
if len(urls) == 0 {
t.Fatalf("Expected to get a list of urls, got none for listen addr: %v", opts.Host)
}
for _, u := range urls {
tcpaddr, err := net.ResolveTCPAddr("tcp", u)
if err != nil {
t.Fatalf("Error resolving: %v", err)
}
ip := tcpaddr.IP
if !ip.IsGlobalUnicast() {
t.Fatalf("IP %v is not global", ip.String())
}
if ip.IsUnspecified() {
t.Fatalf("IP %v is unspecified", ip.String())
}
addr := strings.TrimSuffix(u, ":4222")
if addr == opts.Host {
t.Fatalf("Returned url is not right: %v", u)
}
if globalIP == nil {
globalIP = ip
}
}
}
listenAddrs := []string{"0.0.0.0", "::"}
for _, listenAddr := range listenAddrs {
opts.Host = listenAddr
checkGlobalConnectURLs()
}
checkConnectURLsHasOnlyOne := func() {
s := New(opts)
defer s.Shutdown()
s.mu.Lock()
urls := s.getClientConnectURLs()
s.mu.Unlock()
if len(urls) != 1 {
t.Fatalf("Expected one URL, got %v", urls)
}
tcpaddr, err := net.ResolveTCPAddr("tcp", urls[0])
if err != nil {
t.Fatalf("Error resolving: %v", err)
}
ip := tcpaddr.IP
if ip.String() != opts.Host {
t.Fatalf("Expected connect URL to be %v, got %v", opts.Host, ip.String())
}
}
singleConnectReturned := []string{"127.0.0.1", "::1"}
if globalIP != nil {
singleConnectReturned = append(singleConnectReturned, globalIP.String())
}
for _, listenAddr := range singleConnectReturned {
opts.Host = listenAddr
checkConnectURLsHasOnlyOne()
}
}
func TestInfoServerNameDefaultsToPK(t *testing.T) {
opts := DefaultOptions()
opts.Port = 4222
opts.ClientAdvertise = "nats.example.com"
s := New(opts)
defer s.Shutdown()
if s.info.Name != s.info.ID {
t.Fatalf("server info hostname is incorrect, got: '%v' expected: '%v'", s.info.Name, s.info.ID)
}
}
func TestInfoServerNameIsSettable(t *testing.T) {
opts := DefaultOptions()
opts.Port = 4222
opts.ClientAdvertise = "nats.example.com"
opts.ServerName = "test_server_name"
s := New(opts)
defer s.Shutdown()
if s.info.Name != "test_server_name" {
t.Fatalf("server info hostname is incorrect, got: '%v' expected: 'test_server_name'", s.info.Name)
}
}
func TestClientAdvertiseConnectURL(t *testing.T) {
opts := DefaultOptions()
opts.Port = 4222
opts.ClientAdvertise = "nats.example.com"
s := New(opts)
defer s.Shutdown()
s.mu.Lock()
urls := s.getClientConnectURLs()
s.mu.Unlock()
if len(urls) != 1 {
t.Fatalf("Expected to get one url, got none: %v with ClientAdvertise %v",
opts.Host, opts.ClientAdvertise)
}
if urls[0] != "nats.example.com:4222" {
t.Fatalf("Expected to get '%s', got: '%v'", "nats.example.com:4222", urls[0])
}
s.Shutdown()
opts.ClientAdvertise = "nats.example.com:7777"
s = New(opts)
s.mu.Lock()
urls = s.getClientConnectURLs()
s.mu.Unlock()
if len(urls) != 1 {
t.Fatalf("Expected to get one url, got none: %v with ClientAdvertise %v",
opts.Host, opts.ClientAdvertise)
}
if urls[0] != "nats.example.com:7777" {
t.Fatalf("Expected 'nats.example.com:7777', got: '%v'", urls[0])
}
if s.info.Host != "nats.example.com" {
t.Fatalf("Expected host to be set to nats.example.com")
}
if s.info.Port != 7777 {
t.Fatalf("Expected port to be set to 7777")
}
s.Shutdown()
opts = DefaultOptions()
opts.Port = 0
opts.ClientAdvertise = "nats.example.com:7777"
s = New(opts)
if s.info.Host != "nats.example.com" && s.info.Port != 7777 {
t.Fatalf("Expected Client Advertise Host:Port to be nats.example.com:7777, got: %s:%d",
s.info.Host, s.info.Port)
}
s.Shutdown()
}
func TestClientAdvertiseInCluster(t *testing.T) {
optsA := DefaultOptions()
optsA.ClientAdvertise = "srvA:4222"
srvA := RunServer(optsA)
defer srvA.Shutdown()
nc := natsConnect(t, srvA.ClientURL())
defer nc.Close()
optsB := DefaultOptions()
optsB.ClientAdvertise = "srvBC:4222"
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", optsA.Cluster.Port))
srvB := RunServer(optsB)
defer srvB.Shutdown()
checkClusterFormed(t, srvA, srvB)
checkURLs := func(expected string) {
t.Helper()
checkFor(t, time.Second, 15*time.Millisecond, func() error {
srvs := nc.DiscoveredServers()
for _, u := range srvs {
if u == expected {
return nil
}
}
return fmt.Errorf("Url %q not found in %q", expected, srvs)
})
}
checkURLs("nats://srvBC:4222")
optsC := DefaultOptions()
optsC.ClientAdvertise = "srvBC:4222"
optsC.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", optsA.Cluster.Port))
srvC := RunServer(optsC)
defer srvC.Shutdown()
checkClusterFormed(t, srvA, srvB, srvC)
checkURLs("nats://srvBC:4222")
srvB.Shutdown()
checkNumRoutes(t, srvA, 1)
checkURLs("nats://srvBC:4222")
}
func TestClientAdvertiseErrorOnStartup(t *testing.T) {
opts := DefaultOptions()
// Set invalid address
opts.ClientAdvertise = "addr:::123"
testFatalErrorOnStart(t, opts, "ClientAdvertise")
}
func TestNoDeadlockOnStartFailure(t *testing.T) {
opts := DefaultOptions()
opts.Host = "x.x.x.x" // bad host
opts.Port = 4222
opts.HTTPHost = opts.Host
opts.Cluster.Host = "127.0.0.1"
opts.Cluster.Port = -1
opts.ProfPort = -1
s := New(opts)
// This should return since it should fail to start a listener
// on x.x.x.x:4222
ch := make(chan struct{})
go func() {
s.Start()
close(ch)
}()
select {
case <-ch:
case <-time.After(time.Second):
t.Fatalf("Start() should have returned due to failure to start listener")
}
// We should be able to shutdown
s.Shutdown()
}
func TestMaxConnections(t *testing.T) {
opts := DefaultOptions()
opts.MaxConn = 1
s := RunServer(opts)
defer s.Shutdown()
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
nc, err := nats.Connect(addr)
if err != nil {
t.Fatalf("Error creating client: %v\n", err)
}
defer nc.Close()
nc2, err := nats.Connect(addr)
if err == nil {
nc2.Close()
t.Fatal("Expected connection to fail")
}
}
func TestMaxSubscriptions(t *testing.T) {
opts := DefaultOptions()
opts.MaxSubs = 10
s := RunServer(opts)
defer s.Shutdown()
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
nc, err := nats.Connect(addr)
if err != nil {
t.Fatalf("Error creating client: %v\n", err)
}
defer nc.Close()
for i := 0; i < 10; i++ {
_, err := nc.Subscribe(fmt.Sprintf("foo.%d", i), func(*nats.Msg) {})
if err != nil {
t.Fatalf("Error subscribing: %v\n", err)
}
}
// This should cause the error.
nc.Subscribe("foo.22", func(*nats.Msg) {})
nc.Flush()
if err := nc.LastError(); err == nil {
t.Fatal("Expected an error but got none\n")
}
}
func TestProcessCommandLineArgs(t *testing.T) {
var host string
var port int
cmd := flag.NewFlagSet("nats-server", flag.ExitOnError)
cmd.StringVar(&host, "a", "0.0.0.0", "Host.")
cmd.IntVar(&port, "p", 4222, "Port.")
cmd.Parse([]string{"-a", "127.0.0.1", "-p", "9090"})
showVersion, showHelp, err := ProcessCommandLineArgs(cmd)
if err != nil {
t.Errorf("Expected no errors, got: %s", err)
}
if showVersion || showHelp {
t.Errorf("Expected not having to handle subcommands")
}
cmd.Parse([]string{"version"})
showVersion, showHelp, err = ProcessCommandLineArgs(cmd)
if err != nil {
t.Errorf("Expected no errors, got: %s", err)
}
if !showVersion {
t.Errorf("Expected having to handle version command")
}
if showHelp {
t.Errorf("Expected not having to handle help command")
}
cmd.Parse([]string{"help"})
showVersion, showHelp, err = ProcessCommandLineArgs(cmd)
if err != nil {
t.Errorf("Expected no errors, got: %s", err)
}
if showVersion {
t.Errorf("Expected not having to handle version command")
}
if !showHelp {
t.Errorf("Expected having to handle help command")
}
cmd.Parse([]string{"foo", "-p", "9090"})
_, _, err = ProcessCommandLineArgs(cmd)
if err == nil {
t.Errorf("Expected an error handling the command arguments")
}
}
func TestRandomPorts(t *testing.T) {
opts := DefaultOptions()
opts.HTTPPort = -1
opts.Port = -1
s := RunServer(opts)
defer s.Shutdown()
if s.Addr() == nil || s.Addr().(*net.TCPAddr).Port <= 0 {
t.Fatal("Should have dynamically assigned server port.")
}
if s.Addr() == nil || s.Addr().(*net.TCPAddr).Port == 4222 {
t.Fatal("Should not have dynamically assigned default port: 4222.")
}
if s.MonitorAddr() == nil || s.MonitorAddr().Port <= 0 {
t.Fatal("Should have dynamically assigned monitoring port.")
}
}
func TestNilMonitoringPort(t *testing.T) {
opts := DefaultOptions()
opts.HTTPPort = 0
opts.HTTPSPort = 0
s := RunServer(opts)
defer s.Shutdown()
if s.MonitorAddr() != nil {
t.Fatal("HttpAddr should be nil.")
}
}
type DummyAuth struct {
t *testing.T
needNonce bool
}
func (d *DummyAuth) Check(c ClientAuthentication) bool {
if d.needNonce && len(c.GetNonce()) == 0 {
d.t.Fatalf("Expected a nonce but received none")
} else if !d.needNonce && len(c.GetNonce()) > 0 {
d.t.Fatalf("Received a nonce when none was expected")
}
return c.GetOpts().Username == "valid"
}
func TestCustomClientAuthentication(t *testing.T) {
testAuth := func(t *testing.T, nonce bool) {
clientAuth := &DummyAuth{t, nonce}
opts := DefaultOptions()
opts.CustomClientAuthentication = clientAuth
opts.AlwaysEnableNonce = nonce
s := RunServer(opts)
defer s.Shutdown()
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
nc, err := nats.Connect(addr, nats.UserInfo("valid", ""))
if err != nil {
t.Fatalf("Expected client to connect, got: %s", err)
}
nc.Close()
if _, err := nats.Connect(addr, nats.UserInfo("invalid", "")); err == nil {
t.Fatal("Expected client to fail to connect")
}
}
t.Run("with nonce", func(t *testing.T) { testAuth(t, true) })
t.Run("without nonce", func(t *testing.T) { testAuth(t, false) })
}
func TestCustomRouterAuthentication(t *testing.T) {
opts := DefaultOptions()
opts.CustomRouterAuthentication = &DummyAuth{}
opts.Cluster.Host = "127.0.0.1"
s := RunServer(opts)
defer s.Shutdown()
clusterPort := s.ClusterAddr().Port
opts2 := DefaultOptions()
opts2.Cluster.Host = "127.0.0.1"
opts2.Routes = RoutesFromStr(fmt.Sprintf("nats://invalid@127.0.0.1:%d", clusterPort))
s2 := RunServer(opts2)
defer s2.Shutdown()
// s2 will attempt to connect to s, which should reject.
// Keep in mind that s2 will try again...
time.Sleep(50 * time.Millisecond)
checkNumRoutes(t, s2, 0)
opts3 := DefaultOptions()
opts3.Cluster.Host = "127.0.0.1"
opts3.Routes = RoutesFromStr(fmt.Sprintf("nats://valid@127.0.0.1:%d", clusterPort))
s3 := RunServer(opts3)
defer s3.Shutdown()
checkClusterFormed(t, s, s3)
checkNumRoutes(t, s3, 1)
}
func TestMonitoringNoTimeout(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
s.mu.Lock()
srv := s.monitoringServer
s.mu.Unlock()
if srv == nil {
t.Fatalf("Monitoring server not set")
}
if srv.ReadTimeout != 0 {
t.Fatalf("ReadTimeout should not be set, was set to %v", srv.ReadTimeout)
}
if srv.WriteTimeout != 0 {
t.Fatalf("WriteTimeout should not be set, was set to %v", srv.WriteTimeout)
}
}
func TestProfilingNoTimeout(t *testing.T) {
opts := DefaultOptions()
opts.ProfPort = -1
s := RunServer(opts)
defer s.Shutdown()
paddr := s.ProfilerAddr()
if paddr == nil {
t.Fatalf("Profiler not started")
}
pport := paddr.Port
if pport <= 0 {
t.Fatalf("Expected profiler port to be set, got %v", pport)
}
s.mu.Lock()
srv := s.profilingServer
s.mu.Unlock()
if srv == nil {
t.Fatalf("Profiling server not set")
}
if srv.ReadTimeout != 0 {
t.Fatalf("ReadTimeout should not be set, was set to %v", srv.ReadTimeout)
}
if srv.WriteTimeout != 0 {
t.Fatalf("WriteTimeout should not be set, was set to %v", srv.WriteTimeout)
}
}
func TestLameDuckOptionsValidation(t *testing.T) {
o := DefaultOptions()
o.LameDuckDuration = 5 * time.Second
o.LameDuckGracePeriod = 10 * time.Second
s, err := NewServer(o)
if s != nil {
s.Shutdown()
}
if err == nil || !strings.Contains(err.Error(), "should be strictly lower") {
t.Fatalf("Expected error saying that ldm grace period should be lower than ldm duration, got %v", err)
}
}
func testSetLDMGracePeriod(o *Options, val time.Duration) {
// For tests, we set the grace period as a negative value
// so we can have a grace period bigger than the total duration.
// When validating options, we would not be able to run the
// server without this trick.
o.LameDuckGracePeriod = val * -1
}
func TestLameDuckMode(t *testing.T) {
optsA := DefaultOptions()
testSetLDMGracePeriod(optsA, time.Nanosecond)
optsA.Cluster.Host = "127.0.0.1"
srvA := RunServer(optsA)
defer srvA.Shutdown()
// Check that if there is no client, server is shutdown
srvA.lameDuckMode()
srvA.mu.Lock()
shutdown := srvA.shutdown
srvA.mu.Unlock()
if !shutdown {
t.Fatalf("Server should have shutdown")
}
optsA.LameDuckDuration = 10 * time.Nanosecond
srvA = RunServer(optsA)
defer srvA.Shutdown()
optsB := DefaultOptions()
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
srvB := RunServer(optsB)
defer srvB.Shutdown()
checkClusterFormed(t, srvA, srvB)
total := 50
connectClients := func() []*nats.Conn {
ncs := make([]*nats.Conn, 0, total)
for i := 0; i < total; i++ {
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port),
nats.ReconnectWait(50*time.Millisecond))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
ncs = append(ncs, nc)
}
return ncs
}
stopClientsAndSrvB := func(ncs []*nats.Conn) {
for _, nc := range ncs {
nc.Close()
}
srvB.Shutdown()
}
ncs := connectClients()
checkClientsCount(t, srvA, total)
checkClientsCount(t, srvB, 0)
start := time.Now()
srvA.lameDuckMode()
// Make sure that nothing bad happens if called twice
srvA.lameDuckMode()
// Wait that shutdown completes
elapsed := time.Since(start)
// It should have taken more than the allotted time of 10ms since we had 50 clients.
if elapsed <= optsA.LameDuckDuration {
t.Fatalf("Expected to take more than %v, got %v", optsA.LameDuckDuration, elapsed)
}
checkClientsCount(t, srvA, 0)
checkClientsCount(t, srvB, total)
// Check closed status on server A
// Connections are saved in go routines, so although we have evaluated the number
// of connections in the server A to be 0, the polling of connection closed may
// need a bit more time.
checkFor(t, time.Second, 15*time.Millisecond, func() error {
cz := pollConz(t, srvA, 1, "", &ConnzOptions{State: ConnClosed})
if n := len(cz.Conns); n != total {
return fmt.Errorf("expected %v closed connections, got %v", total, n)
}
return nil
})
cz := pollConz(t, srvA, 1, "", &ConnzOptions{State: ConnClosed})
if n := len(cz.Conns); n != total {
t.Fatalf("Expected %v closed connections, got %v", total, n)
}
for _, c := range cz.Conns {
checkReason(t, c.Reason, ServerShutdown)
}
stopClientsAndSrvB(ncs)
optsA.LameDuckDuration = time.Second
srvA = RunServer(optsA)
defer srvA.Shutdown()
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
srvB = RunServer(optsB)
defer srvB.Shutdown()
checkClusterFormed(t, srvA, srvB)
ncs = connectClients()
checkClientsCount(t, srvA, total)
checkClientsCount(t, srvB, 0)
start = time.Now()
go srvA.lameDuckMode()
// Check that while in lameDuckMode, it is not possible to connect
// to the server. Wait to be in LD mode first
checkFor(t, 500*time.Millisecond, 15*time.Millisecond, func() error {
srvA.mu.Lock()
ldm := srvA.ldm
srvA.mu.Unlock()
if !ldm {
return fmt.Errorf("Did not reach lame duck mode")
}
return nil
})
if _, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)); err != nats.ErrNoServers {
t.Fatalf("Expected %v, got %v", nats.ErrNoServers, err)
}
srvA.grWG.Wait()
elapsed = time.Since(start)
checkClientsCount(t, srvA, 0)
checkClientsCount(t, srvB, total)
if elapsed > time.Duration(float64(optsA.LameDuckDuration)*1.1) {
t.Fatalf("Expected to not take more than %v, got %v", optsA.LameDuckDuration, elapsed)
}
stopClientsAndSrvB(ncs)
// Now check that we can shutdown server while in LD mode.
optsA.LameDuckDuration = 60 * time.Second
srvA = RunServer(optsA)
defer srvA.Shutdown()
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
srvB = RunServer(optsB)
defer srvB.Shutdown()
checkClusterFormed(t, srvA, srvB)
ncs = connectClients()
checkClientsCount(t, srvA, total)
checkClientsCount(t, srvB, 0)
start = time.Now()
go srvA.lameDuckMode()
time.Sleep(100 * time.Millisecond)
srvA.Shutdown()
elapsed = time.Since(start)
// Make sure that it did not take that long
if elapsed > time.Second {
t.Fatalf("Took too long: %v", elapsed)
}
checkClientsCount(t, srvA, 0)
checkClientsCount(t, srvB, total)
stopClientsAndSrvB(ncs)
// Now test that we introduce delay before starting closing client connections.
// This allow to "signal" multiple servers and avoid their clients to reconnect
// to a server that is going to be going in LD mode.
testSetLDMGracePeriod(optsA, 100*time.Millisecond)
optsA.LameDuckDuration = 10 * time.Millisecond
srvA = RunServer(optsA)
defer srvA.Shutdown()
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
testSetLDMGracePeriod(optsB, 100*time.Millisecond)
optsB.LameDuckDuration = 10 * time.Millisecond
srvB = RunServer(optsB)
defer srvB.Shutdown()
optsC := DefaultOptions()
optsC.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
testSetLDMGracePeriod(optsC, 100*time.Millisecond)
optsC.LameDuckGracePeriod = -100 * time.Millisecond
optsC.LameDuckDuration = 10 * time.Millisecond
srvC := RunServer(optsC)
defer srvC.Shutdown()
checkClusterFormed(t, srvA, srvB, srvC)
rt := int32(0)
nc, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", optsA.Port),
nats.ReconnectWait(15*time.Millisecond),
nats.ReconnectHandler(func(*nats.Conn) {
atomic.AddInt32(&rt, 1)
}))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
go srvA.lameDuckMode()
// Wait a bit, but less than lameDuckModeInitialDelay that we set in this
// test to 100ms.
time.Sleep(30 * time.Millisecond)
go srvB.lameDuckMode()
srvA.grWG.Wait()
srvB.grWG.Wait()
checkClientsCount(t, srvC, 1)
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
if n := atomic.LoadInt32(&rt); n != 1 {
return fmt.Errorf("Expected client to reconnect only once, got %v", n)
}
return nil
})
}
func TestLameDuckModeInfo(t *testing.T) {
optsA := testWSOptions()
optsA.Cluster.Name = "abc"
optsA.Cluster.Host = "127.0.0.1"
optsA.Cluster.Port = -1
// Ensure that initial delay is set very high so that we can
// check that some events occur as expected before the client
// is disconnected.
testSetLDMGracePeriod(optsA, 5*time.Second)
optsA.LameDuckDuration = 50 * time.Millisecond
optsA.DisableShortFirstPing = true
srvA := RunServer(optsA)
defer srvA.Shutdown()
curla := fmt.Sprintf("127.0.0.1:%d", optsA.Port)
wscurla := fmt.Sprintf("127.0.0.1:%d", optsA.Websocket.Port)
c, err := net.Dial("tcp", curla)
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer c.Close()
client := bufio.NewReaderSize(c, maxBufSize)
wsconn, wsclient := testWSCreateClient(t, false, false, optsA.Websocket.Host, optsA.Websocket.Port)
defer wsconn.Close()
getInfo := func(ws bool) *serverInfo {
t.Helper()
var l string
var err error
if ws {
l = string(testWSReadFrame(t, wsclient))
} else {
l, err = client.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving info from server: %v\n", err)
}
}
var info serverInfo
if err = json.Unmarshal([]byte(l[5:]), &info); err != nil {
t.Fatalf("Could not parse INFO json: %v\n", err)
}
return &info
}
getInfo(false)
c.Write([]byte("CONNECT {\"protocol\":1,\"verbose\":false}\r\nPING\r\n"))
client.ReadString('\n')
optsB := testWSOptions()
optsB.Cluster.Name = "abc"
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
srvB := RunServer(optsB)
defer srvB.Shutdown()
checkClusterFormed(t, srvA, srvB)
checkConnectURLs := func(expected [][]string) *serverInfo {
t.Helper()
var si *serverInfo
for i, ws := range []bool{false, true} {
sort.Strings(expected[i])
si = getInfo(ws)
sort.Strings(si.ConnectURLs)
if !reflect.DeepEqual(expected[i], si.ConnectURLs) {
t.Fatalf("Expected %q, got %q", expected, si.ConnectURLs)
}
}
return si
}
curlb := fmt.Sprintf("127.0.0.1:%d", optsB.Port)
wscurlb := fmt.Sprintf("127.0.0.1:%d", optsB.Websocket.Port)
expected := [][]string{{curla, curlb}, {wscurla, wscurlb}}
checkConnectURLs(expected)
optsC := testWSOptions()
testSetLDMGracePeriod(optsA, 5*time.Second)
optsC.Cluster.Name = "abc"
optsC.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
srvC := RunServer(optsC)
defer srvC.Shutdown()
checkClusterFormed(t, srvA, srvB, srvC)
curlc := fmt.Sprintf("127.0.0.1:%d", optsC.Port)
wscurlc := fmt.Sprintf("127.0.0.1:%d", optsC.Websocket.Port)
expected = [][]string{{curla, curlb, curlc}, {wscurla, wscurlb, wscurlc}}
checkConnectURLs(expected)
optsD := testWSOptions()
optsD.Cluster.Name = "abc"
optsD.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
srvD := RunServer(optsD)
defer srvD.Shutdown()
checkClusterFormed(t, srvA, srvB, srvC, srvD)
curld := fmt.Sprintf("127.0.0.1:%d", optsD.Port)
wscurld := fmt.Sprintf("127.0.0.1:%d", optsD.Websocket.Port)
expected = [][]string{{curla, curlb, curlc, curld}, {wscurla, wscurlb, wscurlc, wscurld}}
checkConnectURLs(expected)
// Now lame duck server A and C. We should have client connected to A
// receive info that A is in LDM without A's URL, but also receive
// an update with C's URL gone.
// But first we need to create a client to C because otherwise the
// LDM signal will just shut it down because it would have no client.
nc, err := nats.Connect(srvC.ClientURL())
if err != nil {
t.Fatalf("Failed to connect: %v", err)
}
defer nc.Close()
nc.Flush()
start := time.Now()
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
srvA.lameDuckMode()
}()
expected = [][]string{{curlb, curlc, curld}, {wscurlb, wscurlc, wscurld}}
si := checkConnectURLs(expected)
if !si.LameDuckMode {
t.Fatal("Expected LameDuckMode to be true, it was not")
}
// Start LDM for server C. This should send an update to A
// which in turn should remove C from the list of URLs and
// update its client.
go func() {
defer wg.Done()
srvC.lameDuckMode()
}()
expected = [][]string{{curlb, curld}, {wscurlb, wscurld}}
si = checkConnectURLs(expected)
// This update should not say that it is LDM.
if si.LameDuckMode {
t.Fatal("Expected LameDuckMode to be false, it was true")
}
// Now shutdown D, and we also should get an update.
srvD.Shutdown()
expected = [][]string{{curlb}, {wscurlb}}
si = checkConnectURLs(expected)
// This update should not say that it is LDM.
if si.LameDuckMode {
t.Fatal("Expected LameDuckMode to be false, it was true")
}
if time.Since(start) > 2*time.Second {
t.Fatalf("Did not get the expected events prior of server A and C shutting down")
}
// Now explicitly shutdown srvA. When a server shutdown, it closes all its
// connections. For routes, it means that it is going to remove the remote's
// URL from its map. We want to make sure that in that case, server does not
// actually send an updated INFO to its clients.
srvA.Shutdown()
// Expect nothing to be received on the client connection.
if l, err := client.ReadString('\n'); err == nil {
t.Fatalf("Expected connection to fail, instead got %q", l)
}
c.Close()
nc.Close()
// Don't need to wait for actual disconnect of clients.
srvC.Shutdown()
wg.Wait()
}
func TestServerValidateGatewaysOptions(t *testing.T) {
baseOpt := testDefaultOptionsForGateway("A")
u, _ := url.Parse("host:5222")
g := &RemoteGatewayOpts{
URLs: []*url.URL{u},
}
baseOpt.Gateway.Gateways = append(baseOpt.Gateway.Gateways, g)
for _, test := range []struct {
name string
opts func() *Options
expectedErr string
}{
{
name: "gateway_has_no_name",
opts: func() *Options {
o := baseOpt.Clone()
o.Gateway.Name = ""
return o
},
expectedErr: "has no name",
},
{
name: "gateway_has_no_port",
opts: func() *Options {
o := baseOpt.Clone()
o.Gateway.Port = 0
return o
},
expectedErr: "no port specified",
},
{
name: "gateway_dst_has_no_name",
opts: func() *Options {
o := baseOpt.Clone()
return o
},
expectedErr: "has no name",
},
{
name: "gateway_dst_urls_is_nil",
opts: func() *Options {
o := baseOpt.Clone()
o.Gateway.Gateways[0].Name = "B"
o.Gateway.Gateways[0].URLs = nil
return o
},
expectedErr: "has no URL",
},
{
name: "gateway_dst_urls_is_empty",
opts: func() *Options {
o := baseOpt.Clone()
o.Gateway.Gateways[0].Name = "B"
o.Gateway.Gateways[0].URLs = []*url.URL{}
return o
},
expectedErr: "has no URL",
},
} {
t.Run(test.name, func(t *testing.T) {
if err := validateOptions(test.opts()); err == nil || !strings.Contains(err.Error(), test.expectedErr) {
t.Fatalf("Expected error about %q, got %v", test.expectedErr, err)
}
})
}
}
func TestAcceptError(t *testing.T) {
o := DefaultOptions()
s := New(o)
s.mu.Lock()
s.running = true
s.mu.Unlock()
defer s.Shutdown()
orgDelay := time.Hour
delay := s.acceptError("Test", fmt.Errorf("any error"), orgDelay)
if delay != orgDelay {
t.Fatalf("With this type of error, delay should have stayed same, got %v", delay)
}
// Create any net.Error and make it a temporary
ne := &net.DNSError{IsTemporary: true}
orgDelay = 10 * time.Millisecond
delay = s.acceptError("Test", ne, orgDelay)
if delay != 2*orgDelay {
t.Fatalf("Expected delay to double, got %v", delay)
}
// Now check the max
orgDelay = 60 * ACCEPT_MAX_SLEEP / 100
delay = s.acceptError("Test", ne, orgDelay)
if delay != ACCEPT_MAX_SLEEP {
t.Fatalf("Expected delay to double, got %v", delay)
}
wg := sync.WaitGroup{}
wg.Add(1)
start := time.Now()
go func() {
s.acceptError("Test", ne, orgDelay)
wg.Done()
}()
time.Sleep(100 * time.Millisecond)
// This should kick out the sleep in acceptError
s.Shutdown()
if dur := time.Since(start); dur >= ACCEPT_MAX_SLEEP {
t.Fatalf("Shutdown took too long: %v", dur)
}
wg.Wait()
if d := s.acceptError("Test", ne, orgDelay); d >= 0 {
t.Fatalf("Expected delay to be negative, got %v", d)
}
}
func TestServerShutdownDuringStart(t *testing.T) {
o := DefaultOptions()
o.ServerName = "server"
o.DisableShortFirstPing = true
o.Accounts = []*Account{NewAccount("$SYS")}
o.SystemAccount = "$SYS"
o.Cluster.Name = "abc"
o.Cluster.Host = "127.0.0.1"
o.Cluster.Port = -1
o.Gateway.Name = "abc"
o.Gateway.Host = "127.0.0.1"
o.Gateway.Port = -1
o.LeafNode.Host = "127.0.0.1"
o.LeafNode.Port = -1
o.Websocket.Host = "127.0.0.1"
o.Websocket.Port = -1
o.Websocket.HandshakeTimeout = 1
o.Websocket.NoTLS = true
o.MQTT.Host = "127.0.0.1"
o.MQTT.Port = -1
// We are going to test that if the server is shutdown
// while Start() runs (in this case, before), we don't
// start the listeners and therefore leave accept loops
// hanging.
s, err := NewServer(o)
if err != nil {
t.Fatalf("Error creating server: %v", err)
}
s.Shutdown()
// Start() should not block, but just in case, start in
// different go routine.
ch := make(chan struct{}, 1)
go func() {
s.Start()
// Since the server has already been shutdown and we don't want to leave
// the ipqLog run() routine running, stop it now.
s.ipqLog.stop()
close(ch)
}()
select {
case <-ch:
case <-time.After(time.Second):
t.Fatal("Start appear to have blocked after server was shutdown")
}
// Now make sure that none of the listeners have been created
listeners := []string{}
s.mu.Lock()
if s.listener != nil {
listeners = append(listeners, "client")
}
if s.routeListener != nil {
listeners = append(listeners, "route")
}
if s.gatewayListener != nil {
listeners = append(listeners, "gateway")
}
if s.leafNodeListener != nil {
listeners = append(listeners, "leafnode")
}
if s.websocket.listener != nil {
listeners = append(listeners, "websocket")
}
if s.mqtt.listener != nil {
listeners = append(listeners, "mqtt")
}
s.mu.Unlock()
if len(listeners) > 0 {
lst := ""
for i, l := range listeners {
if i > 0 {
lst += ", "
}
lst += l
}
t.Fatalf("Following listeners have been created: %s", lst)
}
}
type myDummyDNSResolver struct {
ips []string
err error
}
func (r *myDummyDNSResolver) LookupHost(ctx context.Context, host string) ([]string, error) {
if r.err != nil {
return nil, r.err
}
return r.ips, nil
}
func TestGetRandomIP(t *testing.T) {
s := &Server{}
resolver := &myDummyDNSResolver{}
// no port...
if _, err := s.getRandomIP(resolver, "noport", nil); err == nil || !strings.Contains(err.Error(), "port") {
t.Fatalf("Expected error about port missing, got %v", err)
}
resolver.err = fmt.Errorf("on purpose")
if _, err := s.getRandomIP(resolver, "localhost:4222", nil); err == nil || !strings.Contains(err.Error(), "on purpose") {
t.Fatalf("Expected error about no port, got %v", err)
}
resolver.err = nil
a, err := s.getRandomIP(resolver, "localhost:4222", nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if a != "localhost:4222" {
t.Fatalf("Expected address to be %q, got %q", "localhost:4222", a)
}
resolver.ips = []string{"1.2.3.4"}
a, err = s.getRandomIP(resolver, "localhost:4222", nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if a != "1.2.3.4:4222" {
t.Fatalf("Expected address to be %q, got %q", "1.2.3.4:4222", a)
}
// Check for randomness
resolver.ips = []string{"1.2.3.4", "2.2.3.4", "3.2.3.4"}
dist := [3]int{}
for i := 0; i < 100; i++ {
ip, err := s.getRandomIP(resolver, "localhost:4222", nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
v := int(ip[0]-'0') - 1
dist[v]++
}
low := 20
high := 47
for i, d := range dist {
if d == 0 || d == 100 {
t.Fatalf("Unexpected distribution for ip %v, got %v", i, d)
} else if d < low || d > high {
t.Logf("Warning: out of expected range [%v,%v] for ip %v, got %v", low, high, i, d)
}
}
// Check IP exclusions
excludedIPs := map[string]struct{}{"1.2.3.4:4222": {}}
for i := 0; i < 100; i++ {
ip, err := s.getRandomIP(resolver, "localhost:4222", excludedIPs)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ip[0] == '1' {
t.Fatalf("Should not have returned this ip: %q", ip)
}
}
excludedIPs["2.2.3.4:4222"] = struct{}{}
for i := 0; i < 100; i++ {
ip, err := s.getRandomIP(resolver, "localhost:4222", excludedIPs)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ip[0] != '3' {
t.Fatalf("Should only have returned '3.2.3.4', got returned %q", ip)
}
}
excludedIPs["3.2.3.4:4222"] = struct{}{}
for i := 0; i < 100; i++ {
if _, err := s.getRandomIP(resolver, "localhost:4222", excludedIPs); err != errNoIPAvail {
t.Fatalf("Unexpected error: %v", err)
}
}
// Now check that exclusion takes into account the port number.
resolver.ips = []string{"127.0.0.1"}
excludedIPs = map[string]struct{}{"127.0.0.1:4222": {}}
for i := 0; i < 100; i++ {
if _, err := s.getRandomIP(resolver, "localhost:4223", excludedIPs); err == errNoIPAvail {
t.Fatal("Should not have failed")
}
}
}
type shortWriteConn struct {
net.Conn
}
func (swc *shortWriteConn) Write(b []byte) (int, error) {
// Limit the write to 10 bytes at a time.
short := false
max := len(b)
if max > 10 {
max = 10
short = true
}
n, err := swc.Conn.Write(b[:max])
if err == nil && short {
return n, io.ErrShortWrite
}
return n, err
}
func TestClientWriteLoopStall(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
errCh := make(chan error, 1)
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
nc, err := nats.Connect(url,
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, e error) {
select {
case errCh <- e:
default:
}
}))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
sub, err := nc.SubscribeSync("foo")
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
nc.Flush()
cid, _ := nc.GetClientID()
sender, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sender.Close()
c := s.getClient(cid)
c.mu.Lock()
c.nc = &shortWriteConn{Conn: c.nc}
c.mu.Unlock()
sender.Publish("foo", make([]byte, 100))
if _, err := sub.NextMsg(3 * time.Second); err != nil {
t.Fatalf("WriteLoop has stalled!")
}
// Make sure that we did not get any async error
select {
case e := <-errCh:
t.Fatalf("Got error: %v", e)
case <-time.After(250 * time.Millisecond):
}
}
func TestInsecureSkipVerifyWarning(t *testing.T) {
checkWarnReported := func(t *testing.T, o *Options, expectedWarn string) {
t.Helper()
s, err := NewServer(o)
if err != nil {
t.Fatalf("Error on new server: %v", err)
}
l := &captureWarnLogger{warn: make(chan string, 1)}
s.SetLogger(l, false, false)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
s.Start()
wg.Done()
}()
if err := s.readyForConnections(time.Second); err != nil {
t.Fatal(err)
}
select {
case w := <-l.warn:
if !strings.Contains(w, expectedWarn) {
t.Fatalf("Expected warning %q, got %q", expectedWarn, w)
}
case <-time.After(2 * time.Second):
t.Fatalf("Did not get warning %q", expectedWarn)
}
s.Shutdown()
wg.Wait()
}
tc := &TLSConfigOpts{}
tc.CertFile = "../test/configs/certs/server-cert.pem"
tc.KeyFile = "../test/configs/certs/server-key.pem"
tc.CaFile = "../test/configs/certs/ca.pem"
tc.Insecure = true
config, err := GenTLSConfig(tc)
if err != nil {
t.Fatalf("Error generating tls config: %v", err)
}
o := DefaultOptions()
o.Cluster.Name = "A"
o.Cluster.Port = -1
o.Cluster.TLSConfig = config.Clone()
checkWarnReported(t, o, clusterTLSInsecureWarning)
// Remove the route setting
o.Cluster.Port = 0
o.Cluster.TLSConfig = nil
// Configure LeafNode with no TLS in the main block first, but only with remotes.
o.LeafNode.Port = -1
rurl, _ := url.Parse("nats://127.0.0.1:1234")
o.LeafNode.Remotes = []*RemoteLeafOpts{
{
URLs: []*url.URL{rurl},
TLSConfig: config.Clone(),
},
}
checkWarnReported(t, o, leafnodeTLSInsecureWarning)
// Now add to main block.
o.LeafNode.TLSConfig = config.Clone()
checkWarnReported(t, o, leafnodeTLSInsecureWarning)
// Now remove remote and check warning still reported
o.LeafNode.Remotes = nil
checkWarnReported(t, o, leafnodeTLSInsecureWarning)
// Remove the LN setting
o.LeafNode.Port = 0
o.LeafNode.TLSConfig = nil
// Configure GW with no TLS in main block first, but only with remotes
o.Gateway.Name = "A"
o.Gateway.Host = "127.0.0.1"
o.Gateway.Port = -1
o.Gateway.Gateways = []*RemoteGatewayOpts{
{
Name: "B",
URLs: []*url.URL{rurl},
TLSConfig: config.Clone(),
},
}
checkWarnReported(t, o, gatewayTLSInsecureWarning)
// Now add to main block.
o.Gateway.TLSConfig = config.Clone()
checkWarnReported(t, o, gatewayTLSInsecureWarning)
// Now remove remote and check warning still reported
o.Gateway.Gateways = nil
checkWarnReported(t, o, gatewayTLSInsecureWarning)
}
func TestConnectErrorReports(t *testing.T) {
// On Windows, an attempt to connect to a port that has no listener will
// take whatever timeout specified in DialTimeout() before failing.
// So skip for now.
if runtime.GOOS == "windows" {
t.Skip()
}
// Check that default report attempts is as expected
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
if ra := s.getOpts().ConnectErrorReports; ra != DEFAULT_CONNECT_ERROR_REPORTS {
t.Fatalf("Expected default value to be %v, got %v", DEFAULT_CONNECT_ERROR_REPORTS, ra)
}
tmpFile := createFile(t, "")
log := tmpFile.Name()
tmpFile.Close()
defer removeFile(t, log)
remoteURLs := RoutesFromStr("nats://127.0.0.1:1234")
opts = DefaultOptions()
opts.ConnectErrorReports = 3
opts.Cluster.Port = -1
opts.Routes = remoteURLs
opts.NoLog = false
opts.LogFile = log
opts.Logtime = true
opts.Debug = true
s = RunServer(opts)
defer s.Shutdown()
checkContent := func(t *testing.T, txt string, attempt int, shouldBeThere bool) {
t.Helper()
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
content, err := ioutil.ReadFile(log)
if err != nil {
return fmt.Errorf("Error reading log file: %v", err)
}
present := bytes.Contains(content, []byte(fmt.Sprintf("%s (attempt %d)", txt, attempt)))
if shouldBeThere && !present {
return fmt.Errorf("Did not find expected log statement (%s) for attempt %d: %s", txt, attempt, content)
} else if !shouldBeThere && present {
return fmt.Errorf("Log statement (%s) for attempt %d should not be present: %s", txt, attempt, content)
}
return nil
})
}
type testConnect struct {
name string
attempt int
errExpected bool
}
for _, test := range []testConnect{
{"route_attempt_1", 1, true},
{"route_attempt_2", 2, false},
{"route_attempt_3", 3, true},
{"route_attempt_4", 4, false},
{"route_attempt_6", 6, true},
{"route_attempt_7", 7, false},
} {
t.Run(test.name, func(t *testing.T) {
debugExpected := !test.errExpected
checkContent(t, "[DBG] Error trying to connect to route", test.attempt, debugExpected)
checkContent(t, "[ERR] Error trying to connect to route", test.attempt, test.errExpected)
})
}
s.Shutdown()
removeFile(t, log)
// Now try with leaf nodes
opts.Cluster.Port = 0
opts.Routes = nil
opts.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{remoteURLs[0]}}}
opts.LeafNode.ReconnectInterval = 15 * time.Millisecond
s = RunServer(opts)
defer s.Shutdown()
checkLeafContent := func(t *testing.T, txt, host string, attempt int, shouldBeThere bool) {
t.Helper()
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
content, err := ioutil.ReadFile(log)
if err != nil {
return fmt.Errorf("Error reading log file: %v", err)
}
present := bytes.Contains(content, []byte(fmt.Sprintf("%s %q (attempt %d)", txt, host, attempt)))
if shouldBeThere && !present {
return fmt.Errorf("Did not find expected log statement (%s %q) for attempt %d: %s", txt, host, attempt, content)
} else if !shouldBeThere && present {
return fmt.Errorf("Log statement (%s %q) for attempt %d should not be present: %s", txt, host, attempt, content)
}
return nil
})
}
for _, test := range []testConnect{
{"leafnode_attempt_1", 1, true},
{"leafnode_attempt_2", 2, false},
{"leafnode_attempt_3", 3, true},
{"leafnode_attempt_4", 4, false},
{"leafnode_attempt_6", 6, true},
{"leafnode_attempt_7", 7, false},
} {
t.Run(test.name, func(t *testing.T) {
debugExpected := !test.errExpected
checkLeafContent(t, "[DBG] Error trying to connect as leafnode to remote server", remoteURLs[0].Host, test.attempt, debugExpected)
checkLeafContent(t, "[ERR] Error trying to connect as leafnode to remote server", remoteURLs[0].Host, test.attempt, test.errExpected)
})
}
s.Shutdown()
removeFile(t, log)
// Now try with gateways
opts.LeafNode.Remotes = nil
opts.Cluster.Name = "A"
opts.Gateway.Name = "A"
opts.Gateway.Port = -1
opts.Gateway.Gateways = []*RemoteGatewayOpts{
{
Name: "B",
URLs: remoteURLs,
},
}
opts.gatewaysSolicitDelay = 15 * time.Millisecond
s = RunServer(opts)
defer s.Shutdown()
for _, test := range []testConnect{
{"gateway_attempt_1", 1, true},
{"gateway_attempt_2", 2, false},
{"gateway_attempt_3", 3, true},
{"gateway_attempt_4", 4, false},
{"gateway_attempt_6", 6, true},
{"gateway_attempt_7", 7, false},
} {
t.Run(test.name, func(t *testing.T) {
debugExpected := !test.errExpected
infoExpected := test.errExpected
// For gateways, we also check our notice that we attempt to connect
checkContent(t, "[DBG] Connecting to explicit gateway \"B\" (127.0.0.1:1234) at 127.0.0.1:1234", test.attempt, debugExpected)
checkContent(t, "[INF] Connecting to explicit gateway \"B\" (127.0.0.1:1234) at 127.0.0.1:1234", test.attempt, infoExpected)
checkContent(t, "[DBG] Error connecting to explicit gateway \"B\" (127.0.0.1:1234) at 127.0.0.1:1234", test.attempt, debugExpected)
checkContent(t, "[ERR] Error connecting to explicit gateway \"B\" (127.0.0.1:1234) at 127.0.0.1:1234", test.attempt, test.errExpected)
})
}
}
func TestReconnectErrorReports(t *testing.T) {
// On Windows, an attempt to connect to a port that has no listener will
// take whatever timeout specified in DialTimeout() before failing.
// So skip for now.
if runtime.GOOS == "windows" {
t.Skip()
}
// Check that default report attempts is as expected
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
if ra := s.getOpts().ReconnectErrorReports; ra != DEFAULT_RECONNECT_ERROR_REPORTS {
t.Fatalf("Expected default value to be %v, got %v", DEFAULT_RECONNECT_ERROR_REPORTS, ra)
}
tmpFile := createFile(t, "")
log := tmpFile.Name()
tmpFile.Close()
defer removeFile(t, log)
csOpts := DefaultOptions()
csOpts.Cluster.Port = -1
cs := RunServer(csOpts)
defer cs.Shutdown()
opts = DefaultOptions()
opts.ReconnectErrorReports = 3
opts.Cluster.Port = -1
opts.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", cs.ClusterAddr().Port))
opts.NoLog = false
opts.LogFile = log
opts.Logtime = true
opts.Debug = true
s = RunServer(opts)
defer s.Shutdown()
// Wait for cluster to be formed
checkClusterFormed(t, s, cs)
// Now shutdown the server s connected to.
cs.Shutdown()
// Specifically for route test, wait at least reconnect interval before checking logs
time.Sleep(DEFAULT_ROUTE_RECONNECT)
checkContent := func(t *testing.T, txt string, attempt int, shouldBeThere bool) {
t.Helper()
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
content, err := ioutil.ReadFile(log)
if err != nil {
return fmt.Errorf("Error reading log file: %v", err)
}
present := bytes.Contains(content, []byte(fmt.Sprintf("%s (attempt %d)", txt, attempt)))
if shouldBeThere && !present {
return fmt.Errorf("Did not find expected log statement (%s) for attempt %d: %s", txt, attempt, content)
} else if !shouldBeThere && present {
return fmt.Errorf("Log statement (%s) for attempt %d should not be present: %s", txt, attempt, content)
}
return nil
})
}
type testConnect struct {
name string
attempt int
errExpected bool
}
for _, test := range []testConnect{
{"route_attempt_1", 1, true},
{"route_attempt_2", 2, false},
{"route_attempt_3", 3, true},
{"route_attempt_4", 4, false},
{"route_attempt_6", 6, true},
{"route_attempt_7", 7, false},
} {
t.Run(test.name, func(t *testing.T) {
debugExpected := !test.errExpected
checkContent(t, "[DBG] Error trying to connect to route", test.attempt, debugExpected)
checkContent(t, "[ERR] Error trying to connect to route", test.attempt, test.errExpected)
})
}
s.Shutdown()
removeFile(t, log)
// Now try with leaf nodes
csOpts.Cluster.Port = 0
csOpts.LeafNode.Host = "127.0.0.1"
csOpts.LeafNode.Port = -1
cs = RunServer(csOpts)
defer cs.Shutdown()
opts.Cluster.Port = 0
opts.Routes = nil
u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", csOpts.LeafNode.Port))
opts.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
opts.LeafNode.ReconnectInterval = 15 * time.Millisecond
s = RunServer(opts)
defer s.Shutdown()
checkLeafNodeConnected(t, s)
// Now shutdown the server s is connected to
cs.Shutdown()
checkLeafContent := func(t *testing.T, txt, host string, attempt int, shouldBeThere bool) {
t.Helper()
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
content, err := ioutil.ReadFile(log)
if err != nil {
return fmt.Errorf("Error reading log file: %v", err)
}
present := bytes.Contains(content, []byte(fmt.Sprintf("%s %q (attempt %d)", txt, host, attempt)))
if shouldBeThere && !present {
return fmt.Errorf("Did not find expected log statement (%s %q) for attempt %d: %s", txt, host, attempt, content)
} else if !shouldBeThere && present {
return fmt.Errorf("Log statement (%s %q) for attempt %d should not be present: %s", txt, host, attempt, content)
}
return nil
})
}
for _, test := range []testConnect{
{"leafnode_attempt_1", 1, true},
{"leafnode_attempt_2", 2, false},
{"leafnode_attempt_3", 3, true},
{"leafnode_attempt_4", 4, false},
{"leafnode_attempt_6", 6, true},
{"leafnode_attempt_7", 7, false},
} {
t.Run(test.name, func(t *testing.T) {
debugExpected := !test.errExpected
checkLeafContent(t, "[DBG] Error trying to connect as leafnode to remote server", u.Host, test.attempt, debugExpected)
checkLeafContent(t, "[ERR] Error trying to connect as leafnode to remote server", u.Host, test.attempt, test.errExpected)
})
}
s.Shutdown()
removeFile(t, log)
// Now try with gateways
csOpts.LeafNode.Port = 0
csOpts.Cluster.Name = "B"
csOpts.Gateway.Name = "B"
csOpts.Gateway.Port = -1
cs = RunServer(csOpts)
opts.LeafNode.Remotes = nil
opts.Cluster.Name = "A"
opts.Gateway.Name = "A"
opts.Gateway.Port = -1
remoteGWPort := cs.GatewayAddr().Port
u, _ = url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", remoteGWPort))
opts.Gateway.Gateways = []*RemoteGatewayOpts{
{
Name: "B",
URLs: []*url.URL{u},
},
}
opts.gatewaysSolicitDelay = 15 * time.Millisecond
s = RunServer(opts)
defer s.Shutdown()
waitForOutboundGateways(t, s, 1, 2*time.Second)
waitForInboundGateways(t, s, 1, 2*time.Second)
// Now stop server s is connecting to
cs.Shutdown()
connTxt := fmt.Sprintf("Connecting to explicit gateway \"B\" (127.0.0.1:%d) at 127.0.0.1:%d", remoteGWPort, remoteGWPort)
dbgConnTxt := fmt.Sprintf("[DBG] %s", connTxt)
infConnTxt := fmt.Sprintf("[INF] %s", connTxt)
errTxt := fmt.Sprintf("Error connecting to explicit gateway \"B\" (127.0.0.1:%d) at 127.0.0.1:%d", remoteGWPort, remoteGWPort)
dbgErrTxt := fmt.Sprintf("[DBG] %s", errTxt)
errErrTxt := fmt.Sprintf("[ERR] %s", errTxt)
for _, test := range []testConnect{
{"gateway_attempt_1", 1, true},
{"gateway_attempt_2", 2, false},
{"gateway_attempt_3", 3, true},
{"gateway_attempt_4", 4, false},
{"gateway_attempt_6", 6, true},
{"gateway_attempt_7", 7, false},
} {
t.Run(test.name, func(t *testing.T) {
debugExpected := !test.errExpected
infoExpected := test.errExpected
// For gateways, we also check our notice that we attempt to connect
checkContent(t, dbgConnTxt, test.attempt, debugExpected)
checkContent(t, infConnTxt, test.attempt, infoExpected)
checkContent(t, dbgErrTxt, test.attempt, debugExpected)
checkContent(t, errErrTxt, test.attempt, test.errExpected)
})
}
}
func TestServerLogsConfigurationFile(t *testing.T) {
tmpDir := createDir(t, "_nats-server")
defer removeDir(t, tmpDir)
file := createFileAtDir(t, tmpDir, "nats_server_log_")
file.Close()
conf := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
logfile: '%s'
`, file.Name())))
defer removeFile(t, conf)
o := LoadConfig(conf)
o.ConfigFile = file.Name()
o.NoLog = false
s := RunServer(o)
s.Shutdown()
log, err := ioutil.ReadFile(file.Name())
if err != nil {
t.Fatalf("Error reading log file: %v", err)
}
if !bytes.Contains(log, []byte(fmt.Sprintf("Using configuration file: %s", file.Name()))) {
t.Fatalf("Config file location was not reported in log: %s", log)
}
}
func TestServerIPQueueLogger(t *testing.T) {
o := DefaultOptions()
s := RunServer(o)
defer s.Shutdown()
l := &captureWarnLogger{warn: make(chan string, 100)}
s.SetLogger(l, false, false)
q := newIPQueue(ipQueue_Logger("test", s.ipqLog))
// Normally, lt is immutable and set to ipQueueDefaultWarnThreshold, but
// for test, we set it to a low value.
q.lt = 2
q.push(1)
// This one should case a warning
q.push(2)
for {
select {
case w := <-l.warn:
// In case we get other warnings a runtime, just check that we
// get the one we expect and be done.
if strings.Contains(w, "test queue") {
if strings.Contains(w, "test queue pending size: 2") {
return
}
t.Fatalf("Invalid warning: %v", w)
}
case <-time.After(time.Second):
t.Fatal("Did not get warning")
}
}
}