mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #2166 from nats-io/lngwr
[FIXED] JetStream pull based message delivery could drop responses.
This commit is contained in:
@@ -3058,20 +3058,22 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g
|
||||
|
||||
// Check for internal subscriptions.
|
||||
if sub.icb != nil && !c.noIcb {
|
||||
if gwrply {
|
||||
// Note that we keep track of the GW routed reply in the destination
|
||||
// connection (`client`). The routed reply subject is in `c.pa.reply`,
|
||||
// should that change, we would have to pass the GW routed reply as
|
||||
// a parameter of deliverMsg().
|
||||
srv.trackGWReply(client, c.pa.reply)
|
||||
ireply := string(reply)
|
||||
// For internal callbacks we don't want to rely on the GW routed reply tracking
|
||||
// since clients can change for who receives vs sends response. GW routed
|
||||
// currently pins to a destination client connection. Also for internal its
|
||||
// ok if we do not hide the raw reply like we do for external clients. We
|
||||
// will not remap service import replies since they are handled different.
|
||||
if gwrply && !isServiceReply(reply) {
|
||||
ireply = string(c.pa.reply)
|
||||
}
|
||||
client.mu.Unlock()
|
||||
|
||||
// Internal account clients are for service imports and need the '\r\n'.
|
||||
if client.kind == ACCOUNT {
|
||||
sub.icb(sub, c, string(subject), string(reply), msg)
|
||||
sub.icb(sub, c, string(subject), ireply, msg)
|
||||
} else {
|
||||
sub.icb(sub, c, string(subject), string(reply), msg[:msgSize])
|
||||
sub.icb(sub, c, string(subject), ireply, msg[:msgSize])
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user