Files
nats-server/test/gateway_test.go
Waldemar Quevedo d366027bbf Fix resetting TLS name from solicited remotes
In Go 1.20+, the x509.HostnameError is wrapped in a
tls.CertificateVerificationError, so sometimes the name would not
be reset, causing tests to be extra flaky.

Signed-off-by: Waldemar Quevedo <wally@nats.io>
2023-08-28 10:09:55 -07:00

838 lines
23 KiB
Go

// Copyright 2018-2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"bufio"
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"net/url"
"regexp"
"testing"
"time"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
)
// testDefaultOptionsForGateway returns a pointer to a fresh copy of
// DefaultTestOptions set up for a gateway test: the cluster and the gateway
// are both named `name`, the gateway host is 127.0.0.1, and the client and
// gateway ports are -1 (presumably "let the server pick a port" -- confirm
// against the server options documentation).
func testDefaultOptionsForGateway(name string) *server.Options {
	opts := DefaultTestOptions
	opts.Port, opts.Gateway.Port = -1, -1
	opts.Cluster.Name, opts.Gateway.Name = name, name
	opts.Gateway.Host = "127.0.0.1"
	return &opts
}
// runGatewayServer starts a server with the given options through RunServer
// and hands the resulting server back to the caller.
func runGatewayServer(o *server.Options) *server.Server {
	return RunServer(o)
}
// createGatewayConn opens a connection to host:port by delegating to
// createClientConn. It is marked as a test helper so failures are reported
// at the caller's line.
func createGatewayConn(t testing.TB, host string, port int) net.Conn {
	t.Helper()
	conn := createClientConn(t, host, port)
	return conn
}
// setupGatewayConn performs the gateway handshake on c on behalf of the
// origin gateway `org`.
//
// It fails the test if the INFO obtained from checkInfoMsg does not name
// `dst` as the gateway, then sends a CONNECT followed by an INFO, both
// carrying `org` as the gateway name. The returned functions send to and
// expect from c respectively.
func setupGatewayConn(t testing.TB, c net.Conn, org, dst string) (sendFun, expectFun) {
	t.Helper()

	// Make sure we reached the gateway we intended to talk to.
	if got := checkInfoMsg(t, c).Gateway; got != dst {
		t.Fatalf("Expected to connect to %q, got %q", dst, got)
	}

	connectProto := fmt.Sprintf("CONNECT {\"verbose\":%v,\"pedantic\":%v,\"tls_required\":%v,\"gateway\":%q}\r\n",
		false, false, false, org)
	infoProto := fmt.Sprintf("INFO {\"gateway\":%q}\r\n", org)

	// CONNECT goes first, then our own INFO.
	sendProto(t, c, connectProto)
	sendProto(t, c, infoProto)

	send := sendCommand(t, c)
	expect := expectCommand(t, c)
	return send, expect
}
// expectNumberOfProtos reads through expFn until exactly `expected` matches
// of proto have been observed.
//
// After each read, everything matching one of the `ignore` expressions is
// removed from the accumulated data before counting. The test fails if more
// than `expected` matches show up, or if unmatched data is left once the
// expected count has been reached.
func expectNumberOfProtos(t *testing.T, expFn expectFun, proto *regexp.Regexp, expected int, ignore ...*regexp.Regexp) {
	t.Helper()

	var (
		pending []byte // data read so far that has not been accounted for
		seen    int    // number of proto matches counted so far
	)
	for seen != expected {
		pending = append(pending, expFn(anyRe)...)
		// Drop what the caller asked us to disregard before counting.
		for _, re := range ignore {
			pending = re.ReplaceAll(pending, []byte(``))
		}
		seen += len(proto.FindAll(pending, -1))
		if seen > expected {
			t.Fatalf("Expected %v matches, got %v", expected, seen)
		}
		// Remove what was just counted so it is not counted again on the
		// next iteration; anything left may be a partial protocol.
		pending = proto.ReplaceAll(pending, []byte(``))
	}
	if len(pending) > 0 {
		t.Fatalf("did not consume everything, left with: %q", pending)
	}
}
// TestGatewayAccountInterest talks to server B over raw gateway connections
// posing as gateways "A" and "C", and checks when B sends account
// no-interest (A-) and account interest (A+) protocols for account $foo.
func TestGatewayAccountInterest(t *testing.T) {
	server.GatewayDoNotForceInterestOnlyMode(true)
	defer server.GatewayDoNotForceInterestOnlyMode(false)

	optsB := testDefaultOptionsForGateway("B")
	srvB := runGatewayServer(optsB)
	defer srvB.Shutdown()

	// Message published by a remote gateway on account $foo, subject foo.
	const fooMsg = "RMSG $foo foo 2\r\nok\r\n"

	// dialB opens a new raw connection to B's gateway port.
	dialB := func() net.Conn {
		return createGatewayConn(t, optsB.Gateway.Host, optsB.Gateway.Port)
	}
	// handshake completes the gateway handshake as `origin` and then does
	// one PING/PONG round trip.
	handshake := func(conn net.Conn, origin string) (sendFun, expectFun) {
		send, expect := setupGatewayConn(t, conn, origin, "B")
		send("PING\r\n")
		expect(pongRe)
		return send, expect
	}

	gwA := dialB()
	defer gwA.Close()
	sendA, expectA := handshake(gwA, "A")

	// Send a bunch of messages. On the first one, "B" will send an A-
	// protocol.
	for n := 0; n < 100; n++ {
		sendA(fooMsg)
	}
	// A single A- followed by PONG is expected. If "B" was sending more,
	// this expect call would fail.
	expectA(aunsubRe)
	sendA("PING\r\n")
	expectA(pongRe)

	// Gateway C now connects to B as well.
	gwC := dialB()
	defer gwC.Close()
	sendC, expectC := handshake(gwC, "C")

	// More messages, this time from C: C should get an A-, but A should
	// not since it already got one.
	for n := 0; n < 100; n++ {
		sendC(fooMsg)
	}
	expectC(aunsubRe)
	sendC("PING\r\n")
	expectC(pongRe)
	expectNothing(t, gwA)

	// Restart gateway C and resend one message: it must receive an A-
	// again (things get cleared on reconnect).
	gwC.Close()
	gwC = dialB()
	defer gwC.Close()
	sendC, expectC = handshake(gwC, "C")
	sendC(fooMsg)
	expectC(aunsubRe)
	sendC("PING\r\n")
	expectC(pongRe)
	expectNothing(t, gwA)

	// Close and re-create C once more, but this time send no message.
	gwC.Close()
	gwC = dialB()
	defer gwC.Close()
	handshake(gwC, "C")

	// Register the $foo account on B and create a subscription. A should
	// receive an A+ because B knows it previously sent it an A-; C was
	// never sent an A- on this connection, so it should not get an A+.
	client := createClientConn(t, optsB.Host, optsB.Port)
	defer client.Close()
	clientSend, clientExpect := setupConnWithAccount(t, srvB, client, "$foo")
	clientSend("SUB not.used 1234567\r\nPING\r\n")
	clientExpect(pongRe)

	expectA(asubRe)
	expectNothing(t, gwC)
}
// TestGatewaySubjectInterest drives server B over raw gateway connections
// posing as gateways "A" and "C" and checks which subject-level interest
// protocols (RS- / RS+) B sends while a client bound to account $foo adds
// and removes subscriptions.
func TestGatewaySubjectInterest(t *testing.T) {
server.GatewayDoNotForceInterestOnlyMode(true)
defer server.GatewayDoNotForceInterestOnlyMode(false)
ob := testDefaultOptionsForGateway("B")
// Account $foo with a single user, so the client below is bound to it.
fooAcc := server.NewAccount("$foo")
ob.Accounts = []*server.Account{fooAcc}
ob.Users = []*server.User{{Username: "ivan", Password: "password", Account: fooAcc}}
sb := runGatewayServer(ob)
defer sb.Shutdown()
// Create a client on B
client := createClientConn(t, ob.Host, ob.Port)
defer client.Close()
clientSend, clientExpect := setupConnWithUserPass(t, client, "ivan", "password")
// Since we want to test RS+/-, we need to have at
// least a subscription on B so that sending from A does
// not result in A-
clientSend("SUB not.used 1234567\r\nPING\r\n")
clientExpect(pongRe)
gA := createGatewayConn(t, ob.Gateway.Host, ob.Gateway.Port)
defer gA.Close()
gASend, gAExpect := setupGatewayConn(t, gA, "A", "B")
gASend("PING\r\n")
gAExpect(pongRe)
// Send the same message many times from A on a subject without interest.
for i := 0; i < 100; i++ {
gASend("RMSG $foo foo 2\r\nok\r\n")
}
// We expect single RS- followed by PONG. If "B" was sending more
// this expect call would fail.
gAExpect(runsubRe)
gASend("PING\r\n")
gAExpect(pongRe)
// Start gateway C that connects to B
gC := createGatewayConn(t, ob.Gateway.Host, ob.Gateway.Port)
defer gC.Close()
gCSend, gCExpect := setupGatewayConn(t, gC, "C", "B")
gCSend("PING\r\n")
gCExpect(pongRe)
// Send more messages, C should get RS-, but A should not (already
// got it).
for i := 0; i < 100; i++ {
gCSend("RMSG $foo foo 2\r\nok\r\n")
}
gCExpect(runsubRe)
gCSend("PING\r\n")
gCExpect(pongRe)
expectNothing(t, gA)
// Restart one of the gateway, and resend a message, verify
// that it receives RS- (things get cleared on reconnect)
gC.Close()
gC = createGatewayConn(t, ob.Gateway.Host, ob.Gateway.Port)
defer gC.Close()
gCSend, gCExpect = setupGatewayConn(t, gC, "C", "B")
gCSend("PING\r\n")
gCExpect(pongRe)
gCSend("RMSG $foo foo 2\r\nok\r\n")
gCExpect(runsubRe)
gCSend("PING\r\n")
gCExpect(pongRe)
expectNothing(t, gA)
// Close again and re-create, but this time don't send anything.
gC.Close()
gC = createGatewayConn(t, ob.Gateway.Host, ob.Gateway.Port)
defer gC.Close()
gCSend, gCExpect = setupGatewayConn(t, gC, "C", "B")
gCSend("PING\r\n")
gCExpect(pongRe)
// Now register a subscription on foo for account $foo on B.
// A should receive a RS+ because B knows that it previously
// sent a RS-, but since it did not send one to C, C should
// not receive the RS+.
clientSend("SUB foo 1\r\nSUB foo 2\r\n")
// Also subscribe to subject that was not used before,
// so there should be no RS+ for this one.
clientSend("SUB bar 3\r\nPING\r\n")
clientExpect(pongRe)
gAExpect(rsubRe)
expectNothing(t, gC)
// Check that we get only one protocol
expectNothing(t, gA)
// Unsubscribe the 2 subs on foo, expect to receive nothing.
clientSend("UNSUB 1\r\nUNSUB 2\r\nPING\r\n")
clientExpect(pongRe)
expectNothing(t, gC)
expectNothing(t, gA)
// C is not used in the remainder of the test.
gC.Close()
// Send on foo, should get an RS-
gASend("RMSG $foo foo 2\r\nok\r\n")
gAExpect(runsubRe)
// Subscribe on foo, should get an RS+ that removes the no-interest
clientSend("SUB foo 4\r\nPING\r\n")
clientExpect(pongRe)
gAExpect(rsubRe)
// Send on bar, message should be received.
gASend("RMSG $foo bar 2\r\nok\r\n")
clientExpect(msgRe)
// Unsub foo and bar
clientSend("UNSUB 3\r\nUNSUB 4\r\nPING\r\n")
clientExpect(pongRe)
expectNothing(t, gA)
// Send on both foo and bar expect RS-
gASend("RMSG $foo foo 2\r\nok\r\n")
gAExpect(runsubRe)
gASend("RMSG $foo bar 2\r\nok\r\n")
gAExpect(runsubRe)
// Now have client create sub on "*", this should cause RS+ on *
// The remote will have cleared its no-interest on foo and bar
// and this receiving side is supposed to be doing the same.
clientSend("SUB * 5\r\nPING\r\n")
clientExpect(pongRe)
buf := gAExpect(rsubRe)
// The RS+ must be for the wildcard subject in account $foo.
if !bytes.Contains(buf, []byte("$foo *")) {
t.Fatalf("Expected RS+ on %q, got %q", "*", buf)
}
// Check that the remote has cleared by sending from the client
// on foo and bar
clientSend("PUB foo 2\r\nok\r\n")
clientExpect(msgRe)
clientSend("PUB bar 2\r\nok\r\n")
clientExpect(msgRe)
// Check that A can send too and does not receive an RS-
gASend("RMSG $foo foo 2\r\nok\r\n")
expectNothing(t, gA)
clientExpect(msgRe)
gASend("RMSG $foo bar 2\r\nok\r\n")
expectNothing(t, gA)
clientExpect(msgRe)
}
// TestGatewayQueue checks the RS+/RS- protocols that server B sends to a
// remote gateway "A" (simulated with a raw connection) as a client on
// account $foo creates and removes queue subscriptions, and that existing
// queue interest is re-sent when the gateway connection is re-created.
func TestGatewayQueue(t *testing.T) {
	server.GatewayDoNotForceInterestOnlyMode(true)
	defer server.GatewayDoNotForceInterestOnlyMode(false)

	ob := testDefaultOptionsForGateway("B")
	// Account $foo with a single user, so the client below is bound to it.
	fooAcc := server.NewAccount("$foo")
	ob.Accounts = []*server.Account{fooAcc}
	ob.Users = []*server.User{{Username: "ivan", Password: "password", Account: fooAcc}}
	sb := runGatewayServer(ob)
	defer sb.Shutdown()

	gA := createGatewayConn(t, ob.Gateway.Host, ob.Gateway.Port)
	defer gA.Close()
	gASend, gAExpect := setupGatewayConn(t, gA, "A", "B")
	gASend("PING\r\n")
	gAExpect(pongRe)

	client := createClientConn(t, ob.Host, ob.Port)
	defer client.Close()
	clientSend, clientExpect := setupConnWithUserPass(t, client, "ivan", "password")

	// Create one queue sub on foo.* for group bar.
	clientSend("SUB foo.* bar 1\r\nPING\r\n")
	clientExpect(pongRe)
	// Expect RS+
	gAExpect(rsubRe)

	// Add another queue sub on same group
	clientSend("SUB foo.* bar 2\r\nPING\r\n")
	clientExpect(pongRe)
	// Should not receive another RS+ for that one
	expectNothing(t, gA)

	// However, if subject is different, we can expect to receive another RS+
	clientSend("SUB foo.> bar 3\r\nPING\r\n")
	clientExpect(pongRe)
	gAExpect(rsubRe)

	// Unsub one of the foo.* qsub, no RS- should be received
	clientSend("UNSUB 1\r\nPING\r\n")
	clientExpect(pongRe)
	expectNothing(t, gA)

	// Remove the other one, now we should get the RS-
	clientSend("UNSUB 2\r\nPING\r\n")
	clientExpect(pongRe)
	gAExpect(runsubRe)

	// Remove last one
	clientSend("UNSUB 3\r\nPING\r\n")
	clientExpect(pongRe)
	gAExpect(runsubRe)

	// Create some queues and check that interest is sent
	// when GW reconnects.
	clientSend("SUB foo bar 4\r\n")
	gAExpect(rsubRe)
	clientSend("SUB foo baz 5\r\n")
	gAExpect(rsubRe)
	clientSend("SUB foo bat 6\r\n")
	gAExpect(rsubRe)
	// There is already one on foo/bar, so nothing sent
	clientSend("SUB foo bar 7\r\n")
	expectNothing(t, gA)
	// Add regular sub that should not cause RS+
	clientSend("SUB foo 8\r\n")
	expectNothing(t, gA)

	// Recreate gA
	gA.Close()
	gA = createGatewayConn(t, ob.Gateway.Host, ob.Gateway.Port)
	defer gA.Close()
	gASend, gAExpect = setupGatewayConn(t, gA, "A", "B")
	// A should receive 3 RS+
	expectNumberOfProtos(t, gAExpect, rsubRe, 3)
	// Nothing more
	expectNothing(t, gA)
	gASend("PING\r\n")
	gAExpect(pongRe)

	// Have A send a message on subject that has no sub
	gASend("RMSG $foo new.subject 2\r\nok\r\n")
	gAExpect(runsubRe)

	// Now create a queue sub and check that we do not receive
	// an RS+ without the queue name.
	clientSend("SUB new.* queue 9\r\nPING\r\n")
	clientExpect(pongRe)
	buf := gAExpect(rsubRe)
	if !bytes.Contains(buf, []byte("new.* queue")) {
		// %q prints the received protocol as text; %v on a []byte would
		// print a list of decimal byte values.
		t.Fatalf("Should have receives RS+ for new.* for queue, did not: %q", buf)
	}
	// Check for no other RS+. A should still keep an RS- for plain
	// sub on new.subject
	expectNothing(t, gA)

	// Send message, expected to be received by client
	gASend("RMSG $foo new.subject | queue 2\r\nok\r\n")
	clientExpect(msgRe)

	// Unsubscribe the queue sub
	clientSend("UNSUB 9\r\nPING\r\n")
	clientExpect(pongRe)
	// A should receive RS- for this queue sub
	buf = gAExpect(runsubRe)
	if !bytes.Contains(buf, []byte("new.* queue")) {
		t.Fatalf("Should have receives RS- for new.* for queue, did not: %q", buf)
	}
	expectNothing(t, gA)
}
// TestGatewaySendAllSubs sends messages on many distinct subjects without
// interest from a simulated gateway "A" to server B, and checks that B
// eventually sends an INFO, its RS+ protocol(s), and a second INFO, after
// which every new sub/unsub on B is forwarded to A.
func TestGatewaySendAllSubs(t *testing.T) {
	server.GatewayDoNotForceInterestOnlyMode(true)
	defer server.GatewayDoNotForceInterestOnlyMode(false)

	ob := testDefaultOptionsForGateway("B")
	sb := runGatewayServer(ob)
	defer sb.Shutdown()

	// Create a client on B
	client := createClientConn(t, ob.Host, ob.Port)
	defer client.Close()
	clientSend, clientExpect := setupConn(t, client)
	// Since we want to test RS+/-, we need to have at
	// least a subscription on B so that sending from A does
	// not result in A-
	clientSend("SUB not.used 1234567\r\nPING\r\n")
	clientExpect(pongRe)

	gA := createGatewayConn(t, ob.Gateway.Host, ob.Gateway.Port)
	defer gA.Close()
	gASend, gAExpect := setupGatewayConn(t, gA, "A", "B")
	gASend("PING\r\n")
	gAExpect(pongRe)

	// Bombard B with messages on different subjects.
	// TODO(ik): Adapt if/when we change the conditions for the
	// switch.
	for i := 0; i < 1010; i++ {
		gASend(fmt.Sprintf("RMSG $G foo.%d 2\r\nok\r\n", i))
		// Only the first 1000 messages are expected to be answered with
		// an RS-.
		if i < 1000 {
			gAExpect(runsubRe)
		}
	}

	// Expect an INFO + RS+ $G not.used + INFO
	buf := bufio.NewReader(gA)
	for i := 0; i < 3; i++ {
		line, _, err := buf.ReadLine()
		if err != nil {
			t.Fatalf("Error reading: %v", err)
		}
		switch i {
		// Go cases do not fall through, so the first and last lines must
		// share one case to both be checked as INFO.
		case 0, 2:
			if !bytes.HasPrefix(line, []byte("INFO {")) {
				t.Fatalf("Expected INFO, got: %s", line)
			}
		case 1:
			if !bytes.HasPrefix(line, []byte("RS+ ")) {
				t.Fatalf("Expected RS+, got: %s", line)
			}
		}
	}

	// After this point, any new sub or unsub on B should be
	// sent to A.
	clientSend("SUB foo 1\r\n")
	gAExpect(rsubRe)
	clientSend("UNSUB 1\r\n")
	gAExpect(runsubRe)
}
// TestGatewayNoPanicOnBadProtocol writes various protocols straight to the
// gateway port of server B, without any gateway handshake, and then checks
// that B still answers a regular client PING, i.e. that it did not crash.
func TestGatewayNoPanicOnBadProtocol(t *testing.T) {
	optsB := testDefaultOptionsForGateway("B")
	srvB := runGatewayServer(optsB)
	defer srvB.Shutdown()

	badProtos := []struct{ name, proto string }{
		{name: "sub", proto: "SUB > 1\r\n"},
		{name: "unsub", proto: "UNSUB 1\r\n"},
		{name: "rsub", proto: "RS+ $foo foo 2\r\n"},
		{name: "runsub", proto: "RS- $foo foo 2\r\n"},
		{name: "pub", proto: "PUB foo 2\r\nok\r\n"},
		{name: "msg", proto: "MSG foo 2\r\nok\r\n"},
		{name: "rmsg", proto: "RMSG $foo foo 2\r\nok\r\n"},
		{name: "anything", proto: "xxxx\r\n"},
	}
	for _, bp := range badProtos {
		proto := bp.proto
		t.Run(bp.name, func(t *testing.T) {
			// Raw TCP connection to the gateway port.
			conn := createClientConn(t, optsB.Gateway.Host, optsB.Gateway.Port)
			defer conn.Close()
			send := sendCommand(t, conn)
			send(proto)
		})
	}

	// The server must still be alive and serving clients.
	client := createClientConn(t, optsB.Host, optsB.Port)
	defer client.Close()
	send, expect := setupConn(t, client)
	send("PING\r\n")
	expect(pongRe)
}
// TestGatewayNoAccUnsubAfterQSub checks that, once server B has sent an RS+
// for a queue subscription to a simulated gateway "A", creating a plain
// subscription on B does not cause anything else to be sent to A.
func TestGatewayNoAccUnsubAfterQSub(t *testing.T) {
	server.GatewayDoNotForceInterestOnlyMode(true)
	defer server.GatewayDoNotForceInterestOnlyMode(false)

	optsB := testDefaultOptionsForGateway("B")
	srvB := runGatewayServer(optsB)
	defer srvB.Shutdown()

	gwA := createGatewayConn(t, optsB.Gateway.Host, optsB.Gateway.Port)
	defer gwA.Close()
	sendA, expectA := setupGatewayConn(t, gwA, "A", "B")
	sendA("PING\r\n")
	expectA(pongRe)

	// Simulate a client connected to A publishing a message on a subject
	// nobody on B is interested in; the test expects B to reply with an
	// RS-.
	sendA("RMSG $G foo 2\r\nok\r\n")
	expectA(runsubRe)

	// A client on B now creates a queue subscription.
	client := createClientConn(t, optsB.Host, optsB.Port)
	defer client.Close()
	clientSend, clientExpect := setupConn(t, client)
	clientSend("SUB bar queue 1\r\nPING\r\n")
	clientExpect(pongRe)

	// That queue sub must be announced to A with an RS+.
	expectA(rsubRe)

	// A plain subscription on B, on the other hand, must produce nothing.
	clientSend("SUB baz 2\r\nPING\r\n")
	clientExpect(pongRe)
	expectNothing(t, gwA)
}
// TestGatewayErrorOnRSentFromOutbound checks that server B drops a gateway
// connection that has completed the handshake as "A" and then sends it an
// RS+ or an RS- protocol.
func TestGatewayErrorOnRSentFromOutbound(t *testing.T) {
	server.GatewayDoNotForceInterestOnlyMode(true)
	defer server.GatewayDoNotForceInterestOnlyMode(false)

	optsB := testDefaultOptionsForGateway("B")
	srvB := runGatewayServer(optsB)
	defer srvB.Shutdown()

	// The protocol verb doubles as the subtest name.
	for _, verb := range []string{"RS+", "RS-"} {
		verb := verb
		t.Run(verb, func(t *testing.T) {
			gwA := createGatewayConn(t, optsB.Gateway.Host, optsB.Gateway.Port)
			defer gwA.Close()

			send, expect := setupGatewayConn(t, gwA, "A", "B")
			send("PING\r\n")
			expect(pongRe)

			offending := fmt.Sprintf("%s foo bar\r\n", verb)
			send(offending)
			expectDisconnect(t, gwA)
		})
	}
}
// TestGatewaySystemConnectionAllowedToPublishOnGWPrefix connects as user
// "sys" to one server of a super cluster built with createSuperCluster(t, 2, 2),
// sends a $SYS.REQ.SERVER.PING request and expects four replies
// (presumably one per server in the super cluster).
func TestGatewaySystemConnectionAllowedToPublishOnGWPrefix(t *testing.T) {
	sc := createSuperCluster(t, 2, 2)
	defer sc.shutdown()

	opts := sc.clusters[1].opts[1]
	connURL := fmt.Sprintf("nats://sys:pass@%s:%d", opts.Host, opts.Port)
	nc, err := nats.Connect(connURL)
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()

	inbox := nats.NewInbox()
	sub, err := nc.SubscribeSync(inbox)
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	err = nc.PublishRequest("$SYS.REQ.SERVER.PING", inbox, nil)
	if err != nil {
		t.Fatalf("Failed to send request: %v", err)
	}

	// Each reply must arrive within a second of waiting for it.
	for remaining := 4; remaining > 0; remaining-- {
		_, err := sub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Expected to get a response, got %v", err)
		}
	}
}
// TestGatewayTLSMixedIPAndDNS forms a cluster "A" of two servers whose
// gateways use TLS, one listening on 127.0.0.1 with an IP-only certificate
// and one listening on "localhost". A server in gateway "B" is configured
// with the second server as its only remote URL; once all outbound gateway
// connections are up, that server is shut down and B must re-establish an
// outbound connection to cluster A.
func TestGatewayTLSMixedIPAndDNS(t *testing.T) {
	server.SetGatewaysSolicitDelay(5 * time.Millisecond)
	defer server.ResetGatewaysSolicitDelay()

	// Run this test extra times to make sure it is not flaky, since it
	// depends on solicit time.
	for i := 0; i < 10; i++ {
		t.Run("", func(t *testing.T) {
			confA1 := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
server_name: A1
gateway {
name: "A"
listen: "127.0.0.1:-1"
tls {
cert_file: "./configs/certs/server-iponly.pem"
key_file: "./configs/certs/server-key-iponly.pem"
ca_file: "./configs/certs/ca.pem"
timeout: 2
}
}
cluster {
listen: "127.0.0.1:-1"
}`))
			srvA1, optsA1 := RunServerWithConfig(confA1)
			defer srvA1.Shutdown()

			// A2 routes to A1; the route host and port are filled in below.
			confA2Template := `
listen: 127.0.0.1:-1
server_name: A2
gateway {
name: "A"
listen: "localhost:-1"
tls {
cert_file: "./configs/certs/server-cert.pem"
key_file: "./configs/certs/server-key.pem"
ca_file: "./configs/certs/ca.pem"
timeout: 2
}
}
cluster {
listen: "127.0.0.1:-1"
routes [
"nats://%s:%d"
]
}`
			confA2 := createConfFile(t, []byte(fmt.Sprintf(confA2Template,
				optsA1.Cluster.Host, optsA1.Cluster.Port)))
			srvA2, optsA2 := RunServerWithConfig(confA2)
			defer srvA2.Shutdown()

			checkClusterFormed(t, srvA1, srvA2)

			// Create a GW connection to cluster "A". Don't use the helper since we need verification etc.
			o := DefaultTestOptions
			o.Port = -1
			o.ServerName = "B1"
			o.Gateway.Name = "B"
			o.Gateway.Host = "127.0.0.1"
			o.Gateway.Port = -1

			tc := &server.TLSConfigOpts{}
			tc.CertFile = "./configs/certs/server-cert.pem"
			tc.KeyFile = "./configs/certs/server-key.pem"
			tc.CaFile = "./configs/certs/ca.pem"
			tc.Timeout = 2.0
			tlsConfig, err := server.GenTLSConfig(tc)
			if err != nil {
				t.Fatalf("Error generating TLS config: %v", err)
			}
			tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
			tlsConfig.RootCAs = tlsConfig.ClientCAs

			// The gateway and its remote each get their own clone of the
			// TLS config.
			o.Gateway.TLSConfig = tlsConfig.Clone()

			// B only knows about A2's gateway address.
			rurl, err := url.Parse(fmt.Sprintf("nats://%s:%d", optsA2.Gateway.Host, optsA2.Gateway.Port))
			if err != nil {
				t.Fatalf("Error parsing remote gateway URL: %v", err)
			}
			remote := &server.RemoteGatewayOpts{Name: "A", URLs: []*url.URL{rurl}}
			remote.TLSConfig = tlsConfig.Clone()
			o.Gateway.Gateways = []*server.RemoteGatewayOpts{remote}

			srvB := RunServer(&o)
			defer srvB.Shutdown()

			waitForOutboundGateways(t, srvB, 1, 10*time.Second)
			waitForOutboundGateways(t, srvA1, 1, 10*time.Second)
			waitForOutboundGateways(t, srvA2, 1, 10*time.Second)

			// Now kill off srvA2 and force serverB to connect to srvA1.
			srvA2.Shutdown()

			// Make sure this works.
			waitForOutboundGateways(t, srvB, 1, 30*time.Second)
		})
	}
}
// TestGatewayAdvertiseInCluster checks that when a server joining cluster
// "B" has Gateway.Advertise set, the advertised URL shows up in the INFO
// sent to an already connected gateway "A" (simulated with a raw
// connection), and that no further update is sent when a second server
// advertises the same URL or when one of the two advertising servers shuts
// down.
func TestGatewayAdvertiseInCluster(t *testing.T) {
	server.GatewayDoNotForceInterestOnlyMode(true)
	defer server.GatewayDoNotForceInterestOnlyMode(false)

	ob1 := testDefaultOptionsForGateway("B")
	ob1.Cluster.Name = "B"
	ob1.Cluster.Host = "127.0.0.1"
	ob1.Cluster.Port = -1
	sb1 := runGatewayServer(ob1)
	defer sb1.Shutdown()

	gA := createGatewayConn(t, ob1.Gateway.Host, ob1.Gateway.Port)
	defer gA.Close()
	gASend, gAExpect := setupGatewayConn(t, gA, "A", "B")
	gASend("PING\r\n")
	gAExpect(pongRe)

	// Second server of cluster B, routed to the first one and advertising
	// a gateway URL.
	ob2 := testDefaultOptionsForGateway("B")
	ob2.Cluster.Name = "B"
	ob2.Cluster.Host = "127.0.0.1"
	ob2.Cluster.Port = -1
	ob2.Routes = server.RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", ob1.Cluster.Port))
	ob2.Gateway.Advertise = "srvB:7222"
	sb2 := runGatewayServer(ob2)
	defer sb2.Shutdown()

	checkClusterFormed(t, sb1, sb2)

	buf := gAExpect(infoRe)
	si := &server.Info{}
	// Skip the first 5 bytes (the "INFO " prefix) to get to the JSON
	// payload. A decoding failure is reported directly instead of showing
	// up later as a confusing "url not found".
	if err := json.Unmarshal(buf[5:], si); err != nil {
		t.Fatalf("Error unmarshalling INFO %q: %v", buf, err)
	}
	var ok bool
	for _, u := range si.GatewayURLs {
		if u == "srvB:7222" {
			ok = true
			break
		}
	}
	if !ok {
		t.Fatalf("Url srvB:7222 was not found: %q", si.GatewayURLs)
	}

	// Third server, advertising the very same URL.
	ob3 := testDefaultOptionsForGateway("B")
	ob3.Cluster.Name = "B"
	ob3.Cluster.Host = "127.0.0.1"
	ob3.Cluster.Port = -1
	ob3.Routes = server.RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", ob1.Cluster.Port))
	ob3.Gateway.Advertise = "srvB:7222"
	sb3 := runGatewayServer(ob3)
	defer sb3.Shutdown()

	checkClusterFormed(t, sb1, sb2, sb3)

	// Since it is the same srvB:7222 url, we should not get an update.
	expectNothing(t, gA)

	// Now shutdown sb2 and make sure that we are not getting an update
	// with srvB:7222 missing.
	sb2.Shutdown()
	expectNothing(t, gA)
}
// TestGatewayAuthTimeout connects to the gateway port of server B, never
// sends a CONNECT, and checks that after waiting the connection receives an
// error protocol and is then disconnected. It runs once with the gateway
// auth timeout left untouched and once with it set to 0.25.
func TestGatewayAuthTimeout(t *testing.T) {
	type scenario struct {
		name    string
		setAuth bool          // whether to set Gateway.AuthTimeout explicitly
		wait    time.Duration // how long to stay silent before checking
	}
	scenarios := []scenario{
		{name: "auth not explicitly set", setAuth: false, wait: 2500 * time.Millisecond},
		{name: "auth set", setAuth: true, wait: 500 * time.Millisecond},
	}

	for _, sc := range scenarios {
		sc := sc
		t.Run(sc.name, func(t *testing.T) {
			optsB := testDefaultOptionsForGateway("B")
			if sc.setAuth {
				optsB.Gateway.AuthTimeout = 0.25
			}
			srvB := RunServer(optsB)
			defer srvB.Shutdown()

			conn := createGatewayConn(t, optsB.Gateway.Host, optsB.Gateway.Port)
			defer conn.Close()
			expect := expectCommand(t, conn)

			if got := checkInfoMsg(t, conn).Gateway; got != "B" {
				t.Fatalf("Expected to connect to %q, got %q", "B", got)
			}

			// Stay silent (no CONNECT): the server should give up on us
			// because of the auth timeout.
			time.Sleep(sc.wait)
			expect(errRe)
			expectDisconnect(t, conn)
		})
	}
}
// TestGatewayFirstPingGoesAfterConnect checks that server B does not send
// anything (in particular its first PING) on a gateway connection for two
// seconds while no CONNECT has been sent, and that a PING is received once
// the CONNECT and INFO protocols have been sent.
func TestGatewayFirstPingGoesAfterConnect(t *testing.T) {
	server.GatewayDoNotForceInterestOnlyMode(true)
	defer server.GatewayDoNotForceInterestOnlyMode(false)

	optsB := testDefaultOptionsForGateway("B")
	// For this test, we want the first ping to NOT be disabled.
	optsB.DisableShortFirstPing = false
	// Use a large auth_timeout so that the server does not disconnect us
	// while we are deliberately holding back the CONNECT.
	optsB.Gateway.AuthTimeout = 10.0
	srvB := RunServer(optsB)
	defer srvB.Shutdown()

	conn := createGatewayConn(t, optsB.Gateway.Host, optsB.Gateway.Port)
	defer conn.Close()
	send, expect := sendCommand(t, conn), expectCommand(t, conn)

	if got := checkInfoMsg(t, conn).Gateway; got != "B" {
		t.Fatalf("Expected to connect to %q, got %q", "B", got)
	}

	// Nothing must arrive from B until we send a CONNECT. The wait has to
	// be longer than the initial PING interval, which is why the
	// expectNothing() helper cannot be used here.
	scratch := make([]byte, 256)
	conn.SetReadDeadline(time.Now().Add(2 * time.Second))
	n, err := conn.Read(scratch)
	if err == nil {
		t.Fatalf("Expected nothing, got %s", scratch[:n])
	}

	// Now complete the handshake: CONNECT followed by INFO.
	connectProto := fmt.Sprintf("CONNECT {\"verbose\":%v,\"pedantic\":%v,\"tls_required\":%v,\"gateway\":%q}\r\n",
		false, false, false, "A")
	send(connectProto)
	infoProto := fmt.Sprintf("INFO {\"gateway\":%q}\r\n", "A")
	send(infoProto)

	// Only now should the first PING show up.
	expect(pingRe)
}