diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index c622e2ee..25ce0211 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -1050,3 +1050,117 @@ func TestJetStreamClusterSignalPullConsumersOnDelete(t *testing.T) { t.Fatalf("Took to long to bail out on stream delete") } } + +// https://github.com/nats-io/nats-server/issues/3559 +func TestJetStreamClusterSourceWithOptStartTime(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + test := func(t *testing.T, c *cluster, s *Server) { + + replicas := 1 + if c != nil { + s = c.randomServer() + replicas = 3 + } + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: replicas, + }) + require_NoError(t, err) + + yesterday := time.Now().Add(-24 * time.Hour) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "SOURCE", + Replicas: replicas, + Sources: []*nats.StreamSource{&nats.StreamSource{ + Name: "TEST", + OptStartTime: &yesterday, + }}, + }) + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "MIRROR", + Replicas: replicas, + Mirror: &nats.StreamSource{ + Name: "TEST", + OptStartTime: &yesterday, + }, + }) + require_NoError(t, err) + + total := 10 + for i := 0; i < total; i++ { + sendStreamMsg(t, nc, "foo", "hello") + } + + checkCount := func(sname string, expected int) { + t.Helper() + checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { + si, err := js.StreamInfo(sname) + if err != nil { + return err + } + if n := si.State.Msgs; n != uint64(expected) { + return fmt.Errorf("Expected stream %q to have %v messages, got %v", sname, expected, n) + } + return nil + }) + } + + checkCount("TEST", 10) + checkCount("SOURCE", 10) + checkCount("MIRROR", 10) + + err = js.PurgeStream("SOURCE") + require_NoError(t, err) + err = js.PurgeStream("MIRROR") + require_NoError(t, err) + + checkCount("TEST", 10) + checkCount("SOURCE", 0) + checkCount("MIRROR", 0) + + nc.Close() + if c != nil { + c.stopAll() + c.restartAll() + + c.waitOnStreamLeader(globalAccountName, "TEST") + c.waitOnStreamLeader(globalAccountName, "SOURCE") + c.waitOnStreamLeader(globalAccountName, "MIRROR") + + s = c.randomServer() + } else { + sd := s.JetStreamConfig().StoreDir + s.Shutdown() + s = RunJetStreamServerOnPort(-1, sd) + } + + // Wait a bit before checking because sync'ing (even with the defect) + // would not happen right away. I tried with 1 sec and test would pass, + // so need to be at least that much. + time.Sleep(2 * time.Second) + + nc, js = jsClientConnect(t, s) + defer nc.Close() + checkCount("TEST", 10) + checkCount("SOURCE", 0) + checkCount("MIRROR", 0) + } + + t.Run("standalone", func(t *testing.T) { test(t, nil, s) }) + t.Run("cluster", func(t *testing.T) { test(t, c, nil) }) +} diff --git a/server/stream.go b/server/stream.go index 07eb29c0..b9610cba 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2479,7 +2479,13 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T req.Config.OptStartSeq = ssi.OptStartSeq req.Config.DeliverPolicy = DeliverByStartSequence } else if ssi.OptStartTime != nil { - req.Config.OptStartTime = ssi.OptStartTime + // Check to see if our configured start is before what we remember. + // Applicable on restart similar to below. + if ssi.OptStartTime.Before(si.start) { + req.Config.OptStartTime = &si.start + } else { + req.Config.OptStartTime = ssi.OptStartTime + } req.Config.DeliverPolicy = DeliverByStartTime } else if !si.start.IsZero() { // We are falling back to time based startup on a recover, but our messages are gone. e.g. purge, expired, retention policy.