From 84536761a93b5fee6c72e62df1b95627bb13bb3f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 22 Aug 2023 08:27:44 -0700 Subject: [PATCH 1/4] Bump to 2.9.22-beta Signed-off-by: Derek Collison --- server/const.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/const.go b/server/const.go index d5789cba..29b590a9 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.9.21" + VERSION = "2.9.22-beta" // PROTO is the currently supported protocol. // 0 was the original From 62242a72153eecdeec5dfb045b8210381754b4c1 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Tue, 22 Aug 2023 12:02:42 -0700 Subject: [PATCH 2/4] Fix JSON compatibility in conf format Signed-off-by: Waldemar Quevedo --- conf/lex.go | 1 + conf/lex_test.go | 40 ++++++++++ conf/parse.go | 8 +- conf/parse_test.go | 194 ++++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 231 insertions(+), 12 deletions(-) diff --git a/conf/lex.go b/conf/lex.go index caee9078..b18447df 100644 --- a/conf/lex.go +++ b/conf/lex.go @@ -78,6 +78,7 @@ const ( topOptTerm = '}' blockStart = '(' blockEnd = ')' + mapEndString = string(mapEnd) ) type stateFn func(lx *lexer) stateFn diff --git a/conf/lex_test.go b/conf/lex_test.go index 7fc4df43..5f54addf 100644 --- a/conf/lex_test.go +++ b/conf/lex_test.go @@ -1471,6 +1471,8 @@ func TestJSONCompat(t *testing.T) { expected: []item{ {itemKey, "http_port", 3, 28}, {itemInteger, "8223", 3, 40}, + {itemKey, "}", 4, 25}, + {itemEOF, "", 0, 0}, }, }, { @@ -1486,6 +1488,8 @@ func TestJSONCompat(t *testing.T) { {itemInteger, "8223", 3, 40}, {itemKey, "port", 4, 28}, {itemInteger, "4223", 4, 35}, + {itemKey, "}", 5, 25}, + {itemEOF, "", 0, 0}, }, }, { @@ -1510,6 +1514,8 @@ func TestJSONCompat(t *testing.T) { {itemBool, "true", 6, 36}, {itemKey, "max_control_line", 7, 28}, {itemInteger, "1024", 7, 47}, + {itemKey, "}", 8, 25}, + {itemEOF, "", 0, 0}, }, }, { @@ -1521,6 +1527,7 @@ func TestJSONCompat(t *testing.T) { {itemInteger, "8224", 1, 14}, {itemKey, "port", 1, 20}, {itemInteger, "4224", 1, 27}, + {itemEOF, "", 0, 0}, }, }, { @@ -1533,6 +1540,8 @@ func TestJSONCompat(t *testing.T) { {itemInteger, "8225", 1, 14}, {itemKey, "port", 1, 20}, {itemInteger, "4225", 1, 27}, + {itemKey, "}", 2, 25}, + {itemEOF, "", 0, 0}, }, }, { @@ -1557,6 +1566,8 @@ func TestJSONCompat(t *testing.T) { {itemString, "nats://127.0.0.1:4224", 1, 140}, {itemArrayEnd, "", 1, 163}, {itemMapEnd, "", 1, 164}, + {itemKey, "}", 14, 25}, + {itemEOF, "", 0, 0}, }, }, { @@ -1594,6 +1605,35 @@ func TestJSONCompat(t *testing.T) { {itemString, "nats://127.0.0.1:4224", 11, 32}, {itemArrayEnd, "", 12, 30}, {itemMapEnd, "", 13, 28}, + {itemKey, "}", 14, 25}, + {itemEOF, "", 0, 0}, + }, + }, + { + name: "should support JSON with blocks", + input: `{ + "jetstream": { + "store_dir": "/tmp/nats" + "max_mem": 1000000, + }, + "port": 4222, + "server_name": "nats1" + } + `, + expected: []item{ + {itemKey, "jetstream", 2, 28}, + {itemMapStart, "", 2, 41}, + {itemKey, "store_dir", 3, 30}, + {itemString, "/tmp/nats", 3, 43}, + {itemKey, "max_mem", 4, 30}, + {itemInteger, "1000000", 4, 40}, + {itemMapEnd, "", 5, 28}, + {itemKey, "port", 6, 28}, + {itemInteger, "4222", 6, 35}, + {itemKey, "server_name", 7, 28}, + {itemString, "nats1", 7, 43}, + {itemKey, "}", 8, 25}, + {itemEOF, "", 0, 0}, }, }, } { diff --git a/conf/parse.go b/conf/parse.go index 05713409..c104c307 100644 --- a/conf/parse.go +++ b/conf/parse.go @@ -137,16 +137,18 @@ func parse(data, fp string, pedantic bool) (p *parser, err error) { } p.pushContext(p.mapping) - var prevItem itemType + var prevItem item for { it := p.next() if it.typ == itemEOF { - if prevItem == itemKey { + // Here we allow the final character to be a bracket '}' + // in order to support JSON like configurations. + if prevItem.typ == itemKey && prevItem.val != mapEndString { return nil, fmt.Errorf("config is invalid (%s:%d:%d)", fp, it.line, it.pos) } break } - prevItem = it.typ + prevItem = it if err := p.processItem(it, fp); err != nil { return nil, err } diff --git a/conf/parse_test.go b/conf/parse_test.go index a6d5d2e8..4e965d69 100644 --- a/conf/parse_test.go +++ b/conf/parse_test.go @@ -442,15 +442,6 @@ func TestParseWithNoValuesAreInvalid(t *testing.T) { `, "config is invalid (:3:25)", }, - { - // trailing brackets accidentally can become keys, these are also invalid. - "trailing brackets after config", - ` - accounts { users = [{}]} - } - `, - "config is invalid (:4:25)", - }, } { t.Run(test.name, func(t *testing.T) { if _, err := parse(test.conf, "", true); err == nil { @@ -494,6 +485,48 @@ func TestParseWithNoValuesEmptyConfigsAreValid(t *testing.T) { } } +func TestParseWithTrailingBracketsAreValid(t *testing.T) { + for _, test := range []struct { + name string + conf string + }{ + { + "empty conf", + "{}", + }, + { + "just comments with no values", + ` + { + # comments in the body + } + `, + }, + { + // trailing brackets accidentally can become keys, + // this is valid since needed to support JSON like configs.. + "trailing brackets after config", + ` + accounts { users = [{}]} + } + `, + }, + { + "wrapped in brackets", + `{ + accounts { users = [{}]} + } + `, + }, + } { + t.Run(test.name, func(t *testing.T) { + if _, err := parse(test.conf, "", true); err != nil { + t.Errorf("unexpected error: %v", err) + } + }) + } +} + func TestParseWithNoValuesIncludes(t *testing.T) { for _, test := range []struct { input string @@ -564,3 +597,146 @@ func TestParseWithNoValuesIncludes(t *testing.T) { }) } } + +func TestJSONParseCompat(t *testing.T) { + for _, test := range []struct { + name string + input string + includes map[string]string + expected map[string]interface{} + }{ + { + "JSON with nested blocks", + ` + { + "http_port": 8227, + "port": 4227, + "write_deadline": "1h", + "cluster": { + "port": 6222, + "routes": [ + "nats://127.0.0.1:4222", + "nats://127.0.0.1:4223", + "nats://127.0.0.1:4224" + ] + } + } + `, + nil, + map[string]interface{}{ + "http_port": int64(8227), + "port": int64(4227), + "write_deadline": "1h", + "cluster": map[string]interface{}{ + "port": int64(6222), + "routes": []interface{}{ + "nats://127.0.0.1:4222", + "nats://127.0.0.1:4223", + "nats://127.0.0.1:4224", + }, + }, + }, + }, + { + "JSON with nested blocks", + `{ + "jetstream": { + "store_dir": "/tmp/nats" + "max_mem": 1000000, + }, + "port": 4222, + "server_name": "nats1" + } + `, + nil, + map[string]interface{}{ + "jetstream": map[string]interface{}{ + "store_dir": "/tmp/nats", + "max_mem": int64(1_000_000), + }, + "port": int64(4222), + "server_name": "nats1", + }, + }, + { + "JSON empty object in one line", + `{}`, + nil, + map[string]interface{}{}, + }, + { + "JSON empty object with line breaks", + ` + { + } + `, + nil, + map[string]interface{}{}, + }, + { + "JSON includes", + ` + accounts { + foo { include 'foo.json' } + bar { include 'bar.json' } + quux { include 'quux.json' } + } + `, + map[string]string{ + "foo.json": `{ "users": [ {"user": "foo"} ] }`, + "bar.json": `{ + "users": [ {"user": "bar"} ] + }`, + "quux.json": `{}`, + }, + map[string]interface{}{ + "accounts": map[string]interface{}{ + "foo": map[string]interface{}{ + "users": []interface{}{ + map[string]interface{}{ + "user": "foo", + }, + }, + }, + "bar": map[string]interface{}{ + "users": []interface{}{ + map[string]interface{}{ + "user": "bar", + }, + }, + }, + "quux": map[string]interface{}{}, + }, + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + sdir := t.TempDir() + f, err := os.CreateTemp(sdir, "nats.conf-") + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(f.Name(), []byte(test.input), 066); err != nil { + t.Error(err) + } + if test.includes != nil { + for includeFile, contents := range test.includes { + inf, err := os.Create(filepath.Join(sdir, includeFile)) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(inf.Name(), []byte(contents), 066); err != nil { + t.Error(err) + } + } + } + m, err := ParseFile(f.Name()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(m, test.expected) { + t.Fatalf("Not Equal:\nReceived: '%+v'\nExpected: '%+v'\n", m, test.expected) + } + }) + } +} From 2b2fbf7359f3bac04e1452adbe24d4abfcf25a8a Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Tue, 22 Aug 2023 13:37:12 -0700 Subject: [PATCH 3/4] Bump to v2.9.22-beta.1 Signed-off-by: Waldemar Quevedo --- server/const.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/const.go b/server/const.go index 29b590a9..a489fff9 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.9.22-beta" + VERSION = "2.9.22-beta.1" // PROTO is the currently supported protocol. // 0 was the original From ddb7f9f9d5ffe369998fe988a9dce6b42692935f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 22 Aug 2023 17:45:19 -0700 Subject: [PATCH 4/4] Fix for a peer-remove of an R1 that would brick the stream. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 7 +++++- server/jetstream_super_cluster_test.go | 31 ++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 52d498bd..52ccf5bd 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5184,7 +5184,12 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe return true } - // If we are here let's remove the peer at least. + // If R1 just return to avoid bricking the stream. + if sa.Group.node == nil || len(sa.Group.Peers) == 1 { + return false + } + + // If we are here let's remove the peer at least, as long as we are R>1 for i, peer := range sa.Group.Peers { if peer == removePeer { sa.Group.Peers[i] = sa.Group.Peers[len(sa.Group.Peers)-1] diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index d217f84c..f229d46a 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -4016,3 +4016,34 @@ func TestJetStreamSuperClusterMovingR1Stream(t *testing.T) { return nil }) } + +// https://github.com/nats-io/nats-server/issues/4396 +func TestJetStreamSuperClusterR1StreamPeerRemove(t *testing.T) { + sc := createJetStreamSuperCluster(t, 1, 3) + defer sc.shutdown() + + nc, js := jsClientConnect(t, sc.serverByName("C1-S1")) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 1, + }) + require_NoError(t, err) + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + + // Call peer remove on the only peer the leader. + resp, err := nc.Request(fmt.Sprintf(JSApiStreamRemovePeerT, "TEST"), []byte(`{"peer":"`+si.Cluster.Leader+`"}`), time.Second) + require_NoError(t, err) + var rpr JSApiStreamRemovePeerResponse + require_NoError(t, json.Unmarshal(resp.Data, &rpr)) + require_False(t, rpr.Success) + require_True(t, rpr.Error.ErrCode == 10075) + + // Stream should still be in place and useable. + _, err = js.StreamInfo("TEST") + require_NoError(t, err) +}