Commit 37809aef authored by duanjinfei's avatar duanjinfei

feat: reslove sql parse

parent 95490f6f
...@@ -9,30 +9,38 @@ ...@@ -9,30 +9,38 @@
"type": "string" "type": "string"
}, },
{ {
"components": [
{
"internalType": "string",
"name": "name",
"type": "string"
},
{
"internalType": "string",
"name": "sqlType",
"type": "string"
},
{
"internalType": "bool",
"name": "isPrimaryKey",
"type": "bool"
}
],
"indexed": false, "indexed": false,
"internalType": "struct ISQLSync.ColumnDef[]", "internalType": "string[]",
"name": "columns", "name": "columns",
"type": "tuple[]" "type": "string[]"
},
{
"indexed": false,
"internalType": "string[][]",
"name": "values",
"type": "string[][]"
} }
], ],
"name": "TableCreated", "name": "DataBatchInserted",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"internalType": "string",
"name": "tableName",
"type": "string"
},
{
"indexed": false,
"internalType": "string",
"name": "whereClause",
"type": "string"
}
],
"name": "DataDeleted",
"type": "event" "type": "event"
}, },
{ {
...@@ -100,25 +108,43 @@ ...@@ -100,25 +108,43 @@
"name": "tableName", "name": "tableName",
"type": "string" "type": "string"
}, },
{
"indexed": false,
"internalType": "string[]",
"name": "columns",
"type": "string[]"
},
{
"indexed": false,
"internalType": "string[]",
"name": "values",
"type": "string[]"
},
{ {
"indexed": false, "indexed": false,
"internalType": "string", "internalType": "string",
"name": "whereClause", "name": "conflictColumn",
"type": "string" "type": "string"
} }
], ],
"name": "DataDeleted", "name": "DataUpserted",
"type": "event" "type": "event"
}, },
{ {
"anonymous": false, "anonymous": false,
"inputs": [ "inputs": [
{ {
"indexed": true, "indexed": false,
"internalType": "string", "internalType": "string",
"name": "tableName", "name": "tableName",
"type": "string" "type": "string"
}, },
{
"indexed": false,
"internalType": "string",
"name": "indexName",
"type": "string"
},
{ {
"indexed": false, "indexed": false,
"internalType": "string[]", "internalType": "string[]",
...@@ -127,12 +153,93 @@ ...@@ -127,12 +153,93 @@
}, },
{ {
"indexed": false, "indexed": false,
"internalType": "string[][]", "internalType": "bool",
"name": "values", "name": "isUnique",
"type": "string[][]" "type": "bool"
} }
], ],
"name": "DataBatchInserted", "name": "IndexCreated",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"internalType": "string",
"name": "tableName",
"type": "string"
},
{
"indexed": false,
"internalType": "string",
"name": "rawSql",
"type": "string"
}
],
"name": "TableAltered",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"internalType": "string",
"name": "tableName",
"type": "string"
},
{
"components": [
{
"internalType": "string",
"name": "name",
"type": "string"
},
{
"internalType": "string",
"name": "sqlType",
"type": "string"
},
{
"internalType": "bool",
"name": "isPrimaryKey",
"type": "bool"
}
],
"indexed": false,
"internalType": "struct ISQLSync.ColumnDef[]",
"name": "columns",
"type": "tuple[]"
}
],
"name": "TableCreated",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"internalType": "string",
"name": "tableName",
"type": "string"
}
],
"name": "TableDropped",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"internalType": "string",
"name": "tableName",
"type": "string"
}
],
"name": "TableTruncated",
"type": "event" "type": "event"
} }
] ]
\ No newline at end of file
...@@ -13,37 +13,37 @@ interface ISQLSync { ...@@ -13,37 +13,37 @@ interface ISQLSync {
event TableDropped(string tableName); event TableDropped(string tableName);
event TableAltered(string tableName, string rawSql); event TableAltered(string tableName, string rawSql);
event DataInserted( event DataInserted(
string indexed tableName, string tableName,
string[] columns, string[] columns,
string[] values string[] values
); );
event DataUpdated( event DataUpdated(
string indexed tableName, string tableName,
string[] setColumns, string[] setColumns,
string[] setValues, string[] setValues,
string whereClause string whereClause
); );
event DataDeleted(string indexed tableName, string whereClause); event DataDeleted(string tableName, string whereClause);
// Advanced Events // Advanced Events
event DataBatchInserted( event DataBatchInserted(
string indexed tableName, string tableName,
string[] columns, string[] columns,
string[][] values string[][] values
); );
event DataUpserted( event DataUpserted(
string indexed tableName, string tableName,
string[] columns, string[] columns,
string[] values, string[] values,
string conflictColumn string conflictColumn
); );
event IndexCreated( event IndexCreated(
string indexed tableName, string tableName,
string indexName, string indexName,
string[] columns, string[] columns,
bool isUnique bool isUnique
); );
event TableTruncated(string indexed tableName); event TableTruncated(string tableName);
// --- DDL Functions --- // --- DDL Functions ---
function createTable( function createTable(
......
...@@ -13,37 +13,37 @@ interface ISQLSync { ...@@ -13,37 +13,37 @@ interface ISQLSync {
event TableDropped(string tableName); event TableDropped(string tableName);
event TableAltered(string tableName, string rawSql); event TableAltered(string tableName, string rawSql);
event DataInserted( event DataInserted(
string indexed tableName, string tableName,
string[] columns, string[] columns,
string[] values string[] values
); );
event DataUpdated( event DataUpdated(
string indexed tableName, string tableName,
string[] setColumns, string[] setColumns,
string[] setValues, string[] setValues,
string whereClause string whereClause
); );
event DataDeleted(string indexed tableName, string whereClause); event DataDeleted(string tableName, string whereClause);
// Advanced Events // Advanced Events
event DataBatchInserted( event DataBatchInserted(
string indexed tableName, string tableName,
string[] columns, string[] columns,
string[][] values string[][] values
); );
event DataUpserted( event DataUpserted(
string indexed tableName, string tableName,
string[] columns, string[] columns,
string[] values, string[] values,
string conflictColumn string conflictColumn
); );
event IndexCreated( event IndexCreated(
string indexed tableName, string tableName,
string indexName, string indexName,
string[] columns, string[] columns,
bool isUnique bool isUnique
); );
event TableTruncated(string indexed tableName); event TableTruncated(string tableName);
// --- DDL Functions --- // --- DDL Functions ---
function createTable( function createTable(
......
...@@ -138,7 +138,14 @@ func (l *DataListener) syncBatch(ctx context.Context) error { ...@@ -138,7 +138,14 @@ func (l *DataListener) syncBatch(ctx context.Context) error {
return fmt.Errorf("get head block failed: %w", err) return fmt.Errorf("get head block failed: %w", err)
} }
// 3. 计算 ToBlock // 3. ⭐ 防止 uint64 下溢
if headBlock < l.cfg.Sync.Confirmations {
log.Printf("Chain too young (head: %d, need %d confirmations), waiting...",
headBlock, l.cfg.Sync.Confirmations)
return nil
}
// 4. 计算 ToBlock
safeHead := headBlock - l.cfg.Sync.Confirmations safeHead := headBlock - l.cfg.Sync.Confirmations
if fromBlock > safeHead { if fromBlock > safeHead {
return nil // 尚未达到确认高度 return nil // 尚未达到确认高度
...@@ -149,13 +156,13 @@ func (l *DataListener) syncBatch(ctx context.Context) error { ...@@ -149,13 +156,13 @@ func (l *DataListener) syncBatch(ctx context.Context) error {
toBlock = safeHead toBlock = safeHead
} }
// 4. 准备 Event Topics // 5. 准备 Event Topics
var topics []common.Hash var topics []common.Hash
for _, event := range l.contractABI.Events { for _, event := range l.contractABI.Events {
topics = append(topics, event.ID) topics = append(topics, event.ID)
} }
// 5. 分批轮询地址 // 6. 分批轮询地址
batchSize := l.cfg.Sync.AddressBatchSize batchSize := l.cfg.Sync.AddressBatchSize
if batchSize <= 0 { if batchSize <= 0 {
batchSize = 50 batchSize = 50
...@@ -163,6 +170,9 @@ func (l *DataListener) syncBatch(ctx context.Context) error { ...@@ -163,6 +170,9 @@ func (l *DataListener) syncBatch(ctx context.Context) error {
log.Printf("Scanning Data: %d -> %d (%d contracts)", fromBlock, toBlock, len(contracts)) log.Printf("Scanning Data: %d -> %d (%d contracts)", fromBlock, toBlock, len(contracts))
// ⭐ 收集所有错误,确保数据完整性
var errors []error
for i := 0; i < len(contracts); i += batchSize { for i := 0; i < len(contracts); i += batchSize {
end := i + batchSize end := i + batchSize
if end > len(contracts) { if end > len(contracts) {
...@@ -180,17 +190,24 @@ func (l *DataListener) syncBatch(ctx context.Context) error { ...@@ -180,17 +190,24 @@ func (l *DataListener) syncBatch(ctx context.Context) error {
logs, err := l.client.FilterLogs(ctx, query) logs, err := l.client.FilterLogs(ctx, query)
if err != nil { if err != nil {
log.Printf("FilterLogs failed for batch %d: %v", i, err) log.Printf("FilterLogs failed for batch %d: %v", i, err)
errors = append(errors, fmt.Errorf("batch %d: %w", i, err))
continue continue
} }
for _, vLog := range logs { for _, vLog := range logs {
if err := l.processLog(ctx, vLog); err != nil { if err := l.processLog(ctx, vLog); err != nil {
log.Printf("Failed to process log tx=%s: %v", vLog.TxHash.Hex(), err) log.Printf("Failed to process log tx=%s: %v", vLog.TxHash.Hex(), err)
errors = append(errors, fmt.Errorf("tx %s: %w", vLog.TxHash.Hex(), err))
} }
} }
} }
// 6. 更新游标 // 7. ⭐ 只有全部成功才更新游标
if len(errors) > 0 {
return fmt.Errorf("failed to process %d logs, will retry next time: %v", len(errors), errors[0])
}
// 8. 更新游标
return l.db.UpdateLastBlock(ctx, SyncKeyData, toBlock) return l.db.UpdateLastBlock(ctx, SyncKeyData, toBlock)
} }
......
...@@ -152,9 +152,26 @@ func (e *Executor) HandleDataUpdated(ctx context.Context, contractAddr string, t ...@@ -152,9 +152,26 @@ func (e *Executor) HandleDataUpdated(ctx context.Context, contractAddr string, t
} }
// 解析 WHERE 子句 // 解析 WHERE 子句
// 首先需要获取表的列信息来构建白名单 // ⭐ 使用 setColumns 作为基础,但需要包含可能在 WHERE 中使用的其他列
// 为了简化,这里使用所有 setColumns 作为允许的列(实际应该查询表结构) // 临时方案:允许所有常见列名,生产环境应该查询表结构
allowedCols := append([]string{}, setColumns...) allowedCols := make([]string, 0)
allowedCols = append(allowedCols, setColumns...)
// 添加常见的列名
commonCols := []string{"id", "name", "status", "email", "price", "stock", "created_at", "updated_at"}
for _, col := range commonCols {
// 避免重复
found := false
for _, existing := range allowedCols {
if strings.EqualFold(existing, col) {
found = true
break
}
}
if !found {
allowedCols = append(allowedCols, col)
}
}
parser := NewWhereParser(allowedCols) parser := NewWhereParser(allowedCols)
whereCond, err := parser.Parse(whereClause) whereCond, err := parser.Parse(whereClause)
...@@ -162,11 +179,16 @@ func (e *Executor) HandleDataUpdated(ctx context.Context, contractAddr string, t ...@@ -162,11 +179,16 @@ func (e *Executor) HandleDataUpdated(ctx context.Context, contractAddr string, t
return fmt.Errorf("parse WHERE clause failed: %w", err) return fmt.Errorf("parse WHERE clause failed: %w", err)
} }
// 调整参数占位符编号 // ⭐ 调整 WHERE 子句的参数占位符编号
// WHERE 解析器生成的占位符从 $1 开始,但我们需要将它们调整为从 len(setValues)+1 开始
adjustedWhereSQL := whereCond.SQL adjustedWhereSQL := whereCond.SQL
for i := len(args); i > 0; i-- { setParamCount := len(setValues)
placeholder := fmt.Sprintf("$%d", i-len(args)+len(setValues))
adjustedWhereSQL = strings.Replace(adjustedWhereSQL, fmt.Sprintf("$%d", i), placeholder, -1) // 从后往前替换,避免冲突(例如 $10 被误替换为 $1)
for i := len(whereCond.Args); i >= 1; i-- {
oldPlaceholder := fmt.Sprintf("$%d", i)
newPlaceholder := fmt.Sprintf("$%d", setParamCount+i)
adjustedWhereSQL = strings.ReplaceAll(adjustedWhereSQL, oldPlaceholder, newPlaceholder)
} }
// 追加 WHERE 参数 // 追加 WHERE 参数
...@@ -190,9 +212,9 @@ func (e *Executor) HandleDataDeleted(ctx context.Context, contractAddr string, t ...@@ -190,9 +212,9 @@ func (e *Executor) HandleDataDeleted(ctx context.Context, contractAddr string, t
safeTableName := QuoteIdentifier(tableName) safeTableName := QuoteIdentifier(tableName)
// 解析 WHERE 子句 // 解析 WHERE 子句
// 注意:这里需要表的列信息,简化处理,允许常见列名 // ⭐ 允许所有常见列名,生产环境应该查询 information_schema 获取实际列
// 生产环境应该查询 information_schema 获取实际列 commonCols := []string{"id", "name", "status", "email", "price", "stock", "created_at", "updated_at", "type", "category"}
parser := NewWhereParser([]string{"id", "name", "status", "email", "created_at", "updated_at"}) parser := NewWhereParser(commonCols)
whereCond, err := parser.Parse(whereClause) whereCond, err := parser.Parse(whereClause)
if err != nil { if err != nil {
......
...@@ -98,6 +98,8 @@ func (p *WhereParser) Parse(whereClause string) (*WhereCondition, error) { ...@@ -98,6 +98,8 @@ func (p *WhereParser) Parse(whereClause string) (*WhereCondition, error) {
} }
// tokenize 简单的词法分析 // tokenize 简单的词法分析
// 目标:只在 AND/OR 处分割,保持条件表达式完整
// 例如: "stock < 20 AND status = 'available'" -> ["stock < 20", "AND", "status = 'available'"]
func (p *WhereParser) tokenize(input string) []string { func (p *WhereParser) tokenize(input string) []string {
var tokens []string var tokens []string
var current strings.Builder var current strings.Builder
...@@ -105,9 +107,12 @@ func (p *WhereParser) tokenize(input string) []string { ...@@ -105,9 +107,12 @@ func (p *WhereParser) tokenize(input string) []string {
quoteChar := rune(0) quoteChar := rune(0)
parenDepth := 0 parenDepth := 0
for i, ch := range input { i := 0
switch { for i < len(input) {
case (ch == '\'' || ch == '"') && (i == 0 || input[i-1] != '\\'): ch := rune(input[i])
// 处理引号
if (ch == '\'' || ch == '"') && (i == 0 || input[i-1] != '\\') {
if !inQuote { if !inQuote {
inQuote = true inQuote = true
quoteChar = ch quoteChar = ch
...@@ -116,44 +121,89 @@ func (p *WhereParser) tokenize(input string) []string { ...@@ -116,44 +121,89 @@ func (p *WhereParser) tokenize(input string) []string {
quoteChar = 0 quoteChar = 0
} }
current.WriteRune(ch) current.WriteRune(ch)
i++
continue
}
case ch == '(' && !inQuote: // 在引号内,直接添加
if current.Len() > 0 { if inQuote {
tokens = append(tokens, current.String()) current.WriteRune(ch)
current.Reset() i++
} continue
tokens = append(tokens, "(") }
parenDepth++
case ch == ')' && !inQuote: // 处理括号
if current.Len() > 0 { if ch == '(' {
tokens = append(tokens, current.String()) current.WriteRune(ch)
current.Reset() parenDepth++
} i++
tokens = append(tokens, ")") continue
}
if ch == ')' {
current.WriteRune(ch)
parenDepth-- parenDepth--
i++
continue
}
case ch == ' ' && !inQuote && parenDepth == 0: // 在括号内,直接添加
if current.Len() > 0 { if parenDepth > 0 {
word := current.String() current.WriteRune(ch)
upperWord := strings.ToUpper(word) i++
continue
}
// 检查是否是逻辑操作符 // ⭐ 检查是否是 AND 或 OR(只在括号外且非引号内)
if upperWord == "AND" || upperWord == "OR" { if i+3 <= len(input) && strings.ToUpper(input[i:i+3]) == "AND" {
tokens = append(tokens, word) // 确保 AND 是独立的词(前后是空格或边界)
isWord := (i == 0 || input[i-1] == ' ') &&
(i+3 == len(input) || input[i+3] == ' ')
if isWord {
// 保存当前 token
if current.Len() > 0 {
tokens = append(tokens, strings.TrimSpace(current.String()))
current.Reset() current.Reset()
} else {
current.WriteRune(ch)
} }
// 添加 AND
tokens = append(tokens, "AND")
i += 3
// 跳过 AND 后的空格
for i < len(input) && input[i] == ' ' {
i++
}
continue
} }
}
default: if i+2 <= len(input) && strings.ToUpper(input[i:i+2]) == "OR" {
current.WriteRune(ch) // 确保 OR 是独立的词
isWord := (i == 0 || input[i-1] == ' ') &&
(i+2 == len(input) || input[i+2] == ' ')
if isWord {
// 保存当前 token
if current.Len() > 0 {
tokens = append(tokens, strings.TrimSpace(current.String()))
current.Reset()
}
// 添加 OR
tokens = append(tokens, "OR")
i += 2
// 跳过 OR 后的空格
for i < len(input) && input[i] == ' ' {
i++
}
continue
}
} }
// 其他字符直接添加
current.WriteRune(ch)
i++
} }
// 添加最后一个 token
if current.Len() > 0 { if current.Len() > 0 {
tokens = append(tokens, current.String()) tokens = append(tokens, strings.TrimSpace(current.String()))
} }
return tokens return tokens
......
#!/bin/bash
# scripts/clean-schemas.sh
echo "🗑️ Cleaning ChainSQL schemas..."
# 删除所有 0x 开头的 schema
docker exec chainsql-db psql -U chainsql -d chainsql -c \
"DO \$\$
DECLARE
r RECORD;
BEGIN
FOR r IN SELECT schema_name FROM information_schema.schemata WHERE schema_name LIKE '0x%'
LOOP
RAISE NOTICE 'Dropping schema: %', r.schema_name;
EXECUTE 'DROP SCHEMA IF EXISTS \"' || r.schema_name || '\" CASCADE';
END LOOP;
END \$\$;"
echo "✅ All ChainSQL schemas cleaned!"
# 重置游标
sh scripts/reset-db.sh
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment