mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
@@ -43,7 +43,6 @@ type ConsumerInfo struct {
|
||||
NumRedelivered int `json:"num_redelivered"`
|
||||
NumWaiting int `json:"num_waiting"`
|
||||
NumPending uint64 `json:"num_pending"`
|
||||
Active *PushActive `json:"active,omitempty"`
|
||||
Cluster *ClusterInfo `json:"cluster,omitempty"`
|
||||
}
|
||||
|
||||
@@ -71,11 +70,6 @@ type ConsumerConfig struct {
|
||||
Direct bool `json:"direct,omitempty"`
|
||||
}
|
||||
|
||||
type PushActive struct {
|
||||
Subject string `json:"subject"`
|
||||
Queue string `json:"queue,omitempty"`
|
||||
}
|
||||
|
||||
type CreateConsumerRequest struct {
|
||||
Stream string `json:"stream_name"`
|
||||
Config ConsumerConfig `json:"config"`
|
||||
@@ -1469,8 +1463,6 @@ func (o *consumer) writeStoreState() error {
|
||||
|
||||
// Info returns our current consumer state.
|
||||
func (o *consumer) info() *ConsumerInfo {
|
||||
var pa *PushActive
|
||||
|
||||
o.mu.RLock()
|
||||
mset := o.mset
|
||||
if mset == nil || mset.srv == nil {
|
||||
@@ -1478,12 +1470,6 @@ func (o *consumer) info() *ConsumerInfo {
|
||||
return nil
|
||||
}
|
||||
js := o.js
|
||||
if o.isPushMode() && o.active {
|
||||
pa = &PushActive{
|
||||
Subject: o.dsubj,
|
||||
Queue: o.qgroup,
|
||||
}
|
||||
}
|
||||
o.mu.RUnlock()
|
||||
|
||||
if js == nil {
|
||||
@@ -1512,7 +1498,6 @@ func (o *consumer) info() *ConsumerInfo {
|
||||
NumAckPending: len(o.pending),
|
||||
NumRedelivered: len(o.rdc),
|
||||
NumPending: o.adjustedPending(),
|
||||
Active: pa,
|
||||
Cluster: ci,
|
||||
}
|
||||
// If we are a pull mode consumer, report on number of waiting requests.
|
||||
|
||||
@@ -11830,113 +11830,6 @@ func TestJetStreamDomainInPubAck(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamPushConsumerInfo(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
config := s.JetStreamConfig()
|
||||
if config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
cfg := &nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Storage: nats.MemoryStorage,
|
||||
Subjects: []string{"foo"},
|
||||
}
|
||||
if _, err := js.AddStream(cfg); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// We want to test extended consumer info for push based consumers.
|
||||
// We need to do these by hand for now.
|
||||
createConsumer := func(name, deliver string) {
|
||||
t.Helper()
|
||||
creq := CreateConsumerRequest{
|
||||
Stream: "TEST",
|
||||
Config: ConsumerConfig{
|
||||
Durable: name,
|
||||
DeliverSubject: deliver,
|
||||
AckPolicy: AckExplicit,
|
||||
},
|
||||
}
|
||||
req, err := json.Marshal(creq)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
resp, err := nc.Request(fmt.Sprintf(JSApiDurableCreateT, "TEST", name), req, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
var ccResp JSApiConsumerCreateResponse
|
||||
if err := json.Unmarshal(resp.Data, &ccResp); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if ccResp.ConsumerInfo == nil || ccResp.Error != nil {
|
||||
t.Fatalf("Got a bad response %+v", ccResp)
|
||||
}
|
||||
}
|
||||
|
||||
consumerInfo := func(name string) *ConsumerInfo {
|
||||
t.Helper()
|
||||
resp, err := nc.Request(fmt.Sprintf(JSApiConsumerInfoT, "TEST", name), nil, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
var cinfo JSApiConsumerInfoResponse
|
||||
if err := json.Unmarshal(resp.Data, &cinfo); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if cinfo.ConsumerInfo == nil || cinfo.Error != nil {
|
||||
t.Fatalf("Got a bad response %+v", cinfo)
|
||||
}
|
||||
return cinfo.ConsumerInfo
|
||||
}
|
||||
|
||||
// First create a durable push and make sure we show now active status.
|
||||
createConsumer("dlc", "d.X")
|
||||
if ci := consumerInfo("dlc"); ci.Active != nil {
|
||||
t.Fatalf("Expected active to be nil, got %+v\n", ci.Active)
|
||||
}
|
||||
// Now bind the deliver subject.
|
||||
sub, _ := nc.SubscribeSync("d.X")
|
||||
nc.Flush() // Make sure it registers.
|
||||
// Check that its reported.
|
||||
if ci := consumerInfo("dlc"); ci.Active == nil || ci.Active.Subject != "d.X" {
|
||||
t.Fatalf("Expected active to be set and have subject %q, got %+v\n", "d.X", ci.Active)
|
||||
}
|
||||
sub.Unsubscribe()
|
||||
nc.Flush() // Make sure it registers.
|
||||
if ci := consumerInfo("dlc"); ci.Active != nil {
|
||||
t.Fatalf("Expected active to be nil, got %+v\n", ci.Active)
|
||||
}
|
||||
|
||||
// Now make sure we have queue groups indictated as needed.
|
||||
createConsumer("ik", "d.Z")
|
||||
// Now bind the deliver subject with a queue group.
|
||||
sub, _ = nc.QueueSubscribeSync("d.Z", "g22")
|
||||
defer sub.Unsubscribe()
|
||||
nc.Flush() // Make sure it registers.
|
||||
// Check that queue group reported.
|
||||
if ci := consumerInfo("ik"); ci.Active == nil || ci.Active.Subject != "d.Z" || ci.Active.Queue != "g22" {
|
||||
t.Fatalf("Expected active to be set and have subject %q and queue %q, got %+v\n", "d.Z", "g22", ci.Active)
|
||||
}
|
||||
sub.Unsubscribe()
|
||||
nc.Flush() // Make sure it registers.
|
||||
if ci := consumerInfo("ik"); ci.Active != nil {
|
||||
t.Fatalf("Expected active to be nil, got %+v\n", ci.Active)
|
||||
}
|
||||
|
||||
// Make sure pull consumers report Active as nil.
|
||||
createConsumer("rip", _EMPTY_)
|
||||
if ci := consumerInfo("rip"); ci.Active != nil {
|
||||
t.Fatalf("Expected active to be nil, got %+v\n", ci.Active)
|
||||
}
|
||||
}
|
||||
|
||||
// Issue #2213
|
||||
func TestJetStreamDirectConsumersBeingReported(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
|
||||
Reference in New Issue
Block a user