Commit 95490f6f authored by duanjinfei's avatar duanjinfei

feat: improve local development experience, enhance block synchronization, and...

feat: improve local development experience, enhance block synchronization, and ensure schema creation for DML events.
parent 6d77f28e
...@@ -109,5 +109,30 @@ ...@@ -109,5 +109,30 @@
], ],
"name": "DataDeleted", "name": "DataDeleted",
"type": "event" "type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "string",
"name": "tableName",
"type": "string"
},
{
"indexed": false,
"internalType": "string[]",
"name": "columns",
"type": "string[]"
},
{
"indexed": false,
"internalType": "string[][]",
"name": "values",
"type": "string[][]"
}
],
"name": "DataBatchInserted",
"type": "event"
} }
] ]
\ No newline at end of file
...@@ -16,7 +16,7 @@ module.exports = { ...@@ -16,7 +16,7 @@ module.exports = {
chainId: 31337, chainId: 31337,
mining: { mining: {
auto: true, auto: true,
interval: 0 interval: 3000
}, },
accounts: { accounts: {
count: 10, count: 10,
......
...@@ -26,11 +26,11 @@ type DatabaseConfig struct { ...@@ -26,11 +26,11 @@ type DatabaseConfig struct {
} }
type SyncConfig struct { type SyncConfig struct {
StartBlock uint64 `mapstructure:"start_block"` StartBlock uint64 `mapstructure:"start_block"` // 从哪个区块开始同步
Confirmations uint64 `mapstructure:"confirmations"` Confirmations uint64 `mapstructure:"confirmations"` // 确认区块数
PollInterval time.Duration `mapstructure:"poll_interval"` PollInterval time.Duration `mapstructure:"poll_interval"` // 轮询间隔
MaxBatchSize int `mapstructure:"max_batch_size"` MaxBatchSize int `mapstructure:"max_batch_size"` // 批量大小
AddressBatchSize int `mapstructure:"address_batch_size"` AddressBatchSize int `mapstructure:"address_batch_size"` // 地址批量大小
} }
type LogConfig struct { type LogConfig struct {
......
...@@ -259,6 +259,10 @@ func (l *DataListener) processLog(ctx context.Context, vLog types.Log) error { ...@@ -259,6 +259,10 @@ func (l *DataListener) processLog(ctx context.Context, vLog types.Log) error {
return fmt.Errorf("unpack DataBatchInserted error: %w", err) return fmt.Errorf("unpack DataBatchInserted error: %w", err)
} }
// 注意:tableName 虽然是 indexed,但 indexed string 在 Topics 中只存储 hash
// 无法恢复原始值,需要从合约调用或其他方式获取
// 临时方案:从 Data 中的其他字段推断,或者修改合约不要 index tableName
log.Printf("[DML] DataBatchInserted: %s.%s (%d rows)", contractAddr, ev.TableName, len(ev.Values)) log.Printf("[DML] DataBatchInserted: %s.%s (%d rows)", contractAddr, ev.TableName, len(ev.Values))
return l.executor.HandleBatchInserted(ctx, contractAddr, ev.TableName, ev.Columns, ev.Values) return l.executor.HandleBatchInserted(ctx, contractAddr, ev.TableName, ev.Columns, ev.Values)
......
...@@ -81,12 +81,20 @@ func (l *FactoryListener) syncOnce(ctx context.Context) error { ...@@ -81,12 +81,20 @@ func (l *FactoryListener) syncOnce(ctx context.Context) error {
return fmt.Errorf("get head block failed: %w", err) return fmt.Errorf("get head block failed: %w", err)
} }
// 4. 计算 ToBlock (考虑确认数和BatchSize) // 4. ⭐ 防止 uint64 下溢
if headBlock < l.cfg.Sync.Confirmations {
log.Printf("Chain too young (head: %d, need %d confirmations), waiting...",
headBlock, l.cfg.Sync.Confirmations)
return nil
}
// 5. 计算 ToBlock (考虑确认数和BatchSize)
safeHead := headBlock - l.cfg.Sync.Confirmations safeHead := headBlock - l.cfg.Sync.Confirmations
if fromBlock > safeHead { if fromBlock > safeHead {
return nil // 还没到安全高度 return nil // 还没到安全高度
} }
// 6. 计算扫描范围
toBlock := fromBlock + uint64(l.cfg.Sync.MaxBatchSize) toBlock := fromBlock + uint64(l.cfg.Sync.MaxBatchSize)
if toBlock > safeHead { if toBlock > safeHead {
toBlock = safeHead toBlock = safeHead
...@@ -94,7 +102,7 @@ func (l *FactoryListener) syncOnce(ctx context.Context) error { ...@@ -94,7 +102,7 @@ func (l *FactoryListener) syncOnce(ctx context.Context) error {
log.Printf("Scanning Factory: %d -> %d", fromBlock, toBlock) log.Printf("Scanning Factory: %d -> %d", fromBlock, toBlock)
// 5. 拉取日志 // 7. 拉取日志
query := ethereum.FilterQuery{ query := ethereum.FilterQuery{
FromBlock: big.NewInt(int64(fromBlock)), FromBlock: big.NewInt(int64(fromBlock)),
ToBlock: big.NewInt(int64(toBlock)), ToBlock: big.NewInt(int64(toBlock)),
...@@ -109,7 +117,7 @@ func (l *FactoryListener) syncOnce(ctx context.Context) error { ...@@ -109,7 +117,7 @@ func (l *FactoryListener) syncOnce(ctx context.Context) error {
log.Printf("Found %d logs from Factory", len(logs)) log.Printf("Found %d logs from Factory", len(logs))
// 6. 处理日志 // 8. 处理日志
for _, vLog := range logs { for _, vLog := range logs {
log.Printf("Processing log with %d topics", len(vLog.Topics)) log.Printf("Processing log with %d topics", len(vLog.Topics))
if len(vLog.Topics) < 3 { if len(vLog.Topics) < 3 {
...@@ -131,7 +139,7 @@ func (l *FactoryListener) syncOnce(ctx context.Context) error { ...@@ -131,7 +139,7 @@ func (l *FactoryListener) syncOnce(ctx context.Context) error {
} }
} }
// 7. 更新游标 // 9. 更新游标
if err := l.db.UpdateLastBlock(ctx, SyncKeyFactory, toBlock); err != nil { if err := l.db.UpdateLastBlock(ctx, SyncKeyFactory, toBlock); err != nil {
return fmt.Errorf("update cursor failed: %w", err) return fmt.Errorf("update cursor failed: %w", err)
} }
......
...@@ -76,6 +76,14 @@ func (e *Executor) HandleTableCreated(ctx context.Context, contractAddr string, ...@@ -76,6 +76,14 @@ func (e *Executor) HandleTableCreated(ctx context.Context, contractAddr string,
colDefs = append(colDefs, pkDef) colDefs = append(colDefs, pkDef)
} }
// ⭐ 先创建 Schema(如果不存在)
createSchemaSql := fmt.Sprintf(`CREATE SCHEMA IF NOT EXISTS %s`, schemaName)
log.Printf("Ensuring schema exists: %s", createSchemaSql)
if err := e.db.Exec(ctx, createSchemaSql); err != nil {
return fmt.Errorf("create schema failed: %w", err)
}
// 再创建表
fullSql := fmt.Sprintf( fullSql := fmt.Sprintf(
`CREATE TABLE IF NOT EXISTS %s.%s (%s)`, `CREATE TABLE IF NOT EXISTS %s.%s (%s)`,
schemaName, schemaName,
...@@ -84,7 +92,11 @@ func (e *Executor) HandleTableCreated(ctx context.Context, contractAddr string, ...@@ -84,7 +92,11 @@ func (e *Executor) HandleTableCreated(ctx context.Context, contractAddr string,
) )
log.Printf("Executing DDL: %s", fullSql) log.Printf("Executing DDL: %s", fullSql)
return e.db.Exec(ctx, fullSql) if err := e.db.Exec(ctx, fullSql); err != nil {
return fmt.Errorf("create table failed: %w", err)
}
return nil
} }
// HandleDataInserted 处理插入事件 // HandleDataInserted 处理插入事件
......
...@@ -50,18 +50,18 @@ else ...@@ -50,18 +50,18 @@ else
fi fi
echo "" echo ""
# # 3. 启动 Hardhat 节点(后台) # 3. 启动 Hardhat 节点(后台)
# echo "⛓️ Starting Hardhat node..." echo "⛓️ Starting Hardhat node..."
# cd hardhat cd hardhat
# npx hardhat node > ../hardhat-node.log 2>&1 & npx hardhat node > ../hardhat-node.log 2>&1 &
# HARDHAT_PID=$! HARDHAT_PID=$!
# echo $HARDHAT_PID > ../hardhat.pid echo $HARDHAT_PID > ../hardhat.pid
# cd .. cd ..
# echo "✅ Hardhat node started (PID: $HARDHAT_PID)" echo "✅ Hardhat node started (PID: $HARDHAT_PID)"
# echo " Logs: hardhat-node.log" echo " Logs: hardhat-node.log"
# echo " Waiting for node to be ready..." echo " Waiting for node to be ready..."
# sleep 3 sleep 3
# echo "" echo ""
# 4. 部署合约 # 4. 部署合约
echo "📝 Deploying contracts..." echo "📝 Deploying contracts..."
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment