Coverage for src / gwtransport / advection_utils.py: 95%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-27 06:32 +0000

1""" 

2Private helper functions for advective transport modeling. 

3 

4This module contains internal helper functions used by the advection module. 

5These functions implement various algorithms for computing transport weights 

6and handling nonlinear sorption. 

7 

8This file is part of gwtransport which is released under AGPL-3.0 license. 

9See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details. 

10""" 

11 

12import numpy as np 

13import numpy.typing as npt 

14import pandas as pd 

15 

16from gwtransport.residence_time import residence_time 

17from gwtransport.utils import partial_isin 

18 

19 

20def _infiltration_to_extraction_weights( 

21 *, 

22 tedges: pd.DatetimeIndex, 

23 cout_tedges: pd.DatetimeIndex, 

24 aquifer_pore_volumes: npt.NDArray[np.floating], 

25 flow: npt.NDArray[np.floating], 

26 retardation_factor: float, 

27) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.bool_]]: 

28 """ 

29 Compute normalized weights for linear infiltration to extraction transformation. 

30 

31 This helper function computes the weight matrix for constant retardation factor. 

32 It handles the main advective transport calculation with flow-weighted averaging. 

33 

34 The resulting cout values represent volume-weighted (flow-weighted) bin averages, 

35 where periods with higher infiltration flow rates contribute more to the output concentration. 

36 

37 Parameters 

38 ---------- 

39 tedges : pandas.DatetimeIndex 

40 Time edges for infiltration bins. 

41 cout_tedges : pandas.DatetimeIndex 

42 Time edges for extraction bins. 

43 aquifer_pore_volumes : array-like 

44 Distribution of pore volumes [m3]. 

45 flow : array-like 

46 Flow rate values [m3/day]. 

47 retardation_factor : float 

48 Constant retardation factor. 

49 

50 Returns 

51 ------- 

52 weights : numpy.ndarray 

53 Normalized weight matrix. Shape: (len(cout_tedges) - 1, len(tedges) - 1). 

54 Rows for spin-up and zero-flow cout bins are all zero; use ``spinup_mask`` 

55 to distinguish spin-up (no contributing streamtube) from zero-flow 

56 (contributing streamtubes all trace back into a zero-flow cin window). 

57 spinup_mask : numpy.ndarray of bool 

58 Shape: (len(cout_tedges) - 1,). True for cout bins where no streamtube 

59 traced back into the cin time range (signal has not broken through). 

60 """ 

61 # Convert time edges to days 

62 cin_tedges_days = ((tedges - tedges[0]) / pd.Timedelta(days=1)).values 

63 cout_tedges_days = ((cout_tedges - tedges[0]) / pd.Timedelta(days=1)).values 

64 

65 # Pre-compute all residence times and infiltration edges 

66 rt_edges_2d = residence_time( 

67 flow=flow, 

68 flow_tedges=tedges, 

69 index=cout_tedges, 

70 aquifer_pore_volumes=aquifer_pore_volumes, 

71 retardation_factor=retardation_factor, 

72 direction="extraction_to_infiltration", 

73 ) 

74 infiltration_tedges_2d = cout_tedges_days[None, :] - rt_edges_2d 

75 

76 # Pre-compute valid bins 

77 valid_bins_2d = ~(np.isnan(infiltration_tedges_2d[:, :-1]) | np.isnan(infiltration_tedges_2d[:, 1:])) 

78 

79 # Pre-compute cin time range for clip optimization (computed once, used n_bins times) 

80 cin_time_min = cin_tedges_days[0] 

81 cin_time_max = cin_tedges_days[-1] 

82 

83 # Per-streamtube derivation: for streamtube i and outlet bin k, mass balance gives 

84 # c_out[k|i] = sum_j (Q_j * overlap[i,k,j] * c_j) / sum_j' (Q_j' * overlap[i,k,j']) 

85 # which is the literal definition of mass-flux divided by water-flux for the bin. 

86 # Each streamtube carries equal flow at the outlet (equal-mass pore-volume bins from 

87 # the gamma distribution), so the bundle outlet concentration is the simple arithmetic 

88 # average over streamtubes that contributed to the bin: 

89 # c_out[k] = (1 / N_valid_k) * sum_{i valid} c_out[k|i] 

90 # During spin-up, only the shorter pore-volume streamtubes have a valid source window 

91 # within the cin time range; we average over the contributing streamtubes only. 

92 n_cout = len(cout_tedges) - 1 

93 n_cin = len(tedges) - 1 

94 accumulated_weights = np.zeros((n_cout, n_cin)) 

95 contributing_bins = np.zeros(n_cout) 

96 

97 # Loop over each pore volume 

98 for i in range(len(aquifer_pore_volumes)): 

99 if not np.any(valid_bins_2d[i, :]): 

100 continue 

101 

102 # Clip optimization: Check for temporal overlap before expensive computation 

103 # Get the range of infiltration times for this pore volume (only valid bins) 

104 infiltration_times = infiltration_tedges_2d[i, :] 

105 valid_infiltration_times = infiltration_times[~np.isnan(infiltration_times)] 

106 

107 if len(valid_infiltration_times) == 0: 

108 continue 

109 

110 infiltration_min = valid_infiltration_times[0] # Min is first element (monotonic) 

111 infiltration_max = valid_infiltration_times[-1] # Max is last element (monotonic) 

112 

113 # Check if infiltration window overlaps with cin window 

114 # Two intervals [a1, a2] and [b1, b2] overlap if: max(a1, b1) < min(a2, b2) 

115 has_overlap = max(infiltration_min, cin_time_min) < min(infiltration_max, cin_time_max) 

116 

117 if not has_overlap: 

118 # No temporal overlap - this bin contributes nothing, skip expensive computation 

119 continue 

120 

121 # Compute overlap matrix for this pore volume. 

122 # partial_isin returns shape (n_in, n_out) = (n_cout, n_cin) here, because 

123 # bin_edges_in has length n_cout+1 (one infiltration-time edge per cout edge) 

124 # and bin_edges_out has length n_cin+1 (the cin time edges). 

125 overlap_matrix = partial_isin(bin_edges_in=infiltration_tedges_2d[i, :], bin_edges_out=cin_tedges_days) 

126 

127 # Per-streamtube weights: row k gives c_out[k|i] = sum_j weight[k,j] * c_in[j]. 

128 # The row-normalization is the direct mass-flux/water-flux ratio, NOT an 

129 # error-hiding renormalization. 

130 flow_weighted_overlap = overlap_matrix * flow[None, :] 

131 row_sums = np.sum(flow_weighted_overlap, axis=1) 

132 valid_rows_pv = row_sums > 0 

133 normalized_overlap = np.zeros_like(flow_weighted_overlap) 

134 normalized_overlap[valid_rows_pv, :] = flow_weighted_overlap[valid_rows_pv, :] / row_sums[valid_rows_pv, None] 

135 

136 # Accumulate only the valid bins from this pore volume for a simple count-average 

137 # over contributing streamtubes. 

138 accumulated_weights[valid_bins_2d[i, :], :] += normalized_overlap[valid_bins_2d[i, :], :] 

139 contributing_bins[valid_bins_2d[i, :]] += 1 

140 

141 # Simple arithmetic average across contributing streamtubes (equal flow per 

142 # streamtube at the outlet means equal weight). This is correct under variable 

143 # flow; an end-of-loop global normalization would silently bias streamtubes by 

144 # source-window length. 

145 valid_rows = contributing_bins > 0 

146 result = np.zeros_like(accumulated_weights) 

147 result[valid_rows, :] = accumulated_weights[valid_rows, :] / contributing_bins[valid_rows, None] 

148 spinup_mask = ~valid_rows 

149 return result, spinup_mask