Coverage for src/gwtransport/diffusion.py: 57%
75 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-29 20:01 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-29 20:01 +0000
1"""Module that implements diffusion."""
3import matplotlib.pyplot as plt
4import numpy as np
5import pandas as pd
6from scipy import ndimage, sparse
8from gwtransport.residence_time import residence_time
9from gwtransport.utils import compute_time_edges, diff
12def infiltration_to_extraction(
13 cin: pd.Series,
14 flow: pd.Series,
15 aquifer_pore_volume: float,
16 diffusivity: float = 0.1,
17 retardation_factor: float = 1.0,
18 aquifer_length: float = 80.0,
19 porosity: float = 0.35,
20) -> pd.Series:
21 """Compute the diffusion of a compound during 1D transport in the aquifer.
23 This function represents infiltration to extraction modeling (equivalent to convolution).
25 Parameters
26 ----------
27 cin : pandas.Series
28 Concentration or temperature of the compound in the infiltrating water [ng/m3].
29 flow : pandas.Series
30 Flow rate of water in the aquifer [m3/day].
31 aquifer_pore_volume : float
32 Pore volume of the aquifer [m3].
33 diffusivity : float, optional
34 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1.
35 retardation_factor : float, optional
36 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0.
37 aquifer_length : float, optional
38 Length of the aquifer [m]. Default is 80.0.
39 porosity : float, optional
40 Porosity of the aquifer [dimensionless]. Default is 0.35.
42 Returns
43 -------
44 numpy.ndarray
45 Concentration of the compound in the extracted water [ng/m3].
46 """
47 sigma_array = compute_sigma_array(
48 flow=flow,
49 aquifer_pore_volume=aquifer_pore_volume,
50 diffusivity=diffusivity,
51 retardation_factor=retardation_factor,
52 aquifer_length=aquifer_length,
53 porosity=porosity,
54 )
55 return convolve_diffusion(cin.values, sigma_array, truncate=30.0)
58def extraction_to_infiltration(
59 cout,
60 flow,
61 aquifer_pore_volume,
62 diffusivity=0.1,
63 retardation_factor=1.0,
64 aquifer_length=80.0,
65 porosity=0.35,
66):
67 """Compute the reverse diffusion of a compound during 1D transport in the aquifer.
69 This function represents extraction to infiltration modeling (equivalent to deconvolution).
71 Parameters
72 ----------
73 cout : pandas.Series
74 Concentration or temperature of the compound in the extracted water [ng/m3].
75 flow : pandas.Series
76 Flow rate of water in the aquifer [m3/day].
77 aquifer_pore_volume : float
78 Pore volume of the aquifer [m3].
79 diffusivity : float, optional
80 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1.
81 retardation_factor : float, optional
82 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0.
83 aquifer_length : float, optional
84 Length of the aquifer [m]. Default is 80.0.
85 porosity : float, optional
86 Porosity of the aquifer [dimensionless]. Default is 0.35.
88 Returns
89 -------
90 numpy.ndarray
91 Concentration of the compound in the infiltrating water [ng/m3].
93 Notes
94 -----
95 Extraction to infiltration diffusion (deconvolution) is mathematically ill-posed and requires
96 regularization to obtain a stable solution.
97 """
98 msg = "Extraction to infiltration diffusion (deconvolution) is not implemented yet"
99 raise NotImplementedError(msg)
102def compute_sigma_array(
103 flow: pd.Series,
104 aquifer_pore_volume: float,
105 diffusivity: float = 0.1,
106 retardation_factor: float = 1.0,
107 aquifer_length: float = 80.0,
108 porosity: float = 0.35,
109) -> np.ndarray:
110 """Compute sigma values for diffusion based on flow and aquifer properties.
112 Parameters
113 ----------
114 flow : pandas.Series
115 Flow rate of water in the aquifer [m3/day].
116 aquifer_pore_volume : float
117 Pore volume of the aquifer [m3].
118 diffusivity : float, optional
119 diffusivity of the compound in the aquifer [m2/day]. Default is 0.1.
120 retardation_factor : float, optional
121 Retardation factor of the compound in the aquifer [dimensionless]. Default is 1.0.
122 aquifer_length : float, optional
123 Length of the aquifer [m]. Default is 80.0.
124 porosity : float, optional
125 Porosity of the aquifer [dimensionless]. Default is 0.35.
127 Returns
128 -------
129 numpy.ndarray
130 Array of sigma values for diffusion.
131 """
132 # Create flow tedges from the flow series index (assuming it's at the end of bins)
133 flow_tedges = compute_time_edges(
134 tedges=None, tstart=None, tend=pd.DatetimeIndex(flow.index), number_of_bins=len(flow)
135 )
136 residence_time_series = residence_time(
137 flow=flow,
138 flow_tedges=flow_tedges,
139 aquifer_pore_volume=aquifer_pore_volume,
140 retardation_factor=retardation_factor,
141 direction="infiltration_to_extraction",
142 return_pandas_series=True,
143 )
144 residence_time_series = residence_time_series.interpolate(method="nearest").ffill().bfill()
145 timedelta_at_departure = diff(flow.index, alignment="right") / pd.to_timedelta(1, unit="D")
146 volume_infiltrated_at_departure = flow * timedelta_at_departure
147 cross_sectional_area = aquifer_pore_volume / aquifer_length
148 dx = volume_infiltrated_at_departure / cross_sectional_area / porosity
149 sigma_array = np.sqrt(2 * diffusivity * residence_time_series) / dx
150 return np.clip(a=sigma_array.values, a_min=0.0, a_max=100)
153def convolve_diffusion(input_signal, sigma_array, truncate=4.0):
154 """Apply Gaussian filter with position-dependent sigma values.
156 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard
157 deviation (sigma) of the Gaussian kernel to vary at each point in the signal.
158 It implements the filter using a sparse convolution matrix where each row
159 represents a Gaussian kernel with a locally-appropriate standard deviation.
161 Parameters
162 ----------
163 input_signal : numpy.ndarray
164 One-dimensional input array to be filtered.
165 sigma_array : numpy.ndarray
166 One-dimensional array of standard deviation values, must have same length
167 as input_signal. Each value specifies the Gaussian kernel width at the
168 corresponding position.
169 truncate : float, optional
170 Truncate the filter at this many standard deviations.
171 Default is 4.0.
173 Returns
174 -------
175 numpy.ndarray
176 The filtered input signal. Has the same shape as input_signal.
178 Notes
179 -----
180 At the boundaries, the outer values are repeated to avoid edge effects. Equal to mode=`nearest`
181 in `scipy.ndimage.gaussian_filter1d`.
183 The function constructs a sparse convolution matrix where each row represents
184 a position-specific Gaussian kernel. The kernel width adapts to local sigma
185 values, making it suitable for problems with varying diffusivitys
186 or time steps.
188 For diffusion problems, the local sigma values can be calculated as:
189 sigma = sqrt(2 * diffusivity * dt) / dx
190 where diffusivity is the diffusivity, dt is the time step, and dx is the
191 spatial step size.
193 The implementation uses sparse matrices for memory efficiency when dealing
194 with large signals or when sigma values vary significantly.
196 See Also
197 --------
198 scipy.ndimage.gaussian_filter1d : Fixed-sigma Gaussian filtering
199 scipy.sparse : Sparse matrix implementations
201 Examples
202 --------
203 >>> # Create a sample signal
204 >>> x = np.linspace(0, 10, 1000)
205 >>> signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5)
207 >>> # Create position-dependent sigma values
208 >>> diffusivity = 0.1 # diffusivity
209 >>> dt = 0.001 * (1 + np.sin(2 * np.pi * x / 10)) # Varying time steps
210 >>> dx = x[1] - x[0]
211 >>> sigma_array = np.sqrt(2 * diffusivity * dt) / dx
213 >>> # Apply the filter
214 >>> filtered = convolve_diffusion(signal, sigma_array)
215 """
216 if len(input_signal) != len(sigma_array):
217 msg = "Input signal and sigma array must have the same length"
218 raise ValueError(msg)
220 n = len(input_signal)
222 # Handle zero sigma values
223 zero_mask = sigma_array == 0
224 if np.all(zero_mask):
225 return input_signal.copy()
227 # Get maximum kernel size and create position arrays
228 max_sigma = np.max(sigma_array)
229 max_radius = int(truncate * max_sigma + 0.5)
231 # Create arrays for all possible kernel positions
232 positions = np.arange(-max_radius, max_radius + 1)
234 # Create a mask for valid sigma values
235 valid_sigma = ~zero_mask
236 valid_indices = np.where(valid_sigma)[0]
238 # Create position matrices for broadcasting
239 # Shape: (n_valid_points, 1)
240 center_positions = valid_indices[:, np.newaxis]
241 # Shape: (1, max_kernel_size)
242 kernel_positions = positions[np.newaxis, :]
244 # Calculate the relative positions for each point
245 # This creates a matrix of shape (n_valid_points, max_kernel_size)
246 relative_positions = kernel_positions
248 # Calculate Gaussian weights for all positions at once
249 # Using broadcasting to create a matrix of shape (n_valid_points, max_kernel_size)
250 sigmas = sigma_array[valid_sigma][:, np.newaxis]
251 weights = np.exp(-0.5 * (relative_positions / sigmas) ** 2)
253 # Normalize each kernel
254 weights /= np.sum(weights, axis=1)[:, np.newaxis]
256 # Calculate absolute positions in the signal
257 absolute_positions = center_positions + relative_positions
259 # Handle boundary conditions
260 absolute_positions = np.clip(absolute_positions, 0, n - 1)
262 # Create coordinate arrays for sparse matrix
263 rows = np.repeat(center_positions, weights.shape[1])
264 cols = absolute_positions.ravel()
265 data = weights.ravel()
267 # Remove zero weights to save memory
268 nonzero_mask = data != 0
269 rows = rows[nonzero_mask]
270 cols = cols[nonzero_mask]
271 data = data[nonzero_mask]
273 # Add identity matrix elements for zero-sigma positions
274 if np.any(zero_mask):
275 zero_indices = np.where(zero_mask)[0]
276 rows = np.concatenate([rows, zero_indices])
277 cols = np.concatenate([cols, zero_indices])
278 data = np.concatenate([data, np.ones(len(zero_indices))])
280 # Create the sparse matrix
281 conv_matrix = sparse.csr_matrix((data, (rows, cols)), shape=(n, n))
283 # remove diffusion from signal with inverse of the convolution matrix
284 # conv_matrix_inv = np.linalg.lstsq(conv_matrix.todense(), np.eye(n), rcond=None)[0]
286 # Apply the filter
287 return conv_matrix.dot(input_signal)
290def deconvolve_diffusion(output_signal, sigma_array, truncate=4.0):
291 """Apply Gaussian deconvolution with position-dependent sigma values.
293 This function extends scipy.ndimage.gaussian_filter1d by allowing the standard
294 deviation (sigma) of the Gaussian kernel to vary at each point in the signal.
295 It implements the filter using a sparse convolution matrix where each row
296 represents a Gaussian kernel with a locally-appropriate standard deviation.
298 Parameters
299 ----------
300 output_signal : numpy.ndarray
301 One-dimensional input array to be filtered.
302 sigma_array : numpy.ndarray
303 One-dimensional array of standard deviation values, must have same length
304 as output_signal. Each value specifies the Gaussian kernel width at the
305 corresponding position.
306 truncate : float, optional
307 Truncate the filter at this many standard deviations.
308 Default is 4.0.
310 Returns
311 -------
312 numpy.ndarray
313 The filtered output signal. Has the same shape as output_signal.
314 """
315 msg = "Deconvolution is not implemented yet"
316 raise NotImplementedError(msg)
319def create_example_data(nx=1000, domain_length=10.0, diffusivity=0.1):
320 """Create example data for demonstrating variable-sigma diffusion.
322 Parameters
323 ----------
324 nx : int, optional
325 Number of spatial points. Default is 1000.
326 domain_length : float, optional
327 Domain length. Default is 10.0.
328 diffusivity : float, optional
329 diffusivity. Default is 0.1.
331 Returns
332 -------
333 x : numpy.ndarray
334 Spatial coordinates.
335 signal : numpy.ndarray
336 Initial signal (sum of two Gaussians).
337 sigma_array : numpy.ndarray
338 Array of sigma values varying in space.
339 dt : numpy.ndarray
340 Array of time steps varying in space.
342 Notes
343 -----
344 This function creates a test case with:
345 - A signal composed of two Gaussian peaks
346 - Sinusoidally varying time steps
347 - Corresponding sigma values for diffusion
348 """
349 # Create spatial grid
350 x = np.linspace(0, domain_length, nx)
351 dx = x[1] - x[0]
353 # Create initial signal (two Gaussians)
354 signal = np.exp(-((x - 3) ** 2)) + 0.5 * np.exp(-((x - 7) ** 2) / 0.5) + 0.1 * np.random.randn(nx)
356 # Create varying time steps
357 dt = 0.001 * (1 + np.sin(2 * np.pi * x / domain_length))
359 # Calculate corresponding sigma values
360 sigma_array = np.sqrt(2 * diffusivity * dt) / dx
362 return x, signal, sigma_array, dt
365if __name__ == "__main__":
366 from matplotlib import pyplot as plt
368 # Generate example data
369 x, signal, sigma_array, dt = create_example_data()
371 # Apply variable-sigma filtering
372 filtered = convolve_diffusion(signal, sigma_array * 5)
374 # Compare with regular Gaussian filter
375 avg_sigma = np.mean(sigma_array)
376 regular_filtered = ndimage.gaussian_filter1d(signal, avg_sigma)
377 plt.figure(figsize=(10, 6))
378 plt.plot(x, signal, label="Original signal", lw=0.8)
379 plt.plot(x, filtered, label="Variable-sigma filtered", lw=1.0)
381 plt.plot(x, regular_filtered, label="Regular Gaussian filter", lw=0.8, ls="--")