Commit 0bca7ca

Eli authored and committed
Moved period-of-record NCRO download to download_ncro_cnra.py
1 parent f21bd2d commit 0bca7ca

File tree

dms_datastore/download_ncro_cnra.py
setup.py

2 files changed: +300 −1 lines changed

dms_datastore/download_ncro_cnra.py

Lines changed: 299 additions & 0 deletions
@@ -0,0 +1,299 @@
#!/usr/bin/env python
import argparse
import ssl
import requests
import pandas as pd
import re
import zipfile
import os
import io
import string
import datetime as dt
import numpy as np
import concurrent
import concurrent.futures
import time
import urllib3

from dms_datastore.process_station_variable import (
    process_station_list,
    stationfile_or_stations,
)
from dms_datastore import dstore_config
from .logging_config import logger

__all__ = ["download_ncro_por"]

ncro_inventory_file = "ncro_por_inventory.txt"

def station_dbase():
    """Return the rows of the station database whose agency is NCRO."""
    dbase_fname = dstore_config.config_file("station_dbase")
    dbase_df = pd.read_csv(dbase_fname, header=0, comment="#", index_col="id")
    is_ncro = dbase_df.agency.str.lower().str.contains("ncro")
    logger.info(is_ncro[is_ncro.isnull()])  # report stations with no agency listed
    return dbase_df.loc[is_ncro, :]

def download_ncro_inventory(dest, cache=True):
    """Download the NCRO period-of-record inventory and write a copy to dest."""
    url = "https://data.cnra.ca.gov/dataset/fcba3a88-a359-4a71-a58c-6b0ff8fdc53f/resource/cdb5dd35-c344-4969-8ab2-d0e2d6c00821/download/station-trace-download-links.csv"

    max_attempt = 10
    session = requests.Session()
    for attempt in range(1, (max_attempt + 1)):
        logger.info(f"Downloading inventory for NCRO attempt #{attempt}")
        inventory_html = ""  # reset any partial content from a failed attempt
        try:
            response = session.get(
                url,
                verify=False,
                stream=True,
                headers={"User-Agent": "Mozilla/6.0"},
            )
            response.raise_for_status()
            for chunk in response.iter_lines(chunk_size=1024):  # Iterate over lines
                if chunk:  # Filter out keep-alive new chunks
                    inventory_html += chunk.decode("utf-8") + "\n"

            fio = io.StringIO(inventory_html)

            idf = pd.read_csv(
                fio,
                header=0,
                parse_dates=["start_time", "end_time"],
                skip_blank_lines=True,
            )

            idf = idf.loc[
                (idf.station_type != "Groundwater") & (idf.output_interval == "RAW"), :
            ]
            logger.info(idf)

            idf.to_csv(
                os.path.join(dest, ncro_inventory_file),
                sep=",",
                index=False,
                date_format="%Y-%m-%dT%H:%M",
            )
            return idf
        except Exception as e:
            if attempt == max_attempt:
                raise Exception("Could not open inventory.") from e
            continue

def ncro_variable_map():
    varmap = pd.read_csv("variable_mappings.csv", header=0, comment="#")
    return varmap.loc[varmap.src_name == "wdl", :]


# Inventory columns:
# station_number,station_type,start_time,end_time,parameter,output_interval,download_link

mappings = {
    "Water Temperature": "temp",
    "Stage": "elev",
    "Conductivity": "ec",
    "Electrical Conductivity at 25C": "ec",
    "Fluorescent Dissolved Organic Matter": "fdom",
    "Water Temperature ADCP": "temp",
    "Dissolved Oxygen": "do",
    "Chlorophyll": "cla",
    "Dissolved Oxygen (%)": None,
    "Dissolved Oxygen Percentage": None,
    "Velocity": "velocity",
    "pH": "ph",
    "Turbidity": "turbidity",
    "Flow": "flow",
    "Salinity": "salinity",
    "ECat25C": "ec",
    "StreamFlow": "flow",
    "WaterTemp": "temp",
    "WaterTempADCP": "temp",
    "DissolvedOxygen": "do",
    "DissolvedOxygenPercentage": None,
    "StreamLevel": "elev",
    "WaterSurfaceElevationNAVD88": "elev",
    "fDOM": "fdom",
}

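# Illustrative reading of the table above (values taken from the table itself):
# "Electrical Conductivity at 25C" maps to the standard variable name "ec",
# while parameters mapped to None, such as "Dissolved Oxygen (%)", are
# recognized but deliberately skipped by the download below.
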
def download_station_period_record(row, dbase, dest, variables, failures):
    """Downloads station/param combo period of record"""
    agency_id = row.station_number
    param = row.parameter
    if param in mappings:
        var = mappings[param]
        if var is None:
            return
        if var not in variables:
            return
    else:
        logger.info(f"Problem on row: {row}")
        if isinstance(param, float):
            if np.isnan(param):
                return  # todo: this is a fix for an NCRO-end bug. Really the ValueError is best
        raise ValueError(f"No standard mapping for NCRO parameter {param}.")

    link_url = row.download_link
    sdate = row.start_time
    edate = row.end_time
    entry = None
    station_id = ""
    # Agency ids in the inventory may carry a "00" or "Q" suffix relative to
    # the station database, so try each possibility.
    for suffix in ["", "00", "Q"]:
        full_id = dbase.agency_id + suffix
        entry = dbase.index[full_id == agency_id]
        if len(entry) > 1:
            raise ValueError(f"multiple entries for agency id {agency_id} in database")
        elif not entry.empty:
            station_id = str(entry[0])

    if station_id == "":
        raise ValueError(
            f"Item {agency_id} not found in station database after accounting for Q and 00 suffixes"
        )

    fname = f"ncro_{station_id}_{agency_id}_{var}_{sdate.year}_{edate.year}.csv".lower()
    fpath = os.path.join(dest, fname)
    logger.info(f"Processing: {agency_id} {param} {sdate} {edate}")
    logger.info(link_url)

    attempt = 0
    max_attempt = 20

    while attempt < max_attempt:
        attempt = attempt + 1
        try:
            if attempt > 16:
                logger.info(f"{station_id} attempt {attempt}")
                logger.info(fname)

            response = requests.get(link_url, verify=False, stream=True)
            response.raise_for_status()
            with open(fpath, "w") as f:
                for chunk in response.iter_lines(chunk_size=4096):  # Iterate over lines
                    if chunk:  # Filter out keep-alive new chunks
                        f.write(chunk.decode("utf-8") + "\n")
            break
        except Exception as e:
            if attempt == max_attempt:
                logger.warning(
                    f"Failure in URL request or reading the response after {attempt} tries for station {station_id} param {param}. Link=\n{link_url}\nException below:"
                )
                logger.exception(e)
                failures.append((station_id, agency_id, var, param))
                return
            else:
                time.sleep(
                    attempt
                )  # Wait one more second each time to clear any short term bad stuff

    # The loop is only exited via break after a successful write, so the file
    # at fpath exists at this point.
    if attempt > 1:
        logger.info(f"{station_id} found on attempt {attempt}")

def download_ncro_period_record(inventory, dbase, dest, variables=None):

    if variables is None:
        variables = ["flow", "elev", "ec", "temp", "do", "ph", "turbidity", "cla"]
    global mappings
    # mappings = ncro_variable_map()

    failures = []
    # Use ThreadPoolExecutor
    with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
        # Schedule the download tasks and handle them asynchronously
        futures = []
        for ndx, row in inventory.iterrows():
            future = executor.submit(
                download_station_period_record,
                row,
                dbase,
                dest,
                variables,
                failures,
            )
            futures.append(future)
        # Optionally, handle the results of the tasks
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()  # Surfaces exceptions raised inside the tasks
            except Exception as e:
                logger.error(f"Exception occurred during download: {e}")

    logger.debug("Failures in download_ncro")
    for f in failures:
        logger.debug(f)

def download_ncro_por(dest, variables=None):
    idf = download_ncro_inventory(dest)
    dbase = station_dbase()
    upper_station = idf.station_number.str.upper()
    is_in_dbase = (
        upper_station.isin(dbase.agency_id)
        | upper_station.isin(dbase.agency_id + "00")
        | upper_station.isin(dbase.agency_id + "Q")
    )
    if variables is None:
        variables = [
            "flow",
            "velocity",
            "elev",
            "ec",
            "temp",
            "do",
            "ph",
            "turbidity",
            "cla",
        ]
    download_ncro_period_record(idf.loc[is_in_dbase, :], dbase, dest, variables)

def create_arg_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--por",
        dest="por",
        action="store_true",
        help="Do period of record download. Must be explicitly set to true in anticipation of other options",
    )
    parser.add_argument(
        "--dest",
        dest="dest_dir",
        default=".",
        help="Destination directory for downloaded files.",
    )
    parser.add_argument(
        "--param",
        dest="param",
        nargs="+",
        default=None,
        help="Parameters to download.",
    )
    return parser

def main():
    parser = create_arg_parser()
    args = parser.parse_args()
    destdir = args.dest_dir
    variables = args.param
    download_ncro_por(destdir, variables)


if __name__ == "__main__":
    main()
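
For reference, a minimal sketch of how the new module could be exercised programmatically; download_ncro_por and its (dest, variables) signature come from the file above, while the destination path and variable list are illustrative assumptions, not part of the commit.

from dms_datastore.download_ncro_cnra import download_ncro_por

# Download the CNRA inventory, filter it to stations present in the station
# database, then fetch each matching station/parameter period of record.
# "./ncro_raw" is an illustrative, pre-existing destination directory.
download_ncro_por("./ncro_raw", variables=["flow", "ec", "temp"])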

setup.py

Lines changed: 1 addition & 1 deletion
@@ -65,7 +65,7 @@
 "download_wdl=dms_datastore.download_wdl:main",
 "download_nwis=dms_datastore.download_nwis:main",
 "download_des=dms_datastore.download_des:main",
-"download_ncro=dms_datastore.download_ncro:main",
+"download_ncro=dms_datastore.download_ncro_cnra:main",
 "download_mokelumne=dms_datastore.download_mokelumne:main",
 "download_ucdipm=dms_datastore.download_ucdipm:main",
 "download_cimis=dms_datastore.download_cimis:main",
