diff --git a/server/consumer.go b/server/consumer.go index 8215dd01..358d3647 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -544,6 +544,15 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri o.ackEventT = JSMetricConsumerAckPre + "." + o.stream + "." + o.name o.deliveryExcEventT = JSAdvisoryConsumerMaxDeliveryExceedPre + "." + o.stream + "." + o.name + if !isValidName(o.name) { + mset.mu.Unlock() + o.deleteWithoutAdvisory() + return nil, fmt.Errorf("durable name can not contain '.', '*', '>'") + } + + // Select starting sequence number + o.selectStartingSeqNo() + if !config.Direct { store, err := mset.store.ConsumerStore(o.name, config) if err != nil { @@ -554,15 +563,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri o.store = store } - if !isValidName(o.name) { - mset.mu.Unlock() - o.deleteWithoutAdvisory() - return nil, fmt.Errorf("durable name can not contain '.', '*', '>'") - } - - // Select starting sequence number - o.selectStartingSeqNo() - // Now register with mset and create the ack subscription. // Check if we already have this one registered. if eo, ok := mset.consumers[o.name]; ok { @@ -1604,7 +1604,7 @@ func (o *consumer) needAck(sseq uint64) bool { state, err := o.store.State() if err != nil || state == nil { // Fall back to what we track internally for now. - needsAck := sseq > o.asflr + needsAck := sseq > o.asflr && o.cfg.FilterSubject == _EMPTY_ o.mu.RUnlock() return needsAck }