Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-11-03 12:36:06 -07:00
parent 0f8aa11422
commit 72ff2edb5f
2 changed files with 75 additions and 3 deletions

View File

@@ -1882,6 +1882,25 @@ func (o *consumer) checkPendingRequests() {
o.prm = nil
}
// This will release any pending pull requests if applicable.
// Should be called only by the leader being deleted or stopped.
// Lock should be held.
func (o *consumer) releaseAnyPendingRequests() {
if o.mset == nil || o.outq == nil || o.waiting.len() == 0 {
return
}
hdr := []byte("NATS/1.0 409 Consumer Deleted\r\n\r\n")
wq := o.waiting
o.waiting = nil
for i, rp := 0, wq.rp; i < wq.n; i++ {
if wr := wq.reqs[rp]; wr != nil {
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
wr.recycle()
}
rp = (rp + 1) % cap(wq.reqs)
}
}
// Process a NAK.
func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
o.mu.Lock()
@@ -2618,7 +2637,7 @@ func (o *consumer) pendingRequests() map[string]*waitingRequest {
return nil
}
wq, m := o.waiting, make(map[string]*waitingRequest)
for i, rp := 0, o.waiting.rp; i < wq.n; i++ {
for i, rp := 0, wq.rp; i < wq.n; i++ {
if wr := wq.reqs[rp]; wr != nil {
m[wr.reply] = wr
}
@@ -2649,7 +2668,8 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
} else {
// Since we can't send that message to the requestor, we need to
// notify that we are closing the request.
hdr := []byte(fmt.Sprintf("NATS/1.0 409 Message Size Exceeds MaxBytes\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
const maxBytesT = "NATS/1.0 409 Message Size Exceeds MaxBytes\r\n%s: %d\r\n%s: %d\r\n\r\n"
hdr := []byte(fmt.Sprintf(maxBytesT, JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
// Remove the current one, no longer valid due to max bytes limit.
o.waiting.removeCurrent()
@@ -2672,7 +2692,8 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
}
}
if wr.interest != wr.reply {
hdr := []byte(fmt.Sprintf("NATS/1.0 408 Interest Expired\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
const intExpT = "NATS/1.0 408 Interest Expired\r\n%s: %d\r\n%s: %d\r\n\r\n"
hdr := []byte(fmt.Sprintf(intExpT, JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
// Remove the current one, no longer valid.
@@ -4014,6 +4035,10 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
if advisory {
o.sendDeleteAdvisoryLocked()
}
if o.isPullMode() {
// Release any pending.
o.releaseAnyPendingRequests()
}
}
if o.qch != nil {

View File

@@ -1003,3 +1003,50 @@ func TestJetStreamClusterStreamLagWarning(t *testing.T) {
// OK
}
}
// https://github.com/nats-io/nats-server/issues/3603
func TestJetStreamClusterSignalPullConsumersOnDelete(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
// Create 2 pull consumers.
sub1, err := js.PullSubscribe("foo", "d1")
require_NoError(t, err)
sub2, err := js.PullSubscribe("foo", "d2")
require_NoError(t, err)
// We want to make sure we get kicked out prior to the timeout
// when consumers are being deleted or the parent stream is being deleted.
// Note this should be lower case, Go client needs to be updated.
expectedErr := errors.New("nats: Consumer Deleted")
// Queue up the delete for sub1
time.AfterFunc(250*time.Millisecond, func() { js.DeleteConsumer("TEST", "d1") })
start := time.Now()
_, err = sub1.Fetch(1, nats.MaxWait(10*time.Second))
require_Error(t, err, expectedErr)
// Check that we bailed early.
if time.Since(start) > time.Second {
t.Fatalf("Took to long to bail out on consumer delete")
}
time.AfterFunc(250*time.Millisecond, func() { js.DeleteStream("TEST") })
start = time.Now()
_, err = sub2.Fetch(1, nats.MaxWait(10*time.Second))
require_Error(t, err, expectedErr)
if time.Since(start) > time.Second {
t.Fatalf("Took to long to bail out on stream delete")
}
}