Make sure purge works with redeliveries pending

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-01-06 18:39:03 -08:00
parent f78efe2d91
commit 37be43ee5a
2 changed files with 98 additions and 1 deletions

View File

@@ -1317,9 +1317,21 @@ func (o *Observable) purge(sseq uint64) {
o.pending = nil
if o.ptmr != nil {
o.ptmr.Stop()
o.ptmr = nil
// Do not nil this out here. This allows checkPending to fire
// and still be ok and not panic.
}
}
// We need to remove all those being queued for redelivery under o.rdq
if len(o.rdq) > 0 {
var newRDQ []uint64
for _, sseq := range o.rdq {
if sseq >= o.sseq {
newRDQ = append(newRDQ, sseq)
}
}
// Replace with new list. Most of the time this will be nil.
o.rdq = newRDQ
}
o.mu.Unlock()
}

View File

@@ -2108,6 +2108,91 @@ func TestJetStreamMsgSetPurgeWithObservable(t *testing.T) {
}
}
func TestJetStreamMsgSetPurgeWithObservableAndRedelivery(t *testing.T) {
cases := []struct {
name string
mconfig *server.MsgSetConfig
}{
{"MemoryStore", &server.MsgSetConfig{Name: "DC", Storage: server.MemoryStorage}},
{"FileStore", &server.MsgSetConfig{Name: "DC", Storage: server.FileStorage}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mset, err := s.GlobalAccount().AddMsgSet(c.mconfig)
if err != nil {
t.Fatalf("Unexpected error adding message set: %v", err)
}
defer mset.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
// Send 100 msgs
for i := 0; i < 100; i++ {
nc.Publish("DC", []byte("OK!"))
}
nc.Flush()
if stats := mset.Stats(); stats.Msgs != 100 {
t.Fatalf("Expected %d messages, got %d", 100, stats.Msgs)
}
// Now create an observable and make sure it functions properly.
// This will test redelivery state and purge of the msgset.
wcfg := &server.ObservableConfig{
Durable: "WQ",
DeliverAll: true,
AckPolicy: server.AckExplicit,
AckWait: 20 * time.Millisecond,
}
o, err := mset.AddObservable(wcfg)
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
defer o.Delete()
nextSubj := o.RequestNextMsgSubject()
for i := 0; i < 50; i++ {
// Do not ack these.
if _, err := nc.Request(nextSubj, nil, time.Second); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
// Now wait to make sure we are in a redelivered state.
time.Sleep(wcfg.AckWait * 2)
// Now do purge.
mset.Purge()
if stats := mset.Stats(); stats.Msgs != 0 {
t.Fatalf("Expected %d messages, got %d", 0, stats.Msgs)
}
// Now get the state and check that we did the right thing.
// Pending should be cleared, and msgset sequences should have been set
// to the total messages before purge + 1.
state := o.Info().State
if len(state.Pending) != 0 {
t.Fatalf("Expected no pending, got %d", len(state.Pending))
}
if state.Delivered.SetSeq != 101 {
t.Fatalf("Expected to have setseq now at next seq of 101, got %d", state.Delivered.SetSeq)
}
// Check AckFloors which should have also been adjusted.
if state.AckFloor.SetSeq != 100 {
t.Fatalf("Expected ackfloor for setseq to be 100, got %d", state.AckFloor.SetSeq)
}
if state.AckFloor.ObsSeq != 50 {
t.Fatalf("Expected ackfloor for obsseq to be 75, got %d", state.AckFloor.ObsSeq)
}
// Also make sure we can get new messages correctly.
nc.Request("DC", []byte("OK-22"), time.Second)
if msg, err := nc.Request(nextSubj, nil, time.Second); err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if string(msg.Data) != "OK-22" {
t.Fatalf("Received wrong message, wanted 'OK-22', got %q", msg.Data)
}
})
}
}
func TestJetStreamInterestRetentionMsgSet(t *testing.T) {
cases := []struct {
name string