General improvements to the JetStream clustering layer around recovery from meta-layer corruption.

We now check for orphaned streams or consumers in clustered mode after our metastate has recovered.
Do not warn on failures to install Raft snapshots when the failure is due to the node being closed.
During a stream update, make sure to check whether our group assignment has changed out from under us.
Stream info responses should always be delayed if we are not the leader; otherwise we could send duplicate responses.

Signed-off-by: Derek Collison <derek@nats.io>
Author: Derek Collison <derek@nats.io>
Date:   2023-03-01 22:02:08 -08:00
Parent: c586014477
Commit: f358bf2687

@@ -956,6 +956,42 @@ type recoveryUpdates struct {
 	updateConsumers map[string]*consumerAssignment
 }
 
+// Called after recovery of the cluster on startup to check for any orphans.
+// Streams and consumers are recovered from disk, and the meta layer's mappings
+// should clean them up, but under crash scenarios there could be orphans.
+func (js *jetStream) checkForOrphans() {
+	js.mu.Lock()
+	defer js.mu.Unlock()
+
+	consumerName := func(o *consumer) string {
+		o.mu.RLock()
+		defer o.mu.RUnlock()
+		return o.name
+	}
+
+	s, cc := js.srv, js.cluster
+
+	s.Debugf("JetStream cluster checking for orphans")
+
+	for accName, jsa := range js.accounts {
+		asa := cc.streams[accName]
+		for stream, mset := range jsa.streams {
+			if sa := asa[stream]; sa == nil {
+				s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream)
+				mset.delete()
+			} else {
+				// This one is good, check consumers now.
+				for _, o := range mset.getConsumers() {
+					consumer := consumerName(o)
+					if sa.consumers[consumer] == nil {
+						s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
+						o.delete()
+					}
+				}
+			}
+		}
+	}
+}
+
 func (js *jetStream) monitorCluster() {
 	s, n := js.server(), js.getMetaGroup()
 	qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ()
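
The reconciliation above boils down to diffing what is running on the node against what the recovered meta layer says is assigned, and deleting anything unassigned. A minimal, self-contained sketch of that idea (all names and types here are hypothetical stand-ins, not the server's actual structures):

package main

import "fmt"

func main() {
	// Hypothetical stand-ins: what is currently running on this node...
	running := map[string][]string{
		"ORDERS": {"dur1", "dur2"},
		"STALE":  {"dur3"}, // no assignment survived recovery
	}
	// ...versus what the recovered meta layer says should exist.
	assigned := map[string]map[string]bool{
		"ORDERS": {"dur1": true},
	}

	for stream, consumers := range running {
		sa, ok := assigned[stream]
		if !ok {
			fmt.Printf("orphaned stream %q, would clean up\n", stream)
			continue
		}
		// Stream is assigned, so check each of its consumers.
		for _, c := range consumers {
			if !sa[c] {
				fmt.Printf("orphaned consumer %q > %q, would clean up\n", stream, c)
			}
		}
	}
}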
@@ -1001,7 +1037,7 @@ func (js *jetStream) monitorCluster() {
 		if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
 			if err := n.InstallSnapshot(snap); err == nil {
 				lastSnap, lastSnapTime = hash[:], time.Now()
-			} else if err != errNoSnapAvailable {
+			} else if err != errNoSnapAvailable && err != errNodeClosed {
 				s.Warnf("Error snapshotting JetStream cluster state: %v", err)
 			}
 		}
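
The same errNodeClosed filter is applied at all three snapshot call sites in this commit (the meta, stream, and consumer monitors). A standalone sketch of the pattern, with the sentinel errors and installSnapshot defined locally as stand-ins for the server's internals:

package main

import (
	"errors"
	"log"
)

// Local stand-ins for the server's sentinel errors.
var (
	errNoSnapAvailable = errors.New("no snapshot available")
	errNodeClosed      = errors.New("raft node closed")
)

// installSnapshot simulates a node that has already shut down.
func installSnapshot(snap []byte) error {
	return errNodeClosed
}

func main() {
	if err := installSnapshot(nil); err != nil {
		// Both errors are expected during shutdown or startup races,
		// so neither is worth a warning.
		if err != errNoSnapAvailable && err != errNodeClosed {
			log.Printf("Error snapshotting cluster state: %v", err)
		}
	}
}

Direct comparison works because these are package-level sentinel values; errors.Is would be the more general choice if the errors were ever wrapped.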
@@ -1050,6 +1086,7 @@ func (js *jetStream) monitorCluster() {
 				// Clear.
 				ru = nil
 				s.Debugf("Recovered JetStream cluster metadata")
+				js.checkForOrphans()
 				continue
 			}
 			// FIXME(dlc) - Deal with errors.
@@ -1070,9 +1107,9 @@ func (js *jetStream) monitorCluster() {
 			if isLeader {
 				s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil)
-				// Optionally install a snapshot as we become leader.
-				doSnapshot()
+				// Install a snapshot as we become leader.
+				js.checkClusterSize()
+				n.InstallSnapshot(js.metaSnapshot())
 			}
 		case <-t.C:
@@ -1274,7 +1311,6 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
 	// Do removals first.
 	for _, sa := range saDel {
 		js.setStreamAssignmentRecovering(sa)
 		if isRecovering {
 			key := sa.recoveryKey()
 			ru.removeStreams[key] = sa
@@ -1896,7 +1932,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 		if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
 			if err := n.InstallSnapshot(snap); err == nil {
 				lastSnap, lastSnapTime = hash[:], time.Now()
-			} else if err != errNoSnapAvailable {
+			} else if err != errNoSnapAvailable && err != errNodeClosed {
 				s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
 			}
 		}
@@ -2987,6 +3023,18 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
 	mset, err := acc.lookupStream(cfg.Name)
 	if err == nil && mset != nil {
+		// Make sure we have not had a new group assigned to us.
+		if osa.Group.Name != sa.Group.Name {
+			s.Warnf("JetStream cluster detected stream remapping for '%s > %s' from %q to %q",
+				acc, cfg.Name, osa.Group.Name, sa.Group.Name)
+			mset.removeNode()
+			alreadyRunning, needsNode = false, true
+			// Make sure to clear from original.
+			js.mu.Lock()
+			osa.Group.node = nil
+			js.mu.Unlock()
+		}
+
 		var needsSetLeader bool
 		if !alreadyRunning && numReplicas > 1 {
 			if needsNode {
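
The remap check above guards against a stream whose Raft group was reassigned while the update was in flight: if the group name differs, the old node handle is stale and must be cleared before a new one is created. A toy version of that comparison (types and group names are illustrative, not the server's):

package main

import (
	"fmt"
	"sync"
)

type raftGroup struct {
	Name string
	node any // placeholder for a raft node handle
}

type streamAssignment struct{ Group *raftGroup }

func main() {
	var mu sync.Mutex
	osa := &streamAssignment{Group: &raftGroup{Name: "S-R3-old", node: struct{}{}}}
	sa := &streamAssignment{Group: &raftGroup{Name: "S-R3-new"}}

	if osa.Group.Name != sa.Group.Name {
		fmt.Printf("detected remapping from %q to %q\n", osa.Group.Name, sa.Group.Name)
		// Clear the stale handle from the original assignment under lock,
		// mirroring mset.removeNode() and osa.Group.node = nil above.
		mu.Lock()
		osa.Group.node = nil
		mu.Unlock()
	}
}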
@@ -4062,7 +4110,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 		if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
 			if err := n.InstallSnapshot(snap); err == nil {
 				lastSnap, lastSnapTime = hash[:], time.Now()
-			} else if err != errNoSnapAvailable {
+			} else if err != errNoSnapAvailable && err != errNodeClosed {
 				s.Warnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err)
 			}
 		}
@@ -7607,7 +7655,6 @@ func (mset *stream) handleClusterStreamInfoRequest(_ *subscription, c *client, _
 func (mset *stream) processClusterStreamInfoRequest(reply string) {
 	mset.mu.RLock()
 	sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg
-	stype := mset.cfg.Storage
 	isLeader := mset.isLeader()
 	mset.mu.RUnlock()
@@ -7617,9 +7664,9 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
 		return
 	}
 
-	// If we are here we are in a compromised state due to server limits let someone else answer if they can.
-	if !isLeader && js.limitsExceeded(stype) {
-		time.Sleep(100 * time.Millisecond)
+	// If we are not the leader let someone else possibly respond first.
+	if !isLeader {
+		time.Sleep(200 * time.Millisecond)
 	}
 
 	si := &StreamInfo{
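
The 200ms pause gives the leader a head start: when several replicas can answer the same stream info request, the leader's authoritative reply should normally arrive first, and the requester takes only the first response. A toy model of that race (names and channel setup are hypothetical):

package main

import (
	"fmt"
	"time"
)

// respond simulates a replica answering a stream info request.
// Non-leaders delay so the leader's reply usually lands first.
func respond(replies chan<- string, name string, isLeader bool) {
	if !isLeader {
		time.Sleep(200 * time.Millisecond)
	}
	select {
	case replies <- name:
	default: // someone already answered, drop ours
	}
}

func main() {
	replies := make(chan string, 1)
	go respond(replies, "follower", false)
	go respond(replies, "leader", true)
	fmt.Println("answered by:", <-replies)
}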