-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbot.py
More file actions
447 lines (374 loc) · 17.7 KB
/
bot.py
File metadata and controls
447 lines (374 loc) · 17.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
import re

import yaml
from telegram import ReplyKeyboardMarkup, Update, ReplyKeyboardRemove
from telegram.ext import (CommandHandler, MessageHandler, Application, filters,
                          ConversationHandler, CallbackContext, ContextTypes)

from logger_config import logger
from constants import (TIMEOUT_IN_SEC, STATION_SELECT_ONE_TIME,
                       STATION_SELECT_SUBSCRIBE, ONE_TIME, SUBSCRIBE,
                       UNSUBSCRIBE, VALID_SUMMARY_INTERVALS,
                       BOT_JOBQUEUE_DELAY, BOT_DEFAULT_USER_ID,
                       BOT_MAX_RESCHEDULE_TIME)
class PlotBot:
    """Telegram bot that sends forecast plots for configured stations.

    Users can subscribe to a station, unsubscribe again, or request the
    plots of a station once. Stations are grouped by region and are picked
    through reply keyboards inside conversation handlers. Periodic jobs
    refresh the base time, cache plots and broadcast new plots to
    subscribers.
    """

    def __init__(self, config_file, station_config, db=None, ecmwf=None):
        """Build the Telegram application, its handlers and periodic jobs.

        Args:
            config_file: Path of a YAML file. It must provide
                ``bot.token`` and may provide ``bot.admin_ids``.
            station_config: Re-iterable collection (it is traversed several
                times) of mappings with the keys ``"name"`` and
                ``"region"``.
            db: Storage backend used for subscriptions and activity
                logging. Defaults to ``None``, but the handlers call it
                without a guard, so a real object is needed at runtime.
            ecmwf: Plot provider used for downloading and caching plots.
                Same remark as for ``db``.
        """
        # Context manager so the config file handle is closed right away.
        with open(config_file) as config_stream:
            self._config = yaml.safe_load(config_stream)
        self._admin_ids = self._config['bot'].get('admin_ids', [])

        self.app = Application.builder().token(
            self._config['bot']['token']).build()
        self._db = db
        self._ecmwf = ecmwf

        self._station_names = sorted(
            station["name"] for station in station_config)
        self._region_of_stations = {
            station["name"]: station["region"]
            for station in station_config
        }
        self._station_regions = sorted(
            {station["region"]
             for station in station_config})

        # filter for stations
        self._filter_stations = filters.Regex(
            self._exact_match_pattern(self._station_names))
        # filter for regions
        self._filter_regions = filters.Regex(
            self._exact_match_pattern(self._station_regions))
        # filter for all commands of bot
        self._filter_all_commands = filters.Regex(
            "^(/locations|/subscribe|/unsubscribe|/plots|/help|/cancel|/start|/stats)$"
        )
        # Everything that is neither a command, a region nor a station,
        # i.e. the inverse of all filters above. Such messages get the help.
        self._filter_meaningful_messages = (~self._filter_all_commands
                                            & ~self._filter_regions
                                            & ~self._filter_stations)

        subscription_handler = ConversationHandler(
            entry_points=[
                CommandHandler('subscribe', self._choose_all_region)
            ],
            states={
                STATION_SELECT_SUBSCRIBE:
                [MessageHandler(self._filter_regions, self._choose_station)],
                SUBSCRIBE: [
                    MessageHandler(self._filter_stations,
                                   self._subscribe_for_station)
                ],
            },
            fallbacks=[CommandHandler('cancel', self._cancel)],
            conversation_timeout=TIMEOUT_IN_SEC,
        )
        one_time_forecast_handler = ConversationHandler(
            entry_points=[CommandHandler('plots', self._choose_all_region)],
            states={
                STATION_SELECT_ONE_TIME: [
                    MessageHandler(self._filter_regions,
                                   self._choose_all_station)
                ],
                ONE_TIME: [
                    MessageHandler(self._filter_stations,
                                   self._request_one_time_forecast_for_station)
                ],
            },
            fallbacks=[CommandHandler('cancel', self._cancel)],
            conversation_timeout=TIMEOUT_IN_SEC,
        )
        unsubscription_handler = ConversationHandler(
            entry_points=[CommandHandler('unsubscribe', self._revoke_station)],
            states={
                UNSUBSCRIBE: [
                    MessageHandler(self._filter_stations,
                                   self._unsubscribe_for_station)
                ],
            },
            fallbacks=[CommandHandler('cancel', self._cancel)],
            conversation_timeout=TIMEOUT_IN_SEC,
        )

        # All handlers share one group, where only the first matching
        # handler processes an update. The conversation handlers are
        # registered first so that their /cancel fallbacks actually end a
        # running conversation; the stand-alone /cancel below only answers
        # when no conversation is active.
        self.app.add_handler(subscription_handler)
        self.app.add_handler(unsubscription_handler)
        self.app.add_handler(one_time_forecast_handler)

        self.app.add_handler(CommandHandler('start', self._help))
        self.app.add_handler(CommandHandler('help', self._help))
        self.app.add_handler(CommandHandler('cancel', self._cancel))
        self.app.add_handler(CommandHandler('stats', self._stats))
        self.app.add_handler(
            CommandHandler('locations', self._overview_locations))
        # add help handler for all other messages
        self.app.add_handler(
            MessageHandler(self._filter_meaningful_messages, self._help))

        self.app.add_error_handler(self._error)

        # schedule jobs
        self.app.job_queue.run_once(
            self._override_basetime,
            when=0,
            name='override basetime',
        )
        self.app.job_queue.run_repeating(
            self._update_basetime,
            first=120,
            interval=60,
            name='update basetime',
        )
        self.app.job_queue.run_repeating(
            self._cache_plots,
            interval=30,
            name='cache plots',
        )
        self.app.job_queue.run_repeating(
            self._broadcast,
            interval=30,
            name='broadcast',
        )

    @staticmethod
    def _exact_match_pattern(names):
        """Return a regex that matches exactly one of ``names``.

        Every name is escaped, so characters such as ``.`` or ``(`` in a
        station or region name are matched literally.
        """
        return "^(" + "|".join(re.escape(name) for name in names) + ")$"

    async def _override_basetime(self, context: CallbackContext):
        """Job callback: delegate to ``ecmwf.override_base_time_from_init``."""
        self._ecmwf.override_base_time_from_init()

    async def _update_basetime(self, context: CallbackContext):
        """Job callback: upgrade the global and the per-station base time."""
        self._ecmwf.upgrade_basetime_global()
        self._ecmwf.upgrade_basetime_stations()

    async def _process_request(self, context: CallbackContext):
        """Job callback: deliver the plots of one station to one user.

        ``context.job.data`` is the ``(user_id, station_name)`` tuple set
        by ``_schedule_process_request``. The job is repeating: it removes
        itself once plots were sent, otherwise it simply runs again at the
        next interval.
        """
        job = context.job
        user_id, station_name = job.data
        plots = self._ecmwf.download_plots([station_name]).get(station_name)
        if plots:
            await self._send_plots_to_user(plots, station_name, user_id)
            job.schedule_removal()
        else:
            logger.info(
                f"Plots not available for {station_name}, rescheduling job.")

    def start(self):
        """Start polling for updates of all types."""
        logger.info('Starting bot')
        self.app.run_polling(allowed_updates=Update.ALL_TYPES)

    async def _error(self, update: Update, context: CallbackContext):
        """Log an exception raised by a handler or job and record it in the db.

        ``update`` is not guaranteed to be an ``Update`` carrying a chat
        (it is ``None`` for job errors, for instance), so the chat id is
        resolved defensively and ``BOT_DEFAULT_USER_ID`` is used otherwise.
        """
        chat = update.effective_chat if isinstance(update, Update) else None
        user_id = chat.id if chat else BOT_DEFAULT_USER_ID
        logger.error(f"Exception while handling an update: {context.error}")
        self._db.log_activity(
            activity_type="bot-error",
            user_id=user_id,
            station="unknown",
        )

    async def _stats(self, update: Update, context: CallbackContext):
        """Reply with activity and subscription statistics (admins only)."""
        message = update.effective_message
        if message.chat_id not in self._admin_ids:
            await message.reply_text("You are not authorized to view stats.")
            return

        lines = ['*Activity*']
        # Query activity summary for each interval
        for interval in VALID_SUMMARY_INTERVALS:
            lines.append(f"_{interval.lower()}_")
            lines.extend(
                f"- {activity}"
                for activity in self._db.get_activity_summary(interval))
            lines.append('')

        # Query subscription summary
        lines.append('*Subscriptions*')
        lines.extend(f"- {station}"
                     for station in self._db.get_subscription_summary())

        # Query unique subscribers
        unique_subscribers = self._db.count_unique_subscribers()
        lines.append('')
        lines.append(f"_Unique subscribers: {unique_subscribers}_")
        lines.append('')

        await message.reply_markdown("\n".join(lines))

    async def _overview_locations(self, update: Update,
                                  context: CallbackContext):
        """Reply with all stations, grouped by region."""
        await update.effective_message.reply_markdown("\n".join(
            self._available_locations()))

    def _available_locations(self):
        """Return the Markdown lines listing every region and its stations."""
        text = ["_Available locations_"]
        for location in self._station_regions:
            text.append('')
            text.append(f'*{location}*')
            text.extend(
                f'- {n}' for n in self._get_station_names_for_region(location))
        return text

    async def _help(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Reply with the greeting and the list of commands."""
        greetings = (
            "Hi! I am OpenEns. I supply you with ECMWF meteograms for places in Switzerland. "
            "\nTwice a day a new set of meteograms is available, usually at *8:00* for the *00 UTC* run and at *20:00* for the *12 UTC* run. "
            "\nYou can subscribe for a location or request a forecast only once. "
            "\n\n*Commands* "
            "\n- To get a list of available locations type /locations. "
            "\n- To subscribe type /subscribe. "
            "\n- To request a forecast type /plots. "
            "\n- To unsubscribe type /unsubscribe. "
            "\n- To get this message type /help. "
            "\n- To cancel any operation type /cancel. "
            "\n\nAll available commands are also shown in the menu at the bottom of the chat. "
            "\n\nIf you have any questions, feedback, or if the bot missed a place you want forecasts for, please open an issue on GitHub: "
            "\nhttps://github.com/jonasjucker/ensplotbot "
            "\n\n*Have fun!*")
        # effective_message also covers edited messages, for which
        # update.message is None.
        await update.effective_message.reply_markdown(greetings)

    async def _choose_station(self, update: Update,
                              context: CallbackContext) -> int:
        """Offer the stations of the chosen region the user may subscribe to.

        Returns:
            ``SUBSCRIBE`` when at least one station was offered, otherwise
            ``ConversationHandler.END``.
        """
        message = update.effective_message
        region = message.text
        # Get the stations that the user has already subscribed to
        subscribed_stations = self._db.get_subscriptions_by_user(
            message.chat_id)
        # Only include stations that the user has not already subscribed to
        candidates = [
            name for name in self._get_station_names_for_region(region)
            if name not in subscribed_stations
        ]
        offered = await self._send_station_keyboard(update, candidates)
        return SUBSCRIBE if offered else ConversationHandler.END

    async def _choose_all_region(self, update: Update,
                                 context: CallbackContext) -> int:
        """Entry point of /subscribe and /plots: offer all regions.

        Returns:
            ``STATION_SELECT_SUBSCRIBE`` for /subscribe and
            ``STATION_SELECT_ONE_TIME`` for /plots.

        Raises:
            ValueError: If the message is neither of the two commands.
        """
        entry_point = update.effective_message.text
        # Commands may arrive as "/plots@BotName", with arguments, or in a
        # different case; reduce them to the bare lower-case command.
        command = entry_point.split(maxsplit=1)[0].partition('@')[0].lower()
        next_state = {
            '/subscribe': STATION_SELECT_SUBSCRIBE,
            '/plots': STATION_SELECT_ONE_TIME,
        }.get(command)
        # check that entry point is valid before sending anything
        if next_state is None:
            raise ValueError(f'Invalid entry point: {entry_point}')
        await self._send_region_keyboard(update, list(self._station_regions))
        return next_state

    def _get_station_names_for_region(self, region) -> list[str]:
        """Return the sorted names of all stations belonging to ``region``."""
        return sorted(name for name in self._station_names
                      if self._region_of_stations[name] == region)

    async def _choose_all_station(self, update: Update,
                                  context: CallbackContext) -> int:
        """Offer every station of the chosen region for a one-time request."""
        region = update.effective_message.text
        await self._send_station_keyboard(
            update, self._get_station_names_for_region(region))
        return ONE_TIME

    async def _revoke_station(self, update: Update,
                              context: CallbackContext) -> int:
        """Entry point of /unsubscribe: offer the user's subscribed stations.

        Returns:
            ``UNSUBSCRIBE`` when the user has at least one subscription,
            otherwise ``ConversationHandler.END``.
        """
        user_id = update.effective_message.chat_id
        # Get the stations that the user has already subscribed to
        subscribed_stations = self._db.get_subscriptions_by_user(user_id)
        # Only include stations that the user has already subscribed to
        subscription_present = await self._send_station_keyboard(
            update,
            sorted(name for name in self._station_names
                   if name in subscribed_stations))
        return UNSUBSCRIBE if subscription_present else ConversationHandler.END

    async def _send_region_keyboard(self, update: Update,
                                    region_names: list[str]):
        """Send a reply keyboard with one button per region."""
        return await self._send_keyboard(update, region_names, 'region')

    async def _send_keyboard(self, update: Update, names: list[str],
                             type: str):
        """Send a one-time reply keyboard with one button row per name.

        Args:
            update: Update whose message is replied to.
            names: Button labels.
            type: Word used in the prompt, e.g. ``'region'``.

        Returns:
            ``True`` if a keyboard was sent, ``False`` if ``names`` was
            empty (an apology is sent and any keyboard is removed).
        """
        message = update.effective_message
        reply_keyboard = [[name] for name in names]
        if not reply_keyboard:
            await message.reply_text(f"Sorry, no more {type}s for you here",
                                     reply_markup=ReplyKeyboardRemove())
            return False
        await message.reply_text(
            f'Choose a {type}',
            reply_markup=ReplyKeyboardMarkup(reply_keyboard,
                                             one_time_keyboard=True),
        )
        return True

    async def _send_station_keyboard(self, update: Update,
                                     station_names: list[str]):
        """Send a reply keyboard with one button per station."""
        return await self._send_keyboard(update, station_names, 'station')

    async def _unsubscribe_for_station(self, update: Update,
                                       context: CallbackContext) -> int:
        """Remove the user's subscription for the chosen station."""
        message = update.effective_message
        user = message.from_user
        msg_text = message.text
        self._db.remove_subscription(msg_text, user.id)
        await message.reply_text(
            f'Unsubscribed for Station {msg_text}',
            reply_markup=ReplyKeyboardRemove(),
        )
        logger.info(f' {user.first_name} unsubscribed for Station {msg_text}')
        self._db.log_activity(
            activity_type="unsubscription",
            user_id=user.id,
            station=msg_text,
        )
        return ConversationHandler.END

    async def _subscribe_for_station(self, update: Update,
                                     context: CallbackContext) -> int:
        """Store a subscription and schedule the delivery of the first plots."""
        message = update.effective_message
        user = message.from_user
        msg_text = message.text
        await message.reply_text(
            f"You successfully subscribed for {msg_text}. You will receive your first plots in a minute or two...",
            reply_markup=ReplyKeyboardRemove(),
        )
        self._db.add_subscription(msg_text, user.id)
        self._schedule_process_request(f"subscription_{msg_text}_{user.id}",
                                       data=(user.id, msg_text))
        logger.info(f' {user.first_name} subscribed for Station {msg_text}')
        self._db.log_activity(
            activity_type="subscription",
            user_id=user.id,
            station=msg_text,
        )
        return ConversationHandler.END

    def _schedule_process_request(self, job_name, data):
        """Schedule ``_process_request`` as a repeating job.

        Args:
            job_name: Name given to the job.
            data: ``(user_id, station_name)`` tuple handed to the job.
        """
        self.app.job_queue.run_repeating(self._process_request,
                                         first=BOT_JOBQUEUE_DELAY,
                                         interval=60,
                                         last=BOT_MAX_RESCHEDULE_TIME,
                                         name=job_name,
                                         data=data)
        logger.debug(f"Scheduled job {job_name} with data {data}")

    async def _request_one_time_forecast_for_station(
            self, update: Update, context: CallbackContext) -> int:
        """Schedule a single delivery of the chosen station's plots."""
        message = update.effective_message
        user = message.from_user
        msg_text = message.text
        await message.reply_text(
            f"You successfully requested a forecast for {msg_text}. You will receive your first plots in a minute or two...",
            reply_markup=ReplyKeyboardRemove(),
        )
        self._schedule_process_request(
            f"one_time_forecast_{msg_text}_{user.id}",
            data=(user.id, msg_text))
        logger.info(
            f' {user.first_name} requested forecast for Station {msg_text}')
        self._db.log_activity(
            activity_type="one-time-request",
            user_id=user.id,
            station=msg_text,
        )
        return ConversationHandler.END

    async def _cancel(self, update: Update, context: CallbackContext) -> int:
        """Say goodbye, remove any reply keyboard and end the conversation."""
        message = update.effective_message
        logger.info("User %s canceled the conversation.",
                    message.from_user.first_name)
        await message.reply_text('Bye! I hope we can talk again some day.',
                                 reply_markup=ReplyKeyboardRemove())
        return ConversationHandler.END

    async def _cache_plots(self, context: CallbackContext):
        """Job callback: delegate to ``ecmwf.cache_plots``."""
        self._ecmwf.cache_plots()

    async def _send_plots_to_user(self, plots, station_name, user_id):
        """Send the station name followed by each plot as a photo.

        Args:
            plots: Iterable of plot file paths.
            station_name: Sent as a text message before the photos.
            user_id: Chat id the messages are sent to.

        Failures are logged and not re-raised, so that one failing
        recipient does not abort the calling job.
        """
        logger.debug(f'Send plots of {station_name} to user: {user_id}')
        try:
            await self.app.bot.send_message(chat_id=user_id, text=station_name)
            for plot in plots:
                logger.debug(f'Plot: {plot}')
                # Close every image file as soon as it has been uploaded.
                with open(plot, 'rb') as photo:
                    await self.app.bot.send_photo(chat_id=user_id,
                                                  photo=photo)
        except Exception as e:
            logger.error(f'Error sending plots to user {user_id}: {e}')

    async def _broadcast(self, context: CallbackContext):
        """Job callback: send newly available plots to all subscribers."""
        latest_plots = self._ecmwf.download_latest_plots(
            self._db.stations_with_subscribers())
        if not latest_plots:
            return
        for station_name, plots in latest_plots.items():
            if not plots:
                continue
            for user_id in self._db.get_subscriptions_by_station(
                    station_name):
                await self._send_plots_to_user(plots, station_name, user_id)
            logger.info(f'Broadcasted {station_name}')