package ops

import (
	"sort"
	"time"

	"github.com/kcotugno/tacitus"
)

// timeSlice implements sort.Interface for a slice of time.Time values,
// ordering them chronologically (earliest first).
type timeSlice []time.Time

func (t timeSlice) Len() int           { return len(t) }
func (t timeSlice) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
func (t timeSlice) Less(i, j int) bool { return t[i].Before(t[j]) }

// AggregationValidator periodically re-runs aggregation for every configured
// product/interval pair and fills in aggregation times reported as missing.
//
// Start and Stop are not synchronized; call them from a single goroutine.
type AggregationValidator struct {
	Database tacitus.DatabaseClientService
	Logger   tacitus.Logger

	// Products and Intervals are combined as a cross product: every product
	// is processed at every interval.
	Products  []string
	Intervals []time.Duration

	ticker *time.Ticker
	done   chan bool // closed by Stop to end the current run; nil when idle
	// running is not read or written anywhere in this file.
	running bool
}

// Start stops any run that is already active and then launches a background
// goroutine that calls emitProducts every frequency and emitZero every
// 100*frequency until Stop is called.
//
// The stop channel and the tickers are created before the goroutine is
// spawned, so a Stop issued immediately after Start always observes them.
func (a *AggregationValidator) Start(frequency time.Duration) {
	a.Stop()

	ticker := time.NewTicker(frequency)
	zero := time.NewTicker(100 * frequency)
	stop := make(chan bool)

	a.ticker = ticker
	a.done = stop

	go func() {
		defer ticker.Stop()
		defer zero.Stop()

		for {
			select {
			case <-stop:
				return
			case <-zero.C:
				a.emitZero()
			case <-ticker.C:
				a.emitProducts()
			}
		}
	}()
}

// Stop signals the goroutine launched by Start to exit. It is a no-op when no
// run is active, so it is safe to call repeatedly or before Start.
//
// Stop does not wait for the goroutine, or for validations it has already
// spawned, to finish.
func (a *AggregationValidator) Stop() {
	if a.done == nil {
		return
	}

	// Closing (instead of sending) never blocks, and clearing the field keeps
	// a later Stop or Start from touching the closed channel again.
	close(a.done)
	a.done = nil
	a.ticker = nil
}

// emitProducts starts one validate goroutine per product/interval pair.
func (a *AggregationValidator) emitProducts() {
	for _, product := range a.Products {
		for _, interval := range a.Intervals {
			go a.validate(product, interval)
		}
	}
}

// validate aggregates every interval bucket that contains a trade newer than
// the last confirmed trade id for product, then stores the new last id in the
// product's confirmation record of type "a".
func (a *AggregationValidator) validate(product string, interval time.Duration) {
	var agg Aggregator
	agg.Database = a.Database
	agg.Logger = a.Logger

	a.Logger.Info(`aggregation validator: product="%v" interval="%v"`, product,
		interval)

	// NOTE(review): the lookup error is discarded, as in the original code.
	// Presumably a missing record yields a zero-value confirmation (Id == 0),
	// which takes the create path below - confirm against ConfirmationService.
	conf, _ := a.Database.ConfirmationService().Confirmation(product, "a")

	times, lastID, err := a.getTimes(product, interval, conf.LastId)
	if err != nil {
		a.Logger.Info(`aggregation validator: FAIL product="%v" interval="%v" error="%v"`,
			product, interval, err)
		return
	}

	a.Logger.Info("aggregation validator: %v missing: %v", product, len(times))

	for _, t := range times {
		agg.aggregate(product, t, interval)
	}

	conf.Product = product
	conf.Type = "a"
	conf.LastId = lastID

	if conf.Id == 0 {
		a.Database.ConfirmationService().CreateConfirmation(conf)
	} else {
		a.Database.ConfirmationService().UpdateConfirmation(conf)
	}

	a.Logger.Info(`aggregation validator: DONE product="%v" interval="%v"`, product,
		interval)
}

// getTimes reads the trades of product whose id is greater than last, in
// ascending id order, and returns:
//
//   - the distinct end times of the interval buckets containing those trades,
//     sorted chronologically;
//   - the id of the last trade read, or last unchanged when there were none;
//   - any error from the trade query, in which case the other results are
//     nil and 0.
func (a *AggregationValidator) getTimes(product string, interval time.Duration,
	last int) ([]time.Time, int, error) {

	results, err := a.Database.TradeService().
		TradesWhereResults("product = $1 AND id > $2 ORDER BY id ASC", product, last)
	if err != nil {
		a.Logger.Info(`aggregation validator: failed to retrive times error="%v"`, err)
		return nil, 0, err
	}

	// A set of bucket end times; many trades fall into the same bucket.
	buckets := make(map[time.Time]struct{})
	for results.Next() {
		trade := results.Value().(tacitus.Trade)

		// Truncate rounds down to the start of the bucket; adding the interval
		// moves to the bucket's end.
		bucket := trade.Timestamp.Truncate(interval).Add(interval)
		buckets[bucket] = struct{}{}
		last = trade.Id
	}

	times := make([]time.Time, 0, len(buckets))
	for bucket := range buckets {
		times = append(times, bucket)
	}
	sort.Sort(timeSlice(times))

	return times, last, nil
}

// emitZero starts one checkZero goroutine per product/interval pair.
func (a *AggregationValidator) emitZero() {
	for _, product := range a.Products {
		for _, interval := range a.Intervals {
			go a.checkZero(product, interval)
		}
	}
}

// checkZero creates an aggregation for every time that MissingTimes reports
// for product at interval. Each created aggregation takes its Price from the
// aggregation one interval earlier; its remaining fields are whatever
// MissingTimes supplied. Processing stops at the first failed lookup.
func (a *AggregationValidator) checkZero(product string, interval time.Duration) {
	seconds := int(interval.Seconds())

	a.Logger.Info("aggregation validator: START missing aggregations. "+
		"product: %v. interval: %v", product, interval)

	missing, err := a.Database.AggregationService().MissingTimes(seconds, product)
	if err != nil {
		a.Logger.Info("aggregation validator: FAIL missing aggregations. "+
			"product: %v. interval: %v. error: %v", product, interval, err)
		return
	}
	defer missing.Close()

	for missing.Next() {
		agg := missing.Value().(tacitus.Aggregation)
		agg.Interval = seconds

		previous, err := a.Database.AggregationService().
			Aggregation(seconds, product, agg.Timestamp.Add(-interval))
		if err != nil {
			a.Logger.Info("aggregation validator: FAIL missing aggregations. "+
				"product: %v. interval: %v. error: %v", product, interval, err)
			return
		}

		agg.Price = previous.Price
		a.Database.AggregationService().CreateAggregation(agg)
	}

	a.Logger.Info("aggregation validator: DONE missing aggregations. "+
		"product: %v. interval: %v", product, interval)
}