1
0
mirror of https://github.com/taigrr/go-fastping synced 2025-01-18 05:03:15 -08:00

Add mutex

This commit is contained in:
Tatsushi Demachi 2014-08-03 20:34:54 +09:00
parent f930d86dae
commit 2fb7a54c20

View File

@ -98,6 +98,7 @@ type Pinger struct {
// key string is IPAddr.String() // key string is IPAddr.String()
addrs map[string]*net.IPAddr addrs map[string]*net.IPAddr
ctx *context ctx *context
mu sync.Mutex
// Number of (nano,milli)seconds of an idle timeout. Once it passed, // Number of (nano,milli)seconds of an idle timeout. Once it passed,
// the library calls an idle callback function. It is also used for an // the library calls an idle callback function. It is also used for an
// interval time of RunLoop() method // interval time of RunLoop() method
@ -127,13 +128,17 @@ func (p *Pinger) AddIP(ipaddr string) error {
if addr == nil { if addr == nil {
return errors.New(fmt.Sprintf("%s is not a valid textual representation of an IP address", ipaddr)) return errors.New(fmt.Sprintf("%s is not a valid textual representation of an IP address", ipaddr))
} }
p.mu.Lock()
p.addrs[addr.String()] = &net.IPAddr{IP: addr} p.addrs[addr.String()] = &net.IPAddr{IP: addr}
p.mu.Unlock()
return nil return nil
} }
// Add an IP address to Pinger. ip arg should be a net.IPAddr pointer. // Add an IP address to Pinger. ip arg should be a net.IPAddr pointer.
func (p *Pinger) AddIPAddr(ip *net.IPAddr) { func (p *Pinger) AddIPAddr(ip *net.IPAddr) {
p.mu.Lock()
p.addrs[ip.String()] = ip p.addrs[ip.String()] = ip
p.mu.Unlock()
} }
// Add event handler to Pinger. event arg should be "receive" or "idle" string. // Add event handler to Pinger. event arg should be "receive" or "idle" string.
@ -155,14 +160,18 @@ func (p *Pinger) AddHandler(event string, handler interface{}) error {
switch event { switch event {
case "receive": case "receive":
if hdl, ok := handler.(func(*net.IPAddr, time.Duration)); ok { if hdl, ok := handler.(func(*net.IPAddr, time.Duration)); ok {
p.mu.Lock()
p.handlers[event] = hdl p.handlers[event] = hdl
p.mu.Unlock()
return nil return nil
} else { } else {
return errors.New(fmt.Sprintf("Receive event handler should be `func(*net.IPAddr, time.Duration)`")) return errors.New(fmt.Sprintf("Receive event handler should be `func(*net.IPAddr, time.Duration)`"))
} }
case "idle": case "idle":
if hdl, ok := handler.(func()); ok { if hdl, ok := handler.(func()); ok {
p.mu.Lock()
p.handlers[event] = hdl p.handlers[event] = hdl
p.mu.Unlock()
return nil return nil
} else { } else {
return errors.New(fmt.Sprintf("Idle event handler should be `func()`")) return errors.New(fmt.Sprintf("Idle event handler should be `func()`"))
@ -178,8 +187,12 @@ func (p *Pinger) AddHandler(event string, handler interface{}) error {
// an error value. It means it blocks until MaxRTT seconds passed. For the // an error value. It means it blocks until MaxRTT seconds passed. For the
// purpose of sending/receiving packets over and over, use RunLoop(). // purpose of sending/receiving packets over and over, use RunLoop().
func (p *Pinger) Run() error { func (p *Pinger) Run() error {
p.mu.Lock()
p.ctx = newContext() p.ctx = newContext()
p.mu.Unlock()
p.run(true) p.run(true)
p.mu.Lock()
defer p.mu.Unlock()
return p.ctx.err return p.ctx.err
} }
@ -207,7 +220,9 @@ func (p *Pinger) Run() error {
// //
// For more details, please see "cmd/ping/ping.go". // For more details, please see "cmd/ping/ping.go".
func (p *Pinger) RunLoop() { func (p *Pinger) RunLoop() {
p.mu.Lock()
p.ctx = newContext() p.ctx = newContext()
p.mu.Unlock()
go p.run(false) go p.run(false)
} }
@ -228,6 +243,8 @@ func (p *Pinger) Stop() {
// Return an error that is set by RunLoop(). It must be called after RunLoop(). // Return an error that is set by RunLoop(). It must be called after RunLoop().
// If not, it causes panic. // If not, it causes panic.
func (p *Pinger) Err() error { func (p *Pinger) Err() error {
p.mu.Lock()
defer p.mu.Unlock()
return p.ctx.err return p.ctx.err
} }
@ -235,7 +252,9 @@ func (p *Pinger) run(once bool) {
p.debugln("Run(): Start") p.debugln("Run(): Start")
conn, err := net.ListenIP("ip4:icmp", &net.IPAddr{IP: net.IPv4zero}) conn, err := net.ListenIP("ip4:icmp", &net.IPAddr{IP: net.IPv4zero})
if err != nil { if err != nil {
p.mu.Lock()
p.ctx.err = err p.ctx.err = err
p.mu.Unlock()
p.debugln("Run(): close(p.ctx.done)") p.debugln("Run(): close(p.ctx.done)")
close(p.ctx.done) close(p.ctx.done)
return return
@ -261,10 +280,15 @@ mainloop:
break mainloop break mainloop
case <-recvCtx.done: case <-recvCtx.done:
p.debugln("Run(): <-recvCtx.done") p.debugln("Run(): <-recvCtx.done")
p.mu.Lock()
err = recvCtx.err err = recvCtx.err
p.mu.Unlock()
break mainloop break mainloop
case <-ticker.C: case <-ticker.C:
if handler, ok := p.handlers["idle"]; ok && handler != nil { p.mu.Lock()
handler, ok := p.handlers["idle"]
p.mu.Unlock()
if ok && handler != nil {
if hdl, ok := handler.(func()); ok { if hdl, ok := handler.(func()); ok {
hdl() hdl()
} }
@ -286,7 +310,11 @@ mainloop:
close(recvCtx.stop) close(recvCtx.stop)
p.debugln("Run(): <-recvCtx.done") p.debugln("Run(): <-recvCtx.done")
<-recvCtx.done <-recvCtx.done
p.mu.Lock()
p.ctx.err = err p.ctx.err = err
p.mu.Unlock()
p.debugln("Run(): close(p.ctx.done)") p.debugln("Run(): close(p.ctx.done)")
close(p.ctx.done) close(p.ctx.done)
p.debugln("Run(): End") p.debugln("Run(): End")
@ -294,11 +322,14 @@ mainloop:
func (p *Pinger) sendICMP4(conn *net.IPConn) (map[string]*net.IPAddr, error) { func (p *Pinger) sendICMP4(conn *net.IPConn) (map[string]*net.IPAddr, error) {
p.debugln("sendICMP4(): Start") p.debugln("sendICMP4(): Start")
p.mu.Lock()
p.id = rand.Intn(0xffff) p.id = rand.Intn(0xffff)
p.seq = rand.Intn(0xffff) p.seq = rand.Intn(0xffff)
p.mu.Unlock()
queue := make(map[string]*net.IPAddr) queue := make(map[string]*net.IPAddr)
var wg sync.WaitGroup var wg sync.WaitGroup
for k, v := range p.addrs { for k, v := range p.addrs {
p.mu.Lock()
bytes, err := (&icmpMessage{ bytes, err := (&icmpMessage{
Type: icmpv4EchoRequest, Code: 0, Type: icmpv4EchoRequest, Code: 0,
Body: &icmpEcho{ Body: &icmpEcho{
@ -306,6 +337,7 @@ func (p *Pinger) sendICMP4(conn *net.IPConn) (map[string]*net.IPAddr, error) {
Data: timeToBytes(time.Now()), Data: timeToBytes(time.Now()),
}, },
}).Marshal() }).Marshal()
p.mu.Unlock()
if err != nil { if err != nil {
wg.Wait() wg.Wait()
return queue, err return queue, err
@ -359,7 +391,9 @@ func (p *Pinger) recvICMP4(conn *net.IPConn, recv chan<- *packet, ctx *context)
continue continue
} else { } else {
p.debugln("recvICMP4(): OpError happen", err) p.debugln("recvICMP4(): OpError happen", err)
p.mu.Lock()
ctx.err = err ctx.err = err
p.mu.Unlock()
close(ctx.done) close(ctx.done)
return return
} }
@ -372,9 +406,12 @@ func (p *Pinger) recvICMP4(conn *net.IPConn, recv chan<- *packet, ctx *context)
func (p *Pinger) procRecv(recv *packet, queue map[string]*net.IPAddr) { func (p *Pinger) procRecv(recv *packet, queue map[string]*net.IPAddr) {
addr := recv.addr.String() addr := recv.addr.String()
p.mu.Lock()
if _, ok := p.addrs[addr]; !ok { if _, ok := p.addrs[addr]; !ok {
p.mu.Unlock()
return return
} }
p.mu.Unlock()
bytes := ipv4Payload(recv.bytes) bytes := ipv4Payload(recv.bytes)
var m *icmpMessage var m *icmpMessage
@ -390,16 +427,21 @@ func (p *Pinger) procRecv(recv *packet, queue map[string]*net.IPAddr) {
var rtt time.Duration var rtt time.Duration
switch pkt := m.Body.(type) { switch pkt := m.Body.(type) {
case *icmpEcho: case *icmpEcho:
p.mu.Lock()
if pkt.ID == p.id && pkt.Seq == p.seq { if pkt.ID == p.id && pkt.Seq == p.seq {
rtt = time.Since(bytesToTime(pkt.Data)) rtt = time.Since(bytesToTime(pkt.Data))
} }
p.mu.Unlock()
default: default:
return return
} }
if _, ok := queue[addr]; ok { if _, ok := queue[addr]; ok {
delete(queue, addr) delete(queue, addr)
if handler, ok := p.handlers["receive"]; ok { p.mu.Lock()
handler, ok := p.handlers["receive"]
p.mu.Unlock()
if ok && handler != nil {
if hdl, ok := handler.(func(*net.IPAddr, time.Duration)); ok { if hdl, ok := handler.(func(*net.IPAddr, time.Duration)); ok {
hdl(recv.addr, rtt) hdl(recv.addr, rtt)
} }
@ -408,12 +450,16 @@ func (p *Pinger) procRecv(recv *packet, queue map[string]*net.IPAddr) {
} }
func (p *Pinger) debugln(args ...interface{}) { func (p *Pinger) debugln(args ...interface{}) {
p.mu.Lock()
defer p.mu.Unlock()
if p.Debug { if p.Debug {
log.Println(args...) log.Println(args...)
} }
} }
func (p *Pinger) debugf(format string, args ...interface{}) { func (p *Pinger) debugf(format string, args ...interface{}) {
p.mu.Lock()
defer p.mu.Unlock()
if p.Debug { if p.Debug {
log.Printf(format, args...) log.Printf(format, args...)
} }