From e15eb22ca63829c423bcd57a85ee180374408e8a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 25 Jan 2023 20:08:04 -0800 Subject: [PATCH] When we create a consumer with fewer replicas than the stream, make sure to select from online peers. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 34 +++++++++++++++++++++----- server/jetstream_cluster_3_test.go | 38 ++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 0b9fe537..f9013353 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6147,14 +6147,36 @@ func decodeStreamAssignment(buf []byte) (*streamAssignment, error) { // createGroupForConsumer will create a new group from same peer set as the stream. func (cc *jetStreamCluster) createGroupForConsumer(cfg *ConsumerConfig, sa *streamAssignment) *raftGroup { - peers := copyStrings(sa.Group.Peers) - if len(peers) == 0 { + if len(sa.Group.Peers) == 0 || cfg.Replicas > len(sa.Group.Peers) { return nil } - if cfg.Replicas > 0 && cfg.Replicas != len(peers) { - // First shuffle the peers and then select to account for replica = 1. - rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) - peers = peers[:cfg.Replicas] + + peers := copyStrings(sa.Group.Peers) + var _ss [5]string + active := _ss[:0] + + // Calculate all active peers. + for _, peer := range peers { + if sir, ok := cc.s.nodeToInfo.Load(peer); ok && sir != nil { + if !sir.(nodeInfo).offline { + active = append(active, peer) + } + } + } + if quorum := cfg.Replicas/2 + 1; quorum > len(active) { + // Not enough active to satisfy the request. + return nil + } + + // If we want fewer replicas than our parent stream, select from active. + if cfg.Replicas > 0 && cfg.Replicas < len(peers) { + // Pedantic in case stream is say R5 and consumer is R3 and 3 or more offline, etc. 
+ if len(active) < cfg.Replicas { + return nil + } + // First shuffle the active peers and then select to account for replica = 1. + rand.Shuffle(len(active), func(i, j int) { active[i], active[j] = active[j], active[i] }) + peers = active[:cfg.Replicas] } storage := sa.Config.Storage if cfg.MemoryStorage { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index f53ea6bf..ce865116 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -2462,3 +2462,41 @@ func TestJetStreamClusterDirectGetStreamUpgrade(t *testing.T) { require_NoError(t, err) require_True(t, string(entry.Value()) == "derek") } + +// https://github.com/nats-io/nats-server/issues/3791 +func TestJetStreamClusterKVWatchersWithServerDown(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: "TEST", + Replicas: 3, + }) + require_NoError(t, err) + + kv.PutString("foo", "bar") + kv.PutString("foo", "baz") + + // Shutdown a follower. + s := c.randomNonStreamLeader(globalAccountName, "KV_TEST") + s.Shutdown() + c.waitOnLeader() + + nc, _ = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + js, err = nc.JetStream(nats.MaxWait(2 * time.Second)) + require_NoError(t, err) + + kv, err = js.KeyValue("TEST") + require_NoError(t, err) + + for i := 0; i < 100; i++ { + w, err := kv.Watch("foo") + require_NoError(t, err) + w.Stop() + } +}