Merge pull request #1826 from nats-io/fix_consumer_loop_delivery_exit

Fix stop of consumer's delivery loop
This commit is contained in:
Ivan Kozlovic
2021-01-20 10:34:57 -07:00
committed by GitHub
3 changed files with 178 additions and 178 deletions

View File

@@ -653,12 +653,13 @@ func (o *Consumer) setLeader(isLeader bool) {
o.sendq = make(chan *jsPubMsg, msetSendQSize)
// Recreate quit channel.
o.qch = make(chan struct{})
qch := o.qch
o.mu.Unlock()
// Now start up Go routine to deliver msgs.
go o.loopAndGatherMsgs()
go o.loopAndGatherMsgs(qch)
// Startup our deliver loop.
go o.loopAndDeliverMsgs()
go o.loopAndDeliverMsgs(qch)
} else {
// Shutdown the go routines and the subscriptions.
@@ -1114,9 +1115,9 @@ func (o *Consumer) writeState() {
}
// loopAndDeliverMsgs() will loop and deliver messages and watch for interest changes.
func (o *Consumer) loopAndDeliverMsgs() {
func (o *Consumer) loopAndDeliverMsgs(qch chan struct{}) {
o.mu.Lock()
qch, inch, sendq := o.qch, o.inch, o.sendq
inch, sendq := o.inch, o.sendq
s, acc := o.acc.srv, o.acc
o.mu.Unlock()
@@ -1489,16 +1490,17 @@ func (wq *waitQueue) pop() *waitingRequest {
func (o *Consumer) processNextMsgReq(_ *subscription, c *client, _, reply string, msg []byte) {
o.mu.Lock()
mset := o.mset
if mset == nil || o.isPushMode() {
if mset == nil || o.isPushMode() || o.sendq == nil {
o.mu.Unlock()
return
}
sendErr := func(status int, description string) {
sendq := o.sendq
o.mu.Unlock()
hdr := []byte(fmt.Sprintf("NATS/1.0 %d %s\r\n\r\n", status, description))
pmsg := &jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0}
o.sendq <- pmsg // Send message.
sendq <- pmsg // Send message.
}
if o.waiting.isFull() {
@@ -1659,7 +1661,7 @@ func (o *Consumer) forceExpireFirstWaiting() *waitingRequest {
return wr
}
// If we are expiring this and we think there is still interest, alert.
if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil {
if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil && o.sendq != nil {
// We still appear to have interest, so send alert as courtesy.
hdr := []byte("NATS/1.0 408 Request Timeout\r\n\r\n")
pmsg := &jsPubMsg{wr.reply, wr.reply, _EMPTY_, hdr, nil, nil, 0}
@@ -1695,7 +1697,7 @@ func (o *Consumer) checkWaitingForInterest() bool {
return o.waiting.len() > 0
}
func (o *Consumer) loopAndGatherMsgs() {
func (o *Consumer) loopAndGatherMsgs(qch chan struct{}) {
// On startup check to see if we are in a a reply situtation where replay policy is not instant.
var (
lts int64 // last time stamp seen, used for replay.
@@ -1763,7 +1765,6 @@ func (o *Consumer) loopAndGatherMsgs() {
// If we are in a replay scenario and have not caught up check if we need to delay here.
if o.replay && lts > 0 {
if delay = time.Duration(ts - lts); delay > time.Millisecond {
qch := o.qch
o.mu.Unlock()
select {
case <-qch:
@@ -1783,7 +1784,6 @@ func (o *Consumer) loopAndGatherMsgs() {
r := o.rlimit.ReserveN(now, len(msg)+len(hdr)+len(subj)+len(dsubj)+len(o.ackReplyT))
delay := r.DelayFrom(now)
if delay > 0 {
qch := o.qch
o.mu.Unlock()
select {
case <-qch:
@@ -1807,7 +1807,6 @@ func (o *Consumer) loopAndGatherMsgs() {
// We will wait here for new messages to arrive.
mch := o.mch
qch := o.qch
o.mu.Unlock()
select {
@@ -1876,7 +1875,7 @@ func (o *Consumer) deliverCurrentMsg(subj string, hdr, msg []byte, seq uint64, t
// Deliver a msg to the consumer.
// Lock should be held and o.mset validated to be non-nil.
func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) {
if o.mset == nil {
if o.mset == nil || o.sendq == nil {
return
}
// Update pending on first attempt
@@ -1897,9 +1896,10 @@ func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6
ap := o.config.AckPolicy
// This needs to be unlocked since the other side may need this lock on a failed delivery.
sendq := o.sendq
o.mu.Unlock()
// Send message.
o.sendq <- pmsg
sendq <- pmsg
// If we are ack none and mset is interest only we should make sure stream removes interest.
if ap == AckNone && mset.config.Retention == InterestPolicy && !mset.checkInterest(seq, o) {
mset.store.RemoveMsg(seq)

View File

@@ -3543,34 +3543,6 @@ func TestJetStreamPullConsumerRemoveInterest(t *testing.T) {
}
}
func TestJetStreamDeleteStreamManyConsumers(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mname := "MYS"
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.FileStorage})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
// This number needs to be higher than the internal sendq size to trigger what this test is testing.
for i := 0; i < 2000; i++ {
_, err := mset.AddConsumer(&server.ConsumerConfig{
Durable: fmt.Sprintf("D-%d", i),
DeliverSubject: fmt.Sprintf("deliver.%d", i),
})
if err != nil {
t.Fatalf("Error creating consumer: %v", err)
}
}
// With bug this would not return and would hang.
mset.Delete()
}
func TestJetStreamConsumerRateLimit(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
@@ -7575,143 +7547,6 @@ func TestJetStreamFilteredStreamNames(t *testing.T) {
expectStreams("*.22", []string{"S4", "S5"})
}
func TestJetStreamAPIStreamListPaging(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
// Forced cleanup of all persisted state.
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
// Create 2X limit
streamsNum := 2 * server.JSApiNamesLimit
for i := 1; i <= streamsNum; i++ {
name := fmt.Sprintf("STREAM-%06d", i)
cfg := server.StreamConfig{Name: name, Storage: server.MemoryStorage}
_, err := s.GlobalAccount().AddStream(&cfg)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
}
// Client for API requests.
nc := clientConnectToServer(t, s)
defer nc.Close()
reqList := func(offset int) []byte {
t.Helper()
var req []byte
if offset > 0 {
req, _ = json.Marshal(&server.ApiPagedRequest{Offset: offset})
}
resp, err := nc.Request(server.JSApiStreams, req, time.Second)
if err != nil {
t.Fatalf("Unexpected error getting stream list: %v", err)
}
return resp.Data
}
checkResp := func(resp []byte, expectedLen, expectedOffset int) {
t.Helper()
var listResponse server.JSApiStreamNamesResponse
if err := json.Unmarshal(resp, &listResponse); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(listResponse.Streams) != expectedLen {
t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Streams))
}
if listResponse.Total != streamsNum {
t.Fatalf("Expected total to be %d but got %d", streamsNum, listResponse.Total)
}
if listResponse.Offset != expectedOffset {
t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset)
}
if expectedLen < 1 {
return
}
// Make sure we get the right stream.
sname := fmt.Sprintf("STREAM-%06d", expectedOffset+1)
if listResponse.Streams[0] != sname {
t.Fatalf("Expected stream %q to be first, got %q", sname, listResponse.Streams[0])
}
}
checkResp(reqList(0), server.JSApiNamesLimit, 0)
checkResp(reqList(server.JSApiNamesLimit), server.JSApiNamesLimit, server.JSApiNamesLimit)
checkResp(reqList(streamsNum), 0, streamsNum)
checkResp(reqList(streamsNum-22), 22, streamsNum-22)
checkResp(reqList(streamsNum+22), 0, streamsNum)
}
func TestJetStreamAPIConsumerListPaging(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
// Forced cleanup of all persisted state.
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
sname := "MYSTREAM"
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: sname})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
// Client for API requests.
nc := clientConnectToServer(t, s)
defer nc.Close()
consumersNum := server.JSApiNamesLimit
for i := 1; i <= consumersNum; i++ {
dsubj := fmt.Sprintf("d.%d", i)
sub, _ := nc.SubscribeSync(dsubj)
defer sub.Unsubscribe()
nc.Flush()
_, err := mset.AddConsumer(&server.ConsumerConfig{DeliverSubject: dsubj})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
reqListSubject := fmt.Sprintf(server.JSApiConsumersT, sname)
reqList := func(offset int) []byte {
t.Helper()
var req []byte
if offset > 0 {
req, _ = json.Marshal(&server.JSApiConsumersRequest{ApiPagedRequest: server.ApiPagedRequest{Offset: offset}})
}
resp, err := nc.Request(reqListSubject, req, time.Second)
if err != nil {
t.Fatalf("Unexpected error getting stream list: %v", err)
}
return resp.Data
}
checkResp := func(resp []byte, expectedLen, expectedOffset int) {
t.Helper()
var listResponse server.JSApiConsumerNamesResponse
if err := json.Unmarshal(resp, &listResponse); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(listResponse.Consumers) != expectedLen {
t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Consumers))
}
if listResponse.Total != consumersNum {
t.Fatalf("Expected total to be %d but got %d", consumersNum, listResponse.Total)
}
if listResponse.Offset != expectedOffset {
t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset)
}
}
checkResp(reqList(0), server.JSApiNamesLimit, 0)
checkResp(reqList(consumersNum-22), 22, consumersNum-22)
checkResp(reqList(consumersNum+22), 0, consumersNum)
}
func TestJetStreamUpdateStream(t *testing.T) {
cases := []struct {
name string

View File

@@ -788,3 +788,168 @@ func TestNoRaceSlowProxy(t *testing.T) {
t.Fatalf("bps is off, target is %v, actual is %v", bwTarget, bps)
}
}
func TestNoRaceJetStreamDeleteStreamManyConsumers(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mname := "MYS"
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.FileStorage})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
// This number needs to be higher than the internal sendq size to trigger what this test is testing.
for i := 0; i < 2000; i++ {
_, err := mset.AddConsumer(&server.ConsumerConfig{
Durable: fmt.Sprintf("D-%d", i),
DeliverSubject: fmt.Sprintf("deliver.%d", i),
})
if err != nil {
t.Fatalf("Error creating consumer: %v", err)
}
}
// With bug this would not return and would hang.
mset.Delete()
}
func TestNoRaceJetStreamAPIStreamListPaging(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
// Forced cleanup of all persisted state.
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
// Create 2X limit
streamsNum := 2 * server.JSApiNamesLimit
for i := 1; i <= streamsNum; i++ {
name := fmt.Sprintf("STREAM-%06d", i)
cfg := server.StreamConfig{Name: name, Storage: server.MemoryStorage}
_, err := s.GlobalAccount().AddStream(&cfg)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
}
// Client for API requests.
nc := clientConnectToServer(t, s)
defer nc.Close()
reqList := func(offset int) []byte {
t.Helper()
var req []byte
if offset > 0 {
req, _ = json.Marshal(&server.ApiPagedRequest{Offset: offset})
}
resp, err := nc.Request(server.JSApiStreams, req, time.Second)
if err != nil {
t.Fatalf("Unexpected error getting stream list: %v", err)
}
return resp.Data
}
checkResp := func(resp []byte, expectedLen, expectedOffset int) {
t.Helper()
var listResponse server.JSApiStreamNamesResponse
if err := json.Unmarshal(resp, &listResponse); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(listResponse.Streams) != expectedLen {
t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Streams))
}
if listResponse.Total != streamsNum {
t.Fatalf("Expected total to be %d but got %d", streamsNum, listResponse.Total)
}
if listResponse.Offset != expectedOffset {
t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset)
}
if expectedLen < 1 {
return
}
// Make sure we get the right stream.
sname := fmt.Sprintf("STREAM-%06d", expectedOffset+1)
if listResponse.Streams[0] != sname {
t.Fatalf("Expected stream %q to be first, got %q", sname, listResponse.Streams[0])
}
}
checkResp(reqList(0), server.JSApiNamesLimit, 0)
checkResp(reqList(server.JSApiNamesLimit), server.JSApiNamesLimit, server.JSApiNamesLimit)
checkResp(reqList(streamsNum), 0, streamsNum)
checkResp(reqList(streamsNum-22), 22, streamsNum-22)
checkResp(reqList(streamsNum+22), 0, streamsNum)
}
func TestNoRaceJetStreamAPIConsumerListPaging(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
// Forced cleanup of all persisted state.
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
sname := "MYSTREAM"
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: sname})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
// Client for API requests.
nc := clientConnectToServer(t, s)
defer nc.Close()
consumersNum := server.JSApiNamesLimit
for i := 1; i <= consumersNum; i++ {
dsubj := fmt.Sprintf("d.%d", i)
sub, _ := nc.SubscribeSync(dsubj)
defer sub.Unsubscribe()
nc.Flush()
_, err := mset.AddConsumer(&server.ConsumerConfig{DeliverSubject: dsubj})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
reqListSubject := fmt.Sprintf(server.JSApiConsumersT, sname)
reqList := func(offset int) []byte {
t.Helper()
var req []byte
if offset > 0 {
req, _ = json.Marshal(&server.JSApiConsumersRequest{ApiPagedRequest: server.ApiPagedRequest{Offset: offset}})
}
resp, err := nc.Request(reqListSubject, req, time.Second)
if err != nil {
t.Fatalf("Unexpected error getting stream list: %v", err)
}
return resp.Data
}
checkResp := func(resp []byte, expectedLen, expectedOffset int) {
t.Helper()
var listResponse server.JSApiConsumerNamesResponse
if err := json.Unmarshal(resp, &listResponse); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(listResponse.Consumers) != expectedLen {
t.Fatalf("Expected only %d streams but got %d", expectedLen, len(listResponse.Consumers))
}
if listResponse.Total != consumersNum {
t.Fatalf("Expected total to be %d but got %d", consumersNum, listResponse.Total)
}
if listResponse.Offset != expectedOffset {
t.Fatalf("Expected offset to be %d but got %d", expectedOffset, listResponse.Offset)
}
}
checkResp(reqList(0), server.JSApiNamesLimit, 0)
checkResp(reqList(consumersNum-22), 22, consumersNum-22)
checkResp(reqList(consumersNum+22), 0, consumersNum)
}