mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Use atomic.Load to access fields used in /varz and /subsz requests.
* Includes a unit test that checks all endpoints for data races.
This commit is contained in:
@@ -13,6 +13,9 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/nats-io/gnatsd/server"
|
||||
"github.com/nats-io/go-nats"
|
||||
)
|
||||
@@ -30,6 +33,32 @@ func runMonitorServer() *server.Server {
|
||||
return RunServer(&opts)
|
||||
}
|
||||
|
||||
// Runs a clustered pair of monitor servers for testing the /routez endpoint
|
||||
func runMonitorServerClusteredPair() (*server.Server, *server.Server) {
|
||||
resetPreviousHTTPConnections()
|
||||
opts := DefaultTestOptions
|
||||
opts.Port = CLIENT_PORT
|
||||
opts.HTTPPort = MONITOR_PORT
|
||||
opts.HTTPHost = "localhost"
|
||||
opts.Cluster.Host = "127.0.0.1"
|
||||
opts.Cluster.Port = 10223
|
||||
opts.Routes = server.RoutesFromStr("nats-route://127.0.0.1:10222")
|
||||
|
||||
s1 := RunServer(&opts)
|
||||
|
||||
opts2 := DefaultTestOptions
|
||||
opts2.Port = CLIENT_PORT + 1
|
||||
opts2.HTTPPort = MONITOR_PORT + 1
|
||||
opts2.HTTPHost = "localhost"
|
||||
opts2.Cluster.Host = "127.0.0.1"
|
||||
opts2.Cluster.Port = 10222
|
||||
opts2.Routes = server.RoutesFromStr("nats-route://127.0.0.1:10223")
|
||||
|
||||
s2 := RunServer(&opts2)
|
||||
|
||||
return s1, s2
|
||||
}
|
||||
|
||||
func runMonitorServerNoHTTPPort() *server.Server {
|
||||
resetPreviousHTTPConnections()
|
||||
opts := DefaultTestOptions
|
||||
@@ -60,6 +89,71 @@ func TestNoMonitorPort(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// testEndpointDataRace tests a monitoring endpoint for data races by polling
|
||||
// while client code acts to ensure statistics are updated. It is designed to
|
||||
// run under the -race flag to catch violations. The caller must start the
|
||||
// NATS server.
|
||||
func testEndpointDataRace(endpoint string, t *testing.T) {
|
||||
var doneWg sync.WaitGroup
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
|
||||
// Poll as fast as we can, while creating connections, publishing,
|
||||
// and subscribing.
|
||||
clientDone := int64(0)
|
||||
doneWg.Add(1)
|
||||
go func() {
|
||||
for atomic.LoadInt64(&clientDone) == 0 {
|
||||
resp, err := http.Get(url + endpoint)
|
||||
if err != nil {
|
||||
t.Errorf("Expected no error: Got %v\n", err)
|
||||
} else {
|
||||
resp.Body.Close()
|
||||
}
|
||||
}
|
||||
doneWg.Done()
|
||||
}()
|
||||
|
||||
// create connections, subscriptions, and publish messages to
|
||||
// update the monitor variables.
|
||||
var conns []net.Conn
|
||||
for i := 0; i < 50; i++ {
|
||||
cl := createClientConnSubscribeAndPublish(t)
|
||||
// keep a few connections around to test monitor variables.
|
||||
if i%10 == 0 {
|
||||
conns = append(conns, cl)
|
||||
} else {
|
||||
cl.Close()
|
||||
}
|
||||
}
|
||||
atomic.AddInt64(&clientDone, 1)
|
||||
|
||||
// wait for the endpoint polling goroutine to exit
|
||||
doneWg.Wait()
|
||||
|
||||
// cleanup the conns
|
||||
for _, cl := range conns {
|
||||
cl.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func TestEndpointDataRaces(t *testing.T) {
|
||||
// setup a small cluster to test /routez
|
||||
s1, s2 := runMonitorServerClusteredPair()
|
||||
defer s1.Shutdown()
|
||||
defer s2.Shutdown()
|
||||
|
||||
// give some time for a route to form
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// test all of our endpoints
|
||||
testEndpointDataRace("varz", t)
|
||||
testEndpointDataRace("connz", t)
|
||||
testEndpointDataRace("routez", t)
|
||||
testEndpointDataRace("subsz", t)
|
||||
testEndpointDataRace("stacksz", t)
|
||||
}
|
||||
|
||||
func TestVarz(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
Reference in New Issue
Block a user