mirror of
https://github.com/syncthing/syncthing.git
synced 2024-11-16 10:28:49 -07:00
all: Send Close BEP msg on intentional disconnect (#5440)
This avoids waiting until next ping and timeout until the connection is actually closed both by notifying the peer of the disconnect and by immediately closing the local end of the connection after that. As a nice side effect, info level logging about dropped connections now have the actual reason in it, not a generic timeout error which looks like a real problem with the connection.
This commit is contained in:
parent
d924bd7bd9
commit
24ffd8be99
@ -9,7 +9,6 @@ package connections
|
|||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net"
|
"net"
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
@ -23,7 +22,6 @@ import (
|
|||||||
// that can be closed and has some metadata.
|
// that can be closed and has some metadata.
|
||||||
type Connection interface {
|
type Connection interface {
|
||||||
protocol.Connection
|
protocol.Connection
|
||||||
io.Closer
|
|
||||||
Type() string
|
Type() string
|
||||||
Transport() string
|
Transport() string
|
||||||
RemoteAddr() net.Addr
|
RemoteAddr() net.Addr
|
||||||
@ -39,6 +37,11 @@ type completeConn struct {
|
|||||||
protocol.Connection
|
protocol.Connection
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c completeConn) Close(err error) {
|
||||||
|
c.Connection.Close(err)
|
||||||
|
c.internalConn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
// internalConn is the raw TLS connection plus some metadata on where it
|
// internalConn is the raw TLS connection plus some metadata on where it
|
||||||
// came from (type, priority).
|
// came from (type, priority).
|
||||||
type internalConn struct {
|
type internalConn struct {
|
||||||
@ -82,6 +85,14 @@ func (t connType) Transport() string {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c internalConn) Close() {
|
||||||
|
// *tls.Conn.Close() does more than it says on the tin. Specifically, it
|
||||||
|
// sends a TLS alert message, which might block forever if the
|
||||||
|
// connection is dead and we don't have a deadline set.
|
||||||
|
c.SetWriteDeadline(time.Now().Add(250 * time.Millisecond))
|
||||||
|
c.Conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func (c internalConn) Type() string {
|
func (c internalConn) Type() string {
|
||||||
return c.connType.String()
|
return c.connType.String()
|
||||||
}
|
}
|
||||||
|
@ -8,11 +8,9 @@ package model
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/tls"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net"
|
"net"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"reflect"
|
"reflect"
|
||||||
@ -128,6 +126,9 @@ var (
|
|||||||
errFolderNotRunning = errors.New("folder is not running")
|
errFolderNotRunning = errors.New("folder is not running")
|
||||||
errFolderMissing = errors.New("no such folder")
|
errFolderMissing = errors.New("no such folder")
|
||||||
errNetworkNotAllowed = errors.New("network not allowed")
|
errNetworkNotAllowed = errors.New("network not allowed")
|
||||||
|
// errors about why a connection is closed
|
||||||
|
errIgnoredFolderRemoved = errors.New("folder no longer ignored")
|
||||||
|
errReplacingConnection = errors.New("replacing connection")
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewModel creates and starts a new model. The model starts in read-only mode,
|
// NewModel creates and starts a new model. The model starts in read-only mode,
|
||||||
@ -226,7 +227,7 @@ func (m *Model) startFolderLocked(folder string) config.FolderType {
|
|||||||
|
|
||||||
// Close connections to affected devices
|
// Close connections to affected devices
|
||||||
for _, id := range cfg.DeviceIDs() {
|
for _, id := range cfg.DeviceIDs() {
|
||||||
m.closeLocked(id)
|
m.closeLocked(id, fmt.Errorf("started folder %v", cfg.Description()))
|
||||||
}
|
}
|
||||||
|
|
||||||
v, ok := fs.Sequence(protocol.LocalDeviceID), true
|
v, ok := fs.Sequence(protocol.LocalDeviceID), true
|
||||||
@ -339,7 +340,7 @@ func (m *Model) RemoveFolder(cfg config.FolderConfiguration) {
|
|||||||
// Delete syncthing specific files
|
// Delete syncthing specific files
|
||||||
cfg.Filesystem().RemoveAll(config.DefaultMarkerName)
|
cfg.Filesystem().RemoveAll(config.DefaultMarkerName)
|
||||||
|
|
||||||
m.tearDownFolderLocked(cfg)
|
m.tearDownFolderLocked(cfg, fmt.Errorf("removing folder %v", cfg.Description()))
|
||||||
// Remove it from the database
|
// Remove it from the database
|
||||||
db.DropFolder(m.db, cfg.ID)
|
db.DropFolder(m.db, cfg.ID)
|
||||||
|
|
||||||
@ -347,12 +348,12 @@ func (m *Model) RemoveFolder(cfg config.FolderConfiguration) {
|
|||||||
m.fmut.Unlock()
|
m.fmut.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Model) tearDownFolderLocked(cfg config.FolderConfiguration) {
|
func (m *Model) tearDownFolderLocked(cfg config.FolderConfiguration, err error) {
|
||||||
// Close connections to affected devices
|
// Close connections to affected devices
|
||||||
// Must happen before stopping the folder service to abort ongoing
|
// Must happen before stopping the folder service to abort ongoing
|
||||||
// transmissions and thus allow timely service termination.
|
// transmissions and thus allow timely service termination.
|
||||||
for _, dev := range cfg.Devices {
|
for _, dev := range cfg.Devices {
|
||||||
m.closeLocked(dev.DeviceID)
|
m.closeLocked(dev.DeviceID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop the services running for this folder and wait for them to finish
|
// Stop the services running for this folder and wait for them to finish
|
||||||
@ -398,14 +399,26 @@ func (m *Model) RestartFolder(from, to config.FolderConfiguration) {
|
|||||||
defer m.fmut.Unlock()
|
defer m.fmut.Unlock()
|
||||||
defer m.pmut.Unlock()
|
defer m.pmut.Unlock()
|
||||||
|
|
||||||
m.tearDownFolderLocked(from)
|
var infoMsg string
|
||||||
if to.Paused {
|
var errMsg string
|
||||||
l.Infoln("Paused folder", to.Description())
|
switch {
|
||||||
} else {
|
case to.Paused:
|
||||||
m.addFolderLocked(to)
|
infoMsg = "Paused"
|
||||||
folderType := m.startFolderLocked(to.ID)
|
errMsg = "pausing"
|
||||||
l.Infoln("Restarted folder", to.Description(), fmt.Sprintf("(%s)", folderType))
|
case from.Paused:
|
||||||
|
infoMsg = "Unpaused"
|
||||||
|
errMsg = "unpausing"
|
||||||
|
default:
|
||||||
|
infoMsg = "Restarted"
|
||||||
|
errMsg = "restarting"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m.tearDownFolderLocked(from, fmt.Errorf("%v folder %v", errMsg, to.Description()))
|
||||||
|
if !to.Paused {
|
||||||
|
m.addFolderLocked(to)
|
||||||
|
m.startFolderLocked(to.ID)
|
||||||
|
}
|
||||||
|
l.Infof("%v folder %v (%v)", infoMsg, to.Description(), to.Type)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Model) UsageReportingStats(version int, preview bool) map[string]interface{} {
|
func (m *Model) UsageReportingStats(version int, preview bool) map[string]interface{} {
|
||||||
@ -1342,21 +1355,21 @@ func (m *Model) Closed(conn protocol.Connection, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// close will close the underlying connection for a given device
|
// close will close the underlying connection for a given device
|
||||||
func (m *Model) close(device protocol.DeviceID) {
|
func (m *Model) close(device protocol.DeviceID, err error) {
|
||||||
m.pmut.Lock()
|
m.pmut.Lock()
|
||||||
m.closeLocked(device)
|
m.closeLocked(device, err)
|
||||||
m.pmut.Unlock()
|
m.pmut.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// closeLocked will close the underlying connection for a given device
|
// closeLocked will close the underlying connection for a given device
|
||||||
func (m *Model) closeLocked(device protocol.DeviceID) {
|
func (m *Model) closeLocked(device protocol.DeviceID, err error) {
|
||||||
conn, ok := m.conn[device]
|
conn, ok := m.conn[device]
|
||||||
if !ok {
|
if !ok {
|
||||||
// There is no connection to close
|
// There is no connection to close
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
closeRawConn(conn)
|
conn.Close(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements protocol.RequestResponse
|
// Implements protocol.RequestResponse
|
||||||
@ -1719,7 +1732,7 @@ func (m *Model) AddConnection(conn connections.Connection, hello protocol.HelloR
|
|||||||
// back into Closed() for the cleanup.
|
// back into Closed() for the cleanup.
|
||||||
closed := m.closed[deviceID]
|
closed := m.closed[deviceID]
|
||||||
m.pmut.Unlock()
|
m.pmut.Unlock()
|
||||||
closeRawConn(oldConn)
|
oldConn.Close(errReplacingConnection)
|
||||||
<-closed
|
<-closed
|
||||||
m.pmut.Lock()
|
m.pmut.Lock()
|
||||||
}
|
}
|
||||||
@ -2582,6 +2595,10 @@ func (m *Model) CommitConfiguration(from, to config.Configuration) bool {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if fromCfg.Paused && toCfg.Paused {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// This folder exists on both sides. Settings might have changed.
|
// This folder exists on both sides. Settings might have changed.
|
||||||
// Check if anything differs that requires a restart.
|
// Check if anything differs that requires a restart.
|
||||||
if !reflect.DeepEqual(fromCfg.RequiresRestartOnly(), toCfg.RequiresRestartOnly()) {
|
if !reflect.DeepEqual(fromCfg.RequiresRestartOnly(), toCfg.RequiresRestartOnly()) {
|
||||||
@ -2616,12 +2633,12 @@ func (m *Model) CommitConfiguration(from, to config.Configuration) bool {
|
|||||||
|
|
||||||
// Ignored folder was removed, reconnect to retrigger the prompt.
|
// Ignored folder was removed, reconnect to retrigger the prompt.
|
||||||
if len(fromCfg.IgnoredFolders) > len(toCfg.IgnoredFolders) {
|
if len(fromCfg.IgnoredFolders) > len(toCfg.IgnoredFolders) {
|
||||||
m.close(deviceID)
|
m.close(deviceID, errIgnoredFolderRemoved)
|
||||||
}
|
}
|
||||||
|
|
||||||
if toCfg.Paused {
|
if toCfg.Paused {
|
||||||
l.Infoln("Pausing", deviceID)
|
l.Infoln("Pausing", deviceID)
|
||||||
m.close(deviceID)
|
m.close(deviceID, errDevicePaused)
|
||||||
events.Default.Log(events.DevicePaused, map[string]string{"device": deviceID.String()})
|
events.Default.Log(events.DevicePaused, map[string]string{"device": deviceID.String()})
|
||||||
} else {
|
} else {
|
||||||
events.Default.Log(events.DeviceResumed, map[string]string{"device": deviceID.String()})
|
events.Default.Log(events.DeviceResumed, map[string]string{"device": deviceID.String()})
|
||||||
@ -2717,17 +2734,6 @@ func getChunk(data []string, skip, get int) ([]string, int, int) {
|
|||||||
return data[skip : skip+get], 0, 0
|
return data[skip : skip+get], 0, 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func closeRawConn(conn io.Closer) error {
|
|
||||||
if conn, ok := conn.(*tls.Conn); ok {
|
|
||||||
// If the underlying connection is a *tls.Conn, Close() does more
|
|
||||||
// than it says on the tin. Specifically, it sends a TLS alert
|
|
||||||
// message, which might block forever if the connection is dead
|
|
||||||
// and we don't have a deadline set.
|
|
||||||
conn.SetWriteDeadline(time.Now().Add(250 * time.Millisecond))
|
|
||||||
}
|
|
||||||
return conn.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func stringSliceWithout(ss []string, s string) []string {
|
func stringSliceWithout(ss []string, s string) []string {
|
||||||
for i := range ss {
|
for i := range ss {
|
||||||
if ss[i] == s {
|
if ss[i] == s {
|
||||||
|
@ -316,11 +316,10 @@ type fakeConnection struct {
|
|||||||
mut sync.Mutex
|
mut sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeConnection) Close() error {
|
func (f *fakeConnection) Close(_ error) {
|
||||||
f.mut.Lock()
|
f.mut.Lock()
|
||||||
defer f.mut.Unlock()
|
defer f.mut.Unlock()
|
||||||
f.closed = true
|
f.closed = true
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeConnection) Start() {
|
func (f *fakeConnection) Start() {
|
||||||
|
@ -143,6 +143,7 @@ type RequestResponse interface {
|
|||||||
|
|
||||||
type Connection interface {
|
type Connection interface {
|
||||||
Start()
|
Start()
|
||||||
|
Close(err error)
|
||||||
ID() DeviceID
|
ID() DeviceID
|
||||||
Name() string
|
Name() string
|
||||||
Index(folder string, files []FileInfo) error
|
Index(folder string, files []FileInfo) error
|
||||||
@ -171,6 +172,7 @@ type rawConnection struct {
|
|||||||
nextIDMut sync.Mutex
|
nextIDMut sync.Mutex
|
||||||
|
|
||||||
outbox chan asyncMessage
|
outbox chan asyncMessage
|
||||||
|
sendClose chan asyncMessage
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
once sync.Once
|
once sync.Once
|
||||||
compression Compression
|
compression Compression
|
||||||
@ -214,6 +216,7 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv
|
|||||||
cw: cw,
|
cw: cw,
|
||||||
awaiting: make(map[int32]chan asyncResult),
|
awaiting: make(map[int32]chan asyncResult),
|
||||||
outbox: make(chan asyncMessage),
|
outbox: make(chan asyncMessage),
|
||||||
|
sendClose: make(chan asyncMessage),
|
||||||
closed: make(chan struct{}),
|
closed: make(chan struct{}),
|
||||||
compression: compress,
|
compression: compress,
|
||||||
}
|
}
|
||||||
@ -334,7 +337,7 @@ func (c *rawConnection) ping() bool {
|
|||||||
|
|
||||||
func (c *rawConnection) readerLoop() (err error) {
|
func (c *rawConnection) readerLoop() (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
c.close(err)
|
c.internalClose(err)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
fourByteBuf := make([]byte, 4)
|
fourByteBuf := make([]byte, 4)
|
||||||
@ -636,10 +639,15 @@ func (c *rawConnection) writerLoop() {
|
|||||||
close(hm.done)
|
close(hm.done)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.close(err)
|
c.internalClose(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case m := <-c.sendClose:
|
||||||
|
c.writeMessage(m)
|
||||||
|
close(m.done)
|
||||||
|
return // No message must be sent after the Close message.
|
||||||
|
|
||||||
case <-c.closed:
|
case <-c.closed:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -801,24 +809,47 @@ func (c *rawConnection) shouldCompressMessage(msg message) bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *rawConnection) close(err error) {
|
// Close is called when the connection is regularely closed and thus the Close
|
||||||
|
// BEP message is sent before terminating the actual connection. The error
|
||||||
|
// argument specifies the reason for closing the connection.
|
||||||
|
func (c *rawConnection) Close(err error) {
|
||||||
c.once.Do(func() {
|
c.once.Do(func() {
|
||||||
l.Debugln("close due to", err)
|
done := make(chan struct{})
|
||||||
close(c.closed)
|
c.sendClose <- asyncMessage{&Close{err.Error()}, done}
|
||||||
|
<-done
|
||||||
|
|
||||||
c.awaitingMut.Lock()
|
// No more sends are necessary, therefore closing the underlying
|
||||||
for i, ch := range c.awaiting {
|
// connection can happen at the same time as the internal cleanup.
|
||||||
if ch != nil {
|
// And this prevents a potential deadlock due to calling c.receiver.Closed
|
||||||
close(ch)
|
go c.commonClose(err)
|
||||||
delete(c.awaiting, i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c.awaitingMut.Unlock()
|
|
||||||
|
|
||||||
c.receiver.Closed(c, err)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// internalClose is called if there is an unexpected error during normal operation.
|
||||||
|
func (c *rawConnection) internalClose(err error) {
|
||||||
|
c.once.Do(func() {
|
||||||
|
c.commonClose(err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// commonClose is a utility function that must only be called from within
|
||||||
|
// rawConnection.once.Do (i.e. in Close and close).
|
||||||
|
func (c *rawConnection) commonClose(err error) {
|
||||||
|
l.Debugln("close due to", err)
|
||||||
|
close(c.closed)
|
||||||
|
|
||||||
|
c.awaitingMut.Lock()
|
||||||
|
for i, ch := range c.awaiting {
|
||||||
|
if ch != nil {
|
||||||
|
close(ch)
|
||||||
|
delete(c.awaiting, i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.awaitingMut.Unlock()
|
||||||
|
|
||||||
|
c.receiver.Closed(c, err)
|
||||||
|
}
|
||||||
|
|
||||||
// The pingSender makes sure that we've sent a message within the last
|
// The pingSender makes sure that we've sent a message within the last
|
||||||
// PingSendInterval. If we already have something sent in the last
|
// PingSendInterval. If we already have something sent in the last
|
||||||
// PingSendInterval/2, we do nothing. Otherwise we send a ping message. This
|
// PingSendInterval/2, we do nothing. Otherwise we send a ping message. This
|
||||||
@ -859,7 +890,7 @@ func (c *rawConnection) pingReceiver() {
|
|||||||
d := time.Since(c.cr.Last())
|
d := time.Since(c.cr.Last())
|
||||||
if d > ReceiveTimeout {
|
if d > ReceiveTimeout {
|
||||||
l.Debugln(c.id, "ping timeout", d)
|
l.Debugln(c.id, "ping timeout", d)
|
||||||
c.close(ErrTimeout)
|
c.internalClose(ErrTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
l.Debugln(c.id, "last read within", d)
|
l.Debugln(c.id, "last read within", d)
|
||||||
|
@ -56,7 +56,7 @@ func TestClose(t *testing.T) {
|
|||||||
c0.ClusterConfig(ClusterConfig{})
|
c0.ClusterConfig(ClusterConfig{})
|
||||||
c1.ClusterConfig(ClusterConfig{})
|
c1.ClusterConfig(ClusterConfig{})
|
||||||
|
|
||||||
c0.close(errors.New("manual close"))
|
c0.internalClose(errors.New("manual close"))
|
||||||
|
|
||||||
<-c0.closed
|
<-c0.closed
|
||||||
if err := m0.closedError(); err == nil || !strings.Contains(err.Error(), "manual close") {
|
if err := m0.closedError(); err == nil || !strings.Contains(err.Error(), "manual close") {
|
||||||
|
Loading…
Reference in New Issue
Block a user