Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2022-12-15 04:53:15 -08:00

View File

@@ -1122,10 +1122,7 @@ func (o *consumer) setLeader(isLeader bool) {
}
func (o *consumer) handleClusterConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
o.mu.RLock()
sysc := o.sysc
o.mu.RUnlock()
sysc.sendInternalMsg(reply, _EMPTY_, nil, o.info())
o.infoWithSnapAndReply(false, reply)
}
// Lock should be held.
@@ -2202,23 +2199,27 @@ func (o *consumer) info() *ConsumerInfo {
}
func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo {
o.mu.RLock()
return o.infoWithSnapAndReply(snap, _EMPTY_)
}
func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
o.mu.Lock()
mset := o.mset
if mset == nil || mset.srv == nil {
o.mu.RUnlock()
o.mu.Unlock()
return nil
}
js := o.js
o.mu.RUnlock()
if js == nil {
o.mu.Unlock()
return nil
}
ci := js.clusterInfo(o.raftGroup())
o.mu.Lock()
defer o.mu.Unlock()
// Capture raftGroup.
var rg *raftGroup
if o.ca != nil {
rg = o.ca.Group
}
cfg := o.cfg
info := &ConsumerInfo{
@@ -2238,7 +2239,6 @@ func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo {
NumRedelivered: len(o.rdc),
NumPending: o.streamNumPending(),
PushBound: o.isPushMode() && o.active,
Cluster: ci,
}
// Adjust active based on non-zero etc. Also make UTC here.
if !o.ldt.IsZero() {
@@ -2259,6 +2259,18 @@ func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo {
if snap {
o.ici = info
}
sysc := o.sysc
o.mu.Unlock()
// Do cluster.
if rg != nil {
info.Cluster = js.clusterInfo(rg)
}
// If we have a reply subject send the response here.
if reply != _EMPTY_ && sysc != nil {
sysc.sendInternalMsg(reply, _EMPTY_, nil, info)
}
return info
}