Ephemeral cleanup across GWs

Watch for interest loss across GWs so ephemeral consumers are removed
when there is no longer local and GW interest.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2021-03-13 14:11:34 -07:00
parent ef8838126e
commit 9e858ff81f
2 changed files with 149 additions and 2 deletions

View File

@@ -211,6 +211,7 @@ type consumer struct {
replay bool
filterWC bool
dtmr *time.Timer
gwdtmr *time.Timer
dthresh time.Duration
mch chan struct{}
qch chan struct{}
@@ -708,6 +709,12 @@ func (o *consumer) setLeader(isLeader bool) {
if o.active = <-o.inch; !o.active {
// Check gateways in case they are enabled.
o.active = s.hasGatewayInterest(o.acc.Name, o.cfg.DeliverSubject)
if o.active {
// There is no local interest, but there is GW interest, we
// will watch for interest disappearing.
// TODO: may need to revisit...
o.gwdtmr = time.AfterFunc(o.dthresh, func() { o.watchGWinterest() })
}
}
}
@@ -888,7 +895,7 @@ func (s *Server) hasGatewayInterest(account, subject string) bool {
}
// This processes an update to the local interest for a deliver subject.
func (o *consumer) updateDeliveryInterest(localInterest bool) {
func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
interest := o.hasDeliveryInterest(localInterest)
o.mu.Lock()
@@ -896,7 +903,7 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) {
mset := o.mset
if mset == nil || o.isPullMode() {
return
return false
}
if interest && !o.active {
@@ -911,7 +918,9 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) {
// a timer to delete us. We wait for a bit in case of server reconnect.
if !o.isDurable() && !interest {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
return true
}
return false
}
func (o *consumer) deleteNotActive() {
@@ -972,6 +981,25 @@ func (o *consumer) deleteNotActive() {
o.delete()
}
func (o *consumer) watchGWinterest() {
var delete bool
// If there is no local interest...
if o.hasNoLocalInterest() {
// then call this which will check for GW interest and if none,
// will start the delete timer. This will return if the delete
// timer was set.
delete = o.updateDeliveryInterest(false)
}
o.mu.Lock()
// Now either clear the gwdtmr or reset for next try.
if delete {
stopAndClearTimer(&o.gwdtmr)
} else if o.gwdtmr != nil {
o.gwdtmr.Reset(o.dthresh)
}
o.mu.Unlock()
}
// Config returns the consumer's configuration.
func (o *consumer) config() ConsumerConfig {
o.mu.Lock()
@@ -2588,6 +2616,7 @@ func (o *consumer) stopWithFlags(dflag, doSignal, advisory bool) error {
o.sysc = nil
stopAndClearTimer(&o.ptmr)
stopAndClearTimer(&o.dtmr)
stopAndClearTimer(&o.gwdtmr)
delivery := o.cfg.DeliverSubject
o.waiting = nil
// Break us out of the readLoop.

View File

@@ -4598,6 +4598,124 @@ func TestJetStreamClusterSuperClusterInterestOnlyMode(t *testing.T) {
checkMode("two", InterestOnly)
}
func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 2)
defer sc.shutdown()
// Create a stream in cluster 0
s := sc.clusters[0].randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
for _, test := range []struct {
name string
sourceInCluster int
streamName string
sourceName string
}{
{"local", 0, "TEST1", "S1"},
{"remote", 1, "TEST2", "S2"},
} {
t.Run(test.name, func(t *testing.T) {
if _, err := js.AddStream(&nats.StreamConfig{Name: test.streamName, Replicas: 3}); err != nil {
t.Fatalf("Error adding %q stream: %v", test.streamName, err)
}
if _, err := js.Publish(test.streamName, []byte("hello")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
// Now create a source for that stream, either in same or remote cluster.
s2 := sc.clusters[test.sourceInCluster].randomServer()
nc2, js2 := jsClientConnect(t, s2)
defer nc2.Close()
if _, err := js2.AddStream(&nats.StreamConfig{
Name: test.sourceName,
Storage: nats.FileStorage,
Sources: []*nats.StreamSource{&nats.StreamSource{Name: test.streamName}},
Replicas: 1,
}); err != nil {
t.Fatalf("Error adding source stream: %v", err)
}
// Check that TEST(n) has 1 consumer and that S(n) is created and has 1 message.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo(test.streamName)
if err != nil {
return fmt.Errorf("Could not get stream info: %v", err)
}
if si.State.Consumers != 1 {
return fmt.Errorf("Expected %q stream to have 1 consumer, got %v", test.streamName, si.State.Consumers)
}
si, err = js2.StreamInfo(test.sourceName)
if err != nil {
return fmt.Errorf("Could not get stream info: %v", err)
}
if si.State.Msgs != 1 {
return fmt.Errorf("Expected 1 msg, got state: %+v", si.State)
}
return nil
})
// Get the consumer because we will want to artificially reduce
// the delete threshold.
leader := sc.clusters[0].streamLeader("$G", test.streamName)
mset, err := leader.GlobalAccount().lookupStream(test.streamName)
if err != nil {
t.Fatalf("Expected to find a stream for %q, got %v", test.streamName, err)
}
cons := mset.getConsumers()[0]
cons.mu.Lock()
cons.dthresh = 10 * time.Millisecond
active := cons.active
dtimerSet := cons.dtmr != nil
gwtimerSet := cons.gwdtmr != nil
if gwtimerSet {
cons.gwdtmr.Reset(cons.dthresh)
}
deliver := cons.cfg.DeliverSubject
cons.mu.Unlock()
if !active || dtimerSet {
t.Fatalf("Invalid values for active=%v dtimerSet=%v", active, dtimerSet)
}
if gwtimerSet && test.sourceInCluster == 0 || !gwtimerSet && test.sourceInCluster == 1 {
t.Fatalf("test=%q timerSet=%v", test.name, gwtimerSet)
}
// To add to the mix, let's create a local interest on the delivery subject
// and stop it. This is to ensure that this does not stop timers that should
// still be running and monitor the GW interest.
sub := natsSubSync(t, nc, deliver)
natsFlush(t, nc)
natsUnsub(t, sub)
natsFlush(t, nc)
// Now remove the "S(n)" stream...
if err := js2.DeleteStream(test.sourceName); err != nil {
t.Fatalf("Error deleting stream: %v", err)
}
// Now check that the stream S(n) is really removed and that
// the consumer is gone for stream TEST(n).
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
// First, make sure that stream S(n) has disappeared.
if _, err := js2.StreamInfo(test.sourceName); err == nil {
return fmt.Errorf("Stream %q should no longer exist", test.sourceName)
}
si, err := js.StreamInfo(test.streamName)
if err != nil {
return fmt.Errorf("Could not get stream info: %v", err)
}
if si.State.Consumers != 0 {
return fmt.Errorf("Expected %q stream to have 0 consumer, got %v", test.streamName, si.State.Consumers)
}
return nil
})
})
}
}
// Support functions
// Used to setup superclusters for tests.