Fix for corrupting a message when extending an existing header.

We had a report of corrupt message payloads when going across leafnodes between streams that were sourced from one another.
We were incorrectly using the underlying buffer when a header already existed.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-05-23 11:28:48 -07:00
parent b7e1f66bea
commit a52697170c
2 changed files with 63 additions and 4 deletions

View File

@@ -3634,14 +3634,13 @@ func removeHeaderIfPresent(hdr []byte, key string) []byte {
// Generate a new header based on optional original header and key value.
// More used in JetStream layers.
func genHeader(hdr []byte, key, value string) []byte {
var bb *bytes.Buffer
var bb bytes.Buffer
if len(hdr) > LEN_CR_LF {
bb = bytes.NewBuffer(hdr[:len(hdr)-LEN_CR_LF])
bb.Write(hdr[:len(hdr)-LEN_CR_LF])
} else {
bb = &bytes.Buffer{}
bb.WriteString(hdrLine)
}
http.Header{key: []string{value}}.Write(bb)
http.Header{key: []string{value}}.Write(&bb)
bb.WriteString(CR_LF)
return bb.Bytes()
}

View File

@@ -6863,6 +6863,66 @@ func TestJetStreamClusterCrossAccountInterop(t *testing.T) {
})
}
func TestJetStreamClusterNilMsgWithHeaderThroughSourcedStream(t *testing.T) {
tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: HUB, store_dir:", 1)
c := createJetStreamCluster(t, tmpl, "HUB", _EMPTY_, 3, 12232, true)
defer c.shutdown()
tmpl = strings.Replace(jsClusterTemplWithSingleLeafNode, "store_dir:", "domain: SPOKE, store_dir:", 1)
spoke := c.createLeafNodeWithTemplate("SPOKE", tmpl)
defer spoke.Shutdown()
// Client for API requests.
nc, js := jsClientConnect(t, spoke)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
jsHub, err := nc.JetStream(nats.APIPrefix("$JS.HUB.API"))
if err != nil {
t.Fatalf("Unexpected error getting JetStream context: %v", err)
}
_, err = jsHub.AddStream(&nats.StreamConfig{
Name: "S",
Replicas: 2,
Sources: []*nats.StreamSource{{
Name: "TEST",
External: &nats.ExternalStream{APIPrefix: "$JS.SPOKE.API"},
}},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now send a message to the origin stream with nil body and a header.
m := nats.NewMsg("foo")
m.Header.Add("X-Request-ID", "e9a639b4-cecb-4fbe-8376-1ef511ae1f8d")
m.Data = []byte("HELLO WORLD")
if _, err = jsHub.PublishMsg(m); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sub, err := jsHub.SubscribeSync("foo", nats.BindStream("S"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if string(msg.Data) != "HELLO WORLD" {
t.Fatalf("Message corrupt? Expecting %q got %q", "HELLO WORLD", msg.Data)
}
}
// Support functions
// Used to setup superclusters for tests.