-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbot.py
More file actions
260 lines (205 loc) · 9.14 KB
/
bot.py
File metadata and controls
260 lines (205 loc) · 9.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
import logging
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from url_analyzer import URLAnalyzer
from database import cache
from config import TELEGRAM_BOT_TOKEN
# Set up root logging once at import time: INFO and above, with a
# "timestamp - logger name - level - message" layout.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)

# Logger named after this module; used by the handlers defined below.
logger = logging.getLogger(__name__)
class PhishingDetectorBot:
    """Telegram handlers that scan links in chat messages for phishing signs.

    URL extraction and analysis are delegated to ``URLAnalyzer``; result
    caching, rate limiting and usage statistics go through the module-level
    ``cache`` object. Every handler replies through
    ``update.effective_message`` so that edited messages (where
    ``update.message`` is ``None``) do not crash the handler.
    """

    # Maximum number of URLs analyzed for a single incoming message.
    MAX_URLS_PER_MESSAGE = 5
    # A combined reply longer than this is sent as one message per URL
    # instead (presumably to stay below Telegram's message size limit).
    MAX_COMBINED_LENGTH = 4000
    # Divider placed *between* per-URL reports in a combined reply.
    RESULT_SEPARATOR = '\n\n' + '=' * 50 + '\n\n'

    # security_level -> (emoji, headline); any other level is shown as safe.
    _LEVEL_HEADERS = {
        'DANGEROUS': ("🔴", "**DANGEROUS LINK - DO NOT CLICK**"),
        'SUSPICIOUS': ("🟡", "**SUSPICIOUS LINK - EXERCISE CAUTION**"),
    }
    _DEFAULT_LEVEL_HEADER = ("🟢", "**SAFE LINK**")

    def __init__(self):
        """Create the bot with its own ``URLAnalyzer`` instance."""
        self.analyzer = URLAnalyzer()

    async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle /start by replying with the welcome text and command list."""
        message = update.effective_message
        if message is None:
            return
        welcome_message = (
            "\n"
            "🔍 **Welcome to Phishing Link Detector Bot!**\n"
            "I help you identify dangerous links before you click them, "
            "especially fake meeting invitations.\n"
            "**How to use:**\n"
            "• Send me any message with links and I'll analyze them\n"
            "• Use /check <url> to check a specific link\n"
            "• Forward suspicious messages to me\n"
            "**Commands:**\n"
            "/start - Show this message\n"
            "/check <url> - Check a specific URL\n"
            "/help - Show detailed help\n"
            "/stats - Show bot statistics\n"
            "🔒 **Stay safe online!**\n"
        )
        await message.reply_text(welcome_message, parse_mode='Markdown')

    async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle /help by replying with the detailed usage guide."""
        message = update.effective_message
        if message is None:
            return
        help_message = (
            "\n"
            "🛡️ **Phishing Link Detector - Help**\n"
            "**What I check:**\n"
            "✅ Domain legitimacy (Google Meet, Zoom, Teams, etc.)\n"
            "✅ Typosquatting detection\n"
            "✅ Domain age and SSL certificates\n"
            "✅ Security reputation via VirusTotal\n"
            "✅ Suspicious URL patterns\n"
            "**Security Levels:**\n"
            "🟢 **SAFE** - Link appears legitimate\n"
            "🟡 **SUSPICIOUS** - Exercise caution\n"
            "🔴 **DANGEROUS** - Do not click!\n"
            "**Examples of legitimate links:**\n"
            "• Google Meet: https://meet.google.com/abc-defg-hij\n"
            "• Zoom: https://zoom.us/j/1234567890\n"
            "• Teams: https://teams.microsoft.com/l/meetup-join/...\n"
            "**Red flags:**\n"
            "❌ gmeeting.org (fake Google)\n"
            "❌ zo0m.us (typosquatting)\n"
            "❌ Newly registered domains\n"
            "❌ No SSL certificate\n"
            "❌ Flagged by security services\n"
            "**Tips:**\n"
            "• Always verify meeting invitations through official channels\n"
            "• Check the sender's identity\n"
            "• When in doubt, don't click!\n"
        )
        await message.reply_text(help_message, parse_mode='Markdown')

    async def stats_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle /stats by reporting the counters from ``cache.get_stats()``.

        Reads the ``total_checks``, ``threats_found`` and ``cache_hits`` keys
        of the returned mapping.
        """
        message = update.effective_message
        if message is None:
            return
        stats = cache.get_stats()
        total_checks = stats['total_checks']
        threats_found = stats['threats_found']
        # max(..., 1) avoids dividing by zero before any link was checked.
        detection_rate = threats_found / max(total_checks, 1) * 100
        stats_message = (
            "\n"
            "📊 **Bot Statistics**\n"
            f"🔍 Total links checked: {total_checks}\n"
            f"🚨 Threats detected: {threats_found}\n"
            f"⚡ Cache hits: {stats['cache_hits']}\n"
            f"Detection rate: {detection_rate:.1f}%\n"
        )
        await message.reply_text(stats_message, parse_mode='Markdown')

    async def check_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle /check <url>: analyze the URL given as command arguments.

        Replies with a usage hint when no argument is supplied. The request
        is subject to the same per-user rate limit as plain messages.
        """
        message = update.effective_message
        if message is None:
            return
        if not context.args:
            await message.reply_text(
                "Please provide a URL to check.\nExample: /check https://example.com"
            )
            return
        if not await self._within_rate_limit(update):
            return
        url = ' '.join(context.args)
        await self._analyze_and_respond(update, [url])

    async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle a plain text message: extract its URLs and analyze them.

        Updates without a message text or without a sending user are ignored.
        """
        message = update.effective_message
        if message is None or not message.text:
            return
        if not await self._within_rate_limit(update):
            return
        urls = self.analyzer.extract_urls(message.text)
        if not urls:
            await message.reply_text(
                "No URLs found in your message. Send me a link to analyze!"
            )
            return
        await self._analyze_and_respond(update, urls)

    async def _within_rate_limit(self, update: Update) -> bool:
        """Return True if the sender may make another request.

        Sends the rate-limit warning and returns False when
        ``cache.check_rate_limit`` rejects the user. Updates without a user
        or message are rejected silently because there is nobody to key the
        limit on or to reply to.
        """
        user = update.effective_user
        message = update.effective_message
        if user is None or message is None:
            return False
        if cache.check_rate_limit(user.id):
            return True
        await message.reply_text(
            "⚠️ Rate limit exceeded. Please wait before sending more requests."
        )
        return False

    def _get_analysis(self, url):
        """Return the analysis for ``url``, served from the cache when possible.

        A fresh analysis is stored in the cache unless it contains an
        ``'error'`` key.
        """
        cached_result = cache.get_cached_result(url)
        if cached_result:
            return cached_result
        # NOTE(review): analyze_url is called synchronously from async
        # handlers; if it performs network I/O it blocks the event loop
        # while it runs - confirm and consider offloading it.
        analysis = self.analyzer.analyze_url(url)
        if 'error' not in analysis:
            cache.cache_result(url, analysis)
        return analysis

    @classmethod
    def _join_responses(cls, responses):
        """Combine per-URL reports into one text, divided by RESULT_SEPARATOR."""
        return cls.RESULT_SEPARATOR.join(responses)

    async def _analyze_and_respond(self, update: Update, urls):
        """Analyze each URL in ``urls`` and reply with the formatted reports.

        Rejects requests with more than MAX_URLS_PER_MESSAGE URLs. A temporary
        "analyzing" message is shown while working and is removed even if the
        analysis raises. Reports are sent as one combined message, or one
        message per URL when the combined text exceeds MAX_COMBINED_LENGTH.
        """
        message = update.effective_message
        if message is None or not urls:
            return
        if len(urls) > self.MAX_URLS_PER_MESSAGE:
            await message.reply_text(
                "⚠️ Too many URLs in one message. "
                f"Please send up to {self.MAX_URLS_PER_MESSAGE} URLs at a time."
            )
            return

        analyzing_msg = await message.reply_text("🔍 Analyzing link(s)...")
        try:
            responses = [
                self._format_analysis_response(self._get_analysis(url))
                for url in urls
            ]
        finally:
            # Remove the placeholder on failure too, so it never lingers.
            await analyzing_msg.delete()

        full_response = self._join_responses(responses)
        if len(full_response) > self.MAX_COMBINED_LENGTH:
            for response in responses:
                await message.reply_text(response, parse_mode='Markdown')
        else:
            await message.reply_text(full_response, parse_mode='Markdown')

    def _format_analysis_response(self, analysis):
        """Render one analysis result as a Markdown report.

        Args:
            analysis: Mapping produced by the analyzer. If it has an
                ``'error'`` key only that error is reported. Otherwise
                ``'url'`` and ``'security_level'`` are required, while
                ``'confidence'``, ``'issues'``, ``'recommendations'``,
                ``'domain_analysis'`` and ``'security_analysis'`` are
                optional.

        Returns:
            The report text; at most 5 issues and 3 recommendations are
            listed.
        """
        if 'error' in analysis:
            return f"❌ Error analyzing URL: {analysis['error']}"

        security_level = analysis['security_level']
        # The URL is user-supplied and shown inside a code span, where a
        # backtick cannot be escaped; percent-encode it so the Markdown
        # stays parseable.
        url = str(analysis['url']).replace('`', '%60')
        confidence = analysis.get('confidence', 0)
        issues = analysis.get('issues', [])
        recommendations = analysis.get('recommendations', [])

        level_emoji, level_text = self._LEVEL_HEADERS.get(
            security_level, self._DEFAULT_LEVEL_HEADER
        )
        parts = [
            f"{level_emoji} {level_text}\n\n",
            f"🔗 **URL:** `{url}`\n",
            f"📊 **Confidence:** {confidence:.0f}%\n\n",
        ]

        # NOTE(review): issue and recommendation texts are inserted
        # unescaped; Markdown special characters in them could make Telegram
        # reject the message - confirm what the analyzer can emit.
        if issues:
            parts.append("⚠️ **Issues found:**\n")
            parts.extend(f"❌ {issue}\n" for issue in issues[:5])
            if len(issues) > 5:
                parts.append(f"... and {len(issues) - 5} more issues\n")
            parts.append("\n")

        if recommendations:
            parts.append("💡 **Recommendations:**\n")
            parts.extend(f"• {rec}\n" for rec in recommendations[:3])

        # Extra technical details are only shown for risky links.
        if security_level in ('DANGEROUS', 'SUSPICIOUS'):
            domain_analysis = analysis.get('domain_analysis', {})
            if domain_analysis.get('domain_age_days') is not None:
                parts.append(
                    f"\n📅 Domain age: {domain_analysis['domain_age_days']} days"
                )
            # NOTE(review): nesting of the VirusTotal section under this
            # branch is inferred - confirm it should not apply to safe links.
            security_analysis = analysis.get('security_analysis', {})
            vt_results = security_analysis.get('api_results', {}).get('virustotal', {})
            if 'positives' in vt_results and 'total' in vt_results:
                parts.append(
                    f"\n🛡️ VirusTotal: {vt_results['positives']}/{vt_results['total']} vendors flagged"
                )

        return ''.join(parts)

    async def error_handler(self, update: object, context: ContextTypes.DEFAULT_TYPE):
        """Log an error raised while handling an update and notify the user.

        ``update`` is not guaranteed to be an ``Update`` with a message, so a
        reply is only attempted when one is available.
        """
        logger.error(
            "Update %s caused error %s", update, context.error,
            exc_info=context.error,
        )
        message = update.effective_message if isinstance(update, Update) else None
        if message is not None:
            await message.reply_text(
                "❌ An error occurred while processing your request. Please try again."
            )
def main():
    """Build the Telegram application, wire up the bot's handlers, and poll."""
    app = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
    detector = PhishingDetectorBot()

    # Slash commands, registered in this order.
    command_callbacks = (
        ("start", detector.start_command),
        ("help", detector.help_command),
        ("stats", detector.stats_command),
        ("check", detector.check_command),
    )
    for command_name, callback in command_callbacks:
        app.add_handler(CommandHandler(command_name, callback))

    # Any text that is not a command goes to the URL scanner.
    plain_text = filters.TEXT & ~filters.COMMAND
    app.add_handler(MessageHandler(plain_text, detector.handle_message))

    # Errors raised by the handlers above are routed here.
    app.add_error_handler(detector.error_handler)

    logger.info("Starting Phishing Detector Bot...")
    app.run_polling(allowed_updates=Update.ALL_TYPES)
# Start the bot only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()