This repository has been archived on 2022-11-30. You can view files and clone it, but cannot push or open issues or pull requests.
tacitus/websocket/client.go
Kevin Cotugno f7f51884b5 Revert "Move gdax message/request to new package"
This reverts commit ee3a0adf74b6861f040944abfc5e6087e4b6436f.
2017-09-23 17:35:05 -07:00

122 lines
1.9 KiB
Go

package websocket
import (
"github.com/kcotugno/tacitus"
"github.com/gorilla/websocket"
"sync"
"time"
)
// ENDPOINT is the websocket URL that Open dials.
const (
	ENDPOINT = "wss://ws-feed.gdax.com"
)
type Client struct {
msgs chan tacitus.Message
err chan error
writer chan tacitus.Request
conn *websocket.Conn
conMux sync.Mutex
connect bool
close chan bool
}
// NewClient returns a disconnected Client with all of its channels allocated.
// Only the inbound message channel is buffered (1024 entries); the error,
// request and close channels are unbuffered.
func NewClient() *Client {
	return &Client{
		msgs:   make(chan tacitus.Message, 1024),
		err:    make(chan error),
		writer: make(chan tacitus.Request),
		close:  make(chan bool),
	}
}
// Open dials ENDPOINT and, on success, marks the client connected and starts
// two goroutines that service the new connection until it is shut down:
//
//   - a reader that decodes incoming JSON into tacitus.Message values and
//     delivers them on the MessageStream channel;
//   - a writer that serialises requests queued by Send, handles the shutdown
//     signal raised by sendClose, and closes the connection when it exits.
//
// It returns the dial error, if any; in that case no goroutine is started.
// Failures after a successful dial are reported on the Error channel. That
// channel is unbuffered, so the reporting goroutine waits there until the
// error is received.
func (c *Client) Open() error {
	conn, _, err := websocket.DefaultDialer.Dial(ENDPOINT, nil)
	if err != nil {
		return err
	}
	// Both goroutines use the local conn rather than the c.conn field so they
	// stay bound to the connection they were started for.
	c.conn = conn
	c.setConnected(true)

	// Reader goroutine.
	go func() {
		for c.connected() {
			var msg tacitus.Message
			if err := conn.ReadJSON(&msg); err != nil {
				// Any read error ends this loop: msg was not (fully) decoded,
				// so it must not be forwarded, and the connection is not read
				// again after a failure. The error is reported when the client
				// did not ask for the shutdown itself (it is still marked
				// connected) or when the peer closed with anything other than
				// a normal closure.
				if c.connected() ||
					websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
					c.sendClose(false)
					c.err <- err
				}
				return
			}
			c.msgs <- msg
		}
	}()

	// Writer goroutine. It is the only receiver on c.close, so it keeps
	// running until it has consumed a close signal or has itself taken the
	// client down after a failed write. Exiting earlier would leave a
	// sendClose caller blocked on the unbuffered channel.
	go func() {
		for {
			select {
			case msg := <-c.writer:
				err := conn.WriteJSON(msg)
				if err == nil {
					continue
				}
				// Clear the connected flag directly. Calling sendClose here
				// would send on c.close from the goroutine that is its only
				// receiver and block forever.
				c.conMux.Lock()
				first := c.connect
				c.connect = false
				c.conMux.Unlock()
				if !first {
					// Someone else already started the shutdown and will
					// signal on c.close; keep looping to receive it. The write
					// error is dropped because the client is going away.
					continue
				}
				// Close before reporting so a reader blocked in ReadJSON is
				// released even if nobody is draining the error channel yet.
				_ = conn.Close()
				c.err <- err
				return
			case send := <-c.close:
				if send {
					// Best effort: the client is shutting down regardless of
					// whether the close frame could be written.
					_ = conn.WriteControl(websocket.CloseMessage,
						websocket.FormatCloseMessage(websocket.CloseNormalClosure,
							"done"),
						time.Now().Add(30*time.Second))
				}
				// Release the socket; this also unblocks the reader.
				_ = conn.Close()
				return
			}
		}
	}()
	return nil
}
// Close starts a client-initiated shutdown: if the client is connected it is
// marked disconnected and the writer goroutine is asked to send a websocket
// close frame. It does nothing when the client is not connected.
func (c *Client) Close() {
	const sendCloseFrame = true
	c.sendClose(sendCloseFrame)
}
// sendClose marks the client disconnected and, if it was connected, hands the
// writer goroutine a shutdown signal on c.close. send tells the writer whether
// to transmit a websocket close frame.
//
// The flag is tested and cleared under a single lock acquisition so that, of
// several concurrent callers, exactly one performs the send. With separate
// check and set steps two callers could both see the client as connected and
// both send on the unbuffered channel, leaving the second blocked forever.
func (c *Client) sendClose(send bool) {
	c.conMux.Lock()
	wasConnected := c.connect
	c.connect = false
	c.conMux.Unlock()

	// The channel send happens outside the lock so that connected() callers
	// are not stalled while waiting for the writer goroutine to receive.
	if wasConnected {
		c.close <- send
	}
}
// Send queues msg for the writer goroutine. The request is silently dropped
// when the client is not connected. The request channel is unbuffered, so the
// call waits until the writer goroutine accepts the request.
func (c *Client) Send(msg tacitus.Request) {
	if !c.connected() {
		return
	}
	c.writer <- msg
}
// MessageStream exposes the inbound message channel as receive-only.
func (c *Client) MessageStream() <-chan tacitus.Message {
	var stream <-chan tacitus.Message = c.msgs
	return stream
}
// Error exposes the channel on which connection failures are reported, as
// receive-only.
func (c *Client) Error() <-chan error {
	var failures <-chan error = c.err
	return failures
}
// connected reports the current value of the connect flag, read under conMux.
func (c *Client) connected() bool {
	c.conMux.Lock()
	state := c.connect
	c.conMux.Unlock()
	return state
}
// setConnected stores connected into the connect flag while holding conMux.
func (c *Client) setConnected(connected bool) {
	c.conMux.Lock()
	c.connect = connected
	c.conMux.Unlock()
}