1"""General utilities for the 1D groundwater transport model."""

from __future__ import annotations

import io
from collections.abc import Callable
from datetime import date
from pathlib import Path

import numpy as np
import numpy.typing as npt
import pandas as pd
import requests
from scipy import interpolate
from scipy.linalg import null_space
from scipy.optimize import minimize

cache_dir = Path(__file__).parent.parent.parent / "cache"


def linear_interpolate(
    x_ref: npt.ArrayLike,
    y_ref: npt.ArrayLike,
    x_query: npt.ArrayLike,
    left: float | None = None,
    right: float | None = None,
) -> np.ndarray:
23 """
24 Linear interpolation on monotonically increasing data.
26 Parameters
27 ----------
28 x_ref : array-like
29 Reference vector with sorted x-values.
30 y_ref : array-like
31 Reference vector with y-values.
32 x_query : array-like
33 Query x-values. Array may have any shape.
34 left : float, optional
35 Value to return for x_query < x_ref[0].
36 - If `left` is set to None, x_query = x_ref[0].
37 - If `left` is set to a float, such as np.nan, this value is returned.
38 right : float, optional
39 Value to return for x_query > x_ref[-1].
40 - If `right` is set to None, x_query = x_ref[-1].
41 - If `right` is set to a float, such as np.nan, this value is returned.
43 Returns
44 -------
45 numpy.ndarray
46 Interpolated y-values.
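
    Examples
    --------
    Illustrative values; out-of-range queries are clamped unless left/right is given:

    >>> linear_interpolate([0.0, 1.0, 2.0], [10.0, 20.0, 40.0], [0.5, 1.5, 3.0])
    array([15., 30., 40.])
    >>> linear_interpolate([0.0, 1.0, 2.0], [10.0, 20.0, 40.0], [-1.0], left=np.nan)
    array([nan])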
47 """
    x_ref = np.asarray(x_ref)
    y_ref = np.asarray(y_ref)
    x_query = np.asarray(x_query)

    # Find indices where x_query would be inserted in x_ref
    idx_no_edges = np.searchsorted(x_ref, x_query)

    idx = np.clip(idx_no_edges, 1, len(x_ref) - 1)

    # Calculate interpolation weights
    x0 = x_ref[idx - 1]
    x1 = x_ref[idx]
    y0 = y_ref[idx - 1]
    y1 = y_ref[idx]

    # Perform linear interpolation
    weights = (x_query - x0) / (x1 - x0)
    y_query = y0 + weights * (y1 - y0)

    # Handle out-of-range queries: clamp to the edge value when left/right is
    # None, otherwise fill with the provided value
    y_query = np.where(x_query < x_ref[0], y_ref[0] if left is None else left, y_query)
    y_query = np.where(x_query > x_ref[-1], y_ref[-1] if right is None else right, y_query)

    return y_query


def interp_series(series: pd.Series, index_new: pd.DatetimeIndex, **interp1d_kwargs) -> pd.Series:
81 """
82 Interpolate a pandas.Series to a new index.
84 Parameters
85 ----------
86 series : pandas.Series
87 Series to interpolate.
88 index_new : pandas.DatetimeIndex
89 New index to interpolate to.
90 interp1d_kwargs : dict, optional
91 Keyword arguments passed to scipy.interpolate.interp1d. Default is {}.
93 Returns
94 -------
95 pandas.Series
96 Interpolated series.
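
    Examples
    --------
    Illustrative dates; values outside the original range yield NaN because
    bounds_error=False:

    >>> s = pd.Series([0.0, 2.0], index=pd.to_datetime(["2025-01-01", "2025-01-03"]))
    >>> interp_series(s, pd.to_datetime(["2025-01-02"]))
    2025-01-02    1.0
    dtype: float64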
97 """
    series = series[series.index.notna() & series.notna()]
    dt = (series.index - series.index[0]) / pd.to_timedelta(1, unit="D")
    dt_interp = (index_new - series.index[0]) / pd.to_timedelta(1, unit="D")
    interp_obj = interpolate.interp1d(dt, series.values, bounds_error=False, **interp1d_kwargs)
    return pd.Series(interp_obj(dt_interp), index=index_new)


def diff(a: npt.ArrayLike, alignment: str = "centered") -> np.ndarray:
106 """Compute the cell widths for a given array of cell coordinates.
108 If alignment is "centered", the coordinates are assumed to be centered in the cells.
109 If alignment is "left", the coordinates are assumed to be at the left edge of the cells.
110 If alignment is "right", the coordinates are assumed to be at the right edge of the cells.
112 Parameters
113 ----------
114 a : array-like
115 Input array.
117 Returns
118 -------
119 numpy.ndarray
120 Array with differences between elements.
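
    Examples
    --------
    Illustrative values:

    >>> diff(np.array([0.0, 1.0, 3.0]), alignment="centered")
    array([1. , 1.5, 2. ])
    >>> diff(np.array([0.0, 1.0, 3.0]), alignment="left")
    array([1., 2., 2.])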
121 """
    a = np.asarray(a)
    if alignment == "centered":
        mid = a[:-1] + (a[1:] - a[:-1]) / 2
        return np.concatenate((a[[1]] - a[[0]], mid[1:] - mid[:-1], a[[-1]] - a[[-2]]))
    if alignment == "left":
        return np.concatenate((a[1:] - a[:-1], a[[-1]] - a[[-2]]))
    if alignment == "right":
        return np.concatenate((a[[1]] - a[[0]], a[1:] - a[:-1]))

    msg = f"Invalid alignment: {alignment}"
    raise ValueError(msg)


def linear_average(  # noqa: C901
    x_data: npt.ArrayLike,
    y_data: npt.ArrayLike,
    x_edges: npt.ArrayLike,
    extrapolate_method: str = "nan",
) -> npt.NDArray[np.float64]:
140 """
141 Compute the average value of a piecewise linear time series between specified x-edges.
143 Parameters
144 ----------
145 x_data : array-like
146 x-coordinates of the time series data points, must be in ascending order
147 y_data : array-like
148 y-coordinates of the time series data points
149 x_edges : array-like
150 x-coordinates of the integration edges. Can be 1D or 2D.
151 - If 1D: shape (n_edges,). Can be 1D or 2D.
152 - If 1D: shape (n_edges,), must be in ascending order
153 - If 2D: shape (n_series, n_edges), each row must be in ascending order
154 - If 2D: shape (n_series, n_edges), each row must be in ascending order
155 extrapolate_method : str, optional
156 Method for handling extrapolation. Default is 'nan'.
157 - 'outer': Extrapolate using the outermost data points.
158 - 'nan': Extrapolate using np.nan.
159 - 'raise': Raise an error for out-of-bounds values.
161 Returns
162 -------
163 numpy.ndarray
164 2D array of average values between consecutive pairs of x_edges.
165 Shape is (n_series, n_bins) where n_bins = n_edges - 1.
166 If x_edges is 1D, n_series = 1.
168 Examples
169 --------
170 >>> x_data = [0, 1, 2, 3]
171 >>> y_data = [0, 1, 1, 0]
172 >>> x_edges = [0, 1.5, 3]
173 >>> linear_average(x_data, y_data, x_edges)
174 array([[0.667, 0.667]])
176 >>> x_edges_2d = [[0, 1.5, 3], [0.5, 2, 3]]
177 >>> linear_average(x_data, y_data, x_edges_2d)
178 array([[0.667, 0.667], [0.75, 0.5]])
179 """
    # Convert inputs to numpy arrays
    x_data = np.asarray(x_data, dtype=float)
    y_data = np.asarray(y_data, dtype=float)
    x_edges = np.asarray(x_edges, dtype=float)

    # Ensure x_edges is always 2D
    if x_edges.ndim == 1:
        x_edges = x_edges[np.newaxis, :]
    elif x_edges.ndim != 2:  # noqa: PLR2004
        msg = "x_edges must be 1D or 2D array"
        raise ValueError(msg)

    # Input validation
    if len(x_data) != len(y_data) or len(x_data) == 0:
        msg = "x_data and y_data must have the same length and be non-empty"
        raise ValueError(msg)
    if x_edges.shape[1] < 2:  # noqa: PLR2004
        msg = "x_edges must contain at least 2 values in each row"
        raise ValueError(msg)
    if not np.all(np.diff(x_data) >= 0):
        msg = "x_data must be in ascending order"
        raise ValueError(msg)
    if not np.all(np.diff(x_edges, axis=1) >= 0):
        msg = "x_edges must be in ascending order along each row"
        raise ValueError(msg)

    # Filter out NaN values
    show = ~np.isnan(x_data) & ~np.isnan(y_data)
    if show.sum() < 2:  # noqa: PLR2004
        if show.sum() == 1 and extrapolate_method == "outer":
            # For single data point with outer extrapolation, use constant value
            constant_value = y_data[show][0]
            return np.full(shape=(x_edges.shape[0], x_edges.shape[1] - 1), fill_value=constant_value)
        return np.full(shape=(x_edges.shape[0], x_edges.shape[1] - 1), fill_value=np.nan)

    x_data_clean = x_data[show]
    y_data_clean = y_data[show]

    # Handle extrapolation for all series at once (vectorized)
    if extrapolate_method == "outer":
        edges_processed = np.clip(x_edges, x_data_clean.min(), x_data_clean.max())
    elif extrapolate_method == "raise":
        if np.any(x_edges < x_data_clean.min()) or np.any(x_edges > x_data_clean.max()):
            msg = "x_edges must be within the range of x_data"
            raise ValueError(msg)
        edges_processed = x_edges.copy()
    else:  # nan method
        edges_processed = x_edges.copy()

    # Create a combined grid of all unique x points (data + all edges)
    all_unique_x = np.unique(np.concatenate([x_data_clean, edges_processed.ravel()]))

    # Interpolate y values at all unique x points once
    all_unique_y: npt.NDArray[np.float64] = np.interp(
        all_unique_x, x_data_clean, y_data_clean, left=np.nan, right=np.nan
    )

    # Compute cumulative integrals once using trapezoidal rule
    dx = np.diff(all_unique_x)
    y_avg = (all_unique_y[:-1] + all_unique_y[1:]) / 2
    segment_integrals = dx * y_avg
    # Replace NaN values with 0 to avoid breaking cumulative sum
    segment_integrals = np.nan_to_num(segment_integrals, nan=0.0)
    cumulative_integral = np.concatenate([[0], np.cumsum(segment_integrals)])

    # Vectorized computation for all series
    # Find indices of all edges in the combined grid
    edge_indices: npt.NDArray[np.intp] = np.searchsorted(all_unique_x, edges_processed)

    # Compute integral between consecutive edges for all series (vectorized)
    integral_values = cumulative_integral[edge_indices[:, 1:]] - cumulative_integral[edge_indices[:, :-1]]

    # Compute widths between consecutive edges for all series (vectorized)
    edge_widths = np.diff(edges_processed, axis=1)

    # Handle zero-width intervals (vectorized)
    zero_width_mask = edge_widths == 0
    result = np.zeros_like(edge_widths)

    # For non-zero width intervals, compute average = integral / width (vectorized)
    non_zero_mask = ~zero_width_mask
    result[non_zero_mask] = integral_values[non_zero_mask] / edge_widths[non_zero_mask]

    # For zero-width intervals, interpolate y-value directly (vectorized)
    if np.any(zero_width_mask):
        zero_width_positions = edges_processed[:, :-1][zero_width_mask]
        result[zero_width_mask] = np.interp(zero_width_positions, x_data_clean, y_data_clean)

    # Handle extrapolation when 'nan' method is used (vectorized)
    if extrapolate_method == "nan":
        # Set out-of-range bins to NaN
        bins_within_range = (x_edges[:, :-1] >= x_data_clean.min()) & (x_edges[:, 1:] <= x_data_clean.max())
        result[~bins_within_range] = np.nan

    return result


def partial_isin(bin_edges_in: npt.ArrayLike, bin_edges_out: npt.ArrayLike) -> np.ndarray:
    """
    Calculate the fraction of each input bin that overlaps with each output bin.

    This function computes a matrix where element (i, j) represents the fraction
    of input bin i that overlaps with output bin j. The computation uses
    vectorized operations to avoid loops.

    Parameters
    ----------
    bin_edges_in : array-like
        1D array of input bin edges in ascending order. For n_in bins, there
        should be n_in+1 edges.
    bin_edges_out : array-like
        1D array of output bin edges in ascending order. For n_out bins, there
        should be n_out+1 edges.

    Returns
    -------
    overlap_matrix : numpy.ndarray
        Dense matrix of shape (n_in, n_out) where n_in is the number of input
        bins and n_out is the number of output bins. Each element (i, j)
        represents the fraction of input bin i that overlaps with output bin j.
        Values range from 0 (no overlap) to 1 (complete overlap).

    Notes
    -----
    - Both input arrays must be sorted in ascending order
    - The function leverages the sorted nature of both inputs for efficiency
    - Uses vectorized operations to handle large bin arrays efficiently
    - All overlaps sum to 1.0 for each input bin when output bins fully cover the input range

    Examples
    --------
    >>> bin_edges_in = np.array([0, 10, 20, 30])
    >>> bin_edges_out = np.array([5, 15, 25])
    >>> partial_isin(bin_edges_in, bin_edges_out)
    array([[0.5, 0.0],
           [0.5, 0.5],
           [0.0, 0.5]])
    """
    # Convert inputs to numpy arrays
    bin_edges_in = np.asarray(bin_edges_in, dtype=float)
    bin_edges_out = np.asarray(bin_edges_out, dtype=float)

    # Validate inputs
    if bin_edges_in.ndim != 1 or bin_edges_out.ndim != 1:
        msg = "Both bin_edges_in and bin_edges_out must be 1D arrays"
        raise ValueError(msg)
    if len(bin_edges_in) < 2 or len(bin_edges_out) < 2:  # noqa: PLR2004
        msg = "Both edge arrays must have at least 2 elements"
        raise ValueError(msg)

    # Check ascending order, ignoring NaN values
    diffs_in = np.diff(bin_edges_in)
    valid_diffs_in = ~np.isnan(diffs_in)
    if np.any(valid_diffs_in) and not np.all(diffs_in[valid_diffs_in] > 0):
        msg = "bin_edges_in must be in ascending order"
        raise ValueError(msg)

    diffs_out = np.diff(bin_edges_out)
    valid_diffs_out = ~np.isnan(diffs_out)
    if np.any(valid_diffs_out) and not np.all(diffs_out[valid_diffs_out] > 0):
        msg = "bin_edges_out must be in ascending order"
        raise ValueError(msg)

    # Build matrix using fully vectorized approach
    # Create meshgrids for all possible input-output bin combinations
    in_left = bin_edges_in[:-1, None]  # Shape: (n_bins_in, 1)
    in_right = bin_edges_in[1:, None]  # Shape: (n_bins_in, 1)
    in_width = np.diff(bin_edges_in)[:, None]  # Shape: (n_bins_in, 1)

    out_left = bin_edges_out[None, :-1]  # Shape: (1, n_bins_out)
    out_right = bin_edges_out[None, 1:]  # Shape: (1, n_bins_out)

    # Calculate overlaps for all combinations using broadcasting
    overlap_left = np.maximum(in_left, out_left)  # Shape: (n_bins_in, n_bins_out)
    overlap_right = np.minimum(in_right, out_right)  # Shape: (n_bins_in, n_bins_out)

    # Calculate overlap widths (zero where no overlap)
    overlap_widths = np.maximum(0, overlap_right - overlap_left)

    # Calculate fractions (NaN widths will result in NaN fractions)
    return overlap_widths / in_width


def time_bin_overlap(tedges: npt.ArrayLike, bin_tedges: list[tuple]) -> np.ndarray:
    """
    Calculate the fraction of each time bin that overlaps with each time range.

    This function computes an array where element (i, j) represents the fraction
    of time bin j that overlaps with time range i. The computation uses
    vectorized operations to avoid loops.

    Parameters
    ----------
    tedges : array-like
        1D array of time bin edges in ascending order. For n bins, there
        should be n+1 edges.
    bin_tedges : list of tuples
        List of tuples where each tuple contains (start_time, end_time)
        defining a time range.

    Returns
    -------
    overlap_array : numpy.ndarray
        Array of shape (len(bin_tedges), n_bins) where n_bins is the number of
        time bins. Each element (i, j) represents the fraction of time bin j
        that overlaps with time range i. Values range from 0 (no overlap) to
        1 (complete overlap).

    Notes
    -----
    - tedges must be sorted in ascending order
    - Uses vectorized operations to handle large arrays efficiently
    - Time ranges in bin_tedges can be in any order and can overlap

    Examples
    --------
    >>> tedges = np.array([0, 10, 20, 30])
    >>> bin_tedges = [(5, 15), (25, 35)]
    >>> time_bin_overlap(tedges, bin_tedges)
    array([[0.5, 0.5, 0.0],
           [0.0, 0.0, 0.5]])
    """
    # Convert inputs to numpy arrays
    tedges = np.asarray(tedges)
    bin_tedges_array = np.asarray(bin_tedges)

    # Validate inputs
    if tedges.ndim != 1:
        msg = "tedges must be a 1D array"
        raise ValueError(msg)
    if len(tedges) < 2:  # noqa: PLR2004
        msg = "tedges must have at least 2 elements"
        raise ValueError(msg)
    if bin_tedges_array.size == 0:
        msg = "bin_tedges must be non-empty"
        raise ValueError(msg)

    # Calculate overlaps for all combinations using broadcasting
    overlap_left = np.maximum(bin_tedges_array[:, [0]], tedges[None, :-1])
    overlap_right = np.minimum(bin_tedges_array[:, [1]], tedges[None, 1:])
    overlap_widths = np.maximum(0, overlap_right - overlap_left)

    # Calculate fractions (handle division by zero for zero-width bins)
    bin_width_bc = np.diff(tedges)[None, :]  # Shape: (1, n_bins)
    return np.divide(
        overlap_widths, bin_width_bc, out=np.zeros_like(overlap_widths, dtype=float), where=bin_width_bc != 0.0
    )


def generate_failed_coverage_badge() -> None:
    """Generate a badge indicating failed coverage."""
    from genbadge import Badge  # type: ignore  # noqa: PLC0415

    b = Badge(left_txt="coverage", right_txt="failed", color="red")
    b.write_to("coverage_failed.svg", use_shields=False)


def combine_bin_series(
    a: npt.ArrayLike,
    a_edges: npt.ArrayLike,
    b: npt.ArrayLike,
    b_edges: npt.ArrayLike,
    extrapolation: str | float = 0.0,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
445 """
446 Combine two binned series onto a common set of unique edges.
448 This function takes two binned series (a and b) with their respective bin edges
449 and creates new series (c and d) that are defined on a combined set of unique
450 edges from both input edge arrays.
452 Parameters
453 ----------
454 a : array-like
455 Values for the first binned series.
456 a_edges : array-like
457 Bin edges for the first series. Must have len(a) + 1 elements.
458 b : array-like
459 Values for the second binned series.
460 b_edges : array-like
461 Bin edges for the second series. Must have len(b) + 1 elements.
462 extrapolation : str or float, optional
463 Method for handling combined bins that fall outside the original series ranges.
464 - 'nearest': Use the nearest original bin value
465 - float value (e.g., np.nan, 0.0): Fill with the specified value (default: 0.0)
467 Returns
468 -------
469 c : numpy.ndarray
470 Values from series a mapped to the combined edge structure.
471 c_edges : numpy.ndarray
472 Combined unique edges from a_edges and b_edges.
473 d : numpy.ndarray
474 Values from series b mapped to the combined edge structure.
475 d_edges : numpy.ndarray
476 Combined unique edges from a_edges and b_edges (same as c_edges).
478 Notes
479 -----
480 The combined edges are created by taking the union of all unique values
481 from both a_edges and b_edges, sorted in ascending order. The values
482 are then broadcasted/repeated for each combined bin that falls within
483 the original bin's range.
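
    Examples
    --------
    A minimal sketch: a single-bin series combined with a two-bin series; with
    the default extrapolation, combined bins outside a series' range are 0.0:

    >>> c, c_edges, d, d_edges = combine_bin_series(
    ...     a=[1.0], a_edges=[0.0, 2.0], b=[3.0, 4.0], b_edges=[1.0, 2.0, 3.0]
    ... )
    >>> c_edges
    array([0., 1., 2., 3.])
    >>> c
    array([1., 1., 0.])
    >>> d
    array([0., 3., 4.])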
484 """
    # Convert inputs to numpy arrays
    a = np.asarray(a, dtype=float)
    a_edges = np.asarray(a_edges, dtype=float)
    b = np.asarray(b, dtype=float)
    b_edges = np.asarray(b_edges, dtype=float)

    # Validate inputs
    if len(a_edges) != len(a) + 1:
        msg = "a_edges must have len(a) + 1 elements"
        raise ValueError(msg)
    if len(b_edges) != len(b) + 1:
        msg = "b_edges must have len(b) + 1 elements"
        raise ValueError(msg)

    # Create combined unique edges
    combined_edges = np.unique(np.concatenate([a_edges, b_edges]))

    # Initialize output arrays
    c = np.zeros(len(combined_edges) - 1)
    d = np.zeros(len(combined_edges) - 1)

    # Vectorized mapping using searchsorted - find which original bin each combined bin belongs to
    # For series a: find which original bin each combined bin center falls into
    combined_bin_centers = (combined_edges[:-1] + combined_edges[1:]) / 2
    a_bin_assignment = np.searchsorted(a_edges, combined_bin_centers, side="right") - 1
    a_bin_assignment = np.clip(a_bin_assignment, 0, len(a) - 1)

    # Handle extrapolation for series a
    if extrapolation == "nearest":
        # Assign all values using nearest neighbor (already clipped)
        c[:] = a[a_bin_assignment]
    else:
        # Only assign values where the combined bin is completely within the original bin
        a_valid_mask = (combined_edges[:-1] >= a_edges[a_bin_assignment]) & (
            combined_edges[1:] <= a_edges[a_bin_assignment + 1]
        )
        c[a_valid_mask] = a[a_bin_assignment[a_valid_mask]]
        # Fill out-of-range bins with the extrapolation value
        c[~a_valid_mask] = extrapolation

    # Handle extrapolation for series b
    b_bin_assignment = np.searchsorted(b_edges, combined_bin_centers, side="right") - 1
    b_bin_assignment = np.clip(b_bin_assignment, 0, len(b) - 1)

    if extrapolation == "nearest":
        # Assign all values using nearest neighbor (already clipped)
        d[:] = b[b_bin_assignment]
    else:
        # Only assign values where the combined bin is completely within the original bin
        b_valid_mask = (combined_edges[:-1] >= b_edges[b_bin_assignment]) & (
            combined_edges[1:] <= b_edges[b_bin_assignment + 1]
        )
        d[b_valid_mask] = b[b_bin_assignment[b_valid_mask]]
        # Fill out-of-range bins with the extrapolation value
        d[~b_valid_mask] = extrapolation

    # Return the combined series
    c_edges = combined_edges
    d_edges = combined_edges.copy()

    return c, c_edges, d, d_edges


def compute_time_edges(
    tedges: pd.DatetimeIndex | None,
    tstart: pd.DatetimeIndex | None,
    tend: pd.DatetimeIndex | None,
    number_of_bins: int,
) -> pd.DatetimeIndex:
554 """
555 Compute time edges for binning data based on provided time parameters.
557 This function creates a DatetimeIndex of time bin edges from one of three possible
558 input formats: explicit edges, start times, or end times. The resulting edges
559 define the boundaries of time intervals for data binning.
561 Define either explicit time edges, or start and end times for each bin and leave the others at None.
563 Parameters
564 ----------
565 tedges : pandas.DatetimeIndex or None
566 Explicit time edges for the bins. If provided, must have one more element
567 than the number of bins (n_bins + 1). Takes precedence over tstart and tend.
568 tstart : pandas.DatetimeIndex or None
569 Start times for each bin. Must have the same number of elements as the
570 number of bins. Used when tedges is None.
571 tend : pandas.DatetimeIndex or None
572 End times for each bin. Must have the same number of elements as the
573 number of bins. Used when both tedges and tstart are None.
574 number_of_bins : int
575 The expected number of time bins. Used for validation against the provided
576 time parameters.
578 Returns
579 -------
580 pandas.DatetimeIndex
581 Time edges defining the boundaries of the time bins. Has one more element
582 than number_of_bins.
584 Raises
585 ------
586 ValueError
587 If tedges has incorrect length (not number_of_bins + 1).
588 If tstart has incorrect length (not equal to number_of_bins).
589 If tend has incorrect length (not equal to number_of_bins).
590 If none of tedges, tstart, or tend are provided.
592 Notes
593 -----
594 - When using tstart, the function assumes uniform spacing and extrapolates
595 the final edge based on the spacing between the last two start times.
596 - When using tend, the function assumes uniform spacing and extrapolates
597 the first edge based on the spacing between the first two end times.
598 - All input time data is converted to pandas.DatetimeIndex for consistency.
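
    Examples
    --------
    Illustrative: three daily bins given their start times; the final edge is
    extrapolated from the last spacing:

    >>> tstart = pd.DatetimeIndex(["2025-01-01", "2025-01-02", "2025-01-03"])
    >>> compute_time_edges(tedges=None, tstart=tstart, tend=None, number_of_bins=3)
    DatetimeIndex(['2025-01-01', '2025-01-02', '2025-01-03', '2025-01-04'], dtype='datetime64[ns]', freq=None)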
599 """
600 if tedges is not None:
601 tedges = pd.DatetimeIndex(tedges)
602 if number_of_bins != len(tedges) - 1:
603 msg = "tedges must have one more element than flow"
604 raise ValueError(msg)
606 elif tstart is not None:
607 # Assume the index refers to the time at the start of the measurement interval
608 tstart = pd.DatetimeIndex(tstart)
609 if number_of_bins != len(tstart):
610 msg = "tstart must have the same number of elements as flow"
611 raise ValueError(msg)
613 tedges = tstart.append(tstart[[-1]] + (tstart[-1] - tstart[-2]))
615 elif tend is not None:
616 # Assume the index refers to the time at the end of the measurement interval
617 tend = pd.DatetimeIndex(tend)
618 if number_of_bins != len(tend):
619 msg = "tend must have the same number of elements as flow"
620 raise ValueError(msg)
622 tedges = (tend[[0]] - (tend[1] - tend[0])).append(tend)
624 else:
625 msg = "Either provide tedges, tstart, and tend"
626 raise ValueError(msg)
627 return tedges


def get_soil_temperature(station_number: int = 260, *, interpolate_missing_values: bool = True) -> pd.DataFrame:
631 """
632 Download soil temperature data from the KNMI and return it as a pandas DataFrame.
634 The data is available for the following KNMI weather stations:
635 - 260: De Bilt, the Netherlands (vanaf 1981)
636 - 273: Marknesse, the Netherlands (vanaf 1989)
637 - 286: Nieuw Beerta, the Netherlands (vanaf 1990)
638 - 323: Wilhelminadorp, the Netherlands (vanaf 1989)
640 TB1 = grondtemperatuur op 5 cm diepte (graden Celsius) tijdens de waarneming
641 TB2 = grondtemperatuur op 10 cm diepte (graden Celsius) tijdens de waarneming
642 TB3 = grondtemperatuur op 20 cm diepte (graden Celsius) tijdens de waarneming
643 TB4 = grondtemperatuur op 50 cm diepte (graden Celsius) tijdens de waarneming
644 TB5 = grondtemperatuur op 100 cm diepte (graden Celsius) tijdens de waarneming
645 TNB2 = minimum grondtemperatuur op 10 cm diepte in de afgelopen 6 uur (graden Celsius)
646 TNB1 = minimum grondtemperatuur op 5 cm diepte in de afgelopen 6 uur (graden Celsius)
647 TXB1 = maximum grondtemperatuur op 5 cm diepte in de afgelopen 6 uur (graden Celsius)
648 TXB2 = maximum grondtemperatuur op 10 cm diepte in de afgelopen 6 uur (graden Celsius)
650 Parameters
651 ----------
652 station_number : int, {260, 273, 286, 323}
653 The KNMI station number for which to download soil temperature data.
654 Default is 260 (De Bilt).
655 interpolate_missing_values : bool, optional
656 If True, missing values are interpolated and recent NaN values are extrapolated with the previous value.
657 If False, missing values remain as NaN. Default is True.
659 Returns
660 -------
661 pandas.DataFrame
662 DataFrame containing soil temperature data in Celsius with a DatetimeIndex.
663 Columns include TB1, TB2, TB3, TB4, TB5, TNB1, TNB2, TXB1, TXB2.
665 Notes
666 -----
667 - KNMI: Royal Netherlands Meteorological Institute
668 - The timeseries may contain NaN values for missing data.
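
    Examples
    --------
    Requires a network connection, so the calls are skipped in doctests
    (illustrative usage):

    >>> df = get_soil_temperature(260)  # doctest: +SKIP
    >>> df["TB3"].resample("D").mean()  # daily mean at 20 cm depth  # doctest: +SKIP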
669 """
    # File-based daily cache
    cache_dir.mkdir(exist_ok=True)

    today = date.today().isoformat()  # noqa: DTZ011
    cache_path = cache_dir / f"soil_temp_{station_number}_{interpolate_missing_values}_{today}.pkl"

    # Check if a cached file from today exists
    if cache_path.exists():
        return pd.read_pickle(cache_path)  # noqa: S301

    # Clean up old cache files to prevent disk bloat
    for old_file in cache_dir.glob(f"soil_temp_{station_number}_{interpolate_missing_values}_*.pkl"):
        old_file.unlink(missing_ok=True)

    url = f"https://cdn.knmi.nl/knmi/map/page/klimatologie/gegevens/bodemtemps/bodemtemps_{station_number}.zip"

    dtypes = {
        "YYYYMMDD": "int32",
        "HH": "int8",
        " TB1": "float32",
        " TB3": "float32",
        " TB2": "float32",
        " TB4": "float32",
        " TB5": "float32",
        " TNB1": "float32",
        " TNB2": "float32",
        " TXB1": "float32",
        " TXB2": "float32",
    }

    # Download the ZIP file
    with requests.get(url, params={"download": "zip"}, timeout=10) as response:
        response.raise_for_status()

        df = pd.read_csv(
            io.BytesIO(response.content),
            compression="zip",
            dtype=dtypes,
            usecols=dtypes.keys(),
            skiprows=16,
            sep=",",
            index_col=None,
            na_values=[" "],
            engine="c",
            parse_dates=False,
        )

    df.index = pd.to_datetime(df["YYYYMMDD"].values, format=r"%Y%m%d").tz_localize("UTC") + pd.to_timedelta(
        df["HH"].values, unit="h"
    )

    df.drop(columns=["YYYYMMDD", "HH"], inplace=True)
    df.columns = df.columns.str.strip()
    df /= 10.0

    if interpolate_missing_values:
        # Interpolate missing values linearly, then forward fill trailing NaNs
        df.interpolate(method="linear", inplace=True)
        df.ffill(inplace=True)

    # Save to cache for future use
    df.to_pickle(cache_path)
    return df


def solve_underdetermined_system(
    coefficient_matrix,
    rhs_vector,
    *,
    nullspace_objective="squared_differences",
    optimization_method="BFGS",
):
    """
    Solve an underdetermined linear system with nullspace regularization.

    For an underdetermined system Ax = b where A has more columns than rows,
    multiple solutions exist. This function computes a least-squares solution
    and then selects a specific solution from the nullspace based on a
    regularization objective.

    Parameters
    ----------
    coefficient_matrix : array-like
        Coefficient matrix of shape (m, n) where m < n (underdetermined).
        May contain NaN values in some rows, which will be excluded from the system.
    rhs_vector : array-like
        Right-hand side vector of length m. May contain NaN values corresponding
        to NaN rows in coefficient_matrix, which will be excluded from the system.
    nullspace_objective : str or callable, optional
        Objective function to minimize in the nullspace. Options:

        * "squared_differences" : Minimize sum of squared differences between
          adjacent elements: ``sum((x[i+1] - x[i])**2)``
        * "summed_differences" : Minimize sum of absolute differences between
          adjacent elements: ``sum(|x[i+1] - x[i]|)``
        * callable : Custom objective function with signature
          ``objective(coeffs, x_ls, nullspace_basis)`` where:

          - coeffs : optimization variables (nullspace coefficients)
          - x_ls : least-squares solution
          - nullspace_basis : nullspace basis matrix

        Default is "squared_differences".
    optimization_method : str, optional
        Optimization method passed to scipy.optimize.minimize.
        Default is "BFGS".

    Returns
    -------
    numpy.ndarray
        Solution vector that minimizes the specified nullspace objective.
        Has length n (number of columns in coefficient_matrix).

    Raises
    ------
    ValueError
        If optimization fails, if coefficient_matrix and rhs_vector have incompatible shapes,
        or if an unknown nullspace objective is specified.

    Notes
    -----
    The algorithm follows these steps:

    1. Remove rows with NaN values from both coefficient_matrix and rhs_vector
    2. Compute least-squares solution: x_ls = pinv(valid_matrix) @ valid_rhs
    3. Compute nullspace basis: N = null_space(valid_matrix)
    4. Find nullspace coefficients: coeffs = argmin objective(x_ls + N @ coeffs)
    5. Return final solution: x = x_ls + N @ coeffs

    For the built-in objectives:

    * "squared_differences" provides smooth solutions, minimizing rapid changes
    * "summed_differences" provides sparse solutions, promoting piecewise constant behavior

    Examples
    --------
    Basic usage with the default squared differences objective:

    >>> import numpy as np
    >>> from gwtransport.utils import solve_underdetermined_system
    >>>
    >>> # Create an underdetermined system (2 equations, 4 unknowns)
    >>> matrix = np.array([[1, 2, 1, 0], [0, 1, 2, 1]])
    >>> rhs = np.array([3, 4])
    >>>
    >>> # Solve with squared differences regularization
    >>> x = solve_underdetermined_system(matrix, rhs)
    >>> print(f"Solution: {x}")
    >>> print(f"Residual: {np.linalg.norm(matrix @ x - rhs):.2e}")

    With the summed differences objective:

    >>> x_sparse = solve_underdetermined_system(
    ...     matrix, rhs, nullspace_objective="summed_differences"
    ... )

    With a custom objective function:

    >>> def custom_objective(coeffs, x_ls, nullspace_basis):
    ...     x = x_ls + nullspace_basis @ coeffs
    ...     return np.sum(x**2)  # Minimize the L2 norm
    >>>
    >>> x_custom = solve_underdetermined_system(
    ...     matrix, rhs, nullspace_objective=custom_objective
    ... )

    Handling NaN values:

    >>> # System with missing data
    >>> matrix_nan = np.array([
    ...     [1, 2, 1, 0],
    ...     [np.nan, np.nan, np.nan, np.nan],
    ...     [0, 1, 2, 1],
    ... ])
    >>> rhs_nan = np.array([3, np.nan, 4])
    >>>
    >>> x_nan = solve_underdetermined_system(matrix_nan, rhs_nan)
    """
    matrix = np.asarray(coefficient_matrix)
    rhs = np.asarray(rhs_vector)

    if matrix.shape[0] != len(rhs):
        msg = f"coefficient_matrix has {matrix.shape[0]} rows but rhs_vector has {len(rhs)} elements"
        raise ValueError(msg)

    # Identify valid rows (no NaN values in either matrix or rhs)
    valid_rows = ~np.isnan(matrix).any(axis=1) & ~np.isnan(rhs)

    if not np.any(valid_rows):
        msg = "No valid rows found (all contain NaN values)"
        raise ValueError(msg)

    valid_matrix = matrix[valid_rows]
    valid_rhs = rhs[valid_rows]

    # Compute least-squares solution
    x_ls, *_ = np.linalg.lstsq(valid_matrix, valid_rhs, rcond=None)

    # Compute nullspace
    nullspace_basis = null_space(valid_matrix, rcond=None)
    nullrank = nullspace_basis.shape[1]

    if nullrank == 0:
        # System is determined, return least-squares solution
        return x_ls

    # Optimize in nullspace
    coeffs = _optimize_nullspace_coefficients(x_ls, nullspace_basis, nullspace_objective, optimization_method)

    return x_ls + nullspace_basis @ coeffs


def _optimize_nullspace_coefficients(
    x_ls: np.ndarray, nullspace_basis: np.ndarray, nullspace_objective: str | Callable, optimization_method: str
) -> np.ndarray:
    """Optimize coefficients in the nullspace to minimize the objective."""
    nullrank = nullspace_basis.shape[1]
    objective_func = _get_nullspace_objective_function(nullspace_objective)
    coeffs_0 = np.zeros(nullrank)

    # For stability, warm-start the summed-differences objective from the
    # smooth squared-differences solution
    if nullspace_objective == "summed_differences":
        res_init = minimize(
            _squared_differences_objective,
            x0=coeffs_0,
            args=(x_ls, nullspace_basis),
            method=optimization_method,
        )
        if not res_init.success:
            msg = f"Initial optimization failed: {res_init.message}"
            raise ValueError(msg)
        coeffs_0 = res_init.x

    # Final optimization with the target objective
    res = minimize(
        objective_func,
        x0=coeffs_0,
        args=(x_ls, nullspace_basis),
        method=optimization_method,
    )

    if not res.success:
        msg = f"Optimization failed: {res.message}"
        raise ValueError(msg)

    return res.x


def _squared_differences_objective(coeffs: np.ndarray, x_ls: np.ndarray, nullspace_basis: np.ndarray) -> float:
    """Minimize sum of squared differences between adjacent elements."""
    x = x_ls + nullspace_basis @ coeffs
    return np.sum(np.square(x[1:] - x[:-1]))


def _summed_differences_objective(coeffs: np.ndarray, x_ls: np.ndarray, nullspace_basis: np.ndarray) -> float:
    """Minimize sum of absolute differences between adjacent elements."""
    x = x_ls + nullspace_basis @ coeffs
    return np.sum(np.abs(x[1:] - x[:-1]))


def _get_nullspace_objective_function(nullspace_objective: str | Callable) -> Callable:
    """Return the objective function for nullspace optimization."""
    if nullspace_objective == "squared_differences":
        return _squared_differences_objective
    if nullspace_objective == "summed_differences":
        return _summed_differences_objective
    if callable(nullspace_objective):
        return nullspace_objective
    msg = f"Unknown nullspace objective: {nullspace_objective}"
    raise ValueError(msg)