Websocket close handling
This commit is contained in:
parent
26b94963a0
commit
163e3244f9
@ -4,18 +4,22 @@ import (
|
||||
"github.com/kcotugno/tacitus/gdax"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const ENDPOINT = "wss://ws-feed.gdax.com"
|
||||
|
||||
type Client struct {
|
||||
msgs chan gdax.Message
|
||||
err chan error
|
||||
msgs chan gdax.Message
|
||||
err chan error
|
||||
writer chan gdax.Request
|
||||
conn *websocket.Conn
|
||||
conn *websocket.Conn
|
||||
|
||||
connected bool
|
||||
close chan bool
|
||||
conMux sync.Mutex
|
||||
connect bool
|
||||
close chan bool
|
||||
}
|
||||
|
||||
func NewClient() *Client {
|
||||
@ -35,36 +39,41 @@ func (c *Client) Open() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.connected = true
|
||||
c.setConnected(true)
|
||||
|
||||
go func () {
|
||||
for c.connected {
|
||||
select {
|
||||
case <- c.close:
|
||||
default:
|
||||
var msg gdax.Message
|
||||
go func() {
|
||||
for c.connected() {
|
||||
var msg gdax.Message
|
||||
|
||||
err := c.conn.ReadJSON(&msg)
|
||||
if err != nil {
|
||||
c.Close()
|
||||
err := c.conn.ReadJSON(&msg)
|
||||
if err != nil {
|
||||
if websocket.IsUnexpectedCloseError(err,
|
||||
websocket.CloseNormalClosure) {
|
||||
c.sendClose(false)
|
||||
c.err <- err
|
||||
break
|
||||
}
|
||||
|
||||
c.msgs <- msg
|
||||
}
|
||||
c.msgs <- msg
|
||||
}
|
||||
}()
|
||||
|
||||
go func () {
|
||||
for c.connected {
|
||||
go func() {
|
||||
for c.connected() {
|
||||
select {
|
||||
case msg := <- c.writer:
|
||||
case msg := <-c.writer:
|
||||
err := c.conn.WriteJSON(msg)
|
||||
if err != nil {
|
||||
c.Close()
|
||||
c.sendClose(false)
|
||||
c.err <- err
|
||||
}
|
||||
case <- c.close:
|
||||
case send := <-c.close:
|
||||
if send {
|
||||
c.conn.WriteControl(websocket.CloseMessage,
|
||||
websocket.FormatCloseMessage(websocket.CloseNormalClosure,
|
||||
"done"),
|
||||
time.Now().Add(30*time.Second))
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -73,21 +82,40 @@ func (c *Client) Open() error {
|
||||
}
|
||||
|
||||
func (c *Client) Close() {
|
||||
c.connected = false
|
||||
c.close <- false
|
||||
c.conn.Close()
|
||||
c.sendClose(true)
|
||||
}
|
||||
|
||||
func (c *Client) sendClose(send bool) {
|
||||
if c.connected() {
|
||||
c.setConnected(false)
|
||||
c.close <- send
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Send(msg gdax.Request) {
|
||||
if c.connected {
|
||||
if c.connected() {
|
||||
c.writer <- msg
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) MessageStream() <- chan gdax.Message {
|
||||
func (c *Client) MessageStream() <-chan gdax.Message {
|
||||
return c.msgs
|
||||
}
|
||||
|
||||
func (c *Client) Error() <- chan error {
|
||||
func (c *Client) Error() <-chan error {
|
||||
return c.err
|
||||
}
|
||||
|
||||
func (c *Client) connected() bool {
|
||||
c.conMux.Lock()
|
||||
defer c.conMux.Unlock()
|
||||
|
||||
return c.connect
|
||||
}
|
||||
|
||||
func (c *Client) setConnected(connected bool) {
|
||||
c.conMux.Lock()
|
||||
defer c.conMux.Unlock()
|
||||
|
||||
c.connect = connected
|
||||
}
|
||||
|
Reference in New Issue
Block a user