Add aggregator
This commit is contained in:
parent
7fce70d7b5
commit
7e59a5b3ae
76
ops/aggregator.go
Normal file
76
ops/aggregator.go
Normal file
@ -0,0 +1,76 @@
|
||||
package ops
|
||||
|
||||
import (
|
||||
"github.com/kcotugno/tacitus"
|
||||
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Aggregator struct {
|
||||
Database tacitus.DatabaseClientService
|
||||
Logger tacitus.Logger
|
||||
Products []string
|
||||
|
||||
isDone bool
|
||||
doneMu sync.Mutex
|
||||
}
|
||||
|
||||
func (a *Aggregator) Start(interval time.Duration) {
|
||||
go func() {
|
||||
a.setDone(true)
|
||||
|
||||
var timer *time.Ticker
|
||||
|
||||
correction := time.Until(time.Now().Truncate(interval).Add(interval))
|
||||
|
||||
// We add 1.5 seconds to ensure that we do not miss one or two
|
||||
// that may still be being inserted into the databast.
|
||||
// This may be changed based off of further testing.
|
||||
|
||||
time.Sleep(correction + (1500 * time.Millisecond))
|
||||
|
||||
timer = time.NewTicker(interval)
|
||||
|
||||
for !a.done() {
|
||||
select {
|
||||
case <-timer.C:
|
||||
if a.Products == nil {
|
||||
a.Logger.Info("aggregator: products are nil")
|
||||
break
|
||||
}
|
||||
|
||||
for _, p := range a.Products {
|
||||
go a.aggregate(p, interval)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (a *Aggregator) aggregate(product string, interval time.Duration) {
|
||||
end := time.Now().Truncate(interval)
|
||||
start := end.Add(-interval)
|
||||
|
||||
_, _ = a.Database.TradeService().
|
||||
TradesInDateRange(product, start, end)
|
||||
}
|
||||
|
||||
func (a *Aggregator) Stop() {
|
||||
a.setDone(true)
|
||||
}
|
||||
|
||||
func (a *Aggregator) done() bool {
|
||||
a.doneMu.Lock()
|
||||
defer a.doneMu.Unlock()
|
||||
|
||||
return a.isDone
|
||||
}
|
||||
|
||||
func (a *Aggregator) setDone(done bool) {
|
||||
a.doneMu.Lock()
|
||||
defer a.doneMu.Unlock()
|
||||
|
||||
a.isDone = done
|
||||
}
|
Reference in New Issue
Block a user