Skip to content

fier module

fierpy.fier

find_fits(reof_ds, q_df, stack, train_size=0.7, random_state=0)

Function to fit multiple polynomial curves on different temporal modes and test results

Source code in fierpy/fier.py
def find_fits(reof_ds: xr.Dataset, q_df: xr.DataArray, stack: xr.DataArray, train_size: float = 0.7, random_state: int = 0, ):
    """Function to fit multiple polynomial curves on different temporal modes and test results

    """        

    X = q_df
    y = reof_ds.temporal_modes

    # ---- Randomly split data into 2 groups -----
    X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=train_size, random_state=random_state)

    logger.debug(X_train)
    logger.debug(X_test)

    spatial_test = stack.where(stack.time.isin(y_test.time),drop=True)

    shape3d = spatial_test.shape
    spatial_shape = shape3d[1:]
    shape2d = (shape3d[0],np.prod(spatial_shape))

    spatial_test_flat = xr.DataArray(
        spatial_test.values.reshape(shape2d),
        coords = [spatial_test.time,np.arange(shape2d[1])],
        dims=['time','space']
    )

    non_masked_idx= np.where(np.logical_not(np.isnan(spatial_test_flat[0,:])))[0]

    modes = reof_ds.mode.values

    fit_dict = dict()
    dict_keys = ['fit_r2','pred_r','pred_rmse']

    for mode in modes:

        logger.debug(mode)
        y_train_mode = y_train.sel(mode=mode)
        y_test_mode = y_test.sel(mode=mode)

        for order in range(1,4):

            # apply polynomial fitting
            c = np.polyfit(X_train,y_train_mode,deg=order)
            f = np.poly1d(c)

            y_pred = f(X_test)

            synth_test = synthesize(reof_ds,X_test,f,mode=mode)                      


            synth_test_flat = xr.DataArray(
                synth_test.values.reshape(shape2d),
                coords = [synth_test.time,np.arange(shape2d[1])],
                dims=['time','space']
            )

            # calculate statistics
            # calculate the stats of fitting on a test subsample
            temporal_r2 = metrics.r2_score(y_test_mode, y_pred)

            temporal_r = -999 if temporal_r2 < 0 else np.sqrt(temporal_r2)

            # check the synthesis stats comapared to observed data
            space_r2 = metrics.r2_score(
                spatial_test_flat[:,non_masked_idx],
                synth_test_flat[:,non_masked_idx],
            )
            space_r= -999 if space_r2 < 0 else np.sqrt(space_r2)

            space_rmse = metrics.mean_squared_error(
                spatial_test_flat[:,non_masked_idx],
                synth_test_flat[:,non_masked_idx],
                squared=False
            )

            # pack the resulting statistics in dictionary for the loop
            #stats = [temporal_r2,space_r,space_rmse]
            stats = [temporal_r2, temporal_r, space_rmse]
            loop_dict = {f"mode{mode}_order{order}_{k}":stats[i] for i,k in enumerate(dict_keys)}
            loop_dict[f"mode{mode}_order{order}_coeffs"] = c
            logging.debug(loop_dict)
            # merge the loop dictionary with the larger one
            fit_dict = {**fit_dict,**loop_dict}

    return fit_dict

fits_to_files(fit_dict, out_dir)

Procedure to save coeffient arrays stored in dictionary output from find_fits() to npy files

Parameters:

Name Type Description Default
fit_dict dict

output from function find_fits()

required
out_dir str

directory to save coeffients to

required
Source code in fierpy/fier.py
def fits_to_files(fit_dict: dict,out_dir: str):
    """Procedure to save coeffient arrays stored in dictionary output from `find_fits()` to npy files

    args:
        fit_dict (dict): output from function `find_fits()`
        out_dir (str): directory to save coeffients to
    """

    out_dir = Path(out_dir)

    if not out_dir.exists():
        out_dir.mkdir(parents=True)

    for k,v in fit_dict.items():
        if k.endswith("coeffs"):
            components = k.split("_")
            name_stem = f"poly_{k.replace('_coeffs','')}"
            coeff_file = out_dir / f"{name_stem}.npy"
            np.save(str(coeff_file), v)

    return

get_streamflow(lat, lon)

Function to get histroical streamflow data from the GeoGLOWS server based on geographic coordinates

Parameters:

Name Type Description Default
lat float

latitude value where to get streamflow data

required
lon float

longitude value where to get streamflow data

required

Returns:

Type Description
DataArray

xr.DataArray: DataArray object of streamflow with datetime coordinates

Source code in fierpy/fier.py
def get_streamflow(lat: float, lon: float) -> xr.DataArray:
    """Function to get histroical streamflow data from the GeoGLOWS server
    based on geographic coordinates

    args:
        lat (float): latitude value where to get streamflow data
        lon (float): longitude value where to get streamflow data

    returns:
        xr.DataArray: DataArray object of streamflow with datetime coordinates
    """
    # ??? pass lat lon or do it by basin ???
    reach = streamflow.latlon_to_reach(lat,lon)
    # send request for the streamflow data
    q = streamflow.historic_simulation(reach['reach_id'])

    # rename column name to something not as verbose as 'streamflow_m^3/s'
    q.columns = ["discharge"]

    # rename index and drop the timezone value
    q.index.name = "time"
    q.index = q.index.tz_localize(None)

    # return the series as a xr.DataArray
    return q.discharge.to_xarray()

match_dates(original, matching)

Helper function to filter a DataArray from that match the data values of another. Expects that each xarray object has a dimesion named 'time'

Parameters:

Name Type Description Default
original DataArray

original DataArray with time dimension to select from

required
matching DataArray

DataArray with time dimension to compare against

required

Returns:

Type Description
DataArray

xr.DataArray: DataArray with values that have been temporally matched

Source code in fierpy/fier.py
def match_dates(original: xr.DataArray, matching: xr.DataArray) -> xr.DataArray:
    """Helper function to filter a DataArray from that match the data values of another.
    Expects that each xarray object has a dimesion named 'time'

    args:
        original (xr.DataArray): original DataArray with time dimension to select from
        matching (xr.DataArray): DataArray with time dimension to compare against

    returns:
        xr.DataArray: DataArray with values that have been temporally matched
    """

    # return the DataArray with only rows that match dates
    return original.where(original.time.isin(matching.time),drop=True)

reof(stack, variance_threshold=0.727, n_modes=4)

Function to perform rotated empirical othogonal function (eof) on a spatial timeseries

Parameters:

Name Type Description Default
stack DataArray

DataArray of spatial temporal values with coord order of (t,y,x)

required
variance_threshold(float, optional

optional fall back value to select number of eof modes to use. Only used if n_modes is less than 1. default = 0.727

required
n_modes int

number of eof modes to use. default = 4

4

Returns:

Type Description
xarray.core.dataset.Dataset

xr.Dataset: rotated eof dataset with spatial modes, temporal modes, and mean values as variables

Source code in fierpy/fier.py
def reof(stack: xr.DataArray, variance_threshold: float = 0.727, n_modes: int = 4) -> xr.Dataset:
    """Function to perform rotated empirical othogonal function (eof) on a spatial timeseries

    args:
        stack (xr.DataArray): DataArray of spatial temporal values with coord order of (t,y,x)
        variance_threshold(float, optional): optional fall back value to select number of eof
            modes to use. Only used if n_modes is less than 1. default = 0.727
        n_modes (int, optional): number of eof modes to use. default = 4

    returns:
        xr.Dataset: rotated eof dataset with spatial modes, temporal modes, and mean values
            as variables

    """
    # extract out some dimension shape information
    shape3d = stack.shape
    spatial_shape = shape3d[1:]
    shape2d = (shape3d[0],np.prod(spatial_shape))

    # flatten the data from [t,y,x] to [t,...]
    da_flat = xr.DataArray(
        stack.values.reshape(shape2d),
        coords = [stack.time,np.arange(shape2d[1])],
        dims=['time','space']
    )
    #logger.debug(da_flat)

    ## find the temporal mean for each pixel
    center = da_flat.mean(dim='time')

    centered = da_flat - center

    # get an eof solver object
    # explicitly set center to false since data is already
    #solver = Eof(centered,center=False)
    solver = Eof(centered,center=False)

    # check if the n_modes keyword is set to a realistic value
    # if not get n_modes based on variance explained
    if n_modes < 0:
        n_modes = int((solver.varianceFraction().cumsum() < variance_threshold).sum())

    # calculate to spatial eof values
    eof_components = solver.eofs(neofs=n_modes).transpose()
    # get the indices where the eof is valid data
    non_masked_idx = np.where(np.logical_not(np.isnan(eof_components[:,0])))[0]

    # create a "blank" array to set roated values to
    rotated = eof_components.values.copy()

    # # waiting for release of sklean version >= 0.24
    # # until then have a placeholder function to do the rotation
    # fa = FactorAnalysis(n_components=n_modes, rotation="varimax")
    # rotated[non_masked_idx,:] = fa.fit_transform(eof_components[non_masked_idx,:])

    # apply varimax rotation to eof components
    # placeholder function until sklearn version >= 0.24
    rotated[non_masked_idx,:] = _ortho_rotation(eof_components[non_masked_idx,:])

    # project the original time series data on the rotated eofs
    projected_pcs = np.dot(centered[:,non_masked_idx], rotated[non_masked_idx,:])

    # reshape the rotated eofs to a 3d array of [y,x,c]
    spatial_rotated = rotated.reshape(spatial_shape+(n_modes,))

    # structure the spatial and temporal reof components in a Dataset
    reof_ds = xr.Dataset(
        {
            "spatial_modes": (["lat","lon","mode"],spatial_rotated),
            "temporal_modes":(["time","mode"],projected_pcs),
            "center": (["lat","lon"],center.values.reshape(spatial_shape))
        },
        coords = {
            "lon":(["lon"],stack.lon),
            "lat":(["lat"],stack.lat),
            "time":stack.time,
            "mode": np.arange(n_modes)+1
        }
    )

    return reof_ds

sel_best_fit(fit_dict, metric='r', ranking='max')

Function to extract out the best fit based on user defined metric and ranking

Parameters:

Name Type Description Default
fit_dict dict

output from function find_fits()

required
metric str

statitical metric to rank, options are 'r', 'r2' or 'rmse'. default = 'r'

'r'
ranking str

type of ranking to perform, options are 'min' or 'max'. default = max

'max'

Returns:

Type Description
tuple

tuple: tuple of values containg the coefficient key, mode, and coefficients of best fit

Source code in fierpy/fier.py
def sel_best_fit(fit_dict: dict, metric: str = "r",ranking: str = "max") -> tuple:
    """Function to extract out the best fit based on user defined metric and ranking

    args:
        fit_dict (dict): output from function `find_fits()`
        metric (str, optional): statitical metric to rank, options are 'r', 'r2' or 'rmse'. default = 'r'
        ranking (str, optional): type of ranking to perform, options are 'min' or 'max'. default = max

    returns:
        tuple: tuple of values containg the coefficient key, mode, and coefficients of best fit
    """
    def max_ranking(old,new):
        key,ranker = old
        k,v = new
        if v > ranker:
            key = k
            ranker = v
        return (key,ranker)

    def min_ranking(old,new):
        key,ranker = old
        k,v = new
        if v < ranker:
            key = k
            ranker = v
        return (key,ranker)

    if metric not in ["r","r2","rmse"]:
        raise ValueError("could not determine metric to rank, options are 'r', 'r2' or 'rmse'")

    if ranking == "max":
        ranker = -1
        ranking_f = max_ranking
    elif rankin == "min":
        ranker = 999
        ranking_f = min_ranking
    else:
        raise ValueError("could not determine ranking, options are 'min' or 'max'")

    ranked = ("",ranker)

    for k,v in fit_dict.items():
        if k.endswith(metric):
            ranked = ranking_f(ranked,(k,v))

    key_split = ranked[0].split("_")
    ranked_key = "_".join(key_split[:-2] + ["coeffs"])
    coeffs = fit_dict[ranked_key]
    mode = int(key_split[0].replace("mode",""))

    return (ranked_key,mode,coeffs)

synthesize(reof_ds, q_df, polynomial, mode=1)

Function to synthesize data from reof data and regression coefficients. This will also format the result as a geospatially aware xr.DataArray

Parameters:

Name Type Description Default
reof_ds xarray.core.dataset.Dataset required
q_df DataArray required
polynomial poly1d required
mode int 1

Returns:

Type Description
DataArray

xr.DataArray: resulting synthesized data based on reof temporal regression and spatial modes

Source code in fierpy/fier.py
def synthesize(reof_ds: xr.Dataset, q_df: xr.DataArray, polynomial: np.poly1d, mode: int = 1) -> xr.DataArray:
    """Function to synthesize data from reof data and regression coefficients.
    This will also format the result as a geospatially aware xr.DataArray

    args:
        reof_ds (xr.Dataset):
        q_df (xr.DataArray):
        polynomial (np.poly1d):
        mode (int, optional):


    returns:
        xr.DataArray: resulting synthesized data based on reof temporal regression
            and spatial modes
    """

    y_vals = xr.apply_ufunc(polynomial,q_df)

    logger.debug(y_vals)

    synth = y_vals * reof_ds.spatial_modes.sel(mode=mode) + reof_ds.center

    # drop the unneeded mode dimension
    # force sorting by time in case the array is not already
    synth = synth.astype(np.float32).drop("mode").sortby("time")


    return synth

synthesize_indep(reof_ds, q_df, model_mode_order, model_path='.\\model_path\\')

Function to synthesize data at time of interest and output as DataArray

Source code in fierpy/fier.py
def synthesize_indep(reof_ds: xr.Dataset, q_df: xr.DataArray, model_mode_order, model_path='.\\model_path\\'):
    """Function to synthesize data at time of interest and output as DataArray

    """
    mode_list=list(model_mode_order)
    for num_mode in mode_list:
        #for order in range(1,4):

        f = np.poly1d(np.load(model_path+'\poly'+'{num:0>2}'.format(num=str(num_mode))+'_deg'+'{num:0>2}'.format(num=model_mode_order[str(num_mode)])+'.npy'))
        #logger.debug('{model_mode_order[str(mode)]:0>2}'+'.npy')


        y_vals = xr.apply_ufunc(f, q_df)
        logger.debug(y_vals)

        synth = y_vals * reof_ds.spatial_modes.sel(mode=int(num_mode)) # + reof_ds.center

        synth = synth.astype(np.float32).drop("mode").sortby("time")

        return synth