Send shutdown event on LDM so that R1 assets do not get assigned to the LDM node

Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
Neil Twigg
2023-08-18 11:13:14 +01:00
parent 0712628098
commit 7cc5838a6d
3 changed files with 98 additions and 0 deletions

View File

@@ -533,6 +533,19 @@ RESET:
}
}
// Will send a shutdown message for lame-duck. Unlike sendShutdownEvent, this will
// not close off the send queue or reply handler, as we may still have a workload
// that needs migrating off.
// Lock should be held.
func (s *Server) sendLDMShutdownEventLocked() {
if s.sys == nil || s.sys.sendq == nil {
return
}
subj := fmt.Sprintf(shutdownEventSubj, s.info.ID)
si := &ServerInfo{}
s.sys.sendq.push(newPubMsg(nil, subj, _EMPTY_, si, nil, si, noCompression, false, true))
}
// Will send a shutdown message.
func (s *Server) sendShutdownEvent() {
s.mu.Lock()

View File

@@ -3379,6 +3379,90 @@ func TestJetStreamClusterNoLeadersDuringLameDuck(t *testing.T) {
}
}
func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Grab the first server and set lameduck option directly.
s := c.servers[0]
s.optsMu.Lock()
s.opts.LameDuckDuration = 5 * time.Second
s.opts.LameDuckGracePeriod = -5 * time.Second
s.optsMu.Unlock()
// Connect to the server to keep it alive when we go into LDM.
dummy, _ := jsClientConnect(t, s)
defer dummy.Close()
// Connect to the third server.
nc, js := jsClientConnect(t, c.servers[2])
defer nc.Close()
// Now put the first server into lame duck mode.
go s.lameDuckMode()
// Wait for news to arrive that the first server has gone into
// lame duck mode and been marked offline.
checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
id := s.info.ID
s := c.servers[2]
s.mu.RLock()
defer s.mu.RUnlock()
var isOffline bool
s.nodeToInfo.Range(func(_, v any) bool {
ni := v.(nodeInfo)
if ni.id == id {
isOffline = ni.offline
return false
}
return true
})
if !isOffline {
return fmt.Errorf("first node is still online unexpectedly")
}
return nil
})
// Create a go routine that will create streams constantly.
qch := make(chan bool)
go func() {
var index int
for {
select {
case <-time.After(time.Millisecond * 25):
index++
_, err := js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("NEW_TEST_%d", index),
Subjects: []string{fmt.Sprintf("bar.%d", index)},
Replicas: 1,
})
if err != nil {
return
}
case <-qch:
return
}
}
}()
defer close(qch)
// Make sure we do not have any R1 assets placed on the lameduck server.
for s.isRunning() {
s.rnMu.RLock()
if s.js == nil || s.js.srv == nil || s.js.srv.gacc == nil {
s.rnMu.RUnlock()
break
}
hasAsset := len(s.js.srv.gacc.streams()) > 0
s.rnMu.RUnlock()
if hasAsset {
t.Fatalf("Server had an R1 asset when it should not due to lameduck mode")
}
}
}
// If a consumer has not been registered (possible in heavily loaded systems with lots of assets)
// it could miss the signal of a message going away. If that message was pending and expires the
// ack floor could fall below the stream first sequence. This test will force that condition and

View File

@@ -3557,6 +3557,7 @@ func (s *Server) lameDuckMode() {
}
s.Noticef("Entering lame duck mode, stop accepting new clients")
s.ldm = true
s.sendLDMShutdownEventLocked()
expected := 1
s.listener.Close()
s.listener = nil