From 8888ab51f49a3b8d17d1466a3521999d853d0f0c Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 24 May 2021 09:53:31 -0700 Subject: [PATCH] Fix for #2243. We were not allowing replicated acks processing for workqueues properly, only interest retention. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 2 +- server/jetstream_cluster_test.go | 60 +++++++++++++++++++++++++++++--- 2 files changed, 56 insertions(+), 6 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 985fbc0a..1f715187 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2850,7 +2850,7 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) { o.mu.RLock() mset := o.mset - if mset == nil || mset.cfg.Retention != InterestPolicy { + if mset == nil || mset.cfg.Retention == LimitsPolicy { o.mu.RUnlock() return } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index ffa08064..70d0f4c1 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -1979,11 +1979,6 @@ func TestJetStreamClusterInterestRetention(t *testing.T) { m.Ack() waitForZero := func() { - js, err := nc.JetStream(nats.MaxWait(50 * time.Millisecond)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { si, err := js.StreamInfo("foo") if err != nil { @@ -2014,6 +2009,61 @@ func TestJetStreamClusterInterestRetention(t *testing.T) { waitForZero() } +// https://github.com/nats-io/nats-server/issues/2243 +func TestJetStreamClusterWorkQueueRetention(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Client based API + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "FOO", + Subjects: []string{"foo.*"}, + Replicas: 2, + Retention: nats.WorkQueuePolicy, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sub, err := js.PullSubscribe("foo.test", "test") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err = js.Publish("foo.test", []byte("OK")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + si, err := js.StreamInfo("FOO") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.Msgs != 1 { + t.Fatalf("Expected 1 msg, got state: %+v", si.State) + } + + // Fetch from our pull consumer and ack. + for _, m := range fetchMsgs(t, sub, 1, 5*time.Second) { + m.Ack() + } + + // Make sure the messages are removed. + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("FOO") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.Msgs != 0 { + return fmt.Errorf("Expected 0 msgs, got state: %+v", si.State) + } + return nil + }) + +} + func TestJetStreamClusterMirrorAndSourceWorkQueues(t *testing.T) { c := createJetStreamClusterExplicit(t, "WQ", 3) defer c.shutdown()