This repository has been archived on 2022-11-30. You can view files and clone it, but cannot push or open issues or pull requests.
tacitus/websocket/client.go

122 lines
1.9 KiB
Go
Raw Normal View History

2017-09-18 21:17:12 -07:00
package websocket
import (
"github.com/kcotugno/tacitus/gdax"
2017-09-18 23:11:53 -07:00
2017-09-18 21:17:12 -07:00
"github.com/gorilla/websocket"
2017-09-21 23:18:54 -07:00
"sync"
"time"
2017-09-18 21:17:12 -07:00
)
// ENDPOINT is the WebSocket URL that Client.Open dials.
const (
	ENDPOINT = "wss://ws-feed.gdax.com"
)
type Client struct {
2017-09-21 23:18:54 -07:00
msgs chan gdax.Message
err chan error
writer chan gdax.Request
2017-09-21 23:18:54 -07:00
conn *websocket.Conn
2017-09-18 23:11:53 -07:00
2017-09-21 23:18:54 -07:00
conMux sync.Mutex
connect bool
close chan bool
2017-09-18 21:17:12 -07:00
}
func NewClient() *Client {
c := Client{}
c.msgs = make(chan gdax.Message, 1024)
2017-09-20 23:20:20 -07:00
c.err = make(chan error)
c.writer = make(chan gdax.Request)
2017-09-20 23:20:20 -07:00
c.close = make(chan bool)
2017-09-18 21:17:12 -07:00
return &c
}
func (c *Client) Open() error {
2017-09-18 23:11:53 -07:00
var err error
c.conn, _, err = websocket.DefaultDialer.Dial(ENDPOINT, nil)
2017-09-18 21:17:12 -07:00
if err != nil {
2017-09-18 23:11:53 -07:00
return err
2017-09-18 21:17:12 -07:00
}
2017-09-21 23:18:54 -07:00
c.setConnected(true)
2017-09-18 21:17:12 -07:00
2017-09-21 23:18:54 -07:00
go func() {
for c.connected() {
var msg gdax.Message
2017-09-20 23:20:20 -07:00
2017-09-21 23:18:54 -07:00
err := c.conn.ReadJSON(&msg)
if err != nil {
if websocket.IsUnexpectedCloseError(err,
websocket.CloseNormalClosure) {
c.sendClose(false)
2017-09-20 23:20:20 -07:00
c.err <- err
2017-09-21 23:18:54 -07:00
break
2017-09-20 23:20:20 -07:00
}
}
2017-09-21 23:18:54 -07:00
c.msgs <- msg
2017-09-18 21:17:12 -07:00
}
}()
2017-09-21 23:18:54 -07:00
go func() {
for c.connected() {
2017-09-18 23:11:53 -07:00
select {
2017-09-21 23:18:54 -07:00
case msg := <-c.writer:
2017-09-18 23:11:53 -07:00
err := c.conn.WriteJSON(msg)
2017-09-20 23:20:20 -07:00
if err != nil {
2017-09-21 23:18:54 -07:00
c.sendClose(false)
2017-09-20 23:20:20 -07:00
c.err <- err
2017-09-18 23:11:53 -07:00
}
2017-09-21 23:18:54 -07:00
case send := <-c.close:
if send {
c.conn.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure,
"done"),
time.Now().Add(30*time.Second))
}
2017-09-18 23:11:53 -07:00
}
2017-09-18 21:17:12 -07:00
}
}()
return nil
}
2017-09-18 23:11:53 -07:00
2017-09-20 23:20:20 -07:00
func (c *Client) Close() {
2017-09-21 23:18:54 -07:00
c.sendClose(true)
}
func (c *Client) sendClose(send bool) {
if c.connected() {
c.setConnected(false)
c.close <- send
}
2017-09-20 23:20:20 -07:00
}
func (c *Client) Send(msg gdax.Request) {
2017-09-21 23:18:54 -07:00
if c.connected() {
2017-09-18 23:11:53 -07:00
c.writer <- msg
}
}
2017-09-20 23:20:20 -07:00
2017-09-21 23:18:54 -07:00
func (c *Client) MessageStream() <-chan gdax.Message {
2017-09-20 23:20:20 -07:00
return c.msgs
}
2017-09-21 23:18:54 -07:00
func (c *Client) Error() <-chan error {
2017-09-20 23:20:20 -07:00
return c.err
}
2017-09-21 23:18:54 -07:00
// connected reports the current value of the connect flag, read under conMux.
func (c *Client) connected() bool {
	c.conMux.Lock()
	state := c.connect
	c.conMux.Unlock()

	return state
}
// setConnected stores connected in the connect flag while holding conMux.
func (c *Client) setConnected(connected bool) {
	c.conMux.Lock()
	c.connect = connected
	c.conMux.Unlock()
}