From dec479532ebd929d1779608f94861a67230bf52a Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Wed, 7 Jan 2015 23:12:12 +0000 Subject: [PATCH] All roads lead to Finisher (fixes #1201) --- internal/model/progressemitter_test.go | 2 - internal/model/puller.go | 26 ++-- internal/model/puller_test.go | 169 +++++++++++++++++++++++ internal/model/sharedpullerstate.go | 28 ++-- internal/model/sharedpullerstate_test.go | 2 +- 5 files changed, 193 insertions(+), 34 deletions(-) diff --git a/internal/model/progressemitter_test.go b/internal/model/progressemitter_test.go index c22faaad1..8137fb89c 100644 --- a/internal/model/progressemitter_test.go +++ b/internal/model/progressemitter_test.go @@ -47,8 +47,6 @@ func expectTimeout(w *events.Subscription, t *testing.T) { } func TestProgressEmitter(t *testing.T) { - l.Debugln("test progress emitter") - w := events.Default.Subscribe(events.DownloadProgress) c := config.Wrap("/tmp/test", config.Configuration{}) diff --git a/internal/model/puller.go b/internal/model/puller.go index 0070d10c3..15856478c 100644 --- a/internal/model/puller.go +++ b/internal/model/puller.go @@ -613,17 +613,17 @@ func (p *Puller) shortcutSymlink(curFile, file protocol.FileInfo) { func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) { buf := make([]byte, protocol.BlockSize) -nextFile: for state := range in { + if p.progressEmitter != nil { + p.progressEmitter.Register(state.sharedPullerState) + } + dstFd, err := state.tempFile() if err != nil { // Nothing more to do for this failed file (the error was logged // when it happened) - continue nextFile - } - - if p.progressEmitter != nil { - p.progressEmitter.Register(state.sharedPullerState) + out <- state.sharedPullerState + continue } evictionChan := make(chan lfu.Eviction) @@ -687,7 +687,7 @@ nextFile: _, err = dstFd.WriteAt(buf, block.Offset) if err != nil { - state.earlyClose("dst write", err) + state.fail("dst write", err) } if file == state.file.Name { state.copiedFromOrigin() @@ -739,9 +739,9 @@ func (p *Puller) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPulle selected := activity.leastBusy(potentialDevices) if selected == (protocol.DeviceID{}) { if lastError != nil { - state.earlyClose("pull", lastError) + state.fail("pull", lastError) } else { - state.earlyClose("pull", errNoDevice) + state.fail("pull", errNoDevice) } break } @@ -767,13 +767,13 @@ func (p *Puller) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPulle // Save the block data we got from the cluster _, err = fd.WriteAt(buf, state.block.Offset) if err != nil { - state.earlyClose("save", err) + state.fail("save", err) } else { state.pullDone() - out <- state.sharedPullerState } break } + out <- state.sharedPullerState } } @@ -863,7 +863,9 @@ func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) { } p.queue.Done(state.file.Name) - p.performFinish(state) + if state.failed() == nil { + p.performFinish(state) + } p.model.receivedFile(p.folder, state.file.Name) if p.progressEmitter != nil { p.progressEmitter.Deregister(state) diff --git a/internal/model/puller_test.go b/internal/model/puller_test.go index d7576b46f..811ac9238 100644 --- a/internal/model/puller_test.go +++ b/internal/model/puller_test.go @@ -382,3 +382,172 @@ func TestLastResortPulling(t *testing.T) { (<-finisherChan).fd.Close() os.Remove(filepath.Join("testdata", defTempNamer.TempName("newfile"))) } + +func TestDeregisterOnFailInCopy(t *testing.T) { + file := protocol.FileInfo{ + Name: "filex", + Flags: 0, + Modified: 0, + Blocks: []protocol.BlockInfo{ + blocks[0], blocks[2], blocks[0], blocks[0], + blocks[5], blocks[0], blocks[0], blocks[8], + }, + } + defer os.Remove(defTempNamer.TempName("filex")) + + db, _ := leveldb.Open(storage.NewMemStorage(), nil) + cw := config.Wrap("/tmp/test", config.Configuration{}) + m := NewModel(cw, "device", "syncthing", "dev", db) + m.AddFolder(config.FolderConfiguration{ID: "default", Path: "testdata"}) + + emitter := NewProgressEmitter(cw) + go emitter.Serve() + + p := Puller{ + folder: "default", + dir: "testdata", + model: m, + queue: newJobQueue(), + progressEmitter: emitter, + } + + // queue.Done should be called by the finisher routine + p.queue.Push("filex") + p.queue.Pop() + + if len(p.queue.progress) != 1 { + t.Fatal("Expected file in progress") + } + + copyChan := make(chan copyBlocksState) + pullChan := make(chan pullBlockState) + finisherBufferChan := make(chan *sharedPullerState) + finisherChan := make(chan *sharedPullerState) + + go p.copierRoutine(copyChan, pullChan, finisherBufferChan) + go p.finisherRoutine(finisherChan) + + p.handleFile(file, copyChan, finisherChan) + + // Receive a block at puller, to indicate that atleast a single copier + // loop has been performed. + toPull := <-pullChan + // Wait until copier is trying to pass something down to the puller again + time.Sleep(100 * time.Millisecond) + // Close the file + toPull.sharedPullerState.fail("test", os.ErrNotExist) + // Unblock copier + <-pullChan + + select { + case state := <-finisherBufferChan: + // At this point the file should still be registered with both the job + // queue, and the progress emitter. Verify this. + if len(p.progressEmitter.registry) != 1 || len(p.queue.progress) != 1 || len(p.queue.queued) != 0 { + t.Fatal("Could not find file") + } + + // Pass the file down the real finisher, and give it time to consume + finisherChan <- state + time.Sleep(100 * time.Millisecond) + + if state.fd != nil { + t.Fatal("File not closed?") + } + + if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 { + t.Fatal("Still registered", len(p.progressEmitter.registry), len(p.queue.progress), len(p.queue.queued)) + } + + // Doing it again should have no effect + finisherChan <- state + time.Sleep(100 * time.Millisecond) + + if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 { + t.Fatal("Still registered") + } + case <-time.After(time.Second): + t.Fatal("Didn't get anything to the finisher") + } +} + +func TestDeregisterOnFailInPull(t *testing.T) { + file := protocol.FileInfo{ + Name: "filex", + Flags: 0, + Modified: 0, + Blocks: []protocol.BlockInfo{ + blocks[0], blocks[2], blocks[0], blocks[0], + blocks[5], blocks[0], blocks[0], blocks[8], + }, + } + defer os.Remove(defTempNamer.TempName("filex")) + + db, _ := leveldb.Open(storage.NewMemStorage(), nil) + cw := config.Wrap("/tmp/test", config.Configuration{}) + m := NewModel(cw, "device", "syncthing", "dev", db) + m.AddFolder(config.FolderConfiguration{ID: "default", Path: "testdata"}) + + emitter := NewProgressEmitter(cw) + go emitter.Serve() + + p := Puller{ + folder: "default", + dir: "testdata", + model: m, + queue: newJobQueue(), + progressEmitter: emitter, + } + + // queue.Done should be called by the finisher routine + p.queue.Push("filex") + p.queue.Pop() + + if len(p.queue.progress) != 1 { + t.Fatal("Expected file in progress") + } + + copyChan := make(chan copyBlocksState) + pullChan := make(chan pullBlockState) + finisherBufferChan := make(chan *sharedPullerState) + finisherChan := make(chan *sharedPullerState) + + go p.copierRoutine(copyChan, pullChan, finisherBufferChan) + go p.pullerRoutine(pullChan, finisherBufferChan) + go p.finisherRoutine(finisherChan) + + p.handleFile(file, copyChan, finisherChan) + + // Receove at finisher, we shoud error out as puller has nowhere to pull + // from. + select { + case state := <-finisherBufferChan: + // At this point the file should still be registered with both the job + // queue, and the progress emitter. Verify this. + if len(p.progressEmitter.registry) != 1 || len(p.queue.progress) != 1 || len(p.queue.queued) != 0 { + t.Fatal("Could not find file") + } + + // Pass the file down the real finisher, and give it time to consume + finisherChan <- state + time.Sleep(100 * time.Millisecond) + + if state.fd != nil { + t.Fatal("File not closed?") + } + + if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 { + t.Fatal("Still registered", len(p.progressEmitter.registry), len(p.queue.progress), len(p.queue.queued)) + } + + // Doing it again should have no effect + finisherChan <- state + time.Sleep(100 * time.Millisecond) + + if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 { + t.Fatal("Still registered") + } + case <-time.After(time.Second): + t.Fatal("Didn't get anything to the finisher") + } +} diff --git a/internal/model/sharedpullerstate.go b/internal/model/sharedpullerstate.go index 943e04a18..23836acac 100644 --- a/internal/model/sharedpullerstate.go +++ b/internal/model/sharedpullerstate.go @@ -43,7 +43,6 @@ type sharedPullerState struct { copyOrigin uint32 // Number of blocks copied from the original file copyNeeded uint32 // Number of copy actions still pending pullNeeded uint32 // Number of block pulls still pending - closed bool // Set when the file has been closed mut sync.Mutex // Protects the above } @@ -93,7 +92,7 @@ func (s *sharedPullerState) tempFile() (io.WriterAt, error) { // here. dir := filepath.Dir(s.tempName) if info, err := os.Stat(dir); err != nil { - s.earlyCloseLocked("dst stat dir", err) + s.failLocked("dst stat dir", err) return nil, err } else if info.Mode()&0200 == 0 { err := os.Chmod(dir, 0755) @@ -119,13 +118,13 @@ func (s *sharedPullerState) tempFile() (io.WriterAt, error) { // make sure we have write permissions on the file before opening it. err := os.Chmod(s.tempName, 0644) if err != nil { - s.earlyCloseLocked("dst create chmod", err) + s.failLocked("dst create chmod", err) return nil, err } } fd, err := os.OpenFile(s.tempName, flags, 0644) if err != nil { - s.earlyCloseLocked("dst create", err) + s.failLocked("dst create", err) return nil, err } @@ -148,7 +147,7 @@ func (s *sharedPullerState) sourceFile() (*os.File, error) { // Attempt to open the existing file fd, err := os.Open(s.realName) if err != nil { - s.earlyCloseLocked("src open", err) + s.failLocked("src open", err) return nil, err } @@ -158,24 +157,20 @@ func (s *sharedPullerState) sourceFile() (*os.File, error) { // earlyClose prints a warning message composed of the context and // error, and marks the sharedPullerState as failed. Is a no-op when called on // an already failed state. -func (s *sharedPullerState) earlyClose(context string, err error) { +func (s *sharedPullerState) fail(context string, err error) { s.mut.Lock() defer s.mut.Unlock() - s.earlyCloseLocked(context, err) + s.failLocked(context, err) } -func (s *sharedPullerState) earlyCloseLocked(context string, err error) { +func (s *sharedPullerState) failLocked(context string, err error) { if s.err != nil { return } l.Infof("Puller (folder %q, file %q): %s: %v", s.folder, s.file.Name, context, err) s.err = err - if s.fd != nil { - s.fd.Close() - } - s.closed = true } func (s *sharedPullerState) failed() error { @@ -230,21 +225,16 @@ func (s *sharedPullerState) finalClose() (bool, error) { s.mut.Lock() defer s.mut.Unlock() - if s.pullNeeded+s.copyNeeded != 0 { + if s.pullNeeded+s.copyNeeded != 0 && s.err == nil { // Not done yet. return false, nil } - if s.closed { - // Already handled. - return false, nil - } - s.closed = true if fd := s.fd; fd != nil { s.fd = nil return true, fd.Close() } - return true, nil + return false, nil } // Returns the momentarily progress for the puller diff --git a/internal/model/sharedpullerstate_test.go b/internal/model/sharedpullerstate_test.go index 2beda1e28..f01767ba6 100644 --- a/internal/model/sharedpullerstate_test.go +++ b/internal/model/sharedpullerstate_test.go @@ -86,5 +86,5 @@ func TestReadOnlyDir(t *testing.T) { t.Fatal("Unexpected nil fd") } - s.earlyClose("Test done", nil) + s.fail("Test done", nil) }