diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 81613943..282391e5 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -5675,10 +5675,7 @@ func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) { defer nc.Close() // Origin - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - }) - if err != nil { + if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST"}); err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -5686,7 +5683,7 @@ func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) { t.Helper() // Send a batch to a given subject. for i := 0; i < n; i++ { - if _, err := js.Publish("TEST", []byte("OK")); err != nil { + if _, err := js.PublishAsync("TEST", []byte("OK")); err != nil { t.Fatalf("Unexpected publish error: %v", err) } } @@ -5694,7 +5691,7 @@ func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) { checkStream := func(stream string, num uint64) { t.Helper() - checkFor(t, 5*time.Second, 50*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { si, err := js.StreamInfo(stream) if err != nil { return err @@ -5710,11 +5707,13 @@ func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) { checkMirror := func(num uint64) { t.Helper(); checkStream("M", num) } checkTest := func(num uint64) { t.Helper(); checkStream("TEST", num) } + var err error + _, err = js.AddStream(&nats.StreamConfig{ Name: "M", Mirror: &nats.StreamSource{Name: "TEST"}, Replicas: 2, - MaxAge: 100 * time.Millisecond, + MaxAge: 200 * time.Millisecond, }) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -5724,20 +5723,32 @@ func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) { Name: "S", Sources: []*nats.StreamSource{{Name: "TEST"}}, Replicas: 2, - MaxAge: 100 * time.Millisecond, + MaxAge: 200 * time.Millisecond, }) if err != nil { t.Fatalf("Unexpected error: %v", err) } - sendBatch(20) - checkTest(20) - checkMirror(20) - checkSource(20) + sendBatch(100) + checkTest(100) + checkMirror(100) + checkSource(100) // Make sure they expire. checkMirror(0) checkSource(0) + + // Now stop the server housing the leader of the source stream. + sl := c.streamLeader("$G", "TEST") + sl.Shutdown() + c.restartServer(sl) + checkClusterFormed(t, c.servers...) + + // Make sure can process correctluy after we have expired all of the messages. + sendBatch(100) + checkMirror(100) + checkSource(100) + checkTest(200) } // Support functions diff --git a/server/stream.go b/server/stream.go index e1752350..4110738c 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1352,7 +1352,7 @@ func (mset *stream) setupMirrorConsumer() error { if ext != nil { deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".M"), "..", ".") } else { - deliverSubject = syncSubject("$JS.M") + deliverSubject = syncSubject("$JSDC.M") } if !isReset { @@ -1524,10 +1524,11 @@ func (mset *stream) retrySourceConsumerAtSeq(sname string, seq uint64) { return } s := mset.srv + s.Debugf("Retrying source consumer for '%s > %s'", mset.acc.Name, mset.cfg.Name) - si := mset.sources[sname] + // No longer configured. - if si == nil { + if si := mset.sources[sname]; si == nil { return } mset.setSourceConsumer(sname, seq) @@ -1567,7 +1568,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { if ext != nil { deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".S"), "..", ".") } else { - deliverSubject = syncSubject("$JS.S") + deliverSubject = syncSubject("$JSDC.S") } if !si.grr { @@ -1862,8 +1863,10 @@ func (mset *stream) setStartingSequenceForSource(sname string) { var state StreamState mset.store.FastState(&state) + + // Do not reset sseq here so we can remember when purge/expiration happens. if state.Msgs == 0 { - si.sseq, si.dseq = 0, 0 + si.dseq = 0 return }