Coverage for src / gwtransport / advection_utils.py: 95%
42 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-27 06:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-27 06:32 +0000
1"""
2Private helper functions for advective transport modeling.
4This module contains internal helper functions used by the advection module.
5These functions implement various algorithms for computing transport weights
6and handling nonlinear sorption.
8This file is part of gwtransport which is released under AGPL-3.0 license.
9See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details.
10"""
12import numpy as np
13import numpy.typing as npt
14import pandas as pd
16from gwtransport.residence_time import residence_time
17from gwtransport.utils import partial_isin
20def _infiltration_to_extraction_weights(
21 *,
22 tedges: pd.DatetimeIndex,
23 cout_tedges: pd.DatetimeIndex,
24 aquifer_pore_volumes: npt.NDArray[np.floating],
25 flow: npt.NDArray[np.floating],
26 retardation_factor: float,
27) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.bool_]]:
28 """
29 Compute normalized weights for linear infiltration to extraction transformation.
31 This helper function computes the weight matrix for constant retardation factor.
32 It handles the main advective transport calculation with flow-weighted averaging.
34 The resulting cout values represent volume-weighted (flow-weighted) bin averages,
35 where periods with higher infiltration flow rates contribute more to the output concentration.
37 Parameters
38 ----------
39 tedges : pandas.DatetimeIndex
40 Time edges for infiltration bins.
41 cout_tedges : pandas.DatetimeIndex
42 Time edges for extraction bins.
43 aquifer_pore_volumes : array-like
44 Distribution of pore volumes [m3].
45 flow : array-like
46 Flow rate values [m3/day].
47 retardation_factor : float
48 Constant retardation factor.
50 Returns
51 -------
52 weights : numpy.ndarray
53 Normalized weight matrix. Shape: (len(cout_tedges) - 1, len(tedges) - 1).
54 Rows for spin-up and zero-flow cout bins are all zero; use ``spinup_mask``
55 to distinguish spin-up (no contributing streamtube) from zero-flow
56 (contributing streamtubes all trace back into a zero-flow cin window).
57 spinup_mask : numpy.ndarray of bool
58 Shape: (len(cout_tedges) - 1,). True for cout bins where no streamtube
59 traced back into the cin time range (signal has not broken through).
60 """
61 # Convert time edges to days
62 cin_tedges_days = ((tedges - tedges[0]) / pd.Timedelta(days=1)).values
63 cout_tedges_days = ((cout_tedges - tedges[0]) / pd.Timedelta(days=1)).values
65 # Pre-compute all residence times and infiltration edges
66 rt_edges_2d = residence_time(
67 flow=flow,
68 flow_tedges=tedges,
69 index=cout_tedges,
70 aquifer_pore_volumes=aquifer_pore_volumes,
71 retardation_factor=retardation_factor,
72 direction="extraction_to_infiltration",
73 )
74 infiltration_tedges_2d = cout_tedges_days[None, :] - rt_edges_2d
76 # Pre-compute valid bins
77 valid_bins_2d = ~(np.isnan(infiltration_tedges_2d[:, :-1]) | np.isnan(infiltration_tedges_2d[:, 1:]))
79 # Pre-compute cin time range for clip optimization (computed once, used n_bins times)
80 cin_time_min = cin_tedges_days[0]
81 cin_time_max = cin_tedges_days[-1]
83 # Per-streamtube derivation: for streamtube i and outlet bin k, mass balance gives
84 # c_out[k|i] = sum_j (Q_j * overlap[i,k,j] * c_j) / sum_j' (Q_j' * overlap[i,k,j'])
85 # which is the literal definition of mass-flux divided by water-flux for the bin.
86 # Each streamtube carries equal flow at the outlet (equal-mass pore-volume bins from
87 # the gamma distribution), so the bundle outlet concentration is the simple arithmetic
88 # average over streamtubes that contributed to the bin:
89 # c_out[k] = (1 / N_valid_k) * sum_{i valid} c_out[k|i]
90 # During spin-up, only the shorter pore-volume streamtubes have a valid source window
91 # within the cin time range; we average over the contributing streamtubes only.
92 n_cout = len(cout_tedges) - 1
93 n_cin = len(tedges) - 1
94 accumulated_weights = np.zeros((n_cout, n_cin))
95 contributing_bins = np.zeros(n_cout)
97 # Loop over each pore volume
98 for i in range(len(aquifer_pore_volumes)):
99 if not np.any(valid_bins_2d[i, :]):
100 continue
102 # Clip optimization: Check for temporal overlap before expensive computation
103 # Get the range of infiltration times for this pore volume (only valid bins)
104 infiltration_times = infiltration_tedges_2d[i, :]
105 valid_infiltration_times = infiltration_times[~np.isnan(infiltration_times)]
107 if len(valid_infiltration_times) == 0:
108 continue
110 infiltration_min = valid_infiltration_times[0] # Min is first element (monotonic)
111 infiltration_max = valid_infiltration_times[-1] # Max is last element (monotonic)
113 # Check if infiltration window overlaps with cin window
114 # Two intervals [a1, a2] and [b1, b2] overlap if: max(a1, b1) < min(a2, b2)
115 has_overlap = max(infiltration_min, cin_time_min) < min(infiltration_max, cin_time_max)
117 if not has_overlap:
118 # No temporal overlap - this bin contributes nothing, skip expensive computation
119 continue
121 # Compute overlap matrix for this pore volume.
122 # partial_isin returns shape (n_in, n_out) = (n_cout, n_cin) here, because
123 # bin_edges_in has length n_cout+1 (one infiltration-time edge per cout edge)
124 # and bin_edges_out has length n_cin+1 (the cin time edges).
125 overlap_matrix = partial_isin(bin_edges_in=infiltration_tedges_2d[i, :], bin_edges_out=cin_tedges_days)
127 # Per-streamtube weights: row k gives c_out[k|i] = sum_j weight[k,j] * c_in[j].
128 # The row-normalization is the direct mass-flux/water-flux ratio, NOT an
129 # error-hiding renormalization.
130 flow_weighted_overlap = overlap_matrix * flow[None, :]
131 row_sums = np.sum(flow_weighted_overlap, axis=1)
132 valid_rows_pv = row_sums > 0
133 normalized_overlap = np.zeros_like(flow_weighted_overlap)
134 normalized_overlap[valid_rows_pv, :] = flow_weighted_overlap[valid_rows_pv, :] / row_sums[valid_rows_pv, None]
136 # Accumulate only the valid bins from this pore volume for a simple count-average
137 # over contributing streamtubes.
138 accumulated_weights[valid_bins_2d[i, :], :] += normalized_overlap[valid_bins_2d[i, :], :]
139 contributing_bins[valid_bins_2d[i, :]] += 1
141 # Simple arithmetic average across contributing streamtubes (equal flow per
142 # streamtube at the outlet means equal weight). This is correct under variable
143 # flow; an end-of-loop global normalization would silently bias streamtubes by
144 # source-window length.
145 valid_rows = contributing_bins > 0
146 result = np.zeros_like(accumulated_weights)
147 result[valid_rows, :] = accumulated_weights[valid_rows, :] / contributing_bins[valid_rows, None]
148 spinup_mask = ~valid_rows
149 return result, spinup_mask