Coverage for src/rok4/storage.py: 80%

537 statements  

coverage.py v7.6.1, created at 2024-10-01 15:35 +0000

1"""Provide functions to read or write data 

2 

3Available storage types are:

4

5- S3 (paths are prefixed with `s3://`)

6- CEPH (paths are prefixed with `ceph://`)

7- FILE (paths are prefixed with `file://`; this is also the default interpretation for unprefixed paths)

8- HTTP (paths are prefixed with `http://`)

9- HTTPS (paths are prefixed with `https://`)

10 

11Not all storage types are available for every function.

12 

13Reads use an LRU cache with a TTL. It can be configured with environment variables:

14

15- ROK4_READING_LRU_CACHE_SIZE: number of cached elements. Default 64. Set 0 or a negative integer for an unbounded cache. A power of two makes the cache more efficient.

16- ROK4_READING_LRU_CACHE_TTL: validity duration of cached elements, in seconds. Default 300. Set 0 or a negative integer for a cache without expiration.

17

18To disable the cache (always read data from storage), set ROK4_READING_LRU_CACHE_SIZE to 1 and ROK4_READING_LRU_CACHE_TTL to 1.

19 

20Using CEPH storage requires these environment variables:

21 

22- ROK4_CEPH_CONFFILE 

23- ROK4_CEPH_USERNAME 

24- ROK4_CEPH_CLUSTERNAME 

25 

26Using S3 storage requires these environment variables:

27 

28- ROK4_S3_KEY 

29- ROK4_S3_SECRETKEY 

30- ROK4_S3_URL 

31 

32To use several S3 clusters, each environment variable has to contain a comma-separated list, with the same number of elements in each

33

34Example, working with 2 S3 clusters:

35 

36- ROK4_S3_KEY=KEY1,KEY2 

37- ROK4_S3_SECRETKEY=SKEY1,SKEY2 

38- ROK4_S3_URL=https://s3.storage.fr,https://s4.storage.fr 

39 

40To specify the cluster to use, the bucket name should be bucket_name@s3.storage.fr or bucket_name@s4.storage.fr. If no host is defined (no @) in the bucket name, the first S3 cluster is used

41""" 

42 

43import hashlib 

44import os 

45import re 

46import tempfile 

47import time 

48from functools import lru_cache 

49from shutil import copyfile 

50from typing import Dict, Tuple, Union 

51 

52import boto3 

53import botocore.config, botocore.exceptions

54import requests 

55 

56# conditional import 

57 

58try: 

59 from osgeo import gdal 

60 

61 # Enable GDAL/OGR exceptions 

62 gdal.UseExceptions() 

63 

64 GDAL_AVAILABLE: bool = True 

65except ImportError: 

66 GDAL_AVAILABLE: bool = False 

67 gdal = None 

68 

69 

70try: 

71 import rados 

72 

73 CEPH_RADOS_AVAILABLE: bool = True 

74except ImportError: 

75 CEPH_RADOS_AVAILABLE: bool = False 

76 rados = None 

77 

78# package 

79from rok4.enums import StorageType 

80from rok4.exceptions import MissingEnvironmentError, StorageError 

81 

82# -- GLOBALS -- 

83 

84 

85__CEPH_CLIENT = None 

86__CEPH_IOCTXS = {} 

87__OBJECT_SYMLINK_SIGNATURE = "SYMLINK#" 

88__S3_CLIENTS = {} 

89__S3_DEFAULT_CLIENT = None 

90__LRU_SIZE = 64 

91__LRU_TTL = 300 

92 

93try: 

94 __LRU_SIZE = int(os.environ["ROK4_READING_LRU_CACHE_SIZE"]) 

95 if __LRU_SIZE < 1: 

96 __LRU_SIZE = None 

97except ValueError: 

98 pass 

99except KeyError: 

100 pass 

101 

102try: 

103 __LRU_TTL = int(os.environ["ROK4_READING_LRU_CACHE_TTL"]) 

104 if __LRU_TTL < 0: 

105 __LRU_TTL = 0 

106except ValueError: 

107 pass 

108except KeyError: 

109 pass 

110 

111 

112def __get_ttl_hash() -> int: 

113 """Return the time string rounded according to time-to-live value""" 

114 if __LRU_TTL == 0: 

115 return 0 

116 else: 

117 return round(time.time() / __LRU_TTL) 
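
# Illustration of the TTL trick used above (assuming __LRU_TTL is 300): the returned
# value only changes every 300 seconds, so passing it as an extra argument to an
# lru_cache-decorated function makes older cache entries unreachable once the TTL
# window rolls over.
#
#     round(1_000_000 / 300)  # -> 3333, shared by every call in the same 300 s window
#     round(1_000_200 / 300)  # -> 3334, a new cache key, so data is read again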

118 

119 

120def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", str]], str]:

121 """Get the S3 client 

122 

123 Create it if not already done 

124 

125 Args: 

126 bucket_name (str): S3 bucket name. Could be just the bucket name, or <bucket name>@<cluster host> 

127 

128 Raises: 

129 MissingEnvironmentError: Missing S3 storage information

130 StorageError: S3 client configuration issue 

131 

132 Returns: 

133 Tuple[Dict[str, Union['boto3.client',str]], str]: the S3 client information (client, key, secret key, url, host, secure) and the simple bucket name

134 """ 

135 

136 global __S3_CLIENTS, __S3_DEFAULT_CLIENT 

137 

138 if not __S3_CLIENTS: 

139 verify = True 

140 if "ROK4_SSL_NO_VERIFY" in os.environ and os.environ["ROK4_SSL_NO_VERIFY"] != "": 

141 verify = False 

142 # First use of S3 storage: load the configuration from environment variables

143 try: 

144 keys = os.environ["ROK4_S3_KEY"].split(",") 

145 secret_keys = os.environ["ROK4_S3_SECRETKEY"].split(",") 

146 urls = os.environ["ROK4_S3_URL"].split(",") 

147 

148 if len(keys) != len(secret_keys) or len(keys) != len(urls): 

149 raise StorageError( 

150 "S3", 

151 "S3 informations in environment variables are inconsistent : same number of element in each list is required", 

152 ) 

153 

154 for i in range(len(keys)): 

155 h = re.sub("https?://", "", urls[i]) 

156 

157 if h in __S3_CLIENTS: 

158 raise StorageError("S3", "A S3 cluster is defined twice (based on URL)") 

159 

160 __S3_CLIENTS[h] = { 

161 "client": boto3.client( 

162 "s3", 

163 aws_access_key_id=keys[i], 

164 aws_secret_access_key=secret_keys[i], 

165 verify=verify, 

166 endpoint_url=urls[i], 

167 config=botocore.config.Config(tcp_keepalive=True, max_pool_connections=10), 

168 ), 

169 "key": keys[i], 

170 "secret_key": secret_keys[i], 

171 "url": urls[i], 

172 "host": h, 

173 "secure": urls[i].startswith("https://"), 

174 } 

175 

176 if i == 0: 

177 # The first cluster is the default one

178 __S3_DEFAULT_CLIENT = h 

179 

180 except KeyError as e: 

181 raise MissingEnvironmentError(e) 

182 except Exception as e: 

183 raise StorageError("S3", e) 

184 

185 try: 

186 host = bucket_name.split("@")[1] 

187 except IndexError: 

188 host = __S3_DEFAULT_CLIENT 

189 

190 bucket_name = bucket_name.split("@")[0] 

191 

192 if host not in __S3_CLIENTS: 

193 raise StorageError("S3", f"Unknown S3 cluster, according to host '{host}'") 

194 

195 return __S3_CLIENTS[host], bucket_name 

196 

197 

198def disconnect_s3_clients() -> None: 

199 """Clean S3 clients""" 

200 

201 global __S3_CLIENTS, __S3_DEFAULT_CLIENT 

202 __S3_CLIENTS = {} 

203 __S3_DEFAULT_CLIENT = None 

204 

205 

206def __get_ceph_ioctx(pool: str) -> "rados.Ioctx": 

207 """Get the CEPH IO context 

208 

209 Create it (client and context) if not already done 

210 

211 Args: 

212 pool (str): CEPH pool's name 

213 

214 Raises: 

215 MissingEnvironmentError: Missing CEPH storage information

216 StorageError: CEPH IO context configuration issue 

217 

218 Returns: 

219 rados.Ioctx: CEPH IO context

220 """ 

221 global __CEPH_CLIENT, __CEPH_IOCTXS 

222 

223 if __CEPH_CLIENT is None: 

224 try: 

225 __CEPH_CLIENT = rados.Rados( 

226 conffile=os.environ["ROK4_CEPH_CONFFILE"], 

227 clustername=os.environ["ROK4_CEPH_CLUSTERNAME"], 

228 name=os.environ["ROK4_CEPH_USERNAME"], 

229 ) 

230 

231 __CEPH_CLIENT.connect() 

232 

233 except KeyError as e: 

234 raise MissingEnvironmentError(e) 

235 except Exception as e: 

236 raise StorageError("CEPH", e) 

237 

238 if pool not in __CEPH_IOCTXS: 

239 try: 

240 __CEPH_IOCTXS[pool] = __CEPH_CLIENT.open_ioctx(pool) 

241 except Exception as e: 

242 raise StorageError("CEPH", e) 

243 

244 return __CEPH_IOCTXS[pool] 

245 

246 

247def disconnect_ceph_clients() -> None: 

248 """Clean CEPH clients""" 

249 global __CEPH_CLIENT, __CEPH_IOCTXS 

250 __CEPH_CLIENT = None 

251 __CEPH_IOCTXS = {} 

252 

253 

254def get_infos_from_path(path: str) -> Tuple[StorageType, str, str, str]: 

255 """Extract storage type, the unprefixed path, the container and the basename from path (Default: FILE storage) 

256 

257 For a FILE storage, the tray is the directory and the basename is the file name. 

258 

259 For an object storage (CEPH or S3), the tray is the bucket or the pool and the basename is the object name. 

260 For an S3 bucket, the format can be <bucket name>@<cluster name> to use several clusters. The cluster name is the host (without protocol)

261 

262 Args: 

263 path (str): path to analyse 

264 

265 Returns: 

266 Tuple[StorageType, str, str, str]: storage type, unprefixed path, the container and the basename 

267 """ 

268 

269 if path.startswith("s3://"): 

270 bucket_name, object_name = path[5:].split("/", 1) 

271 return StorageType.S3, path[5:], bucket_name, object_name 

272 elif path.startswith("ceph://"): 

273 pool_name, object_name = path[7:].split("/", 1) 

274 return StorageType.CEPH, path[7:], pool_name, object_name 

275 elif path.startswith("file://"): 

276 return StorageType.FILE, path[7:], os.path.dirname(path[7:]), os.path.basename(path[7:]) 

277 elif path.startswith("http://"): 

278 return StorageType.HTTP, path[7:], os.path.dirname(path[7:]), os.path.basename(path[7:]) 

279 elif path.startswith("https://"): 

280 return StorageType.HTTPS, path[8:], os.path.dirname(path[8:]), os.path.basename(path[8:]) 

281 else: 

282 return StorageType.FILE, path, os.path.dirname(path), os.path.basename(path) 
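
# Illustrative results (made-up paths):
#
#     get_infos_from_path("s3://tiles@s3.storage.fr/layer/12/345/678.tif")
#     # -> (StorageType.S3, "tiles@s3.storage.fr/layer/12/345/678.tif",
#     #     "tiles@s3.storage.fr", "layer/12/345/678.tif")
#
#     get_infos_from_path("/data/layer/678.tif")
#     # -> (StorageType.FILE, "/data/layer/678.tif", "/data/layer", "678.tif")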

283 

284 

285def get_path_from_infos(storage_type: StorageType, *args) -> str: 

286 """Write full path from elements 

287 

288 Prefixed with the storage type, elements are joined with a slash

289 

290 Args: 

291 storage_type (StorageType): Storage's type for path 

292 

293 Returns: 

294 str: Full path 

295 """ 

296 return f"{storage_type.value}{os.path.join(*args)}" 
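
# Illustrative usage (assumes StorageType.S3.value is the "s3://" prefix, as used
# throughout this module):
#
#     get_path_from_infos(StorageType.S3, "my-bucket", "layer/12/345/678.tif")
#     # -> "s3://my-bucket/layer/12/345/678.tif"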

297 

298 

299def hash_file(path: str) -> str: 

300 """Process MD5 sum of the provided file 

301 

302 Args: 

303 path (str): path to file 

304 

305 Returns: 

306 str: hexadecimal MD5 sum

307 """ 

308 

309 checker = hashlib.md5() 

310 

311 with open(path, "rb") as file: 

312 chunk = 0 

313 while chunk != b"": 

314 chunk = file.read(65536) 

315 checker.update(chunk) 

316 

317 return checker.hexdigest() 

318 

319 

320def get_data_str(path: str) -> str: 

321 """Load full data into a string 

322 

323 Args: 

324 path (str): path to data 

325 

326 Raises: 

327 MissingEnvironmentError: Missing object storage information

328 StorageError: Storage read issue 

329 FileNotFoundError: File or object does not exist 

330 NotImplementedError: Storage type not handled 

331 

332 Returns: 

333 str: Data content 

334 """ 

335 

336 return get_data_binary(path).decode("utf-8") 

337 

338 

339@lru_cache(maxsize=__LRU_SIZE) 

340def __get_cached_data_binary(path: str, ttl_hash: int, range: Tuple[int, int] = None) -> bytes:

341 """Load data into a binary string, using a LRU cache 

342 

343 Args: 

344 path (str): path to data 

345 ttl_hash (int): time hash, used to invalidate cache entries when the TTL expires

346 range (Tuple[int, int], optional): offset and size, to make a partial read. Defaults to None. 

347 

348 Raises: 

349 MissingEnvironmentError: Missing object storage information

350 StorageError: Storage read issue 

351 FileNotFoundError: File or object does not exist 

352 NotImplementedError: Storage type not handled 

353 

354 Returns: 

355 bytes: Data binary content

356 """ 

357 storage_type, path, tray_name, base_name = get_infos_from_path(path) 

358 

359 if storage_type == StorageType.S3: 

360 s3_client, bucket_name = __get_s3_client(tray_name) 

361 

362 try: 

363 if range is None: 

364 data = ( 

365 s3_client["client"] 

366 .get_object( 

367 Bucket=bucket_name, 

368 Key=base_name, 

369 )["Body"] 

370 .read() 

371 ) 

372 else: 

373 data = ( 

374 s3_client["client"] 

375 .get_object( 

376 Bucket=bucket_name, 

377 Key=base_name, 

378 Range=f"bytes={range[0]}-{range[0] + range[1] - 1}", 

379 )["Body"] 

380 .read() 

381 ) 

382 

383 except botocore.exceptions.ClientError as e: 

384 if e.response["Error"]["Code"] == "NoSuchKey": 

385 raise FileNotFoundError(f"{storage_type.value}{path}") 

386 else: 

387 raise StorageError("S3", e) 

388 

389 except Exception as e: 

390 raise StorageError("S3", e) 

391 

392 elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: 

393 ioctx = __get_ceph_ioctx(tray_name) 

394 

395 try: 

396 if range is None: 

397 size, mtime = ioctx.stat(base_name) 

398 data = ioctx.read(base_name, size) 

399 else: 

400 data = ioctx.read(base_name, range[1], range[0]) 

401 

402 except rados.ObjectNotFound: 

403 raise FileNotFoundError(f"{storage_type.value}{path}") 

404 

405 except Exception as e: 

406 raise StorageError("CEPH", e) 

407 

408 elif storage_type == StorageType.FILE: 

409 try: 

410 f = open(path, "rb") 

411 if range is None: 

412 data = f.read() 

413 else: 

414 f.seek(range[0]) 

415 data = f.read(range[1]) 

416 

417 f.close() 

418 

419 except FileNotFoundError: 

420 raise FileNotFoundError(f"{storage_type.value}{path}") 

421 

422 except Exception as e: 

423 raise StorageError("FILE", e) 

424 

425 elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS: 

426 if range is None: 

427 try: 

428 response = requests.get(f"{storage_type.value}{path}", stream=True)

429 data = response.content

430 if response.status_code == 404:

431 raise FileNotFoundError(f"{storage_type.value}{path}") 

432 except Exception as e: 

433 raise StorageError(storage_type.name, e) 

434 else: 

435 raise NotImplementedError("Cannot get partial data for storage type HTTP(S)") 

436 

437 else: 

438 raise NotImplementedError(f"Cannot get data for storage type {storage_type.name}") 

439 

440 return data 

441 

442 

443def get_data_binary(path: str, range: Tuple[int, int] = None) -> bytes:

444 """Load data into a binary string 

445 

446 This function uses an LRU cache, with a TTL of 5 minutes by default (see module documentation)

447 

448 Args: 

449 path (str): path to data 

450 range (Tuple[int, int], optional): offset and size, to make a partial read. Defaults to None. 

451 

452 Raises: 

453 MissingEnvironmentError: Missing object storage information

454 StorageError: Storage read issue 

455 FileNotFoundError: File or object does not exist 

456 NotImplementedError: Storage type not handled 

457 

458 Returns: 

459 bytes: Data binary content

460 """ 

461 return __get_cached_data_binary(path, __get_ttl_hash(), range) 
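
# Usage sketch (illustrative paths): a full read and a partial read of the same
# object; the partial read only fetches `size` bytes starting at `offset`.
#
#     whole = get_data_binary("file:///tmp/tiles/0/0/0.tif")
#     header = get_data_binary("s3://my-bucket/tiles/0/0/0.tif", (0, 1024))  # offset 0, size 1024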

462 

463 

464def put_data_str(data: str, path: str) -> None: 

465 """Store string data into a file or an object 

466 

467 UTF-8 encoding is used for bytes conversion 

468 

469 Args: 

470 data (str): data to write 

471 path (str): destination path, where to write data 

472 

473 Raises: 

474 MissingEnvironmentError: Missing object storage information

475 StorageError: Storage write issue 

476 NotImplementedError: Storage type not handled 

477 """ 

478 

479 storage_type, path, tray_name, base_name = get_infos_from_path(path) 

480 

481 if storage_type == StorageType.S3: 

482 s3_client, bucket_name = __get_s3_client(tray_name) 

483 

484 try: 

485 s3_client["client"].put_object( 

486 Body=data.encode("utf-8"), Bucket=bucket_name, Key=base_name 

487 ) 

488 except Exception as e: 

489 raise StorageError("S3", e) 

490 

491 elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: 

492 ioctx = __get_ceph_ioctx(tray_name) 

493 

494 try: 

495 ioctx.write_full(base_name, data.encode("utf-8")) 

496 except Exception as e: 

497 raise StorageError("CEPH", e) 

498 

499 elif storage_type == StorageType.FILE: 

500 try: 

501 f = open(path, "w") 

502 f.write(data) 

503 f.close() 

504 except Exception as e: 

505 raise StorageError("FILE", e) 

506 

507 else: 

508 raise NotImplementedError(f"Cannot write data for storage type {storage_type.name}") 
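
# Usage sketch (illustrative paths): the same call works for FILE, S3 or CEPH
# destinations, the storage type being deduced from the path prefix.
#
#     put_data_str('{"status": "ok"}', "file:///tmp/status.json")
#     put_data_str('{"status": "ok"}', "s3://my-bucket/status.json")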

509 

510 

511def get_size(path: str) -> int: 

512 """Get size of file or object 

513 

514 Args: 

515 path (str): path of the file/object whose size is requested

516 

517 Raises: 

518 MissingEnvironmentError: Missing object storage information

519 StorageError: Storage read issue 

520 NotImplementedError: Storage type not handled 

521 

522 Returns: 

523 int: file/object size, in bytes 

524 """ 

525 

526 storage_type, path, tray_name, base_name = get_infos_from_path(path) 

527 

528 if storage_type == StorageType.S3: 

529 s3_client, bucket_name = __get_s3_client(tray_name) 

530 

531 try: 

532 size = s3_client["client"].head_object(Bucket=bucket_name, Key=base_name)[ 

533 "ContentLength" 

534 ] 

535 return int(size) 

536 except Exception as e: 

537 raise StorageError("S3", e) 

538 

539 elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: 

540 ioctx = __get_ceph_ioctx(tray_name) 

541 

542 try: 

543 size, mtime = ioctx.stat(base_name) 

544 return size 

545 except Exception as e: 

546 raise StorageError("CEPH", e) 

547 

548 elif storage_type == StorageType.FILE: 

549 try: 

550 file_stats = os.stat(path) 

551 return file_stats.st_size 

552 except Exception as e: 

553 raise StorageError("FILE", e) 

554 

555 elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS: 

556 try: 

557 # stream=True means only the headers are downloaded at first

558 size = requests.get(storage_type.value + path, stream=True).headers["content-length"]

559 return int(size)

560 except Exception as e: 

561 raise StorageError(storage_type.name, e) 

562 

563 else: 

564 raise NotImplementedError(f"Cannot get size for storage type {storage_type.name}") 

565 

566 

567def exists(path: str) -> bool: 

568 """Do the file or object exist ? 

569 

570 Args: 

571 path (str): path of file/object to test 

572 

573 Raises: 

574 MissingEnvironmentError: Missing object storage information

575 StorageError: Storage read issue 

576 NotImplementedError: Storage type not handled 

577 

578 Returns: 

579 bool: file/object existing status 

580 """ 

581 

582 storage_type, path, tray_name, base_name = get_infos_from_path(path) 

583 

584 if storage_type == StorageType.S3: 

585 s3_client, bucket_name = __get_s3_client(tray_name) 

586 

587 try: 

588 s3_client["client"].head_object(Bucket=bucket_name, Key=base_name) 

589 return True 

590 except botocore.exceptions.ClientError as e: 

591 if e.response["Error"]["Code"] == "404": 

592 return False 

593 else: 

594 raise StorageError("S3", e) 

595 

596 elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: 

597 ioctx = __get_ceph_ioctx(tray_name) 

598 

599 try: 

600 ioctx.stat(base_name) 

601 return True 

602 except rados.ObjectNotFound: 

603 return False 

604 except Exception as e: 

605 raise StorageError("CEPH", e) 

606 

607 elif storage_type == StorageType.FILE: 

608 return os.path.exists(path) 

609 

610 elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS: 

611 try: 

612 response = requests.get(storage_type.value + path, stream=True) 

613 if response.status_code == 200: 

614 return True 

615 else: 

616 return False 

617 except Exception as e: 

618 raise StorageError(storage_type.name, e) 

619 

620 else: 

621 raise NotImplementedError(f"Cannot test existence for storage type {storage_type.name}") 

622 

623 

624def remove(path: str) -> None: 

625 """Remove the file/object 

626 

627 Args: 

628 path (str): path of file/object to remove 

629 

630 Raises: 

631 MissingEnvironmentError: Missing object storage information

632 StorageError: Storage removal issue 

633 NotImplementedError: Storage type not handled 

634 """ 

635 storage_type, path, tray_name, base_name = get_infos_from_path(path) 

636 

637 if storage_type == StorageType.S3: 

638 s3_client, bucket_name = __get_s3_client(tray_name) 

639 

640 try: 

641 s3_client["client"].delete_object(Bucket=bucket_name, Key=base_name) 

642 except Exception as e: 

643 raise StorageError("S3", e) 

644 

645 elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: 

646 ioctx = __get_ceph_ioctx(tray_name) 

647 

648 try: 

649 ioctx.remove_object(base_name) 

650 except rados.ObjectNotFound: 

651 pass 

652 except Exception as e: 

653 raise StorageError("CEPH", e) 

654 

655 elif storage_type == StorageType.FILE: 

656 try: 

657 os.remove(path) 

658 except FileNotFoundError: 

659 pass 

660 except Exception as e: 

661 raise StorageError("FILE", e) 

662 

663 else: 

664 raise NotImplementedError(f"Cannot remove data for storage type {storage_type.name}") 
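
# Usage sketch (illustrative path) combining the helpers above:
#
#     path = "s3://my-bucket/tiles/0/0/0.tif"
#     if exists(path):
#         print(get_size(path))  # size in bytes
#         remove(path)           # removal is silent if the object is already gone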

665 

666 

667def copy(from_path: str, to_path: str, from_md5: str = None) -> None: 

668 """Copy a file or object to a file or object place. If MD5 sum is provided, it is compared to sum after the copy. 

669 

670 Args: 

671 from_path (str): source file/object path, to copy 

672 to_path (str): destination file/object path 

673 from_md5 (str, optional): MD5 sum, recomputed after the copy and checked against this value. Defaults to None.

674 

675 Raises: 

676 StorageError: Copy issue 

677 MissingEnvironmentError: Missing object storage information

678 NotImplementedError: Storage type not handled 

679 """ 

680 

681 from_type, from_path, from_tray, from_base_name = get_infos_from_path(from_path) 

682 to_type, to_path, to_tray, to_base_name = get_infos_from_path(to_path) 

683 

684 # Perform the copy, according to the storage types

685 if from_type == StorageType.FILE and to_type == StorageType.FILE: 

686 try: 

687 if to_tray != "": 

688 os.makedirs(to_tray, exist_ok=True) 

689 

690 copyfile(from_path, to_path) 

691 

692 if from_md5 is not None: 

693 to_md5 = hash_file(to_path) 

694 if to_md5 != from_md5: 

695 raise StorageError( 

696 "FILE", 

697 f"Invalid MD5 sum control for copy file {from_path} to {to_path} : {from_md5} != {to_md5}", 

698 ) 

699 

700 except Exception as e: 

701 raise StorageError("FILE", f"Cannot copy file {from_path} to {to_path} : {e}") 

702 

703 elif from_type == StorageType.S3 and to_type == StorageType.FILE: 

704 s3_client, from_bucket = __get_s3_client(from_tray) 

705 

706 try: 

707 if to_tray != "": 

708 os.makedirs(to_tray, exist_ok=True) 

709 

710 s3_client["client"].download_file(from_bucket, from_base_name, to_path) 

711 

712 if from_md5 is not None: 

713 to_md5 = hash_file(to_path) 

714 if to_md5 != from_md5: 

715 raise StorageError( 

716 "S3 and FILE", 

717 f"Invalid MD5 sum control for copy S3 object {from_path} to file {to_path} : {from_md5} != {to_md5}", 

718 ) 

719 

720 except Exception as e: 

721 raise StorageError( 

722 "S3 and FILE", f"Cannot copy S3 object {from_path} to file {to_path} : {e}" 

723 ) 

724 

725 elif from_type == StorageType.FILE and to_type == StorageType.S3: 

726 s3_client, to_bucket = __get_s3_client(to_tray) 

727 

728 try: 

729 s3_client["client"].upload_file(from_path, to_bucket, to_base_name) 

730 

731 if from_md5 is not None: 

732 to_md5 = ( 

733 s3_client["client"] 

734 .head_object(Bucket=to_bucket, Key=to_base_name)["ETag"] 

735 .strip('"') 

736 ) 

737 if to_md5 != from_md5: 

738 raise StorageError( 

739 "FILE and S3", 

740 f"Invalid MD5 sum control for copy file {from_path} to S3 object {to_path} : {from_md5} != {to_md5}", 

741 ) 

742 except Exception as e: 

743 raise StorageError( 

744 "FILE and S3", f"Cannot copy file {from_path} to S3 object {to_path} : {e}" 

745 ) 

746 

747 elif from_type == StorageType.S3 and to_type == StorageType.S3: 

748 from_s3_client, from_bucket = __get_s3_client(from_tray) 

749 to_s3_client, to_bucket = __get_s3_client(to_tray) 

750 

751 try: 

752 if to_s3_client["host"] == from_s3_client["host"]: 

753 to_s3_client["client"].copy( 

754 {"Bucket": from_bucket, "Key": from_base_name}, to_bucket, to_base_name 

755 ) 

756 else: 

757 with tempfile.NamedTemporaryFile("w+b") as f: 

758 from_s3_client["client"].download_fileobj(from_bucket, from_base_name, f) 

759 to_s3_client["client"].upload_file(f.name, to_bucket, to_base_name) 

760 

761 if from_md5 is not None: 

762 to_md5 = ( 

763 to_s3_client["client"] 

764 .head_object(Bucket=to_bucket, Key=to_base_name)["ETag"] 

765 .strip('"') 

766 ) 

767 if to_md5 != from_md5: 

768 raise StorageError( 

769 "S3", 

770 f"Invalid MD5 sum control for copy S3 object {from_path} to {to_path} : {from_md5} != {to_md5}", 

771 ) 

772 

773 except Exception as e: 

774 raise StorageError("S3", f"Cannot copy S3 object {from_path} to {to_path} : {e}") 

775 

776 elif from_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE and to_type == StorageType.FILE: 

777 ioctx = __get_ceph_ioctx(from_tray) 

778 

779 if from_md5 is not None: 

780 checker = hashlib.md5() 

781 

782 try: 

783 if to_tray != "": 

784 os.makedirs(to_tray, exist_ok=True) 

785 f = open(to_path, "wb") 

786 

787 offset = 0 

788 size = 0 

789 

790 while True: 

791 chunk = ioctx.read(from_base_name, 65536, offset) 

792 size = len(chunk) 

793 offset += size 

794 f.write(chunk) 

795 

796 if from_md5 is not None: 

797 checker.update(chunk) 

798 

799 if size < 65536: 

800 break 

801 

802 f.close() 

803 

804 if from_md5 is not None and from_md5 != checker.hexdigest(): 

805 raise StorageError( 

806 "CEPH and FILE", 

807 f"Invalid MD5 sum control for copy CEPH object {from_path} to file {to_path} : {from_md5} != {checker.hexdigest()}", 

808 ) 

809 

810 except Exception as e: 

811 raise StorageError( 

812 "CEPH and FILE", f"Cannot copy CEPH object {from_path} to file {to_path} : {e}" 

813 ) 

814 

815 elif from_type == StorageType.FILE and to_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: 

816 ioctx = __get_ceph_ioctx(to_tray) 

817 

818 if from_md5 is not None: 

819 checker = hashlib.md5() 

820 

821 try: 

822 f = open(from_path, "rb") 

823 

824 offset = 0 

825 size = 0 

826 

827 while True: 

828 chunk = f.read(65536) 

829 size = len(chunk) 

830 ioctx.write(to_base_name, chunk, offset) 

831 offset += size 

832 

833 if from_md5 is not None: 

834 checker.update(chunk) 

835 

836 if size < 65536: 

837 break 

838 

839 f.close() 

840 

841 if from_md5 is not None and from_md5 != checker.hexdigest(): 

842 raise StorageError( 

843 "FILE and CEPH", 

844 f"Invalid MD5 sum control for copy file {from_path} to CEPH object {to_path} : {from_md5} != {checker.hexdigest()}", 

845 ) 

846 

847 except Exception as e: 

848 raise StorageError( 

849 "FILE and CEPH", f"Cannot copy file {from_path} to CEPH object {to_path} : {e}" 

850 ) 

851 

852 elif from_type == StorageType.CEPH and to_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: 

853 from_ioctx = __get_ceph_ioctx(from_tray) 

854 to_ioctx = __get_ceph_ioctx(to_tray) 

855 

856 if from_md5 is not None: 

857 checker = hashlib.md5() 

858 

859 try: 

860 offset = 0 

861 size = 0 

862 

863 while True: 

864 chunk = from_ioctx.read(from_base_name, 65536, offset) 

865 size = len(chunk) 

866 to_ioctx.write(to_base_name, chunk, offset) 

867 offset += size 

868 

869 if from_md5 is not None: 

870 checker.update(chunk) 

871 

872 if size < 65536: 

873 break 

874 

875 if from_md5 is not None and from_md5 != checker.hexdigest(): 

876 raise StorageError( 

877 "FILE and CEPH", 

878 f"Invalid MD5 sum control for copy CEPH object {from_path} to {to_path} : {from_md5} != {checker.hexdigest()}", 

879 ) 

880 

881 except Exception as e: 

882 raise StorageError("CEPH", f"Cannot copy CEPH object {from_path} to {to_path} : {e}") 

883 

884 elif from_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE and to_type == StorageType.S3: 

885 from_ioctx = __get_ceph_ioctx(from_tray) 

886 

887 s3_client, to_bucket = __get_s3_client(to_tray) 

888 

889 if from_md5 is not None: 

890 checker = hashlib.md5() 

891 

892 try: 

893 offset = 0 

894 size = 0 

895 

896 with tempfile.NamedTemporaryFile("w+b", delete=False) as f: 

897 name_tmp = f.name 

898 while True: 

899 chunk = from_ioctx.read(from_base_name, 65536, offset) 

900 size = len(chunk) 

901 offset += size 

902 f.write(chunk) 

903 

904 if from_md5 is not None: 

905 checker.update(chunk) 

906 

907 if size < 65536: 

908 break 

909 

910 s3_client["client"].upload_file(name_tmp, to_bucket, to_base_name) 

911 

912 os.remove(name_tmp) 

913 

914 if from_md5 is not None and from_md5 != checker.hexdigest(): 

915 raise StorageError( 

916 "CEPH and S3", 

917 f"Invalid MD5 sum control for copy CEPH object {from_path} to S3 object {to_path} : {from_md5} != {checker.hexdigest()}", 

918 ) 

919 

920 except Exception as e: 

921 raise StorageError( 

922 "CEPH and S3", f"Cannot copy CEPH object {from_path} to S3 object {to_path} : {e}" 

923 ) 

924 

925 elif ( 

926 from_type == StorageType.HTTP or from_type == StorageType.HTTPS 

927 ) and to_type == StorageType.FILE: 

928 try: 

929 response = requests.get(from_type.value + from_path, stream=True) 

930 with open(to_path, "wb") as f: 

931 for chunk in response.iter_content(chunk_size=65536): 

932 if chunk: 

933 f.write(chunk) 

934 

935 except Exception as e: 

936 raise StorageError( 

937 "HTTP(S) and FILE", 

938 f"Cannot copy HTTP(S) object {from_path} to FILE object {to_path} : {e}", 

939 ) 

940 

941 elif ( 

942 (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) 

943 and to_type == StorageType.CEPH 

944 and CEPH_RADOS_AVAILABLE 

945 ): 

946 to_ioctx = __get_ceph_ioctx(to_tray) 

947 

948 try: 

949 response = requests.get(from_type.value + from_path, stream=True) 

950 offset = 0 

951 for chunk in response.iter_content(chunk_size=65536): 

952 if chunk: 

953 size = len(chunk) 

954 to_ioctx.write(to_base_name, chunk, offset) 

955 offset += size 

956 

957 except Exception as e: 

958 raise StorageError( 

959 "HTTP(S) and CEPH", 

960 f"Cannot copy HTTP(S) object {from_path} to CEPH object {to_path} : {e}", 

961 ) 

962 

963 elif ( 

964 from_type == StorageType.HTTP or from_type == StorageType.HTTPS 

965 ) and to_type == StorageType.S3: 

966 to_s3_client, to_bucket = __get_s3_client(to_tray) 

967 

968 try: 

969 response = requests.get(from_type.value + from_path, stream=True) 

970 with tempfile.NamedTemporaryFile("w+b", delete=False) as f: 

971 name_fich = f.name 

972 for chunk in response.iter_content(chunk_size=65536): 

973 if chunk: 

974 f.write(chunk) 

975 

976 to_s3_client["client"].upload_file(name_fich, to_tray, to_base_name) 

977 

978 os.remove(name_fich) 

979 

980 except Exception as e: 

981 raise StorageError( 

982 "HTTP(S) and S3", 

983 f"Cannot copy HTTP(S) object {from_path} to S3 object {to_path} : {e}", 

984 ) 

985 

986 else: 

987 raise NotImplementedError( 

988 f"Cannot copy data from storage type {from_type.name} to storage type {to_type.name}" 

989 ) 
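
# Usage sketch (illustrative paths): copy a local file to S3 and let the function
# check the result against the MD5 sum of the source.
#
#     md5 = hash_file("/data/tiles/0/0/0.tif")
#     copy("file:///data/tiles/0/0/0.tif", "s3://my-bucket/tiles/0/0/0.tif", md5)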

990 

991 

992def link(target_path: str, link_path: str, hard: bool = False) -> None: 

993 """Create a symbolic link 

994 

995 Args: 

996 target_path (str): file/object to link 

997 link_path (str): link to create 

998 hard (bool, optional): hard link rather than symbolic. Only for FILE storage. Defaults to False. 

999 

1000 Raises: 

1001 StorageError: link issue 

1002 MissingEnvironmentError: Missing object storage information

1003 NotImplementedError: Storage type not handled 

1004 """ 

1005 

1006 target_type, target_path, target_tray, target_base_name = get_infos_from_path(target_path) 

1007 link_type, link_path, link_tray, link_base_name = get_infos_from_path(link_path) 

1008 

1009 if target_type != link_type: 

1010 raise StorageError( 

1011 f"{target_type.name} and {link_type.name}", 

1012 "Cannot make link between two different storage types", 

1013 ) 

1014 

1015 if hard and target_type != StorageType.FILE: 

1016 raise StorageError(target_type.name, "Hard link is available only for FILE storage") 

1017 

1018 # Create the link, according to the storage type

1019 if target_type == StorageType.S3: 

1020 target_s3_client, target_bucket = __get_s3_client(target_tray) 

1021 link_s3_client, link_bucket = __get_s3_client(link_tray) 

1022 

1023 if target_s3_client["host"] != link_s3_client["host"]: 

1024 raise StorageError( 

1025 "S3", 

1026 f"Cannot make link {link_path} -> {target_path} : link works only on the same S3 cluster", 

1027 ) 

1028 

1029 try: 

1030 target_s3_client["client"].put_object( 

1031 Body=f"{__OBJECT_SYMLINK_SIGNATURE}{target_bucket}/{target_base_name}".encode(), 

1032 Bucket=link_bucket, 

1033 Key=link_base_name, 

1034 ) 

1035 except Exception as e: 

1036 raise StorageError("S3", e) 

1037 

1038 elif target_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: 

1039 ioctx = __get_ceph_ioctx(link_tray) 

1040 

1041 try: 

1042 ioctx.write_full(link_base_name, f"{__OBJECT_SYMLINK_SIGNATURE}{target_path}".encode()) 

1043 except Exception as e: 

1044 raise StorageError("CEPH", e) 

1045 

1046 elif target_type == StorageType.FILE: 

1047 try: 

1048 to_tray = get_infos_from_path(link_path)[2] 

1049 if to_tray != "": 

1050 os.makedirs(to_tray, exist_ok=True) 

1051 

1052 if exists(link_path): 

1053 remove(link_path) 

1054 if hard: 

1055 os.link(target_path, link_path) 

1056 else: 

1057 os.symlink(target_path, link_path) 

1058 except Exception as e: 

1059 raise StorageError("FILE", e) 

1060 

1061 else: 

1062 raise NotImplementedError(f"Cannot make link for storage type {target_type.name}") 
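
# Usage sketch (illustrative paths): on object storages the "link" is an object whose
# content starts with the SYMLINK# signature; on FILE storage a real symbolic (or
# hard) link is created.
#
#     link("s3://my-bucket/tiles/0/0/0.tif", "s3://my-bucket/alias/0.tif")
#     link("file:///data/tiles/0/0/0.tif", "file:///data/alias/0.tif", hard=True)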

1063 

1064 

1065def get_osgeo_path(path: str) -> str: 

1066 """Return GDAL/OGR Open compliant path and configure storage access 

1067 

1068 For an S3 input path, the endpoint, access and secret keys are set and the path is built with the "/vsis3" root.

1069

1070 For a FILE input path, only the storage prefix is removed

1071 

1072 Args: 

1073 path (str): Source path 

1074 

1075 Raises: 

1076 NotImplementedError: Storage type not handled 

1077 

1078 Returns: 

1079 str: GDAL/OGR Open compliant path 

1080 """ 

1081 

1082 storage_type, unprefixed_path, tray_name, base_name = get_infos_from_path(path) 

1083 

1084 if storage_type == StorageType.S3 and GDAL_AVAILABLE: 

1085 s3_client, bucket_name = __get_s3_client(tray_name) 

1086 

1087 gdal.SetConfigOption("AWS_SECRET_ACCESS_KEY", s3_client["secret_key"]) 

1088 gdal.SetConfigOption("AWS_ACCESS_KEY_ID", s3_client["key"]) 

1089 gdal.SetConfigOption("AWS_S3_ENDPOINT", s3_client["host"]) 

1090 gdal.SetConfigOption("AWS_VIRTUAL_HOSTING", "FALSE") 

1091 if not s3_client["secure"]: 

1092 gdal.SetConfigOption("AWS_HTTPS", "NO") 

1093 

1094 return f"/vsis3/{bucket_name}/{base_name}" 

1095 

1096 elif storage_type == StorageType.FILE: 

1097 return unprefixed_path 

1098 

1099 else: 

1100 raise NotImplementedError(f"Cannot get a GDAL/OGR compliant path from {path}") 
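
# Usage sketch (illustrative path, requires GDAL to be installed): open an S3-hosted
# image directly through GDAL's /vsis3 virtual file system.
#
#     from osgeo import gdal
#     dataset = gdal.Open(get_osgeo_path("s3://my-bucket/images/ortho.tif"))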

1101 

1102 

1103def size_path(path: str) -> int: 

1104 """Return the size of the given path (or, for the CEPH, the sum of the size of each object of the .list) 

1105 

1106 Args: 

1107 path (str): Source path 

1108 

1109 Raises: 

1110 StorageError: Storage read issue

1111 MissingEnvironmentError: Missing object storage information

1112 NotImplementedError: Storage type not handled 

1113 

1114 Returns: 

1115 int: size of the path 

1116 """ 

1117 storage_type, unprefixed_path, tray_name, base_name = get_infos_from_path(path) 

1118 

1119 if storage_type == StorageType.FILE: 

1120 try: 

1121 total = 0 

1122 with os.scandir(unprefixed_path) as it: 

1123 for entry in it: 

1124 if entry.is_file(): 

1125 total += entry.stat().st_size 

1126 elif entry.is_dir(): 

1127 total += size_path(entry.path) 

1128 

1129 except Exception as e: 

1130 raise StorageError("FILE", e) 

1131 

1132 elif storage_type == StorageType.S3: 

1133 s3_client, bucket_name = __get_s3_client(tray_name) 

1134 

1135 try: 

1136 paginator = s3_client["client"].get_paginator("list_objects_v2") 

1137 pages = paginator.paginate( 

1138 Bucket=bucket_name, 

1139 Prefix=base_name + "/", 

1140 PaginationConfig={ 

1141 "PageSize": 10000, 

1142 }, 

1143 ) 

1144 total = 0 

1145 for page in pages: 

1146 for key in page["Contents"]: 

1147 total += key["Size"] 

1148 

1149 except Exception as e: 

1150 raise StorageError("S3", e) 

1151 

1152 else: 

1153 raise NotImplementedError( 

1154 f"Cannot get prefix path size for storage type {storage_type.name}" 

1155 ) 

1156 

1157 return total
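
# Usage sketch (illustrative paths): total size of a directory tree, or of every
# object under an S3 prefix.
#
#     size_path("file:///data/tiles")
#     size_path("s3://my-bucket/tiles")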