diff --git a/server/client.go b/server/client.go
index 617d9680..3a5d5241 100644
--- a/server/client.go
+++ b/server/client.go
@@ -3844,14 +3844,16 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) {
 	// Go back to the sublist data structure.
 	if !ok {
 		r = acc.sl.Match(string(c.pa.subject))
-		c.in.results[string(c.pa.subject)] = r
-		// Prune the results cache. Keeps us from unbounded growth. Random delete.
-		if len(c.in.results) > maxResultCacheSize {
-			n := 0
-			for subject := range c.in.results {
-				delete(c.in.results, subject)
-				if n++; n > pruneSize {
-					break
+		if len(r.psubs)+len(r.qsubs) > 0 {
+			c.in.results[string(c.pa.subject)] = r
+			// Prune the results cache. Keeps us from unbounded growth. Random delete.
+			if len(c.in.results) > maxResultCacheSize {
+				n := 0
+				for subject := range c.in.results {
+					delete(c.in.results, subject)
+					if n++; n > pruneSize {
+						break
+					}
 				}
 			}
 		}
diff --git a/server/filestore.go b/server/filestore.go
index 12dcc055..b73b3497 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -5708,13 +5708,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
 
 	var purged, bytes uint64
 
-	// We have to delete interior messages.
 	fs.mu.Lock()
+	// Same as purge all.
 	if lseq := fs.state.LastSeq; seq > lseq {
 		fs.mu.Unlock()
 		return fs.purge(seq)
 	}
-
+	// We have to delete interior messages.
 	smb := fs.selectMsgBlock(seq)
 	if smb == nil {
 		fs.mu.Unlock()
@@ -6633,6 +6633,9 @@ func (fs *fileStore) Stop() error {
 	fs.cancelSyncTimer()
 	fs.cancelAgeChk()
 
+	// We should update the upper usage layer on a stop.
+	cb, bytes := fs.scb, int64(fs.state.Bytes)
+
 	var _cfs [256]ConsumerStore
 	cfs := append(_cfs[:0], fs.cfs...)
 	fs.cfs = nil
@@ -6642,6 +6645,10 @@ func (fs *fileStore) Stop() error {
 		o.Stop()
 	}
 
+	if bytes > 0 && cb != nil {
+		cb(0, -bytes, 0, _EMPTY_)
+	}
+
 	return nil
 }
 
diff --git a/server/jetstream_api.go b/server/jetstream_api.go
index bdbfc96e..1fcded12 100644
--- a/server/jetstream_api.go
+++ b/server/jetstream_api.go
@@ -1,4 +1,4 @@
-// Copyright 2020-2022 The NATS Authors
+// Copyright 2020-2023 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -2666,7 +2666,7 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac
 }
 
 // Request to have the meta leader stepdown.
-// These will only be received the meta leaders, so less checking needed.
+// These will only be received by the meta leader, so less checking needed.
 func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
 	if c == nil || !s.JetStreamEnabled() {
 		return
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index f9f24c03..897f9e5d 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -2817,12 +2817,15 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 			}
 			panic(err.Error())
 		}
-		// Ignore if we are recovering and we have already processed.
-		if isRecovering && (sp.Request == nil || sp.Request.Sequence == 0) {
+		// If no explicit request, fill in with leader stamped last sequence to protect ourselves on replay during server start.
+		if sp.Request == nil || sp.Request.Sequence == 0 {
+			purgeSeq := sp.LastSeq + 1
 			if sp.Request == nil {
-				sp.Request = &JSApiStreamPurgeRequest{Sequence: sp.LastSeq}
-			} else {
-				sp.Request.Sequence = sp.LastSeq
+				sp.Request = &JSApiStreamPurgeRequest{Sequence: purgeSeq}
+			} else if sp.Request.Keep == 0 {
+				sp.Request.Sequence = purgeSeq
+			} else if isRecovering {
+				continue
 			}
 		}
 
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 626b97ca..e1b370a6 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -31,6 +31,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/nats-io/jwt/v2"
 	"github.com/nats-io/nats.go"
 )
 
@@ -399,7 +400,7 @@ func TestJetStreamClusterNegativeReplicas(t *testing.T) {
 	})
 	require_NoError(t, err)
 
-	// Check upadte now.
+	// Check update now.
 	_, err = js.UpdateStream(&nats.StreamConfig{
 		Name:     name,
 		Replicas: -11,
@@ -4588,3 +4589,221 @@ func TestJetStreamClusterBadEncryptKey(t *testing.T) {
 		t.Fatalf("Expected server not to start")
 	}
 }
+
+func TestJetStreamAccountUsageDrifts(t *testing.T) {
+	tmpl := `
+	listen: 127.0.0.1:-1
+	server_name: %s
+	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
+	leaf {
+		listen: 127.0.0.1:-1
+	}
+	cluster {
+		name: %s
+		listen: 127.0.0.1:%d
+		routes = [%s]
+	}
+	`
+	opFrag := `
+	operator: %s
+	system_account: %s
+	resolver: { type: MEM }
+	resolver_preload = {
+		%s : %s
+		%s : %s
+	}
+	`
+
+	_, syspub := createKey(t)
+	sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
+
+	accKp, aExpPub := createKey(t)
+	accClaim := jwt.NewAccountClaims(aExpPub)
+	accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{
+		DiskStorage: -1, Consumer: 1, Streams: 1}
+	accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{
+		DiskStorage: -1, Consumer: 1, Streams: 1}
+	accJwt := encodeClaim(t, accClaim, aExpPub)
+	accCreds := newUser(t, accKp)
+
+	template := tmpl + fmt.Sprintf(opFrag, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt)
+	c := createJetStreamClusterWithTemplate(t, template, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer(), nats.UserCredentials(accCreds))
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST1",
+		Subjects: []string{"foo"},
+		MaxBytes: 1 * 1024 * 1024 * 1024,
+		MaxMsgs:  1000,
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	_, err = js.AddStream(&nats.StreamConfig{
+		Name:     "TEST2",
+		Subjects: []string{"bar"},
+	})
+	require_NoError(t, err)
+
+	// These expected store values can come directly from stream info's state bytes.
+	// We will *= 3 for R3.
+	checkAccount := func(r1u, r3u uint64) {
+		t.Helper()
+		r3u *= 3
+
+		// Remote usage updates can be delayed, so wait for a bit for values we want.
+		checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
+			info, err := js.AccountInfo()
+			require_NoError(t, err)
+			require_True(t, len(info.Tiers) >= 2)
+			// These can move.
+			if u := info.Tiers["R1"].Store; u != r1u {
+				return fmt.Errorf("Expected R1 to be %v, got %v", friendlyBytes(r1u), friendlyBytes(u))
+			}
+			if u := info.Tiers["R3"].Store; u != r3u {
+				return fmt.Errorf("Expected R3 to be %v, got %v", friendlyBytes(r3u), friendlyBytes(u))
+			}
+			return nil
+		})
+	}
+
+	checkAccount(0, 0)
+
+	// Now add in some R3 data.
+	msg := bytes.Repeat([]byte("Z"), 32*1024)     // 32k
+	smallMsg := bytes.Repeat([]byte("Z"), 4*1024) // 4k
+
+	for i := 0; i < 1000; i++ {
+		js.Publish("foo", msg)
+	}
+	sir3, err := js.StreamInfo("TEST1")
+	require_NoError(t, err)
+
+	checkAccount(0, sir3.State.Bytes)
+
+	// Now add in some R1 data.
+	for i := 0; i < 100; i++ {
+		js.Publish("bar", msg)
+	}
+
+	sir1, err := js.StreamInfo("TEST2")
+	require_NoError(t, err)
+
+	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+
+	// We will now test a bunch of scenarios to see that we are doing accounting correctly.
+
+	// Since our R3 has a limit of 1000 msgs, let's add in more msgs and drop older ones.
+	for i := 0; i < 100; i++ {
+		js.Publish("foo", smallMsg)
+	}
+	sir3, err = js.StreamInfo("TEST1")
+	require_NoError(t, err)
+
+	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+
+	// Move our R3 stream leader and make sure accounting is correct.
+	_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST1"), nil, time.Second)
+	require_NoError(t, err)
+
+	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+
+	// Now scale down.
+	_, err = js.UpdateStream(&nats.StreamConfig{
+		Name:     "TEST1",
+		Subjects: []string{"foo"},
+		MaxBytes: 1 * 1024 * 1024 * 1024,
+		MaxMsgs:  1000,
+		Replicas: 1,
+	})
+	require_NoError(t, err)
+
+	checkAccount(sir1.State.Bytes+sir3.State.Bytes, 0)
+
+	// Add in more msgs which will replace the older and bigger ones.
+	for i := 0; i < 100; i++ {
+		js.Publish("foo", smallMsg)
+	}
+	sir3, err = js.StreamInfo("TEST1")
+	require_NoError(t, err)
+
+	// Now scale back up.
+	_, err = js.UpdateStream(&nats.StreamConfig{
+		Name:     "TEST1",
+		Subjects: []string{"foo"},
+		MaxBytes: 1 * 1024 * 1024 * 1024,
+		MaxMsgs:  1000,
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+
+	// Test Purge.
+	err = js.PurgeStream("TEST1")
+	require_NoError(t, err)
+
+	checkAccount(sir1.State.Bytes, 0)
+
+	for i := 0; i < 1000; i++ {
+		js.Publish("foo", smallMsg)
+	}
+	sir3, err = js.StreamInfo("TEST1")
+	require_NoError(t, err)
+
+	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+
+	requestLeaderStepDown := func() {
+		ml := c.leader()
+		checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
+			if cml := c.leader(); cml == ml {
+				nc.Request(JSApiLeaderStepDown, nil, time.Second)
+				return fmt.Errorf("Metaleader has not moved yet")
+			}
+			return nil
+		})
+	}
+
+	// Test meta leader stepdowns.
+	for i := 0; i < len(c.servers); i++ {
+		requestLeaderStepDown()
+		checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+	}
+
+	// Now test cluster reset operations where we internally reset the NRG and optionally the stream too.
+	nl := c.randomNonStreamLeader(aExpPub, "TEST1")
+	acc, err := nl.LookupAccount(aExpPub)
+	require_NoError(t, err)
+	mset, err := acc.lookupStream("TEST1")
+	require_NoError(t, err)
+	// NRG only.
+	mset.resetClusteredState(nil)
+	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+	// Now NRG and stream state itself.
+	mset.resetClusteredState(errFirstSequenceMismatch)
+	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+
+	// Now test server restarts.
+	for _, s := range c.servers {
+		s.Shutdown()
+		s.WaitForShutdown()
+		s = c.restartServer(s)
+
+		// Wait on healthz and leader etc.
+		checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
+			if hs := s.healthz(nil); hs.Error != _EMPTY_ {
+				return errors.New(hs.Error)
+			}
+			return nil
+		})
+		c.waitOnLeader()
+		c.waitOnStreamLeader(aExpPub, "TEST1")
+		c.waitOnStreamLeader(aExpPub, "TEST2")
+
+		// Now check account again.
+		checkAccount(sir1.State.Bytes, sir3.State.Bytes)
+	}
+}
diff --git a/server/memstore.go b/server/memstore.go
index 1a7cc564..8dfbffce 100644
--- a/server/memstore.go
+++ b/server/memstore.go
@@ -1163,11 +1163,12 @@ func memStoreMsgSize(subj string, hdr, msg []byte) uint64 {
 
 // Delete is same as Stop for memory store.
 func (ms *memStore) Delete() error {
-	ms.Purge()
 	return ms.Stop()
 }
 
 func (ms *memStore) Stop() error {
+	// These can't come back, so stop is same as Delete.
+	ms.Purge()
 	ms.mu.Lock()
 	if ms.ageChk != nil {
 		ms.ageChk.Stop()
diff --git a/server/opts.go b/server/opts.go
index ce0e4321..02c3489b 100644
--- a/server/opts.go
+++ b/server/opts.go
@@ -1247,17 +1247,22 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
 			*errors = append(*errors, &configErr{tk, err.Error()})
 			return
 		}
-		if dir == _EMPTY_ {
-			*errors = append(*errors, &configErr{tk, "dir has no value and needs to point to a directory"})
-			return
-		}
-		if info, _ := os.Stat(dir); info != nil && (!info.IsDir() || info.Mode().Perm()&(1<<(uint(7))) == 0) {
-			*errors = append(*errors, &configErr{tk, "dir needs to point to an accessible directory"})
-			return
+
+		checkDir := func() {
+			if dir == _EMPTY_ {
+				*errors = append(*errors, &configErr{tk, "dir has no value and needs to point to a directory"})
+				return
+			}
+			if info, _ := os.Stat(dir); info != nil && (!info.IsDir() || info.Mode().Perm()&(1<<(uint(7))) == 0) {
+				*errors = append(*errors, &configErr{tk, "dir needs to point to an accessible directory"})
+				return
+			}
 		}
+
 		var res AccountResolver
 		switch strings.ToUpper(dirType) {
 		case "CACHE":
+			checkDir()
 			if sync != 0 {
 				*errors = append(*errors, &configErr{tk, "CACHE does not accept sync"})
 			}
@@ -1269,6 +1274,7 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
 			}
 			res, err = NewCacheDirAccResolver(dir, limit, ttl, opts...)
 		case "FULL":
+			checkDir()
 			if ttl != 0 {
 				*errors = append(*errors, &configErr{tk, "FULL does not accept ttl"})
 			}
@@ -1284,6 +1290,8 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
 				}
 			}
 			res, err = NewDirAccResolver(dir, limit, sync, delete, opts...)
+		case "MEM", "MEMORY":
+			res = &MemAccResolver{}
 		}
 		if err != nil {
 			*errors = append(*errors, &configErr{tk, err.Error()})
diff --git a/server/stream.go b/server/stream.go
index 87e03ba5..09f22b3e 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -3490,9 +3490,9 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
 		// Register our server.
 		fs.registerServer(s)
 	}
-	mset.mu.Unlock()
-
+	// This will fire the callback but we do not require the lock since md will be 0 here.
 	mset.store.RegisterStorageUpdates(mset.storeUpdates)
+	mset.mu.Unlock()
 
 	return nil
 }
diff --git a/server/sublist.go b/server/sublist.go
index ed38d4de..6f12f891 100644
--- a/server/sublist.go
+++ b/server/sublist.go
@@ -48,7 +48,7 @@ const (
 	// cacheMax is used to bound limit the frontend cache
 	slCacheMax = 1024
 	// If we run a sweeper we will drain to this count.
-	slCacheSweep = 512
+	slCacheSweep = 256
 	// plistMin is our lower bounds to create a fast plist for Match.
 	plistMin = 256
 )
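
Note (illustration only, not part of the patch): the filestore.go hunks above close one source of usage drift. Previously fileStore.Stop left the upper account-usage layer still counting the stopped store's bytes; the patch captures the storage callback and the remaining byte count, then reports a negative delta after consumers are stopped. Below is a minimal, self-contained Go sketch of that delta-callback contract using hypothetical names (storageUpdateFn, usageTracker, toyStore); only the four-argument shape of the callback mirrors the real cb(0, -bytes, 0, _EMPTY_) call in the hunk.

package main

import "fmt"

// storageUpdateFn mirrors the delta-style callback the patch invokes via fs.scb:
// a message-count delta, a byte delta, and sequence/subject for per-subject updates.
type storageUpdateFn func(msgs, bytes int64, seq uint64, subj string)

// usageTracker is a hypothetical stand-in for the upper account-usage layer.
type usageTracker struct{ msgs, bytes int64 }

func (u *usageTracker) update(msgs, bytes int64, _ uint64, _ string) {
	u.msgs += msgs
	u.bytes += bytes
}

// toyStore is a hypothetical store that reports usage deltas as messages arrive.
type toyStore struct {
	bytes int64
	scb   storageUpdateFn
}

func (s *toyStore) storeMsg(sz int64) {
	s.bytes += sz
	if s.scb != nil {
		s.scb(1, sz, 0, "foo")
	}
}

// stop mirrors the new fileStore.Stop behavior: hand the remaining bytes back
// as a negative delta so the upper layer stops counting a stopped store.
func (s *toyStore) stop() {
	cb, bytes := s.scb, s.bytes
	if bytes > 0 && cb != nil {
		cb(0, -bytes, 0, "")
	}
	s.bytes = 0
}

func main() {
	u := &usageTracker{}
	st := &toyStore{scb: u.update}
	st.storeMsg(1024)
	st.storeMsg(2048)
	fmt.Println(u.bytes) // 3072: usage accrued while storing
	st.stop()
	fmt.Println(u.bytes) // 0: nothing left to drift after Stop
}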