Coverage for src / gwtransport / fronttracking / math.py: 88%
144 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-04 18:51 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-04 18:51 +0000
1"""
2Mathematical Foundation for Front Tracking with Nonlinear Sorption.
4This module provides exact analytical computations for:
5- Freundlich and constant retardation models
6- Shock velocities via Rankine-Hugoniot condition
7- Characteristic velocities and positions
8- First arrival time calculations
9- Entropy condition verification
11All computations are exact analytical formulas with no numerical tolerances.
13This file is part of gwtransport which is released under AGPL-3.0 license.
14See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details.
15"""
17from dataclasses import dataclass
19import numpy as np
20import numpy.typing as npt
21import pandas as pd
23# Numerical tolerance constants
24EPSILON_FREUNDLICH_N = 1e-10 # Tolerance for checking if n ≈ 1.0
25EPSILON_EXPONENT = 1e-10 # Tolerance for checking if exponent ≈ 0
26EPSILON_DENOMINATOR = 1e-15 # Tolerance for near-zero denominators in shock velocity
29@dataclass
30class FreundlichSorption:
31 """
32 Freundlich sorption isotherm with exact analytical methods.
34 The Freundlich isotherm is: s(C) = k_f * C^(1/n)
36 where:
37 - s is sorbed concentration [mass/mass of solid]
38 - C is dissolved concentration [mass/volume of water]
39 - k_f is Freundlich coefficient [(volume/mass)^(1/n)]
40 - n is Freundlich exponent (dimensionless)
42 For n > 1: Higher C travels faster
43 For n < 1: Higher C travels slower
44 For n = 1: linear (not supported, use ConstantRetardation instead)
46 Parameters
47 ----------
48 k_f : float
49 Freundlich coefficient [(m³/kg)^(1/n)]. Must be positive.
50 n : float
51 Freundlich exponent [-]. Must be positive and != 1.
52 bulk_density : float
53 Bulk density of porous medium [kg/m³]. Must be positive.
54 porosity : float
55 Porosity [-]. Must be in (0, 1).
56 c_min : float, optional
57 Minimum concentration threshold. For n>1, prevents infinite retardation
58 as C→0. Default: 0.1 for n>1, 0.0 for n<1 (set automatically if not provided).
60 Examples
61 --------
62 >>> sorption = FreundlichSorption(
63 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3
64 ... )
65 >>> r = sorption.retardation(5.0)
66 >>> c_back = sorption.concentration_from_retardation(r)
67 >>> bool(np.isclose(c_back, 5.0))
68 True
70 Notes
71 -----
72 The retardation factor is defined as:
73 R(C) = 1 + (rho_b/n_por) * ds/dC
74 = 1 + (rho_b*k_f)/(n_por*n) * C^((1/n)-1)
76 For Freundlich sorption, R depends on C, which creates nonlinear wave behavior.
78 For n>1 (higher C travels faster), R(C)→∞ as C→0, which can cause extremely slow
79 wave propagation. The c_min parameter prevents this by enforcing a minimum
80 concentration, making R(C) finite for all C≥0.
81 """
83 k_f: float
84 """Freundlich coefficient [(m³/kg)^(1/n)]."""
85 n: float
86 """Freundlich exponent [-]."""
87 bulk_density: float
88 """Bulk density of porous medium [kg/m³]."""
89 porosity: float
90 """Porosity [-]."""
91 c_min: float = 1e-12
92 """Minimum concentration threshold to prevent infinite retardation."""
94 def __post_init__(self):
95 """Validate parameters after initialization."""
96 if self.k_f <= 0:
97 msg = f"k_f must be positive, got {self.k_f}"
98 raise ValueError(msg)
99 if self.n <= 0:
100 msg = f"n must be positive, got {self.n}"
101 raise ValueError(msg)
102 if abs(self.n - 1.0) < EPSILON_FREUNDLICH_N:
103 msg = "n = 1 (linear case) not supported, use ConstantRetardation instead"
104 raise ValueError(msg)
105 if self.bulk_density <= 0:
106 msg = f"bulk_density must be positive, got {self.bulk_density}"
107 raise ValueError(msg)
108 if not 0 < self.porosity < 1:
109 msg = f"porosity must be in (0, 1), got {self.porosity}"
110 raise ValueError(msg)
111 if self.c_min < 0:
112 msg = f"c_min must be non-negative, got {self.c_min}"
113 raise ValueError(msg)
115 def retardation(self, c: float | npt.NDArray[np.float64]) -> float | npt.NDArray[np.float64]:
116 """
117 Compute retardation factor R(C).
119 The retardation factor relates concentration velocity to pore water velocity:
120 v_C = flow / R(C)
122 For Freundlich sorption:
123 R(C) = 1 + (rho_b*k_f)/(n_por*n) * C^((1/n)-1)
125 Parameters
126 ----------
127 c : float or array-like
128 Dissolved concentration [mass/volume]. Non-negative.
130 Returns
131 -------
132 r : float or numpy.ndarray
133 Retardation factor [-]. Always >= 1.0.
135 Notes
136 -----
137 - For n > 1: R decreases with increasing C (higher C travels faster)
138 - For n < 1: R increases with increasing C (higher C travels slower)
139 - Concentrations at or below c_min return R=1 if c_min=0, else are clipped to c_min
140 """
141 is_array = isinstance(c, np.ndarray)
142 c_arr = np.asarray(c)
144 if self.c_min == 0 and self.n < 1.0:
145 # Only for n<1 (lower C travels faster) where R(0)=1 is physically correct
146 result = np.where(c_arr <= 0, 1.0, self._compute_retardation(c_arr))
147 else:
148 c_eff = np.maximum(c_arr, self.c_min)
149 result = self._compute_retardation(c_eff)
151 return result if is_array else float(result)
153 def _compute_retardation(self, c: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
154 """Compute retardation for positive concentrations."""
155 exponent = (1.0 / self.n) - 1.0
156 coefficient = (self.bulk_density * self.k_f) / (self.porosity * self.n)
157 return 1.0 + coefficient * (c**exponent)
159 def total_concentration(self, c: float | npt.NDArray[np.float64]) -> float | npt.NDArray[np.float64]:
160 """
161 Compute total concentration (dissolved + sorbed per unit pore volume).
163 Total concentration includes both dissolved and sorbed mass:
164 C_total = C + (rho_b/n_por) * s(C)
165 = C + (rho_b/n_por) * k_f * C^(1/n)
167 Parameters
168 ----------
169 c : float or array-like
170 Dissolved concentration [mass/volume]. Non-negative.
172 Returns
173 -------
174 c_total : float or numpy.ndarray
175 Total concentration [mass/volume]. Always >= c.
177 Notes
178 -----
179 This is the conserved quantity in the transport equation:
180 ∂C_total/∂t + ∂(flow*C)/∂v = 0
182 The flux term only includes dissolved concentration because sorbed mass
183 is immobile. Concentrations at or below c_min return C if c_min=0, else use c_min.
184 """
185 is_array = isinstance(c, np.ndarray)
186 c_arr = np.asarray(c)
188 if self.c_min == 0 and self.n < 1.0:
189 # Only for n<1 (lower C travels faster) where C=0 is physically valid
190 sorbed = np.where(
191 c_arr <= 0, 0.0, (self.bulk_density / self.porosity) * self.k_f * (c_arr ** (1.0 / self.n))
192 )
193 else:
194 c_eff = np.maximum(c_arr, self.c_min)
195 sorbed = (self.bulk_density / self.porosity) * self.k_f * (c_eff ** (1.0 / self.n))
197 result = c_arr + sorbed
198 return result if is_array else float(result)
200 def concentration_from_retardation(self, r: float | npt.NDArray[np.float64]) -> float | npt.NDArray[np.float64]:
201 """
202 Invert retardation factor to obtain concentration analytically.
204 Given R, solves R = retardation(C) for C. This is used in rarefaction waves
205 where the self-similar solution gives R as a function of position and time.
207 Parameters
208 ----------
209 r : float or array-like
210 Retardation factor [-]. Must be >= 1.0.
212 Returns
213 -------
214 c : float or numpy.ndarray
215 Dissolved concentration [mass/volume]. Non-negative.
217 Notes
218 -----
219 This inverts the relation:
220 R = 1 + (rho_b*k_f)/(n_por*n) * C^((1/n)-1)
222 The analytical solution is:
223 C = [(R-1) * n_por*n / (rho_b*k_f)]^(n/(1-n))
225 For n = 1 (linear sorption), the exponent n/(1-n) is undefined, which is
226 why linear sorption must use ConstantRetardation class instead.
228 Examples
229 --------
230 >>> sorption = FreundlichSorption(
231 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3
232 ... )
233 >>> r = sorption.retardation(5.0)
234 >>> c = sorption.concentration_from_retardation(r)
235 >>> bool(np.isclose(c, 5.0, rtol=1e-14))
236 True
237 """
238 is_array = isinstance(r, np.ndarray)
239 r_arr = np.asarray(r)
241 exponent = (1.0 / self.n) - 1.0
243 if abs(exponent) < EPSILON_EXPONENT:
244 msg = "Cannot invert linear retardation (n=1)"
245 raise ValueError(msg)
247 coefficient = (self.bulk_density * self.k_f) / (self.porosity * self.n)
248 base = (r_arr - 1.0) / coefficient
250 inversion_exponent = 1.0 / exponent
251 c = base**inversion_exponent
252 result = np.maximum(c, self.c_min)
253 result = np.where(r_arr <= 1.0, self.c_min, result)
254 result = np.where(base <= 0, self.c_min, result)
256 return result if is_array else float(result)
258 def shock_velocity(self, c_left: float, c_right: float, flow: float) -> float:
259 """
260 Compute shock velocity via Rankine-Hugoniot condition.
262 The Rankine-Hugoniot condition ensures mass conservation across the shock:
263 s_shock = [flux(C_R) - flux(C_L)] / [C_total(C_R) - C_total(C_L)]
265 where flux(C) = flow * C (only dissolved species are transported).
267 Parameters
268 ----------
269 c_left : float
270 Concentration upstream (behind) shock [mass/volume].
271 c_right : float
272 Concentration downstream (ahead of) shock [mass/volume].
273 flow : float
274 Flow rate [volume/time]. Must be positive.
276 Returns
277 -------
278 s_shock : float
279 Shock velocity [volume/time].
281 Notes
282 -----
283 The Rankine-Hugoniot condition is derived from integrating the conservation
284 law across the shock discontinuity. It ensures that the total mass flux
285 (advective transport) is conserved.
287 For physical shocks with n > 1 (higher C travels faster):
288 - c_left > c_right (concentration decreases across shock)
289 - The shock velocity is between the characteristic velocities
291 Examples
292 --------
293 >>> sorption = FreundlichSorption(
294 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3
295 ... )
296 >>> v_shock = sorption.shock_velocity(c_left=10.0, c_right=2.0, flow=100.0)
297 >>> v_shock > 0
298 True
299 """
300 # Flux = flow * C (only dissolved species flow)
301 flux_left = flow * c_left
302 flux_right = flow * c_right
304 # Total concentration (dissolved + sorbed)
305 c_total_left = self.total_concentration(c_left)
306 c_total_right = self.total_concentration(c_right)
308 # Rankine-Hugoniot condition
309 # s_shock = Δflux / ΔC_total
310 denom = c_total_right - c_total_left
312 # Guard against degenerate "shock" states where the total
313 # concentration jump tends to zero. In the analytic limit
314 # ΔC_total → 0, the Rankine-Hugoniot speed approaches the
315 # characteristic velocity, so we fall back to that value
316 # instead of dividing by an extremely small number.
317 if abs(denom) < EPSILON_DENOMINATOR:
318 avg_retardation = 0.5 * (self.retardation(c_left) + self.retardation(c_right))
319 return float(flow / avg_retardation)
321 return float((flux_right - flux_left) / denom)
323 def check_entropy_condition(self, c_left: float, c_right: float, shock_vel: float, flow: float) -> bool:
324 """
325 Verify Lax entropy condition for physical admissibility of shock.
327 The Lax entropy condition ensures that characteristics flow INTO the shock
328 from both sides, which is required for physical shocks::
330 λ(C_L) > s_shock > λ(C_R)
332 where λ(C) = flow / R(C) is the characteristic velocity.
334 Parameters
335 ----------
336 c_left : float
337 Concentration upstream of shock [mass/volume].
338 c_right : float
339 Concentration downstream of shock [mass/volume].
340 shock_vel : float
341 Shock velocity [volume/time].
342 flow : float
343 Flow rate [volume/time].
345 Returns
346 -------
347 satisfies : bool
348 True if shock satisfies entropy condition (is physical).
350 Notes
351 -----
352 Shocks that violate the entropy condition are unphysical and should be
353 replaced by rarefaction waves. The entropy condition prevents non-physical
354 expansion shocks.
356 For n > 1 (higher C travels faster):
357 - Physical shocks have c_left > c_right
358 - Characteristic from left is faster: λ(c_left) > λ(c_right)
359 - Shock velocity is between them
361 For n < 1 (lower C travels faster):
362 - Physical shocks have c_left < c_right
363 - Characteristic from left is slower: λ(c_left) < λ(c_right)
364 - Shock velocity is still between them
366 Examples
367 --------
368 >>> sorption = FreundlichSorption(
369 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3
370 ... )
371 >>> # Physical shock (compression for n>1)
372 >>> v_shock = sorption.shock_velocity(10.0, 2.0, 100.0)
373 >>> sorption.check_entropy_condition(10.0, 2.0, v_shock, 100.0)
374 True
375 >>> # Unphysical shock (expansion for n>1)
376 >>> v_shock_bad = sorption.shock_velocity(2.0, 10.0, 100.0)
377 >>> sorption.check_entropy_condition(2.0, 10.0, v_shock_bad, 100.0)
378 False
379 """
380 # Characteristic velocities
381 lambda_left = flow / self.retardation(c_left)
382 lambda_right = flow / self.retardation(c_right)
384 # If any of the velocities are non-finite (can occur for
385 # test-generated edge states), treat the entropy condition as
386 # violated rather than propagating RuntimeWarnings.
387 if not np.isfinite(lambda_left) or not np.isfinite(lambda_right) or not np.isfinite(shock_vel):
388 return False
390 # Lax condition: λ(C_L) > s_shock > λ(C_R)
391 # Use small tolerance for floating-point comparison
392 tolerance = 1e-14 * max(abs(lambda_left), abs(lambda_right), abs(shock_vel))
394 return bool((lambda_left > shock_vel - tolerance) and (shock_vel > lambda_right - tolerance))
397@dataclass
398class ConstantRetardation:
399 """
400 Constant (linear) retardation model.
402 For linear sorption: s(C) = K_d * C
403 This gives constant retardation: R(C) = 1 + (rho_b/n_por) * K_d = constant
405 This is a special case where concentration-dependent behavior disappears.
406 Used for conservative tracers or as approximation for weak sorption.
408 Parameters
409 ----------
410 retardation_factor : float
411 Constant retardation factor [-]. Must be >= 1.0.
412 R = 1.0 means no retardation (conservative tracer).
414 Notes
415 -----
416 With constant retardation:
417 - All concentrations travel at same velocity: flow / R
418 - No rarefaction waves form (all concentrations travel together)
419 - Shocks occur only at concentration discontinuities at inlet
420 - Solution reduces to simple time-shifting
422 This is equivalent to using `infiltration_to_extraction_series` in the
423 gwtransport package.
425 Examples
426 --------
427 >>> sorption = ConstantRetardation(retardation_factor=2.0)
428 >>> sorption.retardation(5.0)
429 2.0
430 >>> sorption.retardation(10.0)
431 2.0
432 """
434 retardation_factor: float
435 """Constant retardation factor [-]. Must be >= 1.0."""
437 def __post_init__(self):
438 """Validate parameters after initialization."""
439 if self.retardation_factor < 1.0:
440 msg = f"retardation_factor must be >= 1.0, got {self.retardation_factor}"
441 raise ValueError(msg)
443 def retardation(self, c: float) -> float: # noqa: ARG002
444 """
445 Return constant retardation factor (independent of concentration).
447 Parameters
448 ----------
449 c : float
450 Dissolved concentration (not used for constant retardation).
452 Returns
453 -------
454 r : float
455 Constant retardation factor.
456 """
457 return self.retardation_factor
459 def total_concentration(self, c: float) -> float:
460 """
461 Compute total concentration for linear sorption.
463 For constant retardation:
464 C_total = C * R
466 Parameters
467 ----------
468 c : float
469 Dissolved concentration [mass/volume].
471 Returns
472 -------
473 c_total : float
474 Total concentration [mass/volume].
475 """
476 return c * self.retardation_factor
478 def concentration_from_retardation(self, r: float) -> float:
479 """
480 Not applicable for constant retardation.
482 With constant R, all concentrations have the same retardation, so
483 inversion is not meaningful. This method raises an error.
485 Raises
486 ------
487 NotImplementedError
488 Always raised for constant retardation.
489 """
490 msg = "concentration_from_retardation not applicable for ConstantRetardation (R is independent of C)"
491 raise NotImplementedError(msg)
493 def shock_velocity(self, c_left: float, c_right: float, flow: float) -> float: # noqa: ARG002
494 """
495 Compute shock velocity for constant retardation.
497 With constant R, the shock velocity simplifies to:
498 s_shock = flow / R
500 This is the same as all characteristic velocities.
502 Parameters
503 ----------
504 c_left : float
505 Concentration upstream of shock (not used for constant R).
506 c_right : float
507 Concentration downstream of shock (not used for constant R).
508 flow : float
509 Flow rate [volume/time].
511 Returns
512 -------
513 s_shock : float
514 Shock velocity [volume/time].
515 """
516 return flow / self.retardation_factor
518 def check_entropy_condition(self, c_left: float, c_right: float, shock_vel: float, flow: float) -> bool: # noqa: ARG002, PLR6301
519 """
520 Check entropy condition for constant retardation.
522 With constant R, all characteristic velocities are equal, so the
523 entropy condition is trivially satisfied for any shock (or rather,
524 shocks don't really exist - they're just contact discontinuities).
526 Parameters
527 ----------
528 c_left : float
529 Concentration upstream.
530 c_right : float
531 Concentration downstream.
532 shock_vel : float
533 Shock velocity.
534 flow : float
535 Flow rate.
537 Returns
538 -------
539 satisfies : bool
540 Always True for constant retardation.
541 """
542 return True
545def characteristic_velocity(c: float, flow: float, sorption: FreundlichSorption | ConstantRetardation) -> float:
546 """
547 Compute characteristic velocity for given concentration.
549 In smooth regions of the solution, concentration travels at velocity:
550 v = flow / R(C)
552 Parameters
553 ----------
554 c : float
555 Dissolved concentration [mass/volume].
556 flow : float
557 Flow rate [volume/time].
558 sorption : FreundlichSorption or ConstantRetardation
559 Sorption model.
561 Returns
562 -------
563 velocity : float
564 Characteristic velocity [volume/time].
566 Examples
567 --------
568 >>> sorption = FreundlichSorption(
569 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3
570 ... )
571 >>> v = characteristic_velocity(c=5.0, flow=100.0, sorption=sorption)
572 >>> v > 0
573 True
574 """
575 return float(flow / sorption.retardation(c))
578def characteristic_position(
579 c: float, flow: float, sorption: FreundlichSorption | ConstantRetardation, t_start: float, v_start: float, t: float
580) -> float | None:
581 """
582 Compute exact position of characteristic at time t.
584 Characteristics propagate linearly in time:
585 V(t) = v_start + velocity * (t - t_start)
587 where velocity = flow / R(C) is constant along the characteristic.
589 Parameters
590 ----------
591 c : float
592 Concentration carried by characteristic [mass/volume].
593 flow : float
594 Flow rate [volume/time].
595 sorption : FreundlichSorption or ConstantRetardation
596 Sorption model.
597 t_start : float
598 Time when characteristic starts [days].
599 v_start : float
600 Starting position [volume].
601 t : float
602 Time at which to compute position [days].
604 Returns
605 -------
606 position : float or None
607 Position at time t [volume], or None if t < t_start.
609 Examples
610 --------
611 >>> sorption = ConstantRetardation(retardation_factor=2.0)
612 >>> v = characteristic_position(
613 ... c=5.0, flow=100.0, sorption=sorption, t_start=0.0, v_start=0.0, t=10.0
614 ... )
615 >>> bool(np.isclose(v, 500.0)) # v = 100/2 * 10 = 500
616 True
617 """
618 if t < t_start:
619 return None
621 velocity = characteristic_velocity(c, flow, sorption)
622 dt_days = t - t_start
624 return v_start + velocity * dt_days
627def compute_first_front_arrival_time(
628 cin: npt.NDArray[np.floating],
629 flow: npt.NDArray[np.floating],
630 tedges: pd.DatetimeIndex,
631 aquifer_pore_volume: float,
632 sorption: FreundlichSorption | ConstantRetardation,
633) -> float:
634 """
635 Compute exact time when first wave reaches outlet (v_max).
637 This function returns the precise moment when the first non-zero
638 concentration wave from the inlet arrives at the outlet. This marks
639 the end of the spin-up period.
641 For the typical case where the first inlet change creates a characteristic
642 (e.g., 0→C transition), this is when that characteristic reaches v_max.
644 For cases with rarefaction waves:
646 - n>1 (higher C travels faster): The head of a rarefaction
647 (higher C) arrives first.
648 - n<1 (lower C travels faster): The head of a rarefaction
649 (lower C) arrives first.
651 Algorithm:
652 1. Find first index where cin > 0
653 2. Compute residence time for this concentration from inlet to outlet
654 3. Account for piecewise constant flow during transit
655 4. Return arrival time in days from tedges[0]
657 Parameters
658 ----------
659 cin : numpy.ndarray
660 Inlet concentration [mass/volume]. Length = len(tedges) - 1.
661 flow : numpy.ndarray
662 Flow rate [volume/time]. Length = len(tedges) - 1.
663 tedges : pandas.DatetimeIndex
664 Time bin edges. Length = len(cin) + 1.
665 Expected to be DatetimeIndex.
666 aquifer_pore_volume : float
667 Total pore volume [volume]. Must be positive.
668 sorption : FreundlichSorption or ConstantRetardation
669 Sorption model.
671 Returns
672 -------
673 t_first_arrival : float
674 Time when first wave reaches outlet, measured in days from tedges[0].
675 Returns np.inf if no concentration ever arrives.
677 Notes
678 -----
679 The residence time accounts for retardation:
680 residence_time = aquifer_pore_volume * R(C) / flow_avg
682 For piecewise constant flow, we integrate:
683 ∫₀^residence_time flow(t) dt = aquifer_pore_volume * R(C)
685 This function computes the EXACT crossing time in days, not a bin edge.
686 Use compute_first_fully_informed_bin_edge() to get the corresponding
687 output bin edge for masking purposes.
689 Examples
690 --------
691 >>> import pandas as pd
692 >>> cin = np.array([0.0, 10.0] + [10.0] * 10) # First bin zero, then nonzero
693 >>> flow = np.array([100.0] * 12) # Constant flow
694 >>> tedges = pd.date_range("2020-01-01", periods=13, freq="D")
695 >>> sorption = ConstantRetardation(retardation_factor=2.0)
696 >>> t_first = compute_first_front_arrival_time(cin, flow, tedges, 500.0, sorption)
697 >>> # Result is in days from tedges[0]
698 >>> bool(np.isclose(t_first, 11.0)) # 1 day (offset) + 10 days (travel time)
699 True
701 See Also
702 --------
703 compute_first_fully_informed_bin_edge : Get first valid output bin edge
704 """
705 # Find first non-zero concentration
706 nonzero_indices = np.where(cin > 0)[0]
708 if len(nonzero_indices) == 0:
709 # No concentration ever arrives
710 return np.inf
712 idx_first = nonzero_indices[0]
713 c_first = cin[idx_first]
715 # Compute retardation for this concentration
716 r_first = sorption.retardation(c_first)
718 # Target: cumulative flow volume needed to reach outlet
719 target_volume = aquifer_pore_volume * r_first
721 # Integrate piecewise constant flow starting from idx_first
722 # tedges is assumed to be DatetimeIndex, convert all times to days
723 cumulative_volume = 0.0
725 for i in range(idx_first, len(flow)):
726 # Convert time interval to days
727 dt_days = (tedges[i + 1] - tedges[i]) / pd.Timedelta(days=1)
728 volume_in_bin = flow[i] * dt_days
730 if cumulative_volume + volume_in_bin >= target_volume:
731 # First arrival occurs during this bin
732 remaining_volume = target_volume - cumulative_volume
733 dt_partial = remaining_volume / flow[i]
735 # Return time in days from tedges[0]
736 t_bin_start_days = (tedges[i] - tedges[0]) / pd.Timedelta(days=1)
737 return t_bin_start_days + dt_partial
739 cumulative_volume += volume_in_bin
741 # Never reaches outlet with given flow history
742 return np.inf
745def compute_first_fully_informed_bin_edge(
746 cin: npt.NDArray[np.floating],
747 flow: npt.NDArray[np.floating],
748 tedges: pd.DatetimeIndex,
749 aquifer_pore_volume: float,
750 sorption: FreundlichSorption | ConstantRetardation,
751 output_tedges: pd.DatetimeIndex,
752) -> float:
753 """
754 Compute left edge of first output bin that is fully informed.
756 A bin [t_i, t_{i+1}] is "fully informed" if all water exiting during
757 that interval originated from known inlet conditions (not unknown
758 initial conditions). This occurs when t_i >= first front arrival time.
760 This function is useful for:
761 - Masking output bins during spin-up period
762 - Determining which output times are valid for analysis
763 - Plotting valid vs spin-up regions
765 Rarefaction handling:
767 - For n>1: Rarefaction tail (lower C, slower) arrives after head.
768 Once the first wave (head) arrives, subsequent bins are informed.
769 - For n<1: Rarefaction tail (higher C, slower) arrives after head.
770 Same principle applies.
772 In both cases, once the leading edge of the inlet-generated wave structure
773 reaches the outlet, all subsequent output is determined by inlet history.
775 Parameters
776 ----------
777 cin : numpy.ndarray
778 Inlet concentration [mass/volume]. Length = len(tedges) - 1.
779 flow : numpy.ndarray
780 Flow rate [volume/time]. Length = len(tedges) - 1.
781 tedges : pandas.DatetimeIndex
782 Inlet time bin edges. Length = len(cin) + 1.
783 Expected to be DatetimeIndex.
784 aquifer_pore_volume : float
785 Total pore volume [volume]. Must be positive.
786 sorption : FreundlichSorption or ConstantRetardation
787 Sorption model.
788 output_tedges : pandas.DatetimeIndex
789 Output time bin edges. These are the bins for which we want
790 to determine the first fully-informed bin.
791 Expected to be DatetimeIndex.
793 Returns
794 -------
795 t_first_bin : float
796 Left edge of first output bin that is fully informed, measured in
797 days from output_tedges[0].
798 Returns last edge in days if no bin is fully informed.
799 Returns np.inf if output_tedges is empty.
801 Notes
802 -----
803 This differs from compute_first_front_arrival_time in that it returns
804 a BIN EDGE (from output_tedges), not the exact crossing time.
806 Both functions return time in days, but measured from different reference points:
807 - compute_first_front_arrival_time: days from tedges[0]
808 - compute_first_fully_informed_bin_edge: days from output_tedges[0]
810 For masking output arrays::
812 import pandas as pd
814 t_first_bin = compute_first_fully_informed_bin_edge(...)
815 # Convert output_tedges to days from output_tedges[0]
816 tedges_days = (output_tedges - output_tedges[0]) / pd.Timedelta(days=1)
817 mask = tedges_days[:-1] >= t_first_bin
818 cout_valid = cout[mask]
820 Examples
821 --------
822 >>> import pandas as pd
823 >>> # Exact arrival at ~11 days from tedges[0]
824 >>> cin = np.array([0.0, 10.0, 10.0])
825 >>> flow = np.array([100.0, 100.0, 100.0])
826 >>> tedges = pd.date_range("2020-01-01", periods=4, freq="D")
827 >>> output_tedges = pd.date_range("2020-01-01", periods=20, freq="D")
828 >>> sorption = ConstantRetardation(retardation_factor=2.0)
829 >>> t_bin = compute_first_fully_informed_bin_edge(
830 ... cin, flow, tedges, 500.0, sorption, output_tedges
831 ... )
832 >>> # Result is in days from output_tedges[0]
833 >>> t_bin >= 11.0 # First bin edge >= arrival time
834 True
836 See Also
837 --------
838 compute_first_front_arrival_time : Get exact arrival time
839 """
840 if len(output_tedges) == 0:
841 return np.inf
843 # Compute exact arrival time (in days from tedges[0])
844 t_arrival_days = compute_first_front_arrival_time(cin, flow, tedges, aquifer_pore_volume, sorption)
846 if not np.isfinite(t_arrival_days):
847 # No arrival, return last edge in days from output_tedges[0]
848 return (output_tedges[-1] - output_tedges[0]) / pd.Timedelta(days=1)
850 # Convert output_tedges to days from output_tedges[0]
851 # Find first bin edge >= t_arrival_days
852 # Note: t_arrival_days is measured from tedges[0], but output_tedges might have different start
853 # So we need to adjust the reference point
855 # Convert t_arrival from "days from tedges[0]" to "days from output_tedges[0]"
856 t_arrival_abs = tedges[0] + pd.Timedelta(days=t_arrival_days)
857 t_arrival_output_ref = (t_arrival_abs - output_tedges[0]) / pd.Timedelta(days=1)
859 # Find first output bin edge >= t_arrival
860 for t_edge in output_tedges:
861 t_edge_days = (t_edge - output_tedges[0]) / pd.Timedelta(days=1)
862 if t_edge_days >= t_arrival_output_ref:
863 return t_edge_days
865 # If no edge is >= t_arrival, return the last edge
866 # (This means all bins are before arrival)
867 return (output_tedges[-1] - output_tedges[0]) / pd.Timedelta(days=1)