Coverage for src / gwtransport / fronttracking / math.py: 91%

195 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-27 06:32 +0000

1""" 

2Mathematical Foundation for Front Tracking with Nonlinear Sorption. 

3 

4This module provides exact analytical computations for: 

5- Freundlich, Langmuir, and constant retardation models 

6- Shock velocities via Rankine-Hugoniot condition 

7- Characteristic velocities and positions 

8- First arrival time calculations 

9- Entropy condition verification 

10 

11All computations are exact analytical formulas with no numerical tolerances. 

12 

13This file is part of gwtransport which is released under AGPL-3.0 license. 

14See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details. 

15""" 

16 

17from abc import ABC, abstractmethod 

18from dataclasses import dataclass 

19 

20import numpy as np 

21import numpy.typing as npt 

22import pandas as pd 

23 

24# Numerical tolerance constants 

25EPSILON_FREUNDLICH_N = 1e-10 # Tolerance for checking if n ≈ 1.0 

26EPSILON_EXPONENT = 1e-10 # Tolerance for checking if exponent ≈ 0 

27EPSILON_DENOMINATOR = 1e-15 # Tolerance for near-zero denominators in shock velocity 

28 

29 

30class NonlinearSorption(ABC): 

31 """Abstract base for concentration-dependent sorption models. 

32 

33 Subclasses must implement `retardation`, `total_concentration`, and 

34 `concentration_from_retardation`. Shock velocity and entropy checking 

35 are provided generically via the Rankine-Hugoniot and Lax conditions. 

36 

37 See Also 

38 -------- 

39 FreundlichSorption : Freundlich isotherm implementation. 

40 LangmuirSorption : Langmuir isotherm implementation. 

41 ConstantRetardation : Linear (constant R) retardation model. 

42 """ 

43 

44 @abstractmethod 

45 def retardation(self, c: float | npt.NDArray[np.float64]) -> float | npt.NDArray[np.float64]: 

46 """Compute retardation factor R(C).""" 

47 raise NotImplementedError 

48 

49 @abstractmethod 

50 def total_concentration(self, c: float | npt.NDArray[np.float64]) -> float | npt.NDArray[np.float64]: 

51 """Compute total concentration (dissolved + sorbed per unit pore volume).""" 

52 raise NotImplementedError 

53 

54 @abstractmethod 

55 def concentration_from_retardation(self, r: float | npt.NDArray[np.float64]) -> float | npt.NDArray[np.float64]: 

56 """Invert retardation factor to obtain concentration.""" 

57 raise NotImplementedError 

58 

59 def shock_velocity(self, c_left: float, c_right: float, flow: float) -> float: 

60 """ 

61 Compute shock velocity via Rankine-Hugoniot condition. 

62 

63 The Rankine-Hugoniot condition ensures mass conservation across the shock: 

64 s_shock = [flux(C_R) - flux(C_L)] / [C_total(C_R) - C_total(C_L)] 

65 

66 where flux(C) = flow * C (only dissolved species are transported). 

67 

68 Parameters 

69 ---------- 

70 c_left : float 

71 Concentration upstream (behind) shock [mass/volume]. 

72 c_right : float 

73 Concentration downstream (ahead of) shock [mass/volume]. 

74 flow : float 

75 Flow rate [volume/time]. Must be positive. 

76 

77 Returns 

78 ------- 

79 s_shock : float 

80 Shock velocity [volume/time]. 

81 

82 Notes 

83 ----- 

84 The Rankine-Hugoniot condition is derived from integrating the conservation 

85 law across the shock discontinuity. It ensures that the total mass flux 

86 (advective transport) is conserved. 

87 """ 

88 # Flux = flow * C (only dissolved species flow) 

89 flux_left = flow * c_left 

90 flux_right = flow * c_right 

91 

92 # Total concentration (dissolved + sorbed) 

93 c_total_left = self.total_concentration(c_left) 

94 c_total_right = self.total_concentration(c_right) 

95 

96 # Rankine-Hugoniot condition 

97 denom = c_total_right - c_total_left 

98 

99 if abs(denom) < EPSILON_DENOMINATOR: 

100 avg_retardation = 0.5 * (self.retardation(c_left) + self.retardation(c_right)) 

101 return float(flow / avg_retardation) 

102 

103 return float((flux_right - flux_left) / denom) 

104 

105 def check_entropy_condition(self, c_left: float, c_right: float, shock_vel: float, flow: float) -> bool: 

106 """ 

107 Verify Lax entropy condition for physical admissibility of shock. 

108 

109 The Lax entropy condition ensures that characteristics flow INTO the shock 

110 from both sides, which is required for physical shocks:: 

111 

112 λ(C_L) > s_shock > λ(C_R) 

113 

114 where λ(C) = flow / R(C) is the characteristic velocity. 

115 

116 Parameters 

117 ---------- 

118 c_left : float 

119 Concentration upstream of shock [mass/volume]. 

120 c_right : float 

121 Concentration downstream of shock [mass/volume]. 

122 shock_vel : float 

123 Shock velocity [volume/time]. 

124 flow : float 

125 Flow rate [volume/time]. 

126 

127 Returns 

128 ------- 

129 satisfies : bool 

130 True if shock satisfies entropy condition (is physical). 

131 """ 

132 # Characteristic velocities 

133 lambda_left = flow / self.retardation(c_left) 

134 lambda_right = flow / self.retardation(c_right) 

135 

136 if not np.isfinite(lambda_left) or not np.isfinite(lambda_right) or not np.isfinite(shock_vel): 

137 return False 

138 

139 # Lax condition: λ(C_L) > s_shock > λ(C_R) 

140 tolerance = 1e-14 * max(abs(lambda_left), abs(lambda_right), abs(shock_vel)) 

141 

142 return bool((lambda_left > shock_vel - tolerance) and (shock_vel > lambda_right - tolerance)) 

143 

144 

145@dataclass 

146class FreundlichSorption(NonlinearSorption): 

147 """ 

148 Freundlich sorption isotherm with exact analytical methods. 

149 

150 The Freundlich isotherm is: s(C) = k_f * C^(1/n) 

151 

152 where: 

153 - s is sorbed concentration [mass/mass of solid] 

154 - C is dissolved concentration [mass/volume of water] 

155 - k_f is Freundlich coefficient [(volume/mass)^(1/n)] 

156 - n is Freundlich exponent (dimensionless) 

157 

158 For n > 1: Higher C travels faster 

159 For n < 1: Higher C travels slower 

160 For n = 1: linear (not supported, use ConstantRetardation instead) 

161 

162 Parameters 

163 ---------- 

164 k_f : float 

165 Freundlich coefficient [(m³/kg)^(1/n)]. Must be positive. 

166 n : float 

167 Freundlich exponent [-]. Must be positive and != 1. 

168 bulk_density : float 

169 Bulk density of porous medium [kg/m³]. Must be positive. 

170 porosity : float 

171 Porosity [-]. Must be in (0, 1). 

172 c_min : float, optional 

173 Minimum concentration threshold. For n>1, prevents infinite retardation 

174 as C→0. Default: 0.1 for n>1, 0.0 for n<1 (set automatically if not provided). 

175 

176 Notes 

177 ----- 

178 The retardation factor is defined as: 

179 R(C) = 1 + (rho_b/n_por) * ds/dC 

180 = 1 + (rho_b*k_f)/(n_por*n) * C^((1/n)-1) 

181 

182 For Freundlich sorption, R depends on C, which creates nonlinear wave behavior. 

183 

184 For n>1 (higher C travels faster), R(C)→∞ as C→0, which can cause extremely slow 

185 wave propagation. The c_min parameter prevents this by enforcing a minimum 

186 concentration, making R(C) finite for all C≥0. 

187 

188 Examples 

189 -------- 

190 >>> sorption = FreundlichSorption( 

191 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3 

192 ... ) 

193 >>> r = sorption.retardation(5.0) 

194 >>> c_back = sorption.concentration_from_retardation(r) 

195 >>> bool(np.isclose(c_back, 5.0)) 

196 True 

197 """ 

198 

199 k_f: float 

200 """Freundlich coefficient [(m³/kg)^(1/n)].""" 

201 n: float 

202 """Freundlich exponent [-].""" 

203 bulk_density: float 

204 """Bulk density of porous medium [kg/m³].""" 

205 porosity: float 

206 """Porosity [-].""" 

207 c_min: float = 1e-12 

208 """Minimum concentration threshold to prevent infinite retardation.""" 

209 

210 def __post_init__(self): 

211 """Validate parameters after initialization. 

212 

213 Raises 

214 ------ 

215 ValueError 

216 If any parameter is outside its valid range: ``k_f`` <= 0, 

217 ``n`` <= 0, ``n`` == 1, ``bulk_density`` <= 0, ``porosity`` 

218 outside (0, 1), or ``c_min`` < 0. 

219 """ 

220 if self.k_f <= 0: 

221 msg = f"k_f must be positive, got {self.k_f}" 

222 raise ValueError(msg) 

223 if self.n <= 0: 

224 msg = f"n must be positive, got {self.n}" 

225 raise ValueError(msg) 

226 if abs(self.n - 1.0) < EPSILON_FREUNDLICH_N: 

227 msg = "n = 1 (linear case) not supported, use ConstantRetardation instead" 

228 raise ValueError(msg) 

229 if self.bulk_density <= 0: 

230 msg = f"bulk_density must be positive, got {self.bulk_density}" 

231 raise ValueError(msg) 

232 if not 0 < self.porosity < 1: 

233 msg = f"porosity must be in (0, 1), got {self.porosity}" 

234 raise ValueError(msg) 

235 if self.c_min < 0: 

236 msg = f"c_min must be non-negative, got {self.c_min}" 

237 raise ValueError(msg) 

238 

239 def retardation(self, c: float | npt.NDArray[np.float64]) -> float | npt.NDArray[np.float64]: 

240 """ 

241 Compute retardation factor R(C). 

242 

243 The retardation factor relates concentration velocity to pore water velocity: 

244 v_C = flow / R(C) 

245 

246 For Freundlich sorption: 

247 R(C) = 1 + (rho_b*k_f)/(n_por*n) * C^((1/n)-1) 

248 

249 Parameters 

250 ---------- 

251 c : float or array-like 

252 Dissolved concentration [mass/volume]. Non-negative. 

253 

254 Returns 

255 ------- 

256 r : float or numpy.ndarray 

257 Retardation factor [-]. Always >= 1.0. 

258 

259 Notes 

260 ----- 

261 - For n > 1: R decreases with increasing C (higher C travels faster) 

262 - For n < 1: R increases with increasing C (higher C travels slower) 

263 - Concentrations at or below c_min return R=1 if c_min=0, else are clipped to c_min 

264 """ 

265 is_array = isinstance(c, np.ndarray) 

266 c_arr = np.asarray(c) 

267 

268 if self.c_min == 0 and self.n < 1.0: 

269 # Only for n<1 (lower C travels faster) where R(0)=1 is physically correct 

270 result = np.where(c_arr <= 0, 1.0, self._compute_retardation(c_arr)) 

271 else: 

272 c_eff = np.maximum(c_arr, self.c_min) 

273 result = self._compute_retardation(c_eff) 

274 

275 return result if is_array else float(result) 

276 

277 def _compute_retardation(self, c: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: 

278 """Compute retardation for positive concentrations. 

279 

280 Parameters 

281 ---------- 

282 c : numpy.ndarray 

283 Dissolved concentration [mass/volume]. Must be positive. 

284 

285 Returns 

286 ------- 

287 numpy.ndarray 

288 Retardation factor [-]. Always >= 1.0. 

289 """ 

290 exponent = (1.0 / self.n) - 1.0 

291 coefficient = (self.bulk_density * self.k_f) / (self.porosity * self.n) 

292 return 1.0 + coefficient * (c**exponent) 

293 

294 def total_concentration(self, c: float | npt.NDArray[np.float64]) -> float | npt.NDArray[np.float64]: 

295 """ 

296 Compute total concentration (dissolved + sorbed per unit pore volume). 

297 

298 Total concentration includes both dissolved and sorbed mass: 

299 C_total = C + (rho_b/n_por) * s(C) 

300 = C + (rho_b/n_por) * k_f * C^(1/n) 

301 

302 Parameters 

303 ---------- 

304 c : float or array-like 

305 Dissolved concentration [mass/volume]. Non-negative. 

306 

307 Returns 

308 ------- 

309 c_total : float or numpy.ndarray 

310 Total concentration [mass/volume]. Always >= c. 

311 

312 Notes 

313 ----- 

314 This is the conserved quantity in the transport equation: 

315 ∂C_total/∂t + ∂(flow*C)/∂v = 0 

316 

317 The flux term only includes dissolved concentration because sorbed mass 

318 is immobile. Concentrations at or below c_min return C if c_min=0, else use c_min. 

319 """ 

320 is_array = isinstance(c, np.ndarray) 

321 c_arr = np.asarray(c) 

322 

323 if self.c_min == 0 and self.n < 1.0: 

324 # Only for n<1 (lower C travels faster) where C=0 is physically valid 

325 sorbed = np.where( 

326 c_arr <= 0, 0.0, (self.bulk_density / self.porosity) * self.k_f * (c_arr ** (1.0 / self.n)) 

327 ) 

328 else: 

329 c_eff = np.maximum(c_arr, self.c_min) 

330 sorbed = (self.bulk_density / self.porosity) * self.k_f * (c_eff ** (1.0 / self.n)) 

331 

332 result = c_arr + sorbed 

333 return result if is_array else float(result) 

334 

335 def concentration_from_retardation(self, r: float | npt.NDArray[np.float64]) -> float | npt.NDArray[np.float64]: 

336 """ 

337 Invert retardation factor to obtain concentration analytically. 

338 

339 Given R, solves R = retardation(C) for C. This is used in rarefaction waves 

340 where the self-similar solution gives R as a function of position and time. 

341 

342 Parameters 

343 ---------- 

344 r : float or array-like 

345 Retardation factor [-]. Must be >= 1.0. 

346 

347 Returns 

348 ------- 

349 c : float or numpy.ndarray 

350 Dissolved concentration [mass/volume]. Non-negative. 

351 

352 Raises 

353 ------ 

354 ValueError 

355 If the retardation exponent is effectively zero (i.e., ``n`` ≈ 1), 

356 making inversion undefined. 

357 

358 Notes 

359 ----- 

360 This inverts the relation: 

361 R = 1 + (rho_b*k_f)/(n_por*n) * C^((1/n)-1) 

362 

363 The analytical solution is: 

364 C = [(R-1) * n_por*n / (rho_b*k_f)]^(n/(1-n)) 

365 

366 For n = 1 (linear sorption), the exponent n/(1-n) is undefined, which is 

367 why linear sorption must use ConstantRetardation class instead. 

368 

369 Examples 

370 -------- 

371 >>> sorption = FreundlichSorption( 

372 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3 

373 ... ) 

374 >>> r = sorption.retardation(5.0) 

375 >>> c = sorption.concentration_from_retardation(r) 

376 >>> bool(np.isclose(c, 5.0, rtol=1e-14)) 

377 True 

378 """ 

379 is_array = isinstance(r, np.ndarray) 

380 r_arr = np.asarray(r) 

381 

382 exponent = (1.0 / self.n) - 1.0 

383 

384 if abs(exponent) < EPSILON_EXPONENT: 

385 msg = "Cannot invert linear retardation (n=1)" 

386 raise ValueError(msg) 

387 

388 coefficient = (self.bulk_density * self.k_f) / (self.porosity * self.n) 

389 base = (r_arr - 1.0) / coefficient 

390 

391 inversion_exponent = 1.0 / exponent 

392 c = base**inversion_exponent 

393 result = np.maximum(c, self.c_min) 

394 result = np.where(r_arr <= 1.0, self.c_min, result) 

395 result = np.where(base <= 0, self.c_min, result) 

396 

397 return result if is_array else float(result) 

398 

399 

400@dataclass 

401class ConstantRetardation: 

402 """ 

403 Constant (linear) retardation model. 

404 

405 For linear sorption: s(C) = K_d * C 

406 This gives constant retardation: R(C) = 1 + (rho_b/n_por) * K_d = constant 

407 

408 This is a special case where concentration-dependent behavior disappears. 

409 Used for conservative tracers or as approximation for weak sorption. 

410 

411 Parameters 

412 ---------- 

413 retardation_factor : float 

414 Constant retardation factor [-]. Must be >= 1.0. 

415 R = 1.0 means no retardation (conservative tracer). 

416 

417 Notes 

418 ----- 

419 With constant retardation: 

420 - All concentrations travel at same velocity: flow / R 

421 - No rarefaction waves form (all concentrations travel together) 

422 - Shocks occur only at concentration discontinuities at inlet 

423 - Solution reduces to simple time-shifting 

424 

425 This is equivalent to using `infiltration_to_extraction_series` in the 

426 gwtransport package. 

427 

428 Examples 

429 -------- 

430 >>> sorption = ConstantRetardation(retardation_factor=2.0) 

431 >>> sorption.retardation(5.0) 

432 2.0 

433 >>> sorption.retardation(10.0) 

434 2.0 

435 """ 

436 

437 retardation_factor: float 

438 """Constant retardation factor [-]. Must be >= 1.0.""" 

439 

440 def __post_init__(self): 

441 """Validate parameters after initialization. 

442 

443 Raises 

444 ------ 

445 ValueError 

446 If ``retardation_factor`` is less than 1.0. 

447 """ 

448 if self.retardation_factor < 1.0: 

449 msg = f"retardation_factor must be >= 1.0, got {self.retardation_factor}" 

450 raise ValueError(msg) 

451 

452 def retardation(self, c: float) -> float: # noqa: ARG002 

453 """ 

454 Return constant retardation factor (independent of concentration). 

455 

456 Parameters 

457 ---------- 

458 c : float 

459 Dissolved concentration (not used for constant retardation). 

460 

461 Returns 

462 ------- 

463 r : float 

464 Constant retardation factor. 

465 """ 

466 return self.retardation_factor 

467 

468 def total_concentration(self, c: float) -> float: 

469 """ 

470 Compute total concentration for linear sorption. 

471 

472 For constant retardation: 

473 C_total = C * R 

474 

475 Parameters 

476 ---------- 

477 c : float 

478 Dissolved concentration [mass/volume]. 

479 

480 Returns 

481 ------- 

482 c_total : float 

483 Total concentration [mass/volume]. 

484 """ 

485 return c * self.retardation_factor 

486 

487 def concentration_from_retardation(self, r: float) -> float: 

488 """ 

489 Not applicable for constant retardation. 

490 

491 With constant R, all concentrations have the same retardation, so 

492 inversion is not meaningful. This method raises an error. 

493 

494 Raises 

495 ------ 

496 NotImplementedError 

497 Always raised for constant retardation. 

498 """ 

499 msg = "concentration_from_retardation not applicable for ConstantRetardation (R is independent of C)" 

500 raise NotImplementedError(msg) 

501 

502 def shock_velocity(self, c_left: float, c_right: float, flow: float) -> float: # noqa: ARG002 

503 """ 

504 Compute shock velocity for constant retardation. 

505 

506 With constant R, the shock velocity simplifies to: 

507 s_shock = flow / R 

508 

509 This is the same as all characteristic velocities. 

510 

511 Parameters 

512 ---------- 

513 c_left : float 

514 Concentration upstream of shock (not used for constant R). 

515 c_right : float 

516 Concentration downstream of shock (not used for constant R). 

517 flow : float 

518 Flow rate [volume/time]. 

519 

520 Returns 

521 ------- 

522 s_shock : float 

523 Shock velocity [volume/time]. 

524 """ 

525 return flow / self.retardation_factor 

526 

527 def check_entropy_condition(self, c_left: float, c_right: float, shock_vel: float, flow: float) -> bool: # noqa: ARG002, PLR6301 

528 """ 

529 Check entropy condition for constant retardation. 

530 

531 With constant R, all characteristic velocities are equal, so the 

532 entropy condition is trivially satisfied for any shock (or rather, 

533 shocks don't really exist - they're just contact discontinuities). 

534 

535 Parameters 

536 ---------- 

537 c_left : float 

538 Concentration upstream. 

539 c_right : float 

540 Concentration downstream. 

541 shock_vel : float 

542 Shock velocity. 

543 flow : float 

544 Flow rate. 

545 

546 Returns 

547 ------- 

548 satisfies : bool 

549 Always True for constant retardation. 

550 """ 

551 return True 

552 

553 

554@dataclass 

555class LangmuirSorption(NonlinearSorption): 

556 """ 

557 Langmuir sorption isotherm with exact analytical methods. 

558 

559 The Langmuir isotherm is: s(C) = s_max * C / (K_L + C) 

560 

561 where: 

562 - s is sorbed concentration [mass/mass of solid] 

563 - C is dissolved concentration [mass/volume of water] 

564 - s_max is maximum sorption capacity [mass/mass of solid] 

565 - K_L is half-saturation constant [mass/volume] 

566 

567 Retardation always decreases with C (favorable isotherm), and R(0) is 

568 finite — unlike Freundlich with n > 1, no minimum concentration threshold 

569 is needed. 

570 

571 Parameters 

572 ---------- 

573 s_max : float 

574 Maximum sorption capacity [mass/mass of solid]. Must be positive. 

575 k_l : float 

576 Half-saturation constant [mass/volume]. Concentration at which 

577 s = s_max / 2. Must be positive. 

578 bulk_density : float 

579 Bulk density of porous medium [kg/m³]. Must be positive. 

580 porosity : float 

581 Porosity [-]. Must be in (0, 1). 

582 

583 See Also 

584 -------- 

585 FreundlichSorption : Freundlich isotherm (unbounded sorption). 

586 ConstantRetardation : Linear (constant R) retardation model. 

587 :ref:`concept-nonlinear-sorption` : Background on nonlinear sorption. 

588 

589 Notes 

590 ----- 

591 The retardation factor is defined as: 

592 R(C) = 1 + (rho_b * s_max * K_L) / (n_por * (K_L + C)^2) 

593 

594 Key properties: 

595 

596 - R(0) = 1 + rho_b * s_max / (n_por * K_L) -- finite for all parameters 

597 - R -> 1 as C -> infinity (all sorption sites saturated) 

598 - R always decreases with increasing C (higher C travels faster) 

599 - Shocks form on concentration increases, rarefaction fans on decreases 

600 

601 Examples 

602 -------- 

603 >>> sorption = LangmuirSorption( 

604 ... s_max=0.1, k_l=5.0, bulk_density=1500.0, porosity=0.3 

605 ... ) 

606 >>> r = sorption.retardation(5.0) 

607 >>> c_back = sorption.concentration_from_retardation(r) 

608 >>> bool(np.isclose(c_back, 5.0)) 

609 True 

610 """ 

611 

612 s_max: float 

613 """Maximum sorption capacity [mass/mass of solid].""" 

614 k_l: float 

615 """Half-saturation constant [mass/volume].""" 

616 bulk_density: float 

617 """Bulk density of porous medium [kg/m³].""" 

618 porosity: float 

619 """Porosity [-].""" 

620 

621 def __post_init__(self): 

622 """Validate parameters after initialization. 

623 

624 Raises 

625 ------ 

626 ValueError 

627 If any parameter is outside its valid range: ``s_max`` <= 0, 

628 ``k_l`` <= 0, ``bulk_density`` <= 0, or ``porosity`` 

629 outside (0, 1). 

630 """ 

631 if self.s_max <= 0: 

632 msg = f"s_max must be positive, got {self.s_max}" 

633 raise ValueError(msg) 

634 if self.k_l <= 0: 

635 msg = f"k_l must be positive, got {self.k_l}" 

636 raise ValueError(msg) 

637 if self.bulk_density <= 0: 

638 msg = f"bulk_density must be positive, got {self.bulk_density}" 

639 raise ValueError(msg) 

640 if not 0 < self.porosity < 1: 

641 msg = f"porosity must be in (0, 1), got {self.porosity}" 

642 raise ValueError(msg) 

643 

644 self.a_coeff: float = self.bulk_density * self.s_max * self.k_l / self.porosity 

645 """Lumped retardation constant rho_b * s_max * K_L / n_por.""" 

646 

647 def retardation(self, c: float | npt.NDArray[np.float64]) -> float | npt.NDArray[np.float64]: 

648 """ 

649 Compute retardation factor R(C). 

650 

651 For Langmuir sorption: 

652 R(C) = 1 + A / (K_L + C)² 

653 

654 where A = rho_b * s_max * K_L / n_por. 

655 

656 Parameters 

657 ---------- 

658 c : float or array-like 

659 Dissolved concentration [mass/volume]. Non-negative. 

660 

661 Returns 

662 ------- 

663 r : float or numpy.ndarray 

664 Retardation factor [-]. Always >= 1.0. 

665 

666 Notes 

667 ----- 

668 - R(0) = 1 + rho_b * s_max / (n_por * K_L) — always finite 

669 - R decreases with increasing C (higher C travels faster) 

670 - R → 1 as C → ∞ (all sorption sites saturated) 

671 """ 

672 is_array = isinstance(c, np.ndarray) 

673 c_arr = np.asarray(c) 

674 c_eff = np.maximum(c_arr, 0.0) 

675 result = 1.0 + self.a_coeff / (self.k_l + c_eff) ** 2 

676 return result if is_array else float(result) 

677 

678 def total_concentration(self, c: float | npt.NDArray[np.float64]) -> float | npt.NDArray[np.float64]: 

679 """ 

680 Compute total concentration (dissolved + sorbed per unit pore volume). 

681 

682 For Langmuir sorption: 

683 C_total = C + (rho_b / n_por) * s_max * C / (K_L + C) 

684 

685 Parameters 

686 ---------- 

687 c : float or array-like 

688 Dissolved concentration [mass/volume]. Non-negative. 

689 

690 Returns 

691 ------- 

692 c_total : float or numpy.ndarray 

693 Total concentration [mass/volume]. Always >= c. 

694 """ 

695 is_array = isinstance(c, np.ndarray) 

696 c_arr = np.asarray(c) 

697 c_eff = np.maximum(c_arr, 0.0) 

698 sorbed = (self.bulk_density / self.porosity) * self.s_max * c_eff / (self.k_l + c_eff) 

699 result = c_arr + sorbed 

700 return result if is_array else float(result) 

701 

702 def concentration_from_retardation(self, r: float | npt.NDArray[np.float64]) -> float | npt.NDArray[np.float64]: 

703 """ 

704 Invert retardation factor to obtain concentration analytically. 

705 

706 Given R, solves R = 1 + A / (K_L + C)² for C: 

707 C = sqrt(A / (R - 1)) - K_L 

708 

709 Parameters 

710 ---------- 

711 r : float or array-like 

712 Retardation factor [-]. Must be >= 1.0. 

713 

714 Returns 

715 ------- 

716 c : float or numpy.ndarray 

717 Dissolved concentration [mass/volume]. Non-negative. 

718 

719 Notes 

720 ----- 

721 For R <= 1, returns 0.0 (unphysical region). 

722 For R >= R(0) = 1 + A/K_L², returns 0.0 (at or below zero concentration). 

723 

724 Examples 

725 -------- 

726 >>> sorption = LangmuirSorption( 

727 ... s_max=0.1, k_l=5.0, bulk_density=1500.0, porosity=0.3 

728 ... ) 

729 >>> r = sorption.retardation(5.0) 

730 >>> c = sorption.concentration_from_retardation(r) 

731 >>> bool(np.isclose(c, 5.0, rtol=1e-14)) 

732 True 

733 """ 

734 is_array = isinstance(r, np.ndarray) 

735 r_arr = np.asarray(r) 

736 

737 r_minus_1 = r_arr - 1.0 

738 # For R <= 1 or very large R, return 0 

739 c = np.where(r_minus_1 > 0, np.sqrt(self.a_coeff / r_minus_1) - self.k_l, 0.0) 

740 result = np.maximum(c, 0.0) 

741 

742 return result if is_array else float(result) 

743 

744 

745SorptionModel = NonlinearSorption | ConstantRetardation 

746"""Type alias for all sorption models accepted by the front-tracking solver.""" 

747 

748 

749def characteristic_velocity(c: float, flow: float, sorption: SorptionModel) -> float: 

750 """ 

751 Compute characteristic velocity for given concentration. 

752 

753 In smooth regions of the solution, concentration travels at velocity: 

754 v = flow / R(C) 

755 

756 Parameters 

757 ---------- 

758 c : float 

759 Dissolved concentration [mass/volume]. 

760 flow : float 

761 Flow rate [volume/time]. 

762 sorption : SorptionModel 

763 Sorption model. 

764 

765 Returns 

766 ------- 

767 velocity : float 

768 Characteristic velocity [volume/time]. 

769 

770 Examples 

771 -------- 

772 >>> sorption = FreundlichSorption( 

773 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3 

774 ... ) 

775 >>> v = characteristic_velocity(c=5.0, flow=100.0, sorption=sorption) 

776 >>> v > 0 

777 True 

778 """ 

779 return float(flow / sorption.retardation(c)) 

780 

781 

782def characteristic_position( 

783 c: float, flow: float, sorption: SorptionModel, t_start: float, v_start: float, t: float 

784) -> float | None: 

785 """ 

786 Compute exact position of characteristic at time t. 

787 

788 Characteristics propagate linearly in time: 

789 V(t) = v_start + velocity * (t - t_start) 

790 

791 where velocity = flow / R(C) is constant along the characteristic. 

792 

793 Parameters 

794 ---------- 

795 c : float 

796 Concentration carried by characteristic [mass/volume]. 

797 flow : float 

798 Flow rate [volume/time]. 

799 sorption : SorptionModel 

800 Sorption model. 

801 t_start : float 

802 Time when characteristic starts [days]. 

803 v_start : float 

804 Starting position [volume]. 

805 t : float 

806 Time at which to compute position [days]. 

807 

808 Returns 

809 ------- 

810 position : float or None 

811 Position at time t [volume], or None if t < t_start. 

812 

813 Examples 

814 -------- 

815 >>> sorption = ConstantRetardation(retardation_factor=2.0) 

816 >>> v = characteristic_position( 

817 ... c=5.0, flow=100.0, sorption=sorption, t_start=0.0, v_start=0.0, t=10.0 

818 ... ) 

819 >>> bool(np.isclose(v, 500.0)) # v = 100/2 * 10 = 500 

820 True 

821 """ 

822 if t < t_start: 

823 return None 

824 

825 velocity = characteristic_velocity(c, flow, sorption) 

826 dt_days = t - t_start 

827 

828 return v_start + velocity * dt_days 

829 

830 

831def compute_first_front_arrival_time( 

832 cin: npt.NDArray[np.floating], 

833 flow: npt.NDArray[np.floating], 

834 tedges: pd.DatetimeIndex, 

835 aquifer_pore_volume: float, 

836 sorption: SorptionModel, 

837) -> float: 

838 """ 

839 Compute exact time when first wave reaches outlet (v_max). 

840 

841 This function returns the precise moment when the first non-zero 

842 concentration wave from the inlet arrives at the outlet. This marks 

843 the end of the spin-up period. 

844 

845 For the typical case where the first inlet change creates a characteristic 

846 (e.g., 0→C transition), this is when that characteristic reaches v_max. 

847 

848 For cases with rarefaction waves: 

849 

850 - n>1 (higher C travels faster): The head of a rarefaction 

851 (higher C) arrives first. 

852 - n<1 (lower C travels faster): The head of a rarefaction 

853 (lower C) arrives first. 

854 

855 Algorithm: 

856 1. Find first index where cin > 0 

857 2. Compute residence time for this concentration from inlet to outlet 

858 3. Account for piecewise constant flow during transit 

859 4. Return arrival time in days from tedges[0] 

860 

861 Parameters 

862 ---------- 

863 cin : numpy.ndarray 

864 Inlet concentration [mass/volume]. Length = len(tedges) - 1. 

865 flow : numpy.ndarray 

866 Flow rate [volume/time]. Length = len(tedges) - 1. 

867 tedges : pandas.DatetimeIndex 

868 Time bin edges. Length = len(cin) + 1. 

869 Expected to be DatetimeIndex. 

870 aquifer_pore_volume : float 

871 Total pore volume [volume]. Must be positive. 

872 sorption : SorptionModel 

873 Sorption model. 

874 

875 Returns 

876 ------- 

877 t_first_arrival : float 

878 Time when first wave reaches outlet, measured in days from tedges[0]. 

879 Returns np.inf if no concentration ever arrives. 

880 

881 See Also 

882 -------- 

883 compute_first_fully_informed_bin_edge : Get first valid output bin edge 

884 

885 Notes 

886 ----- 

887 The residence time accounts for retardation: 

888 residence_time = aquifer_pore_volume * R(C) / flow_avg 

889 

890 For piecewise constant flow, we integrate: 

891 ∫₀^residence_time flow(t) dt = aquifer_pore_volume * R(C) 

892 

893 This function computes the EXACT crossing time in days, not a bin edge. 

894 Use compute_first_fully_informed_bin_edge() to get the corresponding 

895 output bin edge for masking purposes. 

896 

897 Examples 

898 -------- 

899 >>> import pandas as pd 

900 >>> cin = np.array([0.0, 10.0] + [10.0] * 10) # First bin zero, then nonzero 

901 >>> flow = np.array([100.0] * 12) # Constant flow 

902 >>> tedges = pd.date_range("2020-01-01", periods=13, freq="D") 

903 >>> sorption = ConstantRetardation(retardation_factor=2.0) 

904 >>> t_first = compute_first_front_arrival_time(cin, flow, tedges, 500.0, sorption) 

905 >>> # Result is in days from tedges[0] 

906 >>> bool(np.isclose(t_first, 11.0)) # 1 day (offset) + 10 days (travel time) 

907 True 

908 """ 

909 # Find first non-zero concentration 

910 nonzero_indices = np.where(cin > 0)[0] 

911 

912 if len(nonzero_indices) == 0: 

913 # No concentration ever arrives 

914 return np.inf 

915 

916 idx_first = nonzero_indices[0] 

917 c_first = float(cin[idx_first]) 

918 

919 # Compute retardation for this concentration 

920 r_first = sorption.retardation(c_first) 

921 

922 # Target: cumulative flow volume needed to reach outlet 

923 target_volume = aquifer_pore_volume * r_first 

924 

925 # Vectorized integration of piecewise-constant flow starting from idx_first. 

926 # Convert all bin widths to days at once and accumulate the volume profile. 

927 tedges_days = np.asarray((tedges - tedges[0]) / pd.Timedelta(days=1), dtype=float) 

928 dt_days = np.diff(tedges_days[idx_first:]) 

929 volumes = np.asarray(flow[idx_first:], dtype=float) * dt_days 

930 cumulative_volume = np.cumsum(volumes) 

931 

932 # Locate the first bin whose cumulative volume reaches the target. 

933 bin_offset = int(np.searchsorted(cumulative_volume, target_volume, side="left")) 

934 

935 if bin_offset >= len(cumulative_volume): 

936 # Never reaches outlet with given flow history 

937 return float(np.inf) 

938 

939 # Volume already accumulated before entering the bin where arrival occurs. 

940 volume_before_bin = float(cumulative_volume[bin_offset - 1]) if bin_offset > 0 else 0.0 

941 remaining_volume = target_volume - volume_before_bin 

942 dt_partial = remaining_volume / float(flow[idx_first + bin_offset]) 

943 

944 return float(float(tedges_days[idx_first + bin_offset]) + dt_partial) 

945 

946 

947def compute_first_fully_informed_bin_edge( 

948 cin: npt.NDArray[np.floating], 

949 flow: npt.NDArray[np.floating], 

950 tedges: pd.DatetimeIndex, 

951 aquifer_pore_volume: float, 

952 sorption: SorptionModel, 

953 output_tedges: pd.DatetimeIndex, 

954) -> float: 

955 """ 

956 Compute left edge of first output bin that is fully informed. 

957 

958 A bin [t_i, t_{i+1}] is "fully informed" if all water exiting during 

959 that interval originated from known inlet conditions (not unknown 

960 initial conditions). This occurs when t_i >= first front arrival time. 

961 

962 This function is useful for: 

963 - Masking output bins during spin-up period 

964 - Determining which output times are valid for analysis 

965 - Plotting valid vs spin-up regions 

966 

967 Rarefaction handling: 

968 

969 - For n>1: Rarefaction tail (lower C, slower) arrives after head. 

970 Once the first wave (head) arrives, subsequent bins are informed. 

971 - For n<1: Rarefaction tail (higher C, slower) arrives after head. 

972 Same principle applies. 

973 

974 In both cases, once the leading edge of the inlet-generated wave structure 

975 reaches the outlet, all subsequent output is determined by inlet history. 

976 

977 Parameters 

978 ---------- 

979 cin : numpy.ndarray 

980 Inlet concentration [mass/volume]. Length = len(tedges) - 1. 

981 flow : numpy.ndarray 

982 Flow rate [volume/time]. Length = len(tedges) - 1. 

983 tedges : pandas.DatetimeIndex 

984 Inlet time bin edges. Length = len(cin) + 1. 

985 Expected to be DatetimeIndex. 

986 aquifer_pore_volume : float 

987 Total pore volume [volume]. Must be positive. 

988 sorption : SorptionModel 

989 Sorption model. 

990 output_tedges : pandas.DatetimeIndex 

991 Output time bin edges. These are the bins for which we want 

992 to determine the first fully-informed bin. 

993 Expected to be DatetimeIndex. 

994 

995 Returns 

996 ------- 

997 t_first_bin : float 

998 Left edge of first output bin that is fully informed, measured in 

999 days from output_tedges[0]. 

1000 Returns last edge in days if no bin is fully informed. 

1001 Returns np.inf if output_tedges is empty. 

1002 

1003 See Also 

1004 -------- 

1005 compute_first_front_arrival_time : Get exact arrival time 

1006 

1007 Notes 

1008 ----- 

1009 This differs from compute_first_front_arrival_time in that it returns 

1010 a BIN EDGE (from output_tedges), not the exact crossing time. 

1011 

1012 Both functions return time in days, but measured from different reference points: 

1013 - compute_first_front_arrival_time: days from tedges[0] 

1014 - compute_first_fully_informed_bin_edge: days from output_tedges[0] 

1015 

1016 For masking output arrays:: 

1017 

1018 import pandas as pd 

1019 

1020 t_first_bin = compute_first_fully_informed_bin_edge(...) 

1021 # Convert output_tedges to days from output_tedges[0] 

1022 tedges_days = (output_tedges - output_tedges[0]) / pd.Timedelta(days=1) 

1023 mask = tedges_days[:-1] >= t_first_bin 

1024 cout_valid = cout[mask] 

1025 

1026 Examples 

1027 -------- 

1028 >>> import pandas as pd 

1029 >>> # Exact arrival at ~11 days from tedges[0] 

1030 >>> cin = np.array([0.0, 10.0, 10.0]) 

1031 >>> flow = np.array([100.0, 100.0, 100.0]) 

1032 >>> tedges = pd.date_range("2020-01-01", periods=4, freq="D") 

1033 >>> output_tedges = pd.date_range("2020-01-01", periods=20, freq="D") 

1034 >>> sorption = ConstantRetardation(retardation_factor=2.0) 

1035 >>> t_bin = compute_first_fully_informed_bin_edge( 

1036 ... cin, flow, tedges, 500.0, sorption, output_tedges 

1037 ... ) 

1038 >>> # Result is in days from output_tedges[0] 

1039 >>> t_bin >= 11.0 # First bin edge >= arrival time 

1040 True 

1041 """ 

1042 if len(output_tedges) == 0: 

1043 return np.inf 

1044 

1045 # Compute exact arrival time (in days from tedges[0]) 

1046 t_arrival_days = compute_first_front_arrival_time(cin, flow, tedges, aquifer_pore_volume, sorption) 

1047 

1048 if not np.isfinite(t_arrival_days): 

1049 # No arrival, return last edge in days from output_tedges[0] 

1050 return (output_tedges[-1] - output_tedges[0]) / pd.Timedelta(days=1) 

1051 

1052 # Convert output_tedges to days from output_tedges[0] 

1053 # Find first bin edge >= t_arrival_days 

1054 # Note: t_arrival_days is measured from tedges[0], but output_tedges might have different start 

1055 # So we need to adjust the reference point 

1056 

1057 # Convert t_arrival from "days from tedges[0]" to "days from output_tedges[0]" 

1058 t_arrival_abs = tedges[0] + pd.Timedelta(days=t_arrival_days) 

1059 t_arrival_output_ref = (t_arrival_abs - output_tedges[0]) / pd.Timedelta(days=1) 

1060 

1061 # Find first output bin edge >= t_arrival 

1062 for t_edge in output_tedges: 

1063 t_edge_days = (t_edge - output_tedges[0]) / pd.Timedelta(days=1) 

1064 if t_edge_days >= t_arrival_output_ref: 

1065 return t_edge_days 

1066 

1067 # If no edge is >= t_arrival, return the last edge 

1068 # (This means all bins are before arrival) 

1069 return (output_tedges[-1] - output_tedges[0]) / pd.Timedelta(days=1)