Files
nats-server/server/norace_test.go
Matthias Hanel b316cccfd1 Fixed a quorum formation issue that caused truncation
When a new leader is elected it has to give everyone a chance to reply,
so that we can observe rejections with higher term.

The maximum election timeout is 7.5 seconds.
The new behavior of waiting for the election timeout caused unit tests
to fail. Hence upping the timeout there as well.

Signed-off-by: Matthias Hanel <mh@synadia.com>
2021-03-11 19:44:47 -05:00

1744 lines
48 KiB
Go

// Copyright 2018-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !race
package server
import (
"bufio"
"encoding/json"
"fmt"
"math/rand"
"net"
"net/url"
"os"
"runtime"
"runtime/debug"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
)
// IMPORTANT: Tests in this file are not executed when running with the -race flag.
// The test name should be prefixed with TestNoRace so we can run only
// those tests: go test -run=TestNoRace ...
func TestNoRaceAvoidSlowConsumerBigMessages(t *testing.T) {
opts := DefaultOptions() // Use defaults to make sure they avoid pending slow consumer.
opts.NoSystemAccount = true
s := RunServer(opts)
defer s.Shutdown()
nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc1.Close()
nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc2.Close()
data := make([]byte, 1024*1024) // 1MB payload
rand.Read(data)
expected := int32(500)
received := int32(0)
done := make(chan bool)
// Create Subscription.
nc1.Subscribe("slow.consumer", func(m *nats.Msg) {
// Just eat it so that we are not measuring
// code time, just delivery.
atomic.AddInt32(&received, 1)
if received >= expected {
done <- true
}
})
// Create Error handler
nc1.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) {
t.Fatalf("Received an error on the subscription's connection: %v\n", err)
})
nc1.Flush()
for i := 0; i < int(expected); i++ {
nc2.Publish("slow.consumer", data)
}
nc2.Flush()
select {
case <-done:
return
case <-time.After(10 * time.Second):
r := atomic.LoadInt32(&received)
if s.NumSlowConsumers() > 0 {
t.Fatalf("Did not receive all large messages due to slow consumer status: %d of %d", r, expected)
}
t.Fatalf("Failed to receive all large messages: %d of %d\n", r, expected)
}
}
func TestNoRaceRoutedQueueAutoUnsubscribe(t *testing.T) {
optsA, _ := ProcessConfigFile("./configs/seed.conf")
optsA.NoSigs, optsA.NoLog = true, true
optsA.NoSystemAccount = true
srvA := RunServer(optsA)
defer srvA.Shutdown()
srvARouteURL := fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, srvA.ClusterAddr().Port)
optsB := nextServerOpts(optsA)
optsB.Routes = RoutesFromStr(srvARouteURL)
srvB := RunServer(optsB)
defer srvB.Shutdown()
// Wait for these 2 to connect to each other
checkClusterFormed(t, srvA, srvB)
// Have a client connection to each server
ncA, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer ncA.Close()
ncB, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer ncB.Close()
rbar := int32(0)
barCb := func(m *nats.Msg) {
atomic.AddInt32(&rbar, 1)
}
rbaz := int32(0)
bazCb := func(m *nats.Msg) {
atomic.AddInt32(&rbaz, 1)
}
// Create 125 queue subs with auto-unsubscribe to each server for
// group bar and group baz. So 250 total per queue group.
cons := []*nats.Conn{ncA, ncB}
for _, c := range cons {
for i := 0; i < 100; i++ {
qsub, err := c.QueueSubscribe("foo", "bar", barCb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if err := qsub.AutoUnsubscribe(1); err != nil {
t.Fatalf("Error on auto-unsubscribe: %v", err)
}
qsub, err = c.QueueSubscribe("foo", "baz", bazCb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if err := qsub.AutoUnsubscribe(1); err != nil {
t.Fatalf("Error on auto-unsubscribe: %v", err)
}
}
c.Subscribe("TEST.COMPLETE", func(m *nats.Msg) {})
}
// We coelasce now so for each server we will have all local (200) plus
// two from the remote side for each queue group. We also create one more
// and will wait til each server has 204 subscriptions, that will make sure
// that we have everything setup.
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
subsA := srvA.NumSubscriptions()
subsB := srvB.NumSubscriptions()
if subsA != 204 || subsB != 204 {
return fmt.Errorf("Not all subs processed yet: %d and %d", subsA, subsB)
}
return nil
})
expected := int32(200)
// Now send messages from each server
for i := int32(0); i < expected; i++ {
c := cons[i%2]
c.Publish("foo", []byte("Don't Drop Me!"))
}
for _, c := range cons {
c.Flush()
}
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
nbar := atomic.LoadInt32(&rbar)
nbaz := atomic.LoadInt32(&rbaz)
if nbar == expected && nbaz == expected {
return nil
}
return fmt.Errorf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'",
expected, atomic.LoadInt32(&rbar), atomic.LoadInt32(&rbaz))
})
}
func TestNoRaceClosedSlowConsumerWriteDeadline(t *testing.T) {
opts := DefaultOptions()
opts.NoSystemAccount = true
opts.WriteDeadline = 10 * time.Millisecond // Make very small to trip.
opts.MaxPending = 500 * 1024 * 1024 // Set high so it will not trip here.
s := RunServer(opts)
defer s.Shutdown()
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer c.Close()
if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil {
t.Fatalf("Error sending protocols to server: %v", err)
}
// Reduce socket buffer to increase reliability of data backing up in the server destined
// for our subscribed client.
c.(*net.TCPConn).SetReadBuffer(128)
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
sender, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sender.Close()
payload := make([]byte, 1024*1024)
for i := 0; i < 100; i++ {
if err := sender.Publish("foo", payload); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}
// Flush sender connection to ensure that all data has been sent.
if err := sender.Flush(); err != nil {
t.Fatalf("Error on flush: %v", err)
}
// At this point server should have closed connection c.
checkClosedConns(t, s, 1, 2*time.Second)
conns := s.closedClients()
if lc := len(conns); lc != 1 {
t.Fatalf("len(conns) expected to be %d, got %d\n", 1, lc)
}
checkReason(t, conns[0].Reason, SlowConsumerWriteDeadline)
}
func TestNoRaceClosedSlowConsumerPendingBytes(t *testing.T) {
opts := DefaultOptions()
opts.NoSystemAccount = true
opts.WriteDeadline = 30 * time.Second // Wait for long time so write deadline does not trigger slow consumer.
opts.MaxPending = 1 * 1024 * 1024 // Set to low value (1MB) to allow SC to trip.
s := RunServer(opts)
defer s.Shutdown()
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer c.Close()
if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil {
t.Fatalf("Error sending protocols to server: %v", err)
}
// Reduce socket buffer to increase reliability of data backing up in the server destined
// for our subscribed client.
c.(*net.TCPConn).SetReadBuffer(128)
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
sender, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sender.Close()
payload := make([]byte, 1024*1024)
for i := 0; i < 100; i++ {
if err := sender.Publish("foo", payload); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}
// Flush sender connection to ensure that all data has been sent.
if err := sender.Flush(); err != nil {
t.Fatalf("Error on flush: %v", err)
}
// At this point server should have closed connection c.
checkClosedConns(t, s, 1, 2*time.Second)
conns := s.closedClients()
if lc := len(conns); lc != 1 {
t.Fatalf("len(conns) expected to be %d, got %d\n", 1, lc)
}
checkReason(t, conns[0].Reason, SlowConsumerPendingBytes)
}
func TestNoRaceSlowConsumerPendingBytes(t *testing.T) {
opts := DefaultOptions()
opts.NoSystemAccount = true
opts.WriteDeadline = 30 * time.Second // Wait for long time so write deadline does not trigger slow consumer.
opts.MaxPending = 1 * 1024 * 1024 // Set to low value (1MB) to allow SC to trip.
s := RunServer(opts)
defer s.Shutdown()
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer c.Close()
if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil {
t.Fatalf("Error sending protocols to server: %v", err)
}
// Reduce socket buffer to increase reliability of data backing up in the server destined
// for our subscribed client.
c.(*net.TCPConn).SetReadBuffer(128)
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
sender, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sender.Close()
payload := make([]byte, 1024*1024)
for i := 0; i < 100; i++ {
if err := sender.Publish("foo", payload); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}
// Flush sender connection to ensure that all data has been sent.
if err := sender.Flush(); err != nil {
t.Fatalf("Error on flush: %v", err)
}
// At this point server should have closed connection c.
// On certain platforms, it may take more than one call before
// getting the error.
for i := 0; i < 100; i++ {
if _, err := c.Write([]byte("PUB bar 5\r\nhello\r\n")); err != nil {
// ok
return
}
}
t.Fatal("Connection should have been closed")
}
func TestNoRaceGatewayNoMissingReplies(t *testing.T) {
// This test will have following setup:
//
// responder1 requestor
// | |
// v v
// [A1]<-------gw------------[B1]
// | \ |
// | \______gw__________ | route
// | _\| |
// [ ]--------gw----------->[ ]
// [A2]<-------gw------------[B2]
// [ ] [ ]
// ^
// |
// responder2
//
// There is a possible race that when the requestor creates
// a subscription on the reply subject, the subject interest
// being sent from the inbound gateway, and B1 having none,
// the SUB first goes to B2 before being sent to A1 from
// B2's inbound GW. But the request can go from B1 to A1
// right away and the responder1 connecting to A1 may send
// back the reply before the interest on the reply makes it
// to A1 (from B2).
// This test will also verify that if the responder is instead
// connected to A2, the reply is properly received by requestor
// on B1.
// For this test we want to be in interestOnly mode, so
// make it happen quickly
gatewayMaxRUnsubBeforeSwitch = 1
defer func() { gatewayMaxRUnsubBeforeSwitch = defaultGatewayMaxRUnsubBeforeSwitch }()
// Start with setting up A2 and B2.
ob2 := testDefaultOptionsForGateway("B")
sb2 := runGatewayServer(ob2)
defer sb2.Shutdown()
oa2 := testGatewayOptionsFromToWithServers(t, "A", "B", sb2)
sa2 := runGatewayServer(oa2)
defer sa2.Shutdown()
waitForOutboundGateways(t, sa2, 1, time.Second)
waitForInboundGateways(t, sa2, 1, time.Second)
waitForOutboundGateways(t, sb2, 1, time.Second)
waitForInboundGateways(t, sb2, 1, time.Second)
// Now start A1 which will connect to B2
oa1 := testGatewayOptionsFromToWithServers(t, "A", "B", sb2)
oa1.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", oa2.Cluster.Host, oa2.Cluster.Port))
sa1 := runGatewayServer(oa1)
defer sa1.Shutdown()
waitForOutboundGateways(t, sa1, 1, time.Second)
waitForInboundGateways(t, sb2, 2, time.Second)
checkClusterFormed(t, sa1, sa2)
// Finally, start B1 that will connect to A1.
ob1 := testGatewayOptionsFromToWithServers(t, "B", "A", sa1)
ob1.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", ob2.Cluster.Host, ob2.Cluster.Port))
sb1 := runGatewayServer(ob1)
defer sb1.Shutdown()
// Check that we have the outbound gateway from B1 to A1
checkFor(t, 3*time.Second, 15*time.Millisecond, func() error {
c := sb1.getOutboundGatewayConnection("A")
if c == nil {
return fmt.Errorf("Outbound connection to A not created yet")
}
c.mu.Lock()
name := c.opts.Name
nc := c.nc
c.mu.Unlock()
if name != sa1.ID() {
// Force a disconnect
nc.Close()
return fmt.Errorf("Was unable to have B1 connect to A1")
}
return nil
})
waitForInboundGateways(t, sa1, 1, time.Second)
checkClusterFormed(t, sb1, sb2)
a1URL := fmt.Sprintf("nats://%s:%d", oa1.Host, oa1.Port)
a2URL := fmt.Sprintf("nats://%s:%d", oa2.Host, oa2.Port)
b1URL := fmt.Sprintf("nats://%s:%d", ob1.Host, ob1.Port)
b2URL := fmt.Sprintf("nats://%s:%d", ob2.Host, ob2.Port)
ncb1 := natsConnect(t, b1URL)
defer ncb1.Close()
ncb2 := natsConnect(t, b2URL)
defer ncb2.Close()
natsSubSync(t, ncb1, "just.a.sub")
natsSubSync(t, ncb2, "just.a.sub")
checkExpectedSubs(t, 2, sb1, sb2)
// For this test, we want A to be checking B's interest in order
// to send messages (which would cause replies to be dropped if
// there is no interest registered on A). So from A servers,
// send to various subjects and cause B's to switch to interestOnly
// mode.
nca1 := natsConnect(t, a1URL)
defer nca1.Close()
for i := 0; i < 10; i++ {
natsPub(t, nca1, fmt.Sprintf("reject.%d", i), []byte("hello"))
}
nca2 := natsConnect(t, a2URL)
defer nca2.Close()
for i := 0; i < 10; i++ {
natsPub(t, nca2, fmt.Sprintf("reject.%d", i), []byte("hello"))
}
checkSwitchedMode := func(t *testing.T, s *Server) {
t.Helper()
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
var switchedMode bool
c := s.getOutboundGatewayConnection("B")
ei, _ := c.gw.outsim.Load(globalAccountName)
if ei != nil {
e := ei.(*outsie)
e.RLock()
switchedMode = e.ni == nil && e.mode == InterestOnly
e.RUnlock()
}
if !switchedMode {
return fmt.Errorf("Still not switched mode")
}
return nil
})
}
checkSwitchedMode(t, sa1)
checkSwitchedMode(t, sa2)
// Setup a subscriber on _INBOX.> on each of A's servers.
total := 1000
expected := int32(total)
rcvOnA := int32(0)
qrcvOnA := int32(0)
natsSub(t, nca1, "myreply.>", func(_ *nats.Msg) {
atomic.AddInt32(&rcvOnA, 1)
})
natsQueueSub(t, nca2, "myreply.>", "bar", func(_ *nats.Msg) {
atomic.AddInt32(&qrcvOnA, 1)
})
checkExpectedSubs(t, 2, sa1, sa2)
// Ok.. so now we will run the actual test where we
// create a responder on A1 and make sure that every
// single request from B1 gets the reply. Will repeat
// test with responder connected to A2.
sendReqs := func(t *testing.T, subConn *nats.Conn) {
t.Helper()
responder := natsSub(t, subConn, "foo", func(m *nats.Msg) {
m.Respond([]byte("reply"))
})
natsFlush(t, subConn)
checkExpectedSubs(t, 3, sa1, sa2)
// We are not going to use Request() because this sets
// a wildcard subscription on an INBOX and less likely
// to produce the race. Instead we will explicitly set
// the subscription on the reply subject and create one
// per request.
for i := 0; i < total/2; i++ {
reply := fmt.Sprintf("myreply.%d", i)
replySub := natsQueueSubSync(t, ncb1, reply, "bar")
natsFlush(t, ncb1)
// Let's make sure we have interest on B2.
if r := sb2.globalAccount().sl.Match(reply); len(r.qsubs) == 0 {
checkFor(t, time.Second, time.Millisecond, func() error {
if r := sb2.globalAccount().sl.Match(reply); len(r.qsubs) == 0 {
return fmt.Errorf("B still not registered interest on %s", reply)
}
return nil
})
}
natsPubReq(t, ncb1, "foo", reply, []byte("request"))
if _, err := replySub.NextMsg(time.Second); err != nil {
t.Fatalf("Did not receive reply: %v", err)
}
natsUnsub(t, replySub)
}
responder.Unsubscribe()
natsFlush(t, subConn)
checkExpectedSubs(t, 2, sa1, sa2)
}
sendReqs(t, nca1)
sendReqs(t, nca2)
checkFor(t, time.Second, 15*time.Millisecond, func() error {
if n := atomic.LoadInt32(&rcvOnA); n != expected {
return fmt.Errorf("Subs on A expected to get %v replies, got %v", expected, n)
}
return nil
})
// We should not have received a single message on the queue sub
// on cluster A because messages will have been delivered to
// the member on cluster B.
if n := atomic.LoadInt32(&qrcvOnA); n != 0 {
t.Fatalf("Queue sub on A should not have received message, got %v", n)
}
}
func TestNoRaceRouteMemUsage(t *testing.T) {
oa := DefaultOptions()
sa := RunServer(oa)
defer sa.Shutdown()
ob := DefaultOptions()
ob.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", oa.Cluster.Host, oa.Cluster.Port))
sb := RunServer(ob)
defer sb.Shutdown()
checkClusterFormed(t, sa, sb)
responder := natsConnect(t, fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port))
defer responder.Close()
for i := 0; i < 10; i++ {
natsSub(t, responder, "foo", func(m *nats.Msg) {
m.Respond(m.Data)
})
}
natsFlush(t, responder)
payload := make([]byte, 50*1024)
bURL := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
// Capture mem usage
mem := runtime.MemStats{}
runtime.ReadMemStats(&mem)
inUseBefore := mem.HeapInuse
for i := 0; i < 100; i++ {
requestor := natsConnect(t, bURL)
inbox := nats.NewInbox()
sub := natsSubSync(t, requestor, inbox)
natsPubReq(t, requestor, "foo", inbox, payload)
for j := 0; j < 10; j++ {
natsNexMsg(t, sub, time.Second)
}
requestor.Close()
}
runtime.GC()
debug.FreeOSMemory()
runtime.ReadMemStats(&mem)
inUseNow := mem.HeapInuse
if inUseNow > 3*inUseBefore {
t.Fatalf("Heap in-use before was %v, now %v: too high", inUseBefore, inUseNow)
}
}
func TestNoRaceRouteCache(t *testing.T) {
maxPerAccountCacheSize = 20
prunePerAccountCacheSize = 5
closedSubsCheckInterval = 250 * time.Millisecond
defer func() {
maxPerAccountCacheSize = defaultMaxPerAccountCacheSize
prunePerAccountCacheSize = defaultPrunePerAccountCacheSize
closedSubsCheckInterval = defaultClosedSubsCheckInterval
}()
for _, test := range []struct {
name string
useQueue bool
}{
{"plain_sub", false},
{"queue_sub", true},
} {
t.Run(test.name, func(t *testing.T) {
oa := DefaultOptions()
oa.NoSystemAccount = true
sa := RunServer(oa)
defer sa.Shutdown()
ob := DefaultOptions()
ob.NoSystemAccount = true
ob.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", oa.Cluster.Host, oa.Cluster.Port))
sb := RunServer(ob)
defer sb.Shutdown()
checkClusterFormed(t, sa, sb)
responder := natsConnect(t, fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port))
defer responder.Close()
natsSub(t, responder, "foo", func(m *nats.Msg) {
m.Respond(m.Data)
})
natsFlush(t, responder)
checkExpectedSubs(t, 1, sa)
checkExpectedSubs(t, 1, sb)
bURL := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
requestor := natsConnect(t, bURL)
defer requestor.Close()
ch := make(chan struct{}, 1)
cb := func(_ *nats.Msg) {
select {
case ch <- struct{}{}:
default:
}
}
sendReqs := func(t *testing.T, nc *nats.Conn, count int, unsub bool) {
t.Helper()
for i := 0; i < count; i++ {
inbox := nats.NewInbox()
var sub *nats.Subscription
if test.useQueue {
sub = natsQueueSub(t, nc, inbox, "queue", cb)
} else {
sub = natsSub(t, nc, inbox, cb)
}
natsPubReq(t, nc, "foo", inbox, []byte("hello"))
select {
case <-ch:
case <-time.After(time.Second):
t.Fatalf("Failed to get reply")
}
if unsub {
natsUnsub(t, sub)
}
}
}
sendReqs(t, requestor, maxPerAccountCacheSize+1, true)
var route *client
sb.mu.Lock()
for _, r := range sb.routes {
route = r
break
}
sb.mu.Unlock()
checkExpected := func(t *testing.T, expected int) {
t.Helper()
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
route.mu.Lock()
n := len(route.in.pacache)
route.mu.Unlock()
if n != expected {
return fmt.Errorf("Expected %v subs in the cache, got %v", expected, n)
}
return nil
})
}
checkExpected(t, (maxPerAccountCacheSize+1)-(prunePerAccountCacheSize+1))
// Wait for more than the orphan check
time.Sleep(2 * closedSubsCheckInterval)
// Add a new subs up to point where new prune would occur
sendReqs(t, requestor, prunePerAccountCacheSize+1, false)
// Now closed subs should have been removed, so expected
// subs in the cache should be the new ones.
checkExpected(t, prunePerAccountCacheSize+1)
// Now try wil implicit unsubscribe (due to connection close)
sendReqs(t, requestor, maxPerAccountCacheSize+1, false)
requestor.Close()
checkExpected(t, maxPerAccountCacheSize-prunePerAccountCacheSize)
// Wait for more than the orphan check
time.Sleep(2 * closedSubsCheckInterval)
// Now create new connection and send prunePerAccountCacheSize+1
// and that should cause all subs from previous connection to be
// removed from cache
requestor = natsConnect(t, bURL)
defer requestor.Close()
sendReqs(t, requestor, prunePerAccountCacheSize+1, false)
checkExpected(t, prunePerAccountCacheSize+1)
})
}
}
func TestNoRaceFetchAccountDoesNotRegisterAccountTwice(t *testing.T) {
sa, oa, sb, ob, _ := runTrustedGateways(t)
defer sa.Shutdown()
defer sb.Shutdown()
// Let's create a user account.
okp, _ := nkeys.FromSeed(oSeed)
akp, _ := nkeys.CreateAccount()
pub, _ := akp.PublicKey()
nac := jwt.NewAccountClaims(pub)
jwt, _ := nac.Encode(okp)
userAcc := pub
// Replace B's account resolver with one that introduces
// delay during the Fetch()
sac := &slowAccResolver{AccountResolver: sb.AccountResolver()}
sb.SetAccountResolver(sac)
// Add the account in sa and sb
addAccountToMemResolver(sa, userAcc, jwt)
addAccountToMemResolver(sb, userAcc, jwt)
// Tell the slow account resolver which account to slow down
sac.Lock()
sac.acc = userAcc
sac.Unlock()
urlA := fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port)
urlB := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
nca, err := nats.Connect(urlA, createUserCreds(t, sa, akp))
if err != nil {
t.Fatalf("Error connecting to A: %v", err)
}
defer nca.Close()
// Since there is an optimistic send, this message will go to B
// and on processing this message, B will lookup/fetch this
// account, which can produce race with the fetch of this
// account from A's system account that sent a notification
// about this account, or with the client connect just after
// that.
nca.Publish("foo", []byte("hello"))
// Now connect and create a subscription on B
ncb, err := nats.Connect(urlB, createUserCreds(t, sb, akp))
if err != nil {
t.Fatalf("Error connecting to A: %v", err)
}
defer ncb.Close()
sub, err := ncb.SubscribeSync("foo")
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
ncb.Flush()
// Now send messages from A and B should ultimately start to receive
// them (once the subscription has been correctly registered)
ok := false
for i := 0; i < 10; i++ {
nca.Publish("foo", []byte("hello"))
if _, err := sub.NextMsg(100 * time.Millisecond); err != nil {
continue
}
ok = true
break
}
if !ok {
t.Fatalf("B should be able to receive messages")
}
checkTmpAccounts := func(t *testing.T, s *Server) {
t.Helper()
empty := true
s.tmpAccounts.Range(func(_, _ interface{}) bool {
empty = false
return false
})
if !empty {
t.Fatalf("tmpAccounts is not empty")
}
}
checkTmpAccounts(t, sa)
checkTmpAccounts(t, sb)
}
func TestNoRaceWriteDeadline(t *testing.T) {
opts := DefaultOptions()
opts.NoSystemAccount = true
opts.WriteDeadline = 30 * time.Millisecond
s := RunServer(opts)
defer s.Shutdown()
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer c.Close()
if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil {
t.Fatalf("Error sending protocols to server: %v", err)
}
// Reduce socket buffer to increase reliability of getting
// write deadline errors.
c.(*net.TCPConn).SetReadBuffer(4)
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
sender, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sender.Close()
payload := make([]byte, 1000000)
total := 1000
for i := 0; i < total; i++ {
if err := sender.Publish("foo", payload); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}
// Flush sender connection to ensure that all data has been sent.
if err := sender.Flush(); err != nil {
t.Fatalf("Error on flush: %v", err)
}
// At this point server should have closed connection c.
// On certain platforms, it may take more than one call before
// getting the error.
for i := 0; i < 100; i++ {
if _, err := c.Write([]byte("PUB bar 5\r\nhello\r\n")); err != nil {
// ok
return
}
}
t.Fatal("Connection should have been closed")
}
func TestNoRaceLeafNodeClusterNameConflictDeadlock(t *testing.T) {
o := DefaultOptions()
o.LeafNode.Port = -1
s := RunServer(o)
defer s.Shutdown()
u, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", o.LeafNode.Port))
if err != nil {
t.Fatalf("Error parsing url: %v", err)
}
o1 := DefaultOptions()
o1.ServerName = "A1"
o1.Cluster.Name = "clusterA"
o1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
s1 := RunServer(o1)
defer s1.Shutdown()
checkLeafNodeConnected(t, s1)
o2 := DefaultOptions()
o2.ServerName = "A2"
o2.Cluster.Name = "clusterA"
o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port))
o2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
s2 := RunServer(o2)
defer s2.Shutdown()
checkLeafNodeConnected(t, s2)
checkClusterFormed(t, s1, s2)
o3 := DefaultOptions()
o3.ServerName = "A3"
o3.Cluster.Name = "" // intentionally not set
o3.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port))
o3.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
s3 := RunServer(o3)
defer s3.Shutdown()
checkLeafNodeConnected(t, s3)
checkClusterFormed(t, s1, s2, s3)
}
// This test is same than TestAccountAddServiceImportRace but running
// without the -race flag, it would capture more easily the possible
// duplicate sid, resulting in less than expected number of subscriptions
// in the account's internal subscriptions map.
func TestNoRaceAccountAddServiceImportRace(t *testing.T) {
TestAccountAddServiceImportRace(t)
}
// Similar to the routed version. Make sure we receive all of the
// messages with auto-unsubscribe enabled.
func TestNoRaceQueueAutoUnsubscribe(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
rbar := int32(0)
barCb := func(m *nats.Msg) {
atomic.AddInt32(&rbar, 1)
}
rbaz := int32(0)
bazCb := func(m *nats.Msg) {
atomic.AddInt32(&rbaz, 1)
}
// Create 1000 subscriptions with auto-unsubscribe of 1.
// Do two groups, one bar and one baz.
total := 1000
for i := 0; i < total; i++ {
qsub, err := nc.QueueSubscribe("foo", "bar", barCb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if err := qsub.AutoUnsubscribe(1); err != nil {
t.Fatalf("Error on auto-unsubscribe: %v", err)
}
qsub, err = nc.QueueSubscribe("foo", "baz", bazCb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if err := qsub.AutoUnsubscribe(1); err != nil {
t.Fatalf("Error on auto-unsubscribe: %v", err)
}
}
nc.Flush()
expected := int32(total)
for i := int32(0); i < expected; i++ {
nc.Publish("foo", []byte("Don't Drop Me!"))
}
nc.Flush()
checkFor(t, 5*time.Second, 10*time.Millisecond, func() error {
nbar := atomic.LoadInt32(&rbar)
nbaz := atomic.LoadInt32(&rbaz)
if nbar == expected && nbaz == expected {
return nil
}
return fmt.Errorf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'",
expected, atomic.LoadInt32(&rbar), atomic.LoadInt32(&rbaz))
})
}
func TestNoRaceAcceptLoopsDoNotLeaveOpenedConn(t *testing.T) {
for _, test := range []struct {
name string
url func(o *Options) (string, int)
}{
{"client", func(o *Options) (string, int) { return o.Host, o.Port }},
{"route", func(o *Options) (string, int) { return o.Cluster.Host, o.Cluster.Port }},
{"gateway", func(o *Options) (string, int) { return o.Gateway.Host, o.Gateway.Port }},
{"leafnode", func(o *Options) (string, int) { return o.LeafNode.Host, o.LeafNode.Port }},
{"websocket", func(o *Options) (string, int) { return o.Websocket.Host, o.Websocket.Port }},
} {
t.Run(test.name, func(t *testing.T) {
o := DefaultOptions()
o.DisableShortFirstPing = true
o.Accounts = []*Account{NewAccount("$SYS")}
o.SystemAccount = "$SYS"
o.Cluster.Name = "abc"
o.Cluster.Host = "127.0.0.1"
o.Cluster.Port = -1
o.Gateway.Name = "abc"
o.Gateway.Host = "127.0.0.1"
o.Gateway.Port = -1
o.LeafNode.Host = "127.0.0.1"
o.LeafNode.Port = -1
o.Websocket.Host = "127.0.0.1"
o.Websocket.Port = -1
o.Websocket.HandshakeTimeout = 1
o.Websocket.NoTLS = true
s := RunServer(o)
defer s.Shutdown()
host, port := test.url(o)
url := fmt.Sprintf("%s:%d", host, port)
var conns []net.Conn
wg := sync.WaitGroup{}
wg.Add(1)
done := make(chan struct{}, 1)
go func() {
defer wg.Done()
// Have an upper limit
for i := 0; i < 200; i++ {
c, err := net.Dial("tcp", url)
if err != nil {
return
}
conns = append(conns, c)
select {
case <-done:
return
default:
}
}
}()
time.Sleep(15 * time.Millisecond)
s.Shutdown()
close(done)
wg.Wait()
for _, c := range conns {
c.SetReadDeadline(time.Now().Add(2 * time.Second))
br := bufio.NewReader(c)
// Read INFO for connections that were accepted
_, _, err := br.ReadLine()
if err == nil {
// After that, the connection should be closed,
// so we should get an error here.
_, _, err = br.ReadLine()
}
// We expect an io.EOF or any other error indicating the use of a closed
// connection, but we should not get the timeout error.
if ne, ok := err.(net.Error); ok && ne.Timeout() {
err = nil
}
if err == nil {
var buf [10]byte
c.SetDeadline(time.Now().Add(2 * time.Second))
c.Write([]byte("C"))
_, err = c.Read(buf[:])
if ne, ok := err.(net.Error); ok && ne.Timeout() {
err = nil
}
}
if err == nil {
t.Fatalf("Connection should have been closed")
}
c.Close()
}
})
}
}
func TestNoRaceJetStreamDeleteStreamManyConsumers(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mname := "MYS"
mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: FileStorage})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
// This number needs to be higher than the internal sendq size to trigger what this test is testing.
for i := 0; i < 2000; i++ {
_, err := mset.addConsumer(&ConsumerConfig{
Durable: fmt.Sprintf("D-%d", i),
DeliverSubject: fmt.Sprintf("deliver.%d", i),
})
if err != nil {
t.Fatalf("Error creating consumer: %v", err)
}
}
// With bug this would not return and would hang.
mset.delete()
}
func TestNoRaceJetStreamAPIStreamListPaging(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
// Forced cleanup of all persisted state.
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
// Create 2X limit
streamsNum := 2 * JSApiNamesLimit
for i := 1; i <= streamsNum; i++ {
name := fmt.Sprintf("STREAM-%06d", i)
cfg := StreamConfig{Name: name, Storage: MemoryStorage}
_, err := s.GlobalAccount().addStream(&cfg)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
}
// Client for API requests.
nc := clientConnectToServer(t, s)
defer nc.Close()
reqList := func(offset int) []byte {
t.Helper()
var req []byte
if offset > 0 {
req, _ = json.Marshal(&ApiPagedRequest{Offset: offset})
}
resp, err := nc.Request(JSApiStreams, req, time.Second)
if err != nil {
t.Fatalf("Unexpected error getting stream list: %v", err)
}
return resp.Data
}
checkResp := func(resp []byte, expectedLen, expectedOffset int) {
t.Helper()
var listResponse JSApiStreamNamesResponse
if err := json.Unmarshal(resp, &listResponse); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(listResponse.Streams) != expectedLen {
t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Streams))
}
if listResponse.Total != streamsNum {
t.Fatalf("Expected total to be %d but got %d", streamsNum, listResponse.Total)
}
if listResponse.Offset != expectedOffset {
t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset)
}
if expectedLen < 1 {
return
}
// Make sure we get the right stream.
sname := fmt.Sprintf("STREAM-%06d", expectedOffset+1)
if listResponse.Streams[0] != sname {
t.Fatalf("Expected stream %q to be first, got %q", sname, listResponse.Streams[0])
}
}
checkResp(reqList(0), JSApiNamesLimit, 0)
checkResp(reqList(JSApiNamesLimit), JSApiNamesLimit, JSApiNamesLimit)
checkResp(reqList(streamsNum), 0, streamsNum)
checkResp(reqList(streamsNum-22), 22, streamsNum-22)
checkResp(reqList(streamsNum+22), 0, streamsNum)
}
func TestNoRaceJetStreamAPIConsumerListPaging(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
// Forced cleanup of all persisted state.
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
sname := "MYSTREAM"
mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: sname})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
// Client for API requests.
nc := clientConnectToServer(t, s)
defer nc.Close()
consumersNum := JSApiNamesLimit
for i := 1; i <= consumersNum; i++ {
dsubj := fmt.Sprintf("d.%d", i)
sub, _ := nc.SubscribeSync(dsubj)
defer sub.Unsubscribe()
nc.Flush()
_, err := mset.addConsumer(&ConsumerConfig{DeliverSubject: dsubj})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
reqListSubject := fmt.Sprintf(JSApiConsumersT, sname)
reqList := func(offset int) []byte {
t.Helper()
var req []byte
if offset > 0 {
req, _ = json.Marshal(&JSApiConsumersRequest{ApiPagedRequest: ApiPagedRequest{Offset: offset}})
}
resp, err := nc.Request(reqListSubject, req, time.Second)
if err != nil {
t.Fatalf("Unexpected error getting stream list: %v", err)
}
return resp.Data
}
checkResp := func(resp []byte, expectedLen, expectedOffset int) {
t.Helper()
var listResponse JSApiConsumerNamesResponse
if err := json.Unmarshal(resp, &listResponse); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(listResponse.Consumers) != expectedLen {
t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Consumers))
}
if listResponse.Total != consumersNum {
t.Fatalf("Expected total to be %d but got %d", consumersNum, listResponse.Total)
}
if listResponse.Offset != expectedOffset {
t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset)
}
}
checkResp(reqList(0), JSApiNamesLimit, 0)
checkResp(reqList(consumersNum-22), 22, consumersNum-22)
checkResp(reqList(consumersNum+22), 0, consumersNum)
}
func TestNoRaceJetStreamWorkQueueLoadBalance(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mname := "MY_MSG_SET"
mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Subjects: []string{"foo", "bar"}})
if err != nil {
t.Fatalf("Unexpected error adding message set: %v", err)
}
defer mset.delete()
// Create basic work queue mode consumer.
oname := "WQ"
o, err := mset.addConsumer(&ConsumerConfig{Durable: oname, AckPolicy: AckExplicit})
if err != nil {
t.Fatalf("Expected no error with durable, got %v", err)
}
defer o.delete()
// To send messages.
nc := clientConnectToServer(t, s)
defer nc.Close()
// For normal work queue semantics, you send requests to the subject with stream and consumer name.
reqMsgSubj := o.requestNextMsgSubject()
numWorkers := 25
counts := make([]int32, numWorkers)
var received int32
rwg := &sync.WaitGroup{}
rwg.Add(numWorkers)
wg := &sync.WaitGroup{}
wg.Add(numWorkers)
ch := make(chan bool)
toSend := 1000
for i := 0; i < numWorkers; i++ {
nc := clientConnectToServer(t, s)
defer nc.Close()
go func(index int32) {
rwg.Done()
defer wg.Done()
<-ch
for counter := &counts[index]; ; {
m, err := nc.Request(reqMsgSubj, nil, 100*time.Millisecond)
if err != nil {
return
}
m.Respond(nil)
atomic.AddInt32(counter, 1)
if total := atomic.AddInt32(&received, 1); total >= int32(toSend) {
return
}
}
}(int32(i))
}
// Wait for requestors to be ready
rwg.Wait()
close(ch)
sendSubj := "bar"
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc, sendSubj, "Hello World!")
}
// Wait for test to complete.
wg.Wait()
target := toSend / numWorkers
delta := target/2 + 5
low, high := int32(target-delta), int32(target+delta)
for i := 0; i < numWorkers; i++ {
if msgs := atomic.LoadInt32(&counts[i]); msgs < low || msgs > high {
t.Fatalf("Messages received for worker [%d] too far off from target of %d, got %d", i, target, msgs)
}
}
}
func TestNoRaceJetStreamClusterLargeStreamInlineCatchup(t *testing.T) {
c := createJetStreamClusterExplicit(t, "LSS", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sr := c.randomNonStreamLeader("$G", "TEST")
sr.Shutdown()
// In case sr was meta leader.
c.waitOnLeader()
msg, toSend := []byte("Hello JS Clustering"), 5000
// Now fill up stream.
for i := 0; i < toSend; i++ {
if _, err = js.Publish("foo", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Check active state as well, shows that the owner answered.
if si.State.Msgs != uint64(toSend) {
t.Fatalf("Expected %d msgs, got bad state: %+v", toSend, si.State)
}
// Kill our current leader to make just 2.
c.streamLeader("$G", "TEST").Shutdown()
// Now restart the shutdown peer and wait for it to be current.
sr = c.restartServer(sr)
c.waitOnStreamCurrent(sr, "$G", "TEST")
// Ask other servers to stepdown as leader so that sr becomes the leader.
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
c.waitOnStreamLeader("$G", "TEST")
if sl := c.streamLeader("$G", "TEST"); sl != sr {
sl.JetStreamStepdownStream("$G", "TEST")
return fmt.Errorf("Server %s is not leader yet", sr)
}
return nil
})
si, err = js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Check that we have all of our messsages stored.
// Wait for a bit for upper layers to process.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
if si.State.Msgs != uint64(toSend) {
return fmt.Errorf("Expected %d msgs, got %d", toSend, si.State.Msgs)
}
return nil
})
}
func TestNoRaceJetStreamClusterStreamCreateAndLostQuorum(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R5S", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
sub, err := nc.SubscribeSync(JSAdvisoryStreamQuorumLostPre + ".*")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := js.AddStream(&nats.StreamConfig{Name: "NO-LQ-START", Replicas: 3}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c.waitOnStreamLeader("$G", "NO-LQ-START")
checkSubsPending(t, sub, 0)
c.stopAll()
// Start up the one we were connected to first and wait for it to be connected.
s = c.restartServer(s)
nc, err = nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Failed to create client: %v", err)
}
defer nc.Close()
sub, err = nc.SubscribeSync(JSAdvisoryStreamQuorumLostPre + ".*")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
nc.Flush()
c.restartAll()
c.waitOnStreamLeader("$G", "NO-LQ-START")
checkSubsPending(t, sub, 0)
}
func TestNoRaceJetStreamClusterSuperClusterMirrors(t *testing.T) {
// These pass locally but are flaky on Travis.
// Disable for now.
skip(t)
sc := createJetStreamSuperCluster(t, 3, 3)
defer sc.shutdown()
// Client based API
s := sc.clusterForName("C2").randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
// Create source stream.
_, err := js.AddStream(&nats.StreamConfig{Name: "S1", Subjects: []string{"foo", "bar"}, Replicas: 3, Placement: &nats.Placement{Cluster: "C2"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Needed while Go client does not have mirror support.
createStream := func(cfg *StreamConfig) {
t.Helper()
req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
rm, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var resp JSApiStreamCreateResponse
if err := json.Unmarshal(rm.Data, &resp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resp.Error != nil {
t.Fatalf("Unexpected error: %+v", resp.Error)
}
}
// Send 100 messages.
for i := 0; i < 100; i++ {
if _, err := js.Publish("foo", []byte("MIRRORS!")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
createStream(&StreamConfig{
Name: "M1",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1"},
Placement: &Placement{Cluster: "C1"},
})
// Faster timeout since we loop below checking for condition.
js2, err := nc.JetStream(nats.MaxWait(50 * time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js2.StreamInfo("M1")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 100 {
return fmt.Errorf("Expected 100 msgs, got state: %+v", si.State)
}
return nil
})
// Purge the source stream.
if err := js.PurgeStream("S1"); err != nil {
t.Fatalf("Unexpected purge error: %v", err)
}
// Send 50 more msgs now.
for i := 0; i < 50; i++ {
if _, err := js.Publish("bar", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
createStream(&StreamConfig{
Name: "M2",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1"},
Replicas: 3,
Placement: &Placement{Cluster: "C3"},
})
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js2.StreamInfo("M2")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 50 {
return fmt.Errorf("Expected 50 msgs, got state: %+v", si.State)
}
if si.State.FirstSeq != 101 {
return fmt.Errorf("Expected start seq of 101, got state: %+v", si.State)
}
return nil
})
sl := sc.clusterForName("C3").streamLeader("$G", "M2")
doneCh := make(chan bool)
// Now test that if the mirror get's interrupted that it picks up where it left off etc.
go func() {
// Send 100 more messages.
for i := 0; i < 100; i++ {
if _, err := js.Publish("foo", []byte("MIRRORS!")); err != nil {
t.Errorf("Unexpected publish on %d error: %v", i, err)
}
time.Sleep(2 * time.Millisecond)
}
doneCh <- true
}()
time.Sleep(20 * time.Millisecond)
sl.Shutdown()
<-doneCh
sc.clusterForName("C3").waitOnStreamLeader("$G", "M2")
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js2.StreamInfo("M2")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 150 {
return fmt.Errorf("Expected 150 msgs, got state: %+v", si.State)
}
if si.State.FirstSeq != 101 {
return fmt.Errorf("Expected start seq of 101, got state: %+v", si.State)
}
return nil
})
}
func TestNoRaceJetStreamClusterSuperClusterSources(t *testing.T) {
// These pass locally but are flaky on Travis.
// Disable for now.
skip(t)
sc := createJetStreamSuperCluster(t, 3, 3)
defer sc.shutdown()
// Client based API
s := sc.clusterForName("C1").randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
// Create our source streams.
for _, sname := range []string{"foo", "bar", "baz"} {
if _, err := js.AddStream(&nats.StreamConfig{Name: sname, Replicas: 1}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
sendBatch := func(subject string, n int) {
for i := 0; i < n; i++ {
msg := fmt.Sprintf("MSG-%d", i+1)
if _, err := js.Publish(subject, []byte(msg)); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
}
// Populate each one.
sendBatch("foo", 10)
sendBatch("bar", 15)
sendBatch("baz", 25)
// Needed while Go client does not have mirror support for creating mirror or source streams.
createStream := func(cfg *StreamConfig) {
t.Helper()
req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
rm, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var resp JSApiStreamCreateResponse
if err := json.Unmarshal(rm.Data, &resp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resp.Error != nil {
t.Fatalf("Unexpected error: %+v", resp.Error)
}
}
cfg := &StreamConfig{
Name: "MS",
Storage: FileStorage,
Sources: []*StreamSource{
&StreamSource{Name: "foo"},
&StreamSource{Name: "bar"},
&StreamSource{Name: "baz"},
},
}
createStream(cfg)
time.Sleep(time.Second)
// Faster timeout since we loop below checking for condition.
js2, err := nc.JetStream(nats.MaxWait(50 * time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
si, err := js2.StreamInfo("MS")
if err != nil {
return err
}
if si.State.Msgs != 50 {
return fmt.Errorf("Expected 50 msgs, got state: %+v", si.State)
}
return nil
})
// Purge the source streams.
for _, sname := range []string{"foo", "bar", "baz"} {
if err := js.PurgeStream(sname); err != nil {
t.Fatalf("Unexpected purge error: %v", err)
}
}
if err := js.DeleteStream("MS"); err != nil {
t.Fatalf("Unexpected delete error: %v", err)
}
// Send more msgs now.
sendBatch("foo", 10)
sendBatch("bar", 15)
sendBatch("baz", 25)
cfg = &StreamConfig{
Name: "MS2",
Storage: FileStorage,
Sources: []*StreamSource{
&StreamSource{Name: "foo"},
&StreamSource{Name: "bar"},
&StreamSource{Name: "baz"},
},
Replicas: 3,
Placement: &Placement{Cluster: "C3"},
}
createStream(cfg)
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
si, err := js2.StreamInfo("MS2")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 50 {
return fmt.Errorf("Expected 50 msgs, got state: %+v", si.State)
}
if si.State.FirstSeq != 1 {
return fmt.Errorf("Expected start seq of 1, got state: %+v", si.State)
}
return nil
})
sl := sc.clusterForName("C3").streamLeader("$G", "MS2")
doneCh := make(chan bool)
if sl == sc.leader() {
nc.Request(JSApiLeaderStepDown, nil, time.Second)
sc.waitOnLeader()
}
// Now test that if the mirror get's interrupted that it picks up where it left off etc.
go func() {
// Send 50 more messages each.
for i := 0; i < 50; i++ {
msg := fmt.Sprintf("R-MSG-%d", i+1)
for _, sname := range []string{"foo", "bar", "baz"} {
if _, err := js.Publish(sname, []byte(msg)); err != nil {
t.Errorf("Unexpected publish error: %v", err)
}
}
time.Sleep(2 * time.Millisecond)
}
doneCh <- true
}()
time.Sleep(20 * time.Millisecond)
sl.Shutdown()
sc.clusterForName("C3").waitOnStreamLeader("$G", "MS2")
<-doneCh
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
si, err := js2.StreamInfo("MS2")
if err != nil {
return err
}
if si.State.Msgs != 200 {
return fmt.Errorf("Expected 200 msgs, got state: %+v", si.State)
}
return nil
})
}