package gdax import ( "git.kevincotugno.com/kcotugno/tacitus" "errors" "sync" ) type ListenerService struct { Client ListenerClient Logger tacitus.Logger trades chan tacitus.Trade err chan error subs []string closed bool shouldRestart bool restMu sync.Mutex errorsMu sync.Mutex sendErrors bool } func (s *ListenerService) Open() error { if s.closed { return errors.New("Already used") } err := s.Client.Open() if err != nil { s.Logger.Info("GDAX Listener Error: %v", err) } s.Logger.Info("GDAX listener started") s.trades = make(chan tacitus.Trade, 1024) s.err = make(chan error) s.setRestart(true) s.listen() return err } func (s *ListenerService) Close() error { s.closed = true s.setRestart(false) s.Client.Close() s.Logger.Info("GDAX listener stopped") return nil } func (s *ListenerService) Subscribe(products ...string) { req := Request{} req.Type = Subscribe req.Channels = make([]Channel, 1) req.Channels[0].Name = "matches" req.Channels[0].ProductIds = products s.subs = products s.Client.Send(req) } func (s *ListenerService) Stream() <-chan tacitus.Trade { return s.trades } func (s *ListenerService) Error() <-chan error { return s.err } func (s *ListenerService) SetLogger(logger tacitus.Logger) { s.Logger = logger } func (s *ListenerService) listen() { go func() { for msg := range s.Client.Stream() { if msg.Type == "match" { t := tacitus.Trade{} t.TradeId = msg.TradeId t.Product = msg.ProductId t.Price = msg.Price t.Size = msg.Size t.Timestamp = msg.Time switch msg.Side { case "buy": t.Buy = true case "sell": t.Sell = true } s.trades <- t } } }() go func() { for err := range s.Client.Error() { s.Logger.Info("Error from GDAX client: %v", err) if s.SendErrors() { s.err <- err } } s.Logger.Info("Error closed") if s.restart() { err := s.Open() if err != nil { s.Logger.Info("GDAX Unable to reastablish connection") s.Close() return } s.Subscribe(s.subs...) } }() } func (s *ListenerService) restart() bool { s.restMu.Lock() defer s.restMu.Unlock() return s.shouldRestart } func (s *ListenerService) setRestart(restart bool) { s.restMu.Lock() defer s.restMu.Unlock() s.shouldRestart = restart } func (s *ListenerService) SendErrors() bool { s.errorsMu.Lock() defer s.errorsMu.Unlock() return s.sendErrors } func (s *ListenerService) SetSendErrors(send bool) { s.errorsMu.Lock() defer s.errorsMu.Unlock() s.sendErrors = send }