Performance: make filequeue not suck

This commit is contained in:
Jakob Borg 2014-01-23 16:39:12 +01:00
parent 01096fff6c
commit fc2ebc6cad
4 changed files with 55 additions and 33 deletions

View File

@ -30,7 +30,7 @@ EOT
mkdir files-$i mkdir files-$i
pushd files-$i >/dev/null pushd files-$i >/dev/null
../genfiles -maxexp 21 -files 400 ../genfiles -maxexp 21 -files 4000
../md5r > ../md5-$i ../md5r > ../md5-$i
popd >/dev/null popd >/dev/null
done done

View File

@ -18,6 +18,7 @@ type FileQueue struct {
fmut sync.Mutex // protects files and sorted fmut sync.Mutex // protects files and sorted
availability map[string][]string availability map[string][]string
amut sync.Mutex // protects availability amut sync.Mutex // protects availability
queued map[string]bool
} }
type queuedFile struct { type queuedFile struct {
@ -60,6 +61,7 @@ type queuedBlock struct {
func NewFileQueue() *FileQueue { func NewFileQueue() *FileQueue {
return &FileQueue{ return &FileQueue{
availability: make(map[string][]string), availability: make(map[string][]string),
queued: make(map[string]bool),
} }
} }
@ -67,10 +69,8 @@ func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
q.fmut.Lock() q.fmut.Lock()
defer q.fmut.Unlock() defer q.fmut.Unlock()
for _, f := range q.files { if q.queued[name] {
if f.name == name { return
panic("re-adding added file " + f.name)
}
} }
q.files = append(q.files, queuedFile{ q.files = append(q.files, queuedFile{
@ -81,6 +81,7 @@ func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
channel: make(chan content), channel: make(chan content),
monitor: monitor, monitor: monitor,
}) })
q.queued[name] = true
q.sorted = false q.sorted = false
} }
@ -116,6 +117,7 @@ func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
mon.FileDone() mon.FileDone()
} }
} }
delete(q.queued, qf.name)
q.deleteAt(i) q.deleteAt(i)
return queuedBlock{}, false return queuedBlock{}, false
} }
@ -159,6 +161,7 @@ func (q *FileQueue) Done(file string, offset int64, data []byte) {
err := qf.monitor.FileBegins(qf.channel) err := qf.monitor.FileBegins(qf.channel)
if err != nil { if err != nil {
log.Printf("WARNING: %s: %v (not synced)", qf.name, err) log.Printf("WARNING: %s: %v (not synced)", qf.name, err)
delete(q.queued, qf.name)
q.deleteAt(i) q.deleteAt(i)
return return
} }
@ -175,6 +178,7 @@ func (q *FileQueue) Done(file string, offset int64, data []byte) {
log.Printf("WARNING: %s: %v", qf.name, err) log.Printf("WARNING: %s: %v", qf.name, err)
} }
} }
delete(q.queued, qf.name)
q.deleteAt(i) q.deleteAt(i)
} }
return return
@ -183,18 +187,6 @@ func (q *FileQueue) Done(file string, offset int64, data []byte) {
panic("unreachable") panic("unreachable")
} }
func (q *FileQueue) Queued(file string) bool {
q.fmut.Lock()
defer q.fmut.Unlock()
for _, qf := range q.files {
if qf.name == file {
return true
}
}
return false
}
func (q *FileQueue) QueuedFiles() (files []string) { func (q *FileQueue) QueuedFiles() (files []string) {
q.fmut.Lock() q.fmut.Lock()
defer q.fmut.Unlock() defer q.fmut.Unlock()
@ -213,6 +205,7 @@ func (q *FileQueue) deleteFile(n string) {
for i, file := range q.files { for i, file := range q.files {
if n == file.name { if n == file.name {
q.deleteAt(i) q.deleteAt(i)
delete(q.queued, file.name)
return return
} }
} }

View File

@ -755,9 +755,7 @@ func (m *Model) recomputeNeed() {
m.gmut.RUnlock() m.gmut.RUnlock()
for _, ao := range toAdd { for _, ao := range toAdd {
if !m.fq.Queued(ao.n) { m.fq.Add(ao.n, ao.remote, ao.fm)
m.fq.Add(ao.n, ao.remote, ao.fm)
}
} }
for _, gf := range toDelete { for _, gf := range toDelete {
m.dq <- gf m.dq <- gf

View File

@ -385,10 +385,7 @@ func TestIgnoreWithUnknownFlags(t *testing.T) {
} }
} }
func prepareModel(n int, m *Model) []protocol.FileInfo { func genFiles(n int) []protocol.FileInfo {
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := make([]protocol.FileInfo, n) files := make([]protocol.FileInfo, n)
t := time.Now().Unix() t := time.Now().Unix()
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
@ -399,33 +396,39 @@ func prepareModel(n int, m *Model) []protocol.FileInfo {
} }
} }
m.Index("42", files)
return files return files
} }
func BenchmarkRecomputeGlobal10k(b *testing.B) { func BenchmarkIndex10000(b *testing.B) {
m := NewModel("testdata", 1e6) m := NewModel("testdata", 1e6)
prepareModel(10000, m) fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := genFiles(10000)
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
m.recomputeGlobal() m.Index("42", files)
} }
} }
func BenchmarkRecomputeNeed10K(b *testing.B) { func BenchmarkIndex00100(b *testing.B) {
m := NewModel("testdata", 1e6) m := NewModel("testdata", 1e6)
prepareModel(10000, m) fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := genFiles(100)
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
m.recomputeNeed() m.Index("42", files)
} }
} }
func BenchmarkIndexUpdate10000(b *testing.B) { func BenchmarkIndexUpdate10000f10000(b *testing.B) {
m := NewModel("testdata", 1e6) m := NewModel("testdata", 1e6)
files := prepareModel(10000, m) fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := genFiles(10000)
m.Index("42", files)
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
@ -433,6 +436,34 @@ func BenchmarkIndexUpdate10000(b *testing.B) {
} }
} }
func BenchmarkIndexUpdate10000f00100(b *testing.B) {
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := genFiles(10000)
m.Index("42", files)
ufiles := genFiles(100)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.IndexUpdate("42", ufiles)
}
}
func BenchmarkIndexUpdate10000f00001(b *testing.B) {
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := genFiles(10000)
m.Index("42", files)
ufiles := genFiles(1)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.IndexUpdate("42", ufiles)
}
}
type FakeConnection struct { type FakeConnection struct {
id string id string
requestData []byte requestData []byte