Commit 90b9cf58 authored by vicotor's avatar vicotor

update processor

parent c1cf3bd2
...@@ -25,7 +25,7 @@ type KLineGeneratorJob struct { ...@@ -25,7 +25,7 @@ type KLineGeneratorJob struct {
ProcessorFactory CoinProcessorFactory ProcessorFactory CoinProcessorFactory
} }
func NewKLineGeneratorJob(factory CoinProcessorFactory, service MarketService) *KLineGeneratorJob { func NewKLineGeneratorJob(factory CoinProcessorFactory) *KLineGeneratorJob {
return &KLineGeneratorJob{ return &KLineGeneratorJob{
ProcessorFactory: factory, ProcessorFactory: factory,
} }
......
...@@ -14,7 +14,7 @@ type mongoMarketHandler struct { ...@@ -14,7 +14,7 @@ type mongoMarketHandler struct {
// HandleTrade inserts an ExchangeTrade into the corresponding collection // HandleTrade inserts an ExchangeTrade into the corresponding collection
func (h *mongoMarketHandler) HandleTrade(symbol string, trade *ExchangeTrade) error { func (h *mongoMarketHandler) HandleTrade(symbol string, trade *ExchangeTrade) error {
collection := h.db.Collection("exchange_trade_" + symbol) collection := h.db.Collection(tradePrefix + symbol)
_, err := collection.InsertOne(context.Background(), trade) _, err := collection.InsertOne(context.Background(), trade)
if err != nil { if err != nil {
return fmt.Errorf("failed to insert trade: %v", err) return fmt.Errorf("failed to insert trade: %v", err)
...@@ -24,7 +24,7 @@ func (h *mongoMarketHandler) HandleTrade(symbol string, trade *ExchangeTrade) er ...@@ -24,7 +24,7 @@ func (h *mongoMarketHandler) HandleTrade(symbol string, trade *ExchangeTrade) er
// HandleKLine inserts a KLine into the corresponding collection // HandleKLine inserts a KLine into the corresponding collection
func (h *mongoMarketHandler) HandleKLine(symbol string, kline *KLine) error { func (h *mongoMarketHandler) HandleKLine(symbol string, kline *KLine) error {
collection := h.db.Collection("exchange_kline_" + symbol + "_" + kline.Period) collection := h.db.Collection(klinePrefix + symbol + "_" + kline.Period)
_, err := collection.InsertOne(context.Background(), kline) _, err := collection.InsertOne(context.Background(), kline)
if err != nil { if err != nil {
return fmt.Errorf("failed to insert KLine: %v", err) return fmt.Errorf("failed to insert KLine: %v", err)
......
...@@ -9,7 +9,7 @@ type Monitor struct { ...@@ -9,7 +9,7 @@ type Monitor struct {
func NewMonitor(conf *config.Config) *Monitor { func NewMonitor(conf *config.Config) *Monitor {
service := NewMarketService(conf) service := NewMarketService(conf)
kline := NewKLineGeneratorJob(&coinProcessorFactory{}, service) kline := NewKLineGeneratorJob(NewCoinProcessorFactory(service))
return &Monitor{ return &Monitor{
kline: kline, kline: kline,
......
...@@ -41,7 +41,7 @@ type ExchangeTrade struct { ...@@ -41,7 +41,7 @@ type ExchangeTrade struct {
type MarketHandler interface { type MarketHandler interface {
HandleTrade(symbol string, trade *ExchangeTrade) error HandleTrade(symbol string, trade *ExchangeTrade) error
HandleKLine(symbol string, kline *KLine) HandleKLine(symbol string, kline *KLine) error
} }
type MarketService interface { type MarketService interface {
...@@ -50,6 +50,7 @@ type MarketService interface { ...@@ -50,6 +50,7 @@ type MarketService interface {
FindTradeByTimeRange(symbol string, start, end int64) []*ExchangeTrade FindTradeByTimeRange(symbol string, start, end int64) []*ExchangeTrade
FindAllKLineByTimeRange(symbol string, fromTime, toTime int64, period string) []*KLine FindAllKLineByTimeRange(symbol string, fromTime, toTime int64, period string) []*KLine
SaveKLine(symbol string, kline *KLine) SaveKLine(symbol string, kline *KLine)
NewHandler(symbol string) MarketHandler
} }
type DefaultCoinProcessor struct { type DefaultCoinProcessor struct {
...@@ -67,20 +68,30 @@ type DefaultCoinProcessor struct { ...@@ -67,20 +68,30 @@ type DefaultCoinProcessor struct {
type coinProcessorFactory struct { type coinProcessorFactory struct {
mux sync.Mutex mux sync.Mutex
service MarketService
coinProcessors map[string]*DefaultCoinProcessor coinProcessors map[string]*DefaultCoinProcessor
} }
func NewCoinProcessorFactory(service MarketService) CoinProcessorFactory {
return &coinProcessorFactory{
service: service,
coinProcessors: make(map[string]*DefaultCoinProcessor),
}
}
func (cpf *coinProcessorFactory) GetCoinProcessor(symbol, baseCoin string) *DefaultCoinProcessor { func (cpf *coinProcessorFactory) GetCoinProcessor(symbol, baseCoin string) *DefaultCoinProcessor {
cpf.mux.Lock() cpf.mux.Lock()
defer cpf.mux.Unlock() defer cpf.mux.Unlock()
if processor, exists := cpf.coinProcessors[symbol]; exists { if processor, exists := cpf.coinProcessors[symbol]; exists {
return processor return processor
} else { } else {
// todo: initialize the processor with the service.
processor = &DefaultCoinProcessor{ processor = &DefaultCoinProcessor{
symbol: symbol, symbol: symbol,
baseCoin: baseCoin, baseCoin: baseCoin,
currentKLine: createNewKLine(), currentKLine: createNewKLine(),
handlers: []MarketHandler{}, handlers: make([]MarketHandler, 0),
coinThumb: &CoinThumb{}, coinThumb: &CoinThumb{},
isHalt: true, isHalt: true,
stopKLine: false, stopKLine: false,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment