-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmoogly.py
462 lines (386 loc) · 21.2 KB
/
moogly.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
import discord
from discord.ext import commands, tasks
import json
import sqlite3
import traceback
import os
from datetime import datetime, timezone, timedelta
class BotClient(commands.Bot):
def __init__(self, config, dyes_fr):
intents = discord.Intents.default()
intents.members = True
intents.message_content = True
dir = os.path.dirname(os.path.realpath(__file__))
db_path = os.path.join(dir, config['database'])
self.db_conn = sqlite3.connect(db_path)
self.db_cursor = self.db_conn.cursor()
self.db_conn.row_factory = sqlite3.Row
self.create_tables()
self.config = config
self.dyes_fr = dyes_fr
super().__init__(
command_prefix=commands.when_mentioned_or(config['prefix']),
intents=intents,
help_command=None,
)
async def setup_hook(self):
self.add_view(AdmissionMessage(timeout=None))
print('Registered persistent view: AdmissionMessage')
self.add_view(ApplicationMessage(timeout=None))
print('Registered persistent view: ApplicationMessage')
# Retrieve the message_id, discord_timestamp, and timestamp from the database
self.db_cursor.execute('SELECT message_id, discord_timestamp, timestamp, message, available_slots FROM maps_runs')
maps_runs = self.db_cursor.fetchall()
# Recreate the MapsRunView instance for each message_id
for message_id, discord_timestamp, timestamp, message, available_slots in maps_runs:
view = MapsRunView(message_id=int(message_id), timestamp=discord_timestamp, message=message, available_slots=available_slots)
self.add_view(view)
print(f"Registered persistent view: MapsRunView (message_id={message_id})")
return await super().setup_hook()
async def on_ready(self):
self.ping_task.start()
print(f"Logged in as {self.user} (ID: {self.user.id})")
print('------')
def create_tables(self):
self.db_cursor.execute('''
CREATE TABLE IF NOT EXISTS applications (
user_id INTEGER PRIMARY KEY,
fc TEXT,
ingame_name TEXT
)
''')
self.db_cursor.execute('''
CREATE TABLE IF NOT EXISTS maps_runs (
message_id INTEGER PRIMARY KEY,
discord_timestamp TEXT,
timestamp TIMESTAMP,
message TEXT,
available_slots INTEGER DEFAULT 8,
user_ids TEXT,
pinged INTEGER DEFAULT 0
)
''')
self.db_conn.commit()
@tasks.loop(seconds=60)
async def ping_task(self):
# Fetch maps runs that have not been pinged yet
self.db_cursor.execute('SELECT * FROM maps_runs WHERE pinged=0')
maps_runs = self.db_cursor.fetchall()
if not maps_runs:
return
current_time = datetime.now(timezone.utc)
for maps_run in maps_runs:
# Calculate time difference between current time and ping time
current_time = datetime.now(timezone.utc)
ping_time = datetime.fromtimestamp(maps_run[2], tz=timezone.utc) - timedelta(minutes=20)
if current_time >= ping_time:
# Fetch the joined users
joined_user_ids = maps_run[5].split(',')
joined_users = [f"<@{user_id}>" for user_id in joined_user_ids if user_id]
# Create an embed with the ping message
embed = discord.Embed(
title="Treasure Maps run reminder 🚨",
description=f"The maps run will start in 20 minutes. Are you ready?\n\nJoined Users:\n" + "\n".join(joined_users),
color=0xff8a08
)
# Find the message to ping
message_id = maps_run[0]
channel_id = self.config['events_channel_id']
channel = await self.fetch_channel(channel_id)
if channel:
try:
await channel.send(f"<@&{self.config['maps_notifications_role_id']}> ", embed=embed)
# Update the pinged status to true
self.db_cursor.execute('UPDATE maps_runs SET pinged=1 WHERE message_id=?', (message_id,))
self.db_conn.commit()
except Exception as e:
print(e)
# Load config from config.json file
def load_config():
dir = os.path.dirname(os.path.realpath(__file__))
config_path = os.path.join(dir, '/config/config.json')
with open(config_path, 'r') as f:
return json.load(f)
def load_dyes_fr():
dir = os.path.dirname(os.path.realpath(__file__))
config_path = os.path.join(dir, 'dyes_fr.json')
with open(config_path, 'r') as f:
return json.load(f)
bot = BotClient(load_config(), load_dyes_fr())
# UI Name modal
class ApplicationModal(discord.ui.Modal, title='Access application'):
def __init__(self, fc: str, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fc = fc
name = discord.ui.TextInput(
label='What is your in-game name?',
placeholder='Name LastName',
)
async def on_submit(self, interaction: discord.Interaction):
if not self.name.value or len(self.name.value.split(' ')) != 2:
await interaction.response.send_message('Error: Invalid in-game name (format: Name LastName)', ephemeral=True)
return
bot.db_cursor.execute('INSERT INTO applications (user_id, fc, ingame_name) VALUES (?, ?, ?)', (interaction.user.id, self.fc, self.name.value))
bot.db_conn.commit()
application_channel = await bot.fetch_channel(bot.config['admission_channel_id'])
message_content = f"New application from {interaction.user.mention} (ID: {interaction.user.id}):\nIn-game name: {self.name.value}\nFC: {self.fc}"
await application_channel.send(message_content, view=AdmissionMessage())
await interaction.response.send_message(f"Application sent, awaiting approval...", ephemeral=True)
async def on_error(self, interaction: discord.Interaction, error: Exception):
await interaction.response.send_message('Oops! Something went wrong. Please try again', ephemeral=True)
traceback.print_exception(type(error), error, error.__traceback__) # Make sure we know what the error actually is
# Approve/Deny view
class AdmissionMessage(discord.ui.View):
async def interaction_check(self, interaction: discord.Interaction[discord.Client]) -> bool:
member = await interaction.guild.fetch_member(interaction.user.id)
if member.guild_permissions.administrator and bot.config['administrator_role_id'] in [role.id for role in member.roles]:
return True
else:
await interaction.response.send_message("You don't have permission to use this button.", ephemeral=True)
return False
def extract_user_id(self, message_content: str) -> int:
start_index = message_content.find('(ID: ') + len('(ID: ')
end_index = message_content.find(')', start_index)
user_id_str = message_content[start_index:end_index]
return int(user_id_str)
@discord.ui.button(label='Approve', style=discord.ButtonStyle.green, custom_id='AdmissionMessage:approve_button')
async def approve_button(self, interaction: discord.Interaction, button: discord.ui.Button):
user_id = self.extract_user_id(interaction.message.content)
bot.db_cursor.execute('SELECT * FROM applications WHERE user_id=?', (user_id,))
application = bot.db_cursor.fetchone()
if not application:
await interaction.response.send_message('Error: user_id not found in applications database', ephemeral=True)
return
user = await interaction.guild.fetch_member(user_id)
if user:
if application[1] == 'Seventh Haven':
role = interaction.guild.get_role(bot.config['seventh_haven_role_id'])
elif application[1] == 'FC Friend':
role = interaction.guild.get_role(bot.config['fc_friend_role_id'])
newcomer_role = interaction.guild.get_role(bot.config['newcomer_role_id'])
bot.db_cursor.execute('DELETE FROM applications WHERE user_id=?', (user_id,))
bot.db_conn.commit()
logs_channel = await bot.fetch_channel(bot.config['logs_channel_id'])
try:
await user.remove_roles(newcomer_role)
await user.add_roles(role)
await user.edit(nick=application[2])
except discord.errors.Forbidden:
await logs_channel.send(f"Execution of application for {user.mention} failed, are you sure the user doesn\'t have a role above Moogly\'s role?")
await interaction.message.delete()
await interaction.response.send_message(f"Application for {user.mention} approved", ephemeral=True)
await logs_channel.send(f"Application from {user.mention} (ID: {user_id}) approved:\nIn-game name: {application[2]}\nFC: {application[1]}")
await user.send('Your application to get access to Seventh Haven server has been approved, you now have access to the server.')
else:
await interaction.response.send_message('Error: Failed to fetch user', ephemeral=True)
@discord.ui.button(label='Decline', style=discord.ButtonStyle.red, custom_id='AdmissionMessage:decline_button')
async def decline_button(self, interaction: discord.Interaction, button: discord.ui.Button):
user_id = self.extract_user_id(interaction.message.content)
bot.db_cursor.execute('SELECT * FROM applications WHERE user_id=?', (user_id,))
application = bot.db_cursor.fetchone()
if not application:
await interaction.response.send_message('Error: user_id not found in applications database', ephemeral=True)
return
user = await bot.fetch_user(user_id)
if user:
bot.db_cursor.execute('DELETE FROM applications WHERE user_id=?', (user_id,))
bot.db_conn.commit()
await interaction.response.send_message(f"Application for {user.mention} declined", ephemeral=True)
logs_channel = await bot.fetch_channel(bot.config['logs_channel_id'])
await logs_channel.send(f"Application from {user.mention} (ID: {user_id}) declined:\nIn-game name: {application[2]}\nFC: {application[1]}")
await user.send('Your application to get access to Seventh Haven server has been declined, please try again.')
await interaction.message.delete()
else:
await interaction.response.send_message('Error: Failed to fetch user', ephemeral=True)
# Send application view
class ApplicationMessage(discord.ui.View):
async def interaction_check(self, interaction: discord.Interaction[discord.Client]) -> bool:
member = await interaction.guild.fetch_member(interaction.user.id)
if bot.config['newcomer_role_id'] in [role.id for role in member.roles]:
return True
else:
await interaction.response.send_message("You don't have permission to use this button. Please contact an administrator.", ephemeral=True)
return False
@discord.ui.button(label='Seventh Haven', style=discord.ButtonStyle.blurple, custom_id='ApplicationMessage:seventh_haven_button', emoji=discord.PartialEmoji.from_str('<:seventhhaven:1281356115954761758>'))
async def seventh_haven_button(self, interaction: discord.Interaction, button: discord.ui.Button):
bot.db_cursor.execute('SELECT * FROM applications WHERE user_id=?', (interaction.user.id,))
application = bot.db_cursor.fetchone()
if not application:
await interaction.response.send_modal(ApplicationModal('Seventh Haven'))
else:
await interaction.response.send_message('You already sent an application, please wait until an administrator reviews it.', ephemeral=True)
@discord.ui.button(label='FC Friend', style=discord.ButtonStyle.green, custom_id='ApplicationMessage:fc_friend_button', emoji=discord.PartialEmoji.from_str('👋'))
async def fc_friend_button(self, interaction: discord.Interaction, button: discord.ui.Button):
bot.db_cursor.execute('SELECT * FROM applications WHERE user_id=?', (interaction.user.id,))
application = bot.db_cursor.fetchone()
if not application:
await interaction.response.send_modal(ApplicationModal('FC Friend'))
else:
await interaction.response.send_message('You already sent an application, please wait until an administrator reviews it.', ephemeral=True)
# Add application form to current channel | !application_form
@bot.command()
@commands.has_permissions(administrator=True)
@commands.has_role(bot.config['administrator_role_id'])
async def application_form(interaction: discord.Interaction):
await interaction.channel.send('Where are you from?', view=ApplicationMessage())
# Clear all applications | !application_clear
@bot.command()
@commands.has_permissions(administrator=True)
@commands.has_role(bot.config['administrator_role_id'])
async def application_clear(interaction: discord.Interaction):
bot.db_cursor.execute('DELETE FROM applications')
bot.db_conn.commit()
await interaction.channel.send('All applications cleared.')
# Delete a specific application | !application_delete <mention>
@bot.command()
@commands.has_permissions(administrator=True)
@commands.has_role(bot.config['administrator_role_id'])
async def application_delete(interaction: discord.Interaction, user: discord.User):
if not isinstance(user, discord.User):
await interaction.response.send_message('Error: invalid user argument, format: !application_delete <mention>')
return
bot.db_cursor.execute('DELETE FROM applications WHERE user_id=?', (user.id,))
bot.db_conn.commit()
await interaction.channel.send(f"Application deleted for user {user.mention}.")
# List guild's emojis ids
@commands.has_permissions(administrator=True)
@commands.has_role(bot.config['administrator_role_id'])
@bot.command()
async def get_guild_emojis(interaction: discord.Interaction):
emojis = [f"{emoji} | {emoji.name} | {emoji.id}" for emoji in interaction.guild.emojis]
if emojis:
await interaction.channel.send("\n".join(emojis))
else:
await interaction.channel.send("No emojis found in this server.")
# Translate english dyes in french using | as a separator
@bot.command()
async def translate_dyes_fr(interaction: discord.Interaction, *args):
# Split the input arguments
arguments = ''.join(args)
dyes = arguments.split('|')
# Count occurrences of each translated dye name
dye_counts = {}
for dye_name in dyes:
for dye_entry in bot.dyes_fr:
if dye_entry["original_name"].replace(" ", "") == dye_name.strip():
translated_name = dye_entry["translated_name"]
dye_counts[translated_name] = dye_counts.get(translated_name, 0) + 1
break
# Create an embed to display the results
embed = discord.Embed(
title="Translated dyes (french)",
color=0x5d3fd3
)
for translated_name, count in dye_counts.items():
original_name = [entry["original_name"] for entry in bot.dyes_fr if entry["translated_name"] == translated_name][0]
embed.add_field(name=f"{count}x {translated_name}", value=f"({original_name})", inline=False)
await interaction.channel.send(embed=embed)
class MapsRunView(discord.ui.View):
def __init__(self, message_id, timestamp, description, available_slots):
super().__init__(timeout=None)
self.message_id = message_id
self.timestamp = timestamp
self.description = description
self.available_slots = available_slots
self.embed = discord.Embed()
self.update_embed()
def update_embed(self):
# Fetch joined users
bot.db_cursor.execute('SELECT user_ids FROM maps_runs WHERE message_id=?', (self.message_id,))
maps_run = bot.db_cursor.fetchone()
if maps_run and maps_run[0]:
joined_user_ids = maps_run[0].split(',')
joined_users = [f"<@{user_id}>" for user_id in joined_user_ids if user_id]
joined_users_description = "\n\n**Joined users**:\n" + "\n".join(joined_users)
else:
joined_users_description = ""
self.embed = discord.Embed(
title="Treasure Maps run 🧭",
description=f"{self.description}\nNext maps run on {self.timestamp}\nWho's in? 💰\nCurrently available slots: **{self.available_slots} / 8{joined_users_description}**",
color=0xffc100
)
@discord.ui.button(label="Join", style=discord.ButtonStyle.green, custom_id="join_map_run")
async def join_button(self, interaction: discord.Interaction, button: discord.ui.Button):
user_id = interaction.user.id
bot.db_cursor.execute('SELECT * FROM maps_runs WHERE message_id=?', (self.message_id,))
maps_run = bot.db_cursor.fetchone()
if maps_run:
user_ids = maps_run[5].split(',')
if str(user_id) not in user_ids:
if self.available_slots > 0:
if maps_run[6] == 0: # Check if ping was already sent, meaning that the map will be running soon
user_ids.append(str(user_id))
self.available_slots -= 1
bot.db_cursor.execute(
'UPDATE maps_runs SET user_ids=?, available_slots=? WHERE message_id=?',
(','.join(user_ids), self.available_slots, self.message_id)
)
bot.db_conn.commit()
self.update_embed()
await interaction.message.edit(embed=self.embed, view=self)
await interaction.response.send_message('You have successfully joined the map run! You will be pinged 20 minutes before the maps run starts.', ephemeral=True)
else:
await interaction.response.send_message('The map run is starting soon. You cannot join now.', ephemeral=True)
else:
await interaction.response.send_message('Sorry, no more available slots for this map run.', ephemeral=True)
else:
await interaction.response.send_message('You have already joined this map run.', ephemeral=True)
# Create a new map run message | !maps_create <timestamp>
@bot.command()
@commands.has_permissions(administrator=True)
@commands.has_role(bot.config['administrator_role_id'])
async def maps_create(interaction: discord.Interaction, timestamp: str, *args):
channel = await bot.fetch_channel(bot.config['events_channel_id'])
# Check if the timestamp is valid
try:
timestamp_dt = datetime.fromtimestamp(int(timestamp.split(':')[1].split(':')[0]), tz=timezone.utc)
except ValueError:
await interaction.channel.send('Invalid timestamp format. Please use the correct Discord timestamp format.')
return
# Reassemble the message after the timestamp
msg = ' '.join(args)
# Enforce the timestamp to be in full format by replacing characters after the last ':' with 'F'
new_timestamp = f"{timestamp.rsplit(':', 1)[0]}:F>"
view = MapsRunView(message_id=None, timestamp=new_timestamp, message=msg, available_slots=8)
message = await channel.send(f"<@&{bot.config['maps_notifications_role_id']}>", embed=view.embed, view=view)
# Update the message_id in the view
view.message_id = message.id
# Store the message info in the database
bot.db_cursor.execute(
'INSERT INTO maps_runs (message_id, discord_timestamp, timestamp, message, available_slots, user_ids, pinged) VALUES (?, ?, ?, ?, ?, ?, ?)',
(int(message.id), new_timestamp, timestamp_dt.timestamp(), msg, 8, '', 0)
)
bot.db_conn.commit()
# Update the embed with the joined users
view.update_embed()
await message.edit(embed=view.embed, view=view)
# Outputs a list of users who have joined the map run | !maps_list <timestamp>
@bot.command()
@commands.has_permissions(administrator=True)
@commands.has_role(bot.config['administrator_role_id'])
async def maps_list(interaction: discord.Interaction, message_id: int):
# Fetch the message
channel = await bot.fetch_channel(bot.config['events_channel_id'])
try:
message = await channel.fetch_message(message_id)
except discord.NotFound:
await interaction.channel.send('Message not found.')
return
# Check if the message is a map run message
bot.db_cursor.execute('SELECT * FROM maps_runs WHERE message_id=?', (message_id,))
maps_run = bot.db_cursor.fetchone()
if not maps_run:
await interaction.channel.send('Message is not a maps run message.')
return
# Fetch joined users
joined_user_ids = maps_run[5].split(',')
joined_users = [await interaction.guild.fetch_member(int(user_id)).mention for user_id in joined_user_ids if user_id]
# Create an embed with the joined users
embed = discord.Embed(
title="Joined Users",
description="\n".join(joined_users) if joined_users else "No users have joined yet.",
color=0x5d3fd3
)
await interaction.channel.send(embed=embed)
# Run the bot
bot.run(bot.config['token'])