Coverage for src / gwtransport / fronttracking / events.py: 93%

147 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-27 06:32 +0000

1""" 

2Event Detection for Front Tracking. 

3 

4==================================== 

5 

6This module provides exact analytical event detection for the front tracking 

7algorithm. All intersections are computed using closed-form formulas with no 

8numerical iteration or tolerance-based checks. 

9 

10Events include: 

11- Characteristic-characteristic collisions 

12- Shock-shock collisions 

13- Shock-characteristic collisions 

14- Rarefaction boundary interactions 

15- Outlet crossings 

16 

17All calculations return exact floating-point results with machine precision. 

18""" 

19 

20from dataclasses import dataclass 

21from enum import Enum 

22 

23from gwtransport.fronttracking.math import characteristic_position, characteristic_velocity 

24from gwtransport.fronttracking.waves import CharacteristicWave, RarefactionWave, ShockWave 

25 

26# Numerical tolerance constants 

27EPSILON_VELOCITY = 1e-15 # Tolerance for checking if two velocities are equal (machine precision) 

28 

29 

30class EventType(Enum): 

31 """All possible event types in front tracking simulation.""" 

32 

33 CHAR_CHAR_COLLISION = "characteristic_collision" 

34 """Two characteristics intersect (will form shock).""" 

35 SHOCK_SHOCK_COLLISION = "shock_collision" 

36 """Two shocks collide (will merge).""" 

37 SHOCK_CHAR_COLLISION = "shock_characteristic_collision" 

38 """Shock catches or is caught by characteristic.""" 

39 RAREF_CHAR_COLLISION = "rarefaction_characteristic_collision" 

40 """Rarefaction boundary intersects with characteristic.""" 

41 SHOCK_RAREF_COLLISION = "shock_rarefaction_collision" 

42 """Shock intersects with rarefaction boundary.""" 

43 RAREF_RAREF_COLLISION = "rarefaction_rarefaction_collision" 

44 """Rarefaction boundary intersects with another rarefaction boundary.""" 

45 OUTLET_CROSSING = "outlet_crossing" 

46 """Wave crosses outlet boundary.""" 

47 INLET_CHANGE = "inlet_concentration_change" 

48 """Inlet concentration changes (creates new wave).""" 

49 FLOW_CHANGE = "flow_change" 

50 """Flow rate changes (all waves get new velocities).""" 

51 

52 

53@dataclass 

54class Event: 

55 """ 

56 Represents a single event in the simulation. 

57 

58 Events are ordered by time for processing in chronological order. 

59 

60 Parameters 

61 ---------- 

62 time : float 

63 Time when event occurs [days] 

64 event_type : EventType 

65 Type of event 

66 waves_involved : list 

67 List of wave objects involved in this event 

68 location : float 

69 Volumetric position where event occurs [m³] 

70 flow_new : float, optional 

71 New flow rate for FLOW_CHANGE events [m³/day] 

72 boundary_type : str, optional 

73 Which rarefaction boundary collided: ``'head'`` or ``'tail'``. 

74 Set for rarefaction collision events. 

75 

76 Examples 

77 -------- 

78 :: 

79 

80 event = Event( 

81 time=15.5, 

82 event_type=EventType.SHOCK_CHAR_COLLISION, 

83 waves_involved=[shock1, char1], 

84 location=250.0, 

85 ) 

86 print(f"Event at t={event.time}: {event.event_type.value}") 

87 """ 

88 

89 time: float 

90 event_type: EventType 

91 waves_involved: list # List[Wave] - can't type hint due to circular import 

92 location: float 

93 flow_new: float | None = None 

94 boundary_type: str | None = None 

95 

96 def __lt__(self, other): 

97 """Events ordered by time (for priority queue). 

98 

99 Returns 

100 ------- 

101 bool 

102 True if this event occurs before ``other``. 

103 """ 

104 return self.time < other.time 

105 

106 def __repr__(self): 

107 """Return string representation of Event. 

108 

109 Returns 

110 ------- 

111 str 

112 Human-readable summary of the event. 

113 """ 

114 return ( 

115 f"Event(t={self.time:.3f}, type={self.event_type.value}, " 

116 f"location={self.location:.3f}, n_waves={len(self.waves_involved)})" 

117 ) 

118 

119 

120def find_characteristic_intersection(char1, char2, t_current: float) -> tuple[float, float] | None: 

121 """ 

122 Find exact analytical intersection of two characteristics. 

123 

124 Solves the linear system: 

125 V1 = v1_start + vel1*(t - t1_start) 

126 V2 = v2_start + vel2*(t - t2_start) 

127 V1 = V2 

128 

129 This reduces to: 

130 t = (v2_start - v1_start - vel2*t2_start + vel1*t1_start) / (vel1 - vel2) 

131 

132 Parameters 

133 ---------- 

134 char1 : CharacteristicWave 

135 First characteristic 

136 char2 : CharacteristicWave 

137 Second characteristic 

138 t_current : float 

139 Current simulation time [days] 

140 

141 Returns 

142 ------- 

143 tuple of float or None 

144 (t_intersect, v_intersect) if intersection exists in future, None otherwise 

145 

146 Notes 

147 ----- 

148 Returns None if: 

149 - Characteristics are parallel (velocities equal within machine precision) 

150 - Intersection would occur in the past (t <= t_current) 

151 - Either characteristic is not yet active at intersection time 

152 

153 The algorithm uses exact floating-point arithmetic with no tolerance checks 

154 except for detecting parallel lines (``abs(vel1 - vel2) < 1e-15``). 

155 

156 Examples 

157 -------- 

158 :: 

159 

160 result = find_characteristic_intersection(char1, char2, t_current=10.0) 

161 if result: 

162 t_int, v_int = result 

163 print(f"Intersection at t={t_int:.6f}, V={v_int:.6f}") 

164 """ 

165 # Import here to avoid circular dependency 

166 

167 # Get velocities 

168 vel1 = characteristic_velocity(char1.concentration, char1.flow, char1.sorption) 

169 vel2 = characteristic_velocity(char2.concentration, char2.flow, char2.sorption) 

170 

171 # Check if parallel (within machine precision) 

172 if abs(vel1 - vel2) < EPSILON_VELOCITY: 

173 return None 

174 

175 # Both characteristics must be active at some common time 

176 t_both_active = max(char1.t_start, char2.t_start, t_current) 

177 

178 # Compute positions when both are active 

179 

180 v1 = characteristic_position( 

181 char1.concentration, char1.flow, char1.sorption, char1.t_start, char1.v_start, t_both_active 

182 ) 

183 v2 = characteristic_position( 

184 char2.concentration, char2.flow, char2.sorption, char2.t_start, char2.v_start, t_both_active 

185 ) 

186 

187 if v1 is None or v2 is None: 

188 return None 

189 

190 # Time until intersection from t_both_active 

191 # Solve: v1 + vel1*dt = v2 + vel2*dt 

192 dt = (v2 - v1) / (vel1 - vel2) 

193 

194 if dt <= 0: # Intersection in past or at current time 

195 return None 

196 

197 t_intersect = t_both_active + dt 

198 v_intersect = v1 + vel1 * dt 

199 

200 return (t_intersect, v_intersect) 

201 

202 

203def find_shock_shock_intersection(shock1, shock2, t_current: float) -> tuple[float, float] | None: 

204 """ 

205 Find exact analytical intersection of two shocks. 

206 

207 Similar to characteristic intersection but uses shock velocities from 

208 Rankine-Hugoniot condition. 

209 

210 Parameters 

211 ---------- 

212 shock1 : ShockWave 

213 First shock 

214 shock2 : ShockWave 

215 Second shock 

216 t_current : float 

217 Current simulation time [days] 

218 

219 Returns 

220 ------- 

221 tuple of float or None 

222 (t_intersect, v_intersect) if intersection exists in future, None otherwise 

223 

224 Notes 

225 ----- 

226 Shock velocities are constant (already computed from Rankine-Hugoniot), 

227 so this is a simple linear intersection problem. 

228 

229 Examples 

230 -------- 

231 :: 

232 

233 result = find_shock_shock_intersection(shock1, shock2, t_current=10.0) 

234 if result: 

235 t_int, v_int = result 

236 print(f"Shocks collide at t={t_int:.6f}, V={v_int:.6f}") 

237 """ 

238 vel1 = shock1.velocity 

239 vel2 = shock2.velocity 

240 

241 # Check if parallel 

242 if abs(vel1 - vel2) < EPSILON_VELOCITY: 

243 return None 

244 

245 t_both_active = max(shock1.t_start, shock2.t_start, t_current) 

246 

247 # Compute positions when both are active 

248 v1_ref = shock1.v_start + shock1.velocity * (t_both_active - shock1.t_start) 

249 v2_ref = shock2.v_start + shock2.velocity * (t_both_active - shock2.t_start) 

250 

251 if not shock1.is_active or not shock2.is_active: 

252 return None 

253 

254 # Time until intersection from t_both_active 

255 dt = (v2_ref - v1_ref) / (vel1 - vel2) 

256 

257 if dt <= 0: 

258 return None 

259 

260 t_intersect = t_both_active + dt 

261 v_intersect = v1_ref + vel1 * dt 

262 

263 return (t_intersect, v_intersect) 

264 

265 

266def find_shock_characteristic_intersection(shock, char, t_current: float) -> tuple[float, float] | None: 

267 """ 

268 Find exact analytical intersection of shock and characteristic. 

269 

270 Parameters 

271 ---------- 

272 shock : ShockWave 

273 Shock wave 

274 char : CharacteristicWave 

275 Characteristic wave 

276 t_current : float 

277 Current simulation time [days] 

278 

279 Returns 

280 ------- 

281 tuple of float or None 

282 (t_intersect, v_intersect) if intersection exists in future, None otherwise 

283 

284 Examples 

285 -------- 

286 :: 

287 

288 result = find_shock_characteristic_intersection(shock, char, t_current=10.0) 

289 if result: 

290 t_int, v_int = result 

291 print(f"Shock catches characteristic at t={t_int:.6f}, V={v_int:.6f}") 

292 """ 

293 vel_shock = shock.velocity 

294 vel_char = characteristic_velocity(char.concentration, char.flow, char.sorption) 

295 

296 # Check if parallel 

297 if abs(vel_shock - vel_char) < EPSILON_VELOCITY: 

298 return None 

299 

300 t_both_active = max(shock.t_start, char.t_start, t_current) 

301 

302 # Positions when both are active 

303 v_shock = shock.v_start + shock.velocity * (t_both_active - shock.t_start) 

304 

305 v_char = characteristic_position( 

306 char.concentration, char.flow, char.sorption, char.t_start, char.v_start, t_both_active 

307 ) 

308 

309 if v_char is None or not shock.is_active or not char.is_active: 

310 return None 

311 

312 # Time until intersection 

313 dt = (v_char - v_shock) / (vel_shock - vel_char) 

314 

315 if dt <= 0: 

316 return None 

317 

318 t_intersect = t_both_active + dt 

319 v_intersect = v_shock + vel_shock * dt 

320 

321 return (t_intersect, v_intersect) 

322 

323 

324def find_rarefaction_boundary_intersections(raref, other_wave, t_current: float) -> list[tuple[float, float, str]]: 

325 """ 

326 Find intersections of rarefaction head/tail with another wave. 

327 

328 A rarefaction has two boundaries (head and tail), each traveling at 

329 characteristic velocities. This function finds intersections of both 

330 boundaries with the given wave. 

331 

332 Parameters 

333 ---------- 

334 raref : RarefactionWave 

335 Rarefaction wave 

336 other_wave : Wave 

337 Any other wave (Characteristic, Shock, or Rarefaction) 

338 t_current : float 

339 Current simulation time [days] 

340 

341 Returns 

342 ------- 

343 list of tuple 

344 List of (t_intersect, v_intersect, boundary_type) where boundary_type 

345 is either 'head' or 'tail' 

346 

347 Notes 

348 ----- 

349 The head travels at velocity corresponding to c_head, and the tail at 

350 velocity corresponding to c_tail. Both are treated as characteristics 

351 for intersection calculation. 

352 

353 Examples 

354 -------- 

355 :: 

356 

357 intersections = find_rarefaction_boundary_intersections( 

358 raref, char, t_current=10.0 

359 ) 

360 for t, v, boundary in intersections: 

361 print(f"{boundary} intersects at t={t:.3f}, V={v:.3f}") 

362 """ 

363 # Import wave classes to avoid circular dependency 

364 

365 intersections = [] 

366 

367 # Create temporary characteristics for head and tail boundaries 

368 head_char = CharacteristicWave( 

369 t_start=raref.t_start, 

370 v_start=raref.v_start, 

371 flow=raref.flow, 

372 concentration=raref.c_head, 

373 sorption=raref.sorption, 

374 is_active=raref.is_active, 

375 ) 

376 

377 tail_char = CharacteristicWave( 

378 t_start=raref.t_start, 

379 v_start=raref.v_start, 

380 flow=raref.flow, 

381 concentration=raref.c_tail, 

382 sorption=raref.sorption, 

383 is_active=raref.is_active, 

384 ) 

385 

386 # Check intersections based on other wave type 

387 if isinstance(other_wave, CharacteristicWave): 

388 # Head intersection 

389 result = find_characteristic_intersection(head_char, other_wave, t_current) 

390 if result: 

391 intersections.append((result[0], result[1], "head")) 

392 

393 # Tail intersection 

394 result = find_characteristic_intersection(tail_char, other_wave, t_current) 

395 if result: 

396 intersections.append((result[0], result[1], "tail")) 

397 

398 elif isinstance(other_wave, ShockWave): 

399 # Head intersection 

400 result = find_shock_characteristic_intersection(other_wave, head_char, t_current) 

401 if result: 

402 intersections.append((result[0], result[1], "head")) 

403 

404 # Tail intersection 

405 result = find_shock_characteristic_intersection(other_wave, tail_char, t_current) 

406 if result: 

407 intersections.append((result[0], result[1], "tail")) 

408 

409 elif isinstance(other_wave, RarefactionWave): 

410 # Rarefaction-rarefaction intersections: treat all boundaries as 

411 # characteristics and reuse the analytical intersection helpers. 

412 

413 other_head_char = CharacteristicWave( 

414 t_start=other_wave.t_start, 

415 v_start=other_wave.v_start, 

416 flow=other_wave.flow, 

417 concentration=other_wave.c_head, 

418 sorption=other_wave.sorption, 

419 is_active=other_wave.is_active, 

420 ) 

421 

422 other_tail_char = CharacteristicWave( 

423 t_start=other_wave.t_start, 

424 v_start=other_wave.v_start, 

425 flow=other_wave.flow, 

426 concentration=other_wave.c_tail, 

427 sorption=other_wave.sorption, 

428 is_active=other_wave.is_active, 

429 ) 

430 

431 # head(head) and head(tail) 

432 result = find_characteristic_intersection(head_char, other_head_char, t_current) 

433 if result: 

434 intersections.append((result[0], result[1], "head")) 

435 

436 result = find_characteristic_intersection(head_char, other_tail_char, t_current) 

437 if result: 

438 intersections.append((result[0], result[1], "head")) 

439 

440 # tail(head) and tail(tail) 

441 result = find_characteristic_intersection(tail_char, other_head_char, t_current) 

442 if result: 

443 intersections.append((result[0], result[1], "tail")) 

444 

445 result = find_characteristic_intersection(tail_char, other_tail_char, t_current) 

446 if result: 

447 intersections.append((result[0], result[1], "tail")) 

448 

449 return intersections 

450 

451 

452def find_outlet_crossing(wave, v_outlet: float, t_current: float) -> float | None: 

453 """ 

454 Find exact analytical time when wave crosses outlet. 

455 

456 For characteristics and shocks, solves: 

457 v_start + velocity*(t - t_start) = v_outlet 

458 

459 For rarefactions, finds when head (leading edge) crosses. 

460 

461 Parameters 

462 ---------- 

463 wave : Wave 

464 Any wave type (Characteristic, Shock, or Rarefaction) 

465 v_outlet : float 

466 Outlet position [m³] 

467 t_current : float 

468 Current simulation time [days] 

469 

470 Returns 

471 ------- 

472 float or None 

473 Time when wave crosses outlet, or None if: 

474 - Wave already past outlet 

475 - Wave moving away from outlet 

476 - Wave not yet active 

477 

478 Notes 

479 ----- 

480 This function assumes waves always move in positive V direction (toward outlet). 

481 Negative velocities would indicate unphysical backward flow. 

482 

483 Examples 

484 -------- 

485 :: 

486 

487 t_cross = find_outlet_crossing(shock, v_outlet=500.0, t_current=10.0) 

488 if t_cross: 

489 print(f"Shock exits at t={t_cross:.3f} days") 

490 """ 

491 if not wave.is_active: 

492 return None 

493 

494 if isinstance(wave, CharacteristicWave): 

495 # Get current position (use wave start time if not yet active) 

496 

497 t_eval = max(t_current, wave.t_start) 

498 v_current = characteristic_position( 

499 wave.concentration, wave.flow, wave.sorption, wave.t_start, wave.v_start, t_eval 

500 ) 

501 

502 if v_current is None or v_current >= v_outlet: 

503 return None # Already past outlet 

504 

505 # Get velocity 

506 vel = characteristic_velocity(wave.concentration, wave.flow, wave.sorption) 

507 

508 if vel <= 0: 

509 return None # Moving backward (unphysical) 

510 

511 # Solve: v_current + vel*(t - t_eval) = v_outlet 

512 dt = (v_outlet - v_current) / vel 

513 return t_eval + dt 

514 

515 if isinstance(wave, ShockWave): 

516 if wave.velocity is None: 

517 return None 

518 # Current position (use wave start time if not yet active) 

519 t_eval = max(t_current, wave.t_start) 

520 v_current = wave.v_start + wave.velocity * (t_eval - wave.t_start) 

521 

522 if v_current >= v_outlet: 

523 return None # Already past outlet 

524 

525 if wave.velocity <= 0: 

526 return None # Moving backward (unphysical) 

527 

528 # Solve: v_current + velocity*(t - t_eval) = v_outlet 

529 dt = (v_outlet - v_current) / wave.velocity 

530 return t_eval + dt 

531 

532 if isinstance(wave, RarefactionWave): 

533 # Head crosses first (leading edge) 

534 t_eval = max(t_current, wave.t_start) 

535 vel_head = characteristic_velocity(wave.c_head, wave.flow, wave.sorption) 

536 

537 v_head = characteristic_position(wave.c_head, wave.flow, wave.sorption, wave.t_start, wave.v_start, t_eval) 

538 

539 if v_head is None or v_head >= v_outlet: 

540 return None 

541 

542 if vel_head <= 0: 

543 return None 

544 

545 dt = (v_outlet - v_head) / vel_head 

546 return t_eval + dt 

547 

548 return None