diff --git a/cmd/watcher/main.go b/cmd/watcher/main.go index d9a46a5..3ba41d6 100644 --- a/cmd/watcher/main.go +++ b/cmd/watcher/main.go @@ -1,45 +1,19 @@ package main import ( - "github.com/kcotugno/tacitus" + "github.com/kcotugno/tacitus/ops" "github.com/kcotugno/tacitus/osutil" - "github.com/kcotugno/tacitus/postgres" - "github.com/kcotugno/tacitus/gdax" - "github.com/kcotugno/tacitus/gdax/websocket" - - "log" ) func main() { logger := osutil.NewLogger() - c := websocket.NewClient() - ls := gdax.ListenerService{} - ls.Client = c - ls.Logger = logger - - db := postgres.NewClient() - db.Name = "gdax" - db.User = "gdax" - db.SetLogger(logger) - err := db.Open() - if err != nil { - log.Panic(err) + r := ops.NewRegistrar() + r.SetLogger(logger) + if err := r.Record("ETH-USD", "BTC-USD"); err != nil { + logger.Info("Error: %v", err) } - err = ls.Open() - if err != nil { - log.Print(err) - } - - ls.Subscribe("ETH-USD", "BTC-USD") - - for t := range ls.Stream() { - go func (trade tacitus.Trade) { - _, err := db.TradeService().CreateTrade(trade) - if err != nil { - logger.Info("Error inserting trade: %v", err) - } - }(t) - } + t := make(chan bool) + <-t } diff --git a/cmd/webapp/main.go b/cmd/webapp/main.go index 14e9b94..181bfc7 100644 --- a/cmd/webapp/main.go +++ b/cmd/webapp/main.go @@ -1,10 +1,10 @@ package main import ( - "log" "github.com/kcotugno/tacitus/http" - "github.com/kcotugno/tacitus/postgres" "github.com/kcotugno/tacitus/osutil" + "github.com/kcotugno/tacitus/postgres" + "log" ) func main() { @@ -23,5 +23,5 @@ func main() { defer s.Close() s.Open() - <- s.Err + <-s.Err } diff --git a/gdax/listener_service.go b/gdax/listener_service.go index b1363b5..ac0b9ac 100644 --- a/gdax/listener_service.go +++ b/gdax/listener_service.go @@ -16,7 +16,7 @@ type ListenerService struct { subs []string - closed bool + closed bool shouldRestart bool restMu sync.Mutex @@ -72,6 +72,10 @@ func (s *ListenerService) Error() <-chan error { return s.err } +func (s *ListenerService) SetLogger(logger tacitus.Logger) { + s.Logger = logger +} + func (s *ListenerService) listen() { go func() { for msg := range s.Client.Stream() { diff --git a/gdax/websocket/client.go b/gdax/websocket/client.go index 44861fe..04ed6ff 100644 --- a/gdax/websocket/client.go +++ b/gdax/websocket/client.go @@ -1,8 +1,8 @@ package websocket import ( - "github.com/kcotugno/tacitus/gdax" "github.com/gorilla/websocket" + "github.com/kcotugno/tacitus/gdax" "sync" "time" diff --git a/listener.go b/listener.go index 9f9391f..b2bdb55 100644 --- a/listener.go +++ b/listener.go @@ -6,4 +6,5 @@ type ListenerService interface { Subscribe(product string) Stream() <-chan Trade Error() <-chan error + SetLogger(logger Logger) } diff --git a/ops/registrar.go b/ops/registrar.go new file mode 100644 index 0000000..928b633 --- /dev/null +++ b/ops/registrar.go @@ -0,0 +1,67 @@ +package ops + +import ( + "github.com/kcotugno/tacitus" + "github.com/kcotugno/tacitus/gdax" + "github.com/kcotugno/tacitus/gdax/websocket" + "github.com/kcotugno/tacitus/postgres" +) + +type Registrar struct { + db *postgres.Client + wc *gdax.ListenerService + + logger tacitus.Logger +} + +func NewRegistrar() *Registrar { + r := Registrar{} + + r.db = postgres.NewClient() + r.wc = &gdax.ListenerService{} + r.wc.Client = websocket.NewClient() + + return &r +} + +func (r *Registrar) Record(products ...string) error { + r.db.Name = "gdax" + r.db.User = "gdax" + if err := r.db.Open(); err != nil { + return err + } + + if err := r.wc.Open(); err != nil { + return err + } + + r.record() + r.wc.Subscribe(products...) + + return nil +} + +func (r *Registrar) SetLogger(logger tacitus.Logger) { + r.db.SetLogger(logger) + r.wc.Logger = logger + r.logger = logger +} + +func (r *Registrar) record() { + go func() { + for t := range r.wc.Stream() { + go func(trade tacitus.Trade) { + _, err := r.db.TradeService().CreateTrade(trade) + if err != nil { + r.logger.Info("Error inserting trade: %v", err) + } + }(t) + } + }() + + go func() { + for err := range r.wc.Error() { + r.logger.Info("Registrar received error: %v", err) + } + }() +} diff --git a/osutil/logger_service.go b/osutil/logger_service.go index 98fd2cf..a2d2e2c 100644 --- a/osutil/logger_service.go +++ b/osutil/logger_service.go @@ -11,10 +11,10 @@ const tmpl = "%v - %v: " type LoggerService struct { DateFormat string - Utc bool + Utc bool file *os.File - log bool + log bool } func NewLogger() *LoggerService {