Fix local announce (IPv6 multicast, include all listen addresses)

This commit is contained in:
Jakob Borg 2014-03-18 17:51:55 +01:00
parent 58fd379e35
commit 8db1bf9732
5 changed files with 125 additions and 120 deletions

View File

@ -14,7 +14,6 @@ import (
"path" "path"
"runtime" "runtime"
"runtime/debug" "runtime/debug"
"strconv"
"strings" "strings"
"time" "time"
@ -263,7 +262,7 @@ func main() {
if verbose { if verbose {
infoln("Attempting to connect to other nodes") infoln("Attempting to connect to other nodes")
} }
disc := discovery(cfg.Options.ListenAddress[0]) disc := discovery()
go connect(myID, disc, m, tlsCfg, connOpts) go connect(myID, disc, m, tlsCfg, connOpts)
// Routine to pull blocks from other nodes to synchronize the local // Routine to pull blocks from other nodes to synchronize the local
@ -452,24 +451,20 @@ listen:
} }
} }
func discovery(addr string) *discover.Discoverer { func discovery() *discover.Discoverer {
_, portstr, err := net.SplitHostPort(addr)
fatalErr(err)
port, _ := strconv.Atoi(portstr)
if !cfg.Options.LocalAnnEnabled { if !cfg.Options.LocalAnnEnabled {
port = -1 return nil
} else if verbose {
infoln("Sending local discovery announcements")
} }
infoln("Sending local discovery announcements")
if !cfg.Options.GlobalAnnEnabled { if !cfg.Options.GlobalAnnEnabled {
cfg.Options.GlobalAnnServer = "" cfg.Options.GlobalAnnServer = ""
} else if verbose { } else if verbose {
infoln("Sending external discovery announcements") infoln("Sending external discovery announcements")
} }
disc, err := discover.NewDiscoverer(myID, port, cfg.Options.GlobalAnnServer) disc, err := discover.NewDiscoverer(myID, cfg.Options.ListenAddress, cfg.Options.GlobalAnnServer)
if err != nil { if err != nil {
warnf("No discovery possible (%v)", err) warnf("No discovery possible (%v)", err)

View File

@ -9,6 +9,7 @@ import (
"strings" "strings"
"sync" "sync"
"time" "time"
"code.google.com/p/go.net/ipv6"
"github.com/calmh/syncthing/buffers" "github.com/calmh/syncthing/buffers"
) )
@ -19,14 +20,16 @@ const (
type Discoverer struct { type Discoverer struct {
MyID string MyID string
ListenPort int ListenAddresses []string
BroadcastIntv time.Duration BroadcastIntv time.Duration
ExtBroadcastIntv time.Duration ExtBroadcastIntv time.Duration
conn *net.UDPConn conn *ipv6.PacketConn
intfs []*net.Interface
registry map[string][]string registry map[string][]string
registryLock sync.RWMutex registryLock sync.RWMutex
extServer string extServer string
group *net.UDPAddr
localBroadcastTick <-chan time.Time localBroadcastTick <-chan time.Time
forcedBroadcastTick chan time.Time forcedBroadcastTick chan time.Time
@ -41,113 +44,114 @@ var (
// When we hit this many errors in succession, we stop. // When we hit this many errors in succession, we stop.
const maxErrors = 30 const maxErrors = 30
func NewDiscoverer(id string, port int, extServer string) (*Discoverer, error) { func NewDiscoverer(id string, addresses []string, extServer string) (*Discoverer, error) {
local := &net.UDPAddr{IP: nil, Port: AnnouncementPort} disc := &Discoverer{
conn, err := net.ListenUDP("udp", local) MyID: id,
ListenAddresses: addresses,
BroadcastIntv: 30 * time.Second,
ExtBroadcastIntv: 1800 * time.Second,
registry: make(map[string][]string),
extServer: extServer,
group: &net.UDPAddr{IP: net.ParseIP("ff02::2012:1025"), Port: AnnouncementPort},
}
// Listen on a multicast socket. This enables sharing the socket, i.e.
// other instances of syncting on the same box can listen on the same
// group/port.
conn, err := net.ListenPacket("udp6", fmt.Sprintf("[ff02::]:%d", AnnouncementPort))
if err != nil { if err != nil {
return nil, err return nil, err
} }
disc.conn = ipv6.NewPacketConn(conn)
disc := &Discoverer{ // Join the multicast group on as many interfaces as possible. Remember
MyID: id, // which those were.
ListenPort: port,
BroadcastIntv: 30 * time.Second,
ExtBroadcastIntv: 1800 * time.Second,
conn: conn, intfs, err := net.Interfaces()
registry: make(map[string][]string), if err != nil {
extServer: extServer, log.Printf("discover/interfaces: %v; no local announcements", err)
conn.Close()
return nil, err
} }
for _, intf := range intfs {
intf := intf
addrs, err := intf.Addrs()
if err == nil && len(addrs) > 0 && intf.Flags&net.FlagMulticast != 0 && intf.Flags&net.FlagUp != 0 {
if err := disc.conn.JoinGroup(&intf, disc.group); err != nil {
if debug {
dlog.Printf("%v; not joining on %s", err, intf.Name)
}
} else {
disc.intfs = append(disc.intfs, &intf)
}
}
}
// Receive announcements sent to the local multicast group.
go disc.recvAnnouncements() go disc.recvAnnouncements()
if disc.ListenPort > 0 { // If we got a list of addresses that we listen on, announce those
// locally.
if len(disc.ListenAddresses) > 0 {
disc.localBroadcastTick = time.Tick(disc.BroadcastIntv) disc.localBroadcastTick = time.Tick(disc.BroadcastIntv)
disc.forcedBroadcastTick = make(chan time.Time) disc.forcedBroadcastTick = make(chan time.Time)
go disc.sendAnnouncements() go disc.sendLocalAnnouncements()
}
// If we have an external server address, also announce to that
// server.
if len(disc.extServer) > 0 { if len(disc.extServer) > 0 {
go disc.sendExtAnnouncements() go disc.sendExternalAnnouncements()
}
} }
return disc, nil return disc, nil
} }
func (d *Discoverer) sendAnnouncements() { func (d *Discoverer) announcementPkt() []byte {
var pkt = AnnounceV2{AnnouncementMagicV2, d.MyID, []Address{{nil, 22000}}} var addrs []Address
var buf = pkt.MarshalXDR() for _, astr := range d.ListenAddresses {
addr, err := net.ResolveTCPAddr("tcp", astr)
if err != nil {
log.Printf("discover/announcement: %v: not announcing %s", err, astr)
continue
} else if debug {
dlog.Printf("announcing %s: %#v", astr, addr)
}
if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
addrs = append(addrs, Address{Port: uint16(addr.Port)})
} else if bs := addr.IP.To4(); bs != nil {
addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
} else if bs := addr.IP.To16(); bs != nil {
addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
}
}
var pkt = AnnounceV2{
Magic: AnnouncementMagicV2,
NodeID: d.MyID,
Addresses: addrs,
}
return pkt.MarshalXDR()
}
func (d *Discoverer) sendLocalAnnouncements() {
var buf = d.announcementPkt()
var errCounter = 0 var errCounter = 0
var err error var err error
remote := &net.UDPAddr{ wcm := ipv6.ControlMessage{HopLimit: 1}
IP: net.IP{255, 255, 255, 255},
Port: AnnouncementPort,
}
for errCounter < maxErrors { for errCounter < maxErrors {
intfs, err := net.Interfaces() for _, intf := range d.intfs {
if err != nil { wcm.IfIndex = intf.Index
log.Printf("discover/listInterfaces: %v; no local announcements", err) if _, err = d.conn.WriteTo(buf, &wcm, d.group); err != nil {
return log.Printf("discover/sendLocalAnnouncements: on %s: %v; no local announcement", intf.Name, err)
}
for _, intf := range intfs {
if intf.Flags&(net.FlagBroadcast|net.FlagLoopback) == net.FlagBroadcast {
addrs, err := intf.Addrs()
if err != nil {
log.Println("discover/listAddrs: warning:", err)
errCounter++ errCounter++
continue continue
} } else {
var srcAddr string
for _, addr := range addrs {
if strings.Contains(addr.String(), ".") {
// Found an IPv4 adress
parts := strings.Split(addr.String(), "/")
srcAddr = parts[0]
break
}
}
if len(srcAddr) == 0 {
if debug {
dlog.Println("no source address found on interface", intf.Name)
}
continue
}
iaddr, err := net.ResolveUDPAddr("udp4", srcAddr+":0")
if err != nil {
log.Println("discover/resolve: warning:", err)
errCounter++
continue
}
conn, err := net.ListenUDP("udp4", iaddr)
if err != nil {
log.Println("discover/listen: warning:", err)
errCounter++
continue
}
if debug {
dlog.Println("send announcement from", conn.LocalAddr(), "to", remote, "on", intf.Name)
}
_, err = conn.WriteTo(buf, remote)
if err != nil {
// Some interfaces don't seem to support broadcast even though the flags claims they do, i.e. vmnet
conn.Close()
if debug {
log.Println(err)
}
errCounter++
continue
}
conn.Close()
errCounter = 0 errCounter = 0
} }
} }
@ -157,25 +161,29 @@ func (d *Discoverer) sendAnnouncements() {
case <-d.forcedBroadcastTick: case <-d.forcedBroadcastTick:
} }
} }
log.Println("discover/write: local: stopping due to too many errors:", err)
} }
func (d *Discoverer) sendExtAnnouncements() { func (d *Discoverer) sendExternalAnnouncements() {
remote, err := net.ResolveUDPAddr("udp", d.extServer) remote, err := net.ResolveUDPAddr("udp", d.extServer)
if err != nil { if err != nil {
log.Printf("discover/external: %v; no external announcements", err) log.Printf("discover/external: %v; no external announcements", err)
return return
} }
var pkt = AnnounceV2{AnnouncementMagicV2, d.MyID, []Address{{nil, 22000}}} conn, err := net.ListenUDP("udp", nil)
var buf = pkt.MarshalXDR() if err != nil {
log.Printf("discover/external: %v; no external announcements", err)
return
}
var buf = d.announcementPkt()
var errCounter = 0 var errCounter = 0
for errCounter < maxErrors { for errCounter < maxErrors {
if debug { if debug {
dlog.Println("send announcement -> ", remote) dlog.Println("send announcement -> ", remote)
} }
_, err = d.conn.WriteTo(buf, remote) _, err = conn.WriteTo(buf, remote)
if err != nil { if err != nil {
log.Println("discover/write: warning:", err) log.Println("discover/write: warning:", err)
errCounter++ errCounter++
@ -192,7 +200,7 @@ func (d *Discoverer) recvAnnouncements() {
var errCounter = 0 var errCounter = 0
var err error var err error
for errCounter < maxErrors { for errCounter < maxErrors {
n, addr, err := d.conn.ReadFromUDP(buf) n, _, addr, err := d.conn.ReadFrom(buf)
if err != nil { if err != nil {
errCounter++ errCounter++
time.Sleep(time.Second) time.Sleep(time.Second)
@ -224,7 +232,9 @@ func (d *Discoverer) recvAnnouncements() {
if len(a.IP) > 0 { if len(a.IP) > 0 {
nodeAddr = fmt.Sprintf("%s:%d", ipStr(a.IP), a.Port) nodeAddr = fmt.Sprintf("%s:%d", ipStr(a.IP), a.Port)
} else { } else {
nodeAddr = fmt.Sprintf("%s:%d", addr.IP.String(), a.Port) ua := addr.(*net.UDPAddr)
ua.Port = int(a.Port)
nodeAddr = ua.String()
} }
addrs = append(addrs, nodeAddr) addrs = append(addrs, nodeAddr)
} }

View File

@ -1,13 +1,13 @@
<configuration version="1"> <configuration version="1">
<repository directory="s1"> <repository directory="s1">
<node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="s1"> <node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="s1">
<address>127.0.0.1:22001</address> <address>dynamic</address>
</node> </node>
<node id="JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ" name="s2"> <node id="JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ" name="s2">
<address>127.0.0.1:22002</address> <address>dynamic</address>
</node> </node>
<node id="373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA" name="s3"> <node id="373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA" name="s3">
<address>127.0.0.1:22003</address> <address>dynamic</address>
</node> </node>
</repository> </repository>
<options> <options>
@ -19,7 +19,7 @@
<guiAddress>127.0.0.1:8081</guiAddress> <guiAddress>127.0.0.1:8081</guiAddress>
<globalAnnounceServer>announce.syncthing.net:22025</globalAnnounceServer> <globalAnnounceServer>announce.syncthing.net:22025</globalAnnounceServer>
<globalAnnounceEnabled>false</globalAnnounceEnabled> <globalAnnounceEnabled>false</globalAnnounceEnabled>
<localAnnounceEnabled>false</localAnnounceEnabled> <localAnnounceEnabled>true</localAnnounceEnabled>
<parallelRequests>16</parallelRequests> <parallelRequests>16</parallelRequests>
<maxSendKbps>0</maxSendKbps> <maxSendKbps>0</maxSendKbps>
<rescanIntervalS>60</rescanIntervalS> <rescanIntervalS>60</rescanIntervalS>

View File

@ -1,17 +1,17 @@
<configuration version="1"> <configuration version="1">
<repository directory="s2"> <repository directory="s2">
<node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="s1"> <node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="s1">
<address>127.0.0.1:22001</address> <address>dynamic</address>
</node> </node>
<node id="JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ" name="s2"> <node id="JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ" name="s2">
<address>127.0.0.1:22002</address> <address>dynamic</address>
</node> </node>
<node id="373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA" name="s3"> <node id="373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA" name="s3">
<address>127.0.0.1:22003</address> <address>dynamic</address>
</node> </node>
</repository> </repository>
<options> <options>
<listenAddress>127.0.0.2:22002</listenAddress> <listenAddress>127.0.0.1:22002</listenAddress>
<readOnly>false</readOnly> <readOnly>false</readOnly>
<allowDelete>true</allowDelete> <allowDelete>true</allowDelete>
<followSymlinks>true</followSymlinks> <followSymlinks>true</followSymlinks>
@ -19,7 +19,7 @@
<guiAddress>127.0.0.1:8082</guiAddress> <guiAddress>127.0.0.1:8082</guiAddress>
<globalAnnounceServer>announce.syncthing.net:22025</globalAnnounceServer> <globalAnnounceServer>announce.syncthing.net:22025</globalAnnounceServer>
<globalAnnounceEnabled>false</globalAnnounceEnabled> <globalAnnounceEnabled>false</globalAnnounceEnabled>
<localAnnounceEnabled>false</localAnnounceEnabled> <localAnnounceEnabled>true</localAnnounceEnabled>
<parallelRequests>16</parallelRequests> <parallelRequests>16</parallelRequests>
<maxSendKbps>0</maxSendKbps> <maxSendKbps>0</maxSendKbps>
<rescanIntervalS>60</rescanIntervalS> <rescanIntervalS>60</rescanIntervalS>

View File

@ -1,13 +1,13 @@
<configuration version="1"> <configuration version="1">
<repository directory="s3"> <repository directory="s3">
<node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="s1"> <node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="s1">
<address>127.0.0.1:22001</address> <address>dynamic</address>
</node> </node>
<node id="JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ" name="s2"> <node id="JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ" name="s2">
<address>127.0.0.1:22002</address> <address>dynamic</address>
</node> </node>
<node id="373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA" name="s3"> <node id="373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA" name="s3">
<address>127.0.0.1:22003</address> <address>dynamic</address>
</node> </node>
</repository> </repository>
<options> <options>
@ -19,7 +19,7 @@
<guiAddress>127.0.0.1:8083</guiAddress> <guiAddress>127.0.0.1:8083</guiAddress>
<globalAnnounceServer>announce.syncthing.net:22025</globalAnnounceServer> <globalAnnounceServer>announce.syncthing.net:22025</globalAnnounceServer>
<globalAnnounceEnabled>false</globalAnnounceEnabled> <globalAnnounceEnabled>false</globalAnnounceEnabled>
<localAnnounceEnabled>false</localAnnounceEnabled> <localAnnounceEnabled>true</localAnnounceEnabled>
<parallelRequests>16</parallelRequests> <parallelRequests>16</parallelRequests>
<maxSendKbps>0</maxSendKbps> <maxSendKbps>0</maxSendKbps>
<rescanIntervalS>60</rescanIntervalS> <rescanIntervalS>60</rescanIntervalS>