Coverage for src/gwtransport/utils.py: 91%
142 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-06 15:45 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-06 15:45 +0000
1"""General utilities for the 1D groundwater transport model."""
3from collections.abc import Sequence
5import numpy as np
6import numpy.typing as npt
7import pandas as pd
8from scipy import interpolate
11def linear_interpolate(x_ref, y_ref, x_query, left=None, right=None):
12 """
13 Linear interpolation on monotonically increasing data.
15 Parameters
16 ----------
17 x_ref : array-like
18 Reference vector with sorted x-values.
19 y_ref : array-like
20 Reference vector with y-values.
21 x_query : array-like
22 Query x-values. Array may have any shape.
23 left : float, optional
24 Value to return for x_query < x_ref[0].
25 - If `left` is set to None, x_query = x_ref[0].
26 - If `left` is set to a float, such as np.nan, this value is returned.
27 right : float, optional
28 Value to return for x_query > x_ref[-1].
29 - If `right` is set to None, x_query = x_ref[-1].
30 - If `right` is set to a float, such as np.nan, this value is returned.
32 Returns
33 -------
34 array
35 Interpolated y-values.
36 """
37 x_ref = np.asarray(x_ref)
38 y_ref = np.asarray(y_ref)
39 x_query = np.asarray(x_query)
41 # Find indices where x_query would be inserted in x_ref
42 idx_no_edges = np.searchsorted(x_ref, x_query)
44 idx = np.clip(idx_no_edges, 1, len(x_ref) - 1)
46 # Calculate interpolation weights
47 x0 = x_ref[idx - 1]
48 x1 = x_ref[idx]
49 y0 = y_ref[idx - 1]
50 y1 = y_ref[idx]
52 # Perform linear interpolation
53 weights = (x_query - x0) / (x1 - x0)
54 y_query = y0 + weights * (y1 - y0)
56 # Handle edge cases
57 if left is None:
58 y_query = np.where(x_query < x_ref[0], y_ref[0], y_query)
59 if right is None:
60 y_query = np.where(x_query > x_ref[-1], y_ref[-1], y_query)
61 if isinstance(left, float):
62 y_query = np.where(x_query < x_ref[0], left, y_query)
63 if isinstance(right, float):
64 y_query = np.where(x_query > x_ref[-1], right, y_query)
66 return y_query
69def interp_series(series, index_new, **interp1d_kwargs):
70 """
71 Interpolate a pandas.Series to a new index.
73 Parameters
74 ----------
75 series : pandas.Series
76 Series to interpolate.
77 index_new : pandas.DatetimeIndex
78 New index to interpolate to.
79 interp1d_kwargs : dict, optional
80 Keyword arguments passed to scipy.interpolate.interp1d. Default is {}.
82 Returns
83 -------
84 pandas.Series
85 Interpolated series.
86 """
87 series = series[series.index.notna() & series.notna()]
88 dt = (series.index - series.index[0]) / pd.to_timedelta(1, unit="D")
89 dt_interp = (index_new - series.index[0]) / pd.to_timedelta(1, unit="D")
90 interp_obj = interpolate.interp1d(dt, series.values, bounds_error=False, **interp1d_kwargs)
91 return interp_obj(dt_interp)
94def diff(a, alignment="centered"):
95 """Compute the cell widths for a given array of cell coordinates.
97 If alignment is "centered", the coordinates are assumed to be centered in the cells.
98 If alignment is "left", the coordinates are assumed to be at the left edge of the cells.
99 If alignment is "right", the coordinates are assumed to be at the right edge of the cells.
101 Parameters
102 ----------
103 a : array-like
104 Input array.
106 Returns
107 -------
108 array
109 Array with differences between elements.
110 """
111 if alignment == "centered":
112 mid = a[:-1] + (a[1:] - a[:-1]) / 2
113 return np.concatenate((a[[1]] - a[[0]], mid[1:] - mid[:-1], a[[-1]] - a[[-2]]))
114 if alignment == "left":
115 return np.concatenate((a[1:] - a[:-1], a[[-1]] - a[[-2]]))
116 if alignment == "right":
117 return np.concatenate((a[[1]] - a[[0]], a[1:] - a[:-1]))
119 msg = f"Invalid alignment: {alignment}"
120 raise ValueError(msg)
123def linear_average( # noqa: C901
124 x_data: Sequence[float] | npt.NDArray[np.float64],
125 y_data: Sequence[float] | npt.NDArray[np.float64],
126 x_edges: Sequence[float] | npt.NDArray[np.float64],
127 extrapolate_method: str = "nan",
128) -> npt.NDArray[np.float64]:
129 """
130 Compute the average value of a piecewise linear time series between specified x-edges.
132 Parameters
133 ----------
134 x_data : array-like
135 x-coordinates of the time series data points, must be in ascending order
136 y_data : array-like
137 y-coordinates of the time series data points
138 x_edges : array-like
139 x-coordinates of the integration edges, must be in ascending order
140 extrapolate_method : str, optional
141 Method for handling extrapolation. Default is 'nan'.
142 - 'outer': Extrapolate using the outermost data points.
143 - 'nan': Extrapolate using np.nan.
144 - 'raise': Raise an error for out-of-bounds values.
146 Returns
147 -------
148 numpy.ndarray
149 Array of average values between consecutive pairs of x_edges
151 Examples
152 --------
153 >>> x_data = [0, 1, 2, 3]
154 >>> y_data = [0, 1, 1, 0]
155 >>> x_edges = [0, 1.5, 3]
156 >>> linear_average(x_data, y_data, x_edges)
157 array([0.667, 0.667])
158 """
159 # Input validation
160 if len(x_data) != len(y_data) and len(x_data) > 0:
161 msg = "x_data and y_data must have the same length and be non-empty"
162 raise ValueError(msg)
163 if len(x_edges) < 2: # noqa: PLR2004
164 msg = "x_edges_in_range must contain at least 2 values"
165 raise ValueError(msg)
166 if not np.all(np.diff(x_data) >= 0):
167 msg = "x_data must be in ascending order"
168 raise ValueError(msg)
169 if not np.all(np.diff(x_edges) >= 0):
170 msg = "x_edges must be in ascending order"
171 raise ValueError(msg)
173 show = ~np.isnan(x_data) & ~np.isnan(y_data)
175 if (show.sum() < 2 and extrapolate_method == "nan") or show.sum() == 0: # noqa: PLR2004
176 return np.full(shape=len(x_edges) - 1, fill_value=np.nan)
178 x_data = np.asarray(x_data, dtype=float)[show]
179 y_data = np.asarray(y_data, dtype=float)[show]
180 x_edges = np.asarray(x_edges, dtype=float)
182 # Extrapolate
183 if extrapolate_method == "outer":
184 # bins with x_edges ouside the range of x_data should be nan
185 # Zero-widths are handles at the end of this function
186 x_edges_in_range = np.clip(x_edges, x_data.min(), x_data.max())
187 elif extrapolate_method == "nan":
188 # bins with x_edges ouside the range of x_data should be nan
189 is_within_range = (x_edges >= x_data.min()) & (x_edges <= x_data.max())
190 x_edges_in_range = x_edges[is_within_range]
191 elif extrapolate_method == "raise":
192 if np.any(x_edges < x_data.min()) or np.any(x_edges > x_data.max()):
193 msg = "x_edges must be within the range of x_data"
194 raise ValueError(msg)
195 else:
196 msg = "extrapolate_method must be 'outer', 'nan', or 'raise'"
197 raise ValueError(msg)
199 # Create a combined array of all x points
200 all_x = np.concatenate([x_data, x_edges_in_range])
202 # Get unique values and inverse indices
203 unique_x, inverse_indices = np.unique(all_x, return_inverse=True)
205 # Get indices of where the edges are in the unique array
206 edge_indices = inverse_indices[len(x_data) : len(all_x)]
208 # Interpolate y values at all unique x points
209 unique_y = np.interp(unique_x, x_data, y_data, left=np.nan, right=np.nan)
211 # Compute segment-wise integrals using the trapezoidal rule
212 dx = np.diff(unique_x)
213 y_avg = (unique_y[:-1] + unique_y[1:]) / 2
214 segment_integrals = dx * y_avg
216 # Compute cumulative integral
217 cumulative_integral = np.concatenate([[0], np.cumsum(segment_integrals)])
219 # Compute integral between consecutive edges
220 integral_values = np.diff(cumulative_integral[edge_indices])
222 # Compute widths between consecutive edges
223 edge_widths = np.diff(x_edges_in_range)
225 # Handle zero-width intervals and non-zero width intervals in a vectorized way
226 zero_width_mask = edge_widths == 0
228 # For non-zero width intervals, compute average = integral / width
229 average_values_in_range = np.zeros_like(edge_widths)
230 non_zero_mask = ~zero_width_mask
231 if np.any(non_zero_mask):
232 average_values_in_range[non_zero_mask] = integral_values[non_zero_mask] / edge_widths[non_zero_mask]
234 # For zero-width intervals, get the y-value directly from unique_y using edge_indices
235 if np.any(zero_width_mask):
236 # The indices in unique_x for zero-width edge positions
237 zero_width_indices = edge_indices[np.where(zero_width_mask)[0]]
239 # Get all the y-values at once and assign them
240 average_values_in_range[zero_width_mask] = unique_y[zero_width_indices]
242 # Handle extrapolation when 'nan' method is used and some edges are outside data range
243 if extrapolate_method == "nan" and ~np.all(is_within_range):
244 # Identify which bins are completely within the data range
245 bins_within_range = (x_edges[:-1] >= x_data.min()) & (x_edges[1:] <= x_data.max())
246 # Create array of NaNs with same size as the number of bins
247 average_values = np.full(shape=bins_within_range.size, fill_value=np.nan)
248 # Copy calculated averages only to bins that are within data range
249 average_values[bins_within_range] = average_values_in_range
250 return average_values
252 return average_values_in_range
255def partial_isin(bin_edges, timespans):
256 """
257 Calculate the fraction of each bin that falls within each timespan.
259 Parameters
260 ----------
261 bin_edges : array_like
262 1D array of bin edges in ascending order. For n bins, there should be n+1 edges.
263 timespans : array_like
264 Timespans as a 2D array of shape (m, 2) where m is the number of timespans and
265 each row contains [start, end] of a timespan.
267 Returns
268 -------
269 bin_fractions : ndarray
270 2D array of shape (m, n) where m is the number of timespans and n is the number of bins.
271 Each element (i, j) represents the fraction of bin j that falls within timespan i.
273 Notes
274 -----
275 - The function assumes bin_edges and timespans are in the same units.
276 - Bins are defined by their edges, i.e., bin j spans from bin_edges[j] to bin_edges[j+1].
277 - Values range from 0 (no overlap) to 1 (complete overlap).
279 Examples
280 --------
281 >>> bin_edges = np.array([0, 10, 20, 30])
282 >>> timespans = np.array([[5, 25], [15, 35]])
283 >>> partial_isin(bin_edges, timespans)
284 array([[0.5, 1. , 0.5],
285 [0. , 0.5, 1. ]])
286 """
287 # Convert inputs to numpy arrays
288 bin_edges = np.asarray(bin_edges)
289 timespans = np.asarray(timespans)
291 # Validate inputs
292 if bin_edges.ndim != 1 or timespans.ndim != 2 or timespans.shape[1] != 2: # noqa: PLR2004
293 msg = "Invalid input shapes: bin_edges must be 1D and timespans must be 2D with shape (m, 2)."
294 raise ValueError(msg)
295 if not np.all(np.diff(bin_edges) > 0):
296 msg = "bin_edges must be in ascending order."
297 raise ValueError(msg)
298 if not np.all(np.diff(timespans) > 0):
299 msg = "timespans must be in ascending order."
300 raise ValueError(msg)
302 # Calculate bin widths
303 bin_widths = np.diff(bin_edges)
305 # Calculate overlapping segments using broadcasting
306 left_edges = bin_edges[:-1][np.newaxis, :]
307 right_edges = bin_edges[1:][np.newaxis, :]
309 starts = timespans[:, 0][:, np.newaxis]
310 ends = timespans[:, 1][:, np.newaxis]
312 overlap_left = np.maximum(left_edges, starts)
313 overlap_right = np.minimum(right_edges, ends)
315 # Calculate overlap widths (clip at 0 to handle non-overlapping cases)
316 overlap_widths = np.maximum(0, overlap_right - overlap_left)
318 # Calculate fraction of each bin that overlaps with timespan
319 return overlap_widths / bin_widths[np.newaxis, :]
322def generate_failed_coverage_badge():
323 """Generate a badge indicating failed coverage."""
324 from genbadge import Badge # type: ignore # noqa: PLC0415
326 b = Badge(left_txt="coverage", right_txt="failed", color="red")
327 b.write_to("coverage_failed.svg", use_shields=False)
330def compute_time_edges(tedges, tstart, tend, number_of_bins):
331 """
332 Compute time edges for binning data based on provided time parameters.
334 This function creates a DatetimeIndex of time bin edges from one of three possible
335 input formats: explicit edges, start times, or end times. The resulting edges
336 define the boundaries of time intervals for data binning.
338 Define either explicit time edges, or start and end times for each bin and leave the others at None.
340 Parameters
341 ----------
342 tedges : pandas.DatetimeIndex or None
343 Explicit time edges for the bins. If provided, must have one more element
344 than the number of bins (n_bins + 1). Takes precedence over tstart and tend.
345 tstart : pandas.DatetimeIndex or None
346 Start times for each bin. Must have the same number of elements as the
347 number of bins. Used when tedges is None.
348 tend : pandas.DatetimeIndex or None
349 End times for each bin. Must have the same number of elements as the
350 number of bins. Used when both tedges and tstart are None.
351 number_of_bins : int
352 The expected number of time bins. Used for validation against the provided
353 time parameters.
355 Returns
356 -------
357 pandas.DatetimeIndex
358 Time edges defining the boundaries of the time bins. Has one more element
359 than number_of_bins.
361 Raises
362 ------
363 ValueError
364 If tedges has incorrect length (not number_of_bins + 1).
365 If tstart has incorrect length (not equal to number_of_bins).
366 If tend has incorrect length (not equal to number_of_bins).
367 If none of tedges, tstart, or tend are provided.
369 Notes
370 -----
371 - When using tstart, the function assumes uniform spacing and extrapolates
372 the final edge based on the spacing between the last two start times.
373 - When using tend, the function assumes uniform spacing and extrapolates
374 the first edge based on the spacing between the first two end times.
375 - All input time data is converted to pandas.DatetimeIndex for consistency.
376 """
377 if tedges is not None:
378 tedges = pd.DatetimeIndex(tedges)
379 if number_of_bins != len(tedges) - 1:
380 msg = "tedges must have one more element than flow"
381 raise ValueError(msg)
383 elif tstart is not None:
384 # Assume the index refers to the time at the start of the measurement interval
385 tstart = pd.DatetimeIndex(tstart)
386 if number_of_bins != len(tstart):
387 msg = "tstart must have the same number of elements as flow"
388 raise ValueError(msg)
390 tedges = tstart.append(tstart[[-1]] + (tstart[-1] - tstart[-2]))
392 elif tend is not None:
393 # Assume the index refers to the time at the end of the measurement interval
394 tend = pd.DatetimeIndex(tend)
395 if number_of_bins != len(tend):
396 msg = "tend must have the same number of elements as flow"
397 raise ValueError(msg)
399 tedges = (tend[[0]] - (tend[1] - tend[0])).append(tend)
401 else:
402 msg = "Either provide tedges, tstart, and tend"
403 raise ValueError(msg)
404 return tedges