From b73afbdcb19b4e5d339f52c07af426bbb65553da Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 3 Aug 2022 16:29:53 -0600 Subject: [PATCH] [FIXED] JetStream: reject stream update with changes to RePublish The update was not rejected, yet the republish update was not taking place. Signed-off-by: Ivan Kozlovic --- server/jetstream_cluster_test.go | 70 ++++++++++++++++++++++++++++++++ server/stream.go | 4 ++ 2 files changed, 74 insertions(+) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 27e764f2..487f2751 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -11631,3 +11631,73 @@ func TestJetStreamClusterEncryptedDoubleSnapshotBug(t *testing.T) { _, err = js.Publish("foo", []byte("SNAP3")) require_NoError(t, err) } + +func TestJetStreamClusterRePublishUpdateNotSupported(t *testing.T) { + test := func(t *testing.T, s *Server, stream string, replicas int) { + nc := natsConnect(t, s.ClientURL()) + defer nc.Close() + + cfg := &StreamConfig{ + Name: stream, + Storage: MemoryStorage, + Replicas: replicas, + } + addStream(t, nc, cfg) + + cfg.RePublish = &RePublish{ + Source: ">", + Destination: "bar.>", + } + // We expect update to fail, do it manually: + expectFailUpdate := func() { + t.Helper() + + req, err := json.Marshal(cfg) + require_NoError(t, err) + rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), req, time.Second) + require_NoError(t, err) + var resp JSApiStreamCreateResponse + err = json.Unmarshal(rmsg.Data, &resp) + require_NoError(t, err) + if resp.Type != JSApiStreamUpdateResponseType { + t.Fatalf("Invalid response type %s expected %s", resp.Type, JSApiStreamUpdateResponseType) + } + if !IsNatsErr(resp.Error, JSStreamInvalidConfigF) { + t.Fatalf("Expected error regarding config error, got %+v", resp.Error) + } + } + expectFailUpdate() + + // Now try with a new stream with RePublish present and then try to change config + cfg = &StreamConfig{ + Name: stream + "_2", + Storage: MemoryStorage, + Replicas: replicas, + RePublish: &RePublish{ + Source: ">", + Destination: "bar.>", + }, + } + addStream(t, nc, cfg) + cfg.RePublish.HeadersOnly = true + expectFailUpdate() + + // One last test with existing first, then trying to remove + cfg.Name = stream + "_3" + addStream(t, nc, cfg) + cfg.RePublish = nil + expectFailUpdate() + } + + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + t.Run("Single", func(t *testing.T) { test(t, s, "single", 1) }) + t.Run("Clustered", func(t *testing.T) { test(t, c.randomServer(), "clustered", 3) }) +} diff --git a/server/stream.go b/server/stream.go index 88c1aad6..e96f16db 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1272,6 +1272,10 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str if !reflect.DeepEqual(cfg.Mirror, old.Mirror) { return nil, NewJSStreamMirrorNotUpdatableError() } + // Can't change RePublish + if !reflect.DeepEqual(cfg.RePublish, old.RePublish) { + return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change RePublish")) + } // Do some adjustments for being sealed. if cfg.Sealed {