From db866e672b803c9ca4c8f89b69477d3dd0f68901 Mon Sep 17 00:00:00 2001 From: Kevin Cotugno Date: Sun, 24 Sep 2017 22:17:17 -0700 Subject: [PATCH] Add registrar --- ops/registrar.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 ops/registrar.go diff --git a/ops/registrar.go b/ops/registrar.go new file mode 100644 index 0000000..a518651 --- /dev/null +++ b/ops/registrar.go @@ -0,0 +1,67 @@ +package ops + +import ( + "github.com/kcotugno/tacitus" + "github.com/kcotugno/tacitus/gdax" + "github.com/kcotugno/tacitus/gdax/websocket" + "github.com/kcotugno/tacitus/postgres" +) + +type Registrar struct { + db *postgres.Client + wc *gdax.ListenerService + + logger tacitus.Logger +} + +func NewRegistrar() *Registrar { + r := Registrar{} + + r.db = postgres.NewClient() + r.wc = &gdax.ListenerService{} + r.wc.Client = websocket.NewClient() + + return &r +} + +func (r *Registrar) Record(products ...string) error { + r.db.Name = "gdax" + r.db.User = "gdax" + if err := r.db.Open(); err != nil { + return err + } + + if err := r.wc.Open(); err != nil { + return err + } + + r.record() + r.wc.Subscribe(products...) + + return nil +} + +func (r *Registrar) SetLogger(logger tacitus.Logger) { + r.db.SetLogger(logger) + r.wc.Logger = logger + r.logger = logger +} + +func (r *Registrar) record() { + go func () { + for t := range r.wc.Stream() { + go func (trade tacitus.Trade) { + _, err := r.db.TradeService().CreateTrade(trade) + if err != nil { + r.logger.Info("Error inserting trade: %v", err) + } + }(t) + } + }() + + go func () { + for err := range r.wc.Error() { + r.logger.Info("Registrar received error: %v", err) + } + }() +}