-
Notifications
You must be signed in to change notification settings - Fork 2
/
test.py
70 lines (51 loc) · 2.32 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from bcc import BPF, libbcc
import ctypes
from scapy.all import *
class TestSuite(object):
xdp_retvals = {val: name for name, val in BPF.__dict__.items()
if name.startswith("XDP_")}
def compile_and_load(self, src_file, func_name):
bpf = BPF(src_file=src_file)
self.func = bpf.load_func(func_name, BPF.XDP)
def run_test(self, data, data_out_expect, retval_expect,
repeat=1, data_out_len=1514):
size = len(data)
data = ctypes.create_string_buffer(raw(data), size)
data_out = ctypes.create_string_buffer(data_out_len)
size_out = ctypes.c_uint32()
retval = ctypes.c_uint32()
duration = ctypes.c_uint32()
ret = libbcc.lib.bpf_prog_test_run(self.func.fd, repeat,
ctypes.byref(data), size,
ctypes.byref(data_out),
ctypes.byref(size_out),
ctypes.byref(retval),
ctypes.byref(duration))
assert ret == 0
assert self.xdp_retvals[retval.value] == self.xdp_retvals[retval_expect]
if data_out_expect:
assert data_out[:size_out.value] == raw(data_out_expect)
class TestPass(TestSuite):
def setup_method(self):
self.compile_and_load(b"xdp.c", b"pass_all_packets")
def test_tcp_pass(self):
packet_in = Ether() / IP() / TCP()
self.run_test(packet_in, None, BPF.XDP_PASS)
def test_udp_pass(self):
packet_in = Ether() / IP() / UDP()
self.run_test(packet_in, None, BPF.XDP_PASS)
class TestDrop(TestSuite):
def setup_method(self):
self.compile_and_load(b"xdp.c", b"drop_ipv4_tcp_80")
def test_ipv4_tcp_80(self):
packet_in = Ether() / IP() / TCP(dport=80)
self.run_test(packet_in, None, BPF.XDP_DROP)
def test_ipv4_udp_80(self):
packet_in = Ether() / IP() / UDP(dport=80)
self.run_test(packet_in, packet_in, BPF.XDP_PASS)
def test_ipv4_tcp_443(self):
packet_in = Ether() / IP() / TCP(dport=443)
self.run_test(packet_in, packet_in, BPF.XDP_PASS)
def test_ipv6_tcp_80(self):
packet_in = Ether() / IPv6() / TCP(dport=443)
self.run_test(packet_in, packet_in, BPF.XDP_PASS)