From 2d4c12c5dda440ffea83ae0ae17f35b8b1c62821 Mon Sep 17 00:00:00 2001 From: Andres Morey Date: Wed, 1 Mar 2023 22:08:52 +0300 Subject: [PATCH] added original message timestamp to republished message headers --- server/jetstream_cluster_2_test.go | 6 ++++++ server/stream.go | 10 ++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 250b2043..aa6cb178 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -5651,6 +5651,12 @@ func TestJetStreamClusterStreamRepublish(t *testing.T) { seq, err := strconv.Atoi(m.Header.Get(JSSequence)) require_NoError(t, err) require_True(t, seq == i) + // Make sure timestamp is correct + ts, err := time.Parse(time.RFC3339Nano, m.Header.Get(JSTimeStamp)) + require_NoError(t, err) + origMsg, err := js.GetMsg("RP", uint64(seq)) + require_NoError(t, err) + require_True(t, ts == origMsg.Time) // Make sure last sequence matches last seq we received on this subject. last, err := strconv.Atoi(m.Header.Get(JSLastSequence)) require_NoError(t, err) diff --git a/server/stream.go b/server/stream.go index 12cb7fa4..30889e7c 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4056,21 +4056,23 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // Check for republish. if republish { + tsStr := time.Unix(0, ts).UTC().Format(time.RFC3339Nano) var rpMsg []byte if len(hdr) == 0 { - const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\n\r\n" - const htho = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\nNats-Msg-Size: %d\r\n\r\n" + const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Time-Stamp: %s\r\nNats-Last-Sequence: %d\r\n\r\n" + const htho = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Time-Stamp: %s\r\nNats-Last-Sequence: %d\r\nNats-Msg-Size: %d\r\n\r\n" if !thdrsOnly { - hdr = []byte(fmt.Sprintf(ht, name, subject, seq, tlseq)) + hdr = []byte(fmt.Sprintf(ht, name, subject, seq, tsStr, tlseq)) rpMsg = copyBytes(msg) } else { - hdr = []byte(fmt.Sprintf(htho, name, subject, seq, tlseq, len(msg))) + hdr = []byte(fmt.Sprintf(htho, name, subject, seq, tsStr, tlseq, len(msg))) } } else { // Slow path. hdr = genHeader(hdr, JSStream, name) hdr = genHeader(hdr, JSSubject, subject) hdr = genHeader(hdr, JSSequence, strconv.FormatUint(seq, 10)) + hdr = genHeader(hdr, JSTimeStamp, tsStr) hdr = genHeader(hdr, JSLastSequence, strconv.FormatUint(tlseq, 10)) if !thdrsOnly { rpMsg = copyBytes(msg)