Merge pull request #2139 from nats-io/cfix

Fix for consumer on restore being deleted
This commit is contained in:
Derek Collison
2021-04-21 06:58:34 -07:00
committed by GitHub
2 changed files with 16 additions and 2 deletions

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.2.2-beta.8"
VERSION = "2.2.2-beta.10"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -2424,6 +2424,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
// Place into our internal map under the stream assignment.
// Ok to replace an existing one, we check on process call below.
sa.consumers[ca.Name] = ca
// See if we are a member
ourID := cc.meta.ID()
isMember := ca.Group.isMember(ourID)
@@ -2532,7 +2533,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) {
if o != nil {
if o.isDurable() && o.isPushMode() {
ocfg := o.config()
if configsEqualSansDelivery(ocfg, *ca.Config) && o.hasNoLocalInterest() {
if ocfg == *ca.Config || (configsEqualSansDelivery(ocfg, *ca.Config) && o.hasNoLocalInterest()) {
o.updateDeliverSubject(ca.Config.DeliverSubject)
} else {
// This is essentially and update that has failed.
@@ -3983,6 +3984,19 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
return
}
// Setup proper default for ack wait if we are in explicit ack mode.
if cfg.AckWait == 0 && (cfg.AckPolicy == AckExplicit || cfg.AckPolicy == AckAll) {
cfg.AckWait = JsAckWaitDefault
}
// Setup default of -1, meaning no limit for MaxDeliver.
if cfg.MaxDeliver == 0 {
cfg.MaxDeliver = -1
}
// Set proper default for max ack pending if we are ack explicit and none has been set.
if cfg.AckPolicy == AckExplicit && cfg.MaxAckPending == 0 {
cfg.MaxAckPending = JsDefaultMaxAckPending
}
rg := cc.createGroupForConsumer(sa)
if rg == nil {
resp.Error = jsInsufficientErr