diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 36361f18..71e36125 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -6361,24 +6361,21 @@ func TestJetStreamClusterEncryptedDoubleSnapshotBug(t *testing.T) { require_NoError(t, err) } -func TestJetStreamClusterRePublishUpdateNotSupported(t *testing.T) { +func TestJetStreamClusterRePublishUpdateSupported(t *testing.T) { test := func(t *testing.T, s *Server, stream string, replicas int) { - nc := natsConnect(t, s.ClientURL()) + nc, js := jsClientConnect(t, s) defer nc.Close() - cfg := &StreamConfig{ + cfg := &nats.StreamConfig{ Name: stream, - Storage: MemoryStorage, + Storage: nats.MemoryStorage, Replicas: replicas, + Subjects: []string{"foo.>"}, } - addStream(t, nc, cfg) + _, err := js.AddStream(cfg) + require_NoError(t, err) - cfg.RePublish = &RePublish{ - Source: ">", - Destination: "bar.>", - } - // We expect update to fail, do it manually: - expectFailUpdate := func() { + expectUpdate := func() { t.Helper() req, err := json.Marshal(cfg) @@ -6391,41 +6388,77 @@ func TestJetStreamClusterRePublishUpdateNotSupported(t *testing.T) { if resp.Type != JSApiStreamUpdateResponseType { t.Fatalf("Invalid response type %s expected %s", resp.Type, JSApiStreamUpdateResponseType) } - if !IsNatsErr(resp.Error, JSStreamInvalidConfigF) { - t.Fatalf("Expected error regarding config error, got %+v", resp.Error) + if IsNatsErr(resp.Error, JSStreamInvalidConfigF) { + t.Fatalf("Expected no error regarding config error, got %+v", resp.Error) } } - expectFailUpdate() - // Now try with a new stream with RePublish present and then try to change config - cfg = &StreamConfig{ - Name: stream + "_2", - Storage: MemoryStorage, - Replicas: replicas, - RePublish: &RePublish{ - Source: ">", - Destination: "bar.>", - }, + expectRepublished := func(expectedRepub bool) { + t.Helper() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + // Create a subscriber for foo.> so that we can see + // our published message being echoed back to us. + sf, err := nc.SubscribeSync("foo.>") + require_NoError(t, err) + defer sf.Unsubscribe() + + // Create a subscriber for bar.> so that we can see + // any potentially republished messages. + sb, err := nc.SubscribeSync("bar.>") + require_NoError(t, err) + defer sf.Unsubscribe() + + // Publish a message, it will hit the foo.> stream and + // may potentially be republished to the bar.> stream. + _, err = js.Publish("foo."+stream, []byte("HELLO!")) + require_NoError(t, err) + + // Wait for a little while so that we have enough time + // to determine whether it's going to arrive on one or + // both streams. + checkSubsPending(t, sf, 1) + if expectedRepub { + checkSubsPending(t, sb, 1) + } else { + checkSubsPending(t, sb, 0) + } } - addStream(t, nc, cfg) - cfg.RePublish.HeadersOnly = true - expectFailUpdate() - // One last test with existing first, then trying to remove - cfg.Name = stream + "_3" - addStream(t, nc, cfg) + // At this point there's no republish config, so we should + // only receive our published message on foo.>. + expectRepublished(false) + + // Add a republish config so that everything on foo.> also + // gets republished to bar.>. + cfg.RePublish = &nats.RePublish{ + Source: "foo.>", + Destination: "bar.>", + } + expectUpdate() + expectRepublished(true) + + // Now take the republish config away again, so we should go + // back to only getting them on foo.>. cfg.RePublish = nil - expectFailUpdate() + expectUpdate() + expectRepublished(false) } - s := RunBasicJetStreamServer(t) - defer s.Shutdown() + t.Run("Single", func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() - c := createJetStreamClusterExplicit(t, "JSC", 3) - defer c.shutdown() + test(t, s, "single", 1) + }) + t.Run("Clustered", func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() - t.Run("Single", func(t *testing.T) { test(t, s, "single", 1) }) - t.Run("Clustered", func(t *testing.T) { test(t, c.randomServer(), "clustered", 3) }) + test(t, c.randomNonLeader(), "clustered", 3) + }) } func TestJetStreamClusterDirectGetFromLeafnode(t *testing.T) { diff --git a/server/stream.go b/server/stream.go index b62c38fb..7c95cc1d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1348,10 +1348,6 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str if !reflect.DeepEqual(cfg.Mirror, old.Mirror) { return nil, NewJSStreamMirrorNotUpdatableError() } - // Can't change RePublish - if !reflect.DeepEqual(cfg.RePublish, old.RePublish) { - return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change RePublish")) - } // Check on new discard new per subject. if cfg.DiscardNewPer { @@ -1555,6 +1551,23 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) } } + // Check for changes to RePublish. + if cfg.RePublish != nil { + // Empty same as all. + if cfg.RePublish.Source == _EMPTY_ { + cfg.RePublish.Source = fwcs + } + tr, err := newTransform(cfg.RePublish.Source, cfg.RePublish.Destination) + if err != nil { + jsa.mu.Unlock() + return fmt.Errorf("stream configuration for republish not valid") + } + // Assign our transform for republishing. + mset.tr = tr + } else { + mset.tr = nil + } + js := mset.js if targetTier := tierName(cfg); mset.tier != targetTier {