Skip to content

Commit

Permalink
Corrected accidental deletion of code from PR688
Browse files Browse the repository at this point in the history
  • Loading branch information
JurgenZach-NOAA committed Nov 22, 2023
1 parent 1b5511b commit 9bcdcba
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 206 deletions.
200 changes: 0 additions & 200 deletions src/bmi_array2df.py

This file was deleted.

6 changes: 3 additions & 3 deletions src/model_DAforcing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import glob
import pathlib

from bmi_array2df import *
import bmi_array2df as a2df
#from bmi_array2df import *
#import bmi_array2df as a2df

from bmi_df2array import *
#from bmi_df2array import *
import bmi_df2array as df2a

from troute.routing.fast_reach.reservoir_RFC_da import _validate_RFC_data
Expand Down
96 changes: 93 additions & 3 deletions src/troute-network/troute/DataAssimilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@

from troute.routing.fast_reach.reservoir_RFC_da import _validate_RFC_data

from bmi_array2df import *
import bmi_array2df as a2df
#from bmi_array2df import *
#import bmi_array2df as a2df

from . import bmi_array2df as a2df

# set legacy run flag: option to pass data frames through BMI formalism
# not to be used in regular BMI runs any longer, only for debugging
Expand Down Expand Up @@ -1626,4 +1628,92 @@ def _rfc_timeseries_qcqa(discharge,stationId,synthetic,totalCounts,timestamp,tim
use_rfc_df, on='stationId').
set_index('stationId'))

return rfc_df, rfc_param_df
return rfc_df, rfc_param_df


def _read_timeseries_files(filepath, timeseries_dates, t0, final_persist_datetime):
# Search for most recent RFC timseries file based on offset hours and lookback window
# for each location.
files = glob.glob(filepath + '/*')
# create temporary dataframe with file names, split up by location and datetime
df = pd.DataFrame([f.split('/')[-1].split('.') for f in files], columns=['Datetime','dt','ID','rfc','ext'])
df = df[df['Datetime'].isin(timeseries_dates)][['ID','Datetime']]
# For each location, find the most recent timeseries file (within timeseries window calculated a priori)
df['Datetime'] = df['Datetime'].apply(lambda _: datetime.strptime(_, '%Y-%m-%d_%H'))
df = df.groupby('ID').max().reset_index()
df['Datetime'] = df['Datetime'].dt.strftime('%Y-%m-%d_%H')

# Loop through list of timeseries files and store relevent information in dataframe.
file_list = (df['Datetime'] + '.60min.' + df['ID'] + '.RFCTimeSeries.ncdf').tolist()
rfc_df = pd.DataFrame()
for f in file_list:
ds = xr.open_dataset(filepath + '/' + f)
sliceStartTime = datetime.strptime(ds.attrs.get('sliceStartTimeUTC'), '%Y-%m-%d_%H:%M:%S')
sliceTimeResolutionMinutes = ds.attrs.get('sliceTimeResolutionMinutes')
df = ds.to_dataframe().reset_index().sort_values('forecastInd')[['stationId','discharges','synthetic_values','totalCounts','timeSteps']]
df['Datetime'] = pd.date_range(sliceStartTime, periods=df.shape[0], freq=sliceTimeResolutionMinutes+'T')
# Filter out forecasts that go beyond the rfc_persist_days parameter. This isn't necessary, but removes
# excess data, keeping the dataframe of observations as small as possible.
df = df[df['Datetime']<final_persist_datetime]
# Locate where t0 is in the timeseries
df['timeseries_idx'] = df.index[df.Datetime == t0][0]
df['file'] = f

# Validate data to determine whether or not it will be used.
use_rfc = _validate_RFC_data(
df['stationId'][0],
df.discharges,
df.synthetic_values,
filepath,
f,
300, #NOTE: this is t-route's default timestep. This will need to be verifiied again within t-route...
False
)
df['use_rfc'] = use_rfc
df['da_timestep'] = int(sliceTimeResolutionMinutes)*60

rfc_df = pd.concat([rfc_df, df])
rfc_df['stationId'] = rfc_df['stationId'].str.decode('utf-8').str.strip()
return rfc_df

def assemble_rfc_dataframes(rfc_timeseries_df, rfc_lake_gage_crosswalk, t0, rfc_parameters):
# Retrieve rfc timeseries dataframe from BMI dictionary
rfc_df = rfc_timeseries_df
# Create reservoir_rfc_df dataframe of observations, rows are locations and columns are dates.
reservoir_rfc_df = rfc_df[['stationId','discharges','Datetime']].sort_values(['stationId','Datetime']).pivot(index='stationId',columns='Datetime').fillna(-999.0)
reservoir_rfc_df.columns = reservoir_rfc_df.columns.droplevel()
# Replace gage IDs with lake IDs
reservoir_rfc_df = (
rfc_lake_gage_crosswalk.
reset_index().
set_index('rfc_gage_id').
join(reservoir_rfc_df).
set_index('rfc_lake_id')
)
# Create reservoir_rfc_df dataframe of parameters
reservoir_rfc_param_df = rfc_df[['stationId','totalCounts','timeseries_idx','file','use_rfc','da_timestep']].drop_duplicates().set_index('stationId')
reservoir_rfc_param_df = (
rfc_lake_gage_crosswalk.
reset_index().
set_index('rfc_gage_id').
join(reservoir_rfc_param_df).
set_index('rfc_lake_id')
)
# To pass RFC observations to mc_reach they need to be in an array. But the RFC timeseries have different
# lengths/start dates for each location so the array ends up having many NaN observations after we
# pivot the dataframe from long to wide format. We therefore need to adjust the timeseries index and
# total counts to reflect the new position of t0 in the observation array.
new_timeseries_idx = reservoir_rfc_df.columns.get_loc(t0) - 1 #minus 1 so on first call of reservoir_rfc_da(), timeseries_idx will advance 1 position to t0.
reservoir_rfc_param_df['totalCounts'] = reservoir_rfc_param_df['totalCounts'] + (new_timeseries_idx - reservoir_rfc_param_df['timeseries_idx'])
reservoir_rfc_param_df['timeseries_idx'] = new_timeseries_idx
# Fill in NaNs with default values.
reservoir_rfc_param_df['use_rfc'].fillna(False, inplace=True)
reservoir_rfc_param_df['totalCounts'].fillna(0, inplace=True)
reservoir_rfc_param_df['da_timestep'].fillna(0, inplace=True)
# Make sure columns are the correct types
reservoir_rfc_param_df['totalCounts'] = reservoir_rfc_param_df['totalCounts'].astype(int)
reservoir_rfc_param_df['da_timestep'] = reservoir_rfc_param_df['da_timestep'].astype(int)
reservoir_rfc_param_df['update_time'] = 0
reservoir_rfc_param_df['rfc_persist_days'] = rfc_parameters.get('reservoir_rfc_forecast_persist_days', 11)

return reservoir_rfc_df, reservoir_rfc_param_df

0 comments on commit 9bcdcba

Please sign in to comment.