Reworked load balance test

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-10-09 07:20:41 -07:00
parent cd3c1c7a3f
commit ea85d0130f

View File

@@ -548,11 +548,12 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) {
counts := make([]int32, numWorkers)
var received int32
rwg := &sync.WaitGroup{}
rwg.Add(numWorkers)
wg := &sync.WaitGroup{}
wg.Add(numWorkers)
dwg := &sync.WaitGroup{}
dwg.Add(numWorkers)
ch := make(chan bool)
toSend := 1000
@@ -561,13 +562,12 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) {
defer nc.Close()
go func(index int32) {
counter := &counts[index]
// Signal we are ready
wg.Done()
defer dwg.Done()
rwg.Done()
defer wg.Done()
<-ch
for {
if _, err := nc.Request(reqMsgSubj, nil, 50*time.Millisecond); err != nil {
for counter := &counts[index]; ; {
if _, err := nc.Request(reqMsgSubj, nil, 100*time.Millisecond); err != nil {
return
}
atomic.AddInt32(counter, 1)
@@ -578,8 +578,9 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) {
}(int32(i))
}
// Wait for requestors.
wg.Wait()
// Wait for requestors to be ready
rwg.Wait()
close(ch)
sendSubj := "bar"
for i := 0; i < toSend; i++ {
@@ -588,7 +589,7 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) {
}
// Wait for test to complete.
dwg.Wait()
wg.Wait()
target := toSend / numWorkers
delta := target / 3
@@ -596,7 +597,7 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) {
for i := 0; i < numWorkers; i++ {
if msgs := atomic.LoadInt32(&counts[i]); msgs < low || msgs > high {
t.Fatalf("Messages received for worker too far off from target of %d, got %d", target, msgs)
t.Fatalf("Messages received for worker [%d] too far off from target of %d, got %d", i, target, msgs)
}
}
}