ops: some updates
This commit is contained in:
parent
4e82c44a50
commit
f227f7f79b
@ -2,56 +2,37 @@ package ops
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/kcotugno/tacitus"
|
"github.com/kcotugno/tacitus"
|
||||||
"github.com/kcotugno/tacitus/gdax"
|
|
||||||
"github.com/kcotugno/tacitus/gdax/websocket"
|
|
||||||
"github.com/kcotugno/tacitus/postgres"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Registrar struct {
|
type Registrar struct {
|
||||||
db *postgres.Client
|
Database tacitus.DatabaseClientService
|
||||||
wc *gdax.ListenerService
|
Listener tacitus.ListenerService
|
||||||
|
|
||||||
logger tacitus.Logger
|
logger tacitus.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRegistrar() *Registrar {
|
|
||||||
r := Registrar{}
|
|
||||||
|
|
||||||
r.db = postgres.NewClient()
|
|
||||||
r.wc = &gdax.ListenerService{}
|
|
||||||
r.wc.Client = websocket.NewClient()
|
|
||||||
|
|
||||||
return &r
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Registrar) Record(products ...string) error {
|
func (r *Registrar) Record(products ...string) error {
|
||||||
r.db.Name = "gdax"
|
if err := r.Listener.Open(); err != nil {
|
||||||
r.db.User = "gdax"
|
|
||||||
if err := r.db.Open(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := r.wc.Open(); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
r.record()
|
r.record()
|
||||||
r.wc.Subscribe(products...)
|
r.Listener.Subscribe(products...)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Registrar) SetLogger(logger tacitus.Logger) {
|
func (r *Registrar) SetLogger(logger tacitus.Logger) {
|
||||||
r.db.SetLogger(logger)
|
r.Database.SetLogger(logger)
|
||||||
r.wc.Logger = logger
|
r.Listener.SetLogger(logger)
|
||||||
r.logger = logger
|
r.logger = logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Registrar) record() {
|
func (r *Registrar) record() {
|
||||||
go func() {
|
go func() {
|
||||||
for t := range r.wc.Stream() {
|
for t := range r.Listener.Stream() {
|
||||||
go func(trade tacitus.Trade) {
|
go func(trade tacitus.Trade) {
|
||||||
_, err := r.db.TradeService().CreateTrade(trade)
|
_, err := r.Database.TradeService().CreateTrade(trade)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.logger.Info("Error inserting trade: %v", err)
|
r.logger.Info("Error inserting trade: %v", err)
|
||||||
}
|
}
|
||||||
@ -60,7 +41,7 @@ func (r *Registrar) record() {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for err := range r.wc.Error() {
|
for err := range r.Listener.Error() {
|
||||||
r.logger.Info("Registrar received error: %v", err)
|
r.logger.Info("Registrar received error: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
Reference in New Issue
Block a user