Coverage for src / gwtransport / diffusion.py: 87%
375 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-04 18:51 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-04 18:51 +0000
1"""
2Analytical solutions for 1D advection-dispersion transport.
4This module implements analytical solutions for solute transport in 1D aquifer
5systems, combining advection with longitudinal dispersion. The solutions are
6based on the error function (erf) and its integrals.
8Key function:
9- infiltration_to_extraction: Main transport function combining advection and dispersion
11The dispersion is characterized by the longitudinal dispersion coefficient D_L,
12which is computed internally from:
14 D_L = D_m + alpha_L * v
16where:
17- D_m is the molecular diffusion coefficient [m^2/day]
18- alpha_L is the longitudinal dispersivity [m]
19- v is the pore velocity [m/day], computed as v = L / tau_mean
21The velocity v is computed per (pore_volume, output_bin) using the mean residence
22time, which correctly accounts for time-varying flow. This formulation ensures that:
23- Molecular diffusion spreading scales with residence time: sqrt(D_m * tau)
24- Mechanical dispersion spreading scales with travel distance: sqrt(alpha_L * L)
26This module adds microdispersion (alpha_L) and molecular diffusion (D_m) on top of
27macrodispersion captured by the pore volume distribution (APVD). Both represent velocity
28heterogeneity at different scales. Microdispersion is an aquifer property; macrodispersion
29depends additionally on hydrological boundary conditions. See :ref:`concept-dispersion-scales`
30for guidance on when to use each approach and how to avoid double-counting spreading effects.
31"""
33import warnings
35import numpy as np
36import numpy.typing as npt
37import pandas as pd
38from numpy.typing import NDArray
39from scipy import special
41from gwtransport.residence_time import residence_time, residence_time_mean
43# Numerical tolerance for coefficient sum to determine valid output bins
44EPSILON_COEFF_SUM = 1e-10
47def _erf_integral_time(
48 t: NDArray[np.float64],
49 x: NDArray[np.float64],
50 diffusivity: npt.ArrayLike,
51) -> NDArray[np.float64]:
52 r"""Compute the integral of the error function over time at each (t[i], x[i]) point.
54 This function computes the integral of erf(x/(2*sqrt(D*tau))) from 0 to t,
55 where t, x, and diffusivity are broadcastable arrays.
57 The analytical solution is:
59 .. math::
60 \int_0^t \text{erf}\left(\frac{x}{2\sqrt{D \tau}}\right) d\tau
61 = t \cdot \text{erf}\left(\frac{x}{2\sqrt{D t}}\right)
62 + \frac{x \sqrt{t}}{\sqrt{\pi D}} \exp\left(-\frac{x^2}{4 D t}\right)
63 - \frac{x^2}{2D} \text{erfc}\left(\frac{x}{2\sqrt{D t}}\right)
65 Parameters
66 ----------
67 t : ndarray
68 Input time values. Broadcastable with x and diffusivity. Must be non-negative.
69 x : ndarray
70 Position values. Broadcastable with t and diffusivity.
71 diffusivity : float or ndarray
72 Diffusivity [m²/day]. Must be positive. Broadcastable with t and x.
74 Returns
75 -------
76 ndarray
77 Integral values at each (t, x, D) point. Shape is broadcast shape of inputs.
78 """
79 t = np.asarray(t, dtype=float)
80 x = np.asarray(x, dtype=float)
81 diffusivity = np.asarray(diffusivity, dtype=float)
83 # Broadcast all inputs to common shape
84 t, x, diffusivity = np.broadcast_arrays(t, x, diffusivity)
85 out = np.zeros_like(t, dtype=float)
87 # Mask for valid computation: t > 0, x != 0, and diffusivity > 0
88 mask_valid = (t > 0.0) & (x != 0.0) & (diffusivity > 0.0)
90 if np.any(mask_valid):
91 t_v = t[mask_valid]
92 x_v = x[mask_valid]
93 d_v = diffusivity[mask_valid]
95 # Use |x| for the formula and restore sign at the end.
96 # The antiderivative is an odd function of x, but the erfc term
97 # introduces a spurious -x²/D offset for negative x if computed
98 # directly.
99 sign_x = np.sign(x_v)
100 abs_x = np.abs(x_v)
102 sqrt_t = np.sqrt(t_v)
103 sqrt_d = np.sqrt(d_v)
104 sqrt_pi = np.sqrt(np.pi)
106 arg = abs_x / (2 * sqrt_d * sqrt_t)
107 exp_term = np.exp(-(abs_x**2) / (4 * d_v * t_v))
108 erf_term = special.erf(arg)
109 erfc_term = special.erfc(arg)
111 term1 = t_v * erf_term
112 term2 = (abs_x * sqrt_t / (sqrt_pi * sqrt_d)) * exp_term
113 term3 = -(abs_x**2 / (2 * d_v)) * erfc_term
115 out[mask_valid] = sign_x * (term1 + term2 + term3)
117 # Handle infinity: as t -> inf, integral -> inf * sign(x)
118 mask_t_inf = np.isinf(t)
119 if np.any(mask_t_inf):
120 out[mask_t_inf] = np.inf * np.sign(x[mask_t_inf])
122 return out
125def _erf_integral_space_time(x, t, diffusivity):
126 """
127 Compute the integral of the error function in space and time at (x, t) points.
129 This function evaluates
130 F(x[i], t[i], D[i]) for each i, where x, t, and diffusivity are broadcastable.
131 This is useful for batched computations where we need F at arbitrary
132 (x, t, D) triplets.
134 The double integral F(x,t,D) = ∫₀ᵗ ∫₀ˣ erf(ξ/(2√(Dτ))) dξ dτ is symmetric in x:
135 F(-x, t, D) = F(x, t, D). The analytical formula is only valid for x >= 0, so we
136 compute using |x| and the symmetry property.
138 Parameters
139 ----------
140 x : ndarray
141 Input values in space. Broadcastable with t and diffusivity.
142 t : ndarray
143 Input values in time. Broadcastable with x and diffusivity.
144 diffusivity : float or ndarray
145 Diffusivity [m²/day]. Must be positive. Can be a scalar or array
146 broadcastable with x and t.
148 Returns
149 -------
150 ndarray
151 Integral F(x[i], t[i], D[i]) for each i. Shape is broadcast shape of inputs.
152 """
153 x = np.asarray(x)
154 t = np.asarray(t)
155 diffusivity = np.asarray(diffusivity)
157 # The double integral is symmetric in x: F(-x, t, D) = F(x, t, D)
158 # Use |x| for the computation
159 x = np.abs(x)
161 # Broadcast all inputs to common shape
162 x, t, diffusivity = np.broadcast_arrays(x, t, diffusivity)
164 isnan = np.isnan(x) | np.isnan(t) | np.isnan(diffusivity)
166 sqrt_diffusivity = np.sqrt(diffusivity)
167 sqrt_pi = np.sqrt(np.pi)
169 # Handle t <= 0 or diffusivity <= 0 to avoid sqrt of negative / division by zero
170 with np.errstate(divide="ignore", invalid="ignore"):
171 safe_t = np.maximum(t, 1e-30)
172 safe_d = np.maximum(diffusivity, 1e-30)
173 sqrt_t = np.sqrt(safe_t)
174 exp_term = np.exp(-(x**2) / (4 * safe_d * safe_t))
175 erf_term = special.erf(x / (2 * np.sqrt(safe_d) * sqrt_t))
177 term1 = -4 * sqrt_diffusivity * t ** (3 / 2) / (3 * sqrt_pi)
178 term2 = (2 * sqrt_diffusivity / sqrt_pi) * (
179 (2 * t ** (3 / 2) * exp_term / 3)
180 - (sqrt_t * x**2 * exp_term / (3 * safe_d))
181 - (sqrt_pi * x**3 * erf_term / (6 * safe_d ** (3 / 2)))
182 )
183 term3 = x * (
184 t * erf_term + (x**2 * erf_term / (2 * safe_d)) + (sqrt_t * x * exp_term / (sqrt_pi * np.sqrt(safe_d)))
185 )
186 term4 = -(x**3) / (6 * safe_d)
187 out = term1 + term2 + term3 + term4
189 out = np.where(isnan, np.nan, out)
190 out = np.where(t <= 0.0, 0.0, out)
191 out = np.where(diffusivity <= 0.0, 0.0, out)
192 return np.where(np.isinf(x) | np.isinf(t), np.inf, out)
195def _erf_integral_space(
196 x: NDArray[np.float64],
197 diffusivity: npt.ArrayLike,
198 t: NDArray[np.float64],
199 clip_to_inf: float = 6.0,
200) -> NDArray[np.float64]:
201 """Compute the integral of the error function at each (x[i], t[i], D[i]) point.
203 This function computes the integral of erf from 0 to x[i] at time t[i],
204 where x, t, and diffusivity are broadcastable arrays.
206 Parameters
207 ----------
208 x : ndarray
209 Input x values. Broadcastable with t and diffusivity.
210 diffusivity : float or ndarray
211 Diffusivity [m²/day]. Must be non-negative. Broadcastable with x and t.
212 t : ndarray
213 Time values [day]. Broadcastable with x and diffusivity. Must be non-negative.
214 clip_to_inf : float, optional
215 Clip ax values beyond this to avoid numerical issues. Default is 6.
217 Returns
218 -------
219 ndarray
220 Integral values at each (x, t, D) point. Shape is broadcast shape of inputs.
221 """
222 x = np.asarray(x)
223 t = np.asarray(t)
224 diffusivity = np.asarray(diffusivity)
226 # Broadcast all inputs to common shape
227 x, t, diffusivity = np.broadcast_arrays(x, t, diffusivity)
228 out = np.zeros_like(x, dtype=float)
230 # Compute a = 1/(2*sqrt(diffusivity*t)) from diffusivity and t
231 # Handle edge cases: diffusivity=0 or t=0 means a=inf (step function)
232 with np.errstate(divide="ignore", invalid="ignore"):
233 a = np.where((diffusivity == 0.0) | (t == 0.0), np.inf, 1.0 / (2.0 * np.sqrt(diffusivity * t)))
235 # Handle a=inf case: integral of sign(x) from 0 to x is |x|
236 mask_a_inf = np.isinf(a)
237 if np.any(mask_a_inf):
238 out[mask_a_inf] = np.abs(x[mask_a_inf])
240 # Handle a=0 case: erf(0) = 0, integral is 0
241 mask_a_zero = a == 0.0
242 # out[mask_a_zero] already 0
244 # Handle finite non-zero a
245 mask_valid = ~mask_a_inf & ~mask_a_zero
246 if np.any(mask_valid):
247 x_v = x[mask_valid]
248 a_v = a[mask_valid]
249 ax = a_v * x_v
251 # Initialize result for valid entries
252 result = np.zeros_like(x_v)
254 # Handle clipped regions
255 maskl = ax <= -clip_to_inf
256 masku = ax >= clip_to_inf
257 mask_mid = ~maskl & ~masku
259 result[maskl] = -x_v[maskl] - 1 / (a_v[maskl] * np.sqrt(np.pi))
260 result[masku] = x_v[masku] - 1 / (a_v[masku] * np.sqrt(np.pi))
261 result[mask_mid] = x_v[mask_mid] * special.erf(ax[mask_mid]) + (np.exp(-(ax[mask_mid] ** 2)) - 1) / (
262 a_v[mask_mid] * np.sqrt(np.pi)
263 )
265 out[mask_valid] = result
267 return out
270def _erf_mean_space_time(xedges, tedges, diffusivity, *, asymptotic_cutoff_sigma=None):
271 """
272 Compute the mean of the error function over paired space-time cells.
274 Computes the average value of erf(x/(2*sqrt(D*t))) over cells where
275 cell i spans [xedges[i], xedges[i+1]] x [tedges[i], tedges[i+1]].
277 The mean is computed using the inclusion-exclusion principle:
278 (F(x₁,t₁,D) - F(x₀,t₁,D) - F(x₁,t₀,D) + F(x₀,t₀,D)) / ((x₁-x₀)(t₁-t₀))
280 where F(x,t,D) = ∫₀ᵗ ∫₀ˣ erf(ξ/(2√(D·τ))) dξ dτ
282 Parameters
283 ----------
284 xedges : ndarray
285 Cell edges in space of size n.
286 tedges : ndarray
287 Cell edges in time of size n (same length as xedges).
288 diffusivity : float or ndarray
289 Diffusivity [m²/day]. Must be non-negative. Can be a scalar (same for
290 all cells) or an array of size n-1 (one value per cell).
291 asymptotic_cutoff_sigma : float, optional
292 Performance optimization. When set, cells where the erf argument
293 magnitude exceeds this threshold are assigned asymptotic values (+1 or -1)
294 instead of computing the expensive double integral. Since erf(3) ≈ 0.99998
295 and erf(4) ≈ 0.9999999846, values of 3-5 provide good accuracy with
296 significant speedup for transport problems where most cells are far
297 from the concentration front. Default is None (no cutoff, compute all).
299 Returns
300 -------
301 ndarray
302 Mean of the error function over each cell.
303 Returns 1D array of length n-1, or scalar if n=2.
305 Notes
306 -----
307 The asymptotic cutoff optimization works by checking the "weakest" erf
308 argument in each cell (minimum |x| with maximum t). If this argument
309 exceeds the cutoff threshold, the entire cell is in the asymptotic region
310 and no computation is needed. This is particularly effective for advection-
311 dispersion problems where most cells are far from the moving front.
312 """
313 xedges = np.asarray(xedges)
314 tedges = np.asarray(tedges)
315 diffusivity = np.atleast_1d(np.asarray(diffusivity))
317 if len(xedges) != len(tedges):
318 msg = "xedges and tedges must have the same length"
319 raise ValueError(msg)
321 n_cells = len(xedges) - 1
323 # Broadcast scalar diffusivity to all cells
324 if diffusivity.size == 1:
325 diffusivity = np.broadcast_to(diffusivity, (n_cells,))
327 if len(diffusivity) != n_cells:
328 msg = "diffusivity must be a scalar or have length n_cells (len(xedges) - 1)"
329 raise ValueError(msg)
331 out = np.full(n_cells, np.nan, dtype=float)
333 # Early exit for asymptotic cells (performance optimization)
334 # For cells far from the concentration front, erf ≈ ±1
335 # Assumes xedges and tedges are sorted (x0 <= x1, t0 <= t1)
336 if asymptotic_cutoff_sigma is not None:
337 x0, x1 = xedges[:-1], xedges[1:]
338 t1, d = tedges[1:], diffusivity
339 # Cell doesn't straddle zero if x0 >= 0 (all positive) or x1 <= 0 (all negative)
340 # min(|x|) = x0 if x0 >= 0, else -x1 if x1 <= 0
341 min_abs_x = np.where(x0 >= 0, x0, -x1)
342 # Asymptotic if min(|x|) / (2 * sqrt(D * t1)) >= cutoff, with valid t1 > 0 and d > 0
343 mask = ((x0 >= 0) | (x1 <= 0)) & (t1 > 0) & (d > 0)
344 mask[mask] &= min_abs_x[mask] / (2.0 * np.sqrt(d[mask] * t1[mask])) >= asymptotic_cutoff_sigma
345 out[mask] = np.sign(x0[mask] + x1[mask])
347 # Track which cells still need computation
348 mask_computed = ~np.isnan(out)
350 dx = xedges[1:] - xedges[:-1]
351 dt = tedges[1:] - tedges[:-1]
353 mask_dx_zero = (dx == 0.0) & ~mask_computed
354 mask_dt_zero = (dt == 0.0) & ~mask_computed
356 # Handle cells where both dx=0 AND dt=0 (point evaluation of erf)
357 mask_both_zero = mask_dx_zero & mask_dt_zero
358 if np.any(mask_both_zero):
359 x_pts = xedges[:-1][mask_both_zero]
360 t_pts = tedges[:-1][mask_both_zero]
361 d_pts = diffusivity[mask_both_zero]
362 with np.errstate(divide="ignore", invalid="ignore"):
363 a_pts = np.where(
364 (d_pts == 0.0) | (t_pts == 0.0),
365 np.inf,
366 1.0 / (2.0 * np.sqrt(d_pts * t_pts)),
367 )
368 mask_a_inf = np.isinf(a_pts)
369 result = np.zeros(np.sum(mask_both_zero))
370 if np.any(mask_a_inf):
371 result[mask_a_inf] = np.sign(x_pts[mask_a_inf])
372 if np.any(~mask_a_inf):
373 result[~mask_a_inf] = special.erf(a_pts[~mask_a_inf] * x_pts[~mask_a_inf])
374 out[mask_both_zero] = result
376 # Handle remaining dt=0 cells (mean over space at fixed time)
377 mask_dt_zero_only = mask_dt_zero & ~mask_dx_zero
378 if np.any(mask_dt_zero_only):
379 idx_dt_zero = np.where(mask_dt_zero_only)[0]
380 n_dt_zero = len(idx_dt_zero)
382 # Build edge arrays for all dt=0 cells
383 x_edges_flat = np.concatenate([xedges[idx_dt_zero], xedges[idx_dt_zero + 1]])
384 t_edges_flat = np.concatenate([tedges[idx_dt_zero], tedges[idx_dt_zero + 1]])
385 d_cells = diffusivity[idx_dt_zero]
387 # Replicate diffusivity for both edges of each cell
388 d_edges = np.concatenate([d_cells, d_cells])
390 # Compute integral at all edge points
391 erfint_flat = _erf_integral_space(x_edges_flat, diffusivity=d_edges, t=t_edges_flat)
392 erfint_lower = erfint_flat[:n_dt_zero]
393 erfint_upper = erfint_flat[n_dt_zero:]
395 dx_cells = xedges[idx_dt_zero + 1] - xedges[idx_dt_zero]
396 out[idx_dt_zero] = (erfint_upper - erfint_lower) / dx_cells
398 # Handle remaining dx=0 cells (mean over time at fixed x)
399 mask_dx_zero_only = mask_dx_zero & ~mask_dt_zero
400 if np.any(mask_dx_zero_only):
401 idx_dx_zero = np.where(mask_dx_zero_only)[0]
402 n_dx_zero = len(idx_dx_zero)
404 # Build edge arrays for all dx=0 cells
405 t_edges_flat = np.concatenate([tedges[idx_dx_zero], tedges[idx_dx_zero + 1]])
406 x_edges_flat = np.concatenate([xedges[idx_dx_zero], xedges[idx_dx_zero + 1]])
407 d_cells = diffusivity[idx_dx_zero]
409 # Replicate diffusivity for both edges of each cell
410 d_edges = np.concatenate([d_cells, d_cells])
412 # Compute integral at all edge points
413 erfint_flat = _erf_integral_time(t_edges_flat, x=x_edges_flat, diffusivity=d_edges)
414 erfint_lower = erfint_flat[:n_dx_zero]
415 erfint_upper = erfint_flat[n_dx_zero:]
417 dt_cells = tedges[idx_dx_zero + 1] - tedges[idx_dx_zero]
418 out[idx_dx_zero] = (erfint_upper - erfint_lower) / dt_cells
420 # Handle remaining cells with full double integral
421 mask_remainder = ~mask_dx_zero & ~mask_dt_zero & ~mask_computed
422 if np.any(mask_remainder):
423 # Check for zero diffusivity cells
424 d_remainder = diffusivity[mask_remainder]
425 mask_d_zero = d_remainder == 0.0
427 if np.any(mask_d_zero):
428 # For zero diffusivity, erf becomes sign function
429 idx_d_zero = np.where(mask_remainder)[0][mask_d_zero]
430 x_mid = (xedges[idx_d_zero] + xedges[idx_d_zero + 1]) / 2
431 out[idx_d_zero] = np.sign(x_mid)
433 # Handle non-zero diffusivity cells
434 mask_d_nonzero = ~mask_d_zero
435 if np.any(mask_d_nonzero):
436 idx_remainder = np.where(mask_remainder)[0][mask_d_nonzero]
437 d_nonzero = d_remainder[mask_d_nonzero]
439 # Build corner arrays: each cell has 4 corners, all using same diffusivity
440 x_corners = np.concatenate([
441 xedges[idx_remainder], # x0
442 xedges[idx_remainder + 1], # x1
443 xedges[idx_remainder], # x0
444 xedges[idx_remainder + 1], # x1
445 ])
446 t_corners = np.concatenate([
447 tedges[idx_remainder], # t0
448 tedges[idx_remainder + 1], # t1
449 tedges[idx_remainder + 1], # t1
450 tedges[idx_remainder], # t0
451 ])
452 # Replicate diffusivity for all 4 corners of each cell
453 d_corners = np.concatenate([d_nonzero, d_nonzero, d_nonzero, d_nonzero])
455 f = _erf_integral_space_time(x_corners, t_corners, d_corners)
456 n_rem = len(idx_remainder)
457 f_00 = f[:n_rem]
458 f_11 = f[n_rem : 2 * n_rem]
459 f_01 = f[2 * n_rem : 3 * n_rem]
460 f_10 = f[3 * n_rem :]
462 double_integrals = f_11 - f_10 - f_01 + f_00
463 cell_areas = dx[idx_remainder] * dt[idx_remainder]
464 out[idx_remainder] = double_integrals / cell_areas
466 # Handle infinite x edges
467 le, ue = xedges[:-1], xedges[1:]
468 out[np.isinf(le) & ~np.isinf(ue)] = -1.0
469 out[np.isneginf(le) & np.isneginf(ue)] = -1.0
470 out[~np.isinf(le) & np.isinf(ue)] = 1.0
471 out[np.isposinf(le) & np.isposinf(ue)] = 1.0
472 out[np.isneginf(le) & np.isposinf(ue)] = 0.0
473 out[np.isposinf(le) & np.isneginf(ue)] = 0.0
475 # Handle NaN in edges
476 out[np.isnan(xedges[:-1]) | np.isnan(xedges[1:])] = np.nan
477 out[np.isnan(tedges[:-1]) | np.isnan(tedges[1:])] = np.nan
479 if len(out) == 1:
480 return out[0]
481 return out
484def infiltration_to_extraction(
485 *,
486 cin: npt.ArrayLike,
487 flow: npt.ArrayLike,
488 tedges: pd.DatetimeIndex,
489 cout_tedges: pd.DatetimeIndex,
490 aquifer_pore_volumes: npt.ArrayLike,
491 streamline_length: npt.ArrayLike,
492 molecular_diffusivity: npt.ArrayLike,
493 longitudinal_dispersivity: npt.ArrayLike,
494 retardation_factor: float = 1.0,
495 suppress_dispersion_warning: bool = False,
496 asymptotic_cutoff_sigma: float | None = 3.0,
497) -> npt.NDArray[np.floating]:
498 """
499 Compute extracted concentration with advection and longitudinal dispersion.
501 This function models 1D solute transport through an aquifer system,
502 combining advective transport (based on residence times) with longitudinal
503 dispersion (diffusive spreading during transport).
505 The physical model assumes:
506 1. Water infiltrates with concentration cin at time t_in
507 2. Water travels distance L through aquifer with residence time tau = V_pore / Q
508 3. During transport, longitudinal dispersion causes spreading
509 4. At extraction, the concentration is a diffused breakthrough curve
511 The longitudinal dispersion coefficient D_L is computed internally as:
513 D_L = D_m + alpha_L * v
515 where v = L / tau_mean is the mean pore velocity computed from the mean
516 residence time for each (pore_volume, output_bin) combination. This
517 formulation correctly captures that:
518 - Molecular diffusion spreading scales with residence time: sqrt(D_m * tau)
519 - Mechanical dispersion spreading scales with travel distance: sqrt(alpha_L * L)
521 Parameters
522 ----------
523 cin : array-like
524 Concentration of the compound in infiltrating water [concentration units].
525 Length must match the number of time bins defined by tedges.
526 flow : array-like
527 Flow rate of water in the aquifer [m3/day].
528 Length must match cin and the number of time bins defined by tedges.
529 tedges : pandas.DatetimeIndex
530 Time edges defining bins for both cin and flow data. Has length of
531 len(cin) + 1.
532 cout_tedges : pandas.DatetimeIndex
533 Time edges for output data bins. Has length of desired output + 1.
534 The output concentration is averaged over each bin.
535 aquifer_pore_volumes : array-like
536 Array of aquifer pore volumes [m3] representing the distribution
537 of flow paths. Each pore volume determines the residence time for
538 that flow path: tau = V_pore / Q.
539 streamline_length : array-like
540 Array of travel distances [m] corresponding to each pore volume.
541 Must have the same length as aquifer_pore_volumes.
542 molecular_diffusivity : float or array-like
543 Molecular diffusion coefficient [m2/day]. Can be a scalar (same for all
544 pore volumes) or an array with the same length as aquifer_pore_volumes.
545 Must be non-negative. Represents Brownian motion of solute molecules,
546 independent of velocity.
547 longitudinal_dispersivity : float or array-like
548 Longitudinal dispersivity [m]. Can be a scalar (same for all pore
549 volumes) or an array with the same length as aquifer_pore_volumes.
550 Must be non-negative. Represents microdispersion (mechanical dispersion
551 from pore-scale velocity variations). Set to 0 for pure molecular diffusion.
552 retardation_factor : float, optional
553 Retardation factor of the compound in the aquifer (default 1.0).
554 Values > 1.0 indicate slower transport due to sorption.
555 suppress_dispersion_warning : bool, optional
556 If True, suppress the warning when using multiple pore volumes with
557 non-zero longitudinal_dispersivity. Default is False.
558 asymptotic_cutoff_sigma : float or None, optional
559 Performance optimization. Cells where the erf argument magnitude exceeds
560 this threshold are assigned asymptotic values (±1) instead of computing
561 the expensive integral. Since erf(3) ≈ 0.99998, the default of 3.0
562 provides excellent accuracy with significant speedup. Set to None to
563 disable the optimization. Default is 3.0.
565 Returns
566 -------
567 numpy.ndarray
568 Bin-averaged concentration in the extracted water. Same units as cin.
569 Length equals len(cout_tedges) - 1. NaN values indicate time periods
570 with no valid contributions from the infiltration data.
572 Raises
573 ------
574 ValueError
575 If input dimensions are inconsistent, if diffusivity is negative,
576 or if aquifer_pore_volumes and streamline_length have different lengths.
578 Warns
579 -----
580 UserWarning
581 If multiple pore volumes are used with non-zero longitudinal_dispersivity.
582 This may lead to double-counting of spreading effects. Suppress with
583 ``suppress_dispersion_warning=True`` if this is intentional.
585 See Also
586 --------
587 extraction_to_infiltration : Inverse operation (deconvolution)
588 gwtransport.advection.infiltration_to_extraction : Pure advection (no dispersion)
589 gwtransport.diffusion_fast.infiltration_to_extraction : Fast Gaussian approximation
590 :ref:`concept-dispersion-scales` : Macrodispersion vs microdispersion
592 Notes
593 -----
594 The algorithm constructs a coefficient matrix W where cout = W @ cin:
596 1. For each pore volume, compute the breakthrough curve contribution:
597 - delta_volume: volume between infiltration event and extraction point
598 - step_widths: convert volume to spatial distance x (erf coordinate)
599 - time_active: diffusion time, limited by residence time
601 2. For each infiltration time edge, compute the erf response at all
602 extraction time edges using analytical space-time averaging.
604 3. Convert erf response to breakthrough fraction: frac = 0.5 * (1 + erf)
606 4. Coefficient for bin: coeff[i,j] = frac_start - frac_end
607 This represents the fraction of cin[j] that arrives in cout[i].
609 5. Average coefficients across all pore volumes.
611 The error function solution assumes an initial step function that diffuses
612 over time. The position coordinate x represents the distance from the
613 concentration front to the observation point.
615 Examples
616 --------
617 Basic usage with constant flow:
619 >>> import pandas as pd
620 >>> import numpy as np
621 >>> from gwtransport.diffusion import infiltration_to_extraction
622 >>>
623 >>> # Create time edges
624 >>> tedges = pd.date_range(start="2020-01-01", end="2020-01-20", freq="D")
625 >>> cout_tedges = pd.date_range(start="2020-01-05", end="2020-01-25", freq="D")
626 >>>
627 >>> # Input concentration (step function) and constant flow
628 >>> cin = np.zeros(len(tedges) - 1)
629 >>> cin[5:10] = 1.0 # Pulse of concentration
630 >>> flow = np.ones(len(tedges) - 1) * 100.0 # 100 m3/day
631 >>>
632 >>> # Single pore volume of 500 m3, travel distance 100 m
633 >>> aquifer_pore_volumes = np.array([500.0])
634 >>> streamline_length = np.array([100.0])
635 >>>
636 >>> # Compute with dispersion (molecular diffusion + dispersivity)
637 >>> # Scalar values broadcast to all pore volumes
638 >>> cout = infiltration_to_extraction(
639 ... cin=cin,
640 ... flow=flow,
641 ... tedges=tedges,
642 ... cout_tedges=cout_tedges,
643 ... aquifer_pore_volumes=aquifer_pore_volumes,
644 ... streamline_length=streamline_length,
645 ... molecular_diffusivity=1e-4, # m2/day, same for all pore volumes
646 ... longitudinal_dispersivity=1.0, # m, same for all pore volumes
647 ... )
649 With multiple pore volumes (heterogeneous aquifer):
651 >>> # Distribution of pore volumes and corresponding travel distances
652 >>> aquifer_pore_volumes = np.array([400.0, 500.0, 600.0])
653 >>> streamline_length = np.array([80.0, 100.0, 120.0])
654 >>>
655 >>> # Scalar diffusion parameters broadcast to all pore volumes
656 >>> cout = infiltration_to_extraction(
657 ... cin=cin,
658 ... flow=flow,
659 ... tedges=tedges,
660 ... cout_tedges=cout_tedges,
661 ... aquifer_pore_volumes=aquifer_pore_volumes,
662 ... streamline_length=streamline_length,
663 ... molecular_diffusivity=1e-4, # m2/day
664 ... longitudinal_dispersivity=1.0, # m
665 ... )
666 """
667 # Convert to pandas DatetimeIndex if needed
668 cout_tedges = pd.DatetimeIndex(cout_tedges)
669 tedges = pd.DatetimeIndex(tedges)
671 # Convert to arrays
672 cin = np.asarray(cin, dtype=float)
673 flow = np.asarray(flow, dtype=float)
674 aquifer_pore_volumes = np.asarray(aquifer_pore_volumes, dtype=float)
675 streamline_length = np.asarray(streamline_length, dtype=float)
677 # Convert diffusion parameters to arrays and broadcast to pore volumes
678 n_pore_volumes = len(aquifer_pore_volumes)
679 molecular_diffusivity = np.atleast_1d(np.asarray(molecular_diffusivity, dtype=float))
680 longitudinal_dispersivity = np.atleast_1d(np.asarray(longitudinal_dispersivity, dtype=float))
682 # Broadcast scalar values to match pore volumes
683 if molecular_diffusivity.size == 1:
684 molecular_diffusivity = np.broadcast_to(molecular_diffusivity, (n_pore_volumes,)).copy()
685 if longitudinal_dispersivity.size == 1:
686 longitudinal_dispersivity = np.broadcast_to(longitudinal_dispersivity, (n_pore_volumes,)).copy()
688 # Input validation
689 if len(tedges) != len(cin) + 1:
690 msg = "tedges must have one more element than cin"
691 raise ValueError(msg)
692 if len(tedges) != len(flow) + 1:
693 msg = "tedges must have one more element than flow"
694 raise ValueError(msg)
695 if len(aquifer_pore_volumes) != len(streamline_length):
696 msg = "aquifer_pore_volumes and streamline_length must have the same length"
697 raise ValueError(msg)
698 if len(molecular_diffusivity) != n_pore_volumes:
699 msg = "molecular_diffusivity must be a scalar or have same length as aquifer_pore_volumes"
700 raise ValueError(msg)
701 if len(longitudinal_dispersivity) != n_pore_volumes:
702 msg = "longitudinal_dispersivity must be a scalar or have same length as aquifer_pore_volumes"
703 raise ValueError(msg)
704 if np.any(molecular_diffusivity < 0):
705 msg = "molecular_diffusivity must be non-negative"
706 raise ValueError(msg)
707 if np.any(longitudinal_dispersivity < 0):
708 msg = "longitudinal_dispersivity must be non-negative"
709 raise ValueError(msg)
710 if np.any(np.isnan(cin)):
711 msg = "cin contains NaN values, which are not allowed"
712 raise ValueError(msg)
713 if np.any(np.isnan(flow)):
714 msg = "flow contains NaN values, which are not allowed"
715 raise ValueError(msg)
716 if np.any(aquifer_pore_volumes <= 0):
717 msg = "aquifer_pore_volumes must be positive"
718 raise ValueError(msg)
719 if np.any(streamline_length <= 0):
720 msg = "streamline_length must be positive"
721 raise ValueError(msg)
723 # Check for conflicting approaches: multiple pore volumes with longitudinal dispersivity
724 # Both represent the same physical phenomenon (velocity heterogeneity) at different scales.
725 # See concept-dispersion-scales in the documentation for details.
726 if n_pore_volumes > 1 and np.any(longitudinal_dispersivity > 0) and not suppress_dispersion_warning:
727 msg = (
728 "Using multiple aquifer_pore_volumes with non-zero longitudinal_dispersivity. "
729 "Both represent spreading from velocity heterogeneity at different scales.\n\n"
730 "This is appropriate when:\n"
731 " - APVD comes from streamline analysis (explicit geometry)\n"
732 " - You want to add unresolved microdispersion and molecular diffusion\n\n"
733 "This may double-count spreading when:\n"
734 " - APVD was calibrated from measurements (microdispersion already included)\n\n"
735 "For gamma-parameterized APVD, consider using the 'equivalent APVD std' approach\n"
736 "from 05_Diffusion_Dispersion.ipynb to combine both effects in the faster advection\n"
737 "module. Note: this variance combination is only valid for continuous (gamma)\n"
738 "distributions, not for discrete streamline volumes.\n"
739 "Suppress this warning with suppress_dispersion_warning=True."
740 )
741 warnings.warn(msg, UserWarning, stacklevel=2)
743 # Extend tedges for spin up
744 tedges = pd.DatetimeIndex([
745 tedges[0] - pd.Timedelta("36500D"),
746 *list(tedges[1:-1]),
747 tedges[-1] + pd.Timedelta("36500D"),
748 ])
750 # Compute the cumulative flow at tedges
751 infiltration_volume = flow * (np.diff(tedges) / pd.Timedelta("1D")) # m3
752 cumulative_volume_at_cin_tedges = np.concatenate(([0], np.cumsum(infiltration_volume)))
754 # Compute the cumulative flow at cout_tedges
755 cumulative_volume_at_cout_tedges = np.interp(cout_tedges, tedges, cumulative_volume_at_cin_tedges)
757 rt_edges_2d = residence_time(
758 flow=flow,
759 flow_tedges=tedges,
760 index=tedges,
761 aquifer_pore_volumes=aquifer_pore_volumes,
762 retardation_factor=retardation_factor,
763 direction="infiltration_to_extraction",
764 )
766 # Compute residence time at cout_tedges to identify valid output bins
767 # RT is NaN for cout_tedges beyond the input data range
768 rt_at_cout_tedges = residence_time(
769 flow=flow,
770 flow_tedges=tedges,
771 index=cout_tedges,
772 aquifer_pore_volumes=aquifer_pore_volumes,
773 retardation_factor=retardation_factor,
774 direction="extraction_to_infiltration",
775 )
776 # Output bin i is valid if both cout_tedges[i] and cout_tedges[i+1] have valid RT for all pore volumes
777 # Check if any pore volume has NaN RT at the bin edges
778 valid_cout_bins = ~np.any(np.isnan(rt_at_cout_tedges[:, :-1]) | np.isnan(rt_at_cout_tedges[:, 1:]), axis=0)
780 # Compute mean residence time per (pore_volume, cout_bin) for velocity calculation
781 # Shape: (n_pore_volumes, n_cout_bins)
782 rt_mean_2d = residence_time_mean(
783 flow=flow,
784 flow_tedges=tedges,
785 tedges_out=cout_tedges,
786 aquifer_pore_volumes=aquifer_pore_volumes,
787 direction="extraction_to_infiltration",
788 retardation_factor=retardation_factor,
789 )
791 # Compute velocity: v = L / tau_mean
792 # Shape: (n_pore_volumes, n_cout_bins)
793 # Use explicit validity handling instead of error suppression
794 valid_rt = np.isfinite(rt_mean_2d) & (rt_mean_2d > 0)
795 velocity_2d = np.where(valid_rt, streamline_length[:, None] / rt_mean_2d, 0.0)
797 # Compute D_L = D_m + alpha_L * v
798 # Shape: (n_pore_volumes, n_cout_bins)
799 # Fall back to molecular diffusivity only where RT is invalid (spinup period)
800 diffusivity_2d = np.where(
801 valid_rt,
802 molecular_diffusivity[:, None] + longitudinal_dispersivity[:, None] * velocity_2d,
803 molecular_diffusivity[:, None],
804 )
806 # Initialize coefficient matrix accumulator
807 # Shape: (n_cout_bins, n_cin_bins) - each row represents contributions to one output bin
808 n_cout_bins = len(cout_tedges) - 1
809 n_cin_bins = len(cin)
810 n_cin_edges = len(tedges)
811 accumulated_coeff = np.zeros((n_cout_bins, n_cin_bins))
813 # Determine when infiltration has occurred: cout_tedge must be >= tedge (infiltration time)
814 isactive = cout_tedges.to_numpy()[:, None] >= tedges.to_numpy()[None, :]
816 # Loop over each pore volume
817 for i_pv in range(len(aquifer_pore_volumes)):
818 # Compute volume relationship for the erf x-coordinate:
819 # delta_volume = volume extracted at cout_tedge - volume at infiltration - R*APV
820 # This represents how far past the pore volume the concentration front has traveled
821 delta_volume = (
822 cumulative_volume_at_cout_tedges[:, None]
823 - cumulative_volume_at_cin_tedges[None, :]
824 - (retardation_factor * aquifer_pore_volumes[i_pv])
825 )
826 delta_volume[~isactive] = np.nan
828 # Convert volume to spatial distance x (erf coordinate)
829 # x = delta_volume / (R * APV) * L
830 step_widths = delta_volume / (retardation_factor * aquifer_pore_volumes[i_pv]) * streamline_length[i_pv]
832 # Compute diffusion time: time since infiltration, limited by residence time
833 time_active = (cout_tedges.to_numpy()[:, None] - tedges.to_numpy()[None, :]) / pd.to_timedelta(1, unit="D")
834 time_active[~isactive] = np.nan
835 time_active = np.minimum(time_active, rt_edges_2d[[i_pv]])
837 # Compute erf response: response[i_cout_bin, j_cin_edge] = mean erf for cin edge j at cout bin i
838 response = np.zeros((n_cout_bins, n_cin_edges))
840 # Get diffusivity for this pore volume across all cout bins
841 diff_pv = diffusivity_2d[i_pv, :] # shape (n_cout_bins,)
843 for j in range(n_cin_edges):
844 # For each infiltration edge, compute erf response at all extraction bins
845 xedges_j = step_widths[:, j] # shape (n_cout_edges,)
846 tedges_j = time_active[:, j] # shape (n_cout_edges,)
847 response[:, j] = _erf_mean_space_time(
848 xedges_j, tedges_j, diff_pv, asymptotic_cutoff_sigma=asymptotic_cutoff_sigma
849 )
851 # Convert erf response [-1, 1] to breakthrough fraction [0, 1]
852 frac = 0.5 * (1 + response) # shape (n_cout_bins, n_cin_edges)
854 # Coefficient for bin j: coeff[i, j] = frac[i, j] - frac[i, j+1]
855 # This is the fraction of cin[j] that arrives in cout[i]
856 frac_start = frac[:, :-1]
857 frac_end = frac[:, 1:]
859 # Handle NaN: if frac_end is NaN but frac_start is valid, use 0.0 for frac_end
860 # (the concentration step hasn't fully passed through yet)
861 frac_end_filled = np.where(np.isnan(frac_end) & ~np.isnan(frac_start), 0.0, frac_end)
862 coeff = frac_start - frac_end_filled # shape (n_cout_bins, n_cin_bins)
864 accumulated_coeff += coeff
866 # Average across pore volumes
867 coeff_matrix = accumulated_coeff / len(aquifer_pore_volumes)
869 # Replace NaN with 0 for matrix multiplication
870 # NaN indicates infiltration hasn't occurred yet for that (cout, cin) pair
871 coeff_matrix_filled = np.nan_to_num(coeff_matrix, nan=0.0)
873 # Matrix multiply: cout = W @ cin
874 cout = coeff_matrix_filled @ cin
876 # Mark output bins as invalid where no valid contributions exist:
877 # 1. Sum of coefficients near zero (no cin has broken through yet - spinup)
878 # 2. Output bin extends beyond input data range (from valid_cout_bins)
879 total_coeff = np.sum(coeff_matrix_filled, axis=1)
880 no_valid_contribution = (total_coeff < EPSILON_COEFF_SUM) | ~valid_cout_bins
881 cout[no_valid_contribution] = np.nan
883 return cout
886def extraction_to_infiltration(
887 *,
888 cout: npt.ArrayLike,
889 flow: npt.ArrayLike,
890 tedges: pd.DatetimeIndex,
891 cin_tedges: pd.DatetimeIndex,
892 aquifer_pore_volumes: npt.ArrayLike,
893 streamline_length: npt.ArrayLike,
894 molecular_diffusivity: npt.ArrayLike,
895 longitudinal_dispersivity: npt.ArrayLike,
896 retardation_factor: float = 1.0,
897 suppress_dispersion_warning: bool = False,
898 asymptotic_cutoff_sigma: float | None = 3.0,
899) -> npt.NDArray[np.floating]:
900 """
901 Compute infiltration concentration from extracted water (deconvolution with dispersion).
903 This function implements the inverse of infiltration_to_extraction, reconstructing
904 the original infiltration concentration from the extracted water concentration.
905 It explicitly constructs the weights matrix (rather than inverting the forward matrix)
906 to ensure numerical stability and proper handling of dispersion effects.
908 The physical model assumes:
909 1. Water is extracted with concentration cout at time t_out
910 2. Water traveled distance L through aquifer with residence time tau = V_pore / Q
911 3. During transport, longitudinal dispersion caused spreading
912 4. At infiltration, the concentration is reconstructed from the diffused signal
914 The longitudinal dispersion coefficient D_L is computed internally as:
916 D_L = D_m + alpha_L * v
918 where v = L / tau_mean is the mean pore velocity computed from the mean
919 residence time for each (pore_volume, cin_bin) combination.
921 Parameters
922 ----------
923 cout : array-like
924 Concentration of the compound in extracted water [concentration units].
925 Length must match the number of time bins defined by tedges.
926 flow : array-like
927 Flow rate of water in the aquifer [m3/day].
928 Length must match cout and the number of time bins defined by tedges.
929 tedges : pandas.DatetimeIndex
930 Time edges defining bins for both cout and flow data (extraction times).
931 Has length of len(cout) + 1.
932 cin_tedges : pandas.DatetimeIndex
933 Time edges for output infiltration data bins. Has length of desired output + 1.
934 The output concentration is averaged over each bin.
935 aquifer_pore_volumes : array-like
936 Array of aquifer pore volumes [m3] representing the distribution
937 of flow paths. Each pore volume determines the residence time for
938 that flow path: tau = V_pore / Q.
939 streamline_length : array-like
940 Array of travel distances [m] corresponding to each pore volume.
941 Must have the same length as aquifer_pore_volumes.
942 molecular_diffusivity : float or array-like
943 Molecular diffusion coefficient [m2/day]. Can be a scalar (same for all
944 pore volumes) or an array with the same length as aquifer_pore_volumes.
945 Must be non-negative.
946 longitudinal_dispersivity : float or array-like
947 Longitudinal dispersivity [m]. Can be a scalar (same for all pore
948 volumes) or an array with the same length as aquifer_pore_volumes.
949 Must be non-negative.
950 retardation_factor : float, optional
951 Retardation factor of the compound in the aquifer (default 1.0).
952 Values > 1.0 indicate slower transport due to sorption.
953 suppress_dispersion_warning : bool, optional
954 If True, suppress the warning when using multiple pore volumes with
955 non-zero longitudinal_dispersivity. Default is False.
956 asymptotic_cutoff_sigma : float or None, optional
957 Performance optimization. Cells where the erf argument magnitude exceeds
958 this threshold are assigned asymptotic values (±1) instead of computing
959 the expensive integral. Since erf(3) ≈ 0.99998, the default of 3.0
960 provides excellent accuracy with significant speedup. Set to None to
961 disable the optimization. Default is 3.0.
963 Returns
964 -------
965 numpy.ndarray
966 Bin-averaged concentration in the infiltrating water. Same units as cout.
967 Length equals len(cin_tedges) - 1. NaN values indicate time periods
968 with no valid contributions from the extraction data.
970 Raises
971 ------
972 ValueError
973 If input dimensions are inconsistent, if diffusivity is negative,
974 or if aquifer_pore_volumes and streamline_length have different lengths.
976 Warns
977 -----
978 UserWarning
979 If multiple pore volumes are used with non-zero longitudinal_dispersivity.
981 See Also
982 --------
983 infiltration_to_extraction : Forward operation (convolution)
984 gwtransport.advection.extraction_to_infiltration : Pure advection (no dispersion)
985 :ref:`concept-dispersion-scales` : Macrodispersion vs microdispersion
987 Notes
988 -----
989 The algorithm constructs a coefficient matrix W where cin = W @ cout:
991 1. For each pore volume, compute the deconvolution contribution:
992 - extraction_times: when water infiltrated at cin_tedges will be extracted
993 - delta_volume: volume between infiltration event and extraction point
994 - step_widths: convert volume to spatial distance x (erf coordinate)
995 - time_active: diffusion time, limited by residence time
997 2. For each extraction time edge, compute the erf response at all
998 infiltration time edges using analytical space-time averaging.
1000 3. Convert erf response to breakthrough fraction: frac = 0.5 * (1 + erf)
1002 4. Coefficient for bin: coeff[i,j] = frac_start - frac_end
1003 This represents the fraction of cout[j] that originated from cin[i].
1005 5. Average coefficients across all pore volumes, using nan_to_num to
1006 prevent NaN propagation when different pore volumes have different
1007 valid time ranges.
1009 The error function solution assumes an initial step function that diffuses
1010 over time. The position coordinate x represents the distance from the
1011 concentration front to the observation point.
1013 Examples
1014 --------
1015 Basic usage with constant flow:
1017 >>> import pandas as pd
1018 >>> import numpy as np
1019 >>> from gwtransport.diffusion import extraction_to_infiltration
1020 >>>
1021 >>> # Create time edges for extraction data
1022 >>> tedges = pd.date_range(start="2020-01-05", end="2020-01-25", freq="D")
1023 >>> cin_tedges = pd.date_range(start="2020-01-01", end="2020-01-20", freq="D")
1024 >>>
1025 >>> # Extracted concentration and constant flow
1026 >>> cout = np.zeros(len(tedges) - 1)
1027 >>> cout[5:10] = 1.0 # Observed pulse at extraction
1028 >>> flow = np.ones(len(tedges) - 1) * 100.0 # 100 m3/day
1029 >>>
1030 >>> # Single pore volume of 500 m3, travel distance 100 m
1031 >>> aquifer_pore_volumes = np.array([500.0])
1032 >>> streamline_length = np.array([100.0])
1033 >>>
1034 >>> # Reconstruct infiltration concentration
1035 >>> cin = extraction_to_infiltration(
1036 ... cout=cout,
1037 ... flow=flow,
1038 ... tedges=tedges,
1039 ... cin_tedges=cin_tedges,
1040 ... aquifer_pore_volumes=aquifer_pore_volumes,
1041 ... streamline_length=streamline_length,
1042 ... molecular_diffusivity=1e-4,
1043 ... longitudinal_dispersivity=1.0,
1044 ... )
1045 """
1046 # Convert to pandas DatetimeIndex if needed
1047 cin_tedges = pd.DatetimeIndex(cin_tedges)
1048 tedges = pd.DatetimeIndex(tedges)
1050 # Convert to arrays
1051 cout = np.asarray(cout, dtype=float)
1052 flow = np.asarray(flow, dtype=float)
1053 aquifer_pore_volumes = np.asarray(aquifer_pore_volumes, dtype=float)
1054 streamline_length = np.asarray(streamline_length, dtype=float)
1056 # Convert diffusion parameters to arrays and broadcast to pore volumes
1057 n_pore_volumes = len(aquifer_pore_volumes)
1058 molecular_diffusivity = np.atleast_1d(np.asarray(molecular_diffusivity, dtype=float))
1059 longitudinal_dispersivity = np.atleast_1d(np.asarray(longitudinal_dispersivity, dtype=float))
1061 # Broadcast scalar values to match pore volumes
1062 if molecular_diffusivity.size == 1:
1063 molecular_diffusivity = np.broadcast_to(molecular_diffusivity, (n_pore_volumes,)).copy()
1064 if longitudinal_dispersivity.size == 1:
1065 longitudinal_dispersivity = np.broadcast_to(longitudinal_dispersivity, (n_pore_volumes,)).copy()
1067 # Input validation
1068 if len(tedges) != len(cout) + 1:
1069 msg = "tedges must have one more element than cout"
1070 raise ValueError(msg)
1071 if len(tedges) != len(flow) + 1:
1072 msg = "tedges must have one more element than flow"
1073 raise ValueError(msg)
1074 if len(aquifer_pore_volumes) != len(streamline_length):
1075 msg = "aquifer_pore_volumes and streamline_length must have the same length"
1076 raise ValueError(msg)
1077 if len(molecular_diffusivity) != n_pore_volumes:
1078 msg = "molecular_diffusivity must be a scalar or have same length as aquifer_pore_volumes"
1079 raise ValueError(msg)
1080 if len(longitudinal_dispersivity) != n_pore_volumes:
1081 msg = "longitudinal_dispersivity must be a scalar or have same length as aquifer_pore_volumes"
1082 raise ValueError(msg)
1083 if np.any(molecular_diffusivity < 0):
1084 msg = "molecular_diffusivity must be non-negative"
1085 raise ValueError(msg)
1086 if np.any(longitudinal_dispersivity < 0):
1087 msg = "longitudinal_dispersivity must be non-negative"
1088 raise ValueError(msg)
1089 if np.any(np.isnan(cout)):
1090 msg = "cout contains NaN values, which are not allowed"
1091 raise ValueError(msg)
1092 if np.any(np.isnan(flow)):
1093 msg = "flow contains NaN values, which are not allowed"
1094 raise ValueError(msg)
1095 if np.any(aquifer_pore_volumes <= 0):
1096 msg = "aquifer_pore_volumes must be positive"
1097 raise ValueError(msg)
1098 if np.any(streamline_length <= 0):
1099 msg = "streamline_length must be positive"
1100 raise ValueError(msg)
1102 # Check for conflicting approaches: multiple pore volumes with longitudinal dispersivity
1103 # Both represent the same physical phenomenon (velocity heterogeneity) at different scales.
1104 # See concept-dispersion-scales in the documentation for details.
1105 if n_pore_volumes > 1 and np.any(longitudinal_dispersivity > 0) and not suppress_dispersion_warning:
1106 msg = (
1107 "Using multiple aquifer_pore_volumes with non-zero longitudinal_dispersivity. "
1108 "Both represent spreading from velocity heterogeneity at different scales.\n\n"
1109 "This is appropriate when:\n"
1110 " - APVD comes from streamline analysis (explicit geometry)\n"
1111 " - You want to add unresolved microdispersion and molecular diffusion\n\n"
1112 "This may double-count spreading when:\n"
1113 " - APVD was calibrated from measurements (microdispersion already included)\n\n"
1114 "For gamma-parameterized APVD, consider using the 'equivalent APVD std' approach\n"
1115 "from 05_Diffusion_Dispersion.ipynb to combine both effects in the faster advection\n"
1116 "module. Note: this variance combination is only valid for continuous (gamma)\n"
1117 "distributions, not for discrete streamline volumes.\n"
1118 "Suppress this warning with suppress_dispersion_warning=True."
1119 )
1120 warnings.warn(msg, UserWarning, stacklevel=2)
1122 # Extend tedges for spin up (same as forward function)
1123 tedges = pd.DatetimeIndex([
1124 tedges[0] - pd.Timedelta("36500D"),
1125 *list(tedges[1:-1]),
1126 tedges[-1] + pd.Timedelta("36500D"),
1127 ])
1129 # Compute the cumulative flow at tedges (extraction times)
1130 extraction_volume = flow * (np.diff(tedges) / pd.Timedelta("1D")) # m3
1131 cumulative_volume_at_cout_tedges = np.concatenate(([0], np.cumsum(extraction_volume)))
1133 # Compute the cumulative flow at cin_tedges (interpolated)
1134 cumulative_volume_at_cin_tedges = np.interp(cin_tedges, tedges, cumulative_volume_at_cout_tedges)
1136 # Compute residence times at cin_tedges using forward direction
1137 # (from infiltration perspective: when will water infiltrated at cin_tedges be extracted?)
1138 rt_edges_2d = residence_time(
1139 flow=flow,
1140 flow_tedges=tedges,
1141 index=cin_tedges,
1142 aquifer_pore_volumes=aquifer_pore_volumes,
1143 retardation_factor=retardation_factor,
1144 direction="infiltration_to_extraction",
1145 )
1147 # Determine valid cin bins immediately (reuse rt_edges_2d, no duplicate computation)
1148 # A cin bin is valid if both its edges have finite residence times across all pore volumes
1149 valid_cin_bins = ~np.any(np.isnan(rt_edges_2d[:, :-1]) | np.isnan(rt_edges_2d[:, 1:]), axis=0)
1151 # Compute mean residence time per (pore_volume, cin_bin) for velocity calculation
1152 # Shape: (n_pore_volumes, n_cin_bins)
1153 rt_mean_2d = residence_time_mean(
1154 flow=flow,
1155 flow_tedges=tedges,
1156 tedges_out=cin_tedges,
1157 aquifer_pore_volumes=aquifer_pore_volumes,
1158 direction="infiltration_to_extraction",
1159 retardation_factor=retardation_factor,
1160 )
1162 # Compute velocity: v = L / tau_mean
1163 # Shape: (n_pore_volumes, n_cin_bins)
1164 # Use explicit validity handling instead of error suppression
1165 valid_rt = np.isfinite(rt_mean_2d) & (rt_mean_2d > 0)
1166 velocity_2d = np.where(valid_rt, streamline_length[:, None] / rt_mean_2d, 0.0)
1168 # Compute D_L = D_m + alpha_L * v
1169 # Shape: (n_pore_volumes, n_cin_bins)
1170 # Fall back to molecular diffusivity only where RT is invalid (spinup period)
1171 diffusivity_2d = np.where(
1172 valid_rt,
1173 molecular_diffusivity[:, None] + longitudinal_dispersivity[:, None] * velocity_2d,
1174 molecular_diffusivity[:, None],
1175 )
1177 # Initialize coefficient matrix accumulator
1178 # Shape: (n_cin_bins, n_cout_bins) - each row represents contributions to one output bin
1179 n_cin_bins = len(cin_tedges) - 1
1180 n_cout_bins = len(cout)
1181 n_cout_edges = len(tedges)
1182 accumulated_coeff = np.zeros((n_cin_bins, n_cout_bins))
1184 # Loop over each pore volume
1185 for i_pv in range(len(aquifer_pore_volumes)):
1186 # Compute extraction times: when water infiltrated at cin_tedges will be extracted
1187 # t_extraction = t_infiltration + residence_time
1188 extraction_times = cin_tedges + pd.to_timedelta(rt_edges_2d[i_pv, :], unit="D")
1190 # Convert to days relative to tedges[0] for computation
1191 cout_tedges_days = ((tedges - tedges[0]) / pd.Timedelta("1D")).values
1192 extraction_times_days = ((extraction_times - tedges[0]) / pd.Timedelta("1D")).values
1194 # Compute volume relationship for the erf x-coordinate:
1195 # delta_volume = volume at infiltration + R*APV - volume extracted at cout_tedge
1196 # This represents how far past the pore volume the concentration front has traveled
1197 delta_volume = (
1198 cumulative_volume_at_cin_tedges[:, None]
1199 + (retardation_factor * aquifer_pore_volumes[i_pv])
1200 - cumulative_volume_at_cout_tedges[None, :]
1201 )
1203 # Determine when extraction has occurred: extraction_time must be >= cout_tedge
1204 isactive = extraction_times_days[:, None] >= cout_tedges_days[None, :]
1205 delta_volume[~isactive] = np.nan
1207 # Convert volume to spatial distance x (erf coordinate)
1208 # x = delta_volume / (R * APV) * L
1209 step_widths = delta_volume / (retardation_factor * aquifer_pore_volumes[i_pv]) * streamline_length[i_pv]
1211 # Compute diffusion time: time from extraction until cout_tedge, limited by residence time
1212 time_active = extraction_times_days[:, None] - cout_tedges_days[None, :]
1213 time_active[~isactive] = np.nan
1214 time_active = np.maximum(time_active, 0) # No negative times
1215 time_active = np.minimum(time_active, rt_edges_2d[[i_pv]].T) # Limited by residence time
1217 # Compute erf response: response[i_cin_bin, j_cout_edge] = mean erf for cin bin i at cout edge j
1218 response = np.zeros((n_cin_bins, n_cout_edges))
1220 # Get diffusivity for this pore volume across all cin bins
1221 diff_pv = diffusivity_2d[i_pv, :] # shape (n_cin_bins,)
1223 for j in range(n_cout_edges):
1224 # For each extraction edge, compute erf response at all infiltration bins
1225 xedges_j = step_widths[:, j] # shape (n_cin_edges,)
1226 tedges_j = time_active[:, j] # shape (n_cin_edges,)
1227 response[:, j] = _erf_mean_space_time(
1228 xedges_j, tedges_j, diff_pv, asymptotic_cutoff_sigma=asymptotic_cutoff_sigma
1229 )
1231 # Convert erf response [-1, 1] to breakthrough fraction [0, 1]
1232 frac = 0.5 * (1 + response) # shape (n_cin_bins, n_cout_edges)
1234 # Coefficient for bin j: coeff[i, j] = frac[i, j] - frac[i, j+1]
1235 # This is the fraction of cout[j] that originated from cin[i]
1236 frac_start = frac[:, :-1]
1237 frac_end = frac[:, 1:]
1239 # Handle NaN: if frac_end is NaN but frac_start is valid, use 0.0 for frac_end
1240 # (the concentration step hasn't fully passed through yet)
1241 frac_end_filled = np.where(np.isnan(frac_end) & ~np.isnan(frac_start), 0.0, frac_end)
1242 coeff = frac_start - frac_end_filled # shape (n_cin_bins, n_cout_bins)
1244 # Replace NaN with 0 BEFORE accumulating to avoid NaN propagation
1245 # (NaN + valid = NaN would lose valid contributions from other pore volumes)
1246 accumulated_coeff += np.nan_to_num(coeff, nan=0.0)
1248 # Average across pore volumes
1249 coeff_matrix = accumulated_coeff / len(aquifer_pore_volumes)
1251 # coeff_matrix is already NaN-free due to nan_to_num in the loop
1252 coeff_matrix_filled = coeff_matrix
1254 # Matrix multiply: cin = W @ cout
1255 cin = coeff_matrix_filled @ cout
1257 # Mark output bins as invalid where no valid contributions exist:
1258 # 1. Sum of coefficients near zero (no cout data covers this time - spinup)
1259 # 2. Output bin extends beyond input data range (from valid_cin_bins)
1260 total_coeff = np.sum(coeff_matrix_filled, axis=1)
1261 no_valid_contribution = (total_coeff < EPSILON_COEFF_SUM) | ~valid_cin_bins
1262 cin[no_valid_contribution] = np.nan
1264 return cin