From e73cddc2de8e7da670666602099eece01aaa524b Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Thu, 7 Oct 2021 14:01:56 +0200 Subject: [PATCH] error when a stream requesting rollups deny purge Signed-off-by: R.I.Pienaar --- server/jetstream_cluster_test.go | 30 ++++++++++++++++++++++++++++++ server/stream.go | 4 ++++ 2 files changed, 34 insertions(+) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index cb5eeca6..daab4f20 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -9023,6 +9023,36 @@ func addStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo { return resp.StreamInfo } +func TestJetStreamRollupsRequirePurge(t *testing.T) { + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + nc, _ := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + cfg := &StreamConfig{ + Name: "SENSORS", + Storage: FileStorage, + Subjects: []string{"sensor.*.temp"}, + MaxMsgsPer: 10, + AllowRollup: true, + DenyPurge: true, + Replicas: 2, + } + + j, err := json.Marshal(cfg) + require_NoError(t, err) + resp, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), j, time.Second) + require_NoError(t, err) + + var cr JSApiStreamCreateResponse + err = json.Unmarshal(resp.Data, &cr) + require_NoError(t, err) + if cr.Error == nil || cr.Error.Description != "roll-ups require the purge permission" { + t.Fatalf("unexpected error: %v", cr.Error) + } +} + func TestJetStreamRollups(t *testing.T) { c := createJetStreamClusterExplicit(t, "JSC", 3) defer c.shutdown() diff --git a/server/stream.go b/server/stream.go index 95456a30..c0e5a53d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -839,6 +839,10 @@ func checkStreamCfg(config *StreamConfig) (StreamConfig, error) { return StreamConfig{}, fmt.Errorf("duplicates window can not be larger then max age") } + if cfg.DenyPurge && cfg.AllowRollup { + return StreamConfig{}, fmt.Errorf("roll-ups require the purge permission") + } + if len(cfg.Subjects) == 0 { if cfg.Mirror == nil && len(cfg.Sources) == 0 { cfg.Subjects = append(cfg.Subjects, cfg.Name)