mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge branch 'main' into dev
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -343,7 +343,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
|
||||
}
|
||||
// Default values.
|
||||
if fcfg.BlockSize == 0 {
|
||||
fcfg.BlockSize = dynBlkSize(cfg.Retention, cfg.MaxBytes, fcfg.Cipher)
|
||||
fcfg.BlockSize = dynBlkSize(cfg.Retention, cfg.MaxBytes, prf != nil)
|
||||
}
|
||||
if fcfg.BlockSize > maxBlockSize {
|
||||
return nil, fmt.Errorf("filestore max block size is %s", friendlyBytes(maxBlockSize))
|
||||
@@ -590,7 +590,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func dynBlkSize(retention RetentionPolicy, maxBytes int64, cipher StoreCipher) uint64 {
|
||||
func dynBlkSize(retention RetentionPolicy, maxBytes int64, encrypted bool) uint64 {
|
||||
if maxBytes > 0 {
|
||||
blkSize := (maxBytes / 4) + 1 // (25% overhead)
|
||||
// Round up to nearest 100
|
||||
@@ -604,7 +604,7 @@ func dynBlkSize(retention RetentionPolicy, maxBytes int64, cipher StoreCipher) u
|
||||
} else {
|
||||
blkSize = defaultMediumBlockSize
|
||||
}
|
||||
if cipher != NoCipher && blkSize > maximumEncryptedBlockSize {
|
||||
if encrypted && blkSize > maximumEncryptedBlockSize {
|
||||
// Notes on this below.
|
||||
blkSize = maximumEncryptedBlockSize
|
||||
}
|
||||
@@ -612,7 +612,7 @@ func dynBlkSize(retention RetentionPolicy, maxBytes int64, cipher StoreCipher) u
|
||||
}
|
||||
|
||||
switch {
|
||||
case cipher != NoCipher:
|
||||
case encrypted:
|
||||
// In the case of encrypted stores, large blocks can result in worsened perf
|
||||
// since many writes on disk involve re-encrypting the entire block. For now,
|
||||
// we will enforce a cap on the block size when encryption is enabled to avoid
|
||||
|
||||
@@ -5544,3 +5544,91 @@ func TestJetStreamClusterConsumerDefaultsFromStream(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Discovered that we are not properly setting certain default filestore blkSizes.
|
||||
func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
// Nowmal Stream
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"*"},
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
|
||||
Durable: "C3",
|
||||
AckPolicy: nats.AckExplicitPolicy,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
// KV
|
||||
_, err = js.CreateKeyValue(&nats.KeyValueConfig{
|
||||
Bucket: "TEST",
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
blkSize := func(fs *fileStore) uint64 {
|
||||
fs.mu.RLock()
|
||||
defer fs.mu.RUnlock()
|
||||
return fs.fcfg.BlockSize
|
||||
}
|
||||
|
||||
// We will check now the following filestores.
|
||||
// meta
|
||||
// TEST stream and NRG
|
||||
// C3 NRG
|
||||
// KV_TEST stream and NRG
|
||||
for _, s := range c.servers {
|
||||
js, cc := s.getJetStreamCluster()
|
||||
// META
|
||||
js.mu.RLock()
|
||||
meta := cc.meta
|
||||
js.mu.RUnlock()
|
||||
require_True(t, meta != nil)
|
||||
fs := meta.(*raft).wal.(*fileStore)
|
||||
require_True(t, blkSize(fs) == defaultMetaFSBlkSize)
|
||||
|
||||
// TEST STREAM
|
||||
mset, err := s.GlobalAccount().lookupStream("TEST")
|
||||
require_NoError(t, err)
|
||||
mset.mu.RLock()
|
||||
fs = mset.store.(*fileStore)
|
||||
mset.mu.RUnlock()
|
||||
require_True(t, blkSize(fs) == defaultLargeBlockSize)
|
||||
|
||||
// KV STREAM
|
||||
// Now the KV which is different default size.
|
||||
kv, err := s.GlobalAccount().lookupStream("KV_TEST")
|
||||
require_NoError(t, err)
|
||||
kv.mu.RLock()
|
||||
fs = kv.store.(*fileStore)
|
||||
kv.mu.RUnlock()
|
||||
require_True(t, blkSize(fs) == defaultKVBlockSize)
|
||||
|
||||
// Now check NRGs
|
||||
// TEST Stream
|
||||
n := mset.raftNode()
|
||||
require_True(t, n != nil)
|
||||
fs = n.(*raft).wal.(*fileStore)
|
||||
require_True(t, blkSize(fs) == defaultMediumBlockSize)
|
||||
// KV TEST Stream
|
||||
n = kv.raftNode()
|
||||
require_True(t, n != nil)
|
||||
fs = n.(*raft).wal.(*fileStore)
|
||||
require_True(t, blkSize(fs) == defaultMediumBlockSize)
|
||||
// Consumer
|
||||
o := mset.lookupConsumer("C3")
|
||||
require_True(t, o != nil)
|
||||
n = o.raftNode()
|
||||
require_True(t, n != nil)
|
||||
fs = n.(*raft).wal.(*fileStore)
|
||||
require_True(t, blkSize(fs) == defaultMediumBlockSize)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user