Missing aggregations

This commit is contained in:
Kevin Cotugno 2017-10-17 07:12:24 -07:00
parent 232b6a0215
commit 2abed9d9ed
5 changed files with 154 additions and 9 deletions

View File

@ -22,4 +22,5 @@ type AggregationService interface {
Aggregation(interval int, product string, timestamp time.Time) (Aggregation, error)
CreateAggregation(a Aggregation) (Aggregation, error)
UpdateAggregation(a Aggregation) (Aggregation, error)
MissingTimes(interval int, product string) (Results, error)
}

View File

@ -35,13 +35,16 @@ func (a *AggregationValidator) Start(frequency time.Duration) {
a.done = make(chan bool)
stop := a.done
a.emitProducts()
zero := time.NewTicker(100 * frequency)
for !done {
select {
case <-stop:
ticker.Stop()
zero.Stop()
done = true
case <-zero.C:
a.emitZero()
case <-ticker.C:
a.emitProducts()
}
@ -57,7 +60,6 @@ func (a *AggregationValidator) Stop() {
}
func (a *AggregationValidator) emitProducts() {
for _, p := range a.Products {
for _, i := range a.Intervals {
go a.validate(p, i)
@ -123,12 +125,44 @@ func (a *AggregationValidator) getTimes(product string, interval time.Duration,
}
var i int
t := make([]time.Time, len(tmp))
times := make([]time.Time, len(tmp))
for k := range tmp {
t[i] = k
times[i] = k
i++
}
sort.Sort(timeSlice(t))
return t, last, nil
sort.Sort(timeSlice(times))
return times, last, nil
}
func (a *AggregationValidator) emitZero() {
for _, p := range a.Products {
for _, i := range a.Intervals {
go a.checkZero(p, i)
}
}
}
func (a *AggregationValidator) checkZero(product string, interval time.Duration) {
a.Logger.Info("aggregation validator: START missing aggregations. " +
"product: %v. interval: %v", product, interval)
missing, err := a.Database.AggregationService().MissingTimes(int(interval.Seconds()),
product)
if err != nil {
return
}
var i int
for missing.Next() {
i++
agg := missing.Value().(tacitus.Aggregation)
agg.Interval = int(interval.Seconds())
a.Database.AggregationService().CreateAggregation(agg)
}
a.Logger.Info("aggregation validator: DONE missing aggregations. " +
"product: %v. interval: %v", product, interval)
}

View File

@ -0,0 +1,89 @@
package postgres
import (
"github.com/kcotugno/tacitus"
"github.com/shopspring/decimal"
"database/sql"
"errors"
"time"
)
type AggregationResults struct {
rows *sql.Rows
aggregation tacitus.Aggregation
err error
}
func (r *AggregationResults) Next() bool {
var a tacitus.Aggregation
var id, interval, buyTransactions, sellTransactions *int
var product *string
var price, buyVolume, sellVolume *decimal.Decimal
var timestamp *time.Time
if r.rows == nil {
r.err = errors.New("No query results")
return false
}
if r.rows.Next() {
r.err = r.rows.Scan(&id, &interval, &product, &price, &buyVolume,
&sellVolume, &buyTransactions, &sellTransactions, &timestamp)
if id != nil {
a.Id = *id
}
if interval != nil {
a.Interval = *interval
}
if product != nil {
a.Product = *product
}
if price != nil {
a.Price = *price
}
if buyVolume != nil {
a.BuyVolume = *buyVolume
}
if sellVolume != nil {
a.SellVolume = *sellVolume
}
if buyTransactions != nil {
a.BuyTransactions = *buyTransactions
}
if sellTransactions != nil {
a.SellTransactions = *sellTransactions
}
if timestamp != nil {
a.Timestamp = *timestamp
}
r.aggregation = a
} else {
r.err = r.rows.Err()
return false
}
if r.err == nil {
return true
} else {
return false
}
}
func (r *AggregationResults) Value() interface{} {
return r.aggregation
}
func (r *AggregationResults) Error() error {
return r.err
}
func (r *AggregationResults) Close() error {
if r.rows != nil {
return r.rows.Close()
}
return nil
}

View File

@ -4,6 +4,7 @@ import (
"github.com/kcotugno/tacitus"
"time"
"strings"
)
const (
@ -16,6 +17,11 @@ const (
agg_update = `UPDATE aggregations SET interval = $1, product = $2, ` +
`price = $3, buy_volume = $4, sell_volume = $5, buy_transactions = $6, ` +
`sell_transactions = $7, timestamp = $8 WHERE id = $9`
agg_missing = `SELECT id, ` + agg_columns + ` FROM (SELECT ` +
`generate_series(min(timestamp), max(timestamp), $1)::timestamptz ` +
`AS timestamp, $2::char(7) product FROM aggregations WHERE interval = $3 ` +
`AND product = $4) AS x LEFT OUTER JOIN aggregations a USING(timestamp, product) ` +
`WHERE a.id IS NULL ORDER BY x.timestamp;`
)
type AggregationService struct {
@ -77,3 +83,19 @@ func (s *AggregationService) UpdateAggregation(a tacitus.Aggregation) (tacitus.A
return a, err
}
func (s *AggregationService) MissingTimes(interval int, product string) (tacitus.Results, error) {
s.client.logQuery(agg_missing, interval, product, interval, product)
rows, err := s.client.db.Query(agg_missing, interval, product, interval, product)
if err != nil {
s.client.logError(agg_missing, err, strings.Join([]string{string(interval), " seconds"}, ""), product, interval, product)
return nil, err
}
results := AggregationResults{}
results.rows = rows
return &results, nil
}

View File

@ -15,7 +15,6 @@ type TradeResults struct {
func (r *TradeResults) Next() bool {
var t tacitus.Trade
var done bool
if r.rows == nil {
r.err = errors.New("No query results")
@ -27,11 +26,11 @@ func (r *TradeResults) Next() bool {
&t.Buy, &t.Sell, &t.Timestamp)
r.trade = t
} else {
done = true
r.err = r.rows.Err()
return false
}
if r.err == nil && !done {
if r.err == nil {
return true
} else {
return false