mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[FIXED] JetStream: reject stream update with changes to RePublish
The update was not rejected, yet the republish update was not taking place. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -11631,3 +11631,73 @@ func TestJetStreamClusterEncryptedDoubleSnapshotBug(t *testing.T) {
|
||||
_, err = js.Publish("foo", []byte("SNAP3"))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
func TestJetStreamClusterRePublishUpdateNotSupported(t *testing.T) {
|
||||
test := func(t *testing.T, s *Server, stream string, replicas int) {
|
||||
nc := natsConnect(t, s.ClientURL())
|
||||
defer nc.Close()
|
||||
|
||||
cfg := &StreamConfig{
|
||||
Name: stream,
|
||||
Storage: MemoryStorage,
|
||||
Replicas: replicas,
|
||||
}
|
||||
addStream(t, nc, cfg)
|
||||
|
||||
cfg.RePublish = &RePublish{
|
||||
Source: ">",
|
||||
Destination: "bar.>",
|
||||
}
|
||||
// We expect update to fail, do it manually:
|
||||
expectFailUpdate := func() {
|
||||
t.Helper()
|
||||
|
||||
req, err := json.Marshal(cfg)
|
||||
require_NoError(t, err)
|
||||
rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), req, time.Second)
|
||||
require_NoError(t, err)
|
||||
var resp JSApiStreamCreateResponse
|
||||
err = json.Unmarshal(rmsg.Data, &resp)
|
||||
require_NoError(t, err)
|
||||
if resp.Type != JSApiStreamUpdateResponseType {
|
||||
t.Fatalf("Invalid response type %s expected %s", resp.Type, JSApiStreamUpdateResponseType)
|
||||
}
|
||||
if !IsNatsErr(resp.Error, JSStreamInvalidConfigF) {
|
||||
t.Fatalf("Expected error regarding config error, got %+v", resp.Error)
|
||||
}
|
||||
}
|
||||
expectFailUpdate()
|
||||
|
||||
// Now try with a new stream with RePublish present and then try to change config
|
||||
cfg = &StreamConfig{
|
||||
Name: stream + "_2",
|
||||
Storage: MemoryStorage,
|
||||
Replicas: replicas,
|
||||
RePublish: &RePublish{
|
||||
Source: ">",
|
||||
Destination: "bar.>",
|
||||
},
|
||||
}
|
||||
addStream(t, nc, cfg)
|
||||
cfg.RePublish.HeadersOnly = true
|
||||
expectFailUpdate()
|
||||
|
||||
// One last test with existing first, then trying to remove
|
||||
cfg.Name = stream + "_3"
|
||||
addStream(t, nc, cfg)
|
||||
cfg.RePublish = nil
|
||||
expectFailUpdate()
|
||||
}
|
||||
|
||||
s := RunBasicJetStreamServer()
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
defer s.Shutdown()
|
||||
|
||||
c := createJetStreamClusterExplicit(t, "JSC", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
t.Run("Single", func(t *testing.T) { test(t, s, "single", 1) })
|
||||
t.Run("Clustered", func(t *testing.T) { test(t, c.randomServer(), "clustered", 3) })
|
||||
}
|
||||
|
||||
@@ -1272,6 +1272,10 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str
|
||||
if !reflect.DeepEqual(cfg.Mirror, old.Mirror) {
|
||||
return nil, NewJSStreamMirrorNotUpdatableError()
|
||||
}
|
||||
// Can't change RePublish
|
||||
if !reflect.DeepEqual(cfg.RePublish, old.RePublish) {
|
||||
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change RePublish"))
|
||||
}
|
||||
|
||||
// Do some adjustments for being sealed.
|
||||
if cfg.Sealed {
|
||||
|
||||
Reference in New Issue
Block a user