This repository has been archived on 2022-11-30. You can view files and clone it, but cannot push or open issues or pull requests.
tacitus/gdax/websocket/client.go

126 lines
2.0 KiB
Go
Raw Normal View History

2017-09-18 21:17:12 -07:00
package websocket
import (
"github.com/gorilla/websocket"
2017-09-24 22:23:13 -07:00
"github.com/kcotugno/tacitus/gdax"
2017-09-21 23:18:54 -07:00
"sync"
"time"
2017-09-18 21:17:12 -07:00
)
2017-09-27 20:36:37 -07:00
const endpoint = "wss://ws-feed.gdax.com"
2017-09-18 21:17:12 -07:00
type Client struct {
2017-09-24 16:21:16 -07:00
msgs chan gdax.Message
2017-09-21 23:18:54 -07:00
err chan error
2017-09-24 16:21:16 -07:00
writer chan gdax.Request
2017-09-21 23:18:54 -07:00
conn *websocket.Conn
2017-09-18 23:11:53 -07:00
2017-09-21 23:18:54 -07:00
conMux sync.Mutex
connect bool
close chan bool
2017-09-18 21:17:12 -07:00
}
func NewClient() *Client {
c := Client{}
2017-09-24 16:21:16 -07:00
c.writer = make(chan gdax.Request)
2017-09-20 23:20:20 -07:00
c.close = make(chan bool)
2017-09-18 21:17:12 -07:00
return &c
}
func (c *Client) Open() error {
2017-09-18 23:11:53 -07:00
var err error
2017-09-27 20:36:37 -07:00
c.conn, _, err = websocket.DefaultDialer.Dial(endpoint, nil)
2017-09-18 21:17:12 -07:00
if err != nil {
2017-09-18 23:11:53 -07:00
return err
2017-09-18 21:17:12 -07:00
}
2017-09-21 23:18:54 -07:00
c.setConnected(true)
2017-09-18 21:17:12 -07:00
2017-09-24 16:21:16 -07:00
c.msgs = make(chan gdax.Message, 1024)
c.err = make(chan error)
2017-09-21 23:18:54 -07:00
go func() {
for c.connected() {
2017-09-24 16:21:16 -07:00
var msg gdax.Message
2017-09-20 23:20:20 -07:00
2017-09-21 23:18:54 -07:00
err := c.conn.ReadJSON(&msg)
if err != nil {
if websocket.IsUnexpectedCloseError(err,
websocket.CloseNormalClosure) {
c.sendClose(false)
2017-09-20 23:20:20 -07:00
c.err <- err
}
2017-09-25 16:40:36 -07:00
break
2017-09-20 23:20:20 -07:00
}
2017-09-21 23:18:54 -07:00
c.msgs <- msg
2017-09-18 21:17:12 -07:00
}
2017-09-24 16:21:16 -07:00
close(c.msgs)
close(c.err)
2017-09-18 21:17:12 -07:00
}()
2017-09-21 23:18:54 -07:00
go func() {
for c.connected() {
2017-09-18 23:11:53 -07:00
select {
2017-09-21 23:18:54 -07:00
case msg := <-c.writer:
2017-09-18 23:11:53 -07:00
err := c.conn.WriteJSON(msg)
2017-09-20 23:20:20 -07:00
if err != nil {
2017-09-21 23:18:54 -07:00
c.sendClose(false)
2017-09-20 23:20:20 -07:00
c.err <- err
2017-09-18 23:11:53 -07:00
}
2017-09-21 23:18:54 -07:00
case send := <-c.close:
if send {
c.conn.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure,
"done"),
time.Now().Add(30*time.Second))
}
2017-09-18 23:11:53 -07:00
}
2017-09-18 21:17:12 -07:00
}
}()
return nil
}
2017-09-18 23:11:53 -07:00
2017-09-27 20:36:37 -07:00
func (c *Client) Close() error {
2017-09-21 23:18:54 -07:00
c.sendClose(true)
2017-09-27 20:36:37 -07:00
return nil
2017-09-21 23:18:54 -07:00
}
func (c *Client) sendClose(send bool) {
if c.connected() {
c.setConnected(false)
c.close <- send
}
2017-09-20 23:20:20 -07:00
}
2017-09-24 16:21:16 -07:00
func (c *Client) Send(msg gdax.Request) {
2017-09-21 23:18:54 -07:00
if c.connected() {
2017-09-18 23:11:53 -07:00
c.writer <- msg
}
}
2017-09-20 23:20:20 -07:00
2017-09-24 16:21:16 -07:00
func (c *Client) Stream() <-chan gdax.Message {
2017-09-20 23:20:20 -07:00
return c.msgs
}
2017-09-21 23:18:54 -07:00
func (c *Client) Error() <-chan error {
2017-09-20 23:20:20 -07:00
return c.err
}
2017-09-21 23:18:54 -07:00
func (c *Client) connected() bool {
c.conMux.Lock()
defer c.conMux.Unlock()
return c.connect
}
func (c *Client) setConnected(connected bool) {
c.conMux.Lock()
defer c.conMux.Unlock()
c.connect = connected
}