@@ -26,6 +26,7 @@
 import gzip
 import io
 import json
+import logging
 import re
 import time
 import urllib.request
@@ -39,6 +40,15 @@
 from torchci.clickhouse import query_clickhouse
 
 
+logger = logging.getLogger(__name__)
+handler = logging.StreamHandler()
+formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
+logger.setLevel(logging.DEBUG)
+handler.setLevel(logging.DEBUG)
+handler.setFormatter(formatter)
+logger.addHandler(handler)
+
+
 def get_temp_dir() -> Path:
     """Create a temporary directory for processing files"""
     temp_dir = Path("/tmp/file_report_generator")
@@ -61,7 +71,7 @@ def __init__(self, dry_run: bool = True):
     @lru_cache
     def load_runner_costs(self) -> Dict[str, float]:
         """Load runner costs from the S3 endpoint"""
-        print("Fetching EC2 pricing data from S3...")
+        logger.debug("Fetching EC2 pricing data from S3...")
         with urllib.request.urlopen(self.EC2_PRICING_URL) as response:
             compressed_data = response.read()
 
@@ -77,7 +87,7 @@ def load_runner_costs(self) -> Dict[str, float]:
     def load_test_owners(self) -> List[Dict[str, Any]]:
         """Load the test owner labels JSON file from S3"""
         S3_URL = "https://ossci-metrics.s3.us-east-1.amazonaws.com/test_owner_labels/test_owner_labels.json.gz"
-        print(f"Fetching test owner labels from S3: {S3_URL}")
+        logger.debug(f"Fetching test owner labels from S3: {S3_URL}")
         with urllib.request.urlopen(S3_URL) as response:
             compressed_data = response.read()
             decompressed_data = gzip.decompress(compressed_data)
@@ -107,7 +117,7 @@ def _get_first_suitable_sha(self, shas: list[dict[str, Any]]) -> Optional[str]:
                     has_no_job_name = True
                     break
             if has_no_job_name:
-                print(f"Has entries with no job name for {head_sha}")
+                logger.debug(f"Has entries with no job name for {head_sha}")
                 continue
 
             lens.append((head_sha, len(test_data)))
@@ -119,7 +129,7 @@ def _get_first_suitable_sha(self, shas: list[dict[str, Any]]) -> Optional[str]:
         _, len2 = lens[1]
 
         if abs(len1 - len2) * 2 / (len1 + len2) < 0.1:
-            print(f"Using SHA {sha1} with {len1} entries")
+            logger.debug(f"Using SHA {sha1} with {len1} entries")
             return sha1
         return None
 
@@ -135,7 +145,7 @@ def find_suitable_sha(self, date: str) -> Optional[str]:
         - All test entries have job names
         """
 
-        print("Searching for suitable SHAs from PyTorch main branch...")
+        logger.debug("Searching for suitable SHAs from PyTorch main branch...")
 
         params = {
             "start_date": date + " 00:00:00",
@@ -160,10 +170,10 @@ def find_suitable_sha(self, date: str) -> Optional[str]:
         ORDER BY
             min(w.head_commit.'timestamp') DESC
         """
-        print(f"Querying ClickHouse for successful shas on {date}")
+        logger.debug(f"Querying ClickHouse for successful shas on {date}")
         candidates = query_clickhouse(query, params)
 
-        print(f"Found {len(candidates)} candidate SHAs")
+        logger.debug(f"Found {len(candidates)} candidate SHAs")
 
         return self._get_first_suitable_sha(candidates)
 
@@ -185,7 +195,7 @@ def _get_workflow_jobs_for_sha(self, sha: str) -> List[Dict[str, Any]]:
 
         params = {"sha": sha}
 
-        print(f"Querying ClickHouse for workflow runs with SHA: {sha}")
+        logger.debug(f"Querying ClickHouse for workflow runs with SHA: {sha}")
         result = query_clickhouse(query, params)
 
         for row in result:
@@ -265,7 +275,7 @@ def _fetch_from_s3(self, bucket: str, key: str) -> str:
         try:
             file_loc = get_temp_dir() / f"cache_{bucket}_{key.replace('/', '_')}"
             if file_loc.exists():
-                print(f"Using cached download for {file_loc}")
+                logger.debug(f"Using cached download for {file_loc}")
                 compressed_data = file_loc.read_bytes()
             else:
                 url = f"https://{bucket}.s3.amazonaws.com/{key}"
@@ -279,7 +289,7 @@ def _fetch_from_s3(self, bucket: str, key: str) -> str:
             text_data = decompressed_data.decode("utf-8")
             return text_data
         except Exception as e:
-            print(f"Failed to fetch from s3://{bucket}/{key}: {e}")
+            logger.debug(f"Failed to fetch from s3://{bucket}/{key}: {e}")
             raise e
 
     def _fetch_invoking_file_summary_from_s3(
@@ -305,7 +315,7 @@ def _fetch_invoking_file_summary_from_s3(
             entry["short_job_name"] = f"{build} / test ({config})"
             data_as_list.append(entry)
 
-        print(
+        logger.debug(
             f"Fetched {len(data_as_list)} test entries from {key}, took {time.time() - start_time:.2f} seconds"
         )
         return data_as_list
@@ -403,7 +413,7 @@ def _fetch_status_changes_from_s3(
             data["run_id"] = workflow_run_id
             test_data.append(data)
 
-        print(
+        logger.debug(
             f"Fetched {len(test_data)} test entries from {key}, took {time.time() - start_time:.2f} seconds"
         )
         return test_data
@@ -456,7 +466,9 @@ def _check_status_change_already_exists(self, sha1: str, sha2: str) -> bool:
         try:
             with urllib.request.urlopen(url) as response:
                 if response.status == 200:
-                    print(f"Status changes for {sha1} to {sha2} already exist in S3.")
+                    logger.debug(
+                        f"Status changes for {sha1} to {sha2} already exist in S3."
+                    )
                     return True
         except Exception:
             pass
@@ -515,6 +527,9 @@ def get_status_changes(
         to_write = []
         for key, entries in counts.items():
             to_write.extend(entries[:10])
+        logger.debug(
+            f"Found {len(status_changes)} status changes between {sha1} and {sha2}, truncated to {len(to_write)} for upload"
+        )
 
         self.upload_to_s3(
             to_write,
@@ -588,9 +603,14 @@ def upload_to_s3(
         html_url = f"https://{bucket_name}.s3.amazonaws.com/{key}"
 
         if self.dry_run:
-            print(f"Dry run: would upload data to s3: {html_url}")
+            local_file = get_temp_dir() / f"dry_run_{key.replace('/', '_')}.json"
+            logger.info(
+                f"Dry run: would upload data to s3: {html_url}, writing to local file {local_file} instead"
+            )
+            with open(local_file, "w") as f:
+                f.write(body.getvalue())
             return
-        print(f"Uploading data to s3: {html_url}")
+        logger.info(f"Uploading data to s3: {html_url}")
         self.get_s3_resource().Object(bucket_name, key).put(
             Body=gzip.compress(body.getvalue().encode()),
             ContentEncoding="gzip",
@@ -599,12 +619,11 @@ def upload_to_s3(
 
     def remove_key_from_s3(self, bucket: str, key: str) -> None:
         """Remove a specific key from S3"""
-        s3_path = f"s3://{bucket}/{key}"
         html_url = f"https://{bucket}.s3.amazonaws.com/{key}"
         if self.dry_run:
-            print(f"Dry run: would remove from s3: {html_url}")
+            logger.info(f"Dry run: would remove from s3: {html_url}")
             return
-        print(f"Removing from s3: {html_url}")
+        logger.info(f"Removing from s3: {html_url}")
         self.get_s3_resource().Object(bucket, key).delete()
 
 
@@ -645,7 +664,7 @@ def main() -> None:
     if args.remove_sha:
         for i, entry in enumerate(existing_metadata):
             if entry["sha"] == args.remove_sha:
-                print(f"Removing SHA {args.remove_sha} from existing metadata")
+                logger.info(f"Removing SHA {args.remove_sha} from existing metadata")
                 generator.remove_key_from_s3(
                     "ossci-raw-job-status",
                     f"additional_info/weekly_file_report/data_{args.remove_sha}.json.gz",
@@ -668,19 +687,19 @@ def main() -> None:
     shas: list[str] = []
     for date in args.add_dates or []:
         if date in _existing_dates:
-            print(f"Date {date} already exists in metadata, skipping")
+            logger.info(f"Date {date} already exists in metadata, skipping")
             continue
         sha = generator.find_suitable_sha(date)
         if sha is None:
-            print(f"No suitable SHA found for date {date}, skipping")
+            logger.info(f"No suitable SHA found for date {date}, skipping")
             continue
-        print(f"Found suitable SHA {sha} for date {date}")
+        logger.info(f"Found suitable SHA {sha} for date {date}")
         shas.append(sha)
 
     for sha in args.add_shas or []:
         shas.append(cast(str, sha))
 
-    print(f"Adding SHAs: {shas}")
+    logger.info(f"Adding SHAs: {shas}")
 
     # Load data to get dates/ordering
     for sha in shas:
@@ -690,7 +709,7 @@ def main() -> None:
 
     existing_metadata = sorted(existing_metadata, key=lambda x: x["push_date"])
 
-    print("Calculating diffs for all files and grouping by labels...")
+    logger.debug("Calculating diffs for all files and grouping by labels...")
     for i in range(1, len(existing_metadata)):
         if not generator._check_status_change_already_exists(
             existing_metadata[i - 1]["sha"],