Files
nats-server/server/jetstream_cluster_3_test.go
Derek Collison 9774ad5641 Added check on publish error.
Signed-off-by: Derek Collison <derek@nats.io>
2022-09-22 07:13:57 -07:00

172 lines
5.1 KiB
Go

// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !skip_js_tests && !skip_js_cluster_tests_3
// +build !skip_js_tests,!skip_js_cluster_tests_3
package server
import (
"encoding/json"
"errors"
"strings"
"testing"
"time"
"github.com/nats-io/nats.go"
)
func TestJetStreamClusterRemovePeerByID(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 3,
})
require_NoError(t, err)
// Wait for a leader
c.waitOnStreamLeader(globalAccountName, "TEST")
// Get the name of the one that is not restarted
srvName := c.opts[2].ServerName
// And its node ID
peerID := c.servers[2].Node()
nc.Close()
// Now stop the whole cluster
c.stopAll()
// Restart all but one
for i := 0; i < 2; i++ {
opts := c.opts[i]
s, o := RunServerWithConfig(opts.ConfigFile)
c.servers[i] = s
c.opts[i] = o
}
c.waitOnClusterReadyWithNumPeers(2)
c.waitOnStreamLeader(globalAccountName, "TEST")
// Now attempt to remove by name, this should fail because the cluster
// was restarted and names are not persisted.
ml := c.leader()
nc, err = nats.Connect(ml.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
require_NoError(t, err)
defer nc.Close()
req := &JSApiMetaServerRemoveRequest{Server: srvName}
jsreq, err := json.Marshal(req)
require_NoError(t, err)
rmsg, err := nc.Request(JSApiRemoveServer, jsreq, 2*time.Second)
require_NoError(t, err)
var resp JSApiMetaServerRemoveResponse
err = json.Unmarshal(rmsg.Data, &resp)
require_NoError(t, err)
require_True(t, resp.Error != nil)
require_True(t, IsNatsErr(resp.Error, JSClusterServerNotMemberErr))
// Now try by ID, but first with an ID that does not match any peerID
req.Peer = "some_bad_id"
jsreq, err = json.Marshal(req)
require_NoError(t, err)
rmsg, err = nc.Request(JSApiRemoveServer, jsreq, 2*time.Second)
require_NoError(t, err)
resp = JSApiMetaServerRemoveResponse{}
err = json.Unmarshal(rmsg.Data, &resp)
require_NoError(t, err)
require_True(t, resp.Error != nil)
require_True(t, IsNatsErr(resp.Error, JSClusterServerNotMemberErr))
// Now with the proper peer ID
req.Peer = peerID
jsreq, err = json.Marshal(req)
require_NoError(t, err)
rmsg, err = nc.Request(JSApiRemoveServer, jsreq, 2*time.Second)
require_NoError(t, err)
resp = JSApiMetaServerRemoveResponse{}
err = json.Unmarshal(rmsg.Data, &resp)
require_NoError(t, err)
require_True(t, resp.Error == nil)
require_True(t, resp.Success)
}
func TestJetStreamClusterDiscardNewAndMaxMsgsPerSubject(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Client for API requests.
s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()
for _, test := range []struct {
name string
storage StorageType
replicas int
}{
{"MEM-R1", MemoryStorage, 1},
{"FILE-R1", FileStorage, 1},
{"MEM-R3", MemoryStorage, 3},
{"FILE-R3", FileStorage, 3},
} {
t.Run(test.name, func(t *testing.T) {
js.DeleteStream("KV")
// Make sure setting new without DiscardPolicy also being new is error.
cfg := &StreamConfig{
Name: "KV",
Subjects: []string{"KV.>"},
Storage: test.storage,
AllowDirect: true,
DiscardNewPer: true,
MaxMsgs: 10,
Replicas: test.replicas,
}
if _, apiErr := addStreamWithError(t, nc, cfg); apiErr == nil {
t.Fatalf("Expected API error but got none")
} else if apiErr.ErrCode != 10052 || !strings.Contains(apiErr.Description, "discard new per subject requires discard new policy") {
t.Fatalf("Got wrong error: %+v", apiErr)
}
// Set broad discard new policy to engage DiscardNewPer
cfg.Discard = DiscardNew
// We should also error here since we have not setup max msgs per subject.
if _, apiErr := addStreamWithError(t, nc, cfg); apiErr == nil {
t.Fatalf("Expected API error but got none")
} else if apiErr.ErrCode != 10052 || !strings.Contains(apiErr.Description, "discard new per subject requires max msgs per subject > 0") {
t.Fatalf("Got wrong error: %+v", apiErr)
}
cfg.MaxMsgsPer = 1
addStream(t, nc, cfg)
// We want to test that we reject new messages on a per subject basis if the
// max msgs per subject limit has been hit, even if other limits have not.
_, err := js.Publish("KV.foo", nil)
require_NoError(t, err)
_, err = js.Publish("KV.foo", nil)
// Go client does not have const for this one.
require_Error(t, err, errors.New("nats: maximum messages per subject exceeded"))
})
}
}