Coverage for src / gwtransport / diffusion_fast.py: 84%
91 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-04 18:51 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-04 18:51 +0000
1"""
2Fast Diffusive Transport Corrections via Gaussian Smoothing.
4This module provides a computationally efficient approximation of diffusion/dispersion
5using Gaussian smoothing. It is much faster than :mod:`gwtransport.diffusion` but
6less physically accurate, especially under variable flow conditions.
8Both ``diffusion_fast`` and :mod:`gwtransport.diffusion` add microdispersion and
9molecular diffusion on top of macrodispersion captured by the APVD.
11**When to use diffusion_fast vs diffusion:**
13- Use ``diffusion_fast`` when: Speed is critical, flow and time steps are relatively
14 constant, or you need real-time processing
15- Use ``diffusion`` when: Physical accuracy is critical, flow varies significantly,
16 or you're analyzing periods with changing conditions
18See :ref:`concept-dispersion` for background on macrodispersion and microdispersion.
20This module implements diffusion/dispersion processes that modify advective transport
21in aquifer systems. Diffusion causes spreading and smoothing of concentration or
22temperature fronts as they travel through the aquifer. While advection moves compounds
23with water flow, diffusion causes spreading due to molecular diffusion, mechanical
24dispersion, and thermal diffusion (for temperature).
26Limitation: This fast approximation works best when flow and tedges are relatively
27constant. The underlying assumption is that dx (spatial step between cells) remains
28approximately constant, which holds for steady flow but breaks down under highly
29variable conditions. For scenarios with significant flow variability, consider using
30:mod:`gwtransport.diffusion` instead.
32Available functions:
34- :func:`infiltration_to_extraction` - Apply diffusion during infiltration to extraction
35 transport. Combines advection (via residence time) with diffusion (via Gaussian smoothing).
36 Computes position-dependent diffusion based on local residence time and returns concentration
37 or temperature in extracted water.
39- :func:`extraction_to_infiltration` - NOT YET IMPLEMENTED. Inverse diffusion is numerically
40 unstable and requires regularization techniques. Placeholder for future implementation.
42- :func:`compute_scaled_sigma_array` - Calculate position-dependent diffusion parameters. Computes
43 standard deviation (sigma) for Gaussian smoothing at each time step based on residence time,
44 diffusivity, and spatial discretization: sigma = sqrt(2 * diffusivity * residence_time) / dx.
46- :func:`convolve_diffusion` - Apply variable-sigma Gaussian filtering. Extends
47 scipy.ndimage.gaussian_filter1d to position-dependent sigma using sparse matrix representation
48 for efficiency. Handles boundary conditions via nearest-neighbor extrapolation.
50- :func:`deconvolve_diffusion` - NOT YET IMPLEMENTED. Inverse filtering placeholder for future
51 diffusion deconvolution with required regularization for stability.
53- :func:`create_example_data` - Generate test data for demonstrating diffusion effects with
54 signals having varying time steps and corresponding sigma arrays. Useful for testing and
55 validation.
57This file is part of gwtransport which is released under AGPL-3.0 license.
58See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details.
59"""
61import matplotlib.pyplot as plt
62import numpy as np
63import numpy.typing as npt
64import pandas as pd
65from scipy import ndimage, sparse
67from gwtransport.residence_time import residence_time
70def infiltration_to_extraction(
71 *,
72 cin: npt.ArrayLike,
73 flow: npt.ArrayLike,
74 tedges: pd.DatetimeIndex,
75 aquifer_pore_volume: float,
76 diffusivity: float = 0.1,
77 retardation_factor: float = 1.0,
78 aquifer_length: float = 80.0,
79) -> npt.NDArray[np.floating]:
80 """Compute the diffusion of a compound during 1D transport in the aquifer.
82 This function represents infiltration to extraction modeling (equivalent to convolution).
83 It provides a fast approximation using Gaussian smoothing. The approximation is accurate
84 when flow and tedges are relatively constant. Under variable flow conditions, errors
85 increase but mass balance is preserved.
87 For physically rigorous solutions that handle variable flow correctly, use
88 :func:`gwtransport.diffusion.infiltration_to_extraction` instead. That function is
89 slower but provides analytical solutions to the advection-dispersion equation.
91 Parameters
92 ----------
93 cin : array-like
94 Concentration or temperature of the compound in the infiltrating water [ng/m3].
95 flow : array-like
96 Flow rate of water in the aquifer [m3/day].
97 tedges : pandas.DatetimeIndex
98 Time edges corresponding to the flow values.
99 aquifer_pore_volume : float
100 Pore volume of the aquifer [m3].
101 diffusivity : float, optional
102 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1.
103 retardation_factor : float, optional
104 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0.
105 aquifer_length : float, optional
106 Length of the aquifer [m]. Default is 80.0.
107 porosity : float, optional
108 Porosity of the aquifer [dimensionless]. Default is 0.35.
110 Returns
111 -------
112 numpy.ndarray
113 Concentration of the compound in the extracted water [ng/m3].
115 See Also
116 --------
117 gwtransport.diffusion.infiltration_to_extraction : Physically rigorous analytical solution (slower)
118 :ref:`concept-dispersion-scales` : Macrodispersion vs microdispersion
120 Notes
121 -----
122 Common values for heat in saturated sand in m²/day:
124 * Lower end (finer sand/silt): ~0.007-0.01 m²/day
125 * Typical saturated sand: ~0.01-0.05 m²/day
126 * Upper end (coarse sand/gravel): ~0.05-0.10 m²/day
127 """
128 cin = np.asarray(cin)
129 flow = np.asarray(flow)
131 # Validate physical parameters
132 if aquifer_pore_volume <= 0:
133 msg = f"Aquifer pore volume must be positive, got {aquifer_pore_volume}"
134 raise ValueError(msg)
135 if aquifer_length <= 0:
136 msg = f"Aquifer length must be positive, got {aquifer_length}"
137 raise ValueError(msg)
138 if diffusivity < 0:
139 msg = f"Diffusivity must be non-negative, got {diffusivity}"
140 raise ValueError(msg)
142 sigma_array = compute_scaled_sigma_array(
143 flow=flow,
144 tedges=tedges,
145 aquifer_pore_volume=aquifer_pore_volume,
146 diffusivity=diffusivity,
147 retardation_factor=retardation_factor,
148 aquifer_length=aquifer_length,
149 )
150 return convolve_diffusion(input_signal=cin, sigma_array=sigma_array, truncate=30.0)
153def extraction_to_infiltration(
154 *,
155 cout: npt.ArrayLike,
156 flow: npt.ArrayLike,
157 aquifer_pore_volume: float,
158 diffusivity: float = 0.1,
159 retardation_factor: float = 1.0,
160 aquifer_length: float = 80.0,
161 porosity: float = 0.35,
162) -> npt.NDArray[np.floating]:
163 """Compute the reverse diffusion of a compound during 1D transport in the aquifer.
165 This function represents extraction to infiltration modeling (equivalent to deconvolution).
167 Parameters
168 ----------
169 cout : array-like
170 Concentration or temperature of the compound in the extracted water [ng/m3].
171 flow : array-like
172 Flow rate of water in the aquifer [m3/day].
173 aquifer_pore_volume : float
174 Pore volume of the aquifer [m3].
175 diffusivity : float, optional
176 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1.
177 retardation_factor : float, optional
178 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0.
179 aquifer_length : float, optional
180 Length of the aquifer [m]. Default is 80.0.
181 porosity : float, optional
182 Porosity of the aquifer [dimensionless]. Default is 0.35.
184 Returns
185 -------
186 numpy.ndarray
187 Concentration of the compound in the infiltrating water [ng/m3].
189 See Also
190 --------
191 gwtransport.diffusion.extraction_to_infiltration : Analytically correct deconvolution
193 Notes
194 -----
195 Extraction to infiltration diffusion (deconvolution) is mathematically ill-posed and requires
196 regularization to obtain a stable solution.
197 """
198 msg = "Extraction to infiltration diffusion (deconvolution) is not implemented yet"
199 raise NotImplementedError(msg)
202def compute_diffusive_spreading_length(
203 *,
204 flow: npt.ArrayLike,
205 tedges: pd.DatetimeIndex,
206 aquifer_pore_volume: float,
207 diffusivity: float = 0.1,
208 retardation_factor: float = 1.0,
209) -> npt.NDArray[np.floating]:
210 """Compute the physical diffusive spreading length for each time step.
212 The diffusive spreading length L_diff = sqrt(2 * D * rt) [m] represents the
213 physical distance over which concentrations spread due to diffusion during
214 the residence time rt.
216 Parameters
217 ----------
218 flow : array-like
219 Flow rate of water in the aquifer [m3/day].
220 tedges : pandas.DatetimeIndex
221 Time edges corresponding to the flow values.
222 aquifer_pore_volume : float
223 Pore volume of the aquifer [m3].
224 diffusivity : float, optional
225 Diffusivity of the compound in the aquifer [m2/day]. Default is 0.1.
226 retardation_factor : float, optional
227 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0.
229 Returns
230 -------
231 numpy.ndarray
232 Array of diffusive spreading lengths [m].
233 Each value corresponds to a time step in the input flow series.
234 """
235 rt_array = residence_time(
236 flow=flow,
237 flow_tedges=tedges,
238 aquifer_pore_volumes=aquifer_pore_volume,
239 retardation_factor=retardation_factor,
240 direction="infiltration_to_extraction",
241 )[0] # Extract first pore volume
243 # Interpolate NaN values using linear interpolation with nearest extrapolation
244 valid_mask = ~np.isnan(rt_array)
245 if np.any(valid_mask):
246 rt_array = np.interp(np.arange(len(rt_array)), np.where(valid_mask)[0], rt_array[valid_mask])
248 # Diffusive spreading length [m]: how far concentrations spread physically
249 return np.sqrt(2 * diffusivity * rt_array)
252def compute_scaled_sigma_array(
253 *,
254 flow: npt.ArrayLike,
255 tedges: pd.DatetimeIndex,
256 aquifer_pore_volume: float,
257 diffusivity: float = 0.1,
258 retardation_factor: float = 1.0,
259 aquifer_length: float = 80.0,
260) -> npt.NDArray[np.floating]:
261 """Compute scaled sigma values for diffusion based on flow and aquifer properties.
263 Sigma represents the dimensionless spreading parameter for Gaussian filtering,
264 expressed in units of array indices (time steps). It determines how many
265 neighboring time steps are blended together when applying diffusive smoothing.
267 The computation follows these steps:
269 1. Calculate residence time (rt) for water parcels traveling through the aquifer
270 2. Compute the diffusive spreading length: L_diff = sqrt(2 * D * rt) [m].
271 This is the physical distance over which concentrations spread due to diffusion.
272 3. Compute the advective step size: dx = (Q * dt / V_pore) * L_aquifer [m].
273 This is the physical distance the water moves during one time step.
274 4. Sigma = L_diff / dx converts the physical spreading into array index units.
276 Why divide by dx? The Gaussian filter operates on array indices, not physical
277 distances. If the diffusive spreading is 10 meters and each time step moves
278 water 2 meters, then sigma = 10/2 = 5 means the filter should blend across
279 ~5 time steps. This normalization accounts for variable flow rates: faster
280 flow means larger dx, so fewer time steps are blended (smaller sigma), even
281 though the physical spreading remains the same.
283 Parameters
284 ----------
285 flow : array-like
286 Flow rate of water in the aquifer [m3/day].
287 tedges : pandas.DatetimeIndex
288 Time edges corresponding to the flow values.
289 aquifer_pore_volume : float
290 Pore volume of the aquifer [m3].
291 diffusivity : float, optional
292 Diffusivity of the compound in the aquifer [m2/day]. Default is 0.1.
293 retardation_factor : float, optional
294 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0.
295 aquifer_length : float, optional
296 Length of the aquifer [m]. Default is 80.0.
298 Returns
299 -------
300 numpy.ndarray
301 Array of sigma values (in units of array indices), clipped to range [0, 100].
302 Each value corresponds to a time step in the input flow series.
304 See Also
305 --------
306 gwtransport.diffusion.infiltration_to_extraction : For analytical solutions without this approximation
307 """
308 # Diffusive spreading length [m]: how far concentrations spread physically
309 diffusive_spreading_length = compute_diffusive_spreading_length(
310 flow=flow,
311 tedges=tedges,
312 aquifer_pore_volume=aquifer_pore_volume,
313 diffusivity=diffusivity,
314 retardation_factor=retardation_factor,
315 )
317 # Advective step size [m]: how far water moves during one time step
318 timedelta_at_departure = np.diff(tedges) / pd.to_timedelta(1, unit="D")
319 volume_infiltrated_at_departure = flow * timedelta_at_departure
320 dx = volume_infiltrated_at_departure / aquifer_pore_volume * aquifer_length
322 # Sigma in array index units: number of time steps to blend
323 sigma_array = diffusive_spreading_length / dx
324 return np.clip(sigma_array, 0.0, 100.0)
327def convolve_diffusion(
328 *, input_signal: npt.ArrayLike, sigma_array: npt.ArrayLike, truncate: float = 4.0
329) -> npt.NDArray[np.floating]:
330 """Apply Gaussian filter with position-dependent sigma values.
332 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard
333 deviation (sigma) of the Gaussian kernel to vary at each point in the signal.
334 It implements the filter using a sparse convolution matrix where each row
335 represents a Gaussian kernel with a locally-appropriate standard deviation.
337 Parameters
338 ----------
339 input_signal : numpy.ndarray
340 One-dimensional input array to be filtered.
341 sigma_array : numpy.ndarray
342 One-dimensional array of standard deviation values, must have same length
343 as input_signal. Each value specifies the Gaussian kernel width at the
344 corresponding position.
345 truncate : float, optional
346 Truncate the filter at this many standard deviations.
347 Default is 4.0.
349 Returns
350 -------
351 numpy.ndarray
352 The filtered input signal. Has the same shape as input_signal.
354 Notes
355 -----
356 At the boundaries, the outer values are repeated to avoid edge effects. Equal to mode=`nearest`
357 in `scipy.ndimage.gaussian_filter1d`.
359 The function constructs a sparse convolution matrix where each row represents
360 a position-specific Gaussian kernel. The kernel width adapts to local sigma
361 values, making it suitable for problems with varying diffusivitys
362 or time steps.
364 For diffusion problems, the local sigma values can be calculated as:
365 sigma = sqrt(2 * diffusivity * dt) / dx
366 where diffusivity is the diffusivity, dt is the time step, and dx is the
367 spatial step size.
369 The implementation uses sparse matrices for memory efficiency when dealing
370 with large signals or when sigma values vary significantly.
372 See Also
373 --------
374 scipy.ndimage.gaussian_filter1d : Fixed-sigma Gaussian filtering
375 scipy.sparse : Sparse matrix implementations
377 Examples
378 --------
379 >>> import numpy as np
380 >>> from gwtransport.diffusion_fast import convolve_diffusion
381 >>> # Create a sample signal
382 >>> x = np.linspace(0, 10, 1000)
383 >>> signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5)
385 >>> # Create position-dependent sigma values
386 >>> diffusivity = 0.1 # diffusivity
387 >>> dt = 0.001 * (1 + np.sin(2 * np.pi * x / 10)) # Varying time steps
388 >>> dx = x[1] - x[0]
389 >>> sigma_array = np.sqrt(2 * diffusivity * dt) / dx
391 >>> # Apply the filter
392 >>> filtered = convolve_diffusion(input_signal=signal, sigma_array=sigma_array)
393 """
394 # Convert to arrays for type safety
395 input_signal = np.asarray(input_signal)
396 sigma_array = np.asarray(sigma_array)
398 if len(input_signal) != len(sigma_array):
399 msg = "Input signal and sigma array must have the same length"
400 raise ValueError(msg)
402 n = len(input_signal)
404 # Handle zero sigma values
405 zero_mask = sigma_array == 0
406 if np.all(zero_mask):
407 return input_signal.copy()
409 # Get maximum kernel size and create position arrays
410 max_sigma = np.max(sigma_array)
411 max_radius = int(truncate * max_sigma + 0.5)
413 # Create arrays for all possible kernel positions
414 positions = np.arange(-max_radius, max_radius + 1)
416 # Create a mask for valid sigma values
417 valid_sigma = ~zero_mask
418 valid_indices = np.where(valid_sigma)[0]
420 # Create position matrices for broadcasting
421 # Shape: (n_valid_points, 1)
422 center_positions = valid_indices[:, np.newaxis]
423 # Shape: (1, max_kernel_size)
424 kernel_positions = positions[np.newaxis, :]
426 # Calculate the relative positions for each point
427 # This creates a matrix of shape (n_valid_points, max_kernel_size)
428 relative_positions = kernel_positions
430 # Calculate Gaussian weights for all positions at once
431 # Using broadcasting to create a matrix of shape (n_valid_points, max_kernel_size)
432 sigmas = sigma_array[valid_sigma][:, np.newaxis]
433 weights = np.exp(-0.5 * (relative_positions / sigmas) ** 2)
435 # Normalize each kernel
436 weights /= np.sum(weights, axis=1)[:, np.newaxis]
438 # Calculate absolute positions in the signal
439 absolute_positions = center_positions + relative_positions
441 # Handle boundary conditions
442 absolute_positions = np.clip(absolute_positions, 0, n - 1)
444 # Create coordinate arrays for sparse matrix
445 rows = np.repeat(center_positions, weights.shape[1])
446 cols = absolute_positions.ravel()
447 data = weights.ravel()
449 # Remove zero weights to save memory
450 nonzero_mask = data != 0
451 rows = rows[nonzero_mask]
452 cols = cols[nonzero_mask]
453 data = data[nonzero_mask]
455 # Add identity matrix elements for zero-sigma positions
456 if np.any(zero_mask):
457 zero_indices = np.where(zero_mask)[0]
458 rows = np.concatenate([rows, zero_indices])
459 cols = np.concatenate([cols, zero_indices])
460 data = np.concatenate([data, np.ones(len(zero_indices))])
462 # Create the sparse matrix
463 conv_matrix = sparse.csr_matrix((data, (rows, cols)), shape=(n, n))
465 # remove diffusion from signal with inverse of the convolution matrix
466 # conv_matrix_inv = np.linalg.lstsq(conv_matrix.todense(), np.eye(n), rcond=None)[0]
468 # Apply the filter
469 return conv_matrix.dot(input_signal)
472def deconvolve_diffusion(
473 *, output_signal: npt.ArrayLike, sigma_array: npt.ArrayLike, truncate: float = 4.0
474) -> npt.NDArray[np.floating]:
475 """Apply Gaussian deconvolution with position-dependent sigma values.
477 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard
478 deviation (sigma) of the Gaussian kernel to vary at each point in the signal.
479 It implements the filter using a sparse convolution matrix where each row
480 represents a Gaussian kernel with a locally-appropriate standard deviation.
482 Parameters
483 ----------
484 output_signal : numpy.ndarray
485 One-dimensional input array to be filtered.
486 sigma_array : numpy.ndarray
487 One-dimensional array of standard deviation values, must have same length
488 as output_signal. Each value specifies the Gaussian kernel width at the
489 corresponding position.
490 truncate : float, optional
491 Truncate the filter at this many standard deviations.
492 Default is 4.0.
494 Returns
495 -------
496 numpy.ndarray
497 The filtered output signal. Has the same shape as output_signal.
498 """
499 msg = "Deconvolution is not implemented yet"
500 raise NotImplementedError(msg)
503def create_example_data(
504 *, nx: int = 1000, domain_length: float = 10.0, diffusivity: float = 0.1
505) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.floating], npt.NDArray[np.floating], npt.NDArray[np.floating]]:
506 """Create example data for demonstrating variable-sigma diffusion.
508 Parameters
509 ----------
510 nx : int, optional
511 Number of spatial points. Default is 1000.
512 domain_length : float, optional
513 Domain length. Default is 10.0.
514 diffusivity : float, optional
515 diffusivity. Default is 0.1.
517 Returns
518 -------
519 x : numpy.ndarray
520 Spatial coordinates.
521 signal : numpy.ndarray
522 Initial signal (sum of two Gaussians).
523 sigma_array : numpy.ndarray
524 Array of sigma values varying in space.
525 dt : numpy.ndarray
526 Array of time steps varying in space.
528 Notes
529 -----
530 This function creates a test case with:
532 - A signal composed of two Gaussian peaks
533 - Sinusoidally varying time steps
534 - Corresponding sigma values for diffusion
535 """
536 # Create spatial grid
537 x = np.linspace(0, domain_length, nx)
538 dx = x[1] - x[0]
540 # Create initial signal (two Gaussians)
541 signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5) + 0.1 * np.random.randn(nx)
543 # Create varying time steps
544 dt = 0.001 * (1 + np.sin(2 * np.pi * x / domain_length))
546 # Calculate corresponding sigma values
547 sigma_array = np.sqrt(2 * diffusivity * dt) / dx
549 return x, signal, sigma_array, dt
552if __name__ == "__main__":
553 from matplotlib import pyplot as plt
555 # Generate example data
556 x, signal, sigma_array, dt = create_example_data()
558 # Apply variable-sigma filtering
559 filtered = convolve_diffusion(input_signal=signal, sigma_array=sigma_array * 5)
561 # Compare with regular Gaussian filter
562 avg_sigma = np.mean(sigma_array)
563 regular_filtered = ndimage.gaussian_filter1d(signal, avg_sigma)
564 plt.figure(figsize=(10, 6))
565 plt.plot(x, signal, label="Original signal", lw=0.8)
566 plt.plot(x, filtered, label="Variable-sigma filtered", lw=1.0)
568 plt.plot(x, regular_filtered, label="Regular Gaussian filter", lw=0.8, ls="--")