mirror of
https://github.com/syncthing/syncthing.git
synced 2024-11-17 19:08:46 -07:00
65aaa607ab
Change made by: - running "gvt fetch" on each of the packages mentioned in Godeps/Godeps.json - `rm -rf Godeps` - tweaking the build scripts to not mention Godeps - tweaking the build scripts to test `./lib/...`, `./cmd/...` explicitly (to avoid testing vendor) - tweaking the build scripts to not juggle GOPATH for Godeps and instead set GO15VENDOREXPERIMENT. This also results in some updated packages at the same time I bet. Building with Go 1.3 and 1.4 still *works* but won't use our vendored dependencies - the user needs to have the actual packages in their GOPATH then, which they'll get with a normal "go get". Building with Go 1.6+ will get our vendored dependencies by default even when not using our build script, which is nice. By doing this we gain some freedom in that we can pick and choose manually what to include in vendor, as it's not based on just dependency analysis of our own code. This is also a risk as we might pick up dependencies we are unaware of, as the build may work locally with those packages present in GOPATH. On the other hand the build server will detect this as it has no packages in it's GOPATH beyond what is included in the repo. Recommended tool to manage dependencies is github.com/FiloSottile/gvt.
827 lines
17 KiB
Go
827 lines
17 KiB
Go
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
|
|
// All rights reserved.
|
|
//
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file.
|
|
|
|
package leveldb
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/errors"
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
"github.com/syndtr/goleveldb/leveldb/storage"
|
|
)
|
|
|
|
var (
|
|
errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting")
|
|
)
|
|
|
|
type cStat struct {
|
|
duration time.Duration
|
|
read int64
|
|
write int64
|
|
}
|
|
|
|
func (p *cStat) add(n *cStatStaging) {
|
|
p.duration += n.duration
|
|
p.read += n.read
|
|
p.write += n.write
|
|
}
|
|
|
|
func (p *cStat) get() (duration time.Duration, read, write int64) {
|
|
return p.duration, p.read, p.write
|
|
}
|
|
|
|
type cStatStaging struct {
|
|
start time.Time
|
|
duration time.Duration
|
|
on bool
|
|
read int64
|
|
write int64
|
|
}
|
|
|
|
func (p *cStatStaging) startTimer() {
|
|
if !p.on {
|
|
p.start = time.Now()
|
|
p.on = true
|
|
}
|
|
}
|
|
|
|
func (p *cStatStaging) stopTimer() {
|
|
if p.on {
|
|
p.duration += time.Since(p.start)
|
|
p.on = false
|
|
}
|
|
}
|
|
|
|
type cStats struct {
|
|
lk sync.Mutex
|
|
stats []cStat
|
|
}
|
|
|
|
func (p *cStats) addStat(level int, n *cStatStaging) {
|
|
p.lk.Lock()
|
|
if level >= len(p.stats) {
|
|
newStats := make([]cStat, level+1)
|
|
copy(newStats, p.stats)
|
|
p.stats = newStats
|
|
}
|
|
p.stats[level].add(n)
|
|
p.lk.Unlock()
|
|
}
|
|
|
|
func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) {
|
|
p.lk.Lock()
|
|
defer p.lk.Unlock()
|
|
if level < len(p.stats) {
|
|
return p.stats[level].get()
|
|
}
|
|
return
|
|
}
|
|
|
|
func (db *DB) compactionError() {
|
|
var err error
|
|
noerr:
|
|
// No error.
|
|
for {
|
|
select {
|
|
case err = <-db.compErrSetC:
|
|
switch {
|
|
case err == nil:
|
|
case err == ErrReadOnly, errors.IsCorrupted(err):
|
|
goto hasperr
|
|
default:
|
|
goto haserr
|
|
}
|
|
case _, _ = <-db.closeC:
|
|
return
|
|
}
|
|
}
|
|
haserr:
|
|
// Transient error.
|
|
for {
|
|
select {
|
|
case db.compErrC <- err:
|
|
case err = <-db.compErrSetC:
|
|
switch {
|
|
case err == nil:
|
|
goto noerr
|
|
case err == ErrReadOnly, errors.IsCorrupted(err):
|
|
goto hasperr
|
|
default:
|
|
}
|
|
case _, _ = <-db.closeC:
|
|
return
|
|
}
|
|
}
|
|
hasperr:
|
|
// Persistent error.
|
|
for {
|
|
select {
|
|
case db.compErrC <- err:
|
|
case db.compPerErrC <- err:
|
|
case db.writeLockC <- struct{}{}:
|
|
// Hold write lock, so that write won't pass-through.
|
|
db.compWriteLocking = true
|
|
case _, _ = <-db.closeC:
|
|
if db.compWriteLocking {
|
|
// We should release the lock or Close will hang.
|
|
<-db.writeLockC
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
type compactionTransactCounter int
|
|
|
|
func (cnt *compactionTransactCounter) incr() {
|
|
*cnt++
|
|
}
|
|
|
|
type compactionTransactInterface interface {
|
|
run(cnt *compactionTransactCounter) error
|
|
revert() error
|
|
}
|
|
|
|
func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
|
|
defer func() {
|
|
if x := recover(); x != nil {
|
|
if x == errCompactionTransactExiting {
|
|
if err := t.revert(); err != nil {
|
|
db.logf("%s revert error %q", name, err)
|
|
}
|
|
}
|
|
panic(x)
|
|
}
|
|
}()
|
|
|
|
const (
|
|
backoffMin = 1 * time.Second
|
|
backoffMax = 8 * time.Second
|
|
backoffMul = 2 * time.Second
|
|
)
|
|
var (
|
|
backoff = backoffMin
|
|
backoffT = time.NewTimer(backoff)
|
|
lastCnt = compactionTransactCounter(0)
|
|
|
|
disableBackoff = db.s.o.GetDisableCompactionBackoff()
|
|
)
|
|
for n := 0; ; n++ {
|
|
// Check wether the DB is closed.
|
|
if db.isClosed() {
|
|
db.logf("%s exiting", name)
|
|
db.compactionExitTransact()
|
|
} else if n > 0 {
|
|
db.logf("%s retrying N·%d", name, n)
|
|
}
|
|
|
|
// Execute.
|
|
cnt := compactionTransactCounter(0)
|
|
err := t.run(&cnt)
|
|
if err != nil {
|
|
db.logf("%s error I·%d %q", name, cnt, err)
|
|
}
|
|
|
|
// Set compaction error status.
|
|
select {
|
|
case db.compErrSetC <- err:
|
|
case perr := <-db.compPerErrC:
|
|
if err != nil {
|
|
db.logf("%s exiting (persistent error %q)", name, perr)
|
|
db.compactionExitTransact()
|
|
}
|
|
case _, _ = <-db.closeC:
|
|
db.logf("%s exiting", name)
|
|
db.compactionExitTransact()
|
|
}
|
|
if err == nil {
|
|
return
|
|
}
|
|
if errors.IsCorrupted(err) {
|
|
db.logf("%s exiting (corruption detected)", name)
|
|
db.compactionExitTransact()
|
|
}
|
|
|
|
if !disableBackoff {
|
|
// Reset backoff duration if counter is advancing.
|
|
if cnt > lastCnt {
|
|
backoff = backoffMin
|
|
lastCnt = cnt
|
|
}
|
|
|
|
// Backoff.
|
|
backoffT.Reset(backoff)
|
|
if backoff < backoffMax {
|
|
backoff *= backoffMul
|
|
if backoff > backoffMax {
|
|
backoff = backoffMax
|
|
}
|
|
}
|
|
select {
|
|
case <-backoffT.C:
|
|
case _, _ = <-db.closeC:
|
|
db.logf("%s exiting", name)
|
|
db.compactionExitTransact()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
type compactionTransactFunc struct {
|
|
runFunc func(cnt *compactionTransactCounter) error
|
|
revertFunc func() error
|
|
}
|
|
|
|
func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error {
|
|
return t.runFunc(cnt)
|
|
}
|
|
|
|
func (t *compactionTransactFunc) revert() error {
|
|
if t.revertFunc != nil {
|
|
return t.revertFunc()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) {
|
|
db.compactionTransact(name, &compactionTransactFunc{run, revert})
|
|
}
|
|
|
|
func (db *DB) compactionExitTransact() {
|
|
panic(errCompactionTransactExiting)
|
|
}
|
|
|
|
func (db *DB) compactionCommit(name string, rec *sessionRecord) {
|
|
db.compCommitLk.Lock()
|
|
defer db.compCommitLk.Unlock() // Defer is necessary.
|
|
db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
|
|
return db.s.commit(rec)
|
|
}, nil)
|
|
}
|
|
|
|
func (db *DB) memCompaction() {
|
|
mdb := db.getFrozenMem()
|
|
if mdb == nil {
|
|
return
|
|
}
|
|
defer mdb.decref()
|
|
|
|
db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size()))
|
|
|
|
// Don't compact empty memdb.
|
|
if mdb.Len() == 0 {
|
|
db.logf("memdb@flush skipping")
|
|
// drop frozen memdb
|
|
db.dropFrozenMem()
|
|
return
|
|
}
|
|
|
|
// Pause table compaction.
|
|
resumeC := make(chan struct{})
|
|
select {
|
|
case db.tcompPauseC <- (chan<- struct{})(resumeC):
|
|
case <-db.compPerErrC:
|
|
close(resumeC)
|
|
resumeC = nil
|
|
case _, _ = <-db.closeC:
|
|
return
|
|
}
|
|
|
|
var (
|
|
rec = &sessionRecord{}
|
|
stats = &cStatStaging{}
|
|
flushLevel int
|
|
)
|
|
|
|
// Generate tables.
|
|
db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
|
|
stats.startTimer()
|
|
flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
|
|
stats.stopTimer()
|
|
return
|
|
}, func() error {
|
|
for _, r := range rec.addedTables {
|
|
db.logf("memdb@flush revert @%d", r.num)
|
|
if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
rec.setJournalNum(db.journalFd.Num)
|
|
rec.setSeqNum(db.frozenSeq)
|
|
|
|
// Commit.
|
|
stats.startTimer()
|
|
db.compactionCommit("memdb", rec)
|
|
stats.stopTimer()
|
|
|
|
db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)
|
|
|
|
for _, r := range rec.addedTables {
|
|
stats.write += r.size
|
|
}
|
|
db.compStats.addStat(flushLevel, stats)
|
|
|
|
// Drop frozen memdb.
|
|
db.dropFrozenMem()
|
|
|
|
// Resume table compaction.
|
|
if resumeC != nil {
|
|
select {
|
|
case <-resumeC:
|
|
close(resumeC)
|
|
case _, _ = <-db.closeC:
|
|
return
|
|
}
|
|
}
|
|
|
|
// Trigger table compaction.
|
|
db.compTrigger(db.tcompCmdC)
|
|
}
|
|
|
|
type tableCompactionBuilder struct {
|
|
db *DB
|
|
s *session
|
|
c *compaction
|
|
rec *sessionRecord
|
|
stat0, stat1 *cStatStaging
|
|
|
|
snapHasLastUkey bool
|
|
snapLastUkey []byte
|
|
snapLastSeq uint64
|
|
snapIter int
|
|
snapKerrCnt int
|
|
snapDropCnt int
|
|
|
|
kerrCnt int
|
|
dropCnt int
|
|
|
|
minSeq uint64
|
|
strict bool
|
|
tableSize int
|
|
|
|
tw *tWriter
|
|
}
|
|
|
|
func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
|
|
// Create new table if not already.
|
|
if b.tw == nil {
|
|
// Check for pause event.
|
|
if b.db != nil {
|
|
select {
|
|
case ch := <-b.db.tcompPauseC:
|
|
b.db.pauseCompaction(ch)
|
|
case _, _ = <-b.db.closeC:
|
|
b.db.compactionExitTransact()
|
|
default:
|
|
}
|
|
}
|
|
|
|
// Create new table.
|
|
var err error
|
|
b.tw, err = b.s.tops.create()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Write key/value into table.
|
|
return b.tw.append(key, value)
|
|
}
|
|
|
|
func (b *tableCompactionBuilder) needFlush() bool {
|
|
return b.tw.tw.BytesLen() >= b.tableSize
|
|
}
|
|
|
|
func (b *tableCompactionBuilder) flush() error {
|
|
t, err := b.tw.finish()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
b.rec.addTableFile(b.c.sourceLevel+1, t)
|
|
b.stat1.write += t.size
|
|
b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
|
|
b.tw = nil
|
|
return nil
|
|
}
|
|
|
|
func (b *tableCompactionBuilder) cleanup() {
|
|
if b.tw != nil {
|
|
b.tw.drop()
|
|
b.tw = nil
|
|
}
|
|
}
|
|
|
|
func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error {
|
|
snapResumed := b.snapIter > 0
|
|
hasLastUkey := b.snapHasLastUkey // The key might has zero length, so this is necessary.
|
|
lastUkey := append([]byte{}, b.snapLastUkey...)
|
|
lastSeq := b.snapLastSeq
|
|
b.kerrCnt = b.snapKerrCnt
|
|
b.dropCnt = b.snapDropCnt
|
|
// Restore compaction state.
|
|
b.c.restore()
|
|
|
|
defer b.cleanup()
|
|
|
|
b.stat1.startTimer()
|
|
defer b.stat1.stopTimer()
|
|
|
|
iter := b.c.newIterator()
|
|
defer iter.Release()
|
|
for i := 0; iter.Next(); i++ {
|
|
// Incr transact counter.
|
|
cnt.incr()
|
|
|
|
// Skip until last state.
|
|
if i < b.snapIter {
|
|
continue
|
|
}
|
|
|
|
resumed := false
|
|
if snapResumed {
|
|
resumed = true
|
|
snapResumed = false
|
|
}
|
|
|
|
ikey := iter.Key()
|
|
ukey, seq, kt, kerr := parseInternalKey(ikey)
|
|
|
|
if kerr == nil {
|
|
shouldStop := !resumed && b.c.shouldStopBefore(ikey)
|
|
|
|
if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {
|
|
// First occurrence of this user key.
|
|
|
|
// Only rotate tables if ukey doesn't hop across.
|
|
if b.tw != nil && (shouldStop || b.needFlush()) {
|
|
if err := b.flush(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Creates snapshot of the state.
|
|
b.c.save()
|
|
b.snapHasLastUkey = hasLastUkey
|
|
b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...)
|
|
b.snapLastSeq = lastSeq
|
|
b.snapIter = i
|
|
b.snapKerrCnt = b.kerrCnt
|
|
b.snapDropCnt = b.dropCnt
|
|
}
|
|
|
|
hasLastUkey = true
|
|
lastUkey = append(lastUkey[:0], ukey...)
|
|
lastSeq = keyMaxSeq
|
|
}
|
|
|
|
switch {
|
|
case lastSeq <= b.minSeq:
|
|
// Dropped because newer entry for same user key exist
|
|
fallthrough // (A)
|
|
case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):
|
|
// For this user key:
|
|
// (1) there is no data in higher levels
|
|
// (2) data in lower levels will have larger seq numbers
|
|
// (3) data in layers that are being compacted here and have
|
|
// smaller seq numbers will be dropped in the next
|
|
// few iterations of this loop (by rule (A) above).
|
|
// Therefore this deletion marker is obsolete and can be dropped.
|
|
lastSeq = seq
|
|
b.dropCnt++
|
|
continue
|
|
default:
|
|
lastSeq = seq
|
|
}
|
|
} else {
|
|
if b.strict {
|
|
return kerr
|
|
}
|
|
|
|
// Don't drop corrupted keys.
|
|
hasLastUkey = false
|
|
lastUkey = lastUkey[:0]
|
|
lastSeq = keyMaxSeq
|
|
b.kerrCnt++
|
|
}
|
|
|
|
if err := b.appendKV(ikey, iter.Value()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := iter.Error(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Finish last table.
|
|
if b.tw != nil && !b.tw.empty() {
|
|
return b.flush()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *tableCompactionBuilder) revert() error {
|
|
for _, at := range b.rec.addedTables {
|
|
b.s.logf("table@build revert @%d", at.num)
|
|
if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
|
|
defer c.release()
|
|
|
|
rec := &sessionRecord{}
|
|
rec.addCompPtr(c.sourceLevel, c.imax)
|
|
|
|
if !noTrivial && c.trivial() {
|
|
t := c.levels[0][0]
|
|
db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
|
|
rec.delTable(c.sourceLevel, t.fd.Num)
|
|
rec.addTableFile(c.sourceLevel+1, t)
|
|
db.compactionCommit("table-move", rec)
|
|
return
|
|
}
|
|
|
|
var stats [2]cStatStaging
|
|
for i, tables := range c.levels {
|
|
for _, t := range tables {
|
|
stats[i].read += t.size
|
|
// Insert deleted tables into record
|
|
rec.delTable(c.sourceLevel+i, t.fd.Num)
|
|
}
|
|
}
|
|
sourceSize := int(stats[0].read + stats[1].read)
|
|
minSeq := db.minSeq()
|
|
db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)
|
|
|
|
b := &tableCompactionBuilder{
|
|
db: db,
|
|
s: db.s,
|
|
c: c,
|
|
rec: rec,
|
|
stat1: &stats[1],
|
|
minSeq: minSeq,
|
|
strict: db.s.o.GetStrict(opt.StrictCompaction),
|
|
tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
|
|
}
|
|
db.compactionTransact("table@build", b)
|
|
|
|
// Commit.
|
|
stats[1].startTimer()
|
|
db.compactionCommit("table", rec)
|
|
stats[1].stopTimer()
|
|
|
|
resultSize := int(stats[1].write)
|
|
db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)
|
|
|
|
// Save compaction stats
|
|
for i := range stats {
|
|
db.compStats.addStat(c.sourceLevel+1, &stats[i])
|
|
}
|
|
}
|
|
|
|
func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
|
|
db.logf("table@compaction range L%d %q:%q", level, umin, umax)
|
|
if level >= 0 {
|
|
if c := db.s.getCompactionRange(level, umin, umax, true); c != nil {
|
|
db.tableCompaction(c, true)
|
|
}
|
|
} else {
|
|
// Retry until nothing to compact.
|
|
for {
|
|
compacted := false
|
|
|
|
// Scan for maximum level with overlapped tables.
|
|
v := db.s.version()
|
|
m := 1
|
|
for i := m; i < len(v.levels); i++ {
|
|
tables := v.levels[i]
|
|
if tables.overlaps(db.s.icmp, umin, umax, false) {
|
|
m = i
|
|
}
|
|
}
|
|
v.release()
|
|
|
|
for level := 0; level < m; level++ {
|
|
if c := db.s.getCompactionRange(level, umin, umax, false); c != nil {
|
|
db.tableCompaction(c, true)
|
|
compacted = true
|
|
}
|
|
}
|
|
|
|
if !compacted {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) tableAutoCompaction() {
|
|
if c := db.s.pickCompaction(); c != nil {
|
|
db.tableCompaction(c, false)
|
|
}
|
|
}
|
|
|
|
func (db *DB) tableNeedCompaction() bool {
|
|
v := db.s.version()
|
|
defer v.release()
|
|
return v.needCompaction()
|
|
}
|
|
|
|
func (db *DB) pauseCompaction(ch chan<- struct{}) {
|
|
select {
|
|
case ch <- struct{}{}:
|
|
case _, _ = <-db.closeC:
|
|
db.compactionExitTransact()
|
|
}
|
|
}
|
|
|
|
type cCmd interface {
|
|
ack(err error)
|
|
}
|
|
|
|
type cAuto struct {
|
|
ackC chan<- error
|
|
}
|
|
|
|
func (r cAuto) ack(err error) {
|
|
if r.ackC != nil {
|
|
defer func() {
|
|
recover()
|
|
}()
|
|
r.ackC <- err
|
|
}
|
|
}
|
|
|
|
type cRange struct {
|
|
level int
|
|
min, max []byte
|
|
ackC chan<- error
|
|
}
|
|
|
|
func (r cRange) ack(err error) {
|
|
if r.ackC != nil {
|
|
defer func() {
|
|
recover()
|
|
}()
|
|
r.ackC <- err
|
|
}
|
|
}
|
|
|
|
// This will trigger auto compaction but will not wait for it.
|
|
func (db *DB) compTrigger(compC chan<- cCmd) {
|
|
select {
|
|
case compC <- cAuto{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// This will trigger auto compation and/or wait for all compaction to be done.
|
|
func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
|
|
ch := make(chan error)
|
|
defer close(ch)
|
|
// Send cmd.
|
|
select {
|
|
case compC <- cAuto{ch}:
|
|
case err = <-db.compErrC:
|
|
return
|
|
case _, _ = <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
// Wait cmd.
|
|
select {
|
|
case err = <-ch:
|
|
case err = <-db.compErrC:
|
|
case _, _ = <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Send range compaction request.
|
|
func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
|
|
ch := make(chan error)
|
|
defer close(ch)
|
|
// Send cmd.
|
|
select {
|
|
case compC <- cRange{level, min, max, ch}:
|
|
case err := <-db.compErrC:
|
|
return err
|
|
case _, _ = <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
// Wait cmd.
|
|
select {
|
|
case err = <-ch:
|
|
case err = <-db.compErrC:
|
|
case _, _ = <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (db *DB) mCompaction() {
|
|
var x cCmd
|
|
|
|
defer func() {
|
|
if x := recover(); x != nil {
|
|
if x != errCompactionTransactExiting {
|
|
panic(x)
|
|
}
|
|
}
|
|
if x != nil {
|
|
x.ack(ErrClosed)
|
|
}
|
|
db.closeW.Done()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case x = <-db.mcompCmdC:
|
|
switch x.(type) {
|
|
case cAuto:
|
|
db.memCompaction()
|
|
x.ack(nil)
|
|
x = nil
|
|
default:
|
|
panic("leveldb: unknown command")
|
|
}
|
|
case _, _ = <-db.closeC:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (db *DB) tCompaction() {
|
|
var x cCmd
|
|
var ackQ []cCmd
|
|
|
|
defer func() {
|
|
if x := recover(); x != nil {
|
|
if x != errCompactionTransactExiting {
|
|
panic(x)
|
|
}
|
|
}
|
|
for i := range ackQ {
|
|
ackQ[i].ack(ErrClosed)
|
|
ackQ[i] = nil
|
|
}
|
|
if x != nil {
|
|
x.ack(ErrClosed)
|
|
}
|
|
db.closeW.Done()
|
|
}()
|
|
|
|
for {
|
|
if db.tableNeedCompaction() {
|
|
select {
|
|
case x = <-db.tcompCmdC:
|
|
case ch := <-db.tcompPauseC:
|
|
db.pauseCompaction(ch)
|
|
continue
|
|
case _, _ = <-db.closeC:
|
|
return
|
|
default:
|
|
}
|
|
} else {
|
|
for i := range ackQ {
|
|
ackQ[i].ack(nil)
|
|
ackQ[i] = nil
|
|
}
|
|
ackQ = ackQ[:0]
|
|
select {
|
|
case x = <-db.tcompCmdC:
|
|
case ch := <-db.tcompPauseC:
|
|
db.pauseCompaction(ch)
|
|
continue
|
|
case _, _ = <-db.closeC:
|
|
return
|
|
}
|
|
}
|
|
if x != nil {
|
|
switch cmd := x.(type) {
|
|
case cAuto:
|
|
ackQ = append(ackQ, x)
|
|
case cRange:
|
|
x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
|
|
default:
|
|
panic("leveldb: unknown command")
|
|
}
|
|
x = nil
|
|
}
|
|
db.tableAutoCompaction()
|
|
}
|
|
}
|