mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
fix move cancel issue where tags and peers diverge (#3354)
This can happen if the move was initiated by the user. A subsequent cancel resets the initial peer list. The original peer list was picked based on the old set of tags. A cancel would then keep the new list of tags but reset to the old peers. Thus tags and peers diverge. The problem is that at the time of cancel, the old placement tags can no longer be found. This fix causes cancel to remove the placement tags if the old peers do not satisfy the new placement tags. Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -2652,6 +2652,66 @@ func TestJetStreamSuperClusterStreamDirectGetMirrorQueueGroup(t *testing.T) {
|
||||
require_True(t, m.Header.Get(JSStream) == "M2")
|
||||
}
|
||||
|
||||
func TestJetStreamSuperClusterTagInducedMoveCancel(t *testing.T) {
|
||||
server := map[string]struct{}{}
|
||||
sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,
|
||||
func(serverName, clusterName, storeDir, conf string) string {
|
||||
server[serverName] = struct{}{}
|
||||
return fmt.Sprintf("%s\nserver_tags: [%s]", conf, clusterName)
|
||||
}, nil)
|
||||
defer sc.shutdown()
|
||||
|
||||
// Client based API
|
||||
c := sc.randomCluster()
|
||||
srv := c.randomNonLeader()
|
||||
nc, js := jsClientConnect(t, srv)
|
||||
defer nc.Close()
|
||||
|
||||
cfg := &nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo"},
|
||||
Placement: &nats.Placement{Tags: []string{"C1"}},
|
||||
Replicas: 3,
|
||||
}
|
||||
siCreate, err := js.AddStream(cfg)
|
||||
require_NoError(t, err)
|
||||
require_Equal(t, siCreate.Cluster.Name, "C1")
|
||||
|
||||
toSend := uint64(1_000)
|
||||
for i := uint64(0); i < toSend; i++ {
|
||||
_, err = js.Publish("foo", nil)
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
ncsys, err := nats.Connect(srv.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
|
||||
require_NoError(t, err)
|
||||
defer ncsys.Close()
|
||||
|
||||
// cause a move by altering placement tags
|
||||
cfg.Placement.Tags = []string{"C2"}
|
||||
_, err = js.UpdateStream(cfg)
|
||||
require_NoError(t, err)
|
||||
|
||||
rmsg, err := ncsys.Request(fmt.Sprintf(JSApiServerStreamCancelMoveT, "$G", "TEST"), nil, 5*time.Second)
|
||||
require_NoError(t, err)
|
||||
var cancelResp JSApiStreamUpdateResponse
|
||||
require_NoError(t, json.Unmarshal(rmsg.Data, &cancelResp))
|
||||
if cancelResp.Error != nil && ErrorIdentifier(cancelResp.Error.ErrCode) == JSStreamMoveNotInProgress {
|
||||
t.Skip("This can happen with delays, when Move completed before Cancel", cancelResp.Error)
|
||||
return
|
||||
}
|
||||
require_True(t, cancelResp.Error == nil)
|
||||
|
||||
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
|
||||
si, err := js.StreamInfo("TEST")
|
||||
require_NoError(t, err)
|
||||
if si.Config.Placement != nil {
|
||||
return fmt.Errorf("expected placement to be cleared got: %+v", si.Config.Placement)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestJetStreamSuperClusterMoveCancel(t *testing.T) {
|
||||
server := map[string]struct{}{}
|
||||
sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,
|
||||
|
||||
Reference in New Issue
Block a user