Merge branch 'master' of gitlab.com:kcotugno/tacitus

This commit is contained in:
Kevin Cotugno 2017-09-25 16:48:48 -07:00
commit d8b76de011
7 changed files with 86 additions and 40 deletions

View File

@ -1,45 +1,19 @@
package main package main
import ( import (
"github.com/kcotugno/tacitus" "github.com/kcotugno/tacitus/ops"
"github.com/kcotugno/tacitus/osutil" "github.com/kcotugno/tacitus/osutil"
"github.com/kcotugno/tacitus/postgres"
"github.com/kcotugno/tacitus/gdax"
"github.com/kcotugno/tacitus/gdax/websocket"
"log"
) )
func main() { func main() {
logger := osutil.NewLogger() logger := osutil.NewLogger()
c := websocket.NewClient()
ls := gdax.ListenerService{}
ls.Client = c
ls.Logger = logger
r := ops.NewRegistrar()
db := postgres.NewClient() r.SetLogger(logger)
db.Name = "gdax" if err := r.Record("ETH-USD", "BTC-USD"); err != nil {
db.User = "gdax" logger.Info("Error: %v", err)
db.SetLogger(logger)
err := db.Open()
if err != nil {
log.Panic(err)
} }
err = ls.Open() t := make(chan bool)
if err != nil { <-t
log.Print(err)
}
ls.Subscribe("ETH-USD", "BTC-USD")
for t := range ls.Stream() {
go func (trade tacitus.Trade) {
_, err := db.TradeService().CreateTrade(trade)
if err != nil {
logger.Info("Error inserting trade: %v", err)
}
}(t)
}
} }

View File

@ -1,10 +1,10 @@
package main package main
import ( import (
"log"
"github.com/kcotugno/tacitus/http" "github.com/kcotugno/tacitus/http"
"github.com/kcotugno/tacitus/postgres"
"github.com/kcotugno/tacitus/osutil" "github.com/kcotugno/tacitus/osutil"
"github.com/kcotugno/tacitus/postgres"
"log"
) )
func main() { func main() {
@ -23,5 +23,5 @@ func main() {
defer s.Close() defer s.Close()
s.Open() s.Open()
<- s.Err <-s.Err
} }

View File

@ -16,7 +16,7 @@ type ListenerService struct {
subs []string subs []string
closed bool closed bool
shouldRestart bool shouldRestart bool
restMu sync.Mutex restMu sync.Mutex
@ -72,6 +72,10 @@ func (s *ListenerService) Error() <-chan error {
return s.err return s.err
} }
func (s *ListenerService) SetLogger(logger tacitus.Logger) {
s.Logger = logger
}
func (s *ListenerService) listen() { func (s *ListenerService) listen() {
go func() { go func() {
for msg := range s.Client.Stream() { for msg := range s.Client.Stream() {

View File

@ -1,8 +1,8 @@
package websocket package websocket
import ( import (
"github.com/kcotugno/tacitus/gdax"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/kcotugno/tacitus/gdax"
"sync" "sync"
"time" "time"

View File

@ -6,4 +6,5 @@ type ListenerService interface {
Subscribe(product string) Subscribe(product string)
Stream() <-chan Trade Stream() <-chan Trade
Error() <-chan error Error() <-chan error
SetLogger(logger Logger)
} }

67
ops/registrar.go Normal file
View File

@ -0,0 +1,67 @@
package ops
import (
"github.com/kcotugno/tacitus"
"github.com/kcotugno/tacitus/gdax"
"github.com/kcotugno/tacitus/gdax/websocket"
"github.com/kcotugno/tacitus/postgres"
)
type Registrar struct {
db *postgres.Client
wc *gdax.ListenerService
logger tacitus.Logger
}
func NewRegistrar() *Registrar {
r := Registrar{}
r.db = postgres.NewClient()
r.wc = &gdax.ListenerService{}
r.wc.Client = websocket.NewClient()
return &r
}
func (r *Registrar) Record(products ...string) error {
r.db.Name = "gdax"
r.db.User = "gdax"
if err := r.db.Open(); err != nil {
return err
}
if err := r.wc.Open(); err != nil {
return err
}
r.record()
r.wc.Subscribe(products...)
return nil
}
func (r *Registrar) SetLogger(logger tacitus.Logger) {
r.db.SetLogger(logger)
r.wc.Logger = logger
r.logger = logger
}
func (r *Registrar) record() {
go func() {
for t := range r.wc.Stream() {
go func(trade tacitus.Trade) {
_, err := r.db.TradeService().CreateTrade(trade)
if err != nil {
r.logger.Info("Error inserting trade: %v", err)
}
}(t)
}
}()
go func() {
for err := range r.wc.Error() {
r.logger.Info("Registrar received error: %v", err)
}
}()
}

View File

@ -11,10 +11,10 @@ const tmpl = "%v - %v: "
type LoggerService struct { type LoggerService struct {
DateFormat string DateFormat string
Utc bool Utc bool
file *os.File file *os.File
log bool log bool
} }
func NewLogger() *LoggerService { func NewLogger() *LoggerService {