From 27245891f2eddee34ed500e1410e0f9825f43e29 Mon Sep 17 00:00:00 2001
From: Waldemar Quevedo
Date: Fri, 15 Sep 2023 16:11:00 -0700
Subject: [PATCH] Add test for scaling replica with pull consumers

Signed-off-by: Waldemar Quevedo
---
Note: hunks were edited by hand during review; the post-image blob ids on
the "index" lines below were not regenerated.

 server/jetstream_cluster_3_test.go | 109 +++++++++++++++++++++++++++++
 server/jetstream_helpers_test.go   |  17 +++++
 server/raft.go                     |   3 +-
 3 files changed, 128 insertions(+), 1 deletion(-)

diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 629f7976..ed28ce5e 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -18,6 +18,7 @@ package server
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -5682,3 +5683,111 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {
 	require_True(t, s.lookupRaftNode(sgn) == nil)
 	require_True(t, s.lookupRaftNode(ogn) == nil)
 }
+
+// TestJetStreamClusterRestartThenScaleStreamReplicas runs pull consumers
+// against an R3 stream while the cluster is restarted through lame duck mode,
+// then scales the stream down to R1 and back up to R3 under publish load.
+func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	s := c.randomNonLeader()
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	nc2, producer := jsClientConnect(t, s)
+	defer nc2.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+	c.waitOnStreamLeader(globalAccountName, "TEST")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	var wg sync.WaitGroup
+	// Stop the consumer goroutines and wait for them on every exit path,
+	// including a require_* failure, so none of them can call t.Logf after
+	// the test has returned.
+	defer func() {
+		cancel()
+		wg.Wait()
+	}()
+
+	// publishFor publishes to "foo" for duration d, sleeping for pause
+	// between messages when pause > 0. Publish errors are deliberately
+	// ignored: this is best-effort background load.
+	payload := []byte(strings.Repeat("A", 128))
+	publishFor := func(d, pause time.Duration) {
+		for end := time.Now().Add(d); time.Now().Before(end); {
+			producer.Publish("foo", payload)
+			if pause > 0 {
+				time.Sleep(pause)
+			}
+		}
+	}
+
+	// updateReplicas edits the stream to the given replica count.
+	updateReplicas := func(replicas int) {
+		t.Helper()
+		info, err := js.StreamInfo("TEST")
+		require_NoError(t, err)
+		sconfig := info.Config
+		sconfig.Replicas = replicas
+		_, err = js.UpdateStream(&sconfig)
+		require_NoError(t, err)
+	}
+
+	publishFor(10*time.Second, time.Millisecond)
+
+	for i := 0; i < 5; i++ {
+		sub, err := js.PullSubscribe("foo", fmt.Sprintf("C-%d", i))
+		require_NoError(t, err)
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			ticker := time.NewTicker(10 * time.Millisecond)
+			defer ticker.Stop()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case <-ticker.C:
+				}
+
+				msgs, err := sub.Fetch(1)
+				if err != nil && !errors.Is(err, nats.ErrTimeout) {
+					t.Logf("Pull Error: %v", err)
+				}
+				for _, msg := range msgs {
+					msg.Ack()
+				}
+			}
+		}()
+	}
+
+	c.lameDuckRestartAll()
+	c.waitOnStreamLeader(globalAccountName, "TEST")
+
+	// Publish again for a while, this time without pausing.
+	publishFor(10*time.Second, 0)
+
+	// Try to do a stream edit back to R=1 after doing all the upgrade.
+	t.Logf("SCALE DOWN TO R1")
+	updateReplicas(1)
+
+	// Let it run for some time.
+	time.Sleep(10 * time.Second)
+
+	t.Logf("SCALE UP TO R3")
+	updateReplicas(3)
+
+	// Let it run after the update...
+	time.Sleep(10 * time.Second)
+
+	// Publish again for a while.
+	publishFor(30*time.Second, time.Millisecond)
+}
diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go
index d4d31f48..1d6813dd 100644
--- a/server/jetstream_helpers_test.go
+++ b/server/jetstream_helpers_test.go
@@ -1541,6 +1541,23 @@ func (c *cluster) restartAll() {
 	c.waitOnClusterReady()
 }
 
+// lameDuckRestartAll puts each server into lame duck mode in turn, waits for
+// it to shut down, restarts it from its config file, and finally waits for
+// the cluster to be ready again.
+func (c *cluster) lameDuckRestartAll() {
+	c.t.Helper()
+	for i, s := range c.servers {
+		s.lameDuckMode()
+		s.WaitForShutdown()
+		if s.Running() {
+			continue
+		}
+		ns, no := RunServerWithConfig(c.opts[i].ConfigFile)
+		c.servers[i], c.opts[i] = ns, no
+	}
+	c.waitOnClusterReady()
+}
+
 func (c *cluster) restartAllSamePorts() {
 	c.t.Helper()
 	for i, s := range c.servers {
diff --git a/server/raft.go b/server/raft.go
index 076b8e0d..920f6f41 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -674,7 +674,8 @@ func (n *raft) Propose(data []byte) error {
 func (n *raft) ProposeDirect(entries []*Entry) error {
 	n.RLock()
 	if n.state != Leader {
+		state, group := n.state, n.group
 		n.RUnlock()
-		n.debug("Direct proposal ignored, not leader (state: %v)", n.state)
+		n.debug("Direct proposal ignored, not leader (state: %v, group: %v)", state, group)
 		return errNotLeader
 	}