wd-rating/utils/sal_utils/litesql.py

81 lines
2.6 KiB
Python
Raw Permalink Normal View History

2023-10-07 15:24:29 +08:00
from pyparsing import *
from sqlalchemy import and_, or_
from mods.user.mods.user.models import User
# 定义词法规则
operand = Word(alphanums) | quotedString.setParseAction(removeQuotes)
integer = Word(nums).setParseAction(lambda t: int(t[0]))
floating = Combine(Word(nums) + '.' + Word(nums)).setParseAction(lambda t: float(t[0]))
none = CaselessKeyword("None").setParseAction(lambda t: None)
boolean = (CaselessKeyword("True") | CaselessKeyword("False")).setParseAction(lambda t: t[0] == "True")
value = operand | integer | floating | none | boolean
array = Group(Suppress('[') + delimitedList(value, delim=',') + Suppress(']'))
operator = oneOf("< = > != like in")
comparison = Group(operand + operator + (value | array))
# 定义解析动作
def convert_to_sqlalchemy_comparison(tokens):
field, op, value = tokens[0]
if op == '<':
return ("condition", getattr(User, field).__lt__(value))
elif op == '>':
return ("condition", getattr(User, field).__gt__(value))
elif op == '=':
return ("condition", getattr(User, field).__eq__(value))
elif op == '!=':
return ("condition", getattr(User, field).__ne__(value))
elif op == 'like':
return ("condition", getattr(User, field).like(value))
elif op == 'in':
return ("condition", getattr(User, field).in_(value))
else:
raise Exception(f"Unknown operator: {op}")
comparison.setParseAction(convert_to_sqlalchemy_comparison)
# 定义解析动作
def combine_and(s, l, t):
return ("and", t[0][::2])
def combine_or(s, l, t):
return ("or", t[0][::2])
# 定义语法规则
and__ = CaselessKeyword("and")
or__ = CaselessKeyword("or")
condition = infixNotation(comparison,
[(and__, 2, opAssoc.LEFT, combine_and),
(or__, 2, opAssoc.LEFT, combine_or)])
from sqlalchemy import and_, or_
def build_query(parsed):
if isinstance(parsed, tuple) and len(parsed) == 2:
operator, conditions = parsed
if operator == "and":
return and_(*[build_query(condition) for condition in conditions])
elif operator == "or":
return or_(*[build_query(condition) for condition in conditions])
else:
raise Exception(f"Unknown operator: {operator}")
elif isinstance(parsed, tuple) and parsed[0] == "condition":
return parsed[1]
else:
raise Exception(f"Unknown condition: {parsed}")
# 测试
# 测试解析
parsed = condition.parseString(
"name like 'lili' or name like 'lina' or (phone < 50 and phone > 10) and email in ['london',1,1.2,None,False,True] and id = 1")
print(parsed)
query = build_query(parsed)
print(query)