Files
nats-server/server/jetstream_cluster_3_test.go
Derek Collison ff2cd1d7f9 Fixed test and bug that would override consumer replicas.
Signed-off-by: Derek Collison <derek@nats.io>
2022-10-25 14:35:20 -07:00

939 lines
25 KiB
Go

// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !skip_js_tests && !skip_js_cluster_tests_3
// +build !skip_js_tests,!skip_js_cluster_tests_3
package server
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/nats-io/nats.go"
)
func TestJetStreamClusterRemovePeerByID(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 3,
})
require_NoError(t, err)
// Wait for a leader
c.waitOnStreamLeader(globalAccountName, "TEST")
// Get the name of the one that is not restarted
srvName := c.opts[2].ServerName
// And its node ID
peerID := c.servers[2].Node()
nc.Close()
// Now stop the whole cluster
c.stopAll()
// Restart all but one
for i := 0; i < 2; i++ {
opts := c.opts[i]
s, o := RunServerWithConfig(opts.ConfigFile)
c.servers[i] = s
c.opts[i] = o
}
c.waitOnClusterReadyWithNumPeers(2)
c.waitOnStreamLeader(globalAccountName, "TEST")
// Now attempt to remove by name, this should fail because the cluster
// was restarted and names are not persisted.
ml := c.leader()
nc, err = nats.Connect(ml.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
require_NoError(t, err)
defer nc.Close()
req := &JSApiMetaServerRemoveRequest{Server: srvName}
jsreq, err := json.Marshal(req)
require_NoError(t, err)
rmsg, err := nc.Request(JSApiRemoveServer, jsreq, 2*time.Second)
require_NoError(t, err)
var resp JSApiMetaServerRemoveResponse
err = json.Unmarshal(rmsg.Data, &resp)
require_NoError(t, err)
require_True(t, resp.Error != nil)
require_True(t, IsNatsErr(resp.Error, JSClusterServerNotMemberErr))
// Now try by ID, but first with an ID that does not match any peerID
req.Peer = "some_bad_id"
jsreq, err = json.Marshal(req)
require_NoError(t, err)
rmsg, err = nc.Request(JSApiRemoveServer, jsreq, 2*time.Second)
require_NoError(t, err)
resp = JSApiMetaServerRemoveResponse{}
err = json.Unmarshal(rmsg.Data, &resp)
require_NoError(t, err)
require_True(t, resp.Error != nil)
require_True(t, IsNatsErr(resp.Error, JSClusterServerNotMemberErr))
// Now with the proper peer ID
req.Peer = peerID
jsreq, err = json.Marshal(req)
require_NoError(t, err)
rmsg, err = nc.Request(JSApiRemoveServer, jsreq, 2*time.Second)
require_NoError(t, err)
resp = JSApiMetaServerRemoveResponse{}
err = json.Unmarshal(rmsg.Data, &resp)
require_NoError(t, err)
require_True(t, resp.Error == nil)
require_True(t, resp.Success)
}
func TestJetStreamClusterDiscardNewAndMaxMsgsPerSubject(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Client for API requests.
s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()
for _, test := range []struct {
name string
storage StorageType
replicas int
}{
{"MEM-R1", MemoryStorage, 1},
{"FILE-R1", FileStorage, 1},
{"MEM-R3", MemoryStorage, 3},
{"FILE-R3", FileStorage, 3},
} {
t.Run(test.name, func(t *testing.T) {
js.DeleteStream("KV")
// Make sure setting new without DiscardPolicy also being new is error.
cfg := &StreamConfig{
Name: "KV",
Subjects: []string{"KV.>"},
Storage: test.storage,
AllowDirect: true,
DiscardNewPer: true,
MaxMsgs: 10,
Replicas: test.replicas,
}
if _, apiErr := addStreamWithError(t, nc, cfg); apiErr == nil {
t.Fatalf("Expected API error but got none")
} else if apiErr.ErrCode != 10052 || !strings.Contains(apiErr.Description, "discard new per subject requires discard new policy") {
t.Fatalf("Got wrong error: %+v", apiErr)
}
// Set broad discard new policy to engage DiscardNewPer
cfg.Discard = DiscardNew
// We should also error here since we have not setup max msgs per subject.
if _, apiErr := addStreamWithError(t, nc, cfg); apiErr == nil {
t.Fatalf("Expected API error but got none")
} else if apiErr.ErrCode != 10052 || !strings.Contains(apiErr.Description, "discard new per subject requires max msgs per subject > 0") {
t.Fatalf("Got wrong error: %+v", apiErr)
}
cfg.MaxMsgsPer = 1
addStream(t, nc, cfg)
// We want to test that we reject new messages on a per subject basis if the
// max msgs per subject limit has been hit, even if other limits have not.
_, err := js.Publish("KV.foo", nil)
require_NoError(t, err)
_, err = js.Publish("KV.foo", nil)
// Go client does not have const for this one.
require_Error(t, err, errors.New("nats: maximum messages per subject exceeded"))
})
}
}
func TestJetStreamClusterCreateConsumerWithReplicaOneGetsResponse(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "C3",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)
c.waitOnConsumerLeader(globalAccountName, "TEST", "C3")
// Update to scale down to R1, that should work (get a response)
_, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{
Durable: "C3",
AckPolicy: nats.AckExplicitPolicy,
Replicas: 1,
})
require_NoError(t, err)
c.waitOnConsumerLeader(globalAccountName, "TEST", "C3")
ci, err := js.ConsumerInfo("TEST", "C3")
require_NoError(t, err)
require_True(t, ci.Config.Replicas == 1)
require_True(t, len(ci.Cluster.Replicas) == 0)
}
func TestJetStreamClusterConsumerListPaging(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
cfg := &nats.ConsumerConfig{
Replicas: 1,
MemoryStorage: true,
AckPolicy: nats.AckExplicitPolicy,
}
// create 3000 consumers.
numConsumers := 3000
for i := 1; i <= numConsumers; i++ {
cfg.Durable = fmt.Sprintf("d-%.4d", i)
_, err := js.AddConsumer("TEST", cfg)
require_NoError(t, err)
}
// Test both names and list operations.
// Names
reqSubj := fmt.Sprintf(JSApiConsumersT, "TEST")
grabConsumerNames := func(offset int) []string {
req := fmt.Sprintf(`{"offset":%d}`, offset)
respMsg, err := nc.Request(reqSubj, []byte(req), time.Second)
require_NoError(t, err)
var resp JSApiConsumerNamesResponse
err = json.Unmarshal(respMsg.Data, &resp)
require_NoError(t, err)
// Sanity check that we are actually paging properly around limits.
if resp.Limit < len(resp.Consumers) {
t.Fatalf("Expected total limited to %d but got %d", resp.Limit, len(resp.Consumers))
}
return resp.Consumers
}
results := make(map[string]bool)
for offset := 0; len(results) < numConsumers; {
consumers := grabConsumerNames(offset)
offset += len(consumers)
for _, name := range consumers {
if results[name] {
t.Fatalf("Found duplicate %q", name)
}
results[name] = true
}
}
// List
reqSubj = fmt.Sprintf(JSApiConsumerListT, "TEST")
grabConsumerList := func(offset int) []*ConsumerInfo {
req := fmt.Sprintf(`{"offset":%d}`, offset)
respMsg, err := nc.Request(reqSubj, []byte(req), time.Second)
require_NoError(t, err)
var resp JSApiConsumerListResponse
err = json.Unmarshal(respMsg.Data, &resp)
require_NoError(t, err)
// Sanity check that we are actually paging properly around limits.
if resp.Limit < len(resp.Consumers) {
t.Fatalf("Expected total limited to %d but got %d", resp.Limit, len(resp.Consumers))
}
return resp.Consumers
}
results = make(map[string]bool)
for offset := 0; len(results) < numConsumers; {
consumers := grabConsumerList(offset)
offset += len(consumers)
for _, ci := range consumers {
name := ci.Config.Durable
if results[name] {
t.Fatalf("Found duplicate %q", name)
}
results[name] = true
}
}
}
func TestJetStreamClusterMetaRecoveryLogic(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 1,
})
require_NoError(t, err)
err = js.DeleteStream("TEST")
require_NoError(t, err)
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
err = js.DeleteStream("TEST")
require_NoError(t, err)
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"baz"},
Replicas: 1,
})
require_NoError(t, err)
osi, err := js.StreamInfo("TEST")
require_NoError(t, err)
c.stopAll()
c.restartAll()
c.waitOnLeader()
c.waitOnStreamLeader("$G", "TEST")
s = c.randomNonLeader()
nc, js = jsClientConnect(t, s)
defer nc.Close()
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
if !reflect.DeepEqual(si.Config, osi.Config) {
t.Fatalf("Expected %+v, but got %+v", osi.Config, si.Config)
}
}
func TestJetStreamClusterDeleteConsumerWhileServerDown(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomNonLeader())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "DC",
AckPolicy: nats.AckExplicitPolicy,
Replicas: 3,
})
require_NoError(t, err)
s := c.randomNonConsumerLeader("$G", "TEST", "DC")
s.Shutdown()
c.waitOnLeader() // In case that was metaleader.
nc, js = jsClientConnect(t, c.randomNonLeader()) // In case we were connected there.
defer nc.Close()
err = js.DeleteConsumer("TEST", "DC")
require_NoError(t, err)
// Restart.
s = c.restartServer(s)
checkFor(t, time.Second, 200*time.Millisecond, func() error {
hs := s.healthz(&HealthzOptions{
JSEnabled: true,
JSServerOnly: false,
})
if hs.Error != _EMPTY_ {
return errors.New(hs.Error)
}
return nil
})
// Make sure we can not see it on the server that was down at the time of delete.
mset, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
if o := mset.lookupConsumer("DC"); o != nil {
t.Fatalf("Expected to not find consumer, but did")
}
// Now repeat but force a meta snapshot.
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "DC",
AckPolicy: nats.AckExplicitPolicy,
Replicas: 3,
})
require_NoError(t, err)
s = c.randomNonConsumerLeader("$G", "TEST", "DC")
s.Shutdown()
c.waitOnLeader() // In case that was metaleader.
nc, js = jsClientConnect(t, c.randomNonLeader()) // In case we were connected there.
defer nc.Close()
err = js.DeleteConsumer("TEST", "DC")
require_NoError(t, err)
err = c.leader().JetStreamSnapshotMeta()
require_NoError(t, err)
// Restart.
s = c.restartServer(s)
checkFor(t, time.Second, 200*time.Millisecond, func() error {
hs := s.healthz(&HealthzOptions{
JSEnabled: true,
JSServerOnly: false,
})
if hs.Error != _EMPTY_ {
return errors.New(hs.Error)
}
return nil
})
// Make sure we can not see it on the server that was down at the time of delete.
mset, err = s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
if o := mset.lookupConsumer("DC"); o != nil {
t.Fatalf("Expected to not find consumer, but did")
}
}
func TestJetStreamClusterNegativeReplicas(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
testBadReplicas := func(t *testing.T, s *Server, name string) {
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: name,
Replicas: -1,
})
require_Error(t, err, NewJSReplicasCountCannotBeNegativeError())
_, err = js.AddStream(&nats.StreamConfig{
Name: name,
Replicas: 1,
})
require_NoError(t, err)
// Check upadte now.
_, err = js.UpdateStream(&nats.StreamConfig{
Name: name,
Replicas: -11,
})
require_Error(t, err, NewJSReplicasCountCannotBeNegativeError())
// Now same for consumers
durName := fmt.Sprintf("%s_dur", name)
_, err = js.AddConsumer(name, &nats.ConsumerConfig{
Durable: durName,
Replicas: -1,
})
require_Error(t, err, NewJSReplicasCountCannotBeNegativeError())
_, err = js.AddConsumer(name, &nats.ConsumerConfig{
Durable: durName,
Replicas: 1,
})
require_NoError(t, err)
// Check update now
_, err = js.UpdateConsumer(name, &nats.ConsumerConfig{
Durable: durName,
Replicas: -11,
})
require_Error(t, err, NewJSReplicasCountCannotBeNegativeError())
}
t.Run("Standalone", func(t *testing.T) { testBadReplicas(t, s, "TEST1") })
t.Run("Clustered", func(t *testing.T) { testBadReplicas(t, c.randomServer(), "TEST2") })
}
func TestJetStreamClusterUserGivenConsName(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
test := func(t *testing.T, s *Server, stream string, replicas int, cons string) {
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: stream,
Replicas: replicas,
})
require_NoError(t, err)
cc := &CreateConsumerRequest{
Stream: stream,
Config: ConsumerConfig{
Name: cons,
FilterSubject: stream,
InactiveThreshold: 10 * time.Second,
},
}
subj := fmt.Sprintf(JSApiConsumerCreateExT, stream, cons, stream)
req, err := json.Marshal(cc)
require_NoError(t, err)
reply, err := nc.Request(subj, req, 2*time.Second)
require_NoError(t, err)
var cresp JSApiConsumerCreateResponse
json.Unmarshal(reply.Data, &cresp)
if cresp.Error != nil {
t.Fatalf("Unexpected error: %v", cresp.Error)
}
require_Equal(t, cresp.Name, cons)
require_Equal(t, cresp.Config.Name, cons)
// Resend the add request but before change something that the server
// should reject since the consumer already exist and we don't support
// the update of the consumer that way.
cc.Config.DeliverPolicy = DeliverNew
req, err = json.Marshal(cc)
require_NoError(t, err)
reply, err = nc.Request(subj, req, 2*time.Second)
require_NoError(t, err)
cresp = JSApiConsumerCreateResponse{}
json.Unmarshal(reply.Data, &cresp)
require_Error(t, cresp.Error, NewJSConsumerCreateError(errors.New("deliver policy can not be updated")))
}
t.Run("Standalone", func(t *testing.T) { test(t, s, "TEST", 1, "cons") })
t.Run("Clustered R1", func(t *testing.T) { test(t, c.randomServer(), "TEST2", 1, "cons2") })
t.Run("Clustered R3", func(t *testing.T) { test(t, c.randomServer(), "TEST3", 3, "cons3") })
}
func TestJetStreamClusterUserGivenConsNameWithLeaderChange(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R5S", 5)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")
for i := 0; i < 100; i++ {
sendStreamMsg(t, nc, "foo", "msg")
}
consName := "myephemeral"
cc := &CreateConsumerRequest{
Stream: "TEST",
Config: ConsumerConfig{
Name: consName,
FilterSubject: "foo",
InactiveThreshold: time.Hour,
Replicas: 3,
},
}
subj := fmt.Sprintf(JSApiConsumerCreateExT, "TEST", consName, "foo")
req, err := json.Marshal(cc)
require_NoError(t, err)
reply, err := nc.Request(subj, req, 2*time.Second)
require_NoError(t, err)
var cresp JSApiConsumerCreateResponse
json.Unmarshal(reply.Data, &cresp)
if cresp.Error != nil {
t.Fatalf("Unexpected error: %v", cresp.Error)
}
require_Equal(t, cresp.Name, consName)
require_Equal(t, cresp.Config.Name, consName)
// Consumer leader name
clname := cresp.ConsumerInfo.Cluster.Leader
nreq := &JSApiConsumerGetNextRequest{Batch: 1, Expires: time.Second}
req, err = json.Marshal(nreq)
require_NoError(t, err)
sub := natsSubSync(t, nc, "xxx")
rsubj := fmt.Sprintf(JSApiRequestNextT, "TEST", consName)
err = nc.PublishRequest(rsubj, "xxx", req)
require_NoError(t, err)
msg := natsNexMsg(t, sub, time.Second)
require_Equal(t, string(msg.Data), "msg")
// Shutdown the consumer leader
cl := c.serverByName(clname)
cl.Shutdown()
// Wait for a bit to be sure that we lost leadership
time.Sleep(250 * time.Millisecond)
// Wait for new leader
c.waitOnStreamLeader(globalAccountName, "TEST")
c.waitOnConsumerLeader(globalAccountName, "TEST", consName)
// Make sure we can still consume.
err = nc.PublishRequest(rsubj, "xxx", req)
require_NoError(t, err)
msg = natsNexMsg(t, sub, time.Second)
require_Equal(t, string(msg.Data), "msg")
}
func TestJetStreamClusterMirrorCrossDomainOnLeadnodeNoSystemShare(t *testing.T) {
tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: HUB, store_dir:", 1)
c := createJetStreamCluster(t, tmpl, "CORE", _EMPTY_, 3, 18033, true)
defer c.shutdown()
tmpl = strings.Replace(jsClusterTemplWithSingleLeafNode, "store_dir:", "domain: SPOKE, store_dir:", 1)
ln := c.createLeafNodeWithTemplateNoSystem("LN-SPOKE", tmpl)
defer ln.Shutdown()
checkLeafNodeConnectedCount(t, ln, 1)
// Create origin stream in hub.
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
MaxMsgsPerSubject: 10,
AllowDirect: true,
})
require_NoError(t, err)
// Now create the mirror on the leafnode.
lnc, ljs := jsClientConnect(t, ln)
defer lnc.Close()
_, err = ljs.AddStream(&nats.StreamConfig{
Name: "M",
MaxMsgsPerSubject: 10,
AllowDirect: true,
MirrorDirect: true,
Mirror: &nats.StreamSource{
Name: "TEST",
External: &nats.ExternalStream{
APIPrefix: "$JS.HUB.API",
},
},
})
require_NoError(t, err)
// Publish to the hub stream and make sure the mirror gets those messages.
for i := 0; i < 20; i++ {
js.Publish("foo", nil)
}
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_True(t, si.State.Msgs == 10)
checkFor(t, time.Second, 200*time.Millisecond, func() error {
si, err := ljs.StreamInfo("M")
require_NoError(t, err)
if si.State.Msgs == 10 {
return nil
}
return fmt.Errorf("State not current: %+v", si.State)
})
}
func TestJetStreamClusterFirstSeqMismatch(t *testing.T) {
c := createJetStreamClusterWithTemplateAndModHook(t, jsClusterTempl, "C", 3,
func(serverName, clusterName, storeDir, conf string) string {
tf := createFile(t, "")
logName := tf.Name()
tf.Close()
return fmt.Sprintf("%s\nlogfile: '%s'", conf, logName)
})
defer c.shutdown()
rs := c.randomServer()
nc, js := jsClientConnect(t, rs)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
MaxAge: 2 * time.Second,
})
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")
mset, err := c.streamLeader(globalAccountName, "TEST").GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
node := mset.raftNode()
nl := c.randomNonStreamLeader(globalAccountName, "TEST")
if rs == nl {
nc.Close()
for _, s := range c.servers {
if s != nl {
nc, _ = jsClientConnect(t, s)
defer nc.Close()
break
}
}
}
wg := sync.WaitGroup{}
wg.Add(1)
ch := make(chan struct{})
go func() {
defer wg.Done()
for i := 0; ; i++ {
sendStreamMsg(t, nc, "foo", "msg")
select {
case <-ch:
return
default:
}
}
}()
time.Sleep(2500 * time.Millisecond)
nl.Shutdown()
time.Sleep(500 * time.Millisecond)
node.InstallSnapshot(mset.stateSnapshot())
time.Sleep(3500 * time.Millisecond)
c.restartServer(nl)
c.waitOnAllCurrent()
close(ch)
wg.Wait()
log := nl.getOpts().LogFile
nl.Shutdown()
content, err := os.ReadFile(log)
require_NoError(t, err)
if bytes.Contains(content, []byte(errFirstSequenceMismatch.Error())) {
t.Fatalf("First sequence mismatch occurred!")
}
}
func TestJetStreamClusterConsumerInactiveThreshold(t *testing.T) {
// Create a standalone, a cluster, and a super cluster
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
sc := createJetStreamSuperCluster(t, 3, 2)
defer sc.shutdown()
test := func(t *testing.T, c *cluster, s *Server, replicas int) {
if c != nil {
s = c.randomServer()
}
nc, js := jsClientConnect(t, s)
defer nc.Close()
sname := fmt.Sprintf("TEST%d", replicas)
_, err := js.AddStream(&nats.StreamConfig{
Name: sname,
Subjects: []string{sname},
Replicas: replicas,
})
require_NoError(t, err)
if c != nil {
c.waitOnStreamLeader(globalAccountName, sname)
}
for i := 0; i < 10; i++ {
js.PublishAsync(sname, []byte("ok"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
waitOnCleanup := func(ci *nats.ConsumerInfo) {
t.Helper()
checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
_, err := js.ConsumerInfo(ci.Stream, ci.Name)
if err == nil {
return fmt.Errorf("Consumer still present")
}
return nil
})
}
// Test to make sure inactive threshold is enforced for all types.
// Ephemeral and Durable, both push and pull.
// Ephemeral Push (no bind to deliver subject)
ci, err := js.AddConsumer(sname, &nats.ConsumerConfig{
DeliverSubject: "_no_bind_",
InactiveThreshold: 50 * time.Millisecond,
})
require_NoError(t, err)
waitOnCleanup(ci)
// Ephemeral Pull
ci, err = js.AddConsumer(sname, &nats.ConsumerConfig{
AckPolicy: nats.AckExplicitPolicy,
InactiveThreshold: 50 * time.Millisecond,
})
require_NoError(t, err)
waitOnCleanup(ci)
// Support InactiveThresholds for Durables as well.
// Durable Push (no bind to deliver subject)
ci, err = js.AddConsumer(sname, &nats.ConsumerConfig{
Durable: "d1",
DeliverSubject: "_no_bind_",
InactiveThreshold: 50 * time.Millisecond,
})
require_NoError(t, err)
waitOnCleanup(ci)
// Durable Push (no bind to deliver subject) with an activity
// threshold set after creation
ci, err = js.AddConsumer(sname, &nats.ConsumerConfig{
Durable: "d2",
DeliverSubject: "_no_bind_",
})
require_NoError(t, err)
if c != nil {
c.waitOnConsumerLeader(globalAccountName, sname, "d2")
}
_, err = js.UpdateConsumer(sname, &nats.ConsumerConfig{
Durable: "d2",
DeliverSubject: "_no_bind_",
InactiveThreshold: 50 * time.Millisecond,
})
require_NoError(t, err)
waitOnCleanup(ci)
// Durable Pull
ci, err = js.AddConsumer(sname, &nats.ConsumerConfig{
Durable: "d3",
AckPolicy: nats.AckExplicitPolicy,
InactiveThreshold: 50 * time.Millisecond,
})
require_NoError(t, err)
waitOnCleanup(ci)
// Durable Pull with an inactivity threshold set after creation
ci, err = js.AddConsumer(sname, &nats.ConsumerConfig{
Durable: "d4",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)
if c != nil {
c.waitOnConsumerLeader(globalAccountName, sname, "d4")
}
_, err = js.UpdateConsumer(sname, &nats.ConsumerConfig{
Durable: "d4",
AckPolicy: nats.AckExplicitPolicy,
InactiveThreshold: 50 * time.Millisecond,
})
require_NoError(t, err)
waitOnCleanup(ci)
}
t.Run("standalone", func(t *testing.T) { test(t, nil, s, 1) })
t.Run("cluster-r1", func(t *testing.T) { test(t, c, nil, 1) })
t.Run("cluster-r3", func(t *testing.T) { test(t, c, nil, 3) })
t.Run("super-cluster-r1", func(t *testing.T) { test(t, sc.randomCluster(), nil, 1) })
t.Run("super-cluster-r3", func(t *testing.T) { test(t, sc.randomCluster(), nil, 3) })
}