mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Update PSE for Windows
* Call into the performance counter API directly * Incorporate caching to reduce load on the server
This commit is contained in:
@@ -1,169 +1,268 @@
|
||||
// Copyright 2015-2016 Apcera Inc. All rights reserved.
|
||||
// +build windows
|
||||
|
||||
package pse
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// cache the image name to optimize repeated calls
|
||||
var imageName string
|
||||
var imageLock sync.Mutex
|
||||
var (
|
||||
pdh = syscall.NewLazyDLL("pdh.dll")
|
||||
winPdhOpenQuery = pdh.NewProc("PdhOpenQuery")
|
||||
winPdhAddCounter = pdh.NewProc("PdhAddCounterW")
|
||||
winPdhCollectQueryData = pdh.NewProc("PdhCollectQueryData")
|
||||
winPdhGetFormattedCounterValue = pdh.NewProc("PdhGetFormattedCounterValue")
|
||||
winPdhGetFormattedCounterArray = pdh.NewProc("PdhGetFormattedCounterArrayW")
|
||||
)
|
||||
|
||||
// parseValues parses the results of data returned by typeperf.exe. This
|
||||
// is a series of comma delimited quoted strings, containing date time,
|
||||
// pid, pcpu, rss, and vss. All numeric values are floating point.
|
||||
// eg: "04/17/2016 15.38.00.016", "5123.00000", "1.23400", "123.00000", "123.00000"
|
||||
func parseValues(line string, pid *int, pcpu *float64, rss, vss *int64) (err error) {
|
||||
values := strings.Split(line, ",")
|
||||
if len(values) < 4 {
|
||||
return errors.New("Invalid result.")
|
||||
}
|
||||
// values[0] will be date, time, ignore them
|
||||
// parse the pid
|
||||
fVal, err := strconv.ParseFloat(strings.Trim(values[1], "\""), 64)
|
||||
if err != nil {
|
||||
return errors.New(fmt.Sprintf("Unable to parse pid: %s", values[1]))
|
||||
}
|
||||
*pid = int(fVal)
|
||||
// global performance counter query handle and counters
|
||||
var (
|
||||
pcHandle PDH_HQUERY
|
||||
pidCounter, cpuCounter, rssCounter, vssCounter PDH_HCOUNTER
|
||||
prevCPU float64
|
||||
prevRss int64
|
||||
prevVss int64
|
||||
lastSampleTime time.Time
|
||||
processPid int
|
||||
pcQueryLock sync.Mutex
|
||||
initialSample = true
|
||||
)
|
||||
|
||||
// parse pcpu
|
||||
*pcpu, err = strconv.ParseFloat(strings.Trim(values[2], "\""), 64)
|
||||
if err != nil {
|
||||
return errors.New(fmt.Sprintf("Unable to parse percent cpu: %s", values[2]))
|
||||
}
|
||||
// maxQuerySize is the number of values to return from a query.
|
||||
// It represents the maximum # of servers that can be queried
|
||||
// simultaneously running on a machine.
|
||||
const maxQuerySize = 512
|
||||
|
||||
// parse private working set (rss)
|
||||
fVal, err = strconv.ParseFloat(strings.Trim(values[3], "\""), 64)
|
||||
if err != nil {
|
||||
return errors.New(fmt.Sprintf("Unable to parse working set: %s", values[3]))
|
||||
}
|
||||
*rss = int64(fVal)
|
||||
// Keep static memory around to reuse; this works best for passing
|
||||
// into the pdh API.
|
||||
var counterResults [maxQuerySize]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE
|
||||
|
||||
// parse virtual bytes (vsz)
|
||||
fVal, err = strconv.ParseFloat(strings.Trim(values[4], "\""), 64)
|
||||
if err != nil {
|
||||
return errors.New(fmt.Sprintf("Unable to parse virtual bytes: %s", values[4]))
|
||||
}
|
||||
*vss = int64(fVal)
|
||||
// PDH Types
|
||||
type (
|
||||
PDH_HQUERY syscall.Handle
|
||||
PDH_HCOUNTER syscall.Handle
|
||||
)
|
||||
|
||||
// PDH constants used here
|
||||
const (
|
||||
PDH_FMT_DOUBLE = 0x00000200
|
||||
PDH_INVALID_DATA = 0xC0000BC6
|
||||
PDH_MORE_DATA = 0x800007D2
|
||||
)
|
||||
|
||||
// PDH_FMT_COUNTERVALUE_DOUBLE - double value
|
||||
type PDH_FMT_COUNTERVALUE_DOUBLE struct {
|
||||
CStatus uint32
|
||||
DoubleValue float64
|
||||
}
|
||||
|
||||
// PDH_FMT_COUNTERVALUE_ITEM_DOUBLE is an array
|
||||
// element of a double value
|
||||
type PDH_FMT_COUNTERVALUE_ITEM_DOUBLE struct {
|
||||
SzName *uint16 // pointer to a string
|
||||
FmtValue PDH_FMT_COUNTERVALUE_DOUBLE
|
||||
}
|
||||
|
||||
func pdhAddCounter(hQuery PDH_HQUERY, szFullCounterPath string, dwUserData uintptr, phCounter *PDH_HCOUNTER) error {
|
||||
ptxt, _ := syscall.UTF16PtrFromString(szFullCounterPath)
|
||||
r0, _, _ := winPdhAddCounter.Call(
|
||||
uintptr(hQuery),
|
||||
uintptr(unsafe.Pointer(ptxt)),
|
||||
dwUserData,
|
||||
uintptr(unsafe.Pointer(phCounter)))
|
||||
|
||||
if r0 != 0 {
|
||||
return fmt.Errorf("pdhAddCounter failed. %d", r0)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// getStatsForProcess retrieves process information for a given instance name.
|
||||
// typeperf.exe is the windows native command line utility to get pcpu, rss,
|
||||
// and vsz equivalents through queries of performance counters.
|
||||
// An alternative is to map the Pdh* native windows API from pdh.dll,
|
||||
// and call those APIs directly - this is a simpler and cleaner approach.
|
||||
func getStatsForProcess(name string, pcpu *float64, rss, vss *int64, pid *int) (err error) {
|
||||
// query the counters using typeperf. "-sc","1" requests one
|
||||
// set of data (versus continuous monitoring)
|
||||
out, err := exec.Command("typeperf.exe",
|
||||
fmt.Sprintf("\\Process(%s)\\ID Process", name),
|
||||
fmt.Sprintf("\\Process(%s)\\%% Processor Time", name),
|
||||
fmt.Sprintf("\\Process(%s)\\Working Set - Private", name),
|
||||
fmt.Sprintf("\\Process(%s)\\Virtual Bytes", name),
|
||||
"-sc", "1").Output()
|
||||
if err != nil {
|
||||
// Signal that the command ran, but the image instance was not found
|
||||
// through a PID of -1.
|
||||
if strings.Contains(string(out), "The data is not valid") {
|
||||
*pid = -1
|
||||
return nil
|
||||
} else {
|
||||
// something went wrong executing the command
|
||||
return errors.New(fmt.Sprintf("typeperf failed: %v", err))
|
||||
}
|
||||
}
|
||||
|
||||
results := strings.Split(string(out), "\r\n")
|
||||
// results[0] = newline
|
||||
// results[1] = headers
|
||||
// results[2] = values
|
||||
// ignore the rest...
|
||||
if len(results) < 3 {
|
||||
return errors.New(fmt.Sprintf("unexpected results from typeperf"))
|
||||
}
|
||||
if err = parseValues(results[2], pid, pcpu, rss, vss); err != nil {
|
||||
return err
|
||||
func pdhOpenQuery(datasrc *uint16, userdata uint32, query *PDH_HQUERY) error {
|
||||
r0, _, _ := syscall.Syscall(winPdhOpenQuery.Addr(), 3, 0, uintptr(userdata), uintptr(unsafe.Pointer(query)))
|
||||
if r0 != 0 {
|
||||
return fmt.Errorf("pdhOpenQuery failed - %d", r0)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func pdhCollectQueryData(hQuery PDH_HQUERY) error {
|
||||
r0, _, _ := winPdhCollectQueryData.Call(uintptr(hQuery))
|
||||
if r0 != 0 {
|
||||
return fmt.Errorf("pdhCollectQueryData failed - %d", r0)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// pdhGetFormattedCounterArrayDouble returns the value of return code
|
||||
// rather than error, to easily check return codes
|
||||
func pdhGetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER, lpdwBufferSize *uint32, lpdwBufferCount *uint32, itemBuffer *PDH_FMT_COUNTERVALUE_ITEM_DOUBLE) uint32 {
|
||||
ret, _, _ := winPdhGetFormattedCounterArray.Call(
|
||||
uintptr(hCounter),
|
||||
uintptr(PDH_FMT_DOUBLE),
|
||||
uintptr(unsafe.Pointer(lpdwBufferSize)),
|
||||
uintptr(unsafe.Pointer(lpdwBufferCount)),
|
||||
uintptr(unsafe.Pointer(itemBuffer)))
|
||||
|
||||
return uint32(ret)
|
||||
}
|
||||
|
||||
func getCounterArrayData(counter PDH_HCOUNTER) ([]float64, error) {
|
||||
var bufSize uint32
|
||||
var bufCount uint32
|
||||
|
||||
// Retrieving array data requires two calls, the first which
|
||||
// requires an adressable empty buffer, and sets size fields.
|
||||
// The second call returns the data.
|
||||
initialBuf := make([]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE, 1)
|
||||
ret := pdhGetFormattedCounterArrayDouble(counter, &bufSize, &bufCount, &initialBuf[0])
|
||||
if ret == PDH_MORE_DATA {
|
||||
// we'll likely never get here, but be safe.
|
||||
if bufCount > maxQuerySize {
|
||||
bufCount = maxQuerySize
|
||||
}
|
||||
ret = pdhGetFormattedCounterArrayDouble(counter, &bufSize, &bufCount, &counterResults[0])
|
||||
if ret == 0 {
|
||||
rv := make([]float64, bufCount)
|
||||
for i := 0; i < int(bufCount); i++ {
|
||||
rv[i] = counterResults[i].FmtValue.DoubleValue
|
||||
}
|
||||
return rv, nil
|
||||
}
|
||||
}
|
||||
if ret != 0 {
|
||||
return nil, fmt.Errorf("getCounterArrayData failed - %d", ret)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// getProcessImageName returns the name of the process image, as expected by
// the performance counter API: the base name of os.Args[0] with a trailing
// ".exe" extension (compared case-insensitively) removed.
func getProcessImageName() (name string) {
	name = filepath.Base(os.Args[0])
	// Strip the extension as a suffix. strings.TrimRight must not be used
	// here: it treats ".exe" as a set of characters and would also eat
	// trailing 'e', 'x' and '.' characters that belong to the name itself
	// (for example "apex.exe" would become "ap").
	if ext := filepath.Ext(name); strings.EqualFold(ext, ".exe") {
		name = name[:len(name)-len(ext)]
	}
	return name
}
|
||||
|
||||
// procUsage retrieves process cpu and memory information.
|
||||
// Under the hood, typeperf is called. Notably, typeperf cannot search
|
||||
// using a pid, but instead uses a somewhat volatile process image name.
|
||||
// If there is more than one instance, "#<instancecount>" is appended to
|
||||
// the image name. Wildcard filters are supported, but result in a very
|
||||
// complex data set to parse.
|
||||
func ProcUsage(pcpu *float64, rss, vss *int64) error {
|
||||
var ppid int = -1
|
||||
// initialize our counters
|
||||
func initCounters() (err error) {
|
||||
|
||||
imageLock.Lock()
|
||||
name := imageName
|
||||
imageLock.Unlock()
|
||||
|
||||
// Get the pid to retrieve the right set of information for this process.
|
||||
procPid := os.Getpid()
|
||||
|
||||
// if we have cached the image name, try that first
|
||||
if name != "" {
|
||||
err := getStatsForProcess(name, pcpu, rss, vss, &ppid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// If the instance name's pid matches ours, we're done.
|
||||
// Otherwise, this instance has been renamed, which is possible
|
||||
// as other process instances start and stop on the system.
|
||||
if ppid == procPid {
|
||||
return nil
|
||||
}
|
||||
processPid = os.Getpid()
|
||||
// require an addressible nil pointer
|
||||
var source uint16
|
||||
if err := pdhOpenQuery(&source, 0, &pcHandle); err != nil {
|
||||
return err
|
||||
}
|
||||
// If we get here, the instance name is invalid (nil, or out of sync)
|
||||
// Query pid and counters until the correct image name is found and
|
||||
// cache it. This is optimized for one or two instances on a windows
|
||||
// node. An alternative is using a wildcard to first lookup up pids,
|
||||
// and parse those to find instance name, then lookup the
|
||||
// performance counters.
|
||||
prefix := getProcessImageName()
|
||||
for i := 0; ppid != procPid; i++ {
|
||||
name = fmt.Sprintf("%s#%d", prefix, i)
|
||||
err := getStatsForProcess(name, pcpu, rss, vss, &ppid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Bail out if an image name is not found.
|
||||
if ppid < 0 {
|
||||
break
|
||||
}
|
||||
// setup the performance counters, search for all server instances
|
||||
name := fmt.Sprintf("%s*", getProcessImageName())
|
||||
pidQuery := fmt.Sprintf("\\Process(%s)\\ID Process", name)
|
||||
cpuQuery := fmt.Sprintf("\\Process(%s)\\%% Processor Time", name)
|
||||
rssQuery := fmt.Sprintf("\\Process(%s)\\Working Set - Private", name)
|
||||
vssQuery := fmt.Sprintf("\\Process(%s)\\Virtual Bytes", name)
|
||||
|
||||
// if the pids equal, this is the right process and cache our
|
||||
// image name
|
||||
if ppid == procPid {
|
||||
imageLock.Lock()
|
||||
imageName = name
|
||||
imageLock.Unlock()
|
||||
break
|
||||
}
|
||||
if err = pdhAddCounter(pcHandle, pidQuery, 0, &pidCounter); err != nil {
|
||||
return err
|
||||
}
|
||||
if ppid < 0 {
|
||||
return errors.New("unable to retrieve process counters")
|
||||
if err = pdhAddCounter(pcHandle, cpuQuery, 0, &cpuCounter); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = pdhAddCounter(pcHandle, rssQuery, 0, &rssCounter); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = pdhAddCounter(pcHandle, vssQuery, 0, &vssCounter); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// prime the counters by collecting once, and sleep to get somewhat
|
||||
// useful information the first request. Counters for the CPU require
|
||||
// at least two collect calls.
|
||||
if err = pdhCollectQueryData(pcHandle); err != nil {
|
||||
return err
|
||||
}
|
||||
time.Sleep(50)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ProcUsage returns process CPU and memory statistics
|
||||
func ProcUsage(pcpu *float64, rss, vss *int64) error {
|
||||
var err error
|
||||
|
||||
// For simplicity, protect the entire call.
|
||||
// Most simultaneous requests will immediately return
|
||||
// with cached values.
|
||||
pcQueryLock.Lock()
|
||||
defer pcQueryLock.Unlock()
|
||||
|
||||
// First time through, initialize counters.
|
||||
if initialSample {
|
||||
if err = initCounters(); err != nil {
|
||||
return err
|
||||
}
|
||||
initialSample = false
|
||||
} else if time.Since(lastSampleTime) < (2 * time.Second) {
|
||||
// only refresh every two seconds as to minimize impact
|
||||
// on the server.
|
||||
*pcpu = prevCPU
|
||||
*rss = prevRss
|
||||
*vss = prevVss
|
||||
return nil
|
||||
}
|
||||
|
||||
// always save the sample time, even on errors.
|
||||
defer func() {
|
||||
lastSampleTime = time.Now()
|
||||
}()
|
||||
|
||||
// refresh the performance counter data
|
||||
if err = pdhCollectQueryData(pcHandle); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// retrieve the data
|
||||
var pidAry, cpuAry, rssAry, vssAry []float64
|
||||
if pidAry, err = getCounterArrayData(pidCounter); err != nil {
|
||||
return err
|
||||
}
|
||||
if cpuAry, err = getCounterArrayData(cpuCounter); err != nil {
|
||||
return err
|
||||
}
|
||||
if rssAry, err = getCounterArrayData(rssCounter); err != nil {
|
||||
return err
|
||||
}
|
||||
if vssAry, err = getCounterArrayData(vssCounter); err != nil {
|
||||
return err
|
||||
}
|
||||
// find the index of the entry for this process
|
||||
idx := int(-1)
|
||||
for i := range pidAry {
|
||||
if int(pidAry[i]) == processPid {
|
||||
idx = i
|
||||
break
|
||||
}
|
||||
}
|
||||
// no pid found...
|
||||
if idx < 0 {
|
||||
return fmt.Errorf("could not find pid in performance counter results")
|
||||
}
|
||||
// assign values from the performance counters
|
||||
*pcpu = cpuAry[idx]
|
||||
*rss = int64(rssAry[idx])
|
||||
*vss = int64(vssAry[idx])
|
||||
|
||||
// save off cache values
|
||||
prevCPU = *pcpu
|
||||
prevRss = *rss
|
||||
prevVss = *vss
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -26,45 +26,20 @@ func checkValues(t *testing.T, pcpu, tPcpu float64, rss, tRss int64) {
|
||||
if delta < 0 {
|
||||
delta = -delta
|
||||
}
|
||||
if delta > 200*1024 { // 200k
|
||||
if delta > 200*1024 { // 200k - basically sanity check
|
||||
t.Fatalf("RSSs did not match close enough: %d vs %d", rss, tRss)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testParseValues(t *testing.T) {
|
||||
var pid int
|
||||
var pcpu float64
|
||||
var rss, vss int64
|
||||
|
||||
err := parseValues("invalid", &pid, &pcpu, &rss, &vss)
|
||||
if err == nil {
|
||||
t.Fatal("Did not receive expected error.")
|
||||
}
|
||||
err = parseValues(
|
||||
"\"date time\",\"invalid float\",\"invalid float\",\"invalid float\"",
|
||||
&pid, &pcpu, &rss, &vss)
|
||||
if err == nil {
|
||||
t.Fatal("Did not receive expected error.")
|
||||
}
|
||||
err = parseValues(
|
||||
"\"date time\",\"1234.00000\",\"invalid float\",\"invalid float\"",
|
||||
&pid, &pcpu, &rss, &vss)
|
||||
if err == nil {
|
||||
t.Fatal("Did not receive expected error.")
|
||||
}
|
||||
err = parseValues(
|
||||
"\"date time\",\"1234.00000\",\"1234.00000\",\"invalid float\"",
|
||||
&pid, &pcpu, &rss, &vss)
|
||||
if err == nil {
|
||||
t.Fatal("Did not receive expected error.")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPSEmulationWin(t *testing.T) {
|
||||
var pcpu, tPcpu float64
|
||||
var rss, vss, tRss int64
|
||||
|
||||
if err := ProcUsage(&pcpu, &rss, &vss); err != nil {
|
||||
t.Fatalf("Error: %v", err)
|
||||
}
|
||||
|
||||
imageName := getProcessImageName()
|
||||
// query the counters using typeperf
|
||||
out, err := exec.Command("typeperf.exe",
|
||||
@@ -83,31 +58,21 @@ func TestPSEmulationWin(t *testing.T) {
|
||||
// parse pcpu
|
||||
tPcpu, err = strconv.ParseFloat(strings.Trim(values[1], "\""), 64)
|
||||
if err != nil {
|
||||
t.Fatal("Unable to parse percent cpu: %s", values[1])
|
||||
t.Fatalf("Unable to parse percent cpu: %s", values[1])
|
||||
}
|
||||
|
||||
// parse private bytes (rss)
|
||||
fval, err := strconv.ParseFloat(strings.Trim(values[2], "\""), 64)
|
||||
if err != nil {
|
||||
t.Fatal("Unable to parse private bytes: %s", values[2])
|
||||
t.Fatalf("Unable to parse private bytes: %s", values[2])
|
||||
}
|
||||
tRss = int64(fval)
|
||||
|
||||
if err = ProcUsage(&pcpu, &rss, &vss); err != nil {
|
||||
t.Fatal("Error: %v", err)
|
||||
}
|
||||
checkValues(t, pcpu, tPcpu, rss, tRss)
|
||||
|
||||
// Again to test image name caching
|
||||
// Again to test caching
|
||||
if err = ProcUsage(&pcpu, &rss, &vss); err != nil {
|
||||
t.Fatal("Error: %v", err)
|
||||
t.Fatalf("Error: %v", err)
|
||||
}
|
||||
checkValues(t, pcpu, tPcpu, rss, tRss)
|
||||
|
||||
testParseValues(t)
|
||||
|
||||
var ppid int
|
||||
if err = getStatsForProcess("invalid", &pcpu, &rss, &vss, &ppid); err != nil {
|
||||
t.Fatal("Did not receive expected error.")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user