[FIXED] bug in consumer names paging, did not honor limits and returned duplicate results.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-09-29 06:14:00 -07:00
parent 98de9136ce
commit fef702a688
2 changed files with 98 additions and 2 deletions

View File

@@ -151,7 +151,8 @@ const (
JSApiConsumersT = "$JS.API.CONSUMER.NAMES.%s"
// JSApiConsumerList is the endpoint that will return all detailed consumer information
JSApiConsumerList = "$JS.API.CONSUMER.LIST.*"
JSApiConsumerList = "$JS.API.CONSUMER.LIST.*"
JSApiConsumerListT = "$JS.API.CONSUMER.LIST.%s"
// JSApiConsumerInfo is for obtaining general information about a consumer.
// Will return JSON response.
@@ -3934,7 +3935,10 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account
numConsumers = len(resp.Consumers)
if offset > numConsumers {
offset = numConsumers
resp.Consumers = resp.Consumers[:offset]
}
resp.Consumers = resp.Consumers[offset:]
if len(resp.Consumers) > JSApiNamesLimit {
resp.Consumers = resp.Consumers[:JSApiNamesLimit]
}
js.mu.RUnlock()

View File

@@ -19,6 +19,7 @@ package server
import (
"encoding/json"
"errors"
"fmt"
"strings"
"testing"
"time"
@@ -210,3 +211,94 @@ func TestJetStreamClusterCreateConsumerWithReplicaOneGetsResponse(t *testing.T)
require_True(t, ci.Config.Replicas == 1)
require_True(t, len(ci.Cluster.Replicas) == 0)
}
func TestJetStreamClusterConsumerListPaging(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
cfg := &nats.ConsumerConfig{
Replicas: 1,
MemoryStorage: true,
AckPolicy: nats.AckExplicitPolicy,
}
// create 3000 consumers.
numConsumers := 3000
for i := 1; i <= numConsumers; i++ {
cfg.Durable = fmt.Sprintf("d-%.4d", i)
_, err := js.AddConsumer("TEST", cfg)
require_NoError(t, err)
}
// Test both names and list operations.
// Names
reqSubj := fmt.Sprintf(JSApiConsumersT, "TEST")
grabConsumerNames := func(offset int) []string {
req := fmt.Sprintf(`{"offset":%d}`, offset)
respMsg, err := nc.Request(reqSubj, []byte(req), time.Second)
require_NoError(t, err)
var resp JSApiConsumerNamesResponse
err = json.Unmarshal(respMsg.Data, &resp)
require_NoError(t, err)
// Sanity check that we are actually paging properly around limits.
if resp.Limit < len(resp.Consumers) {
t.Fatalf("Expected total limited to %d but got %d", resp.Limit, len(resp.Consumers))
}
return resp.Consumers
}
results := make(map[string]bool)
for offset := 0; len(results) < numConsumers; {
consumers := grabConsumerNames(offset)
offset += len(consumers)
for _, name := range consumers {
if results[name] {
t.Fatalf("Found duplicate %q", name)
}
results[name] = true
}
}
// List
reqSubj = fmt.Sprintf(JSApiConsumerListT, "TEST")
grabConsumerList := func(offset int) []*ConsumerInfo {
req := fmt.Sprintf(`{"offset":%d}`, offset)
respMsg, err := nc.Request(reqSubj, []byte(req), time.Second)
require_NoError(t, err)
var resp JSApiConsumerListResponse
err = json.Unmarshal(respMsg.Data, &resp)
require_NoError(t, err)
// Sanity check that we are actually paging properly around limits.
if resp.Limit < len(resp.Consumers) {
t.Fatalf("Expected total limited to %d but got %d", resp.Limit, len(resp.Consumers))
}
return resp.Consumers
}
results = make(map[string]bool)
for offset := 0; len(results) < numConsumers; {
consumers := grabConsumerList(offset)
offset += len(consumers)
for _, ci := range consumers {
name := ci.Config.Durable
if results[name] {
t.Fatalf("Found duplicate %q", name)
}
results[name] = true
}
}
}