This repository has been archived on 2022-11-30. You can view files and clone it, but cannot push or open issues or pull requests.
tacitus/ops/aggregator.go

101 lines
1.9 KiB
Go

package ops
import (
"github.com/kcotugno/tacitus"
"sync"
"time"
)
type Aggregator struct {
Database tacitus.DatabaseClientService
Logger tacitus.Logger
Products []string
isDone bool
doneMu sync.Mutex
}
func (a *Aggregator) Start(interval time.Duration) {
go func() {
a.setDone(true)
var timer *time.Ticker
correction := time.Until(time.Now().Truncate(interval).Add(interval))
// We add 1.5 seconds to ensure that we do not miss one or two
// that may still be being inserted into the databast.
// This may be changed based off of further testing.
time.Sleep(correction + (1500 * time.Millisecond))
timer = time.NewTicker(interval)
for !a.done() {
select {
case <-timer.C:
if a.Products == nil {
a.Logger.Info("aggregator: products are nil")
break
}
for _, p := range a.Products {
go a.aggregate(p, interval)
}
}
}
}()
}
func (a *Aggregator) aggregate(product string, interval time.Duration) {
end := time.Now().Truncate(interval)
start := end.Add(-interval)
trades, err := a.Database.TradeService().TradesInDateRange(product, start, end)
if err != nil {
a.Logger.Info(`aggregator: database error="%v"`, err)
return
}
var agg tacitus.Aggregation
agg.Interval = int(interval)
agg.Product = product
agg.Timestamp = end
for _, t := range trades {
agg.Price = t.Price
if t.Buy {
agg.BuyVolume = agg.BuyVolume.Add(t.Size)
agg.BuyTransactions++
} else {
agg.SellVolume = agg.SellVolume.Add(t.Size)
agg.SellTransactions++
}
}
_, err = a.Database.AggregationService().CreateAggregation(agg)
if err != nil {
a.Logger.Info(`aggregator: database error="%v"`, err)
}
}
func (a *Aggregator) Stop() {
a.setDone(true)
}
func (a *Aggregator) done() bool {
a.doneMu.Lock()
defer a.doneMu.Unlock()
return a.isDone
}
func (a *Aggregator) setDone(done bool) {
a.doneMu.Lock()
defer a.doneMu.Unlock()
a.isDone = done
}