Skip to content

Commit b9eff44

Browse files
authoredFeb 24, 2025··
Merge pull request #1 from LeadsiftAI/feature.code_review_base
code-review基础方法
2 parents 223f3cd + 9321341 commit b9eff44

File tree

8 files changed

+222
-26
lines changed

8 files changed

+222
-26
lines changed
 

‎ai_review/analyzers/ast_parser.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,7 @@ def visit_Assign(self, node):
2121
# 检测未使用变量
2222
if isinstance(node.targets[0].ctx, ast.Store):
2323
if not any(ref.id == node.targets[0].id for ref in self.references):
24-
self.findings.append(f"未使用变量: {node.targets[0].id}")
24+
self.findings.append(f"未使用变量: {node.targets[0].id}")
25+
26+
def visit_Assert(self, node: ast.Assert) -> ast.Any:
27+
return super().visit_Assert(node)

‎ai_review/cli/main.py

+49-22
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,54 @@
1-
from analyzers.ast_parser import ASTAnalyzer
2-
from rules import style, security
1+
# from analyzers.ast_parser import ASTAnalyzer
2+
# from rules import style, security
33

4-
def code_review(file_path):
5-
# AST解析
6-
with open(file_path) as f:
7-
tree = ast.parse(f.read())
4+
# def code_review(file_path):
5+
# # AST解析
6+
# with open(file_path) as f:
7+
# tree = ast.parse(f.read())
8+
9+
# # 执行静态分析
10+
# analyzer = ASTAnalyzer()
11+
# analyzer.visit(tree)
12+
13+
# # 应用规则检查
14+
# style_violations = StyleRules.check_naming_convention(tree)
15+
# security_issues = SecurityRules.check_injection(tree)
816

9-
# 执行静态分析
10-
analyzer = ASTAnalyzer()
11-
analyzer.visit(tree)
17+
# # AI增强审查
18+
# if config['ai']['enable']:
19+
# ai_reviewer = AICodeReviewer(config['ai']['api_key'])
20+
# ai_suggestions = ai_reviewer.get_optimization_suggestion(code_snippet)
21+
22+
# # 生成报告
23+
# generate_report({
24+
# 'static_analysis': analyzer.findings,
25+
# 'style_violations': style_violations,
26+
# 'ai_suggestions': ai_suggestions
27+
# })
28+
import click
29+
from ai_review.core.analyzer import CodeAnalyzer
30+
from ai_review.core.model_adapter import AIModelHandler
31+
32+
@click.command()
33+
@click.argument('file_path')
34+
@click.option('--ai', is_flag=True, help='启用AI增强审查')
35+
def review(file_path: str, ai: bool):
36+
"""执行代码审查流水线"""
37+
with open(file_path) as f:
38+
code = f.read()
1239

13-
# 应用规则检查
14-
style_violations = StyleRules.check_naming_convention(tree)
15-
security_issues = SecurityRules.check_injection(tree)
40+
# 静态规则审查
41+
analyzer = CodeAnalyzer(['ai_review.rules.security_rules'])
42+
findings = analyzer.analyze(code)
1643

17-
# AI增强审查
18-
if config['ai']['enable']:
19-
ai_reviewer = AICodeReviewer(config['ai']['api_key'])
20-
ai_suggestions = ai_reviewer.get_optimization_suggestion(code_snippet)
44+
# AI增强分析
45+
if ai:
46+
ai_handler = AIModelHandler()
47+
ai_comment = ai_handler.get_code_review(code, {'findings': findings})
48+
click.echo(f"\nAI审查建议:\n{ai_comment}")
2149

22-
# 生成报告
23-
generate_report({
24-
'static_analysis': analyzer.findings,
25-
'style_violations': style_violations,
26-
'ai_suggestions': ai_suggestions
27-
})
50+
# 输出格式化结果
51+
click.echo("\n静态审查结果:")
52+
for issue in findings:
53+
click.secho(f"[{issue['severity'].upper()}] Line {issue['line']}: {issue['message']}",
54+
fg='red' if issue['severity'] == 'critical' else 'yellow')

‎ai_review/config/model_config.yaml

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
openai:
2+
api_key: ${OPENAI_API_KEY}
3+
model_name: gpt-4-turbo
4+
temperature: 0.3
5+
max_tokens: 1024
6+
anthropic:
7+
api_key: ${ANTHROPIC_API_KEY}
8+
model_name: claude-3-opus
9+
local_models:
10+
codegen2_path: /models/codegen2-3.7B

‎ai_review/core/analyzer.py

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
from typing import Dict, List
2+
from .parser import ASTParser
3+
4+
import importlib
5+
from typing import Dict, List, Callable
6+
7+
class CodeAnalyzer:
8+
def __init__(self, rule_modules: List[str]):
9+
"""Initialize the analyzer with rule modules."""
10+
if not rule_modules:
11+
raise ValueError("No rule modules specified")
12+
self.rules = self._load_rules(rule_modules)
13+
14+
def _load_rules(self, modules: List[str]) -> Dict[str, Callable]:
15+
"""动态加载规则检测器"""
16+
rules = {}
17+
for module_name in modules:
18+
try:
19+
module = importlib.import_module(module_name)
20+
for rule_name in dir(module):
21+
if rule_name.startswith('_'):
22+
continue
23+
rule = getattr(module, rule_name)
24+
if callable(rule):
25+
rules[rule_name] = rule
26+
except ImportError as e:
27+
raise ImportError(f"Failed to import rule module {module_name}: {e}")
28+
except Exception as e:
29+
raise RuntimeError(f"Error loading rules from {module_name}: {e}")
30+
return rules
31+
def analyze(self, code: str) -> List[dict]:
32+
"""执行多维度代码审查"""
33+
try:
34+
ast_parser = ASTParser(code)
35+
except ValueError as e:
36+
return [{
37+
'rule': 'syntax_check',
38+
'message': str(e),
39+
'severity': 'critical',
40+
'line': 0
41+
}]
42+
43+
findings = []
44+
45+
# 执行静态规则检查
46+
for rule_name, check_func in self.rules.items():
47+
try:
48+
if issues := check_func(ast_parser.tree):
49+
findings.extend({
50+
'rule': rule_name,
51+
'message': issue['msg'],
52+
'severity': issue['level'],
53+
'line': issue['lineno'],
54+
'code_snippet': ast_parser.raw_code.splitlines()[issue['lineno']-1]
55+
} for issue in issues)
56+
except Exception as e:
57+
findings.append({
58+
'rule': rule_name,
59+
'message': f"Rule execution failed: {e}",
60+
'severity': 'error',
61+
'line': 0
62+
})
63+
64+
return findings

‎ai_review/core/model_adapter.py

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import openai
2+
from configparser import ConfigParser
3+
4+
class AIModelHandler:
5+
def __init__(self, config_path='model_config.yaml'):
6+
self.config = self._load_config(config_path)
7+
8+
def _load_config(self, path):
9+
with open(path) as f:
10+
return yaml.safe_load(f)
11+
12+
def get_code_review(self, code_snippet: str, context: dict) -> str:
13+
"""调用AI模型进行语义级审查"""
14+
prompt = f"""作为资深Python架构师,请审查以下代码:
15+
{code_snippet}
16+
审查重点:
17+
1. 架构设计合理性
18+
2. 异常处理完整性
19+
3. 性能优化空间
20+
4. 安全合规性
21+
请用中文按严重性分级输出建议"""
22+
23+
response = openai.ChatCompletion.create(
24+
model=self.config['openai']['model_name'],
25+
messages=[{"role": "user", "content": prompt}],
26+
temperature=self.config['openai']['temperature']
27+
)
28+
return response.choices[0].message.content

‎ai_review/core/parser.py

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import ast
2+
import astunparse
3+
4+
class ASTParser:
5+
def __init__(self, source_code: str):
6+
self.raw_code = source_code
7+
try:
8+
self.tree = ast.parse(source_code)
9+
except SyntaxError as e:
10+
raise ValueError(f"Invalid Python syntax at line {e.lineno}: {e.msg}")
11+
def get_function_defs(self) -> list:
12+
"""提取所有函数定义元数据"""
13+
return [{
14+
'name': node.name,
15+
'args': [arg.arg for arg in node.args.args],
16+
'lineno': node.lineno,
17+
'docstring': ast.get_docstring(node)
18+
} for node in ast.walk(self.tree) if isinstance(node, ast.FunctionDef)]
19+
20+
def get_class_hierarchy(self) -> list:
21+
"""解析类继承结构"""
22+
return [{
23+
'name': node.name,
24+
'bases': [base.id for base in node.bases if isinstance(base, ast.Name)],
25+
'methods': [n.name for n in node.body if isinstance(n, ast.FunctionDef)]
26+
} for node in ast.walk(self.tree) if isinstance(node, ast.ClassDef)]
27+
28+
def code_visualization(self) -> str:
29+
"""生成AST可视化结构"""
30+
return astunparse.dump(self.tree)

‎ai_review/rules/security_rules.py

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import ast
2+
3+
def detect_unsafe_functions(tree) -> list:
4+
"""识别高危函数调用"""
5+
unsafe_calls = {
6+
'eval', 'exec', 'pickle.loads', 'pickle.load',
7+
'subprocess.call', 'subprocess.Popen', 'os.system',
8+
'tempfile.mktemp' # Use mkstemp instead
9+
}
10+
return [{
11+
'msg': f"检测到不安全函数调用: {node.func.id}",
12+
'level': 'critical',
13+
'lineno': node.lineno
14+
} for node in ast.walk(tree)
15+
if isinstance(node, ast.Call)
16+
and hasattr(node.func, 'id')
17+
and node.func.id in unsafe_calls]
18+
19+
def check_hardcoded_secrets(tree) -> list:
20+
"""检测硬编码密钥"""
21+
secret_patterns = {'password', 'secret_key', 'api_key'}
22+
return [{
23+
'msg': f"疑似硬编码凭证: {node.targets[0].id}",
24+
'level': 'high',
25+
'lineno': node.lineno
26+
} for node in ast.walk(tree)
27+
if isinstance(node, ast.Assign)
28+
and any(pattern in node.targets[0].id.lower() for pattern in secret_patterns)]

‎requirements.txt

+9-3
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@ tree-sitter==0.20.2
1414
# AI模型集成
1515
transformers==4.37.2
1616
torch==2.2.0
17-
openai==1.12.0
18-
1917
# 代码格式化
2018
black==23.12.0
2119
isort==5.13.2
2220

2321
# 项目构建
2422
setuptools==69.0.3
25-
wheel==0.42.0
23+
wheel==0.42.0
24+
25+
astunparse==1.6.3
26+
PyYAML==6.0.1
27+
openai>=1.12.0
28+
click==8.1.7
29+
tqdm==4.66.2
30+
requests==2.31.0
31+
typing_extensions==4.9.0

0 commit comments

Comments
 (0)
Please sign in to comment.