Test case to pinpoint DB corruption (failing)

Jakob Borg 2014-10-30 20:32:54 +01:00
parent 8449a65cdf
commit bccd21ac14
4 changed files with 144 additions and 281 deletions

internal/files/.gitignore (new file)
View File

@@ -0,0 +1 @@
testdata/*.db

View File

@@ -1,206 +0,0 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along
// with this program. If not, see <http://www.gnu.org/licenses/>.

package files_test

import (
    "fmt"
    "log"
    "math/rand"
    "os"
    "runtime"
    "sync"
    "testing"
    "time"

    "github.com/syncthing/syncthing/internal/files"
    "github.com/syncthing/syncthing/internal/protocol"
    "github.com/syndtr/goleveldb/leveldb"
    "github.com/syndtr/goleveldb/leveldb/opt"
)

func TestLongConcurrent(t *testing.T) {
    if testing.Short() || runtime.GOMAXPROCS(-1) < 4 {
        return
    }

    os.RemoveAll("/tmp/test.db")
    db, err := leveldb.OpenFile("/tmp/test.db", &opt.Options{CachedOpenFiles: 100})
    if err != nil {
        t.Fatal(err)
    }

    start := make(chan struct{})

    log.Println("preparing")
    var wg sync.WaitGroup
    for i := 0; i < runtime.GOMAXPROCS(-1); i++ {
        i := i
        rem0, rem1 := generateFiles()
        wg.Add(1)
        go func() {
            defer wg.Done()
            longConcurrentTest(db, fmt.Sprintf("folder%d", i), rem0, rem1, start)
        }()
    }

    log.Println("starting")
    close(start)
    wg.Wait()
}

func generateFiles() ([]protocol.FileInfo, []protocol.FileInfo) {
    var rem0, rem1 fileList

    for i := 0; i < 10000; i++ {
        n := rand.Int()
        rem0 = append(rem0, protocol.FileInfo{
            Name:    fmt.Sprintf("path/path/path/path/path/path/path%d/path%d/path%d/file%d", n, n, n, n),
            Version: uint64(rand.Int63()),
            Blocks:  genBlocks(rand.Intn(25)),
            Flags:   uint32(rand.Int31()),
        })
    }

    for i := 0; i < 10000; i++ {
        if i%2 == 0 {
            // Same file as rem0, randomly newer or older
            f := rem0[i]
            f.Version = uint64(rand.Int63())
            rem1 = append(rem1, f)
        } else {
            // Different file
            n := rand.Int()
            f := protocol.FileInfo{
                Name:    fmt.Sprintf("path/path/path/path/path/path/path%d/path%d/path%d/file%d", n, n, n, n),
                Version: uint64(rand.Int63()),
                Blocks:  genBlocks(rand.Intn(25)),
                Flags:   uint32(rand.Int31()),
            }
            rem1 = append(rem1, f)
        }
    }

    return rem0, rem1
}

func longConcurrentTest(db *leveldb.DB, folder string, rem0, rem1 []protocol.FileInfo, start chan struct{}) {
    s := files.NewSet(folder, db)

    <-start

    t0 := time.Now()
    cont := func() bool {
        return time.Since(t0) < 60*time.Second
    }

    log.Println(folder, "start")

    var wg sync.WaitGroup

    // Fast updater
    wg.Add(1)
    go func() {
        defer wg.Done()
        for cont() {
            log.Println(folder, "u0")
            for i := 0; i < 10000; i += 250 {
                s.Update(remoteDevice0, rem0[i:i+250])
            }
            time.Sleep(25 * time.Millisecond)
            s.Replace(remoteDevice0, nil)
            time.Sleep(25 * time.Millisecond)
        }
    }()

    // Fast updater
    wg.Add(1)
    go func() {
        defer wg.Done()
        for cont() {
            log.Println(folder, "u1")
            for i := 0; i < 10000; i += 250 {
                s.Update(remoteDevice1, rem1[i:i+250])
            }
            time.Sleep(25 * time.Millisecond)
            s.Replace(remoteDevice1, nil)
            time.Sleep(25 * time.Millisecond)
        }
    }()

    // Fast need list
    wg.Add(1)
    go func() {
        defer wg.Done()
        for cont() {
            needList(s, protocol.LocalDeviceID)
            time.Sleep(25 * time.Millisecond)
        }
    }()

    // Fast global list
    wg.Add(1)
    go func() {
        defer wg.Done()
        for cont() {
            globalList(s)
            time.Sleep(25 * time.Millisecond)
        }
    }()

    // Long running need lists
    go func() {
        for i := 0; i < 10; i++ {
            time.Sleep(25 * time.Millisecond)
            wg.Add(1)
            go func() {
                defer wg.Done()
                for cont() {
                    s.WithNeed(protocol.LocalDeviceID, func(intf protocol.FileIntf) bool {
                        time.Sleep(50 * time.Millisecond)
                        return cont()
                    })
                }
            }()
        }
    }()

    // Long running global lists
    go func() {
        for i := 0; i < 10; i++ {
            time.Sleep(25 * time.Millisecond)
            wg.Add(1)
            go func() {
                defer wg.Done()
                for cont() {
                    s.WithGlobal(func(intf protocol.FileIntf) bool {
                        time.Sleep(50 * time.Millisecond)
                        return cont()
                    })
                }
            }()
        }
    }()

    wg.Wait()
    log.Println(folder, "done")
}
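
One detail in the removed test worth calling out: the `i := i` statement at the top of the spawn loop deliberately shadows the loop variable so that each goroutine captures its own copy; in the Go versions of that era (before Go 1.22 changed loop-variable scoping), every closure would otherwise have shared a single `i`. A minimal standalone illustration of the idiom:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        i := i // capture a per-iteration copy; without this, all goroutines see the final i
        wg.Add(1)
        go func() {
            defer wg.Done()
            fmt.Println("worker", i)
        }()
    }
    wg.Wait()
}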

View File

@@ -2,7 +2,6 @@ package files_test

 import (
     "crypto/rand"
-    "fmt"
     "log"
     "os"
     "sync"
@@ -10,63 +9,40 @@ import (
     "time"

     "github.com/syndtr/goleveldb/leveldb"
+    "github.com/syndtr/goleveldb/leveldb/opt"
     "github.com/syndtr/goleveldb/leveldb/util"
 )

-var items map[string][]byte
-var keys map[string]string
+var keys [][]byte
+
+func init() {
+    for i := 0; i < nItems; i++ {
+        keys = append(keys, randomData(1))
+    }
+}

 const nItems = 10000

-func setupMaps() {
-    // Set up two simple maps, one "key" => data and one "indirect key" =>
-    // "key".
-    items = make(map[string][]byte, nItems)
-    keys = make(map[string]string, nItems)
-    for i := 0; i < nItems; i++ {
-        k1 := fmt.Sprintf("key%d", i)
-        data := make([]byte, 87)
-        _, err := rand.Reader.Read(data)
-        if err != nil {
-            panic(err)
-        }
-        items[k1] = data
-        k2 := fmt.Sprintf("indirect%d", i)
-        keys[k2] = k1
-    }
-}
-
-func makeK1(s string) []byte {
-    k1 := make([]byte, 1+len(s))
-    k1[0] = 1
-    copy(k1[1:], []byte(s))
-    return k1
-}
-
-func makeK2(s string) []byte {
-    k2 := make([]byte, 1+len(s))
-    k2[0] = 2 // Only difference from makeK1
-    copy(k2[1:], []byte(s))
-    return k2
-}
+func randomData(prefix byte) []byte {
+    data := make([]byte, 1+32+64+32)
+    _, err := rand.Reader.Read(data)
+    if err != nil {
+        panic(err)
+    }
+    return append([]byte{prefix}, data...)
+}

 func setItems(db *leveldb.DB) error {
-    snap, err := db.GetSnapshot()
-    if err != nil {
-        return err
-    }
-    defer snap.Release()
-
-    batch := &leveldb.Batch{}
-    for k2, k1 := range keys {
-        // Create k1 => item mapping first
-        batch.Put(makeK1(k1), items[k1])
-        // Then the k2 => k1 mapping
-        batch.Put(makeK2(k2), makeK1(k1))
+    batch := new(leveldb.Batch)
+    for _, k1 := range keys {
+        k2 := randomData(2)
+        // k2 -> data
+        batch.Put(k2, randomData(42))
+        // k1 -> k2
+        batch.Put(k1, k2)
+    }
+    if testing.Verbose() {
+        log.Printf("batch write (set) %p", batch)
     }
     return db.Write(batch, nil)
 }
@@ -78,77 +54,100 @@ func clearItems(db *leveldb.DB) error {
     }
     defer snap.Release()

-    // Iterate from the start of k2 space to the end
-    it := snap.NewIterator(&util.Range{Start: []byte{2}, Limit: []byte{2, 0xff, 0xff, 0xff, 0xff}}, nil)
+    // Iterate over k2
+    it := snap.NewIterator(util.BytesPrefix([]byte{1}), nil)
     defer it.Release()

-    batch := &leveldb.Batch{}
+    batch := new(leveldb.Batch)
     for it.Next() {
-        k2 := it.Key()
-        k1 := it.Value()
+        k1 := it.Key()
+        k2 := it.Value()

-        // k1 should exist
-        _, err := snap.Get(k1, nil)
+        // k2 should exist
+        _, err := snap.Get(k2, nil)
         if err != nil {
             return err
         }

-        // Delete the k2 => k1 mapping first
-        batch.Delete(k2)
-        // Then the k1 => key mapping
+        // Delete the k1 => k2 mapping first
         batch.Delete(k1)
+        // Then the k2 => data mapping
+        batch.Delete(k2)
+    }
+    if testing.Verbose() {
+        log.Printf("batch write (clear) %p", batch)
     }
     return db.Write(batch, nil)
 }

 func scanItems(db *leveldb.DB) error {
     snap, err := db.GetSnapshot()
+    if testing.Verbose() {
+        log.Printf("snap create %p", snap)
+    }
     if err != nil {
         return err
     }
-    defer snap.Release()
+    defer func() {
+        if testing.Verbose() {
+            log.Printf("snap release %p", snap)
+        }
+        snap.Release()
+    }()

     // Iterate from the start of k2 space to the end
-    it := snap.NewIterator(&util.Range{Start: []byte{2}, Limit: []byte{2, 0xff, 0xff, 0xff, 0xff}}, nil)
+    it := snap.NewIterator(util.BytesPrefix([]byte{1}), nil)
     defer it.Release()

+    i := 0
     for it.Next() {
         // k2 => k1 => data
-        k2 := it.Key()
-        k1 := it.Value()
-        _, err := snap.Get(k1, nil)
+        k1 := it.Key()
+        k2 := it.Value()
+        _, err := snap.Get(k2, nil)
         if err != nil {
-            log.Printf("k1: %q (%x)", k1, k1)
-            log.Printf("k2: %q (%x)", k2, k2)
+            log.Printf("k1: %x", k1)
+            log.Printf("k2: %x (missing)", k2)
             return err
         }
+        i++
+    }
+    if testing.Verbose() {
+        log.Println("scanned", i)
     }
     return nil
 }

-func TestConcurrent(t *testing.T) {
-    setupMaps()
+func TestConcurrentSetClear(t *testing.T) {
+    if testing.Short() {
+        return
+    }

-    dur := 2 * time.Second
+    dur := 30 * time.Second
     t0 := time.Now()
     var wg sync.WaitGroup

-    os.RemoveAll("testdata/global.db")
+    os.RemoveAll("testdata/concurrent-set-clear.db")

-    db, err := leveldb.OpenFile("testdata/global.db", nil)
+    db, err := leveldb.OpenFile("testdata/concurrent-set-clear.db", &opt.Options{CachedOpenFiles: 10})
     if err != nil {
         t.Fatal(err)
     }
-    defer os.RemoveAll("testdata/global.db")
+    defer os.RemoveAll("testdata/concurrent-set-clear.db")
+
+    errChan := make(chan error, 3)

     wg.Add(1)
     go func() {
         defer wg.Done()
         for time.Since(t0) < dur {
             if err := setItems(db); err != nil {
-                t.Fatal(err)
+                errChan <- err
+                return
             }
             if err := clearItems(db); err != nil {
-                t.Fatal(err)
+                errChan <- err
+                return
             }
         }
     }()
@@ -158,11 +157,71 @@ func TestConcurrent(t *testing.T) {
         defer wg.Done()
         for time.Since(t0) < dur {
             if err := scanItems(db); err != nil {
-                t.Fatal(err)
+                errChan <- err
+                return
             }
         }
     }()

-    wg.Wait()
+    go func() {
+        wg.Wait()
+        errChan <- nil
+    }()
+
+    err = <-errChan
+    if err != nil {
+        t.Error(err)
+    }

     db.Close()
 }
+
+func TestConcurrentSetOnly(t *testing.T) {
+    if testing.Short() {
+        return
+    }
+
+    dur := 30 * time.Second
+    t0 := time.Now()
+    var wg sync.WaitGroup
+
+    os.RemoveAll("testdata/concurrent-set-only.db")
+
+    db, err := leveldb.OpenFile("testdata/concurrent-set-only.db", &opt.Options{CachedOpenFiles: 10})
+    if err != nil {
+        t.Fatal(err)
+    }
+    defer os.RemoveAll("testdata/concurrent-set-only.db")
+
+    errChan := make(chan error, 3)
+
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        for time.Since(t0) < dur {
+            if err := setItems(db); err != nil {
+                errChan <- err
+                return
+            }
+        }
+    }()
+
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        for time.Since(t0) < dur {
+            if err := scanItems(db); err != nil {
+                errChan <- err
+                return
+            }
+        }
+    }()
+
+    go func() {
+        wg.Wait()
+        errChan <- nil
+    }()
+
+    err = <-errChan
+    if err != nil {
+        t.Error(err)
+    }
+}
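
The scheme the rewritten test hammers on is a two-level reference: a key with prefix 1 (k1) stores a key with prefix 2 (k2) as its value, and k2 stores random data; scanItems then walks every k1 in a snapshot and fails if the referenced k2 cannot be fetched, which is the dangling-reference symptom this commit is trying to pinpoint. Below is a self-contained sketch of the same set-then-verify cycle; the database path and the `main` wrapper are illustrative, not part of the test:

package main

import (
    "crypto/rand"
    "log"

    "github.com/syndtr/goleveldb/leveldb"
    "github.com/syndtr/goleveldb/leveldb/util"
)

// randomData returns random bytes carrying a one-byte type prefix.
func randomData(prefix byte) []byte {
    data := make([]byte, 1+32+64+32)
    if _, err := rand.Reader.Read(data); err != nil {
        panic(err)
    }
    return append([]byte{prefix}, data...)
}

func main() {
    db, err := leveldb.OpenFile("/tmp/example.db", nil) // illustrative path
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Write one k1 -> k2 -> data chain in a batch, as setItems does.
    k1 := randomData(1)
    k2 := randomData(2)
    batch := new(leveldb.Batch)
    batch.Put(k2, randomData(42)) // k2 -> data
    batch.Put(k1, k2)             // k1 -> k2
    if err := db.Write(batch, nil); err != nil {
        log.Fatal(err)
    }

    // Scan all k1 keys in a snapshot and verify each referenced k2 still
    // resolves, as scanItems does; a miss here would indicate corruption.
    snap, err := db.GetSnapshot()
    if err != nil {
        log.Fatal(err)
    }
    defer snap.Release()

    it := snap.NewIterator(util.BytesPrefix([]byte{1}), nil)
    defer it.Release()
    for it.Next() {
        if _, err := snap.Get(it.Value(), nil); err != nil {
            log.Fatalf("dangling reference: k1 %x -> k2 %x: %v", it.Key(), it.Value(), err)
        }
    }
    log.Println("all references intact")
}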

View File

@@ -174,6 +174,9 @@ func ldbGenericReplace(db *leveldb.DB, folder, device []byte, fs []protocol.File
     limit := deviceKey(folder, device, []byte{0xff, 0xff, 0xff, 0xff}) // after all folder/device files

     batch := new(leveldb.Batch)
+    if debugDB {
+        l.Debugf("new batch %p", batch)
+    }
     snap, err := db.GetSnapshot()
     if err != nil {
         panic(err)
@@ -335,6 +338,9 @@ func ldbUpdate(db *leveldb.DB, folder, device []byte, fs []protocol.FileInfo) ui
     runtime.GC()

     batch := new(leveldb.Batch)
+    if debugDB {
+        l.Debugf("new batch %p", batch)
+    }
     snap, err := db.GetSnapshot()
     if err != nil {
         panic(err)
@@ -963,7 +969,10 @@ func ldbCheckGlobals(db *leveldb.DB, folder []byte) {
     dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
     defer dbi.Release()

-    batch := &leveldb.Batch{}
+    batch := new(leveldb.Batch)
+    if debugDB {
+        l.Debugf("new batch %p", batch)
+    }
     for dbi.Next() {
         gk := dbi.Key()
         var vl versionList
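
`debugDB` and `l` are assumed here to be the package's existing debug switch and logger (they are not introduced by this diff); printing each batch's `%p` lets these writes be matched against the pointers logged by the test above. A rough sketch of how such a guard might be wired, assuming an STTRACE-style environment variable; the initialization below is illustrative, not the package's actual code:

package files

import (
    "log"
    "os"
    "strings"
)

// Illustrative stand-ins for the package's debug plumbing.
var (
    debugDB = strings.Contains(os.Getenv("STTRACE"), "db") // assumed switch
    l       = debugLogger{log.New(os.Stderr, "files: ", log.LstdFlags)}
)

type debugLogger struct{ *log.Logger }

// Debugf matches the call sites in the diff above.
func (d debugLogger) Debugf(format string, args ...interface{}) {
    d.Printf(format, args...)
}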