When scaling a consumer down, make sure to pop the loopAndForwardProposals goroutine

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-09-18 11:00:37 -07:00
committed by Waldemar Quevedo
parent 27245891f2
commit 850c89e175
4 changed files with 62 additions and 33 deletions

View File

@@ -5706,12 +5706,8 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
end := time.Now().Add(10 * time.Second)
end := time.Now().Add(2 * time.Second)
for time.Now().Before(end) {
select {
case <-ctx.Done():
default:
}
producer.Publish("foo", []byte(strings.Repeat("A", 128)))
time.Sleep(time.Millisecond)
}
@@ -5732,7 +5728,7 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
}
msgs, err := sub.Fetch(1)
if err != nil && !errors.Is(err, nats.ErrTimeout) {
if err != nil && !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, nats.ErrConnectionClosed) {
t.Logf("Pull Error: %v", err)
}
for _, msg := range msgs {
@@ -5741,21 +5737,48 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
}
}()
}
c.lameDuckRestartAll()
c.waitOnStreamLeader(globalAccountName, "TEST")
// Start publishing again for a while.
end = time.Now().Add(10 * time.Second)
for time.Now().Before(end) {
select {
case <-ctx.Done():
default:
}
producer.Publish("foo", []byte(strings.Repeat("A", 128)))
// Swap the logger to try to detect the condition after the restart.
loggers := make([]*captureDebugLogger, 3)
for i, srv := range c.servers {
l := &captureDebugLogger{dbgCh: make(chan string, 10)}
loggers[i] = l
srv.SetLogger(l, true, false)
}
condition := `Direct proposal ignored, not leader (state: CLOSED)`
errCh := make(chan error, 10)
fmt.Printf("SCALE DOWN TO R1\n")
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case dl := <-loggers[0].dbgCh:
if strings.Contains(dl, condition) {
errCh <- fmt.Errorf(condition)
}
case dl := <-loggers[1].dbgCh:
if strings.Contains(dl, condition) {
errCh <- fmt.Errorf(condition)
}
case dl := <-loggers[2].dbgCh:
if strings.Contains(dl, condition) {
errCh <- fmt.Errorf(condition)
}
case <-ctx.Done():
return
}
}
}()
// Start publishing again for a while.
end = time.Now().Add(2 * time.Second)
for time.Now().Before(end) {
producer.Publish("foo", []byte(strings.Repeat("A", 128)))
time.Sleep(time.Millisecond)
}
// Try to do a stream edit back to R=1 after doing all the upgrade.
info, _ := js.StreamInfo("TEST")
@@ -5764,28 +5787,20 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
_, err = js.UpdateStream(&sconfig)
require_NoError(t, err)
// Let running for some time.
time.Sleep(10 * time.Second)
fmt.Printf("SCALE UP TO R3\n")
// Leave running for some time after the update.
time.Sleep(2 * time.Second)
info, _ = js.StreamInfo("TEST")
sconfig = info.Config
sconfig.Replicas = 3
_, err = js.UpdateStream(&sconfig)
require_NoError(t, err)
// Let running after the update...
time.Sleep(10 * time.Second)
// Start publishing again for a while.
end = time.Now().Add(30 * time.Second)
for time.Now().Before(end) {
select {
case <-ctx.Done():
default:
}
producer.Publish("foo", []byte(strings.Repeat("A", 128)))
time.Sleep(time.Millisecond)
select {
case e := <-errCh:
t.Fatalf("Bad condition on raft node: %v", e)
case <-time.After(2 * time.Second):
// Done
}
// Stop goroutines and wait for them to exit.