66
77import sqlite3
88import json
9+ import argparse
910from pathlib import Path
1011from typing import Dict , List , Any
1112from collections import defaultdict
@@ -127,6 +128,82 @@ def analyze_database_structure(self, db_path: Path) -> Dict[str, Any]:
127128
128129 try :
129130 cursor = conn .cursor ()
131+
def parse_columns_from_create_sql(create_sql: str) -> list[tuple[str, str]]:
    """Best-effort extraction of column names and types from a CREATE statement.

    Intended as a fallback for tables whose columns cannot be read through
    ``PRAGMA table_info`` (the original note mentions FTS5 tables and missing
    tokenizer extensions).

    Args:
        create_sql: Text of a ``CREATE TABLE`` / ``CREATE VIRTUAL TABLE``
            statement. Empty or falsy input yields an empty result.

    Returns:
        ``(name, type)`` pairs in declaration order. The type is the second
        whitespace-separated word of the definition, upper-cased, or
        ``"TEXT"`` when there is none. Table-level constraints and known
        FTS5 options are skipped. Never raises; on any parsing error the
        columns collected so far are returned.
    """
    # Leading keywords of table-level clauses that are not column definitions.
    constraint_keywords = {"constraint", "primary", "unique", "foreign", "check"}
    # FTS5 configuration options written as ``key = value``.
    fts5_options = {"tokenize", "prefix", "content", "content_rowid", "compress", "uncompress"}

    out: list[tuple[str, str]] = []
    if not create_sql:
        return out
    try:
        start = create_sql.find("(")
        end = create_sql.rfind(")")
        if start == -1 or end == -1 or end <= start:
            return out
        inner = create_sql[start + 1:end]

        # Split on commas at nesting depth 0 only, so commas inside
        # parenthesised clauses do not break a definition apart.
        parts: list[str] = []
        buf: list[str] = []
        depth = 0
        for ch in inner:
            if ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
            if ch == "," and depth == 0:
                parts.append("".join(buf).strip())
                buf = []
            else:
                buf.append(ch)
        tail = "".join(buf).strip()
        if tail:
            parts.append(tail)

        for part in parts:
            token = part.strip()
            if not token:
                continue
            # Compare the leading keyword as a whole word. A plain prefix
            # test would also discard real columns such as ``checksum`` or
            # ``unique_id``.
            leading = token.lower().split("(", 1)[0].split()
            if leading and leading[0] in constraint_keywords:
                continue
            # Skip FTS5 options (tokenize/prefix/content/content_rowid, ...).
            if "=" in token:
                key = token.split("=", 1)[0].strip().lower()
                if key in fts5_options:
                    continue
            tokens = token.split()
            if not tokens:
                continue
            name = tokens[0].strip("`\" []")
            typ = tokens[1].upper() if len(tokens) > 1 and "=" not in tokens[1] else "TEXT"
            out.append((name, typ))
    except Exception:
        # Deliberately best-effort: the caller treats this as a last resort,
        # so return whatever was parsed rather than propagating the error.
        return out
    return out
185+
def get_table_columns(table_name: str) -> list[tuple[str, str]]:
    """Return ``(column_name, column_type)`` pairs for *table_name*.

    Tries ``PRAGMA table_info`` first. If that raises or returns no rows,
    falls back to parsing the table's ``sql`` text from ``sqlite_master``
    with ``parse_columns_from_create_sql``.

    Args:
        table_name: Name of the table to inspect, using the ``cursor`` from
            the enclosing scope.

    Returns:
        The column pairs, or an empty list when neither strategy works.
        Never raises.
    """
    # First attempt: PRAGMA table_info.
    try:
        # Quote the identifier (doubling embedded quotes) so names with
        # spaces, hyphens or keywords do not make the PRAGMA fail.
        quoted_name = '"' + table_name.replace('"', '""') + '"'
        cursor.execute(f"PRAGMA table_info({quoted_name})")
        columns = cursor.fetchall()
        if columns:
            # In each table_info row, index 1 is the name and index 2 the type.
            return [(col[1], col[2]) for col in columns]
    except Exception:
        # Deliberately best-effort: any failure falls through to the
        # sqlite_master fallback below.
        pass

    # Fallback: parse the CREATE statement stored in sqlite_master.sql.
    try:
        cursor.execute(
            "SELECT sql FROM sqlite_master WHERE type='table' AND name=?",
            (table_name,),
        )
        row = cursor.fetchone()
        create_sql = row[0] if row else ""
        return parse_columns_from_create_sql(create_sql or "")
    except Exception:
        return []
130207
131208 # 获取所有表名
132209 cursor .execute ("SELECT name FROM sqlite_master WHERE type='table'" )
@@ -152,13 +229,10 @@ def analyze_database_structure(self, db_path: Path) -> Dict[str, Any]:
152229 table_key = f"{ prefix } _*" # 使用模式名
153230
154231 # 获取代表表的字段信息
155- cursor .execute (f"PRAGMA table_info({ representative_table } )" )
156- columns = cursor .fetchall ()
232+ columns = get_table_columns (representative_table )
157233
158234 fields = {}
159- for col in columns :
160- field_name = col [1 ]
161- field_type = col [2 ]
235+ for field_name , field_type in columns :
162236 fields [field_name ] = {
163237 "type" : field_type ,
164238 "meaning" : "" , # 留空供用户填写
@@ -188,13 +262,10 @@ def analyze_database_structure(self, db_path: Path) -> Dict[str, Any]:
188262
189263 try :
190264 # 获取表字段信息
191- cursor .execute (f"PRAGMA table_info({ table_name } )" )
192- columns = cursor .fetchall ()
265+ columns = get_table_columns (table_name )
193266
194267 fields = {}
195- for col in columns :
196- field_name = col [1 ]
197- field_type = col [2 ]
268+ for field_name , field_type in columns :
198269 fields [field_name ] = {
199270 "type" : field_type ,
200271 "meaning" : "" , # 留空供用户填写
@@ -219,16 +290,23 @@ def analyze_database_structure(self, db_path: Path) -> Dict[str, Any]:
219290 finally :
220291 conn .close ()
221292
222- def generate_template (self , output_file : str = "wechat_db_config_template.json" ):
293+ def generate_template (
294+ self ,
295+ output_file : str = "wechat_db_config_template.json" ,
296+ * ,
297+ include_excluded : bool = False ,
298+ include_message_shards : bool = False ,
299+ exclude_db_stems : set [str ] | None = None ,
300+ ):
223301 """生成配置模板"""
224302 print ("开始生成微信数据库配置模板..." )
225303
226304 # 定义要排除的数据库模式和描述
227- excluded_patterns = {
228- r'biz_message_\d+\.db$' : '企业微信聊天记录数据库' ,
229- r'bizchat\.db$' : '企业微信联系人数据库 ' ,
230- r'contact_fts\.db$' : '搜索联系人数据库 ' ,
231- r'favorite_fts\.db$' : '搜索收藏数据库 '
305+ excluded_patterns = {} if include_excluded else {
306+ r'biz_message_\d+\.db$' : '公众号/ 企业微信聊天记录数据库(通常不参与个人聊天分析) ' ,
307+ r'bizchat\.db$' : '企业微信联系人/会话数据库(通常不参与个人聊天分析) ' ,
308+ r'contact_fts\.db$' : '联系人搜索索引数据库(FTS) ' ,
309+ r'favorite_fts\.db$' : '收藏搜索索引数据库(FTS) '
232310 }
233311
234312 # 查找所有数据库文件
@@ -263,29 +341,38 @@ def generate_template(self, output_file: str = "wechat_db_config_template.json")
263341 for excluded_file , description in excluded_files :
264342 print (f" - { excluded_file .name } ({ description } )" )
265343
344+ # 显式排除指定 stem(不含 .db)
345+ if exclude_db_stems :
346+ before = len (db_files )
347+ db_files = [p for p in db_files if p .stem not in exclude_db_stems ]
348+ after = len (db_files )
349+ if before != after :
350+ print (f"\n 按 --exclude-db-stem 排除 { before - after } 个数据库: { sorted (exclude_db_stems )} " )
351+
266352 print (f"\n 实际处理 { len (db_files )} 个数据库文件" )
267353
268354 # 过滤message数据库,只保留倒数第二个(与主脚本逻辑一致)
269- message_numbered_dbs = []
270- message_other_dbs = []
271-
272- for db in db_files :
273- if re .match (r'message_\d+$' , db .stem ): # message_{数字}.db
274- message_numbered_dbs .append (db )
275- elif db .stem .startswith ('message_' ): # message_fts.db, message_resource.db等
276- message_other_dbs .append (db )
277-
278- if len (message_numbered_dbs ) > 1 :
279- # 按数字编号排序(提取数字进行排序)
280- message_numbered_dbs .sort (key = lambda x : int (re .search (r'message_(\d+)' , x .stem ).group (1 )))
281- # 选择倒数第二个(按编号排序)
282- selected_message_db = message_numbered_dbs [- 2 ] # 倒数第二个
283- print (f"检测到 { len (message_numbered_dbs )} 个message_{{数字}}.db数据库" )
284- print (f"选择倒数第二个: { selected_message_db .name } " )
285-
286- # 从db_files中移除其他message_{数字}.db数据库,但保留message_fts.db等
287- db_files = [db for db in db_files if not re .match (r'message_\d+$' , db .stem )]
288- db_files .append (selected_message_db )
355+ if not include_message_shards :
356+ message_numbered_dbs = []
357+ message_other_dbs = []
358+
359+ for db in db_files :
360+ if re .match (r'message_\d+$' , db .stem ): # message_{数字}.db
361+ message_numbered_dbs .append (db )
362+ elif db .stem .startswith ('message_' ): # message_fts.db, message_resource.db等
363+ message_other_dbs .append (db )
364+
365+ if len (message_numbered_dbs ) > 1 :
366+ # 按数字编号排序(提取数字进行排序)
367+ message_numbered_dbs .sort (key = lambda x : int (re .search (r'message_(\d+)' , x .stem ).group (1 )))
368+ # 选择倒数第二个(按编号排序)
369+ selected_message_db = message_numbered_dbs [- 2 ] # 倒数第二个
370+ print (f"检测到 { len (message_numbered_dbs )} 个message_{{数字}}.db数据库" )
371+ print (f"选择倒数第二个: { selected_message_db .name } " )
372+
373+ # 从db_files中移除其他message_{数字}.db数据库,但保留message_fts.db等
374+ db_files = [db for db in db_files if not re .match (r'message_\d+$' , db .stem )]
375+ db_files .append (selected_message_db )
289376
290377 print (f"实际分析 { len (db_files )} 个数据库文件" )
291378
@@ -370,11 +457,24 @@ def generate_template(self, output_file: str = "wechat_db_config_template.json")
370457
def main():
    """Command-line entry point: parse options, then generate the template."""
    # (flag, keyword arguments) pairs, registered in this order.
    option_specs = (
        ("--databases-path", {"default": "output/databases", "help": "解密后的数据库根目录(按账号分目录)"}),
        ("--output", {"default": "wechat_db_config_template.json", "help": "输出 JSON 模板路径"}),
        ("--include-excluded", {"action": "store_true", "help": "包含默认会被排除的数据库(如 bizchat/contact_fts/favorite_fts 等)"}),
        ("--include-message-shards", {"action": "store_true", "help": "包含所有 message_{n}.db(否则仅保留倒数第二个作代表)"}),
        ("--exclude-db-stem", {"action": "append", "default": [], "help": "按 stem(不含 .db)排除数据库,可重复,例如: --exclude-db-stem digital_twin"}),
    )
    cli = argparse.ArgumentParser(description="微信数据库字段配置模板生成器")
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    opts = cli.parse_args()

    print("微信数据库配置模板生成器")
    print("=" * 50)

    # Translate the parsed options into generate_template's keyword arguments.
    template_kwargs = {
        "output_file": opts.output,
        "include_excluded": bool(opts.include_excluded),
        "include_message_shards": bool(opts.include_message_shards),
        "exclude_db_stems": set(opts.exclude_db_stem or []),
    }
    generator = ConfigTemplateGenerator(databases_path=opts.databases_path)
    generator.generate_template(**template_kwargs)
378478
379479if __name__ == "__main__" :
380- main ()
480+ main ()
0 commit comments