From d293af1da6ae98a4f9f3ece9df07dcf92f59d492 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 13 May 2023 12:57:05 -0700 Subject: [PATCH 1/6] Fix to service imports reporting for Accountz() when import subject is mapped into different local subject. Signed-off-by: Derek Collison --- server/monitor.go | 19 +++++++++++-------- server/monitor_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/server/monitor.go b/server/monitor.go index 76cd6150..66bf96ca 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2446,19 +2446,20 @@ func (s *Server) Accountz(optz *AccountzOptions) (*Accountz, error) { if sacc := s.SystemAccount(); sacc != nil { a.SystemAccount = sacc.GetName() } - if optz.Account == "" { + if optz == nil || optz.Account == _EMPTY_ { a.Accounts = []string{} s.accounts.Range(func(key, value interface{}) bool { a.Accounts = append(a.Accounts, key.(string)) return true }) return a, nil - } else if aInfo, err := s.accountInfo(optz.Account); err != nil { - return nil, err - } else { - a.Account = aInfo - return a, nil } + aInfo, err := s.accountInfo(optz.Account) + if err != nil { + return nil, err + } + a.Account = aInfo + return a, nil } func newExtImport(v *serviceImport) ExtImport { @@ -2471,10 +2472,12 @@ func newExtImport(v *serviceImport) ExtImport { imp.Tracking = v.tracking imp.Invalid = v.invalid imp.Import = jwt.Import{ - Subject: jwt.Subject(v.from), + Subject: jwt.Subject(v.to), Account: v.acc.Name, Type: jwt.Service, - To: jwt.Subject(v.to), + // Deprecated so we duplicate. Use LocalSubject. 
+ To: jwt.Subject(v.from), + LocalSubject: jwt.RenamingSubject(v.from), } imp.TrackingHdr = v.trackingHdr imp.Latency = newExtServiceLatency(v.latency) diff --git a/server/monitor_test.go b/server/monitor_test.go index 0c04796f..b7b02b7a 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4734,3 +4734,36 @@ func TestMonitorConnzSortByRTT(t *testing.T) { } } } + +// https://github.com/nats-io/nats-server/issues/4144 +func TestMonitorAccountszMappingOrderReporting(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + server_name: SR22 + accounts { + CLOUD { + exports [ { service: "downlink.>" } ] + } + APP { + imports [ { service: { account: CLOUD, subject: "downlink.>"}, to: "event.>"} ] + } + }`)) + + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + az, err := s.Accountz(&AccountzOptions{"APP"}) + require_NoError(t, err) + require_NotNil(t, az.Account) + require_True(t, len(az.Account.Imports) > 0) + + var found bool + for _, si := range az.Account.Imports { + if si.Import.Subject == "downlink.>" { + found = true + require_True(t, si.Import.LocalSubject == "event.>") + break + } + } + require_True(t, found) +} From 75d274a6362b3ca780e6c878841bbe192d6babd3 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 13 May 2023 18:36:42 -0700 Subject: [PATCH 2/6] If a NATS system has multiple domains make sure to process those during a remote update before bailing. Signed-off-by: Derek Collison --- server/events.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/server/events.go b/server/events.go index 4f7e8a70..00c1d3ed 100644 --- a/server/events.go +++ b/server/events.go @@ -1268,6 +1268,13 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su } si := ssm.Server + // Should do normal updates before bailing if wrong domain. 
+ s.mu.Lock() + if s.running && s.eventsEnabled() && ssm.Server.ID != s.info.ID { + s.updateRemoteServer(&si) + } + s.mu.Unlock() + // JetStream node updates. if !s.sameDomain(si.Domain) { return @@ -1293,11 +1300,6 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su stats, false, si.JetStream, }) - s.mu.Lock() - if s.running && s.eventsEnabled() && ssm.Server.ID != s.info.ID { - s.updateRemoteServer(&si) - } - s.mu.Unlock() } // updateRemoteServer is called when we have an update from a remote server. From 3c4ed549a50aea0c3b04a5de2646384d5e164580 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Sun, 14 May 2023 11:10:25 -0700 Subject: [PATCH 3/6] resolver: improve signaling for missing account lookups (#4151) When using the nats account resolver and a JWT is not found, the client could often get an i/o timeout error due to not receiving a timely response before the account resolver fetch request times out. Now instead of waiting for the fetch request to timeout, a resolver without JWTs will notify as well that it could not find a matching JWT, waiting for a response from all active servers. Also included in this PR is some cleanup to the logs emitted by the resolver. 
Signed-off-by: Waldemar Quevedo --- server/accounts.go | 73 +++++++++++------ server/dirstore.go | 7 ++ server/jwt_test.go | 197 +++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 249 insertions(+), 28 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 8f62104b..5d37d5f8 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -21,6 +21,7 @@ import ( "hash/fnv" "hash/maphash" "io" + "io/fs" "math" "math/rand" "net/http" @@ -3993,17 +3994,19 @@ func (dr *DirAccResolver) Start(s *Server) error { dr.DirJWTStore.changed = func(pubKey string) { if v, ok := s.accounts.Load(pubKey); ok { if theJwt, err := dr.LoadAcc(pubKey); err != nil { - s.Errorf("update got error on load: %v", err) + s.Errorf("DirResolver - Update got error on load: %v", err) } else { acc := v.(*Account) if err = s.updateAccountWithClaimJWT(acc, theJwt); err != nil { - s.Errorf("update resulted in error %v", err) + s.Errorf("DirResolver - Update for account %q resulted in error %v", pubKey, err) } else { if _, jsa, err := acc.checkForJetStream(); err != nil { - s.Warnf("error checking for JetStream enabled error %v", err) + if !IsNatsErr(err, JSNotEnabledForAccountErr) { + s.Warnf("DirResolver - Error checking for JetStream support for account %q: %v", pubKey, err) + } } else if jsa == nil { if err = s.configJetStream(acc); err != nil { - s.Errorf("updated resulted in error when configuring JetStream %v", err) + s.Errorf("DirResolver - Error configuring JetStream for account %q: %v", pubKey, err) } } } @@ -4024,7 +4027,7 @@ func (dr *DirAccResolver) Start(s *Server) error { } else if len(tk) == accUpdateTokensOld { pubKey = tk[accUpdateAccIdxOld] } else { - s.Debugf("jwt update skipped due to bad subject %q", subj) + s.Debugf("DirResolver - jwt update skipped due to bad subject %q", subj) return } if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil { @@ -4074,8 +4077,15 @@ func (dr *DirAccResolver) Start(s *Server) error { if len(tk) != 
accLookupReqTokens { return } - if theJWT, err := dr.DirJWTStore.LoadAcc(tk[accReqAccIndex]); err != nil { - s.Errorf("Merging resulted in error: %v", err) + accName := tk[accReqAccIndex] + if theJWT, err := dr.DirJWTStore.LoadAcc(accName); err != nil { + if errors.Is(err, fs.ErrNotExist) { + s.Debugf("DirResolver - Could not find account %q", accName) + // Reply with empty response to signal absence of JWT to others. + s.sendInternalMsgLocked(reply, _EMPTY_, nil, nil) + } else { + s.Errorf("DirResolver - Error looking up account %q: %v", accName, err) + } } else { s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(theJWT)) } @@ -4083,7 +4093,7 @@ func (dr *DirAccResolver) Start(s *Server) error { return fmt.Errorf("error setting up lookup request handling: %v", err) } // respond to pack requests with one or more pack messages - // an empty message signifies the end of the response responder + // an empty message signifies the end of the response responder. if _, err := s.sysSubscribeQ(accPackReqSubj, "responder", func(_ *subscription, _ *client, _ *Account, _, reply string, theirHash []byte) { if reply == _EMPTY_ { return @@ -4091,14 +4101,14 @@ func (dr *DirAccResolver) Start(s *Server) error { ourHash := dr.DirJWTStore.Hash() if bytes.Equal(theirHash, ourHash[:]) { s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte{}) - s.Debugf("pack request matches hash %x", ourHash[:]) + s.Debugf("DirResolver - Pack request matches hash %x", ourHash[:]) } else if err := dr.DirJWTStore.PackWalk(1, func(partialPackMsg string) { s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(partialPackMsg)) }); err != nil { // let them timeout - s.Errorf("pack request error: %v", err) + s.Errorf("DirResolver - Pack request error: %v", err) } else { - s.Debugf("pack request hash %x - finished responding with hash %x", theirHash, ourHash) + s.Debugf("DirResolver - Pack request hash %x - finished responding with hash %x", theirHash, ourHash) s.sendInternalMsgLocked(reply, _EMPTY_, nil, 
[]byte{}) } }); err != nil { @@ -4119,12 +4129,12 @@ func (dr *DirAccResolver) Start(s *Server) error { if _, err := s.sysSubscribe(packRespIb, func(_ *subscription, _ *client, _ *Account, _, _ string, msg []byte) { hash := dr.DirJWTStore.Hash() if len(msg) == 0 { // end of response stream - s.Debugf("Merging Finished and resulting in: %x", dr.DirJWTStore.Hash()) + s.Debugf("DirResolver - Merging finished and resulting in: %x", dr.DirJWTStore.Hash()) return } else if err := dr.DirJWTStore.Merge(string(msg)); err != nil { - s.Errorf("Merging resulted in error: %v", err) + s.Errorf("DirResolver - Merging resulted in error: %v", err) } else { - s.Debugf("Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash()) + s.Debugf("DirResolver - Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash()) } }); err != nil { return fmt.Errorf("error setting up pack response handling: %v", err) @@ -4142,7 +4152,7 @@ func (dr *DirAccResolver) Start(s *Server) error { case <-ticker.C: } ourHash := dr.DirJWTStore.Hash() - s.Debugf("Checking store state: %x", ourHash) + s.Debugf("DirResolver - Checking store state: %x", ourHash) s.sendInternalMsgLocked(accPackReqSubj, packRespIb, nil, ourHash[:]) } }) @@ -4227,20 +4237,35 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration) s.mu.Unlock() return _EMPTY_, fmt.Errorf("eventing shut down") } + // Resolver will wait for detected active servers to reply + // before serving an error in case there weren't any found. + expectedServers := len(s.sys.servers) replySubj := s.newRespInbox() replies := s.sys.replies + // Store our handler. 
replies[replySubj] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) { - clone := make([]byte, len(msg)) - copy(clone, msg) + var clone []byte + isEmpty := len(msg) == 0 + if !isEmpty { + clone = make([]byte, len(msg)) + copy(clone, msg) + } s.mu.Lock() + defer s.mu.Unlock() + expectedServers-- + // Skip empty responses until getting all the available servers. + if isEmpty && expectedServers > 0 { + return + } + // Use the first valid response if there is still interest or + // one of the empty responses to signal that it was not found. if _, ok := replies[replySubj]; ok { select { - case respC <- clone: // only use first response and only if there is still interest + case respC <- clone: default: } } - s.mu.Unlock() } s.sendInternalMsg(accountLookupRequest, replySubj, nil, []byte{}) quit := s.quitCh @@ -4253,7 +4278,9 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration) case <-time.After(timeout): err = errors.New("fetching jwt timed out") case m := <-respC: - if err = res.Store(name, string(m)); err == nil { + if len(m) == 0 { + err = errors.New("account jwt not found") + } else if err = res.Store(name, string(m)); err == nil { theJWT = string(m) } } @@ -4291,9 +4318,9 @@ func (dr *CacheDirAccResolver) Start(s *Server) error { dr.DirJWTStore.changed = func(pubKey string) { if v, ok := s.accounts.Load(pubKey); !ok { } else if theJwt, err := dr.LoadAcc(pubKey); err != nil { - s.Errorf("update got error on load: %v", err) + s.Errorf("DirResolver - Update got error on load: %v", err) } else if err := s.updateAccountWithClaimJWT(v.(*Account), theJwt); err != nil { - s.Errorf("update resulted in error %v", err) + s.Errorf("DirResolver - Update resulted in error %v", err) } } dr.DirJWTStore.deleted = func(pubKey string) { @@ -4309,7 +4336,7 @@ func (dr *CacheDirAccResolver) Start(s *Server) error { } else if len(tk) == accUpdateTokensOld { pubKey = tk[accUpdateAccIdxOld] } else { - s.Debugf("jwt update cache 
skipped due to bad subject %q", subj) + s.Debugf("DirResolver - jwt update cache skipped due to bad subject %q", subj) return } if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil { diff --git a/server/dirstore.go b/server/dirstore.go index cabd4a19..b39ab9ae 100644 --- a/server/dirstore.go +++ b/server/dirstore.go @@ -288,6 +288,10 @@ func (store *DirJWTStore) PackWalk(maxJWTs int, cb func(partialPackMsg string)) if err != nil { return err } + if len(jwtBytes) == 0 { + // Skip if no contents in the JWT. + return nil + } if exp != nil { claim, err := jwt.DecodeGeneric(string(jwtBytes)) if err == nil && claim.Expires > 0 && claim.Expires < time.Now().Unix() { @@ -406,6 +410,9 @@ func (store *DirJWTStore) load(publicKey string) (string, error) { // write that keeps hash of all jwt in sync // Assumes the lock is held. Does return true or an error never both. func (store *DirJWTStore) write(path string, publicKey string, theJWT string) (bool, error) { + if len(theJWT) == 0 { + return false, fmt.Errorf("invalid JWT") + } var newHash *[sha256.Size]byte if store.expiration != nil { h := sha256.Sum256([]byte(theJWT)) diff --git a/server/jwt_test.go b/server/jwt_test.go index 10858924..58884b97 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -3692,7 +3692,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { listen: 127.0.0.1:-1 no_advertise: true } - `, ojwt, syspub, dirAA))) + `, ojwt, syspub, dirAA))) sAA, _ := RunServerWithConfig(confAA) defer sAA.Shutdown() // Create Server B (using no_advertise to prevent fail over) @@ -3718,7 +3718,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { nats-route://127.0.0.1:%d ] } - `, ojwt, syspub, dirAB, sAA.opts.Cluster.Port))) + `, ojwt, syspub, dirAB, sAA.opts.Cluster.Port))) sAB, _ := RunServerWithConfig(confAB) defer sAB.Shutdown() // Create Server C (using no_advertise to prevent fail over) @@ -3744,10 +3744,10 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t 
*testing.T) { listen: 127.0.0.1:-1 no_advertise: true } - `, ojwt, syspub, dirBA, sAA.opts.Gateway.Port))) + `, ojwt, syspub, dirBA, sAA.opts.Gateway.Port))) sBA, _ := RunServerWithConfig(confBA) defer sBA.Shutdown() - // Create Sever BA (using no_advertise to prevent fail over) + // Create Server BB (using no_advertise to prevent fail over) confBB := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 server_name: srv-B-B @@ -3773,7 +3773,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { {name: "clust-A", url: "nats://127.0.0.1:%d"}, ] } - `, ojwt, syspub, dirBB, sBA.opts.Cluster.Port, sAA.opts.Cluster.Port))) + `, ojwt, syspub, dirBB, sBA.opts.Cluster.Port, sAA.opts.Cluster.Port))) sBB, _ := RunServerWithConfig(confBB) defer sBB.Shutdown() // Assert topology @@ -6592,3 +6592,190 @@ func TestServerOperatorModeNoAuthRequired(t *testing.T) { require_True(t, nc.AuthRequired()) } + +func TestJWTAccountNATSResolverWrongCreds(t *testing.T) { + require_NoLocalOrRemoteConnections := func(account string, srvs ...*Server) { + t.Helper() + for _, srv := range srvs { + if acc, ok := srv.accounts.Load(account); ok { + checkAccClientsCount(t, acc.(*Account), 0) + } + } + } + connect := func(url string, credsfile string, acc string, srvs ...*Server) { + t.Helper() + nc := natsConnect(t, url, nats.UserCredentials(credsfile), nats.Timeout(5*time.Second)) + nc.Close() + require_NoLocalOrRemoteConnections(acc, srvs...) 
+ } + createAccountAndUser := func(limit bool, done chan struct{}, pubKey, jwt1, jwt2, creds *string) { + t.Helper() + kp, _ := nkeys.CreateAccount() + *pubKey, _ = kp.PublicKey() + claim := jwt.NewAccountClaims(*pubKey) + var err error + *jwt1, err = claim.Encode(oKp) + require_NoError(t, err) + *jwt2, err = claim.Encode(oKp) + require_NoError(t, err) + ukp, _ := nkeys.CreateUser() + seed, _ := ukp.Seed() + upub, _ := ukp.PublicKey() + uclaim := newJWTTestUserClaims() + uclaim.Subject = upub + ujwt, err := uclaim.Encode(kp) + require_NoError(t, err) + *creds = genCredsFile(t, ujwt, seed) + done <- struct{}{} + } + // Create Accounts and corresponding user creds. + doneChan := make(chan struct{}, 4) + defer close(doneChan) + var syspub, sysjwt, dummy1, sysCreds string + createAccountAndUser(false, doneChan, &syspub, &sysjwt, &dummy1, &sysCreds) + + var apub, ajwt1, ajwt2, aCreds string + createAccountAndUser(true, doneChan, &apub, &ajwt1, &ajwt2, &aCreds) + + var bpub, bjwt1, bjwt2, bCreds string + createAccountAndUser(true, doneChan, &bpub, &bjwt1, &bjwt2, &bCreds) + + // The one that is going to be missing. + var cpub, cjwt1, cjwt2, cCreds string + createAccountAndUser(true, doneChan, &cpub, &cjwt1, &cjwt2, &cCreds) + for i := 0; i < cap(doneChan); i++ { + <-doneChan + } + // Create one directory for each server + dirA := t.TempDir() + dirB := t.TempDir() + dirC := t.TempDir() + + // Store accounts on servers A and B, then let C sync on its own. 
+ writeJWT(t, dirA, apub, ajwt1) + writeJWT(t, dirB, bpub, bjwt1) + + ///////////////////////////////////////// + // // + // Server A: has creds from client A // + // // + ///////////////////////////////////////// + confA := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + server_name: srv-A + operator: %s + system_account: %s + debug: true + resolver: { + type: full + dir: '%s' + allow_delete: true + timeout: "1.5s" + interval: "200ms" + } + resolver_preload: { + %s: %s + } + cluster { + name: clust + listen: 127.0.0.1:-1 + no_advertise: true + } + `, ojwt, syspub, dirA, apub, ajwt1))) + sA, _ := RunServerWithConfig(confA) + defer sA.Shutdown() + require_JWTPresent(t, dirA, apub) + + ///////////////////////////////////////// + // // + // Server B: has creds from client B // + // // + ///////////////////////////////////////// + confB := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + server_name: srv-B + operator: %s + system_account: %s + resolver: { + type: full + dir: '%s' + allow_delete: true + timeout: "1.5s" + interval: "200ms" + } + cluster { + name: clust + listen: 127.0.0.1:-1 + no_advertise: true + routes [ + nats-route://127.0.0.1:%d + ] + } + `, ojwt, syspub, dirB, sA.opts.Cluster.Port))) + sB, _ := RunServerWithConfig(confB) + defer sB.Shutdown() + + ///////////////////////////////////////// + // // + // Server C: has no creds // + // // + ///////////////////////////////////////// + fmtC := ` + listen: 127.0.0.1:-1 + server_name: srv-C + operator: %s + system_account: %s + resolver: { + type: full + dir: '%s' + allow_delete: true + timeout: "1.5s" + interval: "200ms" + } + cluster { + name: clust + listen: 127.0.0.1:-1 + no_advertise: true + routes [ + nats-route://127.0.0.1:%d + ] + } + ` + confClongTTL := createConfFile(t, []byte(fmt.Sprintf(fmtC, ojwt, syspub, dirC, sA.opts.Cluster.Port))) + sC, _ := RunServerWithConfig(confClongTTL) // use long ttl to assure it is not kicking + defer sC.Shutdown() + + // startup 
cluster + checkClusterFormed(t, sA, sB, sC) + time.Sleep(1 * time.Second) // wait for the protocol to converge + // // Check all accounts + require_JWTPresent(t, dirA, apub) // was already present on startup + require_JWTPresent(t, dirB, apub) // was copied from server A + require_JWTPresent(t, dirA, bpub) // was copied from server B + require_JWTPresent(t, dirB, bpub) // was already present on startup + + // There should be no state about the missing account. + require_JWTAbsent(t, dirA, cpub) + require_JWTAbsent(t, dirB, cpub) + require_JWTAbsent(t, dirC, cpub) + + // system account client can connect to every server + connect(sA.ClientURL(), sysCreds, "") + connect(sB.ClientURL(), sysCreds, "") + connect(sC.ClientURL(), sysCreds, "") + + // A and B clients can connect to any server. + connect(sA.ClientURL(), aCreds, "") + connect(sB.ClientURL(), aCreds, "") + connect(sC.ClientURL(), aCreds, "") + connect(sA.ClientURL(), bCreds, "") + connect(sB.ClientURL(), bCreds, "") + connect(sC.ClientURL(), bCreds, "") + + // Check that trying to connect with bad credentials should not hang until the fetch timeout + // and instead return a faster response when an account is not found. + _, err := nats.Connect(sC.ClientURL(), nats.UserCredentials(cCreds), nats.Timeout(500*time.Second)) + if err != nil && !errors.Is(err, nats.ErrAuthorization) { + t.Fatalf("Expected auth error: %v", err) + } +} From 832df1cdbada8226832e1314fb56e42344bec01a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 15 May 2023 14:38:26 -0700 Subject: [PATCH 4/6] Protect against out of bounds access on usage updates. 
Signed-off-by: Derek Collison --- server/jetstream.go | 90 ++++++++++++++++++++++++++++----------------- 1 file changed, 56 insertions(+), 34 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index 1f663708..50cc8a7d 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1714,14 +1714,13 @@ func (a *Account) JetStreamEnabled() bool { } func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account, subject, _ string, msg []byte) { - const usageSize = 32 - // jsa.js.srv is immutable and guaranteed to no be nil, so no lock needed. s := jsa.js.srv jsa.usageMu.Lock() - if len(msg) < usageSize { - jsa.usageMu.Unlock() + defer jsa.usageMu.Unlock() + + if len(msg) < minUsageUpdateLen { s.Warnf("Ignoring remote usage update with size too short") return } @@ -1730,7 +1729,6 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account rnode = subject[li+1:] } if rnode == _EMPTY_ { - jsa.usageMu.Unlock() s.Warnf("Received remote usage update with no remote node") return } @@ -1765,21 +1763,31 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account apiTotal, apiErrors := le.Uint64(msg[16:]), le.Uint64(msg[24:]) memUsed, storeUsed := int64(le.Uint64(msg[0:])), int64(le.Uint64(msg[8:])) - // we later extended the data structure to support multiple tiers - excessRecordCnt := uint32(0) - tierName := _EMPTY_ - if len(msg) >= 44 { - excessRecordCnt = le.Uint32(msg[32:]) - length := le.Uint64(msg[36:]) - tierName = string(msg[44 : 44+length]) - msg = msg[44+length:] + // We later extended the data structure to support multiple tiers + var excessRecordCnt uint32 + var tierName string + + if len(msg) >= usageMultiTiersLen { + excessRecordCnt = le.Uint32(msg[minUsageUpdateLen:]) + length := le.Uint64(msg[minUsageUpdateLen+4:]) + // Need to protect past this point in case this is wrong. 
+ if uint64(len(msg)) < usageMultiTiersLen+length { + s.Warnf("Received corrupt remote usage update") + return + } + tierName = string(msg[usageMultiTiersLen : usageMultiTiersLen+length]) + msg = msg[usageMultiTiersLen+length:] } updateTotal(tierName, memUsed, storeUsed) - for ; excessRecordCnt > 0 && len(msg) >= 24; excessRecordCnt-- { + for ; excessRecordCnt > 0 && len(msg) >= usageRecordLen; excessRecordCnt-- { memUsed, storeUsed := int64(le.Uint64(msg[0:])), int64(le.Uint64(msg[8:])) length := le.Uint64(msg[16:]) - tierName = string(msg[24 : 24+length]) - msg = msg[24+length:] + if uint64(len(msg)) < usageRecordLen+length { + s.Warnf("Received corrupt remote usage update on excess record") + return + } + tierName = string(msg[usageRecordLen : usageRecordLen+length]) + msg = msg[usageRecordLen+length:] updateTotal(tierName, memUsed, storeUsed) } jsa.apiTotal -= rUsage.api @@ -1788,7 +1796,6 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account rUsage.err = apiErrors jsa.apiTotal += apiTotal jsa.apiErrors += apiErrors - jsa.usageMu.Unlock() } // When we detect a skew of some sort this will verify the usage reporting is correct. @@ -1906,12 +1913,22 @@ func (jsa *jsAccount) sendClusterUsageUpdateTimer() { } } +// For usage fields. +const ( + minUsageUpdateLen = 32 + stackUsageUpdate = 72 + usageRecordLen = 24 + usageMultiTiersLen = 44 + apiStatsAndNumTiers = 20 + minUsageUpdateWindow = 250 * time.Millisecond +) + // Send updates to our account usage for this server. // jsa.usageMu lock should be held. func (jsa *jsAccount) sendClusterUsageUpdate() { // These values are absolute so we can limit send rates. 
now := time.Now() - if now.Sub(jsa.lupdate) < 250*time.Millisecond { + if now.Sub(jsa.lupdate) < minUsageUpdateWindow { return } jsa.lupdate = now @@ -1921,32 +1938,37 @@ func (jsa *jsAccount) sendClusterUsageUpdate() { return } // every base record contains mem/store/len(tier) as well as the tier name - l := 24 * lenUsage + l := usageRecordLen * lenUsage for tier := range jsa.usage { l += len(tier) } - if lenUsage > 0 { - // first record contains api/usage errors as well as count for extra base records - l += 20 - } - var le = binary.LittleEndian - b := make([]byte, l) - i := 0 + // first record contains api/usage errors as well as count for extra base records + l += apiStatsAndNumTiers + var raw [stackUsageUpdate]byte + var b []byte + if l > stackUsageUpdate { + b = make([]byte, l) + } else { + b = raw[:l] + } + + var i int + var le = binary.LittleEndian for tier, usage := range jsa.usage { le.PutUint64(b[i+0:], uint64(usage.local.mem)) le.PutUint64(b[i+8:], uint64(usage.local.store)) if i == 0 { - le.PutUint64(b[i+16:], jsa.usageApi) - le.PutUint64(b[i+24:], jsa.usageErr) - le.PutUint32(b[i+32:], uint32(len(jsa.usage)-1)) - le.PutUint64(b[i+36:], uint64(len(tier))) - copy(b[i+44:], tier) - i += 44 + len(tier) + le.PutUint64(b[16:], jsa.usageApi) + le.PutUint64(b[24:], jsa.usageErr) + le.PutUint32(b[32:], uint32(len(jsa.usage)-1)) + le.PutUint64(b[36:], uint64(len(tier))) + copy(b[usageMultiTiersLen:], tier) + i = usageMultiTiersLen + len(tier) } else { le.PutUint64(b[i+16:], uint64(len(tier))) - copy(b[i+24:], tier) - i += 24 + len(tier) + copy(b[i+usageRecordLen:], tier) + i += usageRecordLen + len(tier) } } jsa.sendq.push(newPubMsg(nil, jsa.updatesPub, _EMPTY_, nil, nil, b, noCompression, false, false)) From 3602ff51467e758a0fb49890fca5ffc0626e3573 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 15 May 2023 15:30:55 -0700 Subject: [PATCH 5/6] Additional fix for #3734. 
When the first block was truncated and missing any index info we would not properly rebuild the state. Signed-off-by: Derek Collison --- server/filestore.go | 37 +++++++++++++++++++------ server/jetstream_test.go | 59 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 9 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 8b91966f..8fd7b20d 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -989,6 +989,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { mb.last.seq, mb.last.ts = 0, 0 firstNeedsSet := true + // Remove the .fss file from disk. + mb.removePerSubjectInfoLocked() + // Check if we need to decrypt. if mb.bek != nil && len(buf) > 0 { // Recreate to reset counter. @@ -1186,6 +1189,13 @@ func (fs *fileStore) recoverMsgs() error { return err } if mb, err := fs.recoverMsgBlock(finfo, index); err == nil && mb != nil { + // This is a truncated block with possibly no index. If the OS got shut down + // out from underneath us this is possible. + if mb.first.seq == 0 { + mb.dirtyCloseWithRemove(true) + fs.removeMsgBlockFromList(mb) + continue + } if fs.state.FirstSeq == 0 || mb.first.seq < fs.state.FirstSeq { fs.state.FirstSeq = mb.first.seq fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC() @@ -2468,12 +2478,16 @@ func (fs *fileStore) rebuildFirst() { if len(fs.blks) == 0 { return } - if fmb := fs.blks[0]; fmb != nil { - fmb.removeIndexFile() - fmb.rebuildState() - fmb.writeIndexInfo() - fs.selectNextFirst() + fmb := fs.blks[0] + if fmb == nil { + return } + + fmb.removeIndexFile() + ld, _ := fmb.rebuildState() + fmb.writeIndexInfo() + fs.selectNextFirst() + fs.rebuildStateLocked(ld) } // Optimized helper function to return first sequence. @@ -5667,11 +5681,9 @@ func (fs *fileStore) addMsgBlock(mb *msgBlock) { fs.bim[mb.index] = mb } -// Removes the msgBlock +// Remove from our list of blks. // Both locks should be held. 
-func (fs *fileStore) removeMsgBlock(mb *msgBlock) { - mb.dirtyCloseWithRemove(true) - +func (fs *fileStore) removeMsgBlockFromList(mb *msgBlock) { // Remove from list. for i, omb := range fs.blks { if mb == omb { @@ -5683,6 +5695,13 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) { break } } +} + +// Removes the msgBlock +// Both locks should be held. +func (fs *fileStore) removeMsgBlock(mb *msgBlock) { + mb.dirtyCloseWithRemove(true) + fs.removeMsgBlockFromList(mb) // Check for us being last message block if mb == fs.lmb { // Creating a new message write block requires that the lmb lock is not held. diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 94954eaa..c788b1a7 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -20002,3 +20002,62 @@ func TestJetStreamSnapshotRestoreStallAndHealthz(t *testing.T) { t.Fatalf("Expected health to be ok, got %+v", hs) } } + +// https://github.com/nats-io/nats-server/pull/4163 +func TestJetStreamMaxBytesIgnored(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + MaxBytes: 10 * 1024 * 1024, + }) + require_NoError(t, err) + + msg := bytes.Repeat([]byte("A"), 1024*1024) + + for i := 0; i < 10; i++ { + _, err := js.Publish("x", msg) + require_NoError(t, err) + } + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_True(t, si.State.Msgs == 9) + + // Stop current + sd := s.JetStreamConfig().StoreDir + s.Shutdown() + + // We will remove the idx file and truncate the blk and fss files. 
+ mdir := filepath.Join(sd, "$G", "streams", "TEST", "msgs") + // Remove idx + err = os.Remove(filepath.Join(mdir, "1.idx")) + require_NoError(t, err) + // Truncate fss + err = os.WriteFile(filepath.Join(mdir, "1.fss"), nil, defaultFilePerms) + require_NoError(t, err) + // Truncate blk + err = os.WriteFile(filepath.Join(mdir, "1.blk"), nil, defaultFilePerms) + require_NoError(t, err) + + // Restart. + s = RunJetStreamServerOnPort(-1, sd) + defer s.Shutdown() + + nc, js = jsClientConnect(t, s) + defer nc.Close() + + for i := 0; i < 10; i++ { + _, err := js.Publish("x", msg) + require_NoError(t, err) + } + + si, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_True(t, si.State.Bytes <= 10*1024*1024) +} From ee38f8bbc550467c988affe99c4e7d85442b2a1e Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Mon, 15 May 2023 15:33:57 -0700 Subject: [PATCH 6/6] monitor: change account detail info back to utc when served (#4163) Signed-off-by: Waldemar Quevedo --- server/monitor.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/monitor.go b/server/monitor.go index 66bf96ca..8e0c11a6 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -788,7 +788,7 @@ type RouteInfo struct { // Routez returns a Routez struct containing information about routes. func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) { rs := &Routez{Routes: []*RouteInfo{}} - rs.Now = time.Now() + rs.Now = time.Now().UTC() if routezOpts == nil { routezOpts = &RoutezOptions{} @@ -975,7 +975,7 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) { slStats := &SublistStats{} // FIXME(dlc) - Make account aware. 
- sz := &Subsz{s.info.ID, time.Now(), slStats, 0, offset, limit, nil} + sz := &Subsz{s.info.ID, time.Now().UTC(), slStats, 0, offset, limit, nil} if subdetail { var raw [4096]*subscription @@ -1522,7 +1522,7 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz { Compression: ws.Compression, HandshakeTimeout: ws.HandshakeTimeout, }, - Start: s.start, + Start: s.start.UTC(), MaxSubs: opts.MaxSubs, Cores: runtime.NumCPU(), MaxProcs: runtime.GOMAXPROCS(0), @@ -2606,7 +2606,7 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) { } return &AccountInfo{ accName, - a.updated, + a.updated.UTC(), isSys, a.expired, !a.incomplete,