Coverage for src/gwtransport/diffusion.py: 57%
76 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-06 15:45 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-06 15:45 +0000
1"""Module that implements diffusion."""
3import matplotlib.pyplot as plt
4import numpy as np
5import pandas as pd
6from scipy import ndimage, sparse
8from gwtransport.residence_time import residence_time
9from gwtransport.utils import diff
12def forward(
13 cin,
14 flow,
15 aquifer_pore_volume,
16 diffusivity=0.1,
17 retardation_factor=1.0,
18 aquifer_length=80.0,
19 porosity=0.35,
20):
21 """Compute the diffusion of a compound during 1D transport in the aquifer.
23 This function represents a forward operation (equivalent to convolution).
25 Parameters
26 ----------
27 cin : pandas.Series
28 Concentration or temperature of the compound in the infiltrating water [ng/m3].
29 flow : pandas.Series
30 Flow rate of water in the aquifer [m3/day].
31 aquifer_pore_volume : float
32 Pore volume of the aquifer [m3].
33 diffusivity : float, optional
34 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1.
35 retardation_factor : float, optional
36 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0.
37 aquifer_length : float, optional
38 Length of the aquifer [m]. Default is 80.0.
39 porosity : float, optional
40 Porosity of the aquifer [dimensionless]. Default is 0.35.
42 Returns
43 -------
44 pandas.Series
45 Concentration of the compound in the extracted water [ng/m3].
46 """
47 sigma_array = compute_sigma_array(
48 flow=flow,
49 aquifer_pore_volume=aquifer_pore_volume,
50 diffusivity=diffusivity,
51 retardation_factor=retardation_factor,
52 aquifer_length=aquifer_length,
53 porosity=porosity,
54 )
55 return convolve_diffusion(cin.values, sigma_array, truncate=30.0)
58def backward(
59 cout,
60 flow,
61 aquifer_pore_volume,
62 diffusivity=0.1,
63 retardation_factor=1.0,
64 aquifer_length=80.0,
65 porosity=0.35,
66):
67 """Compute the reverse diffusion of a compound during 1D transport in the aquifer.
69 This function represents a backward operation (equivalent to deconvolution).
71 Parameters
72 ----------
73 cout : pandas.Series
74 Concentration or temperature of the compound in the extracted water [ng/m3].
75 flow : pandas.Series
76 Flow rate of water in the aquifer [m3/day].
77 aquifer_pore_volume : float
78 Pore volume of the aquifer [m3].
79 diffusivity : float, optional
80 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1.
81 retardation_factor : float, optional
82 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0.
83 aquifer_length : float, optional
84 Length of the aquifer [m]. Default is 80.0.
85 porosity : float, optional
86 Porosity of the aquifer [dimensionless]. Default is 0.35.
88 Returns
89 -------
90 pandas.Series
91 Concentration of the compound in the infiltrating water [ng/m3].
93 Notes
94 -----
95 Backward diffusion (deconvolution) is mathematically ill-posed and requires
96 regularization to obtain a stable solution.
97 """
98 msg = "Backward diffusion (deconvolution) is not implemented yet"
99 raise NotImplementedError(msg)
102def compute_sigma_array(
103 flow, aquifer_pore_volume, diffusivity=0.1, retardation_factor=1.0, aquifer_length=80.0, porosity=0.35
104):
105 """Compute sigma values for diffusion based on flow and aquifer properties.
107 Parameters
108 ----------
109 flow : pandas.Series
110 Flow rate of water in the aquifer [m3/day].
111 aquifer_pore_volume : float
112 Pore volume of the aquifer [m3].
113 diffusivity : float, optional
114 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1.
115 retardation_factor : float, optional
116 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0.
117 aquifer_length : float, optional
118 Length of the aquifer [m]. Default is 80.0.
119 porosity : float, optional
120 Porosity of the aquifer [dimensionless]. Default is 0.35.
122 Returns
123 -------
124 array
125 Array of sigma values for diffusion.
126 """
127 residence_time = residence_time(
128 flow=flow,
129 aquifer_pore_volume=aquifer_pore_volume,
130 retardation_factor=retardation_factor,
131 direction="infiltration",
132 return_pandas_series=True,
133 )
134 residence_time = residence_time.interpolate(method="nearest").ffill().bfill()
135 timedelta_at_departure = diff(flow.index, alignment="right") / pd.to_timedelta(1, unit="D")
136 volume_infiltrated_at_departure = flow * timedelta_at_departure
137 cross_sectional_area = aquifer_pore_volume / aquifer_length
138 dx = volume_infiltrated_at_departure / cross_sectional_area / porosity
139 sigma_array = np.sqrt(2 * diffusivity * residence_time) / dx
140 return np.clip(a=sigma_array.values, a_min=0.0, a_max=100)
143def convolve_diffusion(input_signal, sigma_array, truncate=4.0):
144 """Apply Gaussian filter with position-dependent sigma values.
146 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard
147 deviation (sigma) of the Gaussian kernel to vary at each point in the signal.
148 It implements the filter using a sparse convolution matrix where each row
149 represents a Gaussian kernel with a locally-appropriate standard deviation.
151 Parameters
152 ----------
153 input_signal : ndarray
154 One-dimensional input array to be filtered.
155 sigma_array : ndarray
156 One-dimensional array of standard deviation values, must have same length
157 as input_signal. Each value specifies the Gaussian kernel width at the
158 corresponding position.
159 truncate : float, optional
160 Truncate the filter at this many standard deviations.
161 Default is 4.0.
163 Returns
164 -------
165 ndarray
166 The filtered input signal. Has the same shape as input_signal.
168 Notes
169 -----
170 At the boundaries, the outer values are repeated to avoid edge effects. Equal to mode=`nearest`
171 in `scipy.ndimage.gaussian_filter1d`.
173 The function constructs a sparse convolution matrix where each row represents
174 a position-specific Gaussian kernel. The kernel width adapts to local sigma
175 values, making it suitable for problems with varying diffusivitys
176 or time steps.
178 For diffusion problems, the local sigma values can be calculated as:
179 sigma = sqrt(2 * diffusivity * dt) / dx
180 where diffusivity is the diffusivity, dt is the time step, and dx is the
181 spatial step size.
183 The implementation uses sparse matrices for memory efficiency when dealing
184 with large signals or when sigma values vary significantly.
186 See Also
187 --------
188 scipy.ndimage.gaussian_filter1d : Fixed-sigma Gaussian filtering
189 scipy.sparse : Sparse matrix implementations
191 Examples
192 --------
193 >>> # Create a sample signal
194 >>> x = np.linspace(0, 10, 1000)
195 >>> signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5)
197 >>> # Create position-dependent sigma values
198 >>> diffusivity = 0.1 # diffusivity
199 >>> dt = 0.001 * (1 + np.sin(2 * np.pi * x / 10)) # Varying time steps
200 >>> dx = x[1] - x[0]
201 >>> sigma_array = np.sqrt(2 * diffusivity * dt) / dx
203 >>> # Apply the filter
204 >>> filtered = convolve_diffusion(signal, sigma_array)
205 """
206 if len(input_signal) != len(sigma_array):
207 msg = "Input signal and sigma array must have the same length"
208 raise ValueError(msg)
210 n = len(input_signal)
212 # Handle zero sigma values
213 zero_mask = sigma_array == 0
214 if np.all(zero_mask):
215 return input_signal.copy()
217 # Get maximum kernel size and create position arrays
218 max_sigma = np.max(sigma_array)
219 max_radius = int(truncate * max_sigma + 0.5)
221 # Create arrays for all possible kernel positions
222 positions = np.arange(-max_radius, max_radius + 1)
224 # Create a mask for valid sigma values
225 valid_sigma = ~zero_mask
226 valid_indices = np.where(valid_sigma)[0]
228 # Create position matrices for broadcasting
229 # Shape: (n_valid_points, 1)
230 center_positions = valid_indices[:, np.newaxis]
231 # Shape: (1, max_kernel_size)
232 kernel_positions = positions[np.newaxis, :]
234 # Calculate the relative positions for each point
235 # This creates a matrix of shape (n_valid_points, max_kernel_size)
236 relative_positions = kernel_positions
238 # Calculate Gaussian weights for all positions at once
239 # Using broadcasting to create a matrix of shape (n_valid_points, max_kernel_size)
240 sigmas = sigma_array[valid_sigma][:, np.newaxis]
241 weights = np.exp(-0.5 * (relative_positions / sigmas) ** 2)
243 # Normalize each kernel
244 weights /= np.sum(weights, axis=1)[:, np.newaxis]
246 # Calculate absolute positions in the signal
247 absolute_positions = center_positions + relative_positions
249 # Handle boundary conditions
250 absolute_positions = np.clip(absolute_positions, 0, n - 1)
252 # Create coordinate arrays for sparse matrix
253 rows = np.repeat(center_positions, weights.shape[1])
254 cols = absolute_positions.ravel()
255 data = weights.ravel()
257 # Remove zero weights to save memory
258 nonzero_mask = data != 0
259 rows = rows[nonzero_mask]
260 cols = cols[nonzero_mask]
261 data = data[nonzero_mask]
263 # Add identity matrix elements for zero-sigma positions
264 if np.any(zero_mask):
265 zero_indices = np.where(zero_mask)[0]
266 rows = np.concatenate([rows, zero_indices])
267 cols = np.concatenate([cols, zero_indices])
268 data = np.concatenate([data, np.ones(len(zero_indices))])
270 # Create the sparse matrix
271 conv_matrix = sparse.csr_matrix((data, (rows, cols)), shape=(n, n))
273 # remove diffusion from signal with inverse of the convolution matrix
274 # conv_matrix_inv = np.linalg.lstsq(conv_matrix.todense(), np.eye(n), rcond=None)[0]
276 # Apply the filter
277 return conv_matrix.dot(input_signal)
280def deconvolve_diffusion(output_signal, sigma_array, truncate=4.0):
281 """Apply Gaussian deconvolution with position-dependent sigma values.
283 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard
284 deviation (sigma) of the Gaussian kernel to vary at each point in the signal.
285 It implements the filter using a sparse convolution matrix where each row
286 represents a Gaussian kernel with a locally-appropriate standard deviation.
288 Parameters
289 ----------
290 output_signal : ndarray
291 One-dimensional input array to be filtered.
292 sigma_array : ndarray
293 One-dimensional array of standard deviation values, must have same length
294 as output_signal. Each value specifies the Gaussian kernel width at the
295 corresponding position.
296 truncate : float, optional
297 Truncate the filter at this many standard deviations.
298 Default is 4.0.
300 Returns
301 -------
302 ndarray
303 The filtered output signal. Has the same shape as output_signal.
304 """
305 msg = "Deconvolution is not implemented yet"
306 raise NotImplementedError(msg)
309def create_example_data(nx=1000, domain_length=10.0, diffusivity=0.1):
310 """Create example data for demonstrating variable-sigma diffusion.
312 Parameters
313 ----------
314 nx : int, optional
315 Number of spatial points. Default is 1000.
316 domain_length : float, optional
317 Domain length. Default is 10.0.
318 diffusivity : float, optional
319 diffusivity. Default is 0.1.
321 Returns
322 -------
323 x : ndarray
324 Spatial coordinates.
325 signal : ndarray
326 Initial signal (sum of two Gaussians).
327 sigma_array : ndarray
328 Array of sigma values varying in space.
329 dt : ndarray
330 Array of time steps varying in space.
332 Notes
333 -----
334 This function creates a test case with:
335 - A signal composed of two Gaussian peaks
336 - Sinusoidally varying time steps
337 - Corresponding sigma values for diffusion
338 """
339 # Create spatial grid
340 x = np.linspace(0, domain_length, nx)
341 dx = x[1] - x[0]
343 # Create initial signal (two Gaussians)
344 signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5) + 0.1 * np.random.randn(nx)
346 # Create varying time steps
347 dt = 0.001 * (1 + np.sin(2 * np.pi * x / domain_length))
349 # Calculate corresponding sigma values
350 sigma_array = np.sqrt(2 * diffusivity * dt) / dx
352 return x, signal, sigma_array, dt
355if __name__ == "__main__":
356 from matplotlib import pyplot as plt
358 # Generate example data
359 x, signal, sigma_array, dt = create_example_data()
361 # Apply variable-sigma filtering
362 filtered = convolve_diffusion(signal, sigma_array * 5)
364 # Compare with regular Gaussian filter
365 avg_sigma = np.mean(sigma_array)
366 regular_filtered = ndimage.gaussian_filter1d(signal, avg_sigma)
367 plt.figure(figsize=(10, 6))
368 plt.plot(x, signal, label="Original signal", lw=0.8)
369 plt.plot(x, filtered, label="Variable-sigma filtered", lw=1.0)
371 plt.plot(x, regular_filtered, label="Regular Gaussian filter", lw=0.8, ls="--")