This repository has been archived on 2022-11-30. You can view files and clone it, but cannot push or open issues or pull requests.
tacitus/ops/aggregation_validator.go

203 lines
4.4 KiB
Go
Raw Normal View History

2017-10-10 18:32:26 -07:00
package ops
import (
"github.com/kcotugno/tacitus"
2017-10-13 09:34:35 -07:00
"sort"
2017-10-10 18:32:26 -07:00
"time"
)
2017-10-13 09:34:35 -07:00
type timeSlice []time.Time
func (t timeSlice) Len() int { return len(t) }
func (t timeSlice) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t timeSlice) Less(i, j int) bool { return t[i].Before(t[j]) }
2017-10-10 18:32:26 -07:00
type AggregationValidator struct {
2017-10-13 09:34:35 -07:00
Database tacitus.DatabaseClientService
Logger tacitus.Logger
Products []string
Intervals []time.Duration
2017-10-10 18:32:26 -07:00
2017-10-20 17:02:53 -07:00
done chan struct{}
stopped bool
dirty bool
2017-10-10 18:32:26 -07:00
}
func (a *AggregationValidator) Start(frequency time.Duration) {
2017-10-20 17:02:53 -07:00
if a.dirty {
return
}
a.dirty = true
a.done = make(chan struct{})
2017-10-10 18:32:26 -07:00
go func() {
2017-10-20 17:02:53 -07:00
ticker := time.NewTicker(frequency)
zero := time.NewTicker(100 * frequency)
2017-10-10 18:32:26 -07:00
var done bool
2017-10-20 17:02:53 -07:00
var running int
2017-10-10 18:32:26 -07:00
2017-10-20 17:02:53 -07:00
notify := make(chan int, 256)
2017-10-10 18:32:26 -07:00
for !done {
select {
2017-10-20 17:02:53 -07:00
case <-a.done:
2017-10-13 09:34:35 -07:00
ticker.Stop()
2017-10-17 07:12:24 -07:00
zero.Stop()
2017-10-10 18:32:26 -07:00
done = true
2017-10-17 07:12:24 -07:00
case <-zero.C:
2017-10-20 17:02:53 -07:00
a.emitZero(notify)
2017-10-13 09:34:35 -07:00
case <-ticker.C:
2017-10-20 17:02:53 -07:00
if running == 0 {
a.emitProducts(notify)
} else {
a.Logger.Info("aggregation validator: already running")
}
case i := <-notify:
running = running + i
2017-10-10 18:32:26 -07:00
}
}
}()
}
func (a *AggregationValidator) Stop() {
2017-10-20 17:02:53 -07:00
if a.done != nil && !a.stopped {
a.done <- struct{}{}
a.stopped = true
2017-10-10 18:32:26 -07:00
close(a.done)
}
}
2017-10-20 17:02:53 -07:00
func (a *AggregationValidator) emitProducts(status chan int) {
2017-10-10 18:32:26 -07:00
for _, p := range a.Products {
2017-10-13 09:34:35 -07:00
for _, i := range a.Intervals {
2017-10-20 17:02:53 -07:00
status <- 1
go a.validate(p, i, status)
2017-10-13 09:34:35 -07:00
}
2017-10-10 18:32:26 -07:00
}
}
2017-10-20 17:02:53 -07:00
func (a *AggregationValidator) validate(product string, interval time.Duration,
status chan<- int) {
defer func() { status <- -1 }()
2017-10-13 09:34:35 -07:00
var agg Aggregator
agg.Database = a.Database
agg.Logger = a.Logger
a.Logger.Info(`aggregation validator: product="%v" interval="%v"`, product, interval)
2017-10-10 18:32:26 -07:00
conf, _ := a.Database.ConfirmationService().Confirmation(product, "a")
2017-10-13 09:34:35 -07:00
times, last_id, err := a.getTimes(product, interval, conf.LastId)
if err != nil {
a.Logger.Info(`aggregation validator: FAIL product="%v interval="%v" error="%v"`,
product, interval, err)
return
}
a.Logger.Info("aggregation validator: %v missing: %v", product, len(times))
2017-10-10 18:32:26 -07:00
for _, t := range times {
2017-10-13 09:34:35 -07:00
agg.aggregate(product, t, interval)
2017-10-10 18:32:26 -07:00
}
conf.Product = product
conf.Type = "a"
conf.LastId = last_id
if conf.Id == 0 {
a.Database.ConfirmationService().CreateConfirmation(conf)
} else {
a.Database.ConfirmationService().UpdateConfirmation(conf)
}
a.Logger.Info(`aggregation validator: DONE product="%v interval="%v"`,
2017-10-13 09:34:35 -07:00
product, interval)
2017-10-10 18:32:26 -07:00
}
2017-10-13 09:34:35 -07:00
func (a *AggregationValidator) getTimes(product string, interval time.Duration,
last int) ([]time.Time, int, error) {
2017-10-10 18:32:26 -07:00
tmp := make(map[time.Time]struct{})
2017-10-13 09:34:35 -07:00
results, err := a.Database.TradeService().
TradesWhereResults("product = $1 AND id > $2 ORDER BY id ASC", product, last)
2017-10-10 18:32:26 -07:00
if err != nil {
a.Logger.Info(`aggregation validator: failed to retrive times error="%v"`, err)
2017-10-13 09:34:35 -07:00
return nil, 0, err
2017-10-10 18:32:26 -07:00
}
2017-10-13 09:34:35 -07:00
var trade tacitus.Trade
for results.Next() {
trade = results.Value().(tacitus.Trade)
time := trade.Timestamp.Truncate(interval).Add(interval)
2017-10-10 18:32:26 -07:00
tmp[time] = struct{}{}
2017-10-13 09:34:35 -07:00
last = trade.Id
2017-10-10 18:32:26 -07:00
}
2017-10-13 09:34:35 -07:00
var i int
2017-10-17 07:12:24 -07:00
times := make([]time.Time, len(tmp))
2017-10-10 18:32:26 -07:00
for k := range tmp {
2017-10-17 07:12:24 -07:00
times[i] = k
i++
}
sort.Sort(timeSlice(times))
return times, last, nil
}
2017-10-20 17:02:53 -07:00
func (a *AggregationValidator) emitZero(status chan int) {
2017-10-17 07:12:24 -07:00
for _, p := range a.Products {
for _, i := range a.Intervals {
2017-10-20 17:02:53 -07:00
status <- 1
go a.checkZero(p, i, status)
2017-10-17 07:12:24 -07:00
}
}
}
2017-10-20 17:02:53 -07:00
func (a *AggregationValidator) checkZero(product string, interval time.Duration,
status chan<- int) {
defer func() { status <- -1 }()
a.Logger.Info("aggregation validator: START missing aggregations. "+
2017-10-17 07:12:24 -07:00
"product: %v. interval: %v", product, interval)
missing, err := a.Database.AggregationService().MissingTimes(int(interval.Seconds()),
product)
if err != nil {
return
}
for missing.Next() {
agg := missing.Value().(tacitus.Aggregation)
agg.Interval = int(interval.Seconds())
last := agg.Timestamp.Add(-interval)
previous, err := a.Database.AggregationService().
Aggregation(int(interval.Seconds()), product, last)
if err != nil {
a.Logger.Info("aggregation validator: FAIL missing aggregations. "+
"product: %v. interval: %v", product, interval)
missing.Close()
return
}
agg.Price = previous.Price
2017-10-17 07:12:24 -07:00
a.Database.AggregationService().CreateAggregation(agg)
2017-10-10 18:32:26 -07:00
}
missing.Close()
a.Logger.Info("aggregation validator: DONE missing aggregations. "+
2017-10-17 07:12:24 -07:00
"product: %v. interval: %v", product, interval)
2017-10-10 18:32:26 -07:00
}