-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathwinobj.py
487 lines (411 loc) · 17.9 KB
/
winobj.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
# This file is Copyright 2020 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
# Creator: Aviel Zohar ([email protected])
import logging
from typing import List
from volatility3.framework import renderers, interfaces, objects, exceptions, constants
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
import volatility3.plugins.windows.info as info
import volatility3.plugins.windows.handles as handles
from volatility3.plugins.windows import pslist
# Module-wide logger for this plugin.
vollog = logging.getLogger(__name__)

# Positions of the fields inside one object record, i.e. the
# (pointer, name, object header, additional info) tuple that
# WinObj.AddToList appends to a directory listing.
ADDR, NAME, HEADER, ADDITIONAL_INFO = 0, 1, 2, 3

# Position of the listing inside a WinObj.tables value, which is an
# (address, listing) pair.
VALUES = 1
class WinObj(interfaces.plugins.PluginInterface):
    """Object Manager Enumeration.

    Walks the kernel object directory tree, starting at the root directory
    pointed to by ``ObpRootDirectoryObject``, and reports for every named
    object its address, name, type and a short type-specific description.

    Results are collected in ``self.tables``, a dict mapping a directory
    name (or path) to an ``(address, listing)`` pair, where ``listing`` is a
    list of ``(pointer, name, object_header, additional_info)`` tuples.
    """

    _required_framework_version = (2, 0, 0)
    _version = (2, 0, 0)

    def __init__(self, *args, **kwargs):
        """Resolve the kernel module and initialise default structure sizes."""
        super().__init__(*args, **kwargs)

        kernel = self.context.modules[self.config['kernel']]
        self.config['primary'] = kernel.layer_name
        self.config['nt_symbols'] = kernel.symbol_table_name
        self.kaddr_space = self.config['primary']
        self.kvo = self.context.layers[self.config['primary']].config["kernel_virtual_offset"]
        self.ntkrnlmp = self._context.module(self.config['nt_symbols'],
                                             layer_name=self.kaddr_space,
                                             offset=self.kvo)

        # Get the cookie (or None if this kernel has no ObHeaderCookie symbol).
        try:
            offset = self.context.symbol_space.get_symbol(
                self.config["nt_symbols"] + constants.BANG + "ObHeaderCookie").address
            self.cookie = self.context.object(
                self.config["nt_symbols"] + constants.BANG + "unsigned int",
                self.config["primary"],
                offset=self.kvo + offset)
        except exceptions.SymbolError:
            self.cookie = None

        self._protect_values = None
        self.root_obj_list = []
        self.tables = {}
        # Object type names that AddToList must skip (attribute name kept
        # as-is, including its spelling, because it is externally visible).
        self.exlude_types = []

        # Defaults for a 64-bit machine; update_sizes() replaces them with
        # the sizes taken from the loaded symbol table.
        self.POINTER_SIZE = 0x8
        self.OBJECT_HEADER_QUOTA_INFO_SIZE = 0x20
        self.OBJECT_HEADER_PROCESS_INFO_SIZE = 0x10
        self.OBJECT_HEADER_HANDLE_INFO_SIZE = 0x10
        self.OBJECT_HEADER_NAME_INFO_SIZE = 0x20
        self.OBJECT_HEADER_CREATOR_INFO_SIZE = 0x20

        # Bit values tested against _OBJECT_HEADER.InfoMask.
        self.OBJECT_HEADER_NAME_INFO_ID = 0x2
        self.OBJECT_HEADER_CREATOR_INFO_ID = 0x1
        self.OBJECT_HEADER_HANDLE_INFO_ID = 0x4
        self.OBJECT_HEADER_QUOTA_INFO_ID = 0x8
        self.OBJECT_HEADER_PROCESS_INFO_ID = 0x10

        self.OBJECT_HEADER_SIZE = 0x30
        self.OBJECT_POOL_HEADER = 0x10
        self.OBJECT_INFO_HEADERS_LIST = [self.OBJECT_HEADER_CREATOR_INFO_ID,
                                         self.OBJECT_HEADER_HANDLE_INFO_ID,
                                         self.OBJECT_HEADER_QUOTA_INFO_ID,
                                         self.OBJECT_HEADER_NAME_INFO_ID,
                                         self.OBJECT_HEADER_PROCESS_INFO_ID]
        self.OBJECT_INFO_HEADERS_ID_TO_SIZE = self._info_header_size_map()

        self.type_map = handles.Handles.get_type_map(self.context,
                                                     self.config["primary"],
                                                     self.config["nt_symbols"])

    @classmethod
    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
        """Return the configuration requirements of this plugin."""
        return [
            requirements.ModuleRequirement(name='kernel', description='Windows kernel',
                                           architectures=["Intel32", "Intel64"]),
            requirements.SymbolTableRequirement(name="nt_symbols",
                                                description="Windows kernel symbols"),
            requirements.BooleanRequirement(name='PARSE_ALL',
                                            description='Parse every directory under the root dir',
                                            optional=True),
            requirements.StringRequirement(name='SUPPLY_ADDR',
                                           description='Parse directories under specific addresses',
                                           optional=True),
            requirements.StringRequirement(name='FULL_PATH',
                                           description='Parse a directory found by full path location',
                                           optional=True),
        ]

    def _info_header_size_map(self):
        """Build the info-header-id -> size mapping from the current size attributes."""
        return {
            self.OBJECT_HEADER_NAME_INFO_ID: self.OBJECT_HEADER_NAME_INFO_SIZE,
            self.OBJECT_HEADER_CREATOR_INFO_ID: self.OBJECT_HEADER_CREATOR_INFO_SIZE,
            self.OBJECT_HEADER_HANDLE_INFO_ID: self.OBJECT_HEADER_HANDLE_INFO_SIZE,
            self.OBJECT_HEADER_QUOTA_INFO_ID: self.OBJECT_HEADER_QUOTA_INFO_SIZE,
            self.OBJECT_HEADER_PROCESS_INFO_ID: self.OBJECT_HEADER_PROCESS_INFO_SIZE,
        }

    def _optional_type_size(self, type_name):
        """Return the size of ``type_name``, or 0 when the symbol table lacks it."""
        try:
            return self.ntkrnlmp.get_type(type_name).size
        except Exception:
            return 0x0

    def _referenced_object(self, myObj, type_name):
        """Follow the pointer stored at ``myObj`` and view its target as ``type_name``.

        The pointer is read through ``_EX_FAST_REF`` before being
        dereferenced, exactly as each branch of get_additional_info did.
        """
        pointer = self.ntkrnlmp.object("pointer", offset=myObj.vol.offset - self.kvo)
        return pointer.cast("_EX_FAST_REF").dereference().cast(type_name)

    def get_root_directory(self):
        """Return the address of the root object directory.

        The value of the ``ObpRootDirectoryObject`` kernel variable is read
        directly from the kernel layer.  If that fails for any reason, the
        location of the variable is taken from the KDBG structure instead and
        a pointer object at that location is returned.

        :return : the root directory address (int, or a pointer object on
                  the KDBG fallback path)
        """
        try:
            import struct
            pointer_size = self.ntkrnlmp.get_type('pointer').size
            # Explicit little-endian for both widths (the 32-bit format
            # previously used native byte order).
            pointer_struct = struct.Struct("<Q" if pointer_size == 8 else "<I")
            kernel = self.context.modules[self.config["kernel"]]
            symbol_address = self.ntkrnlmp.get_symbol('ObpRootDirectoryObject').address
            raw = self.context.layers[kernel.layer_name].read(
                symbol_address + self.ntkrnlmp.offset, pointer_size)
            root_dir_addr = int(pointer_struct.unpack(raw)[0])
        except Exception:
            vollog.debug("Reading ObpRootDirectoryObject failed, falling back to KDBG")
            root_dir_addr = info.Info.get_kdbg_structure(self.context,
                                                         self.config_path,
                                                         self.config['primary'],
                                                         self.config['nt_symbols']).ObpRootDirectoryObject
            root_dir_addr = self.ntkrnlmp.object("pointer",
                                                 offset=root_dir_addr - self.ntkrnlmp.offset)
        return root_dir_addr

    def update_sizes(self):
        """Replace the default structure sizes with those of the loaded symbols.

        Optional info-header structures that the symbol table does not
        define get size 0.  ``_OBJECT_HEADER_NAME_INFO`` and
        ``_OBJECT_HEADER`` are required; a missing type there propagates.

        :return : None
        """
        self.POINTER_SIZE = self.ntkrnlmp.get_type("pointer").size

        self.OBJECT_HEADER_QUOTA_INFO_SIZE = self._optional_type_size("_OBJECT_HEADER_QUOTA_INFO")
        self.OBJECT_HEADER_PROCESS_INFO_SIZE = self._optional_type_size("_OBJECT_HEADER_PROCESS_INFO")
        self.OBJECT_HEADER_HANDLE_INFO_SIZE = self._optional_type_size("_OBJECT_HEADER_HANDLE_INFO")
        self.OBJECT_HEADER_CREATOR_INFO_SIZE = self._optional_type_size("_OBJECT_HEADER_CREATOR_INFO")
        self.OBJECT_HEADER_NAME_INFO_SIZE = self.ntkrnlmp.get_type("_OBJECT_HEADER_NAME_INFO").size

        # The header size is the offset of Body, the last member of
        # _OBJECT_HEADER, i.e. the header without the object body itself.
        self.OBJECT_HEADER_SIZE = self.ntkrnlmp.get_type('_OBJECT_HEADER').relative_child_offset('Body')

        # Keep the id -> size mapping in step with the refreshed sizes.
        self.OBJECT_INFO_HEADERS_ID_TO_SIZE = self._info_header_size_map()

    def get_all_object_headers(self, mask):
        """Return the ids of all info headers flagged in ``mask``.

        :param mask: InfoMask from the object header
        :return : list of the ids from OBJECT_INFO_HEADERS_LIST whose bit is
                  set in ``mask``, in that list's order
        """
        return [info_id for info_id in self.OBJECT_INFO_HEADERS_LIST if mask & info_id != 0]

    def get_additional_info(self, myObj, obj_type, obj_header):
        """Return a short, type-specific description of an object.

        :param myObj : pointer object
        :param obj_type : string of the type
        :param obj_header: "_OBJECT_HEADER"
        :return : string with the additional information about the object
        """
        # additional information about SymbolicLink
        if obj_type == "SymbolicLink":
            link = self._referenced_object(myObj, '_OBJECT_SYMBOLIC_LINK')
            return "Target: {}".format(link.LinkTarget.get_string())

        # additional information about Section
        if obj_type == "Section" and self.ntkrnlmp.has_type("_OBJECT_HEADER"):
            try:
                if self.ntkrnlmp.has_type('_SECTION_OBJECT'):
                    section = self._referenced_object(myObj, '_SECTION_OBJECT')
                    # the default is "_SEGMENT_OBJECT", and we need _SEGMENT
                    control_area = section.Segment.dereference().cast("_SEGMENT").ControlArea
                else:
                    # Windows 10 renamed _SECTION_OBJECT -> _SECTION
                    control_area = self._referenced_object(myObj, '_SECTION').u1.ControlArea
            except Exception:
                return "(parse object failed on address: {}".format(myObj)
            try:
                fileName = control_area.FilePointer.dereference().cast("_FILE_OBJECT").file_name_with_device()
            except Exception:
                return "(parse file name failed on address: {}".format(control_area)
            return "FileObj: {}".format(fileName)

        # additional information about Driver
        if obj_type == "Driver":
            driver = self._referenced_object(myObj, '_DRIVER_OBJECT')
            try:
                return "Full Name: {}".format(driver.DriverName.String)
            except Exception:
                return "(parse name failed on address: {}".format(driver)

        # additional information about Device
        if obj_type == "Device":
            device = self._referenced_object(myObj, '_DEVICE_OBJECT')
            try:
                return "Driver: {}".format(device.DriverObject.DriverName.String)
            except Exception:
                return "(parse name failed on address: {}".format(device)

        # additional information about Type
        if obj_type == "Type":
            myType = self._referenced_object(myObj, '_OBJECT_TYPE')
            key = self.ntkrnlmp.object("string", offset=myType.Key.vol.offset - self.kvo,
                                       max_length=4, errors="replace")
            return "Key: {}".format(key)

        # Everything else, including WindowStation (whose dedicated parsing
        # was disabled and never ran), gets the reference counts.
        return "Handle Count - {}, Pointer Count {}".format(obj_header.HandleCount,
                                                            obj_header.PointerCount)

    def GetName(self, obj_header):
        """Return the name of the object that ``obj_header`` belongs to.

        The ``_OBJECT_HEADER_NAME_INFO`` structure is located at a negative
        distance from the header: ``NameInfoOffset`` when the header type has
        that member, otherwise the name-info size plus the creator-info size
        when the creator bit is set in ``InfoMask``.

        :param obj_header: "_OBJECT_HEADER"
        :return : the object name, or "" when the object has no name header
                  or the name cannot be read
        """
        if self.ntkrnlmp.get_type("_OBJECT_HEADER").has_member('NameInfoOffset'):
            # old layout: the header stores the distance directly
            size = obj_header.NameInfoOffset
            if not size:
                # a zero offset means there is no name header to read
                return ""
        else:
            # new layout: work the distance out from the InfoMask bits
            try:
                info_headers = self.get_all_object_headers(obj_header.InfoMask)
            except Exception:
                return ""
            if self.OBJECT_HEADER_NAME_INFO_ID not in info_headers:
                # no name header present, so there is nothing valid to read
                return ""
            size = self.OBJECT_HEADER_NAME_INFO_SIZE
            if self.OBJECT_HEADER_CREATOR_INFO_ID in info_headers:
                size += self.OBJECT_HEADER_CREATOR_INFO_SIZE

        name_info = self.ntkrnlmp.object("_OBJECT_HEADER_NAME_INFO",
                                         offset=obj_header.vol.offset - self.kvo - size)
        if name_info.Name:
            try:
                return name_info.Name.get_string()
            except Exception:
                return ""
        return ""

    def AddToList(self, myObj, l):
        """Append a record for ``myObj`` to ``l`` if it is new, valid and named.

        The record is ``(myObj, name, obj_header, add_info)``.  Nothing is
        added when the pointer is already in ``l``, the header fails
        validation, the object has no name, or its type is listed in
        ``self.exlude_types``.

        :param myObj : pointer object
        :param l : list
        :return : None
        """
        obj_header = self.ntkrnlmp.object(
            "_OBJECT_HEADER",
            offset=myObj.cast('pointer').real - self.OBJECT_HEADER_SIZE - self.kvo)

        # Reject duplicates and invalid headers.  The validation runs once
        # per object, and also when the list is still empty.
        try:
            if any(item[0] == myObj for item in l):
                return
            if not obj_header.is_valid():
                return
            pointer_count = obj_header.PointerCount
            handle_count = obj_header.HandleCount
            if (pointer_count < 1 and handle_count < 1) or pointer_count < 0 or handle_count < 0:
                return
        except Exception:
            return

        name = self.GetName(obj_header)
        if not name:
            return

        obj_type = obj_header.get_object_type(self.type_map, self.cookie)
        if obj_type in self.exlude_types:
            return

        add_info = self.get_additional_info(myObj, obj_type, obj_header)
        l.append((myObj, name, obj_header, add_info))

    def parse_directory(self, addr, l):
        """Add every valid object found in the directory at ``addr`` to ``l``.

        Each non-empty hash bucket of the ``_OBJECT_DIRECTORY`` is followed
        along its ``ChainLink`` chain.  Entries that cannot be read are
        skipped, and an entry already visited ends its chain so that a
        looping chain terminates.

        :param addr : long, pointer to the directory
        :param l : list
        :return : None
        """
        seen = set()
        directory = self.ntkrnlmp.object('_OBJECT_DIRECTORY', addr - self.ntkrnlmp.offset)

        for pointer_addr in directory.HashBuckets:
            if not pointer_addr or pointer_addr == 0xffffffff:
                continue

            # Walk the ChainLink for each item inside the bucket.
            while pointer_addr not in seen:
                try:
                    # the object pointer is read one pointer-size past the
                    # start of the entry
                    myObj = self.ntkrnlmp.object(
                        "pointer", offset=pointer_addr + self.POINTER_SIZE - self.kvo)
                    self.AddToList(myObj, l)
                except exceptions.InvalidAddressException:
                    pass

                seen.add(pointer_addr)
                try:
                    pointer_addr = pointer_addr.ChainLink
                except exceptions.InvalidAddressException:
                    break
                if not pointer_addr:
                    break

    def _find_directory(self, name, entries):
        """Parse the entry of ``entries`` whose name matches ``name`` (case-insensitive).

        :return : ``(actual_name, offset, listing)`` for the first match, or
                  None when no entry has that name
        """
        wanted = str(name).lower()
        for obj, obj_name, _obj_header, _add_info in entries:
            if obj_name.lower() == wanted:
                listing = []
                self.parse_directory(obj, listing)
                return obj_name, obj.vol.offset, listing
        return None

    def get_directory(self, name="", root_dir=None):
        """Parse directories of a listing into ``self.tables``.

        With a ``name``, only the first entry matching it (case-insensitive)
        is parsed.  Without one, every entry whose object type is
        "Directory" is parsed.  Results are stored in ``self.tables`` under
        the entry's own name as ``(offset, listing)``.

        :param name : string
        :param root_dir : list of tuples to search; None means the root
                          directory listing.  An empty list is searched
                          as-is and therefore matches nothing.
        :return : None
        """
        name = str(name)
        if root_dir is None:
            # default option
            root_dir = self.root_obj_list

        if name:
            found = self._find_directory(name, root_dir)
            if found is not None:
                obj_name, offset, listing = found
                self.tables[obj_name] = (offset, listing)
            return

        # parse all
        for obj, obj_name, obj_header, _add_info in root_dir:
            if obj_header.get_object_type(self.type_map, self.cookie) == "Directory":
                listing = []
                self.parse_directory(obj, listing)
                self.tables[obj_name] = (obj.vol.offset, listing)

    def SaveByPath(self, path):
        """Store the listing of the directory at ``path`` in ``self.tables``.

        ``path`` is split on "/" (or on backslashes when it contains no
        "/"); the text before the first separator is ignored.  Each
        component is looked up, case-insensitively, in the listing of the
        previous one, starting from the root directory.  Only the final
        result is stored, under ``path`` itself.

        :param path: path in the object directory to get all the object information from.
        :return: None
        :raises KeyError: when ``path`` has no separator or a component
                          cannot be found
        """
        stages = path.split("/")[1:]
        # allow backslashes as well
        if not stages:
            stages = path.split("\\")[1:]
        if not stages:
            raise KeyError("Invalid Path -> {}".format(path))

        addr = None
        current_dir = self.root_obj_list
        for stage in stages:
            found = self._find_directory(stage, current_dir)
            if found is None:
                raise KeyError("Invalid Path -> {}".format(path))
            _stage_name, addr, current_dir = found

        # sets the full path in the dictionary
        self.tables[path] = (addr, current_dir)

    def get_object_information(self):
        """Fill ``self.tables`` according to the user options.

        The root directory is always parsed first.  Then exactly one of the
        following applies: ``SUPPLY_ADDR`` (comma-separated addresses),
        ``FULL_PATH`` (comma-separated paths) or the default, which stores
        the root listing under "/".

        :return: None
        """
        # updates objects size
        self.update_sizes()

        # Get root directory
        root_dir = self.get_root_directory()
        self.parse_directory(root_dir, self.root_obj_list)

        supplied_addrs = self.config.get('SUPPLY_ADDR', None)
        full_paths = self.config.get('FULL_PATH', None)

        if supplied_addrs:
            for addr_text in supplied_addrs.split(","):
                # SECURITY: this text comes from the command line and used to
                # be passed to eval().  int(..., 0) accepts the same decimal,
                # 0x, 0o and 0b literals without executing anything.
                try:
                    addr = int(addr_text.strip(), 0)
                except ValueError:
                    vollog.warning("Skipping invalid address: %s", addr_text)
                    continue

                try:
                    obj_header = self.ntkrnlmp.object(
                        "_OBJECT_HEADER",
                        offset=addr - self.OBJECT_HEADER_SIZE - self.ntkrnlmp.offset)
                    name = self.GetName(obj_header)
                except exceptions.InvalidAddressException:
                    vollog.warning("Skipping unreadable address: %s", addr_text)
                    continue

                # validates the directory
                if name:
                    listing = []
                    self.parse_directory(addr, listing)
                    self.tables[name] = (addr, listing)

        elif full_paths:
            for path in full_paths.split(","):
                self.SaveByPath(path)

        else:
            # default option
            self.tables["/"] = (root_dir, self.root_obj_list)

            # NOTE(review): the original indentation was lost; PARSE_ALL is
            # assumed to belong to the default option only - confirm.
            if self.config.get('PARSE_ALL', None):
                self.get_directory()

    def _generator(self):
        """Yield one row (address, name, type, additional info) per collected object."""
        self.get_object_information()
        for _address, listing in self.tables.values():
            for obj, name, obj_header, add_info in listing:
                yield (0, [hex(obj),
                           str(name),
                           str(obj_header.get_object_type(self.type_map, self.cookie)),
                           str(add_info)])

    def run(self):
        """Return the result grid of the plugin."""
        # NOTE(review): the third column is titled "str" although it holds
        # the object type; left unchanged so existing output consumers keep
        # working.
        return renderers.TreeGrid([("Object Address(V)", str),
                                   ("Name", str),
                                   ("str", str),
                                   ("Additional Info", str)],
                                  self._generator())