Description
Utilize rolling temporal aggregations rather than aggregating directly from the datetime value.
For example, a monthly dataset may have 5 years' worth of data, with the most recent year only partially complete. With the current temporal aggregation, the last year's values will be artificially low because that year is partial. Instead, I could take rolling aggregates of the variables (windows of 12, 6, 3, and 1 months) so that the most recent information is used while still utilizing temporal aggregation. The partial rolling year (or other frequency) could then be truncated at the beginning of the time series rather than at the end.
The code below, which uses aggregate_temporal, errors out because the horizon of 8 creates a partial year. More importantly, the final year value in "Y_train_df" will be artificially low because it covers only a partial year.
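To make the idea concrete, here is a minimal sketch using plain pandas on a toy series. It is not how hierarchicalforecast works today, and the column names (`y_year`, `y_quarter`, and so on) are made up for illustration. It only contrasts a calendar-year sum, where the partial last year looks low, with trailing rolling sums, where the incomplete windows sit at the start and can be dropped.

```python
import pandas as pd

# Toy monthly series: 5 full years plus 4 months of a sixth year (64 points).
ds = pd.date_range("2019-01-01", periods=64, freq="MS")
df = pd.DataFrame({"ds": ds, "y": 1.0})  # constant 1.0 makes the sums easy to check

# Calendar-based aggregation: the last (partial) year is artificially low.
calendar = df.groupby(df["ds"].dt.year)["y"].sum()
print(calendar.iloc[-1])  # 4.0, versus 12.0 for every complete year

# Rolling aggregation: window lengths in months (illustrative names).
windows = {"year": 12, "semiannual": 6, "quarter": 3, "month": 1}
rolled = df.copy()
for name, w in windows.items():
    # Trailing sum over the last w months; NaN until a full window is available.
    rolled[f"y_{name}"] = df["y"].rolling(window=w).sum()

# Truncate at the beginning: drop rows where the longest window is incomplete.
rolled = rolled.dropna().reset_index(drop=True)

print(len(rolled))                # 53 rows remain (64 - 11)
print(rolled["y_year"].iloc[-1])  # 12.0: the latest row covers a full 12 months
```

With several series, the rolling step would need to run per `unique_id` (for example via `groupby`), which this sketch leaves out.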
import numpy as np
import pandas as pd
import plotly.express as px
from utilsforecast.data import generate_series
from statsforecast.models import AutoETS
from statsforecast.core import StatsForecast
from hierarchicalforecast.methods import BottomUp, MinTrace
from hierarchicalforecast.core import HierarchicalReconciliation
from hierarchicalforecast.utils import aggregate_temporal
from hierarchicalforecast.utils import make_future_dataframe
horizon = 8
Y_df = generate_series(1, freq='MS', min_length=144, max_length=144, with_trend=True)
Y_test_df = Y_df.groupby("unique_id", as_index=False, observed=True).tail(horizon)
Y_train_df = Y_df.drop(Y_test_df.index)
spec_temporal = {"year": 12, "semiannual": 6, "quarter": 3, "month":1}
Y_train_df, S_train_df, tags_train = aggregate_temporal(df=Y_train_df, spec=spec_temporal)
Y_test_df, S_test_df, tags_test = aggregate_temporal(df=Y_test_df, spec=spec_temporal)
Y_test_df_new = make_future_dataframe(Y_train_df, freq="MS", h=horizon)
Y_test_df_new, S_test_df_new, tags_test_new = aggregate_temporal(df=Y_test_df_new, spec=spec_temporal)
Y_hat_dfs = []
id_cols = ["unique_id", "temporal_id", "ds", "y"]
# We will train a model for each temporal level
for level, temporal_ids_train in tags_train.items():
    # Filter the data for the level
    Y_level_train = Y_train_df.query("temporal_id in @temporal_ids_train")
    temporal_ids_test = tags_test[level]
    Y_level_test = Y_test_df.query("temporal_id in @temporal_ids_test")
    # For each temporal level we have a different frequency and forecast horizon
    freq_level = pd.infer_freq(Y_level_train["ds"].unique())
    horizon_level = Y_level_test["ds"].nunique()
    # Train a model and create forecasts
    fcst = StatsForecast(models=[AutoETS(model='ZZZ')], freq=freq_level, n_jobs=-1)
    Y_hat_df_level = fcst.forecast(df=Y_level_train[["ds", "unique_id", "y"]], h=horizon_level, level=[80, 90])
    # Add the test set to the forecast
    Y_hat_df_level = Y_hat_df_level.merge(Y_level_test, on=["ds", "unique_id"], how="left")
    # Put cols in the right order (for readability)
    Y_hat_cols = id_cols + [col for col in Y_hat_df_level.columns if col not in id_cols]
    Y_hat_df_level = Y_hat_df_level[Y_hat_cols]
    # Append the forecast to the list
    Y_hat_dfs.append(Y_hat_df_level)
Y_hat_df = pd.concat(Y_hat_dfs, ignore_index=True)
reconcilers = [
    BottomUp(),
    MinTrace(method="ols"),
]
hrec = HierarchicalReconciliation(reconcilers=reconcilers)
Y_rec_df = hrec.reconcile(Y_hat_df=Y_hat_df,
                          S=S_test_df,
                          tags=tags_test,
                          temporal=True,
                          level=[80, 90])
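As a quick arithmetic check of why the example above ends up with partial periods, the sketch below counts how many monthly steps are left over at each level. It is independent of the library: `leftover_steps` is a hypothetical helper, and it says nothing about how aggregate_temporal itself aligns or rejects incomplete windows.

```python
# Hypothetical helper, not part of hierarchicalforecast.
def leftover_steps(n_steps: int, spec: dict[str, int]) -> dict[str, int]:
    """Return, per level, how many base steps do not fill a complete window."""
    return {level: n_steps % window for level, window in spec.items()}


spec_temporal = {"year": 12, "semiannual": 6, "quarter": 3, "month": 1}

n_total, horizon = 144, 8
n_train = n_total - horizon  # 136 monthly observations in the training set

print(leftover_steps(horizon, spec_temporal))
# {'year': 8, 'semiannual': 2, 'quarter': 2, 'month': 0}
print(leftover_steps(n_train, spec_temporal))
# {'year': 4, 'semiannual': 4, 'quarter': 1, 'month': 0}
```

Any non-zero entry means that level has an incomplete window somewhere in the series. With a horizon of 8, both the test set and the 136-month training set leave a partial year.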
Use case
No response