Module rok4.Storage

Provide functions to read and write data on several storage types

Available storage types are:
- S3 (paths are prefixed with s3://)
- CEPH (paths are prefixed with ceph://)
- FILE (paths are prefixed with file://, but this is also the default interpretation of unprefixed paths)
- HTTP (paths are prefixed with http://)
- HTTPS (paths are prefixed with https://)

Not all storage types are available for every function.

Using CEPH storage requires the following environment variables:
- ROK4_CEPH_CONFFILE
- ROK4_CEPH_USERNAME
- ROK4_CEPH_CLUSTERNAME

Using S3 storage requires the following environment variables:
- ROK4_S3_KEY
- ROK4_S3_SECRETKEY
- ROK4_S3_URL

To use several S3 clusters, each environment variable has to contain a comma-separated list with the same number of elements

Example: work with 2 S3 clusters:

- ROK4_S3_KEY=KEY1,KEY2
- ROK4_S3_SECRETKEY=SKEY1,SKEY2
- ROK4_S3_URL=https://s3.storage.fr,https://s4.storage.fr

To specify which cluster to use, the bucket name should be bucket_name@s3.storage.fr or bucket_name@s4.storage.fr. If no host is given (no @) in the bucket name, the first S3 cluster is used.
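
For illustration, a minimal sketch of a two-cluster setup (the endpoints, keys and bucket names below are placeholders):

import os

# Declare two S3 clusters before the first S3 access (comma-separated lists, same order in each variable)
os.environ["ROK4_S3_KEY"] = "KEY1,KEY2"
os.environ["ROK4_S3_SECRETKEY"] = "SKEY1,SKEY2"
os.environ["ROK4_S3_URL"] = "https://s3.storage.fr,https://s4.storage.fr"

from rok4.Storage import exists, get_data_str

# No "@" in the bucket name: the first cluster (s3.storage.fr) is used
content = get_data_str("s3://my-bucket/descriptor.json")

# With "@", the named cluster (s4.storage.fr) is used
if exists("s3://my-bucket@s4.storage.fr/descriptor.json"):
    content = get_data_str("s3://my-bucket@s4.storage.fr/descriptor.json")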

Expand source code
"""Provide functions to use read or write

Available storage types are:
- S3 (paths are prefixed with `s3://`)
- CEPH (paths are prefixed with `ceph://`)
- FILE (paths are prefixed with `file://`, but this is also the default interpretation of unprefixed paths)
- HTTP (paths are prefixed with `http://`)
- HTTPS (paths are prefixed with `https://`)

Not all storage types are available for every function.

Using CEPH storage requires the following environment variables:
- ROK4_CEPH_CONFFILE
- ROK4_CEPH_USERNAME
- ROK4_CEPH_CLUSTERNAME

Using S3 storage requires the following environment variables:
- ROK4_S3_KEY
- ROK4_S3_SECRETKEY
- ROK4_S3_URL

To use several S3 clusters, each environment variable has to contain a comma-separated list with the same number of elements

Example: work with 2 S3 clusters:

- ROK4_S3_KEY=KEY1,KEY2
- ROK4_S3_SECRETKEY=SKEY1,SKEY2
- ROK4_S3_URL=https://s3.storage.fr,https://s4.storage.fr

To specify which cluster to use, the bucket name should be bucket_name@s3.storage.fr or bucket_name@s4.storage.fr. If no host is given (no @) in the bucket name, the first S3 cluster is used
"""

import boto3
import botocore.exceptions
import tempfile
import re
import os
import rados
import hashlib
import requests
from typing import Dict, List, Tuple, Union
from enum import Enum
from shutil import copyfile
from osgeo import gdal

gdal.UseExceptions()

from rok4.Exceptions import *


class StorageType(Enum):
    FILE = "file://"
    S3 = "s3://"
    CEPH = "ceph://"
    HTTP = "http://"
    HTTPS = "https://"


__S3_CLIENTS = {}
__S3_DEFAULT_CLIENT = None


def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", str]], str]:
    """Get the S3 client

    Create it if not already done

    Args:
        bucket_name (str): S3 bucket name. Could be just the bucket name, or <bucket name>@<cluster host>

    Raises:
        MissingEnvironmentError: Missing S3 storage information
        StorageError: S3 client configuration issue

    Returns:
        Tuple[Dict[str, Union['boto3.client',str]], str]: the S3 cluster information (client, key, secret key, url, host) and the simple bucket name
    """
    
    global __S3_CLIENTS, __S3_DEFAULT_CLIENT

    if not __S3_CLIENTS:
        # First use of S3 storage: load the cluster information from the environment variables
        try:
            keys = os.environ["ROK4_S3_KEY"].split(",")
            secret_keys = os.environ["ROK4_S3_SECRETKEY"].split(",")
            urls = os.environ["ROK4_S3_URL"].split(",")

            if len(keys) != len(secret_keys) or len(keys) != len(urls):
                raise StorageError(
                    "S3",
                    "S3 informations in environment variables are inconsistent : same number of element in each list is required",
                )

            for i in range(len(keys)):
                h = re.sub("https?://", "", urls[i])

                if h in __S3_CLIENTS:
                    raise StorageError("S3", "A S3 cluster is defined twice (based on URL)")

                __S3_CLIENTS[h] = {
                    "client": boto3.client(
                        "s3",
                        aws_access_key_id=keys[i],
                        aws_secret_access_key=secret_keys[i],
                        endpoint_url=urls[i],
                    ),
                    "key": keys[i],
                    "secret_key": secret_keys[i],
                    "url": urls[i],
                    "host": h,
                }

                if i == 0:
                    # The first cluster is the default one
                    __S3_DEFAULT_CLIENT = h

        except KeyError as e:
            raise MissingEnvironmentError(e)
        except Exception as e:
            raise StorageError("S3", e)

    try:
        host = bucket_name.split("@")[1]
    except IndexError:
        host = __S3_DEFAULT_CLIENT

    bucket_name = bucket_name.split("@")[0]

    if host not in __S3_CLIENTS:
        raise StorageError("S3", f"Unknown S3 cluster, according to host '{host}'")

    return __S3_CLIENTS[host], bucket_name


def disconnect_s3_clients() -> None:
    """Clean S3 clients"""
  
    global __S3_CLIENTS, __S3_DEFAULT_CLIENT
    __S3_CLIENTS = {}
    __S3_DEFAULT_CLIENT = None


__CEPH_CLIENT = None
__CEPH_IOCTXS = {}


def __get_ceph_ioctx(pool: str) -> "rados.Ioctx":
    """Get the CEPH IO context

    Create it (client and context) if not already done

    Args:
        pool (str): CEPH pool's name

    Raises:
        MissingEnvironmentError: Missing CEPH storage information
        StorageError: CEPH IO context configuration issue

    Returns:
        rados.Ioctx: IO ceph context
    """
    global __CEPH_CLIENT, __CEPH_IOCTXS

    if __CEPH_CLIENT is None:
        try:
            __CEPH_CLIENT = rados.Rados(
                conffile=os.environ["ROK4_CEPH_CONFFILE"],
                clustername=os.environ["ROK4_CEPH_CLUSTERNAME"],
                name=os.environ["ROK4_CEPH_USERNAME"],
            )

            __CEPH_CLIENT.connect()

        except KeyError as e:
            raise MissingEnvironmentError(e)
        except Exception as e:
            raise StorageError("CEPH", e)

    if pool not in __CEPH_IOCTXS:
        try:
            __CEPH_IOCTXS[pool] = __CEPH_CLIENT.open_ioctx(pool)
        except Exception as e:
            raise StorageError("CEPH", e)

    return __CEPH_IOCTXS[pool]


def disconnect_ceph_clients() -> None:
    """Clean CEPH clients"""
    global __CEPH_CLIENT, __CEPH_IOCTXS
    __CEPH_CLIENT = None
    __CEPH_IOCTXS = {}


__OBJECT_SYMLINK_SIGNATURE = "SYMLINK#"


def get_infos_from_path(path: str) -> Tuple[StorageType, str, str, str]:
    """Extract storage type, the unprefixed path, the container and the basename from path (Default: FILE storage)

    For a FILE storage, the tray is the directory and the basename is the file name.

    For an object storage (CEPH or S3), the tray is the bucket or the pool and the basename is the object name.
    For an S3 bucket, the format can be <bucket name>@<cluster name> to use several clusters. The cluster name is the host (without protocol)

    Args:
        path (str): path to analyse

    Returns:
        Tuple[StorageType, str, str, str]: storage type, unprefixed path, the container and the basename
    """

    if path.startswith("s3://"):
        bucket_name, object_name = path[5:].split("/", 1)
        return StorageType.S3, path[5:], bucket_name, object_name
    elif path.startswith("ceph://"):
        pool_name, object_name = path[7:].split("/", 1)
        return StorageType.CEPH, path[7:], pool_name, object_name
    elif path.startswith("file://"):
        return StorageType.FILE, path[7:], os.path.dirname(path[7:]), os.path.basename(path[7:])
    elif path.startswith("http://"):
        return StorageType.HTTP, path[7:], os.path.dirname(path[7:]), os.path.basename(path[7:])
    elif path.startswith("https://"):
        return StorageType.HTTPS, path[8:], os.path.dirname(path[8:]), os.path.basename(path[8:])
    else:
        return StorageType.FILE, path, os.path.dirname(path), os.path.basename(path)


def get_path_from_infos(storage_type: StorageType, *args) -> str:
    """Write full path from elements

    Prefixed with the storage type, elements are joined with a slash

    Args:
        storage_type (StorageType): Storage's type for path

    Returns:
        str: Full path
    """
    return f"{storage_type.value}{os.path.join(*args)}"


def hash_file(path: str) -> str:
    """Process MD5 sum of the provided file

    Args:
        path (str): path to file

    Returns:
        str: hexadecimal MD5 sum
    """

    checker = hashlib.md5()

    with open(path, "rb") as file:
        chunk = 0
        while chunk != b"":
            chunk = file.read(65536)
            checker.update(chunk)

    return checker.hexdigest()


def get_data_str(path: str) -> str:
    """Load full data into a string

    Args:
        path (str): path to data

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage read issue
        FileNotFoundError: File or object does not exist

    Returns:
        str: Data content
    """

    return get_data_binary(path).decode("utf-8")


def get_data_binary(path: str, range: Tuple[int, int] = None) -> bytes:
    """Load data into a binary string

    Args:
        path (str): path to data
        range (Tuple[int, int], optional): offset and size, to make a partial read. Defaults to None.

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage read issue
        FileNotFoundError: File or object does not exist

    Returns:
        bytes: Data binary content
    """
    storage_type, path, tray_name, base_name = get_infos_from_path(path)

    if storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        try:
            if range is None:
                data = (
                    s3_client["client"]
                    .get_object(
                        Bucket=bucket_name,
                        Key=base_name,
                    )["Body"]
                    .read()
                )
            else:
                data = (
                    s3_client["client"]
                    .get_object(
                        Bucket=bucket_name,
                        Key=base_name,
                        Range=f"bytes={range[0]}-{range[0] + range[1] - 1}",
                    )["Body"]
                    .read()
                )

        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "404":
                raise FileNotFoundError(f"{storage_type.value}{path}")
            else:
                raise StorageError("S3", e)

        except Exception as e:
            raise StorageError("S3", e)

    elif storage_type == StorageType.CEPH:
        ioctx = __get_ceph_ioctx(tray_name)

        try:
            if range is None:
                size, mtime = ioctx.stat(base_name)
                data = ioctx.read(base_name, size)
            else:
                data = ioctx.read(base_name, range[1], range[0])

        except rados.ObjectNotFound as e:
            raise FileNotFoundError(f"{storage_type.value}{path}")

        except Exception as e:
            raise StorageError("CEPH", e)

    elif storage_type == StorageType.FILE:
        try:
            f = open(path, "rb")
            if range is None:
                data = f.read()
            else:
                f.seek(range[0])
                data = f.read(range[1])

            f.close()

        except FileNotFoundError as e:
            raise FileNotFoundError(f"{storage_type.value}{path}")

        except Exception as e:
            raise StorageError("FILE", e)

    elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:

        if range is None:
            try:
                response = requests.get(f"{storage_type.value}{path}", stream=True)
            except Exception as e:
                raise StorageError(storage_type.name, e)

            if response.status_code == 404:
                raise FileNotFoundError(f"{storage_type.value}{path}")

            data = response.content
        else:
            raise NotImplementedError

    else:
        raise StorageError("UNKNOWN", "Unhandled storage type to read binary data")

    return data


def put_data_str(data: str, path: str) -> None:
    """Store string data into a file or an object

    UTF-8 encoding is used for bytes conversion

    Args:
        data (str): data to write
        path (str): destination path, where to write data

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage write issue
    """

    storage_type, path, tray_name, base_name = get_infos_from_path(path)

    if storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        try:
            s3_client["client"].put_object(
                Body=data.encode("utf-8"), Bucket=bucket_name, Key=base_name
            )
        except Exception as e:
            raise StorageError("S3", e)

    elif storage_type == StorageType.CEPH:
        ioctx = __get_ceph_ioctx(tray_name)

        try:
            ioctx.write_full(base_name, data.encode("utf-8"))
        except Exception as e:
            raise StorageError("CEPH", e)

    elif storage_type == StorageType.FILE:
        try:
            f = open(path, "w")
            f.write(data)
            f.close()
        except Exception as e:
            raise StorageError("FILE", e)

    else:
        raise StorageError("UNKNOWN", "Unhandled storage type to write string data")


def get_size(path: str) -> int:
    """Get size of file or object

    Args:
        path (str): path of the file/object whose size is asked

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage read issue

    Returns:
        int: file/object size, in bytes
    """

    storage_type, path, tray_name, base_name = get_infos_from_path(path)

    if storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        try:
            size = s3_client["client"].head_object(Bucket=bucket_name, Key=base_name)[
                "ContentLength"
            ]
            return int(size)
        except Exception as e:
            raise StorageError("S3", e)

    elif storage_type == StorageType.CEPH:
        ioctx = __get_ceph_ioctx(tray_name)

        try:
            size, mtime = ioctx.stat(base_name)
            return size
        except Exception as e:
            raise StorageError("CEPH", e)

    elif storage_type == StorageType.FILE:
        try:
            file_stats = os.stat(path)
            return file_stats.st_size
        except Exception as e:
            raise StorageError("FILE", e)

    elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:

        try:
            # With stream=True, only the headers are fetched at first
            size = requests.get(storage_type.value + path, stream=True).headers["content-length"]
            return int(size)
        except Exception as e:
            raise StorageError(storage_type.name, e)

    else:
        raise StorageError("UNKNOWN", "Unhandled storage type to get size")


def exists(path: str) -> bool:
    """Do the file or object exist ?

    Args:
        path (str): path of file/object to test

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage read issue

    Returns:
        bool: file/object existing status
    """

    storage_type, path, tray_name, base_name = get_infos_from_path(path)

    if storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        try:
            s3_client["client"].head_object(Bucket=bucket_name, Key=base_name)
            return True
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "404":
                return False
            else:
                raise StorageError("S3", e)

    elif storage_type == StorageType.CEPH:
        ioctx = __get_ceph_ioctx(tray_name)

        try:
            ioctx.stat(base_name)
            return True
        except rados.ObjectNotFound as e:
            return False
        except Exception as e:
            raise StorageError("CEPH", e)

    elif storage_type == StorageType.FILE:
        return os.path.exists(path)

    elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:

        try:
            response = requests.get(storage_type.value + path, stream=True)
            return response.status_code == 200
        except Exception as e:
            raise StorageError(storage_type.name, e)

    else:
        raise StorageError("UNKNOWN", "Unhandled storage type to test if exists")


def remove(path: str) -> None:
    """Remove the file/object

    Args:
        path (str): path of file/object to remove

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage removal issue
    """
    storage_type, path, tray_name, base_name = get_infos_from_path(path)

    if storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        try:
            s3_client["client"].delete_object(Bucket=bucket_name, Key=base_name)
        except Exception as e:
            raise StorageError("S3", e)

    elif storage_type == StorageType.CEPH:
        ioctx = __get_ceph_ioctx(tray_name)

        try:
            ioctx.remove_object(base_name)
        except rados.ObjectNotFound as e:
            pass
        except Exception as e:
            raise StorageError("CEPH", e)

    elif storage_type == StorageType.FILE:
        try:
            os.remove(path)
        except FileNotFoundError as e:
            pass
        except Exception as e:
            raise StorageError("FILE", e)

    else:
        raise StorageError("UNKNOWN", "Unhandled storage type to remove things")


def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
    """Copy a file or object to a file or object place. If MD5 sum is provided, it is compared to sum after the copy.

    Args:
        from_path (str): source file/object path, to copy
        to_path (str): destination file/object path
        from_md5 (str, optional): MD5 sum, recomputed after the copy and checked against this value. Defaults to None.

    Raises:
        StorageError: Unhandled copy or copy issue
        MissingEnvironmentError: Missing object storage information
    """

    from_type, from_path, from_tray, from_base_name = get_infos_from_path(from_path)
    to_type, to_path, to_tray, to_base_name = get_infos_from_path(to_path)

    # Perform the copy, depending on the source and destination storage types
    if from_type == StorageType.FILE and to_type == StorageType.FILE:
        try:
            if to_tray != "":
                os.makedirs(to_tray, exist_ok=True)

            copyfile(from_path, to_path)

            if from_md5 is not None:
                to_md5 = hash_file(to_path)
                if to_md5 != from_md5:
                    raise StorageError(
                        f"FILE",
                        f"Invalid MD5 sum control for copy file {from_path} to {to_path} : {from_md5} != {to_md5}",
                    )

        except Exception as e:
            raise StorageError(f"FILE", f"Cannot copy file {from_path} to {to_path} : {e}")

    elif from_type == StorageType.S3 and to_type == StorageType.FILE:
        s3_client, from_bucket = __get_s3_client(from_tray)

        try:
            if to_tray != "":
                os.makedirs(to_tray, exist_ok=True)

            s3_client["client"].download_file(from_bucket, from_base_name, to_path)

            if from_md5 is not None:
                to_md5 = hash_file(to_path)
                if to_md5 != from_md5:
                    raise StorageError(
                        "S3 and FILE",
                        f"Invalid MD5 sum control for copy S3 object {from_path} to file {to_path} : {from_md5} != {to_md5}",
                    )

        except Exception as e:
            raise StorageError(
                f"S3 and FILE", f"Cannot copy S3 object {from_path} to file {to_path} : {e}"
            )

    elif from_type == StorageType.FILE and to_type == StorageType.S3:
        s3_client, to_bucket = __get_s3_client(to_tray)

        try:
            s3_client["client"].upload_file(from_path, to_bucket, to_base_name)

            if from_md5 is not None:
                to_md5 = (
                    s3_client["client"]
                    .head_object(Bucket=to_bucket, Key=to_base_name)["ETag"]
                    .strip('"')
                )
                if to_md5 != from_md5:
                    raise StorageError(
                        f"FILE and S3",
                        f"Invalid MD5 sum control for copy file {from_path} to S3 object {to_path} : {from_md5} != {to_md5}",
                    )
        except Exception as e:
            raise StorageError(
                f"FILE and S3", f"Cannot copy file {from_path} to S3 object {to_path} : {e}"
            )

    elif from_type == StorageType.S3 and to_type == StorageType.S3:
        from_s3_client, from_bucket = __get_s3_client(from_tray)
        to_s3_client, to_bucket = __get_s3_client(to_tray)

        try:
            if to_s3_client["host"] == from_s3_client["host"]:
                to_s3_client["client"].copy(
                    {"Bucket": from_bucket, "Key": from_base_name}, to_bucket, to_base_name
                )
            else:
                with tempfile.NamedTemporaryFile("w+b") as f:
                    from_s3_client["client"].download_fileobj(from_bucket, from_base_name, f)
                    to_s3_client["client"].upload_file(f.name, to_bucket, to_base_name)

            if from_md5 is not None:
                to_md5 = (
                    to_s3_client["client"]
                    .head_object(Bucket=to_bucket, Key=to_base_name)["ETag"]
                    .strip('"')
                )
                if to_md5 != from_md5:
                    raise StorageError(
                        f"S3",
                        f"Invalid MD5 sum control for copy S3 object {from_path} to {to_path} : {from_md5} != {to_md5}",
                    )

        except Exception as e:
            raise StorageError(f"S3", f"Cannot copy S3 object {from_path} to {to_path} : {e}")

    elif from_type == StorageType.CEPH and to_type == StorageType.FILE:
        ioctx = __get_ceph_ioctx(from_tray)

        if from_md5 is not None:
            checker = hashlib.md5()

        try:
            if to_tray != "":
                os.makedirs(to_tray, exist_ok=True)
            f = open(to_path, "wb")

            offset = 0
            size = 0

            while True:
                chunk = ioctx.read(from_base_name, 65536, offset)
                size = len(chunk)
                offset += size
                f.write(chunk)

                if from_md5 is not None:
                    checker.update(chunk)

                if size < 65536:
                    break

            f.close()

            if from_md5 is not None and from_md5 != checker.hexdigest():
                raise StorageError(
                    f"CEPH and FILE",
                    f"Invalid MD5 sum control for copy CEPH object {from_path} to file {to_path} : {from_md5} != {checker.hexdigest()}",
                )

        except Exception as e:
            raise StorageError(
                f"CEPH and FILE", f"Cannot copy CEPH object {from_path} to file {to_path} : {e}"
            )

    elif from_type == StorageType.FILE and to_type == StorageType.CEPH:
        ioctx = __get_ceph_ioctx(to_tray)

        if from_md5 is not None:
            checker = hashlib.md5()

        try:
            f = open(from_path, "rb")

            offset = 0
            size = 0

            while True:
                chunk = f.read(65536)
                size = len(chunk)
                ioctx.write(to_base_name, chunk, offset)
                offset += size

                if from_md5 is not None:
                    checker.update(chunk)

                if size < 65536:
                    break

            f.close()

            if from_md5 is not None and from_md5 != checker.hexdigest():
                raise StorageError(
                    f"FILE and CEPH",
                    f"Invalid MD5 sum control for copy file {from_path} to CEPH object {to_path} : {from_md5} != {checker.hexdigest()}",
                )

        except Exception as e:
            raise StorageError(
                f"FILE and CEPH", f"Cannot copy file {from_path} to CEPH object {to_path} : {e}"
            )

    elif from_type == StorageType.CEPH and to_type == StorageType.CEPH:
        from_ioctx = __get_ceph_ioctx(from_tray)
        to_ioctx = __get_ceph_ioctx(to_tray)

        if from_md5 is not None:
            checker = hashlib.md5()

        try:
            offset = 0
            size = 0

            while True:
                chunk = from_ioctx.read(from_base_name, 65536, offset)
                size = len(chunk)
                to_ioctx.write(to_base_name, chunk, offset)
                offset += size

                if from_md5 is not None:
                    checker.update(chunk)

                if size < 65536:
                    break

            if from_md5 is not None and from_md5 != checker.hexdigest():
                raise StorageError(
                    f"FILE and CEPH",
                    f"Invalid MD5 sum control for copy CEPH object {from_path} to {to_path} : {from_md5} != {checker.hexdigest()}",
                )

        except Exception as e:
            raise StorageError(f"CEPH", f"Cannot copy CEPH object {from_path} to {to_path} : {e}")

    elif from_type == StorageType.CEPH and to_type == StorageType.S3:
        from_ioctx = __get_ceph_ioctx(from_tray)

        s3_client, to_bucket = __get_s3_client(to_tray)

        if from_md5 is not None:
            checker = hashlib.md5()

        try:
            offset = 0
            size = 0

            with tempfile.NamedTemporaryFile("w+b", delete=False) as f:
                name_tmp = f.name
                while True:
                    chunk = from_ioctx.read(from_base_name, 65536, offset)
                    size = len(chunk)
                    offset += size
                    f.write(chunk)

                    if from_md5 is not None:
                        checker.update(chunk)

                    if size < 65536:
                        break

            s3_client["client"].upload_file(name_tmp, to_bucket, to_base_name)

            os.remove(name_tmp)

            if from_md5 is not None and from_md5 != checker.hexdigest():
                raise StorageError(
                    f"CEPH and S3",
                    f"Invalid MD5 sum control for copy CEPH object {from_path} to S3 object {to_path} : {from_md5} != {checker.hexdigest()}",
                )

        except Exception as e:
            raise StorageError(
                f"CEPH and S3", f"Cannot copy CEPH object {from_path} to S3 object {to_path} : {e}"
            )

    elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.FILE :

        try:
            response = requests.get(from_type.value + from_path, stream = True)
            with open(to_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=65536) :
                    if chunk:
                        f.write(chunk)

        except Exception as e:
            raise StorageError(f"HTTP(S) and FILE", f"Cannot copy HTTP(S) object {from_path} to FILE object {to_path} : {e}")

    elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.CEPH :

        to_ioctx = __get_ceph_ioctx(to_tray)

        try:
            response = requests.get(from_type.value + from_path, stream = True)
            offset = 0
            for chunk in response.iter_content(chunk_size=65536) :
                if chunk:
                    size = len(chunk)
                    to_ioctx.write(to_base_name, chunk, offset)
                    offset += size

        except Exception as e:
            raise StorageError(f"HTTP(S) and CEPH", f"Cannot copy HTTP(S) object {from_path} to CEPH object {to_path} : {e}")

    elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.S3 :

        to_s3_client, to_bucket = __get_s3_client(to_tray)

        try:
            response = requests.get(from_type.value + from_path, stream=True)
            with tempfile.NamedTemporaryFile("w+b", delete=False) as f:
                name_tmp = f.name
                for chunk in response.iter_content(chunk_size=65536):
                    if chunk:
                        f.write(chunk)

            # Upload to the bucket (not the raw tray, which may contain "@<host>")
            to_s3_client["client"].upload_file(name_tmp, to_bucket, to_base_name)

            os.remove(name_tmp)

        except Exception as e:
            raise StorageError(f"HTTP(S) and S3", f"Cannot copy HTTP(S) object {from_path} to S3 object {to_path} : {e}")

    else:
        raise StorageError(
            f"{from_type.name} and {to_type.name}",
            f"Cannot copy from {from_type.name} to {to_type.name}",
        )


def link(target_path: str, link_path: str, hard: bool = False) -> None:
    """Create a symbolic link

    Args:
        target_path (str): file/object to link
        link_path (str): link to create
        hard (bool, optional): hard link rather than symbolic. Only for FILE storage. Defaults to False.

    Raises:
        StorageError: Unhandled link or link issue
        MissingEnvironmentError: Missing object storage information
    """

    target_type, target_path, target_tray, target_base_name = get_infos_from_path(target_path)
    link_type, link_path, link_tray, link_base_name = get_infos_from_path(link_path)

    if target_type != link_type:
        raise StorageError(
            f"{target_type.name} and {link_type.name}",
            f"Cannot make link between two different storage types",
        )

    if hard and target_type != StorageType.FILE:
        raise StorageError(target_type.name, "Hard link is available only for FILE storage")

    # Create the link, depending on the storage type
    if target_type == StorageType.S3:
        target_s3_client, target_bucket = __get_s3_client(target_tray)
        link_s3_client, link_bucket = __get_s3_client(link_tray)

        if target_s3_client["host"] != link_s3_client["host"]:
            raise StorageError(
                f"S3",
                f"Cannot make link {link_path} -> {target_path} : link works only on the same S3 cluster",
            )

        try:
            target_s3_client["client"].put_object(
                Body=f"{__OBJECT_SYMLINK_SIGNATURE}{target_bucket}/{target_base_name}".encode(
                    "utf-8"
                ),
                Bucket=link_bucket,
                Key=link_base_name,
            )
        except Exception as e:
            raise StorageError("S3", e)

    elif target_type == StorageType.CEPH:
        ioctx = __get_ceph_ioctx(link_tray)

        try:
            ioctx.write_full(
                link_base_name, f"{__OBJECT_SYMLINK_SIGNATURE}{target_path}".encode("utf-8")
            )
        except Exception as e:
            raise StorageError("CEPH", e)

    elif target_type == StorageType.FILE:
        try:
            if hard:
                os.link(target_path, link_path)
            else:
                os.symlink(target_path, link_path)
        except Exception as e:
            raise StorageError("FILE", e)

    else:
        raise StorageError("UNKNOWN", "Unhandled storage type to make link")


def get_osgeo_path(path: str) -> str:
    """Return GDAL/OGR Open compliant path and configure storage access

    For an S3 input path, the endpoint, access and secret keys are set and the path is built with the "/vsis3" root.

    For a FILE input path, only the storage prefix is removed

    Args:
        path (str): Source path

    Raises:
        NotImplementedError: Storage type not handled

    Returns:
        str: GDAL/OGR Open compliant path
    """

    storage_type, unprefixed_path, tray_name, base_name = get_infos_from_path(path)

    if storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        gdal.SetConfigOption("AWS_SECRET_ACCESS_KEY", s3_client["secret_key"])
        gdal.SetConfigOption("AWS_ACCESS_KEY_ID", s3_client["key"])
        gdal.SetConfigOption("AWS_S3_ENDPOINT", s3_client["host"])
        gdal.SetConfigOption("AWS_VIRTUAL_HOSTING", "FALSE")

        return f"/vsis3/{bucket_name}/{base_name}"

    elif storage_type == StorageType.FILE:
        return unprefixed_path

    else:
        raise NotImplementedError(f"Cannot get a GDAL/OGR compliant path from {path}")

def size_path(path: str) -> int:
    """Return the size of the given path: for a FILE directory, the cumulated size of its files (recursively); for an S3 prefix, the cumulated size of the objects under it

    Args:
        path (str): Source path

    Raises:
        MissingEnvironmentError: Missing object storage information
        StorageError: Storage read issue or unhandled storage type
        NotImplementedError: Size calculation is not implemented for CEPH storage

    Returns:
        int: size of the path, in bytes
    """
    storage_type, unprefixed_path, tray_name, base_name  = get_infos_from_path(path)

    if storage_type == StorageType.FILE:
        try :
            total = 0
            with os.scandir(unprefixed_path) as it:
                for entry in it:
                    if entry.is_file():
                        total += entry.stat().st_size
                    elif entry.is_dir():
                        total += size_path(entry.path)

        except Exception as e:
            raise StorageError("FILE", e)

    elif storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        try :
            paginator = s3_client["client"].get_paginator('list_objects_v2')
            pages = paginator.paginate(
                Bucket=bucket_name,
                Prefix=base_name+"/",
                PaginationConfig={
                    'PageSize': 10000,
                }
            )
            total = 0
            for page in pages:
                for key in page['Contents']:
                    total += key['Size']

        except Exception as e:
            raise StorageError("S3", e)


    elif storage_type == StorageType.CEPH:
        raise NotImplementedError
    else:
        raise StorageError("UNKNOWN", "Unhandled storage type to calculate size")

    return total

Functions

def copy(from_path: str, to_path: str, from_md5: str = None) ‑> None

Copy a file or an object to a file or object destination. If an MD5 sum is provided, it is compared to the sum computed after the copy.

Args

from_path : str
source file/object path, to copy
to_path : str
destination file/object path
from_md5 : str, optional
MD5 sum, recomputed after the copy and checked against this value. Defaults to None.

Raises

StorageError
Unhandled copy or copy issue
MissingEnvironmentError
Missing object storage information
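
For instance, a minimal sketch of a local file uploaded to S3 with an MD5 check (paths and bucket name are placeholders):

from rok4.Storage import copy, hash_file

md5 = hash_file("/data/tile.tif")

# Raises a StorageError if the checksum of the destination object does not match
copy("file:///data/tile.tif", "s3://my-bucket/tiles/tile.tif", from_md5=md5)
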
def disconnect_ceph_clients() ‑> None

Clean CEPH clients

def disconnect_s3_clients() ‑> None

Clean S3 clients

def exists(path: str) ‑> bool

Does the file or object exist?

Args

path : str
path of file/object to test

Raises

MissingEnvironmentError
Missing object storage information
StorageError
Storage read issue

Returns

bool
file/object existing status
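
A short sketch (paths are placeholders):

from rok4.Storage import exists

# Unprefixed paths are interpreted as FILE storage
exists("/data/tile.tif")

# HTTP(S) paths are tested with a GET request
exists("https://data.example.com/tiles/tile.tif")
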
def get_data_binary(path: str, range: Tuple[int, int] = None) ‑> bytes

Load data into a binary string

Args

path : str
path to data
range : Tuple[int, int], optional
offset and size, to make a partial read. Defaults to None.

Raises

MissingEnvironmentError
Missing object storage information
StorageError
Storage read issue
FileNotFoundError
File or object does not exist

Returns

bytes
Data binary content
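
For example, a partial read (a sketch; the object name is a placeholder; partial reads are not implemented for HTTP(S) storage):

from rok4.Storage import get_data_binary

# range is (offset, size): read 2048 bytes starting at byte 4096
chunk = get_data_binary("ceph://my-pool/my-object", (4096, 2048))
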
def get_data_str(path: str) ‑> str

Load full data into a string

Args

path : str
path to data

Raises

MissingEnvironmentError
Missing object storage information
StorageError
Storage read issue
FileNotFoundError
File or object does not exist

Returns

str
Data content
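
For example, loading a JSON descriptor stored as an object (a sketch; the object name is a placeholder):

import json

from rok4.Storage import get_data_str

descriptor = json.loads(get_data_str("s3://my-bucket/layer.json"))
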
def get_infos_from_path(path: str) ‑> Tuple[StorageType, str, str, str]

Extract storage type, the unprefixed path, the container and the basename from path (Default: FILE storage)

For a FILE storage, the tray is the directory and the basename is the file name.

For an object storage (CEPH or S3), the tray is the bucket or the pool and the basename is the object name. For an S3 bucket, the format can be <bucket name>@<cluster name> to use several clusters. The cluster name is the host (without protocol).

Args

path : str
path to analyse

Returns

Tuple[StorageType, str, str, str]
storage type, unprefixed path, the container and the basename
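
To illustrate the returned tuple (a sketch; paths are placeholders, expected values shown as comments):

from rok4.Storage import get_infos_from_path

get_infos_from_path("s3://my-bucket@s4.storage.fr/a/b/object")
# (StorageType.S3, "my-bucket@s4.storage.fr/a/b/object", "my-bucket@s4.storage.fr", "a/b/object")

get_infos_from_path("/data/a/b/file.tif")
# (StorageType.FILE, "/data/a/b/file.tif", "/data/a/b", "file.tif")
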
def get_osgeo_path(path: str) ‑> str

Return GDAL/OGR Open compliant path and configure storage access

For an S3 input path, the endpoint, access and secret keys are set and the path is built with the "/vsis3" root.

For a FILE input path, only the storage prefix is removed

Args

path : str
Source path

Raises

NotImplementedError
Storage type not handled

Returns

str
GDAL/OGR Open compliant path
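
For instance, opening an S3 raster with GDAL (a sketch; the object name is a placeholder):

from osgeo import gdal

from rok4.Storage import get_osgeo_path

# Sets the AWS_* GDAL options for the right cluster and returns a /vsis3/ path
dataset = gdal.Open(get_osgeo_path("s3://my-bucket/ortho.tif"))
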
Expand source code
def get_osgeo_path(path: str) -> str:
    """Return GDAL/OGR Open compliant path and configure storage access

    For a S3 input path, endpoint, access and secret keys are set and path is built with "/vsis3" root.

    For a FILE input path, only storage prefix is removed

    Args:
        path (str): Source path

    Raises:
        NotImplementedError: Storage type not handled

    Returns:
        str: GDAL/OGR Open compliant path
    """

    storage_type, unprefixed_path, tray_name, base_name = get_infos_from_path(path)

    if storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        gdal.SetConfigOption("AWS_SECRET_ACCESS_KEY", s3_client["secret_key"])
        gdal.SetConfigOption("AWS_ACCESS_KEY_ID", s3_client["key"])
        gdal.SetConfigOption("AWS_S3_ENDPOINT", s3_client["host"])
        gdal.SetConfigOption("AWS_VIRTUAL_HOSTING", "FALSE")

        return f"/vsis3/{bucket_name}/{base_name}"

    elif storage_type == StorageType.FILE:
        return unprefixed_path

    else:
        raise NotImplementedError(f"Cannot get a GDAL/OGR compliant path from {path}")
def get_path_from_infos(storage_type: StorageType, *args) ‑> str

Write full path from elements

Prefixed with storage's type, elements are joined with a slash

Args

storage_type : StorageType
Storage's type for path

Returns

str
Full path
Expand source code
def get_path_from_infos(storage_type: StorageType, *args) -> str:
    """Write full path from elements

    Prefixed with storage's type, elements are joined with a slash

    Args:
        storage_type (StorageType): Storage's type for path

    Returns:
        str: Full path
    """
    return f"{storage_type.value}{os.path.join(*args)}"
def get_size(path: str) ‑> int

Get size of file or object

Args

path : str
path of the file/object whose size is requested

Raises

MissingEnvironmentError
Missing object storage informations
StorageError
Storage read issue

Returns

int
file/object size, in bytes
Expand source code
def get_size(path: str) -> int:
    """Get size of file or object

    Args:
        path (str): path of the file/object whose size is requested

    Raises:
        MissingEnvironmentError: Missing object storage informations
        StorageError: Storage read issue

    Returns:
        int: file/object size, in bytes
    """

    storage_type, path, tray_name, base_name = get_infos_from_path(path)

    if storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        try:
            size = s3_client["client"].head_object(Bucket=bucket_name, Key=base_name)[
                "ContentLength"
            ]
            return int(size)
        except Exception as e:
            raise StorageError("S3", e)

    elif storage_type == StorageType.CEPH:
        ioctx = __get_ceph_ioctx(tray_name)

        try:
            size, mtime = ioctx.stat(base_name)
            return size
        except Exception as e:
            raise StorageError("CEPH", e)

    elif storage_type == StorageType.FILE:
        try:
            file_stats = os.stat(path)
            return file_stats.st_size
        except Exception as e:
            raise StorageError("FILE", e)

    elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:

        try:
            # stream=True only downloads the headers at first
            response = requests.get(storage_type.value + path, stream=True).headers["content-length"]
            # The header value is a string: cast it to match the declared return type
            return int(response)
        except Exception as e:
            raise StorageError(storage_type.name, e)

    else:
        raise StorageError("UNKNOWN", "Unhandled storage type to get size")
def hash_file(path: str) ‑> str

Process MD5 sum of the provided file

Args

path : str
path to file

Returns

str
hexadecimal MD5 sum
Expand source code
def hash_file(path: str) -> str:
    """Process MD5 sum of the provided file

    Args:
        path (str): path to file

    Returns:
        str: hexadecimal MD5 sum
    """

    checker = hashlib.md5()

    with open(path, "rb") as file:
        chunk = 0
        while chunk != b"":
            chunk = file.read(65536)
            checker.update(chunk)

    return checker.hexdigest()
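
For example, with a hypothetical local file (the path is opened directly, so no file:// prefix is expected):

from rok4.Storage import hash_file

print(hash_file("/tmp/example.txt"))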

def link(target_path: str, link_path: str, hard: bool = False) ‑> None

Create a symbolic link

Args

target_path : str
file/object to link
link_path : str
link to create
hard : bool, optional
hard link rather than symbolic. Only for FILE storage. Defaults to False.

Raises

StorageError
Unhandled storage type or link issue
MissingEnvironmentError
Missing object storage informations
Expand source code
def link(target_path: str, link_path: str, hard: bool = False) -> None:
    """Create a symbolic link

    Args:
        target_path (str): file/object to link
        link_path (str): link to create
        hard (bool, optional): hard link rather than symbolic. Only for FILE storage. Defaults to False.

    Raises:
        StorageError: Unhandled storage type or link issue
        MissingEnvironmentError: Missing object storage informations
    """

    target_type, target_path, target_tray, target_base_name = get_infos_from_path(target_path)
    link_type, link_path, link_tray, link_base_name = get_infos_from_path(link_path)

    if target_type != link_type:
        raise StorageError(
            f"{target_type.name} and {link_type.name}",
            f"Cannot make link between two different storage types",
        )

    if hard and target_type != StorageType.FILE:
        raise StorageError(target_type.name, "Hard link is available only for FILE storage")

    # Create the link, according to the storage type
    if target_type == StorageType.S3:
        target_s3_client, target_bucket = __get_s3_client(target_tray)
        link_s3_client, link_bucket = __get_s3_client(link_tray)

        if target_s3_client["host"] != link_s3_client["host"]:
            raise StorageError(
                f"S3",
                f"Cannot make link {link_path} -> {target_path} : link works only on the same S3 cluster",
            )

        try:
            target_s3_client["client"].put_object(
                Body=f"{__OBJECT_SYMLINK_SIGNATURE}{target_bucket}/{target_base_name}".encode(
                    "utf-8"
                ),
                Bucket=link_bucket,
                Key=link_base_name,
            )
        except Exception as e:
            raise StorageError("S3", e)

    elif target_type == StorageType.CEPH:
        ioctx = __get_ceph_ioctx(link_tray)

        try:
            ioctx.write_full(
                link_base_name, f"{__OBJECT_SYMLINK_SIGNATURE}{target_path}".encode("utf-8")
            )
        except Exception as e:
            raise StorageError("CEPH", e)

    elif target_type == StorageType.FILE:
        try:
            if hard:
                os.link(target_path, link_path)
            else:
                os.symlink(target_path, link_path)
        except Exception as e:
            raise StorageError("FILE", e)

    else:
        raise StorageError("UNKNOWN", "Unhandled storage type to make link")
def put_data_str(data: str, path: str) ‑> None

Store string data into a file or an object

UTF-8 encoding is used for bytes conversion

Args

data : str
data to write
path : str
destination path, where to write data

Raises

MissingEnvironmentError
Missing object storage informations
StorageError
Storage write issue
Expand source code
def put_data_str(data: str, path: str) -> None:
    """Store string data into a file or an object

    UTF-8 encoding is used for bytes conversion

    Args:
        data (str): data to write
        path (str): destination path, where to write data

    Raises:
        MissingEnvironmentError: Missing object storage informations
        StorageError: Storage write issue
    """

    storage_type, path, tray_name, base_name = get_infos_from_path(path)

    if storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        try:
            s3_client["client"].put_object(
                Body=data.encode("utf-8"), Bucket=bucket_name, Key=base_name
            )
        except Exception as e:
            raise StorageError("S3", e)

    elif storage_type == StorageType.CEPH:
        ioctx = __get_ceph_ioctx(tray_name)

        try:
            ioctx.write_full(base_name, data.encode("utf-8"))
        except Exception as e:
            raise StorageError("CEPH", e)

    elif storage_type == StorageType.FILE:
        try:
            with open(path, "w") as f:
                f.write(data)
        except Exception as e:
            raise StorageError("FILE", e)

    else:
        raise StorageError("UNKNOWN", "Unhandled storage type to write string data")
def remove(path: str) ‑> None

Remove the file/object

Args

path : str
path of file/object to remove

Raises

MissingEnvironmentError
Missing object storage informations
StorageError
Storage removal issue
Expand source code
def remove(path: str) -> None:
    """Remove the file/object

    Args:
        path (str): path of file/object to remove

    Raises:
        MissingEnvironmentError: Missing object storage informations
        StorageError: Storage removal issue
    """
    storage_type, path, tray_name, base_name = get_infos_from_path(path)

    if storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        try:
            s3_client["client"].delete_object(Bucket=bucket_name, Key=base_name)
        except Exception as e:
            raise StorageError("S3", e)

    elif storage_type == StorageType.CEPH:
        ioctx = __get_ceph_ioctx(tray_name)

        try:
            ioctx.remove_object(base_name)
        except rados.ObjectNotFound as e:
            pass
        except Exception as e:
            raise StorageError("CEPH", e)

    elif storage_type == StorageType.FILE:
        try:
            os.remove(path)
        except FileNotFoundError as e:
            pass
        except Exception as e:
            raise StorageError("FILE", e)

    else:
        raise StorageError("UNKNOWN", "Unhandled storage type to remove things")
def size_path(path: str) ‑> int

Return the size of the given path (or, for CEPH, the sum of the sizes of each object in the .list)

Args

path : str
Source path

Raises

StorageError
Unhandled storage type or read issue
MissingEnvironmentError
Missing object storage informations

Returns

int
size of the path
Expand source code
def size_path(path: str) -> int:
    """Return the size of the path given (or, for the CEPH, the sum of the size of each object of the .list)

    Args:
        path (str): Source path

    Raises:
        StorageError: Unhandled storage type or read issue
        MissingEnvironmentError: Missing object storage informations

    Returns:
        int: size of the path
    """
    storage_type, unprefixed_path, tray_name, base_name = get_infos_from_path(path)

    if storage_type == StorageType.FILE:
        try:
            total = 0
            with os.scandir(unprefixed_path) as it:
                for entry in it:
                    if entry.is_file():
                        total += entry.stat().st_size
                    elif entry.is_dir():
                        total += size_path(entry.path)

        except Exception as e:
            raise StorageError("FILE", e)

    elif storage_type == StorageType.S3:
        s3_client, bucket_name = __get_s3_client(tray_name)

        try:
            paginator = s3_client["client"].get_paginator('list_objects_v2')
            pages = paginator.paginate(
                Bucket=bucket_name,
                Prefix=base_name+"/",
                PaginationConfig={
                    'PageSize': 10000,
                }
            )
            total = 0
            for page in pages:
                # An empty listing has no 'Contents' key: treat it as a size of 0
                for key in page.get('Contents', []):
                    total += key['Size']

        except Exception as e:
            raise StorageError("S3", e)


    elif storage_type == StorageType.CEPH:
        raise NotImplementedError
    else:
        raise StorageError("UNKNOWN", "Unhandled storage type to calculate size")

    return total
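
A usage sketch with hypothetical paths; FILE paths should point to a directory and S3 paths to a prefix:

from rok4.Storage import size_path

# Total size, in bytes, of a local directory tree
print(size_path("file:///data/pyramids/my_pyramid"))

# Total size of the S3 objects sharing the given prefix
print(size_path("s3://my_bucket/my_pyramid"))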

Classes

class StorageType (value, names=None, *, module=None, qualname=None, type=None, start=1)

Available storage types, whose values are the corresponding path prefixes.

Expand source code
class StorageType(Enum):
    FILE = "file://"
    S3 = "s3://"
    CEPH = "ceph://"
    HTTP = "http://"
    HTTPS = "https://"

Ancestors

  • enum.Enum

Class variables

var CEPH
var FILE
var HTTP
var HTTPS
var S3
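
A small illustration of the enumeration, whose member values are the path prefixes:

from rok4.Storage import StorageType

print(StorageType.S3.value)    # s3://
print(StorageType.FILE.value)  # file://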