-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpush_worker.py
More file actions
204 lines (172 loc) · 6.72 KB
/
Copy pathpush_worker.py
File metadata and controls
204 lines (172 loc) · 6.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
from __future__ import annotations

import argparse
import os
import smtplib
import ssl
import sys
from datetime import datetime, timezone
from email.message import EmailMessage
from zoneinfo import ZoneInfo

from ai_service import generate_push_summary, translate_papers
from email_renderer import render_email_html
from literature_push import (
    fetch_pubmed_details,
    render_markdown,
    score_paper,
    search_pubmed,
)
from subscription_store import (
    get_last_run_key,
    list_active_subscriptions,
    mark_seen,
    record_push_run,
    unseen_pmids,
)
# Module-wide alias for the UTC tzinfo, used to build timezone-aware "now" values.
UTC: timezone = timezone.utc
def dotenv_value(key: str, env_path: str | None = None) -> str:
    """Look up ``key`` in a ``.env`` file.

    By default the file is the ``.env`` next to this module; ``env_path``
    lets callers point at another file.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    The first line whose (whitespace/BOM-stripped) name equals ``key`` wins.
    Surrounding whitespace is removed from the value, then one pair of
    matching surrounding quotes (``"..."`` or ``'...'``) is removed.

    Returns:
        The value, or "" when the file does not exist or the key is absent.
    """
    if env_path is None:
        env_path = os.path.join(os.path.dirname(__file__), ".env")
    if not os.path.exists(env_path):
        return ""
    # utf-8-sig transparently drops a leading BOM written by some Windows editors.
    with open(env_path, encoding="utf-8-sig") as env_file:
        for line in env_file:
            stripped = line.strip()
            if not stripped or stripped.startswith("#") or "=" not in stripped:
                continue
            name, value = stripped.split("=", 1)
            if name.strip().lstrip("\ufeff") == key:
                return _strip_matching_quotes(value.strip())
    return ""


def _strip_matching_quotes(value: str) -> str:
    """Remove one pair of identical surrounding quotes; keep everything else.

    Chained ``str.strip('"').strip("'")`` would also eat quote characters that
    belong to the value itself (e.g. a password ending in ``'``).
    """
    if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}:
        return value[1:-1]
    return value
def env_value(key: str) -> str:
    """Resolve a configuration value by name.

    Sources are tried in order: the process environment, then the ``.env``
    file via ``dotenv_value``, and finally, on Windows only, the
    ``Environment`` key under HKEY_CURRENT_USER in the registry. Empty values
    count as unset and fall through to the next source.

    Returns:
        The first non-empty value found, or "" if none (including when the
        registry key or value is missing).
    """
    process_value = os.environ.get(key, "")
    if process_value:
        return process_value

    file_value = dotenv_value(key)
    if file_value:
        return file_value
    if sys.platform != "win32":
        return ""

    # Registry fallback: picks up user variables set via the Windows UI/setx
    # that are not visible to an already-running process environment.
    try:
        import winreg

        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, "Environment") as env_key:
            registry_value, _value_type = winreg.QueryValueEx(env_key, key)
    except OSError:
        return ""
    return str(registry_value)
def smtp_config() -> dict[str, str | int]:
    """Collect SMTP settings via ``env_value``.

    Required: SMTP_HOST, SMTP_USERNAME, SMTP_PASSWORD, SMTP_FROM.
    Optional:
        SMTP_USE_SSL: "1"/"true"/"yes" (case-insensitive) selects implicit TLS.
        SMTP_PORT: defaults to 465 with implicit TLS, 587 otherwise.
        PUBLIC_BASE_URL: defaults to "http://127.0.0.1:5000".

    Raises:
        RuntimeError: if any required variable is missing or empty.
        ValueError: if SMTP_PORT is set but is not an integer.
    """
    required = {
        "SMTP_HOST": env_value("SMTP_HOST"),
        "SMTP_USERNAME": env_value("SMTP_USERNAME"),
        "SMTP_PASSWORD": env_value("SMTP_PASSWORD"),
        "SMTP_FROM": env_value("SMTP_FROM"),
    }
    missing = [key for key, value in required.items() if not value]
    if missing:
        raise RuntimeError(f"Missing SMTP environment variables: {', '.join(missing)}")

    use_ssl = env_value("SMTP_USE_SSL").strip().lower() in {"1", "true", "yes"}
    # Implicit TLS (SMTP_SSL) conventionally listens on 465; 587 is the
    # STARTTLS submission port, where an immediate TLS handshake fails.
    raw_port = env_value("SMTP_PORT").strip()
    try:
        port = int(raw_port) if raw_port else (465 if use_ssl else 587)
    except ValueError as err:
        raise ValueError(f"SMTP_PORT must be an integer, got {raw_port!r}") from err

    return {
        **required,
        "SMTP_PORT": port,
        "SMTP_USE_SSL": use_ssl,
        "PUBLIC_BASE_URL": env_value("PUBLIC_BASE_URL") or "http://127.0.0.1:5000",
    }
def send_push_email(to_addr: str, subject: str, markdown: str, html_body: str) -> None:
    """Send one multipart email (plain-text markdown + HTML alternative).

    Connection settings come from ``smtp_config()``. With SMTP_USE_SSL the
    connection uses implicit TLS (``smtplib.SMTP_SSL``); otherwise a plain
    connection is upgraded with STARTTLS. In both cases the server
    certificate and hostname are verified before logging in.

    Raises:
        RuntimeError / ValueError: propagated from ``smtp_config``.
        smtplib.SMTPException, ssl.SSLError, OSError: on connection, TLS,
            authentication or delivery failures.
    """
    cfg = smtp_config()
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = str(cfg["SMTP_FROM"])
    msg["To"] = to_addr
    msg.set_content(markdown)
    msg.add_alternative(html_body, subtype="html")

    host = str(cfg["SMTP_HOST"])
    port = int(cfg["SMTP_PORT"])
    use_ssl = bool(cfg["SMTP_USE_SSL"])
    # Without an explicit context smtplib uses an *unverified* TLS context,
    # which would let an impostor server harvest the SMTP credentials.
    tls_context = ssl.create_default_context()
    # Bound blocking socket operations so a stuck server cannot hang the worker.
    timeout_seconds = 60

    if use_ssl:
        connection = smtplib.SMTP_SSL(host, port, timeout=timeout_seconds, context=tls_context)
    else:
        connection = smtplib.SMTP(host, port, timeout=timeout_seconds)
    with connection as smtp:
        if not use_ssl:
            smtp.starttls(context=tls_context)
        smtp.login(str(cfg["SMTP_USERNAME"]), str(cfg["SMTP_PASSWORD"]))
        smtp.send_message(msg)
def push_settings(config: dict) -> dict:
    """Return the subscription's ``push`` schedule settings.

    When the config has no ``push`` entry, a fresh default schedule is
    returned: daily at 08:30, weekday "1", timezone Asia/Shanghai. A present
    ``push`` entry is returned as-is (the same object).
    """
    if "push" in config:
        return config["push"]
    # Build a new dict per call so callers may mutate it safely.
    return {
        "frequency": "daily",
        "time": "08:30",
        "weekday": "1",
        "timezone": "Asia/Shanghai",
    }
def scheduled_run_key(config: dict, now_utc: datetime | None = None) -> str | None:
    """Return the key of the schedule slot that is currently due, or None.

    Settings (frequency, time, weekday, timezone) come from ``push_settings``
    and are evaluated in the configured local timezone.

    Returns None when:
        - frequency is "off";
        - the local time is earlier than the configured push time;
        - frequency is "weekdays" and it is Saturday or Sunday;
        - frequency is "weekly" and today is not the configured ISO weekday
          (1=Monday .. 7=Sunday).

    Otherwise returns ``weekly:<ISO year>-W<ISO week>:<time>`` for weekly
    schedules and ``<frequency>:<YYYY-MM-DD>:<time>`` for the rest. The key
    is stable for the whole slot, so callers can compare it with the last
    recorded key to push at most once per slot.

    Args:
        config: subscription configuration dict.
        now_utc: current instant; defaults to ``datetime.now(UTC)``. Should be
            timezone-aware (a naive value is interpreted as server-local time
            by ``astimezone``).
    """
    settings = push_settings(config)
    frequency = settings.get("frequency", "daily")
    if frequency == "off":
        return None
    tz = ZoneInfo(settings.get("timezone", "Asia/Shanghai"))
    now_local = (now_utc or datetime.now(UTC)).astimezone(tz)
    # The raw configured string is kept in the key so previously stored run
    # keys remain comparable.
    push_time = settings.get("time", "08:30")
    if _is_before_push_time(now_local, push_time):
        return None
    if frequency == "weekdays" and now_local.isoweekday() > 5:
        return None
    if frequency == "weekly":
        if str(now_local.isoweekday()) != str(settings.get("weekday", "1")):
            return None
        # ISO year/week keeps the key unique per calendar week across New Year.
        return f"weekly:{now_local.strftime('%G-W%V')}:{push_time}"
    return f"{frequency}:{now_local.strftime('%Y-%m-%d')}:{push_time}"


def _is_before_push_time(now_local: datetime, push_time: str) -> bool:
    """Return True if ``now_local`` is earlier than the "H:MM"/"HH:MM" push time.

    Compares (hour, minute) numerically. A string comparison would order
    "10:00" before an unpadded "8:30", so such a schedule would never fire.
    Values that are not two integers separated by ":" fall back to the plain
    string comparison against ``now_local`` formatted as "%H:%M".
    """
    hour_text, _sep, minute_text = str(push_time).partition(":")
    try:
        target = (int(hour_text), int(minute_text))
    except ValueError:
        return now_local.strftime("%H:%M") < push_time
    return (now_local.hour, now_local.minute) < target
def subscription_is_due(subscription, now_utc: datetime | None = None) -> tuple[bool, str | None]:
    """Decide whether ``subscription`` should be pushed in the current slot.

    Returns:
        ``(False, None)`` when ``scheduled_run_key`` reports no due slot.
        Otherwise ``(due, run_key)``, where ``due`` is True unless the last
        run key stored for ``subscription.id`` already equals ``run_key``.
    """
    slot_key = scheduled_run_key(subscription.config, now_utc)
    if slot_key is None:
        return False, None
    already_pushed = get_last_run_key(subscription.id) == slot_key
    return not already_pushed, slot_key
def push_subscription(
    subscription,
    dry_run: bool = False,
    force: bool = False,
    mark_delivered: bool = True,
    respect_seen: bool = True,
) -> int:
    """Search, rank and email PubMed papers for one subscription.

    Steps:
        1. Run the subscription's PubMed query.
        2. Optionally drop PMIDs already marked seen.
        3. Score the fetched papers with ``config["ranking"]`` and keep the
           top ``config["report"]["top_n"]``.
        4. Pass them through ``translate_papers`` and ``generate_push_summary``.
        5. Render markdown and HTML bodies with an unsubscribe link and send
           the email (or print a dry-run line).

    Args:
        subscription: object exposing ``id``, ``email``, ``token`` and ``config``.
        dry_run: print what would be sent instead of sending. Papers are
            not marked seen either, matching the ``--dry-run`` CLI contract.
        force: only switches the subject prefix to the test-push marker.
        mark_delivered: after a real send, record the sent PMIDs as seen.
        respect_seen: skip PMIDs previously marked seen for this subscription.

    Returns:
        Number of papers included; 0 when nothing matched, in which case no
        email is sent.
    """
    config = subscription.config
    search = config["search"]
    pmids = search_pubmed(
        search["pubmed_query"],
        int(search["days_back"]),
        int(search["max_results"]),
    )
    target_pmids = unseen_pmids(subscription.id, pmids) if respect_seen else pmids
    if not target_pmids:
        return 0

    papers = fetch_pubmed_details(target_pmids)
    # Highest score first; pub_date then pmid break ties deterministically.
    ranked = sorted(
        (score_paper(paper, config["ranking"]) for paper in papers),
        key=lambda p: (p.score, p.pub_date, p.pmid),
        reverse=True,
    )[: int(config["report"]["top_n"])]
    if not ranked:
        return 0

    ranked = translate_papers(ranked, config)
    push_summary = generate_push_summary(ranked, config)
    generated_at = datetime.now(UTC)
    markdown = render_markdown(ranked, config, generated_at, push_summary)

    base_url = (env_value("PUBLIC_BASE_URL") or "http://127.0.0.1:5000").rstrip("/")
    unsubscribe_url = f"{base_url}/unsubscribe/{subscription.token}"
    markdown += f"\n\n---\n退订:{unsubscribe_url}\n"
    html_body = render_email_html(ranked, config, generated_at, unsubscribe_url, push_summary)

    prefix = "[测试推送]" if force else "[文献推送]"
    subject = f"{prefix} {config['profile']['stage']} {_subject_date(config, generated_at)}"

    if dry_run:
        print(f"DRY RUN: would send {len(ranked)} papers to {subscription.email}")
    else:
        send_push_email(subscription.email, subject, markdown, html_body)
        # Only a real delivery may hide these papers from future pushes.
        if mark_delivered:
            mark_seen(subscription.id, [paper.pmid for paper in ranked])
    return len(ranked)


def _subject_date(config: dict, moment: datetime) -> str:
    """Format ``moment`` as YYYY-MM-DD in the subscription's push timezone.

    Uses the same ``timezone`` setting and default as ``scheduled_run_key``,
    so the subject date matches the scheduled local day. If that timezone
    cannot be loaded, falls back to the server's local timezone.
    """
    settings = push_settings(config) or {}
    try:
        tz = ZoneInfo(settings.get("timezone", "Asia/Shanghai"))
    except (KeyError, TypeError, ValueError):  # ZoneInfoNotFoundError is a KeyError
        return moment.astimezone().strftime("%Y-%m-%d")
    return moment.astimezone(tz).strftime("%Y-%m-%d")
def push_once(dry_run: bool = False) -> int:
    """Push every active subscription whose schedule slot is due.

    For a real (non-dry) run, a subscription's run key is recorded only when
    ``push_subscription`` included at least one paper. Subscriptions with
    nothing new stay unrecorded and are checked again on later invocations.

    A failure in one subscription does not stop the others. Each failure is
    reported on stderr, the remaining subscriptions are still processed, and
    a RuntimeError chained to the first failure is raised at the end so the
    process still exits unsuccessfully.

    Returns:
        Number of subscriptions that had papers to push (dry runs included).

    Raises:
        RuntimeError: if one or more subscriptions failed.
    """
    sent = 0
    failures: list[tuple[object, Exception]] = []
    now_utc = datetime.now(UTC)
    for subscription in list_active_subscriptions():
        try:
            count = _push_if_due(subscription, now_utc, dry_run)
        except Exception as exc:  # isolate one subscriber's failure from the rest
            sub_id = getattr(subscription, "id", "?")
            print(f"Push failed for subscription {sub_id}: {exc!r}", file=sys.stderr)
            failures.append((sub_id, exc))
            continue
        if count:
            sent += 1
    if failures:
        failed_ids = ", ".join(str(sub_id) for sub_id, _ in failures)
        raise RuntimeError(
            f"{len(failures)} subscription push(es) failed: {failed_ids}"
        ) from failures[0][1]
    return sent


def _push_if_due(subscription, now_utc: datetime, dry_run: bool) -> int:
    """Push one subscription if due and record its run key; return the paper count (0 if not due)."""
    due, run_key = subscription_is_due(subscription, now_utc)
    if not due:
        return 0
    count = push_subscription(subscription, dry_run=dry_run)
    if count and not dry_run and run_key:
        record_push_run(subscription.id, run_key)
    return count
def main() -> int:
    """Command-line entry point: parse flags, run one push pass, report the count.

    Returns:
        Process exit status (always 0 when ``push_once`` returns normally).
    """
    parser = argparse.ArgumentParser(description="Run scheduled literature pushes.")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Do not send email or mark papers seen.",
    )
    options = parser.parse_args()
    sent_count = push_once(dry_run=options.dry_run)
    print(f"Push jobs completed. Emails sent: {sent_count}")
    return 0
if __name__ == "__main__":
    # sys.exit raises SystemExit with main()'s return value as the exit status.
    sys.exit(main())