154 lines
3.0 KiB
Go
154 lines
3.0 KiB
Go
package ops
|
|
|
|
import (
|
|
"git.kevincotugno.com/kcotugno/tacitus"
|
|
"git.kevincotugno.com/kcotugno/tacitus/gdax"
|
|
|
|
"sort"
|
|
"time"
|
|
)
|
|
|
|
type byTradeId []tacitus.Trade
|
|
|
|
func (t byTradeId) Len() int { return len(t) }
|
|
func (t byTradeId) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
|
|
func (t byTradeId) Less(i, j int) bool { return t[i].TradeId > t[j].TradeId }
|
|
|
|
type Validator struct {
|
|
Database tacitus.DatabaseClientService
|
|
Logger tacitus.Logger
|
|
Products []string
|
|
|
|
done chan struct{}
|
|
stopped bool
|
|
dirty bool
|
|
}
|
|
|
|
func (v *Validator) Stop() {
|
|
if v.done != nil && !v.stopped {
|
|
v.done <- struct{}{}
|
|
v.stopped = true
|
|
close(v.done)
|
|
}
|
|
}
|
|
|
|
func (v *Validator) Start(frequency time.Duration) {
|
|
if v.dirty {
|
|
return
|
|
}
|
|
|
|
v.dirty = true
|
|
v.done = make(chan struct{})
|
|
|
|
go func() {
|
|
ticker := time.NewTicker(frequency)
|
|
|
|
var done bool
|
|
|
|
var running bool
|
|
emitDone := make(chan struct{})
|
|
|
|
for !done {
|
|
select {
|
|
case <-v.done:
|
|
v.Logger.Info("validator: STOP")
|
|
|
|
done = true
|
|
ticker.Stop()
|
|
case <-ticker.C:
|
|
if running {
|
|
break
|
|
}
|
|
running = true
|
|
v.emitProducts(emitDone)
|
|
case <-emitDone:
|
|
running = false
|
|
}
|
|
}
|
|
|
|
v.done = nil
|
|
}()
|
|
}
|
|
|
|
func (v *Validator) emitProducts(done chan struct{}) {
|
|
go func() {
|
|
for _, p := range v.Products {
|
|
v.validateProduct(p)
|
|
}
|
|
|
|
select {
|
|
case done <- struct{}{}:
|
|
default:
|
|
}
|
|
}()
|
|
|
|
}
|
|
|
|
func (v *Validator) validateProduct(product string) {
|
|
v.Logger.Info("validator: %v", product)
|
|
|
|
conf, _ := v.Database.ConfirmationService().Confirmation(product, "t")
|
|
|
|
groups, last_id := v.findMissingGroups(product, conf.LastId)
|
|
v.getMissingTrades(product, groups)
|
|
|
|
conf.Product = product
|
|
conf.Type = "t"
|
|
conf.LastId = last_id
|
|
if conf.Id == 0 {
|
|
v.Database.ConfirmationService().CreateConfirmation(conf)
|
|
} else {
|
|
v.Database.ConfirmationService().UpdateConfirmation(conf)
|
|
}
|
|
|
|
v.Logger.Info("validator: DONE %v", product)
|
|
}
|
|
|
|
func (v *Validator) getMissingTrades(product string, groups [][]int) {
|
|
c := gdax.NewPublicClient()
|
|
|
|
for _, group := range groups {
|
|
total := 1 + group[1] - group[0]
|
|
v.Logger.Info("validator: retrieving %v missing trade(s): %v", total, group)
|
|
|
|
for i := group[1]; i >= group[0]; i-- {
|
|
ts, _ := c.GetTradesBefore(product, i+1)
|
|
sort.Sort(byTradeId(ts))
|
|
|
|
for _, t := range ts {
|
|
if t.TradeId < group[0] && t.TradeId > group[1] {
|
|
i = ts[len(ts)-1].TradeId
|
|
} else {
|
|
v.Database.TradeService().CreateTrade(t)
|
|
i = t.TradeId
|
|
}
|
|
}
|
|
}
|
|
|
|
v.Logger.Info("validator: DONE")
|
|
}
|
|
}
|
|
|
|
func (v *Validator) findMissingGroups(product string, starting int) ([][]int, int) {
|
|
results, err := v.Database.TradeService().TradesAfterAll(product, starting)
|
|
if err != nil {
|
|
v.Logger.Info("Error getting all trades: %v", err)
|
|
}
|
|
|
|
var trade tacitus.Trade
|
|
missing := [][]int{}
|
|
current := starting
|
|
|
|
for results.Next() {
|
|
trade = results.Value().(tacitus.Trade)
|
|
|
|
if trade.TradeId != current+1 && trade.TradeId != current {
|
|
missing = append(missing, []int{current + 1, trade.TradeId - 1})
|
|
}
|
|
|
|
current = trade.TradeId
|
|
}
|
|
|
|
return missing, current
|
|
}
|