-
Notifications
You must be signed in to change notification settings - Fork 150
/
Copy pathvideo_note_generator.py
1090 lines (925 loc) · 43.2 KB
/
video_note_generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
import sys
import json
import time
import shutil
import re
import subprocess
from typing import Dict, List, Optional, Tuple
import datetime
from pathlib import Path
import random
from itertools import zip_longest
import yt_dlp
import httpx
from unsplash.api import Api as UnsplashApi
from unsplash.auth import Auth as UnsplashAuth
from dotenv import load_dotenv
from bs4 import BeautifulSoup
import whisper
import openai
import argparse
# 加载环境变量
load_dotenv()
# 检查必要的环境变量
required_env_vars = {
'OPENROUTER_API_KEY': '用于OpenRouter API',
'OPENROUTER_API_URL': '用于OpenRouter API',
'OPENROUTER_APP_NAME': '用于OpenRouter API',
'OPENROUTER_HTTP_REFERER': '用于OpenRouter API',
'UNSPLASH_ACCESS_KEY': '用于图片搜索',
'UNSPLASH_SECRET_KEY': '用于Unsplash认证',
'UNSPLASH_REDIRECT_URI': '用于Unsplash回调'
}
missing_env_vars = []
for var, desc in required_env_vars.items():
if not os.getenv(var):
missing_env_vars.append(f" - {var} ({desc})")
if missing_env_vars:
print("注意:以下环境变量未设置:")
print("\n".join(missing_env_vars))
print("\n将使用基本功能继续运行(无AI优化和图片)。")
print("如需完整功能,请在 .env 文件中设置相应的 API 密钥。")
print("继续处理...\n")
# 配置代理
http_proxy = os.getenv('HTTP_PROXY')
https_proxy = os.getenv('HTTPS_PROXY')
proxies = {
'http': http_proxy,
'https': https_proxy
} if http_proxy and https_proxy else None
# 禁用 SSL 验证(仅用于开发环境)
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
# OpenRouter configuration
openrouter_api_key = os.getenv('OPENROUTER_API_KEY')
openrouter_app_name = os.getenv('OPENROUTER_APP_NAME', 'video_note_generator')
openrouter_http_referer = os.getenv('OPENROUTER_HTTP_REFERER', 'https://github.com')
openrouter_available = False
# 配置 OpenAI API
client = openai.OpenAI(
api_key=openrouter_api_key,
base_url="https://openrouter.ai/api/v1",
default_headers={
"HTTP-Referer": openrouter_http_referer,
"X-Title": openrouter_app_name,
}
)
# 选择要使用的模型
AI_MODEL = "google/gemini-pro" # 使用 Gemini Pro 模型
# Test OpenRouter connection
if openrouter_api_key:
try:
print(f"正在测试 OpenRouter API 连接...")
response = client.models.list() # 使用更简单的API调用来测试连接
print("✅ OpenRouter API 连接测试成功")
openrouter_available = True
except Exception as e:
print(f"⚠️ OpenRouter API 连接测试失败: {str(e)}")
print("将继续尝试使用API,但可能会遇到问题")
# 检查Unsplash配置
unsplash_access_key = os.getenv('UNSPLASH_ACCESS_KEY')
unsplash_client = None
if unsplash_access_key:
try:
auth = UnsplashAuth(
client_id=unsplash_access_key,
client_secret=None,
redirect_uri=None
)
unsplash_client = UnsplashApi(auth)
print("✅ Unsplash API 配置成功")
except Exception as e:
print(f"❌ Failed to initialize Unsplash client: {str(e)}")
# 检查ffmpeg
ffmpeg_path = None
try:
subprocess.run(["/opt/homebrew/bin/ffmpeg", "-version"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
print("✅ ffmpeg is available at /opt/homebrew/bin/ffmpeg")
ffmpeg_path = "/opt/homebrew/bin/ffmpeg"
except Exception:
try:
subprocess.run(["ffmpeg", "-version"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
print("✅ ffmpeg is available (from PATH)")
ffmpeg_path = "ffmpeg"
except Exception as e:
print(f"⚠️ ffmpeg not found: {str(e)}")
class DownloadError(Exception):
"""自定义下载错误类"""
def __init__(self, message: str, platform: str, error_type: str, details: str = None):
self.message = message
self.platform = platform
self.error_type = error_type
self.details = details
super().__init__(self.message)
class VideoNoteGenerator:
def __init__(self, output_dir: str = "temp_notes"):
self.output_dir = output_dir
os.makedirs(output_dir, exist_ok=True)
self.openrouter_available = openrouter_available
self.unsplash_client = unsplash_client
self.ffmpeg_path = ffmpeg_path
# 初始化whisper模型
print("正在加载Whisper模型...")
self.whisper_model = None
try:
self.whisper_model = whisper.load_model("medium")
print("✅ Whisper模型加载成功")
except Exception as e:
print(f"⚠️ Whisper模型加载失败: {str(e)}")
print("将在需要时重试加载")
# 日志目录
self.log_dir = os.path.join(self.output_dir, 'logs')
os.makedirs(self.log_dir, exist_ok=True)
# cookie目录
self.cookie_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cookies')
os.makedirs(self.cookie_dir, exist_ok=True)
# 平台cookie文件
self.platform_cookies = {
'douyin': os.path.join(self.cookie_dir, 'douyin_cookies.txt'),
'bilibili': os.path.join(self.cookie_dir, 'bilibili_cookies.txt'),
'youtube': os.path.join(self.cookie_dir, 'youtube_cookies.txt')
}
def _ensure_whisper_model(self) -> None:
"""确保Whisper模型已加载"""
if self.whisper_model is None:
try:
print("正在加载Whisper模型...")
self.whisper_model = whisper.load_model("medium")
print("✅ Whisper模型加载成功")
except Exception as e:
print(f"⚠️ Whisper模型加载失败: {str(e)}")
def _determine_platform(self, url: str) -> Optional[str]:
"""
确定视频平台
Args:
url: 视频URL
Returns:
str: 平台名称 ('youtube', 'douyin', 'bilibili') 或 None
"""
if 'youtube.com' in url or 'youtu.be' in url:
return 'youtube'
elif 'douyin.com' in url:
return 'douyin'
elif 'bilibili.com' in url:
return 'bilibili'
return None
def _handle_download_error(self, error: Exception, platform: str, url: str) -> str:
"""
处理下载错误并返回用户友好的错误消息
Args:
error: 异常对象
platform: 平台名称
url: 视频URL
Returns:
str: 用户友好的错误消息
"""
error_msg = str(error)
if "SSL" in error_msg:
return "⚠️ SSL证书验证失败,请检查网络连接"
elif "cookies" in error_msg.lower():
return f"⚠️ {platform}访问被拒绝,可能需要更新cookie或更换IP地址"
elif "404" in error_msg:
return "⚠️ 视频不存在或已被删除"
elif "403" in error_msg:
return "⚠️ 访问被拒绝,可能需要登录或更换IP地址"
elif "unavailable" in error_msg.lower():
return "⚠️ 视频当前不可用,可能是地区限制或版权问题"
else:
return f"⚠️ 下载失败: {error_msg}"
def _get_platform_options(self, platform: str) -> Dict:
"""获取平台特定的下载选项"""
# 基本选项
options = {
'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
'outtmpl': '%(title)s.%(ext)s'
}
if platform in self.platform_cookies and os.path.exists(self.platform_cookies[platform]):
options['cookiefile'] = self.platform_cookies[platform]
return options
def _validate_cookies(self, platform: str) -> bool:
"""验证cookie是否有效"""
if platform not in self.platform_cookies:
return False
cookie_file = self.platform_cookies[platform]
return os.path.exists(cookie_file)
def _get_alternative_download_method(self, platform: str, url: str) -> Optional[str]:
"""获取备用下载方法"""
if platform == 'youtube':
return 'pytube'
elif platform == 'douyin':
return 'requests'
elif platform == 'bilibili':
return 'you-get'
return None
def _download_with_alternative_method(self, platform: str, url: str, temp_dir: str, method: str) -> Optional[str]:
"""使用备用方法下载"""
try:
if method == 'you-get':
cmd = ['you-get', '--no-proxy', '--no-check-certificate', '-o', temp_dir, url]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
# 查找下载的文件
files = [f for f in os.listdir(temp_dir) if f.endswith(('.mp4', '.flv', '.webm'))]
if files:
return os.path.join(temp_dir, files[0])
raise Exception(result.stderr)
elif method == 'requests':
# 使用requests直接下载
headers = {
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
# 首先获取页面内容
response = httpx.get(url, headers=headers, verify=False)
if response.status_code == 200:
# 尝试从页面中提取视频URL
from bs4 import BeautifulSoup
soup = BeautifulSoup(response.text, 'html.parser')
video_url = None
# 查找video标签
video_tags = soup.find_all('video')
for video in video_tags:
src = video.get('src') or video.get('data-src')
if src:
video_url = src
break
if not video_url:
# 尝试查找其他可能包含视频URL的元素
import re
video_patterns = [
r'https?://[^"\'\s]+\.(?:mp4|m3u8)[^"\'\s]*',
r'playAddr":"([^"]+)"',
r'play_url":"([^"]+)"'
]
for pattern in video_patterns:
matches = re.findall(pattern, response.text)
if matches:
video_url = matches[0]
break
if video_url:
if not video_url.startswith('http'):
video_url = 'https:' + video_url if video_url.startswith('//') else video_url
# 下载视频
video_response = httpx.get(video_url, headers=headers, stream=True, verify=False)
if video_response.status_code == 200:
file_path = os.path.join(temp_dir, 'video.mp4')
with open(file_path, 'wb') as f:
for chunk in video_response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return file_path
raise Exception(f"无法下载视频: HTTP {video_response.status_code}")
raise Exception(f"无法访问页面: HTTP {response.status_code}")
elif method == 'pytube':
# 禁用SSL验证
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
from pytube import YouTube
yt = YouTube(url)
# 获取最高质量的MP4格式视频
video = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
if video:
return video.download(output_path=temp_dir)
raise Exception("未找到合适的视频流")
except Exception as e:
print(f"备用下载方法 {method} 失败: {str(e)}")
return None
def _download_video(self, url: str, temp_dir: str) -> Tuple[Optional[str], Optional[Dict[str, str]]]:
"""下载视频并返回音频文件路径和信息"""
try:
platform = self._determine_platform(url)
if not platform:
raise DownloadError("不支持的视频平台", "unknown", "platform_error")
# 基本下载选项
options = {
'format': 'bestaudio/best',
'outtmpl': os.path.join(temp_dir, '%(title)s.%(ext)s'),
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
}],
'quiet': True,
'no_warnings': True,
}
# 下载视频
for attempt in range(3): # 最多重试3次
try:
with yt_dlp.YoutubeDL(options) as ydl:
print(f"正在尝试下载(第{attempt + 1}次)...")
info = ydl.extract_info(url, download=True)
if not info:
raise DownloadError("无法获取视频信息", platform, "info_error")
# 找到下载的音频文件
downloaded_files = [f for f in os.listdir(temp_dir) if f.endswith('.mp3')]
if not downloaded_files:
raise DownloadError("未找到下载的音频文件", platform, "file_error")
audio_path = os.path.join(temp_dir, downloaded_files[0])
if not os.path.exists(audio_path):
raise DownloadError("音频文件不存在", platform, "file_error")
video_info = {
'title': info.get('title', '未知标题'),
'uploader': info.get('uploader', '未知作者'),
'description': info.get('description', ''),
'duration': info.get('duration', 0),
'platform': platform
}
print(f"✅ {platform}视频下载成功")
return audio_path, video_info
except Exception as e:
print(f"⚠️ 下载失败(第{attempt + 1}次): {str(e)}")
if attempt < 2: # 如果不是最后一次尝试
print("等待5秒后重试...")
time.sleep(5)
else:
raise # 最后一次失败,抛出异常
except Exception as e:
error_msg = self._handle_download_error(e, platform, url)
print(f"⚠️ {error_msg}")
return None, None
def _transcribe_audio(self, audio_path: str) -> str:
"""使用Whisper转录音频"""
try:
self._ensure_whisper_model()
if not self.whisper_model:
raise Exception("Whisper模型未加载")
print("正在转录音频(这可能需要几分钟)...")
result = self.whisper_model.transcribe(
audio_path,
language='zh', # 指定中文
task='transcribe',
best_of=5,
initial_prompt="以下是一段视频的转录内容。请用流畅的中文输出。" # 添加中文提示
)
return result["text"].strip()
except Exception as e:
print(f"⚠️ 音频转录失败: {str(e)}")
return ""
def _organize_content(self, content: str) -> str:
"""使用AI整理内容"""
try:
if not self.openrouter_available:
print("⚠️ OpenRouter API 未配置,将返回原始内容")
return content
# 构建系统提示词
system_prompt = """你是一位著名的科普作家和博客作者,著作等身,屡获殊荣,尤其在内容创作领域有深厚的造诣。
请使用 4C 模型(建立联系 Connection、展示冲突 Conflict、强调改变 Change、即时收获 Catch)为转录的文字内容创建结构。
写作要求:
- 从用户的问题出发,引导读者理解核心概念及其背景
- 使用第二人称与读者对话,语气亲切平实
- 确保所有观点和内容基于用户提供的转录文本
- 如无具体实例,则不编造
- 涉及复杂逻辑时,使用直观类比
- 避免内容重复冗余
- 逻辑递进清晰,从问题开始,逐步深入
Markdown格式要求:
- 大标题突出主题,吸引眼球,最好使用疑问句
- 小标题简洁有力,结构清晰,尽量使用单词或短语
- 直入主题,在第一部分清晰阐述问题和需求
- 正文使用自然段,避免使用列表形式
- 内容翔实,避免过度简略,特别注意保留原文中的数据和示例信息
- 如有来源URL,使用文内链接形式
- 保留原文中的Markdown格式图片链接"""
# 构建用户提示词
final_prompt = f"""请根据以下转录文字内容,创作一篇结构清晰、易于理解的博客文章。
转录文字内容:
{content}"""
# 调用API
response = client.chat.completions.create(
model=AI_MODEL,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": final_prompt}
],
temperature=0.7,
max_tokens=4000
)
if response.choices:
return response.choices[0].message.content.strip()
return content
except Exception as e:
print(f"⚠️ 内容整理失败: {str(e)}")
return content
def split_content(self, text: str, max_chars: int = 2000) -> List[str]:
"""按段落分割文本,保持上下文的连贯性
特点:
1. 保持段落完整性:不会在段落中间断开
2. 保持句子完整性:确保句子不会被截断
3. 添加重叠内容:每个chunk都包含上一个chunk的最后一段
4. 智能分割:对于超长段落,按句子分割并保持完整性
"""
if not text:
return []
paragraphs = text.split('\n\n')
chunks = []
current_chunk = []
current_length = 0
last_paragraph = None # 用于存储上一个chunk的最后一段
for para in paragraphs:
para = para.strip()
if not para: # 跳过空段落
continue
para_length = len(para)
# 如果这是新chunk的开始,且有上一个chunk的最后一段,添加它作为上下文
if not current_chunk and last_paragraph:
current_chunk.append(f"上文概要:\n{last_paragraph}\n")
current_length += len(last_paragraph) + 20 # 加上标题的长度
# 如果单个段落就超过了最大长度,需要按句子分割
if para_length > max_chars:
# 如果当前块不为空,先保存
if current_chunk:
last_paragraph = current_chunk[-1]
chunks.append('\n\n'.join(current_chunk))
current_chunk = []
current_length = 0
if last_paragraph:
current_chunk.append(f"上文概要:\n{last_paragraph}\n")
current_length += len(last_paragraph) + 20
# 按句子分割长段落
sentences = re.split(r'([。!?])', para)
current_sentence = []
current_sentence_length = 0
for i in range(0, len(sentences), 2):
sentence = sentences[i]
# 如果有标点符号,加上标点
if i + 1 < len(sentences):
sentence += sentences[i + 1]
# 如果加上这个句子会超过最大长度,保存当前块并开始新块
if current_sentence_length + len(sentence) > max_chars and current_sentence:
chunks.append(''.join(current_sentence))
current_sentence = [sentence]
current_sentence_length = len(sentence)
else:
current_sentence.append(sentence)
current_sentence_length += len(sentence)
# 保存最后一个句子块
if current_sentence:
chunks.append(''.join(current_sentence))
else:
# 如果加上这个段落会超过最大长度,保存当前块并开始新块
if current_length + para_length > max_chars and current_chunk:
last_paragraph = current_chunk[-1]
chunks.append('\n\n'.join(current_chunk))
current_chunk = []
current_length = 0
if last_paragraph:
current_chunk.append(f"上文概要:\n{last_paragraph}\n")
current_length += len(last_paragraph) + 20
current_chunk.append(para)
current_length += para_length
# 保存最后一个块
if current_chunk:
chunks.append('\n\n'.join(current_chunk))
return chunks
def _organize_long_content(self, content: str, duration: int = 0) -> str:
"""使用AI整理长文内容"""
if not content.strip():
return ""
if not self.openrouter_available:
print("⚠️ OpenRouter API 不可用,将返回原始内容")
return content
content_chunks = self.split_content(content)
organized_chunks = []
print(f"内容将分为 {len(content_chunks)} 个部分进行处理...")
for i, chunk in enumerate(content_chunks, 1):
print(f"正在处理第 {i}/{len(content_chunks)} 部分...")
organized_chunk = self._organize_content(chunk)
organized_chunks.append(organized_chunk)
return "\n\n".join(organized_chunks)
def convert_to_xiaohongshu(self, content: str) -> Tuple[str, List[str], List[str], List[str]]:
"""将博客文章转换为小红书风格的笔记,并生成标题和标签"""
try:
if not self.openrouter_available:
print("⚠️ OpenRouter API 未配置,将返回原始内容")
return content, [], [], []
# 构建系统提示词
system_prompt = """你是一位专业的小红书爆款文案写作大师,擅长将普通内容转换为刷屏级爆款笔记。
请将输入的内容转换为小红书风格的笔记,需要满足以下要求:
1. 标题创作(重要‼️):
- 二极管标题法:
* 追求快乐:产品/方法 + 只需N秒 + 逆天效果
* 逃避痛苦:不采取行动 + 巨大损失 + 紧迫感
- 爆款关键词(必选1-2个):
* 高转化词:好用到哭、宝藏、神器、压箱底、隐藏干货、高级感
* 情感词:绝绝子、破防了、治愈、万万没想到、爆款、永远可以相信
* 身份词:小白必看、手残党必备、打工人、普通女生
* 程度词:疯狂点赞、超有料、无敌、一百分、良心推荐
- 标题规则:
* 字数:20字以内
* emoji:2-4个相关表情
* 标点:感叹号、省略号增强表达
* 风格:口语化、制造悬念
2. 正文创作:
- 开篇设置(抓住痛点):
* 共情开场:描述读者痛点
* 悬念引导:埋下解决方案的伏笔
* 场景还原:具体描述场景
- 内容结构:
* 每段开头用emoji引导
* 重点内容加粗突出
* 适当空行增加可读性
* 步骤说明要清晰
- 写作风格:
* 热情亲切的语气
* 大量使用口语化表达
* 插入互动性问句
* 加入个人经验分享
- 高级技巧:
* 使用平台热梗
* 加入流行口头禅
* 设置悬念和爆点
* 情感共鸣描写
3. 标签优化:
- 提取4类标签(每类1-2个):
* 核心关键词:主题相关
* 关联关键词:长尾词
* 高转化词:购买意向强
* 热搜词:行业热点
4. 整体要求:
- 内容体量:根据内容自动调整
- 结构清晰:善用分点和空行
- 情感真实:避免过度营销
- 互动引导:设置互动机会
- AI友好:避免机器味
注意:创作时要始终记住,标题决定打开率,内容决定完播率,互动决定涨粉率!"""
# 构建用户提示词
user_prompt = f"""请将以下内容转换为爆款小红书笔记。
内容如下:
{content}
请按照以下格式返回:
1. 第一行:爆款标题(遵循二极管标题法,必须有emoji)
2. 空一行
3. 正文内容(注意结构、风格、技巧的运用,控制在600-800字之间)
4. 空一行
5. 标签列表(每类标签都要有,用#号开头)
创作要求:
1. 标题要让人忍不住点进来看
2. 内容要有干货,但表达要轻松
3. 每段都要用emoji装饰
4. 标签要覆盖核心词、关联词、转化词、热搜词
5. 设置2-3处互动引导
6. 通篇要有感情和温度
7. 正文控制在600-800字之间
"""
# 调用API
response = client.chat.completions.create(
model=AI_MODEL,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.7,
max_tokens=2000
)
if not response.choices:
raise Exception("API 返回结果为空")
# 处理返回的内容
xiaohongshu_content = response.choices[0].message.content.strip()
print(f"\n📝 API返回内容:\n{xiaohongshu_content}\n")
# 提取标题(第一行)
content_lines = xiaohongshu_content.split('\n')
titles = []
for line in content_lines:
line = line.strip()
if line and not line.startswith('#') and ':' not in line and '。' not in line:
titles = [line]
break
if not titles:
print("⚠️ 未找到标题,尝试其他方式提取...")
# 尝试其他方式提取标题
title_match = re.search(r'^[^#\n]+', xiaohongshu_content)
if title_match:
titles = [title_match.group(0).strip()]
if titles:
print(f"✅ 提取到标题: {titles[0]}")
else:
print("⚠️ 未能提取到标题")
# 提取标签(查找所有#开头的标签)
tags = []
tag_matches = re.findall(r'#([^\s#]+)', xiaohongshu_content)
if tag_matches:
tags = tag_matches
print(f"✅ 提取到{len(tags)}个标签")
else:
print("⚠️ 未找到标签")
# 获取相关图片
images = []
if self.unsplash_client:
# 使用标题和标签作为搜索关键词
search_terms = titles + tags[:2] if tags else titles
search_query = ' '.join(search_terms)
try:
images = self._get_unsplash_images(search_query, count=4)
if images:
print(f"✅ 成功获取{len(images)}张配图")
else:
print("⚠️ 未找到相关配图")
except Exception as e:
print(f"⚠️ 获取配图失败: {str(e)}")
return xiaohongshu_content, titles, tags, images
except Exception as e:
print(f"⚠️ 转换小红书笔记失败: {str(e)}")
return content, [], [], []
def _get_unsplash_images(self, query: str, count: int = 3) -> List[str]:
"""从Unsplash获取相关图片"""
if not self.unsplash_client:
print("⚠️ Unsplash客户端未初始化")
return []
try:
# 将查询词翻译成英文以获得更好的结果
if self.openrouter_available:
try:
response = client.chat.completions.create(
model=AI_MODEL,
messages=[
{"role": "system", "content": "你是一个翻译助手。请将输入的中文关键词翻译成最相关的1-3个英文关键词,用逗号分隔。直接返回翻译结果,不要加任何解释。例如:\n输入:'保险理财知识'\n输出:insurance,finance,investment"},
{"role": "user", "content": query}
],
temperature=0.3,
max_tokens=50
)
if response.choices:
query = response.choices[0].message.content.strip()
except Exception as e:
print(f"⚠️ 翻译关键词失败: {str(e)}")
# 使用httpx直接调用Unsplash API
headers = {
'Authorization': f'Client-ID {os.getenv("UNSPLASH_ACCESS_KEY")}'
}
# 对每个关键词分别搜索
all_photos = []
for keyword in query.split(','):
response = httpx.get(
'https://api.unsplash.com/search/photos',
params={
'query': keyword.strip(),
'per_page': count,
'orientation': 'portrait', # 小红书偏好竖版图片
'content_filter': 'high' # 只返回高质量图片
},
headers=headers,
verify=False # 禁用SSL验证
)
if response.status_code == 200:
data = response.json()
if data['results']:
# 获取图片URL,优先使用regular尺寸
photos = [photo['urls'].get('regular', photo['urls']['small'])
for photo in data['results']]
all_photos.extend(photos)
# 如果收集到的图片不够,用最后一个关键词继续搜索
while len(all_photos) < count and query:
response = httpx.get(
'https://api.unsplash.com/search/photos',
params={
'query': query.split(',')[-1].strip(),
'per_page': count - len(all_photos),
'orientation': 'portrait',
'content_filter': 'high',
'page': 2 # 获取下一页的结果
},
headers=headers,
verify=False
)
if response.status_code == 200:
data = response.json()
if data['results']:
photos = [photo['urls'].get('regular', photo['urls']['small'])
for photo in data['results']]
all_photos.extend(photos)
else:
break
else:
break
# 返回指定数量的图片
return all_photos[:count]
except Exception as e:
print(f"⚠️ 获取图片失败: {str(e)}")
return []
def process_video(self, url: str) -> List[str]:
"""处理视频链接,生成笔记
Args:
url (str): 视频链接
Returns:
List[str]: 生成的笔记文件路径列表
"""
print("\n📹 正在处理视频...")
# 创建临时目录
temp_dir = os.path.join(self.output_dir, 'temp')
os.makedirs(temp_dir, exist_ok=True)
try:
# 下载视频
print("⬇️ 正在下载视频...")
result = self._download_video(url, temp_dir)
if not result:
return []
audio_path, video_info = result
if not audio_path or not video_info:
return []
print(f"✅ 视频下载成功: {video_info['title']}")
# 转录音频
print("\n🎙️ 正在转录音频...")
print("正在转录音频(这可能需要几分钟)...")
transcript = self._transcribe_audio(audio_path)
if not transcript:
return []
# 保存原始转录内容
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
original_file = os.path.join(self.output_dir, f"{timestamp}_original.md")
with open(original_file, 'w', encoding='utf-8') as f:
f.write(f"# {video_info['title']}\n\n")
f.write(f"## 视频信息\n")
f.write(f"- 作者:{video_info['uploader']}\n")
f.write(f"- 时长:{video_info['duration']}秒\n")
f.write(f"- 平台:{video_info['platform']}\n")
f.write(f"- 链接:{url}\n\n")
f.write(f"## 原始转录内容\n\n")
f.write(transcript)
# 整理长文版本
print("\n📝 正在整理长文版本...")
organized_content = self._organize_long_content(transcript, video_info['duration'])
organized_file = os.path.join(self.output_dir, f"{timestamp}_organized.md")
with open(organized_file, 'w', encoding='utf-8') as f:
f.write(f"# {video_info['title']} - 整理版\n\n")
f.write(f"## 视频信息\n")
f.write(f"- 作者:{video_info['uploader']}\n")
f.write(f"- 时长:{video_info['duration']}秒\n")
f.write(f"- 平台:{video_info['platform']}\n")
f.write(f"- 链接:{url}\n\n")
f.write(f"## 内容整理\n\n")
f.write(organized_content)
# 生成小红书版本
print("\n📱 正在生成小红书版本...")
try:
xiaohongshu_content, titles, tags, images = self.convert_to_xiaohongshu(organized_content)
# 保存小红书版本
xiaohongshu_file = os.path.join(self.output_dir, f"{timestamp}_xiaohongshu.md")
# 写入文件
with open(xiaohongshu_file, "w", encoding="utf-8") as f:
# 写入标题
f.write(f"# {titles[0]}\n\n")
# 如果有图片,先写入第一张作为封面
if images:
f.write(f"\n\n")
# 写入正文内容的前半部分
content_parts = xiaohongshu_content.split('\n\n')
mid_point = len(content_parts) // 2
# 写入前半部分
f.write('\n\n'.join(content_parts[:mid_point]))
f.write('\n\n')
# 如果有第二张图片,插入到中间
if len(images) > 1:
f.write(f"\n\n")
# 写入后半部分
f.write('\n\n'.join(content_parts[mid_point:]))
# 如果有第三张图片,插入到末尾
if len(images) > 2:
f.write(f"\n\n")
# 写入标签
if tags:
f.write("\n\n---\n")
f.write("\n".join([f"#{tag}" for tag in tags]))
print(f"\n✅ 小红书版本已保存至: {xiaohongshu_file}")
return [original_file, organized_file, xiaohongshu_file]
except Exception as e:
print(f"⚠️ 生成小红书版本失败: {str(e)}")
import traceback
print(f"错误详情:\n{traceback.format_exc()}")
print(f"\n✅ 笔记已保存至: {original_file}")
print(f"✅ 整理版内容已保存至: {organized_file}")
return [original_file, organized_file]
except Exception as e:
print(f"⚠️ 处理视频时出错: {str(e)}")
return []
finally:
# 清理临时文件
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
def process_markdown_file(self, input_file: str) -> None:
"""处理markdown文件,生成优化后的笔记
Args:
input_file (str): 输入的markdown文件路径
"""
try:
# 读取markdown文件
with open(input_file, 'r', encoding='utf-8') as f:
content = f.read()
# 提取视频链接
video_links = re.findall(r'https?://(?:www\.)?(?:youtube\.com/watch\?v=|youtu\.be/|bilibili\.com/video/|douyin\.com/video/)[^\s\)]+', content)
if not video_links:
print("未在markdown文件中找到视频链接")
return
print(f"找到 {len(video_links)} 个视频链接,开始处理...\n")
# 处理每个视频链接
for i, url in enumerate(video_links, 1):
print(f"处理第 {i}/{len(video_links)} 个视频: {url}\n")
self.process_video(url)
except Exception as e:
print(f"处理markdown文件时出错: {str(e)}")
raise
def extract_urls_from_text(text: str) -> list:
"""
从文本中提取所有有效的URL
支持的URL格式:
- 视频平台URL (YouTube, Bilibili, 抖音等)
- 包含http://或https://的标准URL
- 短链接URL (如t.co等)
Args:
text: 输入文本
Returns:
list: 提取到的有效URL列表
"""
# URL正则模式
url_patterns = [
# 标准URL
r'https?://[^\s<>\[\]"\']+[^\s<>\[\]"\'.,]',
# 短链接
r'https?://[a-zA-Z0-9]+\.[a-zA-Z]{2,3}/[^\s<>\[\]"\']+',
# Bilibili
r'BV[a-zA-Z0-9]{10}',