Compare commits
3 Commits
master
...
refactor-r
Author | SHA1 | Date | |
---|---|---|---|
f324ba576c | |||
81ec3c1eb8 | |||
3513e845c6 |
@ -31,7 +31,7 @@ func main() {
|
|||||||
r := ops.Registrar{}
|
r := ops.Registrar{}
|
||||||
r.Database = db
|
r.Database = db
|
||||||
r.Listener = ls
|
r.Listener = ls
|
||||||
r.SetLogger(logger)
|
r.Logger = logger
|
||||||
if err := r.Record("ETH-USD", "BTC-USD"); err != nil {
|
if err := r.Record("ETH-USD", "BTC-USD"); err != nil {
|
||||||
logger.Info("Error: %v", err)
|
logger.Info("Error: %v", err)
|
||||||
return
|
return
|
||||||
|
@ -4,9 +4,10 @@ import (
|
|||||||
"github.com/kcotugno/tacitus"
|
"github.com/kcotugno/tacitus"
|
||||||
|
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const buf = 1024
|
||||||
|
|
||||||
type ListenerService struct {
|
type ListenerService struct {
|
||||||
Client ListenerClient
|
Client ListenerClient
|
||||||
Logger tacitus.Logger
|
Logger tacitus.Logger
|
||||||
@ -14,19 +15,11 @@ type ListenerService struct {
|
|||||||
trades chan tacitus.Trade
|
trades chan tacitus.Trade
|
||||||
err chan error
|
err chan error
|
||||||
|
|
||||||
subs []string
|
dirty bool
|
||||||
|
|
||||||
closed bool
|
|
||||||
|
|
||||||
shouldRestart bool
|
|
||||||
restMu sync.Mutex
|
|
||||||
|
|
||||||
errorsMu sync.Mutex
|
|
||||||
sendErrors bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ListenerService) Open() error {
|
func (s *ListenerService) Open() error {
|
||||||
if s.closed {
|
if s.dirty {
|
||||||
return errors.New("Already used")
|
return errors.New("Already used")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -36,9 +29,8 @@ func (s *ListenerService) Open() error {
|
|||||||
}
|
}
|
||||||
s.Logger.Info("GDAX listener started")
|
s.Logger.Info("GDAX listener started")
|
||||||
|
|
||||||
s.trades = make(chan tacitus.Trade, 1024)
|
s.trades = make(chan tacitus.Trade, buf)
|
||||||
s.err = make(chan error)
|
s.err = make(chan error)
|
||||||
s.setRestart(true)
|
|
||||||
|
|
||||||
s.listen()
|
s.listen()
|
||||||
|
|
||||||
@ -46,15 +38,22 @@ func (s *ListenerService) Open() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ListenerService) Close() error {
|
func (s *ListenerService) Close() error {
|
||||||
s.closed = true
|
|
||||||
s.setRestart(false)
|
|
||||||
s.Client.Close()
|
s.Client.Close()
|
||||||
s.Logger.Info("GDAX listener stopped")
|
s.Logger.Info("GDAX listener stopped")
|
||||||
|
|
||||||
|
close(s.trades)
|
||||||
|
close(s.err)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ListenerService) Subscribe(products ...string) {
|
func (s *ListenerService) Subscribe(products ...string) {
|
||||||
|
if s.dirty {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.dirty = true
|
||||||
|
|
||||||
req := Request{}
|
req := Request{}
|
||||||
req.Type = Subscribe
|
req.Type = Subscribe
|
||||||
|
|
||||||
@ -62,8 +61,6 @@ func (s *ListenerService) Subscribe(products ...string) {
|
|||||||
req.Channels[0].Name = "matches"
|
req.Channels[0].Name = "matches"
|
||||||
req.Channels[0].ProductIds = products
|
req.Channels[0].ProductIds = products
|
||||||
|
|
||||||
s.subs = products
|
|
||||||
|
|
||||||
s.Client.Send(req)
|
s.Client.Send(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,10 +72,6 @@ func (s *ListenerService) Error() <-chan error {
|
|||||||
return s.err
|
return s.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ListenerService) SetLogger(logger tacitus.Logger) {
|
|
||||||
s.Logger = logger
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ListenerService) listen() {
|
func (s *ListenerService) listen() {
|
||||||
go func() {
|
go func() {
|
||||||
for msg := range s.Client.Stream() {
|
for msg := range s.Client.Stream() {
|
||||||
@ -106,50 +99,16 @@ func (s *ListenerService) listen() {
|
|||||||
for err := range s.Client.Error() {
|
for err := range s.Client.Error() {
|
||||||
s.Logger.Info("Error from GDAX client: %v", err)
|
s.Logger.Info("Error from GDAX client: %v", err)
|
||||||
|
|
||||||
if s.SendErrors() {
|
select {
|
||||||
s.err <- err
|
case s.err <- err:
|
||||||
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
close(s.trades)
|
||||||
|
close(s.err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.Logger.Info("Error closed")
|
s.Logger.Info("Error closed")
|
||||||
|
|
||||||
if s.restart() {
|
|
||||||
err := s.Open()
|
|
||||||
if err != nil {
|
|
||||||
s.Logger.Info("GDAX Unable to reastablish connection")
|
|
||||||
s.Close()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
s.Subscribe(s.subs...)
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ListenerService) restart() bool {
|
|
||||||
s.restMu.Lock()
|
|
||||||
defer s.restMu.Unlock()
|
|
||||||
|
|
||||||
return s.shouldRestart
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ListenerService) setRestart(restart bool) {
|
|
||||||
s.restMu.Lock()
|
|
||||||
defer s.restMu.Unlock()
|
|
||||||
|
|
||||||
s.shouldRestart = restart
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ListenerService) SendErrors() bool {
|
|
||||||
s.errorsMu.Lock()
|
|
||||||
defer s.errorsMu.Unlock()
|
|
||||||
|
|
||||||
return s.sendErrors
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ListenerService) SetSendErrors(send bool) {
|
|
||||||
s.errorsMu.Lock()
|
|
||||||
defer s.errorsMu.Unlock()
|
|
||||||
|
|
||||||
s.sendErrors = send
|
|
||||||
}
|
|
||||||
|
@ -6,7 +6,4 @@ type ListenerService interface {
|
|||||||
Subscribe(products ...string)
|
Subscribe(products ...string)
|
||||||
Stream() <-chan Trade
|
Stream() <-chan Trade
|
||||||
Error() <-chan error
|
Error() <-chan error
|
||||||
SendErrors() bool
|
|
||||||
SetSendErrors(send bool)
|
|
||||||
SetLogger(logger Logger)
|
|
||||||
}
|
}
|
||||||
|
@ -7,8 +7,9 @@ import (
|
|||||||
type Registrar struct {
|
type Registrar struct {
|
||||||
Database tacitus.DatabaseClientService
|
Database tacitus.DatabaseClientService
|
||||||
Listener tacitus.ListenerService
|
Listener tacitus.ListenerService
|
||||||
|
Logger tacitus.Logger
|
||||||
|
|
||||||
logger tacitus.Logger
|
err chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Registrar) Record(products ...string) error {
|
func (r *Registrar) Record(products ...string) error {
|
||||||
@ -22,26 +23,38 @@ func (r *Registrar) Record(products ...string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Registrar) SetLogger(logger tacitus.Logger) {
|
|
||||||
r.Listener.SetLogger(logger)
|
|
||||||
r.logger = logger
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Registrar) record() {
|
func (r *Registrar) record() {
|
||||||
go func() {
|
go func() {
|
||||||
for t := range r.Listener.Stream() {
|
for t := range r.Listener.Stream() {
|
||||||
go func(trade tacitus.Trade) {
|
go func(trade tacitus.Trade) {
|
||||||
_, err := r.Database.TradeService().CreateTrade(trade)
|
_, err := r.Database.TradeService().CreateTrade(trade)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.logger.Info("Error inserting trade: %v", err)
|
r.Logger.Info("Error inserting trade: %v", err)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case r.err <- err:
|
||||||
|
default:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}(t)
|
}(t)
|
||||||
}
|
}
|
||||||
|
r.Logger.Info("Trade goroutine done")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for err := range r.Listener.Error() {
|
for err := range r.Listener.Error() {
|
||||||
r.logger.Info("Registrar received error: %v", err)
|
r.Logger.Info("Registrar received error: %v", err)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case r.err <- err:
|
||||||
|
default:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.Logger.Info("Error goroutine done")
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Registrar) Error() <-chan error {
|
||||||
|
return r.err
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user