Coverage for src / gwtransport / fronttracking / events.py: 93%
147 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-27 06:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-27 06:32 +0000
1"""
2Event Detection for Front Tracking.
4====================================
6This module provides exact analytical event detection for the front tracking
7algorithm. All intersections are computed using closed-form formulas with no
8numerical iteration or tolerance-based checks.
10Events include:
11- Characteristic-characteristic collisions
12- Shock-shock collisions
13- Shock-characteristic collisions
14- Rarefaction boundary interactions
15- Outlet crossings
17All calculations return exact floating-point results with machine precision.
18"""
20from dataclasses import dataclass
21from enum import Enum
23from gwtransport.fronttracking.math import characteristic_position, characteristic_velocity
24from gwtransport.fronttracking.waves import CharacteristicWave, RarefactionWave, ShockWave
26# Numerical tolerance constants
27EPSILON_VELOCITY = 1e-15 # Tolerance for checking if two velocities are equal (machine precision)
30class EventType(Enum):
31 """All possible event types in front tracking simulation."""
33 CHAR_CHAR_COLLISION = "characteristic_collision"
34 """Two characteristics intersect (will form shock)."""
35 SHOCK_SHOCK_COLLISION = "shock_collision"
36 """Two shocks collide (will merge)."""
37 SHOCK_CHAR_COLLISION = "shock_characteristic_collision"
38 """Shock catches or is caught by characteristic."""
39 RAREF_CHAR_COLLISION = "rarefaction_characteristic_collision"
40 """Rarefaction boundary intersects with characteristic."""
41 SHOCK_RAREF_COLLISION = "shock_rarefaction_collision"
42 """Shock intersects with rarefaction boundary."""
43 RAREF_RAREF_COLLISION = "rarefaction_rarefaction_collision"
44 """Rarefaction boundary intersects with another rarefaction boundary."""
45 OUTLET_CROSSING = "outlet_crossing"
46 """Wave crosses outlet boundary."""
47 INLET_CHANGE = "inlet_concentration_change"
48 """Inlet concentration changes (creates new wave)."""
49 FLOW_CHANGE = "flow_change"
50 """Flow rate changes (all waves get new velocities)."""
53@dataclass
54class Event:
55 """
56 Represents a single event in the simulation.
58 Events are ordered by time for processing in chronological order.
60 Parameters
61 ----------
62 time : float
63 Time when event occurs [days]
64 event_type : EventType
65 Type of event
66 waves_involved : list
67 List of wave objects involved in this event
68 location : float
69 Volumetric position where event occurs [m³]
70 flow_new : float, optional
71 New flow rate for FLOW_CHANGE events [m³/day]
72 boundary_type : str, optional
73 Which rarefaction boundary collided: ``'head'`` or ``'tail'``.
74 Set for rarefaction collision events.
76 Examples
77 --------
78 ::
80 event = Event(
81 time=15.5,
82 event_type=EventType.SHOCK_CHAR_COLLISION,
83 waves_involved=[shock1, char1],
84 location=250.0,
85 )
86 print(f"Event at t={event.time}: {event.event_type.value}")
87 """
89 time: float
90 event_type: EventType
91 waves_involved: list # List[Wave] - can't type hint due to circular import
92 location: float
93 flow_new: float | None = None
94 boundary_type: str | None = None
96 def __lt__(self, other):
97 """Events ordered by time (for priority queue).
99 Returns
100 -------
101 bool
102 True if this event occurs before ``other``.
103 """
104 return self.time < other.time
106 def __repr__(self):
107 """Return string representation of Event.
109 Returns
110 -------
111 str
112 Human-readable summary of the event.
113 """
114 return (
115 f"Event(t={self.time:.3f}, type={self.event_type.value}, "
116 f"location={self.location:.3f}, n_waves={len(self.waves_involved)})"
117 )
120def find_characteristic_intersection(char1, char2, t_current: float) -> tuple[float, float] | None:
121 """
122 Find exact analytical intersection of two characteristics.
124 Solves the linear system:
125 V1 = v1_start + vel1*(t - t1_start)
126 V2 = v2_start + vel2*(t - t2_start)
127 V1 = V2
129 This reduces to:
130 t = (v2_start - v1_start - vel2*t2_start + vel1*t1_start) / (vel1 - vel2)
132 Parameters
133 ----------
134 char1 : CharacteristicWave
135 First characteristic
136 char2 : CharacteristicWave
137 Second characteristic
138 t_current : float
139 Current simulation time [days]
141 Returns
142 -------
143 tuple of float or None
144 (t_intersect, v_intersect) if intersection exists in future, None otherwise
146 Notes
147 -----
148 Returns None if:
149 - Characteristics are parallel (velocities equal within machine precision)
150 - Intersection would occur in the past (t <= t_current)
151 - Either characteristic is not yet active at intersection time
153 The algorithm uses exact floating-point arithmetic with no tolerance checks
154 except for detecting parallel lines (``abs(vel1 - vel2) < 1e-15``).
156 Examples
157 --------
158 ::
160 result = find_characteristic_intersection(char1, char2, t_current=10.0)
161 if result:
162 t_int, v_int = result
163 print(f"Intersection at t={t_int:.6f}, V={v_int:.6f}")
164 """
165 # Import here to avoid circular dependency
167 # Get velocities
168 vel1 = characteristic_velocity(char1.concentration, char1.flow, char1.sorption)
169 vel2 = characteristic_velocity(char2.concentration, char2.flow, char2.sorption)
171 # Check if parallel (within machine precision)
172 if abs(vel1 - vel2) < EPSILON_VELOCITY:
173 return None
175 # Both characteristics must be active at some common time
176 t_both_active = max(char1.t_start, char2.t_start, t_current)
178 # Compute positions when both are active
180 v1 = characteristic_position(
181 char1.concentration, char1.flow, char1.sorption, char1.t_start, char1.v_start, t_both_active
182 )
183 v2 = characteristic_position(
184 char2.concentration, char2.flow, char2.sorption, char2.t_start, char2.v_start, t_both_active
185 )
187 if v1 is None or v2 is None:
188 return None
190 # Time until intersection from t_both_active
191 # Solve: v1 + vel1*dt = v2 + vel2*dt
192 dt = (v2 - v1) / (vel1 - vel2)
194 if dt <= 0: # Intersection in past or at current time
195 return None
197 t_intersect = t_both_active + dt
198 v_intersect = v1 + vel1 * dt
200 return (t_intersect, v_intersect)
203def find_shock_shock_intersection(shock1, shock2, t_current: float) -> tuple[float, float] | None:
204 """
205 Find exact analytical intersection of two shocks.
207 Similar to characteristic intersection but uses shock velocities from
208 Rankine-Hugoniot condition.
210 Parameters
211 ----------
212 shock1 : ShockWave
213 First shock
214 shock2 : ShockWave
215 Second shock
216 t_current : float
217 Current simulation time [days]
219 Returns
220 -------
221 tuple of float or None
222 (t_intersect, v_intersect) if intersection exists in future, None otherwise
224 Notes
225 -----
226 Shock velocities are constant (already computed from Rankine-Hugoniot),
227 so this is a simple linear intersection problem.
229 Examples
230 --------
231 ::
233 result = find_shock_shock_intersection(shock1, shock2, t_current=10.0)
234 if result:
235 t_int, v_int = result
236 print(f"Shocks collide at t={t_int:.6f}, V={v_int:.6f}")
237 """
238 vel1 = shock1.velocity
239 vel2 = shock2.velocity
241 # Check if parallel
242 if abs(vel1 - vel2) < EPSILON_VELOCITY:
243 return None
245 t_both_active = max(shock1.t_start, shock2.t_start, t_current)
247 # Compute positions when both are active
248 v1_ref = shock1.v_start + shock1.velocity * (t_both_active - shock1.t_start)
249 v2_ref = shock2.v_start + shock2.velocity * (t_both_active - shock2.t_start)
251 if not shock1.is_active or not shock2.is_active:
252 return None
254 # Time until intersection from t_both_active
255 dt = (v2_ref - v1_ref) / (vel1 - vel2)
257 if dt <= 0:
258 return None
260 t_intersect = t_both_active + dt
261 v_intersect = v1_ref + vel1 * dt
263 return (t_intersect, v_intersect)
266def find_shock_characteristic_intersection(shock, char, t_current: float) -> tuple[float, float] | None:
267 """
268 Find exact analytical intersection of shock and characteristic.
270 Parameters
271 ----------
272 shock : ShockWave
273 Shock wave
274 char : CharacteristicWave
275 Characteristic wave
276 t_current : float
277 Current simulation time [days]
279 Returns
280 -------
281 tuple of float or None
282 (t_intersect, v_intersect) if intersection exists in future, None otherwise
284 Examples
285 --------
286 ::
288 result = find_shock_characteristic_intersection(shock, char, t_current=10.0)
289 if result:
290 t_int, v_int = result
291 print(f"Shock catches characteristic at t={t_int:.6f}, V={v_int:.6f}")
292 """
293 vel_shock = shock.velocity
294 vel_char = characteristic_velocity(char.concentration, char.flow, char.sorption)
296 # Check if parallel
297 if abs(vel_shock - vel_char) < EPSILON_VELOCITY:
298 return None
300 t_both_active = max(shock.t_start, char.t_start, t_current)
302 # Positions when both are active
303 v_shock = shock.v_start + shock.velocity * (t_both_active - shock.t_start)
305 v_char = characteristic_position(
306 char.concentration, char.flow, char.sorption, char.t_start, char.v_start, t_both_active
307 )
309 if v_char is None or not shock.is_active or not char.is_active:
310 return None
312 # Time until intersection
313 dt = (v_char - v_shock) / (vel_shock - vel_char)
315 if dt <= 0:
316 return None
318 t_intersect = t_both_active + dt
319 v_intersect = v_shock + vel_shock * dt
321 return (t_intersect, v_intersect)
324def find_rarefaction_boundary_intersections(raref, other_wave, t_current: float) -> list[tuple[float, float, str]]:
325 """
326 Find intersections of rarefaction head/tail with another wave.
328 A rarefaction has two boundaries (head and tail), each traveling at
329 characteristic velocities. This function finds intersections of both
330 boundaries with the given wave.
332 Parameters
333 ----------
334 raref : RarefactionWave
335 Rarefaction wave
336 other_wave : Wave
337 Any other wave (Characteristic, Shock, or Rarefaction)
338 t_current : float
339 Current simulation time [days]
341 Returns
342 -------
343 list of tuple
344 List of (t_intersect, v_intersect, boundary_type) where boundary_type
345 is either 'head' or 'tail'
347 Notes
348 -----
349 The head travels at velocity corresponding to c_head, and the tail at
350 velocity corresponding to c_tail. Both are treated as characteristics
351 for intersection calculation.
353 Examples
354 --------
355 ::
357 intersections = find_rarefaction_boundary_intersections(
358 raref, char, t_current=10.0
359 )
360 for t, v, boundary in intersections:
361 print(f"{boundary} intersects at t={t:.3f}, V={v:.3f}")
362 """
363 # Import wave classes to avoid circular dependency
365 intersections = []
367 # Create temporary characteristics for head and tail boundaries
368 head_char = CharacteristicWave(
369 t_start=raref.t_start,
370 v_start=raref.v_start,
371 flow=raref.flow,
372 concentration=raref.c_head,
373 sorption=raref.sorption,
374 is_active=raref.is_active,
375 )
377 tail_char = CharacteristicWave(
378 t_start=raref.t_start,
379 v_start=raref.v_start,
380 flow=raref.flow,
381 concentration=raref.c_tail,
382 sorption=raref.sorption,
383 is_active=raref.is_active,
384 )
386 # Check intersections based on other wave type
387 if isinstance(other_wave, CharacteristicWave):
388 # Head intersection
389 result = find_characteristic_intersection(head_char, other_wave, t_current)
390 if result:
391 intersections.append((result[0], result[1], "head"))
393 # Tail intersection
394 result = find_characteristic_intersection(tail_char, other_wave, t_current)
395 if result:
396 intersections.append((result[0], result[1], "tail"))
398 elif isinstance(other_wave, ShockWave):
399 # Head intersection
400 result = find_shock_characteristic_intersection(other_wave, head_char, t_current)
401 if result:
402 intersections.append((result[0], result[1], "head"))
404 # Tail intersection
405 result = find_shock_characteristic_intersection(other_wave, tail_char, t_current)
406 if result:
407 intersections.append((result[0], result[1], "tail"))
409 elif isinstance(other_wave, RarefactionWave):
410 # Rarefaction-rarefaction intersections: treat all boundaries as
411 # characteristics and reuse the analytical intersection helpers.
413 other_head_char = CharacteristicWave(
414 t_start=other_wave.t_start,
415 v_start=other_wave.v_start,
416 flow=other_wave.flow,
417 concentration=other_wave.c_head,
418 sorption=other_wave.sorption,
419 is_active=other_wave.is_active,
420 )
422 other_tail_char = CharacteristicWave(
423 t_start=other_wave.t_start,
424 v_start=other_wave.v_start,
425 flow=other_wave.flow,
426 concentration=other_wave.c_tail,
427 sorption=other_wave.sorption,
428 is_active=other_wave.is_active,
429 )
431 # head(head) and head(tail)
432 result = find_characteristic_intersection(head_char, other_head_char, t_current)
433 if result:
434 intersections.append((result[0], result[1], "head"))
436 result = find_characteristic_intersection(head_char, other_tail_char, t_current)
437 if result:
438 intersections.append((result[0], result[1], "head"))
440 # tail(head) and tail(tail)
441 result = find_characteristic_intersection(tail_char, other_head_char, t_current)
442 if result:
443 intersections.append((result[0], result[1], "tail"))
445 result = find_characteristic_intersection(tail_char, other_tail_char, t_current)
446 if result:
447 intersections.append((result[0], result[1], "tail"))
449 return intersections
452def find_outlet_crossing(wave, v_outlet: float, t_current: float) -> float | None:
453 """
454 Find exact analytical time when wave crosses outlet.
456 For characteristics and shocks, solves:
457 v_start + velocity*(t - t_start) = v_outlet
459 For rarefactions, finds when head (leading edge) crosses.
461 Parameters
462 ----------
463 wave : Wave
464 Any wave type (Characteristic, Shock, or Rarefaction)
465 v_outlet : float
466 Outlet position [m³]
467 t_current : float
468 Current simulation time [days]
470 Returns
471 -------
472 float or None
473 Time when wave crosses outlet, or None if:
474 - Wave already past outlet
475 - Wave moving away from outlet
476 - Wave not yet active
478 Notes
479 -----
480 This function assumes waves always move in positive V direction (toward outlet).
481 Negative velocities would indicate unphysical backward flow.
483 Examples
484 --------
485 ::
487 t_cross = find_outlet_crossing(shock, v_outlet=500.0, t_current=10.0)
488 if t_cross:
489 print(f"Shock exits at t={t_cross:.3f} days")
490 """
491 if not wave.is_active:
492 return None
494 if isinstance(wave, CharacteristicWave):
495 # Get current position (use wave start time if not yet active)
497 t_eval = max(t_current, wave.t_start)
498 v_current = characteristic_position(
499 wave.concentration, wave.flow, wave.sorption, wave.t_start, wave.v_start, t_eval
500 )
502 if v_current is None or v_current >= v_outlet:
503 return None # Already past outlet
505 # Get velocity
506 vel = characteristic_velocity(wave.concentration, wave.flow, wave.sorption)
508 if vel <= 0:
509 return None # Moving backward (unphysical)
511 # Solve: v_current + vel*(t - t_eval) = v_outlet
512 dt = (v_outlet - v_current) / vel
513 return t_eval + dt
515 if isinstance(wave, ShockWave):
516 if wave.velocity is None:
517 return None
518 # Current position (use wave start time if not yet active)
519 t_eval = max(t_current, wave.t_start)
520 v_current = wave.v_start + wave.velocity * (t_eval - wave.t_start)
522 if v_current >= v_outlet:
523 return None # Already past outlet
525 if wave.velocity <= 0:
526 return None # Moving backward (unphysical)
528 # Solve: v_current + velocity*(t - t_eval) = v_outlet
529 dt = (v_outlet - v_current) / wave.velocity
530 return t_eval + dt
532 if isinstance(wave, RarefactionWave):
533 # Head crosses first (leading edge)
534 t_eval = max(t_current, wave.t_start)
535 vel_head = characteristic_velocity(wave.c_head, wave.flow, wave.sorption)
537 v_head = characteristic_position(wave.c_head, wave.flow, wave.sorption, wave.t_start, wave.v_start, t_eval)
539 if v_head is None or v_head >= v_outlet:
540 return None
542 if vel_head <= 0:
543 return None
545 dt = (v_outlet - v_head) / vel_head
546 return t_eval + dt
548 return None