Coverage for src / gwtransport / diffusion.py: 87%

375 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-04 18:51 +0000

1""" 

2Analytical solutions for 1D advection-dispersion transport. 

3 

4This module implements analytical solutions for solute transport in 1D aquifer 

5systems, combining advection with longitudinal dispersion. The solutions are 

6based on the error function (erf) and its integrals. 

7 

8Key function: 

9- infiltration_to_extraction: Main transport function combining advection and dispersion 

10 

11The dispersion is characterized by the longitudinal dispersion coefficient D_L, 

12which is computed internally from: 

13 

14 D_L = D_m + alpha_L * v 

15 

16where: 

17- D_m is the molecular diffusion coefficient [m^2/day] 

18- alpha_L is the longitudinal dispersivity [m] 

19- v is the pore velocity [m/day], computed as v = L / tau_mean 

20 

21The velocity v is computed per (pore_volume, output_bin) using the mean residence 

22time, which correctly accounts for time-varying flow. This formulation ensures that: 

23- Molecular diffusion spreading scales with residence time: sqrt(D_m * tau) 

24- Mechanical dispersion spreading scales with travel distance: sqrt(alpha_L * L) 

25 

26This module adds microdispersion (alpha_L) and molecular diffusion (D_m) on top of 

27macrodispersion captured by the pore volume distribution (APVD). Both represent velocity 

28heterogeneity at different scales. Microdispersion is an aquifer property; macrodispersion 

29depends additionally on hydrological boundary conditions. See :ref:`concept-dispersion-scales` 

30for guidance on when to use each approach and how to avoid double-counting spreading effects. 

31""" 

32 

33import warnings 

34 

35import numpy as np 

36import numpy.typing as npt 

37import pandas as pd 

38from numpy.typing import NDArray 

39from scipy import special 

40 

41from gwtransport.residence_time import residence_time, residence_time_mean 

42 

43# Numerical tolerance for coefficient sum to determine valid output bins 

44EPSILON_COEFF_SUM = 1e-10 

45 

46 

47def _erf_integral_time( 

48 t: NDArray[np.float64], 

49 x: NDArray[np.float64], 

50 diffusivity: npt.ArrayLike, 

51) -> NDArray[np.float64]: 

52 r"""Compute the integral of the error function over time at each (t[i], x[i]) point. 

53 

54 This function computes the integral of erf(x/(2*sqrt(D*tau))) from 0 to t, 

55 where t, x, and diffusivity are broadcastable arrays. 

56 

57 The analytical solution is: 

58 

59 .. math:: 

60 \int_0^t \text{erf}\left(\frac{x}{2\sqrt{D \tau}}\right) d\tau 

61 = t \cdot \text{erf}\left(\frac{x}{2\sqrt{D t}}\right) 

62 + \frac{x \sqrt{t}}{\sqrt{\pi D}} \exp\left(-\frac{x^2}{4 D t}\right) 

63 - \frac{x^2}{2D} \text{erfc}\left(\frac{x}{2\sqrt{D t}}\right) 

64 

65 Parameters 

66 ---------- 

67 t : ndarray 

68 Input time values. Broadcastable with x and diffusivity. Must be non-negative. 

69 x : ndarray 

70 Position values. Broadcastable with t and diffusivity. 

71 diffusivity : float or ndarray 

72 Diffusivity [m²/day]. Must be positive. Broadcastable with t and x. 

73 

74 Returns 

75 ------- 

76 ndarray 

77 Integral values at each (t, x, D) point. Shape is broadcast shape of inputs. 

78 """ 

79 t = np.asarray(t, dtype=float) 

80 x = np.asarray(x, dtype=float) 

81 diffusivity = np.asarray(diffusivity, dtype=float) 

82 

83 # Broadcast all inputs to common shape 

84 t, x, diffusivity = np.broadcast_arrays(t, x, diffusivity) 

85 out = np.zeros_like(t, dtype=float) 

86 

87 # Mask for valid computation: t > 0, x != 0, and diffusivity > 0 

88 mask_valid = (t > 0.0) & (x != 0.0) & (diffusivity > 0.0) 

89 

90 if np.any(mask_valid): 

91 t_v = t[mask_valid] 

92 x_v = x[mask_valid] 

93 d_v = diffusivity[mask_valid] 

94 

95 # Use |x| for the formula and restore sign at the end. 

96 # The antiderivative is an odd function of x, but the erfc term 

97 # introduces a spurious -x²/D offset for negative x if computed 

98 # directly. 

99 sign_x = np.sign(x_v) 

100 abs_x = np.abs(x_v) 

101 

102 sqrt_t = np.sqrt(t_v) 

103 sqrt_d = np.sqrt(d_v) 

104 sqrt_pi = np.sqrt(np.pi) 

105 

106 arg = abs_x / (2 * sqrt_d * sqrt_t) 

107 exp_term = np.exp(-(abs_x**2) / (4 * d_v * t_v)) 

108 erf_term = special.erf(arg) 

109 erfc_term = special.erfc(arg) 

110 

111 term1 = t_v * erf_term 

112 term2 = (abs_x * sqrt_t / (sqrt_pi * sqrt_d)) * exp_term 

113 term3 = -(abs_x**2 / (2 * d_v)) * erfc_term 

114 

115 out[mask_valid] = sign_x * (term1 + term2 + term3) 

116 

117 # Handle infinity: as t -> inf, integral -> inf * sign(x) 

118 mask_t_inf = np.isinf(t) 

119 if np.any(mask_t_inf): 

120 out[mask_t_inf] = np.inf * np.sign(x[mask_t_inf]) 

121 

122 return out 

123 

124 

125def _erf_integral_space_time(x, t, diffusivity): 

126 """ 

127 Compute the integral of the error function in space and time at (x, t) points. 

128 

129 This function evaluates 

130 F(x[i], t[i], D[i]) for each i, where x, t, and diffusivity are broadcastable. 

131 This is useful for batched computations where we need F at arbitrary 

132 (x, t, D) triplets. 

133 

134 The double integral F(x,t,D) = ∫₀ᵗ ∫₀ˣ erf(ξ/(2√(Dτ))) dξ dτ is symmetric in x: 

135 F(-x, t, D) = F(x, t, D). The analytical formula is only valid for x >= 0, so we 

136 compute using |x| and the symmetry property. 

137 

138 Parameters 

139 ---------- 

140 x : ndarray 

141 Input values in space. Broadcastable with t and diffusivity. 

142 t : ndarray 

143 Input values in time. Broadcastable with x and diffusivity. 

144 diffusivity : float or ndarray 

145 Diffusivity [m²/day]. Must be positive. Can be a scalar or array 

146 broadcastable with x and t. 

147 

148 Returns 

149 ------- 

150 ndarray 

151 Integral F(x[i], t[i], D[i]) for each i. Shape is broadcast shape of inputs. 

152 """ 

153 x = np.asarray(x) 

154 t = np.asarray(t) 

155 diffusivity = np.asarray(diffusivity) 

156 

157 # The double integral is symmetric in x: F(-x, t, D) = F(x, t, D) 

158 # Use |x| for the computation 

159 x = np.abs(x) 

160 

161 # Broadcast all inputs to common shape 

162 x, t, diffusivity = np.broadcast_arrays(x, t, diffusivity) 

163 

164 isnan = np.isnan(x) | np.isnan(t) | np.isnan(diffusivity) 

165 

166 sqrt_diffusivity = np.sqrt(diffusivity) 

167 sqrt_pi = np.sqrt(np.pi) 

168 

169 # Handle t <= 0 or diffusivity <= 0 to avoid sqrt of negative / division by zero 

170 with np.errstate(divide="ignore", invalid="ignore"): 

171 safe_t = np.maximum(t, 1e-30) 

172 safe_d = np.maximum(diffusivity, 1e-30) 

173 sqrt_t = np.sqrt(safe_t) 

174 exp_term = np.exp(-(x**2) / (4 * safe_d * safe_t)) 

175 erf_term = special.erf(x / (2 * np.sqrt(safe_d) * sqrt_t)) 

176 

177 term1 = -4 * sqrt_diffusivity * t ** (3 / 2) / (3 * sqrt_pi) 

178 term2 = (2 * sqrt_diffusivity / sqrt_pi) * ( 

179 (2 * t ** (3 / 2) * exp_term / 3) 

180 - (sqrt_t * x**2 * exp_term / (3 * safe_d)) 

181 - (sqrt_pi * x**3 * erf_term / (6 * safe_d ** (3 / 2))) 

182 ) 

183 term3 = x * ( 

184 t * erf_term + (x**2 * erf_term / (2 * safe_d)) + (sqrt_t * x * exp_term / (sqrt_pi * np.sqrt(safe_d))) 

185 ) 

186 term4 = -(x**3) / (6 * safe_d) 

187 out = term1 + term2 + term3 + term4 

188 

189 out = np.where(isnan, np.nan, out) 

190 out = np.where(t <= 0.0, 0.0, out) 

191 out = np.where(diffusivity <= 0.0, 0.0, out) 

192 return np.where(np.isinf(x) | np.isinf(t), np.inf, out) 

193 

194 

195def _erf_integral_space( 

196 x: NDArray[np.float64], 

197 diffusivity: npt.ArrayLike, 

198 t: NDArray[np.float64], 

199 clip_to_inf: float = 6.0, 

200) -> NDArray[np.float64]: 

201 """Compute the integral of the error function at each (x[i], t[i], D[i]) point. 

202 

203 This function computes the integral of erf from 0 to x[i] at time t[i], 

204 where x, t, and diffusivity are broadcastable arrays. 

205 

206 Parameters 

207 ---------- 

208 x : ndarray 

209 Input x values. Broadcastable with t and diffusivity. 

210 diffusivity : float or ndarray 

211 Diffusivity [m²/day]. Must be non-negative. Broadcastable with x and t. 

212 t : ndarray 

213 Time values [day]. Broadcastable with x and diffusivity. Must be non-negative. 

214 clip_to_inf : float, optional 

215 Clip ax values beyond this to avoid numerical issues. Default is 6. 

216 

217 Returns 

218 ------- 

219 ndarray 

220 Integral values at each (x, t, D) point. Shape is broadcast shape of inputs. 

221 """ 

222 x = np.asarray(x) 

223 t = np.asarray(t) 

224 diffusivity = np.asarray(diffusivity) 

225 

226 # Broadcast all inputs to common shape 

227 x, t, diffusivity = np.broadcast_arrays(x, t, diffusivity) 

228 out = np.zeros_like(x, dtype=float) 

229 

230 # Compute a = 1/(2*sqrt(diffusivity*t)) from diffusivity and t 

231 # Handle edge cases: diffusivity=0 or t=0 means a=inf (step function) 

232 with np.errstate(divide="ignore", invalid="ignore"): 

233 a = np.where((diffusivity == 0.0) | (t == 0.0), np.inf, 1.0 / (2.0 * np.sqrt(diffusivity * t))) 

234 

235 # Handle a=inf case: integral of sign(x) from 0 to x is |x| 

236 mask_a_inf = np.isinf(a) 

237 if np.any(mask_a_inf): 

238 out[mask_a_inf] = np.abs(x[mask_a_inf]) 

239 

240 # Handle a=0 case: erf(0) = 0, integral is 0 

241 mask_a_zero = a == 0.0 

242 # out[mask_a_zero] already 0 

243 

244 # Handle finite non-zero a 

245 mask_valid = ~mask_a_inf & ~mask_a_zero 

246 if np.any(mask_valid): 

247 x_v = x[mask_valid] 

248 a_v = a[mask_valid] 

249 ax = a_v * x_v 

250 

251 # Initialize result for valid entries 

252 result = np.zeros_like(x_v) 

253 

254 # Handle clipped regions 

255 maskl = ax <= -clip_to_inf 

256 masku = ax >= clip_to_inf 

257 mask_mid = ~maskl & ~masku 

258 

259 result[maskl] = -x_v[maskl] - 1 / (a_v[maskl] * np.sqrt(np.pi)) 

260 result[masku] = x_v[masku] - 1 / (a_v[masku] * np.sqrt(np.pi)) 

261 result[mask_mid] = x_v[mask_mid] * special.erf(ax[mask_mid]) + (np.exp(-(ax[mask_mid] ** 2)) - 1) / ( 

262 a_v[mask_mid] * np.sqrt(np.pi) 

263 ) 

264 

265 out[mask_valid] = result 

266 

267 return out 

268 

269 

270def _erf_mean_space_time(xedges, tedges, diffusivity, *, asymptotic_cutoff_sigma=None): 

271 """ 

272 Compute the mean of the error function over paired space-time cells. 

273 

274 Computes the average value of erf(x/(2*sqrt(D*t))) over cells where 

275 cell i spans [xedges[i], xedges[i+1]] x [tedges[i], tedges[i+1]]. 

276 

277 The mean is computed using the inclusion-exclusion principle: 

278 (F(x₁,t₁,D) - F(x₀,t₁,D) - F(x₁,t₀,D) + F(x₀,t₀,D)) / ((x₁-x₀)(t₁-t₀)) 

279 

280 where F(x,t,D) = ∫₀ᵗ ∫₀ˣ erf(ξ/(2√(D·τ))) dξ dτ 

281 

282 Parameters 

283 ---------- 

284 xedges : ndarray 

285 Cell edges in space of size n. 

286 tedges : ndarray 

287 Cell edges in time of size n (same length as xedges). 

288 diffusivity : float or ndarray 

289 Diffusivity [m²/day]. Must be non-negative. Can be a scalar (same for 

290 all cells) or an array of size n-1 (one value per cell). 

291 asymptotic_cutoff_sigma : float, optional 

292 Performance optimization. When set, cells where the erf argument 

293 magnitude exceeds this threshold are assigned asymptotic values (+1 or -1) 

294 instead of computing the expensive double integral. Since erf(3) ≈ 0.99998 

295 and erf(4) ≈ 0.9999999846, values of 3-5 provide good accuracy with 

296 significant speedup for transport problems where most cells are far 

297 from the concentration front. Default is None (no cutoff, compute all). 

298 

299 Returns 

300 ------- 

301 ndarray 

302 Mean of the error function over each cell. 

303 Returns 1D array of length n-1, or scalar if n=2. 

304 

305 Notes 

306 ----- 

307 The asymptotic cutoff optimization works by checking the "weakest" erf 

308 argument in each cell (minimum |x| with maximum t). If this argument 

309 exceeds the cutoff threshold, the entire cell is in the asymptotic region 

310 and no computation is needed. This is particularly effective for advection- 

311 dispersion problems where most cells are far from the moving front. 

312 """ 

313 xedges = np.asarray(xedges) 

314 tedges = np.asarray(tedges) 

315 diffusivity = np.atleast_1d(np.asarray(diffusivity)) 

316 

317 if len(xedges) != len(tedges): 

318 msg = "xedges and tedges must have the same length" 

319 raise ValueError(msg) 

320 

321 n_cells = len(xedges) - 1 

322 

323 # Broadcast scalar diffusivity to all cells 

324 if diffusivity.size == 1: 

325 diffusivity = np.broadcast_to(diffusivity, (n_cells,)) 

326 

327 if len(diffusivity) != n_cells: 

328 msg = "diffusivity must be a scalar or have length n_cells (len(xedges) - 1)" 

329 raise ValueError(msg) 

330 

331 out = np.full(n_cells, np.nan, dtype=float) 

332 

333 # Early exit for asymptotic cells (performance optimization) 

334 # For cells far from the concentration front, erf ≈ ±1 

335 # Assumes xedges and tedges are sorted (x0 <= x1, t0 <= t1) 

336 if asymptotic_cutoff_sigma is not None: 

337 x0, x1 = xedges[:-1], xedges[1:] 

338 t1, d = tedges[1:], diffusivity 

339 # Cell doesn't straddle zero if x0 >= 0 (all positive) or x1 <= 0 (all negative) 

340 # min(|x|) = x0 if x0 >= 0, else -x1 if x1 <= 0 

341 min_abs_x = np.where(x0 >= 0, x0, -x1) 

342 # Asymptotic if min(|x|) / (2 * sqrt(D * t1)) >= cutoff, with valid t1 > 0 and d > 0 

343 mask = ((x0 >= 0) | (x1 <= 0)) & (t1 > 0) & (d > 0) 

344 mask[mask] &= min_abs_x[mask] / (2.0 * np.sqrt(d[mask] * t1[mask])) >= asymptotic_cutoff_sigma 

345 out[mask] = np.sign(x0[mask] + x1[mask]) 

346 

347 # Track which cells still need computation 

348 mask_computed = ~np.isnan(out) 

349 

350 dx = xedges[1:] - xedges[:-1] 

351 dt = tedges[1:] - tedges[:-1] 

352 

353 mask_dx_zero = (dx == 0.0) & ~mask_computed 

354 mask_dt_zero = (dt == 0.0) & ~mask_computed 

355 

356 # Handle cells where both dx=0 AND dt=0 (point evaluation of erf) 

357 mask_both_zero = mask_dx_zero & mask_dt_zero 

358 if np.any(mask_both_zero): 

359 x_pts = xedges[:-1][mask_both_zero] 

360 t_pts = tedges[:-1][mask_both_zero] 

361 d_pts = diffusivity[mask_both_zero] 

362 with np.errstate(divide="ignore", invalid="ignore"): 

363 a_pts = np.where( 

364 (d_pts == 0.0) | (t_pts == 0.0), 

365 np.inf, 

366 1.0 / (2.0 * np.sqrt(d_pts * t_pts)), 

367 ) 

368 mask_a_inf = np.isinf(a_pts) 

369 result = np.zeros(np.sum(mask_both_zero)) 

370 if np.any(mask_a_inf): 

371 result[mask_a_inf] = np.sign(x_pts[mask_a_inf]) 

372 if np.any(~mask_a_inf): 

373 result[~mask_a_inf] = special.erf(a_pts[~mask_a_inf] * x_pts[~mask_a_inf]) 

374 out[mask_both_zero] = result 

375 

376 # Handle remaining dt=0 cells (mean over space at fixed time) 

377 mask_dt_zero_only = mask_dt_zero & ~mask_dx_zero 

378 if np.any(mask_dt_zero_only): 

379 idx_dt_zero = np.where(mask_dt_zero_only)[0] 

380 n_dt_zero = len(idx_dt_zero) 

381 

382 # Build edge arrays for all dt=0 cells 

383 x_edges_flat = np.concatenate([xedges[idx_dt_zero], xedges[idx_dt_zero + 1]]) 

384 t_edges_flat = np.concatenate([tedges[idx_dt_zero], tedges[idx_dt_zero + 1]]) 

385 d_cells = diffusivity[idx_dt_zero] 

386 

387 # Replicate diffusivity for both edges of each cell 

388 d_edges = np.concatenate([d_cells, d_cells]) 

389 

390 # Compute integral at all edge points 

391 erfint_flat = _erf_integral_space(x_edges_flat, diffusivity=d_edges, t=t_edges_flat) 

392 erfint_lower = erfint_flat[:n_dt_zero] 

393 erfint_upper = erfint_flat[n_dt_zero:] 

394 

395 dx_cells = xedges[idx_dt_zero + 1] - xedges[idx_dt_zero] 

396 out[idx_dt_zero] = (erfint_upper - erfint_lower) / dx_cells 

397 

398 # Handle remaining dx=0 cells (mean over time at fixed x) 

399 mask_dx_zero_only = mask_dx_zero & ~mask_dt_zero 

400 if np.any(mask_dx_zero_only): 

401 idx_dx_zero = np.where(mask_dx_zero_only)[0] 

402 n_dx_zero = len(idx_dx_zero) 

403 

404 # Build edge arrays for all dx=0 cells 

405 t_edges_flat = np.concatenate([tedges[idx_dx_zero], tedges[idx_dx_zero + 1]]) 

406 x_edges_flat = np.concatenate([xedges[idx_dx_zero], xedges[idx_dx_zero + 1]]) 

407 d_cells = diffusivity[idx_dx_zero] 

408 

409 # Replicate diffusivity for both edges of each cell 

410 d_edges = np.concatenate([d_cells, d_cells]) 

411 

412 # Compute integral at all edge points 

413 erfint_flat = _erf_integral_time(t_edges_flat, x=x_edges_flat, diffusivity=d_edges) 

414 erfint_lower = erfint_flat[:n_dx_zero] 

415 erfint_upper = erfint_flat[n_dx_zero:] 

416 

417 dt_cells = tedges[idx_dx_zero + 1] - tedges[idx_dx_zero] 

418 out[idx_dx_zero] = (erfint_upper - erfint_lower) / dt_cells 

419 

420 # Handle remaining cells with full double integral 

421 mask_remainder = ~mask_dx_zero & ~mask_dt_zero & ~mask_computed 

422 if np.any(mask_remainder): 

423 # Check for zero diffusivity cells 

424 d_remainder = diffusivity[mask_remainder] 

425 mask_d_zero = d_remainder == 0.0 

426 

427 if np.any(mask_d_zero): 

428 # For zero diffusivity, erf becomes sign function 

429 idx_d_zero = np.where(mask_remainder)[0][mask_d_zero] 

430 x_mid = (xedges[idx_d_zero] + xedges[idx_d_zero + 1]) / 2 

431 out[idx_d_zero] = np.sign(x_mid) 

432 

433 # Handle non-zero diffusivity cells 

434 mask_d_nonzero = ~mask_d_zero 

435 if np.any(mask_d_nonzero): 

436 idx_remainder = np.where(mask_remainder)[0][mask_d_nonzero] 

437 d_nonzero = d_remainder[mask_d_nonzero] 

438 

439 # Build corner arrays: each cell has 4 corners, all using same diffusivity 

440 x_corners = np.concatenate([ 

441 xedges[idx_remainder], # x0 

442 xedges[idx_remainder + 1], # x1 

443 xedges[idx_remainder], # x0 

444 xedges[idx_remainder + 1], # x1 

445 ]) 

446 t_corners = np.concatenate([ 

447 tedges[idx_remainder], # t0 

448 tedges[idx_remainder + 1], # t1 

449 tedges[idx_remainder + 1], # t1 

450 tedges[idx_remainder], # t0 

451 ]) 

452 # Replicate diffusivity for all 4 corners of each cell 

453 d_corners = np.concatenate([d_nonzero, d_nonzero, d_nonzero, d_nonzero]) 

454 

455 f = _erf_integral_space_time(x_corners, t_corners, d_corners) 

456 n_rem = len(idx_remainder) 

457 f_00 = f[:n_rem] 

458 f_11 = f[n_rem : 2 * n_rem] 

459 f_01 = f[2 * n_rem : 3 * n_rem] 

460 f_10 = f[3 * n_rem :] 

461 

462 double_integrals = f_11 - f_10 - f_01 + f_00 

463 cell_areas = dx[idx_remainder] * dt[idx_remainder] 

464 out[idx_remainder] = double_integrals / cell_areas 

465 

466 # Handle infinite x edges 

467 le, ue = xedges[:-1], xedges[1:] 

468 out[np.isinf(le) & ~np.isinf(ue)] = -1.0 

469 out[np.isneginf(le) & np.isneginf(ue)] = -1.0 

470 out[~np.isinf(le) & np.isinf(ue)] = 1.0 

471 out[np.isposinf(le) & np.isposinf(ue)] = 1.0 

472 out[np.isneginf(le) & np.isposinf(ue)] = 0.0 

473 out[np.isposinf(le) & np.isneginf(ue)] = 0.0 

474 

475 # Handle NaN in edges 

476 out[np.isnan(xedges[:-1]) | np.isnan(xedges[1:])] = np.nan 

477 out[np.isnan(tedges[:-1]) | np.isnan(tedges[1:])] = np.nan 

478 

479 if len(out) == 1: 

480 return out[0] 

481 return out 

482 

483 

484def infiltration_to_extraction( 

485 *, 

486 cin: npt.ArrayLike, 

487 flow: npt.ArrayLike, 

488 tedges: pd.DatetimeIndex, 

489 cout_tedges: pd.DatetimeIndex, 

490 aquifer_pore_volumes: npt.ArrayLike, 

491 streamline_length: npt.ArrayLike, 

492 molecular_diffusivity: npt.ArrayLike, 

493 longitudinal_dispersivity: npt.ArrayLike, 

494 retardation_factor: float = 1.0, 

495 suppress_dispersion_warning: bool = False, 

496 asymptotic_cutoff_sigma: float | None = 3.0, 

497) -> npt.NDArray[np.floating]: 

498 """ 

499 Compute extracted concentration with advection and longitudinal dispersion. 

500 

501 This function models 1D solute transport through an aquifer system, 

502 combining advective transport (based on residence times) with longitudinal 

503 dispersion (diffusive spreading during transport). 

504 

505 The physical model assumes: 

506 1. Water infiltrates with concentration cin at time t_in 

507 2. Water travels distance L through aquifer with residence time tau = V_pore / Q 

508 3. During transport, longitudinal dispersion causes spreading 

509 4. At extraction, the concentration is a diffused breakthrough curve 

510 

511 The longitudinal dispersion coefficient D_L is computed internally as: 

512 

513 D_L = D_m + alpha_L * v 

514 

515 where v = L / tau_mean is the mean pore velocity computed from the mean 

516 residence time for each (pore_volume, output_bin) combination. This 

517 formulation correctly captures that: 

518 - Molecular diffusion spreading scales with residence time: sqrt(D_m * tau) 

519 - Mechanical dispersion spreading scales with travel distance: sqrt(alpha_L * L) 

520 

521 Parameters 

522 ---------- 

523 cin : array-like 

524 Concentration of the compound in infiltrating water [concentration units]. 

525 Length must match the number of time bins defined by tedges. 

526 flow : array-like 

527 Flow rate of water in the aquifer [m3/day]. 

528 Length must match cin and the number of time bins defined by tedges. 

529 tedges : pandas.DatetimeIndex 

530 Time edges defining bins for both cin and flow data. Has length of 

531 len(cin) + 1. 

532 cout_tedges : pandas.DatetimeIndex 

533 Time edges for output data bins. Has length of desired output + 1. 

534 The output concentration is averaged over each bin. 

535 aquifer_pore_volumes : array-like 

536 Array of aquifer pore volumes [m3] representing the distribution 

537 of flow paths. Each pore volume determines the residence time for 

538 that flow path: tau = V_pore / Q. 

539 streamline_length : array-like 

540 Array of travel distances [m] corresponding to each pore volume. 

541 Must have the same length as aquifer_pore_volumes. 

542 molecular_diffusivity : float or array-like 

543 Molecular diffusion coefficient [m2/day]. Can be a scalar (same for all 

544 pore volumes) or an array with the same length as aquifer_pore_volumes. 

545 Must be non-negative. Represents Brownian motion of solute molecules, 

546 independent of velocity. 

547 longitudinal_dispersivity : float or array-like 

548 Longitudinal dispersivity [m]. Can be a scalar (same for all pore 

549 volumes) or an array with the same length as aquifer_pore_volumes. 

550 Must be non-negative. Represents microdispersion (mechanical dispersion 

551 from pore-scale velocity variations). Set to 0 for pure molecular diffusion. 

552 retardation_factor : float, optional 

553 Retardation factor of the compound in the aquifer (default 1.0). 

554 Values > 1.0 indicate slower transport due to sorption. 

555 suppress_dispersion_warning : bool, optional 

556 If True, suppress the warning when using multiple pore volumes with 

557 non-zero longitudinal_dispersivity. Default is False. 

558 asymptotic_cutoff_sigma : float or None, optional 

559 Performance optimization. Cells where the erf argument magnitude exceeds 

560 this threshold are assigned asymptotic values (±1) instead of computing 

561 the expensive integral. Since erf(3) ≈ 0.99998, the default of 3.0 

562 provides excellent accuracy with significant speedup. Set to None to 

563 disable the optimization. Default is 3.0. 

564 

565 Returns 

566 ------- 

567 numpy.ndarray 

568 Bin-averaged concentration in the extracted water. Same units as cin. 

569 Length equals len(cout_tedges) - 1. NaN values indicate time periods 

570 with no valid contributions from the infiltration data. 

571 

572 Raises 

573 ------ 

574 ValueError 

575 If input dimensions are inconsistent, if diffusivity is negative, 

576 or if aquifer_pore_volumes and streamline_length have different lengths. 

577 

578 Warns 

579 ----- 

580 UserWarning 

581 If multiple pore volumes are used with non-zero longitudinal_dispersivity. 

582 This may lead to double-counting of spreading effects. Suppress with 

583 ``suppress_dispersion_warning=True`` if this is intentional. 

584 

585 See Also 

586 -------- 

587 extraction_to_infiltration : Inverse operation (deconvolution) 

588 gwtransport.advection.infiltration_to_extraction : Pure advection (no dispersion) 

589 gwtransport.diffusion_fast.infiltration_to_extraction : Fast Gaussian approximation 

590 :ref:`concept-dispersion-scales` : Macrodispersion vs microdispersion 

591 

592 Notes 

593 ----- 

594 The algorithm constructs a coefficient matrix W where cout = W @ cin: 

595 

596 1. For each pore volume, compute the breakthrough curve contribution: 

597 - delta_volume: volume between infiltration event and extraction point 

598 - step_widths: convert volume to spatial distance x (erf coordinate) 

599 - time_active: diffusion time, limited by residence time 

600 

601 2. For each infiltration time edge, compute the erf response at all 

602 extraction time edges using analytical space-time averaging. 

603 

604 3. Convert erf response to breakthrough fraction: frac = 0.5 * (1 + erf) 

605 

606 4. Coefficient for bin: coeff[i,j] = frac_start - frac_end 

607 This represents the fraction of cin[j] that arrives in cout[i]. 

608 

609 5. Average coefficients across all pore volumes. 

610 

611 The error function solution assumes an initial step function that diffuses 

612 over time. The position coordinate x represents the distance from the 

613 concentration front to the observation point. 

614 

615 Examples 

616 -------- 

617 Basic usage with constant flow: 

618 

619 >>> import pandas as pd 

620 >>> import numpy as np 

621 >>> from gwtransport.diffusion import infiltration_to_extraction 

622 >>> 

623 >>> # Create time edges 

624 >>> tedges = pd.date_range(start="2020-01-01", end="2020-01-20", freq="D") 

625 >>> cout_tedges = pd.date_range(start="2020-01-05", end="2020-01-25", freq="D") 

626 >>> 

627 >>> # Input concentration (step function) and constant flow 

628 >>> cin = np.zeros(len(tedges) - 1) 

629 >>> cin[5:10] = 1.0 # Pulse of concentration 

630 >>> flow = np.ones(len(tedges) - 1) * 100.0 # 100 m3/day 

631 >>> 

632 >>> # Single pore volume of 500 m3, travel distance 100 m 

633 >>> aquifer_pore_volumes = np.array([500.0]) 

634 >>> streamline_length = np.array([100.0]) 

635 >>> 

636 >>> # Compute with dispersion (molecular diffusion + dispersivity) 

637 >>> # Scalar values broadcast to all pore volumes 

638 >>> cout = infiltration_to_extraction( 

639 ... cin=cin, 

640 ... flow=flow, 

641 ... tedges=tedges, 

642 ... cout_tedges=cout_tedges, 

643 ... aquifer_pore_volumes=aquifer_pore_volumes, 

644 ... streamline_length=streamline_length, 

645 ... molecular_diffusivity=1e-4, # m2/day, same for all pore volumes 

646 ... longitudinal_dispersivity=1.0, # m, same for all pore volumes 

647 ... ) 

648 

649 With multiple pore volumes (heterogeneous aquifer): 

650 

651 >>> # Distribution of pore volumes and corresponding travel distances 

652 >>> aquifer_pore_volumes = np.array([400.0, 500.0, 600.0]) 

653 >>> streamline_length = np.array([80.0, 100.0, 120.0]) 

654 >>> 

655 >>> # Scalar diffusion parameters broadcast to all pore volumes 

656 >>> cout = infiltration_to_extraction( 

657 ... cin=cin, 

658 ... flow=flow, 

659 ... tedges=tedges, 

660 ... cout_tedges=cout_tedges, 

661 ... aquifer_pore_volumes=aquifer_pore_volumes, 

662 ... streamline_length=streamline_length, 

663 ... molecular_diffusivity=1e-4, # m2/day 

664 ... longitudinal_dispersivity=1.0, # m 

665 ... ) 

666 """ 

667 # Convert to pandas DatetimeIndex if needed 

668 cout_tedges = pd.DatetimeIndex(cout_tedges) 

669 tedges = pd.DatetimeIndex(tedges) 

670 

671 # Convert to arrays 

672 cin = np.asarray(cin, dtype=float) 

673 flow = np.asarray(flow, dtype=float) 

674 aquifer_pore_volumes = np.asarray(aquifer_pore_volumes, dtype=float) 

675 streamline_length = np.asarray(streamline_length, dtype=float) 

676 

677 # Convert diffusion parameters to arrays and broadcast to pore volumes 

678 n_pore_volumes = len(aquifer_pore_volumes) 

679 molecular_diffusivity = np.atleast_1d(np.asarray(molecular_diffusivity, dtype=float)) 

680 longitudinal_dispersivity = np.atleast_1d(np.asarray(longitudinal_dispersivity, dtype=float)) 

681 

682 # Broadcast scalar values to match pore volumes 

683 if molecular_diffusivity.size == 1: 

684 molecular_diffusivity = np.broadcast_to(molecular_diffusivity, (n_pore_volumes,)).copy() 

685 if longitudinal_dispersivity.size == 1: 

686 longitudinal_dispersivity = np.broadcast_to(longitudinal_dispersivity, (n_pore_volumes,)).copy() 

687 

688 # Input validation 

689 if len(tedges) != len(cin) + 1: 

690 msg = "tedges must have one more element than cin" 

691 raise ValueError(msg) 

692 if len(tedges) != len(flow) + 1: 

693 msg = "tedges must have one more element than flow" 

694 raise ValueError(msg) 

695 if len(aquifer_pore_volumes) != len(streamline_length): 

696 msg = "aquifer_pore_volumes and streamline_length must have the same length" 

697 raise ValueError(msg) 

698 if len(molecular_diffusivity) != n_pore_volumes: 

699 msg = "molecular_diffusivity must be a scalar or have same length as aquifer_pore_volumes" 

700 raise ValueError(msg) 

701 if len(longitudinal_dispersivity) != n_pore_volumes: 

702 msg = "longitudinal_dispersivity must be a scalar or have same length as aquifer_pore_volumes" 

703 raise ValueError(msg) 

704 if np.any(molecular_diffusivity < 0): 

705 msg = "molecular_diffusivity must be non-negative" 

706 raise ValueError(msg) 

707 if np.any(longitudinal_dispersivity < 0): 

708 msg = "longitudinal_dispersivity must be non-negative" 

709 raise ValueError(msg) 

710 if np.any(np.isnan(cin)): 

711 msg = "cin contains NaN values, which are not allowed" 

712 raise ValueError(msg) 

713 if np.any(np.isnan(flow)): 

714 msg = "flow contains NaN values, which are not allowed" 

715 raise ValueError(msg) 

716 if np.any(aquifer_pore_volumes <= 0): 

717 msg = "aquifer_pore_volumes must be positive" 

718 raise ValueError(msg) 

719 if np.any(streamline_length <= 0): 

720 msg = "streamline_length must be positive" 

721 raise ValueError(msg) 

722 

723 # Check for conflicting approaches: multiple pore volumes with longitudinal dispersivity 

724 # Both represent the same physical phenomenon (velocity heterogeneity) at different scales. 

725 # See concept-dispersion-scales in the documentation for details. 

726 if n_pore_volumes > 1 and np.any(longitudinal_dispersivity > 0) and not suppress_dispersion_warning: 

727 msg = ( 

728 "Using multiple aquifer_pore_volumes with non-zero longitudinal_dispersivity. " 

729 "Both represent spreading from velocity heterogeneity at different scales.\n\n" 

730 "This is appropriate when:\n" 

731 " - APVD comes from streamline analysis (explicit geometry)\n" 

732 " - You want to add unresolved microdispersion and molecular diffusion\n\n" 

733 "This may double-count spreading when:\n" 

734 " - APVD was calibrated from measurements (microdispersion already included)\n\n" 

735 "For gamma-parameterized APVD, consider using the 'equivalent APVD std' approach\n" 

736 "from 05_Diffusion_Dispersion.ipynb to combine both effects in the faster advection\n" 

737 "module. Note: this variance combination is only valid for continuous (gamma)\n" 

738 "distributions, not for discrete streamline volumes.\n" 

739 "Suppress this warning with suppress_dispersion_warning=True." 

740 ) 

741 warnings.warn(msg, UserWarning, stacklevel=2) 

742 

743 # Extend tedges for spin up 

744 tedges = pd.DatetimeIndex([ 

745 tedges[0] - pd.Timedelta("36500D"), 

746 *list(tedges[1:-1]), 

747 tedges[-1] + pd.Timedelta("36500D"), 

748 ]) 

749 

750 # Compute the cumulative flow at tedges 

751 infiltration_volume = flow * (np.diff(tedges) / pd.Timedelta("1D")) # m3 

752 cumulative_volume_at_cin_tedges = np.concatenate(([0], np.cumsum(infiltration_volume))) 

753 

754 # Compute the cumulative flow at cout_tedges 

755 cumulative_volume_at_cout_tedges = np.interp(cout_tedges, tedges, cumulative_volume_at_cin_tedges) 

756 

757 rt_edges_2d = residence_time( 

758 flow=flow, 

759 flow_tedges=tedges, 

760 index=tedges, 

761 aquifer_pore_volumes=aquifer_pore_volumes, 

762 retardation_factor=retardation_factor, 

763 direction="infiltration_to_extraction", 

764 ) 

765 

766 # Compute residence time at cout_tedges to identify valid output bins 

767 # RT is NaN for cout_tedges beyond the input data range 

768 rt_at_cout_tedges = residence_time( 

769 flow=flow, 

770 flow_tedges=tedges, 

771 index=cout_tedges, 

772 aquifer_pore_volumes=aquifer_pore_volumes, 

773 retardation_factor=retardation_factor, 

774 direction="extraction_to_infiltration", 

775 ) 

776 # Output bin i is valid if both cout_tedges[i] and cout_tedges[i+1] have valid RT for all pore volumes 

777 # Check if any pore volume has NaN RT at the bin edges 

778 valid_cout_bins = ~np.any(np.isnan(rt_at_cout_tedges[:, :-1]) | np.isnan(rt_at_cout_tedges[:, 1:]), axis=0) 

779 

780 # Compute mean residence time per (pore_volume, cout_bin) for velocity calculation 

781 # Shape: (n_pore_volumes, n_cout_bins) 

782 rt_mean_2d = residence_time_mean( 

783 flow=flow, 

784 flow_tedges=tedges, 

785 tedges_out=cout_tedges, 

786 aquifer_pore_volumes=aquifer_pore_volumes, 

787 direction="extraction_to_infiltration", 

788 retardation_factor=retardation_factor, 

789 ) 

790 

791 # Compute velocity: v = L / tau_mean 

792 # Shape: (n_pore_volumes, n_cout_bins) 

793 # Use explicit validity handling instead of error suppression 

794 valid_rt = np.isfinite(rt_mean_2d) & (rt_mean_2d > 0) 

795 velocity_2d = np.where(valid_rt, streamline_length[:, None] / rt_mean_2d, 0.0) 

796 

797 # Compute D_L = D_m + alpha_L * v 

798 # Shape: (n_pore_volumes, n_cout_bins) 

799 # Fall back to molecular diffusivity only where RT is invalid (spinup period) 

800 diffusivity_2d = np.where( 

801 valid_rt, 

802 molecular_diffusivity[:, None] + longitudinal_dispersivity[:, None] * velocity_2d, 

803 molecular_diffusivity[:, None], 

804 ) 

805 

806 # Initialize coefficient matrix accumulator 

807 # Shape: (n_cout_bins, n_cin_bins) - each row represents contributions to one output bin 

808 n_cout_bins = len(cout_tedges) - 1 

809 n_cin_bins = len(cin) 

810 n_cin_edges = len(tedges) 

811 accumulated_coeff = np.zeros((n_cout_bins, n_cin_bins)) 

812 

813 # Determine when infiltration has occurred: cout_tedge must be >= tedge (infiltration time) 

814 isactive = cout_tedges.to_numpy()[:, None] >= tedges.to_numpy()[None, :] 

815 

816 # Loop over each pore volume 

817 for i_pv in range(len(aquifer_pore_volumes)): 

818 # Compute volume relationship for the erf x-coordinate: 

819 # delta_volume = volume extracted at cout_tedge - volume at infiltration - R*APV 

820 # This represents how far past the pore volume the concentration front has traveled 

821 delta_volume = ( 

822 cumulative_volume_at_cout_tedges[:, None] 

823 - cumulative_volume_at_cin_tedges[None, :] 

824 - (retardation_factor * aquifer_pore_volumes[i_pv]) 

825 ) 

826 delta_volume[~isactive] = np.nan 

827 

828 # Convert volume to spatial distance x (erf coordinate) 

829 # x = delta_volume / (R * APV) * L 

830 step_widths = delta_volume / (retardation_factor * aquifer_pore_volumes[i_pv]) * streamline_length[i_pv] 

831 

832 # Compute diffusion time: time since infiltration, limited by residence time 

833 time_active = (cout_tedges.to_numpy()[:, None] - tedges.to_numpy()[None, :]) / pd.to_timedelta(1, unit="D") 

834 time_active[~isactive] = np.nan 

835 time_active = np.minimum(time_active, rt_edges_2d[[i_pv]]) 

836 

837 # Compute erf response: response[i_cout_bin, j_cin_edge] = mean erf for cin edge j at cout bin i 

838 response = np.zeros((n_cout_bins, n_cin_edges)) 

839 

840 # Get diffusivity for this pore volume across all cout bins 

841 diff_pv = diffusivity_2d[i_pv, :] # shape (n_cout_bins,) 

842 

843 for j in range(n_cin_edges): 

844 # For each infiltration edge, compute erf response at all extraction bins 

845 xedges_j = step_widths[:, j] # shape (n_cout_edges,) 

846 tedges_j = time_active[:, j] # shape (n_cout_edges,) 

847 response[:, j] = _erf_mean_space_time( 

848 xedges_j, tedges_j, diff_pv, asymptotic_cutoff_sigma=asymptotic_cutoff_sigma 

849 ) 

850 

851 # Convert erf response [-1, 1] to breakthrough fraction [0, 1] 

852 frac = 0.5 * (1 + response) # shape (n_cout_bins, n_cin_edges) 

853 

854 # Coefficient for bin j: coeff[i, j] = frac[i, j] - frac[i, j+1] 

855 # This is the fraction of cin[j] that arrives in cout[i] 

856 frac_start = frac[:, :-1] 

857 frac_end = frac[:, 1:] 

858 

859 # Handle NaN: if frac_end is NaN but frac_start is valid, use 0.0 for frac_end 

860 # (the concentration step hasn't fully passed through yet) 

861 frac_end_filled = np.where(np.isnan(frac_end) & ~np.isnan(frac_start), 0.0, frac_end) 

862 coeff = frac_start - frac_end_filled # shape (n_cout_bins, n_cin_bins) 

863 

864 accumulated_coeff += coeff 

865 

866 # Average across pore volumes 

867 coeff_matrix = accumulated_coeff / len(aquifer_pore_volumes) 

868 

869 # Replace NaN with 0 for matrix multiplication 

870 # NaN indicates infiltration hasn't occurred yet for that (cout, cin) pair 

871 coeff_matrix_filled = np.nan_to_num(coeff_matrix, nan=0.0) 

872 

873 # Matrix multiply: cout = W @ cin 

874 cout = coeff_matrix_filled @ cin 

875 

876 # Mark output bins as invalid where no valid contributions exist: 

877 # 1. Sum of coefficients near zero (no cin has broken through yet - spinup) 

878 # 2. Output bin extends beyond input data range (from valid_cout_bins) 

879 total_coeff = np.sum(coeff_matrix_filled, axis=1) 

880 no_valid_contribution = (total_coeff < EPSILON_COEFF_SUM) | ~valid_cout_bins 

881 cout[no_valid_contribution] = np.nan 

882 

883 return cout 

884 

885 

886def extraction_to_infiltration( 

887 *, 

888 cout: npt.ArrayLike, 

889 flow: npt.ArrayLike, 

890 tedges: pd.DatetimeIndex, 

891 cin_tedges: pd.DatetimeIndex, 

892 aquifer_pore_volumes: npt.ArrayLike, 

893 streamline_length: npt.ArrayLike, 

894 molecular_diffusivity: npt.ArrayLike, 

895 longitudinal_dispersivity: npt.ArrayLike, 

896 retardation_factor: float = 1.0, 

897 suppress_dispersion_warning: bool = False, 

898 asymptotic_cutoff_sigma: float | None = 3.0, 

899) -> npt.NDArray[np.floating]: 

900 """ 

901 Compute infiltration concentration from extracted water (deconvolution with dispersion). 

902 

903 This function implements the inverse of infiltration_to_extraction, reconstructing 

904 the original infiltration concentration from the extracted water concentration. 

905 It explicitly constructs the weights matrix (rather than inverting the forward matrix) 

906 to ensure numerical stability and proper handling of dispersion effects. 

907 

908 The physical model assumes: 

909 1. Water is extracted with concentration cout at time t_out 

910 2. Water traveled distance L through aquifer with residence time tau = V_pore / Q 

911 3. During transport, longitudinal dispersion caused spreading 

912 4. At infiltration, the concentration is reconstructed from the diffused signal 

913 

914 The longitudinal dispersion coefficient D_L is computed internally as: 

915 

916 D_L = D_m + alpha_L * v 

917 

918 where v = L / tau_mean is the mean pore velocity computed from the mean 

919 residence time for each (pore_volume, cin_bin) combination. 

920 

921 Parameters 

922 ---------- 

923 cout : array-like 

924 Concentration of the compound in extracted water [concentration units]. 

925 Length must match the number of time bins defined by tedges. 

926 flow : array-like 

927 Flow rate of water in the aquifer [m3/day]. 

928 Length must match cout and the number of time bins defined by tedges. 

929 tedges : pandas.DatetimeIndex 

930 Time edges defining bins for both cout and flow data (extraction times). 

931 Has length of len(cout) + 1. 

932 cin_tedges : pandas.DatetimeIndex 

933 Time edges for output infiltration data bins. Has length of desired output + 1. 

934 The output concentration is averaged over each bin. 

935 aquifer_pore_volumes : array-like 

936 Array of aquifer pore volumes [m3] representing the distribution 

937 of flow paths. Each pore volume determines the residence time for 

938 that flow path: tau = V_pore / Q. 

939 streamline_length : array-like 

940 Array of travel distances [m] corresponding to each pore volume. 

941 Must have the same length as aquifer_pore_volumes. 

942 molecular_diffusivity : float or array-like 

943 Molecular diffusion coefficient [m2/day]. Can be a scalar (same for all 

944 pore volumes) or an array with the same length as aquifer_pore_volumes. 

945 Must be non-negative. 

946 longitudinal_dispersivity : float or array-like 

947 Longitudinal dispersivity [m]. Can be a scalar (same for all pore 

948 volumes) or an array with the same length as aquifer_pore_volumes. 

949 Must be non-negative. 

950 retardation_factor : float, optional 

951 Retardation factor of the compound in the aquifer (default 1.0). 

952 Values > 1.0 indicate slower transport due to sorption. 

953 suppress_dispersion_warning : bool, optional 

954 If True, suppress the warning when using multiple pore volumes with 

955 non-zero longitudinal_dispersivity. Default is False. 

956 asymptotic_cutoff_sigma : float or None, optional 

957 Performance optimization. Cells where the erf argument magnitude exceeds 

958 this threshold are assigned asymptotic values (±1) instead of computing 

959 the expensive integral. Since erf(3) ≈ 0.99998, the default of 3.0 

960 provides excellent accuracy with significant speedup. Set to None to 

961 disable the optimization. Default is 3.0. 

962 

963 Returns 

964 ------- 

965 numpy.ndarray 

966 Bin-averaged concentration in the infiltrating water. Same units as cout. 

967 Length equals len(cin_tedges) - 1. NaN values indicate time periods 

968 with no valid contributions from the extraction data. 

969 

970 Raises 

971 ------ 

972 ValueError 

973 If input dimensions are inconsistent, if diffusivity is negative, 

974 or if aquifer_pore_volumes and streamline_length have different lengths. 

975 

976 Warns 

977 ----- 

978 UserWarning 

979 If multiple pore volumes are used with non-zero longitudinal_dispersivity. 

980 

981 See Also 

982 -------- 

983 infiltration_to_extraction : Forward operation (convolution) 

984 gwtransport.advection.extraction_to_infiltration : Pure advection (no dispersion) 

985 :ref:`concept-dispersion-scales` : Macrodispersion vs microdispersion 

986 

987 Notes 

988 ----- 

989 The algorithm constructs a coefficient matrix W where cin = W @ cout: 

990 

991 1. For each pore volume, compute the deconvolution contribution: 

992 - extraction_times: when water infiltrated at cin_tedges will be extracted 

993 - delta_volume: volume between infiltration event and extraction point 

994 - step_widths: convert volume to spatial distance x (erf coordinate) 

995 - time_active: diffusion time, limited by residence time 

996 

997 2. For each extraction time edge, compute the erf response at all 

998 infiltration time edges using analytical space-time averaging. 

999 

1000 3. Convert erf response to breakthrough fraction: frac = 0.5 * (1 + erf) 

1001 

1002 4. Coefficient for bin: coeff[i,j] = frac_start - frac_end 

1003 This represents the fraction of cout[j] that originated from cin[i]. 

1004 

1005 5. Average coefficients across all pore volumes, using nan_to_num to 

1006 prevent NaN propagation when different pore volumes have different 

1007 valid time ranges. 

1008 

1009 The error function solution assumes an initial step function that diffuses 

1010 over time. The position coordinate x represents the distance from the 

1011 concentration front to the observation point. 

1012 

1013 Examples 

1014 -------- 

1015 Basic usage with constant flow: 

1016 

1017 >>> import pandas as pd 

1018 >>> import numpy as np 

1019 >>> from gwtransport.diffusion import extraction_to_infiltration 

1020 >>> 

1021 >>> # Create time edges for extraction data 

1022 >>> tedges = pd.date_range(start="2020-01-05", end="2020-01-25", freq="D") 

1023 >>> cin_tedges = pd.date_range(start="2020-01-01", end="2020-01-20", freq="D") 

1024 >>> 

1025 >>> # Extracted concentration and constant flow 

1026 >>> cout = np.zeros(len(tedges) - 1) 

1027 >>> cout[5:10] = 1.0 # Observed pulse at extraction 

1028 >>> flow = np.ones(len(tedges) - 1) * 100.0 # 100 m3/day 

1029 >>> 

1030 >>> # Single pore volume of 500 m3, travel distance 100 m 

1031 >>> aquifer_pore_volumes = np.array([500.0]) 

1032 >>> streamline_length = np.array([100.0]) 

1033 >>> 

1034 >>> # Reconstruct infiltration concentration 

1035 >>> cin = extraction_to_infiltration( 

1036 ... cout=cout, 

1037 ... flow=flow, 

1038 ... tedges=tedges, 

1039 ... cin_tedges=cin_tedges, 

1040 ... aquifer_pore_volumes=aquifer_pore_volumes, 

1041 ... streamline_length=streamline_length, 

1042 ... molecular_diffusivity=1e-4, 

1043 ... longitudinal_dispersivity=1.0, 

1044 ... ) 

1045 """ 

1046 # Convert to pandas DatetimeIndex if needed 

1047 cin_tedges = pd.DatetimeIndex(cin_tedges) 

1048 tedges = pd.DatetimeIndex(tedges) 

1049 

1050 # Convert to arrays 

1051 cout = np.asarray(cout, dtype=float) 

1052 flow = np.asarray(flow, dtype=float) 

1053 aquifer_pore_volumes = np.asarray(aquifer_pore_volumes, dtype=float) 

1054 streamline_length = np.asarray(streamline_length, dtype=float) 

1055 

1056 # Convert diffusion parameters to arrays and broadcast to pore volumes 

1057 n_pore_volumes = len(aquifer_pore_volumes) 

1058 molecular_diffusivity = np.atleast_1d(np.asarray(molecular_diffusivity, dtype=float)) 

1059 longitudinal_dispersivity = np.atleast_1d(np.asarray(longitudinal_dispersivity, dtype=float)) 

1060 

1061 # Broadcast scalar values to match pore volumes 

1062 if molecular_diffusivity.size == 1: 

1063 molecular_diffusivity = np.broadcast_to(molecular_diffusivity, (n_pore_volumes,)).copy() 

1064 if longitudinal_dispersivity.size == 1: 

1065 longitudinal_dispersivity = np.broadcast_to(longitudinal_dispersivity, (n_pore_volumes,)).copy() 

1066 

1067 # Input validation 

1068 if len(tedges) != len(cout) + 1: 

1069 msg = "tedges must have one more element than cout" 

1070 raise ValueError(msg) 

1071 if len(tedges) != len(flow) + 1: 

1072 msg = "tedges must have one more element than flow" 

1073 raise ValueError(msg) 

1074 if len(aquifer_pore_volumes) != len(streamline_length): 

1075 msg = "aquifer_pore_volumes and streamline_length must have the same length" 

1076 raise ValueError(msg) 

1077 if len(molecular_diffusivity) != n_pore_volumes: 

1078 msg = "molecular_diffusivity must be a scalar or have same length as aquifer_pore_volumes" 

1079 raise ValueError(msg) 

1080 if len(longitudinal_dispersivity) != n_pore_volumes: 

1081 msg = "longitudinal_dispersivity must be a scalar or have same length as aquifer_pore_volumes" 

1082 raise ValueError(msg) 

1083 if np.any(molecular_diffusivity < 0): 

1084 msg = "molecular_diffusivity must be non-negative" 

1085 raise ValueError(msg) 

1086 if np.any(longitudinal_dispersivity < 0): 

1087 msg = "longitudinal_dispersivity must be non-negative" 

1088 raise ValueError(msg) 

1089 if np.any(np.isnan(cout)): 

1090 msg = "cout contains NaN values, which are not allowed" 

1091 raise ValueError(msg) 

1092 if np.any(np.isnan(flow)): 

1093 msg = "flow contains NaN values, which are not allowed" 

1094 raise ValueError(msg) 

1095 if np.any(aquifer_pore_volumes <= 0): 

1096 msg = "aquifer_pore_volumes must be positive" 

1097 raise ValueError(msg) 

1098 if np.any(streamline_length <= 0): 

1099 msg = "streamline_length must be positive" 

1100 raise ValueError(msg) 

1101 

1102 # Check for conflicting approaches: multiple pore volumes with longitudinal dispersivity 

1103 # Both represent the same physical phenomenon (velocity heterogeneity) at different scales. 

1104 # See concept-dispersion-scales in the documentation for details. 

1105 if n_pore_volumes > 1 and np.any(longitudinal_dispersivity > 0) and not suppress_dispersion_warning: 

1106 msg = ( 

1107 "Using multiple aquifer_pore_volumes with non-zero longitudinal_dispersivity. " 

1108 "Both represent spreading from velocity heterogeneity at different scales.\n\n" 

1109 "This is appropriate when:\n" 

1110 " - APVD comes from streamline analysis (explicit geometry)\n" 

1111 " - You want to add unresolved microdispersion and molecular diffusion\n\n" 

1112 "This may double-count spreading when:\n" 

1113 " - APVD was calibrated from measurements (microdispersion already included)\n\n" 

1114 "For gamma-parameterized APVD, consider using the 'equivalent APVD std' approach\n" 

1115 "from 05_Diffusion_Dispersion.ipynb to combine both effects in the faster advection\n" 

1116 "module. Note: this variance combination is only valid for continuous (gamma)\n" 

1117 "distributions, not for discrete streamline volumes.\n" 

1118 "Suppress this warning with suppress_dispersion_warning=True." 

1119 ) 

1120 warnings.warn(msg, UserWarning, stacklevel=2) 

1121 

1122 # Extend tedges for spin up (same as forward function) 

1123 tedges = pd.DatetimeIndex([ 

1124 tedges[0] - pd.Timedelta("36500D"), 

1125 *list(tedges[1:-1]), 

1126 tedges[-1] + pd.Timedelta("36500D"), 

1127 ]) 

1128 

1129 # Compute the cumulative flow at tedges (extraction times) 

1130 extraction_volume = flow * (np.diff(tedges) / pd.Timedelta("1D")) # m3 

1131 cumulative_volume_at_cout_tedges = np.concatenate(([0], np.cumsum(extraction_volume))) 

1132 

1133 # Compute the cumulative flow at cin_tedges (interpolated) 

1134 cumulative_volume_at_cin_tedges = np.interp(cin_tedges, tedges, cumulative_volume_at_cout_tedges) 

1135 

1136 # Compute residence times at cin_tedges using forward direction 

1137 # (from infiltration perspective: when will water infiltrated at cin_tedges be extracted?) 

1138 rt_edges_2d = residence_time( 

1139 flow=flow, 

1140 flow_tedges=tedges, 

1141 index=cin_tedges, 

1142 aquifer_pore_volumes=aquifer_pore_volumes, 

1143 retardation_factor=retardation_factor, 

1144 direction="infiltration_to_extraction", 

1145 ) 

1146 

1147 # Determine valid cin bins immediately (reuse rt_edges_2d, no duplicate computation) 

1148 # A cin bin is valid if both its edges have finite residence times across all pore volumes 

1149 valid_cin_bins = ~np.any(np.isnan(rt_edges_2d[:, :-1]) | np.isnan(rt_edges_2d[:, 1:]), axis=0) 

1150 

1151 # Compute mean residence time per (pore_volume, cin_bin) for velocity calculation 

1152 # Shape: (n_pore_volumes, n_cin_bins) 

1153 rt_mean_2d = residence_time_mean( 

1154 flow=flow, 

1155 flow_tedges=tedges, 

1156 tedges_out=cin_tedges, 

1157 aquifer_pore_volumes=aquifer_pore_volumes, 

1158 direction="infiltration_to_extraction", 

1159 retardation_factor=retardation_factor, 

1160 ) 

1161 

1162 # Compute velocity: v = L / tau_mean 

1163 # Shape: (n_pore_volumes, n_cin_bins) 

1164 # Use explicit validity handling instead of error suppression 

1165 valid_rt = np.isfinite(rt_mean_2d) & (rt_mean_2d > 0) 

1166 velocity_2d = np.where(valid_rt, streamline_length[:, None] / rt_mean_2d, 0.0) 

1167 

1168 # Compute D_L = D_m + alpha_L * v 

1169 # Shape: (n_pore_volumes, n_cin_bins) 

1170 # Fall back to molecular diffusivity only where RT is invalid (spinup period) 

1171 diffusivity_2d = np.where( 

1172 valid_rt, 

1173 molecular_diffusivity[:, None] + longitudinal_dispersivity[:, None] * velocity_2d, 

1174 molecular_diffusivity[:, None], 

1175 ) 

1176 

1177 # Initialize coefficient matrix accumulator 

1178 # Shape: (n_cin_bins, n_cout_bins) - each row represents contributions to one output bin 

1179 n_cin_bins = len(cin_tedges) - 1 

1180 n_cout_bins = len(cout) 

1181 n_cout_edges = len(tedges) 

1182 accumulated_coeff = np.zeros((n_cin_bins, n_cout_bins)) 

1183 

1184 # Loop over each pore volume 

1185 for i_pv in range(len(aquifer_pore_volumes)): 

1186 # Compute extraction times: when water infiltrated at cin_tedges will be extracted 

1187 # t_extraction = t_infiltration + residence_time 

1188 extraction_times = cin_tedges + pd.to_timedelta(rt_edges_2d[i_pv, :], unit="D") 

1189 

1190 # Convert to days relative to tedges[0] for computation 

1191 cout_tedges_days = ((tedges - tedges[0]) / pd.Timedelta("1D")).values 

1192 extraction_times_days = ((extraction_times - tedges[0]) / pd.Timedelta("1D")).values 

1193 

1194 # Compute volume relationship for the erf x-coordinate: 

1195 # delta_volume = volume at infiltration + R*APV - volume extracted at cout_tedge 

1196 # This represents how far past the pore volume the concentration front has traveled 

1197 delta_volume = ( 

1198 cumulative_volume_at_cin_tedges[:, None] 

1199 + (retardation_factor * aquifer_pore_volumes[i_pv]) 

1200 - cumulative_volume_at_cout_tedges[None, :] 

1201 ) 

1202 

1203 # Determine when extraction has occurred: extraction_time must be >= cout_tedge 

1204 isactive = extraction_times_days[:, None] >= cout_tedges_days[None, :] 

1205 delta_volume[~isactive] = np.nan 

1206 

1207 # Convert volume to spatial distance x (erf coordinate) 

1208 # x = delta_volume / (R * APV) * L 

1209 step_widths = delta_volume / (retardation_factor * aquifer_pore_volumes[i_pv]) * streamline_length[i_pv] 

1210 

1211 # Compute diffusion time: time from extraction until cout_tedge, limited by residence time 

1212 time_active = extraction_times_days[:, None] - cout_tedges_days[None, :] 

1213 time_active[~isactive] = np.nan 

1214 time_active = np.maximum(time_active, 0) # No negative times 

1215 time_active = np.minimum(time_active, rt_edges_2d[[i_pv]].T) # Limited by residence time 

1216 

1217 # Compute erf response: response[i_cin_bin, j_cout_edge] = mean erf for cin bin i at cout edge j 

1218 response = np.zeros((n_cin_bins, n_cout_edges)) 

1219 

1220 # Get diffusivity for this pore volume across all cin bins 

1221 diff_pv = diffusivity_2d[i_pv, :] # shape (n_cin_bins,) 

1222 

1223 for j in range(n_cout_edges): 

1224 # For each extraction edge, compute erf response at all infiltration bins 

1225 xedges_j = step_widths[:, j] # shape (n_cin_edges,) 

1226 tedges_j = time_active[:, j] # shape (n_cin_edges,) 

1227 response[:, j] = _erf_mean_space_time( 

1228 xedges_j, tedges_j, diff_pv, asymptotic_cutoff_sigma=asymptotic_cutoff_sigma 

1229 ) 

1230 

1231 # Convert erf response [-1, 1] to breakthrough fraction [0, 1] 

1232 frac = 0.5 * (1 + response) # shape (n_cin_bins, n_cout_edges) 

1233 

1234 # Coefficient for bin j: coeff[i, j] = frac[i, j] - frac[i, j+1] 

1235 # This is the fraction of cout[j] that originated from cin[i] 

1236 frac_start = frac[:, :-1] 

1237 frac_end = frac[:, 1:] 

1238 

1239 # Handle NaN: if frac_end is NaN but frac_start is valid, use 0.0 for frac_end 

1240 # (the concentration step hasn't fully passed through yet) 

1241 frac_end_filled = np.where(np.isnan(frac_end) & ~np.isnan(frac_start), 0.0, frac_end) 

1242 coeff = frac_start - frac_end_filled # shape (n_cin_bins, n_cout_bins) 

1243 

1244 # Replace NaN with 0 BEFORE accumulating to avoid NaN propagation 

1245 # (NaN + valid = NaN would lose valid contributions from other pore volumes) 

1246 accumulated_coeff += np.nan_to_num(coeff, nan=0.0) 

1247 

1248 # Average across pore volumes 

1249 coeff_matrix = accumulated_coeff / len(aquifer_pore_volumes) 

1250 

1251 # coeff_matrix is already NaN-free due to nan_to_num in the loop 

1252 coeff_matrix_filled = coeff_matrix 

1253 

1254 # Matrix multiply: cin = W @ cout 

1255 cin = coeff_matrix_filled @ cout 

1256 

1257 # Mark output bins as invalid where no valid contributions exist: 

1258 # 1. Sum of coefficients near zero (no cout data covers this time - spinup) 

1259 # 2. Output bin extends beyond input data range (from valid_cin_bins) 

1260 total_coeff = np.sum(coeff_matrix_filled, axis=1) 

1261 no_valid_contribution = (total_coeff < EPSILON_COEFF_SUM) | ~valid_cin_bins 

1262 cin[no_valid_contribution] = np.nan 

1263 

1264 return cin