diff --git a/server/const.go b/server/const.go index f5511ad1..2b4df1b2 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.0-beta.2" + VERSION = "2.2.0-beta.3" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/filestore.go b/server/filestore.go index be9f65fd..e7232a74 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -207,7 +207,7 @@ func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) { if err := fs.recoverMsgs(); err != nil { return nil, err } - // Write our meta data iff new. + // Write our meta data. if err := fs.writeMsgSetMeta(); err != nil { return nil, err } @@ -219,6 +219,41 @@ func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) { return fs, nil } +func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { + if cfg.Name == "" { + return fmt.Errorf("name required") + } + if cfg.Storage != FileStorage { + return fmt.Errorf("fileStore requires file storage type in config") + } + + fs.mu.Lock() + old_cfg := fs.cfg + fs.cfg = *cfg + if err := fs.writeMsgSetMeta(); err != nil { + fs.cfg = old_cfg + fs.mu.Unlock() + return err + } + // Limits checks and enforcement. + fs.enforceMsgLimit() + fs.enforceBytesLimit() + // Do age timers. + if fs.ageChk == nil && fs.cfg.MaxAge != 0 { + fs.startAgeChk() + } + if fs.ageChk != nil && fs.cfg.MaxAge == 0 { + fs.ageChk.Stop() + fs.ageChk = nil + } + fs.mu.Unlock() + + if cfg.MaxAge != 0 { + fs.expireMsgs() + } + return nil +} + func dynBlkSize(retention RetentionPolicy, maxBytes int64) uint64 { if retention == LimitsPolicy { // TODO(dlc) - Make the blocksize relative to this if set. @@ -232,7 +267,7 @@ func dynBlkSize(retention RetentionPolicy, maxBytes int64) uint64 { // Write out meta and the checksum. func (fs *fileStore) writeMsgSetMeta() error { meta := path.Join(fs.fcfg.StoreDir, JetStreamMetaFile) - if _, err := os.Stat(meta); (err != nil && !os.IsNotExist(err)) || err == nil { + if _, err := os.Stat(meta); err != nil && !os.IsNotExist(err) { return err } b, err := json.MarshalIndent(fs.cfg, _EMPTY_, " ") @@ -504,7 +539,9 @@ func (fs *fileStore) enforceMsgLimit() { if fs.cfg.MaxMsgs <= 0 || fs.state.Msgs <= uint64(fs.cfg.MaxMsgs) { return } - fs.deleteFirstMsg() + for nmsgs := fs.state.Msgs; nmsgs > uint64(fs.cfg.MaxMsgs); nmsgs = fs.state.Msgs { + fs.deleteFirstMsg() + } } // Will check the bytes limit and drop msgs if needed. diff --git a/server/jetstream.go b/server/jetstream.go index 94c179b6..ac963cbd 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -700,16 +700,23 @@ func (jsa *jsAccount) checkLimits(config *StreamConfig) error { } // Check storage, memory or disk. if config.MaxBytes > 0 { - mb := config.MaxBytes * int64(config.Replicas) - switch config.Storage { - case MemoryStorage: - if jsa.memReserved+mb > jsa.limits.MaxMemory { - return fmt.Errorf("insufficient memory resources available") - } - case FileStorage: - if jsa.storeReserved+mb > jsa.limits.MaxStore { - return fmt.Errorf("insufficient storage resources available") - } + return jsa.checkBytesLimits(config.MaxBytes*int64(config.Replicas), config.Storage) + } + return nil +} + +// Check if additional bytes will exceed our account limits. +// This should account for replicas. +// Lock should be held. +func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType) error { + switch storage { + case MemoryStorage: + if jsa.memReserved+addBytes > jsa.limits.MaxMemory { + return fmt.Errorf("insufficient memory resources available") + } + case FileStorage: + if jsa.storeReserved+addBytes > jsa.limits.MaxStore { + return fmt.Errorf("insufficient storage resources available") } } return nil diff --git a/server/jetstream_api.go b/server/jetstream_api.go index d9944e76..006c51a2 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -58,6 +58,11 @@ const ( JetStreamCreateStream = "$JS.STREAM.*.CREATE" JetStreamCreateStreamT = "$JS.STREAM.%s.CREATE" + // JetStreamUpdateStream is the endpoint to update existing streams. + // Will return +OK on success and -ERR on failure. + JetStreamUpdateStream = "$JS.STREAM.*.UPDATE" + JetStreamUpdateStreamT = "$JS.STREAM.%s.UPDATE" + // JetStreamListStreams is the endpoint to list all streams for this account. // Will return json list of string on success and -ERR on failure. JetStreamListStreams = "$JS.STREAM.LIST" @@ -157,6 +162,7 @@ var allJsExports = []string{ JetStreamTemplateInfo, JetStreamDeleteTemplate, JetStreamCreateStream, + JetStreamUpdateStream, JetStreamListStreams, JetStreamStreamInfo, JetStreamDeleteStream, @@ -181,6 +187,7 @@ func (s *Server) setJetStreamExportSubs() error { {JetStreamTemplateInfo, s.jsTemplateInfoRequest}, {JetStreamDeleteTemplate, s.jsTemplateDeleteRequest}, {JetStreamCreateStream, s.jsCreateStreamRequest}, + {JetStreamUpdateStream, s.jsStreamUpdateRequest}, {JetStreamListStreams, s.jsStreamListRequest}, {JetStreamStreamInfo, s.jsStreamInfoRequest}, {JetStreamDeleteStream, s.jsStreamDeleteRequest}, @@ -370,6 +377,38 @@ func (s *Server) jsCreateStreamRequest(sub *subscription, c *client, subject, re s.sendAPIResponse(c, subject, reply, string(msg), response) } +// Request to update a stream. +func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, subject, reply string, msg []byte) { + if c == nil || c.acc == nil { + return + } + if !c.acc.JetStreamEnabled() { + s.sendAPIResponse(c, subject, reply, string(msg), JetStreamNotEnabled) + return + } + var cfg StreamConfig + if err := json.Unmarshal(msg, &cfg); err != nil { + s.sendInternalAccountMsg(c.acc, reply, JetStreamBadRequest) + return + } + streamName := subjectToken(subject, 2) + if streamName != cfg.Name { + s.sendInternalAccountMsg(c.acc, reply, protoErr("stream name in subject does not match request")) + return + } + mset, err := c.acc.LookupStream(streamName) + if err != nil { + s.sendAPIResponse(c, subject, reply, string(msg), protoErr(err)) + return + } + + var response = OK + if err := mset.Update(&cfg); err != nil { + response = protoErr(err) + } + s.sendAPIResponse(c, subject, reply, string(msg), response) +} + // Request for the list of all streams. func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, reply string, msg []byte) { if c == nil || c.acc == nil { diff --git a/server/memstore.go b/server/memstore.go index 2bf0fd48..60a9f8ad 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -49,6 +49,35 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) { return &memStore{msgs: make(map[uint64]*storedMsg), config: *cfg}, nil } +func (ms *memStore) UpdateConfig(cfg *StreamConfig) error { + if cfg == nil { + return fmt.Errorf("config required") + } + if cfg.Storage != MemoryStorage { + return fmt.Errorf("memStore requires memory storage type in config") + } + + ms.mu.Lock() + ms.config = *cfg + // Limits checks and enforcement. + ms.enforceMsgLimit() + ms.enforceBytesLimit() + // Do age timers. + if ms.ageChk == nil && ms.config.MaxAge != 0 { + ms.startAgeChk() + } + if ms.ageChk != nil && ms.config.MaxAge == 0 { + ms.ageChk.Stop() + ms.ageChk = nil + } + ms.mu.Unlock() + + if cfg.MaxAge != 0 { + ms.expireMsgs() + } + return nil +} + // Store stores a message. func (ms *memStore) StoreMsg(subj string, msg []byte) (uint64, error) { ms.mu.Lock() @@ -74,7 +103,7 @@ func (ms *memStore) StoreMsg(subj string, msg []byte) (uint64, error) { ms.enforceMsgLimit() ms.enforceBytesLimit() - // Check it we have and need age expiration timer running. + // Check if we have and need the age expiration timer running. if ms.ageChk == nil && ms.config.MaxAge != 0 { ms.startAgeChk() } @@ -127,7 +156,9 @@ func (ms *memStore) enforceMsgLimit() { if ms.config.MaxMsgs <= 0 || ms.state.Msgs <= uint64(ms.config.MaxMsgs) { return } - ms.deleteFirstMsgOrPanic() + for nmsgs := ms.state.Msgs; nmsgs > uint64(ms.config.MaxMsgs); nmsgs = ms.state.Msgs { + ms.deleteFirstMsgOrPanic() + } } // Will check the bytes limit and drop msgs if needed. diff --git a/server/store.go b/server/store.go index 6ddc4130..030034a0 100644 --- a/server/store.go +++ b/server/store.go @@ -47,6 +47,7 @@ type StreamStore interface { GetSeqFromTime(t time.Time) uint64 State() StreamState StorageBytesUpdate(func(int64)) + UpdateConfig(cfg *StreamConfig) error Delete() error Stop() error ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerStore, error) diff --git a/server/stream.go b/server/stream.go index 22f5c36c..3fa0e7e9 100644 --- a/server/stream.go +++ b/server/stream.go @@ -95,9 +95,6 @@ func (a *Account) AddStream(config *StreamConfig) (*Stream, error) { } } - if len(cfg.Subjects) == 0 { - cfg.Subjects = append(cfg.Subjects, cfg.Name) - } // Check for overlapping subjects. These are not allowed for now. if jsa.subjectsOverlap(cfg.Subjects) { jsa.mu.Unlock() @@ -152,17 +149,19 @@ func checkStreamCfg(config *StreamConfig) (StreamConfig, error) { if config == nil { return StreamConfig{}, fmt.Errorf("stream configuration invalid") } - if !isValidName(config.Name) { return StreamConfig{}, fmt.Errorf("stream name is required and can not contain '.', '*', '>'") } - cfg := *config // TODO(dlc) - check config for conflicts, e.g replicas > 1 in single server mode. if cfg.Replicas == 0 { cfg.Replicas = 1 } + // TODO(dlc) - Remove when clustering happens. + if cfg.Replicas > 1 { + return StreamConfig{}, fmt.Errorf("maximum replicas is 1") + } if cfg.Replicas > StreamMaxReplicas { return cfg, fmt.Errorf("maximum replicas is %d", StreamMaxReplicas) } @@ -175,6 +174,21 @@ func checkStreamCfg(config *StreamConfig) (StreamConfig, error) { if cfg.MaxMsgSize == 0 { cfg.MaxMsgSize = -1 } + if cfg.MaxConsumers == 0 { + cfg.MaxConsumers = -1 + } + if len(cfg.Subjects) == 0 { + cfg.Subjects = append(cfg.Subjects, cfg.Name) + } else { + // We can allow overlaps, but don't allow direct duplicates. + dset := make(map[string]struct{}, len(cfg.Subjects)) + for _, subj := range cfg.Subjects { + if _, ok := dset[subj]; ok { + return StreamConfig{}, fmt.Errorf("duplicate subjects detected") + } + dset[subj] = struct{}{} + } + } return cfg, nil } @@ -200,6 +214,89 @@ func (mset *Stream) Delete() error { return mset.delete() } +// Update will allow certain configuration properties of an existing stream to be updated. +func (mset *Stream) Update(config *StreamConfig) error { + cfg, err := checkStreamCfg(config) + if err != nil { + return err + } + o_cfg := mset.Config() + + // Name must match. + if cfg.Name != o_cfg.Name { + return fmt.Errorf("stream configuration name must match original") + } + // Can't change MaxConsumers for now. + if cfg.MaxConsumers != o_cfg.MaxConsumers { + return fmt.Errorf("stream configuration update can not change MaxConsumers") + } + // Can't change storage types. + if cfg.Storage != o_cfg.Storage { + return fmt.Errorf("stream configuration update can not change storage type") + } + // Can't change retention. + if cfg.Retention != o_cfg.Retention { + return fmt.Errorf("stream configuration update can not change retention policy") + } + // Can not have a template owner for now. + if o_cfg.Template != "" { + return fmt.Errorf("stream configuration update not allowed on template owned stream") + } + if cfg.Template != "" { + return fmt.Errorf("stream configuration update can not be owned by a template") + } + + // Check limits. + mset.mu.Lock() + jsa := mset.jsa + mset.mu.Unlock() + + jsa.mu.Lock() + if cfg.MaxConsumers > 0 && cfg.MaxConsumers > jsa.limits.MaxConsumers { + jsa.mu.Unlock() + return fmt.Errorf("stream configuration maximum consumers exceeds account limit") + } + if cfg.MaxBytes > 0 && cfg.MaxBytes > o_cfg.MaxBytes { + if err := jsa.checkBytesLimits(cfg.MaxBytes*int64(cfg.Replicas), cfg.Storage); err != nil { + jsa.mu.Unlock() + return err + } + } + jsa.mu.Unlock() + + // Now check for subject interest differences. + current := make(map[string]struct{}, len(o_cfg.Subjects)) + for _, s := range o_cfg.Subjects { + current[s] = struct{}{} + } + // Update config with new values. The store update will enforce any stricter limits. + mset.mu.Lock() + defer mset.mu.Unlock() + + // Now walk new subjects. All of these need to be added, but we will check + // the originals first, since if it is in there we can skip, already added. + for _, s := range cfg.Subjects { + if _, ok := current[s]; !ok { + if _, err := mset.subscribeInternal(s, mset.processInboundJetStreamMsg); err != nil { + return err + } + } + delete(current, s) + } + // What is left in current needs to be deleted. + for s := range current { + if err := mset.unsubscribeInternal(s); err != nil { + return err + } + } + + // Now update config and store's version of our config. + mset.config = cfg + mset.store.UpdateConfig(&cfg) + + return nil +} + // Purge will remove all messages from the stream and underlying store. func (mset *Stream) Purge() uint64 { mset.mu.Lock() @@ -254,6 +351,7 @@ func (mset *Stream) subscribeToStream() error { } // FIXME(dlc) - This only works in single server mode for the moment. Need to fix as we expand to clusters. +// Lock should be held. func (mset *Stream) subscribeInternal(subject string, cb msgHandler) (*subscription, error) { c := mset.client if c == nil { @@ -279,6 +377,36 @@ func (mset *Stream) subscribeInternal(subject string, cb msgHandler) (*subscript return sub, nil } +// This will unsubscribe us from the exact subject given. +// We do not currently track the subs so do not have the sid. +// This should be called only on an update. +// Lock should be held. +func (mset *Stream) unsubscribeInternal(subject string) error { + c := mset.client + if c == nil { + return fmt.Errorf("invalid stream") + } + if !c.srv.eventsEnabled() { + return ErrNoSysAccount + } + + var sid []byte + + c.mu.Lock() + for _, sub := range c.subs { + if subject == string(sub.subject) { + sid = sub.sid + break + } + } + c.mu.Unlock() + + if sid != nil { + return c.processUnsub(sid) + } + return nil +} + // Lock should be held. func (mset *Stream) unsubscribe(sub *subscription) { if sub == nil || mset.client == nil { diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 4bfd822e..81b33108 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -834,7 +834,7 @@ func TestJetStreamBasicWorkQueue(t *testing.T) { } } -func TestJetStreamSubjecting(t *testing.T) { +func TestJetStreamSubjectFiltering(t *testing.T) { cases := []struct { name string mconfig *server.StreamConfig @@ -906,7 +906,7 @@ func TestJetStreamSubjecting(t *testing.T) { } } -func TestJetStreamWorkQueueSubjecting(t *testing.T) { +func TestJetStreamWorkQueueSubjectFiltering(t *testing.T) { cases := []struct { name string mconfig *server.StreamConfig @@ -2067,7 +2067,7 @@ func TestJetStreamDurableConsumerReconnect(t *testing.T) { } } -func TestJetStreamDurableSubjectedConsumerReconnect(t *testing.T) { +func TestJetStreamDurableFilteredSubjectConsumerReconnect(t *testing.T) { cases := []struct { name string mconfig *server.StreamConfig @@ -3447,6 +3447,16 @@ func TestJetStreamRequestAPI(t *testing.T) { t.Fatalf("Got wrong error response: %q", resp.Data) } + // Check that update works. + msetCfg.Subjects = []string{"foo", "bar", "baz"} + msetCfg.MaxBytes = 2222222 + req, err = json.Marshal(msetCfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + resp, _ = nc.Request(fmt.Sprintf(server.JetStreamUpdateStreamT, msetCfg.Name), req, time.Second) + expectOKResponse(t, resp) + // Now lookup info again and see that we can see the new stream. resp, err = nc.Request(server.JetStreamInfo, nil, time.Second) if err != nil { @@ -3756,6 +3766,228 @@ func TestJetStreamRequestAPI(t *testing.T) { } } +func TestJetStreamUpdateStream(t *testing.T) { + cases := []struct { + name string + mconfig *server.StreamConfig + }{ + {name: "MemoryStore", + mconfig: &server.StreamConfig{ + Name: "foo", + Retention: server.LimitsPolicy, + MaxAge: time.Hour, + Storage: server.MemoryStorage, + Replicas: 1, + }}, + {name: "FileStore", + mconfig: &server.StreamConfig{ + Name: "foo", + Retention: server.LimitsPolicy, + MaxAge: time.Hour, + Storage: server.FileStorage, + Replicas: 1, + }}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil && config.StoreDir != "" { + defer os.RemoveAll(config.StoreDir) + } + + mset, err := s.GlobalAccount().AddStream(c.mconfig) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.Delete() + + // Test basic updates. We allow changing the subjects, limits, and no_ack along with replicas(TBD w/ cluster) + cfg := *c.mconfig + + // Can't change name. + cfg.Name = "bar" + if err := mset.Update(&cfg); err == nil || !strings.Contains(err.Error(), "name must match") { + t.Fatalf("Expected error trying to update name") + } + // Can't change max consumers for now. + cfg = *c.mconfig + cfg.MaxConsumers = 10 + if err := mset.Update(&cfg); err == nil || !strings.Contains(err.Error(), "can not change") { + t.Fatalf("Expected error trying to change MaxConsumers") + } + // Can't change storage types. + cfg = *c.mconfig + if cfg.Storage == server.FileStorage { + cfg.Storage = server.MemoryStorage + } else { + cfg.Storage = server.FileStorage + } + if err := mset.Update(&cfg); err == nil || !strings.Contains(err.Error(), "can not change") { + t.Fatalf("Expected error trying to change Storage") + } + // Can't change replicas > 1 for now. + cfg = *c.mconfig + cfg.Replicas = 10 + if err := mset.Update(&cfg); err == nil || !strings.Contains(err.Error(), "maximum replicas") { + t.Fatalf("Expected error trying to change Replicas") + } + // Can't have a template set for now. + cfg = *c.mconfig + cfg.Template = "baz" + if err := mset.Update(&cfg); err == nil || !strings.Contains(err.Error(), "template") { + t.Fatalf("Expected error trying to change Template owner") + } + // Can't change limits policy. + cfg = *c.mconfig + cfg.Retention = server.WorkQueuePolicy + if err := mset.Update(&cfg); err == nil || !strings.Contains(err.Error(), "can not change") { + t.Fatalf("Expected error trying to change Retention") + } + + // Now test changing limits. + nc := clientConnectToServer(t, s) + defer nc.Close() + + pending := uint64(100) + for i := uint64(0); i < pending; i++ { + sendStreamMsg(t, nc, "foo", "0123456789") + } + pendingBytes := mset.State().Bytes + + checkPending := func(msgs, bts uint64) { + t.Helper() + state := mset.State() + if state.Msgs != msgs { + t.Fatalf("Expected %d messages, got %d", msgs, state.Msgs) + } + if state.Bytes != bts { + t.Fatalf("Expected %d bytes, got %d", bts, state.Bytes) + } + } + checkPending(pending, pendingBytes) + + // Update msgs to higher. + cfg = *c.mconfig + cfg.MaxMsgs = int64(pending * 2) + if err := mset.Update(&cfg); err != nil { + t.Fatalf("Unexpected error %v", err) + } + if mset.Config().MaxMsgs != cfg.MaxMsgs { + t.Fatalf("Expected the change to take effect, %d vs %d", mset.Config().MaxMsgs, cfg.MaxMsgs) + } + checkPending(pending, pendingBytes) + + // Update msgs to lower. + cfg = *c.mconfig + cfg.MaxMsgs = int64(pending / 2) + if err := mset.Update(&cfg); err != nil { + t.Fatalf("Unexpected error %v", err) + } + if mset.Config().MaxMsgs != cfg.MaxMsgs { + t.Fatalf("Expected the change to take effect, %d vs %d", mset.Config().MaxMsgs, cfg.MaxMsgs) + } + checkPending(pending/2, pendingBytes/2) + // Now do bytes. + cfg = *c.mconfig + cfg.MaxBytes = int64(pendingBytes / 4) + if err := mset.Update(&cfg); err != nil { + t.Fatalf("Unexpected error %v", err) + } + if mset.Config().MaxBytes != cfg.MaxBytes { + t.Fatalf("Expected the change to take effect, %d vs %d", mset.Config().MaxBytes, cfg.MaxBytes) + } + checkPending(pending/4, pendingBytes/4) + + // Now do age. + cfg = *c.mconfig + cfg.MaxAge = time.Millisecond + if err := mset.Update(&cfg); err != nil { + t.Fatalf("Unexpected error %v", err) + } + // Just wait a bit for expiration. + time.Sleep(5 * time.Millisecond) + if mset.Config().MaxAge != cfg.MaxAge { + t.Fatalf("Expected the change to take effect, %d vs %d", mset.Config().MaxAge, cfg.MaxAge) + } + checkPending(0, 0) + + // Now put back to original. + cfg = *c.mconfig + if err := mset.Update(&cfg); err != nil { + t.Fatalf("Unexpected error %v", err) + } + for i := uint64(0); i < pending; i++ { + sendStreamMsg(t, nc, "foo", "0123456789") + } + + // subject changes. + // Add in a subject first. + cfg = *c.mconfig + cfg.Subjects = []string{"foo", "bar"} + if err := mset.Update(&cfg); err != nil { + t.Fatalf("Unexpected error %v", err) + } + // Make sure we can still send to foo. + sendStreamMsg(t, nc, "foo", "0123456789") + // And we can now send to bar. + sendStreamMsg(t, nc, "bar", "0123456789") + // Now delete both and change to baz only. + cfg.Subjects = []string{"baz"} + if err := mset.Update(&cfg); err != nil { + t.Fatalf("Unexpected error %v", err) + } + // Make sure we do not get response acks for "foo" or "bar". + if resp, err := nc.Request("foo", nil, 25*time.Millisecond); err == nil || resp != nil { + t.Fatalf("Expected no response from jetstream for deleted subject: %q", "foo") + } + if resp, err := nc.Request("bar", nil, 25*time.Millisecond); err == nil || resp != nil { + t.Fatalf("Expected no response from jetstream for deleted subject: %q", "bar") + } + // Make sure we can send to "baz" + sendStreamMsg(t, nc, "baz", "0123456789") + if nmsgs := mset.State().Msgs; nmsgs != pending+3 { + t.Fatalf("Expected %d msgs, got %d", pending+3, nmsgs) + } + + // FileStore restarts for config save. + cfg = *c.mconfig + if cfg.Storage == server.FileStorage { + cfg.Subjects = []string{"foo", "bar"} + cfg.MaxMsgs = 2222 + cfg.MaxBytes = 3333333 + cfg.MaxAge = 22 * time.Hour + if err := mset.Update(&cfg); err != nil { + t.Fatalf("Unexpected error %v", err) + } + // Pull since certain defaults etc are set in processing. + cfg = mset.Config() + + // Restart the server. + // Capture port since it was dynamic. + u, _ := url.Parse(s.ClientURL()) + port, _ := strconv.Atoi(u.Port()) + + // Stop current server. + s.Shutdown() + // Restart. + s = RunJetStreamServerOnPort(port) + defer s.Shutdown() + + mset, err = s.GlobalAccount().LookupStream(cfg.Name) + if err != nil { + t.Fatalf("Expected to find a stream for %q", cfg.Name) + } + restored_cfg := mset.Config() + if !reflect.DeepEqual(cfg, restored_cfg) { + t.Fatalf("restored configuration does not match: \n%+v\n vs \n%+v", restored_cfg, cfg) + } + } + }) + } +} + func TestJetStreamDeleteMsg(t *testing.T) { cases := []struct { name string