Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions Flash shell cookbook.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
1. start docker
2. anaconda
3. fdb.bat
4. flask.bat
5. In Flask:
from app.data.processing.fetch_calculate_and_output import fetch_and_output
begin_date = "2025-07-02"
end_date = "2025-08-01"
fetch_and_output(begin_date, end_date)


query = f"""SELECT * FROM predictions"""
df.to_csv("prediction.csv")

import importlib
importlib.reload(parse_usgs_data)
Flask shell code to build the data extract:

import requests
from app.data.processing.hobolink import get_live_hobolink_data
from app.data.processing.predictive_models import v4
from app.data.processing.usgs import parse_usgs_data

from datetime import datetime

This section was attempting to build an extract that exactly matched the date/time of the model (legacy) extract.
Turns out that the stream flow calculations are not sensitive to the date range -- all the aggregation in the code is to collect all samples for one day. I have concluded that the flow data that the USGS API now provides is slightly different than the legacy extract.

begin_date = "2025-07-02 13:00:00"
end_date = "2025-08-01 12:00:00"
u_begin_date = "2025-07-02T13:00-04:00"
u_end_date = "2025-08-01T12:00-04:00"
begin_date = "2025-07-02"
end_date = "2025-08-01"

d_begin_date = datetime.strptime(begin_date, "%Y-%m-%d")
d_begin_date = datetime.strptime(begin_date, "%Y-%m-%d %H:%M:%S")
d_end_date = datetime.strptime(end_date, "%Y-%m-%d %H:%M:%S")
d_end_date = datetime.strptime(end_date, "%Y-%m-%d")

df_hobolink = get_live_hobolink_data(start_date = d_begin_date, end_date = d_end_date)

res_w = requests.get("https://nwis.waterdata.usgs.gov/usa/nwis/uv/", params={"cb_00060": "on", "cb_00065": "on", "format": "rdb", "site_no": "01104500", "legacy": "1", "period": "", "begin_date": u_begin_date, "end_date": u_end_date})


res_b = requests.get("https://nwis.waterdata.usgs.gov/usa/nwis/uv/", params={"cb_00045": "off", "cb_00065": "on", "format": "rdb", "site_no": "01104683", "legacy": "1", "period": "", "begin_date": u_begin_date, "end_date": u_end_date})


df_usgs_w = parse_usgs_data(res_w,site_no="01104500" )


df_usgs_b = parse_usgs_data(res_b, site_no="01104683")

df_combined = v4.process_data(df_hobolink=df_hobolink, df_usgs_w=df_usgs_w, df_usgs_b=df_usgs_b)
df_predictions = v4.all_models(df_combined)

df_combined.to_csv(f"{begin_date} - {end_date}-combined.csv", index=False)
df_predictions.to_csv(f"{begin_date} - {end_date}-predictions.csv", index=False)

df_usgs_w.to_csv(f"{begin_date} - {end_date}-usgs_w.csv", index=False)
df_usgs_b.to_csv(f"{begin_date} - {end_date}-usgs_b.csv", index=False)
df_hobolink.to_csv(f"{begin_date} - {end_date}-hobolink.csv", index=False)

This is the recommendation from Claude to reduce the SSL errors. "Solution 1: Use the newer waterservices endpoint (recommended)"

res_w = requests.get("https://waterservices.usgs.gov/nwis/iv/", params={"sites": "01104500","parameterCd": "00060,00065", "startDT": u_begin_date, "endDT": u_end_date,"format": "rdb"})


res_b = requests.get("https://waterservices.usgs.gov/nwis/iv/",params={"sites": "01104683","parameterCd": "00065", "startDT": u_begin_date, "endDT": u_end_date,"format": "rdb"})


2. To start the Flask shell:
docker compose run web flask shell



from app.data.processing.fetch_calculate_and_output import fetch_and_output
begin_date = "2025-05-01"
end_date = "2025-06-04"
fetch_and_output(begin_date, end_date)
1 change: 1 addition & 0 deletions app/admin/views/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def download_from_db(self, sql_table_name: str):
# The reason it's OK in this case is because users don't touch it.
# However it is dangerous to do this in some other contexts.
query = f"""SELECT * FROM {sql_table_name}"""
# query = f"""SELECT * FROM prediction"""
try:
df = execute_sql(query)
except ProgrammingError:
Expand Down
16 changes: 16 additions & 0 deletions app/data/processing/Predictive Model Outputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Fetch the model outputs from the local API and print them as a DataFrame.

Ad-hoc script: run it while the web app is up locally (e.g. `docker compose up`)
so that http://localhost/api/v1/model is reachable.
"""
import pandas as pd
import requests


# Get data and parse returned JSON.
url = "http://localhost/api/v1/model"
# Timeout so a stalled server doesn't hang the script; raise_for_status so an
# HTTP error surfaces clearly instead of as a cryptic JSON decode failure.
res = requests.get(url, timeout=30)
res.raise_for_status()
payload = res.json()

# Flatten the nested structure (one list of predictions per reach) into one
# flat record per prediction row, tagging each row with its reach id.
records = [
    {"reach": reach["reach"], **row}
    for reach in payload["model_outputs"]
    for row in reach["predictions"]
]

# Turn into Pandas DataFrame for inspection/export.
df = pd.DataFrame(records)
print(df.head())
91 changes: 91 additions & 0 deletions app/data/processing/fetch_calculate_and_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""
These are intended to be run from the Flask Shell. They are not incorporated in the web app.

To run:
1. start docker
2. anaconda
3. fdb.bat
4. flask.bat
5. In Flask:
from app.data.processing.fetch_calculate_and_output import fetch_and_output, consolidate
begin_date = "2025-07-02"
end_date = "2025-08-01"
fetch_and_output(begin_date, end_date)
"""

import os
import re
from datetime import datetime

import pandas as pd
import requests

from app.data.processing.hobolink import get_live_hobolink_data
from app.data.processing.predictive_models import v4
from app.data.processing.usgs import parse_usgs_data


dir_pattern_re = re.compile("^2025-[0-9][0-9]-[0-9][0-9]")


# USGS instantaneous-values service (rdb format). This replaces the legacy
# nwis/uv endpoint, which was producing SSL errors.
_USGS_IV_URL = "https://waterservices.usgs.gov/nwis/iv/"


def _fetch_usgs_rdb(site_no: str, parameter_cd: str, start_dt: str, end_dt: str):
    """Fetch rdb-formatted instantaneous values for one USGS site; raise on HTTP error."""
    res = requests.get(
        _USGS_IV_URL,
        params={
            "sites": site_no,
            "parameterCd": parameter_cd,
            "startDT": start_dt,
            "endDT": end_dt,
            "format": "rdb",
        },
        timeout=60,  # don't let a stalled connection hang the Flask shell
    )
    # Fail loudly here rather than handing an error page to the rdb parser.
    res.raise_for_status()
    return res


def fetch_and_output(begin_date: str, end_date: str) -> None:
    """Fetch HOBOlink and USGS data for a date range, run the v4 models,
    and write the intermediate and final frames as CSV files in the
    current working directory.

    Args:
        begin_date: Start date, "YYYY-MM-DD".
        end_date: End date, "YYYY-MM-DD".

    Raises:
        ValueError: If a date is malformed or begin_date is after end_date.
        requests.HTTPError: If a USGS request fails.
    """
    d_begin_date = datetime.strptime(begin_date, "%Y-%m-%d")
    d_end_date = datetime.strptime(end_date, "%Y-%m-%d")
    if d_begin_date > d_end_date:
        raise ValueError(f"begin_date {begin_date!r} is after end_date {end_date!r}")

    df_hobolink = get_live_hobolink_data(start_date=d_begin_date, end_date=d_end_date)

    # Waltham gauge: flow discharge (00060) and gage height (00065).
    res_w = _fetch_usgs_rdb("01104500", "00060,00065", begin_date, end_date)
    # Muddy River gauge: gage height only.
    res_b = _fetch_usgs_rdb("01104683", "00065", begin_date, end_date)

    df_usgs_w = parse_usgs_data(res_w, site_no="01104500")
    df_usgs_b = parse_usgs_data(res_b, site_no="01104683")

    df_combined = v4.process_data(df_hobolink=df_hobolink, df_usgs_w=df_usgs_w, df_usgs_b=df_usgs_b)
    df_predictions = v4.all_models(df_combined)

    # NOTE: consolidate() parses these "YYYY-MM-DD - YYYY-MM-DD-<name>.csv"
    # file names positionally, so keep this format in sync with it.
    df_combined.to_csv(f"{begin_date} - {end_date}-combined.csv", index=False)
    df_predictions.to_csv(f"{begin_date} - {end_date}-predictions.csv", index=False)
    df_usgs_w.to_csv(f"{begin_date} - {end_date}-usgs_w.csv", index=False)
    df_usgs_b.to_csv(f"{begin_date} - {end_date}-usgs_b.csv", index=False)
    df_hobolink.to_csv(f"{begin_date} - {end_date}-hobolink.csv", index=False)


def consolidate(category: str) -> None:
    """Concatenate the dated CSV extracts for one category (in the current
    working directory) into a single de-duplicated CSV.

    Matches files whose names start with an ISO date (YYYY-MM-DD) and contain
    `category`, i.e. the files written by fetch_and_output(). The output file
    is named from the first file's start date plus the last file's end-date
    suffix, e.g. "2025-07-02 - 2025-08-01-combined.csv".

    Args:
        category: Extract category, e.g. "combined", "predictions",
            "usgs_w", "usgs_b", or "hobolink". Case-insensitive.
    """
    # Compiled locally so this function is self-contained; accepts any year.
    date_prefix_re = re.compile(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}")
    needle = category.lower()
    match_list = sorted(
        f for f in os.listdir(".") if date_prefix_re.search(f) and needle in f.lower()
    )
    if not match_list:
        # Previously an empty match list crashed with an unbound-variable
        # NameError further down; report and return instead.
        print(f"No files found for category {category!r}")
        return

    # One concat of all frames (avoids quadratic repeated concatenation).
    consol_df = pd.concat([pd.read_csv(f) for f in match_list])

    # Predictions have one row per (reach, time); every other extract is
    # keyed by time alone.
    subset_cols = ["reach_id", "time"] if needle.startswith("pred") else ["time"]
    consol_df = consol_df.drop_duplicates(subset=subset_cols)

    # "<first start date> - <last end-date suffix>"; the [13:] slice relies on
    # the "YYYY-MM-DD - YYYY-MM-DD-<category>.csv" naming and already ends in
    # ".csv", so only append the extension when it is actually missing
    # (previously this always appended, producing "....csv.csv").
    out_name = match_list[0][:10] + " - " + match_list[-1][13:]
    if not out_name.endswith(".csv"):
        out_name += ".csv"
    consol_df.to_csv(out_name, index=False)
    print(consol_df.shape)
5 changes: 3 additions & 2 deletions app/data/processing/hobolink.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
def get_live_hobolink_data(
start_date: datetime | None = None,
end_date: datetime | None = None,
# When this value was 50, Hobolink wasn't returning Temperature. Changed to 30 April 2026.
days_ago: int = 30,
loggers: str | None = None,
exclude_sensors: list[str] | None = None,
Expand Down Expand Up @@ -73,9 +74,9 @@ def request_to_hobolink(
data: list[dict[str, Any]] = []

start_date_for_req = start_date
end_date_for_req = min(start_date + pagination_delta, end_date)
# end_date_for_req = min(start_date + pagination_delta, end_date)
end_date_for_req = end_date
half_interval = False

while True:
res = requests.get(
urljoin(BASE_URL, "/v1/data"),
Expand Down
3 changes: 0 additions & 3 deletions app/data/processing/predictive_models/v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ def process_data(
df_hobolink = df_hobolink.copy()
df_usgs_w = df_usgs_w.copy()
df_usgs_b = df_usgs_b.copy()

# Cast to datetime type.
# When this comes from Celery, it might be a string.
df_hobolink["time"] = pd.to_datetime(df_hobolink["time"])
Expand All @@ -55,7 +54,6 @@ def process_data(
gamma = np.log(df_hobolink["rh"] / 100) + (b * temp_celsius) / (c + temp_celsius)
dew_point_est = (c * gamma / (b - gamma)) * 9 / 5 + 32
df_hobolink["dew_point"] = df_hobolink["dew_point"].fillna(dew_point_est)

# Now collapse the data.
# Take the mean measurements of everything except rain; rain is the sum
# within an hour. (HOBOlink devices record all rain seen in 10 minutes).
Expand Down Expand Up @@ -97,7 +95,6 @@ def process_data(
)
.reset_index()
)

# This is an outer join to include all the data (we collect more Hobolink
# data than USGS data). With that said, for the most recent value, we need
# to make sure one of the sources didn't update before the other one did.
Expand Down
41 changes: 37 additions & 4 deletions app/data/processing/usgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
"""

import os
from datetime import datetime
from datetime import timedelta
from typing import Union

import pandas as pd
Expand All @@ -22,8 +24,9 @@


USGS_URL = "https://waterdata.usgs.gov/nwis/uv"
NEW_USGS_URL = "https://waterservices.usgs.gov/nwis/iv/"
USGS_STATIC_FILE_NAME = "usgs.pickle"
USGS_DEFAULT_DAYS_AGO = 30
USGS_DEFAULT_DAYS_AGO = 100
USGS_ROWS_PER_HOUR_WALTHAM = 4
USGS_ROWS_PER_HOUR_MUDDY_RIVER = 6

Expand All @@ -48,6 +51,7 @@ def get_live_usgs_data(
return df

res = request_to_usgs(days_ago=days_ago, site_no=site_no)

df = parse_usgs_data(res, site_no=site_no)
return df

Expand All @@ -62,13 +66,17 @@ def request_to_usgs(days_ago: int = 14, site_no: str = "01104500") -> requests.m
Returns:
Request Response containing the data from the request.
"""

"""Replaced this code with the new calling sequence for USGS data April 2026
# if site is waltham, takes both gage height and flow discharge,
# otherwise, only takes gage height

if site_no == "01104500":
additional_feature = "on"
else:
additional_feature = "off"


payload = {
"cb_00060": additional_feature,
"cb_00065": "on", # always accepts gage height
Expand All @@ -78,6 +86,27 @@ def request_to_usgs(days_ago: int = 14, site_no: str = "01104500") -> requests.m
}

res = requests.get(USGS_URL, params=payload)
"""

# New code April 2026

today = datetime.now().date()
delta_days_ago = today - timedelta(days=days_ago)

start_date_str = delta_days_ago.strftime("%Y-%m-%d")
end_date_str = today.strftime("%Y-%m-%d")

payload = {
"sites": site_no,
"parameterCd": "00060,00065",
"format": "rdb",
"startDT": start_date_str,
"endDT": end_date_str,
}

res = requests.get(NEW_USGS_URL, params=payload)
###

if res.status_code >= 400:
error_msg = (
"API request to the USGS endpoint failed with status " f"code {res.status_code}."
Expand Down Expand Up @@ -118,17 +147,21 @@ def parse_usgs_data(res: Union[str, requests.models.Response], site_no: str) ->
if site_no not in column_map:
raise ValueError(f"Unknown site number {site_no}. Cannot map columns.")
df = df.rename(columns=column_map[site_no])

df = df[list(column_map[site_no].values())]

# Convert types
df["time"] = pd.to_datetime(df["time"]).dt.tz_localize("US/Eastern").dt.tz_convert("UTC")
df["time"] = (
pd.to_datetime(df["time"])
.dt.tz_localize("US/Eastern", ambiguous="NaT")
.dt.tz_convert("UTC")
)
# Note to self: ran this once in a test and it gave the following error:
# >>> ValueError: could not convert string to float: ''
# Reran and it went away
# The error was here in this line casting `stream_flow` to a float:

numeric_columns = set(column_map[site_no].values()) - {"time"} # All columns except "time"
for col in numeric_columns:
df[col] = df[col].replace("", None).astype(float)
df[col] = df[col].replace("", None).replace("Ice", None).astype(float)

return df
Loading