mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Moved log running test to NoRace suite
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -5372,126 +5372,3 @@ func TestFileStoreSubjectsTotals(t *testing.T) {
|
||||
t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st))
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileStoreNumPending(t *testing.T) {
|
||||
// No need for all permutations here.
|
||||
storeDir := t.TempDir()
|
||||
fcfg := FileStoreConfig{
|
||||
StoreDir: storeDir,
|
||||
BlockSize: 2 * 1024, // Create many blocks on purpose.
|
||||
}
|
||||
fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*.*.*.*"}, Storage: FileStorage})
|
||||
require_NoError(t, err)
|
||||
defer fs.Stop()
|
||||
|
||||
tokens := []string{"foo", "bar", "baz"}
|
||||
genSubj := func() string {
|
||||
return fmt.Sprintf("%s.%s.%s.%s",
|
||||
tokens[rand.Intn(len(tokens))],
|
||||
tokens[rand.Intn(len(tokens))],
|
||||
tokens[rand.Intn(len(tokens))],
|
||||
tokens[rand.Intn(len(tokens))],
|
||||
)
|
||||
}
|
||||
|
||||
for i := 0; i < 50_000; i++ {
|
||||
subj := genSubj()
|
||||
_, _, err := fs.StoreMsg(subj, nil, []byte("Hello World"))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
state := fs.State()
|
||||
|
||||
// Scan one by one for sanity check against other calculations.
|
||||
sanityCheck := func(sseq uint64, filter string) SimpleState {
|
||||
t.Helper()
|
||||
var ss SimpleState
|
||||
var smv StoreMsg
|
||||
// For here we know 0 is invalid, set to 1.
|
||||
if sseq == 0 {
|
||||
sseq = 1
|
||||
}
|
||||
for seq := sseq; seq <= state.LastSeq; seq++ {
|
||||
sm, err := fs.LoadMsg(seq, &smv)
|
||||
if err != nil {
|
||||
t.Logf("Encountered error %v loading sequence: %d", err, seq)
|
||||
continue
|
||||
}
|
||||
if subjectIsSubsetMatch(sm.subj, filter) {
|
||||
ss.Msgs++
|
||||
ss.Last = seq
|
||||
if ss.First == 0 || seq < ss.First {
|
||||
ss.First = seq
|
||||
}
|
||||
}
|
||||
}
|
||||
return ss
|
||||
}
|
||||
|
||||
check := func(sseq uint64, filter string) {
|
||||
t.Helper()
|
||||
np, lvs := fs.NumPending(sseq, filter, false)
|
||||
ss := fs.FilteredState(sseq, filter)
|
||||
sss := sanityCheck(sseq, filter)
|
||||
if lvs != state.LastSeq {
|
||||
t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs)
|
||||
}
|
||||
if ss.Msgs != np {
|
||||
t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs)
|
||||
}
|
||||
if ss != sss {
|
||||
t.Fatalf("Failed sanity check, expected %+v got %+v", sss, ss)
|
||||
}
|
||||
}
|
||||
|
||||
sanityCheckLastOnly := func(sseq uint64, filter string) SimpleState {
|
||||
t.Helper()
|
||||
var ss SimpleState
|
||||
var smv StoreMsg
|
||||
// For here we know 0 is invalid, set to 1.
|
||||
if sseq == 0 {
|
||||
sseq = 1
|
||||
}
|
||||
seen := make(map[string]bool)
|
||||
for seq := state.LastSeq; seq >= sseq; seq-- {
|
||||
sm, err := fs.LoadMsg(seq, &smv)
|
||||
if err != nil {
|
||||
t.Logf("Encountered error %v loading sequence: %d", err, seq)
|
||||
continue
|
||||
}
|
||||
if !seen[sm.subj] && subjectIsSubsetMatch(sm.subj, filter) {
|
||||
ss.Msgs++
|
||||
if ss.Last == 0 {
|
||||
ss.Last = seq
|
||||
}
|
||||
if ss.First == 0 || seq < ss.First {
|
||||
ss.First = seq
|
||||
}
|
||||
seen[sm.subj] = true
|
||||
}
|
||||
}
|
||||
return ss
|
||||
}
|
||||
|
||||
checkLastOnly := func(sseq uint64, filter string) {
|
||||
t.Helper()
|
||||
np, lvs := fs.NumPending(sseq, filter, true)
|
||||
ss := sanityCheckLastOnly(sseq, filter)
|
||||
if lvs != state.LastSeq {
|
||||
t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs)
|
||||
}
|
||||
if ss.Msgs != np {
|
||||
t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs)
|
||||
}
|
||||
}
|
||||
|
||||
startSeqs := []uint64{0, 1, 2, 200, 444, 555, 2222, 8888, 12_345, 28_222, 33_456, 44_400, 49_999}
|
||||
checkSubs := []string{"foo.>", "*.bar.>", "foo.bar.*.baz", "*.bar.>", "*.foo.bar.*", "foo.foo.bar.baz"}
|
||||
|
||||
for _, filter := range checkSubs {
|
||||
for _, start := range startSeqs {
|
||||
check(start, filter)
|
||||
checkLastOnly(start, filter)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7312,3 +7312,126 @@ func TestNoRaceJetStreamClusterInterestStreamConsistencyAfterRollingRestart(t *t
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNoRaceFileStoreNumPending(t *testing.T) {
|
||||
// No need for all permutations here.
|
||||
storeDir := t.TempDir()
|
||||
fcfg := FileStoreConfig{
|
||||
StoreDir: storeDir,
|
||||
BlockSize: 2 * 1024, // Create many blocks on purpose.
|
||||
}
|
||||
fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*.*.*.*"}, Storage: FileStorage})
|
||||
require_NoError(t, err)
|
||||
defer fs.Stop()
|
||||
|
||||
tokens := []string{"foo", "bar", "baz"}
|
||||
genSubj := func() string {
|
||||
return fmt.Sprintf("%s.%s.%s.%s",
|
||||
tokens[rand.Intn(len(tokens))],
|
||||
tokens[rand.Intn(len(tokens))],
|
||||
tokens[rand.Intn(len(tokens))],
|
||||
tokens[rand.Intn(len(tokens))],
|
||||
)
|
||||
}
|
||||
|
||||
for i := 0; i < 50_000; i++ {
|
||||
subj := genSubj()
|
||||
_, _, err := fs.StoreMsg(subj, nil, []byte("Hello World"))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
state := fs.State()
|
||||
|
||||
// Scan one by one for sanity check against other calculations.
|
||||
sanityCheck := func(sseq uint64, filter string) SimpleState {
|
||||
t.Helper()
|
||||
var ss SimpleState
|
||||
var smv StoreMsg
|
||||
// For here we know 0 is invalid, set to 1.
|
||||
if sseq == 0 {
|
||||
sseq = 1
|
||||
}
|
||||
for seq := sseq; seq <= state.LastSeq; seq++ {
|
||||
sm, err := fs.LoadMsg(seq, &smv)
|
||||
if err != nil {
|
||||
t.Logf("Encountered error %v loading sequence: %d", err, seq)
|
||||
continue
|
||||
}
|
||||
if subjectIsSubsetMatch(sm.subj, filter) {
|
||||
ss.Msgs++
|
||||
ss.Last = seq
|
||||
if ss.First == 0 || seq < ss.First {
|
||||
ss.First = seq
|
||||
}
|
||||
}
|
||||
}
|
||||
return ss
|
||||
}
|
||||
|
||||
check := func(sseq uint64, filter string) {
|
||||
t.Helper()
|
||||
np, lvs := fs.NumPending(sseq, filter, false)
|
||||
ss := fs.FilteredState(sseq, filter)
|
||||
sss := sanityCheck(sseq, filter)
|
||||
if lvs != state.LastSeq {
|
||||
t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs)
|
||||
}
|
||||
if ss.Msgs != np {
|
||||
t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs)
|
||||
}
|
||||
if ss != sss {
|
||||
t.Fatalf("Failed sanity check, expected %+v got %+v", sss, ss)
|
||||
}
|
||||
}
|
||||
|
||||
sanityCheckLastOnly := func(sseq uint64, filter string) SimpleState {
|
||||
t.Helper()
|
||||
var ss SimpleState
|
||||
var smv StoreMsg
|
||||
// For here we know 0 is invalid, set to 1.
|
||||
if sseq == 0 {
|
||||
sseq = 1
|
||||
}
|
||||
seen := make(map[string]bool)
|
||||
for seq := state.LastSeq; seq >= sseq; seq-- {
|
||||
sm, err := fs.LoadMsg(seq, &smv)
|
||||
if err != nil {
|
||||
t.Logf("Encountered error %v loading sequence: %d", err, seq)
|
||||
continue
|
||||
}
|
||||
if !seen[sm.subj] && subjectIsSubsetMatch(sm.subj, filter) {
|
||||
ss.Msgs++
|
||||
if ss.Last == 0 {
|
||||
ss.Last = seq
|
||||
}
|
||||
if ss.First == 0 || seq < ss.First {
|
||||
ss.First = seq
|
||||
}
|
||||
seen[sm.subj] = true
|
||||
}
|
||||
}
|
||||
return ss
|
||||
}
|
||||
|
||||
checkLastOnly := func(sseq uint64, filter string) {
|
||||
t.Helper()
|
||||
np, lvs := fs.NumPending(sseq, filter, true)
|
||||
ss := sanityCheckLastOnly(sseq, filter)
|
||||
if lvs != state.LastSeq {
|
||||
t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs)
|
||||
}
|
||||
if ss.Msgs != np {
|
||||
t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs)
|
||||
}
|
||||
}
|
||||
|
||||
startSeqs := []uint64{0, 1, 2, 200, 444, 555, 2222, 8888, 12_345, 28_222, 33_456, 44_400, 49_999}
|
||||
checkSubs := []string{"foo.>", "*.bar.>", "foo.bar.*.baz", "*.bar.>", "*.foo.bar.*", "foo.foo.bar.baz"}
|
||||
|
||||
for _, filter := range checkSubs {
|
||||
for _, start := range startSeqs {
|
||||
check(start, filter)
|
||||
checkLastOnly(start, filter)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user