diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 18e49954e..bd2f7fc55 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -35,7 +35,7 @@ }, { "ImportPath": "github.com/syncthing/protocol", - "Rev": "22e24fc3879b1665077389f96862e222b2cdd8d3" + "Rev": "ebcdea63c07327a342f65415bbadc497462b8f1f" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb", diff --git a/Godeps/_workspace/src/github.com/syncthing/protocol/common_test.go b/Godeps/_workspace/src/github.com/syncthing/protocol/common_test.go index f46b6a8da..706a3b877 100644 --- a/Godeps/_workspace/src/github.com/syncthing/protocol/common_test.go +++ b/Godeps/_workspace/src/github.com/syncthing/protocol/common_test.go @@ -31,7 +31,7 @@ func (t *TestModel) Index(deviceID DeviceID, folder string, files []FileInfo, fl func (t *TestModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { } -func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { +func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64, size int, hash []byte, flags uint32, options []Option, buf []byte) error { t.folder = folder t.name = name t.offset = offset @@ -39,7 +39,8 @@ func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64 t.hash = hash t.flags = flags t.options = options - return t.data, nil + copy(buf, t.data) + return nil } func (t *TestModel) Close(deviceID DeviceID, err error) { diff --git a/Godeps/_workspace/src/github.com/syncthing/protocol/nativemodel_darwin.go b/Godeps/_workspace/src/github.com/syncthing/protocol/nativemodel_darwin.go index 502a71f23..eb755a6e4 100644 --- a/Godeps/_workspace/src/github.com/syncthing/protocol/nativemodel_darwin.go +++ b/Godeps/_workspace/src/github.com/syncthing/protocol/nativemodel_darwin.go @@ -26,9 +26,9 @@ func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileI m.next.IndexUpdate(deviceID, folder, files, flags, options) } -func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { +func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error { name = norm.NFD.String(name) - return m.next.Request(deviceID, folder, name, offset, size, hash, flags, options) + return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf) } func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { diff --git a/Godeps/_workspace/src/github.com/syncthing/protocol/nativemodel_unix.go b/Godeps/_workspace/src/github.com/syncthing/protocol/nativemodel_unix.go index 21585e308..0611865e1 100644 --- a/Godeps/_workspace/src/github.com/syncthing/protocol/nativemodel_unix.go +++ b/Godeps/_workspace/src/github.com/syncthing/protocol/nativemodel_unix.go @@ -18,8 +18,8 @@ func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileI m.next.IndexUpdate(deviceID, folder, files, flags, options) } -func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { - return m.next.Request(deviceID, folder, name, offset, size, hash, flags, options) +func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error { + return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf) } func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { diff --git a/Godeps/_workspace/src/github.com/syncthing/protocol/nativemodel_windows.go b/Godeps/_workspace/src/github.com/syncthing/protocol/nativemodel_windows.go index f1a24898c..36a1d2749 100644 --- a/Godeps/_workspace/src/github.com/syncthing/protocol/nativemodel_windows.go +++ b/Godeps/_workspace/src/github.com/syncthing/protocol/nativemodel_windows.go @@ -34,9 +34,9 @@ func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileI m.next.IndexUpdate(deviceID, folder, files, flags, options) } -func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { +func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error { name = filepath.FromSlash(name) - return m.next.Request(deviceID, folder, name, offset, size, hash, flags, options) + return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf) } func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { diff --git a/Godeps/_workspace/src/github.com/syncthing/protocol/protocol.go b/Godeps/_workspace/src/github.com/syncthing/protocol/protocol.go index d0e23055d..8b41c0138 100644 --- a/Godeps/_workspace/src/github.com/syncthing/protocol/protocol.go +++ b/Godeps/_workspace/src/github.com/syncthing/protocol/protocol.go @@ -81,7 +81,7 @@ type Model interface { // An index update was received from the peer device IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) // A request was made by the peer device - Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) + Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error // A cluster configuration message was received ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) // The peer device closed the connection @@ -112,11 +112,11 @@ type rawConnection struct { idxMut sync.Mutex // ensures serialization of Index calls - nextID chan int - outbox chan hdrMsg - closed chan struct{} - once sync.Once - + nextID chan int + outbox chan hdrMsg + closed chan struct{} + once sync.Once + pool sync.Pool compression Compression rdbuf0 []byte // used & reused by readMessage @@ -129,8 +129,9 @@ type asyncResult struct { } type hdrMsg struct { - hdr header - msg encodable + hdr header + msg encodable + done chan struct{} } type encodable interface { @@ -151,14 +152,19 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv cw := &countingWriter{Writer: writer} c := rawConnection{ - id: deviceID, - name: name, - receiver: nativeModel{receiver}, - cr: cr, - cw: cw, - outbox: make(chan hdrMsg), - nextID: make(chan int), - closed: make(chan struct{}), + id: deviceID, + name: name, + receiver: nativeModel{receiver}, + cr: cr, + cw: cw, + outbox: make(chan hdrMsg), + nextID: make(chan int), + closed: make(chan struct{}), + pool: sync.Pool{ + New: func() interface{} { + return make([]byte, BlockSize) + }, + }, compression: compress, } @@ -195,7 +201,7 @@ func (c *rawConnection) Index(folder string, idx []FileInfo, flags uint32, optio Files: idx, Flags: flags, Options: options, - }) + }, nil) c.idxMut.Unlock() return nil } @@ -213,7 +219,7 @@ func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo, flags uint32, Files: idx, Flags: flags, Options: options, - }) + }, nil) c.idxMut.Unlock() return nil } @@ -243,7 +249,7 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i Hash: hash, Flags: flags, Options: options, - }) + }, nil) if !ok { return nil, ErrClosed } @@ -257,7 +263,7 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i // ClusterConfig send the cluster configuration message to the peer and returns any error func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) { - c.send(-1, messageTypeClusterConfig, config) + c.send(-1, messageTypeClusterConfig, config, nil) } func (c *rawConnection) ping() bool { @@ -273,7 +279,7 @@ func (c *rawConnection) ping() bool { c.awaiting[id] = rc c.awaitingMut.Unlock() - ok := c.send(id, messageTypePing, nil) + ok := c.send(id, messageTypePing, nil, nil) if !ok { return false } @@ -342,7 +348,7 @@ func (c *rawConnection) readerLoop() (err error) { if state != stateReady { return fmt.Errorf("protocol error: ping message in state %d", state) } - c.send(hdr.msgID, messageTypePong, pongMessage{}) + c.send(hdr.msgID, messageTypePong, pongMessage{}, nil) case pongMessage: if state != stateReady { @@ -519,12 +525,36 @@ func filterIndexMessageFiles(fs []FileInfo) []FileInfo { } func (c *rawConnection) handleRequest(msgID int, req RequestMessage) { - data, err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), int(req.Size), req.Hash, req.Flags, req.Options) + size := int(req.Size) + usePool := size <= BlockSize - c.send(msgID, messageTypeResponse, ResponseMessage{ - Data: data, - Code: errorToCode(err), - }) + var buf []byte + var done chan struct{} + + if usePool { + buf = c.pool.Get().([]byte)[:size] + done = make(chan struct{}) + } else { + buf = make([]byte, size) + } + + err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), req.Hash, req.Flags, req.Options, buf) + if err != nil { + c.send(msgID, messageTypeResponse, ResponseMessage{ + Data: nil, + Code: errorToCode(err), + }, done) + } else { + c.send(msgID, messageTypeResponse, ResponseMessage{ + Data: buf, + Code: errorToCode(err), + }, done) + } + + if usePool { + <-done + c.pool.Put(buf) + } } func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) { @@ -547,7 +577,7 @@ func (c *rawConnection) handlePong(msgID int) { c.awaitingMut.Unlock() } -func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool { +func (c *rawConnection) send(msgID int, msgType int, msg encodable, done chan struct{}) bool { if msgID < 0 { select { case id := <-c.nextID: @@ -564,7 +594,7 @@ func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool { } select { - case c.outbox <- hdrMsg{hdr, msg}: + case c.outbox <- hdrMsg{hdr, msg, done}: return true case <-c.closed: return false @@ -583,6 +613,9 @@ func (c *rawConnection) writerLoop() { if hm.msg != nil { // Uncompressed message in uncBuf uncBuf, err = hm.msg.AppendXDR(uncBuf[:0]) + if hm.done != nil { + close(hm.done) + } if err != nil { c.close(err) return diff --git a/internal/model/model.go b/internal/model/model.go index c14d020c5..d81b5520f 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -705,19 +705,19 @@ func (m *Model) Close(device protocol.DeviceID, err error) { // Request returns the specified data segment by reading it from local disk. // Implements the protocol.Model interface. -func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, flags uint32, options []protocol.Option) ([]byte, error) { - if offset < 0 || size < 0 { - return nil, protocol.ErrNoSuchFile +func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset int64, hash []byte, flags uint32, options []protocol.Option, buf []byte) error { + if offset < 0 { + return protocol.ErrNoSuchFile } if !m.folderSharedWith(folder, deviceID) { l.Warnf("Request from %s for file %s in unshared folder %q", deviceID, name, folder) - return nil, protocol.ErrNoSuchFile + return protocol.ErrNoSuchFile } if flags != 0 { // We don't currently support or expect any flags. - return nil, fmt.Errorf("protocol error: unknown flags 0x%x in Request message", flags) + return fmt.Errorf("protocol error: unknown flags 0x%x in Request message", flags) } // Verify that the requested file exists in the local model. We only need @@ -739,7 +739,7 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset if !ok { l.Warnf("Request from %s for file %s in nonexistent folder %q", deviceID, name, folder) - return nil, protocol.ErrNoSuchFile + return protocol.ErrNoSuchFile } // This call is really expensive for large files, as we load the full @@ -747,21 +747,21 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset // space for, read, and deserialize. lf, ok := folderFiles.Get(protocol.LocalDeviceID, name) if !ok { - return nil, protocol.ErrNoSuchFile + return protocol.ErrNoSuchFile } if lf.IsInvalid() || lf.IsDeleted() { if debug { - l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", m, deviceID, folder, name, offset, size, lf) + l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", m, deviceID, folder, name, offset, len(buf), lf) } - return nil, protocol.ErrInvalid + return protocol.ErrInvalid } if offset > lf.Size() { if debug { - l.Debugf("%v REQ(in; nonexistent): %s: %q o=%d s=%d", m, deviceID, name, offset, size) + l.Debugf("%v REQ(in; nonexistent): %s: %q o=%d s=%d", m, deviceID, name, offset, len(buf)) } - return nil, protocol.ErrNoSuchFile + return protocol.ErrNoSuchFile } m.rvmut.Lock() @@ -792,7 +792,7 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset } if debug && deviceID != protocol.LocalDeviceID { - l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d", m, deviceID, folder, name, offset, size) + l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d", m, deviceID, folder, name, offset, len(buf)) } m.fmut.RLock() fn := filepath.Join(m.folderCfgs[folder].Path(), name) @@ -803,7 +803,7 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset if info, err := os.Lstat(fn); err == nil && info.Mode()&os.ModeSymlink != 0 { target, _, err := symlinks.Read(fn) if err != nil { - return nil, err + return err } reader = strings.NewReader(target) } else { @@ -811,19 +811,18 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset // at any moment. reader, err = os.Open(fn) if err != nil { - return nil, err + return err } defer reader.(*os.File).Close() } - buf := make([]byte, size) _, err = reader.ReadAt(buf, offset) if err != nil { - return nil, err + return err } - return buf, nil + return nil } func (m *Model) CurrentFolderFile(folder string, file string) (protocol.FileInfo, bool) { diff --git a/internal/model/model_test.go b/internal/model/model_test.go index c285c9c96..9243cc8ca 100644 --- a/internal/model/model_test.go +++ b/internal/model/model_test.go @@ -99,8 +99,11 @@ func TestRequest(t *testing.T) { m.ServeBackground() m.ScanFolder("default") + bs := make([]byte, protocol.BlockSize) + // Existing, shared file - bs, err := m.Request(device1, "default", "foo", 0, 6, nil, 0, nil) + bs = bs[:6] + err := m.Request(device1, "default", "foo", 0, nil, 0, nil, bs) if err != nil { t.Error(err) } @@ -109,58 +112,35 @@ func TestRequest(t *testing.T) { } // Existing, nonshared file - bs, err = m.Request(device2, "default", "foo", 0, 6, nil, 0, nil) + err = m.Request(device2, "default", "foo", 0, nil, 0, nil, bs) if err == nil { t.Error("Unexpected nil error on insecure file read") } - if bs != nil { - t.Errorf("Unexpected non nil data on insecure file read: %q", string(bs)) - } // Nonexistent file - bs, err = m.Request(device1, "default", "nonexistent", 0, 6, nil, 0, nil) + err = m.Request(device1, "default", "nonexistent", 0, nil, 0, nil, bs) if err == nil { t.Error("Unexpected nil error on insecure file read") } - if bs != nil { - t.Errorf("Unexpected non nil data on insecure file read: %q", string(bs)) - } // Shared folder, but disallowed file name - bs, err = m.Request(device1, "default", "../walk.go", 0, 6, nil, 0, nil) + err = m.Request(device1, "default", "../walk.go", 0, nil, 0, nil, bs) if err == nil { t.Error("Unexpected nil error on insecure file read") } - if bs != nil { - t.Errorf("Unexpected non nil data on insecure file read: %q", string(bs)) - } - - // Larger block than available - bs, err = m.Request(device1, "default", "foo", 0, 42, nil, 0, nil) - if err == nil { - t.Error("Unexpected nil error on insecure file read") - } - if bs != nil { - t.Errorf("Unexpected non nil data on insecure file read: %q", string(bs)) - } // Negative offset - bs, err = m.Request(device1, "default", "foo", -4, 6, nil, 0, nil) + err = m.Request(device1, "default", "foo", -4, nil, 0, nil, bs[:0]) if err == nil { t.Error("Unexpected nil error on insecure file read") } - if bs != nil { - t.Errorf("Unexpected non nil data on insecure file read: %q", string(bs)) - } - // Negative size - bs, err = m.Request(device1, "default", "foo", 4, -4, nil, 0, nil) + // Larger block than available + bs = bs[:42] + err = m.Request(device1, "default", "foo", 0, nil, 0, nil, bs) if err == nil { t.Error("Unexpected nil error on insecure file read") } - if bs != nil { - t.Errorf("Unexpected non nil data on insecure file read: %q", string(bs)) - } } func genFiles(n int) []protocol.FileInfo {