From 660ea3c80796a1fceea86fda2140e50ccae1f1fc Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 3 Jun 2020 20:00:59 -0700 Subject: [PATCH] Snapshot restore now works across leafnodes. This also introduces the ability to have flow control inbound for restoring a stream. If the system detects a reply subject it will respond with a nil payload. For the last EOF message if a reply is present it will respond with a stream info response or error. Signed-off-by: Derek Collison --- server/accounts.go | 5 ++- server/jetstream_api.go | 55 +++++++++++++++++++----- test/jetstream_test.go | 94 +++++++++++++++++++++++++++++++++++++++-- 3 files changed, 138 insertions(+), 16 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 2b66ef2a..6caa5998 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -1341,8 +1341,9 @@ func (a *Account) internalClient() *client { } // Internal account scoped subscriptions. -func (a *Account) subscribeInternal(c *client, subject string, cb msgHandler) (*subscription, error) { +func (a *Account) subscribeInternal(subject string, cb msgHandler) (*subscription, error) { a.mu.Lock() + c := a.internalClient() sid := strconv.FormatUint(a.isid+1, 10) a.isid++ a.mu.Unlock() @@ -1352,7 +1353,7 @@ func (a *Account) subscribeInternal(c *client, subject string, cb msgHandler) (* return nil, fmt.Errorf("no internal account client") } - sub, err := c.processSub([]byte(subject+" "+sid), true) + sub, err := c.processSub([]byte(subject+" "+sid), false) if err != nil { return nil, err } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index b2d401c9..c4ddb01d 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1129,6 +1129,7 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r return } + // FIXME(dlc) - Need to close these up if we fail for some reason. tfile, err := ioutil.TempFile("", "jetstream-restore-") if err != nil { resp.Error = &ApiError{Code: 500, Description: "jetstream unable to open temp storage for restore"} @@ -1136,9 +1137,6 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r return } - // Create our internal subscription to accept the snapshot. - restoreSubj := fmt.Sprintf(jsRestoreDeliverT, stream, nuid.Next()) - s.Noticef("Starting restore for stream %q in account %q", stream, c.acc.Name) start := time.Now() received := 0 @@ -1154,19 +1152,34 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r Client: caudit, }) + // Track errors writing to temp files. + var tfileError error + // Create our internal subscription to accept the snapshot. + restoreSubj := fmt.Sprintf(jsRestoreDeliverT, stream, nuid.Next()) + // FIXME(dlc) - Can't recover well here if something goes wrong. Could use channels and at least time // things out. Note that this is tied to the requesting client, so if it is a tool this goes away when // the client does. Only thing leaking here is the sub on strange failure. - acc.subscribeInternal(c, restoreSubj, func(sub *subscription, c *client, _, _ string, msg []byte) { + acc.subscribeInternal(restoreSubj, func(sub *subscription, c *client, subject, reply string, msg []byte) { + // Account client messages have \r\n on end. + if len(msg) < LEN_CR_LF { + return + } + msg = msg[:len(msg)-LEN_CR_LF] + if len(msg) == 0 { tfile.Seek(0, 0) - // TODO(dlc) - no way right now to communicate back. - acc.RestoreStream(stream, tfile) + mset, err := acc.RestoreStream(stream, tfile) + if err != nil && tfileError != nil { + err = tfileError + } tfile.Close() os.Remove(tfile.Name()) c.processUnsub(sub.sid) end := time.Now() + + // TODO(rip) - Should this have the error code in it?? s.publishAdvisory(c.acc, JSAdvisoryStreamRestoreCompletePre+"."+stream, &JSRestoreCompleteAdvisory{ TypedEvent: TypedEvent{ Type: JSRestoreCompleteAdvisoryType, @@ -1180,13 +1193,31 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r Client: caudit, }) - s.Noticef("Completed %s restore for stream %q in account %q in %v", FriendlyBytes(int64(received)), stream, c.acc.Name, end.Sub(start)) + s.Noticef("Completed %s restore for stream %q in account %q in %v", + FriendlyBytes(int64(received)), stream, c.acc.Name, end.Sub(start)) + + // If there is a reply subject on the last EOF, send back the stream info or error status. + if reply != _EMPTY_ { + var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} + if err != nil { + resp.Error = jsError(err) + } else { + resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config()} + } + s.sendInternalAccountMsg(acc, reply, s.jsonResponse(&resp)) + } return } - // Append chunk to temp file. - tfile.Write(msg) + // Append chunk to temp file. Mark as issue if we encounter an error. + if n, err := tfile.Write(msg); n != len(msg) || err != nil { + tfileError = err + } received += len(msg) + + if reply != _EMPTY_ { + s.sendInternalAccountMsg(acc, reply, nil) + } }) resp.DeliverSubject = restoreSubj @@ -1286,7 +1317,11 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject, Client: caudit, }) - s.Noticef("Completed %s snapshot for stream %q in account %q in %v", FriendlyBytes(int64(resp.NumBlks*resp.BlkSize)), mset.Name(), mset.jsa.account.Name, end.Sub(start)) + s.Noticef("Completed %s snapshot for stream %q in account %q in %v", + FriendlyBytes(int64(resp.NumBlks*resp.BlkSize)), + mset.Name(), + mset.jsa.account.Name, + end.Sub(start)) }() } diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 6cc2d776..e6a9d2a3 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -2894,9 +2894,27 @@ func TestJetStreamSnapshots(t *testing.T) { } func TestJetStreamSnapshotsAPI(t *testing.T) { - s := RunBasicJetStreamServer() + lopts := DefaultTestOptions + lopts.ServerName = "LS" + lopts.Port = -1 + lopts.LeafNode.Host = lopts.Host + lopts.LeafNode.Port = -1 + + ls := RunServer(&lopts) + defer ls.Shutdown() + + opts := DefaultTestOptions + opts.ServerName = "S" + opts.Port = -1 + opts.JetStream = true + rurl, _ := url.Parse(fmt.Sprintf("nats-leaf://%s:%d", lopts.LeafNode.Host, lopts.LeafNode.Port)) + opts.LeafNode.Remotes = []*server.RemoteLeafOpts{{URLs: []*url.URL{rurl}}} + + s := RunServer(&opts) defer s.Shutdown() + checkLeafNodeConnected(t, s) + if config := s.JetStreamConfig(); config != nil { defer os.RemoveAll(config.StoreDir) } @@ -3041,10 +3059,9 @@ func TestJetStreamSnapshotsAPI(t *testing.T) { if rresp.Error != nil { t.Fatalf("Got an unexpected error response: %+v", rresp.Error) } - r := bytes.NewReader(snapshot) - // Can be anysize message. + // Can be any size message. var chunk [512]byte - for { + for r := bytes.NewReader(snapshot); ; { n, err := r.Read(chunk[:]) if err != nil { break @@ -3077,6 +3094,75 @@ func TestJetStreamSnapshotsAPI(t *testing.T) { case <-time.After(5 * time.Second): t.Fatalf("Did not receive our snapshot in time") } + + // Now connect through a cluster server and make sure we can get things to work this way as well. + nc2 := clientConnectToServer(t, ls) + defer nc2.Close() + + snapshot = snapshot[:0] + + req, _ = json.Marshal(sreq) + rmsg, err = nc2.Request(fmt.Sprintf(server.JSApiStreamSnapshotT, mname), req, time.Second) + if err != nil { + t.Fatalf("Unexpected error on snapshot request: %v", err) + } + resp.Error = nil + json.Unmarshal(rmsg.Data, &resp) + if resp.Error != nil { + t.Fatalf("Did not get correct error response: %+v", resp.Error) + } + // Wait to receive the snapshot. + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive our snapshot in time") + } + + // Now do a restore through the new client connection. + // Delete this stream first. + mset, err = acc.LookupStream(mname) + if err != nil { + t.Fatalf("Expected to find a stream for %q", mname) + } + state = mset.State() + mset.Delete() + + rmsg, err = nc2.Request(fmt.Sprintf(server.JSApiStreamRestoreT, mname), nil, time.Second) + if err != nil { + t.Fatalf("Unexpected error on snapshot request: %v", err) + } + // Make sure to clear. + rresp.Error = nil + json.Unmarshal(rmsg.Data, &rresp) + if rresp.Error != nil { + t.Fatalf("Got an unexpected error response: %+v", rresp.Error) + } + for r := bytes.NewReader(snapshot); ; { + n, err := r.Read(chunk[:]) + if err != nil { + break + } + // Make sure other side responds to reply subjects for ack flow. Optional. + if _, err := nc2.Request(rresp.DeliverSubject, chunk[:n], time.Second); err != nil { + t.Fatalf("Restore not honoring reply subjects for ack flow") + } + } + // For EOF this will send back stream info or an error. + si, err := nc2.Request(rresp.DeliverSubject, nil, time.Second) + if err != nil { + t.Fatalf("Got an error restoring stream: %v", err) + } + var scResp server.JSApiStreamCreateResponse + if err := json.Unmarshal(si.Data, &scResp); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if scResp.Error != nil { + t.Fatalf("Got an unexpected error from EOF omn restore: %+v", scResp.Error) + } + + if scResp.StreamInfo.State != state { + t.Fatalf("Did not match states, %+v vs %+v", scResp.StreamInfo.State, state) + } } func TestJetStreamSnapshotsAPIPerf(t *testing.T) {