This repository has been archived on 2022-11-30. You can view files and clone it, but cannot push or open issues or pull requests.
tacitus/ops/aggregator.go

107 lines
2.2 KiB
Go
Raw Normal View History

2017-10-08 11:19:55 -07:00
package ops
import (
"github.com/kcotugno/tacitus"
"sync"
"time"
)
// Aggregator periodically summarises the trades of each configured product
// into a tacitus.Aggregation record and stores it through Database.
type Aggregator struct {
// Database provides the trade service the trades are read from and the
// aggregation service the results are written to.
Database tacitus.DatabaseClientService
// Logger receives progress and error messages.
Logger tacitus.Logger
// Products lists the products to aggregate on every tick. While it is nil
// the tick is skipped and a message is logged instead.
Products []string
// isDone is the stop flag polled by the loop started in Start; it is only
// read and written through done and setDone.
isDone bool
// doneMu guards isDone.
doneMu sync.Mutex
}
func (a *Aggregator) Start(interval time.Duration) {
go func() {
2017-10-09 21:51:47 -07:00
a.setDone(false)
2017-10-08 11:19:55 -07:00
var timer *time.Ticker
correction := time.Until(time.Now().Truncate(interval).Add(interval))
2017-10-09 21:51:47 -07:00
a.Logger.Info("aggregator: time correction")
2017-10-08 11:19:55 -07:00
// We add 1.5 seconds to ensure that we do not miss one or two
// that may still be being inserted into the databast.
// This may be changed based off of further testing.
time.Sleep(correction + (1500 * time.Millisecond))
timer = time.NewTicker(interval)
2017-10-09 21:51:47 -07:00
a.Logger.Info("aggregator: starting")
2017-10-08 11:19:55 -07:00
for !a.done() {
select {
case <-timer.C:
if a.Products == nil {
a.Logger.Info("aggregator: products are nil")
break
}
for _, p := range a.Products {
go a.aggregate(p, interval)
}
}
}
}()
}
func (a *Aggregator) aggregate(product string, interval time.Duration) {
2017-10-09 21:51:47 -07:00
a.Logger.Info(`aggregator: start product="%v" interval="%v"`, product, interval)
2017-10-09 17:46:02 -07:00
end := time.Now().Truncate(interval)
start := end.Add(-interval)
2017-10-08 11:19:55 -07:00
2017-10-09 18:01:15 -07:00
trades, err := a.Database.TradeService().TradesInDateRange(product, start, end)
if err != nil {
a.Logger.Info(`aggregator: database error="%v"`, err)
return
}
var agg tacitus.Aggregation
2017-10-09 21:51:47 -07:00
agg.Interval = int(interval.Seconds())
2017-10-09 18:01:15 -07:00
agg.Product = product
agg.Timestamp = end
for _, t := range trades {
agg.Price = t.Price
if t.Buy {
agg.BuyVolume = agg.BuyVolume.Add(t.Size)
agg.BuyTransactions++
} else {
agg.SellVolume = agg.SellVolume.Add(t.Size)
agg.SellTransactions++
}
}
_, err = a.Database.AggregationService().CreateAggregation(agg)
if err != nil {
a.Logger.Info(`aggregator: database error="%v"`, err)
}
2017-10-10 09:55:25 -07:00
a.Logger.Info(`aggregator: DONE product="%v" interval="%v"`, product, interval)
2017-10-08 11:19:55 -07:00
}
// Stop raises the stop flag that the loop started by Start polls. It only
// sets the flag under doneMu; it does not wait for the loop to exit.
func (a *Aggregator) Stop() {
	a.doneMu.Lock()
	a.isDone = true
	a.doneMu.Unlock()
}
// done reports the current value of the stop flag, reading it while holding
// doneMu.
func (a *Aggregator) done() bool {
	a.doneMu.Lock()
	finished := a.isDone
	a.doneMu.Unlock()

	return finished
}
// setDone stores done as the new value of the stop flag while holding doneMu.
func (a *Aggregator) setDone(done bool) {
	a.doneMu.Lock()
	a.isDone = done
	a.doneMu.Unlock()
}