This repository has been archived on 2022-11-30. You can view files and clone it, but cannot push or open issues or pull requests.
tacitus/postgres/aggregation_service.go

102 lines
3.5 KiB
Go

package postgres
import (
	"database/sql"
	"strconv"
	"strings"
	"time"

	"git.kevincotugno.com/kcotugno/tacitus"
)
// SQL statements used by AggregationService. Every statement targets the
// aggregations table and takes positional ($n) parameters.
const (
// agg_columns lists every aggregations column except id. Its order matches
// the $1..$8 placeholders of agg_insert and, prefixed with id, the Scan
// destinations used by the methods in this file.
agg_columns = `interval, product, price, buy_volume, sell_volume, ` +
`buy_transactions, sell_transactions, timestamp`
// agg_find selects the row matching interval ($1), product ($2) and
// timestamp ($3).
agg_find = `SELECT id, ` + agg_columns + ` FROM aggregations ` +
`WHERE interval = $1 AND product = $2 AND timestamp = $3;`
// agg_insert inserts one row ($1..$8 in agg_columns order) and returns the
// stored row, including its id, through RETURNING.
agg_insert = `INSERT INTO aggregations (` + agg_columns + `) VALUES ` +
`($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id, ` + agg_columns + `;`
// agg_update overwrites all non-id columns ($1..$8) of the row whose id is $9.
agg_update = `UPDATE aggregations SET interval = $1, product = $2, ` +
`price = $3, buy_volume = $4, sell_volume = $5, buy_transactions = $6, ` +
`sell_transactions = $7, timestamp = $8 WHERE id = $9`
// agg_missing generates a timestamp series from the minimum to the maximum
// timestamp of the rows with interval $3 and product $4, stepping by $1 and
// labelling each generated row with product $2. The series is left-joined to
// aggregations and only generated rows without a matching aggregation are
// kept, ordered by timestamp.
// NOTE(review): the join matches on (timestamp, product) only, so a row stored
// for a different interval at the same timestamp also counts as a match —
// confirm this is intended.
// NOTE(review): $1 is used as the generate_series step; presumably the driver
// sends the Go int in a form PostgreSQL accepts as an interval — TODO confirm.
agg_missing = `SELECT id, ` + agg_columns + ` FROM (SELECT ` +
`generate_series(min(timestamp), max(timestamp), $1)::timestamptz ` +
`AS timestamp, $2::char(7) product FROM aggregations WHERE interval = $3 ` +
`AND product = $4) AS x LEFT OUTER JOIN aggregations a USING(timestamp, product) ` +
`WHERE a.id IS NULL ORDER BY x.timestamp;`
)
// AggregationService reads and writes rows of the aggregations table through
// a Client: lookup, insert, update, and detection of missing time slots.
type AggregationService struct {
// client supplies the database handle (client.db) and the logQuery/logError
// helpers used by every method of the service.
client *Client
}
// Aggregation looks up the aggregation stored for the given interval, product
// and timestamp using agg_find.
//
// When no matching row exists it returns the zero-value tacitus.Aggregation
// and a nil error, as it always has, so callers can keep detecting "not
// found" by inspecting the returned value. Any other failure (connection
// problems, scan/conversion errors, ...) is returned to the caller instead of
// being silently dropped. Every error, including the not-found case, is still
// logged through the client.
//
// NOTE(review): the not-found check assumes s.client.db follows database/sql
// semantics, i.e. Scan reports a missing row as sql.ErrNoRows.
func (s *AggregationService) Aggregation(interval int, product string,
	timestamp time.Time) (tacitus.Aggregation, error) {
	var agg tacitus.Aggregation

	s.client.logQuery(agg_find, interval, product, timestamp)

	row := s.client.db.QueryRow(agg_find, interval, product, timestamp)
	err := row.Scan(&agg.Id, &agg.Interval, &agg.Product, &agg.Price,
		&agg.BuyVolume, &agg.SellVolume, &agg.BuyTransactions,
		&agg.SellTransactions, &agg.Timestamp)
	if err == nil {
		return agg, nil
	}

	s.client.logError(agg_find, err, interval, product, timestamp)

	if err == sql.ErrNoRows {
		// A missing row is an expected outcome, not a failure.
		return tacitus.Aggregation{}, nil
	}

	// Never hand back a possibly half-scanned value together with an error.
	return tacitus.Aggregation{}, err
}
// CreateAggregation inserts a as a new row using agg_insert and returns the
// row reported back by the statement's RETURNING clause, including its id.
//
// If the insert or the scan of the returned row fails, the error is logged
// through the client and returned; the accompanying aggregation value should
// not be relied upon in that case.
func (s *AggregationService) CreateAggregation(a tacitus.Aggregation) (tacitus.Aggregation, error) {
	s.client.logQuery(agg_insert,
		a.Interval, a.Product, a.Price, a.BuyVolume,
		a.SellVolume, a.BuyTransactions, a.SellTransactions, a.Timestamp)

	var created tacitus.Aggregation

	scanErr := s.client.db.QueryRow(agg_insert,
		a.Interval, a.Product, a.Price, a.BuyVolume,
		a.SellVolume, a.BuyTransactions, a.SellTransactions, a.Timestamp,
	).Scan(
		&created.Id, &created.Interval, &created.Product, &created.Price,
		&created.BuyVolume, &created.SellVolume, &created.BuyTransactions,
		&created.SellTransactions, &created.Timestamp,
	)
	if scanErr != nil {
		s.client.logError(agg_insert, scanErr,
			a.Interval, a.Product, a.Price, a.BuyVolume,
			a.SellVolume, a.BuyTransactions, a.SellTransactions, a.Timestamp)
	}

	return created, scanErr
}
// UpdateAggregation overwrites every non-id column of the row whose id equals
// a.Id with the values held in a, using agg_update.
//
// The aggregation passed in is returned as-is; it is not re-read from the
// database. A failure of the statement is logged through the client and
// returned alongside a.
func (s *AggregationService) UpdateAggregation(a tacitus.Aggregation) (tacitus.Aggregation, error) {
	s.client.logQuery(agg_update,
		a.Interval, a.Product, a.Price, a.BuyVolume,
		a.SellVolume, a.BuyTransactions, a.SellTransactions,
		a.Timestamp, a.Id)

	if _, execErr := s.client.db.Exec(agg_update,
		a.Interval, a.Product, a.Price, a.BuyVolume,
		a.SellVolume, a.BuyTransactions, a.SellTransactions,
		a.Timestamp, a.Id); execErr != nil {
		s.client.logError(agg_update, execErr,
			a.Interval, a.Product, a.Price, a.BuyVolume,
			a.SellVolume, a.BuyTransactions, a.SellTransactions,
			a.Timestamp, a.Id)
		return a, execErr
	}

	return a, nil
}
// MissingTimes runs agg_missing for the given interval and product and
// returns the resulting rows wrapped in an AggregationResults.
//
// interval is passed to the statement twice: as the series step ($1) and as
// the interval filter ($3); product likewise fills $2 and $4. The rows are
// handed to the returned value without being read or closed here.
//
// If the query fails, the error is logged through the client and returned
// with a nil result.
func (s *AggregationService) MissingTimes(interval int, product string) (tacitus.Results, error) {
	s.client.logQuery(agg_missing, interval, product, interval, product)

	rows, err := s.client.db.Query(agg_missing, interval, product, interval, product)
	if err != nil {
		// strconv.Itoa renders the decimal digits of interval. A plain
		// string(interval) conversion would instead yield the Unicode
		// character with that code point, producing a garbled log entry.
		step := strings.Join([]string{strconv.Itoa(interval), " seconds"}, "")
		s.client.logError(agg_missing, err, step, product, interval, product)
		return nil, err
	}

	return &AggregationResults{rows: rows}, nil
}