When under load, concurrent stream creation of the same stream could return stream not found, which is odd.

Here we know that if we can't find the stream but have the stream assignment, this is a distinct possibility. So we wait, since not processed inline, to see if it appears.

Fixes TestJetStreamClusterParallelStreamCreation as well that could flap.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-09-27 18:05:43 -07:00
parent bc012d78c9
commit a7ca71017b

View File

@@ -1731,14 +1731,13 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
var clusterWideConsCount int
// If we are in clustered mode we need to be the stream leader to proceed.
if s.JetStreamIsClustered() {
// Check to make sure the stream is assigned.
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
if js == nil {
return
}
// If we are in clustered mode we need to be the stream leader to proceed.
if cc != nil {
// Check to make sure the stream is assigned.
js.mu.RLock()
isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName)
var offline bool
@@ -1833,15 +1832,23 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
}
mset, err := acc.lookupStream(streamName)
// Error is not to be expected at this point, but could happen if same stream trying to be created.
if err != nil {
if cc != nil {
// This could be inflight, pause for a short bit and try again.
// This will not be inline, so ok.
time.Sleep(10 * time.Millisecond)
mset, err = acc.lookupStream(streamName)
}
// Check again.
if err != nil {
resp.Error = NewJSStreamNotFoundError(Unless(err))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}
config := mset.config()
js, _ := s.getJetStreamCluster()
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.stateWithDetail(details),