Refactor aggreagtion validator

This commit is contained in:
Kevin Cotugno 2017-10-20 17:02:53 -07:00
parent 09a3e1352b
commit 6beed7b064

View File

@ -19,55 +19,71 @@ type AggregationValidator struct {
Products []string
Intervals []time.Duration
ticker *time.Ticker
done chan bool
running bool
done chan struct{}
stopped bool
dirty bool
}
func (a *AggregationValidator) Start(frequency time.Duration) {
a.Stop()
if a.dirty {
return
}
a.dirty = true
a.done = make(chan struct{})
go func() {
var done bool
a.ticker = time.NewTicker(frequency)
ticker := a.ticker
a.done = make(chan bool)
stop := a.done
ticker := time.NewTicker(frequency)
zero := time.NewTicker(100 * frequency)
var done bool
var running int
notify := make(chan int, 256)
for !done {
select {
case <-stop:
case <-a.done:
ticker.Stop()
zero.Stop()
done = true
case <-zero.C:
a.emitZero()
a.emitZero(notify)
case <-ticker.C:
a.emitProducts()
if running == 0 {
a.emitProducts(notify)
} else {
a.Logger.Info("aggregation validator: already running")
}
case i := <-notify:
running = running + i
}
}
}()
}
func (a *AggregationValidator) Stop() {
if a.done != nil {
a.done <- true
if a.done != nil && !a.stopped {
a.done <- struct{}{}
a.stopped = true
close(a.done)
}
}
func (a *AggregationValidator) emitProducts() {
func (a *AggregationValidator) emitProducts(status chan int) {
for _, p := range a.Products {
for _, i := range a.Intervals {
go a.validate(p, i)
status <- 1
go a.validate(p, i, status)
}
}
}
func (a *AggregationValidator) validate(product string, interval time.Duration) {
func (a *AggregationValidator) validate(product string, interval time.Duration,
status chan<- int) {
defer func() { status <- -1 }()
var agg Aggregator
agg.Database = a.Database
agg.Logger = a.Logger
@ -135,15 +151,19 @@ func (a *AggregationValidator) getTimes(product string, interval time.Duration,
return times, last, nil
}
func (a *AggregationValidator) emitZero() {
func (a *AggregationValidator) emitZero(status chan int) {
for _, p := range a.Products {
for _, i := range a.Intervals {
go a.checkZero(p, i)
status <- 1
go a.checkZero(p, i, status)
}
}
}
func (a *AggregationValidator) checkZero(product string, interval time.Duration) {
func (a *AggregationValidator) checkZero(product string, interval time.Duration,
status chan<- int) {
defer func() { status <- -1 }()
a.Logger.Info("aggregation validator: START missing aggregations. "+
"product: %v. interval: %v", product, interval)