Coverage for src/gwtransport/utils.py: 91%

142 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-06 15:45 +0000

1"""General utilities for the 1D groundwater transport model.""" 

2 

3from collections.abc import Sequence 

4 

5import numpy as np 

6import numpy.typing as npt 

7import pandas as pd 

8from scipy import interpolate 

9 

10 

11def linear_interpolate(x_ref, y_ref, x_query, left=None, right=None): 

12 """ 

13 Linear interpolation on monotonically increasing data. 

14 

15 Parameters 

16 ---------- 

17 x_ref : array-like 

18 Reference vector with sorted x-values. 

19 y_ref : array-like 

20 Reference vector with y-values. 

21 x_query : array-like 

22 Query x-values. Array may have any shape. 

23 left : float, optional 

24 Value to return for x_query < x_ref[0]. 

25 - If `left` is set to None, x_query = x_ref[0]. 

26 - If `left` is set to a float, such as np.nan, this value is returned. 

27 right : float, optional 

28 Value to return for x_query > x_ref[-1]. 

29 - If `right` is set to None, x_query = x_ref[-1]. 

30 - If `right` is set to a float, such as np.nan, this value is returned. 

31 

32 Returns 

33 ------- 

34 array 

35 Interpolated y-values. 

36 """ 

37 x_ref = np.asarray(x_ref) 

38 y_ref = np.asarray(y_ref) 

39 x_query = np.asarray(x_query) 

40 

41 # Find indices where x_query would be inserted in x_ref 

42 idx_no_edges = np.searchsorted(x_ref, x_query) 

43 

44 idx = np.clip(idx_no_edges, 1, len(x_ref) - 1) 

45 

46 # Calculate interpolation weights 

47 x0 = x_ref[idx - 1] 

48 x1 = x_ref[idx] 

49 y0 = y_ref[idx - 1] 

50 y1 = y_ref[idx] 

51 

52 # Perform linear interpolation 

53 weights = (x_query - x0) / (x1 - x0) 

54 y_query = y0 + weights * (y1 - y0) 

55 

56 # Handle edge cases 

57 if left is None: 

58 y_query = np.where(x_query < x_ref[0], y_ref[0], y_query) 

59 if right is None: 

60 y_query = np.where(x_query > x_ref[-1], y_ref[-1], y_query) 

61 if isinstance(left, float): 

62 y_query = np.where(x_query < x_ref[0], left, y_query) 

63 if isinstance(right, float): 

64 y_query = np.where(x_query > x_ref[-1], right, y_query) 

65 

66 return y_query 

67 

68 

69def interp_series(series, index_new, **interp1d_kwargs): 

70 """ 

71 Interpolate a pandas.Series to a new index. 

72 

73 Parameters 

74 ---------- 

75 series : pandas.Series 

76 Series to interpolate. 

77 index_new : pandas.DatetimeIndex 

78 New index to interpolate to. 

79 interp1d_kwargs : dict, optional 

80 Keyword arguments passed to scipy.interpolate.interp1d. Default is {}. 

81 

82 Returns 

83 ------- 

84 pandas.Series 

85 Interpolated series. 

86 """ 

87 series = series[series.index.notna() & series.notna()] 

88 dt = (series.index - series.index[0]) / pd.to_timedelta(1, unit="D") 

89 dt_interp = (index_new - series.index[0]) / pd.to_timedelta(1, unit="D") 

90 interp_obj = interpolate.interp1d(dt, series.values, bounds_error=False, **interp1d_kwargs) 

91 return interp_obj(dt_interp) 

92 

93 

94def diff(a, alignment="centered"): 

95 """Compute the cell widths for a given array of cell coordinates. 

96 

97 If alignment is "centered", the coordinates are assumed to be centered in the cells. 

98 If alignment is "left", the coordinates are assumed to be at the left edge of the cells. 

99 If alignment is "right", the coordinates are assumed to be at the right edge of the cells. 

100 

101 Parameters 

102 ---------- 

103 a : array-like 

104 Input array. 

105 

106 Returns 

107 ------- 

108 array 

109 Array with differences between elements. 

110 """ 

111 if alignment == "centered": 

112 mid = a[:-1] + (a[1:] - a[:-1]) / 2 

113 return np.concatenate((a[[1]] - a[[0]], mid[1:] - mid[:-1], a[[-1]] - a[[-2]])) 

114 if alignment == "left": 

115 return np.concatenate((a[1:] - a[:-1], a[[-1]] - a[[-2]])) 

116 if alignment == "right": 

117 return np.concatenate((a[[1]] - a[[0]], a[1:] - a[:-1])) 

118 

119 msg = f"Invalid alignment: {alignment}" 

120 raise ValueError(msg) 

121 

122 

123def linear_average( # noqa: C901 

124 x_data: Sequence[float] | npt.NDArray[np.float64], 

125 y_data: Sequence[float] | npt.NDArray[np.float64], 

126 x_edges: Sequence[float] | npt.NDArray[np.float64], 

127 extrapolate_method: str = "nan", 

128) -> npt.NDArray[np.float64]: 

129 """ 

130 Compute the average value of a piecewise linear time series between specified x-edges. 

131 

132 Parameters 

133 ---------- 

134 x_data : array-like 

135 x-coordinates of the time series data points, must be in ascending order 

136 y_data : array-like 

137 y-coordinates of the time series data points 

138 x_edges : array-like 

139 x-coordinates of the integration edges, must be in ascending order 

140 extrapolate_method : str, optional 

141 Method for handling extrapolation. Default is 'nan'. 

142 - 'outer': Extrapolate using the outermost data points. 

143 - 'nan': Extrapolate using np.nan. 

144 - 'raise': Raise an error for out-of-bounds values. 

145 

146 Returns 

147 ------- 

148 numpy.ndarray 

149 Array of average values between consecutive pairs of x_edges 

150 

151 Examples 

152 -------- 

153 >>> x_data = [0, 1, 2, 3] 

154 >>> y_data = [0, 1, 1, 0] 

155 >>> x_edges = [0, 1.5, 3] 

156 >>> linear_average(x_data, y_data, x_edges) 

157 array([0.667, 0.667]) 

158 """ 

159 # Input validation 

160 if len(x_data) != len(y_data) and len(x_data) > 0: 

161 msg = "x_data and y_data must have the same length and be non-empty" 

162 raise ValueError(msg) 

163 if len(x_edges) < 2: # noqa: PLR2004 

164 msg = "x_edges_in_range must contain at least 2 values" 

165 raise ValueError(msg) 

166 if not np.all(np.diff(x_data) >= 0): 

167 msg = "x_data must be in ascending order" 

168 raise ValueError(msg) 

169 if not np.all(np.diff(x_edges) >= 0): 

170 msg = "x_edges must be in ascending order" 

171 raise ValueError(msg) 

172 

173 show = ~np.isnan(x_data) & ~np.isnan(y_data) 

174 

175 if (show.sum() < 2 and extrapolate_method == "nan") or show.sum() == 0: # noqa: PLR2004 

176 return np.full(shape=len(x_edges) - 1, fill_value=np.nan) 

177 

178 x_data = np.asarray(x_data, dtype=float)[show] 

179 y_data = np.asarray(y_data, dtype=float)[show] 

180 x_edges = np.asarray(x_edges, dtype=float) 

181 

182 # Extrapolate 

183 if extrapolate_method == "outer": 

184 # bins with x_edges ouside the range of x_data should be nan 

185 # Zero-widths are handles at the end of this function 

186 x_edges_in_range = np.clip(x_edges, x_data.min(), x_data.max()) 

187 elif extrapolate_method == "nan": 

188 # bins with x_edges ouside the range of x_data should be nan 

189 is_within_range = (x_edges >= x_data.min()) & (x_edges <= x_data.max()) 

190 x_edges_in_range = x_edges[is_within_range] 

191 elif extrapolate_method == "raise": 

192 if np.any(x_edges < x_data.min()) or np.any(x_edges > x_data.max()): 

193 msg = "x_edges must be within the range of x_data" 

194 raise ValueError(msg) 

195 else: 

196 msg = "extrapolate_method must be 'outer', 'nan', or 'raise'" 

197 raise ValueError(msg) 

198 

199 # Create a combined array of all x points 

200 all_x = np.concatenate([x_data, x_edges_in_range]) 

201 

202 # Get unique values and inverse indices 

203 unique_x, inverse_indices = np.unique(all_x, return_inverse=True) 

204 

205 # Get indices of where the edges are in the unique array 

206 edge_indices = inverse_indices[len(x_data) : len(all_x)] 

207 

208 # Interpolate y values at all unique x points 

209 unique_y = np.interp(unique_x, x_data, y_data, left=np.nan, right=np.nan) 

210 

211 # Compute segment-wise integrals using the trapezoidal rule 

212 dx = np.diff(unique_x) 

213 y_avg = (unique_y[:-1] + unique_y[1:]) / 2 

214 segment_integrals = dx * y_avg 

215 

216 # Compute cumulative integral 

217 cumulative_integral = np.concatenate([[0], np.cumsum(segment_integrals)]) 

218 

219 # Compute integral between consecutive edges 

220 integral_values = np.diff(cumulative_integral[edge_indices]) 

221 

222 # Compute widths between consecutive edges 

223 edge_widths = np.diff(x_edges_in_range) 

224 

225 # Handle zero-width intervals and non-zero width intervals in a vectorized way 

226 zero_width_mask = edge_widths == 0 

227 

228 # For non-zero width intervals, compute average = integral / width 

229 average_values_in_range = np.zeros_like(edge_widths) 

230 non_zero_mask = ~zero_width_mask 

231 if np.any(non_zero_mask): 

232 average_values_in_range[non_zero_mask] = integral_values[non_zero_mask] / edge_widths[non_zero_mask] 

233 

234 # For zero-width intervals, get the y-value directly from unique_y using edge_indices 

235 if np.any(zero_width_mask): 

236 # The indices in unique_x for zero-width edge positions 

237 zero_width_indices = edge_indices[np.where(zero_width_mask)[0]] 

238 

239 # Get all the y-values at once and assign them 

240 average_values_in_range[zero_width_mask] = unique_y[zero_width_indices] 

241 

242 # Handle extrapolation when 'nan' method is used and some edges are outside data range 

243 if extrapolate_method == "nan" and ~np.all(is_within_range): 

244 # Identify which bins are completely within the data range 

245 bins_within_range = (x_edges[:-1] >= x_data.min()) & (x_edges[1:] <= x_data.max()) 

246 # Create array of NaNs with same size as the number of bins 

247 average_values = np.full(shape=bins_within_range.size, fill_value=np.nan) 

248 # Copy calculated averages only to bins that are within data range 

249 average_values[bins_within_range] = average_values_in_range 

250 return average_values 

251 

252 return average_values_in_range 

253 

254 

255def partial_isin(bin_edges, timespans): 

256 """ 

257 Calculate the fraction of each bin that falls within each timespan. 

258 

259 Parameters 

260 ---------- 

261 bin_edges : array_like 

262 1D array of bin edges in ascending order. For n bins, there should be n+1 edges. 

263 timespans : array_like 

264 Timespans as a 2D array of shape (m, 2) where m is the number of timespans and 

265 each row contains [start, end] of a timespan. 

266 

267 Returns 

268 ------- 

269 bin_fractions : ndarray 

270 2D array of shape (m, n) where m is the number of timespans and n is the number of bins. 

271 Each element (i, j) represents the fraction of bin j that falls within timespan i. 

272 

273 Notes 

274 ----- 

275 - The function assumes bin_edges and timespans are in the same units. 

276 - Bins are defined by their edges, i.e., bin j spans from bin_edges[j] to bin_edges[j+1]. 

277 - Values range from 0 (no overlap) to 1 (complete overlap). 

278 

279 Examples 

280 -------- 

281 >>> bin_edges = np.array([0, 10, 20, 30]) 

282 >>> timespans = np.array([[5, 25], [15, 35]]) 

283 >>> partial_isin(bin_edges, timespans) 

284 array([[0.5, 1. , 0.5], 

285 [0. , 0.5, 1. ]]) 

286 """ 

287 # Convert inputs to numpy arrays 

288 bin_edges = np.asarray(bin_edges) 

289 timespans = np.asarray(timespans) 

290 

291 # Validate inputs 

292 if bin_edges.ndim != 1 or timespans.ndim != 2 or timespans.shape[1] != 2: # noqa: PLR2004 

293 msg = "Invalid input shapes: bin_edges must be 1D and timespans must be 2D with shape (m, 2)." 

294 raise ValueError(msg) 

295 if not np.all(np.diff(bin_edges) > 0): 

296 msg = "bin_edges must be in ascending order." 

297 raise ValueError(msg) 

298 if not np.all(np.diff(timespans) > 0): 

299 msg = "timespans must be in ascending order." 

300 raise ValueError(msg) 

301 

302 # Calculate bin widths 

303 bin_widths = np.diff(bin_edges) 

304 

305 # Calculate overlapping segments using broadcasting 

306 left_edges = bin_edges[:-1][np.newaxis, :] 

307 right_edges = bin_edges[1:][np.newaxis, :] 

308 

309 starts = timespans[:, 0][:, np.newaxis] 

310 ends = timespans[:, 1][:, np.newaxis] 

311 

312 overlap_left = np.maximum(left_edges, starts) 

313 overlap_right = np.minimum(right_edges, ends) 

314 

315 # Calculate overlap widths (clip at 0 to handle non-overlapping cases) 

316 overlap_widths = np.maximum(0, overlap_right - overlap_left) 

317 

318 # Calculate fraction of each bin that overlaps with timespan 

319 return overlap_widths / bin_widths[np.newaxis, :] 

320 

321 

322def generate_failed_coverage_badge(): 

323 """Generate a badge indicating failed coverage.""" 

324 from genbadge import Badge # type: ignore # noqa: PLC0415 

325 

326 b = Badge(left_txt="coverage", right_txt="failed", color="red") 

327 b.write_to("coverage_failed.svg", use_shields=False) 

328 

329 

330def compute_time_edges(tedges, tstart, tend, number_of_bins): 

331 """ 

332 Compute time edges for binning data based on provided time parameters. 

333 

334 This function creates a DatetimeIndex of time bin edges from one of three possible 

335 input formats: explicit edges, start times, or end times. The resulting edges 

336 define the boundaries of time intervals for data binning. 

337 

338 Define either explicit time edges, or start and end times for each bin and leave the others at None. 

339 

340 Parameters 

341 ---------- 

342 tedges : pandas.DatetimeIndex or None 

343 Explicit time edges for the bins. If provided, must have one more element 

344 than the number of bins (n_bins + 1). Takes precedence over tstart and tend. 

345 tstart : pandas.DatetimeIndex or None 

346 Start times for each bin. Must have the same number of elements as the 

347 number of bins. Used when tedges is None. 

348 tend : pandas.DatetimeIndex or None 

349 End times for each bin. Must have the same number of elements as the 

350 number of bins. Used when both tedges and tstart are None. 

351 number_of_bins : int 

352 The expected number of time bins. Used for validation against the provided 

353 time parameters. 

354 

355 Returns 

356 ------- 

357 pandas.DatetimeIndex 

358 Time edges defining the boundaries of the time bins. Has one more element 

359 than number_of_bins. 

360 

361 Raises 

362 ------ 

363 ValueError 

364 If tedges has incorrect length (not number_of_bins + 1). 

365 If tstart has incorrect length (not equal to number_of_bins). 

366 If tend has incorrect length (not equal to number_of_bins). 

367 If none of tedges, tstart, or tend are provided. 

368 

369 Notes 

370 ----- 

371 - When using tstart, the function assumes uniform spacing and extrapolates 

372 the final edge based on the spacing between the last two start times. 

373 - When using tend, the function assumes uniform spacing and extrapolates 

374 the first edge based on the spacing between the first two end times. 

375 - All input time data is converted to pandas.DatetimeIndex for consistency. 

376 """ 

377 if tedges is not None: 

378 tedges = pd.DatetimeIndex(tedges) 

379 if number_of_bins != len(tedges) - 1: 

380 msg = "tedges must have one more element than flow" 

381 raise ValueError(msg) 

382 

383 elif tstart is not None: 

384 # Assume the index refers to the time at the start of the measurement interval 

385 tstart = pd.DatetimeIndex(tstart) 

386 if number_of_bins != len(tstart): 

387 msg = "tstart must have the same number of elements as flow" 

388 raise ValueError(msg) 

389 

390 tedges = tstart.append(tstart[[-1]] + (tstart[-1] - tstart[-2])) 

391 

392 elif tend is not None: 

393 # Assume the index refers to the time at the end of the measurement interval 

394 tend = pd.DatetimeIndex(tend) 

395 if number_of_bins != len(tend): 

396 msg = "tend must have the same number of elements as flow" 

397 raise ValueError(msg) 

398 

399 tedges = (tend[[0]] - (tend[1] - tend[0])).append(tend) 

400 

401 else: 

402 msg = "Either provide tedges, tstart, and tend" 

403 raise ValueError(msg) 

404 return tedges