mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #2193 from nats-io/filtered_interest
When dealing with filtered subjects do not assume the current state
This commit is contained in:
@@ -544,6 +544,15 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
o.ackEventT = JSMetricConsumerAckPre + "." + o.stream + "." + o.name
|
||||
o.deliveryExcEventT = JSAdvisoryConsumerMaxDeliveryExceedPre + "." + o.stream + "." + o.name
|
||||
|
||||
if !isValidName(o.name) {
|
||||
mset.mu.Unlock()
|
||||
o.deleteWithoutAdvisory()
|
||||
return nil, fmt.Errorf("durable name can not contain '.', '*', '>'")
|
||||
}
|
||||
|
||||
// Select starting sequence number
|
||||
o.selectStartingSeqNo()
|
||||
|
||||
if !config.Direct {
|
||||
store, err := mset.store.ConsumerStore(o.name, config)
|
||||
if err != nil {
|
||||
@@ -554,15 +563,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
o.store = store
|
||||
}
|
||||
|
||||
if !isValidName(o.name) {
|
||||
mset.mu.Unlock()
|
||||
o.deleteWithoutAdvisory()
|
||||
return nil, fmt.Errorf("durable name can not contain '.', '*', '>'")
|
||||
}
|
||||
|
||||
// Select starting sequence number
|
||||
o.selectStartingSeqNo()
|
||||
|
||||
// Now register with mset and create the ack subscription.
|
||||
// Check if we already have this one registered.
|
||||
if eo, ok := mset.consumers[o.name]; ok {
|
||||
@@ -1604,7 +1604,7 @@ func (o *consumer) needAck(sseq uint64) bool {
|
||||
state, err := o.store.State()
|
||||
if err != nil || state == nil {
|
||||
// Fall back to what we track internally for now.
|
||||
needsAck := sseq > o.asflr
|
||||
needsAck := sseq > o.asflr && o.cfg.FilterSubject == _EMPTY_
|
||||
o.mu.RUnlock()
|
||||
return needsAck
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user