126 lines
2.0 KiB
Go
126 lines
2.0 KiB
Go
package gdax
|
|
|
|
import (
|
|
"github.com/kcotugno/tacitus"
|
|
|
|
"errors"
|
|
"sync"
|
|
)
|
|
|
|
type ListenerService struct {
|
|
Client Client
|
|
Logger tacitus.Logger
|
|
|
|
trades chan tacitus.Trade
|
|
err chan error
|
|
|
|
subs []string
|
|
|
|
closed bool
|
|
shouldRestart bool
|
|
restMu sync.Mutex
|
|
}
|
|
|
|
func (s *ListenerService) Open() error {
|
|
if s.closed {
|
|
return errors.New("Already used")
|
|
}
|
|
|
|
err := s.Client.Open()
|
|
if err != nil {
|
|
s.Logger.Info("GDAX Listener Error: %v", err)
|
|
}
|
|
s.Logger.Info("GDAX listener started")
|
|
|
|
s.trades = make(chan tacitus.Trade, 1024)
|
|
s.err = make(chan error)
|
|
s.setRestart(true)
|
|
|
|
s.listen()
|
|
|
|
return err
|
|
}
|
|
|
|
func (s *ListenerService) Close() {
|
|
s.closed = true
|
|
s.setRestart(false)
|
|
s.Client.Close()
|
|
s.Logger.Info("GDAX listener stopped")
|
|
}
|
|
|
|
func (s *ListenerService) Subscribe(products ...string) {
|
|
req := Request{}
|
|
req.Type = Subscribe
|
|
|
|
req.Channels = make([]Channel, 1)
|
|
req.Channels[0].Name = "matches"
|
|
req.Channels[0].ProductIds = products
|
|
|
|
s.subs = products
|
|
|
|
s.Client.Send(req)
|
|
}
|
|
|
|
func (s *ListenerService) Stream() <-chan tacitus.Trade {
|
|
return s.trades
|
|
}
|
|
|
|
func (s *ListenerService) Error() <-chan error {
|
|
return s.err
|
|
}
|
|
|
|
func (s *ListenerService) listen() {
|
|
go func() {
|
|
for msg := range s.Client.Stream() {
|
|
if msg.Type == "match" {
|
|
t := tacitus.Trade{}
|
|
t.TradeId = msg.TradeId
|
|
t.Product = msg.ProductId
|
|
t.Price = msg.Price
|
|
t.Size = msg.Size
|
|
t.Timestamp = msg.Time
|
|
|
|
switch msg.Side {
|
|
case "buy":
|
|
t.Buy = true
|
|
case "sell":
|
|
t.Sell = true
|
|
}
|
|
|
|
s.trades <- t
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
for err := range s.Client.Error() {
|
|
s.Logger.Info("Error from GDAX client: %v", err)
|
|
s.err <- err
|
|
}
|
|
|
|
if s.restart() {
|
|
err := s.Open()
|
|
if err != nil {
|
|
s.Logger.Info("GDAX Unable to reastablish connection")
|
|
return
|
|
}
|
|
s.Subscribe(s.subs...)
|
|
}
|
|
}()
|
|
|
|
}
|
|
|
|
func (s *ListenerService) restart() bool {
|
|
s.restMu.Lock()
|
|
defer s.restMu.Unlock()
|
|
|
|
return s.shouldRestart
|
|
}
|
|
|
|
func (s *ListenerService) setRestart(restart bool) {
|
|
s.restMu.Lock()
|
|
defer s.restMu.Unlock()
|
|
|
|
s.shouldRestart = restart
|
|
}
|