Validator fully working
This commit is contained in:
parent
083519d498
commit
8a78f6e5aa
@ -59,19 +59,31 @@ func (v *validator) emitProducts(products ...string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (v *validator) validateProduct(product string) {
|
func (v *validator) validateProduct(product string) {
|
||||||
v.logger.Info("Validating %v", product)
|
v.logger.Info("validator: %v", product)
|
||||||
|
|
||||||
groups := v.findMissingGroups(product)
|
conf, _ := v.db.ConfirmationService().Confirmation(product)
|
||||||
|
|
||||||
|
groups, last_id := v.findMissingGroups(product, conf.LastTradeId)
|
||||||
v.getMissingTrades(product, groups)
|
v.getMissingTrades(product, groups)
|
||||||
|
|
||||||
v.logger.Info("DONE: %v", product)
|
conf.LastTradeId = last_id
|
||||||
|
conf.Product = product
|
||||||
|
if conf.Id == 0 {
|
||||||
|
v.db.ConfirmationService().CreateConfirmation(conf)
|
||||||
|
} else {
|
||||||
|
v.db.ConfirmationService().UpdateConfirmation(conf)
|
||||||
|
}
|
||||||
|
|
||||||
|
v.logger.Info("validator: DONE %v", product)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *validator) getMissingTrades(product string, groups [][]int) {
|
func (v *validator) getMissingTrades(product string, groups [][]int) {
|
||||||
c := gdax.NewPublicClient()
|
c := gdax.NewPublicClient()
|
||||||
|
|
||||||
for _, group := range groups {
|
for _, group := range groups {
|
||||||
v.logger.Info("%v", group)
|
total := 1 + group[1] - group[0]
|
||||||
|
v.logger.Info("validator: retrieving %v missing trade(s): %v", total, group)
|
||||||
|
|
||||||
for i := group[1]; i >= group[0]; i-- {
|
for i := group[1]; i >= group[0]; i-- {
|
||||||
ts, _ := c.GetTradesBefore(product, i+1)
|
ts, _ := c.GetTradesBefore(product, i+1)
|
||||||
sort.Sort(ByTradeId(ts))
|
sort.Sort(ByTradeId(ts))
|
||||||
@ -85,27 +97,31 @@ func (v *validator) getMissingTrades(product string, groups [][]int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
v.logger.Info("validator: DONE")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *validator) findMissingGroups(product string) [][]int {
|
func (v *validator) findMissingGroups(product string, starting int) ([][]int, int) {
|
||||||
results, err := v.db.TradeService().TradesAfterAll(product, 0)
|
results, err := v.db.TradeService().TradesAfterAll(product, starting)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
v.logger.Info("Error getting all trades: %v", err)
|
v.logger.Info("Error getting all trades: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
var trade tacitus.Trade
|
var trade tacitus.Trade
|
||||||
current := 0
|
|
||||||
missing := [][]int{}
|
missing := [][]int{}
|
||||||
|
current := starting
|
||||||
|
|
||||||
for results.Next() {
|
for results.Next() {
|
||||||
trade = results.Value().(tacitus.Trade)
|
trade = results.Value().(tacitus.Trade)
|
||||||
|
|
||||||
if trade.TradeId != current+1 {
|
if trade.TradeId != current+1 && trade.TradeId != current {
|
||||||
missing = append(missing, []int{current + 1, trade.TradeId - 1})
|
missing = append(missing, []int{current + 1, trade.TradeId - 1})
|
||||||
}
|
}
|
||||||
|
|
||||||
current = trade.TradeId
|
current = trade.TradeId
|
||||||
}
|
}
|
||||||
|
|
||||||
return missing
|
return missing, current
|
||||||
}
|
}
|
||||||
|
@ -4,5 +4,6 @@ type DatabaseClientService interface {
|
|||||||
Open() error
|
Open() error
|
||||||
Close() error
|
Close() error
|
||||||
TradeService() TradeService
|
TradeService() TradeService
|
||||||
|
ConfirmationService() ConfirmationService
|
||||||
SetLogger(logger Logger)
|
SetLogger(logger Logger)
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@ type Client struct {
|
|||||||
SslMode string
|
SslMode string
|
||||||
|
|
||||||
tradeService TradeService
|
tradeService TradeService
|
||||||
|
confirmationService ConfirmationService
|
||||||
|
|
||||||
logger tacitus.Logger
|
logger tacitus.Logger
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
@ -26,6 +27,7 @@ type Client struct {
|
|||||||
func NewClient() *Client {
|
func NewClient() *Client {
|
||||||
c := Client{}
|
c := Client{}
|
||||||
c.tradeService.client = &c
|
c.tradeService.client = &c
|
||||||
|
c.confirmationService.client = &c
|
||||||
c.SslMode = "disable"
|
c.SslMode = "disable"
|
||||||
|
|
||||||
return &c
|
return &c
|
||||||
@ -66,6 +68,10 @@ func (c *Client) TradeService() tacitus.TradeService {
|
|||||||
return &c.tradeService
|
return &c.tradeService
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) ConfirmationService() tacitus.ConfirmationService {
|
||||||
|
return &c.confirmationService
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) log(format string, params ...interface{}) {
|
func (c *Client) log(format string, params ...interface{}) {
|
||||||
if c.logger != nil {
|
if c.logger != nil {
|
||||||
c.logger.Info(format, params...)
|
c.logger.Info(format, params...)
|
||||||
|
@ -8,9 +8,9 @@ const (
|
|||||||
conf_columns = `product, last_trade_id`
|
conf_columns = `product, last_trade_id`
|
||||||
conf_insert = `INSERT INTO confirmations (` + conf_columns +
|
conf_insert = `INSERT INTO confirmations (` + conf_columns +
|
||||||
`) VALUES ($1, $2) RETURNING id, ` + conf_columns + `;`
|
`) VALUES ($1, $2) RETURNING id, ` + conf_columns + `;`
|
||||||
conf_find_product = `SELECT id, ` + conf_columns + ` FROM confirmations` +
|
conf_find_product = `SELECT id, ` + conf_columns + ` FROM confirmations ` +
|
||||||
`WHERE product = $1;`
|
`WHERE product = $1;`
|
||||||
conf_update = `UPDATE confirmation SET last_trade_id = $1 WHERE product = $2;`
|
conf_update = `UPDATE confirmations SET last_trade_id = $1 WHERE product = $2;`
|
||||||
)
|
)
|
||||||
|
|
||||||
type ConfirmationService struct {
|
type ConfirmationService struct {
|
||||||
@ -50,7 +50,7 @@ func (s *ConfirmationService) CreateConfirmation(c tacitus.Confirmation) (tacitu
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ConfirmationService) UpdateConfirmation(c tacitus.Confirmation) (tacitus.Confirmation, error) {
|
func (s *ConfirmationService) UpdateConfirmation(c tacitus.Confirmation) (tacitus.Confirmation, error) {
|
||||||
s.client.logQuery(conf_insert, c.LastTradeId, c.Product)
|
s.client.logQuery(conf_update, c.LastTradeId, c.Product)
|
||||||
|
|
||||||
_, err := s.client.db.Exec(conf_update, c.LastTradeId, c.Product)
|
_, err := s.client.db.Exec(conf_update, c.LastTradeId, c.Product)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -27,7 +27,7 @@ const (
|
|||||||
`WHERE product = $1 AND trade_id > $2 ORDER BY trade_id ASC LIMIT $3;`
|
`WHERE product = $1 AND trade_id > $2 ORDER BY trade_id ASC LIMIT $3;`
|
||||||
trade_product = `SELECT DISTINCT product FROM trades;`
|
trade_product = `SELECT DISTINCT product FROM trades;`
|
||||||
trade_after_all = `SELECT id,` + trade_columns + `FROM trades WHERE product = $1 ` +
|
trade_after_all = `SELECT id,` + trade_columns + `FROM trades WHERE product = $1 ` +
|
||||||
`AND trade_id > $2 ORDER BY trade_id ASC;`
|
`AND trade_id >= $2 ORDER BY trade_id ASC;`
|
||||||
)
|
)
|
||||||
|
|
||||||
type TradeService struct {
|
type TradeService struct {
|
||||||
|
Reference in New Issue
Block a user