mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #3707 from nats-io/zb-fix
[FIXED] Make sure we remove node from stream assignment.
This commit is contained in:
@@ -178,8 +178,8 @@ const (
|
||||
|
||||
// Returns information useful in mixed mode.
|
||||
func (s *Server) trackedJetStreamServers() (js, total int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
if !s.running || !s.eventsEnabled() {
|
||||
return -1, -1
|
||||
}
|
||||
@@ -195,10 +195,10 @@ func (s *Server) trackedJetStreamServers() (js, total int) {
|
||||
}
|
||||
|
||||
func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) {
|
||||
s.mu.Lock()
|
||||
s.mu.RLock()
|
||||
shutdown := s.shutdown
|
||||
js := s.js
|
||||
s.mu.Unlock()
|
||||
s.mu.RUnlock()
|
||||
|
||||
if shutdown || js == nil {
|
||||
return nil, nil
|
||||
@@ -2847,6 +2847,13 @@ func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment)
|
||||
// shut down monitor by shutting down raft
|
||||
node.Delete()
|
||||
}
|
||||
|
||||
// Make sure this node is no longer attached to our stream assignment.
|
||||
js, _ := s.getJetStreamCluster()
|
||||
js.mu.Lock()
|
||||
nsa.Group.node = nil
|
||||
js.mu.Unlock()
|
||||
|
||||
// wait for monitor to be shut down
|
||||
mset.monitorWg.Wait()
|
||||
mset.stop(true, false)
|
||||
|
||||
@@ -3,9 +3,10 @@ package server
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/nats-io/nats.go"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
func snapRGSet(pFlag bool, banner string, osi *nats.StreamInfo) *map[string]struct{} {
|
||||
@@ -68,7 +69,7 @@ func TestJetStreamClusterAfterPeerRemoveZeroState(t *testing.T) {
|
||||
// Load up 10000
|
||||
toSend := 10000
|
||||
for i := 1; i <= toSend; i++ {
|
||||
msg := []byte(fmt.Sprintf("Hello World"))
|
||||
msg := []byte("Hello World")
|
||||
if _, err = jsc.Publish("foo.a", msg); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user