From 5ead954feeb68d4cd36ea66115b91a14ec6ad45a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 4 Nov 2021 07:42:19 -0700 Subject: [PATCH] [ADDED] Allow certain consumer attributes to be updated #2670, #2603 Signed-off-by: Derek Collison --- server/consumer.go | 206 ++++++++++++++++++++++++------- server/dirstore_test.go | 67 ---------- server/jetstream_cluster.go | 199 +++++++++++++++++------------ server/jetstream_cluster_test.go | 156 +++++++++++++++++++++-- server/test_test.go | 68 ++++++++++ 5 files changed, 499 insertions(+), 197 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 1d3ee4d1..5618e924 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -382,28 +382,36 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } } + // Helper function to formulate similar errors. + badStart := func(dp, start string) error { + return fmt.Errorf("consumer delivery policy is deliver %s, but optional start %s is also set", dp, start) + } + notSet := func(dp, notSet string) error { + return fmt.Errorf("consumer delivery policy is deliver %s, but optional %s is not set", dp, notSet) + } + // Check on start position conflicts. switch config.DeliverPolicy { case DeliverAll: if config.OptStartSeq > 0 { - return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver all, but optional start sequence is also set")) + return nil, NewJSConsumerInvalidPolicyError(badStart("all", "sequence")) } if config.OptStartTime != nil { - return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver all, but optional start time is also set")) + return nil, NewJSConsumerInvalidPolicyError(badStart("all", "time")) } case DeliverLast: if config.OptStartSeq > 0 { - return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver last, but optional start sequence is also set")) + return nil, NewJSConsumerInvalidPolicyError(badStart("last", "sequence")) } if config.OptStartTime != nil { - return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver last, but optional start time is also set")) + return nil, NewJSConsumerInvalidPolicyError(badStart("last", "time")) } case DeliverLastPerSubject: if config.OptStartSeq > 0 { - return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver last per subject, but optional start sequence is also set")) + return nil, NewJSConsumerInvalidPolicyError(badStart("last per subject", "sequence")) } if config.OptStartTime != nil { - return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver last per subject, but optional start time is also set")) + return nil, NewJSConsumerInvalidPolicyError(badStart("last per subject", "time")) } badConfig := config.FilterSubject == _EMPTY_ if !badConfig { @@ -413,28 +421,28 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } } if badConfig { - return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver last per subject, but filter subject is not set")) + return nil, NewJSConsumerInvalidPolicyError(notSet("deliver last per subject", "filter subject")) } case DeliverNew: if config.OptStartSeq > 0 { - return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver new, but optional start sequence is also set")) + return nil, NewJSConsumerInvalidPolicyError(badStart("new", "sequence")) } if config.OptStartTime != nil { - return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver new, but optional start time is also set")) + return nil, NewJSConsumerInvalidPolicyError(badStart("new", "time")) } case DeliverByStartSequence: if config.OptStartSeq == 0 { - return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver by start sequence, but optional start sequence is not set")) + return nil, NewJSConsumerInvalidPolicyError(notSet("deliver by start sequence", "start sequence")) } if config.OptStartTime != nil { - return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver by start sequence, but optional start time is also set")) + return nil, NewJSConsumerInvalidPolicyError(badStart("deliver by start sequence", "time")) } case DeliverByStartTime: if config.OptStartTime == nil { - return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver by start time, but optional start time is not set")) + return nil, NewJSConsumerInvalidPolicyError(notSet("deliver by start time", "start time")) } if config.OptStartSeq != 0 { - return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver by start time, but optional start sequence is also set")) + return nil, NewJSConsumerInvalidPolicyError(badStart("deliver by start time", "start sequence")) } } @@ -463,23 +471,15 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri return nil, errors.New("invalid stream") } - // If this one is durable and already exists, we let that be ok as long as the configs match. + // If this one is durable and already exists, we let that be ok as long as only updating what should be allowed. if isDurableConsumer(config) { if eo, ok := mset.consumers[config.Durable]; ok { mset.mu.Unlock() - ocfg := eo.config() - if reflect.DeepEqual(&ocfg, config) { + err := eo.updateConfig(config) + if err == nil { return eo, nil - } else { - // If we are a push mode and not active and the only difference - // is deliver subject then update and return. - if configsEqualSansDelivery(ocfg, *config) && eo.hasNoLocalInterest() { - eo.updateDeliverSubject(config.DeliverSubject) - return eo, nil - } else { - return nil, NewJSConsumerNameExistError() - } } + return nil, NewJSConsumerCreateError(err, Unless(err)) } } @@ -566,24 +566,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } } - // Check if we have a rate limit set. - if config.RateLimit != 0 { - // TODO(dlc) - Make sane values or error if not sane? - // We are configured in bits per sec so adjust to bytes. - rl := rate.Limit(config.RateLimit / 8) - // Burst should be set to maximum msg size for this account, etc. - var burst int - if mset.cfg.MaxMsgSize > 0 { - burst = int(mset.cfg.MaxMsgSize) - } else if mset.jsa.account.limits.mpay > 0 { - burst = int(mset.jsa.account.limits.mpay) - } else { - s := mset.jsa.account.srv - burst = int(s.getOpts().MaxPayload) - } - o.rlimit = rate.NewLimiter(rl, burst) - } - // Check if we have filtered subject that is a wildcard. if config.FilterSubject != _EMPTY_ && subjectHasWildcard(config.FilterSubject) { o.filterWC = true @@ -667,6 +649,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri o.setConsumerAssignment(ca) } + // Check if we have a rate limit set. + if config.RateLimit != 0 { + o.setRateLimit(config.RateLimit) + } + mset.setConsumer(o) mset.mu.Unlock() @@ -1138,13 +1125,146 @@ func (o *consumer) forceExpirePending() { o.signalNewMessages() } +// Acquire proper locks and update rate limit. +// Will use what is in config. +func (o *consumer) setRateLimitNeedsLocks() { + o.mu.RLock() + mset := o.mset + o.mu.RUnlock() + + if mset == nil { + return + } + + mset.mu.RLock() + o.mu.Lock() + o.setRateLimit(o.cfg.RateLimit) + o.mu.Unlock() + mset.mu.RUnlock() +} + +// Set the rate limiter +// Both mset and consumer lock should be held. +func (o *consumer) setRateLimit(bps uint64) { + if bps == 0 { + o.rlimit = nil + return + } + + // TODO(dlc) - Make sane values or error if not sane? + // We are configured in bits per sec so adjust to bytes. + rl := rate.Limit(bps / 8) + mset := o.mset + + // Burst should be set to maximum msg size for this account, etc. + var burst int + if mset.cfg.MaxMsgSize > 0 { + burst = int(mset.cfg.MaxMsgSize) + } else if mset.jsa.account.limits.mpay > 0 { + burst = int(mset.jsa.account.limits.mpay) + } else { + s := mset.jsa.account.srv + burst = int(s.getOpts().MaxPayload) + } + + o.rlimit = rate.NewLimiter(rl, burst) +} + +// Check if new consumer config allowed vs old. +func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error { + if reflect.DeepEqual(cfg, ncfg) { + return nil + } + // Something different, so check since we only allow certain things to be updated. + if cfg.FilterSubject != ncfg.FilterSubject { + return errors.New("filter subject can not be updated") + } + if cfg.DeliverPolicy != ncfg.DeliverPolicy { + return errors.New("deliver policy can not be updated") + } + if cfg.OptStartSeq != ncfg.OptStartSeq { + return errors.New("start sequence can not be updated") + } + if cfg.OptStartTime != ncfg.OptStartTime { + return errors.New("start time can not be updated") + } + if cfg.AckPolicy != ncfg.AckPolicy { + return errors.New("ack policy can not be updated") + } + if cfg.ReplayPolicy != ncfg.ReplayPolicy { + return errors.New("replay policy can not be updated") + } + if cfg.Heartbeat != ncfg.Heartbeat { + return errors.New("heart beats can not be updated") + } + if cfg.FlowControl != ncfg.FlowControl { + return errors.New("flow control can not be updated") + } + + // Deliver Subject is conditional on if its bound. + if cfg.DeliverSubject != ncfg.DeliverSubject { + if cfg.DeliverSubject == _EMPTY_ { + return errors.New("can not update pull consumer to push based") + } + rr := acc.sl.Match(cfg.DeliverSubject) + if len(rr.psubs)+len(rr.qsubs) != 0 { + return NewJSConsumerNameExistError() + } + } + + return nil +} + +// Update the config based on the new config, or error if update not allowed. +func (o *consumer) updateConfig(cfg *ConsumerConfig) error { + o.mu.Lock() + defer o.mu.Unlock() + + if err := o.acc.checkNewConsumerConfig(&o.cfg, cfg); err != nil { + return err + } + + // DeliverSubject + if cfg.DeliverSubject != o.cfg.DeliverSubject { + o.updateDeliverSubjectLocked(cfg.DeliverSubject) + } + + // MaxAckPending + if cfg.MaxAckPending != o.cfg.MaxAckPending { + o.maxp = cfg.MaxAckPending + o.signalNewMessages() + } + // AckWait + if cfg.AckWait != o.cfg.AckWait { + if o.ptmr != nil { + o.ptmr.Reset(100 * time.Millisecond) + } + } + // Rate Limit + if cfg.RateLimit != o.cfg.RateLimit { + // We need both locks here so do in Go routine. + go o.setRateLimitNeedsLocks() + } + + // Record new config for others that do not need special handling. + // Allowed but considered no-op, [Description, MaxDeliver, SampleFrequency, MaxWaiting, HeadersOnly] + o.cfg = *cfg + + return nil +} + // This is a config change for the delivery subject for a // push based consumer. func (o *consumer) updateDeliverSubject(newDeliver string) { // Update the config and the dsubj o.mu.Lock() defer o.mu.Unlock() + o.updateDeliverSubjectLocked(newDeliver) +} +// This is a config change for the delivery subject for a +// push based consumer. +func (o *consumer) updateDeliverSubjectLocked(newDeliver string) { if o.closed || o.isPullMode() || o.cfg.DeliverSubject == newDeliver { return } diff --git a/server/dirstore_test.go b/server/dirstore_test.go index a2d6c306..6c4171cb 100644 --- a/server/dirstore_test.go +++ b/server/dirstore_test.go @@ -30,73 +30,6 @@ import ( "github.com/nats-io/nkeys" ) -func require_True(t *testing.T, b bool) { - t.Helper() - if !b { - t.Fatalf("require true, but got false") - } -} - -func require_False(t *testing.T, b bool) { - t.Helper() - if b { - t.Fatalf("require no false, but got true") - } -} - -func require_NoError(t testing.TB, err error) { - t.Helper() - if err != nil { - t.Fatalf("require no error, but got: %v", err) - } -} - -func require_Contains(t *testing.T, s string, subStrs ...string) { - t.Helper() - for _, subStr := range subStrs { - if !strings.Contains(s, subStr) { - t.Fatalf("require %q to be contained in %q", subStr, s) - } - } -} - -func require_Error(t *testing.T, err error, expected ...error) { - t.Helper() - if err == nil { - t.Fatalf("require error, but got none") - } - if len(expected) == 0 { - return - } - for _, e := range expected { - if err == e || strings.Contains(e.Error(), err.Error()) { - return - } - } - t.Fatalf("Expected one of %+v, got '%v'", expected, err) -} - -func require_Equal(t *testing.T, a, b string) { - t.Helper() - if strings.Compare(a, b) != 0 { - t.Fatalf("require equal, but got: %v != %v", a, b) - } -} - -func require_NotEqual(t *testing.T, a, b [32]byte) { - t.Helper() - if bytes.Equal(a[:], b[:]) { - t.Fatalf("require not equal, but got: %v != %v", a, b) - } -} - -func require_Len(t *testing.T, a, b int) { - t.Helper() - if a != b { - t.Fatalf("require len, but got: %v != %v", a, b) - } -} - func TestShardedDirStoreWriteAndReadonly(t *testing.T) { t.Parallel() dir := createDir(t, "jwtstore_test") diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 734655a1..4939af57 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1696,7 +1696,7 @@ func (js *jetStream) restartClustered(acc *Account, sa *streamAssignment) { js.mu.Unlock() for _, ca := range consumers { - js.processClusterCreateConsumer(ca, nil) + js.processClusterCreateConsumer(ca, nil, false) } } @@ -2544,7 +2544,6 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { ourID = cc.meta.ID() } var isMember bool - if ca.Group != nil && ourID != _EMPTY_ { isMember = ca.Group.isMember(ourID) } @@ -2579,15 +2578,15 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { return } + // Track if this existed already. + var wasExisting bool + // Check if we have an existing consumer assignment. js.mu.Lock() if sa.consumers == nil { sa.consumers = make(map[string]*consumerAssignment) - } else if oca := sa.consumers[ca.Name]; oca != nil && !oca.pending { - // Copy over private existing state from former CA. - ca.Group.node = oca.Group.node - ca.responded = oca.responded - ca.err = oca.err + } else if oca := sa.consumers[ca.Name]; oca != nil { + wasExisting = true } // Capture the optional state. We will pass it along if we are a member to apply. @@ -2602,7 +2601,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // Check if this is for us.. if isMember { - js.processClusterCreateConsumer(ca, state) + js.processClusterCreateConsumer(ca, state, wasExisting) } else { // Check if we have a raft node running, meaning we are no longer part of the group but were. js.mu.Lock() @@ -2649,7 +2648,7 @@ type consumerAssignmentResult struct { } // processClusterCreateConsumer is when we are a member of the group and need to create the consumer. -func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state *ConsumerState) { +func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state *ConsumerState, wasExisting bool) { if ca == nil { return } @@ -2683,38 +2682,47 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state return } - // Process the raft group and make sure its running if needed. - js.createRaftGroup(rg, mset.config().Storage) - - // Check if we already have this consumer running. - o := mset.lookupConsumer(ca.Name) - if o != nil { - if o.isDurable() && o.isPushMode() { - ocfg := o.config() - if ocfg == *ca.Config || (configsEqualSansDelivery(ocfg, *ca.Config) && o.hasNoLocalInterest()) { - o.updateDeliverSubject(ca.Config.DeliverSubject) - } else { - // This is essentially and update that has failed. - js.mu.Lock() - result := &consumerAssignmentResult{ - Account: ca.Client.serviceAccount(), - Stream: ca.Stream, - Consumer: ca.Name, - Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, - } - result.Response.Error = NewJSConsumerNameExistError() - s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result) - js.mu.Unlock() - return - } - } - o.setConsumerAssignment(ca) - s.Debugf("JetStream cluster, consumer was already running") + if !alreadyRunning { + // Process the raft group and make sure its running if needed. + js.createRaftGroup(rg, mset.config().Storage) } - // Add in the consumer if needed. + // Check if we already have this consumer running. + var didCreate bool + o := mset.lookupConsumer(ca.Name) if o == nil { + // Add in the consumer if needed. o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca) + didCreate = true + } else { + if err := o.updateConfig(ca.Config); err != nil { + // This is essentially an update that has failed. + js.mu.Lock() + result := &consumerAssignmentResult{ + Account: ca.Client.serviceAccount(), + Stream: ca.Stream, + Consumer: ca.Name, + Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, + } + result.Response.Error = NewJSConsumerNameExistError() + s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result) + js.mu.Unlock() + return + } + // Check if we already had a consumer assignment and its still pending. + cca, oca := ca, o.consumerAssignment() + js.mu.Lock() + if oca != nil && !oca.responded { + // We can't over ride info for replying here otherwise leader once elected can not respond. + // So just update Config, leave off client and reply to the originals. + cac := *oca + cac.Config = ca.Config + cca = &cac + } + js.mu.Unlock() + // Set CA for our consumer. + o.setConsumerAssignment(cca) + s.Debugf("JetStream cluster, consumer was already running") } // If we have an initial state set apply that now. @@ -2763,15 +2771,27 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, b) } } else { - o.setCreatedTime(ca.Created) + if didCreate { + o.setCreatedTime(ca.Created) + } // Start our monitoring routine. - if rg.node != nil { + if rg.node == nil { + // Single replica consumer, process manually here. + js.processConsumerLeaderChange(o, true) + } else { if !alreadyRunning { s.startGoRoutine(func() { js.monitorConsumer(o, ca) }) } - } else { - // Single replica consumer, process manually here. - js.processConsumerLeaderChange(o, true) + // Process if existing. + if wasExisting && (o.isLeader() || (!didCreate && rg.node.GroupLeader() == _EMPTY_)) { + // This is essentially an update, so make sure to respond if needed. + js.mu.RLock() + client, subject, reply := ca.Client, ca.Subject, ca.Reply + js.mu.RUnlock() + var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} + resp.ConsumerInfo = o.info() + s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) + } } } } @@ -4266,52 +4286,71 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec cfg.MaxAckPending = JsDefaultMaxAckPending } - rg := cc.createGroupForConsumer(sa) - if rg == nil { - resp.Error = NewJSInsufficientResourcesError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) - return - } - // Pick a preferred leader. - rg.setPreferred() - - // We need to set the ephemeral here before replicating. + var ca *consumerAssignment var oname string - if !isDurableConsumer(cfg) { - // We chose to have ephemerals be R=1 unless stream is interest or workqueue. - if sa.Config.Retention == LimitsPolicy { - rg.Peers = []string{rg.Preferred} - rg.Name = groupNameForConsumer(rg.Peers, rg.Storage) - } - // Make sure name is unique. - for { - oname = createConsumerName() - if sa.consumers != nil { - if sa.consumers[oname] != nil { - continue - } - } - break - } - } else { + + // See if we have an existing one already under same durable name. + if isDurableConsumer(cfg) { oname = cfg.Durable - if ca := sa.consumers[oname]; ca != nil && !ca.deleted { - isPull := ca.Config.DeliverSubject == _EMPTY_ - // This can be ok if delivery subject update. - shouldErr := isPull || ca.pending || (!reflect.DeepEqual(cfg, ca.Config) && !configsEqualSansDelivery(*cfg, *ca.Config)) - if !shouldErr { - rr := acc.sl.Match(ca.Config.DeliverSubject) - shouldErr = len(rr.psubs)+len(rr.qsubs) != 0 - } - if shouldErr { - resp.Error = NewJSConsumerNameExistError() + if ca = sa.consumers[oname]; ca != nil && !ca.deleted { + // Do quick sanity check on new cfg to prevent here if possible. + if err := acc.checkNewConsumerConfig(ca.Config, cfg); err != nil { + resp.Error = NewJSConsumerCreateError(err, Unless(err)) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } } } - ca := &consumerAssignment{Group: rg, Stream: stream, Name: oname, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()} + // If this is new consumer. + if ca == nil { + rg := cc.createGroupForConsumer(sa) + if rg == nil { + resp.Error = NewJSInsufficientResourcesError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return + } + // Pick a preferred leader. + rg.setPreferred() + + // We need to set the ephemeral here before replicating. + if !isDurableConsumer(cfg) { + // We chose to have ephemerals be R=1 unless stream is interest or workqueue. + if sa.Config.Retention == LimitsPolicy { + rg.Peers = []string{rg.Preferred} + rg.Name = groupNameForConsumer(rg.Peers, rg.Storage) + } + // Make sure name is unique. + for { + oname = createConsumerName() + if sa.consumers != nil { + if sa.consumers[oname] != nil { + continue + } + } + break + } + } + ca = &consumerAssignment{ + Group: rg, + Stream: stream, + Name: oname, + Config: cfg, + Subject: subject, + Reply: reply, + Client: ci, + Created: time.Now().UTC(), + } + } else { + // Update config and client info on copy of existing. + nca := *ca + nca.Config = cfg + nca.Client = ci + nca.Subject = subject + nca.Reply = reply + ca = &nca + } + eca := encodeAddConsumerAssignment(ca) // Mark this as pending. diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 78b6b572..13477264 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -1426,9 +1426,9 @@ func TestJetStreamClusterDoubleAdd(t *testing.T) { if _, err := js.AddConsumer("TEST", cfg); err != nil { t.Fatalf("Unexpected error: %v", err) } - // Check double add fails. - if _, err := js.AddConsumer("TEST", cfg); err == nil || err == nats.ErrTimeout { - t.Fatalf("Expected error but got none or timeout") + // Check double add ok. + if _, err := js.AddConsumer("TEST", cfg); err != nil { + t.Fatalf("Expected no error but got: %v", err) } } @@ -8437,7 +8437,7 @@ func TestJetStreamRaceOnRAFTCreate(t *testing.T) { t.Fatalf("Error creating stream: %v", err) } - js, err = nc.JetStream(nats.MaxWait(time.Second)) + js, err = nc.JetStream(nats.MaxWait(2 * time.Second)) if err != nil { t.Fatal(err) } @@ -8446,12 +8446,12 @@ func TestJetStreamRaceOnRAFTCreate(t *testing.T) { wg := sync.WaitGroup{} wg.Add(size) for i := 0; i < size; i++ { - go func() { + go func(i int) { defer wg.Done() if _, err := js.PullSubscribe("foo", "shared"); err != nil { - t.Errorf("Unexpected error: %v", err) + t.Errorf("Unexpected error on %v: %v", i, err) } - }() + }(i) } wg.Wait() } @@ -9440,6 +9440,148 @@ func TestJetStreamClusterAccountInfoForSystemAccount(t *testing.T) { } } +func TestJetStreamConsumerUpdates(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + + c := createJetStreamClusterExplicit(t, "JSC", 5) + defer c.shutdown() + + testConsumerUpdate := func(t *testing.T, s *Server, replicas int) { + nc, js := jsClientConnect(t, s) + defer nc.Close() + // Create a stream. + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + Replicas: replicas, + }) + require_NoError(t, err) + + for i := 0; i < 100; i++ { + js.PublishAsync("foo", []byte("OK")) + } + + cfg := &nats.ConsumerConfig{ + Durable: "dlc", + Description: "Update TEST", + FilterSubject: "foo", + DeliverSubject: "d.foo", + AckPolicy: nats.AckExplicitPolicy, + AckWait: time.Minute, + MaxDeliver: 5, + MaxAckPending: 50, + } + + _, err = js.AddConsumer("TEST", cfg) + require_NoError(t, err) + + // Update delivery subject, which worked before, but upon review had issues unless replica count == clustered size. + cfg.DeliverSubject = "d.bar" + _, err = js.AddConsumer("TEST", cfg) + require_NoError(t, err) + + // Bind deliver subject. + sub, err := nc.SubscribeSync("d.bar") + require_NoError(t, err) + defer sub.Unsubscribe() + + ncfg := *cfg + ncfg.DeliverSubject = "d.baz" + + // Should fail. + _, err = js.AddConsumer("TEST", &ncfg) + require_Error(t, err) + + // Description + cfg.Description = "New Description" + _, err = js.AddConsumer("TEST", cfg) + require_NoError(t, err) + + // MaxAckPending + checkSubsPending(t, sub, 50) + cfg.MaxAckPending = 75 + _, err = js.AddConsumer("TEST", cfg) + require_NoError(t, err) + checkSubsPending(t, sub, 75) + + // Drain sub, do not ack first ten though so we can test shortening AckWait. + for i := 0; i < 100; i++ { + m, err := sub.NextMsg(time.Second) + require_NoError(t, err) + if i >= 10 { + m.Ack() + } + } + + // AckWait + checkSubsPending(t, sub, 0) + cfg.AckWait = 200 * time.Millisecond + _, err = js.AddConsumer("TEST", cfg) + require_NoError(t, err) + checkSubsPending(t, sub, 10) + + // Rate Limit + cfg.RateLimit = 8 * 1024 + _, err = js.AddConsumer("TEST", cfg) + require_NoError(t, err) + + cfg.RateLimit = 0 + _, err = js.AddConsumer("TEST", cfg) + require_NoError(t, err) + + // These all should fail. + ncfg = *cfg + ncfg.FilterSubject = "bar" + _, err = js.AddConsumer("TEST", &ncfg) + require_Error(t, err) + + ncfg = *cfg + ncfg.DeliverPolicy = nats.DeliverLastPolicy + _, err = js.AddConsumer("TEST", &ncfg) + require_Error(t, err) + + ncfg = *cfg + ncfg.OptStartSeq = 22 + _, err = js.AddConsumer("TEST", &ncfg) + require_Error(t, err) + + ncfg = *cfg + now := time.Now() + ncfg.OptStartTime = &now + _, err = js.AddConsumer("TEST", &ncfg) + require_Error(t, err) + + ncfg = *cfg + ncfg.AckPolicy = nats.AckAllPolicy + _, err = js.AddConsumer("TEST", &ncfg) + require_Error(t, err) + + ncfg = *cfg + ncfg.ReplayPolicy = nats.ReplayOriginalPolicy + _, err = js.AddConsumer("TEST", &ncfg) + require_Error(t, err) + + ncfg = *cfg + ncfg.Heartbeat = time.Second + _, err = js.AddConsumer("TEST", &ncfg) + require_Error(t, err) + + ncfg = *cfg + ncfg.FlowControl = true + _, err = js.AddConsumer("TEST", &ncfg) + require_Error(t, err) + + } + + t.Run("Single", func(t *testing.T) { testConsumerUpdate(t, s, 1) }) + t.Run("Clustered", func(t *testing.T) { testConsumerUpdate(t, c.randomServer(), 2) }) +} + // Support functions // Used to setup superclusters for tests. diff --git a/server/test_test.go b/server/test_test.go index 116079c3..29cc0ed2 100644 --- a/server/test_test.go +++ b/server/test_test.go @@ -14,6 +14,7 @@ package server import ( + "bytes" "fmt" "math/rand" "net/url" @@ -59,6 +60,73 @@ type cluster struct { t *testing.T } +func require_True(t *testing.T, b bool) { + t.Helper() + if !b { + t.Fatalf("require true, but got false") + } +} + +func require_False(t *testing.T, b bool) { + t.Helper() + if b { + t.Fatalf("require no false, but got true") + } +} + +func require_NoError(t testing.TB, err error) { + t.Helper() + if err != nil { + t.Fatalf("require no error, but got: %v", err) + } +} + +func require_Contains(t *testing.T, s string, subStrs ...string) { + t.Helper() + for _, subStr := range subStrs { + if !strings.Contains(s, subStr) { + t.Fatalf("require %q to be contained in %q", subStr, s) + } + } +} + +func require_Error(t *testing.T, err error, expected ...error) { + t.Helper() + if err == nil { + t.Fatalf("require error, but got none") + } + if len(expected) == 0 { + return + } + for _, e := range expected { + if err == e || strings.Contains(e.Error(), err.Error()) { + return + } + } + t.Fatalf("Expected one of %+v, got '%v'", expected, err) +} + +func require_Equal(t *testing.T, a, b string) { + t.Helper() + if strings.Compare(a, b) != 0 { + t.Fatalf("require equal, but got: %v != %v", a, b) + } +} + +func require_NotEqual(t *testing.T, a, b [32]byte) { + t.Helper() + if bytes.Equal(a[:], b[:]) { + t.Fatalf("require not equal, but got: %v != %v", a, b) + } +} + +func require_Len(t *testing.T, a, b int) { + t.Helper() + if a != b { + t.Fatalf("require len, but got: %v != %v", a, b) + } +} + func checkNatsError(t *testing.T, e *ApiError, id ErrorIdentifier) { t.Helper() ae, ok := ApiErrors[id]