From c5d82fab115a777f50f5cafdbd34c2ee452ecf63 Mon Sep 17 00:00:00 2001 From: Kevin Cotugno Date: Sun, 24 Sep 2017 22:16:16 -0700 Subject: [PATCH 1/4] Add logger setter to listener --- gdax/listener_service.go | 4 ++++ listener.go | 1 + 2 files changed, 5 insertions(+) diff --git a/gdax/listener_service.go b/gdax/listener_service.go index 9053769..29262ca 100644 --- a/gdax/listener_service.go +++ b/gdax/listener_service.go @@ -69,6 +69,10 @@ func (s *ListenerService) Error() <-chan error { return s.err } +func (s *ListenerService) SetLogger(logger tacitus.Logger) { + s.Logger = logger +} + func (s *ListenerService) listen() { go func() { for msg := range s.Client.Stream() { diff --git a/listener.go b/listener.go index 9f9391f..b2bdb55 100644 --- a/listener.go +++ b/listener.go @@ -6,4 +6,5 @@ type ListenerService interface { Subscribe(product string) Stream() <-chan Trade Error() <-chan error + SetLogger(logger Logger) } From db866e672b803c9ca4c8f89b69477d3dd0f68901 Mon Sep 17 00:00:00 2001 From: Kevin Cotugno Date: Sun, 24 Sep 2017 22:17:17 -0700 Subject: [PATCH 2/4] Add registrar --- ops/registrar.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 ops/registrar.go diff --git a/ops/registrar.go b/ops/registrar.go new file mode 100644 index 0000000..a518651 --- /dev/null +++ b/ops/registrar.go @@ -0,0 +1,67 @@ +package ops + +import ( + "github.com/kcotugno/tacitus" + "github.com/kcotugno/tacitus/gdax" + "github.com/kcotugno/tacitus/gdax/websocket" + "github.com/kcotugno/tacitus/postgres" +) + +type Registrar struct { + db *postgres.Client + wc *gdax.ListenerService + + logger tacitus.Logger +} + +func NewRegistrar() *Registrar { + r := Registrar{} + + r.db = postgres.NewClient() + r.wc = &gdax.ListenerService{} + r.wc.Client = websocket.NewClient() + + return &r +} + +func (r *Registrar) Record(products ...string) error { + r.db.Name = "gdax" + r.db.User = "gdax" + if err := r.db.Open(); err != nil { + return err + } + + if err := r.wc.Open(); err != nil { + return err + } + + r.record() + r.wc.Subscribe(products...) + + return nil +} + +func (r *Registrar) SetLogger(logger tacitus.Logger) { + r.db.SetLogger(logger) + r.wc.Logger = logger + r.logger = logger +} + +func (r *Registrar) record() { + go func () { + for t := range r.wc.Stream() { + go func (trade tacitus.Trade) { + _, err := r.db.TradeService().CreateTrade(trade) + if err != nil { + r.logger.Info("Error inserting trade: %v", err) + } + }(t) + } + }() + + go func () { + for err := range r.wc.Error() { + r.logger.Info("Registrar received error: %v", err) + } + }() +} From 04c17fc7b3337be3c21f5b0ed57fd5e90738c434 Mon Sep 17 00:00:00 2001 From: Kevin Cotugno Date: Sun, 24 Sep 2017 22:21:13 -0700 Subject: [PATCH 3/4] Use the new registrar --- cmd/watcher/main.go | 40 +++++++--------------------------------- 1 file changed, 7 insertions(+), 33 deletions(-) diff --git a/cmd/watcher/main.go b/cmd/watcher/main.go index d9a46a5..6688efc 100644 --- a/cmd/watcher/main.go +++ b/cmd/watcher/main.go @@ -1,45 +1,19 @@ package main import ( - "github.com/kcotugno/tacitus" + "github.com/kcotugno/tacitus/ops" "github.com/kcotugno/tacitus/osutil" - "github.com/kcotugno/tacitus/postgres" - "github.com/kcotugno/tacitus/gdax" - "github.com/kcotugno/tacitus/gdax/websocket" - - "log" ) func main() { logger := osutil.NewLogger() - c := websocket.NewClient() - ls := gdax.ListenerService{} - ls.Client = c - ls.Logger = logger - - db := postgres.NewClient() - db.Name = "gdax" - db.User = "gdax" - db.SetLogger(logger) - err := db.Open() - if err != nil { - log.Panic(err) + r := ops.NewRegistrar() + r.SetLogger(logger) + if err := r.Record("ETH-USD", "BTC-USD"); err != nil { + logger.Info("Error: %v", err) } - err = ls.Open() - if err != nil { - log.Print(err) - } - - ls.Subscribe("ETH-USD", "BTC-USD") - - for t := range ls.Stream() { - go func (trade tacitus.Trade) { - _, err := db.TradeService().CreateTrade(trade) - if err != nil { - logger.Info("Error inserting trade: %v", err) - } - }(t) - } + t := make(chan bool) + <- t } From c6289f8f5645f6d3e857941af5936c4330522180 Mon Sep 17 00:00:00 2001 From: Kevin Cotugno Date: Sun, 24 Sep 2017 22:23:13 -0700 Subject: [PATCH 4/4] gofmt --- cmd/watcher/main.go | 2 +- cmd/webapp/main.go | 6 +++--- gdax/listener_service.go | 4 ++-- gdax/websocket/client.go | 2 +- ops/registrar.go | 6 +++--- osutil/logger_service.go | 4 ++-- 6 files changed, 12 insertions(+), 12 deletions(-) diff --git a/cmd/watcher/main.go b/cmd/watcher/main.go index 6688efc..3ba41d6 100644 --- a/cmd/watcher/main.go +++ b/cmd/watcher/main.go @@ -15,5 +15,5 @@ func main() { } t := make(chan bool) - <- t + <-t } diff --git a/cmd/webapp/main.go b/cmd/webapp/main.go index 14e9b94..181bfc7 100644 --- a/cmd/webapp/main.go +++ b/cmd/webapp/main.go @@ -1,10 +1,10 @@ package main import ( - "log" "github.com/kcotugno/tacitus/http" - "github.com/kcotugno/tacitus/postgres" "github.com/kcotugno/tacitus/osutil" + "github.com/kcotugno/tacitus/postgres" + "log" ) func main() { @@ -23,5 +23,5 @@ func main() { defer s.Close() s.Open() - <- s.Err + <-s.Err } diff --git a/gdax/listener_service.go b/gdax/listener_service.go index 29262ca..f2cad63 100644 --- a/gdax/listener_service.go +++ b/gdax/listener_service.go @@ -16,9 +16,9 @@ type ListenerService struct { subs []string - closed bool + closed bool shouldRestart bool - restMu sync.Mutex + restMu sync.Mutex } func (s *ListenerService) Open() error { diff --git a/gdax/websocket/client.go b/gdax/websocket/client.go index 8ac263e..38c4cba 100644 --- a/gdax/websocket/client.go +++ b/gdax/websocket/client.go @@ -1,8 +1,8 @@ package websocket import ( - "github.com/kcotugno/tacitus/gdax" "github.com/gorilla/websocket" + "github.com/kcotugno/tacitus/gdax" "sync" "time" diff --git a/ops/registrar.go b/ops/registrar.go index a518651..928b633 100644 --- a/ops/registrar.go +++ b/ops/registrar.go @@ -48,9 +48,9 @@ func (r *Registrar) SetLogger(logger tacitus.Logger) { } func (r *Registrar) record() { - go func () { + go func() { for t := range r.wc.Stream() { - go func (trade tacitus.Trade) { + go func(trade tacitus.Trade) { _, err := r.db.TradeService().CreateTrade(trade) if err != nil { r.logger.Info("Error inserting trade: %v", err) @@ -59,7 +59,7 @@ func (r *Registrar) record() { } }() - go func () { + go func() { for err := range r.wc.Error() { r.logger.Info("Registrar received error: %v", err) } diff --git a/osutil/logger_service.go b/osutil/logger_service.go index 98fd2cf..a2d2e2c 100644 --- a/osutil/logger_service.go +++ b/osutil/logger_service.go @@ -11,10 +11,10 @@ const tmpl = "%v - %v: " type LoggerService struct { DateFormat string - Utc bool + Utc bool file *os.File - log bool + log bool } func NewLogger() *LoggerService {