Coverage for src/rok4/storage.py: 80%
537 statements
coverage.py v7.6.1, created at 2024-10-01 15:35 +0000
1"""Provide functions to read or write data
3Available storage types are :
5- S3 (path are preffixed with `s3://`)
6- CEPH (path are prefixed with `ceph://`)
7- FILE (path are prefixed with `file://`, but it is the default paths' interpretation)
8- HTTP (path are prefixed with `http://`)
9- HTTPS (path are prefixed with `https://`)
11According to functions, all storage types are not necessarily available.
13Readings uses a LRU cache system with a TTL. It's possible to configure it with environment variables :
15- ROK4_READING_LRU_CACHE_SIZE : Number of cached element. Default 64. Set 0 or a negative integer to configure a cache without bound. A power of two make cache more efficient.
16- ROK4_READING_LRU_CACHE_TTL : Validity duration of cached element, in seconds. Default 300. 0 or negative integer to get cache without expiration date.
18To disable cache (always read data on storage), set ROK4_READING_LRU_CACHE_SIZE to 1 and ROK4_READING_LRU_CACHE_TTL to 1.
20Using CEPH storage requires environment variables :
22- ROK4_CEPH_CONFFILE
23- ROK4_CEPH_USERNAME
24- ROK4_CEPH_CLUSTERNAME
26Using S3 storage requires environment variables :
28- ROK4_S3_KEY
29- ROK4_S3_SECRETKEY
30- ROK4_S3_URL
32To use several S3 clusters, each environment variable have to contain a list (comma-separated), with the same number of elements
34Example, work with 2 S3 clusters:
36- ROK4_S3_KEY=KEY1,KEY2
37- ROK4_S3_SECRETKEY=SKEY1,SKEY2
38- ROK4_S3_URL=https://s3.storage.fr,https://s4.storage.fr
40To precise the cluster to use, bucket name should be bucket_name@s3.storage.fr or bucket_name@s4.storage.fr. If no host is defined (no @) in the bucket name, first S3 cluster is used
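
Example, a minimal read once the variables above are set (bucket and object names are hypothetical):

    from rok4.storage import get_data_str

    # The "@s4.storage.fr" suffix routes the request to the second S3 cluster
    data = get_data_str("s3://my_bucket@s4.storage.fr/path/to/object.json")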
41"""

import hashlib
import os
import re
import tempfile
import time
from functools import lru_cache
from shutil import copyfile
from typing import Dict, Tuple, Union

import boto3
import botocore.config
import botocore.exceptions
import requests

# conditional import
try:
    from osgeo import gdal

    # Enable GDAL/OGR exceptions
    gdal.UseExceptions()

    GDAL_AVAILABLE: bool = True
except ImportError:
    GDAL_AVAILABLE: bool = False
    gdal = None

try:
    import rados

    CEPH_RADOS_AVAILABLE: bool = True
except ImportError:
    CEPH_RADOS_AVAILABLE: bool = False
    rados = None

# package
from rok4.enums import StorageType
from rok4.exceptions import MissingEnvironmentError, StorageError

# -- GLOBALS --

__CEPH_CLIENT = None
__CEPH_IOCTXS = {}
__OBJECT_SYMLINK_SIGNATURE = "SYMLINK#"
__S3_CLIENTS = {}
__S3_DEFAULT_CLIENT = None
__LRU_SIZE = 64
__LRU_TTL = 300

try:
    __LRU_SIZE = int(os.environ["ROK4_READING_LRU_CACHE_SIZE"])
    if __LRU_SIZE < 1:
        __LRU_SIZE = None
except ValueError:
    pass
except KeyError:
    pass

try:
    __LRU_TTL = int(os.environ["ROK4_READING_LRU_CACHE_TTL"])
    if __LRU_TTL < 0:
        __LRU_TTL = 0
except ValueError:
    pass
except KeyError:
    pass


def __get_ttl_hash() -> int:
    """Return the current time rounded according to the time-to-live value, used to key the LRU cache"""
    if __LRU_TTL == 0:
        return 0
    else:
        return round(time.time() / __LRU_TTL)


def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", str]], str]:
    """Get the S3 client

    Create it if not already done

    Args:
        bucket_name (str): S3 bucket name. Could be just the bucket name, or <bucket name>@<cluster host>

    Raises:
        MissingEnvironmentError: Missing S3 storage information
        StorageError: S3 client configuration issue

    Returns:
        Tuple[Dict[str, Union['boto3.client', str]], str]: the S3 information (client, host, key, secret) and the simple bucket name
    """

    global __S3_CLIENTS, __S3_DEFAULT_CLIENT

    if not __S3_CLIENTS:
        verify = True
        if "ROK4_SSL_NO_VERIFY" in os.environ and os.environ["ROK4_SSL_NO_VERIFY"] != "":
            verify = False
        # First use of the S3 storage: load the configuration from environment variables
        try:
            keys = os.environ["ROK4_S3_KEY"].split(",")
            secret_keys = os.environ["ROK4_S3_SECRETKEY"].split(",")
            urls = os.environ["ROK4_S3_URL"].split(",")

            if len(keys) != len(secret_keys) or len(keys) != len(urls):
                raise StorageError(
                    "S3",
                    "S3 information in environment variables is inconsistent: the same number of elements is required in each list",
                )

            for i in range(len(keys)):
                h = re.sub("https?://", "", urls[i])

                if h in __S3_CLIENTS:
                    raise StorageError("S3", "An S3 cluster is defined twice (based on URL)")

                __S3_CLIENTS[h] = {
                    "client": boto3.client(
                        "s3",
                        aws_access_key_id=keys[i],
                        aws_secret_access_key=secret_keys[i],
                        verify=verify,
                        endpoint_url=urls[i],
                        config=botocore.config.Config(tcp_keepalive=True, max_pool_connections=10),
                    ),
                    "key": keys[i],
                    "secret_key": secret_keys[i],
                    "url": urls[i],
                    "host": h,
                    "secure": urls[i].startswith("https://"),
                }

                if i == 0:
                    # The first cluster is the default one
                    __S3_DEFAULT_CLIENT = h

        except KeyError as e:
            raise MissingEnvironmentError(e)
        except Exception as e:
            raise StorageError("S3", e)

    try:
        host = bucket_name.split("@")[1]
    except IndexError:
        host = __S3_DEFAULT_CLIENT

    bucket_name = bucket_name.split("@")[0]

    if host not in __S3_CLIENTS:
        raise StorageError("S3", f"Unknown S3 cluster, according to host '{host}'")

    return __S3_CLIENTS[host], bucket_name


def disconnect_s3_clients() -> None:
    """Clean S3 clients"""

    global __S3_CLIENTS, __S3_DEFAULT_CLIENT
    __S3_CLIENTS = {}
    __S3_DEFAULT_CLIENT = None


def __get_ceph_ioctx(pool: str) -> "rados.Ioctx":
    """Get the CEPH IO context

    Create it (client and context) if not already done

    Args:
        pool (str): CEPH pool's name

    Raises:
        MissingEnvironmentError: Missing CEPH storage information
        StorageError: CEPH IO context configuration issue

    Returns:
        rados.Ioctx: CEPH IO context
    """
    global __CEPH_CLIENT, __CEPH_IOCTXS

    if __CEPH_CLIENT is None:
        try:
            __CEPH_CLIENT = rados.Rados(
                conffile=os.environ["ROK4_CEPH_CONFFILE"],
                clustername=os.environ["ROK4_CEPH_CLUSTERNAME"],
                name=os.environ["ROK4_CEPH_USERNAME"],
            )

            __CEPH_CLIENT.connect()

        except KeyError as e:
            raise MissingEnvironmentError(e)
        except Exception as e:
            raise StorageError("CEPH", e)

    if pool not in __CEPH_IOCTXS:
        try:
            __CEPH_IOCTXS[pool] = __CEPH_CLIENT.open_ioctx(pool)
        except Exception as e:
            raise StorageError("CEPH", e)

    return __CEPH_IOCTXS[pool]


def disconnect_ceph_clients() -> None:
    """Clean CEPH clients"""
    global __CEPH_CLIENT, __CEPH_IOCTXS
    __CEPH_CLIENT = None
    __CEPH_IOCTXS = {}


def get_infos_from_path(path: str) -> Tuple[StorageType, str, str, str]:
    """Extract the storage type, the unprefixed path, the tray and the basename from a path (default: FILE storage)

    For a FILE storage, the tray is the directory and the basename is the file name.

    For an object storage (CEPH or S3), the tray is the pool or the bucket and the basename is the object name.
    For an S3 bucket, the format can be <bucket name>@<cluster name> to use several clusters. The cluster name is the host (without protocol).

    Args:
        path (str): path to analyse

    Returns:
        Tuple[StorageType, str, str, str]: storage type, unprefixed path, the tray and the basename
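
    Examples (hypothetical paths, shown to illustrate the returned tuple):

        get_infos_from_path("s3://my_bucket/dir/object") returns
        (StorageType.S3, "my_bucket/dir/object", "my_bucket", "dir/object")

        get_infos_from_path("/tmp/dir/file.txt") returns
        (StorageType.FILE, "/tmp/dir/file.txt", "/tmp/dir", "file.txt")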
267 """
269 if path.startswith("s3://"):
270 bucket_name, object_name = path[5:].split("/", 1)
271 return StorageType.S3, path[5:], bucket_name, object_name
272 elif path.startswith("ceph://"):
273 pool_name, object_name = path[7:].split("/", 1)
274 return StorageType.CEPH, path[7:], pool_name, object_name
275 elif path.startswith("file://"):
276 return StorageType.FILE, path[7:], os.path.dirname(path[7:]), os.path.basename(path[7:])
277 elif path.startswith("http://"):
278 return StorageType.HTTP, path[7:], os.path.dirname(path[7:]), os.path.basename(path[7:])
279 elif path.startswith("https://"):
280 return StorageType.HTTPS, path[8:], os.path.dirname(path[8:]), os.path.basename(path[8:])
281 else:
282 return StorageType.FILE, path, os.path.dirname(path), os.path.basename(path)


def get_path_from_infos(storage_type: StorageType, *args) -> str:
    """Write the full path from elements

    Prefixed with the storage's type, elements are joined with a slash

    Args:
        storage_type (StorageType): Storage's type for path

    Returns:
        str: Full path
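
    Example (assuming StorageType.S3.value is the "s3://" prefix; names are hypothetical):

        get_path_from_infos(StorageType.S3, "my_bucket", "dir/object") returns "s3://my_bucket/dir/object"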
295 """
296 return f"{storage_type.value}{os.path.join(*args)}"


def hash_file(path: str) -> str:
    """Process the MD5 sum of the provided file

    Args:
        path (str): path to file

    Returns:
        str: hexadecimal MD5 sum
    """

    checker = hashlib.md5()

    with open(path, "rb") as file:
        chunk = 0
        while chunk != b"":
            chunk = file.read(65536)
            checker.update(chunk)

    return checker.hexdigest()


def get_data_str(path: str) -> str:
    """Load full data into a string

    Args:
        path (str): path to data

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage read issue
        FileNotFoundError: File or object does not exist
        NotImplementedError: Storage type not handled

    Returns:
        str: Data content
    """

    return get_data_binary(path).decode("utf-8")


@lru_cache(maxsize=__LRU_SIZE)
def __get_cached_data_binary(path: str, ttl_hash: int, range: Tuple[int, int] = None) -> bytes:
    """Load data into a binary string, using an LRU cache

    Args:
        path (str): path to data
        ttl_hash (int): time hash, to invalidate the cache
        range (Tuple[int, int], optional): offset and size, to make a partial read. Defaults to None.

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage read issue
        FileNotFoundError: File or object does not exist
        NotImplementedError: Storage type not handled

    Returns:
        bytes: Data binary content
    """
    storage_type, path, tray_name, base_name = get_infos_from_path(path)

    if storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        try:
            if range is None:
                data = (
                    s3_client["client"]
                    .get_object(
                        Bucket=bucket_name,
                        Key=base_name,
                    )["Body"]
                    .read()
                )
            else:
                data = (
                    s3_client["client"]
                    .get_object(
                        Bucket=bucket_name,
                        Key=base_name,
                        Range=f"bytes={range[0]}-{range[0] + range[1] - 1}",
                    )["Body"]
                    .read()
                )

        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchKey":
                raise FileNotFoundError(f"{storage_type.value}{path}")
            else:
                raise StorageError("S3", e)

        except Exception as e:
            raise StorageError("S3", e)

    elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
        ioctx = __get_ceph_ioctx(tray_name)

        try:
            if range is None:
                size, mtime = ioctx.stat(base_name)
                data = ioctx.read(base_name, size)
            else:
                data = ioctx.read(base_name, range[1], range[0])

        except rados.ObjectNotFound:
            raise FileNotFoundError(f"{storage_type.value}{path}")

        except Exception as e:
            raise StorageError("CEPH", e)

    elif storage_type == StorageType.FILE:
        try:
            f = open(path, "rb")
            if range is None:
                data = f.read()
            else:
                f.seek(range[0])
                data = f.read(range[1])

            f.close()

        except FileNotFoundError:
            raise FileNotFoundError(f"{storage_type.value}{path}")

        except Exception as e:
            raise StorageError("FILE", e)

    elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:
        if range is None:
            try:
                response = requests.get(f"{storage_type.value}{path}", stream=True)
                data = response.content
                if response.status_code == 404:
                    raise FileNotFoundError(f"{storage_type.value}{path}")
            except Exception as e:
                raise StorageError(storage_type.name, e)
        else:
            raise NotImplementedError("Cannot get partial data for storage type HTTP(S)")

    else:
        raise NotImplementedError(f"Cannot get data for storage type {storage_type.name}")

    return data


def get_data_binary(path: str, range: Tuple[int, int] = None) -> bytes:
    """Load data into a binary string

    This function uses an LRU cache, with a configurable TTL (5 minutes by default)

    Args:
        path (str): path to data
        range (Tuple[int, int], optional): offset and size, to make a partial read. Defaults to None.

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage read issue
        FileNotFoundError: File or object does not exist
        NotImplementedError: Storage type not handled

    Returns:
        bytes: Data binary content
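
    Example, a partial read of the first 1024 bytes (path is hypothetical):

        header = get_data_binary("file:///tmp/data.bin", (0, 1024))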
460 """
461 return __get_cached_data_binary(path, __get_ttl_hash(), range)


def put_data_str(data: str, path: str) -> None:
    """Store string data into a file or an object

    UTF-8 encoding is used for bytes conversion

    Args:
        data (str): data to write
        path (str): destination path, where to write data

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage write issue
        NotImplementedError: Storage type not handled
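
    Example (path is hypothetical):

        put_data_str('{"status": "ok"}', "s3://my_bucket/status.json")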
477 """
479 storage_type, path, tray_name, base_name = get_infos_from_path(path)
481 if storage_type == StorageType.S3:
482 s3_client, bucket_name = __get_s3_client(tray_name)
484 try:
485 s3_client["client"].put_object(
486 Body=data.encode("utf-8"), Bucket=bucket_name, Key=base_name
487 )
488 except Exception as e:
489 raise StorageError("S3", e)
491 elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
492 ioctx = __get_ceph_ioctx(tray_name)
494 try:
495 ioctx.write_full(base_name, data.encode("utf-8"))
496 except Exception as e:
497 raise StorageError("CEPH", e)
499 elif storage_type == StorageType.FILE:
500 try:
501 f = open(path, "w")
502 f.write(data)
503 f.close()
504 except Exception as e:
505 raise StorageError("FILE", e)
507 else:
508 raise NotImplementedError(f"Cannot write data for storage type {storage_type.name}")


def get_size(path: str) -> int:
    """Get the size of a file or object

    Args:
        path (str): path of the file/object whose size is asked

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage read issue
        NotImplementedError: Storage type not handled

    Returns:
        int: file/object size, in bytes
    """
    storage_type, path, tray_name, base_name = get_infos_from_path(path)

    if storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        try:
            size = s3_client["client"].head_object(Bucket=bucket_name, Key=base_name)[
                "ContentLength"
            ]
            return int(size)
        except Exception as e:
            raise StorageError("S3", e)

    elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
        ioctx = __get_ceph_ioctx(tray_name)

        try:
            size, mtime = ioctx.stat(base_name)
            return size
        except Exception as e:
            raise StorageError("CEPH", e)

    elif storage_type == StorageType.FILE:
        try:
            file_stats = os.stat(path)
            return file_stats.st_size
        except Exception as e:
            raise StorageError("FILE", e)

    elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:
        try:
            # stream=True only downloads the headers initially
            response = requests.get(storage_type.value + path, stream=True).headers["content-length"]
            return int(response)
        except Exception as e:
            raise StorageError(storage_type.name, e)

    else:
        raise NotImplementedError(f"Cannot get size for storage type {storage_type.name}")


def exists(path: str) -> bool:
    """Does the file or object exist?

    Args:
        path (str): path of the file/object to test

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage read issue
        NotImplementedError: Storage type not handled

    Returns:
        bool: file/object existence status
    """
    storage_type, path, tray_name, base_name = get_infos_from_path(path)

    if storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        try:
            s3_client["client"].head_object(Bucket=bucket_name, Key=base_name)
            return True
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "404":
                return False
            else:
                raise StorageError("S3", e)

    elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
        ioctx = __get_ceph_ioctx(tray_name)

        try:
            ioctx.stat(base_name)
            return True
        except rados.ObjectNotFound:
            return False
        except Exception as e:
            raise StorageError("CEPH", e)

    elif storage_type == StorageType.FILE:
        return os.path.exists(path)

    elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:
        try:
            response = requests.get(storage_type.value + path, stream=True)
            if response.status_code == 200:
                return True
            else:
                return False
        except Exception as e:
            raise StorageError(storage_type.name, e)

    else:
        raise NotImplementedError(f"Cannot test existence for storage type {storage_type.name}")


def remove(path: str) -> None:
    """Remove the file/object

    Args:
        path (str): path of the file/object to remove

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage removal issue
        NotImplementedError: Storage type not handled
    """
    storage_type, path, tray_name, base_name = get_infos_from_path(path)

    if storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        try:
            s3_client["client"].delete_object(Bucket=bucket_name, Key=base_name)
        except Exception as e:
            raise StorageError("S3", e)

    elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
        ioctx = __get_ceph_ioctx(tray_name)

        try:
            ioctx.remove_object(base_name)
        except rados.ObjectNotFound:
            pass
        except Exception as e:
            raise StorageError("CEPH", e)

    elif storage_type == StorageType.FILE:
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except Exception as e:
            raise StorageError("FILE", e)

    else:
        raise NotImplementedError(f"Cannot remove data for storage type {storage_type.name}")


def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
    """Copy a file or object to a file or object place. If an MD5 sum is provided, it is compared to the sum processed after the copy.

    Args:
        from_path (str): source file/object path, to copy
        to_path (str): destination file/object path
        from_md5 (str, optional): MD5 sum, re-processed after copy and controlled. Defaults to None.

    Raises:
        StorageError: Copy issue
        MissingEnvironmentError: Missing object storage information
        NotImplementedError: Storage type not handled
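
    Example, copying an S3 object to a local file with an MD5 control (paths and sum are hypothetical):

        copy("s3://my_bucket/dir/data.tif", "file:///tmp/data.tif", from_md5="9e107d9d372bb6826bd81d3542a419d6")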
679 """
681 from_type, from_path, from_tray, from_base_name = get_infos_from_path(from_path)
682 to_type, to_path, to_tray, to_base_name = get_infos_from_path(to_path)
684 # Réalisation de la copie, selon les types de stockage
685 if from_type == StorageType.FILE and to_type == StorageType.FILE:
686 try:
687 if to_tray != "":
688 os.makedirs(to_tray, exist_ok=True)
690 copyfile(from_path, to_path)
692 if from_md5 is not None:
693 to_md5 = hash_file(to_path)
694 if to_md5 != from_md5:
695 raise StorageError(
696 "FILE",
697 f"Invalid MD5 sum control for copy file {from_path} to {to_path} : {from_md5} != {to_md5}",
698 )
700 except Exception as e:
701 raise StorageError("FILE", f"Cannot copy file {from_path} to {to_path} : {e}")
703 elif from_type == StorageType.S3 and to_type == StorageType.FILE:
704 s3_client, from_bucket = __get_s3_client(from_tray)
706 try:
707 if to_tray != "":
708 os.makedirs(to_tray, exist_ok=True)
710 s3_client["client"].download_file(from_bucket, from_base_name, to_path)
712 if from_md5 is not None:
713 to_md5 = hash_file(to_path)
714 if to_md5 != from_md5:
715 raise StorageError(
716 "S3 and FILE",
717 f"Invalid MD5 sum control for copy S3 object {from_path} to file {to_path} : {from_md5} != {to_md5}",
718 )
720 except Exception as e:
721 raise StorageError(
722 "S3 and FILE", f"Cannot copy S3 object {from_path} to file {to_path} : {e}"
723 )
725 elif from_type == StorageType.FILE and to_type == StorageType.S3:
726 s3_client, to_bucket = __get_s3_client(to_tray)
728 try:
729 s3_client["client"].upload_file(from_path, to_bucket, to_base_name)
731 if from_md5 is not None:
732 to_md5 = (
733 s3_client["client"]
734 .head_object(Bucket=to_bucket, Key=to_base_name)["ETag"]
735 .strip('"')
736 )
737 if to_md5 != from_md5:
738 raise StorageError(
739 "FILE and S3",
740 f"Invalid MD5 sum control for copy file {from_path} to S3 object {to_path} : {from_md5} != {to_md5}",
741 )
742 except Exception as e:
743 raise StorageError(
744 "FILE and S3", f"Cannot copy file {from_path} to S3 object {to_path} : {e}"
745 )
747 elif from_type == StorageType.S3 and to_type == StorageType.S3:
748 from_s3_client, from_bucket = __get_s3_client(from_tray)
749 to_s3_client, to_bucket = __get_s3_client(to_tray)
751 try:
752 if to_s3_client["host"] == from_s3_client["host"]:
753 to_s3_client["client"].copy(
754 {"Bucket": from_bucket, "Key": from_base_name}, to_bucket, to_base_name
755 )
756 else:
757 with tempfile.NamedTemporaryFile("w+b") as f:
758 from_s3_client["client"].download_fileobj(from_bucket, from_base_name, f)
759 to_s3_client["client"].upload_file(f.name, to_bucket, to_base_name)
761 if from_md5 is not None:
762 to_md5 = (
763 to_s3_client["client"]
764 .head_object(Bucket=to_bucket, Key=to_base_name)["ETag"]
765 .strip('"')
766 )
767 if to_md5 != from_md5:
768 raise StorageError(
769 "S3",
770 f"Invalid MD5 sum control for copy S3 object {from_path} to {to_path} : {from_md5} != {to_md5}",
771 )
773 except Exception as e:
774 raise StorageError("S3", f"Cannot copy S3 object {from_path} to {to_path} : {e}")
776 elif from_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE and to_type == StorageType.FILE:
777 ioctx = __get_ceph_ioctx(from_tray)
779 if from_md5 is not None:
780 checker = hashlib.md5()
782 try:
783 if to_tray != "":
784 os.makedirs(to_tray, exist_ok=True)
785 f = open(to_path, "wb")
787 offset = 0
788 size = 0
790 while True:
791 chunk = ioctx.read(from_base_name, 65536, offset)
792 size = len(chunk)
793 offset += size
794 f.write(chunk)
796 if from_md5 is not None:
797 checker.update(chunk)
799 if size < 65536:
800 break
802 f.close()
804 if from_md5 is not None and from_md5 != checker.hexdigest():
805 raise StorageError(
806 "CEPH and FILE",
807 f"Invalid MD5 sum control for copy CEPH object {from_path} to file {to_path} : {from_md5} != {checker.hexdigest()}",
808 )
810 except Exception as e:
811 raise StorageError(
812 "CEPH and FILE", f"Cannot copy CEPH object {from_path} to file {to_path} : {e}"
813 )
815 elif from_type == StorageType.FILE and to_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
816 ioctx = __get_ceph_ioctx(to_tray)
818 if from_md5 is not None:
819 checker = hashlib.md5()
821 try:
822 f = open(from_path, "rb")
824 offset = 0
825 size = 0
827 while True:
828 chunk = f.read(65536)
829 size = len(chunk)
830 ioctx.write(to_base_name, chunk, offset)
831 offset += size
833 if from_md5 is not None:
834 checker.update(chunk)
836 if size < 65536:
837 break
839 f.close()
841 if from_md5 is not None and from_md5 != checker.hexdigest():
842 raise StorageError(
843 "FILE and CEPH",
844 f"Invalid MD5 sum control for copy file {from_path} to CEPH object {to_path} : {from_md5} != {checker.hexdigest()}",
845 )
847 except Exception as e:
848 raise StorageError(
849 "FILE and CEPH", f"Cannot copy file {from_path} to CEPH object {to_path} : {e}"
850 )
852 elif from_type == StorageType.CEPH and to_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
853 from_ioctx = __get_ceph_ioctx(from_tray)
854 to_ioctx = __get_ceph_ioctx(to_tray)
856 if from_md5 is not None:
857 checker = hashlib.md5()
859 try:
860 offset = 0
861 size = 0
863 while True:
864 chunk = from_ioctx.read(from_base_name, 65536, offset)
865 size = len(chunk)
866 to_ioctx.write(to_base_name, chunk, offset)
867 offset += size
869 if from_md5 is not None:
870 checker.update(chunk)
872 if size < 65536:
873 break
875 if from_md5 is not None and from_md5 != checker.hexdigest():
876 raise StorageError(
877 "FILE and CEPH",
878 f"Invalid MD5 sum control for copy CEPH object {from_path} to {to_path} : {from_md5} != {checker.hexdigest()}",
879 )
881 except Exception as e:
882 raise StorageError("CEPH", f"Cannot copy CEPH object {from_path} to {to_path} : {e}")
884 elif from_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE and to_type == StorageType.S3:
885 from_ioctx = __get_ceph_ioctx(from_tray)
887 s3_client, to_bucket = __get_s3_client(to_tray)
889 if from_md5 is not None:
890 checker = hashlib.md5()
892 try:
893 offset = 0
894 size = 0
896 with tempfile.NamedTemporaryFile("w+b", delete=False) as f:
897 name_tmp = f.name
898 while True:
899 chunk = from_ioctx.read(from_base_name, 65536, offset)
900 size = len(chunk)
901 offset += size
902 f.write(chunk)
904 if from_md5 is not None:
905 checker.update(chunk)
907 if size < 65536:
908 break
910 s3_client["client"].upload_file(name_tmp, to_bucket, to_base_name)
912 os.remove(name_tmp)
914 if from_md5 is not None and from_md5 != checker.hexdigest():
915 raise StorageError(
916 "CEPH and S3",
917 f"Invalid MD5 sum control for copy CEPH object {from_path} to S3 object {to_path} : {from_md5} != {checker.hexdigest()}",
918 )
920 except Exception as e:
921 raise StorageError(
922 "CEPH and S3", f"Cannot copy CEPH object {from_path} to S3 object {to_path} : {e}"
923 )
925 elif (
926 from_type == StorageType.HTTP or from_type == StorageType.HTTPS
927 ) and to_type == StorageType.FILE:
928 try:
929 response = requests.get(from_type.value + from_path, stream=True)
930 with open(to_path, "wb") as f:
931 for chunk in response.iter_content(chunk_size=65536):
932 if chunk:
933 f.write(chunk)
935 except Exception as e:
936 raise StorageError(
937 "HTTP(S) and FILE",
938 f"Cannot copy HTTP(S) object {from_path} to FILE object {to_path} : {e}",
939 )
941 elif (
942 (from_type == StorageType.HTTP or from_type == StorageType.HTTPS)
943 and to_type == StorageType.CEPH
944 and CEPH_RADOS_AVAILABLE
945 ):
946 to_ioctx = __get_ceph_ioctx(to_tray)
948 try:
949 response = requests.get(from_type.value + from_path, stream=True)
950 offset = 0
951 for chunk in response.iter_content(chunk_size=65536):
952 if chunk:
953 size = len(chunk)
954 to_ioctx.write(to_base_name, chunk, offset)
955 offset += size
957 except Exception as e:
958 raise StorageError(
959 "HTTP(S) and CEPH",
960 f"Cannot copy HTTP(S) object {from_path} to CEPH object {to_path} : {e}",
961 )
963 elif (
964 from_type == StorageType.HTTP or from_type == StorageType.HTTPS
965 ) and to_type == StorageType.S3:
966 to_s3_client, to_bucket = __get_s3_client(to_tray)
968 try:
969 response = requests.get(from_type.value + from_path, stream=True)
970 with tempfile.NamedTemporaryFile("w+b", delete=False) as f:
971 name_fich = f.name
972 for chunk in response.iter_content(chunk_size=65536):
973 if chunk:
974 f.write(chunk)
976 to_s3_client["client"].upload_file(name_fich, to_tray, to_base_name)
978 os.remove(name_fich)
980 except Exception as e:
981 raise StorageError(
982 "HTTP(S) and S3",
983 f"Cannot copy HTTP(S) object {from_path} to S3 object {to_path} : {e}",
984 )
986 else:
987 raise NotImplementedError(
988 f"Cannot copy data from storage type {from_type.name} to storage type {to_type.name}"
989 )


def link(target_path: str, link_path: str, hard: bool = False) -> None:
    """Create a symbolic link

    Args:
        target_path (str): file/object to link
        link_path (str): link to create
        hard (bool, optional): hard link rather than symbolic. Only for FILE storage. Defaults to False.

    Raises:
        StorageError: link issue
        MissingEnvironmentError: Missing object storage information
        NotImplementedError: Storage type not handled
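
    Example, creating an S3 "symbolic" object pointing to a target object (names are hypothetical);
    the link object stores the SYMLINK# signature followed by the target:

        link("s3://my_bucket/source_object", "s3://my_bucket/alias_object")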
1004 """
1006 target_type, target_path, target_tray, target_base_name = get_infos_from_path(target_path)
1007 link_type, link_path, link_tray, link_base_name = get_infos_from_path(link_path)
1009 if target_type != link_type:
1010 raise StorageError(
1011 f"{target_type.name} and {link_type.name}",
1012 "Cannot make link between two different storage types",
1013 )
1015 if hard and target_type != StorageType.FILE:
1016 raise StorageError(target_type.name, "Hard link is available only for FILE storage")
1018 # Réalisation du lien, selon les types de stockage
1019 if target_type == StorageType.S3:
1020 target_s3_client, target_bucket = __get_s3_client(target_tray)
1021 link_s3_client, link_bucket = __get_s3_client(link_tray)
1023 if target_s3_client["host"] != link_s3_client["host"]:
1024 raise StorageError(
1025 "S3",
1026 f"Cannot make link {link_path} -> {target_path} : link works only on the same S3 cluster",
1027 )
1029 try:
1030 target_s3_client["client"].put_object(
1031 Body=f"{__OBJECT_SYMLINK_SIGNATURE}{target_bucket}/{target_base_name}".encode(),
1032 Bucket=link_bucket,
1033 Key=link_base_name,
1034 )
1035 except Exception as e:
1036 raise StorageError("S3", e)
1038 elif target_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
1039 ioctx = __get_ceph_ioctx(link_tray)
1041 try:
1042 ioctx.write_full(link_base_name, f"{__OBJECT_SYMLINK_SIGNATURE}{target_path}".encode())
1043 except Exception as e:
1044 raise StorageError("CEPH", e)
1046 elif target_type == StorageType.FILE:
1047 try:
1048 to_tray = get_infos_from_path(link_path)[2]
1049 if to_tray != "":
1050 os.makedirs(to_tray, exist_ok=True)
1052 if exists(link_path):
1053 remove(link_path)
1054 if hard:
1055 os.link(target_path, link_path)
1056 else:
1057 os.symlink(target_path, link_path)
1058 except Exception as e:
1059 raise StorageError("FILE", e)
1061 else:
1062 raise NotImplementedError(f"Cannot make link for storage type {target_type.name}")


def get_osgeo_path(path: str) -> str:
    """Return a GDAL/OGR Open compliant path and configure storage access

    For an S3 input path, the endpoint, access and secret keys are set and the path is built with the "/vsis3" root.

    For a FILE input path, only the storage prefix is removed

    Args:
        path (str): Source path

    Raises:
        NotImplementedError: Storage type not handled

    Returns:
        str: GDAL/OGR Open compliant path
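
    Example, opening an S3 raster with GDAL (bucket and object names are hypothetical):

        from osgeo import gdal
        dataset = gdal.Open(get_osgeo_path("s3://my_bucket/dir/image.tif"))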
1080 """
1082 storage_type, unprefixed_path, tray_name, base_name = get_infos_from_path(path)
1084 if storage_type == StorageType.S3 and GDAL_AVAILABLE:
1085 s3_client, bucket_name = __get_s3_client(tray_name)
1087 gdal.SetConfigOption("AWS_SECRET_ACCESS_KEY", s3_client["secret_key"])
1088 gdal.SetConfigOption("AWS_ACCESS_KEY_ID", s3_client["key"])
1089 gdal.SetConfigOption("AWS_S3_ENDPOINT", s3_client["host"])
1090 gdal.SetConfigOption("AWS_VIRTUAL_HOSTING", "FALSE")
1091 if not s3_client["secure"]:
1092 gdal.SetConfigOption("AWS_HTTPS", "NO")
1094 return f"/vsis3/{bucket_name}/{base_name}"
1096 elif storage_type == StorageType.FILE:
1097 return unprefixed_path
1099 else:
1100 raise NotImplementedError(f"Cannot get a GDAL/OGR compliant path from {path}")


def size_path(path: str) -> int:
    """Return the size of the given path (or, for CEPH, the sum of the sizes of each object of the .list)

    Args:
        path (str): Source path

    Raises:
        StorageError: Unhandled link or link issue
        MissingEnvironmentError: Missing object storage information
        NotImplementedError: Storage type not handled

    Returns:
        int: size of the path
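
    Example (prefix is hypothetical):

        total = size_path("s3://my_bucket/tiles")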
1116 """
1117 storage_type, unprefixed_path, tray_name, base_name = get_infos_from_path(path)
1119 if storage_type == StorageType.FILE:
1120 try:
1121 total = 0
1122 with os.scandir(unprefixed_path) as it:
1123 for entry in it:
1124 if entry.is_file():
1125 total += entry.stat().st_size
1126 elif entry.is_dir():
1127 total += size_path(entry.path)
1129 except Exception as e:
1130 raise StorageError("FILE", e)
1132 elif storage_type == StorageType.S3:
1133 s3_client, bucket_name = __get_s3_client(tray_name)
1135 try:
1136 paginator = s3_client["client"].get_paginator("list_objects_v2")
1137 pages = paginator.paginate(
1138 Bucket=bucket_name,
1139 Prefix=base_name + "/",
1140 PaginationConfig={
1141 "PageSize": 10000,
1142 },
1143 )
1144 total = 0
1145 for page in pages:
1146 for key in page["Contents"]:
1147 total += key["Size"]
1149 except Exception as e:
1150 raise StorageError("S3", e)
1152 else:
1153 raise NotImplementedError(
1154 f"Cannot get prefix path size for storage type {storage_type.name}"
1155 )
1157 return total