diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 63b722c5..f7d82926 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -5282,7 +5282,7 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
 	js.mu.Unlock()
 	if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
 		s.Warnf("Did not receive stream info results for '%s > %s' due to: %s", acc, cfg.Name, err)
-	} else if cl := si.(*StreamInfo).Cluster; cl != nil {
+	} else if cl := si.(*StreamInfo).Cluster; cl != nil && cl.Leader != _EMPTY_ {
 		curLeader = getHash(cl.Leader)
 	}
 	// Re-acquire here.
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 25ce0211..5843234c 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -21,6 +21,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"net"
 	"os"
 	"reflect"
 	"strings"
@@ -1164,3 +1165,121 @@ func TestJetStreamClusterSourceWithOptStartTime(t *testing.T) {
 	t.Run("standalone", func(t *testing.T) { test(t, nil, s) })
 	t.Run("cluster", func(t *testing.T) { test(t, c, nil) })
 }
+
+// networkCableUnplugged wraps a route's net.Conn to simulate a network
+// partition: while unplugged, writes are buffered and reads block.
+type networkCableUnplugged struct {
+	net.Conn
+	sync.Mutex
+	unplugged bool
+	wb        bytes.Buffer
+	wg        sync.WaitGroup
+}
+
+func (c *networkCableUnplugged) Write(b []byte) (int, error) {
+	c.Lock()
+	if c.unplugged {
+		c.wb.Write(b)
+		c.Unlock()
+		return len(b), nil
+	} else if c.wb.Len() > 0 {
+		c.wb.Write(b)
+		buf := c.wb.Bytes()
+		c.wb.Reset()
+		c.Unlock()
+		if _, err := c.Conn.Write(buf); err != nil {
+			return 0, err
+		}
+		return len(b), nil
+	}
+	c.Unlock()
+	return c.Conn.Write(b)
+}
+
+func (c *networkCableUnplugged) Read(b []byte) (int, error) {
+	c.Lock()
+	wait := c.unplugged
+	c.Unlock()
+	if wait {
+		c.wg.Wait()
+	}
+	return c.Conn.Read(b)
+}
+
+func TestJetStreamClusterScaleDownWhileNoQuorum(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R5S", 5)
+	defer c.shutdown()
+
+	s := c.randomServer()
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	si, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 2,
+	})
+	require_NoError(t, err)
+
+	for i := 0; i < 1000; i++ {
+		sendStreamMsg(t, nc, "foo", "msg")
+	}
+
+	// Let's have a server from this R2 stream be network partitioned.
+	// We will take the leader, but it doesn't have to be.
+	// To simulate the partition, we will replace all its routes with a
+	// special connection that buffers writes and blocks reads.
+	sl := c.serverByName(si.Cluster.Leader)
+	if s == sl {
+		nc.Close()
+		for s = c.randomServer(); s != sl; s = c.randomServer() {
+		}
+		nc, js = jsClientConnect(t, s)
+		defer nc.Close()
+	}
+
+	sl.mu.Lock()
+	for _, r := range sl.routes {
+		r.mu.Lock()
+		ncu := &networkCableUnplugged{Conn: r.nc, unplugged: true}
+		ncu.wg.Add(1)
+		r.nc = ncu
+		r.mu.Unlock()
+	}
+	sl.mu.Unlock()
+
+	// Wait for the stream to report that it no longer has a leader.
+	checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
+		si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second))
+		if err != nil {
+			return err
+		}
+		if si.Cluster.Leader == _EMPTY_ {
+			return nil
+		}
+		return fmt.Errorf("stream still has a leader")
+	})
+
+	// Now try to edit the stream by making it an R1. In some cases we get
+	// a context deadline error, in others no error, so don't check the returned error.
+	js.UpdateStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 1,
+	}, nats.MaxWait(5*time.Second))
+
+	sl.mu.Lock()
+	for _, r := range sl.routes {
+		r.mu.Lock()
+		ncu := r.nc.(*networkCableUnplugged)
+		ncu.Lock()
+		ncu.unplugged = false
+		ncu.wg.Done()
+		ncu.Unlock()
+		r.mu.Unlock()
+	}
+	sl.mu.Unlock()
+
+	checkClusterFormed(t, c.servers...)
+	c.waitOnStreamLeader(globalAccountName, "TEST")
+}
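
For review context, here is a minimal, self-contained sketch of the same "unplugged cable" idea outside the NATS test harness. It is an illustration only: flakyConn and setUnplugged are hypothetical names, net.Pipe stands in for a route connection, and the read-blocking half is omitted for brevity.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net"
	"sync"
)

// flakyConn is a hypothetical stand-in for networkCableUnplugged: while
// unplugged, writes appear to succeed but are only buffered; once plugged
// back in, the buffered bytes are flushed before new data is written.
type flakyConn struct {
	net.Conn
	mu        sync.Mutex
	unplugged bool
	buf       bytes.Buffer
}

func (c *flakyConn) Write(b []byte) (int, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.unplugged {
		c.buf.Write(b) // pretend the write succeeded
		return len(b), nil
	}
	if c.buf.Len() > 0 {
		// Flush whatever accumulated while the cable was out.
		if _, err := c.Conn.Write(c.buf.Bytes()); err != nil {
			return 0, err
		}
		c.buf.Reset()
	}
	return c.Conn.Write(b)
}

func (c *flakyConn) setUnplugged(v bool) {
	c.mu.Lock()
	c.unplugged = v
	c.mu.Unlock()
}

func main() {
	a, b := net.Pipe()
	fc := &flakyConn{Conn: a, unplugged: true}

	go func() {
		fc.Write([]byte("hello "))  // buffered, the peer sees nothing yet
		fc.setUnplugged(false)
		fc.Write([]byte("world\n")) // flushes "hello ", then sends "world\n"
	}()

	out := make([]byte, 12)
	if _, err := io.ReadFull(b, out); err != nil {
		panic(err)
	}
	fmt.Print(string(out)) // prints: hello world
}
```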