Coverage for src/gwtransport/diffusion.py: 57%

75 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-29 20:01 +0000

1"""Module that implements diffusion.""" 

2 

3import matplotlib.pyplot as plt 

4import numpy as np 

5import pandas as pd 

6from scipy import ndimage, sparse 

7 

8from gwtransport.residence_time import residence_time 

9from gwtransport.utils import compute_time_edges, diff 

10 

11 

12def infiltration_to_extraction( 

13 cin: pd.Series, 

14 flow: pd.Series, 

15 aquifer_pore_volume: float, 

16 diffusivity: float = 0.1, 

17 retardation_factor: float = 1.0, 

18 aquifer_length: float = 80.0, 

19 porosity: float = 0.35, 

20) -> pd.Series: 

21 """Compute the diffusion of a compound during 1D transport in the aquifer. 

22 

23 This function represents infiltration to extraction modeling (equivalent to convolution). 

24 

25 Parameters 

26 ---------- 

27 cin : pandas.Series 

28 Concentration or temperature of the compound in the infiltrating water [ng/m3]. 

29 flow : pandas.Series 

30 Flow rate of water in the aquifer [m3/day]. 

31 aquifer_pore_volume : float 

32 Pore volume of the aquifer [m3]. 

33 diffusivity : float, optional 

34 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1. 

35 retardation_factor : float, optional 

36 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0. 

37 aquifer_length : float, optional 

38 Length of the aquifer [m]. Default is 80.0. 

39 porosity : float, optional 

40 Porosity of the aquifer [dimensionless]. Default is 0.35. 

41 

42 Returns 

43 ------- 

44 numpy.ndarray 

45 Concentration of the compound in the extracted water [ng/m3]. 

46 """ 

47 sigma_array = compute_sigma_array( 

48 flow=flow, 

49 aquifer_pore_volume=aquifer_pore_volume, 

50 diffusivity=diffusivity, 

51 retardation_factor=retardation_factor, 

52 aquifer_length=aquifer_length, 

53 porosity=porosity, 

54 ) 

55 return convolve_diffusion(cin.values, sigma_array, truncate=30.0) 

56 

57 

58def extraction_to_infiltration( 

59 cout, 

60 flow, 

61 aquifer_pore_volume, 

62 diffusivity=0.1, 

63 retardation_factor=1.0, 

64 aquifer_length=80.0, 

65 porosity=0.35, 

66): 

67 """Compute the reverse diffusion of a compound during 1D transport in the aquifer. 

68 

69 This function represents extraction to infiltration modeling (equivalent to deconvolution). 

70 

71 Parameters 

72 ---------- 

73 cout : pandas.Series 

74 Concentration or temperature of the compound in the extracted water [ng/m3]. 

75 flow : pandas.Series 

76 Flow rate of water in the aquifer [m3/day]. 

77 aquifer_pore_volume : float 

78 Pore volume of the aquifer [m3]. 

79 diffusivity : float, optional 

80 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1. 

81 retardation_factor : float, optional 

82 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0. 

83 aquifer_length : float, optional 

84 Length of the aquifer [m]. Default is 80.0. 

85 porosity : float, optional 

86 Porosity of the aquifer [dimensionless]. Default is 0.35. 

87 

88 Returns 

89 ------- 

90 numpy.ndarray 

91 Concentration of the compound in the infiltrating water [ng/m3]. 

92 

93 Notes 

94 ----- 

95 Extraction to infiltration diffusion (deconvolution) is mathematically ill-posed and requires 

96 regularization to obtain a stable solution. 

97 """ 

98 msg = "Extraction to infiltration diffusion (deconvolution) is not implemented yet" 

99 raise NotImplementedError(msg) 

100 

101 

102def compute_sigma_array( 

103 flow: pd.Series, 

104 aquifer_pore_volume: float, 

105 diffusivity: float = 0.1, 

106 retardation_factor: float = 1.0, 

107 aquifer_length: float = 80.0, 

108 porosity: float = 0.35, 

109) -> np.ndarray: 

110 """Compute sigma values for diffusion based on flow and aquifer properties. 

111 

112 Parameters 

113 ---------- 

114 flow : pandas.Series 

115 Flow rate of water in the aquifer [m3/day]. 

116 aquifer_pore_volume : float 

117 Pore volume of the aquifer [m3]. 

118 diffusivity : float, optional 

119 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1. 

120 retardation_factor : float, optional 

121 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0. 

122 aquifer_length : float, optional 

123 Length of the aquifer [m]. Default is 80.0. 

124 porosity : float, optional 

125 Porosity of the aquifer [dimensionless]. Default is 0.35. 

126 

127 Returns 

128 ------- 

129 numpy.ndarray 

130 Array of sigma values for diffusion. 

131 """ 

132 # Create flow tedges from the flow series index (assuming it's at the end of bins) 

133 flow_tedges = compute_time_edges( 

134 tedges=None, tstart=None, tend=pd.DatetimeIndex(flow.index), number_of_bins=len(flow) 

135 ) 

136 residence_time_series = residence_time( 

137 flow=flow, 

138 flow_tedges=flow_tedges, 

139 aquifer_pore_volume=aquifer_pore_volume, 

140 retardation_factor=retardation_factor, 

141 direction="infiltration_to_extraction", 

142 return_pandas_series=True, 

143 ) 

144 residence_time_series = residence_time_series.interpolate(method="nearest").ffill().bfill() 

145 timedelta_at_departure = diff(flow.index, alignment="right") / pd.to_timedelta(1, unit="D") 

146 volume_infiltrated_at_departure = flow * timedelta_at_departure 

147 cross_sectional_area = aquifer_pore_volume / aquifer_length 

148 dx = volume_infiltrated_at_departure / cross_sectional_area / porosity 

149 sigma_array = np.sqrt(2 * diffusivity * residence_time_series) / dx 

150 return np.clip(a=sigma_array.values, a_min=0.0, a_max=100) 

151 

152 

153def convolve_diffusion(input_signal, sigma_array, truncate=4.0): 

154 """Apply Gaussian filter with position-dependent sigma values. 

155 

156 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard 

157 deviation (sigma) of the Gaussian kernel to vary at each point in the signal. 

158 It implements the filter using a sparse convolution matrix where each row 

159 represents a Gaussian kernel with a locally-appropriate standard deviation. 

160 

161 Parameters 

162 ---------- 

163 input_signal : numpy.ndarray 

164 One-dimensional input array to be filtered. 

165 sigma_array : numpy.ndarray 

166 One-dimensional array of standard deviation values, must have same length 

167 as input_signal. Each value specifies the Gaussian kernel width at the 

168 corresponding position. 

169 truncate : float, optional 

170 Truncate the filter at this many standard deviations. 

171 Default is 4.0. 

172 

173 Returns 

174 ------- 

175 numpy.ndarray 

176 The filtered input signal. Has the same shape as input_signal. 

177 

178 Notes 

179 ----- 

180 At the boundaries, the outer values are repeated to avoid edge effects. Equal to mode=`nearest` 

181 in `scipy.ndimage.gaussian_filter1d`. 

182 

183 The function constructs a sparse convolution matrix where each row represents 

184 a position-specific Gaussian kernel. The kernel width adapts to local sigma 

185 values, making it suitable for problems with varying diffusivitys 

186 or time steps. 

187 

188 For diffusion problems, the local sigma values can be calculated as: 

189 sigma = sqrt(2 * diffusivity * dt) / dx 

190 where diffusivity is the diffusivity, dt is the time step, and dx is the 

191 spatial step size. 

192 

193 The implementation uses sparse matrices for memory efficiency when dealing 

194 with large signals or when sigma values vary significantly. 

195 

196 See Also 

197 -------- 

198 scipy.ndimage.gaussian_filter1d : Fixed-sigma Gaussian filtering 

199 scipy.sparse : Sparse matrix implementations 

200 

201 Examples 

202 -------- 

203 >>> # Create a sample signal 

204 >>> x = np.linspace(0, 10, 1000) 

205 >>> signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5) 

206 

207 >>> # Create position-dependent sigma values 

208 >>> diffusivity = 0.1 # diffusivity 

209 >>> dt = 0.001 * (1 + np.sin(2 * np.pi * x / 10)) # Varying time steps 

210 >>> dx = x[1] - x[0] 

211 >>> sigma_array = np.sqrt(2 * diffusivity * dt) / dx 

212 

213 >>> # Apply the filter 

214 >>> filtered = convolve_diffusion(signal, sigma_array) 

215 """ 

216 if len(input_signal) != len(sigma_array): 

217 msg = "Input signal and sigma array must have the same length" 

218 raise ValueError(msg) 

219 

220 n = len(input_signal) 

221 

222 # Handle zero sigma values 

223 zero_mask = sigma_array == 0 

224 if np.all(zero_mask): 

225 return input_signal.copy() 

226 

227 # Get maximum kernel size and create position arrays 

228 max_sigma = np.max(sigma_array) 

229 max_radius = int(truncate * max_sigma + 0.5) 

230 

231 # Create arrays for all possible kernel positions 

232 positions = np.arange(-max_radius, max_radius + 1) 

233 

234 # Create a mask for valid sigma values 

235 valid_sigma = ~zero_mask 

236 valid_indices = np.where(valid_sigma)[0] 

237 

238 # Create position matrices for broadcasting 

239 # Shape: (n_valid_points, 1) 

240 center_positions = valid_indices[:, np.newaxis] 

241 # Shape: (1, max_kernel_size) 

242 kernel_positions = positions[np.newaxis, :] 

243 

244 # Calculate the relative positions for each point 

245 # This creates a matrix of shape (n_valid_points, max_kernel_size) 

246 relative_positions = kernel_positions 

247 

248 # Calculate Gaussian weights for all positions at once 

249 # Using broadcasting to create a matrix of shape (n_valid_points, max_kernel_size) 

250 sigmas = sigma_array[valid_sigma][:, np.newaxis] 

251 weights = np.exp(-0.5 * (relative_positions / sigmas) ** 2) 

252 

253 # Normalize each kernel 

254 weights /= np.sum(weights, axis=1)[:, np.newaxis] 

255 

256 # Calculate absolute positions in the signal 

257 absolute_positions = center_positions + relative_positions 

258 

259 # Handle boundary conditions 

260 absolute_positions = np.clip(absolute_positions, 0, n - 1) 

261 

262 # Create coordinate arrays for sparse matrix 

263 rows = np.repeat(center_positions, weights.shape[1]) 

264 cols = absolute_positions.ravel() 

265 data = weights.ravel() 

266 

267 # Remove zero weights to save memory 

268 nonzero_mask = data != 0 

269 rows = rows[nonzero_mask] 

270 cols = cols[nonzero_mask] 

271 data = data[nonzero_mask] 

272 

273 # Add identity matrix elements for zero-sigma positions 

274 if np.any(zero_mask): 

275 zero_indices = np.where(zero_mask)[0] 

276 rows = np.concatenate([rows, zero_indices]) 

277 cols = np.concatenate([cols, zero_indices]) 

278 data = np.concatenate([data, np.ones(len(zero_indices))]) 

279 

280 # Create the sparse matrix 

281 conv_matrix = sparse.csr_matrix((data, (rows, cols)), shape=(n, n)) 

282 

283 # remove diffusion from signal with inverse of the convolution matrix 

284 # conv_matrix_inv = np.linalg.lstsq(conv_matrix.todense(), np.eye(n), rcond=None)[0] 

285 

286 # Apply the filter 

287 return conv_matrix.dot(input_signal) 

288 

289 

290def deconvolve_diffusion(output_signal, sigma_array, truncate=4.0): 

291 """Apply Gaussian deconvolution with position-dependent sigma values. 

292 

293 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard 

294 deviation (sigma) of the Gaussian kernel to vary at each point in the signal. 

295 It implements the filter using a sparse convolution matrix where each row 

296 represents a Gaussian kernel with a locally-appropriate standard deviation. 

297 

298 Parameters 

299 ---------- 

300 output_signal : numpy.ndarray 

301 One-dimensional input array to be filtered. 

302 sigma_array : numpy.ndarray 

303 One-dimensional array of standard deviation values, must have same length 

304 as output_signal. Each value specifies the Gaussian kernel width at the 

305 corresponding position. 

306 truncate : float, optional 

307 Truncate the filter at this many standard deviations. 

308 Default is 4.0. 

309 

310 Returns 

311 ------- 

312 numpy.ndarray 

313 The filtered output signal. Has the same shape as output_signal. 

314 """ 

315 msg = "Deconvolution is not implemented yet" 

316 raise NotImplementedError(msg) 

317 

318 

319def create_example_data(nx=1000, domain_length=10.0, diffusivity=0.1): 

320 """Create example data for demonstrating variable-sigma diffusion. 

321 

322 Parameters 

323 ---------- 

324 nx : int, optional 

325 Number of spatial points. Default is 1000. 

326 domain_length : float, optional 

327 Domain length. Default is 10.0. 

328 diffusivity : float, optional 

329 diffusivity. Default is 0.1. 

330 

331 Returns 

332 ------- 

333 x : numpy.ndarray 

334 Spatial coordinates. 

335 signal : numpy.ndarray 

336 Initial signal (sum of two Gaussians). 

337 sigma_array : numpy.ndarray 

338 Array of sigma values varying in space. 

339 dt : numpy.ndarray 

340 Array of time steps varying in space. 

341 

342 Notes 

343 ----- 

344 This function creates a test case with: 

345 - A signal composed of two Gaussian peaks 

346 - Sinusoidally varying time steps 

347 - Corresponding sigma values for diffusion 

348 """ 

349 # Create spatial grid 

350 x = np.linspace(0, domain_length, nx) 

351 dx = x[1] - x[0] 

352 

353 # Create initial signal (two Gaussians) 

354 signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5) + 0.1 * np.random.randn(nx) 

355 

356 # Create varying time steps 

357 dt = 0.001 * (1 + np.sin(2 * np.pi * x / domain_length)) 

358 

359 # Calculate corresponding sigma values 

360 sigma_array = np.sqrt(2 * diffusivity * dt) / dx 

361 

362 return x, signal, sigma_array, dt 

363 

364 

365if __name__ == "__main__": 

366 from matplotlib import pyplot as plt 

367 

368 # Generate example data 

369 x, signal, sigma_array, dt = create_example_data() 

370 

371 # Apply variable-sigma filtering 

372 filtered = convolve_diffusion(signal, sigma_array * 5) 

373 

374 # Compare with regular Gaussian filter 

375 avg_sigma = np.mean(sigma_array) 

376 regular_filtered = ndimage.gaussian_filter1d(signal, avg_sigma) 

377 plt.figure(figsize=(10, 6)) 

378 plt.plot(x, signal, label="Original signal", lw=0.8) 

379 plt.plot(x, filtered, label="Variable-sigma filtered", lw=1.0) 

380 

381 plt.plot(x, regular_filtered, label="Regular Gaussian filter", lw=0.8, ls="--")