mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
If no max bytes set do better randomization of placement.
If the stream is replicated take into consideration number of ha assets on the server already. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -4158,6 +4158,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
|
||||
type wn struct {
|
||||
id string
|
||||
avail uint64
|
||||
ha int
|
||||
}
|
||||
|
||||
var nodes []wn
|
||||
@@ -4180,7 +4181,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
|
||||
if uniqueTagPrefix != _EMPTY_ {
|
||||
for _, tag := range tags {
|
||||
if strings.HasPrefix(tag, uniqueTagPrefix) {
|
||||
// disable uniqueness check of explicitly listed in tags
|
||||
// disable uniqueness check if explicitly listed in tags
|
||||
uniqueTagPrefix = _EMPTY_
|
||||
break
|
||||
}
|
||||
@@ -4189,6 +4190,8 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
|
||||
var uniqueTags = make(map[string]struct{})
|
||||
maxHaAssets := s.getOpts().JetStreamLimits.MaxHAAssets
|
||||
|
||||
// Shuffle them up.
|
||||
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
|
||||
for _, p := range peers {
|
||||
si, ok := s.nodeToInfo.Load(p.ID)
|
||||
if !ok || si == nil {
|
||||
@@ -4222,23 +4225,27 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
|
||||
}
|
||||
|
||||
var available uint64
|
||||
switch cfg.Storage {
|
||||
case MemoryStorage:
|
||||
used := ni.stats.ReservedMemory
|
||||
if ni.stats.Memory > used {
|
||||
used = ni.stats.Memory
|
||||
}
|
||||
if ni.cfg.MaxMemory > int64(used) {
|
||||
available = uint64(ni.cfg.MaxMemory) - used
|
||||
}
|
||||
case FileStorage:
|
||||
used := ni.stats.ReservedStore
|
||||
if ni.stats.Store > used {
|
||||
used = ni.stats.Store
|
||||
}
|
||||
if ni.cfg.MaxStore > int64(used) {
|
||||
available = uint64(ni.cfg.MaxStore) - used
|
||||
var ha int
|
||||
if ni.stats != nil {
|
||||
switch cfg.Storage {
|
||||
case MemoryStorage:
|
||||
used := ni.stats.ReservedMemory
|
||||
if ni.stats.Memory > used {
|
||||
used = ni.stats.Memory
|
||||
}
|
||||
if ni.cfg.MaxMemory > int64(used) {
|
||||
available = uint64(ni.cfg.MaxMemory) - used
|
||||
}
|
||||
case FileStorage:
|
||||
used := ni.stats.ReservedStore
|
||||
if ni.stats.Store > used {
|
||||
used = ni.stats.Store
|
||||
}
|
||||
if ni.cfg.MaxStore > int64(used) {
|
||||
available = uint64(ni.cfg.MaxStore) - used
|
||||
}
|
||||
}
|
||||
ha = ni.stats.HAAssets
|
||||
}
|
||||
|
||||
// Otherwise check if we have enough room if maxBytes set.
|
||||
@@ -4271,7 +4278,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
|
||||
}
|
||||
}
|
||||
// Add to our list of potential nodes.
|
||||
nodes = append(nodes, wn{p.ID, available})
|
||||
nodes = append(nodes, wn{p.ID, available, ha})
|
||||
}
|
||||
|
||||
// If we could not select enough peers, fail.
|
||||
@@ -4281,6 +4288,11 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
|
||||
// Sort based on available from most to least.
|
||||
sort.Slice(nodes, func(i, j int) bool { return nodes[i].avail > nodes[j].avail })
|
||||
|
||||
// If we are placing a replicated stream, let's sort based in haAssets, as that is more important to balance.
|
||||
if cfg.Replicas > 1 {
|
||||
sort.SliceStable(nodes, func(i, j int) bool { return nodes[i].ha < nodes[j].ha })
|
||||
}
|
||||
|
||||
var results []string
|
||||
if len(existing) > 0 {
|
||||
results = append(results, existing...)
|
||||
|
||||
@@ -10875,3 +10875,27 @@ func TestJetStreamClusterNoRestartAdvisories(t *testing.T) {
|
||||
|
||||
checkSubsPending(t, sub, 0)
|
||||
}
|
||||
|
||||
func TestJetStreamClusterR1StreamPlacementNoReservation(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "JSC", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
sp := make(map[string]int)
|
||||
for i := 0; i < 100; i++ {
|
||||
sname := fmt.Sprintf("T-%d", i)
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: sname,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
sp[c.streamLeader("$G", sname).Name()]++
|
||||
}
|
||||
|
||||
for serverName, num := range sp {
|
||||
if num > 60 {
|
||||
t.Fatalf("Streams not distributed, expected ~30-35 but got %d for server %q", num, serverName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user