This repository has been archived on 2022-11-30. You can view files and clone it, but cannot push or open issues or pull requests.
tacitus/ops/aggregation_validator.go

203 lines
4.4 KiB
Go

package ops
import (
"git.kevincotugno.com/kcotugno/tacitus"
"sort"
"time"
)
// timeSlice adapts a slice of time.Time values to sort.Interface so that
// sort.Sort arranges them chronologically, earliest first.
type timeSlice []time.Time

// Len reports how many timestamps the slice holds.
func (t timeSlice) Len() int {
	return len(t)
}

// Swap exchanges the timestamps stored at positions i and j.
func (t timeSlice) Swap(i, j int) {
	t[j], t[i] = t[i], t[j]
}

// Less reports whether the timestamp at i is strictly earlier than the one
// at j.
func (t timeSlice) Less(i, j int) bool {
	return t[j].After(t[i])
}
// AggregationValidator periodically re-checks stored aggregations for every
// configured product/interval pair. Start launches the background loop and
// Stop ends it.
type AggregationValidator struct {
	// Database supplies the confirmation, trade and aggregation services
	// the validator reads from and writes to.
	Database tacitus.DatabaseClientService
	// Logger receives progress and failure messages.
	Logger tacitus.Logger
	// Products lists the products to validate.
	Products []string
	// Intervals lists the aggregation intervals validated for each product.
	Intervals []time.Duration

	// done is created by Start and signalled, then closed, by Stop.
	done chan struct{}
	// stopped is set by Stop so a second call does nothing; dirty is set by
	// Start so the loop is launched at most once.
	stopped, dirty bool
}
// Start launches the background loop that drives validation.
//
// On every frequency tick the loop starts a validation pass over all
// product/interval pairs, unless workers from an earlier pass are still
// outstanding, in which case it only logs. Every 100*frequency it also starts
// a pass that fills in missing aggregations. Only the first successful call
// has an effect; later calls return immediately. Use Stop to end the loop.
//
// A non-positive frequency is rejected with a log message: time.NewTicker
// would otherwise panic inside the background goroutine, where the caller
// has no way to recover.
func (a *AggregationValidator) Start(frequency time.Duration) {
	if a.dirty {
		return
	}
	if frequency <= 0 {
		a.Logger.Info("aggregation validator: invalid frequency %v, not starting", frequency)
		return
	}
	a.dirty = true
	a.done = make(chan struct{})

	// The loop goroutine is the only receiver of notify, yet the emitters it
	// calls send on it synchronously. Size the buffer for one full round of
	// both emitters (one launch and one completion message per pair each) so
	// those sends cannot block the loop; never go below the historical 256.
	capacity := 4 * len(a.Products) * len(a.Intervals)
	if capacity < 256 {
		capacity = 256
	}
	notify := make(chan int, capacity)

	go func() {
		ticker := time.NewTicker(frequency)
		defer ticker.Stop()
		zero := time.NewTicker(100 * frequency)
		defer zero.Stop()

		// running counts workers that were launched but have not yet
		// reported completion.
		var running int

		// settle folds every queued notification into running. select picks
		// randomly among ready cases, so without this a tick could be served
		// while launch messages from the previous emit are still unread and
		// running would wrongly read as zero.
		settle := func() {
			for {
				select {
				case delta := <-notify:
					running += delta
				default:
					return
				}
			}
		}

		for {
			select {
			case <-a.done:
				return
			case <-zero.C:
				settle()
				a.emitZero(notify)
			case <-ticker.C:
				settle()
				if running == 0 {
					a.emitProducts(notify)
				} else {
					a.Logger.Info("aggregation validator: already running")
				}
			case delta := <-notify:
				running += delta
			}
		}
	}()
}
// Stop signals the background loop to exit and closes the done channel.
//
// It does nothing when the done channel has not been created or when Stop
// has already run. The signal is sent on the channel itself, so the call
// waits until the send completes.
func (a *AggregationValidator) Stop() {
	if a.done == nil || a.stopped {
		return
	}

	a.done <- struct{}{}
	a.stopped = true
	close(a.done)
}
// emitProducts starts one validate goroutine for every product/interval
// combination.
//
// Before each launch it sends 1 on status to announce the new worker. That
// send blocks when status has no free capacity and no receiver is ready.
func (a *AggregationValidator) emitProducts(status chan int) {
	for _, product := range a.Products {
		for _, interval := range a.Intervals {
			// Announce first so the launch message is queued ahead of the
			// worker's own completion message.
			status <- 1
			go a.validate(product, interval, status)
		}
	}
}
// validate re-aggregates every interval bucket of product that contains
// trades newer than the last confirmed trade id, then stores the id of the
// newest trade seen in the product's "a" confirmation (creating the
// confirmation when it has no id yet).
//
// status receives -1 when validate returns, on both the success and the
// failure path. If the trade times cannot be loaded the failure is logged
// and the confirmation is left untouched.
func (a *AggregationValidator) validate(product string, interval time.Duration,
	status chan<- int) {
	defer func() { status <- -1 }()

	agg := Aggregator{Database: a.Database, Logger: a.Logger}

	a.Logger.Info(`aggregation validator: product="%v" interval="%v"`, product, interval)

	// The lookup error is intentionally not inspected: the code below treats
	// a confirmation with Id == 0 as "not created yet".
	// NOTE(review): presumably a failed lookup yields a zero-value
	// confirmation; confirm against the ConfirmationService implementation.
	conf, _ := a.Database.ConfirmationService().Confirmation(product, "a")

	times, lastID, err := a.getTimes(product, interval, conf.LastId)
	if err != nil {
		a.Logger.Info(`aggregation validator: FAIL product="%v" interval="%v" error="%v"`,
			product, interval, err)
		return
	}

	a.Logger.Info("aggregation validator: %v missing: %v", product, len(times))

	for _, t := range times {
		agg.aggregate(product, t, interval)
	}

	conf.Product = product
	conf.Type = "a"
	conf.LastId = lastID

	if conf.Id == 0 {
		a.Database.ConfirmationService().CreateConfirmation(conf)
	} else {
		a.Database.ConfirmationService().UpdateConfirmation(conf)
	}

	a.Logger.Info(`aggregation validator: DONE product="%v" interval="%v"`,
		product, interval)
}
// getTimes collects the interval buckets touched by trades of product whose
// id is greater than last.
//
// Each trade timestamp is truncated to a multiple of interval and moved
// forward by one interval; duplicates are collapsed. It returns the distinct
// bucket times in ascending order together with the id of the last trade
// read, or last unchanged when the query produced no trades. When the query
// fails the error is logged and returned with a nil slice and an id of 0.
func (a *AggregationValidator) getTimes(product string, interval time.Duration,
	last int) ([]time.Time, int, error) {
	results, err := a.Database.TradeService().
		TradesWhereResults("product = $1 AND id > $2 ORDER BY id ASC", product, last)
	if err != nil {
		a.Logger.Info(`aggregation validator: failed to retrive times error="%v"`, err)
		return nil, 0, err
	}

	// The map serves as a set: many trades fall into the same bucket.
	buckets := make(map[time.Time]struct{})
	for results.Next() {
		trade := results.Value().(tacitus.Trade)
		bucket := trade.Timestamp.Truncate(interval).Add(interval)
		buckets[bucket] = struct{}{}
		// Rows arrive ordered by id, so the final assignment is the newest.
		last = trade.Id
	}

	times := make([]time.Time, 0, len(buckets))
	for bucket := range buckets {
		times = append(times, bucket)
	}
	sort.Sort(timeSlice(times))

	return times, last, nil
}
// emitZero starts one checkZero goroutine for every product/interval
// combination.
//
// Before each launch it sends 1 on status to announce the new worker. That
// send blocks when status has no free capacity and no receiver is ready.
func (a *AggregationValidator) emitZero(status chan int) {
	for _, product := range a.Products {
		for _, interval := range a.Intervals {
			// Announce first so the launch message is queued ahead of the
			// worker's own completion message.
			status <- 1
			go a.checkZero(product, interval, status)
		}
	}
}
// checkZero fills in the aggregations that MissingTimes reports as absent
// for product at the given interval.
//
// Each missing aggregation is stored with its Interval set to the interval
// in whole seconds and its Price copied from the aggregation one interval
// earlier. Processing stops at the first lookup failure, which is logged
// together with the error. status receives -1 when checkZero returns, on
// every path.
func (a *AggregationValidator) checkZero(product string, interval time.Duration,
	status chan<- int) {
	defer func() { status <- -1 }()

	a.Logger.Info("aggregation validator: START missing aggregations. "+
		"product: %v. interval: %v", product, interval)

	seconds := int(interval.Seconds())

	missing, err := a.Database.AggregationService().MissingTimes(seconds, product)
	if err != nil {
		// Previously this failure was dropped without a trace.
		a.Logger.Info("aggregation validator: FAIL missing aggregations. "+
			"product: %v. interval: %v. error: %v", product, interval, err)
		return
	}
	// Release the result set on every exit path below.
	defer missing.Close()

	for missing.Next() {
		agg := missing.Value().(tacitus.Aggregation)
		agg.Interval = seconds

		// Carry the price forward from the preceding bucket.
		last := agg.Timestamp.Add(-interval)
		previous, err := a.Database.AggregationService().
			Aggregation(seconds, product, last)
		if err != nil {
			a.Logger.Info("aggregation validator: FAIL missing aggregations. "+
				"product: %v. interval: %v. error: %v", product, interval, err)
			return
		}

		agg.Price = previous.Price
		a.Database.AggregationService().CreateAggregation(agg)
	}

	a.Logger.Info("aggregation validator: DONE missing aggregations. "+
		"product: %v. interval: %v", product, interval)
}