Improved performance of subjects details for stream info.

This version avoids all disk IO in the filestore version.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-02-24 17:22:18 -08:00
parent 87a1846822
commit 24c2f3b452
6 changed files with 257 additions and 37 deletions

View File

@@ -1746,10 +1746,12 @@ func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) {
if !isAll {
wc := subjectHasWildcard(filter)
// Do start
if mb := fs.bim[start]; mb != nil {
mb := fs.bim[start]
if mb != nil {
_, f, _ := mb.filteredPending(filter, wc, 0)
ss.First = f
} else {
}
if ss.First == 0 {
// This is a miss. This can happen since psi.fblk is lazy, but should be very rare.
for i := start + 1; i <= stop; i++ {
mb := fs.bim[i]
@@ -1763,7 +1765,7 @@ func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) {
}
}
// Now last
if mb := fs.bim[stop]; mb != nil {
if mb = fs.bim[stop]; mb != nil {
_, _, l := mb.filteredPending(filter, wc, 0)
ss.Last = l
}
@@ -1826,6 +1828,33 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
return fss
}
// SubjectsTotal return message totals per subject.
func (fs *fileStore) SubjectsTotals(filterSubject string) map[string]uint64 {
fs.mu.RLock()
defer fs.mu.RUnlock()
if len(fs.psim) == 0 {
return nil
}
tsa := [32]string{}
fsa := [32]string{}
fts := tokenizeSubjectIntoSlice(fsa[:0], filterSubject)
isAll := filterSubject == _EMPTY_ || filterSubject == fwcs
fst := make(map[string]uint64)
for subj, psi := range fs.psim {
if isAll {
fst[subj] = psi.total
} else {
if tts := tokenizeSubjectIntoSlice(tsa[:0], subj); isSubsetMatchTokenized(tts, fts) {
fst[subj] = psi.total
}
}
}
return fst
}
// RegisterStorageUpdates registers a callback for updates to storage changes.
// It will present number of messages and bytes as a signed integer and an
// optional sequence number of the message if a single.

View File

@@ -5274,3 +5274,83 @@ func TestFileStoreOnlyWritePerSubjectInfoOnExpireWithUpdate(t *testing.T) {
require_False(t, needsUpdate())
})
}
func TestFileStoreSubjectsTotals(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fs, err := newFileStore(
fcfg,
StreamConfig{Name: "zzz", Subjects: []string{"*.*"}, Storage: FileStorage},
)
require_NoError(t, err)
defer fs.Stop()
fmap := make(map[int]int)
bmap := make(map[int]int)
var m map[int]int
var ft string
for i := 0; i < 10_000; i++ {
// Flip coin for prefix
if rand.Intn(2) == 0 {
ft, m = "foo", fmap
} else {
ft, m = "bar", bmap
}
dt := rand.Intn(100)
subj := fmt.Sprintf("%s.%d", ft, dt)
m[dt]++
_, _, err := fs.StoreMsg(subj, nil, []byte("Hello World"))
require_NoError(t, err)
}
// Now test SubjectsTotal
for dt, total := range fmap {
subj := fmt.Sprintf("foo.%d", dt)
m := fs.SubjectsTotals(subj)
if m[subj] != uint64(total) {
t.Fatalf("Expected %q to have %d total, got %d", subj, total, m[subj])
}
}
// Check fmap.
if st := fs.SubjectsTotals("foo.*"); len(st) != len(fmap) {
t.Fatalf("Expected %d subjects for %q, got %d", len(fmap), "foo.*", len(st))
} else {
expected := 0
for _, n := range fmap {
expected += n
}
received := uint64(0)
for _, n := range st {
received += n
}
if received != uint64(expected) {
t.Fatalf("Expected %d total but got %d", expected, received)
}
}
// Check bmap.
if st := fs.SubjectsTotals("bar.*"); len(st) != len(bmap) {
t.Fatalf("Expected %d subjects for %q, got %d", len(bmap), "bar.*", len(st))
} else {
expected := 0
for _, n := range bmap {
expected += n
}
received := uint64(0)
for _, n := range st {
received += n
}
if received != uint64(expected) {
t.Fatalf("Expected %d total but got %d", expected, received)
}
}
// All with pwc match.
if st, expected := fs.SubjectsTotals("*.*"), len(bmap)+len(fmap); len(st) != expected {
t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st))
}
})
}

View File

@@ -1842,42 +1842,44 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
// Check if they have asked for subject details.
if subjects != _EMPTY_ {
if mss := mset.store.SubjectsState(subjects); len(mss) > 0 {
// As go iterates over map in a non-consistent order, no choice but to buffer it a slice
buffer := make([]string, 0, len(mss))
for subj := range mss {
buffer = append(buffer, subj)
}
// Sort it
sort.Strings(buffer)
if offset > len(buffer) {
offset = len(buffer)
}
end := offset + JSMaxSubjectDetails
if end > len(buffer) {
end = len(buffer)
}
actualSize := end - offset
var sd map[string]uint64
if actualSize > 0 {
sd = make(map[string]uint64, actualSize)
for _, ss := range buffer[offset:end] {
sd[ss] = mss[ss].Msgs
}
}
resp.StreamInfo.State.Subjects = sd
st := mset.store.SubjectsTotals(subjects)
if lst := len(st); lst > 0 {
// Common for both cases.
resp.Offset = offset
resp.Limit = JSMaxSubjectDetails
resp.Total = len(mss)
}
resp.Total = lst
if offset == 0 && lst <= JSMaxSubjectDetails {
resp.StreamInfo.State.Subjects = st
} else {
// Here we have to filter list due to offset or maximum constraints.
subjs := make([]string, 0, len(st))
for subj := range st {
subjs = append(subjs, subj)
}
// Sort it
sort.Strings(subjs)
if offset > len(subjs) {
offset = len(subjs)
}
end := offset + JSMaxSubjectDetails
if end > len(subjs) {
end = len(subjs)
}
actualSize := end - offset
var sd map[string]uint64
if actualSize > 0 {
sd = make(map[string]uint64, actualSize)
for _, ss := range subjs[offset:end] {
sd[ss] = st[ss]
}
}
resp.StreamInfo.State.Subjects = sd
}
}
}
// Check for out of band catchups.
if mset.hasCatchupPeers() {

View File

@@ -375,6 +375,33 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
return fss
}
// SubjectsTotal return message totals per subject.
func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 {
ms.mu.RLock()
defer ms.mu.RUnlock()
if len(ms.fss) == 0 {
return nil
}
tsa := [32]string{}
fsa := [32]string{}
fts := tokenizeSubjectIntoSlice(fsa[:0], filterSubject)
isAll := filterSubject == _EMPTY_ || filterSubject == fwcs
fst := make(map[string]uint64)
for subj, ss := range ms.fss {
if isAll {
fst[subj] = ss.Msgs
} else {
if tts := tokenizeSubjectIntoSlice(tsa[:0], subj); isSubsetMatchTokenized(tts, fts) {
fst[subj] = ss.Msgs
}
}
}
return fst
}
// Will check the msg limit for this tracked subject.
// Lock should be held.
func (ms *memStore) enforcePerSubjectLimit(ss *SimpleState) {

View File

@@ -16,6 +16,7 @@ package server
import (
"bytes"
"fmt"
"math/rand"
"reflect"
"testing"
"time"
@@ -520,3 +521,83 @@ func TestMemStoreStreamCompactMultiBlockSubjectInfo(t *testing.T) {
state := ms.State()
require_True(t, state.NumSubjects == 500)
}
func TestMemStoreSubjectsTotals(t *testing.T) {
cfg := &StreamConfig{
Name: "TEST",
Storage: MemoryStorage,
Subjects: []string{"*.*"},
}
ms, err := newMemStore(cfg)
require_NoError(t, err)
fmap := make(map[int]int)
bmap := make(map[int]int)
var m map[int]int
var ft string
for i := 0; i < 10_000; i++ {
// Flip coin for prefix
if rand.Intn(2) == 0 {
ft, m = "foo", fmap
} else {
ft, m = "bar", bmap
}
dt := rand.Intn(100)
subj := fmt.Sprintf("%s.%d", ft, dt)
m[dt]++
_, _, err := ms.StoreMsg(subj, nil, []byte("Hello World"))
require_NoError(t, err)
}
// Now test SubjectsTotal
for dt, total := range fmap {
subj := fmt.Sprintf("foo.%d", dt)
m := ms.SubjectsTotals(subj)
if m[subj] != uint64(total) {
t.Fatalf("Expected %q to have %d total, got %d", subj, total, m[subj])
}
}
// Check fmap.
if st := ms.SubjectsTotals("foo.*"); len(st) != len(fmap) {
t.Fatalf("Expected %d subjects for %q, got %d", len(fmap), "foo.*", len(st))
} else {
expected := 0
for _, n := range fmap {
expected += n
}
received := uint64(0)
for _, n := range st {
received += n
}
if received != uint64(expected) {
t.Fatalf("Expected %d total but got %d", expected, received)
}
}
// Check bmap.
if st := ms.SubjectsTotals("bar.*"); len(st) != len(bmap) {
t.Fatalf("Expected %d subjects for %q, got %d", len(bmap), "bar.*", len(st))
} else {
expected := 0
for _, n := range bmap {
expected += n
}
received := uint64(0)
for _, n := range st {
received += n
}
if received != uint64(expected) {
t.Fatalf("Expected %d total but got %d", expected, received)
}
}
// All with pwc match.
if st, expected := ms.SubjectsTotals("*.*"), len(bmap)+len(fmap); len(st) != expected {
t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st))
}
}

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -95,6 +95,7 @@ type StreamStore interface {
GetSeqFromTime(t time.Time) uint64
FilteredState(seq uint64, subject string) SimpleState
SubjectsState(filterSubject string) map[string]SimpleState
SubjectsTotals(filterSubject string) map[string]uint64
State() StreamState
FastState(*StreamState)
Type() StorageType