-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathaction.py
181 lines (149 loc) · 7.59 KB
/
action.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# action.py: create graph edges
#
# Copyright (C) 2014 Politecnico di Torino, Italy
# TORSEC group -- http://security.polito.it
#
# Author: Roberto Sassu <[email protected]>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see
# <http://www.gnu.org/licenses/>.
from selinux import *
from structs import *
from util import *
class Action(object):
def __init__(self):
return
class DBLibrariesAction(Action):
def get_exec_libraries(self, lib_list):
l = set()
for lname in lib_list:
if len(lname) == 0:
continue
if lname.startswith('linux-vdso') or lname.startswith('linux-gate'):
continue
obj = Object.get(lname)
# No object with this lname exists. Add a fake digest to it
if not hasattr(obj, 'digests'):
fake_digest = Digest.get(None, lname, True)
fake_digest.libraries = []
fake_digest.lib_aliases = [lname]
fake_digest.severity_level = 'fake-lib'
# the full path is not available, use the file name
fake_digest.fullpath = lname
obj.digests = set([fake_digest])
# we have to consider all digests as for the same file name there
# may be different libraries which may have different dependencies
l.update(obj.digests)
for lib in obj.digests:
if lib in self.libraries_deps_cache:
result = self.libraries_deps_cache[lib]
else:
result = self.get_exec_libraries(lib.libraries)
self.libraries_deps_cache[lib] = result
l.update(result)
return l
def __init__(self, conn, distro, graph):
self.libraries_deps_cache = {}
Digest.execute_digests_query(conn, distro)
unknown_digests = [digest for digest in Digest.digests_dict.values()
if digest.event_type == '' and not digest.is_fake]
for subj in Subject.subj_label_dict.values():
# Link unknown digests to set subject severity level to not-found.
# Without knowing what is the event type of an unknown digest,
# it may have been executed by a process.
# This link must be created even if all executable dependencies
# have been recognized as these libraries may have been used by
# other executables (it is possible to load an arbitrary library
# for a process by using the LD_PRELOAD environment variable.
for digest in unknown_digests:
event_name = digest.ima_records[0].entry['event_name']
digest.fullpath = event_name
digest.libraries = []
obj = Object.get(os.path.basename(event_name))
if not hasattr(obj, 'digests'):
obj.digests = set()
obj.digests.add(digest)
graph.add_edge(digest, obj, edge_tag_digest = True)
graph.add_edge(obj, subj, edge_tag_digest = True)
for subj_digest in subj.digests:
graph.add_edge(subj_digest, subj, edge_tag_digest = True)
for library in self.get_exec_libraries(subj_digest.libraries):
obj = Object.get(os.path.basename(library.fullpath))
for obj_digest in obj.digests:
graph.add_edge(obj_digest, obj, edge_tag_digest = True)
graph.add_edge(obj, subj, edge_tag_exec = True)
# Different operations on the same digest, how they should be considered?
# We cannot have granularity of single executable as we don't know to which
# executable a library has been mapped to.
class LSMLabelLoadAction(Action):
def __init__(self, conn, distro, graph):
for r in IMARecord.records:
hook = int(r.get_data(HOOK_ID_FIELD))
mask = int(r.get_data(HOOK_MASK_FIELD))
graph.add_edge(r.digest, r.obj, edge_tag_digest = True)
# handle violations by connecting an unknown digest to a subject
if ima_hooks[hook] in ['BPRM_CHECK', 'MMAP_CHECK', 'MODULE_CHECK'] \
or (ima_hooks[hook] == 'RDWR_VIOLATION_CHECK' and \
mask == MAY_EXEC):
graph.add_edge(r.obj, r.subj, edge_tag_exec = True)
# Consider the impact only of read data (execution events are
# considered only for the other system calls, execve and mmap).
# Execution will be taken into account during the information flow
# analysis.
elif ima_hooks[hook] in ['FILE_CHECK', 'RDWR_VIOLATION_CHECK'] and \
mask & MAY_READ:
graph.add_edge(r.obj, r.subj, edge_tag_data_read = True)
class LSMLabelFlowAction(Action):
def __init__(self, conn, distro, graph):
for r in IMARecord.records:
hook = int(r.get_data(HOOK_ID_FIELD))
mask = int(r.get_data(HOOK_MASK_FIELD))
if ima_hooks[hook] in ['FILE_CHECK', 'RDWR_VIOLATION_CHECK']:
if mask & MAY_READ or mask & MAY_EXEC:
graph.add_edge(r.obj, r.subj, edge_tag_flow = True)
if mask & MAY_WRITE:
graph.add_edge(r.subj, r.obj, edge_tag_flow = True)
class LSMLabelInodeFlowAction(Action):
def __init__(self, conn, distro, graph):
for r in IMARecord.records:
hook = int(r.get_data(HOOK_ID_FIELD))
mask = int(r.get_data(HOOK_MASK_FIELD))
if ima_hooks[hook] in ['FILE_CHECK', 'RDWR_VIOLATION_CHECK']:
if mask & MAY_READ or mask & MAY_EXEC:
graph.add_edge(r.obj, r.subj, edge_tag_flow = True)
if mask & MAY_WRITE:
graph.add_edge(r.subj, r.obj, edge_tag_flow = True)
if r.flows_new_record == r and r.last_flows_new_record != r:
for node in graph.predecessors(r.obj):
if not isinstance(node, Subject):
continue
graph.add_edge(node, r.obj, edge_tag_flow = True)
class LSMLabelSELinuxAction(Action):
def __init__(self, conn, distro, graph, selinux_policy_path):
subjs = [s.split(':')[2] for s in Subject.subj_label_dict.keys()]
s = SELinux(active_processes = subjs, policy_path = selinux_policy_path)
for subj_label in s.reads:
subj = Subject.get_by_type(subj_label)
for obj_label in s.reads[subj_label]:
obj = Object.get('undefined_u:undefined_r:%s:undefined_level' %
obj_label)
graph.add_edge(obj, subj, edge_tag_flow = True)
for subj_label in s.writes:
subj = Subject.get_by_type(subj_label)
for obj_label in s.writes[subj_label]:
obj = Object.get('undefined_u:undefined_r:%s:undefined_level' %
obj_label)
graph.add_edge(subj, obj, edge_tag_flow = True)