From 9f1580686a787f234a05aece0124c1ca7377fae6 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 1 Mar 2023 04:33:47 -0800 Subject: [PATCH 1/2] Revert behavior for JetStream published directly from client to be handled inline. Signed-off-by: Derek Collison --- server/stream.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/server/stream.go b/server/stream.go index 80035576..1683e790 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1,4 +1,4 @@ -// Copyright 2019-2022 The NATS Authors +// Copyright 2019-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -3646,9 +3646,22 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) { // processInboundJetStreamMsg handles processing messages bound for a stream. func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { - // Always move this to another Go routine. hdr, msg := c.msgParts(rmsg) - mset.queueInboundMsg(subject, reply, hdr, msg) + + // If we are not receiving directly from a client we should move this to another Go routine. + // Make sure to grab no stream or js locks. + if c.kind != CLIENT { + mset.queueInboundMsg(subject, reply, hdr, msg) + return + } + + // This is directly from a client so process inline. + // If we are clustered we need to propose this message to the underlying raft group. + if mset.IsClustered() { + mset.processClusteredInboundMsg(subject, reply, hdr, msg) + } else { + mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0) + } } var ( From baca7bd75125cf0b75f0c616225a4e8024bca77c Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 1 Mar 2023 04:58:01 -0800 Subject: [PATCH 2/2] Fix for test flapper Signed-off-by: Derek Collison --- server/norace_test.go | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/server/norace_test.go b/server/norace_test.go index 074bd45e..9fd8702d 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -5124,21 +5124,26 @@ func TestNoRaceJetStreamClusterInterestPullConsumerStreamLimitBug(t *testing.T) time.Sleep(5 * time.Second) close(qch) wg.Wait() - time.Sleep(time.Second) - si, err := js.StreamInfo("TEST") - require_NoError(t, err) + checkFor(t, 20*time.Second, 500*time.Millisecond, func() error { + si, err := js.StreamInfo("TEST") + if err != nil { + return err + } + ci, err := js.ConsumerInfo("TEST", "dur") + if err != nil { + return err + } - ci, err := js.ConsumerInfo("TEST", "dur") - require_NoError(t, err) - - ld := ci.Delivered.Stream - if si.State.FirstSeq > ld { - ld = si.State.FirstSeq - 1 - } - if si.State.LastSeq-ld != ci.NumPending { - t.Fatalf("Expected NumPending to be %d got %d", si.State.LastSeq-ld, ci.NumPending) - } + ld := ci.Delivered.Stream + if si.State.FirstSeq > ld { + ld = si.State.FirstSeq - 1 + } + if si.State.LastSeq-ld != ci.NumPending { + return fmt.Errorf("Expected NumPending to be %d got %d", si.State.LastSeq-ld, ci.NumPending) + } + return nil + }) } // Test that all peers have the direct access subs that participate in a queue group,