diff --git a/cmd/watcher/main.go b/cmd/watcher/main.go index 7852802..3abad58 100644 --- a/cmd/watcher/main.go +++ b/cmd/watcher/main.go @@ -35,6 +35,12 @@ func main() { return } + v := validator{} + v.db = db + v.logger = logger + v.validate("ETH-USD", "BTC-USD") + v.stop() + t := make(chan bool) <-t } diff --git a/cmd/watcher/validator.go b/cmd/watcher/validator.go new file mode 100644 index 0000000..23e26bc --- /dev/null +++ b/cmd/watcher/validator.go @@ -0,0 +1,80 @@ +package main + +import ( + "github.com/kcotugno/tacitus" + + "time" +) + +type validator struct { + db tacitus.DatabaseClientService + logger tacitus.Logger + + ticker *time.Ticker + done chan bool +} + +func (v *validator) stop() { + if v.ticker == nil || v.stop == nil { + v.ticker.Stop() + v.done <- true + } +} + +func (v *validator) validate(products ...string) { + v.ticker = time.NewTicker(10 * time.Minute) + v.done = make(chan bool) + + go func() { + v.emit_products(products...) + + var done bool + + for !done { + select { + case <-v.done: + done = true + case <-v.ticker.C: + v.emit_products(products...) + } + } + + v.ticker = nil + v.done = nil + }() +} + +func (v *validator) emit_products(products ...string) { + for _, p := range products { + go v.validate_product(p) + } +} + +func (v *validator) validate_product(product string) { + v.logger.Info("Staring validation of %v", product) + + results, err := v.db.TradeService().TradesAfterAll(product, 0) + if err != nil { + v.logger.Info("Error getting all trades: %v", err) + } + + var trade tacitus.Trade + current := 0 + missing := [][]int{} + for results.Next() { + + trade = results.Value().(tacitus.Trade) + + if trade.TradeId != current+1 { + missing = append(missing, []int{current + 1, trade.TradeId - 1}) + } + + current = trade.TradeId + } + + for _, i := range missing { + v.logger.Info("%v", i) + } + + v.logger.Info("DONE: %v", product) +} diff --git a/postgres/trade_result.go b/postgres/trade_result.go new file mode 100644 index 0000000..10816d3 --- /dev/null +++ b/postgres/trade_result.go @@ -0,0 +1,55 @@ +package postgres + +import ( + "github.com/kcotugno/tacitus" + + "database/sql" + "errors" +) + +type TradeResults struct { + rows *sql.Rows + trade tacitus.Trade + err error +} + +func (r *TradeResults) Next() bool { + var t tacitus.Trade + var done bool + + if r.rows == nil { + r.err = errors.New("No query results") + return false + } + + if r.rows.Next() { + r.err = r.rows.Scan(&t.Id, &t.TradeId, &t.Product, &t.Price, &t.Size, + &t.Buy, &t.Sell, &t.Timestamp) + r.trade = t + } else { + done = true + r.err = r.rows.Err() + } + + if r.err == nil && !done { + return true + } else { + return false + } +} + +func (r *TradeResults) Value() interface{} { + return r.trade +} + +func (r *TradeResults) Error() error { + return r.err +} + +func (r *TradeResults) Close() error { + if r.rows != nil { + return r.rows.Close() + } + + return nil +} diff --git a/postgres/trade_service.go b/postgres/trade_service.go index d8842df..99dc451 100644 --- a/postgres/trade_service.go +++ b/postgres/trade_service.go @@ -1,14 +1,14 @@ package postgres import ( + "github.com/kcotugno/tacitus" + "database/sql" "time" - - "github.com/kcotugno/tacitus" ) const ( - trade_columns = `trade_id, product, price, size, buy, sell, timestamp` + trade_columns = ` trade_id, product, price, size, buy, sell, timestamp ` trade_insert = `INSERT INTO trades (` + trade_columns + `) VALUES ` + `($1, $2, $3, $4, $5, $6, $7) RETURNING id,` + trade_columns + `;` trade_find = `SELECT id, ` + trade_columns + ` FROM trades WHERE id = $1;` @@ -25,6 +25,9 @@ const ( `WHERE product = $1 AND trade_id < $2 ORDER BY trade_id DESC LIMIT $3;` trade_before = `SELECT id, ` + trade_columns + ` FROM trades ` + `WHERE product = $1 AND trade_id > $2 ORDER BY trade_id ASC LIMIT $3;` + trade_product = `SELECT DISTINCT product FROM trades;` + trade_after_all = `SELECT id,` + trade_columns + `FROM trades WHERE product = $1 ` + + `AND trade_id > $2 ORDER BY trade_id ASC;` ) type TradeService struct { @@ -153,6 +156,44 @@ func (s *TradeService) TradesBefore(product string, id, limit int) ([]tacitus.Tr } +func (s *TradeService) TradeProducts() ([]string, error) { + s.client.logQuery(trade_product) + rows, err := s.client.db.Query(trade_product) + if err != nil { + s.client.logError(trade_product, err) + + return nil, err + } + + products := []string{} + for rows.Next() { + var p string + if err = rows.Scan(&p); err != nil { + s.client.logError(trade_product, err) + + return nil, err + } + + products = append(products, p) + } + + return products, nil +} + +func (s *TradeService) TradesAfterAll(product string, id int) (tacitus.Results, error) { + s.client.logQuery(trade_after_all, product, id) + rows, err := s.client.db.Query(trade_after_all, product, id) + if err != nil { + s.client.logError(trade_after_all, err, product, id) + return nil, err + } + + results := TradeResults{} + results.rows = rows + + return &results, nil +} + func deserializeTrades(rows *sql.Rows) ([]tacitus.Trade, error) { defer rows.Close() diff --git a/results.go b/results.go new file mode 100644 index 0000000..09053a5 --- /dev/null +++ b/results.go @@ -0,0 +1,8 @@ +package tacitus + +type Results interface { + Next() bool + Value() interface{} + Error() error + Close() error +} diff --git a/trade.go b/trade.go index de43055..9591001 100644 --- a/trade.go +++ b/trade.go @@ -32,11 +32,7 @@ type TradeService interface { TradesAfter(product string, id, limit int) ([]Trade, error) TradesBefore(product string, id, limit int) ([]Trade, error) - TradesAfterAll(product string, id int) error - - Next() (Trade, error) - Done() bool - CloseRows() error + TradesAfterAll(product string, id int) (Results, error) } func ValidProduct(prod string) bool {