package ops

import (
	"sort"
	"time"

	"git.kevincotugno.com/kcotugno/tacitus"
)

// timeSlice adapts a slice of time.Time to sort.Interface, ordering the
// elements chronologically (earliest first).
type timeSlice []time.Time

func (t timeSlice) Len() int           { return len(t) }
func (t timeSlice) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
func (t timeSlice) Less(i, j int) bool { return t[i].Before(t[j]) }

// AggregationValidator periodically re-aggregates trades that arrived since
// the last confirmed trade id and back-fills aggregation rows reported as
// missing, for every configured product/interval pair.
type AggregationValidator struct {
	Database tacitus.DatabaseClientService
	Logger   tacitus.Logger

	// Products and Intervals define the product/interval pairs that are
	// checked on every pass.
	Products  []string
	Intervals []time.Duration

	done    chan struct{} // signals the scheduler goroutine to exit
	stopped bool          // set once Stop has shut the scheduler down
	dirty   bool          // set once Start has been called
}

// Start launches the background scheduler. Every frequency a validation pass
// is started, unless jobs from an earlier pass are still in flight; every
// 100*frequency a missing-aggregation pass is started unconditionally.
//
// Only the first call has any effect. frequency must be positive, because
// time.NewTicker panics otherwise.
func (a *AggregationValidator) Start(frequency time.Duration) {
	if a.dirty {
		return
	}
	a.dirty = true
	a.done = make(chan struct{})

	go func() {
		ticker := time.NewTicker(frequency)
		defer ticker.Stop()
		zero := time.NewTicker(100 * frequency)
		defer zero.Stop()

		// notify carries one -1 per finished job. The in-flight counter is
		// incremented directly in this goroutine: this goroutine is the only
		// receiver on notify, so sending the increments through the channel
		// from here would block forever once the buffer filled up.
		notify := make(chan int, 256)
		running := 0

		for {
			select {
			case <-a.done:
				return
			case <-zero.C:
				running += a.emitZero(notify)
			case <-ticker.C:
				if running != 0 {
					a.Logger.Info("aggregation validator: already running")
					continue
				}
				running += a.emitProducts(notify)
			case delta := <-notify:
				running += delta
			}
		}
	}()
}

// Stop shuts the scheduler down. It blocks until the scheduler goroutine has
// picked up the stop signal, so no new pass is started after Stop returns.
// Jobs that are already running are not interrupted. Calling Stop before
// Start, or more than once, is a no-op.
func (a *AggregationValidator) Stop() {
	if a.done == nil || a.stopped {
		return
	}
	a.done <- struct{}{}
	a.stopped = true
	close(a.done)
}

// emitProducts starts one validate goroutine per product/interval pair and
// returns the number of goroutines started. Each goroutine reports its
// completion by sending -1 on status.
func (a *AggregationValidator) emitProducts(status chan int) int {
	launched := 0
	for _, product := range a.Products {
		for _, interval := range a.Intervals {
			launched++
			go a.validate(product, interval, status)
		}
	}
	return launched
}

// validate aggregates every interval bucket touched by trades of product
// whose id is greater than the last confirmed id, then stores the new last
// id in the "a" confirmation record. It always sends -1 on status when it
// returns.
func (a *AggregationValidator) validate(product string, interval time.Duration, status chan<- int) {
	defer func() { status <- -1 }()

	var agg Aggregator
	agg.Database = a.Database
	agg.Logger = a.Logger

	a.Logger.Info(`aggregation validator: product="%v" interval="%v"`, product, interval)

	// The lookup error is deliberately discarded: the code below treats a
	// confirmation with Id == 0 as "no record yet" and creates one.
	// NOTE(review): presumably a failed lookup yields a zero-value
	// confirmation; confirm against the ConfirmationService implementation.
	conf, _ := a.Database.ConfirmationService().Confirmation(product, "a")

	times, lastID, err := a.getTimes(product, interval, conf.LastId)
	if err != nil {
		a.Logger.Info(`aggregation validator: FAIL product="%v" interval="%v" error="%v"`,
			product, interval, err)
		return
	}

	a.Logger.Info("aggregation validator: %v missing: %v", product, len(times))
	for _, t := range times {
		agg.aggregate(product, t, interval)
	}

	conf.Product = product
	conf.Type = "a"
	conf.LastId = lastID
	if conf.Id == 0 {
		a.Database.ConfirmationService().CreateConfirmation(conf)
	} else {
		a.Database.ConfirmationService().UpdateConfirmation(conf)
	}

	a.Logger.Info(`aggregation validator: DONE product="%v" interval="%v"`, product, interval)
}

// getTimes returns the distinct, chronologically sorted interval boundaries
// for all trades of product with id greater than last, together with the id
// of the last trade read (or last itself when there were no such trades).
// Each boundary is the trade timestamp truncated to interval plus one
// interval.
func (a *AggregationValidator) getTimes(product string, interval time.Duration, last int) ([]time.Time, int, error) {
	results, err := a.Database.TradeService().
		TradesWhereResults("product = $1 AND id > $2 ORDER BY id ASC", product, last)
	if err != nil {
		a.Logger.Info(`aggregation validator: failed to retrive times error="%v"`, err)
		return nil, 0, err
	}
	// NOTE(review): results is never closed here, unlike the cursor in
	// checkZero; confirm whether this result type needs an explicit Close.

	// A set removes duplicates: many trades fall into the same bucket.
	buckets := make(map[time.Time]struct{})
	for results.Next() {
		trade := results.Value().(tacitus.Trade)
		bucket := trade.Timestamp.Truncate(interval).Add(interval)
		buckets[bucket] = struct{}{}
		last = trade.Id
	}

	times := make([]time.Time, 0, len(buckets))
	for bucket := range buckets {
		times = append(times, bucket)
	}
	sort.Sort(timeSlice(times))

	return times, last, nil
}

// emitZero starts one checkZero goroutine per product/interval pair and
// returns the number of goroutines started. Each goroutine reports its
// completion by sending -1 on status.
func (a *AggregationValidator) emitZero(status chan int) int {
	launched := 0
	for _, product := range a.Products {
		for _, interval := range a.Intervals {
			launched++
			go a.checkZero(product, interval, status)
		}
	}
	return launched
}

// checkZero creates an aggregation row for every timestamp reported missing
// for product at interval, copying the price of the aggregation one interval
// earlier. It stops at the first failed lookup and always sends -1 on status
// when it returns.
func (a *AggregationValidator) checkZero(product string, interval time.Duration, status chan<- int) {
	defer func() { status <- -1 }()

	a.Logger.Info("aggregation validator: START missing aggregations. "+
		"product: %v. interval: %v", product, interval)

	seconds := int(interval.Seconds())

	missing, err := a.Database.AggregationService().MissingTimes(seconds, product)
	if err != nil {
		// Log the failure so every START line is paired with DONE or FAIL.
		a.Logger.Info("aggregation validator: FAIL missing aggregations. "+
			"product: %v. interval: %v. error: %v", product, interval, err)
		return
	}
	defer missing.Close()

	for missing.Next() {
		agg := missing.Value().(tacitus.Aggregation)
		agg.Interval = seconds

		previous, err := a.Database.AggregationService().
			Aggregation(seconds, product, agg.Timestamp.Add(-interval))
		if err != nil {
			a.Logger.Info("aggregation validator: FAIL missing aggregations. "+
				"product: %v. interval: %v. error: %v", product, interval, err)
			return
		}

		agg.Price = previous.Price
		a.Database.AggregationService().CreateAggregation(agg)
	}

	a.Logger.Info("aggregation validator: DONE missing aggregations. "+
		"product: %v. interval: %v", product, interval)
}