102 lines
3.5 KiB
Go
102 lines
3.5 KiB
Go
package postgres
|
|
|
|
import (
|
|
"github.com/kcotugno/tacitus"
|
|
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
agg_columns = `interval, product, price, buy_volume, sell_volume, ` +
|
|
`buy_transactions, sell_transactions, timestamp`
|
|
agg_find = `SELECT id, ` + agg_columns + ` FROM aggregations ` +
|
|
`WHERE interval = $1 AND product = $2 AND timestamp = $3;`
|
|
agg_insert = `INSERT INTO aggregations (` + agg_columns + `) VALUES ` +
|
|
`($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id, ` + agg_columns + `;`
|
|
agg_update = `UPDATE aggregations SET interval = $1, product = $2, ` +
|
|
`price = $3, buy_volume = $4, sell_volume = $5, buy_transactions = $6, ` +
|
|
`sell_transactions = $7, timestamp = $8 WHERE id = $9`
|
|
agg_missing = `SELECT id, ` + agg_columns + ` FROM (SELECT ` +
|
|
`generate_series(min(timestamp), max(timestamp), $1)::timestamptz ` +
|
|
`AS timestamp, $2::char(7) product FROM aggregations WHERE interval = $3 ` +
|
|
`AND product = $4) AS x LEFT OUTER JOIN aggregations a USING(timestamp, product) ` +
|
|
`WHERE a.id IS NULL ORDER BY x.timestamp;`
|
|
)
|
|
|
|
type AggregationService struct {
|
|
client *Client
|
|
}
|
|
|
|
func (s *AggregationService) Aggregation(interval int, product string,
|
|
timestamp time.Time) (tacitus.Aggregation, error) {
|
|
|
|
var agg tacitus.Aggregation
|
|
|
|
s.client.logQuery(agg_find, interval, product, timestamp)
|
|
|
|
row := s.client.db.QueryRow(agg_find, interval, product, timestamp)
|
|
err := row.Scan(&agg.Id, &agg.Interval, &agg.Product, &agg.Price,
|
|
&agg.BuyVolume, &agg.SellVolume, &agg.BuyTransactions,
|
|
&agg.SellTransactions, &agg.Timestamp)
|
|
if err != nil {
|
|
s.client.logError(agg_find, err, interval, product, timestamp)
|
|
}
|
|
|
|
return agg, nil
|
|
}
|
|
|
|
func (s *AggregationService) CreateAggregation(a tacitus.Aggregation) (tacitus.Aggregation, error) {
|
|
var agg tacitus.Aggregation
|
|
|
|
s.client.logQuery(agg_insert, a.Interval, a.Product, a.Price, a.BuyVolume,
|
|
a.SellVolume, a.BuyTransactions, a.SellTransactions, a.Timestamp)
|
|
|
|
row := s.client.db.QueryRow(agg_insert, a.Interval, a.Product, a.Price,
|
|
a.BuyVolume, a.SellVolume, a.BuyTransactions, a.SellTransactions,
|
|
a.Timestamp)
|
|
|
|
err := row.Scan(&agg.Id, &agg.Interval, &agg.Product, &agg.Price,
|
|
&agg.BuyVolume, &agg.SellVolume, &agg.BuyTransactions,
|
|
&agg.SellTransactions, &agg.Timestamp)
|
|
if err != nil {
|
|
s.client.logError(agg_insert, err, a.Interval, a.Product, a.Price, a.BuyVolume,
|
|
a.SellVolume, a.BuyTransactions, a.SellTransactions, a.Timestamp)
|
|
}
|
|
|
|
return agg, err
|
|
}
|
|
|
|
func (s *AggregationService) UpdateAggregation(a tacitus.Aggregation) (tacitus.Aggregation, error) {
|
|
s.client.logQuery(agg_update, a.Interval, a.Product, a.Price, a.BuyVolume,
|
|
a.SellVolume, a.BuyTransactions, a.SellTransactions,
|
|
a.Timestamp, a.Id)
|
|
|
|
_, err := s.client.db.Exec(agg_update, a.Interval, a.Product, a.Price,
|
|
a.BuyVolume, a.SellVolume, a.BuyTransactions, a.SellTransactions,
|
|
a.Timestamp, a.Id)
|
|
if err != nil {
|
|
s.client.logError(agg_update, err, a.Interval, a.Product, a.Price,
|
|
a.BuyVolume, a.SellVolume, a.BuyTransactions, a.SellTransactions,
|
|
a.Timestamp, a.Id)
|
|
}
|
|
|
|
return a, err
|
|
}
|
|
|
|
func (s *AggregationService) MissingTimes(interval int, product string) (tacitus.Results, error) {
|
|
s.client.logQuery(agg_missing, interval, product, interval, product)
|
|
|
|
rows, err := s.client.db.Query(agg_missing, interval, product, interval, product)
|
|
if err != nil {
|
|
s.client.logError(agg_missing, err, strings.Join([]string{string(interval), " seconds"}, ""), product, interval, product)
|
|
|
|
return nil, err
|
|
}
|
|
|
|
results := AggregationResults{}
|
|
results.rows = rows
|
|
|
|
return &results, nil
|
|
}
|