Avoid deadlock, release js lock

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-12-29 10:46:53 -08:00
parent 1a37f0963a
commit 5932fa1852

View File

@@ -3841,30 +3841,36 @@ func (s *Server) jsClusteredStreamPurgeRequest(
}
js.mu.Lock()
defer js.mu.Unlock()
sa := js.streamAssignment(acc.Name, stream)
if sa == nil {
resp := JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
resp.Error = NewJSStreamNotFoundError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
js.mu.Unlock()
return
}
if n := sa.Group.node; n != nil {
sp := &streamPurge{Stream: stream, LastSeq: mset.state().LastSeq, Subject: subject, Reply: reply, Client: ci, Request: preq}
n.Propose(encodeStreamPurge(sp))
} else if mset != nil {
var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
purged, err := mset.purge(preq)
if err != nil {
resp.Error = NewJSStreamGeneralError(err, Unless(err))
} else {
resp.Purged = purged
resp.Success = true
}
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))
js.mu.Unlock()
return
}
js.mu.Unlock()
if mset == nil {
return
}
var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
purged, err := mset.purge(preq)
if err != nil {
resp.Error = NewJSStreamGeneralError(err, Unless(err))
} else {
resp.Purged = purged
resp.Success = true
}
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))
}
func (s *Server) jsClusteredStreamRestoreRequest(ci *ClientInfo, acc *Account, req *JSApiStreamRestoreRequest, stream, subject, reply string, rmsg []byte) {
@@ -4273,35 +4279,42 @@ func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, acc *Account, mset
}
js.mu.Lock()
defer js.mu.Unlock()
sa := js.streamAssignment(acc.Name, stream)
if sa == nil {
s.Debugf("Message delete failed, could not locate stream '%s > %s'", acc.Name, stream)
js.mu.Unlock()
return
}
// Check for single replica items.
if n := sa.Group.node; n != nil {
md := streamMsgDelete{Seq: req.Seq, NoErase: req.NoErase, Stream: stream, Subject: subject, Reply: reply, Client: ci}
n.Propose(encodeMsgDelete(&md))
} else if mset != nil {
var err error
var removed bool
if req.NoErase {
removed, err = mset.removeMsg(req.Seq)
} else {
removed, err = mset.eraseMsg(req.Seq)
}
var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
if err != nil {
resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err))
} else if !removed {
resp.Error = NewJSSequenceNotFoundError(req.Seq)
} else {
resp.Success = true
}
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))
js.mu.Unlock()
return
}
js.mu.Unlock()
if mset == nil {
return
}
var err error
var removed bool
if req.NoErase {
removed, err = mset.removeMsg(req.Seq)
} else {
removed, err = mset.eraseMsg(req.Seq)
}
var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
if err != nil {
resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err))
} else if !removed {
resp.Error = NewJSSequenceNotFoundError(req.Seq)
} else {
resp.Success = true
}
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))
}
func encodeAddStreamAssignment(sa *streamAssignment) []byte {