From 37be43ee5a16668d1860d3c1e6b683670eeafe0d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 6 Jan 2020 18:39:03 -0800 Subject: [PATCH] Make sure purge works with redeliveries pending Signed-off-by: Derek Collison --- server/observable.go | 14 ++++++- test/jetstream_test.go | 85 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 1 deletion(-) diff --git a/server/observable.go b/server/observable.go index 04ccb2c1..88154859 100644 --- a/server/observable.go +++ b/server/observable.go @@ -1317,9 +1317,21 @@ func (o *Observable) purge(sseq uint64) { o.pending = nil if o.ptmr != nil { o.ptmr.Stop() - o.ptmr = nil + // Do not nil this out here. This allows checkPending to fire + // and still be ok and not panic. } } + // We need to remove all those being queued for redelivery under o.rdq + if len(o.rdq) > 0 { + var newRDQ []uint64 + for _, sseq := range o.rdq { + if sseq >= o.sseq { + newRDQ = append(newRDQ, sseq) + } + } + // Replace with new list. Most of the time this will be nil. + o.rdq = newRDQ + } o.mu.Unlock() } diff --git a/test/jetstream_test.go b/test/jetstream_test.go index f30c62b0..6fb139e9 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -2108,6 +2108,91 @@ func TestJetStreamMsgSetPurgeWithObservable(t *testing.T) { } } +func TestJetStreamMsgSetPurgeWithObservableAndRedelivery(t *testing.T) { + cases := []struct { + name string + mconfig *server.MsgSetConfig + }{ + {"MemoryStore", &server.MsgSetConfig{Name: "DC", Storage: server.MemoryStorage}}, + {"FileStore", &server.MsgSetConfig{Name: "DC", Storage: server.FileStorage}}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mset, err := s.GlobalAccount().AddMsgSet(c.mconfig) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + defer mset.Delete() + + nc := clientConnectToServer(t, s) + defer nc.Close() + + // Send 100 msgs + for i := 0; i < 100; i++ { + nc.Publish("DC", []byte("OK!")) + } + nc.Flush() + if stats := mset.Stats(); stats.Msgs != 100 { + t.Fatalf("Expected %d messages, got %d", 100, stats.Msgs) + } + // Now create an observable and make sure it functions properly. + // This will test redelivery state and purge of the msgset. + wcfg := &server.ObservableConfig{ + Durable: "WQ", + DeliverAll: true, + AckPolicy: server.AckExplicit, + AckWait: 20 * time.Millisecond, + } + o, err := mset.AddObservable(wcfg) + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + nextSubj := o.RequestNextMsgSubject() + for i := 0; i < 50; i++ { + // Do not ack these. + if _, err := nc.Request(nextSubj, nil, time.Second); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + // Now wait to make sure we are in a redelivered state. + time.Sleep(wcfg.AckWait * 2) + // Now do purge. + mset.Purge() + if stats := mset.Stats(); stats.Msgs != 0 { + t.Fatalf("Expected %d messages, got %d", 0, stats.Msgs) + } + // Now get the state and check that we did the right thing. + // Pending should be cleared, and msgset sequences should have been set + // to the total messages before purge + 1. + state := o.Info().State + if len(state.Pending) != 0 { + t.Fatalf("Expected no pending, got %d", len(state.Pending)) + } + if state.Delivered.SetSeq != 101 { + t.Fatalf("Expected to have setseq now at next seq of 101, got %d", state.Delivered.SetSeq) + } + // Check AckFloors which should have also been adjusted. + if state.AckFloor.SetSeq != 100 { + t.Fatalf("Expected ackfloor for setseq to be 100, got %d", state.AckFloor.SetSeq) + } + if state.AckFloor.ObsSeq != 50 { + t.Fatalf("Expected ackfloor for obsseq to be 75, got %d", state.AckFloor.ObsSeq) + } + // Also make sure we can get new messages correctly. + nc.Request("DC", []byte("OK-22"), time.Second) + if msg, err := nc.Request(nextSubj, nil, time.Second); err != nil { + t.Fatalf("Unexpected error: %v", err) + } else if string(msg.Data) != "OK-22" { + t.Fatalf("Received wrong message, wanted 'OK-22', got %q", msg.Data) + } + }) + } +} + func TestJetStreamInterestRetentionMsgSet(t *testing.T) { cases := []struct { name string