Coverage for src / gwtransport / advection_utils.py: 94%

63 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-04 18:51 +0000

1""" 

2Private helper functions for advective transport modeling. 

3 

4This module contains internal helper functions used by the advection module. 

5These functions implement various algorithms for computing transport weights 

6and handling nonlinear sorption. 

7 

8This file is part of gwtransport which is released under AGPL-3.0 license. 

9See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details. 

10""" 

11 

12import numpy as np 

13import numpy.typing as npt 

14import pandas as pd 

15 

16from gwtransport.residence_time import residence_time 

17from gwtransport.utils import partial_isin 

18 

19 

20def _infiltration_to_extraction_weights( 

21 *, 

22 tedges: pd.DatetimeIndex, 

23 cout_tedges: pd.DatetimeIndex, 

24 aquifer_pore_volumes: npt.NDArray[np.floating], 

25 flow: npt.NDArray[np.floating], 

26 retardation_factor: float, 

27) -> npt.NDArray[np.floating]: 

28 """ 

29 Compute normalized weights for linear infiltration to extraction transformation. 

30 

31 This helper function computes the weight matrix for constant retardation factor. 

32 It handles the main advective transport calculation with flow-weighted averaging. 

33 

34 The resulting cout values represent volume-weighted (flow-weighted) bin averages, 

35 where periods with higher infiltration flow rates contribute more to the output concentration. 

36 

37 Parameters 

38 ---------- 

39 tedges : pandas.DatetimeIndex 

40 Time edges for infiltration bins. 

41 cout_tedges : pandas.DatetimeIndex 

42 Time edges for extraction bins. 

43 aquifer_pore_volumes : array-like 

44 Distribution of pore volumes [m3]. 

45 flow : array-like 

46 Flow rate values [m3/day]. 

47 retardation_factor : float 

48 Constant retardation factor. 

49 

50 Returns 

51 ------- 

52 numpy.ndarray 

53 Normalized weight matrix. Shape: (len(cout_tedges) - 1, len(tedges) - 1) 

54 """ 

55 # Convert time edges to days 

56 cin_tedges_days = ((tedges - tedges[0]) / pd.Timedelta(days=1)).values 

57 cout_tedges_days = ((cout_tedges - tedges[0]) / pd.Timedelta(days=1)).values 

58 

59 # Pre-compute all residence times and infiltration edges 

60 rt_edges_2d = residence_time( 

61 flow=flow, 

62 flow_tedges=tedges, 

63 index=cout_tedges, 

64 aquifer_pore_volumes=aquifer_pore_volumes, 

65 retardation_factor=retardation_factor, 

66 direction="extraction_to_infiltration", 

67 ) 

68 infiltration_tedges_2d = cout_tedges_days[None, :] - rt_edges_2d 

69 

70 # Pre-compute valid bins 

71 valid_bins_2d = ~(np.isnan(infiltration_tedges_2d[:, :-1]) | np.isnan(infiltration_tedges_2d[:, 1:])) 

72 

73 # Pre-compute cin time range for clip optimization (computed once, used n_bins times) 

74 cin_time_min = cin_tedges_days[0] 

75 cin_time_max = cin_tedges_days[-1] 

76 

77 # Accumulate flow-weighted overlap matrices from all pore volumes 

78 # Each pore volume has equal probability (equal-mass bins from gamma distribution) 

79 accumulated_weights = np.zeros((len(cout_tedges) - 1, len(tedges) - 1)) 

80 

81 # Loop over each pore volume 

82 for i in range(len(aquifer_pore_volumes)): 

83 if not np.any(valid_bins_2d[i, :]): 

84 continue 

85 

86 # Clip optimization: Check for temporal overlap before expensive computation 

87 # Get the range of infiltration times for this pore volume (only valid bins) 

88 infiltration_times = infiltration_tedges_2d[i, :] 

89 valid_infiltration_times = infiltration_times[~np.isnan(infiltration_times)] 

90 

91 if len(valid_infiltration_times) == 0: 

92 continue 

93 

94 infiltration_min = valid_infiltration_times[0] # Min is first element (monotonic) 

95 infiltration_max = valid_infiltration_times[-1] # Max is last element (monotonic) 

96 

97 # Check if infiltration window overlaps with cin window 

98 # Two intervals [a1, a2] and [b1, b2] overlap if: max(a1, b1) < min(a2, b2) 

99 has_overlap = max(infiltration_min, cin_time_min) < min(infiltration_max, cin_time_max) 

100 

101 if not has_overlap: 

102 # No temporal overlap - this bin contributes nothing, skip expensive computation 

103 continue 

104 

105 # Compute overlap matrix for this pore volume 

106 overlap_matrix = partial_isin(bin_edges_in=infiltration_tedges_2d[i, :], bin_edges_out=cin_tedges_days) 

107 

108 # Apply flow weighting to this pore volume's overlap matrix 

109 flow_weighted_overlap = overlap_matrix * flow[None, :] 

110 

111 # Normalize this pore volume's contribution (each row sums to 1 after flow weighting) 

112 row_sums = np.sum(flow_weighted_overlap, axis=1) 

113 valid_rows_pv = row_sums > 0 

114 normalized_overlap = np.zeros_like(flow_weighted_overlap) 

115 normalized_overlap[valid_rows_pv, :] = flow_weighted_overlap[valid_rows_pv, :] / row_sums[valid_rows_pv, None] 

116 

117 # Accumulate only the valid bins from this pore volume 

118 accumulated_weights[valid_bins_2d[i, :], :] += normalized_overlap[valid_bins_2d[i, :], :] 

119 

120 # Average across all pore volumes assuming equal probability per bin. 

121 # This is correct when aquifer_pore_volumes comes from gamma.bins() which 

122 # produces equal-probability bins. For user-supplied pore volumes with 

123 # unequal probability mass, a weights parameter would be needed. 

124 return accumulated_weights / len(aquifer_pore_volumes) 

125 

126 

127def _extraction_to_infiltration_weights( 

128 *, 

129 tedges: pd.DatetimeIndex, 

130 cin_tedges: pd.DatetimeIndex, 

131 aquifer_pore_volumes: npt.NDArray[np.floating], 

132 flow: npt.NDArray[np.floating], 

133 retardation_factor: float, 

134) -> npt.NDArray[np.floating]: 

135 """ 

136 Compute extraction to infiltration transformation weights matrix. 

137 

138 This is a reverse-direction flow-weighted estimate, NOT a true deconvolution 

139 or pseudo-inverse of the infiltration-to-extraction weights. The weight matrix 

140 is constructed independently using the same pore-volume-based overlap approach 

141 in reverse, so ``W_reverse @ (W_forward @ cin) != cin`` in general. 

142 

143 The algorithm mirrors ``_infiltration_to_extraction_weights`` but reverses the 

144 temporal mapping direction: for each infiltration time bin, it finds which 

145 extraction time bins contribute, weighted by flow rate. 

146 

147 The resulting cin values represent volume-weighted (flow-weighted) bin averages, 

148 where periods with higher extraction flow rates contribute more to the reconstructed infiltration concentration. 

149 

150 Parameters 

151 ---------- 

152 tedges : pandas.DatetimeIndex 

153 Time edges for cout and flow data bins. 

154 cin_tedges : pandas.DatetimeIndex 

155 Time edges for output (infiltration) data bins. 

156 aquifer_pore_volumes : array-like 

157 Array of aquifer pore volumes [m3]. 

158 flow : array-like 

159 Flow rate values in the aquifer [m3/day]. 

160 retardation_factor : float 

161 Retardation factor of the compound in the aquifer. 

162 

163 Returns 

164 ------- 

165 numpy.ndarray 

166 Normalized weight matrix for extraction to infiltration transformation. 

167 Shape: (len(cin_tedges) - 1, len(tedges) - 1) 

168 """ 

169 # Convert time edges to days 

170 cout_tedges_days = ((tedges - tedges[0]) / pd.Timedelta(days=1)).values 

171 cin_tedges_days = ((cin_tedges - tedges[0]) / pd.Timedelta(days=1)).values 

172 

173 # Pre-compute all residence times and extraction edges (symmetric to infiltration_to_extraction) 

174 rt_edges_2d = residence_time( 

175 flow=flow, 

176 flow_tedges=tedges, 

177 index=cin_tedges, 

178 aquifer_pore_volumes=aquifer_pore_volumes, 

179 retardation_factor=retardation_factor, 

180 direction="infiltration_to_extraction", # Computing from infiltration perspective 

181 ) 

182 extraction_tedges_2d = cin_tedges_days[None, :] + rt_edges_2d 

183 

184 # Pre-compute valid bins 

185 valid_bins_2d = ~(np.isnan(extraction_tedges_2d[:, :-1]) | np.isnan(extraction_tedges_2d[:, 1:])) 

186 

187 # Pre-compute cout time range for clip optimization (computed once, used n_bins times) 

188 cout_time_min = cout_tedges_days[0] 

189 cout_time_max = cout_tedges_days[-1] 

190 

191 # Accumulate flow-weighted overlap matrices from all pore volumes 

192 # Each pore volume has equal probability (equal-mass bins from gamma distribution) 

193 accumulated_weights = np.zeros((len(cin_tedges) - 1, len(tedges) - 1)) 

194 

195 # Loop over each pore volume (same structure as infiltration_to_extraction) 

196 for i in range(len(aquifer_pore_volumes)): 

197 if not np.any(valid_bins_2d[i, :]): 

198 continue 

199 

200 # Clip optimization: Check for temporal overlap before expensive computation 

201 # Get the range of extraction times for this pore volume (only valid bins) 

202 extraction_times = extraction_tedges_2d[i, :] 

203 valid_extraction_times = extraction_times[~np.isnan(extraction_times)] 

204 

205 if len(valid_extraction_times) == 0: 

206 continue 

207 

208 extraction_min = valid_extraction_times[0] # Min is first element (monotonic) 

209 extraction_max = valid_extraction_times[-1] # Max is last element (monotonic) 

210 

211 # Check if extraction window overlaps with cout window 

212 # Two intervals [a1, a2] and [b1, b2] overlap if: max(a1, b1) < min(a2, b2) 

213 has_overlap = max(extraction_min, cout_time_min) < min(extraction_max, cout_time_max) 

214 

215 if not has_overlap: 

216 # No temporal overlap - this bin contributes nothing, skip expensive computation 

217 continue 

218 

219 # SYMMETRIC temporal overlap computation: 

220 # Infiltration to extraction: maps infiltration → cout time windows 

221 # Extraction to infiltration: maps extraction → cout time windows (transposed relationship) 

222 overlap_matrix = partial_isin(bin_edges_in=extraction_tedges_2d[i, :], bin_edges_out=cout_tedges_days) 

223 

224 # Apply flow weighting to this pore volume's overlap matrix 

225 flow_weighted_overlap = overlap_matrix * flow[None, :] 

226 

227 # Normalize this pore volume's contribution (each row sums to 1 after flow weighting) 

228 row_sums = np.sum(flow_weighted_overlap, axis=1) 

229 valid_rows_pv = row_sums > 0 

230 normalized_overlap = np.zeros_like(flow_weighted_overlap) 

231 normalized_overlap[valid_rows_pv, :] = flow_weighted_overlap[valid_rows_pv, :] / row_sums[valid_rows_pv, None] 

232 

233 # Accumulate only the valid bins from this pore volume 

234 accumulated_weights[valid_bins_2d[i, :], :] += normalized_overlap[valid_bins_2d[i, :], :] 

235 

236 # Average across all pore volumes assuming equal probability per bin. 

237 # This is correct when aquifer_pore_volumes comes from gamma.bins() which 

238 # produces equal-probability bins. For user-supplied pore volumes with 

239 # unequal probability mass, a weights parameter would be needed. 

240 return accumulated_weights / len(aquifer_pore_volumes)