From 59cc0f00152f9dd907f4faf959a5ca36d4c009b9 Mon Sep 17 00:00:00 2001 From: Matt Stephenson Date: Fri, 14 Jan 2022 16:41:32 -0800 Subject: [PATCH] Add source and mirror info to stream monitoring --- server/monitor.go | 14 +++++++----- server/monitor_test.go | 48 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/server/monitor.go b/server/monitor.go index 1f8c484b..6f604ea9 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2388,11 +2388,13 @@ type JSzOptions struct { } type StreamDetail struct { - Name string `json:"name"` - Cluster *ClusterInfo `json:"cluster,omitempty"` - Config *StreamConfig `json:"config,omitempty"` - State StreamState `json:"state,omitempty"` - Consumer []*ConsumerInfo `json:"consumer_detail,omitempty"` + Name string `json:"name"` + Cluster *ClusterInfo `json:"cluster,omitempty"` + Config *StreamConfig `json:"config,omitempty"` + State StreamState `json:"state,omitempty"` + Consumer []*ConsumerInfo `json:"consumer_detail,omitempty"` + Mirror *StreamSourceInfo `json:"mirror,omitempty"` + Sources []*StreamSourceInfo `json:"sources,omitempty"` } type AccountDetail struct { @@ -2469,6 +2471,8 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg State: stream.state(), Cluster: ci, Config: cfg, + Mirror: stream.mirrorInfo(), + Sources: stream.sourcesInfo(), } if optConsumers { for _, consumer := range stream.getPublicConsumers() { diff --git a/server/monitor_test.go b/server/monitor_test.go index c4b75903..446b0a3c 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4056,6 +4056,14 @@ func TestMonitorJsz(t *testing.T) { Replicas: 1, }) require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{ + Name: "my-stream-mirror", + Replicas: 2, + Mirror: &nats.StreamSource{ + Name: "my-stream-replicated", + }, + }) + require_NoError(t, err) _, err = js.AddConsumer("my-stream-replicated", &nats.ConsumerConfig{ Durable: "my-consumer-replicated", AckPolicy: nats.AckExplicitPolicy, @@ -4066,9 +4074,16 @@ func TestMonitorJsz(t *testing.T) { AckPolicy: nats.AckExplicitPolicy, }) require_NoError(t, err) + _, err = js.AddConsumer("my-stream-mirror", &nats.ConsumerConfig{ + Durable: "my-consumer-mirror", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) nc.Flush() _, err = js.Publish("foo", nil) require_NoError(t, err) + // Wait for mirror replication + time.Sleep(100 * time.Millisecond) monUrl1 := fmt.Sprintf("http://127.0.0.1:%d/jsz", 7501) monUrl2 := fmt.Sprintf("http://127.0.0.1:%d/jsz", 5501) @@ -4080,13 +4095,13 @@ func TestMonitorJsz(t *testing.T) { t.Fatalf("expected no account to be returned by %s but got %v", url, info) } if info.Streams == 0 { - t.Fatalf("expected stream count to be 2 but got %d", info.Streams) + t.Fatalf("expected stream count to be 3 but got %d", info.Streams) } if info.Consumers == 0 { - t.Fatalf("expected consumer count to be 2 but got %d", info.Consumers) + t.Fatalf("expected consumer count to be 3 but got %d", info.Consumers) } - if info.Messages != 1 { - t.Fatalf("expected one message but got %d", info.Messages) + if info.Messages != 2 { + t.Fatalf("expected two message but got %d", info.Messages) } } }) @@ -4192,6 +4207,31 @@ func TestMonitorJsz(t *testing.T) { } } }) + t.Run("replication", func(t *testing.T) { + // The replication lag may only be present on the leader + replicationFound := false + for _, url := range []string{monUrl1, monUrl2} { + info := readJsInfo(url + "?acc=ACC&streams=true") + if len(info.AccountDetails) != 1 { + t.Fatalf("expected account ACC to be returned by %s but got %v", url, info) + } + streamFound := false + for _, stream := range info.AccountDetails[0].Streams { + if stream.Name == "my-stream-mirror" { + streamFound = true + if stream.Mirror != nil { + replicationFound = true + } + } + } + if !streamFound { + t.Fatalf("Did not locate my-stream-mirror stream in results") + } + } + if !replicationFound { + t.Fatal("ReplicationLag expected to be present for my-stream-mirror stream") + } + }) t.Run("account-non-existing", func(t *testing.T) { for _, url := range []string{monUrl1, monUrl2} { info := readJsInfo(url + "?acc=DOES_NOT_EXIT")