AckExplicit removes message for "offline" durable

The test shows the issue.
It seems that Consumer.needAck() for AckExplicit should consider
that an Ack is needed if sseq > o.asflr and there is no pending
ack at all. However making this change would break the test
TestJetStreamInterestRetentionStream.

Also, running the new test with `-count 100` and by adding an
artificial delay in stream.ackMsg() (just before calling
mset.store.RemoveMsg(seq)) causes sometimes delete requests
for the same sequence to be processed twice, which causes
the new test to fail (even with an attempted fix as discussed
above). I think that the attempt to remove the same sequence twice
is messing up the state.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2020-10-01 12:22:02 -06:00
parent 8989fb6524
commit eff27e26be

View File

@@ -7842,3 +7842,141 @@ func TestJetStreamPubSubPerf(t *testing.T) {
fmt.Printf("time is %v\n", tt)
fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())
}
func TestJetStreamAckExplicitMsgRemoval(t *testing.T) {
cases := []struct {
name string
mconfig *server.StreamConfig
}{
{"MemoryStore", &server.StreamConfig{
Name: "MY_STREAM",
Storage: server.MemoryStorage,
Subjects: []string{"foo.*"},
Retention: server.InterestPolicy,
}},
{"FileStore", &server.StreamConfig{
Name: "MY_STREAM",
Storage: server.FileStorage,
Subjects: []string{"foo.*"},
Retention: server.InterestPolicy,
}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mset, err := s.GlobalAccount().AddStream(c.mconfig)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
nc1 := clientConnectToServer(t, s)
defer nc1.Close()
nc2 := clientConnectToServer(t, s)
defer nc2.Close()
// Create two durable consumers on the same subject
sub1, _ := nc1.SubscribeSync(nats.NewInbox())
defer sub1.Unsubscribe()
nc1.Flush()
o1, err := mset.AddConsumer(&server.ConsumerConfig{
Durable: "dur1",
DeliverSubject: sub1.Subject,
FilterSubject: "foo.bar",
AckPolicy: server.AckExplicit,
})
if err != nil {
t.Fatalf("Unexpected error adding consumer: %v", err)
}
defer o1.Delete()
sub2, _ := nc2.SubscribeSync(nats.NewInbox())
defer sub2.Unsubscribe()
nc2.Flush()
o2, err := mset.AddConsumer(&server.ConsumerConfig{
Durable: "dur2",
DeliverSubject: sub2.Subject,
FilterSubject: "foo.bar",
AckPolicy: server.AckExplicit,
})
if err != nil {
t.Fatalf("Unexpected error adding consumer: %v", err)
}
defer o2.Delete()
// Send 1 message
toSend := 2
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", i+1))
}
state := mset.State()
if state.Msgs != uint64(toSend) {
t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs)
}
// Receive this first message and ack it.
subs := []*nats.Subscription{sub1, sub2}
for _, sub := range subs {
for i := 0; i < toSend; i++ {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error acking message: %v", err)
}
m.Respond(nil)
}
}
// Now close the 2nd subscription...
sub2.Unsubscribe()
nc2.Flush()
// Send new messages
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", 2+i+1))
}
// first subscription should get it and will ack it.
for i := 0; i < toSend; i++ {
m, err := sub1.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error acking message: %v", err)
}
m.Respond(nil)
}
// Now recreate the subscription for the 2nd JS consumer
sub2, _ = nc2.SubscribeSync(nats.NewInbox())
defer sub2.Unsubscribe()
nc2.Flush()
o2, err = mset.AddConsumer(&server.ConsumerConfig{
Durable: "dur2",
DeliverSubject: sub2.Subject,
FilterSubject: "foo.bar",
AckPolicy: server.AckExplicit,
})
if err != nil {
t.Fatalf("Unexpected error adding consumer: %v", err)
}
defer o2.Delete()
// Those messages should be redelivered to the 2nd consumer
for i := 0; i < toSend; i++ {
m, err := sub2.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error acking message: %v", err)
}
m.Respond(nil)
}
})
}
}