-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
494 lines (416 loc) · 17.7 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
import os
from dotenv import load_dotenv
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CommandHandler,
ConversationHandler,
MessageHandler,
filters,
ContextTypes,
CallbackQueryHandler
)
from pymongo import MongoClient
from solana.rpc.api import Client
from datetime import datetime, UTC
from validateBtcAddress import is_valid_bitcoin_address
from validateSolAddress import is_valid_solana_address
from getRatePreview import get_rate_preview
from createSwap import create_signed_jupiter_swap_tx
from initiateChangeNow import initiate_change_now_swap
from createTransfer import create_signed_usdc_transfer_tx
from getMinimumAmt import get_min_amount
from bundle import BundleStatus, send_bundle_with_tip
from getChangeNowStatus import get_status
from verifyDeposit import verify_usdc_deposit
class ChangeNowError(Exception):
pass
class SolanaTransactionError(Exception):
pass
# Load environment variables
load_dotenv()
# Constants and Configurations
SOLANA_RPC_URL = os.getenv("SOLANA_RPC_URL")
client = Client(SOLANA_RPC_URL)
CHANGE_NOW_URL = "https://api.changenow.io/v2/exchange"
CHANGE_NOW_API_KEY = os.getenv("CHANGE_NOW_API_KEY")
MONGO_URI = os.getenv("MONGO_URI")
INTERMEDIARY_SOL_WALLET = os.getenv("INTERMEDIARY_SOL_WALLET")
USDC_MINT = os.getenv("USDC_MINT")
TARGET_TOKEN_MINT_ADDRESS = os.getenv("TARGET_TOKEN_MINT_ADDRESS")
USDC_DECIMALS = os.getenv("USDC_DECIMALS")
PRESET_AMOUNTS = [100, 500, 1000, 5000]
MAX_AMOUNT = 1000000
FEE_PERCENTAGE = 0.05
# MongoDB setup
client_db = MongoClient(MONGO_URI, tls=True, tlsAllowInvalidCertificates=True)
db = client_db["telegram_bot"]
users_collection = db["users"]
# Conversation states
SOLANA_WALLET, BITCOIN_ADDRESS, CUSTOM_AMOUNT = range(3)
# Core Command Handlers
async def start(update, context):
keyboard = [
[InlineKeyboardButton("Register Wallets", callback_data='register')],
[InlineKeyboardButton("Start New Swap", callback_data='swap')],
[InlineKeyboardButton("Help", callback_data='help')]
]
await update.message.reply_text(
"Welcome to the USDC-BTC Swap Bot! 🤖\n\nWhat would you like to do?",
reply_markup=InlineKeyboardMarkup(keyboard)
)
async def register(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text(
"Let's register your wallet addresses! 📝\n\n"
"First, please enter your Solana wallet address:"
)
return SOLANA_WALLET
async def solana_wallet_input(update: Update, context: ContextTypes.DEFAULT_TYPE):
solana_address = update.message.text.strip()
if not is_valid_solana_address(solana_address):
await update.message.reply_text(
"❌ Invalid Solana address. Please enter a valid Solana wallet address or /cancel to abort."
)
return SOLANA_WALLET
# Store the Solana address temporarily in context
context.user_data['solana_address'] = solana_address
await update.message.reply_text(
"✅ Solana address saved!\n\n"
"Now, please enter your Bitcoin address where you'll receive BTC:"
)
return BITCOIN_ADDRESS
async def bitcoin_address_input(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
bitcoin_address = update.message.text.strip()
if not is_valid_bitcoin_address(bitcoin_address):
await update.message.reply_text(
"❌ Invalid Bitcoin address. Please enter a valid Bitcoin address or /cancel to abort."
)
return BITCOIN_ADDRESS
# Get the previously stored Solana address
solana_address = context.user_data.get('solana_address')
# Store both addresses in MongoDB
users_collection.update_one(
{"_id": user_id},
{
"$set": {
"solana_address": solana_address,
"bitcoin_address": bitcoin_address,
"updated_at": datetime.now(UTC)
}
},
upsert=True
)
# Clear temporary data
context.user_data.clear()
await update.message.reply_text(
"✅ Registration complete!\n\n"
"Your addresses have been saved:\n"
f"🔹 Solana: `{solana_address}`\n"
f"🔹 Bitcoin: `{bitcoin_address}`\n\n"
"You can now use /swap to start a new swap transaction.",
parse_mode='Markdown'
)
return ConversationHandler.END
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE):
context.user_data.clear()
await update.message.reply_text(
"Registration cancelled. You can start over with /register"
)
return ConversationHandler.END
async def swap(update, context):
user_id = update.effective_user.id
user = users_collection.find_one({"_id": user_id})
if not user:
await update.message.reply_text(
"❌ Please register your wallets first using /register"
)
return
# If no amount provided, show preset amounts
if not context.args:
keyboard = [
[InlineKeyboardButton(f"${amt:,} USDC", callback_data=f"swap_{amt}")
for amt in PRESET_AMOUNTS[i:i+2]]
for i in range(0, len(PRESET_AMOUNTS), 2)
]
keyboard.append([InlineKeyboardButton("Custom Amount", callback_data="swap_custom")])
await update.message.reply_text(
"💱 Select swap amount or enter custom amount:",
reply_markup=InlineKeyboardMarkup(keyboard)
)
return
try:
amount = float(context.args[0])
except ValueError:
await update.message.reply_text("❌ Please enter a valid number")
return
if amount > MAX_AMOUNT:
await update.message.reply_text(
f"❌ Amount exceeds maximum limit of ${MAX_AMOUNT:,} USDC"
)
return
# Show swap preview with confirmation buttons
fee = amount * FEE_PERCENTAGE
amount_after_fee = amount - fee
try:
minimum_amount = await get_min_amount("usdc", "btc", "sol", "btc")
total_min_amount = minimum_amount + fee
if amount < total_min_amount:
await update.message.reply_text(
f"❌ Amount too low. Minimum amount after fees is ${total_min_amount:,.2f} USDC"
)
return
rate_preview = await get_rate_preview(amount_after_fee)
keyboard = [
[
InlineKeyboardButton("✅ Confirm", callback_data=f"confirm_swap_{amount}"),
InlineKeyboardButton("❌ Cancel", callback_data="cancel_swap")
]
]
await update.message.reply_text(
f"📊 *Swap Preview*\n\n"
f"💰 Amount: `${amount:,.2f}` USDC\n"
f"📊 Fee ({FEE_PERCENTAGE*100}%): `${fee:,.2f}` USDC\n"
f"💱 Amount after fee: `${amount_after_fee:,.2f}` USDC\n"
f"🔄 Expected BTC: `{format(rate_preview, '.8f')}` BTC\n\n"
f"Please confirm to proceed with the swap.",
parse_mode='Markdown',
reply_markup=InlineKeyboardMarkup(keyboard)
)
except Exception as e:
await update.message.reply_text(f"❌ An error occurred: {str(e)}")
return
async def check_rate(update: Update, context: ContextTypes.DEFAULT_TYPE):
try:
amount = float(context.args[0]) if context.args else 100 # Default amount
rate_preview = await get_rate_preview(amount)
await update.message.reply_text(
f"Current Rate Preview:\n"
f"{amount:,.2f} USDC ≈ {format(rate_preview, '.8f')} BTC"
)
except Exception as e:
await update.message.reply_text("Error fetching rate. Please try again.")
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
if query.data.startswith('swap_'):
if query.data == 'swap_custom':
await query.message.reply_text(
"Please enter the amount of USDC you want to swap:",
)
return CUSTOM_AMOUNT # Return the new state
else:
amount = float(query.data.split('_')[1])
context.args = [str(amount)]
await swap(query, context)
elif query.data.startswith('confirm_swap_'):
try:
amount = float(query.data.split('_')[2])
# First remove the inline keyboard
await query.message.edit_reply_markup(reply_markup=None)
# Then process the swap
await process_swap(query.message, amount, context)
except Exception as e:
await query.message.reply_text(f"❌ Error processing swap: {str(e)}")
elif query.data == 'cancel_swap':
await query.message.edit_text("❌ Swap cancelled")
elif query.data == 'register':
await query.message.reply_text(
"Let's register your wallet addresses! 📝\n\n"
"First, please enter your Solana wallet address:"
)
return SOLANA_WALLET
elif query.data == 'swap':
print(query)
user_id = query.from_user.id
user = users_collection.find_one({"_id": user_id})
if not user:
await query.message.reply_text("Please register first using /register")
return
await query.message.reply_text("Please use the /swap <amount> command to start a swap")
elif query.data == 'help':
help_text = (
"🔹 Use /register to set up your wallet addresses\n"
"🔹 Use /swap to start a new swap\n"
"🔹 Use /getstatus to check swap status\n"
"🔹 Use /cancel to cancel any ongoing operation"
)
await query.message.reply_text(help_text)
async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle errors in the telegram bot."""
# Log the error
print(f"Update {update} caused error {context.error}")
# Send a message to the user
error_message = (
"❌ An error occurred while processing your request.\n"
"Please try again later or contact support if the issue persists."
)
try:
if update and update.effective_message:
await update.effective_message.reply_text(error_message)
except Exception as e:
print(f"Failed to send error message: {e}")
async def get_history(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
user = users_collection.find_one({"_id": user_id})
if not user or "transactions" not in user:
await update.message.reply_text("No transaction history found.")
return
history = "📜 Transaction History:\n\n"
for tx in user["transactions"][-5:]: # Show last 5 transactions
history += (
f"Amount: {tx['amount_usdc']} USDC → {tx['amount_btc']} BTC\n"
f"Status: {tx['status'].upper()}\n"
f"Date: {tx['timestamp'].strftime('%Y-%m-%d %H:%M:%S UTC')}\n\n"
)
await update.message.reply_text(history)
async def get_status_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle the getstatus command with transaction ID"""
# Check if transaction ID was provided
if not context.args:
await update.message.reply_text(
"Please provide a transaction ID: /getstatus <transaction_id>"
)
return
tx_id = context.args[0]
try:
statusMessage = await get_status(tx_id)
await update.message.reply_text(statusMessage)
except Exception as e:
await update.message.reply_text(f"❌ Error fetching status: {str(e)}")
# New function to process the actual swap
async def process_swap(message, amount, context):
user_id = message.chat.id # Changed from message.from_user.id
user = users_collection.find_one({"_id": user_id})
if not user:
await message.reply_text("❌ Please register first using /register")
return
if "sol_wallet" not in user or "btc_address" not in user:
await message.reply_text("❌ Please register both your Solana and Bitcoin addresses first")
return
progress_message = await message.reply_text(
f"🔄 Processing swap\.\.\.\n\n⏳ Step 1/4: Verifying deposit\.\.\.\nPlease send USDC to the address below:\n```{INTERMEDIARY_SOL_WALLET}```",
parse_mode="MarkdownV2"
)
try:
fee = amount * FEE_PERCENTAGE
amount_after_fee = amount - fee
# Verify deposit - updated to use correct field name
deposit_verified = await verify_usdc_deposit(amount_after_fee, user["sol_wallet"], users_collection)
if not deposit_verified:
await progress_message.edit_text(
"❌ Deposit verification timed out after 10 minutes.\n"
"The swap has been cancelled. Please try again with a new swap."
)
return
await progress_message.edit_text(
"🔄 Processing swap...\n\n"
"✅ Step 1/4: Deposit verified\n"
"⏳ Step 2/4: Getting rate preview..."
)
rate_preview = await get_rate_preview(amount_after_fee)
await progress_message.edit_text(
"🔄 Processing swap...\n\n"
"✅ Step 1/4: Deposit verified\n"
"✅ Step 2/4: Rate confirmed\n"
"⏳ Step 3/4: Creating transactions..."
)
# Execute transactions
signed_tx = await create_signed_jupiter_swap_tx(3)
# Initialize ChangeNOW Swap
tx_id, payin_address, amount_received = initiate_change_now_swap(amount_after_fee, user["btc_address"])
if abs(amount_received - rate_preview) / rate_preview > 0.20:
await progress_message.edit_text(
"❌ Rate changed significantly. Please try again.\n"
f"Expected: {format(rate_preview, '.8f')} BTC\n"
f"Current: {format(amount_received, '.8f')} BTC\n"
f"Percentage difference: {abs(amount_received - rate_preview) / rate_preview * 100:.2f}%"
)
return
await progress_message.edit_text(
"🔄 Processing swap...\n\n"
"✅ Step 1/4: Deposit verified\n"
"✅ Step 2/4: Rate confirmed\n"
"✅ Step 3/4: Transactions created\n"
"⏳ Step 4/4: Executing swap..."
)
signed_transfer_tx = await create_signed_usdc_transfer_tx(
USDC_MINT,
USDC_DECIMALS,
payin_address,
amount_after_fee
)
bundle_id, status, landed_slot = await send_bundle_with_tip([signed_tx, signed_transfer_tx], 1000000)
if status == BundleStatus.LANDED:
await progress_message.edit_text(
"✅ Swap initiated successfully!\n\n"
f"Transaction ID: `{tx_id}`\n\n"
"Use /getstatus {tx_id} to check the status of your swap.",
parse_mode='Markdown'
)
# Update database - use correct field names in the query
users_collection.update_one(
{"_id": user_id},
{
"$push": {
"transactions": {
"amount_usdc": amount,
"amount_btc": rate_preview,
"status": "pending",
"timestamp": datetime.now(UTC),
"bundle_id": bundle_id,
"change_now_tx_id": tx_id,
"landed_slot": landed_slot
}
}
}
)
else:
await progress_message.edit_text(
"❌ Swap failed. Please try again."
)
except Exception as e:
await progress_message.edit_text(
f"❌ Swap failed\n\n"
f"Error: {str(e)}\n\n"
f"Please try again or contact support if the issue persists."
)
# Add this new handler function
async def custom_amount_input(update: Update, context: ContextTypes.DEFAULT_TYPE):
try:
amount = float(update.message.text.strip())
context.args = [str(amount)]
await swap(update, context)
except ValueError:
await update.message.reply_text("❌ Please enter a valid number")
return ConversationHandler.END
# Main Application Setup
def main():
TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
application = Application.builder().token(TOKEN).build()
# Core handlers
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("swap", swap))
application.add_handler(CommandHandler("getstatus", get_status_command))
application.add_handler(CommandHandler("checkrate", check_rate))
application.add_handler(CommandHandler("history", get_history))
# Registration conversation handler
register_handler = ConversationHandler(
entry_points=[
CommandHandler('register', register),
CallbackQueryHandler(button_callback, pattern='^swap_custom$')
],
states={
SOLANA_WALLET: [MessageHandler(filters.TEXT & ~filters.COMMAND, solana_wallet_input)],
BITCOIN_ADDRESS: [MessageHandler(filters.TEXT & ~filters.COMMAND, bitcoin_address_input)],
CUSTOM_AMOUNT: [MessageHandler(filters.TEXT & ~filters.COMMAND, custom_amount_input)]
},
fallbacks=[CommandHandler('cancel', cancel)]
)
application.add_handler(register_handler)
# Button callbacks and error handling
application.add_handler(CallbackQueryHandler(button_callback))
application.add_error_handler(error_handler)
application.run_polling()
if __name__ == '__main__':
main()