From 549b77ca2dabb37e03682e3320256845f712cac5 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 6 Dec 2022 15:07:46 -0800 Subject: [PATCH] Ensure that ephemeral consumers that are deleted on startup properly are removed from the system. Signed-off-by: Derek Collison --- server/consumer.go | 8 +--- server/jetstream_cluster_3_test.go | 64 +++++++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 53769e7f..75d41877 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1353,7 +1353,6 @@ func (o *consumer) deleteNotActive() { // Check to make sure we went away. // Don't think this needs to be a monitored go routine. go func() { - var fs bool ticker := time.NewTicker(time.Second) defer ticker.Stop() for range ticker.C { @@ -1361,11 +1360,8 @@ func (o *consumer) deleteNotActive() { ca := js.consumerAssignment(acc, stream, name) js.mu.RUnlock() if ca != nil { - if fs { - s.Warnf("Consumer assignment not cleaned up, retrying") - meta.ForwardProposal(removeEntry) - } - fs = true + s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name) + meta.ForwardProposal(removeEntry) } else { return } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 79633d5b..5e19cd5f 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -1635,5 +1635,67 @@ func TestJetStreamParallelConsumerCreation(t *testing.T) { if len(rg) != expected { t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg)) } - +} + +func TestJetStreamGhostEphemeralsAfterRestart(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Add in 100 memory based ephemerals. + for i := 0; i < 100; i++ { + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Replicas: 1, + InactiveThreshold: time.Second, + MemoryStorage: true, + }) + require_NoError(t, err) + } + + // Grab random server. + rs := c.randomServer() + // Now shutdown cluster. + c.stopAll() + + // Let the consumers all expire. + time.Sleep(2 * time.Second) + + // Restart first and wait so that we know it will try cleanup without a metaleader. + c.restartServer(rs) + time.Sleep(time.Second) + + c.restartAll() + c.waitOnLeader() + c.waitOnStreamLeader(globalAccountName, "TEST") + + nc, _ = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + subj := fmt.Sprintf(JSApiConsumerListT, "TEST") + checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { + m, err := nc.Request(subj, nil, time.Second) + if err != nil { + return err + } + var resp JSApiConsumerListResponse + err = json.Unmarshal(m.Data, &resp) + require_NoError(t, err) + if len(resp.Consumers) != 0 { + return fmt.Errorf("Still have %d consumers", len(resp.Consumers)) + } + if len(resp.Missing) != 0 { + return fmt.Errorf("Still have %d missing consumers", len(resp.Missing)) + } + + return nil + }) }