Snapshot restore now works across leafnodes.

This also introduces the ability to have flow control inbound for restoring a stream.
If the system detects a reply subject it will respond with a nil payload.
For the last EOF message if a reply is present it will respond with a stream info response or error.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-06-03 20:00:59 -07:00
parent 30bc47b87a
commit 660ea3c807
3 changed files with 138 additions and 16 deletions

View File

@@ -1341,8 +1341,9 @@ func (a *Account) internalClient() *client {
}
// Internal account scoped subscriptions.
func (a *Account) subscribeInternal(c *client, subject string, cb msgHandler) (*subscription, error) {
func (a *Account) subscribeInternal(subject string, cb msgHandler) (*subscription, error) {
a.mu.Lock()
c := a.internalClient()
sid := strconv.FormatUint(a.isid+1, 10)
a.isid++
a.mu.Unlock()
@@ -1352,7 +1353,7 @@ func (a *Account) subscribeInternal(c *client, subject string, cb msgHandler) (*
return nil, fmt.Errorf("no internal account client")
}
sub, err := c.processSub([]byte(subject+" "+sid), true)
sub, err := c.processSub([]byte(subject+" "+sid), false)
if err != nil {
return nil, err
}

View File

@@ -1129,6 +1129,7 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
return
}
// FIXME(dlc) - Need to close these up if we fail for some reason.
tfile, err := ioutil.TempFile("", "jetstream-restore-")
if err != nil {
resp.Error = &ApiError{Code: 500, Description: "jetstream unable to open temp storage for restore"}
@@ -1136,9 +1137,6 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
return
}
// Create our internal subscription to accept the snapshot.
restoreSubj := fmt.Sprintf(jsRestoreDeliverT, stream, nuid.Next())
s.Noticef("Starting restore for stream %q in account %q", stream, c.acc.Name)
start := time.Now()
received := 0
@@ -1154,19 +1152,34 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
Client: caudit,
})
// Track errors writing to temp files.
var tfileError error
// Create our internal subscription to accept the snapshot.
restoreSubj := fmt.Sprintf(jsRestoreDeliverT, stream, nuid.Next())
// FIXME(dlc) - Can't recover well here if something goes wrong. Could use channels and at least time
// things out. Note that this is tied to the requesting client, so if it is a tool this goes away when
// the client does. Only thing leaking here is the sub on strange failure.
acc.subscribeInternal(c, restoreSubj, func(sub *subscription, c *client, _, _ string, msg []byte) {
acc.subscribeInternal(restoreSubj, func(sub *subscription, c *client, subject, reply string, msg []byte) {
// Account client messages have \r\n on end.
if len(msg) < LEN_CR_LF {
return
}
msg = msg[:len(msg)-LEN_CR_LF]
if len(msg) == 0 {
tfile.Seek(0, 0)
// TODO(dlc) - no way right now to communicate back.
acc.RestoreStream(stream, tfile)
mset, err := acc.RestoreStream(stream, tfile)
if err != nil && tfileError != nil {
err = tfileError
}
tfile.Close()
os.Remove(tfile.Name())
c.processUnsub(sub.sid)
end := time.Now()
// TODO(rip) - Should this have the error code in it??
s.publishAdvisory(c.acc, JSAdvisoryStreamRestoreCompletePre+"."+stream, &JSRestoreCompleteAdvisory{
TypedEvent: TypedEvent{
Type: JSRestoreCompleteAdvisoryType,
@@ -1180,13 +1193,31 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
Client: caudit,
})
s.Noticef("Completed %s restore for stream %q in account %q in %v", FriendlyBytes(int64(received)), stream, c.acc.Name, end.Sub(start))
s.Noticef("Completed %s restore for stream %q in account %q in %v",
FriendlyBytes(int64(received)), stream, c.acc.Name, end.Sub(start))
// If there is a reply subject on the last EOF, send back the stream info or error status.
if reply != _EMPTY_ {
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
if err != nil {
resp.Error = jsError(err)
} else {
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config()}
}
s.sendInternalAccountMsg(acc, reply, s.jsonResponse(&resp))
}
return
}
// Append chunk to temp file.
tfile.Write(msg)
// Append chunk to temp file. Mark as issue if we encounter an error.
if n, err := tfile.Write(msg); n != len(msg) || err != nil {
tfileError = err
}
received += len(msg)
if reply != _EMPTY_ {
s.sendInternalAccountMsg(acc, reply, nil)
}
})
resp.DeliverSubject = restoreSubj
@@ -1286,7 +1317,11 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject,
Client: caudit,
})
s.Noticef("Completed %s snapshot for stream %q in account %q in %v", FriendlyBytes(int64(resp.NumBlks*resp.BlkSize)), mset.Name(), mset.jsa.account.Name, end.Sub(start))
s.Noticef("Completed %s snapshot for stream %q in account %q in %v",
FriendlyBytes(int64(resp.NumBlks*resp.BlkSize)),
mset.Name(),
mset.jsa.account.Name,
end.Sub(start))
}()
}

View File

@@ -2894,9 +2894,27 @@ func TestJetStreamSnapshots(t *testing.T) {
}
func TestJetStreamSnapshotsAPI(t *testing.T) {
s := RunBasicJetStreamServer()
lopts := DefaultTestOptions
lopts.ServerName = "LS"
lopts.Port = -1
lopts.LeafNode.Host = lopts.Host
lopts.LeafNode.Port = -1
ls := RunServer(&lopts)
defer ls.Shutdown()
opts := DefaultTestOptions
opts.ServerName = "S"
opts.Port = -1
opts.JetStream = true
rurl, _ := url.Parse(fmt.Sprintf("nats-leaf://%s:%d", lopts.LeafNode.Host, lopts.LeafNode.Port))
opts.LeafNode.Remotes = []*server.RemoteLeafOpts{{URLs: []*url.URL{rurl}}}
s := RunServer(&opts)
defer s.Shutdown()
checkLeafNodeConnected(t, s)
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
@@ -3041,10 +3059,9 @@ func TestJetStreamSnapshotsAPI(t *testing.T) {
if rresp.Error != nil {
t.Fatalf("Got an unexpected error response: %+v", rresp.Error)
}
r := bytes.NewReader(snapshot)
// Can be anysize message.
// Can be any size message.
var chunk [512]byte
for {
for r := bytes.NewReader(snapshot); ; {
n, err := r.Read(chunk[:])
if err != nil {
break
@@ -3077,6 +3094,75 @@ func TestJetStreamSnapshotsAPI(t *testing.T) {
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive our snapshot in time")
}
// Now connect through a cluster server and make sure we can get things to work this way as well.
nc2 := clientConnectToServer(t, ls)
defer nc2.Close()
snapshot = snapshot[:0]
req, _ = json.Marshal(sreq)
rmsg, err = nc2.Request(fmt.Sprintf(server.JSApiStreamSnapshotT, mname), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error on snapshot request: %v", err)
}
resp.Error = nil
json.Unmarshal(rmsg.Data, &resp)
if resp.Error != nil {
t.Fatalf("Did not get correct error response: %+v", resp.Error)
}
// Wait to receive the snapshot.
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive our snapshot in time")
}
// Now do a restore through the new client connection.
// Delete this stream first.
mset, err = acc.LookupStream(mname)
if err != nil {
t.Fatalf("Expected to find a stream for %q", mname)
}
state = mset.State()
mset.Delete()
rmsg, err = nc2.Request(fmt.Sprintf(server.JSApiStreamRestoreT, mname), nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error on snapshot request: %v", err)
}
// Make sure to clear.
rresp.Error = nil
json.Unmarshal(rmsg.Data, &rresp)
if rresp.Error != nil {
t.Fatalf("Got an unexpected error response: %+v", rresp.Error)
}
for r := bytes.NewReader(snapshot); ; {
n, err := r.Read(chunk[:])
if err != nil {
break
}
// Make sure other side responds to reply subjects for ack flow. Optional.
if _, err := nc2.Request(rresp.DeliverSubject, chunk[:n], time.Second); err != nil {
t.Fatalf("Restore not honoring reply subjects for ack flow")
}
}
// For EOF this will send back stream info or an error.
si, err := nc2.Request(rresp.DeliverSubject, nil, time.Second)
if err != nil {
t.Fatalf("Got an error restoring stream: %v", err)
}
var scResp server.JSApiStreamCreateResponse
if err := json.Unmarshal(si.Data, &scResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if scResp.Error != nil {
t.Fatalf("Got an unexpected error from EOF omn restore: %+v", scResp.Error)
}
if scResp.StreamInfo.State != state {
t.Fatalf("Did not match states, %+v vs %+v", scResp.StreamInfo.State, state)
}
}
func TestJetStreamSnapshotsAPIPerf(t *testing.T) {