Coverage for src/gwtransport/advection.py: 91%
113 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-29 20:01 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-29 20:01 +0000
1"""
2Advection Analysis for 1D Aquifer Systems.
4This module provides functions to analyze compound transport by advection
5in aquifer systems. It includes tools for computing concentrations of the extracted water
6based on the concentration of the infiltrating water, extraction data and aquifer properties.
8The model assumes requires the groundwaterflow to be reduced to a 1D system. On one side,
9water with a certain concentration infiltrates ('cin'), the water flows through the aquifer and
10the compound of interest flows through the aquifer with a retarded velocity. The water is
11extracted ('cout').
13Main functions:
14- infiltration_to_extraction: Compute the concentration of the extracted water by shifting cin with its residence time. This corresponds to a convolution operation.
15- gamma_infiltration_to_extraction: Similar to infiltration_to_extraction, but for a gamma distribution of aquifer pore volumes.
16- distribution_infiltration_to_extraction: Similar to infiltration_to_extraction, but for an arbitrary distribution of aquifer pore volumes.
17"""
19import numpy as np
20import numpy.typing as npt
21import pandas as pd
23from gwtransport import gamma
24from gwtransport.residence_time import residence_time
25from gwtransport.utils import compute_time_edges, interp_series, partial_isin
28def infiltration_to_extraction(
29 cin_series: pd.Series,
30 flow_series: pd.Series,
31 aquifer_pore_volume: float,
32 retardation_factor: float = 1.0,
33 cout_index: str = "cin",
34) -> pd.Series:
35 """
36 Compute the concentration of the extracted water by shifting cin with its residence time.
38 The compound is retarded in the aquifer with a retardation factor. The residence
39 time is computed based on the flow rate of the water in the aquifer and the pore volume
40 of the aquifer.
42 This function represents infiltration to extraction modeling (equivalent to convolution).
44 Parameters
45 ----------
46 cin_series : pandas.Series
47 Concentration of the compound in the infiltrating water [ng/m3]. The cin_series should be the average concentration of a time bin. The index should be a pandas.DatetimeIndex
48 and is labeled at the end of the time bin.
49 flow_series : pandas.Series
50 Flow rate of water in the aquifer [m3/day]. The flow_series should be the average flow of a time bin. The index should be a pandas.DatetimeIndex
51 and is labeled at the end of the time bin.
52 aquifer_pore_volume : float
53 Pore volume of the aquifer [m3].
54 cout_index : str, optional
55 The index of the output series. Can be 'cin', 'flow', or 'cout'. Default is 'cin'.
56 - 'cin': The output series will have the same index as `cin_series`.
57 - 'flow': The output series will have the same index as `flow_series`.
58 - 'cout': The output series will have the same index as `cin_series + residence_time`.
60 Returns
61 -------
62 pandas.Series or numpy.ndarray
63 Concentration of the compound in the extracted water [ng/m3].
64 Returns pandas.Series when cout_index is 'cin' or 'flow', or numpy.ndarray when cout_index is 'cout'.
66 Examples
67 --------
68 Basic usage with single pore volume:
70 >>> import pandas as pd
71 >>> import numpy as np
72 >>> from gwtransport.advection import infiltration_to_extraction
73 >>>
74 >>> # Create input data
75 >>> dates = pd.date_range(start="2020-01-01", end="2020-01-10", freq="D")
76 >>> cin = pd.Series(np.ones(len(dates)), index=dates)
77 >>> flow = pd.Series(np.ones(len(dates)) * 100, index=dates) # 100 m3/day
78 >>>
79 >>> # Single aquifer pore volume
80 >>> aquifer_pore_volume = 500.0 # m3
81 >>>
82 >>> # Run infiltration to extraction model (default returns on cin index)
83 >>> cout = infiltration_to_extraction(cin, flow, aquifer_pore_volume)
84 >>> len(cout) == len(cin)
85 True
87 Different output index options:
89 >>> # Output on cin index (default)
90 >>> cout_cin = infiltration_to_extraction(
91 ... cin, flow, aquifer_pore_volume, cout_index="cin"
92 ... )
93 >>>
94 >>> # Output on flow index
95 >>> cout_flow = infiltration_to_extraction(
96 ... cin, flow, aquifer_pore_volume, cout_index="flow"
97 ... )
98 >>>
99 >>> # Output on shifted time index (cin + residence_time)
100 >>> cout_shifted = infiltration_to_extraction(
101 ... cin, flow, aquifer_pore_volume, cout_index="cout"
102 ... )
104 With retardation factor:
106 >>> # Compound moves twice as slowly due to sorption
107 >>> cout = infiltration_to_extraction(
108 ... cin, flow, aquifer_pore_volume, retardation_factor=2.0
109 ... )
110 """
111 # Create flow tedges from the flow series index (assuming it's at the end of bins)
112 flow_tedges = compute_time_edges(
113 tedges=None, tstart=None, tend=pd.DatetimeIndex(flow_series.index), number_of_bins=len(flow_series)
114 )
115 rt_array = residence_time(
116 flow=flow_series,
117 flow_tedges=flow_tedges,
118 index=pd.DatetimeIndex(cin_series.index),
119 aquifer_pore_volume=aquifer_pore_volume,
120 retardation_factor=retardation_factor,
121 direction="infiltration_to_extraction",
122 )
124 rt = pd.to_timedelta(rt_array[0], unit="D", errors="coerce")
125 index = cin_series.index + rt
127 cout = pd.Series(data=cin_series.values, index=index, name="cout")
129 if cout_index == "cin":
130 return interp_series(cout, pd.DatetimeIndex(cin_series.index))
131 if cout_index == "flow":
132 # If cout_index is 'flow', we need to resample cout to the flow index
133 return interp_series(cout, pd.DatetimeIndex(flow_series.index))
134 if cout_index == "cout":
135 # If cout_index is 'cout', we return the cout as is
136 return cout.values
138 msg = f"Invalid cout_index: {cout_index}. Must be 'cin', 'flow', or 'cout'."
139 raise ValueError(msg)
142def extraction_to_infiltration(
143 cout: pd.Series, flow: pd.Series, aquifer_pore_volume: float, retardation_factor: float = 1.0, resample_dates=None
144) -> pd.Series:
145 """
146 Compute the concentration of the infiltrating water by shifting cout with its residence time.
148 This function represents extraction to infiltration modeling (equivalent to deconvolution).
150 Parameters
151 ----------
152 cout : pandas.Series
153 Concentration of the compound in the extracted water [ng/m3].
154 flow : pandas.Series
155 Flow rate of water in the aquifer [m3/day].
156 aquifer_pore_volume : float
157 Pore volume of the aquifer [m3].
159 Returns
160 -------
161 numpy.ndarray
162 Concentration of the compound in the infiltrating water [ng/m3].
163 """
164 msg = "Extraction to infiltration advection (deconvolution) is not implemented yet"
165 raise NotImplementedError(msg)
168def gamma_infiltration_to_extraction(
169 *,
170 cin,
171 flow,
172 tedges,
173 cout_tedges,
174 alpha=None,
175 beta=None,
176 mean=None,
177 std=None,
178 n_bins=100,
179 retardation_factor=1.0,
180):
181 """
182 Compute the concentration of the extracted water by shifting cin with its residence time.
184 The compound is retarded in the aquifer with a retardation factor. The residence
185 time is computed based on the flow rate of the water in the aquifer and the pore volume
186 of the aquifer. The aquifer pore volume is approximated by a gamma distribution, with
187 parameters alpha and beta.
189 This function represents infiltration to extraction modeling (equivalent to convolution).
191 Provide either alpha and beta or mean and std.
193 Parameters
194 ----------
195 cin : pandas.Series
196 Concentration of the compound in infiltrating water or temperature of infiltrating
197 water.
198 cin_tedges : pandas.DatetimeIndex
199 Time edges for the concentration data. Used to compute the cumulative concentration.
200 Has a length of one more than `cin`.
201 cout_tedges : pandas.DatetimeIndex
202 Time edges for the output data. Used to compute the cumulative concentration.
203 Has a length of one more than `flow`.
204 flow : pandas.Series
205 Flow rate of water in the aquifer [m3/day].
206 flow_tedges : pandas.DatetimeIndex
207 Time edges for the flow data. Used to compute the cumulative flow.
208 Has a length of one more than `flow`.
209 alpha : float, optional
210 Shape parameter of gamma distribution of the aquifer pore volume (must be > 0)
211 beta : float, optional
212 Scale parameter of gamma distribution of the aquifer pore volume (must be > 0)
213 mean : float, optional
214 Mean of the gamma distribution.
215 std : float, optional
216 Standard deviation of the gamma distribution.
217 n_bins : int
218 Number of bins to discretize the gamma distribution.
219 retardation_factor : float
220 Retardation factor of the compound in the aquifer.
222 Returns
223 -------
224 numpy.ndarray
225 Concentration of the compound in the extracted water [ng/m3] or temperature.
227 Examples
228 --------
229 Basic usage with alpha and beta parameters:
231 >>> import pandas as pd
232 >>> import numpy as np
233 >>> from gwtransport.utils import compute_time_edges
234 >>> from gwtransport.advection import gamma_infiltration_to_extraction
235 >>>
236 >>> # Create input data with aligned time edges
237 >>> dates = pd.date_range(start="2020-01-01", end="2020-01-20", freq="D")
238 >>> tedges = compute_time_edges(
239 ... tedges=None, tstart=None, tend=dates, number_of_bins=len(dates)
240 ... )
241 >>>
242 >>> # Create output time edges (can be different alignment)
243 >>> cout_dates = pd.date_range(start="2020-01-05", end="2020-01-15", freq="D")
244 >>> cout_tedges = compute_time_edges(
245 ... tedges=None, tstart=None, tend=cout_dates, number_of_bins=len(cout_dates)
246 ... )
247 >>>
248 >>> # Input concentration and flow (same length, aligned with tedges)
249 >>> cin = pd.Series(np.ones(len(dates)), index=dates)
250 >>> flow = pd.Series(np.ones(len(dates)) * 100, index=dates) # 100 m3/day
251 >>>
252 >>> # Run gamma_infiltration_to_extraction with alpha/beta parameters
253 >>> cout = gamma_infiltration_to_extraction(
254 ... cin=cin,
255 ... cin_tedges=tedges,
256 ... cout_tedges=cout_tedges,
257 ... flow=flow,
258 ... flow_tedges=tedges, # Must be identical to cin_tedges
259 ... alpha=10.0,
260 ... beta=10.0,
261 ... n_bins=5,
262 ... )
263 >>> cout.shape
264 (11,)
266 Using mean and std parameters instead:
268 >>> cout = gamma_infiltration_to_extraction(
269 ... cin=cin,
270 ... cin_tedges=tedges,
271 ... cout_tedges=cout_tedges,
272 ... flow=flow,
273 ... flow_tedges=tedges,
274 ... mean=100.0,
275 ... std=20.0,
276 ... n_bins=5,
277 ... )
279 With retardation factor:
281 >>> cout = gamma_infiltration_to_extraction(
282 ... cin=cin,
283 ... cin_tedges=tedges,
284 ... cout_tedges=cout_tedges,
285 ... flow=flow,
286 ... flow_tedges=tedges,
287 ... alpha=10.0,
288 ... beta=10.0,
289 ... retardation_factor=2.0, # Doubles residence time
290 ... )
291 """
292 bins = gamma.bins(alpha=alpha, beta=beta, mean=mean, std=std, n_bins=n_bins)
293 return distribution_infiltration_to_extraction(
294 cin=cin,
295 flow=flow,
296 tedges=tedges,
297 cout_tedges=cout_tedges,
298 aquifer_pore_volumes=bins["expected_values"],
299 retardation_factor=retardation_factor,
300 )
303def gamma_extraction_to_infiltration(cout, flow, alpha, beta, n_bins=100, retardation_factor=1.0):
304 """
305 Compute the concentration of the infiltrating water by shifting cout with its residence time.
307 This function represents extraction to infiltration modeling (equivalent to deconvolution).
309 Parameters
310 ----------
311 cout : pandas.Series
312 Concentration of the compound in the extracted water [ng/m3].
313 flow : pandas.Series
314 Flow rate of water in the aquifer [m3/day].
315 alpha : float
316 Shape parameter of gamma distribution of the aquifer pore volume (must be > 0)
317 beta : float
318 Scale parameter of gamma distribution of the aquifer pore volume (must be > 0)
319 n_bins : int
320 Number of bins to discretize the gamma distribution.
321 retardation_factor : float
322 Retardation factor of the compound in the aquifer.
324 Returns
325 -------
326 NotImplementedError
327 This function is not yet implemented.
328 """
329 msg = "Extraction to infiltration advection gamma (deconvolution) is not implemented yet"
330 raise NotImplementedError(msg)
333def distribution_infiltration_to_extraction(
334 *,
335 cin: npt.ArrayLike,
336 flow: npt.ArrayLike,
337 tedges: pd.DatetimeIndex,
338 cout_tedges: pd.DatetimeIndex,
339 aquifer_pore_volumes: npt.ArrayLike,
340 retardation_factor: float = 1.0,
341) -> np.ndarray:
342 """
343 Compute the concentration of the extracted water using flow-weighted advection.
345 This function implements an infiltration to extraction advection model where cin and flow values
346 correspond to the same aligned time bins defined by tedges.
348 The algorithm:
349 1. Computes residence times for each pore volume at cout time edges
350 2. Calculates infiltration time edges by subtracting residence times
351 3. Determines temporal overlaps between infiltration and cin time windows
352 4. Creates flow-weighted overlap matrices normalized by total weights
353 5. Computes weighted contributions and averages across pore volumes
355 Parameters
356 ----------
357 cin : array-like
358 Concentration values of infiltrating water or temperature [concentration units].
359 Length must match the number of time bins defined by tedges.
360 flow : array-like
361 Flow rate values in the aquifer [m3/day].
362 Length must match cin and the number of time bins defined by tedges.
363 tedges : pandas.DatetimeIndex
364 Time edges defining bins for both cin and flow data. Has length of
365 len(cin) + 1 and len(flow) + 1.
366 cout_tedges : pandas.DatetimeIndex
367 Time edges for output data bins. Has length of desired output + 1.
368 Can have different time alignment and resolution than tedges.
369 aquifer_pore_volumes : array-like
370 Array of aquifer pore volumes [m3] representing the distribution
371 of residence times in the aquifer system.
372 retardation_factor : float, optional
373 Retardation factor of the compound in the aquifer (default 1.0).
374 Values > 1.0 indicate slower transport due to sorption/interaction.
376 Returns
377 -------
378 numpy.ndarray
379 Flow-weighted concentration in the extracted water. Same units as cin.
380 Length equals len(cout_tedges) - 1. NaN values indicate time periods
381 with no valid contributions from the infiltration data.
383 Raises
384 ------
385 ValueError
386 If tedges length doesn't match cin/flow arrays plus one, or if
387 infiltration time edges become non-monotonic (invalid input conditions).
389 Examples
390 --------
391 Basic usage with pandas Series:
393 >>> import pandas as pd
394 >>> import numpy as np
395 >>> from gwtransport.utils import compute_time_edges
396 >>> from gwtransport.advection import distribution_infiltration_to_extraction
397 >>>
398 >>> # Create input data
399 >>> dates = pd.date_range(start="2020-01-01", end="2020-01-20", freq="D")
400 >>> tedges = compute_time_edges(
401 ... tedges=None, tstart=None, tend=dates, number_of_bins=len(dates)
402 ... )
403 >>>
404 >>> # Create output time edges (different alignment)
405 >>> cout_dates = pd.date_range(start="2020-01-05", end="2020-01-15", freq="D")
406 >>> cout_tedges = compute_time_edges(
407 ... tedges=None, tstart=None, tend=cout_dates, number_of_bins=len(cout_dates)
408 ... )
409 >>>
410 >>> # Input concentration and flow
411 >>> cin = pd.Series(np.ones(len(dates)), index=dates)
412 >>> flow = pd.Series(np.ones(len(dates)) * 100, index=dates) # 100 m3/day
413 >>>
414 >>> # Define distribution of aquifer pore volumes
415 >>> aquifer_pore_volumes = np.array([50, 100, 200]) # m3
416 >>>
417 >>> # Run distribution_infiltration_to_extraction
418 >>> cout = distribution_infiltration_to_extraction(
419 ... cin=cin,
420 ... flow=flow,
421 ... tedges=tedges,
422 ... cout_tedges=cout_tedges,
423 ... aquifer_pore_volumes=aquifer_pore_volumes,
424 ... )
425 >>> cout.shape
426 (11,)
428 Using array inputs instead of pandas Series:
430 >>> # Convert to arrays
431 >>> cin_values = cin.values
432 >>> flow_values = flow.values
433 >>>
434 >>> cout = distribution_infiltration_to_extraction(
435 ... cin=cin_values,
436 ... flow=flow_values,
437 ... tedges=tedges,
438 ... cout_tedges=cout_tedges,
439 ... aquifer_pore_volumes=aquifer_pore_volumes,
440 ... )
442 With retardation factor:
444 >>> cout = distribution_infiltration_to_extraction(
445 ... cin=cin,
446 ... flow=flow,
447 ... tedges=tedges,
448 ... cout_tedges=cout_tedges,
449 ... aquifer_pore_volumes=aquifer_pore_volumes,
450 ... retardation_factor=2.0, # Compound moves twice as slowly
451 ... )
453 Using single pore volume:
455 >>> single_volume = np.array([100]) # Single 100 m3 pore volume
456 >>> cout = distribution_infiltration_to_extraction(
457 ... cin=cin,
458 ... flow=flow,
459 ... tedges=tedges,
460 ... cout_tedges=cout_tedges,
461 ... aquifer_pore_volumes=single_volume,
462 ... )
463 """
464 tedges = pd.DatetimeIndex(tedges)
465 cout_tedges = pd.DatetimeIndex(cout_tedges)
467 if len(tedges) != len(cin) + 1:
468 msg = "tedges must have one more element than cin"
469 raise ValueError(msg)
470 if len(tedges) != len(flow) + 1:
471 msg = "tedges must have one more element than flow"
472 raise ValueError(msg)
474 # Convert to arrays for vectorized operations
475 cin_values = np.asarray(cin)
476 flow_values = np.asarray(flow)
478 # Validate inputs do not contain NaN values
479 if np.any(np.isnan(cin_values)):
480 msg = "cin contains NaN values, which are not allowed"
481 raise ValueError(msg)
482 if np.any(np.isnan(flow_values)):
483 msg = "flow contains NaN values, which are not allowed"
484 raise ValueError(msg)
485 cin_tedges_days = ((tedges - tedges[0]) / pd.Timedelta(days=1)).values
486 cout_tedges_days = ((cout_tedges - tedges[0]) / pd.Timedelta(days=1)).values
487 aquifer_pore_volumes = np.asarray(aquifer_pore_volumes)
489 # Pre-compute all residence times and infiltration edges
490 rt_edges_2d = residence_time(
491 flow=flow_values,
492 flow_tedges=tedges,
493 index=cout_tedges,
494 aquifer_pore_volume=aquifer_pore_volumes,
495 retardation_factor=retardation_factor,
496 direction="extraction_to_infiltration",
497 )
498 infiltration_tedges_2d = cout_tedges_days[None, :] - rt_edges_2d
500 # Pre-compute valid bins and count
501 valid_bins_2d = ~(np.isnan(infiltration_tedges_2d[:, :-1]) | np.isnan(infiltration_tedges_2d[:, 1:]))
502 valid_pv_count = np.sum(valid_bins_2d, axis=0)
504 # Initialize accumulator
505 normalized_weights = _distribution_infiltration_to_extraction_weights(
506 cout_tedges,
507 aquifer_pore_volumes,
508 cin_values,
509 flow_values,
510 cin_tedges_days,
511 infiltration_tedges_2d,
512 valid_bins_2d,
513 valid_pv_count,
514 )
516 # Apply to concentrations and handle NaN for periods with no contributions
517 out = normalized_weights.dot(cin_values)
518 out[valid_pv_count == 0] = np.nan
520 return out
523def _distribution_infiltration_to_extraction_weights(
524 cout_tedges,
525 aquifer_pore_volumes,
526 cin_values,
527 flow_values,
528 cin_tedges_days,
529 infiltration_tedges_2d,
530 valid_bins_2d,
531 valid_pv_count,
532):
533 accumulated_weights = np.zeros((len(cout_tedges) - 1, len(cin_values)))
535 # Loop over each pore volume
536 for i in range(len(aquifer_pore_volumes)):
537 if np.any(valid_bins_2d[i, :]):
538 overlap_matrix = partial_isin(infiltration_tedges_2d[i, :], cin_tedges_days)
539 accumulated_weights[valid_bins_2d[i, :], :] += overlap_matrix[valid_bins_2d[i, :], :]
541 # Average across valid pore volumes and apply flow weighting
542 averaged_weights = np.zeros_like(accumulated_weights)
543 valid_cout = valid_pv_count > 0
544 averaged_weights[valid_cout, :] = accumulated_weights[valid_cout, :] / valid_pv_count[valid_cout, None]
546 # Apply flow weighting after averaging
547 flow_weighted_averaged = averaged_weights * flow_values[None, :]
549 total_weights = np.sum(flow_weighted_averaged, axis=1)
550 valid_weights = total_weights > 0
551 normalized_weights = np.zeros_like(flow_weighted_averaged)
552 normalized_weights[valid_weights, :] = flow_weighted_averaged[valid_weights, :] / total_weights[valid_weights, None]
553 return normalized_weights
556def distribution_extraction_to_infiltration(
557 *,
558 cout: npt.ArrayLike,
559 flow: npt.ArrayLike,
560 tedges: pd.DatetimeIndex,
561 cin_tedges: pd.DatetimeIndex,
562 aquifer_pore_volumes: npt.ArrayLike,
563 retardation_factor: float = 1.0,
564) -> np.ndarray:
565 """
566 Compute the concentration of the infiltrating water from extracted water (deconvolution).
568 This function implements an extraction to infiltration advection model (inverse of distribution_infiltration_to_extraction)
569 where cout and flow values correspond to the same aligned time bins defined by tedges.
571 SYMMETRIC RELATIONSHIP:
572 - distribution_infiltration_to_extraction: cin + tedges → cout + cout_tedges
573 - distribution_extraction_to_infiltration: cout + tedges → cin + cin_tedges
575 The algorithm (symmetric to distribution_infiltration_to_extraction):
576 1. Computes residence times for each pore volume at cint time edges
577 2. Calculates extraction time edges by adding residence times (reverse of infiltration_to_extraction)
578 3. Determines temporal overlaps between extraction and cout time windows
579 4. Creates flow-weighted overlap matrices normalized by total weights
580 5. Computes weighted contributions and averages across pore volumes
582 Parameters
583 ----------
584 cout : array-like
585 Concentration values of extracted water [concentration units].
586 Length must match the number of time bins defined by tedges.
587 flow : array-like
588 Flow rate values in the aquifer [m3/day].
589 Length must match cout and the number of time bins defined by tedges.
590 tedges : pandas.DatetimeIndex
591 Time edges defining bins for both cout and flow data. Has length of
592 len(cout) + 1 and len(flow) + 1.
593 cin_tedges : pandas.DatetimeIndex
594 Time edges for output (infiltration) data bins. Has length of desired output + 1.
595 Can have different time alignment and resolution than tedges.
596 aquifer_pore_volumes : array-like
597 Array of aquifer pore volumes [m3] representing the distribution
598 of residence times in the aquifer system.
599 retardation_factor : float, optional
600 Retardation factor of the compound in the aquifer (default 1.0).
601 Values > 1.0 indicate slower transport due to sorption/interaction.
603 Returns
604 -------
605 numpy.ndarray
606 Flow-weighted concentration in the infiltrating water. Same units as cout.
607 Length equals len(cin_tedges) - 1. NaN values indicate time periods
608 with no valid contributions from the extraction data.
610 Raises
611 ------
612 ValueError
613 If tedges length doesn't match cout/flow arrays plus one, or if
614 extraction time edges become non-monotonic (invalid input conditions).
616 Examples
617 --------
618 Basic usage with pandas Series:
620 >>> import pandas as pd
621 >>> import numpy as np
622 >>> from gwtransport.utils import compute_time_edges
623 >>> from gwtransport.advection import distribution_extraction_to_infiltration
624 >>>
625 >>> # Create input data
626 >>> dates = pd.date_range(start="2020-01-01", end="2020-01-20", freq="D")
627 >>> tedges = compute_time_edges(
628 ... tedges=None, tstart=None, tend=dates, number_of_bins=len(dates)
629 ... )
630 >>>
631 >>> # Create output time edges (different alignment)
632 >>> cint_dates = pd.date_range(start="2019-12-25", end="2020-01-15", freq="D")
633 >>> cin_tedges = compute_time_edges(
634 ... tedges=None, tstart=None, tend=cint_dates, number_of_bins=len(cint_dates)
635 ... )
636 >>>
637 >>> # Input concentration and flow
638 >>> cout = pd.Series(np.ones(len(dates)), index=dates)
639 >>> flow = pd.Series(np.ones(len(dates)) * 100, index=dates) # 100 m3/day
640 >>>
641 >>> # Define distribution of aquifer pore volumes
642 >>> aquifer_pore_volumes = np.array([50, 100, 200]) # m3
643 >>>
644 >>> # Run distribution_extraction_to_infiltration
645 >>> cin = distribution_extraction_to_infiltration(
646 ... cout=cout,
647 ... flow=flow,
648 ... tedges=tedges,
649 ... cin_tedges=cin_tedges,
650 ... aquifer_pore_volumes=aquifer_pore_volumes,
651 ... )
652 >>> cin.shape
653 (22,)
655 Using array inputs instead of pandas Series:
657 >>> # Convert to arrays
658 >>> cout_values = cout.values
659 >>> flow_values = flow.values
660 >>>
661 >>> cin = distribution_extraction_to_infiltration(
662 ... cout=cout_values,
663 ... flow=flow_values,
664 ... tedges=tedges,
665 ... cin_tedges=cin_tedges,
666 ... aquifer_pore_volumes=aquifer_pore_volumes,
667 ... )
669 With retardation factor:
671 >>> cin = distribution_extraction_to_infiltration(
672 ... cout=cout,
673 ... flow=flow,
674 ... tedges=tedges,
675 ... cin_tedges=cin_tedges,
676 ... aquifer_pore_volumes=aquifer_pore_volumes,
677 ... retardation_factor=2.0, # Compound moves twice as slowly
678 ... )
680 Using single pore volume:
682 >>> single_volume = np.array([100]) # Single 100 m3 pore volume
683 >>> cin = distribution_extraction_to_infiltration(
684 ... cout=cout,
685 ... flow=flow,
686 ... tedges=tedges,
687 ... cin_tedges=cin_tedges,
688 ... aquifer_pore_volumes=single_volume,
689 ... )
690 """
691 tedges = pd.DatetimeIndex(tedges)
692 cin_tedges = pd.DatetimeIndex(cin_tedges)
694 if len(tedges) != len(cout) + 1:
695 msg = "tedges must have one more element than cout"
696 raise ValueError(msg)
697 if len(tedges) != len(flow) + 1:
698 msg = "tedges must have one more element than flow"
699 raise ValueError(msg)
701 # Convert to arrays for vectorized operations
702 cout_values = np.asarray(cout)
703 flow_values = np.asarray(flow)
705 # Validate inputs do not contain NaN values
706 if np.any(np.isnan(cout_values)):
707 msg = "cout contains NaN values, which are not allowed"
708 raise ValueError(msg)
709 if np.any(np.isnan(flow_values)):
710 msg = "flow contains NaN values, which are not allowed"
711 raise ValueError(msg)
712 cout_tedges_days = ((tedges - tedges[0]) / pd.Timedelta(days=1)).values
713 cin_tedges_days = ((cin_tedges - tedges[0]) / pd.Timedelta(days=1)).values
714 aquifer_pore_volumes = np.asarray(aquifer_pore_volumes)
716 # Pre-compute all residence times and extraction edges (symmetric to infiltration_to_extraction)
717 rt_edges_2d = residence_time(
718 flow=flow_values,
719 flow_tedges=tedges,
720 index=cin_tedges,
721 aquifer_pore_volume=aquifer_pore_volumes,
722 retardation_factor=retardation_factor,
723 direction="infiltration_to_extraction", # Computing from infiltration perspective
724 )
725 extraction_tedges_2d = cin_tedges_days[None, :] + rt_edges_2d
727 # Pre-compute valid bins and count
728 valid_bins_2d = ~(np.isnan(extraction_tedges_2d[:, :-1]) | np.isnan(extraction_tedges_2d[:, 1:]))
729 valid_pv_count = np.sum(valid_bins_2d, axis=0)
731 # Initialize accumulator
732 normalized_weights = _distribution_extraction_to_infiltration_weights(
733 cin_tedges,
734 aquifer_pore_volumes,
735 cout_values,
736 flow_values,
737 cout_tedges_days,
738 extraction_tedges_2d,
739 valid_bins_2d,
740 valid_pv_count,
741 )
743 # Apply to concentrations and handle NaN for periods with no contributions
744 out = normalized_weights.dot(cout_values)
745 out[valid_pv_count == 0] = np.nan
747 return out
750def _distribution_extraction_to_infiltration_weights(
751 cin_tedges,
752 aquifer_pore_volumes,
753 cout_values,
754 flow_values,
755 cout_tedges_days,
756 extraction_tedges_2d,
757 valid_bins_2d,
758 valid_pv_count,
759):
760 """
761 Compute extraction to infiltration transformation weights matrix.
763 Computes the weight matrix for the extraction to infiltration transformation,
764 ensuring mathematical symmetry with the infiltration to extraction operation. The extraction to infiltration
765 weights represent the transpose relationship needed for deconvolution.
767 SYMMETRIC RELATIONSHIP:
768 - Infiltration to extraction weights: W_infiltration_to_extraction maps cin → cout
769 - Extraction to infiltration weights: W_extraction_to_infiltration maps cout → cin
770 - Mathematical constraint: W_extraction_to_infiltration should be the pseudo-inverse of W_infiltration_to_extraction
772 The algorithm mirrors _distribution_infiltration_to_extraction_weights but with transposed
773 temporal overlap computations to ensure mathematical consistency.
775 Parameters
776 ----------
777 cin_tedges : pandas.DatetimeIndex
778 Time edges for output (infiltration) data bins.
779 aquifer_pore_volumes : array-like
780 Array of aquifer pore volumes [m3].
781 cout_values : array-like
782 Concentration values of extracted water.
783 flow_values : array-like
784 Flow rate values in the aquifer [m3/day].
785 cout_tedges_days : array-like
786 Time edges for cout data in days since reference.
787 extraction_tedges_2d : array-like
788 2D array of extraction time edges for each pore volume.
789 valid_bins_2d : array-like
790 2D boolean array indicating valid time bins for each pore volume.
791 valid_pv_count : array-like
792 Count of valid pore volumes for each output time bin.
794 Returns
795 -------
796 numpy.ndarray
797 Normalized weight matrix for extraction to infiltration transformation.
798 Shape: (len(cin_tedges) - 1, len(cout_values))
799 """
800 accumulated_weights = np.zeros((len(cin_tedges) - 1, len(cout_values)))
802 # Loop over each pore volume (same structure as infiltration_to_extraction)
803 for i in range(len(aquifer_pore_volumes)):
804 if np.any(valid_bins_2d[i, :]):
805 # SYMMETRIC temporal overlap computation:
806 # Infiltration to extraction: maps infiltration → cout time windows
807 # Extraction to infiltration: maps extraction → cout time windows (transposed relationship)
808 overlap_matrix = partial_isin(extraction_tedges_2d[i, :], cout_tedges_days)
809 accumulated_weights[valid_bins_2d[i, :], :] += overlap_matrix[valid_bins_2d[i, :], :]
811 # Average across valid pore volumes (symmetric to infiltration_to_extraction)
812 averaged_weights = np.zeros_like(accumulated_weights)
813 valid_cout = valid_pv_count > 0
814 averaged_weights[valid_cout, :] = accumulated_weights[valid_cout, :] / valid_pv_count[valid_cout, None]
816 # Apply flow weighting (symmetric to infiltration_to_extraction)
817 flow_weighted_averaged = averaged_weights * flow_values[None, :]
819 # Normalize by total weights (symmetric to infiltration_to_extraction)
820 total_weights = np.sum(flow_weighted_averaged, axis=1)
821 valid_weights = total_weights > 0
822 normalized_weights = np.zeros_like(flow_weighted_averaged)
823 normalized_weights[valid_weights, :] = flow_weighted_averaged[valid_weights, :] / total_weights[valid_weights, None]
825 return normalized_weights