Merge pull request #2695 from ripienaar/streams_filter

allow streams api to be filtered like list api
This commit is contained in:
R.I.Pienaar
2021-11-18 18:58:35 +01:00
committed by GitHub
3 changed files with 100 additions and 5 deletions

View File

@@ -351,6 +351,12 @@ type JSApiStreamNamesResponse struct {
const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response"
type JSApiStreamListRequest struct {
ApiPagedRequest
// These are filters that can be applied to the list.
Subject string `json:"subject,omitempty"`
}
// JSApiStreamListResponse list of detailed stream information.
// A nil request is valid and means all streams.
type JSApiStreamListResponse struct {
@@ -1541,27 +1547,38 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
}
var offset int
var filter string
if !isEmptyRequest(msg) {
var req JSApiStreamNamesRequest
var req JSApiStreamListRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
offset = req.Offset
if req.Subject != _EMPTY_ {
filter = req.Subject
}
}
// Clustered mode will invoke a scatter and gather.
if s.JetStreamIsClustered() {
// Need to copy these off before sending..
msg = append(msg[:0:0], msg...)
s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, offset, subject, reply, msg) })
s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) })
return
}
// TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
// TODO(dlc) - If this list is long maybe do this in a Go routine?
msets := acc.streams()
var msets []*stream
if filter == _EMPTY_ {
msets = acc.streams()
} else {
msets = acc.filteredStreams(filter)
}
sort.Slice(msets, func(i, j int) bool {
return strings.Compare(msets[i].cfg.Name, msets[j].cfg.Name) < 0
})

View File

@@ -3836,7 +3836,7 @@ func (s *Server) allPeersOffline(rg *raftGroup) bool {
// This will do a scatter and gather operation for all streams for this account. This is only called from metadata leader.
// This will be running in a separate Go routine.
func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offset int, subject, reply string, rmsg []byte) {
func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filter string, offset int, subject, reply string, rmsg []byte) {
defer s.grWG.Done()
js, cc := s.getJetStreamCluster()
@@ -3848,8 +3848,29 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs
var streams []*streamAssignment
for _, sa := range cc.streams[acc.Name] {
streams = append(streams, sa)
if IsNatsErr(sa.err, JSClusterNotAssignedErr) {
continue
}
if filter != _EMPTY_ {
// These could not have subjects auto-filled in since they are raw and unprocessed.
if len(sa.Config.Subjects) == 0 {
if SubjectsCollide(filter, sa.Config.Name) {
streams = append(streams, sa)
}
} else {
for _, subj := range sa.Config.Subjects {
if SubjectsCollide(filter, subj) {
streams = append(streams, sa)
break
}
}
}
} else {
streams = append(streams, sa)
}
}
// Needs to be sorted for offsets etc.
if len(streams) > 1 {
sort.Slice(streams, func(i, j int) bool {

View File

@@ -9442,6 +9442,63 @@ func TestJetStreamClusterAccountInfoForSystemAccount(t *testing.T) {
}
}
func TestJetStreamListFilter(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
testList := func(t *testing.T, srv *Server, r int) {
nc, js := jsClientConnect(t, srv)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "ONE",
Subjects: []string{"one.>"},
Replicas: r,
})
require_NoError(t, err)
_, err = js.AddStream(&nats.StreamConfig{
Name: "TWO",
Subjects: []string{"two.>"},
Replicas: r,
})
require_NoError(t, err)
resp, err := nc.Request(JSApiStreamList, []byte("{}"), time.Second)
require_NoError(t, err)
list := &JSApiStreamListResponse{}
err = json.Unmarshal(resp.Data, list)
require_NoError(t, err)
if len(list.Streams) != 2 {
t.Fatalf("Expected 2 responses got %d", len(list.Streams))
}
resp, err = nc.Request(JSApiStreamList, []byte(`{"subject":"two.x"}`), time.Second)
require_NoError(t, err)
list = &JSApiStreamListResponse{}
err = json.Unmarshal(resp.Data, list)
require_NoError(t, err)
if len(list.Streams) != 1 {
t.Fatalf("Expected 1 response got %d", len(list.Streams))
}
if list.Streams[0].Config.Name != "TWO" {
t.Fatalf("Expected stream TWO in result got %#v", list.Streams[0])
}
}
t.Run("Single", func(t *testing.T) { testList(t, s, 1) })
t.Run("Clustered", func(t *testing.T) { testList(t, c.randomServer(), 3) })
}
func TestJetStreamConsumerUpdates(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()