Merge branch 'main' into dev

Derek Collison
2022-12-13 10:04:18 -08:00
2 changed files with 175 additions and 4 deletions

View File

@@ -178,8 +178,8 @@ const (
 // Returns information useful in mixed mode.
 func (s *Server) trackedJetStreamServers() (js, total int) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
+	s.mu.RLock()
+	defer s.mu.RUnlock()
 	if !s.running || !s.eventsEnabled() {
 		return -1, -1
 	}
@@ -195,10 +195,10 @@ func (s *Server) trackedJetStreamServers() (js, total int) {
 }
 
 func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) {
-	s.mu.Lock()
+	s.mu.RLock()
 	shutdown := s.shutdown
 	js := s.js
-	s.mu.Unlock()
+	s.mu.RUnlock()
 	if shutdown || js == nil {
 		return nil, nil
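Both hunks above move read-only accessors from the exclusive server lock to the shared read lock, so concurrent readers no longer serialize on each other. A minimal, self-contained sketch of the sync.RWMutex pattern they now follow (the server type and fields below are stand-ins for illustration, not nats-server code):

package main

import (
	"fmt"
	"sync"
)

// server is a stand-in type illustrating read-mostly state guarded by a
// sync.RWMutex, the pattern the patched accessors adopt.
type server struct {
	mu      sync.RWMutex
	running bool
}

// isRunning only reads state, so the shared read lock is sufficient.
func (s *server) isRunning() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.running
}

// setRunning mutates state and still needs the exclusive lock.
func (s *server) setRunning(v bool) {
	s.mu.Lock()
	s.running = v
	s.mu.Unlock()
}

func main() {
	s := &server{}
	s.setRunning(true)
	fmt.Println(s.isRunning()) // true
}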
@@ -2847,6 +2847,13 @@ func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment)
 		// shut down monitor by shutting down raft
 		node.Delete()
 	}
+	// Make sure this node is no longer attached to our stream assignment.
+	js, _ := s.getJetStreamCluster()
+	js.mu.Lock()
+	nsa.Group.node = nil
+	js.mu.Unlock()
 	// wait for monitor to be shut down
 	mset.monitorWg.Wait()
 	mset.stop(true, false)
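One caveat on the block added above: getJetStreamCluster (see the previous hunk) returns nil, nil while the server is shutting down, so a defensive variant would check the handle before taking the cluster lock. A hypothetical sketch of that variant, not the committed code:

	// Make sure this node is no longer attached to our stream assignment.
	if js, _ := s.getJetStreamCluster(); js != nil {
		js.mu.Lock()
		nsa.Group.node = nil
		js.mu.Unlock()
	}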

View File

@@ -2134,3 +2134,167 @@ func TestJetStreamClusterLeafnodeDuplicateConsumerMessages(t *testing.T) {
require_True(t, string(msgs[0].Data) == "M-5")
require_True(t, string(msgs[1].Data) == "M-6")
}
func snapRGSet(pFlag bool, banner string, osi *nats.StreamInfo) *map[string]struct{} {
var snapSet = make(map[string]struct{})
if pFlag {
fmt.Println(banner)
}
if osi == nil {
if pFlag {
fmt.Printf("bonkers!\n")
}
return nil
}
snapSet[osi.Cluster.Leader] = struct{}{}
if pFlag {
fmt.Printf("Leader: %s\n", osi.Cluster.Leader)
}
for _, replica := range osi.Cluster.Replicas {
snapSet[replica.Name] = struct{}{}
if pFlag {
fmt.Printf("Replica: %s\n", replica.Name)
}
}
return &snapSet
}
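A note on the helper above: it returns a pointer to a map and the test dereferences it at the call sites. Since Go maps are already reference types, an equivalent helper can return the map directly; a minimal sketch (same behavior, reusing the test file's nats import, illustration only):

func snapRGSetByValue(osi *nats.StreamInfo) map[string]struct{} {
	if osi == nil {
		return nil
	}
	set := map[string]struct{}{osi.Cluster.Leader: {}}
	for _, r := range osi.Cluster.Replicas {
		set[r.Name] = struct{}{}
	}
	return set
}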
func TestJetStreamClusterAfterPeerRemoveZeroState(t *testing.T) {
// R3 scenario (w/ messages) in a 4-node cluster: a peer is removed from the RG and added back to the same RG later.
// Validate that the original peer carries no leftover state or issues from its previous RG tour of duty, specifically
// that the restored peer reports the correct filestore usage bytes for the asset.
var err error
sc := createJetStreamClusterExplicit(t, "cl4", 4)
defer sc.shutdown()
sc.waitOnClusterReadyWithNumPeers(4)
s := sc.leader()
nc, jsc := jsClientConnect(t, s)
defer nc.Close()
_, err = jsc.AddStream(&nats.StreamConfig{
Name: "foo",
Subjects: []string{"foo.*"},
Replicas: 3,
})
require_NoError(t, err)
sc.waitOnStreamLeader(globalAccountName, "foo")
osi, err := jsc.StreamInfo("foo")
require_NoError(t, err)
// make sure 0 msgs
require_True(t, osi.State.Msgs == 0)
// load up messages
toSend := 10000
// storage bytes with JS message overhead
assetStoreBytesExpected := uint64(460000)
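// 460000 works out to 46 bytes per message over 10000 messages: the 11-byte
// "Hello World" payload, the 5-byte subject "foo.a", and roughly 30 bytes of
// per-record file store framing (record header plus checksum).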
for i := 1; i <= toSend; i++ {
msg := []byte("Hello World")
if _, err = jsc.Publish("foo.a", msg); err != nil {
t.Fatalf("unexpected publish error: %v", err)
}
}
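// jsc.Publish is synchronous and waits for each pub ack, so every message
// is stored in the stream before the StreamInfo check below.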
osi, err = jsc.StreamInfo("foo")
require_NoError(t, err)
// make sure 10000 msgs
require_True(t, osi.State.Msgs == uint64(toSend))
origSet := *snapRGSet(false, "== Orig RG Set ==", osi)
// remove 1 peer replica (1 of 2 non-leaders)
origPeer := osi.Cluster.Replicas[0].Name
resp, err := nc.Request(fmt.Sprintf(JSApiStreamRemovePeerT, "foo"), []byte(`{"peer":"`+origPeer+`"}`), time.Second)
require_NoError(t, err)
var rpResp JSApiStreamRemovePeerResponse
err = json.Unmarshal(resp.Data, &rpResp)
require_NoError(t, err)
require_True(t, rpResp.Success)
// validate the origPeer is removed with a replacement newPeer
sc.waitOnStreamLeader(globalAccountName, "foo")
checkFor(t, time.Second, 200*time.Millisecond, func() error {
osi, err = jsc.StreamInfo("foo")
require_NoError(t, err)
if len(osi.Cluster.Replicas) != 2 {
return fmt.Errorf("expected R3, got R%d", len(osi.Cluster.Replicas)+1)
}
// STREAM.PEER.REMOVE is an asynchronous command; make sure the removal has occurred
for _, replica := range osi.Cluster.Replicas {
if replica.Name == origPeer {
return fmt.Errorf("expected replaced replica, old replica still present")
}
}
return nil
})
// identify the new peer
var newPeer string
osi, err = jsc.StreamInfo("foo")
require_NoError(t, err)
newSet := *snapRGSet(false, "== New RG Set ==", osi)
for peer := range newSet {
_, ok := origSet[peer]
if !ok {
newPeer = peer
break
}
}
require_True(t, newPeer != "")
// kick out newPeer, which will cause origPeer to be assigned to the RG again (the only remaining candidate in a four-node cluster)
resp, err = nc.Request(fmt.Sprintf(JSApiStreamRemovePeerT, "foo"), []byte(`{"peer":"`+newPeer+`"}`), time.Second)
require_NoError(t, err)
err = json.Unmarshal(resp.Data, &rpResp)
require_NoError(t, err)
require_True(t, rpResp.Success)
// validate the newPeer is removed and R3 has reformed (with origPeer)
sc.waitOnStreamLeader(globalAccountName, "foo")
checkFor(t, time.Second, 200*time.Millisecond, func() error {
osi, err = jsc.StreamInfo("foo")
require_NoError(t, err)
if len(osi.Cluster.Replicas) != 2 {
return fmt.Errorf("expected R3, got R%d", len(osi.Cluster.Replicas)+1)
}
// STREAM.PEER.REMOVE is an asynchronous command; make sure the removal has occurred
for _, replica := range osi.Cluster.Replicas {
if replica.Name == newPeer {
return fmt.Errorf("expected replaced replica, old replica still present")
}
}
return nil
})
osi, err = jsc.StreamInfo("foo")
require_NoError(t, err)
// make sure all msgs are still reported in the stream at this point with the original leader
require_True(t, osi.State.Msgs == uint64(toSend))
snapRGSet(false, "== RG Set w/origPeer Back ==", osi)
// get a handle to original peer server
var origServer *Server = sc.serverByName(origPeer)
if origServer == nil {
t.Fatalf("expected to get a handle to original peer server by name")
}
checkFor(t, time.Second, 200*time.Millisecond, func() error {
jszResult, err := origServer.Jsz(nil)
require_NoError(t, err)
if jszResult.Store != assetStoreBytesExpected {
return fmt.Errorf("expected %d storage on orig peer, got %d", assetStoreBytesExpected, jszResult.Store)
}
return nil
})
}