diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 6700e678..db90a57a 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3849,3 +3849,37 @@ func TestJetStreamClusterHealthzCheckForStoppedAssets(t *testing.T) { return nil }) } + +// Make sure that stopping a stream shutdowns down it's raft node. +func TestJetStreamClusterStreamNodeShutdownBugOnStop(t *testing.T) { + c := createJetStreamClusterExplicit(t, "NATS", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + Replicas: 3, + }) + require_NoError(t, err) + + for i := 0; i < 100; i++ { + sendStreamMsg(t, nc, "foo", "HELLO") + } + + s := c.randomServer() + numNodesStart := s.numRaftNodes() + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + node := mset.raftNode() + require_NotNil(t, node) + node.InstallSnapshot(mset.stateSnapshot()) + // Stop the stream + mset.stop(false, false) + + if numNodes := s.numRaftNodes(); numNodes != numNodesStart-1 { + t.Fatalf("RAFT nodes after stream stop incorrect: %d vs %d", numNodesStart, numNodes) + } +} diff --git a/server/stream.go b/server/stream.go index 9d3e2000..46981340 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4548,9 +4548,11 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { if deleteFlag { n.Delete() sa = mset.sa - } else if n.NeedSnapshot() { - // Attempt snapshot on clean exit. - n.InstallSnapshot(mset.stateSnapshotLocked()) + } else { + if n.NeedSnapshot() { + // Attempt snapshot on clean exit. + n.InstallSnapshot(mset.stateSnapshotLocked()) + } n.Stop() } } @@ -4642,23 +4644,23 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { sysc.closeConnection(ClientClosed) } - if store == nil { - return nil - } - if deleteFlag { - if err := store.Delete(); err != nil { - return err + if store != nil { + if err := store.Delete(); err != nil { + return err + } } + // Release any resources. js.releaseStreamResources(&mset.cfg) - // cleanup directories after the stream accDir := filepath.Join(js.config.StoreDir, accName) // no op if not empty os.Remove(filepath.Join(accDir, streamsDir)) os.Remove(accDir) - } else if err := store.Stop(); err != nil { - return err + } else if store != nil { + if err := store.Stop(); err != nil { + return err + } } return nil