Make sure that o.subjf is nil or checked if empty

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
This commit is contained in:
Tomasz Pietrek
2023-05-28 21:03:15 +02:00
parent a463dfe5c4
commit 261f39bb7d
2 changed files with 58 additions and 3 deletions

View File

@@ -1735,7 +1735,12 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
o.mu.Lock()
// When we're done with signaling, we can replace the subjects.
o.subjf = newSubjf
// If filters were removed, set `o.subjf` to nil.
if len(newSubjf) == 0 {
o.subjf = nil
} else {
o.subjf = newSubjf
}
}
// Record new config for others that do not need special handling.
@@ -3247,7 +3252,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
store := o.mset.store
// If no filters are specified, optimize to fetch just non-filtered messages.
if o.subjf == nil {
if len(o.subjf) == 0 {
// Grab next message applicable to us.
// We will unlock here in case lots of contention, e.g. WQ.
o.mu.Unlock()
@@ -3308,7 +3313,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
// even if len == 0 or 1.
// TODO(tp): we should have sort based off generics for server
// to avoid reflection.
if o.subjf != nil && len(o.subjf) > 1 {
if len(o.subjf) > 1 {
sort.Slice(o.subjf, func(i, j int) bool {
if o.subjf[j].pmsg != nil && o.subjf[i].pmsg == nil {
return false

View File

@@ -28,6 +28,56 @@ import (
"github.com/nats-io/nats.go"
)
func TestJetStreamConsumerMultipleFiltersRemoveFilters(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
nc, js := jsClientConnect(t, s)
defer nc.Close()
acc := s.GlobalAccount()
mset, err := acc.addStream(&StreamConfig{
Name: "TEST",
Retention: LimitsPolicy,
Subjects: []string{"one", "two", "three"},
MaxAge: time.Second * 90,
})
require_NoError(t, err)
_, err = mset.addConsumer(&ConsumerConfig{
Durable: "consumer",
FilterSubjects: []string{"one", "two"},
})
require_NoError(t, err)
sendStreamMsg(t, nc, "one", "data")
sendStreamMsg(t, nc, "two", "data")
sendStreamMsg(t, nc, "three", "data")
consumer, err := js.PullSubscribe("", "consumer", nats.Bind("TEST", "consumer"))
require_NoError(t, err)
msgs, err := consumer.Fetch(1)
require_NoError(t, err)
require_True(t, len(msgs) == 1)
_, err = mset.addConsumer(&ConsumerConfig{
Durable: "consumer",
FilterSubjects: []string{},
})
require_NoError(t, err)
msgs, err = consumer.Fetch(1)
require_NoError(t, err)
require_True(t, len(msgs) == 1)
msgs, err = consumer.Fetch(1)
require_NoError(t, err)
require_True(t, len(msgs) == 1)
}
func TestJetStreamConsumerMultipleFiltersRace(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()