diff --git a/ops/aggregator.go b/ops/aggregator.go index 1823b00..cae56d0 100644 --- a/ops/aggregator.go +++ b/ops/aggregator.go @@ -52,8 +52,33 @@ func (a *Aggregator) aggregate(product string, interval time.Duration) { end := time.Now().Truncate(interval) start := end.Add(-interval) - _, _ = a.Database.TradeService(). - TradesInDateRange(product, start, end) + trades, err := a.Database.TradeService().TradesInDateRange(product, start, end) + if err != nil { + a.Logger.Info(`aggregator: database error="%v"`, err) + return + } + + var agg tacitus.Aggregation + agg.Interval = int(interval) + agg.Product = product + agg.Timestamp = end + + for _, t := range trades { + agg.Price = t.Price + + if t.Buy { + agg.BuyVolume = agg.BuyVolume.Add(t.Size) + agg.BuyTransactions++ + } else { + agg.SellVolume = agg.SellVolume.Add(t.Size) + agg.SellTransactions++ + } + } + + _, err = a.Database.AggregationService().CreateAggregation(agg) + if err != nil { + a.Logger.Info(`aggregator: database error="%v"`, err) + } } func (a *Aggregator) Stop() {