Coverage for src / gwtransport / fronttracking / events.py: 93%

146 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-04 18:51 +0000

1""" 

2Event Detection for Front Tracking. 

3 

4==================================== 

5 

6This module provides exact analytical event detection for the front tracking 

7algorithm. All intersections are computed using closed-form formulas with no 

8numerical iteration or tolerance-based checks. 

9 

10Events include: 

11- Characteristic-characteristic collisions 

12- Shock-shock collisions 

13- Shock-characteristic collisions 

14- Rarefaction boundary interactions 

15- Outlet crossings 

16 

17All calculations return exact floating-point results with machine precision. 

18""" 

19 

20from dataclasses import dataclass 

21from enum import Enum 

22from typing import Optional 

23 

24from gwtransport.fronttracking.math import characteristic_position, characteristic_velocity 

25from gwtransport.fronttracking.waves import CharacteristicWave, RarefactionWave, ShockWave 

26 

27# Numerical tolerance constants 

28EPSILON_VELOCITY = 1e-15 # Tolerance for checking if two velocities are equal (machine precision) 

29 

30 

31class EventType(Enum): 

32 """All possible event types in front tracking simulation.""" 

33 

34 CHAR_CHAR_COLLISION = "characteristic_collision" 

35 """Two characteristics intersect (will form shock).""" 

36 SHOCK_SHOCK_COLLISION = "shock_collision" 

37 """Two shocks collide (will merge).""" 

38 SHOCK_CHAR_COLLISION = "shock_characteristic_collision" 

39 """Shock catches or is caught by characteristic.""" 

40 RAREF_CHAR_COLLISION = "rarefaction_characteristic_collision" 

41 """Rarefaction boundary intersects with characteristic.""" 

42 SHOCK_RAREF_COLLISION = "shock_rarefaction_collision" 

43 """Shock intersects with rarefaction boundary.""" 

44 RAREF_RAREF_COLLISION = "rarefaction_rarefaction_collision" 

45 """Rarefaction boundary intersects with another rarefaction boundary.""" 

46 OUTLET_CROSSING = "outlet_crossing" 

47 """Wave crosses outlet boundary.""" 

48 INLET_CHANGE = "inlet_concentration_change" 

49 """Inlet concentration changes (creates new wave).""" 

50 FLOW_CHANGE = "flow_change" 

51 """Flow rate changes (all waves get new velocities).""" 

52 

53 

54@dataclass 

55class Event: 

56 """ 

57 Represents a single event in the simulation. 

58 

59 Events are ordered by time for processing in chronological order. 

60 

61 Parameters 

62 ---------- 

63 time : float 

64 Time when event occurs [days] 

65 event_type : EventType 

66 Type of event 

67 waves_involved : list 

68 List of wave objects involved in this event 

69 location : float 

70 Volumetric position where event occurs [m³] 

71 flow_new : float, optional 

72 New flow rate for FLOW_CHANGE events [m³/day] 

73 boundary_type : str, optional 

74 Which rarefaction boundary collided: ``'head'`` or ``'tail'``. 

75 Set for rarefaction collision events. 

76 

77 Examples 

78 -------- 

79 :: 

80 

81 event = Event( 

82 time=15.5, 

83 event_type=EventType.SHOCK_CHAR_COLLISION, 

84 waves_involved=[shock1, char1], 

85 location=250.0, 

86 ) 

87 print(f"Event at t={event.time}: {event.event_type.value}") 

88 """ 

89 

90 time: float 

91 event_type: EventType 

92 waves_involved: list # List[Wave] - can't type hint due to circular import 

93 location: float 

94 flow_new: Optional[float] = None 

95 boundary_type: Optional[str] = None 

96 

97 def __lt__(self, other): 

98 """Events ordered by time (for priority queue).""" 

99 return self.time < other.time 

100 

101 def __repr__(self): 

102 """Return string representation of Event.""" 

103 return ( 

104 f"Event(t={self.time:.3f}, type={self.event_type.value}, " 

105 f"location={self.location:.3f}, n_waves={len(self.waves_involved)})" 

106 ) 

107 

108 

109def find_characteristic_intersection(char1, char2, t_current: float) -> Optional[tuple[float, float]]: 

110 """ 

111 Find exact analytical intersection of two characteristics. 

112 

113 Solves the linear system: 

114 V1 = v1_start + vel1*(t - t1_start) 

115 V2 = v2_start + vel2*(t - t2_start) 

116 V1 = V2 

117 

118 This reduces to: 

119 t = (v2_start - v1_start - vel2*t2_start + vel1*t1_start) / (vel1 - vel2) 

120 

121 Parameters 

122 ---------- 

123 char1 : CharacteristicWave 

124 First characteristic 

125 char2 : CharacteristicWave 

126 Second characteristic 

127 t_current : float 

128 Current simulation time [days] 

129 

130 Returns 

131 ------- 

132 tuple of float or None 

133 (t_intersect, v_intersect) if intersection exists in future, None otherwise 

134 

135 Notes 

136 ----- 

137 Returns None if: 

138 - Characteristics are parallel (velocities equal within machine precision) 

139 - Intersection would occur in the past (t <= t_current) 

140 - Either characteristic is not yet active at intersection time 

141 

142 The algorithm uses exact floating-point arithmetic with no tolerance checks 

143 except for detecting parallel lines (``abs(vel1 - vel2) < 1e-15``). 

144 

145 Examples 

146 -------- 

147 :: 

148 

149 result = find_characteristic_intersection(char1, char2, t_current=10.0) 

150 if result: 

151 t_int, v_int = result 

152 print(f"Intersection at t={t_int:.6f}, V={v_int:.6f}") 

153 """ 

154 # Import here to avoid circular dependency 

155 

156 # Get velocities 

157 vel1 = characteristic_velocity(char1.concentration, char1.flow, char1.sorption) 

158 vel2 = characteristic_velocity(char2.concentration, char2.flow, char2.sorption) 

159 

160 # Check if parallel (within machine precision) 

161 if abs(vel1 - vel2) < EPSILON_VELOCITY: 

162 return None 

163 

164 # Both characteristics must be active at some common time 

165 t_both_active = max(char1.t_start, char2.t_start, t_current) 

166 

167 # Compute positions when both are active 

168 

169 v1 = characteristic_position( 

170 char1.concentration, char1.flow, char1.sorption, char1.t_start, char1.v_start, t_both_active 

171 ) 

172 v2 = characteristic_position( 

173 char2.concentration, char2.flow, char2.sorption, char2.t_start, char2.v_start, t_both_active 

174 ) 

175 

176 if v1 is None or v2 is None: 

177 return None 

178 

179 # Time until intersection from t_both_active 

180 # Solve: v1 + vel1*dt = v2 + vel2*dt 

181 dt = (v2 - v1) / (vel1 - vel2) 

182 

183 if dt <= 0: # Intersection in past or at current time 

184 return None 

185 

186 t_intersect = t_both_active + dt 

187 v_intersect = v1 + vel1 * dt 

188 

189 return (t_intersect, v_intersect) 

190 

191 

192def find_shock_shock_intersection(shock1, shock2, t_current: float) -> Optional[tuple[float, float]]: 

193 """ 

194 Find exact analytical intersection of two shocks. 

195 

196 Similar to characteristic intersection but uses shock velocities from 

197 Rankine-Hugoniot condition. 

198 

199 Parameters 

200 ---------- 

201 shock1 : ShockWave 

202 First shock 

203 shock2 : ShockWave 

204 Second shock 

205 t_current : float 

206 Current simulation time [days] 

207 

208 Returns 

209 ------- 

210 tuple of float or None 

211 (t_intersect, v_intersect) if intersection exists in future, None otherwise 

212 

213 Notes 

214 ----- 

215 Shock velocities are constant (already computed from Rankine-Hugoniot), 

216 so this is a simple linear intersection problem. 

217 

218 Examples 

219 -------- 

220 :: 

221 

222 result = find_shock_shock_intersection(shock1, shock2, t_current=10.0) 

223 if result: 

224 t_int, v_int = result 

225 print(f"Shocks collide at t={t_int:.6f}, V={v_int:.6f}") 

226 """ 

227 vel1 = shock1.velocity 

228 vel2 = shock2.velocity 

229 

230 # Check if parallel 

231 if abs(vel1 - vel2) < EPSILON_VELOCITY: 

232 return None 

233 

234 t_both_active = max(shock1.t_start, shock2.t_start, t_current) 

235 

236 # Compute positions when both are active 

237 v1_ref = shock1.v_start + shock1.velocity * (t_both_active - shock1.t_start) 

238 v2_ref = shock2.v_start + shock2.velocity * (t_both_active - shock2.t_start) 

239 

240 if not shock1.is_active or not shock2.is_active: 

241 return None 

242 

243 # Time until intersection from t_both_active 

244 dt = (v2_ref - v1_ref) / (vel1 - vel2) 

245 

246 if dt <= 0: 

247 return None 

248 

249 t_intersect = t_both_active + dt 

250 v_intersect = v1_ref + vel1 * dt 

251 

252 return (t_intersect, v_intersect) 

253 

254 

255def find_shock_characteristic_intersection(shock, char, t_current: float) -> Optional[tuple[float, float]]: 

256 """ 

257 Find exact analytical intersection of shock and characteristic. 

258 

259 Parameters 

260 ---------- 

261 shock : ShockWave 

262 Shock wave 

263 char : CharacteristicWave 

264 Characteristic wave 

265 t_current : float 

266 Current simulation time [days] 

267 

268 Returns 

269 ------- 

270 tuple of float or None 

271 (t_intersect, v_intersect) if intersection exists in future, None otherwise 

272 

273 Examples 

274 -------- 

275 :: 

276 

277 result = find_shock_characteristic_intersection(shock, char, t_current=10.0) 

278 if result: 

279 t_int, v_int = result 

280 print(f"Shock catches characteristic at t={t_int:.6f}, V={v_int:.6f}") 

281 """ 

282 vel_shock = shock.velocity 

283 vel_char = characteristic_velocity(char.concentration, char.flow, char.sorption) 

284 

285 # Check if parallel 

286 if abs(vel_shock - vel_char) < EPSILON_VELOCITY: 

287 return None 

288 

289 t_both_active = max(shock.t_start, char.t_start, t_current) 

290 

291 # Positions when both are active 

292 v_shock = shock.v_start + shock.velocity * (t_both_active - shock.t_start) 

293 

294 v_char = characteristic_position( 

295 char.concentration, char.flow, char.sorption, char.t_start, char.v_start, t_both_active 

296 ) 

297 

298 if v_char is None or not shock.is_active or not char.is_active: 

299 return None 

300 

301 # Time until intersection 

302 dt = (v_char - v_shock) / (vel_shock - vel_char) 

303 

304 if dt <= 0: 

305 return None 

306 

307 t_intersect = t_both_active + dt 

308 v_intersect = v_shock + vel_shock * dt 

309 

310 return (t_intersect, v_intersect) 

311 

312 

313def find_rarefaction_boundary_intersections(raref, other_wave, t_current: float) -> list[tuple[float, float, str]]: 

314 """ 

315 Find intersections of rarefaction head/tail with another wave. 

316 

317 A rarefaction has two boundaries (head and tail), each traveling at 

318 characteristic velocities. This function finds intersections of both 

319 boundaries with the given wave. 

320 

321 Parameters 

322 ---------- 

323 raref : RarefactionWave 

324 Rarefaction wave 

325 other_wave : Wave 

326 Any other wave (Characteristic, Shock, or Rarefaction) 

327 t_current : float 

328 Current simulation time [days] 

329 

330 Returns 

331 ------- 

332 list of tuple 

333 List of (t_intersect, v_intersect, boundary_type) where boundary_type 

334 is either 'head' or 'tail' 

335 

336 Notes 

337 ----- 

338 The head travels at velocity corresponding to c_head, and the tail at 

339 velocity corresponding to c_tail. Both are treated as characteristics 

340 for intersection calculation. 

341 

342 Examples 

343 -------- 

344 :: 

345 

346 intersections = find_rarefaction_boundary_intersections( 

347 raref, char, t_current=10.0 

348 ) 

349 for t, v, boundary in intersections: 

350 print(f"{boundary} intersects at t={t:.3f}, V={v:.3f}") 

351 """ 

352 # Import wave classes to avoid circular dependency 

353 

354 intersections = [] 

355 

356 # Create temporary characteristics for head and tail boundaries 

357 head_char = CharacteristicWave( 

358 t_start=raref.t_start, 

359 v_start=raref.v_start, 

360 flow=raref.flow, 

361 concentration=raref.c_head, 

362 sorption=raref.sorption, 

363 is_active=raref.is_active, 

364 ) 

365 

366 tail_char = CharacteristicWave( 

367 t_start=raref.t_start, 

368 v_start=raref.v_start, 

369 flow=raref.flow, 

370 concentration=raref.c_tail, 

371 sorption=raref.sorption, 

372 is_active=raref.is_active, 

373 ) 

374 

375 # Check intersections based on other wave type 

376 if isinstance(other_wave, CharacteristicWave): 

377 # Head intersection 

378 result = find_characteristic_intersection(head_char, other_wave, t_current) 

379 if result: 

380 intersections.append((result[0], result[1], "head")) 

381 

382 # Tail intersection 

383 result = find_characteristic_intersection(tail_char, other_wave, t_current) 

384 if result: 

385 intersections.append((result[0], result[1], "tail")) 

386 

387 elif isinstance(other_wave, ShockWave): 

388 # Head intersection 

389 result = find_shock_characteristic_intersection(other_wave, head_char, t_current) 

390 if result: 

391 intersections.append((result[0], result[1], "head")) 

392 

393 # Tail intersection 

394 result = find_shock_characteristic_intersection(other_wave, tail_char, t_current) 

395 if result: 

396 intersections.append((result[0], result[1], "tail")) 

397 

398 elif isinstance(other_wave, RarefactionWave): 

399 # Rarefaction-rarefaction intersections: treat all boundaries as 

400 # characteristics and reuse the analytical intersection helpers. 

401 

402 other_head_char = CharacteristicWave( 

403 t_start=other_wave.t_start, 

404 v_start=other_wave.v_start, 

405 flow=other_wave.flow, 

406 concentration=other_wave.c_head, 

407 sorption=other_wave.sorption, 

408 is_active=other_wave.is_active, 

409 ) 

410 

411 other_tail_char = CharacteristicWave( 

412 t_start=other_wave.t_start, 

413 v_start=other_wave.v_start, 

414 flow=other_wave.flow, 

415 concentration=other_wave.c_tail, 

416 sorption=other_wave.sorption, 

417 is_active=other_wave.is_active, 

418 ) 

419 

420 # head(head) and head(tail) 

421 result = find_characteristic_intersection(head_char, other_head_char, t_current) 

422 if result: 

423 intersections.append((result[0], result[1], "head")) 

424 

425 result = find_characteristic_intersection(head_char, other_tail_char, t_current) 

426 if result: 

427 intersections.append((result[0], result[1], "head")) 

428 

429 # tail(head) and tail(tail) 

430 result = find_characteristic_intersection(tail_char, other_head_char, t_current) 

431 if result: 

432 intersections.append((result[0], result[1], "tail")) 

433 

434 result = find_characteristic_intersection(tail_char, other_tail_char, t_current) 

435 if result: 

436 intersections.append((result[0], result[1], "tail")) 

437 

438 return intersections 

439 

440 

441def find_outlet_crossing(wave, v_outlet: float, t_current: float) -> Optional[float]: 

442 """ 

443 Find exact analytical time when wave crosses outlet. 

444 

445 For characteristics and shocks, solves: 

446 v_start + velocity*(t - t_start) = v_outlet 

447 

448 For rarefactions, finds when head (leading edge) crosses. 

449 

450 Parameters 

451 ---------- 

452 wave : Wave 

453 Any wave type (Characteristic, Shock, or Rarefaction) 

454 v_outlet : float 

455 Outlet position [m³] 

456 t_current : float 

457 Current simulation time [days] 

458 

459 Returns 

460 ------- 

461 float or None 

462 Time when wave crosses outlet, or None if: 

463 - Wave already past outlet 

464 - Wave moving away from outlet 

465 - Wave not yet active 

466 

467 Notes 

468 ----- 

469 This function assumes waves always move in positive V direction (toward outlet). 

470 Negative velocities would indicate unphysical backward flow. 

471 

472 Examples 

473 -------- 

474 :: 

475 

476 t_cross = find_outlet_crossing(shock, v_outlet=500.0, t_current=10.0) 

477 if t_cross: 

478 print(f"Shock exits at t={t_cross:.3f} days") 

479 """ 

480 if not wave.is_active: 

481 return None 

482 

483 if isinstance(wave, CharacteristicWave): 

484 # Get current position (use wave start time if not yet active) 

485 

486 t_eval = max(t_current, wave.t_start) 

487 v_current = characteristic_position( 

488 wave.concentration, wave.flow, wave.sorption, wave.t_start, wave.v_start, t_eval 

489 ) 

490 

491 if v_current is None or v_current >= v_outlet: 

492 return None # Already past outlet 

493 

494 # Get velocity 

495 vel = characteristic_velocity(wave.concentration, wave.flow, wave.sorption) 

496 

497 if vel <= 0: 

498 return None # Moving backward (unphysical) 

499 

500 # Solve: v_current + vel*(t - t_eval) = v_outlet 

501 dt = (v_outlet - v_current) / vel 

502 return t_eval + dt 

503 

504 if isinstance(wave, ShockWave): 

505 # Current position (use wave start time if not yet active) 

506 t_eval = max(t_current, wave.t_start) 

507 v_current = wave.v_start + wave.velocity * (t_eval - wave.t_start) 

508 

509 if v_current >= v_outlet: 

510 return None # Already past outlet 

511 

512 if wave.velocity <= 0: 

513 return None # Moving backward (unphysical) 

514 

515 # Solve: v_current + velocity*(t - t_eval) = v_outlet 

516 dt = (v_outlet - v_current) / wave.velocity 

517 return t_eval + dt 

518 

519 if isinstance(wave, RarefactionWave): 

520 # Head crosses first (leading edge) 

521 t_eval = max(t_current, wave.t_start) 

522 vel_head = characteristic_velocity(wave.c_head, wave.flow, wave.sorption) 

523 

524 v_head = characteristic_position(wave.c_head, wave.flow, wave.sorption, wave.t_start, wave.v_start, t_eval) 

525 

526 if v_head is None or v_head >= v_outlet: 

527 return None 

528 

529 if vel_head <= 0: 

530 return None 

531 

532 dt = (v_outlet - v_head) / vel_head 

533 return t_eval + dt 

534 

535 return None