Move load balance test to norace

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-10-09 20:20:42 -07:00
parent 4698292d4f
commit 16e6952cd6
2 changed files with 87 additions and 88 deletions

View File

@@ -18,8 +18,6 @@ import (
"os"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
@@ -511,92 +509,6 @@ func TestJetStreamBasicWorkQueue(t *testing.T) {
}
}
func TestJetStreamWorkQueueLoadBalance(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mname := "MY_MSG_SET"
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: mname, Subjects: []string{"foo", "bar"}})
if err != nil {
t.Fatalf("Unexpected error adding message set: %v", err)
}
defer s.JetStreamDeleteMsgSet(mset)
// Create basic work queue mode observable.
oname := "WQ"
o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, DeliverAll: true})
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
defer o.Delete()
// To send messages.
nc := clientConnectToServer(t, s)
defer nc.Close()
// For normal work queue semantics, you send requests to the subject with message set and observable name.
reqMsgSubj := fmt.Sprintf("%s.%s.%s", server.JsReqPre, mname, oname)
numWorkers := 25
counts := make([]int32, numWorkers)
var received int32
rwg := &sync.WaitGroup{}
rwg.Add(numWorkers)
wg := &sync.WaitGroup{}
wg.Add(numWorkers)
ch := make(chan bool)
toSend := 1000
for i := 0; i < numWorkers; i++ {
nc := clientConnectToServer(t, s)
defer nc.Close()
go func(index int32) {
rwg.Done()
defer wg.Done()
<-ch
for counter := &counts[index]; ; {
m, err := nc.Request(reqMsgSubj, nil, 100*time.Millisecond)
if err != nil {
return
}
m.Respond(nil)
atomic.AddInt32(counter, 1)
if total := atomic.AddInt32(&received, 1); total >= int32(toSend) {
return
}
}
}(int32(i))
}
// Wait for requestors to be ready
rwg.Wait()
close(ch)
sendSubj := "bar"
for i := 0; i < toSend; i++ {
resp, _ := nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond)
expectOKResponse(t, resp)
}
// Wait for test to complete.
wg.Wait()
target := toSend / numWorkers
delta := target / 3
low, high := int32(target-delta), int32(target+delta)
for i := 0; i < numWorkers; i++ {
if msgs := atomic.LoadInt32(&counts[i]); msgs < low || msgs > high {
t.Fatalf("Messages received for worker [%d] too far off from target of %d, got %d", i, target, msgs)
}
}
}
func TestJetStreamPartitioning(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

View File

@@ -27,6 +27,7 @@ import (
"runtime"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
@@ -534,3 +535,89 @@ func TestNoRaceClusterLeaksSubscriptions(t *testing.T) {
return nil
})
}
func TestJetStreamWorkQueueLoadBalance(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mname := "MY_MSG_SET"
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: mname, Subjects: []string{"foo", "bar"}})
if err != nil {
t.Fatalf("Unexpected error adding message set: %v", err)
}
defer s.JetStreamDeleteMsgSet(mset)
// Create basic work queue mode observable.
oname := "WQ"
o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, DeliverAll: true})
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
defer o.Delete()
// To send messages.
nc := clientConnectToServer(t, s)
defer nc.Close()
// For normal work queue semantics, you send requests to the subject with message set and observable name.
reqMsgSubj := fmt.Sprintf("%s.%s.%s", server.JsReqPre, mname, oname)
numWorkers := 25
counts := make([]int32, numWorkers)
var received int32
rwg := &sync.WaitGroup{}
rwg.Add(numWorkers)
wg := &sync.WaitGroup{}
wg.Add(numWorkers)
ch := make(chan bool)
toSend := 1000
for i := 0; i < numWorkers; i++ {
nc := clientConnectToServer(t, s)
defer nc.Close()
go func(index int32) {
rwg.Done()
defer wg.Done()
<-ch
for counter := &counts[index]; ; {
m, err := nc.Request(reqMsgSubj, nil, 100*time.Millisecond)
if err != nil {
return
}
m.Respond(nil)
atomic.AddInt32(counter, 1)
if total := atomic.AddInt32(&received, 1); total >= int32(toSend) {
return
}
}
}(int32(i))
}
// Wait for requestors to be ready
rwg.Wait()
close(ch)
sendSubj := "bar"
for i := 0; i < toSend; i++ {
resp, _ := nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond)
expectOKResponse(t, resp)
}
// Wait for test to complete.
wg.Wait()
target := toSend / numWorkers
delta := target / 3
low, high := int32(target-delta), int32(target+delta)
for i := 0; i < numWorkers; i++ {
if msgs := atomic.LoadInt32(&counts[i]); msgs < low || msgs > high {
t.Fatalf("Messages received for worker [%d] too far off from target of %d, got %d", i, target, msgs)
}
}
}