When receiving a response across a gateway that has headers and a globally routed subject (_GR_) we were dropping header information.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-06-10 13:46:52 -07:00
parent 00bfc1f0ae
commit 1270977322
2 changed files with 74 additions and 3 deletions

View File

@@ -2718,11 +2718,11 @@ func getSubjectFromGWRoutedReply(reply []byte, isOldPrefix bool) []byte {
// This should be invoked only from processInboundGatewayMsg() or
// processInboundRoutedMsg() and is checking if the subject
// (c.pa.subject) has the $GNR prefix. If so, this is processed
// (c.pa.subject) has the _GR_ prefix. If so, this is processed
// as a GW reply and `true` is returned to indicate to the caller
// that it should stop processing.
// If gateway is not enabled on this server or if the subject
// does not start with $GR, `false` is returned and caller should
// does not start with _GR_, `false` is returned and caller should
// process message as usual.
func (c *client) handleGatewayReply(msg []byte) (processed bool) {
// Do not handle GW prefixed messages if this server does not have
@@ -2830,7 +2830,18 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) {
buf = append(buf, c.pa.reply...)
buf = append(buf, ' ')
}
buf = append(buf, c.pa.szb...)
szb := c.pa.szb
if c.pa.hdr >= 0 {
if route.headers {
buf[0] = 'H'
buf = append(buf, c.pa.hdb...)
buf = append(buf, ' ')
} else {
szb = []byte(strconv.Itoa(c.pa.size - c.pa.hdr))
msg = msg[c.pa.hdr:]
}
}
buf = append(buf, szb...)
mhEnd := len(buf)
buf = append(buf, _CRLF_...)
buf = append(buf, msg...)

View File

@@ -6525,6 +6525,66 @@ func TestJetStreamClusterSuperClusterGetNextRewrite(t *testing.T) {
}
}
func TestJetStreamClusterSuperClusterPullConsumerAndHeaders(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 2)
defer sc.shutdown()
c1 := sc.clusterForName("C1")
c2 := sc.clusterForName("C2")
nc, js := jsClientConnect(t, c1.randomServer())
defer nc.Close()
if _, err := js.AddStream(&nats.StreamConfig{Name: "ORIGIN"}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
toSend := 50
for i := 0; i < toSend; i++ {
if _, err := js.Publish("ORIGIN", []byte("ok")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
nc2, js2 := jsClientConnect(t, c2.randomServer())
defer nc2.Close()
_, err := js2.AddStream(&nats.StreamConfig{
Name: "S",
Sources: []*nats.StreamSource{{Name: "ORIGIN"}},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Wait for them to be in the sourced stream.
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
if si, _ := js2.StreamInfo("S"); si.State.Msgs != uint64(toSend) {
return fmt.Errorf("Expected %d msgs for %q, got %d", toSend, "S", si.State.Msgs)
}
return nil
})
// Now create a pull consumers for the sourced stream.
_, err = js2.AddConsumer("S", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now we will connect and request the next message from each server in C1 cluster and check that headers remain in place.
for _, s := range c1.servers {
nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
m, err := nc.Request("$JS.API.CONSUMER.MSG.NEXT.S.dlc", nil, 2*time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(m.Header) != 1 {
t.Fatalf("Expected 1 header element, got %+v", m.Header)
}
}
}
func TestJetStreamClusterLeafDifferentAccounts(t *testing.T) {
c := createJetStreamCluster(t, jsClusterAccountsTempl, "HUB", _EMPTY_, 2, 33133, false)
defer c.shutdown()