Commit 6d77f28e authored by duanjinfei

feat: Add SQLSyncFactory.sol contract and install project dependencies.

parent 3e60eb32
@@ -51,4 +51,5 @@ yarn-error.log
# Cursor specific
.cursor/
go.sum
+*.pid
\ No newline at end of file
@@ -14,7 +14,7 @@
COPY . .
# Build (CGO_ENABLED=0 for static linking)
-RUN CGO_ENABLED=0 GOOS=linux go build -o chainsql ./cmd/chainsql/main.go
+RUN CGO_ENABLED=0 GOOS=linux go build -o chainsql ./cmd/main.go
# --- Final Stage ---
FROM alpine:3.19
...
# ChainSQL Usage Examples
## Quick Start
### 1. Start the Service
```bash
# With Docker Compose (recommended)
docker-compose up -d
# Or run locally
make run
```
### 2. Deploy the Smart Contracts
```javascript
// Deploy SQLSyncFactory
const factory = await SQLSyncFactory.deploy();
await factory.waitForDeployment();
// Create an instance (ethers v6: parse the InstanceCreated event from the logs)
const tx = await factory.createInstance();
const receipt = await tx.wait();
const instanceAddr = factory.interface.parseLog(receipt.logs[0]).args.instance;
```
### 3. Operate the Database Through the Contract
```javascript
const sqlSync = await ethers.getContractAt("SQLSync", instanceAddr);
// Create a table
await sqlSync.createTable("users", [
  { name: "id", sqlType: "INTEGER", isPrimaryKey: true },
  { name: "name", sqlType: "VARCHAR(100)", isPrimaryKey: false },
  { name: "email", sqlType: "VARCHAR(255)", isPrimaryKey: false },
  { name: "status", sqlType: "VARCHAR(20)", isPrimaryKey: false }
]);
// Insert data
await sqlSync.insert("users",
  ["id", "name", "email", "status"],
  ["1", "Alice", "alice@example.com", "active"]
);
// Update data
await sqlSync.update("users",
  ["status"],
  ["inactive"],
  "id = 1"
);
// Delete data
await sqlSync.deleteData("users", "status = 'inactive'");
// Batch insert (values passed as a flat, row-major array)
await sqlSync.batchInsert("users",
  ["id", "name", "email", "status"],
  [
    "2", "Bob", "bob@example.com", "active",
    "3", "Charlie", "charlie@example.com", "active"
  ]
);
```
### 4. Query the Database
ChainSQL automatically syncs the events into PostgreSQL, so you can query it directly:
```sql
-- Connect to the database
psql -U chainsql -d chainsql
-- Query data (each contract gets its own schema)
SELECT * FROM "0x1234...abcd".users WHERE status = 'active';
-- List all active contracts
SELECT * FROM _chainsql_instances WHERE status = 'active';
-- Inspect the sync status
SELECT * FROM _chainsql_cursor;
```
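If you prefer querying from Go, here is a minimal sketch using [pgx](https://github.com/jackc/pgx), the driver ChainSQL itself uses. The DSN matches the development config below, and `"0x1234...abcd"` is the same placeholder schema name as in the SQL example:
```go
package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "postgres://chainsql:password123@localhost:5432/chainsql")
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	// Each contract gets its own schema, named after its address.
	rows, err := conn.Query(ctx,
		`SELECT id, name FROM "0x1234...abcd".users WHERE status = $1`, "active")
	if err != nil {
		panic(err)
	}
	defer rows.Close()

	for rows.Next() {
		var id int
		var name string
		if err := rows.Scan(&id, &name); err != nil {
			panic(err)
		}
		fmt.Println(id, name)
	}
}
```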
### 5. Monitor the Service
```bash
# Health check
curl http://localhost:8080/health
# Readiness check
curl http://localhost:8080/ready
# Prometheus metrics
curl http://localhost:8080/metrics
```
## Configuration Examples
### Development
```yaml
# configs/config.yaml
chain:
  rpc_url: "http://localhost:8545"
  factory_address: "0x5FbDB2315678afecb367f032d93F642f64180aa3"
  chain_id: 31337
database:
  dsn: "postgres://chainsql:password123@localhost:5432/chainsql?sslmode=disable"
sync:
  start_block: 0
  confirmations: 1
  poll_interval: "1s"
  max_batch_size: 100
  address_batch_size: 50
log:
  level: "debug"
  development: true
http:
  port: 8080
```
### Production
```yaml
# configs/config.yaml
chain:
  rpc_url: "https://mainnet.infura.io/v3/YOUR_API_KEY"
  factory_address: "0x..."
  chain_id: 1
database:
  dsn: "postgres://user:password@db.example.com:5432/chainsql?sslmode=require"
sync:
  start_block: 18000000
  confirmations: 12
  poll_interval: "5s"
  max_batch_size: 1000
  address_batch_size: 100
log:
  level: "info"
  development: false
http:
  port: 8080
```
## WHERE Clause Examples
ChainSQL supports safe WHERE clause parsing:
```javascript
// Simple condition
await sqlSync.update("users", ["status"], ["active"], "id = 1");
// Multiple conditions
await sqlSync.deleteData("users", "status = 'inactive' AND created_at < '2024-01-01'");
// IN operator
await sqlSync.update("users", ["verified"], ["true"], "id IN (1, 2, 3)");
// LIKE operator
await sqlSync.deleteData("users", "email LIKE '%@spam.com'");
// IS NULL
await sqlSync.deleteData("users", "email IS NULL");
// Compound condition
await sqlSync.update("users",
  ["status"],
  ["premium"],
  "score > 100 AND status = 'active'"
);
```
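On the Go side, these clause strings are handled by the `WhereParser` in `internal/executor/where_parser.go`, which turns them into a parameterized SQL fragment plus an argument list. A minimal sketch of the expected output (written as a scratch program inside this module, since `internal/` packages are module-private):
```go
package main

import (
	"fmt"

	"chain-sql/internal/executor"
)

func main() {
	parser := executor.NewWhereParser([]string{"id", "status"})
	cond, err := parser.Parse("id = 1 AND status IN ('active', 'pending')")
	if err != nil {
		panic(err)
	}
	fmt.Println(cond.SQL)  // "id" = $1 AND "status" IN ($2, $3)
	fmt.Println(cond.Args) // [1 active pending] (values are extracted as strings)
}
```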
## Monitoring Integration
### Prometheus Configuration
```yaml
# prometheus.yml
scrape_configs:
  - job_name: 'chainsql'
    static_configs:
      - targets: ['localhost:8080']
```
### Grafana Dashboard
Key metrics:
- `chainsql_events_processed_total` - total events processed
- `chainsql_sync_block_height` - sync progress
- `chainsql_active_contracts` - number of active contracts
- `chainsql_database_operations_total` - number of database operations
## Troubleshooting
### Checking Logs
```bash
# Docker
docker-compose logs -f chainsql
# Local
tail -f logs/chainsql.log
```
### Common Issues
**Q: The service fails to start**
```bash
# Check the database connection
curl http://localhost:8080/ready
# Inspect the detailed logs
docker-compose logs chainsql
```
**Q: Events are not syncing**
```bash
# Check the sync status
psql -U chainsql -c "SELECT * FROM _chainsql_cursor;"
# Check contract registration
psql -U chainsql -c "SELECT * FROM _chainsql_instances;"
```
**Q: Performance problems**
```bash
# Inspect the Prometheus metrics
curl http://localhost:8080/metrics | grep duration
```
## Best Practices
1. **Security**
   - Always use parameterized queries
   - Validate the column names in WHERE clauses
   - Review database permissions regularly
2. **Performance**
   - Prefer batch inserts over single-row inserts
   - Tune `max_batch_size` sensibly
   - Monitor the database connection pool
3. **Reliability**
   - Pick an appropriate `confirmations` value
   - Back up the database regularly
   - Monitor sync lag
4. **Monitoring**
   - Configure Prometheus alerts
   - Watch the key metrics
   - Check the health endpoints regularly
@@ -9,7 +9,7 @@ all: build
# Build the Go binary
build:
	@echo "Building binary..."
-	@go build -o bin/$(BINARY_NAME) ./cmd/chainsql/main.go
+	@go build -o bin/$(BINARY_NAME) ./cmd/main.go
# Run the service (local development)
run: build
...
# ChainSQL
[![Go Version](https://img.shields.io/badge/Go-1.24+-00ADD8?style=flat&logo=go)](https://golang.org)
[![License](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE)
**ChainSQL** is a blockchain-to-SQL synchronization service: it listens for events emitted by Ethereum smart contracts and translates them into real database operations.
## ✨ Core Features
- 🔄 **Automatic sync**: listens for blockchain events in real time and syncs them to PostgreSQL
- 🏭 **Factory pattern**: dynamically creates and manages multiple contract instances
- 🔒 **Security first**: SQL injection protection, WHERE clause parsing, type whitelisting
- 📊 **Full CRUD**: supports CREATE TABLE, INSERT, UPDATE, and DELETE
- 🚀 **High performance**: optimized batch inserts, concurrent processing, connection pooling
- 📈 **Observability**: Prometheus metrics, structured logging, health checks
- 🐳 **Containerized**: Docker and Docker Compose support
## 🏗️ Architecture Overview
```
┌─────────────────┐
│ Smart Contract  │ (Solidity)
│  SQLSync.sol    │
└────────┬────────┘
         │ Events (TableCreated, DataInserted, etc.)
         ▼
┌─────────────────┐
│  ChainSQL App   │ (Go)
│                 │
│  ┌───────────┐  │
│  │  Factory  │  │  watches for new contract instances
│  │ Listener  │  │
│  └───────────┘  │
│                 │
│  ┌───────────┐  │
│  │   Data    │  │  watches data events
│  │ Listener  │  │
│  └───────────┘  │
│                 │
│  ┌───────────┐  │
│  │    SQL    │  │  executes database operations
│  │ Executor  │  │
│  └───────────┘  │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│   PostgreSQL    │
└─────────────────┘
```
## 🚀 Quick Start
### Prerequisites
- Go 1.24+
- PostgreSQL 15+
- Docker & Docker Compose (optional)
### Local Development
1. **Clone the project**
```bash
git clone <repository-url>
cd chain-sql
```
2. **Create the configuration**
```bash
cp configs/config.example.yaml configs/config.yaml
# Edit config.yaml and fill in your settings
```
3. **Install dependencies**
```bash
go mod download
```
4. **Run the service**
```bash
make run
# or
go run cmd/main.go
```
### Docker Deployment
```bash
# Build and start all services (PostgreSQL + ChainSQL)
make docker-up
# Tail the logs
docker-compose logs -f chainsql
# Stop the services
make docker-down
```
## ⚙️ Configuration
The configuration file lives at `configs/config.yaml`:
```yaml
chain:
  rpc_url: "http://localhost:8545"   # Ethereum node RPC endpoint
  factory_address: "0x..."           # factory contract address
  chain_id: 1337                     # chain ID
database:
  dsn: "postgres://user:password@localhost:5432/chainsql?sslmode=disable"
sync:
  start_block: 0          # starting block
  confirmations: 12       # confirmation depth
  poll_interval: "2s"     # polling interval
  max_batch_size: 100     # max blocks per query
  address_batch_size: 50  # max contracts per query
log:
  level: "info"           # debug, info, warn, error
  development: false      # development mode (colored output)
http:
  port: 8080              # HTTP server port
```
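Loading is handled by the `internal/config` package (Viper under the hood; see the config diff later in this commit). A minimal sketch of consuming it, using only fields visible in this commit:
```go
package main

import (
	"fmt"
	"log"

	"chain-sql/internal/config"
)

// Load the YAML above via the config package and read a few mapped fields.
func main() {
	cfg, err := config.Load("configs/config.yaml")
	if err != nil {
		log.Fatalf("Failed to load config: %v", err)
	}
	fmt.Println(cfg.Chain.RPCURL, cfg.Log.Level, cfg.HTTP.Port)
}
```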
## 📡 HTTP Endpoints
ChainSQL exposes the following HTTP endpoints:
- `GET /metrics` - Prometheus metrics
- `GET /health` - health check
- `GET /ready` - readiness check (includes a database connectivity test)
Example:
```bash
curl http://localhost:8080/health
curl http://localhost:8080/metrics
```
## 📊 Metrics
### Event Processing
- `chainsql_events_processed_total` - total events processed (by type and status)
- `chainsql_event_processing_duration_seconds` - event processing latency
### Sync Status
- `chainsql_sync_block_height` - current synced block height
- `chainsql_active_contracts` - number of active contracts
### Database Operations
- `chainsql_database_operations_total` - total database operations
- `chainsql_database_operation_duration_seconds` - database operation latency
### Blockchain RPC
- `chainsql_blockchain_rpc_calls_total` - total RPC calls
- `chainsql_blockchain_rpc_duration_seconds` - RPC call latency
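These are registered in the `internal/metrics` package (shown later in this commit). A minimal sketch of wiring them into an event handler; the `instrumented` helper is hypothetical, not part of the codebase:
```go
package core

import (
	"context"
	"time"

	"chain-sql/internal/metrics"
)

// instrumented is a hypothetical wrapper that times an event handler call
// and records its outcome with the counters and histograms listed above.
func instrumented(ctx context.Context, eventType string, handle func(context.Context) error) error {
	start := time.Now()
	err := handle(ctx)
	metrics.EventProcessingDuration.WithLabelValues(eventType).Observe(time.Since(start).Seconds())
	metrics.RecordEventProcessed(eventType, err == nil)
	return err
}
```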
## 🔐 Security
### SQL Injection Protection
1. **WHERE clause parser**: WHERE conditions are parsed safely into parameterized queries
2. **Type whitelist**: only predefined SQL types are allowed
3. **Identifier escaping**: table and column names are escaped automatically
4. **Column whitelist**: column names in WHERE clauses are validated
### Example
```go
// ❌ Unsafe (direct string concatenation)
sql := "DELETE FROM table WHERE " + whereClause

// ✅ Safe (via the parser)
parser := NewWhereParser(allowedColumns)
cond, _ := parser.Parse("id = 123 AND status = 'active'")
sql := fmt.Sprintf("DELETE FROM table WHERE %s", cond.SQL)
db.Exec(ctx, sql, cond.Args...)
```
## 🛠️ Development Guide
### Project Structure
```
chain-sql/
├── cmd/                  # application entry point
├── internal/
│   ├── api/              # HTTP server
│   ├── blockchain/       # blockchain client
│   ├── config/           # configuration management
│   ├── core/             # core business logic
│   ├── database/         # database access
│   ├── executor/         # SQL executor
│   ├── logger/           # structured logging
│   └── metrics/          # metrics
├── contract/             # Solidity contracts
├── configs/              # configuration files
└── docker-compose.yaml   # Docker orchestration
```
### Adding a New Event Handler
1. Add the event struct in `internal/core/data_listener.go`
2. Extend the `EventHandler` interface in `internal/core/interfaces.go`
3. Implement the handler in `internal/executor/sql_builder.go`
4. Add a case branch in the `processLog` method (a sketch of steps 1 and 4 follows)
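A rough sketch of steps 1 and 4, reusing the existing `TableDropped` event from `ISQLSync.sol` (`HandleTableDropped` is a hypothetical handler name following the `EventHandler` naming convention). These are fragments of the listener file and its `switch`, not a standalone program:
```go
// Step 1: the event struct in internal/core/data_listener.go
type EvTableDropped struct {
	TableName string
}

// Step 4: a case branch inside processLog's switch over event.Name
case "TableDropped":
	var ev EvTableDropped
	if err := l.contractABI.UnpackIntoInterface(&ev, event.Name, vLog.Data); err != nil {
		return fmt.Errorf("unpack TableDropped error: %w", err)
	}
	return l.executor.HandleTableDropped(ctx, contractAddr, ev.TableName)
```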
### Running the Tests
```bash
go test ./...
```
## 📝 Smart Contracts
### SQLSync.sol
The core contract, exposing the SQL operation interface:
```solidity
// Create a table
function createTable(string calldata tableName, ColumnDef[] calldata columns) external;
// Insert data
function insert(string calldata tableName, string[] calldata columns, string[] calldata values) external;
// Update data
function update(string calldata tableName, string[] calldata setColumns, string[] calldata setValues, string calldata whereClause) external;
// Delete data
function deleteData(string calldata tableName, string calldata whereClause) external;
```
### SQLSyncFactory.sol
The factory contract, used to create SQLSync instances:
```solidity
function createInstance() external returns (address instance);
```
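ChainSQL's factory listener discovers instances by filtering `InstanceCreated` logs. A standalone sketch of the same idea with go-ethereum (the factory address is the sample one from the deployment docs):
```go
package main

import (
	"context"
	"fmt"
	"log"
	"math/big"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	client, err := ethclient.Dial("http://localhost:8545")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// topic0 of InstanceCreated(address indexed owner, address indexed instance)
	sig := crypto.Keccak256Hash([]byte("InstanceCreated(address,address)"))
	logs, err := client.FilterLogs(context.Background(), ethereum.FilterQuery{
		FromBlock: big.NewInt(0),
		Addresses: []common.Address{common.HexToAddress("0x5FbDB2315678afecb367f032d93F642f64180aa3")},
		Topics:    [][]common.Hash{{sig}},
	})
	if err != nil {
		log.Fatal(err)
	}
	for _, vLog := range logs {
		// Both parameters are indexed, so they live in the topics:
		// topics[1] = owner, topics[2] = instance.
		fmt.Printf("owner=%s instance=%s\n",
			common.HexToAddress(vLog.Topics[1].Hex()),
			common.HexToAddress(vLog.Topics[2].Hex()))
	}
}
```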
## 🔄 Workflow
1. **Deploy the contracts**: deploy the SQLSyncFactory and SQLSync contracts
2. **Configure the service**: fill in the contract address and RPC endpoint in `config.yaml`
3. **Start the service**: run the ChainSQL application
4. **Create an instance**: call the factory contract to create a new SQLSync instance
5. **Execute operations**: issue SQL operations through contract calls
6. **Automatic sync**: ChainSQL picks up the events and syncs them to the database
## 🎯 Use Cases
- 📊 **On-chain data indexing**: fast query capability for DApps
- 🔍 **Data analysis**: analyze blockchain data with SQL
- 📈 **Reporting**: generate business reports from on-chain data
- 🏢 **Multi-tenant SaaS**: one contract instance per user
## 🤝 Contributing
Issues and pull requests are welcome!
## 📄 License
MIT License
## 🙏 Acknowledgements
- [go-ethereum](https://github.com/ethereum/go-ethereum) - Ethereum Go client
- [pgx](https://github.com/jackc/pgx) - PostgreSQL driver
- [zap](https://github.com/uber-go/zap) - high-performance logging
- [prometheus](https://prometheus.io/) - monitoring system
@@ -3,12 +3,20 @@ package main
import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"chain-sql/internal/api"
	"chain-sql/internal/blockchain"
	"chain-sql/internal/config"
	"chain-sql/internal/core"
	"chain-sql/internal/database"
	"chain-sql/internal/logger"
	"chain-sql/internal/metrics"

	"go.uber.org/zap"
)
func main() {
@@ -17,49 +25,87 @@ func main() {
	if err != nil {
		log.Fatalf("Failed to load config: %v", err)
	}
-	log.Printf("Config loaded. ChainID: %d, RPC: %s", cfg.Chain.ChainID, cfg.Chain.RPCURL)
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

	// 2. Initialize logging
	if err := logger.Init(cfg.Log.Level, cfg.Log.Development); err != nil {
		log.Fatalf("Failed to initialize logger: %v", err)
	}
	defer logger.Sync()
	logger.Log.Info("ChainSQL starting",
		zap.Int64("chain_id", cfg.Chain.ChainID),
		zap.String("rpc_url", cfg.Chain.RPCURL),
		zap.String("log_level", cfg.Log.Level),
	)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 3. Connect to the database
	db, err := database.New(ctx, cfg.Database.DSN)
	if err != nil {
-		log.Fatalf("Failed to connect to database: %v", err)
+		logger.Log.Fatal("Failed to connect to database", zap.Error(err))
	}
	defer db.Close()
-	log.Println("Database connected and schema initialized")
+	logger.Log.Info("Database connected and schema initialized")

	// 4. Connect to the blockchain node
	ethClient, err := blockchain.NewClient(cfg.Chain.RPCURL)
	if err != nil {
-		log.Fatalf("Failed to connect to blockchain: %v", err)
+		logger.Log.Fatal("Failed to connect to blockchain", zap.Error(err))
	}
	defer ethClient.Close()

	// Test the node connection
	blockNum, err := ethClient.BlockNumber(ctx)
	if err != nil {
-		log.Printf("Warning: Failed to get block number (is node running?): %v", err)
+		logger.Log.Warn("Failed to get block number (is node running?)", zap.Error(err))
	} else {
-		log.Printf("Connected to Ethereum node. Current Block: %d", blockNum)
+		logger.Log.Info("Connected to Ethereum node", zap.Uint64("current_block", blockNum))
	}

	// 5. Start the HTTP server (metrics, health checks)
	httpServer := api.NewServer(cfg.HTTP.Port, db)
	go func() {
		if err := httpServer.Start(); err != nil {
			logger.Log.Error("HTTP server error", zap.Error(err))
		}
	}()
	defer func() {
		shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer shutdownCancel()
		if err := httpServer.Shutdown(shutdownCtx); err != nil {
			logger.Log.Error("HTTP server shutdown error", zap.Error(err))
		}
	}()

	// 6. Start the Factory Listener
	factoryListener := core.NewFactoryListener(cfg, ethClient, db)
-	// Start in a goroutine so main does not exit
	go factoryListener.Start(ctx)

	// 7. Initialize the Contract Manager
	mgr := core.NewContractManager(db)
	go mgr.StartAutoReload(ctx, 30*time.Second) // refresh the list every 30 seconds

	// 8. Start the Data Listener
	dataListener := core.NewDataListener(cfg, ethClient, db, mgr)
	go dataListener.Start(ctx)

	// 9. Initialize the metrics
	metrics.ActiveContracts.Set(0) // initial value
-	// Block the main goroutine and wait for an exit signal
-	<-ctx.Done()
-	log.Println("Shutting down...")
	logger.Log.Info("All services started successfully",
		zap.Int("http_port", cfg.HTTP.Port),
	)

	// 10. Wait for a shutdown signal
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan
	logger.Log.Info("Shutting down gracefully...")
	cancel()
	time.Sleep(2 * time.Second) // give the goroutines some time to clean up
-	log.Println("System infrastructure initialization successful!")
+	logger.Log.Info("ChainSQL stopped")
}
@@ -4,18 +4,27 @@ go 1.24.0
toolchain go1.24.11
require (
	github.com/ethereum/go-ethereum v1.16.8
	github.com/jackc/pgx/v5 v5.8.0
	github.com/prometheus/client_golang v1.20.5
	github.com/spf13/viper v1.21.0
	go.uber.org/zap v1.27.0
)
require (
	github.com/Microsoft/go-winio v0.6.2 // indirect
	github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251001021608-1fe7b43fc4d6 // indirect
	github.com/StackExchange/wmi v1.2.1 // indirect
	github.com/beorn7/perks v1.0.1 // indirect
	github.com/bits-and-blooms/bitset v1.20.0 // indirect
	github.com/cespare/xxhash/v2 v2.3.0 // indirect
	github.com/consensys/gnark-crypto v0.18.0 // indirect
	github.com/crate-crypto/go-eth-kzg v1.4.0 // indirect
	github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect
	github.com/deckarep/golang-set/v2 v2.6.0 // indirect
	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
	github.com/ethereum/c-kzg-4844/v2 v2.1.5 // indirect
-	github.com/ethereum/go-ethereum v1.16.8 // indirect
	github.com/ethereum/go-verkle v0.2.2 // indirect
	github.com/fsnotify/fsnotify v1.9.0 // indirect
	github.com/go-ole/go-ole v1.3.0 // indirect
@@ -24,23 +33,28 @@ require (
	github.com/holiman/uint256 v1.3.2 // indirect
	github.com/jackc/pgpassfile v1.0.0 // indirect
	github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
-	github.com/jackc/pgx/v5 v5.8.0 // indirect
	github.com/jackc/puddle/v2 v2.2.2 // indirect
	github.com/klauspost/compress v1.17.9 // indirect
	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
	github.com/pelletier/go-toml/v2 v2.2.4 // indirect
	github.com/prometheus/client_model v0.6.1 // indirect
	github.com/prometheus/common v0.55.0 // indirect
	github.com/prometheus/procfs v0.15.1 // indirect
	github.com/sagikazarmark/locafero v0.11.0 // indirect
	github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
	github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect
	github.com/spf13/afero v1.15.0 // indirect
	github.com/spf13/cast v1.10.0 // indirect
	github.com/spf13/pflag v1.0.10 // indirect
-	github.com/spf13/viper v1.21.0 // indirect
	github.com/subosito/gotenv v1.6.0 // indirect
	github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe // indirect
	github.com/tklauser/go-sysconf v0.3.12 // indirect
	github.com/tklauser/numcpus v0.6.1 // indirect
	go.uber.org/multierr v1.10.0 // indirect
	go.yaml.in/yaml/v3 v3.0.4 // indirect
	golang.org/x/crypto v0.36.0 // indirect
	golang.org/x/sync v0.17.0 // indirect
	golang.org/x/sys v0.36.0 // indirect
	golang.org/x/text v0.29.0 // indirect
	google.golang.org/protobuf v1.34.2 // indirect
)
node_modules/
cache/
artifacts/
deployments/
coverage/
typechain/
typechain-types/
# Hardhat files
.env
.env.test
# IDE
.vscode/
.idea/
# Hardhat Test Environment
This directory contains the Hardhat test environment for the ChainSQL project.
## 🚀 Quick Start
### 1. Install Dependencies
```bash
cd hardhat
npm install
```
### 2. Start a Local Node
```bash
# Terminal 1
npm run node
```
This starts a local Ethereum node at `http://localhost:8545`.
### 3. Deploy the Contracts
```bash
# Terminal 2
npm run deploy
```
The deployment info is saved to `deployments/localhost.json`.
### 4. Update the ChainSQL Configuration
Update `../configs/config.yaml` according to the deployment output:
```yaml
chain:
  rpc_url: "http://localhost:8545"
  factory_address: "0x5FbDB2315678afecb367f032d93F642f64180aa3" # use the actual address
  chain_id: 31337
```
### 5. Start ChainSQL
```bash
# Terminal 3
cd ..
make run
```
### 6. Run the Integration Test
```bash
# Terminal 2
cd hardhat
node scripts/integration-test.js
```
## 📋 Available Commands
```bash
npm run node     # start a local node
npm run compile  # compile the contracts
npm run test     # run the unit tests
npm run deploy   # deploy the contracts
npm run clean    # remove build artifacts
```
## 🧪 Testing
### Running the Unit Tests
```bash
npm test
```
Test coverage:
- ✅ Factory contract behavior
- ✅ Table creation and management
- ✅ Data insert, update, and delete
- ✅ Batch operations
- ✅ Access control
- ✅ The complete workflow
### Running the Integration Test
```bash
node scripts/integration-test.js
```
This runs a complete workflow, including:
1. Creating a table
2. Inserting data
3. Batch inserting
4. Updating data
5. Deleting data
6. Creating an index
## 📁 Directory Structure
```
hardhat/
├── contracts/          # Solidity contracts
│   ├── ISQLSync.sol
│   ├── SQLSync.sol
│   └── SQLSyncFactory.sol
├── scripts/            # deployment and test scripts
│   ├── deploy.js
│   └── integration-test.js
├── test/               # unit tests
│   └── ChainSQL.test.js
├── deployments/        # deployment info (auto-generated)
├── hardhat.config.js   # Hardhat configuration
└── package.json        # dependencies
```
## 🔧 Configuration
### Hardhat Network Settings
```javascript
// hardhat.config.js
networks: {
  hardhat: {
    chainId: 31337,
    mining: {
      auto: true, // automatic mining
      interval: 0 // mine immediately
    }
  }
}
```
### Accounts
Hardhat provides 10 test accounts by default, each funded with 10000 ETH.
List the accounts:
```bash
npx hardhat accounts
```
## 💡 Tips
### 1. Inspecting Contract Events
```javascript
const receipt = await tx.wait();
console.log(receipt.logs); // ethers v6 exposes logs, not events
```
### 2. Debugging with console.log
In Solidity:
```solidity
import "hardhat/console.sol";
console.log("Value:", value);
```
### 3. Time Travel
```javascript
await network.provider.send("evm_increaseTime", [3600]); // advance one hour
await network.provider.send("evm_mine"); // mine a block
```
### 4. Snapshot and Revert
```javascript
const snapshot = await network.provider.send("evm_snapshot");
// ... perform operations
await network.provider.send("evm_revert", [snapshot]);
```
## 🐛 Troubleshooting
### Problem: The Port Is Already in Use
```bash
# Find the process holding the port
lsof -i :8545
# Kill it
kill -9 <PID>
```
### Problem: Contract Deployment Fails
```bash
# Clean and recompile
npm run clean
npm run compile
```
### Problem: Tests Fail
```bash
# Restart the node:
# stop it with Ctrl+C, then start it again
npm run node
```
## 📚 References
- [Hardhat documentation](https://hardhat.org/docs)
- [Ethers.js documentation](https://docs.ethers.org/)
- [Chai assertion library](https://www.chaijs.com/)
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
interface ISQLSync {
struct ColumnDef {
string name;
string sqlType;
bool isPrimaryKey;
}
// --- Events ---
event TableCreated(string tableName, ColumnDef[] columns);
event TableDropped(string tableName);
event TableAltered(string tableName, string rawSql);
event DataInserted(
string indexed tableName,
string[] columns,
string[] values
);
event DataUpdated(
string indexed tableName,
string[] setColumns,
string[] setValues,
string whereClause
);
event DataDeleted(string indexed tableName, string whereClause);
// Advanced Events
event DataBatchInserted(
string indexed tableName,
string[] columns,
string[][] values
);
event DataUpserted(
string indexed tableName,
string[] columns,
string[] values,
string conflictColumn
);
event IndexCreated(
string indexed tableName,
string indexName,
string[] columns,
bool isUnique
);
event TableTruncated(string indexed tableName);
// --- DDL Functions ---
function createTable(
string calldata tableName,
ColumnDef[] calldata columns
) external;
function dropTable(string calldata tableName) external;
function alterTable(
string calldata tableName,
string calldata rawSql
) external;
function createIndex(
string calldata tableName,
string calldata indexName,
string[] calldata columns,
bool isUnique
) external;
function truncateTable(string calldata tableName) external;
// --- DML Functions ---
function insert(
string calldata tableName,
string[] calldata columns,
string[] calldata values
) external;
function update(
string calldata tableName,
string[] calldata setColumns,
string[] calldata setValues,
string calldata whereClause
) external;
function deleteData(
string calldata tableName,
string calldata whereClause
) external;
function batchInsert(
string calldata tableName,
string[] calldata columns,
string[] calldata flatValues
) external;
function upsert(
string calldata tableName,
string[] calldata columns,
string[] calldata values,
string calldata conflictColumn
) external;
    // runRawSQL has been removed
}
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
import "./ISQLSync.sol";
contract SQLSync is ISQLSync {
address public owner;
modifier onlyOwner() {
require(msg.sender == owner, "Not owner");
_;
}
    // Key change: accept an _owner parameter
constructor(address _owner) {
owner = _owner;
}
// --- DDL ---
function createTable(
string calldata tableName,
ColumnDef[] calldata columns
) external override onlyOwner {
emit TableCreated(tableName, columns);
}
function dropTable(string calldata tableName) external override onlyOwner {
emit TableDropped(tableName);
}
function alterTable(
string calldata tableName,
string calldata rawSql
) external override onlyOwner {
emit TableAltered(tableName, rawSql);
}
function createIndex(
string calldata tableName,
string calldata indexName,
string[] calldata columns,
bool isUnique
) external override onlyOwner {
emit IndexCreated(tableName, indexName, columns, isUnique);
}
function truncateTable(
string calldata tableName
) external override onlyOwner {
emit TableTruncated(tableName);
}
// --- DML ---
function insert(
string calldata tableName,
string[] calldata columns,
string[] calldata values
) external override onlyOwner {
emit DataInserted(tableName, columns, values);
}
function update(
string calldata tableName,
string[] calldata setColumns,
string[] calldata setValues,
string calldata whereClause
) external override onlyOwner {
emit DataUpdated(tableName, setColumns, setValues, whereClause);
}
function deleteData(
string calldata tableName,
string calldata whereClause
) external override onlyOwner {
emit DataDeleted(tableName, whereClause);
}
function batchInsert(
string calldata tableName,
string[] calldata columns,
string[] calldata flatValues
) external override onlyOwner {
        // This only demonstrates the interface; real business logic could add parameter validation
emit DataBatchInserted(
tableName,
columns,
_reconstructBatch(flatValues, columns.length)
);
}
function upsert(
string calldata tableName,
string[] calldata columns,
string[] calldata values,
string calldata conflictColumn
) external override onlyOwner {
emit DataUpserted(tableName, columns, values, conflictColumn);
}
    // Helper: rebuild the flat array into a 2-D array to match the event definition (gas-heavy; in production, consider passing bytes directly)
function _reconstructBatch(
string[] memory flatValues,
uint256 colCount
) internal pure returns (string[][] memory) {
require(colCount > 0, "Col count 0");
require(
flatValues.length % colCount == 0,
"Flat values length mismatch"
);
uint256 rowCount = flatValues.length / colCount;
string[][] memory result = new string[][](rowCount);
for (uint256 i = 0; i < rowCount; i++) {
string[] memory row = new string[](colCount);
for (uint256 j = 0; j < colCount; j++) {
row[j] = flatValues[i * colCount + j];
}
result[i] = row;
}
return result;
}
}
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
import "./ISQLSync.sol";
import "./SQLSync.sol"; // 假设这是你的具体实现文件
contract SQLSyncFactory {
    // Track every instance created by this factory
mapping(address => bool) public isValidInstance;
address[] public allInstances;
event InstanceCreated(address indexed owner, address indexed instance);
/**
     * @notice Creates a new SQLSync instance
     * @return instance the address of the new contract
*/
function createInstance() external returns (address instance) {
        // Deploy a new SQLSync contract.
        // Note: SQLSync's constructor must be able to take the owner, or ownership must be transferred afterwards.
SQLSync newContract = new SQLSync(msg.sender);
instance = address(newContract);
isValidInstance[instance] = true;
allInstances.push(instance);
emit InstanceCreated(msg.sender, instance);
}
/**
     * @notice Returns the total number of instances
*/
function getInstanceCount() external view returns (uint256) {
return allInstances.length;
}
}
require("@nomicfoundation/hardhat-toolbox");
/** @type import('hardhat/config').HardhatUserConfig */
module.exports = {
solidity: {
version: "0.8.20",
settings: {
optimizer: {
enabled: true,
runs: 200
}
}
},
networks: {
hardhat: {
chainId: 31337,
mining: {
auto: true,
interval: 0
},
accounts: {
count: 10,
accountsBalance: "10000000000000000000000" // 10000 ETH
}
},
localhost: {
url: "http://127.0.0.1:8545",
chainId: 31337
}
},
paths: {
sources: "./contracts",
tests: "./test",
cache: "./cache",
artifacts: "./artifacts"
}
};
{
"name": "chainsql-contracts",
"version": "1.0.0",
"description": "ChainSQL Smart Contracts",
"scripts": {
"compile": "hardhat compile",
"test": "hardhat test",
"deploy": "hardhat run scripts/deploy.js --network localhost",
"node": "hardhat node",
"clean": "hardhat clean"
},
"devDependencies": {
"@nomicfoundation/hardhat-toolbox": "^4.0.0",
"hardhat": "^2.19.0",
"ethers": "^6.4.0",
"chai": "^4.3.10"
}
}
\ No newline at end of file
const hre = require("hardhat");
async function main() {
console.log("🚀 Starting deployment...\n");
    // Get the deployer account
const [deployer] = await hre.ethers.getSigners();
console.log("📝 Deploying contracts with account:", deployer.address);
const balance = await hre.ethers.provider.getBalance(deployer.address);
console.log("💰 Account balance:", hre.ethers.formatEther(balance), "ETH\n");
    // Deploy SQLSyncFactory
console.log("📦 Deploying SQLSyncFactory...");
const SQLSyncFactory = await hre.ethers.getContractFactory("SQLSyncFactory");
const factory = await SQLSyncFactory.deploy();
await factory.waitForDeployment();
const factoryAddress = await factory.getAddress();
console.log("✅ SQLSyncFactory deployed to:", factoryAddress);
    // Create a test instance
console.log("\n🏭 Creating test instance...");
const tx = await factory.createInstance();
const receipt = await tx.wait();
    // Extract the instance address from the event (ethers v6)
const event = receipt.logs.find(log => {
try {
const parsed = factory.interface.parseLog(log);
return parsed && parsed.name === "InstanceCreated";
} catch {
return false;
}
});
const parsedEvent = factory.interface.parseLog(event);
const instanceAddress = parsedEvent.args.instance;
console.log("✅ Test instance created at:", instanceAddress);
console.log("👤 Instance owner:", parsedEvent.args.owner);
    // Save the deployment info
const network = await hre.ethers.provider.getNetwork();
const deploymentInfo = {
network: hre.network.name,
        chainId: Number(network.chainId), // convert BigInt to Number
deployer: deployer.address,
contracts: {
SQLSyncFactory: factoryAddress,
testInstance: instanceAddress
},
timestamp: new Date().toISOString()
};
console.log("\n📄 Deployment Summary:");
console.log(JSON.stringify(deploymentInfo, null, 2));
    // Write it to a file
const fs = require("fs");
const path = require("path");
const deploymentPath = path.join(__dirname, "../deployments");
if (!fs.existsSync(deploymentPath)) {
fs.mkdirSync(deploymentPath, { recursive: true });
}
fs.writeFileSync(
path.join(deploymentPath, `${hre.network.name}.json`),
JSON.stringify(deploymentInfo, null, 2)
);
console.log("\n✅ Deployment info saved to deployments/" + hre.network.name + ".json");
    // Print the ChainSQL config update
console.log("\n📝 Update your ChainSQL config with:");
console.log(`
chain:
rpc_url: "http://localhost:8545"
factory_address: "${factoryAddress}"
chain_id: 31337
`);
}
main()
.then(() => process.exit(0))
.catch((error) => {
console.error(error);
process.exit(1);
});
const hre = require("hardhat");
const fs = require("fs");
const path = require("path");
async function main() {
console.log("🎯 ChainSQL Integration Test\n");
    // Read the deployment info
const deploymentPath = path.join(__dirname, "../deployments/localhost.json");
if (!fs.existsSync(deploymentPath)) {
console.error("❌ No deployment found. Please run: npm run deploy");
process.exit(1);
}
const deployment = JSON.parse(fs.readFileSync(deploymentPath, "utf8"));
console.log("📄 Using deployment:", deployment.contracts);
const [owner] = await hre.ethers.getSigners();
const sqlSync = await hre.ethers.getContractAt("SQLSync", deployment.contracts.testInstance);
console.log("\n1️⃣ Creating table 'products'...");
let tx = await sqlSync.createTable("products", [
{ name: "id", sqlType: "INTEGER", isPrimaryKey: true },
{ name: "name", sqlType: "VARCHAR(100)", isPrimaryKey: false },
{ name: "price", sqlType: "DECIMAL(10,2)", isPrimaryKey: false },
{ name: "stock", sqlType: "INTEGER", isPrimaryKey: false },
{ name: "status", sqlType: "VARCHAR(20)", isPrimaryKey: false }
]);
await tx.wait();
console.log("✅ Table created", tx.hash);
console.log("\n2️⃣ Inserting product...");
tx = await sqlSync.insert("products",
["id", "name", "price", "stock", "status"],
["1", "Laptop", "999.99", "10", "available"]
);
await tx.wait();
console.log("✅ Product inserted");
console.log("\n3️⃣ Batch inserting products...");
tx = await sqlSync.batchInsert("products",
["id", "name", "price", "stock", "status"],
[
"2", "Mouse", "29.99", "50", "available",
"3", "Keyboard", "79.99", "30", "available",
"4", "Monitor", "299.99", "15", "available",
"5", "Headphones", "149.99", "0", "out_of_stock"
]
);
await tx.wait();
console.log("✅ Batch insert completed");
console.log("\n4️⃣ Updating product status...");
tx = await sqlSync.update("products",
["status"],
["low_stock"],
"stock < 20 AND status = 'available'"
);
await tx.wait();
console.log("✅ Product status updated");
console.log("\n5️⃣ Updating price...");
tx = await sqlSync.update("products",
["price"],
["899.99"],
"id = 1"
);
await tx.wait();
console.log("✅ Price updated");
console.log("\n6️⃣ Deleting out of stock products...");
tx = await sqlSync.deleteData("products", "status = 'out_of_stock'");
await tx.wait();
console.log("✅ Products deleted");
console.log("\n7️⃣ Creating index...");
tx = await sqlSync.createIndex("products", "idx_status", ["status"], false);
await tx.wait();
console.log("✅ Index created");
console.log("\n✅ Integration test completed!");
console.log("\n📊 Check your PostgreSQL database:");
console.log(` SELECT * FROM "${deployment.contracts.testInstance}".products;`);
console.log("\n💡 ChainSQL should have synced all these operations to the database.");
}
main()
.then(() => process.exit(0))
.catch((error) => {
console.error(error);
process.exit(1);
});
const { expect } = require("chai");
const { ethers } = require("hardhat");
describe("ChainSQL Contracts", function () {
let factory;
let sqlSync;
let owner;
let addr1;
let addr2;
  beforeEach(async function () {
    [owner, addr1, addr2] = await ethers.getSigners();
    // Deploy the factory
    const SQLSyncFactory = await ethers.getContractFactory("SQLSyncFactory");
    factory = await SQLSyncFactory.deploy();
    await factory.waitForDeployment();
    // Create an instance (ethers v6: parse the InstanceCreated event from the logs)
    const tx = await factory.createInstance();
    const receipt = await tx.wait();
    const instanceAddress = factory.interface.parseLog(receipt.logs[0]).args.instance;
    // Attach to the instance contract
    sqlSync = await ethers.getContractAt("SQLSync", instanceAddress);
  });
describe("SQLSyncFactory", function () {
it("Should create a new instance", async function () {
const tx = await factory.createInstance();
const receipt = await tx.wait();
const event = receipt.events?.find(e => e.event === "InstanceCreated");
expect(event).to.not.be.undefined;
expect(event.args.owner).to.equal(owner.address);
expect(event.args.instance).to.be.properAddress;
});
it("Should track instance count", async function () {
const initialCount = await factory.getInstanceCount();
await factory.createInstance();
const newCount = await factory.getInstanceCount();
expect(newCount).to.equal(initialCount.add(1));
});
it("Should mark instances as valid", async function () {
const tx = await factory.createInstance();
const receipt = await tx.wait();
const event = receipt.events?.find(e => e.event === "InstanceCreated");
const instanceAddr = event.args.instance;
expect(await factory.isValidInstance(instanceAddr)).to.be.true;
});
});
describe("SQLSync - Table Operations", function () {
it("Should create a table", async function () {
const columns = [
{ name: "id", sqlType: "INTEGER", isPrimaryKey: true },
{ name: "name", sqlType: "VARCHAR(100)", isPrimaryKey: false },
{ name: "email", sqlType: "VARCHAR(255)", isPrimaryKey: false }
];
await expect(sqlSync.createTable("users", columns))
.to.emit(sqlSync, "TableCreated")
.withArgs("users", columns);
});
it("Should only allow owner to create tables", async function () {
const columns = [
{ name: "id", sqlType: "INTEGER", isPrimaryKey: true }
];
await expect(
sqlSync.connect(addr1).createTable("users", columns)
).to.be.revertedWith("Not owner");
});
});
describe("SQLSync - Data Operations", function () {
beforeEach(async function () {
      // Create the table first
const columns = [
{ name: "id", sqlType: "INTEGER", isPrimaryKey: true },
{ name: "name", sqlType: "VARCHAR(100)", isPrimaryKey: false },
{ name: "status", sqlType: "VARCHAR(20)", isPrimaryKey: false }
];
await sqlSync.createTable("users", columns);
});
it("Should insert data", async function () {
const columns = ["id", "name", "status"];
const values = ["1", "Alice", "active"];
await expect(sqlSync.insert("users", columns, values))
.to.emit(sqlSync, "DataInserted")
.withArgs("users", columns, values);
});
it("Should update data", async function () {
const setColumns = ["status"];
const setValues = ["inactive"];
const whereClause = "id = 1";
await expect(sqlSync.update("users", setColumns, setValues, whereClause))
.to.emit(sqlSync, "DataUpdated")
.withArgs("users", setColumns, setValues, whereClause);
});
it("Should delete data", async function () {
const whereClause = "status = 'inactive'";
await expect(sqlSync.deleteData("users", whereClause))
.to.emit(sqlSync, "DataDeleted")
.withArgs("users", whereClause);
});
it("Should batch insert data", async function () {
const columns = ["id", "name", "status"];
const flatValues = [
"2", "Bob", "active",
"3", "Charlie", "active",
"4", "David", "pending"
];
await expect(sqlSync.batchInsert("users", columns, flatValues))
.to.emit(sqlSync, "DataBatchInserted");
});
it("Should only allow owner to modify data", async function () {
await expect(
sqlSync.connect(addr1).insert("users", ["id"], ["1"])
).to.be.revertedWith("Not owner");
});
});
describe("SQLSync - Advanced Operations", function () {
it("Should create index", async function () {
const columns = [
{ name: "id", sqlType: "INTEGER", isPrimaryKey: true },
{ name: "email", sqlType: "VARCHAR(255)", isPrimaryKey: false }
];
await sqlSync.createTable("users", columns);
await expect(
sqlSync.createIndex("users", "idx_email", ["email"], false)
).to.emit(sqlSync, "IndexCreated")
.withArgs("users", "idx_email", ["email"], false);
});
it("Should truncate table", async function () {
const columns = [
{ name: "id", sqlType: "INTEGER", isPrimaryKey: true }
];
await sqlSync.createTable("users", columns);
await expect(sqlSync.truncateTable("users"))
.to.emit(sqlSync, "TableTruncated")
.withArgs("users");
});
it("Should upsert data", async function () {
const columns = [
{ name: "id", sqlType: "INTEGER", isPrimaryKey: true },
{ name: "name", sqlType: "VARCHAR(100)", isPrimaryKey: false }
];
await sqlSync.createTable("users", columns);
const cols = ["id", "name"];
const vals = ["1", "Alice"];
const conflictCol = "id";
await expect(sqlSync.upsert("users", cols, vals, conflictCol))
.to.emit(sqlSync, "DataUpserted")
.withArgs("users", cols, vals, conflictCol);
});
});
describe("Integration Test", function () {
it("Should handle complete workflow", async function () {
      // 1. Create a table
const columns = [
{ name: "id", sqlType: "INTEGER", isPrimaryKey: true },
{ name: "username", sqlType: "VARCHAR(50)", isPrimaryKey: false },
{ name: "email", sqlType: "VARCHAR(255)", isPrimaryKey: false },
{ name: "status", sqlType: "VARCHAR(20)", isPrimaryKey: false }
];
await sqlSync.createTable("users", columns);
      // 2. Insert data
await sqlSync.insert("users",
["id", "username", "email", "status"],
["1", "alice", "alice@example.com", "active"]
);
      // 3. Batch insert
await sqlSync.batchInsert("users",
["id", "username", "email", "status"],
[
"2", "bob", "bob@example.com", "active",
"3", "charlie", "charlie@example.com", "pending"
]
);
      // 4. Update data
await sqlSync.update("users",
["status"],
["verified"],
"id = 1"
);
      // 5. Delete data
await sqlSync.deleteData("users", "status = 'pending'");
      // All operations are verified through their emitted events;
      // in a real run ChainSQL listens for these events and syncs them to the database.
});
});
});
package api
import (
"context"
"fmt"
"net/http"
"time"
"chain-sql/internal/database"
"chain-sql/internal/logger"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)
type Server struct {
httpServer *http.Server
db *database.DB
}
func NewServer(port int, db *database.DB) *Server {
mux := http.NewServeMux()
// Prometheus metrics endpoint
mux.Handle("/metrics", promhttp.Handler())
// Health check endpoint
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
})
// Readiness check endpoint
mux.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
defer cancel()
// Check database connection
if err := db.Ping(ctx); err != nil {
logger.Log.Error("Readiness check failed", zap.Error(err))
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("Database not ready"))
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("Ready"))
})
return &Server{
httpServer: &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
},
db: db,
}
}
func (s *Server) Start() error {
logger.Log.Info("Starting HTTP server", zap.String("addr", s.httpServer.Addr))
if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
return err
}
return nil
}
func (s *Server) Shutdown(ctx context.Context) error {
logger.Log.Info("Shutting down HTTP server")
return s.httpServer.Shutdown(ctx)
}
@@ -11,6 +11,8 @@ type Config struct {
	Chain    ChainConfig    `mapstructure:"chain"`
	Database DatabaseConfig `mapstructure:"database"`
	Sync     SyncConfig     `mapstructure:"sync"`
	Log      LogConfig      `mapstructure:"log"`
	HTTP     HTTPConfig     `mapstructure:"http"`
}
type ChainConfig struct {
@@ -31,6 +33,15 @@ type SyncConfig struct {
	AddressBatchSize int `mapstructure:"address_batch_size"`
}

type LogConfig struct {
	Level       string `mapstructure:"level"`
	Development bool   `mapstructure:"development"`
}

type HTTPConfig struct {
	Port int `mapstructure:"port"`
}

func Load(path string) (*Config, error) {
	v := viper.New()
	v.SetConfigFile(path)
...
@@ -39,6 +39,24 @@ type EvDataInserted struct {
	Values []string
}

type EvDataUpdated struct {
	TableName   string
	SetColumns  []string
	SetValues   []string
	WhereClause string
}

type EvDataDeleted struct {
	TableName   string
	WhereClause string
}

type EvDataBatchInserted struct {
	TableName string
	Columns   []string
	Values    [][]string
}

// --- Listener Implementation ---

type DataListener struct {
@@ -217,6 +235,33 @@ func (l *DataListener) processLog(ctx context.Context, vLog types.Log) error {
		log.Printf("[DML] DataInserted: %s.%s", contractAddr, ev.TableName)
		return l.executor.HandleDataInserted(ctx, contractAddr, ev.TableName, ev.Columns, ev.Values)

	case "DataUpdated":
		var ev EvDataUpdated
		if err := l.contractABI.UnpackIntoInterface(&ev, event.Name, vLog.Data); err != nil {
			return fmt.Errorf("unpack DataUpdated error: %w", err)
		}
		log.Printf("[DML] DataUpdated: %s.%s", contractAddr, ev.TableName)
		return l.executor.HandleDataUpdated(ctx, contractAddr, ev.TableName, ev.SetColumns, ev.SetValues, ev.WhereClause)

	case "DataDeleted":
		var ev EvDataDeleted
		if err := l.contractABI.UnpackIntoInterface(&ev, event.Name, vLog.Data); err != nil {
			return fmt.Errorf("unpack DataDeleted error: %w", err)
		}
		log.Printf("[DML] DataDeleted: %s.%s", contractAddr, ev.TableName)
		return l.executor.HandleDataDeleted(ctx, contractAddr, ev.TableName, ev.WhereClause)

	case "DataBatchInserted":
		var ev EvDataBatchInserted
		if err := l.contractABI.UnpackIntoInterface(&ev, event.Name, vLog.Data); err != nil {
			return fmt.Errorf("unpack DataBatchInserted error: %w", err)
		}
		log.Printf("[DML] DataBatchInserted: %s.%s (%d rows)", contractAddr, ev.TableName, len(ev.Values))
		return l.executor.HandleBatchInserted(ctx, contractAddr, ev.TableName, ev.Columns, ev.Values)

	default:
		// ignore other events
	}
...
@@ -32,7 +32,6 @@ type FactoryListener struct {
}

func NewFactoryListener(cfg *config.Config, client blockchain.Client, db *database.DB) *FactoryListener {
-	// Define the ABI manually, or use abigen-generated code; it is written by hand here to keep the demo self-contained.
	// Event: InstanceCreated(address indexed owner, address indexed instance)
	eventSig := crypto.Keccak256Hash([]byte("InstanceCreated(address,address)"))
@@ -108,9 +107,13 @@ func (l *FactoryListener) syncOnce(ctx context.Context) error {
		return fmt.Errorf("filter logs failed: %w", err)
	}
	log.Printf("Found %d logs from Factory", len(logs))

	// 6. Process the logs
	for _, vLog := range logs {
		log.Printf("Processing log with %d topics", len(vLog.Topics))
		if len(vLog.Topics) < 3 {
			log.Printf("Skipping log: not enough topics (need 3, got %d)", len(vLog.Topics))
			continue // malformed log
		}
		// topic[0] is signature, topic[1] is owner (indexed), topic[2] is instance (indexed)
...
@@ -18,6 +18,12 @@ type EventHandler interface {
	}) error
	HandleDataInserted(ctx context.Context, contractAddr string, tableName string, columns []string, values []string) error

	HandleDataUpdated(ctx context.Context, contractAddr string, tableName string, setColumns []string, setValues []string, whereClause string) error

	HandleDataDeleted(ctx context.Context, contractAddr string, tableName string, whereClause string) error

	HandleBatchInserted(ctx context.Context, contractAddr string, tableName string, columns []string, rows [][]string) error
}

// ContractProvider defines how the list of target contracts is obtained
...
@@ -40,6 +40,11 @@ func (d *DB) Close() {
	d.pool.Close()
}

// Ping checks the database connection
func (d *DB) Ping(ctx context.Context) error {
	return d.pool.Ping(ctx)
}

// Exec executes an arbitrary SQL statement
func (d *DB) Exec(ctx context.Context, sql string, args ...interface{}) error {
	_, err := d.pool.Exec(ctx, sql, args...)
...
@@ -120,3 +120,129 @@ func (e *Executor) HandleDataInserted(ctx context.Context, contractAddr string,
func QuoteIdentifier(s string) string {
	return fmt.Sprintf(`"%s"`, strings.ReplaceAll(s, `"`, `""`))
}
// HandleDataUpdated handles update events
func (e *Executor) HandleDataUpdated(ctx context.Context, contractAddr string, tableName string, setColumns []string, setValues []string, whereClause string) error {
if len(setColumns) != len(setValues) {
return fmt.Errorf("columns and values length mismatch: %d vs %d", len(setColumns), len(setValues))
}
schemaName := QuoteIdentifier(contractAddr)
safeTableName := QuoteIdentifier(tableName)
	// Build the SET clause
var setParts []string
args := make([]interface{}, 0, len(setValues))
for i, col := range setColumns {
setParts = append(setParts, fmt.Sprintf("%s = $%d", QuoteIdentifier(col), i+1))
args = append(args, setValues[i])
}
	// Parse the WHERE clause.
	// This really needs the table's column list for the whitelist; as a
	// simplification the setColumns are used as the allowed columns
	// (the actual table schema should be queried instead).
	allowedCols := append([]string{}, setColumns...)
	parser := NewWhereParser(allowedCols)
	// Start the WHERE placeholder numbering after the SET parameters
	// ($1..$n are taken by the SET clause), so the parsed fragment can be
	// used as-is without renumbering.
	parser.paramCounter = len(setValues) + 1
	whereCond, err := parser.Parse(whereClause)
	if err != nil {
		return fmt.Errorf("parse WHERE clause failed: %w", err)
	}
	if whereCond.SQL == "" {
		return fmt.Errorf("UPDATE without WHERE clause is not allowed for safety")
	}
	adjustedWhereSQL := whereCond.SQL

	// Append the WHERE parameters after the SET parameters
	args = append(args, whereCond.Args...)
sql := fmt.Sprintf(
`UPDATE %s.%s SET %s WHERE %s`,
schemaName,
safeTableName,
strings.Join(setParts, ", "),
adjustedWhereSQL,
)
log.Printf("Executing UPDATE: %s", sql)
return e.db.Exec(ctx, sql, args...)
}
// HandleDataDeleted handles delete events
func (e *Executor) HandleDataDeleted(ctx context.Context, contractAddr string, tableName string, whereClause string) error {
schemaName := QuoteIdentifier(contractAddr)
safeTableName := QuoteIdentifier(tableName)
	// Parse the WHERE clause.
	// Note: this needs the table's column list; as a simplification, a set of common column names is allowed here.
	// Production code should query information_schema for the actual columns.
parser := NewWhereParser([]string{"id", "name", "status", "email", "created_at", "updated_at"})
whereCond, err := parser.Parse(whereClause)
if err != nil {
return fmt.Errorf("parse WHERE clause failed: %w", err)
}
if whereCond.SQL == "" {
return fmt.Errorf("DELETE without WHERE clause is not allowed for safety")
}
sql := fmt.Sprintf(
`DELETE FROM %s.%s WHERE %s`,
schemaName,
safeTableName,
whereCond.SQL,
)
log.Printf("Executing DELETE: %s", sql)
return e.db.Exec(ctx, sql, whereCond.Args...)
}
// HandleBatchInserted handles batch insert events
func (e *Executor) HandleBatchInserted(ctx context.Context, contractAddr string, tableName string, columns []string, rows [][]string) error {
if len(rows) == 0 {
return nil
}
schemaName := QuoteIdentifier(contractAddr)
safeTableName := QuoteIdentifier(tableName)
safeCols := make([]string, len(columns))
for i, c := range columns {
safeCols[i] = QuoteIdentifier(c)
}
	// Build the batch INSERT statement
var valueParts []string
args := make([]interface{}, 0, len(rows)*len(columns))
paramIndex := 1
for _, row := range rows {
if len(row) != len(columns) {
return fmt.Errorf("row length mismatch: expected %d, got %d", len(columns), len(row))
}
placeholders := make([]string, len(row))
for i, val := range row {
placeholders[i] = fmt.Sprintf("$%d", paramIndex)
args = append(args, val)
paramIndex++
}
valueParts = append(valueParts, fmt.Sprintf("(%s)", strings.Join(placeholders, ", ")))
}
sql := fmt.Sprintf(
`INSERT INTO %s.%s (%s) VALUES %s`,
schemaName,
safeTableName,
strings.Join(safeCols, ", "),
strings.Join(valueParts, ", "),
)
log.Printf("Executing BATCH INSERT: %d rows into %s.%s", len(rows), contractAddr, tableName)
return e.db.Exec(ctx, sql, args...)
}
package executor
import (
"fmt"
"regexp"
"strings"
)
// WhereCondition is a parsed WHERE condition
type WhereCondition struct {
	SQL  string        // parameterized SQL fragment
	Args []interface{} // parameter values
}

// WhereParser is a safe WHERE clause parser
type WhereParser struct {
	allowedColumns map[string]bool // whitelist of allowed column names
	paramCounter   int             // parameter counter
}
// NewWhereParser creates a new parser
func NewWhereParser(allowedColumns []string) *WhereParser {
colMap := make(map[string]bool)
for _, col := range allowedColumns {
colMap[strings.ToLower(col)] = true
}
return &WhereParser{
allowedColumns: colMap,
paramCounter: 1,
}
}
// Supported operators
var allowedOperators = map[string]bool{
"=": true,
"!=": true,
"<>": true,
">": true,
"<": true,
">=": true,
"<=": true,
"LIKE": true,
"ILIKE": true,
"IN": true,
"NOT IN": true,
"IS NULL": true,
"IS NOT NULL": true,
}
// Parse parses a WHERE clause.
// Example inputs:
//
//	"id = 1"
//	"name = 'John' AND age > 18"
//	"status IN ('active', 'pending')"
//	"email IS NOT NULL"
func (p *WhereParser) Parse(whereClause string) (*WhereCondition, error) {
if strings.TrimSpace(whereClause) == "" {
return &WhereCondition{SQL: "", Args: []interface{}{}}, nil
}
	// Simple lexing: split on top-level AND/OR
tokens := p.tokenize(whereClause)
var sqlParts []string
var args []interface{}
for _, token := range tokens {
token = strings.TrimSpace(token)
		// Handle logical operators
upperToken := strings.ToUpper(token)
if upperToken == "AND" || upperToken == "OR" {
sqlParts = append(sqlParts, upperToken)
continue
}
		// Handle parentheses
if token == "(" || token == ")" {
sqlParts = append(sqlParts, token)
continue
}
		// Parse a single condition
condSQL, condArgs, err := p.parseCondition(token)
if err != nil {
return nil, fmt.Errorf("parse condition '%s' failed: %w", token, err)
}
sqlParts = append(sqlParts, condSQL)
args = append(args, condArgs...)
}
return &WhereCondition{
SQL: strings.Join(sqlParts, " "),
Args: args,
}, nil
}
// tokenize performs a simple lexical pass: it splits the clause into
// top-level AND/OR operators and condition fragments, keeping quoted
// strings and parenthesized lists (e.g. for IN) inside their fragment.
func (p *WhereParser) tokenize(input string) []string {
	var tokens []string
	var current strings.Builder
	inQuote := false
	quoteChar := rune(0)
	parenDepth := 0

	flush := func() {
		if s := strings.TrimSpace(current.String()); s != "" {
			tokens = append(tokens, s)
		}
		current.Reset()
	}

	runes := []rune(input)
	for i := 0; i < len(runes); i++ {
		ch := runes[i]
		switch {
		case (ch == '\'' || ch == '"') && !(i > 0 && runes[i-1] == '\\'):
			if !inQuote {
				inQuote = true
				quoteChar = ch
			} else if ch == quoteChar {
				inQuote = false
				quoteChar = rune(0)
			}
			current.WriteRune(ch)
		case ch == '(' && !inQuote:
			parenDepth++
			current.WriteRune(ch)
		case ch == ')' && !inQuote:
			parenDepth--
			current.WriteRune(ch)
		case ch == ' ' && !inQuote && parenDepth == 0:
			// Split off a standalone AND/OR; other spaces stay inside the fragment.
			rest := strings.ToUpper(string(runes[i+1:]))
			if strings.HasPrefix(rest, "AND ") {
				flush()
				tokens = append(tokens, "AND")
				i += 4 // skip "AND "; the loop increment moves past it
			} else if strings.HasPrefix(rest, "OR ") {
				flush()
				tokens = append(tokens, "OR")
				i += 3 // skip "OR "
			} else {
				current.WriteRune(ch)
			}
		default:
			current.WriteRune(ch)
		}
	}
	flush()
	return tokens
}
// parseCondition parses a single condition expression,
// e.g. "id = 1", "name LIKE 'John%'", "status IN ('a', 'b')"
func (p *WhereParser) parseCondition(condition string) (string, []interface{}, error) {
condition = strings.TrimSpace(condition)
	// Handle IS NULL / IS NOT NULL
if matched, _ := regexp.MatchString(`(?i)\s+IS\s+NOT\s+NULL\s*$`, condition); matched {
column := regexp.MustCompile(`(?i)\s+IS\s+NOT\s+NULL\s*$`).ReplaceAllString(condition, "")
column = strings.TrimSpace(column)
if err := p.validateColumn(column); err != nil {
return "", nil, err
}
return fmt.Sprintf("%s IS NOT NULL", QuoteIdentifier(column)), []interface{}{}, nil
}
if matched, _ := regexp.MatchString(`(?i)\s+IS\s+NULL\s*$`, condition); matched {
column := regexp.MustCompile(`(?i)\s+IS\s+NULL\s*$`).ReplaceAllString(condition, "")
column = strings.TrimSpace(column)
if err := p.validateColumn(column); err != nil {
return "", nil, err
}
return fmt.Sprintf("%s IS NULL", QuoteIdentifier(column)), []interface{}{}, nil
}
	// Handle IN / NOT IN
inPattern := regexp.MustCompile(`(?i)^(\w+)\s+(NOT\s+)?IN\s*\((.+)\)$`)
if matches := inPattern.FindStringSubmatch(condition); matches != nil {
column := matches[1]
notIn := strings.TrimSpace(matches[2]) != ""
valuesPart := matches[3]
if err := p.validateColumn(column); err != nil {
return "", nil, err
}
		// Parse the values inside the IN list
values := p.parseInValues(valuesPart)
if len(values) == 0 {
return "", nil, fmt.Errorf("empty IN clause")
}
placeholders := make([]string, len(values))
args := make([]interface{}, len(values))
for i, v := range values {
placeholders[i] = fmt.Sprintf("$%d", p.paramCounter)
args[i] = v
p.paramCounter++
}
operator := "IN"
if notIn {
operator = "NOT IN"
}
sql := fmt.Sprintf("%s %s (%s)", QuoteIdentifier(column), operator, strings.Join(placeholders, ", "))
return sql, args, nil
}
	// Handle the standard comparison operators
for op := range allowedOperators {
if op == "IN" || op == "NOT IN" || op == "IS NULL" || op == "IS NOT NULL" {
			continue // already handled above
}
pattern := regexp.MustCompile(fmt.Sprintf(`(?i)^(\w+)\s+%s\s+(.+)$`, regexp.QuoteMeta(op)))
if matches := pattern.FindStringSubmatch(condition); matches != nil {
column := matches[1]
value := strings.TrimSpace(matches[2])
if err := p.validateColumn(column); err != nil {
return "", nil, err
}
			// Strip the quotes from the value
value = p.unquoteValue(value)
sql := fmt.Sprintf("%s %s $%d", QuoteIdentifier(column), strings.ToUpper(op), p.paramCounter)
p.paramCounter++
return sql, []interface{}{value}, nil
}
}
return "", nil, fmt.Errorf("invalid condition format: %s", condition)
}
// validateColumn checks a column name against the whitelist
func (p *WhereParser) validateColumn(column string) error {
column = strings.ToLower(strings.TrimSpace(column))
if !p.allowedColumns[column] {
return fmt.Errorf("column '%s' not allowed in WHERE clause", column)
}
return nil
}
// parseInValues parses the value list of an IN clause
func (p *WhereParser) parseInValues(valuesPart string) []string {
var values []string
var current strings.Builder
inQuote := false
quoteChar := rune(0)
for i, ch := range valuesPart {
switch {
case (ch == '\'' || ch == '"') && (i == 0 || valuesPart[i-1] != '\\'):
if !inQuote {
inQuote = true
quoteChar = ch
} else if ch == quoteChar {
inQuote = false
quoteChar = 0
}
current.WriteRune(ch)
case ch == ',' && !inQuote:
if current.Len() > 0 {
values = append(values, p.unquoteValue(strings.TrimSpace(current.String())))
current.Reset()
}
default:
current.WriteRune(ch)
}
}
if current.Len() > 0 {
values = append(values, p.unquoteValue(strings.TrimSpace(current.String())))
}
return values
}
// unquoteValue strips surrounding quotes from a value
func (p *WhereParser) unquoteValue(value string) string {
value = strings.TrimSpace(value)
if len(value) >= 2 {
if (value[0] == '\'' && value[len(value)-1] == '\'') ||
(value[0] == '"' && value[len(value)-1] == '"') {
return value[1 : len(value)-1]
}
}
return value
}
package logger
import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var Log *zap.Logger
// Init initializes the global logger
func Init(level string, development bool) error {
var config zap.Config
if development {
config = zap.NewDevelopmentConfig()
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
} else {
config = zap.NewProductionConfig()
config.EncoderConfig.TimeKey = "timestamp"
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
}
// Parse the log level; fall back to info on invalid input
var zapLevel zapcore.Level
if err := zapLevel.UnmarshalText([]byte(level)); err != nil {
zapLevel = zapcore.InfoLevel
}
config.Level = zap.NewAtomicLevelAt(zapLevel)
logger, err := config.Build(
zap.AddCallerSkip(0),
zap.AddStacktrace(zapcore.ErrorLevel),
)
if err != nil {
return err
}
Log = logger
return nil
}
// Sync flushes any buffered log entries
func Sync() {
if Log != nil {
_ = Log.Sync()
}
}
// WithContext returns a logger annotated with the given fields
func WithContext(fields ...zap.Field) *zap.Logger {
if Log == nil {
return zap.NewNop()
}
return Log.With(fields...)
}
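// Typical usage (a sketch; the field names and values passed to WithContext
// are hypothetical):
//
//   if err := logger.Init("debug", true); err != nil {
//       panic(err)
//   }
//   defer logger.Sync()
//   log := logger.WithContext(zap.String("component", "sync"))
//   log.Info("listener started", zap.Uint64("start_block", 0))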
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
// EventsProcessed counts processed blockchain events
EventsProcessed = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "chainsql_events_processed_total",
Help: "Total number of blockchain events processed",
},
[]string{"event_type", "status"}, // status: success, error
)
// EventProcessingDuration measures event processing latency
EventProcessingDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "chainsql_event_processing_duration_seconds",
Help: "Duration of event processing in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"event_type"},
)
// SyncBlockHeight tracks the current synced block height
SyncBlockHeight = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "chainsql_sync_block_height",
Help: "Current synced block height",
},
[]string{"listener_type"}, // factory, data
)
// ActiveContracts tracks the number of active contracts being monitored
ActiveContracts = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "chainsql_active_contracts",
Help: "Number of active contracts being monitored",
},
)
// DatabaseOperations counts database operations
DatabaseOperations = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "chainsql_database_operations_total",
Help: "Total number of database operations",
},
[]string{"operation", "status"}, // operation: insert, update, delete, create_table
)
// DatabaseOperationDuration measures database operation latency
DatabaseOperationDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "chainsql_database_operation_duration_seconds",
Help: "Duration of database operations in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"operation"},
)
// BlockchainRPCCalls counts blockchain RPC calls
BlockchainRPCCalls = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "chainsql_blockchain_rpc_calls_total",
Help: "Total number of blockchain RPC calls",
},
[]string{"method", "status"},
)
// BlockchainRPCDuration measures blockchain RPC call latency
BlockchainRPCDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "chainsql_blockchain_rpc_duration_seconds",
Help: "Duration of blockchain RPC calls in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"method"},
)
)
// RecordEventProcessed records the outcome of processing an event
func RecordEventProcessed(eventType string, success bool) {
status := "success"
if !success {
status = "error"
}
EventsProcessed.WithLabelValues(eventType, status).Inc()
}
// RecordDatabaseOperation records the outcome of a database operation
func RecordDatabaseOperation(operation string, success bool) {
status := "success"
if !success {
status = "error"
}
DatabaseOperations.WithLabelValues(operation, status).Inc()
}
// RecordRPCCall records the outcome of an RPC call
func RecordRPCCall(method string, success bool) {
status := "success"
if !success {
status = "error"
}
BlockchainRPCCalls.WithLabelValues(method, status).Inc()
}
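// Typical usage (a sketch; doInsert is a hypothetical database call): pair the
// latency histogram with the success/error counter so duration and outcome are
// recorded together. prometheus.NewTimer observes the elapsed time on the
// labeled histogram when ObserveDuration is called:
//
//   timer := prometheus.NewTimer(DatabaseOperationDuration.WithLabelValues("insert"))
//   err := doInsert()
//   timer.ObserveDuration()
//   RecordDatabaseOperation("insert", err == nil)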
#!/bin/bash
# ChainSQL manual start guide
# If the automated script fails, follow these steps to start everything by hand
echo "📚 ChainSQL Manual Start Steps"
echo "=============================="
echo ""
echo "Step 1: Start PostgreSQL"
echo "------------------------"
echo "docker-compose up -d postgres"
echo ""
echo "Wait 5 seconds for the database to come up..."
echo ""
echo "Step 2: Install Hardhat dependencies (first run only)"
echo "-----------------------------------------------------"
echo "cd hardhat"
echo "npm install"
echo "cd .."
echo ""
echo "Step 3: Start the Hardhat node (new terminal)"
echo "---------------------------------------------"
echo "cd hardhat"
echo "npx hardhat node"
echo ""
echo "Keep this terminal running!"
echo ""
echo "Step 4: Deploy the contracts (new terminal)"
echo "-------------------------------------------"
echo "cd hardhat"
echo "npm run deploy"
echo ""
echo "Record the factory_address printed in the output"
echo ""
echo "Step 5: Update the configuration"
echo "--------------------------------"
echo "Edit configs/config.yaml and set:"
echo "  factory_address: \"<address from step 4>\""
echo ""
echo "Step 6: Start ChainSQL (new terminal)"
echo "-------------------------------------"
echo "make build"
echo "./bin/chainsql"
echo ""
echo "Step 7: Run the integration test (new terminal)"
echo "-----------------------------------------------"
echo "cd hardhat"
echo "node scripts/integration-test.js"
echo ""
echo "✅ Done!"
echo ""
echo "Check service status:"
echo "  curl http://localhost:8080/health"
echo "  curl http://localhost:8080/metrics"
echo ""
#!/bin/bash
# ChainSQL quick start script
set -e
echo "🚀 ChainSQL Quick Start Script"
echo "================================"
echo ""
# Check for required dependencies
check_dependency() {
if ! command -v "$1" &> /dev/null; then
echo "❌ $1 is not installed"
echo "Please install $1 first"
exit 1
fi
}
echo "📋 Checking dependencies..."
check_dependency "node"
check_dependency "npm"
check_dependency "go"
check_dependency "docker"
echo "✅ All dependencies found"
echo ""
# 1. Install Hardhat dependencies
if [ ! -d "hardhat/node_modules" ]; then
echo "📦 Installing Hardhat dependencies..."
cd hardhat
npm install
cd ..
echo "✅ Hardhat dependencies installed"
else
echo "✅ Hardhat dependencies already installed"
fi
echo ""
# 2. Start PostgreSQL (when using Docker)
echo "🐘 Checking PostgreSQL..."
if docker ps | grep -q chainsql-db; then
echo "✅ PostgreSQL is already running"
else
echo "Starting PostgreSQL with Docker Compose..."
docker-compose up -d postgres
echo "Waiting for PostgreSQL to be ready..."
sleep 5
echo "✅ PostgreSQL started"
fi
echo ""
# # 3. Start the Hardhat node (in the background)
# echo "⛓️ Starting Hardhat node..."
# cd hardhat
# npx hardhat node > ../hardhat-node.log 2>&1 &
# HARDHAT_PID=$!
# echo $HARDHAT_PID > ../hardhat.pid
# cd ..
# echo "✅ Hardhat node started (PID: $HARDHAT_PID)"
# echo " Logs: hardhat-node.log"
# echo " Waiting for node to be ready..."
# sleep 3
# echo ""
# 4. Deploy the contracts
echo "📝 Deploying contracts..."
cd hardhat
npm run deploy > ../deploy.log 2>&1
cd ..
echo "✅ Contracts deployed"
echo " Deployment info: hardhat/deployments/localhost.json"
echo ""
# 5. Update the ChainSQL configuration
echo "⚙️ Updating ChainSQL configuration..."
FACTORY_ADDRESS=$(grep -o '"SQLSyncFactory": "[^"]*"' hardhat/deployments/localhost.json | cut -d'"' -f4)
cat > configs/config.yaml << EOF
chain:
rpc_url: "http://localhost:8545"
factory_address: "$FACTORY_ADDRESS"
chain_id: 31337
database:
dsn: "postgres://chainsql:password123@localhost:5432/chainsql?sslmode=disable"
sync:
start_block: 0
confirmations: 1
poll_interval: "1s"
max_batch_size: 100
address_batch_size: 50
log:
level: "debug"
development: true
http:
port: 8080
EOF
echo "✅ Configuration updated"
echo ""
# # 6. Start ChainSQL
# echo "🔄 Starting ChainSQL..."
# make build
# ./bin/chainsql > chainsql.log 2>&1 &
# CHAINSQL_PID=$!
# echo $CHAINSQL_PID > chainsql.pid
# echo "✅ ChainSQL started (PID: $CHAINSQL_PID)"
# echo " Logs: chainsql.log"
# echo ""
# 7. Wait for services to become ready
echo "⏳ Waiting for services to be ready..."
sleep 3
# Check health status
if curl -s http://localhost:8080/health > /dev/null; then
echo "✅ ChainSQL is healthy"
else
echo "⚠️ ChainSQL health check failed, check logs"
fi
echo ""
# 8. Show status
echo "================================"
echo "🎉 All services started!"
echo ""
echo "📊 Service Status:"
echo " - Hardhat Node: http://localhost:8545"
echo " - ChainSQL HTTP: http://localhost:8080"
echo " - PostgreSQL: localhost:5432"
echo ""
echo "📝 Useful Commands:"
echo " - View ChainSQL logs: tail -f chainsql.log"
echo " - View Hardhat logs: tail -f hardhat-node.log"
echo " - Check metrics: curl http://localhost:8080/metrics"
echo " - Stop all services: ./scripts/stop.sh"
echo ""
echo "🧪 Run Integration Test:"
echo " cd hardhat && node scripts/integration-test.js"
echo ""
echo "🗄️ Query Database:"
echo " docker exec -it chainsql-db psql -U chainsql -d chainsql -c \"SELECT * FROM _chainsql_instances;\""
echo ""
#!/bin/bash
# ChainSQL database reset script
# After restarting the Hardhat node, the sync cursors must be reset
echo "🔄 Resetting ChainSQL database cursors..."
# Reset the cursors
docker exec chainsql-db psql -U chainsql -d chainsql -c "DELETE FROM _chainsql_cursor;"
docker exec chainsql-db psql -U chainsql -d chainsql -c "DELETE FROM _chainsql_instances;"
echo "✅ Database reset complete!"
echo ""
echo "📊 Current status:"
docker exec chainsql-db psql -U chainsql -d chainsql -c "SELECT * FROM _chainsql_cursor;"
docker exec chainsql-db psql -U chainsql -d chainsql -c "SELECT * FROM _chainsql_instances;"
echo ""
echo "现在可以重新启动 ChainSQL 了:"
echo " go run cmd/main.go"
#!/bin/bash
# ChainSQL stop script
echo "🛑 Stopping ChainSQL services..."
# Stop ChainSQL
if [ -f chainsql.pid ]; then
PID=$(cat chainsql.pid)
if ps -p "$PID" > /dev/null; then
kill $PID
echo "✅ ChainSQL stopped (PID: $PID)"
fi
rm chainsql.pid
fi
# Stop Hardhat
if [ -f hardhat.pid ]; then
PID=$(cat hardhat.pid)
if ps -p "$PID" > /dev/null; then
kill $PID
echo "✅ Hardhat node stopped (PID: $PID)"
fi
rm hardhat.pid
fi
# Stop the Docker services (optional)
read -p "Stop PostgreSQL (Docker)? (y/N): " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
docker-compose down
echo "✅ PostgreSQL stopped"
fi
echo "✅ All services stopped"