progress bars are working for monitor now :) just wanna make cute little tables for when the job completes
jgoldverg committed Jan 3, 2024
1 parent d5cf76d commit dbffaf8
Showing 3 changed files with 70 additions and 35 deletions.
1 change: 0 additions & 1 deletion sdk/meta_query.py
@@ -53,7 +53,6 @@ def query_job_id_cdb(self, job_id):
def query_transferservice_direct(self, job_id, transfer_url):
param = {"jobId": job_id}
hostStr = transfer_url + JOB_DIRECT + "/execution"
-        print(hostStr)
r = requests.get(hostStr, params=param)
return r.json()

89 changes: 60 additions & 29 deletions sdk/query_transfer_data.py
@@ -8,8 +8,8 @@
from rich.table import Table
from sdk.meta_query_gui import QueryGui, MetaQueryAPI
import json
-from pytimeparse.timeparse import timeparse
from datetime import datetime
+from math import ceil

BASEPATH = "/api/metadata"
JOB = "/job"
@@ -95,37 +95,69 @@ def progress(job_uuid, retry):

@query.command("monitor")
@click.option("--job_id", type=click.INT)
@click.option("--url", "-u", type=click.STRING, help="url to query if using an ods connector")
@click.option("--url", "-u", type=click.STRING, help="url to query if using an ods connector", default=None)
@click.option("--experiment_file", type=click.Path(writable=True, dir_okay=False),
help="The file to put the results into")
@click.option("--delta", type=click.INT, default=5)
@click.option("--delta", type=click.INT, default=1)
@click.option("--retry", type=click.INT, default=5)
def monitor_job(job_id, url, experiment_file, delta, retry):
-    end = False
local_retry = 0
meta_query_api = MetaQueryAPI()
pq = QueryGui()
-    if job_id == None:
+    if job_id is None:
if url is not None:
job_ids = meta_query_api.query_job_ids_direct(transfer_url=url)
else:
job_ids = meta_query_api.query_all_jobs_ids()
-        job_id = job_ids[-1]
-    while end is False and local_retry < retry:
-        job_batch_cdb = meta_query_api.query_transferservice_direct(job_id, url)
-        job_batch_cdb = wrap_json_cdb_to_have_defaults(job_batch_cdb)
-        job_param_column = visualize_job_param_column(job_batch_cdb['jobParameters'])
-        job_table = visualize_job_table(job_batch_cdb)
-        step_table = visualize_step_table(job_batch_cdb)
-        console.print(job_table)
-        console.print(job_param_column)
-        console.print(step_table)
-
-        if job_batch_cdb['status'] in ["COMPLETED", "FAILED", "ABANDONED", "STOPPED"]:
-            pq.finished_job_stdout(batch_job_cdb=job_batch_cdb, output_file=experiment_file, job_id=job_id)
-            print('\n JobId: ', job_id, ' has final status of ', job_batch_cdb['status'])
-            return
-        time.sleep(delta)
+        job_id = job_ids[-1]
+        print("Using job_id: ", job_id)
+    with Progress() as progress:
+        if url is not None:
+            job_batch_cdb = meta_query_api.query_transferservice_direct(job_id, url)
+        else:
+            job_batch_cdb = meta_query_api.query_job_id_cdb(job_id)
+        visual_job_cdb = wrap_json_cdb_to_have_defaults(job_batch_cdb)
+        console.print(visualize_job_table(visual_job_cdb))
+        # console.print(visualize_step_table(visual_job_cdb))
+        file_progress_id_map = {}
+        file_last_write_count_map = {}
+        # Create and update progress bars as we go
+
+        while local_retry < retry:
+            if url is not None:
+                job_batch_cdb = meta_query_api.query_transferservice_direct(job_id, url)
+            else:
+                job_batch_cdb = meta_query_api.query_job_id_cdb(job_id)
+
+            job_batch_cdb = wrap_json_cdb_to_have_defaults(job_batch_cdb)
+            for step in job_batch_cdb['batchSteps']:
+                # jobParameters holds this file's entity info as a JSON string keyed by step name
+                entityInfo = json.loads(job_batch_cdb['jobParameters'][step['step_name']])
+                file_size = entityInfo['size']
+                chunk_size = entityInfo['chunkSize']
+                total_chunks = ceil(file_size / chunk_size)
+                if step['step_name'] not in file_progress_id_map:
+                    print(step['step_name'] + " has size: " + str(file_size) + " chunk_size: " + str(chunk_size) +
+                          " total_chunks: " + str(total_chunks) + " initial write_count: " + str(step['writeCount']))
+                    file_task = progress.add_task(step['step_name'], total=total_chunks)
+                    progress.update(file_task, advance=int(step['writeCount']))
+                    file_progress_id_map[step['step_name']] = file_task
+                    file_last_write_count_map[step['step_name']] = int(step['writeCount'])
+                    if step['status'] in ["COMPLETED", "FAILED", "ABANDONED", "STOPPED"]:
+                        progress.update(file_task, completed=total_chunks)
+                else:
+                    file_task_id = file_progress_id_map[step['step_name']]
+                    written = step['writeCount'] - file_last_write_count_map[step['step_name']]
+                    file_last_write_count_map[step['step_name']] = step['writeCount']
+                    progress.update(file_task_id, advance=written)
+                    if step['status'] in ["COMPLETED", "FAILED", "ABANDONED", "STOPPED"]:
+                        progress.update(file_task_id, completed=total_chunks)
+
+            progress.refresh()
+            if job_batch_cdb['status'] in ["COMPLETED", "FAILED", "ABANDONED", "STOPPED"]:
+                pq.finished_job_stdout(batch_job_cdb=job_batch_cdb, output_file=experiment_file, job_id=job_id)
+                print('\n JobId: ', job_id, ' has final status of ', job_batch_cdb['status'])
+                return
+            time.sleep(delta)
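For orientation, the pattern the new monitor loop relies on can be sketched on its own: rich's Progress.add_task returns a task id sized in chunks, and update(task_id, advance=n) moves that bar forward by n chunks per poll. This is a minimal, self-contained sketch — the file names, sizes, and the fake polling loop are invented for illustration and are not part of the SDK:

import time
from math import ceil
from rich.progress import Progress

# Hypothetical file metadata standing in for the entityInfo pulled out of jobParameters above.
files = {
    "file_a.bin": {"size": 50_000_000, "chunkSize": 10_000_000},
    "file_b.bin": {"size": 45_000_000, "chunkSize": 10_000_000},
}

with Progress() as progress:
    # One bar per file; the total is the chunk count, mirroring ceil(size / chunkSize) in monitor_job.
    tasks = {name: progress.add_task(name, total=ceil(info["size"] / info["chunkSize"]))
             for name, info in files.items()}
    last_write_count = {name: 0 for name in files}
    for write_count in range(1, 6):  # pretend each poll reports one more chunk written per file
        for name in files:
            written = write_count - last_write_count[name]  # advance by the delta, as monitor_job does
            last_write_count[name] = write_count
            progress.update(tasks[name], advance=written)
        time.sleep(0.2)

In monitor_job itself the poll is the query_transferservice_direct / query_job_id_cdb call, and the per-file write_count comes from each entry in batchSteps.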


@query.command("ids")
@@ -279,13 +311,11 @@ def visualize_step_table(job_json):
step_table.add_column("status")

for step in job_json['batchSteps']:
-        entityInfo = job_json['jobParameters'][step['step_name']]
-        comma_separated = entityInfo.split(",")
-        file_id = comma_separated[0].split("id=")[1].strip()[1:]
-        path = comma_separated[1].split("path=")[1].strip()[1:]
-        size = comma_separated[2].split("size=")[1].strip()
-        chunkSize = comma_separated[3].split("chunkSize=")[1].strip()
-        chunkSize = chunkSize[:-1].strip()
+        entityInfo = json.loads(job_json['jobParameters'][step['step_name']])
+        file_id = entityInfo['id']
+        path = entityInfo['path']
+        size = entityInfo['size']
+        chunkSize = entityInfo['chunkSize']
step_table.add_row(step['step_name'], str(size), str(chunkSize), step['startTime'], step['endTime'],
str(step['readCount']), str(step['writeCount']), step['status'])
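The rewrite above assumes each jobParameters entry is now a JSON string rather than the delimited key=value text the old splitting code picked apart. A minimal sketch of reading such an entry — the values below are hypothetical; only the field names (id, path, size, chunkSize) come from the code above:

import json

# Hypothetical jobParameters value for one step; real entries come from the transfer service.
raw = '{"id": "/parallel/file1", "path": "/parallel/file1", "size": 10000000, "chunkSize": 5000000}'
entityInfo = json.loads(raw)
print(entityInfo['path'], entityInfo['size'], entityInfo['chunkSize'])  # /parallel/file1 10000000 5000000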

@@ -311,6 +341,7 @@ def visualize_job_table(job_json):


def visualize_job_param_column(job_param_json):
+    # print(job_param_json)
job_param_table = Table(title="Job Parameters")
job_param_table.add_column("Name")
job_param_table.add_column("Value")
@@ -324,7 +355,7 @@ def visualize_job_param_column(job_param_json):
job_param_table.add_row("File Size Average", job_param_json['fileSizeAvg'])
job_param_table.add_row("Retry", str(job_param_json['retry']))
job_param_table.add_row("Source Credential Id", job_param_json['sourceCredential'])
job_param_table.add_row("Source Uri", job_param_json['sourceURI'])
# job_param_table.add_row("Source Uri", job_param_json['sourceURI'])
job_param_table.add_row("Source Base Path", job_param_json['sourceBasePath'])
job_param_table.add_row("Destination Credential Id", job_param_json['destCredential'])
job_param_table.add_row("Destination Base Path", job_param_json['destBasePath'])
15 changes: 10 additions & 5 deletions [email protected]
@@ -1,10 +1,15 @@
{
"ownerId": "[email protected]",
"source": {
"credId": "httpTaccNginx",
"type": "http",
"credId": "httpTaccNginx",
"vfsSourceCredential": {
"uri": "",
"username": "",
"secret": ""
},
"fileSourcePath": "/",
"resourceList": [
"infoList": [
{
"path": "/parallel/",
"id": "/parallel/",
@@ -13,20 +18,20 @@
]
},
"destination": {
"credId": "[email protected]",
"type": "vfs",
"credId": "[email protected]",
"fileDestinationPath": "/Users/jacobgoldverg/testData/"
},
"options": {
"compress": false,
"encrypt": false,
"optimizer": "",
"optimizer": "BO",
"overwrite": false,
"retry": 5,
"verify": false,
"concurrencyThreadCount": 1,
"parallelThreadCount": 1,
"pipeSize": 10,
"pipeSize": 1,
"chunkSize": 10000000,
"scheduledTime": "2023-10-25T14:52:15.183975Z"
},
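A small consequence of the options block worth keeping in mind: with chunkSize set to 10000000, the monitor's per-file bar length is just ceil(size / chunkSize). A quick check, where the 45 MB file size is only an example:

from math import ceil

chunk_size = 10000000        # from the options block above
file_size = 45_000_000       # hypothetical file size
print(ceil(file_size / chunk_size))  # 5 chunks on that file's progress bar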
