diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index f7d82926..77d71354 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -40,6 +40,9 @@ type jetStreamCluster struct { // For stream and consumer assignments. All servers will have this be the same. // ACCOUNT -> STREAM -> Stream Assignment -> Consumers streams map[string]map[string]*streamAssignment + // These are inflight proposals and used to apply limits when there are + // concurrent requests that would otherwise be accepted. + inflight map[string]map[string]struct{} // Signals meta-leader should check the stream assignments. streamsCheck bool // Server. @@ -837,6 +840,21 @@ func (cc *jetStreamCluster) isConsumerLeader(account, stream, consumer string) b return false } +// Remove the stream `streamName` for the account `accName` from the inflight +// proposals map. This is done on success (processStreamAssignment) or on +// failure (processStreamAssignmentResults). +// (Write) Lock held on entry. +func (cc *jetStreamCluster) removeInflightProposal(accName, streamName string) { + streams, ok := cc.inflight[accName] + if !ok { + return + } + delete(streams, streamName) + if len(streams) == 0 { + delete(cc.inflight, accName) + } +} + // Return the cluster quit chan. func (js *jetStream) clusterQuitC() chan struct{} { js.mu.RLock() @@ -2646,7 +2664,7 @@ func (js *jetStream) streamAssignment(account, stream string) (sa *streamAssignm // processStreamAssignment is called when followers have replicated an assignment. func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool { - js.mu.RLock() + js.mu.Lock() s, cc := js.srv, js.cluster accName, stream := sa.Client.serviceAccount(), sa.Config.Name noMeta := cc == nil || cc.meta == nil @@ -2658,13 +2676,15 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool { if sa.Group != nil && ourID != _EMPTY_ { isMember = sa.Group.isMember(ourID) } - js.mu.RUnlock() + + // Remove this stream from the inflight proposals + cc.removeInflightProposal(accName, sa.Config.Name) if s == nil || noMeta { + js.mu.Unlock() return false } - js.mu.Lock() accStreams := cc.streams[accName] if accStreams == nil { accStreams = make(map[string]*streamAssignment) @@ -4342,6 +4362,11 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client s, cc := js.srv, js.cluster + // This should have been done already in processStreamAssignment, but in + // case we have a code path that gets here with no processStreamAssignment, + // then we will do the proper thing. Otherwise will be a no-op. + cc.removeInflightProposal(result.Account, result.Stream) + // FIXME(dlc) - suppress duplicates? if sa := js.streamAssignment(result.Account, result.Stream); sa != nil { canDelete := !result.Update && time.Since(sa.Created) < 5*time.Second @@ -4979,7 +5004,10 @@ func (js *jetStream) jsClusteredStreamLimitsCheck(acc *Account, cfg *StreamConfi asa := js.cluster.streams[acc.Name] numStreams, reservations := tieredStreamAndReservationCount(asa, tier, cfg) - + // Check for inflight proposals... + if cc := js.cluster; cc != nil && cc.inflight != nil { + numStreams += len(cc.inflight[acc.Name]) + } if selectedLimits.MaxStreams > 0 && numStreams >= selectedLimits.MaxStreams { return NewJSMaximumStreamsLimitError() } @@ -5006,17 +5034,6 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, } cfg := &ccfg - js.mu.RLock() - apiErr = js.jsClusteredStreamLimitsCheck(acc, cfg) - asa := cc.streams[acc.Name] - js.mu.RUnlock() - // Check for stream limits here before proposing. These need to be tracked from meta layer, not jsa. - if apiErr != nil { - resp.Error = apiErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) - return - } - // Now process the request and proposal. js.mu.Lock() defer js.mu.Unlock() @@ -5050,6 +5067,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, } // Check for subject collisions here. + asa := cc.streams[acc.Name] for _, sa := range asa { for _, subj := range sa.Config.Subjects { for _, tsubj := range cfg.Subjects { @@ -5062,6 +5080,14 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, } } + apiErr = js.jsClusteredStreamLimitsCheck(acc, cfg) + // Check for stream limits here before proposing. These need to be tracked from meta layer, not jsa. + if apiErr != nil { + resp.Error = apiErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return + } + // Raft group selection and placement. rg, err := js.createGroupForStream(ci, cfg) if err != nil { @@ -5073,7 +5099,20 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, rg.setPreferred() // Sync subject for post snapshot sync. sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()} - cc.meta.Propose(encodeAddStreamAssignment(sa)) + if err := cc.meta.Propose(encodeAddStreamAssignment(sa)); err == nil { + // On success, add this as an inflight proposal so we can apply limits + // on concurrent create requests while this stream assignment has + // possibly not been processed yet. + if cc.inflight == nil { + cc.inflight = make(map[string]map[string]struct{}) + } + streams, ok := cc.inflight[acc.Name] + if !ok { + streams = make(map[string]struct{}) + cc.inflight[acc.Name] = streams + } + streams[cfg.Name] = struct{}{} + } } var ( @@ -6046,7 +6085,13 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec } // Check for max consumers here to short circuit if possible. - if maxc := sa.Config.MaxConsumers; maxc > 0 { + // Start with limit on a stream, but if one is defined at the level of the account + // and is lower, use that limit. + maxc := sa.Config.MaxConsumers + if maxc <= 0 || (selectedLimits.MaxConsumers > 0 && selectedLimits.MaxConsumers < maxc) { + maxc = selectedLimits.MaxConsumers + } + if maxc > 0 { // Don't count DIRECTS. total := 0 for _, ca := range sa.consumers { diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 42fe8171..b5970ef1 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -2034,16 +2034,15 @@ func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) { startCh := make(chan bool) var wg sync.WaitGroup - + wg.Add(10) for n := 0; n < 10; n++ { - wg.Add(1) - go func() { + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + go func(js nats.JetStreamContext) { defer wg.Done() - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() <-startCh js.SubscribeSync("in.maxcc.foo") - }() + }(js) } // Wait for Go routines. time.Sleep(250 * time.Millisecond) @@ -2060,6 +2059,97 @@ func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) { } } +func TestJetStreamClusterAccountMaxStreamsAndConsumersMultipleConcurrentRequests(t *testing.T) { + tmpl := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + + leaf { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + accounts { + A { + jetstream { + max_file: 9663676416 + max_streams: 2 + max_consumers: 1 + } + users = [ { user: "a", pass: "pwd" } ] + } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } + ` + c := createJetStreamClusterWithTemplate(t, tmpl, "JSC", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer(), nats.UserInfo("a", "pwd")) + defer nc.Close() + + cfg := &nats.StreamConfig{ + Name: "MAXCC", + Storage: nats.MemoryStorage, + Subjects: []string{"in.maxcc.>"}, + Replicas: 3, + } + if _, err := js.AddStream(cfg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + si, err := js.StreamInfo("MAXCC") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.Config.MaxConsumers != -1 { + t.Fatalf("Expected max of -1, got %d", si.Config.MaxConsumers) + } + + startCh := make(chan bool) + var wg sync.WaitGroup + wg.Add(10) + for n := 0; n < 10; n++ { + nc, js := jsClientConnect(t, c.randomServer(), nats.UserInfo("a", "pwd")) + defer nc.Close() + go func(js nats.JetStreamContext, idx int) { + defer wg.Done() + <-startCh + // Test adding new streams + js.AddStream(&nats.StreamConfig{ + Name: fmt.Sprintf("OTHER_%d", idx), + Replicas: 3, + }) + // Test adding consumers to MAXCC stream + js.SubscribeSync("in.maxcc.foo", nats.BindStream("MAXCC")) + }(js, n) + } + // Wait for Go routines. + time.Sleep(250 * time.Millisecond) + + close(startCh) + wg.Wait() + + var names []string + for n := range js.StreamNames() { + names = append(names, n) + } + if nc := len(names); nc > 2 { + t.Fatalf("Expected only 2 streams, got %d", nc) + } + names = names[:0] + for n := range js.ConsumerNames("MAXCC") { + names = append(names, n) + } + if nc := len(names); nc > 1 { + t.Fatalf("Expected only 1 consumer, got %d", nc) + } +} + func TestJetStreamClusterPanicDecodingConsumerState(t *testing.T) { c := createJetStreamClusterExplicit(t, "JSC", 3) defer c.shutdown()