diff --git a/server/jetstream_api.go b/server/jetstream_api.go index f3ed160c..5a4da584 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -649,11 +649,18 @@ type JSApiStreamTemplateNamesResponse struct { const JSApiStreamTemplateNamesResponseType = "io.nats.jetstream.api.v1.stream_template_names_response" -// Default max API calls outstanding. -const defaultMaxJSApiOut = int64(4096) - -// Max API calls outstanding. -var maxJSApiOut = defaultMaxJSApiOut +// Structure that holds state for a JetStream API request that is processed +// in a separate long-lived go routine. This is to avoid possibly blocking +// ROUTE and GATEWAY connections. +type jsAPIRoutedReq struct { + jsub *subscription + sub *subscription + acc *Account + subject string + reply string + msg []byte + pa pubArg +} func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) { // No lock needed, those are immutable. @@ -687,36 +694,36 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub } // If we are here we have received this request over a non client connection. - // We need to make sure not to block. We will spin a Go routine per but also make - // sure we do not have too many outstanding. - if apiOut := atomic.AddInt64(&js.apiInflight, 1); apiOut > maxJSApiOut { - atomic.AddInt64(&js.apiInflight, -1) - ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) - if err == nil { - resp := &ApiResponse{Type: JSApiOverloadedType, Error: NewJSInsufficientResourcesError()} - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - } else { - s.Warnf(badAPIRequestT, rmsg) - } - s.Warnf("JetStream API limit exceeded: %d calls outstanding", apiOut) - return - } + // We need to make sure not to block. We will send the request to a long-lived + // go routine. - // If we are here we can properly dispatch this API call. - // Copy the message and the client. Client for the pubArgs - // but note the JSAPI only uses the hdr index to piece apart - // the header from the msg body. No other references are needed. - // FIXME(dlc) - Should cleanup eventually and make sending - // and receiving internal messages more formal. - rmsg = copyBytes(rmsg) + // Copy the state. Note the JSAPI only uses the hdr index to piece apart the + // header from the msg body. No other references are needed. + s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa}) +} + +func (s *Server) processJSAPIRoutedRequests() { + defer s.grWG.Done() + + s.mu.Lock() + queue := s.jsAPIRoutedReqs client := &client{srv: s, kind: JETSTREAM} - client.pa = c.pa + s.mu.Unlock() - // Dispatch the API call to its own Go routine. - go func() { - jsub.icb(sub, client, acc, subject, reply, rmsg) - atomic.AddInt64(&js.apiInflight, -1) - }() + for { + select { + case <-queue.ch: + reqs := queue.pop() + for _, req := range reqs { + r := req.(*jsAPIRoutedReq) + client.pa = r.pa + r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg) + } + queue.recycle(&reqs) + case <-s.quitCh: + return + } + } } func (s *Server) setJetStreamExportSubs() error { @@ -725,6 +732,11 @@ func (s *Server) setJetStreamExportSubs() error { return NewJSNotEnabledError() } + // Start the go routine that will process API requests received by the + // subscription below when they are coming from routes, etc.. + s.jsAPIRoutedReqs = s.newIPQueue("Routed JS API Requests") + s.startGoRoutine(s.processJSAPIRoutedRequests) + // This is the catch all now for all JetStream API calls. if _, err := s.sysSubscribe(jsAllAPI, js.apiDispatch); err != nil { return err diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index df7559f9..ebcafcd9 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -1100,16 +1100,6 @@ func (c *cluster) randomNonStreamLeader(account, stream string) *Server { return nil } -func (c *cluster) randomStreamNotAssigned(account, stream string) *Server { - c.t.Helper() - for _, s := range c.servers { - if !s.JetStreamIsStreamAssigned(account, stream) { - return s - } - } - return nil -} - func (c *cluster) streamLeader(account, stream string) *Server { c.t.Helper() for _, s := range c.servers { diff --git a/server/norace_test.go b/server/norace_test.go index 1f393c0c..3a642d84 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -3855,61 +3855,6 @@ func TestNoRaceJetStreamClusterStreamReset(t *testing.T) { c.waitOnConsumerLeader("$G", "TEST", "d1") } -// Issue #2644 -func TestNoRaceJetStreamPullConsumerAPIOutUnlock(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 3) - defer c.shutdown() - - // Client based API - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Retention: nats.WorkQueuePolicy, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - if _, err = js.PullSubscribe("foo", "dlc"); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - for i := 0; i < 100; i++ { - if _, err := js.PublishAsync("foo", []byte("OK")); err != nil { - t.Fatalf("Unexpected publish error: %v", err) - } - } - select { - case <-js.PublishAsyncComplete(): - case <-time.After(time.Second): - t.Fatalf("Did not receive completion signal") - } - - // Force to go through route to use the Go routines, etc. - s := c.randomStreamNotAssigned("$G", "TEST") - if s == nil { - t.Fatalf("Did not get a server") - } - - nc, _ = jsClientConnect(t, s) - defer nc.Close() - - // Set this low to trigger error. - maxJSApiOut = 5 - defer func() { maxJSApiOut = defaultMaxJSApiOut }() - - nsubj := fmt.Sprintf(JSApiRequestNextT, "TEST", "dlc") - for i := 0; i < 500; i++ { - if err := nc.PublishRequest(nsubj, "bar", nil); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - } - nc.Flush() -} - // Reports of high cpu on compaction for a KV store. func TestNoRaceJetStreamKeyValueCompaction(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) diff --git a/server/server.go b/server/server.go index 1e97a16d..09fc9c92 100644 --- a/server/server.go +++ b/server/server.go @@ -285,6 +285,9 @@ type Server struct { // Total outbound syncRequests syncOutSem chan struct{} + + // Queue to process JS API requests that come from routes (or gateways) + jsAPIRoutedReqs *ipQueue } // For tracking JS nodes.