Skip to content

Commit

Permalink
Merge branch 'master' into Comm_layer
Browse files Browse the repository at this point in the history
  • Loading branch information
fbarreir committed Feb 22, 2017
2 parents fca174d + 79c2d1a commit bb508f4
Show file tree
Hide file tree
Showing 33 changed files with 296 additions and 86 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ to provide a uniform view for various resources.

For a detailed description and installation instructions, please check out this project's wiki tab:
https://github.com/PanDAWMS/panda-harvester/wiki

----------
1 change: 1 addition & 0 deletions _config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
theme: jekyll-theme-minimal
Binary file added images/Seq_CredManager.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/Seq_EventFeeder.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/Seq_JobFetcher.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/Seq_Monitor.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified images/Seq_Preparator.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/Seq_Propagator.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/Seq_Stager.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/Seq_Submitter.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
File renamed without changes
File renamed without changes
File renamed without changes
File renamed without changes
Binary file added images/db_diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
15 changes: 3 additions & 12 deletions pandaharvester/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,10 @@

# Table structure

SQLite DB structures are generated in the harvestercore/db_proxy.py. Current tables are:
```python
jobTableName = 'job_table'
workTableName = 'work_table'
fileTableName = 'file_table'
cacheTableName = 'cache_table'
eventTableName = 'event_table'
seqNumberTableName = 'seq_table'
pandaQueueTableName = 'pq_table'
jobWorkerTableName = 'jw_table'
```
SQLite DB structures are generated in the harvestercore/db_proxy.py. Individual table structure is defined in each *_spec.py file. The initial definitions are given below, but they will evolve over time, so please consult the relevant spec file for the up-to-date structure.

![alt text](../images/db_diagram.png "DB ER diagram")

Individual table structure is defined in each *_spec.py file. The initial definitions are given below, but they will evolve over time, so please consult the relevant spec file for the up-to-date structure.
### job_table
```python
'PandaID:integer primary key',
Expand Down
11 changes: 7 additions & 4 deletions pandaharvester/harvesterbody/cred_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ def execute(self):
mainLog = core_utils.make_logger(_logger)
# check credential
mainLog.debug('check credential')
isValid = self.exeCore.check_credential(harvester_config.pandacon.key_file)
# renew it if necessary
if not isValid:
isValid = self.exeCore.check_credential(harvester_config.credmanager.certFile)
if isValid:
mainLog.debug('valid')
elif not isValid:
# renew it if necessary
mainLog.debug('invalid')
mainLog.debug('renew credential')
tmpStat, tmpOut = self.exeCore.renew_credential(harvester_config.pandacon.key_file)
tmpStat, tmpOut = self.exeCore.renew_credential(harvester_config.credmanager.certFile)
if not tmpStat:
mainLog.error('failed : {0}'.format(tmpOut))
return
Expand Down
2 changes: 2 additions & 0 deletions pandaharvester/harvesterbody/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ def start(self):
##################
# loop on stop event to be interruptable since thr.join blocks signal capture in python 2.7
while True:
if self.singleMode:
break
self.stopEvent.wait(1)
if self.stopEvent.is_set():
break
Expand Down
14 changes: 9 additions & 5 deletions pandaharvester/harvesterbody/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pandaharvester.harvestercore.plugin_factory import PluginFactory
from pandaharvester.harvesterbody.agent_base import AgentBase
from pandaharvester.harvesterbody.worker_maker import WorkerMaker
from pandaharvester.harvesterbody.worker_assigner import WorkerAssigner

# logger
_logger = core_utils.setup_logger()
Expand All @@ -20,6 +21,7 @@ def __init__(self, queue_config_mapper, single_mode=False):
self.queueConfigMapper = queue_config_mapper
self.dbProxy = DBProxy()
self.workerMaker = WorkerMaker()
self.workerAssigner = WorkerAssigner(queue_config_mapper)
self.pluginFactory = PluginFactory()


Expand All @@ -29,15 +31,17 @@ def run(self):
while True:
mainLog = core_utils.make_logger(_logger, 'id={0}'.format(lockedBy))
mainLog.debug('getting queues to submit workers')
# get queues to submit workers
nWorkersPerQueue = self.dbProxy.get_queues_to_submit(harvester_config.submitter.nQueues,
harvester_config.submitter.lookupTime)
mainLog.debug('got {0} queues'.format(len(nWorkersPerQueue)))
# get queues associated to a site to submit workers
curWorkersPerQueue, siteName = self.dbProxy.get_queues_to_submit(harvester_config.submitter.nQueues,
harvester_config.submitter.lookupTime)
mainLog.debug('got {0} queues for site {1}'.format(len(curWorkersPerQueue), siteName))
# define number of new workers
nWorkersPerQueue = self.workerAssigner.define_num_workers(curWorkersPerQueue, siteName)
# loop over all queues
for queueName, tmpVal in nWorkersPerQueue.iteritems():
tmpLog = core_utils.make_logger(_logger, 'queue={0}'.format(queueName))
tmpLog.debug('start')
nWorkers = tmpVal['nWorkers']
nWorkers = tmpVal['nNewWorker'] + tmpVal['nReady']
nReady = tmpVal['nReady']
# check queue
if not self.queueConfigMapper.has_queue(queueName):
Expand Down
82 changes: 82 additions & 0 deletions pandaharvester/harvesterbody/worker_assigner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import copy

from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.db_proxy import DBProxy
from pandaharvester.harvestercore.plugin_factory import PluginFactory

# logger
_logger = core_utils.setup_logger()


# class to define the number of workers to submit per queue
class WorkerAssigner:
    # constructor
    def __init__(self, queue_config_mapper):
        self.queueConfigMapper = queue_config_mapper
        self.pluginFactory = PluginFactory()
        self.dbProxy = DBProxy()

    # define number of workers to submit based on various information
    def define_num_workers(self, static_num_workers, site_name):
        """Decide how many new workers to submit for each queue of a site.

        :param static_num_workers: dict {queueName: {'nQueue': int, 'nReady': int,
                                                     'nRunning': int}} with current
                                   worker counts per queue
        :param site_name: name of the site the queues belong to (used for logging)
        :return: a copy of static_num_workers with an additional 'nNewWorker'
                 entry per queue; on error the partially-filled copy is returned
        """
        tmpLog = core_utils.make_logger(_logger, 'site={0}'.format(site_name))
        tmpLog.debug('start')
        # deep copy so that setting 'nNewWorker' below does not mutate the
        # caller's inner per-queue dicts (a shallow copy.copy would share them)
        dyn_num_workers = copy.deepcopy(static_num_workers)
        try:
            # get cached queue status from the PanDA server
            queueStat = self.dbProxy.get_cache("panda_queues.json", None)
            queueStat = dict() if queueStat is None else queueStat.data
            # define num of new workers for each queue
            for queueName, tmpVal in static_num_workers.items():
                # no new workers when the queue is disabled
                if queueName in queueStat and queueStat[queueName]['status'] in ['offline']:
                    dyn_num_workers[queueName]['nNewWorker'] = 0
                    continue
                # get static queue configuration
                queueConfig = self.queueConfigMapper.get_queue(queueName)
                nQueue = tmpVal['nQueue']
                nReady = tmpVal['nReady']
                nRunning = tmpVal['nRunning']
                nQueueLimit = queueConfig.nQueueLimitWorker
                maxWorkers = queueConfig.maxWorkers
                qrWorkerRatio = queueConfig.qrWorkerRatio
                # define num of new workers based on static site config
                nNewWorkers = 0
                if nQueueLimit > 0 and nQueue >= nQueueLimit:
                    # enough queued workers
                    pass
                elif maxWorkers > 0 and (nQueue + nReady + nRunning) >= maxWorkers:
                    # enough workers in the system
                    pass
                elif qrWorkerRatio > 0 and nRunning > 0 and nQueue > qrWorkerRatio * nRunning // 100:
                    # enough workers in terms of Queued-to-Running ratio
                    pass
                else:
                    # get max number of queued workers
                    maxQueuedWorkers = 0
                    if nQueueLimit > 0:
                        maxQueuedWorkers = nQueueLimit
                    if qrWorkerRatio > 0 and nRunning > 0:
                        # integer division keeps Python 2 semantics of the ratio
                        maxQueuedWorkers = max(maxQueuedWorkers, qrWorkerRatio * nRunning // 100)
                    if maxQueuedWorkers == 0:
                        # use default value
                        maxQueuedWorkers = 1
                    # new workers, capped both by the queue limit and by maxWorkers
                    nNewWorkers = min(max(maxQueuedWorkers - nQueue, 0),
                                      max(maxWorkers - nQueue - nReady - nRunning, 0))
                dyn_num_workers[queueName]['nNewWorker'] = nNewWorkers
            # correction for global shares
            # TO BE IMPLEMENTED
            # correction based on commands from PanDA
            # TO BE IMPLEMENTED
            # correction based on resource information
            # TO BE IMPLEMENTED : need plugin?
            # dump
            tmpLog.debug('defined {0}'.format(str(dyn_num_workers)))
            return dyn_num_workers
        except Exception:
            # dump error; return the counts computed so far
            core_utils.dump_error_message(tmpLog)
            return dyn_num_workers
23 changes: 23 additions & 0 deletions pandaharvester/harvestercore/communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,26 @@ def ack_commands(self, command_ids):
except KeyError:
core_utils.dump_error_message(_logger, tmp_res)
return False

# get proxy
def get_proxy(self, voms_role):
    """Get a user proxy from the PanDA server for the given VOMS role.

    :param voms_role: VOMS role string sent as the 'role' parameter
    :return: (proxy, message) tuple; proxy is the proxy string on success or
             None on failure, in which case message carries the error dialog
             (empty when the HTTP call itself failed — the error is logged)
    """
    retVal = None
    retMsg = ''
    # get logger
    tmpLog = core_utils.make_logger(_logger)
    tmpLog.debug('start')
    data = {'role': voms_role}
    tmpStat, tmpRes = self.post_ssl('getProxy', data)
    if tmpStat is False:
        core_utils.dump_error_message(tmpLog, tmpRes)
    else:
        try:
            tmpDict = tmpRes.json()
            if tmpDict['StatusCode'] == 0:
                retVal = tmpDict['userProxy']
            else:
                retMsg = tmpDict['errorDialog']
        except Exception:
            # malformed JSON or missing key in the server response
            retMsg = core_utils.dump_error_message(tmpLog, tmpRes)
    tmpLog.debug('done with {0}'.format(str(retVal)))
    return retVal, retMsg
98 changes: 49 additions & 49 deletions pandaharvester/harvestercore/db_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ def fill_panda_queue_table(self, panda_queue_list, queue_config_mapper):
for queueName in panda_queue_list:
queueConfig = queue_config_mapper.get_queue(queueName)
if queueConfig is not None:
# check if alrady exist
# check if already exist
sqlC = "SELECT 1 FROM {0} ".format(pandaQueueTableName)
sqlC += "WHERE queueName=:queueName "
varMap = dict()
Expand Down Expand Up @@ -719,73 +719,73 @@ def get_queues_to_submit(self, n_queues, interval):
# get logger
tmpLog = core_utils.make_logger(_logger)
tmpLog.debug('start')
retMap = {}
retMap = dict()
siteName = None
# sql to get a site
sqlS = "SELECT siteName FROM {0} ".format(pandaQueueTableName)
sqlS += "WHERE submitTime IS NULL OR submitTime<:timeLimit "
sqlS += "ORDER BY submitTime "
sqlS += "FOR UPDATE "
# sql to get queues
sqlQ = "SELECT queueName,nQueueLimitWorker,maxWorkers FROM {0} ".format(pandaQueueTableName)
sqlQ += "WHERE submitTime IS NULL OR submitTime<:timeLimit "
sqlQ += "ORDER BY submitTime "
sqlQ += "FOR UPDATE "
sqlQ = "SELECT queueName FROM {0} ".format(pandaQueueTableName)
sqlQ += "WHERE siteName=:siteName "
# sql to count nQueue
sqlN = "SELECT status,count(*) FROM {0} ".format(workTableName)
sqlN += "WHERE computingSite=:computingSite GROUP BY status "
# sql to update timestamp
sqlU = "UPDATE {0} SET submitTime=:submitTime ".format(pandaQueueTableName)
sqlU += "WHERE queueName=:queueName "
# get queues
# get sites
timeNow = datetime.datetime.utcnow()
varMap = dict()
varMap[':timeLimit'] = timeNow - datetime.timedelta(seconds=interval)
self.execute(sqlQ, varMap)
resQ = self.cur.fetchall()
for queueName, nQueueLimit, maxWorkers in resQ:
# count nQueue
varMap = dict()
varMap[':computingSite'] = queueName
self.execute(sqlN, varMap)
nQueue = 0
nReady = 0
nRunning = 0
for workerStatus, tmpNum in self.cur.fetchall():
if workerStatus in [WorkSpec.ST_submitted]:
nQueue += tmpNum
elif workerStatus in [WorkSpec.ST_ready]:
nReady += tmpNum
elif workerStatus in [WorkSpec.ST_running]:
nRunning += tmpNum
# include ready workers
nWorkers = nReady
# new workers
if nQueueLimit > 0 and nQueue >= nQueueLimit:
# only ready workers since enough queued workers are there
pass
elif maxWorkers > 0 and (nQueue + nReady + nRunning) >= maxWorkers:
# only ready workers since enough workers are there
pass
else:
# add new workers
nWorkers += min(max(nQueueLimit - nQueue, 0), max(maxWorkers - nQueue + nReady + nRunning, 0))
# add
retMap[queueName] = {'nWorkers': nWorkers,
'nReady': nReady}
# update timestamp
self.execute(sqlS, varMap)
resS = self.cur.fetchone()
if resS is not None:
# get queues
siteName, = resS
varMap = dict()
varMap[':queueName'] = queueName
varMap[':submitTime'] = timeNow
self.execute(sqlU, varMap)
# enough queues
if len(retMap) >= n_queues:
break
varMap[':siteName'] = siteName
self.execute(sqlQ, varMap)
resQ = self.cur.fetchall()
for queueName, in resQ:
# count nQueue
varMap = dict()
varMap[':computingSite'] = queueName
self.execute(sqlN, varMap)
nQueue = 0
nReady = 0
nRunning = 0
for workerStatus, tmpNum in self.cur.fetchall():
if workerStatus in [WorkSpec.ST_submitted]:
nQueue += tmpNum
elif workerStatus in [WorkSpec.ST_ready]:
nReady += tmpNum
elif workerStatus in [WorkSpec.ST_running]:
nRunning += tmpNum
# add
retMap[queueName] = {'nReady': nReady,
'nRunning': nRunning,
'nQueue': nQueue}
# update timestamp
varMap = dict()
varMap[':queueName'] = queueName
varMap[':submitTime'] = timeNow
self.execute(sqlU, varMap)
# enough queues
if len(retMap) >= n_queues:
break
# commit
self.commit()
tmpLog.debug('got {0}'.format(str(retMap)))
return retMap
return retMap, siteName
except:
# roll back
self.rollback()
# dump error
core_utils.dump_error_message(_logger)
# return
return {}
return {}, None

# get job chunks to make workers
def get_job_chunks_for_workers(self, queue_name, n_workers, n_ready, n_jobs_per_worker, n_workers_per_job,
Expand Down Expand Up @@ -1705,7 +1705,7 @@ def get_cache(self, main_key, sub_key):
self.commit()
if resJ is None:
return None
# make job
# make spec
cacheSpec = CacheSpec()
cacheSpec.pack(resJ)
tmpLog.debug('done')
Expand Down
1 change: 1 addition & 0 deletions pandaharvester/harvestercore/panda_queue_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class PandaQueueSpec(SpecBase):
'maxWorkers:integer',
'jobFetchTime:timestamp',
'submitTime:timestamp',
'siteName:text'
)

# constructor
Expand Down
4 changes: 3 additions & 1 deletion pandaharvester/harvestercore/queue_config_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ def __init__(self, queue_name):
self.mapType = WorkSpec.MT_OneToOne
self.useJobLateBinding = False
self.zipPerMB = None
self.siteName = ''
self.qrWorkerRatio = 0


# mapper
Expand All @@ -33,7 +35,7 @@ def __init__(self):
if not os.path.exists(confFilePath):
confFilePath = None
# look into /etc/panda
if confFilePath == None:
if confFilePath is None:
confFilePath = os.path.join('/etc/panda',
harvester_config.qconf.configFile)
# load config from json
Expand Down
Loading

0 comments on commit bb508f4

Please sign in to comment.