Coverage for src/gwtransport/diffusion.py: 57%

76 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-06 15:45 +0000

1"""Module that implements diffusion.""" 

2 

3import matplotlib.pyplot as plt 

4import numpy as np 

5import pandas as pd 

6from scipy import ndimage, sparse 

7 

8from gwtransport.residence_time import residence_time 

9from gwtransport.utils import diff 

10 

11 

12def forward( 

13 cin, 

14 flow, 

15 aquifer_pore_volume, 

16 diffusivity=0.1, 

17 retardation_factor=1.0, 

18 aquifer_length=80.0, 

19 porosity=0.35, 

20): 

21 """Compute the diffusion of a compound during 1D transport in the aquifer. 

22 

23 This function represents a forward operation (equivalent to convolution). 

24 

25 Parameters 

26 ---------- 

27 cin : pandas.Series 

28 Concentration or temperature of the compound in the infiltrating water [ng/m3]. 

29 flow : pandas.Series 

30 Flow rate of water in the aquifer [m3/day]. 

31 aquifer_pore_volume : float 

32 Pore volume of the aquifer [m3]. 

33 diffusivity : float, optional 

34 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1. 

35 retardation_factor : float, optional 

36 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0. 

37 aquifer_length : float, optional 

38 Length of the aquifer [m]. Default is 80.0. 

39 porosity : float, optional 

40 Porosity of the aquifer [dimensionless]. Default is 0.35. 

41 

42 Returns 

43 ------- 

44 pandas.Series 

45 Concentration of the compound in the extracted water [ng/m3]. 

46 """ 

47 sigma_array = compute_sigma_array( 

48 flow=flow, 

49 aquifer_pore_volume=aquifer_pore_volume, 

50 diffusivity=diffusivity, 

51 retardation_factor=retardation_factor, 

52 aquifer_length=aquifer_length, 

53 porosity=porosity, 

54 ) 

55 return convolve_diffusion(cin.values, sigma_array, truncate=30.0) 

56 

57 

58def backward( 

59 cout, 

60 flow, 

61 aquifer_pore_volume, 

62 diffusivity=0.1, 

63 retardation_factor=1.0, 

64 aquifer_length=80.0, 

65 porosity=0.35, 

66): 

67 """Compute the reverse diffusion of a compound during 1D transport in the aquifer. 

68 

69 This function represents a backward operation (equivalent to deconvolution). 

70 

71 Parameters 

72 ---------- 

73 cout : pandas.Series 

74 Concentration or temperature of the compound in the extracted water [ng/m3]. 

75 flow : pandas.Series 

76 Flow rate of water in the aquifer [m3/day]. 

77 aquifer_pore_volume : float 

78 Pore volume of the aquifer [m3]. 

79 diffusivity : float, optional 

80 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1. 

81 retardation_factor : float, optional 

82 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0. 

83 aquifer_length : float, optional 

84 Length of the aquifer [m]. Default is 80.0. 

85 porosity : float, optional 

86 Porosity of the aquifer [dimensionless]. Default is 0.35. 

87 

88 Returns 

89 ------- 

90 pandas.Series 

91 Concentration of the compound in the infiltrating water [ng/m3]. 

92 

93 Notes 

94 ----- 

95 Backward diffusion (deconvolution) is mathematically ill-posed and requires 

96 regularization to obtain a stable solution. 

97 """ 

98 msg = "Backward diffusion (deconvolution) is not implemented yet" 

99 raise NotImplementedError(msg) 

100 

101 

102def compute_sigma_array( 

103 flow, aquifer_pore_volume, diffusivity=0.1, retardation_factor=1.0, aquifer_length=80.0, porosity=0.35 

104): 

105 """Compute sigma values for diffusion based on flow and aquifer properties. 

106 

107 Parameters 

108 ---------- 

109 flow : pandas.Series 

110 Flow rate of water in the aquifer [m3/day]. 

111 aquifer_pore_volume : float 

112 Pore volume of the aquifer [m3]. 

113 diffusivity : float, optional 

114 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1. 

115 retardation_factor : float, optional 

116 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0. 

117 aquifer_length : float, optional 

118 Length of the aquifer [m]. Default is 80.0. 

119 porosity : float, optional 

120 Porosity of the aquifer [dimensionless]. Default is 0.35. 

121 

122 Returns 

123 ------- 

124 array 

125 Array of sigma values for diffusion. 

126 """ 

127 residence_time = residence_time( 

128 flow=flow, 

129 aquifer_pore_volume=aquifer_pore_volume, 

130 retardation_factor=retardation_factor, 

131 direction="infiltration", 

132 return_pandas_series=True, 

133 ) 

134 residence_time = residence_time.interpolate(method="nearest").ffill().bfill() 

135 timedelta_at_departure = diff(flow.index, alignment="right") / pd.to_timedelta(1, unit="D") 

136 volume_infiltrated_at_departure = flow * timedelta_at_departure 

137 cross_sectional_area = aquifer_pore_volume / aquifer_length 

138 dx = volume_infiltrated_at_departure / cross_sectional_area / porosity 

139 sigma_array = np.sqrt(2 * diffusivity * residence_time) / dx 

140 return np.clip(a=sigma_array.values, a_min=0.0, a_max=100) 

141 

142 

143def convolve_diffusion(input_signal, sigma_array, truncate=4.0): 

144 """Apply Gaussian filter with position-dependent sigma values. 

145 

146 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard 

147 deviation (sigma) of the Gaussian kernel to vary at each point in the signal. 

148 It implements the filter using a sparse convolution matrix where each row 

149 represents a Gaussian kernel with a locally-appropriate standard deviation. 

150 

151 Parameters 

152 ---------- 

153 input_signal : ndarray 

154 One-dimensional input array to be filtered. 

155 sigma_array : ndarray 

156 One-dimensional array of standard deviation values, must have same length 

157 as input_signal. Each value specifies the Gaussian kernel width at the 

158 corresponding position. 

159 truncate : float, optional 

160 Truncate the filter at this many standard deviations. 

161 Default is 4.0. 

162 

163 Returns 

164 ------- 

165 ndarray 

166 The filtered input signal. Has the same shape as input_signal. 

167 

168 Notes 

169 ----- 

170 At the boundaries, the outer values are repeated to avoid edge effects. Equal to mode=`nearest` 

171 in `scipy.ndimage.gaussian_filter1d`. 

172 

173 The function constructs a sparse convolution matrix where each row represents 

174 a position-specific Gaussian kernel. The kernel width adapts to local sigma 

175 values, making it suitable for problems with varying diffusivitys 

176 or time steps. 

177 

178 For diffusion problems, the local sigma values can be calculated as: 

179 sigma = sqrt(2 * diffusivity * dt) / dx 

180 where diffusivity is the diffusivity, dt is the time step, and dx is the 

181 spatial step size. 

182 

183 The implementation uses sparse matrices for memory efficiency when dealing 

184 with large signals or when sigma values vary significantly. 

185 

186 See Also 

187 -------- 

188 scipy.ndimage.gaussian_filter1d : Fixed-sigma Gaussian filtering 

189 scipy.sparse : Sparse matrix implementations 

190 

191 Examples 

192 -------- 

193 >>> # Create a sample signal 

194 >>> x = np.linspace(0, 10, 1000) 

195 >>> signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5) 

196 

197 >>> # Create position-dependent sigma values 

198 >>> diffusivity = 0.1 # diffusivity 

199 >>> dt = 0.001 * (1 + np.sin(2 * np.pi * x / 10)) # Varying time steps 

200 >>> dx = x[1] - x[0] 

201 >>> sigma_array = np.sqrt(2 * diffusivity * dt) / dx 

202 

203 >>> # Apply the filter 

204 >>> filtered = convolve_diffusion(signal, sigma_array) 

205 """ 

206 if len(input_signal) != len(sigma_array): 

207 msg = "Input signal and sigma array must have the same length" 

208 raise ValueError(msg) 

209 

210 n = len(input_signal) 

211 

212 # Handle zero sigma values 

213 zero_mask = sigma_array == 0 

214 if np.all(zero_mask): 

215 return input_signal.copy() 

216 

217 # Get maximum kernel size and create position arrays 

218 max_sigma = np.max(sigma_array) 

219 max_radius = int(truncate * max_sigma + 0.5) 

220 

221 # Create arrays for all possible kernel positions 

222 positions = np.arange(-max_radius, max_radius + 1) 

223 

224 # Create a mask for valid sigma values 

225 valid_sigma = ~zero_mask 

226 valid_indices = np.where(valid_sigma)[0] 

227 

228 # Create position matrices for broadcasting 

229 # Shape: (n_valid_points, 1) 

230 center_positions = valid_indices[:, np.newaxis] 

231 # Shape: (1, max_kernel_size) 

232 kernel_positions = positions[np.newaxis, :] 

233 

234 # Calculate the relative positions for each point 

235 # This creates a matrix of shape (n_valid_points, max_kernel_size) 

236 relative_positions = kernel_positions 

237 

238 # Calculate Gaussian weights for all positions at once 

239 # Using broadcasting to create a matrix of shape (n_valid_points, max_kernel_size) 

240 sigmas = sigma_array[valid_sigma][:, np.newaxis] 

241 weights = np.exp(-0.5 * (relative_positions / sigmas) ** 2) 

242 

243 # Normalize each kernel 

244 weights /= np.sum(weights, axis=1)[:, np.newaxis] 

245 

246 # Calculate absolute positions in the signal 

247 absolute_positions = center_positions + relative_positions 

248 

249 # Handle boundary conditions 

250 absolute_positions = np.clip(absolute_positions, 0, n - 1) 

251 

252 # Create coordinate arrays for sparse matrix 

253 rows = np.repeat(center_positions, weights.shape[1]) 

254 cols = absolute_positions.ravel() 

255 data = weights.ravel() 

256 

257 # Remove zero weights to save memory 

258 nonzero_mask = data != 0 

259 rows = rows[nonzero_mask] 

260 cols = cols[nonzero_mask] 

261 data = data[nonzero_mask] 

262 

263 # Add identity matrix elements for zero-sigma positions 

264 if np.any(zero_mask): 

265 zero_indices = np.where(zero_mask)[0] 

266 rows = np.concatenate([rows, zero_indices]) 

267 cols = np.concatenate([cols, zero_indices]) 

268 data = np.concatenate([data, np.ones(len(zero_indices))]) 

269 

270 # Create the sparse matrix 

271 conv_matrix = sparse.csr_matrix((data, (rows, cols)), shape=(n, n)) 

272 

273 # remove diffusion from signal with inverse of the convolution matrix 

274 # conv_matrix_inv = np.linalg.lstsq(conv_matrix.todense(), np.eye(n), rcond=None)[0] 

275 

276 # Apply the filter 

277 return conv_matrix.dot(input_signal) 

278 

279 

280def deconvolve_diffusion(output_signal, sigma_array, truncate=4.0): 

281 """Apply Gaussian deconvolution with position-dependent sigma values. 

282 

283 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard 

284 deviation (sigma) of the Gaussian kernel to vary at each point in the signal. 

285 It implements the filter using a sparse convolution matrix where each row 

286 represents a Gaussian kernel with a locally-appropriate standard deviation. 

287 

288 Parameters 

289 ---------- 

290 output_signal : ndarray 

291 One-dimensional input array to be filtered. 

292 sigma_array : ndarray 

293 One-dimensional array of standard deviation values, must have same length 

294 as output_signal. Each value specifies the Gaussian kernel width at the 

295 corresponding position. 

296 truncate : float, optional 

297 Truncate the filter at this many standard deviations. 

298 Default is 4.0. 

299 

300 Returns 

301 ------- 

302 ndarray 

303 The filtered output signal. Has the same shape as output_signal. 

304 """ 

305 msg = "Deconvolution is not implemented yet" 

306 raise NotImplementedError(msg) 

307 

308 

309def create_example_data(nx=1000, domain_length=10.0, diffusivity=0.1): 

310 """Create example data for demonstrating variable-sigma diffusion. 

311 

312 Parameters 

313 ---------- 

314 nx : int, optional 

315 Number of spatial points. Default is 1000. 

316 domain_length : float, optional 

317 Domain length. Default is 10.0. 

318 diffusivity : float, optional 

319 diffusivity. Default is 0.1. 

320 

321 Returns 

322 ------- 

323 x : ndarray 

324 Spatial coordinates. 

325 signal : ndarray 

326 Initial signal (sum of two Gaussians). 

327 sigma_array : ndarray 

328 Array of sigma values varying in space. 

329 dt : ndarray 

330 Array of time steps varying in space. 

331 

332 Notes 

333 ----- 

334 This function creates a test case with: 

335 - A signal composed of two Gaussian peaks 

336 - Sinusoidally varying time steps 

337 - Corresponding sigma values for diffusion 

338 """ 

339 # Create spatial grid 

340 x = np.linspace(0, domain_length, nx) 

341 dx = x[1] - x[0] 

342 

343 # Create initial signal (two Gaussians) 

344 signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5) + 0.1 * np.random.randn(nx) 

345 

346 # Create varying time steps 

347 dt = 0.001 * (1 + np.sin(2 * np.pi * x / domain_length)) 

348 

349 # Calculate corresponding sigma values 

350 sigma_array = np.sqrt(2 * diffusivity * dt) / dx 

351 

352 return x, signal, sigma_array, dt 

353 

354 

355if __name__ == "__main__": 

356 from matplotlib import pyplot as plt 

357 

358 # Generate example data 

359 x, signal, sigma_array, dt = create_example_data() 

360 

361 # Apply variable-sigma filtering 

362 filtered = convolve_diffusion(signal, sigma_array * 5) 

363 

364 # Compare with regular Gaussian filter 

365 avg_sigma = np.mean(sigma_array) 

366 regular_filtered = ndimage.gaussian_filter1d(signal, avg_sigma) 

367 plt.figure(figsize=(10, 6)) 

368 plt.plot(x, signal, label="Original signal", lw=0.8) 

369 plt.plot(x, filtered, label="Variable-sigma filtered", lw=1.0) 

370 

371 plt.plot(x, regular_filtered, label="Regular Gaussian filter", lw=0.8, ls="--")