Make sure source consumers work properly after expiration/purge

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-04-16 17:28:04 -07:00
parent 60df8a2fcb
commit d84af39f16
2 changed files with 31 additions and 17 deletions

View File

@@ -5675,10 +5675,7 @@ func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) {
defer nc.Close()
// Origin
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
})
if err != nil {
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST"}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -5686,7 +5683,7 @@ func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) {
t.Helper()
// Send a batch to a given subject.
for i := 0; i < n; i++ {
if _, err := js.Publish("TEST", []byte("OK")); err != nil {
if _, err := js.PublishAsync("TEST", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
@@ -5694,7 +5691,7 @@ func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) {
checkStream := func(stream string, num uint64) {
t.Helper()
checkFor(t, 5*time.Second, 50*time.Millisecond, func() error {
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo(stream)
if err != nil {
return err
@@ -5710,11 +5707,13 @@ func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) {
checkMirror := func(num uint64) { t.Helper(); checkStream("M", num) }
checkTest := func(num uint64) { t.Helper(); checkStream("TEST", num) }
var err error
_, err = js.AddStream(&nats.StreamConfig{
Name: "M",
Mirror: &nats.StreamSource{Name: "TEST"},
Replicas: 2,
MaxAge: 100 * time.Millisecond,
MaxAge: 200 * time.Millisecond,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -5724,20 +5723,32 @@ func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) {
Name: "S",
Sources: []*nats.StreamSource{{Name: "TEST"}},
Replicas: 2,
MaxAge: 100 * time.Millisecond,
MaxAge: 200 * time.Millisecond,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sendBatch(20)
checkTest(20)
checkMirror(20)
checkSource(20)
sendBatch(100)
checkTest(100)
checkMirror(100)
checkSource(100)
// Make sure they expire.
checkMirror(0)
checkSource(0)
// Now stop the server housing the leader of the source stream.
sl := c.streamLeader("$G", "TEST")
sl.Shutdown()
c.restartServer(sl)
checkClusterFormed(t, c.servers...)
// Make sure can process correctluy after we have expired all of the messages.
sendBatch(100)
checkMirror(100)
checkSource(100)
checkTest(200)
}
// Support functions

View File

@@ -1352,7 +1352,7 @@ func (mset *stream) setupMirrorConsumer() error {
if ext != nil {
deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".M"), "..", ".")
} else {
deliverSubject = syncSubject("$JS.M")
deliverSubject = syncSubject("$JSDC.M")
}
if !isReset {
@@ -1524,10 +1524,11 @@ func (mset *stream) retrySourceConsumerAtSeq(sname string, seq uint64) {
return
}
s := mset.srv
s.Debugf("Retrying source consumer for '%s > %s'", mset.acc.Name, mset.cfg.Name)
si := mset.sources[sname]
// No longer configured.
if si == nil {
if si := mset.sources[sname]; si == nil {
return
}
mset.setSourceConsumer(sname, seq)
@@ -1567,7 +1568,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
if ext != nil {
deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".S"), "..", ".")
} else {
deliverSubject = syncSubject("$JS.S")
deliverSubject = syncSubject("$JSDC.S")
}
if !si.grr {
@@ -1862,8 +1863,10 @@ func (mset *stream) setStartingSequenceForSource(sname string) {
var state StreamState
mset.store.FastState(&state)
// Do not reset sseq here so we can remember when purge/expiration happens.
if state.Msgs == 0 {
si.sseq, si.dseq = 0, 0
si.dseq = 0
return
}