diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 11e3426b..73a85059 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -18,8 +18,6 @@ import ( "os" "path/filepath" "strconv" - "sync" - "sync/atomic" "testing" "time" @@ -511,92 +509,6 @@ func TestJetStreamBasicWorkQueue(t *testing.T) { } } -func TestJetStreamWorkQueueLoadBalance(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - mname := "MY_MSG_SET" - mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: mname, Subjects: []string{"foo", "bar"}}) - if err != nil { - t.Fatalf("Unexpected error adding message set: %v", err) - } - defer s.JetStreamDeleteMsgSet(mset) - - // Create basic work queue mode observable. - oname := "WQ" - o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, DeliverAll: true}) - if err != nil { - t.Fatalf("Expected no error with registered interest, got %v", err) - } - defer o.Delete() - - // To send messages. - nc := clientConnectToServer(t, s) - defer nc.Close() - - // For normal work queue semantics, you send requests to the subject with message set and observable name. - reqMsgSubj := fmt.Sprintf("%s.%s.%s", server.JsReqPre, mname, oname) - - numWorkers := 25 - counts := make([]int32, numWorkers) - var received int32 - - rwg := &sync.WaitGroup{} - rwg.Add(numWorkers) - - wg := &sync.WaitGroup{} - wg.Add(numWorkers) - ch := make(chan bool) - - toSend := 1000 - - for i := 0; i < numWorkers; i++ { - nc := clientConnectToServer(t, s) - defer nc.Close() - - go func(index int32) { - rwg.Done() - defer wg.Done() - <-ch - - for counter := &counts[index]; ; { - m, err := nc.Request(reqMsgSubj, nil, 100*time.Millisecond) - if err != nil { - return - } - m.Respond(nil) - atomic.AddInt32(counter, 1) - if total := atomic.AddInt32(&received, 1); total >= int32(toSend) { - return - } - } - }(int32(i)) - } - - // Wait for requestors to be ready - rwg.Wait() - close(ch) - - sendSubj := "bar" - for i := 0; i < toSend; i++ { - resp, _ := nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond) - expectOKResponse(t, resp) - } - - // Wait for test to complete. - wg.Wait() - - target := toSend / numWorkers - delta := target / 3 - low, high := int32(target-delta), int32(target+delta) - - for i := 0; i < numWorkers; i++ { - if msgs := atomic.LoadInt32(&counts[i]); msgs < low || msgs > high { - t.Fatalf("Messages received for worker [%d] too far off from target of %d, got %d", i, target, msgs) - } - } -} - func TestJetStreamPartitioning(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() diff --git a/test/norace_test.go b/test/norace_test.go index bd1453fb..3080a2df 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -27,6 +27,7 @@ import ( "runtime" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -534,3 +535,89 @@ func TestNoRaceClusterLeaksSubscriptions(t *testing.T) { return nil }) } + +func TestJetStreamWorkQueueLoadBalance(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mname := "MY_MSG_SET" + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: mname, Subjects: []string{"foo", "bar"}}) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + defer s.JetStreamDeleteMsgSet(mset) + + // Create basic work queue mode observable. + oname := "WQ" + o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, DeliverAll: true}) + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + + // To send messages. + nc := clientConnectToServer(t, s) + defer nc.Close() + + // For normal work queue semantics, you send requests to the subject with message set and observable name. + reqMsgSubj := fmt.Sprintf("%s.%s.%s", server.JsReqPre, mname, oname) + + numWorkers := 25 + counts := make([]int32, numWorkers) + var received int32 + + rwg := &sync.WaitGroup{} + rwg.Add(numWorkers) + + wg := &sync.WaitGroup{} + wg.Add(numWorkers) + ch := make(chan bool) + + toSend := 1000 + + for i := 0; i < numWorkers; i++ { + nc := clientConnectToServer(t, s) + defer nc.Close() + + go func(index int32) { + rwg.Done() + defer wg.Done() + <-ch + + for counter := &counts[index]; ; { + m, err := nc.Request(reqMsgSubj, nil, 100*time.Millisecond) + if err != nil { + return + } + m.Respond(nil) + atomic.AddInt32(counter, 1) + if total := atomic.AddInt32(&received, 1); total >= int32(toSend) { + return + } + } + }(int32(i)) + } + + // Wait for requestors to be ready + rwg.Wait() + close(ch) + + sendSubj := "bar" + for i := 0; i < toSend; i++ { + resp, _ := nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond) + expectOKResponse(t, resp) + } + + // Wait for test to complete. + wg.Wait() + + target := toSend / numWorkers + delta := target / 3 + low, high := int32(target-delta), int32(target+delta) + + for i := 0; i < numWorkers; i++ { + if msgs := atomic.LoadInt32(&counts[i]); msgs < low || msgs > high { + t.Fatalf("Messages received for worker [%d] too far off from target of %d, got %d", i, target, msgs) + } + } +}