Files
nats-server/server/jetstream_cluster_3_test.go
Tomasz Pietrek 02ba78454d Fix new replicas late MaxAge expiry
This commit fixes an issue when scaling up a stream that has MaxAge set
and already holds some older messages. Until now, old messages were not
properly expired on new replicas, because a new replica's first expiry
timer was always set to the full MaxAge duration.
This commit adds a check for whether a received message expires before
MaxAge elapses, meaning the message is older than the replica itself.
https://github.com/nats-io/nats-server/issues/3848

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
2023-02-24 00:46:02 +01:00

2928 lines
75 KiB
Go

// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !skip_js_tests && !skip_js_cluster_tests_3
// +build !skip_js_tests,!skip_js_cluster_tests_3
package server
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"math/rand"
"net"
"os"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/nats-io/nats.go"
)
// TestJetStreamClusterRemovePeerByID verifies that a peer can be removed from
// the meta group by its raft node ID. After a full cluster restart, server
// names are no longer resolvable for a peer that did not come back, so removal
// by name must fail and removal by peer ID must succeed.
func TestJetStreamClusterRemovePeerByID(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	s := c.randomNonLeader()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	// Create an R3 stream so all three servers participate.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo", "bar"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// Wait for a leader
	c.waitOnStreamLeader(globalAccountName, "TEST")

	// Get the name of the one that is not restarted
	srvName := c.opts[2].ServerName
	// And its node ID
	peerID := c.servers[2].Node()

	nc.Close()
	// Now stop the whole cluster
	c.stopAll()
	// Restart all but one
	for i := 0; i < 2; i++ {
		opts := c.opts[i]
		s, o := RunServerWithConfig(opts.ConfigFile)
		c.servers[i] = s
		c.opts[i] = o
	}
	c.waitOnClusterReadyWithNumPeers(2)
	c.waitOnStreamLeader(globalAccountName, "TEST")

	// Now attempt to remove by name, this should fail because the cluster
	// was restarted and names are not persisted.
	ml := c.leader()
	nc, err = nats.Connect(ml.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
	require_NoError(t, err)
	defer nc.Close()

	req := &JSApiMetaServerRemoveRequest{Server: srvName}
	jsreq, err := json.Marshal(req)
	require_NoError(t, err)
	rmsg, err := nc.Request(JSApiRemoveServer, jsreq, 2*time.Second)
	require_NoError(t, err)
	var resp JSApiMetaServerRemoveResponse
	err = json.Unmarshal(rmsg.Data, &resp)
	require_NoError(t, err)
	// Expect a "not a member" error for the stale name.
	require_True(t, resp.Error != nil)
	require_True(t, IsNatsErr(resp.Error, JSClusterServerNotMemberErr))

	// Now try by ID, but first with an ID that does not match any peerID
	req.Peer = "some_bad_id"
	jsreq, err = json.Marshal(req)
	require_NoError(t, err)
	rmsg, err = nc.Request(JSApiRemoveServer, jsreq, 2*time.Second)
	require_NoError(t, err)
	resp = JSApiMetaServerRemoveResponse{}
	err = json.Unmarshal(rmsg.Data, &resp)
	require_NoError(t, err)
	require_True(t, resp.Error != nil)
	require_True(t, IsNatsErr(resp.Error, JSClusterServerNotMemberErr))

	// Now with the proper peer ID
	req.Peer = peerID
	jsreq, err = json.Marshal(req)
	require_NoError(t, err)
	rmsg, err = nc.Request(JSApiRemoveServer, jsreq, 2*time.Second)
	require_NoError(t, err)
	resp = JSApiMetaServerRemoveResponse{}
	err = json.Unmarshal(rmsg.Data, &resp)
	require_NoError(t, err)
	// Removal by raft peer ID should succeed.
	require_True(t, resp.Error == nil)
	require_True(t, resp.Success)
}
// TestJetStreamClusterDiscardNewAndMaxMsgsPerSubject checks config validation
// and enforcement for DiscardNewPer: it requires DiscardNew policy and a
// MaxMsgsPer > 0, and once configured, publishes beyond the per-subject limit
// must be rejected. Runs across memory/file storage and R1/R3 replication.
func TestJetStreamClusterDiscardNewAndMaxMsgsPerSubject(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	// Client for API requests.
	s := c.randomNonLeader()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	for _, test := range []struct {
		name     string
		storage  StorageType
		replicas int
	}{
		{"MEM-R1", MemoryStorage, 1},
		{"FILE-R1", FileStorage, 1},
		{"MEM-R3", MemoryStorage, 3},
		{"FILE-R3", FileStorage, 3},
	} {
		t.Run(test.name, func(t *testing.T) {
			// Best effort cleanup from the previous sub-test iteration.
			js.DeleteStream("KV")
			// Make sure setting new without DiscardPolicy also being new is error.
			cfg := &StreamConfig{
				Name:          "KV",
				Subjects:      []string{"KV.>"},
				Storage:       test.storage,
				AllowDirect:   true,
				DiscardNewPer: true,
				MaxMsgs:       10,
				Replicas:      test.replicas,
			}
			if _, apiErr := addStreamWithError(t, nc, cfg); apiErr == nil {
				t.Fatalf("Expected API error but got none")
			} else if apiErr.ErrCode != 10052 || !strings.Contains(apiErr.Description, "discard new per subject requires discard new policy") {
				t.Fatalf("Got wrong error: %+v", apiErr)
			}

			// Set broad discard new policy to engage DiscardNewPer
			cfg.Discard = DiscardNew
			// We should also error here since we have not setup max msgs per subject.
			if _, apiErr := addStreamWithError(t, nc, cfg); apiErr == nil {
				t.Fatalf("Expected API error but got none")
			} else if apiErr.ErrCode != 10052 || !strings.Contains(apiErr.Description, "discard new per subject requires max msgs per subject > 0") {
				t.Fatalf("Got wrong error: %+v", apiErr)
			}

			// With both prerequisites satisfied the stream should be created.
			cfg.MaxMsgsPer = 1
			addStream(t, nc, cfg)

			// We want to test that we reject new messages on a per subject basis if the
			// max msgs per subject limit has been hit, even if other limits have not.
			_, err := js.Publish("KV.foo", nil)
			require_NoError(t, err)
			_, err = js.Publish("KV.foo", nil)
			// Go client does not have const for this one.
			require_Error(t, err, errors.New("nats: maximum messages per subject exceeded"))
		})
	}
}
// TestJetStreamClusterCreateConsumerWithReplicaOneGetsResponse verifies that
// scaling a consumer down from R3 to R1 via UpdateConsumer gets a proper API
// response and that the resulting consumer reports a single replica with no
// cluster peers.
func TestJetStreamClusterCreateConsumerWithReplicaOneGetsResponse(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	s := c.randomNonLeader()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)
	c.waitOnStreamLeader(globalAccountName, "TEST")

	// Start with an R3 durable (inherits stream replicas).
	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "C3",
		AckPolicy: nats.AckExplicitPolicy,
	})
	require_NoError(t, err)
	c.waitOnConsumerLeader(globalAccountName, "TEST", "C3")

	// Update to scale down to R1, that should work (get a response)
	_, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "C3",
		AckPolicy: nats.AckExplicitPolicy,
		Replicas:  1,
	})
	require_NoError(t, err)
	c.waitOnConsumerLeader(globalAccountName, "TEST", "C3")

	ci, err := js.ConsumerInfo("TEST", "C3")
	require_NoError(t, err)
	// R1 means one replica in the config and no additional cluster peers.
	require_True(t, ci.Config.Replicas == 1)
	require_True(t, len(ci.Cluster.Replicas) == 0)
}
// TestJetStreamClusterMetaRecoveryLogic exercises meta-layer log replay across
// a full cluster restart: a stream is created, updated, deleted and recreated
// several times (with differing replica counts and subjects), then the cluster
// is bounced and the recovered stream config must match the last one written.
func TestJetStreamClusterMetaRecoveryLogic(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	s := c.randomNonLeader()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	// Build up a sequence of meta entries: add R3, scale/update to R1,
	// delete, add R3 again, delete again, then add the final R1 version.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)
	_, err = js.UpdateStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo", "bar"},
		Replicas: 1,
	})
	require_NoError(t, err)
	err = js.DeleteStream("TEST")
	require_NoError(t, err)
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)
	err = js.DeleteStream("TEST")
	require_NoError(t, err)
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"baz"},
		Replicas: 1,
	})
	require_NoError(t, err)

	// Snapshot the expected (final) config before the restart.
	osi, err := js.StreamInfo("TEST")
	require_NoError(t, err)

	c.stopAll()
	c.restartAll()
	c.waitOnLeader()
	c.waitOnStreamLeader("$G", "TEST")

	// Reconnect since our old server may not be up yet or be the same one.
	s = c.randomNonLeader()
	nc, js = jsClientConnect(t, s)
	defer nc.Close()

	si, err := js.StreamInfo("TEST")
	require_NoError(t, err)
	if !reflect.DeepEqual(si.Config, osi.Config) {
		t.Fatalf("Expected %+v, but got %+v", osi.Config, si.Config)
	}
}
// TestJetStreamClusterDeleteConsumerWhileServerDown verifies that a consumer
// deleted while one of its replica servers is down does not reappear on that
// server after it restarts and catches up — both via normal raft replay and
// after the meta state has been compacted into a snapshot.
func TestJetStreamClusterDeleteConsumerWhileServerDown(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomNonLeader())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "DC",
		AckPolicy: nats.AckExplicitPolicy,
		Replicas:  3,
	})
	require_NoError(t, err)

	// Take down a server that hosts the consumer but is not its leader.
	s := c.randomNonConsumerLeader("$G", "TEST", "DC")
	s.Shutdown()

	c.waitOnLeader()                                 // In case that was metaleader.
	nc, js = jsClientConnect(t, c.randomNonLeader()) // In case we were connected there.
	defer nc.Close()

	// Delete the consumer while that server is offline.
	err = js.DeleteConsumer("TEST", "DC")
	require_NoError(t, err)

	// Restart.
	s = c.restartServer(s)
	checkFor(t, time.Second, 200*time.Millisecond, func() error {
		hs := s.healthz(&HealthzOptions{
			JSEnabledOnly: false,
			JSServerOnly:  false,
		})
		if hs.Error != _EMPTY_ {
			return errors.New(hs.Error)
		}
		return nil
	})

	// Make sure we can not see it on the server that was down at the time of delete.
	mset, err := s.GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)
	if o := mset.lookupConsumer("DC"); o != nil {
		t.Fatalf("Expected to not find consumer, but did")
	}

	// Now repeat but force a meta snapshot.
	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "DC",
		AckPolicy: nats.AckExplicitPolicy,
		Replicas:  3,
	})
	require_NoError(t, err)

	s = c.randomNonConsumerLeader("$G", "TEST", "DC")
	s.Shutdown()

	c.waitOnLeader()                                 // In case that was metaleader.
	nc, js = jsClientConnect(t, c.randomNonLeader()) // In case we were connected there.
	defer nc.Close()

	err = js.DeleteConsumer("TEST", "DC")
	require_NoError(t, err)

	// Snapshot so the restarted server recovers from the snapshot, not replay.
	err = c.leader().JetStreamSnapshotMeta()
	require_NoError(t, err)

	// Restart.
	s = c.restartServer(s)
	checkFor(t, time.Second, 200*time.Millisecond, func() error {
		hs := s.healthz(&HealthzOptions{
			JSEnabledOnly: false,
			JSServerOnly:  false,
		})
		if hs.Error != _EMPTY_ {
			return errors.New(hs.Error)
		}
		return nil
	})

	// Make sure we can not see it on the server that was down at the time of delete.
	mset, err = s.GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)
	if o := mset.lookupConsumer("DC"); o != nil {
		t.Fatalf("Expected to not find consumer, but did")
	}
}
// TestJetStreamClusterNegativeReplicas verifies that negative replica counts
// are rejected on both stream and consumer create and update, in standalone
// and clustered modes.
func TestJetStreamClusterNegativeReplicas(t *testing.T) {
	s := RunBasicJetStreamServer(t)
	defer s.Shutdown()

	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	// Shared body run against both a standalone server and a cluster node.
	testBadReplicas := func(t *testing.T, s *Server, name string) {
		nc, js := jsClientConnect(t, s)
		defer nc.Close()

		_, err := js.AddStream(&nats.StreamConfig{
			Name:     name,
			Replicas: -1,
		})
		require_Error(t, err, NewJSReplicasCountCannotBeNegativeError())

		_, err = js.AddStream(&nats.StreamConfig{
			Name:     name,
			Replicas: 1,
		})
		require_NoError(t, err)

		// Check update now.
		_, err = js.UpdateStream(&nats.StreamConfig{
			Name:     name,
			Replicas: -11,
		})
		require_Error(t, err, NewJSReplicasCountCannotBeNegativeError())

		// Now same for consumers
		durName := fmt.Sprintf("%s_dur", name)
		_, err = js.AddConsumer(name, &nats.ConsumerConfig{
			Durable:  durName,
			Replicas: -1,
		})
		require_Error(t, err, NewJSReplicasCountCannotBeNegativeError())

		_, err = js.AddConsumer(name, &nats.ConsumerConfig{
			Durable:  durName,
			Replicas: 1,
		})
		require_NoError(t, err)

		// Check update now
		_, err = js.UpdateConsumer(name, &nats.ConsumerConfig{
			Durable:  durName,
			Replicas: -11,
		})
		require_Error(t, err, NewJSReplicasCountCannotBeNegativeError())
	}
	t.Run("Standalone", func(t *testing.T) { testBadReplicas(t, s, "TEST1") })
	t.Run("Clustered", func(t *testing.T) { testBadReplicas(t, c.randomServer(), "TEST2") })
}
// TestJetStreamClusterUserGivenConsName verifies that a consumer created via
// the extended create API with a user-supplied name keeps that name, and that
// re-sending the create request with a changed (non-updatable) config field is
// rejected rather than treated as an update.
func TestJetStreamClusterUserGivenConsName(t *testing.T) {
	s := RunBasicJetStreamServer(t)
	defer s.Shutdown()

	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	test := func(t *testing.T, s *Server, stream string, replicas int, cons string) {
		nc, js := jsClientConnect(t, s)
		defer nc.Close()

		_, err := js.AddStream(&nats.StreamConfig{
			Name:     stream,
			Replicas: replicas,
		})
		require_NoError(t, err)

		// Use the raw API request so we can set the consumer name explicitly.
		cc := &CreateConsumerRequest{
			Stream: stream,
			Config: ConsumerConfig{
				Name:              cons,
				FilterSubject:     stream,
				InactiveThreshold: 10 * time.Second,
			},
		}
		subj := fmt.Sprintf(JSApiConsumerCreateExT, stream, cons, stream)
		req, err := json.Marshal(cc)
		require_NoError(t, err)
		reply, err := nc.Request(subj, req, 2*time.Second)
		require_NoError(t, err)
		var cresp JSApiConsumerCreateResponse
		json.Unmarshal(reply.Data, &cresp)
		if cresp.Error != nil {
			t.Fatalf("Unexpected error: %v", cresp.Error)
		}
		require_Equal(t, cresp.Name, cons)
		require_Equal(t, cresp.Config.Name, cons)

		// Resend the add request but before change something that the server
		// should reject since the consumer already exists and we don't support
		// the update of the consumer that way.
		cc.Config.DeliverPolicy = DeliverNew
		req, err = json.Marshal(cc)
		require_NoError(t, err)
		reply, err = nc.Request(subj, req, 2*time.Second)
		require_NoError(t, err)
		cresp = JSApiConsumerCreateResponse{}
		json.Unmarshal(reply.Data, &cresp)
		require_Error(t, cresp.Error, NewJSConsumerCreateError(errors.New("deliver policy can not be updated")))
	}
	t.Run("Standalone", func(t *testing.T) { test(t, s, "TEST", 1, "cons") })
	t.Run("Clustered R1", func(t *testing.T) { test(t, c.randomServer(), "TEST2", 1, "cons2") })
	t.Run("Clustered R3", func(t *testing.T) { test(t, c.randomServer(), "TEST3", 3, "cons3") })
}
// TestJetStreamClusterUserGivenConsNameWithLeaderChange creates an ephemeral
// consumer with a user-given name (R3, long inactive threshold), shuts down
// its leader, and verifies that after a new leader is elected the consumer can
// still deliver messages to pull requests.
func TestJetStreamClusterUserGivenConsNameWithLeaderChange(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R5S", 5)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)
	c.waitOnStreamLeader(globalAccountName, "TEST")

	// Seed some messages to consume.
	for i := 0; i < 100; i++ {
		sendStreamMsg(t, nc, "foo", "msg")
	}

	// Create the named ephemeral through the extended create API.
	consName := "myephemeral"
	cc := &CreateConsumerRequest{
		Stream: "TEST",
		Config: ConsumerConfig{
			Name:              consName,
			FilterSubject:     "foo",
			InactiveThreshold: time.Hour,
			Replicas:          3,
		},
	}
	subj := fmt.Sprintf(JSApiConsumerCreateExT, "TEST", consName, "foo")
	req, err := json.Marshal(cc)
	require_NoError(t, err)
	reply, err := nc.Request(subj, req, 2*time.Second)
	require_NoError(t, err)
	var cresp JSApiConsumerCreateResponse
	json.Unmarshal(reply.Data, &cresp)
	if cresp.Error != nil {
		t.Fatalf("Unexpected error: %v", cresp.Error)
	}
	require_Equal(t, cresp.Name, consName)
	require_Equal(t, cresp.Config.Name, consName)

	// Consumer leader name
	clname := cresp.ConsumerInfo.Cluster.Leader

	// Fetch one message to prove the consumer works before the leader change.
	nreq := &JSApiConsumerGetNextRequest{Batch: 1, Expires: time.Second}
	req, err = json.Marshal(nreq)
	require_NoError(t, err)

	sub := natsSubSync(t, nc, "xxx")
	rsubj := fmt.Sprintf(JSApiRequestNextT, "TEST", consName)
	err = nc.PublishRequest(rsubj, "xxx", req)
	require_NoError(t, err)
	msg := natsNexMsg(t, sub, time.Second)
	require_Equal(t, string(msg.Data), "msg")

	// Shutdown the consumer leader
	cl := c.serverByName(clname)
	cl.Shutdown()

	// Wait for a bit to be sure that we lost leadership
	time.Sleep(250 * time.Millisecond)

	// Wait for new leader
	c.waitOnStreamLeader(globalAccountName, "TEST")
	c.waitOnConsumerLeader(globalAccountName, "TEST", consName)

	// Make sure we can still consume.
	// Allow one empty delivery (status message) before the real one arrives.
	for i := 0; i < 2; i++ {
		err = nc.PublishRequest(rsubj, "xxx", req)
		require_NoError(t, err)

		msg = natsNexMsg(t, sub, time.Second)
		if len(msg.Data) == 0 {
			continue
		}
		require_Equal(t, string(msg.Data), "msg")
		return
	}
	t.Fatal("Did not receive message")
}
// TestJetStreamClusterMirrorCrossDomainOnLeadnodeNoSystemShare verifies that a
// leafnode in its own JetStream domain (SPOKE), connected WITHOUT sharing the
// system account, can still mirror a stream from the hub domain via the
// external API prefix, and that the mirror stays in sync (including limits).
func TestJetStreamClusterMirrorCrossDomainOnLeadnodeNoSystemShare(t *testing.T) {
	tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: HUB, store_dir:", 1)
	c := createJetStreamCluster(t, tmpl, "CORE", _EMPTY_, 3, 18033, true)
	defer c.shutdown()

	tmpl = strings.Replace(jsClusterTemplWithSingleLeafNode, "store_dir:", "domain: SPOKE, store_dir:", 1)
	ln := c.createLeafNodeWithTemplateNoSystem("LN-SPOKE", tmpl)
	defer ln.Shutdown()

	checkLeafNodeConnectedCount(t, ln, 1)

	// Create origin stream in hub.
	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:              "TEST",
		Subjects:          []string{"foo"},
		MaxMsgsPerSubject: 10,
		AllowDirect:       true,
	})
	require_NoError(t, err)

	// Now create the mirror on the leafnode.
	lnc, ljs := jsClientConnect(t, ln)
	defer lnc.Close()

	_, err = ljs.AddStream(&nats.StreamConfig{
		Name:              "M",
		MaxMsgsPerSubject: 10,
		AllowDirect:       true,
		MirrorDirect:      true,
		Mirror: &nats.StreamSource{
			Name: "TEST",
			External: &nats.ExternalStream{
				// Route mirror consumer traffic through the hub domain API.
				APIPrefix: "$JS.HUB.API",
			},
		},
	})
	require_NoError(t, err)

	// Publish to the hub stream and make sure the mirror gets those messages.
	for i := 0; i < 20; i++ {
		js.Publish("foo", nil)
	}

	// MaxMsgsPerSubject of 10 means only the last 10 are retained upstream.
	si, err := js.StreamInfo("TEST")
	require_NoError(t, err)
	require_True(t, si.State.Msgs == 10)

	checkFor(t, time.Second, 200*time.Millisecond, func() error {
		si, err := ljs.StreamInfo("M")
		require_NoError(t, err)
		if si.State.Msgs == 10 {
			return nil
		}
		return fmt.Errorf("State not current: %+v", si.State)
	})
}
// TestJetStreamClusterFirstSeqMismatch forces a replica to restart after the
// leader has installed a snapshot while MaxAge has been expiring messages,
// then scans the replica's log file to assert that no "first sequence
// mismatch" error was produced during catch-up. Timing here is deliberate:
// sleeps line up with the 2s MaxAge so messages expire across the outage.
func TestJetStreamClusterFirstSeqMismatch(t *testing.T) {
	// Mod hook adds a logfile to each server config so we can inspect it later.
	c := createJetStreamClusterWithTemplateAndModHook(t, jsClusterTempl, "C", 3,
		func(serverName, clusterName, storeDir, conf string) string {
			tf := createTempFile(t, "")
			logName := tf.Name()
			tf.Close()
			return fmt.Sprintf("%s\nlogfile: '%s'", conf, logName)
		})
	defer c.shutdown()

	rs := c.randomServer()
	nc, js := jsClientConnect(t, rs)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
		MaxAge:   2 * time.Second,
	})
	require_NoError(t, err)

	c.waitOnStreamLeader(globalAccountName, "TEST")

	mset, err := c.streamLeader(globalAccountName, "TEST").GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)
	node := mset.raftNode()

	nl := c.randomNonStreamLeader(globalAccountName, "TEST")
	// If we happen to be connected to the server we will shut down,
	// reconnect to another one first.
	if rs == nl {
		nc.Close()
		for _, s := range c.servers {
			if s != nl {
				nc, _ = jsClientConnect(t, s)
				defer nc.Close()
				break
			}
		}
	}

	// Continuously publish until told to stop, so sequences keep advancing
	// while the replica is down.
	wg := sync.WaitGroup{}
	wg.Add(1)
	ch := make(chan struct{})
	go func() {
		defer wg.Done()
		for i := 0; ; i++ {
			sendStreamMsg(t, nc, "foo", "msg")
			select {
			case <-ch:
				return
			default:
			}
		}
	}()

	time.Sleep(2500 * time.Millisecond)
	nl.Shutdown()

	time.Sleep(500 * time.Millisecond)
	// Force a snapshot on the leader so the restarted replica must catch up
	// from a snapshot whose first sequence has moved past what it stored.
	node.InstallSnapshot(mset.stateSnapshot())
	time.Sleep(3500 * time.Millisecond)

	c.restartServer(nl)
	c.waitOnAllCurrent()

	close(ch)
	wg.Wait()

	log := nl.getOpts().LogFile
	nl.Shutdown()

	content, err := os.ReadFile(log)
	require_NoError(t, err)
	if bytes.Contains(content, []byte(errFirstSequenceMismatch.Error())) {
		t.Fatalf("First sequence mismatch occurred!")
	}
}
// TestJetStreamClusterConsumerInactiveThreshold verifies that the
// InactiveThreshold is enforced for every consumer flavor — ephemeral and
// durable, push and pull, set at create time or via a later update — across
// standalone, cluster (R1/R3) and super-cluster topologies.
func TestJetStreamClusterConsumerInactiveThreshold(t *testing.T) {
	// Create a standalone, a cluster, and a super cluster
	s := RunBasicJetStreamServer(t)
	defer s.Shutdown()

	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	sc := createJetStreamSuperCluster(t, 3, 2)
	defer sc.shutdown()

	test := func(t *testing.T, c *cluster, s *Server, replicas int) {
		// When a cluster is provided, pick any of its servers to connect to.
		if c != nil {
			s = c.randomServer()
		}
		nc, js := jsClientConnect(t, s)
		defer nc.Close()

		sname := fmt.Sprintf("TEST%d", replicas)
		_, err := js.AddStream(&nats.StreamConfig{
			Name:     sname,
			Subjects: []string{sname},
			Replicas: replicas,
		})
		require_NoError(t, err)
		if c != nil {
			c.waitOnStreamLeader(globalAccountName, sname)
		}

		for i := 0; i < 10; i++ {
			js.PublishAsync(sname, []byte("ok"))
		}
		select {
		case <-js.PublishAsyncComplete():
		case <-time.After(5 * time.Second):
			t.Fatalf("Did not receive completion signal")
		}

		// Waits for the inactive consumer to be reaped by the server.
		waitOnCleanup := func(ci *nats.ConsumerInfo) {
			t.Helper()
			checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
				_, err := js.ConsumerInfo(ci.Stream, ci.Name)
				if err == nil {
					return fmt.Errorf("Consumer still present")
				}
				return nil
			})
		}

		// Test to make sure inactive threshold is enforced for all types.
		// Ephemeral and Durable, both push and pull.

		// Ephemeral Push (no bind to deliver subject)
		ci, err := js.AddConsumer(sname, &nats.ConsumerConfig{
			DeliverSubject:    "_no_bind_",
			InactiveThreshold: 50 * time.Millisecond,
		})
		require_NoError(t, err)
		waitOnCleanup(ci)

		// Ephemeral Pull
		ci, err = js.AddConsumer(sname, &nats.ConsumerConfig{
			AckPolicy:         nats.AckExplicitPolicy,
			InactiveThreshold: 50 * time.Millisecond,
		})
		require_NoError(t, err)
		waitOnCleanup(ci)

		// Support InactiveThresholds for Durables as well.

		// Durable Push (no bind to deliver subject)
		ci, err = js.AddConsumer(sname, &nats.ConsumerConfig{
			Durable:           "d1",
			DeliverSubject:    "_no_bind_",
			InactiveThreshold: 50 * time.Millisecond,
		})
		require_NoError(t, err)
		waitOnCleanup(ci)

		// Durable Push (no bind to deliver subject) with an activity
		// threshold set after creation
		ci, err = js.AddConsumer(sname, &nats.ConsumerConfig{
			Durable:        "d2",
			DeliverSubject: "_no_bind_",
		})
		require_NoError(t, err)
		if c != nil {
			c.waitOnConsumerLeader(globalAccountName, sname, "d2")
		}
		_, err = js.UpdateConsumer(sname, &nats.ConsumerConfig{
			Durable:           "d2",
			DeliverSubject:    "_no_bind_",
			InactiveThreshold: 50 * time.Millisecond,
		})
		require_NoError(t, err)
		waitOnCleanup(ci)

		// Durable Pull
		ci, err = js.AddConsumer(sname, &nats.ConsumerConfig{
			Durable:           "d3",
			AckPolicy:         nats.AckExplicitPolicy,
			InactiveThreshold: 50 * time.Millisecond,
		})
		require_NoError(t, err)
		waitOnCleanup(ci)

		// Durable Pull with an inactivity threshold set after creation
		ci, err = js.AddConsumer(sname, &nats.ConsumerConfig{
			Durable:   "d4",
			AckPolicy: nats.AckExplicitPolicy,
		})
		require_NoError(t, err)
		if c != nil {
			c.waitOnConsumerLeader(globalAccountName, sname, "d4")
		}
		_, err = js.UpdateConsumer(sname, &nats.ConsumerConfig{
			Durable:           "d4",
			AckPolicy:         nats.AckExplicitPolicy,
			InactiveThreshold: 50 * time.Millisecond,
		})
		require_NoError(t, err)
		waitOnCleanup(ci)
	}

	t.Run("standalone", func(t *testing.T) { test(t, nil, s, 1) })
	t.Run("cluster-r1", func(t *testing.T) { test(t, c, nil, 1) })
	t.Run("cluster-r3", func(t *testing.T) { test(t, c, nil, 3) })
	t.Run("super-cluster-r1", func(t *testing.T) { test(t, sc.randomCluster(), nil, 1) })
	t.Run("super-cluster-r3", func(t *testing.T) { test(t, sc.randomCluster(), nil, 3) })
}
// To capture our false warnings for clustered stream lag.
type testStreamLagWarnLogger struct {
	DummyLogger
	ch chan string
}

// Warnf inspects each warning line and forwards those mentioning high
// message lag on the logger's channel; when the channel is full the
// message is dropped rather than blocking the server.
func (l *testStreamLagWarnLogger) Warnf(format string, v ...interface{}) {
	msg := fmt.Sprintf(format, v...)
	if !strings.Contains(msg, "has high message lag") {
		return
	}
	select {
	case l.ch <- msg:
	default:
	}
}
// False triggering warnings on stream lag because not offsetting by failures.
// TestJetStreamClusterStreamLagWarning drives many duplicate publishes (which
// fail post-proposal and bump clfs) and asserts no spurious "high message lag"
// warning is emitted by the stream leader.
func TestJetStreamClusterStreamLagWarning(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// Install the capturing logger on the stream leader only.
	sl := c.streamLeader("$G", "TEST")
	l := &testStreamLagWarnLogger{ch: make(chan string, 10)}
	sl.SetLogger(l, false, false)

	// We only need to trigger post RAFT propose failures that increment mset.clfs.
	// Dedupe with msgIDs is one, so we will use that.
	m := nats.NewMsg("foo")
	m.Data = []byte("OK")
	m.Header.Set(JSMsgId, "zz")

	// Make sure we know we will trip the warning threshold.
	for i := 0; i < 2*streamLagWarnThreshold; i++ {
		js.PublishMsgAsync(m)
	}
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive completion signal")
	}

	select {
	case msg := <-l.ch:
		t.Fatalf("Unexpected msg lag warning seen: %s", msg)
	case <-time.After(100 * time.Millisecond):
		// OK
	}
}
// https://github.com/nats-io/nats-server/issues/3603
// TestJetStreamClusterSignalPullConsumersOnDelete verifies that pending pull
// fetches are signaled and released well before their MaxWait expires when
// either the consumer itself or its parent stream is deleted.
// Fixes two typos in the failure messages ("Took to long" -> "Took too long").
func TestJetStreamClusterSignalPullConsumersOnDelete(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// Create 2 pull consumers.
	sub1, err := js.PullSubscribe("foo", "d1")
	require_NoError(t, err)

	sub2, err := js.PullSubscribe("foo", "d2")
	require_NoError(t, err)

	// We want to make sure we get kicked out prior to the timeout
	// when consumers are being deleted or the parent stream is being deleted.
	// Note this should be lower case, Go client needs to be updated.
	expectedErr := errors.New("nats: consumer deleted")

	// Queue up the delete for sub1
	time.AfterFunc(250*time.Millisecond, func() { js.DeleteConsumer("TEST", "d1") })
	start := time.Now()
	_, err = sub1.Fetch(1, nats.MaxWait(10*time.Second))
	require_Error(t, err, expectedErr)

	// Check that we bailed early.
	if time.Since(start) > time.Second {
		t.Fatalf("Took too long to bail out on consumer delete")
	}

	// Same check, but now deleting the whole stream releases the fetch.
	time.AfterFunc(250*time.Millisecond, func() { js.DeleteStream("TEST") })
	start = time.Now()
	_, err = sub2.Fetch(1, nats.MaxWait(10*time.Second))
	require_Error(t, err, expectedErr)
	if time.Since(start) > time.Second {
		t.Fatalf("Took too long to bail out on stream delete")
	}
}
// https://github.com/nats-io/nats-server/issues/3559
// TestJetStreamClusterSourceWithOptStartTime checks that a source/mirror
// configured with OptStartTime does not re-ingest messages after a purge and
// a server/cluster restart (the defect was re-syncing from the start time).
func TestJetStreamClusterSourceWithOptStartTime(t *testing.T) {
	s := RunBasicJetStreamServer(t)
	defer s.Shutdown()

	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	test := func(t *testing.T, c *cluster, s *Server) {
		replicas := 1
		if c != nil {
			s = c.randomServer()
			replicas = 3
		}
		nc, js := jsClientConnect(t, s)
		defer nc.Close()

		_, err := js.AddStream(&nats.StreamConfig{
			Name:     "TEST",
			Subjects: []string{"foo"},
			Replicas: replicas,
		})
		require_NoError(t, err)

		// Both the source and the mirror start from 24h ago, which covers
		// everything ever published to TEST in this test.
		yesterday := time.Now().Add(-24 * time.Hour)
		_, err = js.AddStream(&nats.StreamConfig{
			Name:     "SOURCE",
			Replicas: replicas,
			Sources: []*nats.StreamSource{{
				Name:         "TEST",
				OptStartTime: &yesterday,
			}},
		})
		require_NoError(t, err)

		_, err = js.AddStream(&nats.StreamConfig{
			Name:     "MIRROR",
			Replicas: replicas,
			Mirror: &nats.StreamSource{
				Name:         "TEST",
				OptStartTime: &yesterday,
			},
		})
		require_NoError(t, err)

		total := 10
		for i := 0; i < total; i++ {
			sendStreamMsg(t, nc, "foo", "hello")
		}

		checkCount := func(sname string, expected int) {
			t.Helper()
			checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
				si, err := js.StreamInfo(sname)
				if err != nil {
					return err
				}
				if n := si.State.Msgs; n != uint64(expected) {
					return fmt.Errorf("Expected stream %q to have %v messages, got %v", sname, expected, n)
				}
				return nil
			})
		}
		checkCount("TEST", 10)
		checkCount("SOURCE", 10)
		checkCount("MIRROR", 10)

		// Purge the downstreams; after a restart they must stay empty and
		// NOT re-pull from TEST because of OptStartTime.
		err = js.PurgeStream("SOURCE")
		require_NoError(t, err)
		err = js.PurgeStream("MIRROR")
		require_NoError(t, err)

		checkCount("TEST", 10)
		checkCount("SOURCE", 0)
		checkCount("MIRROR", 0)

		nc.Close()
		if c != nil {
			c.stopAll()
			c.restartAll()

			c.waitOnStreamLeader(globalAccountName, "TEST")
			c.waitOnStreamLeader(globalAccountName, "SOURCE")
			c.waitOnStreamLeader(globalAccountName, "MIRROR")

			s = c.randomServer()
		} else {
			sd := s.JetStreamConfig().StoreDir
			s.Shutdown()
			s = RunJetStreamServerOnPort(-1, sd)
		}

		// Wait a bit before checking because sync'ing (even with the defect)
		// would not happen right away. I tried with 1 sec and test would pass,
		// so need to be at least that much.
		time.Sleep(2 * time.Second)

		nc, js = jsClientConnect(t, s)
		defer nc.Close()
		checkCount("TEST", 10)
		checkCount("SOURCE", 0)
		checkCount("MIRROR", 0)
	}
	t.Run("standalone", func(t *testing.T) { test(t, nil, s) })
	t.Run("cluster", func(t *testing.T) { test(t, c, nil) })
}
// networkCableUnplugged wraps a net.Conn to simulate a network partition.
// While unplugged, writes are silently buffered and reads block; once
// replugged, buffered bytes are flushed ahead of new writes and readers
// blocked in Read are released via the wait group.
type networkCableUnplugged struct {
	net.Conn
	sync.Mutex
	unplugged bool
	wb        bytes.Buffer
	wg        sync.WaitGroup
}

// Write buffers while unplugged, flushes any backlog first once replugged,
// and otherwise passes straight through to the underlying connection.
func (c *networkCableUnplugged) Write(b []byte) (int, error) {
	c.Lock()
	switch {
	case c.unplugged:
		// Cable out: swallow the bytes into the pending buffer and
		// report success so the writer does not error out.
		c.wb.Write(b)
		c.Unlock()
		return len(b), nil
	case c.wb.Len() > 0:
		// Just replugged: send everything buffered plus this write in
		// one shot, preserving original ordering.
		c.wb.Write(b)
		pending := c.wb.Bytes()
		c.wb.Reset()
		c.Unlock()
		if _, err := c.Conn.Write(pending); err != nil {
			return 0, err
		}
		return len(b), nil
	}
	c.Unlock()
	return c.Conn.Write(b)
}

// Read blocks on the wait group while the cable is unplugged, then reads
// from the underlying connection once it is released.
func (c *networkCableUnplugged) Read(b []byte) (int, error) {
	c.Lock()
	blocked := c.unplugged
	c.Unlock()
	if blocked {
		c.wg.Wait()
	}
	return c.Conn.Read(b)
}
// TestJetStreamClusterScaleDownWhileNoQuorum partitions the leader of an R2
// stream (by swapping its route connections for dropping ones), waits for the
// stream to lose its leader, attempts a scale-down to R1 while there is no
// quorum, then heals the partition and verifies the stream recovers a leader.
func TestJetStreamClusterScaleDownWhileNoQuorum(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R5S", 5)
	defer c.shutdown()

	s := c.randomServer()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	si, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 2,
	})
	require_NoError(t, err)

	for i := 0; i < 1000; i++ {
		sendStreamMsg(t, nc, "foo", "msg")
	}

	// Let's have a server from this R2 stream be network partitioned.
	// We will take the leader, but doesn't have to be.
	// To simulate partition, we will replace all its routes with a
	// special connection that drops messages.
	sl := c.serverByName(si.Cluster.Leader)
	// If we are connected to the server being partitioned, move elsewhere.
	if s == sl {
		nc.Close()
		for s = c.randomServer(); s != sl; s = c.randomServer() {
		}
		nc, js = jsClientConnect(t, s)
		defer nc.Close()
	}

	// Swap every route connection for the dropping wrapper ("unplug").
	sl.mu.Lock()
	for _, r := range sl.routes {
		r.mu.Lock()
		ncu := &networkCableUnplugged{Conn: r.nc, unplugged: true}
		ncu.wg.Add(1)
		r.nc = ncu
		r.mu.Unlock()
	}
	sl.mu.Unlock()

	// Wait for the stream info to fail
	checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
		si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second))
		if err != nil {
			return err
		}
		if si.Cluster.Leader == _EMPTY_ {
			return nil
		}
		return fmt.Errorf("stream still has a leader")
	})

	// Make sure if meta leader was on same server as stream leader we make sure
	// it elects new leader to receive update request.
	c.waitOnLeader()

	// Now try to edit the stream by making it an R1. In some case we get
	// a context deadline error, in some no error. So don't check the returned error.
	js.UpdateStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 1,
	}, nats.MaxWait(5*time.Second))

	// Heal the partition ("replug") and release any blocked readers.
	sl.mu.Lock()
	for _, r := range sl.routes {
		r.mu.Lock()
		ncu := r.nc.(*networkCableUnplugged)
		ncu.Lock()
		ncu.unplugged = false
		ncu.wg.Done()
		ncu.Unlock()
		r.mu.Unlock()
	}
	sl.mu.Unlock()

	checkClusterFormed(t, c.servers...)
	c.waitOnStreamLeader(globalAccountName, "TEST")
}
// We noticed that ha_assets enforcement seemed to not be upheld when assets created in a rapid fashion.
// TestJetStreamClusterHAssetsEnforcement sets max_ha_assets to 2 and checks
// that creating a third R3 stream is rejected.
func TestJetStreamClusterHAssetsEnforcement(t *testing.T) {
	tmpl := strings.Replace(jsClusterTempl, "store_dir:", "limits: {max_ha_assets: 2}, store_dir:", 1)
	c := createJetStreamClusterWithTemplateAndModHook(t, tmpl, "R3S", 3, nil)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// First two HA (R3) assets are within the limit.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST-1",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "TEST-2",
		Subjects: []string{"bar"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// Either error form is acceptable depending on where enforcement trips.
	exceededErrs := []error{errors.New("system limit reached"), errors.New("no suitable peers")}

	// Should fail.
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "TEST-3",
		Subjects: []string{"baz"},
		Replicas: 3,
	})
	require_Error(t, err, exceededErrs...)
}
// TestJetStreamClusterInterestStreamConsumer verifies interest retention with
// multiple pull consumers: messages must only be removed once every consumer
// has acked them, even when acks arrive in shuffled order.
func TestJetStreamClusterInterestStreamConsumer(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R5S", 5)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo"},
		Retention: nats.InterestPolicy,
		Replicas:  3,
	})
	require_NoError(t, err)

	// Create 5 durable pull consumers, all registering interest.
	var subs []*nats.Subscription
	ns := 5
	for i := 0; i < ns; i++ {
		dn := fmt.Sprintf("d%d", i)
		sub, err := js.PullSubscribe("foo", dn)
		require_NoError(t, err)
		subs = append(subs, sub)
	}

	// Send 10 msgs
	n := 10
	for i := 0; i < n; i++ {
		sendStreamMsg(t, nc, "foo", "msg")
	}

	// Collect all the messages.
	var msgs []*nats.Msg
	for _, sub := range subs {
		lmsgs := fetchMsgs(t, sub, n, time.Second)
		if len(lmsgs) != n {
			t.Fatalf("Did not receive all msgs: %d vs %d", len(lmsgs), n)
		}
		msgs = append(msgs, lmsgs...)
	}

	// Shuffle
	rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
	for _, m := range msgs {
		m.AckSync()
	}

	// Make sure replicated acks are processed.
	time.Sleep(250 * time.Millisecond)

	// All consumers acked everything, so interest retention should have
	// removed every message.
	si, err := js.StreamInfo("TEST")
	require_NoError(t, err)

	if si.State.Msgs != 0 {
		t.Fatalf("Should not have any messages left: %d of %d", si.State.Msgs, n)
	}
}
// TestJetStreamClusterNoPanicOnStreamInfoWhenNoLeaderYet hammers
// StreamInfo while the stream is being created; the server must not
// panic when no leader has been elected yet.
func TestJetStreamClusterNoPanicOnStreamInfoWhenNoLeaderYet(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc := natsConnect(t, c.randomServer().ClientURL())
	defer nc.Close()

	// Short MaxWait so the polling goroutine below cycles quickly.
	js, _ := nc.JetStream(nats.MaxWait(500 * time.Millisecond))

	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			js.StreamInfo("TEST")
			select {
			case <-done:
				return
			case <-time.After(15 * time.Millisecond):
			}
		}
	}()

	time.Sleep(250 * time.Millisecond)

	// Don't care if this succeeds or not (could get a context deadline
	// due to the low MaxWait() when creating the context).
	js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})

	close(done)
	wg.Wait()
}
// Issue https://github.com/nats-io/nats-server/issues/3630
// Verifies that every ack variant (Ack, Nak, Term, InProgress) resets a
// pull consumer's InactiveThreshold timer, and that the consumer is
// still reaped once activity stops.
func TestJetStreamClusterPullConsumerAcksExtendInactivityThreshold(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Fix: the AddStream error was previously ignored; a failed create
	// would otherwise surface later as a confusing fetch failure.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	n := 10
	for i := 0; i < n; i++ {
		sendStreamMsg(t, nc, "foo", "msg")
	}

	// Pull consumer that is reaped after one second of inactivity.
	sub, err := js.PullSubscribe("foo", "d", nats.InactiveThreshold(time.Second))
	require_NoError(t, err)

	fetchMsgs(t, sub, n/2, time.Second)
	// Will wait for .5s.
	time.Sleep(500 * time.Millisecond)
	msgs := fetchMsgs(t, sub, n/2, time.Second)
	if len(msgs) != n/2 {
		t.Fatalf("Did not receive msgs: %d vs %d", len(msgs), n/2)
	}
	// Each ack variant below lands inside the 1s threshold window and
	// should each extend the consumer's lifetime.
	time.Sleep(500 * time.Millisecond)
	msgs[0].Ack() // Ack
	time.Sleep(500 * time.Millisecond)
	msgs[1].Nak() // Nak
	time.Sleep(500 * time.Millisecond)
	msgs[2].Term() // Term
	time.Sleep(500 * time.Millisecond)
	msgs[3].InProgress() // WIP

	// The above should have kept the consumer alive.
	_, err = js.ConsumerInfo("TEST", "d")
	require_NoError(t, err)

	// With no further activity the consumer must be cleaned up.
	time.Sleep(2 * time.Second)
	_, err = js.ConsumerInfo("TEST", "d")
	require_Error(t, err, nats.ErrConsumerNotFound)
}
// https://github.com/nats-io/nats-server/issues/3677
// Races many identical AddStream calls; all must succeed (or be treated
// as an idempotent create) with no errors.
func TestJetStreamParallelStreamCreation(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	np := 20
	startCh := make(chan bool)
	errCh := make(chan error, np)
	wg := sync.WaitGroup{}
	wg.Add(np)
	for i := 0; i < np; i++ {
		go func() {
			defer wg.Done()
			// Individual connection per goroutine.
			nc, js := jsClientConnect(t, c.randomServer())
			defer nc.Close()
			// Make them all fire at once.
			<-startCh
			_, err := js.AddStream(&nats.StreamConfig{
				Name:     "TEST",
				Subjects: []string{"common.*.*"},
				Replicas: 3,
			})
			if err != nil {
				errCh <- err
			}
		}()
	}
	close(startCh)
	wg.Wait()

	// Improvement: include the first collected error in the failure
	// message (previously only the count was reported, hiding the cause).
	if len(errCh) > 0 {
		t.Fatalf("Expected no errors, got %d: first error: %v", len(errCh), <-errCh)
	}
}
// In addition to test above, if streams were attempted to be created in parallel
// it could be that multiple raft groups would be created for the same asset.
func TestJetStreamParallelStreamCreationDupeRaftGroups(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	np := 20
	startCh := make(chan bool)
	wg := sync.WaitGroup{}
	wg.Add(np)
	for i := 0; i < np; i++ {
		go func() {
			defer wg.Done()
			// Individual connection
			nc, _ := jsClientConnect(t, c.randomServer())
			js, _ := nc.JetStream(nats.MaxWait(time.Second))
			defer nc.Close()
			// Make them all fire at once.
			<-startCh
			// Ignore errors in this test, care about raft group and metastate.
			js.AddStream(&nats.StreamConfig{
				Name:     "TEST",
				Subjects: []string{"common.*.*"},
				Replicas: 3,
			})
		}()
	}
	close(startCh)
	wg.Wait()

	// Restart a server too.
	s := c.randomServer()
	s.Shutdown()
	s = c.restartServer(s)
	c.waitOnLeader()
	c.waitOnStreamLeader(globalAccountName, "TEST")

	// Check that this server has only two active raft nodes after restart.
	if nrn := s.numRaftNodes(); nrn != 2 {
		t.Fatalf("Expected only two active raft nodes, got %d", nrn)
	}

	// Make sure we only have 2 unique raft groups for all servers.
	// One for meta, one for stream.
	expected := 2
	rg := make(map[string]struct{})
	for _, s := range c.servers {
		s.mu.RLock()
		for _, ni := range s.raftNodes {
			n := ni.(*raft)
			rg[n.Group()] = struct{}{}
		}
		s.mu.RUnlock()
	}
	if len(rg) != expected {
		// Fix: failure message typo "go %d" -> "got %d".
		t.Fatalf("Expected only %d distinct raft groups for all servers, got %d", expected, len(rg))
	}
}
// TestJetStreamParallelConsumerCreation races many identical durable
// consumer creates and verifies exactly one consumer raft group results.
func TestJetStreamParallelConsumerCreation(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"common.*.*"},
		Replicas: 3,
	})
	require_NoError(t, err)

	np := 50
	startCh := make(chan bool)
	errCh := make(chan error, np)

	cfg := &nats.ConsumerConfig{
		Durable:  "dlc",
		Replicas: 3,
	}

	wg := sync.WaitGroup{}
	swg := sync.WaitGroup{}
	wg.Add(np)
	swg.Add(np)
	for i := 0; i < np; i++ {
		go func() {
			defer wg.Done()
			// Individual connection
			nc, js := jsClientConnect(t, c.randomServer())
			defer nc.Close()
			// swg signals this goroutine is connected and ready.
			swg.Done()
			// Make them all fire at once.
			<-startCh
			if _, err := js.AddConsumer("TEST", cfg); err != nil {
				errCh <- err
			}
		}()
	}
	// Wait until every worker is ready before releasing them together.
	swg.Wait()
	close(startCh)
	wg.Wait()

	if len(errCh) > 0 {
		t.Fatalf("Expected no errors, got %d", len(errCh))
	}

	// Make sure we only have 3 unique raft groups for all servers.
	// One for meta, one for stream, one for consumer.
	expected := 3
	rg := make(map[string]struct{})
	for _, s := range c.servers {
		s.mu.RLock()
		for _, ni := range s.raftNodes {
			n := ni.(*raft)
			rg[n.Group()] = struct{}{}
		}
		s.mu.RUnlock()
	}
	if len(rg) != expected {
		// Fix: failure message typo "go %d" -> "got %d".
		t.Fatalf("Expected only %d distinct raft groups for all servers, got %d", expected, len(rg))
	}
}
// TestJetStreamGhostEphemeralsAfterRestart verifies that memory-based
// ephemeral consumers which expire while the whole cluster is down do not
// linger as "ghost" entries in the meta state after restart.
func TestJetStreamGhostEphemeralsAfterRestart(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// Add in 100 memory based ephemerals.
	for i := 0; i < 100; i++ {
		_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
			Replicas:          1,
			InactiveThreshold: time.Second,
			MemoryStorage:     true,
		})
		require_NoError(t, err)
	}

	// Grab random server.
	rs := c.randomServer()
	// Now shutdown cluster.
	c.stopAll()
	// Let the consumers all expire.
	time.Sleep(2 * time.Second)
	// Restart first and wait so that we know it will try cleanup without a metaleader.
	c.restartServer(rs)
	time.Sleep(time.Second)

	c.restartAll()
	c.waitOnLeader()
	c.waitOnStreamLeader(globalAccountName, "TEST")

	nc, _ = jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Poll the consumer list API until neither live nor missing consumers
	// are reported for the stream.
	subj := fmt.Sprintf(JSApiConsumerListT, "TEST")
	checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
		m, err := nc.Request(subj, nil, time.Second)
		if err != nil {
			return err
		}
		var resp JSApiConsumerListResponse
		err = json.Unmarshal(m.Data, &resp)
		require_NoError(t, err)
		if len(resp.Consumers) != 0 {
			return fmt.Errorf("Still have %d consumers", len(resp.Consumers))
		}
		if len(resp.Missing) != 0 {
			return fmt.Errorf("Still have %d missing consumers", len(resp.Missing))
		}
		return nil
	})
}
// TestJetStreamClusterReplacementPolicyAfterPeerRemove checks that, in a
// 9-node cluster with a redundant server in each unique "cloud", removing
// an R3 stream peer yields an immediate replacement that still honors the
// unique_tag placement constraint.
func TestJetStreamClusterReplacementPolicyAfterPeerRemove(t *testing.T) {
	// R3 scenario where there is a redundant node in each unique cloud so removing a peer should result in
	// an immediate replacement also preserving cloud uniqueness.
	sc := createJetStreamClusterExplicit(t, "PR9", 9)
	sc.waitOnPeerCount(9)

	// reset forces a server to push a fresh statsz update so the meta
	// leader learns about the just-changed tags.
	reset := func(s *Server) {
		s.mu.Lock()
		rch := s.sys.resetCh
		s.mu.Unlock()
		if rch != nil {
			rch <- struct{}{}
		}
		s.sendStatszUpdate()
	}

	// Three servers per "cloud"; "cloud" is the unique placement tag.
	tags := []string{"cloud:aws", "cloud:aws", "cloud:aws", "cloud:gcp", "cloud:gcp", "cloud:gcp", "cloud:az", "cloud:az", "cloud:az"}
	var serverUTags = make(map[string]string)
	for i, s := range sc.servers {
		s.optsMu.Lock()
		serverUTags[s.Name()] = tags[i]
		s.opts.Tags.Add(tags[i])
		s.opts.JetStreamUniqueTag = "cloud"
		s.optsMu.Unlock()
		reset(s)
	}

	ml := sc.leader()
	js := ml.getJetStream()
	require_True(t, js != nil)
	js.mu.RLock()
	cc := js.cluster
	require_True(t, cc != nil)
	// Walk and make sure all tags are registered.
	// NOTE(review): this wait loop spins without sleeping and holds js.mu
	// read-locked across reset() calls; a require_True failure inside
	// would also exit with the lock held — confirm this is acceptable in
	// a test context.
	expires := time.Now().Add(10 * time.Second)
	for time.Now().Before(expires) {
		allOK := true
		for _, p := range cc.meta.Peers() {
			si, ok := ml.nodeToInfo.Load(p.ID)
			require_True(t, ok)
			ni := si.(nodeInfo)
			if len(ni.tags) == 0 {
				allOK = false
				reset(sc.serverByName(ni.name))
			}
		}
		if allOK {
			break
		}
	}
	js.mu.RUnlock()

	defer sc.shutdown()
	sc.waitOnClusterReadyWithNumPeers(9)

	s := sc.leader()
	nc, jsc := jsClientConnect(t, s)
	defer nc.Close()

	_, err := jsc.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)
	sc.waitOnStreamLeader(globalAccountName, "TEST")

	osi, err := jsc.StreamInfo("TEST")
	require_NoError(t, err)

	// Double check original placement honors unique_tag: every RG member
	// (leader plus replicas) must come from a distinct cloud.
	var uTags = make(map[string]struct{})
	uTags[serverUTags[osi.Cluster.Leader]] = struct{}{}
	for _, replica := range osi.Cluster.Replicas {
		evalTag := serverUTags[replica.Name]
		if _, exists := uTags[evalTag]; !exists {
			uTags[evalTag] = struct{}{}
			continue
		} else {
			t.Fatalf("expected initial placement to honor unique_tag")
		}
	}

	// Remove a peer and select replacement 5 times to avoid false good
	for i := 0; i < 5; i++ {
		// Remove 1 peer replica (this will be random cloud region as initial placement was randomized ordering)
		// After each successful iteration, osi will reflect the current RG peers
		toRemove := osi.Cluster.Replicas[0].Name
		resp, err := nc.Request(fmt.Sprintf(JSApiStreamRemovePeerT, "TEST"), []byte(`{"peer":"`+toRemove+`"}`), time.Second)
		require_NoError(t, err)
		var rpResp JSApiStreamRemovePeerResponse
		err = json.Unmarshal(resp.Data, &rpResp)
		require_NoError(t, err)
		require_True(t, rpResp.Success)

		sc.waitOnStreamLeader(globalAccountName, "TEST")
		checkFor(t, time.Second, 200*time.Millisecond, func() error {
			osi, err = jsc.StreamInfo("TEST")
			require_NoError(t, err)
			if len(osi.Cluster.Replicas) != 2 {
				return fmt.Errorf("expected R3, got R%d", len(osi.Cluster.Replicas)+1)
			}
			// STREAM.PEER.REMOVE is asynchronous command; make sure remove has occurred by
			// checking that the toRemove peer is gone.
			for _, replica := range osi.Cluster.Replicas {
				if replica.Name == toRemove {
					return fmt.Errorf("expected replaced replica, old replica still present")
				}
			}
			return nil
		})

		// Validate that replacement with new peer still honors
		uTags = make(map[string]struct{}) //reset
		uTags[serverUTags[osi.Cluster.Leader]] = struct{}{}
		for _, replica := range osi.Cluster.Replicas {
			evalTag := serverUTags[replica.Name]
			if _, exists := uTags[evalTag]; !exists {
				uTags[evalTag] = struct{}{}
				continue
			} else {
				t.Fatalf("expected new peer and revised placement to honor unique_tag")
			}
		}
	}
}
// TestJetStreamClusterReplacementPolicyAfterPeerRemoveNoPlace checks that
// when no eligible replacement exists (exactly one server per cloud),
// removing a peer leaves the stream at R2 rather than violating the
// unique_tag constraint.
func TestJetStreamClusterReplacementPolicyAfterPeerRemoveNoPlace(t *testing.T) {
	// R3 scenario where there are exactly three unique cloud nodes, so removing a peer should NOT
	// result in a new peer
	sc := createJetStreamClusterExplicit(t, "threeup", 3)
	sc.waitOnPeerCount(3)

	// reset forces a statsz update so the meta leader sees the new tags.
	reset := func(s *Server) {
		s.mu.Lock()
		rch := s.sys.resetCh
		s.mu.Unlock()
		if rch != nil {
			rch <- struct{}{}
		}
		s.sendStatszUpdate()
	}

	// One server per cloud; "cloud" is the unique placement tag.
	tags := []string{"cloud:aws", "cloud:gcp", "cloud:az"}
	var serverUTags = make(map[string]string)
	for i, s := range sc.servers {
		s.optsMu.Lock()
		serverUTags[s.Name()] = tags[i]
		s.opts.Tags.Add(tags[i])
		s.opts.JetStreamUniqueTag = "cloud"
		s.optsMu.Unlock()
		reset(s)
	}

	ml := sc.leader()
	js := ml.getJetStream()
	require_True(t, js != nil)
	js.mu.RLock()
	cc := js.cluster
	require_True(t, cc != nil)
	// Walk and make sure all tags are registered.
	// NOTE(review): busy-wait without sleeping while holding js.mu
	// read-locked; see the matching loop in
	// TestJetStreamClusterReplacementPolicyAfterPeerRemove.
	expires := time.Now().Add(10 * time.Second)
	for time.Now().Before(expires) {
		allOK := true
		for _, p := range cc.meta.Peers() {
			si, ok := ml.nodeToInfo.Load(p.ID)
			require_True(t, ok)
			ni := si.(nodeInfo)
			if len(ni.tags) == 0 {
				allOK = false
				reset(sc.serverByName(ni.name))
			}
		}
		if allOK {
			break
		}
	}
	js.mu.RUnlock()

	defer sc.shutdown()
	sc.waitOnClusterReadyWithNumPeers(3)

	s := sc.leader()
	nc, jsc := jsClientConnect(t, s)
	defer nc.Close()

	_, err := jsc.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)
	sc.waitOnStreamLeader(globalAccountName, "TEST")

	osi, err := jsc.StreamInfo("TEST")
	require_NoError(t, err)

	// Double check original placement honors unique_tag
	var uTags = make(map[string]struct{})
	uTags[serverUTags[osi.Cluster.Leader]] = struct{}{}
	for _, replica := range osi.Cluster.Replicas {
		evalTag := serverUTags[replica.Name]
		if _, exists := uTags[evalTag]; !exists {
			uTags[evalTag] = struct{}{}
			continue
		} else {
			t.Fatalf("expected initial placement to honor unique_tag")
		}
	}

	// Remove 1 peer replica (this will be random cloud region as initial placement was randomized ordering)
	_, err = nc.Request("$JS.API.STREAM.PEER.REMOVE.TEST", []byte(`{"peer":"`+osi.Cluster.Replicas[0].Name+`"}`), time.Second*10)
	require_NoError(t, err)
	sc.waitOnStreamLeader(globalAccountName, "TEST")

	// Verify R2 since no eligible peer can replace the removed peer without breaking unique constraint
	checkFor(t, time.Second, 200*time.Millisecond, func() error {
		osi, err = jsc.StreamInfo("TEST")
		require_NoError(t, err)
		if len(osi.Cluster.Replicas) != 1 {
			return fmt.Errorf("expected R2, got R%d", len(osi.Cluster.Replicas)+1)
		}
		return nil
	})

	// Validate that remaining members still honor unique tags
	uTags = make(map[string]struct{}) //reset
	uTags[serverUTags[osi.Cluster.Leader]] = struct{}{}
	for _, replica := range osi.Cluster.Replicas {
		evalTag := serverUTags[replica.Name]
		if _, exists := uTags[evalTag]; !exists {
			uTags[evalTag] = struct{}{}
			continue
		} else {
			t.Fatalf("expected revised placement to honor unique_tag")
		}
	}
}
// https://github.com/nats-io/nats-server/issues/3191
// Verifies a pull consumer spanning leafnodes does not deliver duplicate
// messages, including after a leafnode reconnects to a different hub server.
func TestJetStreamClusterLeafnodeDuplicateConsumerMessages(t *testing.T) {
	// Cluster B
	c := createJetStreamCluster(t, jsClusterTempl, "B", _EMPTY_, 2, 22020, false)
	defer c.shutdown()

	// Cluster A
	// Domain is "A'
	lc := c.createLeafNodesWithStartPortAndDomain("A", 2, 22110, "A")
	defer lc.shutdown()
	lc.waitOnClusterReady()

	// We want A-S-1 connected to B-S-1 and A-S-2 connected to B-S-2
	// So adjust if needed.
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		for i, ls := range lc.servers {
			ls.mu.RLock()
			var remoteServer string
			for _, rc := range ls.leafs {
				rc.mu.Lock()
				remoteServer = rc.leaf.remoteServer
				rc.mu.Unlock()
				break
			}
			ls.mu.RUnlock()

			wantedRemote := fmt.Sprintf("S-%d", i+1)
			if remoteServer != wantedRemote {
				ls.Shutdown()
				lc.restartServer(ls)
				return fmt.Errorf("Leafnode server %d not connected to %q", i+1, wantedRemote)
			}
		}
		return nil
	})

	// Wait on ready again.
	lc.waitOnClusterReady()

	// Create a stream and a durable pull consumer on cluster A.
	lnc, ljs := jsClientConnect(t, lc.randomServer())
	defer lnc.Close()

	_, err := ljs.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 2,
	})
	require_NoError(t, err)

	// Make sure stream leader is on S-1
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		si, err := ljs.StreamInfo("TEST")
		require_NoError(t, err)
		if si.Cluster.Leader == "A-S-1" {
			return nil
		}
		_, err = lnc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
		require_NoError(t, err)
		return fmt.Errorf("Stream leader not placed on A-S-1")
	})

	_, err = ljs.StreamInfo("TEST")
	require_NoError(t, err)

	_, err = ljs.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:    "dlc",
		Replicas:   2,
		MaxDeliver: 1,
		AckPolicy:  nats.AckNonePolicy,
	})
	require_NoError(t, err)

	// Make sure consumer leader is on S-2
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		ci, err := ljs.ConsumerInfo("TEST", "dlc")
		require_NoError(t, err)
		if ci.Cluster.Leader == "A-S-2" {
			return nil
		}
		_, err = lnc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "dlc"), nil, time.Second)
		require_NoError(t, err)
		// Fix: this loop places the CONSUMER leader on A-S-2; the message
		// was copy/pasted from the stream-leader loop above.
		return fmt.Errorf("Consumer leader not placed on A-S-2")
	})

	_, err = ljs.ConsumerInfo("TEST", "dlc")
	require_NoError(t, err)

	// Send 2 messages.
	sendStreamMsg(t, lnc, "foo", "M-1")
	sendStreamMsg(t, lnc, "foo", "M-2")

	// Now bind apps to cluster B servers and bind to pull consumer.
	nc1, _ := jsClientConnect(t, c.servers[0])
	defer nc1.Close()
	js1, err := nc1.JetStream(nats.Domain("A"))
	require_NoError(t, err)
	sub1, err := js1.PullSubscribe("foo", "dlc", nats.BindStream("TEST"))
	require_NoError(t, err)
	defer sub1.Unsubscribe()

	nc2, _ := jsClientConnect(t, c.servers[1])
	defer nc2.Close()
	js2, err := nc2.JetStream(nats.Domain("A"))
	require_NoError(t, err)
	sub2, err := js2.PullSubscribe("foo", "dlc", nats.BindStream("TEST"))
	require_NoError(t, err)
	defer sub2.Unsubscribe()

	// Make sure we can properly get messages.
	msgs, err := sub1.Fetch(1)
	require_NoError(t, err)
	require_True(t, len(msgs) == 1)
	require_True(t, string(msgs[0].Data) == "M-1")

	msgs, err = sub2.Fetch(1)
	require_NoError(t, err)
	require_True(t, len(msgs) == 1)
	require_True(t, string(msgs[0].Data) == "M-2")

	// Make sure delivered state makes it to other server to not accidentally send M-2 again
	// and fail the test below.
	time.Sleep(250 * time.Millisecond)

	// Now let's introduce and event, where A-S-2 will now reconnect after a restart to B-S-2
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		ls := lc.servers[1]
		wantedRemote := "S-1"
		var remoteServer string

		ls.mu.RLock()
		for _, rc := range ls.leafs {
			rc.mu.Lock()
			remoteServer = rc.leaf.remoteServer
			rc.mu.Unlock()
			break
		}
		ls.mu.RUnlock()

		if remoteServer != wantedRemote {
			ls.Shutdown()
			lc.restartServer(ls)
			return fmt.Errorf("Leafnode server not connected to %q", wantedRemote)
		}
		return nil
	})

	// Wait on ready again.
	lc.waitOnClusterReady()
	lc.waitOnStreamLeader(globalAccountName, "TEST")
	lc.waitOnConsumerLeader(globalAccountName, "TEST", "dlc")

	// Send 2 more messages.
	sendStreamMsg(t, lnc, "foo", "M-3")
	sendStreamMsg(t, lnc, "foo", "M-4")

	msgs, err = sub1.Fetch(2)
	require_NoError(t, err)
	require_True(t, len(msgs) == 2)
	require_True(t, string(msgs[0].Data) == "M-3")
	require_True(t, string(msgs[1].Data) == "M-4")

	// Send 2 more messages.
	sendStreamMsg(t, lnc, "foo", "M-5")
	sendStreamMsg(t, lnc, "foo", "M-6")

	msgs, err = sub2.Fetch(2)
	require_NoError(t, err)
	require_True(t, len(msgs) == 2)
	require_True(t, string(msgs[0].Data) == "M-5")
	require_True(t, string(msgs[1].Data) == "M-6")
}
// snapRGSet collects the names of a stream's RG members (leader plus
// replicas) into a set, optionally printing them under the given banner.
// Returns nil when osi is nil. Signature (pointer-to-map return) kept for
// existing callers.
func snapRGSet(pFlag bool, banner string, osi *nats.StreamInfo) *map[string]struct{} {
	if pFlag {
		fmt.Println(banner)
	}
	if osi == nil {
		if pFlag {
			fmt.Printf("bonkers!\n")
		}
		return nil
	}
	members := map[string]struct{}{osi.Cluster.Leader: {}}
	if pFlag {
		fmt.Printf("Leader: %s\n", osi.Cluster.Leader)
	}
	for _, r := range osi.Cluster.Replicas {
		members[r.Name] = struct{}{}
		if pFlag {
			fmt.Printf("Replica: %s\n", r.Name)
		}
	}
	return &members
}
// TestJetStreamClusterAfterPeerRemoveZeroState verifies that a peer removed
// from an RG and later re-added comes back with correct filestore usage,
// carrying no stale state from its earlier membership.
func TestJetStreamClusterAfterPeerRemoveZeroState(t *testing.T) {
	// R3 scenario (w/messages) in a 4-node cluster. Peer remove from RG and add back to same RG later.
	// Validate that original peer brought no memory or issues from its previous RG tour of duty, specifically
	// that the restored peer has the correct filestore usage bytes for the asset.
	var err error

	sc := createJetStreamClusterExplicit(t, "cl4", 4)
	defer sc.shutdown()
	sc.waitOnClusterReadyWithNumPeers(4)

	s := sc.leader()
	nc, jsc := jsClientConnect(t, s)
	defer nc.Close()

	_, err = jsc.AddStream(&nats.StreamConfig{
		Name:     "foo",
		Subjects: []string{"foo.*"},
		Replicas: 3,
	})
	require_NoError(t, err)
	sc.waitOnStreamLeader(globalAccountName, "foo")

	osi, err := jsc.StreamInfo("foo")
	require_NoError(t, err)
	// make sure 0 msgs
	require_True(t, osi.State.Msgs == 0)

	// load up messages
	toSend := 10000
	// storage bytes with JS message overhead
	// NOTE(review): 460000 = 10000 msgs * 46 bytes; this assumes the
	// current per-message filestore overhead — revisit if that changes.
	assetStoreBytesExpected := uint64(460000)
	for i := 1; i <= toSend; i++ {
		msg := []byte("Hello World")
		if _, err = jsc.Publish("foo.a", msg); err != nil {
			t.Fatalf("unexpected publish error: %v", err)
		}
	}

	osi, err = jsc.StreamInfo("foo")
	require_NoError(t, err)
	// make sure 10000 msgs
	require_True(t, osi.State.Msgs == uint64(toSend))

	// Snapshot the current RG membership before any peer moves.
	origSet := *snapRGSet(false, "== Orig RG Set ==", osi)

	// remove 1 peer replica (1 of 2 non-leaders)
	origPeer := osi.Cluster.Replicas[0].Name
	resp, err := nc.Request(fmt.Sprintf(JSApiStreamRemovePeerT, "foo"), []byte(`{"peer":"`+origPeer+`"}`), time.Second)
	require_NoError(t, err)
	var rpResp JSApiStreamRemovePeerResponse
	err = json.Unmarshal(resp.Data, &rpResp)
	require_NoError(t, err)
	require_True(t, rpResp.Success)

	// validate the origPeer is removed with a replacement newPeer
	sc.waitOnStreamLeader(globalAccountName, "foo")
	checkFor(t, time.Second, 200*time.Millisecond, func() error {
		osi, err = jsc.StreamInfo("foo")
		require_NoError(t, err)
		if len(osi.Cluster.Replicas) != 2 {
			return fmt.Errorf("expected R3, got R%d", len(osi.Cluster.Replicas)+1)
		}
		// STREAM.PEER.REMOVE is asynchronous command; make sure remove has occurred
		for _, replica := range osi.Cluster.Replicas {
			if replica.Name == origPeer {
				return fmt.Errorf("expected replaced replica, old replica still present")
			}
		}
		return nil
	})

	// identify the new peer: the one in the new set but not the original.
	var newPeer string
	osi, err = jsc.StreamInfo("foo")
	require_NoError(t, err)
	newSet := *snapRGSet(false, "== New RG Set ==", osi)
	for peer := range newSet {
		_, ok := origSet[peer]
		if !ok {
			newPeer = peer
			break
		}
	}
	require_True(t, newPeer != "")

	// kick out newPeer which will cause origPeer to be assigned to the RG again
	resp, err = nc.Request(fmt.Sprintf(JSApiStreamRemovePeerT, "foo"), []byte(`{"peer":"`+newPeer+`"}`), time.Second)
	require_NoError(t, err)
	err = json.Unmarshal(resp.Data, &rpResp)
	require_NoError(t, err)
	require_True(t, rpResp.Success)

	// validate the newPeer is removed and R3 has reformed (with origPeer)
	sc.waitOnStreamLeader(globalAccountName, "foo")
	checkFor(t, time.Second, 200*time.Millisecond, func() error {
		osi, err = jsc.StreamInfo("foo")
		require_NoError(t, err)
		if len(osi.Cluster.Replicas) != 2 {
			return fmt.Errorf("expected R3, got R%d", len(osi.Cluster.Replicas)+1)
		}
		// STREAM.PEER.REMOVE is asynchronous command; make sure remove has occurred
		for _, replica := range osi.Cluster.Replicas {
			if replica.Name == newPeer {
				return fmt.Errorf("expected replaced replica, old replica still present")
			}
		}
		return nil
	})

	osi, err = jsc.StreamInfo("foo")
	require_NoError(t, err)
	// make sure all msgs reported in stream at this point with original leader
	require_True(t, osi.State.Msgs == uint64(toSend))
	snapRGSet(false, "== RG Set w/origPeer Back ==", osi)

	// get a handle to original peer server
	var origServer *Server = sc.serverByName(origPeer)
	if origServer == nil {
		t.Fatalf("expected to get a handle to original peer server by name")
	}

	// The restored peer's reported filestore usage must exactly match the
	// asset size, proving no stale state survived its earlier tour.
	checkFor(t, time.Second, 200*time.Millisecond, func() error {
		jszResult, err := origServer.Jsz(nil)
		require_NoError(t, err)
		if jszResult.Store != assetStoreBytesExpected {
			return fmt.Errorf("expected %d storage on orig peer, got %d", assetStoreBytesExpected, jszResult.Store)
		}
		return nil
	})
}
// TestJetStreamClusterMemLeaderRestart verifies that after the leader of an
// R3 memory-store asset is shut down, the remaining quorum elects a new
// leader with no message loss, and the asset stays healthy once the old
// leader rejoins.
func TestJetStreamClusterMemLeaderRestart(t *testing.T) {
	// Test if R3 clustered mem store asset leader server restarted, that asset remains stable with final quorum
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	ml := c.leader()
	nc, jsc := jsClientConnect(t, ml)
	defer nc.Close()

	_, err := jsc.AddStream(&nats.StreamConfig{
		Name:     "foo",
		Storage:  nats.MemoryStorage,
		Subjects: []string{"foo.*"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// load up messages
	toSend := 10000
	for i := 1; i <= toSend; i++ {
		msg := []byte("Hello World")
		if _, err = jsc.Publish("foo.a", msg); err != nil {
			t.Fatalf("unexpected publish error: %v", err)
		}
	}

	osi, err := jsc.StreamInfo("foo")
	require_NoError(t, err)
	// make sure 10000 msgs
	require_True(t, osi.State.Msgs == uint64(toSend))

	// Shutdown the stream leader server
	rs := c.serverByName(osi.Cluster.Leader)
	rs.Shutdown()

	// Make sure that we have a META leader (there can always be a re-election)
	c.waitOnLeader()

	// Should still have quorum and a new leader
	checkFor(t, time.Second, 200*time.Millisecond, func() error {
		osi, err = jsc.StreamInfo("foo")
		if err != nil {
			return fmt.Errorf("expected healthy stream asset, got %s", err.Error())
		}
		if osi.Cluster.Leader == "" {
			return fmt.Errorf("expected healthy stream asset with new leader")
		}
		if osi.State.Msgs != uint64(toSend) {
			return fmt.Errorf("expected healthy stream asset %d messages, got %d messages", toSend, osi.State.Msgs)
		}
		return nil
	})

	// Now restart the old leader peer (old stream state)
	oldrs := rs
	rs, _ = RunServerWithConfig(rs.getOpts().ConfigFile)
	defer rs.Shutdown()

	// Replaced old with new server handle in the cluster's server list.
	for i := 0; i < len(c.servers); i++ {
		if c.servers[i] == oldrs {
			c.servers[i] = rs
		}
	}

	// Wait for cluster to be formed
	checkClusterFormed(t, c.servers...)

	// Make sure that we have a leader (there can always be a re-election)
	c.waitOnLeader()

	// Can we get stream info after return
	osi, err = jsc.StreamInfo("foo")
	if err != nil {
		t.Fatalf("expected stream asset info return, got %s", err.Error())
	}
	// When asset leader came back did we re-form with quorum
	if osi.Cluster.Leader == "" {
		t.Fatalf("expected a current leader after old leader restarted")
	}
}
// Customer reported R1 consumers that seemed to be ghosted after server restart.
// Creates 50 consumers while a server is down, snapshots the meta state, and
// verifies the restarted server learns all consumer assignments from the
// snapshot even though the stream itself already existed.
func TestJetStreamClusterLostConsumers(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "GHOST", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"events.>"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// Seed the stream with 100 messages across distinct subjects.
	for i := 0; i < 10; i++ {
		for j := 0; j < 10; j++ {
			_, err := js.Publish(fmt.Sprintf("events.%d.%d", i, j), []byte("test"))
			require_NoError(t, err)
		}
	}

	// Take one server down before any consumers exist.
	s := c.randomServer()
	s.Shutdown()

	c.waitOnLeader()
	c.waitOnStreamLeader(globalAccountName, "TEST")

	nc, _ = jsClientConnect(t, c.randomServer())
	defer nc.Close()

	cc := CreateConsumerRequest{
		Stream: "TEST",
		Config: ConsumerConfig{
			AckPolicy: AckExplicit,
		},
	}
	req, err := json.Marshal(cc)
	require_NoError(t, err)

	reqSubj := fmt.Sprintf(JSApiConsumerCreateT, "TEST")

	// Now create 50 consumers. We do not wait for the answer.
	for i := 0; i < 50; i++ {
		nc.Publish(reqSubj, req)
	}
	nc.Flush()

	// Grab the meta leader.
	ml := c.leader()
	require_NoError(t, ml.JetStreamSnapshotMeta())

	// numConsumerAssignments reports the number of consumer assignments
	// recorded for the first stream in the server's cluster state.
	numConsumerAssignments := func(s *Server) int {
		t.Helper()
		js := s.getJetStream()
		js.mu.RLock()
		defer js.mu.RUnlock()
		cc := js.cluster
		for _, asa := range cc.streams {
			for _, sa := range asa {
				return len(sa.consumers)
			}
		}
		return 0
	}

	checkFor(t, time.Second, 100*time.Millisecond, func() error {
		num := numConsumerAssignments(ml)
		if num == 50 {
			return nil
		}
		return fmt.Errorf("Consumers is only %d", num)
	})

	// Restart the server we shutdown. We snapshotted, so the snapshot
	// has to fill in the new consumers.
	// The bug would fail to add them to the meta state since the stream
	// existed.
	s = c.restartServer(s)
	checkFor(t, time.Second, 100*time.Millisecond, func() error {
		num := numConsumerAssignments(s)
		if num == 50 {
			return nil
		}
		return fmt.Errorf("Consumers is only %d", num)
	})
}
// https://github.com/nats-io/nats-server/issues/3636
// Scales a stream down to R1 while a follower is offline; the restarted
// follower must still converge to a healthy state.
func TestJetStreamClusterScaleDownDuringServerOffline(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	for i := 0; i < 100; i++ {
		sendStreamMsg(t, nc, "foo", "hello")
	}

	// Take a follower offline.
	s := c.randomNonStreamLeader(globalAccountName, "TEST")
	s.Shutdown()
	c.waitOnLeader()

	// While it is away, scale the stream down to a single replica.
	nc, js = jsClientConnect(t, c.randomServer())
	defer nc.Close()
	_, err = js.UpdateStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 1,
	})
	require_NoError(t, err)

	// The restarted server must report healthy despite the scale-down it missed.
	s = c.restartServer(s)
	checkFor(t, time.Second, 200*time.Millisecond, func() error {
		if hs := s.healthz(nil); hs.Error != _EMPTY_ {
			return errors.New(hs.Error)
		}
		return nil
	})
}
// Reported by a customer manually upgrading their streams to support direct gets.
// Worked if single replica but not in clustered mode.
func TestJetStreamClusterDirectGetStreamUpgrade(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Create the KV backing stream by hand, without AllowDirect, to mimic
	// an older stream created before direct gets existed.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:              "KV_TEST",
		Subjects:          []string{"$KV.TEST.>"},
		Discard:           nats.DiscardNew,
		MaxMsgsPerSubject: 1,
		DenyDelete:        true,
		Replicas:          3,
	})
	require_NoError(t, err)

	kv, err := js.KeyValue("TEST")
	require_NoError(t, err)

	_, err = kv.PutString("name", "derek")
	require_NoError(t, err)

	entry, err := kv.Get("name")
	require_NoError(t, err)
	require_True(t, string(entry.Value()) == "derek")

	// Now simulate an update to the stream to support direct gets.
	_, err = js.UpdateStream(&nats.StreamConfig{
		Name:              "KV_TEST",
		Subjects:          []string{"$KV.TEST.>"},
		Discard:           nats.DiscardNew,
		MaxMsgsPerSubject: 1,
		DenyDelete:        true,
		AllowDirect:       true,
		Replicas:          3,
	})
	require_NoError(t, err)

	// Rebind to KV to make sure we use the DIRECT version of Get().
	kv, err = js.KeyValue("TEST")
	require_NoError(t, err)

	// Make sure direct get works.
	entry, err = kv.Get("name")
	require_NoError(t, err)
	require_True(t, string(entry.Value()) == "derek")
}
// For interest (or workqueue) based streams its important to match the replication factor.
// This was the case but now that more control over consumer creation is allowed its possible
// to create a consumer where the replication factor does not match. This could cause
// instability in the state between servers and cause problems on leader switches.
func TestJetStreamClusterInterestPolicyStreamForConsumersToMatchRFactor(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// An R3 interest-retention stream.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo"},
		Retention: nats.InterestPolicy,
		Replicas:  3,
	})
	require_NoError(t, err)

	// An R1 durable on that stream must be rejected with the
	// replicas-should-match error.
	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "XX",
		AckPolicy: nats.AckExplicitPolicy,
		Replicas:  1,
	})
	require_Error(t, err, NewJSConsumerReplicasShouldMatchStreamError())
}
// https://github.com/nats-io/nats-server/issues/3791
// Verifies KV watchers can be created and stopped repeatedly while one
// replica server is down.
func TestJetStreamClusterKVWatchersWithServerDown(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
		Bucket:   "TEST",
		Replicas: 3,
	})
	require_NoError(t, err)

	// Fix: these put errors were previously ignored; a failed put would
	// make the watcher loop below exercise an empty key.
	_, err = kv.PutString("foo", "bar")
	require_NoError(t, err)
	_, err = kv.PutString("foo", "baz")
	require_NoError(t, err)

	// Shutdown a follower.
	s := c.randomNonStreamLeader(globalAccountName, "KV_TEST")
	s.Shutdown()
	c.waitOnLeader()

	nc, _ = jsClientConnect(t, c.randomServer())
	defer nc.Close()
	js, err = nc.JetStream(nats.MaxWait(2 * time.Second))
	require_NoError(t, err)

	kv, err = js.KeyValue("TEST")
	require_NoError(t, err)

	// Watchers must remain creatable and stoppable with a replica down.
	for i := 0; i < 100; i++ {
		w, err := kv.Watch("foo")
		require_NoError(t, err)
		w.Stop()
	}
}
// TestJetStreamClusterCurrentVsHealth is designed to show the
// difference between "current" and "healthy" when async publishes
// outpace the rate at which they can be applied.
func TestJetStreamClusterCurrentVsHealth(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	c.waitOnLeader()
	server := c.randomNonLeader()

	nc, js := jsClientConnect(t, server)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// Observe a follower's raft node while publishing.
	server = c.randomNonStreamLeader(globalAccountName, "TEST")
	stream, err := server.GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)

	raft, ok := stream.raftGroup().node.(*raft)
	require_True(t, ok)

	// This test only LOGS divergence between current/healthy and
	// commit/applied; it makes no assertions about it.
	for i := 0; i < 1000; i++ {
		_, err := js.PublishAsync("foo", []byte("bar"))
		require_NoError(t, err)

		raft.RLock()
		commit := raft.commit
		applied := raft.applied
		raft.RUnlock()

		current := raft.Current()
		healthy := raft.Healthy()

		if !current || !healthy || commit != applied {
			t.Logf(
				"%d | Current %v, healthy %v, commit %d, applied %d, pending %d",
				i, current, healthy, commit, applied, commit-applied,
			)
		}
	}
}
// Several users and customers use this setup, but many times across leafnodes.
// This should be allowed in same account since we are really protecting against
// multiple pub acks with cycle detection.
func TestJetStreamClusterActiveActiveSourcedStreams(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Two plain streams, A and B.
	for _, cfg := range []*nats.StreamConfig{
		{Name: "A", Subjects: []string{"A.>"}},
		{Name: "B", Subjects: []string{"B.>"}},
	} {
		_, err := js.AddStream(cfg)
		require_NoError(t, err)
	}

	// Have A source from B...
	_, err := js.UpdateStream(&nats.StreamConfig{
		Name:     "A",
		Subjects: []string{"A.>"},
		Sources: []*nats.StreamSource{{
			Name:          "B",
			FilterSubject: "B.>",
		}},
	})
	require_NoError(t, err)

	// ...and B source from A, forming an active-active pair.
	// Before this would fail.
	_, err = js.UpdateStream(&nats.StreamConfig{
		Name:     "B",
		Subjects: []string{"B.>"},
		Sources: []*nats.StreamSource{{
			Name:          "A",
			FilterSubject: "A.>",
		}},
	})
	require_NoError(t, err)
}
// TestJetStreamClusterUpdateConsumerShouldNotForceDeleteOnRestart verifies
// that updating a consumer's deliver subject while one of its replicas is
// down does not cause the restarted replica to delete the consumer based on
// the stale (pre-update) deliver subject it still knows about.
func TestJetStreamClusterUpdateConsumerShouldNotForceDeleteOnRestart(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R7S", 7)
	defer c.shutdown()
	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo", "bar"},
		Replicas: 3,
	})
	require_NoError(t, err)
	// Push consumer whose deliver subject will be changed below.
	ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:        "D",
		DeliverSubject: "_no_bind_",
	})
	require_NoError(t, err)
	// Shutdown a consumer follower.
	nc.Close()
	s := c.serverByName(ci.Cluster.Replicas[0].Name)
	s.Shutdown()
	c.waitOnLeader()
	nc, js = jsClientConnect(t, c.randomServer())
	defer nc.Close()
	// Change delivery subject.
	_, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{
		Durable:        "D",
		DeliverSubject: "_d_",
	})
	require_NoError(t, err)
	// Create interest in new and old deliver subject.
	_, err = nc.SubscribeSync("_d_")
	require_NoError(t, err)
	_, err = nc.SubscribeSync("_no_bind_")
	require_NoError(t, err)
	nc.Flush()
	// Bring the follower back; it must catch up with the consumer update
	// rather than acting on its stale view of the deliver subject.
	c.restartServer(s)
	c.waitOnAllCurrent()
	// Wait on bad error that would cleanup consumer.
	time.Sleep(time.Second)
	_, err = js.ConsumerInfo("TEST", "D")
	require_NoError(t, err)
}
// TestJetStreamClusterInterestPolicyEphemeral checks that a push consumer with
// an InactiveThreshold is NOT cleaned up while it is actively delivering
// messages, and IS cleaned up once its subscription (interest) goes away.
// Runs once with a durable name and once with an explicit consumer name.
func TestJetStreamClusterInterestPolicyEphemeral(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()
	for _, test := range []struct {
		testName string
		stream   string
		subject  string
		durable  string
		name     string
	}{
		{testName: "InterestWithDurable", durable: "eph", subject: "intdur", stream: "INT_DUR"},
		{testName: "InterestWithName", name: "eph", subject: "inteph", stream: "INT_EPH"},
	} {
		t.Run(test.testName, func(t *testing.T) {
			var err error
			nc, js := jsClientConnect(t, c.randomServer())
			defer nc.Close()
			_, err = js.AddStream(&nats.StreamConfig{
				Name:      test.stream,
				Subjects:  []string{test.subject},
				Retention: nats.LimitsPolicy,
				Replicas:  3,
			})
			require_NoError(t, err)
			// Short threshold so cleanup (correct or buggy) would fire
			// within the lifetime of the test.
			const inactiveThreshold = time.Second
			_, err = js.AddConsumer(test.stream, &nats.ConsumerConfig{
				DeliverSubject:    nats.NewInbox(),
				AckPolicy:         nats.AckExplicitPolicy,
				InactiveThreshold: inactiveThreshold,
				Durable:           test.durable,
				Name:              test.name,
			})
			require_NoError(t, err)
			// Exactly one of durable/name is set per table entry.
			name := test.durable
			if test.durable == _EMPTY_ {
				name = test.name
			}
			const msgs = 10_000
			// Signal once all messages have been delivered and acked.
			done, count := make(chan bool), 0
			sub, err := js.Subscribe(_EMPTY_, func(msg *nats.Msg) {
				require_NoError(t, msg.Ack())
				count++
				if count >= msgs {
					done <- true
				}
			}, nats.Bind(test.stream, name), nats.ManualAck())
			require_NoError(t, err)
			// This happens only if we start publishing messages after consumer was created.
			go func() {
				for i := 0; i < msgs; i++ {
					js.PublishAsync(test.subject, []byte("DATA"))
				}
				select {
				case <-js.PublishAsyncComplete():
				case <-time.After(5 * inactiveThreshold):
				}
			}()
			// Wait for inactive threshold to expire and all messages to be published and received
			// Bug is we clean up active consumers when we should not.
			time.Sleep(3 * inactiveThreshold / 2)
			select {
			case <-done:
			case <-time.After(5 * time.Second):
				t.Fatalf("Did not receive done signal")
			}
			// The consumer must have survived the threshold since it had
			// active interest the whole time.
			info, err := js.ConsumerInfo(test.stream, name)
			if err != nil {
				t.Fatalf("Expected to be able to retrieve consumer: %v", err)
			}
			require_True(t, info.Delivered.Stream == msgs)
			// Stop the subscription and remove the interest.
			err = sub.Unsubscribe()
			require_NoError(t, err)
			// Now wait for interest inactivity threshold to kick in.
			time.Sleep(3 * inactiveThreshold / 2)
			// Check if the consumer has been removed.
			_, err = js.ConsumerInfo(test.stream, name)
			require_Error(t, err, nats.ErrConsumerNotFound)
		})
	}
}
// TestJetStreamClusterWALBuildupOnNoOpPull tests whether or not the consumer
// RAFT log is being compacted when the stream is idle but we are performing
// lots of fetches. Otherwise the disk usage just spirals out of control if
// there are no other state changes to trigger a compaction.
func TestJetStreamClusterWALBuildupOnNoOpPull(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	sub, err := js.PullSubscribe(
		"foo",
		"durable",
		nats.ConsumerReplicas(3),
	)
	require_NoError(t, err)

	// Generate lots of no-op pulls (stream is empty, every fetch times out)
	// so the consumer RAFT log fills with entries that carry no state change.
	for i := 0; i < 10000; i++ {
		_, _ = sub.Fetch(1, nats.MaxWait(time.Microsecond))
	}

	// Needs to be at least 5 seconds, otherwise we won't hit the
	// minSnapDelta that prevents us from snapshotting too often
	time.Sleep(time.Second * 6)

	for i := 0; i < 1024; i++ {
		_, _ = sub.Fetch(1, nats.MaxWait(time.Microsecond))
	}
	time.Sleep(time.Second)

	server := c.randomNonConsumerLeader(globalAccountName, "TEST", "durable")
	stream, err := server.globalAccount().lookupStream("TEST")
	require_NoError(t, err)

	consumer := stream.lookupConsumer("durable")
	require_NotNil(t, consumer)

	// "sz" instead of "bytes": the old name shadowed the imported bytes
	// package; "limit" instead of "max" avoids the Go 1.21 builtin.
	entries, sz := consumer.raftNode().Size()
	t.Log("new entries:", entries)
	t.Log("new bytes:", sz)
	if limit := uint64(1024); entries > limit {
		t.Fatalf("got %d entries, expected less than %d entries", entries, limit)
	}
}
// Found in https://github.com/nats-io/nats-server/issues/3848
// When Max Age was specified and stream was scaled up, new replicas
// were expiring messages much later than the leader.
func TestJetStreamClusterStreamMaxAgeScaleUp(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()
	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()
	// Exercise both storage backends, and (via purge) the case where the
	// stream's first sequence is not 1 when the new replicas catch up.
	for _, test := range []struct {
		name    string
		storage nats.StorageType
		stream  string
		purge   bool
	}{
		{name: "file", storage: nats.FileStorage, stream: "A", purge: false},
		{name: "memory", storage: nats.MemoryStorage, stream: "B", purge: false},
		{name: "file with purge", storage: nats.FileStorage, stream: "C", purge: true},
		{name: "memory with purge", storage: nats.MemoryStorage, stream: "D", purge: true},
	} {
		t.Run(test.name, func(t *testing.T) {
			ttl := time.Second * 5
			// Add stream with one replica and short MaxAge.
			_, err := js.AddStream(&nats.StreamConfig{
				Name:     test.stream,
				Replicas: 1,
				Subjects: []string{test.stream},
				MaxAge:   ttl,
				Storage:  test.storage,
			})
			require_NoError(t, err)
			// Add some messages.
			for i := 0; i < 10; i++ {
				sendStreamMsg(t, nc, test.stream, "HELLO")
			}
			// We need to also test if we properly set expiry
			// if first sequence is not 1.
			if test.purge {
				err = js.PurgeStream(test.stream)
				require_NoError(t, err)
				// Add some messages.
				for i := 0; i < 10; i++ {
					sendStreamMsg(t, nc, test.stream, "HELLO")
				}
			}
			// Mark the time when all messages were published.
			start := time.Now()
			// Sleep for half of the MaxAge time.
			time.Sleep(ttl / 2)
			// Scale up the Stream to 3 replicas. The new replicas receive
			// messages that are already halfway to expiry; they must honor
			// the original publish time, not their own catch-up time.
			_, err = js.UpdateStream(&nats.StreamConfig{
				Name:     test.stream,
				Replicas: 3,
				Subjects: []string{test.stream},
				MaxAge:   ttl,
				Storage:  test.storage,
			})
			require_NoError(t, err)
			// All messages should still be there.
			info, err := js.StreamInfo(test.stream)
			require_NoError(t, err)
			require_True(t, info.State.Msgs == 10)
			// Wait until MaxAge is reached.
			time.Sleep(ttl - time.Since(start) + (10 * time.Millisecond))
			// Check if all messages are expired.
			info, err = js.StreamInfo(test.stream)
			require_NoError(t, err)
			require_True(t, info.State.Msgs == 0)
			// Now switch leader to one of replicas
			_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, test.stream), nil, time.Second)
			require_NoError(t, err)
			c.waitOnStreamLeader("$G", test.stream)
			// and make sure that it also expired all messages
			info, err = js.StreamInfo(test.stream)
			require_NoError(t, err)
			require_True(t, info.State.Msgs == 0)
		})
	}
}