nats-server/server/jetstream_super_cluster_test.go
Ivan Kozlovic 53e3c53d96 [FIXED] JetStream: consumer with deliver new may miss messages
This could happen when a consumer had not sent anything to the
attached NATS subscription and there was a consumer leader
step down or server restart.

Signed-off-by: Derek Collison <derek@nats.io>
2022-05-23 12:01:48 -06:00

// Copyright 2020-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !skip_js_tests && !skip_js_cluster_tests && !skip_js_super_cluster_tests
// +build !skip_js_tests,!skip_js_cluster_tests,!skip_js_super_cluster_tests

package server

import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/nats-io/nats.go"
)
func TestJetStreamSuperClusterMetaPlacement(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 3)
defer sc.shutdown()
// We want to influence where the meta leader will place itself when we ask the
// current leader to stepdown.
ml := sc.leader()
cn := ml.ClusterName()
var pcn string
for _, c := range sc.clusters {
if c.name != cn {
pcn = c.name
break
}
}
// Client based API
s := sc.randomServer()
nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
if err != nil {
t.Fatalf("Failed to create system client: %v", err)
}
defer nc.Close()
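// stepdown sends a meta leader stepdown request asking for the new leader to be placed in cluster cn and returns the parsed response.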
stepdown := func(cn string) *JSApiLeaderStepDownResponse {
req := &JSApiLeaderStepdownRequest{Placement: &Placement{Cluster: cn}}
jreq, err := json.Marshal(req)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
resp, err := nc.Request(JSApiLeaderStepDown, jreq, time.Second)
if err != nil {
t.Fatalf("Error on stepdown request: %v", err)
}
var sdr JSApiLeaderStepDownResponse
if err := json.Unmarshal(resp.Data, &sdr); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
return &sdr
}
// Make sure we get correct errors for tags and bad or unavailable cluster placement.
sdr := stepdown("C22")
if sdr.Error == nil || !strings.Contains(sdr.Error.Description, "no suitable peers") {
t.Fatalf("Got incorrect error result: %+v", sdr.Error)
}
// Should work.
sdr = stepdown(pcn)
if sdr.Error != nil {
t.Fatalf("Got an error on stepdown: %+v", sdr.Error)
}
sc.waitOnLeader()
ml = sc.leader()
cn = ml.ClusterName()
if cn != pcn {
t.Fatalf("Expected new metaleader to be in cluster %q, got %q", pcn, cn)
}
}
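// Tests that the unique_tag jetstream option keeps replicas of a stream from landing on servers that share the same az tag value.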
func TestJetStreamSuperClusterUniquePlacementTag(t *testing.T) {
tmlp := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s', unique_tag: az}
leaf {listen: 127.0.0.1:-1}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
# For access to system account.
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`
s := createJetStreamSuperClusterWithTemplateAndModHook(t, tmlp, 5, 2,
func(serverName, clustername, storeDir, conf string) string {
azTag := map[string]string{
"C1-S1": "az:same",
"C1-S2": "az:same",
"C1-S3": "az:same",
"C1-S4": "az:same",
"C1-S5": "az:same",
"C2-S1": "az:1",
"C2-S2": "az:2",
"C2-S3": "az:1",
"C2-S4": "az:2",
"C2-S5": "az:1",
}
return conf + fmt.Sprintf("\nserver_tags: [cloud:%s-tag, %s]\n", clustername, azTag[serverName])
})
defer s.shutdown()
nc := natsConnect(t, s.randomServer().ClientURL())
defer nc.Close()
js, err := nc.JetStream()
require_NoError(t, err)
for i, test := range []struct {
placement *nats.Placement
replicas int
fail bool
cluster string
}{
// these pass because replica count is 1
{&nats.Placement{Tags: []string{"az:same"}}, 1, false, "C1"},
{&nats.Placement{Tags: []string{"cloud:C1-tag", "az:same"}}, 1, false, "C1"},
{&nats.Placement{Tags: []string{"cloud:C1-tag"}}, 1, false, "C1"},
// pass because az is set, which disables the filter
{&nats.Placement{Tags: []string{"az:same"}}, 2, false, "C1"},
{&nats.Placement{Tags: []string{"cloud:C1-tag", "az:same"}}, 2, false, "C1"},
// fails because this cluster only has the same az
{&nats.Placement{Tags: []string{"cloud:C1-tag"}}, 2, true, ""},
// fails because no 3 unique tags exist
{&nats.Placement{Tags: []string{"cloud:C2-tag"}}, 3, true, ""},
{nil, 3, true, ""},
// pass because replica count is low enough
{nil, 2, false, "C2"},
{&nats.Placement{Tags: []string{"cloud:C2-tag"}}, 2, false, "C2"},
// pass because az is provided
{&nats.Placement{Tags: []string{"az:1"}}, 3, false, "C2"},
{&nats.Placement{Tags: []string{"az:2"}}, 2, false, "C2"},
} {
name := fmt.Sprintf("test-%d", i)
t.Run(name, func(t *testing.T) {
ci, err := js.AddStream(&nats.StreamConfig{Name: name, Replicas: test.replicas, Placement: test.placement})
if test.fail {
require_Error(t, err)
require_Equal(t, err.Error(), "insufficient resources")
return
}
require_NoError(t, err)
if test.cluster != _EMPTY_ {
require_Equal(t, ci.Cluster.Name, test.cluster)
}
})
}
}
func TestJetStreamSuperClusterBasics(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 3)
defer sc.shutdown()
// Client based API
s := sc.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Send in 10 messages.
msg, toSend := []byte("Hello JS Clustering"), 10
for i := 0; i < toSend; i++ {
if _, err = js.Publish("TEST", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
// Now grab info for this stream.
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si == nil || si.Config.Name != "TEST" {
t.Fatalf("StreamInfo is not correct %+v", si)
}
// Check active state as well, shows that the owner answered.
if si.State.Msgs != uint64(toSend) {
t.Fatalf("Expected %d msgs, got bad state: %+v", toSend, si.State)
}
// Check request origin placement.
if si.Cluster.Name != s.ClusterName() {
t.Fatalf("Expected stream to be placed in %q, but got %q", s.ClusterName(), si.Cluster.Name)
}
// Check consumers.
sub, err := js.SubscribeSync("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, sub, toSend)
ci, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci.Delivered.Consumer != uint64(toSend) || ci.NumAckPending != toSend {
t.Fatalf("ConsumerInfo is not correct: %+v", ci)
}
// Now check we can place a stream.
pcn := "C3"
scResp, err := js.AddStream(&nats.StreamConfig{
Name: "TEST2",
Placement: &nats.Placement{Cluster: pcn},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if scResp.Cluster.Name != pcn {
t.Fatalf("Expected the stream to be placed in %q, got %q", pcn, scResp.Cluster.Name)
}
}
// Test that consumer interest across gateways and superclusters is properly identified in a remote cluster.
func TestJetStreamSuperClusterCrossClusterConsumerInterest(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 3)
defer sc.shutdown()
// Since we need all of the peers accounted for to add the stream wait for all to be present.
sc.waitOnPeerCount(9)
// Client based API - Connect to Cluster C1. Stream and consumer will live in C2.
s := sc.clusterForName("C1").randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
pcn := "C2"
_, err := js.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 3, Placement: &nats.Placement{Cluster: pcn}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Pull based first.
sub, err := js.PullSubscribe("foo", "dlc")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Send a message.
if _, err = js.Publish("foo", []byte("CCI")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
fetchMsgs(t, sub, 1, 5*time.Second)
// Now check push based delivery.
sub, err = js.SubscribeSync("foo", nats.Durable("rip"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, sub, 1)
// Send another message.
if _, err = js.Publish("foo", []byte("CCI")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
checkSubsPending(t, sub, 2)
}
func TestJetStreamSuperClusterPeerReassign(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 3)
defer sc.shutdown()
// Client based API
s := sc.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
pcn := "C2"
// Create a replicated stream placed in C2.
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Placement: &nats.Placement{Cluster: pcn},
Replicas: 3,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Send in 10 messages.
msg, toSend := []byte("Hello JS Clustering"), 10
for i := 0; i < toSend; i++ {
if _, err = js.Publish("TEST", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
// Now grab info for this stream.
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si == nil || si.Config.Name != "TEST" {
t.Fatalf("StreamInfo is not correct %+v", si)
}
// Check active state as well, shows that the owner answered.
if si.State.Msgs != uint64(toSend) {
t.Fatalf("Expected %d msgs, got bad state: %+v", toSend, si.State)
}
// Check request origin placement.
if si.Cluster.Name != pcn {
t.Fatalf("Expected stream to be placed in %q, but got %q", s.ClusterName(), si.Cluster.Name)
}
// Now remove a peer that is assigned to the stream.
rc := sc.clusterForName(pcn)
rs := rc.randomNonStreamLeader("$G", "TEST")
rc.removeJetStream(rs)
// Check the stream info is eventually correct.
checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
si, err := js.StreamInfo("TEST")
if err != nil {
return fmt.Errorf("Could not fetch stream info: %v", err)
}
if len(si.Cluster.Replicas) != 2 {
return fmt.Errorf("Expected 2 replicas, got %d", len(si.Cluster.Replicas))
}
for _, peer := range si.Cluster.Replicas {
if !peer.Current {
return fmt.Errorf("Expected replica to be current: %+v", peer)
}
if !strings.HasPrefix(peer.Name, pcn) {
t.Fatalf("Stream peer reassigned to wrong cluster: %q", peer.Name)
}
}
return nil
})
}
func TestJetStreamSuperClusterInterestOnlyMode(t *testing.T) {
template := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
accounts {
one {
jetstream: enabled
users [{user: one, password: password}]
}
two {
%s
users [{user: two, password: password}]
}
}
cluster {
listen: 127.0.0.1:%d
name: %s
routes = ["nats://127.0.0.1:%d"]
}
gateway {
name: %s
listen: 127.0.0.1:%d
gateways = [{name: %s, urls: ["nats://127.0.0.1:%d"]}]
}
`
storeDir1 := createDir(t, JetStreamStoreDir)
conf1 := createConfFile(t, []byte(fmt.Sprintf(template,
"S1", storeDir1, "", 23222, "A", 23222, "A", 11222, "B", 11223)))
s1, o1 := RunServerWithConfig(conf1)
defer s1.Shutdown()
storeDir2 := createDir(t, JetStreamStoreDir)
conf2 := createConfFile(t, []byte(fmt.Sprintf(template,
"S2", storeDir2, "", 23223, "B", 23223, "B", 11223, "A", 11222)))
s2, o2 := RunServerWithConfig(conf2)
defer s2.Shutdown()
waitForInboundGateways(t, s1, 1, 2*time.Second)
waitForInboundGateways(t, s2, 1, 2*time.Second)
waitForOutboundGateways(t, s1, 1, 2*time.Second)
waitForOutboundGateways(t, s2, 1, 2*time.Second)
nc1 := natsConnect(t, fmt.Sprintf("nats://two:password@127.0.0.1:%d", o1.Port))
defer nc1.Close()
nc1.Publish("foo", []byte("some message"))
nc1.Flush()
nc2 := natsConnect(t, fmt.Sprintf("nats://two:password@127.0.0.1:%d", o2.Port))
defer nc2.Close()
nc2.Publish("bar", []byte("some message"))
nc2.Flush()
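// checkMode waits until every inbound gateway connection on both servers reports the expected interest mode for the given account.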
checkMode := func(accName string, expectedMode GatewayInterestMode) {
t.Helper()
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
servers := []*Server{s1, s2}
for _, s := range servers {
var gws []*client
s.getInboundGatewayConnections(&gws)
for _, gw := range gws {
var mode GatewayInterestMode
gw.mu.Lock()
ie := gw.gw.insim[accName]
if ie != nil {
mode = ie.mode
}
gw.mu.Unlock()
if ie == nil {
return fmt.Errorf("Account %q not in map", accName)
}
if mode != expectedMode {
return fmt.Errorf("Expected account %q mode to be %v, got: %v", accName, expectedMode, mode)
}
}
}
return nil
})
}
checkMode("one", InterestOnly)
checkMode("two", Optimistic)
// Now change account "two" to enable JS
changeCurrentConfigContentWithNewContent(t, conf1, []byte(fmt.Sprintf(template,
"S1", storeDir1, "jetstream: enabled", 23222, "A", 23222, "A", 11222, "B", 11223)))
changeCurrentConfigContentWithNewContent(t, conf2, []byte(fmt.Sprintf(template,
"S2", storeDir2, "jetstream: enabled", 23223, "B", 23223, "B", 11223, "A", 11222)))
if err := s1.Reload(); err != nil {
t.Fatalf("Error on s1 reload: %v", err)
}
if err := s2.Reload(); err != nil {
t.Fatalf("Error on s2 reload: %v", err)
}
checkMode("one", InterestOnly)
checkMode("two", InterestOnly)
}
func TestJetStreamSuperClusterConnectionCount(t *testing.T) {
sc := createJetStreamSuperClusterWithTemplate(t, jsClusterAccountsTempl, 3, 2)
defer sc.shutdown()
sysNc := natsConnect(t, sc.randomServer().ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
defer sysNc.Close()
_, err := sysNc.Request(fmt.Sprintf(accReqSubj, "ONE", "CONNS"), nil, 100*time.Millisecond)
// This times out because the server only responds once the account has connections.
// Not convinced this should be that way, but that is not the issue to investigate here.
require_True(t, err == nats.ErrTimeout)
for i := 1; i <= 2; i++ {
func() {
nc := natsConnect(t, sc.clusterForName(fmt.Sprintf("C%d", i)).randomServer().ClientURL())
defer nc.Close()
js, err := nc.JetStream()
require_NoError(t, err)
name := fmt.Sprintf("foo%d", 1)
_, err = js.AddStream(&nats.StreamConfig{
Name: name,
Subjects: []string{name},
Replicas: 3})
require_NoError(t, err)
}()
}
func() {
nc := natsConnect(t, sc.clusterForName("C1").randomServer().ClientURL())
defer nc.Close()
js, err := nc.JetStream()
require_NoError(t, err)
_, err = js.AddStream(&nats.StreamConfig{
Name: "src",
Sources: []*nats.StreamSource{{Name: "foo.1"}, {Name: "foo.2"}},
Replicas: 3})
require_NoError(t, err)
}()
func() {
nc := natsConnect(t, sc.clusterForName("C2").randomServer().ClientURL())
defer nc.Close()
js, err := nc.JetStream()
require_NoError(t, err)
_, err = js.AddStream(&nats.StreamConfig{
Name: "mir",
Mirror: &nats.StreamSource{Name: "foo.2"},
Replicas: 3})
require_NoError(t, err)
}()
// There should be no active NATS CLIENT connections, but we still need
// to wait a little bit...
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
_, err := sysNc.Request(fmt.Sprintf(accReqSubj, "ONE", "CONNS"), nil, 100*time.Millisecond)
if err != nats.ErrTimeout {
return fmt.Errorf("Expected timeout, got %v", err)
}
return nil
})
sysNc.Close()
s := sc.randomServer()
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
acc, err := s.lookupAccount("ONE")
if err != nil {
t.Fatalf("Could not look up account: %v", err)
}
if n := acc.NumConnections(); n != 0 {
return fmt.Errorf("Expected no connections, got %d", n)
}
return nil
})
}
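// Tests that a stream sourcing across a gateway catches up on messages published while the gateway connection was broken.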
func TestJetStreamSuperClusterConsumersBrokenGateways(t *testing.T) {
sc := createJetStreamSuperCluster(t, 1, 2)
defer sc.shutdown()
// Client based API
s := sc.clusterForName("C1").randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
// This will be in C1.
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Create a stream in C2 that sources TEST
_, err = js.AddStream(&nats.StreamConfig{
Name: "S",
Placement: &nats.Placement{Cluster: "C2"},
Sources: []*nats.StreamSource{{Name: "TEST"}},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Wait for direct consumer to get registered and detect interest across GW.
time.Sleep(time.Second)
// Send 100 msgs, paced 500µs apart, in a separate Go routine.
msg, toSend, done := []byte("Hello"), 100, make(chan bool)
go func() {
// Send in the 100 messages.
for i := 0; i < toSend; i++ {
if _, err = js.Publish("TEST", msg); err != nil {
t.Errorf("Unexpected publish error: %v", err)
}
time.Sleep(500 * time.Microsecond)
}
done <- true
}()
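// breakGW forcefully closes s's outbound gateway connection to C2 to simulate a gateway outage; the gateways are expected to reconnect.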
breakGW := func() {
s.gateway.Lock()
gw := s.gateway.out["C2"]
s.gateway.Unlock()
if gw != nil {
gw.closeConnection(ClientClosed)
}
}
// Wait until about halfway through.
time.Sleep(20 * time.Millisecond)
// Now break GW connection.
breakGW()
// Wait for GW to reform.
for _, c := range sc.clusters {
for _, s := range c.servers {
waitForOutboundGateways(t, s, 1, 2*time.Second)
}
}
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatalf("Did not complete sending first batch of messages")
}
// Make sure we can deal with data loss at the end.
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
si, err := js.StreamInfo("S")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 100 {
return fmt.Errorf("Expected to have %d messages, got %d", 100, si.State.Msgs)
}
return nil
})
// Now send 100 more. We will also break the GW here in the middle.
for i := 0; i < toSend; i++ {
if _, err = js.Publish("TEST", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
if i == 50 {
breakGW()
}
}
// Wait for GW to reform.
for _, c := range sc.clusters {
for _, s := range c.servers {
waitForOutboundGateways(t, s, 1, 2*time.Second)
}
}
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 200 {
t.Fatalf("Expected to have %d messages, got %d", 200, si.State.Msgs)
}
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
si, err := js.StreamInfo("S")
if err != nil {
return fmt.Errorf("Unexpected error: %v", err)
}
if si.State.Msgs != 200 {
return fmt.Errorf("Expected to have %d messages, got %d", 200, si.State.Msgs)
}
return nil
})
}
func TestJetStreamSuperClusterLeafNodesWithSharedSystemAccountAndSameDomain(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 2)
defer sc.shutdown()
lnc := sc.createLeafNodes("LNC", 2)
defer lnc.shutdown()
// We want to make sure there is only one leader and it's always in the supercluster.
sc.waitOnLeader()
if ml := lnc.leader(); ml != nil {
t.Fatalf("Detected a meta-leader in the leafnode cluster: %s", ml)
}
// leafnodes should have been added into the overall peer count.
sc.waitOnPeerCount(8)
// Check here that we auto detect sharing system account as well and auto place the correct
// deny imports and exports.
ls := lnc.randomServer()
if ls == nil {
t.Fatalf("Expected a leafnode server, got none")
}
gacc := ls.globalAccount().GetName()
ls.mu.Lock()
var hasDE, hasDI bool
for _, ln := range ls.leafs {
ln.mu.Lock()
if ln.leaf.remote.RemoteLeafOpts.LocalAccount == gacc {
re := ln.perms.pub.deny.Match(jsAllAPI)
hasDE = len(re.psubs)+len(re.qsubs) > 0
rs := ln.perms.sub.deny.Match(jsAllAPI)
hasDI = len(rs.psubs)+len(rs.qsubs) > 0
}
ln.mu.Unlock()
}
ls.mu.Unlock()
if !hasDE {
t.Fatalf("No deny export on global account")
}
if !hasDI {
t.Fatalf("No deny import on global account")
}
// Make a stream by connecting to the leafnode cluster. Make sure placement is correct.
// Client based API
nc, js := jsClientConnect(t, lnc.randomServer())
defer nc.Close()
si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 2,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.Cluster.Name != "LNC" {
t.Fatalf("Expected default placement to be %q, got %q", "LNC", si.Cluster.Name)
}
// Now make sure placement also works if we want to place in a cluster in the supercluster.
pcn := "C2"
si, err = js.AddStream(&nats.StreamConfig{
Name: "TEST2",
Subjects: []string{"baz"},
Replicas: 2,
Placement: &nats.Placement{Cluster: pcn},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.Cluster.Name != pcn {
t.Fatalf("Expected default placement to be %q, got %q", pcn, si.Cluster.Name)
}
}
func TestJetStreamSuperClusterLeafNodesWithSharedSystemAccountAndDifferentDomain(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 2)
defer sc.shutdown()
lnc := sc.createLeafNodesWithDomain("LNC", 2, "LEAFDOMAIN")
defer lnc.shutdown()
// We want to make sure there is only one leader and it's always in the supercluster.
sc.waitOnLeader()
lnc.waitOnLeader()
// even though system account is shared, because domains differ,
sc.waitOnPeerCount(6)
lnc.waitOnPeerCount(2)
// Check here that we auto detect sharing system account as well and auto place the correct
// deny imports and exports.
ls := lnc.randomServer()
if ls == nil {
t.Fatalf("Expected a leafnode server, got none")
}
gacc := ls.globalAccount().GetName()
ls.mu.Lock()
var hasDE, hasDI bool
for _, ln := range ls.leafs {
ln.mu.Lock()
if ln.leaf.remote.RemoteLeafOpts.LocalAccount == gacc {
re := ln.perms.pub.deny.Match(jsAllAPI)
hasDE = len(re.psubs)+len(re.qsubs) > 0
rs := ln.perms.sub.deny.Match(jsAllAPI)
hasDI = len(rs.psubs)+len(rs.qsubs) > 0
}
ln.mu.Unlock()
}
ls.mu.Unlock()
if !hasDE {
t.Fatalf("No deny export on global account")
}
if !hasDI {
t.Fatalf("No deny import on global account")
}
// Make a stream by connecting to the leafnode cluster. Make sure placement is correct.
// Client based API
nc, js := jsClientConnect(t, lnc.randomServer())
defer nc.Close()
si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 2,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.Cluster.Name != "LNC" {
t.Fatalf("Expected default placement to be %q, got %q", "LNC", si.Cluster.Name)
}
// Now make sure placement does not work for a cluster in a different domain.
pcn := "C2"
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST2",
Subjects: []string{"baz"},
Replicas: 2,
Placement: &nats.Placement{Cluster: pcn},
})
if err == nil || !strings.Contains(err.Error(), "insufficient resources") {
t.Fatalf("Expected insufficient resources, got: %v", err)
}
}
func TestJetStreamSuperClusterSingleLeafNodeWithSharedSystemAccount(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 2)
defer sc.shutdown()
ln := sc.createSingleLeafNode(true)
defer ln.Shutdown()
// We want to make sure there is only one leader and it's always in the supercluster.
sc.waitOnLeader()
// leafnodes should have been added into the overall peer count.
sc.waitOnPeerCount(7)
// Now make sure we can place a stream in the leaf node.
// First connect to the leafnode server itself.
nc, js := jsClientConnect(t, ln)
defer nc.Close()
si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST1",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.Cluster.Name != "LNS" {
t.Fatalf("Expected to be placed in leafnode with %q as cluster name, got %q", "LNS", si.Cluster.Name)
}
// Now check we can place on here as well but connect to the hub.
nc, js = jsClientConnect(t, sc.randomServer())
defer nc.Close()
si, err = js.AddStream(&nats.StreamConfig{
Name: "TEST2",
Subjects: []string{"bar"},
Placement: &nats.Placement{Cluster: "LNS"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.Cluster.Name != "LNS" {
t.Fatalf("Expected to be placed in leafnode with %q as cluster name, got %q", "LNS", si.Cluster.Name)
}
}
// Issue reported with superclusters and leafnodes where the first few get-next requests for pull subscribers
// have the wrong subject.
func TestJetStreamSuperClusterGetNextRewrite(t *testing.T) {
sc := createJetStreamSuperClusterWithTemplate(t, jsClusterAccountsTempl, 2, 2)
defer sc.shutdown()
// Will connect the leafnode to cluster C1. We will then connect the "client" to cluster C2 to cross gateways.
ln := sc.clusterForName("C1").createSingleLeafNodeNoSystemAccountAndEnablesJetStreamWithDomain("C", "nojs")
defer ln.Shutdown()
c2 := sc.clusterForName("C2")
nc, js := jsClientConnectEx(t, c2.randomServer(), "C", nats.UserInfo("nojs", "p"))
defer nc.Close()
// Create a stream and add messages.
if _, err := js.AddStream(&nats.StreamConfig{Name: "foo"}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for i := 0; i < 10; i++ {
if _, err := js.Publish("foo", []byte("ok")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
// Pull messages and make sure subject rewrite works.
sub, err := js.PullSubscribe("foo", "dlc")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for _, m := range fetchMsgs(t, sub, 5, time.Second) {
if m.Subject != "foo" {
t.Fatalf("Expected %q as subject but got %q", "foo", m.Subject)
}
}
}
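// Tests that the internal source consumer on the origin stream is removed when the sourcing stream is deleted, whether the source lives in the same cluster or a remote one.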
func TestJetStreamSuperClusterEphemeralCleanup(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 2)
defer sc.shutdown()
// Create a stream in cluster 0
s := sc.clusters[0].randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
for _, test := range []struct {
name string
sourceInCluster int
streamName string
sourceName string
}{
{"local", 0, "TEST1", "S1"},
{"remote", 1, "TEST2", "S2"},
} {
t.Run(test.name, func(t *testing.T) {
if _, err := js.AddStream(&nats.StreamConfig{Name: test.streamName, Replicas: 3}); err != nil {
t.Fatalf("Error adding %q stream: %v", test.streamName, err)
}
if _, err := js.Publish(test.streamName, []byte("hello")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
// Now create a source for that stream, either in same or remote cluster.
s2 := sc.clusters[test.sourceInCluster].randomServer()
nc2, js2 := jsClientConnect(t, s2)
defer nc2.Close()
if _, err := js2.AddStream(&nats.StreamConfig{
Name: test.sourceName,
Storage: nats.FileStorage,
Sources: []*nats.StreamSource{{Name: test.streamName}},
Replicas: 1,
}); err != nil {
t.Fatalf("Error adding source stream: %v", err)
}
// Check that TEST(n) has 1 consumer and that S(n) is created and has 1 message.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js2.StreamInfo(test.sourceName)
if err != nil {
return fmt.Errorf("Could not get stream info: %v", err)
}
if si.State.Msgs != 1 {
return fmt.Errorf("Expected 1 msg, got state: %+v", si.State)
}
return nil
})
// Get the consumer because we will want to artificially reduce
// the delete threshold.
leader := sc.clusters[0].streamLeader("$G", test.streamName)
mset, err := leader.GlobalAccount().lookupStream(test.streamName)
if err != nil {
t.Fatalf("Expected to find a stream for %q, got %v", test.streamName, err)
}
cons := mset.getConsumers()[0]
cons.mu.Lock()
cons.dthresh = 1250 * time.Millisecond
active := cons.active
dtimerSet := cons.dtmr != nil
deliver := cons.cfg.DeliverSubject
cons.mu.Unlock()
if !active || dtimerSet {
t.Fatalf("Invalid values for active=%v dtimerSet=%v", active, dtimerSet)
}
// To add to the mix, let's create a local interest on the delivery subject
// and stop it. This is to ensure that this does not stop timers that should
// still be running and monitor the GW interest.
sub := natsSubSync(t, nc, deliver)
natsFlush(t, nc)
natsUnsub(t, sub)
natsFlush(t, nc)
// Now remove the "S(n)" stream...
if err := js2.DeleteStream(test.sourceName); err != nil {
t.Fatalf("Error deleting stream: %v", err)
}
// Now check that the stream S(n) is really removed and that
// the consumer is gone for stream TEST(n).
checkFor(t, 5*time.Second, 25*time.Millisecond, func() error {
// First, make sure that stream S(n) has disappeared.
if _, err := js2.StreamInfo(test.sourceName); err == nil {
return fmt.Errorf("Stream %q should no longer exist", test.sourceName)
}
if ndc := mset.numDirectConsumers(); ndc != 0 {
return fmt.Errorf("Expected %q stream to have 0 consumers, got %v", test.streamName, ndc)
}
return nil
})
})
}
}
func TestJetStreamSuperClusterGetNextSubRace(t *testing.T) {
sc := createJetStreamSuperClusterWithTemplate(t, jsClusterAccountsTempl, 2, 2)
defer sc.shutdown()
// Will connect the leafnode to cluster C1. We will then connect the "client" to cluster C2 to cross gateways.
ln := sc.clusterForName("C1").createSingleLeafNodeNoSystemAccountAndEnablesJetStreamWithDomain("C", "nojs")
defer ln.Shutdown()
// Shutdown one of the servers in C1 (the one the LN is not connected to).
for _, s := range sc.clusterForName("C1").servers {
s.mu.Lock()
if len(s.leafs) == 0 {
s.mu.Unlock()
s.Shutdown()
break
}
s.mu.Unlock()
}
// Wait on meta leader in case shutdown of server above caused an election.
sc.waitOnLeader()
var c2Srv *Server
// Take the server from C2 that has no inbound from C1.
c2 := sc.clusterForName("C2")
for _, s := range c2.servers {
var gwsa [2]*client
gws := gwsa[:0]
s.getInboundGatewayConnections(&gws)
if len(gws) == 0 {
c2Srv = s
break
}
}
if c2Srv == nil {
t.Fatalf("Both servers in C2 had an inbound GW connection!")
}
nc, js := jsClientConnectEx(t, c2Srv, "C", nats.UserInfo("nojs", "p"))
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{Name: "foo"})
require_NoError(t, err)
_, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dur", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)
for i := 0; i < 100; i++ {
sendStreamMsg(t, nc, "foo", "ok")
}
// Wait for all messages to appear in the consumer
checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
ci, err := js.ConsumerInfo("foo", "dur")
if err != nil {
return err
}
if n := ci.NumPending; n != 100 {
return fmt.Errorf("Expected 100 msgs, got %v", n)
}
return nil
})
req := &JSApiConsumerGetNextRequest{Batch: 1, Expires: 5 * time.Second}
jreq, err := json.Marshal(req)
require_NoError(t, err)
// Create this by hand here to make sure we create the subscription
// on the reply subject for every single request
nextSubj := fmt.Sprintf(JSApiRequestNextT, "foo", "dur")
nextSubj = "$JS.C.API" + strings.TrimPrefix(nextSubj, "$JS.API")
for i := 0; i < 100; i++ {
inbox := nats.NewInbox()
sub := natsSubSync(t, nc, inbox)
natsPubReq(t, nc, nextSubj, inbox, jreq)
msg := natsNexMsg(t, sub, time.Second)
if len(msg.Header) != 0 && string(msg.Data) != "ok" {
t.Fatalf("Unexpected message: header=%+v data=%s", msg.Header, msg.Data)
}
sub.Unsubscribe()
}
}
func TestJetStreamSuperClusterPullConsumerAndHeaders(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 2)
defer sc.shutdown()
c1 := sc.clusterForName("C1")
c2 := sc.clusterForName("C2")
nc, js := jsClientConnect(t, c1.randomServer())
defer nc.Close()
if _, err := js.AddStream(&nats.StreamConfig{Name: "ORIGIN"}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
toSend := 50
for i := 0; i < toSend; i++ {
if _, err := js.Publish("ORIGIN", []byte("ok")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
nc2, js2 := jsClientConnect(t, c2.randomServer())
defer nc2.Close()
_, err := js2.AddStream(&nats.StreamConfig{
Name: "S",
Sources: []*nats.StreamSource{{Name: "ORIGIN"}},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Wait for them to be in the sourced stream.
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
if si, _ := js2.StreamInfo("S"); si.State.Msgs != uint64(toSend) {
return fmt.Errorf("Expected %d msgs for %q, got %d", toSend, "S", si.State.Msgs)
}
return nil
})
// Now create a pull consumer for the sourced stream.
_, err = js2.AddConsumer("S", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now we will connect and request the next message from each server in C1 cluster and check that headers remain in place.
for _, s := range c1.servers {
nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()
m, err := nc.Request("$JS.API.CONSUMER.MSG.NEXT.S.dlc", nil, 2*time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(m.Header) != 1 {
t.Fatalf("Expected 1 header element, got %+v", m.Header)
}
}
}
func TestJetStreamSuperClusterStatszActiveServers(t *testing.T) {
sc := createJetStreamSuperCluster(t, 2, 2)
defer sc.shutdown()
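// checkActive polls the $SYS statsz ping until the reported number of active servers matches the expected count.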
checkActive := func(expected int) {
t.Helper()
checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
s := sc.randomServer()
nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
if err != nil {
t.Fatalf("Failed to create system client: %v", err)
}
defer nc.Close()
resp, err := nc.Request(serverStatsPingReqSubj, nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var ssm ServerStatsMsg
if err := json.Unmarshal(resp.Data, &ssm); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ssm.Stats.ActiveServers != expected {
return fmt.Errorf("Wanted %d, got %d", expected, ssm.Stats.ActiveServers)
}
return nil
})
}
checkActive(4)
c := sc.randomCluster()
ss := c.randomServer()
ss.Shutdown()
checkActive(3)
c.restartServer(ss)
checkActive(4)
}
func TestJetStreamSuperClusterSourceAndMirrorConsumersLeaderChange(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 2)
defer sc.shutdown()
c1 := sc.clusterForName("C1")
c2 := sc.clusterForName("C2")
nc, js := jsClientConnect(t, c1.randomServer())
defer nc.Close()
var sources []*nats.StreamSource
numStreams := 10
for i := 1; i <= numStreams; i++ {
name := fmt.Sprintf("O%d", i)
sources = append(sources, &nats.StreamSource{Name: name})
if _, err := js.AddStream(&nats.StreamConfig{Name: name}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
// Place our new stream that will source all the others in a different cluster.
nc, js = jsClientConnect(t, c2.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "S",
Replicas: 2,
Sources: sources,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Force leader change twice.
nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "S"), nil, time.Second)
c2.waitOnStreamLeader("$G", "S")
nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "S"), nil, time.Second)
c2.waitOnStreamLeader("$G", "S")
// Now make sure we only have a single direct consumer on our origin streams.
// Pick one at random.
name := fmt.Sprintf("O%d", rand.Intn(numStreams-1)+1)
c1.waitOnStreamLeader("$G", name)
s := c1.streamLeader("$G", name)
a, err := s.lookupAccount("$G")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
mset, err := a.lookupStream(name)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
if ndc := mset.numDirectConsumers(); ndc != 1 {
return fmt.Errorf("Stream %q wanted 1 direct consumer, got %d", name, ndc)
}
return nil
})
// Now create a mirror of selected from above. Will test same scenario.
_, err = js.AddStream(&nats.StreamConfig{
Name: "M",
Replicas: 2,
Mirror: &nats.StreamSource{Name: name},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Force leader change twice.
nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "M"), nil, time.Second)
c2.waitOnStreamLeader("$G", "M")
nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "M"), nil, time.Second)
c2.waitOnStreamLeader("$G", "M")
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
if ndc := mset.numDirectConsumers(); ndc != 2 {
return fmt.Errorf("Stream %q wanted 2 direct consumers, got %d", name, ndc)
}
return nil
})
}
func TestJetStreamSuperClusterPushConsumerInterest(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 2)
defer sc.shutdown()
for _, test := range []struct {
name string
queue string
}{
{"non queue", _EMPTY_},
{"queue", "queue"},
} {
t.Run(test.name, func(t *testing.T) {
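// testInterest makes sure the TEST stream exists, subscribes (queue or durable push) from the given server, publishes a message, and verifies it gets delivered even when interest is registered from a different cluster.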
testInterest := func(s *Server) {
t.Helper()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
var sub *nats.Subscription
if test.queue != _EMPTY_ {
sub, err = js.QueueSubscribeSync("foo", test.queue)
} else {
sub, err = js.SubscribeSync("foo", nats.Durable("dur"))
}
require_NoError(t, err)
js.Publish("foo", []byte("msg1"))
// Since the GW watcher is checking every 1sec, make sure we are
// giving it enough time for the delivery to start.
_, err = sub.NextMsg(2 * time.Second)
require_NoError(t, err)
}
// Create the durable push consumer from cluster "0"
testInterest(sc.clusters[0].servers[0])
// Now "move" to a server in cluster "1"
testInterest(sc.clusters[1].servers[0])
})
}
}
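// Tests MaxBytes-based placement: once a cluster's storage reservation is used up, streams without an explicit placement overflow to other clusters, while explicit placement into the full cluster fails.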
func TestJetStreamSuperClusterOverflowPlacement(t *testing.T) {
sc := createJetStreamSuperClusterWithTemplate(t, jsClusterMaxBytesTempl, 3, 3)
defer sc.shutdown()
pcn := "C2"
s := sc.clusterForName(pcn).randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
// With this setup, we opted in to requiring MaxBytes, so this should error.
_, err := js.AddStream(&nats.StreamConfig{
Name: "foo",
Replicas: 3,
})
require_Error(t, err, NewJSStreamMaxBytesRequiredError())
// R=2 on purpose to leave one server empty.
_, err = js.AddStream(&nats.StreamConfig{
Name: "foo",
Replicas: 2,
MaxBytes: 2 * 1024 * 1024 * 1024,
})
require_NoError(t, err)
// Now try to add another that will overflow the current cluster's reservation.
// Since we asked explicitly for the same cluster this should fail.
// Note this will not be testing the peer picker since the update has probably not made it to the meta leader.
_, err = js.AddStream(&nats.StreamConfig{
Name: "bar",
Replicas: 3,
MaxBytes: 2 * 1024 * 1024 * 1024,
Placement: &nats.Placement{Cluster: pcn},
})
require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError())
// Now test actual overflow placement. So try again with no placement designation.
// This will test the peer picker's logic since they are updated at this point and the meta leader
// knows it can not place it in C2.
si, err := js.AddStream(&nats.StreamConfig{
Name: "bar",
Replicas: 3,
MaxBytes: 2 * 1024 * 1024 * 1024,
})
require_NoError(t, err)
// Make sure we did not get placed into C2.
falt := si.Cluster.Name
if falt == pcn {
t.Fatalf("Expected to be placed in another cluster besides %q, but got %q", pcn, falt)
}
// One more time that should spill over again to our last cluster.
si, err = js.AddStream(&nats.StreamConfig{
Name: "baz",
Replicas: 3,
MaxBytes: 2 * 1024 * 1024 * 1024,
})
require_NoError(t, err)
// Make sure we did not get placed into C2.
if salt := si.Cluster.Name; salt == pcn || salt == falt {
t.Fatalf("Expected to be placed in last cluster besides %q or %q, but got %q", pcn, falt, salt)
}
// Now place a stream of R1 into C2 which should have space.
si, err = js.AddStream(&nats.StreamConfig{
Name: "dlc",
MaxBytes: 2 * 1024 * 1024 * 1024,
})
require_NoError(t, err)
if si.Cluster.Name != pcn {
t.Fatalf("Expected to be placed in our origin cluster %q, but got %q", pcn, si.Cluster.Name)
}
}
func TestJetStreamSuperClusterConcurrentOverflow(t *testing.T) {
sc := createJetStreamSuperClusterWithTemplate(t, jsClusterMaxBytesTempl, 3, 3)
defer sc.shutdown()
pcn := "C2"
startCh := make(chan bool)
var wg sync.WaitGroup
var swg sync.WaitGroup
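// start connects to a random server in the target cluster, waits for the shared start signal, then creates an R3 stream reserving 2GB so both creations race for the same capacity.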
start := func(name string) {
defer wg.Done()
s := sc.clusterForName(pcn).randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
swg.Done()
<-startCh
_, err := js.AddStream(&nats.StreamConfig{
Name: name,
Replicas: 3,
MaxBytes: 2 * 1024 * 1024 * 1024,
})
require_NoError(t, err)
}
wg.Add(2)
swg.Add(2)
go start("foo")
go start("bar")
swg.Wait()
// Now start both at same time.
close(startCh)
wg.Wait()
}
func TestJetStreamSuperClusterStreamTagPlacement(t *testing.T) {
sc := createJetStreamTaggedSuperCluster(t)
defer sc.shutdown()
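// placeOK creates a stream with the given placement tags from a server in connectCluster, verifies it was placed in expectedCluster, then deletes it.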
placeOK := func(connectCluster string, tags []string, expectedCluster string) {
t.Helper()
nc, js := jsClientConnect(t, sc.clusterForName(connectCluster).randomServer())
defer nc.Close()
si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Placement: &nats.Placement{Tags: tags},
})
require_NoError(t, err)
if si.Cluster.Name != expectedCluster {
t.Fatalf("Failed to place properly in %q, got %q", expectedCluster, si.Cluster.Name)
}
js.DeleteStream("TEST")
}
placeOK("C2", []string{"cloud:aws"}, "C1")
placeOK("C2", []string{"country:jp"}, "C3")
placeOK("C1", []string{"cloud:gcp", "country:uk"}, "C2")
// Case should not matter.
placeOK("C1", []string{"cloud:GCP", "country:UK"}, "C2")
placeOK("C2", []string{"Cloud:AwS", "Country:uS"}, "C1")
placeErr := func(connectCluster string, tags []string) {
t.Helper()
nc, js := jsClientConnect(t, sc.clusterForName(connectCluster).randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Placement: &nats.Placement{Tags: tags},
})
require_Error(t, err, NewJSInsufficientResourcesError())
}
placeErr("C1", []string{"cloud:GCP", "country:US"})
placeErr("C1", []string{"country:DN"})
placeErr("C1", []string{"cloud:DO"})
}
func TestJetStreamSuperClusterRemovedPeersAndStreamsListAndDelete(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 3)
defer sc.shutdown()
pcn := "C2"
sc.waitOnLeader()
ml := sc.leader()
if ml.ClusterName() == pcn {
pcn = "C1"
}
// Client based API
nc, js := jsClientConnect(t, ml)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "GONE",
Replicas: 3,
Placement: &nats.Placement{Cluster: pcn},
})
require_NoError(t, err)
_, err = js.AddConsumer("GONE", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Replicas: 3,
Placement: &nats.Placement{Cluster: ml.ClusterName()},
})
require_NoError(t, err)
// Put messages in..
num := 100
for i := 0; i < num; i++ {
js.PublishAsync("GONE", []byte("SLS"))
js.PublishAsync("TEST", []byte("SLS"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
c := sc.clusterForName(pcn)
c.shutdown()
// Grab Stream List..
start := time.Now()
resp, err := nc.Request(JSApiStreamList, nil, 2*time.Second)
require_NoError(t, err)
if delta := time.Since(start); delta > 100*time.Millisecond {
t.Fatalf("Stream list call took too long to return: %v", delta)
}
var list JSApiStreamListResponse
err = json.Unmarshal(resp.Data, &list)
require_NoError(t, err)
if len(list.Missing) != 1 || list.Missing[0] != "GONE" {
t.Fatalf("Wrong Missing: %+v", list)
}
// Check behavior of stream info as well. We want it to return the stream is offline and not just timeout.
_, err = js.StreamInfo("GONE")
// FIXME(dlc) - Go client not putting nats: prefix on for stream but does for consumer.
require_Error(t, err, NewJSStreamOfflineError(), errors.New("nats: stream is offline"))
// Same for Consumer
start = time.Now()
resp, err = nc.Request("$JS.API.CONSUMER.LIST.GONE", nil, 2*time.Second)
require_NoError(t, err)
if delta := time.Since(start); delta > 100*time.Millisecond {
t.Fatalf("Consumer list call took too long to return: %v", delta)
}
var clist JSApiConsumerListResponse
err = json.Unmarshal(resp.Data, &clist)
require_NoError(t, err)
if len(clist.Missing) != 1 || clist.Missing[0] != "dlc" {
t.Fatalf("Wrong Missing: %+v", clist)
}
_, err = js.ConsumerInfo("GONE", "dlc")
require_Error(t, err, NewJSConsumerOfflineError(), errors.New("nats: consumer is offline"))
// Make sure delete works.
err = js.DeleteConsumer("GONE", "dlc")
require_NoError(t, err)
err = js.DeleteStream("GONE")
require_NoError(t, err)
// Test it is really gone.
_, err = js.StreamInfo("GONE")
require_Error(t, err, nats.ErrStreamNotFound)
}
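// Tests that a deliver-new consumer that has not delivered any messages yet keeps its starting position, and reports no pending messages, after the servers in its cluster are wiped and restarted.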
func TestJetStreamSuperClusterConsumerDeliverNewBug(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 3)
defer sc.shutdown()
pcn := "C2"
sc.waitOnLeader()
ml := sc.leader()
if ml.ClusterName() == pcn {
pcn = "C1"
}
// Client based API
nc, js := jsClientConnect(t, ml)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "T",
Replicas: 3,
Placement: &nats.Placement{Cluster: pcn},
})
require_NoError(t, err)
// Put messages in..
num := 100
for i := 0; i < num; i++ {
js.PublishAsync("T", []byte("OK"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
ci, err := js.AddConsumer("T", &nats.ConsumerConfig{
Durable: "d",
AckPolicy: nats.AckExplicitPolicy,
DeliverPolicy: nats.DeliverNewPolicy,
})
require_NoError(t, err)
if ci.Delivered.Consumer != 0 || ci.Delivered.Stream != 100 {
t.Fatalf("Incorrect consumer delivered info: %+v", ci.Delivered)
}
c := sc.clusterForName(pcn)
for _, s := range c.servers {
sd := s.JetStreamConfig().StoreDir
s.Shutdown()
removeDir(t, sd)
s = c.restartServer(s)
c.waitOnServerHealthz(s)
}
c.waitOnConsumerLeader("$G", "T", "d")
ci, err = js.ConsumerInfo("T", "d")
require_NoError(t, err)
if ci.Delivered.Consumer != 0 || ci.Delivered.Stream != 100 {
t.Fatalf("Incorrect consumer delivered info: %+v", ci.Delivered)
}
if ci.NumPending != 0 {
t.Fatalf("Did not expect NumPending, got %d", ci.NumPending)
}
}
// This will test our ability to move streams and consumers between clusters.
func TestJetStreamSuperClusterMovingStreamsAndConsumers(t *testing.T) {
sc := createJetStreamTaggedSuperCluster(t)
defer sc.shutdown()
nc, js := jsClientConnect(t, sc.randomServer())
defer nc.Close()
for _, test := range []struct {
name string
replicas int
}{
{"R1", 1},
{"R3", 3},
} {
t.Run(test.name, func(t *testing.T) {
replicas := test.replicas
si, err := js.AddStream(&nats.StreamConfig{
Name: "MOVE",
Replicas: replicas,
Placement: &nats.Placement{Tags: []string{"cloud:aws"}},
})
require_NoError(t, err)
defer js.DeleteStream("MOVE")
if si.Cluster.Name != "C1" {
t.Fatalf("Failed to place properly in %q, got %q", "C1", si.Cluster.Name)
}
for i := 0; i < 1000; i++ {
_, err := js.PublishAsync("MOVE", []byte("Moving on up"))
require_NoError(t, err)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
// Durable Push Consumer, so same R.
dpushSub, err := js.SubscribeSync("MOVE", nats.Durable("dlc"))
require_NoError(t, err)
defer dpushSub.Unsubscribe()
// Ephemeral Push Consumer, R1.
epushSub, err := js.SubscribeSync("MOVE")
require_NoError(t, err)
defer epushSub.Unsubscribe()
// Durable Pull Consumer, so same R.
dpullSub, err := js.PullSubscribe("MOVE", "dlc-pull")
require_NoError(t, err)
defer dpullSub.Unsubscribe()
// TODO(dlc) - Server supports ephemeral pulls but Go client does not yet.
si, err = js.StreamInfo("MOVE")
require_NoError(t, err)
if si.State.Consumers != 3 {
t.Fatalf("Expected 3 attached consumers, got %d", si.State.Consumers)
}
initialState := si.State
checkSubsPending(t, dpushSub, int(initialState.Msgs))
checkSubsPending(t, epushSub, int(initialState.Msgs))
// Ack 100
toAck := 100
for i := 0; i < toAck; i++ {
m, err := dpushSub.NextMsg(time.Second)
require_NoError(t, err)
m.AckSync()
// Ephemeral
m, err = epushSub.NextMsg(time.Second)
require_NoError(t, err)
m.AckSync()
}
// Do same with pull subscriber.
for _, m := range fetchMsgs(t, dpullSub, toAck, 5*time.Second) {
m.AckSync()
}
// First make sure we disallow move and replica changes in the same update.
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "MOVE",
Placement: &nats.Placement{Tags: []string{"cloud:gcp"}},
Replicas: replicas + 1,
})
require_Error(t, err, NewJSStreamMoveAndScaleError())
// Now move to new cluster.
si, err = js.UpdateStream(&nats.StreamConfig{
Name: "MOVE",
Replicas: replicas,
Placement: &nats.Placement{Tags: []string{"cloud:gcp"}},
})
require_NoError(t, err)
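// The move has not completed yet, so the update response should still report the origin cluster.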
if si.Cluster.Name != "C1" {
t.Fatalf("Expected cluster of %q but got %q", "C1", si.Cluster.Name)
}
// Make sure we can not move an inflight stream and consumers, should error.
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "MOVE",
Replicas: replicas,
Placement: &nats.Placement{Tags: []string{"cloud:aws"}},
})
require_Error(t, err, NewJSStreamMoveInProgressError())
checkFor(t, 10*time.Second, 10*time.Millisecond, func() error {
si, err := js.StreamInfo("MOVE")
if err != nil {
return err
}
// We should see 2X peers.
numPeers := len(si.Cluster.Replicas)
if si.Cluster.Leader != _EMPTY_ {
numPeers++
}
if numPeers != 2*replicas {
return fmt.Errorf("Expected to see %d replicas, got %d", 2*replicas, numPeers)
}
return nil
})
// Expect a new leader to emerge and replicas to drop as a leader is elected.
// We have to check fast or it might complete and we will not see intermediate steps.
sc.waitOnStreamLeader("$G", "MOVE")
checkFor(t, 10*time.Second, 10*time.Millisecond, func() error {
si, err := js.StreamInfo("MOVE")
if err != nil {
return err
}
if len(si.Cluster.Replicas) >= 2*replicas {
return fmt.Errorf("Expected <%d replicas, got %d", 2*replicas, len(si.Cluster.Replicas))
}
return nil
})
// Should see the cluster designation and leader switch to C2.
// We should also shrink back down to original replica count.
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("MOVE")
if err != nil {
return err
}
if si.Cluster.Name != "C2" {
return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name)
}
if si.Cluster.Leader == _EMPTY_ {
return fmt.Errorf("No leader yet")
} else if !strings.HasPrefix(si.Cluster.Leader, "C2-") {
return fmt.Errorf("Wrong leader: %q", si.Cluster.Leader)
}
// Now we want to see that we shrink back to original.
if len(si.Cluster.Replicas) != replicas-1 {
return fmt.Errorf("Expected %d replicas, got %d", replicas-1, len(si.Cluster.Replicas))
}
return nil
})
// Check moved state is same as initial state.
si, err = js.StreamInfo("MOVE")
require_NoError(t, err)
if si.State != initialState {
t.Fatalf("States do not match after migration:\n%+v\nvs\n%+v", si.State, initialState)
}
// Make sure we can still send messages.
addN := toAck
for i := 0; i < addN; i++ {
_, err := js.Publish("MOVE", []byte("Done Moved"))
require_NoError(t, err)
}
si, err = js.StreamInfo("MOVE")
require_NoError(t, err)
expectedPushMsgs := initialState.Msgs + uint64(addN)
expectedPullMsgs := uint64(addN)
if si.State.Msgs != expectedPushMsgs {
t.Fatalf("Expected to be able to send new messages")
}
// Now check consumers, make sure the state is correct and that they transferred state and reflect the new messages.
// We Ack'd 100 and sent another 100, so should be same.
checkConsumer := func(sub *nats.Subscription, isPull bool) {
t.Helper()
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
ci, err := sub.ConsumerInfo()
if err != nil {
return err
}
var expectedMsgs uint64
if isPull {
expectedMsgs = expectedPullMsgs
} else {
expectedMsgs = expectedPushMsgs
}
if ci.Delivered.Consumer != expectedMsgs || ci.Delivered.Stream != expectedMsgs {
return fmt.Errorf("Delivered for %q is not correct: %+v", ci.Name, ci.Delivered)
}
if ci.AckFloor.Consumer != uint64(toAck) || ci.AckFloor.Stream != uint64(toAck) {
return fmt.Errorf("AckFloor for %q is not correct: %+v", ci.Name, ci.AckFloor)
}
if isPull && ci.NumAckPending != 0 {
return fmt.Errorf("NumAckPending for %q is not correct: %v", ci.Name, ci.NumAckPending)
} else if !isPull && ci.NumAckPending != int(initialState.Msgs) {
return fmt.Errorf("NumAckPending for %q is not correct: %v", ci.Name, ci.NumAckPending)
}
// Make sure the replicas etc are back to what is expected.
si, err := js.StreamInfo("MOVE")
if err != nil {
return err
}
numExpected := si.Config.Replicas
if ci.Config.Durable == _EMPTY_ {
numExpected = 1
}
numPeers := len(ci.Cluster.Replicas)
if ci.Cluster.Leader != _EMPTY_ {
numPeers++
}
if numPeers != numExpected {
return fmt.Errorf("Expected %d peers, got %d", numExpected, numPeers)
}
// If we are push check sub pending.
if !isPull {
checkSubsPending(t, sub, int(expectedPushMsgs)-toAck)
}
return nil
})
}
checkPushConsumer := func(sub *nats.Subscription) {
t.Helper()
checkConsumer(sub, false)
}
checkPullConsumer := func(sub *nats.Subscription) {
t.Helper()
checkConsumer(sub, true)
}
checkPushConsumer(dpushSub)
checkPushConsumer(epushSub)
checkPullConsumer(dpullSub)
// Cleanup
err = js.DeleteStream("MOVE")
require_NoError(t, err)
})
}
}
func TestJetStreamSuperClusterMovingStreamsWithMirror(t *testing.T) {
sc := createJetStreamTaggedSuperCluster(t)
defer sc.shutdown()
nc, js := jsClientConnect(t, sc.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "SOURCE",
Subjects: []string{"foo", "bar"},
Replicas: 3,
Placement: &nats.Placement{Tags: []string{"cloud:aws"}},
})
require_NoError(t, err)
_, err = js.AddStream(&nats.StreamConfig{
Name: "MIRROR",
Replicas: 1,
Mirror: &nats.StreamSource{Name: "SOURCE"},
Placement: &nats.Placement{Tags: []string{"cloud:gcp"}},
})
require_NoError(t, err)
done := make(chan struct{})
exited := make(chan struct{})
errors := make(chan error, 1)
numNoResp := uint64(0)
// We will run a separate routine and send at 100Hz.
go func() {
nc, js := jsClientConnect(t, sc.randomServer())
defer nc.Close()
defer close(exited)
for {
select {
case <-done:
return
case <-time.After(10 * time.Millisecond):
_, err := js.Publish("foo", []byte("100HZ"))
if err == nats.ErrNoStreamResponse {
atomic.AddUint64(&numNoResp, 1)
continue
}
if err != nil {
errors <- err
return
}
}
}
}()
// Let it get going.
time.Sleep(500 * time.Millisecond)
// Now move the source to a new cluster.
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "SOURCE",
Subjects: []string{"foo", "bar"},
Replicas: 3,
Placement: &nats.Placement{Tags: []string{"cloud:gcp"}},
})
require_NoError(t, err)
checkFor(t, 30*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("SOURCE")
if err != nil {
return err
}
if si.Cluster.Name != "C2" {
return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name)
}
if si.Cluster.Leader == _EMPTY_ {
return fmt.Errorf("No leader yet")
} else if !strings.HasPrefix(si.Cluster.Leader, "C2-") {
return fmt.Errorf("Wrong leader: %q", si.Cluster.Leader)
}
// Now we want to see that we shrink back to original.
if len(si.Cluster.Replicas) != 2 {
return fmt.Errorf("Expected %d replicas, got %d", 2, len(si.Cluster.Replicas))
}
// Let's get to 50+ msgs.
if si.State.Msgs < 50 {
return fmt.Errorf("Only see %d msgs", si.State.Msgs)
}
return nil
})
close(done)
<-exited
if nnr := atomic.LoadUint64(&numNoResp); nnr > 0 {
if nnr > 5 {
t.Fatalf("Expected no or very few failed message publishes, got %d", nnr)
} else {
t.Logf("Got a few failed publishes: %d", nnr)
}
}
checkFor(t, 20*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("SOURCE")
require_NoError(t, err)
mi, err := js.StreamInfo("MIRROR")
require_NoError(t, err)
if si.State != mi.State {
return fmt.Errorf("Expected mirror to be the same, got %+v vs %+v", mi.State, si.State)
}
return nil
})
}
func TestJetStreamSuperClusterMovingStreamAndMoveBack(t *testing.T) {
sc := createJetStreamTaggedSuperCluster(t)
defer sc.shutdown()
nc, js := jsClientConnect(t, sc.randomServer())
defer nc.Close()
for _, test := range []struct {
name string
replicas int
}{
{"R1", 1},
{"R3", 3},
} {
t.Run(test.name, func(t *testing.T) {
js.DeleteStream("TEST")
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Replicas: test.replicas,
Placement: &nats.Placement{Tags: []string{"cloud:aws"}},
})
require_NoError(t, err)
toSend := 10_000
for i := 0; i < toSend; i++ {
_, err := js.PublishAsync("TEST", []byte("HELLO WORLD"))
require_NoError(t, err)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Replicas: test.replicas,
Placement: &nats.Placement{Tags: []string{"cloud:gcp"}},
})
require_NoError(t, err)
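// checkMove waits for the stream to report the target cluster, a leader in that cluster, the original replica count, and all messages present.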
checkMove := func(cluster string) {
t.Helper()
checkFor(t, 20*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("TEST")
if err != nil {
return err
}
if si.Cluster.Name != cluster {
return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name)
}
if si.Cluster.Leader == _EMPTY_ {
return fmt.Errorf("No leader yet")
} else if !strings.HasPrefix(si.Cluster.Leader, cluster) {
return fmt.Errorf("Wrong leader: %q", si.Cluster.Leader)
}
// Now we want to see that we shrink back to original.
if len(si.Cluster.Replicas) != test.replicas-1 {
return fmt.Errorf("Expected %d replicas, got %d", test.replicas-1, len(si.Cluster.Replicas))
}
if si.State.Msgs != uint64(toSend) {
return fmt.Errorf("Only see %d msgs", si.State.Msgs)
}
return nil
})
}
checkMove("C2")
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Replicas: test.replicas,
Placement: &nats.Placement{Tags: []string{"cloud:aws"}},
})
require_NoError(t, err)
checkMove("C1")
})
}
}
func TestJetStreamSuperClusterImportConsumerStreamSubjectRemap(t *testing.T) {
template := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, domain: HUB, store_dir: '%s'}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
accounts: {
JS: {
jetstream: enabled
users: [ {user: js, password: pwd} ]
exports [
# This is streaming to a delivery subject for a push based consumer.
{ stream: "deliver.ORDERS.*" }
# This is to ack received messages. This is a service to support sync ack.
{ service: "$JS.ACK.ORDERS.*.>" }
# To support ordered consumers, flow control.
{ service: "$JS.FC.>" }
]
},
IM: {
users: [ {user: im, password: pwd} ]
imports [
{ stream: { account: JS, subject: "deliver.ORDERS.route" }}
{ stream: { account: JS, subject: "deliver.ORDERS.gateway" }}
{ stream: { account: JS, subject: "deliver.ORDERS.leaf1" }}
{ stream: { account: JS, subject: "deliver.ORDERS.leaf2" }}
{ service: {account: JS, subject: "$JS.FC.>" }}
]
},
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] },
}
leaf {
listen: 127.0.0.1:-1
}`
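// test exercises the subject remap across the account import for both plain and queue subscribers.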
test := func(t *testing.T, queue bool) {
c := createJetStreamSuperClusterWithTemplate(t, template, 3, 2)
defer c.shutdown()
s := c.randomServer()
nc, js := jsClientConnect(t, s, nats.UserInfo("js", "pwd"))
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "ORDERS",
Subjects: []string{"foo"}, // The JS subject.
Replicas: 3,
Placement: &nats.Placement{Cluster: "C1"},
})
require_NoError(t, err)
_, err = js.Publish("foo", []byte("OK"))
require_NoError(t, err)
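// Create one push consumer per delivery subject that the IM account imports.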
for dur, deliver := range map[string]string{
"dur-route": "deliver.ORDERS.route",
"dur-gateway": "deliver.ORDERS.gateway",
"dur-leaf-1": "deliver.ORDERS.leaf1",
"dur-leaf-2": "deliver.ORDERS.leaf2",
} {
cfg := &nats.ConsumerConfig{
Durable: dur,
DeliverSubject: deliver,
AckPolicy: nats.AckExplicitPolicy,
}
if queue {
cfg.DeliverGroup = "queue"
}
_, err = js.AddConsumer("ORDERS", cfg)
require_NoError(t, err)
}
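// testCase connects as the importing account on the given server, subscribes on the delivery
// subject, and verifies the delivered message still carries the original stream subject and a
// reply subject that was not left in its cross-account ("@") form.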
testCase := func(t *testing.T, s *Server, dSubj string) {
nc2, err := nats.Connect(s.ClientURL(), nats.UserInfo("im", "pwd"))
require_NoError(t, err)
defer nc2.Close()
var sub *nats.Subscription
if queue {
sub, err = nc2.QueueSubscribeSync(dSubj, "queue")
} else {
sub, err = nc2.SubscribeSync(dSubj)
}
require_NoError(t, err)
m, err := sub.NextMsg(time.Second)
require_NoError(t, err)
if m.Subject != "foo" {
t.Fatalf("Subject not mapped correctly across account boundary, expected %q got %q", "foo", m.Subject)
}
require_False(t, strings.Contains(m.Reply, "@"))
}
t.Run("route", func(t *testing.T) {
// Pick a random server that is not the consumer leader so the message is received via a route.
s := c.clusterForName("C1").randomNonConsumerLeader("JS", "ORDERS", "dur-route")
testCase(t, s, "deliver.ORDERS.route")
})
t.Run("gateway", func(t *testing.T) {
// Pick a server with an inbound gateway connection from the consumer leader, so the message arrives via gateway with no route in between.
scl := c.clusterForName("C1").consumerLeader("JS", "ORDERS", "dur-gateway")
var sfound *Server
for _, s := range c.clusterForName("C2").servers {
s.mu.Lock()
for _, c := range s.gateway.in {
if c.GetName() == scl.info.ID {
sfound = s
break
}
}
s.mu.Unlock()
if sfound != nil {
break
}
}
testCase(t, sfound, "deliver.ORDERS.gateway")
})
t.Run("leaf-post-export", func(t *testing.T) {
// Create a leaf node server that connects using the importing account, i.e. downstream of the export/import.
scl := c.clusterForName("C1").consumerLeader("JS", "ORDERS", "dur-leaf-1")
cf := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
leafnodes {
remotes [ { url: "nats://im:pwd@127.0.0.1:%d" } ]
}
authorization: {
user: im,
password: pwd
}
`, scl.getOpts().LeafNode.Port)))
defer removeFile(t, cf)
s, _ := RunServerWithConfig(cf)
defer s.Shutdown()
checkLeafNodeConnected(t, scl)
testCase(t, s, "deliver.ORDERS.leaf1")
})
t.Run("leaf-pre-export", func(t *testing.T) {
// Create a leaf node server that connects before the export; the export/import is performed on the leaf node server itself.
scl := c.clusterForName("C1").consumerLeader("JS", "ORDERS", "dur-leaf-2")
cf := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
leafnodes {
remotes [ { url: "nats://js:pwd@127.0.0.1:%d", account: JS2 } ]
}
accounts: {
JS2: {
users: [ {user: js, password: pwd} ]
exports [
# This is streaming to a delivery subject for a push based consumer.
{ stream: "deliver.ORDERS.leaf2" }
# This is to ack received messages. This is a service to support sync ack.
{ service: "$JS.ACK.ORDERS.*.>" }
# To support ordered consumers, flow control.
{ service: "$JS.FC.>" }
]
},
IM2: {
users: [ {user: im, password: pwd} ]
imports [
{ stream: { account: JS2, subject: "deliver.ORDERS.leaf2" }}
{ service: {account: JS2, subject: "$JS.FC.>" }}
]
},
}
`, scl.getOpts().LeafNode.Port)))
defer removeFile(t, cf)
s, _ := RunServerWithConfig(cf)
defer s.Shutdown()
checkLeafNodeConnected(t, scl)
testCase(t, s, "deliver.ORDERS.leaf2")
})
}
t.Run("noQueue", func(t *testing.T) {
test(t, false)
})
t.Run("queue", func(t *testing.T) {
test(t, true)
})
}

func TestJetStreamSuperClusterMaxHaAssets(t *testing.T) {
sc := createJetStreamSuperClusterWithTemplateAndModHook(t, `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s', limits: {max_ha_assets: 2}}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`, 3, 2,
func(serverName, clusterName, storeDir, conf string) string {
return conf
})
defer sc.shutdown()
// speed up statsz reporting
for _, c := range sc.clusters {
for _, s := range c.servers {
s.mu.Lock()
s.sys.statsz = 10 * time.Millisecond
s.sys.cstatsz = s.sys.statsz
s.sys.stmr.Reset(s.sys.statsz)
s.mu.Unlock()
}
}
nc, js := jsClientConnect(t, sc.randomServer())
defer nc.Close()
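// Subscribe to statsz updates from all servers via the system account.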
ncSys := natsConnect(t, sc.randomServer().ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
statszSub, err := ncSys.SubscribeSync(fmt.Sprintf(serverStatsSubj, "*"))
require_NoError(t, err)
require_NoError(t, ncSys.Flush())
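// waitStatsz waits until the given number of servers report exactly the given number of HA assets in their statsz.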
waitStatsz := func(peers, haassets int) {
t.Helper()
for peersWithExactHaAssets := 0; peersWithExactHaAssets < peers; {
m, err := statszSub.NextMsg(time.Second)
require_NoError(t, err)
var statsz ServerStatsMsg
err = json.Unmarshal(m.Data, &statsz)
require_NoError(t, err)
if statsz.Stats.JetStream == nil {
continue
}
if haassets == statsz.Stats.JetStream.Stats.HAAssets {
peersWithExactHaAssets++
}
}
}
waitStatsz(6, 1) // counts _meta_
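// An R1 stream is not an HA asset, so the counts should not change.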
_, err = js.AddStream(&nats.StreamConfig{Name: "S0", Replicas: 1, Placement: &nats.Placement{Cluster: "C1"}})
require_NoError(t, err)
waitStatsz(6, 1)
_, err = js.AddStream(&nats.StreamConfig{Name: "S1", Replicas: 3, Placement: &nats.Placement{Cluster: "C1"}})
require_NoError(t, err)
waitStatsz(3, 2)
waitStatsz(3, 1)
_, err = js.AddStream(&nats.StreamConfig{Name: "S2", Replicas: 3, Placement: &nats.Placement{Cluster: "C1"}})
require_NoError(t, err)
waitStatsz(3, 3)
waitStatsz(3, 1)
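// The C1 servers now host two R3 streams, which is the max_ha_assets limit, so a third must be rejected.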
_, err = js.AddStream(&nats.StreamConfig{Name: "S3", Replicas: 3, Placement: &nats.Placement{Cluster: "C1"}})
require_Error(t, err)
require_Equal(t, err.Error(), "insufficient resources")
require_NoError(t, js.DeleteStream("S1"))
waitStatsz(3, 2)
waitStatsz(3, 1)
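// Deleting S1 freed a slot in C1, so a durable consumer (which counts as an HA asset on an R3 stream) can be added to S2.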
_, err = js.AddConsumer("S2", &nats.ConsumerConfig{Durable: "DUR1", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)
waitStatsz(3, 3)
waitStatsz(3, 1)
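// A second durable consumer would exceed the limit again.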
_, err = js.AddConsumer("S2", &nats.ConsumerConfig{Durable: "DUR2", AckPolicy: nats.AckExplicitPolicy})
require_Error(t, err)
require_Equal(t, err.Error(), "insufficient resources")
_, err = js.AddConsumer("S2", &nats.ConsumerConfig{AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)
waitStatsz(3, 3)
waitStatsz(3, 1)
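// Updating the existing R3 stream does not allocate a new HA asset and must still be allowed.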
_, err = js.UpdateStream(&nats.StreamConfig{Name: "S2", Replicas: 3, Description: "foobar"})
require_NoError(t, err)
waitStatsz(3, 3)
waitStatsz(3, 1)
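// With no explicit placement, S4 can no longer fit in C1 and should land in C2.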
si, err := js.AddStream(&nats.StreamConfig{Name: "S4", Replicas: 3})
require_NoError(t, err)
require_Equal(t, si.Cluster.Name, "C2")
waitStatsz(3, 3)
waitStatsz(3, 2)
si, err = js.AddStream(&nats.StreamConfig{Name: "S5", Replicas: 3})
require_NoError(t, err)
require_Equal(t, si.Cluster.Name, "C2")
waitStatsz(6, 3)
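// C2 is now also at its limit, so adding an R3 consumer there must fail.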
_, err = js.AddConsumer("S4", &nats.ConsumerConfig{Durable: "DUR2", AckPolicy: nats.AckExplicitPolicy})
require_Error(t, err)
require_Equal(t, err.Error(), "insufficient resources")
_, err = js.UpdateStream(&nats.StreamConfig{Name: "S2", Replicas: 3, Placement: &nats.Placement{Cluster: "C2"}})
require_Error(t, err)
require_Equal(t, err.Error(), "insufficient resources")
}

func TestJetStreamSuperClusterStreamAlternates(t *testing.T) {
sc := createJetStreamTaggedSuperCluster(t)
defer sc.shutdown()
nc, js := jsClientConnect(t, sc.randomServer())
defer nc.Close()
// C1
_, err := js.AddStream(&nats.StreamConfig{
Name: "SOURCE",
Subjects: []string{"foo", "bar", "baz"},
Replicas: 3,
Placement: &nats.Placement{Tags: []string{"cloud:aws", "country:us"}},
})
require_NoError(t, err)
// C2
_, err = js.AddStream(&nats.StreamConfig{
Name: "MIRROR-1",
Replicas: 1,
Mirror: &nats.StreamSource{Name: "SOURCE"},
Placement: &nats.Placement{Tags: []string{"cloud:gcp", "country:uk"}},
})
require_NoError(t, err)
// C3
_, err = js.AddStream(&nats.StreamConfig{
Name: "MIRROR-2",
Replicas: 2,
Mirror: &nats.StreamSource{Name: "SOURCE"},
Placement: &nats.Placement{Tags: []string{"cloud:az", "country:jp"}},
})
require_NoError(t, err)
// No client support yet, so do by hand.
getStreamInfo := func(nc *nats.Conn, expected string) {
t.Helper()
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "SOURCE"), nil, time.Second)
require_NoError(t, err)
var si StreamInfo
err = json.Unmarshal(resp.Data, &si)
require_NoError(t, err)
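// Every cluster should be listed as an alternate, with the client's own cluster first.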
require_True(t, len(si.Alternates) == 3)
require_True(t, si.Alternates[0].Cluster == expected)
seen := make(map[string]struct{})
for _, alt := range si.Alternates {
seen[alt.Cluster] = struct{}{}
}
require_True(t, len(seen) == 3)
}
// Connect to different clusters to check ordering.
nc, _ = jsClientConnect(t, sc.clusterForName("C1").randomServer())
getStreamInfo(nc, "C1")
nc, _ = jsClientConnect(t, sc.clusterForName("C2").randomServer())
getStreamInfo(nc, "C2")
nc, _ = jsClientConnect(t, sc.clusterForName("C3").randomServer())
getStreamInfo(nc, "C3")
}

// We had a scenario where a consumer would not recover properly on restart due to
// the cluster state not being set properly when checking source subjects.
func TestJetStreamSuperClusterStateOnRestartPreventsConsumerRecovery(t *testing.T) {
sc := createJetStreamTaggedSuperCluster(t)
defer sc.shutdown()
nc, js := jsClientConnect(t, sc.randomServer())
defer nc.Close()
// C1
_, err := js.AddStream(&nats.StreamConfig{
Name: "SOURCE",
Subjects: []string{"foo", "bar"},
Replicas: 3,
Placement: &nats.Placement{Tags: []string{"cloud:aws", "country:us"}},
})
require_NoError(t, err)
// C2
_, err = js.AddStream(&nats.StreamConfig{
Name: "DS",
Subjects: []string{"baz"},
Replicas: 3,
Sources: []*nats.StreamSource{{Name: "SOURCE"}},
Placement: &nats.Placement{Tags: []string{"cloud:gcp", "country:uk"}},
})
require_NoError(t, err)
// Bind to DS and match filter subject of SOURCE.
_, err = js.AddConsumer("DS", &nats.ConsumerConfig{
Durable: "dlc",
AckPolicy: nats.AckExplicitPolicy,
FilterSubject: "foo",
DeliverSubject: "d",
})
require_NoError(t, err)
// Send a few messages.
for i := 0; i < 100; i++ {
_, err := js.Publish("foo", []byte("HELLO"))
require_NoError(t, err)
}
sub := natsSubSync(t, nc, "d")
natsNexMsg(t, sub, time.Second)
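// Locate the consumer leader in C2 so we can restart it.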
c := sc.clusterForName("C2")
cl := c.consumerLeader("$G", "DS", "dlc")
// Pull source out from underneath the downstream stream.
err = js.DeleteStream("SOURCE")
require_NoError(t, err)
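// Restart the consumer leader and wait for it to report healthy.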
cl.Shutdown()
cl = c.restartServer(cl)
c.waitOnServerHealthz(cl)
// Now make sure the consumer is still on this server and has restarted properly.
mset, err := cl.GlobalAccount().lookupStream("DS")
require_NoError(t, err)
if o := mset.lookupConsumer("dlc"); o == nil {
t.Fatalf("Consumer was not properly restarted")
}
}