-
Notifications
You must be signed in to change notification settings - Fork 2
/
proxy.py
executable file
·116 lines (103 loc) · 3.08 KB
/
proxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# proxy.py
#
# Copyright 2021 Alvarito050506 <[email protected]>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
#
import sys
import socket
import threading
class Proxy:
def __init__(self):
self.__options = {
"src_addr": None,
"src_port": 19132,
"dst_port": 19133
};
self.__running_lock = threading.Lock();
self.__running = 0;
def set_option(self, name, value):
if name in self.__options:
self.__options[name] = value;
else:
raise NameError(name);
return self.__options;
def get_options(self):
return self.__options;
def run(self):
self.__running_lock.acquire();
self.__running += 1;
self.__running_lock.release();
dst_addr = ("0.0.0.0", self.__options["dst_port"]);
try:
proc_addr = socket.gethostbyname_ex(self.__options["src_addr"])[2][0]
except socket.gaierror:
print("Error: Invalid address.");
return 1;
src_addr = (proc_addr, self.__options["src_port"]);
client_addr = None;
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP);
self.__socket.bind(dst_addr);
self.__socket.setblocking(False);
while True:
self.__running_lock.acquire();
condition = self.__running < 1;
self.__running_lock.release();
if condition:
# End Loop
break;
try:
data, addr = self.__socket.recvfrom(4096);
if addr == src_addr:
self.__socket.sendto(data, client_addr);
else:
if client_addr is None or client_addr[0] == addr[0]:
client_addr = addr;
self.__socket.sendto(data, src_addr);
except:
# No Data Available
pass
self.__socket.close();
return 0;
def stop(self):
self.__running_lock.acquire();
self.__running -= 1;
self.__running_lock.release();
return 0;
if __name__ == '__main__':
args = sys.argv;
if len(args) < 2:
print("Error: You must provide a source address.");
print("Usage: " + args[0] + " src_addr [src_port [dst_port]]");
print("Where src_addr is a valid internet address and src_port and dst_port are valid internet ports.")
sys.exit(1);
proxy = Proxy();
proxy.set_option("src_addr", args[1]);
if len(args) > 2:
proxy.set_option("src_port", int(args[2]));
if len(args) > 3:
proxy.set_option("dst_port", int(args[3]));
options = proxy.get_options();
print(options["src_addr"] + ":" + str(options["src_port"]) + " --> 0.0.0.0:" + str(options["dst_port"]));
try:
proxy.run();
except KeyboardInterrupt:
proxy.stop();
print("");
sys.exit(0);