Module rok4_tools.pyr2pyr_utils.finisher

Expand source code
import logging
import os
import tempfile
from typing import Dict, List, Tuple, Union

from rok4 import storage
from rok4.pyramid import Pyramid


def work(config: Dict) -> None:
    """Finisher step: finalize the pyramid's transfer.

    Expects the configuration and all todo lists. Writes the output pyramid's descriptor to the
    final location, builds the output pyramid's list from the todo lists, writes it to the final
    location and removes the todo lists.

    Args:
        config (Dict): PYR2PYR configuration. The keys read here are ``from.descriptor``,
            ``to.name``, ``to.storage``, ``process.parallelization`` and ``process.directory``.

    Raises:
        Exception: Cannot load the input or the output pyramid
        Exception: Cannot write output pyramid's descriptor
        Exception: Cannot concatenate todo lists to write the output pyramid's list
    """

    # Load the pyramid to copy
    try:
        from_pyramid = Pyramid.from_descriptor(config["from"]["descriptor"])
    except Exception as e:
        raise Exception(f"Cannot load source pyramid descriptor: {e}") from e

    # Build the pyramid to write from the source one
    try:
        to_pyramid = Pyramid.from_other(from_pyramid, config["to"]["name"], config["to"]["storage"])
    except Exception as e:
        raise Exception(
            f"Cannot create the destination pyramid descriptor from the source one: {e}"
        ) from e

    try:
        to_pyramid.write_descriptor()
    except Exception as e:
        raise Exception(f"Cannot write output pyramid's descriptor to final location: {e}") from e

    # Local temporary files still to delete; reset to None once they have been removed
    list_file_tmp = None
    todo_list_tmp = None

    try:
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as list_file_obj:
            list_file_tmp = list_file_obj.name

            # List file header: a single root, the output pyramid's one
            to_root = os.path.join(to_pyramid.storage_root, to_pyramid.name)
            list_file_obj.write(f"0={to_root}\n#\n")

            if to_pyramid.storage_s3_cluster is not None:
                # Destination paths will contain the host of the S3 cluster in use,
                # so it has to be part of the root stripped from the slab paths
                to_root = os.path.join(
                    f"{to_pyramid.storage_root}@{to_pyramid.storage_s3_cluster}", to_pyramid.name
                )

            for i in range(config["process"]["parallelization"]):
                todo_list_path = os.path.join(config["process"]["directory"], f"todo.{i+1}.list")

                # Only a local path is needed as copy target: close the handle immediately
                # instead of leaking it, the file itself is kept (delete=False)
                with tempfile.NamedTemporaryFile(mode="r", delete=False) as todo_list_placeholder:
                    todo_list_tmp = todo_list_placeholder.name

                storage.copy(todo_list_path, f"file://{todo_list_tmp}")

                # Open the file after the copy to read the copied content
                with open(todo_list_tmp) as todo_list_obj:
                    for line in todo_list_obj:
                        line = line.rstrip()
                        parts = line.split(" ")

                        if (len(parts) != 3 and len(parts) != 4) or parts[0] != "cp":
                            raise Exception(
                                f"Invalid todo list line: we need a cp command and 3 or 4 more elements (source and destination): {line}"
                            )

                        # Only the path component of the third token is used
                        _, path, _, _ = storage.get_infos_from_path(parts[2])
                        path = path.replace(to_root, "0")
                        list_file_obj.write(f"{path}\n")

                storage.remove(f"file://{todo_list_tmp}")
                todo_list_tmp = None
                storage.remove(todo_list_path)

        storage.copy(f"file://{list_file_tmp}", to_pyramid.list)
        storage.remove(f"file://{list_file_tmp}")
        list_file_tmp = None

    except Exception as e:
        raise Exception(
            f"Cannot concatenate splits' done lists and write the final output pyramid's list to the final location: {e}"
        ) from e

    finally:
        # Best-effort cleanup of local temporary files left behind by a failure
        for leftover in (todo_list_tmp, list_file_tmp):
            if leftover is not None and os.path.exists(leftover):
                try:
                    os.remove(leftover)
                except OSError:
                    pass

Functions

def work(config: Dict[~KT, ~VT]) ‑> None

Finisher step: finalize the pyramid's transfer

Expects the configuration and all todo lists. Writes the output pyramid's descriptor to the final location, writes the output pyramid's list to the final location (built from the todo lists) and removes the todo lists.

Args

config : Dict
PYR2PYR configuration

Raises

Exception
Cannot load the input or the output pyramid
Exception
Cannot write output pyramid's descriptor
Exception
Cannot concatenate todo lists to write the output pyramid's list
Expand source code
def work(config: Dict) -> None:
    """Finisher step: finalize the pyramid's transfer.

    Expects the configuration and all todo lists. Writes the output pyramid's descriptor to the
    final location, builds the output pyramid's list from the todo lists, writes it to the final
    location and removes the todo lists.

    Args:
        config (Dict): PYR2PYR configuration. The keys read here are ``from.descriptor``,
            ``to.name``, ``to.storage``, ``process.parallelization`` and ``process.directory``.

    Raises:
        Exception: Cannot load the input or the output pyramid
        Exception: Cannot write output pyramid's descriptor
        Exception: Cannot concatenate todo lists to write the output pyramid's list
    """

    # Load the pyramid to copy
    try:
        from_pyramid = Pyramid.from_descriptor(config["from"]["descriptor"])
    except Exception as e:
        raise Exception(f"Cannot load source pyramid descriptor: {e}") from e

    # Build the pyramid to write from the source one
    try:
        to_pyramid = Pyramid.from_other(from_pyramid, config["to"]["name"], config["to"]["storage"])
    except Exception as e:
        raise Exception(
            f"Cannot create the destination pyramid descriptor from the source one: {e}"
        ) from e

    try:
        to_pyramid.write_descriptor()
    except Exception as e:
        raise Exception(f"Cannot write output pyramid's descriptor to final location: {e}") from e

    # Local temporary files still to delete; reset to None once they have been removed
    list_file_tmp = None
    todo_list_tmp = None

    try:
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as list_file_obj:
            list_file_tmp = list_file_obj.name

            # List file header: a single root, the output pyramid's one
            to_root = os.path.join(to_pyramid.storage_root, to_pyramid.name)
            list_file_obj.write(f"0={to_root}\n#\n")

            if to_pyramid.storage_s3_cluster is not None:
                # Destination paths will contain the host of the S3 cluster in use,
                # so it has to be part of the root stripped from the slab paths
                to_root = os.path.join(
                    f"{to_pyramid.storage_root}@{to_pyramid.storage_s3_cluster}", to_pyramid.name
                )

            for i in range(config["process"]["parallelization"]):
                todo_list_path = os.path.join(config["process"]["directory"], f"todo.{i+1}.list")

                # Only a local path is needed as copy target: close the handle immediately
                # instead of leaking it, the file itself is kept (delete=False)
                with tempfile.NamedTemporaryFile(mode="r", delete=False) as todo_list_placeholder:
                    todo_list_tmp = todo_list_placeholder.name

                storage.copy(todo_list_path, f"file://{todo_list_tmp}")

                # Open the file after the copy to read the copied content
                with open(todo_list_tmp) as todo_list_obj:
                    for line in todo_list_obj:
                        line = line.rstrip()
                        parts = line.split(" ")

                        if (len(parts) != 3 and len(parts) != 4) or parts[0] != "cp":
                            raise Exception(
                                f"Invalid todo list line: we need a cp command and 3 or 4 more elements (source and destination): {line}"
                            )

                        # Only the path component of the third token is used
                        _, path, _, _ = storage.get_infos_from_path(parts[2])
                        path = path.replace(to_root, "0")
                        list_file_obj.write(f"{path}\n")

                storage.remove(f"file://{todo_list_tmp}")
                todo_list_tmp = None
                storage.remove(todo_list_path)

        storage.copy(f"file://{list_file_tmp}", to_pyramid.list)
        storage.remove(f"file://{list_file_tmp}")
        list_file_tmp = None

    except Exception as e:
        raise Exception(
            f"Cannot concatenate splits' done lists and write the final output pyramid's list to the final location: {e}"
        ) from e

    finally:
        # Best-effort cleanup of local temporary files left behind by a failure
        for leftover in (todo_list_tmp, list_file_tmp):
            if leftover is not None and os.path.exists(leftover):
                try:
                    os.remove(leftover)
                except OSError:
                    pass