Coverage for src / gwtransport / utils.py: 83%
276 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-04 18:51 +0000
1"""
2General Utilities for 1D Groundwater Transport Modeling.
4This module provides general-purpose utility functions for time series manipulation,
5interpolation, numerical operations, and data processing used throughout the gwtransport
6package. Functions include linear interpolation/averaging, bin overlap calculations,
7underdetermined system solvers, and external data retrieval.
9Available functions:
11- :func:`linear_interpolate` - Linear interpolation using numpy's optimized interp function.
12 Automatically handles unsorted data with configurable extrapolation (None for clamping,
13 float for constant values). Handles multi-dimensional query arrays.
15- :func:`interp_series` - Interpolate pandas Series to new DatetimeIndex using
16 scipy.interpolate.interp1d. Automatically filters NaN values and converts datetime to
17 numerical representation.
19- :func:`linear_average` - Compute average values of piecewise linear time series between
20 specified x-edges. Supports 1D or 2D edge arrays for batch processing. Handles NaN values
21 and offers multiple extrapolation methods ('nan', 'outer', 'raise').
23- :func:`diff` - Compute cell widths from cell coordinate arrays with configurable alignment
24 ('centered', 'left', 'right'). Returns widths matching input array length.
26- :func:`partial_isin` - Calculate fraction of each input bin overlapping with each output bin.
27 Returns dense matrix where element (i,j) represents overlap fraction. Uses vectorized
28 operations for efficiency.
30- :func:`time_bin_overlap` - Calculate fraction of time bins overlapping with specified time
31 ranges. Similar to partial_isin but for time-based bin overlaps with list of (start, end)
32 tuples.
34- :func:`combine_bin_series` - Combine two binned series onto common set of unique edges. Maps
35 values from original bins to new combined structure with configurable extrapolation ('nearest'
36 or float value).
38- :func:`compute_time_edges` - Compute DatetimeIndex of time bin edges from explicit edges,
39 start times, or end times. Validates consistency with expected number of bins and handles
40 uniform spacing extrapolation.
42- :func:`solve_underdetermined_system` - Solve underdetermined linear system (Ax = b, m < n)
43 with nullspace regularization. Handles NaN values by row exclusion. Supports built-in
44 objectives ('squared_differences', 'summed_differences') or custom callable objectives.
46- :func:`get_soil_temperature` - Download soil temperature data from KNMI weather stations with
47 automatic caching. Supports stations 260 (De Bilt), 273 (Marknesse), 286 (Nieuw Beerta),
48 323 (Wilhelminadorp). Returns DataFrame with columns TB1-TB5, TNB1-TNB2, TXB1-TXB2 at various
49 depths. Daily cache prevents redundant downloads.
51- :func:`generate_failed_coverage_badge` - Generate SVG badge indicating failed coverage using
52 genbadge library. Used in CI/CD workflows.
54This file is part of gwtransport which is released under AGPL-3.0 license.
55See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details.
56"""
58from __future__ import annotations
60import io
61from collections.abc import Callable
62from datetime import date
63from pathlib import Path
64from typing import cast
66import numpy as np
67import numpy.typing as npt
68import pandas as pd
69import requests
70from scipy import interpolate
71from scipy.linalg import null_space
72from scipy.optimize import minimize
74cache_dir = Path(__file__).parent.parent.parent / "cache"
def linear_interpolate(
    *,
    x_ref: npt.ArrayLike,
    y_ref: npt.ArrayLike,
    x_query: npt.ArrayLike,
    left: float | None = None,
    right: float | None = None,
) -> npt.NDArray[np.floating]:
    """
    Linear interpolation using numpy's optimized interp function.

    Reference data may be unsorted; it is sorted internally before
    interpolation so that np.interp's monotonicity requirement is met.

    Parameters
    ----------
    x_ref : array-like
        Reference x-values. If unsorted, will be automatically sorted.
    y_ref : array-like
        Reference y-values corresponding to x_ref.
    x_query : array-like
        Query x-values where interpolation is needed. Array may have any shape.
    left : float, optional
        Value to return for x_query < x_ref[0].

        - If ``left=None``: clamp to y_ref[0] (default)
        - If ``left=float``: use specified value (e.g., ``np.nan``)

    right : float, optional
        Value to return for x_query > x_ref[-1].

        - If ``right=None``: clamp to y_ref[-1] (default)
        - If ``right=float``: use specified value (e.g., ``np.nan``)

    Returns
    -------
    numpy.ndarray
        Interpolated y-values with the same shape as x_query.

    Examples
    --------
    >>> import numpy as np
    >>> from gwtransport.utils import linear_interpolate
    >>> x_ref = np.array([1.0, 2.0, 3.0, 4.0])
    >>> y_ref = np.array([10.0, 20.0, 30.0, 40.0])
    >>> x_query = np.array([0.5, 1.5, 2.5, 3.5, 4.5])
    >>> linear_interpolate(x_ref=x_ref, y_ref=y_ref, x_query=x_query)
    array([10., 15., 25., 35., 40.])

    >>> linear_interpolate(
    ...     x_ref=x_ref, y_ref=y_ref, x_query=x_query, left=np.nan, right=np.nan
    ... )
    array([nan, 15., 25., 35., nan])

    See Also
    --------
    interp_series : Interpolate pandas Series with datetime index
    """
    x_ref_arr = np.asarray(x_ref)
    y_ref_arr = np.asarray(y_ref)
    query_arr = np.asarray(x_query)

    # np.interp requires monotonically increasing x; sort both arrays together.
    order = np.argsort(x_ref_arr)

    # left/right of None gives np.interp's default clamping to the boundary values.
    return np.interp(query_arr, x_ref_arr[order], y_ref_arr[order], left=left, right=right)
def interp_series(*, series: pd.Series, index_new: pd.DatetimeIndex, **interp1d_kwargs: object) -> pd.Series:
    """
    Interpolate a pandas.Series to a new DatetimeIndex.

    NaN values and NaT index entries are dropped before interpolation.
    Datetimes are converted to float days relative to the first valid
    timestamp so that scipy's numeric interpolator can handle them.

    Parameters
    ----------
    series : pandas.Series
        Series with a DatetimeIndex to interpolate.
    index_new : pandas.DatetimeIndex
        New index to interpolate to.
    interp1d_kwargs : dict, optional
        Keyword arguments passed to scipy.interpolate.interp1d. Default is {}.

    Returns
    -------
    pandas.Series
        Interpolated series indexed by index_new. Query times outside the
        range of the input series yield NaN (bounds_error=False).

    Raises
    ------
    ValueError
        If the series contains no valid (non-NaN) data points after filtering.
    """
    series = series[series.index.notna() & series.notna()]
    if series.empty:
        # Previously this fell through to an opaque IndexError on series.index[0].
        msg = "series must contain at least one non-NaN value with a valid timestamp"
        raise ValueError(msg)

    # Express times as float days relative to the first timestamp.
    t0 = series.index[0]
    dt = (series.index - t0) / pd.to_timedelta(1, unit="D")
    dt_interp = (index_new - t0) / pd.to_timedelta(1, unit="D")

    interp_obj = interpolate.interp1d(dt, series.values, bounds_error=False, **interp1d_kwargs)
    return pd.Series(interp_obj(dt_interp), index=index_new)
def diff(*, a: npt.ArrayLike, alignment: str = "centered") -> npt.NDArray[np.floating]:
    """Compute cell widths from an array of cell coordinates.

    The interpretation of the coordinates depends on ``alignment``:
    "centered" assumes coordinates sit at the cell centers, "left" at the
    left cell edges, and "right" at the right cell edges. The returned
    widths always have the same length as the input.

    Parameters
    ----------
    a : array-like
        Input array of cell coordinates.
    alignment : str, optional
        One of "centered", "left", or "right". Default is "centered".

    Returns
    -------
    numpy.ndarray
        Array with the width of each cell.

    Raises
    ------
    ValueError
        If alignment is not one of the supported values.
    """
    arr = np.asarray(a)

    if alignment == "centered":
        # Inner cell faces sit halfway between consecutive coordinates;
        # the first and last widths fall back to neighbor spacing.
        mid = arr[:-1] + (arr[1:] - arr[:-1]) / 2
        first = arr[[1]] - arr[[0]]
        last = arr[[-1]] - arr[[-2]]
        return np.concatenate((first, np.diff(mid), last))

    if alignment == "left":
        # Width of the final cell is extrapolated from the last spacing.
        return np.concatenate((np.diff(arr), arr[[-1]] - arr[[-2]]))

    if alignment == "right":
        # Width of the first cell is extrapolated from the first spacing.
        return np.concatenate((arr[[1]] - arr[[0]], np.diff(arr)))

    msg = f"Invalid alignment: {alignment}"
    raise ValueError(msg)
def linear_average(
    *,
    x_data: npt.ArrayLike,
    y_data: npt.ArrayLike,
    x_edges: npt.ArrayLike,
    extrapolate_method: str = "nan",
) -> npt.NDArray[np.float64]:
    """
    Compute the average value of a piecewise linear time series between specified x-edges.

    Parameters
    ----------
    x_data : array-like
        x-coordinates of the time series data points, must be in ascending order
    y_data : array-like
        y-coordinates of the time series data points
    x_edges : array-like
        x-coordinates of the integration edges. Can be 1D or 2D.

        - If 1D: shape (n_edges,), must be in ascending order
        - If 2D: shape (n_series, n_edges), each row must be in ascending order
    extrapolate_method : str, optional
        Method for handling extrapolation. Default is 'nan'.

        - 'outer': Extrapolate using the outermost data points.
        - 'nan': Extrapolate using np.nan.
        - 'raise': Raise an error for out-of-bounds values.

    Returns
    -------
    numpy.ndarray
        2D array of average values between consecutive pairs of x_edges.
        Shape is (n_series, n_bins) where n_bins = n_edges - 1.
        If x_edges is 1D, n_series = 1.

    Raises
    ------
    ValueError
        If input shapes are inconsistent, inputs are not ascending, or
        extrapolate_method='raise' and edges fall outside the data range.

    Examples
    --------
    >>> import numpy as np
    >>> from gwtransport.utils import linear_average
    >>> x_data = [0, 1, 2, 3]
    >>> y_data = [0, 1, 1, 0]
    >>> x_edges = [0, 1.5, 3]
    >>> linear_average(
    ...     x_data=x_data, y_data=y_data, x_edges=x_edges
    ... )  # doctest: +ELLIPSIS
    array([[0.666..., 0.666...]])

    >>> x_edges_2d = [[0, 1.5, 3], [0.5, 2, 3]]
    >>> linear_average(x_data=x_data, y_data=y_data, x_edges=x_edges_2d)
    array([[0.66666667, 0.66666667],
           [0.91666667, 0.5       ]])
    """
    # Convert inputs to numpy arrays
    x_data = np.asarray(x_data, dtype=float)
    y_data = np.asarray(y_data, dtype=float)
    x_edges = np.asarray(x_edges, dtype=float)

    # Ensure x_edges is always 2D so the remainder of the function can treat
    # the 1D case as a single-row batch.
    if x_edges.ndim == 1:
        x_edges = x_edges[np.newaxis, :]
    elif x_edges.ndim != 2:  # noqa: PLR2004
        msg = "x_edges must be 1D or 2D array"
        raise ValueError(msg)

    # Input validation
    if len(x_data) != len(y_data) or len(x_data) == 0:
        msg = "x_data and y_data must have the same length and be non-empty"
        raise ValueError(msg)
    if x_edges.shape[1] < 2:  # noqa: PLR2004
        msg = "x_edges must contain at least 2 values in each row"
        raise ValueError(msg)
    if not np.all(np.diff(x_data) >= 0):
        msg = "x_data must be in ascending order"
        raise ValueError(msg)
    if not np.all(np.diff(x_edges, axis=1) >= 0):
        msg = "x_edges must be in ascending order along each row"
        raise ValueError(msg)

    # Filter out NaN values; fewer than 2 valid points cannot define a
    # piecewise linear curve.
    show = ~np.isnan(x_data) & ~np.isnan(y_data)
    if show.sum() < 2:  # noqa: PLR2004
        if show.sum() == 1 and extrapolate_method == "outer":
            # For single data point with outer extrapolation, use constant value
            constant_value = y_data[show][0]
            return np.full(shape=(x_edges.shape[0], x_edges.shape[1] - 1), fill_value=constant_value)
        return np.full(shape=(x_edges.shape[0], x_edges.shape[1] - 1), fill_value=np.nan)

    x_data_clean = x_data[show]
    y_data_clean = y_data[show]

    # Handle extrapolation for all series at once (vectorized)
    if extrapolate_method == "outer":
        # Clamp edges to the data range; out-of-range bins get boundary values.
        edges_processed = np.clip(x_edges, x_data_clean[0], x_data_clean[-1])
    elif extrapolate_method == "raise":
        if np.any(x_edges < x_data_clean[0]) or np.any(x_edges > x_data_clean[-1]):
            msg = "x_edges must be within the range of x_data"
            raise ValueError(msg)
        edges_processed = x_edges.copy()
    else:  # nan method
        edges_processed = x_edges.copy()

    # Create a combined grid of all unique x points (data + all edges), so
    # every integration boundary coincides with a grid node.
    all_unique_x = np.unique(np.concatenate([x_data_clean, edges_processed.ravel()]))

    # Interpolate y values at all unique x points once; points outside the
    # data range become NaN and are handled below.
    all_unique_y_result = np.interp(all_unique_x, x_data_clean, y_data_clean, left=np.nan, right=np.nan)
    # Ensure it's an array for type checker
    all_unique_y: npt.NDArray[np.float64] = np.asarray(all_unique_y_result, dtype=np.float64)

    # Compute cumulative integrals once using trapezoidal rule
    dx = np.diff(all_unique_x)
    y_avg = (all_unique_y[:-1] + all_unique_y[1:]) / 2
    segment_integrals = dx * y_avg
    # Replace NaN values with 0 to avoid breaking cumulative sum.
    # NaN occurs for segments outside the data range (extrapolation='nan').
    # Setting NaN to 0 means boundary bins with partial data coverage have their
    # average biased low: the integral covers only the valid portion but the
    # width denominator below uses the full bin width. A proper fix would track
    # the covered fraction per bin.
    segment_integrals = np.nan_to_num(segment_integrals, nan=0.0)
    cumulative_integral = np.concatenate([[0], np.cumsum(segment_integrals)])

    # Vectorized computation for all series
    # Find indices of all edges in the combined grid; exact membership is
    # guaranteed because edges were merged into the grid above.
    edge_indices_result = np.searchsorted(all_unique_x, edges_processed)
    # Ensure it's a 2D array for type checker
    edge_indices: npt.NDArray[np.intp] = np.asarray(edge_indices_result, dtype=np.intp).reshape(edges_processed.shape)

    # Compute integral between consecutive edges for all series (vectorized)
    integral_values = cumulative_integral[edge_indices[:, 1:]] - cumulative_integral[edge_indices[:, :-1]]

    # Compute widths between consecutive edges for all series (vectorized)
    edge_widths = np.diff(edges_processed, axis=1)

    # Handle zero-width intervals (vectorized)
    zero_width_mask = edge_widths == 0
    result = np.zeros_like(edge_widths)

    # For non-zero width intervals, compute average = integral / width (vectorized)
    non_zero_mask = ~zero_width_mask
    result[non_zero_mask] = integral_values[non_zero_mask] / edge_widths[non_zero_mask]

    # For zero-width intervals, interpolate y-value directly (vectorized);
    # the average over a degenerate interval is the point value itself.
    if np.any(zero_width_mask):
        zero_width_positions = edges_processed[:, :-1][zero_width_mask]
        result[zero_width_mask] = np.interp(zero_width_positions, x_data_clean, y_data_clean)

    # Handle extrapolation when 'nan' method is used (vectorized)
    if extrapolate_method == "nan":
        # Set out-of-range bins to NaN. Note the comparison uses the original
        # (unclipped) x_edges, so partially covered bins are also NaN.
        bins_within_range = (x_edges[:, :-1] >= x_data_clean[0]) & (x_edges[:, 1:] <= x_data_clean[-1])
        result[~bins_within_range] = np.nan

    return result
def partial_isin(*, bin_edges_in: npt.ArrayLike, bin_edges_out: npt.ArrayLike) -> npt.NDArray[np.floating]:
    """
    Calculate the fraction of each input bin that overlaps with each output bin.

    Element (i, j) of the returned matrix is the fraction of input bin i
    covered by output bin j, computed with a single broadcasted pass and
    no Python-level loops.

    Parameters
    ----------
    bin_edges_in : array-like
        1D array of input bin edges in ascending order. For n_in bins, there
        should be n_in+1 edges.
    bin_edges_out : array-like
        1D array of output bin edges in ascending order. For n_out bins, there
        should be n_out+1 edges.

    Returns
    -------
    overlap_matrix : numpy.ndarray
        Dense matrix of shape (n_in, n_out). Values range from 0 (no
        overlap) to 1 (input bin completely inside an output bin). Rows sum
        to 1.0 when the output bins fully cover the input range.

    Raises
    ------
    ValueError
        If either edge array is not 1D, has fewer than 2 elements, or is
        not ascending (NaN edges are ignored in the ordering check).

    Examples
    --------
    >>> import numpy as np
    >>> from gwtransport.utils import partial_isin
    >>> bin_edges_in = np.array([0, 10, 20, 30])
    >>> bin_edges_out = np.array([5, 15, 25])
    >>> partial_isin(
    ...     bin_edges_in=bin_edges_in, bin_edges_out=bin_edges_out
    ... )  # doctest: +NORMALIZE_WHITESPACE
    array([[0.5, 0. ],
           [0.5, 0.5],
           [0. , 0.5]])
    """
    edges_in = np.asarray(bin_edges_in, dtype=float)
    edges_out = np.asarray(bin_edges_out, dtype=float)

    # Shape validation.
    if edges_in.ndim != 1 or edges_out.ndim != 1:
        msg = "Both bin_edges_in and bin_edges_out must be 1D arrays"
        raise ValueError(msg)
    if len(edges_in) < 2 or len(edges_out) < 2:  # noqa: PLR2004
        msg = "Both edge arrays must have at least 2 elements"
        raise ValueError(msg)

    # Ordering validation: NaN differences are excluded from the check.
    steps_in = np.diff(edges_in)
    finite_in = ~np.isnan(steps_in)
    if np.any(finite_in) and not np.all(steps_in[finite_in] > 0):
        msg = "bin_edges_in must be in ascending order"
        raise ValueError(msg)

    steps_out = np.diff(edges_out)
    finite_out = ~np.isnan(steps_out)
    if np.any(finite_out) and not np.all(steps_out[finite_out] > 0):
        msg = "bin_edges_out must be in ascending order"
        raise ValueError(msg)

    # Column vectors for input bins, row vectors for output bins, so that
    # broadcasting produces the full (n_in, n_out) combination grid.
    lower_in = edges_in[:-1, None]
    upper_in = edges_in[1:, None]
    widths_in = np.diff(edges_in)[:, None]
    lower_out = edges_out[None, :-1]
    upper_out = edges_out[None, 1:]

    # Intersection length of each input/output bin pair, floored at zero
    # for disjoint pairs. NaN edges propagate to NaN fractions.
    overlap = np.maximum(0, np.minimum(upper_in, upper_out) - np.maximum(lower_in, lower_out))
    return overlap / widths_in
def time_bin_overlap(*, tedges: npt.ArrayLike, bin_tedges: list[tuple]) -> npt.NDArray[np.floating]:
    """
    Calculate the fraction of each time bin that overlaps with each time range.

    Element (i, j) of the returned array is the fraction of time bin j
    covered by time range i, computed with broadcasting and no loops.

    Parameters
    ----------
    tedges : array-like
        1D array of time bin edges in ascending order. For n bins, there
        should be n+1 edges.
    bin_tedges : list of tuple
        List of tuples where each tuple contains ``(start_time, end_time)``
        defining a time range.

    Returns
    -------
    overlap_array : numpy.ndarray
        Array of shape (len(bin_tedges), n_bins). Values range from 0 (no
        overlap) to 1 (complete overlap). Zero-width bins yield 0.

    Raises
    ------
    ValueError
        If tedges is not 1D, has fewer than 2 elements, or bin_tedges is empty.

    Notes
    -----
    - tedges must be sorted in ascending order
    - Time ranges in bin_tedges can be in any order and can overlap

    Examples
    --------
    >>> import numpy as np
    >>> from gwtransport.utils import time_bin_overlap
    >>> tedges = np.array([0, 10, 20, 30])
    >>> bin_tedges = [(5, 15), (25, 35)]
    >>> time_bin_overlap(
    ...     tedges=tedges, bin_tedges=bin_tedges
    ... )  # doctest: +NORMALIZE_WHITESPACE
    array([[0.5, 0.5, 0. ],
           [0. , 0. , 0.5]])
    """
    edges = np.asarray(tedges)
    ranges = np.asarray(bin_tedges)

    # Shape validation.
    if edges.ndim != 1:
        msg = "tedges must be a 1D array"
        raise ValueError(msg)
    if len(edges) < 2:  # noqa: PLR2004
        msg = "tedges must have at least 2 elements"
        raise ValueError(msg)
    if ranges.size == 0:
        msg = "bin_tedges must be non-empty"
        raise ValueError(msg)

    # Column vectors of range starts/ends broadcast against row vectors of
    # bin edges, giving one row per range and one column per bin.
    starts = ranges[:, [0]]
    stops = ranges[:, [1]]
    clipped_right = np.minimum(stops, edges[None, 1:])
    clipped_left = np.maximum(starts, edges[None, :-1])
    overlap = np.maximum(0, clipped_right - clipped_left)

    # Divide by bin widths; where= guards zero-width bins, which stay 0.
    widths = np.diff(edges)[None, :]
    return np.divide(overlap, widths, out=np.zeros_like(overlap, dtype=float), where=widths != 0.0)
def generate_failed_coverage_badge() -> None:
    """Write a red "coverage: failed" SVG badge to ``coverage_failed.svg``."""
    # Imported lazily so the genbadge dependency is only required in CI.
    from genbadge import Badge  # type: ignore # noqa: PLC0415

    badge = Badge(left_txt="coverage", right_txt="failed", color="red")
    badge.write_to("coverage_failed.svg", use_shields=False)
def combine_bin_series(
    *,
    a: npt.ArrayLike,
    a_edges: npt.ArrayLike,
    b: npt.ArrayLike,
    b_edges: npt.ArrayLike,
    extrapolation: str | float = 0.0,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Combine two binned series onto a common set of unique edges.

    This function takes two binned series (a and b) with their respective bin edges
    and creates new series (c and d) that are defined on a combined set of unique
    edges from both input edge arrays.

    Parameters
    ----------
    a : array-like
        Values for the first binned series.
    a_edges : array-like
        Bin edges for the first series. Must have len(a) + 1 elements.
    b : array-like
        Values for the second binned series.
    b_edges : array-like
        Bin edges for the second series. Must have len(b) + 1 elements.
    extrapolation : str or float, optional
        Method for handling combined bins that fall outside the original series ranges.

        - 'nearest': Use the nearest original bin value
        - float value (e.g., np.nan, 0.0): Fill with the specified value (default: 0.0)

    Returns
    -------
    c : numpy.ndarray
        Values from series a mapped to the combined edge structure.
    c_edges : numpy.ndarray
        Combined unique edges from a_edges and b_edges.
    d : numpy.ndarray
        Values from series b mapped to the combined edge structure.
    d_edges : numpy.ndarray
        Combined unique edges from a_edges and b_edges (same as c_edges).

    Raises
    ------
    ValueError
        If a_edges or b_edges does not have one more element than its values.

    Notes
    -----
    The combined edges are created by taking the union of all unique values
    from both a_edges and b_edges, sorted in ascending order. The values
    are then broadcasted/repeated for each combined bin that falls within
    the original bin's range.
    """
    # Convert inputs to numpy arrays
    a = np.asarray(a, dtype=float)
    a_edges = np.asarray(a_edges, dtype=float)
    b = np.asarray(b, dtype=float)
    b_edges = np.asarray(b_edges, dtype=float)

    # Validate inputs: edges must bracket every value bin
    if len(a_edges) != len(a) + 1:
        msg = "a_edges must have len(a) + 1 elements"
        raise ValueError(msg)
    if len(b_edges) != len(b) + 1:
        msg = "b_edges must have len(b) + 1 elements"
        raise ValueError(msg)

    # Create combined unique edges (sorted union of both edge sets)
    combined_edges = np.unique(np.concatenate([a_edges, b_edges]))

    # Initialize output arrays, one value per combined bin
    c = np.zeros(len(combined_edges) - 1)
    d = np.zeros(len(combined_edges) - 1)

    # Vectorized mapping using searchsorted - find which original bin each combined bin belongs to.
    # Each combined bin is classified by its center point; the center cannot lie
    # exactly on an interior edge because combined bins never straddle one.
    combined_bin_centers = (combined_edges[:-1] + combined_edges[1:]) / 2
    a_bin_assignment_result = np.searchsorted(a_edges, combined_bin_centers, side="right") - 1
    # Ensure it's an array for type checker
    a_bin_assignment: npt.NDArray[np.intp] = np.asarray(a_bin_assignment_result, dtype=np.intp)
    # Clip so out-of-range centers point at the first/last original bin; the
    # validity mask below decides whether that nearest bin is actually used.
    a_bin_assignment = np.clip(a_bin_assignment, 0, len(a) - 1)

    # Handle extrapolation for series a
    if extrapolation == "nearest":
        # Assign all values using nearest neighbor (already clipped)
        c[:] = a[a_bin_assignment]
    else:
        # Only assign values where the combined bin is completely within the original bin
        a_valid_mask = (combined_edges[:-1] >= a_edges[a_bin_assignment]) & (
            combined_edges[1:] <= a_edges[a_bin_assignment + 1]
        )
        c[a_valid_mask] = a[a_bin_assignment[a_valid_mask]]
        # Fill out-of-range bins with extrapolation value
        c[~a_valid_mask] = extrapolation

    # Handle extrapolation for series b (same scheme as series a)
    b_bin_assignment_result = np.searchsorted(b_edges, combined_bin_centers, side="right") - 1
    # Ensure it's an array for type checker
    b_bin_assignment: npt.NDArray[np.intp] = np.asarray(b_bin_assignment_result, dtype=np.intp)
    b_bin_assignment = np.clip(b_bin_assignment, 0, len(b) - 1)

    if extrapolation == "nearest":
        # Assign all values using nearest neighbor (already clipped)
        d[:] = b[b_bin_assignment]
    else:
        # Only assign values where the combined bin is completely within the original bin
        b_valid_mask = (combined_edges[:-1] >= b_edges[b_bin_assignment]) & (
            combined_edges[1:] <= b_edges[b_bin_assignment + 1]
        )
        d[b_valid_mask] = b[b_bin_assignment[b_valid_mask]]
        # Fill out-of-range bins with extrapolation value
        d[~b_valid_mask] = extrapolation

    # Return the combined series; d_edges is a copy so callers may mutate
    # one edge array without affecting the other.
    c_edges = combined_edges
    d_edges = combined_edges.copy()

    return c, c_edges, d, d_edges
def compute_time_edges(
    *,
    tedges: pd.DatetimeIndex | None,
    tstart: pd.DatetimeIndex | None,
    tend: pd.DatetimeIndex | None,
    number_of_bins: int,
) -> pd.DatetimeIndex:
    """
    Compute time edges for binning data based on provided time parameters.

    This function creates a DatetimeIndex of time bin edges from one of three possible
    input formats: explicit edges, start times, or end times. The resulting edges
    define the boundaries of time intervals for data binning.

    Provide exactly one of tedges, tstart, or tend and leave the others at None.
    If more than one is provided, they are considered in that order of precedence.

    Parameters
    ----------
    tedges : pandas.DatetimeIndex or None
        Explicit time edges for the bins. If provided, must have one more element
        than the number of bins (n_bins + 1). Takes precedence over tstart and tend.
    tstart : pandas.DatetimeIndex or None
        Start times for each bin. Must have the same number of elements as the
        number of bins. Used when tedges is None.
    tend : pandas.DatetimeIndex or None
        End times for each bin. Must have the same number of elements as the
        number of bins. Used when both tedges and tstart are None.
    number_of_bins : int
        The expected number of time bins. Used for validation against the provided
        time parameters.

    Returns
    -------
    pandas.DatetimeIndex
        Time edges defining the boundaries of the time bins. Has one more element
        than number_of_bins.

    Raises
    ------
    ValueError
        If tedges has incorrect length (not number_of_bins + 1).
        If tstart or tend has incorrect length (not equal to number_of_bins).
        If tstart or tend has fewer than 2 elements (spacing cannot be inferred).
        If none of tedges, tstart, or tend are provided.

    Notes
    -----
    - When using tstart, the function assumes uniform spacing and extrapolates
      the final edge based on the spacing between the last two start times.
    - When using tend, the function assumes uniform spacing and extrapolates
      the first edge based on the spacing between the first two end times.
    - All input time data is converted to pandas.DatetimeIndex for consistency.
    """
    if tedges is not None:
        if number_of_bins != len(tedges) - 1:
            # Fixed message: previously referenced a nonexistent "flow" parameter.
            msg = "tedges must have one more element than the number of bins"
            raise ValueError(msg)
        tedges = pd.DatetimeIndex(tedges)
        # Ensure nanosecond precision while preserving timezone
        return tedges.as_unit("ns")

    if tstart is not None:
        # The index refers to the time at the start of each measurement interval
        tstart = pd.DatetimeIndex(tstart).as_unit("ns")
        if number_of_bins != len(tstart):
            msg = "tstart must have the same number of elements as the number of bins"
            raise ValueError(msg)
        if len(tstart) < 2:  # noqa: PLR2004
            # Previously this crashed with an opaque IndexError on tstart[-2].
            msg = "tstart must contain at least 2 elements to extrapolate the final edge"
            raise ValueError(msg)

        # Extrapolate final edge assuming uniform spacing of the last two starts
        final_edge = tstart[-1] + (tstart[-1] - tstart[-2])
        return pd.DatetimeIndex([*list(tstart), final_edge], dtype=tstart.dtype)

    if tend is not None:
        # The index refers to the time at the end of each measurement interval
        tend = pd.DatetimeIndex(tend).as_unit("ns")
        if number_of_bins != len(tend):
            msg = "tend must have the same number of elements as the number of bins"
            raise ValueError(msg)
        if len(tend) < 2:  # noqa: PLR2004
            msg = "tend must contain at least 2 elements to extrapolate the initial edge"
            raise ValueError(msg)

        # Extrapolate initial edge assuming uniform spacing of the first two ends
        initial_edge = tend[0] - (tend[1] - tend[0])
        return pd.DatetimeIndex([initial_edge, *list(tend)], dtype=tend.dtype)

    msg = "Provide one of tedges, tstart, or tend"
    raise ValueError(msg)
def get_soil_temperature(*, station_number: int = 260, interpolate_missing_values: bool = True) -> pd.DataFrame:
    """
    Download soil temperature data from the KNMI and return it as a pandas DataFrame.

    The data is available for the following KNMI weather stations:
    - 260: De Bilt, the Netherlands (from 1981)
    - 273: Marknesse, the Netherlands (from 1989)
    - 286: Nieuw Beerta, the Netherlands (from 1990)
    - 323: Wilhelminadorp, the Netherlands (from 1989)

    TB1 = soil temperature at 5 cm depth (degrees Celsius) at observation time
    TB2 = soil temperature at 10 cm depth (degrees Celsius) at observation time
    TB3 = soil temperature at 20 cm depth (degrees Celsius) at observation time
    TB4 = soil temperature at 50 cm depth (degrees Celsius) at observation time
    TB5 = soil temperature at 100 cm depth (degrees Celsius) at observation time
    TNB2 = minimum soil temperature at 10 cm depth over the past 6 hours (degrees Celsius)
    TNB1 = minimum soil temperature at 5 cm depth over the past 6 hours (degrees Celsius)
    TXB1 = maximum soil temperature at 5 cm depth over the past 6 hours (degrees Celsius)
    TXB2 = maximum soil temperature at 10 cm depth over the past 6 hours (degrees Celsius)

    Parameters
    ----------
    station_number : int, {260, 273, 286, 323}
        The KNMI station number for which to download soil temperature data.
        Default is 260 (De Bilt).
    interpolate_missing_values : bool, optional
        If True, missing values are interpolated and recent NaN values are extrapolated with the previous value.
        If False, missing values remain as NaN. Default is True.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing soil temperature data in Celsius with a DatetimeIndex.
        Columns include TB1, TB2, TB3, TB4, TB5, TNB1, TNB2, TXB1, TXB2.

    Notes
    -----
    - KNMI: Royal Netherlands Meteorological Institute
    - The timeseries may contain NaN values for missing data.
    - Results are cached to disk per station/option combination; the cache is
      keyed by today's date so at most one download happens per day.
    """
    # File-based daily cache
    cache_dir.mkdir(exist_ok=True)

    # Today's date in the filename acts as the cache expiry key.
    today = date.today().isoformat()  # noqa: DTZ011
    cache_path = cache_dir / f"soil_temp_{station_number}_{interpolate_missing_values}_{today}.pkl"

    # Check if cached file exists and is from today
    if cache_path.exists():
        # NOTE(review): unpickling trusted self-written cache files only.
        return cast(pd.DataFrame, pd.read_pickle(cache_path))  # noqa: S301

    # Clean up old cache files to prevent disk bloat
    for old_file in cache_dir.glob(f"soil_temp_{station_number}_{interpolate_missing_values}_*.pkl"):
        old_file.unlink(missing_ok=True)

    url = f"https://cdn.knmi.nl/knmi/map/page/klimatologie/gegevens/bodemtemps/bodemtemps_{station_number}.zip"

    # Column dtypes; the leading spaces in the keys match the raw KNMI CSV header.
    dtypes = {
        "YYYYMMDD": "int32",
        "HH": "int8",
        " TB1": "float32",
        " TB3": "float32",
        " TB2": "float32",
        " TB4": "float32",
        " TB5": "float32",
        " TNB1": "float32",
        " TNB2": "float32",
        " TXB1": "float32",
        " TXB2": "float32",
    }

    # Download the ZIP file
    with requests.get(url, params={"download": "zip"}, timeout=10) as response:
        response.raise_for_status()

        # skiprows=16 skips the file's preamble; na_values=[" "] maps the
        # KNMI blank placeholder to NaN.
        df = pd.read_csv(  # type: ignore[call-overload]
            io.BytesIO(response.content),
            compression="zip",
            dtype=dtypes,
            usecols=list(dtypes.keys()),
            skiprows=16,
            sep=",",
            na_values=[" "],
            engine="c",
            parse_dates=False,
        )

    # Build a UTC DatetimeIndex from the date (YYYYMMDD) and hour (HH) columns.
    df.index = pd.to_datetime(df["YYYYMMDD"].values, format=r"%Y%m%d").tz_localize("UTC") + pd.to_timedelta(
        df["HH"].values, unit="h"
    )

    df.drop(columns=["YYYYMMDD", "HH"], inplace=True)
    df.columns = df.columns.str.strip()
    # Scale to degrees Celsius (raw values appear to be in 0.1 degC units --
    # consistent with the KNMI file format; verify against the source preamble).
    df /= 10.0

    if interpolate_missing_values:
        # Fill NaN values by linear interpolation, then forward fill trailing gaps
        df.interpolate(method="linear", inplace=True)
        df.ffill(inplace=True)

    # Save to cache for future use
    df.to_pickle(cache_path)
    return df
847def solve_underdetermined_system(
848 *,
849 coefficient_matrix: npt.ArrayLike,
850 rhs_vector: npt.ArrayLike,
851 nullspace_objective: str | Callable[[np.ndarray, np.ndarray, np.ndarray], float] = "squared_differences",
852 optimization_method: str = "BFGS",
853) -> npt.NDArray[np.floating]:
854 """
855 Solve an underdetermined linear system with nullspace regularization.
857 For an underdetermined system Ax = b where A has more columns than rows,
858 multiple solutions exist. This function computes a least-squares solution
859 and then selects a specific solution from the nullspace based on a
860 regularization objective.
862 Parameters
863 ----------
864 coefficient_matrix : array-like
865 Coefficient matrix of shape (m, n) where m < n (underdetermined).
866 May contain NaN values in some rows, which will be excluded from the system.
867 rhs_vector : array-like
868 Right-hand side vector of length m. May contain NaN values corresponding
869 to NaN rows in coefficient_matrix, which will be excluded from the system.
870 nullspace_objective : str or callable, optional
871 Objective function to minimize in the nullspace. Options:
873 * "squared_differences" : Minimize sum of squared differences between
874 adjacent elements: ``sum((x[i+1] - x[i])**2)``
875 * "summed_differences" : Minimize sum of absolute differences between
876 adjacent elements: ``sum(|x[i+1] - x[i]|)``
877 * callable : Custom objective function with signature
878 ``objective(coeffs, x_ls, nullspace_basis)`` where:
880 - coeffs : optimization variables (nullspace coefficients)
881 - x_ls : least-squares solution
882 - nullspace_basis : nullspace basis matrix
884 Default is "squared_differences".
885 optimization_method : str, optional
886 Optimization method passed to scipy.optimize.minimize.
887 Default is "BFGS".
889 Returns
890 -------
891 numpy.ndarray
892 Solution vector that minimizes the specified nullspace objective.
893 Has length n (number of columns in coefficient_matrix).
895 Raises
896 ------
897 ValueError
898 If optimization fails, if coefficient_matrix and rhs_vector have incompatible shapes,
899 or if an unknown nullspace objective is specified.
901 Notes
902 -----
903 The algorithm follows these steps:
905 1. Remove rows with NaN values from both coefficient_matrix and rhs_vector
906 2. Compute least-squares solution: x_ls = pinv(valid_matrix) @ valid_rhs
907 3. Compute nullspace basis: N = null_space(valid_matrix)
908 4. Find nullspace coefficients: coeffs = argmin objective(x_ls + N @ coeffs)
909 5. Return final solution: x = x_ls + N @ coeffs
911 For the built-in objectives:
913 * "squared_differences" provides smooth solutions, minimizing rapid changes
914 * "summed_differences" provides sparse solutions, promoting piecewise constant behavior
916 Examples
917 --------
918 Basic usage with default squared differences objective:
920 >>> import numpy as np
921 >>> from gwtransport.utils import solve_underdetermined_system
922 >>>
923 >>> # Create underdetermined system (2 equations, 4 unknowns)
924 >>> matrix = np.array([[1, 2, 1, 0], [0, 1, 2, 1]])
925 >>> rhs = np.array([3, 4])
926 >>>
927 >>> # Solve with squared differences regularization
928 >>> x = solve_underdetermined_system(coefficient_matrix=matrix, rhs_vector=rhs)
929 >>> print(f"Solution: {x}") # doctest: +SKIP
930 >>> print(f"Residual: {np.linalg.norm(matrix @ x - rhs):.2e}") # doctest: +SKIP
932 With summed differences objective:
934 >>> x_sparse = solve_underdetermined_system( # doctest: +SKIP
935 ... coefficient_matrix=matrix,
936 ... rhs_vector=rhs,
937 ... nullspace_objective="summed_differences",
938 ... )
940 With custom objective function:
942 >>> def custom_objective(coeffs, x_ls, nullspace_basis):
943 ... x = x_ls + nullspace_basis @ coeffs
944 ... return np.sum(x**2) # Minimize L2 norm
945 >>>
946 >>> x_custom = solve_underdetermined_system( # doctest: +SKIP
947 ... coefficient_matrix=matrix,
948 ... rhs_vector=rhs,
949 ... nullspace_objective=custom_objective,
950 ... )
952 Handling NaN values:
954 >>> # System with missing data
955 >>> matrix_nan = np.array([
956 ... [1, 2, 1, 0],
957 ... [np.nan, np.nan, np.nan, np.nan],
958 ... [0, 1, 2, 1],
959 ... ])
960 >>> rhs_nan = np.array([3, np.nan, 4])
961 >>>
962 >>> x_nan = solve_underdetermined_system(
963 ... coefficient_matrix=matrix_nan, rhs_vector=rhs_nan
964 ... ) # doctest: +SKIP
965 """
966 matrix = np.asarray(coefficient_matrix)
967 rhs = np.asarray(rhs_vector)
969 if matrix.shape[0] != len(rhs):
970 msg = f"coefficient_matrix has {matrix.shape[0]} rows but rhs_vector has {len(rhs)} elements"
971 raise ValueError(msg)
973 # Identify valid rows (no NaN values in either matrix or rhs)
974 valid_rows = ~np.isnan(matrix).any(axis=1) & ~np.isnan(rhs)
976 if not np.any(valid_rows):
977 msg = "No valid rows found (all contain NaN values)"
978 raise ValueError(msg)
980 valid_matrix = matrix[valid_rows]
981 valid_rhs = rhs[valid_rows]
983 # Compute least-squares solution
984 x_ls, *_ = np.linalg.lstsq(valid_matrix, valid_rhs, rcond=None)
986 # Compute nullspace
987 nullspace_basis = null_space(valid_matrix, rcond=None)
988 nullrank = nullspace_basis.shape[1]
990 if nullrank == 0:
991 # System is determined, return least-squares solution
992 return x_ls
994 # Optimize in nullspace
995 coeffs = _optimize_nullspace_coefficients(
996 x_ls=x_ls,
997 nullspace_basis=nullspace_basis,
998 nullspace_objective=nullspace_objective,
999 optimization_method=optimization_method,
1000 )
1002 return x_ls + nullspace_basis @ coeffs
def _optimize_nullspace_coefficients(
    *,
    x_ls: np.ndarray,
    nullspace_basis: np.ndarray,
    nullspace_objective: str | Callable[[np.ndarray, np.ndarray, np.ndarray], float],
    optimization_method: str,
) -> npt.NDArray[np.floating]:
    """Find the nullspace coefficients that minimize the requested objective."""
    objective = _get_nullspace_objective_function(nullspace_objective=nullspace_objective)
    initial_guess = np.zeros(nullspace_basis.shape[1])

    # The summed-differences objective is non-smooth; warm-start it from the
    # smooth squared-differences optimum for numerical stability.
    if nullspace_objective == "summed_differences":
        warmup = minimize(
            _squared_differences_objective,
            x0=initial_guess,
            args=(x_ls, nullspace_basis),
            method=optimization_method,
        )
        if not warmup.success:
            msg = f"Initial optimization failed: {warmup.message}"
            raise ValueError(msg)
        initial_guess = warmup.x

    result = minimize(
        objective,
        x0=initial_guess,
        args=(x_ls, nullspace_basis),
        method=optimization_method,
    )
    if not result.success:
        msg = f"Optimization failed: {result.message}"
        raise ValueError(msg)
    return result.x
1045def _squared_differences_objective(coeffs: np.ndarray, x_ls: np.ndarray, nullspace_basis: np.ndarray) -> float:
1046 """Minimize sum of squared differences between adjacent elements."""
1047 x = x_ls + nullspace_basis @ coeffs
1048 return np.sum(np.square(x[1:] - x[:-1]))
1051def _summed_differences_objective(coeffs: np.ndarray, x_ls: np.ndarray, nullspace_basis: np.ndarray) -> float:
1052 """Minimize sum of absolute differences between adjacent elements."""
1053 x = x_ls + nullspace_basis @ coeffs
1054 return np.sum(np.abs(x[1:] - x[:-1]))
1057def _get_nullspace_objective_function(
1058 *,
1059 nullspace_objective: str | Callable[[np.ndarray, np.ndarray, np.ndarray], float],
1060) -> Callable[[np.ndarray, np.ndarray, np.ndarray], float]:
1061 """Get the objective function for nullspace optimization."""
1062 if nullspace_objective == "squared_differences":
1063 return _squared_differences_objective
1064 if nullspace_objective == "summed_differences":
1065 return _summed_differences_objective
1066 if callable(nullspace_objective):
1067 return nullspace_objective # type: ignore[return-value]
1068 msg = f"Unknown nullspace objective: {nullspace_objective}"
1069 raise ValueError(msg)