mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fixed bug that would prevent queue subs to ack messages
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -4013,14 +4013,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
|
||||
}
|
||||
}
|
||||
|
||||
var rreply = reply
|
||||
if rplyHasGWPrefix && sub.client.kind == CLIENT {
|
||||
rreply = creply
|
||||
}
|
||||
// "rreply" will be stripped of the $GNR prefix (if present)
|
||||
// for client connections only.
|
||||
mh := c.msgHeader(dsubj, rreply, sub)
|
||||
if c.deliverMsg(sub, subject, rreply, mh, msg, rplyHasGWPrefix) {
|
||||
mh := c.msgHeader(dsubj, creply, sub)
|
||||
if c.deliverMsg(sub, subject, creply, mh, msg, rplyHasGWPrefix) {
|
||||
didDeliver = true
|
||||
// Clear rsub
|
||||
rsub = nil
|
||||
|
||||
@@ -3493,6 +3493,82 @@ func TestJetStreamClusterStreamPerf(t *testing.T) {
|
||||
fmt.Printf("%.0f msgs/sec\n\n", float64(toSend)/tt.Seconds())
|
||||
}
|
||||
|
||||
// This test creates a queue consumer for the delivery subject,
|
||||
// and make sure it connects to the server that is not the leader
|
||||
// of the stream. A bug was not stripping the $JS.ACK reply subject
|
||||
// correctly, which means that ack sent on the reply subject was
|
||||
// droped by the routed server.
|
||||
func TestJetStreamClusterQueueSubConsumer(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R2S", 2)
|
||||
defer c.shutdown()
|
||||
|
||||
// Client based API
|
||||
s := c.randomServer()
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo.>"},
|
||||
Replicas: 1,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
inbox := nats.NewInbox()
|
||||
ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
|
||||
Durable: "ivan",
|
||||
DeliverSubject: inbox,
|
||||
AckPolicy: nats.AckExplicitPolicy,
|
||||
AckWait: 100 * time.Millisecond,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Now create a client that does NOT connect to the stream leader.
|
||||
// Start with url from first server in the cluster.
|
||||
u := c.servers[0].ClientURL()
|
||||
// If leader is "S-1", then use S-2 to connect to, which is at servers[1].
|
||||
if ci.Cluster.Leader == "S-1" {
|
||||
u = c.servers[1].ClientURL()
|
||||
}
|
||||
qsubnc, err := nats.Connect(u)
|
||||
if err != nil {
|
||||
t.Fatalf("Error connecting: %v", err)
|
||||
}
|
||||
defer qsubnc.Close()
|
||||
|
||||
ch := make(chan struct{}, 2)
|
||||
if _, err := qsubnc.QueueSubscribe(inbox, "queue", func(m *nats.Msg) {
|
||||
m.Respond(nil)
|
||||
ch <- struct{}{}
|
||||
}); err != nil {
|
||||
t.Fatalf("Error creating sub: %v", err)
|
||||
}
|
||||
|
||||
// Use the other connection to publish a message
|
||||
if _, err := js.Publish("foo.bar", []byte("hello")); err != nil {
|
||||
t.Fatalf("Error on publish: %v", err)
|
||||
}
|
||||
|
||||
// Wait that we receive the message first.
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("Did not receive message")
|
||||
}
|
||||
|
||||
// Message should be ack'ed and not redelivered.
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatal("Message redelivered!!!")
|
||||
case <-time.After(250 * time.Millisecond):
|
||||
// OK
|
||||
}
|
||||
}
|
||||
|
||||
// Support functions
|
||||
|
||||
var jsClusterTempl = `
|
||||
|
||||
Reference in New Issue
Block a user