Coverage for src/rok4/Storage.py: 82%

498 statements  

coverage.py v7.4.1, created at 2024-01-29 10:29 +0100

1"""Provide functions to use read or write 

2 

3Available storage types are:

4- S3 (paths are prefixed with `s3://`)

5- CEPH (paths are prefixed with `ceph://`)

6- FILE (paths are prefixed with `file://`, but this is also the default interpretation for unprefixed paths)

7- HTTP (paths are prefixed with `http://`)

8- HTTPS (paths are prefixed with `https://`)

9 

10Depending on the function, not all storage types are available.

11 

12Using CEPH storage requires environment variables:

13- ROK4_CEPH_CONFFILE 

14- ROK4_CEPH_USERNAME 

15- ROK4_CEPH_CLUSTERNAME 

16 

17Using S3 storage requires environment variables:

18- ROK4_S3_KEY 

19- ROK4_S3_SECRETKEY 

20- ROK4_S3_URL 

21 

22To use several S3 clusters, each environment variable has to contain a comma-separated list, with the same number of elements in each one

23 

24Example: work with 2 S3 clusters: 

25 

26- ROK4_S3_KEY=KEY1,KEY2 

27- ROK4_S3_SECRETKEY=SKEY1,SKEY2 

28- ROK4_S3_URL=https://s3.storage.fr,https://s4.storage.fr 

29 

30To specify the cluster to use, the bucket name should be bucket_name@s3.storage.fr or bucket_name@s4.storage.fr. If no host is defined (no @) in the bucket name, the first S3 cluster is used
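
Example (illustrative, the bucket and object names are hypothetical): read a descriptor stored on the second cluster

    data = get_data_str("s3://my_bucket@s4.storage.fr/my_pyramid.json")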

31""" 

32 

33import boto3 

34import botocore.exceptions 

35import tempfile 

36import re 

37import os 

38import rados 

39import hashlib 

40import requests 

41from typing import Dict, List, Tuple, Union 

42from enum import Enum 

43from shutil import copyfile 

44from osgeo import gdal 

45 

46gdal.UseExceptions() 

47 

48from rok4.Exceptions import * 

49 

50 

51class StorageType(Enum): 

52 FILE = "file://" 

53 S3 = "s3://" 

54 CEPH = "ceph://" 

55 HTTP = "http://" 

56 HTTPS = "https://" 

57 

58 

59__S3_CLIENTS = {} 

60__S3_DEFAULT_CLIENT = None 

61 

62 

63def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", str]], str]:

64 """Get the S3 client 

65 

66 Create it if not already done 

67 

68 Args: 

69 bucket_name (str): S3 bucket name. Could be just the bucket name, or <bucket name>@<cluster host> 

70 

71 Raises: 

72 MissingEnvironmentError: Missing S3 storage information

73 StorageError: S3 client configuration issue 

74 

75 Returns: 

76 Tuple[Dict[str, Union['boto3.client',str]], str]: the S3 information (client, key, secret key, url, host) and the simple bucket name

77 """ 

78 

79 global __S3_CLIENTS, __S3_DEFAULT_CLIENT 

80 

81 if not __S3_CLIENTS: 

82 # First time S3 storage is used: load the connection information from the environment variables

83 try: 

84 keys = os.environ["ROK4_S3_KEY"].split(",") 

85 secret_keys = os.environ["ROK4_S3_SECRETKEY"].split(",") 

86 urls = os.environ["ROK4_S3_URL"].split(",") 

87 

88 if len(keys) != len(secret_keys) or len(keys) != len(urls): 

89 raise StorageError( 

90 "S3", 

91 "S3 information in environment variables is inconsistent: the same number of elements is required in each list",

92 ) 

93 

94 for i in range(len(keys)): 

95 h = re.sub("https?://", "", urls[i]) 

96 

97 if h in __S3_CLIENTS: 

98 raise StorageError("S3", "A S3 cluster is defined twice (based on URL)") 

99 

100 __S3_CLIENTS[h] = { 

101 "client": boto3.client( 

102 "s3", 

103 aws_access_key_id=keys[i], 

104 aws_secret_access_key=secret_keys[i], 

105 endpoint_url=urls[i], 

106 ), 

107 "key": keys[i], 

108 "secret_key": secret_keys[i], 

109 "url": urls[i], 

110 "host": h, 

111 } 

112 

113 if i == 0: 

114 # The first cluster is the default one

115 __S3_DEFAULT_CLIENT = h 

116 

117 except KeyError as e: 

118 raise MissingEnvironmentError(e) 

119 except Exception as e: 

120 raise StorageError("S3", e) 

121 

122 try: 

123 host = bucket_name.split("@")[1] 

124 except IndexError: 

125 host = __S3_DEFAULT_CLIENT 

126 

127 bucket_name = bucket_name.split("@")[0] 

128 

129 if host not in __S3_CLIENTS: 

130 raise StorageError("S3", f"Unknown S3 cluster, according to host '{host}'") 

131 

132 return __S3_CLIENTS[host], bucket_name 

133 

134 

135def disconnect_s3_clients() -> None: 

136 """Clean S3 clients""" 

137 

138 global __S3_CLIENTS, __S3_DEFAULT_CLIENT 

139 __S3_CLIENTS = {} 

140 __S3_DEFAULT_CLIENT = None 

141 

142 

143__CEPH_CLIENT = None 

144__CEPH_IOCTXS = {} 

145 

146 

147def __get_ceph_ioctx(pool: str) -> "rados.Ioctx": 

148 """Get the CEPH IO context 

149 

150 Create it (client and context) if not already done 

151 

152 Args: 

153 pool (str): CEPH pool's name 

154 

155 Raises: 

156 MissingEnvironmentError: Missing CEPH storage information

157 StorageError: CEPH IO context configuration issue 

158 

159 Returns: 

160 rados.Ioctx: IO ceph context 

161 """ 

162 global __CEPH_CLIENT, __CEPH_IOCTXS 

163 

164 if __CEPH_CLIENT is None: 

165 try: 

166 __CEPH_CLIENT = rados.Rados( 

167 conffile=os.environ["ROK4_CEPH_CONFFILE"], 

168 clustername=os.environ["ROK4_CEPH_CLUSTERNAME"], 

169 name=os.environ["ROK4_CEPH_USERNAME"], 

170 ) 

171 

172 __CEPH_CLIENT.connect() 

173 

174 except KeyError as e: 

175 raise MissingEnvironmentError(e) 

176 except Exception as e: 

177 raise StorageError("CEPH", e) 

178 

179 if pool not in __CEPH_IOCTXS: 

180 try: 

181 __CEPH_IOCTXS[pool] = __CEPH_CLIENT.open_ioctx(pool) 

182 except Exception as e: 

183 raise StorageError("CEPH", e) 

184 

185 return __CEPH_IOCTXS[pool] 

186 

187 

188def disconnect_ceph_clients() -> None: 

189 """Clean CEPH clients""" 

190 global __CEPH_CLIENT, __CEPH_IOCTXS 

191 __CEPH_CLIENT = None 

192 __CEPH_IOCTXS = {} 

193 

194 

195__OBJECT_SYMLINK_SIGNATURE = "SYMLINK#" 

196 

197 

198def get_infos_from_path(path: str) -> Tuple[StorageType, str, str, str]: 

199 """Extract the storage type, the unprefixed path, the container and the basename from the path (Default: FILE storage)

200 

201 For a FILE storage, the tray is the directory and the basename is the file name. 

202 

203 For an object storage (CEPH or S3), the tray is the bucket or the pool and the basename is the object name. 

204 For an S3 bucket, the format can be <bucket name>@<cluster name> to use several clusters. The cluster name is the host (without protocol)

205 

206 Args: 

207 path (str): path to analyse 

208 

209 Returns: 

210 Tuple[StorageType, str, str, str]: storage type, unprefixed path, the container and the basename 
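
Examples (illustrative, the paths are hypothetical):

    get_infos_from_path("s3://my_bucket/images/tile.tif")
    # -> (StorageType.S3, "my_bucket/images/tile.tif", "my_bucket", "images/tile.tif")

    get_infos_from_path("/data/tile.tif")
    # -> (StorageType.FILE, "/data/tile.tif", "/data", "tile.tif")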

211 """ 

212 

213 if path.startswith("s3://"): 

214 bucket_name, object_name = path[5:].split("/", 1) 

215 return StorageType.S3, path[5:], bucket_name, object_name 

216 elif path.startswith("ceph://"): 

217 pool_name, object_name = path[7:].split("/", 1) 

218 return StorageType.CEPH, path[7:], pool_name, object_name 

219 elif path.startswith("file://"): 

220 return StorageType.FILE, path[7:], os.path.dirname(path[7:]), os.path.basename(path[7:]) 

221 elif path.startswith("http://"): 

222 return StorageType.HTTP, path[7:], os.path.dirname(path[7:]), os.path.basename(path[7:]) 

223 elif path.startswith("https://"): 

224 return StorageType.HTTPS, path[8:], os.path.dirname(path[8:]), os.path.basename(path[8:]) 

225 else: 

226 return StorageType.FILE, path, os.path.dirname(path), os.path.basename(path) 

227 

228 

229def get_path_from_infos(storage_type: StorageType, *args) -> str: 

230 """Write full path from elements 

231 

232 Prefixed with the storage's type, elements are joined with a slash

233 

234 Args: 

235 storage_type (StorageType): Storage's type for path 

236 

237 Returns: 

238 str: Full path 
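
Example (illustrative, the bucket and object names are hypothetical):

    get_path_from_infos(StorageType.S3, "my_bucket", "images/tile.tif")
    # -> "s3://my_bucket/images/tile.tif"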

239 """ 

240 return f"{storage_type.value}{os.path.join(*args)}" 

241 

242 

243def hash_file(path: str) -> str: 

244 """Compute the MD5 sum of the provided file

245 

246 Args: 

247 path (str): path to file 

248 

249 Returns: 

250 str: hexadecimal MD5 sum
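
Example (illustrative, the file path is hypothetical):

    md5 = hash_file("/data/tile.tif")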

251 """ 

252 

253 checker = hashlib.md5() 

254 

255 with open(path, "rb") as file: 

256 chunk = 0 

257 while chunk != b"": 

258 chunk = file.read(65536) 

259 checker.update(chunk) 

260 

261 return checker.hexdigest() 

262 

263 

264def get_data_str(path: str) -> str: 

265 """Load full data into a string 

266 

267 Args: 

268 path (str): path to data 

269 

270 Raises: 

271 MissingEnvironmentError: Missing object storage information

272 StorageError: Storage read issue 

273 FileNotFoundError: File or object does not exist 

274 

275 Returns: 

276 str: Data content 
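
Example (illustrative, the object name is hypothetical):

    descriptor = get_data_str("s3://my_bucket/my_pyramid.json")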

277 """ 

278 

279 return get_data_binary(path).decode("utf-8") 

280 

281 

282def get_data_binary(path: str, range: Tuple[int, int] = None) -> bytes:

283 """Load data into a binary string 

284 

285 Args: 

286 path (str): path to data 

287 range (Tuple[int, int], optional): offset and size, to make a partial read. Defaults to None. 

288 

289 Raises: 

290 MissingEnvironmentError: Missing object storage information

291 StorageError: Storage read issue 

292 FileNotFoundError: File or object does not exist 

293 

294 Returns: 

295 bytes: Data binary content
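
Example (illustrative, the object name is hypothetical):

    # Partial read: 2048 bytes starting at byte 512
    header = get_data_binary("ceph://my_pool/my_object", (512, 2048))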

296 """ 

297 storage_type, path, tray_name, base_name = get_infos_from_path(path) 

298 

299 if storage_type == StorageType.S3: 

300 s3_client, bucket_name = __get_s3_client(tray_name) 

301 

302 try: 

303 if range is None: 

304 data = ( 

305 s3_client["client"] 

306 .get_object( 

307 Bucket=bucket_name, 

308 Key=base_name, 

309 )["Body"] 

310 .read() 

311 ) 

312 else: 

313 data = ( 

314 s3_client["client"] 

315 .get_object( 

316 Bucket=bucket_name, 

317 Key=base_name, 

318 Range=f"bytes={range[0]}-{range[0] + range[1] - 1}", 

319 )["Body"] 

320 .read() 

321 ) 

322 

323 except botocore.exceptions.ClientError as e: 

324 if e.response["Error"]["Code"] == "404": 

325 raise FileNotFoundError(f"{storage_type.value}{path}") 

326 else: 

327 raise StorageError("S3", e) 

328 

329 except Exception as e: 

330 raise StorageError("S3", e) 

331 

332 elif storage_type == StorageType.CEPH: 

333 ioctx = __get_ceph_ioctx(tray_name) 

334 

335 try: 

336 if range is None: 

337 size, mtime = ioctx.stat(base_name) 

338 data = ioctx.read(base_name, size) 

339 else: 

340 data = ioctx.read(base_name, range[1], range[0]) 

341 

342 except rados.ObjectNotFound as e: 

343 raise FileNotFoundError(f"{storage_type.value}{path}") 

344 

345 except Exception as e: 

346 raise StorageError("CEPH", e) 

347 

348 elif storage_type == StorageType.FILE: 

349 try: 

350 f = open(path, "rb") 

351 if range is None: 

352 data = f.read() 

353 else: 

354 f.seek(range[0]) 

355 data = f.read(range[1]) 

356 

357 f.close() 

358 

359 except FileNotFoundError as e: 

360 raise FileNotFoundError(f"{storage_type.value}{path}") 

361 

362 except Exception as e: 

363 raise StorageError("FILE", e) 

364 

365 elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS: 

366 

367 if range is None:

368 try:

369 reponse = requests.get(f"{storage_type.value}{path}", stream=True)

370 data = reponse.content

371 except Exception as e:

372 raise StorageError(storage_type.name, e)

373 if reponse.status_code == 404:

374 raise FileNotFoundError(f"{storage_type.value}{path}")

375 else:

376 raise NotImplementedError

377 

378 else: 

379 raise StorageError("UNKNOWN", "Unhandled storage type to read binary data") 

380 

381 return data 

382 

383 

384def put_data_str(data: str, path: str) -> None: 

385 """Store string data into a file or an object 

386 

387 UTF-8 encoding is used for bytes conversion 

388 

389 Args: 

390 data (str): data to write 

391 path (str): destination path, where to write data 

392 

393 Raises: 

394 MissingEnvironmentError: Missing object storage information

395 StorageError: Storage write issue 
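
Example (illustrative, the destination is hypothetical):

    put_data_str('{"type": "pyramid"}', "ceph://my_pool/my_pyramid.json")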

396 """ 

397 

398 storage_type, path, tray_name, base_name = get_infos_from_path(path) 

399 

400 if storage_type == StorageType.S3: 

401 s3_client, bucket_name = __get_s3_client(tray_name) 

402 

403 try: 

404 s3_client["client"].put_object( 

405 Body=data.encode("utf-8"), Bucket=bucket_name, Key=base_name 

406 ) 

407 except Exception as e: 

408 raise StorageError("S3", e) 

409 

410 elif storage_type == StorageType.CEPH: 

411 ioctx = __get_ceph_ioctx(tray_name) 

412 

413 try: 

414 ioctx.write_full(base_name, data.encode("utf-8")) 

415 except Exception as e: 

416 raise StorageError("CEPH", e) 

417 

418 elif storage_type == StorageType.FILE: 

419 try: 

420 f = open(path, "w") 

421 f.write(data) 

422 f.close() 

423 except Exception as e: 

424 raise StorageError("FILE", e) 

425 

426 else: 

427 raise StorageError("UNKNOWN", "Unhandled storage type to write string data") 

428 

429 

430def get_size(path: str) -> int: 

431 """Get size of file or object 

432 

433 Args: 

434 path (str): path of the file/object whose size is requested

435 

436 Raises: 

437 MissingEnvironmentError: Missing object storage information

438 StorageError: Storage read issue 

439 

440 Returns: 

441 int: file/object size, in bytes 
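
Example (illustrative, the path is hypothetical):

    size = get_size("file:///data/tile.tif")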

442 """ 

443 

444 storage_type, path, tray_name, base_name = get_infos_from_path(path) 

445 

446 if storage_type == StorageType.S3: 

447 s3_client, bucket_name = __get_s3_client(tray_name) 

448 

449 try: 

450 size = s3_client["client"].head_object(Bucket=bucket_name, Key=base_name)[ 

451 "ContentLength" 

452 ] 

453 return int(size) 

454 except Exception as e: 

455 raise StorageError("S3", e) 

456 

457 elif storage_type == StorageType.CEPH: 

458 ioctx = __get_ceph_ioctx(tray_name) 

459 

460 try: 

461 size, mtime = ioctx.stat(base_name) 

462 return size 

463 except Exception as e: 

464 raise StorageError("CEPH", e) 

465 

466 elif storage_type == StorageType.FILE: 

467 try: 

468 file_stats = os.stat(path) 

469 return file_stats.st_size 

470 except Exception as e: 

471 raise StorageError("FILE", e) 

472 

473 elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS: 

474 

475 try: 

476 # With stream=True, only the headers are downloaded at this point, not the body

477 reponse = requests.get(storage_type.value + path, stream=True).headers["content-length"]

478 return int(reponse)

479 except Exception as e: 

480 raise StorageError(storage_type.name, e) 

481 

482 else: 

483 raise StorageError("UNKNOWN", "Unhandled storage type to get size") 

484 

485 

486def exists(path: str) -> bool: 

487 """Does the file or object exist?

488 

489 Args: 

490 path (str): path of file/object to test 

491 

492 Raises: 

493 MissingEnvironmentError: Missing object storage information

494 StorageError: Storage read issue 

495 

496 Returns: 

497 bool: file/object existing status 
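
Example (illustrative, the object name is hypothetical):

    present = exists("s3://my_bucket/my_pyramid.json")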

498 """ 

499 

500 storage_type, path, tray_name, base_name = get_infos_from_path(path) 

501 

502 if storage_type == StorageType.S3: 

503 s3_client, bucket_name = __get_s3_client(tray_name) 

504 

505 try: 

506 s3_client["client"].head_object(Bucket=bucket_name, Key=base_name) 

507 return True 

508 except botocore.exceptions.ClientError as e: 

509 if e.response["Error"]["Code"] == "404": 

510 return False 

511 else: 

512 raise StorageError("S3", e) 

513 

514 elif storage_type == StorageType.CEPH: 

515 ioctx = __get_ceph_ioctx(tray_name) 

516 

517 try: 

518 ioctx.stat(base_name) 

519 return True 

520 except rados.ObjectNotFound as e: 

521 return False 

522 except Exception as e: 

523 raise StorageError("CEPH", e) 

524 

525 elif storage_type == StorageType.FILE: 

526 return os.path.exists(path) 

527 

528 elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS: 

529 

530 try: 

531 response = requests.get(storage_type.value + path, stream=True) 

532 if response.status_code == 200 : 

533 return True 

534 else : 

535 return False 

536 except Exception as e: 

537 raise StorageError(storage_type.name, e) 

538 

539 else: 

540 raise StorageError("UNKNOWN", "Unhandled storage type to test if exists") 

541 

542 

543def remove(path: str) -> None: 

544 """Remove the file/object 

545 

546 Args: 

547 path (str): path of file/object to remove 

548 

549 Raises: 

550 MissingEnvironmentError: Missing object storage information

551 StorageError: Storage removal issue 

552 """ 

553 storage_type, path, tray_name, base_name = get_infos_from_path(path) 

554 

555 if storage_type == StorageType.S3: 

556 s3_client, bucket_name = __get_s3_client(tray_name) 

557 

558 try: 

559 s3_client["client"].delete_object(Bucket=bucket_name, Key=base_name) 

560 except Exception as e: 

561 raise StorageError("S3", e) 

562 

563 elif storage_type == StorageType.CEPH: 

564 ioctx = __get_ceph_ioctx(tray_name) 

565 

566 try: 

567 ioctx.remove_object(base_name) 

568 except rados.ObjectNotFound as e: 

569 pass 

570 except Exception as e: 

571 raise StorageError("CEPH", e) 

572 

573 elif storage_type == StorageType.FILE: 

574 try: 

575 os.remove(path) 

576 except FileNotFoundError as e: 

577 pass 

578 except Exception as e: 

579 raise StorageError("FILE", e) 

580 

581 else: 

582 raise StorageError("UNKNOWN", "Unhandled storage type to remove things") 

583 

584 

585def copy(from_path: str, to_path: str, from_md5: str = None) -> None: 

586 """Copy a file or object to a file or object destination. If an MD5 sum is provided, it is compared to the sum computed after the copy.

587 

588 Args: 

589 from_path (str): source file/object path, to copy 

590 to_path (str): destination file/object path 

591 from_md5 (str, optional): expected MD5 sum, checked after the copy. Defaults to None.

592 

593 Raises: 

594 StorageError: Unhandled storage combination or copy issue

595 MissingEnvironmentError: Missing object storage information
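
Examples (illustrative, the paths are hypothetical):

    # Upload a local file to an S3 object
    copy("file:///data/tile.tif", "s3://my_bucket/images/tile.tif")

    # Local copy, checked against the MD5 sum of the source
    copy("file:///data/tile.tif", "file:///backup/tile.tif", from_md5=hash_file("/data/tile.tif"))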

596 """ 

597 

598 from_type, from_path, from_tray, from_base_name = get_infos_from_path(from_path) 

599 to_type, to_path, to_tray, to_base_name = get_infos_from_path(to_path) 

600 

601 # Perform the copy, according to the storage types

602 if from_type == StorageType.FILE and to_type == StorageType.FILE: 

603 try: 

604 if to_tray != "": 

605 os.makedirs(to_tray, exist_ok=True) 

606 

607 copyfile(from_path, to_path) 

608 

609 if from_md5 is not None: 

610 to_md5 = hash_file(to_path) 

611 if to_md5 != from_md5: 

612 raise StorageError( 

613 f"FILE", 

614 f"Invalid MD5 sum control for copy file {from_path} to {to_path} : {from_md5} != {to_md5}", 

615 ) 

616 

617 except Exception as e: 

618 raise StorageError(f"FILE", f"Cannot copy file {from_path} to {to_path} : {e}") 

619 

620 elif from_type == StorageType.S3 and to_type == StorageType.FILE: 

621 s3_client, from_bucket = __get_s3_client(from_tray) 

622 

623 try: 

624 if to_tray != "": 

625 os.makedirs(to_tray, exist_ok=True) 

626 

627 s3_client["client"].download_file(from_bucket, from_base_name, to_path) 

628 

629 if from_md5 is not None: 

630 to_md5 = hash_file(to_path) 

631 if to_md5 != from_md5: 

632 raise StorageError( 

633 "S3 and FILE", 

634 f"Invalid MD5 sum control for copy S3 object {from_path} to file {to_path} : {from_md5} != {to_md5}", 

635 ) 

636 

637 except Exception as e: 

638 raise StorageError( 

639 f"S3 and FILE", f"Cannot copy S3 object {from_path} to file {to_path} : {e}" 

640 ) 

641 

642 elif from_type == StorageType.FILE and to_type == StorageType.S3: 

643 s3_client, to_bucket = __get_s3_client(to_tray) 

644 

645 try: 

646 s3_client["client"].upload_file(from_path, to_bucket, to_base_name) 

647 

648 if from_md5 is not None: 

649 to_md5 = ( 

650 s3_client["client"] 

651 .head_object(Bucket=to_bucket, Key=to_base_name)["ETag"] 

652 .strip('"') 

653 ) 

654 if to_md5 != from_md5: 

655 raise StorageError( 

656 f"FILE and S3", 

657 f"Invalid MD5 sum control for copy file {from_path} to S3 object {to_path} : {from_md5} != {to_md5}", 

658 ) 

659 except Exception as e: 

660 raise StorageError( 

661 f"FILE and S3", f"Cannot copy file {from_path} to S3 object {to_path} : {e}" 

662 ) 

663 

664 elif from_type == StorageType.S3 and to_type == StorageType.S3: 

665 from_s3_client, from_bucket = __get_s3_client(from_tray) 

666 to_s3_client, to_bucket = __get_s3_client(to_tray) 

667 

668 try: 

669 if to_s3_client["host"] == from_s3_client["host"]: 

670 to_s3_client["client"].copy( 

671 {"Bucket": from_bucket, "Key": from_base_name}, to_bucket, to_base_name 

672 ) 

673 else: 

674 with tempfile.NamedTemporaryFile("w+b") as f: 

675 from_s3_client["client"].download_fileobj(from_bucket, from_base_name, f) 

676 to_s3_client["client"].upload_file(f.name, to_bucket, to_base_name) 

677 

678 if from_md5 is not None: 

679 to_md5 = ( 

680 to_s3_client["client"] 

681 .head_object(Bucket=to_bucket, Key=to_base_name)["ETag"] 

682 .strip('"') 

683 ) 

684 if to_md5 != from_md5: 

685 raise StorageError( 

686 f"S3", 

687 f"Invalid MD5 sum control for copy S3 object {from_path} to {to_path} : {from_md5} != {to_md5}", 

688 ) 

689 

690 except Exception as e: 

691 raise StorageError(f"S3", f"Cannot copy S3 object {from_path} to {to_path} : {e}") 

692 

693 elif from_type == StorageType.CEPH and to_type == StorageType.FILE: 

694 ioctx = __get_ceph_ioctx(from_tray) 

695 

696 if from_md5 is not None: 

697 checker = hashlib.md5() 

698 

699 try: 

700 if to_tray != "": 

701 os.makedirs(to_tray, exist_ok=True) 

702 f = open(to_path, "wb") 

703 

704 offset = 0 

705 size = 0 

706 

707 while True: 

708 chunk = ioctx.read(from_base_name, 65536, offset) 

709 size = len(chunk) 

710 offset += size 

711 f.write(chunk) 

712 

713 if from_md5 is not None: 

714 checker.update(chunk) 

715 

716 if size < 65536: 

717 break 

718 

719 f.close() 

720 

721 if from_md5 is not None and from_md5 != checker.hexdigest(): 

722 raise StorageError( 

723 f"CEPH and FILE", 

724 f"Invalid MD5 sum control for copy CEPH object {from_path} to file {to_path} : {from_md5} != {checker.hexdigest()}", 

725 ) 

726 

727 except Exception as e: 

728 raise StorageError( 

729 f"CEPH and FILE", f"Cannot copy CEPH object {from_path} to file {to_path} : {e}" 

730 ) 

731 

732 elif from_type == StorageType.FILE and to_type == StorageType.CEPH: 

733 ioctx = __get_ceph_ioctx(to_tray) 

734 

735 if from_md5 is not None: 

736 checker = hashlib.md5() 

737 

738 try: 

739 f = open(from_path, "rb") 

740 

741 offset = 0 

742 size = 0 

743 

744 while True: 

745 chunk = f.read(65536) 

746 size = len(chunk) 

747 ioctx.write(to_base_name, chunk, offset) 

748 offset += size 

749 

750 if from_md5 is not None: 

751 checker.update(chunk) 

752 

753 if size < 65536: 

754 break 

755 

756 f.close() 

757 

758 if from_md5 is not None and from_md5 != checker.hexdigest(): 

759 raise StorageError( 

760 f"FILE and CEPH", 

761 f"Invalid MD5 sum control for copy file {from_path} to CEPH object {to_path} : {from_md5} != {checker.hexdigest()}", 

762 ) 

763 

764 except Exception as e: 

765 raise StorageError( 

766 f"FILE and CEPH", f"Cannot copy file {from_path} to CEPH object {to_path} : {e}" 

767 ) 

768 

769 elif from_type == StorageType.CEPH and to_type == StorageType.CEPH: 

770 from_ioctx = __get_ceph_ioctx(from_tray) 

771 to_ioctx = __get_ceph_ioctx(to_tray) 

772 

773 if from_md5 is not None: 

774 checker = hashlib.md5() 

775 

776 try: 

777 offset = 0 

778 size = 0 

779 

780 while True: 

781 chunk = from_ioctx.read(from_base_name, 65536, offset) 

782 size = len(chunk) 

783 to_ioctx.write(to_base_name, chunk, offset) 

784 offset += size 

785 

786 if from_md5 is not None: 

787 checker.update(chunk) 

788 

789 if size < 65536: 

790 break 

791 

792 if from_md5 is not None and from_md5 != checker.hexdigest(): 

793 raise StorageError( 

794 "CEPH",

795 f"Invalid MD5 sum control for copy CEPH object {from_path} to {to_path} : {from_md5} != {checker.hexdigest()}", 

796 ) 

797 

798 except Exception as e: 

799 raise StorageError(f"CEPH", f"Cannot copy CEPH object {from_path} to {to_path} : {e}") 

800 

801 elif from_type == StorageType.CEPH and to_type == StorageType.S3: 

802 from_ioctx = __get_ceph_ioctx(from_tray) 

803 

804 s3_client, to_bucket = __get_s3_client(to_tray) 

805 

806 if from_md5 is not None: 

807 checker = hashlib.md5() 

808 

809 try: 

810 offset = 0 

811 size = 0 

812 

813 with tempfile.NamedTemporaryFile("w+b", delete=False) as f: 

814 name_tmp = f.name 

815 while True: 

816 chunk = from_ioctx.read(from_base_name, 65536, offset) 

817 size = len(chunk) 

818 offset += size 

819 f.write(chunk) 

820 

821 if from_md5 is not None: 

822 checker.update(chunk) 

823 

824 if size < 65536: 

825 break 

826 

827 s3_client["client"].upload_file(name_tmp, to_bucket, to_base_name) 

828 

829 os.remove(name_tmp) 

830 

831 if from_md5 is not None and from_md5 != checker.hexdigest(): 

832 raise StorageError( 

833 f"CEPH and S3", 

834 f"Invalid MD5 sum control for copy CEPH object {from_path} to S3 object {to_path} : {from_md5} != {checker.hexdigest()}", 

835 ) 

836 

837 except Exception as e: 

838 raise StorageError( 

839 f"CEPH and S3", f"Cannot copy CEPH object {from_path} to S3 object {to_path} : {e}" 

840 ) 

841 

842 elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.FILE : 

843 

844 try: 

845 response = requests.get(from_type.value + from_path, stream = True) 

846 with open(to_path, "wb") as f: 

847 for chunk in response.iter_content(chunk_size=65536) : 

848 if chunk: 

849 f.write(chunk) 

850 

851 except Exception as e: 

852 raise StorageError(f"HTTP(S) and FILE", f"Cannot copy HTTP(S) object {from_path} to FILE object {to_path} : {e}") 

853 

854 elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.CEPH : 

855 

856 to_ioctx = __get_ceph_ioctx(to_tray) 

857 

858 try: 

859 response = requests.get(from_type.value + from_path, stream = True) 

860 offset = 0 

861 for chunk in response.iter_content(chunk_size=65536) : 

862 if chunk: 

863 size = len(chunk) 

864 to_ioctx.write(to_base_name, chunk, offset) 

865 offset += size 

866 

867 except Exception as e: 

868 raise StorageError(f"HTTP(S) and CEPH", f"Cannot copy HTTP(S) object {from_path} to CEPH object {to_path} : {e}") 

869 

870 elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.S3 : 

871 

872 to_s3_client, to_bucket = __get_s3_client(to_tray) 

873 

874 try: 

875 response = requests.get(from_type.value + from_path, stream = True) 

876 with tempfile.NamedTemporaryFile("w+b",delete=False) as f: 

877 name_fich = f.name 

878 for chunk in response.iter_content(chunk_size=65536) : 

879 if chunk: 

880 f.write(chunk) 

881 

882 to_s3_client["client"].upload_file(name_fich, to_bucket, to_base_name)

883 

884 os.remove(name_fich) 

885 

886 except Exception as e: 

887 raise StorageError(f"HTTP(S) and S3", f"Cannot copy HTTP(S) object {from_path} to S3 object {to_path} : {e}") 

888 

889 else: 

890 raise StorageError( 

891 f"{from_type.name} and {to_type.name}", 

892 f"Cannot copy from {from_type.name} to {to_type.name}", 

893 ) 

894 

895 

896def link(target_path: str, link_path: str, hard: bool = False) -> None: 

897 """Create a symbolic link 

898 

899 Args: 

900 target_path (str): file/object to link 

901 link_path (str): link to create 

902 hard (bool, optional): hard link rather than symbolic. Only for FILE storage. Defaults to False. 

903 

904 Raises: 

905 StorageError: Unhandled storage type or link issue

906 MissingEnvironmentError: Missing object storage information
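
Examples (illustrative, the paths are hypothetical):

    # Object "symbolic link" on the same S3 cluster
    link("s3://my_bucket/data/tile.tif", "s3://my_bucket/alias/tile.tif")

    # Hard link, only available for FILE storage
    link("file:///data/tile.tif", "file:///data/link_to_tile.tif", hard=True)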

907 """ 

908 

909 target_type, target_path, target_tray, target_base_name = get_infos_from_path(target_path) 

910 link_type, link_path, link_tray, link_base_name = get_infos_from_path(link_path) 

911 

912 if target_type != link_type: 

913 raise StorageError( 

914 f"{target_type.name} and {link_type.name}", 

915 f"Cannot make link between two different storage types", 

916 ) 

917 

918 if hard and target_type != StorageType.FILE: 

919 raise StorageError(target_type.name, "Hard link is available only for FILE storage") 

920 

921 # Create the link, according to the storage type

922 if target_type == StorageType.S3: 

923 target_s3_client, target_bucket = __get_s3_client(target_tray) 

924 link_s3_client, link_bucket = __get_s3_client(link_tray) 

925 

926 if target_s3_client["host"] != link_s3_client["host"]: 

927 raise StorageError( 

928 f"S3", 

929 f"Cannot make link {link_path} -> {target_path} : link works only on the same S3 cluster", 

930 ) 

931 

932 try: 

933 target_s3_client["client"].put_object( 

934 Body=f"{__OBJECT_SYMLINK_SIGNATURE}{target_bucket}/{target_base_name}".encode( 

935 "utf-8" 

936 ), 

937 Bucket=link_bucket, 

938 Key=link_base_name, 

939 ) 

940 except Exception as e: 

941 raise StorageError("S3", e) 

942 

943 elif target_type == StorageType.CEPH: 

944 ioctx = __get_ceph_ioctx(link_tray) 

945 

946 try: 

947 ioctx.write_full( 

948 link_base_name, f"{__OBJECT_SYMLINK_SIGNATURE}{target_path}".encode("utf-8") 

949 ) 

950 except Exception as e: 

951 raise StorageError("CEPH", e) 

952 

953 elif target_type == StorageType.FILE: 

954 try: 

955 if hard: 

956 os.link(target_path, link_path) 

957 else: 

958 os.symlink(target_path, link_path) 

959 except Exception as e: 

960 raise StorageError("FILE", e) 

961 

962 else: 

963 raise StorageError("UNKNOWN", "Unhandled storage type to make link") 

964 

965 

966def get_osgeo_path(path: str) -> str: 

967 """Return GDAL/OGR Open compliant path and configure storage access 

968 

969 For an S3 input path, the endpoint, the access key and the secret key are set, and the path is built with the "/vsis3" root.

970

971 For a FILE input path, only the storage prefix is removed

972 

973 Args: 

974 path (str): Source path 

975 

976 Raises: 

977 NotImplementedError: Storage type not handled 

978 

979 Returns: 

980 str: GDAL/OGR Open compliant path 
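
Example (illustrative, the object name is hypothetical):

    from osgeo import gdal
    dataset = gdal.Open(get_osgeo_path("s3://my_bucket/images/ortho.tif"))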

981 """ 

982 

983 storage_type, unprefixed_path, tray_name, base_name = get_infos_from_path(path) 

984 

985 if storage_type == StorageType.S3: 

986 s3_client, bucket_name = __get_s3_client(tray_name) 

987 

988 gdal.SetConfigOption("AWS_SECRET_ACCESS_KEY", s3_client["secret_key"]) 

989 gdal.SetConfigOption("AWS_ACCESS_KEY_ID", s3_client["key"]) 

990 gdal.SetConfigOption("AWS_S3_ENDPOINT", s3_client["host"]) 

991 gdal.SetConfigOption("AWS_VIRTUAL_HOSTING", "FALSE") 

992 

993 return f"/vsis3/{bucket_name}/{base_name}" 

994 

995 elif storage_type == StorageType.FILE: 

996 return unprefixed_path 

997 

998 else: 

999 raise NotImplementedError(f"Cannot get a GDAL/OGR compliant path from {path}") 

1000 

1001def size_path(path: str) -> int : 

1002 """Return the size of the given path (or, for object storage, the sum of the sizes of every object under the prefix)

1003 

1004 Args: 

1005 path (str): Source path 

1006 

1007 Raises: 

1008 StorageError: Unhandled storage type or read issue

1009 MissingEnvironmentError: Missing object storage information

1010 

1011 Returns: 

1012 int: size of the path 
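
Example (illustrative, the prefix is hypothetical):

    # Sum of the sizes of every object stored under the "my_pyramid/" prefix
    total = size_path("s3://my_bucket/my_pyramid")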

1013 """ 

1014 storage_type, unprefixed_path, tray_name, base_name = get_infos_from_path(path) 

1015 

1016 if storage_type == StorageType.FILE: 

1017 try : 

1018 total = 0 

1019 with os.scandir(unprefixed_path) as it: 

1020 for entry in it: 

1021 if entry.is_file(): 

1022 total += entry.stat().st_size 

1023 elif entry.is_dir(): 

1024 total += size_path(entry.path) 

1025 

1026 except Exception as e: 

1027 raise StorageError("FILE", e) 

1028 

1029 elif storage_type == StorageType.S3: 

1030 s3_client, bucket_name = __get_s3_client(tray_name) 

1031 

1032 try : 

1033 paginator = s3_client["client"].get_paginator('list_objects_v2') 

1034 pages = paginator.paginate( 

1035 Bucket=bucket_name, 

1036 Prefix=base_name+"/", 

1037 PaginationConfig={ 

1038 'PageSize': 10000, 

1039 } 

1040 ) 

1041 total = 0 

1042 for page in pages: 

1043 for key in page['Contents']: 

1044 total += key['Size'] 

1045 

1046 except Exception as e: 

1047 raise StorageError("S3", e) 

1048 

1049 

1050 elif storage_type == StorageType.CEPH: 

1051 raise NotImplementedError 

1052 else: 

1053 raise StorageError("UNKNOWN", "Unhandled storage type to calculate size") 

1054 

1055 return total