diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index e2e98757..e26de049 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -178,8 +178,8 @@ const ( // Returns information useful in mixed mode. func (s *Server) trackedJetStreamServers() (js, total int) { - s.mu.Lock() - defer s.mu.Unlock() + s.mu.RLock() + defer s.mu.RUnlock() if !s.running || !s.eventsEnabled() { return -1, -1 } @@ -195,10 +195,10 @@ func (s *Server) trackedJetStreamServers() (js, total int) { } func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) { - s.mu.Lock() + s.mu.RLock() shutdown := s.shutdown js := s.js - s.mu.Unlock() + s.mu.RUnlock() if shutdown || js == nil { return nil, nil @@ -2847,6 +2847,13 @@ func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment) // shut down monitor by shutting down raft node.Delete() } + + // Make sure this node is no longer attached to our stream assignment. + js, _ := s.getJetStreamCluster() + js.mu.Lock() + nsa.Group.node = nil + js.mu.Unlock() + // wait for monitor to be shut down mset.monitorWg.Wait() mset.stop(true, false) diff --git a/server/todd_test.go b/server/todd_test.go index e5debafd..b2ff3475 100644 --- a/server/todd_test.go +++ b/server/todd_test.go @@ -3,9 +3,10 @@ package server import ( "encoding/json" "fmt" - "github.com/nats-io/nats.go" "testing" "time" + + "github.com/nats-io/nats.go" ) func snapRGSet(pFlag bool, banner string, osi *nats.StreamInfo) *map[string]struct{} { @@ -68,7 +69,7 @@ func TestJetStreamClusterAfterPeerRemoveZeroState(t *testing.T) { // Load up 10000 toSend := 10000 for i := 1; i <= toSend; i++ { - msg := []byte(fmt.Sprintf("Hello World")) + msg := []byte("Hello World") if _, err = jsc.Publish("foo.a", msg); err != nil { t.Fatalf("Unexpected publish error: %v", err) }