Merge pull request #2931 from nats-io/ipq_changes

Changes to IPQueues
This commit is contained in:
Ivan Kozlovic
2022-03-17 19:13:02 -06:00
committed by GitHub
13 changed files with 235 additions and 221 deletions

View File

@@ -1077,6 +1077,38 @@ func (s *Server) HandleStacksz(w http.ResponseWriter, r *http.Request) {
ResponseHandler(w, r, buf[:n])
}
type monitorIPQueue struct {
Pending int `json:"pending"`
InProgress int `json:"in_progress,omitempty"`
}
func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) {
all, err := decodeBool(w, r, "all")
if err != nil {
return
}
qfilter := r.URL.Query().Get("queues")
queues := map[string]monitorIPQueue{}
s.ipQueues.Range(func(k, v interface{}) bool {
name := k.(string)
queue := v.(*ipQueue)
pending := queue.len()
inProgress := int(queue.inProgress())
if !all && (pending == 0 && inProgress == 0) {
return true
} else if qfilter != _EMPTY_ && !strings.Contains(name, qfilter) {
return true
}
queues[name] = monitorIPQueue{Pending: pending, InProgress: inProgress}
return true
})
b, _ := json.MarshalIndent(queues, "", " ")
ResponseHandler(w, r, b)
}
// Varz will output server information on the monitoring port at /varz.
type Varz struct {
ID string `json:"server_id"`