mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #3608 from nats-io/js_scale_down_no_quorum
[FIXED] JetStream: Stream scale down while it has no quorum
This commit is contained in:
@@ -5282,7 +5282,7 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
|
|||||||
js.mu.Unlock()
|
js.mu.Unlock()
|
||||||
if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
|
if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
|
||||||
s.Warnf("Did not receive stream info results for '%s > %s' due to: %s", acc, cfg.Name, err)
|
s.Warnf("Did not receive stream info results for '%s > %s' due to: %s", acc, cfg.Name, err)
|
||||||
} else if cl := si.(*StreamInfo).Cluster; cl != nil {
|
} else if cl := si.(*StreamInfo).Cluster; cl != nil && cl.Leader != _EMPTY_ {
|
||||||
curLeader = getHash(cl.Leader)
|
curLeader = getHash(cl.Leader)
|
||||||
}
|
}
|
||||||
// Re-acquire here.
|
// Re-acquire here.
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -1164,3 +1165,119 @@ func TestJetStreamClusterSourceWithOptStartTime(t *testing.T) {
|
|||||||
t.Run("standalone", func(t *testing.T) { test(t, nil, s) })
|
t.Run("standalone", func(t *testing.T) { test(t, nil, s) })
|
||||||
t.Run("cluster", func(t *testing.T) { test(t, c, nil) })
|
t.Run("cluster", func(t *testing.T) { test(t, c, nil) })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type networkCableUnplugged struct {
|
||||||
|
net.Conn
|
||||||
|
sync.Mutex
|
||||||
|
unplugged bool
|
||||||
|
wb bytes.Buffer
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *networkCableUnplugged) Write(b []byte) (int, error) {
|
||||||
|
c.Lock()
|
||||||
|
if c.unplugged {
|
||||||
|
c.wb.Write(b)
|
||||||
|
c.Unlock()
|
||||||
|
return len(b), nil
|
||||||
|
} else if c.wb.Len() > 0 {
|
||||||
|
c.wb.Write(b)
|
||||||
|
buf := c.wb.Bytes()
|
||||||
|
c.wb.Reset()
|
||||||
|
c.Unlock()
|
||||||
|
if _, err := c.Conn.Write(buf); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return len(b), nil
|
||||||
|
}
|
||||||
|
c.Unlock()
|
||||||
|
return c.Conn.Write(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *networkCableUnplugged) Read(b []byte) (int, error) {
|
||||||
|
c.Lock()
|
||||||
|
wait := c.unplugged
|
||||||
|
c.Unlock()
|
||||||
|
if wait {
|
||||||
|
c.wg.Wait()
|
||||||
|
}
|
||||||
|
return c.Conn.Read(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJetStreamClusterScaleDownWhileNoQuorum(t *testing.T) {
|
||||||
|
c := createJetStreamClusterExplicit(t, "R5S", 5)
|
||||||
|
defer c.shutdown()
|
||||||
|
|
||||||
|
s := c.randomServer()
|
||||||
|
nc, js := jsClientConnect(t, s)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
si, err := js.AddStream(&nats.StreamConfig{
|
||||||
|
Name: "TEST",
|
||||||
|
Subjects: []string{"foo"},
|
||||||
|
Replicas: 2,
|
||||||
|
})
|
||||||
|
require_NoError(t, err)
|
||||||
|
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
sendStreamMsg(t, nc, "foo", "msg")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let's have a server from this R2 stream be network partitionned.
|
||||||
|
// We will take the leader, but doesn't have to be.
|
||||||
|
// To simulate partition, we will replace all its routes with a
|
||||||
|
// special connection that drops messages.
|
||||||
|
sl := c.serverByName(si.Cluster.Leader)
|
||||||
|
if s == sl {
|
||||||
|
nc.Close()
|
||||||
|
for s = c.randomServer(); s != sl; s = c.randomServer() {
|
||||||
|
}
|
||||||
|
nc, js = jsClientConnect(t, s)
|
||||||
|
defer nc.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
sl.mu.Lock()
|
||||||
|
for _, r := range sl.routes {
|
||||||
|
r.mu.Lock()
|
||||||
|
ncu := &networkCableUnplugged{Conn: r.nc, unplugged: true}
|
||||||
|
ncu.wg.Add(1)
|
||||||
|
r.nc = ncu
|
||||||
|
r.mu.Unlock()
|
||||||
|
}
|
||||||
|
sl.mu.Unlock()
|
||||||
|
|
||||||
|
// Wait for the stream info to fail
|
||||||
|
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
|
||||||
|
si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if si.Cluster.Leader == _EMPTY_ {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("stream still has a leader")
|
||||||
|
})
|
||||||
|
|
||||||
|
// Now try to edit the stream by making it an R1. In some case we get
|
||||||
|
// a context deadline error, in some no error. So don't check the returned error.
|
||||||
|
js.UpdateStream(&nats.StreamConfig{
|
||||||
|
Name: "TEST",
|
||||||
|
Subjects: []string{"foo"},
|
||||||
|
Replicas: 1,
|
||||||
|
}, nats.MaxWait(5*time.Second))
|
||||||
|
|
||||||
|
sl.mu.Lock()
|
||||||
|
for _, r := range sl.routes {
|
||||||
|
r.mu.Lock()
|
||||||
|
ncu := r.nc.(*networkCableUnplugged)
|
||||||
|
ncu.Lock()
|
||||||
|
ncu.unplugged = false
|
||||||
|
ncu.wg.Done()
|
||||||
|
ncu.Unlock()
|
||||||
|
r.mu.Unlock()
|
||||||
|
}
|
||||||
|
sl.mu.Unlock()
|
||||||
|
|
||||||
|
checkClusterFormed(t, c.servers...)
|
||||||
|
c.waitOnStreamLeader(globalAccountName, "TEST")
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user