Require reply subjects for restore chunks

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-06-04 06:56:07 -07:00
parent 012d517ba1
commit 164f44ed18
2 changed files with 19 additions and 15 deletions

View File

@@ -1130,6 +1130,7 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
}
// FIXME(dlc) - Need to close these up if we fail for some reason.
// TODO(dlc) - Might need to make configurable or stream direct to storage dir.
tfile, err := ioutil.TempFile("", "jetstream-restore-")
if err != nil {
resp.Error = &ApiError{Code: 500, Description: "jetstream unable to open temp storage for restore"}
@@ -1159,6 +1160,14 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
// things out. Note that this is tied to the requesting client, so if it is a tool this goes away when
// the client does. Only thing leaking here is the sub on strange failure.
acc.subscribeInternal(restoreSubj, func(sub *subscription, c *client, subject, reply string, msg []byte) {
// We require reply subjects to communicate back failures, flow etc. If they do not have one log and cancel.
if reply == _EMPTY_ {
tfile.Close()
os.Remove(tfile.Name())
c.processUnsub(sub.sid)
s.Warnf("Restore for stream %q in account %q requires reply subject for each chunk", stream, c.acc.Name)
return
}
// Account client messages have \r\n on end.
if len(msg) < LEN_CR_LF {
return
@@ -1191,16 +1200,14 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
s.Noticef("Completed %s restore for stream %q in account %q in %v",
FriendlyBytes(int64(received)), stream, c.acc.Name, end.Sub(start))
// If there is a reply subject on the last EOF, send back the stream info or error status.
if reply != _EMPTY_ {
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
if err != nil {
resp.Error = jsError(err)
} else {
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config()}
}
s.sendInternalAccountMsg(acc, reply, s.jsonResponse(&resp))
// On the last EOF, send back the stream info or error status.
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
if err != nil {
resp.Error = jsError(err)
} else {
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config()}
}
s.sendInternalAccountMsg(acc, reply, s.jsonResponse(&resp))
return
}
@@ -1217,9 +1224,7 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
return
}
received += len(msg)
if reply != _EMPTY_ {
s.sendInternalAccountMsg(acc, reply, nil)
}
s.sendInternalAccountMsg(acc, reply, nil)
})
resp.DeliverSubject = restoreSubj

View File

@@ -3066,10 +3066,9 @@ func TestJetStreamSnapshotsAPI(t *testing.T) {
if err != nil {
break
}
nc.Publish(rresp.DeliverSubject, chunk[:n])
nc.Request(rresp.DeliverSubject, chunk[:n], time.Second)
}
nc.Publish(rresp.DeliverSubject, nil)
nc.Flush()
nc.Request(rresp.DeliverSubject, nil, time.Second)
mset, err = acc.LookupStream(mname)
if err != nil {