mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Allow filter subjects to have wildcards. Fix for https://github.com/nats-io/jetstream/issues/136
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -142,7 +142,7 @@ type Consumer struct {
|
||||
mset *Stream
|
||||
acc *Account
|
||||
name string
|
||||
streamName string
|
||||
stream string
|
||||
sseq uint64
|
||||
dseq uint64
|
||||
adflr uint64
|
||||
@@ -162,6 +162,7 @@ type Consumer struct {
|
||||
store ConsumerStore
|
||||
active bool
|
||||
replay bool
|
||||
filterWC bool
|
||||
dtmr *time.Timer
|
||||
dthresh time.Duration
|
||||
fch chan struct{}
|
||||
@@ -218,9 +219,6 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
|
||||
|
||||
// Make sure any partition subject is also a literal.
|
||||
if config.FilterSubject != "" {
|
||||
if !subjectIsLiteral(config.FilterSubject) {
|
||||
return nil, fmt.Errorf("consumer filter subject has wildcards")
|
||||
}
|
||||
// Make sure this is a valid partition of the interest subjects.
|
||||
if !mset.validSubject(config.FilterSubject) {
|
||||
return nil, fmt.Errorf("consumer filter subject is not a valid subset of the interest subjects")
|
||||
@@ -299,10 +297,15 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
|
||||
o.name = createConsumerName()
|
||||
}
|
||||
|
||||
// Check if we have filtered subject that is a wildcard.
|
||||
if config.FilterSubject != _EMPTY_ && !subjectIsLiteral(config.FilterSubject) {
|
||||
o.filterWC = true
|
||||
}
|
||||
|
||||
// already under lock, mset.Name() would deadlock
|
||||
o.streamName = mset.config.Name
|
||||
o.ackEventT = JetStreamMetricConsumerAckPre + "." + o.streamName + "." + o.name
|
||||
o.deliveryExcEventT = JetStreamAdvisoryConsumerMaxDeliveryExceedPre + "." + o.streamName + "." + o.name
|
||||
o.stream = mset.config.Name
|
||||
o.ackEventT = JetStreamMetricConsumerAckPre + "." + o.stream + "." + o.name
|
||||
o.deliveryExcEventT = JetStreamAdvisoryConsumerMaxDeliveryExceedPre + "." + o.stream + "." + o.name
|
||||
|
||||
store, err := mset.store.ConsumerStore(o.name, config)
|
||||
if err != nil {
|
||||
@@ -620,7 +623,7 @@ func (o *Consumer) Info() *ConsumerInfo {
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
info := &ConsumerInfo{
|
||||
Stream: o.streamName,
|
||||
Stream: o.stream,
|
||||
Name: o.name,
|
||||
Config: o.config,
|
||||
State: ConsumerState{
|
||||
@@ -692,7 +695,7 @@ func (o *Consumer) sampleAck(sseq, dseq, dcount uint64) {
|
||||
Schema: "io.nats.jetstream.metric.v1.consumer_ack",
|
||||
ID: nuid.Next(),
|
||||
Time: now.Format(time.RFC3339Nano),
|
||||
Stream: o.streamName,
|
||||
Stream: o.stream,
|
||||
Consumer: o.name,
|
||||
ConsumerSeq: dseq,
|
||||
StreamSeq: sseq,
|
||||
@@ -828,7 +831,7 @@ func (o *Consumer) notifyDeliveryExceeded(sseq, dcount uint64) {
|
||||
Schema: "io.nats.jetstream.advisory.v1.max_deliver",
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC().Format(time.RFC3339Nano),
|
||||
Stream: o.streamName,
|
||||
Stream: o.stream,
|
||||
Consumer: o.name,
|
||||
StreamSeq: sseq,
|
||||
Deliveries: dcount,
|
||||
@@ -845,6 +848,16 @@ func (o *Consumer) notifyDeliveryExceeded(sseq, dcount uint64) {
|
||||
}
|
||||
}
|
||||
|
||||
// Check to see if the candidate subject matches a filter if its present.
|
||||
func (o *Consumer) isFilteredMatch(subj string) bool {
|
||||
if !o.filterWC {
|
||||
return subj == o.config.FilterSubject
|
||||
}
|
||||
// If we are here we have a wildcard filter subject.
|
||||
// TODO(dlc) at speed might be better to just do a sublist with L2 and/or possibly L1.
|
||||
return subjectIsSubsetMatch(subj, o.config.FilterSubject)
|
||||
}
|
||||
|
||||
// Get next available message from underlying store.
|
||||
// Is partition aware and redeliver aware.
|
||||
// Lock should be held.
|
||||
@@ -872,7 +885,7 @@ func (o *Consumer) getNextMsg() (string, []byte, uint64, uint64, error) {
|
||||
if err == nil {
|
||||
if dcount == 1 { // First delivery.
|
||||
o.sseq++
|
||||
if o.config.FilterSubject != _EMPTY_ && subj != o.config.FilterSubject {
|
||||
if o.config.FilterSubject != _EMPTY_ && !o.isFilteredMatch(subj) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
@@ -981,7 +994,7 @@ func (o *Consumer) processReplay() error {
|
||||
}
|
||||
}
|
||||
// We have a message to deliver here.
|
||||
if err == nil && (partition == _EMPTY_ || subj == partition) {
|
||||
if err == nil && (partition == _EMPTY_ || o.isFilteredMatch(subj)) {
|
||||
// FIXME(dlc) - pull based.
|
||||
if !pullMode {
|
||||
o.deliverMsg(o.dsubj, subj, msg, o.sseq, 1)
|
||||
|
||||
@@ -1059,6 +1059,118 @@ func TestJetStreamWorkQueueSubjectFiltering(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamWildcardSubjectFiltering(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
mconfig *server.StreamConfig
|
||||
}{
|
||||
{"MemoryStore", &server.StreamConfig{Name: "ORDERS", Storage: server.MemoryStorage, Subjects: []string{"orders.*.*"}}},
|
||||
{"FileStore", &server.StreamConfig{Name: "ORDERS", Storage: server.FileStorage, Subjects: []string{"orders.*.*"}}},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
mset, err := s.GlobalAccount().AddStream(c.mconfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error adding stream: %v", err)
|
||||
}
|
||||
defer mset.Delete()
|
||||
|
||||
nc := clientConnectToServer(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
toSend := 100
|
||||
for i := 1; i <= toSend; i++ {
|
||||
subj := fmt.Sprintf("orders.%d.%s", i, "NEW")
|
||||
sendStreamMsg(t, nc, subj, "new order")
|
||||
}
|
||||
// Randomly move 25 to shipped.
|
||||
toShip := 25
|
||||
shipped := make(map[int]bool)
|
||||
for i := 0; i < toShip; {
|
||||
orderId := rand.Intn(toSend-1) + 1
|
||||
if shipped[orderId] {
|
||||
continue
|
||||
}
|
||||
subj := fmt.Sprintf("orders.%d.%s", orderId, "SHIPPED")
|
||||
sendStreamMsg(t, nc, subj, "shipped order")
|
||||
shipped[orderId] = true
|
||||
i++
|
||||
}
|
||||
state := mset.State()
|
||||
if state.Msgs != uint64(toSend+toShip) {
|
||||
t.Fatalf("Expected %d messages, got %d", toSend+toShip, state.Msgs)
|
||||
}
|
||||
|
||||
delivery := nats.NewInbox()
|
||||
sub, _ := nc.SubscribeSync(delivery)
|
||||
defer sub.Unsubscribe()
|
||||
nc.Flush()
|
||||
|
||||
// Get all shipped.
|
||||
o, err := mset.AddConsumer(&server.ConsumerConfig{Delivery: delivery, FilterSubject: "orders.*.SHIPPED", DeliverAll: true})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error with registered interest, got %v", err)
|
||||
}
|
||||
defer o.Delete()
|
||||
|
||||
checkFor(t, time.Second, 25*time.Millisecond, func() error {
|
||||
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != toShip {
|
||||
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toShip)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
for nmsgs, _, _ := sub.Pending(); nmsgs > 0; nmsgs, _, _ = sub.Pending() {
|
||||
sub.NextMsg(time.Second)
|
||||
}
|
||||
if nmsgs, _, _ := sub.Pending(); nmsgs != 0 {
|
||||
t.Fatalf("Expected no pending, got %d", nmsgs)
|
||||
}
|
||||
|
||||
// Get all new
|
||||
o, err = mset.AddConsumer(&server.ConsumerConfig{Delivery: delivery, FilterSubject: "orders.*.NEW", DeliverAll: true})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error with registered interest, got %v", err)
|
||||
}
|
||||
defer o.Delete()
|
||||
|
||||
checkFor(t, time.Second, 25*time.Millisecond, func() error {
|
||||
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != toSend {
|
||||
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
for nmsgs, _, _ := sub.Pending(); nmsgs > 0; nmsgs, _, _ = sub.Pending() {
|
||||
sub.NextMsg(time.Second)
|
||||
}
|
||||
if nmsgs, _, _ := sub.Pending(); nmsgs != 0 {
|
||||
t.Fatalf("Expected no pending, got %d", nmsgs)
|
||||
}
|
||||
|
||||
// Now grab a single orderId that has shipped, so we should have two messages.
|
||||
var orderId int
|
||||
for orderId = range shipped {
|
||||
break
|
||||
}
|
||||
subj := fmt.Sprintf("orders.%d.*", orderId)
|
||||
o, err = mset.AddConsumer(&server.ConsumerConfig{Delivery: delivery, FilterSubject: subj, DeliverAll: true})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error with registered interest, got %v", err)
|
||||
}
|
||||
defer o.Delete()
|
||||
|
||||
checkFor(t, time.Second, 25*time.Millisecond, func() error {
|
||||
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != 2 {
|
||||
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, 2)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamWorkQueueAckAndNext(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
|
||||
Reference in New Issue
Block a user