Coverage for src/gwtransport/utils.py: 85%

280 statements  

coverage.py v7.10.7, created at 2025-09-29 20:01 +0000

1"""General utilities for the 1D groundwater transport model.""" 

2 

3from __future__ import annotations 

4 

5import io 

6from datetime import date 

7from pathlib import Path 

8 

9import numpy as np 

10import numpy.typing as npt 

11import pandas as pd 

12import requests 

13from scipy import interpolate 

14from scipy.linalg import null_space 

15from scipy.optimize import minimize 

16 

17cache_dir = Path(__file__).parent.parent.parent / "cache" 

18 

19 

def linear_interpolate(
    x_ref: npt.ArrayLike, y_ref: npt.ArrayLike, x_query: npt.ArrayLike, left=None, right=None
) -> np.ndarray:
    """
    Linear interpolation on monotonically increasing data.

    Parameters
    ----------
    x_ref : array-like
        Reference vector with sorted x-values.
    y_ref : array-like
        Reference vector with y-values.
    x_query : array-like
        Query x-values. Array may have any shape.
    left : float, optional
        Value to return for x_query < x_ref[0].
        - If None, y_ref[0] is returned (queries are clamped to the first reference value).
        - If set to a float, such as np.nan, that value is returned.
    right : float, optional
        Value to return for x_query > x_ref[-1].
        - If None, y_ref[-1] is returned (queries are clamped to the last reference value).
        - If set to a float, such as np.nan, that value is returned.

    Returns
    -------
    numpy.ndarray
        Interpolated y-values.

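    Examples
    --------
    A minimal sketch with illustrative values. Queries beyond the last reference
    point are clamped because ``right`` defaults to None:

    >>> linear_interpolate([0.0, 1.0, 2.0], [0.0, 10.0, 20.0], [0.5, 1.5, 2.5])
    array([ 5., 15., 20.])

    Passing a float such as np.nan flags out-of-range queries instead:

    >>> linear_interpolate([0.0, 1.0, 2.0], [0.0, 10.0, 20.0], [-1.0], left=np.nan)
    array([nan])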
47 """ 

48 x_ref = np.asarray(x_ref) 

49 y_ref = np.asarray(y_ref) 

50 x_query = np.asarray(x_query) 

51 

52 # Find indices where x_query would be inserted in x_ref 

53 idx_no_edges = np.searchsorted(x_ref, x_query) 

54 

55 idx = np.clip(idx_no_edges, 1, len(x_ref) - 1) 

56 

57 # Calculate interpolation weights 

58 x0 = x_ref[idx - 1] 

59 x1 = x_ref[idx] 

60 y0 = y_ref[idx - 1] 

61 y1 = y_ref[idx] 

62 

63 # Perform linear interpolation 

64 weights = (x_query - x0) / (x1 - x0) 

65 y_query = y0 + weights * (y1 - y0) 

66 

67 # Handle edge cases 

68 if left is None: 

69 y_query = np.where(x_query < x_ref[0], y_ref[0], y_query) 

70 if right is None: 

71 y_query = np.where(x_query > x_ref[-1], y_ref[-1], y_query) 

72 if isinstance(left, float): 

73 y_query = np.where(x_query < x_ref[0], left, y_query) 

74 if isinstance(right, float): 

75 y_query = np.where(x_query > x_ref[-1], right, y_query) 

76 

77 return y_query 

78 

79 

def interp_series(series: pd.Series, index_new: pd.DatetimeIndex, **interp1d_kwargs) -> pd.Series:
    """
    Interpolate a pandas.Series to a new index.

    Parameters
    ----------
    series : pandas.Series
        Series to interpolate.
    index_new : pandas.DatetimeIndex
        New index to interpolate to.
    **interp1d_kwargs : dict, optional
        Additional keyword arguments passed to scipy.interpolate.interp1d.

    Returns
    -------
    pandas.Series
        Interpolated series.

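    Examples
    --------
    A minimal sketch with illustrative values:

    >>> s = pd.Series([0.0, 2.0], index=pd.DatetimeIndex(["2020-01-01", "2020-01-03"]))
    >>> interp_series(s, pd.DatetimeIndex(["2020-01-02"]))
    2020-01-02    1.0
    dtype: float64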
97 """ 

98 series = series[series.index.notna() & series.notna()] 

99 dt = (series.index - series.index[0]) / pd.to_timedelta(1, unit="D") 

100 dt_interp = (index_new - series.index[0]) / pd.to_timedelta(1, unit="D") 

101 interp_obj = interpolate.interp1d(dt, series.values, bounds_error=False, **interp1d_kwargs) 

102 return pd.Series(interp_obj(dt_interp), index=index_new) 

103 

104 

def diff(a: npt.ArrayLike, alignment: str = "centered") -> np.ndarray:
    """Compute the cell widths for a given array of cell coordinates.

    If alignment is "centered", the coordinates are assumed to be centered in the cells.
    If alignment is "left", the coordinates are assumed to be at the left edge of the cells.
    If alignment is "right", the coordinates are assumed to be at the right edge of the cells.

    Parameters
    ----------
    a : array-like
        Input array.
    alignment : {"centered", "left", "right"}, optional
        Position of the coordinates within the cells. Default is "centered".

    Returns
    -------
    numpy.ndarray
        Array with differences between elements.

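    Examples
    --------
    A minimal sketch with illustrative values:

    >>> diff(np.array([0.0, 1.0, 3.0]), alignment="left")
    array([1., 2., 2.])
    >>> diff(np.array([0.0, 1.0, 3.0]), alignment="centered")
    array([1. , 1.5, 2. ])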
121 """ 

122 if alignment == "centered": 

123 mid = a[:-1] + (a[1:] - a[:-1]) / 2 

124 return np.concatenate((a[[1]] - a[[0]], mid[1:] - mid[:-1], a[[-1]] - a[[-2]])) 

125 if alignment == "left": 

126 return np.concatenate((a[1:] - a[:-1], a[[-1]] - a[[-2]])) 

127 if alignment == "right": 

128 return np.concatenate((a[[1]] - a[[0]], a[1:] - a[:-1])) 

129 

130 msg = f"Invalid alignment: {alignment}" 

131 raise ValueError(msg) 

132 

133 

def linear_average(  # noqa: C901
    x_data: npt.ArrayLike,
    y_data: npt.ArrayLike,
    x_edges: npt.ArrayLike,
    extrapolate_method: str = "nan",
) -> npt.NDArray[np.float64]:
    """
    Compute the average value of a piecewise linear time series between specified x-edges.

    Parameters
    ----------
    x_data : array-like
        x-coordinates of the time series data points, must be in ascending order.
    y_data : array-like
        y-coordinates of the time series data points.
    x_edges : array-like
        x-coordinates of the integration edges. Can be 1D or 2D.
        - If 1D: shape (n_edges,), must be in ascending order.
        - If 2D: shape (n_series, n_edges), each row must be in ascending order.
    extrapolate_method : str, optional
        Method for handling extrapolation. Default is 'nan'.
        - 'outer': Extrapolate using the outermost data points.
        - 'nan': Extrapolate using np.nan.
        - 'raise': Raise an error for out-of-bounds values.

    Returns
    -------
    numpy.ndarray
        2D array of average values between consecutive pairs of x_edges.
        Shape is (n_series, n_bins) where n_bins = n_edges - 1.
        If x_edges is 1D, n_series = 1.

    Examples
    --------
    >>> x_data = [0, 1, 2, 3]
    >>> y_data = [0, 1, 1, 0]
    >>> x_edges = [0, 1.5, 3]
    >>> linear_average(x_data, y_data, x_edges)
    array([[0.667, 0.667]])

    >>> x_edges_2d = [[0, 1.5, 3], [0.5, 2, 3]]
    >>> linear_average(x_data, y_data, x_edges_2d)
    array([[0.667, 0.667], [0.917, 0.5]])

179 """ 

180 # Convert inputs to numpy arrays 

181 x_data = np.asarray(x_data, dtype=float) 

182 y_data = np.asarray(y_data, dtype=float) 

183 x_edges = np.asarray(x_edges, dtype=float) 

184 

185 # Ensure x_edges is always 2D 

186 if x_edges.ndim == 1: 

187 x_edges = x_edges[np.newaxis, :] 

188 elif x_edges.ndim != 2: # noqa: PLR2004 

189 msg = "x_edges must be 1D or 2D array" 

190 raise ValueError(msg) 

191 

192 # Input validation 

193 if len(x_data) != len(y_data) or len(x_data) == 0: 

194 msg = "x_data and y_data must have the same length and be non-empty" 

195 raise ValueError(msg) 

196 if x_edges.shape[1] < 2: # noqa: PLR2004 

197 msg = "x_edges must contain at least 2 values in each row" 

198 raise ValueError(msg) 

199 if not np.all(np.diff(x_data) >= 0): 

200 msg = "x_data must be in ascending order" 

201 raise ValueError(msg) 

202 if not np.all(np.diff(x_edges, axis=1) >= 0): 

203 msg = "x_edges must be in ascending order along each row" 

204 raise ValueError(msg) 

205 

206 # Filter out NaN values 

207 show = ~np.isnan(x_data) & ~np.isnan(y_data) 

208 if show.sum() < 2: # noqa: PLR2004 

209 if show.sum() == 1 and extrapolate_method == "outer": 

210 # For single data point with outer extrapolation, use constant value 

211 constant_value = y_data[show][0] 

212 return np.full(shape=(x_edges.shape[0], x_edges.shape[1] - 1), fill_value=constant_value) 

213 return np.full(shape=(x_edges.shape[0], x_edges.shape[1] - 1), fill_value=np.nan) 

214 

215 x_data_clean = x_data[show] 

216 y_data_clean = y_data[show] 

217 

218 # Handle extrapolation for all series at once (vectorized) 

219 if extrapolate_method == "outer": 

220 edges_processed = np.clip(x_edges, x_data_clean.min(), x_data_clean.max()) 

221 elif extrapolate_method == "raise": 

222 if np.any(x_edges < x_data_clean.min()) or np.any(x_edges > x_data_clean.max()): 

223 msg = "x_edges must be within the range of x_data" 

224 raise ValueError(msg) 

225 edges_processed = x_edges.copy() 

226 else: # nan method 

227 edges_processed = x_edges.copy() 

228 

229 # Create a combined grid of all unique x points (data + all edges) 

230 all_unique_x = np.unique(np.concatenate([x_data_clean, edges_processed.ravel()])) 

231 

232 # Interpolate y values at all unique x points once 

233 all_unique_y: npt.NDArray[np.float64] = np.interp( 

234 all_unique_x, x_data_clean, y_data_clean, left=np.nan, right=np.nan 

235 ) 

236 

237 # Compute cumulative integrals once using trapezoidal rule 

238 dx = np.diff(all_unique_x) 

239 y_avg = (all_unique_y[:-1] + all_unique_y[1:]) / 2 

240 segment_integrals = dx * y_avg 

241 # Replace NaN values with 0 to avoid breaking cumulative sum 

242 segment_integrals = np.nan_to_num(segment_integrals, nan=0.0) 

243 cumulative_integral = np.concatenate([[0], np.cumsum(segment_integrals)]) 

244 

245 # Vectorized computation for all series 

246 # Find indices of all edges in the combined grid 

247 edge_indices: npt.NDArray[np.intp] = np.searchsorted(all_unique_x, edges_processed) 

248 

249 # Compute integral between consecutive edges for all series (vectorized) 

250 integral_values = cumulative_integral[edge_indices[:, 1:]] - cumulative_integral[edge_indices[:, :-1]] 

251 

252 # Compute widths between consecutive edges for all series (vectorized) 

253 edge_widths = np.diff(edges_processed, axis=1) 

254 

255 # Handle zero-width intervals (vectorized) 

256 zero_width_mask = edge_widths == 0 

257 result = np.zeros_like(edge_widths) 

258 

259 # For non-zero width intervals, compute average = integral / width (vectorized) 

260 non_zero_mask = ~zero_width_mask 

261 result[non_zero_mask] = integral_values[non_zero_mask] / edge_widths[non_zero_mask] 

262 

263 # For zero-width intervals, interpolate y-value directly (vectorized) 

264 if np.any(zero_width_mask): 

265 zero_width_positions = edges_processed[:, :-1][zero_width_mask] 

266 result[zero_width_mask] = np.interp(zero_width_positions, x_data_clean, y_data_clean) 

267 

268 # Handle extrapolation when 'nan' method is used (vectorized) 

269 if extrapolate_method == "nan": 

270 # Set out-of-range bins to NaN 

271 bins_within_range = (x_edges[:, :-1] >= x_data_clean.min()) & (x_edges[:, 1:] <= x_data_clean.max()) 

272 result[~bins_within_range] = np.nan 

273 

274 return result 

275 

276 

def partial_isin(bin_edges_in: npt.ArrayLike, bin_edges_out: npt.ArrayLike) -> np.ndarray:
    """
    Calculate the fraction of each input bin that overlaps with each output bin.

    This function computes a matrix where element (i, j) represents the fraction
    of input bin i that overlaps with output bin j. The computation uses
    vectorized operations to avoid loops.

    Parameters
    ----------
    bin_edges_in : array-like
        1D array of input bin edges in ascending order. For n_in bins, there
        should be n_in+1 edges.
    bin_edges_out : array-like
        1D array of output bin edges in ascending order. For n_out bins, there
        should be n_out+1 edges.

    Returns
    -------
    overlap_matrix : numpy.ndarray
        Dense matrix of shape (n_in, n_out) where n_in is the number of input
        bins and n_out is the number of output bins. Each element (i, j)
        represents the fraction of input bin i that overlaps with output bin j.
        Values range from 0 (no overlap) to 1 (complete overlap).

    Notes
    -----
    - Both input arrays must be sorted in ascending order
    - The function leverages the sorted nature of both inputs for efficiency
    - Uses vectorized operations to handle large bin arrays efficiently
    - All overlaps sum to 1.0 for each input bin when output bins fully cover input range

    Examples
    --------
    >>> bin_edges_in = np.array([0, 10, 20, 30])
    >>> bin_edges_out = np.array([5, 15, 25])
    >>> partial_isin(bin_edges_in, bin_edges_out)
    array([[0.5, 0.0],
           [0.5, 0.5],
           [0.0, 0.5]])
    """
    # Convert inputs to numpy arrays
    bin_edges_in = np.asarray(bin_edges_in, dtype=float)
    bin_edges_out = np.asarray(bin_edges_out, dtype=float)

    # Validate inputs
    if bin_edges_in.ndim != 1 or bin_edges_out.ndim != 1:
        msg = "Both bin_edges_in and bin_edges_out must be 1D arrays"
        raise ValueError(msg)
    if len(bin_edges_in) < 2 or len(bin_edges_out) < 2:  # noqa: PLR2004
        msg = "Both edge arrays must have at least 2 elements"
        raise ValueError(msg)

    # Check ascending order, ignoring NaN values
    diffs_in = np.diff(bin_edges_in)
    valid_diffs_in = ~np.isnan(diffs_in)
    if np.any(valid_diffs_in) and not np.all(diffs_in[valid_diffs_in] > 0):
        msg = "bin_edges_in must be in ascending order"
        raise ValueError(msg)

    diffs_out = np.diff(bin_edges_out)
    valid_diffs_out = ~np.isnan(diffs_out)
    if np.any(valid_diffs_out) and not np.all(diffs_out[valid_diffs_out] > 0):
        msg = "bin_edges_out must be in ascending order"
        raise ValueError(msg)

    # Build matrix using fully vectorized approach
    # Create meshgrids for all possible input-output bin combinations
    in_left = bin_edges_in[:-1, None]  # Shape: (n_bins_in, 1)
    in_right = bin_edges_in[1:, None]  # Shape: (n_bins_in, 1)
    in_width = np.diff(bin_edges_in)[:, None]  # Shape: (n_bins_in, 1)

    out_left = bin_edges_out[None, :-1]  # Shape: (1, n_bins_out)
    out_right = bin_edges_out[None, 1:]  # Shape: (1, n_bins_out)

    # Calculate overlaps for all combinations using broadcasting
    overlap_left = np.maximum(in_left, out_left)  # Shape: (n_bins_in, n_bins_out)
    overlap_right = np.minimum(in_right, out_right)  # Shape: (n_bins_in, n_bins_out)

    # Calculate overlap widths (zero where no overlap)
    overlap_widths = np.maximum(0, overlap_right - overlap_left)

    # Calculate fractions (NaN widths will result in NaN fractions)
    return overlap_widths / in_width


def time_bin_overlap(tedges: npt.ArrayLike, bin_tedges: list[tuple]) -> np.ndarray:
    """
    Calculate the fraction of each time bin that overlaps with each time range.

    This function computes an array where element (i, j) represents the fraction
    of time bin j that overlaps with time range i. The computation uses
    vectorized operations to avoid loops.

    Parameters
    ----------
    tedges : array-like
        1D array of time bin edges in ascending order. For n bins, there
        should be n+1 edges.
    bin_tedges : list of tuples
        List of tuples where each tuple contains (start_time, end_time)
        defining a time range.

    Returns
    -------
    overlap_array : numpy.ndarray
        Array of shape (len(bin_tedges), n_bins) where n_bins is the number of
        time bins. Each element (i, j) represents the fraction of time bin j
        that overlaps with time range i. Values range from 0 (no overlap) to
        1 (complete overlap).

    Notes
    -----
    - tedges must be sorted in ascending order
    - Uses vectorized operations to handle large arrays efficiently
    - Time ranges in bin_tedges can be in any order and can overlap

    Examples
    --------
    >>> tedges = np.array([0, 10, 20, 30])
    >>> bin_tedges = [(5, 15), (25, 35)]
    >>> time_bin_overlap(tedges, bin_tedges)
    array([[0.5, 0.5, 0.0],
           [0.0, 0.0, 0.5]])
    """
    # Convert inputs to numpy arrays
    tedges = np.asarray(tedges)
    bin_tedges_array = np.asarray(bin_tedges)

    # Validate inputs
    if tedges.ndim != 1:
        msg = "tedges must be a 1D array"
        raise ValueError(msg)
    if len(tedges) < 2:  # noqa: PLR2004
        msg = "tedges must have at least 2 elements"
        raise ValueError(msg)
    if bin_tedges_array.size == 0:
        msg = "bin_tedges must be non-empty"
        raise ValueError(msg)

    # Calculate overlaps for all combinations using broadcasting
    overlap_left = np.maximum(bin_tedges_array[:, [0]], tedges[None, :-1])
    overlap_right = np.minimum(bin_tedges_array[:, [1]], tedges[None, 1:])
    overlap_widths = np.maximum(0, overlap_right - overlap_left)

    # Calculate fractions (handle division by zero for zero-width bins)
    bin_width_bc = np.diff(tedges)[None, :]  # Shape: (1, n_bins)

    return np.divide(
        overlap_widths, bin_width_bc, out=np.zeros_like(overlap_widths, dtype=float), where=bin_width_bc != 0.0
    )


def generate_failed_coverage_badge() -> None:
    """Generate a badge indicating failed coverage."""
    from genbadge import Badge  # type: ignore  # noqa: PLC0415

    b = Badge(left_txt="coverage", right_txt="failed", color="red")
    b.write_to("coverage_failed.svg", use_shields=False)


def combine_bin_series(
    a: npt.ArrayLike,
    a_edges: npt.ArrayLike,
    b: npt.ArrayLike,
    b_edges: npt.ArrayLike,
    extrapolation: str | float = 0.0,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Combine two binned series onto a common set of unique edges.

    This function takes two binned series (a and b) with their respective bin edges
    and creates new series (c and d) that are defined on a combined set of unique
    edges from both input edge arrays.

    Parameters
    ----------
    a : array-like
        Values for the first binned series.
    a_edges : array-like
        Bin edges for the first series. Must have len(a) + 1 elements.
    b : array-like
        Values for the second binned series.
    b_edges : array-like
        Bin edges for the second series. Must have len(b) + 1 elements.
    extrapolation : str or float, optional
        Method for handling combined bins that fall outside the original series ranges.
        - 'nearest': Use the nearest original bin value.
        - float value (e.g., np.nan, 0.0): Fill with the specified value (default: 0.0).

    Returns
    -------
    c : numpy.ndarray
        Values from series a mapped to the combined edge structure.
    c_edges : numpy.ndarray
        Combined unique edges from a_edges and b_edges.
    d : numpy.ndarray
        Values from series b mapped to the combined edge structure.
    d_edges : numpy.ndarray
        Combined unique edges from a_edges and b_edges (same as c_edges).

    Notes
    -----
    The combined edges are created by taking the union of all unique values
    from both a_edges and b_edges, sorted in ascending order. The values
    are then broadcast/repeated for each combined bin that falls within
    the original bin's range.

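    Examples
    --------
    A minimal sketch with illustrative values; series a covers [0, 2] in two
    bins, series b covers [0.5, 1.5] in one bin, and bins outside a series'
    range are filled with the default extrapolation value 0.0:

    >>> c, c_edges, d, d_edges = combine_bin_series(
    ...     a=[1.0, 2.0], a_edges=[0.0, 1.0, 2.0], b=[5.0], b_edges=[0.5, 1.5]
    ... )
    >>> c_edges
    array([0. , 0.5, 1. , 1.5, 2. ])
    >>> c
    array([1., 1., 2., 2.])
    >>> d
    array([0., 5., 5., 0.])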
484 """ 

485 # Convert inputs to numpy arrays 

486 a = np.asarray(a, dtype=float) 

487 a_edges = np.asarray(a_edges, dtype=float) 

488 b = np.asarray(b, dtype=float) 

489 b_edges = np.asarray(b_edges, dtype=float) 

490 

491 # Validate inputs 

492 if len(a_edges) != len(a) + 1: 

493 msg = "a_edges must have len(a) + 1 elements" 

494 raise ValueError(msg) 

495 if len(b_edges) != len(b) + 1: 

496 msg = "b_edges must have len(b) + 1 elements" 

497 raise ValueError(msg) 

498 

499 # Create combined unique edges 

500 combined_edges = np.unique(np.concatenate([a_edges, b_edges])) 

501 

502 # Initialize output arrays 

503 c = np.zeros(len(combined_edges) - 1) 

504 d = np.zeros(len(combined_edges) - 1) 

505 

506 # Vectorized mapping using searchsorted - find which original bin each combined bin belongs to 

507 # For series a: find which original bin each combined bin center falls into 

508 combined_bin_centers = (combined_edges[:-1] + combined_edges[1:]) / 2 

509 a_bin_assignment = np.searchsorted(a_edges, combined_bin_centers, side="right") - 1 

510 a_bin_assignment = np.clip(a_bin_assignment, 0, len(a) - 1) 

511 

512 # Handle extrapolation for series a 

513 if extrapolation == "nearest": 

514 # Assign all values using nearest neighbor (already clipped) 

515 c[:] = a[a_bin_assignment] 

516 else: 

517 # Only assign values where the combined bin is completely within the original bin 

518 a_valid_mask = (combined_edges[:-1] >= a_edges[a_bin_assignment]) & ( 

519 combined_edges[1:] <= a_edges[a_bin_assignment + 1] 

520 ) 

521 c[a_valid_mask] = a[a_bin_assignment[a_valid_mask]] 

522 # Fill out-of-range bins with extrapolation value 

523 c[~a_valid_mask] = extrapolation 

524 

525 # Handle extrapolation for series b 

526 b_bin_assignment = np.searchsorted(b_edges, combined_bin_centers, side="right") - 1 

527 b_bin_assignment = np.clip(b_bin_assignment, 0, len(b) - 1) 

528 

529 if extrapolation == "nearest": 

530 # Assign all values using nearest neighbor (already clipped) 

531 d[:] = b[b_bin_assignment] 

532 else: 

533 # Only assign values where the combined bin is completely within the original bin 

534 b_valid_mask = (combined_edges[:-1] >= b_edges[b_bin_assignment]) & ( 

535 combined_edges[1:] <= b_edges[b_bin_assignment + 1] 

536 ) 

537 d[b_valid_mask] = b[b_bin_assignment[b_valid_mask]] 

538 # Fill out-of-range bins with extrapolation value 

539 d[~b_valid_mask] = extrapolation 

540 

541 # Return the combined series 

542 c_edges = combined_edges 

543 d_edges = combined_edges.copy() 

544 

545 return c, c_edges, d, d_edges 

546 

547 

def compute_time_edges(
    tedges: pd.DatetimeIndex | None,
    tstart: pd.DatetimeIndex | None,
    tend: pd.DatetimeIndex | None,
    number_of_bins: int,
) -> pd.DatetimeIndex:
    """
    Compute time edges for binning data based on provided time parameters.

    This function creates a DatetimeIndex of time bin edges from one of three possible
    input formats: explicit edges, start times, or end times. The resulting edges
    define the boundaries of time intervals for data binning.

    Provide either explicit time edges, or the start or end times of each bin, and leave the others as None.

    Parameters
    ----------
    tedges : pandas.DatetimeIndex or None
        Explicit time edges for the bins. If provided, must have one more element
        than the number of bins (n_bins + 1). Takes precedence over tstart and tend.
    tstart : pandas.DatetimeIndex or None
        Start times for each bin. Must have the same number of elements as the
        number of bins. Used when tedges is None.
    tend : pandas.DatetimeIndex or None
        End times for each bin. Must have the same number of elements as the
        number of bins. Used when both tedges and tstart are None.
    number_of_bins : int
        The expected number of time bins. Used for validation against the provided
        time parameters.

    Returns
    -------
    pandas.DatetimeIndex
        Time edges defining the boundaries of the time bins. Has one more element
        than number_of_bins.

    Raises
    ------
    ValueError
        If tedges has incorrect length (not number_of_bins + 1).
        If tstart has incorrect length (not equal to number_of_bins).
        If tend has incorrect length (not equal to number_of_bins).
        If none of tedges, tstart, or tend are provided.

    Notes
    -----
    - When using tstart, the function assumes uniform spacing and extrapolates
      the final edge based on the spacing between the last two start times.
    - When using tend, the function assumes uniform spacing and extrapolates
      the first edge based on the spacing between the first two end times.
    - All input time data is converted to pandas.DatetimeIndex for consistency.

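    Examples
    --------
    A minimal sketch with illustrative values; the final edge is extrapolated
    from the spacing between the last two start times:

    >>> tstart = pd.DatetimeIndex(["2020-01-01", "2020-01-02"])
    >>> tedges = compute_time_edges(tedges=None, tstart=tstart, tend=None, number_of_bins=2)
    >>> tedges[-1]
    Timestamp('2020-01-03 00:00:00')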
599 """ 

600 if tedges is not None: 

601 tedges = pd.DatetimeIndex(tedges) 

602 if number_of_bins != len(tedges) - 1: 

603 msg = "tedges must have one more element than flow" 

604 raise ValueError(msg) 

605 

606 elif tstart is not None: 

607 # Assume the index refers to the time at the start of the measurement interval 

608 tstart = pd.DatetimeIndex(tstart) 

609 if number_of_bins != len(tstart): 

610 msg = "tstart must have the same number of elements as flow" 

611 raise ValueError(msg) 

612 

613 tedges = tstart.append(tstart[[-1]] + (tstart[-1] - tstart[-2])) 

614 

615 elif tend is not None: 

616 # Assume the index refers to the time at the end of the measurement interval 

617 tend = pd.DatetimeIndex(tend) 

618 if number_of_bins != len(tend): 

619 msg = "tend must have the same number of elements as flow" 

620 raise ValueError(msg) 

621 

622 tedges = (tend[[0]] - (tend[1] - tend[0])).append(tend) 

623 

624 else: 

625 msg = "Either provide tedges, tstart, and tend" 

626 raise ValueError(msg) 

627 return tedges 

628 

629 

def get_soil_temperature(station_number: int = 260, *, interpolate_missing_values: bool = True) -> pd.DataFrame:
    """
    Download soil temperature data from the KNMI and return it as a pandas DataFrame.

    The data is available for the following KNMI weather stations:
    - 260: De Bilt, the Netherlands (from 1981 onward)
    - 273: Marknesse, the Netherlands (from 1989 onward)
    - 286: Nieuw Beerta, the Netherlands (from 1990 onward)
    - 323: Wilhelminadorp, the Netherlands (from 1989 onward)

    TB1 = soil temperature at 5 cm depth (degrees Celsius) at the time of observation
    TB2 = soil temperature at 10 cm depth (degrees Celsius) at the time of observation
    TB3 = soil temperature at 20 cm depth (degrees Celsius) at the time of observation
    TB4 = soil temperature at 50 cm depth (degrees Celsius) at the time of observation
    TB5 = soil temperature at 100 cm depth (degrees Celsius) at the time of observation
    TNB1 = minimum soil temperature at 5 cm depth over the preceding 6 hours (degrees Celsius)
    TNB2 = minimum soil temperature at 10 cm depth over the preceding 6 hours (degrees Celsius)
    TXB1 = maximum soil temperature at 5 cm depth over the preceding 6 hours (degrees Celsius)
    TXB2 = maximum soil temperature at 10 cm depth over the preceding 6 hours (degrees Celsius)

    Parameters
    ----------
    station_number : int, {260, 273, 286, 323}
        The KNMI station number for which to download soil temperature data.
        Default is 260 (De Bilt).
    interpolate_missing_values : bool, optional
        If True, missing values are interpolated and recent NaN values are extrapolated with the previous value.
        If False, missing values remain as NaN. Default is True.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing soil temperature data in Celsius with a DatetimeIndex.
        Columns include TB1, TB2, TB3, TB4, TB5, TNB1, TNB2, TXB1, TXB2.

    Notes
    -----
    - KNMI: Royal Netherlands Meteorological Institute
    - The timeseries may contain NaN values for missing data.

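    Examples
    --------
    A sketch of typical use. Downloading from the KNMI requires network access,
    so these lines are skipped in doctests:

    >>> df = get_soil_temperature(260)  # doctest: +SKIP
    >>> df["TB3"].resample("D").mean()  # daily mean at 20 cm depth  # doctest: +SKIP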
669 """ 

670 # File-based daily cache 

671 cache_dir.mkdir(exist_ok=True) 

672 

673 today = date.today().isoformat() # noqa: DTZ011 

674 cache_path = cache_dir / f"soil_temp_{station_number}_{interpolate_missing_values}_{today}.pkl" 

675 

676 # Check if cached file exists and is from today 

677 if cache_path.exists(): 

678 return pd.read_pickle(cache_path) # noqa: S301 

679 

680 # Clean up old cache files to prevent disk bloat 

681 for old_file in cache_dir.glob(f"soil_temp_{station_number}_{interpolate_missing_values}_*.pkl"): 

682 old_file.unlink(missing_ok=True) 

683 

684 url = f"https://cdn.knmi.nl/knmi/map/page/klimatologie/gegevens/bodemtemps/bodemtemps_{station_number}.zip" 

685 

686 dtypes = { 

687 "YYYYMMDD": "int32", 

688 "HH": "int8", 

689 " TB1": "float32", 

690 " TB3": "float32", 

691 " TB2": "float32", 

692 " TB4": "float32", 

693 " TB5": "float32", 

694 " TNB1": "float32", 

695 " TNB2": "float32", 

696 " TXB1": "float32", 

697 " TXB2": "float32", 

698 } 

699 

700 # Download the ZIP file 

701 with requests.get(url, params={"download": "zip"}, timeout=10) as response: 

702 response.raise_for_status() 

703 

704 df = pd.read_csv( 

705 io.BytesIO(response.content), 

706 compression="zip", 

707 dtype=dtypes, 

708 usecols=dtypes.keys(), 

709 skiprows=16, 

710 sep=",", 

711 index_col=None, 

712 na_values=[" "], 

713 engine="c", 

714 parse_dates=False, 

715 ) 

716 

717 df.index = pd.to_datetime(df["YYYYMMDD"].values, format=r"%Y%m%d").tz_localize("UTC") + pd.to_timedelta( 

718 df["HH"].values, unit="h" 

719 ) 

720 

721 df.drop(columns=["YYYYMMDD", "HH"], inplace=True) 

722 df.columns = df.columns.str.strip() 

723 df /= 10.0 

724 

725 if interpolate_missing_values: 

726 # Fill NaN values with interpolate linearly and then forward fill 

727 df.interpolate(method="linear", inplace=True) 

728 df.ffill(inplace=True) 

729 

730 # Save to cache for future use 

731 df.to_pickle(cache_path) 

732 return df 

733 

734 

def solve_underdetermined_system(
    coefficient_matrix,
    rhs_vector,
    *,
    nullspace_objective="squared_differences",
    optimization_method="BFGS",
):
    """
    Solve an underdetermined linear system with nullspace regularization.

    For an underdetermined system Ax = b where A has more columns than rows,
    multiple solutions exist. This function computes a least-squares solution
    and then selects a specific solution from the nullspace based on a
    regularization objective.

    Parameters
    ----------
    coefficient_matrix : array-like
        Coefficient matrix of shape (m, n) where m < n (underdetermined).
        May contain NaN values in some rows, which will be excluded from the system.
    rhs_vector : array-like
        Right-hand side vector of length m. May contain NaN values corresponding
        to NaN rows in coefficient_matrix, which will be excluded from the system.
    nullspace_objective : str or callable, optional
        Objective function to minimize in the nullspace. Options:

        * "squared_differences" : Minimize sum of squared differences between
          adjacent elements: ``sum((x[i+1] - x[i])**2)``
        * "summed_differences" : Minimize sum of absolute differences between
          adjacent elements: ``sum(|x[i+1] - x[i]|)``
        * callable : Custom objective function with signature
          ``objective(coeffs, x_ls, nullspace_basis)`` where:

          - coeffs : optimization variables (nullspace coefficients)
          - x_ls : least-squares solution
          - nullspace_basis : nullspace basis matrix

        Default is "squared_differences".
    optimization_method : str, optional
        Optimization method passed to scipy.optimize.minimize.
        Default is "BFGS".

    Returns
    -------
    numpy.ndarray
        Solution vector that minimizes the specified nullspace objective.
        Has length n (number of columns in coefficient_matrix).

    Raises
    ------
    ValueError
        If optimization fails, if coefficient_matrix and rhs_vector have incompatible shapes,
        or if an unknown nullspace objective is specified.

    Notes
    -----
    The algorithm follows these steps:

    1. Remove rows with NaN values from both coefficient_matrix and rhs_vector
    2. Compute least-squares solution: x_ls = pinv(valid_matrix) @ valid_rhs
    3. Compute nullspace basis: N = null_space(valid_matrix)
    4. Find nullspace coefficients: coeffs = argmin objective(x_ls + N @ coeffs)
    5. Return final solution: x = x_ls + N @ coeffs

    For the built-in objectives:

    * "squared_differences" provides smooth solutions, minimizing rapid changes
    * "summed_differences" provides sparse solutions, promoting piecewise constant behavior

    Examples
    --------
    Basic usage with default squared differences objective:

    >>> import numpy as np
    >>> from gwtransport.utils import solve_underdetermined_system
    >>>
    >>> # Create underdetermined system (2 equations, 4 unknowns)
    >>> matrix = np.array([[1, 2, 1, 0], [0, 1, 2, 1]])
    >>> rhs = np.array([3, 4])
    >>>
    >>> # Solve with squared differences regularization
    >>> x = solve_underdetermined_system(matrix, rhs)
    >>> print(f"Solution: {x}")
    >>> print(f"Residual: {np.linalg.norm(matrix @ x - rhs):.2e}")

    With summed differences objective:

    >>> x_sparse = solve_underdetermined_system(
    ...     matrix, rhs, nullspace_objective="summed_differences"
    ... )

    With custom objective function:

    >>> def custom_objective(coeffs, x_ls, nullspace_basis):
    ...     x = x_ls + nullspace_basis @ coeffs
    ...     return np.sum(x**2)  # Minimize L2 norm
    >>>
    >>> x_custom = solve_underdetermined_system(
    ...     matrix, rhs, nullspace_objective=custom_objective
    ... )

    Handling NaN values:

    >>> # System with missing data
    >>> matrix_nan = np.array([
    ...     [1, 2, 1, 0],
    ...     [np.nan, np.nan, np.nan, np.nan],
    ...     [0, 1, 2, 1],
    ... ])
    >>> rhs_nan = np.array([3, np.nan, 4])
    >>>
    >>> x_nan = solve_underdetermined_system(matrix_nan, rhs_nan)
    """
    matrix = np.asarray(coefficient_matrix)
    rhs = np.asarray(rhs_vector)

    if matrix.shape[0] != len(rhs):
        msg = f"coefficient_matrix has {matrix.shape[0]} rows but rhs_vector has {len(rhs)} elements"
        raise ValueError(msg)

    # Identify valid rows (no NaN values in either matrix or rhs)
    valid_rows = ~np.isnan(matrix).any(axis=1) & ~np.isnan(rhs)

    if not np.any(valid_rows):
        msg = "No valid rows found (all contain NaN values)"
        raise ValueError(msg)

    valid_matrix = matrix[valid_rows]
    valid_rhs = rhs[valid_rows]

    # Compute least-squares solution
    x_ls, *_ = np.linalg.lstsq(valid_matrix, valid_rhs, rcond=None)

    # Compute nullspace
    nullspace_basis = null_space(valid_matrix, rcond=None)
    nullrank = nullspace_basis.shape[1]

    if nullrank == 0:
        # System is determined, return least-squares solution
        return x_ls

    # Optimize in nullspace
    coeffs = _optimize_nullspace_coefficients(x_ls, nullspace_basis, nullspace_objective, optimization_method)

    return x_ls + nullspace_basis @ coeffs


def _optimize_nullspace_coefficients(
    x_ls: np.ndarray, nullspace_basis: np.ndarray, nullspace_objective: str, optimization_method: str
) -> np.ndarray:
    """Optimize coefficients in the nullspace to minimize the objective."""
    nullrank = nullspace_basis.shape[1]
    objective_func = _get_nullspace_objective_function(nullspace_objective)
    coeffs_0 = np.zeros(nullrank)

    # For stability, always start with squared differences if using summed differences
    if nullspace_objective == "summed_differences":
        res_init = minimize(
            _squared_differences_objective,
            x0=coeffs_0,
            args=(x_ls, nullspace_basis),
            method=optimization_method,
        )
        if not res_init.success:
            msg = f"Initial optimization failed: {res_init.message}"
            raise ValueError(msg)
        coeffs_0 = res_init.x

    # Final optimization with target objective
    res = minimize(
        objective_func,
        x0=coeffs_0,
        args=(x_ls, nullspace_basis),
        method=optimization_method,
    )

    if not res.success:
        msg = f"Optimization failed: {res.message}"
        raise ValueError(msg)

    return res.x


def _squared_differences_objective(coeffs: np.ndarray, x_ls: np.ndarray, nullspace_basis: np.ndarray) -> float:
    """Minimize sum of squared differences between adjacent elements."""
    x = x_ls + nullspace_basis @ coeffs
    return np.sum(np.square(x[1:] - x[:-1]))


def _summed_differences_objective(coeffs: np.ndarray, x_ls: np.ndarray, nullspace_basis: np.ndarray) -> float:
    """Minimize sum of absolute differences between adjacent elements."""
    x = x_ls + nullspace_basis @ coeffs
    return np.sum(np.abs(x[1:] - x[:-1]))


def _get_nullspace_objective_function(nullspace_objective: str):
    """Get the objective function for nullspace optimization."""
    if nullspace_objective == "squared_differences":
        return _squared_differences_objective
    if nullspace_objective == "summed_differences":
        return _summed_differences_objective
    if callable(nullspace_objective):
        return nullspace_objective
    msg = f"Unknown nullspace objective: {nullspace_objective}"
    raise ValueError(msg)