mirror of
https://github.com/syncthing/syncthing.git
synced 2024-11-16 10:28:49 -07:00
lib/db: Prevent IndexID creation race (#7211)
This commit is contained in:
parent
78a41828fc
commit
4a787986cd
@ -65,6 +65,7 @@ type Lowlevel struct {
|
||||
gcKeyCount int
|
||||
indirectGCInterval time.Duration
|
||||
recheckInterval time.Duration
|
||||
oneFileSetCreated chan struct{}
|
||||
}
|
||||
|
||||
func NewLowlevel(backend backend.Backend, opts ...Option) *Lowlevel {
|
||||
@ -78,6 +79,7 @@ func NewLowlevel(backend backend.Backend, opts ...Option) *Lowlevel {
|
||||
gcMut: sync.NewRWMutex(),
|
||||
indirectGCInterval: indirectGCDefaultInterval,
|
||||
recheckInterval: recheckDefaultInterval,
|
||||
oneFileSetCreated: make(chan struct{}),
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(db)
|
||||
|
@ -39,13 +39,27 @@ type FileSet struct {
|
||||
type Iterator func(f protocol.FileIntf) bool
|
||||
|
||||
func NewFileSet(folder string, fs fs.Filesystem, db *Lowlevel) *FileSet {
|
||||
return &FileSet{
|
||||
select {
|
||||
case <-db.oneFileSetCreated:
|
||||
default:
|
||||
close(db.oneFileSetCreated)
|
||||
}
|
||||
s := &FileSet{
|
||||
folder: folder,
|
||||
fs: fs,
|
||||
db: db,
|
||||
meta: db.loadMetadataTracker(folder),
|
||||
updateMutex: sync.NewMutex(),
|
||||
}
|
||||
if id := s.IndexID(protocol.LocalDeviceID); id == 0 {
|
||||
// No index ID set yet. We create one now.
|
||||
id = protocol.NewIndexID()
|
||||
err := s.db.setIndexID(protocol.LocalDeviceID[:], []byte(s.folder), id)
|
||||
if err != nil && !backend.IsClosed(err) {
|
||||
fatalError(err, fmt.Sprintf("%s Creating new IndexID", s.folder), s.db)
|
||||
}
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *FileSet) Drop(device protocol.DeviceID) {
|
||||
@ -357,16 +371,6 @@ func (s *FileSet) IndexID(device protocol.DeviceID) protocol.IndexID {
|
||||
} else if err != nil {
|
||||
fatalError(err, opStr, s.db)
|
||||
}
|
||||
if id == 0 && device == protocol.LocalDeviceID {
|
||||
// No index ID set yet. We create one now.
|
||||
id = protocol.NewIndexID()
|
||||
err := s.db.setIndexID(device[:], []byte(s.folder), id)
|
||||
if backend.IsClosed(err) {
|
||||
return 0
|
||||
} else if err != nil {
|
||||
fatalError(err, opStr, s.db)
|
||||
}
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
@ -433,7 +437,14 @@ func DropFolder(db *Lowlevel, folder string) {
|
||||
|
||||
// DropDeltaIndexIDs removes all delta index IDs from the database.
|
||||
// This will cause a full index transmission on the next connection.
|
||||
// Must be called before using FileSets, i.e. before NewFileSet is called for
|
||||
// the first time.
|
||||
func DropDeltaIndexIDs(db *Lowlevel) {
|
||||
select {
|
||||
case <-db.oneFileSetCreated:
|
||||
panic("DropDeltaIndexIDs must not be called after NewFileSet for the same Lowlevel")
|
||||
default:
|
||||
}
|
||||
opStr := "DropDeltaIndexIDs"
|
||||
l.Debugf(opStr)
|
||||
dbi, err := db.NewPrefixIterator([]byte{KeyTypeIndexID})
|
||||
|
@ -1757,6 +1757,32 @@ func TestNoIndexIDResetOnDrop(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrentIndexID(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
var ids [2]protocol.IndexID
|
||||
setID := func(s *db.FileSet, i int) {
|
||||
ids[i] = s.IndexID(protocol.LocalDeviceID)
|
||||
done <- struct{}{}
|
||||
}
|
||||
|
||||
max := 100
|
||||
if testing.Short() {
|
||||
max = 10
|
||||
}
|
||||
for i := 0; i < max; i++ {
|
||||
ldb := db.NewLowlevel(backend.OpenMemory())
|
||||
s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeFake, ""), ldb)
|
||||
go setID(s, 0)
|
||||
go setID(s, 1)
|
||||
<-done
|
||||
<-done
|
||||
ldb.Close()
|
||||
if ids[0] != ids[1] {
|
||||
t.Fatalf("IDs differ after %v rounds", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func replace(fs *db.FileSet, device protocol.DeviceID, files []protocol.FileInfo) {
|
||||
fs.Drop(device)
|
||||
fs.Update(device, files)
|
||||
|
Loading…
Reference in New Issue
Block a user