diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3c51546b..c3e9b9d5 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3841,30 +3841,36 @@ func (s *Server) jsClusteredStreamPurgeRequest( } js.mu.Lock() - defer js.mu.Unlock() - sa := js.streamAssignment(acc.Name, stream) if sa == nil { resp := JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}} resp.Error = NewJSStreamNotFoundError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + js.mu.Unlock() return } if n := sa.Group.node; n != nil { sp := &streamPurge{Stream: stream, LastSeq: mset.state().LastSeq, Subject: subject, Reply: reply, Client: ci, Request: preq} n.Propose(encodeStreamPurge(sp)) - } else if mset != nil { - var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}} - purged, err := mset.purge(preq) - if err != nil { - resp.Error = NewJSStreamGeneralError(err, Unless(err)) - } else { - resp.Purged = purged - resp.Success = true - } - s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) + js.mu.Unlock() + return } + js.mu.Unlock() + + if mset == nil { + return + } + + var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}} + purged, err := mset.purge(preq) + if err != nil { + resp.Error = NewJSStreamGeneralError(err, Unless(err)) + } else { + resp.Purged = purged + resp.Success = true + } + s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) } func (s *Server) jsClusteredStreamRestoreRequest(ci *ClientInfo, acc *Account, req *JSApiStreamRestoreRequest, stream, subject, reply string, rmsg []byte) { @@ -4273,35 +4279,42 @@ func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, acc *Account, mset } js.mu.Lock() - defer js.mu.Unlock() - sa := js.streamAssignment(acc.Name, stream) if sa == nil { s.Debugf("Message delete failed, could not locate stream '%s > %s'", acc.Name, stream) + js.mu.Unlock() return } + // Check for single replica items. if n := sa.Group.node; n != nil { md := streamMsgDelete{Seq: req.Seq, NoErase: req.NoErase, Stream: stream, Subject: subject, Reply: reply, Client: ci} n.Propose(encodeMsgDelete(&md)) - } else if mset != nil { - var err error - var removed bool - if req.NoErase { - removed, err = mset.removeMsg(req.Seq) - } else { - removed, err = mset.eraseMsg(req.Seq) - } - var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}} - if err != nil { - resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err)) - } else if !removed { - resp.Error = NewJSSequenceNotFoundError(req.Seq) - } else { - resp.Success = true - } - s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) + js.mu.Unlock() + return } + js.mu.Unlock() + + if mset == nil { + return + } + + var err error + var removed bool + if req.NoErase { + removed, err = mset.removeMsg(req.Seq) + } else { + removed, err = mset.eraseMsg(req.Seq) + } + var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}} + if err != nil { + resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err)) + } else if !removed { + resp.Error = NewJSSequenceNotFoundError(req.Seq) + } else { + resp.Success = true + } + s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) } func encodeAddStreamAssignment(sa *streamAssignment) []byte {