diff --git a/server/consumer.go b/server/consumer.go index 7d0e6c12..4b9e1ca0 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -337,10 +337,19 @@ func setConsumerConfigDefaults(config *ConsumerConfig, lim *JSLimitOpts, accLim } func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) { - return mset.addConsumerWithAssignment(config, _EMPTY_, nil) + return mset.addConsumerWithAssignment(config, _EMPTY_, nil, false) } -func checkConsumerCfg(config *ConsumerConfig, srvLim *JSLimitOpts, cfg *StreamConfig, acc *Account, accLim *JetStreamAccountLimits) *ApiError { +// Check the consumer config. If we are recovering don't check filter subjects. +func checkConsumerCfg( + config *ConsumerConfig, + srvLim *JSLimitOpts, + cfg *StreamConfig, + acc *Account, + accLim *JetStreamAccountLimits, + isRecovering bool, +) *ApiError { + // Check if we have a BackOff defined that MaxDeliver is within range etc. if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver <= lbo { return NewJSConsumerMaxDeliverBackoffError() @@ -415,7 +424,7 @@ func checkConsumerCfg(config *ConsumerConfig, srvLim *JSLimitOpts, cfg *StreamCo } // As best we can make sure the filtered subject is valid. 
- if config.FilterSubject != _EMPTY_ { + if config.FilterSubject != _EMPTY_ && !isRecovering { subjects, hasExt := allSubjects(cfg, acc) if !validFilteredSubject(config.FilterSubject, subjects) && !hasExt { return NewJSConsumerFilterNotSubsetError() @@ -494,7 +503,7 @@ func checkConsumerCfg(config *ConsumerConfig, srvLim *JSLimitOpts, cfg *StreamCo return nil } -func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment) (*consumer, error) { +func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool) (*consumer, error) { mset.mu.RLock() s, jsa, tierName, cfg, acc := mset.srv, mset.jsa, mset.tier, mset.cfg, mset.acc retention := cfg.Retention @@ -522,7 +531,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri // Make sure we have sane defaults. setConsumerConfigDefaults(config, srvLim, &selectedLimits) - if err := checkConsumerCfg(config, srvLim, &cfg, acc, &selectedLimits); err != nil { + if err := checkConsumerCfg(config, srvLim, &cfg, acc, &selectedLimits, isRecovering); err != nil { return nil, err } @@ -1595,7 +1604,7 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) { sz += len(proposal.data) if sz > maxBatch { node.ProposeDirect(entries) - // We need to re-craete `entries` because there is a reference + // We need to re-create `entries` because there is a reference // to it in the node's pae map. 
sz, entries = 0, nil } diff --git a/server/filestore.go b/server/filestore.go index 06867888..2a65f7a2 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5164,7 +5164,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt if err := os.MkdirAll(odir, defaultDirPerms); err != nil { return nil, fmt.Errorf("could not create consumer directory - %v", err) } - csi := &FileConsumerInfo{ConsumerConfig: *cfg} + csi := &FileConsumerInfo{Name: name, Created: time.Now().UTC(), ConsumerConfig: *cfg} o := &consumerFileStore{ fs: fs, cfg: csi, diff --git a/server/jetstream.go b/server/jetstream.go index 9d7ce4ee..0a71d1d0 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1253,7 +1253,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro // the consumer can reconnect. We will create it as a durable and switch it. cfg.ConsumerConfig.Durable = ofi.Name() } - obs, err := e.mset.addConsumer(&cfg.ConsumerConfig) + obs, err := e.mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true) if err != nil { s.Warnf(" Error adding consumer %q: %v", cfg.Name, err) continue diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 32d71cde..d9327b40 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -350,7 +350,8 @@ func (cc *jetStreamCluster) isCurrent() bool { return cc.meta.Current() } -// isStreamCurrent will determine if this node is a participant for the stream and if its up to date. +// isStreamCurrent will determine if the stream is up to date. +// For R1 it will make sure the stream is present on this server. // Read lock should be held. 
func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool { if cc == nil { @@ -366,12 +367,11 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool { return false } rg := sa.Group - if rg == nil || rg.node == nil { + if rg == nil { return false } - isCurrent := rg.node.Current() - if isCurrent { + if rg.node == nil || rg.node.Current() { // Check if we are processing a snapshot and are catching up. acc, err := cc.s.LookupAccount(account) if err != nil { @@ -384,9 +384,37 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool { if mset.isCatchingUp() { return false } + // Success. + return true } - return isCurrent + return false +} + +// isConsumerCurrent will determine if the consumer is up to date. +// For R1 it will make sure the consumer is present on this server. +// Read lock should be held. +func (cc *jetStreamCluster) isConsumerCurrent(account, stream, consumer string) bool { + if cc == nil { + // Non-clustered mode + return true + } + acc, err := cc.s.LookupAccount(account) + if err != nil { + return false + } + mset, err := acc.lookupStream(stream) + if err != nil { + return false + } + o := mset.lookupConsumer(consumer) + if o == nil { + return false + } + if n := o.raftNode(); n != nil && !n.Current() { + return false + } + return true } func (a *Account) getJetStreamFromAccount() (*Server, *jetStream, *jsAccount) { @@ -3112,7 +3140,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state var didCreate bool if o == nil { // Add in the consumer if needed. 
- o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca) + o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false) didCreate = true } else { if err := o.updateConfig(ca.Config); err != nil { @@ -5268,7 +5296,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec // Make sure we have sane defaults setConsumerConfigDefaults(cfg, srvLim, selectedLimits) - if err := checkConsumerCfg(cfg, srvLim, &streamCfg, acc, selectedLimits); err != nil { + if err := checkConsumerCfg(cfg, srvLim, &streamCfg, acc, selectedLimits, false); err != nil { resp.Error = err s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index c31a910e..521b85d2 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -10471,3 +10471,60 @@ func TestJetStreamClusterMirrorDeDupWindow(t *testing.T) { send(100) check(200) } + +func TestJetStreamClusterNewHealthz(t *testing.T) { + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "R1", + Subjects: []string{"foo"}, + Replicas: 1, + }) + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "R3", + Subjects: []string{"bar"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Create subscribers (durable and ephemeral for each) + fsube, err := js.SubscribeSync("foo") + require_NoError(t, err) + fsubd, err := js.SubscribeSync("foo", nats.Durable("d")) + require_NoError(t, err) + + _, err = js.SubscribeSync("bar") + require_NoError(t, err) + bsubd, err := js.SubscribeSync("bar", nats.Durable("d")) + require_NoError(t, err) + + for i := 0; i < 20; i++ { + _, err = js.Publish("foo", []byte("foo")) + require_NoError(t, err) + } + checkSubsPending(t, fsube, 20) + checkSubsPending(t, fsubd, 20) + + 
// Select the server where we know the R1 stream is running. + sl := c.streamLeader("$G", "R1") + sl.Shutdown() + + // Do same on R3 so that sl has to recover some things before healthz should be good. + c.waitOnStreamLeader("$G", "R3") + + for i := 0; i < 10; i++ { + _, err = js.Publish("bar", []byte("bar")) + require_NoError(t, err) + } + // Ephemeral is skipped, might have been on the downed server. + checkSubsPending(t, bsubd, 10) + + sl = c.restartServer(sl) + c.waitOnServerHealthz(sl) +} diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index cd742a54..0d580f99 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -2419,3 +2419,67 @@ func TestJetStreamSuperClusterStreamAlternates(t *testing.T) { nc, _ = jsClientConnect(t, sc.clusterForName("C3").randomServer()) getStreamInfo(nc, "C3") } + +// We had a scenario where a consumer would not recover properly on restart due to +// the cluster state not being set properly when checking source subjects. +func TestJetStreamSuperClusterStateOnRestartPreventsConsumerRecovery(t *testing.T) { + sc := createJetStreamTaggedSuperCluster(t) + defer sc.shutdown() + + nc, js := jsClientConnect(t, sc.randomServer()) + defer nc.Close() + + // C1 + _, err := js.AddStream(&nats.StreamConfig{ + Name: "SOURCE", + Subjects: []string{"foo", "bar"}, + Replicas: 3, + Placement: &nats.Placement{Tags: []string{"cloud:aws", "country:us"}}, + }) + require_NoError(t, err) + + // C2 + _, err = js.AddStream(&nats.StreamConfig{ + Name: "DS", + Subjects: []string{"baz"}, + Replicas: 3, + Sources: []*nats.StreamSource{{Name: "SOURCE"}}, + Placement: &nats.Placement{Tags: []string{"cloud:gcp", "country:uk"}}, + }) + require_NoError(t, err) + + // Bind to DS and match filter subject of SOURCE. 
+ _, err = js.AddConsumer("DS", &nats.ConsumerConfig{ + Durable: "dlc", + AckPolicy: nats.AckExplicitPolicy, + FilterSubject: "foo", + DeliverSubject: "d", + }) + require_NoError(t, err) + + // Send a few messages. + for i := 0; i < 100; i++ { + _, err := js.Publish("foo", []byte("HELLO")) + require_NoError(t, err) + } + sub := natsSubSync(t, nc, "d") + natsNexMsg(t, sub, time.Second) + + c := sc.clusterForName("C2") + cl := c.consumerLeader("$G", "DS", "dlc") + + // Pull source out from underneath the downstream stream. + err = js.DeleteStream("SOURCE") + require_NoError(t, err) + + cl.Shutdown() + cl = c.restartServer(cl) + c.waitOnServerHealthz(cl) + + // Now make sure the consumer is still on this server and has restarted properly. + mset, err := cl.GlobalAccount().lookupStream("DS") + require_NoError(t, err) + if o := mset.lookupConsumer("dlc"); o == nil { + t.Fatalf("Consumer was not properly restarted") + } +} diff --git a/server/monitor.go b/server/monitor.go index 12d2c83a..ff7ade80 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2869,74 +2869,68 @@ func (s *Server) healthz() *HealthStatus { if err := s.readyForConnections(time.Millisecond); err != nil { health.Status = "error" health.Error = err.Error() - } else if js := s.getJetStream(); js != nil { - // Check JetStream status here. - js.mu.RLock() - clustered, cc := !js.standAlone, js.cluster - js.mu.RUnlock() - if clustered { - // We do more checking for clustered mode to allow for proper rolling updates. - // We will make sure that we have seen the meta leader and that we are current with all assets. - node := js.getMetaGroup() - if node.GroupLeader() == _EMPTY_ { - health.Status = "unavailable" - health.Error = "JetStream has not established contact with a meta leader" - } else if !node.Current() { - health.Status = "unavailable" - health.Error = "JetStream is not current with the meta leader" - } else { - // If we are here we are current and have seen our meta leader. 
- // Now check assets. - var _a [512]*jsAccount - accounts := _a[:0] - js.mu.RLock() - // Collect accounts. - for _, jsa := range js.accounts { - accounts = append(accounts, jsa) - } - js.mu.RUnlock() + return health + } - var streams []*stream - Err: - // Walk our accounts and assets. - for _, jsa := range accounts { - if len(streams) > 0 { - streams = streams[:0] - } - jsa.mu.RLock() - accName := jsa.account.Name - for _, stream := range jsa.streams { - streams = append(streams, stream) - } - jsa.mu.RUnlock() - // Now walk the streams themselves. - js.mu.RLock() - for _, stream := range streams { - // Skip non-replicated. - if stream.cfg.Replicas <= 1 { - continue - } - sname := stream.name() - if !cc.isStreamCurrent(accName, sname) { + // Check JetStream + js := s.getJetStream() + if js == nil { + return health + } + + // Clustered JetStream + js.mu.RLock() + defer js.mu.RUnlock() + + cc := js.cluster + + // Currently in single server mode this is a no-op. + if cc == nil || cc.meta == nil { + return health + } + + // If we are here we want to check for any assets assigned to us. + meta := cc.meta + ourID := meta.ID() + + // If no meta leader. + if meta.GroupLeader() == _EMPTY_ { + health.Status = "unavailable" + health.Error = "JetStream has not established contact with a meta leader" + return health + } + // If we are not current with the meta leader. + if !meta.Current() { + health.Status = "unavailable" + health.Error = "JetStream is not current with the meta leader" + return health + } + + // Range across all accounts, the streams assigned to them, and the consumers. + // If they are assigned to this server check their status. + for acc, asa := range cc.streams { + for stream, sa := range asa { + if sa.Group.isMember(ourID) { + // Make sure we can look up the stream and that it is current. + if !cc.isStreamCurrent(acc, stream) { + health.Status = "unavailable" + health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", acc, stream) + return health + } + // Now check consumers. 
+ for consumer, ca := range sa.consumers { + if ca.Group.isMember(ourID) { + if !cc.isConsumerCurrent(acc, stream, consumer) { health.Status = "unavailable" - health.Error = fmt.Sprintf("JetStream stream %q for account %q is not current", sname, accName) - js.mu.RUnlock() - break Err - } - // Now do consumers. - for _, o := range stream.getConsumers() { - if node := o.raftNode(); node != nil && !node.Current() { - health.Status = "unavailable" - health.Error = fmt.Sprintf("JetStream consumer %q for stream %q and account %q is not current", o.String(), sname, accName) - js.mu.RUnlock() - break Err - } + health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer) + return health } } - js.mu.RUnlock() } } } } + + // Success. return health } diff --git a/server/stream.go b/server/stream.go index 16af9359..464abf0d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1453,20 +1453,23 @@ func allSubjects(cfg *StreamConfig, acc *Account) ([]string, bool) { var seen map[string]bool if cfg.Mirror != nil { - var subjs []string - seen = make(map[string]bool) - subjs, hasExt = acc.streamSourceSubjects(cfg.Mirror, seen) + subjs, localHasExt := acc.streamSourceSubjects(cfg.Mirror, make(map[string]bool)) if len(subjs) > 0 { subjects = append(subjects, subjs...) } + if localHasExt { + hasExt = true + } } else if len(cfg.Sources) > 0 { - var subjs []string seen = make(map[string]bool) for _, si := range cfg.Sources { - subjs, hasExt = acc.streamSourceSubjects(si, seen) + subjs, localHasExt := acc.streamSourceSubjects(si, seen) if len(subjs) > 0 { subjects = append(subjects, subjs...) } + if localHasExt { + hasExt = true + } } } @@ -1566,6 +1569,7 @@ func (a *Account) streamSourceSubjectsNotClustered(streamName string, seen map[s } } } + return subjects, hasExt }