From d36dff7d2fa66c6e6ab2fefe0ed81dbcbe88f446 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 3 May 2020 07:59:44 -0700 Subject: [PATCH] Move list to names, and add list in for detailed info for streams and consumers Signed-off-by: Derek Collison --- server/jetstream_api.go | 191 +++++++++++++++++++++++++++++++++++----- test/jetstream_test.go | 33 +++++-- 2 files changed, 198 insertions(+), 26 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 9f24ff5e..9c16485c 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -37,9 +37,9 @@ const ( JSApiTemplateCreate = "$JS.API.STREAM.TEMPLATE.CREATE.*" JSApiTemplateCreateT = "$JS.API.STREAM.TEMPLATE.CREATE.%s" - // JSApiListTemplates is the endpoint to list all stream templates for this account. + // JSApiTemplates is the endpoint to list all stream template names for this account. // Will return JSON response. - JSApiTemplates = "$JS.API.STREAM.TEMPLATE.LIST" + JSApiTemplates = "$JS.API.STREAM.TEMPLATE.NAMES" // JSApiTemplateInfo is for obtaining general information about a named stream template. // Will return JSON response. @@ -61,9 +61,11 @@ const ( JSApiStreamUpdate = "$JS.API.STREAM.UPDATE.*" JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s" - // JSApiListStreams is the endpoint to list all streams for this account. + // JSApiStreams is the endpoint to list all stream names for this account. // Will return JSON response. - JSApiStreams = "$JS.API.STREAM.LIST" + JSApiStreams = "$JS.API.STREAM.NAMES" + // JSApiStreamList is the endpoint that will return all detailed stream information + JSApiStreamList = "$JS.API.STREAM.LIST" // JSApiStreamInfo is for obtaining general information about a named stream. // Will return JSON response. @@ -95,15 +97,19 @@ const ( JSApiConsumerCreate = "$JS.API.CONSUMER.CREATE.*" JSApiConsumerCreateT = "$JS.API.CONSUMER.CREATE.%s" - // JSApiDurableConsumerCreate is the endpoint to create ephemeral consumers for streams. + // JSApiDurableCreate is the endpoint to create ephemeral consumers for streams. // You need to include the stream and consumer name in the subject. JSApiDurableCreate = "$JS.API.CONSUMER.DURABLE.CREATE.*.*" JSApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.%s.%s" - // JSApiConsumers is the endpoint to list all consumers for the stream. + // JSApiConsumers is the endpoint to list all consumer names for the stream. // Will return JSON response. - JSApiConsumers = "$JS.API.CONSUMER.LIST.*" - JSApiConsumersT = "$JS.API.CONSUMER.LIST.%s" + JSApiConsumers = "$JS.API.CONSUMER.NAMES.*" + JSApiConsumersT = "$JS.API.CONSUMER.NAMES.%s" + + // JSApiConsumerList is the endpoint that will return all detailed consumer information + JSApiConsumerList = "$JS.API.CONSUMER.LIST.*" + JSApiConsumerListT = "$JS.API.CONSUMER.LIST.%s" // JSApiConsumerInfo is for obtaining general information about a consumer. // Will return JSON response. @@ -183,7 +189,8 @@ type JSApiStreamInfoResponse struct { // Maximum entries we will return for streams or consumers lists. // TODO(dlc) - with header or request support could request chunked response. -const JSApiListLimit = 1024 +const JSApiNamesLimit = 1024 +const JSApiListLimit = 256 type JSApiStreamsRequest struct { Offset int `json:"offset"` @@ -199,6 +206,16 @@ type JSApiStreamsResponse struct { Streams []string `json:"streams"` } +// JSApiStreamListResponse list of detailed stream information. +// A nil request is valid and means all streams. +type JSApiStreamListResponse struct { + Error *ApiError `json:"error,omitempty"` + Total int `json:"total"` + Offset int `json:"offset"` + Limit int `json:"limit"` + Streams []*StreamInfo `json:"streams"` +} + // JSApiStreamPurgeResponse. type JSApiStreamPurgeResponse struct { Error *ApiError `json:"error,omitempty"` @@ -266,6 +283,15 @@ type JSApiConsumersResponse struct { Consumers []string `json:"streams"` } +// JSApiConsumerListResponse. +type JSApiConsumerListResponse struct { + Error *ApiError `json:"error,omitempty"` + Total int `json:"total"` + Offset int `json:"offset"` + Limit int `json:"limit"` + Consumers []*ConsumerInfo `json:"streams"` +} + // JSApiStreamTemplateCreateResponse for creating templates. type JSApiStreamTemplateCreateResponse struct { Error *ApiError `json:"error,omitempty"` @@ -284,9 +310,17 @@ type JSApiStreamTemplateInfoResponse struct { *StreamTemplateInfo } +// JSApiTemplatessRequest +type JSApiTemplatessRequest struct { + Offset int `json:"offset"` +} + // JSApiStreamTemplateListResponse list of templates. -type JSApiStreamTemplateListResponse struct { +type JSApiStreamTemplatesResponse struct { Error *ApiError `json:"error,omitempty"` + Total int `json:"total"` + Offset int `json:"offset"` + Limit int `json:"limit"` Templates []string `json:"streams,omitempty"` } @@ -305,6 +339,7 @@ var allJsExports = []string{ JSApiStreamCreate, JSApiStreamUpdate, JSApiStreams, + JSApiStreamList, JSApiStreamInfo, JSApiStreamDelete, JSApiStreamPurge, @@ -313,6 +348,7 @@ var allJsExports = []string{ JSApiConsumerCreate, JSApiDurableCreate, JSApiConsumers, + JSApiConsumerList, JSApiConsumerInfo, JSApiConsumerDelete, } @@ -324,12 +360,13 @@ func (s *Server) setJetStreamExportSubs() error { }{ {JSApiAccountInfo, s.jsAccountInfoRequest}, {JSApiTemplateCreate, s.jsTemplateCreateRequest}, - {JSApiTemplates, s.jsTemplateListRequest}, + {JSApiTemplates, s.jsTemplateNamesRequest}, {JSApiTemplateInfo, s.jsTemplateInfoRequest}, {JSApiTemplateDelete, s.jsTemplateDeleteRequest}, {JSApiStreamCreate, s.jsStreamCreateRequest}, {JSApiStreamUpdate, s.jsStreamUpdateRequest}, - {JSApiStreams, s.jsStreamListRequest}, + {JSApiStreams, s.jsStreamNamesRequest}, + {JSApiStreamList, s.jsStreamListRequest}, {JSApiStreamInfo, s.jsStreamInfoRequest}, {JSApiStreamDelete, s.jsStreamDeleteRequest}, {JSApiStreamPurge, s.jsStreamPurgeRequest}, @@ -337,7 +374,8 @@ func (s *Server) setJetStreamExportSubs() error { {JSApiMsgGet, s.jsMsgGetRequest}, {JSApiConsumerCreate, s.jsConsumerCreateRequest}, {JSApiDurableCreate, s.jsDurableCreateRequest}, - {JSApiConsumers, s.jsConsumerListRequest}, + {JSApiConsumers, s.jsConsumerNamesRequest}, + {JSApiConsumerList, s.jsConsumerListRequest}, {JSApiConsumerInfo, s.jsConsumerInfoRequest}, {JSApiConsumerDelete, s.jsConsumerDeleteRequest}, } @@ -424,24 +462,45 @@ func (s *Server) jsTemplateCreateRequest(sub *subscription, c *client, subject, s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(resp)) } -// Request for the list of all templates. -func (s *Server) jsTemplateListRequest(sub *subscription, c *client, subject, reply string, msg []byte) { +// Request for the list of all template names. +func (s *Server) jsTemplateNamesRequest(sub *subscription, c *client, subject, reply string, msg []byte) { if c == nil || c.acc == nil { return } - var resp JSApiStreamTemplateListResponse + var resp JSApiStreamTemplatesResponse if !c.acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp)) return } + var offset int + if !isEmptyRequest(msg) { + var req JSApiTemplatessRequest + if err := json.Unmarshal(msg, &req); err != nil { + resp.Error = jsBadRequestErr + s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + offset = req.Offset + } + ts := c.acc.Templates() - for _, t := range ts { + sort.Slice(ts, func(i, j int) bool { + return strings.Compare(ts[i].StreamTemplateConfig.Name, ts[j].StreamTemplateConfig.Name) < 0 + }) + + for _, t := range ts[offset:] { t.mu.Lock() name := t.Name t.mu.Unlock() resp.Templates = append(resp.Templates, name) + if len(resp.Templates) >= JSApiNamesLimit { + break + } } + resp.Total = len(ts) + resp.Limit = JSApiNamesLimit + resp.Offset = offset s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(resp)) } @@ -594,8 +653,8 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, subject, re s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(resp)) } -// Request for the list of all streams. -func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, reply string, msg []byte) { +// Request for the list of all stream names. +func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, reply string, msg []byte) { if c == nil || c.acc == nil { return } @@ -626,6 +685,49 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl for _, mset := range msets[offset:] { resp.Streams = append(resp.Streams, mset.config.Name) + if len(resp.Streams) >= JSApiNamesLimit { + break + } + } + resp.Total = len(msets) + resp.Limit = JSApiNamesLimit + resp.Offset = offset + s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(resp)) +} + +// Request for the list of all detailed stream info. +// TODO(dlc) - combine with above long term +func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, reply string, msg []byte) { + if c == nil || c.acc == nil { + return + } + var resp JSApiStreamListResponse + if !c.acc.JetStreamEnabled() { + resp.Error = jsNotEnabledErr + s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + var offset int + if !isEmptyRequest(msg) { + var req JSApiStreamsRequest + if err := json.Unmarshal(msg, &req); err != nil { + resp.Error = jsBadRequestErr + s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + offset = req.Offset + } + + // TODO(dlc) - Maybe hold these results for large results that we expect to be paged. + // TODO(dlc) - If this list is long maybe do this in a Go routine? + msets := c.acc.Streams() + sort.Slice(msets, func(i, j int) bool { + return strings.Compare(msets[i].config.Name, msets[j].config.Name) < 0 + }) + + for _, mset := range msets[offset:] { + resp.Streams = append(resp.Streams, &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config()}) if len(resp.Streams) >= JSApiListLimit { break } @@ -921,8 +1023,8 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(resp)) } -// Request for the list of all consumers. -func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, reply string, msg []byte) { +// Request for the list of all consumer names. +func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, reply string, msg []byte) { if c == nil || c.acc == nil { return } @@ -958,6 +1060,53 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re }) for _, o := range obs[offset:] { resp.Consumers = append(resp.Consumers, o.Name()) + if len(resp.Consumers) >= JSApiNamesLimit { + break + } + } + resp.Total = len(obs) + resp.Limit = JSApiNamesLimit + resp.Offset = offset + s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(resp)) +} + +// Request for the list of all detailed consumer information. +func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, reply string, msg []byte) { + if c == nil || c.acc == nil { + return + } + var resp JSApiConsumerListResponse + if !c.acc.JetStreamEnabled() { + resp.Error = jsNotEnabledErr + s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + var offset int + if !isEmptyRequest(msg) { + var req JSApiConsumersRequest + if err := json.Unmarshal(msg, &req); err != nil { + resp.Error = jsBadRequestErr + s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + offset = req.Offset + } + + streamName := streamNameFromSubject(subject) + mset, err := c.acc.LookupStream(streamName) + if err != nil { + resp.Error = jsError(err) + s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + obs := mset.Consumers() + sort.Slice(obs, func(i, j int) bool { + return strings.Compare(obs[i].name, obs[j].name) < 0 + }) + for _, o := range obs[offset:] { + resp.Consumers = append(resp.Consumers, o.Info()) if len(resp.Consumers) >= JSApiListLimit { break } diff --git a/test/jetstream_test.go b/test/jetstream_test.go index c4b404e0..57be7042 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -4253,12 +4253,35 @@ func TestJetStreamRequestAPI(t *testing.T) { t.Fatalf("Expected to see 1 Stream, got %d", info.Streams) } - // Make sure list works. + // Make sure list names works. resp, err = nc.Request(server.JSApiStreams, nil, time.Second) - var listResponse server.JSApiStreamsResponse + var namesResponse server.JSApiStreamsResponse + if err = json.Unmarshal(resp.Data, &namesResponse); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if len(namesResponse.Streams) != 1 { + t.Fatalf("Expected only 1 stream but got %d", len(namesResponse.Streams)) + } + if namesResponse.Total != 1 { + t.Fatalf("Expected total to be 1 but got %d", namesResponse.Total) + } + if namesResponse.Offset != 0 { + t.Fatalf("Expected offset to be 0 but got %d", namesResponse.Offset) + } + if namesResponse.Limit != server.JSApiNamesLimit { + t.Fatalf("Expected limit to be %d but got %d", server.JSApiNamesLimit, namesResponse.Limit) + } + if namesResponse.Streams[0] != msetCfg.Name { + t.Fatalf("Expected to get %q, but got %q", msetCfg.Name, namesResponse.Streams[0]) + } + + // Now do detailed version. + resp, err = nc.Request(server.JSApiStreamList, nil, time.Second) + var listResponse server.JSApiStreamListResponse if err = json.Unmarshal(resp.Data, &listResponse); err != nil { t.Fatalf("Unexpected error: %v", err) } + if len(listResponse.Streams) != 1 { t.Fatalf("Expected only 1 stream but got %d", len(listResponse.Streams)) } @@ -4271,8 +4294,8 @@ func TestJetStreamRequestAPI(t *testing.T) { if listResponse.Limit != server.JSApiListLimit { t.Fatalf("Expected limit to be %d but got %d", server.JSApiListLimit, listResponse.Limit) } - if listResponse.Streams[0] != msetCfg.Name { - t.Fatalf("Expected to get %q, but got %q", msetCfg.Name, listResponse.Streams[0]) + if listResponse.Streams[0].Config.Name != msetCfg.Name { + t.Fatalf("Expected to get %q, but got %q", msetCfg.Name, listResponse.Streams[0].Config.Name) } // Now send some messages, then we can poll for info on this stream. @@ -4571,7 +4594,7 @@ func TestJetStreamRequestAPI(t *testing.T) { } // Now grab the list of templates - var tListResp server.JSApiStreamTemplateListResponse + var tListResp server.JSApiStreamTemplatesResponse resp, err = nc.Request(server.JSApiTemplates, nil, time.Second) if err = json.Unmarshal(resp.Data, &tListResp); err != nil { t.Fatalf("Unexpected error: %v", err)