mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[FIXED] JetStream: Account max streams/consumers not always honoured
This could happen during concurrent requests where the assignments are not yet fully processed. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -2034,16 +2034,15 @@ func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) {
|
||||
|
||||
startCh := make(chan bool)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(10)
|
||||
for n := 0; n < 10; n++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
go func(js nats.JetStreamContext) {
|
||||
defer wg.Done()
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
<-startCh
|
||||
js.SubscribeSync("in.maxcc.foo")
|
||||
}()
|
||||
}(js)
|
||||
}
|
||||
// Wait for Go routines.
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
@@ -2060,6 +2059,97 @@ func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamClusterAccountMaxStreamsAndConsumersMultipleConcurrentRequests(t *testing.T) {
|
||||
tmpl := `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: %s
|
||||
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
||||
|
||||
leaf {
|
||||
listen: 127.0.0.1:-1
|
||||
}
|
||||
|
||||
cluster {
|
||||
name: %s
|
||||
listen: 127.0.0.1:%d
|
||||
routes = [%s]
|
||||
}
|
||||
|
||||
accounts {
|
||||
A {
|
||||
jetstream {
|
||||
max_file: 9663676416
|
||||
max_streams: 2
|
||||
max_consumers: 1
|
||||
}
|
||||
users = [ { user: "a", pass: "pwd" } ]
|
||||
}
|
||||
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
||||
}
|
||||
`
|
||||
c := createJetStreamClusterWithTemplate(t, tmpl, "JSC", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer(), nats.UserInfo("a", "pwd"))
|
||||
defer nc.Close()
|
||||
|
||||
cfg := &nats.StreamConfig{
|
||||
Name: "MAXCC",
|
||||
Storage: nats.MemoryStorage,
|
||||
Subjects: []string{"in.maxcc.>"},
|
||||
Replicas: 3,
|
||||
}
|
||||
if _, err := js.AddStream(cfg); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
si, err := js.StreamInfo("MAXCC")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if si.Config.MaxConsumers != -1 {
|
||||
t.Fatalf("Expected max of -1, got %d", si.Config.MaxConsumers)
|
||||
}
|
||||
|
||||
startCh := make(chan bool)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(10)
|
||||
for n := 0; n < 10; n++ {
|
||||
nc, js := jsClientConnect(t, c.randomServer(), nats.UserInfo("a", "pwd"))
|
||||
defer nc.Close()
|
||||
go func(js nats.JetStreamContext, idx int) {
|
||||
defer wg.Done()
|
||||
<-startCh
|
||||
// Test adding new streams
|
||||
js.AddStream(&nats.StreamConfig{
|
||||
Name: fmt.Sprintf("OTHER_%d", idx),
|
||||
Replicas: 3,
|
||||
})
|
||||
// Test adding consumers to MAXCC stream
|
||||
js.SubscribeSync("in.maxcc.foo", nats.BindStream("MAXCC"))
|
||||
}(js, n)
|
||||
}
|
||||
// Wait for Go routines.
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
|
||||
close(startCh)
|
||||
wg.Wait()
|
||||
|
||||
var names []string
|
||||
for n := range js.StreamNames() {
|
||||
names = append(names, n)
|
||||
}
|
||||
if nc := len(names); nc > 2 {
|
||||
t.Fatalf("Expected only 2 streams, got %d", nc)
|
||||
}
|
||||
names = names[:0]
|
||||
for n := range js.ConsumerNames("MAXCC") {
|
||||
names = append(names, n)
|
||||
}
|
||||
if nc := len(names); nc > 1 {
|
||||
t.Fatalf("Expected only 1 consumer, got %d", nc)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamClusterPanicDecodingConsumerState(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "JSC", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
Reference in New Issue
Block a user