Coverage for src / gwtransport / diffusion_fast.py: 84%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-04 18:51 +0000

1""" 

2Fast Diffusive Transport Corrections via Gaussian Smoothing. 

3 

4This module provides a computationally efficient approximation of diffusion/dispersion 

5using Gaussian smoothing. It is much faster than :mod:`gwtransport.diffusion` but 

6less physically accurate, especially under variable flow conditions. 

7 

8Both ``diffusion_fast`` and :mod:`gwtransport.diffusion` add microdispersion and 

9molecular diffusion on top of macrodispersion captured by the APVD. 

10 

11**When to use diffusion_fast vs diffusion:** 

12 

13- Use ``diffusion_fast`` when: Speed is critical, flow and time steps are relatively 

14 constant, or you need real-time processing 

15- Use ``diffusion`` when: Physical accuracy is critical, flow varies significantly, 

16 or you're analyzing periods with changing conditions 

17 

18See :ref:`concept-dispersion` for background on macrodispersion and microdispersion. 

19 

20This module implements diffusion/dispersion processes that modify advective transport 

21in aquifer systems. Diffusion causes spreading and smoothing of concentration or 

22temperature fronts as they travel through the aquifer. While advection moves compounds 

23with water flow, diffusion causes spreading due to molecular diffusion, mechanical 

24dispersion, and thermal diffusion (for temperature). 

25 

26Limitation: This fast approximation works best when flow and tedges are relatively 

27constant. The underlying assumption is that dx (spatial step between cells) remains 

28approximately constant, which holds for steady flow but breaks down under highly 

29variable conditions. For scenarios with significant flow variability, consider using 

30:mod:`gwtransport.diffusion` instead. 

31 

32Available functions: 

33 

34- :func:`infiltration_to_extraction` - Apply diffusion during infiltration to extraction 

35 transport. Combines advection (via residence time) with diffusion (via Gaussian smoothing). 

36 Computes position-dependent diffusion based on local residence time and returns concentration 

37 or temperature in extracted water. 

38 

39- :func:`extraction_to_infiltration` - NOT YET IMPLEMENTED. Inverse diffusion is numerically 

40 unstable and requires regularization techniques. Placeholder for future implementation. 

41 

42- :func:`compute_scaled_sigma_array` - Calculate position-dependent diffusion parameters. Computes 

43 standard deviation (sigma) for Gaussian smoothing at each time step based on residence time, 

44 diffusivity, and spatial discretization: sigma = sqrt(2 * diffusivity * residence_time) / dx. 

45 

46- :func:`convolve_diffusion` - Apply variable-sigma Gaussian filtering. Extends 

47 scipy.ndimage.gaussian_filter1d to position-dependent sigma using sparse matrix representation 

48 for efficiency. Handles boundary conditions via nearest-neighbor extrapolation. 

49 

50- :func:`deconvolve_diffusion` - NOT YET IMPLEMENTED. Inverse filtering placeholder for future 

51 diffusion deconvolution with required regularization for stability. 

52 

53- :func:`create_example_data` - Generate test data for demonstrating diffusion effects with 

54 signals having varying time steps and corresponding sigma arrays. Useful for testing and 

55 validation. 

56 

57This file is part of gwtransport which is released under AGPL-3.0 license. 

58See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details. 

59""" 

60 

61import matplotlib.pyplot as plt 

62import numpy as np 

63import numpy.typing as npt 

64import pandas as pd 

65from scipy import ndimage, sparse 

66 

67from gwtransport.residence_time import residence_time 

68 

69 

70def infiltration_to_extraction( 

71 *, 

72 cin: npt.ArrayLike, 

73 flow: npt.ArrayLike, 

74 tedges: pd.DatetimeIndex, 

75 aquifer_pore_volume: float, 

76 diffusivity: float = 0.1, 

77 retardation_factor: float = 1.0, 

78 aquifer_length: float = 80.0, 

79) -> npt.NDArray[np.floating]: 

80 """Compute the diffusion of a compound during 1D transport in the aquifer. 

81 

82 This function represents infiltration to extraction modeling (equivalent to convolution). 

83 It provides a fast approximation using Gaussian smoothing. The approximation is accurate 

84 when flow and tedges are relatively constant. Under variable flow conditions, errors 

85 increase but mass balance is preserved. 

86 

87 For physically rigorous solutions that handle variable flow correctly, use 

88 :func:`gwtransport.diffusion.infiltration_to_extraction` instead. That function is 

89 slower but provides analytical solutions to the advection-dispersion equation. 

90 

91 Parameters 

92 ---------- 

93 cin : array-like 

94 Concentration or temperature of the compound in the infiltrating water [ng/m3]. 

95 flow : array-like 

96 Flow rate of water in the aquifer [m3/day]. 

97 tedges : pandas.DatetimeIndex 

98 Time edges corresponding to the flow values. 

99 aquifer_pore_volume : float 

100 Pore volume of the aquifer [m3]. 

101 diffusivity : float, optional 

102 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1. 

103 retardation_factor : float, optional 

104 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0. 

105 aquifer_length : float, optional 

106 Length of the aquifer [m]. Default is 80.0. 

107 porosity : float, optional 

108 Porosity of the aquifer [dimensionless]. Default is 0.35. 

109 

110 Returns 

111 ------- 

112 numpy.ndarray 

113 Concentration of the compound in the extracted water [ng/m3]. 

114 

115 See Also 

116 -------- 

117 gwtransport.diffusion.infiltration_to_extraction : Physically rigorous analytical solution (slower) 

118 :ref:`concept-dispersion-scales` : Macrodispersion vs microdispersion 

119 

120 Notes 

121 ----- 

122 Common values for heat in saturated sand in m²/day: 

123 

124 * Lower end (finer sand/silt): ~0.007-0.01 m²/day 

125 * Typical saturated sand: ~0.01-0.05 m²/day 

126 * Upper end (coarse sand/gravel): ~0.05-0.10 m²/day 

127 """ 

128 cin = np.asarray(cin) 

129 flow = np.asarray(flow) 

130 

131 # Validate physical parameters 

132 if aquifer_pore_volume <= 0: 

133 msg = f"Aquifer pore volume must be positive, got {aquifer_pore_volume}" 

134 raise ValueError(msg) 

135 if aquifer_length <= 0: 

136 msg = f"Aquifer length must be positive, got {aquifer_length}" 

137 raise ValueError(msg) 

138 if diffusivity < 0: 

139 msg = f"Diffusivity must be non-negative, got {diffusivity}" 

140 raise ValueError(msg) 

141 

142 sigma_array = compute_scaled_sigma_array( 

143 flow=flow, 

144 tedges=tedges, 

145 aquifer_pore_volume=aquifer_pore_volume, 

146 diffusivity=diffusivity, 

147 retardation_factor=retardation_factor, 

148 aquifer_length=aquifer_length, 

149 ) 

150 return convolve_diffusion(input_signal=cin, sigma_array=sigma_array, truncate=30.0) 

151 

152 

153def extraction_to_infiltration( 

154 *, 

155 cout: npt.ArrayLike, 

156 flow: npt.ArrayLike, 

157 aquifer_pore_volume: float, 

158 diffusivity: float = 0.1, 

159 retardation_factor: float = 1.0, 

160 aquifer_length: float = 80.0, 

161 porosity: float = 0.35, 

162) -> npt.NDArray[np.floating]: 

163 """Compute the reverse diffusion of a compound during 1D transport in the aquifer. 

164 

165 This function represents extraction to infiltration modeling (equivalent to deconvolution). 

166 

167 Parameters 

168 ---------- 

169 cout : array-like 

170 Concentration or temperature of the compound in the extracted water [ng/m3]. 

171 flow : array-like 

172 Flow rate of water in the aquifer [m3/day]. 

173 aquifer_pore_volume : float 

174 Pore volume of the aquifer [m3]. 

175 diffusivity : float, optional 

176 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1. 

177 retardation_factor : float, optional 

178 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0. 

179 aquifer_length : float, optional 

180 Length of the aquifer [m]. Default is 80.0. 

181 porosity : float, optional 

182 Porosity of the aquifer [dimensionless]. Default is 0.35. 

183 

184 Returns 

185 ------- 

186 numpy.ndarray 

187 Concentration of the compound in the infiltrating water [ng/m3]. 

188 

189 See Also 

190 -------- 

191 gwtransport.diffusion.extraction_to_infiltration : Analytically correct deconvolution 

192 

193 Notes 

194 ----- 

195 Extraction to infiltration diffusion (deconvolution) is mathematically ill-posed and requires 

196 regularization to obtain a stable solution. 

197 """ 

198 msg = "Extraction to infiltration diffusion (deconvolution) is not implemented yet" 

199 raise NotImplementedError(msg) 

200 

201 

202def compute_diffusive_spreading_length( 

203 *, 

204 flow: npt.ArrayLike, 

205 tedges: pd.DatetimeIndex, 

206 aquifer_pore_volume: float, 

207 diffusivity: float = 0.1, 

208 retardation_factor: float = 1.0, 

209) -> npt.NDArray[np.floating]: 

210 """Compute the physical diffusive spreading length for each time step. 

211 

212 The diffusive spreading length L_diff = sqrt(2 * D * rt) [m] represents the 

213 physical distance over which concentrations spread due to diffusion during 

214 the residence time rt. 

215 

216 Parameters 

217 ---------- 

218 flow : array-like 

219 Flow rate of water in the aquifer [m3/day]. 

220 tedges : pandas.DatetimeIndex 

221 Time edges corresponding to the flow values. 

222 aquifer_pore_volume : float 

223 Pore volume of the aquifer [m3]. 

224 diffusivity : float, optional 

225 Diffusivity of the compound in the aquifer [m2/day]. Default is 0.1. 

226 retardation_factor : float, optional 

227 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0. 

228 

229 Returns 

230 ------- 

231 numpy.ndarray 

232 Array of diffusive spreading lengths [m]. 

233 Each value corresponds to a time step in the input flow series. 

234 """ 

235 rt_array = residence_time( 

236 flow=flow, 

237 flow_tedges=tedges, 

238 aquifer_pore_volumes=aquifer_pore_volume, 

239 retardation_factor=retardation_factor, 

240 direction="infiltration_to_extraction", 

241 )[0] # Extract first pore volume 

242 

243 # Interpolate NaN values using linear interpolation with nearest extrapolation 

244 valid_mask = ~np.isnan(rt_array) 

245 if np.any(valid_mask): 

246 rt_array = np.interp(np.arange(len(rt_array)), np.where(valid_mask)[0], rt_array[valid_mask]) 

247 

248 # Diffusive spreading length [m]: how far concentrations spread physically 

249 return np.sqrt(2 * diffusivity * rt_array) 

250 

251 

252def compute_scaled_sigma_array( 

253 *, 

254 flow: npt.ArrayLike, 

255 tedges: pd.DatetimeIndex, 

256 aquifer_pore_volume: float, 

257 diffusivity: float = 0.1, 

258 retardation_factor: float = 1.0, 

259 aquifer_length: float = 80.0, 

260) -> npt.NDArray[np.floating]: 

261 """Compute scaled sigma values for diffusion based on flow and aquifer properties. 

262 

263 Sigma represents the dimensionless spreading parameter for Gaussian filtering, 

264 expressed in units of array indices (time steps). It determines how many 

265 neighboring time steps are blended together when applying diffusive smoothing. 

266 

267 The computation follows these steps: 

268 

269 1. Calculate residence time (rt) for water parcels traveling through the aquifer 

270 2. Compute the diffusive spreading length: L_diff = sqrt(2 * D * rt) [m]. 

271 This is the physical distance over which concentrations spread due to diffusion. 

272 3. Compute the advective step size: dx = (Q * dt / V_pore) * L_aquifer [m]. 

273 This is the physical distance the water moves during one time step. 

274 4. Sigma = L_diff / dx converts the physical spreading into array index units. 

275 

276 Why divide by dx? The Gaussian filter operates on array indices, not physical 

277 distances. If the diffusive spreading is 10 meters and each time step moves 

278 water 2 meters, then sigma = 10/2 = 5 means the filter should blend across 

279 ~5 time steps. This normalization accounts for variable flow rates: faster 

280 flow means larger dx, so fewer time steps are blended (smaller sigma), even 

281 though the physical spreading remains the same. 

282 

283 Parameters 

284 ---------- 

285 flow : array-like 

286 Flow rate of water in the aquifer [m3/day]. 

287 tedges : pandas.DatetimeIndex 

288 Time edges corresponding to the flow values. 

289 aquifer_pore_volume : float 

290 Pore volume of the aquifer [m3]. 

291 diffusivity : float, optional 

292 Diffusivity of the compound in the aquifer [m2/day]. Default is 0.1. 

293 retardation_factor : float, optional 

294 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0. 

295 aquifer_length : float, optional 

296 Length of the aquifer [m]. Default is 80.0. 

297 

298 Returns 

299 ------- 

300 numpy.ndarray 

301 Array of sigma values (in units of array indices), clipped to range [0, 100]. 

302 Each value corresponds to a time step in the input flow series. 

303 

304 See Also 

305 -------- 

306 gwtransport.diffusion.infiltration_to_extraction : For analytical solutions without this approximation 

307 """ 

308 # Diffusive spreading length [m]: how far concentrations spread physically 

309 diffusive_spreading_length = compute_diffusive_spreading_length( 

310 flow=flow, 

311 tedges=tedges, 

312 aquifer_pore_volume=aquifer_pore_volume, 

313 diffusivity=diffusivity, 

314 retardation_factor=retardation_factor, 

315 ) 

316 

317 # Advective step size [m]: how far water moves during one time step 

318 timedelta_at_departure = np.diff(tedges) / pd.to_timedelta(1, unit="D") 

319 volume_infiltrated_at_departure = flow * timedelta_at_departure 

320 dx = volume_infiltrated_at_departure / aquifer_pore_volume * aquifer_length 

321 

322 # Sigma in array index units: number of time steps to blend 

323 sigma_array = diffusive_spreading_length / dx 

324 return np.clip(sigma_array, 0.0, 100.0) 

325 

326 

327def convolve_diffusion( 

328 *, input_signal: npt.ArrayLike, sigma_array: npt.ArrayLike, truncate: float = 4.0 

329) -> npt.NDArray[np.floating]: 

330 """Apply Gaussian filter with position-dependent sigma values. 

331 

332 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard 

333 deviation (sigma) of the Gaussian kernel to vary at each point in the signal. 

334 It implements the filter using a sparse convolution matrix where each row 

335 represents a Gaussian kernel with a locally-appropriate standard deviation. 

336 

337 Parameters 

338 ---------- 

339 input_signal : numpy.ndarray 

340 One-dimensional input array to be filtered. 

341 sigma_array : numpy.ndarray 

342 One-dimensional array of standard deviation values, must have same length 

343 as input_signal. Each value specifies the Gaussian kernel width at the 

344 corresponding position. 

345 truncate : float, optional 

346 Truncate the filter at this many standard deviations. 

347 Default is 4.0. 

348 

349 Returns 

350 ------- 

351 numpy.ndarray 

352 The filtered input signal. Has the same shape as input_signal. 

353 

354 Notes 

355 ----- 

356 At the boundaries, the outer values are repeated to avoid edge effects. Equal to mode=`nearest` 

357 in `scipy.ndimage.gaussian_filter1d`. 

358 

359 The function constructs a sparse convolution matrix where each row represents 

360 a position-specific Gaussian kernel. The kernel width adapts to local sigma 

361 values, making it suitable for problems with varying diffusivitys 

362 or time steps. 

363 

364 For diffusion problems, the local sigma values can be calculated as: 

365 sigma = sqrt(2 * diffusivity * dt) / dx 

366 where diffusivity is the diffusivity, dt is the time step, and dx is the 

367 spatial step size. 

368 

369 The implementation uses sparse matrices for memory efficiency when dealing 

370 with large signals or when sigma values vary significantly. 

371 

372 See Also 

373 -------- 

374 scipy.ndimage.gaussian_filter1d : Fixed-sigma Gaussian filtering 

375 scipy.sparse : Sparse matrix implementations 

376 

377 Examples 

378 -------- 

379 >>> import numpy as np 

380 >>> from gwtransport.diffusion_fast import convolve_diffusion 

381 >>> # Create a sample signal 

382 >>> x = np.linspace(0, 10, 1000) 

383 >>> signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5) 

384 

385 >>> # Create position-dependent sigma values 

386 >>> diffusivity = 0.1 # diffusivity 

387 >>> dt = 0.001 * (1 + np.sin(2 * np.pi * x / 10)) # Varying time steps 

388 >>> dx = x[1] - x[0] 

389 >>> sigma_array = np.sqrt(2 * diffusivity * dt) / dx 

390 

391 >>> # Apply the filter 

392 >>> filtered = convolve_diffusion(input_signal=signal, sigma_array=sigma_array) 

393 """ 

394 # Convert to arrays for type safety 

395 input_signal = np.asarray(input_signal) 

396 sigma_array = np.asarray(sigma_array) 

397 

398 if len(input_signal) != len(sigma_array): 

399 msg = "Input signal and sigma array must have the same length" 

400 raise ValueError(msg) 

401 

402 n = len(input_signal) 

403 

404 # Handle zero sigma values 

405 zero_mask = sigma_array == 0 

406 if np.all(zero_mask): 

407 return input_signal.copy() 

408 

409 # Get maximum kernel size and create position arrays 

410 max_sigma = np.max(sigma_array) 

411 max_radius = int(truncate * max_sigma + 0.5) 

412 

413 # Create arrays for all possible kernel positions 

414 positions = np.arange(-max_radius, max_radius + 1) 

415 

416 # Create a mask for valid sigma values 

417 valid_sigma = ~zero_mask 

418 valid_indices = np.where(valid_sigma)[0] 

419 

420 # Create position matrices for broadcasting 

421 # Shape: (n_valid_points, 1) 

422 center_positions = valid_indices[:, np.newaxis] 

423 # Shape: (1, max_kernel_size) 

424 kernel_positions = positions[np.newaxis, :] 

425 

426 # Calculate the relative positions for each point 

427 # This creates a matrix of shape (n_valid_points, max_kernel_size) 

428 relative_positions = kernel_positions 

429 

430 # Calculate Gaussian weights for all positions at once 

431 # Using broadcasting to create a matrix of shape (n_valid_points, max_kernel_size) 

432 sigmas = sigma_array[valid_sigma][:, np.newaxis] 

433 weights = np.exp(-0.5 * (relative_positions / sigmas) ** 2) 

434 

435 # Normalize each kernel 

436 weights /= np.sum(weights, axis=1)[:, np.newaxis] 

437 

438 # Calculate absolute positions in the signal 

439 absolute_positions = center_positions + relative_positions 

440 

441 # Handle boundary conditions 

442 absolute_positions = np.clip(absolute_positions, 0, n - 1) 

443 

444 # Create coordinate arrays for sparse matrix 

445 rows = np.repeat(center_positions, weights.shape[1]) 

446 cols = absolute_positions.ravel() 

447 data = weights.ravel() 

448 

449 # Remove zero weights to save memory 

450 nonzero_mask = data != 0 

451 rows = rows[nonzero_mask] 

452 cols = cols[nonzero_mask] 

453 data = data[nonzero_mask] 

454 

455 # Add identity matrix elements for zero-sigma positions 

456 if np.any(zero_mask): 

457 zero_indices = np.where(zero_mask)[0] 

458 rows = np.concatenate([rows, zero_indices]) 

459 cols = np.concatenate([cols, zero_indices]) 

460 data = np.concatenate([data, np.ones(len(zero_indices))]) 

461 

462 # Create the sparse matrix 

463 conv_matrix = sparse.csr_matrix((data, (rows, cols)), shape=(n, n)) 

464 

465 # remove diffusion from signal with inverse of the convolution matrix 

466 # conv_matrix_inv = np.linalg.lstsq(conv_matrix.todense(), np.eye(n), rcond=None)[0] 

467 

468 # Apply the filter 

469 return conv_matrix.dot(input_signal) 

470 

471 

472def deconvolve_diffusion( 

473 *, output_signal: npt.ArrayLike, sigma_array: npt.ArrayLike, truncate: float = 4.0 

474) -> npt.NDArray[np.floating]: 

475 """Apply Gaussian deconvolution with position-dependent sigma values. 

476 

477 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard 

478 deviation (sigma) of the Gaussian kernel to vary at each point in the signal. 

479 It implements the filter using a sparse convolution matrix where each row 

480 represents a Gaussian kernel with a locally-appropriate standard deviation. 

481 

482 Parameters 

483 ---------- 

484 output_signal : numpy.ndarray 

485 One-dimensional input array to be filtered. 

486 sigma_array : numpy.ndarray 

487 One-dimensional array of standard deviation values, must have same length 

488 as output_signal. Each value specifies the Gaussian kernel width at the 

489 corresponding position. 

490 truncate : float, optional 

491 Truncate the filter at this many standard deviations. 

492 Default is 4.0. 

493 

494 Returns 

495 ------- 

496 numpy.ndarray 

497 The filtered output signal. Has the same shape as output_signal. 

498 """ 

499 msg = "Deconvolution is not implemented yet" 

500 raise NotImplementedError(msg) 

501 

502 

503def create_example_data( 

504 *, nx: int = 1000, domain_length: float = 10.0, diffusivity: float = 0.1 

505) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.floating], npt.NDArray[np.floating], npt.NDArray[np.floating]]: 

506 """Create example data for demonstrating variable-sigma diffusion. 

507 

508 Parameters 

509 ---------- 

510 nx : int, optional 

511 Number of spatial points. Default is 1000. 

512 domain_length : float, optional 

513 Domain length. Default is 10.0. 

514 diffusivity : float, optional 

515 diffusivity. Default is 0.1. 

516 

517 Returns 

518 ------- 

519 x : numpy.ndarray 

520 Spatial coordinates. 

521 signal : numpy.ndarray 

522 Initial signal (sum of two Gaussians). 

523 sigma_array : numpy.ndarray 

524 Array of sigma values varying in space. 

525 dt : numpy.ndarray 

526 Array of time steps varying in space. 

527 

528 Notes 

529 ----- 

530 This function creates a test case with: 

531 

532 - A signal composed of two Gaussian peaks 

533 - Sinusoidally varying time steps 

534 - Corresponding sigma values for diffusion 

535 """ 

536 # Create spatial grid 

537 x = np.linspace(0, domain_length, nx) 

538 dx = x[1] - x[0] 

539 

540 # Create initial signal (two Gaussians) 

541 signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5) + 0.1 * np.random.randn(nx) 

542 

543 # Create varying time steps 

544 dt = 0.001 * (1 + np.sin(2 * np.pi * x / domain_length)) 

545 

546 # Calculate corresponding sigma values 

547 sigma_array = np.sqrt(2 * diffusivity * dt) / dx 

548 

549 return x, signal, sigma_array, dt 

550 

551 

552if __name__ == "__main__": 

553 from matplotlib import pyplot as plt 

554 

555 # Generate example data 

556 x, signal, sigma_array, dt = create_example_data() 

557 

558 # Apply variable-sigma filtering 

559 filtered = convolve_diffusion(input_signal=signal, sigma_array=sigma_array * 5) 

560 

561 # Compare with regular Gaussian filter 

562 avg_sigma = np.mean(sigma_array) 

563 regular_filtered = ndimage.gaussian_filter1d(signal, avg_sigma) 

564 plt.figure(figsize=(10, 6)) 

565 plt.plot(x, signal, label="Original signal", lw=0.8) 

566 plt.plot(x, filtered, label="Variable-sigma filtered", lw=1.0) 

567 

568 plt.plot(x, regular_filtered, label="Regular Gaussian filter", lw=0.8, ls="--")