Merge pull request #3169 from nats-io/republish_updates

Republish on the republish subject, place original subject in a header like direct stream gets.
This commit is contained in:
Derek Collison
2022-06-05 16:48:57 -07:00
committed by GitHub

View File

@@ -3655,17 +3655,18 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if republish {
var rpMsg []byte
if len(hdr) == 0 {
const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\n\r\n"
const htho = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\nNats-Msg-Size: %d\r\n\r\n"
const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\n\r\n"
const htho = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\nNats-Msg-Size: %d\r\n\r\n"
if !thdrsOnly {
hdr = []byte(fmt.Sprintf(ht, name, seq, tlseq))
hdr = []byte(fmt.Sprintf(ht, name, subject, seq, tlseq))
rpMsg = copyBytes(msg)
} else {
hdr = []byte(fmt.Sprintf(htho, name, seq, tlseq, len(msg)))
hdr = []byte(fmt.Sprintf(htho, name, subject, seq, tlseq, len(msg)))
}
} else {
// Slow path.
hdr = genHeader(hdr, JSStream, name)
hdr = genHeader(hdr, JSSubject, subject)
hdr = genHeader(hdr, JSSequence, strconv.FormatUint(seq, 10))
hdr = genHeader(hdr, JSLastSequence, strconv.FormatUint(tlseq, 10))
if !thdrsOnly {
@@ -3674,7 +3675,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
hdr = genHeader(hdr, JSMsgSize, strconv.Itoa(len(msg)))
}
}
mset.outq.send(newJSPubMsg(tsubj, subject, _EMPTY_, copyBytes(hdr), rpMsg, nil, seq))
mset.outq.send(newJSPubMsg(tsubj, _EMPTY_, _EMPTY_, copyBytes(hdr), rpMsg, nil, seq))
}
}