From a7ca71017b226145f1711dd727d9d1031f290ded Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 27 Sep 2023 18:05:43 -0700 Subject: [PATCH] When under load, concurrent stream creation of the same stream could return stream not found, which is odd. Here we know that if we can't find the stream but have the stream assignment, this is a distinct possibility. So we wait, since not processed inline, to see if it appears. Fixes TestJetStreamClusterParallelStreamCreation as well that could flap. Signed-off-by: Derek Collison --- server/jetstream_api.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 53736a1b..dfade67d 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1731,14 +1731,13 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s var clusterWideConsCount int + js, cc := s.getJetStreamCluster() + if js == nil { + return + } // If we are in clustered mode we need to be the stream leader to proceed. - if s.JetStreamIsClustered() { + if cc != nil { // Check to make sure the stream is assigned. - js, cc := s.getJetStreamCluster() - if js == nil || cc == nil { - return - } - js.mu.RLock() isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName) var offline bool @@ -1833,15 +1832,23 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s } mset, err := acc.lookupStream(streamName) + // Error is not to be expected at this point, but could happen if same stream trying to be created. if err != nil { - resp.Error = NewJSStreamNotFoundError(Unless(err)) - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return + if cc != nil { + // This could be inflight, pause for a short bit and try again. + // This will not be inline, so ok. + time.Sleep(10 * time.Millisecond) + mset, err = acc.lookupStream(streamName) + } + // Check again. + if err != nil { + resp.Error = NewJSStreamNotFoundError(Unless(err)) + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } } config := mset.config() - js, _ := s.getJetStreamCluster() - resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.stateWithDetail(details),