Compare commits

...
This repository has been archived on 2022-11-30. You can view files and clone it, but cannot push or open issues or pull requests.

3 Commits

Author SHA1 Message Date
f324ba576c Only send an error if there is a receiver 2017-11-27 20:13:30 -08:00
81ec3c1eb8 Continue refactor 2017-11-27 20:13:30 -08:00
3513e845c6 WIP 2017-11-27 20:13:30 -08:00
4 changed files with 42 additions and 73 deletions

View File

@ -31,7 +31,7 @@ func main() {
r := ops.Registrar{} r := ops.Registrar{}
r.Database = db r.Database = db
r.Listener = ls r.Listener = ls
r.SetLogger(logger) r.Logger = logger
if err := r.Record("ETH-USD", "BTC-USD"); err != nil { if err := r.Record("ETH-USD", "BTC-USD"); err != nil {
logger.Info("Error: %v", err) logger.Info("Error: %v", err)
return return

View File

@ -4,9 +4,10 @@ import (
"github.com/kcotugno/tacitus" "github.com/kcotugno/tacitus"
"errors" "errors"
"sync"
) )
const buf = 1024
type ListenerService struct { type ListenerService struct {
Client ListenerClient Client ListenerClient
Logger tacitus.Logger Logger tacitus.Logger
@ -14,19 +15,11 @@ type ListenerService struct {
trades chan tacitus.Trade trades chan tacitus.Trade
err chan error err chan error
subs []string dirty bool
closed bool
shouldRestart bool
restMu sync.Mutex
errorsMu sync.Mutex
sendErrors bool
} }
func (s *ListenerService) Open() error { func (s *ListenerService) Open() error {
if s.closed { if s.dirty {
return errors.New("Already used") return errors.New("Already used")
} }
@ -36,9 +29,8 @@ func (s *ListenerService) Open() error {
} }
s.Logger.Info("GDAX listener started") s.Logger.Info("GDAX listener started")
s.trades = make(chan tacitus.Trade, 1024) s.trades = make(chan tacitus.Trade, buf)
s.err = make(chan error) s.err = make(chan error)
s.setRestart(true)
s.listen() s.listen()
@ -46,15 +38,22 @@ func (s *ListenerService) Open() error {
} }
func (s *ListenerService) Close() error { func (s *ListenerService) Close() error {
s.closed = true
s.setRestart(false)
s.Client.Close() s.Client.Close()
s.Logger.Info("GDAX listener stopped") s.Logger.Info("GDAX listener stopped")
close(s.trades)
close(s.err)
return nil return nil
} }
func (s *ListenerService) Subscribe(products ...string) { func (s *ListenerService) Subscribe(products ...string) {
if s.dirty {
return
}
s.dirty = true
req := Request{} req := Request{}
req.Type = Subscribe req.Type = Subscribe
@ -62,8 +61,6 @@ func (s *ListenerService) Subscribe(products ...string) {
req.Channels[0].Name = "matches" req.Channels[0].Name = "matches"
req.Channels[0].ProductIds = products req.Channels[0].ProductIds = products
s.subs = products
s.Client.Send(req) s.Client.Send(req)
} }
@ -75,10 +72,6 @@ func (s *ListenerService) Error() <-chan error {
return s.err return s.err
} }
func (s *ListenerService) SetLogger(logger tacitus.Logger) {
s.Logger = logger
}
func (s *ListenerService) listen() { func (s *ListenerService) listen() {
go func() { go func() {
for msg := range s.Client.Stream() { for msg := range s.Client.Stream() {
@ -106,50 +99,16 @@ func (s *ListenerService) listen() {
for err := range s.Client.Error() { for err := range s.Client.Error() {
s.Logger.Info("Error from GDAX client: %v", err) s.Logger.Info("Error from GDAX client: %v", err)
if s.SendErrors() { select {
s.err <- err case s.err <- err:
default:
} }
close(s.trades)
close(s.err)
} }
s.Logger.Info("Error closed") s.Logger.Info("Error closed")
if s.restart() {
err := s.Open()
if err != nil {
s.Logger.Info("GDAX Unable to reastablish connection")
s.Close()
return
}
s.Subscribe(s.subs...)
}
}() }()
} }
func (s *ListenerService) restart() bool {
s.restMu.Lock()
defer s.restMu.Unlock()
return s.shouldRestart
}
func (s *ListenerService) setRestart(restart bool) {
s.restMu.Lock()
defer s.restMu.Unlock()
s.shouldRestart = restart
}
func (s *ListenerService) SendErrors() bool {
s.errorsMu.Lock()
defer s.errorsMu.Unlock()
return s.sendErrors
}
func (s *ListenerService) SetSendErrors(send bool) {
s.errorsMu.Lock()
defer s.errorsMu.Unlock()
s.sendErrors = send
}

View File

@ -6,7 +6,4 @@ type ListenerService interface {
Subscribe(products ...string) Subscribe(products ...string)
Stream() <-chan Trade Stream() <-chan Trade
Error() <-chan error Error() <-chan error
SendErrors() bool
SetSendErrors(send bool)
SetLogger(logger Logger)
} }

View File

@ -7,8 +7,9 @@ import (
type Registrar struct { type Registrar struct {
Database tacitus.DatabaseClientService Database tacitus.DatabaseClientService
Listener tacitus.ListenerService Listener tacitus.ListenerService
Logger tacitus.Logger
logger tacitus.Logger err chan error
} }
func (r *Registrar) Record(products ...string) error { func (r *Registrar) Record(products ...string) error {
@ -22,26 +23,38 @@ func (r *Registrar) Record(products ...string) error {
return nil return nil
} }
func (r *Registrar) SetLogger(logger tacitus.Logger) {
r.Listener.SetLogger(logger)
r.logger = logger
}
func (r *Registrar) record() { func (r *Registrar) record() {
go func() { go func() {
for t := range r.Listener.Stream() { for t := range r.Listener.Stream() {
go func(trade tacitus.Trade) { go func(trade tacitus.Trade) {
_, err := r.Database.TradeService().CreateTrade(trade) _, err := r.Database.TradeService().CreateTrade(trade)
if err != nil { if err != nil {
r.logger.Info("Error inserting trade: %v", err) r.Logger.Info("Error inserting trade: %v", err)
select {
case r.err <- err:
default:
}
} }
}(t) }(t)
} }
r.Logger.Info("Trade goroutine done")
}() }()
go func() { go func() {
for err := range r.Listener.Error() { for err := range r.Listener.Error() {
r.logger.Info("Registrar received error: %v", err) r.Logger.Info("Registrar received error: %v", err)
select {
case r.err <- err:
default:
}
} }
r.Logger.Info("Error goroutine done")
}() }()
} }
func (r *Registrar) Error() <-chan error {
return r.err
}