-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubnet_resolver.py
executable file
·113 lines (89 loc) · 3.67 KB
/
subnet_resolver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/env python
import scapy.all as scapy
import csv
import ipwhois
import ipaddress
# File of known subnets, one "address/prefix" entry per line. It is read at
# start-up and newly discovered subnets are appended to it.
NETFLIX_NET_MASKS_PATH = './netflix_net_masks.conf'
# File listing the domain names used to decide which DNS packets are relevant.
DOMAIN_CONFIG_PATH = './config/domains.conf'
# Network interface the DNS sniffer listens on.
INTERFACE = 'eth0'
# (record name, address) pairs collected from IPv4 (type 1) DNS answers.
identified_ipv4 = set()
# (record name, address) pairs collected from IPv6 (type 28) DNS answers.
identified_ipv6 = set()
# NOTE(review): presumably meant to hold the configured domain names - confirm.
netflix_domains = set()
def load_stored_net_masks(path):
    """Load the known subnets from a stored net-mask file.

    Each non-empty row is expected to carry an ``address/prefix`` entry in
    its first CSV column, e.g. ``10.0.0.0/8``.

    Args:
        path: Location of the net-mask file.

    Returns:
        A set of ``(address, prefix_length)`` tuples; both items are strings.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If an entry is not of the form ``address/prefix``.
    """
    loaded_mask_set = set()
    # newline='' is what the csv module asks for when handed a file object.
    with open(path, 'r', newline='') as net_mask_file:
        for entry in csv.reader(net_mask_file):
            # Blank lines come back as empty rows; skip them (and rows whose
            # first field is only whitespace) instead of raising IndexError.
            if not entry or not entry[0].strip():
                continue
            ip, bit_mask = entry[0].strip().split('/')
            loaded_mask_set.add((ip, bit_mask))
    return loaded_mask_set
def load_domain_config(path):
    """Load the domain names to watch from a config file.

    The domain is taken from the first CSV column of every non-empty row.

    Args:
        path: Location of the domain config file.

    Returns:
        A set of domain-name strings with surrounding whitespace removed.

    Raises:
        OSError: If the file cannot be opened.
    """
    domains = set()
    # newline='' is what the csv module asks for when handed a file object.
    with open(path, 'r', newline='') as domain_file:
        for entry in csv.reader(domain_file):
            # Blank lines are returned as empty rows.
            if not entry:
                continue
            domain = entry[0].strip()
            # An empty string is a substring of every name, so keeping it
            # would make every query look relevant.
            if domain:
                domains.add(domain)
    return domains
def get_subnet_for_ipv4(ip):
    """Look up the announced network for ``ip`` via an RDAP query.

    The ASN description and CIDR reported by ``ipwhois`` are printed, and
    the CIDR is split at the slash.

    Args:
        ip: Address to look up.

    Returns:
        A ``(subnet, bit_mask)`` pair of strings taken from ``asn_cidr``.
    """
    rdap_result = ipwhois.IPWhois(ip).lookup_rdap()
    asn_cidr = rdap_result.get('asn_cidr')
    report = 'Address {} belongs to {} with cidr {}.'.format(
        ip, rdap_result.get('asn_description'), asn_cidr)
    print(report)
    subnet, bit_mask = asn_cidr.split('/')
    return subnet, bit_mask
def check_if_ip_in_stored_sub_nets(ip, sub_nets):
    """Tell whether ``ip`` lies inside any of the stored subnets.

    A message is printed for every stored subnet that contains the address.

    Args:
        ip: Address to test, in any form ``ipaddress.ip_address`` accepts.
        sub_nets: Iterable of ``(address, prefix_length)`` string pairs.

    Returns:
        True if at least one stored subnet contains ``ip``, else False.

    Raises:
        ValueError: If ``ip`` or a stored subnet cannot be parsed.
    """
    # Parse once up front instead of once per stored subnet.
    address = ipaddress.ip_address(ip)
    match_found = False
    for stored_net_mask, stored_bit_mask in sub_nets:
        # strict=False accepts entries whose address has host bits set
        # (e.g. 10.0.0.1/24) rather than rejecting the whole check.
        stored_network = ipaddress.ip_network(
            stored_net_mask + "/" + stored_bit_mask, strict=False)
        # Containment on the network is a constant-time mask comparison.
        # Iterating hosts() walks every address of the subnet and leaves out
        # its network and broadcast addresses.
        if address in stored_network:
            print("Match found for stored subnet {}/{}, with ip {}.".format(
                stored_net_mask, stored_bit_mask, address))
            match_found = True
    return match_found
def to_capture(packet):
    """Tell whether a DNS packet asks about one of the configured domains.

    The name in the first question record is matched by substring against
    every configured domain.

    Args:
        packet: A scapy packet, normally one carrying a DNS layer.

    Returns:
        True if the queried name contains a configured domain, else False.
        Packets without a DNS layer or without a question record give False.
    """
    # Read the config once and keep it in the module-level set; this function
    # is meant to run per sniffed packet, so re-reading the file each time
    # would be wasteful. Config edits need a restart to take effect.
    if not netflix_domains:
        netflix_domains.update(load_domain_config(DOMAIN_CONFIG_PATH))

    if not packet.haslayer('DNS'):
        return False
    questions = packet['DNS'].qd
    # qd is empty/None when the message has no question section.
    if not questions:
        return False

    qname = questions[0].qname
    # Decode rather than str() a bytes name, so matching runs on the name
    # itself and not on its "b'...'" repr.
    if isinstance(qname, bytes):
        qname = qname.decode('utf-8', errors='replace')
    else:
        qname = str(qname)
    return any(domain in qname for domain in netflix_domains)
def process_sniffed_packet(packet):
    """Record the addresses carried in the DNS answers of a relevant packet.

    IPv4 (type 1) answers are added to ``identified_ipv4`` and IPv6
    (type 28) answers to ``identified_ipv6`` as ``(rrname, rdata)`` pairs.
    Packets containing a CNAME (type 5) answer are dumped for inspection.

    Args:
        packet: A scapy packet handed over by the sniffer.
    """
    # Traffic on UDP port 53 is not guaranteed to parse as DNS.
    if not packet.haslayer('DNS'):
        return
    dns = packet['DNS']
    # Check for answers first: it is cheap and skips pure queries before the
    # domain filter runs.
    if not dns.ancount:
        return
    # The filter has to be *called*; a bare function object is always true.
    if not to_capture(packet):
        return

    for count in range(dns.ancount):
        answer = dns.an[count]
        if answer.type == 1:
            # Process ipv4 responses
            identified_ipv4.add((answer.rrname, answer.rdata))
        elif answer.type == 28:
            # Process ipv6 responses
            identified_ipv6.add((answer.rrname, answer.rdata))
        elif answer.type == 5:
            # Process CNAME responses. show() prints the dump itself and
            # returns None, so it must not be wrapped in print().
            packet.show()
def append_sub_nets_to_file(sub_nets, file_path):
    """Append each subnet string to ``file_path``, one entry per line.

    Args:
        sub_nets: Iterable of subnet strings such as ``10.0.0.0/8``.
        file_path: File to append to; it is opened in append mode.
    """
    with open(file_path, mode='a') as out_file:
        out_file.writelines(entry + '\n' for entry in sub_nets)
def sniff(interface):
    """Capture UDP port 53 traffic on ``interface``.

    Every captured packet is passed to ``process_sniffed_packet``; packets
    are not stored by scapy. No count or timeout is passed, so when the
    capture ends is left to scapy's defaults.

    Args:
        interface: Name of the network interface to listen on.
    """
    capture_options = {
        'iface': interface,
        'store': False,
        'prn': process_sniffed_packet,
        'filter': 'udp port 53',
    }
    scapy.sniff(**capture_options)
def main():
    """Sniff DNS traffic, then store the subnets of any unknown addresses.

    Steps:
      1. Load the subnets already known from ``NETFLIX_NET_MASKS_PATH``.
      2. Sniff DNS traffic on ``INTERFACE`` until the capture ends.
      3. For each collected IPv4 address not covered by a known subnet, look
         up its subnet and remember it.
      4. Append the newly found subnets to ``NETFLIX_NET_MASKS_PATH``.
    """
    netflix_net_masks = load_stored_net_masks(NETFLIX_NET_MASKS_PATH)
    sniff(INTERFACE)

    new_sub_nets = set()
    try:
        for _url, ip in identified_ipv4:
            print("Process {}.".format(ip))
            if check_if_ip_in_stored_sub_nets(ip, netflix_net_masks):
                continue
            sub_net, net_mask = get_subnet_for_ipv4(ip)
            new_sub_nets.add(sub_net + "/" + net_mask)
            # Remember the subnet so later addresses inside it need no lookup.
            netflix_net_masks.add((sub_net, net_mask))
    finally:
        # Persist whatever was discovered even if a lookup failed midway;
        # the error itself still propagates to the caller.
        append_sub_nets_to_file(new_sub_nets, NETFLIX_NET_MASKS_PATH)
if __name__ == '__main__':
main()