Fix stop of consumer's delivery loop

I noticed that some consumer go routines were left running at the end
of the test suite.
It turns out that there was a race in the way the consumer's qch was closed.
Since it was closed and then set to nil, it is possible that the go
routines that are started and then try to capture o.qch would actually
get qch==nil, which, when doing a select on that nil channel, would
block forever.

So we now pass the qch to the 2 go routines loopAndGatherMsgs() and
loopAndDeliverMsgs() so that when we close the channel there is
no risk of that race happening.

I do believe that there is still something that should be looked at:
it seems that a consumer's delivery loop can now be started/stopped
many times based on leadership acquired/lost. If that is the case,
I think that the consumer should wait for previous go routine to
complete before trying to start new ones.

Also moved 3 JetStream tests to the test/norace_test.go file because
they would consume several GB of memory when running with the -race flag.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2021-01-19 16:49:33 -07:00
parent 51a321e739
commit c4a284b58f
3 changed files with 171 additions and 173 deletions

View File

@@ -653,12 +653,13 @@ func (o *Consumer) setLeader(isLeader bool) {
o.sendq = make(chan *jsPubMsg, msetSendQSize)
// Recreate quit channel.
o.qch = make(chan struct{})
qch := o.qch
o.mu.Unlock()
// Now start up Go routine to deliver msgs.
go o.loopAndGatherMsgs()
go o.loopAndGatherMsgs(qch)
// Startup our deliver loop.
go o.loopAndDeliverMsgs()
go o.loopAndDeliverMsgs(qch)
} else {
// Shutdown the go routines and the subscriptions.
@@ -1114,9 +1115,9 @@ func (o *Consumer) writeState() {
}
// loopAndDeliverMsgs() will loop and deliver messages and watch for interest changes.
func (o *Consumer) loopAndDeliverMsgs() {
func (o *Consumer) loopAndDeliverMsgs(qch chan struct{}) {
o.mu.Lock()
qch, inch, sendq := o.qch, o.inch, o.sendq
inch, sendq := o.inch, o.sendq
s, acc := o.acc.srv, o.acc
o.mu.Unlock()
@@ -1695,7 +1696,7 @@ func (o *Consumer) checkWaitingForInterest() bool {
return o.waiting.len() > 0
}
func (o *Consumer) loopAndGatherMsgs() {
func (o *Consumer) loopAndGatherMsgs(qch chan struct{}) {
// On startup check to see if we are in a a reply situtation where replay policy is not instant.
var (
lts int64 // last time stamp seen, used for replay.
@@ -1763,7 +1764,6 @@ func (o *Consumer) loopAndGatherMsgs() {
// If we are in a replay scenario and have not caught up check if we need to delay here.
if o.replay && lts > 0 {
if delay = time.Duration(ts - lts); delay > time.Millisecond {
qch := o.qch
o.mu.Unlock()
select {
case <-qch:
@@ -1783,7 +1783,6 @@ func (o *Consumer) loopAndGatherMsgs() {
r := o.rlimit.ReserveN(now, len(msg)+len(hdr)+len(subj)+len(dsubj)+len(o.ackReplyT))
delay := r.DelayFrom(now)
if delay > 0 {
qch := o.qch
o.mu.Unlock()
select {
case <-qch:
@@ -1807,7 +1806,6 @@ func (o *Consumer) loopAndGatherMsgs() {
// We will wait here for new messages to arrive.
mch := o.mch
qch := o.qch
o.mu.Unlock()
select {

View File

@@ -3543,34 +3543,6 @@ func TestJetStreamPullConsumerRemoveInterest(t *testing.T) {
}
}
func TestJetStreamDeleteStreamManyConsumers(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mname := "MYS"
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.FileStorage})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
// This number needs to be higher than the internal sendq size to trigger what this test is testing.
for i := 0; i < 2000; i++ {
_, err := mset.AddConsumer(&server.ConsumerConfig{
Durable: fmt.Sprintf("D-%d", i),
DeliverSubject: fmt.Sprintf("deliver.%d", i),
})
if err != nil {
t.Fatalf("Error creating consumer: %v", err)
}
}
// With bug this would not return and would hang.
mset.Delete()
}
func TestJetStreamConsumerRateLimit(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
@@ -7575,143 +7547,6 @@ func TestJetStreamFilteredStreamNames(t *testing.T) {
expectStreams("*.22", []string{"S4", "S5"})
}
func TestJetStreamAPIStreamListPaging(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
// Forced cleanup of all persisted state.
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
// Create 2X limit
streamsNum := 2 * server.JSApiNamesLimit
for i := 1; i <= streamsNum; i++ {
name := fmt.Sprintf("STREAM-%06d", i)
cfg := server.StreamConfig{Name: name, Storage: server.MemoryStorage}
_, err := s.GlobalAccount().AddStream(&cfg)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
}
// Client for API requests.
nc := clientConnectToServer(t, s)
defer nc.Close()
reqList := func(offset int) []byte {
t.Helper()
var req []byte
if offset > 0 {
req, _ = json.Marshal(&server.ApiPagedRequest{Offset: offset})
}
resp, err := nc.Request(server.JSApiStreams, req, time.Second)
if err != nil {
t.Fatalf("Unexpected error getting stream list: %v", err)
}
return resp.Data
}
checkResp := func(resp []byte, expectedLen, expectedOffset int) {
t.Helper()
var listResponse server.JSApiStreamNamesResponse
if err := json.Unmarshal(resp, &listResponse); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(listResponse.Streams) != expectedLen {
t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Streams))
}
if listResponse.Total != streamsNum {
t.Fatalf("Expected total to be %d but got %d", streamsNum, listResponse.Total)
}
if listResponse.Offset != expectedOffset {
t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset)
}
if expectedLen < 1 {
return
}
// Make sure we get the right stream.
sname := fmt.Sprintf("STREAM-%06d", expectedOffset+1)
if listResponse.Streams[0] != sname {
t.Fatalf("Expected stream %q to be first, got %q", sname, listResponse.Streams[0])
}
}
checkResp(reqList(0), server.JSApiNamesLimit, 0)
checkResp(reqList(server.JSApiNamesLimit), server.JSApiNamesLimit, server.JSApiNamesLimit)
checkResp(reqList(streamsNum), 0, streamsNum)
checkResp(reqList(streamsNum-22), 22, streamsNum-22)
checkResp(reqList(streamsNum+22), 0, streamsNum)
}
func TestJetStreamAPIConsumerListPaging(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
// Forced cleanup of all persisted state.
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
sname := "MYSTREAM"
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: sname})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
// Client for API requests.
nc := clientConnectToServer(t, s)
defer nc.Close()
consumersNum := server.JSApiNamesLimit
for i := 1; i <= consumersNum; i++ {
dsubj := fmt.Sprintf("d.%d", i)
sub, _ := nc.SubscribeSync(dsubj)
defer sub.Unsubscribe()
nc.Flush()
_, err := mset.AddConsumer(&server.ConsumerConfig{DeliverSubject: dsubj})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
reqListSubject := fmt.Sprintf(server.JSApiConsumersT, sname)
reqList := func(offset int) []byte {
t.Helper()
var req []byte
if offset > 0 {
req, _ = json.Marshal(&server.JSApiConsumersRequest{ApiPagedRequest: server.ApiPagedRequest{Offset: offset}})
}
resp, err := nc.Request(reqListSubject, req, time.Second)
if err != nil {
t.Fatalf("Unexpected error getting stream list: %v", err)
}
return resp.Data
}
checkResp := func(resp []byte, expectedLen, expectedOffset int) {
t.Helper()
var listResponse server.JSApiConsumerNamesResponse
if err := json.Unmarshal(resp, &listResponse); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(listResponse.Consumers) != expectedLen {
t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Consumers))
}
if listResponse.Total != consumersNum {
t.Fatalf("Expected total to be %d but got %d", consumersNum, listResponse.Total)
}
if listResponse.Offset != expectedOffset {
t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset)
}
}
checkResp(reqList(0), server.JSApiNamesLimit, 0)
checkResp(reqList(consumersNum-22), 22, consumersNum-22)
checkResp(reqList(consumersNum+22), 0, consumersNum)
}
func TestJetStreamUpdateStream(t *testing.T) {
cases := []struct {
name string

View File

@@ -785,3 +785,168 @@ func TestNoRaceSlowProxy(t *testing.T) {
t.Fatalf("bps is off, target is %v, actual is %v", bwTarget, bps)
}
}
// TestNoRaceJetStreamDeleteStreamManyConsumers verifies that deleting a
// stream that has more consumers than the internal sendq size completes
// instead of hanging.
func TestNoRaceJetStreamDeleteStreamManyConsumers(t *testing.T) {
	s := RunBasicJetStreamServer()
	defer s.Shutdown()

	if config := s.JetStreamConfig(); config != nil {
		defer os.RemoveAll(config.StoreDir)
	}

	const streamName = "MYS"
	mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: streamName, Storage: server.FileStorage})
	if err != nil {
		t.Fatalf("Unexpected error adding stream: %v", err)
	}

	// This number needs to be higher than the internal sendq size to trigger what this test is testing.
	const numConsumers = 2000
	for i := 0; i < numConsumers; i++ {
		cfg := &server.ConsumerConfig{
			Durable:        fmt.Sprintf("D-%d", i),
			DeliverSubject: fmt.Sprintf("deliver.%d", i),
		}
		if _, err := mset.AddConsumer(cfg); err != nil {
			t.Fatalf("Error creating consumer: %v", err)
		}
	}

	// With bug this would not return and would hang.
	mset.Delete()
}
// TestNoRaceJetStreamAPIStreamListPaging exercises offset-based paging of the
// stream-names API with twice the server's per-page limit of streams.
func TestNoRaceJetStreamAPIStreamListPaging(t *testing.T) {
	s := RunBasicJetStreamServer()
	defer s.Shutdown()

	// Forced cleanup of all persisted state.
	if config := s.JetStreamConfig(); config != nil {
		defer os.RemoveAll(config.StoreDir)
	}

	// Create 2X limit
	streamsNum := 2 * server.JSApiNamesLimit
	for i := 1; i <= streamsNum; i++ {
		cfg := server.StreamConfig{
			Name:    fmt.Sprintf("STREAM-%06d", i),
			Storage: server.MemoryStorage,
		}
		if _, err := s.GlobalAccount().AddStream(&cfg); err != nil {
			t.Fatalf("Unexpected error adding stream: %v", err)
		}
	}

	// Client for API requests.
	nc := clientConnectToServer(t, s)
	defer nc.Close()

	// reqList fetches one page of stream names starting at the given offset.
	reqList := func(offset int) []byte {
		t.Helper()
		var payload []byte
		if offset > 0 {
			payload, _ = json.Marshal(&server.ApiPagedRequest{Offset: offset})
		}
		resp, err := nc.Request(server.JSApiStreams, payload, time.Second)
		if err != nil {
			t.Fatalf("Unexpected error getting stream list: %v", err)
		}
		return resp.Data
	}

	// checkResp validates page length, total, offset and the first name on the page.
	checkResp := func(resp []byte, expectedLen, expectedOffset int) {
		t.Helper()
		var listResponse server.JSApiStreamNamesResponse
		if err := json.Unmarshal(resp, &listResponse); err != nil {
			t.Fatalf("Unexpected error: %v", err)
		}
		switch {
		case len(listResponse.Streams) != expectedLen:
			t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Streams))
		case listResponse.Total != streamsNum:
			t.Fatalf("Expected total to be %d but got %d", streamsNum, listResponse.Total)
		case listResponse.Offset != expectedOffset:
			t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset)
		}
		if expectedLen < 1 {
			return
		}
		// Make sure we get the right stream.
		want := fmt.Sprintf("STREAM-%06d", expectedOffset+1)
		if got := listResponse.Streams[0]; got != want {
			t.Fatalf("Expected stream %q to be first, got %q", want, got)
		}
	}

	checkResp(reqList(0), server.JSApiNamesLimit, 0)
	checkResp(reqList(server.JSApiNamesLimit), server.JSApiNamesLimit, server.JSApiNamesLimit)
	checkResp(reqList(streamsNum), 0, streamsNum)
	checkResp(reqList(streamsNum-22), 22, streamsNum-22)
	checkResp(reqList(streamsNum+22), 0, streamsNum)
}
// TestNoRaceJetStreamAPIConsumerListPaging exercises offset-based paging of
// the consumer-names API for a stream with JSApiNamesLimit consumers.
func TestNoRaceJetStreamAPIConsumerListPaging(t *testing.T) {
	s := RunBasicJetStreamServer()
	defer s.Shutdown()

	// Forced cleanup of all persisted state.
	if config := s.JetStreamConfig(); config != nil {
		defer os.RemoveAll(config.StoreDir)
	}

	sname := "MYSTREAM"
	mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: sname})
	if err != nil {
		t.Fatalf("Unexpected error adding stream: %v", err)
	}

	// Client for API requests.
	nc := clientConnectToServer(t, s)
	defer nc.Close()

	consumersNum := server.JSApiNamesLimit
	for i := 1; i <= consumersNum; i++ {
		dsubj := fmt.Sprintf("d.%d", i)
		// Subscriptions stay open (and are cleaned up) until the test ends.
		sub, _ := nc.SubscribeSync(dsubj)
		defer sub.Unsubscribe()
		nc.Flush()

		_, err := mset.AddConsumer(&server.ConsumerConfig{DeliverSubject: dsubj})
		if err != nil {
			t.Fatalf("Unexpected error: %v", err)
		}
	}

	reqListSubject := fmt.Sprintf(server.JSApiConsumersT, sname)
	// reqList fetches one page of consumer names starting at the given offset.
	reqList := func(offset int) []byte {
		t.Helper()
		var req []byte
		if offset > 0 {
			req, _ = json.Marshal(&server.JSApiConsumersRequest{ApiPagedRequest: server.ApiPagedRequest{Offset: offset}})
		}
		resp, err := nc.Request(reqListSubject, req, time.Second)
		if err != nil {
			// Fixed copy-paste message: this requests the consumer list, not the stream list.
			t.Fatalf("Unexpected error getting consumer list: %v", err)
		}
		return resp.Data
	}

	// checkResp validates the page length, reported total and offset.
	checkResp := func(resp []byte, expectedLen, expectedOffset int) {
		t.Helper()
		var listResponse server.JSApiConsumerNamesResponse
		if err := json.Unmarshal(resp, &listResponse); err != nil {
			t.Fatalf("Unexpected error: %v", err)
		}
		if len(listResponse.Consumers) != expectedLen {
			// Fixed copy-paste message: this counts consumers, not streams.
			t.Fatalf("Expected only %d consumers but got %d", expectedLen, len(listResponse.Consumers))
		}
		if listResponse.Total != consumersNum {
			t.Fatalf("Expected total to be %d but got %d", consumersNum, listResponse.Total)
		}
		if listResponse.Offset != expectedOffset {
			t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset)
		}
	}

	checkResp(reqList(0), server.JSApiNamesLimit, 0)
	checkResp(reqList(consumersNum-22), 22, consumersNum-22)
	checkResp(reqList(consumersNum+22), 0, consumersNum)
}