mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fix up some processing during account purge to fix flapping tests
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
committed by
Ivan Kozlovic
parent
5a050fc10b
commit
a5119008a5
@@ -2533,36 +2533,25 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac
|
||||
return
|
||||
}
|
||||
|
||||
rm := recoveryRemovals{
|
||||
streams: make(map[string]*streamAssignment),
|
||||
consumers: make(map[string]*consumerAssignment),
|
||||
}
|
||||
|
||||
js.mu.Lock()
|
||||
js.mu.RLock()
|
||||
ns, nc := 0, 0
|
||||
streams, hasAccount := cc.streams[accName]
|
||||
if hasAccount {
|
||||
for _, s := range streams {
|
||||
key := accName + ":" + s.Config.Name
|
||||
rm.streams[key] = s.copyGroup()
|
||||
for _, c := range s.consumers {
|
||||
key := accName + ":" + s.Config.Name + ":" + c.Config.Durable
|
||||
rm.consumers[key] = c.copyGroup()
|
||||
}
|
||||
for _, osa := range streams {
|
||||
for _, oca := range osa.consumers {
|
||||
oca.deleted = true
|
||||
ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client}
|
||||
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
|
||||
nc++
|
||||
}
|
||||
}
|
||||
js.mu.Unlock()
|
||||
|
||||
s.Noticef("Purge request for account %s (streams: %d, consumer: %d, hasAccount: %t)",
|
||||
accName, len(rm.streams), len(rm.consumers), hasAccount)
|
||||
|
||||
for _, ca := range rm.consumers {
|
||||
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
|
||||
}
|
||||
for _, sa := range rm.streams {
|
||||
sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client}
|
||||
cc.meta.Propose(encodeDeleteStreamAssignment(sa))
|
||||
ns++
|
||||
}
|
||||
resp.Initiated = true
|
||||
js.mu.RUnlock()
|
||||
|
||||
s.Noticef("Purge request for account %s (streams: %d, consumer: %d, hasAccount: %t)", accName, ns, nc, hasAccount)
|
||||
|
||||
resp.Initiated = true
|
||||
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user