Get hardware info #202

Open: wants to merge 37 commits into base: master

Changes from 1 commit (of 37 commits):
9094949  listed archive files (maxdudek, May 23, 2019)
3a877ef  added transformation and replacement (maxdudek, May 24, 2019)
42d5a7b  added numa and ethernet support (maxdudek, May 30, 2019)
6522055  Cleaned up code (maxdudek, May 30, 2019)
8839076  Cleaned up code (maxdudek, May 30, 2019)
afbbd61  Fixed gpu (maxdudek, May 31, 2019)
664d643  Cleaned up code (maxdudek, Jun 3, 2019)
b76d429  Reorganized code, added extracting functions, resource_name (maxdudek, Jun 3, 2019)
676b3fa  removed data file (maxdudek, Jun 3, 2019)
6a6f6ca  Added logging info (maxdudek, Jun 4, 2019)
464928e  Added specific error checking (maxdudek, Jun 5, 2019)
9d46399  Better error checking (maxdudek, Jun 5, 2019)
585ac1c  Better error handling (maxdudek, Jun 6, 2019)
8880f14  Remove archives which reach the end (maxdudek, Jun 7, 2019)
f1d67de  Started to fix missing gpu data (maxdudek, Jun 10, 2019)
bc33973  Added DMI info (maxdudek, Jun 12, 2019)
c8c24fa  Started adding tests (maxdudek, Jun 19, 2019)
0914809  More testing (maxdudek, Jun 20, 2019)
807fadb  Finalized help (maxdudek, Jun 21, 2019)
ecbfb3d  Fix for shippable test (maxdudek, Jun 21, 2019)
b510fa6  pylint fix (maxdudek, Jun 21, 2019)
06d3bcd  clean up (maxdudek, Jun 21, 2019)
5d96fcc  renamed archives (maxdudek, Jun 21, 2019)
4c5676f  fix logs (maxdudek, Jun 21, 2019)
ee11f8b  Added patching of missing data (maxdudek, Jun 26, 2019)
1808908  Patching is a separate script (maxdudek, Jun 27, 2019)
24db5b4  Improved patching (maxdudek, Jul 9, 2019)
18f0236  Added better patching (maxdudek, Jul 12, 2019)
b51c48f  Simpler patching (maxdudek, Jul 15, 2019)
f0fddb8  More command line options, multi-file input (maxdudek, Jul 16, 2019)
dc8617b  Merge branch 'master' into better_patching (jpwhite4, Jul 18, 2019)
2cbf3a3  fixed imports (maxdudek, Jul 18, 2019)
4d6e38c  Merge branch 'better_patching' of github.com:maxdudek/supremm into be… (maxdudek, Jul 18, 2019)
4fa6742  Better testing (maxdudek, Jul 18, 2019)
f4c98aa  Fixed expected output of test (maxdudek, Jul 18, 2019)
5335f8d  Added truncating (maxdudek, Jul 24, 2019)
bc3e83c  Removed TODOs (maxdudek, Aug 6, 2019)
Commit 664d6435635e85d82ec6f2aa6c70cfa266544b38: "Cleaned up code" (maxdudek, committed Jun 3, 2019)

src/supremm/get_hardware_info.py: 262 changes (119 additions and 143 deletions)
@@ -62,12 +62,6 @@
 class PcpArchiveHardwareProcessor(object):
     """ Parses a pcp archive and adds the archive information to the index """

-    def __init__(self, resconf):
-        self.hostname_mode = resconf['hostname_mode']
-        if self.hostname_mode == "fqdn":
-            self.hostnameext = resconf['host_name_ext']
-        # self.tz_adjuster = TimezoneAdjuster(resconf.get("timezone"))
-
     @staticmethod
     def getDataFromArchive(archive, host_from_path=None):
         """ Open the pcp archive and get hardware data
@@ -82,169 +76,147 @@ def getDataFromArchive(archive, host_from_path=None):
         pmfg = pmapi.fetchgroup(c_pmapi.PM_CONTEXT_ARCHIVE, archive)

         metrics = {
-            "hinv.ncpu": {
-                "type": "item",
-                "alias": "ncpu",
+            'hinv.ncpu': {
+                'type': 'item',
+                'alias': 'ncpu',
             },
-            "hinv.ndisk": {
-                "type": "item",
-                "alias": "ndisk",
+            'hinv.ndisk': {
+                'type': 'item',
+                'alias': 'ndisk',
             },
-            "hinv.physmem": {
-                "type": "item",
-                "alias": "physmem",
+            'hinv.physmem': {
+                'type': 'item',
+                'alias': 'physmem',
             },
-            "hinv.cpu.vendor": {
-                "type": "indom",
-                "alias": "manufacturer",
+            'hinv.cpu.vendor': {
+                'type': 'indom',
+                'alias': 'manufacturer',
             },
-            "hinv.cpu.model_name": {
-                "type": "indom",
-                "alias": "model_name",
+            'hinv.cpu.model_name': {
+                'type': 'indom',
+                'alias': 'model_name',
             },
-            "hinv.map.cpu_node": {
-                "type": "indom",
-                "alias": "numa_mapping",
+            'hinv.map.cpu_node': {
+                'type': 'indom',
+                'alias': 'numa_mapping',
             },
-            "infiniband.hca.type": {
-                "type": "indom",
-                "alias": "ina",
+            'infiniband.hca.type': {
+                'type': 'indom',
+                'alias': 'ina',
             },
-            "infiniband.hca.ca_type": {
-                "type": "indom",
-                "alias": "inb",
+            'infiniband.hca.ca_type': {
+                'type': 'indom',
+                'alias': 'inb',
             },
-            "infiniband.hca.numports": {
-                "type": "indom",
-                "alias": "inc",
+            'infiniband.hca.numports': {
+                'type': 'indom',
+                'alias': 'inc',
             },
-            "network.interface.in.bytes": {
-                "type": "indom",
-                "alias": "ethernet",
+            'network.interface.in.bytes': {
+                'type': 'indom',
+                'alias': 'ethernet',
             },
-            "nvidia.cardname": {
-                "type": "indom",
-                "alias": "nvidia",
+            'nvidia.cardname': {
+                'type': 'indom',
+                'alias': 'nvidia',
             },
         }

-        # Extend objects maps metric aliases to PCP extend objects
+        # ExtObjs maps metric aliases to PCP extend objects
         # Metrics map to None if the metric does not appear in the archive
         extObj = {}
         fetchedData = {}
-        gpuData = {}
         for metric in metrics:
             try:
-                metricType = metrics[metric]["type"]
-                alias = metrics[metric]["alias"]
-                if metricType == "item":
+                metricType = metrics[metric]['type']
+                alias = metrics[metric]['alias']
+                if metricType == 'item':
                     extObj[alias] = pmfg.extend_item(metric)
-                elif metricType == "indom":
+                elif metricType == 'indom':
                     extObj[alias] = pmfg.extend_indom(metric)
             except pmapi.pmErr as exc:
                 # If the metric doesn't appear in the archive
-                if exc.message().startswith("Unknown metric name"):
+                if exc.message().startswith('Unknown metric name'):
                     extObj[alias] = None
                     fetchedData[alias] = None
                     ##########
                     expected = [
-                        "nvidia",
-                        "model_name",
-                        "ina",
-                        "inb",
-                        "inc",
+                        'nvidia',
+                        'model_name',
+                        'ina',
+                        'inb',
+                        'inc',
                     ]
                     if alias not in expected:
-                        print("A metric could not be found for alias '%s', returning None..." % (alias))
+                        print('A metric could not be found for alias "%s", returning None...' % (alias))
                     ##########
                 else:
-                    err = {"archive": archive, "metric": metric, "error": exc.message()}
-                    print(json.dumps(err) + ",")
-                    print("ERROR: pmfg.extend_item or pmfd.extend_indom threw an unexpected exception")
-                    traceback.print_exception(exc)
+                    print('ERROR: pmfg.extend_item or pmfd.extend_indom threw an unexpected exception')
                     sys.exit(1)

         # fetch data until all metrics have been retrieved, or the end of the archive is reached
-        while not (all(metrics[metric]["alias"] in fetchedData for metric in metrics)):
+        while not (all(metrics[metric]['alias'] in fetchedData for metric in metrics)):
             try:
                 pmfg.fetch()
             except pmapi.pmErr as exc:
-                if exc.message().startswith("End of PCP archive log"):
+                if exc.message().startswith('End of PCP archive log'):
                     # End of archive - fill in missing data with 'None'
                     for metric in metrics:
                         if metric not in fetchedData:
                             fetchedData[metric] = None
                     break
                 else:
-                    err = {"archive": archive, "error": exc.message()}
-                    print(json.dumps(err) + ",")
-                    print("ERROR: pmfg.fetch() threw an unexpected exception")
-                    traceback.print_exception(exc)
+                    print('ERROR: pmfg.fetch() threw an unexpected exception')
                     sys.exit(1)

             for metric in metrics:
-                metricType = metrics[metric]["type"]
-                alias = metrics[metric]["alias"]
-                if ((metricType == "item" and alias not in fetchedData) or  # item case
-                        (metricType == "indom" and alias not in fetchedData and len(extObj[alias]()) > 0)):  # indom case
+                metricType = metrics[metric]['type']
+                alias = metrics[metric]['alias']
+                if ((metricType == 'item' and alias not in fetchedData) or  # item case
+                        (metricType == 'indom' and alias not in fetchedData and len(extObj[alias]()) > 0)):  # indom case (list)
                     fetchedData[alias] = extObj[alias]()
-                    if alias == "nvidia":
-                        for _, iname, value in fetchedData["nvidia"]:
-                            gpuData[iname] = value()
-                            print("GPU value = " + str(value()))
-
-
-
+                    # Load gpu data
+                    if alias == 'nvidia':
+                        fetchedData['nvidia'] = {}
+                        for _, iname, value in extObj[alias]():
+                            fetchedData['nvidia'][iname] = value()

         # Map the fetched data to the data that needs to be collected
         data = {
             'record_time_ts': record_time_ts,
             'hostname': hostname,
-            'core_count': fetchedData["ncpu"],
-            'disk_count': fetchedData["ndisk"],
-            'physmem': fetchedData["physmem"],
-            'manufacturer': fetchedData["manufacturer"][0][2]() if fetchedData["manufacturer"] else None,
-            'model_name': fetchedData["model_name"][0][2]() if fetchedData["model_name"] else None,
-            'numa_node_count': max([n[2]() for n in fetchedData["numa_mapping"]]) + 1,
-            'ethernet_count': len([device[1] for device in fetchedData["ethernet"] if device[1] != "lo"]) if fetchedData["ethernet"] else 0,
+            'core_count': fetchedData['ncpu'],
+            'disk_count': fetchedData['ndisk'],
+            'physmem': fetchedData['physmem'],
+            'manufacturer': fetchedData['manufacturer'][0][2]() if fetchedData['manufacturer'] else None,
+            'model_name': fetchedData['model_name'][0][2]() if fetchedData['model_name'] else None,
+            'numa_node_count': max([n[2]() for n in fetchedData['numa_mapping']]) + 1,
+            'ethernet_count': len([device[1] for device in fetchedData['ethernet'] if device[1] != 'lo']) if fetchedData['ethernet'] else 0,
+            'gpu': fetchedData.get('nvidia')
         }

-        if gpuData != {}:
-            data["gpu"] = gpuData
-
         # Transform the infiniband data
         infini = defaultdict(list)
-        if fetchedData.get("ina"):
-            for _, iname, value in fetchedData["ina"]:
+        if fetchedData.get('ina'):
+            for _, iname, value in fetchedData['ina']:
                 infini[iname].append(value())
-        if fetchedData.get("inb"):
-            for _, iname, value in fetchedData["inb"]:
+        if fetchedData.get('inb'):
+            for _, iname, value in fetchedData['inb']:
                 infini[iname].append(value())
-        if fetchedData.get("inc"):
-            for _, iname, value in fetchedData["inc"]:
+        if fetchedData.get('inc'):
+            for _, iname, value in fetchedData['inc']:
                 infini[iname].append(value())
         if infini:
             data['infiniband'] = dict(infini)

-        # Transform gpu data
-        try:
-            if fetchedData.get("nvidia"):
-                """
-                data['gpu'] = {}
-                for _, iname, value in fetchedData["nvidia"]:
-                    data['gpu'][iname] = value()
-                """
-        except pmapi.pmErr as exc:
-            print("ERROR: unexpected exception raised while entering gpu data")
-            tmp = [value for _, iname, value in fetchedData["nvidia"]]
-            print("Fetched data for nvidia = " + str(tmp))
-            traceback.print_exc()
-
-
         return data

     @staticmethod
     def isJobArchive(archive):
         fname = os.path.basename(archive)
-        return fname.startswith("job-")
+        return fname.startswith('job-')

 class HardwareStagingTransformer(object):
     """ Transforms the raw data from the archive into a list
@@ -262,19 +234,7 @@ def __init__(self, archiveData, replacementPath=None, outputFilename='hardware_s

         self.result = [ STAGING_COLUMNS ]

-        # Generate replacement rules from file
-        if replacementPath == None:
-            replacementPath = self.autoDetectReplacementPath()
-
-        if replacementPath is None or os.path.isdir(replacementPath) == False:
-            print("No replacement_rules.json file found. Replacement will not be performed")
-        else:
-            replacementFile = os.path.join(replacementPath, "replacement_rules.json")
-            try:
-                with open(replacementFile, 'r') as inFile:
-                    self.replacementRules = json.load(inFile)
-            except IOError as e:
-                self.replacementRules = None
-
-
         # Transform the archive data
         for hw_info in archiveData:
@@ -285,7 +245,7 @@ def __init__(self, archiveData, replacementPath=None, outputFilename='hardware_s
                     hw_info['ib_ca_type'] = hw_info['infiniband'][device][1]
                     hw_info['ib_ports'] = hw_info['infiniband'][device][2]

-            if ('gpu' in hw_info) and ('gpu0' in hw_info['gpu']):
+            if (hw_info.get('gpu')) and ('gpu0' in hw_info['gpu']):
                 devices = list(hw_info['gpu'])
                 hw_info['gpu_device_count'] = len(devices)
                 hw_info['gpu_device_manufacturer'] = 'NA'
@@ -320,29 +280,43 @@ def __init__(self, archiveData, replacementPath=None, outputFilename='hardware_s
                 self.get(hw_info.get('record_time_ts'))
             ])

-        if self.replacementRules != None:
-            self.doReplacement()
+        # Generate replacement rules from file
+        if replacementPath == None:
+            replacementPath = self.autoDetectReplacementPath()
+
+        if (replacementPath is not None) and os.path.isdir(replacementPath):
+            replacementFile = os.path.join(replacementPath, 'replacement_rules.json')
+            try:
+                with open(replacementFile, 'r') as inFile:
+                    self.replacementRules = json.load(inFile)
+                self.doReplacement()
+            except IOError as e:
+                pass
+        ##########
+        else:
+            print('No replacement_rules.json file found. Replacement will not be performed')
+        ##########

-        with open(outputFilename, "w") as outFile:
+        with open(outputFilename, 'w') as outFile:
             outFile.write(json.dumps(self.result, indent=4, separators=(',', ': ')))

     @staticmethod
     def autoDetectReplacementPath():
         searchpaths = [
-            os.path.dirname(os.path.abspath(__file__)) + "/../../../../etc/supremm",
-            "/etc/supremm",
-            pkg_resources.resource_filename(pkg_resources.Requirement.parse("supremm"), "etc/supremm")
+            os.path.dirname(os.path.abspath(__file__)) + '/../../../../etc/supremm',
+            '/etc/supremm',
+            pkg_resources.resource_filename(pkg_resources.Requirement.parse('supremm'), 'etc/supremm')
         ]

         for path in searchpaths:
-            if os.path.exists(os.path.join(path, "replacement_rules.json")):
+            if os.path.exists(os.path.join(path, 'replacement_rules.json')):
                 return os.path.abspath(path)

         return None

     @staticmethod
     def get(value, typehint='str'):
-        if (value != None) and (value != ""):
+        if (value != None) and (value != ''):
             return value
         if typehint == 'str':
             return 'NA'
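[Reviewer note] From the branch visible above, the get() helper normalizes None and empty strings to 'NA' for string-typed columns while passing real values through; the non-string branch is truncated out of this hunk, so only the string case is illustrated here:

    # String case only; the typehint != 'str' branch is not shown in this diff
    HardwareStagingTransformer.get('GenuineIntel')  # -> 'GenuineIntel'
    HardwareStagingTransformer.get('')              # -> 'NA'
    HardwareStagingTransformer.get(None)            # -> 'NA'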
@@ -395,32 +369,32 @@ def getOptions():
     """ process comandline options """
     parser = argparse.ArgumentParser()

-    parser.add_argument("-c", "--config", help="Specify the path to the configuration directory")
+    parser.add_argument('-c', '--config', help='Specify the path to the configuration directory')

-    parser.add_argument("-r", "--replace", help="Specify the path to the replacement_rules directory (if none, check config dir)")
+    parser.add_argument('-r', '--replace', help='Specify the path to the replacement_rules directory (if none, check config dir)')

-    parser.add_argument("-o", "--output", default="hardware_staging.json", help="Specify the name and path of the output json file")
+    parser.add_argument('-o', '--output', default='hardware_staging.json', help='Specify the name and path of the output json file')

     parser.add_argument(
-        "-m", "--mindate", metavar="DATE", type=parsetime, default=datetime.now() - timedelta(days=DAY_DELTA),
-        help="Specify the minimum datestamp of archives to process (default {} days ago)".format(DAY_DELTA)
+        '-m', '--mindate', metavar='DATE', type=parsetime, default=datetime.now() - timedelta(days=DAY_DELTA),
+        help='Specify the minimum datestamp of archives to process (default {} days ago)'.format(DAY_DELTA)
     )

     parser.add_argument(
-        "-M", "--maxdate", metavar="DATE", type=parsetime, default=datetime.now() - timedelta(minutes=10),
-        help="Specify the maximum datestamp of archives to process (default now())"
+        '-M', '--maxdate', metavar='DATE', type=parsetime, default=datetime.now() - timedelta(minutes=10),
+        help='Specify the maximum datestamp of archives to process (default now())'
     )

-    parser.add_argument("-a", "--all", action="store_true", help="Process all archives regardless of age")
+    parser.add_argument('-a', '--all', action='store_true', help='Process all archives regardless of age')

     grp = parser.add_mutually_exclusive_group()
-    grp.add_argument("-d", "--debug", dest="log", action="store_const", const=logging.DEBUG, default=logging.INFO,
-                     help="Set log level to debug")
-    grp.add_argument("-q", "--quiet", dest="log", action="store_const", const=logging.ERROR,
-                     help="Only log errors")
+    grp.add_argument('-d', '--debug', dest='log', action='store_const', const=logging.DEBUG, default=logging.INFO,
+                     help='Set log level to debug')
+    grp.add_argument('-q', '--quiet', dest='log', action='store_const', const=logging.ERROR,
+                     help='Only log errors')

     parser.add_argument(
-        "-D", "--debugfile",
+        '-D', '--debugfile',
         help="""
         Specify the path to a log file. If this option is present the process will log a DEBUG level to this file.
         This logging is independent of the console log.
@@ -439,6 +413,7 @@ def getHardwareInfo():
     ##########
     if opts['config'] != None:
         print('CONFIG_DIR = ' + os.path.abspath(opts['config']))
+    print('MIN_DATE = ' + str(opts['mindate']))
     ##########

     data = []
@@ -447,29 +422,30 @@ def getHardwareInfo():
     for resourcename, resource in config.resourceconfigs():

         ##########
         print('\nresource name = ' + resourcename)
         print('LOG_DIR = ' + resource['pcp_log_dir'])
-        print('MIN_DATE = ' + str(opts['mindate']))
         ##########
+        count = 0

         acache = PcpArchiveHardwareProcessor(resource)
         afind = PcpArchiveFinder(opts['mindate'], opts['maxdate'], opts['all'])
         for archive, fast_index, hostname in afind.find(resource['pcp_log_dir']):
             if not PcpArchiveHardwareProcessor.isJobArchive(archive):
                 hw_info = PcpArchiveHardwareProcessor.getDataFromArchive(archive)
                 if hw_info != None:
                     data.append(hw_info)
                 ##########
+                count += 1
+                if count % 100 == 0:
+                    print('count = ' + str(count))
                 ##########
             else:
                 print('Job archive skipped')
             ##########

     HardwareStagingTransformer(data, replacementPath=opts['replace'], outputFilename=outputFilename)

-    # with open(outputFilename, "w") as outFile:
-    #     outFile.write(json.dumps(data, indent=4, separators=(',', ': ')))
-
-if __name__ == "__main__":
+if __name__ == '__main__':
     getHardwareInfo()
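
[Reviewer note] For anyone trying this locally: based only on the flags defined in getOptions() above, an illustrative invocation would be python src/supremm/get_hardware_info.py -c /etc/supremm -o hardware_staging.json -d (the config path here is hypothetical, and -m/-M accept whatever date format parsetime parses); passing -a processes all archives regardless of age.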