Merge pull request #3537 from nats-io/js_cons_name

[FIXED] JetStream: User-defined ephemeral Name not used in cluster mode
This commit is contained in:
Ivan Kozlovic
2022-10-10 14:01:47 -06:00
committed by GitHub
2 changed files with 81 additions and 10 deletions

View File

@@ -6076,9 +6076,14 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
var ca *consumerAssignment
var oname string
// See if we have an existing one already under same durable name.
if isDurableConsumer(cfg) {
oname = cfg.Durable
// See if we have an existing one already under same durable name or
// if name was set by the user.
if isDurableConsumer(cfg) || cfg.Name != _EMPTY_ {
if cfg.Name != _EMPTY_ {
oname = cfg.Name
} else {
oname = cfg.Durable
}
if ca = sa.consumers[oname]; ca != nil && !ca.deleted {
// Do quick sanity check on new cfg to prevent here if possible.
if err := acc.checkNewConsumerConfig(ca.Config, cfg); err != nil {
@@ -6110,15 +6115,19 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
rg.Peers = []string{rg.Preferred}
rg.Name = groupNameForConsumer(rg.Peers, rg.Storage)
}
// Make sure name is unique.
for {
oname = createConsumerName()
if sa.consumers != nil {
if sa.consumers[oname] != nil {
continue
if cfg.Name != _EMPTY_ {
oname = cfg.Name
} else {
// Make sure name is unique.
for {
oname = createConsumerName()
if sa.consumers != nil {
if sa.consumers[oname] != nil {
continue
}
}
break
}
break
}
}
if len(rg.Peers) > 1 {

View File

@@ -519,3 +519,65 @@ func TestJetStreamClusterNegativeReplicas(t *testing.T) {
t.Run("Standalone", func(t *testing.T) { testBadReplicas(t, s, "TEST1") })
t.Run("Clustered", func(t *testing.T) { testBadReplicas(t, c.randomServer(), "TEST2") })
}
func TestJetStreamClusterUserSelectedConsName(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
test := func(t *testing.T, s *Server, stream string, replicas int, cons string) {
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: stream,
Replicas: replicas,
})
require_NoError(t, err)
cc := &CreateConsumerRequest{
Stream: stream,
Config: ConsumerConfig{
Name: cons,
FilterSubject: stream,
InactiveThreshold: 10 * time.Second,
},
}
subj := fmt.Sprintf(JSApiConsumerCreateExT, stream, cons, stream)
req, err := json.Marshal(cc)
require_NoError(t, err)
reply, err := nc.Request(subj, req, 2*time.Second)
require_NoError(t, err)
var cresp JSApiConsumerCreateResponse
json.Unmarshal(reply.Data, &cresp)
if cresp.Error != nil {
t.Fatalf("Unexpected error: %v", cresp.Error)
}
require_Equal(t, cresp.Name, cons)
require_Equal(t, cresp.Config.Name, cons)
// Resend the add request but before change something that the server
// should reject since the consumer already exist and we don't support
// the update of the consumer that way.
cc.Config.DeliverPolicy = DeliverNew
req, err = json.Marshal(cc)
require_NoError(t, err)
reply, err = nc.Request(subj, req, 2*time.Second)
require_NoError(t, err)
cresp = JSApiConsumerCreateResponse{}
json.Unmarshal(reply.Data, &cresp)
require_Error(t, cresp.Error, NewJSConsumerCreateError(errors.New("deliver policy can not be updated")))
}
t.Run("Standalone", func(t *testing.T) { test(t, s, "TEST", 1, "cons") })
t.Run("Clustered R1", func(t *testing.T) { test(t, c.randomServer(), "TEST2", 1, "cons2") })
t.Run("Clustered R3", func(t *testing.T) { test(t, c.randomServer(), "TEST3", 3, "cons3") })
}