203 lines
4.4 KiB
Go
203 lines
4.4 KiB
Go
package ops
|
|
|
|
import (
|
|
"git.kevincotugno.com/kcotugno/tacitus"
|
|
|
|
"sort"
|
|
"time"
|
|
)
|
|
|
|
// timeSlice adapts a slice of time.Time values to sort.Interface so the
// slice can be ordered chronologically (earliest first) with sort.Sort.
type timeSlice []time.Time

// Len returns the number of timestamps in the slice.
func (t timeSlice) Len() int {
	return len(t)
}

// Swap exchanges the timestamps at positions i and j.
func (t timeSlice) Swap(i, j int) {
	first, second := t[i], t[j]
	t[i] = second
	t[j] = first
}

// Less reports whether the timestamp at i is strictly earlier than the one
// at j.
func (t timeSlice) Less(i, j int) bool {
	return t[j].After(t[i])
}
|
|
|
|
type AggregationValidator struct {
|
|
Database tacitus.DatabaseClientService
|
|
Logger tacitus.Logger
|
|
Products []string
|
|
Intervals []time.Duration
|
|
|
|
done chan struct{}
|
|
stopped bool
|
|
dirty bool
|
|
}
|
|
|
|
func (a *AggregationValidator) Start(frequency time.Duration) {
|
|
if a.dirty {
|
|
return
|
|
}
|
|
|
|
a.dirty = true
|
|
a.done = make(chan struct{})
|
|
|
|
go func() {
|
|
ticker := time.NewTicker(frequency)
|
|
zero := time.NewTicker(100 * frequency)
|
|
|
|
var done bool
|
|
var running int
|
|
|
|
notify := make(chan int, 256)
|
|
|
|
for !done {
|
|
select {
|
|
case <-a.done:
|
|
ticker.Stop()
|
|
zero.Stop()
|
|
done = true
|
|
case <-zero.C:
|
|
a.emitZero(notify)
|
|
case <-ticker.C:
|
|
if running == 0 {
|
|
a.emitProducts(notify)
|
|
} else {
|
|
a.Logger.Info("aggregation validator: already running")
|
|
}
|
|
|
|
case i := <-notify:
|
|
running = running + i
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (a *AggregationValidator) Stop() {
|
|
if a.done != nil && !a.stopped {
|
|
a.done <- struct{}{}
|
|
a.stopped = true
|
|
close(a.done)
|
|
}
|
|
}
|
|
|
|
func (a *AggregationValidator) emitProducts(status chan int) {
|
|
for _, p := range a.Products {
|
|
for _, i := range a.Intervals {
|
|
status <- 1
|
|
go a.validate(p, i, status)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *AggregationValidator) validate(product string, interval time.Duration,
|
|
status chan<- int) {
|
|
defer func() { status <- -1 }()
|
|
|
|
var agg Aggregator
|
|
agg.Database = a.Database
|
|
agg.Logger = a.Logger
|
|
|
|
a.Logger.Info(`aggregation validator: product="%v" interval="%v"`, product, interval)
|
|
|
|
conf, _ := a.Database.ConfirmationService().Confirmation(product, "a")
|
|
|
|
times, last_id, err := a.getTimes(product, interval, conf.LastId)
|
|
if err != nil {
|
|
a.Logger.Info(`aggregation validator: FAIL product="%v interval="%v" error="%v"`,
|
|
product, interval, err)
|
|
return
|
|
}
|
|
|
|
a.Logger.Info("aggregation validator: %v missing: %v", product, len(times))
|
|
|
|
for _, t := range times {
|
|
agg.aggregate(product, t, interval)
|
|
}
|
|
|
|
conf.Product = product
|
|
conf.Type = "a"
|
|
conf.LastId = last_id
|
|
if conf.Id == 0 {
|
|
a.Database.ConfirmationService().CreateConfirmation(conf)
|
|
} else {
|
|
a.Database.ConfirmationService().UpdateConfirmation(conf)
|
|
}
|
|
|
|
a.Logger.Info(`aggregation validator: DONE product="%v interval="%v"`,
|
|
product, interval)
|
|
}
|
|
|
|
func (a *AggregationValidator) getTimes(product string, interval time.Duration,
|
|
last int) ([]time.Time, int, error) {
|
|
|
|
tmp := make(map[time.Time]struct{})
|
|
|
|
results, err := a.Database.TradeService().
|
|
TradesWhereResults("product = $1 AND id > $2 ORDER BY id ASC", product, last)
|
|
if err != nil {
|
|
a.Logger.Info(`aggregation validator: failed to retrive times error="%v"`, err)
|
|
return nil, 0, err
|
|
}
|
|
|
|
var trade tacitus.Trade
|
|
|
|
for results.Next() {
|
|
trade = results.Value().(tacitus.Trade)
|
|
|
|
time := trade.Timestamp.Truncate(interval).Add(interval)
|
|
tmp[time] = struct{}{}
|
|
last = trade.Id
|
|
}
|
|
|
|
var i int
|
|
times := make([]time.Time, len(tmp))
|
|
for k := range tmp {
|
|
times[i] = k
|
|
i++
|
|
}
|
|
|
|
sort.Sort(timeSlice(times))
|
|
return times, last, nil
|
|
}
|
|
|
|
func (a *AggregationValidator) emitZero(status chan int) {
|
|
for _, p := range a.Products {
|
|
for _, i := range a.Intervals {
|
|
status <- 1
|
|
go a.checkZero(p, i, status)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *AggregationValidator) checkZero(product string, interval time.Duration,
|
|
status chan<- int) {
|
|
defer func() { status <- -1 }()
|
|
|
|
a.Logger.Info("aggregation validator: START missing aggregations. "+
|
|
"product: %v. interval: %v", product, interval)
|
|
|
|
missing, err := a.Database.AggregationService().MissingTimes(int(interval.Seconds()),
|
|
product)
|
|
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
for missing.Next() {
|
|
agg := missing.Value().(tacitus.Aggregation)
|
|
agg.Interval = int(interval.Seconds())
|
|
|
|
last := agg.Timestamp.Add(-interval)
|
|
|
|
previous, err := a.Database.AggregationService().
|
|
Aggregation(int(interval.Seconds()), product, last)
|
|
if err != nil {
|
|
a.Logger.Info("aggregation validator: FAIL missing aggregations. "+
|
|
"product: %v. interval: %v", product, interval)
|
|
|
|
missing.Close()
|
|
return
|
|
}
|
|
|
|
agg.Price = previous.Price
|
|
|
|
a.Database.AggregationService().CreateAggregation(agg)
|
|
}
|
|
|
|
missing.Close()
|
|
|
|
a.Logger.Info("aggregation validator: DONE missing aggregations. "+
|
|
"product: %v. interval: %v", product, interval)
|
|
}
|