Coverage for src / gwtransport / fronttracking / waves.py: 92%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-27 06:32 +0000

1""" 

2Wave Representation for Front Tracking. 

3 

4This module implements wave classes for representing characteristics, shocks, 

5and rarefaction waves in the front tracking algorithm. Each wave type knows 

6how to compute its position and concentration at any point in space-time. 

7 

8This file is part of gwtransport which is released under AGPL-3.0 license. 

9See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details. 

10""" 

11 

12from abc import ABC, abstractmethod 

13from dataclasses import dataclass, field 

14 

15from gwtransport.fronttracking.math import SorptionModel 

16 

17# Numerical tolerance constants 

18EPSILON_POSITION = 1e-15 # Tolerance for checking if two positions are equal 

19 

20 

21@dataclass 

22class Wave(ABC): 

23 """ 

24 Abstract base class for all wave types in front tracking. 

25 

26 All waves share common attributes and must implement methods for 

27 computing position and concentration. Waves can be active or inactive 

28 (deactivated waves are preserved for history but don't participate in 

29 future interactions). 

30 

31 Parameters 

32 ---------- 

33 t_start : float 

34 Time when wave forms [days]. 

35 v_start : float 

36 Position where wave forms [m³]. 

37 flow : float 

38 Flow rate at formation time [m³/day]. 

39 is_active : bool, optional 

40 Whether wave is currently active. Default True. 

41 """ 

42 

43 t_start: float 

44 """Time when wave forms [days].""" 

45 v_start: float 

46 """Position where wave forms [m³].""" 

47 flow: float 

48 """Flow rate at formation time [m³/day].""" 

49 is_active: bool = field(default=True, kw_only=True) 

50 """Whether wave is currently active.""" 

51 

52 @abstractmethod 

53 def position_at_time(self, t: float) -> float | None: 

54 """ 

55 Compute wave position at time t. 

56 

57 Parameters 

58 ---------- 

59 t : float 

60 Time [days]. 

61 

62 Returns 

63 ------- 

64 position : float or None 

65 Position [m³], or None if t < t_start or wave is inactive. 

66 """ 

67 

68 @abstractmethod 

69 def concentration_left(self) -> float: 

70 """ 

71 Get concentration on left (upstream) side of wave. 

72 

73 Returns 

74 ------- 

75 c_left : float 

76 Upstream concentration [mass/volume]. 

77 """ 

78 

79 @abstractmethod 

80 def concentration_right(self) -> float: 

81 """ 

82 Get concentration on right (downstream) side of wave. 

83 

84 Returns 

85 ------- 

86 c_right : float 

87 Downstream concentration [mass/volume]. 

88 """ 

89 

90 @abstractmethod 

91 def concentration_at_point(self, v: float, t: float) -> float | None: 

92 """ 

93 Compute concentration at point (v, t) if wave controls it. 

94 

95 Parameters 

96 ---------- 

97 v : float 

98 Position [m³]. 

99 t : float 

100 Time [days]. 

101 

102 Returns 

103 ------- 

104 concentration : float or None 

105 Concentration [mass/volume] if wave controls this point, None otherwise. 

106 """ 

107 

108 

109@dataclass 

110class CharacteristicWave(Wave): 

111 """ 

112 Characteristic line along which concentration is constant. 

113 

114 In smooth regions, concentration travels at velocity flow/R(C). Along 

115 each characteristic line, the concentration value is constant. This is 

116 the fundamental solution element for hyperbolic conservation laws. 

117 

118 Parameters 

119 ---------- 

120 t_start : float 

121 Formation time [days]. 

122 v_start : float 

123 Starting position [m³]. 

124 flow : float 

125 Flow rate [m³/day]. 

126 concentration : float 

127 Constant concentration carried [mass/volume]. 

128 sorption : FreundlichSorption or ConstantRetardation 

129 Sorption model determining velocity. 

130 is_active : bool, optional 

131 Activity status. Default True. 

132 

133 Examples 

134 -------- 

135 >>> sorption = ConstantRetardation(retardation_factor=2.0) 

136 >>> char = CharacteristicWave( 

137 ... t_start=0.0, v_start=0.0, flow=100.0, concentration=5.0, sorption=sorption 

138 ... ) 

139 >>> char.velocity() 

140 50.0 

141 >>> char.position_at_time(10.0) 

142 500.0 

143 """ 

144 

145 concentration: float 

146 """Constant concentration carried [mass/volume].""" 

147 sorption: SorptionModel 

148 """Sorption model determining velocity.""" 

149 

150 def velocity(self) -> float: 

151 """ 

152 Compute characteristic velocity. 

153 

154 The velocity is v = flow / R(C), where R is the retardation factor. 

155 

156 Returns 

157 ------- 

158 velocity : float 

159 Characteristic velocity [m³/day]. 

160 """ 

161 return float(self.flow / self.sorption.retardation(self.concentration)) 

162 

163 def position_at_time(self, t: float) -> float | None: 

164 """ 

165 Compute position at time t. 

166 

167 Characteristics propagate linearly: V(t) = v_start + velocity*(t - t_start). 

168 

169 Parameters 

170 ---------- 

171 t : float 

172 Time [days]. 

173 

174 Returns 

175 ------- 

176 position : float or None 

177 Position [m³], or None if t < t_start or inactive. 

178 """ 

179 if t < self.t_start or not self.is_active: 

180 return None 

181 return self.v_start + self.velocity() * (t - self.t_start) 

182 

183 def concentration_left(self) -> float: 

184 """ 

185 Get upstream concentration (same as concentration for characteristics). 

186 

187 Returns 

188 ------- 

189 c_left : float 

190 Upstream concentration [mass/volume]. 

191 """ 

192 return self.concentration 

193 

194 def concentration_right(self) -> float: 

195 """ 

196 Get downstream concentration (same as concentration for characteristics). 

197 

198 Returns 

199 ------- 

200 c_right : float 

201 Downstream concentration [mass/volume]. 

202 """ 

203 return self.concentration 

204 

205 def concentration_at_point(self, v: float, t: float) -> float | None: 

206 """ 

207 Get concentration if point is on this characteristic. 

208 

209 For practical purposes, we check if the characteristic has reached 

210 position v by time t. 

211 

212 Parameters 

213 ---------- 

214 v : float 

215 Position [m³]. 

216 t : float 

217 Time [days]. 

218 

219 Returns 

220 ------- 

221 concentration : float or None 

222 Concentration if point is on characteristic, None otherwise. 

223 

224 Notes 

225 ----- 

226 In practice, this method is used by higher-level algorithms to 

227 determine which wave controls a given point. The exact point-on-line 

228 check is handled by the solver. 

229 """ 

230 v_at_t = self.position_at_time(t) 

231 if v_at_t is None: 

232 return None 

233 

234 # If characteristic has reached or passed this position 

235 if v_at_t >= v: 

236 return self.concentration 

237 

238 return None 

239 

240 

241@dataclass 

242class ShockWave(Wave): 

243 """ 

244 Shock wave (discontinuity) with jump in concentration. 

245 

246 Shocks form when faster water overtakes slower water, creating a sharp 

247 front. The shock velocity is determined by the Rankine-Hugoniot condition 

248 to ensure mass conservation across the discontinuity. 

249 

250 Parameters 

251 ---------- 

252 t_start : float 

253 Formation time [days]. 

254 v_start : float 

255 Formation position [m³]. 

256 flow : float 

257 Flow rate [m³/day]. 

258 c_left : float 

259 Concentration upstream (behind) shock [mass/volume]. 

260 c_right : float 

261 Concentration downstream (ahead of) shock [mass/volume]. 

262 sorption : FreundlichSorption or ConstantRetardation 

263 Sorption model. 

264 is_active : bool, optional 

265 Activity status. Default True. 

266 velocity : float, optional 

267 Shock velocity computed from Rankine-Hugoniot. Computed automatically 

268 if not provided. 

269 

270 Examples 

271 -------- 

272 >>> sorption = FreundlichSorption( 

273 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3 

274 ... ) 

275 >>> shock = ShockWave( 

276 ... t_start=0.0, 

277 ... v_start=0.0, 

278 ... flow=100.0, 

279 ... c_left=10.0, 

280 ... c_right=2.0, 

281 ... sorption=sorption, 

282 ... ) 

283 >>> shock.velocity > 0 

284 True 

285 >>> shock.satisfies_entropy() 

286 True 

287 """ 

288 

289 c_left: float 

290 """Concentration upstream (behind) shock [mass/volume].""" 

291 c_right: float 

292 """Concentration downstream (ahead of) shock [mass/volume].""" 

293 sorption: SorptionModel 

294 """Sorption model.""" 

295 velocity: float | None = None 

296 """Shock velocity computed from Rankine-Hugoniot [m³/day].""" 

297 

298 def __post_init__(self) -> None: 

299 """Compute shock velocity from Rankine-Hugoniot condition.""" 

300 if self.velocity is None: 

301 self.velocity = self.sorption.shock_velocity(self.c_left, self.c_right, self.flow) 

302 

303 def position_at_time(self, t: float) -> float | None: 

304 """ 

305 Compute shock position at time t. 

306 

307 Shock propagates at constant velocity: V(t) = v_start + velocity*(t - t_start). 

308 

309 Parameters 

310 ---------- 

311 t : float 

312 Time [days]. 

313 

314 Returns 

315 ------- 

316 position : float or None 

317 Position [m³], or None if t < t_start or inactive. 

318 

319 Raises 

320 ------ 

321 RuntimeError 

322 If shock velocity is None (should have been set in ``__post_init__``). 

323 """ 

324 if t < self.t_start or not self.is_active: 

325 return None 

326 if self.velocity is None: 

327 msg = "Shock velocity should be set in __post_init__" 

328 raise RuntimeError(msg) 

329 return self.v_start + self.velocity * (t - self.t_start) 

330 

331 def concentration_left(self) -> float: 

332 """ 

333 Get upstream concentration. 

334 

335 Returns 

336 ------- 

337 c_left : float 

338 Upstream concentration [mass/volume]. 

339 """ 

340 return self.c_left 

341 

342 def concentration_right(self) -> float: 

343 """ 

344 Get downstream concentration. 

345 

346 Returns 

347 ------- 

348 c_right : float 

349 Downstream concentration [mass/volume]. 

350 """ 

351 return self.c_right 

352 

353 def concentration_at_point(self, v: float, t: float) -> float | None: 

354 """ 

355 Get concentration at point based on which side of shock. 

356 

357 Parameters 

358 ---------- 

359 v : float 

360 Position [m³]. 

361 t : float 

362 Time [days]. 

363 

364 Returns 

365 ------- 

366 concentration : float or None 

367 c_left if upstream of shock, c_right if downstream, None if shock hasn't formed yet. 

368 

369 Notes 

370 ----- 

371 At the exact shock position, returns the average of left and right values. 

372 This is a convention for the singular point; in practice, the shock is 

373 infinitesimally thin. 

374 """ 

375 v_shock = self.position_at_time(t) 

376 if v_shock is None: 

377 return None 

378 

379 # Tolerance for exact shock position 

380 tol = 1e-15 

381 

382 if v < v_shock - tol: 

383 # Upstream of shock 

384 return self.c_left 

385 if v > v_shock + tol: 

386 # Downstream of shock 

387 return self.c_right 

388 # Exactly at shock (rarely happens in practice) 

389 return 0.5 * (self.c_left + self.c_right) 

390 

391 def satisfies_entropy(self) -> bool: 

392 """ 

393 Check if shock satisfies Lax entropy condition. 

394 

395 The entropy condition ensures characteristics flow INTO the shock 

396 from both sides, which is required for physical admissibility. 

397 

398 Returns 

399 ------- 

400 satisfies : bool 

401 True if shock satisfies entropy condition. 

402 

403 Raises 

404 ------ 

405 RuntimeError 

406 If shock velocity is None (should have been set in ``__post_init__``). 

407 

408 Notes 

409 ----- 

410 The condition is: lambda(c_left) > shock_velocity > lambda(c_right), 

411 where lambda(C) = flow / R(C) is the characteristic velocity. 

412 

413 Shocks that violate this condition are unphysical and should be 

414 replaced by rarefaction waves. 

415 """ 

416 if self.velocity is None: 

417 msg = "Shock velocity should be set in __post_init__" 

418 raise RuntimeError(msg) 

419 return self.sorption.check_entropy_condition(self.c_left, self.c_right, self.velocity, self.flow) 

420 

421 

422@dataclass 

423class RarefactionWave(Wave): 

424 """ 

425 Rarefaction (expansion fan) with smooth concentration gradient. 

426 

427 Rarefactions form when slower water follows faster water, creating an 

428 expanding region where concentration varies smoothly. The solution is 

429 self-similar: C = C(V/t). 

430 

431 Parameters 

432 ---------- 

433 t_start : float 

434 Formation time [days]. 

435 v_start : float 

436 Formation position [m³]. 

437 flow : float 

438 Flow rate [m³/day]. 

439 c_head : float 

440 Concentration at leading edge (faster) [mass/volume]. 

441 c_tail : float 

442 Concentration at trailing edge (slower) [mass/volume]. 

443 sorption : FreundlichSorption or ConstantRetardation 

444 Sorption model (must be concentration-dependent). 

445 is_active : bool, optional 

446 Activity status. Default True. 

447 

448 Raises 

449 ------ 

450 ValueError 

451 If head velocity <= tail velocity (would be compression, not rarefaction). 

452 

453 Examples 

454 -------- 

455 >>> sorption = FreundlichSorption( 

456 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3 

457 ... ) 

458 >>> raref = RarefactionWave( 

459 ... t_start=0.0, 

460 ... v_start=0.0, 

461 ... flow=100.0, 

462 ... c_head=10.0, 

463 ... c_tail=2.0, 

464 ... sorption=sorption, 

465 ... ) 

466 >>> raref.head_velocity() > raref.tail_velocity() 

467 True 

468 >>> raref.contains_point(v=150.0, t=20.0) 

469 True 

470 """ 

471 

472 c_head: float 

473 """Concentration at leading edge (faster) [mass/volume].""" 

474 c_tail: float 

475 """Concentration at trailing edge (slower) [mass/volume].""" 

476 sorption: SorptionModel 

477 """Sorption model (must be concentration-dependent).""" 

478 

479 def __post_init__(self): 

480 """ 

481 Verify this is actually a rarefaction (head faster than tail). 

482 

483 Raises 

484 ------ 

485 ValueError 

486 If head velocity is not greater than tail velocity, indicating a 

487 compression wave (shock) rather than a rarefaction. 

488 """ 

489 v_head = self.head_velocity() 

490 v_tail = self.tail_velocity() 

491 

492 if v_head <= v_tail: 

493 msg = ( 

494 f"Not a rarefaction: head_velocity={v_head:.3f} <= tail_velocity={v_tail:.3f}. " 

495 f"This would be a compression (shock) instead." 

496 ) 

497 raise ValueError(msg) 

498 

499 def head_velocity(self) -> float: 

500 """ 

501 Compute velocity of rarefaction head (leading edge). 

502 

503 Returns 

504 ------- 

505 velocity : float 

506 Head velocity [m³/day]. 

507 """ 

508 return float(self.flow / self.sorption.retardation(self.c_head)) 

509 

510 def tail_velocity(self) -> float: 

511 """ 

512 Compute velocity of rarefaction tail (trailing edge). 

513 

514 Returns 

515 ------- 

516 velocity : float 

517 Tail velocity [m³/day]. 

518 """ 

519 return float(self.flow / self.sorption.retardation(self.c_tail)) 

520 

521 def head_position_at_time(self, t: float) -> float | None: 

522 """ 

523 Compute position of rarefaction head at time t. 

524 

525 Parameters 

526 ---------- 

527 t : float 

528 Time [days]. 

529 

530 Returns 

531 ------- 

532 position : float or None 

533 Head position [m³], or None if t < t_start or inactive. 

534 """ 

535 if t < self.t_start or not self.is_active: 

536 return None 

537 return self.v_start + self.head_velocity() * (t - self.t_start) 

538 

539 def tail_position_at_time(self, t: float) -> float | None: 

540 """ 

541 Compute position of rarefaction tail at time t. 

542 

543 Parameters 

544 ---------- 

545 t : float 

546 Time [days]. 

547 

548 Returns 

549 ------- 

550 position : float or None 

551 Tail position [m³], or None if t < t_start or inactive. 

552 """ 

553 if t < self.t_start or not self.is_active: 

554 return None 

555 return self.v_start + self.tail_velocity() * (t - self.t_start) 

556 

557 def position_at_time(self, t: float) -> float | None: 

558 """ 

559 Return head position (leading edge of rarefaction). 

560 

561 This implements the abstract Wave method. 

562 

563 Parameters 

564 ---------- 

565 t : float 

566 Time [days]. 

567 

568 Returns 

569 ------- 

570 position : float or None 

571 Head position [m³]. 

572 """ 

573 return self.head_position_at_time(t) 

574 

575 def contains_point(self, v: float, t: float) -> bool: 

576 """ 

577 Check if point (v, t) is inside the rarefaction fan. 

578 

579 Parameters 

580 ---------- 

581 v : float 

582 Position [m³]. 

583 t : float 

584 Time [days]. 

585 

586 Returns 

587 ------- 

588 contains : bool 

589 True if point is between tail and head. 

590 """ 

591 if t <= self.t_start or not self.is_active: 

592 return False 

593 

594 v_head = self.head_position_at_time(t) 

595 v_tail = self.tail_position_at_time(t) 

596 

597 if v_head is None or v_tail is None: 

598 return False 

599 

600 return v_tail <= v <= v_head 

601 

602 def concentration_left(self) -> float: 

603 """ 

604 Get upstream concentration (tail). 

605 

606 Returns 

607 ------- 

608 c_left : float 

609 Upstream concentration at the trailing edge [mass/volume]. 

610 """ 

611 return self.c_tail 

612 

613 def concentration_right(self) -> float: 

614 """ 

615 Get downstream concentration (head). 

616 

617 Returns 

618 ------- 

619 c_right : float 

620 Downstream concentration at the leading edge [mass/volume]. 

621 """ 

622 return self.c_head 

623 

624 def concentration_at_point(self, v: float, t: float) -> float | None: 

625 """ 

626 Compute concentration at point (v, t) via self-similar solution. 

627 

628 Within the rarefaction fan, concentration varies smoothly according to: 

629 R(C) = flow * (t - t_start) / (v - v_start) 

630 

631 This is inverted to find C using the sorption model. 

632 

633 Parameters 

634 ---------- 

635 v : float 

636 Position [m³]. 

637 t : float 

638 Time [days]. 

639 

640 Returns 

641 ------- 

642 concentration : float or None 

643 Concentration if point is in rarefaction, None otherwise. 

644 

645 Notes 

646 ----- 

647 The self-similar solution automatically maintains mass balance and 

648 provides the exact analytical form of the concentration profile. 

649 

650 For ConstantRetardation, rarefactions don't form (all concentrations 

651 travel at same speed), so this returns None. 

652 

653 Examples 

654 -------- 

655 >>> sorption = FreundlichSorption( 

656 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3 

657 ... ) 

658 >>> raref = RarefactionWave(0.0, 0.0, 100.0, 10.0, 2.0, sorption) 

659 >>> c = raref.concentration_at_point(v=150.0, t=20.0) 

660 >>> c is not None 

661 True 

662 >>> 2.0 <= c <= 10.0 

663 True 

664 """ 

665 # Special case: at origin of rarefaction (before contains_point check) 

666 if abs(v - self.v_start) < EPSILON_POSITION and t >= self.t_start: 

667 return self.c_tail 

668 

669 if not self.contains_point(v, t): 

670 return None 

671 

672 # Self-similar solution: R(C) = flow*(t - t_start)/(v - v_start) 

673 r_target = self.flow * (t - self.t_start) / (v - self.v_start) 

674 

675 if r_target <= 1.0: 

676 return None # Unphysical 

677 

678 # Invert R to get C 

679 # For ConstantRetardation, this would raise NotImplementedError 

680 try: 

681 c = self.sorption.concentration_from_retardation(r_target) 

682 except NotImplementedError: 

683 # ConstantRetardation case - rarefactions don't form 

684 return None 

685 

686 # Verify C is in valid range [c_tail, c_head] 

687 c_min = min(self.c_tail, self.c_head) 

688 c_max = max(self.c_tail, self.c_head) 

689 

690 c_float = float(c) 

691 if c_min <= c_float <= c_max: 

692 return c_float 

693 

694 return None