Module rok4_tools.pyr2pyr_utils.master
Expand source code
import itertools
import logging
import os
import tempfile
from typing import Dict, List, Tuple, Union
from rok4 import storage
from rok4.pyramid import Pyramid
"""Todo list instructions
* cp <source slab> <destination slab> [<md5>] - Copy the source slab to the destination. If an MD5 hash is provided, it is computed again after the copy and must match.
"""
def work(config: Dict) -> None:
    """Master step: prepare and split the copies to do.

    Load the source pyramid from its descriptor, build the destination pyramid from it,
    then write one ``cp`` instruction per slab to copy. Instructions are distributed
    round-robin over ``config["process"]["parallelization"]`` todo lists, first written
    to local temporary files and finally copied to ``config["process"]["directory"]``
    as ``todo.<n>.list`` (``n`` starting at 1).

    The local temporary files are closed and removed before returning, whether the
    step succeeds or fails.

    Args:
        config (Dict): PYR2PYR configuration. Keys read here: ``from.descriptor``,
            ``to.name``, ``to.storage``, ``process.parallelization``,
            ``process.follow_links``, ``process.slab_limit`` and ``process.directory``.

    Raises:
        Exception: Cannot load the input or the output pyramid
        Exception: Cannot open temporary todo lists
        Exception: Cannot copy todo lists to their final location
        StorageError: Cannot read source pyramid list
        MissingEnvironmentError: Missing object storage information
    """
    # Load the pyramid to copy
    try:
        from_pyramid = Pyramid.from_descriptor(config["from"]["descriptor"])
    except Exception as e:
        raise Exception(f"Cannot load source pyramid descriptor: {e}") from e

    # Build the pyramid to write
    try:
        to_pyramid = Pyramid.from_other(from_pyramid, config["to"]["name"], config["to"]["storage"])
    except Exception as e:
        raise Exception(
            f"Cannot create the destination pyramid descriptor from the source one: {e}"
        ) from e

    file_objects = []
    try:
        # Open one local stream per todo list
        try:
            for _ in range(config["process"]["parallelization"]):
                file_objects.append(tempfile.NamedTemporaryFile(mode="w", delete=False))
        except Exception as e:
            raise Exception(f"Cannot open stream to write todo lists: {e}") from e

        round_robin = itertools.cycle(file_objects)

        for (slab_type, level, column, row), infos in from_pyramid.list_generator():
            if infos["link"] is True and not config["process"]["follow_links"]:
                # Symbolic slabs are not wanted, and this is one
                continue

            from_slab_path = storage.get_path_from_infos(
                from_pyramid.storage_type, infos["root"], infos["slab"]
            )

            if (
                config["process"]["slab_limit"] != 0
                and storage.get_size(from_slab_path) < config["process"]["slab_limit"]
            ):
                logging.debug("Slab %s too small, skip it", from_slab_path)
                continue

            to_slab_path = to_pyramid.get_slab_path_from_infos(slab_type, level, column, row)

            # The MD5 hash is optional: it is appended only when the source list provides one
            instruction = f"cp {from_slab_path} {to_slab_path}"
            if infos["md5"] is not None:
                instruction += f" {infos['md5']}"
            next(round_robin).write(f"{instruction}\n")

        # Copy the todo lists to the shared location (may be object storage)
        try:
            for index, tmp in enumerate(file_objects, start=1):
                # Close first, so that buffered instructions are flushed before the copy
                tmp.close()
                storage.copy(
                    f"file://{tmp.name}",
                    os.path.join(config["process"]["directory"], f"todo.{index}.list"),
                )
                storage.remove(f"file://{tmp.name}")
        except Exception as e:
            raise Exception(f"Cannot copy todo lists to final location and clean: {e}") from e
    finally:
        # Best-effort cleanup: the files are created with delete=False, so without this
        # they would stay open and on disk after any failure. On success they are already
        # closed and removed, hence the ignored errors.
        for tmp in file_objects:
            try:
                tmp.close()
            except OSError:
                pass
            try:
                os.remove(tmp.name)
            except OSError:
                pass
Functions
def work(config: Dict[~KT, ~VT]) ‑> None
-
Master steps : prepare and split copies to do
Load the input pyramid from the descriptor and write the todo lists, splitting the copies to do
Args
config
:Dict
- PYR2PYR configuration
Raises
Exception
- Cannot load the input or the output pyramid
Exception
- Cannot write temporary todo lists
StorageError
- Cannot read source pyramid list
MissingEnvironmentError
- Missing object storage information
Expand source code
def work(config: Dict) -> None:
    """Master step: prepare and split the copies to do.

    Load the source pyramid from its descriptor, build the destination pyramid from it,
    then write one ``cp`` instruction per slab to copy. Instructions are distributed
    round-robin over ``config["process"]["parallelization"]`` todo lists, first written
    to local temporary files and finally copied to ``config["process"]["directory"]``
    as ``todo.<n>.list`` (``n`` starting at 1).

    The local temporary files are closed and removed before returning, whether the
    step succeeds or fails.

    Args:
        config (Dict): PYR2PYR configuration. Keys read here: ``from.descriptor``,
            ``to.name``, ``to.storage``, ``process.parallelization``,
            ``process.follow_links``, ``process.slab_limit`` and ``process.directory``.

    Raises:
        Exception: Cannot load the input or the output pyramid
        Exception: Cannot open temporary todo lists
        Exception: Cannot copy todo lists to their final location
        StorageError: Cannot read source pyramid list
        MissingEnvironmentError: Missing object storage information
    """
    # Load the pyramid to copy
    try:
        from_pyramid = Pyramid.from_descriptor(config["from"]["descriptor"])
    except Exception as e:
        raise Exception(f"Cannot load source pyramid descriptor: {e}") from e

    # Build the pyramid to write
    try:
        to_pyramid = Pyramid.from_other(from_pyramid, config["to"]["name"], config["to"]["storage"])
    except Exception as e:
        raise Exception(
            f"Cannot create the destination pyramid descriptor from the source one: {e}"
        ) from e

    file_objects = []
    try:
        # Open one local stream per todo list
        try:
            for _ in range(config["process"]["parallelization"]):
                file_objects.append(tempfile.NamedTemporaryFile(mode="w", delete=False))
        except Exception as e:
            raise Exception(f"Cannot open stream to write todo lists: {e}") from e

        round_robin = itertools.cycle(file_objects)

        for (slab_type, level, column, row), infos in from_pyramid.list_generator():
            if infos["link"] is True and not config["process"]["follow_links"]:
                # Symbolic slabs are not wanted, and this is one
                continue

            from_slab_path = storage.get_path_from_infos(
                from_pyramid.storage_type, infos["root"], infos["slab"]
            )

            if (
                config["process"]["slab_limit"] != 0
                and storage.get_size(from_slab_path) < config["process"]["slab_limit"]
            ):
                logging.debug("Slab %s too small, skip it", from_slab_path)
                continue

            to_slab_path = to_pyramid.get_slab_path_from_infos(slab_type, level, column, row)

            # The MD5 hash is optional: it is appended only when the source list provides one
            instruction = f"cp {from_slab_path} {to_slab_path}"
            if infos["md5"] is not None:
                instruction += f" {infos['md5']}"
            next(round_robin).write(f"{instruction}\n")

        # Copy the todo lists to the shared location (may be object storage)
        try:
            for index, tmp in enumerate(file_objects, start=1):
                # Close first, so that buffered instructions are flushed before the copy
                tmp.close()
                storage.copy(
                    f"file://{tmp.name}",
                    os.path.join(config["process"]["directory"], f"todo.{index}.list"),
                )
                storage.remove(f"file://{tmp.name}")
        except Exception as e:
            raise Exception(f"Cannot copy todo lists to final location and clean: {e}") from e
    finally:
        # Best-effort cleanup: the files are created with delete=False, so without this
        # they would stay open and on disk after any failure. On success they are already
        # closed and removed, hence the ignored errors.
        for tmp in file_objects:
            try:
                tmp.close()
            except OSError:
                pass
            try:
                os.remove(tmp.name)
            except OSError:
                pass