diff --git a/fastping.go b/fastping.go index 349ec99..5769939 100644 --- a/fastping.go +++ b/fastping.go @@ -486,60 +486,86 @@ mainloop: } func (p *Pinger) sendICMP(conn, conn6 *icmp.PacketConn) (map[string]*net.IPAddr, error) { + type sendResult struct { + addr *net.IPAddr + err error + } + p.debugln("sendICMP(): Start") + p.mu.Lock() p.id = rand.Intn(0xffff) p.seq = rand.Intn(0xffff) p.mu.Unlock() + queue := make(map[string]*net.IPAddr) + + addrs := make(chan *net.IPAddr) + results := make(chan sendResult, 1) + errors := make(chan []error) + + collectResult := func (results <-chan sendResult, errors chan<- []error) { + var errs []error + for r := range results { + if r.err != nil { + errs = append(errs, r.err) + } else { + queue[r.addr.String()] = r.addr + } + } + errors <- errs + } + go collectResult(results, errors) + wg := new(sync.WaitGroup) - for key, addr := range p.addrs { - var typ icmp.Type - var cn *icmp.PacketConn - if isIPv4(addr.IP) { - typ = ipv4.ICMPTypeEcho - cn = conn - } else if isIPv6(addr.IP) { - typ = ipv6.ICMPTypeEchoRequest - cn = conn6 - } else { - continue - } - if cn == nil { - continue - } + sendPacket := func (addrs <-chan *net.IPAddr, results chan<- sendResult) { + defer wg.Done() + p.debugln("sendICMP(): Invoke sender goroutine") + for addr := range addrs { + var typ icmp.Type + var cn *icmp.PacketConn + if isIPv4(addr.IP) { + typ = ipv4.ICMPTypeEcho + cn = conn + } else if isIPv6(addr.IP) { + typ = ipv6.ICMPTypeEchoRequest + cn = conn6 + } else { + continue + } + if cn == nil { + continue + } - t := timeToBytes(time.Now()) + t := timeToBytes(time.Now()) - if p.Size-TimeSliceLength != 0 { - t = append(t, byteSliceOfSize(p.Size-TimeSliceLength)...) - } + if p.Size-TimeSliceLength != 0 { + t = append(t, byteSliceOfSize(p.Size-TimeSliceLength)...) + } - p.mu.Lock() - bytes, err := (&icmp.Message{ - Type: typ, Code: 0, - Body: &icmp.Echo{ - ID: p.id, Seq: p.seq, - Data: t, - }, - }).Marshal(nil) - p.mu.Unlock() - if err != nil { - wg.Wait() - return queue, err - } + p.mu.Lock() + bytes, err := (&icmp.Message{ + Type: typ, Code: 0, + Body: &icmp.Echo{ + ID: p.id, Seq: p.seq, + Data: t, + }, + }).Marshal(nil) + p.mu.Unlock() + if err != nil { + p.debugln("sendICMP(): End sender goroutine with error") + results <- sendResult{addr: nil, err: err} + return + } - queue[key] = addr - var dst net.Addr = addr - if p.network == "udp" { - dst = &net.UDPAddr{IP: addr.IP, Zone: addr.Zone} - } + var dst net.Addr = addr + if p.network == "udp" { + dst = &net.UDPAddr{IP: addr.IP, Zone: addr.Zone} + } - p.debugln("sendICMP(): Invoke goroutine") - wg.Add(1) - go func(conn *icmp.PacketConn, ra net.Addr, b []byte) { + p.debugln("sendICMP(): WriteTo Start") for { - if _, err := conn.WriteTo(bytes, ra); err != nil { + if _, err := cn.WriteTo(bytes, dst); err != nil { if neterr, ok := err.(*net.OpError); ok { if neterr.Err == syscall.ENOBUFS { continue @@ -549,11 +575,30 @@ func (p *Pinger) sendICMP(conn, conn6 *icmp.PacketConn) (map[string]*net.IPAddr, break } p.debugln("sendICMP(): WriteTo End") - wg.Done() - }(cn, dst, bytes) + results <- sendResult{addr: addr, err: nil} + } + p.debugln("sendICMP(): End sender goroutine") } + + routines := numGoRoutines() + wg.Add(routines) + for i := 0; i < routines; i++ { + go sendPacket(addrs, results) + } + + for _, addr := range p.addrs { + addrs <- addr + } + + close(addrs) wg.Wait() + close(results) + errs := <-errors + p.debugln("sendICMP(): End") + if len(errs) > 0 { + return queue, errs[0] + } return queue, nil }