-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathknowflow.py
More file actions
291 lines (242 loc) · 10.2 KB
/
knowflow.py
File metadata and controls
291 lines (242 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
import argparse
import logging
from dataclasses import dataclass
from core.config import CONFIG
from core.infra.file_store import FileStore
from core.infra.index_repository import IndexRepository
from core.infra.llm_client import LLMClient
from core.infra.prompt_registry import PromptRegistry
from core.parsers import SafeParser
from core.services.audit_service import AuditService
from core.services.cleaner import CleanerService
from core.services.compiler import CompilerService
from core.services.indexer import IndexPolicyService
from core.services.log_service import LogService
from core.services.query_service import QueryService
from core.services.research_draft_service import ResearchDraftService
from core.services.source_page import SourcePageService
from core.services.splitter import SplitService
from core.services.topicer import TopicPageService
from helper import print_help
# Module-level logger for this CLI; used for command tracing and for
# reporting private-graph failures without aborting a build.
logger = logging.getLogger(__name__)
@dataclass(frozen=True, slots=True)
class AppServices:
    """Immutable bundle of the service objects used by the knowflow CLI.

    Instances are produced by ``build_app_services()`` and passed as ``svc``
    to every pipeline/command helper in this module. The trailing comments
    list the methods this module calls on each service.
    """

    cleaner: CleanerService  # clean(), clean_paths(raw_paths)
    compiler: CompilerService  # compile(), build_private_graph(), build_graph()
    policy: IndexPolicyService  # reconcile_index()
    split_service: SplitService  # split()
    topic_page: TopicPageService  # build_topics()
    source_page: SourcePageService  # build_sources()
    log_service: LogService  # append_run(command_name)
    audit_service: AuditService  # build_report() -> audit payload
    research_draft_service: ResearchDraftService  # materialize_from_audit(payload)
    query_service: QueryService  # print_query(phrase) -> exit status
def _setup_logging() -> None:
    """Configure root logging from ``CONFIG.log_level`` so the whole run is traceable.

    ``CONFIG.log_level`` may be a level name, matched case-insensitively
    (``"debug"`` and ``"DEBUG"`` are equivalent), or a numeric logging level.
    Anything that does not resolve to a numeric level falls back to
    ``logging.INFO``.
    """
    raw_level = CONFIG.log_level
    if isinstance(raw_level, int) and not isinstance(raw_level, bool):
        # Numeric levels (e.g. 10) are used as-is; getattr() would raise on them.
        level = raw_level
    else:
        # Upper-case so "debug" resolves like "DEBUG". The isinstance check below
        # still rejects non-level attributes such as logging.BASIC_FORMAT.
        level = getattr(logging, str(raw_level or "").strip().upper(), logging.INFO)
        if not isinstance(level, int):
            level = logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(levelname)s %(name)s - %(message)s",
    )
def build_app_services() -> AppServices:
    """Construct every service the knowflow commands need.

    One FileStore, IndexRepository, LLMClient and PromptRegistry are created
    and handed to each service that takes them, so all services in the
    returned bundle share those instances.

    Returns:
        AppServices: the wired-up service bundle.
    """
    store = FileStore()
    repo = IndexRepository(CONFIG, store)
    llm = LLMClient(CONFIG)
    prompts = PromptRegistry(CONFIG)

    # Services are instantiated in AppServices field order.
    cleaner = CleanerService(CONFIG, repo, llm, prompts, store, SafeParser)
    compiler = CompilerService(CONFIG, repo, llm, prompts, store, SafeParser)
    policy = IndexPolicyService(repo, llm, prompts, SafeParser)
    splitter = SplitService(repo, llm, prompts, SafeParser)
    topics = TopicPageService(CONFIG, repo, llm, prompts, store)
    sources = SourcePageService(CONFIG, store, repo)
    run_log = LogService(CONFIG, store)
    auditor = AuditService(CONFIG, repo, store)
    drafts = ResearchDraftService(CONFIG, repo, llm, prompts, store)
    querier = QueryService(CONFIG, store)

    return AppServices(
        cleaner=cleaner,
        compiler=compiler,
        policy=policy,
        split_service=splitter,
        topic_page=topics,
        source_page=sources,
        log_service=run_log,
        audit_service=auditor,
        research_draft_service=drafts,
        query_service=querier,
    )
def _run_private_stages(svc: AppServices, *, split: bool) -> None:
    """Run the stages shared by every private pipeline once cleaning is done.

    Reconciles the index, optionally splits, rebuilds topic and source pages,
    then builds the private knowledge graph. A graph failure is logged with
    its traceback and swallowed; it does not abort the caller.
    """
    svc.policy.reconcile_index()
    if split:
        svc.split_service.split()
    svc.topic_page.build_topics()
    svc.source_page.build_sources()
    try:
        svc.compiler.build_private_graph()
    except Exception:
        # Deliberately best-effort: a graph failure must not stop the build.
        logger.exception("private knowledge graph 生成失败")


def _run_private_pipeline(svc: AppServices, *, no_split: bool) -> None:
    """Clean all raw documents and rebuild every private artefact.

    Args:
        svc: service bundle.
        no_split: when true, the split stage is skipped.
    """
    svc.cleaner.clean()
    _run_private_stages(svc, split=not no_split)


def _run_private_incremental_pipeline(svc: AppServices, *, raw_paths: list[str], no_split: bool) -> None:
    """Clean only ``raw_paths``, then rerun the shared private stages.

    Args:
        svc: service bundle.
        raw_paths: raw paths passed to ``cleaner.clean_paths``.
        no_split: when true, the split stage is skipped.
    """
    svc.cleaner.clean_paths(raw_paths)
    _run_private_stages(svc, split=not no_split)


def _run_public_pipeline(svc: AppServices, *, command_name: str, write_log: bool) -> dict:
    """Compile the public output and return the fresh audit report.

    Args:
        svc: service bundle.
        command_name: label recorded in the run log.
        write_log: when true, append ``command_name`` to the run log.

    Returns:
        dict: the payload returned by ``audit_service.build_report()``.
    """
    svc.compiler.compile()
    payload = svc.audit_service.build_report()
    if write_log:
        svc.log_service.append_run(command_name)
    return payload


def _run_only_pipeline(svc: AppServices, *, raw_path: str, command_name: str) -> int:
    """Incrementally process a single raw path, then run the public pipeline.

    The split stage is never run on this path. The run is logged under
    ``command_name`` by the public pipeline.

    Returns:
        int: always ``0``; suitable to pass directly to ``SystemExit``.
    """
    svc.cleaner.clean_paths([raw_path])
    _run_private_stages(svc, split=False)
    _run_public_pipeline(svc, command_name=command_name, write_log=True)
    return 0
def _run_build(svc: AppServices, *, no_split: bool, command_name: str) -> None:
    """Run the full build: private pipeline, public pipeline, research follow-up.

    After the first public pass, the audit payload is handed to
    ``research_draft_service.materialize_from_audit``. Every dict entry under
    the manifest's ``"created"`` key that has a non-blank ``raw_path`` is fed
    through the incremental private pipeline. The public pipeline is then run
    once more. The run is appended to the log exactly once, at the end.

    Args:
        svc: service bundle.
        no_split: when true, the split stage is skipped in both private passes.
        command_name: label recorded in the run log.
    """
    _run_private_pipeline(svc, no_split=no_split)
    payload = _run_public_pipeline(svc, command_name=command_name, write_log=False)
    research_manifest = svc.research_draft_service.materialize_from_audit(payload)

    # Tolerate a missing or malformed manifest (non-dict, "created": None, ...)
    # instead of crashing after the main build has already completed.
    created = research_manifest.get("created") if isinstance(research_manifest, dict) else None
    created_items = created if isinstance(created, (list, tuple)) else ()
    created_raw_paths: list[str] = []
    for item in created_items:
        if not isinstance(item, dict):
            continue
        raw_path = str(item.get("raw_path") or "").strip()
        if raw_path:
            created_raw_paths.append(raw_path)

    if created_raw_paths:
        # New research drafts were written: ingest only those, then recompile.
        _run_private_incremental_pipeline(svc, raw_paths=created_raw_paths, no_split=no_split)
        _run_public_pipeline(svc, command_name=command_name, write_log=False)
    svc.log_service.append_run(command_name)
def _run_lint(svc: AppServices) -> int:
    """Print a lint summary of a fresh audit report and return an exit code.

    Blocking findings make lint fail. Warning findings are reported but do not
    fail. Loop-stage statuses are printed for information only. Only
    list-valued findings are counted; any other value counts as zero.

    Args:
        svc: service bundle; only ``svc.audit_service.build_report()`` is used.

    Returns:
        int: ``1`` if any blocking finding is present, otherwise ``0``.
    """
    payload = svc.audit_service.build_report()
    report = payload if isinstance(payload, dict) else {}
    # Validate each section so a partial report (e.g. "findings": None) is
    # summarised as empty instead of raising.
    findings = report.get("findings")
    if not isinstance(findings, dict):
        findings = {}
    loop = report.get("loop")
    if not isinstance(loop, dict):
        loop = {}

    def count(key: str) -> int:
        items = findings.get(key, [])
        return len(items) if isinstance(items, list) else 0

    blocking_items = [
        ("raw_without_meta", "raw 文档缺少 metadata"),
        ("posts_missing_concepts", "文章缺少 concepts"),
        ("posts_missing_sources", "文章缺少 sources"),
        ("concept_refs_without_pages", "frontmatter 引用了不存在的概念页"),
        ("source_refs_without_pages", "frontmatter 引用了不存在的来源页"),
    ]
    warning_items = [
        ("unused_sources", "来源页未进入任何 post"),
        ("oversized_topics", "topic 仍然偏大"),
        ("topics_without_summary", "topic 缺少 summary"),
        ("single_source_dense_topics", "单来源高密度主题"),
    ]

    print("lint summary")
    blocking_counts: dict[str, int] = {}
    for key, label in blocking_items:
        blocking_counts[key] = count(key)
        print(f"- blocking {label}: {blocking_counts[key]}")
    warning_count = 0
    for key, label in warning_items:
        n = count(key)
        warning_count += n
        print(f"- warning {label}: {n}")
    for stage in ("ingest", "governance", "linking", "insight", "research"):
        info = loop.get(stage)
        status = info.get("status", "unknown") if isinstance(info, dict) else "unknown"
        print(f"- stage {stage}: {status}")

    if sum(blocking_counts.values()) > 0:
        # Reuse the computed count: calling len() on the raw value crashed
        # when it was None or another non-list.
        if blocking_counts["raw_without_meta"] > 0:
            print("- hint: 先执行 python3 knowflow.py build 重新生成 metadata")
        print("lint failed")
        return 1
    print("lint ok" if warning_count == 0 else "lint ok with warnings")
    return 0
def _run_build_target(target: str, svc: AppServices, *, no_split: bool) -> int:
    """Run one ``build`` target and return a process exit code.

    ``target`` is trimmed and lower-cased; an empty value means ``"all"``.
    The composite targets are ``all``, ``private`` and ``public``. Every other
    known target runs exactly one service stage.

    Returns:
        int: ``0`` when the target ran, ``1`` (after printing help) for an
        unknown target.
    """
    stage = str(target or "all").strip().lower()
    command_name = f"build {stage}" if stage != "all" else "build"

    def private_then_log() -> None:
        _run_private_pipeline(svc, no_split=no_split)
        svc.log_service.append_run(command_name)

    # Lambdas keep attribute lookups lazy: only the selected stage touches svc.
    handlers = {
        "all": lambda: _run_build(svc, no_split=no_split, command_name=command_name),
        "private": private_then_log,
        "public": lambda: _run_public_pipeline(svc, command_name=command_name, write_log=True),
        "clean": lambda: svc.cleaner.clean(),
        "index": lambda: svc.policy.reconcile_index(),
        "split": lambda: svc.split_service.split(),
        "topics": lambda: svc.topic_page.build_topics(),
        "sources": lambda: svc.source_page.build_sources(),
        "compile": lambda: svc.compiler.compile(),
        "audit": lambda: svc.audit_service.build_report(),
        "graph": lambda: svc.compiler.build_graph(),
        "log": lambda: svc.log_service.append_run(command_name),
    }

    handler = handlers.get(stage)
    if handler is None:
        print(f"unknown build target: {target}")
        print_help()
        return 1
    handler()
    return 0
def main() -> None:
    """CLI entry point for ``knowflow``.

    Commands (first positional argument):

    * ``build [STAGE]``: run the full build or one stage. ``--no-split``
      skips the split stage. ``--only PATH`` processes just that raw path and
      cannot be combined with a stage.
    * ``lint``: print the audit summary and exit with its status.
    * ``query WORDS...``: pass the joined words to ``query_service.print_query``.
    * ``research``: materialise research drafts from a fresh audit report.

    Usage errors are detected before ``build_app_services()`` runs. Printing
    help or rejecting a malformed invocation therefore does not depend on
    constructing the LLM client, index repository or file store.

    Raises:
        SystemExit: for ``build``, ``lint`` and ``query`` with the command's
        status, and with ``1`` on usage errors.
    """
    _setup_logging()
    parser = argparse.ArgumentParser(add_help=False, prog="knowflow")
    parser.add_argument("cmd", nargs="?")
    parser.add_argument("--no-split", action="store_true", help="build 时跳过 split")
    parser.add_argument("--only", metavar="PATH", help="增量构建:只处理指定的 raw 目录")
    # Extra tokens (the build stage, query words) are collected in args.rest.
    args, remaining = parser.parse_known_args()
    args.rest = remaining
    if not args.cmd:
        print_help()
        return
    logger.debug("执行命令: %s", args.cmd)

    if args.cmd == "build":
        if args.only:
            if args.rest:
                print("build --only 不能与 stage 参数一起使用")
                print_help()
                raise SystemExit(1)
            svc = build_app_services()
            raise SystemExit(
                _run_only_pipeline(svc, raw_path=args.only, command_name=f"build --only {args.only}")
            )
        if len(args.rest) > 1:
            print(f"build 只接受一个 stage,收到: {' '.join(args.rest)}")
            print_help()
            raise SystemExit(1)
        svc = build_app_services()
        raise SystemExit(
            _run_build_target(args.rest[0] if args.rest else "all", svc, no_split=args.no_split)
        )

    if args.cmd == "lint":
        raise SystemExit(_run_lint(build_app_services()))

    if args.cmd == "query":
        phrase = " ".join(args.rest).strip()
        if not phrase:
            print("query 需要关键字,例如: python3 knowflow.py query ai agent")
            raise SystemExit(1)
        raise SystemExit(build_app_services().query_service.print_query(phrase))

    if args.cmd == "research":
        svc = build_app_services()
        payload = svc.audit_service.build_report()
        svc.research_draft_service.materialize_from_audit(payload)
        svc.log_service.append_run("research")
        return

    print(f"unknown command: {args.cmd}")
    print_help()
    raise SystemExit(1)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()