Coverage for src/gwtransport/utils.py: 22%

182 statements  

coverage.py v7.9.1, created at 2025-06-22 18:30 +0000

1"""General utilities for the 1D groundwater transport model.""" 

2 

3from collections.abc import Sequence 

4 

5import numpy as np 

6import numpy.typing as npt 

7import pandas as pd 

8from scipy import interpolate 

9 

10 

def linear_interpolate(x_ref, y_ref, x_query, left=None, right=None):
12 """ 

13 Linear interpolation on monotonically increasing data. 

14 

15 Parameters 

16 ---------- 

17 x_ref : array-like 

18 Reference vector with sorted x-values. 

19 y_ref : array-like 

20 Reference vector with y-values. 

21 x_query : array-like 

22 Query x-values. Array may have any shape. 

23 left : float, optional 

24 Value to return for x_query < x_ref[0]. 

25 - If `left` is set to None, x_query = x_ref[0]. 

26 - If `left` is set to a float, such as np.nan, this value is returned. 

27 right : float, optional 

28 Value to return for x_query > x_ref[-1]. 

29 - If `right` is set to None, x_query = x_ref[-1]. 

30 - If `right` is set to a float, such as np.nan, this value is returned. 

31 

32 Returns 

33 ------- 

34 array 

35 Interpolated y-values. 
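
    Examples
    --------
    Illustrative sketch (synthetic values):

    >>> x_ref = [0.0, 1.0, 2.0]
    >>> y_ref = [0.0, 10.0, 20.0]
    >>> linear_interpolate(x_ref, y_ref, [0.5, 1.5, 3.0])
    array([ 5., 15., 20.])
    >>> linear_interpolate(x_ref, y_ref, [0.5, 1.5, 3.0], right=np.nan)
    array([ 5., 15., nan])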

36 """ 

    x_ref = np.asarray(x_ref)
    y_ref = np.asarray(y_ref)
    x_query = np.asarray(x_query)

    # Find indices where x_query would be inserted in x_ref
    idx_no_edges = np.searchsorted(x_ref, x_query)

    idx = np.clip(idx_no_edges, 1, len(x_ref) - 1)
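    # Clipping keeps both idx - 1 and idx valid positions in x_ref; queries
    # outside the data range are corrected by the left/right handling below.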

45 

46 # Calculate interpolation weights 

47 x0 = x_ref[idx - 1] 

48 x1 = x_ref[idx] 

49 y0 = y_ref[idx - 1] 

50 y1 = y_ref[idx] 

51 

52 # Perform linear interpolation 

53 weights = (x_query - x0) / (x1 - x0) 

54 y_query = y0 + weights * (y1 - y0) 

55 

56 # Handle edge cases 

57 if left is None: 

58 y_query = np.where(x_query < x_ref[0], y_ref[0], y_query) 

59 if right is None: 

60 y_query = np.where(x_query > x_ref[-1], y_ref[-1], y_query) 

61 if isinstance(left, float): 

62 y_query = np.where(x_query < x_ref[0], left, y_query) 

63 if isinstance(right, float): 

64 y_query = np.where(x_query > x_ref[-1], right, y_query) 

65 

66 return y_query 

67 

68 

def interp_series(series, index_new, **interp1d_kwargs):
70 """ 

71 Interpolate a pandas.Series to a new index. 

72 

73 Parameters 

74 ---------- 

75 series : pandas.Series 

76 Series to interpolate. 

77 index_new : pandas.DatetimeIndex 

78 New index to interpolate to. 

79 interp1d_kwargs : dict, optional 

80 Keyword arguments passed to scipy.interpolate.interp1d. Default is {}. 

81 

82 Returns 

83 ------- 

84 pandas.Series 

85 Interpolated series. 
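
    Examples
    --------
    Minimal sketch with illustrative dates:

    >>> s = pd.Series([0.0, 2.0], index=pd.to_datetime(["2020-01-01", "2020-01-03"]))
    >>> interp_series(s, pd.to_datetime(["2020-01-02"]))
    array([1.])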

86 """ 

    series = series[series.index.notna() & series.notna()]
    dt = (series.index - series.index[0]) / pd.to_timedelta(1, unit="D")
    dt_interp = (index_new - series.index[0]) / pd.to_timedelta(1, unit="D")
    interp_obj = interpolate.interp1d(dt, series.values, bounds_error=False, **interp1d_kwargs)
    return interp_obj(dt_interp)


def diff(a, alignment="centered"):
95 """Compute the cell widths for a given array of cell coordinates. 

96 

97 If alignment is "centered", the coordinates are assumed to be centered in the cells. 

98 If alignment is "left", the coordinates are assumed to be at the left edge of the cells. 

99 If alignment is "right", the coordinates are assumed to be at the right edge of the cells. 

100 

101 Parameters 

102 ---------- 

103 a : array-like 

104 Input array. 

105 

106 Returns 

107 ------- 

108 array 

109 Array with differences between elements. 
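
    Examples
    --------
    Illustrative sketch (synthetic values):

    >>> a = np.array([0.0, 1.0, 3.0])
    >>> diff(a, alignment="centered")
    array([1. , 1.5, 2. ])
    >>> diff(a, alignment="left")
    array([1., 2., 2.])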

110 """ 

    if alignment == "centered":
        mid = a[:-1] + (a[1:] - a[:-1]) / 2
        return np.concatenate((a[[1]] - a[[0]], mid[1:] - mid[:-1], a[[-1]] - a[[-2]]))
    if alignment == "left":
        return np.concatenate((a[1:] - a[:-1], a[[-1]] - a[[-2]]))
    if alignment == "right":
        return np.concatenate((a[[1]] - a[[0]], a[1:] - a[:-1]))

    msg = f"Invalid alignment: {alignment}"
    raise ValueError(msg)


def linear_average(  # noqa: C901
    x_data: Sequence[float] | npt.NDArray[np.float64],
    y_data: Sequence[float] | npt.NDArray[np.float64],
    x_edges: Sequence[float] | npt.NDArray[np.float64],
    extrapolate_method: str = "nan",
) -> npt.NDArray[np.float64]:
129 """ 

130 Compute the average value of a piecewise linear time series between specified x-edges. 

131 

132 Parameters 

133 ---------- 

134 x_data : array-like 

135 x-coordinates of the time series data points, must be in ascending order 

136 y_data : array-like 

137 y-coordinates of the time series data points 

138 x_edges : array-like 

139 x-coordinates of the integration edges. Can be 1D or 2D. 

140 - If 1D: shape (n_edges,). Can be 1D or 2D. 

141 - If 1D: shape (n_edges,), must be in ascending order 

142 - If 2D: shape (n_series, n_edges), each row must be in ascending order 

143 - If 2D: shape (n_series, n_edges), each row must be in ascending order 

144 extrapolate_method : str, optional 

145 Method for handling extrapolation. Default is 'nan'. 

146 - 'outer': Extrapolate using the outermost data points. 

147 - 'nan': Extrapolate using np.nan. 

148 - 'raise': Raise an error for out-of-bounds values. 

149 

150 Returns 

151 ------- 

152 numpy.ndarray 

153 2D array of average values between consecutive pairs of x_edges. 

154 Shape is (n_series, n_bins) where n_bins = n_edges - 1. 

155 If x_edges is 1D, n_series = 1. 

156 

157 Examples 

158 -------- 

159 >>> x_data = [0, 1, 2, 3] 

160 >>> y_data = [0, 1, 1, 0] 

161 >>> x_edges = [0, 1.5, 3] 

162 >>> linear_average(x_data, y_data, x_edges) 

163 array([[0.667, 0.667]]) 

164 

165 >>> x_edges_2d = [[0, 1.5, 3], [0.5, 2, 3]] 

166 >>> linear_average(x_data, y_data, x_edges_2d) 

167 array([[0.667, 0.667], [0.75, 0.5]]) 

168 """ 

    # Convert inputs to numpy arrays
    x_data = np.asarray(x_data, dtype=float)
    y_data = np.asarray(y_data, dtype=float)
    x_edges = np.asarray(x_edges, dtype=float)

    # Ensure x_edges is always 2D
    if x_edges.ndim == 1:
        x_edges = x_edges[np.newaxis, :]
    elif x_edges.ndim != 2:  # noqa: PLR2004
        msg = "x_edges must be 1D or 2D array"
        raise ValueError(msg)

    # Input validation
    if len(x_data) != len(y_data) or len(x_data) == 0:
        msg = "x_data and y_data must have the same length and be non-empty"
        raise ValueError(msg)
    if x_edges.shape[1] < 2:  # noqa: PLR2004
        msg = "x_edges must contain at least 2 values in each row"
        raise ValueError(msg)
    if not np.all(np.diff(x_data) >= 0):
        msg = "x_data must be in ascending order"
        raise ValueError(msg)
    if not np.all(np.diff(x_edges, axis=1) >= 0):
        msg = "x_edges must be in ascending order along each row"
        raise ValueError(msg)

    # Filter out NaN values
    show = ~np.isnan(x_data) & ~np.isnan(y_data)
    if show.sum() < 2:  # noqa: PLR2004
        if show.sum() == 1 and extrapolate_method == "outer":
            # For a single data point with outer extrapolation, use its constant value
            constant_value = y_data[show][0]
            return np.full(shape=(x_edges.shape[0], x_edges.shape[1] - 1), fill_value=constant_value)
        return np.full(shape=(x_edges.shape[0], x_edges.shape[1] - 1), fill_value=np.nan)

    x_data_clean = x_data[show]
    y_data_clean = y_data[show]

    # Handle extrapolation for all series at once (vectorized)
    if extrapolate_method == "outer":
        edges_processed = np.clip(x_edges, x_data_clean.min(), x_data_clean.max())
    elif extrapolate_method == "raise":
        if np.any(x_edges < x_data_clean.min()) or np.any(x_edges > x_data_clean.max()):
            msg = "x_edges must be within the range of x_data"
            raise ValueError(msg)
        edges_processed = x_edges.copy()
    else:  # nan method
        edges_processed = x_edges.copy()

    # Create a combined grid of all unique x points (data + all edges)
    all_unique_x = np.unique(np.concatenate([x_data_clean, edges_processed.ravel()]))

    # Interpolate y values at all unique x points once
    all_unique_y = np.interp(all_unique_x, x_data_clean, y_data_clean, left=np.nan, right=np.nan)

    # Compute cumulative integrals once using the trapezoidal rule
    dx = np.diff(all_unique_x)
    y_avg = (all_unique_y[:-1] + all_unique_y[1:]) / 2
    segment_integrals = dx * y_avg
    # Replace NaN values with 0 to avoid breaking the cumulative sum
    segment_integrals = np.nan_to_num(segment_integrals, nan=0.0)
    cumulative_integral = np.concatenate([[0], np.cumsum(segment_integrals)])
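    # The combined grid contains every edge, so the exact integral over
    # [all_unique_x[i], all_unique_x[j]] is cumulative_integral[j] - cumulative_integral[i];
    # the per-bin averages below are such differences divided by the bin widths.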

    # Vectorized computation for all series
    # Find indices of all edges in the combined grid
    edge_indices = np.searchsorted(all_unique_x, edges_processed)

    # Compute integral between consecutive edges for all series (vectorized)
    integral_values = cumulative_integral[edge_indices[:, 1:]] - cumulative_integral[edge_indices[:, :-1]]

    # Compute widths between consecutive edges for all series (vectorized)
    edge_widths = np.diff(edges_processed, axis=1)

    # Handle zero-width intervals (vectorized)
    zero_width_mask = edge_widths == 0
    result = np.zeros_like(edge_widths)

    # For non-zero width intervals, compute average = integral / width (vectorized)
    non_zero_mask = ~zero_width_mask
    result[non_zero_mask] = integral_values[non_zero_mask] / edge_widths[non_zero_mask]

    # For zero-width intervals, interpolate the y-value directly (vectorized)
    if np.any(zero_width_mask):
        zero_width_positions = edges_processed[:, :-1][zero_width_mask]
        result[zero_width_mask] = np.interp(zero_width_positions, x_data_clean, y_data_clean)

    # Handle extrapolation when the 'nan' method is used (vectorized)
    if extrapolate_method == "nan":
        # Set out-of-range bins to NaN
        bins_within_range = (x_edges[:, :-1] >= x_data_clean.min()) & (x_edges[:, 1:] <= x_data_clean.max())
        result[~bins_within_range] = np.nan

    return result


def partial_isin(bin_edges_in, bin_edges_out):
265 """ 

266 Calculate the fraction of each input bin that overlaps with each output bin. 

267 

268 This function computes a matrix where element (i, j) represents the fraction 

269 of input bin i that overlaps with output bin j. The computation uses 

270 vectorized operations to avoid loops. 

271 

272 Parameters 

273 ---------- 

274 bin_edges_in : array_like 

275 1D array of input bin edges in ascending order. For n_in bins, there 

276 should be n_in+1 edges. 

277 bin_edges_out : array_like 

278 1D array of output bin edges in ascending order. For n_out bins, there 

279 should be n_out+1 edges. 

280 

281 Returns 

282 ------- 

283 overlap_matrix : numpy.ndarray 

284 Dense matrix of shape (n_in, n_out) where n_in is the number of input 

285 bins and n_out is the number of output bins. Each element (i, j) 

286 represents the fraction of input bin i that overlaps with output bin j. 

287 Values range from 0 (no overlap) to 1 (complete overlap). 

288 

289 Notes 

290 ----- 

291 - Both input arrays must be sorted in ascending order 

292 - The function leverages the sorted nature of both inputs for efficiency 

293 - Uses vectorized operations to handle large bin arrays efficiently 

294 - All overlaps sum to 1.0 for each input bin when output bins fully cover input range 

295 

296 Examples 

297 -------- 

298 >>> bin_edges_in = np.array([0, 10, 20, 30]) 

299 >>> bin_edges_out = np.array([5, 15, 25]) 

300 >>> partial_isin(bin_edges_in, bin_edges_out) 

301 array([[0.5, 0.0], 

302 [0.5, 0.5], 

303 [0.0, 0.5]]) 

304 """ 

    # Convert inputs to numpy arrays
    bin_edges_in = np.asarray(bin_edges_in, dtype=float)
    bin_edges_out = np.asarray(bin_edges_out, dtype=float)

    # Validate inputs
    if bin_edges_in.ndim != 1 or bin_edges_out.ndim != 1:
        msg = "Both bin_edges_in and bin_edges_out must be 1D arrays"
        raise ValueError(msg)
    if len(bin_edges_in) < 2 or len(bin_edges_out) < 2:  # noqa: PLR2004
        msg = "Both edge arrays must have at least 2 elements"
        raise ValueError(msg)
    if not np.all(np.diff(bin_edges_in) > 0):
        msg = "bin_edges_in must be in ascending order"
        raise ValueError(msg)
    if not np.all(np.diff(bin_edges_out) > 0):
        msg = "bin_edges_out must be in ascending order"
        raise ValueError(msg)

    # Create bin widths
    bin_widths_in = np.diff(bin_edges_in)

    # Build matrix using a fully vectorized approach
    # Create meshgrids for all possible input-output bin combinations
    in_left = bin_edges_in[:-1, None]  # Shape: (n_bins_in, 1)
    in_right = bin_edges_in[1:, None]  # Shape: (n_bins_in, 1)
    in_width = bin_widths_in[:, None]  # Shape: (n_bins_in, 1)

    out_left = bin_edges_out[None, :-1]  # Shape: (1, n_bins_out)
    out_right = bin_edges_out[None, 1:]  # Shape: (1, n_bins_out)

    # Calculate overlaps for all combinations using broadcasting
    overlap_left = np.maximum(in_left, out_left)  # Shape: (n_bins_in, n_bins_out)
    overlap_right = np.minimum(in_right, out_right)  # Shape: (n_bins_in, n_bins_out)

    # Calculate overlap widths (zero where no overlap)
    overlap_widths = np.maximum(0, overlap_right - overlap_left)

    # Calculate fractions
    return overlap_widths / in_width


def generate_failed_coverage_badge():
    """Generate a badge indicating failed coverage."""
    from genbadge import Badge  # type: ignore # noqa: PLC0415

    b = Badge(left_txt="coverage", right_txt="failed", color="red")
    b.write_to("coverage_failed.svg", use_shields=False)


def combine_bin_series(a, a_edges, b, b_edges, extrapolation=0.0):
355 """ 

356 Combine two binned series onto a common set of unique edges. 

357 

358 This function takes two binned series (a and b) with their respective bin edges 

359 and creates new series (c and d) that are defined on a combined set of unique 

360 edges from both input edge arrays. 

361 

362 Parameters 

363 ---------- 

364 a : array-like 

365 Values for the first binned series. 

366 a_edges : array-like 

367 Bin edges for the first series. Must have len(a) + 1 elements. 

368 b : array-like 

369 Values for the second binned series. 

370 b_edges : array-like 

371 Bin edges for the second series. Must have len(b) + 1 elements. 

372 extrapolation : str or float, optional 

373 Method for handling combined bins that fall outside the original series ranges. 

374 - 'nearest': Use the nearest original bin value 

375 - float value (e.g., np.nan, 0.0): Fill with the specified value (default: 0.0) 

376 

377 Returns 

378 ------- 

379 c : numpy.ndarray 

380 Values from series a mapped to the combined edge structure. 

381 c_edges : numpy.ndarray 

382 Combined unique edges from a_edges and b_edges. 

383 d : numpy.ndarray 

384 Values from series b mapped to the combined edge structure. 

385 d_edges : numpy.ndarray 

386 Combined unique edges from a_edges and b_edges (same as c_edges). 

387 

388 Notes 

389 ----- 

390 The combined edges are created by taking the union of all unique values 

391 from both a_edges and b_edges, sorted in ascending order. The values 

392 are then broadcasted/repeated for each combined bin that falls within 

393 the original bin's range. 
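
    Examples
    --------
    Minimal sketch with illustrative values:

    >>> c, c_edges, d, d_edges = combine_bin_series(
    ...     [1.0, 2.0], [0.0, 1.0, 2.0], [10.0], [0.5, 1.5]
    ... )
    >>> c_edges
    array([0. , 0.5, 1. , 1.5, 2. ])
    >>> c
    array([1., 1., 2., 2.])
    >>> d
    array([ 0., 10., 10.,  0.])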

394 """ 

    # Convert inputs to numpy arrays
    a = np.asarray(a, dtype=float)
    a_edges = np.asarray(a_edges, dtype=float)
    b = np.asarray(b, dtype=float)
    b_edges = np.asarray(b_edges, dtype=float)

    # Validate inputs
    if len(a_edges) != len(a) + 1:
        msg = "a_edges must have len(a) + 1 elements"
        raise ValueError(msg)
    if len(b_edges) != len(b) + 1:
        msg = "b_edges must have len(b) + 1 elements"
        raise ValueError(msg)

    # Create combined unique edges
    combined_edges = np.unique(np.concatenate([a_edges, b_edges]))

    # Initialize output arrays
    c = np.zeros(len(combined_edges) - 1)
    d = np.zeros(len(combined_edges) - 1)

    # Vectorized mapping using searchsorted - find which original bin each combined bin belongs to
    # For series a: find which original bin each combined bin center falls into
    combined_bin_centers = (combined_edges[:-1] + combined_edges[1:]) / 2
    a_bin_assignment = np.searchsorted(a_edges, combined_bin_centers, side="right") - 1
    a_bin_assignment = np.clip(a_bin_assignment, 0, len(a) - 1)

    # Handle extrapolation for series a
    if extrapolation == "nearest":
        # Assign all values using nearest neighbor (already clipped)
        c[:] = a[a_bin_assignment]
    else:
        # Only assign values where the combined bin is completely within the original bin
        a_valid_mask = (combined_edges[:-1] >= a_edges[a_bin_assignment]) & (
            combined_edges[1:] <= a_edges[a_bin_assignment + 1]
        )
        c[a_valid_mask] = a[a_bin_assignment[a_valid_mask]]
        # Fill out-of-range bins with the extrapolation value
        c[~a_valid_mask] = extrapolation

    # Handle extrapolation for series b
    b_bin_assignment = np.searchsorted(b_edges, combined_bin_centers, side="right") - 1
    b_bin_assignment = np.clip(b_bin_assignment, 0, len(b) - 1)

    if extrapolation == "nearest":
        # Assign all values using nearest neighbor (already clipped)
        d[:] = b[b_bin_assignment]
    else:
        # Only assign values where the combined bin is completely within the original bin
        b_valid_mask = (combined_edges[:-1] >= b_edges[b_bin_assignment]) & (
            combined_edges[1:] <= b_edges[b_bin_assignment + 1]
        )
        d[b_valid_mask] = b[b_bin_assignment[b_valid_mask]]
        # Fill out-of-range bins with the extrapolation value
        d[~b_valid_mask] = extrapolation

    # Return the combined series
    c_edges = combined_edges
    d_edges = combined_edges.copy()

    return c, c_edges, d, d_edges


def compute_time_edges(tedges, tstart, tend, number_of_bins):
459 """ 

460 Compute time edges for binning data based on provided time parameters. 

461 

462 This function creates a DatetimeIndex of time bin edges from one of three possible 

463 input formats: explicit edges, start times, or end times. The resulting edges 

464 define the boundaries of time intervals for data binning. 

465 

466 Define either explicit time edges, or start and end times for each bin and leave the others at None. 

467 

468 Parameters 

469 ---------- 

470 tedges : pandas.DatetimeIndex or None 

471 Explicit time edges for the bins. If provided, must have one more element 

472 than the number of bins (n_bins + 1). Takes precedence over tstart and tend. 

473 tstart : pandas.DatetimeIndex or None 

474 Start times for each bin. Must have the same number of elements as the 

475 number of bins. Used when tedges is None. 

476 tend : pandas.DatetimeIndex or None 

477 End times for each bin. Must have the same number of elements as the 

478 number of bins. Used when both tedges and tstart are None. 

479 number_of_bins : int 

480 The expected number of time bins. Used for validation against the provided 

481 time parameters. 

482 

483 Returns 

484 ------- 

485 pandas.DatetimeIndex 

486 Time edges defining the boundaries of the time bins. Has one more element 

487 than number_of_bins. 

488 

489 Raises 

490 ------ 

491 ValueError 

492 If tedges has incorrect length (not number_of_bins + 1). 

493 If tstart has incorrect length (not equal to number_of_bins). 

494 If tend has incorrect length (not equal to number_of_bins). 

495 If none of tedges, tstart, or tend are provided. 

496 

497 Notes 

498 ----- 

499 - When using tstart, the function assumes uniform spacing and extrapolates 

500 the final edge based on the spacing between the last two start times. 

501 - When using tend, the function assumes uniform spacing and extrapolates 

502 the first edge based on the spacing between the first two end times. 

503 - All input time data is converted to pandas.DatetimeIndex for consistency. 
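
    Examples
    --------
    Illustrative sketch; the final edge is extrapolated from the last interval:

    >>> tstart = pd.DatetimeIndex(["2020-01-01", "2020-01-02", "2020-01-03"])
    >>> compute_time_edges(tedges=None, tstart=tstart, tend=None, number_of_bins=3)
    DatetimeIndex(['2020-01-01', '2020-01-02', '2020-01-03', '2020-01-04'],
                  dtype='datetime64[ns]', freq=None)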

504 """ 

    if tedges is not None:
        tedges = pd.DatetimeIndex(tedges)
        if number_of_bins != len(tedges) - 1:
            msg = "tedges must have one more element than the number of bins"
            raise ValueError(msg)

    elif tstart is not None:
        # Assume the index refers to the time at the start of the measurement interval
        tstart = pd.DatetimeIndex(tstart)
        if number_of_bins != len(tstart):
            msg = "tstart must have the same number of elements as the number of bins"
            raise ValueError(msg)

        tedges = tstart.append(tstart[[-1]] + (tstart[-1] - tstart[-2]))

    elif tend is not None:
        # Assume the index refers to the time at the end of the measurement interval
        tend = pd.DatetimeIndex(tend)
        if number_of_bins != len(tend):
            msg = "tend must have the same number of elements as the number of bins"
            raise ValueError(msg)

        tedges = (tend[[0]] - (tend[1] - tend[0])).append(tend)

    else:
        msg = "Provide either tedges, tstart, or tend"
        raise ValueError(msg)
    return tedges