diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 2cbfc8a9..c4fb1b93 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -956,6 +956,42 @@ type recoveryUpdates struct { updateConsumers map[string]*consumerAssignment } +// Called after recovery of the cluster on startup to check for any orphans. +// Streams and consumers are recovered from disk, and the meta layer's mappings +// should clean them up, but under crash scenarios there could be orphans. +func (js *jetStream) checkForOrphans() { + js.mu.Lock() + defer js.mu.Unlock() + + consumerName := func(o *consumer) string { + o.mu.RLock() + defer o.mu.RUnlock() + return o.name + } + + s, cc := js.srv, js.cluster + s.Debugf("JetStream cluster checking for orphans") + + for accName, jsa := range js.accounts { + asa := cc.streams[accName] + for stream, mset := range jsa.streams { + if sa := asa[stream]; sa == nil { + s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream) + mset.delete() + } else { + // This one is good, check consumers now. + for _, o := range mset.getConsumers() { + consumer := consumerName(o) + if sa.consumers[consumer] == nil { + s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer) + o.delete() + } + } + } + } + } +} + func (js *jetStream) monitorCluster() { s, n := js.server(), js.getMetaGroup() qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ() @@ -1001,7 +1037,7 @@ func (js *jetStream) monitorCluster() { if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() - } else if err != errNoSnapAvailable { + } else if err != errNoSnapAvailable && err != errNodeClosed { s.Warnf("Error snapshotting JetStream cluster state: %v", err) } } @@ -1050,6 +1086,7 @@ func (js *jetStream) monitorCluster() { // Clear. 
ru = nil s.Debugf("Recovered JetStream cluster metadata") + js.checkForOrphans() continue } // FIXME(dlc) - Deal with errors. @@ -1070,9 +1107,9 @@ func (js *jetStream) monitorCluster() { if isLeader { s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil) - // Optionally install a snapshot as we become leader. - doSnapshot() + // Install a snapshot as we become leader. js.checkClusterSize() + n.InstallSnapshot(js.metaSnapshot()) } case <-t.C: @@ -1274,7 +1311,6 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove // Do removals first. for _, sa := range saDel { js.setStreamAssignmentRecovering(sa) - if isRecovering { key := sa.recoveryKey() ru.removeStreams[key] = sa @@ -1896,7 +1932,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() - } else if err != errNoSnapAvailable { + } else if err != errNoSnapAvailable && err != errNodeClosed { s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err) } } @@ -2987,6 +3023,18 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss mset, err := acc.lookupStream(cfg.Name) if err == nil && mset != nil { + // Make sure we have not had a new group assigned to us. + if osa.Group.Name != sa.Group.Name { + s.Warnf("JetStream cluster detected stream remapping for '%s > %s' from %q to %q", + acc, cfg.Name, osa.Group.Name, sa.Group.Name) + mset.removeNode() + alreadyRunning, needsNode = false, true + // Make sure to clear from original. 
+ js.mu.Lock() + osa.Group.node = nil + js.mu.Unlock() + } + var needsSetLeader bool if !alreadyRunning && numReplicas > 1 { if needsNode { @@ -4062,7 +4110,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() - } else if err != errNoSnapAvailable { + } else if err != errNoSnapAvailable && err != errNodeClosed { s.Warnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err) } } @@ -7607,7 +7655,6 @@ func (mset *stream) handleClusterStreamInfoRequest(_ *subscription, c *client, _ func (mset *stream) processClusterStreamInfoRequest(reply string) { mset.mu.RLock() sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg - stype := mset.cfg.Storage isLeader := mset.isLeader() mset.mu.RUnlock() @@ -7617,9 +7664,9 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) { return } - // If we are here we are in a compromised state due to server limits let someone else answer if they can. - if !isLeader && js.limitsExceeded(stype) { - time.Sleep(100 * time.Millisecond) + // If we are not the leader let someone else possibly respond first. + if !isLeader { + time.Sleep(200 * time.Millisecond) } si := &StreamInfo{