Merge pull request #3820 from nats-io/issue-3791

[FIXED] Select consumer peer(s) from active peers only.
This commit is contained in:
Derek Collison
2023-01-26 08:27:11 -08:00
committed by GitHub
2 changed files with 66 additions and 6 deletions

View File

@@ -6157,14 +6157,36 @@ func decodeStreamAssignment(buf []byte) (*streamAssignment, error) {
// createGroupForConsumer will create a new group from same peer set as the stream.
func (cc *jetStreamCluster) createGroupForConsumer(cfg *ConsumerConfig, sa *streamAssignment) *raftGroup {
peers := copyStrings(sa.Group.Peers)
if len(peers) == 0 {
if len(sa.Group.Peers) == 0 || cfg.Replicas > len(sa.Group.Peers) {
return nil
}
if cfg.Replicas > 0 && cfg.Replicas != len(peers) {
// First shuffle the peers and then select to account for replica = 1.
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
peers = peers[:cfg.Replicas]
peers := copyStrings(sa.Group.Peers)
var _ss [5]string
active := _ss[:0]
// Calculate all active peers.
for _, peer := range peers {
if sir, ok := cc.s.nodeToInfo.Load(peer); ok && sir != nil {
if !sir.(nodeInfo).offline {
active = append(active, peer)
}
}
}
if quorum := cfg.Replicas/2 + 1; quorum > len(active) {
// Not enough active to satisfy the request.
return nil
}
// If we want less then our parent stream, select from active.
if cfg.Replicas > 0 && cfg.Replicas < len(peers) {
// Pedantic in case stream is say R5 and consumer is R3 and 3 or more offline, etc.
if len(active) < cfg.Replicas {
return nil
}
// First shuffle the active peers and then select to account for replica = 1.
rand.Shuffle(len(active), func(i, j int) { active[i], active[j] = active[j], active[i] })
peers = active[:cfg.Replicas]
}
storage := sa.Config.Storage
if cfg.MemoryStorage {

View File

@@ -2462,3 +2462,41 @@ func TestJetStreamClusterDirectGetStreamUpgrade(t *testing.T) {
require_NoError(t, err)
require_True(t, string(entry.Value()) == "derek")
}
// https://github.com/nats-io/nats-server/issues/3791
func TestJetStreamClusterKVWatchersWithServerDown(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "TEST",
Replicas: 3,
})
require_NoError(t, err)
kv.PutString("foo", "bar")
kv.PutString("foo", "baz")
// Shutdown a follower.
s := c.randomNonStreamLeader(globalAccountName, "KV_TEST")
s.Shutdown()
c.waitOnLeader()
nc, _ = jsClientConnect(t, c.randomServer())
defer nc.Close()
js, err = nc.JetStream(nats.MaxWait(2 * time.Second))
require_NoError(t, err)
kv, err = js.KeyValue("TEST")
require_NoError(t, err)
for i := 0; i < 100; i++ {
w, err := kv.Watch("foo")
require_NoError(t, err)
w.Stop()
}
}