mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Add advisories for leader elected and quorum lost events.
Note that quorum lost only fires if the old leader steps down. If the leader itself fails and that causes the loss of quorum, currently no advisory is sent. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -2316,6 +2316,13 @@ func (mset *Stream) DeleteConsumer(o *Consumer) error {
|
||||
return o.Delete()
|
||||
}
|
||||
|
||||
func (o *Consumer) Stream() string {
|
||||
o.mu.RLock()
|
||||
mset := o.mset
|
||||
o.mu.RUnlock()
|
||||
return mset.Name()
|
||||
}
|
||||
|
||||
// Active indicates if this consumer is still active.
|
||||
func (o *Consumer) Active() bool {
|
||||
o.mu.Lock()
|
||||
|
||||
@@ -162,33 +162,45 @@ const (
|
||||
// JSAdvisoryConsumerMsgTerminatedPre is a notification published when a message has been terminated.
|
||||
JSAdvisoryConsumerMsgTerminatedPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED"
|
||||
|
||||
// JSAdvisoryStreamCreatedPre notification that a stream was created
|
||||
// JSAdvisoryStreamCreatedPre notification that a stream was created.
|
||||
JSAdvisoryStreamCreatedPre = "$JS.EVENT.ADVISORY.STREAM.CREATED"
|
||||
|
||||
// JSAdvisoryStreamDeletedPre notification that a stream was deleted
|
||||
// JSAdvisoryStreamDeletedPre notification that a stream was deleted.
|
||||
JSAdvisoryStreamDeletedPre = "$JS.EVENT.ADVISORY.STREAM.DELETED"
|
||||
|
||||
// JSAdvisoryStreamUpdatedPre notification that a stream was updated
|
||||
// JSAdvisoryStreamUpdatedPre notification that a stream was updated.
|
||||
JSAdvisoryStreamUpdatedPre = "$JS.EVENT.ADVISORY.STREAM.UPDATED"
|
||||
|
||||
// JSAdvisoryConsumerCreatedPre notification that a template created
|
||||
// JSAdvisoryConsumerCreatedPre notification that a consumer was created.
|
||||
JSAdvisoryConsumerCreatedPre = "$JS.EVENT.ADVISORY.CONSUMER.CREATED"
|
||||
|
||||
// JSAdvisoryConsumerDeletedPre notification that a template deleted
|
||||
// JSAdvisoryConsumerDeletedPre notification that a consumer was deleted.
|
||||
JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"
|
||||
|
||||
// JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created
|
||||
// JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created.
|
||||
JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"
|
||||
|
||||
// JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed
|
||||
// JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed.
|
||||
JSAdvisoryStreamSnapshotCompletePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE"
|
||||
|
||||
// JSAdvisoryStreamRestoreCreatePre notification that a restore was start
|
||||
// JSAdvisoryStreamRestoreCreatePre notification that a restore was started.
|
||||
JSAdvisoryStreamRestoreCreatePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE"
|
||||
|
||||
// JSAdvisoryStreamRestoreCompletePre notification that a restore was completed
|
||||
// JSAdvisoryStreamRestoreCompletePre notification that a restore was completed.
|
||||
JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"
|
||||
|
||||
// JSAdvisoryStreamLeaderElectedPre notification that a replicated stream has elected a leader.
|
||||
JSAdvisoryStreamLeaderElectedPre = "$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED"
|
||||
|
||||
// JSAdvisoryStreamQuorumLostPre notification that a stream and its consumers are stalled.
|
||||
JSAdvisoryStreamQuorumLostPre = "$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST"
|
||||
|
||||
// JSAdvisoryConsumerLeaderElectedPre notification that a replicated consumer has elected a leader.
|
||||
JSAdvisoryConsumerLeaderElectedPre = "$JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED"
|
||||
|
||||
// JSAdvisoryConsumerQuorumLostPre notification that a consumer is stalled.
|
||||
JSAdvisoryConsumerQuorumLostPre = "$JS.EVENT.ADVISORY.CONSUMER.QUORUM_LOST"
|
||||
|
||||
// JSAuditAdvisory is a notification about JetStream API access.
|
||||
// FIXME - Add in details about who..
|
||||
JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
|
||||
|
||||
@@ -1241,22 +1241,68 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry) (bool,
|
||||
return didSnap, nil
|
||||
}
|
||||
|
||||
// Returns the PeerInfo for all replicas of a raft node. This is different than node.Peers()
|
||||
// and is used for external facing advisories.
|
||||
func (s *Server) replicas(node RaftNode) []*PeerInfo {
|
||||
now := time.Now()
|
||||
var replicas []*PeerInfo
|
||||
for _, rp := range node.Peers() {
|
||||
pi := &PeerInfo{Name: s.serverNameForNode(rp.ID), Current: rp.Current, Active: now.Sub(rp.Last)}
|
||||
replicas = append(replicas, pi)
|
||||
}
|
||||
return replicas
|
||||
}
|
||||
|
||||
func (js *jetStream) processStreamLeaderChange(mset *Stream, sa *streamAssignment, isLeader bool) {
|
||||
if isLeader {
|
||||
js.srv.Noticef("JetStream cluster new stream leader for '%s > %s'", sa.Client.Account, mset.Name())
|
||||
}
|
||||
|
||||
mset.setLeader(isLeader)
|
||||
|
||||
js.mu.Lock()
|
||||
if !isLeader || sa.responded {
|
||||
js.mu.Unlock()
|
||||
return
|
||||
}
|
||||
s, account, err := js.srv, sa.Client.Account, sa.err
|
||||
client, reply := sa.Client, sa.Reply
|
||||
hasResponded := sa.responded
|
||||
sa.responded = true
|
||||
js.mu.Unlock()
|
||||
|
||||
stream := mset.Name()
|
||||
|
||||
if isLeader {
|
||||
s.Noticef("JetStream cluster new stream leader for '%s > %s'", sa.Client.Account, stream)
|
||||
if node := mset.raftNode(); node != nil {
|
||||
s.publishAdvisory(mset.account(), JSAdvisoryStreamLeaderElectedPre+"."+stream, &JSStreamLeaderElectedAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSStreamLeaderElectedAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: stream,
|
||||
State: mset.State(),
|
||||
Leader: s.serverNameForNode(node.GroupLeader()),
|
||||
Replicas: s.replicas(node),
|
||||
})
|
||||
}
|
||||
} else {
|
||||
// We are stepping down. Make sure if we are doing so because we have lost quorum that
|
||||
// we send the appropriate advisories.
|
||||
if node := mset.raftNode(); node != nil && !node.Quorum() {
|
||||
s.Warnf("JetStream cluster stream '%s > %s' has lost quorum, stalled.", sa.Client.Account, stream)
|
||||
s.publishAdvisory(mset.account(), JSAdvisoryStreamQuorumLostPre+"."+stream, &JSStreamQuorumLostAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSStreamQuorumLostAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: stream,
|
||||
State: mset.State(),
|
||||
Replicas: s.replicas(node),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Tell stream to switch leader status.
|
||||
mset.setLeader(isLeader)
|
||||
|
||||
if !isLeader || hasResponded {
|
||||
return
|
||||
}
|
||||
|
||||
acc, _ := s.LookupAccount(account)
|
||||
if acc == nil {
|
||||
return
|
||||
@@ -1973,27 +2019,60 @@ func decodeDeliveredUpdate(buf []byte) (dseq, sseq, dc uint64, ts int64, err err
|
||||
}
|
||||
|
||||
func (js *jetStream) processConsumerLeaderChange(o *Consumer, ca *consumerAssignment, isLeader bool) {
|
||||
if isLeader {
|
||||
js.srv.Noticef("JetStream cluster new consumer leader for '%s > %s > %s'", ca.Client.Account, ca.Stream, ca.Name)
|
||||
}
|
||||
|
||||
o.setLeader(isLeader)
|
||||
|
||||
js.mu.Lock()
|
||||
if !isLeader || ca.responded {
|
||||
ca.responded = true
|
||||
js.mu.Unlock()
|
||||
return
|
||||
}
|
||||
s, account, err := js.srv, ca.Client.Account, ca.err
|
||||
client, reply := ca.Client, ca.Reply
|
||||
hasResponded := ca.responded
|
||||
ca.responded = true
|
||||
js.mu.Unlock()
|
||||
|
||||
stream := o.Stream()
|
||||
consumer := o.Name()
|
||||
acc, _ := s.LookupAccount(account)
|
||||
if acc == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if isLeader {
|
||||
s.Noticef("JetStream cluster new consumer leader for '%s > %s > %s'", ca.Client.Account, stream, consumer)
|
||||
if node := o.raftNode(); node != nil {
|
||||
s.publishAdvisory(acc, JSAdvisoryConsumerLeaderElectedPre+"."+stream+"."+consumer, &JSConsumerLeaderElectedAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSConsumerLeaderElectedAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: stream,
|
||||
Consumer: consumer,
|
||||
Leader: s.serverNameForNode(node.GroupLeader()),
|
||||
Replicas: s.replicas(node),
|
||||
})
|
||||
}
|
||||
} else {
|
||||
// We are stepping down. Make sure if we are doing so because we have lost quorum that
|
||||
// we send the appropriate advisories.
|
||||
if node := o.raftNode(); node != nil && !node.Quorum() {
|
||||
s.Warnf("JetStream cluster consumer '%s > %s >%s' has lost quorum, stalled.", ca.Client.Account, stream, consumer)
|
||||
s.publishAdvisory(acc, JSAdvisoryConsumerQuorumLostPre+"."+stream+"."+consumer, &JSConsumerQuorumLostAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSConsumerQuorumLostAdvisoryType,
|
||||
ID: nuid.Next(),
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Stream: stream,
|
||||
Consumer: consumer,
|
||||
Replicas: s.replicas(node),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Tell consumer to switch leader status.
|
||||
o.setLeader(isLeader)
|
||||
|
||||
if !isLeader || hasResponded {
|
||||
return
|
||||
}
|
||||
|
||||
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
|
||||
if err != nil {
|
||||
resp.Error = jsError(err)
|
||||
|
||||
@@ -145,3 +145,53 @@ type JSRestoreCompleteAdvisory struct {
|
||||
|
||||
// JSRestoreCompleteAdvisoryType is the schema type for JSRestoreCompleteAdvisory
|
||||
const JSRestoreCompleteAdvisoryType = "io.nats.jetstream.advisory.v1.restore_complete"
|
||||
|
||||
// Clustering specific.
|
||||
|
||||
// JSStreamLeaderElectedAdvisoryType is sent when the system elects a leader for a stream.
|
||||
const JSStreamLeaderElectedAdvisoryType = "io.nats.jetstream.advisory.v1.stream_leader_elected"
|
||||
|
||||
// JSStreamQuorumLostAdvisory indicates that a stream has lost quorum and is stalled.
|
||||
type JSStreamLeaderElectedAdvisory struct {
|
||||
TypedEvent
|
||||
Stream string `json:"stream"`
|
||||
State StreamState `json:"state"`
|
||||
Leader string `json:"leader"`
|
||||
Replicas []*PeerInfo `json:"replicas"`
|
||||
}
|
||||
|
||||
// JSStreamQuorumLostAdvisoryType is sent when the system detects a clustered stream and
|
||||
// its consumers are stalled and unable to make progress.
|
||||
const JSStreamQuorumLostAdvisoryType = "io.nats.jetstream.advisory.v1.stream_quorum_lost"
|
||||
|
||||
// JSStreamQuorumLostAdvisory indicates that a stream has lost quorum and is stalled.
|
||||
type JSStreamQuorumLostAdvisory struct {
|
||||
TypedEvent
|
||||
Stream string `json:"stream"`
|
||||
State StreamState `json:"state"`
|
||||
Replicas []*PeerInfo `json:"replicas"`
|
||||
}
|
||||
|
||||
// JSConsumerLeaderElectedAdvisoryType is sent when the system elects a leader for a consumer.
|
||||
const JSConsumerLeaderElectedAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_leader_elected"
|
||||
|
||||
// JSStreamQuorumLostAdvisory indicates that a stream has lost quorum and is stalled.
|
||||
type JSConsumerLeaderElectedAdvisory struct {
|
||||
TypedEvent
|
||||
Stream string `json:"stream"`
|
||||
Consumer string `json:"consumer"`
|
||||
Leader string `json:"leader"`
|
||||
Replicas []*PeerInfo `json:"replicas"`
|
||||
}
|
||||
|
||||
// JSConsumerQuorumLostAdvisoryType is sent when the system detects a clustered consumer
|
||||
// is stalled and unable to make progress.
|
||||
const JSConsumerQuorumLostAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_quorum_lost"
|
||||
|
||||
// JSConsumerQuorumLostAdvisory indicates that a consumer has lost quorum and is stalled.
|
||||
type JSConsumerQuorumLostAdvisory struct {
|
||||
TypedEvent
|
||||
Stream string `json:"stream"`
|
||||
Consumer string `json:"consumer"`
|
||||
Replicas []*PeerInfo `json:"replicas"`
|
||||
}
|
||||
|
||||
@@ -37,6 +37,7 @@ type RaftNode interface {
|
||||
State() RaftState
|
||||
Size() (entries, bytes uint64)
|
||||
Leader() bool
|
||||
Quorum() bool
|
||||
Current() bool
|
||||
GroupLeader() string
|
||||
StepDown() error
|
||||
@@ -754,9 +755,6 @@ func (n *raft) Group() string {
|
||||
func (n *raft) Peers() []*Peer {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
if n.state != Leader {
|
||||
return nil
|
||||
}
|
||||
|
||||
var peers []*Peer
|
||||
for id, ps := range n.peers {
|
||||
@@ -1196,6 +1194,23 @@ func (n *raft) runAsLeader() {
|
||||
}
|
||||
}
|
||||
|
||||
// Quorum reports the quorum status. Will be called on former leaders.
|
||||
func (n *raft) Quorum() bool {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
now, nc := time.Now().UnixNano(), 1
|
||||
for _, peer := range n.peers {
|
||||
if now-peer.ts < int64(lostQuorumInterval) {
|
||||
nc++
|
||||
if nc >= n.qn {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (n *raft) lostQuorum() bool {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
@@ -2285,10 +2285,29 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
// Setup subscription for leader elected.
|
||||
lesub, err := nc.SubscribeSync(server.JSAdvisoryStreamLeaderElectedPre + ".*")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if _, err := js.AddStream(&nats.StreamConfig{Name: "NO-Q", Replicas: 2}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Make sure we received our leader elected advisory.
|
||||
leadv, _ := lesub.NextMsg(0)
|
||||
if leadv == nil {
|
||||
t.Fatalf("Expected to receive a leader elected advisory")
|
||||
}
|
||||
var le server.JSStreamLeaderElectedAdvisory
|
||||
if err := json.Unmarshal(leadv.Data, &le); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if ln := c.streamLeader("$G", "NO-Q").Name(); le.Leader != ln {
|
||||
t.Fatalf("Expected to have leader %q in elect advisory, got %q", ln, le.Leader)
|
||||
}
|
||||
|
||||
payload := []byte("Hello JSC")
|
||||
for i := 0; i < 10; i++ {
|
||||
if _, err := js.Publish("NO-Q", payload); err != nil {
|
||||
@@ -2296,6 +2315,12 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Setup subscription for consumer leader elected.
|
||||
clesub, err := nc.SubscribeSync(server.JSAdvisoryConsumerLeaderElectedPre + ".*.*")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
sub, err := js.SubscribeSync("NO-Q")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
@@ -2305,10 +2330,23 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Make sure we received our consumer leader elected advisory.
|
||||
leadv, _ = clesub.NextMsg(0)
|
||||
if leadv == nil {
|
||||
t.Fatalf("Expected to receive a consumer leader elected advisory")
|
||||
}
|
||||
|
||||
// Setup subscription for lost quorum advisory.
|
||||
ssub, err := nc.SubscribeSync(server.JSAdvisoryStreamQuorumLostPre + ".*")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
// Shutdown the non-leader.
|
||||
c.randomNonStreamLeader("$G", "NO-Q").Shutdown()
|
||||
|
||||
// This should eventually have us stepdown as leader since we would have lost quorum.
|
||||
// This should eventually have us stepdown as leader since we would have lost quorum with R=2.
|
||||
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
|
||||
if sl := c.streamLeader("$G", "NO-Q"); sl == nil {
|
||||
return nil
|
||||
@@ -2339,6 +2377,19 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
|
||||
t.Fatalf("Expected an 'unavailable' error, got %v", err)
|
||||
}
|
||||
|
||||
// Make sure we received our lost quorum advisories.
|
||||
adv, _ := ssub.NextMsg(0)
|
||||
if adv == nil {
|
||||
t.Fatalf("Expected to receive a quorum lost advisory")
|
||||
}
|
||||
var lqa server.JSStreamQuorumLostAdvisory
|
||||
if err := json.Unmarshal(adv.Data, &lqa); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if len(lqa.Replicas) != 2 {
|
||||
t.Fatalf("Expected reports for both replicas, only got %d", len(lqa.Replicas))
|
||||
}
|
||||
|
||||
// Now let's take out the other non meta-leader server.
|
||||
// We should get same error for general API calls.
|
||||
c.randomNonLeader().Shutdown()
|
||||
|
||||
Reference in New Issue
Block a user