diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 958c3547..23fa5311 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -548,11 +548,12 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) { counts := make([]int32, numWorkers) var received int32 + rwg := &sync.WaitGroup{} + rwg.Add(numWorkers) + wg := &sync.WaitGroup{} wg.Add(numWorkers) - - dwg := &sync.WaitGroup{} - dwg.Add(numWorkers) + ch := make(chan bool) toSend := 1000 @@ -561,13 +562,12 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) { defer nc.Close() go func(index int32) { - counter := &counts[index] - // Signal we are ready - wg.Done() - defer dwg.Done() + rwg.Done() + defer wg.Done() + <-ch - for { - if _, err := nc.Request(reqMsgSubj, nil, 50*time.Millisecond); err != nil { + for counter := &counts[index]; ; { + if _, err := nc.Request(reqMsgSubj, nil, 100*time.Millisecond); err != nil { return } atomic.AddInt32(counter, 1) @@ -578,8 +578,9 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) { }(int32(i)) } - // Wait for requestors. - wg.Wait() + // Wait for requestors to be ready + rwg.Wait() + close(ch) sendSubj := "bar" for i := 0; i < toSend; i++ { @@ -588,7 +589,7 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) { } // Wait for test to complete. - dwg.Wait() + wg.Wait() target := toSend / numWorkers delta := target / 3 @@ -596,7 +597,7 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) { for i := 0; i < numWorkers; i++ { if msgs := atomic.LoadInt32(&counts[i]); msgs < low || msgs > high { - t.Fatalf("Messages received for worker too far off from target of %d, got %d", target, msgs) + t.Fatalf("Messages received for worker [%d] too far off from target of %d, got %d", i, target, msgs) } } }