diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 18f26686..7013a2cc 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1,4 +1,4 @@ -// Copyright 2020-2022 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -2656,7 +2656,7 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac } // Request to have the meta leader stepdown. -// These will only be received the meta leaders, so less checking needed. +// These will only be received by the meta leader, so less checking needed. func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { if c == nil || !s.JetStreamEnabled() { return diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 1c94322c..9be5bdf1 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2808,12 +2808,15 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco } panic(err.Error()) } - // Ignore if we are recovering and we have already processed. - if isRecovering && (sp.Request == nil || sp.Request.Sequence == 0) { + // If no explicit request, fill in with leader stamped last sequence to protect ourselves on replay during server start. 
+ if sp.Request == nil || sp.Request.Sequence == 0 { + purgeSeq := sp.LastSeq + 1 if sp.Request == nil { - sp.Request = &JSApiStreamPurgeRequest{Sequence: sp.LastSeq} - } else { - sp.Request.Sequence = sp.LastSeq + sp.Request = &JSApiStreamPurgeRequest{Sequence: purgeSeq} + } else if sp.Request.Keep == 0 { + sp.Request.Sequence = purgeSeq + } else if isRecovering { + continue } } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 4f4fdcf8..159cf269 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -31,6 +31,7 @@ import ( "testing" "time" + "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" ) @@ -399,7 +400,7 @@ func TestJetStreamClusterNegativeReplicas(t *testing.T) { }) require_NoError(t, err) - // Check upadte now. + // Check update now. _, err = js.UpdateStream(&nats.StreamConfig{ Name: name, Replicas: -11, @@ -4566,3 +4567,221 @@ func TestJetStreamClusterBadEncryptKey(t *testing.T) { t.Fatalf("Expected server not to start") } } + +func TestJetStreamAccountUsageDrifts(t *testing.T) { + tmpl := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + leaf { + listen: 127.0.0.1:-1 + } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + ` + opFrag := ` + operator: %s + system_account: %s + resolver: { type: MEM } + resolver_preload = { + %s : %s + %s : %s + } + ` + + _, syspub := createKey(t) + sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + + accKp, aExpPub := createKey(t) + accClaim := jwt.NewAccountClaims(aExpPub) + accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{ + DiskStorage: -1, Consumer: 1, Streams: 1} + accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{ + DiskStorage: -1, Consumer: 1, Streams: 1} + accJwt := encodeClaim(t, accClaim, aExpPub) + accCreds := newUser(t, accKp) + + template := tmpl + fmt.Sprintf(opFrag, ojwt, syspub, syspub, 
// These expected store values can come directly from stream info's state bytes.
// Move our R3 stream leader and make sure accounting is correct.
+ for i := 0; i < len(c.servers); i++ { + requestLeaderStepDown() + checkAccount(sir1.State.Bytes, sir3.State.Bytes) + } + + // Now test cluster reset operations where we internally reset the NRG and optionally the stream too. + nl := c.randomNonStreamLeader(aExpPub, "TEST1") + acc, err := nl.LookupAccount(aExpPub) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST1") + require_NoError(t, err) + // NRG only + mset.resetClusteredState(nil) + checkAccount(sir1.State.Bytes, sir3.State.Bytes) + // Now NRG and Stream state itself. + mset.resetClusteredState(errFirstSequenceMismatch) + checkAccount(sir1.State.Bytes, sir3.State.Bytes) + + // Now test server restart + for _, s := range c.servers { + s.Shutdown() + s.WaitForShutdown() + s = c.restartServer(s) + + // Wait on healthz and leader etc. + checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { + if hs := s.healthz(nil); hs.Error != _EMPTY_ { + return errors.New(hs.Error) + } + return nil + }) + c.waitOnLeader() + c.waitOnStreamLeader(aExpPub, "TEST1") + c.waitOnStreamLeader(aExpPub, "TEST2") + + // Now check account again. + checkAccount(sir1.State.Bytes, sir3.State.Bytes) + } +} diff --git a/server/stream.go b/server/stream.go index b343fb9c..c9280648 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3364,9 +3364,9 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error { // Register our server. fs.registerServer(s) } - mset.mu.Unlock() - + // This will fire the callback but we do not require the lock since md will be 0 here. mset.store.RegisterStorageUpdates(mset.storeUpdates) + mset.mu.Unlock() return nil }