In addition to sealed we add in other stream perms to control purge, msg deletes and rollups.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-10-05 17:43:24 -07:00
parent 5ff751af99
commit aff0f62106
5 changed files with 183 additions and 18 deletions

View File

@@ -1078,5 +1078,25 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamPurgeFailedF",
"code": 500,
"error_code": 10110,
"description": "{err}",
"comment": "Generic stream purge failure error string",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamRollupFailedF",
"code": 500,
"error_code": 10111,
"description": "{err}",
"comment": "Generic stream rollup failure error string",
"help": "",
"url": "",
"deprecates": ""
}
]
]

View File

@@ -16,6 +16,7 @@ package server
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
@@ -2302,6 +2303,12 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, su
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if mset.cfg.DenyDelete {
resp.Error = NewJSStreamMsgDeleteFailedError(errors.New("message delete not permitted"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if s.JetStreamIsClustered() {
s.jsClusteredMsgDeleteRequest(ci, acc, mset, stream, subject, reply, &req, rmsg)
return
@@ -2544,6 +2551,12 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account,
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if mset.cfg.DenyPurge {
resp.Error = NewJSStreamPurgeFailedError(errors.New("stream purge not permitted"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if s.JetStreamIsClustered() {
s.jsClusteredStreamPurgeRequest(ci, acc, mset, stream, subject, reply, rmsg, purgeRequest)
return

View File

@@ -9008,6 +9008,21 @@ func TestJetStreamSeal(t *testing.T) {
t.Run("Clustered", func(t *testing.T) { testSeal(t, c.randomServer(), 3) })
}
func addStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo {
t.Helper()
req, err := json.Marshal(cfg)
require_NoError(t, err)
rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
require_NoError(t, err)
var resp JSApiStreamCreateResponse
err = json.Unmarshal(rmsg.Data, &resp)
require_NoError(t, err)
if resp.Error != nil {
t.Fatalf("Unexpected error: %+v", resp.Error)
}
return resp.StreamInfo
}
func TestJetStreamRollups(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()
@@ -9015,15 +9030,15 @@ func TestJetStreamRollups(t *testing.T) {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
cfg := &nats.StreamConfig{
Name: "SENSORS",
Subjects: []string{"sensor.*.temp"},
MaxMsgsPerSubject: 10,
Replicas: 2,
}
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
cfg := &StreamConfig{
Name: "SENSORS",
Storage: FileStorage,
Subjects: []string{"sensor.*.temp"},
MaxMsgsPer: 10,
RollupAllowed: true,
Replicas: 2,
}
addStream(t, nc, cfg)
var bt [16]byte
var le = binary.LittleEndian
@@ -9091,15 +9106,15 @@ func TestJetStreamRollupSubjectAndWatchers(t *testing.T) {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
cfg := &nats.StreamConfig{
Name: "KVW",
Subjects: []string{"kv.*"},
MaxMsgsPerSubject: 10,
Replicas: 2,
}
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
cfg := &StreamConfig{
Name: "KVW",
Storage: FileStorage,
Subjects: []string{"kv.*"},
MaxMsgsPer: 10,
RollupAllowed: true,
Replicas: 2,
}
addStream(t, nc, cfg)
sub, err := js.SubscribeSync("kv.*")
if err != nil {
@@ -9152,6 +9167,52 @@ func TestJetStreamRollupSubjectAndWatchers(t *testing.T) {
expectUpdate("age", "50", 6)
}
func TestJetStreamAppendOnly(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
cfg := &StreamConfig{
Name: "AUDIT",
Storage: MemoryStorage,
Subjects: []string{"foo"},
Replicas: 3,
DenyDelete: true,
DenyPurge: true,
}
si := addStream(t, nc, cfg)
if !si.Config.DenyDelete || !si.Config.DenyPurge {
t.Fatalf("Expected DenyDelete and DenyPurge to be set, got %+v", si.Config)
}
for i := 0; i < 10; i++ {
js.Publish("foo", []byte("ok"))
}
// Delete should not be allowed.
if err := js.DeleteMsg("AUDIT", 1); err == nil {
t.Fatalf("Expected an error for delete but got none")
}
if err := js.PurgeStream("AUDIT"); err == nil {
t.Fatalf("Expected an error for purge but got none")
}
cfg.DenyDelete = false
cfg.DenyPurge = false
req, err := json.Marshal(cfg)
require_NoError(t, err)
rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), req, time.Second)
require_NoError(t, err)
var resp JSApiStreamCreateResponse
err = json.Unmarshal(rmsg.Data, &resp)
require_NoError(t, err)
if resp.Error == nil {
t.Fatalf("Expected an error")
}
}
// Support functions
// Used to setup superclusters for tests.

View File

@@ -281,6 +281,9 @@ const (
// JSStreamNotMatchErr expected stream does not match
JSStreamNotMatchErr ErrorIdentifier = 10060
// JSStreamPurgeFailedF Generic stream purge failure error string ({err})
JSStreamPurgeFailedF ErrorIdentifier = 10110
// JSStreamReplicasNotSupportedErr replicas > 1 not supported in non-clustered mode
JSStreamReplicasNotSupportedErr ErrorIdentifier = 10074
@@ -290,6 +293,9 @@ const (
// JSStreamRestoreErrF restore failed: {err}
JSStreamRestoreErrF ErrorIdentifier = 10062
// JSStreamRollupFailedF Generic stream rollup failure error string ({err})
JSStreamRollupFailedF ErrorIdentifier = 10111
// JSStreamSealedErr invalid operation on sealed stream
JSStreamSealedErr ErrorIdentifier = 10109
@@ -424,9 +430,11 @@ var (
JSStreamNameExistErr: {Code: 400, ErrCode: 10058, Description: "stream name already in use"},
JSStreamNotFoundErr: {Code: 404, ErrCode: 10059, Description: "stream not found"},
JSStreamNotMatchErr: {Code: 400, ErrCode: 10060, Description: "expected stream does not match"},
JSStreamPurgeFailedF: {Code: 500, ErrCode: 10110, Description: "{err}"},
JSStreamReplicasNotSupportedErr: {Code: 500, ErrCode: 10074, Description: "replicas > 1 not supported in non-clustered mode"},
JSStreamReplicasNotUpdatableErr: {Code: 400, ErrCode: 10061, Description: "Replicas configuration can not be updated"},
JSStreamRestoreErrF: {Code: 500, ErrCode: 10062, Description: "restore failed: {err}"},
JSStreamRollupFailedF: {Code: 500, ErrCode: 10111, Description: "{err}"},
JSStreamSealedErr: {Code: 400, ErrCode: 10109, Description: "invalid operation on sealed stream"},
JSStreamSequenceNotMatchErr: {Code: 503, ErrCode: 10063, Description: "expected stream sequence does not match"},
JSStreamSnapshotErrF: {Code: 500, ErrCode: 10064, Description: "snapshot failed: {err}"},
@@ -1511,6 +1519,22 @@ func NewJSStreamNotMatchError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSStreamNotMatchErr]
}
// NewJSStreamPurgeFailedError creates a new JSStreamPurgeFailedF error: "{err}"
func NewJSStreamPurgeFailedError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}
e := ApiErrors[JSStreamPurgeFailedF]
args := e.toReplacerArgs([]interface{}{"{err}", err})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}
// NewJSStreamReplicasNotSupportedError creates a new JSStreamReplicasNotSupportedErr error: "replicas > 1 not supported in non-clustered mode"
func NewJSStreamReplicasNotSupportedError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
@@ -1547,6 +1571,22 @@ func NewJSStreamRestoreError(err error, opts ...ErrorOption) *ApiError {
}
}
// NewJSStreamRollupFailedError creates a new JSStreamRollupFailedF error: "{err}"
func NewJSStreamRollupFailedError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}
e := ApiErrors[JSStreamRollupFailedF]
args := e.toReplacerArgs([]interface{}{"{err}", err})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}
// NewJSStreamSealedError creates a new JSStreamSealedErr error: "invalid operation on sealed stream"
func NewJSStreamSealedError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)

View File

@@ -57,7 +57,18 @@ type StreamConfig struct {
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Sealed bool `json:"sealed,omitempty"`
// Optional qualifiers. These can not be modified after set to true.
// Sealed will seal a stream so no messages can get our or in.
Sealed bool `json:"sealed,omitempty"`
// DenyDelete will restrict the ability to delete messages.
DenyDelete bool `json:"deny_delete,omitempty"`
// DenyPurge will restrict the ability to purge messages.
DenyPurge bool `json:"deny_purge,omitempty"`
// RollupAllowed allows messages to be placed into the system and purge
// all older messages using a special msg header.
RollupAllowed bool `json:"rollup_hdrs,omitempty"`
}
// JSPubAckResponse is a formal response to a publish operation.
@@ -895,9 +906,18 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig) (*StreamConfig,
if cfg.Template != _EMPTY_ {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not be owned by a template"))
}
// Can not change from true to false.
if !cfg.Sealed && old.Sealed {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not unseal a sealed stream"))
}
// Can not change from true to false.
if !cfg.DenyDelete && old.DenyDelete {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not cancel deny message deletes"))
}
// Can not change from true to false.
if !cfg.DenyPurge && old.DenyPurge {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not cancel deny purge"))
}
// Do some adjustments for being sealed.
if cfg.Sealed {
@@ -2803,12 +2823,23 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
// Check for any rollups.
if rollup := getRollup(hdr); rollup != _EMPTY_ {
if !mset.cfg.RollupAllowed || mset.cfg.DenyPurge {
mset.mu.Unlock()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamRollupFailedError(errors.New("rollup not permitted"))
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
}
return errors.New("rollup not permitted")
}
switch rollup {
case JSMsgRollupSubject:
rollupSub = true
case JSMsgRollupAll:
rollupAll = true
default:
mset.mu.Unlock()
return fmt.Errorf("rollup value invalid: %q", rollup)
}
}