Use new consumer create subject when single subject filter specified in SubjectFilters

Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
Neil Twigg
2023-09-19 23:34:50 +01:00
parent 29ba4aa5dc
commit ad63d702c4
2 changed files with 78 additions and 0 deletions

View File

@@ -21853,3 +21853,78 @@ func TestJetStreamSyncInterval(t *testing.T) {
})
}
}
func TestJetStreamFilteredSubjectUsesNewConsumerCreateSubject(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
nc, _ := jsClientConnect(t, s)
defer nc.Close()
extEndpoint := make(chan *nats.Msg, 1)
normalEndpoint := make(chan *nats.Msg, 1)
_, err := nc.ChanSubscribe(JSApiConsumerCreateEx, extEndpoint)
require_NoError(t, err)
_, err = nc.ChanSubscribe(JSApiConsumerCreate, normalEndpoint)
require_NoError(t, err)
testStreamSource := func(name string, shouldBeExtended bool, ss StreamSource) {
t.Run(name, func(t *testing.T) {
req := StreamConfig{
Name: name,
Storage: MemoryStorage,
Subjects: []string{fmt.Sprintf("foo.%s", name)},
Sources: []*StreamSource{&ss},
}
reqJson, err := json.Marshal(req)
require_NoError(t, err)
_, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, name), reqJson, time.Second)
require_NoError(t, err)
select {
case <-time.After(time.Second * 5):
t.Fatalf("Timed out waiting for receive consumer create")
case <-extEndpoint:
if !shouldBeExtended {
t.Fatalf("Expected normal consumer create, got extended")
}
case <-normalEndpoint:
if shouldBeExtended {
t.Fatalf("Expected extended consumer create, got normal")
}
}
})
}
testStreamSource("OneFilterSubject", true, StreamSource{
Name: "source",
FilterSubject: "bar.>",
})
testStreamSource("OneTransform", true, StreamSource{
Name: "source",
SubjectTransforms: []SubjectTransformConfig{
{
Source: "bar.one.>",
Destination: "bar.two.>",
},
},
})
testStreamSource("TwoTransforms", false, StreamSource{
Name: "source",
SubjectTransforms: []SubjectTransformConfig{
{
Source: "bar.one.>",
Destination: "bar.two.>",
},
{
Source: "baz.one.>",
Destination: "baz.two.>",
},
},
})
}

View File

@@ -2893,6 +2893,9 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
if req.Config.FilterSubject != _EMPTY_ {
req.Config.Name = fmt.Sprintf("src-%s", createConsumerName())
subject = fmt.Sprintf(JSApiConsumerCreateExT, si.name, req.Config.Name, req.Config.FilterSubject)
} else if len(req.Config.FilterSubjects) == 1 {
req.Config.Name = fmt.Sprintf("src-%s", createConsumerName())
subject = fmt.Sprintf(JSApiConsumerCreateExT, si.name, req.Config.Name, req.Config.FilterSubjects[0])
} else {
subject = fmt.Sprintf(JSApiConsumerCreateT, si.name)
}