This repository has been archived on 2022-11-30. You can view files and clone it, but cannot push or open issues or pull requests.
tacitus/ops/aggregation_validator.go

183 lines
4.0 KiB
Go
Raw Normal View History

2017-10-10 18:32:26 -07:00
package ops
import (
"github.com/kcotugno/tacitus"
2017-10-13 09:34:35 -07:00
"sort"
2017-10-10 18:32:26 -07:00
"time"
)
2017-10-13 09:34:35 -07:00
type timeSlice []time.Time
func (t timeSlice) Len() int { return len(t) }
func (t timeSlice) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t timeSlice) Less(i, j int) bool { return t[i].Before(t[j]) }
2017-10-10 18:32:26 -07:00
type AggregationValidator struct {
2017-10-13 09:34:35 -07:00
Database tacitus.DatabaseClientService
Logger tacitus.Logger
Products []string
Intervals []time.Duration
2017-10-10 18:32:26 -07:00
ticker *time.Ticker
done chan bool
2017-10-13 09:34:35 -07:00
running bool
2017-10-10 18:32:26 -07:00
}
func (a *AggregationValidator) Start(frequency time.Duration) {
a.Stop()
go func() {
var done bool
a.ticker = time.NewTicker(frequency)
2017-10-13 09:34:35 -07:00
ticker := a.ticker
2017-10-10 18:32:26 -07:00
a.done = make(chan bool)
2017-10-13 09:34:35 -07:00
stop := a.done
2017-10-10 18:32:26 -07:00
2017-10-17 07:12:24 -07:00
zero := time.NewTicker(100 * frequency)
2017-10-10 18:32:26 -07:00
for !done {
select {
2017-10-13 09:34:35 -07:00
case <-stop:
ticker.Stop()
2017-10-17 07:12:24 -07:00
zero.Stop()
2017-10-10 18:32:26 -07:00
done = true
2017-10-17 07:12:24 -07:00
case <-zero.C:
a.emitZero()
2017-10-13 09:34:35 -07:00
case <-ticker.C:
2017-10-10 18:32:26 -07:00
a.emitProducts()
}
}
}()
}
func (a *AggregationValidator) Stop() {
if a.done != nil {
a.done <- true
close(a.done)
}
}
func (a *AggregationValidator) emitProducts() {
for _, p := range a.Products {
2017-10-13 09:34:35 -07:00
for _, i := range a.Intervals {
go a.validate(p, i)
}
2017-10-10 18:32:26 -07:00
}
}
2017-10-13 09:34:35 -07:00
func (a *AggregationValidator) validate(product string, interval time.Duration) {
var agg Aggregator
agg.Database = a.Database
agg.Logger = a.Logger
a.Logger.Info(`aggregation validator: product="%v" interval="%v"`, product, interval)
2017-10-10 18:32:26 -07:00
conf, _ := a.Database.ConfirmationService().Confirmation(product, "a")
2017-10-13 09:34:35 -07:00
times, last_id, err := a.getTimes(product, interval, conf.LastId)
if err != nil {
a.Logger.Info(`aggregation validator: FAIL product="%v interval="%v" error="%v"`,
product, interval, err)
return
}
a.Logger.Info("aggregation validator: %v missing: %v", product, len(times))
2017-10-10 18:32:26 -07:00
for _, t := range times {
2017-10-13 09:34:35 -07:00
agg.aggregate(product, t, interval)
2017-10-10 18:32:26 -07:00
}
conf.Product = product
conf.Type = "a"
conf.LastId = last_id
if conf.Id == 0 {
a.Database.ConfirmationService().CreateConfirmation(conf)
} else {
a.Database.ConfirmationService().UpdateConfirmation(conf)
}
a.Logger.Info(`aggregation validator: DONE product="%v interval="%v"`,
2017-10-13 09:34:35 -07:00
product, interval)
2017-10-10 18:32:26 -07:00
}
2017-10-13 09:34:35 -07:00
func (a *AggregationValidator) getTimes(product string, interval time.Duration,
last int) ([]time.Time, int, error) {
2017-10-10 18:32:26 -07:00
tmp := make(map[time.Time]struct{})
2017-10-13 09:34:35 -07:00
results, err := a.Database.TradeService().
TradesWhereResults("product = $1 AND id > $2 ORDER BY id ASC", product, last)
2017-10-10 18:32:26 -07:00
if err != nil {
a.Logger.Info(`aggregation validator: failed to retrive times error="%v"`, err)
2017-10-13 09:34:35 -07:00
return nil, 0, err
2017-10-10 18:32:26 -07:00
}
2017-10-13 09:34:35 -07:00
var trade tacitus.Trade
for results.Next() {
trade = results.Value().(tacitus.Trade)
time := trade.Timestamp.Truncate(interval).Add(interval)
2017-10-10 18:32:26 -07:00
tmp[time] = struct{}{}
2017-10-13 09:34:35 -07:00
last = trade.Id
2017-10-10 18:32:26 -07:00
}
2017-10-13 09:34:35 -07:00
var i int
2017-10-17 07:12:24 -07:00
times := make([]time.Time, len(tmp))
2017-10-10 18:32:26 -07:00
for k := range tmp {
2017-10-17 07:12:24 -07:00
times[i] = k
i++
}
sort.Sort(timeSlice(times))
return times, last, nil
}
func (a *AggregationValidator) emitZero() {
for _, p := range a.Products {
for _, i := range a.Intervals {
go a.checkZero(p, i)
}
}
}
func (a *AggregationValidator) checkZero(product string, interval time.Duration) {
a.Logger.Info("aggregation validator: START missing aggregations. "+
2017-10-17 07:12:24 -07:00
"product: %v. interval: %v", product, interval)
missing, err := a.Database.AggregationService().MissingTimes(int(interval.Seconds()),
product)
if err != nil {
return
}
for missing.Next() {
agg := missing.Value().(tacitus.Aggregation)
agg.Interval = int(interval.Seconds())
last := agg.Timestamp.Add(-interval)
previous, err := a.Database.AggregationService().
Aggregation(int(interval.Seconds()), product, last)
if err != nil {
a.Logger.Info("aggregation validator: FAIL missing aggregations. "+
"product: %v. interval: %v", product, interval)
missing.Close()
return
}
agg.Price = previous.Price
2017-10-17 07:12:24 -07:00
a.Database.AggregationService().CreateAggregation(agg)
2017-10-10 18:32:26 -07:00
}
missing.Close()
a.Logger.Info("aggregation validator: DONE missing aggregations. "+
2017-10-17 07:12:24 -07:00
"product: %v. interval: %v", product, interval)
2017-10-10 18:32:26 -07:00
}