@@ -1,4 +1,3 @@
-import logging
 import queue
 import subprocess
 import threading
@@ -8,10 +7,9 @@
 import requests
 from google.cloud import storage

+from clinvar_gk_pilot.logger import logger
 from clinvar_gk_pilot.utils import make_progress_logger

-_logger = logging.getLogger("clinvar_gk_pilot")
-

 def _get_gcs_client() -> storage.Client:
     if getattr(_get_gcs_client, "client", None) is None:
@@ -37,12 +35,12 @@ def copy_file_to_bucket(
     """
     Upload the contents of file `local_file_uri` on local filesystem, to `remote_blob_uri` in
     """
-    _logger.info(f"Uploading {local_file_uri} to {remote_blob_uri}")
+    logger.info(f"Uploading {local_file_uri} to {remote_blob_uri}")
     if client is None:
         client = _get_gcs_client()
     blob = parse_blob_uri(remote_blob_uri, client=client)
     blob.upload_from_filename(client=client, filename=local_file_uri)
-    _logger.info(f"Finished uploading {local_file_uri} to {remote_blob_uri}")
+    logger.info(f"Finished uploading {local_file_uri} to {remote_blob_uri}")


 def blob_writer(
@@ -89,15 +87,15 @@ def http_download_requests(
     """
     Download the contents of `http_uri` to `local_path` using requests.get
     """
-    _logger.info(f"Downloading {http_uri} to {local_path}")
+    logger.info(f"Downloading {http_uri} to {local_path}")

     bytes_read = 0
     response = requests.get(http_uri, stream=True, timeout=10)
     response.raise_for_status()
     opened_file_size = int(response.headers.get("Content-Length"))

     log_progress = make_progress_logger(
-        logger=_logger,
+        logger=logger,
         fmt="Read {elapsed_value} bytes in {elapsed:.2f} seconds. Total bytes read: {current_value}/{max_value}.",
         max_value=opened_file_size,
     )
@@ -118,7 +116,7 @@ def http_download_requests(

         if len(chunk) == 0:
             wait_time = 10
-            _logger.warning(
+            logger.warning(
                 f"Received an empty chunk from {http_uri} at byte {bytes_read}. Pausing {wait_time} seconds"
             )
             time.sleep(wait_time)
@@ -163,22 +161,21 @@ def file_stat(path: Path, q: queue.Queue):
                 break
             except queue.Empty:
                 if not path.exists():
-                    _logger.info(f"{path} does not exist")
+                    logger.info(f"{path} does not exist")
                 else:
-                    _logger.info(f"{path} size: {path.stat().st_size}")
+                    logger.info(f"{path} size: {path.stat().st_size}")
                 time.sleep(10)

     t_stat_stop = queue.Queue()
-    t_stat = threading.Thread(
-        target=file_stat, args=(Path(local_path), t_stat_stop))
+    t_stat = threading.Thread(target=file_stat, args=(Path(local_path), t_stat_stop))
     t_stat.start()

     for _ in range(2):
         for pipe, line in iter(q.get, None):
-            _logger.info(f"{pipe}: {line.decode('utf-8')}")
+            logger.info(f"{pipe}: {line.decode('utf-8')}")

     returncode = p.wait()
-    _logger.info(f"curl return code: {returncode}")
+    logger.info(f"curl return code: {returncode}")

     t_stat_stop.put(None)
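
Note: this commit swaps every module-local `_logger = logging.getLogger("clinvar_gk_pilot")` for a single `logger` imported from `clinvar_gk_pilot.logger`. The contents of that module are not part of this diff; the following is only a minimal sketch of what such a shared logger module might look like (the handler, level, and format string are assumptions, not the repository's actual configuration):

# Hypothetical clinvar_gk_pilot/logger.py -- the real module is not shown in this
# diff; this sketch only illustrates the shared-logger pattern the new import relies on.
import logging
import sys

# One package-level logger that every module imports, instead of each module
# calling logging.getLogger() and configuring handlers independently.
logger = logging.getLogger("clinvar_gk_pilot")
logger.setLevel(logging.INFO)

# Attach a single stream handler so all records are formatted consistently.
_handler = logging.StreamHandler(sys.stderr)
_handler.setFormatter(
    logging.Formatter("%(asctime)s %(levelname)s %(name)s - %(message)s")
)
logger.addHandler(_handler)

Centralizing the configuration this way keeps formatting and log level in one place, which is presumably why the per-module `_logger` definitions are removed here.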