Coverage for src/gwtransport/utils.py: 22%
182 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-22 18:30 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-22 18:30 +0000
1"""General utilities for the 1D groundwater transport model."""
3from collections.abc import Sequence
5import numpy as np
6import numpy.typing as npt
7import pandas as pd
8from scipy import interpolate
11def linear_interpolate(x_ref, y_ref, x_query, left=None, right=None):
12 """
13 Linear interpolation on monotonically increasing data.
15 Parameters
16 ----------
17 x_ref : array-like
18 Reference vector with sorted x-values.
19 y_ref : array-like
20 Reference vector with y-values.
21 x_query : array-like
22 Query x-values. Array may have any shape.
23 left : float, optional
24 Value to return for x_query < x_ref[0].
25 - If `left` is set to None, x_query = x_ref[0].
26 - If `left` is set to a float, such as np.nan, this value is returned.
27 right : float, optional
28 Value to return for x_query > x_ref[-1].
29 - If `right` is set to None, x_query = x_ref[-1].
30 - If `right` is set to a float, such as np.nan, this value is returned.
32 Returns
33 -------
34 array
35 Interpolated y-values.
36 """
37 x_ref = np.asarray(x_ref)
38 y_ref = np.asarray(y_ref)
39 x_query = np.asarray(x_query)
41 # Find indices where x_query would be inserted in x_ref
42 idx_no_edges = np.searchsorted(x_ref, x_query)
44 idx = np.clip(idx_no_edges, 1, len(x_ref) - 1)
46 # Calculate interpolation weights
47 x0 = x_ref[idx - 1]
48 x1 = x_ref[idx]
49 y0 = y_ref[idx - 1]
50 y1 = y_ref[idx]
52 # Perform linear interpolation
53 weights = (x_query - x0) / (x1 - x0)
54 y_query = y0 + weights * (y1 - y0)
56 # Handle edge cases
57 if left is None:
58 y_query = np.where(x_query < x_ref[0], y_ref[0], y_query)
59 if right is None:
60 y_query = np.where(x_query > x_ref[-1], y_ref[-1], y_query)
61 if isinstance(left, float):
62 y_query = np.where(x_query < x_ref[0], left, y_query)
63 if isinstance(right, float):
64 y_query = np.where(x_query > x_ref[-1], right, y_query)
66 return y_query
69def interp_series(series, index_new, **interp1d_kwargs):
70 """
71 Interpolate a pandas.Series to a new index.
73 Parameters
74 ----------
75 series : pandas.Series
76 Series to interpolate.
77 index_new : pandas.DatetimeIndex
78 New index to interpolate to.
79 interp1d_kwargs : dict, optional
80 Keyword arguments passed to scipy.interpolate.interp1d. Default is {}.
82 Returns
83 -------
84 pandas.Series
85 Interpolated series.
86 """
87 series = series[series.index.notna() & series.notna()]
88 dt = (series.index - series.index[0]) / pd.to_timedelta(1, unit="D")
89 dt_interp = (index_new - series.index[0]) / pd.to_timedelta(1, unit="D")
90 interp_obj = interpolate.interp1d(dt, series.values, bounds_error=False, **interp1d_kwargs)
91 return interp_obj(dt_interp)
94def diff(a, alignment="centered"):
95 """Compute the cell widths for a given array of cell coordinates.
97 If alignment is "centered", the coordinates are assumed to be centered in the cells.
98 If alignment is "left", the coordinates are assumed to be at the left edge of the cells.
99 If alignment is "right", the coordinates are assumed to be at the right edge of the cells.
101 Parameters
102 ----------
103 a : array-like
104 Input array.
106 Returns
107 -------
108 array
109 Array with differences between elements.
110 """
111 if alignment == "centered":
112 mid = a[:-1] + (a[1:] - a[:-1]) / 2
113 return np.concatenate((a[[1]] - a[[0]], mid[1:] - mid[:-1], a[[-1]] - a[[-2]]))
114 if alignment == "left":
115 return np.concatenate((a[1:] - a[:-1], a[[-1]] - a[[-2]]))
116 if alignment == "right":
117 return np.concatenate((a[[1]] - a[[0]], a[1:] - a[:-1]))
119 msg = f"Invalid alignment: {alignment}"
120 raise ValueError(msg)
123def linear_average( # noqa: C901
124 x_data: Sequence[float] | npt.NDArray[np.float64],
125 y_data: Sequence[float] | npt.NDArray[np.float64],
126 x_edges: Sequence[float] | npt.NDArray[np.float64],
127 extrapolate_method: str = "nan",
128) -> npt.NDArray[np.float64]:
129 """
130 Compute the average value of a piecewise linear time series between specified x-edges.
132 Parameters
133 ----------
134 x_data : array-like
135 x-coordinates of the time series data points, must be in ascending order
136 y_data : array-like
137 y-coordinates of the time series data points
138 x_edges : array-like
139 x-coordinates of the integration edges. Can be 1D or 2D.
140 - If 1D: shape (n_edges,). Can be 1D or 2D.
141 - If 1D: shape (n_edges,), must be in ascending order
142 - If 2D: shape (n_series, n_edges), each row must be in ascending order
143 - If 2D: shape (n_series, n_edges), each row must be in ascending order
144 extrapolate_method : str, optional
145 Method for handling extrapolation. Default is 'nan'.
146 - 'outer': Extrapolate using the outermost data points.
147 - 'nan': Extrapolate using np.nan.
148 - 'raise': Raise an error for out-of-bounds values.
150 Returns
151 -------
152 numpy.ndarray
153 2D array of average values between consecutive pairs of x_edges.
154 Shape is (n_series, n_bins) where n_bins = n_edges - 1.
155 If x_edges is 1D, n_series = 1.
157 Examples
158 --------
159 >>> x_data = [0, 1, 2, 3]
160 >>> y_data = [0, 1, 1, 0]
161 >>> x_edges = [0, 1.5, 3]
162 >>> linear_average(x_data, y_data, x_edges)
163 array([[0.667, 0.667]])
165 >>> x_edges_2d = [[0, 1.5, 3], [0.5, 2, 3]]
166 >>> linear_average(x_data, y_data, x_edges_2d)
167 array([[0.667, 0.667], [0.75, 0.5]])
168 """
169 # Convert inputs to numpy arrays
170 x_data = np.asarray(x_data, dtype=float)
171 y_data = np.asarray(y_data, dtype=float)
172 x_edges = np.asarray(x_edges, dtype=float)
174 # Ensure x_edges is always 2D
175 if x_edges.ndim == 1:
176 x_edges = x_edges[np.newaxis, :]
177 elif x_edges.ndim != 2: # noqa: PLR2004
178 msg = "x_edges must be 1D or 2D array"
179 raise ValueError(msg)
181 # Input validation
182 if len(x_data) != len(y_data) or len(x_data) == 0:
183 msg = "x_data and y_data must have the same length and be non-empty"
184 raise ValueError(msg)
185 if x_edges.shape[1] < 2: # noqa: PLR2004
186 msg = "x_edges must contain at least 2 values in each row"
187 raise ValueError(msg)
188 if not np.all(np.diff(x_data) >= 0):
189 msg = "x_data must be in ascending order"
190 raise ValueError(msg)
191 if not np.all(np.diff(x_edges, axis=1) >= 0):
192 msg = "x_edges must be in ascending order along each row"
193 raise ValueError(msg)
195 # Filter out NaN values
196 show = ~np.isnan(x_data) & ~np.isnan(y_data)
197 if show.sum() < 2: # noqa: PLR2004
198 if show.sum() == 1 and extrapolate_method == "outer":
199 # For single data point with outer extrapolation, use constant value
200 constant_value = y_data[show][0]
201 return np.full(shape=(x_edges.shape[0], x_edges.shape[1] - 1), fill_value=constant_value)
202 return np.full(shape=(x_edges.shape[0], x_edges.shape[1] - 1), fill_value=np.nan)
204 x_data_clean = x_data[show]
205 y_data_clean = y_data[show]
207 # Handle extrapolation for all series at once (vectorized)
208 if extrapolate_method == "outer":
209 edges_processed = np.clip(x_edges, x_data_clean.min(), x_data_clean.max())
210 elif extrapolate_method == "raise":
211 if np.any(x_edges < x_data_clean.min()) or np.any(x_edges > x_data_clean.max()):
212 msg = "x_edges must be within the range of x_data"
213 raise ValueError(msg)
214 edges_processed = x_edges.copy()
215 else: # nan method
216 edges_processed = x_edges.copy()
218 # Create a combined grid of all unique x points (data + all edges)
219 all_unique_x = np.unique(np.concatenate([x_data_clean, edges_processed.ravel()]))
221 # Interpolate y values at all unique x points once
222 all_unique_y = np.interp(all_unique_x, x_data_clean, y_data_clean, left=np.nan, right=np.nan)
224 # Compute cumulative integrals once using trapezoidal rule
225 dx = np.diff(all_unique_x)
226 y_avg = (all_unique_y[:-1] + all_unique_y[1:]) / 2
227 segment_integrals = dx * y_avg
228 # Replace NaN values with 0 to avoid breaking cumulative sum
229 segment_integrals = np.nan_to_num(segment_integrals, nan=0.0)
230 cumulative_integral = np.concatenate([[0], np.cumsum(segment_integrals)])
232 # Vectorized computation for all series
233 # Find indices of all edges in the combined grid
234 edge_indices = np.searchsorted(all_unique_x, edges_processed)
236 # Compute integral between consecutive edges for all series (vectorized)
237 integral_values = cumulative_integral[edge_indices[:, 1:]] - cumulative_integral[edge_indices[:, :-1]]
239 # Compute widths between consecutive edges for all series (vectorized)
240 edge_widths = np.diff(edges_processed, axis=1)
242 # Handle zero-width intervals (vectorized)
243 zero_width_mask = edge_widths == 0
244 result = np.zeros_like(edge_widths)
246 # For non-zero width intervals, compute average = integral / width (vectorized)
247 non_zero_mask = ~zero_width_mask
248 result[non_zero_mask] = integral_values[non_zero_mask] / edge_widths[non_zero_mask]
250 # For zero-width intervals, interpolate y-value directly (vectorized)
251 if np.any(zero_width_mask):
252 zero_width_positions = edges_processed[:, :-1][zero_width_mask]
253 result[zero_width_mask] = np.interp(zero_width_positions, x_data_clean, y_data_clean)
255 # Handle extrapolation when 'nan' method is used (vectorized)
256 if extrapolate_method == "nan":
257 # Set out-of-range bins to NaN
258 bins_within_range = (x_edges[:, :-1] >= x_data_clean.min()) & (x_edges[:, 1:] <= x_data_clean.max())
259 result[~bins_within_range] = np.nan
261 return result
264def partial_isin(bin_edges_in, bin_edges_out):
265 """
266 Calculate the fraction of each input bin that overlaps with each output bin.
268 This function computes a matrix where element (i, j) represents the fraction
269 of input bin i that overlaps with output bin j. The computation uses
270 vectorized operations to avoid loops.
272 Parameters
273 ----------
274 bin_edges_in : array_like
275 1D array of input bin edges in ascending order. For n_in bins, there
276 should be n_in+1 edges.
277 bin_edges_out : array_like
278 1D array of output bin edges in ascending order. For n_out bins, there
279 should be n_out+1 edges.
281 Returns
282 -------
283 overlap_matrix : numpy.ndarray
284 Dense matrix of shape (n_in, n_out) where n_in is the number of input
285 bins and n_out is the number of output bins. Each element (i, j)
286 represents the fraction of input bin i that overlaps with output bin j.
287 Values range from 0 (no overlap) to 1 (complete overlap).
289 Notes
290 -----
291 - Both input arrays must be sorted in ascending order
292 - The function leverages the sorted nature of both inputs for efficiency
293 - Uses vectorized operations to handle large bin arrays efficiently
294 - All overlaps sum to 1.0 for each input bin when output bins fully cover input range
296 Examples
297 --------
298 >>> bin_edges_in = np.array([0, 10, 20, 30])
299 >>> bin_edges_out = np.array([5, 15, 25])
300 >>> partial_isin(bin_edges_in, bin_edges_out)
301 array([[0.5, 0.0],
302 [0.5, 0.5],
303 [0.0, 0.5]])
304 """
305 # Convert inputs to numpy arrays
306 bin_edges_in = np.asarray(bin_edges_in, dtype=float)
307 bin_edges_out = np.asarray(bin_edges_out, dtype=float)
309 # Validate inputs
310 if bin_edges_in.ndim != 1 or bin_edges_out.ndim != 1:
311 msg = "Both bin_edges_in and bin_edges_out must be 1D arrays"
312 raise ValueError(msg)
313 if len(bin_edges_in) < 2 or len(bin_edges_out) < 2: # noqa: PLR2004
314 msg = "Both edge arrays must have at least 2 elements"
315 raise ValueError(msg)
316 if not np.all(np.diff(bin_edges_in) > 0):
317 msg = "bin_edges_in must be in ascending order"
318 raise ValueError(msg)
319 if not np.all(np.diff(bin_edges_out) > 0):
320 msg = "bin_edges_out must be in ascending order"
321 raise ValueError(msg)
323 # Create bin widths
324 bin_widths_in = np.diff(bin_edges_in)
326 # Build matrix using fully vectorized approach
327 # Create meshgrids for all possible input-output bin combinations
328 in_left = bin_edges_in[:-1, None] # Shape: (n_bins_in, 1)
329 in_right = bin_edges_in[1:, None] # Shape: (n_bins_in, 1)
330 in_width = bin_widths_in[:, None] # Shape: (n_bins_in, 1)
332 out_left = bin_edges_out[None, :-1] # Shape: (1, n_bins_out)
333 out_right = bin_edges_out[None, 1:] # Shape: (1, n_bins_out)
335 # Calculate overlaps for all combinations using broadcasting
336 overlap_left = np.maximum(in_left, out_left) # Shape: (n_bins_in, n_bins_out)
337 overlap_right = np.minimum(in_right, out_right) # Shape: (n_bins_in, n_bins_out)
339 # Calculate overlap widths (zero where no overlap)
340 overlap_widths = np.maximum(0, overlap_right - overlap_left)
342 # Calculate fractions
343 return overlap_widths / in_width
346def generate_failed_coverage_badge():
347 """Generate a badge indicating failed coverage."""
348 from genbadge import Badge # type: ignore # noqa: PLC0415
350 b = Badge(left_txt="coverage", right_txt="failed", color="red")
351 b.write_to("coverage_failed.svg", use_shields=False)
354def combine_bin_series(a, a_edges, b, b_edges, extrapolation=0.0):
355 """
356 Combine two binned series onto a common set of unique edges.
358 This function takes two binned series (a and b) with their respective bin edges
359 and creates new series (c and d) that are defined on a combined set of unique
360 edges from both input edge arrays.
362 Parameters
363 ----------
364 a : array-like
365 Values for the first binned series.
366 a_edges : array-like
367 Bin edges for the first series. Must have len(a) + 1 elements.
368 b : array-like
369 Values for the second binned series.
370 b_edges : array-like
371 Bin edges for the second series. Must have len(b) + 1 elements.
372 extrapolation : str or float, optional
373 Method for handling combined bins that fall outside the original series ranges.
374 - 'nearest': Use the nearest original bin value
375 - float value (e.g., np.nan, 0.0): Fill with the specified value (default: 0.0)
377 Returns
378 -------
379 c : numpy.ndarray
380 Values from series a mapped to the combined edge structure.
381 c_edges : numpy.ndarray
382 Combined unique edges from a_edges and b_edges.
383 d : numpy.ndarray
384 Values from series b mapped to the combined edge structure.
385 d_edges : numpy.ndarray
386 Combined unique edges from a_edges and b_edges (same as c_edges).
388 Notes
389 -----
390 The combined edges are created by taking the union of all unique values
391 from both a_edges and b_edges, sorted in ascending order. The values
392 are then broadcasted/repeated for each combined bin that falls within
393 the original bin's range.
394 """
395 # Convert inputs to numpy arrays
396 a = np.asarray(a, dtype=float)
397 a_edges = np.asarray(a_edges, dtype=float)
398 b = np.asarray(b, dtype=float)
399 b_edges = np.asarray(b_edges, dtype=float)
401 # Validate inputs
402 if len(a_edges) != len(a) + 1:
403 msg = "a_edges must have len(a) + 1 elements"
404 raise ValueError(msg)
405 if len(b_edges) != len(b) + 1:
406 msg = "b_edges must have len(b) + 1 elements"
407 raise ValueError(msg)
409 # Create combined unique edges
410 combined_edges = np.unique(np.concatenate([a_edges, b_edges]))
412 # Initialize output arrays
413 c = np.zeros(len(combined_edges) - 1)
414 d = np.zeros(len(combined_edges) - 1)
416 # Vectorized mapping using searchsorted - find which original bin each combined bin belongs to
417 # For series a: find which original bin each combined bin center falls into
418 combined_bin_centers = (combined_edges[:-1] + combined_edges[1:]) / 2
419 a_bin_assignment = np.searchsorted(a_edges, combined_bin_centers, side="right") - 1
420 a_bin_assignment = np.clip(a_bin_assignment, 0, len(a) - 1)
422 # Handle extrapolation for series a
423 if extrapolation == "nearest":
424 # Assign all values using nearest neighbor (already clipped)
425 c[:] = a[a_bin_assignment]
426 else:
427 # Only assign values where the combined bin is completely within the original bin
428 a_valid_mask = (combined_edges[:-1] >= a_edges[a_bin_assignment]) & (
429 combined_edges[1:] <= a_edges[a_bin_assignment + 1]
430 )
431 c[a_valid_mask] = a[a_bin_assignment[a_valid_mask]]
432 # Fill out-of-range bins with extrapolation value
433 c[~a_valid_mask] = extrapolation
435 # Handle extrapolation for series b
436 b_bin_assignment = np.searchsorted(b_edges, combined_bin_centers, side="right") - 1
437 b_bin_assignment = np.clip(b_bin_assignment, 0, len(b) - 1)
439 if extrapolation == "nearest":
440 # Assign all values using nearest neighbor (already clipped)
441 d[:] = b[b_bin_assignment]
442 else:
443 # Only assign values where the combined bin is completely within the original bin
444 b_valid_mask = (combined_edges[:-1] >= b_edges[b_bin_assignment]) & (
445 combined_edges[1:] <= b_edges[b_bin_assignment + 1]
446 )
447 d[b_valid_mask] = b[b_bin_assignment[b_valid_mask]]
448 # Fill out-of-range bins with extrapolation value
449 d[~b_valid_mask] = extrapolation
451 # Return the combined series
452 c_edges = combined_edges
453 d_edges = combined_edges.copy()
455 return c, c_edges, d, d_edges
458def compute_time_edges(tedges, tstart, tend, number_of_bins):
459 """
460 Compute time edges for binning data based on provided time parameters.
462 This function creates a DatetimeIndex of time bin edges from one of three possible
463 input formats: explicit edges, start times, or end times. The resulting edges
464 define the boundaries of time intervals for data binning.
466 Define either explicit time edges, or start and end times for each bin and leave the others at None.
468 Parameters
469 ----------
470 tedges : pandas.DatetimeIndex or None
471 Explicit time edges for the bins. If provided, must have one more element
472 than the number of bins (n_bins + 1). Takes precedence over tstart and tend.
473 tstart : pandas.DatetimeIndex or None
474 Start times for each bin. Must have the same number of elements as the
475 number of bins. Used when tedges is None.
476 tend : pandas.DatetimeIndex or None
477 End times for each bin. Must have the same number of elements as the
478 number of bins. Used when both tedges and tstart are None.
479 number_of_bins : int
480 The expected number of time bins. Used for validation against the provided
481 time parameters.
483 Returns
484 -------
485 pandas.DatetimeIndex
486 Time edges defining the boundaries of the time bins. Has one more element
487 than number_of_bins.
489 Raises
490 ------
491 ValueError
492 If tedges has incorrect length (not number_of_bins + 1).
493 If tstart has incorrect length (not equal to number_of_bins).
494 If tend has incorrect length (not equal to number_of_bins).
495 If none of tedges, tstart, or tend are provided.
497 Notes
498 -----
499 - When using tstart, the function assumes uniform spacing and extrapolates
500 the final edge based on the spacing between the last two start times.
501 - When using tend, the function assumes uniform spacing and extrapolates
502 the first edge based on the spacing between the first two end times.
503 - All input time data is converted to pandas.DatetimeIndex for consistency.
504 """
505 if tedges is not None:
506 tedges = pd.DatetimeIndex(tedges)
507 if number_of_bins != len(tedges) - 1:
508 msg = "tedges must have one more element than flow"
509 raise ValueError(msg)
511 elif tstart is not None:
512 # Assume the index refers to the time at the start of the measurement interval
513 tstart = pd.DatetimeIndex(tstart)
514 if number_of_bins != len(tstart):
515 msg = "tstart must have the same number of elements as flow"
516 raise ValueError(msg)
518 tedges = tstart.append(tstart[[-1]] + (tstart[-1] - tstart[-2]))
520 elif tend is not None:
521 # Assume the index refers to the time at the end of the measurement interval
522 tend = pd.DatetimeIndex(tend)
523 if number_of_bins != len(tend):
524 msg = "tend must have the same number of elements as flow"
525 raise ValueError(msg)
527 tedges = (tend[[0]] - (tend[1] - tend[0])).append(tend)
529 else:
530 msg = "Either provide tedges, tstart, and tend"
531 raise ValueError(msg)
532 return tedges