forked from cve-search/cve-search
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcpelist.py
191 lines (177 loc) · 6.5 KB
/
cpelist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# CPEList class, used in black- and whitelists
#
# Software is free software released under the "GNU Affero General Public License v3.0"
#
# Copyright (c) 2014-2018 Pieter-Jan Moreels - [email protected]
# Imports
# make sure these modules are available on your system
import os
import sys
runPath = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(runPath, ".."))
import json
from lib.Toolkit import toStringFormattedCPE
import lib.DatabaseLayer as db
class CPEList:
    """Manage one CPE management list (used for black- and whitelists).

    The list lives in the database collection ``"mgmt_" + <collection>`` and
    is manipulated through functions of ``lib.DatabaseLayer`` whose names are
    built from the title-cased collection name (``isIn<Collection>``,
    ``addTo<Collection>``, ``removeFrom<Collection>``, ``update<Collection>``
    and ``get<Collection>``).

    ``args`` is the parsed command-line namespace. The attributes read by
    this class are:

    * ``d``  - drop the list
    * ``i``  - path of a JSON file to import
    * ``e``  - path of a file to export to
    * ``a``  - iterable of entries to add
    * ``A``  - sequence whose first element is a file with one entry per line to add
    * ``r``  - iterable of entries to remove
    * ``R``  - sequence whose first element is a file with one entry per line to remove
    * ``t``  - entry type passed to :meth:`insert` for ``a`` / ``A``
    * ``f``  - force (overwrite an export file / repopulate a non-empty list)
    * ``v``  - verbose, print a summary of what was done
    """

    def __init__(self, collection, args):
        """Store the title-cased collection name and the argument namespace."""
        # Title case so the name can be appended to the DatabaseLayer
        # function prefixes ("isIn", "addTo", ...).
        self.collection = collection.title()
        self.args = args

    @staticmethod
    def _readLines(path):
        """Return the lines of the file at *path*, closing the file afterwards."""
        with open(path) as handle:
            return list(handle)

    # check if there are items in the collection
    def countItems(self):
        """Return the size of the ``mgmt_<collection>`` database collection."""
        return db.getSize("mgmt_" + self.collection.lower())

    # check if a cpe is in the list
    def check(self, cpe):
        """Return the result of ``db.isIn<Collection>(cpe)``."""
        return getattr(db, "isIn" + self.collection)(cpe)

    # insert to database
    def insert(self, cpe, cpeType):
        """Add an entry of the form ``id[#comment[#comment...]]`` to the list.

        Returns True when the entry was added, False when the id is empty,
        rejected by ``toStringFormattedCPE`` or already present. Any error is
        printed and terminates the program through ``sys.exit()``.
        """
        try:
            # Strip first: entries read from a file keep their newline, and
            # remove() looks entries up by their stripped form, so an
            # unstripped id could never be removed again.
            cpeID, *comments = cpe.strip().split('#')
            cpeID = cpeID.strip()
            if cpeType.lower() == "cpe":
                # presumably returns a falsy value for an invalid CPE -
                # the helper is not visible here, confirm in lib.Toolkit
                cpeID = toStringFormattedCPE(cpeID)
            # valid format and not yet in the database?
            if cpeID and not self.check(cpeID):
                getattr(db, "addTo" + self.collection)(cpeID, cpeType, comments)
                return True
            return False
        except Exception as ex:
            print("Error inserting item in database: %s"%(ex))
            sys.exit()

    # remove a cpe from the list
    def remove(self, cpe):
        """Remove *cpe* from the list.

        Returns True when the entry was present and removed, False otherwise.
        Any error is printed and terminates the program through ``sys.exit()``.
        """
        try:
            cpe = cpe.strip()
            # translate cpe; keep the raw value when it cannot be translated
            formatted = toStringFormattedCPE(cpe)
            if formatted:
                cpe = formatted
            if self.check(cpe):
                getattr(db, "removeFrom" + self.collection)(cpe)
                return True
            return False
        except Exception as ex:
            print("Error removing item from database: %s"%(ex))
            sys.exit()

    def update(self, cpeOld, cpeNew, cpeType):
        """Replace the entry *cpeOld* by *cpeNew* (``id[#comment...]``).

        Returns True when *cpeOld* was found and updated, False otherwise.
        Any error is printed and terminates the program through ``sys.exit()``.
        """
        try:
            # translate cpes
            # NOTE(review): unlike insert(), the whole string (comments
            # included) is formatted and cpeType is not consulted - confirm
            # against toStringFormattedCPE whether that is intended.
            cpeOld = toStringFormattedCPE(cpeOld.strip())
            cpeNew = toStringFormattedCPE(cpeNew.strip())
            if cpeOld and cpeNew:
                oldID = cpeOld.split('#')[0]
                # already in db?
                if self.check(oldID):
                    cpeID, *comments = cpeNew.split('#')
                    # str.strip() returns a new string; the result must be kept
                    cpeID = cpeID.strip()
                    getattr(db, "update" + self.collection)(oldID, cpeID, cpeType, comments)
                    return True
            return False
        except Exception as ex:
            print("Error updating item in database: %s"%(ex))
            sys.exit()

    # drop the collection
    def dropCollection(self):
        """Drop the ``mgmt_<collection>`` collection; report the count when verbose."""
        try:
            count = self.countItems()
            db.drop("mgmt_" + self.collection.lower())
            if self.args.v:
                print("collection of %s items dropped"%(count))
        except Exception as ex:
            print("Error dropping the database: %s"%(ex))
            sys.exit()

    # import a file that represents the cpe list
    def importList(self, importFile):
        """Insert every supported entry of the JSON list in *importFile*.

        *importFile* is an open file object. Each entry is expected to be a
        mapping with a ``type`` (``cpe``, ``targetsoftware`` or
        ``targethardware``), an ``id`` and optionally a list of ``comments``.
        Malformed or unsupported entries are skipped. Content that is not
        valid JSON prints ``The list is corrupted!`` and exits.
        """
        count = 0
        try:
            for line in json.load(importFile):
                try:
                    t = line['type']
                    if t not in ('cpe', 'targetsoftware', 'targethardware'):
                        continue
                    cpe = line['id']
                    if not isinstance(cpe, str):
                        continue
                    if 'comments' in line:
                        cpe += "#" + "#".join(line['comments'])
                except Exception:
                    # malformed entry (missing key, wrong shape): skip it
                    continue
                # Outside the guard above on purpose: insert() reports a
                # database failure through sys.exit(), which must not be
                # swallowed here.
                if self.insert(cpe, t):
                    count += 1
            if self.args.v:
                print("%s products added to the list"%(count))
        except (IOError, ValueError, TypeError):
            # json.load raises a ValueError subclass for malformed JSON;
            # TypeError covers a top-level value that is not iterable.
            print('The list is corrupted!')
            sys.exit()

    # export a file that represents the cpe list
    def exportList(self, exportFile=None):
        """Serialise the list as sorted, indented JSON.

        Without *exportFile* the JSON text is returned. Otherwise it is
        written to that path and None is returned; an existing file is only
        overwritten when ``args.f`` is set.
        """
        listed = getattr(db, "get" + self.collection)()
        output = json.dumps(listed, sort_keys=True, indent=2)
        if exportFile is None:
            return output
        if os.path.exists(exportFile) and not self.args.f:
            print("file already exists")
            return None
        with open(exportFile, 'w') as export:
            export.write(output)
        if self.args.v:
            print("%s listed items exported"%(len(listed)))
        return None

    # process the arguments and use it to take actions
    def process(self):
        """Run the single action selected by ``self.args``.

        The flags are checked in the order d, i, e, a/A, r/R and only the
        first one that is set is executed.
        """
        args = self.args
        if args.d:
            # drop the list
            self.dropCollection()
        elif args.i:
            if self.countItems() > 0 and args.f is False:
                # not empty and not forced to drop
                print("list already populated")
            else:
                # Open the import file before dropping, so a missing or
                # unreadable file does not wipe the existing list.
                with open(args.i) as importFile:
                    self.dropCollection()
                    self.importList(importFile)
        elif args.e:
            self.exportList(args.e)
        elif args.a or args.A:
            # get list of cpe's to add
            cpeList = args.a if args.a else self._readLines(args.A[0])
            count = 0
            for cpeID in cpeList:
                if self.insert(cpeID, args.t):
                    count += 1
            if args.v:
                print("%s products added to the list"%(count))
        elif args.r or args.R:
            # get list of cpe's to remove
            cpeList = args.r if args.r else self._readLines(args.R[0])
            count = 0
            for cpeID in cpeList:
                if self.remove(cpeID):
                    count += 1
            if args.v:
                print("%s products removed from the list"%(count))