Coverage for src/rok4/Storage.py: 82%
498 statements
1"""Provide functions to use read or write
3Available storage types are :
4- S3 (path are preffixed with `s3://`)
5- CEPH (path are prefixed with `ceph://`)
6- FILE (path are prefixed with `file://`, but it is the default paths' interpretation)
7- HTTP (path are prefixed with `http://`)
8- HTTPS (path are prefixed with `https://`)
10According to functions, all storage types are not necessarily available.
12Using CEPH storage requires environment variables :
13- ROK4_CEPH_CONFFILE
14- ROK4_CEPH_USERNAME
15- ROK4_CEPH_CLUSTERNAME
17Using S3 storage requires environment variables :
18- ROK4_S3_KEY
19- ROK4_S3_SECRETKEY
20- ROK4_S3_URL
22To use several S3 clusters, each environment variable have to contain a list (comma-separated), with the same number of elements
24Example: work with 2 S3 clusters:
26- ROK4_S3_KEY=KEY1,KEY2
27- ROK4_S3_SECRETKEY=SKEY1,SKEY2
28- ROK4_S3_URL=https://s3.storage.fr,https://s4.storage.fr
30To precise the cluster to use, bucket name should be bucket_name@s3.storage.fr or bucket_name@s4.storage.fr. If no host is defined (no @) in the bucket name, first S3 cluster is used
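
Example (illustrative; bucket names, hosts and paths below are hypothetical):

    from rok4.Storage import exists, get_data_str, put_data_str, copy

    # Read a text object from the first configured S3 cluster
    if exists("s3://my_bucket/descriptor.json"):
        content = get_data_str("s3://my_bucket/descriptor.json")

    # Write a string on the second cluster, addressed through <bucket>@<host>
    put_data_str("hello", "s3://my_bucket@s4.storage.fr/hello.txt")

    # Copy an object from S3 to the local file system
    copy("s3://my_bucket/tile.tif", "file:///tmp/tile.tif")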
31"""

import boto3
import botocore.exceptions
import tempfile
import re
import os
import rados
import hashlib
import requests
from typing import Dict, List, Tuple, Union
from enum import Enum
from shutil import copyfile
from osgeo import gdal

gdal.UseExceptions()

from rok4.Exceptions import *


class StorageType(Enum):
    FILE = "file://"
    S3 = "s3://"
    CEPH = "ceph://"
    HTTP = "http://"
    HTTPS = "https://"


__S3_CLIENTS = {}
__S3_DEFAULT_CLIENT = None
def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", str]], str]:
    """Get the S3 client

    Create it if not already done

    Args:
        bucket_name (str): S3 bucket name. Could be just the bucket name, or <bucket name>@<cluster host>

    Raises:
        MissingEnvironmentError: Missing S3 storage information
        StorageError: S3 client configuration issue

    Returns:
        Tuple[Dict[str, Union['boto3.client', str]], str]: the S3 information (client, key, secret key, url and host) and the simple bucket name
    """
79 global __S3_CLIENTS, __S3_DEFAULT_CLIENT
    if not __S3_CLIENTS:
        # First use of the S3 storage: load the configuration from the environment variables
        try:
            keys = os.environ["ROK4_S3_KEY"].split(",")
            secret_keys = os.environ["ROK4_S3_SECRETKEY"].split(",")
            urls = os.environ["ROK4_S3_URL"].split(",")

            if len(keys) != len(secret_keys) or len(keys) != len(urls):
                raise StorageError(
                    "S3",
                    "S3 information in environment variables is inconsistent: the same number of elements is required in each list",
                )
94 for i in range(len(keys)):
95 h = re.sub("https?://", "", urls[i])
97 if h in __S3_CLIENTS:
98 raise StorageError("S3", "A S3 cluster is defined twice (based on URL)")
100 __S3_CLIENTS[h] = {
101 "client": boto3.client(
102 "s3",
103 aws_access_key_id=keys[i],
104 aws_secret_access_key=secret_keys[i],
105 endpoint_url=urls[i],
106 ),
107 "key": keys[i],
108 "secret_key": secret_keys[i],
109 "url": urls[i],
110 "host": h,
111 }
                if i == 0:
                    # The first cluster is the default one
                    __S3_DEFAULT_CLIENT = h
117 except KeyError as e:
118 raise MissingEnvironmentError(e)
119 except Exception as e:
120 raise StorageError("S3", e)
122 try:
123 host = bucket_name.split("@")[1]
124 except IndexError:
125 host = __S3_DEFAULT_CLIENT
127 bucket_name = bucket_name.split("@")[0]
129 if host not in __S3_CLIENTS:
130 raise StorageError("S3", f"Unknown S3 cluster, according to host '{host}'")
132 return __S3_CLIENTS[host], bucket_name
135def disconnect_s3_clients() -> None:
136 """Clean S3 clients"""
138 global __S3_CLIENTS, __S3_DEFAULT_CLIENT
139 __S3_CLIENTS = {}
140 __S3_DEFAULT_CLIENT = None
143__CEPH_CLIENT = None
144__CEPH_IOCTXS = {}
def __get_ceph_ioctx(pool: str) -> "rados.Ioctx":
    """Get the CEPH IO context

    Create it (client and context) if not already done

    Args:
        pool (str): CEPH pool's name

    Raises:
        MissingEnvironmentError: Missing CEPH storage information
        StorageError: CEPH IO context configuration issue

    Returns:
        rados.Ioctx: CEPH IO context
    """
162 global __CEPH_CLIENT, __CEPH_IOCTXS
164 if __CEPH_CLIENT is None:
165 try:
166 __CEPH_CLIENT = rados.Rados(
167 conffile=os.environ["ROK4_CEPH_CONFFILE"],
168 clustername=os.environ["ROK4_CEPH_CLUSTERNAME"],
169 name=os.environ["ROK4_CEPH_USERNAME"],
170 )
172 __CEPH_CLIENT.connect()
174 except KeyError as e:
175 raise MissingEnvironmentError(e)
176 except Exception as e:
177 raise StorageError("CEPH", e)
179 if pool not in __CEPH_IOCTXS:
180 try:
181 __CEPH_IOCTXS[pool] = __CEPH_CLIENT.open_ioctx(pool)
182 except Exception as e:
183 raise StorageError("CEPH", e)
185 return __CEPH_IOCTXS[pool]
188def disconnect_ceph_clients() -> None:
189 """Clean CEPH clients"""
190 global __CEPH_CLIENT, __CEPH_IOCTXS
191 __CEPH_CLIENT = None
192 __CEPH_IOCTXS = {}
195__OBJECT_SYMLINK_SIGNATURE = "SYMLINK#"
def get_infos_from_path(path: str) -> Tuple[StorageType, str, str, str]:
    """Extract the storage type, the unprefixed path, the tray and the basename from a path (Default: FILE storage)

    For a FILE storage, the tray is the directory and the basename is the file name.

    For an object storage (CEPH or S3), the tray is the pool or the bucket and the basename is the object name.
    For an S3 bucket, the format can be <bucket name>@<cluster host> to use several clusters. The cluster name is the host (without protocol).

    Args:
        path (str): path to analyse

    Returns:
        Tuple[StorageType, str, str, str]: storage type, unprefixed path, tray and basename
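
    Examples:

        Illustrative calls (bucket and paths are hypothetical):

            from rok4.Storage import get_infos_from_path

            get_infos_from_path("s3://my_bucket/images/tile.tif")
            # -> (StorageType.S3, "my_bucket/images/tile.tif", "my_bucket", "images/tile.tif")

            get_infos_from_path("/data/images/tile.tif")
            # -> (StorageType.FILE, "/data/images/tile.tif", "/data/images", "tile.tif")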
211 """
213 if path.startswith("s3://"):
214 bucket_name, object_name = path[5:].split("/", 1)
215 return StorageType.S3, path[5:], bucket_name, object_name
216 elif path.startswith("ceph://"):
217 pool_name, object_name = path[7:].split("/", 1)
218 return StorageType.CEPH, path[7:], pool_name, object_name
219 elif path.startswith("file://"):
220 return StorageType.FILE, path[7:], os.path.dirname(path[7:]), os.path.basename(path[7:])
221 elif path.startswith("http://"):
222 return StorageType.HTTP, path[7:], os.path.dirname(path[7:]), os.path.basename(path[7:])
223 elif path.startswith("https://"):
224 return StorageType.HTTPS, path[8:], os.path.dirname(path[8:]), os.path.basename(path[8:])
225 else:
226 return StorageType.FILE, path, os.path.dirname(path), os.path.basename(path)
def get_path_from_infos(storage_type: StorageType, *args) -> str:
    """Build the full path from its elements

    Prefixed with the storage type, elements are joined with a slash

    Args:
        storage_type (StorageType): storage type for the path

    Returns:
        str: Full path
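
    Examples:

        Illustrative call (bucket and object names are hypothetical):

            from rok4.Storage import get_path_from_infos, StorageType

            get_path_from_infos(StorageType.S3, "my_bucket", "images/tile.tif")
            # -> "s3://my_bucket/images/tile.tif"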
239 """
240 return f"{storage_type.value}{os.path.join(*args)}"
def hash_file(path: str) -> str:
    """Process the MD5 sum of the provided file

    Args:
        path (str): path to the file

    Returns:
        str: hexadecimal MD5 sum
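
    Examples:

        Illustrative call (the file path is hypothetical):

            from rok4.Storage import hash_file

            md5 = hash_file("/tmp/tile.tif")  # 32-character hexadecimal string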
251 """
253 checker = hashlib.md5()
255 with open(path, "rb") as file:
256 chunk = 0
257 while chunk != b"":
258 chunk = file.read(65536)
259 checker.update(chunk)
261 return checker.hexdigest()
def get_data_str(path: str) -> str:
    """Load full data into a string

    Args:
        path (str): path to data

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage read issue
        FileNotFoundError: File or object does not exist

    Returns:
        str: Data content
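
    Examples:

        Illustrative call (the object path is hypothetical):

            from rok4.Storage import get_data_str

            content = get_data_str("s3://my_bucket/descriptor.json")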
277 """
279 return get_data_binary(path).decode("utf-8")
def get_data_binary(path: str, range: Tuple[int, int] = None) -> bytes:
    """Load data into a bytes string

    Args:
        path (str): path to data
        range (Tuple[int, int], optional): offset and size, to make a partial read. Defaults to None.

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage read issue
        FileNotFoundError: File or object does not exist

    Returns:
        bytes: Data binary content
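
    Examples:

        Illustrative call (the object path is hypothetical), reading 512 bytes starting at offset 2048:

            from rok4.Storage import get_data_binary

            header = get_data_binary("ceph://my_pool/tile.tif", (2048, 512))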
296 """
297 storage_type, path, tray_name, base_name = get_infos_from_path(path)
299 if storage_type == StorageType.S3:
300 s3_client, bucket_name = __get_s3_client(tray_name)
302 try:
303 if range is None:
304 data = (
305 s3_client["client"]
306 .get_object(
307 Bucket=bucket_name,
308 Key=base_name,
309 )["Body"]
310 .read()
311 )
312 else:
313 data = (
314 s3_client["client"]
315 .get_object(
316 Bucket=bucket_name,
317 Key=base_name,
318 Range=f"bytes={range[0]}-{range[0] + range[1] - 1}",
319 )["Body"]
320 .read()
321 )
323 except botocore.exceptions.ClientError as e:
324 if e.response["Error"]["Code"] == "404":
325 raise FileNotFoundError(f"{storage_type.value}{path}")
326 else:
327 raise StorageError("S3", e)
329 except Exception as e:
330 raise StorageError("S3", e)
332 elif storage_type == StorageType.CEPH:
333 ioctx = __get_ceph_ioctx(tray_name)
335 try:
336 if range is None:
337 size, mtime = ioctx.stat(base_name)
338 data = ioctx.read(base_name, size)
339 else:
340 data = ioctx.read(base_name, range[1], range[0])
342 except rados.ObjectNotFound as e:
343 raise FileNotFoundError(f"{storage_type.value}{path}")
345 except Exception as e:
346 raise StorageError("CEPH", e)
348 elif storage_type == StorageType.FILE:
349 try:
350 f = open(path, "rb")
351 if range is None:
352 data = f.read()
353 else:
354 f.seek(range[0])
355 data = f.read(range[1])
357 f.close()
359 except FileNotFoundError as e:
360 raise FileNotFoundError(f"{storage_type.value}{path}")
362 except Exception as e:
363 raise StorageError("FILE", e)
    elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:
        if range is None:
            try:
                reponse = requests.get(f"{storage_type.value}{path}", stream=True)
                data = reponse.content
            except Exception as e:
                raise StorageError(storage_type.name, e)

            # Raise FileNotFoundError outside the try block, so it is not converted into a StorageError
            if reponse.status_code == 404:
                raise FileNotFoundError(f"{storage_type.value}{path}")
        else:
            raise NotImplementedError
378 else:
379 raise StorageError("UNKNOWN", "Unhandled storage type to read binary data")
381 return data
def put_data_str(data: str, path: str) -> None:
    """Store string data into a file or an object

    UTF-8 encoding is used for bytes conversion

    Args:
        data (str): data to write
        path (str): destination path, where to write data

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage write issue
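
    Examples:

        Illustrative call (the object path is hypothetical):

            from rok4.Storage import put_data_str

            put_data_str('{"status": "ok"}', "s3://my_bucket/status.json")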
396 """
398 storage_type, path, tray_name, base_name = get_infos_from_path(path)
400 if storage_type == StorageType.S3:
401 s3_client, bucket_name = __get_s3_client(tray_name)
403 try:
404 s3_client["client"].put_object(
405 Body=data.encode("utf-8"), Bucket=bucket_name, Key=base_name
406 )
407 except Exception as e:
408 raise StorageError("S3", e)
410 elif storage_type == StorageType.CEPH:
411 ioctx = __get_ceph_ioctx(tray_name)
413 try:
414 ioctx.write_full(base_name, data.encode("utf-8"))
415 except Exception as e:
416 raise StorageError("CEPH", e)
418 elif storage_type == StorageType.FILE:
419 try:
420 f = open(path, "w")
421 f.write(data)
422 f.close()
423 except Exception as e:
424 raise StorageError("FILE", e)
426 else:
427 raise StorageError("UNKNOWN", "Unhandled storage type to write string data")
def get_size(path: str) -> int:
    """Get the size of a file or object

    Args:
        path (str): path of the file/object whose size is requested

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage read issue

    Returns:
        int: file/object size, in bytes
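
    Examples:

        Illustrative call (the object path is hypothetical):

            from rok4.Storage import get_size

            size = get_size("s3://my_bucket/tiles/tile.tif")  # size in bytes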
442 """
444 storage_type, path, tray_name, base_name = get_infos_from_path(path)
446 if storage_type == StorageType.S3:
447 s3_client, bucket_name = __get_s3_client(tray_name)
449 try:
450 size = s3_client["client"].head_object(Bucket=bucket_name, Key=base_name)[
451 "ContentLength"
452 ]
453 return int(size)
454 except Exception as e:
455 raise StorageError("S3", e)
457 elif storage_type == StorageType.CEPH:
458 ioctx = __get_ceph_ioctx(tray_name)
460 try:
461 size, mtime = ioctx.stat(base_name)
462 return size
463 except Exception as e:
464 raise StorageError("CEPH", e)
466 elif storage_type == StorageType.FILE:
467 try:
468 file_stats = os.stat(path)
469 return file_stats.st_size
470 except Exception as e:
471 raise StorageError("FILE", e)
    elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:
        try:
            # stream=True downloads only the headers at first
            reponse = requests.get(storage_type.value + path, stream=True).headers["content-length"]
            return int(reponse)
        except Exception as e:
            raise StorageError(storage_type.name, e)
482 else:
483 raise StorageError("UNKNOWN", "Unhandled storage type to get size")
def exists(path: str) -> bool:
    """Does the file or object exist?

    Args:
        path (str): path of the file/object to test

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage read issue

    Returns:
        bool: file/object existence status
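
    Examples:

        Illustrative call (the file path is hypothetical):

            from rok4.Storage import exists

            if not exists("file:///tmp/tile.tif"):
                print("missing file")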
498 """
500 storage_type, path, tray_name, base_name = get_infos_from_path(path)
502 if storage_type == StorageType.S3:
503 s3_client, bucket_name = __get_s3_client(tray_name)
505 try:
506 s3_client["client"].head_object(Bucket=bucket_name, Key=base_name)
507 return True
508 except botocore.exceptions.ClientError as e:
509 if e.response["Error"]["Code"] == "404":
510 return False
511 else:
512 raise StorageError("S3", e)
514 elif storage_type == StorageType.CEPH:
515 ioctx = __get_ceph_ioctx(tray_name)
517 try:
518 ioctx.stat(base_name)
519 return True
520 except rados.ObjectNotFound as e:
521 return False
522 except Exception as e:
523 raise StorageError("CEPH", e)
525 elif storage_type == StorageType.FILE:
526 return os.path.exists(path)
    elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:
        try:
            response = requests.get(storage_type.value + path, stream=True)
            return response.status_code == 200
        except Exception as e:
            raise StorageError(storage_type.name, e)
539 else:
540 raise StorageError("UNKNOWN", "Unhandled storage type to test if exists")
def remove(path: str) -> None:
    """Remove the file/object

    Args:
        path (str): path of the file/object to remove

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage removal issue
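
    Examples:

        Illustrative calls (paths are hypothetical); removal is silent if the target does not exist:

            from rok4.Storage import remove

            remove("file:///tmp/tile.tif")
            remove("ceph://my_pool/tile.tif")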
552 """
553 storage_type, path, tray_name, base_name = get_infos_from_path(path)
555 if storage_type == StorageType.S3:
556 s3_client, bucket_name = __get_s3_client(tray_name)
558 try:
559 s3_client["client"].delete_object(Bucket=bucket_name, Key=base_name)
560 except Exception as e:
561 raise StorageError("S3", e)
563 elif storage_type == StorageType.CEPH:
564 ioctx = __get_ceph_ioctx(tray_name)
566 try:
567 ioctx.remove_object(base_name)
568 except rados.ObjectNotFound as e:
569 pass
570 except Exception as e:
571 raise StorageError("CEPH", e)
573 elif storage_type == StorageType.FILE:
574 try:
575 os.remove(path)
576 except FileNotFoundError as e:
577 pass
578 except Exception as e:
579 raise StorageError("FILE", e)
581 else:
582 raise StorageError("UNKNOWN", "Unhandled storage type to remove things")
def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
    """Copy a file or object to a file or object destination. If an MD5 sum is provided, it is compared to the sum computed after the copy.

    Args:
        from_path (str): source file/object path, to copy
        to_path (str): destination file/object path
        from_md5 (str, optional): MD5 sum, re-processed after copy and controlled. Defaults to None.

    Raises:
        StorageError: Unhandled copy or copy issue
        MissingEnvironmentError: Missing object storage information
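
    Examples:

        Illustrative call (paths and bucket are hypothetical), with an optional MD5 control:

            from rok4.Storage import copy, hash_file

            md5 = hash_file("/tmp/tile.tif")
            copy("file:///tmp/tile.tif", "s3://my_bucket/tiles/tile.tif", from_md5=md5)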
596 """
598 from_type, from_path, from_tray, from_base_name = get_infos_from_path(from_path)
599 to_type, to_path, to_tray, to_base_name = get_infos_from_path(to_path)
601 # Réalisation de la copie, selon les types de stockage
602 if from_type == StorageType.FILE and to_type == StorageType.FILE:
603 try:
604 if to_tray != "":
605 os.makedirs(to_tray, exist_ok=True)
607 copyfile(from_path, to_path)
609 if from_md5 is not None:
610 to_md5 = hash_file(to_path)
611 if to_md5 != from_md5:
612 raise StorageError(
613 f"FILE",
614 f"Invalid MD5 sum control for copy file {from_path} to {to_path} : {from_md5} != {to_md5}",
615 )
617 except Exception as e:
618 raise StorageError(f"FILE", f"Cannot copy file {from_path} to {to_path} : {e}")
620 elif from_type == StorageType.S3 and to_type == StorageType.FILE:
621 s3_client, from_bucket = __get_s3_client(from_tray)
623 try:
624 if to_tray != "":
625 os.makedirs(to_tray, exist_ok=True)
627 s3_client["client"].download_file(from_bucket, from_base_name, to_path)
629 if from_md5 is not None:
630 to_md5 = hash_file(to_path)
631 if to_md5 != from_md5:
632 raise StorageError(
633 "S3 and FILE",
634 f"Invalid MD5 sum control for copy S3 object {from_path} to file {to_path} : {from_md5} != {to_md5}",
635 )
637 except Exception as e:
638 raise StorageError(
639 f"S3 and FILE", f"Cannot copy S3 object {from_path} to file {to_path} : {e}"
640 )
642 elif from_type == StorageType.FILE and to_type == StorageType.S3:
643 s3_client, to_bucket = __get_s3_client(to_tray)
645 try:
646 s3_client["client"].upload_file(from_path, to_bucket, to_base_name)
648 if from_md5 is not None:
649 to_md5 = (
650 s3_client["client"]
651 .head_object(Bucket=to_bucket, Key=to_base_name)["ETag"]
652 .strip('"')
653 )
654 if to_md5 != from_md5:
655 raise StorageError(
656 f"FILE and S3",
657 f"Invalid MD5 sum control for copy file {from_path} to S3 object {to_path} : {from_md5} != {to_md5}",
658 )
659 except Exception as e:
660 raise StorageError(
661 f"FILE and S3", f"Cannot copy file {from_path} to S3 object {to_path} : {e}"
662 )
664 elif from_type == StorageType.S3 and to_type == StorageType.S3:
665 from_s3_client, from_bucket = __get_s3_client(from_tray)
666 to_s3_client, to_bucket = __get_s3_client(to_tray)
668 try:
669 if to_s3_client["host"] == from_s3_client["host"]:
670 to_s3_client["client"].copy(
671 {"Bucket": from_bucket, "Key": from_base_name}, to_bucket, to_base_name
672 )
673 else:
674 with tempfile.NamedTemporaryFile("w+b") as f:
675 from_s3_client["client"].download_fileobj(from_bucket, from_base_name, f)
676 to_s3_client["client"].upload_file(f.name, to_bucket, to_base_name)
678 if from_md5 is not None:
679 to_md5 = (
680 to_s3_client["client"]
681 .head_object(Bucket=to_bucket, Key=to_base_name)["ETag"]
682 .strip('"')
683 )
684 if to_md5 != from_md5:
685 raise StorageError(
686 f"S3",
687 f"Invalid MD5 sum control for copy S3 object {from_path} to {to_path} : {from_md5} != {to_md5}",
688 )
690 except Exception as e:
691 raise StorageError(f"S3", f"Cannot copy S3 object {from_path} to {to_path} : {e}")
693 elif from_type == StorageType.CEPH and to_type == StorageType.FILE:
694 ioctx = __get_ceph_ioctx(from_tray)
696 if from_md5 is not None:
697 checker = hashlib.md5()
699 try:
700 if to_tray != "":
701 os.makedirs(to_tray, exist_ok=True)
702 f = open(to_path, "wb")
704 offset = 0
705 size = 0
707 while True:
708 chunk = ioctx.read(from_base_name, 65536, offset)
709 size = len(chunk)
710 offset += size
711 f.write(chunk)
713 if from_md5 is not None:
714 checker.update(chunk)
716 if size < 65536:
717 break
719 f.close()
721 if from_md5 is not None and from_md5 != checker.hexdigest():
722 raise StorageError(
723 f"CEPH and FILE",
724 f"Invalid MD5 sum control for copy CEPH object {from_path} to file {to_path} : {from_md5} != {checker.hexdigest()}",
725 )
727 except Exception as e:
728 raise StorageError(
729 f"CEPH and FILE", f"Cannot copy CEPH object {from_path} to file {to_path} : {e}"
730 )
732 elif from_type == StorageType.FILE and to_type == StorageType.CEPH:
733 ioctx = __get_ceph_ioctx(to_tray)
735 if from_md5 is not None:
736 checker = hashlib.md5()
738 try:
739 f = open(from_path, "rb")
741 offset = 0
742 size = 0
744 while True:
745 chunk = f.read(65536)
746 size = len(chunk)
747 ioctx.write(to_base_name, chunk, offset)
748 offset += size
750 if from_md5 is not None:
751 checker.update(chunk)
753 if size < 65536:
754 break
756 f.close()
758 if from_md5 is not None and from_md5 != checker.hexdigest():
759 raise StorageError(
760 f"FILE and CEPH",
761 f"Invalid MD5 sum control for copy file {from_path} to CEPH object {to_path} : {from_md5} != {checker.hexdigest()}",
762 )
764 except Exception as e:
765 raise StorageError(
766 f"FILE and CEPH", f"Cannot copy file {from_path} to CEPH object {to_path} : {e}"
767 )
769 elif from_type == StorageType.CEPH and to_type == StorageType.CEPH:
770 from_ioctx = __get_ceph_ioctx(from_tray)
771 to_ioctx = __get_ceph_ioctx(to_tray)
773 if from_md5 is not None:
774 checker = hashlib.md5()
776 try:
777 offset = 0
778 size = 0
780 while True:
781 chunk = from_ioctx.read(from_base_name, 65536, offset)
782 size = len(chunk)
783 to_ioctx.write(to_base_name, chunk, offset)
784 offset += size
786 if from_md5 is not None:
787 checker.update(chunk)
789 if size < 65536:
790 break
            if from_md5 is not None and from_md5 != checker.hexdigest():
                raise StorageError(
                    "CEPH",
                    f"Invalid MD5 sum control for copy CEPH object {from_path} to {to_path} : {from_md5} != {checker.hexdigest()}",
                )
798 except Exception as e:
799 raise StorageError(f"CEPH", f"Cannot copy CEPH object {from_path} to {to_path} : {e}")
801 elif from_type == StorageType.CEPH and to_type == StorageType.S3:
802 from_ioctx = __get_ceph_ioctx(from_tray)
804 s3_client, to_bucket = __get_s3_client(to_tray)
806 if from_md5 is not None:
807 checker = hashlib.md5()
809 try:
810 offset = 0
811 size = 0
813 with tempfile.NamedTemporaryFile("w+b", delete=False) as f:
814 name_tmp = f.name
815 while True:
816 chunk = from_ioctx.read(from_base_name, 65536, offset)
817 size = len(chunk)
818 offset += size
819 f.write(chunk)
821 if from_md5 is not None:
822 checker.update(chunk)
824 if size < 65536:
825 break
827 s3_client["client"].upload_file(name_tmp, to_bucket, to_base_name)
829 os.remove(name_tmp)
831 if from_md5 is not None and from_md5 != checker.hexdigest():
832 raise StorageError(
833 f"CEPH and S3",
834 f"Invalid MD5 sum control for copy CEPH object {from_path} to S3 object {to_path} : {from_md5} != {checker.hexdigest()}",
835 )
837 except Exception as e:
838 raise StorageError(
839 f"CEPH and S3", f"Cannot copy CEPH object {from_path} to S3 object {to_path} : {e}"
840 )
842 elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.FILE :
844 try:
845 response = requests.get(from_type.value + from_path, stream = True)
846 with open(to_path, "wb") as f:
847 for chunk in response.iter_content(chunk_size=65536) :
848 if chunk:
849 f.write(chunk)
851 except Exception as e:
852 raise StorageError(f"HTTP(S) and FILE", f"Cannot copy HTTP(S) object {from_path} to FILE object {to_path} : {e}")
854 elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.CEPH :
856 to_ioctx = __get_ceph_ioctx(to_tray)
858 try:
859 response = requests.get(from_type.value + from_path, stream = True)
860 offset = 0
861 for chunk in response.iter_content(chunk_size=65536) :
862 if chunk:
863 size = len(chunk)
864 to_ioctx.write(to_base_name, chunk, offset)
865 offset += size
867 except Exception as e:
868 raise StorageError(f"HTTP(S) and CEPH", f"Cannot copy HTTP(S) object {from_path} to CEPH object {to_path} : {e}")
870 elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.S3 :
872 to_s3_client, to_bucket = __get_s3_client(to_tray)
874 try:
875 response = requests.get(from_type.value + from_path, stream = True)
876 with tempfile.NamedTemporaryFile("w+b",delete=False) as f:
877 name_fich = f.name
878 for chunk in response.iter_content(chunk_size=65536) :
879 if chunk:
880 f.write(chunk)
882 to_s3_client["client"].upload_file(name_fich, to_tray, to_base_name)
884 os.remove(name_fich)
886 except Exception as e:
887 raise StorageError(f"HTTP(S) and S3", f"Cannot copy HTTP(S) object {from_path} to S3 object {to_path} : {e}")
889 else:
890 raise StorageError(
891 f"{from_type.name} and {to_type.name}",
892 f"Cannot copy from {from_type.name} to {to_type.name}",
893 )
def link(target_path: str, link_path: str, hard: bool = False) -> None:
    """Create a symbolic link

    Args:
        target_path (str): file/object to link
        link_path (str): link to create
        hard (bool, optional): hard link rather than symbolic. Only for FILE storage. Defaults to False.

    Raises:
        StorageError: Unhandled link or link issue
        MissingEnvironmentError: Missing object storage information
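
    Examples:

        Illustrative call (bucket and object names are hypothetical); both paths must use the same storage type:

            from rok4.Storage import link

            # The link object contains the SYMLINK signature followed by the target path
            link("s3://my_bucket/data/tile.tif", "s3://my_bucket/alias/tile.tif")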
907 """
909 target_type, target_path, target_tray, target_base_name = get_infos_from_path(target_path)
910 link_type, link_path, link_tray, link_base_name = get_infos_from_path(link_path)
912 if target_type != link_type:
913 raise StorageError(
914 f"{target_type.name} and {link_type.name}",
915 f"Cannot make link between two different storage types",
916 )
918 if hard and target_type != StorageType.FILE:
919 raise StorageError(target_type.name, "Hard link is available only for FILE storage")
921 # Réalisation du lien, selon les types de stockage
922 if target_type == StorageType.S3:
923 target_s3_client, target_bucket = __get_s3_client(target_tray)
924 link_s3_client, link_bucket = __get_s3_client(link_tray)
926 if target_s3_client["host"] != link_s3_client["host"]:
927 raise StorageError(
928 f"S3",
929 f"Cannot make link {link_path} -> {target_path} : link works only on the same S3 cluster",
930 )
932 try:
933 target_s3_client["client"].put_object(
934 Body=f"{__OBJECT_SYMLINK_SIGNATURE}{target_bucket}/{target_base_name}".encode(
935 "utf-8"
936 ),
937 Bucket=link_bucket,
938 Key=link_base_name,
939 )
940 except Exception as e:
941 raise StorageError("S3", e)
943 elif target_type == StorageType.CEPH:
944 ioctx = __get_ceph_ioctx(link_tray)
946 try:
947 ioctx.write_full(
948 link_base_name, f"{__OBJECT_SYMLINK_SIGNATURE}{target_path}".encode("utf-8")
949 )
950 except Exception as e:
951 raise StorageError("CEPH", e)
953 elif target_type == StorageType.FILE:
954 try:
955 if hard:
956 os.link(target_path, link_path)
957 else:
958 os.symlink(target_path, link_path)
959 except Exception as e:
960 raise StorageError("FILE", e)
962 else:
963 raise StorageError("UNKNOWN", "Unhandled storage type to make link")
def get_osgeo_path(path: str) -> str:
    """Return a GDAL/OGR Open compliant path and configure storage access

    For an S3 input path, the endpoint, access and secret keys are set and the path is built with the "/vsis3" root.

    For a FILE input path, only the storage prefix is removed.

    Args:
        path (str): Source path

    Raises:
        NotImplementedError: Storage type not handled

    Returns:
        str: GDAL/OGR Open compliant path
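
    Examples:

        Illustrative call (the object path is hypothetical), opening a raster with GDAL:

            from osgeo import gdal
            from rok4.Storage import get_osgeo_path

            dataset = gdal.Open(get_osgeo_path("s3://my_bucket/images/ortho.tif"))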
981 """
983 storage_type, unprefixed_path, tray_name, base_name = get_infos_from_path(path)
985 if storage_type == StorageType.S3:
986 s3_client, bucket_name = __get_s3_client(tray_name)
988 gdal.SetConfigOption("AWS_SECRET_ACCESS_KEY", s3_client["secret_key"])
989 gdal.SetConfigOption("AWS_ACCESS_KEY_ID", s3_client["key"])
990 gdal.SetConfigOption("AWS_S3_ENDPOINT", s3_client["host"])
991 gdal.SetConfigOption("AWS_VIRTUAL_HOSTING", "FALSE")
993 return f"/vsis3/{bucket_name}/{base_name}"
995 elif storage_type == StorageType.FILE:
996 return unprefixed_path
998 else:
999 raise NotImplementedError(f"Cannot get a GDAL/OGR compliant path from {path}")
def size_path(path: str) -> int:
    """Return the size of the given path: for FILE, the recursive size of the directory; for S3, the sum of the sizes of the objects under the prefix (CEPH is not implemented yet)

    Args:
        path (str): Source path

    Raises:
        StorageError: Storage read issue or unhandled storage type
        MissingEnvironmentError: Missing object storage information

    Returns:
        int: size of the path, in bytes
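
    Examples:

        Illustrative calls (paths and bucket are hypothetical):

            from rok4.Storage import size_path

            size_path("file:///data/pyramids/my_pyramid")  # recursive size of the directory
            size_path("s3://my_bucket/my_pyramid")         # sum of the sizes of objects under the "my_pyramid/" prefix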
1013 """
1014 storage_type, unprefixed_path, tray_name, base_name = get_infos_from_path(path)
1016 if storage_type == StorageType.FILE:
1017 try :
1018 total = 0
1019 with os.scandir(unprefixed_path) as it:
1020 for entry in it:
1021 if entry.is_file():
1022 total += entry.stat().st_size
1023 elif entry.is_dir():
1024 total += size_path(entry.path)
1026 except Exception as e:
1027 raise StorageError("FILE", e)
1029 elif storage_type == StorageType.S3:
1030 s3_client, bucket_name = __get_s3_client(tray_name)
1032 try :
1033 paginator = s3_client["client"].get_paginator('list_objects_v2')
1034 pages = paginator.paginate(
1035 Bucket=bucket_name,
1036 Prefix=base_name+"/",
1037 PaginationConfig={
1038 'PageSize': 10000,
1039 }
1040 )
1041 total = 0
1042 for page in pages:
1043 for key in page['Contents']:
1044 total += key['Size']
1046 except Exception as e:
1047 raise StorageError("S3", e)
1050 elif storage_type == StorageType.CEPH:
1051 raise NotImplementedError
1052 else:
1053 raise StorageError("UNKNOWN", "Unhandled storage type to calculate size")
1055 return total