From fb15dfd9b7c8f18c00584e85b4ba8f017c5bc662 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sun, 13 Feb 2022 15:37:43 -0800
Subject: [PATCH] Allow replica updates during stream update. Also add in
 HAAssets count to Jsz.

Signed-off-by: Derek Collison
---
 server/jetstream.go              |   3 +
 server/jetstream_cluster.go      | 105 ++++++++++++++++++++++++++-----
 server/jetstream_cluster_test.go |  86 +++++++++++++++++++++++--
 server/raft.go                   |   6 ++
 4 files changed, 180 insertions(+), 20 deletions(-)

diff --git a/server/jetstream.go b/server/jetstream.go
index 63d3a5bc..b4e5a757 100644
--- a/server/jetstream.go
+++ b/server/jetstream.go
@@ -55,6 +55,7 @@ type JetStreamStats struct {
 	ReservedMemory uint64            `json:"reserved_memory"`
 	ReservedStore  uint64            `json:"reserved_storage"`
 	Accounts       int               `json:"accounts"`
+	HAAssets       int               `json:"ha_assets"`
 	API            JetStreamAPIStats `json:"api"`
 }
 
@@ -1738,12 +1739,14 @@ func (js *jetStream) usageStats() *JetStreamStats {
 	stats.Accounts = len(js.accounts)
 	stats.ReservedMemory = (uint64)(js.memReserved)
 	stats.ReservedStore = (uint64)(js.storeReserved)
+	s := js.srv
 	js.mu.RUnlock()
 	stats.API.Total = (uint64)(atomic.LoadInt64(&js.apiTotal))
 	stats.API.Errors = (uint64)(atomic.LoadInt64(&js.apiErrors))
 	stats.API.Inflight = (uint64)(atomic.LoadInt64(&js.apiInflight))
 	stats.Memory = (uint64)(atomic.LoadInt64(&js.memUsed))
 	stats.Store = (uint64)(atomic.LoadInt64(&js.storeUsed))
+	stats.HAAssets = s.numRaftNodes()
 	return &stats
 }
 
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 847d5a6b..43872684 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -94,6 +94,7 @@ type raftGroup struct {
 	Name      string      `json:"name"`
 	Peers     []string    `json:"peers"`
 	Storage   StorageType `json:"store"`
+	Cluster   string      `json:"cluster,omitempty"`
 	Preferred string      `json:"preferred,omitempty"`
 	// Internal
 	node RaftNode
@@ -1460,6 +1461,15 @@ func (mset *stream) raftNode() RaftNode {
 	return mset.node
 }
 
+func (mset *stream) removeNode() {
+	mset.mu.Lock()
+	defer mset.mu.Unlock()
+	if n := mset.node; n != nil {
+		n.Delete()
+		mset.node = nil
+	}
+}
+
 // Monitor our stream node for this stream.
 func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
 	s, cc, n := js.server(), js.cluster, sa.Group.node
@@ -2162,7 +2172,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
 	accStreams := cc.streams[acc.Name]
 	if accStreams == nil {
 		accStreams = make(map[string]*streamAssignment)
-	} else if osa := accStreams[stream]; osa != nil {
+	} else if osa := accStreams[stream]; osa != nil && osa != sa {
 		// Copy over private existing state from former SA.
 		sa.Group.node = osa.Group.node
 		sa.consumers = osa.consumers
@@ -2273,7 +2283,7 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
 }
 
 // processClusterUpdateStream is called when we have a stream assignment that
-// has been updated for an existing assignment.
+// has been updated for an existing assignment and we are a member.
 func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAssignment) {
 	if sa == nil {
 		return
@@ -2282,15 +2292,26 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
 	js.mu.Lock()
 	s, rg := js.srv, sa.Group
 	client, subject, reply := sa.Client, sa.Subject, sa.Reply
-	alreadyRunning := rg.node != nil
+	alreadyRunning, numReplicas := osa.Group.node != nil, sa.Config.Replicas
+	needsNode := rg.node == nil
+	storage := sa.Config.Storage
 	hasResponded := sa.responded
 	sa.responded = true
 	js.mu.Unlock()
 
 	mset, err := acc.lookupStream(sa.Config.Name)
 	if err == nil && mset != nil {
-		if !alreadyRunning {
+		if !alreadyRunning && numReplicas > 1 {
+			if needsNode {
+				js.createRaftGroup(rg, storage)
+			}
 			s.startGoRoutine(func() { js.monitorStream(mset, sa) })
+		} else if numReplicas == 1 && alreadyRunning {
+			// We downgraded to R1. Make sure we clean up the raft node and the stream monitor.
+			mset.removeNode()
+			js.mu.Lock()
+			sa.Group.node = nil
+			js.mu.Unlock()
 		}
 		mset.setStreamAssignment(sa)
 		if err = mset.update(sa.Config); err != nil {
@@ -2299,6 +2320,12 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
 		}
 	}
 
+	// If not found we must be expanding into this node, since if we are here we know we are a member.
+	if err == ErrJetStreamStreamNotFound {
+		js.processStreamAssignment(sa)
+		return
+	}
+
 	if err != nil {
 		js.mu.Lock()
 		sa.err = err
@@ -2328,6 +2355,12 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
 		return
 	}
 
+	// If we were a single node being promoted, assume the leadership role for the purpose of responding.
+	if !hasResponded && !isLeader && !alreadyRunning {
+		isLeader = true
+	}
+
+	// Check if we should bail.
 	if !isLeader || hasResponded {
 		return
 	}
@@ -3637,7 +3670,7 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe
 }
 
 // selectPeerGroup will select a group of peers to start a raft group.
-func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig) []string {
+func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string) []string {
 	if cluster == _EMPTY_ || cfg == nil {
 		return nil
 	}
@@ -3656,6 +3689,18 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
 	var nodes []wn
 	s, peers := cc.s, cc.meta.Peers()
 
+	// Map existing.
+	var ep map[string]struct{}
+	if le := len(existing); le > 0 {
+		if le >= r {
+			return existing
+		}
+		ep = make(map[string]struct{})
+		for _, p := range existing {
+			ep[p] = struct{}{}
+		}
+	}
+
 	for _, p := range peers {
 		si, ok := s.nodeToInfo.Load(p.ID)
 		if !ok || si == nil {
@@ -3668,6 +3713,13 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
 			continue
 		}
 
+		// If existing, also skip; we will add these back to the front of the list when done.
+		if ep != nil {
+			if _, ok := ep[p.ID]; ok {
+				continue
+			}
+		}
+
 		var available uint64
 		switch cfg.Storage {
 		case MemoryStorage:
@@ -3688,7 +3740,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
 			}
 		}
 
-		// Check if we have enveough room if maxBytes set.
+		// Otherwise check if we have enough room if maxBytes set.
 		if maxBytes > 0 && maxBytes > available {
 			continue
 		}
@@ -3697,13 +3749,17 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
 	}
 
 	// If we could not select enough peers, fail.
-	if len(nodes) < r {
+	if len(nodes) < (r - len(existing)) {
 		return nil
 	}
 
 	// Sort based on available from most to least.
 	sort.Slice(nodes, func(i, j int) bool { return nodes[i].avail > nodes[j].avail })
 
 	var results []string
+	if len(existing) > 0 {
+		results = append(results, existing...)
+		r -= len(existing)
+	}
 	for _, r := range nodes[:r] {
 		results = append(results, r.id)
 	}
@@ -3750,11 +3806,11 @@ func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) *ra
 
 	// Need to create a group here.
 	for _, cn := range clusters {
-		peers := cc.selectPeerGroup(replicas, cn, cfg)
+		peers := cc.selectPeerGroup(replicas, cn, cfg, nil)
 		if len(peers) < replicas {
 			continue
 		}
-		return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers}
+		return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers, Cluster: cn}
 	}
 	return nil
 }
@@ -3901,12 +3957,7 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
 		s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
 		return
 	}
-	// Check for cluster changes that we want to error on.
-	if newCfg.Replicas != len(osa.Group.Peers) {
-		resp.Error = NewJSStreamReplicasNotUpdatableError()
-		s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
-		return
-	}
+	// Check for mirror changes, which are not allowed.
 	if !reflect.DeepEqual(newCfg.Mirror, osa.Config.Mirror) {
 		resp.Error = NewJSStreamMirrorNotUpdatableError()
 		s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
 		return
 	}
@@ -3929,7 +3980,29 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
 		}
 	}
 
-	sa := &streamAssignment{Group: osa.Group, Sync: osa.Sync, Config: newCfg, Subject: subject, Reply: reply, Client: ci}
+	// Check for replica changes.
+	rg := osa.Group
+	if newCfg.Replicas != len(rg.Peers) {
+		// We are adding new peers here.
+		if newCfg.Replicas > len(rg.Peers) {
+			peers := cc.selectPeerGroup(newCfg.Replicas, rg.Cluster, newCfg, rg.Peers)
+			if len(peers) != newCfg.Replicas {
+				resp.Error = NewJSInsufficientResourcesError()
+				s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
+				return
+			}
+			// Single nodes are not recorded by the NRG layer, so we can rename.
+			if len(rg.Peers) == 1 {
+				rg.Name = groupNameForStream(peers, rg.Storage)
+			}
+			rg.Peers = peers
+		} else {
+			// We are deleting nodes here.
+			rg.Peers = rg.Peers[:newCfg.Replicas]
+		}
+	}
+
+	sa := &streamAssignment{Group: rg, Sync: osa.Sync, Config: newCfg, Subject: subject, Reply: reply, Client: ci}
 	cc.meta.Propose(encodeUpdateStreamAssignment(sa))
 }
 
diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go
index 0a7d9e87..15e54106 100644
--- a/server/jetstream_cluster_test.go
+++ b/server/jetstream_cluster_test.go
@@ -1456,10 +1456,6 @@ func TestJetStreamClusterStreamExtendedUpdates(t *testing.T) {
 	if si := updateStream(); !reflect.DeepEqual(si.Config.Subjects, cfg.Subjects) {
 		t.Fatalf("Did not get expected stream info: %+v", si)
 	}
-	// Make sure these error for now.
-	// R factor changes
-	cfg.Replicas = 1
-	expectError()
 	// Mirror changes
 	cfg.Replicas = 3
 	cfg.Mirror = &nats.StreamSource{Name: "ORDERS"}
@@ -10507,6 +10503,88 @@ func TestJetStreamAddConsumerWithInfo(t *testing.T) {
 	t.Run("Clustered", func(t *testing.T) { testConsInfo(t, c.randomServer()) })
 }
 
+func TestJetStreamClusterStreamReplicaUpdates(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R7S", 7)
+	defer c.shutdown()
+
+	// Client based API
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Start out at R1
+	cfg := &nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 1,
+	}
+	_, err := js.AddStream(cfg)
+	require_NoError(t, err)
+
+	numMsgs := 1000
+	for i := 0; i < numMsgs; i++ {
+		js.PublishAsync("foo", []byte("HELLO WORLD"))
+	}
+	select {
+	case <-js.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+
+	updateReplicas := func(r int) {
+		t.Helper()
+		si, err := js.StreamInfo("TEST")
+		require_NoError(t, err)
+		leader := si.Cluster.Leader
+
+		cfg.Replicas = r
+		_, err = js.UpdateStream(cfg)
+		require_NoError(t, err)
+		c.waitOnStreamLeader("$G", "TEST")
+
+		checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
+			si, err = js.StreamInfo("TEST")
+			require_NoError(t, err)
+			if len(si.Cluster.Replicas) != r-1 {
+				return fmt.Errorf("Expected %d replicas, got %d", r-1, len(si.Cluster.Replicas))
+			}
+			return nil
+		})
+
+		// Make sure we kept same leader.
+		if si.Cluster.Leader != leader {
+			t.Fatalf("Leader changed, expected %q got %q", leader, si.Cluster.Leader)
+		}
+		// Make sure all are current.
+		for _, r := range si.Cluster.Replicas {
+			c.waitOnStreamCurrent(c.serverByName(r.Name), "$G", "TEST")
+		}
+		// Check msgs.
+		if si.State.Msgs != uint64(numMsgs) {
+			t.Fatalf("Expected %d msgs, got %d", numMsgs, si.State.Msgs)
+		}
+		// Make sure we have the right number of HA Assets running on the leader.
+		s := c.serverByName(leader)
+		jsi, err := s.Jsz(nil)
+		require_NoError(t, err)
+		nha := 1 // meta always present.
+		if len(si.Cluster.Replicas) > 0 {
+			nha++
+		}
+		if nha != jsi.HAAssets {
+			t.Fatalf("Expected %d HA asset(s), but got %d", nha, jsi.HAAssets)
+		}
+	}
+
+	// Update from 1-3
+	updateReplicas(3)
+	// Update from 3-5
+	updateReplicas(5)
+	// Update from 5-3
+	updateReplicas(3)
+	// Update from 3-1
+	updateReplicas(1)
+}
+
 // Support functions
 
 // Used to setup superclusters for tests.
diff --git a/server/raft.go b/server/raft.go
index 2953c32c..01881a8c 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -517,6 +517,12 @@ func (s *Server) unregisterRaftNode(group string) {
 	}
 }
 
+func (s *Server) numRaftNodes() int {
+	s.rnMu.RLock()
+	defer s.rnMu.RUnlock()
+	return len(s.raftNodes)
+}
+
 func (s *Server) lookupRaftNode(group string) RaftNode {
 	s.rnMu.RLock()
 	defer s.rnMu.RUnlock()
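
For context, a minimal client-side sketch of the behavior this patch enables, assuming the nats.go client; the URL, stream name, and subject are illustrative placeholders and not part of the patch:

package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Create the stream at R1.
	cfg := &nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}, Replicas: 1}
	if _, err := js.AddStream(cfg); err != nil {
		log.Fatal(err)
	}

	// Scale up to R3. Before this patch the server rejected any change to
	// Replicas; with it, the existing peers (and therefore the leader) are
	// kept and new peers are selected to fill out the raft group.
	cfg.Replicas = 3
	if _, err := js.UpdateStream(cfg); err != nil {
		log.Fatal(err)
	}

	// Scale back down to R1. The server tears down the raft node and the
	// stream monitor, as in processClusterUpdateStream above.
	cfg.Replicas = 1
	if _, err := js.UpdateStream(cfg); err != nil {
		log.Fatal(err)
	}
}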
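
The new counter also surfaces through the server's jsz monitoring endpoint, serialized as "ha_assets" per the JetStreamStats change above. A small sketch of reading it over HTTP, assuming a server with the monitoring port enabled on localhost:8222:

package main

import (
	"encoding/json"
	"log"
	"net/http"
)

func main() {
	resp, err := http.Get("http://localhost:8222/jsz")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// Decode only the field added by this patch.
	var jsz struct {
		HAAssets int `json:"ha_assets"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&jsz); err != nil {
		log.Fatal(err)
	}
	log.Printf("ha_assets: %d", jsz.HAAssets)
}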