11"""Scrape rs_context projects (HUC10) from Data Exchange and load the data S3 for Athena
22This version queries the Athena index of Data Exchange projects instead of using graphql API
33
4- Lorin 2026-03-23
4+ Downloads specific files and uses geo to bin rasters.
5+ Requires geo extras
6+ `uv sync --extra geo`
7+
8+ Lorin 2026-March-23
59
610"""
711
2125from pydex .lib .athena import query_to_dataframe
2226from pydex .lib .raster import Raster
2327
24- # RegEx for finding DEM files
28+ # RegEx for finding DEM, Vegetation and Metrics files
2529REGEXES = {"DEM_REGEX" : r'.*\/dem\.tif$' , "METRICS_REGEX" : r'.*rscontext_metrics\.json$' , "VEG_REGEX" : r'.*\/existing_veg\.tif$' }
2630S3_BUCKET = 'riverscapes-athena'
2731S3_BASE_PATH = 'data_exchange/rs-context'
@@ -53,29 +57,11 @@ def join_s3_key(*parts: str) -> str:
5357 return str (PurePosixPath (* parts ))
5458
5559
56- def get_matching_file (parent_dir : str , regex : str ) -> str | None :
57- """
58- Get the path to the first file in the parent directory that matches the regex.
59- Returns None if no file is found.
60- This is used to check if the output GeoPackage has already been downloaded and
61- to avoid downloading it again.
62- """
63-
64- regex = re .compile (regex )
65- for root , __dirs , files in os .walk (parent_dir ):
66- for file_name in files :
67- # Check if the file name matches the regex
68- if regex .match (file_name ):
69- return os .path .join (root , file_name )
70-
71- return None
72-
73-
7460def scrape_rscontext_project (s3 , rs_api : RiverscapesAPI , project : RiverscapesProject , download_dir : Path , skip_overwrite : bool ):
7561 """Scrape (download, transform, upload) a single project"""
7662 DOWNLOAD_RETRIES = 3
7763 log = Logger ("Scrape RSContext project" )
78- # upload to s3 key
64+ # S3 key for upload
7965 s3_key = join_s3_key (S3_BASE_PATH , f'{ project .huc } .json' )
8066 if project .huc is None or project .huc == '' :
8167 log .warning (f'Project { project .id } does not have a HUC. Skipping.' )
@@ -106,20 +92,20 @@ def scrape_rscontext_project(s3, rs_api: RiverscapesAPI, project: RiverscapesPro
10692 complete = False
10793 while retry < DOWNLOAD_RETRIES and complete is False :
10894 try :
109- rs_api .download_files (project_id = project .id , download_dir = huc_dir , re_filter = list (REGEXES .values ()))
95+ rs_api .download_files (project_id = project .id , download_dir = str ( huc_dir ) , re_filter = list (REGEXES .values ()))
11096 complete = True
11197 break
11298 except Exception as e :
11399 log .error (f'Error downloading files for project { project .id } : { e } ' )
114100 traceback .print_exc (file = sys .stdout )
115101 retry += 1
116- return
102+ continue
117103
118- dem_tif_path = huc_dir / 'topography' / 'dem_tif '
119- if not dem_tif_path .exists :
104+ dem_tif_path = huc_dir / 'topography' / 'dem.tif '
105+ if not dem_tif_path .exists () :
120106 raise FileNotFoundError (f'Could not find DEM file for project { project .id } ' )
121107 veg_tif_path = huc_dir / 'vegetation' / 'existing_veg.tif'
122- if not veg_tif_path .exists :
108+ if not veg_tif_path .exists () :
123109 raise FileNotFoundError (f'Could not find vegetation file for project { project .id } ' )
124110 metrics_json_path = huc_dir / 'rscontext_metrics.json'
125111 try :
@@ -137,27 +123,27 @@ def scrape_rscontext_project(s3, rs_api: RiverscapesAPI, project: RiverscapesPro
137123 metrics ['rs_context' ] = {}
138124 metrics ['rs_context' ]['dem_bins' ] = dem_bins
139125 metrics ['rs_context' ]['existing_veg_bins' ] = veg_bins
140- log .info (f'Writing HUC10 metrics to { huc10_json_path } ' )
141126
142127 # Add the project ID to the metrics so we can trace this back to its source
143128 metrics ['rs_context' ]['project_id' ] = project .id
144129 metrics ['rs_context' ]['model_version' ] = str (project .model_version )
145130
146- # Write the JSON back to `huc10code.json` (just for debugging purposes really)
131+ log .info (f'Writing HUC10 metrics to { huc10_json_path } ' )
132+ # Write the JSON back to `huc10_{huc}.json` (just for debugging purposes really)
147133 with open (huc10_json_path , 'w' , encoding = 'utf-8' ) as f :
148134 json .dump (metrics , f , indent = 2 )
149135
150136 # Now use boto3 to upload the file to S3
151- log .info (f'Uploading { huc10_json_path } to s3://{ S3_BUCKET } /{ s3_key } ' )
137+ # log.info(f'Uploading metrics to s3://{S3_BUCKET}/{s3_key}')
152138
153- s3 .put_object (Bucket = S3_BUCKET , Key = s3_key , Body = json .dumps (metrics ['rs_context' ]))
139+ # s3.put_object(Bucket=S3_BUCKET, Key=s3_key, Body=json.dumps(metrics['rs_context']))
154140
155141 except Exception as e :
156142 log .error (f'Error scraping HUC { project .huc } : { e } ' )
157143 traceback .print_exc (file = sys .stdout )
158144
159145
160- def scrape_rsprojects (rs_api : RiverscapesAPI , download_dir : Path , delete_downloads : bool ):
146+ def scrape_rsprojects (rs_api : RiverscapesAPI , download_dir : Path , delete_downloads : bool , skip_overwrite : bool ):
161147 """Scrape all projects matching criteria"""
162148 log = Logger ('Scrape RSContext' )
163149 projects_to_add_df = query_to_dataframe (missing_projects_query , 'identify new projects' )
@@ -176,12 +162,12 @@ def scrape_rsprojects(rs_api: RiverscapesAPI, download_dir: Path, delete_downloa
176162 count += 1
177163 prg .update (count )
178164
179- if delete_downloads is True and os . path . isdir ( huc_dir ):
165+ if delete_downloads is True and download_dir . is_dir ( ):
180166 try :
181- log .info (f'Deleting download directory { huc_dir } ' )
182- shutil .rmtree (huc_dir )
167+ log .info (f'Deleting download directory { download_dir } ' )
168+ shutil .rmtree (download_dir )
183169 except Exception as e :
184- log .error (f'Error deleting download directory { huc_dir } : { e } ' )
170+ log .error (f'Error deleting download directory { download_dir } : { e } ' )
185171
186172
187173def main ():
@@ -191,25 +177,29 @@ def main():
191177 parser = argparse .ArgumentParser ()
192178 parser .add_argument ('stage' , help = 'Environment: staging or production' , type = str )
193179 parser .add_argument ('working_folder' , help = 'top level folder for downloads and output' , type = str )
194- parser .add_argument ('--delete' , help = 'Whether or not to delete downloaded GeoPackages' , action = 'store_true' , default = False )
180+ parser .add_argument ('--delete' , help = 'Delete downloaded files after processing' , action = 'store_true' , default = False )
181+ parser .add_argument ('--skip-overwrite' , help = 'Whether or not to skip overwriting existing S3 files' , action = 'store_true' , default = False )
195182
196183 args = dotenv .parse_args_env (parser )
197184
198185 # Set up some reasonable folders to store things
199186 working_folder = Path (args .working_folder )
200187 download_folder = working_folder / 'downloads'
188+ safe_makedirs (str (working_folder ))
201189
202190 log = Logger ('Setup' )
203191 log .setup (log_path = working_folder / 'rscontext_to_athena.log' , log_level = logging .DEBUG )
204192 try :
205193 with RiverscapesAPI (stage = args .stage ) as rs_api :
206- scrape_rsprojects (rs_api , download_folder , args .delete )
194+ scrape_rsprojects (rs_api , download_folder , args .delete , args . skip_overwrite )
207195
208196 except Exception as e :
209197 log .error (e )
210198 traceback .print_exc (file = sys .stdout )
211199 sys .exit (1 )
212200
201+ log .info ('Process complete' )
202+
213203
# Script entry point: run only when executed directly, not when imported.
if __name__ == '__main__':
    main()
0 commit comments