mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Added Stream Update abilities for certain properties
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -40,7 +40,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.2.0-beta.2"
|
||||
VERSION = "2.2.0-beta.3"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -207,7 +207,7 @@ func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) {
|
||||
if err := fs.recoverMsgs(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Write our meta data iff new.
|
||||
// Write our meta data.
|
||||
if err := fs.writeMsgSetMeta(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -219,6 +219,41 @@ func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) {
|
||||
return fs, nil
|
||||
}
|
||||
|
||||
func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
|
||||
if cfg.Name == "" {
|
||||
return fmt.Errorf("name required")
|
||||
}
|
||||
if cfg.Storage != FileStorage {
|
||||
return fmt.Errorf("fileStore requires file storage type in config")
|
||||
}
|
||||
|
||||
fs.mu.Lock()
|
||||
old_cfg := fs.cfg
|
||||
fs.cfg = *cfg
|
||||
if err := fs.writeMsgSetMeta(); err != nil {
|
||||
fs.cfg = old_cfg
|
||||
fs.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
// Limits checks and enforcement.
|
||||
fs.enforceMsgLimit()
|
||||
fs.enforceBytesLimit()
|
||||
// Do age timers.
|
||||
if fs.ageChk == nil && fs.cfg.MaxAge != 0 {
|
||||
fs.startAgeChk()
|
||||
}
|
||||
if fs.ageChk != nil && fs.cfg.MaxAge == 0 {
|
||||
fs.ageChk.Stop()
|
||||
fs.ageChk = nil
|
||||
}
|
||||
fs.mu.Unlock()
|
||||
|
||||
if cfg.MaxAge != 0 {
|
||||
fs.expireMsgs()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func dynBlkSize(retention RetentionPolicy, maxBytes int64) uint64 {
|
||||
if retention == LimitsPolicy {
|
||||
// TODO(dlc) - Make the blocksize relative to this if set.
|
||||
@@ -232,7 +267,7 @@ func dynBlkSize(retention RetentionPolicy, maxBytes int64) uint64 {
|
||||
// Write out meta and the checksum.
|
||||
func (fs *fileStore) writeMsgSetMeta() error {
|
||||
meta := path.Join(fs.fcfg.StoreDir, JetStreamMetaFile)
|
||||
if _, err := os.Stat(meta); (err != nil && !os.IsNotExist(err)) || err == nil {
|
||||
if _, err := os.Stat(meta); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
b, err := json.MarshalIndent(fs.cfg, _EMPTY_, " ")
|
||||
@@ -504,7 +539,9 @@ func (fs *fileStore) enforceMsgLimit() {
|
||||
if fs.cfg.MaxMsgs <= 0 || fs.state.Msgs <= uint64(fs.cfg.MaxMsgs) {
|
||||
return
|
||||
}
|
||||
fs.deleteFirstMsg()
|
||||
for nmsgs := fs.state.Msgs; nmsgs > uint64(fs.cfg.MaxMsgs); nmsgs = fs.state.Msgs {
|
||||
fs.deleteFirstMsg()
|
||||
}
|
||||
}
|
||||
|
||||
// Will check the bytes limit and drop msgs if needed.
|
||||
|
||||
@@ -700,16 +700,23 @@ func (jsa *jsAccount) checkLimits(config *StreamConfig) error {
|
||||
}
|
||||
// Check storage, memory or disk.
|
||||
if config.MaxBytes > 0 {
|
||||
mb := config.MaxBytes * int64(config.Replicas)
|
||||
switch config.Storage {
|
||||
case MemoryStorage:
|
||||
if jsa.memReserved+mb > jsa.limits.MaxMemory {
|
||||
return fmt.Errorf("insufficient memory resources available")
|
||||
}
|
||||
case FileStorage:
|
||||
if jsa.storeReserved+mb > jsa.limits.MaxStore {
|
||||
return fmt.Errorf("insufficient storage resources available")
|
||||
}
|
||||
return jsa.checkBytesLimits(config.MaxBytes*int64(config.Replicas), config.Storage)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if additional bytes will exceed our account limits.
|
||||
// This should account for replicas.
|
||||
// Lock should be held.
|
||||
func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType) error {
|
||||
switch storage {
|
||||
case MemoryStorage:
|
||||
if jsa.memReserved+addBytes > jsa.limits.MaxMemory {
|
||||
return fmt.Errorf("insufficient memory resources available")
|
||||
}
|
||||
case FileStorage:
|
||||
if jsa.storeReserved+addBytes > jsa.limits.MaxStore {
|
||||
return fmt.Errorf("insufficient storage resources available")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -58,6 +58,11 @@ const (
|
||||
JetStreamCreateStream = "$JS.STREAM.*.CREATE"
|
||||
JetStreamCreateStreamT = "$JS.STREAM.%s.CREATE"
|
||||
|
||||
// JetStreamUpdateStream is the endpoint to update existing streams.
|
||||
// Will return +OK on success and -ERR on failure.
|
||||
JetStreamUpdateStream = "$JS.STREAM.*.UPDATE"
|
||||
JetStreamUpdateStreamT = "$JS.STREAM.%s.UPDATE"
|
||||
|
||||
// JetStreamListStreams is the endpoint to list all streams for this account.
|
||||
// Will return json list of string on success and -ERR on failure.
|
||||
JetStreamListStreams = "$JS.STREAM.LIST"
|
||||
@@ -157,6 +162,7 @@ var allJsExports = []string{
|
||||
JetStreamTemplateInfo,
|
||||
JetStreamDeleteTemplate,
|
||||
JetStreamCreateStream,
|
||||
JetStreamUpdateStream,
|
||||
JetStreamListStreams,
|
||||
JetStreamStreamInfo,
|
||||
JetStreamDeleteStream,
|
||||
@@ -181,6 +187,7 @@ func (s *Server) setJetStreamExportSubs() error {
|
||||
{JetStreamTemplateInfo, s.jsTemplateInfoRequest},
|
||||
{JetStreamDeleteTemplate, s.jsTemplateDeleteRequest},
|
||||
{JetStreamCreateStream, s.jsCreateStreamRequest},
|
||||
{JetStreamUpdateStream, s.jsStreamUpdateRequest},
|
||||
{JetStreamListStreams, s.jsStreamListRequest},
|
||||
{JetStreamStreamInfo, s.jsStreamInfoRequest},
|
||||
{JetStreamDeleteStream, s.jsStreamDeleteRequest},
|
||||
@@ -370,6 +377,38 @@ func (s *Server) jsCreateStreamRequest(sub *subscription, c *client, subject, re
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), response)
|
||||
}
|
||||
|
||||
// Request to update a stream.
|
||||
func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
|
||||
if c == nil || c.acc == nil {
|
||||
return
|
||||
}
|
||||
if !c.acc.JetStreamEnabled() {
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), JetStreamNotEnabled)
|
||||
return
|
||||
}
|
||||
var cfg StreamConfig
|
||||
if err := json.Unmarshal(msg, &cfg); err != nil {
|
||||
s.sendInternalAccountMsg(c.acc, reply, JetStreamBadRequest)
|
||||
return
|
||||
}
|
||||
streamName := subjectToken(subject, 2)
|
||||
if streamName != cfg.Name {
|
||||
s.sendInternalAccountMsg(c.acc, reply, protoErr("stream name in subject does not match request"))
|
||||
return
|
||||
}
|
||||
mset, err := c.acc.LookupStream(streamName)
|
||||
if err != nil {
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), protoErr(err))
|
||||
return
|
||||
}
|
||||
|
||||
var response = OK
|
||||
if err := mset.Update(&cfg); err != nil {
|
||||
response = protoErr(err)
|
||||
}
|
||||
s.sendAPIResponse(c, subject, reply, string(msg), response)
|
||||
}
|
||||
|
||||
// Request for the list of all streams.
|
||||
func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
|
||||
if c == nil || c.acc == nil {
|
||||
|
||||
@@ -49,6 +49,35 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) {
|
||||
return &memStore{msgs: make(map[uint64]*storedMsg), config: *cfg}, nil
|
||||
}
|
||||
|
||||
func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
|
||||
if cfg == nil {
|
||||
return fmt.Errorf("config required")
|
||||
}
|
||||
if cfg.Storage != MemoryStorage {
|
||||
return fmt.Errorf("memStore requires memory storage type in config")
|
||||
}
|
||||
|
||||
ms.mu.Lock()
|
||||
ms.config = *cfg
|
||||
// Limits checks and enforcement.
|
||||
ms.enforceMsgLimit()
|
||||
ms.enforceBytesLimit()
|
||||
// Do age timers.
|
||||
if ms.ageChk == nil && ms.config.MaxAge != 0 {
|
||||
ms.startAgeChk()
|
||||
}
|
||||
if ms.ageChk != nil && ms.config.MaxAge == 0 {
|
||||
ms.ageChk.Stop()
|
||||
ms.ageChk = nil
|
||||
}
|
||||
ms.mu.Unlock()
|
||||
|
||||
if cfg.MaxAge != 0 {
|
||||
ms.expireMsgs()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Store stores a message.
|
||||
func (ms *memStore) StoreMsg(subj string, msg []byte) (uint64, error) {
|
||||
ms.mu.Lock()
|
||||
@@ -74,7 +103,7 @@ func (ms *memStore) StoreMsg(subj string, msg []byte) (uint64, error) {
|
||||
ms.enforceMsgLimit()
|
||||
ms.enforceBytesLimit()
|
||||
|
||||
// Check it we have and need age expiration timer running.
|
||||
// Check if we have and need the age expiration timer running.
|
||||
if ms.ageChk == nil && ms.config.MaxAge != 0 {
|
||||
ms.startAgeChk()
|
||||
}
|
||||
@@ -127,7 +156,9 @@ func (ms *memStore) enforceMsgLimit() {
|
||||
if ms.config.MaxMsgs <= 0 || ms.state.Msgs <= uint64(ms.config.MaxMsgs) {
|
||||
return
|
||||
}
|
||||
ms.deleteFirstMsgOrPanic()
|
||||
for nmsgs := ms.state.Msgs; nmsgs > uint64(ms.config.MaxMsgs); nmsgs = ms.state.Msgs {
|
||||
ms.deleteFirstMsgOrPanic()
|
||||
}
|
||||
}
|
||||
|
||||
// Will check the bytes limit and drop msgs if needed.
|
||||
|
||||
@@ -47,6 +47,7 @@ type StreamStore interface {
|
||||
GetSeqFromTime(t time.Time) uint64
|
||||
State() StreamState
|
||||
StorageBytesUpdate(func(int64))
|
||||
UpdateConfig(cfg *StreamConfig) error
|
||||
Delete() error
|
||||
Stop() error
|
||||
ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerStore, error)
|
||||
|
||||
138
server/stream.go
138
server/stream.go
@@ -95,9 +95,6 @@ func (a *Account) AddStream(config *StreamConfig) (*Stream, error) {
|
||||
}
|
||||
}
|
||||
|
||||
if len(cfg.Subjects) == 0 {
|
||||
cfg.Subjects = append(cfg.Subjects, cfg.Name)
|
||||
}
|
||||
// Check for overlapping subjects. These are not allowed for now.
|
||||
if jsa.subjectsOverlap(cfg.Subjects) {
|
||||
jsa.mu.Unlock()
|
||||
@@ -152,17 +149,19 @@ func checkStreamCfg(config *StreamConfig) (StreamConfig, error) {
|
||||
if config == nil {
|
||||
return StreamConfig{}, fmt.Errorf("stream configuration invalid")
|
||||
}
|
||||
|
||||
if !isValidName(config.Name) {
|
||||
return StreamConfig{}, fmt.Errorf("stream name is required and can not contain '.', '*', '>'")
|
||||
}
|
||||
|
||||
cfg := *config
|
||||
|
||||
// TODO(dlc) - check config for conflicts, e.g replicas > 1 in single server mode.
|
||||
if cfg.Replicas == 0 {
|
||||
cfg.Replicas = 1
|
||||
}
|
||||
// TODO(dlc) - Remove when clustering happens.
|
||||
if cfg.Replicas > 1 {
|
||||
return StreamConfig{}, fmt.Errorf("maximum replicas is 1")
|
||||
}
|
||||
if cfg.Replicas > StreamMaxReplicas {
|
||||
return cfg, fmt.Errorf("maximum replicas is %d", StreamMaxReplicas)
|
||||
}
|
||||
@@ -175,6 +174,21 @@ func checkStreamCfg(config *StreamConfig) (StreamConfig, error) {
|
||||
if cfg.MaxMsgSize == 0 {
|
||||
cfg.MaxMsgSize = -1
|
||||
}
|
||||
if cfg.MaxConsumers == 0 {
|
||||
cfg.MaxConsumers = -1
|
||||
}
|
||||
if len(cfg.Subjects) == 0 {
|
||||
cfg.Subjects = append(cfg.Subjects, cfg.Name)
|
||||
} else {
|
||||
// We can allow overlaps, but don't allow direct duplicates.
|
||||
dset := make(map[string]struct{}, len(cfg.Subjects))
|
||||
for _, subj := range cfg.Subjects {
|
||||
if _, ok := dset[subj]; ok {
|
||||
return StreamConfig{}, fmt.Errorf("duplicate subjects detected")
|
||||
}
|
||||
dset[subj] = struct{}{}
|
||||
}
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
@@ -200,6 +214,89 @@ func (mset *Stream) Delete() error {
|
||||
return mset.delete()
|
||||
}
|
||||
|
||||
// Update will allow certain configuration properties of an existing stream to be updated.
|
||||
func (mset *Stream) Update(config *StreamConfig) error {
|
||||
cfg, err := checkStreamCfg(config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o_cfg := mset.Config()
|
||||
|
||||
// Name must match.
|
||||
if cfg.Name != o_cfg.Name {
|
||||
return fmt.Errorf("stream configuration name must match original")
|
||||
}
|
||||
// Can't change MaxConsumers for now.
|
||||
if cfg.MaxConsumers != o_cfg.MaxConsumers {
|
||||
return fmt.Errorf("stream configuration update can not change MaxConsumers")
|
||||
}
|
||||
// Can't change storage types.
|
||||
if cfg.Storage != o_cfg.Storage {
|
||||
return fmt.Errorf("stream configuration update can not change storage type")
|
||||
}
|
||||
// Can't change retention.
|
||||
if cfg.Retention != o_cfg.Retention {
|
||||
return fmt.Errorf("stream configuration update can not change retention policy")
|
||||
}
|
||||
// Can not have a template owner for now.
|
||||
if o_cfg.Template != "" {
|
||||
return fmt.Errorf("stream configuration update not allowed on template owned stream")
|
||||
}
|
||||
if cfg.Template != "" {
|
||||
return fmt.Errorf("stream configuration update can not be owned by a template")
|
||||
}
|
||||
|
||||
// Check limits.
|
||||
mset.mu.Lock()
|
||||
jsa := mset.jsa
|
||||
mset.mu.Unlock()
|
||||
|
||||
jsa.mu.Lock()
|
||||
if cfg.MaxConsumers > 0 && cfg.MaxConsumers > jsa.limits.MaxConsumers {
|
||||
jsa.mu.Unlock()
|
||||
return fmt.Errorf("stream configuration maximum consumers exceeds account limit")
|
||||
}
|
||||
if cfg.MaxBytes > 0 && cfg.MaxBytes > o_cfg.MaxBytes {
|
||||
if err := jsa.checkBytesLimits(cfg.MaxBytes*int64(cfg.Replicas), cfg.Storage); err != nil {
|
||||
jsa.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
}
|
||||
jsa.mu.Unlock()
|
||||
|
||||
// Now check for subject interest differences.
|
||||
current := make(map[string]struct{}, len(o_cfg.Subjects))
|
||||
for _, s := range o_cfg.Subjects {
|
||||
current[s] = struct{}{}
|
||||
}
|
||||
// Update config with new values. The store update will enforce any stricter limits.
|
||||
mset.mu.Lock()
|
||||
defer mset.mu.Unlock()
|
||||
|
||||
// Now walk new subjects. All of these need to be added, but we will check
|
||||
// the originals first, since if it is in there we can skip, already added.
|
||||
for _, s := range cfg.Subjects {
|
||||
if _, ok := current[s]; !ok {
|
||||
if _, err := mset.subscribeInternal(s, mset.processInboundJetStreamMsg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
delete(current, s)
|
||||
}
|
||||
// What is left in current needs to be deleted.
|
||||
for s := range current {
|
||||
if err := mset.unsubscribeInternal(s); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Now update config and store's version of our config.
|
||||
mset.config = cfg
|
||||
mset.store.UpdateConfig(&cfg)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Purge will remove all messages from the stream and underlying store.
|
||||
func (mset *Stream) Purge() uint64 {
|
||||
mset.mu.Lock()
|
||||
@@ -254,6 +351,7 @@ func (mset *Stream) subscribeToStream() error {
|
||||
}
|
||||
|
||||
// FIXME(dlc) - This only works in single server mode for the moment. Need to fix as we expand to clusters.
|
||||
// Lock should be held.
|
||||
func (mset *Stream) subscribeInternal(subject string, cb msgHandler) (*subscription, error) {
|
||||
c := mset.client
|
||||
if c == nil {
|
||||
@@ -279,6 +377,36 @@ func (mset *Stream) subscribeInternal(subject string, cb msgHandler) (*subscript
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
// This will unsubscribe us from the exact subject given.
|
||||
// We do not currently track the subs so do not have the sid.
|
||||
// This should be called only on an update.
|
||||
// Lock should be held.
|
||||
func (mset *Stream) unsubscribeInternal(subject string) error {
|
||||
c := mset.client
|
||||
if c == nil {
|
||||
return fmt.Errorf("invalid stream")
|
||||
}
|
||||
if !c.srv.eventsEnabled() {
|
||||
return ErrNoSysAccount
|
||||
}
|
||||
|
||||
var sid []byte
|
||||
|
||||
c.mu.Lock()
|
||||
for _, sub := range c.subs {
|
||||
if subject == string(sub.subject) {
|
||||
sid = sub.sid
|
||||
break
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
if sid != nil {
|
||||
return c.processUnsub(sid)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
func (mset *Stream) unsubscribe(sub *subscription) {
|
||||
if sub == nil || mset.client == nil {
|
||||
|
||||
@@ -834,7 +834,7 @@ func TestJetStreamBasicWorkQueue(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamSubjecting(t *testing.T) {
|
||||
func TestJetStreamSubjectFiltering(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
mconfig *server.StreamConfig
|
||||
@@ -906,7 +906,7 @@ func TestJetStreamSubjecting(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamWorkQueueSubjecting(t *testing.T) {
|
||||
func TestJetStreamWorkQueueSubjectFiltering(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
mconfig *server.StreamConfig
|
||||
@@ -2067,7 +2067,7 @@ func TestJetStreamDurableConsumerReconnect(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamDurableSubjectedConsumerReconnect(t *testing.T) {
|
||||
func TestJetStreamDurableFilteredSubjectConsumerReconnect(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
mconfig *server.StreamConfig
|
||||
@@ -3447,6 +3447,16 @@ func TestJetStreamRequestAPI(t *testing.T) {
|
||||
t.Fatalf("Got wrong error response: %q", resp.Data)
|
||||
}
|
||||
|
||||
// Check that update works.
|
||||
msetCfg.Subjects = []string{"foo", "bar", "baz"}
|
||||
msetCfg.MaxBytes = 2222222
|
||||
req, err = json.Marshal(msetCfg)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
resp, _ = nc.Request(fmt.Sprintf(server.JetStreamUpdateStreamT, msetCfg.Name), req, time.Second)
|
||||
expectOKResponse(t, resp)
|
||||
|
||||
// Now lookup info again and see that we can see the new stream.
|
||||
resp, err = nc.Request(server.JetStreamInfo, nil, time.Second)
|
||||
if err != nil {
|
||||
@@ -3756,6 +3766,228 @@ func TestJetStreamRequestAPI(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamUpdateStream(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
mconfig *server.StreamConfig
|
||||
}{
|
||||
{name: "MemoryStore",
|
||||
mconfig: &server.StreamConfig{
|
||||
Name: "foo",
|
||||
Retention: server.LimitsPolicy,
|
||||
MaxAge: time.Hour,
|
||||
Storage: server.MemoryStorage,
|
||||
Replicas: 1,
|
||||
}},
|
||||
{name: "FileStore",
|
||||
mconfig: &server.StreamConfig{
|
||||
Name: "foo",
|
||||
Retention: server.LimitsPolicy,
|
||||
MaxAge: time.Hour,
|
||||
Storage: server.FileStorage,
|
||||
Replicas: 1,
|
||||
}},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
if config := s.JetStreamConfig(); config != nil && config.StoreDir != "" {
|
||||
defer os.RemoveAll(config.StoreDir)
|
||||
}
|
||||
|
||||
mset, err := s.GlobalAccount().AddStream(c.mconfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error adding stream: %v", err)
|
||||
}
|
||||
defer mset.Delete()
|
||||
|
||||
// Test basic updates. We allow changing the subjects, limits, and no_ack along with replicas(TBD w/ cluster)
|
||||
cfg := *c.mconfig
|
||||
|
||||
// Can't change name.
|
||||
cfg.Name = "bar"
|
||||
if err := mset.Update(&cfg); err == nil || !strings.Contains(err.Error(), "name must match") {
|
||||
t.Fatalf("Expected error trying to update name")
|
||||
}
|
||||
// Can't change max consumers for now.
|
||||
cfg = *c.mconfig
|
||||
cfg.MaxConsumers = 10
|
||||
if err := mset.Update(&cfg); err == nil || !strings.Contains(err.Error(), "can not change") {
|
||||
t.Fatalf("Expected error trying to change MaxConsumers")
|
||||
}
|
||||
// Can't change storage types.
|
||||
cfg = *c.mconfig
|
||||
if cfg.Storage == server.FileStorage {
|
||||
cfg.Storage = server.MemoryStorage
|
||||
} else {
|
||||
cfg.Storage = server.FileStorage
|
||||
}
|
||||
if err := mset.Update(&cfg); err == nil || !strings.Contains(err.Error(), "can not change") {
|
||||
t.Fatalf("Expected error trying to change Storage")
|
||||
}
|
||||
// Can't change replicas > 1 for now.
|
||||
cfg = *c.mconfig
|
||||
cfg.Replicas = 10
|
||||
if err := mset.Update(&cfg); err == nil || !strings.Contains(err.Error(), "maximum replicas") {
|
||||
t.Fatalf("Expected error trying to change Replicas")
|
||||
}
|
||||
// Can't have a template set for now.
|
||||
cfg = *c.mconfig
|
||||
cfg.Template = "baz"
|
||||
if err := mset.Update(&cfg); err == nil || !strings.Contains(err.Error(), "template") {
|
||||
t.Fatalf("Expected error trying to change Template owner")
|
||||
}
|
||||
// Can't change limits policy.
|
||||
cfg = *c.mconfig
|
||||
cfg.Retention = server.WorkQueuePolicy
|
||||
if err := mset.Update(&cfg); err == nil || !strings.Contains(err.Error(), "can not change") {
|
||||
t.Fatalf("Expected error trying to change Retention")
|
||||
}
|
||||
|
||||
// Now test changing limits.
|
||||
nc := clientConnectToServer(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
pending := uint64(100)
|
||||
for i := uint64(0); i < pending; i++ {
|
||||
sendStreamMsg(t, nc, "foo", "0123456789")
|
||||
}
|
||||
pendingBytes := mset.State().Bytes
|
||||
|
||||
checkPending := func(msgs, bts uint64) {
|
||||
t.Helper()
|
||||
state := mset.State()
|
||||
if state.Msgs != msgs {
|
||||
t.Fatalf("Expected %d messages, got %d", msgs, state.Msgs)
|
||||
}
|
||||
if state.Bytes != bts {
|
||||
t.Fatalf("Expected %d bytes, got %d", bts, state.Bytes)
|
||||
}
|
||||
}
|
||||
checkPending(pending, pendingBytes)
|
||||
|
||||
// Update msgs to higher.
|
||||
cfg = *c.mconfig
|
||||
cfg.MaxMsgs = int64(pending * 2)
|
||||
if err := mset.Update(&cfg); err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
if mset.Config().MaxMsgs != cfg.MaxMsgs {
|
||||
t.Fatalf("Expected the change to take effect, %d vs %d", mset.Config().MaxMsgs, cfg.MaxMsgs)
|
||||
}
|
||||
checkPending(pending, pendingBytes)
|
||||
|
||||
// Update msgs to lower.
|
||||
cfg = *c.mconfig
|
||||
cfg.MaxMsgs = int64(pending / 2)
|
||||
if err := mset.Update(&cfg); err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
if mset.Config().MaxMsgs != cfg.MaxMsgs {
|
||||
t.Fatalf("Expected the change to take effect, %d vs %d", mset.Config().MaxMsgs, cfg.MaxMsgs)
|
||||
}
|
||||
checkPending(pending/2, pendingBytes/2)
|
||||
// Now do bytes.
|
||||
cfg = *c.mconfig
|
||||
cfg.MaxBytes = int64(pendingBytes / 4)
|
||||
if err := mset.Update(&cfg); err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
if mset.Config().MaxBytes != cfg.MaxBytes {
|
||||
t.Fatalf("Expected the change to take effect, %d vs %d", mset.Config().MaxBytes, cfg.MaxBytes)
|
||||
}
|
||||
checkPending(pending/4, pendingBytes/4)
|
||||
|
||||
// Now do age.
|
||||
cfg = *c.mconfig
|
||||
cfg.MaxAge = time.Millisecond
|
||||
if err := mset.Update(&cfg); err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
// Just wait a bit for expiration.
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
if mset.Config().MaxAge != cfg.MaxAge {
|
||||
t.Fatalf("Expected the change to take effect, %d vs %d", mset.Config().MaxAge, cfg.MaxAge)
|
||||
}
|
||||
checkPending(0, 0)
|
||||
|
||||
// Now put back to original.
|
||||
cfg = *c.mconfig
|
||||
if err := mset.Update(&cfg); err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
for i := uint64(0); i < pending; i++ {
|
||||
sendStreamMsg(t, nc, "foo", "0123456789")
|
||||
}
|
||||
|
||||
// subject changes.
|
||||
// Add in a subject first.
|
||||
cfg = *c.mconfig
|
||||
cfg.Subjects = []string{"foo", "bar"}
|
||||
if err := mset.Update(&cfg); err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
// Make sure we can still send to foo.
|
||||
sendStreamMsg(t, nc, "foo", "0123456789")
|
||||
// And we can now send to bar.
|
||||
sendStreamMsg(t, nc, "bar", "0123456789")
|
||||
// Now delete both and change to baz only.
|
||||
cfg.Subjects = []string{"baz"}
|
||||
if err := mset.Update(&cfg); err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
// Make sure we do not get response acks for "foo" or "bar".
|
||||
if resp, err := nc.Request("foo", nil, 25*time.Millisecond); err == nil || resp != nil {
|
||||
t.Fatalf("Expected no response from jetstream for deleted subject: %q", "foo")
|
||||
}
|
||||
if resp, err := nc.Request("bar", nil, 25*time.Millisecond); err == nil || resp != nil {
|
||||
t.Fatalf("Expected no response from jetstream for deleted subject: %q", "bar")
|
||||
}
|
||||
// Make sure we can send to "baz"
|
||||
sendStreamMsg(t, nc, "baz", "0123456789")
|
||||
if nmsgs := mset.State().Msgs; nmsgs != pending+3 {
|
||||
t.Fatalf("Expected %d msgs, got %d", pending+3, nmsgs)
|
||||
}
|
||||
|
||||
// FileStore restarts for config save.
|
||||
cfg = *c.mconfig
|
||||
if cfg.Storage == server.FileStorage {
|
||||
cfg.Subjects = []string{"foo", "bar"}
|
||||
cfg.MaxMsgs = 2222
|
||||
cfg.MaxBytes = 3333333
|
||||
cfg.MaxAge = 22 * time.Hour
|
||||
if err := mset.Update(&cfg); err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
// Pull since certain defaults etc are set in processing.
|
||||
cfg = mset.Config()
|
||||
|
||||
// Restart the server.
|
||||
// Capture port since it was dynamic.
|
||||
u, _ := url.Parse(s.ClientURL())
|
||||
port, _ := strconv.Atoi(u.Port())
|
||||
|
||||
// Stop current server.
|
||||
s.Shutdown()
|
||||
// Restart.
|
||||
s = RunJetStreamServerOnPort(port)
|
||||
defer s.Shutdown()
|
||||
|
||||
mset, err = s.GlobalAccount().LookupStream(cfg.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected to find a stream for %q", cfg.Name)
|
||||
}
|
||||
restored_cfg := mset.Config()
|
||||
if !reflect.DeepEqual(cfg, restored_cfg) {
|
||||
t.Fatalf("restored configuration does not match: \n%+v\n vs \n%+v", restored_cfg, cfg)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamDeleteMsg(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
|
||||
Reference in New Issue
Block a user