diff --git a/.github/ISSUE_TEMPLATE/blank_issue.md b/.github/ISSUE_TEMPLATE/blank_issue.md deleted file mode 100644 index 38ba9463..00000000 --- a/.github/ISSUE_TEMPLATE/blank_issue.md +++ /dev/null @@ -1,4 +0,0 @@ ---- -name: Blank Issue -about: Create an issue with a blank template. ---- diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml index 989f7213..ad105397 100644 --- a/.github/ISSUE_TEMPLATE/config.yml +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -1,5 +1,8 @@ -blank_issues_enabled: true +blank_issues_enabled: false contact_links: - - name: NATS Slack + - name: Discussion + url: https://github.com/nats-io/nats-server/discussions + about: Ideal for ideas, feedback, or longer form questions. + - name: Chat url: https://slack.nats.io - about: Please ask and answer questions in our Slack server here. + about: Ideal for short, one-off questions, general conversation, and meeting other NATS users! diff --git a/.github/ISSUE_TEMPLATE/defect.md b/.github/ISSUE_TEMPLATE/defect.md deleted file mode 100644 index 1367c509..00000000 --- a/.github/ISSUE_TEMPLATE/defect.md +++ /dev/null @@ -1,23 +0,0 @@ ---- -name: Defect Report -about: Report a bug found in the NATS Server -labels: 🐞 bug ---- - -## Defect - -Make sure that these boxes are checked before submitting your issue -- thank you! - - - [ ] Included `nats-server -DV` output - - [ ] Included a [Minimal, Complete, and Verifiable example] (https://stackoverflow.com/help/mcve) - -#### Versions of `nats-server` and affected client libraries used: - -#### OS/Container environment: - -#### Steps or code to reproduce the issue: - -#### Expected result: - -#### Actual result: - diff --git a/.github/ISSUE_TEMPLATE/defect.yml b/.github/ISSUE_TEMPLATE/defect.yml new file mode 100644 index 00000000..99b4800a --- /dev/null +++ b/.github/ISSUE_TEMPLATE/defect.yml @@ -0,0 +1,41 @@ +--- +name: Defect +description: Report a defect, such as a bug or regression. 
+labels: + - defect +body: + - type: textarea + id: versions + attributes: + label: What version were you using? + description: Include the server version (`nats-server --version`) and any client versions when observing the issue. + validations: + required: true + - type: textarea + id: environment + attributes: + label: What environment was the server running in? + description: This pertains to the operating system, CPU architecture, and/or Docker image that was used. + validations: + required: true + - type: textarea + id: steps + attributes: + label: Is this defect reproducible? + description: Provide best-effort steps to showcase the defect. + validations: + required: true + - type: textarea + id: expected + attributes: + label: Given the capability you are leveraging, describe your expectation? + description: This may be the expected behavior or performance characteristics. + validations: + required: true + - type: textarea + id: actual + attributes: + label: Given the expectation, what is the defect you are observing? + description: This may be an unexpected behavior or regression in performance. + validations: + required: true diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md deleted file mode 100644 index 51e54c6c..00000000 --- a/.github/ISSUE_TEMPLATE/feature_request.md +++ /dev/null @@ -1,16 +0,0 @@ ---- -name: Feature Request -about: Request a feature for the NATS Server -labels: 🎉 enhancement ---- - -## Feature Request - -#### Use Case: - -#### Proposed Change: - -#### Who Benefits From The Change(s)? - -#### Alternative Approaches - diff --git a/.github/ISSUE_TEMPLATE/proposal.yml b/.github/ISSUE_TEMPLATE/proposal.yml new file mode 100644 index 00000000..d7da0ca4 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/proposal.yml @@ -0,0 +1,34 @@ +--- +name: Proposal +description: Propose an enhancement or new feature. 
+labels: + - proposal +body: + - type: textarea + id: usecase + attributes: + label: What motivated this proposal? + description: Describe the use case justifying this request. + validations: + required: true + - type: textarea + id: change + attributes: + label: What is the proposed change? + description: This could be a behavior change, enhanced API, or a branch new feature. + validations: + required: true + - type: textarea + id: benefits + attributes: + label: Who benefits from this change? + description: Describe how this not only benefits you. + validations: + required: false + - type: textarea + id: alternates + attributes: + label: What alternatives have you evaluated? + description: This could be using existing features or relying on an external dependency. + validations: + required: false diff --git a/conf/parse.go b/conf/parse.go index 4d2a20ad..05713409 100644 --- a/conf/parse.go +++ b/conf/parse.go @@ -137,18 +137,20 @@ func parse(data, fp string, pedantic bool) (p *parser, err error) { } p.pushContext(p.mapping) + var prevItem itemType for { it := p.next() if it.typ == itemEOF { + if prevItem == itemKey { + return nil, fmt.Errorf("config is invalid (%s:%d:%d)", fp, it.line, it.pos) + } break } + prevItem = it.typ if err := p.processItem(it, fp); err != nil { return nil, err } } - if len(p.mapping) == 0 { - return nil, fmt.Errorf("config has no values or is empty") - } return p, nil } diff --git a/conf/parse_test.go b/conf/parse_test.go index 3fc7c927..a6d5d2e8 100644 --- a/conf/parse_test.go +++ b/conf/parse_test.go @@ -3,6 +3,7 @@ package conf import ( "fmt" "os" + "path/filepath" "reflect" "strings" "testing" @@ -404,29 +405,162 @@ func TestParserNoInfiniteLoop(t *testing.T) { } } -func TestParseWithNoValues(t *testing.T) { - for _, test := range []string{ - ``, - `aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa`, - ` aaaaaaaaaaaaaaaaaaaaaaaaaaa`, - ` aaaaaaaaaaaaaaaaaaaaaaaaaaa `, - ` - # just comments with no values - # is also invalid. 
- `, - ` - # with comments and no spaces to create key values - # is also an invalid config. - aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa - `, - ` - a,a,a,a,a,a,a,a,a,a,a - `, +func TestParseWithNoValuesAreInvalid(t *testing.T) { + for _, test := range []struct { + name string + conf string + err string + }{ + { + "invalid key without values", + `aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa`, + "config is invalid (:1:41)", + }, + { + "invalid untrimmed key without values", + ` aaaaaaaaaaaaaaaaaaaaaaaaaaa`, + "config is invalid (:1:41)", + }, + { + "invalid untrimmed key without values", + ` aaaaaaaaaaaaaaaaaaaaaaaaaaa `, + "config is invalid (:1:41)", + }, + { + "invalid keys after comments", + ` + # with comments and no spaces to create key values + # is also an invalid config. + aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + `, + "config is invalid (:5:25)", + }, + { + "comma separated without values are invalid", + ` + a,a,a,a,a,a,a,a,a,a,a + `, + "config is invalid (:3:25)", + }, + { + // trailing brackets accidentally can become keys, these are also invalid. + "trailing brackets after config", + ` + accounts { users = [{}]} + } + `, + "config is invalid (:4:25)", + }, } { - if _, err := parse(test, "", true); err == nil { - t.Fatal("expected an error") - } else if !strings.Contains(err.Error(), "config has no values or is empty") { - t.Fatal("expected invalid conf error") - } + t.Run(test.name, func(t *testing.T) { + if _, err := parse(test.conf, "", true); err == nil { + t.Error("expected an error") + } else if !strings.Contains(err.Error(), test.err) { + t.Errorf("expected invalid conf error, got: %v", err) + } + }) + } +} + +func TestParseWithNoValuesEmptyConfigsAreValid(t *testing.T) { + for _, test := range []struct { + name string + conf string + }{ + { + "empty conf", + "", + }, + { + "empty conf with line breaks", + ` + + + `, + }, + { + "just comments with no values", + ` + # just comments with no values + # is still valid. 
+ `, + }, + } { + t.Run(test.name, func(t *testing.T) { + if _, err := parse(test.conf, "", true); err != nil { + t.Errorf("unexpected error: %v", err) + } + }) + } +} + +func TestParseWithNoValuesIncludes(t *testing.T) { + for _, test := range []struct { + input string + includes map[string]string + err string + linepos string + }{ + { + `# includes + accounts { + foo { include 'foo.conf'} + bar { users = [{user = "bar"}] } + quux { include 'quux.conf'} + } + `, + map[string]string{ + "foo.conf": ``, + "quux.conf": `?????????????`, + }, + "error parsing include file 'quux.conf', config is invalid", + "quux.conf:1:1", + }, + { + `# includes + accounts { + foo { include 'foo.conf'} + bar { include 'bar.conf'} + quux { include 'quux.conf'} + } + `, + map[string]string{ + "foo.conf": ``, // Empty configs are ok + "bar.conf": `AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA`, + "quux.conf": ` + # just some comments, + # and no key values also ok. + `, + }, + "error parsing include file 'bar.conf', config is invalid", + "bar.conf:1:34", + }, + } { + t.Run("", func(t *testing.T) { + sdir := t.TempDir() + f, err := os.CreateTemp(sdir, "nats.conf-") + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(f.Name(), []byte(test.input), 066); err != nil { + t.Error(err) + } + if test.includes != nil { + for includeFile, contents := range test.includes { + inf, err := os.Create(filepath.Join(sdir, includeFile)) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(inf.Name(), []byte(contents), 066); err != nil { + t.Error(err) + } + } + } + if _, err := parse(test.input, f.Name(), true); err == nil { + t.Error("expected an error") + } else if !strings.Contains(err.Error(), test.err) || !strings.Contains(err.Error(), test.linepos) { + t.Errorf("expected invalid conf error, got: %v", err) + } + }) } } diff --git a/server/config_check_test.go b/server/config_check_test.go index f1718b35..f2df4831 100644 --- a/server/config_check_test.go +++ b/server/config_check_test.go @@ 
-1791,6 +1791,23 @@ func TestConfigCheck(t *testing.T) { errorLine: 9, errorPos: 9, }, + { + name: "show warnings on empty configs without values", + config: ``, + warningErr: errors.New(`config has no values or is empty`), + errorLine: 0, + errorPos: 0, + reason: "", + }, + { + name: "show warnings on empty configs without values and only comments", + config: `# Valid file but has no usable values. + `, + warningErr: errors.New(`config has no values or is empty`), + errorLine: 0, + errorPos: 0, + reason: "", + }, } checkConfig := func(config string) error { @@ -1832,6 +1849,8 @@ func TestConfigCheck(t *testing.T) { if test.reason != "" { msg += ": " + test.reason } + } else if test.warningErr != nil { + msg = expectedErr.Error() } else { msg = test.reason } diff --git a/server/consumer.go b/server/consumer.go index fa593c43..860e362d 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -276,14 +276,14 @@ var ( // Calculate accurate replicas for the consumer config with the parent stream config. func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int { - if consCfg.Replicas == 0 { - if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy { + if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas { + if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy && consCfg.Replicas == 0 { + // Matches old-school ephemerals only, where the replica count is 0. return 1 } return strCfg.Replicas - } else { - return consCfg.Replicas } + return consCfg.Replicas } // Consumer is a jetstream consumer. @@ -1219,7 +1219,7 @@ func (o *consumer) setLeader(isLeader bool) { if o.dthresh > 0 && (o.isPullMode() || !o.active) { // Pull consumer. We run the dtmr all the time for this one. stopAndClearTimer(&o.dtmr) - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } // If we are not in ReplayInstant mode mark us as in replay state until resolved. 
@@ -1249,7 +1249,6 @@ func (o *consumer) setLeader(isLeader bool) { if pullMode { // Now start up Go routine to process inbound next message requests. go o.processInboundNextMsgReqs(qch) - } // If we are R>1 spin up our proposal loop. @@ -1268,7 +1267,10 @@ func (o *consumer) setLeader(isLeader bool) { close(o.qch) o.qch = nil } - // Make sure to clear out any re delivery queues + // Stop any inactivity timers. Should only be running on leaders. + stopAndClearTimer(&o.dtmr) + + // Make sure to clear out any re-deliver queues stopAndClearTimer(&o.ptmr) o.rdq = nil o.rdqi.Empty() @@ -1285,9 +1287,6 @@ func (o *consumer) setLeader(isLeader bool) { // Reset waiting if we are in pull mode. if o.isPullMode() { o.waiting = newWaitQueue(o.cfg.MaxWaiting) - if !o.isDurable() { - stopAndClearTimer(&o.dtmr) - } o.nextMsgReqs.drain() } else if o.srv.gateway.enabled { stopAndClearTimer(&o.gwdtmr) @@ -1478,7 +1477,7 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool { // If we do not have interest anymore and have a delete threshold set, then set // a timer to delete us. We wait for a bit in case of server reconnect. 
if !interest && o.dthresh > 0 { - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) return true } return false @@ -1505,7 +1504,7 @@ func (o *consumer) deleteNotActive() { if o.dtmr != nil { o.dtmr.Reset(o.dthresh - elapsed) } else { - o.dtmr = time.AfterFunc(o.dthresh-elapsed, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh-elapsed, o.deleteNotActive) } o.mu.Unlock() return @@ -1515,7 +1514,7 @@ if o.dtmr != nil { o.dtmr.Reset(o.dthresh) } else { - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } o.mu.Unlock() return @@ -1769,7 +1768,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { stopAndClearTimer(&o.dtmr) // Restart timer only if we are the leader. if o.isLeader() && o.dthresh > 0 { - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } } diff --git a/server/events.go b/server/events.go index e5e6af5d..16cd236d 100644 --- a/server/events.go +++ b/server/events.go @@ -56,6 +56,7 @@ const ( connsRespSubj = "$SYS._INBOX_.%s" accConnsEventSubjNew = "$SYS.ACCOUNT.%s.SERVER.CONNS" accConnsEventSubjOld = "$SYS.SERVER.ACCOUNT.%s.CONNS" // kept for backward compatibility + lameDuckEventSubj = "$SYS.SERVER.%s.LAMEDUCK" shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN" clientKickReqSubj = "$SYS.REQ.SERVER.%s.KICK" clientLDMReqSubj = "$SYS.REQ.SERVER.%s.LDM" @@ -589,6 +590,19 @@ RESET: } } +// Will send a shutdown message for lame-duck. Unlike sendShutdownEvent, this will +// not close off the send queue or reply handler, as we may still have a workload +// that needs migrating off. +// Lock should be held. 
+func (s *Server) sendLDMShutdownEventLocked() { + if s.sys == nil || s.sys.sendq == nil { + return + } + subj := fmt.Sprintf(lameDuckEventSubj, s.info.ID) + si := &ServerInfo{} + s.sys.sendq.push(newPubMsg(nil, subj, _EMPTY_, si, nil, si, noCompression, false, true)) +} + // Will send a shutdown message. func (s *Server) sendShutdownEvent() { s.mu.Lock() @@ -1021,6 +1036,13 @@ func (s *Server) initEventTracking() { if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } + // Listen for servers entering lame-duck mode. + // NOTE: This currently is handled in the same way as a server shutdown, but has + // a different subject in case we need to handle differently in future. + subject = fmt.Sprintf(lameDuckEventSubj, "*") + if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil { + s.Errorf("Error setting up internal tracking: %v", err) + } // Listen for account claims updates. subscribeToUpdate := true if s.accResolver != nil { diff --git a/server/events_test.go b/server/events_test.go index e3541999..b231d0ac 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1668,7 +1668,7 @@ func TestSystemAccountWithGateways(t *testing.T) { // If this tests fails with wrong number after 10 seconds we may have // added a new inititial subscription for the eventing system. - checkExpectedSubs(t, 55, sa) + checkExpectedSubs(t, 56, sa) // Create a client on B and see if we receive the event urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) diff --git a/server/jetstream.go b/server/jetstream.go index 2bdccdc6..ce4ae376 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -149,6 +149,9 @@ type jsAccount struct { // From server sendq *ipQueue[*pubMsg] + // For limiting only running one checkAndSync at a time. 
+ sync atomic.Bool + // Usage/limits related fields that will be protected by usageMu usageMu sync.RWMutex limits map[string]JetStreamAccountLimits // indexed by tierName @@ -1828,6 +1831,12 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account // When we detect a skew of some sort this will verify the usage reporting is correct. // No locks should be held. func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType) { + // This will run in a separate go routine, so check that we are only running once. + if !jsa.sync.CompareAndSwap(false, true) { + return + } + defer jsa.sync.Store(false) + // Hold the account read lock and the usage lock while we calculate. // We scope by tier and storage type, but if R3 File has 200 streams etc. could // show a pause. I did test with > 100 non-active streams and was 80-200ns or so. @@ -1933,7 +1942,10 @@ func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta jsa.usageMu.Unlock() if needsCheck { - jsa.checkAndSyncUsage(tierName, storeType) + // We could be holding the stream lock from up in the stack, and this + // will want the jsa lock, which would violate locking order. + // So do this in a Go routine. The function will check if it is already running. + go jsa.checkAndSyncUsage(tierName, storeType) } } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 6cb208c7..91ba60c1 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4058,7 +4058,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state var didCreate, isConfigUpdate, needsLocalResponse bool if o == nil { // Add in the consumer if needed. 
- if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false, ActionCreateOrUpdate); err == nil { + if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, wasExisting, ActionCreateOrUpdate); err == nil { didCreate = true } } else { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index f99c0956..212fab66 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3379,6 +3379,90 @@ func TestJetStreamClusterNoLeadersDuringLameDuck(t *testing.T) { } } +func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Grab the first server and set lameduck option directly. + s := c.servers[0] + s.optsMu.Lock() + s.opts.LameDuckDuration = 5 * time.Second + s.opts.LameDuckGracePeriod = -5 * time.Second + s.optsMu.Unlock() + + // Connect to the server to keep it alive when we go into LDM. + dummy, _ := jsClientConnect(t, s) + defer dummy.Close() + + // Connect to the third server. + nc, js := jsClientConnect(t, c.servers[2]) + defer nc.Close() + + // Now put the first server into lame duck mode. + go s.lameDuckMode() + + // Wait for news to arrive that the first server has gone into + // lame duck mode and been marked offline. + checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { + id := s.info.ID + s := c.servers[2] + s.mu.RLock() + defer s.mu.RUnlock() + + var isOffline bool + s.nodeToInfo.Range(func(_, v any) bool { + ni := v.(nodeInfo) + if ni.id == id { + isOffline = ni.offline + return false + } + return true + }) + + if !isOffline { + return fmt.Errorf("first node is still online unexpectedly") + } + return nil + }) + + // Create a go routine that will create streams constantly. 
+ qch := make(chan bool) + go func() { + var index int + for { + select { + case <-time.After(time.Millisecond * 25): + index++ + _, err := js.AddStream(&nats.StreamConfig{ + Name: fmt.Sprintf("NEW_TEST_%d", index), + Subjects: []string{fmt.Sprintf("bar.%d", index)}, + Replicas: 1, + }) + if err != nil { + return + } + case <-qch: + return + } + } + }() + defer close(qch) + + // Make sure we do not have any R1 assets placed on the lameduck server. + for s.isRunning() { + s.rnMu.RLock() + if s.js == nil || s.js.srv == nil || s.js.srv.gacc == nil { + s.rnMu.RUnlock() + break + } + hasAsset := len(s.js.srv.gacc.streams()) > 0 + s.rnMu.RUnlock() + if hasAsset { + t.Fatalf("Server had an R1 asset when it should not due to lameduck mode") + } + } +} + // If a consumer has not been registered (possible in heavily loaded systems with lots of assets) // it could miss the signal of a message going away. If that message was pending and expires the // ack floor could fall below the stream first sequence. 
This test will force that condition and @@ -5094,3 +5178,90 @@ func TestJetStreamClusterStreamFailTrackingSnapshots(t *testing.T) { t.Fatalf("Expected no errors, got %d", len(errCh)) } } + +func TestJetStreamClusterOrphanConsumerSubjects(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.>", "bar.>"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "consumer_foo", + Durable: "consumer_foo", + FilterSubject: "foo.something", + }) + require_NoError(t, err) + + for _, replicas := range []int{3, 1, 3} { + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"bar.>"}, + Replicas: replicas, + }) + require_NoError(t, err) + c.waitOnAllCurrent() + } + + c.waitOnStreamLeader("$G", "TEST") + c.waitOnConsumerLeader("$G", "TEST", "consumer_foo") + + info, err := js.ConsumerInfo("TEST", "consumer_foo") + require_NoError(t, err) + require_True(t, info.Cluster != nil) + require_NotEqual(t, info.Cluster.Leader, "") + require_Equal(t, len(info.Cluster.Replicas), 2) +} + +func TestJetStreamClusterDurableConsumerInactiveThresholdLeaderSwitch(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Queue a msg. + sendStreamMsg(t, nc, "foo", "ok") + + thresh := 250 * time.Millisecond + + // This will start the timer. + sub, err := js.PullSubscribe("foo", "dlc", nats.InactiveThreshold(thresh)) + require_NoError(t, err) + + // Switch over leader. 
+ cl := c.consumerLeader(globalAccountName, "TEST", "dlc") + cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "dlc") + c.waitOnConsumerLeader(globalAccountName, "TEST", "dlc") + + // Create activity on this consumer. + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_True(t, len(msgs) == 1) + + // This is consider activity as well. So we can watch now up to thresh to make sure consumer still active. + msgs[0].AckSync() + + // The consumer should not disappear for next `thresh` interval unless old leader does so. + timeout := time.Now().Add(thresh) + for time.Now().Before(timeout) { + _, err := js.ConsumerInfo("TEST", "dlc") + if err == nats.ErrConsumerNotFound { + t.Fatalf("Consumer deleted when it should not have been") + } + } +} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 297bddfb..2e246dec 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -21612,3 +21612,30 @@ func TestJetStreamLimitsToInterestPolicy(t *testing.T) { require_Equal(t, info.State.FirstSeq, 11) require_Equal(t, info.State.Msgs, 10) } + +func TestJetStreamUsageSyncDeadlock(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + }) + require_NoError(t, err) + + sendStreamMsg(t, nc, "foo", "hello") + + // Now purposely mess up the usage that will force a sync. + // Without the fix this will deadlock. 
+ jsa := s.getJetStream().lookupAccount(s.GlobalAccount()) + jsa.usageMu.Lock() + st, ok := jsa.usage[_EMPTY_] + require_True(t, ok) + st.local.store = -1000 + jsa.usageMu.Unlock() + + sendStreamMsg(t, nc, "foo", "hello") +} diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 02388968..d068d720 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -2551,7 +2551,7 @@ func TestLeafNodeOperatorBadCfg(t *testing.T) { cfg: ` port: -1 authorization { - users = [{user: "u", password: "p"}]} + users = [{user: "u", password: "p"}] }`, }, { @@ -3891,9 +3891,9 @@ func TestLeafNodeInterestPropagationDaisychain(t *testing.T) { aTmpl := ` port: %d leafnodes { - port: %d - } - }` + port: %d + } + ` confA := createConfFile(t, []byte(fmt.Sprintf(aTmpl, -1, -1))) sA, _ := RunServerWithConfig(confA) diff --git a/server/opts.go b/server/opts.go index 02c3489b..fd5f3234 100644 --- a/server/opts.go +++ b/server/opts.go @@ -792,6 +792,9 @@ func (o *Options) ProcessConfigFile(configFile string) error { // Collect all errors and warnings and report them all together. errors := make([]error, 0) warnings := make([]error, 0) + if len(m) == 0 { + warnings = append(warnings, fmt.Errorf("%s: config has no values or is empty", configFile)) + } // First check whether a system account has been defined, // as that is a condition for other features to be enabled. diff --git a/server/server.go b/server/server.go index 8ca5a3c3..8bd5afdc 100644 --- a/server/server.go +++ b/server/server.go @@ -3949,6 +3949,7 @@ func (s *Server) lameDuckMode() { } s.Noticef("Entering lame duck mode, stop accepting new clients") s.ldm = true + s.sendLDMShutdownEventLocked() expected := 1 s.listener.Close() s.listener = nil