Add test for scaling replicas with pull consumers
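
The test creates an R3 stream, runs pull consumers while the cluster goes through a rolling lame duck restart, then scales the stream down to R1 and back up to R3 while publishing continues.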

Signed-off-by: Waldemar Quevedo <wally@synadia.com>
Waldemar Quevedo
2023-09-15 16:11:00 -07:00
parent 8f0e65fe0d
commit 27245891f2
3 changed files with 127 additions and 0 deletions

View File

@@ -18,6 +18,7 @@ package server
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -5682,3 +5683,112 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {
require_True(t, s.lookupRaftNode(sgn) == nil)
require_True(t, s.lookupRaftNode(ogn) == nil)
}
func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()
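// Use a dedicated connection for the publisher.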
nc2, producer := jsClientConnect(t, s)
defer nc2.Close()
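// Create an R3 stream that will later be scaled down to R1 and back up to R3.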
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
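// Publish for a while to seed the stream with messages.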
end := time.Now().Add(10 * time.Second)
for time.Now().Before(end) {
producer.Publish("foo", []byte(strings.Repeat("A", 128)))
time.Sleep(time.Millisecond)
}
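// Create five pull consumers that fetch and ack messages until the context is cancelled.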
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
sub, err := js.PullSubscribe("foo", fmt.Sprintf("C-%d", i))
require_NoError(t, err)
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
select {
case <-ctx.Done():
return
default:
}
msgs, err := sub.Fetch(1)
if err != nil && !errors.Is(err, nats.ErrTimeout) {
t.Logf("Pull Error: %v", err)
}
for _, msg := range msgs {
msg.Ack()
}
}
}()
}
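// Perform a rolling lame duck restart of every server in the cluster.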
c.lameDuckRestartAll()
c.waitOnStreamLeader(globalAccountName, "TEST")
// Start publishing again for a while.
end = time.Now().Add(10 * time.Second)
for time.Now().Before(end) {
producer.Publish("foo", []byte(strings.Repeat("A", 128)))
time.Sleep(time.Millisecond)
}
fmt.Printf("SCALE DOWN TO R1\n")
// Try to do a stream edit back to R=1 after doing all the upgrade.
info, _ := js.StreamInfo("TEST")
sconfig := info.Config
sconfig.Replicas = 1
_, err = js.UpdateStream(&sconfig)
require_NoError(t, err)
// Let it run at R1 for some time.
time.Sleep(10 * time.Second)
fmt.Printf("SCALE UP TO R3\n")
info, _ = js.StreamInfo("TEST")
sconfig = info.Config
sconfig.Replicas = 3
_, err = js.UpdateStream(&sconfig)
require_NoError(t, err)
// Let it run for a while after the update...
time.Sleep(10 * time.Second)
// Start publishing again for a while.
end = time.Now().Add(30 * time.Second)
for time.Now().Before(end) {
producer.Publish("foo", []byte(strings.Repeat("A", 128)))
time.Sleep(time.Millisecond)
}
// Stop goroutines and wait for them to exit.
cancel()
wg.Wait()
}

View File

@@ -1541,6 +1541,21 @@ func (c *cluster) restartAll() {
c.waitOnClusterReady()
}
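// lameDuckRestartAll puts each server into lame duck mode, waits for it to
// shut down, then restarts it with the same configuration.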
func (c *cluster) lameDuckRestartAll() {
c.t.Helper()
for i, s := range c.servers {
s.lameDuckMode()
s.WaitForShutdown()
if !s.Running() {
opts := c.opts[i]
s, o := RunServerWithConfig(opts.ConfigFile)
c.servers[i] = s
c.opts[i] = o
}
}
c.waitOnClusterReady()
}
func (c *cluster) restartAllSamePorts() {
c.t.Helper()
for i, s := range c.servers {

View File

@@ -674,7 +674,9 @@ func (n *raft) Propose(data []byte) error {
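// ProposeDirect proposes the given entries directly; it is only valid on the leader.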
func (n *raft) ProposeDirect(entries []*Entry) error {
n.RLock()
if n.state != Leader {
// Capture state and group before releasing the lock to avoid a data race.
state, group := n.state, n.group
n.RUnlock()
n.debug("Direct proposal ignored, not leader (state: %v, group: %v)", state, group)
return errNotLeader
}