Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
c1cf3bd2
Commit
c1cf3bd2
authored
May 12, 2025
by
vicotor
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add kline generator
parent
7b933c41
Changes
9
Expand all
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
1438 additions
and
1 deletion
+1438
-1
server.go
exchain/exchainapi/server.go
+0
-1
klinegenerator.go
exchain/exmonitor/klinegenerator.go
+114
-0
markethandle.go
exchain/exmonitor/markethandle.go
+33
-0
marketservice.go
exchain/exmonitor/marketservice.go
+181
-0
monitor.go
exchain/exmonitor/monitor.go
+24
-0
processor.go
exchain/exmonitor/processor.go
+295
-0
orderbook.proto
exchain/protocol/proto/orderbook/v1/orderbook.proto
+785
-0
go.mod
go.mod
+2
-0
go.sum
go.sum
+4
-0
No files found.
exchain/exchainapi/server.go
View file @
c1cf3bd2
...
...
@@ -56,7 +56,6 @@ func (s *server) GetBlockByHash(ctx context.Context, request *nodev1.GetBlockReq
return
&
nodev1
.
GetBlockResponse
{
Block
:
blk
,
},
nil
}
func
(
s
*
server
)
GetTransactionByHash
(
ctx
context
.
Context
,
request
*
nodev1
.
GetTransactionRequest
)
(
*
nodev1
.
GetTransactionResponse
,
error
)
{
...
...
exchain/exmonitor/klinegenerator.go
0 → 100644
View file @
c1cf3bd2
package
exmonitor
import
(
"log"
"time"
"github.com/robfig/cron/v3"
)
const
(
K_FIELD_MIN
=
time
.
Minute
K_FIELD_HOUR
=
time
.
Hour
K_FIELD_DAY
=
24
*
time
.
Hour
K_FIELD_WEEK
=
7
*
24
*
time
.
Hour
K_FIELD_MONTH
=
30
*
24
*
time
.
Hour
K_FIELD_YEAR
=
365
*
24
*
time
.
Hour
)
type
CoinProcessorFactory
interface
{
GetCoinProcessor
(
symbol
,
baseCoin
string
)
*
DefaultCoinProcessor
GetProcessorMap
()
map
[
string
]
*
DefaultCoinProcessor
}
type
KLineGeneratorJob
struct
{
ProcessorFactory
CoinProcessorFactory
}
func
NewKLineGeneratorJob
(
factory
CoinProcessorFactory
,
service
MarketService
)
*
KLineGeneratorJob
{
return
&
KLineGeneratorJob
{
ProcessorFactory
:
factory
,
}
}
func
(
job
*
KLineGeneratorJob
)
Handle5MinKLine
()
{
now
:=
time
.
Now
()
log
.
Printf
(
"Minute KLine: %v"
,
now
)
// Set seconds and milliseconds to 0
truncatedTime
:=
now
.
Truncate
(
time
.
Minute
)
minute
:=
truncatedTime
.
Minute
()
hour
:=
truncatedTime
.
Hour
()
for
symbol
,
processor
:=
range
job
.
ProcessorFactory
.
GetProcessorMap
()
{
if
!
processor
.
IsStopKline
()
{
log
.
Printf
(
"Generating 1-minute KLine for %s"
,
symbol
)
processor
.
AutoGenerate
()
processor
.
Update24HVolume
(
truncatedTime
.
UnixMilli
())
if
minute
%
5
==
0
{
processor
.
GenerateKLine
(
5
,
K_FIELD_MIN
,
truncatedTime
.
UnixMilli
())
}
if
minute
%
10
==
0
{
processor
.
GenerateKLine
(
10
,
K_FIELD_MIN
,
truncatedTime
.
UnixMilli
())
}
if
minute
%
15
==
0
{
processor
.
GenerateKLine
(
15
,
K_FIELD_MIN
,
truncatedTime
.
UnixMilli
())
}
if
minute
%
30
==
0
{
processor
.
GenerateKLine
(
30
,
K_FIELD_MIN
,
truncatedTime
.
UnixMilli
())
}
if
hour
==
0
&&
minute
==
0
{
processor
.
ResetThumb
()
}
}
}
}
func
(
job
*
KLineGeneratorJob
)
HandleHourKLine
()
{
now
:=
time
.
Now
()
log
.
Printf
(
"Hour KLine: %v"
,
now
)
// Set minutes, seconds, and milliseconds to 0
truncatedTime
:=
now
.
Truncate
(
time
.
Hour
)
for
_
,
processor
:=
range
job
.
ProcessorFactory
.
GetProcessorMap
()
{
if
!
processor
.
IsStopKline
()
{
processor
.
GenerateKLine
(
1
,
K_FIELD_HOUR
,
truncatedTime
.
UnixMilli
())
}
}
}
func
(
job
*
KLineGeneratorJob
)
HandleDayKLine
()
{
now
:=
time
.
Now
()
log
.
Printf
(
"Day KLine: %v"
,
now
)
// Set hours, minutes, seconds, and milliseconds to 0
truncatedTime
:=
now
.
Truncate
(
24
*
time
.
Hour
)
week
:=
int
(
truncatedTime
.
Weekday
())
dayOfMonth
:=
truncatedTime
.
Day
()
for
_
,
processor
:=
range
job
.
ProcessorFactory
.
GetProcessorMap
()
{
if
!
processor
.
IsStopKline
()
{
if
week
==
0
{
// Sunday
processor
.
GenerateKLine
(
1
,
K_FIELD_WEEK
,
truncatedTime
.
UnixMilli
())
}
if
dayOfMonth
==
1
{
processor
.
GenerateKLine
(
1
,
K_FIELD_MONTH
,
truncatedTime
.
UnixMilli
())
}
processor
.
GenerateKLine
(
1
,
K_FIELD_YEAR
,
truncatedTime
.
UnixMilli
())
}
}
}
func
(
job
*
KLineGeneratorJob
)
Start
()
{
c
:=
cron
.
New
()
// Schedule tasks
c
.
AddFunc
(
"0 * * * *"
,
job
.
Handle5MinKLine
)
// Every minute
c
.
AddFunc
(
"0 0 * * *"
,
job
.
HandleHourKLine
)
// Every hour
c
.
AddFunc
(
"0 0 0 * *"
,
job
.
HandleDayKLine
)
// Every day at midnight
// Start the cron scheduler
c
.
Start
()
}
exchain/exmonitor/markethandle.go
0 → 100644
View file @
c1cf3bd2
package
exmonitor
import
(
"context"
"fmt"
"go.mongodb.org/mongo-driver/mongo"
)
// MongoMarketHandler handles market data operations
type
mongoMarketHandler
struct
{
client
*
mongo
.
Client
db
*
mongo
.
Database
}
// HandleTrade inserts an ExchangeTrade into the corresponding collection
func
(
h
*
mongoMarketHandler
)
HandleTrade
(
symbol
string
,
trade
*
ExchangeTrade
)
error
{
collection
:=
h
.
db
.
Collection
(
"exchange_trade_"
+
symbol
)
_
,
err
:=
collection
.
InsertOne
(
context
.
Background
(),
trade
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to insert trade: %v"
,
err
)
}
return
nil
}
// HandleKLine inserts a KLine into the corresponding collection
func
(
h
*
mongoMarketHandler
)
HandleKLine
(
symbol
string
,
kline
*
KLine
)
error
{
collection
:=
h
.
db
.
Collection
(
"exchange_kline_"
+
symbol
+
"_"
+
kline
.
Period
)
_
,
err
:=
collection
.
InsertOne
(
context
.
Background
(),
kline
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to insert KLine: %v"
,
err
)
}
return
nil
}
exchain/exmonitor/marketservice.go
0 → 100644
View file @
c1cf3bd2
package
exmonitor
import
(
"context"
"github.com/exchain/go-exchain/op-supervisor/config"
"log"
"math/big"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
const
(
mongodbName
=
"exmarket"
klinePrefix
=
"exchange_kline_"
tradePrefix
=
"exchange_trade_"
)
type
marketService
struct
{
mongoClient
*
mongo
.
Client
}
func
NewMarketService
(
conf
*
config
.
Config
)
MarketService
{
// Initialize MongoDB client
mongoURI
:=
""
// conf.MongoDBURI
clientOptions
:=
options
.
Client
()
.
ApplyURI
(
mongoURI
)
client
,
err
:=
mongo
.
Connect
(
context
.
TODO
(),
clientOptions
)
if
err
!=
nil
{
log
.
Fatalf
(
"Failed to connect to MongoDB: %v"
,
err
)
}
return
&
marketService
{
mongoClient
:
client
}
}
func
(
s
*
marketService
)
FindAllKLine
(
symbol
,
period
string
)
[]
*
KLine
{
collection
:=
s
.
mongoClient
.
Database
(
mongodbName
)
.
Collection
(
klinePrefix
+
symbol
+
"_"
+
period
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
opts
:=
options
.
Find
()
.
SetSort
(
bson
.
D
{{
Key
:
"time"
,
Value
:
-
1
}})
.
SetLimit
(
1000
)
cursor
,
err
:=
collection
.
Find
(
ctx
,
bson
.
M
{},
opts
)
if
err
!=
nil
{
log
.
Fatalf
(
"Error finding KLine: %v"
,
err
)
}
defer
cursor
.
Close
(
ctx
)
var
kLines
[]
*
KLine
if
err
:=
cursor
.
All
(
ctx
,
&
kLines
);
err
!=
nil
{
log
.
Fatalf
(
"Error decoding KLine: %v"
,
err
)
}
return
kLines
}
func
(
s
*
marketService
)
FindAllKLineByTimeRange
(
symbol
string
,
fromTime
,
toTime
int64
,
period
string
)
[]
*
KLine
{
collection
:=
s
.
mongoClient
.
Database
(
mongodbName
)
.
Collection
(
klinePrefix
+
symbol
+
"_"
+
period
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
filter
:=
bson
.
M
{
"time"
:
bson
.
M
{
"$gte"
:
fromTime
,
"$lte"
:
toTime
,
},
}
opts
:=
options
.
Find
()
.
SetSort
(
bson
.
D
{{
Key
:
"time"
,
Value
:
1
}})
cursor
,
err
:=
collection
.
Find
(
ctx
,
filter
,
opts
)
if
err
!=
nil
{
log
.
Fatalf
(
"Error finding KLine by time range: %v"
,
err
)
}
defer
cursor
.
Close
(
ctx
)
var
kLines
[]
*
KLine
if
err
:=
cursor
.
All
(
ctx
,
&
kLines
);
err
!=
nil
{
log
.
Fatalf
(
"Error decoding KLine: %v"
,
err
)
}
return
kLines
}
func
(
s
*
marketService
)
FindFirstTrade
(
symbol
string
,
fromTime
,
toTime
int64
)
*
ExchangeTrade
{
return
s
.
findTrade
(
symbol
,
fromTime
,
toTime
,
bson
.
D
{{
Key
:
"time"
,
Value
:
1
}})
}
func
(
s
*
marketService
)
FindLastTrade
(
symbol
string
,
fromTime
,
toTime
int64
)
*
ExchangeTrade
{
return
s
.
findTrade
(
symbol
,
fromTime
,
toTime
,
bson
.
D
{{
Key
:
"time"
,
Value
:
-
1
}})
}
func
(
s
*
marketService
)
findTrade
(
symbol
string
,
fromTime
,
toTime
int64
,
sortOrder
bson
.
D
)
*
ExchangeTrade
{
collection
:=
s
.
mongoClient
.
Database
(
mongodbName
)
.
Collection
(
tradePrefix
+
symbol
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
filter
:=
bson
.
M
{
"time"
:
bson
.
M
{
"$gte"
:
fromTime
,
"$lte"
:
toTime
,
},
}
opts
:=
options
.
FindOne
()
.
SetSort
(
sortOrder
)
var
trade
ExchangeTrade
err
:=
collection
.
FindOne
(
ctx
,
filter
,
opts
)
.
Decode
(
&
trade
)
if
err
!=
nil
{
if
err
==
mongo
.
ErrNoDocuments
{
return
nil
}
log
.
Fatalf
(
"Error finding trade: %v"
,
err
)
}
return
&
trade
}
func
(
s
*
marketService
)
FindTradeByTimeRange
(
symbol
string
,
timeStart
,
timeEnd
int64
)
[]
*
ExchangeTrade
{
collection
:=
s
.
mongoClient
.
Database
(
mongodbName
)
.
Collection
(
tradePrefix
+
symbol
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
filter
:=
bson
.
M
{
"time"
:
bson
.
M
{
"$gte"
:
timeStart
,
"$lt"
:
timeEnd
,
},
}
opts
:=
options
.
Find
()
.
SetSort
(
bson
.
D
{{
Key
:
"time"
,
Value
:
1
}})
cursor
,
err
:=
collection
.
Find
(
ctx
,
filter
,
opts
)
if
err
!=
nil
{
log
.
Fatalf
(
"Error finding trades by time range: %v"
,
err
)
}
defer
cursor
.
Close
(
ctx
)
var
trades
[]
*
ExchangeTrade
if
err
:=
cursor
.
All
(
ctx
,
&
trades
);
err
!=
nil
{
log
.
Fatalf
(
"Error decoding trades: %v"
,
err
)
}
return
trades
}
func
(
s
*
marketService
)
SaveKLine
(
symbol
string
,
kLine
*
KLine
)
{
collection
:=
s
.
mongoClient
.
Database
(
mongodbName
)
.
Collection
(
klinePrefix
+
symbol
+
"_"
+
kLine
.
Period
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
_
,
err
:=
collection
.
InsertOne
(
ctx
,
kLine
)
if
err
!=
nil
{
log
.
Fatalf
(
"Error saving KLine: %v"
,
err
)
}
}
func
(
s
*
marketService
)
FindTradeVolume
(
symbol
string
,
timeStart
,
timeEnd
int64
)
*
big
.
Float
{
collection
:=
s
.
mongoClient
.
Database
(
mongodbName
)
.
Collection
(
klinePrefix
+
symbol
+
"_1min"
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
filter
:=
bson
.
M
{
"time"
:
bson
.
M
{
"$gt"
:
timeStart
,
"$lte"
:
timeEnd
,
},
}
opts
:=
options
.
Find
()
.
SetSort
(
bson
.
D
{{
Key
:
"time"
,
Value
:
1
}})
cursor
,
err
:=
collection
.
Find
(
ctx
,
filter
,
opts
)
if
err
!=
nil
{
log
.
Fatalf
(
"Error finding trade volume: %v"
,
err
)
}
defer
cursor
.
Close
(
ctx
)
totalVolume
:=
big
.
NewFloat
(
0
)
for
cursor
.
Next
(
ctx
)
{
var
kLine
KLine
if
err
:=
cursor
.
Decode
(
&
kLine
);
err
!=
nil
{
log
.
Fatalf
(
"Error decoding KLine: %v"
,
err
)
}
totalVolume
=
totalVolume
.
Add
(
totalVolume
,
kLine
.
Volume
)
}
return
totalVolume
}
func
(
s
*
marketService
)
NewHandler
(
symbol
string
)
MarketHandler
{
return
&
mongoMarketHandler
{
client
:
s
.
mongoClient
,
db
:
s
.
mongoClient
.
Database
(
mongodbName
),
}
}
exchain/exmonitor/monitor.go
0 → 100644
View file @
c1cf3bd2
package
exmonitor
import
"github.com/exchain/go-exchain/op-supervisor/config"
type
Monitor
struct
{
kline
*
KLineGeneratorJob
service
MarketService
}
func
NewMonitor
(
conf
*
config
.
Config
)
*
Monitor
{
service
:=
NewMarketService
(
conf
)
kline
:=
NewKLineGeneratorJob
(
&
coinProcessorFactory
{},
service
)
return
&
Monitor
{
kline
:
kline
,
service
:
service
,
}
}
func
(
m
*
Monitor
)
Start
()
{
// Initialize the monitor
// Start the monitoring process
m
.
kline
.
Start
()
}
exchain/exmonitor/processor.go
0 → 100644
View file @
c1cf3bd2
package
exmonitor
import
(
"fmt"
"math/big"
"sync"
"time"
)
type
KLine
struct
{
Time
int64
Period
string
Count
int
OpenPrice
*
big
.
Float
ClosePrice
*
big
.
Float
HighestPrice
*
big
.
Float
LowestPrice
*
big
.
Float
Volume
*
big
.
Float
Turnover
*
big
.
Float
}
type
CoinThumb
struct
{
Symbol
string
Open
*
big
.
Float
Close
*
big
.
Float
High
*
big
.
Float
Low
*
big
.
Float
Volume
*
big
.
Float
Turnover
*
big
.
Float
Change
*
big
.
Float
Chg
*
big
.
Float
BaseUsdRate
*
big
.
Float
UsdRate
*
big
.
Float
LastDayClose
*
big
.
Float
}
type
ExchangeTrade
struct
{
Price
*
big
.
Float
Amount
*
big
.
Float
}
type
MarketHandler
interface
{
HandleTrade
(
symbol
string
,
trade
*
ExchangeTrade
)
error
HandleKLine
(
symbol
string
,
kline
*
KLine
)
}
type
MarketService
interface
{
//FindAllKLine(symbol string, start, end int64, period string) []*KLine
FindTradeVolume
(
symbol
string
,
start
,
end
int64
)
*
big
.
Float
FindTradeByTimeRange
(
symbol
string
,
start
,
end
int64
)
[]
*
ExchangeTrade
FindAllKLineByTimeRange
(
symbol
string
,
fromTime
,
toTime
int64
,
period
string
)
[]
*
KLine
SaveKLine
(
symbol
string
,
kline
*
KLine
)
}
type
DefaultCoinProcessor
struct
{
symbol
string
baseCoin
string
currentKLine
*
KLine
handlers
[]
MarketHandler
coinThumb
*
CoinThumb
service
MarketService
coinExchangeRate
map
[
string
]
*
big
.
Float
isHalt
bool
stopKLine
bool
mutex
sync
.
Mutex
}
type
coinProcessorFactory
struct
{
mux
sync
.
Mutex
coinProcessors
map
[
string
]
*
DefaultCoinProcessor
}
func
(
cpf
*
coinProcessorFactory
)
GetCoinProcessor
(
symbol
,
baseCoin
string
)
*
DefaultCoinProcessor
{
cpf
.
mux
.
Lock
()
defer
cpf
.
mux
.
Unlock
()
if
processor
,
exists
:=
cpf
.
coinProcessors
[
symbol
];
exists
{
return
processor
}
else
{
processor
=
&
DefaultCoinProcessor
{
symbol
:
symbol
,
baseCoin
:
baseCoin
,
currentKLine
:
createNewKLine
(),
handlers
:
[]
MarketHandler
{},
coinThumb
:
&
CoinThumb
{},
isHalt
:
true
,
stopKLine
:
false
,
}
cpf
.
coinProcessors
[
symbol
]
=
processor
return
processor
}
}
func
(
cpf
*
coinProcessorFactory
)
GetProcessorMap
()
map
[
string
]
*
DefaultCoinProcessor
{
cpf
.
mux
.
Lock
()
defer
cpf
.
mux
.
Unlock
()
// copy the map to avoid concurrent map writes
// and return a new map
processorMap
:=
make
(
map
[
string
]
*
DefaultCoinProcessor
)
for
k
,
v
:=
range
cpf
.
coinProcessors
{
processorMap
[
k
]
=
v
}
return
processorMap
}
func
createNewKLine
()
*
KLine
{
now
:=
time
.
Now
()
nextMinute
:=
now
.
Add
(
time
.
Minute
)
return
&
KLine
{
Time
:
nextMinute
.
UnixMilli
(),
Period
:
"1min"
,
Count
:
0
,
OpenPrice
:
big
.
NewFloat
(
0
),
ClosePrice
:
big
.
NewFloat
(
0
),
HighestPrice
:
big
.
NewFloat
(
0
),
LowestPrice
:
big
.
NewFloat
(
0
),
Volume
:
big
.
NewFloat
(
0
),
Turnover
:
big
.
NewFloat
(
0
),
}
}
func
(
p
*
DefaultCoinProcessor
)
InitializeThumb
()
{
p
.
mutex
.
Lock
()
defer
p
.
mutex
.
Unlock
()
now
:=
time
.
Now
()
startOfDay
:=
time
.
Date
(
now
.
Year
(),
now
.
Month
(),
now
.
Day
(),
0
,
0
,
0
,
0
,
now
.
Location
())
lines
:=
p
.
service
.
FindAllKLineByTimeRange
(
p
.
symbol
,
startOfDay
.
UnixMilli
(),
now
.
UnixMilli
(),
"1min"
)
p
.
coinThumb
=
&
CoinThumb
{
Symbol
:
p
.
symbol
,
Open
:
big
.
NewFloat
(
0
),
High
:
big
.
NewFloat
(
0
),
Low
:
big
.
NewFloat
(
0
),
Close
:
big
.
NewFloat
(
0
),
Volume
:
big
.
NewFloat
(
0
),
Turnover
:
big
.
NewFloat
(
0
),
}
for
_
,
line
:=
range
lines
{
if
line
.
OpenPrice
.
Cmp
(
big
.
NewFloat
(
0
))
==
0
{
continue
}
if
p
.
coinThumb
.
Open
.
Cmp
(
big
.
NewFloat
(
0
))
==
0
{
p
.
coinThumb
.
Open
=
line
.
OpenPrice
}
if
p
.
coinThumb
.
High
.
Cmp
(
line
.
HighestPrice
)
<
0
{
p
.
coinThumb
.
High
=
line
.
HighestPrice
}
if
line
.
LowestPrice
.
Cmp
(
big
.
NewFloat
(
0
))
>
0
&&
p
.
coinThumb
.
Low
.
Cmp
(
line
.
LowestPrice
)
>
0
{
p
.
coinThumb
.
Low
=
line
.
LowestPrice
}
if
line
.
ClosePrice
.
Cmp
(
big
.
NewFloat
(
0
))
>
0
{
p
.
coinThumb
.
Close
=
line
.
ClosePrice
}
p
.
coinThumb
.
Volume
.
Add
(
p
.
coinThumb
.
Volume
,
line
.
Volume
)
p
.
coinThumb
.
Turnover
.
Add
(
p
.
coinThumb
.
Turnover
,
line
.
Turnover
)
}
change
:=
new
(
big
.
Float
)
.
Sub
(
p
.
coinThumb
.
Close
,
p
.
coinThumb
.
Open
)
p
.
coinThumb
.
Change
=
change
if
p
.
coinThumb
.
Low
.
Cmp
(
big
.
NewFloat
(
0
))
>
0
{
p
.
coinThumb
.
Chg
=
new
(
big
.
Float
)
.
Quo
(
change
,
p
.
coinThumb
.
Low
)
}
}
func
(
p
*
DefaultCoinProcessor
)
IsStopKline
()
bool
{
return
p
.
stopKLine
}
func
(
p
*
DefaultCoinProcessor
)
ResetThumb
()
{
p
.
mutex
.
Lock
()
defer
p
.
mutex
.
Unlock
()
p
.
coinThumb
.
Open
=
big
.
NewFloat
(
0
)
p
.
coinThumb
.
High
=
big
.
NewFloat
(
0
)
p
.
coinThumb
.
Low
=
big
.
NewFloat
(
0
)
p
.
coinThumb
.
Close
=
big
.
NewFloat
(
0
)
p
.
coinThumb
.
Change
=
big
.
NewFloat
(
0
)
p
.
coinThumb
.
Chg
=
big
.
NewFloat
(
0
)
p
.
coinThumb
.
LastDayClose
=
p
.
coinThumb
.
Close
}
func
(
p
*
DefaultCoinProcessor
)
AutoGenerate
()
{
p
.
mutex
.
Lock
()
defer
p
.
mutex
.
Unlock
()
if
p
.
coinThumb
!=
nil
{
if
p
.
currentKLine
.
OpenPrice
.
Cmp
(
big
.
NewFloat
(
0
))
==
0
{
p
.
currentKLine
.
OpenPrice
=
p
.
coinThumb
.
Close
p
.
currentKLine
.
LowestPrice
=
p
.
coinThumb
.
Close
p
.
currentKLine
.
HighestPrice
=
p
.
coinThumb
.
Close
p
.
currentKLine
.
ClosePrice
=
p
.
coinThumb
.
Close
}
p
.
currentKLine
.
Time
=
time
.
Now
()
.
UnixMilli
()
p
.
handleKLineStorage
(
p
.
currentKLine
)
p
.
currentKLine
=
createNewKLine
()
}
}
func
(
p
*
DefaultCoinProcessor
)
handleKLineStorage
(
kline
*
KLine
)
{
for
_
,
handler
:=
range
p
.
handlers
{
handler
.
HandleKLine
(
p
.
symbol
,
kline
)
}
}
func
(
p
*
DefaultCoinProcessor
)
AddHandler
(
handler
MarketHandler
)
{
p
.
handlers
=
append
(
p
.
handlers
,
handler
)
}
func
(
p
*
DefaultCoinProcessor
)
GenerateKLine
(
rangeValue
int
,
field
time
.
Duration
,
timestamp
int64
)
{
p
.
stopKLine
=
false
defer
func
()
{
p
.
stopKLine
=
true
}()
endTime
:=
time
.
UnixMilli
(
timestamp
)
startTime
:=
endTime
.
Add
(
-
field
*
time
.
Duration
(
rangeValue
))
trades
:=
p
.
service
.
FindTradeByTimeRange
(
p
.
symbol
,
startTime
.
UnixMilli
(),
endTime
.
UnixMilli
())
kline
:=
&
KLine
{
Time
:
endTime
.
UnixMilli
(),
Period
:
formatPeriod
(
rangeValue
,
field
),
OpenPrice
:
big
.
NewFloat
(
0
),
ClosePrice
:
big
.
NewFloat
(
0
),
HighestPrice
:
big
.
NewFloat
(
0
),
LowestPrice
:
big
.
NewFloat
(
0
),
Volume
:
big
.
NewFloat
(
0
),
Turnover
:
big
.
NewFloat
(
0
),
}
for
_
,
trade
:=
range
trades
{
p
.
processTrade
(
kline
,
trade
)
}
if
kline
.
OpenPrice
.
Cmp
(
big
.
NewFloat
(
0
))
==
0
{
kline
.
OpenPrice
=
p
.
coinThumb
.
Close
kline
.
ClosePrice
=
p
.
coinThumb
.
Close
kline
.
LowestPrice
=
p
.
coinThumb
.
Close
kline
.
HighestPrice
=
p
.
coinThumb
.
Close
}
p
.
service
.
SaveKLine
(
p
.
symbol
,
kline
)
}
func
(
p
*
DefaultCoinProcessor
)
processTrade
(
kline
*
KLine
,
trade
*
ExchangeTrade
)
{
if
kline
.
OpenPrice
.
Cmp
(
big
.
NewFloat
(
0
))
==
0
{
kline
.
OpenPrice
=
trade
.
Price
kline
.
HighestPrice
=
trade
.
Price
kline
.
LowestPrice
=
trade
.
Price
kline
.
ClosePrice
=
trade
.
Price
}
else
{
if
trade
.
Price
.
Cmp
(
kline
.
HighestPrice
)
>
0
{
kline
.
HighestPrice
=
trade
.
Price
}
if
trade
.
Price
.
Cmp
(
kline
.
LowestPrice
)
<
0
{
kline
.
LowestPrice
=
trade
.
Price
}
kline
.
ClosePrice
=
trade
.
Price
}
kline
.
Count
++
kline
.
Volume
.
Add
(
kline
.
Volume
,
trade
.
Amount
)
turnover
:=
new
(
big
.
Float
)
.
Mul
(
trade
.
Price
,
trade
.
Amount
)
kline
.
Turnover
.
Add
(
kline
.
Turnover
,
turnover
)
}
func
formatPeriod
(
rangeValue
int
,
field
time
.
Duration
)
string
{
switch
field
{
case
K_FIELD_MIN
:
return
fmt
.
Sprintf
(
"%dmin"
,
rangeValue
)
case
K_FIELD_HOUR
:
return
fmt
.
Sprintf
(
"%dhour"
,
rangeValue
)
case
K_FIELD_DAY
:
return
fmt
.
Sprintf
(
"%dday"
,
rangeValue
)
case
K_FIELD_WEEK
:
return
fmt
.
Sprintf
(
"%dweek"
,
rangeValue
)
case
K_FIELD_MONTH
:
return
fmt
.
Sprintf
(
"%dmonth"
,
rangeValue
)
case
K_FIELD_YEAR
:
return
fmt
.
Sprintf
(
"%dyear"
,
rangeValue
)
default
:
return
"unknown"
}
}
func
(
p
*
DefaultCoinProcessor
)
Update24HVolume
(
currentTime
int64
)
{
if
p
.
coinThumb
!=
nil
{
p
.
mutex
.
Lock
()
defer
p
.
mutex
.
Unlock
()
// Calculate the start time (24 hours ago)
startTime
:=
time
.
UnixMilli
(
currentTime
)
.
Add
(
-
24
*
time
.
Hour
)
.
UnixMilli
()
// Fetch the trade volume from the service
volume
:=
p
.
service
.
FindTradeVolume
(
p
.
symbol
,
startTime
,
currentTime
)
// Set the volume in the coinThumb, rounded to 4 decimal places
p
.
coinThumb
.
Volume
=
new
(
big
.
Float
)
.
SetPrec
(
4
)
.
SetMode
(
big
.
ToZero
)
.
Set
(
volume
)
}
}
exchain/protocol/proto/orderbook/v1/orderbook.proto
View file @
c1cf3bd2
This diff is collapsed.
Click to expand it.
go.mod
View file @
c1cf3bd2
...
...
@@ -59,6 +59,8 @@ require (
github.com/emirpasic/gods v1.18.1 // indirect
github.com/fatih/color v1.18.0 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
go.mongodb.org/mongo-driver v1.17.3 // indirect
)
require (
...
...
go.sum
View file @
c1cf3bd2
...
...
@@ -697,6 +697,8 @@ github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtD
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
...
...
@@ -803,6 +805,8 @@ github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.mongodb.org/mongo-driver v1.17.3 h1:TQyXhnsWfWtgAhMtOgtYHMTkZIfBTpMTsMnd9ZBeHxQ=
go.mongodb.org/mongo-driver v1.17.3/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment