Coverage for src/gwtransport/advection.py: 91%

113 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-29 20:01 +0000

1""" 

2Advection Analysis for 1D Aquifer Systems. 

3 

4This module provides functions to analyze compound transport by advection 

5in aquifer systems. It includes tools for computing concentrations of the extracted water 

6based on the concentration of the infiltrating water, extraction data and aquifer properties. 

7 

8The model assumes requires the groundwaterflow to be reduced to a 1D system. On one side, 

9water with a certain concentration infiltrates ('cin'), the water flows through the aquifer and 

10the compound of interest flows through the aquifer with a retarded velocity. The water is 

11extracted ('cout'). 

12 

13Main functions: 

14- infiltration_to_extraction: Compute the concentration of the extracted water by shifting cin with its residence time. This corresponds to a convolution operation. 

15- gamma_infiltration_to_extraction: Similar to infiltration_to_extraction, but for a gamma distribution of aquifer pore volumes. 

16- distribution_infiltration_to_extraction: Similar to infiltration_to_extraction, but for an arbitrary distribution of aquifer pore volumes. 

17""" 

18 

19import numpy as np 

20import numpy.typing as npt 

21import pandas as pd 

22 

23from gwtransport import gamma 

24from gwtransport.residence_time import residence_time 

25from gwtransport.utils import compute_time_edges, interp_series, partial_isin 

26 

27 

28def infiltration_to_extraction( 

29 cin_series: pd.Series, 

30 flow_series: pd.Series, 

31 aquifer_pore_volume: float, 

32 retardation_factor: float = 1.0, 

33 cout_index: str = "cin", 

34) -> pd.Series: 

35 """ 

36 Compute the concentration of the extracted water by shifting cin with its residence time. 

37 

38 The compound is retarded in the aquifer with a retardation factor. The residence 

39 time is computed based on the flow rate of the water in the aquifer and the pore volume 

40 of the aquifer. 

41 

42 This function represents infiltration to extraction modeling (equivalent to convolution). 

43 

44 Parameters 

45 ---------- 

46 cin_series : pandas.Series 

47 Concentration of the compound in the infiltrating water [ng/m3]. The cin_series should be the average concentration of a time bin. The index should be a pandas.DatetimeIndex 

48 and is labeled at the end of the time bin. 

49 flow_series : pandas.Series 

50 Flow rate of water in the aquifer [m3/day]. The flow_series should be the average flow of a time bin. The index should be a pandas.DatetimeIndex 

51 and is labeled at the end of the time bin. 

52 aquifer_pore_volume : float 

53 Pore volume of the aquifer [m3]. 

54 cout_index : str, optional 

55 The index of the output series. Can be 'cin', 'flow', or 'cout'. Default is 'cin'. 

56 - 'cin': The output series will have the same index as `cin_series`. 

57 - 'flow': The output series will have the same index as `flow_series`. 

58 - 'cout': The output series will have the same index as `cin_series + residence_time`. 

59 

60 Returns 

61 ------- 

62 pandas.Series or numpy.ndarray 

63 Concentration of the compound in the extracted water [ng/m3]. 

64 Returns pandas.Series when cout_index is 'cin' or 'flow', or numpy.ndarray when cout_index is 'cout'. 

65 

66 Examples 

67 -------- 

68 Basic usage with single pore volume: 

69 

70 >>> import pandas as pd 

71 >>> import numpy as np 

72 >>> from gwtransport.advection import infiltration_to_extraction 

73 >>> 

74 >>> # Create input data 

75 >>> dates = pd.date_range(start="2020-01-01", end="2020-01-10", freq="D") 

76 >>> cin = pd.Series(np.ones(len(dates)), index=dates) 

77 >>> flow = pd.Series(np.ones(len(dates)) * 100, index=dates) # 100 m3/day 

78 >>> 

79 >>> # Single aquifer pore volume 

80 >>> aquifer_pore_volume = 500.0 # m3 

81 >>> 

82 >>> # Run infiltration to extraction model (default returns on cin index) 

83 >>> cout = infiltration_to_extraction(cin, flow, aquifer_pore_volume) 

84 >>> len(cout) == len(cin) 

85 True 

86 

87 Different output index options: 

88 

89 >>> # Output on cin index (default) 

90 >>> cout_cin = infiltration_to_extraction( 

91 ... cin, flow, aquifer_pore_volume, cout_index="cin" 

92 ... ) 

93 >>> 

94 >>> # Output on flow index 

95 >>> cout_flow = infiltration_to_extraction( 

96 ... cin, flow, aquifer_pore_volume, cout_index="flow" 

97 ... ) 

98 >>> 

99 >>> # Output on shifted time index (cin + residence_time) 

100 >>> cout_shifted = infiltration_to_extraction( 

101 ... cin, flow, aquifer_pore_volume, cout_index="cout" 

102 ... ) 

103 

104 With retardation factor: 

105 

106 >>> # Compound moves twice as slowly due to sorption 

107 >>> cout = infiltration_to_extraction( 

108 ... cin, flow, aquifer_pore_volume, retardation_factor=2.0 

109 ... ) 

110 """ 

111 # Create flow tedges from the flow series index (assuming it's at the end of bins) 

112 flow_tedges = compute_time_edges( 

113 tedges=None, tstart=None, tend=pd.DatetimeIndex(flow_series.index), number_of_bins=len(flow_series) 

114 ) 

115 rt_array = residence_time( 

116 flow=flow_series, 

117 flow_tedges=flow_tedges, 

118 index=pd.DatetimeIndex(cin_series.index), 

119 aquifer_pore_volume=aquifer_pore_volume, 

120 retardation_factor=retardation_factor, 

121 direction="infiltration_to_extraction", 

122 ) 

123 

124 rt = pd.to_timedelta(rt_array[0], unit="D", errors="coerce") 

125 index = cin_series.index + rt 

126 

127 cout = pd.Series(data=cin_series.values, index=index, name="cout") 

128 

129 if cout_index == "cin": 

130 return interp_series(cout, pd.DatetimeIndex(cin_series.index)) 

131 if cout_index == "flow": 

132 # If cout_index is 'flow', we need to resample cout to the flow index 

133 return interp_series(cout, pd.DatetimeIndex(flow_series.index)) 

134 if cout_index == "cout": 

135 # If cout_index is 'cout', we return the cout as is 

136 return cout.values 

137 

138 msg = f"Invalid cout_index: {cout_index}. Must be 'cin', 'flow', or 'cout'." 

139 raise ValueError(msg) 

140 

141 

142def extraction_to_infiltration( 

143 cout: pd.Series, flow: pd.Series, aquifer_pore_volume: float, retardation_factor: float = 1.0, resample_dates=None 

144) -> pd.Series: 

145 """ 

146 Compute the concentration of the infiltrating water by shifting cout with its residence time. 

147 

148 This function represents extraction to infiltration modeling (equivalent to deconvolution). 

149 

150 Parameters 

151 ---------- 

152 cout : pandas.Series 

153 Concentration of the compound in the extracted water [ng/m3]. 

154 flow : pandas.Series 

155 Flow rate of water in the aquifer [m3/day]. 

156 aquifer_pore_volume : float 

157 Pore volume of the aquifer [m3]. 

158 

159 Returns 

160 ------- 

161 numpy.ndarray 

162 Concentration of the compound in the infiltrating water [ng/m3]. 

163 """ 

164 msg = "Extraction to infiltration advection (deconvolution) is not implemented yet" 

165 raise NotImplementedError(msg) 

166 

167 

168def gamma_infiltration_to_extraction( 

169 *, 

170 cin, 

171 flow, 

172 tedges, 

173 cout_tedges, 

174 alpha=None, 

175 beta=None, 

176 mean=None, 

177 std=None, 

178 n_bins=100, 

179 retardation_factor=1.0, 

180): 

181 """ 

182 Compute the concentration of the extracted water by shifting cin with its residence time. 

183 

184 The compound is retarded in the aquifer with a retardation factor. The residence 

185 time is computed based on the flow rate of the water in the aquifer and the pore volume 

186 of the aquifer. The aquifer pore volume is approximated by a gamma distribution, with 

187 parameters alpha and beta. 

188 

189 This function represents infiltration to extraction modeling (equivalent to convolution). 

190 

191 Provide either alpha and beta or mean and std. 

192 

193 Parameters 

194 ---------- 

195 cin : pandas.Series 

196 Concentration of the compound in infiltrating water or temperature of infiltrating 

197 water. 

198 cin_tedges : pandas.DatetimeIndex 

199 Time edges for the concentration data. Used to compute the cumulative concentration. 

200 Has a length of one more than `cin`. 

201 cout_tedges : pandas.DatetimeIndex 

202 Time edges for the output data. Used to compute the cumulative concentration. 

203 Has a length of one more than `flow`. 

204 flow : pandas.Series 

205 Flow rate of water in the aquifer [m3/day]. 

206 flow_tedges : pandas.DatetimeIndex 

207 Time edges for the flow data. Used to compute the cumulative flow. 

208 Has a length of one more than `flow`. 

209 alpha : float, optional 

210 Shape parameter of gamma distribution of the aquifer pore volume (must be > 0) 

211 beta : float, optional 

212 Scale parameter of gamma distribution of the aquifer pore volume (must be > 0) 

213 mean : float, optional 

214 Mean of the gamma distribution. 

215 std : float, optional 

216 Standard deviation of the gamma distribution. 

217 n_bins : int 

218 Number of bins to discretize the gamma distribution. 

219 retardation_factor : float 

220 Retardation factor of the compound in the aquifer. 

221 

222 Returns 

223 ------- 

224 numpy.ndarray 

225 Concentration of the compound in the extracted water [ng/m3] or temperature. 

226 

227 Examples 

228 -------- 

229 Basic usage with alpha and beta parameters: 

230 

231 >>> import pandas as pd 

232 >>> import numpy as np 

233 >>> from gwtransport.utils import compute_time_edges 

234 >>> from gwtransport.advection import gamma_infiltration_to_extraction 

235 >>> 

236 >>> # Create input data with aligned time edges 

237 >>> dates = pd.date_range(start="2020-01-01", end="2020-01-20", freq="D") 

238 >>> tedges = compute_time_edges( 

239 ... tedges=None, tstart=None, tend=dates, number_of_bins=len(dates) 

240 ... ) 

241 >>> 

242 >>> # Create output time edges (can be different alignment) 

243 >>> cout_dates = pd.date_range(start="2020-01-05", end="2020-01-15", freq="D") 

244 >>> cout_tedges = compute_time_edges( 

245 ... tedges=None, tstart=None, tend=cout_dates, number_of_bins=len(cout_dates) 

246 ... ) 

247 >>> 

248 >>> # Input concentration and flow (same length, aligned with tedges) 

249 >>> cin = pd.Series(np.ones(len(dates)), index=dates) 

250 >>> flow = pd.Series(np.ones(len(dates)) * 100, index=dates) # 100 m3/day 

251 >>> 

252 >>> # Run gamma_infiltration_to_extraction with alpha/beta parameters 

253 >>> cout = gamma_infiltration_to_extraction( 

254 ... cin=cin, 

255 ... cin_tedges=tedges, 

256 ... cout_tedges=cout_tedges, 

257 ... flow=flow, 

258 ... flow_tedges=tedges, # Must be identical to cin_tedges 

259 ... alpha=10.0, 

260 ... beta=10.0, 

261 ... n_bins=5, 

262 ... ) 

263 >>> cout.shape 

264 (11,) 

265 

266 Using mean and std parameters instead: 

267 

268 >>> cout = gamma_infiltration_to_extraction( 

269 ... cin=cin, 

270 ... cin_tedges=tedges, 

271 ... cout_tedges=cout_tedges, 

272 ... flow=flow, 

273 ... flow_tedges=tedges, 

274 ... mean=100.0, 

275 ... std=20.0, 

276 ... n_bins=5, 

277 ... ) 

278 

279 With retardation factor: 

280 

281 >>> cout = gamma_infiltration_to_extraction( 

282 ... cin=cin, 

283 ... cin_tedges=tedges, 

284 ... cout_tedges=cout_tedges, 

285 ... flow=flow, 

286 ... flow_tedges=tedges, 

287 ... alpha=10.0, 

288 ... beta=10.0, 

289 ... retardation_factor=2.0, # Doubles residence time 

290 ... ) 

291 """ 

292 bins = gamma.bins(alpha=alpha, beta=beta, mean=mean, std=std, n_bins=n_bins) 

293 return distribution_infiltration_to_extraction( 

294 cin=cin, 

295 flow=flow, 

296 tedges=tedges, 

297 cout_tedges=cout_tedges, 

298 aquifer_pore_volumes=bins["expected_values"], 

299 retardation_factor=retardation_factor, 

300 ) 

301 

302 

303def gamma_extraction_to_infiltration(cout, flow, alpha, beta, n_bins=100, retardation_factor=1.0): 

304 """ 

305 Compute the concentration of the infiltrating water by shifting cout with its residence time. 

306 

307 This function represents extraction to infiltration modeling (equivalent to deconvolution). 

308 

309 Parameters 

310 ---------- 

311 cout : pandas.Series 

312 Concentration of the compound in the extracted water [ng/m3]. 

313 flow : pandas.Series 

314 Flow rate of water in the aquifer [m3/day]. 

315 alpha : float 

316 Shape parameter of gamma distribution of the aquifer pore volume (must be > 0) 

317 beta : float 

318 Scale parameter of gamma distribution of the aquifer pore volume (must be > 0) 

319 n_bins : int 

320 Number of bins to discretize the gamma distribution. 

321 retardation_factor : float 

322 Retardation factor of the compound in the aquifer. 

323 

324 Returns 

325 ------- 

326 NotImplementedError 

327 This function is not yet implemented. 

328 """ 

329 msg = "Extraction to infiltration advection gamma (deconvolution) is not implemented yet" 

330 raise NotImplementedError(msg) 

331 

332 

333def distribution_infiltration_to_extraction( 

334 *, 

335 cin: npt.ArrayLike, 

336 flow: npt.ArrayLike, 

337 tedges: pd.DatetimeIndex, 

338 cout_tedges: pd.DatetimeIndex, 

339 aquifer_pore_volumes: npt.ArrayLike, 

340 retardation_factor: float = 1.0, 

341) -> np.ndarray: 

342 """ 

343 Compute the concentration of the extracted water using flow-weighted advection. 

344 

345 This function implements an infiltration to extraction advection model where cin and flow values 

346 correspond to the same aligned time bins defined by tedges. 

347 

348 The algorithm: 

349 1. Computes residence times for each pore volume at cout time edges 

350 2. Calculates infiltration time edges by subtracting residence times 

351 3. Determines temporal overlaps between infiltration and cin time windows 

352 4. Creates flow-weighted overlap matrices normalized by total weights 

353 5. Computes weighted contributions and averages across pore volumes 

354 

355 Parameters 

356 ---------- 

357 cin : array-like 

358 Concentration values of infiltrating water or temperature [concentration units]. 

359 Length must match the number of time bins defined by tedges. 

360 flow : array-like 

361 Flow rate values in the aquifer [m3/day]. 

362 Length must match cin and the number of time bins defined by tedges. 

363 tedges : pandas.DatetimeIndex 

364 Time edges defining bins for both cin and flow data. Has length of 

365 len(cin) + 1 and len(flow) + 1. 

366 cout_tedges : pandas.DatetimeIndex 

367 Time edges for output data bins. Has length of desired output + 1. 

368 Can have different time alignment and resolution than tedges. 

369 aquifer_pore_volumes : array-like 

370 Array of aquifer pore volumes [m3] representing the distribution 

371 of residence times in the aquifer system. 

372 retardation_factor : float, optional 

373 Retardation factor of the compound in the aquifer (default 1.0). 

374 Values > 1.0 indicate slower transport due to sorption/interaction. 

375 

376 Returns 

377 ------- 

378 numpy.ndarray 

379 Flow-weighted concentration in the extracted water. Same units as cin. 

380 Length equals len(cout_tedges) - 1. NaN values indicate time periods 

381 with no valid contributions from the infiltration data. 

382 

383 Raises 

384 ------ 

385 ValueError 

386 If tedges length doesn't match cin/flow arrays plus one, or if 

387 infiltration time edges become non-monotonic (invalid input conditions). 

388 

389 Examples 

390 -------- 

391 Basic usage with pandas Series: 

392 

393 >>> import pandas as pd 

394 >>> import numpy as np 

395 >>> from gwtransport.utils import compute_time_edges 

396 >>> from gwtransport.advection import distribution_infiltration_to_extraction 

397 >>> 

398 >>> # Create input data 

399 >>> dates = pd.date_range(start="2020-01-01", end="2020-01-20", freq="D") 

400 >>> tedges = compute_time_edges( 

401 ... tedges=None, tstart=None, tend=dates, number_of_bins=len(dates) 

402 ... ) 

403 >>> 

404 >>> # Create output time edges (different alignment) 

405 >>> cout_dates = pd.date_range(start="2020-01-05", end="2020-01-15", freq="D") 

406 >>> cout_tedges = compute_time_edges( 

407 ... tedges=None, tstart=None, tend=cout_dates, number_of_bins=len(cout_dates) 

408 ... ) 

409 >>> 

410 >>> # Input concentration and flow 

411 >>> cin = pd.Series(np.ones(len(dates)), index=dates) 

412 >>> flow = pd.Series(np.ones(len(dates)) * 100, index=dates) # 100 m3/day 

413 >>> 

414 >>> # Define distribution of aquifer pore volumes 

415 >>> aquifer_pore_volumes = np.array([50, 100, 200]) # m3 

416 >>> 

417 >>> # Run distribution_infiltration_to_extraction 

418 >>> cout = distribution_infiltration_to_extraction( 

419 ... cin=cin, 

420 ... flow=flow, 

421 ... tedges=tedges, 

422 ... cout_tedges=cout_tedges, 

423 ... aquifer_pore_volumes=aquifer_pore_volumes, 

424 ... ) 

425 >>> cout.shape 

426 (11,) 

427 

428 Using array inputs instead of pandas Series: 

429 

430 >>> # Convert to arrays 

431 >>> cin_values = cin.values 

432 >>> flow_values = flow.values 

433 >>> 

434 >>> cout = distribution_infiltration_to_extraction( 

435 ... cin=cin_values, 

436 ... flow=flow_values, 

437 ... tedges=tedges, 

438 ... cout_tedges=cout_tedges, 

439 ... aquifer_pore_volumes=aquifer_pore_volumes, 

440 ... ) 

441 

442 With retardation factor: 

443 

444 >>> cout = distribution_infiltration_to_extraction( 

445 ... cin=cin, 

446 ... flow=flow, 

447 ... tedges=tedges, 

448 ... cout_tedges=cout_tedges, 

449 ... aquifer_pore_volumes=aquifer_pore_volumes, 

450 ... retardation_factor=2.0, # Compound moves twice as slowly 

451 ... ) 

452 

453 Using single pore volume: 

454 

455 >>> single_volume = np.array([100]) # Single 100 m3 pore volume 

456 >>> cout = distribution_infiltration_to_extraction( 

457 ... cin=cin, 

458 ... flow=flow, 

459 ... tedges=tedges, 

460 ... cout_tedges=cout_tedges, 

461 ... aquifer_pore_volumes=single_volume, 

462 ... ) 

463 """ 

464 tedges = pd.DatetimeIndex(tedges) 

465 cout_tedges = pd.DatetimeIndex(cout_tedges) 

466 

467 if len(tedges) != len(cin) + 1: 

468 msg = "tedges must have one more element than cin" 

469 raise ValueError(msg) 

470 if len(tedges) != len(flow) + 1: 

471 msg = "tedges must have one more element than flow" 

472 raise ValueError(msg) 

473 

474 # Convert to arrays for vectorized operations 

475 cin_values = np.asarray(cin) 

476 flow_values = np.asarray(flow) 

477 

478 # Validate inputs do not contain NaN values 

479 if np.any(np.isnan(cin_values)): 

480 msg = "cin contains NaN values, which are not allowed" 

481 raise ValueError(msg) 

482 if np.any(np.isnan(flow_values)): 

483 msg = "flow contains NaN values, which are not allowed" 

484 raise ValueError(msg) 

485 cin_tedges_days = ((tedges - tedges[0]) / pd.Timedelta(days=1)).values 

486 cout_tedges_days = ((cout_tedges - tedges[0]) / pd.Timedelta(days=1)).values 

487 aquifer_pore_volumes = np.asarray(aquifer_pore_volumes) 

488 

489 # Pre-compute all residence times and infiltration edges 

490 rt_edges_2d = residence_time( 

491 flow=flow_values, 

492 flow_tedges=tedges, 

493 index=cout_tedges, 

494 aquifer_pore_volume=aquifer_pore_volumes, 

495 retardation_factor=retardation_factor, 

496 direction="extraction_to_infiltration", 

497 ) 

498 infiltration_tedges_2d = cout_tedges_days[None, :] - rt_edges_2d 

499 

500 # Pre-compute valid bins and count 

501 valid_bins_2d = ~(np.isnan(infiltration_tedges_2d[:, :-1]) | np.isnan(infiltration_tedges_2d[:, 1:])) 

502 valid_pv_count = np.sum(valid_bins_2d, axis=0) 

503 

504 # Initialize accumulator 

505 normalized_weights = _distribution_infiltration_to_extraction_weights( 

506 cout_tedges, 

507 aquifer_pore_volumes, 

508 cin_values, 

509 flow_values, 

510 cin_tedges_days, 

511 infiltration_tedges_2d, 

512 valid_bins_2d, 

513 valid_pv_count, 

514 ) 

515 

516 # Apply to concentrations and handle NaN for periods with no contributions 

517 out = normalized_weights.dot(cin_values) 

518 out[valid_pv_count == 0] = np.nan 

519 

520 return out 

521 

522 

523def _distribution_infiltration_to_extraction_weights( 

524 cout_tedges, 

525 aquifer_pore_volumes, 

526 cin_values, 

527 flow_values, 

528 cin_tedges_days, 

529 infiltration_tedges_2d, 

530 valid_bins_2d, 

531 valid_pv_count, 

532): 

533 accumulated_weights = np.zeros((len(cout_tedges) - 1, len(cin_values))) 

534 

535 # Loop over each pore volume 

536 for i in range(len(aquifer_pore_volumes)): 

537 if np.any(valid_bins_2d[i, :]): 

538 overlap_matrix = partial_isin(infiltration_tedges_2d[i, :], cin_tedges_days) 

539 accumulated_weights[valid_bins_2d[i, :], :] += overlap_matrix[valid_bins_2d[i, :], :] 

540 

541 # Average across valid pore volumes and apply flow weighting 

542 averaged_weights = np.zeros_like(accumulated_weights) 

543 valid_cout = valid_pv_count > 0 

544 averaged_weights[valid_cout, :] = accumulated_weights[valid_cout, :] / valid_pv_count[valid_cout, None] 

545 

546 # Apply flow weighting after averaging 

547 flow_weighted_averaged = averaged_weights * flow_values[None, :] 

548 

549 total_weights = np.sum(flow_weighted_averaged, axis=1) 

550 valid_weights = total_weights > 0 

551 normalized_weights = np.zeros_like(flow_weighted_averaged) 

552 normalized_weights[valid_weights, :] = flow_weighted_averaged[valid_weights, :] / total_weights[valid_weights, None] 

553 return normalized_weights 

554 

555 

556def distribution_extraction_to_infiltration( 

557 *, 

558 cout: npt.ArrayLike, 

559 flow: npt.ArrayLike, 

560 tedges: pd.DatetimeIndex, 

561 cin_tedges: pd.DatetimeIndex, 

562 aquifer_pore_volumes: npt.ArrayLike, 

563 retardation_factor: float = 1.0, 

564) -> np.ndarray: 

565 """ 

566 Compute the concentration of the infiltrating water from extracted water (deconvolution). 

567 

568 This function implements an extraction to infiltration advection model (inverse of distribution_infiltration_to_extraction) 

569 where cout and flow values correspond to the same aligned time bins defined by tedges. 

570 

571 SYMMETRIC RELATIONSHIP: 

572 - distribution_infiltration_to_extraction: cin + tedges → cout + cout_tedges 

573 - distribution_extraction_to_infiltration: cout + tedges → cin + cin_tedges 

574 

575 The algorithm (symmetric to distribution_infiltration_to_extraction): 

576 1. Computes residence times for each pore volume at cint time edges 

577 2. Calculates extraction time edges by adding residence times (reverse of infiltration_to_extraction) 

578 3. Determines temporal overlaps between extraction and cout time windows 

579 4. Creates flow-weighted overlap matrices normalized by total weights 

580 5. Computes weighted contributions and averages across pore volumes 

581 

582 Parameters 

583 ---------- 

584 cout : array-like 

585 Concentration values of extracted water [concentration units]. 

586 Length must match the number of time bins defined by tedges. 

587 flow : array-like 

588 Flow rate values in the aquifer [m3/day]. 

589 Length must match cout and the number of time bins defined by tedges. 

590 tedges : pandas.DatetimeIndex 

591 Time edges defining bins for both cout and flow data. Has length of 

592 len(cout) + 1 and len(flow) + 1. 

593 cin_tedges : pandas.DatetimeIndex 

594 Time edges for output (infiltration) data bins. Has length of desired output + 1. 

595 Can have different time alignment and resolution than tedges. 

596 aquifer_pore_volumes : array-like 

597 Array of aquifer pore volumes [m3] representing the distribution 

598 of residence times in the aquifer system. 

599 retardation_factor : float, optional 

600 Retardation factor of the compound in the aquifer (default 1.0). 

601 Values > 1.0 indicate slower transport due to sorption/interaction. 

602 

603 Returns 

604 ------- 

605 numpy.ndarray 

606 Flow-weighted concentration in the infiltrating water. Same units as cout. 

607 Length equals len(cin_tedges) - 1. NaN values indicate time periods 

608 with no valid contributions from the extraction data. 

609 

610 Raises 

611 ------ 

612 ValueError 

613 If tedges length doesn't match cout/flow arrays plus one, or if 

614 extraction time edges become non-monotonic (invalid input conditions). 

615 

616 Examples 

617 -------- 

618 Basic usage with pandas Series: 

619 

620 >>> import pandas as pd 

621 >>> import numpy as np 

622 >>> from gwtransport.utils import compute_time_edges 

623 >>> from gwtransport.advection import distribution_extraction_to_infiltration 

624 >>> 

625 >>> # Create input data 

626 >>> dates = pd.date_range(start="2020-01-01", end="2020-01-20", freq="D") 

627 >>> tedges = compute_time_edges( 

628 ... tedges=None, tstart=None, tend=dates, number_of_bins=len(dates) 

629 ... ) 

630 >>> 

631 >>> # Create output time edges (different alignment) 

632 >>> cint_dates = pd.date_range(start="2019-12-25", end="2020-01-15", freq="D") 

633 >>> cin_tedges = compute_time_edges( 

634 ... tedges=None, tstart=None, tend=cint_dates, number_of_bins=len(cint_dates) 

635 ... ) 

636 >>> 

637 >>> # Input concentration and flow 

638 >>> cout = pd.Series(np.ones(len(dates)), index=dates) 

639 >>> flow = pd.Series(np.ones(len(dates)) * 100, index=dates) # 100 m3/day 

640 >>> 

641 >>> # Define distribution of aquifer pore volumes 

642 >>> aquifer_pore_volumes = np.array([50, 100, 200]) # m3 

643 >>> 

644 >>> # Run distribution_extraction_to_infiltration 

645 >>> cin = distribution_extraction_to_infiltration( 

646 ... cout=cout, 

647 ... flow=flow, 

648 ... tedges=tedges, 

649 ... cin_tedges=cin_tedges, 

650 ... aquifer_pore_volumes=aquifer_pore_volumes, 

651 ... ) 

652 >>> cin.shape 

653 (22,) 

654 

655 Using array inputs instead of pandas Series: 

656 

657 >>> # Convert to arrays 

658 >>> cout_values = cout.values 

659 >>> flow_values = flow.values 

660 >>> 

661 >>> cin = distribution_extraction_to_infiltration( 

662 ... cout=cout_values, 

663 ... flow=flow_values, 

664 ... tedges=tedges, 

665 ... cin_tedges=cin_tedges, 

666 ... aquifer_pore_volumes=aquifer_pore_volumes, 

667 ... ) 

668 

669 With retardation factor: 

670 

671 >>> cin = distribution_extraction_to_infiltration( 

672 ... cout=cout, 

673 ... flow=flow, 

674 ... tedges=tedges, 

675 ... cin_tedges=cin_tedges, 

676 ... aquifer_pore_volumes=aquifer_pore_volumes, 

677 ... retardation_factor=2.0, # Compound moves twice as slowly 

678 ... ) 

679 

680 Using single pore volume: 

681 

682 >>> single_volume = np.array([100]) # Single 100 m3 pore volume 

683 >>> cin = distribution_extraction_to_infiltration( 

684 ... cout=cout, 

685 ... flow=flow, 

686 ... tedges=tedges, 

687 ... cin_tedges=cin_tedges, 

688 ... aquifer_pore_volumes=single_volume, 

689 ... ) 

690 """ 

691 tedges = pd.DatetimeIndex(tedges) 

692 cin_tedges = pd.DatetimeIndex(cin_tedges) 

693 

694 if len(tedges) != len(cout) + 1: 

695 msg = "tedges must have one more element than cout" 

696 raise ValueError(msg) 

697 if len(tedges) != len(flow) + 1: 

698 msg = "tedges must have one more element than flow" 

699 raise ValueError(msg) 

700 

701 # Convert to arrays for vectorized operations 

702 cout_values = np.asarray(cout) 

703 flow_values = np.asarray(flow) 

704 

705 # Validate inputs do not contain NaN values 

706 if np.any(np.isnan(cout_values)): 

707 msg = "cout contains NaN values, which are not allowed" 

708 raise ValueError(msg) 

709 if np.any(np.isnan(flow_values)): 

710 msg = "flow contains NaN values, which are not allowed" 

711 raise ValueError(msg) 

712 cout_tedges_days = ((tedges - tedges[0]) / pd.Timedelta(days=1)).values 

713 cin_tedges_days = ((cin_tedges - tedges[0]) / pd.Timedelta(days=1)).values 

714 aquifer_pore_volumes = np.asarray(aquifer_pore_volumes) 

715 

716 # Pre-compute all residence times and extraction edges (symmetric to infiltration_to_extraction) 

717 rt_edges_2d = residence_time( 

718 flow=flow_values, 

719 flow_tedges=tedges, 

720 index=cin_tedges, 

721 aquifer_pore_volume=aquifer_pore_volumes, 

722 retardation_factor=retardation_factor, 

723 direction="infiltration_to_extraction", # Computing from infiltration perspective 

724 ) 

725 extraction_tedges_2d = cin_tedges_days[None, :] + rt_edges_2d 

726 

727 # Pre-compute valid bins and count 

728 valid_bins_2d = ~(np.isnan(extraction_tedges_2d[:, :-1]) | np.isnan(extraction_tedges_2d[:, 1:])) 

729 valid_pv_count = np.sum(valid_bins_2d, axis=0) 

730 

731 # Initialize accumulator 

732 normalized_weights = _distribution_extraction_to_infiltration_weights( 

733 cin_tedges, 

734 aquifer_pore_volumes, 

735 cout_values, 

736 flow_values, 

737 cout_tedges_days, 

738 extraction_tedges_2d, 

739 valid_bins_2d, 

740 valid_pv_count, 

741 ) 

742 

743 # Apply to concentrations and handle NaN for periods with no contributions 

744 out = normalized_weights.dot(cout_values) 

745 out[valid_pv_count == 0] = np.nan 

746 

747 return out 

748 

749 

750def _distribution_extraction_to_infiltration_weights( 

751 cin_tedges, 

752 aquifer_pore_volumes, 

753 cout_values, 

754 flow_values, 

755 cout_tedges_days, 

756 extraction_tedges_2d, 

757 valid_bins_2d, 

758 valid_pv_count, 

759): 

760 """ 

761 Compute extraction to infiltration transformation weights matrix. 

762 

763 Computes the weight matrix for the extraction to infiltration transformation, 

764 ensuring mathematical symmetry with the infiltration to extraction operation. The extraction to infiltration 

765 weights represent the transpose relationship needed for deconvolution. 

766 

767 SYMMETRIC RELATIONSHIP: 

768 - Infiltration to extraction weights: W_infiltration_to_extraction maps cin → cout 

769 - Extraction to infiltration weights: W_extraction_to_infiltration maps cout → cin 

770 - Mathematical constraint: W_extraction_to_infiltration should be the pseudo-inverse of W_infiltration_to_extraction 

771 

772 The algorithm mirrors _distribution_infiltration_to_extraction_weights but with transposed 

773 temporal overlap computations to ensure mathematical consistency. 

774 

775 Parameters 

776 ---------- 

777 cin_tedges : pandas.DatetimeIndex 

778 Time edges for output (infiltration) data bins. 

779 aquifer_pore_volumes : array-like 

780 Array of aquifer pore volumes [m3]. 

781 cout_values : array-like 

782 Concentration values of extracted water. 

783 flow_values : array-like 

784 Flow rate values in the aquifer [m3/day]. 

785 cout_tedges_days : array-like 

786 Time edges for cout data in days since reference. 

787 extraction_tedges_2d : array-like 

788 2D array of extraction time edges for each pore volume. 

789 valid_bins_2d : array-like 

790 2D boolean array indicating valid time bins for each pore volume. 

791 valid_pv_count : array-like 

792 Count of valid pore volumes for each output time bin. 

793 

794 Returns 

795 ------- 

796 numpy.ndarray 

797 Normalized weight matrix for extraction to infiltration transformation. 

798 Shape: (len(cin_tedges) - 1, len(cout_values)) 

799 """ 

800 accumulated_weights = np.zeros((len(cin_tedges) - 1, len(cout_values))) 

801 

802 # Loop over each pore volume (same structure as infiltration_to_extraction) 

803 for i in range(len(aquifer_pore_volumes)): 

804 if np.any(valid_bins_2d[i, :]): 

805 # SYMMETRIC temporal overlap computation: 

806 # Infiltration to extraction: maps infiltration → cout time windows 

807 # Extraction to infiltration: maps extraction → cout time windows (transposed relationship) 

808 overlap_matrix = partial_isin(extraction_tedges_2d[i, :], cout_tedges_days) 

809 accumulated_weights[valid_bins_2d[i, :], :] += overlap_matrix[valid_bins_2d[i, :], :] 

810 

811 # Average across valid pore volumes (symmetric to infiltration_to_extraction) 

812 averaged_weights = np.zeros_like(accumulated_weights) 

813 valid_cout = valid_pv_count > 0 

814 averaged_weights[valid_cout, :] = accumulated_weights[valid_cout, :] / valid_pv_count[valid_cout, None] 

815 

816 # Apply flow weighting (symmetric to infiltration_to_extraction) 

817 flow_weighted_averaged = averaged_weights * flow_values[None, :] 

818 

819 # Normalize by total weights (symmetric to infiltration_to_extraction) 

820 total_weights = np.sum(flow_weighted_averaged, axis=1) 

821 valid_weights = total_weights > 0 

822 normalized_weights = np.zeros_like(flow_weighted_averaged) 

823 normalized_weights[valid_weights, :] = flow_weighted_averaged[valid_weights, :] / total_weights[valid_weights, None] 

824 

825 return normalized_weights