mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Fix bug in needAck
needAck has reverse logic for checking single subject-single filter scenario. Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
This commit is contained in:
@@ -2569,9 +2569,9 @@ func (o *consumer) isFiltered() bool {
|
||||
// `isFiltered` need to be performant, so we do
|
||||
// as any checks as possible to avoid unnecessary work.
|
||||
// Here we avoid iteration over slices if there is only one subject in stream
|
||||
// and one equal filter for the consumer.
|
||||
if len(mset.cfg.Subjects) == 1 && len(o.subjf) == 1 && mset.cfg.Subjects[0] == o.subjf[0].subject {
|
||||
return true
|
||||
// and one filter for the consumer.
|
||||
if len(mset.cfg.Subjects) == 1 && len(o.subjf) == 1 {
|
||||
return mset.cfg.Subjects[0] != o.subjf[0].subject
|
||||
}
|
||||
|
||||
// if the list is not equal length, we can return early, as this is filtered.
|
||||
|
||||
@@ -20573,3 +20573,64 @@ func TestJetStreamConsumerAckFloorWithExpired(t *testing.T) {
|
||||
require_True(t, ci.NumPending == 0)
|
||||
require_True(t, ci.NumRedelivered == 0)
|
||||
}
|
||||
|
||||
func TestJetStreamConsumerIsFiltered(t *testing.T) {
|
||||
s := RunBasicJetStreamServer(t)
|
||||
defer s.Shutdown()
|
||||
acc := s.GlobalAccount()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
streamSubjects []string
|
||||
filters []string
|
||||
isFiltered bool
|
||||
}{
|
||||
{
|
||||
name: "single_subject",
|
||||
streamSubjects: []string{"one"},
|
||||
filters: []string{"one"},
|
||||
isFiltered: false,
|
||||
},
|
||||
{
|
||||
name: "single_subject_filtered",
|
||||
streamSubjects: []string{"one.>"},
|
||||
filters: []string{"one.filter"},
|
||||
isFiltered: true,
|
||||
},
|
||||
{
|
||||
name: "multi_subject_non_filtered",
|
||||
streamSubjects: []string{"multi", "foo", "bar.>"},
|
||||
filters: []string{"multi", "bar.>", "foo"},
|
||||
isFiltered: false,
|
||||
},
|
||||
{
|
||||
name: "multi_subject_filtered_wc",
|
||||
streamSubjects: []string{"events", "data"},
|
||||
filters: []string{"data"},
|
||||
isFiltered: true,
|
||||
},
|
||||
{
|
||||
name: "multi_subject_filtered",
|
||||
streamSubjects: []string{"machines", "floors"},
|
||||
filters: []string{"machines"},
|
||||
isFiltered: true,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
mset, err := acc.addStream(&StreamConfig{
|
||||
Name: test.name,
|
||||
Subjects: test.streamSubjects,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
o, err := mset.addConsumer(&ConsumerConfig{
|
||||
FilterSubjects: test.filters,
|
||||
Durable: test.name,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
require_True(t, o.isFiltered() == test.isFiltered)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user