Fix aggregator bugs

This commit is contained in:
Kevin Cotugno 2017-10-09 21:51:47 -07:00
parent cabe6ab0b1
commit fbd9d80b96

View File

@ -18,11 +18,12 @@ type Aggregator struct {
func (a *Aggregator) Start(interval time.Duration) {
go func() {
a.setDone(true)
a.setDone(false)
var timer *time.Ticker
correction := time.Until(time.Now().Truncate(interval).Add(interval))
a.Logger.Info("aggregator: time correction")
// We add 1.5 seconds to ensure that we do not miss one or two
// that may still be being inserted into the databast.
@ -30,6 +31,7 @@ func (a *Aggregator) Start(interval time.Duration) {
time.Sleep(correction + (1500 * time.Millisecond))
timer = time.NewTicker(interval)
a.Logger.Info("aggregator: starting")
for !a.done() {
select {
@ -49,6 +51,8 @@ func (a *Aggregator) Start(interval time.Duration) {
}
func (a *Aggregator) aggregate(product string, interval time.Duration) {
a.Logger.Info(`aggregator: start product="%v" interval="%v"`, product, interval)
end := time.Now().Truncate(interval)
start := end.Add(-interval)
@ -59,7 +63,7 @@ func (a *Aggregator) aggregate(product string, interval time.Duration) {
}
var agg tacitus.Aggregation
agg.Interval = int(interval)
agg.Interval = int(interval.Seconds())
agg.Product = product
agg.Timestamp = end