mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Merge pull request #2833 from nats-io/si_subjects
Added in ability to get per subject details via StreamInfo.
This commit is contained in:
@@ -1148,5 +1148,15 @@
|
||||
"help": "",
|
||||
"url": "",
|
||||
"deprecates": ""
|
||||
},
|
||||
{
|
||||
"constant": "JSStreamInfoMaxSubjectsErr",
|
||||
"code": 500,
|
||||
"error_code": 10117,
|
||||
"description": "subject details would exceed maximum allowed",
|
||||
"comment": "",
|
||||
"help": "",
|
||||
"url": "",
|
||||
"deprecates": ""
|
||||
}
|
||||
]
|
||||
]
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019-2021 The NATS Authors
|
||||
// Copyright 2019-2022 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -89,6 +89,7 @@ type fileStore struct {
|
||||
aek cipher.AEAD
|
||||
lmb *msgBlock
|
||||
blks []*msgBlock
|
||||
psmc map[string]uint64
|
||||
hh hash.Hash64
|
||||
qch chan struct{}
|
||||
cfs []*consumerFileStore
|
||||
@@ -280,6 +281,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
|
||||
fs := &fileStore{
|
||||
fcfg: fcfg,
|
||||
cfg: FileStreamInfo{Created: created, StreamConfig: cfg},
|
||||
psmc: make(map[string]uint64),
|
||||
prf: prf,
|
||||
qch: make(chan struct{}),
|
||||
}
|
||||
@@ -879,6 +881,13 @@ func (fs *fileStore) recoverMsgs() error {
|
||||
}
|
||||
fs.state.Msgs += mb.msgs
|
||||
fs.state.Bytes += mb.bytes
|
||||
// Walk the fss for this mb and fill in fs.psmc
|
||||
for subj, ss := range mb.fss {
|
||||
if len(subj) > 0 {
|
||||
fs.psmc[subj] += ss.Msgs
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
@@ -988,6 +997,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
|
||||
mb.msgs--
|
||||
purged++
|
||||
// Update fss
|
||||
fs.removePerSubject(sm.subj)
|
||||
mb.removeSeqPerSubject(sm.subj, seq)
|
||||
}
|
||||
|
||||
@@ -1528,6 +1538,11 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
|
||||
return err
|
||||
}
|
||||
|
||||
// Adjust top level tracking of per subject msg counts.
|
||||
if len(subj) > 0 {
|
||||
fs.psmc[subj]++
|
||||
}
|
||||
|
||||
// Adjust first if needed.
|
||||
now := time.Unix(0, ts).UTC()
|
||||
if fs.state.Msgs == 0 {
|
||||
@@ -1724,6 +1739,19 @@ func (fs *fileStore) EraseMsg(seq uint64) (bool, error) {
|
||||
return fs.removeMsg(seq, true, true)
|
||||
}
|
||||
|
||||
// Convenience function to remove per subject tracking at the filestore level.
|
||||
// Lock should be held.
|
||||
func (fs *fileStore) removePerSubject(subj string) {
|
||||
if len(subj) == 0 {
|
||||
return
|
||||
}
|
||||
if n, ok := fs.psmc[subj]; ok && n == 1 {
|
||||
delete(fs.psmc, subj)
|
||||
} else if ok {
|
||||
fs.psmc[subj]--
|
||||
}
|
||||
}
|
||||
|
||||
// Remove a message, optionally rewriting the mb file.
|
||||
func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error) {
|
||||
fsLock := func() {
|
||||
@@ -1811,6 +1839,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
|
||||
mb.bytes -= msz
|
||||
|
||||
// If we are tracking multiple subjects here make sure we update that accounting.
|
||||
fs.removePerSubject(sm.subj)
|
||||
mb.removeSeqPerSubject(sm.subj, seq)
|
||||
|
||||
var shouldWriteIndex, firstSeqNeedsUpdate bool
|
||||
@@ -3274,19 +3303,18 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
|
||||
mb.llseq = seq
|
||||
}
|
||||
|
||||
// We use the high bit to denote we have already checked the checksum.
|
||||
var hh hash.Hash64
|
||||
if !hashChecked {
|
||||
hh = mb.hh // This will force the hash check in msgFromBuf.
|
||||
mb.cache.idx[seq-mb.cache.fseq] = (bi | hbit)
|
||||
}
|
||||
|
||||
li := int(bi) - mb.cache.off
|
||||
if li >= len(mb.cache.buf) {
|
||||
return nil, errPartialCache
|
||||
}
|
||||
buf := mb.cache.buf[li:]
|
||||
|
||||
// We use the high bit to denote we have already checked the checksum.
|
||||
var hh hash.Hash64
|
||||
if !hashChecked {
|
||||
hh = mb.hh // This will force the hash check in msgFromBuf.
|
||||
}
|
||||
|
||||
// Parse from the raw buffer.
|
||||
subj, hdr, msg, mseq, ts, err := msgFromBuf(buf, hh)
|
||||
if err != nil {
|
||||
@@ -3297,6 +3325,11 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
|
||||
return nil, fmt.Errorf("sequence numbers for cache load did not match, %d vs %d", seq, mseq)
|
||||
}
|
||||
|
||||
// Clear the check bit here after we know all is good.
|
||||
if !hashChecked {
|
||||
mb.cache.idx[seq-mb.cache.fseq] = (bi | hbit)
|
||||
}
|
||||
|
||||
return &fileStoredMsg{subj, hdr, msg, seq, ts, mb, int64(bi)}, nil
|
||||
}
|
||||
|
||||
@@ -3451,6 +3484,7 @@ func (fs *fileStore) FastState(state *StreamState) {
|
||||
}
|
||||
}
|
||||
state.Consumers = len(fs.cfs)
|
||||
state.NumSubjects = len(fs.psmc)
|
||||
fs.mu.RUnlock()
|
||||
}
|
||||
|
||||
@@ -3459,7 +3493,9 @@ func (fs *fileStore) State() StreamState {
|
||||
fs.mu.RLock()
|
||||
state := fs.state
|
||||
state.Consumers = len(fs.cfs)
|
||||
state.NumSubjects = len(fs.psmc)
|
||||
state.Deleted = nil // make sure.
|
||||
|
||||
for _, mb := range fs.blks {
|
||||
mb.mu.Lock()
|
||||
fseq := mb.first.seq
|
||||
@@ -3819,6 +3855,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
|
||||
mb.msgs--
|
||||
mb.bytes -= rl
|
||||
// FSS updates.
|
||||
fs.removePerSubject(sm.subj)
|
||||
mb.removeSeqPerSubject(sm.subj, seq)
|
||||
// Check for first message.
|
||||
if seq == mb.first.seq {
|
||||
@@ -3924,6 +3961,9 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
|
||||
|
||||
fs.lmb.writeIndexInfo()
|
||||
|
||||
// Clear any per subject tracking.
|
||||
fs.psmc = make(map[string]uint64)
|
||||
|
||||
cb := fs.scb
|
||||
fs.mu.Unlock()
|
||||
|
||||
@@ -3988,6 +4028,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
|
||||
smb.msgs--
|
||||
purged++
|
||||
// Update fss
|
||||
fs.removePerSubject(sm.subj)
|
||||
smb.removeSeqPerSubject(sm.subj, mseq)
|
||||
}
|
||||
}
|
||||
@@ -4243,6 +4284,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
|
||||
if mb.fss == nil {
|
||||
mb.fss = make(map[string]*SimpleState)
|
||||
}
|
||||
|
||||
fseq, lseq := mb.first.seq, mb.last.seq
|
||||
for seq := fseq; seq <= lseq; seq++ {
|
||||
if sm, _ := mb.cacheLookup(seq); sm != nil && len(sm.subj) > 0 {
|
||||
|
||||
@@ -321,8 +321,12 @@ type JSApiStreamDeleteResponse struct {
|
||||
|
||||
const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"
|
||||
|
||||
// Maximum number of subject details we will send in the stream info.
|
||||
const JSMaxSubjectDetails = 100_000
|
||||
|
||||
type JSApiStreamInfoRequest struct {
|
||||
DeletedDetails bool `json:"deleted_details,omitempty"`
|
||||
DeletedDetails bool `json:"deleted_details,omitempty"`
|
||||
SubjectsFilter string `json:"subjects_filter,omitempty"`
|
||||
}
|
||||
|
||||
type JSApiStreamInfoResponse struct {
|
||||
@@ -1697,6 +1701,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
|
||||
}
|
||||
|
||||
var details bool
|
||||
var subjects string
|
||||
if !isEmptyRequest(msg) {
|
||||
var req JSApiStreamInfoRequest
|
||||
if err := json.Unmarshal(msg, &req); err != nil {
|
||||
@@ -1704,7 +1709,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
details = req.DeletedDetails
|
||||
details, subjects = req.DeletedDetails, req.SubjectsFilter
|
||||
}
|
||||
|
||||
mset, err := acc.lookupStream(streamName)
|
||||
@@ -1730,6 +1735,23 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
|
||||
resp.StreamInfo.Sources = mset.sourcesInfo()
|
||||
}
|
||||
|
||||
// Check if they have asked for subject details.
|
||||
if subjects != _EMPTY_ {
|
||||
if mss := mset.store.SubjectsState(subjects); len(mss) > 0 {
|
||||
if len(mss) > JSMaxSubjectDetails {
|
||||
resp.StreamInfo = nil
|
||||
resp.Error = NewJSStreamInfoMaxSubjectsError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
sd := make(map[string]uint64, len(mss))
|
||||
for subj, ss := range mss {
|
||||
sd[subj] = ss.Msgs
|
||||
}
|
||||
resp.StreamInfo.State.Subjects = sd
|
||||
}
|
||||
|
||||
}
|
||||
// Check for out of band catchups.
|
||||
if mset.hasCatchupPeers() {
|
||||
mset.checkClusterInfo(resp.StreamInfo)
|
||||
|
||||
@@ -260,6 +260,9 @@ const (
|
||||
// JSStreamHeaderExceedsMaximumErr header size exceeds maximum allowed of 64k
|
||||
JSStreamHeaderExceedsMaximumErr ErrorIdentifier = 10097
|
||||
|
||||
// JSStreamInfoMaxSubjectsErr subject details would exceed maximum allowed
|
||||
JSStreamInfoMaxSubjectsErr ErrorIdentifier = 10117
|
||||
|
||||
// JSStreamInvalidConfigF Stream configuration validation error string ({err})
|
||||
JSStreamInvalidConfigF ErrorIdentifier = 10052
|
||||
|
||||
@@ -438,6 +441,7 @@ var (
|
||||
JSStreamExternalDelPrefixOverlapsErrF: {Code: 400, ErrCode: 10022, Description: "stream external delivery prefix {prefix} overlaps with stream subject {subject}"},
|
||||
JSStreamGeneralErrorF: {Code: 500, ErrCode: 10051, Description: "{err}"},
|
||||
JSStreamHeaderExceedsMaximumErr: {Code: 400, ErrCode: 10097, Description: "header size exceeds maximum allowed of 64k"},
|
||||
JSStreamInfoMaxSubjectsErr: {Code: 500, ErrCode: 10117, Description: "subject details would exceed maximum allowed"},
|
||||
JSStreamInvalidConfigF: {Code: 500, ErrCode: 10052, Description: "{err}"},
|
||||
JSStreamInvalidErr: {Code: 500, ErrCode: 10096, Description: "stream not valid"},
|
||||
JSStreamInvalidExternalDeliverySubjErrF: {Code: 400, ErrCode: 10024, Description: "stream external delivery prefix {prefix} must not contain wildcards"},
|
||||
@@ -1445,6 +1449,16 @@ func NewJSStreamHeaderExceedsMaximumError(opts ...ErrorOption) *ApiError {
|
||||
return ApiErrors[JSStreamHeaderExceedsMaximumErr]
|
||||
}
|
||||
|
||||
// NewJSStreamInfoMaxSubjectsError creates a new JSStreamInfoMaxSubjectsErr error: "subject details would exceed maximum allowed"
|
||||
func NewJSStreamInfoMaxSubjectsError(opts ...ErrorOption) *ApiError {
|
||||
eopts := parseOpts(opts)
|
||||
if ae, ok := eopts.err.(*ApiError); ok {
|
||||
return ae
|
||||
}
|
||||
|
||||
return ApiErrors[JSStreamInfoMaxSubjectsErr]
|
||||
}
|
||||
|
||||
// NewJSStreamInvalidConfigError creates a new JSStreamInvalidConfigF error: "{err}"
|
||||
func NewJSStreamInvalidConfigError(err error, opts ...ErrorOption) *ApiError {
|
||||
eopts := parseOpts(opts)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019-2021 The NATS Authors
|
||||
// Copyright 2019-2022 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -4387,6 +4387,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) {
|
||||
if rresp.Error != nil {
|
||||
t.Fatalf("Got an unexpected error response: %+v", rresp.Error)
|
||||
}
|
||||
|
||||
// Can be any size message.
|
||||
var chunk [512]byte
|
||||
for r := bytes.NewReader(snapshot); ; {
|
||||
@@ -4478,6 +4479,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) {
|
||||
t.Fatalf("Expected restore subscription to be closed")
|
||||
}
|
||||
|
||||
req, _ = json.Marshal(rreq)
|
||||
rmsg, err = nc2.Request(strings.ReplaceAll(JSApiStreamRestoreT, JSApiPrefix, "$JS.domain.API"), req, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error on snapshot request: %v", err)
|
||||
@@ -4499,6 +4501,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) {
|
||||
t.Fatalf("Restore not honoring reply subjects for ack flow")
|
||||
}
|
||||
}
|
||||
|
||||
// For EOF this will send back stream info or an error.
|
||||
si, err := nc2.Request(rresp.DeliverSubject, nil, time.Second)
|
||||
if err != nil {
|
||||
@@ -8417,7 +8420,7 @@ func TestJetStreamDeleteMsg(t *testing.T) {
|
||||
t.Fatalf("Expected to get the stream back")
|
||||
}
|
||||
|
||||
expected := StreamState{Msgs: 6, Bytes: 6 * bytesPerMsg, FirstSeq: 12, LastSeq: 20}
|
||||
expected := StreamState{Msgs: 6, Bytes: 6 * bytesPerMsg, FirstSeq: 12, LastSeq: 20, NumSubjects: 1}
|
||||
state = mset.state()
|
||||
state.FirstTime, state.LastTime, state.Deleted, state.NumDeleted = time.Time{}, time.Time{}, nil, 0
|
||||
|
||||
@@ -14642,6 +14645,220 @@ func TestJetStreamMaxMsgsPerSubjectWithDiscardNew(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamStreamInfoSubjectsDetails(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
getInfo := func(t *testing.T, filter string) *StreamInfo {
|
||||
t.Helper()
|
||||
// Need to grab StreamInfo by hand for now.
|
||||
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: filter})
|
||||
require_NoError(t, err)
|
||||
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, time.Second)
|
||||
require_NoError(t, err)
|
||||
var si StreamInfo
|
||||
err = json.Unmarshal(resp.Data, &si)
|
||||
require_NoError(t, err)
|
||||
if si.State.NumSubjects != 3 {
|
||||
t.Fatalf("Expected NumSubjects to be 3, but got %d", si.State.NumSubjects)
|
||||
}
|
||||
return &si
|
||||
}
|
||||
|
||||
testSubjects := func(t *testing.T, st nats.StorageType) {
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"*"},
|
||||
Storage: st,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
defer js.DeleteStream("TEST")
|
||||
|
||||
counts, msg := []int{22, 33, 44}, []byte("ok")
|
||||
// Now place msgs, foo-22, bar-33 and baz-44.
|
||||
for i, subj := range []string{"foo", "bar", "baz"} {
|
||||
for n := 0; n < counts[i]; n++ {
|
||||
_, err = js.Publish(subj, msg)
|
||||
require_NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Test all subjects first.
|
||||
expected := map[string]uint64{"foo": 22, "bar": 33, "baz": 44}
|
||||
if si := getInfo(t, nats.AllKeys); !reflect.DeepEqual(si.State.Subjects, expected) {
|
||||
t.Fatalf("Expected subjects of %+v, but got %+v", expected, si.State.Subjects)
|
||||
}
|
||||
if si := getInfo(t, "*"); !reflect.DeepEqual(si.State.Subjects, expected) {
|
||||
t.Fatalf("Expected subjects of %+v, but got %+v", expected, si.State.Subjects)
|
||||
}
|
||||
// Filtered to 1.
|
||||
expected = map[string]uint64{"foo": 22}
|
||||
if si := getInfo(t, "foo"); !reflect.DeepEqual(si.State.Subjects, expected) {
|
||||
t.Fatalf("Expected subjects of %+v, but got %+v", expected, si.State.Subjects)
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("MemoryStore", func(t *testing.T) { testSubjects(t, nats.MemoryStorage) })
|
||||
t.Run("FileStore", func(t *testing.T) { testSubjects(t, nats.FileStorage) })
|
||||
}
|
||||
|
||||
func TestJetStreamStreamInfoSubjectsDetailsWithDeleteAndPurge(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
getInfo := func(t *testing.T, filter string) *StreamInfo {
|
||||
t.Helper()
|
||||
// Need to grab StreamInfo by hand for now.
|
||||
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: filter})
|
||||
require_NoError(t, err)
|
||||
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, time.Second)
|
||||
require_NoError(t, err)
|
||||
var si StreamInfo
|
||||
err = json.Unmarshal(resp.Data, &si)
|
||||
require_NoError(t, err)
|
||||
return &si
|
||||
}
|
||||
|
||||
checkResults := func(t *testing.T, expected map[string]uint64) {
|
||||
t.Helper()
|
||||
si := getInfo(t, nats.AllKeys)
|
||||
if !reflect.DeepEqual(si.State.Subjects, expected) {
|
||||
t.Fatalf("Expected subjects of %+v, but got %+v", expected, si.State.Subjects)
|
||||
}
|
||||
if si.State.NumSubjects != len(expected) {
|
||||
t.Fatalf("Expected NumSubjects to be %d, but got %d", len(expected), si.State.NumSubjects)
|
||||
}
|
||||
}
|
||||
|
||||
testSubjects := func(t *testing.T, st nats.StorageType) {
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"*"},
|
||||
Storage: st,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
defer js.DeleteStream("TEST")
|
||||
|
||||
msg := []byte("ok")
|
||||
js.Publish("foo", msg) // 1
|
||||
js.Publish("foo", msg) // 2
|
||||
js.Publish("bar", msg) // 3
|
||||
js.Publish("baz", msg) // 4
|
||||
js.Publish("baz", msg) // 5
|
||||
js.Publish("bar", msg) // 6
|
||||
js.Publish("bar", msg) // 7
|
||||
|
||||
checkResults(t, map[string]uint64{"foo": 2, "bar": 3, "baz": 2})
|
||||
|
||||
// Now delete some messages.
|
||||
js.DeleteMsg("TEST", 6)
|
||||
|
||||
checkResults(t, map[string]uint64{"foo": 2, "bar": 2, "baz": 2})
|
||||
|
||||
// Delete and add right back, so no-op
|
||||
js.DeleteMsg("TEST", 5) // baz
|
||||
js.Publish("baz", msg) // 8
|
||||
|
||||
checkResults(t, map[string]uint64{"foo": 2, "bar": 2, "baz": 2})
|
||||
|
||||
// Now do a purge only of bar.
|
||||
jr, _ := json.Marshal(&JSApiStreamPurgeRequest{Subject: "bar"})
|
||||
_, err = nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "TEST"), jr, time.Second)
|
||||
require_NoError(t, err)
|
||||
|
||||
checkResults(t, map[string]uint64{"foo": 2, "baz": 2})
|
||||
|
||||
// Now purge everything
|
||||
err = js.PurgeStream("TEST")
|
||||
require_NoError(t, err)
|
||||
|
||||
si := getInfo(t, nats.AllKeys)
|
||||
if len(si.State.Subjects) != 0 {
|
||||
t.Fatalf("Expected no subjects, but got %+v", si.State.Subjects)
|
||||
}
|
||||
if si.State.NumSubjects != 0 {
|
||||
t.Fatalf("Expected NumSubjects to be 0, but got %d", si.State.NumSubjects)
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("MemoryStore", func(t *testing.T) { testSubjects(t, nats.MemoryStorage) })
|
||||
t.Run("FileStore", func(t *testing.T) { testSubjects(t, nats.FileStorage) })
|
||||
}
|
||||
|
||||
func TestJetStreamStreamInfoSubjectsDetailsAfterRestart(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
getInfo := func(t *testing.T, filter string) *StreamInfo {
|
||||
t.Helper()
|
||||
// Need to grab StreamInfo by hand for now.
|
||||
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: filter})
|
||||
require_NoError(t, err)
|
||||
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, time.Second)
|
||||
require_NoError(t, err)
|
||||
var si StreamInfo
|
||||
err = json.Unmarshal(resp.Data, &si)
|
||||
require_NoError(t, err)
|
||||
return &si
|
||||
}
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"*"},
|
||||
})
|
||||
require_NoError(t, err)
|
||||
defer js.DeleteStream("TEST")
|
||||
|
||||
msg := []byte("ok")
|
||||
js.Publish("foo", msg) // 1
|
||||
js.Publish("foo", msg) // 2
|
||||
js.Publish("bar", msg) // 3
|
||||
js.Publish("baz", msg) // 4
|
||||
js.Publish("baz", msg) // 5
|
||||
|
||||
si := getInfo(t, nats.AllKeys)
|
||||
if si.State.NumSubjects != 3 {
|
||||
t.Fatalf("Expected 3 subjects, but got %d", si.State.NumSubjects)
|
||||
}
|
||||
|
||||
// Stop current
|
||||
nc.Close()
|
||||
sd := s.JetStreamConfig().StoreDir
|
||||
s.Shutdown()
|
||||
// Restart.
|
||||
s = RunJetStreamServerOnPort(-1, sd)
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, _ = jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
si = getInfo(t, nats.AllKeys)
|
||||
if si.State.NumSubjects != 3 {
|
||||
t.Fatalf("Expected 3 subjects, but got %d", si.State.NumSubjects)
|
||||
}
|
||||
}
|
||||
|
||||
// Issue #2836
|
||||
func TestJetStreamInterestRetentionBug(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
@@ -14686,7 +14903,6 @@ func TestJetStreamInterestRetentionBug(t *testing.T) {
|
||||
require_NoError(t, err)
|
||||
|
||||
test("bar", 1, 2)
|
||||
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@@ -754,6 +754,7 @@ func (ms *memStore) FastState(state *StreamState) {
|
||||
state.NumDeleted = int((state.LastSeq - state.FirstSeq) - state.Msgs + 1)
|
||||
}
|
||||
state.Consumers = ms.consumers
|
||||
state.NumSubjects = len(ms.fss)
|
||||
ms.mu.RUnlock()
|
||||
}
|
||||
|
||||
@@ -763,6 +764,7 @@ func (ms *memStore) State() StreamState {
|
||||
|
||||
state := ms.state
|
||||
state.Consumers = ms.consumers
|
||||
state.NumSubjects = len(ms.fss)
|
||||
state.Deleted = nil
|
||||
|
||||
// Calculate interior delete details.
|
||||
|
||||
@@ -4110,3 +4110,89 @@ func TestNoRaceJetStreamClusterHealthz(t *testing.T) {
|
||||
t.Fatalf("Expected to have some errors until we became current, got none")
|
||||
}
|
||||
}
|
||||
|
||||
// Test that we can receive larger messages with stream subject details.
|
||||
// Also test that we will fail at some point and the user can fall back to
|
||||
// an orderedconsumer like we do with watch for KV Keys() call.
|
||||
func TestNoRaceJetStreamStreamInfoSubjectDetailsLimits(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
listen: 127.0.0.1:-1
|
||||
jetstream: enabled
|
||||
accounts: {
|
||||
default: {
|
||||
jetstream: true
|
||||
users: [ {user: me, password: pwd} ]
|
||||
limits { max_payload: 256 }
|
||||
}
|
||||
}
|
||||
`))
|
||||
defer removeFile(t, conf)
|
||||
|
||||
s, _ := RunServerWithConfig(conf)
|
||||
defer s.Shutdown()
|
||||
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
|
||||
nc, js := jsClientConnect(t, s, nats.UserInfo("me", "pwd"))
|
||||
defer nc.Close()
|
||||
|
||||
// Make sure we cannot send larger than 256 bytes.
|
||||
// But we can receive larger.
|
||||
sub, err := nc.SubscribeSync("foo")
|
||||
require_NoError(t, err)
|
||||
err = nc.Publish("foo", []byte(strings.Repeat("A", 300)))
|
||||
require_Error(t, err, nats.ErrMaxPayload)
|
||||
sub.Unsubscribe()
|
||||
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"*", "X.*"},
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
n := JSMaxSubjectDetails
|
||||
for i := 0; i < n; i++ {
|
||||
_, err := js.PublishAsync(fmt.Sprintf("X.%d", i), []byte("OK"))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
select {
|
||||
case <-js.PublishAsyncComplete():
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("Did not receive completion signal")
|
||||
}
|
||||
|
||||
getInfo := func(filter string) *StreamInfo {
|
||||
t.Helper()
|
||||
// Need to grab StreamInfo by hand for now.
|
||||
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: filter})
|
||||
require_NoError(t, err)
|
||||
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, 5*time.Second)
|
||||
require_NoError(t, err)
|
||||
var si StreamInfo
|
||||
err = json.Unmarshal(resp.Data, &si)
|
||||
require_NoError(t, err)
|
||||
return &si
|
||||
}
|
||||
|
||||
si := getInfo("X.*")
|
||||
if len(si.State.Subjects) != n {
|
||||
t.Fatalf("Expected to get %d subject details, got %d", n, len(si.State.Subjects))
|
||||
}
|
||||
|
||||
// Now add one more message in which will exceed our internal limits for subject details.
|
||||
_, err = js.Publish("foo", []byte("TOO MUCH"))
|
||||
require_NoError(t, err)
|
||||
|
||||
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: nats.AllKeys})
|
||||
require_NoError(t, err)
|
||||
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, 5*time.Second)
|
||||
require_NoError(t, err)
|
||||
var sir JSApiStreamInfoResponse
|
||||
err = json.Unmarshal(resp.Data, &sir)
|
||||
require_NoError(t, err)
|
||||
if !IsNatsErr(sir.Error, JSStreamInfoMaxSubjectsErr) {
|
||||
t.Fatalf("Did not get correct error response: %+v", sir.Error)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019-2021 The NATS Authors
|
||||
// Copyright 2019-2022 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -119,16 +119,18 @@ const (
|
||||
|
||||
// StreamState is information about the given stream.
|
||||
type StreamState struct {
|
||||
Msgs uint64 `json:"messages"`
|
||||
Bytes uint64 `json:"bytes"`
|
||||
FirstSeq uint64 `json:"first_seq"`
|
||||
FirstTime time.Time `json:"first_ts"`
|
||||
LastSeq uint64 `json:"last_seq"`
|
||||
LastTime time.Time `json:"last_ts"`
|
||||
NumDeleted int `json:"num_deleted,omitempty"`
|
||||
Deleted []uint64 `json:"deleted,omitempty"`
|
||||
Lost *LostStreamData `json:"lost,omitempty"`
|
||||
Consumers int `json:"consumer_count"`
|
||||
Msgs uint64 `json:"messages"`
|
||||
Bytes uint64 `json:"bytes"`
|
||||
FirstSeq uint64 `json:"first_seq"`
|
||||
FirstTime time.Time `json:"first_ts"`
|
||||
LastSeq uint64 `json:"last_seq"`
|
||||
LastTime time.Time `json:"last_ts"`
|
||||
NumSubjects int `json:"num_subjects,omitempty"`
|
||||
Subjects map[string]uint64 `json:"subjects,omitempty"`
|
||||
NumDeleted int `json:"num_deleted,omitempty"`
|
||||
Deleted []uint64 `json:"deleted,omitempty"`
|
||||
Lost *LostStreamData `json:"lost,omitempty"`
|
||||
Consumers int `json:"consumer_count"`
|
||||
}
|
||||
|
||||
// SimpleState for filtered subject specific state.
|
||||
|
||||
Reference in New Issue
Block a user