mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Implements pagination for JS Stream Info requests
This commit is contained in:
@@ -396,16 +396,18 @@ type JSApiStreamDeleteResponse struct {
|
||||
|
||||
const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"
|
||||
|
||||
// Maximum number of subject details we will send in the stream info.
|
||||
// JSMaxSubjectDetails The limit of the number of subject details we will send in a stream info response.
|
||||
const JSMaxSubjectDetails = 100_000
|
||||
|
||||
type JSApiStreamInfoRequest struct {
|
||||
ApiPagedRequest
|
||||
DeletedDetails bool `json:"deleted_details,omitempty"`
|
||||
SubjectsFilter string `json:"subjects_filter,omitempty"`
|
||||
}
|
||||
|
||||
type JSApiStreamInfoResponse struct {
|
||||
ApiResponse
|
||||
ApiPaged
|
||||
*StreamInfo
|
||||
}
|
||||
|
||||
@@ -1788,6 +1790,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
|
||||
|
||||
var details bool
|
||||
var subjects string
|
||||
var offset int
|
||||
if !isEmptyRequest(msg) {
|
||||
var req JSApiStreamInfoRequest
|
||||
if err := json.Unmarshal(msg, &req); err != nil {
|
||||
@@ -1796,6 +1799,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
|
||||
return
|
||||
}
|
||||
details, subjects = req.DeletedDetails, req.SubjectsFilter
|
||||
offset = req.Offset
|
||||
}
|
||||
|
||||
mset, err := acc.lookupStream(streamName)
|
||||
@@ -1825,17 +1829,39 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
|
||||
// Check if they have asked for subject details.
|
||||
if subjects != _EMPTY_ {
|
||||
if mss := mset.store.SubjectsState(subjects); len(mss) > 0 {
|
||||
if len(mss) > JSMaxSubjectDetails {
|
||||
resp.StreamInfo = nil
|
||||
resp.Error = NewJSStreamInfoMaxSubjectsError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
// As go iterates over map in a non-consistent order, no choice but to buffer it a slice
|
||||
|
||||
buffer := make([]string, 0, len(mss))
|
||||
for subj := range mss {
|
||||
buffer = append(buffer, subj)
|
||||
}
|
||||
sd := make(map[string]uint64, len(mss))
|
||||
for subj, ss := range mss {
|
||||
sd[subj] = ss.Msgs
|
||||
|
||||
// Sort it
|
||||
sort.Strings(buffer)
|
||||
|
||||
if offset > len(buffer) {
|
||||
offset = len(buffer)
|
||||
}
|
||||
|
||||
end := offset + JSMaxSubjectDetails
|
||||
if end > len(buffer) {
|
||||
end = len(buffer)
|
||||
}
|
||||
|
||||
actualSize := end - offset
|
||||
var sd map[string]uint64
|
||||
|
||||
if actualSize > 0 {
|
||||
sd = make(map[string]uint64, actualSize)
|
||||
for _, ss := range buffer[offset:end] {
|
||||
sd[ss] = mss[ss].Msgs
|
||||
}
|
||||
}
|
||||
|
||||
resp.StreamInfo.State.Subjects = sd
|
||||
resp.Offset = offset
|
||||
resp.Limit = JSMaxSubjectDetails
|
||||
resp.Total = len(mss)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -4289,37 +4289,31 @@ func TestNoRaceJetStreamStreamInfoSubjectDetailsLimits(t *testing.T) {
|
||||
t.Fatalf("Did not receive completion signal")
|
||||
}
|
||||
|
||||
getInfo := func(filter string) *StreamInfo {
|
||||
t.Helper()
|
||||
// Need to grab StreamInfo by hand for now.
|
||||
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: filter})
|
||||
require_NoError(t, err)
|
||||
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, 5*time.Second)
|
||||
require_NoError(t, err)
|
||||
var si StreamInfo
|
||||
err = json.Unmarshal(resp.Data, &si)
|
||||
require_NoError(t, err)
|
||||
return &si
|
||||
}
|
||||
|
||||
si := getInfo("X.*")
|
||||
// Need to grab StreamInfo by hand for now.
|
||||
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: "X.*"})
|
||||
require_NoError(t, err)
|
||||
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, 5*time.Second)
|
||||
require_NoError(t, err)
|
||||
var si StreamInfo
|
||||
err = json.Unmarshal(resp.Data, &si)
|
||||
require_NoError(t, err)
|
||||
if len(si.State.Subjects) != n {
|
||||
t.Fatalf("Expected to get %d subject details, got %d", n, len(si.State.Subjects))
|
||||
}
|
||||
|
||||
// Now add one more message in which will exceed our internal limits for subject details.
|
||||
// Now add one more message to check pagination
|
||||
_, err = js.Publish("foo", []byte("TOO MUCH"))
|
||||
require_NoError(t, err)
|
||||
|
||||
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: nats.AllKeys})
|
||||
req, err = json.Marshal(&JSApiStreamInfoRequest{ApiPagedRequest: ApiPagedRequest{Offset: n}, SubjectsFilter: nats.AllKeys})
|
||||
require_NoError(t, err)
|
||||
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, 5*time.Second)
|
||||
resp, err = nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, 5*time.Second)
|
||||
require_NoError(t, err)
|
||||
var sir JSApiStreamInfoResponse
|
||||
err = json.Unmarshal(resp.Data, &sir)
|
||||
require_NoError(t, err)
|
||||
if !IsNatsErr(sir.Error, JSStreamInfoMaxSubjectsErr) {
|
||||
t.Fatalf("Did not get correct error response: %+v", sir.Error)
|
||||
if len(sir.State.Subjects) != 1 {
|
||||
t.Fatalf("Expected to get 1 extra subject detail, got %d", len(sir.State.Subjects))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user