Coverage for src / gwtransport / fronttracking / math.py: 88%

144 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-04 18:51 +0000

1""" 

2Mathematical Foundation for Front Tracking with Nonlinear Sorption. 

3 

4This module provides exact analytical computations for: 

5- Freundlich and constant retardation models 

6- Shock velocities via Rankine-Hugoniot condition 

7- Characteristic velocities and positions 

8- First arrival time calculations 

9- Entropy condition verification 

10 

11All computations are exact analytical formulas with no numerical tolerances. 

12 

13This file is part of gwtransport which is released under AGPL-3.0 license. 

14See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details. 

15""" 

16 

17from dataclasses import dataclass 

18 

19import numpy as np 

20import numpy.typing as npt 

21import pandas as pd 

22 

23# Numerical tolerance constants 

24EPSILON_FREUNDLICH_N = 1e-10 # Tolerance for checking if n ≈ 1.0 

25EPSILON_EXPONENT = 1e-10 # Tolerance for checking if exponent ≈ 0 

26EPSILON_DENOMINATOR = 1e-15 # Tolerance for near-zero denominators in shock velocity 

27 

28 

29@dataclass 

30class FreundlichSorption: 

31 """ 

32 Freundlich sorption isotherm with exact analytical methods. 

33 

34 The Freundlich isotherm is: s(C) = k_f * C^(1/n) 

35 

36 where: 

37 - s is sorbed concentration [mass/mass of solid] 

38 - C is dissolved concentration [mass/volume of water] 

39 - k_f is Freundlich coefficient [(volume/mass)^(1/n)] 

40 - n is Freundlich exponent (dimensionless) 

41 

42 For n > 1: Higher C travels faster 

43 For n < 1: Higher C travels slower 

44 For n = 1: linear (not supported, use ConstantRetardation instead) 

45 

46 Parameters 

47 ---------- 

48 k_f : float 

49 Freundlich coefficient [(m³/kg)^(1/n)]. Must be positive. 

50 n : float 

51 Freundlich exponent [-]. Must be positive and != 1. 

52 bulk_density : float 

53 Bulk density of porous medium [kg/m³]. Must be positive. 

54 porosity : float 

55 Porosity [-]. Must be in (0, 1). 

56 c_min : float, optional 

57 Minimum concentration threshold. For n>1, prevents infinite retardation 

58 as C→0. Default: 0.1 for n>1, 0.0 for n<1 (set automatically if not provided). 

59 

60 Examples 

61 -------- 

62 >>> sorption = FreundlichSorption( 

63 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3 

64 ... ) 

65 >>> r = sorption.retardation(5.0) 

66 >>> c_back = sorption.concentration_from_retardation(r) 

67 >>> bool(np.isclose(c_back, 5.0)) 

68 True 

69 

70 Notes 

71 ----- 

72 The retardation factor is defined as: 

73 R(C) = 1 + (rho_b/n_por) * ds/dC 

74 = 1 + (rho_b*k_f)/(n_por*n) * C^((1/n)-1) 

75 

76 For Freundlich sorption, R depends on C, which creates nonlinear wave behavior. 

77 

78 For n>1 (higher C travels faster), R(C)→∞ as C→0, which can cause extremely slow 

79 wave propagation. The c_min parameter prevents this by enforcing a minimum 

80 concentration, making R(C) finite for all C≥0. 

81 """ 

82 

83 k_f: float 

84 """Freundlich coefficient [(m³/kg)^(1/n)].""" 

85 n: float 

86 """Freundlich exponent [-].""" 

87 bulk_density: float 

88 """Bulk density of porous medium [kg/m³].""" 

89 porosity: float 

90 """Porosity [-].""" 

91 c_min: float = 1e-12 

92 """Minimum concentration threshold to prevent infinite retardation.""" 

93 

94 def __post_init__(self): 

95 """Validate parameters after initialization.""" 

96 if self.k_f <= 0: 

97 msg = f"k_f must be positive, got {self.k_f}" 

98 raise ValueError(msg) 

99 if self.n <= 0: 

100 msg = f"n must be positive, got {self.n}" 

101 raise ValueError(msg) 

102 if abs(self.n - 1.0) < EPSILON_FREUNDLICH_N: 

103 msg = "n = 1 (linear case) not supported, use ConstantRetardation instead" 

104 raise ValueError(msg) 

105 if self.bulk_density <= 0: 

106 msg = f"bulk_density must be positive, got {self.bulk_density}" 

107 raise ValueError(msg) 

108 if not 0 < self.porosity < 1: 

109 msg = f"porosity must be in (0, 1), got {self.porosity}" 

110 raise ValueError(msg) 

111 if self.c_min < 0: 

112 msg = f"c_min must be non-negative, got {self.c_min}" 

113 raise ValueError(msg) 

114 

115 def retardation(self, c: float | npt.NDArray[np.float64]) -> float | npt.NDArray[np.float64]: 

116 """ 

117 Compute retardation factor R(C). 

118 

119 The retardation factor relates concentration velocity to pore water velocity: 

120 v_C = flow / R(C) 

121 

122 For Freundlich sorption: 

123 R(C) = 1 + (rho_b*k_f)/(n_por*n) * C^((1/n)-1) 

124 

125 Parameters 

126 ---------- 

127 c : float or array-like 

128 Dissolved concentration [mass/volume]. Non-negative. 

129 

130 Returns 

131 ------- 

132 r : float or numpy.ndarray 

133 Retardation factor [-]. Always >= 1.0. 

134 

135 Notes 

136 ----- 

137 - For n > 1: R decreases with increasing C (higher C travels faster) 

138 - For n < 1: R increases with increasing C (higher C travels slower) 

139 - Concentrations at or below c_min return R=1 if c_min=0, else are clipped to c_min 

140 """ 

141 is_array = isinstance(c, np.ndarray) 

142 c_arr = np.asarray(c) 

143 

144 if self.c_min == 0 and self.n < 1.0: 

145 # Only for n<1 (lower C travels faster) where R(0)=1 is physically correct 

146 result = np.where(c_arr <= 0, 1.0, self._compute_retardation(c_arr)) 

147 else: 

148 c_eff = np.maximum(c_arr, self.c_min) 

149 result = self._compute_retardation(c_eff) 

150 

151 return result if is_array else float(result) 

152 

153 def _compute_retardation(self, c: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: 

154 """Compute retardation for positive concentrations.""" 

155 exponent = (1.0 / self.n) - 1.0 

156 coefficient = (self.bulk_density * self.k_f) / (self.porosity * self.n) 

157 return 1.0 + coefficient * (c**exponent) 

158 

159 def total_concentration(self, c: float | npt.NDArray[np.float64]) -> float | npt.NDArray[np.float64]: 

160 """ 

161 Compute total concentration (dissolved + sorbed per unit pore volume). 

162 

163 Total concentration includes both dissolved and sorbed mass: 

164 C_total = C + (rho_b/n_por) * s(C) 

165 = C + (rho_b/n_por) * k_f * C^(1/n) 

166 

167 Parameters 

168 ---------- 

169 c : float or array-like 

170 Dissolved concentration [mass/volume]. Non-negative. 

171 

172 Returns 

173 ------- 

174 c_total : float or numpy.ndarray 

175 Total concentration [mass/volume]. Always >= c. 

176 

177 Notes 

178 ----- 

179 This is the conserved quantity in the transport equation: 

180 ∂C_total/∂t + ∂(flow*C)/∂v = 0 

181 

182 The flux term only includes dissolved concentration because sorbed mass 

183 is immobile. Concentrations at or below c_min return C if c_min=0, else use c_min. 

184 """ 

185 is_array = isinstance(c, np.ndarray) 

186 c_arr = np.asarray(c) 

187 

188 if self.c_min == 0 and self.n < 1.0: 

189 # Only for n<1 (lower C travels faster) where C=0 is physically valid 

190 sorbed = np.where( 

191 c_arr <= 0, 0.0, (self.bulk_density / self.porosity) * self.k_f * (c_arr ** (1.0 / self.n)) 

192 ) 

193 else: 

194 c_eff = np.maximum(c_arr, self.c_min) 

195 sorbed = (self.bulk_density / self.porosity) * self.k_f * (c_eff ** (1.0 / self.n)) 

196 

197 result = c_arr + sorbed 

198 return result if is_array else float(result) 

199 

200 def concentration_from_retardation(self, r: float | npt.NDArray[np.float64]) -> float | npt.NDArray[np.float64]: 

201 """ 

202 Invert retardation factor to obtain concentration analytically. 

203 

204 Given R, solves R = retardation(C) for C. This is used in rarefaction waves 

205 where the self-similar solution gives R as a function of position and time. 

206 

207 Parameters 

208 ---------- 

209 r : float or array-like 

210 Retardation factor [-]. Must be >= 1.0. 

211 

212 Returns 

213 ------- 

214 c : float or numpy.ndarray 

215 Dissolved concentration [mass/volume]. Non-negative. 

216 

217 Notes 

218 ----- 

219 This inverts the relation: 

220 R = 1 + (rho_b*k_f)/(n_por*n) * C^((1/n)-1) 

221 

222 The analytical solution is: 

223 C = [(R-1) * n_por*n / (rho_b*k_f)]^(n/(1-n)) 

224 

225 For n = 1 (linear sorption), the exponent n/(1-n) is undefined, which is 

226 why linear sorption must use ConstantRetardation class instead. 

227 

228 Examples 

229 -------- 

230 >>> sorption = FreundlichSorption( 

231 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3 

232 ... ) 

233 >>> r = sorption.retardation(5.0) 

234 >>> c = sorption.concentration_from_retardation(r) 

235 >>> bool(np.isclose(c, 5.0, rtol=1e-14)) 

236 True 

237 """ 

238 is_array = isinstance(r, np.ndarray) 

239 r_arr = np.asarray(r) 

240 

241 exponent = (1.0 / self.n) - 1.0 

242 

243 if abs(exponent) < EPSILON_EXPONENT: 

244 msg = "Cannot invert linear retardation (n=1)" 

245 raise ValueError(msg) 

246 

247 coefficient = (self.bulk_density * self.k_f) / (self.porosity * self.n) 

248 base = (r_arr - 1.0) / coefficient 

249 

250 inversion_exponent = 1.0 / exponent 

251 c = base**inversion_exponent 

252 result = np.maximum(c, self.c_min) 

253 result = np.where(r_arr <= 1.0, self.c_min, result) 

254 result = np.where(base <= 0, self.c_min, result) 

255 

256 return result if is_array else float(result) 

257 

258 def shock_velocity(self, c_left: float, c_right: float, flow: float) -> float: 

259 """ 

260 Compute shock velocity via Rankine-Hugoniot condition. 

261 

262 The Rankine-Hugoniot condition ensures mass conservation across the shock: 

263 s_shock = [flux(C_R) - flux(C_L)] / [C_total(C_R) - C_total(C_L)] 

264 

265 where flux(C) = flow * C (only dissolved species are transported). 

266 

267 Parameters 

268 ---------- 

269 c_left : float 

270 Concentration upstream (behind) shock [mass/volume]. 

271 c_right : float 

272 Concentration downstream (ahead of) shock [mass/volume]. 

273 flow : float 

274 Flow rate [volume/time]. Must be positive. 

275 

276 Returns 

277 ------- 

278 s_shock : float 

279 Shock velocity [volume/time]. 

280 

281 Notes 

282 ----- 

283 The Rankine-Hugoniot condition is derived from integrating the conservation 

284 law across the shock discontinuity. It ensures that the total mass flux 

285 (advective transport) is conserved. 

286 

287 For physical shocks with n > 1 (higher C travels faster): 

288 - c_left > c_right (concentration decreases across shock) 

289 - The shock velocity is between the characteristic velocities 

290 

291 Examples 

292 -------- 

293 >>> sorption = FreundlichSorption( 

294 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3 

295 ... ) 

296 >>> v_shock = sorption.shock_velocity(c_left=10.0, c_right=2.0, flow=100.0) 

297 >>> v_shock > 0 

298 True 

299 """ 

300 # Flux = flow * C (only dissolved species flow) 

301 flux_left = flow * c_left 

302 flux_right = flow * c_right 

303 

304 # Total concentration (dissolved + sorbed) 

305 c_total_left = self.total_concentration(c_left) 

306 c_total_right = self.total_concentration(c_right) 

307 

308 # Rankine-Hugoniot condition 

309 # s_shock = Δflux / ΔC_total 

310 denom = c_total_right - c_total_left 

311 

312 # Guard against degenerate "shock" states where the total 

313 # concentration jump tends to zero. In the analytic limit 

314 # ΔC_total → 0, the Rankine-Hugoniot speed approaches the 

315 # characteristic velocity, so we fall back to that value 

316 # instead of dividing by an extremely small number. 

317 if abs(denom) < EPSILON_DENOMINATOR: 

318 avg_retardation = 0.5 * (self.retardation(c_left) + self.retardation(c_right)) 

319 return float(flow / avg_retardation) 

320 

321 return float((flux_right - flux_left) / denom) 

322 

323 def check_entropy_condition(self, c_left: float, c_right: float, shock_vel: float, flow: float) -> bool: 

324 """ 

325 Verify Lax entropy condition for physical admissibility of shock. 

326 

327 The Lax entropy condition ensures that characteristics flow INTO the shock 

328 from both sides, which is required for physical shocks:: 

329 

330 λ(C_L) > s_shock > λ(C_R) 

331 

332 where λ(C) = flow / R(C) is the characteristic velocity. 

333 

334 Parameters 

335 ---------- 

336 c_left : float 

337 Concentration upstream of shock [mass/volume]. 

338 c_right : float 

339 Concentration downstream of shock [mass/volume]. 

340 shock_vel : float 

341 Shock velocity [volume/time]. 

342 flow : float 

343 Flow rate [volume/time]. 

344 

345 Returns 

346 ------- 

347 satisfies : bool 

348 True if shock satisfies entropy condition (is physical). 

349 

350 Notes 

351 ----- 

352 Shocks that violate the entropy condition are unphysical and should be 

353 replaced by rarefaction waves. The entropy condition prevents non-physical 

354 expansion shocks. 

355 

356 For n > 1 (higher C travels faster): 

357 - Physical shocks have c_left > c_right 

358 - Characteristic from left is faster: λ(c_left) > λ(c_right) 

359 - Shock velocity is between them 

360 

361 For n < 1 (lower C travels faster): 

362 - Physical shocks have c_left < c_right 

363 - Characteristic from left is slower: λ(c_left) < λ(c_right) 

364 - Shock velocity is still between them 

365 

366 Examples 

367 -------- 

368 >>> sorption = FreundlichSorption( 

369 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3 

370 ... ) 

371 >>> # Physical shock (compression for n>1) 

372 >>> v_shock = sorption.shock_velocity(10.0, 2.0, 100.0) 

373 >>> sorption.check_entropy_condition(10.0, 2.0, v_shock, 100.0) 

374 True 

375 >>> # Unphysical shock (expansion for n>1) 

376 >>> v_shock_bad = sorption.shock_velocity(2.0, 10.0, 100.0) 

377 >>> sorption.check_entropy_condition(2.0, 10.0, v_shock_bad, 100.0) 

378 False 

379 """ 

380 # Characteristic velocities 

381 lambda_left = flow / self.retardation(c_left) 

382 lambda_right = flow / self.retardation(c_right) 

383 

384 # If any of the velocities are non-finite (can occur for 

385 # test-generated edge states), treat the entropy condition as 

386 # violated rather than propagating RuntimeWarnings. 

387 if not np.isfinite(lambda_left) or not np.isfinite(lambda_right) or not np.isfinite(shock_vel): 

388 return False 

389 

390 # Lax condition: λ(C_L) > s_shock > λ(C_R) 

391 # Use small tolerance for floating-point comparison 

392 tolerance = 1e-14 * max(abs(lambda_left), abs(lambda_right), abs(shock_vel)) 

393 

394 return bool((lambda_left > shock_vel - tolerance) and (shock_vel > lambda_right - tolerance)) 

395 

396 

397@dataclass 

398class ConstantRetardation: 

399 """ 

400 Constant (linear) retardation model. 

401 

402 For linear sorption: s(C) = K_d * C 

403 This gives constant retardation: R(C) = 1 + (rho_b/n_por) * K_d = constant 

404 

405 This is a special case where concentration-dependent behavior disappears. 

406 Used for conservative tracers or as approximation for weak sorption. 

407 

408 Parameters 

409 ---------- 

410 retardation_factor : float 

411 Constant retardation factor [-]. Must be >= 1.0. 

412 R = 1.0 means no retardation (conservative tracer). 

413 

414 Notes 

415 ----- 

416 With constant retardation: 

417 - All concentrations travel at same velocity: flow / R 

418 - No rarefaction waves form (all concentrations travel together) 

419 - Shocks occur only at concentration discontinuities at inlet 

420 - Solution reduces to simple time-shifting 

421 

422 This is equivalent to using `infiltration_to_extraction_series` in the 

423 gwtransport package. 

424 

425 Examples 

426 -------- 

427 >>> sorption = ConstantRetardation(retardation_factor=2.0) 

428 >>> sorption.retardation(5.0) 

429 2.0 

430 >>> sorption.retardation(10.0) 

431 2.0 

432 """ 

433 

434 retardation_factor: float 

435 """Constant retardation factor [-]. Must be >= 1.0.""" 

436 

437 def __post_init__(self): 

438 """Validate parameters after initialization.""" 

439 if self.retardation_factor < 1.0: 

440 msg = f"retardation_factor must be >= 1.0, got {self.retardation_factor}" 

441 raise ValueError(msg) 

442 

443 def retardation(self, c: float) -> float: # noqa: ARG002 

444 """ 

445 Return constant retardation factor (independent of concentration). 

446 

447 Parameters 

448 ---------- 

449 c : float 

450 Dissolved concentration (not used for constant retardation). 

451 

452 Returns 

453 ------- 

454 r : float 

455 Constant retardation factor. 

456 """ 

457 return self.retardation_factor 

458 

459 def total_concentration(self, c: float) -> float: 

460 """ 

461 Compute total concentration for linear sorption. 

462 

463 For constant retardation: 

464 C_total = C * R 

465 

466 Parameters 

467 ---------- 

468 c : float 

469 Dissolved concentration [mass/volume]. 

470 

471 Returns 

472 ------- 

473 c_total : float 

474 Total concentration [mass/volume]. 

475 """ 

476 return c * self.retardation_factor 

477 

478 def concentration_from_retardation(self, r: float) -> float: 

479 """ 

480 Not applicable for constant retardation. 

481 

482 With constant R, all concentrations have the same retardation, so 

483 inversion is not meaningful. This method raises an error. 

484 

485 Raises 

486 ------ 

487 NotImplementedError 

488 Always raised for constant retardation. 

489 """ 

490 msg = "concentration_from_retardation not applicable for ConstantRetardation (R is independent of C)" 

491 raise NotImplementedError(msg) 

492 

493 def shock_velocity(self, c_left: float, c_right: float, flow: float) -> float: # noqa: ARG002 

494 """ 

495 Compute shock velocity for constant retardation. 

496 

497 With constant R, the shock velocity simplifies to: 

498 s_shock = flow / R 

499 

500 This is the same as all characteristic velocities. 

501 

502 Parameters 

503 ---------- 

504 c_left : float 

505 Concentration upstream of shock (not used for constant R). 

506 c_right : float 

507 Concentration downstream of shock (not used for constant R). 

508 flow : float 

509 Flow rate [volume/time]. 

510 

511 Returns 

512 ------- 

513 s_shock : float 

514 Shock velocity [volume/time]. 

515 """ 

516 return flow / self.retardation_factor 

517 

518 def check_entropy_condition(self, c_left: float, c_right: float, shock_vel: float, flow: float) -> bool: # noqa: ARG002, PLR6301 

519 """ 

520 Check entropy condition for constant retardation. 

521 

522 With constant R, all characteristic velocities are equal, so the 

523 entropy condition is trivially satisfied for any shock (or rather, 

524 shocks don't really exist - they're just contact discontinuities). 

525 

526 Parameters 

527 ---------- 

528 c_left : float 

529 Concentration upstream. 

530 c_right : float 

531 Concentration downstream. 

532 shock_vel : float 

533 Shock velocity. 

534 flow : float 

535 Flow rate. 

536 

537 Returns 

538 ------- 

539 satisfies : bool 

540 Always True for constant retardation. 

541 """ 

542 return True 

543 

544 

545def characteristic_velocity(c: float, flow: float, sorption: FreundlichSorption | ConstantRetardation) -> float: 

546 """ 

547 Compute characteristic velocity for given concentration. 

548 

549 In smooth regions of the solution, concentration travels at velocity: 

550 v = flow / R(C) 

551 

552 Parameters 

553 ---------- 

554 c : float 

555 Dissolved concentration [mass/volume]. 

556 flow : float 

557 Flow rate [volume/time]. 

558 sorption : FreundlichSorption or ConstantRetardation 

559 Sorption model. 

560 

561 Returns 

562 ------- 

563 velocity : float 

564 Characteristic velocity [volume/time]. 

565 

566 Examples 

567 -------- 

568 >>> sorption = FreundlichSorption( 

569 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3 

570 ... ) 

571 >>> v = characteristic_velocity(c=5.0, flow=100.0, sorption=sorption) 

572 >>> v > 0 

573 True 

574 """ 

575 return float(flow / sorption.retardation(c)) 

576 

577 

578def characteristic_position( 

579 c: float, flow: float, sorption: FreundlichSorption | ConstantRetardation, t_start: float, v_start: float, t: float 

580) -> float | None: 

581 """ 

582 Compute exact position of characteristic at time t. 

583 

584 Characteristics propagate linearly in time: 

585 V(t) = v_start + velocity * (t - t_start) 

586 

587 where velocity = flow / R(C) is constant along the characteristic. 

588 

589 Parameters 

590 ---------- 

591 c : float 

592 Concentration carried by characteristic [mass/volume]. 

593 flow : float 

594 Flow rate [volume/time]. 

595 sorption : FreundlichSorption or ConstantRetardation 

596 Sorption model. 

597 t_start : float 

598 Time when characteristic starts [days]. 

599 v_start : float 

600 Starting position [volume]. 

601 t : float 

602 Time at which to compute position [days]. 

603 

604 Returns 

605 ------- 

606 position : float or None 

607 Position at time t [volume], or None if t < t_start. 

608 

609 Examples 

610 -------- 

611 >>> sorption = ConstantRetardation(retardation_factor=2.0) 

612 >>> v = characteristic_position( 

613 ... c=5.0, flow=100.0, sorption=sorption, t_start=0.0, v_start=0.0, t=10.0 

614 ... ) 

615 >>> bool(np.isclose(v, 500.0)) # v = 100/2 * 10 = 500 

616 True 

617 """ 

618 if t < t_start: 

619 return None 

620 

621 velocity = characteristic_velocity(c, flow, sorption) 

622 dt_days = t - t_start 

623 

624 return v_start + velocity * dt_days 

625 

626 

627def compute_first_front_arrival_time( 

628 cin: npt.NDArray[np.floating], 

629 flow: npt.NDArray[np.floating], 

630 tedges: pd.DatetimeIndex, 

631 aquifer_pore_volume: float, 

632 sorption: FreundlichSorption | ConstantRetardation, 

633) -> float: 

634 """ 

635 Compute exact time when first wave reaches outlet (v_max). 

636 

637 This function returns the precise moment when the first non-zero 

638 concentration wave from the inlet arrives at the outlet. This marks 

639 the end of the spin-up period. 

640 

641 For the typical case where the first inlet change creates a characteristic 

642 (e.g., 0→C transition), this is when that characteristic reaches v_max. 

643 

644 For cases with rarefaction waves: 

645 

646 - n>1 (higher C travels faster): The head of a rarefaction 

647 (higher C) arrives first. 

648 - n<1 (lower C travels faster): The head of a rarefaction 

649 (lower C) arrives first. 

650 

651 Algorithm: 

652 1. Find first index where cin > 0 

653 2. Compute residence time for this concentration from inlet to outlet 

654 3. Account for piecewise constant flow during transit 

655 4. Return arrival time in days from tedges[0] 

656 

657 Parameters 

658 ---------- 

659 cin : numpy.ndarray 

660 Inlet concentration [mass/volume]. Length = len(tedges) - 1. 

661 flow : numpy.ndarray 

662 Flow rate [volume/time]. Length = len(tedges) - 1. 

663 tedges : pandas.DatetimeIndex 

664 Time bin edges. Length = len(cin) + 1. 

665 Expected to be DatetimeIndex. 

666 aquifer_pore_volume : float 

667 Total pore volume [volume]. Must be positive. 

668 sorption : FreundlichSorption or ConstantRetardation 

669 Sorption model. 

670 

671 Returns 

672 ------- 

673 t_first_arrival : float 

674 Time when first wave reaches outlet, measured in days from tedges[0]. 

675 Returns np.inf if no concentration ever arrives. 

676 

677 Notes 

678 ----- 

679 The residence time accounts for retardation: 

680 residence_time = aquifer_pore_volume * R(C) / flow_avg 

681 

682 For piecewise constant flow, we integrate: 

683 ∫₀^residence_time flow(t) dt = aquifer_pore_volume * R(C) 

684 

685 This function computes the EXACT crossing time in days, not a bin edge. 

686 Use compute_first_fully_informed_bin_edge() to get the corresponding 

687 output bin edge for masking purposes. 

688 

689 Examples 

690 -------- 

691 >>> import pandas as pd 

692 >>> cin = np.array([0.0, 10.0] + [10.0] * 10) # First bin zero, then nonzero 

693 >>> flow = np.array([100.0] * 12) # Constant flow 

694 >>> tedges = pd.date_range("2020-01-01", periods=13, freq="D") 

695 >>> sorption = ConstantRetardation(retardation_factor=2.0) 

696 >>> t_first = compute_first_front_arrival_time(cin, flow, tedges, 500.0, sorption) 

697 >>> # Result is in days from tedges[0] 

698 >>> bool(np.isclose(t_first, 11.0)) # 1 day (offset) + 10 days (travel time) 

699 True 

700 

701 See Also 

702 -------- 

703 compute_first_fully_informed_bin_edge : Get first valid output bin edge 

704 """ 

705 # Find first non-zero concentration 

706 nonzero_indices = np.where(cin > 0)[0] 

707 

708 if len(nonzero_indices) == 0: 

709 # No concentration ever arrives 

710 return np.inf 

711 

712 idx_first = nonzero_indices[0] 

713 c_first = cin[idx_first] 

714 

715 # Compute retardation for this concentration 

716 r_first = sorption.retardation(c_first) 

717 

718 # Target: cumulative flow volume needed to reach outlet 

719 target_volume = aquifer_pore_volume * r_first 

720 

721 # Integrate piecewise constant flow starting from idx_first 

722 # tedges is assumed to be DatetimeIndex, convert all times to days 

723 cumulative_volume = 0.0 

724 

725 for i in range(idx_first, len(flow)): 

726 # Convert time interval to days 

727 dt_days = (tedges[i + 1] - tedges[i]) / pd.Timedelta(days=1) 

728 volume_in_bin = flow[i] * dt_days 

729 

730 if cumulative_volume + volume_in_bin >= target_volume: 

731 # First arrival occurs during this bin 

732 remaining_volume = target_volume - cumulative_volume 

733 dt_partial = remaining_volume / flow[i] 

734 

735 # Return time in days from tedges[0] 

736 t_bin_start_days = (tedges[i] - tedges[0]) / pd.Timedelta(days=1) 

737 return t_bin_start_days + dt_partial 

738 

739 cumulative_volume += volume_in_bin 

740 

741 # Never reaches outlet with given flow history 

742 return np.inf 

743 

744 

745def compute_first_fully_informed_bin_edge( 

746 cin: npt.NDArray[np.floating], 

747 flow: npt.NDArray[np.floating], 

748 tedges: pd.DatetimeIndex, 

749 aquifer_pore_volume: float, 

750 sorption: FreundlichSorption | ConstantRetardation, 

751 output_tedges: pd.DatetimeIndex, 

752) -> float: 

753 """ 

754 Compute left edge of first output bin that is fully informed. 

755 

756 A bin [t_i, t_{i+1}] is "fully informed" if all water exiting during 

757 that interval originated from known inlet conditions (not unknown 

758 initial conditions). This occurs when t_i >= first front arrival time. 

759 

760 This function is useful for: 

761 - Masking output bins during spin-up period 

762 - Determining which output times are valid for analysis 

763 - Plotting valid vs spin-up regions 

764 

765 Rarefaction handling: 

766 

767 - For n>1: Rarefaction tail (lower C, slower) arrives after head. 

768 Once the first wave (head) arrives, subsequent bins are informed. 

769 - For n<1: Rarefaction tail (higher C, slower) arrives after head. 

770 Same principle applies. 

771 

772 In both cases, once the leading edge of the inlet-generated wave structure 

773 reaches the outlet, all subsequent output is determined by inlet history. 

774 

775 Parameters 

776 ---------- 

777 cin : numpy.ndarray 

778 Inlet concentration [mass/volume]. Length = len(tedges) - 1. 

779 flow : numpy.ndarray 

780 Flow rate [volume/time]. Length = len(tedges) - 1. 

781 tedges : pandas.DatetimeIndex 

782 Inlet time bin edges. Length = len(cin) + 1. 

783 Expected to be DatetimeIndex. 

784 aquifer_pore_volume : float 

785 Total pore volume [volume]. Must be positive. 

786 sorption : FreundlichSorption or ConstantRetardation 

787 Sorption model. 

788 output_tedges : pandas.DatetimeIndex 

789 Output time bin edges. These are the bins for which we want 

790 to determine the first fully-informed bin. 

791 Expected to be DatetimeIndex. 

792 

793 Returns 

794 ------- 

795 t_first_bin : float 

796 Left edge of first output bin that is fully informed, measured in 

797 days from output_tedges[0]. 

798 Returns last edge in days if no bin is fully informed. 

799 Returns np.inf if output_tedges is empty. 

800 

801 Notes 

802 ----- 

803 This differs from compute_first_front_arrival_time in that it returns 

804 a BIN EDGE (from output_tedges), not the exact crossing time. 

805 

806 Both functions return time in days, but measured from different reference points: 

807 - compute_first_front_arrival_time: days from tedges[0] 

808 - compute_first_fully_informed_bin_edge: days from output_tedges[0] 

809 

810 For masking output arrays:: 

811 

812 import pandas as pd 

813 

814 t_first_bin = compute_first_fully_informed_bin_edge(...) 

815 # Convert output_tedges to days from output_tedges[0] 

816 tedges_days = (output_tedges - output_tedges[0]) / pd.Timedelta(days=1) 

817 mask = tedges_days[:-1] >= t_first_bin 

818 cout_valid = cout[mask] 

819 

820 Examples 

821 -------- 

822 >>> import pandas as pd 

823 >>> # Exact arrival at ~11 days from tedges[0] 

824 >>> cin = np.array([0.0, 10.0, 10.0]) 

825 >>> flow = np.array([100.0, 100.0, 100.0]) 

826 >>> tedges = pd.date_range("2020-01-01", periods=4, freq="D") 

827 >>> output_tedges = pd.date_range("2020-01-01", periods=20, freq="D") 

828 >>> sorption = ConstantRetardation(retardation_factor=2.0) 

829 >>> t_bin = compute_first_fully_informed_bin_edge( 

830 ... cin, flow, tedges, 500.0, sorption, output_tedges 

831 ... ) 

832 >>> # Result is in days from output_tedges[0] 

833 >>> t_bin >= 11.0 # First bin edge >= arrival time 

834 True 

835 

836 See Also 

837 -------- 

838 compute_first_front_arrival_time : Get exact arrival time 

839 """ 

840 if len(output_tedges) == 0: 

841 return np.inf 

842 

843 # Compute exact arrival time (in days from tedges[0]) 

844 t_arrival_days = compute_first_front_arrival_time(cin, flow, tedges, aquifer_pore_volume, sorption) 

845 

846 if not np.isfinite(t_arrival_days): 

847 # No arrival, return last edge in days from output_tedges[0] 

848 return (output_tedges[-1] - output_tedges[0]) / pd.Timedelta(days=1) 

849 

850 # Convert output_tedges to days from output_tedges[0] 

851 # Find first bin edge >= t_arrival_days 

852 # Note: t_arrival_days is measured from tedges[0], but output_tedges might have different start 

853 # So we need to adjust the reference point 

854 

855 # Convert t_arrival from "days from tedges[0]" to "days from output_tedges[0]" 

856 t_arrival_abs = tedges[0] + pd.Timedelta(days=t_arrival_days) 

857 t_arrival_output_ref = (t_arrival_abs - output_tedges[0]) / pd.Timedelta(days=1) 

858 

859 # Find first output bin edge >= t_arrival 

860 for t_edge in output_tedges: 

861 t_edge_days = (t_edge - output_tedges[0]) / pd.Timedelta(days=1) 

862 if t_edge_days >= t_arrival_output_ref: 

863 return t_edge_days 

864 

865 # If no edge is >= t_arrival, return the last edge 

866 # (This means all bins are before arrival) 

867 return (output_tedges[-1] - output_tedges[0]) / pd.Timedelta(days=1)