[FIXED] JetStream: Sources with OptStartTime gets redelivered

If start by time is before what we remember during recovery use that instead

Resolves #3559

Signed-off-by: Derek Collison <derek@nats.io>
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2022-11-03 14:21:26 -06:00
parent c9fd776889
commit c16ccd34c3
2 changed files with 121 additions and 1 deletions

View File

@@ -1050,3 +1050,117 @@ func TestJetStreamClusterSignalPullConsumersOnDelete(t *testing.T) {
t.Fatalf("Took to long to bail out on stream delete")
}
}
// https://github.com/nats-io/nats-server/issues/3559
func TestJetStreamClusterSourceWithOptStartTime(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
test := func(t *testing.T, c *cluster, s *Server) {
replicas := 1
if c != nil {
s = c.randomServer()
replicas = 3
}
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: replicas,
})
require_NoError(t, err)
yesterday := time.Now().Add(-24 * time.Hour)
_, err = js.AddStream(&nats.StreamConfig{
Name: "SOURCE",
Replicas: replicas,
Sources: []*nats.StreamSource{&nats.StreamSource{
Name: "TEST",
OptStartTime: &yesterday,
}},
})
require_NoError(t, err)
_, err = js.AddStream(&nats.StreamConfig{
Name: "MIRROR",
Replicas: replicas,
Mirror: &nats.StreamSource{
Name: "TEST",
OptStartTime: &yesterday,
},
})
require_NoError(t, err)
total := 10
for i := 0; i < total; i++ {
sendStreamMsg(t, nc, "foo", "hello")
}
checkCount := func(sname string, expected int) {
t.Helper()
checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
si, err := js.StreamInfo(sname)
if err != nil {
return err
}
if n := si.State.Msgs; n != uint64(expected) {
return fmt.Errorf("Expected stream %q to have %v messages, got %v", sname, expected, n)
}
return nil
})
}
checkCount("TEST", 10)
checkCount("SOURCE", 10)
checkCount("MIRROR", 10)
err = js.PurgeStream("SOURCE")
require_NoError(t, err)
err = js.PurgeStream("MIRROR")
require_NoError(t, err)
checkCount("TEST", 10)
checkCount("SOURCE", 0)
checkCount("MIRROR", 0)
nc.Close()
if c != nil {
c.stopAll()
c.restartAll()
c.waitOnStreamLeader(globalAccountName, "TEST")
c.waitOnStreamLeader(globalAccountName, "SOURCE")
c.waitOnStreamLeader(globalAccountName, "MIRROR")
s = c.randomServer()
} else {
sd := s.JetStreamConfig().StoreDir
s.Shutdown()
s = RunJetStreamServerOnPort(-1, sd)
}
// Wait a bit before checking because sync'ing (even with the defect)
// would not happen right away. I tried with 1 sec and test would pass,
// so need to be at least that much.
time.Sleep(2 * time.Second)
nc, js = jsClientConnect(t, s)
defer nc.Close()
checkCount("TEST", 10)
checkCount("SOURCE", 0)
checkCount("MIRROR", 0)
}
t.Run("standalone", func(t *testing.T) { test(t, nil, s) })
t.Run("cluster", func(t *testing.T) { test(t, c, nil) })
}

View File

@@ -2479,7 +2479,13 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
req.Config.OptStartSeq = ssi.OptStartSeq
req.Config.DeliverPolicy = DeliverByStartSequence
} else if ssi.OptStartTime != nil {
req.Config.OptStartTime = ssi.OptStartTime
// Check to see if our configured start is before what we remember.
// Applicable on restart similar to below.
if ssi.OptStartTime.Before(si.start) {
req.Config.OptStartTime = &si.start
} else {
req.Config.OptStartTime = ssi.OptStartTime
}
req.Config.DeliverPolicy = DeliverByStartTime
} else if !si.start.IsZero() {
// We are falling back to time based startup on a recover, but our messages are gone. e.g. purge, expired, retention policy.