diff --git a/server/const.go b/server/const.go index 34310bb5..56030699 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.2-beta.8" + VERSION = "2.2.2-beta.10" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 1f768500..7fa3a5c1 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2424,6 +2424,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // Place into our internal map under the stream assignment. // Ok to replace an existing one, we check on process call below. sa.consumers[ca.Name] = ca + // See if we are a member ourID := cc.meta.ID() isMember := ca.Group.isMember(ourID) @@ -2532,7 +2533,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) { if o != nil { if o.isDurable() && o.isPushMode() { ocfg := o.config() - if configsEqualSansDelivery(ocfg, *ca.Config) && o.hasNoLocalInterest() { + if ocfg == *ca.Config || (configsEqualSansDelivery(ocfg, *ca.Config) && o.hasNoLocalInterest()) { o.updateDeliverSubject(ca.Config.DeliverSubject) } else { // This is essentially and update that has failed. @@ -3983,6 +3984,19 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec return } + // Setup proper default for ack wait if we are in explicit ack mode. + if cfg.AckWait == 0 && (cfg.AckPolicy == AckExplicit || cfg.AckPolicy == AckAll) { + cfg.AckWait = JsAckWaitDefault + } + // Setup default of -1, meaning no limit for MaxDeliver. + if cfg.MaxDeliver == 0 { + cfg.MaxDeliver = -1 + } + // Set proper default for max ack pending if we are ack explicit and none has been set. + if cfg.AckPolicy == AckExplicit && cfg.MaxAckPending == 0 { + cfg.MaxAckPending = JsDefaultMaxAckPending + } + rg := cc.createGroupForConsumer(sa) if rg == nil { resp.Error = jsInsufficientErr