Compare commits

..

3 Commits

Author SHA1 Message Date
f324ba576c Only send an error if there is a receiver 2017-11-27 20:13:30 -08:00
81ec3c1eb8 Continue refactor 2017-11-27 20:13:30 -08:00
3513e845c6 WIP 2017-11-27 20:13:30 -08:00
23 changed files with 102 additions and 249 deletions

View File

@ -23,13 +23,13 @@ watcher: $(WATCHER)
webapp: $(WEBAPP) webapp: $(WEBAPP)
get: get:
go get $(GETFLAGS) git.kevincotugno.com/kcotugno/tacitus go get $(GETFLAGS) github.com/kcotugno/tacitus
go get $(GETFLAGS) git.kevincotugno.com/kcotugno/tacitus/cmd/interval go get $(GETFLAGS) github.com/kcotugno/tacitus/cmd/interval
go get $(GETFLAGS) git.kevincotugno.com/kcotugno/tacitus/cmd/watcher go get $(GETFLAGS) github.com/kcotugno/tacitus/cmd/watcher
go get $(GETFLAGS) git.kevincotugno.com/kcotugno/tacitus/cmd/webapp go get $(GETFLAGS) github.com/kcotugno/tacitus/cmd/webapp
go get $(GETFLAGS) git.kevincotugno.com/kcotugno/tacitus/gdax go get $(GETFLAGS) github.com/kcotugno/tacitus/gdax
go get $(GETFLAGS) git.kevincotugno.com/kcotugno/tacitus/gdax/websocket go get $(GETFLAGS) github.com/kcotugno/tacitus/gdax/websocket
go get $(GETFLAGS) git.kevincotugno.com/kcotugno/tacitus/postgres go get $(GETFLAGS) github.com/kcotugno/tacitus/postgres
fmt: $(TACITUS_FILES) $(OSUTIL_FILES) $(POSTGRES_FILES) $(OPS_FILES) \ fmt: $(TACITUS_FILES) $(OSUTIL_FILES) $(POSTGRES_FILES) $(OPS_FILES) \
$(GDAX_FILES) $(GDAX_WEBSOCKET_FILES) $(HTTP_FILES) $(GDAX_FILES) $(GDAX_WEBSOCKET_FILES) $(HTTP_FILES)

View File

@ -9,7 +9,7 @@ import (
"strconv" "strconv"
"time" "time"
"git.kevincotugno.com/kcotugno/tacitus/postgres" "github.com/kcotugno/tacitus/postgres"
) )
func main() { func main() {

View File

@ -1,91 +1,27 @@
package main package main
import ( import (
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus/gdax"
"git.kevincotugno.com/kcotugno/tacitus/gdax" "github.com/kcotugno/tacitus/gdax/websocket"
"git.kevincotugno.com/kcotugno/tacitus/gdax/websocket" "github.com/kcotugno/tacitus/ops"
"git.kevincotugno.com/kcotugno/tacitus/ops" "github.com/kcotugno/tacitus/osutil"
"git.kevincotugno.com/kcotugno/tacitus/osutil" "github.com/kcotugno/tacitus/postgres"
"git.kevincotugno.com/kcotugno/tacitus/postgres"
"flag"
"fmt"
"os"
"os/signal"
"time" "time"
) )
type Parameters struct {
Products []string
Host string
Port int
User string
Name string
Password string
SSL string
}
type StringList []string
func parseFlags() Parameters {
var p Parameters
p.Products = make([]string, 0)
host := flag.String("host", "localhost", "Database host")
port := flag.Int("port", 5432, "Database port")
user := flag.String("user", "gdax", "Database user")
db := flag.String("name", "gdax", "Database name")
password := flag.String("password", `""`, "Database password")
ssl := flag.String("ssl", "disable", "Database ssl")
products := make(StringList, 0)
flag.Var(&products, "product", "Product to watch. This option can "+
"be specified multiple time")
flag.Parse()
for _, v := range products {
if !tacitus.ValidProduct(v) {
fmt.Printf("error: %v is not a valid product", v)
os.Exit(128)
}
}
p.Products = []string(products)
p.Host = *host
p.Port = *port
p.User = *user
p.Name = *db
p.Password = *password
p.SSL = *ssl
return p
}
func main() { func main() {
params := parseFlags()
if len(params.Products) == 0 {
flag.PrintDefaults()
os.Exit(1)
}
logger := osutil.NewLogger() logger := osutil.NewLogger()
db := postgres.NewClient() db := postgres.NewClient()
db.Name = params.Name db.Name = "gdax"
db.User = params.User db.User = "gdax"
db.Password = params.Password
db.Host = params.Host
db.Port = params.Port
db.SslMode = params.SSL
db.SetLogger(logger) db.SetLogger(logger)
err := db.Open() err := db.Open()
if err != nil { if err != nil {
logger.Info("Error openning database connection: %v", err) logger.Info("Error openning database connection: %v", err)
return return
} }
defer db.Close()
ws := websocket.NewClient() ws := websocket.NewClient()
ls := &gdax.ListenerService{} ls := &gdax.ListenerService{}
@ -95,50 +31,31 @@ func main() {
r := ops.Registrar{} r := ops.Registrar{}
r.Database = db r.Database = db
r.Listener = ls r.Listener = ls
r.SetLogger(logger) r.Logger = logger
if err := r.Record(params.Products...); err != nil { if err := r.Record("ETH-USD", "BTC-USD"); err != nil {
logger.Info("Error: %v", err) logger.Info("Error: %v", err)
os.Exit(3) return
} }
defer ls.Close()
a := ops.Aggregator{} a := ops.Aggregator{}
a.Database = db a.Database = db
a.Logger = logger a.Logger = logger
a.Products = params.Products a.Products = []string{"ETH-USD", "BTC-USD"}
a.Start(30 * time.Second) a.Start(30 * time.Second)
v := ops.Validator{} v := ops.Validator{}
v.Database = db v.Database = db
v.Logger = logger v.Logger = logger
v.Products = params.Products v.Products = []string{"ETH-USD", "BTC-USD"}
v.Start(30 * time.Second) v.Start(30 * time.Second)
av := ops.AggregationValidator{} av := ops.AggregationValidator{}
av.Database = db av.Database = db
av.Logger = logger av.Logger = logger
av.Products = params.Products av.Products = []string{"ETH-USD", "BTC-USD"}
av.Intervals = []time.Duration{30 * time.Second} av.Intervals = []time.Duration{30 * time.Second}
av.Start(30 * time.Second) av.Start(30 * time.Second)
c := make(chan os.Signal) t := make(chan bool)
signal.Notify(c, os.Interrupt) <-t
var done bool
for !done {
select {
case <-c:
done = true
}
}
}
func (l *StringList) String() string {
return "List of strings"
}
func (l *StringList) Set(value string) error {
*l = append(*l, value)
return nil
} }

View File

@ -1,9 +1,9 @@
package main package main
import ( import (
"git.kevincotugno.com/kcotugno/tacitus/http" "github.com/kcotugno/tacitus/http"
"git.kevincotugno.com/kcotugno/tacitus/osutil" "github.com/kcotugno/tacitus/osutil"
"git.kevincotugno.com/kcotugno/tacitus/postgres" "github.com/kcotugno/tacitus/postgres"
"log" "log"
) )

View File

@ -1,12 +1,13 @@
package gdax package gdax
import ( import (
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
"errors" "errors"
"sync"
) )
const buf = 1024
type ListenerService struct { type ListenerService struct {
Client ListenerClient Client ListenerClient
Logger tacitus.Logger Logger tacitus.Logger
@ -14,19 +15,11 @@ type ListenerService struct {
trades chan tacitus.Trade trades chan tacitus.Trade
err chan error err chan error
subs []string dirty bool
closed bool
shouldRestart bool
restMu sync.Mutex
errorsMu sync.Mutex
sendErrors bool
} }
func (s *ListenerService) Open() error { func (s *ListenerService) Open() error {
if s.closed { if s.dirty {
return errors.New("Already used") return errors.New("Already used")
} }
@ -36,9 +29,8 @@ func (s *ListenerService) Open() error {
} }
s.Logger.Info("GDAX listener started") s.Logger.Info("GDAX listener started")
s.trades = make(chan tacitus.Trade, 1024) s.trades = make(chan tacitus.Trade, buf)
s.err = make(chan error) s.err = make(chan error)
s.setRestart(true)
s.listen() s.listen()
@ -46,15 +38,22 @@ func (s *ListenerService) Open() error {
} }
func (s *ListenerService) Close() error { func (s *ListenerService) Close() error {
s.closed = true
s.setRestart(false)
s.Client.Close() s.Client.Close()
s.Logger.Info("GDAX listener stopped") s.Logger.Info("GDAX listener stopped")
close(s.trades)
close(s.err)
return nil return nil
} }
func (s *ListenerService) Subscribe(products ...string) { func (s *ListenerService) Subscribe(products ...string) {
if s.dirty {
return
}
s.dirty = true
req := Request{} req := Request{}
req.Type = Subscribe req.Type = Subscribe
@ -62,8 +61,6 @@ func (s *ListenerService) Subscribe(products ...string) {
req.Channels[0].Name = "matches" req.Channels[0].Name = "matches"
req.Channels[0].ProductIds = products req.Channels[0].ProductIds = products
s.subs = products
s.Client.Send(req) s.Client.Send(req)
} }
@ -75,10 +72,6 @@ func (s *ListenerService) Error() <-chan error {
return s.err return s.err
} }
func (s *ListenerService) SetLogger(logger tacitus.Logger) {
s.Logger = logger
}
func (s *ListenerService) listen() { func (s *ListenerService) listen() {
go func() { go func() {
for msg := range s.Client.Stream() { for msg := range s.Client.Stream() {
@ -106,50 +99,16 @@ func (s *ListenerService) listen() {
for err := range s.Client.Error() { for err := range s.Client.Error() {
s.Logger.Info("Error from GDAX client: %v", err) s.Logger.Info("Error from GDAX client: %v", err)
if s.SendErrors() { select {
s.err <- err case s.err <- err:
default:
} }
close(s.trades)
close(s.err)
} }
s.Logger.Info("Error closed") s.Logger.Info("Error closed")
if s.restart() {
err := s.Open()
if err != nil {
s.Logger.Info("GDAX Unable to reastablish connection")
s.Close()
return
}
s.Subscribe(s.subs...)
}
}() }()
} }
func (s *ListenerService) restart() bool {
s.restMu.Lock()
defer s.restMu.Unlock()
return s.shouldRestart
}
func (s *ListenerService) setRestart(restart bool) {
s.restMu.Lock()
defer s.restMu.Unlock()
s.shouldRestart = restart
}
func (s *ListenerService) SendErrors() bool {
s.errorsMu.Lock()
defer s.errorsMu.Unlock()
return s.sendErrors
}
func (s *ListenerService) SetSendErrors(send bool) {
s.errorsMu.Lock()
defer s.errorsMu.Unlock()
s.sendErrors = send
}

View File

@ -1,7 +1,7 @@
package gdax package gdax
import ( import (
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
"encoding/json" "encoding/json"
"net/http" "net/http"

View File

@ -2,7 +2,7 @@ package websocket
import ( import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"git.kevincotugno.com/kcotugno/tacitus/gdax" "github.com/kcotugno/tacitus/gdax"
"sync" "sync"
"time" "time"

View File

@ -1,7 +1,7 @@
package http package http
import ( import (
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
"net/http" "net/http"
) )

View File

@ -4,7 +4,7 @@ import (
"net" "net"
"net/http" "net/http"
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
) )
const DefaultAddr = ":8080" const DefaultAddr = ":8080"

View File

@ -6,7 +6,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
) )
type TradeHandler struct { type TradeHandler struct {

View File

@ -6,7 +6,4 @@ type ListenerService interface {
Subscribe(products ...string) Subscribe(products ...string)
Stream() <-chan Trade Stream() <-chan Trade
Error() <-chan error Error() <-chan error
SendErrors() bool
SetSendErrors(send bool)
SetLogger(logger Logger)
} }

View File

@ -1,7 +1,7 @@
package ops package ops
import ( import (
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
"sort" "sort"
"time" "time"

View File

@ -1,7 +1,7 @@
package ops package ops
import ( import (
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
"sync" "sync"

View File

@ -1,14 +1,15 @@
package ops package ops
import ( import (
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
) )
type Registrar struct { type Registrar struct {
Database tacitus.DatabaseClientService Database tacitus.DatabaseClientService
Listener tacitus.ListenerService Listener tacitus.ListenerService
Logger tacitus.Logger
logger tacitus.Logger err chan error
} }
func (r *Registrar) Record(products ...string) error { func (r *Registrar) Record(products ...string) error {
@ -22,26 +23,38 @@ func (r *Registrar) Record(products ...string) error {
return nil return nil
} }
func (r *Registrar) SetLogger(logger tacitus.Logger) {
r.Listener.SetLogger(logger)
r.logger = logger
}
func (r *Registrar) record() { func (r *Registrar) record() {
go func() { go func() {
for t := range r.Listener.Stream() { for t := range r.Listener.Stream() {
go func(trade tacitus.Trade) { go func(trade tacitus.Trade) {
_, err := r.Database.TradeService().CreateTrade(trade) _, err := r.Database.TradeService().CreateTrade(trade)
if err != nil { if err != nil {
r.logger.Info("Error inserting trade: %v", err) r.Logger.Info("Error inserting trade: %v", err)
select {
case r.err <- err:
default:
}
} }
}(t) }(t)
} }
r.Logger.Info("Trade goroutine done")
}() }()
go func() { go func() {
for err := range r.Listener.Error() { for err := range r.Listener.Error() {
r.logger.Info("Registrar received error: %v", err) r.Logger.Info("Registrar received error: %v", err)
select {
case r.err <- err:
default:
} }
}
r.Logger.Info("Error goroutine done")
}() }()
} }
func (r *Registrar) Error() <-chan error {
return r.err
}

View File

@ -1,8 +1,8 @@
package ops package ops
import ( import (
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
"git.kevincotugno.com/kcotugno/tacitus/gdax" "github.com/kcotugno/tacitus/gdax"
"sort" "sort"
"time" "time"

View File

@ -1,7 +1,7 @@
package postgres package postgres
import ( import (
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
"database/sql" "database/sql"

View File

@ -1,7 +1,7 @@
package postgres package postgres
import ( import (
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
"strings" "strings"
"time" "time"

View File

@ -7,19 +7,15 @@ import (
"database/sql" "database/sql"
"text/template" "text/template"
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
) )
const connStr = `host={{.Host}} dbname={{.Name}} port={{.Port}} user={{.User}} ` + const connStr = `user={{.User}} dbname={{.Name}} sslmode={{.SslMode}}`
`password={{.Password}} sslmode={{.SslMode}}`
type Client struct { type Client struct {
Host string
Port int
Name string Name string
User string User string
SslMode string SslMode string
Password string
tradeService TradeService tradeService TradeService
confirmationService ConfirmationService confirmationService ConfirmationService
@ -34,12 +30,6 @@ func NewClient() *Client {
c.tradeService.client = &c c.tradeService.client = &c
c.confirmationService.client = &c c.confirmationService.client = &c
c.aggregationService.client = &c c.aggregationService.client = &c
c.Host = "localhost"
c.Port = 5432
c.Name = "postgres"
c.User = "postgres"
c.Password = `""`
c.SslMode = "disable" c.SslMode = "disable"
return &c return &c

View File

@ -1,7 +1,7 @@
package postgres package postgres
import ( import (
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
) )
const ( const (

View File

@ -1,7 +1,7 @@
package postgres package postgres
import ( import (
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
"database/sql" "database/sql"
"errors" "errors"

View File

@ -1,7 +1,7 @@
package postgres package postgres
import ( import (
"git.kevincotugno.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
"database/sql" "database/sql"
"strings" "strings"

View File

@ -1,37 +0,0 @@
package tacitus
import (
"strings"
)
type Product string
const (
BtcUsd = "BTC-USD"
BtcEur = "BTC-EUR"
BtcGbp = "BTC-GBP"
BchUsd = "BCH-USD"
EthUsd = "ETH-USD"
EthBtc = "ETH-BTC"
EthEur = "ETH-EUR"
LtcUsd = "LTC-USD"
LtcBtc = "LTC-BTC"
LtcEur = "LTC-EUR"
)
var products = [...]string{BtcUsd, BtcEur, BtcGbp, BchUsd, EthUsd, EthBtc,
EthEur, LtcUsd, LtcBtc, LtcEur}
func ValidProduct(prod string) bool {
prod = strings.ToUpper(prod)
for _, p := range products {
if p == prod {
return true
}
}
return false
}

View File

@ -3,9 +3,12 @@ package tacitus
import ( import (
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
"strings"
"time" "time"
) )
var PRODUCTS = [...]string{"BTC-USD", "ETH-USD"}
type Trade struct { type Trade struct {
Id int `json:"-"` Id int `json:"-"`
TradeId int `json:"trade_id,"` TradeId int `json:"trade_id,"`
@ -32,3 +35,14 @@ type TradeService interface {
TradesAfterAll(product string, id int) (Results, error) TradesAfterAll(product string, id int) (Results, error)
TradesWhereResults(sql string, params ...interface{}) (Results, error) TradesWhereResults(sql string, params ...interface{}) (Results, error)
} }
func ValidProduct(prod string) bool {
prod = strings.ToUpper(prod)
for _, p := range PRODUCTS {
if p == prod {
return true
}
}
return false
}