package main import ( "github.com/emirpasic/gods/trees/redblacktree" "github.com/gorilla/websocket" "github.com/shopspring/decimal" "encoding/json" "errors" "fmt" "io/ioutil" "net/http" "sync" "time" ) type Message struct { Sequence int64 `json:"sequence"` Type string `json:"type"` Side string `json:"side"` Price decimal.Decimal `json:"price"` Size decimal.Decimal `json:"size"` OrderId string `json:"order_id"` MakerOrderId string `json:"maker_order_id"` RemainingSize decimal.Decimal `json:"remaining_size"` NewSize decimal.Decimal `json:"new_size"` ProductId string `json:"product_id"` Time time.Time `json:"time"` Reason string `json:"reason"` OrderType string `json:"order_type"` ClientOid string `json:"client_oid"` } type LevelThree struct { Sequence int64 `json:"sequence"` Bids []LevelThreeEntry `json:"bids"` Asks []LevelThreeEntry `json:"asks"` } type LevelThreeEntry []string type Sub struct { Type string `json:"type"` ProductIds []string `json:"product_ids"` Channels []string `json:"channels"` } type Entry struct { Id string Side string Price decimal.Decimal Size decimal.Decimal } type Entries map[string]Entry type OrderBook struct { Msg <-chan Message Err <-chan error asks *redblacktree.Tree bids *redblacktree.Tree askLock sync.Mutex bidLock sync.Mutex msg chan Message err chan error running bool coin string sequence int64 conn *websocket.Conn } func NewOrderBook(coin string) (*OrderBook, error) { var o OrderBook var err error o.conn, _, err = websocket.DefaultDialer.Dial("wss://ws-feed.pro.coinbase.com", nil) if err != nil { return nil, err } o.asks = redblacktree.NewWith(DecimalComparator) o.bids = redblacktree.NewWith(ReverseDecimalComparator) o.msg = make(chan Message, 2048) o.Msg = o.msg o.err = make(chan error, 0) o.Err = o.err o.running = true o.coin = coin o.watchBook() return &o, nil } func (o *OrderBook) Shutdown() { if !o.running { return } o.running = false o.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket. CloseNormalClosure, "")) } func (o *OrderBook) Entries(side string, count int) []Entries { entries := make([]Entries, 0) tree := o.tree(side) lock := o.lock(side) lock.Lock() defer lock.Unlock() it := tree.Iterator() for i := 0; i < count; i++ { copies := make(Entries, 0) ok := it.Next() if !ok { break } e := it.Value().(Entries) for _, j := range e { copies[j.Id] = j } entries = append(entries, copies) } return entries } func (o *OrderBook) watchBook() { go func() { defer func() { close(o.err) close(o.msg) o.conn.Close() o.running = false }() var msg Message var err error sub := Sub{"subscribe", []string{o.coin}, []string{"full"}} err = o.conn.WriteJSON(sub) if err != nil { o.sendError(err) return } o.loadOrderBook() for { err := o.conn.ReadJSON(&msg) if err != nil { if !websocket.IsCloseError(err, websocket.CloseNormalClosure) { o.sendError(err) } break } if msg.Sequence <= o.sequence { continue } if msg.Sequence != o.sequence+1 { o.loadOrderBook() continue } o.sequence = msg.Sequence switch msg.Type { case "received": case "open": o.open(msg) case "done": o.done(msg) case "match": o.match(msg) case "change": o.change(msg) default: o.sendError(errors.New("Unknown message type")) } o.msg <- msg } }() } func (o *OrderBook) open(msg Message) { var e Entry e.Id = msg.OrderId e.Side = msg.Side e.Price = msg.Price e.Size = msg.RemainingSize o.setEntry(e) } func (o *OrderBook) done(msg Message) { if msg.Price.Equal(decimal.Zero) { return } var e Entry e.Id = msg.OrderId e.Side = msg.Side e.Price = msg.Price o.removeEntry(e) } func (o *OrderBook) match(msg Message) { e, ok := o.entry(msg.Side, msg.Price, msg.MakerOrderId) if !ok { return } e.Size = e.Size.Sub(msg.Size) o.setEntry(e) } func (o *OrderBook) change(msg Message) { var e Entry e.Id = msg.OrderId e.Side = msg.Side e.Price = msg.Price e.Size = msg.NewSize o.setEntry(e) } func (o *OrderBook) loadOrderBook() { o.askLock.Lock() o.bidLock.Lock() o.bids.Clear() o.asks.Clear() o.askLock.Unlock() o.bidLock.Unlock() resp, err := http.Get(fmt.Sprintf("https://api.pro.coinbase.com/products/%v/book?level=3", coin)) if err != nil { o.sendError(err) o.Shutdown() return } defer resp.Body.Close() buf, err := ioutil.ReadAll(resp.Body) if err != nil { o.sendError(err) o.Shutdown() return } var parsed LevelThree err = json.Unmarshal(buf, &parsed) if err != nil { o.sendError(err) o.Shutdown() return } for _, b := range parsed.Bids { var e Entry e.Price, err = decimal.NewFromString(b[0]) if err != nil { o.sendError(err) o.Shutdown() return } e.Size, err = decimal.NewFromString(b[1]) if err != nil { o.sendError(err) o.Shutdown() return } e.Id = b[2] e.Side = "buy" o.setEntry(e) } for _, a := range parsed.Asks { var e Entry e.Price, err = decimal.NewFromString(a[0]) if err != nil { o.sendError(err) o.Shutdown() return } e.Size, err = decimal.NewFromString(a[1]) if err != nil { o.sendError(err) o.Shutdown() return } e.Id = a[2] e.Side = "sell" o.setEntry(e) } o.sequence = parsed.Sequence } func (o *OrderBook) lock(side string) *sync.Mutex { switch side { case "sell": return &o.askLock case "buy": return &o.bidLock } return nil } func (o *OrderBook) tree(side string) *redblacktree.Tree { switch side { case "sell": return o.asks case "buy": return o.bids } return nil } func (o *OrderBook) entries(side string, key decimal.Decimal) (Entries, bool) { tree := o.tree(side) lock := o.lock(side) lock.Lock() defer lock.Unlock() values, ok := tree.Get(key) if !ok { return nil, false } entries := make(Entries) for k, v := range values.(Entries) { entries[k] = v } return entries, true } func (o *OrderBook) updateEntries(side string, price decimal.Decimal, e Entries) { tree := o.tree(side) lock := o.lock(side) lock.Lock() defer lock.Unlock() if len(e) == 0 { tree.Remove(price) } else { tree.Put(price, e) } } func (o *OrderBook) entry(side string, price decimal.Decimal, id string) (Entry, bool) { var entry Entry entries, ok := o.entries(side, price) if !ok { return entry, false } entry, ok = entries[id] return entry, ok } func (o *OrderBook) setEntry(e Entry) { entries, ok := o.entries(e.Side, e.Price) if !ok { entries = Entries{} } entries[e.Id] = e o.updateEntries(e.Side, e.Price, entries) } func (o *OrderBook) removeEntry(e Entry) { entries, ok := o.entries(e.Side, e.Price) if !ok { return } delete(entries, e.Id) o.updateEntries(e.Side, e.Price, entries) } func DecimalComparator(a, b interface{}) int { aAsserted := a.(decimal.Decimal) bAsserted := b.(decimal.Decimal) switch { case aAsserted.GreaterThan(bAsserted): return 1 case aAsserted.LessThan(bAsserted): return -1 default: return 0 } } func ReverseDecimalComparator(a, b interface{}) int { aAsserted := a.(decimal.Decimal) bAsserted := b.(decimal.Decimal) switch { case aAsserted.GreaterThan(bAsserted): return -1 case aAsserted.LessThan(bAsserted): return 1 default: return 0 } } func (o *OrderBook) sendError(err error) { select { case o.err <- err: default: } }