diff --git a/server/consumer.go b/server/consumer.go index 6709e606..531227a8 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -653,12 +653,13 @@ func (o *Consumer) setLeader(isLeader bool) { o.sendq = make(chan *jsPubMsg, msetSendQSize) // Recreate quit channel. o.qch = make(chan struct{}) + qch := o.qch o.mu.Unlock() // Now start up Go routine to deliver msgs. - go o.loopAndGatherMsgs() + go o.loopAndGatherMsgs(qch) // Startup our deliver loop. - go o.loopAndDeliverMsgs() + go o.loopAndDeliverMsgs(qch) } else { // Shutdown the go routines and the subscriptions. @@ -1114,9 +1115,9 @@ func (o *Consumer) writeState() { } // loopAndDeliverMsgs() will loop and deliver messages and watch for interest changes. -func (o *Consumer) loopAndDeliverMsgs() { +func (o *Consumer) loopAndDeliverMsgs(qch chan struct{}) { o.mu.Lock() - qch, inch, sendq := o.qch, o.inch, o.sendq + inch, sendq := o.inch, o.sendq s, acc := o.acc.srv, o.acc o.mu.Unlock() @@ -1489,16 +1490,17 @@ func (wq *waitQueue) pop() *waitingRequest { func (o *Consumer) processNextMsgReq(_ *subscription, c *client, _, reply string, msg []byte) { o.mu.Lock() mset := o.mset - if mset == nil || o.isPushMode() { + if mset == nil || o.isPushMode() || o.sendq == nil { o.mu.Unlock() return } sendErr := func(status int, description string) { + sendq := o.sendq o.mu.Unlock() hdr := []byte(fmt.Sprintf("NATS/1.0 %d %s\r\n\r\n", status, description)) pmsg := &jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0} - o.sendq <- pmsg // Send message. + sendq <- pmsg // Send message. } if o.waiting.isFull() { @@ -1659,7 +1661,7 @@ func (o *Consumer) forceExpireFirstWaiting() *waitingRequest { return wr } // If we are expiring this and we think there is still interest, alert. - if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil { + if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil && o.sendq != nil { // We still appear to have interest, so send alert as courtesy. hdr := []byte("NATS/1.0 408 Request Timeout\r\n\r\n") pmsg := &jsPubMsg{wr.reply, wr.reply, _EMPTY_, hdr, nil, nil, 0} @@ -1695,7 +1697,7 @@ func (o *Consumer) checkWaitingForInterest() bool { return o.waiting.len() > 0 } -func (o *Consumer) loopAndGatherMsgs() { +func (o *Consumer) loopAndGatherMsgs(qch chan struct{}) { // On startup check to see if we are in a a reply situtation where replay policy is not instant. var ( lts int64 // last time stamp seen, used for replay. @@ -1763,7 +1765,6 @@ func (o *Consumer) loopAndGatherMsgs() { // If we are in a replay scenario and have not caught up check if we need to delay here. if o.replay && lts > 0 { if delay = time.Duration(ts - lts); delay > time.Millisecond { - qch := o.qch o.mu.Unlock() select { case <-qch: @@ -1783,7 +1784,6 @@ func (o *Consumer) loopAndGatherMsgs() { r := o.rlimit.ReserveN(now, len(msg)+len(hdr)+len(subj)+len(dsubj)+len(o.ackReplyT)) delay := r.DelayFrom(now) if delay > 0 { - qch := o.qch o.mu.Unlock() select { case <-qch: @@ -1807,7 +1807,6 @@ func (o *Consumer) loopAndGatherMsgs() { // We will wait here for new messages to arrive. mch := o.mch - qch := o.qch o.mu.Unlock() select { @@ -1876,7 +1875,7 @@ func (o *Consumer) deliverCurrentMsg(subj string, hdr, msg []byte, seq uint64, t // Deliver a msg to the consumer. // Lock should be held and o.mset validated to be non-nil. func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) { - if o.mset == nil { + if o.mset == nil || o.sendq == nil { return } // Update pending on first attempt @@ -1897,9 +1896,10 @@ func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6 ap := o.config.AckPolicy // This needs to be unlocked since the other side may need this lock on a failed delivery. + sendq := o.sendq o.mu.Unlock() // Send message. - o.sendq <- pmsg + sendq <- pmsg // If we are ack none and mset is interest only we should make sure stream removes interest. if ap == AckNone && mset.config.Retention == InterestPolicy && !mset.checkInterest(seq, o) { mset.store.RemoveMsg(seq) diff --git a/test/jetstream_test.go b/test/jetstream_test.go index f4e2ef38..469fd5b4 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -3543,34 +3543,6 @@ func TestJetStreamPullConsumerRemoveInterest(t *testing.T) { } } -func TestJetStreamDeleteStreamManyConsumers(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) - } - - mname := "MYS" - mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.FileStorage}) - if err != nil { - t.Fatalf("Unexpected error adding stream: %v", err) - } - - // This number needs to be higher than the internal sendq size to trigger what this test is testing. - for i := 0; i < 2000; i++ { - _, err := mset.AddConsumer(&server.ConsumerConfig{ - Durable: fmt.Sprintf("D-%d", i), - DeliverSubject: fmt.Sprintf("deliver.%d", i), - }) - if err != nil { - t.Fatalf("Error creating consumer: %v", err) - } - } - // With bug this would not return and would hang. - mset.Delete() -} - func TestJetStreamConsumerRateLimit(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -7575,143 +7547,6 @@ func TestJetStreamFilteredStreamNames(t *testing.T) { expectStreams("*.22", []string{"S4", "S5"}) } -func TestJetStreamAPIStreamListPaging(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. - if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) - } - - // Create 2X limit - streamsNum := 2 * server.JSApiNamesLimit - for i := 1; i <= streamsNum; i++ { - name := fmt.Sprintf("STREAM-%06d", i) - cfg := server.StreamConfig{Name: name, Storage: server.MemoryStorage} - _, err := s.GlobalAccount().AddStream(&cfg) - if err != nil { - t.Fatalf("Unexpected error adding stream: %v", err) - } - } - - // Client for API requests. - nc := clientConnectToServer(t, s) - defer nc.Close() - - reqList := func(offset int) []byte { - t.Helper() - var req []byte - if offset > 0 { - req, _ = json.Marshal(&server.ApiPagedRequest{Offset: offset}) - } - resp, err := nc.Request(server.JSApiStreams, req, time.Second) - if err != nil { - t.Fatalf("Unexpected error getting stream list: %v", err) - } - return resp.Data - } - - checkResp := func(resp []byte, expectedLen, expectedOffset int) { - t.Helper() - var listResponse server.JSApiStreamNamesResponse - if err := json.Unmarshal(resp, &listResponse); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if len(listResponse.Streams) != expectedLen { - t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Streams)) - } - if listResponse.Total != streamsNum { - t.Fatalf("Expected total to be %d but got %d", streamsNum, listResponse.Total) - } - if listResponse.Offset != expectedOffset { - t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset) - } - if expectedLen < 1 { - return - } - // Make sure we get the right stream. - sname := fmt.Sprintf("STREAM-%06d", expectedOffset+1) - if listResponse.Streams[0] != sname { - t.Fatalf("Expected stream %q to be first, got %q", sname, listResponse.Streams[0]) - } - } - - checkResp(reqList(0), server.JSApiNamesLimit, 0) - checkResp(reqList(server.JSApiNamesLimit), server.JSApiNamesLimit, server.JSApiNamesLimit) - checkResp(reqList(streamsNum), 0, streamsNum) - checkResp(reqList(streamsNum-22), 22, streamsNum-22) - checkResp(reqList(streamsNum+22), 0, streamsNum) -} - -func TestJetStreamAPIConsumerListPaging(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. - if config := s.JetStreamConfig(); config != nil { - defer os.RemoveAll(config.StoreDir) - } - - sname := "MYSTREAM" - mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: sname}) - if err != nil { - t.Fatalf("Unexpected error adding stream: %v", err) - } - - // Client for API requests. - nc := clientConnectToServer(t, s) - defer nc.Close() - - consumersNum := server.JSApiNamesLimit - for i := 1; i <= consumersNum; i++ { - dsubj := fmt.Sprintf("d.%d", i) - sub, _ := nc.SubscribeSync(dsubj) - defer sub.Unsubscribe() - nc.Flush() - - _, err := mset.AddConsumer(&server.ConsumerConfig{DeliverSubject: dsubj}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - } - - reqListSubject := fmt.Sprintf(server.JSApiConsumersT, sname) - reqList := func(offset int) []byte { - t.Helper() - var req []byte - if offset > 0 { - req, _ = json.Marshal(&server.JSApiConsumersRequest{ApiPagedRequest: server.ApiPagedRequest{Offset: offset}}) - } - resp, err := nc.Request(reqListSubject, req, time.Second) - if err != nil { - t.Fatalf("Unexpected error getting stream list: %v", err) - } - return resp.Data - } - - checkResp := func(resp []byte, expectedLen, expectedOffset int) { - t.Helper() - var listResponse server.JSApiConsumerNamesResponse - if err := json.Unmarshal(resp, &listResponse); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if len(listResponse.Consumers) != expectedLen { - t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Consumers)) - } - if listResponse.Total != consumersNum { - t.Fatalf("Expected total to be %d but got %d", consumersNum, listResponse.Total) - } - if listResponse.Offset != expectedOffset { - t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset) - } - } - - checkResp(reqList(0), server.JSApiNamesLimit, 0) - checkResp(reqList(consumersNum-22), 22, consumersNum-22) - checkResp(reqList(consumersNum+22), 0, consumersNum) -} - func TestJetStreamUpdateStream(t *testing.T) { cases := []struct { name string diff --git a/test/norace_test.go b/test/norace_test.go index 5988bd58..0ef346d0 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -788,3 +788,168 @@ func TestNoRaceSlowProxy(t *testing.T) { t.Fatalf("bps is off, target is %v, actual is %v", bwTarget, bps) } } + +func TestNoRaceJetStreamDeleteStreamManyConsumers(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + mname := "MYS" + mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.FileStorage}) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + + // This number needs to be higher than the internal sendq size to trigger what this test is testing. + for i := 0; i < 2000; i++ { + _, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: fmt.Sprintf("D-%d", i), + DeliverSubject: fmt.Sprintf("deliver.%d", i), + }) + if err != nil { + t.Fatalf("Error creating consumer: %v", err) + } + } + // With bug this would not return and would hang. + mset.Delete() +} + +func TestNoRaceJetStreamAPIStreamListPaging(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + // Forced cleanup of all persisted state. + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + // Create 2X limit + streamsNum := 2 * server.JSApiNamesLimit + for i := 1; i <= streamsNum; i++ { + name := fmt.Sprintf("STREAM-%06d", i) + cfg := server.StreamConfig{Name: name, Storage: server.MemoryStorage} + _, err := s.GlobalAccount().AddStream(&cfg) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + } + + // Client for API requests. + nc := clientConnectToServer(t, s) + defer nc.Close() + + reqList := func(offset int) []byte { + t.Helper() + var req []byte + if offset > 0 { + req, _ = json.Marshal(&server.ApiPagedRequest{Offset: offset}) + } + resp, err := nc.Request(server.JSApiStreams, req, time.Second) + if err != nil { + t.Fatalf("Unexpected error getting stream list: %v", err) + } + return resp.Data + } + + checkResp := func(resp []byte, expectedLen, expectedOffset int) { + t.Helper() + var listResponse server.JSApiStreamNamesResponse + if err := json.Unmarshal(resp, &listResponse); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if len(listResponse.Streams) != expectedLen { + t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Streams)) + } + if listResponse.Total != streamsNum { + t.Fatalf("Expected total to be %d but got %d", streamsNum, listResponse.Total) + } + if listResponse.Offset != expectedOffset { + t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset) + } + if expectedLen < 1 { + return + } + // Make sure we get the right stream. + sname := fmt.Sprintf("STREAM-%06d", expectedOffset+1) + if listResponse.Streams[0] != sname { + t.Fatalf("Expected stream %q to be first, got %q", sname, listResponse.Streams[0]) + } + } + + checkResp(reqList(0), server.JSApiNamesLimit, 0) + checkResp(reqList(server.JSApiNamesLimit), server.JSApiNamesLimit, server.JSApiNamesLimit) + checkResp(reqList(streamsNum), 0, streamsNum) + checkResp(reqList(streamsNum-22), 22, streamsNum-22) + checkResp(reqList(streamsNum+22), 0, streamsNum) +} + +func TestNoRaceJetStreamAPIConsumerListPaging(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + // Forced cleanup of all persisted state. + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + sname := "MYSTREAM" + mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: sname}) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + + // Client for API requests. + nc := clientConnectToServer(t, s) + defer nc.Close() + + consumersNum := server.JSApiNamesLimit + for i := 1; i <= consumersNum; i++ { + dsubj := fmt.Sprintf("d.%d", i) + sub, _ := nc.SubscribeSync(dsubj) + defer sub.Unsubscribe() + nc.Flush() + + _, err := mset.AddConsumer(&server.ConsumerConfig{DeliverSubject: dsubj}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + reqListSubject := fmt.Sprintf(server.JSApiConsumersT, sname) + reqList := func(offset int) []byte { + t.Helper() + var req []byte + if offset > 0 { + req, _ = json.Marshal(&server.JSApiConsumersRequest{ApiPagedRequest: server.ApiPagedRequest{Offset: offset}}) + } + resp, err := nc.Request(reqListSubject, req, time.Second) + if err != nil { + t.Fatalf("Unexpected error getting stream list: %v", err) + } + return resp.Data + } + + checkResp := func(resp []byte, expectedLen, expectedOffset int) { + t.Helper() + var listResponse server.JSApiConsumerNamesResponse + if err := json.Unmarshal(resp, &listResponse); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if len(listResponse.Consumers) != expectedLen { + t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Consumers)) + } + if listResponse.Total != consumersNum { + t.Fatalf("Expected total to be %d but got %d", consumersNum, listResponse.Total) + } + if listResponse.Offset != expectedOffset { + t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset) + } + } + + checkResp(reqList(0), server.JSApiNamesLimit, 0) + checkResp(reqList(consumersNum-22), 22, consumersNum-22) + checkResp(reqList(consumersNum+22), 0, consumersNum) +}