Module rok4_tools.joincache_utils.master

Expand source code
import itertools
import os
import tempfile
from typing import Dict, List, Tuple, Union

from rok4 import storage
from rok4.enums import PyramidType, SlabType
from rok4.pyramid import Pyramid

from rok4_tools.global_utils.source import SourcePyramids

"""Todo list instructions

* c2w <source slab> - Convert a source pyramid slab (MASK or DATA) to work format
* oNt - Overlay previous converted data slabs (considering possible masks)
* w2c <destination slab> - Convert the output of last overlayNtiff to slab format, into the output pyramid
* link <destination slab> <source slab> <source index> - Make a symbolic slab to source slab. Source index will be used to generate the pyramid's list

"""


def work(config: Dict) -> None:
    """Master step: prepare and split the links and merges to do

    Load and check the input pyramids from their descriptors, build the output
    pyramid, then write one todo list per agent (commands are dispatched in
    round-robin order) plus one finisher todo list, and copy them to the
    configured process directory.

    Args:
        config (Dict): JOINCACHE configuration

    Raises:
        Exception: No datasource is provided
        Exception: Sources pyramids have different features (TMS, format, channels) or are not raster
        Exception: Cannot create the output pyramid from the first source pyramid
        Exception: S3 cluster host is provided into the output bucket name
        Exception: Several datasources define the same level
        Exception: Cannot open stream to write todo lists
        Exception: Cannot copy todo lists
    """

    datasources = []
    tms_reference = None
    format_reference = None
    channels_reference = None
    for datasource in config["datasources"]:
        sources = SourcePyramids(
            datasource["bottom"],
            datasource["top"],
            datasource["source"]["descriptors"],
        )
        # All sources have to share the same TMS
        if not tms_reference:
            tms_reference = sources.tms
        elif tms_reference.name != sources.tms.name:
            raise Exception(
                f"Sources pyramids cannot have two different TMS : {tms_reference} and {sources.tms}"
            )

        # All sources have to share the same format
        if not format_reference:
            format_reference = sources.format
        elif format_reference != sources.format:
            raise Exception(
                f"Sources pyramids cannot have two different format : {format_reference} and {sources.format}"
            )

        # All sources have to share the same number of channels
        if not channels_reference:
            channels_reference = sources.channels
        elif channels_reference != sources.channels:
            raise Exception(
                f"Sources pyramids cannot have two different numbers of channels : {channels_reference} and {sources.channels}"
            )

        # Only raster pyramids can be joined
        if sources.type != PyramidType.RASTER:
            raise Exception("Some sources pyramids are not a raster")

        datasources.append(sources)

    # The output pyramid is cloned from the first source pyramid: one is required
    if not datasources:
        raise Exception("No datasource provided : at least one is required to build the pyramid")

    # Build the pyramid to write
    storage_pyramid = {
        "type": datasources[0].pyramids[0].storage_type,
        "root": config["pyramid"]["root"],
    }
    try:
        to_pyramid = Pyramid.from_other(
            datasources[0].pyramids[0],
            config["pyramid"]["name"],
            storage_pyramid,
            mask=config["pyramid"]["mask"],
        )
    except Exception as e:
        raise Exception(
            f"Cannot create the destination pyramid descriptor from the source one: {e}"
        ) from e

    if to_pyramid.storage_s3_cluster is not None:
        # Only one S3 cluster is handled, so it must not be named in the paths
        raise Exception(
            f"Do not set S3 cluster host into output bucket name ({config['pyramid']['root']}) : only one cluster can be used with JOINCACHE"
        )

    temp_agent_todos = []
    temp_finisher_todo = None
    try:
        # Open the streams to the todo lists
        try:
            for _ in range(config["process"]["parallelization"]):
                temp_agent_todos.append(tempfile.NamedTemporaryFile(mode="w", delete=False))

            temp_finisher_todo = tempfile.NamedTemporaryFile(mode="w", delete=False)
        except Exception as e:
            raise Exception(f"Cannot open stream to write todo lists: {e}") from e

        _write_todo_lists(config, datasources, to_pyramid, temp_agent_todos, temp_finisher_todo)
        _publish_todo_lists(config, temp_agent_todos, temp_finisher_todo)
    finally:
        # Temporary files are created with delete=False: remove what is left on failure
        _discard_temp_files(temp_agent_todos + [temp_finisher_todo])


def _index_level_slabs(pyramid, level_id: str) -> Dict:
    """Read one level of a pyramid's list once and index its entries by slab key."""
    index = {}
    for slab_key, slab_infos in pyramid.list_generator(level_id):
        # Keep the first entry met for a key
        index.setdefault(slab_key, slab_infos)
    return index


def _find_mask_path(pyramid, level_index: Dict, slab_key: Tuple, slab_infos: Dict) -> str:
    """Return the full path of the mask matching a data slab, or "" if it is not listed."""
    mask_key = (SlabType["MASK"], slab_key[1], slab_key[2], slab_key[3])
    # Work on a copy: the informations coming from the list must not be altered
    expected = dict(slab_infos)
    expected["slab"] = pyramid.get_slab_path_from_infos(
        SlabType["MASK"], slab_key[1], slab_key[2], slab_key[3], False
    )
    # The mask entry has to carry the same informations as the data slab (path apart)
    # NOTE(review): this also compares the other stored informations - confirm it is wanted
    if level_index.get(mask_key) != expected:
        return ""
    return storage.get_path_from_infos(pyramid.storage_type, expected["root"], expected["slab"])


def _write_todo_lists(
    config: Dict, datasources: List, to_pyramid, agent_todos: List, finisher_todo
) -> None:
    """Add the levels to the output pyramid and write the agents and finisher commands."""
    round_robin = itertools.cycle(agent_todos)

    slab_finish = set()
    level_finish = set()
    # Source root -> index written in the link commands (indices start at 1)
    used_pyramids_roots = {}

    for sources in datasources:
        from_pyramids = sources.pyramids
        levels = from_pyramids[0].get_levels(sources.bottom, sources.top)
        for level in levels:
            # Several datasources cannot define the same level
            if level.id in level_finish:
                raise Exception(f"Different datasources cannot define the same level : {level.id}")

            try:
                to_pyramid.delete_level(level.id)
            except Exception:
                # Best effort - presumably the level is not defined in the output pyramid
                pass
            info = sources.info_level(level.id)
            to_pyramid.add_level(level.id, info[0], info[1], info[2])
            level_finish.add(level.id)

            # Each list is read once per level: lookups below are done on these indexes
            level_indexes = [
                _index_level_slabs(from_pyramid, level.id) for from_pyramid in from_pyramids
            ]

            for i, from_pyramid in enumerate(from_pyramids):
                for slab_key, slab_infos in level_indexes[i].items():
                    # Slab already handled from a previous pyramid
                    if slab_key in slab_finish:
                        continue
                    if slab_key[0].name == "MASK":
                        continue
                    from_slab_path = storage.get_path_from_infos(
                        from_pyramid.storage_type, slab_infos["root"], slab_infos["slab"]
                    )
                    process = [from_slab_path]
                    slab_finish.add(slab_key)

                    # Mask paths, aligned with 'process' ("" when a source has no mask)
                    mask = []
                    if config["process"]["mask"]:
                        mask.append(
                            _find_mask_path(from_pyramid, level_indexes[i], slab_key, slab_infos)
                        )

                    # Look for the same slab in the following source pyramids
                    if not config["process"]["only_links"]:
                        for j in range(i + 1, len(from_pyramids)):
                            other_infos = level_indexes[j].get(slab_key)
                            if other_infos is None:
                                continue
                            process.append(
                                storage.get_path_from_infos(
                                    from_pyramids[j].storage_type,
                                    other_infos["root"],
                                    other_infos["slab"],
                                )
                            )
                            if config["process"]["mask"]:
                                mask.append(
                                    _find_mask_path(
                                        from_pyramids[j], level_indexes[j], slab_key, other_infos
                                    )
                                )

                    to_slab_path = to_pyramid.get_slab_path_from_infos(
                        slab_key[0], slab_key[1], slab_key[2], slab_key[3]
                    )
                    if config["pyramid"]["mask"]:
                        to_slab_path_mask = to_pyramid.get_slab_path_from_infos(
                            SlabType["MASK"], slab_key[1], slab_key[2], slab_key[3]
                        )

                    # Write the commands into the todo lists
                    if len(process) == 1:
                        # Only one source owns the slab: a link is enough
                        root_index = used_pyramids_roots.setdefault(
                            slab_infos["root"], len(used_pyramids_roots) + 1
                        )

                        next(round_robin).write(
                            f"link {to_slab_path} {from_slab_path} {root_index}\n"
                        )
                        # 'mask' is empty when source masks are not used
                        if config["pyramid"]["mask"] and mask and mask[0] != "":
                            next(round_robin).write(
                                f"link {to_slab_path_mask} {mask[0]} {root_index}\n"
                            )
                    else:
                        # Several sources own the slab: convert, overlay and write back
                        lines = []
                        for k, source_path in enumerate(process):
                            lines.append(f"c2w {source_path}\n")
                            if config["process"]["mask"] and mask[k] != "":
                                lines.append(f"c2w {mask[k]}\n")
                        lines.append("oNt\n")
                        lines.append(f"w2c {to_slab_path}\n")
                        if config["pyramid"]["mask"]:
                            lines.append(f"w2c {to_slab_path_mask}\n")
                        next(round_robin).write("".join(lines))

    for root, root_index in used_pyramids_roots.items():
        finisher_todo.write(f"{root_index}={root}\n")


def _publish_todo_lists(config: Dict, agent_todos: List, finisher_todo) -> None:
    """Copy the todo lists to the process directory (may be an object storage) and clean."""
    try:
        for i, tmp in enumerate(agent_todos):
            tmp.close()
            storage.copy(
                f"file://{tmp.name}",
                os.path.join(config["process"]["directory"], f"todo.{i+1}.list"),
            )
            storage.remove(f"file://{tmp.name}")

        finisher_todo.close()
        storage.copy(
            f"file://{finisher_todo.name}",
            os.path.join(config["process"]["directory"], "todo.finisher.list"),
        )
        storage.remove(f"file://{finisher_todo.name}")

    except Exception as e:
        raise Exception(f"Cannot copy todo lists to final location and clean: {e}") from e


def _discard_temp_files(temp_files: List) -> None:
    """Close and delete the local temporary todo lists still present on disk."""
    for temp_file in temp_files:
        if temp_file is None:
            continue
        try:
            temp_file.close()
            if os.path.exists(temp_file.name):
                os.remove(temp_file.name)
        except OSError:
            # Best-effort cleanup: do not hide the error being propagated
            pass

Functions

def work(config: Dict[~KT, ~VT]) ‑> None

Master steps : prepare and split copies and merge to do

Load and check input pyramids from the descriptors and write the todo lists, splitting the copies or merge to do

Args

config : Dict
JOINCACHE configuration

Raises

Exception
Cannot load the input or the output pyramid
Exception
S3 cluster host have not to be provided into bucket names (output or inputs)
Exception
Sources pyramid have different features
Exception
Cannot open stream to write todo lists
Exception
Cannot copy todo lists
Expand source code
def work(config: Dict) -> None:
    """Master step: prepare and split the links and merges to do

    Load and check the input pyramids from their descriptors, build the output
    pyramid, then write one todo list per agent (commands are dispatched in
    round-robin order) plus one finisher todo list, and copy them to the
    configured process directory.

    Args:
        config (Dict): JOINCACHE configuration

    Raises:
        Exception: No datasource is provided
        Exception: Sources pyramids have different features (TMS, format, channels) or are not raster
        Exception: Cannot create the output pyramid from the first source pyramid
        Exception: S3 cluster host is provided into the output bucket name
        Exception: Several datasources define the same level
        Exception: Cannot open stream to write todo lists
        Exception: Cannot copy todo lists
    """

    datasources = []
    tms_reference = None
    format_reference = None
    channels_reference = None
    for datasource in config["datasources"]:
        sources = SourcePyramids(
            datasource["bottom"],
            datasource["top"],
            datasource["source"]["descriptors"],
        )
        # All sources have to share the same TMS
        if not tms_reference:
            tms_reference = sources.tms
        elif tms_reference.name != sources.tms.name:
            raise Exception(
                f"Sources pyramids cannot have two different TMS : {tms_reference} and {sources.tms}"
            )

        # All sources have to share the same format
        if not format_reference:
            format_reference = sources.format
        elif format_reference != sources.format:
            raise Exception(
                f"Sources pyramids cannot have two different format : {format_reference} and {sources.format}"
            )

        # All sources have to share the same number of channels
        if not channels_reference:
            channels_reference = sources.channels
        elif channels_reference != sources.channels:
            raise Exception(
                f"Sources pyramids cannot have two different numbers of channels : {channels_reference} and {sources.channels}"
            )

        # Only raster pyramids can be joined
        if sources.type != PyramidType.RASTER:
            raise Exception("Some sources pyramids are not a raster")

        datasources.append(sources)

    # The output pyramid is cloned from the first source pyramid: one is required
    if not datasources:
        raise Exception("No datasource provided : at least one is required to build the pyramid")

    # Build the pyramid to write
    storage_pyramid = {
        "type": datasources[0].pyramids[0].storage_type,
        "root": config["pyramid"]["root"],
    }
    try:
        to_pyramid = Pyramid.from_other(
            datasources[0].pyramids[0],
            config["pyramid"]["name"],
            storage_pyramid,
            mask=config["pyramid"]["mask"],
        )
    except Exception as e:
        raise Exception(
            f"Cannot create the destination pyramid descriptor from the source one: {e}"
        ) from e

    if to_pyramid.storage_s3_cluster is not None:
        # Only one S3 cluster is handled, so it must not be named in the paths
        raise Exception(
            f"Do not set S3 cluster host into output bucket name ({config['pyramid']['root']}) : only one cluster can be used with JOINCACHE"
        )

    temp_agent_todos = []
    temp_finisher_todo = None
    try:
        # Open the streams to the todo lists
        try:
            for _ in range(config["process"]["parallelization"]):
                temp_agent_todos.append(tempfile.NamedTemporaryFile(mode="w", delete=False))

            temp_finisher_todo = tempfile.NamedTemporaryFile(mode="w", delete=False)
        except Exception as e:
            raise Exception(f"Cannot open stream to write todo lists: {e}") from e

        _write_todo_lists(config, datasources, to_pyramid, temp_agent_todos, temp_finisher_todo)
        _publish_todo_lists(config, temp_agent_todos, temp_finisher_todo)
    finally:
        # Temporary files are created with delete=False: remove what is left on failure
        _discard_temp_files(temp_agent_todos + [temp_finisher_todo])


def _index_level_slabs(pyramid, level_id: str) -> Dict:
    """Read one level of a pyramid's list once and index its entries by slab key."""
    index = {}
    for slab_key, slab_infos in pyramid.list_generator(level_id):
        # Keep the first entry met for a key
        index.setdefault(slab_key, slab_infos)
    return index


def _find_mask_path(pyramid, level_index: Dict, slab_key: Tuple, slab_infos: Dict) -> str:
    """Return the full path of the mask matching a data slab, or "" if it is not listed."""
    mask_key = (SlabType["MASK"], slab_key[1], slab_key[2], slab_key[3])
    # Work on a copy: the informations coming from the list must not be altered
    expected = dict(slab_infos)
    expected["slab"] = pyramid.get_slab_path_from_infos(
        SlabType["MASK"], slab_key[1], slab_key[2], slab_key[3], False
    )
    # The mask entry has to carry the same informations as the data slab (path apart)
    # NOTE(review): this also compares the other stored informations - confirm it is wanted
    if level_index.get(mask_key) != expected:
        return ""
    return storage.get_path_from_infos(pyramid.storage_type, expected["root"], expected["slab"])


def _write_todo_lists(
    config: Dict, datasources: List, to_pyramid, agent_todos: List, finisher_todo
) -> None:
    """Add the levels to the output pyramid and write the agents and finisher commands."""
    round_robin = itertools.cycle(agent_todos)

    slab_finish = set()
    level_finish = set()
    # Source root -> index written in the link commands (indices start at 1)
    used_pyramids_roots = {}

    for sources in datasources:
        from_pyramids = sources.pyramids
        levels = from_pyramids[0].get_levels(sources.bottom, sources.top)
        for level in levels:
            # Several datasources cannot define the same level
            if level.id in level_finish:
                raise Exception(f"Different datasources cannot define the same level : {level.id}")

            try:
                to_pyramid.delete_level(level.id)
            except Exception:
                # Best effort - presumably the level is not defined in the output pyramid
                pass
            info = sources.info_level(level.id)
            to_pyramid.add_level(level.id, info[0], info[1], info[2])
            level_finish.add(level.id)

            # Each list is read once per level: lookups below are done on these indexes
            level_indexes = [
                _index_level_slabs(from_pyramid, level.id) for from_pyramid in from_pyramids
            ]

            for i, from_pyramid in enumerate(from_pyramids):
                for slab_key, slab_infos in level_indexes[i].items():
                    # Slab already handled from a previous pyramid
                    if slab_key in slab_finish:
                        continue
                    if slab_key[0].name == "MASK":
                        continue
                    from_slab_path = storage.get_path_from_infos(
                        from_pyramid.storage_type, slab_infos["root"], slab_infos["slab"]
                    )
                    process = [from_slab_path]
                    slab_finish.add(slab_key)

                    # Mask paths, aligned with 'process' ("" when a source has no mask)
                    mask = []
                    if config["process"]["mask"]:
                        mask.append(
                            _find_mask_path(from_pyramid, level_indexes[i], slab_key, slab_infos)
                        )

                    # Look for the same slab in the following source pyramids
                    if not config["process"]["only_links"]:
                        for j in range(i + 1, len(from_pyramids)):
                            other_infos = level_indexes[j].get(slab_key)
                            if other_infos is None:
                                continue
                            process.append(
                                storage.get_path_from_infos(
                                    from_pyramids[j].storage_type,
                                    other_infos["root"],
                                    other_infos["slab"],
                                )
                            )
                            if config["process"]["mask"]:
                                mask.append(
                                    _find_mask_path(
                                        from_pyramids[j], level_indexes[j], slab_key, other_infos
                                    )
                                )

                    to_slab_path = to_pyramid.get_slab_path_from_infos(
                        slab_key[0], slab_key[1], slab_key[2], slab_key[3]
                    )
                    if config["pyramid"]["mask"]:
                        to_slab_path_mask = to_pyramid.get_slab_path_from_infos(
                            SlabType["MASK"], slab_key[1], slab_key[2], slab_key[3]
                        )

                    # Write the commands into the todo lists
                    if len(process) == 1:
                        # Only one source owns the slab: a link is enough
                        root_index = used_pyramids_roots.setdefault(
                            slab_infos["root"], len(used_pyramids_roots) + 1
                        )

                        next(round_robin).write(
                            f"link {to_slab_path} {from_slab_path} {root_index}\n"
                        )
                        # 'mask' is empty when source masks are not used
                        if config["pyramid"]["mask"] and mask and mask[0] != "":
                            next(round_robin).write(
                                f"link {to_slab_path_mask} {mask[0]} {root_index}\n"
                            )
                    else:
                        # Several sources own the slab: convert, overlay and write back
                        lines = []
                        for k, source_path in enumerate(process):
                            lines.append(f"c2w {source_path}\n")
                            if config["process"]["mask"] and mask[k] != "":
                                lines.append(f"c2w {mask[k]}\n")
                        lines.append("oNt\n")
                        lines.append(f"w2c {to_slab_path}\n")
                        if config["pyramid"]["mask"]:
                            lines.append(f"w2c {to_slab_path_mask}\n")
                        next(round_robin).write("".join(lines))

    for root, root_index in used_pyramids_roots.items():
        finisher_todo.write(f"{root_index}={root}\n")


def _publish_todo_lists(config: Dict, agent_todos: List, finisher_todo) -> None:
    """Copy the todo lists to the process directory (may be an object storage) and clean."""
    try:
        for i, tmp in enumerate(agent_todos):
            tmp.close()
            storage.copy(
                f"file://{tmp.name}",
                os.path.join(config["process"]["directory"], f"todo.{i+1}.list"),
            )
            storage.remove(f"file://{tmp.name}")

        finisher_todo.close()
        storage.copy(
            f"file://{finisher_todo.name}",
            os.path.join(config["process"]["directory"], "todo.finisher.list"),
        )
        storage.remove(f"file://{finisher_todo.name}")

    except Exception as e:
        raise Exception(f"Cannot copy todo lists to final location and clean: {e}") from e


def _discard_temp_files(temp_files: List) -> None:
    """Close and delete the local temporary todo lists still present on disk."""
    for temp_file in temp_files:
        if temp_file is None:
            continue
        try:
            temp_file.close()
            if os.path.exists(temp_file.name):
                os.remove(temp_file.name)
        except OSError:
            # Best-effort cleanup: do not hide the error being propagated
            pass