Mirror of https://github.com/gogrlx/nats-server.git (synced 2026-04-17 03:24:40 -07:00)
add implementation for consumer replica change (#3293)
* add implementation for consumer replica change; fixes #3262. Also check the peer list on every update.
Signed-off-by: Matthias Hanel <mh@synadia.com>
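For context, a minimal client-side sketch of the operation this change enables, assuming an R3 stream named TEST already exists and a nats.go client whose ConsumerConfig exposes a Replicas field (as the new test in this commit uses); the names here are illustrative and not part of the commit:

```go
package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Durable pull consumer on an R3 stream; Replicas: 0 means "inherit from the stream".
	cfg := &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy}
	if _, err := js.AddConsumer("TEST", cfg); err != nil {
		log.Fatal(err)
	}

	// Scale the consumer down to a single replica; the server-side change in this
	// commit propagates the new peer set while trying to preserve the current leader.
	cfg.Replicas = 1
	if _, err := js.UpdateConsumer("TEST", cfg); err != nil {
		log.Fatal(err)
	}
}
```

Setting Replicas back to 0 lets the consumer inherit the stream's replica count again, which the new test below also exercises.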
@@ -3593,7 +3593,13 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj
	}

	if isClustered && !req.Config.Direct {
		s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config)
		// If we are inline with client, we still may need to do a callout for consumer info
		// during this call, so place in Go routine to not block client.
		if c.kind != ROUTER && c.kind != GATEWAY {
			go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config)
		} else {
			s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config)
		}
		return
	}

@@ -1878,6 +1878,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
		case <-uch:
			// keep stream assignment current
			sa = mset.streamAssignment()
			// keep peer list up to date with config
			js.checkPeers(mset.raftGroup())
			// We get this when we have a new stream assignment caused by an update.
			// We want to know if we are migrating.
			migrating := mset.isMigrating()

@@ -3739,7 +3741,8 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
			// We get this when we have a new consumer assignment caused by an update.
			// We want to know if we are migrating.
			rg := o.raftGroup()

			// keep peer list up to date with config
			js.checkPeers(rg)
			// If we are migrating, monitor for the new peers to be caught up.
			if isLeader && len(rg.Peers) != o.replica() {
				startMigrationMonitoring()

@@ -4764,6 +4767,57 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
	cc.meta.Propose(encodeAddStreamAssignment(sa))
}

var (
	errReqTimeout = errors.New("timeout while waiting for response")
	errReqSrvExit = errors.New("server shutdown while waiting for response")
)

// blocking utility call to perform requests on the system account
// returns (synchronized) v or error
func (s *Server) sysRequest(v interface{}, subjFormat string, args ...interface{}) (interface{}, error) {
	isubj := fmt.Sprintf(subjFormat, args...)
	s.mu.Lock()
	inbox := s.newRespInbox()
	results := make(chan interface{}, 1)
	// Store our handler.
	s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
		if err := json.Unmarshal(msg, v); err != nil {
			s.Warnf("Error unmarshalling response for request '%s':%v", isubj, err)
			return
		}
		select {
		case results <- v:
		default:
			s.Warnf("Failed placing request response on internal channel")
		}
	}
	s.mu.Unlock()

	s.sendInternalMsgLocked(isubj, inbox, nil, nil)

	const timeout = 2 * time.Second
	notActive := time.NewTimer(timeout)
	defer notActive.Stop()

	var err error
	var data interface{}

	select {
	case <-s.quitCh:
		err = errReqSrvExit
	case <-notActive.C:
		err = errReqTimeout
	case data = <-results:
	}
	// Clean up here.
	s.mu.Lock()
	if s.sys != nil && s.sys.replies != nil {
		delete(s.sys.replies, inbox)
	}
	s.mu.Unlock()
	return data, err
}

func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, cfg *StreamConfig, peerSet []string) {
	js, cc := s.getJetStreamCluster()
	if js == nil || cc == nil {
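A quick note on the call shape of the new sysRequest helper: the caller hands in a typed pointer for the JSON decode and type-asserts the same value on the way back out, which is how the stream and consumer update paths later in this diff use it. Below is a minimal, self-contained sketch of that pattern; fakeSysRequest and the trimmed StreamInfo type are stand-ins for illustration only, not server code.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Trimmed stand-in for the server's StreamInfo; illustration only.
type StreamInfo struct {
	Cluster *struct {
		Leader string `json:"leader"`
	} `json:"cluster,omitempty"`
}

// fakeSysRequest mimics the call shape of (*Server).sysRequest: decode the reply
// into the caller-supplied pointer v and return that same value (or an error).
// The real helper publishes on the system account and waits up to 2 seconds.
func fakeSysRequest(v interface{}, reply []byte) (interface{}, error) {
	if reply == nil {
		return nil, errors.New("timeout while waiting for response")
	}
	if err := json.Unmarshal(reply, v); err != nil {
		return nil, err
	}
	return v, nil
}

func main() {
	reply := []byte(`{"cluster":{"leader":"S-1"}}`)
	// Same pattern the diff uses: pass a typed pointer in, type-assert it back out.
	if si, err := fakeSysRequest(&StreamInfo{}, reply); err != nil {
		fmt.Println("request failed:", err)
	} else if cl := si.(*StreamInfo).Cluster; cl != nil {
		fmt.Println("current leader:", cl.Leader)
	}
}
```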
@@ -4891,48 +4945,11 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
	if !s.allPeersOffline(rg) {
		// Need to release js lock.
		js.mu.Unlock()
		s.mu.Lock()
		inbox := s.newRespInbox()
		results := make(chan *StreamInfo, 1)
		// Store our handler.
		s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
			var si StreamInfo
			if err := json.Unmarshal(msg, &si); err != nil {
				s.Warnf("Error unmarshaling clustered stream info response:%v", err)
				return
			}
			select {
			case results <- &si:
			default:
				s.Warnf("Failed placing remote stream info result on internal channel")
			}
		if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
			s.Warnf("Did not receive stream info results for '%s > %s' due to: %s", acc, cfg.Name, err)
		} else if cl := si.(*StreamInfo).Cluster; cl != nil {
			curLeader = string(getHash(cl.Leader))
		}
		s.mu.Unlock()

		isubj := fmt.Sprintf(clusterStreamInfoT, ci.serviceAccount(), cfg.Name)
		s.sendInternalMsgLocked(isubj, inbox, nil, nil)

		const timeout = 2 * time.Second
		notActive := time.NewTimer(timeout)
		defer notActive.Stop()

		select {
		case <-s.quitCh:
			break
		case <-notActive.C:
			s.Warnf("Did not receive stream info results for '%s > %s'", acc, cfg.Name)
		case si := <-results:
			if si.Cluster != nil {
				// The leader here is the server name, but need to convert to internal name.
				curLeader = string(getHash(si.Cluster.Leader))
			}
		}
		// Clean up here.
		s.mu.Lock()
		if s.sys != nil && s.sys.replies != nil {
			delete(s.sys.replies, inbox)
		}
		s.mu.Unlock()
		// Re-acquire here.
		js.mu.Lock()
	}

@@ -5807,13 +5824,74 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
			Created: time.Now().UTC(),
		}
	} else {
		nca := ca.copyGroup()

		rBefore := ca.Config.replicas(sa.Config)
		rAfter := cfg.replicas(sa.Config)

		var curLeader string
		if rBefore != rAfter {
			// We are modifying nodes here. We want to do our best to preserve the current leader.
			// We have support now from above that guarantees we are in our own Go routine, so can
			// ask for stream info from the stream leader to make sure we keep the leader in the new list.
			if !s.allPeersOffline(ca.Group) {
				// Need to release js lock.
				js.mu.Unlock()
				if ci, err := s.sysRequest(&ConsumerInfo{}, clusterConsumerInfoT, ci.serviceAccount(), sa.Config.Name, cfg.Durable); err != nil {
					s.Warnf("Did not receive consumer info results for '%s > %s > %s' due to: %s", acc, sa.Config.Name, cfg.Durable, err)
				} else if cl := ci.(*ConsumerInfo).Cluster; cl != nil {
					curLeader = string(getHash(cl.Leader))
				}
				// Re-acquire here.
				js.mu.Lock()
			}
		}

		if rBefore < rAfter {
			newPeerSet := nca.Group.Peers
			// scale up by adding new members from the stream peer set that are not yet in the consumer peer set
			streamPeerSet := copyStrings(sa.Group.Peers)
			rand.Shuffle(rAfter, func(i, j int) { streamPeerSet[i], streamPeerSet[j] = streamPeerSet[j], streamPeerSet[i] })
			for _, p := range streamPeerSet {
				found := false
				for _, sp := range newPeerSet {
					if sp == p {
						found = true
						break
					}
				}
				if !found {
					newPeerSet = append(newPeerSet, p)
					if len(newPeerSet) == rAfter {
						break
					}
				}
			}
			nca.Group.Peers = newPeerSet
			nca.Group.Preferred = curLeader
		} else if rBefore > rAfter {
			newPeerSet := nca.Group.Peers
			// mark leader preferred and move it to end
			nca.Group.Preferred = curLeader
			if nca.Group.Preferred != _EMPTY_ {
				for i, p := range newPeerSet {
					if nca.Group.Preferred == p {
						newPeerSet[i] = newPeerSet[len(newPeerSet)-1]
						newPeerSet[len(newPeerSet)-1] = p
					}
				}
			}
			// scale down by removing peers from the end
			newPeerSet = newPeerSet[len(newPeerSet)-rAfter:]
			nca.Group.Peers = newPeerSet
		}

		// Update config and client info on copy of existing.
		nca := *ca
		nca.Config = cfg
		nca.Client = ci
		nca.Subject = subject
		nca.Reply = reply
		ca = &nca
		ca = nca
	}

	eca := encodeAddConsumerAssignment(ca)

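To make the peer-selection rules in the hunk above easier to follow, here is a standalone sketch of the same scale-up/scale-down arithmetic. The function and variable names (scaleUpPeers, scaleDownPeers) are illustrative only and do not exist in the server; for simplicity the sketch shuffles the full stream peer set and uses an empty string where the server uses _EMPTY_.

```go
package main

import (
	"fmt"
	"math/rand"
)

// scaleUpPeers grows the consumer peer set toward rAfter by borrowing peers from
// the (shuffled) stream peer set that the consumer does not already use.
func scaleUpPeers(consumerPeers, streamPeers []string, rAfter int) []string {
	newPeerSet := append([]string{}, consumerPeers...)
	shuffled := append([]string{}, streamPeers...)
	rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
	for _, p := range shuffled {
		if len(newPeerSet) == rAfter {
			break
		}
		found := false
		for _, sp := range newPeerSet {
			if sp == p {
				found = true
				break
			}
		}
		if !found {
			newPeerSet = append(newPeerSet, p)
		}
	}
	return newPeerSet
}

// scaleDownPeers shrinks the peer set to rAfter, first moving the preferred
// (current) leader to the end so it survives the truncation from the front.
func scaleDownPeers(peers []string, rAfter int, preferred string) []string {
	newPeerSet := append([]string{}, peers...)
	if preferred != "" {
		for i, p := range newPeerSet {
			if p == preferred {
				newPeerSet[i] = newPeerSet[len(newPeerSet)-1]
				newPeerSet[len(newPeerSet)-1] = p
			}
		}
	}
	return newPeerSet[len(newPeerSet)-rAfter:]
}

func main() {
	stream := []string{"n1", "n2", "n3"}
	fmt.Println(scaleUpPeers([]string{"n2"}, stream, 3))             // e.g. [n2 n3 n1]
	fmt.Println(scaleDownPeers([]string{"n1", "n2", "n3"}, 1, "n2")) // [n2]
}
```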
@@ -3670,6 +3670,98 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) {
	require_NoError(t, err)
}

func TestJetStreamClusterScaleConsumer(t *testing.T) {
	c := createJetStreamClusterWithTemplate(t, jsClusterTempl, "C", 3)
	defer c.shutdown()

	srv := c.randomNonLeader()
	nc, js := jsClientConnect(t, srv)
	defer nc.Close()

	si, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	durCfg := &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy}
	ci, err := js.AddConsumer("TEST", durCfg)
	require_NoError(t, err)
	require_True(t, ci.Config.Replicas == 0)

	toSend := uint64(1_000)
	for i := uint64(0); i < toSend; i++ {
		_, err = js.Publish("foo", nil)
		require_NoError(t, err)
	}

	s, err := js.PullSubscribe("foo", "DUR")
	require_NoError(t, err)

	consumeOne := func(expSeq uint64) error {
		if ci, err := js.ConsumerInfo("TEST", "DUR"); err != nil {
			return err
		} else if ci.Delivered.Stream != expSeq {
			return fmt.Errorf("pre: not expected delivered stream %d, got %d", expSeq, ci.Delivered.Stream)
		} else if ci.Delivered.Consumer != expSeq {
			return fmt.Errorf("pre: not expected delivered consumer %d, got %d", expSeq, ci.Delivered.Consumer)
		} else if ci.AckFloor.Stream != expSeq {
			return fmt.Errorf("pre: not expected ack stream %d, got %d", expSeq, ci.AckFloor.Stream)
		} else if ci.AckFloor.Consumer != expSeq {
			return fmt.Errorf("pre: not expected ack consumer %d, got %d", expSeq, ci.AckFloor.Consumer)
		}
		if m, err := s.Fetch(1); err != nil {
			return err
		} else if err := m[0].AckSync(); err != nil {
			return err
		}
		expSeq = expSeq + 1
		if ci, err := js.ConsumerInfo("TEST", "DUR"); err != nil {
			return err
		} else if ci.Delivered.Stream != expSeq {
			return fmt.Errorf("post: not expected delivered stream %d, got %d", expSeq, ci.Delivered.Stream)
		} else if ci.Delivered.Consumer != expSeq {
			return fmt.Errorf("post: not expected delivered consumer %d, got %d", expSeq, ci.Delivered.Consumer)
		} else if ci.AckFloor.Stream != expSeq {
			return fmt.Errorf("post: not expected ack stream %d, got %d", expSeq, ci.AckFloor.Stream)
		} else if ci.AckFloor.Consumer != expSeq {
			return fmt.Errorf("post: not expected ack consumer %d, got %d", expSeq, ci.AckFloor.Consumer)
		}
		return nil
	}

	require_NoError(t, consumeOne(0))

	// scale down, up, down and up to default == 3 again
	for i, r := range []int{1, 3, 1, 0} {
		durCfg.Replicas = r
		if r == 0 {
			r = si.Config.Replicas
		}
		js.UpdateConsumer("TEST", durCfg)

		checkFor(t, time.Second*30, time.Millisecond*250, func() error {
			if ci, err = js.ConsumerInfo("TEST", "DUR"); err != nil {
				return err
			} else if ci.Cluster.Leader == _EMPTY_ {
				return fmt.Errorf("no leader")
			} else if len(ci.Cluster.Replicas) != r-1 {
				return fmt.Errorf("not enough replica, got %d wanted %d", len(ci.Cluster.Replicas), r-1)
			} else {
				for _, r := range ci.Cluster.Replicas {
					if !r.Current || r.Offline || r.Lag != 0 {
						return fmt.Errorf("replica %s not current %t offline %t lag %d", r.Name, r.Current, r.Offline, r.Lag)
					}
				}
			}
			return nil
		})

		require_NoError(t, consumeOne(uint64(i+1)))
	}
}

func TestJetStreamClusterMoveCancel(t *testing.T) {
	server := map[string]struct{}{}
	sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,