diff --git a/server/consumer.go b/server/consumer.go
index 3d9d8a9b..ce14e101 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -1122,6 +1122,16 @@ func (o *consumer) isLeader() bool {
 	return true
 }
 
+func (o *consumer) clearLoopAndForward() {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	if o.qch != nil {
+		close(o.qch)
+		// Note can not close pch here.
+		o.qch, o.pch = nil, nil
+	}
+}
+
 func (o *consumer) setLeader(isLeader bool) {
 	o.mu.RLock()
 	mset := o.mset
@@ -2003,9 +2013,13 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
 
 	forwardProposals := func() {
 		o.mu.Lock()
+		node, pch = o.node, o.pch
 		proposal := o.phead
 		o.phead, o.ptail = nil, nil
 		o.mu.Unlock()
+		if node == nil || pch == nil || node.State() != Leader {
+			return
+		}
 		// 256k max for now per batch.
 		const maxBatch = 256 * 1024
 		var entries []*Entry
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 311dd0ec..2d6f30b5 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -4264,6 +4264,8 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
 	} else {
 		// Check for scale down to 1..
 		if rg.node != nil && len(rg.Peers) == 1 {
+			// Need to pop loopAndForward by closing qch and nil out both qch and pch.
+			o.clearLoopAndForward()
 			o.clearNode()
 			o.setLeader(true)
 			// Need to clear from rg too.
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index ed28ce5e..f619dc76 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -5706,12 +5706,8 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	end := time.Now().Add(10 * time.Second)
+	end := time.Now().Add(2 * time.Second)
 	for time.Now().Before(end) {
-		select {
-		case <-ctx.Done():
-		default:
-		}
 		producer.Publish("foo", []byte(strings.Repeat("A", 128)))
 		time.Sleep(time.Millisecond)
 	}
@@ -5732,7 +5728,7 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
 			}
 
 			msgs, err := sub.Fetch(1)
-			if err != nil && !errors.Is(err, nats.ErrTimeout) {
+			if err != nil && !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, nats.ErrConnectionClosed) {
 				t.Logf("Pull Error: %v", err)
 			}
 			for _, msg := range msgs {
@@ -5741,21 +5737,48 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
 			}
 		}()
 	}
-
 	c.lameDuckRestartAll()
 	c.waitOnStreamLeader(globalAccountName, "TEST")
 
-	// Start publishing again for a while.
-	end = time.Now().Add(10 * time.Second)
-	for time.Now().Before(end) {
-		select {
-		case <-ctx.Done():
-		default:
-		}
-		producer.Publish("foo", []byte(strings.Repeat("A", 128)))
+	// Swap the logger to try to detect the condition after the restart.
+	loggers := make([]*captureDebugLogger, 3)
+	for i, srv := range c.servers {
+		l := &captureDebugLogger{dbgCh: make(chan string, 10)}
+		loggers[i] = l
+		srv.SetLogger(l, true, false)
 	}
+	condition := `Direct proposal ignored, not leader (state: CLOSED)`
+	errCh := make(chan error, 10)
 
-	fmt.Printf("SCALE DOWN TO R1\n")
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case dl := <-loggers[0].dbgCh:
+				if strings.Contains(dl, condition) {
+					errCh <- fmt.Errorf(condition)
+				}
+			case dl := <-loggers[1].dbgCh:
+				if strings.Contains(dl, condition) {
+					errCh <- fmt.Errorf(condition)
+				}
+			case dl := <-loggers[2].dbgCh:
+				if strings.Contains(dl, condition) {
+					errCh <- fmt.Errorf(condition)
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	// Start publishing again for a while.
+	end = time.Now().Add(2 * time.Second)
+	for time.Now().Before(end) {
+		producer.Publish("foo", []byte(strings.Repeat("A", 128)))
+		time.Sleep(time.Millisecond)
+	}
 
 	// Try to do a stream edit back to R=1 after doing all the upgrade.
 	info, _ := js.StreamInfo("TEST")
 	sconfig = info.Config
@@ -5764,28 +5787,20 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
 	_, err = js.UpdateStream(&sconfig)
 	require_NoError(t, err)
 
-	// Let running for some time.
-	time.Sleep(10 * time.Second)
-
-	fmt.Printf("SCALE UP TO R3\n")
+	// Leave running for some time after the update.
+	time.Sleep(2 * time.Second)
 
 	info, _ = js.StreamInfo("TEST")
 	sconfig = info.Config
 	sconfig.Replicas = 3
 	_, err = js.UpdateStream(&sconfig)
 	require_NoError(t, err)
 
-	// Let running after the update...
-	time.Sleep(10 * time.Second)
-	// Start publishing again for a while.
-	end = time.Now().Add(30 * time.Second)
-	for time.Now().Before(end) {
-		select {
-		case <-ctx.Done():
-		default:
-		}
-		producer.Publish("foo", []byte(strings.Repeat("A", 128)))
-		time.Sleep(time.Millisecond)
+	select {
+	case e := <-errCh:
+		t.Fatalf("Bad condition on raft node: %v", e)
+	case <-time.After(2 * time.Second):
+		// Done
 	}
 
 	// Stop goroutines and wait for them to exit.
diff --git a/server/raft.go b/server/raft.go
index 920f6f41..076b8e0d 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -674,9 +674,7 @@ func (n *raft) Propose(data []byte) error {
 func (n *raft) ProposeDirect(entries []*Entry) error {
 	n.RLock()
 	if n.state != Leader {
-		group := n.group
 		n.RUnlock()
-		fmt.Printf("Direct proposal ignored, not leader (state: %v, group: %v)\n", n.state, group)
 		n.debug("Direct proposal ignored, not leader (state: %v)", n.state)
 		return errNotLeader
 	}
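
Note on the pattern (illustrative only, not the server's actual consumer type): the fix works by closing the consumer's quit channel (qch) to pop loopAndForwardProposals, while the proposal channel (pch) is only nil'ed out and never closed, since other code paths may still hold a reference to it. A minimal sketch of that shutdown pattern, using hypothetical names (forwarder, loopAndForward), is shown below.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// forwarder is a stand-in for the consumer's qch/pch pair (hypothetical type).
type forwarder struct {
	mu  sync.Mutex
	qch chan struct{} // quit channel, closed to stop the loop
	pch chan string   // proposal channel, never closed here
}

// loopAndForward drains proposals until the quit channel is closed.
func (f *forwarder) loopAndForward(qch chan struct{}, pch chan string) {
	for {
		select {
		case p := <-pch:
			fmt.Println("forwarding:", p)
		case <-qch:
			// qch closed: exit without touching pch.
			return
		}
	}
}

// clearLoopAndForward mirrors the patch: close qch to stop the loop,
// then nil out both fields so later callers see the loop as gone.
// Note: can not close pch here, writers may still be using it.
func (f *forwarder) clearLoopAndForward() {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.qch != nil {
		close(f.qch)
		f.qch, f.pch = nil, nil
	}
}

func main() {
	f := &forwarder{qch: make(chan struct{}), pch: make(chan string, 8)}
	go f.loopAndForward(f.qch, f.pch)

	f.pch <- "proposal-1"
	time.Sleep(10 * time.Millisecond)

	// Scale down to R1: stop the loop without closing the proposal channel.
	f.clearLoopAndForward()
	time.Sleep(10 * time.Millisecond)
}
```

Closing pch instead would risk a send on a closed channel from any writer still holding it; nil-ing both fields under the lock keeps later lookups from reusing the stopped loop.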