Allow replica updates during stream update.

Also add an HAAssets count to Jsz.

Signed-off-by: Derek Collison <derek@nats.io>
Derek Collison
2022-02-13 15:37:43 -08:00
parent 7f985e350c
commit fb15dfd9b7
4 changed files with 180 additions and 20 deletions
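
For context, the client-visible effect of this change: a stream's replica count can now be changed with an ordinary stream update instead of returning an error. A minimal sketch using the nats.go client (the URL, stream name, and subject are illustrative):

package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	// Assumes a local clustered JetStream setup is reachable here.
	nc, err := nats.Connect("nats://127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Create the stream at R1.
	cfg := &nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}, Replicas: 1}
	if _, err := js.AddStream(cfg); err != nil {
		log.Fatal(err)
	}

	// Scale to R3. Before this commit the server rejected the update with a
	// replicas-not-updatable error; now it selects additional peers and
	// expands the raft group in place.
	cfg.Replicas = 3
	if _, err := js.UpdateStream(cfg); err != nil {
		log.Fatal(err)
	}
}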

server/jetstream.go

@@ -55,6 +55,7 @@ type JetStreamStats struct {
 	ReservedMemory uint64            `json:"reserved_memory"`
 	ReservedStore  uint64            `json:"reserved_storage"`
 	Accounts       int               `json:"accounts"`
+	HAAssets       int               `json:"ha_assets"`
 	API            JetStreamAPIStats `json:"api"`
 }
@@ -1738,12 +1739,14 @@ func (js *jetStream) usageStats() *JetStreamStats {
 	stats.Accounts = len(js.accounts)
 	stats.ReservedMemory = (uint64)(js.memReserved)
 	stats.ReservedStore = (uint64)(js.storeReserved)
+	s := js.srv
 	js.mu.RUnlock()
 	stats.API.Total = (uint64)(atomic.LoadInt64(&js.apiTotal))
 	stats.API.Errors = (uint64)(atomic.LoadInt64(&js.apiErrors))
 	stats.API.Inflight = (uint64)(atomic.LoadInt64(&js.apiInflight))
 	stats.Memory = (uint64)(atomic.LoadInt64(&js.memUsed))
 	stats.Store = (uint64)(atomic.LoadInt64(&js.storeUsed))
+	stats.HAAssets = s.numRaftNodes()
 	return &stats
 }

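Assuming monitoring is enabled (e.g. with -m 8222), the new counter should also appear as "ha_assets" in /jsz output, since JSInfo embeds JetStreamStats. A small sketch of reading it externally (the monitoring address and port are assumptions):

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Assumes the server was started with monitoring enabled (-m 8222).
	resp, err := http.Get("http://127.0.0.1:8222/jsz")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// Pick out just the embedded JetStreamStats counter we care about.
	var jsz struct {
		HAAssets int `json:"ha_assets"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&jsz); err != nil {
		log.Fatal(err)
	}
	// One raft node per replicated asset, plus the meta group.
	fmt.Println("ha_assets:", jsz.HAAssets)
}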
server/jetstream_cluster.go

@@ -94,6 +94,7 @@ type raftGroup struct {
 	Name      string      `json:"name"`
 	Peers     []string    `json:"peers"`
 	Storage   StorageType `json:"store"`
+	Cluster   string      `json:"cluster,omitempty"`
 	Preferred string      `json:"preferred,omitempty"`
 	// Internal
 	node RaftNode
@@ -1460,6 +1461,15 @@ func (mset *stream) raftNode() RaftNode {
 	return mset.node
 }
 
+func (mset *stream) removeNode() {
+	mset.mu.Lock()
+	defer mset.mu.Unlock()
+	if n := mset.node; n != nil {
+		n.Delete()
+		mset.node = nil
+	}
+}
+
 // Monitor our stream node for this stream.
 func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
 	s, cc, n := js.server(), js.cluster, sa.Group.node
@@ -2162,7 +2172,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
 	accStreams := cc.streams[acc.Name]
 	if accStreams == nil {
 		accStreams = make(map[string]*streamAssignment)
-	} else if osa := accStreams[stream]; osa != nil {
+	} else if osa := accStreams[stream]; osa != nil && osa != sa {
 		// Copy over private existing state from former SA.
 		sa.Group.node = osa.Group.node
 		sa.consumers = osa.consumers
@@ -2273,7 +2283,7 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
 }
 
 // processClusterUpdateStream is called when we have a stream assignment that
-// has been updated for an existing assignment.
+// has been updated for an existing assignment and we are a member.
 func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAssignment) {
 	if sa == nil {
 		return
@@ -2282,15 +2292,26 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
 	js.mu.Lock()
 	s, rg := js.srv, sa.Group
 	client, subject, reply := sa.Client, sa.Subject, sa.Reply
-	alreadyRunning := rg.node != nil
+	alreadyRunning, numReplicas := osa.Group.node != nil, sa.Config.Replicas
+	needsNode := rg.node == nil
 	storage := sa.Config.Storage
 	hasResponded := sa.responded
 	sa.responded = true
 	js.mu.Unlock()
 
 	mset, err := acc.lookupStream(sa.Config.Name)
 	if err == nil && mset != nil {
-		if !alreadyRunning {
+		if !alreadyRunning && numReplicas > 1 {
+			if needsNode {
+				js.createRaftGroup(rg, storage)
+			}
 			s.startGoRoutine(func() { js.monitorStream(mset, sa) })
+		} else if numReplicas == 1 && alreadyRunning {
+			// We downgraded to R1. Make sure we cleanup the raft node and the stream monitor.
+			mset.removeNode()
+			js.mu.Lock()
+			sa.Group.node = nil
+			js.mu.Unlock()
 		}
 		mset.setStreamAssignment(sa)
 		if err = mset.update(sa.Config); err != nil {
@@ -2299,6 +2320,12 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
 		}
 	}
 
+	// If not found we must be expanding into this node since if we are here we know we are a member.
+	if err == ErrJetStreamStreamNotFound {
+		js.processStreamAssignment(sa)
+		return
+	}
+
 	if err != nil {
 		js.mu.Lock()
 		sa.err = err
@@ -2328,6 +2355,12 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
 		return
 	}
 
+	// If we were a single node being promoted assume leadership role for purpose of responding.
+	if !hasResponded && !isLeader && !alreadyRunning {
+		isLeader = true
+	}
+
 	// Check if we should bail.
 	if !isLeader || hasResponded {
 		return
 	}
@@ -3637,7 +3670,7 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe
 }
 
 // selectPeerGroup will select a group of peers to start a raft group.
-func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig) []string {
+func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string) []string {
 	if cluster == _EMPTY_ || cfg == nil {
 		return nil
 	}
@@ -3656,6 +3689,18 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
 	var nodes []wn
 	s, peers := cc.s, cc.meta.Peers()
 
+	// Map existing.
+	var ep map[string]struct{}
+	if le := len(existing); le > 0 {
+		if le >= r {
+			return existing
+		}
+		ep = make(map[string]struct{})
+		for _, p := range existing {
+			ep[p] = struct{}{}
+		}
+	}
+
 	for _, p := range peers {
 		si, ok := s.nodeToInfo.Load(p.ID)
 		if !ok || si == nil {
@@ -3668,6 +3713,13 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
 			continue
 		}
+		// If existing also skip, we will add back in to front of the list when done.
+		if ep != nil {
+			if _, ok := ep[p.ID]; ok {
+				continue
+			}
+		}
+
 		var available uint64
 		switch cfg.Storage {
 		case MemoryStorage:
@@ -3688,7 +3740,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
 			}
 		}
-		// Check if we have enough room if maxBytes set.
+		// Otherwise check if we have enough room if maxBytes set.
 		if maxBytes > 0 && maxBytes > available {
 			continue
 		}
@@ -3697,13 +3749,17 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
 	}
 
 	// If we could not select enough peers, fail.
-	if len(nodes) < r {
+	if len(nodes) < (r - len(existing)) {
 		return nil
 	}
 	// Sort based on available from most to least.
 	sort.Slice(nodes, func(i, j int) bool { return nodes[i].avail > nodes[j].avail })
 
 	var results []string
+	if len(existing) > 0 {
+		results = append(results, existing...)
+		r -= len(existing)
+	}
 	for _, r := range nodes[:r] {
 		results = append(results, r.id)
 	}
@@ -3750,11 +3806,11 @@ func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) *ra
 	// Need to create a group here.
 	for _, cn := range clusters {
-		peers := cc.selectPeerGroup(replicas, cn, cfg)
+		peers := cc.selectPeerGroup(replicas, cn, cfg, nil)
 		if len(peers) < replicas {
 			continue
 		}
-		return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers}
+		return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers, Cluster: cn}
 	}
 	return nil
 }
@@ -3901,12 +3957,7 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
 		s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
 		return
 	}
-	// Check for cluster changes that we want to error on.
-	if newCfg.Replicas != len(osa.Group.Peers) {
-		resp.Error = NewJSStreamReplicasNotUpdatableError()
-		s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
-		return
-	}
+	// Check for mirror changes which are not allowed.
 	if !reflect.DeepEqual(newCfg.Mirror, osa.Config.Mirror) {
 		resp.Error = NewJSStreamMirrorNotUpdatableError()
 		s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
@@ -3929,7 +3980,29 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
 		}
 	}
 
-	sa := &streamAssignment{Group: osa.Group, Sync: osa.Sync, Config: newCfg, Subject: subject, Reply: reply, Client: ci}
+	// Check for replica changes.
+	rg := osa.Group
+	if newCfg.Replicas != len(rg.Peers) {
+		// We are adding new peers here.
+		if newCfg.Replicas > len(rg.Peers) {
+			peers := cc.selectPeerGroup(newCfg.Replicas, rg.Cluster, newCfg, rg.Peers)
+			if len(peers) != newCfg.Replicas {
+				resp.Error = NewJSInsufficientResourcesError()
+				s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
+				return
+			}
+			// Single nodes are not recorded by the NRG layer so we can rename.
+			if len(rg.Peers) == 1 {
+				rg.Name = groupNameForStream(peers, rg.Storage)
+			}
+			rg.Peers = peers
+		} else {
+			// We are deleting nodes here.
+			rg.Peers = rg.Peers[:newCfg.Replicas]
+		}
+	}
+
+	sa := &streamAssignment{Group: rg, Sync: osa.Sync, Config: newCfg, Subject: subject, Reply: reply, Client: ci}
 	cc.meta.Propose(encodeUpdateStreamAssignment(sa))
 }

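The core of the server-side change is how selectPeerGroup now grows an existing group: current peers are kept and returned at the front of the list, and only the remaining slots are filled from the candidates with the most available storage. A condensed, self-contained sketch of that idea (the candidate type and names are illustrative, not the server's own):

package main

import (
	"fmt"
	"sort"
)

type candidate struct {
	id    string
	avail uint64
}

// extendPeers is a simplified sketch of the selectPeerGroup change, not the
// server code: existing peers stay at the front of the result so raft
// membership is preserved, and the remaining slots are filled from the
// candidates with the most available storage.
func extendPeers(r int, existing []string, cands []candidate) []string {
	if len(existing) >= r {
		return existing
	}
	seen := make(map[string]struct{}, len(existing))
	for _, p := range existing {
		seen[p] = struct{}{}
	}
	var nodes []candidate
	for _, c := range cands {
		if _, ok := seen[c.id]; ok {
			continue // existing peers are re-added at the front below
		}
		nodes = append(nodes, c)
	}
	if len(nodes) < r-len(existing) {
		return nil // not enough new peers available
	}
	sort.Slice(nodes, func(i, j int) bool { return nodes[i].avail > nodes[j].avail })
	results := append([]string{}, existing...)
	for _, c := range nodes[:r-len(existing)] {
		results = append(results, c.id)
	}
	return results
}

func main() {
	peers := extendPeers(3, []string{"s1"}, []candidate{
		{"s1", 100}, {"s2", 80}, {"s3", 90}, {"s4", 10},
	})
	fmt.Println(peers) // [s1 s3 s2]
}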
server/jetstream_cluster_test.go

@@ -1456,10 +1456,6 @@ func TestJetStreamClusterStreamExtendedUpdates(t *testing.T) {
 	if si := updateStream(); !reflect.DeepEqual(si.Config.Subjects, cfg.Subjects) {
 		t.Fatalf("Did not get expected stream info: %+v", si)
 	}
-	// Make sure these error for now.
-	// R factor changes
-	cfg.Replicas = 1
-	expectError()
 	// Mirror changes
 	cfg.Replicas = 3
 	cfg.Mirror = &nats.StreamSource{Name: "ORDERS"}
@@ -10507,6 +10503,88 @@ func TestJetStreamAddConsumerWithInfo(t *testing.T) {
 	t.Run("Clustered", func(t *testing.T) { testConsInfo(t, c.randomServer()) })
 }
 
+func TestJetStreamClusterStreamReplicaUpdates(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R7S", 7)
+	defer c.shutdown()
+
+	// Client based API
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Start out at R1
+	cfg := &nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 1,
+	}
+	_, err := js.AddStream(cfg)
+	require_NoError(t, err)
+
+	numMsgs := 1000
+	for i := 0; i < numMsgs; i++ {
+		js.PublishAsync("foo", []byte("HELLO WORLD"))
+	}
+	select {
+	case <-js.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+
+	updateReplicas := func(r int) {
+		t.Helper()
+		si, err := js.StreamInfo("TEST")
+		require_NoError(t, err)
+		leader := si.Cluster.Leader
+
+		cfg.Replicas = r
+		_, err = js.UpdateStream(cfg)
+		require_NoError(t, err)
+
+		c.waitOnStreamLeader("$G", "TEST")
+		checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
+			si, err = js.StreamInfo("TEST")
+			require_NoError(t, err)
+			if len(si.Cluster.Replicas) != r-1 {
+				return fmt.Errorf("Expected %d replicas, got %d", r-1, len(si.Cluster.Replicas))
+			}
+			return nil
+		})
+
+		// Make sure we kept same leader.
+		if si.Cluster.Leader != leader {
+			t.Fatalf("Leader changed, expected %q got %q", leader, si.Cluster.Leader)
+		}
+		// Make sure all are current.
+		for _, r := range si.Cluster.Replicas {
+			c.waitOnStreamCurrent(c.serverByName(r.Name), "$G", "TEST")
+		}
+		// Check msgs.
+		if si.State.Msgs != uint64(numMsgs) {
+			t.Fatalf("Expected %d msgs, got %d", numMsgs, si.State.Msgs)
+		}
+		// Make sure we have the right number of HA Assets running on the leader.
+		s := c.serverByName(leader)
+		jsi, err := s.Jsz(nil)
+		require_NoError(t, err)
+		nha := 1 // meta always present.
+		if len(si.Cluster.Replicas) > 0 {
+			nha++
+		}
+		if nha != jsi.HAAssets {
+			t.Fatalf("Expected %d HA asset(s), but got %d", nha, jsi.HAAssets)
+		}
+	}
+
+	// Update from 1-3
+	updateReplicas(3)
+	// Update from 3-5
+	updateReplicas(5)
+	// Update from 5-3
+	updateReplicas(3)
+	// Update from 3-1
+	updateReplicas(1)
+}
+
 // Support functions
 
 // Used to setup superclusters for tests.

server/raft.go

@@ -517,6 +517,12 @@ func (s *Server) unregisterRaftNode(group string) {
 	}
 }
 
+func (s *Server) numRaftNodes() int {
+	s.rnMu.Lock()
+	defer s.rnMu.Unlock()
+	return len(s.raftNodes)
+}
+
 func (s *Server) lookupRaftNode(group string) RaftNode {
 	s.rnMu.RLock()
 	defer s.rnMu.RUnlock()