watcher: inital validator

This begin this code for the validator. Currently this just build an
array of the starting and ending missing trades.

TODO: Download the missing trades
This commit is contained in:
Kevin Cotugno 2017-09-27 23:23:34 -07:00
parent 69edac109b
commit f4fa7f24b3
6 changed files with 194 additions and 8 deletions

View File

@ -35,6 +35,12 @@ func main() {
return return
} }
v := validator{}
v.db = db
v.logger = logger
v.validate("ETH-USD", "BTC-USD")
v.stop()
t := make(chan bool) t := make(chan bool)
<-t <-t
} }

80
cmd/watcher/validator.go Normal file
View File

@ -0,0 +1,80 @@
package main
import (
"github.com/kcotugno/tacitus"
"time"
)
type validator struct {
db tacitus.DatabaseClientService
logger tacitus.Logger
ticker *time.Ticker
done chan bool
}
func (v *validator) stop() {
if v.ticker == nil || v.stop == nil {
v.ticker.Stop()
v.done <- true
}
}
func (v *validator) validate(products ...string) {
v.ticker = time.NewTicker(10 * time.Minute)
v.done = make(chan bool)
go func() {
v.emit_products(products...)
var done bool
for !done {
select {
case <-v.done:
done = true
case <-v.ticker.C:
v.emit_products(products...)
}
}
v.ticker = nil
v.done = nil
}()
}
func (v *validator) emit_products(products ...string) {
for _, p := range products {
go v.validate_product(p)
}
}
func (v *validator) validate_product(product string) {
v.logger.Info("Staring validation of %v", product)
results, err := v.db.TradeService().TradesAfterAll(product, 0)
if err != nil {
v.logger.Info("Error getting all trades: %v", err)
}
var trade tacitus.Trade
current := 0
missing := [][]int{}
for results.Next() {
trade = results.Value().(tacitus.Trade)
if trade.TradeId != current+1 {
missing = append(missing, []int{current + 1, trade.TradeId - 1})
}
current = trade.TradeId
}
for _, i := range missing {
v.logger.Info("%v", i)
}
v.logger.Info("DONE: %v", product)
}

55
postgres/trade_result.go Normal file
View File

@ -0,0 +1,55 @@
package postgres
import (
"github.com/kcotugno/tacitus"
"database/sql"
"errors"
)
type TradeResults struct {
rows *sql.Rows
trade tacitus.Trade
err error
}
func (r *TradeResults) Next() bool {
var t tacitus.Trade
var done bool
if r.rows == nil {
r.err = errors.New("No query results")
return false
}
if r.rows.Next() {
r.err = r.rows.Scan(&t.Id, &t.TradeId, &t.Product, &t.Price, &t.Size,
&t.Buy, &t.Sell, &t.Timestamp)
r.trade = t
} else {
done = true
r.err = r.rows.Err()
}
if r.err == nil && !done {
return true
} else {
return false
}
}
func (r *TradeResults) Value() interface{} {
return r.trade
}
func (r *TradeResults) Error() error {
return r.err
}
func (r *TradeResults) Close() error {
if r.rows != nil {
return r.rows.Close()
}
return nil
}

View File

@ -1,14 +1,14 @@
package postgres package postgres
import ( import (
"github.com/kcotugno/tacitus"
"database/sql" "database/sql"
"time" "time"
"github.com/kcotugno/tacitus"
) )
const ( const (
trade_columns = `trade_id, product, price, size, buy, sell, timestamp` trade_columns = ` trade_id, product, price, size, buy, sell, timestamp `
trade_insert = `INSERT INTO trades (` + trade_columns + `) VALUES ` + trade_insert = `INSERT INTO trades (` + trade_columns + `) VALUES ` +
`($1, $2, $3, $4, $5, $6, $7) RETURNING id,` + trade_columns + `;` `($1, $2, $3, $4, $5, $6, $7) RETURNING id,` + trade_columns + `;`
trade_find = `SELECT id, ` + trade_columns + ` FROM trades WHERE id = $1;` trade_find = `SELECT id, ` + trade_columns + ` FROM trades WHERE id = $1;`
@ -25,6 +25,9 @@ const (
`WHERE product = $1 AND trade_id < $2 ORDER BY trade_id DESC LIMIT $3;` `WHERE product = $1 AND trade_id < $2 ORDER BY trade_id DESC LIMIT $3;`
trade_before = `SELECT id, ` + trade_columns + ` FROM trades ` + trade_before = `SELECT id, ` + trade_columns + ` FROM trades ` +
`WHERE product = $1 AND trade_id > $2 ORDER BY trade_id ASC LIMIT $3;` `WHERE product = $1 AND trade_id > $2 ORDER BY trade_id ASC LIMIT $3;`
trade_product = `SELECT DISTINCT product FROM trades;`
trade_after_all = `SELECT id,` + trade_columns + `FROM trades WHERE product = $1 ` +
`AND trade_id > $2 ORDER BY trade_id ASC;`
) )
type TradeService struct { type TradeService struct {
@ -153,6 +156,44 @@ func (s *TradeService) TradesBefore(product string, id, limit int) ([]tacitus.Tr
} }
func (s *TradeService) TradeProducts() ([]string, error) {
s.client.logQuery(trade_product)
rows, err := s.client.db.Query(trade_product)
if err != nil {
s.client.logError(trade_product, err)
return nil, err
}
products := []string{}
for rows.Next() {
var p string
if err = rows.Scan(&p); err != nil {
s.client.logError(trade_product, err)
return nil, err
}
products = append(products, p)
}
return products, nil
}
func (s *TradeService) TradesAfterAll(product string, id int) (tacitus.Results, error) {
s.client.logQuery(trade_after_all, product, id)
rows, err := s.client.db.Query(trade_after_all, product, id)
if err != nil {
s.client.logError(trade_after_all, err, product, id)
return nil, err
}
results := TradeResults{}
results.rows = rows
return &results, nil
}
func deserializeTrades(rows *sql.Rows) ([]tacitus.Trade, error) { func deserializeTrades(rows *sql.Rows) ([]tacitus.Trade, error) {
defer rows.Close() defer rows.Close()

8
results.go Normal file
View File

@ -0,0 +1,8 @@
package tacitus
type Results interface {
Next() bool
Value() interface{}
Error() error
Close() error
}

View File

@ -32,11 +32,7 @@ type TradeService interface {
TradesAfter(product string, id, limit int) ([]Trade, error) TradesAfter(product string, id, limit int) ([]Trade, error)
TradesBefore(product string, id, limit int) ([]Trade, error) TradesBefore(product string, id, limit int) ([]Trade, error)
TradesAfterAll(product string, id int) error TradesAfterAll(product string, id int) (Results, error)
Next() (Trade, error)
Done() bool
CloseRows() error
} }
func ValidProduct(prod string) bool { func ValidProduct(prod string) bool {