From bca1ff4d39a0a79c6f94d2f515a8fb9666657146 Mon Sep 17 00:00:00 2001 From: Kevin Cotugno Date: Sun, 24 Sep 2017 16:21:16 -0700 Subject: [PATCH] GDAX Service --- gdax/client.go | 9 ++ gdax/listener_service.go | 124 ++++++++++++++++++++++++ message.go => gdax/message.go | 2 +- request.go => gdax/request.go | 2 +- {websocket => gdax/websocket}/client.go | 22 +++-- listener.go | 9 ++ 6 files changed, 156 insertions(+), 12 deletions(-) create mode 100644 gdax/client.go create mode 100644 gdax/listener_service.go rename message.go => gdax/message.go (98%) rename request.go => gdax/request.go (96%) rename {websocket => gdax/websocket}/client.go (84%) create mode 100644 listener.go diff --git a/gdax/client.go b/gdax/client.go new file mode 100644 index 0000000..996d6f7 --- /dev/null +++ b/gdax/client.go @@ -0,0 +1,9 @@ +package gdax + +type Client interface { + Open() error + Close() + Send(msg Request) + Stream() <-chan Message + Error() <-chan error +} diff --git a/gdax/listener_service.go b/gdax/listener_service.go new file mode 100644 index 0000000..9dfcd35 --- /dev/null +++ b/gdax/listener_service.go @@ -0,0 +1,124 @@ +package gdax + +import ( + "github.com/kcotugno/tacitus" + + "errors" + "sync" +) + +type ListenerService struct { + Client Client + Logger tacitus.Logger + + trades chan tacitus.Trade + err chan error + + subs []string + + closed bool + shouldRestart bool + restMu sync.Mutex +} + +func (s *ListenerService) Open() error { + if s.closed { + return errors.New("Already used") + } + + err := s.Client.Open() + if err != nil { + s.Logger.Info("GDAX Listener Error: %v", err) + } + s.Logger.Info("GDAX listener started") + + s.trades = make(chan tacitus.Trade, 1024) + s.err = make(chan error) + s.setRestart(true) + + s.listen() + + return err +} + +func (s *ListenerService) Close() { + s.closed = true + s.setRestart(false) + s.Client.Close() + s.Logger.Info("GDAX listener stopped") +} + +func (s *ListenerService) Subscribe(products ...string) { + req := Request{} + req.Type = Subscribe + + req.Channels = make([]Channel, 1) + req.Channels[0].Name = "matches" + req.Channels[0].ProductIds = products + + s.subs = products + + s.Client.Send(req) +} + +func (s *ListenerService) Stream() <-chan tacitus.Trade { + return s.trades +} + +func (s *ListenerService) Error() <-chan error { + return s.err +} + +func (s *ListenerService) listen() { + go func() { + for msg := range s.Client.Stream() { + if msg.Type == "match" { + t := tacitus.Trade{} + t.TradeId = msg.TradeId + t.Product = msg.ProductId + t.Price = msg.Price + t.Size = msg.Size + t.Timestamp = msg.Time + + switch msg.Side { + case "buy": + t.Buy = true + case "sell": + t.Sell = true + } + + s.trades <- t + } + } + }() + + go func() { + for err := range s.Client.Error() { + s.err <- err + } + + if s.restart() { + err := s.Open() + if err != nil { + s.Logger.Info("GDAX Unable to reastablish connection") + return + } + s.Subscribe(s.subs...) + } + }() + +} + +func (s *ListenerService) restart() bool { + s.restMu.Lock() + defer s.restMu.Unlock() + + return s.shouldRestart +} + +func (s *ListenerService) setRestart(restart bool) { + s.restMu.Lock() + defer s.restMu.Unlock() + + s.shouldRestart = restart +} diff --git a/message.go b/gdax/message.go similarity index 98% rename from message.go rename to gdax/message.go index 86f6922..f441ef0 100644 --- a/message.go +++ b/gdax/message.go @@ -1,4 +1,4 @@ -package tacitus +package gdax import ( "github.com/shopspring/decimal" diff --git a/request.go b/gdax/request.go similarity index 96% rename from request.go rename to gdax/request.go index 890ae81..579ca93 100644 --- a/request.go +++ b/gdax/request.go @@ -1,4 +1,4 @@ -package tacitus +package gdax const Subscribe = RequestType("subscribe") const Unsubscribe = RequestType("unsubscribe") diff --git a/websocket/client.go b/gdax/websocket/client.go similarity index 84% rename from websocket/client.go rename to gdax/websocket/client.go index cb3288a..8ac263e 100644 --- a/websocket/client.go +++ b/gdax/websocket/client.go @@ -1,8 +1,7 @@ package websocket import ( - "github.com/kcotugno/tacitus" - + "github.com/kcotugno/tacitus/gdax" "github.com/gorilla/websocket" "sync" @@ -12,9 +11,9 @@ import ( const ENDPOINT = "wss://ws-feed.gdax.com" type Client struct { - msgs chan tacitus.Message + msgs chan gdax.Message err chan error - writer chan tacitus.Request + writer chan gdax.Request conn *websocket.Conn conMux sync.Mutex @@ -24,9 +23,7 @@ type Client struct { func NewClient() *Client { c := Client{} - c.msgs = make(chan tacitus.Message, 1024) - c.err = make(chan error) - c.writer = make(chan tacitus.Request) + c.writer = make(chan gdax.Request) c.close = make(chan bool) return &c @@ -41,9 +38,11 @@ func (c *Client) Open() error { } c.setConnected(true) + c.msgs = make(chan gdax.Message, 1024) + c.err = make(chan error) go func() { for c.connected() { - var msg tacitus.Message + var msg gdax.Message err := c.conn.ReadJSON(&msg) if err != nil { @@ -56,6 +55,9 @@ func (c *Client) Open() error { } c.msgs <- msg } + + close(c.msgs) + close(c.err) }() go func() { @@ -92,13 +94,13 @@ func (c *Client) sendClose(send bool) { } } -func (c *Client) Send(msg tacitus.Request) { +func (c *Client) Send(msg gdax.Request) { if c.connected() { c.writer <- msg } } -func (c *Client) MessageStream() <-chan tacitus.Message { +func (c *Client) Stream() <-chan gdax.Message { return c.msgs } diff --git a/listener.go b/listener.go new file mode 100644 index 0000000..9f9391f --- /dev/null +++ b/listener.go @@ -0,0 +1,9 @@ +package tacitus + +type ListenerService interface { + Open() error + Close() + Subscribe(product string) + Stream() <-chan Trade + Error() <-chan error +}