From e08f6d863d7c6294bc2564545132ba5064f08e36 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 30 May 2022 11:21:51 -0700 Subject: [PATCH] Allow for republish to be headers only Signed-off-by: Derek Collison --- server/jetstream_cluster_test.go | 20 +----------- server/jetstream_helpers_test.go | 19 ++++++++++++ server/jetstream_test.go | 53 ++++++++++++++++++++++++++++++-- server/stream.go | 45 ++++++++++++++++++++++----- 4 files changed, 108 insertions(+), 29 deletions(-) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 1b49e0b5..5fb241fc 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -7993,24 +7993,6 @@ func TestJetStreamClusterSeal(t *testing.T) { t.Run("Clustered", func(t *testing.T) { testSeal(t, c.randomServer(), 3) }) } -func addStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo { - t.Helper() - req, err := json.Marshal(cfg) - require_NoError(t, err) - rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) - require_NoError(t, err) - var resp JSApiStreamCreateResponse - err = json.Unmarshal(rmsg.Data, &resp) - require_NoError(t, err) - if resp.Type != JSApiStreamCreateResponseType { - t.Fatalf("Invalid response type %s expected %s", resp.Type, JSApiStreamCreateResponseType) - } - if resp.Error != nil { - t.Fatalf("Unexpected error: %+v", resp.Error) - } - return resp.StreamInfo -} - // Issue #2568 func TestJetStreamClusteredStreamCreateIdempotent(t *testing.T) { c := createJetStreamClusterExplicit(t, "JSC", 3) @@ -10681,7 +10663,7 @@ func TestJetStreamClusterStreamRepublish(t *testing.T) { Storage: MemoryStorage, Subjects: []string{"foo", "bar", "baz"}, Replicas: 1, - RePublish: &SubjectMapping{ + RePublish: &RePublish{ Source: ">", Destination: "RP.>", }, diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index b2d6d094..0a1d28ec 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -17,6 +17,7 @@ package server import ( + "encoding/json" "fmt" "io/ioutil" "math/rand" @@ -1326,3 +1327,21 @@ func (c *cluster) stableTotalSubs() (total int) { return nsubs } + +func addStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo { + t.Helper() + req, err := json.Marshal(cfg) + require_NoError(t, err) + rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + require_NoError(t, err) + var resp JSApiStreamCreateResponse + err = json.Unmarshal(rmsg.Data, &resp) + require_NoError(t, err) + if resp.Type != JSApiStreamCreateResponseType { + t.Fatalf("Invalid response type %s expected %s", resp.Type, JSApiStreamCreateResponseType) + } + if resp.Error != nil { + t.Fatalf("Unexpected error: %+v", resp.Error) + } + return resp.StreamInfo +} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 1e692c30..9f114cc2 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -17523,25 +17523,72 @@ func TestJetStreamStreamRepublishCycle(t *testing.T) { } } - cfg.RePublish = &SubjectMapping{ + cfg.RePublish = &RePublish{ Source: "foo.>", Destination: "foo.>", } expectFail() - cfg.RePublish = &SubjectMapping{ + cfg.RePublish = &RePublish{ Source: "bar.bar", Destination: "foo.bar", } expectFail() - cfg.RePublish = &SubjectMapping{ + cfg.RePublish = &RePublish{ Source: "baz", Destination: "bar.bar", } expectFail() } +func TestJetStreamStreamRepublishHeadersOnly(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + // Do by hand for now. + cfg := &StreamConfig{ + Name: "RPC", + Storage: MemoryStorage, + Subjects: []string{"foo", "bar", "baz"}, + RePublish: &RePublish{ + Destination: "RP.>", + HeadersOnly: true, + }, + } + addStream(t, nc, cfg) + + sub, err := nc.SubscribeSync("RP.>") + require_NoError(t, err) + + msg, toSend := bytes.Repeat([]byte("Z"), 512), 100 + for i := 0; i < toSend; i++ { + js.PublishAsync("foo", msg) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + checkSubsPending(t, sub, toSend) + m, err := sub.NextMsg(time.Second) + require_NoError(t, err) + + if len(m.Data) > 0 { + t.Fatalf("Expected no msg just headers, but got %d bytes", len(m.Data)) + } + if sz := m.Header.Get(JSMsgSize); sz != "512" { + t.Fatalf("Expected msg size hdr, got %q", sz) + } +} + func TestJetStreamConsumerDeliverNewNotConsumingBeforeRestart(t *testing.T) { s := RunBasicJetStreamServer() if config := s.JetStreamConfig(); config != nil { diff --git a/server/stream.go b/server/stream.go index cdd6f105..1352c7c3 100644 --- a/server/stream.go +++ b/server/stream.go @@ -59,7 +59,7 @@ type StreamConfig struct { Sources []*StreamSource `json:"sources,omitempty"` // Allow republish of the message after being sequenced and stored. - RePublish *SubjectMapping `json:"republish,omitempty"` + RePublish *RePublish `json:"republish,omitempty"` // Optional qualifiers. These can not be modified after set to true. @@ -74,10 +74,11 @@ type StreamConfig struct { AllowRollup bool `json:"allow_rollup_hdrs"` } -// SubjectMapping allows a source subject to be mapped to a destination subject for republishing. -type SubjectMapping struct { +// RePublish is for republishing messages once committed to a stream. +type RePublish struct { Source string `json:"src,omitempty"` Destination string `json:"dest"` + HeadersOnly bool `json:"headers_only,omitempty"` } // JSPubAckResponse is a formal response to a publish operation. @@ -417,6 +418,10 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt // Check for RePublish. if cfg.RePublish != nil { + // Empty same as all. + if cfg.RePublish.Source == _EMPTY_ { + cfg.RePublish.Source = fwcs + } tr, err := newTransform(cfg.RePublish.Source, cfg.RePublish.Destination) if err != nil { jsa.mu.Unlock() @@ -1144,6 +1149,10 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi // Check to make sure source is a valid subset of the subjects we have. // Also make sure it does not form a cycle. var srcValid bool + // Empty same as all. + if cfg.RePublish.Source == _EMPTY_ { + cfg.RePublish.Source = fwcs + } for _, subj := range cfg.Subjects { if SubjectsCollide(cfg.RePublish.Source, subj) { srcValid = true @@ -3475,8 +3484,12 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // Republish state if needed. var tsubj string var tlseq uint64 + var thdrsOnly bool if mset.tr != nil { tsubj, _ = mset.tr.TransformSubject(subject) + if mset.cfg.RePublish != nil { + thdrsOnly = mset.cfg.RePublish.HeadersOnly + } } republish := tsubj != _EMPTY_ @@ -3562,10 +3575,28 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } // Check for republish. if republish { - hdr = genHeader(hdr, JSStream, name) - hdr = genHeader(hdr, JSSequence, strconv.FormatUint(seq, 10)) - hdr = genHeader(hdr, JSLastSequence, strconv.FormatUint(tlseq, 10)) - mset.outq.send(newJSPubMsg(tsubj, subject, _EMPTY_, copyBytes(hdr), copyBytes(msg), nil, seq)) + var rpMsg []byte + if len(hdr) == 0 { + const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\n\r\n" + const htho = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\nNats-Msg-Size: %d\r\n\r\n" + if !thdrsOnly { + hdr = []byte(fmt.Sprintf(ht, name, seq, tlseq)) + rpMsg = copyBytes(msg) + } else { + hdr = []byte(fmt.Sprintf(htho, name, seq, tlseq, len(msg))) + } + } else { + // Slow path. + hdr = genHeader(hdr, JSStream, name) + hdr = genHeader(hdr, JSSequence, strconv.FormatUint(seq, 10)) + hdr = genHeader(hdr, JSLastSequence, strconv.FormatUint(tlseq, 10)) + if !thdrsOnly { + rpMsg = copyBytes(msg) + } else { + hdr = genHeader(hdr, JSMsgSize, strconv.Itoa(len(msg))) + } + } + mset.outq.send(newJSPubMsg(tsubj, subject, _EMPTY_, copyBytes(hdr), rpMsg, nil, seq)) } }