mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fix for a bug that could leave a raft node running when stopping a stream.
This can happen when we reset a stream internally and the stream had a prior snapshot. Also make sure to always release resources back to the account regardless if the store is no longer present. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -3849,3 +3849,37 @@ func TestJetStreamClusterHealthzCheckForStoppedAssets(t *testing.T) {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Make sure that stopping a stream shutdowns down it's raft node.
|
||||
func TestJetStreamClusterStreamNodeShutdownBugOnStop(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "NATS", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"*"},
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
sendStreamMsg(t, nc, "foo", "HELLO")
|
||||
}
|
||||
|
||||
s := c.randomServer()
|
||||
numNodesStart := s.numRaftNodes()
|
||||
mset, err := s.GlobalAccount().lookupStream("TEST")
|
||||
require_NoError(t, err)
|
||||
node := mset.raftNode()
|
||||
require_NotNil(t, node)
|
||||
node.InstallSnapshot(mset.stateSnapshot())
|
||||
// Stop the stream
|
||||
mset.stop(false, false)
|
||||
|
||||
if numNodes := s.numRaftNodes(); numNodes != numNodesStart-1 {
|
||||
t.Fatalf("RAFT nodes after stream stop incorrect: %d vs %d", numNodesStart, numNodes)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4548,9 +4548,11 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
|
||||
if deleteFlag {
|
||||
n.Delete()
|
||||
sa = mset.sa
|
||||
} else if n.NeedSnapshot() {
|
||||
// Attempt snapshot on clean exit.
|
||||
n.InstallSnapshot(mset.stateSnapshotLocked())
|
||||
} else {
|
||||
if n.NeedSnapshot() {
|
||||
// Attempt snapshot on clean exit.
|
||||
n.InstallSnapshot(mset.stateSnapshotLocked())
|
||||
}
|
||||
n.Stop()
|
||||
}
|
||||
}
|
||||
@@ -4642,23 +4644,23 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
|
||||
sysc.closeConnection(ClientClosed)
|
||||
}
|
||||
|
||||
if store == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if deleteFlag {
|
||||
if err := store.Delete(); err != nil {
|
||||
return err
|
||||
if store != nil {
|
||||
if err := store.Delete(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Release any resources.
|
||||
js.releaseStreamResources(&mset.cfg)
|
||||
|
||||
// cleanup directories after the stream
|
||||
accDir := filepath.Join(js.config.StoreDir, accName)
|
||||
// no op if not empty
|
||||
os.Remove(filepath.Join(accDir, streamsDir))
|
||||
os.Remove(accDir)
|
||||
} else if err := store.Stop(); err != nil {
|
||||
return err
|
||||
} else if store != nil {
|
||||
if err := store.Stop(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user