mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 10:10:42 -07:00
Refactor the way client is initialized. We need to ensure that clients are not added to the clients map and readLoop started if the server is in the process of being shutdown otherwise there is a chance that the server already gathered the list of connections to close and this one would not be included, leaving a readLoop running. Same occurs for routes, with the complexity that the readLoop is started well before the route connection is added to the server routes' list. We need a temporary map that contains those connections to be able to close them on server Shutdown. Fixed some flapping tests.
445 lines
11 KiB
Go
445 lines
11 KiB
Go
// Copyright 2013-2016 Apcera Inc. All rights reserved.
|
|
|
|
package test
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"runtime"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/nats-io/gnatsd/server"
|
|
)
|
|
|
|
// Helper function to check that a cluster is formed
|
|
func checkClusterFormed(t *testing.T, servers ...*server.Server) {
|
|
// Wait for the cluster to form
|
|
var err string
|
|
expectedNumRoutes := len(servers) - 1
|
|
maxTime := time.Now().Add(5 * time.Second)
|
|
for time.Now().Before(maxTime) {
|
|
err = ""
|
|
for _, s := range servers {
|
|
if numRoutes := s.NumRoutes(); numRoutes != expectedNumRoutes {
|
|
err = fmt.Sprintf("Expected %d routes for server %q, got %d", expectedNumRoutes, s.ID(), numRoutes)
|
|
break
|
|
}
|
|
}
|
|
if err != "" {
|
|
time.Sleep(100 * time.Millisecond)
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
if err != "" {
|
|
t.Fatalf("%s", err)
|
|
}
|
|
}
|
|
|
|
// Helper function to check that a server (or list of servers) have the
|
|
// expected number of subscriptions
|
|
func checkExpectedSubs(expected int, servers ...*server.Server) error {
|
|
var err string
|
|
maxTime := time.Now().Add(5 * time.Second)
|
|
for time.Now().Before(maxTime) {
|
|
err = ""
|
|
for _, s := range servers {
|
|
if numSubs := int(s.NumSubscriptions()); numSubs != expected {
|
|
err = fmt.Sprintf("Expected %d subscriptions for server %q, got %d", expected, s.ID(), numSubs)
|
|
break
|
|
}
|
|
}
|
|
if err != "" {
|
|
time.Sleep(100 * time.Millisecond)
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
if err != "" {
|
|
return errors.New(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func runServers(t *testing.T) (srvA, srvB *server.Server, optsA, optsB *server.Options) {
|
|
srvA, optsA = RunServerWithConfig("./configs/srv_a.conf")
|
|
srvB, optsB = RunServerWithConfig("./configs/srv_b.conf")
|
|
|
|
checkClusterFormed(t, srvA, srvB)
|
|
return
|
|
}
|
|
|
|
func TestProperServerWithRoutesShutdown(t *testing.T) {
|
|
before := runtime.NumGoroutine()
|
|
srvA, srvB, _, _ := runServers(t)
|
|
srvA.Shutdown()
|
|
srvB.Shutdown()
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
after := runtime.NumGoroutine()
|
|
delta := after - before
|
|
// There may be some finalizers or IO, but in general more than
|
|
// 2 as a delta represents a problem.
|
|
if delta > 2 {
|
|
t.Fatalf("Expected same number of goroutines, %d vs %d\n", before, after)
|
|
}
|
|
}
|
|
|
|
func TestDoubleRouteConfig(t *testing.T) {
|
|
srvA, srvB, _, _ := runServers(t)
|
|
defer srvA.Shutdown()
|
|
defer srvB.Shutdown()
|
|
}
|
|
|
|
func TestBasicClusterPubSub(t *testing.T) {
|
|
srvA, srvB, optsA, optsB := runServers(t)
|
|
defer srvA.Shutdown()
|
|
defer srvB.Shutdown()
|
|
|
|
clientA := createClientConn(t, optsA.Host, optsA.Port)
|
|
defer clientA.Close()
|
|
|
|
clientB := createClientConn(t, optsB.Host, optsB.Port)
|
|
defer clientB.Close()
|
|
|
|
sendA, expectA := setupConn(t, clientA)
|
|
sendA("SUB foo 22\r\n")
|
|
sendA("PING\r\n")
|
|
expectA(pongRe)
|
|
|
|
if err := checkExpectedSubs(1, srvA, srvB); err != nil {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
|
|
sendB, expectB := setupConn(t, clientB)
|
|
sendB("PUB foo 2\r\nok\r\n")
|
|
sendB("PING\r\n")
|
|
expectB(pongRe)
|
|
|
|
expectMsgs := expectMsgsCommand(t, expectA)
|
|
|
|
matches := expectMsgs(1)
|
|
checkMsg(t, matches[0], "foo", "22", "", "2", "ok")
|
|
}
|
|
|
|
func TestClusterQueueSubs(t *testing.T) {
|
|
srvA, srvB, optsA, optsB := runServers(t)
|
|
defer srvA.Shutdown()
|
|
defer srvB.Shutdown()
|
|
|
|
clientA := createClientConn(t, optsA.Host, optsA.Port)
|
|
defer clientA.Close()
|
|
|
|
clientB := createClientConn(t, optsB.Host, optsB.Port)
|
|
defer clientB.Close()
|
|
|
|
sendA, expectA := setupConn(t, clientA)
|
|
sendB, expectB := setupConn(t, clientB)
|
|
|
|
expectMsgsA := expectMsgsCommand(t, expectA)
|
|
expectMsgsB := expectMsgsCommand(t, expectB)
|
|
|
|
// Capture sids for checking later.
|
|
qg1SidsA := []string{"1", "2", "3"}
|
|
|
|
// Three queue subscribers
|
|
for _, sid := range qg1SidsA {
|
|
sendA(fmt.Sprintf("SUB foo qg1 %s\r\n", sid))
|
|
}
|
|
sendA("PING\r\n")
|
|
expectA(pongRe)
|
|
|
|
// Make sure the subs have propagated to srvB before continuing
|
|
if err := checkExpectedSubs(len(qg1SidsA), srvB); err != nil {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
|
|
sendB("PUB foo 2\r\nok\r\n")
|
|
sendB("PING\r\n")
|
|
expectB(pongRe)
|
|
|
|
// Make sure we get only 1.
|
|
matches := expectMsgsA(1)
|
|
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
|
|
|
|
// Capture sids for checking later.
|
|
pSids := []string{"4", "5", "6"}
|
|
|
|
// Create 3 normal subscribers
|
|
for _, sid := range pSids {
|
|
sendA(fmt.Sprintf("SUB foo %s\r\n", sid))
|
|
}
|
|
|
|
// Create a FWC Subscriber
|
|
pSids = append(pSids, "7")
|
|
sendA("SUB > 7\r\n")
|
|
sendA("PING\r\n")
|
|
expectA(pongRe)
|
|
|
|
// Make sure the subs have propagated to srvB before continuing
|
|
if err := checkExpectedSubs(len(qg1SidsA)+len(pSids), srvB); err != nil {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
|
|
// Send to B
|
|
sendB("PUB foo 2\r\nok\r\n")
|
|
sendB("PING\r\n")
|
|
expectB(pongRe)
|
|
|
|
// Should receive 5.
|
|
matches = expectMsgsA(5)
|
|
checkForQueueSid(t, matches, qg1SidsA)
|
|
checkForPubSids(t, matches, pSids)
|
|
|
|
// Send to A
|
|
sendA("PUB foo 2\r\nok\r\n")
|
|
|
|
// Should receive 5.
|
|
matches = expectMsgsA(5)
|
|
checkForQueueSid(t, matches, qg1SidsA)
|
|
checkForPubSids(t, matches, pSids)
|
|
|
|
// Now add queue subscribers to B
|
|
qg2SidsB := []string{"1", "2", "3"}
|
|
for _, sid := range qg2SidsB {
|
|
sendB(fmt.Sprintf("SUB foo qg2 %s\r\n", sid))
|
|
}
|
|
sendB("PING\r\n")
|
|
expectB(pongRe)
|
|
|
|
// Make sure the subs have propagated to srvA before continuing
|
|
if err := checkExpectedSubs(len(qg1SidsA)+len(pSids)+len(qg2SidsB), srvA); err != nil {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
|
|
// Send to B
|
|
sendB("PUB foo 2\r\nok\r\n")
|
|
|
|
// Should receive 1 from B.
|
|
matches = expectMsgsB(1)
|
|
checkForQueueSid(t, matches, qg2SidsB)
|
|
|
|
// Should receive 5 still from A.
|
|
matches = expectMsgsA(5)
|
|
checkForQueueSid(t, matches, qg1SidsA)
|
|
checkForPubSids(t, matches, pSids)
|
|
|
|
// Now drop queue subscribers from A
|
|
for _, sid := range qg1SidsA {
|
|
sendA(fmt.Sprintf("UNSUB %s\r\n", sid))
|
|
}
|
|
sendA("PING\r\n")
|
|
expectA(pongRe)
|
|
|
|
// Make sure the subs have propagated to srvB before continuing
|
|
if err := checkExpectedSubs(len(pSids)+len(qg2SidsB), srvB); err != nil {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
|
|
// Send to B
|
|
sendB("PUB foo 2\r\nok\r\n")
|
|
|
|
// Should receive 1 from B.
|
|
matches = expectMsgsB(1)
|
|
checkForQueueSid(t, matches, qg2SidsB)
|
|
|
|
sendB("PING\r\n")
|
|
expectB(pongRe)
|
|
|
|
// Should receive 4 now.
|
|
matches = expectMsgsA(4)
|
|
checkForPubSids(t, matches, pSids)
|
|
|
|
// Send to A
|
|
sendA("PUB foo 2\r\nok\r\n")
|
|
|
|
// Should receive 4 now.
|
|
matches = expectMsgsA(4)
|
|
checkForPubSids(t, matches, pSids)
|
|
}
|
|
|
|
// Issue #22
|
|
func TestClusterDoubleMsgs(t *testing.T) {
|
|
srvA, srvB, optsA, optsB := runServers(t)
|
|
defer srvA.Shutdown()
|
|
defer srvB.Shutdown()
|
|
|
|
clientA1 := createClientConn(t, optsA.Host, optsA.Port)
|
|
defer clientA1.Close()
|
|
|
|
clientA2 := createClientConn(t, optsA.Host, optsA.Port)
|
|
defer clientA2.Close()
|
|
|
|
clientB := createClientConn(t, optsB.Host, optsB.Port)
|
|
defer clientB.Close()
|
|
|
|
sendA1, expectA1 := setupConn(t, clientA1)
|
|
sendA2, expectA2 := setupConn(t, clientA2)
|
|
sendB, expectB := setupConn(t, clientB)
|
|
|
|
expectMsgsA1 := expectMsgsCommand(t, expectA1)
|
|
expectMsgsA2 := expectMsgsCommand(t, expectA2)
|
|
|
|
// Capture sids for checking later.
|
|
qg1SidsA := []string{"1", "2", "3"}
|
|
|
|
// Three queue subscribers
|
|
for _, sid := range qg1SidsA {
|
|
sendA1(fmt.Sprintf("SUB foo qg1 %s\r\n", sid))
|
|
}
|
|
sendA1("PING\r\n")
|
|
expectA1(pongRe)
|
|
|
|
// Make sure the subs have propagated to srvB before continuing
|
|
if err := checkExpectedSubs(len(qg1SidsA), srvB); err != nil {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
|
|
sendB("PUB foo 2\r\nok\r\n")
|
|
sendB("PING\r\n")
|
|
expectB(pongRe)
|
|
|
|
// Make sure we get only 1.
|
|
matches := expectMsgsA1(1)
|
|
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
|
|
checkForQueueSid(t, matches, qg1SidsA)
|
|
|
|
// Add a FWC subscriber on A2
|
|
sendA2("SUB > 1\r\n")
|
|
sendA2("SUB foo 2\r\n")
|
|
sendA2("PING\r\n")
|
|
expectA2(pongRe)
|
|
pSids := []string{"1", "2"}
|
|
|
|
// Make sure the subs have propagated to srvB before continuing
|
|
if err := checkExpectedSubs(len(qg1SidsA)+2, srvB); err != nil {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
|
|
sendB("PUB foo 2\r\nok\r\n")
|
|
sendB("PING\r\n")
|
|
expectB(pongRe)
|
|
|
|
matches = expectMsgsA1(1)
|
|
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
|
|
checkForQueueSid(t, matches, qg1SidsA)
|
|
|
|
matches = expectMsgsA2(2)
|
|
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
|
|
checkForPubSids(t, matches, pSids)
|
|
|
|
// Close ClientA1
|
|
clientA1.Close()
|
|
|
|
sendB("PUB foo 2\r\nok\r\n")
|
|
sendB("PING\r\n")
|
|
expectB(pongRe)
|
|
|
|
matches = expectMsgsA2(2)
|
|
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
|
|
checkForPubSids(t, matches, pSids)
|
|
}
|
|
|
|
// This will test that we drop remote sids correctly.
|
|
func TestClusterDropsRemoteSids(t *testing.T) {
|
|
srvA, srvB, optsA, _ := runServers(t)
|
|
defer srvA.Shutdown()
|
|
defer srvB.Shutdown()
|
|
|
|
clientA := createClientConn(t, optsA.Host, optsA.Port)
|
|
defer clientA.Close()
|
|
|
|
sendA, expectA := setupConn(t, clientA)
|
|
|
|
// Add a subscription
|
|
sendA("SUB foo 1\r\n")
|
|
sendA("PING\r\n")
|
|
expectA(pongRe)
|
|
|
|
// Wait for propagation.
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
if sc := srvA.NumSubscriptions(); sc != 1 {
|
|
t.Fatalf("Expected one subscription for srvA, got %d\n", sc)
|
|
}
|
|
if sc := srvB.NumSubscriptions(); sc != 1 {
|
|
t.Fatalf("Expected one subscription for srvB, got %d\n", sc)
|
|
}
|
|
|
|
// Add another subscription
|
|
sendA("SUB bar 2\r\n")
|
|
sendA("PING\r\n")
|
|
expectA(pongRe)
|
|
|
|
// Wait for propagation.
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
if sc := srvA.NumSubscriptions(); sc != 2 {
|
|
t.Fatalf("Expected two subscriptions for srvA, got %d\n", sc)
|
|
}
|
|
if sc := srvB.NumSubscriptions(); sc != 2 {
|
|
t.Fatalf("Expected two subscriptions for srvB, got %d\n", sc)
|
|
}
|
|
|
|
// unsubscription
|
|
sendA("UNSUB 1\r\n")
|
|
sendA("PING\r\n")
|
|
expectA(pongRe)
|
|
|
|
// Wait for propagation.
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
if sc := srvA.NumSubscriptions(); sc != 1 {
|
|
t.Fatalf("Expected one subscription for srvA, got %d\n", sc)
|
|
}
|
|
if sc := srvB.NumSubscriptions(); sc != 1 {
|
|
t.Fatalf("Expected one subscription for srvB, got %d\n", sc)
|
|
}
|
|
|
|
// Close the client and make sure we remove subscription state.
|
|
clientA.Close()
|
|
|
|
// Wait for propagation.
|
|
time.Sleep(100 * time.Millisecond)
|
|
if sc := srvA.NumSubscriptions(); sc != 0 {
|
|
t.Fatalf("Expected no subscriptions for srvA, got %d\n", sc)
|
|
}
|
|
if sc := srvB.NumSubscriptions(); sc != 0 {
|
|
t.Fatalf("Expected no subscriptions for srvB, got %d\n", sc)
|
|
}
|
|
}
|
|
|
|
// This will test that we drop remote sids correctly.
|
|
func TestAutoUnsubscribePropagation(t *testing.T) {
|
|
srvA, srvB, optsA, _ := runServers(t)
|
|
defer srvA.Shutdown()
|
|
defer srvB.Shutdown()
|
|
|
|
clientA := createClientConn(t, optsA.Host, optsA.Port)
|
|
defer clientA.Close()
|
|
|
|
sendA, expectA := setupConn(t, clientA)
|
|
expectMsgs := expectMsgsCommand(t, expectA)
|
|
|
|
// We will create subscriptions that will auto-unsubscribe and make sure
|
|
// we are not accumulating orphan subscriptions on the other side.
|
|
for i := 1; i <= 100; i++ {
|
|
sub := fmt.Sprintf("SUB foo %d\r\n", i)
|
|
auto := fmt.Sprintf("UNSUB %d 1\r\n", i)
|
|
sendA(sub)
|
|
sendA(auto)
|
|
// This will trip the auto-unsubscribe
|
|
sendA("PUB foo 2\r\nok\r\n")
|
|
expectMsgs(1)
|
|
}
|
|
|
|
sendA("PING\r\n")
|
|
expectA(pongRe)
|
|
|
|
// Make sure number of subscriptions on B is correct
|
|
if subs := srvB.NumSubscriptions(); subs != 0 {
|
|
t.Fatalf("Expected no subscriptions on remote server, got %d\n", subs)
|
|
}
|
|
}
|