Commit e7474c0f authored by duanjinfei's avatar duanjinfei

init commit

parents
Pipeline #919 canceled with stages
# Binaries
# 忽略编译生成的二进制文件
bin/
chain-sql
main
# Object files and caches
# 忽略临时对象文件和缓存
*.o
*.a
*.so
*.dylib
*.dll
# Go module
# 忽略 vendor 目录(除非你明确使用 vendor模式)
vendor/
# IDE specific files
# 忽略 VSCode, JetBrains 等 IDE 的配置文件
.vscode/
.idea/
*.iml
*.swp
*.swo
.DS_Store
# Local configuration
# 忽略本地的配置文件(包含私钥或数据库密码),只提交 config.example.yaml
configs/config.yaml
!configs/config.example.yaml
# Logs
# 忽略运行日志
*.log
logs/
# Docker
# 忽略 Docker 构建产物
docker-compose.override.yml
# Solidity / Hardhat / Truffle
# 忽略合约编译产物
artifacts/
cache/
build/
node_modules/
npm-debug.log
yarn-error.log
# Cursor specific
.cursor/
go.sum
\ No newline at end of file
# --- Build Stage ---
FROM golang:1.22-alpine AS builder
# 安装构建依赖 (git 等)
RUN apk add --no-cache git
WORKDIR /app
# 预先下载依赖,利用 Docker 缓存层
COPY go.mod go.sum ./
RUN go mod download
# 复制源代码
COPY . .
# 编译 (CGO_ENABLED=0 静态链接)
RUN CGO_ENABLED=0 GOOS=linux go build -o chainsql ./cmd/chainsql/main.go
# --- Final Stage ---
FROM alpine:3.19
WORKDIR /app
# 安装必要的运行时库 (CA 证书用于 HTTPS 请求)
RUN apk add --no-cache ca-certificates tzdata
# 从 Build 阶段复制二进制文件
COPY --from=builder /app/chainsql .
# 复制配置文件 (注意:生产环境通常挂载 ConfigMap,这里作为默认值)
COPY configs/ ./configs/
# 暴露端口 (如果有 HTTP 服务的话)
# EXPOSE 8080
# 运行
CMD ["./chainsql"]
\ No newline at end of file
# 项目变量
BINARY_NAME=chainsql
DOCKER_IMAGE=chainsql-service
VERSION=0.1.0
# 默认目标
all: build
# 编译 Go 二进制文件
build:
@echo "Building binary..."
@go build -o bin/$(BINARY_NAME) ./cmd/chainsql/main.go
# 运行服务 (本地开发)
run: build
@echo "Running service..."
@./bin/$(BINARY_NAME)
# 整理依赖
tidy:
@go mod tidy
# 清理构建产物
clean:
@echo "Cleaning..."
@rm -rf bin/
# Docker 构建
docker-build:
@echo "Building Docker image..."
@docker build -t $(DOCKER_IMAGE):$(VERSION) .
@docker tag $(DOCKER_IMAGE):$(VERSION) $(DOCKER_IMAGE):latest
# Docker Compose 启动 (需先安装 docker-compose)
docker-up:
@echo "Starting services with Docker Compose..."
@docker-compose up -d
docker-down:
@echo "Stopping services..."
@docker-compose down
# 帮助信息
help:
@echo "Available commands:"
@echo " make build - Build the binary"
@echo " make run - Build and run locally"
@echo " make tidy - Run go mod tidy"
@echo " make clean - Remove binary"
@echo " make docker-build - Build Docker image"
@echo " make docker-up - Start services (DB + App)"
@echo " make docker-down - Stop services"
\ No newline at end of file
package main
import (
"context"
"log"
"time"
"chain-sql/internal/blockchain"
"chain-sql/internal/config"
"chain-sql/internal/core"
"chain-sql/internal/database"
)
func main() {
// 1. 加载配置
cfg, err := config.Load("configs/config.yaml")
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
log.Printf("Config loaded. ChainID: %d, RPC: %s", cfg.Chain.ChainID, cfg.Chain.RPCURL)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 2. 连接数据库
db, err := database.New(ctx, cfg.Database.DSN)
if err != nil {
log.Fatalf("Failed to connect to database: %v", err)
}
defer db.Close()
log.Println("Database connected and schema initialized")
// 3. 连接区块链节点
ethClient, err := blockchain.NewClient(cfg.Chain.RPCURL)
if err != nil {
log.Fatalf("Failed to connect to blockchain: %v", err)
}
defer ethClient.Close()
// 测试节点连接
blockNum, err := ethClient.BlockNumber(ctx)
if err != nil {
log.Printf("Warning: Failed to get block number (is node running?): %v", err)
} else {
log.Printf("Connected to Ethereum node. Current Block: %d", blockNum)
}
factoryListener := core.NewFactoryListener(cfg, ethClient, db)
// 使用 goroutine 启动,保持 main 不退出
go factoryListener.Start(ctx)
// 5. 初始化 Contract Manager
mgr := core.NewContractManager(db)
go mgr.StartAutoReload(ctx, 30*time.Second) // 每30秒刷新一次列表
// 6. 启动 Data Listener
dataListener := core.NewDataListener(cfg, ethClient, db, mgr)
go dataListener.Start(ctx)
// 阻塞主线程,等待信号退出
<-ctx.Done()
log.Println("Shutting down...")
log.Println("System infrastructure initialization successful!")
}
[
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"internalType": "string",
"name": "tableName",
"type": "string"
},
{
"components": [
{
"internalType": "string",
"name": "name",
"type": "string"
},
{
"internalType": "string",
"name": "sqlType",
"type": "string"
},
{
"internalType": "bool",
"name": "isPrimaryKey",
"type": "bool"
}
],
"indexed": false,
"internalType": "struct ISQLSync.ColumnDef[]",
"name": "columns",
"type": "tuple[]"
}
],
"name": "TableCreated",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"internalType": "string",
"name": "tableName",
"type": "string"
},
{
"indexed": false,
"internalType": "string[]",
"name": "columns",
"type": "string[]"
},
{
"indexed": false,
"internalType": "string[]",
"name": "values",
"type": "string[]"
}
],
"name": "DataInserted",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"internalType": "string",
"name": "tableName",
"type": "string"
},
{
"indexed": false,
"internalType": "string[]",
"name": "setColumns",
"type": "string[]"
},
{
"indexed": false,
"internalType": "string[]",
"name": "setValues",
"type": "string[]"
},
{
"indexed": false,
"internalType": "string",
"name": "whereClause",
"type": "string"
}
],
"name": "DataUpdated",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"internalType": "string",
"name": "tableName",
"type": "string"
},
{
"indexed": false,
"internalType": "string",
"name": "whereClause",
"type": "string"
}
],
"name": "DataDeleted",
"type": "event"
}
]
\ No newline at end of file
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
interface ISQLSync {
struct ColumnDef {
string name;
string sqlType;
bool isPrimaryKey;
}
// --- Events ---
event TableCreated(string tableName, ColumnDef[] columns);
event TableDropped(string tableName);
event TableAltered(string tableName, string rawSql);
event DataInserted(
string indexed tableName,
string[] columns,
string[] values
);
event DataUpdated(
string indexed tableName,
string[] setColumns,
string[] setValues,
string whereClause
);
event DataDeleted(string indexed tableName, string whereClause);
// Advanced Events
event DataBatchInserted(
string indexed tableName,
string[] columns,
string[][] values
);
event DataUpserted(
string indexed tableName,
string[] columns,
string[] values,
string conflictColumn
);
event IndexCreated(
string indexed tableName,
string indexName,
string[] columns,
bool isUnique
);
event TableTruncated(string indexed tableName);
// --- DDL Functions ---
function createTable(
string calldata tableName,
ColumnDef[] calldata columns
) external;
function dropTable(string calldata tableName) external;
function alterTable(
string calldata tableName,
string calldata rawSql
) external;
function createIndex(
string calldata tableName,
string calldata indexName,
string[] calldata columns,
bool isUnique
) external;
function truncateTable(string calldata tableName) external;
// --- DML Functions ---
function insert(
string calldata tableName,
string[] calldata columns,
string[] calldata values
) external;
function update(
string calldata tableName,
string[] calldata setColumns,
string[] calldata setValues,
string calldata whereClause
) external;
function deleteData(
string calldata tableName,
string calldata whereClause
) external;
function batchInsert(
string calldata tableName,
string[] calldata columns,
string[] calldata flatValues
) external;
function upsert(
string calldata tableName,
string[] calldata columns,
string[] calldata values,
string calldata conflictColumn
) external;
// 移除 runRawSQL
}
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
import "./ISQLSync.sol";
contract SQLSync is ISQLSync {
address public owner;
modifier onlyOwner() {
require(msg.sender == owner, "Not owner");
_;
}
// 关键修改:接收 _owner 参数
constructor(address _owner) {
owner = _owner;
}
// --- DDL ---
function createTable(
string calldata tableName,
ColumnDef[] calldata columns
) external override onlyOwner {
emit TableCreated(tableName, columns);
}
function dropTable(string calldata tableName) external override onlyOwner {
emit TableDropped(tableName);
}
function alterTable(
string calldata tableName,
string calldata rawSql
) external override onlyOwner {
emit TableAltered(tableName, rawSql);
}
function createIndex(
string calldata tableName,
string calldata indexName,
string[] calldata columns,
bool isUnique
) external override onlyOwner {
emit IndexCreated(tableName, indexName, columns, isUnique);
}
function truncateTable(
string calldata tableName
) external override onlyOwner {
emit TableTruncated(tableName);
}
// --- DML ---
function insert(
string calldata tableName,
string[] calldata columns,
string[] calldata values
) external override onlyOwner {
emit DataInserted(tableName, columns, values);
}
function update(
string calldata tableName,
string[] calldata setColumns,
string[] calldata setValues,
string calldata whereClause
) external override onlyOwner {
emit DataUpdated(tableName, setColumns, setValues, whereClause);
}
function deleteData(
string calldata tableName,
string calldata whereClause
) external override onlyOwner {
emit DataDeleted(tableName, whereClause);
}
function batchInsert(
string calldata tableName,
string[] calldata columns,
string[] calldata flatValues
) external override onlyOwner {
// 这里只是为了演示接口,实际业务逻辑可以加参数校验
emit DataBatchInserted(
tableName,
columns,
_reconstructBatch(flatValues, columns.length)
);
}
function upsert(
string calldata tableName,
string[] calldata columns,
string[] calldata values,
string calldata conflictColumn
) external override onlyOwner {
emit DataUpserted(tableName, columns, values, conflictColumn);
}
// 辅助函数:将一维数组重构为二维数组以匹配 Event 定义 (Gas 消耗较高,生产环境建议直接传 bytes)
function _reconstructBatch(
string[] memory flatValues,
uint256 colCount
) internal pure returns (string[][] memory) {
require(colCount > 0, "Col count 0");
require(
flatValues.length % colCount == 0,
"Flat values length mismatch"
);
uint256 rowCount = flatValues.length / colCount;
string[][] memory result = new string[][](rowCount);
for (uint256 i = 0; i < rowCount; i++) {
string[] memory row = new string[](colCount);
for (uint256 j = 0; j < colCount; j++) {
row[j] = flatValues[i * colCount + j];
}
result[i] = row;
}
return result;
}
}
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
import "./ISQLSync.sol";
import "./SQLSync.sol"; // 假设这是你的具体实现文件
contract SQLSyncFactory {
// 记录所有创建出来的实例
mapping(address => bool) public isValidInstance;
address[] public allInstances;
event InstanceCreated(address indexed owner, address indexed instance);
/**
* @notice 创建一个新的 SQLSync 实例
* @return instance 新合约地址
*/
function createInstance() external returns (address instance) {
// 部署新的 SQLSync 合约
// 注意:SQLSync 的构造函数需要能够接收 owner,或者在这之后 transferOwnership
SQLSync newContract = new SQLSync(msg.sender);
instance = address(newContract);
isValidInstance[instance] = true;
allInstances.push(instance);
emit InstanceCreated(msg.sender, instance);
}
/**
* @notice 获取实例总数
*/
function getInstanceCount() external view returns (uint256) {
return allInstances.length;
}
}
version: '3.8'
services:
# PostgreSQL 数据库
postgres:
image: postgres:15-alpine
container_name: chainsql-db
environment:
POSTGRES_USER: chainsql
POSTGRES_PASSWORD: password123
POSTGRES_DB: chainsql
ports:
- "5432:5432"
volumes:
- pg_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U chainsql"]
interval: 5s
timeout: 5s
retries: 5
# ChainSQL 服务
chainsql:
build: .
container_name: chainsql-app
depends_on:
postgres:
condition: service_healthy
environment:
# 覆盖 config.yaml 中的配置
CHAIN_RPC_URL: "https://rpc.ankr.com/eth" # 替换为你的 RPC
CHAIN_FACTORY_ADDRESS: "0x..." # 替换为 Factory 地址
# 使用 Docker 网络中的 DB 地址
DATABASE_DSN: "postgres://chainsql:password123@postgres:5432/chainsql?sslmode=disable"
volumes:
# 挂载本地配置 (可选,方便开发调试)
- ./configs/config.yaml:/app/configs/config.yaml
# 挂载 ABI 文件
- ./configs/abi.json:/app/configs/abi.json
restart: unless-stopped
volumes:
pg_data:
\ No newline at end of file
module chain-sql
go 1.24.0
toolchain go1.24.11
require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251001021608-1fe7b43fc4d6 // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/bits-and-blooms/bitset v1.20.0 // indirect
github.com/consensys/gnark-crypto v0.18.0 // indirect
github.com/crate-crypto/go-eth-kzg v1.4.0 // indirect
github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/ethereum/c-kzg-4844/v2 v2.1.5 // indirect
github.com/ethereum/go-ethereum v1.16.8 // indirect
github.com/ethereum/go-verkle v0.2.2 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/holiman/uint256 v1.3.2 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.8.0 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
github.com/sagikazarmark/locafero v0.11.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect
github.com/spf13/afero v1.15.0 // indirect
github.com/spf13/cast v1.10.0 // indirect
github.com/spf13/pflag v1.0.10 // indirect
github.com/spf13/viper v1.21.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/sys v0.36.0 // indirect
golang.org/x/text v0.29.0 // indirect
)
package blockchain
import (
"context"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)
// Client 定义了我们需要用到的链上操作接口,方便测试时 Mock
type Client interface {
BlockNumber(ctx context.Context) (uint64, error)
FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error)
ChainID(ctx context.Context) (*big.Int, error)
Close()
}
type EthClient struct {
client *ethclient.Client
}
func NewClient(rawurl string) (*EthClient, error) {
client, err := ethclient.Dial(rawurl)
if err != nil {
return nil, fmt.Errorf("failed to connect to eth node: %w", err)
}
return &EthClient{client: client}, nil
}
func (c *EthClient) Close() {
c.client.Close()
}
func (c *EthClient) BlockNumber(ctx context.Context) (uint64, error) {
return c.client.BlockNumber(ctx)
}
func (c *EthClient) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) {
return c.client.FilterLogs(ctx, query)
}
func (c *EthClient) ChainID(ctx context.Context) (*big.Int, error) {
return c.client.ChainID(ctx)
}
package config
import (
"strings"
"time"
"github.com/spf13/viper"
)
type Config struct {
Chain ChainConfig `mapstructure:"chain"`
Database DatabaseConfig `mapstructure:"database"`
Sync SyncConfig `mapstructure:"sync"`
}
type ChainConfig struct {
RPCURL string `mapstructure:"rpc_url"`
FactoryAddress string `mapstructure:"factory_address"`
ChainID int64 `mapstructure:"chain_id"`
}
type DatabaseConfig struct {
DSN string `mapstructure:"dsn"`
}
type SyncConfig struct {
StartBlock uint64 `mapstructure:"start_block"`
Confirmations uint64 `mapstructure:"confirmations"`
PollInterval time.Duration `mapstructure:"poll_interval"`
MaxBatchSize int `mapstructure:"max_batch_size"`
AddressBatchSize int `mapstructure:"address_batch_size"`
}
func Load(path string) (*Config, error) {
v := viper.New()
v.SetConfigFile(path)
v.AutomaticEnv()
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
if err := v.ReadInConfig(); err != nil {
return nil, err
}
var cfg Config
if err := v.Unmarshal(&cfg); err != nil {
return nil, err
}
return &cfg, nil
}
package core
import (
"context"
"fmt"
"log"
"math/big"
"os"
"strings"
"time"
"chain-sql/internal/blockchain"
"chain-sql/internal/config"
"chain-sql/internal/database"
"chain-sql/internal/executor"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
const SyncKeyData = "data_listener"
// --- Event Data Structures ---
type EvTableCreated struct {
TableName string
Columns []struct {
Name string
SqlType string
IsPrimaryKey bool
}
}
type EvDataInserted struct {
TableName string
Columns []string
Values []string
}
// --- Listener Implementation ---
type DataListener struct {
cfg *config.Config
client blockchain.Client
db *database.DB
// 修改点:使用接口,不再依赖具体实现
manager ContractProvider
executor EventHandler
contractABI abi.ABI
}
func NewDataListener(cfg *config.Config, client blockchain.Client, db *database.DB, manager ContractProvider) *DataListener {
// 动态加载 ABI
abiPath := "configs/abi.json"
abiBytes, err := os.ReadFile(abiPath)
if err != nil {
log.Fatalf("Failed to read ABI file %s: %v", abiPath, err)
}
parsedABI, err := abi.JSON(strings.NewReader(string(abiBytes)))
if err != nil {
log.Fatalf("Failed to parse ABI JSON: %v", err)
}
log.Printf("DataListener initialized with ABI containing %d events", len(parsedABI.Events))
// 创建具体的 Executor 实现,但在 Struct 中作为 EventHandler 接口存储
realExecutor := executor.NewExecutor(db)
return &DataListener{
cfg: cfg,
client: client,
db: db,
manager: manager,
executor: realExecutor, // 注入实现
contractABI: parsedABI,
}
}
func (l *DataListener) Start(ctx context.Context) {
log.Println("Starting Data Listener...")
ticker := time.NewTicker(l.cfg.Sync.PollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := l.syncBatch(ctx); err != nil {
log.Printf("Error syncing data events: %v", err)
}
}
}
}
func (l *DataListener) syncBatch(ctx context.Context) error {
contracts := l.manager.GetContracts()
if len(contracts) == 0 {
return nil
}
// 1. 获取同步游标
lastBlock, err := l.db.GetLastBlock(ctx, SyncKeyData)
if err != nil {
return fmt.Errorf("get cursor failed: %w", err)
}
if lastBlock == 0 {
lastBlock = l.cfg.Sync.StartBlock
}
fromBlock := lastBlock + 1
// 2. 获取链头
headBlock, err := l.client.BlockNumber(ctx)
if err != nil {
return fmt.Errorf("get head block failed: %w", err)
}
// 3. 计算 ToBlock
safeHead := headBlock - l.cfg.Sync.Confirmations
if fromBlock > safeHead {
return nil // 尚未达到确认高度
}
toBlock := fromBlock + uint64(l.cfg.Sync.MaxBatchSize)
if toBlock > safeHead {
toBlock = safeHead
}
// 4. 准备 Event Topics
var topics []common.Hash
for _, event := range l.contractABI.Events {
topics = append(topics, event.ID)
}
// 5. 分批轮询地址
batchSize := l.cfg.Sync.AddressBatchSize
if batchSize <= 0 {
batchSize = 50
}
log.Printf("Scanning Data: %d -> %d (%d contracts)", fromBlock, toBlock, len(contracts))
for i := 0; i < len(contracts); i += batchSize {
end := i + batchSize
if end > len(contracts) {
end = len(contracts)
}
batchAddrs := contracts[i:end]
query := ethereum.FilterQuery{
FromBlock: big.NewInt(int64(fromBlock)),
ToBlock: big.NewInt(int64(toBlock)),
Addresses: batchAddrs,
Topics: [][]common.Hash{topics},
}
logs, err := l.client.FilterLogs(ctx, query)
if err != nil {
log.Printf("FilterLogs failed for batch %d: %v", i, err)
continue
}
for _, vLog := range logs {
if err := l.processLog(ctx, vLog); err != nil {
log.Printf("Failed to process log tx=%s: %v", vLog.TxHash.Hex(), err)
}
}
}
// 6. 更新游标
return l.db.UpdateLastBlock(ctx, SyncKeyData, toBlock)
}
func (l *DataListener) processLog(ctx context.Context, vLog types.Log) error {
contractAddr := vLog.Address.Hex()
event, err := l.contractABI.EventByID(vLog.Topics[0])
if err != nil {
return nil
}
switch event.Name {
case "TableCreated":
var ev EvTableCreated
if err := l.contractABI.UnpackIntoInterface(&ev, event.Name, vLog.Data); err != nil {
return fmt.Errorf("unpack TableCreated error: %w", err)
}
// 结构体转换以适配接口定义
cols := make([]struct {
Name string
SqlType string
IsPrimaryKey bool
}, len(ev.Columns))
for i, c := range ev.Columns {
cols[i] = struct {
Name string
SqlType string
IsPrimaryKey bool
}{c.Name, c.SqlType, c.IsPrimaryKey}
}
log.Printf("[DDL] TableCreated: %s.%s", contractAddr, ev.TableName)
return l.executor.HandleTableCreated(ctx, contractAddr, ev.TableName, cols)
case "DataInserted":
var ev EvDataInserted
if err := l.contractABI.UnpackIntoInterface(&ev, event.Name, vLog.Data); err != nil {
return fmt.Errorf("unpack DataInserted error: %w", err)
}
log.Printf("[DML] DataInserted: %s.%s", contractAddr, ev.TableName)
return l.executor.HandleDataInserted(ctx, contractAddr, ev.TableName, ev.Columns, ev.Values)
default:
// 其他事件忽略
}
return nil
}
package core
import (
"context"
"fmt"
"log"
"math/big"
"time"
"chain-sql/internal/blockchain"
"chain-sql/internal/config"
"chain-sql/internal/database"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)
const (
SyncKeyFactory = "factory_listener"
)
type FactoryListener struct {
cfg *config.Config
client blockchain.Client
db *database.DB
factoryAddr common.Address
// 缓存 Event 签名
eventSig common.Hash
eventABI abi.Event
}
func NewFactoryListener(cfg *config.Config, client blockchain.Client, db *database.DB) *FactoryListener {
// 手动定义 ABI,或者使用 abigen 生成的代码。这里为了演示独立性手动定义。
// Event: InstanceCreated(address indexed owner, address indexed instance)
eventSig := crypto.Keccak256Hash([]byte("InstanceCreated(address,address)"))
return &FactoryListener{
cfg: cfg,
client: client,
db: db,
factoryAddr: common.HexToAddress(cfg.Chain.FactoryAddress),
eventSig: eventSig,
}
}
func (l *FactoryListener) Start(ctx context.Context) {
log.Println("Starting Factory Listener...")
ticker := time.NewTicker(l.cfg.Sync.PollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := l.syncOnce(ctx); err != nil {
log.Printf("Error syncing factory events: %v", err)
}
}
}
}
func (l *FactoryListener) syncOnce(ctx context.Context) error {
// 1. 获取上次同步高度
lastBlock, err := l.db.GetLastBlock(ctx, SyncKeyFactory)
if err != nil {
return fmt.Errorf("get cursor failed: %w", err)
}
// 2. 如果是从 0 开始,使用配置的 StartBlock
if lastBlock == 0 {
lastBlock = l.cfg.Sync.StartBlock
}
fromBlock := lastBlock + 1
// 3. 获取当前链高度
headBlock, err := l.client.BlockNumber(ctx)
if err != nil {
return fmt.Errorf("get head block failed: %w", err)
}
// 4. 计算 ToBlock (考虑确认数和BatchSize)
safeHead := headBlock - l.cfg.Sync.Confirmations
if fromBlock > safeHead {
return nil // 还没到安全高度
}
toBlock := fromBlock + uint64(l.cfg.Sync.MaxBatchSize)
if toBlock > safeHead {
toBlock = safeHead
}
log.Printf("Scanning Factory: %d -> %d", fromBlock, toBlock)
// 5. 拉取日志
query := ethereum.FilterQuery{
FromBlock: big.NewInt(int64(fromBlock)),
ToBlock: big.NewInt(int64(toBlock)),
Addresses: []common.Address{l.factoryAddr},
Topics: [][]common.Hash{{l.eventSig}},
}
logs, err := l.client.FilterLogs(ctx, query)
if err != nil {
return fmt.Errorf("filter logs failed: %w", err)
}
// 6. 处理日志
for _, vLog := range logs {
if len(vLog.Topics) < 3 {
continue // 异常日志
}
// topic[0] is signature, topic[1] is owner (indexed), topic[2] is instance (indexed)
owner := common.HexToAddress(vLog.Topics[1].Hex())
instance := common.HexToAddress(vLog.Topics[2].Hex())
log.Printf("New Instance Found! Address: %s, Owner: %s", instance.Hex(), owner.Hex())
// 存入 DB
if err := l.db.RegisterInstance(ctx, instance.Hex(), owner.Hex(), vLog.BlockNumber); err != nil {
log.Printf("Failed to register instance %s: %v", instance.Hex(), err)
// 这里策略:如果是 DB 错误,应该 retry 或 return err 阻止 cursor 更新
// 简单起见,这里 return err
return err
}
}
// 7. 更新游标
if err := l.db.UpdateLastBlock(ctx, SyncKeyFactory, toBlock); err != nil {
return fmt.Errorf("update cursor failed: %w", err)
}
return nil
}
package core
import (
"context"
"github.com/ethereum/go-ethereum/common"
)
// EventHandler 定义了处理业务逻辑(SQL执行)的接口
// 这使得 DataListener 不需要关心底层是 Postgres 还是 MySQL,也方便 Mock 测试
type EventHandler interface {
EnsureSchema(ctx context.Context, contractAddr string) error
HandleTableCreated(ctx context.Context, contractAddr string, tableName string, columns []struct {
Name string
SqlType string
IsPrimaryKey bool
}) error
HandleDataInserted(ctx context.Context, contractAddr string, tableName string, columns []string, values []string) error
}
// ContractProvider 定义了获取目标合约列表的接口
type ContractProvider interface {
GetContracts() []common.Address
}
package core
import (
"context"
"log"
"sync"
"time"
"chain-sql/internal/database"
"github.com/ethereum/go-ethereum/common"
)
type ContractManager struct {
db *database.DB
contracts []common.Address
mu sync.RWMutex
}
func NewContractManager(db *database.DB) *ContractManager {
return &ContractManager{
db: db,
}
}
// Reload 从数据库重新加载活跃合约列表
func (m *ContractManager) Reload(ctx context.Context) error {
addrs, err := m.db.GetActiveInstances(ctx)
if err != nil {
return err
}
m.mu.Lock()
defer m.mu.Unlock()
// 转换 string -> common.Address
m.contracts = make([]common.Address, len(addrs))
for i, addr := range addrs {
m.contracts[i] = common.HexToAddress(addr)
}
log.Printf("Loaded %d active contracts", len(m.contracts))
return nil
}
// GetContracts 获取当前缓存的合约列表
func (m *ContractManager) GetContracts() []common.Address {
m.mu.RLock()
defer m.mu.RUnlock()
// 返回副本以防并发问题
result := make([]common.Address, len(m.contracts))
copy(result, m.contracts)
return result
}
// StartAutoReload 周期性刷新缓存(例如每分钟,或者由 FactoryListener 触发)
func (m *ContractManager) StartAutoReload(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
// 启动时先加载一次
m.Reload(ctx)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := m.Reload(ctx); err != nil {
log.Printf("Error reloading contracts: %v", err)
}
}
}
}
package database
import (
"context"
"fmt"
"log"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
type DB struct {
pool *pgxpool.Pool
}
func New(ctx context.Context, dsn string) (*DB, error) {
config, err := pgxpool.ParseConfig(dsn)
if err != nil {
return nil, fmt.Errorf("failed to parse dsn: %w", err)
}
pool, err := pgxpool.NewWithConfig(ctx, config)
if err != nil {
return nil, fmt.Errorf("failed to connect to db: %w", err)
}
if err := pool.Ping(ctx); err != nil {
return nil, fmt.Errorf("failed to ping db: %w", err)
}
db := &DB{pool: pool}
if err := db.initSchema(ctx); err != nil {
return nil, err
}
return db, nil
}
func (d *DB) Close() {
d.pool.Close()
}
// Exec 执行通用 SQL
func (d *DB) Exec(ctx context.Context, sql string, args ...interface{}) error {
_, err := d.pool.Exec(ctx, sql, args...)
return err
}
// initSchema 初始化系统表
func (d *DB) initSchema(ctx context.Context) error {
// 1. 合约实例注册表
// 2. 游标记录表
sql := `
CREATE TABLE IF NOT EXISTS _chainsql_instances (
contract_address VARCHAR(42) PRIMARY KEY,
owner_address VARCHAR(42),
created_at_block BIGINT,
status VARCHAR(20) DEFAULT 'active'
);
CREATE TABLE IF NOT EXISTS _chainsql_cursor (
sync_key VARCHAR(50) PRIMARY KEY,
last_block BIGINT,
updated_at TIMESTAMP
);
`
_, err := d.pool.Exec(ctx, sql)
if err != nil {
return fmt.Errorf("failed to init schema: %w", err)
}
log.Println("Database schema initialized")
return nil
}
// RegisterInstance 注册新发现的合约实例
func (d *DB) RegisterInstance(ctx context.Context, address string, owner string, blockNumber uint64) error {
sql := `
INSERT INTO _chainsql_instances (contract_address, owner_address, created_at_block, status)
VALUES ($1, $2, $3, 'active')
ON CONFLICT (contract_address) DO NOTHING
`
_, err := d.pool.Exec(ctx, sql, address, owner, blockNumber)
return err
}
// GetLastBlock 获取指定 key 的同步高度
func (d *DB) GetLastBlock(ctx context.Context, key string) (uint64, error) {
var blockNum uint64
sql := `SELECT last_block FROM _chainsql_cursor WHERE sync_key = $1`
err := d.pool.QueryRow(ctx, sql, key).Scan(&blockNum)
if err == pgx.ErrNoRows {
return 0, nil
}
if err != nil {
return 0, err
}
return blockNum, nil
}
// UpdateLastBlock 更新同步高度
func (d *DB) UpdateLastBlock(ctx context.Context, key string, blockNum uint64) error {
sql := `
INSERT INTO _chainsql_cursor (sync_key, last_block, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (sync_key)
DO UPDATE SET last_block = $2, updated_at = NOW()
`
_, err := d.pool.Exec(ctx, sql, key, blockNum)
return err
}
func (d *DB) GetActiveInstances(ctx context.Context) ([]string, error) {
sql := `SELECT contract_address FROM _chainsql_instances WHERE status = 'active'`
rows, err := d.pool.Query(ctx, sql)
if err != nil {
return nil, err
}
defer rows.Close()
var addrs []string
for rows.Next() {
var addr string
if err := rows.Scan(&addr); err != nil {
return nil, err
}
addrs = append(addrs, addr)
}
return addrs, nil
}
package executor
import (
"context"
"fmt"
"log"
"strings"
"chain-sql/internal/database"
)
// SQL 类型白名单 (PostgreSQL 常用类型)
// 凡是不在这个列表里的类型定义,HandleTableCreated 都会拒绝执行
var allowedSqlTypes = map[string]bool{
"int": true, "integer": true, "bigint": true, "smallint": true,
"text": true, "varchar": true, "char": true, "character varying": true,
"boolean": true, "bool": true,
"decimal": true, "numeric": true, "real": true, "double precision": true,
"timestamp": true, "date": true, "time": true,
"json": true, "jsonb": true,
"bytea": true,
}
type Executor struct {
db *database.DB
}
func NewExecutor(db *database.DB) *Executor {
return &Executor{db: db}
}
// validateSqlType 校验 SQL 类型是否合法
func validateSqlType(t string) bool {
// 移除长度修饰符,例如 "VARCHAR(50)" -> "varchar"
parts := strings.Split(t, "(")
baseType := strings.TrimSpace(strings.ToLower(parts[0]))
return allowedSqlTypes[baseType]
}
// EnsureSchema 为每个合约创建独立的 Schema
func (e *Executor) EnsureSchema(ctx context.Context, contractAddr string) error {
schemaName := QuoteIdentifier(contractAddr)
sql := fmt.Sprintf(`CREATE SCHEMA IF NOT EXISTS %s`, schemaName)
return e.db.Exec(ctx, sql)
}
// HandleTableCreated 处理建表事件 (含安全校验)
func (e *Executor) HandleTableCreated(ctx context.Context, contractAddr string, tableName string, columns []struct {
Name string
SqlType string
IsPrimaryKey bool
}) error {
schemaName := QuoteIdentifier(contractAddr)
safeTableName := QuoteIdentifier(tableName)
var colDefs []string
var pks []string
for _, col := range columns {
// --- 安全检查核心逻辑 ---
if !validateSqlType(col.SqlType) {
return fmt.Errorf("security error: invalid or forbidden sql type '%s' for column '%s'", col.SqlType, col.Name)
}
// ---------------------
def := fmt.Sprintf("%s %s", QuoteIdentifier(col.Name), col.SqlType)
colDefs = append(colDefs, def)
if col.IsPrimaryKey {
pks = append(pks, QuoteIdentifier(col.Name))
}
}
if len(pks) > 0 {
pkDef := fmt.Sprintf("PRIMARY KEY (%s)", strings.Join(pks, ", "))
colDefs = append(colDefs, pkDef)
}
fullSql := fmt.Sprintf(
`CREATE TABLE IF NOT EXISTS %s.%s (%s)`,
schemaName,
safeTableName,
strings.Join(colDefs, ", "),
)
log.Printf("Executing DDL: %s", fullSql)
return e.db.Exec(ctx, fullSql)
}
// HandleDataInserted 处理插入事件
func (e *Executor) HandleDataInserted(ctx context.Context, contractAddr string, tableName string, columns []string, values []string) error {
schemaName := QuoteIdentifier(contractAddr)
safeTableName := QuoteIdentifier(tableName)
// 构建占位符 ($1, $2...)
placeholders := make([]string, len(values))
args := make([]interface{}, len(values))
safeCols := make([]string, len(columns))
for i, v := range values {
placeholders[i] = fmt.Sprintf("$%d", i+1)
args[i] = v
}
for i, c := range columns {
safeCols[i] = QuoteIdentifier(c)
}
sql := fmt.Sprintf(
`INSERT INTO %s.%s (%s) VALUES (%s)`,
schemaName,
safeTableName,
strings.Join(safeCols, ", "),
strings.Join(placeholders, ", "),
)
return e.db.Exec(ctx, sql, args...)
}
// QuoteIdentifier 对标识符加双引号,防止 SQL 注入和关键字冲突
func QuoteIdentifier(s string) string {
return fmt.Sprintf(`"%s"`, strings.ReplaceAll(s, `"`, `""`))
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment