Add routines for L1 and L2 processing #220
base: main
Conversation
for more information, see https://pre-commit.ci
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: -0.42 (8.77 -> 8.35)
- Declining Code Health: 114 findings 🚩
- Improving Code Health: 22 findings ✅
- Affected Hotspots: 3 files 🔥
def define_l2e_filename(ds, campaign_name: str, station_name: str, sample_interval: int, rolling: bool) -> str:
    """Define L2E file name.
ℹ New issue: Excess Number of Function Arguments
define_l2e_filename has 5 arguments, threshold = 4
def run_disdrodb_l0a(
    data_sources=None,
    campaign_names=None,
    station_names=None,
    # Processing options
    force: bool = False,
    verbose: bool = False,
    debugging_mode: bool = False,
    parallel: bool = True,
    base_dir: Optional[str] = None,
):
    """Run the L0A processing of DISDRODB stations.

    This function allows launching the processing of many DISDRODB stations with a single command.
    From the list of all available DISDRODB stations, it runs the processing of the
    stations matching the provided data_sources, campaign_names and station_names.

    Parameters
    ----------
    data_sources : list
        Name of data source(s) to process.
        The name(s) must be UPPER CASE.
        If campaign_names and station_names are not specified, process all stations.
        The default is ``None``.
    campaign_names : list
        Name of the campaign(s) to process.
        The name(s) must be UPPER CASE.
        The default is ``None``.
    station_names : list
        Station names to process.
        The default is ``None``.
    force : bool
        If ``True``, overwrite existing data in the destination directories.
        If ``False``, raise an error if data already exist in the destination directories.
        The default is ``False``.
    verbose : bool
        Whether to print detailed processing information to the terminal.
        The default is ``False``.
    parallel : bool
        If ``True``, the files are processed simultaneously in multiple processes.
        By default, the number of processes is defined with ``os.cpu_count()``.
        If ``False``, the files are processed sequentially in a single process.
    debugging_mode : bool
        If ``True``, it reduces the amount of data to process.
        For L0A, it processes just the first 3 raw data files.
        The default is ``False``.
    base_dir : str (optional)
        Base directory of DISDRODB. Format: ``<...>/DISDRODB``.
        If ``None`` (the default), the ``base_dir`` path specified in the DISDRODB active configuration will be used.
    """
    # Define products
    product = "L0A"
    required_product = get_required_product(product)

    # Get list of available stations
    list_info = available_stations(
        base_dir=base_dir,
        product=required_product,
        data_sources=data_sources,
        campaign_names=campaign_names,
        station_names=station_names,
        raise_error_if_empty=True,
    )

    # Print message
    n_stations = len(list_info)
    print(f"{product} processing of {n_stations} stations started.")

    # Loop over stations
    for data_source, campaign_name, station_name in list_info:
        print(f"{product} processing of {data_source} {campaign_name} {station_name} station started.")
        # Run processing
        run_disdrodb_l0a_station(
            base_dir=base_dir,
            data_source=data_source,
            campaign_name=campaign_name,
            station_name=station_name,
            # Process options
            force=force,
            verbose=verbose,
            debugging_mode=debugging_mode,
            parallel=parallel,
        )
        print(f"{product} processing of {data_source} {campaign_name} {station_name} station ended.")
❌ New issue: Large Method
run_disdrodb_l0a has 72 lines, threshold = 70
@@ -1,760 +0,0 @@
#!/usr/bin/env python3
ℹ Getting worse: Missing Arguments Abstractions
The average number of function arguments increases from 5.13 to 9.00, threshold = 4.00
def run_disdrodb_l0c_station(
    # Station arguments
ℹ New issue: Excess Number of Function Arguments
run_disdrodb_l0c_station has 9 arguments, threshold = 4
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

@@            Coverage Diff             @@
##             main     #220       +/-   ##
===========================================
- Coverage   96.67%   61.06%   -35.62%
===========================================
  Files          57       85       +28
  Lines        3906     6500     +2594
===========================================
+ Hits         3776     3969      +193
- Misses        130     2531     +2401

☔ View full report in Codecov by Sentry.
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: -0.42 (8.77 -> 8.35)
- Declining Code Health: 116 findings 🚩
- Improving Code Health: 22 findings ✅
- Affected Hotspots: 3 files 🔥
    _execute_cmd(cmd)


def run_disdrodb_l1(
ℹ New issue: Excess Number of Function Arguments
run_disdrodb_l1 has 8 arguments, threshold = 4
print(f"{product} processing of {data_source} {campaign_name} {station_name} station ended.") | ||
|
||
|
||
def run_disdrodb_l2e( |
ℹ New issue: Excess Number of Function Arguments
run_disdrodb_l2e has 8 arguments, threshold = 4
print(f"{product} processing of {data_source} {campaign_name} {station_name} station ended.") | ||
|
||
|
||
def run_disdrodb_l2m( |
ℹ New issue: Excess Number of Function Arguments
run_disdrodb_l2m has 8 arguments, threshold = 4
def disdrodb_run_l2e(
    # Stations options
    data_sources: Optional[str] = None,
    campaign_names: Optional[str] = None,
    station_names: Optional[str] = None,
    # Processing options
    force: bool = False,
    verbose: bool = True,
    parallel: bool = True,
    debugging_mode: bool = False,
    base_dir: Optional[str] = None,
):
    """
    Run the L2E processing of DISDRODB stations.

    This function allows launching the processing of many DISDRODB stations with a single command.
    From the list of all available DISDRODB stations, it runs the processing
    of the stations matching the provided data_sources, campaign_names and station_names.

    Parameters
    ----------
    data_sources : str
        Name of data source(s) to process.
        The name(s) must be UPPER CASE.
        If campaign_names and station_names are not specified, process all stations.
        To specify multiple data sources, write e.g.: --data_sources 'GPM EPFL NCAR'
    campaign_names : str
        Name of the campaign(s) to process.
        The name(s) must be UPPER CASE.
        To specify multiple campaigns, write e.g.: --campaign_names 'IPEX IMPACTS'
    station_names : str
        Station names.
        To specify multiple stations, write e.g.: --station_names 'station1 station2'
    force : bool
        If True, overwrite existing data in the destination directories.
        If False, raise an error if data already exist in the destination directories.
        The default is False.
    verbose : bool
        Whether to print detailed processing information to the terminal.
        The default is True.
    parallel : bool
        If True, the files are processed simultaneously in multiple processes.
        Each process will use a single thread.
        By default, the number of processes is defined with os.cpu_count().
        However, you can customize it by typing: DASK_NUM_WORKERS=4 disdrodb_run_l2e
        If False, the files are processed sequentially in a single process.
        If False, multi-threading is automatically exploited to speed up I/O tasks.
    debugging_mode : bool
        If True, it reduces the amount of data to process.
        It processes just the first 3 raw data files for each station.
        The default is False.
    base_dir : str
        Base directory of DISDRODB.
        Format: <...>/DISDRODB
        If not specified, uses the path specified in the DISDRODB active configuration.
    """
    from disdrodb.routines import run_disdrodb_l2e

    # Parse data_sources, campaign_names and station arguments
    base_dir = parse_base_dir(base_dir)
    data_sources = parse_arg_to_list(data_sources)
    campaign_names = parse_arg_to_list(campaign_names)
    station_names = parse_arg_to_list(station_names)

    # Run processing
    run_disdrodb_l2e(
        base_dir=base_dir,
        data_sources=data_sources,
        campaign_names=campaign_names,
        station_names=station_names,
        # Processing options
        force=force,
        verbose=verbose,
        debugging_mode=debugging_mode,
        parallel=parallel,
    )
❌ New issue: Excess Number of Function Arguments
disdrodb_run_l2e has 8 arguments, threshold = 4
def disdrodb_run_l2m(
    # Stations options
    data_sources: Optional[str] = None,
    campaign_names: Optional[str] = None,
    station_names: Optional[str] = None,
    # Processing options
    force: bool = False,
    verbose: bool = True,
    parallel: bool = True,
    debugging_mode: bool = False,
    base_dir: Optional[str] = None,
):
    """
    Run the L2M processing of DISDRODB stations.

    This function allows launching the processing of many DISDRODB stations with a single command.
    From the list of all available DISDRODB stations, it runs the processing
    of the stations matching the provided data_sources, campaign_names and station_names.

    Parameters
    ----------
    data_sources : str
        Name of data source(s) to process.
        The name(s) must be UPPER CASE.
        If campaign_names and station_names are not specified, process all stations.
        To specify multiple data sources, write e.g.: --data_sources 'GPM EPFL NCAR'
    campaign_names : str
        Name of the campaign(s) to process.
        The name(s) must be UPPER CASE.
        To specify multiple campaigns, write e.g.: --campaign_names 'IPEX IMPACTS'
    station_names : str
        Station names.
        To specify multiple stations, write e.g.: --station_names 'station1 station2'
    force : bool
        If True, overwrite existing data in the destination directories.
        If False, raise an error if data already exist in the destination directories.
        The default is False.
    verbose : bool
        Whether to print detailed processing information to the terminal.
        The default is True.
    parallel : bool
        If True, the files are processed simultaneously in multiple processes.
        Each process will use a single thread.
        By default, the number of processes is defined with os.cpu_count().
        However, you can customize it by typing: DASK_NUM_WORKERS=4 disdrodb_run_l2m
        If False, the files are processed sequentially in a single process.
        If False, multi-threading is automatically exploited to speed up I/O tasks.
    debugging_mode : bool
        If True, it reduces the amount of data to process.
        It processes just the first 3 raw data files for each station.
        The default is False.
    base_dir : str
        Base directory of DISDRODB.
        Format: <...>/DISDRODB
        If not specified, uses the path specified in the DISDRODB active configuration.
    """
    from disdrodb.routines import run_disdrodb_l2m

    # Parse data_sources, campaign_names and station arguments
    base_dir = parse_base_dir(base_dir)
    data_sources = parse_arg_to_list(data_sources)
    campaign_names = parse_arg_to_list(campaign_names)
    station_names = parse_arg_to_list(station_names)

    # Run processing
    run_disdrodb_l2m(
        base_dir=base_dir,
        data_sources=data_sources,
        campaign_names=campaign_names,
        station_names=station_names,
        # Processing options
        force=force,
        verbose=verbose,
        debugging_mode=debugging_mode,
        parallel=parallel,
    )
❌ New issue: Excess Number of Function Arguments
disdrodb_run_l2m has 8 arguments, threshold = 4
def get_expected_probabilities(params, cdf_func, pdf_func, bin_edges, probability_method, normalized=False):
    """
    Compute the expected probabilities for each bin given the distribution parameters.

    Parameters
    ----------
    params : array-like
        Parameters for the CDF or PDF function.
    cdf_func : callable
        Cumulative distribution function (CDF) that takes bin edges and parameters as inputs.
    pdf_func : callable
        Probability density function (PDF) that takes a value and parameters as inputs.
    bin_edges : array-like
        Edges of the bins for which to compute the probabilities.
    probability_method : {'cdf', 'pdf'}
        Method to compute the probabilities. If 'cdf', use the CDF to compute probabilities.
        If 'pdf', integrate the PDF over each bin range.
    normalized : bool, optional
        If True, normalize the probabilities to sum to 1. Default is False.

    Returns
    -------
    expected_probabilities : numpy.ndarray
        Array of expected probabilities for each bin.

    Notes
    -----
    - If the 'cdf' method is used, the probabilities are computed as the difference in CDF values at the bin edges.
    - If the 'pdf' method is used, the probabilities are computed by integrating the PDF over each bin range.
    - Any zero or negative probabilities are replaced with a very small positive number (1e-10)
      so that the optimization can proceed.
    - If `normalized` is True, the probabilities are normalized to sum to 1.

    """
    if probability_method == "cdf":
        # Compute the CDF at bin edges
        cdf_vals = cdf_func(bin_edges, params)
        # Compute probabilities for each bin
        expected_probabilities = np.diff(cdf_vals)
        # Replace any zero or negative probabilities with a very small positive number
        # --> Otherwise the optimizer does not progress
        expected_probabilities = np.maximum(expected_probabilities, 1e-10)
    # Or integrate PDF over the bin range
    else:  # probability_method == "pdf"
        # For each bin, integrate the PDF over the bin range
        expected_probabilities = np.array(
            [quad(lambda x: pdf_func(x, params), bin_edges[i], bin_edges[i + 1])[0] for i in range(len(bin_edges) - 1)],
        )
    if normalized:
        # Normalize probabilities to sum to 1
        total_probability = np.sum(expected_probabilities)
        expected_probabilities /= total_probability
    return expected_probabilities
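A minimal sketch of how this helper behaves (not part of the diff; it assumes get_expected_probabilities and its numpy/scipy dependencies are importable, and mirrors the exponential CDF/PDF wrappers defined further below for estimate_exponential_parameters):

import numpy as np
import scipy.stats as ss

def exp_cdf(x, params):
    return ss.expon.cdf(x, loc=0, scale=params[0])

def exp_pdf(x, params):
    return ss.expon.pdf(x, loc=0, scale=params[0])

bin_edges = np.array([0.0, 0.5, 1.0, 2.0, 4.0])
p_cdf = get_expected_probabilities([1.0], exp_cdf, exp_pdf, bin_edges, "cdf")
p_pdf = get_expected_probabilities([1.0], exp_cdf, exp_pdf, bin_edges, "pdf")
# Both methods should agree closely; with normalized=True the probabilities would sum to 1.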
❌ New issue: Excess Number of Function Arguments
get_expected_probabilities has 6 arguments, threshold = 4
def compute_negative_log_likelihood(
    params,
    bin_edges,
    counts,
    cdf_func,
    pdf_func,
    param_constraints=None,
    probability_method="cdf",
    likelihood="multinomial",
    truncated_likelihood=True,
):
    """
    General negative log-likelihood function for fitting distributions to binned data.

    Parameters
    ----------
    params : array-like
        Parameters of the distribution.
    bin_edges : array-like
        Edges of the bins (length N+1).
    counts : array-like
        Observed counts in each bin (length N).
    cdf_func : callable
        Cumulative distribution function of the distribution.
    pdf_func : callable
        Probability density function of the distribution.
    param_constraints : callable, optional
        Function that checks if parameters are valid.
    probability_method : str, optional
        Method to compute expected probabilities, either 'cdf' or 'pdf'. Default is 'cdf'.
    likelihood : str, optional
        Type of likelihood to compute, either 'multinomial' or 'poisson'. Default is 'multinomial'.
    truncated_likelihood : bool, optional
        Whether to normalize the expected probabilities. Default is True.

    Returns
    -------
    nll : float
        The negative log-likelihood value.
    """
    # Check if parameters are valid
    if param_constraints is not None and not param_constraints(params):
        return np.inf

    # Compute (unnormalized) expected probabilities using CDF
    expected_probabilities = get_expected_probabilities(
        params=params,
        cdf_func=cdf_func,
        pdf_func=pdf_func,
        bin_edges=bin_edges,
        probability_method=probability_method,
        normalized=truncated_likelihood,
    )

    # Ensure expected probabilities are valid
    if np.any(expected_probabilities <= 0):
        return np.inf

    # Compute negative log-likelihood
    if likelihood == "poisson":
        n_total = np.sum(counts)
        expected_counts = expected_probabilities * n_total
        expected_counts = np.maximum(expected_counts, 1e-10)  # Avoid zero expected counts
        nll = -np.sum(counts * np.log(expected_counts) - expected_counts)
    else:  # likelihood == "multinomial"
        # Compute likelihood
        nll = -np.sum(counts * np.log(expected_probabilities))
    return nll
❌ New issue: Excess Number of Function Arguments
compute_negative_log_likelihood has 9 arguments, threshold = 4
    counts : array-like
        The counts for each bin in the histogram.
    bin_edges : array-like
        The edges of the bins.
    probability_method : str, optional
        The method to compute probabilities, either ``"cdf"`` or ``"pdf"``. The default is ``"cdf"``.
    likelihood : str, optional
        The likelihood function to use, either ``"multinomial"`` or ``"poisson"``.
        The default is ``"multinomial"``.
    truncated_likelihood : bool, optional
        Whether to use truncated likelihood. The default is ``True``.
    output_dictionary : bool, optional
        Whether to return the output as a dictionary.
        If False, returns a numpy array. The default is ``True``.
    optimizer : str, optional
        The optimization method to use. Default is ``"Nelder-Mead"``.

    Returns
    -------
    dict or numpy.ndarray
        The estimated parameters of the lognormal distribution.
        If ``output_dictionary`` is ``True``, returns a dictionary with keys ``Nt``, ``mu``, and ``sigma``.
        If ``output_dictionary`` is ``False``, returns a numpy array with values [Nt, mu, sigma].

    Notes
    -----
    The lognormal distribution is defined as:
    N(D) = Nt / (sqrt(2 * pi) * sigma * D) * exp(-(ln(D) - mu)**2 / (2 * sigma**2))
    where Nt is the total number of counts, mu is the mean of the log of the distribution,
    and sigma is the standard deviation of the log of the distribution.

    References
    ----------
    .. [1] https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.lognorm.html#scipy.stats.lognorm
    """
    # LogNormal
    # - mu = log(scale)
    # - loc = 0

    # Initialize bad results
    null_output = (
        {"Nt": np.nan, "mu": np.nan, "sigma": np.nan} if output_dictionary else np.array([np.nan, np.nan, np.nan])
    )

    # Define the CDF and PDF functions for the lognormal distribution
    def lognorm_cdf(x, params):
        sigma, scale = params
        return ss.lognorm.cdf(x, sigma, loc=0, scale=scale)

    def lognorm_pdf(x, params):
        sigma, scale = params
        return ss.lognorm.pdf(x, sigma, loc=0, scale=scale)

    # Define valid parameters for the lognormal distribution
    def param_constraints(params):
        sigma, scale = params
        return sigma > 0 and scale > 0

    # Define initial guess for the parameters
    initial_params = [1.0, 1.0]  # sigma, scale

    # Define bounds for sigma and scale
    bounds = [(1e-6, None), (1e-6, None)]

    # Minimize the negative log-likelihood
    with suppress_warnings():
        result = minimize(
            compute_negative_log_likelihood,
            initial_params,
            args=(
                bin_edges,
                counts,
                lognorm_cdf,
                lognorm_pdf,
                param_constraints,
                probability_method,
                likelihood,
                truncated_likelihood,
            ),
            bounds=bounds,
            method=optimizer,
        )

    # Check if the fit succeeded
    if not result.success:
        return null_output

    # Define Nt
    Nt = np.sum(counts).item()

    # Retrieve parameters
    params = result.x
    if truncated_likelihood:
        Nt = get_adjusted_nt(cdf=lognorm_cdf, params=params, Nt=Nt, bin_edges=bin_edges)
    sigma, scale = params
    mu = np.log(scale)

    # Define output
    output = {"Nt": Nt, "mu": mu, "sigma": sigma} if output_dictionary else np.array([Nt, mu, sigma])
    return output
❌ New issue: Excess Number of Function Arguments
estimate_lognormal_parameters has 7 arguments, threshold = 4
):
    """
    Estimate the parameters of an exponential distribution given histogram data.

    Parameters
    ----------
    counts : array-like
        The counts for each bin in the histogram.
    bin_edges : array-like
        The edges of the bins.
    probability_method : str, optional
        The method to compute probabilities, either ``"cdf"`` or ``"pdf"``. The default is ``"cdf"``.
    likelihood : str, optional
        The likelihood function to use, either ``"multinomial"`` or ``"poisson"``.
        The default is ``"multinomial"``.
    truncated_likelihood : bool, optional
        Whether to use truncated likelihood. The default is ``True``.
    output_dictionary : bool, optional
        Whether to return the output as a dictionary.
        If False, returns a numpy array. The default is ``True``.
    optimizer : str, optional
        The optimization method to use. Default is ``"Nelder-Mead"``.

    Returns
    -------
    dict or numpy.ndarray
        The estimated parameters of the exponential distribution.
        If ``output_dictionary`` is ``True``, returns a dictionary with keys ``N0`` and ``Lambda``.
        If ``output_dictionary`` is ``False``, returns a numpy array with [N0, Lambda].

    Notes
    -----
    The exponential distribution is defined as:
    N(D) = N0 * exp(-Lambda * D) = Nt * Lambda * exp(-Lambda * D)
    where Lambda = 1 / scale and N0 = Nt * Lambda.

    References
    ----------
    .. [1] https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.expon.html
    """
    # Initialize bad results
    null_output = {"N0": np.nan, "Lambda": np.nan} if output_dictionary else np.array([np.nan, np.nan])

    # Define the CDF and PDF functions for the exponential distribution
    def exp_cdf(x, params):
        scale = params[0]
        return ss.expon.cdf(x, loc=0, scale=scale)

    def exp_pdf(x, params):
        scale = params[0]
        return ss.expon.pdf(x, loc=0, scale=scale)

    # Define valid parameters for the exponential distribution
    def param_constraints(params):
        scale = params[0]
        return scale > 0

    # Define initial guess for the scale parameter
    initial_params = [1.0]  # scale

    # Define bounds for scale
    bounds = [(1e-6, None)]

    # Minimize the negative log-likelihood
    with suppress_warnings():
        result = minimize(
            compute_negative_log_likelihood,
            initial_params,
            args=(
                bin_edges,
                counts,
                exp_cdf,
                exp_pdf,
                param_constraints,
                probability_method,
                likelihood,
                truncated_likelihood,
            ),
            bounds=bounds,
            method=optimizer,
        )

    # Check if the fit succeeded
    if not result.success:
        return null_output

    # Define Nt
    Nt = np.sum(counts).item()

    # Retrieve parameters
    params = result.x
    if truncated_likelihood:
        Nt = get_adjusted_nt(cdf=exp_cdf, params=params, Nt=Nt, bin_edges=bin_edges)
    scale = params[0]
    Lambda = 1 / scale
    N0 = Nt * Lambda

    # Define output
    output = {"N0": N0, "Lambda": Lambda} if output_dictionary else np.array([N0, Lambda])
    return output
❌ New issue: Excess Number of Function Arguments
estimate_exponential_parameters has 7 arguments, threshold = 4
def get_lognormal_parameters(
    ds,
    probability_method="cdf",
    likelihood="multinomial",
    truncated_likelihood=True,
    optimizer="Nelder-Mead",
):
    """
    Estimate lognormal distribution parameters for drop size distribution (DSD) data.

    Parameters
    ----------
    ds : xarray.Dataset
        Input dataset containing drop size distribution data. It must include the following variables:
        - ``drop_number_concentration``: The number concentration of drops.
        - ``diameter_bin_width``: The width of each diameter bin.
        - ``diameter_bin_lower``: The lower bounds of the diameter bins.
        - ``diameter_bin_upper``: The upper bounds of the diameter bins.
        - ``diameter_bin_center``: The center values of the diameter bins.
    probability_method : str, optional
        Method to compute probabilities. The default is ``cdf``.
    likelihood : str, optional
        Likelihood function to use for fitting. The default is ``multinomial``.
    truncated_likelihood : bool, optional
        Whether to use truncated likelihood. The default is ``True``.
    optimizer : str, optional
        Optimization method to use. The default is ``Nelder-Mead``.

    Returns
    -------
    xarray.Dataset
        Dataset containing the estimated lognormal distribution parameters:
        - ``Nt``: Total number concentration.
        - ``mu``: Mean of the lognormal distribution.
        - ``sigma``: Standard deviation of the lognormal distribution.
        The resulting dataset will have an attribute ``disdrodb_psd_model`` set to ``LognormalPSD``.

    Notes
    -----
    The function uses `xr.apply_ufunc` to fit the lognormal distribution parameters
    in parallel, leveraging Dask for parallel computation.

    """
    # Define inputs
    counts = ds["drop_number_concentration"] * ds["diameter_bin_width"]
    diameter_breaks = np.append(ds["diameter_bin_lower"].data, ds["diameter_bin_upper"].data[-1])

    # Define kwargs
    kwargs = {
        "output_dictionary": False,
        "bin_edges": diameter_breaks,
        "probability_method": probability_method,
        "likelihood": likelihood,
        "truncated_likelihood": truncated_likelihood,
        "optimizer": optimizer,
    }

    # Fit distribution in parallel
    da_params = xr.apply_ufunc(
        estimate_lognormal_parameters,
        counts,
        kwargs=kwargs,
        input_core_dims=[["diameter_bin_center"]],
        output_core_dims=[["parameters"]],
        vectorize=True,
        dask="parallelized",
        dask_gufunc_kwargs={"output_sizes": {"parameters": 3}},  # lengths of the new output_core_dims dimensions
        output_dtypes=["float64"],
    )

    # Add parameters coordinates
    da_params = da_params.assign_coords({"parameters": ["Nt", "mu", "sigma"]})

    # Create parameters dataset
    ds_params = da_params.to_dataset(dim="parameters")

    # Add DSD model name to the attribute
    ds_params.attrs["disdrodb_psd_model"] = "LognormalPSD"

    return ds_params
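A hedged sketch (not part of the diff) of calling this routine on a toy dataset; the variable layout follows the required variables listed in the docstring, and the bin geometry is invented for illustration:

import numpy as np
import xarray as xr

# Toy diameter grid (mm) and a lognormal-shaped drop number concentration (mm-1 m-3).
edges = np.linspace(0.0, 8.0, 33)
centers = 0.5 * (edges[:-1] + edges[1:])
n_d = 1e3 / (np.sqrt(2 * np.pi) * 0.5 * centers) * np.exp(-((np.log(centers) - 0.3) ** 2) / (2 * 0.5**2))

ds = xr.Dataset(
    {
        "drop_number_concentration": (("time", "diameter_bin_center"), n_d[None, :]),
        "diameter_bin_width": ("diameter_bin_center", np.diff(edges)),
        "diameter_bin_lower": ("diameter_bin_center", edges[:-1]),
        "diameter_bin_upper": ("diameter_bin_center", edges[1:]),
    },
    coords={"time": [np.datetime64("2024-01-01T00:00:00")], "diameter_bin_center": centers},
)

ds_params = get_lognormal_parameters(ds)
# ds_params holds Nt, mu and sigma along the time dimension (here expected near 1000, 0.3 and 0.5)
# and carries the attribute disdrodb_psd_model = "LognormalPSD".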
❌ New issue: Excess Number of Function Arguments
get_lognormal_parameters has 5 arguments, threshold = 4
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: -0.45 (8.77 -> 8.32)
- Declining Code Health: 119 findings 🚩
- Improving Code Health: 22 findings ✅
- Affected Hotspots: 3 files 🔥
    # Define product name
    product = "L0A"
❌ Getting worse: Large Method
run_l0a increases from 102 to 134 lines of code, threshold = 70
    # Define product name
    product = "L0B"
❌ Getting worse: Large Method
run_l0b_from_nc increases from 112 to 125 lines of code, threshold = 70
@@ -1,760 +0,0 @@
#!/usr/bin/env python3
ℹ Getting worse: Lines of Code in a Single File
The lines of code increases from 605 to 845, improve code health by reducing it to 600
    debugging_mode: bool = False,
    parallel: bool = True,
    base_dir: Optional[str] = None,
    remove_l0a: bool = False,
❌ Getting worse: Code Duplication
introduced similar code in: run_disdrodb_l0,run_disdrodb_l0c,run_disdrodb_l0c_station,run_disdrodb_l1 and 5 more functions
def __init__(self, Nw=1.0, D50=1.0, mu=0.0, Dmin=0, Dmax=None):
    self.D50 = D50
    self.mu = mu
    self.Dmin = Dmin
    self.Dmax = 3.0 * D50 if Dmax is None else Dmax
    self.Nw = Nw
    self.parameters = {"Nw": Nw, "D50": D50, "mu": mu}
❌ New issue: Excess Number of Function Arguments
NormalizedGammaPSD.__init__ has 5 arguments, threshold = 4
def get_model_radar_parameters(
    ds,
    radar_band,
    canting_angle_std=7,
    diameter_max=8,
    axis_ratio="Thurai2007",
):
    """Compute radar parameters from a PSD model.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset containing the parameters of the PSD model.
        The dataset attribute disdrodb_psd_model specifies the PSD model to use.
    radar_band : str
        Radar band to be used.
    canting_angle_std : float, optional
        Standard deviation of the canting angle. The default value is 7.
    diameter_max : float, optional
        Maximum diameter. The default value is 8 mm.
    axis_ratio : str, optional
        Method to compute the axis ratio. The default method is ``Thurai2007``.

    Returns
    -------
    xarray.Dataset
        Dataset containing the computed radar parameters.
    """
    # Retrieve psd model and parameters.
    psd_model = ds.attrs["disdrodb_psd_model"]
    required_parameters = get_required_parameters(psd_model)
    ds_parameters = get_psd_parameters(ds)

    # Check argument validity
    axis_ratio = check_axis_ratio(axis_ratio)
    radar_band = check_radar_band(radar_band)

    # Retrieve wavelengths in mm
    wavelength = get_radar_wavelength(radar_band)

    # Create DataArray with PSD parameters
    da_parameters = ds_parameters.to_array(dim="psd_parameters").compute()

    # Initialize scattering table
    scatterer = initialize_scatterer(
        wavelength=wavelength,
        canting_angle_std=canting_angle_std,
        D_max=diameter_max,
        axis_ratio=axis_ratio,
    )

    # Define kwargs
    kwargs = {
        "output_dictionary": False,
        "psd_model": psd_model,
        "psd_parameters_names": required_parameters,
        "scatterer": scatterer,
    }

    # Loop over each PSD (not in parallel --> dask="forbidden")
    # - It costs much more to initialize the scatterer than to loop over the timesteps!
    da_radar = xr.apply_ufunc(
        _estimate_model_radar_parameters,
        da_parameters,
        kwargs=kwargs,
        input_core_dims=[["psd_parameters"]],
        output_core_dims=[["radar_variables"]],
        vectorize=True,
        dask="forbidden",
        dask_gufunc_kwargs={"output_sizes": {"radar_variables": 5}},  # lengths of the new output_core_dims dimensions
        output_dtypes=["float64"],
    )

    # Add parameters coordinates
    da_radar = da_radar.assign_coords({"radar_variables": ["Zh", "Zdr", "rho_hv", "ldr", "Kdp", "Ai"]})

    # Create parameters dataset
    ds_radar = da_radar.to_dataset(dim="radar_variables")

    # Expand dimensions for later merging
    dims_dict = {
        "radar_band": [radar_band],
        "axis_ratio": [axis_ratio],
        "canting_angle_std": [canting_angle_std],
        "diameter_max": [diameter_max],
    }
    ds_radar = ds_radar.expand_dims(dim=dims_dict)
    return ds_radar
❌ New issue: Excess Number of Function Arguments
get_model_radar_parameters has 5 arguments, threshold = 4
def get_empirical_radar_parameters(
    ds,
    radar_band=None,
    canting_angle_std=7,
    diameter_max=8,
    axis_ratio="Thurai2007",
):
    """Compute radar parameters from empirical drop number concentration.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset containing the drop number concentration variable.
    radar_band : str
        Radar band to be used.
    canting_angle_std : float, optional
        Standard deviation of the canting angle. The default value is 7.
    diameter_max : float, optional
        Maximum diameter. The default value is 8 mm.
    axis_ratio : str, optional
        Method to compute the axis ratio. The default method is ``Thurai2007``.

    Returns
    -------
    xarray.Dataset
        Dataset containing the computed radar parameters.
    """
    # Define inputs
    da_drop_number_concentration = ds["drop_number_concentration"].compute()

    # Define bin edges
    bin_edges = np.append(ds["diameter_bin_lower"].compute().data, ds["diameter_bin_upper"].compute().data[-1])

    # Check argument validity
    axis_ratio = check_axis_ratio(axis_ratio)
    radar_band = check_radar_band(radar_band)

    # Retrieve wavelengths in mm
    wavelength = get_radar_wavelength(radar_band)

    # Initialize scattering table
    scatterer = initialize_scatterer(
        wavelength=wavelength,
        canting_angle_std=canting_angle_std,
        D_max=diameter_max,
        axis_ratio=axis_ratio,
    )

    # Define kwargs
    kwargs = {
        "output_dictionary": False,
        "bin_edges": bin_edges,
        "scatterer": scatterer,
    }

    # Loop over each PSD (not in parallel --> dask="forbidden")
    # - It costs much more to initialize the scatterer than to loop over the timesteps!
    da_radar = xr.apply_ufunc(
        _estimate_empirical_radar_parameters,
        da_drop_number_concentration,
        kwargs=kwargs,
        input_core_dims=[["diameter_bin_center"]],
        output_core_dims=[["radar_variables"]],
        vectorize=True,
        dask="forbidden",
        dask_gufunc_kwargs={"output_sizes": {"radar_variables": 5}},  # lengths of the new output_core_dims dimensions
        output_dtypes=["float64"],
    )

    # Add parameters coordinates
    da_radar = da_radar.assign_coords({"radar_variables": ["Zh", "Zdr", "rho_hv", "ldr", "Kdp", "Ai"]})

    # Create parameters dataset
    ds_radar = da_radar.to_dataset(dim="radar_variables")

    # Expand dimensions for later merging
    dims_dict = {
        "radar_band": [radar_band],
        "axis_ratio": [axis_ratio],
        "canting_angle_std": [canting_angle_std],
        "diameter_max": [diameter_max],
    }
    ds_radar = ds_radar.expand_dims(dim=dims_dict)
    return ds_radar
❌ New issue: Excess Number of Function Arguments
get_empirical_radar_parameters has 5 arguments, threshold = 4
def _define_fill_value(ds, fill_value):
    # Define the per-variable fill values: NA for floats and, for integers,
    # the _FillValue attribute when available or otherwise the dtype maximum.
    fill_value = {}
    for var in ds.data_vars:
        if np.issubdtype(ds[var].dtype, np.floating):
            fill_value[var] = dtypes.NA
        elif np.issubdtype(ds[var].dtype, np.integer):
            if "_FillValue" in ds[var].attrs:
                fill_value[var] = ds[var].attrs["_FillValue"]
            else:
                fill_value[var] = np.iinfo(ds[var].dtype).max
    return fill_value
❌ New issue: Bumpy Road Ahead
_define_fill_value has 2 blocks with nested conditional logic. Any nesting of 2 or deeper is considered. Threshold is one single, nested block per function
def regularize_dataset(
    ds: xr.Dataset,
    freq: str,
    time_dim: str = "time",
    method: Optional[str] = None,
    fill_value=None,
):
    """Regularize a dataset across the time dimension with uniform resolution.

    Parameters
    ----------
    ds : xarray.Dataset
        xarray Dataset.
    time_dim : str, optional
        The time dimension in the xarray.Dataset. The default is ``"time"``.
    freq : str
        The ``freq`` string to pass to `pd.date_range()` to define the new time coordinates.
        Example: ``freq="2min"``.
    method : str, optional
        Method to use for filling missing timesteps.
        If ``None``, fill with ``fill_value``. The default is ``None``.
        For other possible methods, see `xarray.Dataset.reindex()`.
    fill_value : (float, dict), optional
        Fill value to fill missing timesteps.
        If not specified, for float variables it uses ``dtypes.NA``, while for
        integer variables it uses the maximum allowed integer value or,
        in case of undecoded variables, the ``_FillValue`` DataArray attribute.

    Returns
    -------
    ds_reindexed : xarray.Dataset
        Regularized dataset.

    """
    ds = _check_time_sorted(ds, time_dim=time_dim)
    start_time, end_time = get_dataset_start_end_time(ds, time_dim=time_dim)
    new_time_index = pd.date_range(
        start=pd.to_datetime(start_time),
        end=pd.to_datetime(end_time),
        freq=freq,
    )

    # Define fill_value dictionary
    if fill_value is None:
        fill_value = _define_fill_value(ds, fill_value)

    # Regularize dataset and fill with NA values
    ds = ds.reindex(
        {time_dim: new_time_index},
        method=method,  # do not fill gaps
        # tolerance=tolerance,  # mismatch in seconds
        fill_value=fill_value,
    )
    return ds
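A minimal hedged sketch (not part of the diff), assuming the helper functions referenced in the body (_check_time_sorted, get_dataset_start_end_time, _define_fill_value) are importable alongside regularize_dataset:

import pandas as pd
import xarray as xr

# Toy dataset with a missing timestep at 00:02.
times = pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:01", "2024-01-01 00:03"])
ds = xr.Dataset({"rain_rate": ("time", [1.0, 2.0, 4.0])}, coords={"time": times})

ds_regular = regularize_dataset(ds, freq="1min")
# ds_regular now spans 00:00 to 00:03 at 1-minute resolution; the missing 00:02 entry
# is filled with NaN because rain_rate is a float variable and no fill_value was given.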
❌ New issue: Excess Number of Function Arguments
regularize_dataset has 5 arguments, threshold = 4
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: -0.44 (8.80 -> 8.36)
- Declining Code Health: 124 findings 🚩
- Improving Code Health: 22 findings ✅
- Affected Hotspots: 3 files 🔥
@@ -81,8 +81,14 @@ def df_sanitizer_fun(df):
    df = df["TO_PARSE"].str.split(":", expand=True, n=1)
    df.columns = ["ID", "Value"]

    # Drop rows with no values
    # Select only rows with values
❌ Getting worse: Large Method
reader increases from 95 to 98 lines of code, threshold = 70
@@ -64,14 +64,21 @@ def reader(
    #### - Define dataframe sanitizer function for L0 processing
    def df_sanitizer_fun(df):
        # - Import pandas
        import numpy as np
❌ Getting worse: Large Method
reader increases from 83 to 87 lines of code, threshold = 70
@@ -71,14 +71,21 @@ def reader(
    #### - Define dataframe sanitizer function for L0 processing
    def df_sanitizer_fun(df):
        # - Import pandas
        import numpy as np
❌ Getting worse: Large Method
reader increases from 83 to 87 lines of code, threshold = 70
@@ -82,8 +82,14 @@ def df_sanitizer_fun(df):
    df = df["TO_PARSE"].str.split(":", expand=True, n=1)
    df.columns = ["ID", "Value"]

    # Drop rows with no values
    # Select only rows with values
❌ Getting worse: Large Method
reader increases from 90 to 93 lines of code, threshold = 70
@@ -82,8 +82,18 @@ def df_sanitizer_fun(df):
    df = df["TO_PARSE"].str.split(":", expand=True, n=1)
    df.columns = ["ID", "Value"]

    # Drop rows with no values
    # Select only rows with values
❌ Getting worse: Large Method
reader increases from 78 to 83 lines of code, threshold = 70
for more information, see https://pre-commit.ci
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: -0.42 (8.80 -> 8.38)
- Declining Code Health: 118 findings 🚩
- Improving Code Health: 24 findings ✅
- Affected Hotspots: 3 files 🔥
@@ -24,11 +24,11 @@

from disdrodb.api.info import infer_disdrodb_tree_path_components
from disdrodb.api.path import (
    define_data_dir,
✅ No longer an issue: Code Duplication
The module no longer contains too many functions with similar structure
import os

import dask
from dask.distributed import Client, LocalCluster
✅ No longer an issue: Large Method
disdrodb_run_l0a_station is no longer above the threshold for lines of code
import os

import dask
from dask.distributed import Client, LocalCluster
✅ No longer an issue: Large Method
disdrodb_run_l0b_station is no longer above the threshold for lines of code
def disdrodb_run_l0c_station(
    # Station arguments
    data_source: str,
    campaign_name: str,
    station_name: str,
    # L0C processing options
    remove_l0b: bool = False,
    # Processing options
    force: bool = False,
    verbose: bool = True,
    parallel: bool = True,
    debugging_mode: bool = False,
    base_dir: Optional[str] = None,
):
    """Run the L0C processing of a specific DISDRODB station from the terminal.

    Parameters
    ----------
    data_source : str
        Institution name (when campaign data spans more than 1 country),
        or country (when all campaigns (or sensor networks) are inside a given country).
        Must be UPPER CASE.
    campaign_name : str
        Campaign name. Must be UPPER CASE.
    station_name : str
        Station name.
    force : bool
        If True, overwrite existing data in the destination directories.
        If False, raise an error if data already exist in the destination directories.
        The default is False.
    verbose : bool
        Whether to print detailed processing information to the terminal.
        The default is True.
    parallel : bool
        If True, the files are processed simultaneously in multiple processes.
        Each process will use a single thread to avoid issues with the HDF/netCDF library.
        By default, the number of processes is defined with os.cpu_count().
        However, you can customize it by typing: DASK_NUM_WORKERS=4 disdrodb_run_l0c_station
        If False, the files are processed sequentially in a single process.
        If False, multi-threading is automatically exploited to speed up I/O tasks.
    debugging_mode : bool
        If True, it reduces the amount of data to process.
        It processes just the first 100 rows of 3 L0A files.
        The default is False.
    remove_l0b : bool, optional
        Whether to remove the processed L0B files. The default is ``False``.
    base_dir : str
        Base directory of DISDRODB.
        Format: <...>/DISDRODB
        If not specified, uses the path specified in the DISDRODB active configuration.
    """
    from disdrodb.l0.l0_processing import run_l0c_station
    from disdrodb.utils.dask import close_dask_cluster, initialize_dask_cluster

    base_dir = parse_base_dir(base_dir)

    # -------------------------------------------------------------------------.
    # If parallel=True, set the dask environment
    if parallel:
        cluster, client = initialize_dask_cluster()

    # -------------------------------------------------------------------------.
    run_l0c_station(
        # Station arguments
        data_source=data_source,
        campaign_name=campaign_name,
        station_name=station_name,
        # L0C processing options
        remove_l0b=remove_l0b,
        # Processing options
        force=force,
        verbose=verbose,
        debugging_mode=debugging_mode,
        parallel=parallel,
        base_dir=base_dir,
    )

    # -------------------------------------------------------------------------.
    # Close the cluster
    if parallel:
        close_dask_cluster(cluster, client)
❌ New issue: Excess Number of Function Arguments
disdrodb_run_l0c_station has 9 arguments, threshold = 4
def disdrodb_run_l2e_station(
    # Station arguments
    data_source: str,
    campaign_name: str,
    station_name: str,
    # Processing options
    force: bool = False,
    verbose: bool = False,
    parallel: bool = True,
    debugging_mode: bool = False,
    base_dir: Optional[str] = None,
):
    """
    Run the L2E processing of a specific DISDRODB station from the terminal.

    Parameters
    ----------
    data_source : str
        Institution name (when campaign data spans more than 1 country),
        or country (when all campaigns (or sensor networks) are inside a given country).
        Must be UPPER CASE.
    campaign_name : str
        Campaign name. Must be UPPER CASE.
    station_name : str
        Station name.
    force : bool
        If True, overwrite existing data in the destination directories.
        If False, raise an error if data already exist in the destination directories.
        The default is False.
    verbose : bool
        Whether to print detailed processing information to the terminal.
        The default is False.
    parallel : bool
        If True, the files are processed simultaneously in multiple processes.
        Each process will use a single thread.
        By default, the number of processes is defined with os.cpu_count().
        However, you can customize it by typing: DASK_NUM_WORKERS=4 disdrodb_run_l2e_station
        If False, the files are processed sequentially in a single process.
        If False, multi-threading is automatically exploited to speed up I/O tasks.
    debugging_mode : bool
        If True, it reduces the amount of data to process.
        It processes just the first 3 raw data files.
        The default is False.
    base_dir : str
        Base directory of DISDRODB.
        Format: <...>/DISDRODB
        If not specified, uses the path specified in the DISDRODB active configuration.
    """
    from disdrodb.l2.routines import run_l2e_station
    from disdrodb.utils.dask import close_dask_cluster, initialize_dask_cluster

    base_dir = parse_base_dir(base_dir)

    # -------------------------------------------------------------------------.
    # If parallel=True, set the dask environment
    if parallel:
        cluster, client = initialize_dask_cluster()

    # -------------------------------------------------------------------------.
    run_l2e_station(
        # Station arguments
        data_source=data_source,
        campaign_name=campaign_name,
        station_name=station_name,
        # Processing options
        force=force,
        verbose=verbose,
        debugging_mode=debugging_mode,
        parallel=parallel,
        base_dir=base_dir,
    )

    # -------------------------------------------------------------------------.
    # Close the cluster
    if parallel:
        close_dask_cluster(cluster, client)
❌ New issue: Excess Number of Function Arguments
disdrodb_run_l2e_station has 8 arguments, threshold = 4
def disdrodb_run_l2m_station(
    # Station arguments
    data_source: str,
    campaign_name: str,
    station_name: str,
    # Processing options
    force: bool = False,
    verbose: bool = False,
    parallel: bool = True,
    debugging_mode: bool = False,
    base_dir: Optional[str] = None,
):
    """
    Run the L2M processing of a specific DISDRODB station from the terminal.

    Parameters
    ----------
    data_source : str
        Institution name (when campaign data spans more than 1 country),
        or country (when all campaigns (or sensor networks) are inside a given country).
        Must be UPPER CASE.
    campaign_name : str
        Campaign name. Must be UPPER CASE.
    station_name : str
        Station name.
    force : bool
        If True, overwrite existing data in the destination directories.
        If False, raise an error if data already exist in the destination directories.
        The default is False.
    verbose : bool
        Whether to print detailed processing information to the terminal.
        The default is False.
    parallel : bool
        If True, the files are processed simultaneously in multiple processes.
        Each process will use a single thread.
        By default, the number of processes is defined with os.cpu_count().
        However, you can customize it by typing: DASK_NUM_WORKERS=4 disdrodb_run_l2m_station
        If False, the files are processed sequentially in a single process.
        If False, multi-threading is automatically exploited to speed up I/O tasks.
    debugging_mode : bool
        If True, it reduces the amount of data to process.
        It processes just the first 3 raw data files.
        The default is False.
    base_dir : str
        Base directory of DISDRODB.
        Format: <...>/DISDRODB
        If not specified, uses the path specified in the DISDRODB active configuration.
    """
    from disdrodb.l2.routines import run_l2m_station
    from disdrodb.utils.dask import close_dask_cluster, initialize_dask_cluster

    base_dir = parse_base_dir(base_dir)

    # -------------------------------------------------------------------------.
    # If parallel=True, set the dask environment
    if parallel:
        cluster, client = initialize_dask_cluster()

    # -------------------------------------------------------------------------.
    run_l2m_station(
        # Station arguments
        data_source=data_source,
        campaign_name=campaign_name,
        station_name=station_name,
        # Processing options
        force=force,
        verbose=verbose,
        debugging_mode=debugging_mode,
        parallel=parallel,
        base_dir=base_dir,
    )

    # -------------------------------------------------------------------------.
    # Close the cluster
    if parallel:
        close_dask_cluster(cluster, client)
❌ New issue: Excess Number of Function Arguments
disdrodb_run_l2m_station has 8 arguments, threshold = 4
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: -0.42 (8.80 -> 8.38)
- Declining Code Health: 118 findings 🚩
- Improving Code Health: 24 findings ✅
- Affected Hotspots: 3 files 🔥
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: -0.42 (8.81 -> 8.39)
- Declining Code Health: 122 findings 🚩
- Improving Code Health: 24 findings ✅
- Affected Hotspots: 3 files 🔥
@@ -92,10 +92,15 @@ def df_sanitizer_fun(df):
    df["time"] = pd.to_datetime(df["time"], format="%d-%m-%Y %H:%M:%S", errors="coerce")

    # - Split TO_BE_SPLITTED columns
❌ Getting worse: Large Method
reader increases from 72 to 73 lines of code, threshold = 70
@@ -65,40 +65,104 @@ def reader(
    #### - Define dataframe sanitizer function for L0 processing
    def df_sanitizer_fun(df):
        # - Import pandas
        import numpy as np
❌ New issue: Large Method
reader has 107 lines, threshold = 70
@@ -82,7 +82,7 @@ def df_sanitizer_fun(df):
    possible_delimiters, counts = np.unique(df["TO_BE_SPLITTED"].str.count(","), return_counts=True)
    n_delimiters = possible_delimiters[np.argmax(counts)]

    if n_delimiters == 1027:
    if n_delimiters == 1027:  # APU 2010
❌ Getting worse: Large Method
reader increases from 85 to 109 lines of code, threshold = 70
@@ -82,7 +82,7 @@ def df_sanitizer_fun(df):
    possible_delimiters, counts = np.unique(df["TO_BE_SPLITTED"].str.count(","), return_counts=True)
    n_delimiters = possible_delimiters[np.argmax(counts)]

    if n_delimiters == 1027:
    if n_delimiters == 1027:  # APU 2010
❌ New issue: Bumpy Road Ahead
reader has 2 blocks with nested conditional logic. Any nesting of 2 or deeper is considered. Threshold is one single, nested block per function
for more information, see https://pre-commit.ci
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: -0.46 (8.82 -> 8.36)
- Declining Code Health: 134 findings(s) 🚩
- Improving Code Health: 25 findings(s) ✅
- Affected Hotspots: 3 files(s) 🔥
@@ -17,15 +17,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Define paths within the DISDRODB infrastructure."""
✅ Getting better: Code Duplication
reduced similar code in: define_l0a_station_dir,define_l0b_station_dir
@@ -19,130 +19,114 @@
"""Implement DISDRODB L0 processing."""

import datetime
import functools
ℹ Getting worse: Lines of Code in a Single File
The lines of code increases from 691 to 785, improve code health by reducing it to 600
# L0B processing options
remove_l0a: bool = False,
❌ Getting worse: Large Method
run_l0b_station increases from 70 to 133 lines of code, threshold = 70
def run_l0b_concat_station(
def run_l0c_station(
❌ New issue: Large Method
run_l0c_station has 141 lines, threshold = 70
@@ -19,130 +19,114 @@
"""Implement DISDRODB L0 processing."""

import datetime
import functools
ℹ Getting worse: Missing Arguments Abstractions
The average number of function arguments increases from 7.18 to 10.11, threshold = 4.00
# - 0 when ok or just rounded to closest 00
# - 1 if previous timestep is missing
# - 2 if next timestep is missing
# - 3 if previous and next timestep is missing
# - 4 if solved duplicated timesteps
# - 5 if needed to drop duplicated timesteps and select the last
flag_previous_missing = 1
flag_next_missing = 2
flag_isolated_timestep = 3
flag_solved_duplicated_timestep = 4
flag_dropped_duplicated_timestep = 5
qc_flag = np.zeros(adjusted_times.shape)

# Initialize list with the duplicated timesteps indices to drop
# - We drop the first occurrence because it is likely the shortest interval
idx_to_drop = []

# Attempt to resolve duplicates
if duplicates.size > 0:
    # Handle duplicates
    for dup_time in duplicates:
        # Indices of duplicates
        dup_indices = np.where(adjusted_times == dup_time)[0]
        n_duplicates = len(dup_indices)
        # Define previous and following timestep
        prev_time = dup_time - pd.Timedelta(seconds=sample_interval)
        next_time = dup_time + pd.Timedelta(seconds=sample_interval)
        # Try to find missing slots before and after
        # - If more than 3 duplicates, impossible to solve !
        count_solved = 0
        # If the previous timestep is available, set that one
        if n_duplicates == 2:
            if prev_time not in adjusted_times:
                adjusted_times[dup_indices[0]] = prev_time
                qc_flag[dup_indices[0]] = flag_solved_duplicated_timestep
                count_solved += 1
            elif next_time not in adjusted_times:
                adjusted_times[dup_indices[-1]] = next_time
                qc_flag[dup_indices[-1]] = flag_solved_duplicated_timestep
                count_solved += 1
            else:
                pass
        elif n_duplicates == 3:
            if prev_time not in adjusted_times:
                adjusted_times[dup_indices[0]] = prev_time
                qc_flag[dup_indices[0]] = flag_dropped_duplicated_timestep
                count_solved += 1
            if next_time not in adjusted_times:
                adjusted_times[dup_indices[-1]] = next_time
                qc_flag[dup_indices[-1]] = flag_solved_duplicated_timestep
                count_solved += 1
        if count_solved != n_duplicates - 1:
            idx_to_drop = np.append(idx_to_drop, dup_indices[0:-1])
            qc_flag[dup_indices[-1]] = flag_dropped_duplicated_timestep
            msg = (
                f"Cannot resolve {n_duplicates} duplicated timesteps "
                f"(after trailing seconds correction) around {dup_time}."
            )
            log_warning(logger=logger, msg=msg, verbose=verbose)
            if robust:
                raise ValueError(msg)

# Update the time coordinate (convert to ns for xarray compatibility)
ds = ds.assign_coords({"time": adjusted_times.astype("datetime64[ns]")})

# Update quality flag values where the next or previous timestep is missing
if add_quality_flag:
    idx_previous_missing, idx_next_missing, idx_isolated_missing = get_problematic_timestep_indices(
        adjusted_times,
        sample_interval,
    )
    qc_flag[idx_previous_missing] = np.maximum(qc_flag[idx_previous_missing], flag_previous_missing)
    qc_flag[idx_next_missing] = np.maximum(qc_flag[idx_next_missing], flag_next_missing)
    qc_flag[idx_isolated_missing] = np.maximum(qc_flag[idx_isolated_missing], flag_isolated_timestep)

    # If the first timestep is at 00:00 and currently flagged as previous missing (1), reset to 0
    # first_time = pd.to_datetime(adjusted_times[0]).time()
    # first_expected_time = pd.Timestamp("00:00:00").time()
    # if first_time == first_expected_time and qc_flag[0] == flag_previous_missing:
    #     qc_flag[0] = 0

    # # If the last timestep is flagged and currently flagged as next missing (2), reset it to 0
    # last_time = pd.to_datetime(adjusted_times[-1]).time()
    # last_time_expected = (pd.Timestamp("00:00:00") - pd.Timedelta(30, unit="seconds")).time()
    # # Check if adding one interval would go beyond the end_time
    # if last_time == last_time_expected and qc_flag[-1] == flag_next_missing:
    #     qc_flag[-1] = 0

    # Assign time quality flag coordinate
    ds["time_qc"] = xr.DataArray(qc_flag, dims="time")
    ds = ds.set_coords("time_qc")

# Drop duplicated timesteps
if len(idx_to_drop) > 0:
    idx_to_drop = idx_to_drop.astype(int)
    idx_valid_timesteps = np.arange(0, ds["time"].size)
    idx_valid_timesteps = np.delete(idx_valid_timesteps, idx_to_drop)
    ds = ds.isel(time=idx_valid_timesteps)
# Return dataset
return ds
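A minimal sketch of how the time_qc flags defined above might be consumed downstream, assuming ds is the dataset returned by regularize_timesteps:

# Keep only timesteps whose neighbours are both present (flag 0)
ds_complete = ds.isel(time=(ds["time_qc"] == 0).values)

# Count how many duplicated timesteps were resolved (flag 4) or dropped (flag 5)
n_solved = int((ds["time_qc"] == 4).sum())
n_dropped = int((ds["time_qc"] == 5).sum())
print(n_solved, n_dropped)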
❌ New issue: Bumpy Road Ahead
regularize_timesteps has 3 blocks with nested conditional logic. Any nesting of 2 or deeper is considered. Threshold is one single, nested block per function
@@ -0,0 +1,545 @@
# -----------------------------------------------------------------------------.
❌ New issue: Overall Code Complexity
This module has a mean cyclomatic complexity of 5.42 across 12 functions. The mean complexity threshold is 4
❌ New issue: Deep, Nested Complexity
regularize_timesteps has a nested complexity depth of 4, threshold = 4
❌ New issue: Excess Number of Function Arguments
regularize_timesteps has 6 arguments, threshold = 4
@@ -156,9 +156,7 @@ def df_sanitizer_fun(df):
"station_name",
"station_number",
"sensor_serial_number",
"sample_interval",
✅ Getting better: Large Method
reader decreases from 105 to 104 lines of code, threshold = 70
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: -0.60 (8.82 -> 8.22)
- Declining Code Health: 144 findings(s) 🚩
- Improving Code Health: 25 findings(s) ✅
- Affected Hotspots: 3 files(s) 🔥
def has_available_data(
    data_source,
    campaign_name,
    station_name,
    product,
    base_dir=None,
    # Option for L2E
    sample_interval=None,
    rolling=None,
    # Option for L2M
    model_name=None,
):
    """Return ``True`` if data are available for the given product and station."""
    # Define product directory
    data_dir = define_data_dir(
ℹ New issue: Excess Number of Function Arguments
has_available_data has 8 arguments, threshold = 4
def check_data_availability(
    product,
    data_source,
    campaign_name,
    station_name,
    base_dir=None,
    # Option for L2E
    sample_interval=None,
    rolling=None,
    # Option for L2M
    model_name=None,
):
    """Check the station product data directory has files inside. If not, raise an error."""
    if not has_available_data(
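A hypothetical usage sketch of the two helpers above; the station identifiers are placeholders, and the L2E options simply mirror the signatures shown:

# Raise an error if the station has no L1 files
check_data_availability(
    product="L1",
    data_source="EPFL",
    campaign_name="EXAMPLE_CAMPAIGN",
    station_name="STATION_1",
)

# Return True/False instead of raising, here for an L2E product
if has_available_data(
    data_source="EPFL",
    campaign_name="EXAMPLE_CAMPAIGN",
    station_name="STATION_1",
    product="L2E",
    sample_interval=60,   # seconds (assumed unit)
    rolling=False,
):
    print("L2E files found")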
ℹ New issue: Excess Number of Function Arguments
check_data_availability has 8 arguments, threshold = 4
@@ -17,15 +17,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Define paths within the DISDRODB infrastructure."""
❌ New issue: Lines of Code in a Single File
This module has 775 lines of code, improve code health by reducing it to 600
@@ -17,15 +17,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Define paths within the DISDRODB infrastructure."""
❌ New issue: Low Cohesion
This module has at least 5 different responsibilities amongst its 24 functions, threshold = 4
station_name : str
    Name of the station
❌ Getting worse: Code Duplication
introduced similar code in: define_data_dir_new,define_l0c_filename,define_l1_filename,define_l2e_filename and 3 more functions
def __init__(self, N0=1.0, mu=0.0, Lambda=1.0, Dmin=0, Dmax=None, coverage=0.999):
    # Define parameters
    self.N0 = N0
    self.Lambda = Lambda
    self.mu = mu
    self.parameters = {"N0": self.N0, "mu": self.mu, "Lambda": self.Lambda}
    # Define Dmin and Dmax
    self.Dmin = Dmin
    if Dmax is not None:
        self.Dmax = Dmax
    else:
        dmax = gamma.ppf(coverage, a=self.mu + 1.0, scale=1.0 / self.Lambda)
        if isinstance(self.Lambda, xr.DataArray):
            self.Dmax = xr.DataArray(dmax, dims=self.Lambda.dims, coords=self.Lambda.coords)
        else:
            self.Dmax = dmax
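A small sketch of how the default Dmax follows from the coverage argument in the constructor above; the parameter values are hypothetical:

from scipy.stats import gamma

mu, Lambda, coverage = 2.0, 2.5, 0.999
# Diameter below which 99.9% of the gamma distribution mass lies,
# used as the upper bound when Dmax is not provided explicitly
dmax = gamma.ppf(coverage, a=mu + 1.0, scale=1.0 / Lambda)
print(round(dmax, 2))  # ~4.49, in the same units as 1/Lambda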
❌ New issue: Excess Number of Function Arguments
GammaPSD.__init__ has 6 arguments, threshold = 4
def _estimate_empirical_radar_parameters(
    drop_number_concentration,
    bin_edges,
    scatterer,
    output_dictionary,
):
    # Initialize bad results
    if output_dictionary:
        null_output = {"Zh": np.nan, "Zdr": np.nan, "rho_hv": np.nan, "ldr": np.nan, "Kdp": np.nan, "Ai": np.nan}
    else:
        null_output = np.array([np.nan, np.nan, np.nan, np.nan, np.nan, np.nan])

    # Assign PSD model to the scatterer object
    scatterer.psd = BinnedPSD(bin_edges, drop_number_concentration)

    # Get radar variables
    with suppress_warnings():
        try:
            radar_vars = compute_radar_variables(scatterer)
            output = radar_vars if output_dictionary else np.array(list(radar_vars.values()))
        except Exception:
            output = null_output
    return output
❌ New issue: Code Duplication
The module contains 4 functions with similar structure: _estimate_empirical_radar_parameters,_estimate_model_radar_parameters,get_empirical_radar_parameters,get_model_radar_parameters
def get_radar_parameters(
    ds,
    radar_band=None,
    canting_angle_std=7,
    diameter_max=8,
    axis_ratio="Thurai2007",
    parallel=True,
):
    """Compute radar parameters from empirical drop number concentration or PSD model.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset containing the drop number concentration variable.
    radar_band : str or list of str, optional
        Radar band(s) to be used.
        If ``None`` (the default), all available radar bands are used.
    canting_angle_std : float or list of float, optional
        Standard deviation of the canting angle. The default value is 7.
    diameter_max : float or list of float, optional
        Maximum diameter. The default value is 8 mm.
    axis_ratio : str or list of str, optional
        Method to compute the axis ratio. The default method is ``Thurai2007``.
    parallel : bool, optional
        Whether to compute radar variables in parallel.
        The default value is ``True``.

    Returns
    -------
    xarray.Dataset
        Dataset containing the computed radar parameters.
    """
    # Decide whether to simulate radar parameters based on empirical PSD or model PSD
    if "disdrodb_psd_model" not in ds.attrs and "drop_number_concentration" not in ds:
        raise ValueError("The input dataset is not a DISDRODB L2E or L2M product.")
    # Model-based simulation
    if "disdrodb_psd_model" in ds.attrs:
        func = get_model_radar_parameters
        ds_subset = get_psd_parameters(ds).compute()
    # Empirical PSD simulation
    else:
        func = get_empirical_radar_parameters
        ds_subset = ds[["drop_number_concentration"]].compute()

    # Initialize radar band if not provided
    if radar_band is None:
        radar_band = available_radar_bands()

    # Ensure parameters are list
    diameter_max = np.atleast_1d(diameter_max)
    canting_angle_std = np.atleast_1d(canting_angle_std)
    axis_ratio = np.atleast_1d(axis_ratio)
    radar_band = np.atleast_1d(radar_band)

    # Check parameters validity
    axis_ratio = [check_axis_ratio(method) for method in axis_ratio]
    radar_band = [check_radar_band(band) for band in radar_band]

    # Retrieve combination of parameters
    list_params = [
        {
            "radar_band": rb.item(),
            "canting_angle_std": cas.item(),
            "axis_ratio": ar.item(),
            "diameter_max": d_max.item(),
        }
        for rb, cas, ar, d_max in itertools.product(radar_band, canting_angle_std, axis_ratio, diameter_max)
    ]

    # Compute radar variables for each configuration in parallel
    # - The function expects the data into memory (no dask arrays !)
    if parallel:
        list_ds = [dask.delayed(func)(ds_subset, **params) for params in list_params]
        list_ds = dask.compute(*list_ds)
    else:
        list_ds = [func(ds_subset, **params) for params in list_params]

    # Merge into a single dataset
    ds_radar = xr.merge(list_ds)

    # Copy global attributes from input dataset
    ds_radar.attrs = ds.attrs.copy()

    # Remove single dimensions (add info to attributes)
    parameters = ["radar_band", "canting_angle_std", "axis_ratio", "diameter_max"]
    for param in parameters:
        if ds_radar.sizes[param] == 1:
            ds_radar.attrs[f"disdrodb_scattering_{param}"] = ds_radar[param].item()
    ds_radar = ds_radar.squeeze()
    return ds_radar
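A hypothetical usage sketch of the function above; the file path is an assumption, and the keyword arguments follow the documented signature:

import xarray as xr

ds = xr.open_dataset("station_L2E.nc")  # hypothetical DISDRODB L2E file
ds_radar = get_radar_parameters(
    ds,
    radar_band=["C", "X"],       # simulate two radar bands
    canting_angle_std=7,
    diameter_max=8,
    axis_ratio="Thurai2007",
    parallel=False,              # compute the configurations sequentially
)
print(ds_radar)                  # merged dataset with one entry per configuration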
❌ New issue: Complex Method
get_radar_parameters has a cyclomatic complexity of 16, threshold = 9
def _estimate_model_radar_parameters(
    parameters,
    psd_model,
    psd_parameters_names,
    scatterer,
    output_dictionary,
):
    # Initialize bad results
    if output_dictionary:
        null_output = {"Zh": np.nan, "Zdr": np.nan, "rho_hv": np.nan, "ldr": np.nan, "Kdp": np.nan, "Ai": np.nan}
    else:
        null_output = np.array([np.nan, np.nan, np.nan, np.nan, np.nan, np.nan])

    # Assign PSD model to the scatterer object
    parameters = dict(zip(psd_parameters_names, parameters))
    scatterer.psd = create_psd(psd_model, parameters)

    # Get radar variables
    with suppress_warnings():
        try:
            radar_vars = compute_radar_variables(scatterer)
            output = radar_vars if output_dictionary else np.array(list(radar_vars.values()))
        except Exception:
            output = null_output
    return output
❌ New issue: Excess Number of Function Arguments
_estimate_model_radar_parameters has 5 arguments, threshold = 4
❌ New issue: Excess Number of Function Arguments
get_radar_parameters has 6 arguments, threshold = 4
Prework
What kind of change does this PR introduce? (check at least one)
Does this PR introduce a breaking change? (check one)
If yes, please describe the impact and communicate accordingly:
The PR fulfills these requirements:
reader-<institute>-<campaign>
bugfix-<some_key>-<word>
doc-<some_key>-<word>
feature-<some_key>-<word>
refactor-<some_key>-<word>
optimize-<some_key>-<word>
fix #xxx[,#xxx], where "xxx" is the issue number)
Summary
Add routines for L1 and L2 product generation