mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 10:40:41 -07:00
Merge branch 'main' into dev
This commit is contained in:
@@ -960,36 +960,62 @@ type recoveryUpdates struct {
|
||||
// Streams and consumers are recovered from disk, and the meta layer's mappings
|
||||
// should clean them up, but under crash scenarios there could be orphans.
|
||||
func (js *jetStream) checkForOrphans() {
|
||||
js.mu.Lock()
|
||||
defer js.mu.Unlock()
|
||||
|
||||
consumerName := func(o *consumer) string {
|
||||
o.mu.RLock()
|
||||
defer o.mu.RUnlock()
|
||||
return o.name
|
||||
}
|
||||
|
||||
// Can not hold jetstream lock while trying to delete streams or consumers.
|
||||
js.mu.Lock()
|
||||
s, cc := js.srv, js.cluster
|
||||
s.Debugf("JetStream cluster checking for orphans")
|
||||
|
||||
var streams []*stream
|
||||
var consumers []*consumer
|
||||
|
||||
for accName, jsa := range js.accounts {
|
||||
asa := cc.streams[accName]
|
||||
for stream, mset := range jsa.streams {
|
||||
if sa := asa[stream]; sa == nil {
|
||||
s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream)
|
||||
mset.delete()
|
||||
streams = append(streams, mset)
|
||||
} else {
|
||||
// This one is good, check consumers now.
|
||||
for _, o := range mset.getConsumers() {
|
||||
consumer := consumerName(o)
|
||||
if sa.consumers[consumer] == nil {
|
||||
s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
|
||||
o.delete()
|
||||
consumers = append(consumers, o)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
js.mu.Unlock()
|
||||
|
||||
for _, mset := range streams {
|
||||
mset.mu.RLock()
|
||||
accName, stream := mset.acc.Name, mset.cfg.Name
|
||||
mset.mu.RUnlock()
|
||||
s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream)
|
||||
if err := mset.delete(); err != nil {
|
||||
s.Warnf("Deleting stream encountered an error: %v", err)
|
||||
}
|
||||
}
|
||||
for _, o := range consumers {
|
||||
o.mu.RLock()
|
||||
accName, mset, consumer := o.acc.Name, o.mset, o.name
|
||||
o.mu.RUnlock()
|
||||
stream := "N/A"
|
||||
if mset != nil {
|
||||
mset.mu.RLock()
|
||||
stream = mset.cfg.Name
|
||||
mset.mu.RUnlock()
|
||||
}
|
||||
s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
|
||||
if err := o.delete(); err != nil {
|
||||
s.Warnf("Deleting consumer encountered an error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (js *jetStream) monitorCluster() {
|
||||
|
||||
Reference in New Issue
Block a user