Merge pull request #2246 from nats-io/wq-retention

Fix for #2243. We were not allowing replicated acks processing for work queues.
This commit is contained in:
Derek Collison
2021-05-24 10:01:04 -07:00
committed by GitHub
2 changed files with 56 additions and 6 deletions

View File

@@ -2850,7 +2850,7 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) {
o.mu.RLock()
mset := o.mset
if mset == nil || mset.cfg.Retention != InterestPolicy {
if mset == nil || mset.cfg.Retention == LimitsPolicy {
o.mu.RUnlock()
return
}

View File

@@ -1979,11 +1979,6 @@ func TestJetStreamClusterInterestRetention(t *testing.T) {
m.Ack()
waitForZero := func() {
js, err := nc.JetStream(nats.MaxWait(50 * time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("foo")
if err != nil {
@@ -2014,6 +2009,61 @@ func TestJetStreamClusterInterestRetention(t *testing.T) {
waitForZero()
}
// https://github.com/nats-io/nats-server/issues/2243
func TestJetStreamClusterWorkQueueRetention(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "FOO",
Subjects: []string{"foo.*"},
Replicas: 2,
Retention: nats.WorkQueuePolicy,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sub, err := js.PullSubscribe("foo.test", "test")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err = js.Publish("foo.test", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
si, err := js.StreamInfo("FOO")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 1 {
t.Fatalf("Expected 1 msg, got state: %+v", si.State)
}
// Fetch from our pull consumer and ack.
for _, m := range fetchMsgs(t, sub, 1, 5*time.Second) {
m.Ack()
}
// Make sure the messages are removed.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("FOO")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 0 {
return fmt.Errorf("Expected 0 msgs, got state: %+v", si.State)
}
return nil
})
}
func TestJetStreamClusterMirrorAndSourceWorkQueues(t *testing.T) {
c := createJetStreamClusterExplicit(t, "WQ", 3)
defer c.shutdown()