mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Merge pull request #3689 from nats-io/ghost-ephemerals
Ensure that consumers that are deleted on startup are removed from the system.
This commit is contained in:
@@ -1353,7 +1353,6 @@ func (o *consumer) deleteNotActive() {
|
||||
// Check to make sure we went away.
|
||||
// Don't think this needs to be a monitored go routine.
|
||||
go func() {
|
||||
var fs bool
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
@@ -1361,11 +1360,8 @@ func (o *consumer) deleteNotActive() {
|
||||
ca := js.consumerAssignment(acc, stream, name)
|
||||
js.mu.RUnlock()
|
||||
if ca != nil {
|
||||
if fs {
|
||||
s.Warnf("Consumer assignment not cleaned up, retrying")
|
||||
meta.ForwardProposal(removeEntry)
|
||||
}
|
||||
fs = true
|
||||
s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
|
||||
meta.ForwardProposal(removeEntry)
|
||||
} else {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1635,5 +1635,67 @@ func TestJetStreamParallelConsumerCreation(t *testing.T) {
|
||||
if len(rg) != expected {
|
||||
t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestJetStreamGhostEphemeralsAfterRestart(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo"},
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
// Add in 100 memory based ephemerals.
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
|
||||
Replicas: 1,
|
||||
InactiveThreshold: time.Second,
|
||||
MemoryStorage: true,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
// Grab random server.
|
||||
rs := c.randomServer()
|
||||
// Now shutdown cluster.
|
||||
c.stopAll()
|
||||
|
||||
// Let the consumers all expire.
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// Restart first and wait so that we know it will try cleanup without a metaleader.
|
||||
c.restartServer(rs)
|
||||
time.Sleep(time.Second)
|
||||
|
||||
c.restartAll()
|
||||
c.waitOnLeader()
|
||||
c.waitOnStreamLeader(globalAccountName, "TEST")
|
||||
|
||||
nc, _ = jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
subj := fmt.Sprintf(JSApiConsumerListT, "TEST")
|
||||
checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
|
||||
m, err := nc.Request(subj, nil, time.Second)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var resp JSApiConsumerListResponse
|
||||
err = json.Unmarshal(m.Data, &resp)
|
||||
require_NoError(t, err)
|
||||
if len(resp.Consumers) != 0 {
|
||||
return fmt.Errorf("Still have %d consumers", len(resp.Consumers))
|
||||
}
|
||||
if len(resp.Missing) != 0 {
|
||||
return fmt.Errorf("Still have %d missing consumers", len(resp.Missing))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user