From 9892a132e78da8ac9ff88625fbac703a31f7747b Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Wed, 17 Aug 2022 15:12:32 -0700 Subject: [PATCH] Improve StreamMoveInProgressError (#3376) by adding progress indicators Signed-off-by: Matthias Hanel --- server/errors.json | 4 ++-- server/jetstream_cluster.go | 29 +++++++++++++++++++++++++- server/jetstream_errors_generated.go | 18 ++++++++++------ server/jetstream_super_cluster_test.go | 2 +- 4 files changed, 43 insertions(+), 10 deletions(-) diff --git a/server/errors.json b/server/errors.json index 5730766e..fc5f9327 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1220,10 +1220,10 @@ "deprecates": "" }, { - "constant": "JSStreamMoveInProgress", + "constant": "JSStreamMoveInProgressF", "code": 400, "error_code": 10124, - "description": "stream move already in progress", + "description": "stream move already in progress: {msg}", "comment": "", "help": "", "url": "", diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 19c6d2bb..d96350f5 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5076,7 +5076,34 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su // Check if this is a move request, but no cancellation, and we are already moving this stream. if isMoveRequest && !isMoveCancel && osa.Config.Replicas != len(rg.Peers) { - resp.Error = NewJSStreamMoveInProgressError() + // obtain stats to include in error message + msg := _EMPTY_ + if s.allPeersOffline(rg) { + msg = fmt.Sprintf("all %d peers offline", len(rg.Peers)) + } else { + // Need to release js lock. + js.mu.Unlock() + if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil { + msg = fmt.Sprintf("error retrieving info: %s", err.Error()) + } else if si := si.(*StreamInfo); si != nil { + currentCount := 0 + if si.Cluster.Leader != _EMPTY_ { + currentCount++ + } + combinedLag := uint64(0) + for _, r := range si.Cluster.Replicas { + if r.Current { + currentCount++ + } + combinedLag += r.Lag + } + msg = fmt.Sprintf("total peers: %d, current peers: %d, combined lag: %d", + len(rg.Peers), currentCount, combinedLag) + } + // Re-acquire here. + js.mu.Lock() + } + resp.Error = NewJSStreamMoveInProgressError(msg) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index ce9f62ac..91419f93 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -311,8 +311,8 @@ const ( // JSStreamMoveAndScaleErr can not move and scale a stream in a single update JSStreamMoveAndScaleErr ErrorIdentifier = 10123 - // JSStreamMoveInProgress stream move already in progress - JSStreamMoveInProgress ErrorIdentifier = 10124 + // JSStreamMoveInProgressF stream move already in progress: {msg} + JSStreamMoveInProgressF ErrorIdentifier = 10124 // JSStreamMoveNotInProgress stream move not in progress JSStreamMoveNotInProgress ErrorIdentifier = 10129 @@ -497,7 +497,7 @@ var ( JSStreamMirrorNotUpdatableErr: {Code: 400, ErrCode: 10055, Description: "stream mirror configuration can not be updated"}, JSStreamMismatchErr: {Code: 400, ErrCode: 10056, Description: "stream name in subject does not match request"}, JSStreamMoveAndScaleErr: {Code: 400, ErrCode: 10123, Description: "can not move and scale a stream in a single update"}, - JSStreamMoveInProgress: {Code: 400, ErrCode: 10124, Description: "stream move already in progress"}, + JSStreamMoveInProgressF: {Code: 400, ErrCode: 10124, Description: "stream move already in progress: {msg}"}, JSStreamMoveNotInProgress: {Code: 400, ErrCode: 10129, Description: "stream move not in progress"}, JSStreamMsgDeleteFailedF: {Code: 500, ErrCode: 10057, Description: "{err}"}, JSStreamNameContainsPathSeparatorsErr: {Code: 400, ErrCode: 10128, Description: "Stream name can not contain path separators"}, @@ -1707,14 +1707,20 @@ func NewJSStreamMoveAndScaleError(opts ...ErrorOption) *ApiError { return ApiErrors[JSStreamMoveAndScaleErr] } -// NewJSStreamMoveInProgressError creates a new JSStreamMoveInProgress error: "stream move already in progress" -func NewJSStreamMoveInProgressError(opts ...ErrorOption) *ApiError { +// NewJSStreamMoveInProgressError creates a new JSStreamMoveInProgressF error: "stream move already in progress: {msg}" +func NewJSStreamMoveInProgressError(msg interface{}, opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) if ae, ok := eopts.err.(*ApiError); ok { return ae } - return ApiErrors[JSStreamMoveInProgress] + e := ApiErrors[JSStreamMoveInProgressF] + args := e.toReplacerArgs([]interface{}{"{msg}", msg}) + return &ApiError{ + Code: e.Code, + ErrCode: e.ErrCode, + Description: strings.NewReplacer(args...).Replace(e.Description), + } } // NewJSStreamMoveNotInProgressError creates a new JSStreamMoveNotInProgress error: "stream move not in progress" diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 8d7a1ad4..89ddc218 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -1778,7 +1778,7 @@ func TestJetStreamSuperClusterMovingStreamsAndConsumers(t *testing.T) { Replicas: replicas, Placement: &nats.Placement{Tags: []string{"cloud:aws"}}, }) - require_Error(t, err, NewJSStreamMoveInProgressError()) + require_Contains(t, err.Error(), "stream move already in progress") checkFor(t, 10*time.Second, 10*time.Millisecond, func() error { si, err := js.StreamInfo("MOVE")