Files
nats-server/server/jetstream_cluster_3_test.go
2022-10-07 09:24:54 -07:00

463 lines
12 KiB
Go

// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !skip_js_tests && !skip_js_cluster_tests_3
// +build !skip_js_tests,!skip_js_cluster_tests_3
package server
import (
"encoding/json"
"errors"
"fmt"
"reflect"
"strings"
"testing"
"time"
"github.com/nats-io/nats.go"
)
// TestJetStreamClusterRemovePeerByID stops a three-server cluster, restarts
// only two of the servers, and then checks that the server left down cannot
// be removed by its server name or by an unknown peer ID, but can be removed
// using the node ID it reported before the shutdown.
func TestJetStreamClusterRemovePeerByID(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomNonLeader())
	defer nc.Close()

	streamCfg := &nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo", "bar"},
		Replicas: 3,
	}
	_, err := js.AddStream(streamCfg)
	require_NoError(t, err)

	// Wait for a leader
	c.waitOnStreamLeader(globalAccountName, "TEST")

	// Capture the name and node ID of the server that will not be restarted.
	downName, downPeerID := c.opts[2].ServerName, c.servers[2].Node()

	nc.Close()

	// Now stop the whole cluster
	c.stopAll()

	// Restart all but one
	for i, prev := range c.opts[:2] {
		srv, opts := RunServerWithConfig(prev.ConfigFile)
		c.servers[i] = srv
		c.opts[i] = opts
	}

	c.waitOnClusterReadyWithNumPeers(2)
	c.waitOnStreamLeader(globalAccountName, "TEST")

	// Reconnect to the meta leader for the removal requests below.
	nc, err = nats.Connect(c.leader().ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
	require_NoError(t, err)
	defer nc.Close()

	// removeServer sends the given removal request and returns the decoded reply.
	removeServer := func(req *JSApiMetaServerRemoveRequest) JSApiMetaServerRemoveResponse {
		payload, err := json.Marshal(req)
		require_NoError(t, err)
		reply, err := nc.Request(JSApiRemoveServer, payload, 2*time.Second)
		require_NoError(t, err)
		var out JSApiMetaServerRemoveResponse
		err = json.Unmarshal(reply.Data, &out)
		require_NoError(t, err)
		return out
	}

	// Now attempt to remove by name, this should fail because the cluster
	// was restarted and names are not persisted.
	req := &JSApiMetaServerRemoveRequest{Server: downName}
	resp := removeServer(req)
	require_True(t, resp.Error != nil)
	require_True(t, IsNatsErr(resp.Error, JSClusterServerNotMemberErr))

	// Now try by ID, but first with an ID that does not match any peerID
	req.Peer = "some_bad_id"
	resp = removeServer(req)
	require_True(t, resp.Error != nil)
	require_True(t, IsNatsErr(resp.Error, JSClusterServerNotMemberErr))

	// Now with the proper peer ID
	req.Peer = downPeerID
	resp = removeServer(req)
	require_True(t, resp.Error == nil)
	require_True(t, resp.Success)
}
// TestJetStreamClusterDiscardNewAndMaxMsgsPerSubject exercises the
// DiscardNewPer stream setting across memory/file storage and R1/R3 replicas.
// For each combination it checks that the setting is rejected without the
// DiscardNew policy, rejected without a per-subject message limit, and that
// once accepted a second publish to the same subject is refused.
func TestJetStreamClusterDiscardNewAndMaxMsgsPerSubject(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	// Client for API requests.
	nc, js := jsClientConnect(t, c.randomNonLeader())
	defer nc.Close()

	cases := []struct {
		name     string
		storage  StorageType
		replicas int
	}{
		{"MEM-R1", MemoryStorage, 1},
		{"FILE-R1", FileStorage, 1},
		{"MEM-R3", MemoryStorage, 3},
		{"FILE-R3", FileStorage, 3},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Drop any stream left by the previous case; the result is ignored
			// on purpose since nothing exists yet on the first run.
			js.DeleteStream("KV")

			cfg := &StreamConfig{
				Name:          "KV",
				Subjects:      []string{"KV.>"},
				Storage:       tc.storage,
				AllowDirect:   true,
				DiscardNewPer: true,
				MaxMsgs:       10,
				Replicas:      tc.replicas,
			}

			// mustReject tries to create the stream from the current cfg and
			// requires an API error with code 10052 whose description contains want.
			mustReject := func(want string) {
				_, apiErr := addStreamWithError(t, nc, cfg)
				if apiErr == nil {
					t.Fatalf("Expected API error but got none")
				}
				if apiErr.ErrCode != 10052 || !strings.Contains(apiErr.Description, want) {
					t.Fatalf("Got wrong error: %+v", apiErr)
				}
			}

			// Make sure setting new without DiscardPolicy also being new is error.
			mustReject("discard new per subject requires discard new policy")

			// Set broad discard new policy to engage DiscardNewPer
			cfg.Discard = DiscardNew
			// We should also error here since we have not setup max msgs per subject.
			mustReject("discard new per subject requires max msgs per subject > 0")

			cfg.MaxMsgsPer = 1
			addStream(t, nc, cfg)

			// We want to test that we reject new messages on a per subject basis if the
			// max msgs per subject limit has been hit, even if other limits have not.
			_, pubErr := js.Publish("KV.foo", nil)
			require_NoError(t, pubErr)

			_, pubErr = js.Publish("KV.foo", nil)
			// Go client does not have const for this one.
			require_Error(t, pubErr, errors.New("nats: maximum messages per subject exceeded"))
		})
	}
}
// TestJetStreamClusterCreateConsumerWithReplicaOneGetsResponse creates a
// durable consumer on an R3 stream without an explicit replica count, updates
// it to a single replica, and checks that the update gets a response and that
// the resulting consumer info reports Replicas == 1 with no cluster replicas.
func TestJetStreamClusterCreateConsumerWithReplicaOneGetsResponse(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomNonLeader())
	defer nc.Close()

	streamCfg := &nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	}
	_, err := js.AddStream(streamCfg)
	require_NoError(t, err)
	c.waitOnStreamLeader(globalAccountName, "TEST")

	// Initial consumer: no explicit replica count.
	initial := &nats.ConsumerConfig{
		Durable:   "C3",
		AckPolicy: nats.AckExplicitPolicy,
	}
	_, err = js.AddConsumer("TEST", initial)
	require_NoError(t, err)
	c.waitOnConsumerLeader(globalAccountName, "TEST", "C3")

	// Update to scale down to R1, that should work (get a response)
	scaledDown := &nats.ConsumerConfig{
		Durable:   "C3",
		AckPolicy: nats.AckExplicitPolicy,
		Replicas:  1,
	}
	_, err = js.UpdateConsumer("TEST", scaledDown)
	require_NoError(t, err)
	c.waitOnConsumerLeader(globalAccountName, "TEST", "C3")

	info, err := js.ConsumerInfo("TEST", "C3")
	require_NoError(t, err)
	require_True(t, info.Config.Replicas == 1)
	require_True(t, len(info.Cluster.Replicas) == 0)
}
// TestJetStreamClusterConsumerListPaging creates 3000 single-replica,
// memory-backed durable consumers on an R3 stream and then pages through both
// the consumer-names and consumer-list API subjects using the request's
// "offset" field. It verifies that no page holds more entries than the limit
// reported in the response, that no consumer is returned twice, and that
// paging eventually yields every consumer.
func TestJetStreamClusterConsumerListPaging(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	s := c.randomNonLeader()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	cfg := &nats.ConsumerConfig{
		Replicas:      1,
		MemoryStorage: true,
		AckPolicy:     nats.AckExplicitPolicy,
	}

	// create 3000 consumers.
	numConsumers := 3000
	for i := 1; i <= numConsumers; i++ {
		cfg.Durable = fmt.Sprintf("d-%.4d", i)
		_, err := js.AddConsumer("TEST", cfg)
		require_NoError(t, err)
	}

	// Test both names and list operations.

	// Names
	reqSubj := fmt.Sprintf(JSApiConsumersT, "TEST")
	// grabConsumerNames fetches one page of consumer names starting at offset.
	grabConsumerNames := func(offset int) []string {
		req := fmt.Sprintf(`{"offset":%d}`, offset)
		respMsg, err := nc.Request(reqSubj, []byte(req), time.Second)
		require_NoError(t, err)
		var resp JSApiConsumerNamesResponse
		err = json.Unmarshal(respMsg.Data, &resp)
		require_NoError(t, err)
		// Sanity check that we are actually paging properly around limits.
		if resp.Limit < len(resp.Consumers) {
			t.Fatalf("Expected total limited to %d but got %d", resp.Limit, len(resp.Consumers))
		}
		return resp.Consumers
	}

	results := make(map[string]bool)

	for offset := 0; len(results) < numConsumers; {
		consumers := grabConsumerNames(offset)
		// An empty page before every consumer has been seen would leave the
		// offset unchanged and spin forever; fail fast instead.
		if len(consumers) == 0 {
			t.Fatalf("Got empty names page at offset %d after %d of %d consumers", offset, len(results), numConsumers)
		}
		offset += len(consumers)
		for _, name := range consumers {
			if results[name] {
				t.Fatalf("Found duplicate %q", name)
			}
			results[name] = true
		}
	}

	// List
	reqSubj = fmt.Sprintf(JSApiConsumerListT, "TEST")
	// grabConsumerList fetches one page of consumer infos starting at offset.
	grabConsumerList := func(offset int) []*ConsumerInfo {
		req := fmt.Sprintf(`{"offset":%d}`, offset)
		respMsg, err := nc.Request(reqSubj, []byte(req), time.Second)
		require_NoError(t, err)
		var resp JSApiConsumerListResponse
		err = json.Unmarshal(respMsg.Data, &resp)
		require_NoError(t, err)
		// Sanity check that we are actually paging properly around limits.
		if resp.Limit < len(resp.Consumers) {
			t.Fatalf("Expected total limited to %d but got %d", resp.Limit, len(resp.Consumers))
		}
		return resp.Consumers
	}

	results = make(map[string]bool)

	for offset := 0; len(results) < numConsumers; {
		consumers := grabConsumerList(offset)
		// Same guard as for names: an empty page would never advance offset.
		if len(consumers) == 0 {
			t.Fatalf("Got empty list page at offset %d after %d of %d consumers", offset, len(results), numConsumers)
		}
		offset += len(consumers)
		for _, ci := range consumers {
			name := ci.Config.Durable
			if results[name] {
				t.Fatalf("Found duplicate %q", name)
			}
			results[name] = true
		}
	}
}
// TestJetStreamClusterMetaRecoveryLogic runs stream "TEST" through a series of
// add, update and delete operations with differing subjects and replica
// counts, restarts the whole cluster, and checks that the stream config
// reported after the restart equals the one reported just before it.
func TestJetStreamClusterMetaRecoveryLogic(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomNonLeader())
	defer nc.Close()

	// streamCfg builds a config for stream "TEST" with the given replica count and subjects.
	streamCfg := func(replicas int, subjects ...string) *nats.StreamConfig {
		return &nats.StreamConfig{
			Name:     "TEST",
			Subjects: subjects,
			Replicas: replicas,
		}
	}

	// Create as R3, then update to R1 with an extra subject, then delete.
	_, err := js.AddStream(streamCfg(3, "foo"))
	require_NoError(t, err)

	_, err = js.UpdateStream(streamCfg(1, "foo", "bar"))
	require_NoError(t, err)

	err = js.DeleteStream("TEST")
	require_NoError(t, err)

	// Recreate as R3 and delete again.
	_, err = js.AddStream(streamCfg(3, "foo"))
	require_NoError(t, err)

	err = js.DeleteStream("TEST")
	require_NoError(t, err)

	// Final incarnation: R1 on a different subject.
	_, err = js.AddStream(streamCfg(1, "baz"))
	require_NoError(t, err)

	before, err := js.StreamInfo("TEST")
	require_NoError(t, err)

	// Bounce every server and wait for the meta and stream leaders to return.
	c.stopAll()
	c.restartAll()
	c.waitOnLeader()
	c.waitOnStreamLeader("$G", "TEST")

	nc, js = jsClientConnect(t, c.randomNonLeader())
	defer nc.Close()

	after, err := js.StreamInfo("TEST")
	require_NoError(t, err)

	if !reflect.DeepEqual(after.Config, before.Config) {
		t.Fatalf("Expected %+v, but got %+v", before.Config, after.Config)
	}
}
// TestJetStreamClusterDeleteConsumerWhileServerDown deletes an R3 durable
// consumer while one server that is not the consumer leader is shut down,
// restarts that server, and checks the consumer is absent there. The scenario
// runs twice; the second time a meta snapshot is forced on the leader before
// the restart.
func TestJetStreamClusterDeleteConsumerWhileServerDown(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomNonLeader())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// addDurable creates the R3 durable "DC" through the current JetStream context.
	addDurable := func() {
		_, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
			Durable:   "DC",
			AckPolicy: nats.AckExplicitPolicy,
			Replicas:  3,
		})
		require_NoError(t, err)
	}

	addDurable()

	s := c.randomNonConsumerLeader("$G", "TEST", "DC")

	// waitHealthy polls healthz on the current s until it reports no error.
	waitHealthy := func() {
		checkFor(t, time.Second, 200*time.Millisecond, func() error {
			status := s.healthz(&HealthzOptions{
				JSEnabled:    true,
				JSServerOnly: false,
			})
			if status.Error == _EMPTY_ {
				return nil
			}
			return errors.New(status.Error)
		})
	}

	// requireConsumerGone checks that the current s has stream "TEST" but no consumer "DC".
	requireConsumerGone := func() {
		mset, err := s.GlobalAccount().lookupStream("TEST")
		require_NoError(t, err)
		if mset.lookupConsumer("DC") != nil {
			t.Fatalf("Expected to not find consumer, but did")
		}
	}

	s.Shutdown()

	c.waitOnLeader()                                 // In case that was metaleader.
	nc, js = jsClientConnect(t, c.randomNonLeader()) // In case we were connected there.
	defer nc.Close()

	err = js.DeleteConsumer("TEST", "DC")
	require_NoError(t, err)

	// Restart.
	s = c.restartServer(s)
	waitHealthy()

	// Make sure we can not see it on the server that was down at the time of delete.
	requireConsumerGone()

	// Now repeat but force a meta snapshot.
	addDurable()

	s = c.randomNonConsumerLeader("$G", "TEST", "DC")
	s.Shutdown()

	c.waitOnLeader()                                 // In case that was metaleader.
	nc, js = jsClientConnect(t, c.randomNonLeader()) // In case we were connected there.
	defer nc.Close()

	err = js.DeleteConsumer("TEST", "DC")
	require_NoError(t, err)

	err = c.leader().JetStreamSnapshotMeta()
	require_NoError(t, err)

	// Restart.
	s = c.restartServer(s)
	waitHealthy()

	// Make sure we can not see it on the server that was down at the time of delete.
	requireConsumerGone()
}