Coverage for src / gwtransport / fronttracking / waves.py: 92%
131 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-27 06:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-27 06:32 +0000
1"""
2Wave Representation for Front Tracking.
4This module implements wave classes for representing characteristics, shocks,
5and rarefaction waves in the front tracking algorithm. Each wave type knows
6how to compute its position and concentration at any point in space-time.
8This file is part of gwtransport which is released under AGPL-3.0 license.
9See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details.
10"""
12from abc import ABC, abstractmethod
13from dataclasses import dataclass, field
15from gwtransport.fronttracking.math import SorptionModel
17# Numerical tolerance constants
18EPSILON_POSITION = 1e-15 # Tolerance for checking if two positions are equal
21@dataclass
22class Wave(ABC):
23 """
24 Abstract base class for all wave types in front tracking.
26 All waves share common attributes and must implement methods for
27 computing position and concentration. Waves can be active or inactive
28 (deactivated waves are preserved for history but don't participate in
29 future interactions).
31 Parameters
32 ----------
33 t_start : float
34 Time when wave forms [days].
35 v_start : float
36 Position where wave forms [m³].
37 flow : float
38 Flow rate at formation time [m³/day].
39 is_active : bool, optional
40 Whether wave is currently active. Default True.
41 """
43 t_start: float
44 """Time when wave forms [days]."""
45 v_start: float
46 """Position where wave forms [m³]."""
47 flow: float
48 """Flow rate at formation time [m³/day]."""
49 is_active: bool = field(default=True, kw_only=True)
50 """Whether wave is currently active."""
52 @abstractmethod
53 def position_at_time(self, t: float) -> float | None:
54 """
55 Compute wave position at time t.
57 Parameters
58 ----------
59 t : float
60 Time [days].
62 Returns
63 -------
64 position : float or None
65 Position [m³], or None if t < t_start or wave is inactive.
66 """
68 @abstractmethod
69 def concentration_left(self) -> float:
70 """
71 Get concentration on left (upstream) side of wave.
73 Returns
74 -------
75 c_left : float
76 Upstream concentration [mass/volume].
77 """
79 @abstractmethod
80 def concentration_right(self) -> float:
81 """
82 Get concentration on right (downstream) side of wave.
84 Returns
85 -------
86 c_right : float
87 Downstream concentration [mass/volume].
88 """
90 @abstractmethod
91 def concentration_at_point(self, v: float, t: float) -> float | None:
92 """
93 Compute concentration at point (v, t) if wave controls it.
95 Parameters
96 ----------
97 v : float
98 Position [m³].
99 t : float
100 Time [days].
102 Returns
103 -------
104 concentration : float or None
105 Concentration [mass/volume] if wave controls this point, None otherwise.
106 """
109@dataclass
110class CharacteristicWave(Wave):
111 """
112 Characteristic line along which concentration is constant.
114 In smooth regions, concentration travels at velocity flow/R(C). Along
115 each characteristic line, the concentration value is constant. This is
116 the fundamental solution element for hyperbolic conservation laws.
118 Parameters
119 ----------
120 t_start : float
121 Formation time [days].
122 v_start : float
123 Starting position [m³].
124 flow : float
125 Flow rate [m³/day].
126 concentration : float
127 Constant concentration carried [mass/volume].
128 sorption : FreundlichSorption or ConstantRetardation
129 Sorption model determining velocity.
130 is_active : bool, optional
131 Activity status. Default True.
133 Examples
134 --------
135 >>> sorption = ConstantRetardation(retardation_factor=2.0)
136 >>> char = CharacteristicWave(
137 ... t_start=0.0, v_start=0.0, flow=100.0, concentration=5.0, sorption=sorption
138 ... )
139 >>> char.velocity()
140 50.0
141 >>> char.position_at_time(10.0)
142 500.0
143 """
145 concentration: float
146 """Constant concentration carried [mass/volume]."""
147 sorption: SorptionModel
148 """Sorption model determining velocity."""
150 def velocity(self) -> float:
151 """
152 Compute characteristic velocity.
154 The velocity is v = flow / R(C), where R is the retardation factor.
156 Returns
157 -------
158 velocity : float
159 Characteristic velocity [m³/day].
160 """
161 return float(self.flow / self.sorption.retardation(self.concentration))
163 def position_at_time(self, t: float) -> float | None:
164 """
165 Compute position at time t.
167 Characteristics propagate linearly: V(t) = v_start + velocity*(t - t_start).
169 Parameters
170 ----------
171 t : float
172 Time [days].
174 Returns
175 -------
176 position : float or None
177 Position [m³], or None if t < t_start or inactive.
178 """
179 if t < self.t_start or not self.is_active:
180 return None
181 return self.v_start + self.velocity() * (t - self.t_start)
183 def concentration_left(self) -> float:
184 """
185 Get upstream concentration (same as concentration for characteristics).
187 Returns
188 -------
189 c_left : float
190 Upstream concentration [mass/volume].
191 """
192 return self.concentration
194 def concentration_right(self) -> float:
195 """
196 Get downstream concentration (same as concentration for characteristics).
198 Returns
199 -------
200 c_right : float
201 Downstream concentration [mass/volume].
202 """
203 return self.concentration
205 def concentration_at_point(self, v: float, t: float) -> float | None:
206 """
207 Get concentration if point is on this characteristic.
209 For practical purposes, we check if the characteristic has reached
210 position v by time t.
212 Parameters
213 ----------
214 v : float
215 Position [m³].
216 t : float
217 Time [days].
219 Returns
220 -------
221 concentration : float or None
222 Concentration if point is on characteristic, None otherwise.
224 Notes
225 -----
226 In practice, this method is used by higher-level algorithms to
227 determine which wave controls a given point. The exact point-on-line
228 check is handled by the solver.
229 """
230 v_at_t = self.position_at_time(t)
231 if v_at_t is None:
232 return None
234 # If characteristic has reached or passed this position
235 if v_at_t >= v:
236 return self.concentration
238 return None
241@dataclass
242class ShockWave(Wave):
243 """
244 Shock wave (discontinuity) with jump in concentration.
246 Shocks form when faster water overtakes slower water, creating a sharp
247 front. The shock velocity is determined by the Rankine-Hugoniot condition
248 to ensure mass conservation across the discontinuity.
250 Parameters
251 ----------
252 t_start : float
253 Formation time [days].
254 v_start : float
255 Formation position [m³].
256 flow : float
257 Flow rate [m³/day].
258 c_left : float
259 Concentration upstream (behind) shock [mass/volume].
260 c_right : float
261 Concentration downstream (ahead of) shock [mass/volume].
262 sorption : FreundlichSorption or ConstantRetardation
263 Sorption model.
264 is_active : bool, optional
265 Activity status. Default True.
266 velocity : float, optional
267 Shock velocity computed from Rankine-Hugoniot. Computed automatically
268 if not provided.
270 Examples
271 --------
272 >>> sorption = FreundlichSorption(
273 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3
274 ... )
275 >>> shock = ShockWave(
276 ... t_start=0.0,
277 ... v_start=0.0,
278 ... flow=100.0,
279 ... c_left=10.0,
280 ... c_right=2.0,
281 ... sorption=sorption,
282 ... )
283 >>> shock.velocity > 0
284 True
285 >>> shock.satisfies_entropy()
286 True
287 """
289 c_left: float
290 """Concentration upstream (behind) shock [mass/volume]."""
291 c_right: float
292 """Concentration downstream (ahead of) shock [mass/volume]."""
293 sorption: SorptionModel
294 """Sorption model."""
295 velocity: float | None = None
296 """Shock velocity computed from Rankine-Hugoniot [m³/day]."""
298 def __post_init__(self) -> None:
299 """Compute shock velocity from Rankine-Hugoniot condition."""
300 if self.velocity is None:
301 self.velocity = self.sorption.shock_velocity(self.c_left, self.c_right, self.flow)
303 def position_at_time(self, t: float) -> float | None:
304 """
305 Compute shock position at time t.
307 Shock propagates at constant velocity: V(t) = v_start + velocity*(t - t_start).
309 Parameters
310 ----------
311 t : float
312 Time [days].
314 Returns
315 -------
316 position : float or None
317 Position [m³], or None if t < t_start or inactive.
319 Raises
320 ------
321 RuntimeError
322 If shock velocity is None (should have been set in ``__post_init__``).
323 """
324 if t < self.t_start or not self.is_active:
325 return None
326 if self.velocity is None:
327 msg = "Shock velocity should be set in __post_init__"
328 raise RuntimeError(msg)
329 return self.v_start + self.velocity * (t - self.t_start)
331 def concentration_left(self) -> float:
332 """
333 Get upstream concentration.
335 Returns
336 -------
337 c_left : float
338 Upstream concentration [mass/volume].
339 """
340 return self.c_left
342 def concentration_right(self) -> float:
343 """
344 Get downstream concentration.
346 Returns
347 -------
348 c_right : float
349 Downstream concentration [mass/volume].
350 """
351 return self.c_right
353 def concentration_at_point(self, v: float, t: float) -> float | None:
354 """
355 Get concentration at point based on which side of shock.
357 Parameters
358 ----------
359 v : float
360 Position [m³].
361 t : float
362 Time [days].
364 Returns
365 -------
366 concentration : float or None
367 c_left if upstream of shock, c_right if downstream, None if shock hasn't formed yet.
369 Notes
370 -----
371 At the exact shock position, returns the average of left and right values.
372 This is a convention for the singular point; in practice, the shock is
373 infinitesimally thin.
374 """
375 v_shock = self.position_at_time(t)
376 if v_shock is None:
377 return None
379 # Tolerance for exact shock position
380 tol = 1e-15
382 if v < v_shock - tol:
383 # Upstream of shock
384 return self.c_left
385 if v > v_shock + tol:
386 # Downstream of shock
387 return self.c_right
388 # Exactly at shock (rarely happens in practice)
389 return 0.5 * (self.c_left + self.c_right)
391 def satisfies_entropy(self) -> bool:
392 """
393 Check if shock satisfies Lax entropy condition.
395 The entropy condition ensures characteristics flow INTO the shock
396 from both sides, which is required for physical admissibility.
398 Returns
399 -------
400 satisfies : bool
401 True if shock satisfies entropy condition.
403 Raises
404 ------
405 RuntimeError
406 If shock velocity is None (should have been set in ``__post_init__``).
408 Notes
409 -----
410 The condition is: lambda(c_left) > shock_velocity > lambda(c_right),
411 where lambda(C) = flow / R(C) is the characteristic velocity.
413 Shocks that violate this condition are unphysical and should be
414 replaced by rarefaction waves.
415 """
416 if self.velocity is None:
417 msg = "Shock velocity should be set in __post_init__"
418 raise RuntimeError(msg)
419 return self.sorption.check_entropy_condition(self.c_left, self.c_right, self.velocity, self.flow)
422@dataclass
423class RarefactionWave(Wave):
424 """
425 Rarefaction (expansion fan) with smooth concentration gradient.
427 Rarefactions form when slower water follows faster water, creating an
428 expanding region where concentration varies smoothly. The solution is
429 self-similar: C = C(V/t).
431 Parameters
432 ----------
433 t_start : float
434 Formation time [days].
435 v_start : float
436 Formation position [m³].
437 flow : float
438 Flow rate [m³/day].
439 c_head : float
440 Concentration at leading edge (faster) [mass/volume].
441 c_tail : float
442 Concentration at trailing edge (slower) [mass/volume].
443 sorption : FreundlichSorption or ConstantRetardation
444 Sorption model (must be concentration-dependent).
445 is_active : bool, optional
446 Activity status. Default True.
448 Raises
449 ------
450 ValueError
451 If head velocity <= tail velocity (would be compression, not rarefaction).
453 Examples
454 --------
455 >>> sorption = FreundlichSorption(
456 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3
457 ... )
458 >>> raref = RarefactionWave(
459 ... t_start=0.0,
460 ... v_start=0.0,
461 ... flow=100.0,
462 ... c_head=10.0,
463 ... c_tail=2.0,
464 ... sorption=sorption,
465 ... )
466 >>> raref.head_velocity() > raref.tail_velocity()
467 True
468 >>> raref.contains_point(v=150.0, t=20.0)
469 True
470 """
472 c_head: float
473 """Concentration at leading edge (faster) [mass/volume]."""
474 c_tail: float
475 """Concentration at trailing edge (slower) [mass/volume]."""
476 sorption: SorptionModel
477 """Sorption model (must be concentration-dependent)."""
479 def __post_init__(self):
480 """
481 Verify this is actually a rarefaction (head faster than tail).
483 Raises
484 ------
485 ValueError
486 If head velocity is not greater than tail velocity, indicating a
487 compression wave (shock) rather than a rarefaction.
488 """
489 v_head = self.head_velocity()
490 v_tail = self.tail_velocity()
492 if v_head <= v_tail:
493 msg = (
494 f"Not a rarefaction: head_velocity={v_head:.3f} <= tail_velocity={v_tail:.3f}. "
495 f"This would be a compression (shock) instead."
496 )
497 raise ValueError(msg)
499 def head_velocity(self) -> float:
500 """
501 Compute velocity of rarefaction head (leading edge).
503 Returns
504 -------
505 velocity : float
506 Head velocity [m³/day].
507 """
508 return float(self.flow / self.sorption.retardation(self.c_head))
510 def tail_velocity(self) -> float:
511 """
512 Compute velocity of rarefaction tail (trailing edge).
514 Returns
515 -------
516 velocity : float
517 Tail velocity [m³/day].
518 """
519 return float(self.flow / self.sorption.retardation(self.c_tail))
521 def head_position_at_time(self, t: float) -> float | None:
522 """
523 Compute position of rarefaction head at time t.
525 Parameters
526 ----------
527 t : float
528 Time [days].
530 Returns
531 -------
532 position : float or None
533 Head position [m³], or None if t < t_start or inactive.
534 """
535 if t < self.t_start or not self.is_active:
536 return None
537 return self.v_start + self.head_velocity() * (t - self.t_start)
539 def tail_position_at_time(self, t: float) -> float | None:
540 """
541 Compute position of rarefaction tail at time t.
543 Parameters
544 ----------
545 t : float
546 Time [days].
548 Returns
549 -------
550 position : float or None
551 Tail position [m³], or None if t < t_start or inactive.
552 """
553 if t < self.t_start or not self.is_active:
554 return None
555 return self.v_start + self.tail_velocity() * (t - self.t_start)
557 def position_at_time(self, t: float) -> float | None:
558 """
559 Return head position (leading edge of rarefaction).
561 This implements the abstract Wave method.
563 Parameters
564 ----------
565 t : float
566 Time [days].
568 Returns
569 -------
570 position : float or None
571 Head position [m³].
572 """
573 return self.head_position_at_time(t)
575 def contains_point(self, v: float, t: float) -> bool:
576 """
577 Check if point (v, t) is inside the rarefaction fan.
579 Parameters
580 ----------
581 v : float
582 Position [m³].
583 t : float
584 Time [days].
586 Returns
587 -------
588 contains : bool
589 True if point is between tail and head.
590 """
591 if t <= self.t_start or not self.is_active:
592 return False
594 v_head = self.head_position_at_time(t)
595 v_tail = self.tail_position_at_time(t)
597 if v_head is None or v_tail is None:
598 return False
600 return v_tail <= v <= v_head
602 def concentration_left(self) -> float:
603 """
604 Get upstream concentration (tail).
606 Returns
607 -------
608 c_left : float
609 Upstream concentration at the trailing edge [mass/volume].
610 """
611 return self.c_tail
613 def concentration_right(self) -> float:
614 """
615 Get downstream concentration (head).
617 Returns
618 -------
619 c_right : float
620 Downstream concentration at the leading edge [mass/volume].
621 """
622 return self.c_head
624 def concentration_at_point(self, v: float, t: float) -> float | None:
625 """
626 Compute concentration at point (v, t) via self-similar solution.
628 Within the rarefaction fan, concentration varies smoothly according to:
629 R(C) = flow * (t - t_start) / (v - v_start)
631 This is inverted to find C using the sorption model.
633 Parameters
634 ----------
635 v : float
636 Position [m³].
637 t : float
638 Time [days].
640 Returns
641 -------
642 concentration : float or None
643 Concentration if point is in rarefaction, None otherwise.
645 Notes
646 -----
647 The self-similar solution automatically maintains mass balance and
648 provides the exact analytical form of the concentration profile.
650 For ConstantRetardation, rarefactions don't form (all concentrations
651 travel at same speed), so this returns None.
653 Examples
654 --------
655 >>> sorption = FreundlichSorption(
656 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3
657 ... )
658 >>> raref = RarefactionWave(0.0, 0.0, 100.0, 10.0, 2.0, sorption)
659 >>> c = raref.concentration_at_point(v=150.0, t=20.0)
660 >>> c is not None
661 True
662 >>> 2.0 <= c <= 10.0
663 True
664 """
665 # Special case: at origin of rarefaction (before contains_point check)
666 if abs(v - self.v_start) < EPSILON_POSITION and t >= self.t_start:
667 return self.c_tail
669 if not self.contains_point(v, t):
670 return None
672 # Self-similar solution: R(C) = flow*(t - t_start)/(v - v_start)
673 r_target = self.flow * (t - self.t_start) / (v - self.v_start)
675 if r_target <= 1.0:
676 return None # Unphysical
678 # Invert R to get C
679 # For ConstantRetardation, this would raise NotImplementedError
680 try:
681 c = self.sorption.concentration_from_retardation(r_target)
682 except NotImplementedError:
683 # ConstantRetardation case - rarefactions don't form
684 return None
686 # Verify C is in valid range [c_tail, c_head]
687 c_min = min(self.c_tail, self.c_head)
688 c_max = max(self.c_tail, self.c_head)
690 c_float = float(c)
691 if c_min <= c_float <= c_max:
692 return c_float
694 return None