mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Add in NumSubjects to StreamInfo
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019-2021 The NATS Authors
|
||||
// Copyright 2019-2022 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -89,6 +89,7 @@ type fileStore struct {
|
||||
aek cipher.AEAD
|
||||
lmb *msgBlock
|
||||
blks []*msgBlock
|
||||
psmc map[string]uint64
|
||||
hh hash.Hash64
|
||||
qch chan struct{}
|
||||
cfs []*consumerFileStore
|
||||
@@ -280,6 +281,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
|
||||
fs := &fileStore{
|
||||
fcfg: fcfg,
|
||||
cfg: FileStreamInfo{Created: created, StreamConfig: cfg},
|
||||
psmc: make(map[string]uint64),
|
||||
prf: prf,
|
||||
qch: make(chan struct{}),
|
||||
}
|
||||
@@ -879,6 +881,13 @@ func (fs *fileStore) recoverMsgs() error {
|
||||
}
|
||||
fs.state.Msgs += mb.msgs
|
||||
fs.state.Bytes += mb.bytes
|
||||
// Walk the fss for this mb and fill in fs.psmc
|
||||
for subj, ss := range mb.fss {
|
||||
if len(subj) > 0 {
|
||||
fs.psmc[subj] += ss.Msgs
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
@@ -988,6 +997,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
|
||||
mb.msgs--
|
||||
purged++
|
||||
// Update fss
|
||||
fs.removePerSubject(sm.subj)
|
||||
mb.removeSeqPerSubject(sm.subj, seq)
|
||||
}
|
||||
|
||||
@@ -1528,6 +1538,11 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
|
||||
return err
|
||||
}
|
||||
|
||||
// Adjust top level tracking of per subject msg counts.
|
||||
if len(subj) > 0 {
|
||||
fs.psmc[subj]++
|
||||
}
|
||||
|
||||
// Adjust first if needed.
|
||||
now := time.Unix(0, ts).UTC()
|
||||
if fs.state.Msgs == 0 {
|
||||
@@ -1724,6 +1739,19 @@ func (fs *fileStore) EraseMsg(seq uint64) (bool, error) {
|
||||
return fs.removeMsg(seq, true, true)
|
||||
}
|
||||
|
||||
// Convenience function to remove per subject tracking at the filestore level.
|
||||
// Lock should be held.
|
||||
func (fs *fileStore) removePerSubject(subj string) {
|
||||
if len(subj) == 0 {
|
||||
return
|
||||
}
|
||||
if n, ok := fs.psmc[subj]; ok && n == 1 {
|
||||
delete(fs.psmc, subj)
|
||||
} else if ok {
|
||||
fs.psmc[subj]--
|
||||
}
|
||||
}
|
||||
|
||||
// Remove a message, optionally rewriting the mb file.
|
||||
func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error) {
|
||||
fsLock := func() {
|
||||
@@ -1811,6 +1839,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
|
||||
mb.bytes -= msz
|
||||
|
||||
// If we are tracking multiple subjects here make sure we update that accounting.
|
||||
fs.removePerSubject(sm.subj)
|
||||
mb.removeSeqPerSubject(sm.subj, seq)
|
||||
|
||||
var shouldWriteIndex, firstSeqNeedsUpdate bool
|
||||
@@ -3455,7 +3484,7 @@ func (fs *fileStore) FastState(state *StreamState) {
|
||||
}
|
||||
}
|
||||
state.Consumers = len(fs.cfs)
|
||||
|
||||
state.NumSubjects = len(fs.psmc)
|
||||
fs.mu.RUnlock()
|
||||
}
|
||||
|
||||
@@ -3464,6 +3493,7 @@ func (fs *fileStore) State() StreamState {
|
||||
fs.mu.RLock()
|
||||
state := fs.state
|
||||
state.Consumers = len(fs.cfs)
|
||||
state.NumSubjects = len(fs.psmc)
|
||||
state.Deleted = nil // make sure.
|
||||
|
||||
for _, mb := range fs.blks {
|
||||
@@ -3825,6 +3855,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
|
||||
mb.msgs--
|
||||
mb.bytes -= rl
|
||||
// FSS updates.
|
||||
fs.removePerSubject(sm.subj)
|
||||
mb.removeSeqPerSubject(sm.subj, seq)
|
||||
// Check for first message.
|
||||
if seq == mb.first.seq {
|
||||
@@ -3930,6 +3961,9 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
|
||||
|
||||
fs.lmb.writeIndexInfo()
|
||||
|
||||
// Clear any per subject tracking.
|
||||
fs.psmc = make(map[string]uint64)
|
||||
|
||||
cb := fs.scb
|
||||
fs.mu.Unlock()
|
||||
|
||||
@@ -3994,6 +4028,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
|
||||
smb.msgs--
|
||||
purged++
|
||||
// Update fss
|
||||
fs.removePerSubject(sm.subj)
|
||||
smb.removeSeqPerSubject(sm.subj, mseq)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019-2021 The NATS Authors
|
||||
// Copyright 2019-2022 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -8420,7 +8420,7 @@ func TestJetStreamDeleteMsg(t *testing.T) {
|
||||
t.Fatalf("Expected to get the stream back")
|
||||
}
|
||||
|
||||
expected := StreamState{Msgs: 6, Bytes: 6 * bytesPerMsg, FirstSeq: 12, LastSeq: 20}
|
||||
expected := StreamState{Msgs: 6, Bytes: 6 * bytesPerMsg, FirstSeq: 12, LastSeq: 20, NumSubjects: 1}
|
||||
state = mset.state()
|
||||
state.FirstTime, state.LastTime, state.Deleted, state.NumDeleted = time.Time{}, time.Time{}, nil, 0
|
||||
|
||||
@@ -14656,7 +14656,7 @@ func TestJetStreamStreamInfoSubjectsDetails(t *testing.T) {
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
getInfo := func(filter string) *StreamInfo {
|
||||
getInfo := func(t *testing.T, filter string) *StreamInfo {
|
||||
t.Helper()
|
||||
// Need to grab StreamInfo by hand for now.
|
||||
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: filter})
|
||||
@@ -14666,6 +14666,9 @@ func TestJetStreamStreamInfoSubjectsDetails(t *testing.T) {
|
||||
var si StreamInfo
|
||||
err = json.Unmarshal(resp.Data, &si)
|
||||
require_NoError(t, err)
|
||||
if si.State.NumSubjects != 3 {
|
||||
t.Fatalf("Expected NumSubjects to be 3, but got %d", si.State.NumSubjects)
|
||||
}
|
||||
return &si
|
||||
}
|
||||
|
||||
@@ -14689,15 +14692,15 @@ func TestJetStreamStreamInfoSubjectsDetails(t *testing.T) {
|
||||
|
||||
// Test all subjects first.
|
||||
expected := map[string]uint64{"foo": 22, "bar": 33, "baz": 44}
|
||||
if si := getInfo(nats.AllKeys); !reflect.DeepEqual(si.State.Subjects, expected) {
|
||||
if si := getInfo(t, nats.AllKeys); !reflect.DeepEqual(si.State.Subjects, expected) {
|
||||
t.Fatalf("Expected subjects of %+v, but got %+v", expected, si.State.Subjects)
|
||||
}
|
||||
if si := getInfo("*"); !reflect.DeepEqual(si.State.Subjects, expected) {
|
||||
if si := getInfo(t, "*"); !reflect.DeepEqual(si.State.Subjects, expected) {
|
||||
t.Fatalf("Expected subjects of %+v, but got %+v", expected, si.State.Subjects)
|
||||
}
|
||||
// Filtered to 1.
|
||||
expected = map[string]uint64{"foo": 22}
|
||||
if si := getInfo("foo"); !reflect.DeepEqual(si.State.Subjects, expected) {
|
||||
if si := getInfo(t, "foo"); !reflect.DeepEqual(si.State.Subjects, expected) {
|
||||
t.Fatalf("Expected subjects of %+v, but got %+v", expected, si.State.Subjects)
|
||||
}
|
||||
}
|
||||
@@ -14706,6 +14709,156 @@ func TestJetStreamStreamInfoSubjectsDetails(t *testing.T) {
|
||||
t.Run("FileStore", func(t *testing.T) { testSubjects(t, nats.FileStorage) })
|
||||
}
|
||||
|
||||
func TestJetStreamStreamInfoSubjectsDetailsWithDeleteAndPurge(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
getInfo := func(t *testing.T, filter string) *StreamInfo {
|
||||
t.Helper()
|
||||
// Need to grab StreamInfo by hand for now.
|
||||
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: filter})
|
||||
require_NoError(t, err)
|
||||
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, time.Second)
|
||||
require_NoError(t, err)
|
||||
var si StreamInfo
|
||||
err = json.Unmarshal(resp.Data, &si)
|
||||
require_NoError(t, err)
|
||||
return &si
|
||||
}
|
||||
|
||||
checkResults := func(t *testing.T, expected map[string]uint64) {
|
||||
t.Helper()
|
||||
si := getInfo(t, nats.AllKeys)
|
||||
if !reflect.DeepEqual(si.State.Subjects, expected) {
|
||||
t.Fatalf("Expected subjects of %+v, but got %+v", expected, si.State.Subjects)
|
||||
}
|
||||
if si.State.NumSubjects != len(expected) {
|
||||
t.Fatalf("Expected NumSubjects to be %d, but got %d", len(expected), si.State.NumSubjects)
|
||||
}
|
||||
}
|
||||
|
||||
testSubjects := func(t *testing.T, st nats.StorageType) {
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"*"},
|
||||
Storage: st,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
defer js.DeleteStream("TEST")
|
||||
|
||||
msg := []byte("ok")
|
||||
js.Publish("foo", msg) // 1
|
||||
js.Publish("foo", msg) // 2
|
||||
js.Publish("bar", msg) // 3
|
||||
js.Publish("baz", msg) // 4
|
||||
js.Publish("baz", msg) // 5
|
||||
js.Publish("bar", msg) // 6
|
||||
js.Publish("bar", msg) // 7
|
||||
|
||||
checkResults(t, map[string]uint64{"foo": 2, "bar": 3, "baz": 2})
|
||||
|
||||
// Now delete some messages.
|
||||
js.DeleteMsg("TEST", 6)
|
||||
|
||||
checkResults(t, map[string]uint64{"foo": 2, "bar": 2, "baz": 2})
|
||||
|
||||
// Delete and add right back, so no-op
|
||||
js.DeleteMsg("TEST", 5) // baz
|
||||
js.Publish("baz", msg) // 8
|
||||
|
||||
checkResults(t, map[string]uint64{"foo": 2, "bar": 2, "baz": 2})
|
||||
|
||||
// Now do a purge only of bar.
|
||||
jr, _ := json.Marshal(&JSApiStreamPurgeRequest{Subject: "bar"})
|
||||
_, err = nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "TEST"), jr, time.Second)
|
||||
require_NoError(t, err)
|
||||
|
||||
checkResults(t, map[string]uint64{"foo": 2, "baz": 2})
|
||||
|
||||
// Now purge everything
|
||||
err = js.PurgeStream("TEST")
|
||||
require_NoError(t, err)
|
||||
|
||||
si := getInfo(t, nats.AllKeys)
|
||||
if len(si.State.Subjects) != 0 {
|
||||
t.Fatalf("Expected no subjects, but got %+v", si.State.Subjects)
|
||||
}
|
||||
if si.State.NumSubjects != 0 {
|
||||
t.Fatalf("Expected NumSubjects to be 0, but got %d", si.State.NumSubjects)
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("MemoryStore", func(t *testing.T) { testSubjects(t, nats.MemoryStorage) })
|
||||
t.Run("FileStore", func(t *testing.T) { testSubjects(t, nats.FileStorage) })
|
||||
}
|
||||
|
||||
func TestJetStreamStreamInfoSubjectsDetailsAfterRestart(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
getInfo := func(t *testing.T, filter string) *StreamInfo {
|
||||
t.Helper()
|
||||
// Need to grab StreamInfo by hand for now.
|
||||
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: filter})
|
||||
require_NoError(t, err)
|
||||
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, time.Second)
|
||||
require_NoError(t, err)
|
||||
var si StreamInfo
|
||||
err = json.Unmarshal(resp.Data, &si)
|
||||
require_NoError(t, err)
|
||||
return &si
|
||||
}
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"*"},
|
||||
})
|
||||
require_NoError(t, err)
|
||||
defer js.DeleteStream("TEST")
|
||||
|
||||
msg := []byte("ok")
|
||||
js.Publish("foo", msg) // 1
|
||||
js.Publish("foo", msg) // 2
|
||||
js.Publish("bar", msg) // 3
|
||||
js.Publish("baz", msg) // 4
|
||||
js.Publish("baz", msg) // 5
|
||||
|
||||
si := getInfo(t, nats.AllKeys)
|
||||
if si.State.NumSubjects != 3 {
|
||||
t.Fatalf("Expected 3 subjects, but got %d", si.State.NumSubjects)
|
||||
}
|
||||
|
||||
// Stop current
|
||||
nc.Close()
|
||||
sd := s.JetStreamConfig().StoreDir
|
||||
s.Shutdown()
|
||||
// Restart.
|
||||
s = RunJetStreamServerOnPort(-1, sd)
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, _ = jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
si = getInfo(t, nats.AllKeys)
|
||||
if si.State.NumSubjects != 3 {
|
||||
t.Fatalf("Expected 3 subjects, but got %d", si.State.NumSubjects)
|
||||
}
|
||||
}
|
||||
|
||||
// Issue #2836
|
||||
func TestJetStreamInterestRetentionBug(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
|
||||
@@ -754,6 +754,7 @@ func (ms *memStore) FastState(state *StreamState) {
|
||||
state.NumDeleted = int((state.LastSeq - state.FirstSeq) - state.Msgs + 1)
|
||||
}
|
||||
state.Consumers = ms.consumers
|
||||
state.NumSubjects = len(ms.fss)
|
||||
ms.mu.RUnlock()
|
||||
}
|
||||
|
||||
@@ -763,6 +764,7 @@ func (ms *memStore) State() StreamState {
|
||||
|
||||
state := ms.state
|
||||
state.Consumers = ms.consumers
|
||||
state.NumSubjects = len(ms.fss)
|
||||
state.Deleted = nil
|
||||
|
||||
// Calculate interior delete details.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019-2021 The NATS Authors
|
||||
// Copyright 2019-2022 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -119,17 +119,18 @@ const (
|
||||
|
||||
// StreamState is information about the given stream.
|
||||
type StreamState struct {
|
||||
Msgs uint64 `json:"messages"`
|
||||
Bytes uint64 `json:"bytes"`
|
||||
FirstSeq uint64 `json:"first_seq"`
|
||||
FirstTime time.Time `json:"first_ts"`
|
||||
LastSeq uint64 `json:"last_seq"`
|
||||
LastTime time.Time `json:"last_ts"`
|
||||
Subjects map[string]uint64 `json:"subjects,omitempty"`
|
||||
NumDeleted int `json:"num_deleted,omitempty"`
|
||||
Deleted []uint64 `json:"deleted,omitempty"`
|
||||
Lost *LostStreamData `json:"lost,omitempty"`
|
||||
Consumers int `json:"consumer_count"`
|
||||
Msgs uint64 `json:"messages"`
|
||||
Bytes uint64 `json:"bytes"`
|
||||
FirstSeq uint64 `json:"first_seq"`
|
||||
FirstTime time.Time `json:"first_ts"`
|
||||
LastSeq uint64 `json:"last_seq"`
|
||||
LastTime time.Time `json:"last_ts"`
|
||||
NumSubjects int `json:"num_subjects,omitempty"`
|
||||
Subjects map[string]uint64 `json:"subjects,omitempty"`
|
||||
NumDeleted int `json:"num_deleted,omitempty"`
|
||||
Deleted []uint64 `json:"deleted,omitempty"`
|
||||
Lost *LostStreamData `json:"lost,omitempty"`
|
||||
Consumers int `json:"consumer_count"`
|
||||
}
|
||||
|
||||
// SimpleState for filtered subject specific state.
|
||||
|
||||
Reference in New Issue
Block a user