-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththread_scan.py
More file actions
executable file
·144 lines (127 loc) · 4.49 KB
/
thread_scan.py
File metadata and controls
executable file
·144 lines (127 loc) · 4.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import socket
import re
import ssl
import sys
import subprocess
import time
import psutil
from concurrent.futures import ThreadPoolExecutor, as_completed
def parse_nmap_services(file_path):
    """Build a port -> service-name lookup table from an nmap-services file.

    Each data line is expected to look like ``name port/proto [...]``.
    Blank lines, comment lines (including indented ones) and lines that do
    not fit this shape are skipped instead of aborting the whole parse.

    The scanner in this file only probes TCP ports, so the name from a
    ``/tcp`` entry is preferred; a name from another protocol is used only
    when the port has no TCP entry at all.

    Args:
        file_path: Path to the nmap-services file.

    Returns:
        dict mapping int port numbers to service-name strings.

    Raises:
        OSError: If the file cannot be opened.
    """
    tcp_names = {}
    fallback_names = {}
    with open(file_path, 'r') as file:
        for line in file:
            entry = line.strip()
            if not entry or entry.startswith("#"):
                continue
            parts = entry.split()
            if len(parts) < 2:
                # A name without a port/proto column carries no mapping.
                continue
            port_text, _, proto = parts[1].partition('/')
            try:
                port = int(port_text)
            except ValueError:
                continue
            if proto.lower() == 'tcp':
                tcp_names[port] = parts[0]
            else:
                fallback_names[port] = parts[0]
    # TCP names are merged last so they win over udp/sctp names.
    return {**fallback_names, **tcp_names}
# Port -> service-name table used when reporting open ports. It is loaded
# once, at import time, from the relative path 'nmap-services', so the file
# must be present in the current working directory.
nmap_services = parse_nmap_services('nmap-services')
def is_valid_ip(ip):
    """Return True if *ip* is a dotted-quad IPv4 address.

    The string must consist of exactly four decimal fields of one to three
    ASCII digits separated by dots, and every field must be in 0-255.
    Leading zeros (e.g. ``"010.0.0.1"``) are still accepted.

    Args:
        ip: Candidate address string.

    Returns:
        bool: True for a well-formed address, False otherwise.
    """
    # fullmatch (not match + "$") so a trailing newline is rejected too.
    if re.fullmatch(r"(?:[0-9]{1,3}\.){3}[0-9]{1,3}", ip) is None:
        return False
    return all(int(octet) <= 255 for octet in ip.split("."))
def get_ip_from_user():
    """Return the scan target given as the first command-line argument.

    Returns:
        The raw string from ``sys.argv[1]``; it is not validated here.

    Raises:
        SystemExit: With a usage message when no target was supplied,
            instead of an unexplained IndexError.
    """
    if len(sys.argv) < 2:
        program = sys.argv[0] if sys.argv else "thread_scan.py"
        sys.exit(f"usage: {program} <target-ip>")
    return sys.argv[1]
def scan_port(ip, port):
    """Probe one TCP port and describe it if it is open.

    A connect attempt with a 1-second timeout decides whether the port is
    open. For an open port the service name is looked up in
    ``nmap_services``; ports 80, 443 and 554 additionally get a
    protocol-specific probe, and every other port (or a failed probe)
    falls back to a generic banner grab.

    Args:
        ip: Target host address.
        port: TCP port number to probe.

    Returns:
        ``"<port>/tcp open <service>"``, optionally followed by the probe
        result or banner, when the port is open; None when the port is not
        open or the connection attempt fails.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)
            if s.connect_ex((ip, port)) != 0:
                return None
    except (OSError, ValueError, OverflowError):
        # Resolution failures, invalid addresses and out-of-range ports
        # are reported the same way as a closed port.
        return None

    # The probe socket is closed before the follow-up probes run, so the
    # target never sees two simultaneous connections from this scanner.
    service_name = nmap_services.get(port, 'unknown')
    probes = {
        80: get_http_version,
        443: get_https_version,
        554: get_rtsp_version,
    }
    probe = probes.get(port)
    detail = probe(ip, port) if probe is not None else None
    if not detail:
        detail = get_banner(ip, port)
    if detail:
        return f"{port}/tcp open {service_name} {detail}"
    return f"{port}/tcp open {service_name}"
def get_banner(ip, port):
    """Grab whatever the service sends back after a single newline.

    Connects with a 1-second timeout, sends ``b"\\n"`` and reads up to
    1024 bytes. Bytes that are not valid UTF-8 are replaced rather than
    discarding the whole banner.

    Args:
        ip: Target host address.
        port: TCP port number.

    Returns:
        The stripped banner text (possibly an empty string), or None if
        the connection, send or receive fails.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)
            s.connect((ip, port))
            s.sendall(b'\n')
            raw = s.recv(1024)
    except (OSError, ValueError, OverflowError):
        return None
    # Many services greet with binary data; keep what is readable.
    return raw.decode(errors="replace").strip()
def get_http_version(ip, port):
    """Send an HTTP HEAD request and return the response status line.

    The request used to be built with ``bytes.format``, which does not
    exist; the resulting AttributeError was swallowed, so this probe always
    returned None. The request is now formatted as text and then encoded.

    Args:
        ip: Target host address, also used as the ``Host`` header value.
        port: TCP port number.

    Returns:
        The first line of the response (e.g. ``"HTTP/1.1 200 OK"``), or
        None if the exchange fails or the response is empty.
    """
    try:
        request = f"HEAD / HTTP/1.1\r\nHost: {ip}\r\n\r\n".encode()
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)
            s.connect((ip, port))
            s.sendall(request)
            raw = s.recv(1024)
    except (OSError, ValueError, OverflowError):
        return None
    status_line = raw.decode(errors="replace").split('\r\n')[0]
    return status_line or None
def get_https_version(ip, port):
    """Send an HTTP HEAD request over TLS and return the status line.

    The request used to be built with ``bytes.format``, which does not
    exist, so this probe always returned None; it is now formatted as text
    and then encoded. The TCP connect and TLS handshake are also bounded
    by a 1-second timeout instead of being allowed to block indefinitely.

    Args:
        ip: Target host address, also used as the ``Host`` header value.
        port: TCP port number.

    Returns:
        The first line of the response (e.g. ``"HTTP/1.1 200 OK"``), or
        None if the connection, handshake or exchange fails or the
        response is empty.
    """
    try:
        context = ssl.create_default_context()
        # NOTE: certificate verification is intentionally disabled. The
        # scanner addresses hosts by IP and only reads a status line, so
        # self-signed or name-mismatched certificates must not hide the
        # service. Do not reuse this context to send sensitive data.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        request = f"HEAD / HTTP/1.1\r\nHost: {ip}\r\n\r\n".encode()
        with socket.create_connection((ip, port), timeout=1) as sock:
            with context.wrap_socket(sock, server_hostname=ip) as ssock:
                ssock.sendall(request)
                raw = ssock.recv(1024)
    except (OSError, ValueError, OverflowError):
        # ssl.SSLError is an OSError subclass, so handshake failures
        # are covered here as well.
        return None
    status_line = raw.decode(errors="replace").split('\r\n')[0]
    return status_line or None
def get_rtsp_version(ip, port):
    """Send an RTSP OPTIONS request and return the response status line.

    The request used to be built with ``bytes.format``, which does not
    exist, so this probe always returned None; it is now formatted as text
    and then encoded. The request URL uses the *port* argument rather than
    a hard-coded 554 (identical for the default RTSP port).

    Args:
        ip: Target host address.
        port: TCP port number of the RTSP service.

    Returns:
        The first line of the response (e.g. ``"RTSP/1.0 200 OK"``), or
        None if the exchange fails or the response is empty.
    """
    try:
        request = (
            f"OPTIONS rtsp://{ip}:{port} RTSP/1.0\r\nCSeq: 1\r\n\r\n".encode()
        )
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)
            s.connect((ip, port))
            s.sendall(request)
            raw = s.recv(1024)
    except (OSError, ValueError, OverflowError):
        return None
    status_line = raw.decode(errors="replace").split('\r\n')[0]
    return status_line or None
def check_host_up(ip):
    """Report whether the host answers a single ICMP echo request.

    Runs the system ``ping`` command once and uses its exit status. The
    command output is discarded rather than captured, because only the
    exit status matters.

    Args:
        ip: Target host address.

    Returns:
        bool: True if ``ping`` exits with status 0, False otherwise.

    Raises:
        FileNotFoundError: If no ``ping`` executable is available.
    """
    # The packet-count option is "-n" on Windows and "-c" elsewhere.
    count_flag = '-n' if sys.platform.startswith('win') else '-c'
    completed = subprocess.run(
        ['ping', count_flag, '1', ip],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return completed.returncode == 0
def main():
    """Scan TCP ports 1-10000 of the target given on the command line.

    The target is first checked with a ping; if it does not answer, the
    scan is skipped. Otherwise every port is probed through a thread pool
    sized to the number of logical CPU cores, open ports are printed as
    their probes complete (not in port order), and the total scan time is
    reported at the end.
    """
    ip = get_ip_from_user()
    start_port = 1
    end_port = 10000
    cores = psutil.cpu_count(logical=True)
    print(f"number of cores in the cpu: {cores}")

    if not check_host_up(ip):
        print("host is down :/")
        return
    print("host is up : scanning.........")

    st = time.time()
    print(f"Scanning {ip} from port {start_port} to {end_port}...")
    # One worker per logical core, as reported by psutil above.
    with ThreadPoolExecutor(max_workers=cores) as executor:
        futures = [
            executor.submit(scan_port, ip, port)
            for port in range(start_port, end_port + 1)
        ]
        for future in as_completed(futures):
            result = future.result()
            if result:
                print(result)
    duration = time.time() - st
    print(f"Duration of scanning is {duration:.2f} seconds")


if __name__ == "__main__":
    main()