This repository has been archived on 2022-11-30. You can view files and clone it, but cannot push or open issues or pull requests.
spectator/order_book.go

450 lines
7.5 KiB
Go

package main
import (
"github.com/emirpasic/gods/trees/redblacktree"
"github.com/gorilla/websocket"
"github.com/shopspring/decimal"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
)
type Message struct {
Sequence int64 `json:"sequence"`
Type string `json:"type"`
Side string `json:"side"`
Price decimal.Decimal `json:"price"`
Size decimal.Decimal `json:"size"`
OrderId string `json:"order_id"`
MakerOrderId string `json:"maker_order_id"`
RemainingSize decimal.Decimal `json:"remaining_size"`
NewSize decimal.Decimal `json:"new_size"`
ProductId string `json:"product_id"`
Time time.Time `json:"time"`
Reason string `json:"reason"`
OrderType string `json:"order_type"`
ClientOid string `json:"client_oid"`
}
type LevelThree struct {
Sequence int64 `json:"sequence"`
Bids []LevelThreeEntry `json:"bids"`
Asks []LevelThreeEntry `json:"asks"`
}
type LevelThreeEntry []string
type Sub struct {
Type string `json:"type"`
ProductIds []string `json:"product_ids"`
Channels []string `json:"channels"`
}
type Entry struct {
Id string
Side string
Price decimal.Decimal
Size decimal.Decimal
}
type Entries map[string]Entry
type OrderBook struct {
Msg <-chan Message
Err <-chan error
asks *redblacktree.Tree
bids *redblacktree.Tree
askLock sync.Mutex
bidLock sync.Mutex
msg chan Message
err chan error
running bool
coin string
sequence int64
conn *websocket.Conn
}
func NewOrderBook(coin string) (*OrderBook, error) {
var o OrderBook
var err error
o.conn, _, err = websocket.DefaultDialer.Dial("wss://ws-feed.pro.coinbase.com", nil)
if err != nil {
return nil, err
}
o.asks = redblacktree.NewWith(DecimalComparator)
o.bids = redblacktree.NewWith(ReverseDecimalComparator)
o.msg = make(chan Message, 2048)
o.Msg = o.msg
o.err = make(chan error, 0)
o.Err = o.err
o.running = true
o.coin = coin
o.watchBook()
return &o, nil
}
func (o *OrderBook) Shutdown() {
if !o.running {
return
}
o.running = false
o.conn.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.
CloseNormalClosure, ""))
}
func (o *OrderBook) Entries(side string, count int) []Entries {
entries := make([]Entries, 0)
tree := o.tree(side)
lock := o.lock(side)
lock.Lock()
defer lock.Unlock()
it := tree.Iterator()
for i := 0; i < count; i++ {
copies := make(Entries, 0)
ok := it.Next()
if !ok {
break
}
e := it.Value().(Entries)
for _, j := range e {
copies[j.Id] = j
}
entries = append(entries, copies)
}
return entries
}
func (o *OrderBook) watchBook() {
go func() {
defer func() {
close(o.err)
close(o.msg)
o.conn.Close()
o.running = false
}()
var msg Message
var err error
sub := Sub{"subscribe", []string{o.coin}, []string{"full"}}
err = o.conn.WriteJSON(sub)
if err != nil {
o.sendError(err)
return
}
o.loadOrderBook()
for {
err := o.conn.ReadJSON(&msg)
if err != nil {
if !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
o.sendError(err)
}
break
}
if msg.Sequence <= o.sequence {
continue
}
if msg.Sequence != o.sequence+1 {
o.loadOrderBook()
continue
}
o.sequence = msg.Sequence
switch msg.Type {
case "received":
case "open":
o.open(msg)
case "done":
o.done(msg)
case "match":
o.match(msg)
case "change":
o.change(msg)
default:
o.sendError(errors.New("Unknown message type"))
}
o.msg <- msg
}
}()
}
func (o *OrderBook) open(msg Message) {
var e Entry
e.Id = msg.OrderId
e.Side = msg.Side
e.Price = msg.Price
e.Size = msg.RemainingSize
o.setEntry(e)
}
func (o *OrderBook) done(msg Message) {
if msg.Price.Equal(decimal.Zero) {
return
}
var e Entry
e.Id = msg.OrderId
e.Side = msg.Side
e.Price = msg.Price
o.removeEntry(e)
}
func (o *OrderBook) match(msg Message) {
e, ok := o.entry(msg.Side, msg.Price, msg.MakerOrderId)
if !ok {
return
}
e.Size = e.Size.Sub(msg.Size)
o.setEntry(e)
}
func (o *OrderBook) change(msg Message) {
var e Entry
e.Id = msg.OrderId
e.Side = msg.Side
e.Price = msg.Price
e.Size = msg.NewSize
o.setEntry(e)
}
func (o *OrderBook) loadOrderBook() {
o.askLock.Lock()
o.bidLock.Lock()
o.bids.Clear()
o.asks.Clear()
o.askLock.Unlock()
o.bidLock.Unlock()
resp, err := http.Get(fmt.Sprintf("https://api.pro.coinbase.com/products/%v/book?level=3", coin))
if err != nil {
o.sendError(err)
o.Shutdown()
return
}
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)
if err != nil {
o.sendError(err)
o.Shutdown()
return
}
var parsed LevelThree
err = json.Unmarshal(buf, &parsed)
if err != nil {
o.sendError(err)
o.Shutdown()
return
}
for _, b := range parsed.Bids {
var e Entry
e.Price, err = decimal.NewFromString(b[0])
if err != nil {
o.sendError(err)
o.Shutdown()
return
}
e.Size, err = decimal.NewFromString(b[1])
if err != nil {
o.sendError(err)
o.Shutdown()
return
}
e.Id = b[2]
e.Side = "buy"
o.setEntry(e)
}
for _, a := range parsed.Asks {
var e Entry
e.Price, err = decimal.NewFromString(a[0])
if err != nil {
o.sendError(err)
o.Shutdown()
return
}
e.Size, err = decimal.NewFromString(a[1])
if err != nil {
o.sendError(err)
o.Shutdown()
return
}
e.Id = a[2]
e.Side = "sell"
o.setEntry(e)
}
o.sequence = parsed.Sequence
}
func (o *OrderBook) lock(side string) *sync.Mutex {
switch side {
case "sell":
return &o.askLock
case "buy":
return &o.bidLock
}
return nil
}
func (o *OrderBook) tree(side string) *redblacktree.Tree {
switch side {
case "sell":
return o.asks
case "buy":
return o.bids
}
return nil
}
func (o *OrderBook) entries(side string, key decimal.Decimal) (Entries, bool) {
tree := o.tree(side)
lock := o.lock(side)
lock.Lock()
defer lock.Unlock()
values, ok := tree.Get(key)
if !ok {
return nil, false
}
entries := make(Entries)
for k, v := range values.(Entries) {
entries[k] = v
}
return entries, true
}
func (o *OrderBook) updateEntries(side string, price decimal.Decimal, e Entries) {
tree := o.tree(side)
lock := o.lock(side)
lock.Lock()
defer lock.Unlock()
if len(e) == 0 {
tree.Remove(price)
} else {
tree.Put(price, e)
}
}
func (o *OrderBook) entry(side string, price decimal.Decimal,
id string) (Entry, bool) {
var entry Entry
entries, ok := o.entries(side, price)
if !ok {
return entry, false
}
entry, ok = entries[id]
return entry, ok
}
func (o *OrderBook) setEntry(e Entry) {
entries, ok := o.entries(e.Side, e.Price)
if !ok {
entries = Entries{}
}
entries[e.Id] = e
o.updateEntries(e.Side, e.Price, entries)
}
func (o *OrderBook) removeEntry(e Entry) {
entries, ok := o.entries(e.Side, e.Price)
if !ok {
return
}
delete(entries, e.Id)
o.updateEntries(e.Side, e.Price, entries)
}
func DecimalComparator(a, b interface{}) int {
aAsserted := a.(decimal.Decimal)
bAsserted := b.(decimal.Decimal)
switch {
case aAsserted.GreaterThan(bAsserted):
return 1
case aAsserted.LessThan(bAsserted):
return -1
default:
return 0
}
}
func ReverseDecimalComparator(a, b interface{}) int {
aAsserted := a.(decimal.Decimal)
bAsserted := b.(decimal.Decimal)
switch {
case aAsserted.GreaterThan(bAsserted):
return -1
case aAsserted.LessThan(bAsserted):
return 1
default:
return 0
}
}
func (o *OrderBook) sendError(err error) {
select {
case o.err <- err:
default:
}
}