diff --git a/server/errors.json b/server/errors.json index 08f3a369..dcd1c1ca 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1068,5 +1068,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSStreamSealedErr", + "code": 400, + "error_code": 10109, + "description": "invalid operation on sealed stream", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] \ No newline at end of file diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 712db39d..517aec2e 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1133,6 +1133,7 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, a *Account, s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + streamName := streamNameFromSubject(subject) if streamName != cfg.Name { resp.Error = NewJSStreamMismatchError() @@ -2296,7 +2297,11 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, su s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } - + if mset.cfg.Sealed { + resp.Error = NewJSStreamSealedError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } if s.JetStreamIsClustered() { s.jsClusteredMsgDeleteRequest(ci, acc, mset, stream, subject, reply, &req, rmsg) return @@ -2533,7 +2538,11 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } - + if mset.cfg.Sealed { + resp.Error = NewJSStreamSealedError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } if s.JetStreamIsClustered() { s.jsClusteredStreamPurgeRequest(ci, acc, mset, stream, subject, reply, rmsg, purgeRequest) return diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index f677bc6b..3146c52a 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3544,6 +3544,11 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, resp.Error = NewJSStreamNameExistError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return + + } else if cfg.Sealed { + resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed")) + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return } // Check for subject collisions here. diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 2e2fc0ca..2b9cfa84 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -8789,6 +8789,223 @@ func TestJetStreamClusterMixedModeColdStartPrune(t *testing.T) { checkClusterSize(c.randomNonLeader()) } +func TestJetStreamClusterMirrorAndSourceCrossNonNeighboringDomain(t *testing.T) { + storeDir1 := createDir(t, JetStreamStoreDir) + conf1 := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain1, store_dir: "%s"} + accounts { + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, + } + system_account = SYS + no_auth_user: a1 + leafnodes: { + listen: localhost:-1 + } + `, storeDir1))) + s1, _ := RunServerWithConfig(conf1) + defer s1.Shutdown() + storeDir2 := createDir(t, JetStreamStoreDir) + conf2 := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain2, store_dir: "%s"} + accounts { + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, + } + system_account = SYS + no_auth_user: a1 + leafnodes:{ + remotes:[{ url:nats://a1:a1@localhost:%d, account: A}, + { url:nats://s1:s1@localhost:%d, account: SYS}] + } + `, storeDir2, s1.opts.LeafNode.Port, s1.opts.LeafNode.Port))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + storeDir3 := createDir(t, JetStreamStoreDir) + conf3 := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain3, store_dir: "%s"} + accounts { + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, + } + system_account = SYS + no_auth_user: a1 + leafnodes:{ + remotes:[{ url:nats://a1:a1@localhost:%d, account: A}, + { url:nats://s1:s1@localhost:%d, account: SYS}] + } + `, storeDir3, s1.opts.LeafNode.Port, s1.opts.LeafNode.Port))) + s3, _ := RunServerWithConfig(conf3) + defer s3.Shutdown() + + checkLeafNodeConnectedCount(t, s1, 4) + checkLeafNodeConnectedCount(t, s2, 2) + checkLeafNodeConnectedCount(t, s3, 2) + + c2 := natsConnect(t, s2.ClientURL()) + defer c2.Close() + js2, err := c2.JetStream(nats.Domain("domain2")) + require_NoError(t, err) + ai2, err := js2.AccountInfo() + require_NoError(t, err) + require_Equal(t, ai2.Domain, "domain2") + _, err = js2.AddStream(&nats.StreamConfig{ + Name: "disk", + Storage: nats.FileStorage, + Subjects: []string{"disk"}, + }) + require_NoError(t, err) + _, err = js2.Publish("disk", nil) + require_NoError(t, err) + si, err := js2.StreamInfo("disk") + require_NoError(t, err) + require_True(t, si.State.Msgs == 1) + + c3 := natsConnect(t, s3.ClientURL()) + defer c3.Close() + js3, err := c3.JetStream(nats.Domain("domain3")) + require_NoError(t, err) + ai3, err := js3.AccountInfo() + require_NoError(t, err) + require_Equal(t, ai3.Domain, "domain3") + + _, err = js3.AddStream(&nats.StreamConfig{ + Name: "stream-mirror", + Storage: nats.FileStorage, + Mirror: &nats.StreamSource{ + Name: "disk", + External: &nats.ExternalStream{APIPrefix: "$JS.domain2.API"}, + }, + }) + require_NoError(t, err) + + _, err = js3.AddStream(&nats.StreamConfig{ + Name: "stream-source", + Storage: nats.FileStorage, + Sources: []*nats.StreamSource{{ + Name: "disk", + External: &nats.ExternalStream{APIPrefix: "$JS.domain2.API"}, + }}, + }) + require_NoError(t, err) + checkFor(t, 10*time.Second, 250*time.Millisecond, func() error { + if si, _ := js3.StreamInfo("stream-mirror"); si.State.Msgs != 1 { + return fmt.Errorf("Expected 1 msg for mirror, got %d", si.State.Msgs) + } + if si, _ := js3.StreamInfo("stream-source"); si.State.Msgs != 1 { + return fmt.Errorf("Expected 1 msg for source, got %d", si.State.Msgs) + } + return nil + }) +} + +func TestJetStreamSeal(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + // Need to be done by hand until makes its way to Go client. + createStream := func(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *JSApiStreamCreateResponse { + t.Helper() + req, err := json.Marshal(cfg) + require_NoError(t, err) + resp, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + require_NoError(t, err) + var scResp JSApiStreamCreateResponse + err = json.Unmarshal(resp.Data, &scResp) + require_NoError(t, err) + return &scResp + } + + updateStream := func(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *JSApiStreamUpdateResponse { + t.Helper() + req, err := json.Marshal(cfg) + require_NoError(t, err) + resp, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), req, time.Second) + require_NoError(t, err) + var scResp JSApiStreamUpdateResponse + err = json.Unmarshal(resp.Data, &scResp) + require_NoError(t, err) + return &scResp + } + + testSeal := func(t *testing.T, s *Server, replicas int) { + nc, js := jsClientConnect(t, s) + defer nc.Close() + // Should not be able to create a stream that starts sealed. + scr := createStream(t, nc, &StreamConfig{Name: "SEALED", Replicas: replicas, Storage: MemoryStorage, Sealed: true}) + if scr.Error == nil { + t.Fatalf("Expected an error but got none") + } + // Create our stream. + scr = createStream(t, nc, &StreamConfig{Name: "SEALED", Replicas: replicas, MaxAge: time.Minute, Storage: MemoryStorage}) + if scr.Error != nil { + t.Fatalf("Unexpected error: %v", scr.Error) + } + for i := 0; i < 100; i++ { + js.Publish("SEALED", []byte("OK")) + } + // Update to sealed. + sur := updateStream(t, nc, &StreamConfig{Name: "SEALED", Replicas: replicas, MaxAge: time.Minute, Storage: MemoryStorage, Sealed: true}) + if sur.Error != nil { + t.Fatalf("Unexpected error: %v", sur.Error) + } + + // Grab stream info and make sure its reflected as sealed. + resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "SEALED"), nil, time.Second) + require_NoError(t, err) + var sir JSApiStreamInfoResponse + err = json.Unmarshal(resp.Data, &sir) + require_NoError(t, err) + if sir.Error != nil { + t.Fatalf("Unexpected error: %v", sir.Error) + } + si := sir.StreamInfo + if !si.Config.Sealed { + t.Fatalf("Expetced the stream to be marked sealed, got %+v\n", si.Config) + } + // Make sure we also updated any max age and moved to discard new. + if si.Config.MaxAge != 0 { + t.Fatalf("Expected MaxAge to be cleared, got %v", si.Config.MaxAge) + } + if si.Config.Discard != DiscardNew { + t.Fatalf("Expected DiscardPolicy to be set to new, got %v", si.Config.Discard) + } + + // Sealing is not reversible, so make sure we get an error trying to undo. + sur = updateStream(t, nc, &StreamConfig{Name: "SEALED", Replicas: replicas, Storage: MemoryStorage, Sealed: false}) + if sur.Error == nil { + t.Fatalf("Expected an error but got none") + } + + // Now test operations like publish a new msg, delete, purge etc all fail. + if _, err := js.Publish("SEALED", []byte("OK")); err == nil { + t.Fatalf("Expected a publish to fail") + } + if err := js.DeleteMsg("SEALED", 1); err == nil { + t.Fatalf("Expected a delete to fail") + } + if err := js.PurgeStream("SEALED"); err == nil { + t.Fatalf("Expected a purge to fail") + } + if err := js.DeleteStream("SEALED"); err != nil { + t.Fatalf("Expected a delete to succeed, got %v", err) + } + } + + t.Run("Single", func(t *testing.T) { testSeal(t, s, 1) }) + t.Run("Clustered", func(t *testing.T) { testSeal(t, c.randomServer(), 3) }) +} + // Support functions // Used to setup superclusters for tests. @@ -9825,116 +10042,3 @@ func (c *cluster) stableTotalSubs() (total int) { return nsubs } - -func TestJetStreamClusterMirrorAndSourceCrossNonNeighboringDomain(t *testing.T) { - storeDir1 := createDir(t, JetStreamStoreDir) - conf1 := createConfFile(t, []byte(fmt.Sprintf(` - listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain1, store_dir: "%s"} - accounts { - A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, - SYS:{ users:[ {user:s1,password:s1}]}, - } - system_account = SYS - no_auth_user: a1 - leafnodes: { - listen: localhost:-1 - } - `, storeDir1))) - s1, _ := RunServerWithConfig(conf1) - defer s1.Shutdown() - storeDir2 := createDir(t, JetStreamStoreDir) - conf2 := createConfFile(t, []byte(fmt.Sprintf(` - listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain2, store_dir: "%s"} - accounts { - A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, - SYS:{ users:[ {user:s1,password:s1}]}, - } - system_account = SYS - no_auth_user: a1 - leafnodes:{ - remotes:[{ url:nats://a1:a1@localhost:%d, account: A}, - { url:nats://s1:s1@localhost:%d, account: SYS}] - } - `, storeDir2, s1.opts.LeafNode.Port, s1.opts.LeafNode.Port))) - s2, _ := RunServerWithConfig(conf2) - defer s2.Shutdown() - storeDir3 := createDir(t, JetStreamStoreDir) - conf3 := createConfFile(t, []byte(fmt.Sprintf(` - listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain3, store_dir: "%s"} - accounts { - A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, - SYS:{ users:[ {user:s1,password:s1}]}, - } - system_account = SYS - no_auth_user: a1 - leafnodes:{ - remotes:[{ url:nats://a1:a1@localhost:%d, account: A}, - { url:nats://s1:s1@localhost:%d, account: SYS}] - } - `, storeDir3, s1.opts.LeafNode.Port, s1.opts.LeafNode.Port))) - s3, _ := RunServerWithConfig(conf3) - defer s3.Shutdown() - - checkLeafNodeConnectedCount(t, s1, 4) - checkLeafNodeConnectedCount(t, s2, 2) - checkLeafNodeConnectedCount(t, s3, 2) - - c2 := natsConnect(t, s2.ClientURL()) - defer c2.Close() - js2, err := c2.JetStream(nats.Domain("domain2")) - require_NoError(t, err) - ai2, err := js2.AccountInfo() - require_NoError(t, err) - require_Equal(t, ai2.Domain, "domain2") - _, err = js2.AddStream(&nats.StreamConfig{ - Name: "disk", - Storage: nats.FileStorage, - Subjects: []string{"disk"}, - }) - require_NoError(t, err) - _, err = js2.Publish("disk", nil) - require_NoError(t, err) - si, err := js2.StreamInfo("disk") - require_NoError(t, err) - require_True(t, si.State.Msgs == 1) - - c3 := natsConnect(t, s3.ClientURL()) - defer c3.Close() - js3, err := c3.JetStream(nats.Domain("domain3")) - require_NoError(t, err) - ai3, err := js3.AccountInfo() - require_NoError(t, err) - require_Equal(t, ai3.Domain, "domain3") - - _, err = js3.AddStream(&nats.StreamConfig{ - Name: "stream-mirror", - Storage: nats.FileStorage, - Mirror: &nats.StreamSource{ - Name: "disk", - External: &nats.ExternalStream{APIPrefix: "$JS.domain2.API"}, - }, - }) - require_NoError(t, err) - - _, err = js3.AddStream(&nats.StreamConfig{ - Name: "stream-source", - Storage: nats.FileStorage, - Sources: []*nats.StreamSource{{ - Name: "disk", - External: &nats.ExternalStream{APIPrefix: "$JS.domain2.API"}, - }}, - }) - require_NoError(t, err) - checkFor(t, 10*time.Second, 250*time.Millisecond, func() error { - if si, _ := js3.StreamInfo("stream-mirror"); si.State.Msgs != 1 { - return fmt.Errorf("Expected 1 msg for mirror, got %d", si.State.Msgs) - } - if si, _ := js3.StreamInfo("stream-source"); si.State.Msgs != 1 { - return fmt.Errorf("Expected 1 msg for source, got %d", si.State.Msgs) - } - return nil - }) -} diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 6ae5db1f..c4315ef4 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -290,6 +290,9 @@ const ( // JSStreamRestoreErrF restore failed: {err} JSStreamRestoreErrF ErrorIdentifier = 10062 + // JSStreamSealedErr invalid operation on sealed stream + JSStreamSealedErr ErrorIdentifier = 10109 + // JSStreamSequenceNotMatchErr expected stream sequence does not match JSStreamSequenceNotMatchErr ErrorIdentifier = 10063 @@ -424,6 +427,7 @@ var ( JSStreamReplicasNotSupportedErr: {Code: 500, ErrCode: 10074, Description: "replicas > 1 not supported in non-clustered mode"}, JSStreamReplicasNotUpdatableErr: {Code: 400, ErrCode: 10061, Description: "Replicas configuration can not be updated"}, JSStreamRestoreErrF: {Code: 500, ErrCode: 10062, Description: "restore failed: {err}"}, + JSStreamSealedErr: {Code: 400, ErrCode: 10109, Description: "invalid operation on sealed stream"}, JSStreamSequenceNotMatchErr: {Code: 503, ErrCode: 10063, Description: "expected stream sequence does not match"}, JSStreamSnapshotErrF: {Code: 500, ErrCode: 10064, Description: "snapshot failed: {err}"}, JSStreamStoreFailedF: {Code: 503, ErrCode: 10077, Description: "{err}"}, @@ -1543,6 +1547,16 @@ func NewJSStreamRestoreError(err error, opts ...ErrorOption) *ApiError { } } +// NewJSStreamSealedError creates a new JSStreamSealedErr error: "invalid operation on sealed stream" +func NewJSStreamSealedError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSStreamSealedErr] +} + // NewJSStreamSequenceNotMatchError creates a new JSStreamSequenceNotMatchErr error: "expected stream sequence does not match" func NewJSStreamSequenceNotMatchError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/stream.go b/server/stream.go index ae4c9f72..5f5bc2ea 100644 --- a/server/stream.go +++ b/server/stream.go @@ -57,6 +57,7 @@ type StreamConfig struct { Placement *Placement `json:"placement,omitempty"` Mirror *StreamSource `json:"mirror,omitempty"` Sources []*StreamSource `json:"sources,omitempty"` + Sealed bool `json:"sealed,omitempty"` } // JSPubAckResponse is a formal response to a publish operation. @@ -263,6 +264,11 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt return nil, NewJSStreamInvalidConfigError(err, Unless(err)) } + // Can't create a stream with a sealed state. + if cfg.Sealed { + return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed")) + } + singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode() if singleServerMode && cfg.Replicas > 1 { return nil, ApiErrors[JSStreamReplicasNotSupportedErr] @@ -882,6 +888,15 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig) (*StreamConfig, if cfg.Template != _EMPTY_ { return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not be owned by a template")) } + if !cfg.Sealed && old.Sealed { + return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not unseal a sealed stream")) + } + + // Do some adjustments for being sealed. + if cfg.Sealed { + cfg.MaxAge = 0 + cfg.Discard = DiscardNew + } // Check limits. if err := jsa.checkLimits(&cfg); err != nil { @@ -984,6 +999,9 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err mset.mu.Unlock() return 0, errors.New("invalid stream") } + if mset.cfg.Sealed { + return 0, errors.New("sealed stream") + } var _obs [4]*consumer obs := _obs[:0] for _, o := range mset.consumers { @@ -2571,7 +2589,7 @@ func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) { // processInboundJetStreamMsg handles processing messages bound for a stream. func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { mset.mu.RLock() - isLeader, isClustered := mset.isLeader(), mset.node != nil + isLeader, isClustered, isSealed := mset.isLeader(), mset.node != nil, mset.cfg.Sealed mset.mu.RUnlock() // If we are not the leader just ignore. @@ -2579,6 +2597,16 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Ac return } + if isSealed { + var resp = JSPubAckResponse{ + PubAck: &PubAck{Stream: mset.name()}, + Error: NewJSStreamSealedError(), + } + b, _ := json.Marshal(resp) + mset.outq.sendMsg(reply, b) + return + } + hdr, msg := c.msgParts(rmsg) // If we are not receiving directly from a client we should move this this Go routine.