From 72ff2edb5fbc70bf43f98e0e76ceacb580962f31 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 3 Nov 2022 12:36:06 -0700 Subject: [PATCH] Fix for #3603. Signed-off-by: Derek Collison --- server/consumer.go | 31 ++++++++++++++++++-- server/jetstream_cluster_3_test.go | 47 ++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 84e384cd..cea2c44f 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1882,6 +1882,25 @@ func (o *consumer) checkPendingRequests() { o.prm = nil } +// This will release any pending pull requests if applicable. +// Should be called only by the leader being deleted or stopped. +// Lock should be held. +func (o *consumer) releaseAnyPendingRequests() { + if o.mset == nil || o.outq == nil || o.waiting.len() == 0 { + return + } + hdr := []byte("NATS/1.0 409 Consumer Deleted\r\n\r\n") + wq := o.waiting + o.waiting = nil + for i, rp := 0, wq.rp; i < wq.n; i++ { + if wr := wq.reqs[rp]; wr != nil { + o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + wr.recycle() + } + rp = (rp + 1) % cap(wq.reqs) + } +} + // Process a NAK. func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) { o.mu.Lock() @@ -2618,7 +2637,7 @@ func (o *consumer) pendingRequests() map[string]*waitingRequest { return nil } wq, m := o.waiting, make(map[string]*waitingRequest) - for i, rp := 0, o.waiting.rp; i < wq.n; i++ { + for i, rp := 0, wq.rp; i < wq.n; i++ { if wr := wq.reqs[rp]; wr != nil { m[wr.reply] = wr } @@ -2649,7 +2668,8 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest { } else { // Since we can't send that message to the requestor, we need to // notify that we are closing the request. 
- hdr := []byte(fmt.Sprintf("NATS/1.0 409 Message Size Exceeds MaxBytes\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)) + const maxBytesT = "NATS/1.0 409 Message Size Exceeds MaxBytes\r\n%s: %d\r\n%s: %d\r\n\r\n" + hdr := []byte(fmt.Sprintf(maxBytesT, JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)) o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) // Remove the current one, no longer valid due to max bytes limit. o.waiting.removeCurrent() @@ -2672,7 +2692,8 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest { } } if wr.interest != wr.reply { - hdr := []byte(fmt.Sprintf("NATS/1.0 408 Interest Expired\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)) + const intExpT = "NATS/1.0 408 Interest Expired\r\n%s: %d\r\n%s: %d\r\n\r\n" + hdr := []byte(fmt.Sprintf(intExpT, JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)) o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) } // Remove the current one, no longer valid. @@ -4014,6 +4035,10 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { if advisory { o.sendDeleteAdvisoryLocked() } + if o.isPullMode() { + // Release any pending. 
+ o.releaseAnyPendingRequests() + } } if o.qch != nil { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index f6500f04..c622e2ee 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -1003,3 +1003,50 @@ func TestJetStreamClusterStreamLagWarning(t *testing.T) { // OK } } + +// https://github.com/nats-io/nats-server/issues/3603 +func TestJetStreamClusterSignalPullConsumersOnDelete(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Create 2 pull consumers. + sub1, err := js.PullSubscribe("foo", "d1") + require_NoError(t, err) + + sub2, err := js.PullSubscribe("foo", "d2") + require_NoError(t, err) + + // We want to make sure we get kicked out prior to the timeout + // when consumers are being deleted or the parent stream is being deleted. + // Note this should be lower case, Go client needs to be updated. + expectedErr := errors.New("nats: Consumer Deleted") + + // Queue up the delete for sub1 + time.AfterFunc(250*time.Millisecond, func() { js.DeleteConsumer("TEST", "d1") }) + start := time.Now() + _, err = sub1.Fetch(1, nats.MaxWait(10*time.Second)) + require_Error(t, err, expectedErr) + + // Check that we bailed early. + if time.Since(start) > time.Second { + t.Fatalf("Took too long to bail out on consumer delete") + } + + time.AfterFunc(250*time.Millisecond, func() { js.DeleteStream("TEST") }) + start = time.Now() + _, err = sub2.Fetch(1, nats.MaxWait(10*time.Second)) + require_Error(t, err, expectedErr) + if time.Since(start) > time.Second { + t.Fatalf("Took too long to bail out on stream delete") + } +}