Coverage for src / gwtransport / utils.py: 83%
442 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-27 06:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-27 06:32 +0000
1"""
2General Utilities for 1D Groundwater Transport Modeling.
4This module provides general-purpose utility functions for time series manipulation,
5interpolation, numerical operations, and data processing used throughout the gwtransport
6package. Functions include linear interpolation/averaging, bin overlap calculations,
7underdetermined system solvers, and external data retrieval.
9Available functions:
11- :func:`step_plot_coords` - Compute step-plot coordinates from bin edges and
12 bin-averaged values. Returns paired x/y arrays for plotting piecewise-constant
13 functions with ``ax.plot(x, y)``.
15- :func:`linear_interpolate` - Linear interpolation using numpy's optimized interp function.
16 Automatically handles unsorted data with configurable extrapolation (None for clamping,
17 float for constant values). Handles multi-dimensional query arrays.
19- ``_interp_series`` (private) - Interpolate pandas Series to new DatetimeIndex using
20 scipy.interpolate.interp1d. Automatically filters NaN values and converts datetime to
21 numerical representation.
23- :func:`linear_average` - Compute average values of piecewise linear time series between
24 specified x-edges. Supports 1D or 2D edge arrays for batch processing. Handles NaN values
25 and offers multiple extrapolation methods ('nan', 'outer', 'raise').
27- ``_diff`` (private) - Compute cell widths from cell coordinate arrays with configurable
28 alignment ('centered', 'left', 'right'). Returns widths matching input array length.
30- :func:`partial_isin` - Calculate fraction of each input bin overlapping with each output bin.
31 Returns dense matrix where element (i,j) represents overlap fraction. Uses vectorized
32 operations for efficiency.
34- :func:`time_bin_overlap` - Calculate fraction of time bins overlapping with specified time
35 ranges. Similar to partial_isin but for time-based bin overlaps with list of (start, end)
36 tuples.
38- :func:`simplify_bins` - Simplify a piecewise-constant time series by merging adjacent bins
39 whose values are within a tolerance. Uses volume-weighted (flow x width) averaging when
40 flow is provided, otherwise width-weighted. Direction-independent via recursive splitting.
42- :func:`combine_bin_series` - Combine two binned series onto common set of unique edges. Maps
43 values from original bins to new combined structure with configurable extrapolation ('nearest'
44 or float value).
46- :func:`compute_time_edges` - Compute DatetimeIndex of time bin edges from explicit edges,
47 start times, or end times. Validates consistency with expected number of bins and handles
48 uniform spacing extrapolation.
50- :func:`solve_tikhonov` - Solve linear system with Tikhonov regularization toward a target.
51 Well-determined modes follow the data; poorly-determined modes are pulled toward the target.
53- :func:`solve_inverse_transport` - Solve the inverse transport problem (deconvolution) via
54 Tikhonov regularization. Shared by advection, diffusion, and diffusion_fast
55 ``extraction_to_infiltration`` functions.
57- :func:`solve_underdetermined_system` - Solve underdetermined linear system (Ax = b, m < n)
58 with nullspace regularization. Handles NaN values by row exclusion. Supports built-in
59 objectives ('squared_differences', 'summed_differences') or custom callable objectives.
60 Used by :mod:`gwtransport.deposition`.
62- :func:`get_soil_temperature` - Download soil temperature data from KNMI weather stations with
63 automatic caching. Supports stations 260 (De Bilt), 273 (Marknesse), 286 (Nieuw Beerta),
64 323 (Wilhelminadorp). Returns DataFrame with columns TB1-TB5, TNB1-TNB2, TXB1-TXB2 at various
65 depths. Daily cache prevents redundant downloads.
67- ``_generate_failed_coverage_badge`` (private) - Generate SVG badge indicating failed coverage
68 using genbadge library. Used in CI/CD workflows.
70This file is part of gwtransport which is released under AGPL-3.0 license.
71See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details.
72"""
74from __future__ import annotations
76import io
77import warnings
78from collections.abc import Callable
79from datetime import date
80from pathlib import Path
82import numpy as np
83import numpy.typing as npt
84import pandas as pd
85import requests
86from scipy import interpolate
87from scipy.linalg import null_space
88from scipy.optimize import minimize
# Directory for cached downloads (e.g. KNMI soil-temperature data). Resolves to
# the repository root's "cache" folder: three .parent hops up from
# src/gwtransport/utils.py — TODO confirm layout matches the installed package.
cache_dir = Path(__file__).parent.parent.parent / "cache"
def step_plot_coords(edges: npt.ArrayLike, values: npt.ArrayLike) -> tuple[npt.NDArray, npt.NDArray]:
    """Build x/y coordinate arrays for plotting a piecewise-constant series.

    Given n+1 bin edges and n bin-averaged values, produce paired arrays of
    length 2n that draw the corresponding step function with ``ax.plot(x, y)``.

    Parameters
    ----------
    edges : array-like
        Bin edges (n+1 elements for n bins). Any dtype accepted by
        :func:`numpy.repeat` works, including datetimes.
    values : array-like
        One bin-averaged value per bin (n elements).

    Returns
    -------
    x : ndarray
        Step x-coordinates (2n elements), same dtype as *edges*.
    y : ndarray
        Step y-coordinates (2n elements), same dtype as *values*.

    Examples
    --------
    >>> import numpy as np
    >>> edges = np.array([0.0, 1.0, 3.0, 6.0])
    >>> values = np.array([2.0, 5.0, 1.0])
    >>> x, y = step_plot_coords(edges, values)
    >>> x
    array([0., 1., 1., 3., 3., 6.])
    >>> y
    array([2., 2., 5., 5., 1., 1.])
    """
    # Duplicate every edge, then drop the dangling outer copies so each
    # interior edge appears twice (end of one step, start of the next).
    doubled_edges = np.repeat(edges, 2)
    doubled_values = np.repeat(values, 2)
    return doubled_edges[1:-1], doubled_values
131def linear_interpolate(
132 *,
133 x_ref: npt.ArrayLike,
134 y_ref: npt.ArrayLike,
135 x_query: npt.ArrayLike,
136 left: float | None = None,
137 right: float | None = None,
138) -> npt.NDArray[np.floating]:
139 """
140 Linear interpolation using numpy's optimized interp function.
142 Automatically handles unsorted reference data by sorting it first.
144 Parameters
145 ----------
146 x_ref : array-like
147 Reference x-values. If unsorted, will be automatically sorted.
148 y_ref : array-like
149 Reference y-values corresponding to x_ref.
150 x_query : array-like
151 Query x-values where interpolation is needed. Array may have any shape.
152 left : float, optional
153 Value to return for x_query < x_ref[0].
155 - If ``left=None``: clamp to y_ref[0] (default)
156 - If ``left=float``: use specified value (e.g., ``np.nan``)
158 right : float, optional
159 Value to return for x_query > x_ref[-1].
161 - If ``right=None``: clamp to y_ref[-1] (default)
162 - If ``right=float``: use specified value (e.g., ``np.nan``)
164 Returns
165 -------
166 numpy.ndarray
167 Interpolated y-values with the same shape as x_query.
169 Examples
170 --------
171 Basic interpolation with clamping (default):
173 >>> import numpy as np
174 >>> from gwtransport.utils import linear_interpolate
175 >>> x_ref = np.array([1.0, 2.0, 3.0, 4.0])
176 >>> y_ref = np.array([10.0, 20.0, 30.0, 40.0])
177 >>> x_query = np.array([0.5, 1.5, 2.5, 3.5, 4.5])
178 >>> linear_interpolate(x_ref=x_ref, y_ref=y_ref, x_query=x_query)
179 array([10., 15., 25., 35., 40.])
181 Using NaN for extrapolation:
183 >>> linear_interpolate(
184 ... x_ref=x_ref, y_ref=y_ref, x_query=x_query, left=np.nan, right=np.nan
185 ... )
186 array([nan, 15., 25., 35., nan])
188 Handles unsorted reference data automatically:
190 >>> x_unsorted = np.array([3.0, 1.0, 4.0, 2.0])
191 >>> y_unsorted = np.array([30.0, 10.0, 40.0, 20.0])
192 >>> linear_interpolate(x_ref=x_unsorted, y_ref=y_unsorted, x_query=x_query)
193 array([10., 15., 25., 35., 40.])
194 """
195 # Convert inputs to arrays
196 x_ref = np.asarray(x_ref)
197 y_ref = np.asarray(y_ref)
198 x_query = np.asarray(x_query)
200 # Sort reference data to ensure monotonic ordering
201 sort_idx = np.argsort(x_ref)
202 x_ref_sorted = x_ref[sort_idx]
203 y_ref_sorted = y_ref[sort_idx]
205 # Default behavior (left=None, right=None) clamps to boundary values
206 return np.interp(x_query, x_ref_sorted, y_ref_sorted, left=left, right=right)
209def _interp_series(*, series: pd.Series, index_new: pd.DatetimeIndex, **interp1d_kwargs) -> pd.Series:
210 """
211 Interpolate a pandas.Series to a new index.
213 Parameters
214 ----------
215 series : pandas.Series
216 Series to interpolate.
217 index_new : pandas.DatetimeIndex
218 New index to interpolate to.
219 interp1d_kwargs : dict, optional
220 Keyword arguments passed to scipy.interpolate.interp1d. Default is {}.
222 Returns
223 -------
224 pandas.Series
225 Interpolated series.
226 """
227 series = series[series.index.notna() & series.notna()] # pyright: ignore[reportAssignmentType]
228 dt = (series.index - series.index[0]) / pd.to_timedelta(1, unit="D")
229 dt_interp = (index_new - series.index[0]) / pd.to_timedelta(1, unit="D")
230 interp_obj = interpolate.interp1d(dt, series.values, bounds_error=False, **interp1d_kwargs)
231 return pd.Series(interp_obj(dt_interp), index=index_new)
234def _diff(*, a: npt.ArrayLike, alignment: str = "centered") -> npt.NDArray[np.floating]:
235 """Compute the cell widths for a given array of cell coordinates.
237 If alignment is "centered", the coordinates are assumed to be centered in the cells.
238 If alignment is "left", the coordinates are assumed to be at the left edge of the cells.
239 If alignment is "right", the coordinates are assumed to be at the right edge of the cells.
241 Parameters
242 ----------
243 a : array-like
244 Input array.
246 Returns
247 -------
248 numpy.ndarray
249 Array with differences between elements.
251 Raises
252 ------
253 ValueError
254 If ``alignment`` is not ``'centered'``, ``'left'``, or ``'right'``.
255 """
256 # Convert input to array
257 a = np.asarray(a)
259 if alignment == "centered":
260 mid = a[:-1] + (a[1:] - a[:-1]) / 2
261 return np.concatenate((a[[1]] - a[[0]], mid[1:] - mid[:-1], a[[-1]] - a[[-2]]))
262 if alignment == "left":
263 return np.concatenate((a[1:] - a[:-1], a[[-1]] - a[[-2]]))
264 if alignment == "right":
265 return np.concatenate((a[[1]] - a[[0]], a[1:] - a[:-1]))
267 msg = f"Invalid alignment: {alignment}"
268 raise ValueError(msg)
def linear_average(
    *,
    x_data: npt.ArrayLike,
    y_data: npt.ArrayLike,
    x_edges: npt.ArrayLike,
    extrapolate_method: str = "nan",
) -> npt.NDArray[np.float64]:
    """
    Compute the average value of a piecewise linear time series between specified x-edges.

    Parameters
    ----------
    x_data : array-like
        x-coordinates of the time series data points, must be in ascending order.
    y_data : array-like
        y-coordinates of the time series data points. Can be 1D or 2D.

        - If 1D: shape ``(n_data,)`` -- a single series.
        - If 2D: shape ``(n_series_y, n_data)`` -- multiple series sharing the same
          ``x_data``. The leading axis is averaged independently per row. Cannot be
          combined with 2D ``x_edges`` (each row of ``x_edges`` and each row of
          ``y_data`` would otherwise have to broadcast against each other, which is
          not supported).
    x_edges : array-like
        x-coordinates of the integration edges.

        - If 1D: shape ``(n_edges,)``, must be in ascending order.
        - If 2D: shape ``(n_series_x, n_edges)``, each row must be in ascending order.
    extrapolate_method : str, optional
        Method for handling extrapolation. Default is 'nan'.

        - 'outer': Extrapolate using the outermost data points.
        - 'nan': Extrapolate using np.nan.
        - 'raise': Raise an error for out-of-bounds values.

    Returns
    -------
    numpy.ndarray
        2D array of average values between consecutive pairs of x_edges.
        Shape is ``(n_series, n_bins)`` where ``n_bins = n_edges - 1`` and
        ``n_series = max(n_series_x, n_series_y)``. Both ``x_edges`` and ``y_data``
        being 1D yields ``n_series = 1``.

    Raises
    ------
    ValueError
        If ``x_edges`` is not 1D or 2D. If ``y_data`` is not 1D or 2D. If both
        ``x_edges`` and ``y_data`` are 2D. If ``x_data`` and ``y_data`` have
        incompatible shapes or are empty. If ``x_edges`` has fewer than 2 values per
        row. If ``x_data`` is not in ascending order. If ``x_edges`` rows are not in
        ascending order. If ``extrapolate_method`` is ``'raise'`` and any edge falls
        outside the data range.

    Examples
    --------
    >>> import numpy as np
    >>> from gwtransport.utils import linear_average
    >>> x_data = [0, 1, 2, 3]
    >>> y_data = [0, 1, 1, 0]
    >>> x_edges = [0, 1.5, 3]
    >>> linear_average(
    ...     x_data=x_data, y_data=y_data, x_edges=x_edges
    ... )  # doctest: +ELLIPSIS
    array([[0.666..., 0.666...]])

    >>> x_edges_2d = [[0, 1.5, 3], [0.5, 2, 3]]
    >>> linear_average(x_data=x_data, y_data=y_data, x_edges=x_edges_2d)
    array([[0.66666667, 0.66666667],
           [0.91666667, 0.5       ]])

    Multiple y-series with shared x_data and x_edges:

    >>> y_data_2d = [[0, 1, 1, 0], [0, 2, 2, 0]]
    >>> linear_average(x_data=x_data, y_data=y_data_2d, x_edges=x_edges)
    array([[0.66666667, 0.66666667],
           [1.33333333, 1.33333333]])
    """
    # Convert inputs to numpy arrays
    x_data = np.asarray(x_data, dtype=float)
    y_data = np.asarray(y_data, dtype=float)
    x_edges = np.asarray(x_edges, dtype=float)

    # Ensure x_edges is always 2D
    if x_edges.ndim == 1:
        x_edges = x_edges[np.newaxis, :]
    elif x_edges.ndim != 2:  # noqa: PLR2004
        msg = "x_edges must be 1D or 2D array"
        raise ValueError(msg)

    # Ensure y_data is always 2D internally with shape (n_series_y, n_data)
    if y_data.ndim == 1:
        y_data = y_data[np.newaxis, :]
    elif y_data.ndim != 2:  # noqa: PLR2004
        msg = "y_data must be 1D or 2D array"
        raise ValueError(msg)

    # 2D y_data requires 1D x_edges (no per-row x_edges allowed). The combination would
    # require an outer product over (n_series_x, n_series_y), which is intentionally
    # not supported -- callers can loop or stack instead.
    n_series_x = x_edges.shape[0]
    n_series_y = y_data.shape[0]
    if n_series_x > 1 and n_series_y > 1:
        msg = "Cannot combine 2D x_edges with 2D y_data"
        raise ValueError(msg)
    n_series = max(n_series_x, n_series_y)

    # Input validation
    if y_data.shape[1] != x_data.shape[0] or x_data.shape[0] == 0:
        msg = "x_data and y_data must have the same length and be non-empty"
        raise ValueError(msg)
    if x_edges.shape[1] < 2:  # noqa: PLR2004
        msg = "x_edges must contain at least 2 values in each row"
        raise ValueError(msg)
    if not np.all(np.diff(x_data) >= 0):
        msg = "x_data must be in ascending order"
        raise ValueError(msg)
    if not np.all(np.diff(x_edges, axis=1) >= 0):
        msg = "x_edges must be in ascending order along each row"
        raise ValueError(msg)

    # Filter out NaN values. With 2D y_data, a column is dropped only when all rows
    # have NaN there; per-row NaNs are handled via segment masking below so that one
    # series' NaNs do not contaminate the others.
    x_nan = np.isnan(x_data)
    y_any_finite = np.any(~np.isnan(y_data), axis=0)
    show = ~x_nan & y_any_finite
    if show.sum() < 2:  # noqa: PLR2004
        # Fewer than two usable samples: no segment to integrate over.
        if show.sum() == 1 and extrapolate_method == "outer":
            # For a single retained data point with outer extrapolation, use the
            # row-wise value broadcast across all output bins.
            constant_value = y_data[:, show][:, 0]  # shape (n_series_y,)
            return np.broadcast_to(constant_value[:, None], (n_series, x_edges.shape[1] - 1)).astype(
                np.float64, copy=True
            )
        return np.full(shape=(n_series, x_edges.shape[1] - 1), fill_value=np.nan)

    x_data_clean = x_data[show]
    y_data_clean = y_data[:, show]  # shape (n_series_y, n_clean)

    # Handle extrapolation for all series at once (vectorized)
    if extrapolate_method == "outer":
        edges_processed = np.clip(x_edges, x_data_clean[0], x_data_clean[-1])
    elif extrapolate_method == "raise":
        if np.any(x_edges < x_data_clean[0]) or np.any(x_edges > x_data_clean[-1]):
            msg = "x_edges must be within the range of x_data"
            raise ValueError(msg)
        edges_processed = x_edges.copy()
    else:  # nan method
        edges_processed = x_edges.copy()

    # Create a combined grid of all unique x points (data + all edges)
    all_unique_x = np.unique(np.concatenate([x_data_clean, edges_processed.ravel()]))

    # Interpolate y values at all unique x points once. For 2D y_data we vectorize
    # the linear interpolation manually since np.interp does not accept 2D y.
    if n_series_y == 1:
        all_unique_y_result = np.interp(all_unique_x, x_data_clean, y_data_clean[0], left=np.nan, right=np.nan)
        all_unique_y: npt.NDArray[np.float64] = np.asarray(all_unique_y_result, dtype=np.float64)[np.newaxis, :]
    else:
        # Locate each query x in x_data_clean. For x within the data range, idx is in
        # [1, len(x_data_clean) - 1] so left_idx = idx - 1 is the bracketing left index.
        idx = np.searchsorted(x_data_clean, all_unique_x).clip(1, len(x_data_clean) - 1)
        left_idx = idx - 1
        right_idx = idx
        x_left = x_data_clean[left_idx]
        x_right = x_data_clean[right_idx]
        denom = x_right - x_left
        # Detect query points coincident with an x_data point. Handling them via a
        # direct lookup avoids the IEEE 754 trap where NaN * 0 = NaN, which would
        # otherwise contaminate exact-endpoint queries adjacent to a NaN sample.
        on_left_node = denom == 0  # only happens if x_left == x_right (duplicate)
        weights = np.where(on_left_node, 0.0, (all_unique_x - x_left) / np.where(on_left_node, 1.0, denom))
        all_unique_y = y_data_clean[:, left_idx] * (1.0 - weights) + y_data_clean[:, right_idx] * weights
        # Override at exact x_data positions to avoid NaN * 0 contamination.
        is_left_match = all_unique_x == x_left
        is_right_match = all_unique_x == x_right
        all_unique_y[:, is_left_match] = y_data_clean[:, left_idx[is_left_match]]
        all_unique_y[:, is_right_match] = y_data_clean[:, right_idx[is_right_match]]
        # Mark out-of-range query points as NaN (matches np.interp(left=nan, right=nan)).
        out_of_range = (all_unique_x < x_data_clean[0]) | (all_unique_x > x_data_clean[-1])
        all_unique_y[:, out_of_range] = np.nan

    # Compute cumulative integrals once using trapezoidal rule.
    # Segments outside the data range carry NaN (from the interp step with left/right=NaN);
    # those NaNs will be masked out later via the bin-range check, so we suppress
    # them here only to keep the cumulative sum finite for in-range bins.
    dx = np.diff(all_unique_x)
    y_avg = (all_unique_y[:, :-1] + all_unique_y[:, 1:]) / 2
    segment_integrals = np.where(np.isnan(y_avg), 0.0, dx[np.newaxis, :] * y_avg)
    # Cumulative integral with leading 0 along the x axis.
    cumulative_integral = np.concatenate([np.zeros((y_avg.shape[0], 1)), np.cumsum(segment_integrals, axis=1)], axis=1)

    # Vectorized computation for all series
    # Find indices of all edges in the combined grid
    edge_indices_result = np.searchsorted(all_unique_x, edges_processed)
    # Ensure it's a 2D array for type checker
    edge_indices: npt.NDArray[np.intp] = np.asarray(edge_indices_result, dtype=np.intp).reshape(edges_processed.shape)

    # Compute integral between consecutive edges. Broadcast over n_series via the leading axis
    # of cumulative_integral. edge_indices is (n_series_x, n_bins+1); cumulative_integral is
    # (n_series_y, n_unique_x). We rely on n_series_x == 1 or n_series_y == 1 (enforced above).
    integral_values = cumulative_integral[:, edge_indices[:, 1:]] - cumulative_integral[:, edge_indices[:, :-1]]
    # integral_values has shape (n_series_y, n_series_x, n_bins). Squeeze the singleton.
    integral_values_2d = integral_values[0] if n_series_y == 1 else integral_values[:, 0, :]

    # Compute widths between consecutive edges for all series (vectorized)
    edge_widths = np.diff(edges_processed, axis=1)  # shape (n_series_x, n_bins)
    # Broadcast widths to match (n_series, n_bins)
    edge_widths_b = np.broadcast_to(edge_widths, (n_series, edge_widths.shape[1])) if n_series_y > 1 else edge_widths

    # Handle zero-width intervals (vectorized)
    zero_width_mask = edge_widths_b == 0
    result = np.zeros_like(edge_widths_b, dtype=np.float64)

    # For non-zero width intervals, compute average = integral / width (vectorized)
    non_zero_mask = ~zero_width_mask
    result[non_zero_mask] = integral_values_2d[non_zero_mask] / edge_widths_b[non_zero_mask]

    # For zero-width intervals, interpolate y-value directly (vectorized)
    if np.any(zero_width_mask):
        # Positions where zero width occurs; use the left edge's x position.
        if n_series_y == 1:
            zero_positions = edges_processed[:, :-1][zero_width_mask]  # 1D
            result[zero_width_mask] = np.interp(zero_positions, x_data_clean, y_data_clean[0])
        else:
            # zero_width_mask has shape (n_series_y, n_bins); positions vary per row.
            # edges_processed is (1, n_bins+1) here since n_series_x == 1.
            edges_left = np.broadcast_to(edges_processed[:, :-1], (n_series, edge_widths.shape[1]))
            zero_positions = edges_left[zero_width_mask]
            # Interpolate per series using the same x_data_clean. Find bracketing indices
            # for each zero-width position, then index into the appropriate y row.
            # Get the row index for each zero-width entry.
            row_idx_grid = np.broadcast_to(np.arange(n_series)[:, None], (n_series, edge_widths.shape[1]))
            zero_rows = row_idx_grid[zero_width_mask]
            idx_z = np.searchsorted(x_data_clean, zero_positions).clip(1, len(x_data_clean) - 1)
            xl = x_data_clean[idx_z - 1]
            xr = x_data_clean[idx_z]
            denom_z = np.where(xr == xl, 1.0, xr - xl)
            w_z = (zero_positions - xl) / denom_z
            yl = y_data_clean[zero_rows, idx_z - 1]
            yr = y_data_clean[zero_rows, idx_z]
            result[zero_width_mask] = yl * (1.0 - w_z) + yr * w_z

    # Handle extrapolation when 'nan' method is used (vectorized).
    # Bins must lie entirely within the data range; bins partially outside
    # (straddling) are also set to NaN, since the integral over the missing
    # portion is undefined and dividing by the full bin width would bias the
    # average low. Bins fully outside are likewise NaN.
    if extrapolate_method == "nan":
        bins_within_range = (x_edges[:, :-1] >= x_data_clean[0]) & (x_edges[:, 1:] <= x_data_clean[-1])
        if n_series_y > 1:
            bins_within_range = np.broadcast_to(bins_within_range, (n_series, bins_within_range.shape[1]))
        result[~bins_within_range] = np.nan

    # With 2D y_data, propagate per-row NaNs from the y series itself: any output bin
    # that touches an x_data segment with NaN y in this row must be NaN. Per-row NaN
    # info is preserved in all_unique_y; mark bins whose [edge_left, edge_right]
    # contains a NaN segment for this row.
    if n_series_y > 1:
        # For each unique-x segment, is it NaN in this row?
        seg_nan = np.isnan(y_avg)  # shape (n_series_y, n_unique_x - 1)
        # A bin spans segments [edge_indices[0, b], edge_indices[0, b+1])
        # For each row, count NaN segments per bin via cumulative sums.
        seg_nan_cum = np.concatenate([np.zeros((n_series_y, 1)), np.cumsum(seg_nan, axis=1)], axis=1)
        nan_count_per_bin = seg_nan_cum[:, edge_indices[0, 1:]] - seg_nan_cum[:, edge_indices[0, :-1]]
        row_has_nan_in_bin = nan_count_per_bin > 0
        result[row_has_nan_in_bin] = np.nan

    return result
def partial_isin(*, bin_edges_in: npt.ArrayLike, bin_edges_out: npt.ArrayLike) -> npt.NDArray[np.floating]:
    """
    Calculate the fraction of each input bin that overlaps with each output bin.

    This function computes a matrix where element (i, j) represents the fraction
    of input bin i that overlaps with output bin j. The computation uses
    vectorized operations to avoid loops.

    Parameters
    ----------
    bin_edges_in : array-like
        1D array of input bin edges in ascending order. For n_in bins, there
        should be n_in+1 edges.
    bin_edges_out : array-like
        1D array of output bin edges in ascending order. For n_out bins, there
        should be n_out+1 edges.

    Returns
    -------
    overlap_matrix : numpy.ndarray
        Dense matrix of shape (n_in, n_out) where each element (i, j) is the
        fraction of input bin i that overlaps with output bin j. Values range
        from 0 (no overlap) to 1 (complete overlap).

    Raises
    ------
    ValueError
        If ``bin_edges_in`` or ``bin_edges_out`` are not 1D arrays. If either edge
        array has fewer than 2 elements. If ``bin_edges_in`` or ``bin_edges_out``
        are not in ascending order.

    Notes
    -----
    - Both input arrays must be sorted in ascending order
    - Zero-width input bins (e.g. from cumulative flow with zero-flow intervals)
      are allowed and yield zero overlap fractions
    - NaN edge widths propagate as NaN fractions (preserved for spin-up handling)
    - All overlaps sum to 1.0 for each input bin when output bins fully cover
      the input range

    Examples
    --------
    >>> import numpy as np
    >>> from gwtransport.utils import partial_isin
    >>> bin_edges_in = np.array([0, 10, 20, 30])
    >>> bin_edges_out = np.array([5, 15, 25])
    >>> partial_isin(
    ...     bin_edges_in=bin_edges_in, bin_edges_out=bin_edges_out
    ... )  # doctest: +NORMALIZE_WHITESPACE
    array([[0.5, 0. ],
           [0.5, 0.5],
           [0. , 0.5]])
    """
    # Convert inputs to numpy arrays
    bin_edges_in = np.asarray(bin_edges_in, dtype=float)
    bin_edges_out = np.asarray(bin_edges_out, dtype=float)

    # Validate inputs
    if bin_edges_in.ndim != 1 or bin_edges_out.ndim != 1:
        msg = "Both bin_edges_in and bin_edges_out must be 1D arrays"
        raise ValueError(msg)
    if len(bin_edges_in) < 2 or len(bin_edges_out) < 2:  # noqa: PLR2004
        msg = "Both edge arrays must have at least 2 elements"
        raise ValueError(msg)

    # Edges must be non-decreasing (ignoring NaN). Zero-width input bins are
    # allowed -- they arise e.g. from cumulative flow with zero-flow intervals --
    # and produce zero overlap fractions (no water passed through).
    diffs_in = np.diff(bin_edges_in)
    valid_diffs_in = ~np.isnan(diffs_in)
    if np.any(valid_diffs_in) and not np.all(diffs_in[valid_diffs_in] >= 0):
        msg = "bin_edges_in must be non-decreasing"
        raise ValueError(msg)

    diffs_out = np.diff(bin_edges_out)
    valid_diffs_out = ~np.isnan(diffs_out)
    if np.any(valid_diffs_out) and not np.all(diffs_out[valid_diffs_out] >= 0):
        msg = "bin_edges_out must be non-decreasing"
        raise ValueError(msg)

    # Build matrix using fully vectorized approach: broadcast input bins along
    # rows against output bins along columns.
    in_left = bin_edges_in[:-1, None]  # Shape: (n_bins_in, 1)
    in_right = bin_edges_in[1:, None]  # Shape: (n_bins_in, 1)
    in_width = diffs_in[:, None]  # reuse the diff already computed for validation

    out_left = bin_edges_out[None, :-1]  # Shape: (1, n_bins_out)
    out_right = bin_edges_out[None, 1:]  # Shape: (1, n_bins_out)

    # Intersection of each input/output bin pair (zero where disjoint)
    overlap_left = np.maximum(in_left, out_left)
    overlap_right = np.minimum(in_right, out_right)
    overlap_widths = np.maximum(0, overlap_right - overlap_left)

    # Zero-width input bins contribute no overlap (division-safe). NaN widths still
    # propagate as NaN fractions (preserved for spin-up handling).
    with np.errstate(divide="ignore", invalid="ignore"):
        result = overlap_widths / in_width
    return np.where(in_width == 0, 0.0, result)
def time_bin_overlap(*, tedges: npt.ArrayLike, bin_tedges: list[tuple]) -> npt.NDArray[np.floating]:
    """
    Calculate the fraction of each time bin that overlaps with each time range.

    Element (i, j) of the returned array is the fraction of time bin j that
    falls inside time range i. Everything is computed with broadcasting, no
    Python-level loops.

    Parameters
    ----------
    tedges : array-like
        1D array of time bin edges in ascending order (n+1 edges for n bins).
    bin_tedges : list of tuple
        Tuples of ``(start_time, end_time)`` defining the time ranges. They
        may appear in any order and may overlap each other.

    Returns
    -------
    overlap_array : numpy.ndarray
        Shape ``(len(bin_tedges), n_bins)``; values between 0 (no overlap)
        and 1 (bin fully inside the range). Zero-width bins yield 0.

    Raises
    ------
    ValueError
        If ``tedges`` is not a 1D array, has fewer than 2 elements, or if
        ``bin_tedges`` is empty.

    Examples
    --------
    >>> import numpy as np
    >>> from gwtransport.utils import time_bin_overlap
    >>> tedges = np.array([0, 10, 20, 30])
    >>> bin_tedges = [(5, 15), (25, 35)]
    >>> time_bin_overlap(
    ...     tedges=tedges, bin_tedges=bin_tedges
    ... )  # doctest: +NORMALIZE_WHITESPACE
    array([[0.5, 0.5, 0. ],
           [0. , 0. , 0.5]])
    """
    edges = np.asarray(tedges)
    ranges = np.asarray(bin_tedges)

    if edges.ndim != 1:
        msg = "tedges must be a 1D array"
        raise ValueError(msg)
    if len(edges) < 2:  # noqa: PLR2004
        msg = "tedges must have at least 2 elements"
        raise ValueError(msg)
    if ranges.size == 0:
        msg = "bin_tedges must be non-empty"
        raise ValueError(msg)

    # Intersect every range (rows) with every bin (columns) via broadcasting.
    range_starts = ranges[:, [0]]
    range_ends = ranges[:, [1]]
    clipped_left = np.maximum(range_starts, edges[None, :-1])
    clipped_right = np.minimum(range_ends, edges[None, 1:])
    overlap_widths = np.maximum(0, clipped_right - clipped_left)

    # Normalize by bin widths; zero-width bins produce 0 instead of dividing by 0.
    bin_widths = np.diff(edges)[None, :]
    return np.divide(
        overlap_widths, bin_widths, out=np.zeros_like(overlap_widths, dtype=float), where=bin_widths != 0.0
    )
def simplify_bins(
    *,
    edges: npt.ArrayLike,
    values: npt.ArrayLike,
    flow: npt.ArrayLike | None = None,
    tol: float = 0.0,
) -> tuple[
    npt.NDArray[np.floating] | pd.DatetimeIndex,
    npt.NDArray[np.floating],
    npt.NDArray[np.floating] | None,
]:
    """Simplify a piecewise-constant time series by merging adjacent bins.

    Recursively splits at the largest value jump until the peak-to-peak
    range within every group does not exceed `tol`. The result is
    independent of scan direction.

    Parameters
    ----------
    edges : array-like
        Bin edges with shape ``(n+1,)``. May be numeric or pandas Timestamps.
    values : array-like
        Bin-averaged values with shape ``(n,)`` (e.g., concentrations).
    flow : array-like, optional
        Flow rate per bin with shape ``(n,)`` (e.g., m³/day). When provided,
        merged-bin values are weighted by volume (flow x bin width) instead of
        bin width alone.
    tol : float, optional
        Maximum peak-to-peak range within a merged group.
        Default is 0.0, which merges only runs of identical values.

    Returns
    -------
    new_edges : ndarray or DatetimeIndex
        Simplified bin edges with shape ``(m+1,)``, preserving the type of
        `edges`.
    new_values : ndarray of float
        Volume-weighted (or width-weighted) average values per simplified
        bin, with shape ``(m,)``.
    new_flow : ndarray of float or None
        Time-weighted (width-weighted) average flow per simplified bin, with
        shape ``(m,)``. None when `flow` is not provided.
    """
    # Keep DatetimeIndex as-is so fancy indexing below (edges[idx]) preserves
    # the timestamp type; anything else becomes a plain ndarray.
    edges = np.asarray(edges) if not isinstance(edges, pd.DatetimeIndex) else edges
    values = np.asarray(values, dtype=float)
    if len(values) == 0:
        # Nothing to merge. Note new_flow is None even when `flow` was given;
        # a zero-length flow array carries no information anyway.
        return edges, values, None

    # Bin widths; for datetime edges np.diff yields timedeltas which are
    # converted to float (nanoseconds) — only relative weights matter below.
    widths = np.asarray(np.diff(edges), dtype=float)
    if flow is not None:
        flow = np.asarray(flow, dtype=float)
        # Volume weights: flow rate times bin width.
        weights = widths * flow
    else:
        # Pure time weighting.
        weights = widths

    def _splits(lo: int, hi: int) -> list[int]:
        # Recursively collect split indices (in sorted order) for values[lo:hi].
        # A group is accepted once its peak-to-peak range is within tol;
        # otherwise it is cut at the largest absolute jump between neighbors.
        if np.ptp(values[lo:hi]) <= tol:
            return []
        # Index (relative to the full array) just after the largest jump.
        i = lo + int(np.argmax(np.abs(np.diff(values[lo:hi])))) + 1
        return [*_splits(lo, i), i, *_splits(i, hi)]

    # Group start indices: always start at 0, plus every split point.
    s = np.array([0, *_splits(0, len(values))])
    # Append the end index so `idx` selects each group's edges, including the last.
    idx = np.append(s, len(values))
    new_edges = edges[idx]
    # np.add.reduceat sums each segment [s[k], s[k+1]) in one vectorized pass.
    new_widths = np.add.reduceat(widths, s)
    # Weighted average per merged group (volume-weighted when flow is given).
    new_values = np.add.reduceat(weights * values, s) / np.add.reduceat(weights, s)
    # Flow is always time-weighted (by width), independent of value weighting.
    new_flow = np.add.reduceat(flow * widths, s) / new_widths if flow is not None else None

    return new_edges, new_values, new_flow
def _generate_failed_coverage_badge() -> None:
    """Write a red ``coverage: failed`` badge to ``coverage_failed.svg``."""
    from genbadge import Badge  # type: ignore # noqa: PLC0415

    badge = Badge(left_txt="coverage", right_txt="failed", color="red")
    badge.write_to("coverage_failed.svg", use_shields=False)
800def combine_bin_series(
801 *,
802 a: npt.ArrayLike,
803 a_edges: npt.ArrayLike,
804 b: npt.ArrayLike,
805 b_edges: npt.ArrayLike,
806 extrapolation: str | float = 0.0,
807) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
808 """
809 Combine two binned series onto a common set of unique edges.
811 This function takes two binned series (a and b) with their respective bin edges
812 and creates new series (c and d) that are defined on a combined set of unique
813 edges from both input edge arrays.
815 Parameters
816 ----------
817 a : array-like
818 Values for the first binned series.
819 a_edges : array-like
820 Bin edges for the first series. Must have len(a) + 1 elements.
821 b : array-like
822 Values for the second binned series.
823 b_edges : array-like
824 Bin edges for the second series. Must have len(b) + 1 elements.
825 extrapolation : str or float, optional
826 Method for handling combined bins that fall outside the original series ranges.
827 - 'nearest': Use the nearest original bin value
828 - float value (e.g., np.nan, 0.0): Fill with the specified value (default: 0.0)
830 Returns
831 -------
832 c : numpy.ndarray
833 Values from series a mapped to the combined edge structure.
834 c_edges : numpy.ndarray
835 Combined unique edges from a_edges and b_edges.
836 d : numpy.ndarray
837 Values from series b mapped to the combined edge structure.
838 d_edges : numpy.ndarray
839 Combined unique edges from a_edges and b_edges (same as c_edges).
841 Raises
842 ------
843 ValueError
844 If ``a_edges`` does not have ``len(a) + 1`` elements, or if ``b_edges`` does not
845 have ``len(b) + 1`` elements.
847 Notes
848 -----
849 The combined edges are created by taking the union of all unique values
850 from both a_edges and b_edges, sorted in ascending order. The values
851 are then broadcasted/repeated for each combined bin that falls within
852 the original bin's range.
853 """
854 # Convert inputs to numpy arrays
855 a = np.asarray(a, dtype=float)
856 a_edges = np.asarray(a_edges, dtype=float)
857 b = np.asarray(b, dtype=float)
858 b_edges = np.asarray(b_edges, dtype=float)
860 # Validate inputs
861 if len(a_edges) != len(a) + 1:
862 msg = "a_edges must have len(a) + 1 elements"
863 raise ValueError(msg)
864 if len(b_edges) != len(b) + 1:
865 msg = "b_edges must have len(b) + 1 elements"
866 raise ValueError(msg)
868 # Create combined unique edges
869 combined_edges = np.unique(np.concatenate([a_edges, b_edges]))
871 # Initialize output arrays
872 c = np.zeros(len(combined_edges) - 1)
873 d = np.zeros(len(combined_edges) - 1)
875 # Vectorized mapping using searchsorted - find which original bin each combined bin belongs to
876 # For series a: find which original bin each combined bin center falls into
877 combined_bin_centers = (combined_edges[:-1] + combined_edges[1:]) / 2
878 a_bin_assignment_result = np.searchsorted(a_edges, combined_bin_centers, side="right") - 1
879 # Ensure it's an array for type checker
880 a_bin_assignment: npt.NDArray[np.intp] = np.asarray(a_bin_assignment_result, dtype=np.intp)
881 a_bin_assignment = np.clip(a_bin_assignment, 0, len(a) - 1)
883 # Handle extrapolation for series a
884 if extrapolation == "nearest":
885 # Assign all values using nearest neighbor (already clipped)
886 c[:] = a[a_bin_assignment]
887 else:
888 # Only assign values where the combined bin is completely within the original bin
889 a_valid_mask = (combined_edges[:-1] >= a_edges[a_bin_assignment]) & (
890 combined_edges[1:] <= a_edges[a_bin_assignment + 1]
891 )
892 c[a_valid_mask] = a[a_bin_assignment[a_valid_mask]]
893 # Fill out-of-range bins with extrapolation value
894 c[~a_valid_mask] = extrapolation
896 # Handle extrapolation for series b
897 b_bin_assignment_result = np.searchsorted(b_edges, combined_bin_centers, side="right") - 1
898 # Ensure it's an array for type checker
899 b_bin_assignment: npt.NDArray[np.intp] = np.asarray(b_bin_assignment_result, dtype=np.intp)
900 b_bin_assignment = np.clip(b_bin_assignment, 0, len(b) - 1)
902 if extrapolation == "nearest":
903 # Assign all values using nearest neighbor (already clipped)
904 d[:] = b[b_bin_assignment]
905 else:
906 # Only assign values where the combined bin is completely within the original bin
907 b_valid_mask = (combined_edges[:-1] >= b_edges[b_bin_assignment]) & (
908 combined_edges[1:] <= b_edges[b_bin_assignment + 1]
909 )
910 d[b_valid_mask] = b[b_bin_assignment[b_valid_mask]]
911 # Fill out-of-range bins with extrapolation value
912 d[~b_valid_mask] = extrapolation
914 # Return the combined series
915 c_edges = combined_edges
916 d_edges = combined_edges.copy()
918 return c, c_edges, d, d_edges
921def compute_time_edges(
922 *,
923 tedges: pd.DatetimeIndex | None,
924 tstart: pd.DatetimeIndex | None,
925 tend: pd.DatetimeIndex | None,
926 number_of_bins: int,
927) -> pd.DatetimeIndex:
928 """
929 Compute time edges for binning data based on provided time parameters.
931 This function creates a DatetimeIndex of time bin edges from one of three possible
932 input formats: explicit edges, start times, or end times. The resulting edges
933 define the boundaries of time intervals for data binning.
935 Define either explicit time edges, or start and end times for each bin and leave the others at None.
937 Parameters
938 ----------
939 tedges : pandas.DatetimeIndex or None
940 Explicit time edges for the bins. If provided, must have one more element
941 than the number of bins (n_bins + 1). Takes precedence over tstart and tend.
942 tstart : pandas.DatetimeIndex or None
943 Start times for each bin. Must have the same number of elements as the
944 number of bins. Used when tedges is None.
945 tend : pandas.DatetimeIndex or None
946 End times for each bin. Must have the same number of elements as the
947 number of bins. Used when both tedges and tstart are None.
948 number_of_bins : int
949 The expected number of time bins. Used for validation against the provided
950 time parameters.
952 Returns
953 -------
954 pandas.DatetimeIndex
955 Time edges defining the boundaries of the time bins. Has one more element
956 than number_of_bins.
958 Raises
959 ------
960 ValueError
961 If tedges has incorrect length (not number_of_bins + 1).
962 If tstart has incorrect length (not equal to number_of_bins).
963 If tend has incorrect length (not equal to number_of_bins).
964 If none of tedges, tstart, or tend are provided.
966 Notes
967 -----
968 - When using tstart, the function assumes uniform spacing and extrapolates
969 the final edge based on the spacing between the last two start times.
970 - When using tend, the function assumes uniform spacing and extrapolates
971 the first edge based on the spacing between the first two end times.
972 - When ``tstart`` or ``tend`` are provided with non-uniformly-spaced bins,
973 the extrapolated edge uses only the very first or very last interval and
974 may be physically incorrect: the missing edge is implicitly assigned a
975 bin width equal to that single neighbouring interval, which is unrelated
976 to any other interval in the series. In such cases, supply ``tedges``
977 directly so that all bin widths are explicit.
978 - All input time data is converted to pandas.DatetimeIndex for consistency.
979 """
980 if tedges is not None:
981 if number_of_bins != len(tedges) - 1:
982 msg = "tedges must have one more element than flow"
983 raise ValueError(msg)
984 tedges = pd.DatetimeIndex(tedges)
985 # Ensure nanosecond precision while preserving timezone
986 return tedges.as_unit("ns")
988 if tstart is not None:
989 # Assume the index refers to the time at the start of the measurement interval
990 tstart = pd.DatetimeIndex(tstart).as_unit("ns")
991 if number_of_bins != len(tstart):
992 msg = "tstart must have the same number of elements as flow"
993 raise ValueError(msg)
995 # Extrapolate final edge using uniform spacing
996 final_edge = tstart[-1] + (tstart[-1] - tstart[-2])
997 return pd.DatetimeIndex([*list(tstart), final_edge], dtype=tstart.dtype)
999 if tend is not None:
1000 # Assume the index refers to the time at the end of the measurement interval
1001 tend = pd.DatetimeIndex(tend).as_unit("ns")
1002 if number_of_bins != len(tend):
1003 msg = "tend must have the same number of elements as flow"
1004 raise ValueError(msg)
1006 # Extrapolate initial edge using uniform spacing
1007 initial_edge = tend[0] - (tend[1] - tend[0])
1008 return pd.DatetimeIndex([initial_edge, *list(tend)], dtype=tend.dtype)
1010 msg = "Either provide tedges, tstart, and tend"
1011 raise ValueError(msg)
def get_soil_temperature(*, station_number: int = 260, interpolate_missing_values: bool = True) -> pd.DataFrame:
    """
    Download soil temperature data from the KNMI and return it as a pandas DataFrame.

    The data is available for the following KNMI weather stations:
    - 260: De Bilt, the Netherlands (from 1981)
    - 273: Marknesse, the Netherlands (from 1989)
    - 286: Nieuw Beerta, the Netherlands (from 1990)
    - 323: Wilhelminadorp, the Netherlands (from 1989)

    TB1 = soil temperature at 5 cm depth (degrees Celsius) at observation time
    TB2 = soil temperature at 10 cm depth (degrees Celsius) at observation time
    TB3 = soil temperature at 20 cm depth (degrees Celsius) at observation time
    TB4 = soil temperature at 50 cm depth (degrees Celsius) at observation time
    TB5 = soil temperature at 100 cm depth (degrees Celsius) at observation time
    TNB2 = minimum soil temperature at 10 cm depth over the past 6 hours (degrees Celsius)
    TNB1 = minimum soil temperature at 5 cm depth over the past 6 hours (degrees Celsius)
    TXB1 = maximum soil temperature at 5 cm depth over the past 6 hours (degrees Celsius)
    TXB2 = maximum soil temperature at 10 cm depth over the past 6 hours (degrees Celsius)

    Parameters
    ----------
    station_number : int, {260, 273, 286, 323}
        The KNMI station number for which to download soil temperature data.
        Default is 260 (De Bilt).
    interpolate_missing_values : bool, optional
        If True, missing values are interpolated and recent NaN values are extrapolated with the previous value.
        If False, missing values remain as NaN. Default is True.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing soil temperature data in Celsius with a DatetimeIndex.
        Columns include TB1, TB2, TB3, TB4, TB5, TNB1, TNB2, TXB1, TXB2.

    Notes
    -----
    - KNMI: Royal Netherlands Meteorological Institute
    - The timeseries may contain NaN values for missing data.
    - Results are cached on disk per (station, interpolation flag, day); at most
      one network download per day and parameter combination.
    """
    # File-based daily cache. `cache_dir` is a module-level Path defined outside
    # this block; presumably its parent exists (mkdir is not called with
    # parents=True) — TODO confirm.
    cache_dir.mkdir(exist_ok=True)

    # Cache key includes today's date so entries expire daily.
    today = date.today().isoformat()  # noqa: DTZ011
    cache_path = cache_dir / f"soil_temp_{station_number}_{interpolate_missing_values}_{today}.pkl"

    # Check if cached file exists and is from today
    if cache_path.exists():
        cached = pd.read_pickle(cache_path)  # noqa: S301
        assert isinstance(cached, pd.DataFrame)  # noqa: S101 -- the cache only ever stores DataFrames
        return cached

    # Clean up old cache files (previous days) to prevent disk bloat
    for old_file in cache_dir.glob(f"soil_temp_{station_number}_{interpolate_missing_values}_*.pkl"):
        old_file.unlink(missing_ok=True)

    url = f"https://cdn.knmi.nl/knmi/map/page/klimatologie/gegevens/bodemtemps/bodemtemps_{station_number}.zip"

    # Column dtypes for the KNMI CSV; note the leading spaces in the column
    # names, which match the raw file's header and are stripped below.
    dtypes = {
        "YYYYMMDD": "int32",
        "HH": "int8",
        " TB1": "float32",
        " TB3": "float32",
        " TB2": "float32",
        " TB4": "float32",
        " TB5": "float32",
        " TNB1": "float32",
        " TNB2": "float32",
        " TXB1": "float32",
        " TXB2": "float32",
    }

    # Download the ZIP file and parse the contained CSV directly from memory
    with requests.get(url, params={"download": "zip"}, timeout=10) as response:
        response.raise_for_status()

        df = pd.read_csv(  # type: ignore[call-overload] # ty: ignore[no-matching-overload]
            io.BytesIO(response.content),
            compression="zip",
            dtype=dtypes,  # pyright: ignore[reportArgumentType]
            usecols=list(dtypes.keys()),  # pyright: ignore[reportArgumentType]
            skiprows=16,
            sep=",",
            na_values=[" "],
            engine="c",
            parse_dates=False,
        )

    # Build a UTC DatetimeIndex from the date (YYYYMMDD) and hour (HH) columns.
    df.index = pd.to_datetime(df["YYYYMMDD"].values, format=r"%Y%m%d").tz_localize("UTC") + pd.to_timedelta(
        df["HH"].values, unit="h"
    )

    df.drop(columns=["YYYYMMDD", "HH"], inplace=True)
    # Strip the leading spaces from the raw column names (see `dtypes` above).
    df.columns = df.columns.str.strip()
    # KNMI reports temperatures in 0.1 degrees Celsius; convert to Celsius.
    df /= 10.0

    if interpolate_missing_values:
        # Fill NaN values by linear interpolation, then forward-fill any
        # trailing NaNs (e.g., the most recent observations).
        df.interpolate(method="linear", inplace=True)
        df.ffill(inplace=True)

    # Save to cache for future use
    df.to_pickle(cache_path)
    return df
def solve_underdetermined_system(
    *,
    coefficient_matrix: npt.ArrayLike,
    rhs_vector: npt.ArrayLike,
    nullspace_objective: str | Callable[[np.ndarray, np.ndarray, np.ndarray], float] = "squared_differences",
    optimization_method: str = "BFGS",
    rcond: float | None = None,
    x_target: npt.NDArray[np.floating] | None = None,
) -> npt.NDArray[np.floating]:
    """Solve an underdetermined linear system Ax = b with nullspace regularization.

    When A has more columns than rows, infinitely many solutions exist. This
    function first computes the least-squares solution, then adds a nullspace
    correction chosen to minimize a regularization objective, so a single,
    well-defined solution is returned.

    Parameters
    ----------
    coefficient_matrix : array-like
        Matrix of shape (m, n) with m < n. Rows containing NaN (together with
        the matching NaN entries in `rhs_vector`) are dropped before solving.
    rhs_vector : array-like
        Right-hand side of length m; may contain NaN in rows that are dropped.
    nullspace_objective : str or callable, optional
        What to minimize within the nullspace:

        * ``"squared_differences"`` (default) — ``sum((x[i+1] - x[i])**2)``,
          yielding smooth solutions.
        * ``"summed_differences"`` — ``sum(|x[i+1] - x[i]|)``, promoting
          piecewise-constant solutions.
        * ``"reverse_matrix"`` — ``sum((x - x_target)**2)``; requires
          `x_target`.
        * callable — custom objective ``f(coeffs, x_ls, nullspace_basis)``.
    optimization_method : str, optional
        Method forwarded to ``scipy.optimize.minimize``. Default "BFGS".
    rcond : float or None, optional
        Singular-value cutoff used by both ``numpy.linalg.lstsq`` and
        ``scipy.linalg.null_space``. Larger values truncate more modes and
        enlarge the nullspace available for regularization. Default None
        (each function's own default).
    x_target : numpy.ndarray or None, optional
        Target vector for ``"reverse_matrix"``; required for that objective.

    Returns
    -------
    numpy.ndarray
        Solution of length n minimizing the chosen nullspace objective.

    Raises
    ------
    ValueError
        If shapes are incompatible, all rows contain NaN, the objective is
        unknown, or the iterative optimization fails.

    Notes
    -----
    Algorithm: (1) drop NaN rows; (2) least-squares solve for x_ls;
    (3) compute the nullspace basis N; (4) pick coefficients c minimizing the
    objective over x_ls + N @ c; (5) return that combination. When the
    nullspace is empty the least-squares solution is returned directly.
    """
    a_matrix = np.asarray(coefficient_matrix)
    b_vector = np.asarray(rhs_vector)

    if a_matrix.shape[0] != len(b_vector):
        msg = f"coefficient_matrix has {a_matrix.shape[0]} rows but rhs_vector has {len(b_vector)} elements"
        raise ValueError(msg)

    # A row participates only if neither the matrix row nor the rhs entry is NaN.
    row_mask = ~np.isnan(a_matrix).any(axis=1) & ~np.isnan(b_vector)
    if not np.any(row_mask):
        msg = "No valid rows found (all contain NaN values)"
        raise ValueError(msg)

    a_valid = a_matrix[row_mask]
    b_valid = b_vector[row_mask]

    # Particular (least-squares) solution of the reduced system.
    x_ls, *_ = np.linalg.lstsq(a_valid, b_valid, rcond=rcond)

    # Basis of the homogeneous solutions; empty when the system is determined.
    basis = null_space(a_valid, rcond=rcond)
    if basis.shape[1] == 0:
        return x_ls

    # Select the nullspace combination that minimizes the chosen objective.
    coeffs = _optimize_nullspace_coefficients(
        x_ls=x_ls,
        nullspace_basis=basis,
        nullspace_objective=nullspace_objective,
        optimization_method=optimization_method,
        x_target=x_target,
    )
    return x_ls + basis @ coeffs
def _optimize_nullspace_coefficients(
    *,
    x_ls: np.ndarray,
    nullspace_basis: np.ndarray,
    nullspace_objective: str | Callable[[np.ndarray, np.ndarray, np.ndarray], float],
    optimization_method: str,
    x_target: np.ndarray | None = None,
) -> npt.NDArray[np.floating]:
    """Optimize coefficients in the nullspace to minimize the objective.

    Parameters
    ----------
    x_ls : ndarray
        Least-squares solution vector.
    nullspace_basis : ndarray
        Nullspace basis matrix of shape (n, nullrank).
    nullspace_objective : str or callable
        Objective to minimize. Supported string values are
        ``'squared_differences'``, ``'summed_differences'``, and
        ``'reverse_matrix'``. A callable with signature
        ``objective(coeffs, x_ls, nullspace_basis)`` is also accepted.
    optimization_method : str
        Optimization method passed to ``scipy.optimize.minimize``.
    x_target : ndarray or None, optional
        Target solution vector required when
        ``nullspace_objective='reverse_matrix'``. Default is None.

    Returns
    -------
    ndarray
        Optimal nullspace coefficient vector of length nullrank.

    Raises
    ------
    ValueError
        If ``nullspace_objective`` is ``'reverse_matrix'`` and ``x_target`` is
        None. If iterative optimization fails to converge.
    """
    if nullspace_objective == "reverse_matrix":
        if x_target is None:
            msg = "x_target is required when nullspace_objective='reverse_matrix'"
            raise ValueError(msg)
        return _solve_target_analytical(x_ls=x_ls, nullspace_basis=nullspace_basis, x_target=x_target)

    # For squared_differences, solve the quadratic form analytically:
    # min ||D(x_ls + N c)||^2 => (N'D'DN) c = -N'D'D x_ls
    coeffs_sq = _solve_squared_differences_analytical(x_ls=x_ls, nullspace_basis=nullspace_basis)

    if nullspace_objective == "squared_differences":
        return coeffs_sq

    # For other objectives, use iterative optimization starting from the
    # squared_differences solution for stability.
    # (Removed a dead `coeffs_sq is not None` fallback: the analytical solver
    # always returns an ndarray or raises, so coeffs_sq can never be None.)
    objective_func = _get_nullspace_objective_function(nullspace_objective=nullspace_objective)

    res = minimize(
        objective_func,
        x0=coeffs_sq,
        args=(x_ls, nullspace_basis),
        method=optimization_method,
    )

    if not res.success:
        msg = f"Optimization failed: {res.message}"
        raise ValueError(msg)

    return res.x
1367def _solve_squared_differences_analytical(
1368 *,
1369 x_ls: np.ndarray,
1370 nullspace_basis: np.ndarray,
1371) -> npt.NDArray[np.floating]:
1372 """Solve the squared-differences nullspace problem analytically.
1374 Minimizes ``sum((x[i+1] - x[i])^2)`` where ``x = x_ls + N @ c`` by
1375 solving the normal equations ``(N^T D^T D N) c = -N^T D^T D x_ls``.
1377 Parameters
1378 ----------
1379 x_ls : ndarray
1380 Least-squares solution vector of length n.
1381 nullspace_basis : ndarray
1382 Nullspace basis matrix of shape (n, nullrank).
1384 Returns
1385 -------
1386 ndarray
1387 Optimal nullspace coefficient vector of length nullrank.
1389 Raises
1390 ------
1391 numpy.linalg.LinAlgError
1392 If the normal equations matrix ``(DN)^T(DN)`` is ill-conditioned
1393 (condition number exceeds 1e12).
1394 """
1395 # D is the (n-1, n) first-difference matrix; D @ x = x[1:] - x[:-1]
1396 # D^T D is the tridiagonal matrix with 2 on diagonal, -1 on off-diagonals
1397 # (except corners which have 1 on diagonal)
1398 # Instead of forming D explicitly, compute D @ N and D @ x_ls directly
1399 dn = nullspace_basis[1:, :] - nullspace_basis[:-1, :] # (n-1, nullrank)
1400 dx = x_ls[1:] - x_ls[:-1] # (n-1,)
1402 # Normal equations: (DN)^T (DN) c = -(DN)^T (D x_ls)
1403 dntdn = dn.T @ dn # (nullrank, nullrank)
1404 rhs = -(dn.T @ dx) # (nullrank,)
1406 cond = np.linalg.cond(dntdn)
1407 cond_threshold = 1e12
1408 if cond > cond_threshold:
1409 msg = (
1410 f"The normal equations matrix (DN)^T(DN) is ill-conditioned "
1411 f"(condition number: {cond:.2e}). This typically means the "
1412 f"nullspace contains a near-constant vector, so the "
1413 f"squared-differences objective cannot distinguish between "
1414 f"nullspace directions. Consider using a different "
1415 f"nullspace_objective (e.g., 'summed_differences'), reducing "
1416 f"the problem's degrees of freedom, or lowering rcond to "
1417 f"shrink the nullspace (if the near-constant vector has a "
1418 f"small but non-zero singular value)."
1419 )
1420 raise np.linalg.LinAlgError(msg)
1422 return np.linalg.solve(dntdn, rhs)
1425def _solve_target_analytical(
1426 *,
1427 x_ls: np.ndarray,
1428 nullspace_basis: np.ndarray,
1429 x_target: np.ndarray,
1430) -> npt.NDArray[np.floating]:
1431 """Solve the target-based nullspace problem analytically.
1433 Minimizes ``||x_ls + N @ c - x_target||^2`` over non-NaN entries of
1434 x_target by solving the normal equations
1435 ``(N_v^T N_v) c = N_v^T (x_target_v - x_ls_v)`` where ``_v`` denotes
1436 the valid (non-NaN) subset.
1438 Parameters
1439 ----------
1440 x_ls : ndarray
1441 Least-squares solution vector.
1442 nullspace_basis : ndarray
1443 Nullspace basis matrix of shape (n, nullrank).
1444 x_target : ndarray
1445 Target solution vector of length n. May contain NaN for entries
1446 that should be excluded from the minimization.
1448 Returns
1449 -------
1450 ndarray
1451 Optimal nullspace coefficients of length nullrank.
1452 """
1453 valid = ~np.isnan(x_target)
1454 if not np.any(valid):
1455 return np.zeros(nullspace_basis.shape[1])
1457 delta = x_target[valid] - x_ls[valid]
1458 n_valid = nullspace_basis[valid]
1459 ntn = n_valid.T @ n_valid
1460 rhs = n_valid.T @ delta
1462 # Use lstsq to handle ill-conditioned systems gracefully.
1463 # When valid target entries don't span all nullspace directions,
1464 # lstsq returns the minimum-norm solution (zero coefficients for
1465 # unconstrained directions, preserving x_ls there).
1466 coeffs, *_ = np.linalg.lstsq(ntn, rhs, rcond=None)
1467 return coeffs
def compute_reverse_target(
    *,
    coeff_matrix: npt.NDArray[np.floating],
    rhs_vector: npt.NDArray[np.floating],
) -> npt.NDArray[np.floating]:
    """Compute reverse matrix target from forward coefficient matrix.

    Builds a target solution for the inverse problem by transposing the
    forward matrix and row-normalizing. Since ``W_forward[i, j]`` is the
    fraction of ``cin[j]`` that arrives in ``cout[i]``, the normalized
    transpose reconstructs each ``cin[j]`` as a weighted average of the
    ``cout`` bins it contributed to.

    Parameters
    ----------
    coeff_matrix : numpy.ndarray
        Forward coefficient matrix of shape (n_cout, n_cin).
    rhs_vector : numpy.ndarray
        Right-hand side vector of length n_cout (e.g., cout values).

    Returns
    -------
    numpy.ndarray
        Target vector of length n_cin. Entries whose forward-matrix column
        sum is near zero (no contribution information) are set to NaN.
    """
    min_row_sum = 1e-10
    transposed = coeff_matrix.T  # (n_cin, n_cout)
    row_totals = transposed.sum(axis=1)
    has_mass = row_totals > min_row_sum

    # Normalize only rows with meaningful total weight; others stay zero
    # for the matrix product and are overwritten with NaN afterwards.
    normalized = np.zeros_like(transposed)
    normalized[has_mass] = transposed[has_mass] / row_totals[has_mass, None]

    target = normalized @ rhs_vector
    target[~has_mass] = np.nan
    return target
1508def solve_tikhonov(
1509 *,
1510 coefficient_matrix: npt.ArrayLike,
1511 rhs_vector: npt.ArrayLike,
1512 x_target: npt.NDArray[np.floating],
1513 regularization_strength: float = 1e-10,
1514 return_resolution: bool = False,
1515) -> npt.NDArray[np.floating] | tuple[npt.NDArray[np.floating], npt.NDArray[np.floating]]:
1516 """Solve a linear system with Tikhonov regularization toward a target.
1518 Minimizes ``||A x - b||² + λ ||x - x_target||²`` by solving the
1519 equivalent augmented least-squares problem::
1521 [A; √λ I_v] x = [b; √λ x_target_v]
1523 where ``I_v`` selects only entries where ``x_target`` is not NaN.
1525 Well-determined modes (large singular values relative to √λ) are
1526 dominated by the data; poorly-determined modes are pulled toward
1527 ``x_target``. The solution varies continuously with λ, unlike the
1528 hard singular-value cutoff of ``rcond`` in truncated SVD.
1530 Parameters
1531 ----------
1532 coefficient_matrix : array-like
1533 Coefficient matrix of shape (m, n). May contain NaN rows, which
1534 are excluded from the system.
1535 rhs_vector : array-like
1536 Right-hand side vector of length m. May contain NaN values
1537 corresponding to NaN rows in coefficient_matrix.
1538 x_target : numpy.ndarray
1539 Target solution of length n, typically from
1540 :func:`compute_reverse_target`. NaN entries are excluded from the
1541 regularization term.
1542 regularization_strength : float, optional
1543 Tikhonov parameter λ. Controls the tradeoff between fitting the
1544 data and staying close to ``x_target``. Larger values trust the
1545 target more; smaller values trust the data more. Default is 1e-10.
1547 A good starting value for noisy data is
1548 ``λ ≈ (noise_std / signal_amplitude)²``. For noiseless synthetic
1549 data, the default 1e-10 preserves machine precision.
1550 return_resolution : bool, optional
1551 If True, also return the per-element fraction of the solution that
1552 comes from data (vs from the regularization target). Default is
1553 False.
1555 Returns
1556 -------
1557 numpy.ndarray or tuple of numpy.ndarray
1558 If ``return_resolution`` is False (default), returns the solution
1559 vector of length n.
1561 If ``return_resolution`` is True, returns ``(x, fraction_data)``
1562 where ``fraction_data[j]`` is the diagonal of the model resolution
1563 matrix ``R = (A^T A + λ D)^{-1} A^T A``:
1565 - ``fraction_data[j] ≈ 1``: element *j* is data-driven
1566 - ``fraction_data[j] ≈ 0``: element *j* is target-driven
1567 - Non-regularized entries (NaN in ``x_target``):
1568 ``fraction_data[j] = 1.0``
1570 Raises
1571 ------
1572 ValueError
1573 If ``coefficient_matrix`` and ``rhs_vector`` have incompatible shapes, or if
1574 all rows contain NaN values.
1576 See Also
1577 --------
1578 compute_reverse_target : Compute the regularization target from the
1579 forward matrix.
1580 solve_underdetermined_system : Alternative solver using nullspace
1581 optimization.
1582 """
1583 matrix = np.asarray(coefficient_matrix)
1584 rhs = np.asarray(rhs_vector)
1586 if matrix.shape[0] != len(rhs):
1587 msg = f"coefficient_matrix has {matrix.shape[0]} rows but rhs_vector has {len(rhs)} elements"
1588 raise ValueError(msg)
1590 # Filter NaN rows
1591 valid_rows = ~np.isnan(matrix).any(axis=1) & ~np.isnan(rhs)
1593 if not np.any(valid_rows):
1594 msg = "No valid rows found (all contain NaN values)"
1595 raise ValueError(msg)
1597 valid_matrix = matrix[valid_rows]
1598 valid_rhs = rhs[valid_rows]
1600 n_cin = valid_matrix.shape[1]
1601 sqrt_lam = np.sqrt(regularization_strength)
1603 # Only regularize entries where x_target is valid
1604 valid_target = ~np.isnan(x_target)
1605 target_indices = np.where(valid_target)[0]
1607 # Build augmented system: [A; √λ I_v] x = [b; √λ x_target_v]
1608 n_reg = len(target_indices)
1609 reg_matrix = np.zeros((n_reg, n_cin))
1610 reg_matrix[np.arange(n_reg), target_indices] = sqrt_lam
1611 reg_rhs = sqrt_lam * x_target[target_indices]
1613 augmented_matrix = np.vstack([valid_matrix, reg_matrix])
1614 augmented_rhs = np.concatenate([valid_rhs, reg_rhs])
1616 x, *_ = np.linalg.lstsq(augmented_matrix, augmented_rhs, rcond=None)
1618 if return_resolution:
1619 # Compute fraction_data from model resolution matrix diagonal:
1620 # R = G^{-1} A^T A where G = A^T A + λ diag(d)
1621 # fraction_data[j] = R[j,j] = 1 - λ d[j] G_inv[j,j]
1622 d_reg = np.zeros(n_cin)
1623 d_reg[target_indices] = 1.0
1624 gram = valid_matrix.T @ valid_matrix + regularization_strength * np.diag(d_reg)
1625 gram_inv_diag = np.diag(np.linalg.inv(gram))
1626 fraction_data = 1.0 - regularization_strength * gram_inv_diag * d_reg
1627 return x, fraction_data
1629 return x
# Numerical tolerance for coefficient sum to determine valid output bins
_EPSILON_COEFF_SUM: float = 1e-10
def solve_inverse_transport(
    *,
    w_forward: npt.NDArray[np.floating],
    observed: npt.NDArray[np.floating],
    n_output: int,
    regularization_strength: float,
    valid_rows: npt.NDArray[np.bool_] | None = None,
    warn_rank_deficient: bool = False,
) -> npt.NDArray[np.floating]:
    """Solve the inverse transport problem via Tikhonov regularization.

    Given the forward model ``w_forward @ x = observed``, recovers ``x`` by
    building the regularization target from the transpose of ``w_forward`` and
    solving the regularized least-squares problem.

    Parameters
    ----------
    w_forward : ndarray
        Forward coefficient matrix with shape ``(n_obs, n_output)``.
    observed : ndarray
        Observed values with shape ``(n_obs,)`` (e.g., extraction
        concentrations).
    n_output : int
        Length of the output vector (e.g., number of cin bins).
    regularization_strength : float
        Tikhonov regularization parameter.
    valid_rows : ndarray of bool, optional
        Which observation rows are valid, with shape ``(n_obs,)``. If None,
        rows with ``row_sum > 1e-10`` are considered valid.
    warn_rank_deficient : bool, optional
        If True, emit a warning when the forward matrix has rank
        deficiency among its active columns. Default is False.

    Returns
    -------
    ndarray
        Recovered signal with shape ``(n_output,)``. NaN for bins with no
        active columns.

    Warns
    -----
    UserWarning
        When ``warn_rank_deficient=True`` and the matrix is rank-deficient.
    """
    row_sums = w_forward.sum(axis=1)
    # A column is "active" when at least one observation samples it.
    col_active: npt.NDArray[np.bool_] = w_forward.sum(axis=0) > 0

    # No observation samples any output bin: nothing is recoverable.
    if not np.any(col_active):
        return np.full(n_output, np.nan)

    if warn_rank_deficient:
        n_active = int(col_active.sum())
        rank = np.linalg.matrix_rank(w_forward[:, col_active])
        if rank < n_active:
            warnings.warn(
                f"Forward matrix is rank-deficient (rank {rank} < {n_active} active "
                "columns). This occurs with constant flow when the residence time "
                "is an integer multiple of the time step width. The "
                "underdetermined modes will be pulled toward the regularization "
                "target instead of being determined by data. To achieve full rank, "
                "adjust aquifer_pore_volumes slightly (e.g., multiply by 1.001).",
                stacklevel=2,
            )

    # Default row validity: a row participates only if its coefficients sum
    # to more than the numerical tolerance.
    valid: npt.NDArray[np.bool_] = row_sums > _EPSILON_COEFF_SUM if valid_rows is None else valid_rows

    # Scale observed by row coverage; NaN marks rows excluded from the fit.
    # NOTE(review): presumably w_forward rows sum to ~1 for fully covered
    # observations, so this rescales partially covered rows — confirm with caller.
    rhs = np.where(valid, row_sums * observed, np.nan)
    w_solve = w_forward.copy()
    w_solve[~valid, :] = np.nan  # solve_tikhonov drops NaN rows

    # Regularization target derived from the transpose of the forward model.
    x_target = compute_reverse_target(coeff_matrix=w_forward, rhs_vector=observed)

    x_solved = solve_tikhonov(
        coefficient_matrix=w_solve,
        rhs_vector=rhs,
        x_target=x_target,
        regularization_strength=regularization_strength,
    )

    # Report only bins that some observation actually samples; the rest stay NaN.
    out = np.full(n_output, np.nan)
    idx = np.flatnonzero(col_active)
    out[idx] = x_solved[idx]
    return out
1721def _squared_differences_objective(coeffs: np.ndarray, x_ls: np.ndarray, nullspace_basis: np.ndarray) -> float:
1722 """Minimize sum of squared differences between adjacent elements.
1724 Parameters
1725 ----------
1726 coeffs : ndarray
1727 Nullspace coefficient vector.
1728 x_ls : ndarray
1729 Least-squares solution vector.
1730 nullspace_basis : ndarray
1731 Nullspace basis matrix.
1733 Returns
1734 -------
1735 float
1736 Sum of squared differences between adjacent elements of the solution.
1737 """
1738 x = x_ls + nullspace_basis @ coeffs
1739 return np.sum(np.square(x[1:] - x[:-1]))
1742def _summed_differences_objective(coeffs: np.ndarray, x_ls: np.ndarray, nullspace_basis: np.ndarray) -> float:
1743 """Minimize sum of absolute differences between adjacent elements.
1745 Parameters
1746 ----------
1747 coeffs : ndarray
1748 Nullspace coefficient vector.
1749 x_ls : ndarray
1750 Least-squares solution vector.
1751 nullspace_basis : ndarray
1752 Nullspace basis matrix.
1754 Returns
1755 -------
1756 float
1757 Sum of absolute differences between adjacent elements of the solution.
1758 """
1759 x = x_ls + nullspace_basis @ coeffs
1760 return np.sum(np.abs(x[1:] - x[:-1]))
1763def _get_nullspace_objective_function(
1764 *,
1765 nullspace_objective: str | Callable[[np.ndarray, np.ndarray, np.ndarray], float],
1766) -> Callable[[np.ndarray, np.ndarray, np.ndarray], float]:
1767 """Get the objective function for nullspace optimization.
1769 Parameters
1770 ----------
1771 nullspace_objective : str or callable
1772 Objective identifier. Supported string values are
1773 ``'squared_differences'`` and ``'summed_differences'``. A callable
1774 with signature ``objective(coeffs, x_ls, nullspace_basis)`` is also
1775 accepted and returned as-is.
1777 Returns
1778 -------
1779 callable
1780 Objective function with signature
1781 ``(coeffs, x_ls, nullspace_basis) -> float``.
1783 Raises
1784 ------
1785 ValueError
1786 If ``nullspace_objective`` is an unrecognized string.
1787 """
1788 if nullspace_objective == "squared_differences":
1789 return _squared_differences_objective
1790 if nullspace_objective == "summed_differences":
1791 return _summed_differences_objective
1792 if callable(nullspace_objective):
1793 return nullspace_objective # type: ignore[return-value] # ty: ignore[invalid-return-type]
1794 msg = f"Unknown nullspace objective: {nullspace_objective}"
1795 raise ValueError(msg)