// Copyright (C) 2018 The Syncthing Authors. // // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this file, // You can obtain one at https://mozilla.org/MPL/2.0/. package db import ( "os" "strconv" "strings" "sync" "sync/atomic" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/storage" "github.com/syndtr/goleveldb/leveldb/util" ) const ( dbMaxOpenFiles = 100 dbWriteBuffer = 16 << 20 ) var ( dbFlushBatch = debugEnvValue("WriteBuffer", dbWriteBuffer) / 4 // Some leeway for any leveldb in-memory optimizations ) // Lowlevel is the lowest level database interface. It has a very simple // purpose: hold the actual *leveldb.DB database, and the in-memory state // that belong to that database. In the same way that a single on disk // database can only be opened once, there should be only one Lowlevel for // any given *leveldb.DB. type Lowlevel struct { committed int64 // atomic, must come first *leveldb.DB location string folderIdx *smallIndex deviceIdx *smallIndex closed bool closeMut *sync.RWMutex iterWG sync.WaitGroup } // Open attempts to open the database at the given location, and runs // recovery on it if opening fails. Worst case, if recovery is not possible, // the database is erased and created from scratch. func Open(location string) (*Lowlevel, error) { opts := &opt.Options{ BlockCacheCapacity: debugEnvValue("BlockCacheCapacity", 0), BlockCacheEvictRemoved: debugEnvValue("BlockCacheEvictRemoved", 0) != 0, BlockRestartInterval: debugEnvValue("BlockRestartInterval", 0), BlockSize: debugEnvValue("BlockSize", 0), CompactionExpandLimitFactor: debugEnvValue("CompactionExpandLimitFactor", 0), CompactionGPOverlapsFactor: debugEnvValue("CompactionGPOverlapsFactor", 0), CompactionL0Trigger: debugEnvValue("CompactionL0Trigger", 0), CompactionSourceLimitFactor: debugEnvValue("CompactionSourceLimitFactor", 0), CompactionTableSize: debugEnvValue("CompactionTableSize", 0), CompactionTableSizeMultiplier: float64(debugEnvValue("CompactionTableSizeMultiplier", 0)) / 10.0, CompactionTotalSize: debugEnvValue("CompactionTotalSize", 0), CompactionTotalSizeMultiplier: float64(debugEnvValue("CompactionTotalSizeMultiplier", 0)) / 10.0, DisableBufferPool: debugEnvValue("DisableBufferPool", 0) != 0, DisableBlockCache: debugEnvValue("DisableBlockCache", 0) != 0, DisableCompactionBackoff: debugEnvValue("DisableCompactionBackoff", 0) != 0, DisableLargeBatchTransaction: debugEnvValue("DisableLargeBatchTransaction", 0) != 0, NoSync: debugEnvValue("NoSync", 0) != 0, NoWriteMerge: debugEnvValue("NoWriteMerge", 0) != 0, OpenFilesCacheCapacity: debugEnvValue("OpenFilesCacheCapacity", dbMaxOpenFiles), WriteBuffer: debugEnvValue("WriteBuffer", dbWriteBuffer), // The write slowdown and pause can be overridden, but even if they // are not and the compaction trigger is overridden we need to // adjust so that we don't pause writes for L0 compaction before we // even *start* L0 compaction... WriteL0SlowdownTrigger: debugEnvValue("WriteL0SlowdownTrigger", 2*debugEnvValue("CompactionL0Trigger", opt.DefaultCompactionL0Trigger)), WriteL0PauseTrigger: debugEnvValue("WriteL0SlowdownTrigger", 3*debugEnvValue("CompactionL0Trigger", opt.DefaultCompactionL0Trigger)), } return open(location, opts) } // OpenRO attempts to open the database at the given location, read only. func OpenRO(location string) (*Lowlevel, error) { opts := &opt.Options{ OpenFilesCacheCapacity: dbMaxOpenFiles, ReadOnly: true, } return open(location, opts) } func open(location string, opts *opt.Options) (*Lowlevel, error) { db, err := leveldb.OpenFile(location, opts) if leveldbIsCorrupted(err) { db, err = leveldb.RecoverFile(location, opts) } if leveldbIsCorrupted(err) { // The database is corrupted, and we've tried to recover it but it // didn't work. At this point there isn't much to do beyond dropping // the database and reindexing... l.Infoln("Database corruption detected, unable to recover. Reinitializing...") if err := os.RemoveAll(location); err != nil { return nil, errorSuggestion{err, "failed to delete corrupted database"} } db, err = leveldb.OpenFile(location, opts) } if err != nil { return nil, errorSuggestion{err, "is another instance of Syncthing running?"} } if debugEnvValue("CompactEverything", 0) != 0 { if err := db.CompactRange(util.Range{}); err != nil { l.Warnln("Compacting database:", err) } } return NewLowlevel(db, location), nil } // OpenMemory returns a new Lowlevel referencing an in-memory database. func OpenMemory() *Lowlevel { db, _ := leveldb.Open(storage.NewMemStorage(), nil) return NewLowlevel(db, "") } // ListFolders returns the list of folders currently in the database func (db *Lowlevel) ListFolders() []string { return db.folderIdx.Values() } // Committed returns the number of items committed to the database since startup func (db *Lowlevel) Committed() int64 { return atomic.LoadInt64(&db.committed) } func (db *Lowlevel) Put(key, val []byte, wo *opt.WriteOptions) error { db.closeMut.RLock() defer db.closeMut.RUnlock() if db.closed { return leveldb.ErrClosed } atomic.AddInt64(&db.committed, 1) return db.DB.Put(key, val, wo) } func (db *Lowlevel) Write(batch *leveldb.Batch, wo *opt.WriteOptions) error { db.closeMut.RLock() defer db.closeMut.RUnlock() if db.closed { return leveldb.ErrClosed } return db.DB.Write(batch, wo) } func (db *Lowlevel) Delete(key []byte, wo *opt.WriteOptions) error { db.closeMut.RLock() defer db.closeMut.RUnlock() if db.closed { return leveldb.ErrClosed } atomic.AddInt64(&db.committed, 1) return db.DB.Delete(key, wo) } func (db *Lowlevel) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { return db.newIterator(func() iterator.Iterator { return db.DB.NewIterator(slice, ro) }) } // newIterator returns an iterator created with the given constructor only if db // is not yet closed. If it is closed, a closedIter is returned instead. func (db *Lowlevel) newIterator(constr func() iterator.Iterator) iterator.Iterator { db.closeMut.RLock() defer db.closeMut.RUnlock() if db.closed { return &closedIter{} } db.iterWG.Add(1) return &iter{ Iterator: constr(), db: db, } } func (db *Lowlevel) GetSnapshot() snapshot { s, err := db.DB.GetSnapshot() if err != nil { if err == leveldb.ErrClosed { return &closedSnap{} } panic(err) } return &snap{ Snapshot: s, db: db, } } func (db *Lowlevel) Close() { db.closeMut.Lock() if db.closed { db.closeMut.Unlock() return } db.closed = true db.closeMut.Unlock() db.iterWG.Wait() db.DB.Close() } // NewLowlevel wraps the given *leveldb.DB into a *lowlevel func NewLowlevel(db *leveldb.DB, location string) *Lowlevel { return &Lowlevel{ DB: db, location: location, folderIdx: newSmallIndex(db, []byte{KeyTypeFolderIdx}), deviceIdx: newSmallIndex(db, []byte{KeyTypeDeviceIdx}), closeMut: &sync.RWMutex{}, iterWG: sync.WaitGroup{}, } } // A "better" version of leveldb's errors.IsCorrupted. func leveldbIsCorrupted(err error) bool { switch { case err == nil: return false case errors.IsCorrupted(err): return true case strings.Contains(err.Error(), "corrupted"): return true } return false } type batch struct { *leveldb.Batch db *Lowlevel } func (db *Lowlevel) newBatch() *batch { return &batch{ Batch: new(leveldb.Batch), db: db, } } // checkFlush flushes and resets the batch if its size exceeds dbFlushBatch. func (b *batch) checkFlush() { if len(b.Dump()) > dbFlushBatch { b.flush() b.Reset() } } func (b *batch) flush() { if err := b.db.Write(b.Batch, nil); err != nil && err != leveldb.ErrClosed { panic(err) } } type closedIter struct{} func (it *closedIter) Release() {} func (it *closedIter) Key() []byte { return nil } func (it *closedIter) Value() []byte { return nil } func (it *closedIter) Next() bool { return false } func (it *closedIter) Prev() bool { return false } func (it *closedIter) First() bool { return false } func (it *closedIter) Last() bool { return false } func (it *closedIter) Seek(key []byte) bool { return false } func (it *closedIter) Valid() bool { return false } func (it *closedIter) Error() error { return leveldb.ErrClosed } func (it *closedIter) SetReleaser(releaser util.Releaser) {} type snapshot interface { Get([]byte, *opt.ReadOptions) ([]byte, error) Has([]byte, *opt.ReadOptions) (bool, error) NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator Release() } type closedSnap struct{} func (s *closedSnap) Get([]byte, *opt.ReadOptions) ([]byte, error) { return nil, leveldb.ErrClosed } func (s *closedSnap) Has([]byte, *opt.ReadOptions) (bool, error) { return false, leveldb.ErrClosed } func (s *closedSnap) NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator { return &closedIter{} } func (s *closedSnap) Release() {} type snap struct { *leveldb.Snapshot db *Lowlevel } func (s *snap) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { return s.db.newIterator(func() iterator.Iterator { return s.Snapshot.NewIterator(slice, ro) }) } // iter implements iterator.Iterator which allows tracking active iterators // and aborts if the underlying database is being closed. type iter struct { iterator.Iterator db *Lowlevel } func (it *iter) Release() { it.db.iterWG.Done() it.Iterator.Release() } func (it *iter) Next() bool { return it.execIfNotClosed(it.Iterator.Next) } func (it *iter) Prev() bool { return it.execIfNotClosed(it.Iterator.Prev) } func (it *iter) First() bool { return it.execIfNotClosed(it.Iterator.First) } func (it *iter) Last() bool { return it.execIfNotClosed(it.Iterator.Last) } func (it *iter) Seek(key []byte) bool { return it.execIfNotClosed(func() bool { return it.Iterator.Seek(key) }) } func (it *iter) execIfNotClosed(fn func() bool) bool { it.db.closeMut.RLock() defer it.db.closeMut.RUnlock() if it.db.closed { return false } return fn() } func debugEnvValue(key string, def int) int { v, err := strconv.ParseInt(os.Getenv("STDEBUG_"+key), 10, 63) if err != nil { return def } return int(v) }