From 77f22e6e3fe2e8d19e93e4e75d784ac521023ca1 Mon Sep 17 00:00:00 2001 From: Sean Horvath Date: Tue, 28 Feb 2023 20:36:59 +0000 Subject: [PATCH 1/7] initial commit --- src/troute-network/troute/DataAssimilation.py | 290 +++++++++++++++++- 1 file changed, 288 insertions(+), 2 deletions(-) diff --git a/src/troute-network/troute/DataAssimilation.py b/src/troute-network/troute/DataAssimilation.py index b5bb0a2c6..2f453e5ab 100644 --- a/src/troute-network/troute/DataAssimilation.py +++ b/src/troute-network/troute/DataAssimilation.py @@ -8,6 +8,291 @@ from collections import defaultdict from datetime import datetime + +### Base DA class: +class AbstractDA(ABC): + """ + + """ + __slots__ = ["_usgs_df", "_last_obs_df", "_reservoir_usgs_df", "_reservoir_usgs_param_df", + "_reservoir_usace_df", "_reservoir_usace_param_df", "_da_parameter_dict"] + def __init__(self, network): + # Trim the time-extent of the streamflow_da usgs_df + # what happens if there are timeslice files missing on the front-end? + # if the first column is some timestamp greater than t0, then this will throw + # an error. Need to think through this more. + if not self._usgs_df.empty: + self._usgs_df = self._usgs_df.loc[:,network.t0:] + + @property + def usgs_df(self): + return self._usgs_df + + +### Fundamental DA class definitions: +class NudgingDA(AbstractDA): + """ + + """ + def __init__(self, data_assimilation_parameters, network, wb_gage_ids=None, run_parameters=None, da_run=[]): + + # isolate user-input parameters for streamflow data assimilation + streamflow_da_parameters = data_assimilation_parameters.get('streamflow_da', None) + + da_parameter_dict = {"da_decay_coefficient": data_assimilation_parameters.get("da_decay_coefficient", 120), + "diffusive_streamflow_nudging": False} + + # determine if user explictly requests streamflow DA + nudging = False + if streamflow_da_parameters: + nudging = streamflow_da_parameters.get('streamflow_nudging', False) + + da_parameter_dict["diffusive_streamflow_nudging"] = streamflow_da_parameters.get("diffusive_streamflow_nudging", False) + + self._da_parameter_dict = da_parameter_dict + + self._last_obs_df = pd.DataFrame() + self._usgs_df = pd.DataFrame() + + # If streamflow nudging is turned on, create lastobs_df and usgs_df: + if nudging: + if wb_gage_ids: + self._usgs_df = pd.DataFrame(data=None, + index=wb_gage_ids) + self._last_obs_df = pd.DataFrame(data=None, + index=wb_gage_ids) + + else: + lastobs_file = streamflow_da_parameters.get("wrf_hydro_lastobs_file", None) + lastobs_crosswalk_file = streamflow_da_parameters.get("gage_segID_crosswalk_file", None) + lastobs_start = streamflow_da_parameters.get("wrf_hydro_lastobs_lead_time_relative_to_simulation_start_time", 0) + + if lastobs_file: + self._last_obs_df = build_lastobs_df( + lastobs_file, + lastobs_crosswalk_file, + lastobs_start, + ) + + # replace link ids with lake ids, for gages at waterbody outlets, + # otherwise, gage data will not be assimilated at waterbody outlet + # segments. 
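+                # (Illustrative, with hypothetical IDs: if the crosswalk maps
+                # outlet segment 12345 to lake 67890, the lastobs row indexed
+                # by 12345 is re-labeled 67890 so the observation reaches the
+                # waterbody rather than the channel segment.)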
+ if network.link_lake_crosswalk: + self._last_obs_df = _reindex_link_to_lake_id(self._last_obs_df, network.link_lake_crosswalk) + + self._usgs_df = _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run) + + + +class PersistenceDA(AbstractDA): + """ + + """ + def __init__(self, data_assimilation_parameters, network, wb_gage_ids=None, run_parameters=None, da_run=[]): + + # isolate user-input parameters for reservoir data assimilation + reservoir_da_parameters = data_assimilation_parameters.get('reservoir_da', None) + streamflow_da_parameters = data_assimilation_parameters.get('streamflow_da', None) + + # check if user explictly requests USGS and/or USACE reservoir DA + usgs_persistence = False + usace_persistence = False + if reservoir_da_parameters: + usgs_persistence = reservoir_da_parameters.get('reservoir_persistence_usgs', False) + usace_persistence = reservoir_da_parameters.get('reservoir_persistence_usace', False) + + #-------------------------------------------------------------------------------- + # Assemble Reservoir dataframes + #-------------------------------------------------------------------------------- + reservoir_usgs_df = pd.DataFrame() + reservoir_usgs_param_df = pd.DataFrame() + reservoir_usace_df = pd.DataFrame() + reservoir_usace_param_df = pd.DataFrame() + + if wb_gage_ids: + if usgs_persistence: + reservoir_usgs_df = pd.DataFrame(data=None, + index=wb_gage_ids) + # create reservoir hybrid DA initial parameters dataframe + if not reservoir_usgs_df.empty: + reservoir_usgs_param_df = pd.DataFrame( + data = 0, + index = reservoir_usgs_df.index, + columns = ['update_time'] + ) + reservoir_usgs_param_df['prev_persisted_outflow'] = np.nan + reservoir_usgs_param_df['persistence_update_time'] = 0 + reservoir_usgs_param_df['persistence_index'] = 0 + else: + reservoir_usgs_param_df = pd.DataFrame() + + else: + if usgs_persistence: + # if usgs_df is already created, make reservoir_usgs_df from that rather than reading in data again + if not self._usgs_df.empty: + + gage_lake_df = ( + network.usgs_lake_gage_crosswalk. + reset_index(). + set_index(['usgs_gage_id']) # <- TODO use input parameter for this + ) + + # build dataframe that crosswalks gageIDs to segmentIDs + gage_link_df = ( + network.link_gage_df['gages']. + reset_index(). + set_index(['gages']) + ) + + # build dataframe that crosswalks segmentIDs to lakeIDs + link_lake_df = ( + gage_lake_df. + join(gage_link_df, how = 'inner'). + reset_index().set_index('link'). + drop(['index'], axis = 1) + ) + + # resample `usgs_df` to 15 minute intervals + usgs_df_15min = ( + self._usgs_df. + transpose(). + resample('15min').asfreq(). + transpose() + ) + + # subset and re-index `usgs_df`, using the segID <> lakeID crosswalk + reservoir_usgs_df = ( + usgs_df_15min.join(link_lake_df, how = 'inner'). + reset_index(). + set_index('usgs_lake_id'). 
+ drop(['index'], axis = 1) + ) + + # create reservoir hybrid DA initial parameters dataframe + if not reservoir_usgs_df.empty: + reservoir_usgs_param_df = pd.DataFrame( + data = 0, + index = reservoir_usgs_df.index , + columns = ['update_time'] + ) + reservoir_usgs_param_df['prev_persisted_outflow'] = np.nan + reservoir_usgs_param_df['persistence_update_time'] = 0 + reservoir_usgs_param_df['persistence_index'] = 0 + else: + reservoir_usgs_param_df = pd.DataFrame() + + else: + ( + reservoir_usgs_df, + reservoir_usgs_param_df + ) = _create_reservoir_df( + data_assimilation_parameters, + reservoir_da_parameters, + streamflow_da_parameters, + run_parameters, + network, + da_run, + lake_gage_crosswalk = network.usgs_lake_gage_crosswalk, + res_source = 'usgs') + else: + reservoir_usgs_df = pd.DataFrame() + reservoir_usgs_param_df = pd.DataFrame() + + if usace_persistence: + ( + reservoir_usace_df, + reservoir_usace_param_df + ) = _create_reservoir_df( + data_assimilation_parameters, + reservoir_da_parameters, + streamflow_da_parameters, + run_parameters, + network, + da_run, + lake_gage_crosswalk = network.usace_lake_gage_crosswalk, + res_source = 'usace') + else: + reservoir_usace_df = pd.DataFrame() + reservoir_usace_param_df = pd.DataFrame() + + self._reservoir_usgs_df = reservoir_usgs_df + self._reservoir_usgs_param_df = reservoir_usgs_param_df + self._reservoir_usace_df = reservoir_usace_df + self._reservoir_usace_param_df = reservoir_usace_param_df + + +class RFCDA(AbstractDA): + """ + + """ + def __init__(self,): + + pass + + +### Combinations of fundamental DA classes: +class Data_Assimilation(NudgingDA, PersistenceDA, RFCDA): + """ + + """ + def __init__(self, data_assimilation_parameters, network, wb_gage_ids=None, run_parameters=None, da_run=[]): + + NudgingDA.__init__(self, data_assimilation_parameters, network, wb_gage_ids, run_parameters, da_run) + PersistenceDA.__init__(self, data_assimilation_parameters, network, wb_gage_ids, run_parameters, da_run) + RFCDA.__init__(self,) + + def update_after_compute(self,): + ''' + + ''' + pass + + def update_for_next_loop(self,): + ''' + + ''' + pass + + + @property + def assimilation_parameters(self): + return self._da_parameter_dict + + @property + def lastobs_df(self): + return self._last_obs_df + + @property + def usgs_df(self): + return self._usgs_df + + @property + def reservoir_usgs_df(self): + return self._reservoir_usgs_df + + @property + def reservoir_usgs_param_df(self): + return self._reservoir_usgs_param_df + + @property + def reservoir_usace_df(self): + return self._reservoir_usace_df + + @property + def reservoir_usace_param_df(self): + return self._reservoir_usace_param_df + + + + + + + + + + + + #FIXME parameterize into construciton showtiming = True verbose = True @@ -506,6 +791,7 @@ def read_reservoir_parameter_file( return df1, usgs_crosswalk, usace_crosswalk + class DataAssimilation(ABC): """ @@ -515,7 +801,7 @@ class DataAssimilation(ABC): @abstractmethod def usgs_df(self): pass - +''' class NudgingDA(DataAssimilation): """ @@ -582,7 +868,7 @@ def last_obs(self): @property def usgs_df(self): return self._usgs_df - +''' class AllDA(DataAssimilation): """ From 5f57f15d6a2cb72ba43fc0112edb18b9d19e234c Mon Sep 17 00:00:00 2001 From: Sean Horvath Date: Wed, 1 Mar 2023 21:28:17 +0000 Subject: [PATCH 2/7] edit update functions for each DA base class --- src/troute-network/troute/DataAssimilation.py | 835 ++++++------------ 1 file changed, 266 insertions(+), 569 deletions(-) diff --git 
a/src/troute-network/troute/DataAssimilation.py b/src/troute-network/troute/DataAssimilation.py index 2f453e5ab..e0d8d22e4 100644 --- a/src/troute-network/troute/DataAssimilation.py +++ b/src/troute-network/troute/DataAssimilation.py @@ -1,40 +1,27 @@ -from abc import ABC, abstractmethod import troute.nhd_io as nhd_io import pandas as pd import numpy as np import pathlib -import time import xarray as xr -from collections import defaultdict from datetime import datetime -### Base DA class: -class AbstractDA(ABC): +# ----------------------------------------------------------------------------- +# Base DA class definitions: +# 1. NudgingDA: streamflow nudging from usgs gages +# 2. PersistenceDA: USGS and USACE reservoir persistence +# 3. RFCDA: RFC forecasts for reservoirs +# ----------------------------------------------------------------------------- +class NudgingDA: """ """ - __slots__ = ["_usgs_df", "_last_obs_df", "_reservoir_usgs_df", "_reservoir_usgs_param_df", - "_reservoir_usace_df", "_reservoir_usace_param_df", "_da_parameter_dict"] - def __init__(self, network): - # Trim the time-extent of the streamflow_da usgs_df - # what happens if there are timeslice files missing on the front-end? - # if the first column is some timestamp greater than t0, then this will throw - # an error. Need to think through this more. - if not self._usgs_df.empty: - self._usgs_df = self._usgs_df.loc[:,network.t0:] - - @property - def usgs_df(self): - return self._usgs_df + __slots__ = ["_usgs_df", "_last_obs_df", "_da_parameter_dict"] + def __init__(self, network, wb_gage_ids=None, da_run=[]): -### Fundamental DA class definitions: -class NudgingDA(AbstractDA): - """ - - """ - def __init__(self, data_assimilation_parameters, network, wb_gage_ids=None, run_parameters=None, da_run=[]): + data_assimilation_parameters = self._data_assimilation_parameters + run_parameters = self._run_parameters # isolate user-input parameters for streamflow data assimilation streamflow_da_parameters = data_assimilation_parameters.get('streamflow_da', None) @@ -81,15 +68,74 @@ def __init__(self, data_assimilation_parameters, network, wb_gage_ids=None, run_ self._last_obs_df = _reindex_link_to_lake_id(self._last_obs_df, network.link_lake_crosswalk) self._usgs_df = _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run) + + def update_after_compute(self, run_results,): + ''' + Function to update data assimilation object after running routing module. + + Arguments: + ---------- + - run_results (list): output from the compute kernel sequence, + organized (because that is how it comes + out of the kernel) by network. + For each item in the result, there are + seven elements, the fifth (usgs) and sixth + (usace) of which are lists of five elements + containing: 1) a list of the segments ids + where data assimilation was performed (if any) + in that network; 2) a list of the lupdate time; + 3) a list of the previously persisted outflow; + 4) a list of the persistence index; 5) a list + of the persistence update time. 
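+                               Example (illustrative indexing only, assuming
+                               the layout described above): for network n,
+                               run_results[n][4][0] is the list of segment ids
+                               where USGS DA was performed.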
+ + Returns: + -------- + - data_assimilation (Object): Object containing all data assimilation information + - lastobs_df (DataFrame): Last gage observations data for DA + ''' + streamflow_da_parameters = self._data_assimilation_parameters.get('streamflow_da', None) + + if streamflow_da_parameters: + if streamflow_da_parameters.get('streamflow_nudging', False): + self._last_obs_df = new_lastobs(run_results, self._run_parameters.get("dt") * self._run_parameters.get("nts")) + + def update_for_next_loop(self, network, da_run,): + ''' + Function to update data assimilation object for the next loop iteration. + + Arguments: + ---------- + - network (Object): network object created from abstract class + - da_run (list): list of data assimilation files separated + by for loop chunks + Returns: + -------- + - data_assimilation (Object): Object containing all data assimilation information + - usgs_df (DataFrame): dataframe of USGS gage observations + ''' + data_assimilation_parameters = self._data_assimilation_parameters + run_parameters = self._run_parameters + # update usgs_df if it is not empty + streamflow_da_parameters = data_assimilation_parameters.get('streamflow_da', None) + + if streamflow_da_parameters.get('streamflow_nudging', False): + self._usgs_df = _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run) + -class PersistenceDA(AbstractDA): +class PersistenceDA: """ """ - def __init__(self, data_assimilation_parameters, network, wb_gage_ids=None, run_parameters=None, da_run=[]): + __slots__ = ["_reservoir_usgs_df", "_reservoir_usgs_param_df", + "_reservoir_usace_df", "_reservoir_usace_param_df"] + + def __init__(self, network, wb_gage_ids=None, da_run=[]): + data_assimilation_parameters = self._data_assimilation_parameters + run_parameters = self._run_parameters + # isolate user-input parameters for reservoir data assimilation reservoir_da_parameters = data_assimilation_parameters.get('reservoir_da', None) streamflow_da_parameters = data_assimilation_parameters.get('streamflow_da', None) @@ -219,39 +265,219 @@ def __init__(self, data_assimilation_parameters, network, wb_gage_ids=None, run_ self._reservoir_usgs_param_df = reservoir_usgs_param_df self._reservoir_usace_df = reservoir_usace_df self._reservoir_usace_param_df = reservoir_usace_param_df + + # Trim the time-extent of the streamflow_da usgs_df + # what happens if there are timeslice files missing on the front-end? + # if the first column is some timestamp greater than t0, then this will throw + # an error. Need to think through this more. + if not self._usgs_df.empty: + self._usgs_df = self._usgs_df.loc[:,network.t0:] + def update_after_compute(self, run_results,): + ''' + Function to update data assimilation object after running routing module. + + Arguments: + ---------- + - run_results (list): output from the compute kernel sequence, + organized (because that is how it comes + out of the kernel) by network. + For each item in the result, there are + seven elements, the fifth (usgs) and sixth + (usace) of which are lists of five elements + containing: 1) a list of the segments ids + where data assimilation was performed (if any) + in that network; 2) a list of the lupdate time; + 3) a list of the previously persisted outflow; + 4) a list of the persistence index; 5) a list + of the persistence update time. 
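+                               (Same assumed layout: the USGS and USACE lists
+                               for network n are run_results[n][4] and
+                               run_results[n][5]; _set_reservoir_da_params
+                               unpacks them into the two parameter dataframes.)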
+ + Returns: + -------- + - data_assimilation (Object): Object containing all data assimilation information + - reservoir_usgs_param_df (DataFrame): USGS reservoir DA parameters + - reservoir_usace_param_df (DataFrame): USACE reservoir DA parameters + ''' + # get reservoir DA initial parameters for next loop itteration + self._reservoir_usgs_param_df, self._reservoir_usace_param_df = _set_reservoir_da_params(run_results) + + def update_for_next_loop(self, network, da_run,): + ''' + Function to update data assimilation object for the next loop iteration. + + Arguments: + ---------- + - network (Object): network object created from abstract class + - da_run (list): list of data assimilation files separated + by for loop chunks + + Returns: + -------- + - data_assimilation (Object): Object containing all data assimilation information + - reservoir_usgs_df (DataFrame): USGS reservoir observations + - reservoir_usace_df (DataFrame): USACE reservoir observations + ''' + data_assimilation_parameters = self._data_assimilation_parameters + run_parameters = self._run_parameters + + # update usgs_df if it is not empty + streamflow_da_parameters = data_assimilation_parameters.get('streamflow_da', None) + reservoir_da_parameters = data_assimilation_parameters.get('reservoir_da', None) + + if not self.usgs_df.empty: + + if reservoir_da_parameters.get('reservoir_persistence_usgs', False): + + gage_lake_df = ( + network.usgs_lake_gage_crosswalk. + reset_index(). + set_index(['usgs_gage_id']) # <- TODO use input parameter for this + ) + + # build dataframe that crosswalks gageIDs to segmentIDs + gage_link_df = ( + network.link_gage_df['gages']. + reset_index(). + set_index(['gages']) + ) + + # build dataframe that crosswalks segmentIDs to lakeIDs + link_lake_df = ( + gage_lake_df. + join(gage_link_df, how = 'inner'). + reset_index().set_index('link'). + drop(['index'], axis = 1) + ) + + # resample `usgs_df` to 15 minute intervals + usgs_df_15min = ( + self.usgs_df. + transpose(). + resample('15min').asfreq(). + transpose() + ) + + # subset and re-index `usgs_df`, using the segID <> lakeID crosswalk + self._reservoir_usgs_df = ( + usgs_df_15min.join(link_lake_df, how = 'inner'). + reset_index(). + set_index('usgs_lake_id'). + drop(['index'], axis = 1) + ) + + elif reservoir_da_parameters.get('reservoir_persistence_usgs', False): + ( + self._reservoir_usgs_df, + _, + ) = _create_reservoir_df( + data_assimilation_parameters, + reservoir_da_parameters, + streamflow_da_parameters, + run_parameters, + network, + da_run, + lake_gage_crosswalk = network.usgs_lake_gage_crosswalk, + res_source = 'usgs') + + # USACE + if reservoir_da_parameters.get('reservoir_persistence_usace', False): + + ( + self._reservoir_usace_df, + _, + ) = _create_reservoir_df( + data_assimilation_parameters, + reservoir_da_parameters, + streamflow_da_parameters, + run_parameters, + network, + da_run, + lake_gage_crosswalk = network.usace_lake_gage_crosswalk, + res_source = 'usace') + + # if there are no TimeSlice files available for hybrid reservoir DA in the next loop, + # but there are DA parameters from the previous loop, then create a + # dummy observations df. This allows the reservoir persistence to continue across loops. 
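+        # (The dummy dataframes built below are a single all-NaN column labeled
+        # with network.t0, one row per reservoir carried in the corresponding
+        # *_param_df, which is enough for the persistence logic to keep cycling
+        # without fresh observations.)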
+ # USGS Reservoirs + if not network.waterbody_types_dataframe.empty: + if 2 in network.waterbody_types_dataframe['reservoir_type'].unique(): + if self.reservoir_usgs_df.empty and len(self.reservoir_usgs_param_df.index) > 0: + self._reservoir_usgs_df = pd.DataFrame( + data = np.nan, + index = self.reservoir_usgs_param_df.index, + columns = [network.t0] + ) + + # USACE Reservoirs + if 3 in network.waterbody_types_dataframe['reservoir_type'].unique(): + if self.reservoir_usace_df.empty and len(self.reservoir_usace_param_df.index) > 0: + self._reservoir_usace_df = pd.DataFrame( + data = np.nan, + index = self.reservoir_usace_param_df.index, + columns = [network.t0] + ) + + ''' + # update RFC lookback hours if there are RFC-type reservoirs in the simulation domain + if 4 in network._waterbody_types_df['reservoir_type'].unique(): + waterbody_parameters = update_lookback_hours(run_parameters.get("dt"), run_parameters.get("nts"), waterbody_parameters) + ''' + + # Trim the time-extent of the streamflow_da usgs_df + # what happens if there are timeslice files missing on the front-end? + # if the first column is some timestamp greater than t0, then this will throw + # an error. Need to think through this more. + if not self.usgs_df.empty: + self._usgs_df = self.usgs_df.loc[:,network.t0:] -class RFCDA(AbstractDA): +class RFCDA: """ """ def __init__(self,): + pass + + def update_after_compute(self,): + pass + def update_for_next_loop(self,): pass -### Combinations of fundamental DA classes: -class Data_Assimilation(NudgingDA, PersistenceDA, RFCDA): +# -------------------------------------------------------------------- +# Combination of base DA classes. This is the DA object that is called +# by t-route. +# -------------------------------------------------------------------- +class DataAssimilation(NudgingDA, PersistenceDA, RFCDA): """ """ - def __init__(self, data_assimilation_parameters, network, wb_gage_ids=None, run_parameters=None, da_run=[]): + __slots__ = ["_data_assimilation_parameters", "_run_parameters"] + + def __init__(self, network, data_assimilation_parameters, run_parameters, wb_gage_ids=None, da_run=[]): - NudgingDA.__init__(self, data_assimilation_parameters, network, wb_gage_ids, run_parameters, da_run) - PersistenceDA.__init__(self, data_assimilation_parameters, network, wb_gage_ids, run_parameters, da_run) + self._data_assimilation_parameters = data_assimilation_parameters + self._run_parameters = run_parameters + + NudgingDA.__init__(self, network, wb_gage_ids, da_run) + PersistenceDA.__init__(self, network, wb_gage_ids, da_run) RFCDA.__init__(self,) - def update_after_compute(self,): + def update_after_compute(self, run_results,): ''' ''' - pass + NudgingDA.update_after_compute(self, run_results) + PersistenceDA.update_after_compute(self, run_results) + RFCDA.update_after_compute(self) - def update_for_next_loop(self,): + def update_for_next_loop(self, network, da_run,): ''' - + ''' - pass + NudgingDA.update_for_next_loop(self, network, da_run) + PersistenceDA.update_for_next_loop(self, network, da_run) + RFCDA.update_for_next_loop(self) @property @@ -282,53 +508,10 @@ def reservoir_usace_df(self): def reservoir_usace_param_df(self): return self._reservoir_usace_param_df - - - - - - - - - - - -#FIXME parameterize into construciton -showtiming = True -verbose = True - -def build_data_assimilation_csv(data_assimilation_parameters): - - return nhd_io.get_usgs_df_from_csv( - data_assimilation_parameters["data_assimilation_csv"], - 
data_assimilation_parameters["wrf_hydro_da_channel_ID_crosswalk_file"], - ) - -def build_data_assimilation_folder(data_assimilation_parameters, run_parameters): - - usgs_timeslices_folder = pathlib.Path( - data_assimilation_parameters["data_assimilation_timeslices_folder"], - ).resolve() - if "data_assimilation_filter" in data_assimilation_parameters: - da_glob_filter = data_assimilation_parameters["data_assimilation_filter"] - usgs_files = sorted(usgs_timeslices_folder.glob(da_glob_filter)) - elif "usgs_timeslice_files" in data_assimilation_parameters: - usgs_files = data_assimilation_parameters.get("usgs_timeslice_files", None) - usgs_files = [usgs_timeslices_folder.joinpath(f) for f in usgs_files] - else: - print("No Files Found for DA") - # TODO: Handle this with a real exception - - return nhd_io.get_usgs_from_time_slices_folder( - data_assimilation_parameters["wrf_hydro_da_channel_ID_crosswalk_file"], - usgs_files, - data_assimilation_parameters.get("qc_threshold", 1), - data_assimilation_parameters.get("data_assimilation_interpolation_limit", 59), - #FIXME/TODO collapse these into da parameters - run_parameters["dt"], - run_parameters["t0"], - ) +# -------------------------------------------------------------- +# Helper functions +# -------------------------------------------------------------- def _reindex_link_to_lake_id(target_df, crosswalk): ''' Utility function for replacing link ID index values @@ -791,489 +974,3 @@ def read_reservoir_parameter_file( return df1, usgs_crosswalk, usace_crosswalk - -class DataAssimilation(ABC): - """ - - """ - - @property - @abstractmethod - def usgs_df(self): - pass -''' -class NudgingDA(DataAssimilation): - """ - - """ - slots = ["_usgs_df", "_lastobs_df", "_da_params"] - def __init__(self, data_assimilation_parameters, run_parameters): - data_assimilation_csv = data_assimilation_parameters.get( - "data_assimilation_csv", None - ) - data_assimilation_folder = data_assimilation_parameters.get( - "data_assimilation_timeslices_folder", None - ) - # TODO: Copy comments from nhd_network_utilitys `build_data_assimilation_lastobs` - lastobs_file = data_assimilation_parameters.get("wrf_hydro_lastobs_file", None) - lastobs_start = data_assimilation_parameters.get( - "wrf_hydro_lastobs_lead_time_relative_to_simulation_start_time", 0 - ) - lastobs_type = data_assimilation_parameters.get("wrf_lastobs_type", "error-based") - lastobs_crosswalk_file = data_assimilation_parameters.get( - "wrf_hydro_da_channel_ID_crosswalk_file", None - ) - - self._da_params = {} - - if data_assimilation_csv or data_assimilation_folder or lastobs_file: - self._da_params["da_decay_coefficient"] = data_assimilation_parameters.get("da_decay_coefficient", 120) - # TODO: Add parameters here for interpolation length (14/59), QC threshold (1.0) - - if showtiming: - start_time = time.time() - if verbose: - print("creating usgs time_slice data array ...") - self._last_obs_df = nhd_io.build_lastobs_df( - lastobs_file, - lastobs_crosswalk_file, - lastobs_type, # TODO: Confirm that we are using this; delete it if not. 
- lastobs_start, - ) - if data_assimilation_csv: - self._usgs_df = build_data_assimilation_csv(data_assimilation_parameters) - elif data_assimilation_folder: - self._usgs_df = build_data_assimilation_folder(data_assimilation_parameters, run_parameters) - - if not self._last_obs_df.index.empty: - if not self._usgs_df.empty and not self._usgs_df.index.equals(self._last_obs_df.index): - print("USGS Dataframe Index Does Not Match Last Observations Dataframe Index") - self._usgs_df = self._usgs_df.loc[self._last_obs_df.index] - if verbose: - print("usgs array complete") - if showtiming: - print("... in %s seconds." % (time.time() - start_time)) - else: - self._last_obs_df = pd.DataFrame() - self._usgs_df = pd.DataFrame() - - @property - def asssimilation_parameters(self): - return self._da_params - - @property - def last_obs(self): - return self._last_obs_df - - @property - def usgs_df(self): - return self._usgs_df -''' - -class AllDA(DataAssimilation): - """ - Object containing all data assimilation information. Created from user input - and a network object that defines the network and forcing parameters. TODO: this - could be broken up into separate objects specific to each type of DA (streamflow - nudging, reservoir persistence, etc.). But for now it is all done here. - - Arguments: - ---------- - - data_assimilation_parameters (dict): user input data re data assimilation - - run_parameters (dict): user input data re subset of compute configuration - - waterbody_parameters (dict): user input data re waterbody parameters - - network (Object): network object created from abstract class - - da_run (list): list of data assimilation files separated - by for loop chunks - - Returns: - -------- - - data_assimilation (Object): Object containing all data assimilation information - - usgs_df (DataFrame): dataframe of USGS gage observations - - lastobs_df (DataFrame): Last gage observations data for DA - - reservoir_usgs_df (DataFrame): USGS reservoir observations - - reservoir_usgs_param_df (DataFrame): USGS reservoir DA parameters - - reservoir_usace_df (DataFrame): USACE reservoir observations - - reservoir_usace_param_df (DataFrame): USACE reservoir DA parameters - - da_parameter_dict (dict): user input data re data assimilation - """ - __slots__ = ["_usgs_df", "_last_obs_df", "_reservoir_usgs_df", "_reservoir_usgs_param_df", "_reservoir_usace_df", "_reservoir_usace_param_df", "_da_parameter_dict"] - def __init__(self, data_assimilation_parameters, run_parameters, waterbody_parameters, network, da_run): - - #--------------------------------------------------------------------------- - # Determine which DA features to turn on - #--------------------------------------------------------------------------- - - # isolate user-input parameters for streamflow data assimilation - streamflow_da_parameters = data_assimilation_parameters.get('streamflow_da', None) - - # determine if user explictly requests streamflow DA - nudging = False - if streamflow_da_parameters: - nudging = streamflow_da_parameters.get('streamflow_nudging', False) - - usgs_timeslices_folder = data_assimilation_parameters.get('usgs_timeslices_folder', None) - - # isolate user-input parameters for reservoir data assimilation - reservoir_da_parameters = data_assimilation_parameters.get('reservoir_da', None) - - # check if user explictly requests USGS and/or USACE reservoir DA - usgs_persistence = False - usace_persistence = False - if reservoir_da_parameters: - usgs_persistence = reservoir_da_parameters.get('reservoir_persistence_usgs', 
False) - usace_persistence = reservoir_da_parameters.get('reservoir_persistence_usace', False) - param_file = reservoir_da_parameters.get('gage_lakeID_crosswalk_file', None) - - # check if RFC-type reservoirs are set to true - rfc_params = waterbody_parameters.get('rfc') - if rfc_params: - rfc_forecast = rfc_params.get('reservoir_rfc_forecasts',False) - param_file = rfc_params.get('reservoir_parameter_file',None) - else: - rfc_forecast = False - - level_pool_params = waterbody_parameters.get('level_pool', defaultdict(list)) - - - #-------------------------------------------------------------------------------- - # Produce lastobs_df if streamflow nudging is turned on - #-------------------------------------------------------------------------------- - lastobs_df = pd.DataFrame() - da_parameter_dict = {} - - if nudging: - lastobs_file = streamflow_da_parameters.get("wrf_hydro_lastobs_file", None) - lastobs_crosswalk_file = streamflow_da_parameters.get("gage_segID_crosswalk_file", None) - lastobs_start = streamflow_da_parameters.get("wrf_hydro_lastobs_lead_time_relative_to_simulation_start_time", 0) - - if lastobs_file: - lastobs_df = build_lastobs_df( - lastobs_file, - lastobs_crosswalk_file, - lastobs_start, - ) - - #da_parameter_dict["da_decay_coefficient"] = data_assimilation_parameters.get("da_decay_coefficient", 120) - #da_parameter_dict["diffusive_streamflow_nudging"] = streamflow_da_parameters.get("diffusive_streamflow_nudging", False) - - # replace link ids with lake ids, for gages at waterbody outlets, - # otherwise, gage data will not be assimilated at waterbody outlet - # segments. - if network.link_lake_crosswalk: - lastobs_df = _reindex_link_to_lake_id(lastobs_df, network.link_lake_crosswalk) - - da_parameter_dict["da_decay_coefficient"] = data_assimilation_parameters.get("da_decay_coefficient", 120) - da_parameter_dict["diffusive_streamflow_nudging"] = streamflow_da_parameters.get("diffusive_streamflow_nudging", False) - - self._last_obs_df = lastobs_df - self._da_parameter_dict = da_parameter_dict - # TODO: Add parameters here for interpolation length (14/59), QC threshold (1.0) - - - #-------------------------------------------------------------------------------- - # Assemble USGS observation data for Streamflow DA or USGS Reservoir Persistence - #-------------------------------------------------------------------------------- - # if user requested nudging or usgs_persistence and specified a USGS TimeSlice directory, - # then build and return USGS dataframe - if (nudging or usgs_persistence) and usgs_timeslices_folder: - - self._usgs_df = _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run) - else: - self._usgs_df = pd.DataFrame() - - #-------------------------------------------------------------------------------- - # Assemble Reservoir dataframes - #-------------------------------------------------------------------------------- - - if usgs_persistence: - # if usgs_df is already created, make reservoir_usgs_df from that rather than reading in data again - if self._usgs_df.empty == False: - - gage_lake_df = ( - network.usgs_lake_gage_crosswalk. - reset_index(). - set_index(['usgs_gage_id']) # <- TODO use input parameter for this - ) - - # build dataframe that crosswalks gageIDs to segmentIDs - gage_link_df = ( - network.link_gage_df['gages']. - reset_index(). - set_index(['gages']) - ) - - # build dataframe that crosswalks segmentIDs to lakeIDs - link_lake_df = ( - gage_lake_df. - join(gage_link_df, how = 'inner'). 
- reset_index().set_index('link'). - drop(['index'], axis = 1) - ) - - # resample `usgs_df` to 15 minute intervals - usgs_df_15min = ( - self._usgs_df. - transpose(). - resample('15min').asfreq(). - transpose() - ) - - # subset and re-index `usgs_df`, using the segID <> lakeID crosswalk - reservoir_usgs_df = ( - usgs_df_15min.join(link_lake_df, how = 'inner'). - reset_index(). - set_index('usgs_lake_id'). - drop(['index'], axis = 1) - ) - - # create reservoir hybrid DA initial parameters dataframe - if reservoir_usgs_df.empty == False: - reservoir_usgs_param_df = pd.DataFrame( - data = 0, - index = reservoir_usgs_df.index , - columns = ['update_time'] - ) - reservoir_usgs_param_df['prev_persisted_outflow'] = np.nan - reservoir_usgs_param_df['persistence_update_time'] = 0 - reservoir_usgs_param_df['persistence_index'] = 0 - else: - reservoir_usgs_param_df = pd.DataFrame() - - else: - ( - reservoir_usgs_df, - reservoir_usgs_param_df - ) = _create_reservoir_df( - data_assimilation_parameters, - reservoir_da_parameters, - streamflow_da_parameters, - run_parameters, - network, - da_run, - lake_gage_crosswalk = network.usgs_lake_gage_crosswalk, - res_source = 'usgs') - else: - reservoir_usgs_df = pd.DataFrame() - reservoir_usgs_param_df = pd.DataFrame() - - if usace_persistence: - ( - reservoir_usace_df, - reservoir_usace_param_df - ) = _create_reservoir_df( - data_assimilation_parameters, - reservoir_da_parameters, - streamflow_da_parameters, - run_parameters, - network, - da_run, - lake_gage_crosswalk = network.usace_lake_gage_crosswalk, - res_source = 'usace') - else: - reservoir_usace_df = pd.DataFrame() - reservoir_usace_param_df = pd.DataFrame() - - self._reservoir_usgs_df = reservoir_usgs_df - self._reservoir_usgs_param_df = reservoir_usgs_param_df - self._reservoir_usace_df = reservoir_usace_df - self._reservoir_usace_param_df = reservoir_usace_param_df - - # Trim the time-extent of the streamflow_da usgs_df - # what happens if there are timeslice files missing on the front-end? - # if the first column is some timestamp greater than t0, then this will throw - # an error. Need to think through this more. - if not self._usgs_df.empty: - self._usgs_df = self._usgs_df.loc[:,network.t0:] - - def update_after_compute(self, run_results, data_assimilation_parameters, run_parameters,): - ''' - - ''' - # get reservoir DA initial parameters for next loop itteration - self._reservoir_usgs_param_df, self._reservoir_usace_param_df = _set_reservoir_da_params(run_results) - - streamflow_da_parameters = data_assimilation_parameters.get('streamflow_da', None) - - if streamflow_da_parameters: - if streamflow_da_parameters.get('streamflow_nudging', False): - self._last_obs_df = new_lastobs(run_results, run_parameters.get("dt") * run_parameters.get("nts")) - - def update_for_next_loop( - self, - data_assimilation_parameters, - run_parameters, - network, - da_run - ): - ''' - Function to update data assimilation object for the next loop iteration. - - Arguments: - ---------- - - run_results (list): output from the compute kernel sequence, - organized (because that is how it comes - out of the kernel) by network. 
- For each item in the result, there are - seven elements, the fifth (usgs) and sixth - (usace) of which are lists of five elements - containing: 1) a list of the segments ids - where data assimilation was performed (if any) - in that network; 2) a list of the lupdate time; - 3) a list of the previously persisted outflow; - 4) a list of the persistence index; 5) a list - of the persistence update time. - - data_assimilation_parameters (dict): user input data re data assimilation - - run_parameters (dict): user input data re subset of compute configuration - - network (Object): network object created from abstract class - - da_run (list): list of data assimilation files separated - by for loop chunks - - Returns: - -------- - - data_assimilation (Object): Object containing all data assimilation information - - usgs_df (DataFrame): dataframe of USGS gage observations - - lastobs_df (DataFrame): Last gage observations data for DA - - reservoir_usgs_df (DataFrame): USGS reservoir observations - - reservoir_usgs_param_df (DataFrame): USGS reservoir DA parameters - - reservoir_usace_df (DataFrame): USACE reservoir observations - - reservoir_usace_param_df (DataFrame): USACE reservoir DA parameters - ''' - # update usgs_df if it is not empty - streamflow_da_parameters = data_assimilation_parameters.get('streamflow_da', None) - reservoir_da_parameters = data_assimilation_parameters.get('reservoir_da', None) - - if streamflow_da_parameters.get('streamflow_nudging', False): - self._usgs_df = _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run) - - if reservoir_da_parameters.get('reservoir_persistence_usgs', False): - - gage_lake_df = ( - network.usgs_lake_gage_crosswalk. - reset_index(). - set_index(['usgs_gage_id']) # <- TODO use input parameter for this - ) - - # build dataframe that crosswalks gageIDs to segmentIDs - gage_link_df = ( - network.link_gage_df['gages']. - reset_index(). - set_index(['gages']) - ) - - # build dataframe that crosswalks segmentIDs to lakeIDs - link_lake_df = ( - gage_lake_df. - join(gage_link_df, how = 'inner'). - reset_index().set_index('link'). - drop(['index'], axis = 1) - ) - - # resample `usgs_df` to 15 minute intervals - usgs_df_15min = ( - self._usgs_df. - transpose(). - resample('15min').asfreq(). - transpose() - ) - - # subset and re-index `usgs_df`, using the segID <> lakeID crosswalk - self._reservoir_usgs_df = ( - usgs_df_15min.join(link_lake_df, how = 'inner'). - reset_index(). - set_index('usgs_lake_id'). - drop(['index'], axis = 1) - ) - - elif reservoir_da_parameters.get('reservoir_persistence_usgs', False): - ( - self._reservoir_usgs_df, - _, - ) = _create_reservoir_df( - data_assimilation_parameters, - reservoir_da_parameters, - streamflow_da_parameters, - run_parameters, - network, - da_run, - lake_gage_crosswalk = network.usgs_lake_gage_crosswalk, - res_source = 'usgs') - - # USACE - if reservoir_da_parameters.get('reservoir_persistence_usace', False): - - ( - self._reservoir_usace_df, - _, - ) = _create_reservoir_df( - data_assimilation_parameters, - reservoir_da_parameters, - streamflow_da_parameters, - run_parameters, - network, - da_run, - lake_gage_crosswalk = network.usace_lake_gage_crosswalk, - res_source = 'usace') - - # if there are no TimeSlice files available for hybrid reservoir DA in the next loop, - # but there are DA parameters from the previous loop, then create a - # dummy observations df. This allows the reservoir persistence to continue across loops. 
- # USGS Reservoirs - if not network.waterbody_types_dataframe.empty: - if 2 in network.waterbody_types_dataframe['reservoir_type'].unique(): - if self.reservoir_usgs_df.empty and len(self.reservoir_usgs_param_df.index) > 0: - self._reservoir_usgs_df = pd.DataFrame( - data = np.nan, - index = self.reservoir_usgs_param_df.index, - columns = [network.t0] - ) - - # USACE Reservoirs - if 3 in network.waterbody_types_dataframe['reservoir_type'].unique(): - if self.reservoir_usace_df.empty and len(self.reservoir_usace_param_df.index) > 0: - self._reservoir_usace_df = pd.DataFrame( - data = np.nan, - index = self.reservoir_usace_param_df.index, - columns = [network.t0] - ) - - ''' - # update RFC lookback hours if there are RFC-type reservoirs in the simulation domain - if 4 in network._waterbody_types_df['reservoir_type'].unique(): - waterbody_parameters = update_lookback_hours(run_parameters.get("dt"), run_parameters.get("nts"), waterbody_parameters) - ''' - - # Trim the time-extent of the streamflow_da usgs_df - # what happens if there are timeslice files missing on the front-end? - # if the first column is some timestamp greater than t0, then this will throw - # an error. Need to think through this more. - if not self.usgs_df.empty: - self._usgs_df = self.usgs_df.loc[:,network.t0:] - - @property - def assimilation_parameters(self): - return self._da_parameter_dict - - @property - def lastobs_df(self): - return self._last_obs_df - - @property - def usgs_df(self): - return self._usgs_df - - @property - def reservoir_usgs_df(self): - return self._reservoir_usgs_df - - @property - def reservoir_usgs_param_df(self): - return self._reservoir_usgs_param_df - - @property - def reservoir_usace_df(self): - return self._reservoir_usace_df - - @property - def reservoir_usace_param_df(self): - return self._reservoir_usace_param_df From 282d368fea652f748ecaf75452837f955e7e3f9a Mon Sep 17 00:00:00 2001 From: Sean Horvath Date: Mon, 6 Mar 2023 18:51:21 +0000 Subject: [PATCH 3/7] add ability to load from BMI --- src/troute-network/troute/DataAssimilation.py | 45 +++++++++++++------ 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/src/troute-network/troute/DataAssimilation.py b/src/troute-network/troute/DataAssimilation.py index e0d8d22e4..95152c832 100644 --- a/src/troute-network/troute/DataAssimilation.py +++ b/src/troute-network/troute/DataAssimilation.py @@ -18,7 +18,7 @@ class NudgingDA: """ __slots__ = ["_usgs_df", "_last_obs_df", "_da_parameter_dict"] - def __init__(self, network, wb_gage_ids=None, da_run=[]): + def __init__(self, network, from_files, value_dict, da_run=[]): data_assimilation_parameters = self._data_assimilation_parameters run_parameters = self._run_parameters @@ -43,11 +43,11 @@ def __init__(self, network, wb_gage_ids=None, da_run=[]): # If streamflow nudging is turned on, create lastobs_df and usgs_df: if nudging: - if wb_gage_ids: - self._usgs_df = pd.DataFrame(data=None, - index=wb_gage_ids) - self._last_obs_df = pd.DataFrame(data=None, - index=wb_gage_ids) + if not from_files: + self._usgs_df = pd.DataFrame(data=value_dict['usgs_gage_observation__volume_flow_rate'], + index=value_dict['usgs_gage_ids']) + self._last_obs_df = pd.DataFrame(data=value_dict['lastobs__volume_flow_rate'], + index=value_dict['lastobs_ids']) else: lastobs_file = streamflow_da_parameters.get("wrf_hydro_lastobs_file", None) @@ -101,7 +101,8 @@ def update_after_compute(self, run_results,): def update_for_next_loop(self, network, da_run,): ''' - Function to update data assimilation object for the 
next loop iteration. + Function to update data assimilation object for the next loop iteration. This is assumed + to not be needed when t-route is run through the BMI. Arguments: ---------- @@ -131,7 +132,7 @@ class PersistenceDA: __slots__ = ["_reservoir_usgs_df", "_reservoir_usgs_param_df", "_reservoir_usace_df", "_reservoir_usace_param_df"] - def __init__(self, network, wb_gage_ids=None, da_run=[]): + def __init__(self, network, from_files, value_dict, da_run=[]): data_assimilation_parameters = self._data_assimilation_parameters run_parameters = self._run_parameters @@ -155,10 +156,10 @@ def __init__(self, network, wb_gage_ids=None, da_run=[]): reservoir_usace_df = pd.DataFrame() reservoir_usace_param_df = pd.DataFrame() - if wb_gage_ids: + if not from_files: if usgs_persistence: - reservoir_usgs_df = pd.DataFrame(data=None, - index=wb_gage_ids) + reservoir_usgs_df = pd.DataFrame(data=value_dict['reservoir_usgs_gage_observation__volume_flow_rate'], + index=value_dict['reservoir_usgs_ids']) # create reservoir hybrid DA initial parameters dataframe if not reservoir_usgs_df.empty: reservoir_usgs_param_df = pd.DataFrame( @@ -171,6 +172,22 @@ def __init__(self, network, wb_gage_ids=None, da_run=[]): reservoir_usgs_param_df['persistence_index'] = 0 else: reservoir_usgs_param_df = pd.DataFrame() + + if usace_persistence: + reservoir_usace_df = pd.DataFrame(data=value_dict['reservoir_usace_gage_observation__volume_flow_rate'], + index=value_dict['reservoir_usace_ids']) + # create reservoir hybrid DA initial parameters dataframe + if not reservoir_usace_df.empty: + reservoir_usace_param_df = pd.DataFrame( + data = 0, + index = reservoir_usace_df.index, + columns = ['update_time'] + ) + reservoir_usace_param_df['prev_persisted_outflow'] = np.nan + reservoir_usace_param_df['persistence_update_time'] = 0 + reservoir_usace_param_df['persistence_index'] = 0 + else: + reservoir_usace_param_df = pd.DataFrame() else: if usgs_persistence: @@ -454,13 +471,13 @@ class DataAssimilation(NudgingDA, PersistenceDA, RFCDA): """ __slots__ = ["_data_assimilation_parameters", "_run_parameters"] - def __init__(self, network, data_assimilation_parameters, run_parameters, wb_gage_ids=None, da_run=[]): + def __init__(self, network, data_assimilation_parameters, run_parameters, from_files=True, value_dict=None, da_run=[]): self._data_assimilation_parameters = data_assimilation_parameters self._run_parameters = run_parameters - NudgingDA.__init__(self, network, wb_gage_ids, da_run) - PersistenceDA.__init__(self, network, wb_gage_ids, da_run) + NudgingDA.__init__(self, network, from_files, value_dict, da_run) + PersistenceDA.__init__(self, network, from_files, value_dict, da_run) RFCDA.__init__(self,) def update_after_compute(self, run_results,): From 2497c0e3f78f86867f9c91835c433596615d3be4 Mon Sep 17 00:00:00 2001 From: Sean Horvath Date: Tue, 21 Mar 2023 17:02:25 +0000 Subject: [PATCH 4/7] add reservoir_rfc_da.py to handle RFC data assimilation in python --- .../routing/fast_reach/reservoir_rfc_da.py | 187 ++++++++++++++++++ 1 file changed, 187 insertions(+) create mode 100644 src/troute-routing/troute/routing/fast_reach/reservoir_rfc_da.py diff --git a/src/troute-routing/troute/routing/fast_reach/reservoir_rfc_da.py b/src/troute-routing/troute/routing/fast_reach/reservoir_rfc_da.py new file mode 100644 index 000000000..033fec6c0 --- /dev/null +++ b/src/troute-routing/troute/routing/fast_reach/reservoir_rfc_da.py @@ -0,0 +1,187 @@ + + + +def _validate_RFC_data(lake_number, time_series, synthetic,): + use_RFC = True + + 
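+    # A forecast series is rejected (fall back to level pool) if every value is
+    # flagged synthetic, or if any discharge is negative/missing or implausibly
+    # large; the checks below test the array values elementwise.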
+    if all(s == 1 for s in synthetic):
+        use_RFC = False
+        print(f"WARNING: RFC Forecast Time Series discharges for reservoir {lake_number} \
+        are all synthetic. \
+        This reservoir will use level pool calculations instead.")
+    elif any(t < 0 for t in time_series):
+        use_RFC = False
+        print(f"WARNING: RFC Forecast Time Series discharges for reservoir {lake_number} \
+        contain missing or negative values. \
+        This reservoir will use level pool calculations instead.")
+    elif any(t >= 90000 for t in time_series):
+        use_RFC = False
+        print(f"WARNING: RFC Forecast Time Series discharges for reservoir {lake_number} \
+        contain one or more values greater than or equal to 90,000 Cubic Meters per \
+        Second (twice the Mississippi River historical peak flow). \
+        This reservoir will use level pool calculations instead.")
+
+    return use_RFC
+
+def reservoir_RFC_da(lake_number, time_series, timeseries_idx, synthetic, routing_period, current_time,
+                     update_time, DA_time_step, rfc_forecast_persist_seconds, reservoir_type, inflow,
+                     water_elevation, levelpool_outflow, levelpool_water_elevation, lake_area,
+                     max_water_elevation):
+    """
+    Perform RFC reservoir data assimilation.
+
+    Arguments
+    ---------
+    - lake_number                   (int): unique lake identification number
+
+    - time_series                 (array): array of RFC observations/forecasts
+                                           for this waterbody
+
+    - timeseries_idx                (int): index for tracking when to use the
+                                           next obs/forecast
+
+    - synthetic                   (array): array of same length as time_series
+                                           flagging whether time_series value
+                                           is real (0) or synthetic (1)
+
+    - routing_period              (float): Model timestep (sec) used in the reservoir
+                                           and stream routing modules
+
+    - current_time                (float): Current model time (sec)
+
+    - update_time                 (float): time to advance to next obs/forecast,
+                                           seconds since model initialization time (t0)
+
+    - DA_time_step                (float): time increment to add to update_time after
+                                           advancing to next obs/forecast (sec)
+
+    - rfc_forecast_persist_seconds (float): maximum number of seconds that an RFC-supplied
+                                           forecast can be used/persisted (sec)
+
+    - reservoir_type                (int): integer indicating whether reservoir is
+                                           CONUS RFC (4) or Alaska RFC (5)
+
+    - inflow                      (float): Reservoir inflow from upstream
+                                           stream reaches (m3/s)
+
+    - water_elevation             (float): water surface elevation (m) from
+                                           previous timestep
+
+    - levelpool_outflow           (float): Reservoir outflow rate (m3/s)
+                                           calculated by levelpool module
+
+    - levelpool_water_elevation   (float): Reservoir water surface elevation (m)
+                                           calculated by levelpool module
+
+    - lake_area                   (float): waterbody surface area (m2)
+
+    - max_water_elevation         (float): maximum waterbody surface elevation (m)
+
+    Returns
+    -------
+    - outflow                     (float): Persisted reservoir outflow rate (m3/sec)
+
+    - new_water_elevation         (float): modified reservoir water surface
+                                           elevation (meters above datum).
+
+    - update_time                 (float): time to advance to next obs/forecast,
+                                           seconds since model initialization time (t0)
+
+    - timeseries_idx                (int): index for tracking when to use the
+                                           next obs/forecast
+
+    - dynamic_reservoir_type        (int): integer indicating whether DA scheme used
+                                           was CONUS RFC (4), Alaska RFC (5), or
+                                           levelpool (1)
+
+    - assimilated_value           (float): value that was assimilated
+    """
+    use_RFC = _validate_RFC_data(lake_number, time_series, synthetic)
+
+    if use_RFC and (routing_period<=3600) and current_time<=rfc_forecast_persist_seconds:
+        if current_time >= update_time:
+            # Advance update_time to the next timestep and time_series_idx to next index
+            update_time += DA_time_step
+            timeseries_idx += 1
+
+        # If reservoir_type is 4 for CONUS RFC reservoirs
+        if reservoir_type==4:
+            # Set outflow to corresponding discharge from array
+            outflow = time_series[timeseries_idx]
+
+        # Else reservoir_type is 5, for Alaska RFC glacier outflows
+        else:
+            # Set outflow to the sum of inflow and the corresponding discharge from array
+            outflow = inflow + time_series[timeseries_idx]
+
+        # Update water elevation
+        new_water_elevation = water_elevation + ((inflow - outflow)/lake_area) * routing_period
+
+        # Ensure that the water elevation is within the minimum and maximum elevation
+        if new_water_elevation < 0.0:
+            new_water_elevation = 0.0
+
+        elif new_water_elevation > max_water_elevation:
+            new_water_elevation = max_water_elevation
+
+        # Set dynamic_reservoir_type to RFC Forecasts Type
+        dynamic_reservoir_type = reservoir_type
+
+        # Set the assimilated_value to corresponding discharge from array
+        assimilated_value = time_series[timeseries_idx]
+
+        # Check for outflows less than 0 and cycle backwards in the array until a
+        # non-negative value is found. If all previous values are negative, then
+        # use level pool outflow.
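+        # (Illustrative, hypothetical values: with time_series = [300.0, 250.0, -1.0]
+        # and timeseries_idx = 2, the walk-back below persists 250.0 m3/s; if every
+        # earlier value were also negative, the level pool fallback applies.)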
+        if outflow < 0:
+            missing_outflow_index = timeseries_idx
+
+            # step back as far as the first element (index 0 of the 0-based array)
+            while outflow < 0 and missing_outflow_index > 0:
+                missing_outflow_index = missing_outflow_index - 1
+                outflow = time_series[missing_outflow_index]
+
+            if outflow < 0:
+                # If reservoir_type is 4 for CONUS RFC reservoirs
+                if reservoir_type == 4:
+                    outflow = levelpool_outflow
+
+                # Else reservoir_type is 5, for Alaska RFC glacier outflows
+                else:
+                    outflow = inflow
+
+                # Update water elevation to levelpool water elevation
+                new_water_elevation = levelpool_water_elevation
+
+                # Set dynamic_reservoir_type to levelpool type
+                dynamic_reservoir_type = 1
+
+                # Set the assimilated_value to sentinel, -9999.0
+                assimilated_value = -9999.0
+
+                # Set the assimilated_source_file to empty string
+                #assimilated_source_file = ""
+
+    else:
+        # If reservoir_type is 4 for CONUS RFC reservoirs
+        if reservoir_type == 4:
+            outflow = levelpool_outflow
+
+        # Else reservoir_type is 5, for Alaska RFC glacier outflows
+        else:
+            outflow = inflow
+
+        # Update water elevation to levelpool water elevation
+        new_water_elevation = levelpool_water_elevation
+
+        # Set dynamic_reservoir_type to levelpool type
+        dynamic_reservoir_type = 1
+
+        # Set the assimilated_value to sentinel, -9999.0
+        assimilated_value = -9999.0
+
+        # Set the assimilated_source_file to empty string
+        #assimilated_source_file = ""
+
+    return outflow, new_water_elevation, update_time, timeseries_idx, dynamic_reservoir_type, assimilated_value#, assimilated_source_file
+

From ae742ac8770cb6492756007dcca6628585b2104a Mon Sep 17 00:00:00 2001
From: Sean Horvath
Date: Tue, 21 Mar 2023 20:21:22 +0000
Subject: [PATCH 5/7] initial updates to perform RFC DA with python

---
 src/troute-network/troute/DataAssimilation.py | 49 ++++++++++++++-----
 src/troute-nwm/src/nwm_routing/__main__.py    | 11 +++--
 2 files changed, 42 insertions(+), 18 deletions(-)

diff --git a/src/troute-network/troute/DataAssimilation.py b/src/troute-network/troute/DataAssimilation.py
index 95152c832..4e2ef3a80 100644
--- a/src/troute-network/troute/DataAssimilation.py
+++ b/src/troute-network/troute/DataAssimilation.py
@@ -44,10 +44,8 @@ def __init__(self, network, from_files, value_dict, da_run=[]):
         # If streamflow nudging is turned on, create lastobs_df and usgs_df:
         if nudging:
             if not from_files:
-                self._usgs_df = pd.DataFrame(data=value_dict['usgs_gage_observation__volume_flow_rate'],
-                                             index=value_dict['usgs_gage_ids'])
-                self._last_obs_df = pd.DataFrame(data=value_dict['lastobs__volume_flow_rate'],
-                                                 index=value_dict['lastobs_ids'])
+                self._usgs_df = pd.DataFrame(index=value_dict['usgs_gage_ids'])
+                self._last_obs_df = pd.DataFrame(index=value_dict['lastobs_ids'])
 
             else:
                 lastobs_file = streamflow_da_parameters.get("wrf_hydro_lastobs_file", None)
@@ -158,8 +156,7 @@ def __init__(self, network, from_files, value_dict, da_run=[]):
 
         if not from_files:
             if usgs_persistence:
-                reservoir_usgs_df = pd.DataFrame(data=value_dict['reservoir_usgs_gage_observation__volume_flow_rate'],
-                                                 index=value_dict['reservoir_usgs_ids'])
+                reservoir_usgs_df = pd.DataFrame(index=value_dict['reservoir_usgs_ids'])
                 # create reservoir hybrid DA initial parameters dataframe
                 if not reservoir_usgs_df.empty:
                     reservoir_usgs_param_df = pd.DataFrame(
@@ -174,8 +171,7 @@ def __init__(self, network, from_files, value_dict, da_run=[]):
                     reservoir_usgs_param_df = pd.DataFrame()
 
             if usace_persistence:
-                reservoir_usace_df = pd.DataFrame(data=value_dict['reservoir_usace_gage_observation__volume_flow_rate'],
-                                                  index=value_dict['reservoir_usace_ids'])
+                reservoir_usace_df =
pd.DataFrame(index=value_dict['reservoir_usace_ids']) # create reservoir hybrid DA initial parameters dataframe if not reservoir_usace_df.empty: reservoir_usace_param_df = pd.DataFrame( @@ -451,8 +447,33 @@ class RFCDA: """ """ - def __init__(self,): - pass + def __init__(self, from_files, value_dict): + if from_files: + pass + else: + rfc = self._waterbody_parameters.get('rfc', {}) + reservoir_rfc_forecasts = rfc.get('reservoir_rfc_forecasts', None) + + #-------------------------------------------------------------------------------- + # Assemble Reservoir dataframes + #-------------------------------------------------------------------------------- + if reservoir_rfc_forecasts: + reservoir_rfc_df = pd.DataFrame(index=value_dict['reservoir_rfc_ids']) + reservoir_rfc_synthetic = pd.DataFrame(index=value_dict['reservoir_rfc_ids']) + # create reservoir RFC DA initial parameters dataframe + if not reservoir_rfc_df.empty: + reservoir_rfc_param_df = pd.DataFrame( + data = value_dict['da_idx'], + index = reservoir_rfc_df.index, + columns = ['timeseries_index'] + ) + reservoir_rfc_param_df['update_time'] = 0 + else: + reservoir_rfc_df = pd.DataFrame() + + self._reservoir_rfc_df = reservoir_rfc_df + self._reservoir_rfc_synthetic = reservoir_rfc_synthetic + self._reservoir_rfc_param_df = reservoir_rfc_param_df def update_after_compute(self,): pass @@ -469,16 +490,18 @@ class DataAssimilation(NudgingDA, PersistenceDA, RFCDA): """ """ - __slots__ = ["_data_assimilation_parameters", "_run_parameters"] + __slots__ = ["_data_assimilation_parameters", "_run_parameters", "_waterbody_parameters"] - def __init__(self, network, data_assimilation_parameters, run_parameters, from_files=True, value_dict=None, da_run=[]): + def __init__(self, network, data_assimilation_parameters, run_parameters, waterbody_parameters, + from_files=True, value_dict=None, da_run=[]): self._data_assimilation_parameters = data_assimilation_parameters self._run_parameters = run_parameters + self._waterbody_parameters = waterbody_parameters NudgingDA.__init__(self, network, from_files, value_dict, da_run) PersistenceDA.__init__(self, network, from_files, value_dict, da_run) - RFCDA.__init__(self,) + RFCDA.__init__(self, from_files, value_dict) def update_after_compute(self, run_results,): ''' diff --git a/src/troute-nwm/src/nwm_routing/__main__.py b/src/troute-nwm/src/nwm_routing/__main__.py index 6d646864d..385b7d115 100644 --- a/src/troute-nwm/src/nwm_routing/__main__.py +++ b/src/troute-nwm/src/nwm_routing/__main__.py @@ -9,7 +9,7 @@ from troute.NHDNetwork import NHDNetwork from troute.HYFeaturesNetwork import HYFeaturesNetwork -from troute.DataAssimilation import AllDA +from troute.DataAssimilation import DataAssimilation import numpy as np import pandas as pd @@ -118,13 +118,14 @@ def main_v04(argv): network.assemble_forcings(run_sets[0],) # Create data assimilation object from da_sets for first loop iteration - # TODO: Add data_assimilation for hyfeature network - data_assimilation = AllDA( + data_assimilation = DataAssimilation( + network, data_assimilation_parameters, run_parameters, waterbody_parameters, - network, - da_sets[0] + from_files=True, + value_dict=None, + da_run=da_sets[0], ) if showtiming: From 2cb993c9b5b43cb6d8a94962a9d34462dacd6ad3 Mon Sep 17 00:00:00 2001 From: Sean Horvath Date: Wed, 22 Mar 2023 17:44:09 +0000 Subject: [PATCH 6/7] edits to fix multiple inheritance errors --- src/troute-network/troute/DataAssimilation.py | 36 +++++++++++++------ src/troute-nwm/src/nwm_routing/__main__.py | 8 +---- 2 files 
From 2cb993c9b5b43cb6d8a94962a9d34462dacd6ad3 Mon Sep 17 00:00:00 2001
From: Sean Horvath
Date: Wed, 22 Mar 2023 17:44:09 +0000
Subject: [PATCH 6/7] edits to fix multiple inheritance errors

---
 src/troute-network/troute/DataAssimilation.py | 36 +++++++++++++------
 src/troute-nwm/src/nwm_routing/__main__.py    |  8 +----
 2 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/src/troute-network/troute/DataAssimilation.py b/src/troute-network/troute/DataAssimilation.py
index 4e2ef3a80..49a0ce65c 100644
--- a/src/troute-network/troute/DataAssimilation.py
+++ b/src/troute-network/troute/DataAssimilation.py
@@ -4,6 +4,25 @@
 import pathlib
 import xarray as xr
 from datetime import datetime
+from abc import ABC, abstractmethod
+
+
+# -----------------------------------------------------------------------------
+# Abstract DA Class:
+#   Define all slots and pass function definitions to child classes
+# -----------------------------------------------------------------------------
+class AbstractDA(ABC):
+    """
+    This just defines all of the slots that might be used by child classes.
+    These need to be defined in a parent class so each child class can be
+    combined into a single DataAssimilation object without getting a
+    'multiple-inheritance' error.
+    """
+    __slots__ = ["_usgs_df", "_last_obs_df", "_da_parameter_dict",
+                 "_reservoir_usgs_df", "_reservoir_usgs_param_df",
+                 "_reservoir_usace_df", "_reservoir_usace_param_df",
+                 "_reservoir_rfc_df", "_reservoir_rfc_synthetic",
+                 "_reservoir_rfc_param_df"]
 
 
 # -----------------------------------------------------------------------------
@@ -12,12 +31,10 @@
 # 2. PersistenceDA: USGS and USACE reservoir persistence
 # 3. RFCDA: RFC forecasts for reservoirs
 # -----------------------------------------------------------------------------
-class NudgingDA:
+class NudgingDA(AbstractDA):
     """
 
     """
-    __slots__ = ["_usgs_df", "_last_obs_df", "_da_parameter_dict"]
-
     def __init__(self, network, from_files, value_dict, da_run=[]):
 
         data_assimilation_parameters = self._data_assimilation_parameters
@@ -123,13 +140,10 @@ def update_for_next_loop(self, network, da_run,):
 
         self._usgs_df = _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run)
 
-class PersistenceDA:
+class PersistenceDA(AbstractDA):
     """
 
     """
-    __slots__ = ["_reservoir_usgs_df", "_reservoir_usgs_param_df",
-                 "_reservoir_usace_df", "_reservoir_usace_param_df"]
-
    def __init__(self, network, from_files, value_dict, da_run=[]):
 
         data_assimilation_parameters = self._data_assimilation_parameters
@@ -443,7 +457,7 @@ def update_for_next_loop(self, network, da_run,):
         if not self.usgs_df.empty:
             self._usgs_df = self.usgs_df.loc[:,network.t0:]
 
-class RFCDA:
+class RFCDA(AbstractDA):
     """
 
     """
@@ -471,9 +485,9 @@ def __init__(self, from_files, value_dict):
             else:
                 reservoir_rfc_df = pd.DataFrame()
 
-        self._reservoir_rfc_df = reservoir_rfc_df
-        self._reservoir_rfc_synthetic = reservoir_rfc_synthetic
-        self._reservoir_rfc_param_df = reservoir_rfc_param_df
+            self._reservoir_rfc_df = reservoir_rfc_df
+            self._reservoir_rfc_synthetic = reservoir_rfc_synthetic
+            self._reservoir_rfc_param_df = reservoir_rfc_param_df
 
     def update_after_compute(self,):
         pass
diff --git a/src/troute-nwm/src/nwm_routing/__main__.py b/src/troute-nwm/src/nwm_routing/__main__.py
index 385b7d115..ddad453e7 100644
--- a/src/troute-nwm/src/nwm_routing/__main__.py
+++ b/src/troute-nwm/src/nwm_routing/__main__.py
@@ -208,11 +208,7 @@ def main_v04(argv):
         network.update_waterbody_water_elevation()
 
         # update reservoir parameters and lastobs_df
-        data_assimilation.update_after_compute(
-            run_results,
-            data_assimilation_parameters,
-            run_parameters,
-        )
+        data_assimilation.update_after_compute(run_results)
 
         # TODO move the conditional call to write_lite_restart to nwm_output_generator.
         if "lite_restart" in output_parameters:
@@ -233,8 +229,6 @@ def main_v04(argv):
 
         # get reservoir DA initial parameters for next loop iteration
         data_assimilation.update_for_next_loop(
-            data_assimilation_parameters,
-            run_parameters,
             network,
             da_sets[run_set_iterator + 1])
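The error this commit fixes is Python's instance lay-out conflict: a class cannot inherit from two bases that both declare non-empty __slots__. The toy reproduction below uses illustrative names, not the actual t-route classes, and ends with the shared-base pattern that AbstractDA applies:

    class NudgingPart:
        __slots__ = ["_usgs_df"]

    class PersistencePart:
        __slots__ = ["_reservoir_usgs_df"]

    try:
        # Two slotted bases with different layouts cannot be combined.
        class Combined(NudgingPart, PersistencePart):
            pass
    except TypeError as err:
        print(err)  # multiple bases have instance lay-out conflict

    # The fix: declare every slot once in a common base so all children
    # share a single instance layout, as AbstractDA does above.
    class Base:
        __slots__ = ["_usgs_df", "_reservoir_usgs_df"]

    class NudgingPart2(Base):
        pass

    class PersistencePart2(Base):
        pass

    class Combined2(NudgingPart2, PersistencePart2):  # no conflict
        pass
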
From 76b674c4c6249618771b000427ce9e10d5c98feb Mon Sep 17 00:00:00 2001
From: Sean Horvath
Date: Thu, 23 Mar 2023 16:19:39 +0000
Subject: [PATCH 7/7] remove rfc_da python file, update DataAssimilation objects to reflect this

---
 src/troute-network/troute/DataAssimilation.py |  34 +---
 .../routing/fast_reach/reservoir_rfc_da.py    | 187 ------------------
 2 files changed, 3 insertions(+), 218 deletions(-)
 delete mode 100644 src/troute-routing/troute/routing/fast_reach/reservoir_rfc_da.py

diff --git a/src/troute-network/troute/DataAssimilation.py b/src/troute-network/troute/DataAssimilation.py
index 49a0ce65c..8d28da4e7 100644
--- a/src/troute-network/troute/DataAssimilation.py
+++ b/src/troute-network/troute/DataAssimilation.py
@@ -4,7 +4,7 @@
 import pathlib
 import xarray as xr
 from datetime import datetime
-from abc import ABC, abstractmethod
+from abc import ABC
 
 
 # -----------------------------------------------------------------------------
@@ -13,7 +13,7 @@ class AbstractDA(ABC):
     """
-    This just defines all of the slots that might be used by child classes.
+    This just defines all of the slots that are used by child classes.
     These need to be defined in a parent class so each child class can be
     combined into a single DataAssimilation object without getting a
     'multiple-inheritance' error.
@@ -444,12 +444,6 @@ def update_for_next_loop(self, network, da_run,):
                 columns = [network.t0]
             )
 
-        '''
-        # update RFC lookback hours if there are RFC-type reservoirs in the simulation domain
-        if 4 in network._waterbody_types_df['reservoir_type'].unique():
-            waterbody_parameters = update_lookback_hours(run_parameters.get("dt"), run_parameters.get("nts"), waterbody_parameters)
-        '''
-
         # Trim the time-extent of the streamflow_da usgs_df
         # what happens if there are timeslice files missing on the front-end?
         # if the first column is some timestamp greater than t0, then this will throw
@@ -465,29 +459,7 @@ def __init__(self, from_files, value_dict):
         if from_files:
             pass
         else:
-            rfc = self._waterbody_parameters.get('rfc', {})
-            reservoir_rfc_forecasts = rfc.get('reservoir_rfc_forecasts', None)
-
-            #--------------------------------------------------------------------------------
-            # Assemble Reservoir dataframes
-            #--------------------------------------------------------------------------------
-            if reservoir_rfc_forecasts:
-                reservoir_rfc_df = pd.DataFrame(index=value_dict['reservoir_rfc_ids'])
-                reservoir_rfc_synthetic = pd.DataFrame(index=value_dict['reservoir_rfc_ids'])
-                # create reservoir RFC DA initial parameters dataframe
-                if not reservoir_rfc_df.empty:
-                    reservoir_rfc_param_df = pd.DataFrame(
-                        data = value_dict['da_idx'],
-                        index = reservoir_rfc_df.index,
-                        columns = ['timeseries_index']
-                    )
-                    reservoir_rfc_param_df['update_time'] = 0
-            else:
-                reservoir_rfc_df = pd.DataFrame()
-
-            self._reservoir_rfc_df = reservoir_rfc_df
-            self._reservoir_rfc_synthetic = reservoir_rfc_synthetic
-            self._reservoir_rfc_param_df = reservoir_rfc_param_df
+            pass
 
     def update_after_compute(self,):
         pass
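The trimming kept as context in the hunk above slices usgs_df on its timestamp column labels. The short pandas sketch below, with fabricated gages and times, shows what .loc[:, t0:] does when the columns are a sorted DatetimeIndex:

    import pandas as pd

    # Fabricated 3-gage observation table; columns are timestamps.
    cols = pd.date_range('2023-03-01 00:00', periods=4, freq='1h')
    usgs_df = pd.DataFrame([[1.0, 2.0, 3.0, 4.0]] * 3,
                           index=['g1', 'g2', 'g3'], columns=cols)

    t0 = pd.Timestamp('2023-03-01 01:00')   # stand-in for network.t0
    trimmed = usgs_df.loc[:, t0:]           # drop observations before t0
    print(trimmed.shape)                    # (3, 3): first column removed
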
diff --git a/src/troute-routing/troute/routing/fast_reach/reservoir_rfc_da.py b/src/troute-routing/troute/routing/fast_reach/reservoir_rfc_da.py
deleted file mode 100644
index 033fec6c0..000000000
--- a/src/troute-routing/troute/routing/fast_reach/reservoir_rfc_da.py
+++ /dev/null
@@ -1,187 +0,0 @@
-
-
-
-def _validate_RFC_data(lake_number, time_series, synthetic,):
-    use_RFC = True
-
-    if all(synthetic)==1:
-        use_RFC = False
-        print(f"WARNING: RFC Forecast Time Series discharges for reservoir {lake_number} \
-            are all synthetic. \
-            This reservoir will use level pool calculations instead.")
-    elif any(time_series)<0:
-        use_RFC = False
-        print(f"WARNING: RFC Forecast Time Series discharges for reservoir {lake_number} \
-            contains missing or negative values. \
-            This reservoir will use level pool calculations instead.")
-    elif any(time_series)>=90000:
-        use_RFC = False
-        print(f"WARNING: RFC Forecast Time Series discharges for reservoir {lake_number} \
-            contain one or more values greater than or equal to 90,000 Cubic Meters per \
-            Second (twice the Mississippi River historical peak flow). \
-            This reservoir will use level pool calculations instead.")
-
-    return use_RFC
-
-def reservoir_RFC_da(lake_number, time_series, timeseries_idx, synthetic, routing_period, current_time,
-                     update_time, DA_time_step, rfc_forecast_persist_seconds, reservoir_type, inflow,
-                     water_elevation, levelpool_outflow, levelpool_water_elevation, lake_area,
-                     max_water_elevation):
-    """
-    Perform RFC reservoir data assimilation.
-
-    Arguments
-    ---------
-    - lake_number (int): unique lake identification number
-
-    - time_series (array): array of RFC observations/forecasts
-                           for this waterbody
-
-    - timeseries_idx (int): index for tracking when to use the
-                            next obs/forecast
-
-    - synthetic (array): array of same length as time_series
-                         flagging whether time_series value
-                         is real (0) or synthetic (1)
-
-    - routing_period (float): Model timestep (sec) used in the reservoir
-                              and stream routing modules
-
-    - current_time (float): Current model time (sec)
-
-    - update_time (float): time to advance to next obs/forecast,
-                           seconds since model initialization time (t0)
-
-    - DA_time_step (float): time increment to add to update_time after
-                            advancing to next obs/forecast (sec)
-
-    - rfc_forecast_persist_seconds (float): maximum number of seconds that an RFC-supplied
-                                            forecast can be used/persisted (sec)
-
-    - reservoir_type (int): integer indicating whether reservoir is
-                            CONUS RFC (4) or Alaska RFC (5)
-
-    - inflow (float): Reservoir inflow from upstream
-                      stream reaches (m3/s)
-
-    - water_elevation (float): water surface elevation (m) from
-                               previous timestep
-
-    - levelpool_outflow (float): Reservoir outflow rate (m3/s)
-                                 calculated by levelpool module
-
-    - levelpool_water_elevation (float): Reservoir water surface elevation (m)
-                                         calculated by levelpool module
-
-    - lake_area (float): waterbody surface area (m2)
-
-    - max_water_elevation (float): maximum waterbody surface elevation (m)
-
-    Returns
-    -------
-    - outflow (float): Persisted reservoir outflow rate (m3/sec)
-
-    - new_water_elevation (float): modified reservoir water surface
-                                   elevation (meters above datum).
-
-    - update_time (float): time to advance to next obs/forecast,
-                           seconds since model initialization time (t0)
-
-    - timeseries_idx (int): index for tracking when to use the
-                            next obs/forecast
-
-    - dynamic_reservoir_type (int): integer indicating whether DA scheme used
-                                    was CONUS RFC (4), Alaska RFC (5), or
-                                    levelpool (1)
-
-    - assimilated_value (float): value that was assimilated
-    """
-    use_RFC = _validate_RFC_data(lake_number, time_series, synthetic)
-
-    if use_RFC and (routing_period<=3600) and current_time<=rfc_forecast_persist_seconds:
-        if current_time >= update_time:
-            # Advance update_time to the next timestep and time_series_idx to next index
-            update_time += DA_time_step
-            timeseries_idx += 1
-
-        # If reservoir_type is 4 for CONUS RFC reservoirs
-        if reservoir_type==4:
-            # Set outflow to corresponding discharge from array
-            outflow = time_series[timeseries_idx]
-
-        # Else reservoir_type 5 for Alaska RFC glacier outflows
-        else:
-            # Set outflow to sum inflow and corresponding discharge from array
-            outflow = inflow + time_series[timeseries_idx]
-
-        # Update water elevation
-        new_water_elevation = water_elevation + ((inflow - outflow)/lake_area) * routing_period
-
-        # Ensure that the water elevation is within the minimum and maximum elevation
-        if new_water_elevation < 0.0:
-            new_water_elevation = 0.0
-
-        elif new_water_elevation > max_water_elevation:
-            new_water_elevation = max_water_elevation
-
-        # Set dynamic_reservoir_type to RFC Forecasts Type
-        dynamic_reservoir_type = reservoir_type
-
-        # Set the assimilated_value to corresponding discharge from array
-        assimilated_value = time_series[timeseries_idx]
-
-        # Check for outflows less than 0 and cycle backwards in the array until a
-        # non-negative value is found. If all previous values are negative, then
-        # use level pool outflow.
-        if outflow < 0:
-            missing_outflow_index = timeseries_idx
-
-            while outflow < 0 and missing_outflow_index > 1:
-                missing_outflow_index = missing_outflow_index - 1
-                outflow = time_series[missing_outflow_index]
-
-            if outflow < 0:
-                # If reservoir_type is 4 for CONUS RFC reservoirs
-                if reservoir_type == 4:
-                    outflow = levelpool_outflow
-
-                # Else reservoir_type 5 for Alaska RFC glacier outflows
-                else:
-                    outflow = inflow
-
-                # Update water elevation to levelpool water elevation
-                new_water_elevation = levelpool_water_elevation
-
-                # Set dynamic_reservoir_type to levelpool type
-                dynamic_reservoir_type = 1
-
-                # Set the assimilated_value to sentinel, -9999.0
-                assimilated_value = -9999.0
-
-                # Set the assimilated_source_file to empty string
-                #assimilated_source_file = ""
-
-    else:
-        # If reservoir_type is 4 for CONUS RFC reservoirs
-        if reservoir_type == 4:
-            outflow = levelpool_outflow
-
-        # Else reservoir_type 5 for Alaska RFC glacier outflows
-        else:
-            outflow = inflow
-
-        # Update water elevation to levelpool water elevation
-        new_water_elevation = levelpool_water_elevation
-
-        # Set dynamic_reservoir_type to levelpool type
-        dynamic_reservoir_type = 1
-
-        # Set the assimilated_value to sentinel, -9999.0
-        assimilated_value = -9999.0
-
-        # Set the assimilated_source_file to empty string
-        #assimilated_source_file = ""
-
-    return outflow, new_water_elevation, update_time, timeseries_idx, dynamic_reservoir_type, assimilated_value#, assimilated_source_file
-
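With reservoir_rfc_da.py removed, the validation rules it encoded are worth keeping on record. As written, any(time_series)<0 and any(time_series)>=90000 compare a single boolean against a number and can never be True, so the sketch below restates the intended checks with elementwise numpy comparisons; the 1-D array inputs and the 90,000 m3/s threshold follow the deleted warning text, and the function name is illustrative:

    import numpy as np

    def validate_rfc_timeseries(lake_number, time_series, synthetic):
        # Sketch of the intended validation, assuming 1-D array inputs.
        time_series = np.asarray(time_series, dtype=float)
        synthetic = np.asarray(synthetic)

        if np.all(synthetic == 1):
            print(f"WARNING: RFC discharges for reservoir {lake_number} are all "
                  f"synthetic; falling back to level pool.")
            return False
        if np.any(time_series < 0):
            print(f"WARNING: RFC discharges for reservoir {lake_number} contain "
                  f"missing or negative values; falling back to level pool.")
            return False
        if np.any(time_series >= 90000):
            print(f"WARNING: RFC discharges for reservoir {lake_number} contain "
                  f"values >= 90,000 m3/s; falling back to level pool.")
            return False
        return True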