Coverage for src / gwtransport / advection_utils.py: 94%
63 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-04 18:51 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-04 18:51 +0000
1"""
2Private helper functions for advective transport modeling.
4This module contains internal helper functions used by the advection module.
5These functions implement various algorithms for computing transport weights
6and handling nonlinear sorption.
8This file is part of gwtransport which is released under AGPL-3.0 license.
9See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details.
10"""
12import numpy as np
13import numpy.typing as npt
14import pandas as pd
16from gwtransport.residence_time import residence_time
17from gwtransport.utils import partial_isin
20def _infiltration_to_extraction_weights(
21 *,
22 tedges: pd.DatetimeIndex,
23 cout_tedges: pd.DatetimeIndex,
24 aquifer_pore_volumes: npt.NDArray[np.floating],
25 flow: npt.NDArray[np.floating],
26 retardation_factor: float,
27) -> npt.NDArray[np.floating]:
28 """
29 Compute normalized weights for linear infiltration to extraction transformation.
31 This helper function computes the weight matrix for constant retardation factor.
32 It handles the main advective transport calculation with flow-weighted averaging.
34 The resulting cout values represent volume-weighted (flow-weighted) bin averages,
35 where periods with higher infiltration flow rates contribute more to the output concentration.
37 Parameters
38 ----------
39 tedges : pandas.DatetimeIndex
40 Time edges for infiltration bins.
41 cout_tedges : pandas.DatetimeIndex
42 Time edges for extraction bins.
43 aquifer_pore_volumes : array-like
44 Distribution of pore volumes [m3].
45 flow : array-like
46 Flow rate values [m3/day].
47 retardation_factor : float
48 Constant retardation factor.
50 Returns
51 -------
52 numpy.ndarray
53 Normalized weight matrix. Shape: (len(cout_tedges) - 1, len(tedges) - 1)
54 """
55 # Convert time edges to days
56 cin_tedges_days = ((tedges - tedges[0]) / pd.Timedelta(days=1)).values
57 cout_tedges_days = ((cout_tedges - tedges[0]) / pd.Timedelta(days=1)).values
59 # Pre-compute all residence times and infiltration edges
60 rt_edges_2d = residence_time(
61 flow=flow,
62 flow_tedges=tedges,
63 index=cout_tedges,
64 aquifer_pore_volumes=aquifer_pore_volumes,
65 retardation_factor=retardation_factor,
66 direction="extraction_to_infiltration",
67 )
68 infiltration_tedges_2d = cout_tedges_days[None, :] - rt_edges_2d
70 # Pre-compute valid bins
71 valid_bins_2d = ~(np.isnan(infiltration_tedges_2d[:, :-1]) | np.isnan(infiltration_tedges_2d[:, 1:]))
73 # Pre-compute cin time range for clip optimization (computed once, used n_bins times)
74 cin_time_min = cin_tedges_days[0]
75 cin_time_max = cin_tedges_days[-1]
77 # Accumulate flow-weighted overlap matrices from all pore volumes
78 # Each pore volume has equal probability (equal-mass bins from gamma distribution)
79 accumulated_weights = np.zeros((len(cout_tedges) - 1, len(tedges) - 1))
81 # Loop over each pore volume
82 for i in range(len(aquifer_pore_volumes)):
83 if not np.any(valid_bins_2d[i, :]):
84 continue
86 # Clip optimization: Check for temporal overlap before expensive computation
87 # Get the range of infiltration times for this pore volume (only valid bins)
88 infiltration_times = infiltration_tedges_2d[i, :]
89 valid_infiltration_times = infiltration_times[~np.isnan(infiltration_times)]
91 if len(valid_infiltration_times) == 0:
92 continue
94 infiltration_min = valid_infiltration_times[0] # Min is first element (monotonic)
95 infiltration_max = valid_infiltration_times[-1] # Max is last element (monotonic)
97 # Check if infiltration window overlaps with cin window
98 # Two intervals [a1, a2] and [b1, b2] overlap if: max(a1, b1) < min(a2, b2)
99 has_overlap = max(infiltration_min, cin_time_min) < min(infiltration_max, cin_time_max)
101 if not has_overlap:
102 # No temporal overlap - this bin contributes nothing, skip expensive computation
103 continue
105 # Compute overlap matrix for this pore volume
106 overlap_matrix = partial_isin(bin_edges_in=infiltration_tedges_2d[i, :], bin_edges_out=cin_tedges_days)
108 # Apply flow weighting to this pore volume's overlap matrix
109 flow_weighted_overlap = overlap_matrix * flow[None, :]
111 # Normalize this pore volume's contribution (each row sums to 1 after flow weighting)
112 row_sums = np.sum(flow_weighted_overlap, axis=1)
113 valid_rows_pv = row_sums > 0
114 normalized_overlap = np.zeros_like(flow_weighted_overlap)
115 normalized_overlap[valid_rows_pv, :] = flow_weighted_overlap[valid_rows_pv, :] / row_sums[valid_rows_pv, None]
117 # Accumulate only the valid bins from this pore volume
118 accumulated_weights[valid_bins_2d[i, :], :] += normalized_overlap[valid_bins_2d[i, :], :]
120 # Average across all pore volumes assuming equal probability per bin.
121 # This is correct when aquifer_pore_volumes comes from gamma.bins() which
122 # produces equal-probability bins. For user-supplied pore volumes with
123 # unequal probability mass, a weights parameter would be needed.
124 return accumulated_weights / len(aquifer_pore_volumes)
127def _extraction_to_infiltration_weights(
128 *,
129 tedges: pd.DatetimeIndex,
130 cin_tedges: pd.DatetimeIndex,
131 aquifer_pore_volumes: npt.NDArray[np.floating],
132 flow: npt.NDArray[np.floating],
133 retardation_factor: float,
134) -> npt.NDArray[np.floating]:
135 """
136 Compute extraction to infiltration transformation weights matrix.
138 This is a reverse-direction flow-weighted estimate, NOT a true deconvolution
139 or pseudo-inverse of the infiltration-to-extraction weights. The weight matrix
140 is constructed independently using the same pore-volume-based overlap approach
141 in reverse, so ``W_reverse @ (W_forward @ cin) != cin`` in general.
143 The algorithm mirrors ``_infiltration_to_extraction_weights`` but reverses the
144 temporal mapping direction: for each infiltration time bin, it finds which
145 extraction time bins contribute, weighted by flow rate.
147 The resulting cin values represent volume-weighted (flow-weighted) bin averages,
148 where periods with higher extraction flow rates contribute more to the reconstructed infiltration concentration.
150 Parameters
151 ----------
152 tedges : pandas.DatetimeIndex
153 Time edges for cout and flow data bins.
154 cin_tedges : pandas.DatetimeIndex
155 Time edges for output (infiltration) data bins.
156 aquifer_pore_volumes : array-like
157 Array of aquifer pore volumes [m3].
158 flow : array-like
159 Flow rate values in the aquifer [m3/day].
160 retardation_factor : float
161 Retardation factor of the compound in the aquifer.
163 Returns
164 -------
165 numpy.ndarray
166 Normalized weight matrix for extraction to infiltration transformation.
167 Shape: (len(cin_tedges) - 1, len(tedges) - 1)
168 """
169 # Convert time edges to days
170 cout_tedges_days = ((tedges - tedges[0]) / pd.Timedelta(days=1)).values
171 cin_tedges_days = ((cin_tedges - tedges[0]) / pd.Timedelta(days=1)).values
173 # Pre-compute all residence times and extraction edges (symmetric to infiltration_to_extraction)
174 rt_edges_2d = residence_time(
175 flow=flow,
176 flow_tedges=tedges,
177 index=cin_tedges,
178 aquifer_pore_volumes=aquifer_pore_volumes,
179 retardation_factor=retardation_factor,
180 direction="infiltration_to_extraction", # Computing from infiltration perspective
181 )
182 extraction_tedges_2d = cin_tedges_days[None, :] + rt_edges_2d
184 # Pre-compute valid bins
185 valid_bins_2d = ~(np.isnan(extraction_tedges_2d[:, :-1]) | np.isnan(extraction_tedges_2d[:, 1:]))
187 # Pre-compute cout time range for clip optimization (computed once, used n_bins times)
188 cout_time_min = cout_tedges_days[0]
189 cout_time_max = cout_tedges_days[-1]
191 # Accumulate flow-weighted overlap matrices from all pore volumes
192 # Each pore volume has equal probability (equal-mass bins from gamma distribution)
193 accumulated_weights = np.zeros((len(cin_tedges) - 1, len(tedges) - 1))
195 # Loop over each pore volume (same structure as infiltration_to_extraction)
196 for i in range(len(aquifer_pore_volumes)):
197 if not np.any(valid_bins_2d[i, :]):
198 continue
200 # Clip optimization: Check for temporal overlap before expensive computation
201 # Get the range of extraction times for this pore volume (only valid bins)
202 extraction_times = extraction_tedges_2d[i, :]
203 valid_extraction_times = extraction_times[~np.isnan(extraction_times)]
205 if len(valid_extraction_times) == 0:
206 continue
208 extraction_min = valid_extraction_times[0] # Min is first element (monotonic)
209 extraction_max = valid_extraction_times[-1] # Max is last element (monotonic)
211 # Check if extraction window overlaps with cout window
212 # Two intervals [a1, a2] and [b1, b2] overlap if: max(a1, b1) < min(a2, b2)
213 has_overlap = max(extraction_min, cout_time_min) < min(extraction_max, cout_time_max)
215 if not has_overlap:
216 # No temporal overlap - this bin contributes nothing, skip expensive computation
217 continue
219 # SYMMETRIC temporal overlap computation:
220 # Infiltration to extraction: maps infiltration → cout time windows
221 # Extraction to infiltration: maps extraction → cout time windows (transposed relationship)
222 overlap_matrix = partial_isin(bin_edges_in=extraction_tedges_2d[i, :], bin_edges_out=cout_tedges_days)
224 # Apply flow weighting to this pore volume's overlap matrix
225 flow_weighted_overlap = overlap_matrix * flow[None, :]
227 # Normalize this pore volume's contribution (each row sums to 1 after flow weighting)
228 row_sums = np.sum(flow_weighted_overlap, axis=1)
229 valid_rows_pv = row_sums > 0
230 normalized_overlap = np.zeros_like(flow_weighted_overlap)
231 normalized_overlap[valid_rows_pv, :] = flow_weighted_overlap[valid_rows_pv, :] / row_sums[valid_rows_pv, None]
233 # Accumulate only the valid bins from this pore volume
234 accumulated_weights[valid_bins_2d[i, :], :] += normalized_overlap[valid_bins_2d[i, :], :]
236 # Average across all pore volumes assuming equal probability per bin.
237 # This is correct when aquifer_pore_volumes comes from gamma.bins() which
238 # produces equal-probability bins. For user-supplied pore volumes with
239 # unequal probability mass, a weights parameter would be needed.
240 return accumulated_weights / len(aquifer_pore_volumes)