mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Update tests on Travis with tweaked GC settings
Moved some tests to "no race" tests that are run separately. Removing -v and adding -p=1. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -1,10 +1,5 @@
|
||||
# Tests are starting to consume enough memory during --race tests to exceed container env on Travis of 4GB.
|
||||
sudo: required
|
||||
dist: trusty
|
||||
|
||||
language: go
|
||||
go:
|
||||
- 1.9.x
|
||||
- 1.10.x
|
||||
- 1.11.x
|
||||
install:
|
||||
@@ -24,6 +19,7 @@ before_script:
|
||||
- if [[ "$TRAVIS_GO_VERSION" =~ 1.11 ]] && [ "$TRAVIS_TAG" != "" ]; then ./scripts/cross_compile.sh $TRAVIS_TAG; fi
|
||||
script:
|
||||
- go test -i $EXCLUDE_VENDOR
|
||||
- if [[ "$TRAVIS_GO_VERSION" =~ 1.11 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race $EXCLUDE_VENDOR; fi
|
||||
- go test -run=TestNoRace $EXCLUDE_VENDOR
|
||||
- if [[ "$TRAVIS_GO_VERSION" =~ 1.11 ]]; then ./scripts/cov.sh TRAVIS; else GOGC=10 go test -race -p=1 $EXCLUDE_VENDOR; fi
|
||||
after_success:
|
||||
- if [[ "$TRAVIS_GO_VERSION" =~ 1.11 ]] && [ "$TRAVIS_TAG" != "" ]; then ghr --owner nats-io --token $GITHUB_TOKEN --draft --replace $TRAVIS_TAG pkg/; fi
|
||||
|
||||
@@ -241,100 +241,6 @@ func TestClosedMaxPayload(t *testing.T) {
|
||||
checkReason(t, conns[0].Reason, MaxPayloadExceeded)
|
||||
}
|
||||
|
||||
func TestClosedSlowConsumerWriteDeadline(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.WriteDeadline = 10 * time.Millisecond // Make very small to trip.
|
||||
opts.MaxPending = 500 * 1024 * 1024 // Set high so it will not trip here.
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil {
|
||||
t.Fatalf("Error sending protocols to server: %v", err)
|
||||
}
|
||||
// Reduce socket buffer to increase reliability of data backing up in the server destined
|
||||
// for our subscribed client.
|
||||
c.(*net.TCPConn).SetReadBuffer(128)
|
||||
|
||||
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
sender, err := nats.Connect(url)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer sender.Close()
|
||||
|
||||
payload := make([]byte, 1024*1024)
|
||||
for i := 0; i < 100; i++ {
|
||||
if err := sender.Publish("foo", payload); err != nil {
|
||||
t.Fatalf("Error on publish: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Flush sender connection to ensure that all data has been sent.
|
||||
if err := sender.Flush(); err != nil {
|
||||
t.Fatalf("Error on flush: %v", err)
|
||||
}
|
||||
|
||||
// At this point server should have closed connection c.
|
||||
checkClosedConns(t, s, 1, 2*time.Second)
|
||||
conns := s.closedClients()
|
||||
if lc := len(conns); lc != 1 {
|
||||
t.Fatalf("len(conns) expected to be %d, got %d\n", 1, lc)
|
||||
}
|
||||
checkReason(t, conns[0].Reason, SlowConsumerWriteDeadline)
|
||||
}
|
||||
|
||||
func TestClosedSlowConsumerPendingBytes(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.WriteDeadline = 30 * time.Second // Wait for long time so write deadline does not trigger slow consumer.
|
||||
opts.MaxPending = 1 * 1024 * 1024 // Set to low value (1MB) to allow SC to trip.
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil {
|
||||
t.Fatalf("Error sending protocols to server: %v", err)
|
||||
}
|
||||
// Reduce socket buffer to increase reliability of data backing up in the server destined
|
||||
// for our subscribed client.
|
||||
c.(*net.TCPConn).SetReadBuffer(128)
|
||||
|
||||
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
sender, err := nats.Connect(url)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer sender.Close()
|
||||
|
||||
payload := make([]byte, 1024*1024)
|
||||
for i := 0; i < 100; i++ {
|
||||
if err := sender.Publish("foo", payload); err != nil {
|
||||
t.Fatalf("Error on publish: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Flush sender connection to ensure that all data has been sent.
|
||||
if err := sender.Flush(); err != nil {
|
||||
t.Fatalf("Error on flush: %v", err)
|
||||
}
|
||||
|
||||
// At this point server should have closed connection c.
|
||||
checkClosedConns(t, s, 1, 2*time.Second)
|
||||
conns := s.closedClients()
|
||||
if lc := len(conns); lc != 1 {
|
||||
t.Fatalf("len(conns) expected to be %d, got %d\n", 1, lc)
|
||||
}
|
||||
checkReason(t, conns[0].Reason, SlowConsumerPendingBytes)
|
||||
}
|
||||
|
||||
func TestClosedTLSHandshake(t *testing.T) {
|
||||
opts, err := ProcessConfigFile("./configs/tls.conf")
|
||||
if err != nil {
|
||||
|
||||
@@ -18,6 +18,7 @@ package server
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -26,8 +27,10 @@ import (
|
||||
)
|
||||
|
||||
// IMPORTANT: Tests in this file are not executed when running with the -race flag.
|
||||
// The test name should be prefixed with TestNoRace so we can run only
|
||||
// those tests: go test -run=TestNoRace ...
|
||||
|
||||
func TestAvoidSlowConsumerBigMessages(t *testing.T) {
|
||||
func TestNoRaceAvoidSlowConsumerBigMessages(t *testing.T) {
|
||||
opts := DefaultOptions() // Use defaults to make sure they avoid pending slow consumer.
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
@@ -85,3 +88,244 @@ func TestAvoidSlowConsumerBigMessages(t *testing.T) {
|
||||
t.Fatalf("Failed to receive all large messages: %d of %d\n", r, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNoRaceRoutedQueueAutoUnsubscribe(t *testing.T) {
|
||||
optsA, _ := ProcessConfigFile("./configs/seed.conf")
|
||||
optsA.NoSigs, optsA.NoLog = true, true
|
||||
srvA := RunServer(optsA)
|
||||
defer srvA.Shutdown()
|
||||
|
||||
srvARouteURL := fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, srvA.ClusterAddr().Port)
|
||||
optsB := nextServerOpts(optsA)
|
||||
optsB.Routes = RoutesFromStr(srvARouteURL)
|
||||
|
||||
srvB := RunServer(optsB)
|
||||
defer srvB.Shutdown()
|
||||
|
||||
// Wait for these 2 to connect to each other
|
||||
checkClusterFormed(t, srvA, srvB)
|
||||
|
||||
// Have a client connection to each server
|
||||
ncA, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer ncA.Close()
|
||||
|
||||
ncB, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer ncB.Close()
|
||||
|
||||
rbar := int32(0)
|
||||
barCb := func(m *nats.Msg) {
|
||||
atomic.AddInt32(&rbar, 1)
|
||||
}
|
||||
rbaz := int32(0)
|
||||
bazCb := func(m *nats.Msg) {
|
||||
atomic.AddInt32(&rbaz, 1)
|
||||
}
|
||||
|
||||
// Create 125 queue subs with auto-unsubscribe to each server for
|
||||
// group bar and group baz. So 250 total per queue group.
|
||||
cons := []*nats.Conn{ncA, ncB}
|
||||
for _, c := range cons {
|
||||
for i := 0; i < 125; i++ {
|
||||
qsub, err := c.QueueSubscribe("foo", "bar", barCb)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on subscribe: %v", err)
|
||||
}
|
||||
if err := qsub.AutoUnsubscribe(1); err != nil {
|
||||
t.Fatalf("Error on auto-unsubscribe: %v", err)
|
||||
}
|
||||
qsub, err = c.QueueSubscribe("foo", "baz", bazCb)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on subscribe: %v", err)
|
||||
}
|
||||
if err := qsub.AutoUnsubscribe(1); err != nil {
|
||||
t.Fatalf("Error on auto-unsubscribe: %v", err)
|
||||
}
|
||||
}
|
||||
c.Subscribe("TEST.COMPLETE", func(m *nats.Msg) {})
|
||||
}
|
||||
|
||||
// We coelasce now so for each server we will have all local (250) plus
|
||||
// two from the remote side for each queue group. We also create one more
|
||||
// and will wait til each server has 254 subscriptions, that will make sure
|
||||
// that we have everything setup.
|
||||
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
|
||||
subsA := srvA.NumSubscriptions()
|
||||
subsB := srvB.NumSubscriptions()
|
||||
if subsA != 254 || subsB != 254 {
|
||||
return fmt.Errorf("Not all subs processed yet: %d and %d", subsA, subsB)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
expected := int32(250)
|
||||
// Now send messages from each server
|
||||
for i := int32(0); i < expected; i++ {
|
||||
c := cons[i%2]
|
||||
c.Publish("foo", []byte("Don't Drop Me!"))
|
||||
}
|
||||
for _, c := range cons {
|
||||
c.Flush()
|
||||
}
|
||||
|
||||
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
|
||||
nbar := atomic.LoadInt32(&rbar)
|
||||
nbaz := atomic.LoadInt32(&rbaz)
|
||||
if nbar == expected && nbaz == expected {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'",
|
||||
expected, atomic.LoadInt32(&rbar), atomic.LoadInt32(&rbaz))
|
||||
})
|
||||
}
|
||||
|
||||
func TestNoRaceClosedSlowConsumerWriteDeadline(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.WriteDeadline = 10 * time.Millisecond // Make very small to trip.
|
||||
opts.MaxPending = 500 * 1024 * 1024 // Set high so it will not trip here.
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil {
|
||||
t.Fatalf("Error sending protocols to server: %v", err)
|
||||
}
|
||||
// Reduce socket buffer to increase reliability of data backing up in the server destined
|
||||
// for our subscribed client.
|
||||
c.(*net.TCPConn).SetReadBuffer(128)
|
||||
|
||||
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
sender, err := nats.Connect(url)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer sender.Close()
|
||||
|
||||
payload := make([]byte, 1024*1024)
|
||||
for i := 0; i < 100; i++ {
|
||||
if err := sender.Publish("foo", payload); err != nil {
|
||||
t.Fatalf("Error on publish: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Flush sender connection to ensure that all data has been sent.
|
||||
if err := sender.Flush(); err != nil {
|
||||
t.Fatalf("Error on flush: %v", err)
|
||||
}
|
||||
|
||||
// At this point server should have closed connection c.
|
||||
checkClosedConns(t, s, 1, 2*time.Second)
|
||||
conns := s.closedClients()
|
||||
if lc := len(conns); lc != 1 {
|
||||
t.Fatalf("len(conns) expected to be %d, got %d\n", 1, lc)
|
||||
}
|
||||
checkReason(t, conns[0].Reason, SlowConsumerWriteDeadline)
|
||||
}
|
||||
|
||||
func TestNoRaceClosedSlowConsumerPendingBytes(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.WriteDeadline = 30 * time.Second // Wait for long time so write deadline does not trigger slow consumer.
|
||||
opts.MaxPending = 1 * 1024 * 1024 // Set to low value (1MB) to allow SC to trip.
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil {
|
||||
t.Fatalf("Error sending protocols to server: %v", err)
|
||||
}
|
||||
// Reduce socket buffer to increase reliability of data backing up in the server destined
|
||||
// for our subscribed client.
|
||||
c.(*net.TCPConn).SetReadBuffer(128)
|
||||
|
||||
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
sender, err := nats.Connect(url)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer sender.Close()
|
||||
|
||||
payload := make([]byte, 1024*1024)
|
||||
for i := 0; i < 100; i++ {
|
||||
if err := sender.Publish("foo", payload); err != nil {
|
||||
t.Fatalf("Error on publish: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Flush sender connection to ensure that all data has been sent.
|
||||
if err := sender.Flush(); err != nil {
|
||||
t.Fatalf("Error on flush: %v", err)
|
||||
}
|
||||
|
||||
// At this point server should have closed connection c.
|
||||
checkClosedConns(t, s, 1, 2*time.Second)
|
||||
conns := s.closedClients()
|
||||
if lc := len(conns); lc != 1 {
|
||||
t.Fatalf("len(conns) expected to be %d, got %d\n", 1, lc)
|
||||
}
|
||||
checkReason(t, conns[0].Reason, SlowConsumerPendingBytes)
|
||||
}
|
||||
|
||||
func TestNoRaceSlowConsumerPendingBytes(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.WriteDeadline = 30 * time.Second // Wait for long time so write deadline does not trigger slow consumer.
|
||||
opts.MaxPending = 1 * 1024 * 1024 // Set to low value (1MB) to allow SC to trip.
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil {
|
||||
t.Fatalf("Error sending protocols to server: %v", err)
|
||||
}
|
||||
// Reduce socket buffer to increase reliability of data backing up in the server destined
|
||||
// for our subscribed client.
|
||||
c.(*net.TCPConn).SetReadBuffer(128)
|
||||
|
||||
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
sender, err := nats.Connect(url)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer sender.Close()
|
||||
|
||||
payload := make([]byte, 1024*1024)
|
||||
for i := 0; i < 100; i++ {
|
||||
if err := sender.Publish("foo", payload); err != nil {
|
||||
t.Fatalf("Error on publish: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Flush sender connection to ensure that all data has been sent.
|
||||
if err := sender.Flush(); err != nil {
|
||||
t.Fatalf("Error on flush: %v", err)
|
||||
}
|
||||
|
||||
// At this point server should have closed connection c.
|
||||
|
||||
// On certain platforms, it may take more than one call before
|
||||
// getting the error.
|
||||
for i := 0; i < 100; i++ {
|
||||
if _, err := c.Write([]byte("PUB bar 5\r\nhello\r\n")); err != nil {
|
||||
// ok
|
||||
return
|
||||
}
|
||||
}
|
||||
t.Fatal("Connection should have been closed")
|
||||
}
|
||||
|
||||
@@ -21,7 +21,6 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -860,102 +859,6 @@ func TestServerPoolUpdatedWhenRouteGoesAway(t *testing.T) {
|
||||
nc.Close()
|
||||
}
|
||||
|
||||
func TestRoutedQueueAutoUnsubscribe(t *testing.T) {
|
||||
optsA, _ := ProcessConfigFile("./configs/seed.conf")
|
||||
optsA.NoSigs, optsA.NoLog = true, true
|
||||
srvA := RunServer(optsA)
|
||||
defer srvA.Shutdown()
|
||||
|
||||
srvARouteURL := fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, srvA.ClusterAddr().Port)
|
||||
optsB := nextServerOpts(optsA)
|
||||
optsB.Routes = RoutesFromStr(srvARouteURL)
|
||||
|
||||
srvB := RunServer(optsB)
|
||||
defer srvB.Shutdown()
|
||||
|
||||
// Wait for these 2 to connect to each other
|
||||
checkClusterFormed(t, srvA, srvB)
|
||||
|
||||
// Have a client connection to each server
|
||||
ncA, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer ncA.Close()
|
||||
|
||||
ncB, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer ncB.Close()
|
||||
|
||||
rbar := int32(0)
|
||||
barCb := func(m *nats.Msg) {
|
||||
atomic.AddInt32(&rbar, 1)
|
||||
}
|
||||
rbaz := int32(0)
|
||||
bazCb := func(m *nats.Msg) {
|
||||
atomic.AddInt32(&rbaz, 1)
|
||||
}
|
||||
|
||||
// Create 125 queue subs with auto-unsubscribe to each server for
|
||||
// group bar and group baz. So 250 total per queue group.
|
||||
cons := []*nats.Conn{ncA, ncB}
|
||||
for _, c := range cons {
|
||||
for i := 0; i < 125; i++ {
|
||||
qsub, err := c.QueueSubscribe("foo", "bar", barCb)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on subscribe: %v", err)
|
||||
}
|
||||
if err := qsub.AutoUnsubscribe(1); err != nil {
|
||||
t.Fatalf("Error on auto-unsubscribe: %v", err)
|
||||
}
|
||||
qsub, err = c.QueueSubscribe("foo", "baz", bazCb)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on subscribe: %v", err)
|
||||
}
|
||||
if err := qsub.AutoUnsubscribe(1); err != nil {
|
||||
t.Fatalf("Error on auto-unsubscribe: %v", err)
|
||||
}
|
||||
}
|
||||
c.Subscribe("TEST.COMPLETE", func(m *nats.Msg) {})
|
||||
}
|
||||
|
||||
// We coelasce now so for each server we will have all local (250) plus
|
||||
// two from the remote side for each queue group. We also create one more
|
||||
// and will wait til each server has 254 subscriptions, that will make sure
|
||||
// that we have everything setup.
|
||||
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
|
||||
subsA := srvA.NumSubscriptions()
|
||||
subsB := srvB.NumSubscriptions()
|
||||
if subsA != 254 || subsB != 254 {
|
||||
return fmt.Errorf("Not all subs processed yet: %d and %d", subsA, subsB)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
expected := int32(250)
|
||||
// Now send messages from each server
|
||||
for i := int32(0); i < expected; i++ {
|
||||
c := cons[i%2]
|
||||
c.Publish("foo", []byte("Don't Drop Me!"))
|
||||
}
|
||||
for _, c := range cons {
|
||||
c.Flush()
|
||||
}
|
||||
|
||||
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
|
||||
nbar := atomic.LoadInt32(&rbar)
|
||||
nbaz := atomic.LoadInt32(&rbaz)
|
||||
if nbar == expected && nbaz == expected {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'",
|
||||
expected, atomic.LoadInt32(&rbar), atomic.LoadInt32(&rbaz))
|
||||
})
|
||||
}
|
||||
|
||||
func TestRouteFailedConnRemovedFromTmpMap(t *testing.T) {
|
||||
optsA, _ := ProcessConfigFile("./configs/srv_a.conf")
|
||||
optsA.NoSigs, optsA.NoLog = true, true
|
||||
|
||||
@@ -509,57 +509,6 @@ func TestWriteDeadline(t *testing.T) {
|
||||
t.Fatal("Connection should have been closed")
|
||||
}
|
||||
|
||||
func TestSlowConsumerPendingBytes(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.WriteDeadline = 30 * time.Second // Wait for long time so write deadline does not trigger slow consumer.
|
||||
opts.MaxPending = 1 * 1024 * 1024 // Set to low value (1MB) to allow SC to trip.
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil {
|
||||
t.Fatalf("Error sending protocols to server: %v", err)
|
||||
}
|
||||
// Reduce socket buffer to increase reliability of data backing up in the server destined
|
||||
// for our subscribed client.
|
||||
c.(*net.TCPConn).SetReadBuffer(128)
|
||||
|
||||
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
sender, err := nats.Connect(url)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer sender.Close()
|
||||
|
||||
payload := make([]byte, 1024*1024)
|
||||
for i := 0; i < 100; i++ {
|
||||
if err := sender.Publish("foo", payload); err != nil {
|
||||
t.Fatalf("Error on publish: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Flush sender connection to ensure that all data has been sent.
|
||||
if err := sender.Flush(); err != nil {
|
||||
t.Fatalf("Error on flush: %v", err)
|
||||
}
|
||||
|
||||
// At this point server should have closed connection c.
|
||||
|
||||
// On certain platforms, it may take more than one call before
|
||||
// getting the error.
|
||||
for i := 0; i < 100; i++ {
|
||||
if _, err := c.Write([]byte("PUB bar 5\r\nhello\r\n")); err != nil {
|
||||
// ok
|
||||
return
|
||||
}
|
||||
}
|
||||
t.Fatal("Connection should have been closed")
|
||||
}
|
||||
|
||||
func TestRandomPorts(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.HTTPPort = -1
|
||||
|
||||
@@ -25,10 +25,12 @@ import (
|
||||
)
|
||||
|
||||
// IMPORTANT: Tests in this file are not executed when running with the -race flag.
|
||||
// The test name should be prefixed with TestNoRace so we can run only
|
||||
// those tests: go test -run=TestNoRace ...
|
||||
|
||||
// As we look to improve high fanout situations make sure we
|
||||
// have a test that checks ordering for all subscriptions from a single subscriber.
|
||||
func TestHighFanoutOrdering(t *testing.T) {
|
||||
func TestNoRaceHighFanoutOrdering(t *testing.T) {
|
||||
opts := &server.Options{Host: "127.0.0.1", Port: server.RANDOM_PORT}
|
||||
|
||||
s := RunServer(opts)
|
||||
@@ -87,7 +89,7 @@ func TestHighFanoutOrdering(t *testing.T) {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestRouteFormTimeWithHighSubscriptions(t *testing.T) {
|
||||
func TestNoRaceRouteFormTimeWithHighSubscriptions(t *testing.T) {
|
||||
srvA, optsA := RunServerWithConfig("./configs/srv_a.conf")
|
||||
defer srvA.Shutdown()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user