This repository has been archived on 2022-11-30. You can view files and clone it, but cannot push or open issues or pull requests.
tacitus/gdax/listener_service.go

156 lines
2.5 KiB
Go
Raw Permalink Normal View History

2017-09-24 16:21:16 -07:00
package gdax
import (
"git.kevincotugno.com/kcotugno/tacitus"
2017-09-24 16:21:16 -07:00
"errors"
"sync"
)
type ListenerService struct {
2017-09-27 20:36:37 -07:00
Client ListenerClient
2017-09-24 16:21:16 -07:00
Logger tacitus.Logger
trades chan tacitus.Trade
err chan error
subs []string
2017-09-27 20:36:37 -07:00
closed bool
2017-09-24 16:21:16 -07:00
shouldRestart bool
2017-09-25 16:56:34 -07:00
restMu sync.Mutex
2017-09-25 16:40:36 -07:00
2017-09-25 16:56:34 -07:00
errorsMu sync.Mutex
2017-09-25 16:40:36 -07:00
sendErrors bool
2017-09-24 16:21:16 -07:00
}
// Open connects the underlying client and starts the goroutines that forward
// its messages and errors.
//
// It returns an error if Close has already been called on the service, or if
// the client fails to open. When the client fails to open, the error is
// logged and returned and nothing else happens: no channels are created, the
// restart flag is left untouched and no forwarding goroutines are started.
//
// listen calls Open again to re-establish a dropped connection. The trades
// and error channels are therefore created only on the first successful call,
// so channels already obtained from Stream and Error keep receiving after a
// reconnect instead of being silently replaced.
func (s *ListenerService) Open() error {
	if s.closed {
		return errors.New("Already used")
	}

	if err := s.Client.Open(); err != nil {
		s.Logger.Info("GDAX Listener Error: %v", err)
		// Do not start listeners on a client that never connected.
		return err
	}

	s.Logger.Info("GDAX listener started")

	// Allocate once; reconnects reuse the channels handed out to consumers.
	if s.trades == nil {
		s.trades = make(chan tacitus.Trade, 1024)
	}
	if s.err == nil {
		s.err = make(chan error)
	}

	s.setRestart(true)
	s.listen()

	return nil
}
2017-09-27 20:36:37 -07:00
func (s *ListenerService) Close() error {
2017-09-24 16:21:16 -07:00
s.closed = true
s.setRestart(false)
s.Client.Close()
s.Logger.Info("GDAX listener stopped")
2017-09-27 20:36:37 -07:00
return nil
2017-09-24 16:21:16 -07:00
}
// Subscribe sends a subscribe request for the "matches" channel of the given
// products through the client.
//
// The product list is remembered in s.subs before the request is sent, so
// that listen can repeat the subscription after a reconnect.
func (s *ListenerService) Subscribe(products ...string) {
	var matches Channel
	matches.Name = "matches"
	matches.ProductIds = products

	var req Request
	req.Type = Subscribe
	req.Channels = []Channel{matches}

	s.subs = products
	s.Client.Send(req)
}
// Stream returns the service's trades channel as a receive-only channel.
//
// The channel is created in Open, so the result is nil if Open has not been
// called yet. listen sends one tacitus.Trade on it for every "match" message
// read from the client.
func (s *ListenerService) Stream() <-chan tacitus.Trade {
	return s.trades
}
// Error returns the service's error channel as a receive-only channel.
//
// The channel is created in Open (unbuffered), so the result is nil if Open
// has not been called yet. listen forwards client errors to it only while
// SendErrors reports true, and that send blocks until the error is received.
func (s *ListenerService) Error() <-chan error {
	return s.err
}
2017-09-24 22:16:16 -07:00
// SetLogger replaces the Logger the service writes its messages to.
// The assignment is not guarded by a mutex.
func (s *ListenerService) SetLogger(logger tacitus.Logger) {
	s.Logger = logger
}
2017-09-24 16:21:16 -07:00
func (s *ListenerService) listen() {
go func() {
for msg := range s.Client.Stream() {
if msg.Type == "match" {
t := tacitus.Trade{}
t.TradeId = msg.TradeId
t.Product = msg.ProductId
t.Price = msg.Price
t.Size = msg.Size
t.Timestamp = msg.Time
switch msg.Side {
case "buy":
t.Buy = true
case "sell":
t.Sell = true
}
s.trades <- t
}
}
}()
go func() {
for err := range s.Client.Error() {
2017-09-24 16:25:00 -07:00
s.Logger.Info("Error from GDAX client: %v", err)
2017-09-25 16:40:36 -07:00
if s.SendErrors() {
s.err <- err
}
2017-09-24 16:21:16 -07:00
}
2017-09-25 16:40:36 -07:00
s.Logger.Info("Error closed")
2017-09-24 16:21:16 -07:00
if s.restart() {
err := s.Open()
if err != nil {
s.Logger.Info("GDAX Unable to reastablish connection")
2017-09-25 16:40:36 -07:00
s.Close()
2017-09-24 16:21:16 -07:00
return
}
s.Subscribe(s.subs...)
}
}()
}
// restart reports the current value of the restart flag, read under restMu.
func (s *ListenerService) restart() bool {
	s.restMu.Lock()
	flag := s.shouldRestart
	s.restMu.Unlock()

	return flag
}
// setRestart stores restart as the new value of the restart flag, written
// under restMu.
func (s *ListenerService) setRestart(restart bool) {
	s.restMu.Lock()
	s.shouldRestart = restart
	s.restMu.Unlock()
}
2017-09-25 16:40:36 -07:00
// SendErrors reports whether client errors are currently forwarded on the
// channel returned by Error. The flag is read under errorsMu.
func (s *ListenerService) SendErrors() bool {
	s.errorsMu.Lock()
	enabled := s.sendErrors
	s.errorsMu.Unlock()

	return enabled
}
// SetSendErrors turns forwarding of client errors on the channel returned by
// Error on or off. The flag is written under errorsMu.
func (s *ListenerService) SetSendErrors(send bool) {
	s.errorsMu.Lock()
	s.sendErrors = send
	s.errorsMu.Unlock()
}