Coverage for src / gwtransport / fronttracking / waves.py: 92%
131 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-04 18:51 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-04 18:51 +0000
1"""
2Wave Representation for Front Tracking.
4This module implements wave classes for representing characteristics, shocks,
5and rarefaction waves in the front tracking algorithm. Each wave type knows
6how to compute its position and concentration at any point in space-time.
8This file is part of gwtransport which is released under AGPL-3.0 license.
9See the ./LICENSE file or go to https://github.com/gwtransport/gwtransport/blob/main/LICENSE for full license details.
10"""
12from abc import ABC, abstractmethod
13from dataclasses import dataclass, field
15from gwtransport.fronttracking.math import ConstantRetardation, FreundlichSorption
17# Numerical tolerance constants
18EPSILON_POSITION = 1e-15 # Tolerance for checking if two positions are equal
21@dataclass
22class Wave(ABC):
23 """
24 Abstract base class for all wave types in front tracking.
26 All waves share common attributes and must implement methods for
27 computing position and concentration. Waves can be active or inactive
28 (deactivated waves are preserved for history but don't participate in
29 future interactions).
31 Parameters
32 ----------
33 t_start : float
34 Time when wave forms [days].
35 v_start : float
36 Position where wave forms [m³].
37 flow : float
38 Flow rate at formation time [m³/day].
39 is_active : bool, optional
40 Whether wave is currently active. Default True.
41 """
43 t_start: float
44 """Time when wave forms [days]."""
45 v_start: float
46 """Position where wave forms [m³]."""
47 flow: float
48 """Flow rate at formation time [m³/day]."""
49 is_active: bool = field(default=True, kw_only=True)
50 """Whether wave is currently active."""
52 @abstractmethod
53 def position_at_time(self, t: float) -> float | None:
54 """
55 Compute wave position at time t.
57 Parameters
58 ----------
59 t : float
60 Time [days].
62 Returns
63 -------
64 position : float or None
65 Position [m³], or None if t < t_start or wave is inactive.
66 """
68 @abstractmethod
69 def concentration_left(self) -> float:
70 """
71 Get concentration on left (upstream) side of wave.
73 Returns
74 -------
75 c_left : float
76 Upstream concentration [mass/volume].
77 """
79 @abstractmethod
80 def concentration_right(self) -> float:
81 """
82 Get concentration on right (downstream) side of wave.
84 Returns
85 -------
86 c_right : float
87 Downstream concentration [mass/volume].
88 """
90 @abstractmethod
91 def concentration_at_point(self, v: float, t: float) -> float | None:
92 """
93 Compute concentration at point (v, t) if wave controls it.
95 Parameters
96 ----------
97 v : float
98 Position [m³].
99 t : float
100 Time [days].
102 Returns
103 -------
104 concentration : float or None
105 Concentration [mass/volume] if wave controls this point, None otherwise.
106 """
109@dataclass
110class CharacteristicWave(Wave):
111 """
112 Characteristic line along which concentration is constant.
114 In smooth regions, concentration travels at velocity flow/R(C). Along
115 each characteristic line, the concentration value is constant. This is
116 the fundamental solution element for hyperbolic conservation laws.
118 Parameters
119 ----------
120 t_start : float
121 Formation time [days].
122 v_start : float
123 Starting position [m³].
124 flow : float
125 Flow rate [m³/day].
126 concentration : float
127 Constant concentration carried [mass/volume].
128 sorption : FreundlichSorption or ConstantRetardation
129 Sorption model determining velocity.
130 is_active : bool, optional
131 Activity status. Default True.
133 Examples
134 --------
135 >>> sorption = ConstantRetardation(retardation_factor=2.0)
136 >>> char = CharacteristicWave(
137 ... t_start=0.0, v_start=0.0, flow=100.0, concentration=5.0, sorption=sorption
138 ... )
139 >>> char.velocity()
140 50.0
141 >>> char.position_at_time(10.0)
142 500.0
143 """
145 concentration: float
146 """Constant concentration carried [mass/volume]."""
147 sorption: FreundlichSorption | ConstantRetardation
148 """Sorption model determining velocity."""
150 def velocity(self) -> float:
151 """
152 Compute characteristic velocity.
154 The velocity is v = flow / R(C), where R is the retardation factor.
156 Returns
157 -------
158 velocity : float
159 Characteristic velocity [m³/day].
160 """
161 return float(self.flow / self.sorption.retardation(self.concentration))
163 def position_at_time(self, t: float) -> float | None:
164 """
165 Compute position at time t.
167 Characteristics propagate linearly: V(t) = v_start + velocity*(t - t_start).
169 Parameters
170 ----------
171 t : float
172 Time [days].
174 Returns
175 -------
176 position : float or None
177 Position [m³], or None if t < t_start or inactive.
178 """
179 if t < self.t_start or not self.is_active:
180 return None
181 return self.v_start + self.velocity() * (t - self.t_start)
183 def concentration_left(self) -> float:
184 """Get upstream concentration (same as concentration for characteristics)."""
185 return self.concentration
187 def concentration_right(self) -> float:
188 """Get downstream concentration (same as concentration for characteristics)."""
189 return self.concentration
191 def concentration_at_point(self, v: float, t: float) -> float | None:
192 """
193 Get concentration if point is on this characteristic.
195 For practical purposes, we check if the characteristic has reached
196 position v by time t.
198 Parameters
199 ----------
200 v : float
201 Position [m³].
202 t : float
203 Time [days].
205 Returns
206 -------
207 concentration : float or None
208 Concentration if point is on characteristic, None otherwise.
210 Notes
211 -----
212 In practice, this method is used by higher-level algorithms to
213 determine which wave controls a given point. The exact point-on-line
214 check is handled by the solver.
215 """
216 v_at_t = self.position_at_time(t)
217 if v_at_t is None:
218 return None
220 # If characteristic has reached or passed this position
221 if v_at_t >= v:
222 return self.concentration
224 return None
227@dataclass
228class ShockWave(Wave):
229 """
230 Shock wave (discontinuity) with jump in concentration.
232 Shocks form when faster water overtakes slower water, creating a sharp
233 front. The shock velocity is determined by the Rankine-Hugoniot condition
234 to ensure mass conservation across the discontinuity.
236 Parameters
237 ----------
238 t_start : float
239 Formation time [days].
240 v_start : float
241 Formation position [m³].
242 flow : float
243 Flow rate [m³/day].
244 c_left : float
245 Concentration upstream (behind) shock [mass/volume].
246 c_right : float
247 Concentration downstream (ahead of) shock [mass/volume].
248 sorption : FreundlichSorption or ConstantRetardation
249 Sorption model.
250 is_active : bool, optional
251 Activity status. Default True.
252 velocity : float, optional
253 Shock velocity computed from Rankine-Hugoniot. Computed automatically
254 if not provided.
256 Examples
257 --------
258 >>> sorption = FreundlichSorption(
259 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3
260 ... )
261 >>> shock = ShockWave(
262 ... t_start=0.0,
263 ... v_start=0.0,
264 ... flow=100.0,
265 ... c_left=10.0,
266 ... c_right=2.0,
267 ... sorption=sorption,
268 ... )
269 >>> shock.velocity > 0
270 True
271 >>> shock.satisfies_entropy()
272 True
273 """
275 c_left: float
276 """Concentration upstream (behind) shock [mass/volume]."""
277 c_right: float
278 """Concentration downstream (ahead of) shock [mass/volume]."""
279 sorption: FreundlichSorption | ConstantRetardation
280 """Sorption model."""
281 velocity: float | None = None
282 """Shock velocity computed from Rankine-Hugoniot [m³/day]."""
284 def __post_init__(self) -> None:
285 """Compute shock velocity from Rankine-Hugoniot condition."""
286 if self.velocity is None:
287 self.velocity = self.sorption.shock_velocity(self.c_left, self.c_right, self.flow)
289 def position_at_time(self, t: float) -> float | None:
290 """
291 Compute shock position at time t.
293 Shock propagates at constant velocity: V(t) = v_start + velocity*(t - t_start).
295 Parameters
296 ----------
297 t : float
298 Time [days].
300 Returns
301 -------
302 position : float or None
303 Position [m³], or None if t < t_start or inactive.
304 """
305 if t < self.t_start or not self.is_active:
306 return None
307 if self.velocity is None:
308 msg = "Shock velocity should be set in __post_init__"
309 raise RuntimeError(msg)
310 return self.v_start + self.velocity * (t - self.t_start)
312 def concentration_left(self) -> float:
313 """Get upstream concentration."""
314 return self.c_left
316 def concentration_right(self) -> float:
317 """Get downstream concentration."""
318 return self.c_right
320 def concentration_at_point(self, v: float, t: float) -> float | None:
321 """
322 Get concentration at point based on which side of shock.
324 Parameters
325 ----------
326 v : float
327 Position [m³].
328 t : float
329 Time [days].
331 Returns
332 -------
333 concentration : float or None
334 c_left if upstream of shock, c_right if downstream, None if shock hasn't formed yet.
336 Notes
337 -----
338 At the exact shock position, returns the average of left and right values.
339 This is a convention for the singular point; in practice, the shock is
340 infinitesimally thin.
341 """
342 v_shock = self.position_at_time(t)
343 if v_shock is None:
344 return None
346 # Tolerance for exact shock position
347 tol = 1e-15
349 if v < v_shock - tol:
350 # Upstream of shock
351 return self.c_left
352 if v > v_shock + tol:
353 # Downstream of shock
354 return self.c_right
355 # Exactly at shock (rarely happens in practice)
356 return 0.5 * (self.c_left + self.c_right)
358 def satisfies_entropy(self) -> bool:
359 """
360 Check if shock satisfies Lax entropy condition.
362 The entropy condition ensures characteristics flow INTO the shock
363 from both sides, which is required for physical admissibility.
365 Returns
366 -------
367 satisfies : bool
368 True if shock satisfies entropy condition.
370 Notes
371 -----
372 The condition is: lambda(c_left) > shock_velocity > lambda(c_right),
373 where lambda(C) = flow / R(C) is the characteristic velocity.
375 Shocks that violate this condition are unphysical and should be
376 replaced by rarefaction waves.
377 """
378 if self.velocity is None:
379 msg = "Shock velocity should be set in __post_init__"
380 raise RuntimeError(msg)
381 return self.sorption.check_entropy_condition(self.c_left, self.c_right, self.velocity, self.flow)
384@dataclass
385class RarefactionWave(Wave):
386 """
387 Rarefaction (expansion fan) with smooth concentration gradient.
389 Rarefactions form when slower water follows faster water, creating an
390 expanding region where concentration varies smoothly. The solution is
391 self-similar: C = C(V/t).
393 Parameters
394 ----------
395 t_start : float
396 Formation time [days].
397 v_start : float
398 Formation position [m³].
399 flow : float
400 Flow rate [m³/day].
401 c_head : float
402 Concentration at leading edge (faster) [mass/volume].
403 c_tail : float
404 Concentration at trailing edge (slower) [mass/volume].
405 sorption : FreundlichSorption or ConstantRetardation
406 Sorption model (must be concentration-dependent).
407 is_active : bool, optional
408 Activity status. Default True.
410 Raises
411 ------
412 ValueError
413 If head velocity <= tail velocity (would be compression, not rarefaction).
415 Examples
416 --------
417 >>> sorption = FreundlichSorption(
418 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3
419 ... )
420 >>> raref = RarefactionWave(
421 ... t_start=0.0,
422 ... v_start=0.0,
423 ... flow=100.0,
424 ... c_head=10.0,
425 ... c_tail=2.0,
426 ... sorption=sorption,
427 ... )
428 >>> raref.head_velocity() > raref.tail_velocity()
429 True
430 >>> raref.contains_point(v=150.0, t=20.0)
431 True
432 """
434 c_head: float
435 """Concentration at leading edge (faster) [mass/volume]."""
436 c_tail: float
437 """Concentration at trailing edge (slower) [mass/volume]."""
438 sorption: FreundlichSorption | ConstantRetardation
439 """Sorption model (must be concentration-dependent)."""
441 def __post_init__(self):
442 """Verify this is actually a rarefaction (head faster than tail)."""
443 v_head = self.head_velocity()
444 v_tail = self.tail_velocity()
446 if v_head <= v_tail:
447 msg = (
448 f"Not a rarefaction: head_velocity={v_head:.3f} <= tail_velocity={v_tail:.3f}. "
449 f"This would be a compression (shock) instead."
450 )
451 raise ValueError(msg)
453 def head_velocity(self) -> float:
454 """
455 Compute velocity of rarefaction head (leading edge).
457 Returns
458 -------
459 velocity : float
460 Head velocity [m³/day].
461 """
462 return float(self.flow / self.sorption.retardation(self.c_head))
464 def tail_velocity(self) -> float:
465 """
466 Compute velocity of rarefaction tail (trailing edge).
468 Returns
469 -------
470 velocity : float
471 Tail velocity [m³/day].
472 """
473 return float(self.flow / self.sorption.retardation(self.c_tail))
475 def head_position_at_time(self, t: float) -> float | None:
476 """
477 Compute position of rarefaction head at time t.
479 Parameters
480 ----------
481 t : float
482 Time [days].
484 Returns
485 -------
486 position : float or None
487 Head position [m³], or None if t < t_start or inactive.
488 """
489 if t < self.t_start or not self.is_active:
490 return None
491 return self.v_start + self.head_velocity() * (t - self.t_start)
493 def tail_position_at_time(self, t: float) -> float | None:
494 """
495 Compute position of rarefaction tail at time t.
497 Parameters
498 ----------
499 t : float
500 Time [days].
502 Returns
503 -------
504 position : float or None
505 Tail position [m³], or None if t < t_start or inactive.
506 """
507 if t < self.t_start or not self.is_active:
508 return None
509 return self.v_start + self.tail_velocity() * (t - self.t_start)
511 def position_at_time(self, t: float) -> float | None:
512 """
513 Return head position (leading edge of rarefaction).
515 This implements the abstract Wave method.
517 Parameters
518 ----------
519 t : float
520 Time [days].
522 Returns
523 -------
524 position : float or None
525 Head position [m³].
526 """
527 return self.head_position_at_time(t)
529 def contains_point(self, v: float, t: float) -> bool:
530 """
531 Check if point (v, t) is inside the rarefaction fan.
533 Parameters
534 ----------
535 v : float
536 Position [m³].
537 t : float
538 Time [days].
540 Returns
541 -------
542 contains : bool
543 True if point is between tail and head.
544 """
545 if t <= self.t_start or not self.is_active:
546 return False
548 v_head = self.head_position_at_time(t)
549 v_tail = self.tail_position_at_time(t)
551 if v_head is None or v_tail is None:
552 return False
554 return v_tail <= v <= v_head
556 def concentration_left(self) -> float:
557 """Get upstream concentration (tail)."""
558 return self.c_tail
560 def concentration_right(self) -> float:
561 """Get downstream concentration (head)."""
562 return self.c_head
564 def concentration_at_point(self, v: float, t: float) -> float | None:
565 """
566 Compute concentration at point (v, t) via self-similar solution.
568 Within the rarefaction fan, concentration varies smoothly according to:
569 R(C) = flow * (t - t_start) / (v - v_start)
571 This is inverted to find C using the sorption model.
573 Parameters
574 ----------
575 v : float
576 Position [m³].
577 t : float
578 Time [days].
580 Returns
581 -------
582 concentration : float or None
583 Concentration if point is in rarefaction, None otherwise.
585 Notes
586 -----
587 The self-similar solution automatically maintains mass balance and
588 provides the exact analytical form of the concentration profile.
590 For ConstantRetardation, rarefactions don't form (all concentrations
591 travel at same speed), so this returns None.
593 Examples
594 --------
595 >>> sorption = FreundlichSorption(
596 ... k_f=0.01, n=2.0, bulk_density=1500.0, porosity=0.3
597 ... )
598 >>> raref = RarefactionWave(0.0, 0.0, 100.0, 10.0, 2.0, sorption)
599 >>> c = raref.concentration_at_point(v=150.0, t=20.0)
600 >>> c is not None
601 True
602 >>> 2.0 <= c <= 10.0
603 True
604 """
605 # Special case: at origin of rarefaction (before contains_point check)
606 if abs(v - self.v_start) < EPSILON_POSITION and t >= self.t_start:
607 return self.c_tail
609 if not self.contains_point(v, t):
610 return None
612 # Self-similar solution: R(C) = flow*(t - t_start)/(v - v_start)
613 r_target = self.flow * (t - self.t_start) / (v - self.v_start)
615 if r_target <= 1.0:
616 return None # Unphysical
618 # Invert R to get C
619 # For ConstantRetardation, this would raise NotImplementedError
620 try:
621 c = self.sorption.concentration_from_retardation(r_target)
622 except NotImplementedError:
623 # ConstantRetardation case - rarefactions don't form
624 return None
626 # Verify C is in valid range [c_tail, c_head]
627 c_min = min(self.c_tail, self.c_head)
628 c_max = max(self.c_tail, self.c_head)
630 c_float = float(c)
631 if c_min <= c_float <= c_max:
632 return c_float
634 return None