Coverage for src / gwtransport / utils.py: 83%

276 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-04 18:51 +0000

1""" 

2General Utilities for 1D Groundwater Transport Modeling. 

3 

4This module provides general-purpose utility functions for time series manipulation, 

5interpolation, numerical operations, and data processing used throughout the gwtransport 

6package. Functions include linear interpolation/averaging, bin overlap calculations, 

7underdetermined system solvers, and external data retrieval. 

8 

9Available functions: 

10 

11- :func:`linear_interpolate` - Linear interpolation using numpy's optimized interp function. 

12 Automatically handles unsorted data with configurable extrapolation (None for clamping, 

13 float for constant values). Handles multi-dimensional query arrays. 

14 

15- :func:`interp_series` - Interpolate pandas Series to new DatetimeIndex using 

16 scipy.interpolate.interp1d. Automatically filters NaN values and converts datetime to 

17 numerical representation. 

18 

19- :func:`linear_average` - Compute average values of piecewise linear time series between 

20 specified x-edges. Supports 1D or 2D edge arrays for batch processing. Handles NaN values 

21 and offers multiple extrapolation methods ('nan', 'outer', 'raise'). 

22 

23- :func:`diff` - Compute cell widths from cell coordinate arrays with configurable alignment 

24 ('centered', 'left', 'right'). Returns widths matching input array length. 

25 

26- :func:`partial_isin` - Calculate fraction of each input bin overlapping with each output bin. 

27 Returns dense matrix where element (i,j) represents overlap fraction. Uses vectorized 

28 operations for efficiency. 

29 

30- :func:`time_bin_overlap` - Calculate fraction of time bins overlapping with specified time 

31 ranges. Similar to partial_isin but for time-based bin overlaps with list of (start, end) 

32 tuples. 

33 

34- :func:`combine_bin_series` - Combine two binned series onto common set of unique edges. Maps 

35 values from original bins to new combined structure with configurable extrapolation ('nearest' 

36 or float value). 

37 

38- :func:`compute_time_edges` - Compute DatetimeIndex of time bin edges from explicit edges, 

39 start times, or end times. Validates consistency with expected number of bins and handles 

40 uniform spacing extrapolation. 

41 

42- :func:`solve_underdetermined_system` - Solve underdetermined linear system (Ax = b, m < n) 

43 with nullspace regularization. Handles NaN values by row exclusion. Supports built-in 

44 objectives ('squared_differences', 'summed_differences') or custom callable objectives. 

45 

46- :func:`get_soil_temperature` - Download soil temperature data from KNMI weather stations with 

47 automatic caching. Supports stations 260 (De Bilt), 273 (Marknesse), 286 (Nieuw Beerta), 

48 323 (Wilhelminadorp). Returns DataFrame with columns TB1-TB5, TNB1-TNB2, TXB1-TXB2 at various 

49 depths. Daily cache prevents redundant downloads. 

50 

51- :func:`generate_failed_coverage_badge` - Generate SVG badge indicating failed coverage using 

52 genbadge library. Used in CI/CD workflows. 

53 

54This file is part of gwtransport which is released under AGPL-3.0 license. 

55See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details. 

56""" 

57 

58from __future__ import annotations 

59 

60import io 

61from collections.abc import Callable 

62from datetime import date 

63from pathlib import Path 

64from typing import cast 

65 

66import numpy as np 

67import numpy.typing as npt 

68import pandas as pd 

69import requests 

70from scipy import interpolate 

71from scipy.linalg import null_space 

72from scipy.optimize import minimize 

73 

74cache_dir = Path(__file__).parent.parent.parent / "cache" 

75 

76 

def linear_interpolate(
    *,
    x_ref: npt.ArrayLike,
    y_ref: npt.ArrayLike,
    x_query: npt.ArrayLike,
    left: float | None = None,
    right: float | None = None,
) -> npt.NDArray[np.floating]:
    """
    Linearly interpolate query points against a reference curve.

    The reference data is sorted internally, so unsorted ``x_ref``/``y_ref``
    pairs are handled transparently. Interpolation itself is delegated to
    :func:`numpy.interp`.

    Parameters
    ----------
    x_ref : array-like
        Reference x-values. May be unsorted; sorted internally.
    y_ref : array-like
        Reference y-values corresponding to x_ref.
    x_query : array-like
        Query x-values where interpolation is needed. Any shape is allowed.
    left : float, optional
        Value returned for x_query < min(x_ref).

        - ``None`` (default): clamp to the y-value at the smallest x_ref.
        - float: use that value (e.g., ``np.nan``).

    right : float, optional
        Value returned for x_query > max(x_ref).

        - ``None`` (default): clamp to the y-value at the largest x_ref.
        - float: use that value (e.g., ``np.nan``).

    Returns
    -------
    numpy.ndarray
        Interpolated y-values with the same shape as x_query.

    Examples
    --------
    >>> import numpy as np
    >>> from gwtransport.utils import linear_interpolate
    >>> x_ref = np.array([1.0, 2.0, 3.0, 4.0])
    >>> y_ref = np.array([10.0, 20.0, 30.0, 40.0])
    >>> x_query = np.array([0.5, 1.5, 2.5, 3.5, 4.5])
    >>> linear_interpolate(x_ref=x_ref, y_ref=y_ref, x_query=x_query)
    array([10., 15., 25., 35., 40.])

    >>> linear_interpolate(
    ...     x_ref=x_ref, y_ref=y_ref, x_query=x_query, left=np.nan, right=np.nan
    ... )
    array([nan, 15., 25., 35., nan])

    See Also
    --------
    interp_series : Interpolate pandas Series with datetime index
    """
    x_ref_arr = np.asarray(x_ref)
    y_ref_arr = np.asarray(y_ref)
    query_arr = np.asarray(x_query)

    # np.interp requires monotonically increasing x; sort both arrays together.
    order = np.argsort(x_ref_arr)

    # left/right of None reproduce numpy's default clamping to boundary values.
    return np.interp(query_arr, x_ref_arr[order], y_ref_arr[order], left=left, right=right)

157 

158 

def interp_series(*, series: pd.Series, index_new: pd.DatetimeIndex, **interp1d_kwargs: object) -> pd.Series:
    """
    Interpolate a pandas.Series onto a new DatetimeIndex.

    Entries with a missing index or a missing value are dropped before
    interpolation. Timestamps are converted to fractional days since the
    first valid timestamp and passed to scipy.interpolate.interp1d.

    Parameters
    ----------
    series : pandas.Series
        Series to interpolate.
    index_new : pandas.DatetimeIndex
        New index to interpolate to.
    interp1d_kwargs : dict, optional
        Keyword arguments passed to scipy.interpolate.interp1d. Default is {}.

    Returns
    -------
    pandas.Series
        Interpolated series.
    """
    # Drop rows with NaT index or NaN value; interp1d cannot handle them.
    clean = series[series.index.notna() & series.notna()]

    # Express both axes as fractional days relative to the first valid timestamp.
    origin = clean.index[0]
    one_day = pd.to_timedelta(1, unit="D")
    x_old = (clean.index - origin) / one_day
    x_new = (index_new - origin) / one_day

    # bounds_error=False yields NaN outside the data range (scipy default fill).
    interpolator = interpolate.interp1d(x_old, clean.values, bounds_error=False, **interp1d_kwargs)
    return pd.Series(interpolator(x_new), index=index_new)

182 

183 

def diff(*, a: npt.ArrayLike, alignment: str = "centered") -> npt.NDArray[np.floating]:
    """Compute cell widths for an array of cell coordinates.

    The interpretation of the coordinates depends on ``alignment``:

    - "centered": coordinates sit at the cell centers.
    - "left": coordinates sit at the left edge of the cells.
    - "right": coordinates sit at the right edge of the cells.

    Parameters
    ----------
    a : array-like
        Cell coordinate array.
    alignment : str, optional
        One of "centered", "left", or "right". Default is "centered".

    Returns
    -------
    numpy.ndarray
        Cell widths, same length as the input array.

    Raises
    ------
    ValueError
        If ``alignment`` is not one of the supported values.
    """
    coords = np.asarray(a)

    if alignment == "centered":
        # Midpoints between neighbors; boundary cells reuse the first/last spacing.
        mid = coords[:-1] + (coords[1:] - coords[:-1]) / 2
        first = coords[[1]] - coords[[0]]
        last = coords[[-1]] - coords[[-2]]
        return np.concatenate((first, mid[1:] - mid[:-1], last))

    if alignment == "left":
        # Width of the final cell is assumed equal to the one before it.
        return np.concatenate((coords[1:] - coords[:-1], coords[[-1]] - coords[[-2]]))

    if alignment == "right":
        # Width of the first cell is assumed equal to the one after it.
        return np.concatenate((coords[[1]] - coords[[0]], coords[1:] - coords[:-1]))

    msg = f"Invalid alignment: {alignment}"
    raise ValueError(msg)

214 

215 

def linear_average(
    *,
    x_data: npt.ArrayLike,
    y_data: npt.ArrayLike,
    x_edges: npt.ArrayLike,
    extrapolate_method: str = "nan",
) -> npt.NDArray[np.float64]:
    """
    Compute the average value of a piecewise linear time series between specified x-edges.

    Parameters
    ----------
    x_data : array-like
        x-coordinates of the time series data points, must be in ascending order.
    y_data : array-like
        y-coordinates of the time series data points.
    x_edges : array-like
        x-coordinates of the integration edges. Can be 1D or 2D.

        - If 1D: shape (n_edges,), must be in ascending order.
        - If 2D: shape (n_series, n_edges), each row must be in ascending order.
    extrapolate_method : str, optional
        Method for handling edges outside the data range. Default is 'nan'.

        - 'outer': Extrapolate using the outermost data points (edges are clipped
          to the data range, i.e. constant extrapolation).
        - 'nan': Bins that extend beyond the data range average to np.nan.
        - 'raise': Raise a ValueError for out-of-bounds edges.

    Returns
    -------
    numpy.ndarray
        2D array of average values between consecutive pairs of x_edges.
        Shape is (n_series, n_bins) where n_bins = n_edges - 1.
        If x_edges is 1D, n_series = 1.

    Raises
    ------
    ValueError
        If input lengths are inconsistent, ordering requirements are violated,
        x_edges is not 1D/2D, or edges fall outside the data range with
        ``extrapolate_method='raise'``.

    Examples
    --------
    >>> import numpy as np
    >>> from gwtransport.utils import linear_average
    >>> x_data = [0, 1, 2, 3]
    >>> y_data = [0, 1, 1, 0]
    >>> x_edges = [0, 1.5, 3]
    >>> linear_average(
    ...     x_data=x_data, y_data=y_data, x_edges=x_edges
    ... )  # doctest: +ELLIPSIS
    array([[0.666..., 0.666...]])

    >>> x_edges_2d = [[0, 1.5, 3], [0.5, 2, 3]]
    >>> linear_average(x_data=x_data, y_data=y_data, x_edges=x_edges_2d)
    array([[0.66666667, 0.66666667],
           [0.91666667, 0.5       ]])
    """
    # Convert inputs to numpy arrays
    x_data = np.asarray(x_data, dtype=float)
    y_data = np.asarray(y_data, dtype=float)
    x_edges = np.asarray(x_edges, dtype=float)

    # Ensure x_edges is always 2D; a 1D input becomes a single series (row).
    if x_edges.ndim == 1:
        x_edges = x_edges[np.newaxis, :]
    elif x_edges.ndim != 2:  # noqa: PLR2004
        msg = "x_edges must be 1D or 2D array"
        raise ValueError(msg)

    # Input validation
    if len(x_data) != len(y_data) or len(x_data) == 0:
        msg = "x_data and y_data must have the same length and be non-empty"
        raise ValueError(msg)
    if x_edges.shape[1] < 2:  # noqa: PLR2004
        msg = "x_edges must contain at least 2 values in each row"
        raise ValueError(msg)
    if not np.all(np.diff(x_data) >= 0):
        msg = "x_data must be in ascending order"
        raise ValueError(msg)
    if not np.all(np.diff(x_edges, axis=1) >= 0):
        msg = "x_edges must be in ascending order along each row"
        raise ValueError(msg)

    # Filter out NaN values; fewer than 2 valid points cannot define a line.
    show = ~np.isnan(x_data) & ~np.isnan(y_data)
    if show.sum() < 2:  # noqa: PLR2004
        if show.sum() == 1 and extrapolate_method == "outer":
            # For single data point with outer extrapolation, use constant value
            constant_value = y_data[show][0]
            return np.full(shape=(x_edges.shape[0], x_edges.shape[1] - 1), fill_value=constant_value)
        return np.full(shape=(x_edges.shape[0], x_edges.shape[1] - 1), fill_value=np.nan)

    x_data_clean = x_data[show]
    y_data_clean = y_data[show]

    # Handle extrapolation for all series at once (vectorized)
    if extrapolate_method == "outer":
        # Clipping edges to the data range makes out-of-range bins average
        # to the boundary y-value (constant extrapolation).
        edges_processed = np.clip(x_edges, x_data_clean[0], x_data_clean[-1])
    elif extrapolate_method == "raise":
        if np.any(x_edges < x_data_clean[0]) or np.any(x_edges > x_data_clean[-1]):
            msg = "x_edges must be within the range of x_data"
            raise ValueError(msg)
        edges_processed = x_edges.copy()
    else:  # nan method
        edges_processed = x_edges.copy()

    # Create a combined grid of all unique x points (data + all edges) so the
    # piecewise-linear curve is exactly represented between every pair of edges.
    all_unique_x = np.unique(np.concatenate([x_data_clean, edges_processed.ravel()]))

    # Interpolate y values at all unique x points once
    all_unique_y_result = np.interp(all_unique_x, x_data_clean, y_data_clean, left=np.nan, right=np.nan)
    # Ensure it's an array for type checker
    all_unique_y: npt.NDArray[np.float64] = np.asarray(all_unique_y_result, dtype=np.float64)

    # Compute cumulative integrals once using trapezoidal rule
    dx = np.diff(all_unique_x)
    y_avg = (all_unique_y[:-1] + all_unique_y[1:]) / 2
    segment_integrals = dx * y_avg
    # Replace NaN values with 0 to avoid breaking cumulative sum.
    # NaN occurs for segments outside the data range (extrapolation='nan').
    # Setting NaN to 0 means boundary bins with partial data coverage have their
    # average biased low: the integral covers only the valid portion but the
    # width denominator below uses the full bin width. A proper fix would track
    # the covered fraction per bin.
    segment_integrals = np.nan_to_num(segment_integrals, nan=0.0)
    cumulative_integral = np.concatenate([[0], np.cumsum(segment_integrals)])

    # Vectorized computation for all series
    # Find indices of all edges in the combined grid (edges are exact members
    # of all_unique_x, so searchsorted locates them precisely).
    edge_indices_result = np.searchsorted(all_unique_x, edges_processed)
    # Ensure it's a 2D array for type checker
    edge_indices: npt.NDArray[np.intp] = np.asarray(edge_indices_result, dtype=np.intp).reshape(edges_processed.shape)

    # Compute integral between consecutive edges for all series (vectorized)
    integral_values = cumulative_integral[edge_indices[:, 1:]] - cumulative_integral[edge_indices[:, :-1]]

    # Compute widths between consecutive edges for all series (vectorized)
    edge_widths = np.diff(edges_processed, axis=1)

    # Handle zero-width intervals (vectorized)
    zero_width_mask = edge_widths == 0
    result = np.zeros_like(edge_widths)

    # For non-zero width intervals, compute average = integral / width (vectorized)
    non_zero_mask = ~zero_width_mask
    result[non_zero_mask] = integral_values[non_zero_mask] / edge_widths[non_zero_mask]

    # For zero-width intervals, interpolate y-value directly (vectorized); the
    # average over a degenerate interval is the point value itself.
    if np.any(zero_width_mask):
        zero_width_positions = edges_processed[:, :-1][zero_width_mask]
        result[zero_width_mask] = np.interp(zero_width_positions, x_data_clean, y_data_clean)

    # Handle extrapolation when 'nan' method is used (vectorized)
    if extrapolate_method == "nan":
        # Set out-of-range bins to NaN; comparison uses the ORIGINAL edges,
        # not the processed ones.
        bins_within_range = (x_edges[:, :-1] >= x_data_clean[0]) & (x_edges[:, 1:] <= x_data_clean[-1])
        result[~bins_within_range] = np.nan

    return result

370 

371 

def partial_isin(*, bin_edges_in: npt.ArrayLike, bin_edges_out: npt.ArrayLike) -> npt.NDArray[np.floating]:
    """
    Fraction of each input bin that overlaps with each output bin.

    Element (i, j) of the returned matrix is the fraction of input bin i
    covered by output bin j. The computation broadcasts all input/output
    bin combinations at once instead of looping.

    Parameters
    ----------
    bin_edges_in : array-like
        1D array of input bin edges in ascending order. For n_in bins there
        should be n_in+1 edges.
    bin_edges_out : array-like
        1D array of output bin edges in ascending order. For n_out bins there
        should be n_out+1 edges.

    Returns
    -------
    overlap_matrix : numpy.ndarray
        Dense matrix of shape (n_in, n_out). Each element lies between 0
        (no overlap) and 1 (input bin fully inside the output bin). Rows sum
        to 1 when the output bins fully cover the input range.

    Raises
    ------
    ValueError
        If either edge array is not 1D, has fewer than 2 elements, or is not
        ascending (NaN edges are skipped in the ordering check).

    Examples
    --------
    >>> import numpy as np
    >>> from gwtransport.utils import partial_isin
    >>> bin_edges_in = np.array([0, 10, 20, 30])
    >>> bin_edges_out = np.array([5, 15, 25])
    >>> partial_isin(
    ...     bin_edges_in=bin_edges_in, bin_edges_out=bin_edges_out
    ... )  # doctest: +NORMALIZE_WHITESPACE
    array([[0.5, 0. ],
           [0.5, 0.5],
           [0. , 0.5]])
    """
    edges_in = np.asarray(bin_edges_in, dtype=float)
    edges_out = np.asarray(bin_edges_out, dtype=float)

    if edges_in.ndim != 1 or edges_out.ndim != 1:
        msg = "Both bin_edges_in and bin_edges_out must be 1D arrays"
        raise ValueError(msg)
    if len(edges_in) < 2 or len(edges_out) < 2:  # noqa: PLR2004
        msg = "Both edge arrays must have at least 2 elements"
        raise ValueError(msg)

    # Ascending-order check that tolerates NaN edges.
    for label, edges in (("bin_edges_in", edges_in), ("bin_edges_out", edges_out)):
        steps = np.diff(edges)
        valid = ~np.isnan(steps)
        if np.any(valid) and not np.all(steps[valid] > 0):
            msg = f"{label} must be in ascending order"
            raise ValueError(msg)

    # Broadcast every input bin (rows) against every output bin (columns).
    overlap_lo = np.maximum(edges_in[:-1, None], edges_out[None, :-1])
    overlap_hi = np.minimum(edges_in[1:, None], edges_out[None, 1:])

    # Negative spans mean "no overlap"; clamp them to zero.
    overlap_widths = np.maximum(0, overlap_hi - overlap_lo)

    # Normalize by input-bin widths; NaN widths propagate to NaN fractions.
    return overlap_widths / np.diff(edges_in)[:, None]

460 

461 

def time_bin_overlap(*, tedges: npt.ArrayLike, bin_tedges: list[tuple]) -> npt.NDArray[np.floating]:
    """
    Fraction of each time bin that overlaps with each time range.

    Element (i, j) of the returned array is the fraction of time bin j
    covered by time range i. All range/bin combinations are evaluated at
    once via broadcasting.

    Parameters
    ----------
    tedges : array-like
        1D array of time bin edges in ascending order. For n bins there
        should be n+1 edges.
    bin_tedges : list of tuple
        List of ``(start_time, end_time)`` tuples defining the time ranges.
        Ranges may appear in any order and may overlap each other.

    Returns
    -------
    overlap_array : numpy.ndarray
        Array of shape (len(bin_tedges), n_bins). Values lie between 0
        (no overlap) and 1 (complete overlap); zero-width bins yield 0.

    Raises
    ------
    ValueError
        If tedges is not 1D, has fewer than 2 elements, or bin_tedges is empty.

    Examples
    --------
    >>> import numpy as np
    >>> from gwtransport.utils import time_bin_overlap
    >>> tedges = np.array([0, 10, 20, 30])
    >>> bin_tedges = [(5, 15), (25, 35)]
    >>> time_bin_overlap(
    ...     tedges=tedges, bin_tedges=bin_tedges
    ... )  # doctest: +NORMALIZE_WHITESPACE
    array([[0.5, 0.5, 0. ],
           [0. , 0. , 0.5]])
    """
    edges = np.asarray(tedges)
    ranges = np.asarray(bin_tedges)

    if edges.ndim != 1:
        msg = "tedges must be a 1D array"
        raise ValueError(msg)
    if len(edges) < 2:  # noqa: PLR2004
        msg = "tedges must have at least 2 elements"
        raise ValueError(msg)
    if ranges.size == 0:
        msg = "bin_tedges must be non-empty"
        raise ValueError(msg)

    # Broadcast every (start, end) range (rows) against every bin (columns);
    # negative spans mean "no overlap" and are clamped to zero.
    lo = np.maximum(ranges[:, [0]], edges[None, :-1])
    hi = np.minimum(ranges[:, [1]], edges[None, 1:])
    overlap = np.maximum(0, hi - lo)

    # Zero-width bins produce a fraction of 0 instead of dividing by zero.
    widths = np.diff(edges)[None, :]
    return np.divide(overlap, widths, out=np.zeros_like(overlap, dtype=float), where=widths != 0.0)

531 

532 

def generate_failed_coverage_badge() -> None:
    """Write an SVG badge reading "coverage: failed" to coverage_failed.svg."""
    from genbadge import Badge  # type: ignore # noqa: PLC0415

    badge = Badge(left_txt="coverage", right_txt="failed", color="red")
    badge.write_to("coverage_failed.svg", use_shields=False)

539 

540 

def combine_bin_series(
    *,
    a: npt.ArrayLike,
    a_edges: npt.ArrayLike,
    b: npt.ArrayLike,
    b_edges: npt.ArrayLike,
    extrapolation: str | float = 0.0,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Combine two binned series onto a common set of unique edges.

    Both series are remapped onto the sorted union of their edge arrays, so
    the resulting series share one edge structure. Each combined bin takes
    the value of the original bin it falls inside.

    Parameters
    ----------
    a : array-like
        Values for the first binned series.
    a_edges : array-like
        Bin edges for the first series. Must have len(a) + 1 elements.
    b : array-like
        Values for the second binned series.
    b_edges : array-like
        Bin edges for the second series. Must have len(b) + 1 elements.
    extrapolation : str or float, optional
        How to fill combined bins not fully covered by an original bin.

        - 'nearest': use the nearest original bin value
        - float value (e.g., np.nan, 0.0): fill with that value (default: 0.0)

    Returns
    -------
    c : numpy.ndarray
        Values from series a mapped to the combined edge structure.
    c_edges : numpy.ndarray
        Combined unique edges from a_edges and b_edges.
    d : numpy.ndarray
        Values from series b mapped to the combined edge structure.
    d_edges : numpy.ndarray
        Copy of the combined edges (same values as c_edges).

    Raises
    ------
    ValueError
        If an edge array does not have exactly one more element than its
        value array.
    """
    a = np.asarray(a, dtype=float)
    a_edges = np.asarray(a_edges, dtype=float)
    b = np.asarray(b, dtype=float)
    b_edges = np.asarray(b_edges, dtype=float)

    if len(a_edges) != len(a) + 1:
        msg = "a_edges must have len(a) + 1 elements"
        raise ValueError(msg)
    if len(b_edges) != len(b) + 1:
        msg = "b_edges must have len(b) + 1 elements"
        raise ValueError(msg)

    # The union of both edge sets defines the combined bin structure.
    merged_edges = np.unique(np.concatenate([a_edges, b_edges]))
    bin_centers = (merged_edges[:-1] + merged_edges[1:]) / 2

    def _remap(values: np.ndarray, edges: np.ndarray) -> np.ndarray:
        """Map one original binned series onto the merged edge structure."""
        # Index of the original bin containing each merged-bin center.
        idx = np.asarray(np.searchsorted(edges, bin_centers, side="right") - 1, dtype=np.intp)
        idx = np.clip(idx, 0, len(values) - 1)

        out = np.zeros(len(merged_edges) - 1)
        if extrapolation == "nearest":
            # Nearest neighbor: the clip above already snaps out-of-range
            # centers to the closest original bin.
            out[:] = values[idx]
        else:
            # Keep only merged bins fully contained in their original bin;
            # everything else receives the constant extrapolation value.
            contained = (merged_edges[:-1] >= edges[idx]) & (merged_edges[1:] <= edges[idx + 1])
            out[contained] = values[idx[contained]]
            out[~contained] = extrapolation
        return out

    c = _remap(a, a_edges)
    d = _remap(b, b_edges)
    return c, merged_edges, d, merged_edges.copy()

654 

655 

def compute_time_edges(
    *,
    tedges: pd.DatetimeIndex | None,
    tstart: pd.DatetimeIndex | None,
    tend: pd.DatetimeIndex | None,
    number_of_bins: int,
) -> pd.DatetimeIndex:
    """
    Compute time edges for binning data based on provided time parameters.

    This function creates a DatetimeIndex of time bin edges from one of three
    possible input formats: explicit edges, start times, or end times. The
    resulting edges define the boundaries of time intervals for data binning.

    Provide exactly one of ``tedges``, ``tstart``, or ``tend`` and leave the
    others at None. If several are given, ``tedges`` takes precedence over
    ``tstart``, which takes precedence over ``tend``.

    Parameters
    ----------
    tedges : pandas.DatetimeIndex or None
        Explicit time edges for the bins. If provided, must have one more
        element than the number of bins (n_bins + 1).
    tstart : pandas.DatetimeIndex or None
        Start times for each bin. Must have the same number of elements as the
        number of bins. Used when tedges is None.
    tend : pandas.DatetimeIndex or None
        End times for each bin. Must have the same number of elements as the
        number of bins. Used when both tedges and tstart are None.
    number_of_bins : int
        The expected number of time bins. Used for validation against the
        provided time parameters.

    Returns
    -------
    pandas.DatetimeIndex
        Time edges defining the boundaries of the time bins. Has one more
        element than number_of_bins and nanosecond precision.

    Raises
    ------
    ValueError
        If tedges has incorrect length (not number_of_bins + 1).
        If tstart or tend has incorrect length (not equal to number_of_bins).
        If tstart or tend has fewer than two elements, so the missing edge
        cannot be extrapolated.
        If none of tedges, tstart, or tend are provided.

    Notes
    -----
    - When using tstart, the function assumes uniform spacing and extrapolates
      the final edge based on the spacing between the last two start times.
    - When using tend, the function assumes uniform spacing and extrapolates
      the first edge based on the spacing between the first two end times.
    - All input time data is converted to pandas.DatetimeIndex for consistency.
    """
    if tedges is not None:
        if number_of_bins != len(tedges) - 1:
            # Message fixed: previously referenced "flow", a caller-specific concept.
            msg = "tedges must have one more element than the number of bins"
            raise ValueError(msg)
        tedges = pd.DatetimeIndex(tedges)
        # Ensure nanosecond precision while preserving timezone
        return tedges.as_unit("ns")

    if tstart is not None:
        # Assume the index refers to the time at the start of the measurement interval
        tstart = pd.DatetimeIndex(tstart).as_unit("ns")
        if number_of_bins != len(tstart):
            msg = "tstart must have the same number of elements as the number of bins"
            raise ValueError(msg)
        if len(tstart) < 2:  # noqa: PLR2004
            # Previously this raised an opaque IndexError from tstart[-2].
            msg = "tstart must have at least two elements to extrapolate the final edge"
            raise ValueError(msg)

        # Extrapolate final edge using uniform spacing of the last two start times
        final_edge = tstart[-1] + (tstart[-1] - tstart[-2])
        return pd.DatetimeIndex([*list(tstart), final_edge], dtype=tstart.dtype)

    if tend is not None:
        # Assume the index refers to the time at the end of the measurement interval
        tend = pd.DatetimeIndex(tend).as_unit("ns")
        if number_of_bins != len(tend):
            msg = "tend must have the same number of elements as the number of bins"
            raise ValueError(msg)
        if len(tend) < 2:  # noqa: PLR2004
            # Previously this raised an opaque IndexError from tend[1].
            msg = "tend must have at least two elements to extrapolate the initial edge"
            raise ValueError(msg)

        # Extrapolate initial edge using uniform spacing of the first two end times
        initial_edge = tend[0] - (tend[1] - tend[0])
        return pd.DatetimeIndex([initial_edge, *list(tend)], dtype=tend.dtype)

    # Message fixed: the original "Either provide tedges, tstart, and tend" was incoherent.
    msg = "Provide one of tedges, tstart, or tend"
    raise ValueError(msg)

741 

742 

def get_soil_temperature(*, station_number: int = 260, interpolate_missing_values: bool = True) -> pd.DataFrame:
    """
    Download soil temperature data from the KNMI and return it as a pandas DataFrame.

    The data is available for the following KNMI weather stations:
    - 260: De Bilt, the Netherlands (from 1981)
    - 273: Marknesse, the Netherlands (from 1989)
    - 286: Nieuw Beerta, the Netherlands (from 1990)
    - 323: Wilhelminadorp, the Netherlands (from 1989)

    Column meanings (translated from the KNMI file header):

    - TB1 = soil temperature at 5 cm depth (degrees Celsius) at observation time
    - TB2 = soil temperature at 10 cm depth (degrees Celsius) at observation time
    - TB3 = soil temperature at 20 cm depth (degrees Celsius) at observation time
    - TB4 = soil temperature at 50 cm depth (degrees Celsius) at observation time
    - TB5 = soil temperature at 100 cm depth (degrees Celsius) at observation time
    - TNB2 = minimum soil temperature at 10 cm depth over the past 6 hours (degrees Celsius)
    - TNB1 = minimum soil temperature at 5 cm depth over the past 6 hours (degrees Celsius)
    - TXB1 = maximum soil temperature at 5 cm depth over the past 6 hours (degrees Celsius)
    - TXB2 = maximum soil temperature at 10 cm depth over the past 6 hours (degrees Celsius)

    Parameters
    ----------
    station_number : int, {260, 273, 286, 323}
        The KNMI station number for which to download soil temperature data.
        Default is 260 (De Bilt).
    interpolate_missing_values : bool, optional
        If True, missing values are interpolated and recent NaN values are extrapolated with the previous value.
        If False, missing values remain as NaN. Default is True.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing soil temperature data in Celsius with a UTC DatetimeIndex.
        Columns include TB1, TB2, TB3, TB4, TB5, TNB1, TNB2, TXB1, TXB2.

    Notes
    -----
    - KNMI: Royal Netherlands Meteorological Institute
    - The timeseries may contain NaN values for missing data.
    - Results are cached on disk for one day (the cache file name embeds today's
      date), so repeated calls on the same day do not re-download the data.
    """
    # File-based daily cache; the key embeds the station, the interpolation
    # flag, and today's date, so each variant is cached independently.
    cache_dir.mkdir(exist_ok=True)

    today = date.today().isoformat()  # noqa: DTZ011
    cache_path = cache_dir / f"soil_temp_{station_number}_{interpolate_missing_values}_{today}.pkl"

    # Cache hit: the file name encodes today's date, so existence implies freshness.
    if cache_path.exists():
        # Unpickling is acceptable here: the file was written by this same function (S301 suppressed).
        return cast(pd.DataFrame, pd.read_pickle(cache_path))  # noqa: S301

    # Cache miss: clean up old cache files for this station/flag to prevent disk bloat
    for old_file in cache_dir.glob(f"soil_temp_{station_number}_{interpolate_missing_values}_*.pkl"):
        old_file.unlink(missing_ok=True)

    url = f"https://cdn.knmi.nl/knmi/map/page/klimatologie/gegevens/bodemtemps/bodemtemps_{station_number}.zip"

    # Column dtypes; the leading spaces in the keys mirror the raw KNMI CSV header.
    dtypes = {
        "YYYYMMDD": "int32",
        "HH": "int8",
        " TB1": "float32",
        " TB3": "float32",
        " TB2": "float32",
        " TB4": "float32",
        " TB5": "float32",
        " TNB1": "float32",
        " TNB2": "float32",
        " TXB1": "float32",
        " TXB2": "float32",
    }

    # Download the ZIP file and parse the contained CSV directly from memory
    with requests.get(url, params={"download": "zip"}, timeout=10) as response:
        response.raise_for_status()

        df = pd.read_csv(  # type: ignore[call-overload]
            io.BytesIO(response.content),
            compression="zip",
            dtype=dtypes,
            usecols=list(dtypes.keys()),
            skiprows=16,  # skip the descriptive header block — assumes fixed KNMI layout; TODO confirm
            sep=",",
            na_values=[" "],  # missing measurements are encoded as a single blank
            engine="c",
            parse_dates=False,
        )

    # Build a UTC DatetimeIndex from the date (YYYYMMDD) and hour (HH) columns.
    df.index = pd.to_datetime(df["YYYYMMDD"].values, format=r"%Y%m%d").tz_localize("UTC") + pd.to_timedelta(
        df["HH"].values, unit="h"
    )

    df.drop(columns=["YYYYMMDD", "HH"], inplace=True)
    df.columns = df.columns.str.strip()  # remove the leading spaces from the KNMI column names
    df /= 10.0  # raw KNMI values are reported in 0.1 degrees Celsius

    if interpolate_missing_values:
        # Linearly interpolate interior gaps, then forward-fill any trailing NaNs.
        df.interpolate(method="linear", inplace=True)
        df.ffill(inplace=True)

    # Save to cache for future use
    df.to_pickle(cache_path)
    return df

845 

846 

847def solve_underdetermined_system( 

848 *, 

849 coefficient_matrix: npt.ArrayLike, 

850 rhs_vector: npt.ArrayLike, 

851 nullspace_objective: str | Callable[[np.ndarray, np.ndarray, np.ndarray], float] = "squared_differences", 

852 optimization_method: str = "BFGS", 

853) -> npt.NDArray[np.floating]: 

854 """ 

855 Solve an underdetermined linear system with nullspace regularization. 

856 

857 For an underdetermined system Ax = b where A has more columns than rows, 

858 multiple solutions exist. This function computes a least-squares solution 

859 and then selects a specific solution from the nullspace based on a 

860 regularization objective. 

861 

862 Parameters 

863 ---------- 

864 coefficient_matrix : array-like 

865 Coefficient matrix of shape (m, n) where m < n (underdetermined). 

866 May contain NaN values in some rows, which will be excluded from the system. 

867 rhs_vector : array-like 

868 Right-hand side vector of length m. May contain NaN values corresponding 

869 to NaN rows in coefficient_matrix, which will be excluded from the system. 

870 nullspace_objective : str or callable, optional 

871 Objective function to minimize in the nullspace. Options: 

872 

873 * "squared_differences" : Minimize sum of squared differences between 

874 adjacent elements: ``sum((x[i+1] - x[i])**2)`` 

875 * "summed_differences" : Minimize sum of absolute differences between 

876 adjacent elements: ``sum(|x[i+1] - x[i]|)`` 

877 * callable : Custom objective function with signature 

878 ``objective(coeffs, x_ls, nullspace_basis)`` where: 

879 

880 - coeffs : optimization variables (nullspace coefficients) 

881 - x_ls : least-squares solution 

882 - nullspace_basis : nullspace basis matrix 

883 

884 Default is "squared_differences". 

885 optimization_method : str, optional 

886 Optimization method passed to scipy.optimize.minimize. 

887 Default is "BFGS". 

888 

889 Returns 

890 ------- 

891 numpy.ndarray 

892 Solution vector that minimizes the specified nullspace objective. 

893 Has length n (number of columns in coefficient_matrix). 

894 

895 Raises 

896 ------ 

897 ValueError 

898 If optimization fails, if coefficient_matrix and rhs_vector have incompatible shapes, 

899 or if an unknown nullspace objective is specified. 

900 

901 Notes 

902 ----- 

903 The algorithm follows these steps: 

904 

905 1. Remove rows with NaN values from both coefficient_matrix and rhs_vector 

906 2. Compute least-squares solution: x_ls = pinv(valid_matrix) @ valid_rhs 

907 3. Compute nullspace basis: N = null_space(valid_matrix) 

908 4. Find nullspace coefficients: coeffs = argmin objective(x_ls + N @ coeffs) 

909 5. Return final solution: x = x_ls + N @ coeffs 

910 

911 For the built-in objectives: 

912 

913 * "squared_differences" provides smooth solutions, minimizing rapid changes 

914 * "summed_differences" provides sparse solutions, promoting piecewise constant behavior 

915 

916 Examples 

917 -------- 

918 Basic usage with default squared differences objective: 

919 

920 >>> import numpy as np 

921 >>> from gwtransport.utils import solve_underdetermined_system 

922 >>> 

923 >>> # Create underdetermined system (2 equations, 4 unknowns) 

924 >>> matrix = np.array([[1, 2, 1, 0], [0, 1, 2, 1]]) 

925 >>> rhs = np.array([3, 4]) 

926 >>> 

927 >>> # Solve with squared differences regularization 

928 >>> x = solve_underdetermined_system(coefficient_matrix=matrix, rhs_vector=rhs) 

929 >>> print(f"Solution: {x}") # doctest: +SKIP 

930 >>> print(f"Residual: {np.linalg.norm(matrix @ x - rhs):.2e}") # doctest: +SKIP 

931 

932 With summed differences objective: 

933 

934 >>> x_sparse = solve_underdetermined_system( # doctest: +SKIP 

935 ... coefficient_matrix=matrix, 

936 ... rhs_vector=rhs, 

937 ... nullspace_objective="summed_differences", 

938 ... ) 

939 

940 With custom objective function: 

941 

942 >>> def custom_objective(coeffs, x_ls, nullspace_basis): 

943 ... x = x_ls + nullspace_basis @ coeffs 

944 ... return np.sum(x**2) # Minimize L2 norm 

945 >>> 

946 >>> x_custom = solve_underdetermined_system( # doctest: +SKIP 

947 ... coefficient_matrix=matrix, 

948 ... rhs_vector=rhs, 

949 ... nullspace_objective=custom_objective, 

950 ... ) 

951 

952 Handling NaN values: 

953 

954 >>> # System with missing data 

955 >>> matrix_nan = np.array([ 

956 ... [1, 2, 1, 0], 

957 ... [np.nan, np.nan, np.nan, np.nan], 

958 ... [0, 1, 2, 1], 

959 ... ]) 

960 >>> rhs_nan = np.array([3, np.nan, 4]) 

961 >>> 

962 >>> x_nan = solve_underdetermined_system( 

963 ... coefficient_matrix=matrix_nan, rhs_vector=rhs_nan 

964 ... ) # doctest: +SKIP 

965 """ 

966 matrix = np.asarray(coefficient_matrix) 

967 rhs = np.asarray(rhs_vector) 

968 

969 if matrix.shape[0] != len(rhs): 

970 msg = f"coefficient_matrix has {matrix.shape[0]} rows but rhs_vector has {len(rhs)} elements" 

971 raise ValueError(msg) 

972 

973 # Identify valid rows (no NaN values in either matrix or rhs) 

974 valid_rows = ~np.isnan(matrix).any(axis=1) & ~np.isnan(rhs) 

975 

976 if not np.any(valid_rows): 

977 msg = "No valid rows found (all contain NaN values)" 

978 raise ValueError(msg) 

979 

980 valid_matrix = matrix[valid_rows] 

981 valid_rhs = rhs[valid_rows] 

982 

983 # Compute least-squares solution 

984 x_ls, *_ = np.linalg.lstsq(valid_matrix, valid_rhs, rcond=None) 

985 

986 # Compute nullspace 

987 nullspace_basis = null_space(valid_matrix, rcond=None) 

988 nullrank = nullspace_basis.shape[1] 

989 

990 if nullrank == 0: 

991 # System is determined, return least-squares solution 

992 return x_ls 

993 

994 # Optimize in nullspace 

995 coeffs = _optimize_nullspace_coefficients( 

996 x_ls=x_ls, 

997 nullspace_basis=nullspace_basis, 

998 nullspace_objective=nullspace_objective, 

999 optimization_method=optimization_method, 

1000 ) 

1001 

1002 return x_ls + nullspace_basis @ coeffs 

1003 

1004 

def _optimize_nullspace_coefficients(
    *,
    x_ls: np.ndarray,
    nullspace_basis: np.ndarray,
    nullspace_objective: str | Callable[[np.ndarray, np.ndarray, np.ndarray], float],
    optimization_method: str,
) -> npt.NDArray[np.floating]:
    """Find the nullspace coefficients that minimize the requested objective."""
    target = _get_nullspace_objective_function(nullspace_objective=nullspace_objective)
    start = np.zeros(nullspace_basis.shape[1])

    # Warm start for the non-smooth L1 objective: solve the smooth
    # squared-differences problem first and continue from its optimum.
    if nullspace_objective == "summed_differences":
        warmup = minimize(
            _squared_differences_objective,
            x0=start,
            args=(x_ls, nullspace_basis),
            method=optimization_method,
        )
        if not warmup.success:
            msg = f"Initial optimization failed: {warmup.message}"
            raise ValueError(msg)
        start = warmup.x

    # Final optimization with the target objective.
    result = minimize(
        target,
        x0=start,
        args=(x_ls, nullspace_basis),
        method=optimization_method,
    )

    if not result.success:
        msg = f"Optimization failed: {result.message}"
        raise ValueError(msg)

    return result.x

1043 

1044 

1045def _squared_differences_objective(coeffs: np.ndarray, x_ls: np.ndarray, nullspace_basis: np.ndarray) -> float: 

1046 """Minimize sum of squared differences between adjacent elements.""" 

1047 x = x_ls + nullspace_basis @ coeffs 

1048 return np.sum(np.square(x[1:] - x[:-1])) 

1049 

1050 

1051def _summed_differences_objective(coeffs: np.ndarray, x_ls: np.ndarray, nullspace_basis: np.ndarray) -> float: 

1052 """Minimize sum of absolute differences between adjacent elements.""" 

1053 x = x_ls + nullspace_basis @ coeffs 

1054 return np.sum(np.abs(x[1:] - x[:-1])) 

1055 

1056 

1057def _get_nullspace_objective_function( 

1058 *, 

1059 nullspace_objective: str | Callable[[np.ndarray, np.ndarray, np.ndarray], float], 

1060) -> Callable[[np.ndarray, np.ndarray, np.ndarray], float]: 

1061 """Get the objective function for nullspace optimization.""" 

1062 if nullspace_objective == "squared_differences": 

1063 return _squared_differences_objective 

1064 if nullspace_objective == "summed_differences": 

1065 return _summed_differences_objective 

1066 if callable(nullspace_objective): 

1067 return nullspace_objective # type: ignore[return-value] 

1068 msg = f"Unknown nullspace objective: {nullspace_objective}" 

1069 raise ValueError(msg)