Merge pull request #1872 from calmh/large-file-transfer

Large file transfer

Commit 5417fb7287
@@ -38,10 +38,12 @@ import (
 
 // How many files to send in each Index/IndexUpdate message.
 const (
 	indexTargetSize   = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
 	indexPerFileSize  = 250        // Each FileInfo is approximately this big, in bytes, excluding BlockInfos
-	IndexPerBlockSize = 40         // Each BlockInfo is approximately this big
+	indexPerBlockSize = 40         // Each BlockInfo is approximately this big
 	indexBatchSize    = 1000       // Either way, don't include more files than this
+	reqValidationTime      = time.Hour // How long to cache validation entries for Request messages
+	reqValidationCacheSize = 1000      // How many entries to aim for in the validation cache size
 )
 
 type service interface {
@@ -86,6 +88,9 @@ type Model struct {
 
 	addedFolder bool
 	started     bool
+
+	reqValidationCache map[string]time.Time // folder / file name => time when confirmed to exist
+	rvmut              sync.RWMutex         // protects reqValidationCache
 }
 
 var (
@@ -97,29 +102,31 @@ var (
 // for file data without altering the local folder in any way.
 func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName, clientVersion string, ldb *leveldb.DB) *Model {
 	m := &Model{
 		cfg:                cfg,
 		db:                 ldb,
 		finder:             db.NewBlockFinder(ldb, cfg),
 		progressEmitter:    NewProgressEmitter(cfg),
 		id:                 id,
 		shortID:            id.Short(),
 		deviceName:         deviceName,
 		clientName:         clientName,
 		clientVersion:      clientVersion,
 		folderCfgs:         make(map[string]config.FolderConfiguration),
 		folderFiles:        make(map[string]*db.FileSet),
 		folderDevices:      make(map[string][]protocol.DeviceID),
 		deviceFolders:      make(map[protocol.DeviceID][]string),
 		deviceStatRefs:     make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
 		folderIgnores:      make(map[string]*ignore.Matcher),
 		folderRunners:      make(map[string]service),
 		folderStatRefs:     make(map[string]*stats.FolderStatisticsReference),
 		protoConn:          make(map[protocol.DeviceID]protocol.Connection),
 		rawConn:            make(map[protocol.DeviceID]io.Closer),
 		deviceVer:          make(map[protocol.DeviceID]string),
+		reqValidationCache: make(map[string]time.Time),
 
 		fmut:  sync.NewRWMutex(),
 		pmut:  sync.NewRWMutex(),
+		rvmut: sync.NewRWMutex(),
 	}
 	if cfg.Options().ProgressUpdateIntervalS > -1 {
 		go m.progressEmitter.Serve()
@@ -729,33 +736,61 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
 		return nil, fmt.Errorf("protocol error: unknown flags 0x%x in Request message", flags)
 	}
 
-	// Verify that the requested file exists in the local model.
-	m.fmut.RLock()
-	folderFiles, ok := m.folderFiles[folder]
-	m.fmut.RUnlock()
-
-	if !ok {
-		l.Warnf("Request from %s for file %s in nonexistent folder %q", deviceID, name, folder)
-		return nil, protocol.ErrNoSuchFile
-	}
-
-	lf, ok := folderFiles.Get(protocol.LocalDeviceID, name)
-	if !ok {
-		return nil, protocol.ErrNoSuchFile
-	}
-
-	if lf.IsInvalid() || lf.IsDeleted() {
-		if debug {
-			l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", m, deviceID, folder, name, offset, size, lf)
-		}
-		return nil, protocol.ErrInvalid
-	}
-
-	if offset > lf.Size() {
-		if debug {
-			l.Debugf("%v REQ(in; nonexistent): %s: %q o=%d s=%d", m, deviceID, name, offset, size)
-		}
-		return nil, protocol.ErrNoSuchFile
-	}
+	// Verify that the requested file exists in the local model. We only need
+	// to validate this file if we haven't done so recently, so we keep a
+	// cache of successfull results. "Recently" can be quite a long time, as
+	// we remove validation cache entries when we detect local changes. If
+	// we're out of sync here and the file actually doesn't exist any more, or
+	// has shrunk or something, then we'll anyway get a read error that we
+	// pass on to the other side.
+
+	m.rvmut.RLock()
+	validated := m.reqValidationCache[folder+"/"+name]
+	m.rvmut.RUnlock()
+
+	if time.Since(validated) > reqValidationTime {
+		m.fmut.RLock()
+		folderFiles, ok := m.folderFiles[folder]
+		m.fmut.RUnlock()
+
+		if !ok {
+			l.Warnf("Request from %s for file %s in nonexistent folder %q", deviceID, name, folder)
+			return nil, protocol.ErrNoSuchFile
+		}
+
+		// This call is really expensive for large files, as we load the full
+		// block list which may be megabytes and megabytes of data to allocate
+		// space for, read, and deserialize.
+		lf, ok := folderFiles.Get(protocol.LocalDeviceID, name)
+		if !ok {
+			return nil, protocol.ErrNoSuchFile
+		}
+
+		if lf.IsInvalid() || lf.IsDeleted() {
+			if debug {
+				l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", m, deviceID, folder, name, offset, size, lf)
+			}
+			return nil, protocol.ErrInvalid
+		}
+
+		if offset > lf.Size() {
+			if debug {
+				l.Debugf("%v REQ(in; nonexistent): %s: %q o=%d s=%d", m, deviceID, name, offset, size)
+			}
+			return nil, protocol.ErrNoSuchFile
+		}
+
+		m.rvmut.Lock()
+		m.reqValidationCache[folder+"/"+name] = time.Now()
+		if len(m.reqValidationCache) > reqValidationCacheSize {
+			// Don't let the cache grow infinitely
+			for name, validated := range m.reqValidationCache {
+				if time.Since(validated) > time.Minute {
+					delete(m.reqValidationCache, name)
+				}
+			}
+		}
+		m.rvmut.Unlock()
+	}
 
 	if debug && deviceID != protocol.LocalDeviceID {
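The change above amounts to a time-bounded positive cache in front of an expensive lookup: validate a file once, remember when, and skip re-validation until the entry ages out or a local change invalidates it. A condensed, self-contained sketch of that pattern follows; the validationCache type and the check callback are illustrative names, not part of the syncthing code.

package main

import (
	"fmt"
	"sync"
	"time"
)

// validationCache remembers when a key was last successfully validated.
type validationCache struct {
	mut     sync.RWMutex
	entries map[string]time.Time
	ttl     time.Duration
	maxSize int
}

func newValidationCache(ttl time.Duration, maxSize int) *validationCache {
	return &validationCache{
		entries: make(map[string]time.Time),
		ttl:     ttl,
		maxSize: maxSize,
	}
}

// validate runs check(key) only if the key has not been validated within ttl.
func (c *validationCache) validate(key string, check func(string) error) error {
	c.mut.RLock()
	validated := c.entries[key]
	c.mut.RUnlock()

	if time.Since(validated) <= c.ttl {
		return nil // recently validated; skip the expensive check
	}
	if err := check(key); err != nil {
		return err
	}

	c.mut.Lock()
	c.entries[key] = time.Now()
	if len(c.entries) > c.maxSize {
		// Don't let the cache grow without bound; drop stale entries.
		for k, v := range c.entries {
			if time.Since(v) > time.Minute {
				delete(c.entries, k)
			}
		}
	}
	c.mut.Unlock()
	return nil
}

// invalidate drops an entry, e.g. when a local change is detected.
func (c *validationCache) invalidate(key string) {
	c.mut.Lock()
	delete(c.entries, key)
	c.mut.Unlock()
}

func main() {
	cache := newValidationCache(time.Hour, 1000)
	check := func(key string) error {
		fmt.Println("expensive validation for", key)
		return nil
	}
	cache.validate("default/big.iso", check) // runs check
	cache.validate("default/big.iso", check) // cache hit, check skipped
	cache.invalidate("default/big.iso")      // local change detected
	cache.validate("default/big.iso", check) // runs check again
}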
@@ -767,7 +802,7 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
 
 	var reader io.ReaderAt
 	var err error
-	if lf.IsSymlink() {
+	if info, err := os.Lstat(fn); err == nil && info.Mode()&os.ModeSymlink != 0 {
 		target, _, err := symlinks.Read(fn)
 		if err != nil {
 			return nil, err
@@ -1048,7 +1083,7 @@ func sendIndexTo(initial bool, minLocalVer int64, conn protocol.Connection, fold
 		}
 
 		batch = append(batch, f)
-		currentBatchSize += indexPerFileSize + len(f.Blocks)*IndexPerBlockSize
+		currentBatchSize += indexPerFileSize + len(f.Blocks)*indexPerBlockSize
 		return true
 	})
 
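For context, this is where the constants from the first hunk are applied: each file contributes a fixed per-file estimate plus a per-block estimate, and the batch is cut off once the estimate reaches the target message size or the file-count cap. A simplified sketch of that accounting, using a hypothetical batchFiles helper and fileInfo type rather than the actual sendIndexTo implementation:

package main

import "fmt"

const (
	indexTargetSize   = 250 * 1024 // aim for ~250 KiB per Index/IndexUpdate message
	indexPerFileSize  = 250        // approximate size of a FileInfo, excluding blocks
	indexPerBlockSize = 40         // approximate size of a BlockInfo
	indexBatchSize    = 1000       // never send more files than this per message
)

type fileInfo struct {
	name   string
	blocks int
}

// batchFiles groups files so each batch stays under the estimated target
// size and the per-batch file cap, then hands each batch to send.
func batchFiles(files []fileInfo, send func([]fileInfo)) {
	var batch []fileInfo
	currentBatchSize := 0
	for _, f := range files {
		if len(batch) >= indexBatchSize || currentBatchSize >= indexTargetSize {
			send(batch)
			batch = nil
			currentBatchSize = 0
		}
		batch = append(batch, f)
		currentBatchSize += indexPerFileSize + f.blocks*indexPerBlockSize
	}
	if len(batch) > 0 {
		send(batch)
	}
}

func main() {
	// A 32 GiB file at 128 KiB blocks has about 262144 blocks, so its
	// estimated FileInfo size (250 + 262144*40 bytes) is far above the
	// target, which is why a large file tends to fill a batch on its own.
	files := []fileInfo{{"big.iso", 262144}, {"notes.txt", 1}}
	batchFiles(files, func(b []fileInfo) {
		fmt.Println("sending batch of", len(b), "files")
	})
}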
@@ -1071,6 +1106,11 @@ func (m *Model) updateLocals(folder string, fs []protocol.FileInfo) {
 	m.fmut.RLock()
 	m.folderFiles[folder].Update(protocol.LocalDeviceID, fs)
 	m.fmut.RUnlock()
+	m.rvmut.Lock()
+	for _, f := range fs {
+		delete(m.reqValidationCache, folder+"/"+f.Name)
+	}
+	m.rvmut.Unlock()
 
 	events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
 		"folder": folder,
@@ -10,6 +10,8 @@ package integration
 
 import (
 	"log"
+	"os"
+	"runtime"
 	"syscall"
 	"testing"
 	"time"
@@ -19,8 +21,23 @@ func TestBenchmarkTransferManyFiles(t *testing.T) {
 	benchmarkTransfer(t, 50000, 15)
 }
 
-func TestBenchmarkTransferLargeFiles(t *testing.T) {
-	benchmarkTransfer(t, 200, 28)
+func TestBenchmarkTransferLargeFile1G(t *testing.T) {
+	benchmarkTransfer(t, 1, 30)
+}
+func TestBenchmarkTransferLargeFile2G(t *testing.T) {
+	benchmarkTransfer(t, 1, 31)
+}
+func TestBenchmarkTransferLargeFile4G(t *testing.T) {
+	benchmarkTransfer(t, 1, 32)
+}
+func TestBenchmarkTransferLargeFile8G(t *testing.T) {
+	benchmarkTransfer(t, 1, 33)
+}
+func TestBenchmarkTransferLargeFile16G(t *testing.T) {
+	benchmarkTransfer(t, 1, 34)
+}
+func TestBenchmarkTransferLargeFile32G(t *testing.T) {
+	benchmarkTransfer(t, 1, 35)
 }
 
 func benchmarkTransfer(t *testing.T, files, sizeExp int) {
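The new test names encode the generated file size: the second argument to benchmarkTransfer is a size exponent, and the single-file benchmarks create one file of 1<<sizeExp bytes, so exponents 30 through 35 correspond to 1 GiB through 32 GiB. A trivial illustration of that mapping (the sizeForExp helper is ours, not part of the test suite):

package main

import "fmt"

// sizeForExp returns the benchmark file size in bytes for a size exponent,
// matching the 1<<uint(sizeExp) expression used when generating the file.
func sizeForExp(sizeExp int) int64 {
	return 1 << uint(sizeExp)
}

func main() {
	for exp := 30; exp <= 35; exp++ {
		fmt.Printf("sizeExp %d => %d GiB\n", exp, sizeForExp(exp)>>30)
	}
}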
@@ -31,7 +48,20 @@ func benchmarkTransfer(t *testing.T, files, sizeExp int) {
 	}
 
 	log.Println("Generating files...")
-	err = generateFiles("s1", files, sizeExp, "../LICENSE")
+	if files == 1 {
+		// Special case. Generate one file with the specified size exactly.
+		fd, err := os.Open("../LICENSE")
+		if err != nil {
+			t.Fatal(err)
+		}
+		err = os.MkdirAll("s1", 0755)
+		if err != nil {
+			t.Fatal(err)
+		}
+		err = generateOneFile(fd, "s1/onefile", 1<<uint(sizeExp))
+	} else {
+		err = generateFiles("s1", files, sizeExp, "../LICENSE")
+	}
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -39,6 +69,15 @@ func benchmarkTransfer(t *testing.T, files, sizeExp int) {
 	if err != nil {
 		t.Fatal(err)
 	}
+	var total int64
+	var nfiles int
+	for _, f := range expected {
+		total += f.size
+		if f.mode.IsRegular() {
+			nfiles++
+		}
+	}
+	log.Printf("Total %.01f MiB in %d files", float64(total)/1024/1024, nfiles)
 
 	log.Println("Starting sender...")
 	sender := syncthingProcess{ // id1
@@ -116,8 +155,11 @@ loop:
 		time.Sleep(250 * time.Millisecond)
 	}
 
-	sender.stop()
-	proc, err := receiver.stop()
+	sendProc, err := sender.stop()
+	if err != nil {
+		t.Fatal(err)
+	}
+	recvProc, err := receiver.stop()
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -134,10 +176,26 @@ loop:
 	}
 
 	log.Println("Result: Wall time:", t1.Sub(t0))
+	log.Printf("Result: %.1f MiB/s synced", float64(total)/1024/1024/t1.Sub(t0).Seconds())
 
-	if rusage, ok := proc.SysUsage().(*syscall.Rusage); ok {
-		log.Println("Result: Utime:", time.Duration(rusage.Utime.Nano()))
-		log.Println("Result: Stime:", time.Duration(rusage.Stime.Nano()))
-		log.Println("Result: MaxRSS:", rusage.Maxrss/1024, "KiB")
+	if rusage, ok := recvProc.SysUsage().(*syscall.Rusage); ok {
+		log.Println("Receiver: Utime:", time.Duration(rusage.Utime.Nano()))
+		log.Println("Receiver: Stime:", time.Duration(rusage.Stime.Nano()))
+		if runtime.GOOS == "darwin" {
+			// Darwin reports in bytes, Linux seems to report in KiB even
+			// though the manpage says otherwise.
+			rusage.Maxrss /= 1024
+		}
+		log.Println("Receiver: MaxRSS:", rusage.Maxrss, "KiB")
+	}
+	if rusage, ok := sendProc.SysUsage().(*syscall.Rusage); ok {
+		log.Println("Sender: Utime:", time.Duration(rusage.Utime.Nano()))
+		log.Println("Sender: Stime:", time.Duration(rusage.Stime.Nano()))
+		if runtime.GOOS == "darwin" {
+			// Darwin reports in bytes, Linux seems to report in KiB even
+			// though the manpage says otherwise.
+			rusage.Maxrss /= 1024
+		}
+		log.Println("Sender: MaxRSS:", rusage.Maxrss, "KiB")
 	}
 }
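The Darwin special case is duplicated for sender and receiver because ru_maxrss is reported in bytes on Darwin but in KiB on Linux. If one wanted to factor that out, a small helper along these lines would do; this is a suggestion, not part of the commit, and like the benchmark it is Unix-only.

package main

import (
	"fmt"
	"runtime"
	"syscall"
)

// maxRSSKiB normalizes Rusage.Maxrss to KiB: Darwin reports bytes, while
// Linux reports KiB (despite what the manpage says).
func maxRSSKiB(ru *syscall.Rusage) int64 {
	rss := int64(ru.Maxrss)
	if runtime.GOOS == "darwin" {
		rss /= 1024
	}
	return rss
}

func main() {
	var ru syscall.Rusage
	if err := syscall.Getrusage(syscall.RUSAGE_SELF, &ru); err == nil {
		fmt.Println("MaxRSS:", maxRSSKiB(&ru), "KiB")
	}
}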
test/util.go: 54 changed lines
@@ -60,38 +60,46 @@ func generateFiles(dir string, files, maxexp int, srcname string) error {
 			log.Fatal(err)
 		}
 
-		s := 1 << uint(rand.Intn(maxexp))
-		a := 128 * 1024
+		p1 := filepath.Join(p0, n)
+
+		s := int64(1 << uint(rand.Intn(maxexp)))
+		a := int64(128 * 1024)
 		if a > s {
 			a = s
 		}
-		s += rand.Intn(a)
+		s += rand.Int63n(a)
 
-		src := io.LimitReader(&inifiteReader{fd}, int64(s))
-
-		p1 := filepath.Join(p0, n)
-		dst, err := os.Create(p1)
-		if err != nil {
+		if err := generateOneFile(fd, p1, s); err != nil {
 			return err
 		}
+	}
 
-		_, err = io.Copy(dst, src)
-		if err != nil {
-			return err
-		}
+	return nil
+}
 
-		err = dst.Close()
-		if err != nil {
-			return err
-		}
+func generateOneFile(fd io.ReadSeeker, p1 string, s int64) error {
+	src := io.LimitReader(&inifiteReader{fd}, int64(s))
+	dst, err := os.Create(p1)
+	if err != nil {
+		return err
+	}
 
-		_ = os.Chmod(p1, os.FileMode(rand.Intn(0777)|0400))
+	_, err = io.Copy(dst, src)
+	if err != nil {
+		return err
+	}
 
-		t := time.Now().Add(-time.Duration(rand.Intn(30*86400)) * time.Second)
-		err = os.Chtimes(p1, t, t)
-		if err != nil {
-			return err
-		}
+	err = dst.Close()
+	if err != nil {
+		return err
+	}
+
+	_ = os.Chmod(p1, os.FileMode(rand.Intn(0777)|0400))
+
+	t := time.Now().Add(-time.Duration(rand.Intn(30*86400)) * time.Second)
+	err = os.Chtimes(p1, t, t)
+	if err != nil {
+		return err
 	}
 
 	return nil
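Note the type changes in this hunk: the size s becomes int64 and the random jitter uses rand.Int63n, presumably so sizes stay in range where int is 32 bits wide and to match generateOneFile's int64 parameter. A minimal illustration of the distinction (values chosen for the example, not taken from the tests):

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	// With int64 sizes, exponents past 31 remain representable even on
	// platforms where int is only 32 bits wide.
	s := int64(1) << 33          // 8 GiB target size
	s += rand.Int63n(128 * 1024) // up to 128 KiB of random jitter, still int64
	fmt.Println("target size in bytes:", s)
}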
@@ -367,6 +375,7 @@ type fileInfo struct {
 	mode os.FileMode
 	mod  int64
 	hash [16]byte
+	size int64
 }
 
 func (f fileInfo) String() string {
@@ -428,6 +437,7 @@ func startWalker(dir string, res chan<- fileInfo, abort <-chan struct{}) chan er
 				name: rn,
 				mode: info.Mode(),
 				mod:  info.ModTime().Unix(),
+				size: info.Size(),
 			}
 			sum, err := md5file(path)
 			if err != nil {