Coverage for src/gwtransport/diffusion.py: 0%

77 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-22 18:30 +0000

1"""Module that implements diffusion.""" 

2 

3import matplotlib.pyplot as plt 

4import numpy as np 

5import pandas as pd 

6from scipy import ndimage, sparse 

7 

8from gwtransport.residence_time import residence_time 

9from gwtransport.utils import compute_time_edges, diff 

10 

11 

12def forward( 

13 cin, 

14 flow, 

15 aquifer_pore_volume, 

16 diffusivity=0.1, 

17 retardation_factor=1.0, 

18 aquifer_length=80.0, 

19 porosity=0.35, 

20): 

21 """Compute the diffusion of a compound during 1D transport in the aquifer. 

22 

23 This function represents a forward operation (equivalent to convolution). 

24 

25 Parameters 

26 ---------- 

27 cin : pandas.Series 

28 Concentration or temperature of the compound in the infiltrating water [ng/m3]. 

29 flow : pandas.Series 

30 Flow rate of water in the aquifer [m3/day]. 

31 aquifer_pore_volume : float 

32 Pore volume of the aquifer [m3]. 

33 diffusivity : float, optional 

34 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1. 

35 retardation_factor : float, optional 

36 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0. 

37 aquifer_length : float, optional 

38 Length of the aquifer [m]. Default is 80.0. 

39 porosity : float, optional 

40 Porosity of the aquifer [dimensionless]. Default is 0.35. 

41 

42 Returns 

43 ------- 

44 numpy.ndarray 

45 Concentration of the compound in the extracted water [ng/m3]. 

46 """ 

47 sigma_array = compute_sigma_array( 

48 flow=flow, 

49 aquifer_pore_volume=aquifer_pore_volume, 

50 diffusivity=diffusivity, 

51 retardation_factor=retardation_factor, 

52 aquifer_length=aquifer_length, 

53 porosity=porosity, 

54 ) 

55 return convolve_diffusion(cin.values, sigma_array, truncate=30.0) 

56 

57 

58def backward( 

59 cout, 

60 flow, 

61 aquifer_pore_volume, 

62 diffusivity=0.1, 

63 retardation_factor=1.0, 

64 aquifer_length=80.0, 

65 porosity=0.35, 

66): 

67 """Compute the reverse diffusion of a compound during 1D transport in the aquifer. 

68 

69 This function represents a backward operation (equivalent to deconvolution). 

70 

71 Parameters 

72 ---------- 

73 cout : pandas.Series 

74 Concentration or temperature of the compound in the extracted water [ng/m3]. 

75 flow : pandas.Series 

76 Flow rate of water in the aquifer [m3/day]. 

77 aquifer_pore_volume : float 

78 Pore volume of the aquifer [m3]. 

79 diffusivity : float, optional 

80 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1. 

81 retardation_factor : float, optional 

82 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0. 

83 aquifer_length : float, optional 

84 Length of the aquifer [m]. Default is 80.0. 

85 porosity : float, optional 

86 Porosity of the aquifer [dimensionless]. Default is 0.35. 

87 

88 Returns 

89 ------- 

90 numpy.ndarray 

91 Concentration of the compound in the infiltrating water [ng/m3]. 

92 

93 Notes 

94 ----- 

95 Backward diffusion (deconvolution) is mathematically ill-posed and requires 

96 regularization to obtain a stable solution. 

97 """ 

98 msg = "Backward diffusion (deconvolution) is not implemented yet" 

99 raise NotImplementedError(msg) 

100 

101 

102def compute_sigma_array( 

103 flow, aquifer_pore_volume, diffusivity=0.1, retardation_factor=1.0, aquifer_length=80.0, porosity=0.35 

104): 

105 """Compute sigma values for diffusion based on flow and aquifer properties. 

106 

107 Parameters 

108 ---------- 

109 flow : pandas.Series 

110 Flow rate of water in the aquifer [m3/day]. 

111 aquifer_pore_volume : float 

112 Pore volume of the aquifer [m3]. 

113 diffusivity : float, optional 

114 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1. 

115 retardation_factor : float, optional 

116 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0. 

117 aquifer_length : float, optional 

118 Length of the aquifer [m]. Default is 80.0. 

119 porosity : float, optional 

120 Porosity of the aquifer [dimensionless]. Default is 0.35. 

121 

122 Returns 

123 ------- 

124 array 

125 Array of sigma values for diffusion. 

126 """ 

127 # Create flow tedges from the flow series index (assuming it's at the end of bins) 

128 flow_tedges = compute_time_edges(tedges=None, tstart=None, tend=flow.index, number_of_bins=len(flow)) 

129 residence_time = residence_time( 

130 flow=flow, 

131 flow_tedges=flow_tedges, 

132 aquifer_pore_volume=aquifer_pore_volume, 

133 retardation_factor=retardation_factor, 

134 direction="infiltration", 

135 return_pandas_series=True, 

136 ) 

137 residence_time = residence_time.interpolate(method="nearest").ffill().bfill() 

138 timedelta_at_departure = diff(flow.index, alignment="right") / pd.to_timedelta(1, unit="D") 

139 volume_infiltrated_at_departure = flow * timedelta_at_departure 

140 cross_sectional_area = aquifer_pore_volume / aquifer_length 

141 dx = volume_infiltrated_at_departure / cross_sectional_area / porosity 

142 sigma_array = np.sqrt(2 * diffusivity * residence_time) / dx 

143 return np.clip(a=sigma_array.values, a_min=0.0, a_max=100) 

144 

145 

146def convolve_diffusion(input_signal, sigma_array, truncate=4.0): 

147 """Apply Gaussian filter with position-dependent sigma values. 

148 

149 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard 

150 deviation (sigma) of the Gaussian kernel to vary at each point in the signal. 

151 It implements the filter using a sparse convolution matrix where each row 

152 represents a Gaussian kernel with a locally-appropriate standard deviation. 

153 

154 Parameters 

155 ---------- 

156 input_signal : ndarray 

157 One-dimensional input array to be filtered. 

158 sigma_array : ndarray 

159 One-dimensional array of standard deviation values, must have same length 

160 as input_signal. Each value specifies the Gaussian kernel width at the 

161 corresponding position. 

162 truncate : float, optional 

163 Truncate the filter at this many standard deviations. 

164 Default is 4.0. 

165 

166 Returns 

167 ------- 

168 ndarray 

169 The filtered input signal. Has the same shape as input_signal. 

170 

171 Notes 

172 ----- 

173 At the boundaries, the outer values are repeated to avoid edge effects. Equal to mode=`nearest` 

174 in `scipy.ndimage.gaussian_filter1d`. 

175 

176 The function constructs a sparse convolution matrix where each row represents 

177 a position-specific Gaussian kernel. The kernel width adapts to local sigma 

178 values, making it suitable for problems with varying diffusivitys 

179 or time steps. 

180 

181 For diffusion problems, the local sigma values can be calculated as: 

182 sigma = sqrt(2 * diffusivity * dt) / dx 

183 where diffusivity is the diffusivity, dt is the time step, and dx is the 

184 spatial step size. 

185 

186 The implementation uses sparse matrices for memory efficiency when dealing 

187 with large signals or when sigma values vary significantly. 

188 

189 See Also 

190 -------- 

191 scipy.ndimage.gaussian_filter1d : Fixed-sigma Gaussian filtering 

192 scipy.sparse : Sparse matrix implementations 

193 

194 Examples 

195 -------- 

196 >>> # Create a sample signal 

197 >>> x = np.linspace(0, 10, 1000) 

198 >>> signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5) 

199 

200 >>> # Create position-dependent sigma values 

201 >>> diffusivity = 0.1 # diffusivity 

202 >>> dt = 0.001 * (1 + np.sin(2 * np.pi * x / 10)) # Varying time steps 

203 >>> dx = x[1] - x[0] 

204 >>> sigma_array = np.sqrt(2 * diffusivity * dt) / dx 

205 

206 >>> # Apply the filter 

207 >>> filtered = convolve_diffusion(signal, sigma_array) 

208 """ 

209 if len(input_signal) != len(sigma_array): 

210 msg = "Input signal and sigma array must have the same length" 

211 raise ValueError(msg) 

212 

213 n = len(input_signal) 

214 

215 # Handle zero sigma values 

216 zero_mask = sigma_array == 0 

217 if np.all(zero_mask): 

218 return input_signal.copy() 

219 

220 # Get maximum kernel size and create position arrays 

221 max_sigma = np.max(sigma_array) 

222 max_radius = int(truncate * max_sigma + 0.5) 

223 

224 # Create arrays for all possible kernel positions 

225 positions = np.arange(-max_radius, max_radius + 1) 

226 

227 # Create a mask for valid sigma values 

228 valid_sigma = ~zero_mask 

229 valid_indices = np.where(valid_sigma)[0] 

230 

231 # Create position matrices for broadcasting 

232 # Shape: (n_valid_points, 1) 

233 center_positions = valid_indices[:, np.newaxis] 

234 # Shape: (1, max_kernel_size) 

235 kernel_positions = positions[np.newaxis, :] 

236 

237 # Calculate the relative positions for each point 

238 # This creates a matrix of shape (n_valid_points, max_kernel_size) 

239 relative_positions = kernel_positions 

240 

241 # Calculate Gaussian weights for all positions at once 

242 # Using broadcasting to create a matrix of shape (n_valid_points, max_kernel_size) 

243 sigmas = sigma_array[valid_sigma][:, np.newaxis] 

244 weights = np.exp(-0.5 * (relative_positions / sigmas) ** 2) 

245 

246 # Normalize each kernel 

247 weights /= np.sum(weights, axis=1)[:, np.newaxis] 

248 

249 # Calculate absolute positions in the signal 

250 absolute_positions = center_positions + relative_positions 

251 

252 # Handle boundary conditions 

253 absolute_positions = np.clip(absolute_positions, 0, n - 1) 

254 

255 # Create coordinate arrays for sparse matrix 

256 rows = np.repeat(center_positions, weights.shape[1]) 

257 cols = absolute_positions.ravel() 

258 data = weights.ravel() 

259 

260 # Remove zero weights to save memory 

261 nonzero_mask = data != 0 

262 rows = rows[nonzero_mask] 

263 cols = cols[nonzero_mask] 

264 data = data[nonzero_mask] 

265 

266 # Add identity matrix elements for zero-sigma positions 

267 if np.any(zero_mask): 

268 zero_indices = np.where(zero_mask)[0] 

269 rows = np.concatenate([rows, zero_indices]) 

270 cols = np.concatenate([cols, zero_indices]) 

271 data = np.concatenate([data, np.ones(len(zero_indices))]) 

272 

273 # Create the sparse matrix 

274 conv_matrix = sparse.csr_matrix((data, (rows, cols)), shape=(n, n)) 

275 

276 # remove diffusion from signal with inverse of the convolution matrix 

277 # conv_matrix_inv = np.linalg.lstsq(conv_matrix.todense(), np.eye(n), rcond=None)[0] 

278 

279 # Apply the filter 

280 return conv_matrix.dot(input_signal) 

281 

282 

283def deconvolve_diffusion(output_signal, sigma_array, truncate=4.0): 

284 """Apply Gaussian deconvolution with position-dependent sigma values. 

285 

286 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard 

287 deviation (sigma) of the Gaussian kernel to vary at each point in the signal. 

288 It implements the filter using a sparse convolution matrix where each row 

289 represents a Gaussian kernel with a locally-appropriate standard deviation. 

290 

291 Parameters 

292 ---------- 

293 output_signal : ndarray 

294 One-dimensional input array to be filtered. 

295 sigma_array : ndarray 

296 One-dimensional array of standard deviation values, must have same length 

297 as output_signal. Each value specifies the Gaussian kernel width at the 

298 corresponding position. 

299 truncate : float, optional 

300 Truncate the filter at this many standard deviations. 

301 Default is 4.0. 

302 

303 Returns 

304 ------- 

305 ndarray 

306 The filtered output signal. Has the same shape as output_signal. 

307 """ 

308 msg = "Deconvolution is not implemented yet" 

309 raise NotImplementedError(msg) 

310 

311 

312def create_example_data(nx=1000, domain_length=10.0, diffusivity=0.1): 

313 """Create example data for demonstrating variable-sigma diffusion. 

314 

315 Parameters 

316 ---------- 

317 nx : int, optional 

318 Number of spatial points. Default is 1000. 

319 domain_length : float, optional 

320 Domain length. Default is 10.0. 

321 diffusivity : float, optional 

322 diffusivity. Default is 0.1. 

323 

324 Returns 

325 ------- 

326 x : ndarray 

327 Spatial coordinates. 

328 signal : ndarray 

329 Initial signal (sum of two Gaussians). 

330 sigma_array : ndarray 

331 Array of sigma values varying in space. 

332 dt : ndarray 

333 Array of time steps varying in space. 

334 

335 Notes 

336 ----- 

337 This function creates a test case with: 

338 - A signal composed of two Gaussian peaks 

339 - Sinusoidally varying time steps 

340 - Corresponding sigma values for diffusion 

341 """ 

342 # Create spatial grid 

343 x = np.linspace(0, domain_length, nx) 

344 dx = x[1] - x[0] 

345 

346 # Create initial signal (two Gaussians) 

347 signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5) + 0.1 * np.random.randn(nx) 

348 

349 # Create varying time steps 

350 dt = 0.001 * (1 + np.sin(2 * np.pi * x / domain_length)) 

351 

352 # Calculate corresponding sigma values 

353 sigma_array = np.sqrt(2 * diffusivity * dt) / dx 

354 

355 return x, signal, sigma_array, dt 

356 

357 

358if __name__ == "__main__": 

359 from matplotlib import pyplot as plt 

360 

361 # Generate example data 

362 x, signal, sigma_array, dt = create_example_data() 

363 

364 # Apply variable-sigma filtering 

365 filtered = convolve_diffusion(signal, sigma_array * 5) 

366 

367 # Compare with regular Gaussian filter 

368 avg_sigma = np.mean(sigma_array) 

369 regular_filtered = ndimage.gaussian_filter1d(signal, avg_sigma) 

370 plt.figure(figsize=(10, 6)) 

371 plt.plot(x, signal, label="Original signal", lw=0.8) 

372 plt.plot(x, filtered, label="Variable-sigma filtered", lw=1.0) 

373 

374 plt.plot(x, regular_filtered, label="Regular Gaussian filter", lw=0.8, ls="--")