1
0
mirror of https://github.com/taigrr/go-fastping synced 2025-01-18 05:03:15 -08:00

Multi-thread receive of packets

This commit is contained in:
Anton Skorokhod 2015-09-21 13:59:13 +02:00 committed by Tatsushi Demachi
parent 04c22ac9c6
commit 42755b085b

View File

@ -107,7 +107,7 @@ func ipv4Payload(b []byte) []byte {
}
func numGoRoutines() int {
return runtime.GOMAXPROCS(0) * 4
return runtime.NumCPU() * 4
}
type packet struct {
@ -420,18 +420,25 @@ func (p *Pinger) run(once bool) {
defer conn6.Close()
}
recv := make(chan *packet, 1)
recv := make(chan *packet, len(p.addrs))
recvCtx := newContext()
wg := new(sync.WaitGroup)
p.debugln("Run(): call recvICMP()")
if conn != nil {
wg.Add(1)
go p.recvICMP(conn, recv, recvCtx, wg)
routines := runtime.NumCPU()
wg.Add(routines)
for i := 0; i < routines; i++ {
go p.recvICMP(conn, recv, recvCtx, wg)
}
}
if conn6 != nil {
wg.Add(1)
go p.recvICMP(conn6, recv, recvCtx, wg)
routines := runtime.NumCPU()
wg.Add(routines)
for i := 0; i < routines; i++ {
go p.recvICMP(conn6, recv, recvCtx, wg)
}
}
p.debugln("Run(): call sendICMP()")
@ -476,6 +483,13 @@ mainloop:
p.debugln("Run(): wait recvICMP()")
wg.Wait()
// process all queued results
close(recv)
for r := range recv {
p.debugln("Run(): <-recv")
p.procRecv(r, queue)
}
p.mu.Lock()
p.ctx.err = err
p.mu.Unlock()
@ -488,7 +502,7 @@ mainloop:
func (p *Pinger) sendICMP(conn, conn6 *icmp.PacketConn) (map[string]*net.IPAddr, error) {
type sendResult struct {
addr *net.IPAddr
err error
err error
}
p.debugln("sendICMP(): Start")
@ -504,7 +518,7 @@ func (p *Pinger) sendICMP(conn, conn6 *icmp.PacketConn) (map[string]*net.IPAddr,
results := make(chan sendResult, 1)
errors := make(chan []error)
collectResult := func (results <-chan sendResult, errors chan<- []error) {
collectResult := func(results <-chan sendResult, errors chan<- []error) {
var errs []error
for r := range results {
if r.err != nil {
@ -518,7 +532,7 @@ func (p *Pinger) sendICMP(conn, conn6 *icmp.PacketConn) (map[string]*net.IPAddr,
go collectResult(results, errors)
wg := new(sync.WaitGroup)
sendPacket := func (addrs <-chan *net.IPAddr, results chan<- sendResult) {
sendPacket := func(addrs <-chan *net.IPAddr, results chan<- sendResult) {
defer wg.Done()
p.debugln("sendICMP(): Invoke sender goroutine")
for addr := range addrs {