Fix aggregation validator bugs

This commit is contained in:
Kevin Cotugno 2017-10-13 09:34:35 -07:00
parent 7a1e1fd8d8
commit 232b6a0215
5 changed files with 78 additions and 34 deletions

View File

@ -49,6 +49,13 @@ func main() {
v.Products = []string{"ETH-USD", "BTC-USD"}
v.Start(30 * time.Second)
av := ops.AggregationValidator{}
av.Database = db
av.Logger = logger
av.Products = []string{"ETH-USD", "BTC-USD"}
av.Intervals = []time.Duration{30 * time.Second}
av.Start(30 * time.Second)
t := make(chan bool)
<-t
}

View File

@ -3,17 +3,26 @@ package ops
import (
"github.com/kcotugno/tacitus"
"sort"
"time"
)
type timeSlice []time.Time
func (t timeSlice) Len() int { return len(t) }
func (t timeSlice) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t timeSlice) Less(i, j int) bool { return t[i].Before(t[j]) }
type AggregationValidator struct {
Database tacitus.DatabaseClientService
Logger tacitus.Logger
Products []string
Interval time.Duration
Database tacitus.DatabaseClientService
Logger tacitus.Logger
Products []string
Intervals []time.Duration
ticker *time.Ticker
done chan bool
running bool
}
func (a *AggregationValidator) Start(frequency time.Duration) {
@ -22,15 +31,18 @@ func (a *AggregationValidator) Start(frequency time.Duration) {
go func() {
var done bool
a.ticker = time.NewTicker(frequency)
ticker := a.ticker
a.done = make(chan bool)
stop := a.done
a.emitProducts()
for !done {
select {
case <-a.done:
case <-stop:
ticker.Stop()
done = true
case <-a.ticker.C:
case <-ticker.C:
a.emitProducts()
}
}
@ -38,10 +50,6 @@ func (a *AggregationValidator) Start(frequency time.Duration) {
}
func (a *AggregationValidator) Stop() {
if a.ticker != nil {
a.ticker.Stop()
}
if a.done != nil {
a.done <- true
close(a.done)
@ -49,23 +57,34 @@ func (a *AggregationValidator) Stop() {
}
func (a *AggregationValidator) emitProducts() {
for _, p := range a.Products {
a.validate(p)
for _, i := range a.Intervals {
go a.validate(p, i)
}
}
}
func (a *AggregationValidator) validate(product string) {
a.Logger.Info(`aggregation validator: product="%v" interval="%v"`, product, a.Interval)
conf, _ := a.Database.ConfirmationService().Confirmation(product, "a")
times, last_id := a.getTimes(product, conf.LastId)
func (a *AggregationValidator) validate(product string, interval time.Duration) {
var agg Aggregator
agg.Database = a.Database
agg.Logger = a.Logger
a.Logger.Info(`aggregation validator: product="%v" interval="%v"`, product, interval)
conf, _ := a.Database.ConfirmationService().Confirmation(product, "a")
times, last_id, err := a.getTimes(product, interval, conf.LastId)
if err != nil {
a.Logger.Info(`aggregation validator: FAIL product="%v interval="%v" error="%v"`,
product, interval, err)
return
}
a.Logger.Info("aggregation validator: %v missing: %v", product, len(times))
for _, t := range times {
agg.aggregate(product, t, a.Interval)
agg.aggregate(product, t, interval)
}
conf.Product = product
@ -78,29 +97,38 @@ func (a *AggregationValidator) validate(product string) {
}
a.Logger.Info(`aggregation validator: DONE product="%v interval="%v"`,
product, a.Interval)
product, interval)
}
func (a *AggregationValidator) getTimes(product string, last int) ([]time.Time, int) {
var newLast int
func (a *AggregationValidator) getTimes(product string, interval time.Duration,
last int) ([]time.Time, int, error) {
tmp := make(map[time.Time]struct{})
trades, err := a.Database.TradeService().
TradesWhere("product = $1 AND id >= $2 ORDER BY id ASC", product, last)
results, err := a.Database.TradeService().
TradesWhereResults("product = $1 AND id > $2 ORDER BY id ASC", product, last)
if err != nil {
a.Logger.Info(`aggregation validator: failed to retrive times error="%v"`, err)
return nil, 0, err
}
for _, t := range trades {
time := t.Timestamp.Truncate(a.Interval).Add(a.Interval)
var trade tacitus.Trade
for results.Next() {
trade = results.Value().(tacitus.Trade)
time := trade.Timestamp.Truncate(interval).Add(interval)
tmp[time] = struct{}{}
last = t.Id
last = trade.Id
}
var i int
t := make([]time.Time, len(tmp))
for k := range tmp {
t = append(t, k)
t[i] = k
i++
}
return t, newLast
sort.Sort(timeSlice(t))
return t, last, nil
}

View File

@ -62,7 +62,8 @@ func (a *Aggregator) aggregate(product string, end time.Time, interval time.Dura
return
}
var agg tacitus.Aggregation
agg, _ := a.Database.AggregationService().Aggregation(int(interval.Seconds()), product, end)
agg.Interval = int(interval.Seconds())
agg.Product = product
agg.Timestamp = end
@ -79,7 +80,12 @@ func (a *Aggregator) aggregate(product string, end time.Time, interval time.Dura
}
}
_, err = a.Database.AggregationService().CreateAggregation(agg)
if agg.Id == 0 {
_, err = a.Database.AggregationService().CreateAggregation(agg)
} else {
_, err = a.Database.AggregationService().UpdateAggregation(agg)
}
if err != nil {
a.Logger.Info(`aggregator: database error="%v"`, err)
}

View File

@ -158,7 +158,7 @@ func (s *TradeService) TradesBefore(product string, id, limit int) ([]tacitus.Tr
}
func (s *TradeService) TradesWhere(sql string, params ...interface{}) ([]tacitus.Trade, error) {
func (s *TradeService) TradesWhereResults(sql string, params ...interface{}) (tacitus.Results, error) {
stmt := strings.Join([]string{trade_where, sql, ";"}, " ")
s.client.logQuery(stmt, params...)
@ -170,7 +170,10 @@ func (s *TradeService) TradesWhere(sql string, params ...interface{}) ([]tacitus
return nil, err
}
return deserializeTrades(rows)
results := TradeResults{}
results.rows = rows
return &results, nil
}
func (s *TradeService) TradeProducts() ([]string, error) {

View File

@ -31,9 +31,9 @@ type TradeService interface {
LastTrades(product string, limit int) ([]Trade, error)
TradesAfter(product string, id, limit int) ([]Trade, error)
TradesBefore(product string, id, limit int) ([]Trade, error)
TradesWhere(sql string, params ...interface{}) ([]Trade, error)
TradesAfterAll(product string, id int) (Results, error)
TradesWhereResults(sql string, params ...interface{}) (Results, error)
}
func ValidProduct(prod string) bool {