diff --git a/server/consumer.go b/server/consumer.go index 2e8d47e0..edda7863 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -43,7 +43,6 @@ type ConsumerInfo struct { NumRedelivered int `json:"num_redelivered"` NumWaiting int `json:"num_waiting"` NumPending uint64 `json:"num_pending"` - Active *PushActive `json:"active,omitempty"` Cluster *ClusterInfo `json:"cluster,omitempty"` } @@ -71,11 +70,6 @@ type ConsumerConfig struct { Direct bool `json:"direct,omitempty"` } -type PushActive struct { - Subject string `json:"subject"` - Queue string `json:"queue,omitempty"` -} - type CreateConsumerRequest struct { Stream string `json:"stream_name"` Config ConsumerConfig `json:"config"` @@ -1469,8 +1463,6 @@ func (o *consumer) writeStoreState() error { // Info returns our current consumer state. func (o *consumer) info() *ConsumerInfo { - var pa *PushActive - o.mu.RLock() mset := o.mset if mset == nil || mset.srv == nil { @@ -1478,12 +1470,6 @@ func (o *consumer) info() *ConsumerInfo { return nil } js := o.js - if o.isPushMode() && o.active { - pa = &PushActive{ - Subject: o.dsubj, - Queue: o.qgroup, - } - } o.mu.RUnlock() if js == nil { @@ -1512,7 +1498,6 @@ func (o *consumer) info() *ConsumerInfo { NumAckPending: len(o.pending), NumRedelivered: len(o.rdc), NumPending: o.adjustedPending(), - Active: pa, Cluster: ci, } // If we are a pull mode consumer, report on number of waiting requests. diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 19da017f..8a384b6b 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -11830,113 +11830,6 @@ func TestJetStreamDomainInPubAck(t *testing.T) { } } -func TestJetStreamPushConsumerInfo(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - config := s.JetStreamConfig() - if config != nil { - defer removeDir(t, config.StoreDir) - } - - nc, js := jsClientConnect(t, s) - defer nc.Close() - - cfg := &nats.StreamConfig{ - Name: "TEST", - Storage: nats.MemoryStorage, - Subjects: []string{"foo"}, - } - if _, err := js.AddStream(cfg); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // We want to test extended consumer info for push based consumers. - // We need to do these by hand for now. - createConsumer := func(name, deliver string) { - t.Helper() - creq := CreateConsumerRequest{ - Stream: "TEST", - Config: ConsumerConfig{ - Durable: name, - DeliverSubject: deliver, - AckPolicy: AckExplicit, - }, - } - req, err := json.Marshal(creq) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - resp, err := nc.Request(fmt.Sprintf(JSApiDurableCreateT, "TEST", name), req, time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - var ccResp JSApiConsumerCreateResponse - if err := json.Unmarshal(resp.Data, &ccResp); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if ccResp.ConsumerInfo == nil || ccResp.Error != nil { - t.Fatalf("Got a bad response %+v", ccResp) - } - } - - consumerInfo := func(name string) *ConsumerInfo { - t.Helper() - resp, err := nc.Request(fmt.Sprintf(JSApiConsumerInfoT, "TEST", name), nil, time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - var cinfo JSApiConsumerInfoResponse - if err := json.Unmarshal(resp.Data, &cinfo); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if cinfo.ConsumerInfo == nil || cinfo.Error != nil { - t.Fatalf("Got a bad response %+v", cinfo) - } - return cinfo.ConsumerInfo - } - - // First create a durable push and make sure we show now active status. - createConsumer("dlc", "d.X") - if ci := consumerInfo("dlc"); ci.Active != nil { - t.Fatalf("Expected active to be nil, got %+v\n", ci.Active) - } - // Now bind the deliver subject. - sub, _ := nc.SubscribeSync("d.X") - nc.Flush() // Make sure it registers. - // Check that its reported. - if ci := consumerInfo("dlc"); ci.Active == nil || ci.Active.Subject != "d.X" { - t.Fatalf("Expected active to be set and have subject %q, got %+v\n", "d.X", ci.Active) - } - sub.Unsubscribe() - nc.Flush() // Make sure it registers. - if ci := consumerInfo("dlc"); ci.Active != nil { - t.Fatalf("Expected active to be nil, got %+v\n", ci.Active) - } - - // Now make sure we have queue groups indictated as needed. - createConsumer("ik", "d.Z") - // Now bind the deliver subject with a queue group. - sub, _ = nc.QueueSubscribeSync("d.Z", "g22") - defer sub.Unsubscribe() - nc.Flush() // Make sure it registers. - // Check that queue group reported. - if ci := consumerInfo("ik"); ci.Active == nil || ci.Active.Subject != "d.Z" || ci.Active.Queue != "g22" { - t.Fatalf("Expected active to be set and have subject %q and queue %q, got %+v\n", "d.Z", "g22", ci.Active) - } - sub.Unsubscribe() - nc.Flush() // Make sure it registers. - if ci := consumerInfo("ik"); ci.Active != nil { - t.Fatalf("Expected active to be nil, got %+v\n", ci.Active) - } - - // Make sure pull consumers report Active as nil. - createConsumer("rip", _EMPTY_) - if ci := consumerInfo("rip"); ci.Active != nil { - t.Fatalf("Expected active to be nil, got %+v\n", ci.Active) - } -} - // Issue #2213 func TestJetStreamDirectConsumersBeingReported(t *testing.T) { s := RunBasicJetStreamServer()