[IMPROVED] Concurrent stream creation of the same stream could return not found (#4600)

Here we know that if we can't find the stream but have the stream
assignment, this is a distinct possibility. So we wait, since not
processed inline, to see if it appears.

Fixes TestJetStreamClusterParallelStreamCreation as well that could
flap.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-09-27 19:55:52 -07:00
committed by GitHub

View File

@@ -1731,14 +1731,13 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
var clusterWideConsCount int
js, cc := s.getJetStreamCluster()
if js == nil {
return
}
// If we are in clustered mode we need to be the stream leader to proceed.
if s.JetStreamIsClustered() {
if cc != nil {
// Check to make sure the stream is assigned.
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
}
js.mu.RLock()
isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName)
var offline bool
@@ -1833,15 +1832,23 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
}
mset, err := acc.lookupStream(streamName)
// Error is not to be expected at this point, but could happen if same stream trying to be created.
if err != nil {
resp.Error = NewJSStreamNotFoundError(Unless(err))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
if cc != nil {
// This could be inflight, pause for a short bit and try again.
// This will not be inline, so ok.
time.Sleep(10 * time.Millisecond)
mset, err = acc.lookupStream(streamName)
}
// Check again.
if err != nil {
resp.Error = NewJSStreamNotFoundError(Unless(err))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}
config := mset.config()
js, _ := s.getJetStreamCluster()
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.stateWithDetail(details),