From c4a284b58fa91eb20f4f1049373b9e99898f9a63 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 19 Jan 2021 16:49:33 -0700 Subject: [PATCH 1/2] Fix stop of consumer's delivery loop I noticed that some consumer go routines were left running at the end of the test suite. It turns out that there was a race in the way the consumer's qch was closed. Since it was closed and then set to nil, it is possible that the go routines that are started and then try to capture o.qch would actually get qch==nil, which then when doing a select on that nil channel would block forever. So we now pass the qch to the 2 go routines loopAndGatherMsgs() and loopAndDeliverMsgs() so that when we close the channel there is no risk of that race happening. I do believe that there is still something that should be looked at: it seems that a consumer's delivery loop can now be started/stopped many times based on leadership acquired/lost. If that is the case, I think that the consumer should wait for previous go routine to complete before trying to start new ones. Also moved 3 JetStream tests to the test/norace_test.go file because they would consume several GB of memory when running with the -race flag. Signed-off-by: Ivan Kozlovic --- server/consumer.go | 14 ++-- test/jetstream_test.go | 165 ----------------------------------------- test/norace_test.go | 165 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 171 insertions(+), 173 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 6709e606..503e82dd 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -653,12 +653,13 @@ func (o *Consumer) setLeader(isLeader bool) { o.sendq = make(chan *jsPubMsg, msetSendQSize) // Recreate quit channel. o.qch = make(chan struct{}) + qch := o.qch o.mu.Unlock() // Now start up Go routine to deliver msgs. - go o.loopAndGatherMsgs() + go o.loopAndGatherMsgs(qch) // Startup our deliver loop. 
- go o.loopAndDeliverMsgs() + go o.loopAndDeliverMsgs(qch) } else { // Shutdown the go routines and the subscriptions. @@ -1114,9 +1115,9 @@ func (o *Consumer) writeState() { } // loopAndDeliverMsgs() will loop and deliver messages and watch for interest changes. -func (o *Consumer) loopAndDeliverMsgs() { +func (o *Consumer) loopAndDeliverMsgs(qch chan struct{}) { o.mu.Lock() - qch, inch, sendq := o.qch, o.inch, o.sendq + inch, sendq := o.inch, o.sendq s, acc := o.acc.srv, o.acc o.mu.Unlock() @@ -1695,7 +1696,7 @@ func (o *Consumer) checkWaitingForInterest() bool { return o.waiting.len() > 0 } -func (o *Consumer) loopAndGatherMsgs() { +func (o *Consumer) loopAndGatherMsgs(qch chan struct{}) { // On startup check to see if we are in a a reply situtation where replay policy is not instant. var ( lts int64 // last time stamp seen, used for replay. @@ -1763,7 +1764,6 @@ func (o *Consumer) loopAndGatherMsgs() { // If we are in a replay scenario and have not caught up check if we need to delay here. if o.replay && lts > 0 { if delay = time.Duration(ts - lts); delay > time.Millisecond { - qch := o.qch o.mu.Unlock() select { case <-qch: @@ -1783,7 +1783,6 @@ func (o *Consumer) loopAndGatherMsgs() { r := o.rlimit.ReserveN(now, len(msg)+len(hdr)+len(subj)+len(dsubj)+len(o.ackReplyT)) delay := r.DelayFrom(now) if delay > 0 { - qch := o.qch o.mu.Unlock() select { case <-qch: @@ -1807,7 +1806,6 @@ func (o *Consumer) loopAndGatherMsgs() { // We will wait here for new messages to arrive. 
mch := o.mch - qch := o.qch o.mu.Unlock() select { diff --git a/test/jetstream_test.go b/test/jetstream_test.go index f4e2ef38..469fd5b4 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -3543,34 +3543,6 @@ func TestJetStreamPullConsumerRemoveInterest(t *testing.T) { } } -func TestJetStreamDeleteStreamManyConsumers(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) - } - - mname := "MYS" - mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.FileStorage}) - if err != nil { - t.Fatalf("Unexpected error adding stream: %v", err) - } - - // This number needs to be higher than the internal sendq size to trigger what this test is testing. - for i := 0; i < 2000; i++ { - _, err := mset.AddConsumer(&server.ConsumerConfig{ - Durable: fmt.Sprintf("D-%d", i), - DeliverSubject: fmt.Sprintf("deliver.%d", i), - }) - if err != nil { - t.Fatalf("Error creating consumer: %v", err) - } - } - // With bug this would not return and would hang. - mset.Delete() -} - func TestJetStreamConsumerRateLimit(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -7575,143 +7547,6 @@ func TestJetStreamFilteredStreamNames(t *testing.T) { expectStreams("*.22", []string{"S4", "S5"}) } -func TestJetStreamAPIStreamListPaging(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. - if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) - } - - // Create 2X limit - streamsNum := 2 * server.JSApiNamesLimit - for i := 1; i <= streamsNum; i++ { - name := fmt.Sprintf("STREAM-%06d", i) - cfg := server.StreamConfig{Name: name, Storage: server.MemoryStorage} - _, err := s.GlobalAccount().AddStream(&cfg) - if err != nil { - t.Fatalf("Unexpected error adding stream: %v", err) - } - } - - // Client for API requests. 
- nc := clientConnectToServer(t, s) - defer nc.Close() - - reqList := func(offset int) []byte { - t.Helper() - var req []byte - if offset > 0 { - req, _ = json.Marshal(&server.ApiPagedRequest{Offset: offset}) - } - resp, err := nc.Request(server.JSApiStreams, req, time.Second) - if err != nil { - t.Fatalf("Unexpected error getting stream list: %v", err) - } - return resp.Data - } - - checkResp := func(resp []byte, expectedLen, expectedOffset int) { - t.Helper() - var listResponse server.JSApiStreamNamesResponse - if err := json.Unmarshal(resp, &listResponse); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if len(listResponse.Streams) != expectedLen { - t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Streams)) - } - if listResponse.Total != streamsNum { - t.Fatalf("Expected total to be %d but got %d", streamsNum, listResponse.Total) - } - if listResponse.Offset != expectedOffset { - t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset) - } - if expectedLen < 1 { - return - } - // Make sure we get the right stream. - sname := fmt.Sprintf("STREAM-%06d", expectedOffset+1) - if listResponse.Streams[0] != sname { - t.Fatalf("Expected stream %q to be first, got %q", sname, listResponse.Streams[0]) - } - } - - checkResp(reqList(0), server.JSApiNamesLimit, 0) - checkResp(reqList(server.JSApiNamesLimit), server.JSApiNamesLimit, server.JSApiNamesLimit) - checkResp(reqList(streamsNum), 0, streamsNum) - checkResp(reqList(streamsNum-22), 22, streamsNum-22) - checkResp(reqList(streamsNum+22), 0, streamsNum) -} - -func TestJetStreamAPIConsumerListPaging(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. 
- if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) - } - - sname := "MYSTREAM" - mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: sname}) - if err != nil { - t.Fatalf("Unexpected error adding stream: %v", err) - } - - // Client for API requests. - nc := clientConnectToServer(t, s) - defer nc.Close() - - consumersNum := server.JSApiNamesLimit - for i := 1; i <= consumersNum; i++ { - dsubj := fmt.Sprintf("d.%d", i) - sub, _ := nc.SubscribeSync(dsubj) - defer sub.Unsubscribe() - nc.Flush() - - _, err := mset.AddConsumer(&server.ConsumerConfig{DeliverSubject: dsubj}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - } - - reqListSubject := fmt.Sprintf(server.JSApiConsumersT, sname) - reqList := func(offset int) []byte { - t.Helper() - var req []byte - if offset > 0 { - req, _ = json.Marshal(&server.JSApiConsumersRequest{ApiPagedRequest: server.ApiPagedRequest{Offset: offset}}) - } - resp, err := nc.Request(reqListSubject, req, time.Second) - if err != nil { - t.Fatalf("Unexpected error getting stream list: %v", err) - } - return resp.Data - } - - checkResp := func(resp []byte, expectedLen, expectedOffset int) { - t.Helper() - var listResponse server.JSApiConsumerNamesResponse - if err := json.Unmarshal(resp, &listResponse); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if len(listResponse.Consumers) != expectedLen { - t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Consumers)) - } - if listResponse.Total != consumersNum { - t.Fatalf("Expected total to be %d but got %d", consumersNum, listResponse.Total) - } - if listResponse.Offset != expectedOffset { - t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset) - } - } - - checkResp(reqList(0), server.JSApiNamesLimit, 0) - checkResp(reqList(consumersNum-22), 22, consumersNum-22) - checkResp(reqList(consumersNum+22), 0, consumersNum) -} - func TestJetStreamUpdateStream(t 
*testing.T) { cases := []struct { name string diff --git a/test/norace_test.go b/test/norace_test.go index 32b734b4..6688182a 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -785,3 +785,168 @@ func TestNoRaceSlowProxy(t *testing.T) { t.Fatalf("bps is off, target is %v, actual is %v", bwTarget, bps) } } + +func TestNoRaceJetStreamDeleteStreamManyConsumers(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + mname := "MYS" + mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.FileStorage}) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + + // This number needs to be higher than the internal sendq size to trigger what this test is testing. + for i := 0; i < 2000; i++ { + _, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: fmt.Sprintf("D-%d", i), + DeliverSubject: fmt.Sprintf("deliver.%d", i), + }) + if err != nil { + t.Fatalf("Error creating consumer: %v", err) + } + } + // With bug this would not return and would hang. + mset.Delete() +} + +func TestNoRaceJetStreamAPIStreamListPaging(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + // Forced cleanup of all persisted state. + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + // Create 2X limit + streamsNum := 2 * server.JSApiNamesLimit + for i := 1; i <= streamsNum; i++ { + name := fmt.Sprintf("STREAM-%06d", i) + cfg := server.StreamConfig{Name: name, Storage: server.MemoryStorage} + _, err := s.GlobalAccount().AddStream(&cfg) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + } + + // Client for API requests. 
+ nc := clientConnectToServer(t, s) + defer nc.Close() + + reqList := func(offset int) []byte { + t.Helper() + var req []byte + if offset > 0 { + req, _ = json.Marshal(&server.ApiPagedRequest{Offset: offset}) + } + resp, err := nc.Request(server.JSApiStreams, req, time.Second) + if err != nil { + t.Fatalf("Unexpected error getting stream list: %v", err) + } + return resp.Data + } + + checkResp := func(resp []byte, expectedLen, expectedOffset int) { + t.Helper() + var listResponse server.JSApiStreamNamesResponse + if err := json.Unmarshal(resp, &listResponse); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if len(listResponse.Streams) != expectedLen { + t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Streams)) + } + if listResponse.Total != streamsNum { + t.Fatalf("Expected total to be %d but got %d", streamsNum, listResponse.Total) + } + if listResponse.Offset != expectedOffset { + t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset) + } + if expectedLen < 1 { + return + } + // Make sure we get the right stream. + sname := fmt.Sprintf("STREAM-%06d", expectedOffset+1) + if listResponse.Streams[0] != sname { + t.Fatalf("Expected stream %q to be first, got %q", sname, listResponse.Streams[0]) + } + } + + checkResp(reqList(0), server.JSApiNamesLimit, 0) + checkResp(reqList(server.JSApiNamesLimit), server.JSApiNamesLimit, server.JSApiNamesLimit) + checkResp(reqList(streamsNum), 0, streamsNum) + checkResp(reqList(streamsNum-22), 22, streamsNum-22) + checkResp(reqList(streamsNum+22), 0, streamsNum) +} + +func TestNoRaceJetStreamAPIConsumerListPaging(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + // Forced cleanup of all persisted state. 
+ if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + sname := "MYSTREAM" + mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: sname}) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + + // Client for API requests. + nc := clientConnectToServer(t, s) + defer nc.Close() + + consumersNum := server.JSApiNamesLimit + for i := 1; i <= consumersNum; i++ { + dsubj := fmt.Sprintf("d.%d", i) + sub, _ := nc.SubscribeSync(dsubj) + defer sub.Unsubscribe() + nc.Flush() + + _, err := mset.AddConsumer(&server.ConsumerConfig{DeliverSubject: dsubj}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + reqListSubject := fmt.Sprintf(server.JSApiConsumersT, sname) + reqList := func(offset int) []byte { + t.Helper() + var req []byte + if offset > 0 { + req, _ = json.Marshal(&server.JSApiConsumersRequest{ApiPagedRequest: server.ApiPagedRequest{Offset: offset}}) + } + resp, err := nc.Request(reqListSubject, req, time.Second) + if err != nil { + t.Fatalf("Unexpected error getting stream list: %v", err) + } + return resp.Data + } + + checkResp := func(resp []byte, expectedLen, expectedOffset int) { + t.Helper() + var listResponse server.JSApiConsumerNamesResponse + if err := json.Unmarshal(resp, &listResponse); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if len(listResponse.Consumers) != expectedLen { + t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Consumers)) + } + if listResponse.Total != consumersNum { + t.Fatalf("Expected total to be %d but got %d", consumersNum, listResponse.Total) + } + if listResponse.Offset != expectedOffset { + t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset) + } + } + + checkResp(reqList(0), server.JSApiNamesLimit, 0) + checkResp(reqList(consumersNum-22), 22, consumersNum-22) + checkResp(reqList(consumersNum+22), 0, consumersNum) +} From a1f011747442667c10f872dab4a9400891a1ca80 
Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 20 Jan 2021 10:05:15 -0700 Subject: [PATCH 2/2] Fixed consumer sending to nil channel on shutdown/leader change. Signed-off-by: Ivan Kozlovic --- server/consumer.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 503e82dd..531227a8 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1490,16 +1490,17 @@ func (wq *waitQueue) pop() *waitingRequest { func (o *Consumer) processNextMsgReq(_ *subscription, c *client, _, reply string, msg []byte) { o.mu.Lock() mset := o.mset - if mset == nil || o.isPushMode() { + if mset == nil || o.isPushMode() || o.sendq == nil { o.mu.Unlock() return } sendErr := func(status int, description string) { + sendq := o.sendq o.mu.Unlock() hdr := []byte(fmt.Sprintf("NATS/1.0 %d %s\r\n\r\n", status, description)) pmsg := &jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0} - o.sendq <- pmsg // Send message. + sendq <- pmsg // Send message. } if o.waiting.isFull() { @@ -1660,7 +1661,7 @@ func (o *Consumer) forceExpireFirstWaiting() *waitingRequest { return wr } // If we are expiring this and we think there is still interest, alert. - if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil { + if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil && o.sendq != nil { // We still appear to have interest, so send alert as courtesy. hdr := []byte("NATS/1.0 408 Request Timeout\r\n\r\n") pmsg := &jsPubMsg{wr.reply, wr.reply, _EMPTY_, hdr, nil, nil, 0} @@ -1874,7 +1875,7 @@ func (o *Consumer) deliverCurrentMsg(subj string, hdr, msg []byte, seq uint64, t // Deliver a msg to the consumer. // Lock should be held and o.mset validated to be non-nil. 
func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) { - if o.mset == nil { + if o.mset == nil || o.sendq == nil { return } // Update pending on first attempt @@ -1895,9 +1896,10 @@ func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6 ap := o.config.AckPolicy // This needs to be unlocked since the other side may need this lock on a failed delivery. + sendq := o.sendq o.mu.Unlock() // Send message. - o.sendq <- pmsg + sendq <- pmsg // If we are ack none and mset is interest only we should make sure stream removes interest. if ap == AckNone && mset.config.Retention == InterestPolicy && !mset.checkInterest(seq, o) { mset.store.RemoveMsg(seq)