diff --git a/internal/beacon/broadcast.go b/internal/beacon/broadcast.go index d30561e5d..5ab2b7f58 100644 --- a/internal/beacon/broadcast.go +++ b/internal/beacon/broadcast.go @@ -6,31 +6,51 @@ package beacon -import "net" +import ( + "fmt" + "net" + "time" + + "github.com/thejerf/suture" +) type Broadcast struct { - conn *net.UDPConn + *suture.Supervisor port int inbox chan []byte outbox chan recv } -func NewBroadcast(port int) (*Broadcast, error) { - conn, err := net.ListenUDP("udp4", &net.UDPAddr{Port: port}) - if err != nil { - return nil, err - } +func NewBroadcast(port int) *Broadcast { b := &Broadcast{ - conn: conn, + Supervisor: suture.New("broadcastBeacon", suture.Spec{ + // Don't retry too frenetically: an error to open a socket or + // whatever is usually something that is either permanent or takes + // a while to get solved... + FailureThreshold: 2, + FailureBackoff: 60 * time.Second, + // Only log restarts in debug mode. + Log: func(line string) { + if debug { + l.Debugln(line) + } + }, + }), port: port, inbox: make(chan []byte), outbox: make(chan recv, 16), } - go genericReader(b.conn, b.outbox) - go b.writer() + b.Add(&broadcastReader{ + port: port, + outbox: b.outbox, + }) + b.Add(&broadcastWriter{ + port: port, + inbox: b.inbox, + }) - return b, nil + return b } func (b *Broadcast) Send(data []byte) { @@ -42,13 +62,37 @@ func (b *Broadcast) Recv() ([]byte, net.Addr) { return recv.data, recv.src } -func (b *Broadcast) writer() { - for bs := range b.inbox { +type broadcastWriter struct { + port int + inbox chan []byte + conn *net.UDPConn + failed bool // Have we already logged a failure reason? +} +func (w *broadcastWriter) Serve() { + if debug { + l.Debugln(w, "starting") + defer l.Debugln(w, "stopping") + } + + var err error + w.conn, err = net.ListenUDP("udp4", nil) + if err != nil { + if !w.failed { + l.Warnln("Local discovery over IPv4 unavailable:", err) + w.failed = true + } + return + } + defer w.conn.Close() + + w.failed = false + + for bs := range w.inbox { addrs, err := net.InterfaceAddrs() if err != nil { if debug { - l.Debugln("Broadcast: interface addresses:", err) + l.Debugln("Local discovery (broadcast writer):", err) } continue } @@ -71,13 +115,27 @@ func (b *Broadcast) writer() { } for _, ip := range dsts { - dst := &net.UDPAddr{IP: ip, Port: b.port} + dst := &net.UDPAddr{IP: ip, Port: w.port} - _, err := b.conn.WriteTo(bs, dst) - if err != nil { + w.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + _, err := w.conn.WriteTo(bs, dst) + if err, ok := err.(net.Error); ok && err.Timeout() { + // Write timeouts should not happen. We treat it as a fatal + // error on the socket. + l.Infoln("Local discovery (broadcast writer):", err) + w.failed = true + return + } else if err, ok := err.(net.Error); ok && err.Temporary() { + // A transient error. Lets hope for better luck in the future. if debug { l.Debugln(err) } + continue + } else if err != nil { + // Some other error that we don't expect. Bail and retry. + l.Infoln("Local discovery (broadcast writer):", err) + w.failed = true + return } else if debug { l.Debugf("sent %d bytes to %s", len(bs), dst) } @@ -85,6 +143,76 @@ func (b *Broadcast) writer() { } } +func (w *broadcastWriter) Stop() { + w.conn.Close() +} + +func (w *broadcastWriter) String() string { + return fmt.Sprintf("broadcastWriter@%p", w) +} + +type broadcastReader struct { + port int + outbox chan recv + conn *net.UDPConn + failed bool +} + +func (r *broadcastReader) Serve() { + if debug { + l.Debugln(r, "starting") + defer l.Debugln(r, "stopping") + } + + var err error + r.conn, err = net.ListenUDP("udp4", &net.UDPAddr{Port: r.port}) + if err != nil { + if !r.failed { + l.Warnln("Local discovery over IPv4 unavailable:", err) + r.failed = true + } + return + } + defer r.conn.Close() + + bs := make([]byte, 65536) + for { + n, addr, err := r.conn.ReadFrom(bs) + if err != nil { + if !r.failed { + l.Infoln("Local discovery (broadcast reader):", err) + r.failed = true + } + return + } + + r.failed = false + + if debug { + l.Debugf("recv %d bytes from %s", n, addr) + } + + c := make([]byte, n) + copy(c, bs) + select { + case r.outbox <- recv{c, addr}: + default: + if debug { + l.Debugln("dropping message") + } + } + } + +} + +func (r *broadcastReader) Stop() { + r.conn.Close() +} + +func (r *broadcastReader) String() string { + return fmt.Sprintf("broadcastReader@%p", r) +} + func bcast(ip *net.IPNet) *net.IPNet { var bc = &net.IPNet{} bc.IP = make([]byte, len(ip.IP)) diff --git a/internal/discover/discover.go b/internal/discover/discover.go index dac472e1e..30d265da7 100644 --- a/internal/discover/discover.go +++ b/internal/discover/discover.go @@ -86,17 +86,10 @@ func (d *Discoverer) StartLocal(localPort int, localMCAddr string) { } func (d *Discoverer) startLocalIPv4Broadcasts(localPort int) { - bb, err := beacon.NewBroadcast(localPort) - if err != nil { - if debug { - l.Debugln("discover: Start local v4:", err) - } - l.Infoln("Local discovery over IPv4 unavailable") - return - } - + bb := beacon.NewBroadcast(localPort) d.beacons = append(d.beacons, bb) go d.recvAnnouncements(bb) + bb.ServeBackground() } func (d *Discoverer) startLocalIPv6Multicasts(localMCAddr string) {