Module rok4_tools.joincache_utils.finisher

Expand source code
import logging
import os
import tempfile
from typing import Dict, List, Tuple, Union

from rok4 import storage
from rok4.pyramid import Pyramid

from rok4_tools.global_utils.source import SourcePyramids


def _fetch_local_copy(source: str) -> str:
    """Copy a storage object into a new local temporary file.

    Args:
        source (str): Storage path of the object to copy (as accepted by ``storage.copy``)

    Returns:
        str: Path of the local temporary file. The caller is responsible for removing it
        (see ``_remove_local``).
    """
    # Only a unique name is needed: storage.copy writes the content itself. Closing the
    # handle right away avoids leaking an open file descriptor.
    with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp:
        local_path = tmp.name
    try:
        storage.copy(source, f"file://{local_path}")
    except Exception:
        _remove_local(local_path)
        raise
    return local_path


def _remove_local(path: str) -> None:
    """Remove a local file, doing nothing if it no longer exists."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def work(config: Dict) -> None:
    """Finisher step: finalize the pyramid's processing.

    Expects the configuration and all todo lists. Writes the output pyramid's descriptor to the
    final location, writes the output pyramid's list to the final location (built from the todo
    lists), then removes the todo lists.

    The todo lists are removed only once the final list has been written successfully, so a
    failed finisher can be run again with the same inputs. Local temporary files are always
    cleaned up.

    Args:
        config (Dict): JOINCACHE configuration. Keys read here: ``datasources`` (each with
            ``bottom``, ``top`` and ``source.descriptors``), ``pyramid`` (``name``, ``root``,
            ``mask``) and ``process`` (``directory``, ``parallelization``)

    Raises:
        Exception: Cannot create the output pyramid's descriptor from the source one
        Exception: Cannot write output pyramid's descriptor
        Exception: Cannot concatenate splits' done lists and write the final output pyramid's list to the final location
    """
    datasources = [
        SourcePyramids(
            datasource["bottom"],
            datasource["top"],
            datasource["source"]["descriptors"],
        )
        for datasource in config["datasources"]
    ]

    # Load the pyramid to write. It is derived from the first source pyramid, with the same
    # storage type but the configured root.
    storage_pyramid = {
        "type": datasources[0].pyramids[0].storage_type,
        "root": config["pyramid"]["root"],
    }
    try:
        to_pyramid = Pyramid.from_other(
            datasources[0].pyramids[0],
            config["pyramid"]["name"],
            storage_pyramid,
            mask=config["pyramid"]["mask"],
        )
    except Exception as e:
        raise Exception(
            f"Cannot create the destination pyramid descriptor from the source one: {e}"
        ) from e

    # For each datasource, (re)define the levels between its bottom and top in the output
    # descriptor, using the level information provided by SourcePyramids.
    for sources in datasources:
        levels = sources.pyramids[0].get_levels(sources.bottom, sources.top)
        for level in levels:
            try:
                to_pyramid.delete_level(level.id)
            except Exception as e:
                # Deliberate best effort: the level may not exist yet in the output pyramid
                logging.debug("Level %s not deleted from the output pyramid: %s", level.id, e)
            info = sources.info_level(level.id)
            to_pyramid.add_level(level.id, info[0], info[1], info[2])

    try:
        to_pyramid.write_descriptor()
    except Exception as e:
        raise Exception(f"Cannot write output pyramid's descriptor to final location: {e}") from e

    process_directory = config["process"]["directory"]
    finisher_todo = os.path.join(process_directory, "todo.finisher.list")
    # Split todo lists are numbered from 1 to the parallelization level
    split_todos = [
        os.path.join(process_directory, f"todo.{i}.list")
        for i in range(1, config["process"]["parallelization"] + 1)
    ]

    list_file_tmp = None
    try:
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as list_file_obj:
            list_file_tmp = list_file_obj.name

            # List header: index 0 is the output pyramid's root, followed by the roots of
            # all the pyramids used
            to_root = os.path.join(to_pyramid.storage_root, to_pyramid.name)
            list_file_obj.write(f"0={to_root}\n")

            # The finisher todo list holds "<index>=<root>" lines, copied as-is into the header
            used_pyramids_roots = {}
            local_todo = _fetch_local_copy(finisher_todo)
            try:
                with open(local_todo) as todo_file:
                    for line in todo_file:
                        line = line.rstrip()
                        if not line:
                            continue
                        list_file_obj.write(f"{line}\n")
                        # Split on the first "=" only: a root may itself contain "="
                        index, root = line.split("=", 1)
                        used_pyramids_roots[index] = root
            finally:
                _remove_local(local_todo)

            # End of header
            list_file_obj.write("#\n")

            for todo_path in split_todos:
                local_todo = _fetch_local_copy(todo_path)
                try:
                    with open(local_todo) as todo_file:
                        for line in todo_file:
                            parts = line.rstrip().split(" ")

                            if parts[0] == "w2c":
                                # The slab was recomputed: it belongs to the output pyramid,
                                # whose root is index 0
                                _, path, _, _ = storage.get_infos_from_path(parts[1])
                                list_file_obj.write(f"{path.replace(to_root, '0', 1)}\n")

                            elif parts[0] == "link":
                                # A link was made: reference the slab through the root index
                                # of its source pyramid (4th field of the line)
                                _, path, _, _ = storage.get_infos_from_path(parts[2])
                                index = parts[3]
                                path = path.replace(used_pyramids_roots[index], index, 1)
                                list_file_obj.write(f"{path}\n")
                finally:
                    _remove_local(local_todo)

        storage.copy(f"file://{list_file_tmp}", to_pyramid.list)

        # The final list is in place: the todo lists are no longer needed. Removing them only
        # now keeps the finisher re-runnable if anything above fails.
        storage.remove(finisher_todo)
        for todo_path in split_todos:
            storage.remove(todo_path)

    except Exception as e:
        raise Exception(
            f"Cannot concatenate splits' done lists and write the final output pyramid's list to the final location: {e}"
        ) from e
    finally:
        if list_file_tmp is not None:
            _remove_local(list_file_tmp)

Functions

def work(config: Dict) -> None

Finisher step: finalize the pyramid's processing

Expects the configuration and all todo lists. Writes the output pyramid's descriptor to the final location, writes the output pyramid's list to the final location (built from the todo lists), and removes the todo lists.

Args

config : Dict
JOINCACHE configuration

Raises

Exception
Cannot create the output pyramid's descriptor from the source one
Exception
Cannot write output pyramid's descriptor
Exception
Cannot concatenate splits' done lists and write the final output pyramid's list to the final location
Expand source code
def _fetch_local_copy(source: str) -> str:
    """Copy a storage object into a new local temporary file.

    Args:
        source (str): Storage path of the object to copy (as accepted by ``storage.copy``)

    Returns:
        str: Path of the local temporary file. The caller is responsible for removing it
        (see ``_remove_local``).
    """
    # Only a unique name is needed: storage.copy writes the content itself. Closing the
    # handle right away avoids leaking an open file descriptor.
    with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp:
        local_path = tmp.name
    try:
        storage.copy(source, f"file://{local_path}")
    except Exception:
        _remove_local(local_path)
        raise
    return local_path


def _remove_local(path: str) -> None:
    """Remove a local file, doing nothing if it no longer exists."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def work(config: Dict) -> None:
    """Finisher step: finalize the pyramid's processing.

    Expects the configuration and all todo lists. Writes the output pyramid's descriptor to the
    final location, writes the output pyramid's list to the final location (built from the todo
    lists), then removes the todo lists.

    The todo lists are removed only once the final list has been written successfully, so a
    failed finisher can be run again with the same inputs. Local temporary files are always
    cleaned up.

    Args:
        config (Dict): JOINCACHE configuration. Keys read here: ``datasources`` (each with
            ``bottom``, ``top`` and ``source.descriptors``), ``pyramid`` (``name``, ``root``,
            ``mask``) and ``process`` (``directory``, ``parallelization``)

    Raises:
        Exception: Cannot create the output pyramid's descriptor from the source one
        Exception: Cannot write output pyramid's descriptor
        Exception: Cannot concatenate splits' done lists and write the final output pyramid's list to the final location
    """
    datasources = [
        SourcePyramids(
            datasource["bottom"],
            datasource["top"],
            datasource["source"]["descriptors"],
        )
        for datasource in config["datasources"]
    ]

    # Load the pyramid to write. It is derived from the first source pyramid, with the same
    # storage type but the configured root.
    storage_pyramid = {
        "type": datasources[0].pyramids[0].storage_type,
        "root": config["pyramid"]["root"],
    }
    try:
        to_pyramid = Pyramid.from_other(
            datasources[0].pyramids[0],
            config["pyramid"]["name"],
            storage_pyramid,
            mask=config["pyramid"]["mask"],
        )
    except Exception as e:
        raise Exception(
            f"Cannot create the destination pyramid descriptor from the source one: {e}"
        ) from e

    # For each datasource, (re)define the levels between its bottom and top in the output
    # descriptor, using the level information provided by SourcePyramids.
    for sources in datasources:
        levels = sources.pyramids[0].get_levels(sources.bottom, sources.top)
        for level in levels:
            try:
                to_pyramid.delete_level(level.id)
            except Exception as e:
                # Deliberate best effort: the level may not exist yet in the output pyramid
                logging.debug("Level %s not deleted from the output pyramid: %s", level.id, e)
            info = sources.info_level(level.id)
            to_pyramid.add_level(level.id, info[0], info[1], info[2])

    try:
        to_pyramid.write_descriptor()
    except Exception as e:
        raise Exception(f"Cannot write output pyramid's descriptor to final location: {e}") from e

    process_directory = config["process"]["directory"]
    finisher_todo = os.path.join(process_directory, "todo.finisher.list")
    # Split todo lists are numbered from 1 to the parallelization level
    split_todos = [
        os.path.join(process_directory, f"todo.{i}.list")
        for i in range(1, config["process"]["parallelization"] + 1)
    ]

    list_file_tmp = None
    try:
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as list_file_obj:
            list_file_tmp = list_file_obj.name

            # List header: index 0 is the output pyramid's root, followed by the roots of
            # all the pyramids used
            to_root = os.path.join(to_pyramid.storage_root, to_pyramid.name)
            list_file_obj.write(f"0={to_root}\n")

            # The finisher todo list holds "<index>=<root>" lines, copied as-is into the header
            used_pyramids_roots = {}
            local_todo = _fetch_local_copy(finisher_todo)
            try:
                with open(local_todo) as todo_file:
                    for line in todo_file:
                        line = line.rstrip()
                        if not line:
                            continue
                        list_file_obj.write(f"{line}\n")
                        # Split on the first "=" only: a root may itself contain "="
                        index, root = line.split("=", 1)
                        used_pyramids_roots[index] = root
            finally:
                _remove_local(local_todo)

            # End of header
            list_file_obj.write("#\n")

            for todo_path in split_todos:
                local_todo = _fetch_local_copy(todo_path)
                try:
                    with open(local_todo) as todo_file:
                        for line in todo_file:
                            parts = line.rstrip().split(" ")

                            if parts[0] == "w2c":
                                # The slab was recomputed: it belongs to the output pyramid,
                                # whose root is index 0
                                _, path, _, _ = storage.get_infos_from_path(parts[1])
                                list_file_obj.write(f"{path.replace(to_root, '0', 1)}\n")

                            elif parts[0] == "link":
                                # A link was made: reference the slab through the root index
                                # of its source pyramid (4th field of the line)
                                _, path, _, _ = storage.get_infos_from_path(parts[2])
                                index = parts[3]
                                path = path.replace(used_pyramids_roots[index], index, 1)
                                list_file_obj.write(f"{path}\n")
                finally:
                    _remove_local(local_todo)

        storage.copy(f"file://{list_file_tmp}", to_pyramid.list)

        # The final list is in place: the todo lists are no longer needed. Removing them only
        # now keeps the finisher re-runnable if anything above fails.
        storage.remove(finisher_todo)
        for todo_path in split_todos:
            storage.remove(todo_path)

    except Exception as e:
        raise Exception(
            f"Cannot concatenate splits' done lists and write the final output pyramid's list to the final location: {e}"
        ) from e
    finally:
        if list_file_tmp is not None:
            _remove_local(list_file_tmp)