mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fix for a bug that would allow old leaders of pull-based durables to delete a consumer by firing a stale inactivity-threshold timer.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1091,7 +1091,7 @@ func (o *consumer) setLeader(isLeader bool) {
|
||||
if o.dthresh > 0 && (o.isPullMode() || !o.active) {
|
||||
// Pull consumer. We run the dtmr all the time for this one.
|
||||
stopAndClearTimer(&o.dtmr)
|
||||
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
|
||||
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
|
||||
}
|
||||
|
||||
// If we are not in ReplayInstant mode mark us as in replay state until resolved.
|
||||
@@ -1121,7 +1121,6 @@ func (o *consumer) setLeader(isLeader bool) {
|
||||
if pullMode {
|
||||
// Now start up Go routine to process inbound next message requests.
|
||||
go o.processInboundNextMsgReqs(qch)
|
||||
|
||||
}
|
||||
|
||||
// If we are R>1 spin up our proposal loop.
|
||||
@@ -1140,7 +1139,10 @@ func (o *consumer) setLeader(isLeader bool) {
|
||||
close(o.qch)
|
||||
o.qch = nil
|
||||
}
|
||||
// Make sure to clear out any re delivery queues
|
||||
// Stop any inactivity timers. Should only be running on leaders.
|
||||
stopAndClearTimer(&o.dtmr)
|
||||
|
||||
// Make sure to clear out any re-deliver queues
|
||||
stopAndClearTimer(&o.ptmr)
|
||||
o.rdq, o.rdqi = nil, nil
|
||||
o.pending = nil
|
||||
@@ -1156,9 +1158,6 @@ func (o *consumer) setLeader(isLeader bool) {
|
||||
// Reset waiting if we are in pull mode.
|
||||
if o.isPullMode() {
|
||||
o.waiting = newWaitQueue(o.cfg.MaxWaiting)
|
||||
if !o.isDurable() {
|
||||
stopAndClearTimer(&o.dtmr)
|
||||
}
|
||||
o.nextMsgReqs.drain()
|
||||
} else if o.srv.gateway.enabled {
|
||||
stopAndClearTimer(&o.gwdtmr)
|
||||
@@ -1349,7 +1348,7 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
|
||||
// If we do not have interest anymore and have a delete threshold set, then set
|
||||
// a timer to delete us. We wait for a bit in case of server reconnect.
|
||||
if !interest && o.dthresh > 0 {
|
||||
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
|
||||
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
@@ -1376,7 +1375,7 @@ func (o *consumer) deleteNotActive() {
|
||||
if o.dtmr != nil {
|
||||
o.dtmr.Reset(o.dthresh - elapsed)
|
||||
} else {
|
||||
o.dtmr = time.AfterFunc(o.dthresh-elapsed, func() { o.deleteNotActive() })
|
||||
o.dtmr = time.AfterFunc(o.dthresh-elapsed, o.deleteNotActive)
|
||||
}
|
||||
o.mu.Unlock()
|
||||
return
|
||||
@@ -1386,7 +1385,7 @@ func (o *consumer) deleteNotActive() {
|
||||
if o.dtmr != nil {
|
||||
o.dtmr.Reset(o.dthresh)
|
||||
} else {
|
||||
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
|
||||
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
|
||||
}
|
||||
o.mu.Unlock()
|
||||
return
|
||||
@@ -1640,7 +1639,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
|
||||
stopAndClearTimer(&o.dtmr)
|
||||
// Restart timer only if we are the leader.
|
||||
if o.isLeader() && o.dthresh > 0 {
|
||||
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
|
||||
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5026,3 +5026,49 @@ func TestJetStreamClusterOrphanConsumerSubjects(t *testing.T) {
|
||||
require_NotEqual(t, info.Cluster.Leader, "")
|
||||
require_Equal(t, len(info.Cluster.Replicas), 2)
|
||||
}
|
||||
|
||||
func TestJetStreamClusterDurableConsumerInactiveThresholdLeaderSwitch(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"*"},
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
// Queue a msg.
|
||||
sendStreamMsg(t, nc, "foo", "ok")
|
||||
|
||||
thresh := 250 * time.Millisecond
|
||||
|
||||
// This will start the timer.
|
||||
sub, err := js.PullSubscribe("foo", "dlc", nats.InactiveThreshold(thresh))
|
||||
require_NoError(t, err)
|
||||
|
||||
// Switch over leader.
|
||||
cl := c.consumerLeader(globalAccountName, "TEST", "dlc")
|
||||
cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "dlc")
|
||||
c.waitOnConsumerLeader(globalAccountName, "TEST", "dlc")
|
||||
|
||||
// Create activity on this consumer.
|
||||
msgs, err := sub.Fetch(1)
|
||||
require_NoError(t, err)
|
||||
require_True(t, len(msgs) == 1)
|
||||
|
||||
// This is consider activity as well. So we can watch now up to thresh to make sure consumer still active.
|
||||
msgs[0].AckSync()
|
||||
|
||||
// The consumer should not disappear for next `thresh` interval unless old leader does so.
|
||||
timeout := time.Now().Add(thresh)
|
||||
for time.Now().Before(timeout) {
|
||||
_, err := js.ConsumerInfo("TEST", "dlc")
|
||||
if err == nats.ErrConsumerNotFound {
|
||||
t.Fatalf("Consumer deleted when it should not have been")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user