From a5119008a5d5c63c789c70b9b227616fccbac0ea Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 6 Aug 2022 13:45:06 -0700 Subject: [PATCH] Fix up some processing during account purge to fix flapping tests Signed-off-by: Derek Collison --- server/jetstream_api.go | 39 ++++++++++++++------------------------- 1 file changed, 14 insertions(+), 25 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 80ae12a5..b583ead8 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2533,36 +2533,25 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac return } - rm := recoveryRemovals{ - streams: make(map[string]*streamAssignment), - consumers: make(map[string]*consumerAssignment), - } - - js.mu.Lock() + js.mu.RLock() + ns, nc := 0, 0 streams, hasAccount := cc.streams[accName] - if hasAccount { - for _, s := range streams { - key := accName + ":" + s.Config.Name - rm.streams[key] = s.copyGroup() - for _, c := range s.consumers { - key := accName + ":" + s.Config.Name + ":" + c.Config.Durable - rm.consumers[key] = c.copyGroup() - } + for _, osa := range streams { + for _, oca := range osa.consumers { + oca.deleted = true + ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client} + cc.meta.Propose(encodeDeleteConsumerAssignment(ca)) + nc++ } - } - js.mu.Unlock() - - s.Noticef("Purge request for account %s (streams: %d, consumer: %d, hasAccount: %t)", - accName, len(rm.streams), len(rm.consumers), hasAccount) - - for _, ca := range rm.consumers { - cc.meta.Propose(encodeDeleteConsumerAssignment(ca)) - } - for _, sa := range rm.streams { + sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client} cc.meta.Propose(encodeDeleteStreamAssignment(sa)) + ns++ } - resp.Initiated = true + js.mu.RUnlock() + s.Noticef("Purge request for account %s (streams: %d, consumer: %d, hasAccount: %t)", accName, ns, nc, hasAccount) + + resp.Initiated = true s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) }