diff --git a/server/consumer.go b/server/consumer.go
index 57b0a862..e4cbf165 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -211,6 +211,7 @@ type consumer struct {
 	replay    bool
 	filterWC  bool
 	dtmr      *time.Timer
+	gwdtmr    *time.Timer
 	dthresh   time.Duration
 	mch       chan struct{}
 	qch       chan struct{}
@@ -708,6 +709,12 @@ func (o *consumer) setLeader(isLeader bool) {
 			if o.active = <-o.inch; !o.active {
 				// Check gateways in case they are enabled.
 				o.active = s.hasGatewayInterest(o.acc.Name, o.cfg.DeliverSubject)
+				if o.active {
+					// There is no local interest, but there is GW interest, we
+					// will watch for interest disappearing.
+					// TODO: may need to revisit...
+					o.gwdtmr = time.AfterFunc(o.dthresh, func() { o.watchGWinterest() })
+				}
 			}
 		}
 
@@ -888,7 +895,7 @@ func (s *Server) hasGatewayInterest(account, subject string) bool {
 }
 
 // This processes an update to the local interest for a deliver subject.
-func (o *consumer) updateDeliveryInterest(localInterest bool) {
+func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
 	interest := o.hasDeliveryInterest(localInterest)
 
 	o.mu.Lock()
@@ -896,7 +903,7 @@
 
 	mset := o.mset
 	if mset == nil || o.isPullMode() {
-		return
+		return false
 	}
 
 	if interest && !o.active {
@@ -911,7 +918,9 @@
 	// a timer to delete us. We wait for a bit in case of server reconnect.
 	if !o.isDurable() && !interest {
 		o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
+		return true
 	}
+	return false
 }
 
 func (o *consumer) deleteNotActive() {
@@ -972,6 +981,25 @@
 	o.delete()
 }
 
+func (o *consumer) watchGWinterest() {
+	var delete bool
+	// If there is no local interest...
+	if o.hasNoLocalInterest() {
+		// then call this which will check for GW interest and if none,
+		// will start the delete timer. This will return if the delete
+		// timer was set.
+		delete = o.updateDeliveryInterest(false)
+	}
+	o.mu.Lock()
+	// Now either clear the gwdtmr or reset for next try.
+	if delete {
+		stopAndClearTimer(&o.gwdtmr)
+	} else if o.gwdtmr != nil {
+		o.gwdtmr.Reset(o.dthresh)
+	}
+	o.mu.Unlock()
+}
+
 // Config returns the consumer's configuration.
 func (o *consumer) config() ConsumerConfig {
 	o.mu.Lock()
@@ -2588,6 +2616,7 @@ func (o *consumer) stopWithFlags(dflag, doSignal, advisory bool) error {
 	o.sysc = nil
 	stopAndClearTimer(&o.ptmr)
 	stopAndClearTimer(&o.dtmr)
+	stopAndClearTimer(&o.gwdtmr)
 	delivery := o.cfg.DeliverSubject
 	o.waiting = nil
 	// Break us out of the readLoop.
diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go
index 34c29e4b..c6df034e 100644
--- a/server/jetstream_cluster_test.go
+++ b/server/jetstream_cluster_test.go
@@ -4598,6 +4598,124 @@ func TestJetStreamClusterSuperClusterInterestOnlyMode(t *testing.T) {
 	checkMode("two", InterestOnly)
 }
 
+func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) {
+	sc := createJetStreamSuperCluster(t, 3, 2)
+	defer sc.shutdown()
+
+	// Create a stream in cluster 0
+	s := sc.clusters[0].randomServer()
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	for _, test := range []struct {
+		name            string
+		sourceInCluster int
+		streamName      string
+		sourceName      string
+	}{
+		{"local", 0, "TEST1", "S1"},
+		{"remote", 1, "TEST2", "S2"},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			if _, err := js.AddStream(&nats.StreamConfig{Name: test.streamName, Replicas: 3}); err != nil {
+				t.Fatalf("Error adding %q stream: %v", test.streamName, err)
+			}
+			if _, err := js.Publish(test.streamName, []byte("hello")); err != nil {
+				t.Fatalf("Unexpected publish error: %v", err)
+			}
+
+			// Now create a source for that stream, either in same or remote cluster.
+			s2 := sc.clusters[test.sourceInCluster].randomServer()
+			nc2, js2 := jsClientConnect(t, s2)
+			defer nc2.Close()
+
+			if _, err := js2.AddStream(&nats.StreamConfig{
+				Name:     test.sourceName,
+				Storage:  nats.FileStorage,
+				Sources:  []*nats.StreamSource{&nats.StreamSource{Name: test.streamName}},
+				Replicas: 1,
+			}); err != nil {
+				t.Fatalf("Error adding source stream: %v", err)
+			}
+
+			// Check that TEST(n) has 1 consumer and that S(n) is created and has 1 message.
+			checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
+				si, err := js.StreamInfo(test.streamName)
+				if err != nil {
+					return fmt.Errorf("Could not get stream info: %v", err)
+				}
+				if si.State.Consumers != 1 {
+					return fmt.Errorf("Expected %q stream to have 1 consumer, got %v", test.streamName, si.State.Consumers)
+				}
+				si, err = js2.StreamInfo(test.sourceName)
+				if err != nil {
+					return fmt.Errorf("Could not get stream info: %v", err)
+				}
+				if si.State.Msgs != 1 {
+					return fmt.Errorf("Expected 1 msg, got state: %+v", si.State)
+				}
+				return nil
+			})
+
+			// Get the consumer because we will want to artificially reduce
+			// the delete threshold.
+			leader := sc.clusters[0].streamLeader("$G", test.streamName)
+			mset, err := leader.GlobalAccount().lookupStream(test.streamName)
+			if err != nil {
+				t.Fatalf("Expected to find a stream for %q, got %v", test.streamName, err)
+			}
+			cons := mset.getConsumers()[0]
+			cons.mu.Lock()
+			cons.dthresh = 10 * time.Millisecond
+			active := cons.active
+			dtimerSet := cons.dtmr != nil
+			gwtimerSet := cons.gwdtmr != nil
+			if gwtimerSet {
+				cons.gwdtmr.Reset(cons.dthresh)
+			}
+			deliver := cons.cfg.DeliverSubject
+			cons.mu.Unlock()
+
+			if !active || dtimerSet {
+				t.Fatalf("Invalid values for active=%v dtimerSet=%v", active, dtimerSet)
+			}
+			if gwtimerSet && test.sourceInCluster == 0 || !gwtimerSet && test.sourceInCluster == 1 {
+				t.Fatalf("test=%q timerSet=%v", test.name, gwtimerSet)
+			}
+
+			// To add to the mix, let's create a local interest on the delivery subject
+			// and stop it. This is to ensure that this does not stop timers that should
+			// still be running and monitor the GW interest.
+			sub := natsSubSync(t, nc, deliver)
+			natsFlush(t, nc)
+			natsUnsub(t, sub)
+			natsFlush(t, nc)
+
+			// Now remove the "S(n)" stream...
+			if err := js2.DeleteStream(test.sourceName); err != nil {
+				t.Fatalf("Error deleting stream: %v", err)
+			}
+
+			// Now check that the stream S(n) is really removed and that
+			// the consumer is gone for stream TEST(n).
+			checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
+				// First, make sure that stream S(n) has disappeared.
+				if _, err := js2.StreamInfo(test.sourceName); err == nil {
+					return fmt.Errorf("Stream %q should no longer exist", test.sourceName)
+				}
+				si, err := js.StreamInfo(test.streamName)
+				if err != nil {
+					return fmt.Errorf("Could not get stream info: %v", err)
+				}
+				if si.State.Consumers != 0 {
+					return fmt.Errorf("Expected %q stream to have 0 consumer, got %v", test.streamName, si.State.Consumers)
+				}
+				return nil
+			})
+		})
+	}
+}
+
 // Support functions
 
 // Used to setup superclusters for tests.