From ca237bdfa09003b59a7fcfc0ce69fa666d0a744e Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 4 Nov 2022 08:52:52 -0600 Subject: [PATCH] [FIXED] JetStream: Stream scale down while it has no quorum If a stream R2 had one of its servers network-partitioned and at that time the stream was edited to be scaled down to an R1, it would cause the stream to no longer have quorum even when the network partition is resolved. Signed-off-by: Derek Collison Signed-off-by: Ivan Kozlovic --- server/jetstream_cluster.go | 2 +- server/jetstream_cluster_3_test.go | 117 +++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+), 1 deletion(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 63b722c5..f7d82926 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5282,7 +5282,7 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su js.mu.Unlock() if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil { s.Warnf("Did not receive stream info results for '%s > %s' due to: %s", acc, cfg.Name, err) - } else if cl := si.(*StreamInfo).Cluster; cl != nil { + } else if cl := si.(*StreamInfo).Cluster; cl != nil && cl.Leader != _EMPTY_ { curLeader = getHash(cl.Leader) } // Re-acquire here. 
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 25ce0211..5843234c 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "errors" "fmt" + "net" "os" "reflect" "strings" @@ -1164,3 +1165,119 @@ func TestJetStreamClusterSourceWithOptStartTime(t *testing.T) { t.Run("standalone", func(t *testing.T) { test(t, nil, s) }) t.Run("cluster", func(t *testing.T) { test(t, c, nil) }) } + +type networkCableUnplugged struct { + net.Conn + sync.Mutex + unplugged bool + wb bytes.Buffer + wg sync.WaitGroup +} + +func (c *networkCableUnplugged) Write(b []byte) (int, error) { + c.Lock() + if c.unplugged { + c.wb.Write(b) + c.Unlock() + return len(b), nil + } else if c.wb.Len() > 0 { + c.wb.Write(b) + buf := c.wb.Bytes() + c.wb.Reset() + c.Unlock() + if _, err := c.Conn.Write(buf); err != nil { + return 0, err + } + return len(b), nil + } + c.Unlock() + return c.Conn.Write(b) } + +func (c *networkCableUnplugged) Read(b []byte) (int, error) { + c.Lock() + wait := c.unplugged + c.Unlock() + if wait { + c.wg.Wait() + } + return c.Conn.Read(b) } + +func TestJetStreamClusterScaleDownWhileNoQuorum(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R5S", 5) + defer c.shutdown() + + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + si, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 2, + }) + require_NoError(t, err) + + for i := 0; i < 1000; i++ { + sendStreamMsg(t, nc, "foo", "msg") + } + + // Let's have a server from this R2 stream be network partitioned. + // We will take the leader, but it doesn't have to be. + // To simulate the partition, we will replace all its routes with a + // special connection that buffers writes and blocks reads. 
+ sl := c.serverByName(si.Cluster.Leader) + if s == sl { + nc.Close() + for s = c.randomServer(); s != sl; s = c.randomServer() { + } + nc, js = jsClientConnect(t, s) + defer nc.Close() + } + + sl.mu.Lock() + for _, r := range sl.routes { + r.mu.Lock() + ncu := &networkCableUnplugged{Conn: r.nc, unplugged: true} + ncu.wg.Add(1) + r.nc = ncu + r.mu.Unlock() + } + sl.mu.Unlock() + + // Wait for the stream to report that it has no leader + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second)) + if err != nil { + return err + } + if si.Cluster.Leader == _EMPTY_ { + return nil + } + return fmt.Errorf("stream still has a leader") + }) + + // Now try to edit the stream by making it an R1. In some cases we get + a context deadline error, in others no error. So don't check the returned error. + js.UpdateStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 1, + }, nats.MaxWait(5*time.Second)) + + sl.mu.Lock() + for _, r := range sl.routes { + r.mu.Lock() + ncu := r.nc.(*networkCableUnplugged) + ncu.Lock() + ncu.unplugged = false + ncu.wg.Done() + ncu.Unlock() + r.mu.Unlock() + } + sl.mu.Unlock() + + checkClusterFormed(t, c.servers...) + c.waitOnStreamLeader(globalAccountName, "TEST") }