forked from lsdefine/GenericAgent
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlaunch.pyw
More file actions
178 lines (158 loc) · 7.82 KB
/
launch.pyw
File metadata and controls
178 lines (158 loc) · 7.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import webview, threading, subprocess, sys, time, os, ctypes, atexit, socket, random, webbrowser
from urllib.request import urlopen
from urllib.error import URLError
# Title given to the native window; also used to look the window up by name.
APP_TITLE = 'GenericAgent'

# Initial window size in pixels.
WINDOW_WIDTH = 600
WINDOW_HEIGHT = 900

# Seconds to wait for the Streamlit server before giving up on the webview.
READY_TIMEOUT = 30

# Directory holding this launcher and its "frontends" subdirectory.
script_dir = os.path.dirname(os.path.abspath(__file__))
frontends_dir = os.path.join(script_dir, "frontends")

# Filled in by the __main__ section once the port is known / the window exists.
app_url = ''
window = None
def find_free_port(lo=18501, hi=18599):
    """Return a random TCP port in ``[lo, hi]`` that can be bound on 127.0.0.1.

    Candidate ports are tried in shuffled order; the first one that binds
    successfully is returned. The probe socket is closed before returning,
    so the port is only known to have been free at the time of the check.

    Args:
        lo: First port of the inclusive range.
        hi: Last port of the inclusive range.

    Returns:
        The chosen port number as an int.

    Raises:
        RuntimeError: If no port in the range could be bound.
    """
    candidates = list(range(lo, hi + 1))
    random.shuffle(candidates)
    for port in candidates:
        # The context manager closes the probe socket on every path,
        # including when bind() fails because the port is taken.
        with socket.socket() as probe:
            try:
                probe.bind(('127.0.0.1', port))
            except OSError:
                continue
            return port
    raise RuntimeError(f'No free port in {lo}-{hi}')
def get_screen_size():
    """Return the screen size as a ``(width, height)`` tuple in pixels.

    Queries ``user32.GetSystemMetrics`` through ``ctypes.windll``. When that
    is unavailable (for example ``ctypes.windll`` does not exist on this
    platform) a default of ``(1920, 1080)`` is returned instead.
    """
    try:
        user32 = ctypes.windll.user32
        return user32.GetSystemMetrics(0), user32.GetSystemMetrics(1)
    except Exception:
        # Best-effort fallback; unlike a bare ``except`` this does not
        # swallow KeyboardInterrupt or SystemExit.
        return 1920, 1080
def get_window_position():
    """Return ``(x, y)`` placing the window in the middle of the screen.

    The screen size comes from ``get_screen_size()``. Each coordinate is
    clamped to 0 when the screen is smaller than the window in that axis.
    """
    screen_w, screen_h = get_screen_size()
    left = (screen_w - WINDOW_WIDTH) // 2
    top = (screen_h - WINDOW_HEIGHT) // 2
    return (left if left > 0 else 0), (top if top > 0 else 0)
def wait_for_streamlit(port, timeout=READY_TIMEOUT):
    """Poll ``http://localhost:<port>`` until it answers with HTTP 200.

    Args:
        port: Port the server is expected to listen on.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        True as soon as a request returns status 200, False if the timeout
        elapses first.
    """
    from http.client import HTTPException

    url = f'http://localhost:{port}'
    # A monotonic clock keeps the deadline unaffected by system clock changes.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urlopen(url, timeout=2) as resp:
                if resp.status == 200:
                    return True
        except (URLError, OSError, HTTPException):
            # Not reachable yet, or a malformed response: treat as "not
            # ready" and retry.
            pass
        # Pause after every unsuccessful attempt, including a response that
        # was not 200, so the loop never spins without sleeping.
        time.sleep(0.5)
    return False
def focus_native_window(title, timeout=15):
    """Try to bring the top-level window named *title* to the foreground.

    Only does anything when ``os.name`` is ``'nt'``; elsewhere it returns
    False immediately. On Windows it looks the window up by title every half
    second until it is found or *timeout* seconds have passed.

    Returns:
        True if the window was found and the show/foreground calls were
        issued, False otherwise.
    """
    if os.name != 'nt':
        return False
    win_api = ctypes.windll.user32
    give_up_at = time.time() + timeout
    while time.time() < give_up_at:
        handle = win_api.FindWindowW(None, title)
        if not handle:
            time.sleep(0.5)
            continue
        win_api.ShowWindow(handle, 9)  # 9 is SW_RESTORE in the Win32 API
        win_api.SetForegroundWindow(handle)
        return True
    return False
def post_start_tasks():
    """Callback passed to ``webview.start``: surface the window, or fall back.

    Makes a best-effort attempt to restore and show the webview window, then
    tries to bring it to the foreground by title. If that fails, the app URL
    is opened in the default browser instead.
    """
    if window is not None:
        for action in ('restore', 'show'):
            try:
                getattr(window, action)()
            except Exception as exc:
                # Still best-effort, but report the failure instead of
                # swallowing it silently.
                print(f'[Launch] window.{action}() failed: {exc}')
    # NOTE(review): focus_native_window() returns False whenever os.name is
    # not 'nt', so the browser fallback always fires on other platforms --
    # confirm this is intended.
    if not focus_native_window(APP_TITLE):
        print('[Launch] WebView window was not brought to foreground, opening browser fallback')
        webbrowser.open(app_url)
def start_streamlit(port):
    """Launch ``frontends/stapp.py`` under Streamlit on *port*.

    The child process is stored in the module-level ``proc`` and its
    ``kill`` method is registered with ``atexit``.
    """
    global proc
    app_script = os.path.join(frontends_dir, "stapp.py")
    proc = subprocess.Popen([
        sys.executable, "-m", "streamlit", "run", app_script,
        "--server.port", str(port),
        "--server.address", "localhost",
        "--server.headless", "true",
    ])
    atexit.register(proc.kill)
def inject(text):
    """Type *text* into the Streamlit chat input and submit it.

    Runs JavaScript in the webview window that finds the chat textarea, sets
    its value through the native ``HTMLTextAreaElement`` value setter,
    dispatches ``input`` and ``change`` events, and clicks the submit button
    200 ms later. Nothing happens if the textarea is not in the page.

    Args:
        text: The message to submit.
    """
    # Local import so this function does not depend on the file's import block.
    import json

    # JSON string syntax is valid JavaScript literal syntax. Python's repr()
    # is not: it can emit escapes such as \U0001xxxx that JavaScript does
    # not understand.
    js_text = json.dumps(text)
    window.evaluate_js(f"""
const textarea = document.querySelector('textarea[data-testid="stChatInputTextArea"]');
if (textarea) {{
// 1. 用原生 setter 设置值(绕过 React)
const nativeTextAreaValueSetter = Object.getOwnPropertyDescriptor(window.HTMLTextAreaElement.prototype, 'value').set;
nativeTextAreaValueSetter.call(textarea, {js_text});
// 2. 触发 React 的 input 事件
textarea.dispatchEvent(new Event('input', {{ bubbles: true }}));
// 3. 触发 change 事件(有些组件需要)
textarea.dispatchEvent(new Event('change', {{ bubbles: true }}));
// 4. 延迟提交
setTimeout(() => {{
const btn = document.querySelector('[data-testid="stChatInputSubmitButton"]');
if (btn) {{btn.click();console.log('Submitted:', {js_text});}}
}}, 200);
}}""")
def get_last_reply_time():
    """Return the value of the page's ``last-reply-time`` element.

    The element's text is parsed as an integer inside the webview. If the
    element is missing, or the result is otherwise falsy, the current time
    (whole seconds since the epoch) is returned instead.
    """
    reported = window.evaluate_js("""
const el = document.getElementById('last-reply-time');
el ? parseInt(el.textContent) : 0;
""")
    if reported:
        return reported
    return int(time.time())
def idle_monitor():
    """Loop forever, injecting an auto-task prompt after a long idle period.

    Every 5 seconds the last reply time is read from the page. If it is more
    than 1800 seconds old, and at least 120 seconds have passed since the
    previous injection, a fixed prompt is submitted through ``inject()``.
    Errors inside one iteration are printed and the loop carries on.
    Intended to run in a daemon thread; it never returns.
    """
    poll_seconds = 5
    cooldown_seconds = 120
    idle_seconds = 1800
    last_trigger_time = 0
    while True:
        time.sleep(poll_seconds)
        # This thread is started before the webview window is created; skip
        # the poll until it exists instead of failing on None and printing
        # a spurious error each time.
        if window is None:
            continue
        try:
            now = time.time()
            if now - last_trigger_time < cooldown_seconds:
                continue
            if now - get_last_reply_time() > idle_seconds:
                print('[Idle Monitor] Detected idle state, injecting task...')
                inject("[AUTO]🤖 用户已经离开超过30分钟,作为自主智能体,请阅读自动化sop,执行自动任务。")
                last_trigger_time = now
        except Exception as e:
            print(f'[Idle Monitor] Error: {e}')
if __name__ == '__main__':
    # Entry point: parse options, start the Streamlit server and any
    # requested helper processes, then show the UI in a webview window.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('port', nargs='?', default='0')
    parser.add_argument('--tg', action='store_true', help='启动 Telegram Bot')
    parser.add_argument('--qq', action='store_true', help='启动 QQ Bot')
    parser.add_argument('--feishu', '--fs', dest='feishu', action='store_true', help='启动 Feishu Bot')
    parser.add_argument('--wecom', action='store_true', help='启动 WeCom Bot')
    parser.add_argument('--dingtalk', '--dt', dest='dingtalk', action='store_true', help='启动 DingTalk Bot')
    parser.add_argument('--sched', action='store_true', help='启动计划任务调度器')
    parser.add_argument('--llm_no', type=int, default=0, help='LLM编号')
    args = parser.parse_args()

    # A port of '0' (the default) means "pick a free one".
    port = str(find_free_port()) if args.port == '0' else args.port
    print(f'[Launch] Using port {port}')
    threading.Thread(target=start_streamlit, args=(port,), daemon=True).start()

    # Helper processes are started without a console window on Windows.
    no_window_flag = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0

    def _spawn(cmd):
        """Start *cmd* as a child process that is killed when the launcher exits."""
        child = subprocess.Popen(cmd, creationflags=no_window_flag)
        atexit.register(child.kill)
        return child

    # (enabled, script under frontends/, display name, flag shown in the hint)
    bots = (
        (args.tg, "tgapp.py", 'Telegram Bot', '--tg'),
        (args.qq, "qqapp.py", 'QQ Bot', '--qq'),
        (args.feishu, "fsapp.py", 'Feishu Bot', '--feishu'),
        (args.wecom, "wecomapp.py", 'WeCom Bot', '--wecom'),
        (args.dingtalk, "dingtalkapp.py", 'DingTalk Bot', '--dingtalk'),
    )
    for enabled, script, label, flag in bots:
        if enabled:
            _spawn([sys.executable, os.path.join(frontends_dir, script)])
            print(f'[Launch] {label} started')
        else:
            print(f'[Launch] {label} not enabled (use {flag} to start)')

    if args.sched:
        _spawn([
            sys.executable, os.path.join(script_dir, "agentmain.py"),
            "--reflect", os.path.join(script_dir, "reflect", "scheduler.py"),
            "--llm_no", str(args.llm_no),
        ])
        print('[Launch] Task Scheduler started (duplicate prevented by scheduler port lock)')
    else:
        print('[Launch] Task Scheduler not enabled (--sched)')

    monitor_thread = threading.Thread(target=idle_monitor, daemon=True)
    monitor_thread.start()

    app_url = f'http://localhost:{port}'
    if not wait_for_streamlit(port):
        print(f'[Launch] Streamlit not ready after {READY_TIMEOUT}s, opening browser fallback: {app_url}')
        webbrowser.open(app_url)
        # Exiting right away would run the atexit hooks and kill the
        # Streamlit server the browser was just pointed at. Stay alive until
        # the server process itself ends, then exit as before.
        server = globals().get('proc')
        if server is not None:
            server.wait()
        sys.exit(0)

    x_pos, y_pos = get_window_position()
    window = webview.create_window(
        title=APP_TITLE, url=app_url,
        width=WINDOW_WIDTH, height=WINDOW_HEIGHT, x=x_pos, y=y_pos,
        resizable=True, text_select=True)
    webview.start(post_start_tasks)