Coverage for src / gwtransport / fronttracking / waves.py: 92%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-04 18:51 +0000

1""" 

2Wave Representation for Front Tracking. 

3 

4This module implements wave classes for representing characteristics, shocks, 

5and rarefaction waves in the front tracking algorithm. Each wave type knows 

6how to compute its position and concentration at any point in space-time. 

7 

8This file is part of gwtransport which is released under AGPL-3.0 license. 

9See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details. 

10""" 

11 

12from abc import ABC, abstractmethod 

13from dataclasses import dataclass, field 

14 

15from gwtransport.fronttracking.math import ConstantRetardation, FreundlichSorption 

16 

17# Numerical tolerance constants 

18EPSILON_POSITION = 1e-15 # Tolerance for checking if two positions are equal 

19 

20 

21@dataclass 

22class Wave(ABC): 

23 """ 

24 Abstract base class for all wave types in front tracking. 

25 

26 All waves share common attributes and must implement methods for 

27 computing position and concentration. Waves can be active or inactive 

28 (deactivated waves are preserved for history but don't participate in 

29 future interactions). 

30 

31 Parameters 

32 ---------- 

33 t_start : float 

34 Time when wave forms [days]. 

35 v_start : float 

36 Position where wave forms [m³]. 

37 flow : float 

38 Flow rate at formation time [m³/day]. 

39 is_active : bool, optional 

40 Whether wave is currently active. Default True. 

41 """ 

42 

43 t_start: float 

44 """Time when wave forms [days].""" 

45 v_start: float 

46 """Position where wave forms [m³].""" 

47 flow: float 

48 """Flow rate at formation time [m³/day].""" 

49 is_active: bool = field(default=True, kw_only=True) 

50 """Whether wave is currently active.""" 

51 

52 @abstractmethod 

53 def position_at_time(self, t: float) -> float | None: 

54 """ 

55 Compute wave position at time t. 

56 

57 Parameters 

58 ---------- 

59 t : float 

60 Time [days]. 

61 

62 Returns 

63 ------- 

64 position : float or None 

65 Position [m³], or None if t < t_start or wave is inactive. 

66 """ 

67 

68 @abstractmethod 

69 def concentration_left(self) -> float: 

70 """ 

71 Get concentration on left (upstream) side of wave. 

72 

73 Returns 

74 ------- 

75 c_left : float 

76 Upstream concentration [mass/volume]. 

77 """ 

78 

79 @abstractmethod 

80 def concentration_right(self) -> float: 

81 """ 

82 Get concentration on right (downstream) side of wave. 

83 

84 Returns 

85 ------- 

86 c_right : float 

87 Downstream concentration [mass/volume]. 

88 """ 

89 

90 @abstractmethod 

91 def concentration_at_point(self, v: float, t: float) -> float | None: 

92 """ 

93 Compute concentration at point (v, t) if wave controls it. 

94 

95 Parameters 

96 ---------- 

97 v : float 

98 Position [m³]. 

99 t : float 

100 Time [days]. 

101 

102 Returns 

103 ------- 

104 concentration : float or None 

105 Concentration [mass/volume] if wave controls this point, None otherwise. 

106 """ 

107 

108 

109@dataclass 

110class CharacteristicWave(Wave): 

111 """ 

112 Characteristic line along which concentration is constant. 

113 

114 In smooth regions, concentration travels at velocity flow/R(C). Along 

115 each characteristic line, the concentration value is constant. This is 

116 the fundamental solution element for hyperbolic conservation laws. 

117 

118 Parameters 

119 ---------- 

120 t_start : float 

121 Formation time [days]. 

122 v_start : float 

123 Starting position [m³]. 

124 flow : float 

125 Flow rate [m³/day]. 

126 concentration : float 

127 Constant concentration carried [mass/volume]. 

128 sorption : FreundlichSorption or ConstantRetardation 

129 Sorption model determining velocity. 

130 is_active : bool, optional 

131 Activity status. Default True. 

132 

133 Examples 

134 -------- 

135 >>> sorption = ConstantRetardation(retardation_factor=2.0) 

136 >>> char = CharacteristicWave( 

137 ... t_start=0.0, v_start=0.0, flow=100.0, concentration=5.0, sorption=sorption 

138 ... ) 

139 >>> char.velocity() 

140 50.0 

141 >>> char.position_at_time(10.0) 

142 500.0 

143 """ 

144 

145 concentration: float 

146 """Constant concentration carried [mass/volume].""" 

147 sorption: FreundlichSorption | ConstantRetardation 

148 """Sorption model determining velocity.""" 

149 

150 def velocity(self) -> float: 

151 """ 

152 Compute characteristic velocity. 

153 

154 The velocity is v = flow / R(C), where R is the retardation factor. 

155 

156 Returns 

157 ------- 

158 velocity : float 

159 Characteristic velocity [m³/day]. 

160 """ 

161 return float(self.flow / self.sorption.retardation(self.concentration)) 

162 

163 def position_at_time(self, t: float) -> float | None: 

164 """ 

165 Compute position at time t. 

166 

167 Characteristics propagate linearly: V(t) = v_start + velocity*(t - t_start). 

168 

169 Parameters 

170 ---------- 

171 t : float 

172 Time [days]. 

173 

174 Returns 

175 ------- 

176 position : float or None 

177 Position [m³], or None if t < t_start or inactive. 

178 """ 

179 if t < self.t_start or not self.is_active: 

180 return None 

181 return self.v_start + self.velocity() * (t - self.t_start) 

182 

183 def concentration_left(self) -> float: 

184 """Get upstream concentration (same as concentration for characteristics).""" 

185 return self.concentration 

186 

187 def concentration_right(self) -> float: 

188 """Get downstream concentration (same as concentration for characteristics).""" 

189 return self.concentration 

190 

191 def concentration_at_point(self, v: float, t: float) -> float | None: 

192 """ 

193 Get concentration if point is on this characteristic. 

194 

195 For practical purposes, we check if the characteristic has reached 

196 position v by time t. 

197 

198 Parameters 

199 ---------- 

200 v : float 

201 Position [m³]. 

202 t : float 

203 Time [days]. 

204 

205 Returns 

206 ------- 

207 concentration : float or None 

208 Concentration if point is on characteristic, None otherwise. 

209 

210 Notes 

211 ----- 

212 In practice, this method is used by higher-level algorithms to 

213 determine which wave controls a given point. The exact point-on-line 

214 check is handled by the solver. 

215 """ 

216 v_at_t = self.position_at_time(t) 

217 if v_at_t is None: 

218 return None 

219 

220 # If characteristic has reached or passed this position 

221 if v_at_t >= v: 

222 return self.concentration 

223 

224 return None 

225 

226 

227@dataclass 

228class ShockWave(Wave): 

229 """ 

230 Shock wave (discontinuity) with jump in concentration. 

231 

232 Shocks form when faster water overtakes slower water, creating a sharp 

233 front. The shock velocity is determined by the Rankine-Hugoniot condition 

234 to ensure mass conservation across the discontinuity. 

235 

236 Parameters 

237 ---------- 

238 t_start : float 

239 Formation time [days]. 

240 v_start : float 

241 Formation position [m³]. 

242 flow : float 

243 Flow rate [m³/day]. 

244 c_left : float 

245 Concentration upstream (behind) shock [mass/volume]. 

246 c_right : float 

247 Concentration downstream (ahead of) shock [mass/volume]. 

248 sorption : FreundlichSorption or ConstantRetardation 

249 Sorption model. 

250 is_active : bool, optional 

251 Activity status. Default True. 

252 velocity : float, optional 

253 Shock velocity computed from Rankine-Hugoniot. Computed automatically 

254 if not provided. 

255 

256 Examples 

257 -------- 

258 >>> sorption = FreundlichSorption( 

259 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3 

260 ... ) 

261 >>> shock = ShockWave( 

262 ... t_start=0.0, 

263 ... v_start=0.0, 

264 ... flow=100.0, 

265 ... c_left=10.0, 

266 ... c_right=2.0, 

267 ... sorption=sorption, 

268 ... ) 

269 >>> shock.velocity > 0 

270 True 

271 >>> shock.satisfies_entropy() 

272 True 

273 """ 

274 

275 c_left: float 

276 """Concentration upstream (behind) shock [mass/volume].""" 

277 c_right: float 

278 """Concentration downstream (ahead of) shock [mass/volume].""" 

279 sorption: FreundlichSorption | ConstantRetardation 

280 """Sorption model.""" 

281 velocity: float | None = None 

282 """Shock velocity computed from Rankine-Hugoniot [m³/day].""" 

283 

284 def __post_init__(self) -> None: 

285 """Compute shock velocity from Rankine-Hugoniot condition.""" 

286 if self.velocity is None: 

287 self.velocity = self.sorption.shock_velocity(self.c_left, self.c_right, self.flow) 

288 

289 def position_at_time(self, t: float) -> float | None: 

290 """ 

291 Compute shock position at time t. 

292 

293 Shock propagates at constant velocity: V(t) = v_start + velocity*(t - t_start). 

294 

295 Parameters 

296 ---------- 

297 t : float 

298 Time [days]. 

299 

300 Returns 

301 ------- 

302 position : float or None 

303 Position [m³], or None if t < t_start or inactive. 

304 """ 

305 if t < self.t_start or not self.is_active: 

306 return None 

307 if self.velocity is None: 

308 msg = "Shock velocity should be set in __post_init__" 

309 raise RuntimeError(msg) 

310 return self.v_start + self.velocity * (t - self.t_start) 

311 

312 def concentration_left(self) -> float: 

313 """Get upstream concentration.""" 

314 return self.c_left 

315 

316 def concentration_right(self) -> float: 

317 """Get downstream concentration.""" 

318 return self.c_right 

319 

320 def concentration_at_point(self, v: float, t: float) -> float | None: 

321 """ 

322 Get concentration at point based on which side of shock. 

323 

324 Parameters 

325 ---------- 

326 v : float 

327 Position [m³]. 

328 t : float 

329 Time [days]. 

330 

331 Returns 

332 ------- 

333 concentration : float or None 

334 c_left if upstream of shock, c_right if downstream, None if shock hasn't formed yet. 

335 

336 Notes 

337 ----- 

338 At the exact shock position, returns the average of left and right values. 

339 This is a convention for the singular point; in practice, the shock is 

340 infinitesimally thin. 

341 """ 

342 v_shock = self.position_at_time(t) 

343 if v_shock is None: 

344 return None 

345 

346 # Tolerance for exact shock position 

347 tol = 1e-15 

348 

349 if v < v_shock - tol: 

350 # Upstream of shock 

351 return self.c_left 

352 if v > v_shock + tol: 

353 # Downstream of shock 

354 return self.c_right 

355 # Exactly at shock (rarely happens in practice) 

356 return 0.5 * (self.c_left + self.c_right) 

357 

358 def satisfies_entropy(self) -> bool: 

359 """ 

360 Check if shock satisfies Lax entropy condition. 

361 

362 The entropy condition ensures characteristics flow INTO the shock 

363 from both sides, which is required for physical admissibility. 

364 

365 Returns 

366 ------- 

367 satisfies : bool 

368 True if shock satisfies entropy condition. 

369 

370 Notes 

371 ----- 

372 The condition is: lambda(c_left) > shock_velocity > lambda(c_right), 

373 where lambda(C) = flow / R(C) is the characteristic velocity. 

374 

375 Shocks that violate this condition are unphysical and should be 

376 replaced by rarefaction waves. 

377 """ 

378 if self.velocity is None: 

379 msg = "Shock velocity should be set in __post_init__" 

380 raise RuntimeError(msg) 

381 return self.sorption.check_entropy_condition(self.c_left, self.c_right, self.velocity, self.flow) 

382 

383 

384@dataclass 

385class RarefactionWave(Wave): 

386 """ 

387 Rarefaction (expansion fan) with smooth concentration gradient. 

388 

389 Rarefactions form when slower water follows faster water, creating an 

390 expanding region where concentration varies smoothly. The solution is 

391 self-similar: C = C(V/t). 

392 

393 Parameters 

394 ---------- 

395 t_start : float 

396 Formation time [days]. 

397 v_start : float 

398 Formation position [m³]. 

399 flow : float 

400 Flow rate [m³/day]. 

401 c_head : float 

402 Concentration at leading edge (faster) [mass/volume]. 

403 c_tail : float 

404 Concentration at trailing edge (slower) [mass/volume]. 

405 sorption : FreundlichSorption or ConstantRetardation 

406 Sorption model (must be concentration-dependent). 

407 is_active : bool, optional 

408 Activity status. Default True. 

409 

410 Raises 

411 ------ 

412 ValueError 

413 If head velocity <= tail velocity (would be compression, not rarefaction). 

414 

415 Examples 

416 -------- 

417 >>> sorption = FreundlichSorption( 

418 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3 

419 ... ) 

420 >>> raref = RarefactionWave( 

421 ... t_start=0.0, 

422 ... v_start=0.0, 

423 ... flow=100.0, 

424 ... c_head=10.0, 

425 ... c_tail=2.0, 

426 ... sorption=sorption, 

427 ... ) 

428 >>> raref.head_velocity() > raref.tail_velocity() 

429 True 

430 >>> raref.contains_point(v=150.0, t=20.0) 

431 True 

432 """ 

433 

434 c_head: float 

435 """Concentration at leading edge (faster) [mass/volume].""" 

436 c_tail: float 

437 """Concentration at trailing edge (slower) [mass/volume].""" 

438 sorption: FreundlichSorption | ConstantRetardation 

439 """Sorption model (must be concentration-dependent).""" 

440 

441 def __post_init__(self): 

442 """Verify this is actually a rarefaction (head faster than tail).""" 

443 v_head = self.head_velocity() 

444 v_tail = self.tail_velocity() 

445 

446 if v_head <= v_tail: 

447 msg = ( 

448 f"Not a rarefaction: head_velocity={v_head:.3f} <= tail_velocity={v_tail:.3f}. " 

449 f"This would be a compression (shock) instead." 

450 ) 

451 raise ValueError(msg) 

452 

453 def head_velocity(self) -> float: 

454 """ 

455 Compute velocity of rarefaction head (leading edge). 

456 

457 Returns 

458 ------- 

459 velocity : float 

460 Head velocity [m³/day]. 

461 """ 

462 return float(self.flow / self.sorption.retardation(self.c_head)) 

463 

464 def tail_velocity(self) -> float: 

465 """ 

466 Compute velocity of rarefaction tail (trailing edge). 

467 

468 Returns 

469 ------- 

470 velocity : float 

471 Tail velocity [m³/day]. 

472 """ 

473 return float(self.flow / self.sorption.retardation(self.c_tail)) 

474 

475 def head_position_at_time(self, t: float) -> float | None: 

476 """ 

477 Compute position of rarefaction head at time t. 

478 

479 Parameters 

480 ---------- 

481 t : float 

482 Time [days]. 

483 

484 Returns 

485 ------- 

486 position : float or None 

487 Head position [m³], or None if t < t_start or inactive. 

488 """ 

489 if t < self.t_start or not self.is_active: 

490 return None 

491 return self.v_start + self.head_velocity() * (t - self.t_start) 

492 

493 def tail_position_at_time(self, t: float) -> float | None: 

494 """ 

495 Compute position of rarefaction tail at time t. 

496 

497 Parameters 

498 ---------- 

499 t : float 

500 Time [days]. 

501 

502 Returns 

503 ------- 

504 position : float or None 

505 Tail position [m³], or None if t < t_start or inactive. 

506 """ 

507 if t < self.t_start or not self.is_active: 

508 return None 

509 return self.v_start + self.tail_velocity() * (t - self.t_start) 

510 

511 def position_at_time(self, t: float) -> float | None: 

512 """ 

513 Return head position (leading edge of rarefaction). 

514 

515 This implements the abstract Wave method. 

516 

517 Parameters 

518 ---------- 

519 t : float 

520 Time [days]. 

521 

522 Returns 

523 ------- 

524 position : float or None 

525 Head position [m³]. 

526 """ 

527 return self.head_position_at_time(t) 

528 

529 def contains_point(self, v: float, t: float) -> bool: 

530 """ 

531 Check if point (v, t) is inside the rarefaction fan. 

532 

533 Parameters 

534 ---------- 

535 v : float 

536 Position [m³]. 

537 t : float 

538 Time [days]. 

539 

540 Returns 

541 ------- 

542 contains : bool 

543 True if point is between tail and head. 

544 """ 

545 if t <= self.t_start or not self.is_active: 

546 return False 

547 

548 v_head = self.head_position_at_time(t) 

549 v_tail = self.tail_position_at_time(t) 

550 

551 if v_head is None or v_tail is None: 

552 return False 

553 

554 return v_tail <= v <= v_head 

555 

556 def concentration_left(self) -> float: 

557 """Get upstream concentration (tail).""" 

558 return self.c_tail 

559 

560 def concentration_right(self) -> float: 

561 """Get downstream concentration (head).""" 

562 return self.c_head 

563 

564 def concentration_at_point(self, v: float, t: float) -> float | None: 

565 """ 

566 Compute concentration at point (v, t) via self-similar solution. 

567 

568 Within the rarefaction fan, concentration varies smoothly according to: 

569 R(C) = flow * (t - t_start) / (v - v_start) 

570 

571 This is inverted to find C using the sorption model. 

572 

573 Parameters 

574 ---------- 

575 v : float 

576 Position [m³]. 

577 t : float 

578 Time [days]. 

579 

580 Returns 

581 ------- 

582 concentration : float or None 

583 Concentration if point is in rarefaction, None otherwise. 

584 

585 Notes 

586 ----- 

587 The self-similar solution automatically maintains mass balance and 

588 provides the exact analytical form of the concentration profile. 

589 

590 For ConstantRetardation, rarefactions don't form (all concentrations 

591 travel at same speed), so this returns None. 

592 

593 Examples 

594 -------- 

595 >>> sorption = FreundlichSorption( 

596 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3 

597 ... ) 

598 >>> raref = RarefactionWave(0.0, 0.0, 100.0, 10.0, 2.0, sorption) 

599 >>> c = raref.concentration_at_point(v=150.0, t=20.0) 

600 >>> c is not None 

601 True 

602 >>> 2.0 <= c <= 10.0 

603 True 

604 """ 

605 # Special case: at origin of rarefaction (before contains_point check) 

606 if abs(v - self.v_start) < EPSILON_POSITION and t >= self.t_start: 

607 return self.c_tail 

608 

609 if not self.contains_point(v, t): 

610 return None 

611 

612 # Self-similar solution: R(C) = flow*(t - t_start)/(v - v_start) 

613 r_target = self.flow * (t - self.t_start) / (v - self.v_start) 

614 

615 if r_target <= 1.0: 

616 return None # Unphysical 

617 

618 # Invert R to get C 

619 # For ConstantRetardation, this would raise NotImplementedError 

620 try: 

621 c = self.sorption.concentration_from_retardation(r_target) 

622 except NotImplementedError: 

623 # ConstantRetardation case - rarefactions don't form 

624 return None 

625 

626 # Verify C is in valid range [c_tail, c_head] 

627 c_min = min(self.c_tail, self.c_head) 

628 c_max = max(self.c_tail, self.c_head) 

629 

630 c_float = float(c) 

631 if c_min <= c_float <= c_max: 

632 return c_float 

633 

634 return None