From 412dee67f1fb6712223835929e5228a1da37424d Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Sun, 13 Aug 2023 19:36:57 -0700 Subject: [PATCH 01/13] config: allow empty configs, but prevent bad configs - Adds reporting the line with the bad key position that makes the config invalid. - Fixes a few tests with trailing braces which were being handled as keys and ignored before. Signed-off-by: Waldemar Quevedo --- conf/parse.go | 8 +- conf/parse_test.go | 180 +++++++++++++++++++++++++++++++++++----- server/leafnode_test.go | 8 +- 3 files changed, 166 insertions(+), 30 deletions(-) diff --git a/conf/parse.go b/conf/parse.go index 4d2a20ad..05713409 100644 --- a/conf/parse.go +++ b/conf/parse.go @@ -137,18 +137,20 @@ func parse(data, fp string, pedantic bool) (p *parser, err error) { } p.pushContext(p.mapping) + var prevItem itemType for { it := p.next() if it.typ == itemEOF { + if prevItem == itemKey { + return nil, fmt.Errorf("config is invalid (%s:%d:%d)", fp, it.line, it.pos) + } break } + prevItem = it.typ if err := p.processItem(it, fp); err != nil { return nil, err } } - if len(p.mapping) == 0 { - return nil, fmt.Errorf("config has no values or is empty") - } return p, nil } diff --git a/conf/parse_test.go b/conf/parse_test.go index 3fc7c927..a6d5d2e8 100644 --- a/conf/parse_test.go +++ b/conf/parse_test.go @@ -3,6 +3,7 @@ package conf import ( "fmt" "os" + "path/filepath" "reflect" "strings" "testing" @@ -404,29 +405,162 @@ func TestParserNoInfiniteLoop(t *testing.T) { } } -func TestParseWithNoValues(t *testing.T) { - for _, test := range []string{ - ``, - `aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa`, - ` aaaaaaaaaaaaaaaaaaaaaaaaaaa`, - ` aaaaaaaaaaaaaaaaaaaaaaaaaaa `, - ` - # just comments with no values - # is also invalid. - `, - ` - # with comments and no spaces to create key values - # is also an invalid config. - aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa - `, - ` - a,a,a,a,a,a,a,a,a,a,a - `, +func TestParseWithNoValuesAreInvalid(t *testing.T) { + for _, test := range []struct { + name string + conf string + err string + }{ + { + "invalid key without values", + `aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa`, + "config is invalid (:1:41)", + }, + { + "invalid untrimmed key without values", + ` aaaaaaaaaaaaaaaaaaaaaaaaaaa`, + "config is invalid (:1:41)", + }, + { + "invalid untrimmed key without values", + ` aaaaaaaaaaaaaaaaaaaaaaaaaaa `, + "config is invalid (:1:41)", + }, + { + "invalid keys after comments", + ` + # with comments and no spaces to create key values + # is also an invalid config. + aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + `, + "config is invalid (:5:25)", + }, + { + "comma separated without values are invalid", + ` + a,a,a,a,a,a,a,a,a,a,a + `, + "config is invalid (:3:25)", + }, + { + // trailing brackets accidentally can become keys, these are also invalid. + "trailing brackets after config", + ` + accounts { users = [{}]} + } + `, + "config is invalid (:4:25)", + }, } { - if _, err := parse(test, "", true); err == nil { - t.Fatal("expected an error") - } else if !strings.Contains(err.Error(), "config has no values or is empty") { - t.Fatal("expected invalid conf error") - } + t.Run(test.name, func(t *testing.T) { + if _, err := parse(test.conf, "", true); err == nil { + t.Error("expected an error") + } else if !strings.Contains(err.Error(), test.err) { + t.Errorf("expected invalid conf error, got: %v", err) + } + }) + } +} + +func TestParseWithNoValuesEmptyConfigsAreValid(t *testing.T) { + for _, test := range []struct { + name string + conf string + }{ + { + "empty conf", + "", + }, + { + "empty conf with line breaks", + ` + + + `, + }, + { + "just comments with no values", + ` + # just comments with no values + # is still valid. + `, + }, + } { + t.Run(test.name, func(t *testing.T) { + if _, err := parse(test.conf, "", true); err != nil { + t.Errorf("unexpected error: %v", err) + } + }) + } +} + +func TestParseWithNoValuesIncludes(t *testing.T) { + for _, test := range []struct { + input string + includes map[string]string + err string + linepos string + }{ + { + `# includes + accounts { + foo { include 'foo.conf'} + bar { users = [{user = "bar"}] } + quux { include 'quux.conf'} + } + `, + map[string]string{ + "foo.conf": ``, + "quux.conf": `?????????????`, + }, + "error parsing include file 'quux.conf', config is invalid", + "quux.conf:1:1", + }, + { + `# includes + accounts { + foo { include 'foo.conf'} + bar { include 'bar.conf'} + quux { include 'quux.conf'} + } + `, + map[string]string{ + "foo.conf": ``, // Empty configs are ok + "bar.conf": `AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA`, + "quux.conf": ` + # just some comments, + # and no key values also ok. + `, + }, + "error parsing include file 'bar.conf', config is invalid", + "bar.conf:1:34", + }, + } { + t.Run("", func(t *testing.T) { + sdir := t.TempDir() + f, err := os.CreateTemp(sdir, "nats.conf-") + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(f.Name(), []byte(test.input), 066); err != nil { + t.Error(err) + } + if test.includes != nil { + for includeFile, contents := range test.includes { + inf, err := os.Create(filepath.Join(sdir, includeFile)) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(inf.Name(), []byte(contents), 066); err != nil { + t.Error(err) + } + } + } + if _, err := parse(test.input, f.Name(), true); err == nil { + t.Error("expected an error") + } else if !strings.Contains(err.Error(), test.err) || !strings.Contains(err.Error(), test.linepos) { + t.Errorf("expected invalid conf error, got: %v", err) + } + }) } } diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 10caf8cb..e6577fd3 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -2540,7 +2540,7 @@ func TestLeafNodeOperatorBadCfg(t *testing.T) { cfg: ` port: -1 authorization { - users = [{user: "u", password: "p"}]} + users = [{user: "u", password: "p"}] }`, }, { @@ -3876,9 +3876,9 @@ func TestLeafNodeInterestPropagationDaisychain(t *testing.T) { aTmpl := ` port: %d leafnodes { - port: %d - } - }` + port: %d + } + ` confA := createConfFile(t, []byte(fmt.Sprintf(aTmpl, -1, -1))) sA, _ := RunServerWithConfig(confA) From 3a20f665358b22f421165d19470901d411624415 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Sun, 13 Aug 2023 23:59:50 -0700 Subject: [PATCH 02/13] config: parsed empty config only show warnings Signed-off-by: Waldemar Quevedo --- server/config_check_test.go | 19 +++++++++++++++++++ server/opts.go | 3 +++ 2 files changed, 22 insertions(+) diff --git a/server/config_check_test.go b/server/config_check_test.go index 0ba2c77f..0d7187d3 100644 --- a/server/config_check_test.go +++ b/server/config_check_test.go @@ -1579,6 +1579,23 @@ func TestConfigCheck(t *testing.T) { errorLine: 5, errorPos: 6, }, + { + name: "show warnings on empty configs without values", + config: ``, + warningErr: errors.New(`config has no values or is empty`), + errorLine: 0, + errorPos: 0, + reason: "", + }, + { + name: "show warnings on empty configs without values and only comments", + config: `# Valid file but has no usable values. + `, + warningErr: errors.New(`config has no values or is empty`), + errorLine: 0, + errorPos: 0, + reason: "", + }, } checkConfig := func(config string) error { @@ -1620,6 +1637,8 @@ func TestConfigCheck(t *testing.T) { if test.reason != "" { msg += ": " + test.reason } + } else if test.warningErr != nil { + msg = expectedErr.Error() } else { msg = test.reason } diff --git a/server/opts.go b/server/opts.go index a2b231aa..1cb531da 100644 --- a/server/opts.go +++ b/server/opts.go @@ -744,6 +744,9 @@ func (o *Options) ProcessConfigFile(configFile string) error { // Collect all errors and warnings and report them all together. errors := make([]error, 0) warnings := make([]error, 0) + if len(m) == 0 { + warnings = append(warnings, fmt.Errorf("%s: config has no values or is empty", configFile)) + } // First check whether a system account has been defined, // as that is a condition for other features to be enabled. From c0636d117f0e9d283fbc0074ca1ba4ab5f65e6b0 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 17 Aug 2023 09:30:20 +0100 Subject: [PATCH 03/13] Tweak consumer replica scaling, add unit test for orphaned consumer subjects Signed-off-by: Neil Twigg --- server/consumer.go | 5 ++-- server/jetstream_cluster.go | 6 ++--- server/jetstream_cluster_3_test.go | 41 ++++++++++++++++++++++++++++++ 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index b229b9fd..2ee3896a 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -213,14 +213,13 @@ var ( // Calculate accurate replicas for the consumer config with the parent stream config. func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int { - if consCfg.Replicas == 0 { + if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas { if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy { return 1 } return strCfg.Replicas - } else { - return consCfg.Replicas } + return consCfg.Replicas } // Consumer is a jetstream consumer. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 6af4fa42..12b44b33 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5996,11 +5996,11 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su // Need to remap any consumers. for _, ca := range osa.consumers { // Ephemerals are R=1, so only auto-remap durables, or R>1, unless stream is interest or workqueue policy. - numPeers := len(ca.Group.Peers) - if ca.Config.Durable != _EMPTY_ || numPeers > 1 || cfg.Retention != LimitsPolicy { + replicas := ca.Config.replicas(cfg) + if ca.Config.Durable != _EMPTY_ || replicas > 1 || cfg.Retention != LimitsPolicy { cca := ca.copyGroup() // Adjust preferred as needed. - if numPeers == 1 && len(rg.Peers) > 1 { + if replicas == 1 && len(rg.Peers) > 1 { cca.Group.Preferred = ca.Group.Peers[0] } else { cca.Group.Preferred = _EMPTY_ diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index fbf46524..eaa412cc 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -4985,3 +4985,44 @@ func TestJetStreamClusterStreamFailTrackingSnapshots(t *testing.T) { t.Fatalf("Expected no errors, got %d", len(errCh)) } } + +func TestJetStreamClusterOrphanConsumerSubjects(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.>", "bar.>"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "consumer_foo", + Durable: "consumer_foo", + FilterSubject: "foo.something", + }) + require_NoError(t, err) + + for _, replicas := range []int{3, 1, 3} { + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"bar.>"}, + Replicas: replicas, + }) + require_NoError(t, err) + c.waitOnAllCurrent() + } + + c.waitOnStreamLeader("$G", "TEST") + c.waitOnConsumerLeader("$G", "TEST", "consumer_foo") + + info, err := js.ConsumerInfo("TEST", "consumer_foo") + require_NoError(t, err) + require_True(t, info.Cluster != nil) + require_NotEqual(t, info.Cluster.Leader, "") + require_Equal(t, len(info.Cluster.Replicas), 2) +} From 3c85490dc06660f6ca8c9ef2eb54bf84acfd3168 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 17 Aug 2023 15:28:48 +0100 Subject: [PATCH 04/13] Backport test helper tweak Signed-off-by: Neil Twigg --- server/test_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/server/test_test.go b/server/test_test.go index 64717f90..50594710 100644 --- a/server/test_test.go +++ b/server/test_test.go @@ -14,7 +14,6 @@ package server import ( - "bytes" "fmt" "math/rand" "net/url" @@ -112,16 +111,16 @@ func require_Error(t *testing.T, err error, expected ...error) { t.Fatalf("Expected one of %v, got '%v'", expected, err) } -func require_Equal(t *testing.T, a, b string) { +func require_Equal[T comparable](t *testing.T, a, b T) { t.Helper() - if strings.Compare(a, b) != 0 { + if a != b { t.Fatalf("require equal, but got: %v != %v", a, b) } } -func require_NotEqual(t *testing.T, a, b [32]byte) { +func require_NotEqual[T comparable](t *testing.T, a, b T) { t.Helper() - if bytes.Equal(a[:], b[:]) { + if a == b { t.Fatalf("require not equal, but got: %v != %v", a, b) } } From c437157c1f84a5ba08becc12e647b87d9ef3ce7e Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 17 Aug 2023 17:22:43 +0100 Subject: [PATCH 05/13] Recover in consumer assignment when asset already existed Signed-off-by: Neil Twigg --- server/consumer.go | 3 ++- server/jetstream_cluster.go | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 2ee3896a..304fb326 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -214,7 +214,8 @@ var ( // Calculate accurate replicas for the consumer config with the parent stream config. func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int { if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas { - if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy { + if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy && consCfg.Replicas == 0 { + // Matches old-school ephemerals only, where the replica count is 0. return 1 } return strCfg.Replicas diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 12b44b33..1734f096 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4001,7 +4001,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state var didCreate, isConfigUpdate, needsLocalResponse bool if o == nil { // Add in the consumer if needed. - if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false); err == nil { + if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, wasExisting); err == nil { didCreate = true } } else { @@ -5996,11 +5996,11 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su // Need to remap any consumers. for _, ca := range osa.consumers { // Ephemerals are R=1, so only auto-remap durables, or R>1, unless stream is interest or workqueue policy. - replicas := ca.Config.replicas(cfg) - if ca.Config.Durable != _EMPTY_ || replicas > 1 || cfg.Retention != LimitsPolicy { + numPeers := len(ca.Group.Peers) + if ca.Config.Durable != _EMPTY_ || numPeers > 1 || cfg.Retention != LimitsPolicy { cca := ca.copyGroup() // Adjust preferred as needed. - if replicas == 1 && len(rg.Peers) > 1 { + if numPeers == 1 && len(rg.Peers) > 1 { cca.Group.Preferred = ca.Group.Peers[0] } else { cca.Group.Preferred = _EMPTY_ From ca79ac9a73104eb698bf984cee86ea08a19b5068 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Fri, 18 Aug 2023 21:39:57 -0400 Subject: [PATCH 06/13] Update issue forms Signed-off-by: Byron Ruth --- .github/ISSUE_TEMPLATE/blank_issue.md | 4 --- .github/ISSUE_TEMPLATE/config.yml | 9 +++-- .github/ISSUE_TEMPLATE/defect.md | 23 ------------- .github/ISSUE_TEMPLATE/defect.yml | 41 +++++++++++++++++++++++ .github/ISSUE_TEMPLATE/feature_request.md | 16 --------- .github/ISSUE_TEMPLATE/proposal.yml | 34 +++++++++++++++++++ 6 files changed, 81 insertions(+), 46 deletions(-) delete mode 100644 .github/ISSUE_TEMPLATE/blank_issue.md delete mode 100644 .github/ISSUE_TEMPLATE/defect.md create mode 100644 .github/ISSUE_TEMPLATE/defect.yml delete mode 100644 .github/ISSUE_TEMPLATE/feature_request.md create mode 100644 .github/ISSUE_TEMPLATE/proposal.yml diff --git a/.github/ISSUE_TEMPLATE/blank_issue.md b/.github/ISSUE_TEMPLATE/blank_issue.md deleted file mode 100644 index 38ba9463..00000000 --- a/.github/ISSUE_TEMPLATE/blank_issue.md +++ /dev/null @@ -1,4 +0,0 @@ ---- -name: Blank Issue -about: Create an issue with a blank template. ---- diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml index 989f7213..ad105397 100644 --- a/.github/ISSUE_TEMPLATE/config.yml +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -1,5 +1,8 @@ -blank_issues_enabled: true +blank_issues_enabled: false contact_links: - - name: NATS Slack + - name: Discussion + url: https://github.com/nats-io/nats-server/discussions + about: Ideal for ideas, feedback, or longer form questions. + - name: Chat url: https://slack.nats.io - about: Please ask and answer questions in our Slack server here. + about: Ideal for short, one-off questions, general conversation, and meeting other NATS users! diff --git a/.github/ISSUE_TEMPLATE/defect.md b/.github/ISSUE_TEMPLATE/defect.md deleted file mode 100644 index 1367c509..00000000 --- a/.github/ISSUE_TEMPLATE/defect.md +++ /dev/null @@ -1,23 +0,0 @@ ---- -name: Defect Report -about: Report a bug found in the NATS Server -labels: 🐞 bug ---- - -## Defect - -Make sure that these boxes are checked before submitting your issue -- thank you! - - - [ ] Included `nats-server -DV` output - - [ ] Included a [Minimal, Complete, and Verifiable example] (https://stackoverflow.com/help/mcve) - -#### Versions of `nats-server` and affected client libraries used: - -#### OS/Container environment: - -#### Steps or code to reproduce the issue: - -#### Expected result: - -#### Actual result: - diff --git a/.github/ISSUE_TEMPLATE/defect.yml b/.github/ISSUE_TEMPLATE/defect.yml new file mode 100644 index 00000000..a7f5e688 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/defect.yml @@ -0,0 +1,41 @@ +--- +name: Defect +description: Report a defect, such as a bug or regression. +labels: + - defect +body: + - type: textarea + id: versions + attributes: + label: What version were you using? + description: Include the server version (`nats-server --version`) and any client versions when observing the issue. + validations: + required: true + - type: textarea + id: environment + attributes: + label: What environment was the server running in? + description: This pertains to the operating system, CPU architecture, and/or Docker image that was used. + validations: + required: true + - type: textarea + id: steps + attributes: + label: Is this an defect that is reproducible? + description: Provide best-effort steps to showcase the defect. + validations: + required: true + - type: textarea + id: expected + attributes: + label: Given the capability you are leveraging, describe your expectation? + description: This may be the expected behavior or performance characteristics. + validations: + required: true + - type: textarea + id: actual + attributes: + label: Given the expectation, what is the defect you are observing? + description: This may be an unexpected behavior or regression in performance. + validations: + required: true diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md deleted file mode 100644 index 51e54c6c..00000000 --- a/.github/ISSUE_TEMPLATE/feature_request.md +++ /dev/null @@ -1,16 +0,0 @@ ---- -name: Feature Request -about: Request a feature for the NATS Server -labels: 🎉 enhancement ---- - -## Feature Request - -#### Use Case: - -#### Proposed Change: - -#### Who Benefits From The Change(s)? - -#### Alternative Approaches - diff --git a/.github/ISSUE_TEMPLATE/proposal.yml b/.github/ISSUE_TEMPLATE/proposal.yml new file mode 100644 index 00000000..b4ca7fb3 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/proposal.yml @@ -0,0 +1,34 @@ +--- +name: Proposal +description: Propose an enhancement or new feature. +labels: + - proposal +body: + - type: textarea + id: usecase + attributes: + label: What motivated this proposal? + description: Describe the use case justifying this request. + validations: + required: true + - type: textarea + id: change + attributes: + label: What is the proposed change? + description: This could be a behavior change, enhanced API, or a branch new feature. + validations: + required: true + - type: textarea + id: benefits + attributes: + label: Who benefits from this change? + description: Describe how this not only benefits you. + validations: + required: false + - type: textarea + id: alternates + attributes: + label: What alternatives have you evaluated? + description: This could be using existing features or relaying on an external dependency. + validations: + required: false From a43075cabcfec181353e782abcbe3eafcb188ce3 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Sat, 19 Aug 2023 06:33:19 -0400 Subject: [PATCH 07/13] Address typos Signed-off-by: Byron Ruth --- .github/ISSUE_TEMPLATE/defect.yml | 2 +- .github/ISSUE_TEMPLATE/proposal.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/ISSUE_TEMPLATE/defect.yml b/.github/ISSUE_TEMPLATE/defect.yml index a7f5e688..99b4800a 100644 --- a/.github/ISSUE_TEMPLATE/defect.yml +++ b/.github/ISSUE_TEMPLATE/defect.yml @@ -21,7 +21,7 @@ body: - type: textarea id: steps attributes: - label: Is this an defect that is reproducible? + label: Is this defect reproducible? description: Provide best-effort steps to showcase the defect. validations: required: true diff --git a/.github/ISSUE_TEMPLATE/proposal.yml b/.github/ISSUE_TEMPLATE/proposal.yml index b4ca7fb3..d7da0ca4 100644 --- a/.github/ISSUE_TEMPLATE/proposal.yml +++ b/.github/ISSUE_TEMPLATE/proposal.yml @@ -29,6 +29,6 @@ body: id: alternates attributes: label: What alternatives have you evaluated? - description: This could be using existing features or relaying on an external dependency. + description: This could be using existing features or relying on an external dependency. validations: required: false From e018705a082815d435ac2c6672a21027c88c6707 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 21 Aug 2023 11:36:42 -0700 Subject: [PATCH 08/13] Fixed deadlock when checkAndSync was being called as part of storing message. We violated the locking pattern, so we now make sure we do this in a separate Go routine and put checks to only run it once. Signed-off-by: Derek Collison --- server/jetstream.go | 14 +++++++++++++- server/jetstream_test.go | 27 +++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/server/jetstream.go b/server/jetstream.go index d4f642ed..7143c384 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -148,6 +148,9 @@ type jsAccount struct { // From server sendq *ipQueue[*pubMsg] + // For limiting only running one checkAndSync at a time. + sync atomic.Bool + // Usage/limits related fields that will be protected by usageMu usageMu sync.RWMutex limits map[string]JetStreamAccountLimits // indexed by tierName @@ -1811,6 +1814,12 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account // When we detect a skew of some sort this will verify the usage reporting is correct. // No locks should be held. func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType) { + // This will run in a separate go routine, so check that we are only running once. + if !jsa.sync.CompareAndSwap(false, true) { + return + } + defer jsa.sync.Store(true) + // Hold the account read lock and the usage lock while we calculate. // We scope by tier and storage type, but if R3 File has 200 streams etc. could // show a pause. I did test with > 100 non-active streams and was 80-200ns or so. @@ -1916,7 +1925,10 @@ func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta jsa.usageMu.Unlock() if needsCheck { - jsa.checkAndSyncUsage(tierName, storeType) + // We could be holding the stream lock from up in the stack, and this + // will want the jsa lock, which would violate locking order. + // So do this in a Go routine. The function will check if it is already running. + go jsa.checkAndSyncUsage(tierName, storeType) } } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 9151c533..b2214f56 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -20408,3 +20408,30 @@ func TestJetStreamLastSequenceBySubjectConcurrent(t *testing.T) { }) } } + +func TestJetStreamUsageSyncDeadlock(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + }) + require_NoError(t, err) + + sendStreamMsg(t, nc, "foo", "hello") + + // Now purposely mess up the usage that will force a sync. + // Without the fix this will deadlock. + jsa := s.getJetStream().lookupAccount(s.GlobalAccount()) + jsa.usageMu.Lock() + st, ok := jsa.usage[_EMPTY_] + require_True(t, ok) + st.local.store = -1000 + jsa.usageMu.Unlock() + + sendStreamMsg(t, nc, "foo", "hello") +} From 10f73e888e8491e6885f9bcda006738eb23a1dfd Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 21 Aug 2023 12:02:28 -0700 Subject: [PATCH 09/13] Remove 1.18 compile build, support 1.19 and above Signed-off-by: Derek Collison --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index c4d99e35..75e7d2b1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -37,7 +37,7 @@ jobs: - name: "Run all tests from all other packages" env: TEST_SUITE=non_srv_pkg_tests - name: "Compile with older Go release" - go: 1.18.x + go: 1.19.12 env: TEST_SUITE=build_only script: ./scripts/runTestsOnTravis.sh $TEST_SUITE From 7cc5838a6d2332c8d30143ecf6539f1de2635533 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Fri, 18 Aug 2023 11:13:14 +0100 Subject: [PATCH 10/13] Send shutdown event on LDM so that R1 assets do not get assigned to the LDM node Signed-off-by: Neil Twigg --- server/events.go | 13 +++++ server/jetstream_cluster_3_test.go | 84 ++++++++++++++++++++++++++++++ server/server.go | 1 + 3 files changed, 98 insertions(+) diff --git a/server/events.go b/server/events.go index 1b58d912..9da504f7 100644 --- a/server/events.go +++ b/server/events.go @@ -533,6 +533,19 @@ RESET: } } +// Will send a shutdown message for lame-duck. Unlike sendShutdownEvent, this will +// not close off the send queue or reply handler, as we may still have a workload +// that needs migrating off. +// Lock should be held. +func (s *Server) sendLDMShutdownEventLocked() { + if s.sys == nil || s.sys.sendq == nil { + return + } + subj := fmt.Sprintf(shutdownEventSubj, s.info.ID) + si := &ServerInfo{} + s.sys.sendq.push(newPubMsg(nil, subj, _EMPTY_, si, nil, si, noCompression, false, true)) +} + // Will send a shutdown message. func (s *Server) sendShutdownEvent() { s.mu.Lock() diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index eaa412cc..3843239d 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3379,6 +3379,90 @@ func TestJetStreamClusterNoLeadersDuringLameDuck(t *testing.T) { } } +func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Grab the first server and set lameduck option directly. + s := c.servers[0] + s.optsMu.Lock() + s.opts.LameDuckDuration = 5 * time.Second + s.opts.LameDuckGracePeriod = -5 * time.Second + s.optsMu.Unlock() + + // Connect to the server to keep it alive when we go into LDM. + dummy, _ := jsClientConnect(t, s) + defer dummy.Close() + + // Connect to the third server. + nc, js := jsClientConnect(t, c.servers[2]) + defer nc.Close() + + // Now put the first server into lame duck mode. + go s.lameDuckMode() + + // Wait for news to arrive that the first server has gone into + // lame duck mode and been marked offline. + checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { + id := s.info.ID + s := c.servers[2] + s.mu.RLock() + defer s.mu.RUnlock() + + var isOffline bool + s.nodeToInfo.Range(func(_, v any) bool { + ni := v.(nodeInfo) + if ni.id == id { + isOffline = ni.offline + return false + } + return true + }) + + if !isOffline { + return fmt.Errorf("first node is still online unexpectedly") + } + return nil + }) + + // Create a go routine that will create streams constantly. + qch := make(chan bool) + go func() { + var index int + for { + select { + case <-time.After(time.Millisecond * 25): + index++ + _, err := js.AddStream(&nats.StreamConfig{ + Name: fmt.Sprintf("NEW_TEST_%d", index), + Subjects: []string{fmt.Sprintf("bar.%d", index)}, + Replicas: 1, + }) + if err != nil { + return + } + case <-qch: + return + } + } + }() + defer close(qch) + + // Make sure we do not have any R1 assets placed on the lameduck server. + for s.isRunning() { + s.rnMu.RLock() + if s.js == nil || s.js.srv == nil || s.js.srv.gacc == nil { + s.rnMu.RUnlock() + break + } + hasAsset := len(s.js.srv.gacc.streams()) > 0 + s.rnMu.RUnlock() + if hasAsset { + t.Fatalf("Server had an R1 asset when it should not due to lameduck mode") + } + } +} + // If a consumer has not been registered (possible in heavily loaded systems with lots of assets) // it could miss the signal of a message going away. If that message was pending and expires the // ack floor could fall below the stream first sequence. This test will force that condition and diff --git a/server/server.go b/server/server.go index 11e715b9..3ecb4e87 100644 --- a/server/server.go +++ b/server/server.go @@ -3557,6 +3557,7 @@ func (s *Server) lameDuckMode() { } s.Noticef("Entering lame duck mode, stop accepting new clients") s.ldm = true + s.sendLDMShutdownEventLocked() expected := 1 s.listener.Close() s.listener = nil From d720a6931c71a83aa8df8715b7dc0f87d5b0f527 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 21 Aug 2023 21:42:20 +0100 Subject: [PATCH 11/13] Use own subject for LDM event Signed-off-by: Neil Twigg --- server/events.go | 10 +++++++++- server/events_test.go | 2 +- server/monitor_test.go | 2 +- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/server/events.go b/server/events.go index 9da504f7..2d8283e8 100644 --- a/server/events.go +++ b/server/events.go @@ -56,6 +56,7 @@ const ( connsRespSubj = "$SYS._INBOX_.%s" accConnsEventSubjNew = "$SYS.ACCOUNT.%s.SERVER.CONNS" accConnsEventSubjOld = "$SYS.SERVER.ACCOUNT.%s.CONNS" // kept for backward compatibility + lameDuckEventSubj = "$SYS.SERVER.%s.LAMEDUCK" shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN" authErrorEventSubj = "$SYS.SERVER.%s.CLIENT.AUTH.ERR" serverStatsSubj = "$SYS.SERVER.%s.STATSZ" @@ -541,7 +542,7 @@ func (s *Server) sendLDMShutdownEventLocked() { if s.sys == nil || s.sys.sendq == nil { return } - subj := fmt.Sprintf(shutdownEventSubj, s.info.ID) + subj := fmt.Sprintf(lameDuckEventSubj, s.info.ID) si := &ServerInfo{} s.sys.sendq.push(newPubMsg(nil, subj, _EMPTY_, si, nil, si, noCompression, false, true)) } @@ -957,6 +958,13 @@ func (s *Server) initEventTracking() { if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } + // Listen for servers entering lame-duck mode. + // NOTE: This currently is handled in the same way as a server shutdown, but has + // a different subject in case we need to handle differently in future. + subject = fmt.Sprintf(lameDuckEventSubj, "*") + if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil { + s.Errorf("Error setting up internal tracking: %v", err) + } // Listen for account claims updates. subscribeToUpdate := true if s.accResolver != nil { diff --git a/server/events_test.go b/server/events_test.go index 5398beda..34a9d0d4 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1666,7 +1666,7 @@ func TestSystemAccountWithGateways(t *testing.T) { // If this tests fails with wrong number after 10 seconds we may have // added a new inititial subscription for the eventing system. - checkExpectedSubs(t, 45, sa) + checkExpectedSubs(t, 46, sa) // Create a client on B and see if we receive the event urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) diff --git a/server/monitor_test.go b/server/monitor_test.go index a0caa579..c98429cb 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -3942,7 +3942,7 @@ func TestMonitorAccountz(t *testing.T) { body = string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d%s?acc=$SYS", s.MonitorAddr().Port, AccountzPath))) require_Contains(t, body, `"account_detail": {`) require_Contains(t, body, `"account_name": "$SYS",`) - require_Contains(t, body, `"subscriptions": 40,`) + require_Contains(t, body, `"subscriptions": 41,`) require_Contains(t, body, `"is_system": true,`) require_Contains(t, body, `"system_account": "$SYS"`) From 43314fd4395325e2a62f62d1017ae5b01572f141 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 21 Aug 2023 14:53:09 -0700 Subject: [PATCH 12/13] Fix for a bug that would allow old leaders of pull based durables to delete a consumer from an inactivity threshold. Signed-off-by: Derek Collison --- server/consumer.go | 19 ++++++------ server/jetstream_cluster_3_test.go | 46 ++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 10 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 304fb326..8cc502e9 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1091,7 +1091,7 @@ func (o *consumer) setLeader(isLeader bool) { if o.dthresh > 0 && (o.isPullMode() || !o.active) { // Pull consumer. We run the dtmr all the time for this one. stopAndClearTimer(&o.dtmr) - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } // If we are not in ReplayInstant mode mark us as in replay state until resolved. @@ -1121,7 +1121,6 @@ func (o *consumer) setLeader(isLeader bool) { if pullMode { // Now start up Go routine to process inbound next message requests. go o.processInboundNextMsgReqs(qch) - } // If we are R>1 spin up our proposal loop. @@ -1140,7 +1139,10 @@ func (o *consumer) setLeader(isLeader bool) { close(o.qch) o.qch = nil } - // Make sure to clear out any re delivery queues + // Stop any inactivity timers. Should only be running on leaders. + stopAndClearTimer(&o.dtmr) + + // Make sure to clear out any re-deliver queues stopAndClearTimer(&o.ptmr) o.rdq, o.rdqi = nil, nil o.pending = nil @@ -1156,9 +1158,6 @@ func (o *consumer) setLeader(isLeader bool) { // Reset waiting if we are in pull mode. if o.isPullMode() { o.waiting = newWaitQueue(o.cfg.MaxWaiting) - if !o.isDurable() { - stopAndClearTimer(&o.dtmr) - } o.nextMsgReqs.drain() } else if o.srv.gateway.enabled { stopAndClearTimer(&o.gwdtmr) @@ -1349,7 +1348,7 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool { // If we do not have interest anymore and have a delete threshold set, then set // a timer to delete us. We wait for a bit in case of server reconnect. if !interest && o.dthresh > 0 { - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) return true } return false @@ -1376,7 +1375,7 @@ func (o *consumer) deleteNotActive() { if o.dtmr != nil { o.dtmr.Reset(o.dthresh - elapsed) } else { - o.dtmr = time.AfterFunc(o.dthresh-elapsed, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh-elapsed, o.deleteNotActive) } o.mu.Unlock() return @@ -1386,7 +1385,7 @@ func (o *consumer) deleteNotActive() { if o.dtmr != nil { o.dtmr.Reset(o.dthresh) } else { - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } o.mu.Unlock() return @@ -1640,7 +1639,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { stopAndClearTimer(&o.dtmr) // Restart timer only if we are the leader. if o.isLeader() && o.dthresh > 0 { - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index eaa412cc..3a816ce4 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -5026,3 +5026,49 @@ func TestJetStreamClusterOrphanConsumerSubjects(t *testing.T) { require_NotEqual(t, info.Cluster.Leader, "") require_Equal(t, len(info.Cluster.Replicas), 2) } + +func TestJetStreamClusterDurableConsumerInactiveThresholdLeaderSwitch(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Queue a msg. + sendStreamMsg(t, nc, "foo", "ok") + + thresh := 250 * time.Millisecond + + // This will start the timer. + sub, err := js.PullSubscribe("foo", "dlc", nats.InactiveThreshold(thresh)) + require_NoError(t, err) + + // Switch over leader. + cl := c.consumerLeader(globalAccountName, "TEST", "dlc") + cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "dlc") + c.waitOnConsumerLeader(globalAccountName, "TEST", "dlc") + + // Create activity on this consumer. + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_True(t, len(msgs) == 1) + + // This is consider activity as well. So we can watch now up to thresh to make sure consumer still active. + msgs[0].AckSync() + + // The consumer should not disappear for next `thresh` interval unless old leader does so. + timeout := time.Now().Add(thresh) + for time.Now().Before(timeout) { + _, err := js.ConsumerInfo("TEST", "dlc") + if err == nats.ErrConsumerNotFound { + t.Fatalf("Consumer deleted when it should not have been") + } + } +} From 0a86bf4a9a8fa4ee775a426b90cecd05da531f6c Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 21 Aug 2023 14:57:17 -0700 Subject: [PATCH 13/13] Should reset to false, not true when done Signed-off-by: Derek Collison --- server/jetstream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/jetstream.go b/server/jetstream.go index 7143c384..de79b9d4 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1818,7 +1818,7 @@ func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType) if !jsa.sync.CompareAndSwap(false, true) { return } - defer jsa.sync.Store(true) + defer jsa.sync.Store(false) // Hold the account read lock and the usage lock while we calculate. // We scope by tier and storage type, but if R3 File has 200 streams etc. could