From 7dc290c3edebdc6c3b1f0d9c5a1153290fa85e50 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Mon, 11 May 2020 14:02:22 +0100 Subject: [PATCH] lib/connections: React to listeners going up and down faster (#6590) --- lib/connections/quic_listen.go | 2 ++ lib/connections/relay_listen.go | 4 +++- lib/connections/service.go | 8 ++++---- lib/connections/structs.go | 26 +++++++++++++++++++++++--- lib/connections/tcp_listen.go | 2 ++ lib/discover/global.go | 32 +++++++++++++++++++++++--------- 6 files changed, 57 insertions(+), 17 deletions(-) diff --git a/lib/connections/quic_listen.go b/lib/connections/quic_listen.go index ac1b4437e..1ae970134 100644 --- a/lib/connections/quic_listen.go +++ b/lib/connections/quic_listen.go @@ -102,7 +102,9 @@ func (t *quicListener) serve(ctx context.Context) error { l.Infoln("Listen (BEP/quic):", err) return err } + t.notifyAddressesChanged(t) defer listener.Close() + defer t.clearAddresses(t) l.Infof("QUIC listener (%v) starting", packetConn.LocalAddr()) defer l.Infof("QUIC listener (%v) shutting down", packetConn.LocalAddr()) diff --git a/lib/connections/relay_listen.go b/lib/connections/relay_listen.go index c66b344dd..2ffb61cbe 100644 --- a/lib/connections/relay_listen.go +++ b/lib/connections/relay_listen.go @@ -57,10 +57,12 @@ func (t *relayListener) serve(ctx context.Context) error { defer clnt.Stop() t.mut.Unlock() - oldURI := clnt.URI() + // Start with nil, so that we send a addresses changed notification as soon as we connect somewhere. + var oldURI *url.URL l.Infof("Relay listener (%v) starting", t) defer l.Infof("Relay listener (%v) shutting down", t) + defer t.clearAddresses(t) for { select { diff --git a/lib/connections/service.go b/lib/connections/service.go index e24cdbdb3..980c468d8 100644 --- a/lib/connections/service.go +++ b/lib/connections/service.go @@ -565,11 +565,11 @@ func (s *service) createListener(factory listenerFactory, uri *url.URL) bool { return true } -func (s *service) logListenAddressesChangedEvent(l genericListener) { +func (s *service) logListenAddressesChangedEvent(l ListenerAddresses) { s.evLogger.Log(events.ListenAddressesChanged, map[string]interface{}{ - "address": l.URI(), - "lan": l.LANAddresses(), - "wan": l.WANAddresses(), + "address": l.URI, + "lan": l.LANAddresses, + "wan": l.WANAddresses, }) } diff --git a/lib/connections/structs.go b/lib/connections/structs.go index d85adde44..4bfe7e05e 100644 --- a/lib/connections/structs.go +++ b/lib/connections/structs.go @@ -174,6 +174,12 @@ type listenerFactory interface { Valid(config.Configuration) error } +type ListenerAddresses struct { + URI *url.URL + WANAddresses []*url.URL + LANAddresses []*url.URL +} + type genericListener interface { Serve() Stop() @@ -188,7 +194,7 @@ type genericListener interface { WANAddresses() []*url.URL LANAddresses() []*url.URL Error() error - OnAddressesChanged(func(genericListener)) + OnAddressesChanged(func(ListenerAddresses)) String() string Factory() listenerFactory NATType() string @@ -203,14 +209,28 @@ type Model interface { } type onAddressesChangedNotifier struct { - callbacks []func(genericListener) + callbacks []func(ListenerAddresses) } -func (o *onAddressesChangedNotifier) OnAddressesChanged(callback func(genericListener)) { +func (o *onAddressesChangedNotifier) OnAddressesChanged(callback func(ListenerAddresses)) { o.callbacks = append(o.callbacks, callback) } func (o *onAddressesChangedNotifier) notifyAddressesChanged(l genericListener) { + o.notifyAddresses(ListenerAddresses{ + URI: l.URI(), + WANAddresses: l.WANAddresses(), + LANAddresses: l.LANAddresses(), + }) +} + +func (o *onAddressesChangedNotifier) clearAddresses(l genericListener) { + o.notifyAddresses(ListenerAddresses{ + URI: l.URI(), + }) +} + +func (o *onAddressesChangedNotifier) notifyAddresses(l ListenerAddresses) { for _, callback := range o.callbacks { callback(l) } diff --git a/lib/connections/tcp_listen.go b/lib/connections/tcp_listen.go index 351eaa908..278407bb3 100644 --- a/lib/connections/tcp_listen.go +++ b/lib/connections/tcp_listen.go @@ -55,7 +55,9 @@ func (t *tcpListener) serve(ctx context.Context) error { l.Infoln("Listen (BEP/tcp):", err) return err } + t.notifyAddressesChanged(t) defer listener.Close() + defer t.clearAddresses(t) l.Infof("TCP listener (%v) starting", listener.Addr()) defer l.Infof("TCP listener (%v) shutting down", listener.Addr()) diff --git a/lib/discover/global.go b/lib/discover/global.go index cc58a2b72..2bb810c6a 100644 --- a/lib/discover/global.go +++ b/lib/discover/global.go @@ -46,9 +46,10 @@ type httpClient interface { } const ( - defaultReannounceInterval = 30 * time.Minute - announceErrorRetryInterval = 5 * time.Minute - requestTimeout = 5 * time.Second + defaultReannounceInterval = 30 * time.Minute + announceErrorRetryInterval = 5 * time.Minute + requestTimeout = 5 * time.Second + maxAddressChangesBetweenAnnouncements = 10 ) type announcement struct { @@ -197,20 +198,33 @@ func (c *globalClient) serve(ctx context.Context) { return } - timer := time.NewTimer(0) + timer := time.NewTimer(5 * time.Second) defer timer.Stop() eventSub := c.evLogger.Subscribe(events.ListenAddressesChanged) defer eventSub.Unsubscribe() + timerResetCount := 0 + for { select { case <-eventSub.C(): - // Defer announcement by 2 seconds, essentially debouncing - // if we have a stream of events incoming in quick succession. - timer.Reset(2 * time.Second) - + if timerResetCount < maxAddressChangesBetweenAnnouncements { + // Defer announcement by 2 seconds, essentially debouncing + // if we have a stream of events incoming in quick succession. + timer.Reset(2 * time.Second) + } else if timerResetCount == maxAddressChangesBetweenAnnouncements { + // Yet only do it if we haven't had to reset maxAddressChangesBetweenAnnouncements times in a row, + // so if something is flip-flopping within 2 seconds, we don't end up in a permanent reset loop. + l.Warnf("Detected a flip-flopping listener") + c.setError(errors.New("flip flopping listener")) + // Incrementing the count above 10 will prevent us from warning or setting the error again + // It will also suppress event based resets until we've had a proper round after announceErrorRetryInterval + timer.Reset(announceErrorRetryInterval) + } + timerResetCount++ case <-timer.C: + timerResetCount = 0 c.sendAnnouncement(ctx, timer) case <-ctx.Done(): @@ -237,7 +251,7 @@ func (c *globalClient) sendAnnouncement(ctx context.Context, timer *time.Timer) // The marshal doesn't fail, I promise. postData, _ := json.Marshal(ann) - l.Debugf("Announcement: %s", postData) + l.Debugf("Announcement: %v", ann) resp, err := c.announceClient.Post(ctx, c.server, "application/json", bytes.NewReader(postData)) if err != nil {