Coverage for src / gwtransport / diffusion.py: 90%

280 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-27 06:32 +0000

1""" 

2Analytical solutions for 1D advection-dispersion transport. 

3 

4This module implements analytical solutions for solute transport in 1D aquifer 

5systems, combining advection with longitudinal dispersion. The solutions are 

6based on the error function (erf) and its integrals. 

7 

8Key functions: 

9 

10- :func:`infiltration_to_extraction` - Main transport function combining advection and dispersion 

11 with explicit pore volume distribution and streamline lengths. 

12 

13- :func:`extraction_to_infiltration` - Inverse operation (deconvolution with dispersion). 

14 

15- :func:`gamma_infiltration_to_extraction` - Gamma-distributed pore volumes with dispersion. 

16 Models aquifer heterogeneity with 2-parameter gamma distribution. Parameterizable via 

17 (alpha, beta) or (mean, std). Discretizes gamma distribution into equal-probability bins. 

18 

19- :func:`gamma_extraction_to_infiltration` - Gamma-distributed pore volumes, deconvolution 

20 with dispersion. Symmetric inverse of gamma_infiltration_to_extraction. 

21 

22The dispersion is characterized by the longitudinal dispersion coefficient D_L, 

23which is computed internally from: 

24 

25 D_L = D_m + alpha_L * v_s 

26 

27where: 

28- D_m is the molecular diffusion coefficient [m^2/day] 

29- alpha_L is the longitudinal dispersivity [m] 

30- v_s is the retarded velocity [m/day], computed as v_s = L / (R * tau_mean) 

31 

32The velocity v_s is computed per (pore_volume, output_bin) using the mean residence 

33time (which includes retardation), correctly accounting for time-varying flow. 

34This formulation ensures that: 

35- Molecular diffusion spreading scales with residence time: sqrt(D_m * tau) 

36- Mechanical dispersion spreading scales with travel distance: sqrt(alpha_L * L) 

37 

38This module adds microdispersion (alpha_L) and molecular diffusion (D_m) on top of 

39macrodispersion captured by the pore volume distribution (APVD). Both represent velocity 

40heterogeneity at different scales. Microdispersion is an aquifer property; macrodispersion 

41depends additionally on hydrological boundary conditions. See :ref:`concept-dispersion-scales` 

42for guidance on when to use each approach and how to avoid double-counting spreading effects. 

43""" 

44 

45import warnings 

46 

47import numpy as np 

48import numpy.typing as npt 

49import pandas as pd 

50from scipy import special 

51 

52from gwtransport import gamma 

53from gwtransport.residence_time import residence_time, residence_time_mean 

54from gwtransport.utils import solve_inverse_transport 

55 

56# Numerical tolerance for coefficient sum to determine valid output bins 

57EPSILON_COEFF_SUM = 1e-10 

58 

59# Gauss-Legendre quadrature nodes and weights for volume-space integration 

60_GL_NODES, _GL_WEIGHTS = np.polynomial.legendre.leggauss(16) 

61 

62 

63def _erf_integral_space( 

64 x: npt.NDArray[np.float64], 

65 diffusivity: npt.ArrayLike, 

66 t: npt.NDArray[np.float64], 

67 clip_to_inf: float = 6.0, 

68) -> npt.NDArray[np.float64]: 

69 """Compute the integral of the error function at each (x[i], t[i], D[i]) point. 

70 

71 This function computes the integral of erf from 0 to x[i] at time t[i], 

72 where x, t, and diffusivity are broadcastable arrays. 

73 

74 Parameters 

75 ---------- 

76 x : ndarray 

77 Input x values. Broadcastable with t and diffusivity. 

78 diffusivity : float or ndarray 

79 Diffusivity [m²/day]. Must be non-negative. Broadcastable with x and t. 

80 t : ndarray 

81 Time values [day]. Broadcastable with x and diffusivity. Must be non-negative. 

82 clip_to_inf : float, optional 

83 Clip ax values beyond this to avoid numerical issues. Default is 6. 

84 

85 Returns 

86 ------- 

87 ndarray 

88 Integral values at each (x, t, D) point. Shape is broadcast shape of inputs. 

89 """ 

90 x = np.asarray(x) 

91 t = np.asarray(t) 

92 diffusivity = np.asarray(diffusivity) 

93 

94 # Broadcast all inputs to common shape 

95 x, t, diffusivity = np.broadcast_arrays(x, t, diffusivity) 

96 out = np.zeros_like(x, dtype=float) 

97 

98 # Compute a = 1/(2*sqrt(diffusivity*t)) from diffusivity and t 

99 # Handle edge cases: diffusivity=0 or t=0 means a=inf (step function) 

100 with np.errstate(divide="ignore", invalid="ignore"): 

101 a = np.where((diffusivity == 0.0) | (t == 0.0), np.inf, 1.0 / (2.0 * np.sqrt(diffusivity * t))) 

102 

103 # Handle a=inf case: integral of sign(x) from 0 to x is |x| 

104 mask_a_inf = np.isinf(a) 

105 if np.any(mask_a_inf): 

106 out[mask_a_inf] = np.abs(x[mask_a_inf]) 

107 

108 # Handle a=0 case: erf(0) = 0, integral is 0 

109 mask_a_zero = a == 0.0 

110 # out[mask_a_zero] already 0 

111 

112 # Handle finite non-zero a 

113 mask_valid = ~mask_a_inf & ~mask_a_zero 

114 if np.any(mask_valid): 

115 x_v = x[mask_valid] 

116 a_v = a[mask_valid] 

117 ax = a_v * x_v 

118 

119 # Initialize result for valid entries 

120 result = np.zeros_like(x_v) 

121 

122 # Handle clipped regions 

123 maskl = ax <= -clip_to_inf 

124 masku = ax >= clip_to_inf 

125 mask_mid = ~maskl & ~masku 

126 

127 result[maskl] = -x_v[maskl] - 1 / (a_v[maskl] * np.sqrt(np.pi)) 

128 result[masku] = x_v[masku] - 1 / (a_v[masku] * np.sqrt(np.pi)) 

129 result[mask_mid] = x_v[mask_mid] * special.erf(ax[mask_mid]) + (np.exp(-(ax[mask_mid] ** 2)) - 1) / ( 

130 a_v[mask_mid] * np.sqrt(np.pi) 

131 ) 

132 

133 out[mask_valid] = result 

134 

135 return out 

136 

137 

138def _erf_mean_volume( 

139 *, 

140 step_widths: npt.NDArray[np.float64], 

141 raw_time: npt.NDArray[np.float64], 

142 rt_at_cin_edges: npt.NDArray[np.float64], 

143 diffusivity: npt.NDArray[np.float64], 

144 cumulative_volume_at_cout_tedges: npt.NDArray[np.float64], 

145 cumulative_volume_at_cin_tedges: npt.NDArray[np.float64], 

146 tedges_days: npt.NDArray[np.float64], 

147 r_vpv: float, 

148 streamline_len: float, 

149 asymptotic_cutoff_sigma: float | None, 

150) -> npt.NDArray[np.float64]: 

151 r"""Compute mean erf along the physical trajectory in cumulative volume space. 

152 

153 For each cell (cout_bin *i*, cin_edge *j*), computes the flow-weighted 

154 average of the error function along the 1D extraction trajectory: 

155 

156 .. math:: 

157 

158 \text{mean\_erf}_{i,j} = \frac{1}{\Delta V_i} 

159 \int_{V_i}^{V_{i+1}} \text{erf}\!\left( 

160 \frac{x(V)}{2\sqrt{D \cdot \tau(V)}} 

161 \right) dV 

162 

163 where *x(V)* is the normalized distance (linear in *V*) and *τ(V)* is the 

164 elapsed time since infiltration (capped at the residence time *RT*). 

165 Averaging over cumulative volume gives a flow-weighted average because 

166 dV = Q(t) dt. 

167 

168 **Fully capped cells** (both cout edges have elapsed time ≥ RT): 

169 τ = RT (constant), so the integral reduces to ``_erf_integral_space`` 

170 at fixed *t* = RT. This path gives machine precision. 

171 

172 **Uncapped cells** (at least one cout edge has elapsed time < RT): 

173 Vectorized 16-point Gauss-Legendre quadrature in volume space. The 

174 integration is split at flow-bin boundaries (where *Q* changes) so 

175 that within each sub-interval the integrand is smooth, and at the 

176 capping transition (where *τ* switches from linear growth to the 

177 constant *RT*) where the exact ``_erf_integral_space`` is used for 

178 the capped portion. The outer loop runs over flow bins; within each 

179 iteration all overlapping uncapped cells are evaluated simultaneously. 

180 

181 Parameters 

182 ---------- 

183 step_widths : ndarray, shape (n_cout_edges, n_cin_edges) 

184 Normalized x-position at each (cout_edge, cin_edge) point. 

185 NaN for inactive cells. 

186 raw_time : ndarray, shape (n_cout_edges, n_cin_edges) 

187 Raw elapsed time in days (before capping at RT). 

188 NaN for inactive cells. 

189 rt_at_cin_edges : ndarray, shape (n_cin_edges,) 

190 Residence time at each cin edge for this pore volume [days]. 

191 diffusivity : ndarray, shape (n_cout_bins,) 

192 Longitudinal dispersion coefficient per cout bin [m²/day]. 

193 cumulative_volume_at_cout_tedges : ndarray, shape (n_cout_edges,) 

194 Cumulative extracted volume at each cout time edge [m³]. 

195 cumulative_volume_at_cin_tedges : ndarray, shape (n_cin_edges,) 

196 Cumulative volume at each cin (flow) time edge [m³]. 

197 tedges_days : ndarray, shape (n_cin_edges,) 

198 Flow time edges in days (same reference as raw_time). 

199 r_vpv : float 

200 Retardation factor times pore volume [m³]. 

201 streamline_len : float 

202 Streamline length [m]. 

203 asymptotic_cutoff_sigma : float or None 

204 Erf cutoff threshold for ``_erf_integral_space``. 

205 

206 Returns 

207 ------- 

208 ndarray, shape (n_cout_bins, n_cin_edges) 

209 Mean erf value for each cell. NaN for inactive cells. 

210 """ 

211 n_cout_edges, n_cin_edges = step_widths.shape 

212 n_cout_bins = n_cout_edges - 1 

213 

214 x_lo = step_widths[:-1] 

215 x_hi = step_widths[1:] 

216 dx = x_hi - x_lo 

217 

218 v_lo_arr = cumulative_volume_at_cout_tedges[:-1] 

219 v_hi_arr = cumulative_volume_at_cout_tedges[1:] 

220 

221 is_valid = ~np.isnan(x_lo) & ~np.isnan(x_hi) 

222 

223 # Determine capping status at each cell edge 

224 with np.errstate(invalid="ignore"): 

225 is_capped_lo = raw_time[:-1] >= rt_at_cin_edges[np.newaxis, :] 

226 is_capped_hi = raw_time[1:] >= rt_at_cin_edges[np.newaxis, :] 

227 is_fully_capped = is_capped_lo & is_capped_hi 

228 

229 response = np.full((n_cout_bins, n_cin_edges), np.nan) 

230 

231 # --- D = 0: erf = sign(x), exact for any τ --- 

232 mask_d_zero = diffusivity == 0.0 

233 if np.any(mask_d_zero): 

234 mask_d_zero_2d = is_valid & np.broadcast_to(mask_d_zero[:, np.newaxis], (n_cout_bins, n_cin_edges)) 

235 with np.errstate(divide="ignore", invalid="ignore"): 

236 d_zero_vals = (np.abs(x_hi) - np.abs(x_lo)) / dx 

237 d_zero_vals = np.where(dx == 0.0, np.sign(x_lo), d_zero_vals) 

238 response = np.where(mask_d_zero_2d, d_zero_vals, response) 

239 is_valid &= ~mask_d_zero_2d 

240 

241 # --- Fully capped cells: exact via _erf_integral_space --- 

242 mask_capped = is_valid & is_fully_capped 

243 if np.any(mask_capped): 

244 t_const = np.broadcast_to(rt_at_cin_edges[np.newaxis, :], (n_cout_bins, n_cin_edges))[mask_capped] 

245 d_bc = np.broadcast_to(diffusivity[:, np.newaxis], (n_cout_bins, n_cin_edges))[mask_capped] 

246 

247 clip_kw = {"clip_to_inf": asymptotic_cutoff_sigma} if asymptotic_cutoff_sigma is not None else {} 

248 h_hi = _erf_integral_space(x_hi[mask_capped], diffusivity=d_bc, t=t_const, **clip_kw) 

249 h_lo = _erf_integral_space(x_lo[mask_capped], diffusivity=d_bc, t=t_const, **clip_kw) 

250 dx_c = dx[mask_capped] 

251 

252 with np.errstate(divide="ignore", invalid="ignore"): 

253 a_vals = np.where( 

254 (t_const <= 0) | (d_bc <= 0), 

255 np.inf, 

256 1.0 / (2.0 * np.sqrt(d_bc * t_const)), 

257 ) 

258 point_val = np.where(np.isinf(a_vals), np.sign(x_lo[mask_capped]), special.erf(a_vals * x_lo[mask_capped])) 

259 

260 with np.errstate(divide="ignore", invalid="ignore"): 

261 response[mask_capped] = np.where(dx_c == 0.0, point_val, (h_hi - h_lo) / dx_c) 

262 

263 # --- Uncapped cells: vectorized Gauss-Legendre quadrature in volume space --- 

264 mask_uncapped = is_valid & ~is_fully_capped 

265 if np.any(mask_uncapped): 

266 idx_i, idx_j = np.nonzero(mask_uncapped) 

267 n_cells = len(idx_i) 

268 

269 # Gather cell parameters (all shape (n_cells,)) 

270 v_lo_cells = v_lo_arr[idx_i] 

271 v_hi_cells = v_hi_arr[idx_i] 

272 v_cin_cells = cumulative_volume_at_cin_tedges[idx_j] 

273 rt_cells = rt_at_cin_edges[idx_j] 

274 t_j_cells = tedges_days[idx_j] 

275 d_cells = diffusivity[idx_i] 

276 total_dv = v_hi_cells - v_lo_cells 

277 

278 valid_cells = np.isfinite(rt_cells) & (total_dv > 0) 

279 

280 # Partially capped: start uncapped, end capped 

281 has_capped = is_capped_hi[idx_i, idx_j] & ~is_capped_lo[idx_i, idx_j] 

282 t_kink_all = t_j_cells + rt_cells 

283 v_kink_all = np.interp(t_kink_all, tedges_days, cumulative_volume_at_cin_tedges) 

284 v_kink_all = np.clip(v_kink_all, v_lo_cells, v_hi_cells) 

285 v_end = np.where(has_capped, v_kink_all, v_hi_cells) 

286 

287 # --- GL quadrature over [v_lo, v_end] for all cells simultaneously --- 

288 # Split at flow bin boundaries for smoothness within each sub-interval. 

289 # Loop over consecutive pairs of flow-bin volume edges; vectorize 

290 # across all cells whose uncapped interval overlaps that sub-interval. 

291 uncapped_integral = np.zeros(n_cells) 

292 vol_edges = cumulative_volume_at_cin_tedges # monotonic volume grid 

293 

294 for k in range(len(vol_edges) - 1): 

295 ve_lo, ve_hi = vol_edges[k], vol_edges[k + 1] 

296 # Clip to each cell's uncapped interval 

297 sub_lo = np.maximum(v_lo_cells, ve_lo) 

298 sub_hi = np.minimum(v_end, ve_hi) 

299 overlap = (sub_hi > sub_lo) & valid_cells 

300 if not np.any(overlap): 

301 continue 

302 

303 dv_sub = sub_hi[overlap] - sub_lo[overlap] # (n_overlap,) 

304 v_mid_sub = (sub_hi[overlap] + sub_lo[overlap]) / 2 

305 v_half_sub = dv_sub / 2 

306 

307 # GL nodes: (n_overlap, n_gl) 

308 v_nodes = v_mid_sub[:, np.newaxis] + v_half_sub[:, np.newaxis] * _GL_NODES[np.newaxis, :] 

309 

310 # x(V): (n_overlap, n_gl) 

311 x_nodes = (v_nodes - v_cin_cells[overlap, np.newaxis] - r_vpv) * streamline_len / r_vpv 

312 

313 # t(V): within flow sub-interval k, Q is constant so t is linear in V 

314 dt_sub = tedges_days[k + 1] - tedges_days[k] 

315 dv_sub_edge = ve_hi - ve_lo 

316 t_nodes = tedges_days[k] + (v_nodes - ve_lo) * (dt_sub / dv_sub_edge) 

317 

318 # τ(V) = clip(t(V) - t_j, 0, RT_j): (n_overlap, n_gl) 

319 tau_nodes = np.clip(t_nodes - t_j_cells[overlap, np.newaxis], 0.0, rt_cells[overlap, np.newaxis]) 

320 

321 # erf(x / (2√(Dτ))): (n_overlap, n_gl) 

322 with np.errstate(divide="ignore", invalid="ignore"): 

323 arg = x_nodes / (2.0 * np.sqrt(d_cells[overlap, np.newaxis] * tau_nodes)) 

324 erf_vals = np.where(np.isfinite(arg), special.erf(arg), np.sign(x_nodes)) 

325 

326 # Accumulate weighted integral for overlapping cells 

327 uncapped_integral[overlap] += (dv_sub / 2) * (erf_vals @ _GL_WEIGHTS) 

328 

329 # --- Capped sub-interval [v_kink, v_hi]: exact via _erf_integral_space --- 

330 capped_integral = np.zeros(n_cells) 

331 mask_cap = has_capped & (v_kink_all < v_hi_cells) & valid_cells 

332 if np.any(mask_cap): 

333 x_at_kink = (v_kink_all[mask_cap] - v_cin_cells[mask_cap] - r_vpv) * streamline_len / r_vpv 

334 x_at_hi_cap = x_hi[idx_i[mask_cap], idx_j[mask_cap]] 

335 clip_kw = {"clip_to_inf": asymptotic_cutoff_sigma} if asymptotic_cutoff_sigma is not None else {} 

336 h_hi_val = _erf_integral_space(x_at_hi_cap, diffusivity=d_cells[mask_cap], t=rt_cells[mask_cap], **clip_kw) 

337 h_lo_val = _erf_integral_space(x_at_kink, diffusivity=d_cells[mask_cap], t=rt_cells[mask_cap], **clip_kw) 

338 capped_integral[mask_cap] = (h_hi_val - h_lo_val) * (r_vpv / streamline_len) 

339 

340 # Combine: mean erf = (uncapped + capped) / total_dv 

341 with np.errstate(divide="ignore", invalid="ignore"): 

342 response_cells = np.where(valid_cells, (uncapped_integral + capped_integral) / total_dv, np.nan) 

343 response[idx_i, idx_j] = response_cells 

344 

345 return response 

346 

347 

348def _infiltration_to_extraction_coeff_matrix( 

349 *, 

350 flow: npt.NDArray[np.floating], 

351 tedges: pd.DatetimeIndex, 

352 cout_tedges: pd.DatetimeIndex, 

353 aquifer_pore_volumes: npt.NDArray[np.floating], 

354 streamline_length: npt.NDArray[np.floating], 

355 molecular_diffusivity: npt.NDArray[np.floating], 

356 longitudinal_dispersivity: npt.NDArray[np.floating], 

357 retardation_factor: float, 

358 asymptotic_cutoff_sigma: float | None, 

359) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.bool_]]: 

360 """Build the forward coefficient matrix for diffusion transport. 

361 

362 Constructs the matrix W such that ``cout = W @ cin``, accounting for 

363 advection and longitudinal dispersion. NaN entries in the raw coefficient 

364 matrix are replaced with zero. 

365 

366 Parameters 

367 ---------- 

368 flow : ndarray 

369 Flow rate of water [m3/day]. Already validated. 

370 tedges : DatetimeIndex 

371 Cin/flow time edges (not yet extended for spin-up). 

372 cout_tedges : DatetimeIndex 

373 Cout time edges. 

374 aquifer_pore_volumes : ndarray 

375 Pore volumes [m3]. Already validated. 

376 streamline_length : ndarray 

377 Travel distances [m]. Already validated. 

378 molecular_diffusivity : ndarray 

379 Effective molecular diffusivities [m2/day]. Already broadcasted. 

380 See :func:`infiltration_to_extraction` for physical interpretation. 

381 longitudinal_dispersivity : ndarray 

382 Longitudinal dispersivities [m]. Already broadcasted. 

383 retardation_factor : float 

384 Retardation factor. 

385 asymptotic_cutoff_sigma : float or None 

386 Erf cutoff threshold. 

387 

388 Returns 

389 ------- 

390 coeff_matrix : ndarray 

391 Filled coefficient matrix of shape (n_cout, n_cin). NaN replaced 

392 with zero. 

393 valid_cout_bins : ndarray 

394 Boolean mask of shape (n_cout,) indicating valid output bins. 

395 """ 

396 # Extend tedges for spin up 

397 tedges = pd.DatetimeIndex([ 

398 tedges[0] - pd.Timedelta("36500D"), 

399 *list(tedges[1:-1]), 

400 tedges[-1] + pd.Timedelta("36500D"), 

401 ]) 

402 

403 # Compute the cumulative flow at tedges 

404 infiltration_volume = flow * (np.diff(tedges) / pd.Timedelta("1D")) # m3 

405 cumulative_volume_at_cin_tedges = np.concatenate(([0], np.cumsum(infiltration_volume))) 

406 

407 # Compute the cumulative flow at cout_tedges 

408 cumulative_volume_at_cout_tedges = np.interp(cout_tedges, tedges, cumulative_volume_at_cin_tedges).astype(float) 

409 

410 rt_edges_2d = residence_time( 

411 flow=flow, 

412 flow_tedges=tedges, 

413 index=tedges, 

414 aquifer_pore_volumes=aquifer_pore_volumes, 

415 retardation_factor=retardation_factor, 

416 direction="infiltration_to_extraction", 

417 ) 

418 

419 # Compute residence time at cout_tedges to identify valid output bins 

420 # RT is NaN for cout_tedges beyond the input data range 

421 rt_at_cout_tedges = residence_time( 

422 flow=flow, 

423 flow_tedges=tedges, 

424 index=cout_tedges, 

425 aquifer_pore_volumes=aquifer_pore_volumes, 

426 retardation_factor=retardation_factor, 

427 direction="extraction_to_infiltration", 

428 ) 

429 # Output bin i is valid if both cout_tedges[i] and cout_tedges[i+1] have valid RT for all pore volumes 

430 valid_cout_bins = ~np.any(np.isnan(rt_at_cout_tedges[:, :-1]) | np.isnan(rt_at_cout_tedges[:, 1:]), axis=0) 

431 

432 # Compute mean residence time per (pore_volume, cout_bin) for velocity calculation 

433 rt_mean_2d = residence_time_mean( 

434 flow=flow, 

435 flow_tedges=tedges, 

436 tedges_out=cout_tedges, 

437 aquifer_pore_volumes=aquifer_pore_volumes, 

438 direction="extraction_to_infiltration", 

439 retardation_factor=retardation_factor, 

440 ) 

441 

442 # Compute retarded velocity: v_s = L / (R * tau) = v / R. 

443 # Using R*tau (from residence_time with retardation) gives the retarded 

444 # velocity, not the pore velocity. This is intentional: 

445 # - Dispersivity term: alpha_L * v_s * R * tau = alpha_L * L — R cancels, 

446 # giving correct distance-dependent mechanical dispersion. 

447 # - Molecular term: D_m * R * tau — exact when D_m represents D_eff = D_m/R. 

448 # For solutes D_m ~ 1e-5 m²/day (negligible); for heat, users pass 

449 # D_th = lambda/(rho*c)_eff which is already the effective diffusivity. 

450 valid_rt = np.isfinite(rt_mean_2d) & (rt_mean_2d > 0) 

451 velocity_2d = np.where(valid_rt, streamline_length[:, None] / rt_mean_2d, 0.0) 

452 

453 # Compute D_L = D_m + alpha_L * v_s 

454 diffusivity_2d = np.where( 

455 valid_rt, 

456 molecular_diffusivity[:, None] + longitudinal_dispersivity[:, None] * velocity_2d, 

457 molecular_diffusivity[:, None], 

458 ) 

459 

460 # Initialize coefficient matrix accumulator 

461 n_cout_bins = len(cout_tedges) - 1 

462 n_cin_bins = len(flow) 

463 accumulated_coeff = np.zeros((n_cout_bins, n_cin_bins)) 

464 

465 # Determine when infiltration has occurred: cout_tedge must be >= tedge (infiltration time) 

466 isactive = cout_tedges.to_numpy()[:, None] >= tedges.to_numpy()[None, :] 

467 

468 # Compute raw elapsed time (before capping at RT) once for all pore volumes 

469 raw_time = (cout_tedges.to_numpy()[:, None] - tedges.to_numpy()[None, :]) / pd.to_timedelta(1, unit="D") 

470 raw_time[~isactive] = np.nan 

471 

472 # Convert tedges to days for volume→time interpolation 

473 tedges_days_arr = ((tedges - tedges[0]) / pd.Timedelta("1D")).values.astype(float) 

474 

475 # Loop over each pore volume 

476 for i_pv in range(len(aquifer_pore_volumes)): 

477 r_vpv = retardation_factor * aquifer_pore_volumes[i_pv] 

478 

479 delta_volume = cumulative_volume_at_cout_tedges[:, None] - cumulative_volume_at_cin_tedges[None, :] - r_vpv 

480 delta_volume[~isactive] = np.nan 

481 

482 step_widths = delta_volume / r_vpv * streamline_length[i_pv] 

483 

484 diff_pv = diffusivity_2d[i_pv, :] 

485 response = _erf_mean_volume( 

486 step_widths=step_widths, 

487 raw_time=raw_time, 

488 rt_at_cin_edges=rt_edges_2d[i_pv], 

489 diffusivity=diff_pv, 

490 cumulative_volume_at_cout_tedges=cumulative_volume_at_cout_tedges, 

491 cumulative_volume_at_cin_tedges=cumulative_volume_at_cin_tedges, 

492 tedges_days=tedges_days_arr, 

493 r_vpv=r_vpv, 

494 streamline_len=streamline_length[i_pv], 

495 asymptotic_cutoff_sigma=asymptotic_cutoff_sigma, 

496 ) 

497 

498 frac = 0.5 * (1 + response) 

499 frac_start = frac[:, :-1] 

500 frac_end = frac[:, 1:] 

501 frac_end_filled = np.where(np.isnan(frac_end) & ~np.isnan(frac_start), 0.0, frac_end) 

502 coeff = frac_start - frac_end_filled 

503 

504 accumulated_coeff += coeff 

505 

506 coeff_matrix = accumulated_coeff / len(aquifer_pore_volumes) 

507 coeff_matrix_filled = np.nan_to_num(coeff_matrix, nan=0.0) 

508 

509 return coeff_matrix_filled, valid_cout_bins 

510 

511 

512def infiltration_to_extraction( 

513 *, 

514 cin: npt.ArrayLike, 

515 flow: npt.ArrayLike, 

516 tedges: pd.DatetimeIndex, 

517 cout_tedges: pd.DatetimeIndex, 

518 aquifer_pore_volumes: npt.ArrayLike, 

519 streamline_length: npt.ArrayLike, 

520 molecular_diffusivity: npt.ArrayLike, 

521 longitudinal_dispersivity: npt.ArrayLike, 

522 retardation_factor: float = 1.0, 

523 suppress_dispersion_warning: bool = False, 

524 asymptotic_cutoff_sigma: float | None = 3.0, 

525) -> npt.NDArray[np.floating]: 

526 """ 

527 Compute extracted concentration with advection and longitudinal dispersion. 

528 

529 This function models 1D solute transport through an aquifer system, 

530 combining advective transport (based on residence times) with longitudinal 

531 dispersion (diffusive spreading during transport). 

532 

533 The physical model assumes: 

534 1. Water infiltrates with concentration cin at time t_in 

535 2. Water travels distance L through aquifer with residence time tau = V_pore / Q 

536 3. During transport, longitudinal dispersion causes spreading 

537 4. At extraction, the concentration is a diffused breakthrough curve 

538 

539 The longitudinal dispersion coefficient D_L is computed internally as: 

540 

541 D_L = D_m + alpha_L * v_s 

542 

543 where v_s = L / (R * tau_mean) is the retarded velocity computed from the 

544 mean residence time (which includes retardation) for each (pore_volume, 

545 output_bin) combination. For the dispersivity term, the R factors cancel 

546 (alpha_L * v_s * R * tau = alpha_L * L), giving correct distance-dependent 

547 mechanical dispersion. See the ``molecular_diffusivity`` parameter for how 

548 the molecular term interacts with retardation. 

549 

550 Parameters 

551 ---------- 

552 cin : array-like 

553 Concentration of the compound in infiltrating water [concentration units]. 

554 Length must match the number of time bins defined by tedges. The model assumes 

555 this value is constant over each interval ``[tedges[i], tedges[i+1])``. 

556 flow : array-like 

557 Flow rate of water in the aquifer [m3/day]. 

558 Length must match cin and the number of time bins defined by tedges. The model 

559 assumes this value is constant over each interval ``[tedges[i], tedges[i+1])``. 

560 tedges : pandas.DatetimeIndex 

561 Time edges defining bins for both cin and flow data. Has length of 

562 len(cin) + 1. 

563 cout_tedges : pandas.DatetimeIndex 

564 Time edges for output data bins. Has length of desired output + 1. 

565 The output concentration is averaged over each bin. 

566 aquifer_pore_volumes : array-like 

567 Array of aquifer pore volumes [m3] representing the distribution 

568 of flow paths. Each pore volume determines the residence time for 

569 that flow path: tau = V_pore / Q. 

570 streamline_length : array-like 

571 Array of travel distances [m] corresponding to each pore volume. 

572 Must have the same length as aquifer_pore_volumes. 

573 molecular_diffusivity : float or array-like 

574 Effective molecular diffusivity [m2/day]. Can be a scalar (same for all 

575 pore volumes) or an array with the same length as aquifer_pore_volumes. 

576 Must be non-negative. For solute transport, this is the molecular 

577 diffusion coefficient D_m [m2/day] — typically ~1e-5 m2/day, negligible 

578 compared to mechanical dispersion. For heat transport, pass the thermal 

579 diffusivity D_th = lambda / (rho*c)_eff [m2/day], typically 0.01-0.1 

580 m2/day. 

581 

582 Internally, this value enters the dispersion formula as 

583 D_L = molecular_diffusivity + alpha_L * v_s, where v_s = L / (R * tau) 

584 is the retarded velocity. For the dispersivity term, the R factors cancel 

585 (alpha_L * L / (R * tau) * R * tau = alpha_L * L), giving correct 

586 distance-dependent spreading. For the molecular term, the spreading 

587 scales as molecular_diffusivity * R * tau. This is exact when 

588 molecular_diffusivity equals D_m/R — which is negligible for solutes 

589 (D_m ~ 1e-5) and correct for heat (where thermal diffusivity already 

590 represents D_eff, not D_m * R). 

591 longitudinal_dispersivity : float or array-like 

592 Longitudinal dispersivity [m]. Can be a scalar (same for all pore 

593 volumes) or an array with the same length as aquifer_pore_volumes. 

594 Must be non-negative. Represents microdispersion (mechanical dispersion 

595 from pore-scale velocity variations). Set to 0 for pure molecular diffusion. 

596 retardation_factor : float, optional 

597 Retardation factor of the compound in the aquifer (default 1.0). 

598 Values > 1.0 indicate slower transport due to sorption. 

599 suppress_dispersion_warning : bool, optional 

600 If True, suppress the warning when using multiple pore volumes with 

601 non-zero longitudinal_dispersivity. Default is False. 

602 asymptotic_cutoff_sigma : float or None, optional 

603 Performance optimization. Cells where the erf argument magnitude exceeds 

604 this threshold are assigned asymptotic values (±1) instead of computing 

605 the expensive integral. Since erf(3) ≈ 0.99998, the default of 3.0 

606 provides excellent accuracy with significant speedup. Set to None to 

607 disable the optimization. Default is 3.0. 

608 

609 Returns 

610 ------- 

611 numpy.ndarray 

612 Bin-averaged concentration in the extracted water. Same units as cin. 

613 Length equals len(cout_tedges) - 1. NaN values indicate time periods 

614 with no valid contributions from the infiltration data. 

615 

616 Raises 

617 ------ 

618 ValueError 

619 If input dimensions are inconsistent, if diffusivity is negative, 

620 or if aquifer_pore_volumes and streamline_length have different lengths. 

621 

622 Warns 

623 ----- 

624 UserWarning 

625 If multiple pore volumes are used with non-zero longitudinal_dispersivity. 

626 This may lead to double-counting of spreading effects. Suppress with 

627 ``suppress_dispersion_warning=True`` if this is intentional. 

628 

629 See Also 

630 -------- 

631 extraction_to_infiltration : Inverse operation (deconvolution) 

632 gwtransport.advection.infiltration_to_extraction : Pure advection (no dispersion) 

633 gwtransport.diffusion_fast.infiltration_to_extraction : Fast Gaussian approximation 

634 :ref:`concept-dispersion-scales` : Macrodispersion vs microdispersion 

635 

636 Notes 

637 ----- 

638 The algorithm constructs a coefficient matrix W where cout = W @ cin: 

639 

640 1. For each pore volume, compute the breakthrough curve contribution: 

641 - delta_volume: volume between infiltration event and extraction point 

642 - step_widths: convert volume to spatial distance x (erf coordinate) 

643 - time_active: diffusion time, limited by residence time 

644 

645 2. For each infiltration time edge, compute the erf response at all 

646 extraction time edges using analytical space-time averaging. 

647 

648 3. Convert erf response to breakthrough fraction: frac = 0.5 * (1 + erf) 

649 

650 4. Coefficient for bin: coeff[i,j] = frac_start - frac_end 

651 This represents the fraction of cin[j] that arrives in cout[i]. 

652 

653 5. Average coefficients across all pore volumes. 

654 

655 The error function solution assumes an initial step function that diffuses 

656 over time. The position coordinate x represents the distance from the 

657 concentration front to the observation point. 

658 

659 Examples 

660 -------- 

661 Basic usage with constant flow: 

662 

663 >>> import pandas as pd 

664 >>> import numpy as np 

665 >>> from gwtransport.diffusion import infiltration_to_extraction 

666 >>> 

667 >>> # Create time edges 

668 >>> tedges = pd.date_range(start="2020-01-01", end="2020-01-20", freq="D") 

669 >>> cout_tedges = pd.date_range(start="2020-01-05", end="2020-01-25", freq="D") 

670 >>> 

671 >>> # Input concentration (step function) and constant flow 

672 >>> cin = np.zeros(len(tedges) - 1) 

673 >>> cin[5:10] = 1.0 # Pulse of concentration 

674 >>> flow = np.ones(len(tedges) - 1) * 100.0 # 100 m3/day 

675 >>> 

676 >>> # Single pore volume of 500 m3, travel distance 100 m 

677 >>> aquifer_pore_volumes = np.array([500.0]) 

678 >>> streamline_length = np.array([100.0]) 

679 >>> 

680 >>> # Compute with dispersion (molecular diffusion + dispersivity) 

681 >>> # Scalar values broadcast to all pore volumes 

682 >>> cout = infiltration_to_extraction( 

683 ... cin=cin, 

684 ... flow=flow, 

685 ... tedges=tedges, 

686 ... cout_tedges=cout_tedges, 

687 ... aquifer_pore_volumes=aquifer_pore_volumes, 

688 ... streamline_length=streamline_length, 

689 ... molecular_diffusivity=1e-4, # m2/day, same for all pore volumes 

690 ... longitudinal_dispersivity=1.0, # m, same for all pore volumes 

691 ... ) 

692 

693 With multiple pore volumes (heterogeneous aquifer): 

694 

695 >>> # Distribution of pore volumes and corresponding travel distances 

696 >>> aquifer_pore_volumes = np.array([400.0, 500.0, 600.0]) 

697 >>> streamline_length = np.array([80.0, 100.0, 120.0]) 

698 >>> 

699 >>> # Scalar diffusion parameters broadcast to all pore volumes 

700 >>> cout = infiltration_to_extraction( 

701 ... cin=cin, 

702 ... flow=flow, 

703 ... tedges=tedges, 

704 ... cout_tedges=cout_tedges, 

705 ... aquifer_pore_volumes=aquifer_pore_volumes, 

706 ... streamline_length=streamline_length, 

707 ... molecular_diffusivity=1e-4, # m2/day 

708 ... longitudinal_dispersivity=1.0, # m 

709 ... ) 

710 """ 

711 # Convert to pandas DatetimeIndex if needed 

712 cout_tedges = pd.DatetimeIndex(cout_tedges) 

713 tedges = pd.DatetimeIndex(tedges) 

714 

715 # Convert to arrays 

716 cin = np.asarray(cin, dtype=float) 

717 flow = np.asarray(flow, dtype=float) 

718 aquifer_pore_volumes = np.asarray(aquifer_pore_volumes, dtype=float) 

719 streamline_length = np.atleast_1d(np.asarray(streamline_length, dtype=float)) 

720 

721 # Convert diffusion parameters to arrays and broadcast to pore volumes 

722 n_pore_volumes = len(aquifer_pore_volumes) 

723 molecular_diffusivity = np.atleast_1d(np.asarray(molecular_diffusivity, dtype=float)) 

724 longitudinal_dispersivity = np.atleast_1d(np.asarray(longitudinal_dispersivity, dtype=float)) 

725 

726 # Broadcast scalar values to match pore volumes 

727 if streamline_length.size == 1: 

728 streamline_length = np.broadcast_to(streamline_length, (n_pore_volumes,)).copy() 

729 if molecular_diffusivity.size == 1: 

730 molecular_diffusivity = np.broadcast_to(molecular_diffusivity, (n_pore_volumes,)).copy() 

731 if longitudinal_dispersivity.size == 1: 

732 longitudinal_dispersivity = np.broadcast_to(longitudinal_dispersivity, (n_pore_volumes,)).copy() 

733 

734 # Input validation 

735 if len(tedges) != len(cin) + 1: 

736 msg = "tedges must have one more element than cin" 

737 raise ValueError(msg) 

738 if len(tedges) != len(flow) + 1: 

739 msg = "tedges must have one more element than flow" 

740 raise ValueError(msg) 

741 if len(aquifer_pore_volumes) != len(streamline_length): 

742 msg = "aquifer_pore_volumes and streamline_length must have the same length" 

743 raise ValueError(msg) 

744 if len(molecular_diffusivity) != n_pore_volumes: 

745 msg = "molecular_diffusivity must be a scalar or have same length as aquifer_pore_volumes" 

746 raise ValueError(msg) 

747 if len(longitudinal_dispersivity) != n_pore_volumes: 

748 msg = "longitudinal_dispersivity must be a scalar or have same length as aquifer_pore_volumes" 

749 raise ValueError(msg) 

750 if np.any(molecular_diffusivity < 0): 

751 msg = "molecular_diffusivity must be non-negative" 

752 raise ValueError(msg) 

753 if np.any(longitudinal_dispersivity < 0): 

754 msg = "longitudinal_dispersivity must be non-negative" 

755 raise ValueError(msg) 

756 if np.any(np.isnan(cin)): 

757 msg = "cin contains NaN values, which are not allowed" 

758 raise ValueError(msg) 

759 if np.any(np.isnan(flow)): 

760 msg = "flow contains NaN values, which are not allowed" 

761 raise ValueError(msg) 

762 if np.any(flow < 0): 

763 msg = "flow must be non-negative (negative flow not supported)" 

764 raise ValueError(msg) 

765 if np.any(aquifer_pore_volumes <= 0): 

766 msg = "aquifer_pore_volumes must be positive" 

767 raise ValueError(msg) 

768 if np.any(streamline_length <= 0): 

769 msg = "streamline_length must be positive" 

770 raise ValueError(msg) 

771 

772 # Check for conflicting approaches: multiple pore volumes with longitudinal dispersivity 

773 # Both represent the same physical phenomenon (velocity heterogeneity) at different scales. 

774 # See concept-dispersion-scales in the documentation for details. 

775 if n_pore_volumes > 1 and np.any(longitudinal_dispersivity > 0) and not suppress_dispersion_warning: 

776 msg = ( 

777 "Using multiple aquifer_pore_volumes with non-zero longitudinal_dispersivity. " 

778 "Both represent spreading from velocity heterogeneity at different scales.\n\n" 

779 "This is appropriate when:\n" 

780 " - APVD comes from streamline analysis (explicit geometry)\n" 

781 " - You want to add unresolved microdispersion and molecular diffusion\n\n" 

782 "This may double-count spreading when:\n" 

783 " - APVD was calibrated from measurements (microdispersion already included)\n\n" 

784 "For gamma-parameterized APVD, consider using the 'equivalent APVD std' approach\n" 

785 "from 05_Diffusion_Dispersion.ipynb to combine both effects in the faster advection\n" 

786 "module. Note: this variance combination is only valid for continuous (gamma)\n" 

787 "distributions, not for discrete streamline volumes.\n" 

788 "Suppress this warning with suppress_dispersion_warning=True." 

789 ) 

790 warnings.warn(msg, UserWarning, stacklevel=2) 

791 

792 coeff_matrix, valid_cout_bins = _infiltration_to_extraction_coeff_matrix( 

793 flow=flow, 

794 tedges=tedges, 

795 cout_tedges=cout_tedges, 

796 aquifer_pore_volumes=aquifer_pore_volumes, 

797 streamline_length=streamline_length, 

798 molecular_diffusivity=molecular_diffusivity, 

799 longitudinal_dispersivity=longitudinal_dispersivity, 

800 retardation_factor=retardation_factor, 

801 asymptotic_cutoff_sigma=asymptotic_cutoff_sigma, 

802 ) 

803 

804 # Matrix multiply: cout = W @ cin 

805 cout = coeff_matrix @ cin 

806 

807 # Mark output bins as invalid where no valid contributions exist: 

808 # 1. Sum of coefficients near zero (no cin has broken through yet - spinup) 

809 # 2. Output bin extends beyond input data range (from valid_cout_bins) 

810 total_coeff = np.sum(coeff_matrix, axis=1) 

811 no_valid_contribution = (total_coeff < EPSILON_COEFF_SUM) | ~valid_cout_bins 

812 cout[no_valid_contribution] = np.nan 

813 

814 return cout 

815 

816 

817def extraction_to_infiltration( 

818 *, 

819 cout: npt.ArrayLike, 

820 flow: npt.ArrayLike, 

821 tedges: pd.DatetimeIndex, 

822 cout_tedges: pd.DatetimeIndex, 

823 aquifer_pore_volumes: npt.ArrayLike, 

824 streamline_length: npt.ArrayLike, 

825 molecular_diffusivity: npt.ArrayLike, 

826 longitudinal_dispersivity: npt.ArrayLike, 

827 retardation_factor: float = 1.0, 

828 suppress_dispersion_warning: bool = False, 

829 asymptotic_cutoff_sigma: float | None = 3.0, 

830 regularization_strength: float = 1e-10, 

831) -> npt.NDArray[np.floating]: 

832 """ 

833 Compute infiltration concentration from extracted water (deconvolution with dispersion). 

834 

835 Inverts the forward transport model by building the forward coefficient 

836 matrix ``W_forward`` from :func:`infiltration_to_extraction` and solving 

837 ``W_forward @ cin = cout`` via Tikhonov regularization. Well-determined 

838 modes are dominated by the data; poorly-determined modes are pulled 

839 toward the physically motivated target (transpose-and-normalize of the 

840 forward matrix). 

841 

842 Parameters 

843 ---------- 

844 cout : array-like 

845 Concentration of the compound in extracted water [concentration units]. 

846 Length must match the number of time bins defined by cout_tedges. 

847 flow : array-like 

848 Flow rate of water in the aquifer [m3/day]. 

849 Length must match the number of time bins defined by tedges. 

850 tedges : pandas.DatetimeIndex 

851 Time edges defining bins for cin (output) and flow data. 

852 Has length of len(flow) + 1. Output cin has length len(tedges) - 1. 

853 cout_tedges : pandas.DatetimeIndex 

854 Time edges for cout data bins. Has length of len(cout) + 1. 

855 Can have different time alignment and resolution than tedges. 

856 aquifer_pore_volumes : array-like 

857 Array of aquifer pore volumes [m3] representing the distribution 

858 of flow paths. Each pore volume determines the residence time for 

859 that flow path: tau = V_pore / Q. 

860 streamline_length : array-like 

861 Array of travel distances [m] corresponding to each pore volume. 

862 Must have the same length as aquifer_pore_volumes. 

863 molecular_diffusivity : float or array-like 

864 Effective molecular diffusivity [m2/day]. Can be a scalar (same for all 

865 pore volumes) or an array with the same length as aquifer_pore_volumes. 

866 Must be non-negative. See :func:`infiltration_to_extraction` for 

867 details on the physical interpretation and the interaction with 

868 retardation_factor. 

869 longitudinal_dispersivity : float or array-like 

870 Longitudinal dispersivity [m]. Can be a scalar (same for all pore 

871 volumes) or an array with the same length as aquifer_pore_volumes. 

872 Must be non-negative. 

873 retardation_factor : float, optional 

874 Retardation factor of the compound in the aquifer (default 1.0). 

875 Values > 1.0 indicate slower transport due to sorption. 

876 suppress_dispersion_warning : bool, optional 

877 If True, suppress the warning when using multiple pore volumes with 

878 non-zero longitudinal_dispersivity. Default is False. 

879 asymptotic_cutoff_sigma : float or None, optional 

880 Performance optimization for the forward matrix construction. 

881 Default is 3.0. 

882 regularization_strength : float, optional 

883 Tikhonov regularization parameter λ. See 

884 :func:`gwtransport.advection.extraction_to_infiltration` for details. 

885 Default is 1e-10. 

886 

887 Returns 

888 ------- 

889 numpy.ndarray 

890 Bin-averaged concentration in the infiltrating water. Same units as cout. 

891 Length equals len(tedges) - 1. NaN values indicate time periods 

892 with no valid contributions from the extraction data. 

893 

894 Raises 

895 ------ 

896 ValueError 

897 If input dimensions are inconsistent, if diffusivity is negative, 

898 or if aquifer_pore_volumes and streamline_length have different lengths. 

899 

900 Warns 

901 ----- 

902 UserWarning 

903 If multiple pore volumes are used with non-zero longitudinal_dispersivity. 

904 

905 See Also 

906 -------- 

907 infiltration_to_extraction : Forward operation (convolution) 

908 gwtransport.advection.extraction_to_infiltration : Pure advection (no dispersion) 

909 :ref:`concept-dispersion-scales` : Macrodispersion vs microdispersion 

910 

911 Notes 

912 ----- 

913 The algorithm builds the forward coefficient matrix ``W_forward`` (same as 

914 used by :func:`infiltration_to_extraction`) and solves ``W_forward @ cin = cout`` 

915 using :func:`gwtransport.utils.solve_tikhonov`. This ensures mathematical 

916 consistency between forward and inverse operations. 

917 

918 Examples 

919 -------- 

920 Basic usage with constant flow: 

921 

922 >>> import pandas as pd 

923 >>> import numpy as np 

924 >>> from gwtransport.diffusion import extraction_to_infiltration 

925 >>> 

926 >>> # Create time edges: tedges for cin/flow, cout_tedges for cout 

927 >>> tedges = pd.date_range(start="2020-01-01", end="2020-01-20", freq="D") 

928 >>> cout_tedges = pd.date_range(start="2020-01-05", end="2020-01-25", freq="D") 

929 >>> 

930 >>> # Extracted concentration and constant flow 

931 >>> cout = np.zeros(len(cout_tedges) - 1) 

932 >>> cout[5:10] = 1.0 # Observed pulse at extraction 

933 >>> flow = np.ones(len(tedges) - 1) * 100.0 # 100 m3/day 

934 >>> 

935 >>> # Single pore volume of 500 m3, travel distance 100 m 

936 >>> aquifer_pore_volumes = np.array([500.0]) 

937 >>> streamline_length = np.array([100.0]) 

938 >>> 

939 >>> # Reconstruct infiltration concentration 

940 >>> cin = extraction_to_infiltration( 

941 ... cout=cout, 

942 ... flow=flow, 

943 ... tedges=tedges, 

944 ... cout_tedges=cout_tedges, 

945 ... aquifer_pore_volumes=aquifer_pore_volumes, 

946 ... streamline_length=streamline_length, 

947 ... molecular_diffusivity=1e-4, 

948 ... longitudinal_dispersivity=1.0, 

949 ... ) 

950 """ 

951 # Convert to pandas DatetimeIndex if needed 

952 tedges = pd.DatetimeIndex(tedges) 

953 cout_tedges = pd.DatetimeIndex(cout_tedges) 

954 

955 # Convert to arrays 

956 cout = np.asarray(cout, dtype=float) 

957 flow = np.asarray(flow, dtype=float) 

958 aquifer_pore_volumes = np.asarray(aquifer_pore_volumes, dtype=float) 

959 streamline_length = np.atleast_1d(np.asarray(streamline_length, dtype=float)) 

960 

961 # Convert diffusion parameters to arrays and broadcast to pore volumes 

962 n_pore_volumes = len(aquifer_pore_volumes) 

963 molecular_diffusivity = np.atleast_1d(np.asarray(molecular_diffusivity, dtype=float)) 

964 longitudinal_dispersivity = np.atleast_1d(np.asarray(longitudinal_dispersivity, dtype=float)) 

965 

966 # Broadcast scalar values to match pore volumes 

967 if streamline_length.size == 1: 

968 streamline_length = np.broadcast_to(streamline_length, (n_pore_volumes,)).copy() 

969 if molecular_diffusivity.size == 1: 

970 molecular_diffusivity = np.broadcast_to(molecular_diffusivity, (n_pore_volumes,)).copy() 

971 if longitudinal_dispersivity.size == 1: 

972 longitudinal_dispersivity = np.broadcast_to(longitudinal_dispersivity, (n_pore_volumes,)).copy() 

973 

974 # Input validation 

975 if len(tedges) != len(flow) + 1: 

976 msg = "tedges must have one more element than flow" 

977 raise ValueError(msg) 

978 if len(cout_tedges) != len(cout) + 1: 

979 msg = "cout_tedges must have one more element than cout" 

980 raise ValueError(msg) 

981 if len(aquifer_pore_volumes) != len(streamline_length): 

982 msg = "aquifer_pore_volumes and streamline_length must have the same length" 

983 raise ValueError(msg) 

984 if len(molecular_diffusivity) != n_pore_volumes: 

985 msg = "molecular_diffusivity must be a scalar or have same length as aquifer_pore_volumes" 

986 raise ValueError(msg) 

987 if len(longitudinal_dispersivity) != n_pore_volumes: 

988 msg = "longitudinal_dispersivity must be a scalar or have same length as aquifer_pore_volumes" 

989 raise ValueError(msg) 

990 if np.any(molecular_diffusivity < 0): 

991 msg = "molecular_diffusivity must be non-negative" 

992 raise ValueError(msg) 

993 if np.any(longitudinal_dispersivity < 0): 

994 msg = "longitudinal_dispersivity must be non-negative" 

995 raise ValueError(msg) 

996 if np.any(np.isnan(cout)): 

997 msg = "cout contains NaN values, which are not allowed" 

998 raise ValueError(msg) 

999 if np.any(np.isnan(flow)): 

1000 msg = "flow contains NaN values, which are not allowed" 

1001 raise ValueError(msg) 

1002 if np.any(aquifer_pore_volumes <= 0): 

1003 msg = "aquifer_pore_volumes must be positive" 

1004 raise ValueError(msg) 

1005 if np.any(streamline_length <= 0): 

1006 msg = "streamline_length must be positive" 

1007 raise ValueError(msg) 

1008 

1009 # Check for conflicting approaches 

1010 if n_pore_volumes > 1 and np.any(longitudinal_dispersivity > 0) and not suppress_dispersion_warning: 

1011 msg = ( 

1012 "Using multiple aquifer_pore_volumes with non-zero longitudinal_dispersivity. " 

1013 "Both represent spreading from velocity heterogeneity at different scales.\n\n" 

1014 "This is appropriate when:\n" 

1015 " - APVD comes from streamline analysis (explicit geometry)\n" 

1016 " - You want to add unresolved microdispersion and molecular diffusion\n\n" 

1017 "This may double-count spreading when:\n" 

1018 " - APVD was calibrated from measurements (microdispersion already included)\n\n" 

1019 "For gamma-parameterized APVD, consider using the 'equivalent APVD std' approach\n" 

1020 "from 05_Diffusion_Dispersion.ipynb to combine both effects in the faster advection\n" 

1021 "module. Note: this variance combination is only valid for continuous (gamma)\n" 

1022 "distributions, not for discrete streamline volumes.\n" 

1023 "Suppress this warning with suppress_dispersion_warning=True." 

1024 ) 

1025 warnings.warn(msg, UserWarning, stacklevel=2) 

1026 

1027 n_cin = len(tedges) - 1 

1028 

1029 # Build forward weight matrix: W_forward @ cin = cout 

1030 w_forward, valid_cout_bins = _infiltration_to_extraction_coeff_matrix( 

1031 flow=flow, 

1032 tedges=tedges, 

1033 cout_tedges=cout_tedges, 

1034 aquifer_pore_volumes=aquifer_pore_volumes, 

1035 streamline_length=streamline_length, 

1036 molecular_diffusivity=molecular_diffusivity, 

1037 longitudinal_dispersivity=longitudinal_dispersivity, 

1038 retardation_factor=retardation_factor, 

1039 asymptotic_cutoff_sigma=asymptotic_cutoff_sigma, 

1040 ) 

1041 

1042 return solve_inverse_transport( 

1043 w_forward=w_forward, 

1044 observed=cout, 

1045 n_output=n_cin, 

1046 regularization_strength=regularization_strength, 

1047 valid_rows=valid_cout_bins, 

1048 ) 

1049 

1050 

1051def gamma_infiltration_to_extraction( 

1052 *, 

1053 cin: npt.ArrayLike, 

1054 flow: npt.ArrayLike, 

1055 tedges: pd.DatetimeIndex, 

1056 cout_tedges: pd.DatetimeIndex, 

1057 mean: float | None = None, 

1058 std: float | None = None, 

1059 loc: float = 0.0, 

1060 alpha: float | None = None, 

1061 beta: float | None = None, 

1062 n_bins: int = 100, 

1063 streamline_length: float, 

1064 molecular_diffusivity: float, 

1065 longitudinal_dispersivity: float, 

1066 retardation_factor: float = 1.0, 

1067 suppress_dispersion_warning: bool = False, 

1068 asymptotic_cutoff_sigma: float | None = 3.0, 

1069) -> npt.NDArray[np.floating]: 

1070 """ 

1071 Compute extracted concentration with advection-dispersion for gamma-distributed pore volumes. 

1072 

1073 Combines advective transport (based on gamma-distributed pore volumes) with 

1074 longitudinal dispersion (diffusive spreading during transport). This is a 

1075 convenience wrapper around :func:`infiltration_to_extraction` that parameterizes 

1076 the aquifer pore volume distribution as a (shifted) gamma distribution. 

1077 

1078 Provide either (mean, std) or (alpha, beta); ``loc`` is optional and defaults to 0. 

1079 

1080 Parameters 

1081 ---------- 

1082 cin : array-like 

1083 Concentration of the compound in infiltrating water. 

1084 flow : array-like 

1085 Flow rate of water in the aquifer [m3/day]. 

1086 tedges : pandas.DatetimeIndex 

1087 Time edges for cin and flow data. Has length len(cin) + 1. 

1088 cout_tedges : pandas.DatetimeIndex 

1089 Time edges for output data bins. Has length of desired output + 1. 

1090 mean : float, optional 

1091 Mean of the gamma distribution of the aquifer pore volume. Must be strictly 

1092 greater than ``loc``. 

1093 std : float, optional 

1094 Standard deviation of the gamma distribution of the aquifer pore volume 

1095 (invariant under the ``loc`` shift). 

1096 loc : float, optional 

1097 Location (minimum pore volume) of the gamma distribution. Must satisfy 

1098 ``0 <= loc < mean``. Default is ``0.0``. 

1099 alpha : float, optional 

1100 Shape parameter of gamma distribution of the aquifer pore volume (must be > 0). 

1101 beta : float, optional 

1102 Scale parameter of gamma distribution of the aquifer pore volume (must be > 0). 

1103 n_bins : int, optional 

1104 Number of bins to discretize the gamma distribution. Default is 100. 

1105 streamline_length : float 

1106 Travel distance through the aquifer [m]. Applied uniformly to all 

1107 gamma-discretized pore volumes. 

1108 molecular_diffusivity : float 

1109 Effective molecular diffusivity [m2/day]. Must be non-negative. 

1110 See :func:`infiltration_to_extraction` for details on the interaction 

1111 with retardation_factor. 

1112 longitudinal_dispersivity : float 

1113 Longitudinal dispersivity [m]. Must be non-negative. 

1114 retardation_factor : float, optional 

1115 Retardation factor (default 1.0). Values > 1.0 indicate slower transport. 

1116 suppress_dispersion_warning : bool, optional 

1117 Suppress warning about combining multiple pore volumes with dispersivity. 

1118 Default is False. 

1119 asymptotic_cutoff_sigma : float or None, optional 

1120 Performance optimization. Cells where the erf argument magnitude exceeds 

1121 this threshold are assigned asymptotic values. Default is 3.0. 

1122 

1123 Returns 

1124 ------- 

1125 numpy.ndarray 

1126 Bin-averaged concentration in the extracted water. Length equals 

1127 len(cout_tedges) - 1. NaN values indicate time periods with no valid 

1128 contributions from the infiltration data. 

1129 

1130 See Also 

1131 -------- 

1132 infiltration_to_extraction : Transport with explicit pore volume distribution 

1133 gamma_extraction_to_infiltration : Reverse operation (deconvolution) 

1134 gwtransport.gamma.bins : Create gamma distribution bins 

1135 gwtransport.advection.gamma_infiltration_to_extraction : Pure advection (no dispersion) 

1136 :ref:`concept-gamma-distribution` : Two-parameter pore volume model 

1137 :ref:`concept-dispersion-scales` : Macrodispersion vs microdispersion 

1138 

1139 Notes 

1140 ----- 

1141 The APVD is only time-invariant under the steady-streamlines assumption 

1142 (see :ref:`assumption-steady-streamlines`). 

1143 

1144 The spreading from the gamma-distributed pore volumes represents macrodispersion 

1145 (aquifer-scale heterogeneity). When ``std`` comes from calibration on measurements, 

1146 it absorbs all mixing: macrodispersion, microdispersion, and an average molecular 

1147 diffusion contribution. When ``std`` comes from streamline analysis, it represents 

1148 macrodispersion only; microdispersion and molecular diffusion can be added via the 

1149 dispersion parameters. 

1150 See :ref:`concept-dispersion-scales` for guidance on when to add microdispersion. 

1151 

1152 Examples 

1153 -------- 

1154 >>> import pandas as pd 

1155 >>> import numpy as np 

1156 >>> from gwtransport.diffusion import gamma_infiltration_to_extraction 

1157 >>> 

1158 >>> tedges = pd.date_range(start="2020-01-01", end="2020-01-20", freq="D") 

1159 >>> cout_tedges = pd.date_range(start="2020-01-05", end="2020-01-25", freq="D") 

1160 >>> cin = np.zeros(len(tedges) - 1) 

1161 >>> cin[5:10] = 1.0 

1162 >>> flow = np.ones(len(tedges) - 1) * 100.0 

1163 >>> 

1164 >>> cout = gamma_infiltration_to_extraction( 

1165 ... cin=cin, 

1166 ... flow=flow, 

1167 ... tedges=tedges, 

1168 ... cout_tedges=cout_tedges, 

1169 ... mean=500.0, 

1170 ... std=100.0, 

1171 ... n_bins=5, 

1172 ... streamline_length=100.0, 

1173 ... molecular_diffusivity=1e-4, 

1174 ... longitudinal_dispersivity=1.0, 

1175 ... ) 

1176 """ 

1177 bins = gamma.bins(mean=mean, std=std, loc=loc, alpha=alpha, beta=beta, n_bins=n_bins) 

1178 return infiltration_to_extraction( 

1179 cin=cin, 

1180 flow=flow, 

1181 tedges=tedges, 

1182 cout_tedges=cout_tedges, 

1183 aquifer_pore_volumes=bins["expected_values"], 

1184 streamline_length=streamline_length, 

1185 molecular_diffusivity=molecular_diffusivity, 

1186 longitudinal_dispersivity=longitudinal_dispersivity, 

1187 retardation_factor=retardation_factor, 

1188 suppress_dispersion_warning=suppress_dispersion_warning, 

1189 asymptotic_cutoff_sigma=asymptotic_cutoff_sigma, 

1190 ) 

1191 

1192 

1193def gamma_extraction_to_infiltration( 

1194 *, 

1195 cout: npt.ArrayLike, 

1196 flow: npt.ArrayLike, 

1197 tedges: pd.DatetimeIndex, 

1198 cout_tedges: pd.DatetimeIndex, 

1199 mean: float | None = None, 

1200 std: float | None = None, 

1201 loc: float = 0.0, 

1202 alpha: float | None = None, 

1203 beta: float | None = None, 

1204 n_bins: int = 100, 

1205 streamline_length: float, 

1206 molecular_diffusivity: float, 

1207 longitudinal_dispersivity: float, 

1208 retardation_factor: float = 1.0, 

1209 suppress_dispersion_warning: bool = False, 

1210 asymptotic_cutoff_sigma: float | None = 3.0, 

1211 regularization_strength: float = 1e-10, 

1212) -> npt.NDArray[np.floating]: 

1213 """ 

1214 Compute infiltration concentration from extracted water for gamma-distributed pore volumes. 

1215 

1216 Inverts the forward transport model (advection + dispersion with gamma-distributed 

1217 pore volumes) via Tikhonov regularization. This is a convenience wrapper around 

1218 :func:`extraction_to_infiltration` that parameterizes the aquifer pore volume 

1219 distribution as a (shifted) gamma distribution. 

1220 

1221 Provide either (mean, std) or (alpha, beta); ``loc`` is optional and defaults to 0. 

1222 

1223 Parameters 

1224 ---------- 

1225 cout : array-like 

1226 Concentration of the compound in extracted water. 

1227 flow : array-like 

1228 Flow rate of water in the aquifer [m3/day]. 

1229 tedges : pandas.DatetimeIndex 

1230 Time edges for cin (output) and flow data. Has length of len(flow) + 1. 

1231 cout_tedges : pandas.DatetimeIndex 

1232 Time edges for cout data bins. Has length of len(cout) + 1. 

1233 mean : float, optional 

1234 Mean of the gamma distribution of the aquifer pore volume. Must be strictly 

1235 greater than ``loc``. 

1236 std : float, optional 

1237 Standard deviation of the gamma distribution of the aquifer pore volume 

1238 (invariant under the ``loc`` shift). 

1239 loc : float, optional 

1240 Location (minimum pore volume) of the gamma distribution. Must satisfy 

1241 ``0 <= loc < mean``. Default is ``0.0``. 

1242 alpha : float, optional 

1243 Shape parameter of gamma distribution of the aquifer pore volume (must be > 0). 

1244 beta : float, optional 

1245 Scale parameter of gamma distribution of the aquifer pore volume (must be > 0). 

1246 n_bins : int, optional 

1247 Number of bins to discretize the gamma distribution. Default is 100. 

1248 streamline_length : float 

1249 Travel distance through the aquifer [m]. Applied uniformly to all 

1250 gamma-discretized pore volumes. 

1251 molecular_diffusivity : float 

1252 Effective molecular diffusivity [m2/day]. Must be non-negative. 

1253 See :func:`infiltration_to_extraction` for details on the interaction 

1254 with retardation_factor. 

1255 longitudinal_dispersivity : float 

1256 Longitudinal dispersivity [m]. Must be non-negative. 

1257 retardation_factor : float, optional 

1258 Retardation factor (default 1.0). Values > 1.0 indicate slower transport. 

1259 suppress_dispersion_warning : bool, optional 

1260 Suppress warning about combining multiple pore volumes with dispersivity. 

1261 Default is False. 

1262 asymptotic_cutoff_sigma : float or None, optional 

1263 Performance optimization for the forward matrix construction. 

1264 Default is 3.0. 

1265 regularization_strength : float, optional 

1266 Tikhonov regularization parameter. Default is 1e-10. 

1267 

1268 Returns 

1269 ------- 

1270 numpy.ndarray 

1271 Bin-averaged concentration in the infiltrating water. Length equals 

1272 len(tedges) - 1. NaN values indicate time periods with no valid 

1273 contributions from the extraction data. 

1274 

1275 See Also 

1276 -------- 

1277 extraction_to_infiltration : Deconvolution with explicit pore volume distribution 

1278 gamma_infiltration_to_extraction : Forward operation (convolution) 

1279 gwtransport.gamma.bins : Create gamma distribution bins 

1280 gwtransport.advection.gamma_extraction_to_infiltration : Pure advection (no dispersion) 

1281 :ref:`concept-gamma-distribution` : Two-parameter pore volume model 

1282 :ref:`concept-dispersion-scales` : Macrodispersion vs microdispersion 

1283 

1284 Notes 

1285 ----- 

1286 The APVD is only time-invariant under the steady-streamlines assumption 

1287 (see :ref:`assumption-steady-streamlines`). 

1288 

1289 The spreading from the gamma-distributed pore volumes represents macrodispersion 

1290 (aquifer-scale heterogeneity). When ``std`` comes from calibration on measurements, 

1291 it absorbs all mixing: macrodispersion, microdispersion, and an average molecular 

1292 diffusion contribution. When ``std`` comes from streamline analysis, it represents 

1293 macrodispersion only; microdispersion and molecular diffusion can be added via the 

1294 dispersion parameters. 

1295 See :ref:`concept-dispersion-scales` for guidance on when to add microdispersion. 

1296 

1297 Examples 

1298 -------- 

1299 >>> import pandas as pd 

1300 >>> import numpy as np 

1301 >>> from gwtransport.diffusion import gamma_extraction_to_infiltration 

1302 >>> 

1303 >>> tedges = pd.date_range(start="2020-01-01", end="2020-01-20", freq="D") 

1304 >>> cout_tedges = pd.date_range(start="2020-01-05", end="2020-01-25", freq="D") 

1305 >>> cout = np.zeros(len(cout_tedges) - 1) 

1306 >>> cout[5:10] = 1.0 

1307 >>> flow = np.ones(len(tedges) - 1) * 100.0 

1308 >>> 

1309 >>> cin = gamma_extraction_to_infiltration( 

1310 ... cout=cout, 

1311 ... flow=flow, 

1312 ... tedges=tedges, 

1313 ... cout_tedges=cout_tedges, 

1314 ... mean=500.0, 

1315 ... std=100.0, 

1316 ... n_bins=5, 

1317 ... streamline_length=100.0, 

1318 ... molecular_diffusivity=1e-4, 

1319 ... longitudinal_dispersivity=1.0, 

1320 ... ) 

1321 """ 

1322 bins = gamma.bins(mean=mean, std=std, loc=loc, alpha=alpha, beta=beta, n_bins=n_bins) 

1323 return extraction_to_infiltration( 

1324 cout=cout, 

1325 flow=flow, 

1326 tedges=tedges, 

1327 cout_tedges=cout_tedges, 

1328 aquifer_pore_volumes=bins["expected_values"], 

1329 streamline_length=streamline_length, 

1330 molecular_diffusivity=molecular_diffusivity, 

1331 longitudinal_dispersivity=longitudinal_dispersivity, 

1332 retardation_factor=retardation_factor, 

1333 suppress_dispersion_warning=suppress_dispersion_warning, 

1334 asymptotic_cutoff_sigma=asymptotic_cutoff_sigma, 

1335 regularization_strength=regularization_strength, 

1336 )