From 2abed9d9ed75b21961969dd6d6c60974fbaff8a1 Mon Sep 17 00:00:00 2001 From: Kevin Cotugno Date: Tue, 17 Oct 2017 07:12:24 -0700 Subject: [PATCH] Missing aggregations --- aggregation.go | 1 + ops/aggregation_validator.go | 46 ++++++++++++++--- postgres/aggregation_results.go | 89 +++++++++++++++++++++++++++++++++ postgres/aggregation_service.go | 22 ++++++++ postgres/trade_result.go | 5 +- 5 files changed, 154 insertions(+), 9 deletions(-) create mode 100644 postgres/aggregation_results.go diff --git a/aggregation.go b/aggregation.go index 5d8dd94..3df93e5 100644 --- a/aggregation.go +++ b/aggregation.go @@ -22,4 +22,5 @@ type AggregationService interface { Aggregation(interval int, product string, timestamp time.Time) (Aggregation, error) CreateAggregation(a Aggregation) (Aggregation, error) UpdateAggregation(a Aggregation) (Aggregation, error) + MissingTimes(interval int, product string) (Results, error) } diff --git a/ops/aggregation_validator.go b/ops/aggregation_validator.go index 96cfdd2..b41f8c2 100644 --- a/ops/aggregation_validator.go +++ b/ops/aggregation_validator.go @@ -35,13 +35,16 @@ func (a *AggregationValidator) Start(frequency time.Duration) { a.done = make(chan bool) stop := a.done - a.emitProducts() + zero := time.NewTicker(100 * frequency) for !done { select { case <-stop: ticker.Stop() + zero.Stop() done = true + case <-zero.C: + a.emitZero() case <-ticker.C: a.emitProducts() } @@ -57,7 +60,6 @@ func (a *AggregationValidator) Stop() { } func (a *AggregationValidator) emitProducts() { - for _, p := range a.Products { for _, i := range a.Intervals { go a.validate(p, i) @@ -123,12 +125,44 @@ func (a *AggregationValidator) getTimes(product string, interval time.Duration, } var i int - t := make([]time.Time, len(tmp)) + times := make([]time.Time, len(tmp)) for k := range tmp { - t[i] = k + times[i] = k i++ } - sort.Sort(timeSlice(t)) - return t, last, nil + sort.Sort(timeSlice(times)) + return times, last, nil +} + +func (a *AggregationValidator) emitZero() { + for _, p := range a.Products { + for _, i := range a.Intervals { + go a.checkZero(p, i) + } + } +} + +func (a *AggregationValidator) checkZero(product string, interval time.Duration) { + a.Logger.Info("aggregation validator: START missing aggregations. " + + "product: %v. interval: %v", product, interval) + + missing, err := a.Database.AggregationService().MissingTimes(int(interval.Seconds()), + product) + + if err != nil { + return + } + + var i int + for missing.Next() { + i++ + agg := missing.Value().(tacitus.Aggregation) + agg.Interval = int(interval.Seconds()) + + a.Database.AggregationService().CreateAggregation(agg) + } + + a.Logger.Info("aggregation validator: DONE missing aggregations. " + + "product: %v. interval: %v", product, interval) } diff --git a/postgres/aggregation_results.go b/postgres/aggregation_results.go new file mode 100644 index 0000000..22e25b7 --- /dev/null +++ b/postgres/aggregation_results.go @@ -0,0 +1,89 @@ +package postgres + +import ( + "github.com/kcotugno/tacitus" + "github.com/shopspring/decimal" + + "database/sql" + "errors" + "time" +) + +type AggregationResults struct { + rows *sql.Rows + aggregation tacitus.Aggregation + err error +} + +func (r *AggregationResults) Next() bool { + var a tacitus.Aggregation + var id, interval, buyTransactions, sellTransactions *int + var product *string + var price, buyVolume, sellVolume *decimal.Decimal + var timestamp *time.Time + + if r.rows == nil { + r.err = errors.New("No query results") + return false + } + + if r.rows.Next() { + r.err = r.rows.Scan(&id, &interval, &product, &price, &buyVolume, + &sellVolume, &buyTransactions, &sellTransactions, ×tamp) + + if id != nil { + a.Id = *id + } + if interval != nil { + a.Interval = *interval + } + if product != nil { + a.Product = *product + } + if price != nil { + a.Price = *price + } + if buyVolume != nil { + a.BuyVolume = *buyVolume + } + if sellVolume != nil { + a.SellVolume = *sellVolume + } + if buyTransactions != nil { + a.BuyTransactions = *buyTransactions + } + if sellTransactions != nil { + a.SellTransactions = *sellTransactions + } + if timestamp != nil { + a.Timestamp = *timestamp + } + + r.aggregation = a + } else { + r.err = r.rows.Err() + return false + } + + if r.err == nil { + return true + } else { + return false + } +} + +func (r *AggregationResults) Value() interface{} { + return r.aggregation +} + +func (r *AggregationResults) Error() error { + return r.err +} + +func (r *AggregationResults) Close() error { + if r.rows != nil { + return r.rows.Close() + } + + return nil +} diff --git a/postgres/aggregation_service.go b/postgres/aggregation_service.go index 91f89db..0205cab 100644 --- a/postgres/aggregation_service.go +++ b/postgres/aggregation_service.go @@ -4,6 +4,7 @@ import ( "github.com/kcotugno/tacitus" "time" + "strings" ) const ( @@ -16,6 +17,11 @@ const ( agg_update = `UPDATE aggregations SET interval = $1, product = $2, ` + `price = $3, buy_volume = $4, sell_volume = $5, buy_transactions = $6, ` + `sell_transactions = $7, timestamp = $8 WHERE id = $9` + agg_missing = `SELECT id, ` + agg_columns + ` FROM (SELECT ` + + `generate_series(min(timestamp), max(timestamp), $1)::timestamptz ` + + `AS timestamp, $2::char(7) product FROM aggregations WHERE interval = $3 ` + + `AND product = $4) AS x LEFT OUTER JOIN aggregations a USING(timestamp, product) ` + + `WHERE a.id IS NULL ORDER BY x.timestamp;` ) type AggregationService struct { @@ -77,3 +83,19 @@ func (s *AggregationService) UpdateAggregation(a tacitus.Aggregation) (tacitus.A return a, err } + +func (s *AggregationService) MissingTimes(interval int, product string) (tacitus.Results, error) { + s.client.logQuery(agg_missing, interval, product, interval, product) + + rows, err := s.client.db.Query(agg_missing, interval, product, interval, product) + if err != nil { + s.client.logError(agg_missing, err, strings.Join([]string{string(interval), " seconds"}, ""), product, interval, product) + + return nil, err + } + + results := AggregationResults{} + results.rows = rows + + return &results, nil +} diff --git a/postgres/trade_result.go b/postgres/trade_result.go index 10816d3..72f0c00 100644 --- a/postgres/trade_result.go +++ b/postgres/trade_result.go @@ -15,7 +15,6 @@ type TradeResults struct { func (r *TradeResults) Next() bool { var t tacitus.Trade - var done bool if r.rows == nil { r.err = errors.New("No query results") @@ -27,11 +26,11 @@ func (r *TradeResults) Next() bool { &t.Buy, &t.Sell, &t.Timestamp) r.trade = t } else { - done = true r.err = r.rows.Err() + return false } - if r.err == nil && !done { + if r.err == nil { return true } else { return false