GDAX Service

This commit is contained in:
Kevin Cotugno 2017-09-24 16:21:16 -07:00
parent f7f51884b5
commit bca1ff4d39
6 changed files with 156 additions and 12 deletions

9
gdax/client.go Normal file
View File

@ -0,0 +1,9 @@
package gdax
type Client interface {
Open() error
Close()
Send(msg Request)
Stream() <-chan Message
Error() <-chan error
}

124
gdax/listener_service.go Normal file
View File

@ -0,0 +1,124 @@
package gdax
import (
"github.com/kcotugno/tacitus"
"errors"
"sync"
)
type ListenerService struct {
Client Client
Logger tacitus.Logger
trades chan tacitus.Trade
err chan error
subs []string
closed bool
shouldRestart bool
restMu sync.Mutex
}
func (s *ListenerService) Open() error {
if s.closed {
return errors.New("Already used")
}
err := s.Client.Open()
if err != nil {
s.Logger.Info("GDAX Listener Error: %v", err)
}
s.Logger.Info("GDAX listener started")
s.trades = make(chan tacitus.Trade, 1024)
s.err = make(chan error)
s.setRestart(true)
s.listen()
return err
}
func (s *ListenerService) Close() {
s.closed = true
s.setRestart(false)
s.Client.Close()
s.Logger.Info("GDAX listener stopped")
}
func (s *ListenerService) Subscribe(products ...string) {
req := Request{}
req.Type = Subscribe
req.Channels = make([]Channel, 1)
req.Channels[0].Name = "matches"
req.Channels[0].ProductIds = products
s.subs = products
s.Client.Send(req)
}
func (s *ListenerService) Stream() <-chan tacitus.Trade {
return s.trades
}
func (s *ListenerService) Error() <-chan error {
return s.err
}
func (s *ListenerService) listen() {
go func() {
for msg := range s.Client.Stream() {
if msg.Type == "match" {
t := tacitus.Trade{}
t.TradeId = msg.TradeId
t.Product = msg.ProductId
t.Price = msg.Price
t.Size = msg.Size
t.Timestamp = msg.Time
switch msg.Side {
case "buy":
t.Buy = true
case "sell":
t.Sell = true
}
s.trades <- t
}
}
}()
go func() {
for err := range s.Client.Error() {
s.err <- err
}
if s.restart() {
err := s.Open()
if err != nil {
s.Logger.Info("GDAX Unable to reastablish connection")
return
}
s.Subscribe(s.subs...)
}
}()
}
func (s *ListenerService) restart() bool {
s.restMu.Lock()
defer s.restMu.Unlock()
return s.shouldRestart
}
func (s *ListenerService) setRestart(restart bool) {
s.restMu.Lock()
defer s.restMu.Unlock()
s.shouldRestart = restart
}

View File

@ -1,4 +1,4 @@
package tacitus
package gdax
import (
"github.com/shopspring/decimal"

View File

@ -1,4 +1,4 @@
package tacitus
package gdax
const Subscribe = RequestType("subscribe")
const Unsubscribe = RequestType("unsubscribe")

View File

@ -1,8 +1,7 @@
package websocket
import (
"github.com/kcotugno/tacitus"
"github.com/kcotugno/tacitus/gdax"
"github.com/gorilla/websocket"
"sync"
@ -12,9 +11,9 @@ import (
const ENDPOINT = "wss://ws-feed.gdax.com"
type Client struct {
msgs chan tacitus.Message
msgs chan gdax.Message
err chan error
writer chan tacitus.Request
writer chan gdax.Request
conn *websocket.Conn
conMux sync.Mutex
@ -24,9 +23,7 @@ type Client struct {
func NewClient() *Client {
c := Client{}
c.msgs = make(chan tacitus.Message, 1024)
c.err = make(chan error)
c.writer = make(chan tacitus.Request)
c.writer = make(chan gdax.Request)
c.close = make(chan bool)
return &c
@ -41,9 +38,11 @@ func (c *Client) Open() error {
}
c.setConnected(true)
c.msgs = make(chan gdax.Message, 1024)
c.err = make(chan error)
go func() {
for c.connected() {
var msg tacitus.Message
var msg gdax.Message
err := c.conn.ReadJSON(&msg)
if err != nil {
@ -56,6 +55,9 @@ func (c *Client) Open() error {
}
c.msgs <- msg
}
close(c.msgs)
close(c.err)
}()
go func() {
@ -92,13 +94,13 @@ func (c *Client) sendClose(send bool) {
}
}
func (c *Client) Send(msg tacitus.Request) {
func (c *Client) Send(msg gdax.Request) {
if c.connected() {
c.writer <- msg
}
}
func (c *Client) MessageStream() <-chan tacitus.Message {
func (c *Client) Stream() <-chan gdax.Message {
return c.msgs
}

9
listener.go Normal file
View File

@ -0,0 +1,9 @@
package tacitus
type ListenerService interface {
Open() error
Close()
Subscribe(product string)
Stream() <-chan Trade
Error() <-chan error
}