Coverage for src/gwtransport/diffusion.py: 0%
77 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-22 18:30 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-22 18:30 +0000
1"""Module that implements diffusion."""
3import matplotlib.pyplot as plt
4import numpy as np
5import pandas as pd
6from scipy import ndimage, sparse
8from gwtransport.residence_time import residence_time
9from gwtransport.utils import compute_time_edges, diff
12def forward(
13 cin,
14 flow,
15 aquifer_pore_volume,
16 diffusivity=0.1,
17 retardation_factor=1.0,
18 aquifer_length=80.0,
19 porosity=0.35,
20):
21 """Compute the diffusion of a compound during 1D transport in the aquifer.
23 This function represents a forward operation (equivalent to convolution).
25 Parameters
26 ----------
27 cin : pandas.Series
28 Concentration or temperature of the compound in the infiltrating water [ng/m3].
29 flow : pandas.Series
30 Flow rate of water in the aquifer [m3/day].
31 aquifer_pore_volume : float
32 Pore volume of the aquifer [m3].
33 diffusivity : float, optional
34 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1.
35 retardation_factor : float, optional
36 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0.
37 aquifer_length : float, optional
38 Length of the aquifer [m]. Default is 80.0.
39 porosity : float, optional
40 Porosity of the aquifer [dimensionless]. Default is 0.35.
42 Returns
43 -------
44 numpy.ndarray
45 Concentration of the compound in the extracted water [ng/m3].
46 """
47 sigma_array = compute_sigma_array(
48 flow=flow,
49 aquifer_pore_volume=aquifer_pore_volume,
50 diffusivity=diffusivity,
51 retardation_factor=retardation_factor,
52 aquifer_length=aquifer_length,
53 porosity=porosity,
54 )
55 return convolve_diffusion(cin.values, sigma_array, truncate=30.0)
58def backward(
59 cout,
60 flow,
61 aquifer_pore_volume,
62 diffusivity=0.1,
63 retardation_factor=1.0,
64 aquifer_length=80.0,
65 porosity=0.35,
66):
67 """Compute the reverse diffusion of a compound during 1D transport in the aquifer.
69 This function represents a backward operation (equivalent to deconvolution).
71 Parameters
72 ----------
73 cout : pandas.Series
74 Concentration or temperature of the compound in the extracted water [ng/m3].
75 flow : pandas.Series
76 Flow rate of water in the aquifer [m3/day].
77 aquifer_pore_volume : float
78 Pore volume of the aquifer [m3].
79 diffusivity : float, optional
80 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1.
81 retardation_factor : float, optional
82 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0.
83 aquifer_length : float, optional
84 Length of the aquifer [m]. Default is 80.0.
85 porosity : float, optional
86 Porosity of the aquifer [dimensionless]. Default is 0.35.
88 Returns
89 -------
90 numpy.ndarray
91 Concentration of the compound in the infiltrating water [ng/m3].
93 Notes
94 -----
95 Backward diffusion (deconvolution) is mathematically ill-posed and requires
96 regularization to obtain a stable solution.
97 """
98 msg = "Backward diffusion (deconvolution) is not implemented yet"
99 raise NotImplementedError(msg)
102def compute_sigma_array(
103 flow, aquifer_pore_volume, diffusivity=0.1, retardation_factor=1.0, aquifer_length=80.0, porosity=0.35
104):
105 """Compute sigma values for diffusion based on flow and aquifer properties.
107 Parameters
108 ----------
109 flow : pandas.Series
110 Flow rate of water in the aquifer [m3/day].
111 aquifer_pore_volume : float
112 Pore volume of the aquifer [m3].
113 diffusivity : float, optional
114 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1.
115 retardation_factor : float, optional
116 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0.
117 aquifer_length : float, optional
118 Length of the aquifer [m]. Default is 80.0.
119 porosity : float, optional
120 Porosity of the aquifer [dimensionless]. Default is 0.35.
122 Returns
123 -------
124 array
125 Array of sigma values for diffusion.
126 """
127 # Create flow tedges from the flow series index (assuming it's at the end of bins)
128 flow_tedges = compute_time_edges(tedges=None, tstart=None, tend=flow.index, number_of_bins=len(flow))
129 residence_time = residence_time(
130 flow=flow,
131 flow_tedges=flow_tedges,
132 aquifer_pore_volume=aquifer_pore_volume,
133 retardation_factor=retardation_factor,
134 direction="infiltration",
135 return_pandas_series=True,
136 )
137 residence_time = residence_time.interpolate(method="nearest").ffill().bfill()
138 timedelta_at_departure = diff(flow.index, alignment="right") / pd.to_timedelta(1, unit="D")
139 volume_infiltrated_at_departure = flow * timedelta_at_departure
140 cross_sectional_area = aquifer_pore_volume / aquifer_length
141 dx = volume_infiltrated_at_departure / cross_sectional_area / porosity
142 sigma_array = np.sqrt(2 * diffusivity * residence_time) / dx
143 return np.clip(a=sigma_array.values, a_min=0.0, a_max=100)
146def convolve_diffusion(input_signal, sigma_array, truncate=4.0):
147 """Apply Gaussian filter with position-dependent sigma values.
149 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard
150 deviation (sigma) of the Gaussian kernel to vary at each point in the signal.
151 It implements the filter using a sparse convolution matrix where each row
152 represents a Gaussian kernel with a locally-appropriate standard deviation.
154 Parameters
155 ----------
156 input_signal : ndarray
157 One-dimensional input array to be filtered.
158 sigma_array : ndarray
159 One-dimensional array of standard deviation values, must have same length
160 as input_signal. Each value specifies the Gaussian kernel width at the
161 corresponding position.
162 truncate : float, optional
163 Truncate the filter at this many standard deviations.
164 Default is 4.0.
166 Returns
167 -------
168 ndarray
169 The filtered input signal. Has the same shape as input_signal.
171 Notes
172 -----
173 At the boundaries, the outer values are repeated to avoid edge effects. Equal to mode=`nearest`
174 in `scipy.ndimage.gaussian_filter1d`.
176 The function constructs a sparse convolution matrix where each row represents
177 a position-specific Gaussian kernel. The kernel width adapts to local sigma
178 values, making it suitable for problems with varying diffusivitys
179 or time steps.
181 For diffusion problems, the local sigma values can be calculated as:
182 sigma = sqrt(2 * diffusivity * dt) / dx
183 where diffusivity is the diffusivity, dt is the time step, and dx is the
184 spatial step size.
186 The implementation uses sparse matrices for memory efficiency when dealing
187 with large signals or when sigma values vary significantly.
189 See Also
190 --------
191 scipy.ndimage.gaussian_filter1d : Fixed-sigma Gaussian filtering
192 scipy.sparse : Sparse matrix implementations
194 Examples
195 --------
196 >>> # Create a sample signal
197 >>> x = np.linspace(0, 10, 1000)
198 >>> signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5)
200 >>> # Create position-dependent sigma values
201 >>> diffusivity = 0.1 # diffusivity
202 >>> dt = 0.001 * (1 + np.sin(2 * np.pi * x / 10)) # Varying time steps
203 >>> dx = x[1] - x[0]
204 >>> sigma_array = np.sqrt(2 * diffusivity * dt) / dx
206 >>> # Apply the filter
207 >>> filtered = convolve_diffusion(signal, sigma_array)
208 """
209 if len(input_signal) != len(sigma_array):
210 msg = "Input signal and sigma array must have the same length"
211 raise ValueError(msg)
213 n = len(input_signal)
215 # Handle zero sigma values
216 zero_mask = sigma_array == 0
217 if np.all(zero_mask):
218 return input_signal.copy()
220 # Get maximum kernel size and create position arrays
221 max_sigma = np.max(sigma_array)
222 max_radius = int(truncate * max_sigma + 0.5)
224 # Create arrays for all possible kernel positions
225 positions = np.arange(-max_radius, max_radius + 1)
227 # Create a mask for valid sigma values
228 valid_sigma = ~zero_mask
229 valid_indices = np.where(valid_sigma)[0]
231 # Create position matrices for broadcasting
232 # Shape: (n_valid_points, 1)
233 center_positions = valid_indices[:, np.newaxis]
234 # Shape: (1, max_kernel_size)
235 kernel_positions = positions[np.newaxis, :]
237 # Calculate the relative positions for each point
238 # This creates a matrix of shape (n_valid_points, max_kernel_size)
239 relative_positions = kernel_positions
241 # Calculate Gaussian weights for all positions at once
242 # Using broadcasting to create a matrix of shape (n_valid_points, max_kernel_size)
243 sigmas = sigma_array[valid_sigma][:, np.newaxis]
244 weights = np.exp(-0.5 * (relative_positions / sigmas) ** 2)
246 # Normalize each kernel
247 weights /= np.sum(weights, axis=1)[:, np.newaxis]
249 # Calculate absolute positions in the signal
250 absolute_positions = center_positions + relative_positions
252 # Handle boundary conditions
253 absolute_positions = np.clip(absolute_positions, 0, n - 1)
255 # Create coordinate arrays for sparse matrix
256 rows = np.repeat(center_positions, weights.shape[1])
257 cols = absolute_positions.ravel()
258 data = weights.ravel()
260 # Remove zero weights to save memory
261 nonzero_mask = data != 0
262 rows = rows[nonzero_mask]
263 cols = cols[nonzero_mask]
264 data = data[nonzero_mask]
266 # Add identity matrix elements for zero-sigma positions
267 if np.any(zero_mask):
268 zero_indices = np.where(zero_mask)[0]
269 rows = np.concatenate([rows, zero_indices])
270 cols = np.concatenate([cols, zero_indices])
271 data = np.concatenate([data, np.ones(len(zero_indices))])
273 # Create the sparse matrix
274 conv_matrix = sparse.csr_matrix((data, (rows, cols)), shape=(n, n))
276 # remove diffusion from signal with inverse of the convolution matrix
277 # conv_matrix_inv = np.linalg.lstsq(conv_matrix.todense(), np.eye(n), rcond=None)[0]
279 # Apply the filter
280 return conv_matrix.dot(input_signal)
283def deconvolve_diffusion(output_signal, sigma_array, truncate=4.0):
284 """Apply Gaussian deconvolution with position-dependent sigma values.
286 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard
287 deviation (sigma) of the Gaussian kernel to vary at each point in the signal.
288 It implements the filter using a sparse convolution matrix where each row
289 represents a Gaussian kernel with a locally-appropriate standard deviation.
291 Parameters
292 ----------
293 output_signal : ndarray
294 One-dimensional input array to be filtered.
295 sigma_array : ndarray
296 One-dimensional array of standard deviation values, must have same length
297 as output_signal. Each value specifies the Gaussian kernel width at the
298 corresponding position.
299 truncate : float, optional
300 Truncate the filter at this many standard deviations.
301 Default is 4.0.
303 Returns
304 -------
305 ndarray
306 The filtered output signal. Has the same shape as output_signal.
307 """
308 msg = "Deconvolution is not implemented yet"
309 raise NotImplementedError(msg)
312def create_example_data(nx=1000, domain_length=10.0, diffusivity=0.1):
313 """Create example data for demonstrating variable-sigma diffusion.
315 Parameters
316 ----------
317 nx : int, optional
318 Number of spatial points. Default is 1000.
319 domain_length : float, optional
320 Domain length. Default is 10.0.
321 diffusivity : float, optional
322 diffusivity. Default is 0.1.
324 Returns
325 -------
326 x : ndarray
327 Spatial coordinates.
328 signal : ndarray
329 Initial signal (sum of two Gaussians).
330 sigma_array : ndarray
331 Array of sigma values varying in space.
332 dt : ndarray
333 Array of time steps varying in space.
335 Notes
336 -----
337 This function creates a test case with:
338 - A signal composed of two Gaussian peaks
339 - Sinusoidally varying time steps
340 - Corresponding sigma values for diffusion
341 """
342 # Create spatial grid
343 x = np.linspace(0, domain_length, nx)
344 dx = x[1] - x[0]
346 # Create initial signal (two Gaussians)
347 signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5) + 0.1 * np.random.randn(nx)
349 # Create varying time steps
350 dt = 0.001 * (1 + np.sin(2 * np.pi * x / domain_length))
352 # Calculate corresponding sigma values
353 sigma_array = np.sqrt(2 * diffusivity * dt) / dx
355 return x, signal, sigma_array, dt
358if __name__ == "__main__":
359 from matplotlib import pyplot as plt
361 # Generate example data
362 x, signal, sigma_array, dt = create_example_data()
364 # Apply variable-sigma filtering
365 filtered = convolve_diffusion(signal, sigma_array * 5)
367 # Compare with regular Gaussian filter
368 avg_sigma = np.mean(sigma_array)
369 regular_filtered = ndimage.gaussian_filter1d(signal, avg_sigma)
370 plt.figure(figsize=(10, 6))
371 plt.plot(x, signal, label="Original signal", lw=0.8)
372 plt.plot(x, filtered, label="Variable-sigma filtered", lw=1.0)
374 plt.plot(x, regular_filtered, label="Regular Gaussian filter", lw=0.8, ls="--")