Merge branch 'main' into dev

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-08-21 15:55:00 -07:00
19 changed files with 517 additions and 93 deletions

View File

@@ -276,14 +276,14 @@ var (
// Calculate accurate replicas for the consumer config with the parent stream config.
func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int {
if consCfg.Replicas == 0 {
if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy {
if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas {
if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy && consCfg.Replicas == 0 {
// Matches old-school ephemerals only, where the replica count is 0.
return 1
}
return strCfg.Replicas
} else {
return consCfg.Replicas
}
return consCfg.Replicas
}
// Consumer is a jetstream consumer.
@@ -1219,7 +1219,7 @@ func (o *consumer) setLeader(isLeader bool) {
if o.dthresh > 0 && (o.isPullMode() || !o.active) {
// Pull consumer. We run the dtmr all the time for this one.
stopAndClearTimer(&o.dtmr)
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}
// If we are not in ReplayInstant mode mark us as in replay state until resolved.
@@ -1249,7 +1249,6 @@ func (o *consumer) setLeader(isLeader bool) {
if pullMode {
// Now start up Go routine to process inbound next message requests.
go o.processInboundNextMsgReqs(qch)
}
// If we are R>1 spin up our proposal loop.
@@ -1268,7 +1267,10 @@ func (o *consumer) setLeader(isLeader bool) {
close(o.qch)
o.qch = nil
}
// Make sure to clear out any re delivery queues
// Stop any inactivity timers. Should only be running on leaders.
stopAndClearTimer(&o.dtmr)
// Make sure to clear out any re-deliver queues
stopAndClearTimer(&o.ptmr)
o.rdq = nil
o.rdqi.Empty()
@@ -1285,9 +1287,6 @@ func (o *consumer) setLeader(isLeader bool) {
// Reset waiting if we are in pull mode.
if o.isPullMode() {
o.waiting = newWaitQueue(o.cfg.MaxWaiting)
if !o.isDurable() {
stopAndClearTimer(&o.dtmr)
}
o.nextMsgReqs.drain()
} else if o.srv.gateway.enabled {
stopAndClearTimer(&o.gwdtmr)
@@ -1478,7 +1477,7 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
// If we do not have interest anymore and have a delete threshold set, then set
// a timer to delete us. We wait for a bit in case of server reconnect.
if !interest && o.dthresh > 0 {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
return true
}
return false
@@ -1505,7 +1504,7 @@ func (o *consumer) deleteNotActive() {
if o.dtmr != nil {
o.dtmr.Reset(o.dthresh - elapsed)
} else {
o.dtmr = time.AfterFunc(o.dthresh-elapsed, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh-elapsed, o.deleteNotActive)
}
o.mu.Unlock()
return
@@ -1515,7 +1514,7 @@ func (o *consumer) deleteNotActive() {
if o.dtmr != nil {
o.dtmr.Reset(o.dthresh)
} else {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}
o.mu.Unlock()
return
@@ -1769,7 +1768,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
stopAndClearTimer(&o.dtmr)
// Restart timer only if we are the leader.
if o.isLeader() && o.dthresh > 0 {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}
}