From 42755b085bcd06c6a968d1bacea3b647d970f275 Mon Sep 17 00:00:00 2001 From: Anton Skorokhod Date: Mon, 21 Sep 2015 13:59:13 +0200 Subject: [PATCH] Multi-thread receive of packets --- fastping.go | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/fastping.go b/fastping.go index 5769939..a860f01 100644 --- a/fastping.go +++ b/fastping.go @@ -107,7 +107,7 @@ func ipv4Payload(b []byte) []byte { } func numGoRoutines() int { - return runtime.GOMAXPROCS(0) * 4 + return runtime.NumCPU() * 4 } type packet struct { @@ -420,18 +420,25 @@ func (p *Pinger) run(once bool) { defer conn6.Close() } - recv := make(chan *packet, 1) + recv := make(chan *packet, len(p.addrs)) recvCtx := newContext() wg := new(sync.WaitGroup) p.debugln("Run(): call recvICMP()") if conn != nil { - wg.Add(1) - go p.recvICMP(conn, recv, recvCtx, wg) + routines := runtime.NumCPU() + wg.Add(routines) + for i := 0; i < routines; i++ { + go p.recvICMP(conn, recv, recvCtx, wg) + } } + if conn6 != nil { - wg.Add(1) - go p.recvICMP(conn6, recv, recvCtx, wg) + routines := runtime.NumCPU() + wg.Add(routines) + for i := 0; i < routines; i++ { + go p.recvICMP(conn6, recv, recvCtx, wg) + } } p.debugln("Run(): call sendICMP()") @@ -476,6 +483,13 @@ mainloop: p.debugln("Run(): wait recvICMP()") wg.Wait() + // process all queued results + close(recv) + for r := range recv { + p.debugln("Run(): <-recv") + p.procRecv(r, queue) + } + p.mu.Lock() p.ctx.err = err p.mu.Unlock() @@ -488,7 +502,7 @@ mainloop: func (p *Pinger) sendICMP(conn, conn6 *icmp.PacketConn) (map[string]*net.IPAddr, error) { type sendResult struct { addr *net.IPAddr - err error + err error } p.debugln("sendICMP(): Start") @@ -504,7 +518,7 @@ func (p *Pinger) sendICMP(conn, conn6 *icmp.PacketConn) (map[string]*net.IPAddr, results := make(chan sendResult, 1) errors := make(chan []error) - collectResult := func (results <-chan sendResult, errors chan<- []error) { + collectResult := func(results <-chan sendResult, errors chan<- []error) { var errs []error for r := range results { if r.err != nil { @@ -518,7 +532,7 @@ func (p *Pinger) sendICMP(conn, conn6 *icmp.PacketConn) (map[string]*net.IPAddr, go collectResult(results, errors) wg := new(sync.WaitGroup) - sendPacket := func (addrs <-chan *net.IPAddr, results chan<- sendResult) { + sendPacket := func(addrs <-chan *net.IPAddr, results chan<- sendResult) { defer wg.Done() p.debugln("sendICMP(): Invoke sender goroutine") for addr := range addrs {