From 5fc2cc5754be0fe64d0a82a757597a8b55f62373 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 29 Sep 2021 15:25:38 -0700 Subject: [PATCH] Allow streams to be sealed through a stream update. Sealed streams can not accept new messages, allow you to delete or purge messages, or have messages expire due to age. Sealed stream can not be unsealed through an update. Signed-off-by: Derek Collison --- server/errors.json | 10 + server/jetstream_api.go | 13 +- server/jetstream_cluster.go | 5 + server/jetstream_cluster_test.go | 330 ++++++++++++++++++--------- server/jetstream_errors_generated.go | 14 ++ server/stream.go | 30 ++- 6 files changed, 286 insertions(+), 116 deletions(-) diff --git a/server/errors.json b/server/errors.json index 08f3a369..dcd1c1ca 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1068,5 +1068,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSStreamSealedErr", + "code": 400, + "error_code": 10109, + "description": "invalid operation on sealed stream", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] \ No newline at end of file diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 712db39d..517aec2e 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1133,6 +1133,7 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, a *Account, s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + streamName := streamNameFromSubject(subject) if streamName != cfg.Name { resp.Error = NewJSStreamMismatchError() @@ -2296,7 +2297,11 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, su s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } - + if mset.cfg.Sealed { + resp.Error = NewJSStreamSealedError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } if s.JetStreamIsClustered() { 
s.jsClusteredMsgDeleteRequest(ci, acc, mset, stream, subject, reply, &req, rmsg) return @@ -2533,7 +2538,11 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } - + if mset.cfg.Sealed { + resp.Error = NewJSStreamSealedError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } if s.JetStreamIsClustered() { s.jsClusteredStreamPurgeRequest(ci, acc, mset, stream, subject, reply, rmsg, purgeRequest) return diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index f677bc6b..3146c52a 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3544,6 +3544,11 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, resp.Error = NewJSStreamNameExistError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return + + } else if cfg.Sealed { + resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed")) + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return } // Check for subject collisions here. 
diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 2e2fc0ca..2b9cfa84 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -8789,6 +8789,223 @@ func TestJetStreamClusterMixedModeColdStartPrune(t *testing.T) { checkClusterSize(c.randomNonLeader()) } +func TestJetStreamClusterMirrorAndSourceCrossNonNeighboringDomain(t *testing.T) { + storeDir1 := createDir(t, JetStreamStoreDir) + conf1 := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain1, store_dir: "%s"} + accounts { + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, + } + system_account = SYS + no_auth_user: a1 + leafnodes: { + listen: localhost:-1 + } + `, storeDir1))) + s1, _ := RunServerWithConfig(conf1) + defer s1.Shutdown() + storeDir2 := createDir(t, JetStreamStoreDir) + conf2 := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain2, store_dir: "%s"} + accounts { + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, + } + system_account = SYS + no_auth_user: a1 + leafnodes:{ + remotes:[{ url:nats://a1:a1@localhost:%d, account: A}, + { url:nats://s1:s1@localhost:%d, account: SYS}] + } + `, storeDir2, s1.opts.LeafNode.Port, s1.opts.LeafNode.Port))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + storeDir3 := createDir(t, JetStreamStoreDir) + conf3 := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain3, store_dir: "%s"} + accounts { + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, + } + system_account = SYS + no_auth_user: a1 + leafnodes:{ + remotes:[{ url:nats://a1:a1@localhost:%d, account: A}, + { url:nats://s1:s1@localhost:%d, account: 
SYS}] + } + `, storeDir3, s1.opts.LeafNode.Port, s1.opts.LeafNode.Port))) + s3, _ := RunServerWithConfig(conf3) + defer s3.Shutdown() + + checkLeafNodeConnectedCount(t, s1, 4) + checkLeafNodeConnectedCount(t, s2, 2) + checkLeafNodeConnectedCount(t, s3, 2) + + c2 := natsConnect(t, s2.ClientURL()) + defer c2.Close() + js2, err := c2.JetStream(nats.Domain("domain2")) + require_NoError(t, err) + ai2, err := js2.AccountInfo() + require_NoError(t, err) + require_Equal(t, ai2.Domain, "domain2") + _, err = js2.AddStream(&nats.StreamConfig{ + Name: "disk", + Storage: nats.FileStorage, + Subjects: []string{"disk"}, + }) + require_NoError(t, err) + _, err = js2.Publish("disk", nil) + require_NoError(t, err) + si, err := js2.StreamInfo("disk") + require_NoError(t, err) + require_True(t, si.State.Msgs == 1) + + c3 := natsConnect(t, s3.ClientURL()) + defer c3.Close() + js3, err := c3.JetStream(nats.Domain("domain3")) + require_NoError(t, err) + ai3, err := js3.AccountInfo() + require_NoError(t, err) + require_Equal(t, ai3.Domain, "domain3") + + _, err = js3.AddStream(&nats.StreamConfig{ + Name: "stream-mirror", + Storage: nats.FileStorage, + Mirror: &nats.StreamSource{ + Name: "disk", + External: &nats.ExternalStream{APIPrefix: "$JS.domain2.API"}, + }, + }) + require_NoError(t, err) + + _, err = js3.AddStream(&nats.StreamConfig{ + Name: "stream-source", + Storage: nats.FileStorage, + Sources: []*nats.StreamSource{{ + Name: "disk", + External: &nats.ExternalStream{APIPrefix: "$JS.domain2.API"}, + }}, + }) + require_NoError(t, err) + checkFor(t, 10*time.Second, 250*time.Millisecond, func() error { + if si, _ := js3.StreamInfo("stream-mirror"); si.State.Msgs != 1 { + return fmt.Errorf("Expected 1 msg for mirror, got %d", si.State.Msgs) + } + if si, _ := js3.StreamInfo("stream-source"); si.State.Msgs != 1 { + return fmt.Errorf("Expected 1 msg for source, got %d", si.State.Msgs) + } + return nil + }) +} + +func TestJetStreamSeal(t *testing.T) { + s := RunBasicJetStreamServer() + 
defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + // Need to be done by hand until makes its way to Go client. + createStream := func(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *JSApiStreamCreateResponse { + t.Helper() + req, err := json.Marshal(cfg) + require_NoError(t, err) + resp, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + require_NoError(t, err) + var scResp JSApiStreamCreateResponse + err = json.Unmarshal(resp.Data, &scResp) + require_NoError(t, err) + return &scResp + } + + updateStream := func(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *JSApiStreamUpdateResponse { + t.Helper() + req, err := json.Marshal(cfg) + require_NoError(t, err) + resp, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), req, time.Second) + require_NoError(t, err) + var scResp JSApiStreamUpdateResponse + err = json.Unmarshal(resp.Data, &scResp) + require_NoError(t, err) + return &scResp + } + + testSeal := func(t *testing.T, s *Server, replicas int) { + nc, js := jsClientConnect(t, s) + defer nc.Close() + // Should not be able to create a stream that starts sealed. + scr := createStream(t, nc, &StreamConfig{Name: "SEALED", Replicas: replicas, Storage: MemoryStorage, Sealed: true}) + if scr.Error == nil { + t.Fatalf("Expected an error but got none") + } + // Create our stream. + scr = createStream(t, nc, &StreamConfig{Name: "SEALED", Replicas: replicas, MaxAge: time.Minute, Storage: MemoryStorage}) + if scr.Error != nil { + t.Fatalf("Unexpected error: %v", scr.Error) + } + for i := 0; i < 100; i++ { + js.Publish("SEALED", []byte("OK")) + } + // Update to sealed. 
+ sur := updateStream(t, nc, &StreamConfig{Name: "SEALED", Replicas: replicas, MaxAge: time.Minute, Storage: MemoryStorage, Sealed: true}) + if sur.Error != nil { + t.Fatalf("Unexpected error: %v", sur.Error) + } + + // Grab stream info and make sure it's reflected as sealed. + resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "SEALED"), nil, time.Second) + require_NoError(t, err) + var sir JSApiStreamInfoResponse + err = json.Unmarshal(resp.Data, &sir) + require_NoError(t, err) + if sir.Error != nil { + t.Fatalf("Unexpected error: %v", sir.Error) + } + si := sir.StreamInfo + if !si.Config.Sealed { + t.Fatalf("Expected the stream to be marked sealed, got %+v\n", si.Config) + } + // Make sure we also updated any max age and moved to discard new. + if si.Config.MaxAge != 0 { + t.Fatalf("Expected MaxAge to be cleared, got %v", si.Config.MaxAge) + } + if si.Config.Discard != DiscardNew { + t.Fatalf("Expected DiscardPolicy to be set to new, got %v", si.Config.Discard) + } + + // Sealing is not reversible, so make sure we get an error trying to undo. + sur = updateStream(t, nc, &StreamConfig{Name: "SEALED", Replicas: replicas, Storage: MemoryStorage, Sealed: false}) + if sur.Error == nil { + t.Fatalf("Expected an error but got none") + } + + // Now test operations like publish a new msg, delete, purge etc all fail. + if _, err := js.Publish("SEALED", []byte("OK")); err == nil { + t.Fatalf("Expected a publish to fail") + } + if err := js.DeleteMsg("SEALED", 1); err == nil { + t.Fatalf("Expected a delete to fail") + } + if err := js.PurgeStream("SEALED"); err == nil { + t.Fatalf("Expected a purge to fail") + } + if err := js.DeleteStream("SEALED"); err != nil { + t.Fatalf("Expected a delete to succeed, got %v", err) + } + } + + t.Run("Single", func(t *testing.T) { testSeal(t, s, 1) }) + t.Run("Clustered", func(t *testing.T) { testSeal(t, c.randomServer(), 3) }) +} + + // Support functions // Used to setup superclusters for tests. 
@@ -9825,116 +10042,3 @@ func (c *cluster) stableTotalSubs() (total int) { return nsubs } - -func TestJetStreamClusterMirrorAndSourceCrossNonNeighboringDomain(t *testing.T) { - storeDir1 := createDir(t, JetStreamStoreDir) - conf1 := createConfFile(t, []byte(fmt.Sprintf(` - listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain1, store_dir: "%s"} - accounts { - A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, - SYS:{ users:[ {user:s1,password:s1}]}, - } - system_account = SYS - no_auth_user: a1 - leafnodes: { - listen: localhost:-1 - } - `, storeDir1))) - s1, _ := RunServerWithConfig(conf1) - defer s1.Shutdown() - storeDir2 := createDir(t, JetStreamStoreDir) - conf2 := createConfFile(t, []byte(fmt.Sprintf(` - listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain2, store_dir: "%s"} - accounts { - A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, - SYS:{ users:[ {user:s1,password:s1}]}, - } - system_account = SYS - no_auth_user: a1 - leafnodes:{ - remotes:[{ url:nats://a1:a1@localhost:%d, account: A}, - { url:nats://s1:s1@localhost:%d, account: SYS}] - } - `, storeDir2, s1.opts.LeafNode.Port, s1.opts.LeafNode.Port))) - s2, _ := RunServerWithConfig(conf2) - defer s2.Shutdown() - storeDir3 := createDir(t, JetStreamStoreDir) - conf3 := createConfFile(t, []byte(fmt.Sprintf(` - listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain3, store_dir: "%s"} - accounts { - A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, - SYS:{ users:[ {user:s1,password:s1}]}, - } - system_account = SYS - no_auth_user: a1 - leafnodes:{ - remotes:[{ url:nats://a1:a1@localhost:%d, account: A}, - { url:nats://s1:s1@localhost:%d, account: SYS}] - } - `, storeDir3, s1.opts.LeafNode.Port, s1.opts.LeafNode.Port))) - s3, _ := RunServerWithConfig(conf3) - defer s3.Shutdown() - - checkLeafNodeConnectedCount(t, s1, 4) - checkLeafNodeConnectedCount(t, s2, 2) - 
checkLeafNodeConnectedCount(t, s3, 2) - - c2 := natsConnect(t, s2.ClientURL()) - defer c2.Close() - js2, err := c2.JetStream(nats.Domain("domain2")) - require_NoError(t, err) - ai2, err := js2.AccountInfo() - require_NoError(t, err) - require_Equal(t, ai2.Domain, "domain2") - _, err = js2.AddStream(&nats.StreamConfig{ - Name: "disk", - Storage: nats.FileStorage, - Subjects: []string{"disk"}, - }) - require_NoError(t, err) - _, err = js2.Publish("disk", nil) - require_NoError(t, err) - si, err := js2.StreamInfo("disk") - require_NoError(t, err) - require_True(t, si.State.Msgs == 1) - - c3 := natsConnect(t, s3.ClientURL()) - defer c3.Close() - js3, err := c3.JetStream(nats.Domain("domain3")) - require_NoError(t, err) - ai3, err := js3.AccountInfo() - require_NoError(t, err) - require_Equal(t, ai3.Domain, "domain3") - - _, err = js3.AddStream(&nats.StreamConfig{ - Name: "stream-mirror", - Storage: nats.FileStorage, - Mirror: &nats.StreamSource{ - Name: "disk", - External: &nats.ExternalStream{APIPrefix: "$JS.domain2.API"}, - }, - }) - require_NoError(t, err) - - _, err = js3.AddStream(&nats.StreamConfig{ - Name: "stream-source", - Storage: nats.FileStorage, - Sources: []*nats.StreamSource{{ - Name: "disk", - External: &nats.ExternalStream{APIPrefix: "$JS.domain2.API"}, - }}, - }) - require_NoError(t, err) - checkFor(t, 10*time.Second, 250*time.Millisecond, func() error { - if si, _ := js3.StreamInfo("stream-mirror"); si.State.Msgs != 1 { - return fmt.Errorf("Expected 1 msg for mirror, got %d", si.State.Msgs) - } - if si, _ := js3.StreamInfo("stream-source"); si.State.Msgs != 1 { - return fmt.Errorf("Expected 1 msg for source, got %d", si.State.Msgs) - } - return nil - }) -} diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 6ae5db1f..c4315ef4 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -290,6 +290,9 @@ const ( // JSStreamRestoreErrF restore failed: {err} JSStreamRestoreErrF 
ErrorIdentifier = 10062 + // JSStreamSealedErr invalid operation on sealed stream + JSStreamSealedErr ErrorIdentifier = 10109 + // JSStreamSequenceNotMatchErr expected stream sequence does not match JSStreamSequenceNotMatchErr ErrorIdentifier = 10063 @@ -424,6 +427,7 @@ var ( JSStreamReplicasNotSupportedErr: {Code: 500, ErrCode: 10074, Description: "replicas > 1 not supported in non-clustered mode"}, JSStreamReplicasNotUpdatableErr: {Code: 400, ErrCode: 10061, Description: "Replicas configuration can not be updated"}, JSStreamRestoreErrF: {Code: 500, ErrCode: 10062, Description: "restore failed: {err}"}, + JSStreamSealedErr: {Code: 400, ErrCode: 10109, Description: "invalid operation on sealed stream"}, JSStreamSequenceNotMatchErr: {Code: 503, ErrCode: 10063, Description: "expected stream sequence does not match"}, JSStreamSnapshotErrF: {Code: 500, ErrCode: 10064, Description: "snapshot failed: {err}"}, JSStreamStoreFailedF: {Code: 503, ErrCode: 10077, Description: "{err}"}, @@ -1543,6 +1547,16 @@ func NewJSStreamRestoreError(err error, opts ...ErrorOption) *ApiError { } } +// NewJSStreamSealedError creates a new JSStreamSealedErr error: "invalid operation on sealed stream" +func NewJSStreamSealedError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSStreamSealedErr] +} + // NewJSStreamSequenceNotMatchError creates a new JSStreamSequenceNotMatchErr error: "expected stream sequence does not match" func NewJSStreamSequenceNotMatchError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/stream.go b/server/stream.go index ae4c9f72..5f5bc2ea 100644 --- a/server/stream.go +++ b/server/stream.go @@ -57,6 +57,7 @@ type StreamConfig struct { Placement *Placement `json:"placement,omitempty"` Mirror *StreamSource `json:"mirror,omitempty"` Sources []*StreamSource `json:"sources,omitempty"` + Sealed bool `json:"sealed,omitempty"` } // JSPubAckResponse is a 
formal response to a publish operation. @@ -263,6 +264,11 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt return nil, NewJSStreamInvalidConfigError(err, Unless(err)) } + // Can't create a stream with a sealed state. + if cfg.Sealed { + return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed")) + } + singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode() if singleServerMode && cfg.Replicas > 1 { return nil, ApiErrors[JSStreamReplicasNotSupportedErr] @@ -882,6 +888,15 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig) (*StreamConfig, if cfg.Template != _EMPTY_ { return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not be owned by a template")) } + if !cfg.Sealed && old.Sealed { + return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not unseal a sealed stream")) + } + + // Do some adjustments for being sealed. + if cfg.Sealed { + cfg.MaxAge = 0 + cfg.Discard = DiscardNew + } // Check limits. if err := jsa.checkLimits(&cfg); err != nil { @@ -984,6 +999,9 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err mset.mu.Unlock() return 0, errors.New("invalid stream") } + if mset.cfg.Sealed { + return 0, errors.New("sealed stream") + } var _obs [4]*consumer obs := _obs[:0] for _, o := range mset.consumers { @@ -2571,7 +2589,7 @@ func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) { // processInboundJetStreamMsg handles processing messages bound for a stream. func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { mset.mu.RLock() - isLeader, isClustered := mset.isLeader(), mset.node != nil + isLeader, isClustered, isSealed := mset.isLeader(), mset.node != nil, mset.cfg.Sealed mset.mu.RUnlock() // If we are not the leader just ignore. 
@@ -2579,6 +2597,16 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Ac return } + if isSealed { + var resp = JSPubAckResponse{ + PubAck: &PubAck{Stream: mset.name()}, + Error: NewJSStreamSealedError(), + } + b, _ := json.Marshal(resp) + mset.outq.sendMsg(reply, b) + return + } + hdr, msg := c.msgParts(rmsg) // If we are not receiving directly from a client we should move this this Go routine.