diff --git a/server/filestore.go b/server/filestore.go index 33a230dc..3e943381 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -107,6 +107,35 @@ func (alg StoreCompression) String() string { } } +func (alg StoreCompression) MarshalJSON() ([]byte, error) { + var str string + switch alg { + case S2Compression: + str = "s2" + case NoCompression: + str = "none" + default: + return nil, fmt.Errorf("unknown compression algorithm") + } + return json.Marshal(str) +} + +func (alg *StoreCompression) UnmarshalJSON(b []byte) error { + var str string + if err := json.Unmarshal(b, &str); err != nil { + return err + } + switch str { + case "s2": + *alg = S2Compression + case "none": + *alg = NoCompression + default: + return fmt.Errorf("unknown compression algorithm") + } + return nil +} + // File ConsumerInfo is used for creating consumer stores. type FileConsumerInfo struct { Created time.Time diff --git a/server/stream.go b/server/stream.go index e229188e..e190508e 100644 --- a/server/stream.go +++ b/server/stream.go @@ -38,25 +38,26 @@ import ( // StreamConfig will determine the name, subjects and retention policy // for a given stream. If subjects is empty the name will be used. type StreamConfig struct { - Name string `json:"name"` - Description string `json:"description,omitempty"` - Subjects []string `json:"subjects,omitempty"` - Retention RetentionPolicy `json:"retention"` - MaxConsumers int `json:"max_consumers"` - MaxMsgs int64 `json:"max_msgs"` - MaxBytes int64 `json:"max_bytes"` - MaxAge time.Duration `json:"max_age"` - MaxMsgsPer int64 `json:"max_msgs_per_subject"` - MaxMsgSize int32 `json:"max_msg_size,omitempty"` - Discard DiscardPolicy `json:"discard"` - Storage StorageType `json:"storage"` - Replicas int `json:"num_replicas"` - NoAck bool `json:"no_ack,omitempty"` - Template string `json:"template_owner,omitempty"` - Duplicates time.Duration `json:"duplicate_window,omitempty"` - Placement *Placement `json:"placement,omitempty"` - Mirror *StreamSource `json:"mirror,omitempty"` - Sources []*StreamSource `json:"sources,omitempty"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Subjects []string `json:"subjects,omitempty"` + Retention RetentionPolicy `json:"retention"` + MaxConsumers int `json:"max_consumers"` + MaxMsgs int64 `json:"max_msgs"` + MaxBytes int64 `json:"max_bytes"` + MaxAge time.Duration `json:"max_age"` + MaxMsgsPer int64 `json:"max_msgs_per_subject"` + MaxMsgSize int32 `json:"max_msg_size,omitempty"` + Discard DiscardPolicy `json:"discard"` + Storage StorageType `json:"storage"` + Replicas int `json:"num_replicas"` + NoAck bool `json:"no_ack,omitempty"` + Template string `json:"template_owner,omitempty"` + Duplicates time.Duration `json:"duplicate_window,omitempty"` + Placement *Placement `json:"placement,omitempty"` + Mirror *StreamSource `json:"mirror,omitempty"` + Sources []*StreamSource `json:"sources,omitempty"` + Compression StoreCompression `json:"compression"` // Allow applying a subject transform to incoming messages before doing anything else SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"` @@ -549,6 +550,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt fsCfg.StoreDir = storeDir fsCfg.AsyncFlush = false fsCfg.SyncInterval = 2 * time.Minute + fsCfg.Compression = config.Compression if err := mset.setupStore(fsCfg); err != nil { mset.stop(true, false)