From d732c8fe82308b895f7697f3b79d4fc5efeb9990 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Sun, 7 Feb 2021 11:26:54 -0700 Subject: [PATCH] Fixed bug that would prevent queue subs to ack messages Signed-off-by: Ivan Kozlovic --- server/client.go | 10 +---- test/jetstream_cluster_test.go | 76 ++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 8 deletions(-) diff --git a/server/client.go b/server/client.go index 62517f7f..527c9017 100644 --- a/server/client.go +++ b/server/client.go @@ -4013,14 +4013,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, } } - var rreply = reply - if rplyHasGWPrefix && sub.client.kind == CLIENT { - rreply = creply - } - // "rreply" will be stripped of the $GNR prefix (if present) - // for client connections only. - mh := c.msgHeader(dsubj, rreply, sub) - if c.deliverMsg(sub, subject, rreply, mh, msg, rplyHasGWPrefix) { + mh := c.msgHeader(dsubj, creply, sub) + if c.deliverMsg(sub, subject, creply, mh, msg, rplyHasGWPrefix) { didDeliver = true // Clear rsub rsub = nil diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go index 315b1fa2..8ceb3972 100644 --- a/test/jetstream_cluster_test.go +++ b/test/jetstream_cluster_test.go @@ -3493,6 +3493,82 @@ func TestJetStreamClusterStreamPerf(t *testing.T) { fmt.Printf("%.0f msgs/sec\n\n", float64(toSend)/tt.Seconds()) } +// This test creates a queue consumer for the delivery subject, +// and make sure it connects to the server that is not the leader +// of the stream. A bug was not stripping the $JS.ACK reply subject +// correctly, which means that ack sent on the reply subject was +// droped by the routed server. +func TestJetStreamClusterQueueSubConsumer(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R2S", 2) + defer c.shutdown() + + // Client based API + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.>"}, + Replicas: 1, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + inbox := nats.NewInbox() + ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "ivan", + DeliverSubject: inbox, + AckPolicy: nats.AckExplicitPolicy, + AckWait: 100 * time.Millisecond, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Now create a client that does NOT connect to the stream leader. + // Start with url from first server in the cluster. + u := c.servers[0].ClientURL() + // If leader is "S-1", then use S-2 to connect to, which is at servers[1]. + if ci.Cluster.Leader == "S-1" { + u = c.servers[1].ClientURL() + } + qsubnc, err := nats.Connect(u) + if err != nil { + t.Fatalf("Error connecting: %v", err) + } + defer qsubnc.Close() + + ch := make(chan struct{}, 2) + if _, err := qsubnc.QueueSubscribe(inbox, "queue", func(m *nats.Msg) { + m.Respond(nil) + ch <- struct{}{} + }); err != nil { + t.Fatalf("Error creating sub: %v", err) + } + + // Use the other connection to publish a message + if _, err := js.Publish("foo.bar", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + + // Wait that we receive the message first. + select { + case <-ch: + case <-time.After(time.Second): + t.Fatal("Did not receive message") + } + + // Message should be ack'ed and not redelivered. + select { + case <-ch: + t.Fatal("Message redelivered!!!") + case <-time.After(250 * time.Millisecond): + // OK + } +} + // Support functions var jsClusterTempl = `