Track total block counts, count copier blocks

Will eventually allow us to track progress per file
This commit is contained in:
Audrius Butkevicius 2014-10-12 21:38:22 +01:00 committed by Jakob Borg
parent 4360b2c815
commit 53da778506
2 changed files with 29 additions and 13 deletions

View File

@ -448,7 +448,7 @@ FilesAreDifferent:
tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name))
realName := filepath.Join(p.dir, file.Name)
var reuse bool
reused := 0
var blocks []protocol.BlockInfo
// Check for an old temporary file which might have some blocks we could
@ -472,11 +472,10 @@ FilesAreDifferent:
}
}
// If any blocks could be reused, let the sharedpullerstate know
// which flags it is expected to set on the file.
if len(blocks) != len(file.Blocks) {
reuse = true
} else {
// The sharedpullerstate will know which flags to use when opening the
// temp file depending if we are reusing any blocks or not.
reused = len(file.Blocks) - len(blocks)
if reused == 0 {
// Otherwise, discard the file ourselves in order for the
// sharedpuller not to panic when it fails to exlusively create a
// file which already exists
@ -491,12 +490,13 @@ FilesAreDifferent:
folder: p.folder,
tempName: tempName,
realName: realName,
copyTotal: len(blocks),
copyNeeded: len(blocks),
reuse: reuse,
reused: reused,
}
if debug {
l.Debugf("%v need file %s; copy %d, reuse %v", p, file.Name, len(blocks), reuse)
l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused)
}
cs := copyBlocksState{
@ -591,6 +591,9 @@ nextFile:
if err != nil {
state.earlyClose("dst write", err)
}
if file == state.file.Name {
state.copiedFromOrigin()
}
return true
})
@ -605,8 +608,9 @@ nextFile:
block: block,
}
pullChan <- ps
} else {
state.copyDone()
}
state.copyDone()
}
fdCache.Evict(fdCache.Len())
close(evictionChan)

View File

@ -31,13 +31,16 @@ type sharedPullerState struct {
folder string
tempName string
realName string
reuse bool
reused int // Number of blocks reused from temporary file
// Mutable, must be locked for access
err error // The first error we hit
fd *os.File // The fd of the temp file
copyNeeded int // Number of copy actions we expect to happen
pullNeeded int // Number of block pulls we expect to happen
copyTotal int // Total number of copy actions for the whole job
pullTotal int // Total number of pull actions for the whole job
copyNeeded int // Number of copy actions still pending
pullNeeded int // Number of block pulls still pending
copyOrigin int // Number of blocks copied from the original file
closed bool // Set when the file has been closed
mut sync.Mutex // Protects the above
}
@ -79,7 +82,7 @@ func (s *sharedPullerState) tempFile() (*os.File, error) {
// Attempt to create the temp file
flags := os.O_WRONLY
if !s.reuse {
if s.reused == 0 {
flags |= os.O_CREATE | os.O_EXCL
}
fd, err := os.OpenFile(s.tempName, flags, 0644)
@ -154,8 +157,17 @@ func (s *sharedPullerState) copyDone() {
s.mut.Unlock()
}
func (s *sharedPullerState) copiedFromOrigin() {
s.mut.Lock()
s.copyOrigin++
s.mut.Unlock()
}
func (s *sharedPullerState) pullStarted() {
s.mut.Lock()
s.copyTotal--
s.copyNeeded--
s.pullTotal++
s.pullNeeded++
if debug {
l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)